作者:Java高级工程师 | 发布日期:2023年11月20日
为什么并发编程在现代Java开发中至关重要?
随着多核处理器的普及,并发编程已成为提升Java应用程序性能的关键技术。合理使用多线程能够充分利用CPU资源,提高程序吞吐量和响应速度,特别是在高并发、大数据处理的场景下。
然而,并发编程也是一把双刃剑。不正确的多线程实现会导致竞态条件、死锁、内存可见性等问题,使得程序行为不可预测且难以调试。本教程将深入探讨Java并发编程的核心概念、最佳实践和常见陷阱,帮助你编写出高效且线程安全的Java应用程序。
Java线程创建与管理的多种方式
Java提供了多种创建线程的方式,每种方式都有其适用场景和优缺点。
1. 继承Thread类
public class SimpleThread extends Thread {
private final String message;
public SimpleThread(String message) {
this.message = message;
}
@Override
public void run() {
for (int i = 0; i < 5; i++) {
System.out.println(message + " : " + i);
try {
Thread.sleep(500); // 模拟耗时操作
} catch (InterruptedException e) {
System.err.println("线程被中断: " + e.getMessage());
Thread.currentThread().interrupt(); // 恢复中断状态
}
}
}
// 使用示例
public static void main(String[] args) {
Thread thread1 = new SimpleThread("线程A");
Thread thread2 = new SimpleThread("线程B");
thread1.start(); // 启动线程
thread2.start();
try {
thread1.join(); // 等待线程结束
thread2.join();
} catch (InterruptedException e) {
Thread.currentThread().interrupt();
}
System.out.println("所有线程执行完毕");
}
}
2. 实现Runnable接口
public class TaskRunnable implements Runnable {
private final String taskName;
public TaskRunnable(String taskName) {
this.taskName = taskName;
}
@Override
public void run() {
System.out.println("开始执行任务: " + taskName + ",线程: " + Thread.currentThread().getName());
// 模拟任务执行
try {
Thread.sleep(2000);
} catch (InterruptedException e) {
Thread.currentThread().interrupt();
}
System.out.println("任务完成: " + taskName);
}
// 使用示例
public static void main(String[] args) {
Thread thread1 = new Thread(new TaskRunnable("任务A"));
Thread thread2 = new Thread(new TaskRunnable("任务B"));
// 设置线程优先级(提示性,不保证)
thread1.setPriority(Thread.MAX_PRIORITY);
thread2.setPriority(Thread.MIN_PRIORITY);
// 设置为守护线程(当所有非守护线程结束时,JVM会退出)
thread1.setDaemon(false);
thread2.setDaemon(false);
thread1.start();
thread2.start();
}
}
3. 实现Callable接口和Future
import java.util.concurrent.Callable;
import java.util.concurrent.ExecutionException;
import java.util.concurrent.FutureTask;
public class CallableExample implements Callable<Integer> {
private final int number;
public CallableExample(int number) {
this.number = number;
}
@Override
public Integer call() throws Exception {
// 模拟计算任务
if (number < 0) {
throw new IllegalArgumentException("数字不能为负数");
}
int result = 0;
for (int i = 1; i <= number; i++) {
result += i;
Thread.sleep(100); // 模拟耗时
}
return result;
}
// 使用示例
public static void main(String[] args) {
Callable<Integer> task = new CallableExample(10);
FutureTask<Integer> futureTask = new FutureTask<>(task);
Thread thread = new Thread(futureTask);
thread.start();
// 主线程可以继续执行其他任务
try {
// 获取计算结果(会阻塞直到任务完成)
Integer result = futureTask.get();
System.out.println("计算结果: " + result);
} catch (InterruptedException | ExecutionException e) {
System.err.println("任务执行出错: " + e.getMessage());
}
}
}
线程同步与锁机制
当多个线程访问共享资源时,必须使用同步机制来确保数据的一致性和线程安全。
1. synchronized关键字
public class SynchronizedCounter {
private int count = 0;
// 同步方法
public synchronized void increment() {
count++;
}
public synchronized void decrement() {
count--;
}
public synchronized int getCount() {
return count;
}
}
// 使用同步代码块
public class SafeSharedResource {
private final Object lock = new Object();
private int value;
public void updateValue(int newValue) {
// 非同步操作
System.out.println("准备更新值...");
// 同步代码块
synchronized(lock) {
value = newValue;
System.out.println("值已更新为: " + value);
}
// 更多的非同步操作
}
public int getValue() {
synchronized(lock) {
return value;
}
}
}
2. ReentrantLock可重入锁
import java.util.concurrent.locks.Lock;
import java.util.concurrent.locks.ReentrantLock;
public class LockExample {
private final Lock lock = new ReentrantLock();
private int sharedValue = 0;
public void performTask() {
lock.lock(); // 获取锁
try {
// 临界区代码
sharedValue++;
System.out.println("当前值: " + sharedValue + ",线程: " + Thread.currentThread().getName());
Thread.sleep(100); // 模拟工作
} catch (InterruptedException e) {
Thread.currentThread().interrupt();
} finally {
lock.unlock(); // 确保在finally块中释放锁
}
}
// 尝试获取锁,避免死锁
public boolean tryPerformTask() {
if (lock.tryLock()) { // 尝试获取锁,立即返回
try {
// 临界区代码
sharedValue--;
System.out.println("递减后的值: " + sharedValue);
return true;
} finally {
lock.unlock();
}
} else {
System.out.println("无法获取锁,执行其他操作");
return false;
}
}
}
3. 读写锁(ReadWriteLock)
import java.util.concurrent.locks.ReadWriteLock;
import java.util.concurrent.locks.ReentrantReadWriteLock;
public class ReadWriteMap<K, V> {
private final ReadWriteLock lock = new ReentrantReadWriteLock();
private final java.util.Map<K, V> map = new java.util.HashMap<>();
public V put(K key, V value) {
lock.writeLock().lock(); // 获取写锁
try {
return map.put(key, value);
} finally {
lock.writeLock().unlock();
}
}
public V get(K key) {
lock.readLock().lock(); // 获取读锁
try {
return map.get(key);
} finally {
lock.readLock().unlock();
}
}
public int size() {
lock.readLock().lock();
try {
return map.size();
} finally {
lock.readLock().unlock();
}
}
}
线程池技术与Executor框架
Java的Executor框架提供了强大的线程池实现,可以有效地管理线程生命周期,减少线程创建和销毁的开销。
1. 创建不同类型的线程池
import java.util.concurrent.ExecutorService;
import java.util.concurrent.Executors;
import java.util.concurrent.ScheduledExecutorService;
import java.util.concurrent.TimeUnit;
public class ThreadPoolExamples {
public static void fixedThreadPoolExample() {
// 创建固定大小的线程池
ExecutorService executor = Executors.newFixedThreadPool(5);
for (int i = 0; i {
System.out.println("执行任务 " + taskId + ",线程: " + Thread.currentThread().getName());
try {
Thread.sleep(1000); // 模拟任务执行
} catch (InterruptedException e) {
Thread.currentThread().interrupt();
}
});
}
// 优雅关闭线程池
executor.shutdown();
try {
if (!executor.awaitTermination(60, TimeUnit.SECONDS)) {
executor.shutdownNow(); // 强制关闭
}
} catch (InterruptedException e) {
executor.shutdownNow();
Thread.currentThread().interrupt();
}
}
public static void cachedThreadPoolExample() {
// 创建可缓存的线程池
ExecutorService executor = Executors.newCachedThreadPool();
for (int i = 0; i {
System.out.println("执行短期任务 " + taskId + ",线程: " + Thread.currentThread().getName());
try {
Thread.sleep(500);
} catch (InterruptedException e) {
Thread.currentThread().interrupt();
}
});
}
executor.shutdown();
}
public static void scheduledThreadPoolExample() {
// 创建定时任务线程池
ScheduledExecutorService scheduler = Executors.newScheduledThreadPool(3);
// 延迟执行
scheduler.schedule(() -> {
System.out.println("延迟5秒执行的任务");
}, 5, TimeUnit.SECONDS);
// 固定频率执行
scheduler.scheduleAtFixedRate(() -> {
System.out.println("固定频率任务,时间: " + new java.util.Date());
}, 1, 2, TimeUnit.SECONDS); // 初始延迟1秒,之后每2秒执行一次
// 固定延迟执行
scheduler.scheduleWithFixedDelay(() -> {
System.out.println("固定延迟任务开始");
try {
Thread.sleep(1000); // 模拟任务执行时间
} catch (InterruptedException e) {
Thread.currentThread().interrupt();
}
System.out.println("固定延迟任务结束");
}, 1, 3, TimeUnit.SECONDS); // 每次任务结束后延迟3秒再执行下一次
// 运行一段时间后关闭
try {
Thread.sleep(20000);
} catch (InterruptedException e) {
Thread.currentThread().interrupt();
}
scheduler.shutdown();
}
public static void main(String[] args) {
System.out.println("=== 固定大小线程池示例 ===");
fixedThreadPoolExample();
System.out.println("=== 缓存线程池示例 ===");
cachedThreadPoolExample();
System.out.println("=== 定时任务线程池示例 ===");
scheduledThreadPoolExample();
}
}
2. 自定义线程池
import java.util.concurrent.BlockingQueue;
import java.util.concurrent.LinkedBlockingQueue;
import java.util.concurrent.ThreadPoolExecutor;
import java.util.concurrent.TimeUnit;
public class CustomThreadPool {
public static void main(String[] args) {
// 创建有界队列
BlockingQueue workQueue = new LinkedBlockingQueue(100);
// 自定义线程池
ThreadPoolExecutor executor = new ThreadPoolExecutor(
5, // 核心线程数
10, // 最大线程数
60L, TimeUnit.SECONDS, // 空闲线程存活时间
workQueue, // 工作队列
new ThreadPoolExecutor.CallerRunsPolicy() // 拒绝策略
);
// 允许核心线程超时
executor.allowCoreThreadTimeOut(true);
// 监控线程池状态
monitorThreadPool(executor);
// 提交任务
for (int i = 0; i {
System.out.println("执行任务 " + taskId +
", 活跃线程: " + executor.getActiveCount() +
", 队列大小: " + executor.getQueue().size());
try {
Thread.sleep(2000);
} catch (InterruptedException e) {
Thread.currentThread().interrupt();
}
});
}
// 优雅关闭
executor.shutdown();
}
private static void monitorThreadPool(ThreadPoolExecutor executor) {
Thread monitorThread = new Thread(() -> {
while (!executor.isTerminated()) {
System.out.println("监控 - 线程池状态: " +
"核心大小: " + executor.getCorePoolSize() +
", 活跃线程: " + executor.getActiveCount() +
", 最大大小: " + executor.getMaximumPoolSize() +
", 队列大小: " + executor.getQueue().size());
try {
Thread.sleep(3000);
} catch (InterruptedException e) {
Thread.currentThread().interrupt();
break;
}
}
});
monitorThread.setDaemon(true);
monitorThread.start();
}
}
并发集合类的使用
Java并发包提供了多种线程安全的集合类,比使用同步包装器更高效。
1. ConcurrentHashMap
import java.util.concurrent.ConcurrentHashMap;
import java.util.concurrent.ConcurrentMap;
public class ConcurrentHashMapExample {
public static void main(String[] args) {
ConcurrentMap map = new ConcurrentHashMap();
// 多个线程同时操作
Thread writer1 = new Thread(() -> {
for (int i = 0; i {
for (int i = 0; i {
while (!Thread.currentThread().isInterrupted()) {
map.forEach((key, value) -> {
System.out.println(key + " = " + value);
});
try {
Thread.sleep(1000);
} catch (InterruptedException e) {
Thread.currentThread().interrupt();
}
}
});
writer1.start();
writer2.start();
reader.start();
try {
writer1.join();
writer2.join();
Thread.sleep(2000);
reader.interrupt();
reader.join();
} catch (InterruptedException e) {
Thread.currentThread().interrupt();
}
System.out.println("最终映射大小: " + map.size());
}
}
2. BlockingQueue实现生产者-消费者模式
import java.util.concurrent.BlockingQueue;
import java.util.concurrent.LinkedBlockingQueue;
public class ProducerConsumerExample {
public static void main(String[] args) {
BlockingQueue queue = new LinkedBlockingQueue(10);
// 生产者
Runnable producer = () -> {
try {
for (int i = 0; i {
try {
while (true) {
String item = queue.take(); // 如果队列空则阻塞
System.out.println("消费: " + item + ",队列大小: " + queue.size());
Thread.sleep(200);
if (queue.isEmpty() && Thread.currentThread().isInterrupted()) {
break;
}
}
} catch (InterruptedException e) {
Thread.currentThread().interrupt();
}
};
Thread producerThread = new Thread(producer, "Producer");
Thread consumerThread = new Thread(consumer, "Consumer");
producerThread.start();
consumerThread.start();
try {
producerThread.join();
Thread.sleep(1000); // 给消费者时间处理剩余项目
consumerThread.interrupt();
consumerThread.join();
} catch (InterruptedException e) {
Thread.currentThread().interrupt();
}
System.out.println("生产消费模式完成");
}
}
实战案例:多线程文件处理系统
下面是一个完整的实战案例,演示如何使用线程池和并发集合处理大量文件。
import java.io.IOException;
import java.nio.file.Files;
import java.nio.file.Path;
import java.nio.file.Paths;
import java.util.List;
import java.util.concurrent.*;
import java.util.stream.Collectors;
public class ConcurrentFileProcessor {
private final ExecutorService executor;
private final ConcurrentMap wordCounts = new ConcurrentHashMap();
public ConcurrentFileProcessor(int threadPoolSize) {
this.executor = Executors.newFixedThreadPool(threadPoolSize);
}
public void processDirectory(String directoryPath) throws IOException, InterruptedException {
// 获取目录下所有文本文件
List files = Files.walk(Paths.get(directoryPath))
.filter(Files::isRegularFile)
.filter(p -> p.toString().endsWith(".txt"))
.collect(Collectors.toList());
// 创建CompletionService用于获取完成的任务
CompletionService<ConcurrentMap> completionService =
new ExecutorCompletionService(executor);
// 提交所有文件处理任务
for (Path file : files) {
completionService.submit(() -> processFile(file));
}
// 等待所有任务完成并合并结果
for (int i = 0; i < files.size(); i++) {
try {
Future<ConcurrentMap> future = completionService.take();
ConcurrentMap fileCounts = future.get();
// 合并词频统计
fileCounts.forEach((word, count) ->
wordCounts.merge(word, count, Long::sum)
);
} catch (ExecutionException e) {
System.err.println("文件处理失败: " + e.getCause().getMessage());
}
}
// 输出最常见的10个词
wordCounts.entrySet().stream()
.sorted((e1, e2) -> Long.compare(e2.getValue(), e1.getValue()))
.limit(10)
.forEach(entry ->
System.out.println(entry.getKey() + ": " + entry.getValue())
);
}
private ConcurrentMap processFile(Path file) throws IOException {
System.out.println("处理文件: " + file + ",线程: " + Thread.currentThread().getName());
ConcurrentMap localCounts = new ConcurrentHashMap();
// 读取文件并统计词频
Files.lines(file)
.flatMap(line -> List.of(line.split("\W+")).stream())
.filter(word -> !word.isEmpty())
.map(String::toLowerCase)
.forEach(word ->
localCounts.merge(word, 1L, Long::sum)
);
return localCounts;
}
public void shutdown() {
executor.shutdown();
try {
if (!executor.awaitTermination(60, TimeUnit.SECONDS)) {
executor.shutdownNow();
}
} catch (InterruptedException e) {
executor.shutdownNow();
Thread.currentThread().interrupt();
}
}
public static void main(String[] args) {
ConcurrentFileProcessor processor = new ConcurrentFileProcessor(8);
try {
long startTime = System.currentTimeMillis();
processor.processDirectory("./documents"); // 替换为实际目录
long endTime = System.currentTimeMillis();
System.out.println("处理完成,耗时: " + (endTime - startTime) + "ms");
System.out.println("总共统计单词: " + processor.wordCounts.size());
} catch (IOException | InterruptedException e) {
System.err.println("处理失败: " + e.getMessage());
} finally {
processor.shutdown();
}
}
}
总结与最佳实践
通过本教程,我们深入探讨了Java并发编程的核心概念和技术。以下是关键要点和最佳实践:
并发编程最佳实践
- 优先使用高级并发工具:如Executor框架、并发集合类,而不是手动管理线程
- 明确线程边界:清晰定义哪些数据由哪些线程访问,减少共享状态
- 使用不可变对象:不可变对象天生线程安全,可以自由共享
- 谨慎使用同步:同步范围应尽可能小,避免在同步块中执行耗时操作
- 处理中断 properly:正确响应中断请求,确保线程能够优雅终止
性能考虑
- 根据任务类型选择合适的线程池(CPU密集型 vs I/O密集型)
- 避免过度创建线程,线程创建和上下文切换都有开销
- 使用线程池监控工具(如JMX)来调优线程池参数
- 考虑使用Fork/Join框架处理可分治的递归任务
调试与测试
- 使用线程转储(thread dump)分析死锁和性能问题
- 编写可测试的并发代码,尽量减少同步复杂性
- 使用专门的并发测试工具(如JCStress)测试线程安全性
并发编程是Java开发中的高级主题,需要深入理解和大量实践。通过掌握这些技术,你将能够构建出高性能、高并发的Java应用程序,充分利用现代多核处理器的计算能力。