引言
在多线程编程中,生产者-消费者问题是一个经典的同步问题,它描述了生产者和消费者之间如何协同工作。生产者生成数据到缓冲区,消费者从缓冲区取出数据消费。这个模式在实际应用中非常广泛,例如消息队列、任务调度等场景。
问题分析
生产者-消费者问题需要解决以下几个核心问题:
- 当缓冲区已满时,生产者必须等待
- 当缓冲区为空时,消费者必须等待
- 生产者和消费者不能同时访问缓冲区(互斥访问)
- 需要避免死锁和资源竞争
解决方案一:使用wait()和notify()方法
这是最基础的实现方式,通过Java内置的等待/通知机制实现线程间通信。
代码实现
import java.util.LinkedList;
import java.util.Queue;
public class ProducerConsumerWaitNotify {
private final Queue buffer = new LinkedList();
private final int maxSize;
public ProducerConsumerWaitNotify(int size) {
this.maxSize = size;
}
public void produce(int value) throws InterruptedException {
synchronized (buffer) {
while (buffer.size() == maxSize) {
System.out.println("缓冲区已满,生产者等待...");
buffer.wait();
}
buffer.add(value);
System.out.println("生产: " + value + ",缓冲区大小: " + buffer.size());
buffer.notifyAll();
}
}
public int consume() throws InterruptedException {
synchronized (buffer) {
while (buffer.isEmpty()) {
System.out.println("缓冲区为空,消费者等待...");
buffer.wait();
}
int value = buffer.poll();
System.out.println("消费: " + value + ",缓冲区大小: " + buffer.size());
buffer.notifyAll();
return value;
}
}
public static void main(String[] args) {
ProducerConsumerWaitNotify pc = new ProducerConsumerWaitNotify(5);
// 生产者线程
Thread producer = new Thread(() -> {
for (int i = 0; i {
for (int i = 0; i < 10; i++) {
try {
pc.consume();
Thread.sleep(200);
} catch (InterruptedException e) {
e.printStackTrace();
}
}
});
producer.start();
consumer.start();
}
}
代码解析
这个实现使用了synchronized关键字和wait()/notifyAll()方法:
- synchronized确保了对缓冲区的互斥访问
- 当缓冲区满时,生产者调用wait()进入等待状态
- 当缓冲区空时,消费者调用wait()进入等待状态
- 当有数据被生产或消费后,调用notifyAll()唤醒等待的线程
- 使用while循环而不是if语句检查条件,防止虚假唤醒
解决方案二:使用BlockingQueue实现
Java并发包中的BlockingQueue提供了线程安全的队列操作,可以大大简化生产者-消费者模式的实现。
代码实现
import java.util.concurrent.ArrayBlockingQueue;
import java.util.concurrent.BlockingQueue;
public class ProducerConsumerBlockingQueue {
private final BlockingQueue queue;
public ProducerConsumerBlockingQueue(int capacity) {
this.queue = new ArrayBlockingQueue(capacity);
}
public void produce(int value) throws InterruptedException {
queue.put(value);
System.out.println("生产: " + value + ",队列大小: " + queue.size());
}
public int consume() throws InterruptedException {
int value = queue.take();
System.out.println("消费: " + value + ",队列大小: " + queue.size());
return value;
}
public static void main(String[] args) {
ProducerConsumerBlockingQueue pc = new ProducerConsumerBlockingQueue(5);
// 生产者线程
Thread producer = new Thread(() -> {
for (int i = 0; i {
for (int i = 0; i < 10; i++) {
try {
pc.consume();
Thread.sleep(200);
} catch (InterruptedException e) {
e.printStackTrace();
}
}
});
producer.start();
consumer.start();
}
}
代码解析
使用BlockingQueue的实现更加简洁:
- BlockingQueue内部已经处理了线程同步问题
- put()方法在队列满时会自动阻塞生产者线程
- take()方法在队列空时会自动阻塞消费者线程
- 不需要手动处理wait()和notify()逻辑
解决方案三:使用Semaphore实现
信号量(Semaphore)是另一种同步机制,可以用于控制对共享资源的访问。
代码实现
import java.util.LinkedList;
import java.util.Queue;
import java.util.concurrent.Semaphore;
public class ProducerConsumerSemaphore {
private final Queue buffer = new LinkedList();
private final int maxSize;
private final Semaphore producerSemaphore;
private final Semaphore consumerSemaphore;
private final Semaphore mutex;
public ProducerConsumerSemaphore(int size) {
this.maxSize = size;
this.producerSemaphore = new Semaphore(size);
this.consumerSemaphore = new Semaphore(0);
this.mutex = new Semaphore(1);
}
public void produce(int value) throws InterruptedException {
producerSemaphore.acquire(); // 获取生产许可
mutex.acquire(); // 获取互斥锁
buffer.add(value);
System.out.println("生产: " + value + ",缓冲区大小: " + buffer.size());
mutex.release(); // 释放互斥锁
consumerSemaphore.release(); // 增加消费许可
}
public int consume() throws InterruptedException {
consumerSemaphore.acquire(); // 获取消费许可
mutex.acquire(); // 获取互斥锁
int value = buffer.poll();
System.out.println("消费: " + value + ",缓冲区大小: " + buffer.size());
mutex.release(); // 释放互斥锁
producerSemaphore.release(); // 增加生产许可
return value;
}
public static void main(String[] args) {
ProducerConsumerSemaphore pc = new ProducerConsumerSemaphore(5);
// 生产者线程
Thread producer = new Thread(() -> {
for (int i = 0; i {
for (int i = 0; i < 10; i++) {
try {
pc.consume();
Thread.sleep(200);
} catch (InterruptedException e) {
e.printStackTrace();
}
}
});
producer.start();
consumer.start();
}
}
代码解析
使用信号量的实现方式:
- producerSemaphore初始化为缓冲区大小,表示可以生产的数量
- consumerSemaphore初始化为0,表示可以消费的数量
- mutex用于保证对缓冲区的互斥访问
- 生产前获取生产许可,生产后增加消费许可
- 消费前获取消费许可,消费后增加生产许可
性能比较与选择建议
三种实现方式各有优缺点:
| 实现方式 | 优点 | 缺点 | 适用场景 |
|---|---|---|---|
| wait()/notify() | Java内置支持,无需额外依赖 | 代码相对复杂,容易出错 | 简单的同步需求,学习理解多线程基础 |
| BlockingQueue | 代码简洁,线程安全由JDK保证 | 灵活性较低 | 大多数生产环境,快速开发 |
| Semaphore | 灵活性高,可以实现复杂同步模式 | 代码相对复杂,需要理解信号量机制 | 复杂的同步需求,需要精细控制 |
总结
生产者-消费者模式是多线程编程中的经典问题,Java提供了多种实现方式。在实际开发中,应根据具体需求选择合适的实现方式:
- 对于简单场景,可以使用wait()/notify()方式
- 对于大多数生产环境,推荐使用BlockingQueue
- 对于需要精细控制的复杂场景,可以考虑使用Semaphore
无论选择哪种方式,都需要注意线程安全和性能问题,避免死锁和资源竞争。
扩展思考
在实际应用中,生产者-消费者模式还可以进一步扩展:
- 多生产者和多消费者场景
- 优先级队列的实现
- 延迟队列的处理
- 线程池与生产者-消费者模式的结合
这些扩展场景可以根据具体业务需求进行设计和实现。

