生产者-消费者问题是多线程模型中的经典问题

生产者和消费者在同一时间段内共用同一个存储空间,生产者往存储空间中添加产品,消费者从存储空间中取走产品

当存储空间为空时,消费者阻塞,当存储空间满时,生产者阻塞

文章参考: https://juejin.cn/post/6844903486895865864

方法一 wait() notify() 方法实现

最基础的实现

缓冲区满时 Producer 调用 wait() 方法进入等待,直到 Consumer 消费一个产品

缓冲区空时 Consumer 调用 wait() 方法进入等待,直到 Producer 生产一个产品

/**
 * wait() notify() 方法实现
 * 这是生产者-消费者模式最简单的实现方法
 */
public class Main {
    public static final String LOCK = "lock";
    public static int count = 0;
    
    public static void main(String[] args) {
        new Thread(new Producer()).start();
        new Thread(new Consumer()).start();

        new Thread(new Producer()).start();
        new Thread(new Consumer()).start();

        new Thread(new Producer()).start();
        new Thread(new Consumer()).start();
    }
}
public class Producer implements Runnable{
    @Override
    public void run() {
        while (true){
            try {
                // sleep() 是防止一个线程占用过多 cpu 时间片
                // 否则一个生产者会连续生产多个资源,不利于观察结果
                Thread.sleep(1000);
                
                // wait() notify() 是基于锁的, 所以要进行 synchronized 同步
                synchronized (LOCK){
                    //满了就等待
                    while (count == 10){
                        // 调用 wait() 会强制当前线程等待
                        // 直到其他线程调用notifyAll()或同一对象上的notify()
                        LOCK.wait();
                    }

                    count++;
                    System.out.println(Thread.currentThread().getName() +
                            "生产者生产,共有 " +
                            count);

                    // notify() 会通知唤醒一个对象的随机某个正在等待的线程
                    // notifyAll() 会通知唤醒一个对象上的所有线程
                    LOCK.notifyAll();
                }
            } catch (Exception e) {
                e.printStackTrace();
            }
        }
    }
}
public class Consumer implements Runnable{
    @Override
    public void run() {
        while (true){
            try {
                Thread.sleep(1000);
                
                synchronized (LOCK){
                    //空了就等待
                    while (count == 0){
                        LOCK.wait();
                    }
                    
                    count--;
                    System.out.println(Thread.currentThread().getName() +
                            "消费者消费,共有 " +
                            count);

                    LOCK.notifyAll();
                }
            } catch (Exception e) {
                e.printStackTrace();
            }
        }
    }
}

方法二 ReentrantLock 可重入锁实现

ReentrantLock 位于 java.util.concurrent.locks 包下

通过 lock()unlock() 方法实现对锁的控制

可重入锁也称为递归锁,意为同一线程中,外层获得锁,内层仍然可以获取该锁

该锁维护一个计数器,如果拥有锁的线程再次请求锁,则计数器+1,释放锁-1,直到为0才完全释放

/**
 * ReentrantLock 可重入锁实现
 */
public class Main {
    public static int count = 0;
    // 可重入锁能设置为公平锁或非公平锁 
    // 默认为非公平锁
    public static Lock reentrantLock = new ReentrantLock(false);
    // 通过可重入锁创建两个条件变量(因为 Condition 对象需要绑定一个锁)
    // 一个非满 一个非空
    public static final Condition notFull = reentrantLock.newCondition();
    public static final Condition notEmpty = reentrantLock.newCondition();
    
    public static void main(String[] args) {
        new Thread(new Producer()).start();
        new Thread(new Consumer()).start();

        new Thread(new Producer()).start();
        new Thread(new Consumer()).start();

        new Thread(new Producer()).start();
        new Thread(new Consumer()).start();
    }
}
public class Producer implements Runnable{
    @Override
    public void run() {
        while (true){
            try {
                Thread.sleep(500);
                
                reentrantLock.lock();
                while (count == 10){
                    notFull.await();
                }
                count++;
                
                System.out.println(Thread.currentThread().getName() +
                        "生产者生产,共有" +
                        count);
                
                notEmpty.signal();
                
            } catch (Exception e) {
                e.printStackTrace();
            } finally {
                reentrantLock.unlock();
            }
        }
    }
}
public class Consumer implements Runnable{
    @Override
    public void run() {
        while (true){
            try {
                Thread.sleep(1000);
                
                reentrantLock.lock();
                while (count == 0){
                    notEmpty.await();
                }
                count--;

                System.out.println(Thread.currentThread().getName() +
                        "消费者消费,共有" +
                        count);
                
                notFull.signal();
            } catch (Exception e) {
                e.printStackTrace();
            } finally {
                reentrantLock.unlock();
            }
        }
    }
}

方法三 BlockingQueue 阻塞队列实现

阻塞队列正如其名,某些情况下访问它可能会造成阻塞:

队列满时又入队

队列空时又出队

因此,阻塞队列无需同步就是线程安全的

BlockingQueue 常用方法

ThrowsException Blocks SpecialValue TimesOut
插入 add(o) put(o) offer(o) offer(o, timeout, timeunit)
删除 remove(o) take(o) poll(o) poll(timeout, timeunit)
检查 element(o) peek(o)
  • ThrowsException:如果操作不能马上进行,则抛出异常
  • Blocks : 如果操作不能马上进行,操作会被阻塞
  • SpecialValue:如果操作不能马上进行,将会返回一个特殊的值,一般是true或者false
  • TimesOut : 如果操作不能马上进行,操作会被阻塞指定的时间,如果指定时间没执行,则返回一个特殊值,一般是true或者false

这里我们使用 Blocks 下的 put() take() 方法来实现

/**
 * BlockingQueue 阻塞队列实现
 */
public class Main {
    public static int count = 0;
    
    public static final BlockingQueue blockingQueue = new ArrayBlockingQueue(10);
    public static String food = "food";
    
    public static void main(String[] args) {
        new Thread(new Producer()).start();
        new Thread(new Consumer()).start();

        new Thread(new Producer()).start();
        new Thread(new Consumer()).start();

        new Thread(new Producer()).start();
        new Thread(new Consumer()).start();
    }
}
public class Producer implements Runnable{
    @Override
    public void run() {
        while (true){
            try {
                Thread.sleep(1000);

                blockingQueue.put(food);
                count++;

                System.out.println(Thread.currentThread().getName() +
                        "生产者生产,共有" +
                        count);
            } catch (Exception e) {
                e.printStackTrace();
            }
        }
    }
}
public class Consumer implements Runnable{
    @Override
    public void run() {
        while (true){
            try {
                Thread.sleep(1000);

                blockingQueue.take();
                count--;

                System.out.println(Thread.currentThread().getName() +
                        "消费者消费,共有" +
                        count);
            } catch (Exception e) {
                e.printStackTrace();
            }
        }
    }
}

方法四 Semaphore 信号量实现

Semaphore 信号量用于控制同时访问某一资源的线程数, 即协调各线程

Semaphore 维护了一个许可集

acquire() 取得一个许可,当许可不足时阻塞

release() 释放一个许可

/**
 * Semaphore 信号量实现
 */
public class Main {
    public static int count = 0;
    
    public final Semaphore notFull = new Semaphore(10);
    public final Semaphore notEmpty = new Semaphore(0);
    // mutex互斥量用于生产者消费者间的同步,保证交替进行
//    public final Semaphore mutex = new Semaphore(1);
    
    public static void main(String[] args) {
        Main main = new Main();

        new Thread(main.new Producer()).start();
        new Thread(main.new Consumer()).start();

        new Thread(main.new Producer()).start();
        new Thread(main.new Consumer()).start();

        new Thread(main.new Producer()).start();
        new Thread(main.new Consumer()).start();
    }
    
    class Producer implements Runnable{
        @Override
        public void run() {
            while (true){
                try {
                    Thread.sleep(1000);
                    
                    notFull.acquire();
//                    mutex.acquire();
                    
                    count++;

                    System.out.println(Thread.currentThread().getName() +
                            "生产者生产,共有" +
                            count);
                } catch (Exception e) {
                    e.printStackTrace();
                } finally {
                    notEmpty.release();
//                    mutex.release();
                }
            }
        }
    }
    
    class Consumer implements Runnable{
        @Override
        public void run() {
            while (true){
                try {
                    Thread.sleep(1000);
                    
                    notEmpty.acquire();
//                    mutex.acquire();
                    
                    count--;

                    System.out.println(Thread.currentThread().getName() +
                            "消费者消费,共有" +
                            count);
                } catch (Exception e) {
                    e.printStackTrace();
                } finally {
                    notFull.release();
//                    mutex.release();
                }
            }
        }
    }
}

方法五 PipedInputStream 和 PipedOutputStream 管道输入输出流实现

PipedInputStream PipedOutputStream 位于 java.io 包下, 用于多线程通过管道进行通讯

使用时必须配套使用, 先创建管道输入流和输出流, 然后将其进行连接, 生产者往输出流写数据, 消费者往输入流读数据

注意:

计算机语言中,input 和output参照的对象是程序

比如 PipedInputStream,可以理解为程序读取管道数据

PipedOutStream 理解为程序写入数据到管道

而不是以 Pipe 管道作为参照物

所以 InputStream 流一般是跟一个 read() 的方法,OutputStream 跟 write() 方法

如图所示

img

/**
 * PipedInputStream PipedOutputStream 管道输入输出流实现
 */
public class Main {
    public static final PipedInputStream in = new PipedInputStream();
    public static final PipedOutputStream out = new PipedOutputStream();

    public static void main(String[] args) {
        try {
            in.connect(out);
        } catch (Exception e) {
            e.printStackTrace();
        }

        Main main = new Main();

        new Thread(main.new Producer()).start();
        new Thread(main.new Consumer()).start();

        new Thread(main.new Producer()).start();
        new Thread(main.new Consumer()).start();

        new Thread(main.new Producer()).start();
        new Thread(main.new Consumer()).start();
    }
    
    class Producer implements Runnable{
        @Override
        public void run() {
            try {
                while (true){
                    Thread.sleep(1000);

                    int num = new Random().nextInt();

                    System.out.println(Thread.currentThread().getName() +
                            "生产者生产数字:" +
                            num);

                    out.write(num);
                    out.flush();
                }
            } catch (Exception e) {
                e.printStackTrace();
            } finally {
                try {
                    in.close();
                    out.close();
                } catch (IOException e) {
                    e.printStackTrace();
                }
            }
        }
    }
    
    class Consumer implements Runnable{
        @Override
        public void run() {
            try {
                while (true){
                    Thread.sleep(1000);

                    int num = in.read();

                    System.out.println(Thread.currentThread().getName() +
                            "消费者消费一个数字:" +
                            num);
                }
            } catch (Exception e) {
                e.printStackTrace();
            } finally {
                try {
                    in.close();
                    out.close();
                } catch (IOException e) {
                    e.printStackTrace();
                }
            }
        }
    }
}