写在前面
本文主要是针对 ReentrantLock
实现 AQS
的基础上的分析以及对 Condition
的相关分析。
因此建议先了解AQS的实现原理,对 ReentrantLock 原理的理解将更加容易,AQS 详情点击。
一、起源:
1. 什么是可重入锁?
可重入锁
是指当某个线程已经持有了这把锁,但是某个时刻,这个线程还要尝试再次
拿到这把锁,支持这种可重入的实现就是可重入锁;
以 ReentrantLock 可重入锁来看,其 state
表示重入次数,当想要再次拿到这把锁的时候,state + 1
;
当想要释放这把锁的时候state - 1
,因此可以根据 state 是否等于 0 来判断这把锁是否被某个线程锁持有。
2. ReentrantLock的基本用法
先创建 ReentrantLock 对象,然后 lock.lock(),最后 lock.unlock(),即:
class X { private final ReentrantLock lock = new ReentrantLock(); // ... public void m() { lock.lock(); // block until condition holds try { // ... method body } finally { lock.unlock() } } }}
使用上比较简单,推荐的做法是在 finally
块中释放锁,因为这样在出现异常的时候可以及时释放锁资源
3. ReentrantLock如何实现等待/通知模式?
关键字 synchronized 与 wait() 和 notify() / notifyAll() 方法相结合可以实现等待/通知
模式,ReentrantLock 也同样可以借助于 Condition
实现等待/通知模式
Condition 是 JDK1.5 中出现的技术,使用它有更好的灵活性,比如可以实现多路通知功能,也就是在一个 lock 对象里可以创建多个 Condition (即对象监视器)实例,线程对象可以注册在指定的 Condition 中,从而可以有选择性的进行线程通知,在调度线程上更加灵活
在使用 notify()/notifyAll()
方法进行通知时,被通知的线程却是由 JVM 随机选择的;但使用 ReentrantLock 结合 Condition 是可以实现"选择性通知"
在了解 ReentrantLock 实现 AQS 之前,先来看看 AQS 的实现类一般如何去操作...
4. AQS用法
一般要通过 AQS 实现自己的同步器类有以下三步:
新建自己的同步器类
,在内部写一个 Sync
类,该类继承AbstractQueuedSynchronizer,即 AQS
设计同步器类的逻辑,在Sync类里,根据是否独占来重新对应的方法。如果是独占
,则重写 tryAcquire 和 tryRelease 等方法;如果是非独占
,则重写 tryAcquireShared 和 tryReleaseShared 等方法
在自己的同步器类中实现获取/释放
相关方法,并在里面调用 AQS 对应的方法,如果是独占则调用 acquire 或 release 等方法,非独占则调用 acquireShared 或 releaseShared 或 acquireSharedInterruptibly 等方法。
二、实现原理
需要注意的的是,state 在 ReentrantLock 的含义表示的是重入次数
state = 0,表示此锁未被任何线程持有
state > 0, 表示被当前线程重入持有多次,以后每次释放锁都会在 state 上减 1,直到 state = 0
ReentrantLock 是一种独占模式
,相应的会实现 tryAcquire 和 tryRelease 方法。
在 Condition 中有两个名词需要做区分
条件等待队列
,这个队列是由每个 condition 实例自己维护的,也就是说,如果有两个 condition 实例,也就有两个条件等待队列
同步队列
:指的是 AQS 中的 FIFO
等待与唤醒:
condition.await
:等待操作是将节点放入条件等待队列
condition.signal
:唤醒操作是将节点从条件等待队列中移到同步队列中,等待获取资源
另外,ReentrantLock 大部分实现都是由 AQS 完成,在上篇博文中已经对 AQS 做了详细分析,因此这里不在过多重复分析...
1. 内部对象Sync实现AQS
abstract static class Sync extends AbstractQueuedSynchronizer { private static final long serialVersionUID = -5179523762034025860L; abstract void lock(); // 非公平锁,tryAcquire方法由子类实现 final boolean nonfairTryAcquire(int acquires) { final Thread current = Thread.currentThread(); // 拿到当前锁对象的重入次数 int c = getState(); // 如果等于0说明该锁对象没有被任何对象持有 // 这个时候等待队列可能是有等待节点的,只是恰好锁资源在此刻被释放了 if (c == 0) { // 这里尝试去抢这把锁 if (compareAndSetState(0, acquires)) { setExclusiveOwnerThread(current); return true; } } // 如果此锁被当前对象持有,也就是重入操作,累加state else if (current == getExclusiveOwnerThread()) { int nextc = c + acquires; if (nextc < 0) // overflow throw new Error("Maximum lock count exceeded"); setState(nextc); return true; } return false; } // 尝试释放锁资源 protected final boolean tryRelease(int releases) { int c = getState() - releases; // 检查是不是当前线程获得了锁 if (Thread.currentThread() != getExclusiveOwnerThread()) throw new IllegalMonitorStateException(); boolean free = false; // 如果重入次数等于0了,说明完全释放了这把锁,其他线程可以获取这把锁了 if (c == 0) { free = true; setExclusiveOwnerThread(null); } setState(c); return free; } protected final boolean isHeldExclusively() { // 判是否独占模式 return getExclusiveOwnerThread() == Thread.currentThread(); } // 创建Condition对象 final ConditionObject newCondition() { return new ConditionObject(); } final Thread getOwner() { return getState() == 0 ? null : getExclusiveOwnerThread(); } // 重入次数 final int getHoldCount() { return isHeldExclusively() ? getState() : 0; } // state=0表示此锁未被占用 final boolean isLocked() { return getState() != 0; } } //
具体实现逻辑已经在源码中注释,简单来说,子类(这里的 Sync)需要实现 AQS 提供的抽象方法,来达到自己个性化的需求。
2. NonfairSync 非公平锁
static final class NonfairSync extends Sync { private static final long serialVersionUID = 7316153563782823691L; final void lock() { // 尝试获取锁 // 这个时候等待队列可能是还有等待节点的,这里取尝试抢一下; // 如果锁资源这个时候刚好被释放了,这里是有可能抢成功的 if (compareAndSetState(0, 1)) setExclusiveOwnerThread(Thread.currentThread()); else // 既然没有抢成功,那就老老实实取获取锁资源 acquire(1); } protected final boolean tryAcquire(int acquires) { return nonfairTryAcquire(acquires); } } //
公平锁
与非公平锁
的最大区别在于,非公平锁会尽可能的去抢占资源
(尽管等待队列存在很多等待节点)。
而公平锁,如果等待队列里存在等待节点,那它是不会去抢占资源的,放进队列,然后按先进先出
的顺序去获取资源。
3. FairSync 公平锁实现
static final class FairSync extends Sync { private static final long serialVersionUID = -3000897897090466540L; // 获取锁,如果拿不到会进入阻塞队列中等待 final void lock() { acquire(1); } // 尝试拿取锁,成功则返回true,失败返回false protected final boolean tryAcquire(int acquires) { final Thread current = Thread.currentThread(); // 拿到重入次数 int c = getState(); // 说明这把锁未被任何线程持有,可以尝试获取锁 if (c == 0) { // 和非公平锁的唯一区别是,这里多了hasQueuedPredecessors判断条件 // 意思是:首先判断在等待队列里面没有任何等待节点,它才会尝试取获取资源, // 否则的话,就不去争抢锁资源了,毕竟是先来先服务嘛(保证公平性) if (!hasQueuedPredecessors() && compareAndSetState(0, acquires)) { // CAS设置状态值 // 说明获取锁资源成功了,在锁对象中设置exclusiveOwnerThread=当前线程,表明此锁被当前线程锁住了 setExclusiveOwnerThread(current); return true; } } // 判断是否当前线程持有,是的话,就是重入持有,改变state的值 else if (current == getExclusiveOwnerThread()) { int nextc = c + acquires; if (nextc < 0) throw new Error("Maximum lock count exceeded"); setState(nextc); return true; } return false; } } //
4. Condition
在特定条件上等待锁资源
来看个例子:
/** * 使用多个Condition实现通知部分线程 */ public class ReentrantLockExample { private Lock lock = new ReentrantLock(); private Condition conditionA = lock.newCondition(); private Condition conditionB = lock.newCondition(); public void awaitA() { lock.lock(); try { System.out.println("start awaitA at " + System.currentTimeMillis() + " ThreadName:" + Thread.currentThread().getName()); conditionA.await(); System.out.println("end awaitA at " + System.currentTimeMillis() + " ThreadName:" + Thread.currentThread().getName()); } catch (InterruptedException e) { e.printStackTrace(); } finally { lock.unlock(); } } public void awaitB() { lock.lock(); try { System.out.println("start awaitB at " + System.currentTimeMillis() + " ThreadName:" + Thread.currentThread().getName()); conditionB.await(); System.out.println("end awaitB at " + System.currentTimeMillis() + " ThreadName:" + Thread.currentThread().getName()); } catch (InterruptedException e) { e.printStackTrace(); } finally { lock.unlock(); } } public void signalAll_A() { lock.lock(); try { System.out.println("signalAll_A at " + System.currentTimeMillis() + " ThreadName:" + Thread.currentThread().getName()); conditionA.signalAll(); } finally { lock.unlock(); } } public void signalAll_B() { lock.lock(); try { System.out.println("signalAll_B at " + System.currentTimeMillis() + " ThreadName:" + Thread.currentThread().getName()); conditionB.signalAll(); } finally { lock.unlock(); } } public static void main(String[] args) { ReentrantLockExample example = new ReentrantLockExample(); Thread a = new Thread(() -> { example.awaitA(); }); a.setName("A"); a.start(); Thread b = new Thread(() -> { example.awaitB(); }); b.setName("B"); b.start(); example.signalAll_A(); /** * print: 从输出结果可以看出只有线程A被唤醒了 * * start awaitA at 1596380331589 ThreadName:A * start awaitB at 1596380331590 ThreadName:B * signalAll_A at 1596380331590 ThreadName:main * end awaitA at 1596380331590 ThreadName:A */ } } //
在以上例子中,想要实现的效果是使用多个 Condition 实现通知部分线程,也就是将唤醒粒度
变小。
比如说,我现在有 10 个线程,定义了 5 个 condition 对象,每个 condition 上都注册两个线程, 假设某种情况下,10 线程都通过 await 阻塞了,这个时候假如 conditionA 的两个线程可以被唤醒处理其业务了(有可用资源),这个时候我可以做到只唤醒 conditionA 上两个线程,其他线程仍然在阻塞状态;这样就做到了精准唤醒
了。
来看看 Condition 的实现原理:
4.1 newCondition
创建 Condition 对象,这里的 ConditionObject 是 AQS 的内部类
final ConditionObject newCondition() { return new ConditionObject(); }
4.2 await
AbstractQueuedSynchronizer#ConditionObject.await()
也就是我们例子中通过 conditionA.await() 进入阻塞
状态,等待其他线程调用 signalAll 或者 signal唤醒
public final void await() throws InterruptedException { // 如果当前线程已经被中断了,就响应中断(也就是抛出异常) if (Thread.interrupted()) throw new InterruptedException(); // 将当前线程包装成Node放入条件等待队列的队尾 Node node = addConditionWaiter(); // 同时还有释放当前线程已获取的资源 int savedState = fullyRelease(node); int interruptMode = 0; // 如果当前节点不在同步队列里,那就将线程挂起 while (!isOnSyncQueue(node)) { LockSupport.park(this); if ((interruptMode = checkInterruptWhileWaiting(node)) != 0) break; } // 说明当前节点已经从条件队列移到了同步队列中(也就是从await状态被signal唤醒之后,可以尝试获取锁资源进行后续操作了) // 从上面被挂起的地方被唤醒之后,尝试去获取锁资源,如果获取失败,那就会进入等待队列中 if (acquireQueued(node, savedState) && interruptMode != THROW_IE) interruptMode = REINTERRUPT; if (node.nextWaiter != null) // clean up if cancelled unlinkCancelledWaiters(); // 记录中断信息 if (interruptMode != 0) reportInterruptAfterWait(interruptMode); } //
主要操作
将当前线程包装成 node 节点之后放入条件队列
释放当前线程占用的资源
挂起当前线程
当线程被 signal 或者 signalAll 唤醒之后,从条件队列移到同步队列,并尝试获取锁资源
4.2.1 addConditionWaiter
AbstractQueuedSynchronizer.ConditionObject#addConditionWaiter
将当前线程包装成 Node 放入条件等待队列
的队尾
/** * Adds a new waiter to wait queue. * @return its new wait node */ private Node addConditionWaiter() { Node t = lastWaiter; // If lastWaiter is cancelled, clean out. if (t != null && t.waitStatus != Node.CONDITION) { // 从条件等待队列中移除取消的节点 unlinkCancelledWaiters(); t = lastWaiter; } Node node = new Node(Thread.currentThread(), Node.CONDITION); if (t == null) firstWaiter = node; else t.nextWaiter = node; lastWaiter = node; return node; } //
4.2.2 unlinkCancelledWaiters
AbstractQueuedSynchronizer.ConditionObject#unlinkCancelledWaiters
从条件等待队列中移除
被取消的节点
private void unlinkCancelledWaiters() { Node t = firstWaiter; Node trail = null; while (t != null) { Node next = t.nextWaiter; if (t.waitStatus != Node.CONDITION) { t.nextWaiter = null; if (trail == null) firstWaiter = next; else trail.nextWaiter = next; if (next == null) lastWaiter = trail; } else trail = t; t = next; } } //
4.2.3 fullyRelease 释放资源
AbstractQueuedSynchronizer#fullyRelease
final int fullyRelease(Node node) { boolean failed = true; try { // 拿到当前线程锁占用的资源,然后释放 int savedState = getState(); if (release(savedState)) { failed = false; return savedState; } else { throw new IllegalMonitorStateException(); } } finally { if (failed) node.waitStatus = Node.CANCELLED; } }//
4.2.4 isOnSyncQueue
AbstractQueuedSynchronizer#isOnSyncQueue
判断当前节点是否在同步器队列中,如果是的话说明当前节点正在等待获取资源
abstract static class Sync extends AbstractQueuedSynchronizer { private static final long serialVersionUID = -5179523762034025860L; abstract void lock(); // 非公平锁,tryAcquire方法由子类实现 final boolean nonfairTryAcquire(int acquires) { final Thread current = Thread.currentThread(); // 拿到当前锁对象的重入次数 int c = getState(); // 如果等于0说明该锁对象没有被任何对象持有 // 这个时候等待队列可能是有等待节点的,只是恰好锁资源在此刻被释放了 if (c == 0) { // 这里尝试去抢这把锁 if (compareAndSetState(0, acquires)) { setExclusiveOwnerThread(current); return true; } } // 如果此锁被当前对象持有,也就是重入操作,累加state else if (current == getExclusiveOwnerThread()) { int nextc = c + acquires; if (nextc < 0) // overflow throw new Error("Maximum lock count exceeded"); setState(nextc); return true; } return false; } // 尝试释放锁资源 protected final boolean tryRelease(int releases) { int c = getState() - releases; // 检查是不是当前线程获得了锁 if (Thread.currentThread() != getExclusiveOwnerThread()) throw new IllegalMonitorStateException(); boolean free = false; // 如果重入次数等于0了,说明完全释放了这把锁,其他线程可以获取这把锁了 if (c == 0) { free = true; setExclusiveOwnerThread(null); } setState(c); return free; } protected final boolean isHeldExclusively() { // 判是否独占模式 return getExclusiveOwnerThread() == Thread.currentThread(); } // 创建Condition对象 final ConditionObject newCondition() { return new ConditionObject(); } final Thread getOwner() { return getState() == 0 ? null : getExclusiveOwnerThread(); } // 重入次数 final int getHoldCount() { return isHeldExclusively() ? getState() : 0; } // state=0表示此锁未被占用 final boolean isLocked() { return getState() != 0; } } //0
4.3 signal 唤醒在条件等待队列中的节点
AbstractQueuedSynchronizer.ConditionObject#signal
abstract static class Sync extends AbstractQueuedSynchronizer { private static final long serialVersionUID = -5179523762034025860L; abstract void lock(); // 非公平锁,tryAcquire方法由子类实现 final boolean nonfairTryAcquire(int acquires) { final Thread current = Thread.currentThread(); // 拿到当前锁对象的重入次数 int c = getState(); // 如果等于0说明该锁对象没有被任何对象持有 // 这个时候等待队列可能是有等待节点的,只是恰好锁资源在此刻被释放了 if (c == 0) { // 这里尝试去抢这把锁 if (compareAndSetState(0, acquires)) { setExclusiveOwnerThread(current); return true; } } // 如果此锁被当前对象持有,也就是重入操作,累加state else if (current == getExclusiveOwnerThread()) { int nextc = c + acquires; if (nextc < 0) // overflow throw new Error("Maximum lock count exceeded"); setState(nextc); return true; } return false; } // 尝试释放锁资源 protected final boolean tryRelease(int releases) { int c = getState() - releases; // 检查是不是当前线程获得了锁 if (Thread.currentThread() != getExclusiveOwnerThread()) throw new IllegalMonitorStateException(); boolean free = false; // 如果重入次数等于0了,说明完全释放了这把锁,其他线程可以获取这把锁了 if (c == 0) { free = true; setExclusiveOwnerThread(null); } setState(c); return free; } protected final boolean isHeldExclusively() { // 判是否独占模式 return getExclusiveOwnerThread() == Thread.currentThread(); } // 创建Condition对象 final ConditionObject newCondition() { return new ConditionObject(); } final Thread getOwner() { return getState() == 0 ? null : getExclusiveOwnerThread(); } // 重入次数 final int getHoldCount() { return isHeldExclusively() ? getState() : 0; } // state=0表示此锁未被占用 final boolean isLocked() { return getState() != 0; } } //1
4.3.1 doSignal
AbstractQueuedSynchronizer.ConditionObject#doSignal
abstract static class Sync extends AbstractQueuedSynchronizer { private static final long serialVersionUID = -5179523762034025860L; abstract void lock(); // 非公平锁,tryAcquire方法由子类实现 final boolean nonfairTryAcquire(int acquires) { final Thread current = Thread.currentThread(); // 拿到当前锁对象的重入次数 int c = getState(); // 如果等于0说明该锁对象没有被任何对象持有 // 这个时候等待队列可能是有等待节点的,只是恰好锁资源在此刻被释放了 if (c == 0) { // 这里尝试去抢这把锁 if (compareAndSetState(0, acquires)) { setExclusiveOwnerThread(current); return true; } } // 如果此锁被当前对象持有,也就是重入操作,累加state else if (current == getExclusiveOwnerThread()) { int nextc = c + acquires; if (nextc < 0) // overflow throw new Error("Maximum lock count exceeded"); setState(nextc); return true; } return false; } // 尝试释放锁资源 protected final boolean tryRelease(int releases) { int c = getState() - releases; // 检查是不是当前线程获得了锁 if (Thread.currentThread() != getExclusiveOwnerThread()) throw new IllegalMonitorStateException(); boolean free = false; // 如果重入次数等于0了,说明完全释放了这把锁,其他线程可以获取这把锁了 if (c == 0) { free = true; setExclusiveOwnerThread(null); } setState(c); return free; } protected final boolean isHeldExclusively() { // 判是否独占模式 return getExclusiveOwnerThread() == Thread.currentThread(); } // 创建Condition对象 final ConditionObject newCondition() { return new ConditionObject(); } final Thread getOwner() { return getState() == 0 ? null : getExclusiveOwnerThread(); } // 重入次数 final int getHoldCount() { return isHeldExclusively() ? getState() : 0; } // state=0表示此锁未被占用 final boolean isLocked() { return getState() != 0; } } //2
signalAll 和 signal 原理类似,只不过一个是唤醒所有在当前 condition 上等待的节点,另一个是只唤醒一个,这里不在赘述。
相关文档:
JAVA并发基石之AQS --- 心法篇
原文:https://juejin.cn/post/7100485385595125773