生产者-消费者问题是多线程模型中的经典问题
生产者和消费者在同一时间段内共用同一个存储空间,生产者往存储空间中添加产品,消费者从存储空间中取走产品
当存储空间为空时,消费者阻塞,当存储空间满时,生产者阻塞
文章参考: https://juejin.cn/post/6844903486895865864
方法一 wait() notify() 方法实现
最基础的实现
缓冲区满时 Producer 调用 wait()
方法进入等待,直到 Consumer 消费一个产品
缓冲区空时 Consumer 调用 wait()
方法进入等待,直到 Producer 生产一个产品
/**
* wait() notify() 方法实现
* 这是生产者-消费者模式最简单的实现方法
*/
public class Main {
public static final String LOCK = "lock";
public static int count = 0;
public static void main(String[] args) {
new Thread(new Producer()).start();
new Thread(new Consumer()).start();
new Thread(new Producer()).start();
new Thread(new Consumer()).start();
new Thread(new Producer()).start();
new Thread(new Consumer()).start();
}
}
public class Producer implements Runnable{
@Override
public void run() {
while (true){
try {
// sleep() 是防止一个线程占用过多 cpu 时间片
// 否则一个生产者会连续生产多个资源,不利于观察结果
Thread.sleep(1000);
// wait() notify() 是基于锁的, 所以要进行 synchronized 同步
synchronized (LOCK){
//满了就等待
while (count == 10){
// 调用 wait() 会强制当前线程等待
// 直到其他线程调用notifyAll()或同一对象上的notify()
LOCK.wait();
}
count++;
System.out.println(Thread.currentThread().getName() +
"生产者生产,共有 " +
count);
// notify() 会通知唤醒一个对象的随机某个正在等待的线程
// notifyAll() 会通知唤醒一个对象上的所有线程
LOCK.notifyAll();
}
} catch (Exception e) {
e.printStackTrace();
}
}
}
}
public class Consumer implements Runnable{
@Override
public void run() {
while (true){
try {
Thread.sleep(1000);
synchronized (LOCK){
//空了就等待
while (count == 0){
LOCK.wait();
}
count--;
System.out.println(Thread.currentThread().getName() +
"消费者消费,共有 " +
count);
LOCK.notifyAll();
}
} catch (Exception e) {
e.printStackTrace();
}
}
}
}
方法二 ReentrantLock 可重入锁实现
ReentrantLock 位于 java.util.concurrent.locks 包下
通过 lock()
与 unlock()
方法实现对锁的控制
可重入锁也称为递归锁,意为同一线程中,外层获得锁,内层仍然可以获取该锁
该锁维护一个计数器,如果拥有锁的线程再次请求锁,则计数器+1,释放锁-1,直到为0才完全释放
/**
* ReentrantLock 可重入锁实现
*/
public class Main {
public static int count = 0;
// 可重入锁能设置为公平锁或非公平锁
// 默认为非公平锁
public static Lock reentrantLock = new ReentrantLock(false);
// 通过可重入锁创建两个条件变量(因为 Condition 对象需要绑定一个锁)
// 一个非满 一个非空
public static final Condition notFull = reentrantLock.newCondition();
public static final Condition notEmpty = reentrantLock.newCondition();
public static void main(String[] args) {
new Thread(new Producer()).start();
new Thread(new Consumer()).start();
new Thread(new Producer()).start();
new Thread(new Consumer()).start();
new Thread(new Producer()).start();
new Thread(new Consumer()).start();
}
}
public class Producer implements Runnable{
@Override
public void run() {
while (true){
try {
Thread.sleep(500);
reentrantLock.lock();
while (count == 10){
notFull.await();
}
count++;
System.out.println(Thread.currentThread().getName() +
"生产者生产,共有" +
count);
notEmpty.signal();
} catch (Exception e) {
e.printStackTrace();
} finally {
reentrantLock.unlock();
}
}
}
}
public class Consumer implements Runnable{
@Override
public void run() {
while (true){
try {
Thread.sleep(1000);
reentrantLock.lock();
while (count == 0){
notEmpty.await();
}
count--;
System.out.println(Thread.currentThread().getName() +
"消费者消费,共有" +
count);
notFull.signal();
} catch (Exception e) {
e.printStackTrace();
} finally {
reentrantLock.unlock();
}
}
}
}
方法三 BlockingQueue 阻塞队列实现
阻塞队列正如其名,某些情况下访问它可能会造成阻塞:
队列满时又入队
队列空时又出队
因此,阻塞队列无需同步就是线程安全的
BlockingQueue 常用方法
ThrowsException | Blocks | SpecialValue | TimesOut | |
---|---|---|---|---|
插入 | add(o) | put(o) | offer(o) | offer(o, timeout, timeunit) |
删除 | remove(o) | take(o) | poll(o) | poll(timeout, timeunit) |
检查 | element(o) | peek(o) |
- ThrowsException:如果操作不能马上进行,则抛出异常
- Blocks : 如果操作不能马上进行,操作会被阻塞
- SpecialValue:如果操作不能马上进行,将会返回一个特殊的值,一般是true或者false
- TimesOut : 如果操作不能马上进行,操作会被阻塞指定的时间,如果指定时间没执行,则返回一个特殊值,一般是true或者false
这里我们使用 Blocks 下的 put()
take()
方法来实现
/**
* BlockingQueue 阻塞队列实现
*/
public class Main {
public static int count = 0;
public static final BlockingQueue blockingQueue = new ArrayBlockingQueue(10);
public static String food = "food";
public static void main(String[] args) {
new Thread(new Producer()).start();
new Thread(new Consumer()).start();
new Thread(new Producer()).start();
new Thread(new Consumer()).start();
new Thread(new Producer()).start();
new Thread(new Consumer()).start();
}
}
public class Producer implements Runnable{
@Override
public void run() {
while (true){
try {
Thread.sleep(1000);
blockingQueue.put(food);
count++;
System.out.println(Thread.currentThread().getName() +
"生产者生产,共有" +
count);
} catch (Exception e) {
e.printStackTrace();
}
}
}
}
public class Consumer implements Runnable{
@Override
public void run() {
while (true){
try {
Thread.sleep(1000);
blockingQueue.take();
count--;
System.out.println(Thread.currentThread().getName() +
"消费者消费,共有" +
count);
} catch (Exception e) {
e.printStackTrace();
}
}
}
}
方法四 Semaphore 信号量实现
Semaphore 信号量用于控制同时访问某一资源的线程数, 即协调各线程
Semaphore 维护了一个许可集
acquire()
取得一个许可,当许可不足时阻塞
release()
释放一个许可
/**
* Semaphore 信号量实现
*/
public class Main {
public static int count = 0;
public final Semaphore notFull = new Semaphore(10);
public final Semaphore notEmpty = new Semaphore(0);
// mutex互斥量用于生产者消费者间的同步,保证交替进行
// public final Semaphore mutex = new Semaphore(1);
public static void main(String[] args) {
Main main = new Main();
new Thread(main.new Producer()).start();
new Thread(main.new Consumer()).start();
new Thread(main.new Producer()).start();
new Thread(main.new Consumer()).start();
new Thread(main.new Producer()).start();
new Thread(main.new Consumer()).start();
}
class Producer implements Runnable{
@Override
public void run() {
while (true){
try {
Thread.sleep(1000);
notFull.acquire();
// mutex.acquire();
count++;
System.out.println(Thread.currentThread().getName() +
"生产者生产,共有" +
count);
} catch (Exception e) {
e.printStackTrace();
} finally {
notEmpty.release();
// mutex.release();
}
}
}
}
class Consumer implements Runnable{
@Override
public void run() {
while (true){
try {
Thread.sleep(1000);
notEmpty.acquire();
// mutex.acquire();
count--;
System.out.println(Thread.currentThread().getName() +
"消费者消费,共有" +
count);
} catch (Exception e) {
e.printStackTrace();
} finally {
notFull.release();
// mutex.release();
}
}
}
}
}
方法五 PipedInputStream 和 PipedOutputStream 管道输入输出流实现
PipedInputStream PipedOutputStream 位于 java.io 包下, 用于多线程通过管道进行通讯
使用时必须配套使用, 先创建管道输入流和输出流, 然后将其进行连接, 生产者往输出流写数据, 消费者往输入流读数据
注意:
计算机语言中,input 和output参照的对象是程序
比如 PipedInputStream,可以理解为程序读取管道数据
PipedOutStream 理解为程序写入数据到管道
而不是以 Pipe 管道作为参照物
所以 InputStream 流一般是跟一个
read()
的方法,OutputStream 跟write()
方法如图所示
/**
* PipedInputStream PipedOutputStream 管道输入输出流实现
*/
public class Main {
public static final PipedInputStream in = new PipedInputStream();
public static final PipedOutputStream out = new PipedOutputStream();
public static void main(String[] args) {
try {
in.connect(out);
} catch (Exception e) {
e.printStackTrace();
}
Main main = new Main();
new Thread(main.new Producer()).start();
new Thread(main.new Consumer()).start();
new Thread(main.new Producer()).start();
new Thread(main.new Consumer()).start();
new Thread(main.new Producer()).start();
new Thread(main.new Consumer()).start();
}
class Producer implements Runnable{
@Override
public void run() {
try {
while (true){
Thread.sleep(1000);
int num = new Random().nextInt();
System.out.println(Thread.currentThread().getName() +
"生产者生产数字:" +
num);
out.write(num);
out.flush();
}
} catch (Exception e) {
e.printStackTrace();
} finally {
try {
in.close();
out.close();
} catch (IOException e) {
e.printStackTrace();
}
}
}
}
class Consumer implements Runnable{
@Override
public void run() {
try {
while (true){
Thread.sleep(1000);
int num = in.read();
System.out.println(Thread.currentThread().getName() +
"消费者消费一个数字:" +
num);
}
} catch (Exception e) {
e.printStackTrace();
} finally {
try {
in.close();
out.close();
} catch (IOException e) {
e.printStackTrace();
}
}
}
}
}
- Post link: http://example.com/2022/01/12/%E7%94%9F%E4%BA%A7%E8%80%85%E6%B6%88%E8%B4%B9%E8%80%85%E6%A8%A1%E5%BC%8F%E7%9A%84%E5%A4%9A%E7%BA%BF%E7%A8%8B%E5%AE%9E%E7%8E%B0%E6%96%B9%E6%B3%95/
- Copyright Notice: All articles in this blog are licensed under unless otherwise stated.