问题
(1)SynchronousQueue的实现方式?
(2)SynchronousQueue真的是无缓冲的吗?
(3)SynchronousQueue在高并发情景下会有什么问题?
简介
SynchronousQueue是java并发包下无缓冲阻塞队列,它用来在两个线程之间移交元素,但是它有个很大的问题,你知道是什么吗?请看下面的分析。
UML继承关系
源码分析
主要属性
1 | // CPU的数量 |
通过属性我们可以Get到两个点:
(1)这个阻塞队列里面是会自旋的;
(2)它使用了一个叫做transferer的东西来交换元素;
主要内部类
1 | // Transferer抽象类,主要定义了一个transfer方法用来传输元素 |
(1)定义了一个抽象类Transferer,里面定义了一个传输元素的方法;
(2)有两种传输元素的方法,一种是栈,一种是队列;
(3)栈的特点是后进先出,队列的特点是先进行出;
(4)栈只需要保存一个头节点就可以了,因为存取元素都是操作头节点;
(5)队列需要保存一个头节点一个尾节点,因为存元素操作尾节点,取元素操作头节点;
(6)每个节点中保存着存储的元素、等待着的线程,以及下一个节点;
(7)栈和队列两种方式有什么不同呢?请看下面的分析。
主要构造方法
1 | public SynchronousQueue() { |
(1)默认使用非公平模式,也就是栈结构;
(2)公平模式使用队列,非公平模式使用栈;
入队
我们这里主要介绍以栈方式实现的传输模式,以put(E e)方法为例。
1 | public void put(E e) throws InterruptedException { |
调用transferer的transfer()方法,传入元素e,说明是生产者
出队
我们这里主要介绍以栈方式实现的传输模式,以take()方法为例。
1 | public E take() throws InterruptedException { |
调用transferer的transfer()方法,传入null,说明是消费者。
transfer()方法
transfer()方法同时实现了取元素和放元素的功能,下面我再来看看这个transfer()方法里究竟干了什么。
1 | // TransferStack.transfer()方法 |
整个逻辑比较复杂,这里为了简单起见,屏蔽掉多线程处理的细节,只描述正常业务场景下的逻辑:
(1)如果栈中没有元素,或者栈顶元素跟将要入栈的元素模式一样,就入栈;
(2)入栈后自旋等待一会看有没有其它线程匹配到它,自旋完了还没匹配到元素就阻塞等待;
(3)阻塞等待被唤醒了说明其它线程匹配到了当前的元素,就返回匹配到的元素;
(4)如果两者模式不一样,且头节点没有在匹配中,就拿当前节点跟它匹配,匹配成功了就返回匹配到的元素;
(5)如果两者模式不一样,且头节点正在匹配中,当前线程就协助去匹配,匹配完成了再让当前节点重新入栈重新匹配;
如果直接阅读这部分代码还是比较困难的,建议写个测试用例,打个断点一步一步跟踪调试。
下面是我的测试用例,可以参考下,在IDEA中可以让断点只阻塞线程:
1 | public class TestSynchronousQueue { |
修改断点只阻塞线程的方法,右击断点,选择Thread:
交给你了
上面的源码分析都是基于Stack的方式来分析的,那么队列是怎么动作的呢?很简单哦,测试用例中的false改成true就可以了,这就交给你了。
总结
(1)SynchronousQueue是java里的无缓冲队列,用于在两个线程之间直接移交元素;
(2)SynchronousQueue有两种实现方式,一种是公平(队列)方式,一种是非公平(栈)方式;
(3)栈方式中的节点有三种模式:生产者、消费者、正在匹配中;
(4)栈方式的大致思路是如果栈顶元素跟自己一样的模式就入栈并等待被匹配,否则就匹配,匹配到了就返回;
(5)队列方式的大致思路是……不告诉你^^(两者的逻辑差别还是挺大的)
彩蛋
(1)SynchronousQueue真的是无缓冲的队列吗?
通过源码分析,我们可以发现其实SynchronousQueue内部或者使用栈或者使用队列来存储包含线程和元素值的节点,如果同一个模式的节点过多的话,它们都会存储进来,且都会阻塞着,所以,严格上来说,SynchronousQueue并不能算是一个无缓冲队列。
(2)SynchronousQueue有什么缺点呢?
试想一下,如果有多个生产者,但只有一个消费者,如果消费者处理不过来,是不是生产者都会阻塞起来?反之亦然。
这是一件很危险的事,所以,SynchronousQueue一般用于生产、消费的速度大致相当的情况,这样才不会导致系统中过多的线程处于阻塞状态。