生产者-消费者模式是一个十分经典的多线程并发协作的模式,弄懂生产者-消费者问题能够让我们对并发编程的理解加深。所谓生产者-消费者问题,实际上主要是包含了两类线程,一种是生产者线程用于生产数据,另一种是消费者线程用于消费数据,为了解耦生产者和消费者的关系,通常会采用共享的数据区域,就像是一个仓库,生产者生产数据之后直接放置在共享数据区中,并不需要关心消费者的行为;而消费者只需要从共享数据区中去获取数据,就不再需要关心生产者的行为。但是,这个共享数据区域中应该具备这样的线程间并发协作的功能:
- 如果共享数据区已满的话,阻塞生产者继续生产数据放置入内;
- 如果共享数据区为空的话,阻塞消费者继续消费数据;
在实现生产者消费者问题时,可以采用三种方式:
1.使用 Object 的 wait/notify 的消息通知机制;
2.使用 Lock 的 Condition 的 await/signal 的消息通知机制;
3.使用 BlockingQueue 实现。本文主要将这三种实现方式进行总结归纳。
1. wait/notify 机制
1 | package 多线程.Producer_Consumer; |
2. await/signal 机制
这个我在之前有写过。不过其实我觉得,producer 和 consumer 应该两把锁会更加合适一些,生产者和消费者不应该有资源冲突。而这刚好就是 ArrayBlockingQueue 和 LinkedBlockingQueue 的区别,前者是生产者和消费者共一把独占锁,但是后者是分开,两把锁,这样并发的效率也就越高,线程等待的机会会更少。
1 | package 多线程.Producer_Consumer; |
上面的代码写的有点问题,为了保证思考的过程呈现,我就不删除了:
首先是主程序 main 中创建线程写错了…这样只会创建两个线程…低级失误;
在 await 时,用 if 会报错,但是用 while 没问题。
然后写完之后我发现输出很奇怪,并且又产生了新的疑问:
- 为何每次都是 producer 和 consumer 连着一起?
- 为何 await 那块用 if 会报错,但是用 while 就可以?
- 按道理来说,consumer 和 producer 不应该有资源竞争关系,所以合理来说应该是两把锁,如果 consumer 和 producer 分两把锁来操作又如何完成呢?
为了更好地解决以上几个疑问,我简化了一下例子,将 producer 的线程降为 1 个,将 consumer 的线程 降为 3 个。
Q1:为何每次都是 producer 和 consumer 连着一起?
经过多次测试发现,不会总是 producer 和 consumer 连在一起出现,只是最开始的时候会连在一起出现,为什么呢?因为在最开始的时候,producer是先启动的,那么在同步队列中排队,producer是在前面的,所以率先抢到锁,consumer等producer抢完之后才开始抢锁消费,但是后期由于偏向锁和Condition的原因,就不会 producer连着出现的情况了。
Q2:为何 await 那块用 if 会报错,但是用 while 就可以?
为了解决这个问题,我特意将 producer 的线程降为 1 个,将 consumer 的线程 降为 3 个,结果是依然会报错。
新的 demo 如下:
1 | package 多线程.Producer_Consumer; |
如上,我们来分析一下为何要用 while,下面我把程序整个运行过程讲一遍。
- 首先我们清楚 Thread-0 是 producer,Thread 1 、2、3 都是 consumer;
- 在最开始, Thread-0 最先启动,所以它也理所当然的第一个抢到了锁
从图中我们也可以看出,此时抢到锁的 exclusiveOwnerThread 为 Thread 0,从队列中我们可以看出剩下的三个 Thread 已经全部进入到同步队列,且顺序是 Thread 3 - Thread 1 - Thread 2(后面那个 5 是指优先级,不用管),为什么不可能进入到条件队列呢?因为 nextWaiter = null,且从 waitStatus = -1可知,三个线程都在同步队列中等待被唤醒。所以我们预测下一个抢到锁的一定是 Thread 3,他会去消费 Thread 0 刚刚生产出的消息,让我们接着看;
- 正如我们所料,真的是 Thread 3抢到了锁然后去消费了。
1 | producer-consumer 同一把锁 |
我们再通过 debug 的变量验证一下。
看,真的 Thread 3 抢到锁了。不信,继续看下面更详细的信息:
如图我们可以看到,Thread 3 消费掉了消息,并且注意到同步队列现在变为了 Thread 1 - Thread 2 - Thread 0,开始是 3 - 1 - 2,现在就变成了 1 - 2 - 0,你还别说,这种同步真的速度有点慢,Thread 0 都休息完了 3 秒了,前面那两老哥还在等着呢,不过这里还有一个要注意的地方,就是 Thread 0 的 waitStatus = 0,这是为啥呢,凭啥别人都是 waitStatus = -1呢,回顾一下,watiStatus = -1意思就是等待被唤醒,waitStatus = 0 意思就是下个节点是 null,当然也可能是下个节点还没初始化完,但是这里是 null。
这下总该肯定是 Thread 3 消费了吧,我们接着往下看。当然了,我们先预测一波,下面要抢锁的肯定是 1号老大哥,然后此时已经没有消息给他消费了,那他就只能掉入 Condition 的大坑了,只能从同步队列移到条件队列了,好苦逼啊,等了这么久,还得换个地方接着等!
- 我们来看看,结果是不是如我们预料的一样。
果然,接着就是 1 号老大哥在运行,不信我们看 debug 详细的参数:
握草,我被打脸了…虽然此时的确是 Thread 1 在执行,但是!!!玛德条件队列竟然不是 1号老大哥,是刚刚消费消息的 3 号小老弟,它在搞什么??????仔细思考一下,我们得知道 jvm 默认开启了偏向锁,至于什么是偏向锁,那就回到多线程(一)去复习吧,3 号小老弟使用了外挂—偏向锁「错了!!!!纠正一下,压根不是用的偏向锁,这里早就升级到了重量级锁,哪里来的偏向锁,真正插队的原因是这里是非公平锁!!!!!」,于是它很恶心的插队了,本来按道理来说,应该是 1号老哥抢到锁,然后因为没有消息可以消费,所以它从同步队列移入到条件队列中等到被 signal,然后同步队列理应就剩下 2 - 0,然后可能 3 号执行完了接在后面。但是现实是什么呢?那就是 3 号小老弟使用外挂偏向锁,直接插队,率先获取锁,由于没有消息可以消费,所以它从同步队列移入到了条件队列,开始等待,我们也可以看到,图中 consumer_condition的队列中只有一个 Thread,那就是 3 号小老弟,3号小老弟在条件队列中等待之后,暂时放弃了锁,于是终于尼玛轮到我 1 号老大哥了,我们可以看到,此时的确是 1号开始运行,然后同步队列中是 2 - 0 - null,即两个队列,这的确是符合我们现在的想法。
虽然刚才预测错了,但是那是因为 3 号使用了外挂,所以我还是接着预测吧,毕竟我预测的其实还是挺准的。此时因为没有消息可以消费,所以 1 号老大哥很可怜,跟 3 号小弟一样的下场,从同步队列转移到条件队列队尾继续等待,按道理来说 1 号将要在条件队列中,也就是 3 - 1,然后他让出锁,等待被唤醒,此时同步队列中队首应该是 2 号老哥了,终于轮到人家了…当然它应该也是很不幸的,只能跟 3 - 1一样的下场,回到条件队列队尾继续待着…预测完了,我们来看看真实情况。
- 看结果的确是 2 号老哥在运行着:
我们来看一下具体的参数,验证一下:
完美,预测的一模一样。2 号抢到锁之后,同步队列就只剩下了 0 号,并且 waitStatus = 0,说明这是队尾了,再提醒一下哈,当前节点的 waitStatus 的值是表示下一节点的状态,比如说 waitStatus = -1说明的是该节点的下一个节点已经准备好被唤醒了。然后我们再看一下条件队列的情况,情况和我们思考的是一样的,条件队列中现在就 2 个 Thread,分别是 3 - 1,当然了,马上 2 就要加入他们了,因为此时还是没有消息可以消费,我们接着往下看。
- 啊啊啊啊,不知道为啥,可能是我点错了,突然就出现了这样的结果。 并且条件队列为空。
1 | producer-consumer 同一把锁 |
不要着急,我们来分析一波,用理论战胜错误。我们刚才是这样的情况,输出框是
1 | producer-consumer 同一把锁 |
说明这期间有一次生产,有一次消费,而此时我们的条件队列应该是 3 - 1 - 2,同步队列只剩下 0。所以此时只剩下 0 可以抢锁了,于是 0 拿到了锁,然后生产出了一条消息,这个可以说得通,此时生产完之后,它调用 consumer_condition.signalall(),把条件队列中的 3 个都给唤醒了,3 号小老弟由于是在条件队列队首,所以它第一个从条件队列中加入到同步队列,1 号 和 2 号 老大哥紧随其后加入到同步队列,于是他第一个抢到锁,继续执行自己的逻辑,注意,我们终于要解决第二个问题了!!!!他发现此时有消息可以消费,于是他进行了消费,此时已经没有消息可以消费了,但是 1号老大哥不知道啊,它刚刚被唤醒,兴高采烈的以为终于可以消费了,因为我们写的是 if,所以它不会再去判断是否还要消息可以消费,因为它以为肯定有消息啊对吧,不然你唤醒我干嘛!于是他去消费,poll()一个空list,这不是找报错嘛…所以,就真的报错了。
所以说,遇到问题不要慌,理论掌握的透透的,不会出现意外情况的,我们来总结一下为什么要用 while 而不能用 if:
- 被唤醒之后,条件队列中的 Thread 会加入到 同步队列中,如果此时同步队列中还有 Thread,那么条件队列中被唤醒的 Thread 必须跟在它们的后面,如果此时同步队列中的 Thread 把消息消费完了,我后面的刚才条件队列中被唤醒的 Thread,根本无法知道没有消息了,因为它们一拿到 lock 就会继续往下执行;
- 当然了,刚才是两批人的战斗,条件队列算一批人,同步队列算一批人,同步队列的把人家条件队列的人给害了还不告诉人家,恶心!当然最恶心的还是自家人的斗争,也就是条件队列中的 Thread 的自相残杀,条件队列后面的压根不知道前面的消费完了,所以也会报错。
因此,将 if 改为 while,我在被唤醒之后依旧可以首先判断一下是否还有消息,这是非常有必要的。
Q3:消费者-生产者使用两把锁,又如何完成呢?
的确,消费者和生产者本就不应该放到一把锁中去控制,我消费我的,你生产你的,只是各有一种情况停止而已,所以使用两把锁才是非常合理的设计,我之前也有说,这就是 ArrayBlockingQueue 和 LinkedBlockingQueue 最大的区别。所以,接下来这个问题,我希望能借助于 LinkedBlockingQueue 的源码来解决这个问题。
1 | package 多线程.Producer_Consumer; |
上述就是完全借鉴于 LinkedBlockingQueue,但是源码里面根本没有调用 signalAll(),而是利用两把锁的优势,只需要唤醒条件队列的第一个,然后一个接一个去唤醒就可以了,这个就很妙!!!!
1 | package 多线程.Producer_Consumer; |
感慨一句,源码写的真的精妙啊!读源码真的让自己学到了很多,就是有点可惜现在还不太懂设计模式,导致很多很好的思想还没有学会。。。
3. BlockingQueue 实现
别人写的:
1 | package 多线程.Producer_Consumer; |
自己无聊重写一个吧:
1 | class BlockingQueue_product_consumer2 { |