深入理解Java多线程机制,掌握并发编程核心技术,构建高性能、高并发的Java应用程序
Java多线程编程基础
多线程是Java语言的重要特性,允许程序同时执行多个任务,提高CPU利用率和应用程序响应性。现代多核处理器架构下,合理使用多线程可以显著提升程序性能。
线程创建方式
Java提供了多种创建线程的方式,每种方式都有其适用场景:
1. 继承Thread类
public class SimpleThread extends Thread {
@Override
public void run() {
System.out.println("线程运行: " + Thread.currentThread().getName());
}
public static void main(String[] args) {
SimpleThread thread = new SimpleThread();
thread.start(); // 启动线程
}
}
2. 实现Runnable接口
public class Task implements Runnable {
@Override
public void run() {
System.out.println("任务执行: " + Thread.currentThread().getName());
}
public static void main(String[] args) {
Thread thread = new Thread(new Task());
thread.start();
}
}
3. 实现Callable接口(带返回值)
import java.util.concurrent.Callable;
import java.util.concurrent.FutureTask;
public class ResultTask implements Callable<Integer> {
@Override
public Integer call() throws Exception {
int sum = 0;
for (int i = 1; i <= 100; i++) {
sum += i;
}
return sum;
}
public static void main(String[] args) throws Exception {
FutureTask<Integer> futureTask = new FutureTask(new ResultTask());
Thread thread = new Thread(futureTask);
thread.start();
// 获取计算结果(会阻塞直到计算完成)
Integer result = futureTask.get();
System.out.println("计算结果: " + result);
}
}
线程生命周期
Java线程有以下几种状态:
- NEW:新创建但尚未启动
- RUNNABLE:正在JVM中执行或等待系统资源
- BLOCKED:等待监视器锁(synchronized)
- WAITING:无限期等待其他线程执行特定操作
- TIMED_WAITING:有限时间等待
- TERMINATED:已退出
线程管理与同步
多线程环境下,线程间的协调和资源共享是关键挑战。Java提供了多种同步机制来保证线程安全。
同步机制
1. synchronized关键字
public class Counter {
private int count = 0;
// 同步方法
public synchronized void increment() {
count++;
}
// 同步代码块
public void decrement() {
synchronized(this) {
count--;
}
}
public synchronized int getCount() {
return count;
}
}
2. ReentrantLock可重入锁
import java.util.concurrent.locks.ReentrantLock;
public class SafeCounter {
private int count = 0;
private final ReentrantLock lock = new ReentrantLock();
public void increment() {
lock.lock();
try {
count++;
} finally {
lock.unlock();
}
}
public int getCount() {
lock.lock();
try {
return count;
} finally {
lock.unlock();
}
}
}
线程间通信
wait()和notify()方法
public class MessageQueue {
private String message;
private boolean empty = true;
public synchronized String take() {
while (empty) {
try {
wait(); // 等待消息
} catch (InterruptedException e) {
Thread.currentThread().interrupt();
return null;
}
}
empty = true;
notifyAll(); // 通知生产者
return message;
}
public synchronized void put(String message) {
while (!empty) {
try {
wait(); // 等待消费
} catch (InterruptedException e) {
Thread.currentThread().interrupt();
return;
}
}
empty = false;
this.message = message;
notifyAll(); // 通知消费者
}
}
线程池管理
使用线程池可以避免频繁创建和销毁线程的开销:
import java.util.concurrent.ExecutorService;
import java.util.concurrent.Executors;
import java.util.concurrent.TimeUnit;
public class ThreadPoolExample {
public static void main(String[] args) {
// 创建固定大小的线程池
ExecutorService executor = Executors.newFixedThreadPool(5);
// 提交任务
for (int i = 0; i {
System.out.println("执行任务 " + taskId + " 线程: " +
Thread.currentThread().getName());
try {
TimeUnit.MILLISECONDS.sleep(500);
} catch (InterruptedException e) {
Thread.currentThread().interrupt();
}
});
}
// 优雅关闭线程池
executor.shutdown();
try {
if (!executor.awaitTermination(60, TimeUnit.SECONDS)) {
executor.shutdownNow();
}
} catch (InterruptedException e) {
executor.shutdownNow();
Thread.currentThread().interrupt();
}
}
}
Java并发工具类
Java并发包(java.util.concurrent)提供了丰富的工具类,简化并发编程。
1. CountDownLatch
import java.util.concurrent.CountDownLatch;
public class CountDownLatchExample {
public static void main(String[] args) throws InterruptedException {
int threadCount = 5;
CountDownLatch startLatch = new CountDownLatch(1);
CountDownLatch doneLatch = new CountDownLatch(threadCount);
for (int i = 0; i {
try {
startLatch.await(); // 等待开始信号
System.out.println(Thread.currentThread().getName() + " 开始工作");
Thread.sleep(1000); // 模拟工作
doneLatch.countDown(); // 完成计数减一
} catch (InterruptedException e) {
Thread.currentThread().interrupt();
}
}).start();
}
System.out.println("主线程准备就绪");
Thread.sleep(1000);
startLatch.countDown(); // 发出开始信号
doneLatch.await(); // 等待所有线程完成
System.out.println("所有线程已完成工作");
}
}
2. CyclicBarrier
import java.util.concurrent.CyclicBarrier;
public class CyclicBarrierExample {
public static void main(String[] args) {
int threadCount = 3;
CyclicBarrier barrier = new CyclicBarrier(threadCount, () -> {
System.out.println("所有线程已到达屏障,继续执行");
});
for (int i = 0; i {
try {
System.out.println("线程 " + threadId + " 开始第一阶段");
Thread.sleep(1000 + threadId * 200);
System.out.println("线程 " + threadId + " 到达屏障");
barrier.await();
System.out.println("线程 " + threadId + " 开始第二阶段");
Thread.sleep(500 + threadId * 100);
System.out.println("线程 " + threadId + " 再次到达屏障");
barrier.await();
System.out.println("线程 " + threadId + " 完成");
} catch (Exception e) {
e.printStackTrace();
}
}).start();
}
}
}
3. Semaphore
import java.util.concurrent.Semaphore;
public class SemaphoreExample {
public static void main(String[] args) {
// 限制同时只有3个线程可以访问资源
Semaphore semaphore = new Semaphore(3);
for (int i = 0; i {
try {
System.out.println("线程 " + threadId + " 等待许可");
semaphore.acquire();
System.out.println("线程 " + threadId + " 获得许可,开始工作");
Thread.sleep(2000); // 模拟资源使用
System.out.println("线程 " + threadId + " 释放许可");
semaphore.release();
} catch (InterruptedException e) {
Thread.currentThread().interrupt();
}
}).start();
}
}
}
4. ConcurrentHashMap
import java.util.concurrent.ConcurrentHashMap;
import java.util.concurrent.ConcurrentMap;
public class ConcurrentMapExample {
public static void main(String[] args) {
ConcurrentMap<String, Integer> map = new ConcurrentHashMap();
// 多个线程同时操作
Thread writer1 = new Thread(() -> {
for (int i = 0; i {
for (int i = 100; i {
while (map.size() < 200) {
try {
Thread.sleep(10);
} catch (InterruptedException e) {
Thread.currentThread().interrupt();
}
}
System.out.println("Map大小: " + map.size());
});
writer1.start();
writer2.start();
reader.start();
try {
writer1.join();
writer2.join();
reader.join();
} catch (InterruptedException e) {
Thread.currentThread().interrupt();
}
}
}
实战项目:多线程文件处理器
下面我们将实现一个多线程文件处理系统,展示如何在实际项目中应用Java并发编程技术。
项目需求
- 监控指定目录下的新文件
- 使用多线程并行处理文件
- 支持不同类型的文件处理器
- 限制同时处理的文件数量
- 提供处理进度监控
核心实现
1. 文件处理器接口
import java.io.File;
public interface FileProcessor {
boolean supports(File file);
void process(File file) throws ProcessingException;
}
public class ProcessingException extends Exception {
public ProcessingException(String message, Throwable cause) {
super(message, cause);
}
}
2. 文件处理任务
import java.io.File;
import java.util.List;
public class FileProcessingTask implements Runnable {
private final File file;
private final List<FileProcessor> processors;
public FileProcessingTask(File file, List<FileProcessor> processors) {
this.file = file;
this.processors = processors;
}
@Override
public void run() {
System.out.println("开始处理文件: " + file.getName());
for (FileProcessor processor : processors) {
if (processor.supports(file)) {
try {
processor.process(file);
System.out.println("文件 " + file.getName() + " 处理成功");
return;
} catch (ProcessingException e) {
System.err.println("处理文件 " + file.getName() + " 失败: " + e.getMessage());
}
}
}
System.out.println("没有找到适合的处理器处理文件: " + file.getName());
}
}
3. 文件监控器
import java.io.File;
import java.nio.file.*;
import java.util.List;
import java.util.concurrent.ExecutorService;
import java.util.concurrent.Semaphore;
import static java.nio.file.StandardWatchEventKinds.*;
public class FileMonitor {
private final Path directory;
private final ExecutorService executor;
private final List<FileProcessor> processors;
private final Semaphore semaphore;
private volatile boolean running = false;
public FileMonitor(String directoryPath, ExecutorService executor,
List<FileProcessor> processors, int maxConcurrent) {
this.directory = Paths.get(directoryPath);
this.executor = executor;
this.processors = processors;
this.semaphore = new Semaphore(maxConcurrent);
}
public void start() throws Exception {
if (!Files.exists(directory) || !Files.isDirectory(directory)) {
throw new IllegalArgumentException("目录不存在或不是有效目录: " + directory);
}
running = true;
WatchService watchService = FileSystems.getDefault().newWatchService();
directory.register(watchService, ENTRY_CREATE);
System.out.println("开始监控目录: " + directory);
// 处理现有文件
processExistingFiles();
// 监控新文件
while (running) {
WatchKey key = watchService.take();
for (WatchEvent<?> event : key.pollEvents()) {
if (event.kind() == ENTRY_CREATE) {
Path filePath = directory.resolve((Path) event.context());
submitFileForProcessing(filePath.toFile());
}
}
key.reset();
}
}
private void processExistingFiles() {
File[] existingFiles = directory.toFile().listFiles();
if (existingFiles != null) {
for (File file : existingFiles) {
if (file.isFile()) {
submitFileForProcessing(file);
}
}
}
}
private void submitFileForProcessing(File file) {
try {
semaphore.acquire(); // 获取许可
executor.submit(() -> {
try {
new FileProcessingTask(file, processors).run();
} finally {
semaphore.release(); // 释放许可
}
});
} catch (InterruptedException e) {
Thread.currentThread().interrupt();
System.err.println("提交文件处理任务被中断: " + file.getName());
}
}
public void stop() {
running = false;
}
}
4. 示例文件处理器
import java.io.File;
import java.nio.file.Files;
import java.nio.file.StandardCopyOption;
public class TextFileProcessor implements FileProcessor {
@Override
public boolean supports(File file) {
return file.getName().toLowerCase().endsWith(".txt");
}
@Override
public void process(File file) throws ProcessingException {
try {
// 模拟文本文件处理:转换为大写并保存
String content = new String(Files.readAllBytes(file.toPath()));
String upperCaseContent = content.toUpperCase();
Path outputPath = file.toPath().resolveSibling(
file.getName().replace(".txt", "_processed.txt"));
Files.write(outputPath, upperCaseContent.getBytes());
System.out.println("文本文件处理完成: " + file.getName());
} catch (Exception e) {
throw new ProcessingException("处理文本文件失败", e);
}
}
}
public class ImageFileProcessor implements FileProcessor {
@Override
public boolean supports(File file) {
String name = file.getName().toLowerCase();
return name.endsWith(".jpg") || name.endsWith(".png") || name.endsWith(".gif");
}
@Override
public void process(File file) throws ProcessingException {
try {
// 模拟图像处理:创建缩略图
Path thumbnailPath = file.toPath().resolveSibling(
"thumbnail_" + file.getName());
// 这里只是模拟,实际应用中会使用图像处理库
Files.copy(file.toPath(), thumbnailPath, StandardCopyOption.REPLACE_EXISTING);
System.out.println("图像文件处理完成: " + file.getName());
} catch (Exception e) {
throw new ProcessingException("处理图像文件失败", e);
}
}
}
5. 主应用程序
import java.util.Arrays;
import java.util.List;
import java.util.concurrent.ExecutorService;
import java.util.concurrent.Executors;
import java.util.concurrent.TimeUnit;
public class MultiThreadFileProcessorApp {
public static void main(String[] args) {
if (args.length < 1) {
System.out.println("用法: java MultiThreadFileProcessorApp ");
return;
}
String directoryPath = args[0];
int maxThreads = Runtime.getRuntime().availableProcessors() * 2;
int maxConcurrentFiles = 5;
ExecutorService executor = Executors.newFixedThreadPool(maxThreads);
List<FileProcessor> processors = Arrays.asList(
new TextFileProcessor(),
new ImageFileProcessor()
);
FileMonitor fileMonitor = new FileMonitor(directoryPath, executor,
processors, maxConcurrentFiles);
// 添加关闭钩子
Runtime.getRuntime().addShutdownHook(new Thread(() -> {
System.out.println("正在关闭应用程序...");
fileMonitor.stop();
executor.shutdown();
try {
if (!executor.awaitTermination(30, TimeUnit.SECONDS)) {
executor.shutdownNow();
}
} catch (InterruptedException e) {
executor.shutdownNow();
Thread.currentThread().interrupt();
}
}));
try {
fileMonitor.start();
} catch (Exception e) {
System.err.println("启动文件监控器失败: " + e.getMessage());
e.printStackTrace();
executor.shutdown();
}
}
}
项目总结与最佳实践
- 合理使用线程池管理线程资源
- 使用Semaphore控制并发访问数量
- 实现优雅的应用程序关闭机制
- 为不同类型的任务创建专用处理器
- 使用WatchService监控文件系统变化
性能优化建议
- 根据CPU核心数合理设置线程池大小
- 使用合适的并发控制机制避免资源竞争
- 实现批处理机制减少IO操作次数
- 使用缓冲区提高文件处理效率
- 监控线程状态,及时发现和解决死锁问题
// 为页面添加简单的交互功能
document.addEventListener(‘DOMContentLoaded’, function() {
// 平滑滚动导航
const navLinks = document.querySelectorAll(‘nav a’);
navLinks.forEach(link => {
link.addEventListener(‘click’, function(e) {
e.preventDefault();
const targetId = this.getAttribute(‘href’);
const targetElement = document.querySelector(targetId);
if (targetElement) {
targetElement.scrollIntoView({
behavior: ‘smooth’,
block: ‘start’
});
}
});
});
// 代码块复制功能
const codeBlocks = document.querySelectorAll(‘pre’);
codeBlocks.forEach(block => {
block.addEventListener(‘click’, function() {
const textToCopy = this.textContent;
navigator.clipboard.writeText(textToCopy).then(() => {
const originalText = this.textContent;
this.textContent = ‘代码已复制到剪贴板!’;
setTimeout(() => {
this.textContent = originalText;
}, 1500);
}).catch(err => {
console.error(‘无法复制文本: ‘, err);
});
});
});
// 添加代码语法高亮提示
const codeElements = document.querySelectorAll(‘code’);
codeElements.forEach(code => {
code.title = ‘点击复制代码’;
});
});