问题
(1)LinkedTransferQueue是什么东东?
(2)LinkedTransferQueue是怎么实现阻塞队列的?
(3)LinkedTransferQueue是怎么控制并发安全的?
(4)LinkedTransferQueue与SynchronousQueue有什么异同?
简介
LinkedTransferQueue是LinkedBlockingQueue、SynchronousQueue(公平模式)、ConcurrentLinkedQueue三者的集合体,它综合了这三者的方法,并且提供了更加高效的实现方式。
UML继承关系
LinkedTransferQueue实现了TransferQueue接口,而TransferQueue接口是继承自BlockingQueue的,所以LinkedTransferQueue也是一个阻塞队列。
TransferQueue接口中定义了以下几个方法:
1 | // 尝试移交元素 |
主要是定义了三个移交元素的方法,有阻塞的,有不阻塞的,有超时的。
存储结构
LinkedTransferQueue使用了一个叫做dual data structure
的数据结构,或者叫做dual queue
,译为双重数据结构或者双重队列。
双重队列是什么意思呢?
放取元素使用同一个队列,队列中的节点具有两种模式,一种是数据节点,一种是非数据节点。
放元素时先跟队列头节点对比,如果头节点是非数据节点,就让他们匹配,如果头节点是数据节点,就生成一个数据节点放在队列尾端(入队)。
取元素时也是先跟队列头节点对比,如果头节点是数据节点,就让他们匹配,如果头节点是非数据节点,就生成一个非数据节点放在队列尾端(入队)。
用图形来表示就是下面这样:
不管是放元素还是取元素,都先跟头节点对比,如果二者模式不一样就匹配它们,如果二者模式一样,就入队。
源码分析
主要属性
1 | // 头节点 |
主要内部类
1 | static final class Node { |
典型的单链表结构,内部除了存储元素的值和下一个节点的指针外,还包含了是否为数据节点和持有元素的线程。
内部通过isData区分是生产者还是消费者。
主要构造方法
1 | public LinkedTransferQueue() { |
只有这两个构造方法,且没有初始容量,所以是无界的一个阻塞队列。
入队
四个方法都是一样的,使用异步的方式调用xfer()方法,传入的参数都一模一样。
1 | public void put(E e) { |
xfer(E e, boolean haveData, int how, long nanos)的参数分别是:
(1)e表示元素;
(2)haveData表示是否是数据节点,
(3)how表示放取元素的方式,上面提到的四种,NOW、ASYNC、SYNC、TIMED;
(4)nanos表示超时时间;
出队
出队的四个方法也是直接或间接的调用xfer()方法,放取元素的方式和超时规则略微不同,本质没有大的区别。
1 | public E remove() { |
取元素就各有各的玩法了,有同步的,有超时的,有立即返回的。
移交元素的方法
1 | public boolean tryTransfer(E e) { |
请注意第二个参数,都是true,也就是这三个方法其实也是放元素的方法。
这里xfer()方法的几种模式到底有什么区别呢?请看下面的分析。
神奇的xfer()方法
1 | private E xfer(E e, boolean haveData, int how, long nanos) { |
这三个方法里的内容特别复杂,很大一部分代码都是在控制线程安全,各种CAS,我们这里简单描述一下大致的逻辑:
(1)来了一个元素,我们先查看队列头的节点,是否与这个元素的模式一样;
(2)如果模式不一样,就尝试让他们匹配,如果头节点被别的线程先匹配走了,就尝试与头节点的下一个节点匹配,如此一直往后,直到匹配到或到链表尾为止;
(3)如果模式一样,或者到链表尾了,就尝试入队;
(4)入队的时候有可能链表尾修改了,那就尾指针后移,再重新尝试入队,依此往复;
(5)入队成功了,就自旋或阻塞,阻塞了就等待被其它线程匹配到并唤醒;
(6)唤醒之后进入下一次循环就匹配到元素了,返回匹配到的元素;
(7)是否需要入队及阻塞有四种情况:
a)NOW,立即返回,没有匹配到立即返回,不做入队操作
对应的方法有:poll()、tryTransfer(e)
b)ASYNC,异步,元素入队但当前线程不会阻塞(相当于无界LinkedBlockingQueue的元素入队)
对应的方法有:add(e)、offer(e)、put(e)、offer(e, timeout, unit)
c)SYNC,同步,元素入队后当前线程阻塞,等待被匹配到
对应的方法有:take()、transfer(e)
d)TIMED,有超时,元素入队后等待一段时间被匹配,时间到了还没匹配到就返回元素本身
对应的方法有:poll(timeout, unit)、tryTransfer(e, timeout, unit)
总结
(1)LinkedTransferQueue可以看作LinkedBlockingQueue、SynchronousQueue(公平模式)、ConcurrentLinkedQueue三者的集合体;
(2)LinkedTransferQueue的实现方式是使用一种叫做双重队列
的数据结构;
(3)不管是取元素还是放元素都会入队;
(4)先尝试跟头节点比较,如果二者模式不一样,就匹配它们,组成CP,然后返回对方的值;
(5)如果二者模式一样,就入队,并自旋或阻塞等待被唤醒;
(6)至于是否入队及阻塞有四种模式,NOW、ASYNC、SYNC、TIMED;
(7)LinkedTransferQueue全程都没有使用synchronized、重入锁等比较重的锁,基本是通过 自旋+CAS 实现;
(8)对于入队之后,先自旋一定次数后再调用LockSupport.park()或LockSupport.parkNanos阻塞;
彩蛋
LinkedTransferQueue与SynchronousQueue(公平模式)有什么异同呢?
(1)在java8中两者的实现方式基本一致,都是使用的双重队列;
(2)前者完全实现了后者,但比后者更灵活;
(3)后者不管放元素还是取元素,如果没有可匹配的元素,所在的线程都会阻塞;
(4)前者可以自己控制放元素是否需要阻塞线程,比如使用四个添加元素的方法就不会阻塞线程,只入队元素,使用transfer()会阻塞线程;
(5)取元素两者基本一样,都会阻塞等待有新的元素进入被匹配到;