作者 | 李新然 来源 | 阿里技术公众号
一 背景
JUC 工具包是 JAVA 并发编程的利器。
本文讲述在没有 JUC 工具包帮助下,借助原生的 JAVA 同步原语, 如何实现一个公平有界的阻塞队列。
希望你也能在文后体会到并发编程的复杂之处,以及 JUC 工具包的强大。
二 方法
本文使用到的基本工具:
同步监听器 synchronized ,方法基本和代码块级别;
Object 基础类的 wait, notify, notifyAll;
基于以上基础工具,实现公平有界的阻塞队列,此处:
将公平的定义限定为 FIFO ,也就是先阻塞等待的请求,先解除等待;
并不保证解除等待后执行 Action 的先后顺序;
确保队列的大小始终不超过设定的容量;但阻塞等待的请求数不做限制;
三 实现
1 基础版本
首先,考虑在非并发场景下,借助 ADT 实现一个基础版本
interfaceQueue{booleanoffer(Objectobj);Objectpoll();}classFairnessBoundedBlockingQueueimplementsQueue{//当前大小protectedintsize;//容量protectedfinalintcapacity;//头指针,empty:head.next==tail==nullprotectedNodehead;//尾指针protectedNodetail;publicFairnessBoundedBlockingQueue(intcapacity){this.capacity=capacity;this.head=newNode(null);this.tail=head;this.size=0;}//如果队列已满,通过返回值标识publicbooleanoffer(Objectobj){if(size<capacity){Nodenode=newNode(obj);tail.next=node;tail=node;++size;returntrue;}returnfalse;}//如果队列为空,head.next==null;返回空元素publicObjectpoll(){if(head.next!=null){Objectresult=head.next.value;head.next.value=null;head=head.next;//丢弃头结点--size;returnresult;}returnnull;}classNode{Objectvalue;Nodenext;Node(Objectobj){this.value=obj;next=null;}}}
以上
定义支持队列的两个基础接口, poll 和 offer;
队列的实现,采用经典实现;
考虑在队列空的情况下, poll 返回为空,非阻塞;
队列在满的情况下, offer 返回 false ,入队不成功,无异常;
需要注意的一点:在出队时,本文通过迁移头结点的方式实现,避免修改尾结点。 在下文实现并发版本时,会看到此处的用意。
2 并发版本
如果在并发场景下,上述的实现面临一些问题,同时未实现给定的一些需求。
通过添加 synchronized ,保证并发条件下的线程安全问题。
注意此处做同步的原因是为了保证类的不变式。
并发问题
在并发场景下,基础版本的实现面临的问题包括:原子性,可见性和指令重排的问题。
参考 JMM 的相关描述。
并发问题,最简单的解决方法是:通过 synchronized 加锁,一次性解决问题。
//省略接口定义classBoundedBlockingQueueimplementsQueue{//当前大小protectedintsize;//容量protectedfinalintcapacity;//头指针,empty:head.next==tail==nullprotectedNodehead;//尾指针protectedNodetail;publicBoundedBlockingQueue(intcapacity){this.capacity=capacity;this.head=newNode(null);this.tail=head;this.size=0;}//如果队列已满,通过返回值标识publicsynchronizedbooleanoffer(Objectobj){if(size<capacity){Nodenode=newNode(obj);tail.next=node;tail=node;++size;returntrue;}returnfalse;}//如果队列为空,head.next==null;返回空元素publicsynchronizedObjectpoll(){if(head.next!=null){Objectresult=head.next.value;head.next.value=null;head=head.next;//丢弃头结点--size;returnresult;}returnnull;}//省略Node的定义}
以上,简单粗暴的加 synchronized 可以解决问题,但会引入新的问题:系统活性问题(此问题下文会解决)。
同时,简单加 synchronized 同步是无法实现阻塞等待;即
如果队列为空,那么出队的动作还是会立即返回,返回为空;
如果队列已满,那么入队动作还是会立即返回,返回操作不成功;
实现阻塞等待,需要借助 JAVA 中的 PV 原语:wait, notify, notifyAll 。
参考:JDK 中对 wait, notify, notifyAll 的相关描述。
卫式方法
阻塞等待,可以通过简单的卫式方法来实现,此问题本质上可以抽象为:
任何一个方法都需要在满足一定条件下才可以执行;
执行方法前需要首先校验不变式,然后执行变更;
在执行完成后,校验是否满足后验不变式;
WHEN(condition) Object action(Object arg) { checkPreCondition(); doAction(arg); checkPostCondition(); }
此种抽象 Ada 在语言层面上实现。在 JAVA 中,借助 wait, notify, notifyAll 可以翻译为:
//当前线程synchronizedObjectaction(Objectarg){while(!condition){wait();}//前置条件,不变式checkPreCondition();doAction();//后置条件,不变式checkPostCondition();}//其他线程synchronizedObjectnotifyAction(Objectarg){notifyAll();}
需要注意:
通常会采用 notifyAll 发送通知,而非 notify ;因为如果当前线程收到 notify 通知后被中断,那么系统将一直等待下去。
如果使用了 notifyAll 那么卫式语句必须放在 while 循环中;因为线程唤醒后,执行条件已经不满足,虽然当前线程持有互斥锁。
卫式条件的所有变量,有任何变更都需要发送 notifyAll 不然面临系统活性问题
据此,不难实现简单的阻塞版本的有界队列,如下
interfaceQueue{booleanoffer(Objectobj)throwsInterruptedException;Objectpoll()throwsInterruptedException;}classFairnessBoundedBlockingQueueimplementsQueue{//当前大小protectedintsize;//容量protectedfinalintcapacity;//头指针,empty:head.next==tail==nullprotectedNodehead;//尾指针protectedNodetail;publicFairnessBoundedBlockingQueue(intcapacity){this.capacity=capacity;this.head=newNode(null);this.tail=head;this.size=0;}//如果队列已满,通过返回值标识publicsynchronizedbooleanoffer(Objectobj)throwsInterruptedException{while(size<capacity){wait();}Nodenode=newNode(obj);tail.next=node;tail=node;++size;notifyAll();//可以出队returntrue;}//如果队列为空,阻塞等待publicsynchronizedObjectpoll()throwsInterruptedException{while(head.next==null){wait();}Objectresult=head.next.value;head.next.value=null;head=head.next;//丢弃头结点--size;notifyAll();//可以入队returnresult;}//省略Node的定义}
以上,实现了阻塞等待,但也引入了更大的性能问题
入队和出队动作阻塞等待同一把锁,恶性竞争;
当队列变更时,所有阻塞线程被唤醒,大量的线程上下文切换,竞争同步锁,最终可能只有一个线程能执行;
需要注意的点:
阻塞等待 wait 会抛出中断异常。关于异常的问题下文会处理;
接口需要支持抛出中断异常;
队里变更需要 notifyAll 避免线程中断或异常,丢失消息;
3 锁拆分优化
以上第一个问题,可以通过锁拆分来解决,即:定义两把锁,读锁和写锁;读写分离。
//省略接口定义classFairnessBoundedBlockingQueueimplementsQueue{//容量protectedfinalintcapacity;//头指针,empty:head.next==tail==nullprotectedNodehead;//尾指针protectedNodetail;//guard:canPollCount,headprotectedfinalObjectpollLock=newObject();protectedintcanPollCount;//guard:canOfferCount,tailprotectedfinalObjectofferLock=newObject();protectedintcanOfferCount;publicFairnessBoundedBlockingQueue(intcapacity){this.capacity=capacity;this.canPollCount=0;this.canOfferCount=capacity;this.head=newNode(null);this.tail=head;}//如果队列已满,通过返回值标识publicbooleanoffer(Objectobj)throwsInterruptedException{synchronized(offerLock){while(canOfferCount<=0){offerLock.wait();}Nodenode=newNode(obj);tail.next=node;tail=node;canOfferCount--;}synchronized(pollLock){++canPollCount;pollLock.notifyAll();}returntrue;}//如果队列为空,阻塞等待publicObjectpoll()throwsInterruptedException{Objectresult=null;synchronized(pollLock){while(canPollCount<=0){pollLock.wait();}result=head.next.value;head.next.value=null;head=head.next;canPollCount--;}synchronized(offerLock){canOfferCount++;offerLock.notifyAll();}returnresult;}//省略Node定义}
以上
定义了两把锁, pollLock 和 offerLock 拆分出队和入队竞争;
入队锁同步的变量为:callOfferCount 和 tail;
出队锁同步的变量为:canPollCount 和 head;
出队的动作:首先拿到 pollLock 卫式等待后,完成出队动作;然后拿到 offerLock 发送通知,解除入队的等待线程。
入队的动作:首先拿到 offerLock 卫式等待后,完成入队的动作;然后拿到 pollLock 发送通知,解除出队的等待线程。
以上实现
确保通过入队锁和出队锁,分别保证入队和出队的原子性;
出队动作,通过特别的实现,确保出队只会变更 head ,避免获取 offerLock;
通过 offerLock.notifyAll 和 pollLock.notifyAll 解决读写竞争的问题;
但上述实现还有未解决的问题:
当有多个入队线程等待时,一次出队的动作会触发所有入队线程竞争,大量的线程上下文切换,最终只有一个线程能执行。
即,还有 读与读 和 写与写 之间的竞争问题。
4 状态追踪解除竞争
此处可以通过状态追踪,解除读与读之间和写与写之间的竞争问题
classFairnessBoundedBlockingQueueimplementsQueue{//容量protectedfinalintcapacity;//头指针,empty:head.next==tail==nullprotectedNodehead;//尾指针protectedNodetail;//guard:canPollCount,headprotectedfinalObjectpollLock=newObject();protectedintcanPollCount;protectedintwaitPollCount;//guard:canOfferCount,tailprotectedfinalObjectofferLock=newObject();protectedintcanOfferCount;protectedintwaitOfferCount;publicFairnessBoundedBlockingQueue(intcapacity){this.capacity=capacity;this.canPollCount=0;this.canOfferCount=capacity;this.waitPollCount=0;this.waitOfferCount=0;this.head=newNode(null);this.tail=head;}//如果队列已满,通过返回值标识publicbooleanoffer(Objectobj)throwsInterruptedException{synchronized(offerLock){while(canOfferCount<=0){waitOfferCount++;offerLock.wait();waitOfferCount--;}Nodenode=newNode(obj);tail.next=node;tail=node;canOfferCount--;}synchronized(pollLock){++canPollCount;if(waitPollCount>0){pollLock.notify();}}returntrue;}//如果队列为空,阻塞等待publicObjectpoll()throwsInterruptedException{Objectresult;synchronized(pollLock){while(canPollCount<=0){waitPollCount++;pollLock.wait();waitPollCount--;}result=head.next.value;head.next.value=null;head=head.next;canPollCount--;}synchronized(offerLock){canOfferCount++;if(waitOfferCount>0){offerLock.notify();}}returnresult;}//省略Node的定义}
以上
通过 waitOfferCount 和 waitPollCount 的状态追踪解决 读写内部的竞争问题;
当队列变更时,根据追踪的状态,决定是否派发消息,触发线程阻塞状态解除;
但,上述的实现在某些场景下会运行失败,面临活性问题,考虑
情况一:
初始状态队列为空 线程 A 执行出队动作,被阻塞在 pollLock , 此时 waitPollCount==1;
此时线程 A 在执行 wait 时被中断,抛出异常, waitPollCount==1 并未被重置;
阻塞队列为空,但 waitPollCount==1 类状态异常;
情况二:
初始状态队列为空 线程 A B 执行出队动作,被阻塞在 pollLock , 此时 waitPollCount==2;
线程 C 执行入队动作,可以立即执行,执行完成后,触发 pollLock 解除一个线程等待 notify;
触发的线程在 JVM 实现中是随机的,假设线程 A 被解除阻塞;
假设线程 A 在阻塞过程中已被中断,阻塞解除后 JVM 检查 interrupted 状态,抛出 InterruptedException 异常;
此时队列中有一个元素,但线程 A 仍阻塞在 pollLock 中,且一直阻塞下去;
以上为解除阻塞消息丢失的例子,问题的根源在与异常处理。
5 解决异常问题
解决线程中断退出的问题,线程校验中断状态的场景
JVM 通常只会在有限的几个场景检测线程的中断状态, wait, Thread.join, Thread.sleep;
JVM 在检测到线程中断状态 Thread.interrupted() 后,会清除中断标志,抛出 InterruptedException;
通常为了保证线程对中断及时响应, run 方法中需要自主检测中断标志,中断线程,特别是对中断比较敏感需要保持类的不变式的场景;
class FairnessBoundedBlockingQueue implements Queue { // 容量 protected final int capacity;
//头指针,empty:head.next==tail==nullprotectedNodehead;//尾指针protectedNodetail;//guard:canPollCount,head,waitPollCountprotectedfinalObjectpollLock=newObject();protectedintcanPollCount;protectedintwaitPollCount;//guard:canOfferCount,tail,waitOfferCountprotectedfinalObjectofferLock=newObject();protectedintcanOfferCount;protectedintwaitOfferCount;publicFairnessBoundedBlockingQueue(intcapacity){this.capacity=capacity;this.canPollCount=0;this.canOfferCount=capacity;this.waitPollCount=0;this.waitOfferCount=0;this.head=newNode(null);this.tail=head;}//如果队列已满,通过返回值标识publicbooleanoffer(Objectobj)throwsInterruptedException{if(Thread.interrupted()){thrownewInterruptedException();//线程已中断,直接退出即可,防止中断线程竞争锁}synchronized(offerLock){while(canOfferCount<=0){waitOfferCount++;try{offerLock.wait();}catch(InterruptedExceptione){//触发其他线程offerLock.notify();throwe;}finally{waitOfferCount--;}}Nodenode=newNode(obj);tail.next=node;tail=node;canOfferCount--;}synchronized(pollLock){++canPollCount;if(waitPollCount>0){pollLock.notify();}}returntrue;}//如果队列为空,阻塞等待publicObjectpoll()throwsInterruptedException{if(Thread.interrupted()){thrownewInterruptedException();}Objectresult=null;synchronized(pollLock){while(canPollCount<=0){waitPollCount++;try{pollLock.wait();}catch(InterruptedExceptione){pollLock.notify();throwe;}finally{waitPollCount--;}}result=head.next.value;head.next.value=0;//ignorehead;head=head.next;canPollCount--;}synchronized(offerLock){canOfferCount++;if(waitOfferCount>0){offerLock.notify();}}returnresult;}}//省略Node的定义
以上
当等待线程中断退出时,捕获中断异常,通过 pollLock.notify 和 offerLock.notify 转发消息;
通过在 finally 中恢复状态追踪变量;
通过状态变量追踪可以解决读与读之间和写与写之间的锁竞争问题。
以下考虑如果解决读与读之间和写与写之间的公平性问题。
6 解决公平性
公平性的问题的解决需要将状态变量的追踪转换为:请求监视器追踪。
每个请求对应一个监视器;
通过内部维护一个 FIFO 队列,实现公平性;
在队列状态变更时,释放队列中的监视器;
以上逻辑可以统一抽象为
booleanneedToWait;synchronized(this){needToWait=calculateNeedToWait();if(needToWait){enqueue(monitor);//请求对应的monitor}}if(needToWait){monitor.doWait();}
需要注意
monitor.doWait() 需要在 this 的卫式语句之外,因为如果在内部, monitor.doWait 并不会释放 this锁;
calculateNeedToWait() 需要在 this 的守卫之内完成,避免同步问题;
需要考虑中断异常的问题;
基于以上的逻辑抽象,实现公平队列
//省略接口定义classFairnessBoundedBlockingQueueimplementsQueue{//容量protectedfinalintcapacity;//头指针,empty:head.next==tail==nullprotectedNodehead;//尾指针protectedNodetail;//guard:canPollCount,head,pollQueueprotectedfinalObjectpollLock=newObject();protectedintcanPollCount;//guard:canOfferCount,tail,offerQueueprotectedfinalObjectofferLock=newObject();protectedintcanOfferCount;protectedfinalWaitQueuepollQueue=newWaitQueue();protectedfinalWaitQueueofferQueue=newWaitQueue();publicFairnessBoundedBlockingQueue(intcapacity){this.capacity=capacity;this.canOfferCount=capacity;this.canPollCount=0;this.head=newNode(null);this.tail=head;}//如果队列已满,通过返回值标识publicbooleanoffer(Objectobj)throwsInterruptedException{if(Thread.interrupted()){thrownewInterruptedException();//线程已中断,直接退出即可,防止中断线程竞争锁}WaitNodewait=null;synchronized(offerLock){//在有阻塞请求或者队列为空时,阻塞等待if(canOfferCount<=0||!offerQueue.isEmpty()){wait=newWaitNode();offerQueue.enq(wait);}else{//continue.}}try{if(wait!=null){wait.doWait();}if(Thread.interrupted()){thrownewInterruptedException();}}catch(InterruptedExceptione){offerQueue.doNotify();throwe;}//确保此时线程状态正常,以下不会校验中断synchronized(offerLock){Nodenode=newNode(obj);tail.next=node;tail=node;canOfferCount--;}synchronized(pollLock){++canPollCount;pollQueue.doNotify();}returntrue;}//如果队列为空,阻塞等待publicObjectpoll()throwsInterruptedException{if(Thread.interrupted()){thrownewInterruptedException();}Objectresult=null;WaitNodewait=null;synchronized(pollLock){//在有阻塞请求或者队列为空时,阻塞等待if(canPollCount<=0||!pollQueue.isEmpty()){wait=newWaitNode();pollQueue.enq(wait);}else{//ignore}}try{if(wait!=null){wait.doWait();}if(Thread.interrupted()){thrownewInterruptedException();}}catch(InterruptedExceptione){//传递消息pollQueue.doNotify();throwe;}//以下不会检测线程中断状态synchronized(pollLock){result=head.next.value;head.next.value=0;//ignorehead;head=head.next;canPollCount--;}synchronized(offerLock){canOfferCount++;offerQueue.doNotify();}returnresult;}classWaitQueue{WaitNodehead;WaitNodetail;WaitQueue(){head=newWaitNode();tail=head;}synchronizedvoiddoNotify(){for(;;){WaitNodenode=deq();if(node==null){break;}elseif(node.doNotify()){//此处确保NOTIFY成功break;}else{//ignore,andretry.}}}synchronizedbooleanisEmpty(){returnhead.next==null;}synchronizedvoidenq(WaitNodenode){tail.next=node;tail=tail.next;}synchronizedWaitNodedeq(){if(head.next==null){returnnull;}WaitNoderes=head.next;head=head.next;if(head.next==null){tail=head;//为空,迁移tail节点}returnres;}}classWaitNode{booleanreleased;WaitNodenext;WaitNode(){released=false;next=null;}synchronizedvoiddoWait()throwsInterruptedException{try{while(!released){wait();}}catch(InterruptedExceptione){if(!released){released=true;throwe;}else{//如果是NOTIFY之后收到中断的信号,不能抛出异常;需要做RELAY处理Thread.currentThread().interrupt();}}}synchronizedbooleandoNotify(){if(!released){released=true;notify();//明确释放了一个线程,返回truereturntrue;}else{//没有释放新的线程,返回falsereturnfalse;}}}//省略Node的定义}
以上
核心是替换状态追踪变量为同步节点, WaitNode;
WaitNode 通过简单的同步队列组织实现 FIFO 协议,每个线程等待各自的 WaitNode 监视器;
WaitNode 内部维持 released 状态,标识线程阻塞状态是否被释放,主要是为了处理中断的问题;
WaitQueue 本身是全同步的,由于已解决了读写竞争已经读写内部竞争的问题, WaitQueue 同步并不会造成问题;
WaitQueue 是无界队列,是一个潜在的问题;但由于其只做同步的追踪,而且追踪的通常是线程,通常并不是问题;
最终的公平有界队列实现,无论是入队还是出队,首先卫式语句判定是否需要入队等待,如果入队等待,通过公平性协议等待;
当信号释放时,借助读写锁同步更新队列;最后同样借助读写锁,触发队列更新消息;
7 等待时间的问题
并发场景下,等待通常会设置为限时等待 TIMED_WAITING ,避免死锁或损失系统活性;
实现同步队列的限时等待,并没想象的那么困难
classTimeoutExceptionextendsInterruptedException{}classWaitNode{booleanreleased;WaitNodenext;WaitNode(){released=false;next=null;}synchronizedvoiddoWait(longmilliSeconds)throwsInterruptedException{try{longstartTime=System.currentTimeMillis();longtoWait=milliSeconds;for(;;){wait(toWait);if(released){return;}longnow=System.currentTimeMillis();toWait=toWait-(now-startTime);if(toWait<=0){thrownewTimeoutException();}}}catch(InterruptedExceptione){if(!released){released=true;throwe;}else{//如果已经释放信号量,此处不抛出异常;但恢复中断状态Thread.currentThread().interrupt();}}}synchronizedbooleandoNotify(){if(!released){released=true;notify();returntrue;}else{returnfalse;}}
由于所有的等待都阻塞在 WaitNode 监视器,以上
首先定义超时异常,此处只是为了方便异常处理,继承 InterruptedException;
此处依赖于 wait(long timeout) 的超时等待实现,这通常不是问题;
最后,将 WaitNode 超时等待的逻辑,带入到 FairnessBoundedBlockingQueue 实现中,即可。
四 总结
本文通过一步步迭代,最终借助 JAVA 同步原语实现初版的公平有界队列。迭代实现过程中可以看到以下几点:
观念的转变,将调用一个类的方法思维转换为:在满足一定条件下方法才可以调用,在调用前需要满足不变式,调用后满足不变式;由于并发的问题很难测试,通常要采用卫式表达证明并发的正确性;
在迭代实现中会看到很多模式,比如,读写分离时,其实可以抽象为读锁和写锁;就得到了一个抽象的 Lock 的定义;比如,读写状态追踪,可以采用 Exchanger 抽象表达;
另外,本文的实现远非完善,还需要考虑支持 Iterator 遍历、状态查询及数据迁移等操作;
最后,相信大家再看 JUC 的工具包实现,定有不一样的体会。