1. 生产者-消费者问题简介
生产者-消费者问题是多线程编程中的经典同步问题,描述的是两类线程(生产者和消费者)共享固定大小缓冲区时的协作问题。
生产者生成数据放入缓冲区,消费者从缓冲区取出数据消费。关键是要确保生产者不会在缓冲区满时添加数据,消费者不会在缓冲区空时消耗数据。
2. 使用BlockingQueue实现
Java的BlockingQueue
接口是解决生产者-消费者问题的理想选择,它提供了线程安全的队列操作,并内置了阻塞机制。
2.1 创建数据模型
// 数据项类
public class DataItem {
private final int id;
private final String data;
public DataItem(int id, String data) {
this.id = id;
this.data = data;
}
public int getId() { return id; }
public String getData() { return data; }
@Override
public String toString() {
return "DataItem{id=" + id + ", data='" + data + "'}";
}
}
2.2 实现生产者
import java.util.concurrent.BlockingQueue;
import java.util.concurrent.atomic.AtomicInteger;
public class Producer implements Runnable {
private final BlockingQueue queue;
private final AtomicInteger counter = new AtomicInteger(0);
private volatile boolean isRunning = true;
public Producer(BlockingQueue queue) {
this.queue = queue;
}
@Override
public void run() {
try {
while (isRunning) {
// 模拟数据生成耗时
Thread.sleep(100);
int id = counter.incrementAndGet();
DataItem item = new DataItem(id, "Data-" + id);
// 将数据放入队列,如果队列满则阻塞
queue.put(item);
System.out.println("生产者生成: " + item);
}
} catch (InterruptedException e) {
Thread.currentThread().interrupt();
}
}
public void stop() {
isRunning = false;
}
}
2.3 实现消费者
import java.util.concurrent.BlockingQueue;
public class Consumer implements Runnable {
private final BlockingQueue queue;
private volatile boolean isRunning = true;
public Consumer(BlockingQueue queue) {
this.queue = queue;
}
@Override
public void run() {
try {
while (isRunning) {
// 从队列取出数据,如果队列空则阻塞
DataItem item = queue.take();
// 模拟数据处理耗时
Thread.sleep(150);
System.out.println("消费者处理: " + item + " | 由线程: "
+ Thread.currentThread().getName());
}
} catch (InterruptedException e) {
Thread.currentThread().interrupt();
}
}
public void stop() {
isRunning = false;
}
}
2.4 创建主应用程序
import java.util.concurrent.BlockingQueue;
import java.util.concurrent.ExecutorService;
import java.util.concurrent.Executors;
import java.util.concurrent.LinkedBlockingQueue;
import java.util.concurrent.TimeUnit;
public class ProducerConsumerDemo {
public static void main(String[] args) throws InterruptedException {
// 创建大小为10的阻塞队列
BlockingQueue queue = new LinkedBlockingQueue(10);
// 创建线程池
ExecutorService executor = Executors.newCachedThreadPool();
// 创建生产者
Producer producer = new Producer(queue);
executor.execute(producer);
// 创建3个消费者
for (int i = 0; i < 3; i++) {
Consumer consumer = new Consumer(queue);
executor.execute(consumer);
}
// 运行10秒
Thread.sleep(10 * 1000);
// 停止生产者
producer.stop();
// 等待队列中的数据被消费完
while (!queue.isEmpty()) {
Thread.sleep(500);
}
// 停止消费者
executor.shutdownNow();
// 等待线程池终止
if (!executor.awaitTermination(5, TimeUnit.SECONDS)) {
System.err.println("仍有任务未结束");
}
System.out.println("程序结束");
}
}
3. 使用wait()和notify()手动实现
除了使用BlockingQueue
,我们也可以使用传统的wait()
和notify()
方法手动实现生产者-消费者模式。
3.1 实现自定义阻塞队列
import java.util.LinkedList;
import java.util.Queue;
public class CustomBlockingQueue {
private final Queue queue = new LinkedList();
private final int capacity;
public CustomBlockingQueue(int capacity) {
this.capacity = capacity;
}
public synchronized void put(T item) throws InterruptedException {
while (queue.size() == capacity) {
wait(); // 队列满,等待
}
queue.add(item);
notifyAll(); // 通知消费者有数据可用
}
public synchronized T take() throws InterruptedException {
while (queue.isEmpty()) {
wait(); // 队列空,等待
}
T item = queue.remove();
notifyAll(); // 通知生产者有空间可用
return item;
}
public synchronized int size() {
return queue.size();
}
public synchronized boolean isEmpty() {
return queue.isEmpty();
}
}
4. 性能优化和注意事项
4.1 选择合适的队列实现
- LinkedBlockingQueue: 无界或有界队列,适合生产消费速度大致匹配的场景
- ArrayBlockingQueue: 有界队列,固定大小,内存使用更高效
- SynchronousQueue: 不存储元素的队列,适合直接传递场景
- PriorityBlockingQueue: 支持优先级的无界队列
4.2 线程池配置策略
根据任务特性选择合适的线程池:
// CPU密集型任务
int cpuCores = Runtime.getRuntime().availableProcessors();
ExecutorService cpuBoundExecutor = Executors.newFixedThreadPool(cpuCores);
// IO密集型任务
ExecutorService ioBoundExecutor = Executors.newCachedThreadPool();
// 定时任务
ScheduledExecutorService scheduledExecutor = Executors.newScheduledThreadPool(4);
4.3 异常处理
在多线程环境中,恰当的异常处理至关重要:
// 使用UncaughtExceptionHandler处理未捕获异常
Thread.setDefaultUncaughtExceptionHandler((thread, throwable) -> {
System.err.println("线程 " + thread.getName() + " 发生异常: " + throwable.getMessage());
});
// 或者为每个线程单独设置
Thread producerThread = new Thread(producer);
producerThread.setUncaughtExceptionHandler((thread, throwable) -> {
System.err.println("生产者线程异常: " + throwable.getMessage());
});
5. 实际应用场景
5.1 日志处理系统
生产者:应用程序生成日志 → 消费者:日志处理服务(写入文件、发送到日志服务器等)
5.2 图像处理管道
生产者:图像上传 → 消费者1:图像压缩 → 消费者2:生成缩略图 → 消费者3:保存到存储系统
5.3 消息推送系统
生产者:接收推送请求 → 消费者:调用第三方推送服务(APNs、FCM等)
6. 总结
生产者-消费者模式是Java多线程编程中的重要模式,能够有效解耦生产者和消费者,平衡处理能力差异,提高系统整体吞吐量。
本文介绍了两种实现方式:使用Java内置的BlockingQueue
和手动使用wait()/notify()
实现。在实际开发中,推荐优先使用BlockingQueue
,因为它更简洁且不易出错。
正确实现生产者-消费者模式需要注意线程安全、死锁避免、资源管理和异常处理等方面,这些都是构建高并发、高性能Java应用的关键技能。
7. 扩展阅读
- Java并发包(java.util.concurrent)深入研究
- Disruptor高性能队列框架
- 响应式编程与Reactor模式
- Java内存模型与happens-before原则