一、生产者-消费者模式概述
生产者-消费者模式是并发编程中的经典设计模式,用于解决多线程环境下线程间协作和资源共享的问题。
该模式包含三个主要组件:
- 生产者:生成数据并放入共享缓冲区
- 消费者:从共享缓冲区取出并处理数据
- 缓冲区:协调生产者和消费者的工作节奏,解耦两者
二、实现方式比较
| 实现方式 | 优点 | 缺点 |
|---|---|---|
| wait()/notify() | 原生支持,无需额外依赖 | 代码复杂,容易出错 |
| BlockingQueue | 简单易用,线程安全 | 需要理解队列特性 |
| Lock和Condition | 灵活性高,功能强大 | 代码相对复杂 |
三、基于BlockingQueue的实现
Java的java.util.concurrent包提供了BlockingQueue接口,它是实现生产者-消费者模式的最佳选择。
1. 生产者线程实现
import java.util.concurrent.BlockingQueue;
import java.util.concurrent.atomic.AtomicInteger;
public class Producer implements Runnable {
private final BlockingQueue queue;
private final AtomicInteger counter = new AtomicInteger(0);
private final int maxItems;
private static final int DELAY = 100;
public Producer(BlockingQueue queue, int maxItems) {
this.queue = queue;
this.maxItems = maxItems;
}
@Override
public void run() {
try {
while (counter.get() < maxItems) {
int value = counter.incrementAndGet();
queue.put(value); // 阻塞直到有空间可用
System.out.println(Thread.currentThread().getName() + " 生产: " + value);
Thread.sleep(DELAY); // 模拟生产耗时
}
System.out.println(Thread.currentThread().getName() + " 完成生产");
} catch (InterruptedException e) {
Thread.currentThread().interrupt();
System.out.println(Thread.currentThread().getName() + " 被中断");
}
}
}
2. 消费者线程实现
import java.util.concurrent.BlockingQueue;
public class Consumer implements Runnable {
private final BlockingQueue queue;
private static final int DELAY = 150;
public Consumer(BlockingQueue queue) {
this.queue = queue;
}
@Override
public void run() {
try {
while (true) {
Integer value = queue.take(); // 阻塞直到有元素可用
System.out.println(Thread.currentThread().getName() + " 消费: " + value);
processValue(value);
Thread.sleep(DELAY); // 模拟消费耗时
}
} catch (InterruptedException e) {
Thread.currentThread().interrupt();
System.out.println(Thread.currentThread().getName() + " 被中断");
}
}
private void processValue(Integer value) {
// 模拟数据处理
System.out.println(Thread.currentThread().getName() + " 正在处理: " + value);
}
}
3. 主程序与测试
import java.util.concurrent.BlockingQueue;
import java.util.concurrent.ExecutorService;
import java.util.concurrent.Executors;
import java.util.concurrent.LinkedBlockingQueue;
import java.util.concurrent.TimeUnit;
public class ProducerConsumerDemo {
public static void main(String[] args) throws InterruptedException {
// 创建有界阻塞队列,容量为5
BlockingQueue queue = new LinkedBlockingQueue(5);
// 创建线程池
ExecutorService executor = Executors.newFixedThreadPool(4);
// 创建生产者
Producer producer1 = new Producer(queue, 10);
Producer producer2 = new Producer(queue, 10);
// 创建消费者
Consumer consumer1 = new Consumer(queue);
Consumer consumer2 = new Consumer(queue);
// 提交任务到线程池
executor.execute(producer1);
executor.execute(producer2);
executor.execute(consumer1);
executor.execute(consumer2);
// 运行30秒后关闭
Thread.sleep(30000);
// 优雅关闭线程池
executor.shutdownNow();
if (!executor.awaitTermination(5, TimeUnit.SECONDS)) {
System.err.println("线程池未正常关闭");
}
System.out.println("程序结束,队列剩余: " + queue.size() + " 个元素");
}
}
四、使用wait()和notify()的传统实现
虽然不推荐在现代Java代码中使用,但理解wait()/notify()机制对掌握线程同步原理很有帮助。
1. 传统方式的缓冲区实现
import java.util.LinkedList;
import java.util.Queue;
public class TraditionalBuffer {
private final Queue queue = new LinkedList();
private final int capacity;
public TraditionalBuffer(int capacity) {
this.capacity = capacity;
}
public synchronized void put(int value) throws InterruptedException {
while (queue.size() == capacity) {
wait(); // 缓冲区满,等待
}
queue.add(value);
System.out.println("生产: " + value + ",缓冲区大小: " + queue.size());
notifyAll(); // 通知消费者可以消费了
}
public synchronized int take() throws InterruptedException {
while (queue.isEmpty()) {
wait(); // 缓冲区空,等待
}
int value = queue.poll();
System.out.println("消费: " + value + ",缓冲区大小: " + queue.size());
notifyAll(); // 通知生产者可以生产了
return value;
}
public synchronized int size() {
return queue.size();
}
}
五、高级主题:使用Lock和Condition
Java 5引入了Lock和Condition接口,提供了更灵活的线程同步机制。
1. 使用Lock和Condition的缓冲区
import java.util.LinkedList;
import java.util.Queue;
import java.util.concurrent.locks.Condition;
import java.util.concurrent.locks.Lock;
import java.util.concurrent.locks.ReentrantLock;
public class AdvancedBuffer {
private final Queue queue = new LinkedList();
private final int capacity;
private final Lock lock = new ReentrantLock();
private final Condition notFull = lock.newCondition();
private final Condition notEmpty = lock.newCondition();
public AdvancedBuffer(int capacity) {
this.capacity = capacity;
}
public void put(int value) throws InterruptedException {
lock.lock();
try {
while (queue.size() == capacity) {
notFull.await(); // 缓冲区满,等待
}
queue.add(value);
System.out.println("生产: " + value + ",缓冲区大小: " + queue.size());
notEmpty.signal(); // 通知消费者可以消费了
} finally {
lock.unlock();
}
}
public int take() throws InterruptedException {
lock.lock();
try {
while (queue.isEmpty()) {
notEmpty.await(); // 缓冲区空,等待
}
int value = queue.poll();
System.out.println("消费: " + value + ",缓冲区大小: " + queue.size());
notFull.signal(); // 通知生产者可以生产了
return value;
} finally {
lock.unlock();
}
}
public int size() {
lock.lock();
try {
return queue.size();
} finally {
lock.unlock();
}
}
}
六、性能优化和最佳实践
- 选择合适的队列类型:
- ArrayBlockingQueue:固定大小,基于数组
- LinkedBlockingQueue:可选边界,基于链表
- PriorityBlockingQueue:支持优先级排序
- SynchronousQueue:不存储元素,直接传递
- 合理设置线程数量:根据CPU核心数和任务类型确定
- 正确处理中断:及时响应中断请求,确保线程能正常退出
- 使用线程池:避免频繁创建销毁线程的开销
- 监控和调试:使用JMX或自定义监控统计生产消费速度
七、常见问题与解决方案
| 问题 | 现象 | 解决方案 |
|---|---|---|
| 死锁 | 程序卡住,无响应 | 避免嵌套锁,按固定顺序获取锁 |
| 活锁 | 线程持续工作但无进展 | 引入随机退避机制 |
| 资源耗尽 | 内存溢出,系统变慢 | 使用有界队列,限制生产速度 |
| 生产者过快 | 队列积压,内存增长 | 使用背压机制,限制生产速率 |
| 消费者过快 | CPU空转,资源浪费 | 合理设置消费者等待策略 |
八、总结
生产者-消费者模式是Java多线程编程中最重要和最常用的模式之一。本文介绍了三种实现方式:
- 基于BlockingQueue的现代实现(推荐)
- 使用wait()/notify()的传统实现
- 使用Lock和Condition的高级实现
在实际开发中,应根据具体需求选择合适的实现方式。对于大多数场景,建议使用BlockingQueue,因为它提供了线程安全、高效且易于使用的API。
正确实现生产者-消费者模式能够提高程序性能、资源利用率和响应能力,是多线程开发必备的核心技能。

