Java并发编程(二十一)独占锁 ReentrantLock 的原理

逆流者 2021年06月10日 15次浏览

此文为读书笔记,欢迎评论,讨论问题,共同进步!


简介

ReentrantLock 是可重入的独占锁,同时只能有一个线程可以获取该锁,其他获取该锁的线程会被阻塞而被放入该锁的 AQS 阻塞队列里面。

类图:
在这里插入图片描述

从类图可以看到,ReentrantLock 是使用 AOS 来实现的,并且根据参数来决定其内部是一个公平还是非公平锁,默认是非公平锁。

// 默认非公平锁
public ReentrantLock() {
        sync = new NonfairSync();
    }

// fair 为 true 时, 为公平锁, 反之为非公平锁
public ReentrantLock(boolean fair) {
    sync = fair ? new FairSync() : new NonfairSync();
}

其中 Sync 类直接继承自 AQS,它的子类 NonfairSync 和 FairSync 分别实现了获取锁的非公平与公平策略。

在这里,AQS 的state 状态值表示线程获取该锁的可重入次数,在默认情况下,state的值为 0 表示当前锁没有被任何线程持有。当一个线程第一次获取该锁时会尝试使用 CAS设置 state 的值为1,如果 CAS 成功则当前线程获取了该锁,然后记录该锁的持有者为当前线程。在该线程没有释放锁的情况下第二次获取该锁后,状态值被设置为 2,这就是可重入次数。在该线程释放该锁时,会尝试使用 CAS 让状态值减 1,如果减1 后状态值为 0,则当前线程释放该锁。

获取锁

1. void lock() 方法

public void lock() {
    sync.lock();
}

ReentrantLock 的 lock() 委托给了sync类,根据创建 ReentrantLock 构造函数选择 sync 的实现是 NonfairSync 还是 FairSync,这个锁是一个非公平锁或者公平锁。这里先看 sync 的子类 NonfairSync 的情况,也就是非公平锁时。

// 非公平锁实现
final void lock() {
	// (1) CAS设置状态值
    if (compareAndSetState(0, 1))
        setExclusiveOwnerThread(Thread.currentThread());
    else
    	// (2) 调用 AQS acquire 方法
        acquire(1);
}
  • 代码(1)中,AQS 的状态值初始默认值为 0,第一个调用 Lock 的线程会通过 CAS 设置状态值为1,CAS 成功则表示当前线程获取到了锁,然后 setExclusiveOwnerThread 设置该锁持有者是当前线程。
  • 如果这时候有其他线程调用 lock 方法企图获取该锁,CAS 会失败,然后会调用 代码 (2) AQS的 acquire 方法。注意,传递参数为1.

继续往下看 AQS 的 acquire 的核心代码:

// AbstractQueuedSynchronizer 类
public final void acquire(int arg) {
	// (3) 调用 ReentrantLock 重写的 tryAcquire 方法
    if (!tryAcquire(arg) &&
    	// (4) tryAcquire 返回 false 会把当前线程放入 AQS 阻塞队列
        acquireQueued(addWaiter(Node.EXCLUSIVE), arg))
        selfInterrupt();
}

AQS 并没有提供可用的 tryAcquire 方法,代码(3)会调用ReentrantLock 重写的 tryAcquire方法.

接着看下非公平锁的代码:

protected final boolean tryAcquire(int acquires) {
	return nonfairTryAcquire(acquires);
}
final boolean nonfairTryAcquire(int acquires) {
    final Thread current = Thread.currentThread();
    int c = getState();
    // (5) 如果 AQS 的状态为0, 说明没有线程占用锁
    if (c == 0) {
    	// 直接采用cas 方式修改state + 1, 进行抢占锁
        if (compareAndSetState(0, acquires)) {
        	// 抢占成功后设置锁的持有者为当前线程
            setExclusiveOwnerThread(current);
            return true;
        }
    }
    // 如果当前线程是该锁的持有者
    else if (current == getExclusiveOwnerThread()) {
    	// 状态值 + 1
        int nextc = c + acquires;
        // nextc < 0 说明可重入次数溢出了。
        if (nextc < 0) // overflow
            throw new Error("Maximum lock count exceeded");
        // 因为当前线程持有锁, 所以这里可以放心set 状态值state
        setState(nextc);
        return true;
    }
    return false;
}

上面nonfairTryAcquire(int acquires) 方法, 不知道读者是否看出了非公平锁的逻辑, 这里再描述一下, 比如有两个线程 A 和 B, 线程A 调用nonfairTryAcquire(int acquires) 方法, 通过cas 抢占锁失败, 发现当前锁有其他线程占用, 返回了 false, 被放入了AQS 阻塞队列, 这时 线程B又过来抢占锁发现该锁没有被占用, 直接用cas抢占. 由此可知, 先来的不一定能先占用锁, 所以这是非公平的.

看了非公平锁的源码后, 那公平锁怎么做的呢?
公平锁的话只需要看 FairSync 重写的 tryAcquire 方法。

protected final boolean tryAcquire(int acquires) {
    final Thread current = Thread.currentThread();
    int c = getState();
    // 当前AQS状态值为 0 
    if (c == 0) {
    	// 公平性策略
        if (!hasQueuedPredecessors() &&
            compareAndSetState(0, acquires)) {
            setExclusiveOwnerThread(current);
            return true;
        }
    }
    // 如果当前线程是该锁的持有者
    else if (current == getExclusiveOwnerThread()) {
        int nextc = c + acquires;
        if (nextc < 0)
            throw new Error("Maximum lock count exceeded");
        setState(nextc);
        return true;
    }
    return false;
}

公平锁和非公平锁的tryAcquire 方法类似, 公平锁多了公平性策略, 下面看下 hasQueuedPredecessors方法具体逻辑:

public final boolean hasQueuedPredecessors() {
    Node t = tail;
    Node h = head;
    Node s;
    return h != t &&
        ((s = h.next) == null || s.thread != Thread.currentThread());
}

代码中,如果当前线程节点有前驱节点则返回 true,否则如果当前 AOS 队列为空或者当前线程节点是 AQS 的第一个节点则返回 false。

  • 如果 h==t 则说明当前队列为空,直接返回 false;
  • 如果 h!=t 并且s==null 则说明有一个元素将要作为 AQS 的第一个节点入队列(回顾前面的内容,enq 函数的第一个元素入队列是两步操作∶首先创建一个哨兵头节点,然后将第一个元素插入哨兵节点后面),那么返回 true;
  • 如果h!=t 并且s!=nul和 s.thread != Thread.currentThread() 则说明队列里面的第一个元素不是当前线程,那么返回 true。

2. void lockInterruptibly() 方法

该方法与 lock() 方法类似,它的不同在于,它对中断进行响应,就是当前线程在调用该方法时,如果其他线程调用了当前线程的 interrupt() 方法,则当前线程会抛出 InterruptedException 异常,然后返回。

public void lockInterruptibly() throws InterruptedException {
        sync.acquireInterruptibly(1);
    }		
public final void acquireInterruptibly(int arg)
        throws InterruptedException {
     // /如采当前线程被中断, 则直接抛出异常
    if (Thread.interrupted())
        throw new InterruptedException();
    // 尝试获取资源
    if (!tryAcquire(arg))
    	// 调用AQS可被中断的方法
        doAcquireInterruptibly(arg);
}

3. boolean tryLock() 方法

尝试获取锁,如果当前该锁没有被其他线程持有,则当前线程获取该锁并返回 true,否则返回 false。注意,该方法不会引起当前线程阻塞。

public boolean tryLock() {
	return sync.nonfairTryAcquire(1);
}
final boolean nonfairTryAcquire(int acquires) {
    final Thread current = Thread.currentThread();
    int c = getState();
    if (c == 0) {
        if (compareAndSetState(0, acquires)) {
            setExclusiveOwnerThread(current);
            return true;
        }
    }
    else if (current == getExclusiveOwnerThread()) {
        int nextc = c + acquires;
        if (nextc < 0) // overflow
            throw new Error("Maximum lock count exceeded");
        setState(nextc);
        return true;
    }
    return false;
}

看上面的代码可知, tryLock() 使用的是非公平策略。

4. boolean tryLock(long timeout, TimeUnit unit) 方法

尝试获取锁,与 tryLock() 的不同之处在于,它设置了超时时间,如果超时时间到没有获取到该锁则返 false

public boolean tryLock(long timeout, TimeUnit unit)
        throws InterruptedException {
    return sync.tryAcquireNanos(1, unit.toNanos(timeout));
}	

释放锁

1.void unlock() 方法

尝试释放锁,如果当前线程持有该锁,则调用该方法会让该线程对该线程持有的 AQS状态值减 1,

如果减去1后当前状态值为0,则当前线程会释放该锁,否则仅仅减1而已。

如果当前线程没有持有该锁而调用了该方法则会抛出 IegalMonitorStateException异常,代码如下。

public void unlock() {
 sync.release(1);
}
public final boolean release(int arg) {
    if (tryRelease(arg)) {
        Node h = head;
        if (h != null && h.waitStatus != 0)
            unparkSuccessor(h);
        return true;
    }
    return false;
}
protected final boolean tryRelease(int releases) {
    int c = getState() - releases;
    // 如果不是锁持有者调用unlock 则抛出异常
    if (Thread.currentThread() != getExclusiveOwnerThread())
        throw new IllegalMonitorStateException();
    boolean free = false;
    // 如果当前可重入次数为0,则清空锁持有线程
    if (c == 0) {
        free = true;
        setExclusiveOwnerThread(null);
    }
    setState(c);
    return free;
}

案例

使用 ReentrantLock来实现一个简单的线程安全的 list

public class ReentrantLockList {

    // 线程不安全的list
    private List<String> array = new ArrayList<>();

    // 独占锁
    private volatile ReentrantLock lock = new ReentrantLock();

    /**
     * 添加元素
     * @param e 元素值
     */
    public void add(String e) {
        lock.lock();
        try {
            array.add(e);
        } finally {
            lock.unlock();
        }
    }

    /**
     * 删除元素
     * @param e 元素
     */
    public void remove(String e) {
        lock.lock();
        try {
            array.remove(e);
        } finally {
            lock.unlock();
        }
    }

    /**
     * 获取数据
     * @param index 数据所在的下标
     * @return 数据
     */
    public String get(int index) {
        lock.lock();
        try {
            return array.get(index);
        } finally {
            lock.unlock();
        }
    }
    
}

在操作 array 元素前进行加锁保证同一时间只有一个线程可以对 array 数组进行修改,只能有一个线程对 array 元素进行访问。

总结

假 如 线 程 Thread1、Thread2 和 Thread3 同时 尝试 获 取 独 占锁 ReentrantLock,假设 Thread1 获取到了,则 Thread2 和 Thread3 就会被转换为 Node 节点并被放入 ReentrantLock 对应的 AQS 阻塞队列,而后被阻塞挂起。

在这里插入图片描述

假设 Thread1 获取锁后调用了对应的锁创建的条件变量1,那么 Thread1 就会释放获取到的锁,然后当前线程就会被转换为 Node 节点插入条件变量1 的条件队列。由于Thread1 释放了锁,所以阻塞到 AQS 队列里面的 Thread2 和 Thread3 就有机会获取到该锁,假如使用的是公平策略,那么这时候 Thread2 会获取到该锁,从而从 AQS 队列里面移除 Thread2 对应的 Node 节点。

在这里插入图片描述

ReentrantLock 是 独占锁,某时刻只有一个线程可以获取该锁。底层是使用 AQS 实现的可重入独占锁。在这里 AQS 状态值为 0 表示当前锁空闲,大于等于1的值则说明该锁已经被占用。该锁内部有公平与非公平实现,默认情况下是非公平的实现。