深入理解Java多线程机制,掌握高并发场景下的编程技巧与最佳实践
Java多线程编程基础
Java从语言级别支持多线程编程,提供了丰富的API和内置机制来处理并发任务。理解多线程基础是构建高性能Java应用的关键。
线程的创建方式
Java中创建线程主要有三种方式:
1. 继承Thread类
public class MyThread extends Thread {
@Override
public void run() {
System.out.println("线程运行中: " + Thread.currentThread().getName());
}
public static void main(String[] args) {
MyThread thread = new MyThread();
thread.start(); // 启动线程
}
}
2. 实现Runnable接口
public class MyRunnable implements Runnable {
@Override
public void run() {
System.out.println("Runnable线程运行: " + Thread.currentThread().getName());
}
public static void main(String[] args) {
Thread thread = new Thread(new MyRunnable());
thread.start();
}
}
3. 实现Callable接口(带返回值)
import java.util.concurrent.Callable;
import java.util.concurrent.FutureTask;
public class MyCallable implements Callable<String> {
@Override
public String call() throws Exception {
return "Callable执行结果: " + Thread.currentThread().getName();
}
public static void main(String[] args) throws Exception {
FutureTask<String> futureTask = new FutureTask<>(new MyCallable());
Thread thread = new Thread(futureTask);
thread.start();
// 获取执行结果(会阻塞直到任务完成)
String result = futureTask.get();
System.out.println(result);
}
}
线程生命周期
Java线程有以下几种状态:
- NEW:新创建但尚未启动
- RUNNABLE:正在运行或准备运行
- BLOCKED:等待监视器锁(阻塞状态)
- WAITING:无限期等待其他线程执行特定操作
- TIMED_WAITING:有限期等待
- TERMINATED:已终止
线程管理与同步
在多线程环境中,正确处理线程同步和资源共享是避免竞态条件和数据不一致的关键。
同步机制
1. synchronized关键字
public class SynchronizedCounter {
private int count = 0;
// 同步方法
public synchronized void increment() {
count++;
}
// 同步代码块
public void decrement() {
synchronized(this) {
count--;
}
}
public synchronized int getCount() {
return count;
}
}
2. ReentrantLock可重入锁
import java.util.concurrent.locks.ReentrantLock;
public class LockCounter {
private int count = 0;
private final ReentrantLock lock = new ReentrantLock();
public void increment() {
lock.lock();
try {
count++;
} finally {
lock.unlock(); // 确保锁被释放
}
}
public int getCount() {
lock.lock();
try {
return count;
} finally {
lock.unlock();
}
}
}
线程间通信
public class WaitNotifyExample {
private boolean flag = false;
public synchronized void waitForFlag() throws InterruptedException {
while (!flag) {
wait(); // 释放锁并等待
}
System.out.println("标志已设置,继续执行");
}
public synchronized void setFlag() {
this.flag = true;
notifyAll(); // 通知所有等待的线程
}
}
线程池管理
import java.util.concurrent.ExecutorService;
import java.util.concurrent.Executors;
import java.util.concurrent.TimeUnit;
public class ThreadPoolExample {
public static void main(String[] args) {
// 创建固定大小的线程池
ExecutorService executor = Executors.newFixedThreadPool(5);
// 提交任务
for (int i = 0; i {
System.out.println("执行任务: " + taskId +
", 线程: " + Thread.currentThread().getName());
try {
TimeUnit.MILLISECONDS.sleep(500);
} catch (InterruptedException e) {
Thread.currentThread().interrupt();
}
});
}
// 优雅关闭线程池
executor.shutdown();
try {
if (!executor.awaitTermination(60, TimeUnit.SECONDS)) {
executor.shutdownNow();
}
} catch (InterruptedException e) {
executor.shutdownNow();
Thread.currentThread().interrupt();
}
}
}
Java并发工具类
Java并发包(java.util.concurrent)提供了强大的工具类来处理复杂的并发场景。
1. CountDownLatch
import java.util.concurrent.CountDownLatch;
public class CountDownLatchExample {
public static void main(String[] args) throws InterruptedException {
int threadCount = 5;
CountDownLatch startLatch = new CountDownLatch(1);
CountDownLatch doneLatch = new CountDownLatch(threadCount);
for (int i = 0; i {
try {
startLatch.await(); // 等待开始信号
System.out.println(Thread.currentThread().getName() + " 开始工作");
Thread.sleep(1000);
doneLatch.countDown(); // 完成工作
} catch (InterruptedException e) {
Thread.currentThread().interrupt();
}
}).start();
}
System.out.println("准备开始所有线程...");
Thread.sleep(1000);
startLatch.countDown(); // 发出开始信号
doneLatch.await(); // 等待所有线程完成
System.out.println("所有线程已完成工作");
}
}
2. CyclicBarrier
import java.util.concurrent.BrokenBarrierException;
import java.util.concurrent.CyclicBarrier;
public class CyclicBarrierExample {
public static void main(String[] args) {
int threadCount = 3;
CyclicBarrier barrier = new CyclicBarrier(threadCount, () -> {
System.out.println("所有线程已到达屏障点,继续执行");
});
for (int i = 0; i {
try {
System.out.println("线程 " + threadId + " 开始第一阶段");
Thread.sleep(1000 + threadId * 200);
System.out.println("线程 " + threadId + " 到达屏障点");
barrier.await();
System.out.println("线程 " + threadId + " 开始第二阶段");
Thread.sleep(500);
System.out.println("线程 " + threadId + " 完成工作");
} catch (InterruptedException | BrokenBarrierException e) {
Thread.currentThread().interrupt();
}
}).start();
}
}
}
3. Semaphore
import java.util.concurrent.Semaphore;
public class SemaphoreExample {
public static void main(String[] args) {
// 限制同时访问的线程数量为3
Semaphore semaphore = new Semaphore(3);
for (int i = 0; i {
try {
semaphore.acquire(); // 获取许可
System.out.println("线程 " + threadId + " 获得许可,开始工作");
Thread.sleep(2000);
System.out.println("线程 " + threadId + " 完成工作,释放许可");
} catch (InterruptedException e) {
Thread.currentThread().interrupt();
} finally {
semaphore.release(); // 释放许可
}
}).start();
}
}
}
4. Concurrent Collections
import java.util.concurrent.ConcurrentHashMap;
import java.util.concurrent.CopyOnWriteArrayList;
public class ConcurrentCollectionExample {
public static void main(String[] args) {
// 线程安全的HashMap
ConcurrentHashMap<String, Integer> map = new ConcurrentHashMap<>();
// 线程安全的List
CopyOnWriteArrayList<String> list = new CopyOnWriteArrayList<>();
// 多个线程可以安全地并发访问这些集合
Thread writer = new Thread(() -> {
for (int i = 0; i {
for (int i = 0; i < 100; i++) {
Integer value = map.get("key" + i);
if (value != null) {
System.out.println("读取: key" + i + " = " + value);
}
}
});
writer.start();
reader.start();
}
}
实战项目:多线程文件处理器
下面我们实现一个多线程文件处理系统,演示如何在实际项目中应用Java并发编程技术。
项目需求
- 监控指定目录下的新文件
- 使用多线程并行处理文件
- 限制同时处理的文件数量
- 支持处理结果统计和错误处理
核心实现
import java.io.IOException;
import java.nio.file.*;
import java.util.concurrent.*;
import java.util.concurrent.atomic.AtomicInteger;
public class MultiThreadFileProcessor {
private final Path directory;
private final ExecutorService processorPool;
private final Semaphore processingSemaphore;
private final AtomicInteger processedCount = new AtomicInteger(0);
private final AtomicInteger failedCount = new AtomicInteger(0);
private volatile boolean running = true;
public MultiThreadFileProcessor(String dirPath, int maxConcurrentFiles) {
this.directory = Paths.get(dirPath);
this.processorPool = Executors.newFixedThreadPool(maxConcurrentFiles);
this.processingSemaphore = new Semaphore(maxConcurrentFiles);
}
public void start() throws IOException, InterruptedException {
System.out.println("启动文件处理器,监控目录: " + directory);
// 创建监控服务
WatchService watchService = FileSystems.getDefault().newWatchService();
directory.register(watchService, StandardWatchEventKinds.ENTRY_CREATE);
// 处理现有文件
processExistingFiles();
// 监控新文件
while (running) {
WatchKey key = watchService.take();
for (WatchEvent<?> event : key.pollEvents()) {
if (event.kind() == StandardWatchEventKinds.ENTRY_CREATE) {
Path filePath = directory.resolve((Path) event.context());
processFile(filePath);
}
}
key.reset();
}
shutdown();
}
private void processExistingFiles() throws IOException {
try (DirectoryStream<Path> stream = Files.newDirectoryStream(directory)) {
for (Path file : stream) {
if (Files.isRegularFile(file)) {
processFile(file);
}
}
}
}
private void processFile(Path filePath) {
if (!processingSemaphore.tryAcquire()) {
System.out.println("达到处理上限,跳过文件: " + filePath.getFileName());
return;
}
processorPool.submit(() -> {
try {
System.out.println("开始处理文件: " + filePath.getFileName() +
" (线程: " + Thread.currentThread().getName() + ")");
// 模拟文件处理
processFileContent(filePath);
processedCount.incrementAndGet();
System.out.println("成功处理文件: " + filePath.getFileName());
} catch (Exception e) {
failedCount.incrementAndGet();
System.err.println("处理文件失败: " + filePath.getFileName() +
", 错误: " + e.getMessage());
} finally {
processingSemaphore.release();
}
});
}
private void processFileContent(Path filePath) throws IOException, InterruptedException {
// 模拟处理时间
long fileSize = Files.size(filePath);
long processingTime = Math.min(5000, fileSize / 1000);
Thread.sleep(processingTime);
// 这里可以添加实际的文件处理逻辑
// 例如: 文件解析、数据提取、格式转换等
// 模拟随机失败(10%概率)
if (Math.random() < 0.1) {
throw new IOException("处理过程中发生随机错误");
}
}
public void stop() {
running = false;
}
public void shutdown() {
stop();
processorPool.shutdown();
try {
if (!processorPool.awaitTermination(60, TimeUnit.SECONDS)) {
processorPool.shutdownNow();
}
} catch (InterruptedException e) {
processorPool.shutdownNow();
Thread.currentThread().interrupt();
}
System.out.println("处理完成统计 - 成功: " + processedCount.get() +
", 失败: " + failedCount.get());
}
public static void main(String[] args) {
if (args.length < 1) {
System.out.println("用法: java MultiThreadFileProcessor [并发数]");
return;
}
String directoryPath = args[0];
int maxConcurrent = args.length > 1 ? Integer.parseInt(args[1]) : 5;
MultiThreadFileProcessor processor = new MultiThreadFileProcessor(directoryPath, maxConcurrent);
// 添加关闭钩子
Runtime.getRuntime().addShutdownHook(new Thread(processor::shutdown));
try {
processor.start();
} catch (Exception e) {
System.err.println("处理器启动失败: " + e.getMessage());
processor.shutdown();
}
}
}
高级特性扩展
// 1. 使用CompletableFuture进行异步处理链
public CompletableFuture<ProcessingResult> processFileAsync(Path filePath) {
return CompletableFuture.supplyAsync(() -> {
try {
return processFileContentAdvanced(filePath);
} catch (Exception e) {
throw new CompletionException(e);
}
}, processorPool).handle((result, exception) -> {
if (exception != null) {
return new ProcessingResult(filePath, false, exception.getMessage());
}
return new ProcessingResult(filePath, true, "成功");
});
}
// 2. 使用ThreadLocal存储线程特定数据
private static final ThreadLocal<SimpleDateFormat> DATE_FORMATTER =
ThreadLocal.withInitial(() -> new SimpleDateFormat("yyyy-MM-dd HH:mm:ss"));
// 3. 使用Phaser进行多阶段处理
public void processWithPhases(Path filePath) {
Phaser phaser = new Phaser(1); // 注册主线程
for (int phase = 0; phase {
phaser.register();
try {
processPhase(filePath, currentPhase);
} finally {
phaser.arriveAndDeregister();
}
});
}
// 等待所有阶段完成
phaser.arriveAndAwaitAdvance();
}
最佳实践总结
- 合理配置线程池大小(CPU密集型 vs IO密集型任务)
- 始终使用try-finally块确保资源释放
- 优先使用java.util.concurrent包中的高级工具类
- 避免使用stop()、suspend()等已废弃的线程方法
- 使用ThreadLocal时注意内存泄漏问题
- 正确处理InterruptedException
- 使用原子类和volatile关键字处理简单的原子操作
// 为页面添加简单的交互功能
document.addEventListener(‘DOMContentLoaded’, function() {
// 平滑滚动导航
const navLinks = document.querySelectorAll(‘nav a’);
navLinks.forEach(link => {
link.addEventListener(‘click’, function(e) {
e.preventDefault();
const targetId = this.getAttribute(‘href’);
const targetElement = document.querySelector(targetId);
if (targetElement) {
targetElement.scrollIntoView({
behavior: ‘smooth’,
block: ‘start’
});
}
});
});
// 代码块复制功能
const codeBlocks = document.querySelectorAll(‘pre’);
codeBlocks.forEach(block => {
block.addEventListener(‘click’, function() {
const textToCopy = this.textContent;
navigator.clipboard.writeText(textToCopy).then(() => {
const originalText = this.textContent;
this.textContent = ‘代码已复制到剪贴板!’;
setTimeout(() => {
this.textContent = originalText;
}, 1500);
}).catch(err => {
console.error(‘无法复制文本: ‘, err);
});
});
});
// 添加简单的语法高亮提示
const keywords = [‘class’, ‘public’, ‘private’, ‘static’, ‘void’, ‘new’, ‘import’,
‘extends’, ‘implements’, ‘interface’, ‘synchronized’, ‘volatile’,
‘try’, ‘catch’, ‘finally’, ‘throw’, ‘throws’, ‘return’];
codeBlocks.forEach(block => {
let code = block.textContent;
keywords.forEach(keyword => {
const regex = new RegExp(`\b${keyword}\b`, ‘g’);
code = code.replace(regex, `${keyword}`);
});
block.innerHTML = code;
});
});