此文为读书笔记,欢迎评论,讨论问题,共同进步!
锁的底层支持
AbstractQueuedSynchronizer抽象同步队列简称AQS,它是实现同步器的基础组件,并发包中锁的底层就是使用 AQS实现的。另外,大多数开发者可能永远不会直接使用 AQS,但是知道其原理对于架构设计还是很有帮助的。下面看下AQS 的类图结构,如图所示。
head 和 tail 变量的 Node类型
由该图可以看到,AQS 是一个 FIFO(First Input First Output),即先进先出的双向队列,其内部通过节点 head和 tail 记录队首和队尾元素,队列元素的类型为 Node。
AbstractQueuedSynchronizer
// 队首元素
private transient volatile Node head;
// 队尾元素
private transient volatile Node tail;
AbstractQueuedSynchronizer 内部类 Node
- Node 中的 thread 变量用来存放进入 AQS队列里面的线程;
- Node 节点内部的 SHARED 用来标记该线程是获取共享资源时被阻塞挂起后放入 AQS 队列的
- EXCLUSIVE 用来标记线程是获取独占资源时被挂起后放入 AQS 队列的;
- waitStatus 记录当前线程等待状态
可以为 CANCELLED(线程被取消了)、 SIGNAL(线程需要被唤醒)、CONDITION(线程在条件队列里面等待)、PROPAGATE(释放共享资源时需要通知其他节点); - prev 记录当前节点的前驱节点;
- next 记录当前节点的后继节点。
static final class Node {
/** SHARED用来标记该线程是获取共享资源时被阻塞挂起后放入 AQS 队列的 */
static final Node SHARED = new Node();
/** EXCLUSIVE 用来标记线程是获取独占资源时被挂起后放入 AQS 队列的 */
static final Node EXCLUSIVE = null;
/** waitStatus value to indicate thread has cancelled */
// 线程被取消了
static final int CANCELLED = 1;
/** waitStatus value to indicate successor's thread needs unparking */
// 线程需要被唤醒
static final int SIGNAL = -1;
/** waitStatus value to indicate thread is waiting on condition */
// 线程在条件队列里面等待
static final int CONDITION = -2;
/**
* waitStatus value to indicate the next acquireShared should
* unconditionally propagate
*/
// 释放共享资源时需要通知其他节点
static final int PROPAGATE = -3;
/**
waitStatus 记录当前线程等待状态,
可以为
CANCELLED(线程被取消了)、
SIGNAL(线程需要被唤醒)、
CONDITION(线程在条件队列里面等待)、
PROPAGATE(释放共享资源时需要通知其他节点)
*/
volatile int waitStatus;
/**
prev 记录当前节点的前驱节点
*/
volatile Node prev;
/**
next 记录当前节点的后继节点
*/
volatile Node next;
// thread变量用来存放进入 AQS队列里面的线程
volatile Thread thread;
/**
* Link to next node waiting on condition, or the special
* value SHARED. Because condition queues are accessed only
* when holding in exclusive mode, we just need a simple
* linked queue to hold nodes while they are waiting on
* conditions. They are then transferred to the queue to
* re-acquire. And because conditions can only be exclusive,
* we save a field by using special value to indicate shared
* mode.
*/
Node nextWaiter;
/**
* Returns true if node is waiting in shared mode.
*/
final boolean isShared() {
return nextWaiter == SHARED;
}
/**
* Returns previous node, or throws NullPointerException if null.
* Use when predecessor cannot be null. The null check could
* be elided, but is present to help the VM.
*
* @return the predecessor of this node
*/
final Node predecessor() throws NullPointerException {
Node p = prev;
if (p == null)
throw new NullPointerException();
else
return p;
}
Node() { // Used to establish initial head or SHARED marker
}
Node(Thread thread, Node mode) { // Used by addWaiter
this.nextWaiter = mode;
this.thread = thread;
}
Node(Thread thread, int waitStatus) { // Used by Condition
this.waitStatus = waitStatus;
this.thread = thread;
}
}
状 态 信 息 state
在 AQS 中 维 持了一 个 单 一 的 状 态 信 息 state,可 以 通 过 getState、setState、 compareAndSetState 函数修改其值。
// 表示当前线程获取锁的可重入次数
private volatile int state;
/**
* Returns the current value of synchronization state.
* This operation has memory semantics of a {@code volatile} read.
* @return current state value
*/
protected final int getState() {
return state;
}
/**
* Sets the value of synchronization state.
* This operation has memory semantics of a {@code volatile} write.
* @param newState the new state value
*/
protected final void setState(int newState) {
state = newState;
}
/**
* Atomically sets synchronization state to the given updated
* value if the current state value equals the expected value.
* This operation has memory semantics of a {@code volatile} read
* and write.
*
* @param expect the expected value
* @param update the new value
* @return {@code true} if successful. False return indicates that the actual
* value was not equal to the expected value.
*/
protected final boolean compareAndSetState(int expect, int update) {
// See below for intrinsics setup to support this
return unsafe.compareAndSwapInt(this, stateOffset, expect, update);
}
-
对于ReentrantLock 的实现来说,state 可以用来表示当前线程获取锁的可重入次数;
-
对于读写锁 ReentrantReadWriteLock 来说,state 的高 16位表示读状态,也就是获取该读锁的次数,低 16 位表示获取到写锁的线程的可重入次数;
-
对于semaphore(信号量)来说,state 用来表示当前可用信号的个数;
-
对于CountDownlatch来说, state 用来表示计数器当前的值。
对于 AQS 来说,线程同步的关键是对状态值 state 进行操作。
根据 state 是否属于一个线程,操作 state 的方式分为独占方式和共享方式。
独占方式
在独占方式下获取和释放资源使用的方法为∶
- void acquire(int arg)
- void acquireInterruptibly(int arg)
- boolean release(int arg)
使用独占方式获取的资源是与具体线程绑定的,就是说如果一个线程获取到了资源,就会标记是这个线程获取到了,其他线程再尝试操作 state 获取资源时会发现当前该资源不是自己持有的,就会在获取失败后被阻塞。
比如 独占锁 ReentrantLock 的实现,当一个线程获取了ReentrantLock 的锁后,在 AQS 内部会首先使用 CAS 操作把 state 状态值从 0变为1,然后设置当前锁的持有者为当前线程,当该线程再次获取锁时发现它就是锁的持有者,则会把状态值从1变为2,也就是设置可重入次数,而当另外一个线程获取锁时发现自己并不是该锁的持有者就会被放入 AQS 阻塞队列后挂起。
在独占方式下,获取与释放资源的流程:
- 当一个线程调用 acquire(int arg) 方法获取独占资源时,会首先使用 tryAcquire 方法尝试获取资源,具体是设置状态变量 state 的值,成功则直接返回,失败则将当前线程封装为类型为 Node.EXCLUSIVE 的 Node 节点后插入到 AQS 阻塞队列的尾部,并调用 LockSupport.park(this)方法挂起自己。
publicfinal void acquire(int arg) {
if (!tryAcquire(arg) &&
acquireQueued(addWaiter(Node.EXCLUSIVE), arg))
selfInterrupt();
}
/**
* 为当前线程和给定模式创建和排队节点。
*
* @param mode Node.EXCLUSIVE for exclusive, Node.SHARED for shared
* @return the new node
*/
private Node addWaiter(Node mode) {
Node node = new Node(Thread.currentThread(), mode);
// 尝试enq的快速路径;备份到失败的完整请求
Node pred = tail;
if (pred != null) {
node.prev = pred;
if (compareAndSetTail(pred, node)) {
pred.next = node;
return node;
}
}
enq(node);
return node;
}
/**
* 以独占不间断模式获取已在队列中的线程。由条件等待方法以及获取使用。
*
* @param node the node
* @param arg the acquire argument
* @return {@code true} if interrupted while waiting
*/
final boolean acquireQueued(final Node node, int arg) {
boolean failed = true;
try {
boolean interrupted = false;
for (;;) {
final Node p = node.predecessor();
if (p == head && tryAcquire(arg)) {
setHead(node);
p.next = null; // help GC
failed = false;
return interrupted;
}
if (shouldParkAfterFailedAcquire(p, node) &&
parkAndCheckInterrupt())
interrupted = true;
}
} finally {
if (failed)
cancelAcquire(node);
}
}
private final boolean parkAndCheckInterrupt() {
LockSupport.park(this);
return Thread.interrupted();
}
static void selfInterrupt() {
Thread.currentThread().interrupt();
}
addWaiter(Node mode) 方法中 enq(node) 的调用就是将 node 节点插入到 AQS 的阻塞队列,后面详细介绍下。
- 当一个线程调用 release(int arg) 方法时会尝试使用 tryRelease 操作释放资源,这里是设置状态变量 state 的值,然后调用LockSupport.unpark(thread) 方法激活 AQS 队列里面被阻塞的一个线程 (thread)。被激活的线程则使用 tryAcquire 尝试,看当前状态变量 state的值是否能满足自己的需要,满足则该线程被激活,然后继续向下运行,否则还是会被放入 AQS 队列并被挂起。
public final boolean release(int arg) {
if (tryRelease(arg)) {
Node h = head;
if (h != null && h.waitStatus != 0)
unparkSuccessor(h);
return true;
}
return false;
}
需要注意的是
protected boolean tryAcquire(int arg) {
throw new UnsupportedOperationException();
}
protected boolean tryRelease(int arg) {
throw new UnsupportedOperationException();
}
AQS 类并没有提供可用的 tryAcquire 和 tryRelease 方法,正如 AQS是锁阻塞和同步器的基础框架一样,tryAcquire 和tryRelease 需要由具体的子类来实现。
子类在实现 tryAcquire 和 tryRelease 时需要做什么?
- 根据具体场景使用CAS算法尝试修改 state状态值,成功则返回 true,否则返回 false。
- 子类还需要定义,在调用 acquire 和 release方法时 state状态值的增减代表什么含义。
比如继承自 AQS 实现的独占锁 ReentrantLock:
- 定义当 status 为 0时表示锁空闲,为 1 时表示锁已经被占用。
- 在重写 tryAcquire 时,在内部需要使用 CAS 算法查看当前 state是否为0,如果为0则使用CAS 设置为1,并设置当前锁的持有者为当前线程,而后返回true,如果 CAS 失败则返回 false。
- 在重写 tryRelease 时,在内部需要使用 CAS 算法把当前 state 的值从1修改为 0,并设置当前锁的持有者为 null,然后返回 true,如果 CAS 失败则返回 false。
共享方式
在 共 享 方 式下 获 取 和 释 放 资 源 的 方 法 为∶
- void acquireShared(int arg)
- void acquireSharedInterruptibly(int arg)
- boolean releaseShared(int arg)
对应共享方式的资源与具体线程是不相关的,当多个线程去请求资源时通过 CAS 方式竞争获取资源,当一个线程获取到了资源后,另外一个线程再次去获取时如果当前资源还能满足它的需要,则当前线程只需要使用CAS方式进行获取即可。
比如 Semaphore 信号量,当一个线程通过 acquire() 方法获取信号量时,会首先看当前信号量个数是否满足需要,不满足则把当前线程放入阻塞队列,如果满足则通过自旋 CAS 获取信号量。
在共享方式下,获取与释放资源的流程如下 ∶
- 当线程调用 acquireShared(int arg) 获取共享资源时,会首先使用 tryAcquireShared 尝试获取资源,具体是设置状态变量 state 的值,成功则直接返回,失败则将当前线程封装为类型为 Node.SHARED 的 Node 节点后插入到 AQS 阻塞队列的尾部,并使用 LockSupport.park(this) 方法挂起自己。
public final void acquireShared(int arg) {
if (tryAcquireShared(arg) < 0)
doAcquireShared(arg);
}
/**
* 在共享不间断模式下获取
* @param arg the acquire argument
*/
private void doAcquireShared(int arg) {
final Node node = addWaiter(Node.SHARED);
boolean failed = true;
try {
boolean interrupted = false;
for (;;) {
final Node p = node.predecessor();
if (p == head) {
int r = tryAcquireShared(arg);
if (r >= 0) {
setHeadAndPropagate(node, r);
p.next = null; // help GC
if (interrupted)
selfInterrupt();
failed = false;
return;
}
}
if (shouldParkAfterFailedAcquire(p, node) &&
// LockSupport.park(this) 方法挂起自己
parkAndCheckInterrupt())
interrupted = true;
}
} finally {
if (failed)
cancelAcquire(node);
}
}
从源码中可以看到 addWaiter(Node.SHARED) 这个方法,共享模式和独占模式调用的是同一个方法,只是传入的节点的含义不同,共享模式,传入的 Node.SHARED , 独占模式传入的 Node.EXCLUSIVE,前面已经描述过这两个的含义,不知道读者有没有注意到,这里再贴一下,加深对源码的理解:
static final class Node {
/** SHARED用来标记该线程是获取共享资源时被阻塞挂起后放入 AQS 队列的 */
static final Node SHARED = new Node();
/** EXCLUSIVE 用来标记线程是获取独占资源时被挂起后放入 AQS 队列的 */
static final Node EXCLUSIVE = null;
................
}
- 当一个线程调用 releaseShared(int arg) 时会尝试使用 tryReleaseShared 操作释放资源,这里是设置状态变量 state 的值,然后使用LockSupport.unpark(thread) 激活 AQS 队列里面被阻塞的一个线程 (thread)。被激活的线程则使用 tryReleaseShared 查看当前状态变量 state 的值是否能满足自己的需要,满足则该线程被激活,然后继续向下运行,否则还是会被放入 AQS 队列并被挂起。
public final boolean releaseShared(int arg) {
if (tryReleaseShared(arg)) {
doReleaseShared();
return true;
}
return false;
}
同样需要注意的是
protected int tryAcquireShared(int arg) {
throw new UnsupportedOperationException();
}
protected boolean tryReleaseShared(int arg) {
throw new UnsupportedOperationException();
}
AQS 类并没有提供可用的 tryAcquireShared 和 tryReleaseShared方法,正如 AQS 是锁阻塞和同步器的基础框架一样,tryAcquireShared 和 tryReleaseShared需要由具体的子类来实现。子类在实现 tryAcquireShared 和 tryReleaseShared 时要根据具体场景使用 CAS 算法尝试修改state 状态值,成功则返回 true,否则返回 false。
比 如 继 承 自 AQS 实 现 的 读写锁 ReentrantReadWriteLock:
- 读 锁 在 重 写 tryAcquireShared 时,首先查看写锁是否被其他线程持有,如果是则直接返回 false,否则使用 CAS 递增 state 的高 16 位(在 ReentrantReadWriteLock 中,state 的高 16 位为获取读锁的次数)。
- 读 锁 在 重 写 tryReleaseShared时,在内部需要使用 CAS 算法把当前 state 值的高 16 位减 1,然后返回 true,如果 CAS 失败则返回 false。
AQS 队列的入队操作
入队操作∶当一个线程获取锁失败后该线程会被转换为 Node 节点,然后就会使用enq(final Node node)方法将该节点插入到 AQS 的阻塞队列。
独占以及共享模式都会调用 enq(final Node node) 方法,这里就详细介绍下入队操作的实现原理:
前面贴过源码:
// 队首元素
private transient volatile Node head;
// 队尾元素
private transient volatile Node tail;
private Node enq(final Node node) {
// 无限循环,直到满足if (compareAndSetTail(t, node)) 条件,return 返回
// 没执行之前 默认 队列头head、尾节点tail 都指向 null
for (;;) {
Node t = tail; // 节点t指向尾部节点,第一次循环时 t 为 null
if (t == null) { // 必须初始化,第一次循环
if (compareAndSetHead(new Node())) // 使用 CAS 算法设置一个new Node() 哨兵节点为头节点
tail = head; // 这时,head、tail 都指向了 哨兵节点
} else {
node.prev = t; // 新插入的 node 的前驱节点指向哨兵节点,(这里node.prev 、head、tail、t 变量都指向了哨兵节点)
if (compareAndSetTail(t, node)) { // 通过 CAS 算法设置 node 节点为尾部节点, 也就是tail变量指向 新插入的 node 节点
t.next = node; // 设置原来的尾部节点的后驱节点为 node, 也就是哨兵节点的next 指向 新插入的node 节点,到此,双向链表的插入工作就完成了。
return t; // 返回 head 节点,因为这时 t和head 变量都指向 哨兵节点
}
}
}
}
private final boolean compareAndSetHead(Node update) {
return unsafe.compareAndSwapObject(this, headOffset, null, update);
}
private final boolean compareAndSetTail(Node expect, Node update) {
return unsafe.compareAndSwapObject(this, tailOffset, expect, update);
}
看完上面代码分析,如果觉得没有明白,结合下面作者书中的图看就明白了:
其他
基于 AQS 实现的锁除了需要重写上面介绍的方法外 ,还需要重写 isHeldExclusively方法,来判断锁是被当前线程独占还是被共享。
独占方式下的 void acquire(int arg) 和 void acquireInterruptibly(int arg),与共享方式下的 void acquireShared(int arg) 和 void acquireSharedInterruptibly(int arg),这两套函数中都有一个带有 Interruptibly 关键字的函数,那么带这个关键字和不带有什么区别呢?
- 不带 Interruptibly 关键字的方法的意思是不对中断进行响应,也就是线程在调用不带 Interruptibly 关键字的方法获取资源时或者获取资源失败被挂起时,其他线程中断了该线程,那么该线程不会因为被中断而抛出异常,它还是继续获取资源或者被挂起,也就是说不对中断进行响应,忽略中断。
- 带 Interruptibly 关键字的方法要对中断进行响应,也就是线程在调用带 Interruptibly关键字的方法获取资源时或者获取资源失败被挂起时,其他线程中断了该线程,那么该线程会抛出InterruptedException 异常而返回。
条件变量的支持
不同于 Object 中 notify 和 wait,是配合 synchronized 内置锁实现线程间同步的基础设施一样,条件变量的 signal和 await 方法也是用来配合锁(使用 AOS 实现的锁)实现线程间同步的基础设施。
不同在于:
- synchronized 同时只能与一个共享变量的notify或wait方法实现同步,而 AQS 的一个锁可以对应多个条件变量。
- 在调用共享变量的 notify 和 wait 方法前必须先获取该共享变量的内置锁,同理,在调用条件变量的 signal 和 await方法前也必须先获取条件变量对应的锁。
AQS 有个内部类 ConditionObject,用来结合锁实现线程同步。ConditionObject 可以直接访问 AQS 对象内部的变量,比如 state状态值和 AQS 队列。
ConditionObject是条件变量,每个条件变量对应一个条件队列(单向链表队列),其用来存放调用条件变量的 await 方法后被阻塞的线程,如最上面的类图所示,这个条件队列的头、尾元素分别为 firstWaiter 和 lastWaiter。
下面看一个条件变量例子:
这是一段伪代码,并不具备执行的可行性,只为了分析源码需要。
ReentrantLock lock = new ReentrantLock(); // 1
Condition condition = lock.newCondition(); // 2
lock.lock(); // 3
try {
System.out.println("begin wait");
condition.await(); // 4
System.out.println("end wait");
} catch (Exception e) {
e.printStackTrace();
} finally {
lock.unlock(); // 5
}
lock.lock(); // 6
try {
System.out.println("begin signal");
condition.signal(); // 7
System.out.println("end signal");
} catch (Exception e) {
e.printStackTrace();
} finally {
lock.unlock(); // 8
}
上述代码中标注了 步骤,下面具体阐述下:
- 代码1 创建了一个独占锁ReentrantLock 对象,ReentrantLock是基于AQS 实现的锁。
- 代码2 使用创建的Lock 对象的 newCondition() 方法创建了一个 ConditionObject变量,这个变量就是Lock 锁对应的一个条件变量。需要注意的是,一个 Lock 对象可以创建多个条件变量。
- 代码3 首先获取了独占锁
- 代码4 则调用了条件变量的 await() 方法阻塞挂起了当前线程。当其他线程调用条件变量的 signal方法时,被阻塞的线程才会从 await处返回。需要注意的是,和调用 Object 的 wait 方法一样,如果在没有获取到锁前调用了条件变量的 await 方法则会抛出 java.lang.IlegalMonitorStateException异常。
- 代码5 则释放了获取的锁。
在上面代码中,lock.newCondition() 的作用其实是 new 了一个在 AOS 内部声明的ConditionObject 对象,AQS 只提供了ConditionObject 的实现,并没有提供 newCondition 函数,该函数用来 new一个ConditionObject对象。需要由AQS 的子类来提供 newCondition函数。
比如 ReentrantLock 类中
public Condition newCondition() {
return sync.newCondition();
}
final ConditionObject newCondition() {
return new ConditionObject();
}
ConditionObject是 AQS 的内部类,可以访问 AQS 内部的变量(例如状态变量 state)和方法。在每个条件变量内部都维护了一个条件队列,用来存放调用条件变量的 await() 方法时被阻塞的线程。注意这个条件队列和 AQS 队列不是一回事。
AQS 的内部类 ConditionObject
public class ConditionObject implements Condition, java.io.Serializable {
private static final long serialVersionUID = 1173984872572414699L;
/** 条件队列的头 */
private transient Node firstWaiter;
/** 条件队列的尾 */
private transient Node lastWaiter;
.....
当线程调用条件变量的 await() 方法时(必须先调用锁的 lock, 也就是先上锁) 方法获取锁)
- 在内部会构造一个类型为 Node.CONDITION 的 node 节点,然后将该节点插入条件队列末尾,
- 当前线程会释放获取的锁(也就是会操作锁对应的 state 变量的值),并被阻塞挂起。
- 这时候如果有其他线程调用 lock.lock() 尝试获取锁,就会有一个线程获取到锁,如果获取到锁的线程调用了条件变量的 await()方法,则该线程也会被放入条件变量的阻塞队列,然后释放获取到的锁,在 await() 方法处阻塞。
public final void await() throws InterruptedException {
if (Thread.interrupted())
throw new InterruptedException();
// 创建新 node 点,并插入到条件队列末尾
Node node = addConditionWaiter();
// 释放当前线程获取到锁
int savedState = fullyRelease(node);
int interruptMode = 0;
// 调用 park方法 阻塞拉起当前线程
while (!isOnSyncQueue(node)) {
LockSupport.park(this);
if ((interruptMode = checkInterruptWhileWaiting(node)) != 0)
break;
}
if (acquireQueued(node, savedState) && interruptMode != THROW_IE)
interruptMode = REINTERRUPT;
if (node.nextWaiter != null) // clean up if cancelled
unlinkCancelledWaiters();
if (interruptMode != 0)
reportInterruptAfterWait(interruptMode);
}
在如下代码中,当另外一个线程调用条件变量的 signal 方法时(必须先调用锁的 lock() 方法获取锁),在内部会把条件队列里面队头的一个线程节点从条件队列里面移除并放入 AQS 的阻塞队列里面,然后激活这个线程。
public final void signal() {
if (!isHeldExclusively())
throw new IllegalMonitorStateException();
Node first = firstWaiter;
if (first != null)
doSignal(first);
}
下面来看当一个线程调用条件变量的await()方法 而被阻塞后,如何将其放入条件队列。
private Node addConditionWaiter() {
Node t = lastWaiter;
// 如果 lastWaiter 被取消,清除。
// Node.CONDITION 线程在条件队列里面等待状态
if (t != null && t.waitStatus != Node.CONDITION) {
unlinkCancelledWaiters();
t = lastWaiter;
}
// 根据当前线程创建一个类型为 Node.CONDITION 的节点
Node node = new Node(Thread.currentThread(), Node.CONDITION);
// 把node 插入到队列尾部
if (t == null)
firstWaiter = node;
else
t.nextWaiter = node;
lastWaiter = node;
return node;
}
注意∶当多个线程同时调用lock.lock() 方法获取锁时,只有一个线程获取到了锁,其他线程会被转换为 Node 节点插入到lock锁对应的 AQS 阻塞队列里面,并做自旋 CAS 尝试获取锁。
如果获取到锁的线程又调用了对应的条件变量的 await()方法,则该线程会释放获取到的锁,并被转换为 Node 节点插入到条件变量对应的条件队列里面。
这时候因为调用 lock.lock() 方法被阻塞到 AQS 队列里面的一个线程会获取到被释放的锁,如果该线程也调用了条件变量的 await() 方法则该线程也会被放入条件变量的条件队列里面。
当另外一个线程调用条件变量的 signal() 或者 signalAll() 方法时,会把条件队列里面的一个或者全部 Node 节点移动到 AQS 的阻塞队列里面,等待时机获取锁。
见下图∶一个锁对应一个 AOS 阻塞队列,对应多个条件变量,每个条件变量有自己的一个条件队列。
基于 AQS 实现自定义同步器
基于 AQS 实现一个不可重入的独占锁,自定义 AQS 需要重写一系列函数,还需要定义原子变量 state 的含义。这里定义,state 为0表示目前锁没有被线程持有,state 为1表示锁已经被某一个线程持有,由于是不可重入锁,所以不需要记录持有锁的线程获取锁的次数。并且自定义的锁支持条件变量。
代码实现
如下代码是基于 AOS 实现的不可重入的独占锁。
public class MyNonReentrantLock implements Lock, java.io.Serializable {
private static final long serialVersionUID = 897066294828641749L;
private static class Sync extends AbstractQueuedSynchronizer {
private static final long serialVersionUID = -256122133286202513L;
/**
* 是否持有锁
*/
@Override
protected boolean isHeldExclusively() {
return getState() == 1;
}
/**
* 如果state 为 0 ,尝试获取锁
*/
@Override
protected boolean tryAcquire(int arg) {
assert arg == 1;
if (compareAndSetState(0, 1)) {
setExclusiveOwnerThread(Thread.currentThread());
return true;
}
return false;
}
/**
* 尝试释放锁,设置state 为 0
*/
@Override
protected boolean tryRelease(int arg) {
assert arg == 1;
if (getState() == 0) {
throw new IllegalMonitorStateException();
}
setExclusiveOwnerThread(null);
setState(0);
return true;
}
// 提供条件变量接口
Condition newCondition() {
return new ConditionObject();
}
}
/**
* 创建 Sync 来做具体的工作
*/
private final Sync sync = new Sync();
@Override
public void lock() {
sync.acquire(1);
}
@Override
public void lockInterruptibly() throws InterruptedException {
sync.acquireInterruptibly(1);
}
@Override
public boolean tryLock() {
return sync.tryAcquire(1);
}
@Override
public boolean tryLock(long time, TimeUnit unit) throws InterruptedException {
return sync.tryAcquireNanos(1, unit.toNanos(time));
}
@Override
public void unlock() {
sync.release(1);
}
@Override
public Condition newCondition() {
return sync.newCondition();
}
}
在如上代码中,NonReentrantLock 定义了一个内部类 Sync 用来实现具体的锁的操作,Sync 则继承了 AQS。由于我们实现的是独占模式的锁,所以 Sync 重写了tryAcquire、 tryRelease和 isHeldExclusively3个方法。另外,Sync 提供了newCondition 这个方法用来支持条件变量。
使用自定义锁实现生产—消费模型
下面我们使用上节自定义的锁实现一个简单的生产—消费模型,代码如下。
public class SimpleProducerConsumerModel {
final static MyNonReentrantLock lock = new MyNonReentrantLock();
final static Condition producerCondition = lock.newCondition();
final static Condition consumerCondition = lock.newCondition();
final static Queue<String> queue = new LinkedBlockingQueue<>();
final static int queueSize = 10;
public static void main(String[] args) {
// 生产者
Thread producer = new Thread(new Runnable() {
@Override
public void run() {
// 获取独占锁
lock.lock();
try {
// 队列满了,生产者线程等待
while (queue.size() == queueSize) {
producerCondition.await();
}
// 添加元素到队列
queue.add("element");
// 唤醒消费线程
consumerCondition.signalAll();
} catch (Exception e) {
e.printStackTrace();
} finally {
// 释放锁
lock.unlock();
}
}
});
// 消费者
Thread consumer = new Thread(new Runnable() {
@Override
public void run() {
// 获取独占锁
lock.lock();
try {
// 队列空,消费者线程等待
while (queue.size() == 0) {
consumerCondition.await();
}
// 消费一个元素
queue.poll();
// 唤醒生产线程
producerCondition.signalAll();
} catch (Exception e) {
e.printStackTrace();
} finally {
// 释放锁
lock.unlock();
}
}
});
// 启动线程
producer.start();
consumer.start();
}
}
如上代码首先创建了MyNonReentrantLock的一个对象 lock,然后调用 lock.newCondition创建了两个条件变量,用来进行生产者和消费者线程之间的同步。
在 main 函数里面,首先创建了producer 生产线程,在线程内部首先调用 lock.lock() 获取独占锁,然后判断当前队列是否已经满了,如果满了则调用 producerCondition.await() 阻塞挂起当前线程。需要注意的是,这里使用while 而不是 if是为了避免虚假唤醒。如果队列不满则直接向队列里面添加元素,然后调用 consumerCondition.signalAll() 唤醒所有因为消费元素而被阻塞的消费线程,最后释放获取的锁。
然后在 main 函数里面创建了consumer 线程,在线程内部首先调用 lock.lock() 获取独占锁,然后判断当前队列里面是不是有元素,如果队列为空则调用 consumerCondition.await() 阳塞挂起当前线程。需要注意的是,这里使用 while 而不是 if是为了避免虚假唤醒。如果队列不为空则直接从队列里面获取并移除元素,然后唤醒因为队列满而被阻塞的生产线程,最后释放获取的锁。