AQS(上) 同步队列AQS介绍篇

AQS——锁的底层支持

AbstractQueuedSynchronizer抽象同步队列简称AQS,它是实现同步器的基础组件,并发包中锁的底层就是使用AQS实现的。另外,大多数开发者可能永远不会直接使用AQS,但是知道其原理对于架构设计还是很有帮助的。下面看下AQS的类图结构,如图所示。

image-20200420175900001.png

由该图可以看到,AQS是一个FIFO的双向队列,其内部通过节点headtail记录队首和队尾元素,队列元素的类型为Node

Node节点内部参数介绍:

  • thread变量用来存放进入AQS队列里面的线程。

  • SHARED用来标记该线程是获取资源时被阻塞挂起后放入AQS队列的。

  • EXCLUSIVE用来标记线程是获取独占资源时被挂起后放入AQS队列的。

  • waitStatus记录当前线程等待状态,可以为以下几种:

    CANCELLED(线程被取消了)

    SIGNAL(线程需要被唤醒)

    CONDITION(线程在条件队列里面等待)

    PROPAGATE(释放共享资源时需要通知其他节点)

  • prev记录当前节点的前驱节点。

  • next记录节点的后继节点。

在AQS中维持了一个单一状态的信息state,可以通过getState、setState、compareAndSetState函数修改其值。其具体哪些类中用到了我们可以看一下。

  • 对于ReentrantLock的实现来说,state可以用来表示当前线程获取锁的可重入次数。
  • 对于读写锁ReentrantReadWriteLock来说,state的高16位表示读状态,也就是获取该读锁的次数,低16位标识获取到写锁的线程的可重入次数。
  • 对于semaphore来说,state用来表示当前可用信号的个数。
  • 对于CountDownlatch来说,state用来表示计数器当前的值。

AQS有个内部类ConditionObject,用来结合锁实现线程同步。ConditionObject可以直接访问AQS对象内部的变量,比如state状态值和AQS队列。ConditionObject是条件变量,每个条件变量对应一个条件队列(单向链表队列),其用来存放调用条件变量的await方法后被阻塞的线程,如类图所示,这个条件队列的头,尾元素分别为firstWaiter和lastWaiter。

对于AQS来说,线程同步的关键是对状态值state进行操作。根据state是否属于一个线程,操作state的方式分为独占方式和共享方式。

  • 在独占方式下获取和释放资源使用的方法为:

    void acquire(int arg)

    void acquireInterruptibly(int art)

    boolean release(int arg)

  • 在共享方式下获取和释放资源的方法为:

    void acquireShared(int arg)

    void acquireSharedInterruptibly(int arg)

    boolean releaseShared(int arg)

使用独占锁方式获取的资源是与具体线程绑定的,就是说如果一个线程获取到了资源,就会标记是这个线程获取到了,其他线程再尝试操作state获取资源时会发现当前该资源不是自己持有的,就会在获取失败后被阻塞。比如独占锁ReentrantLock的实现,当一个线程获取了ReentrantLock的锁后,在AQS内部会首先使用CAS操作把state状态值从0变为1,然后设置当前锁的持有者为当前线程,当该线程再次获取锁时会发现它就是锁的持有者,则会把状态从1变为2,也就是设置可重入次数,而当另一个线程获取锁时发现不是该锁的持有者就会被放入AQS阻塞队列后挂起。

对应共享方式的资源与具体线程是不相关的,当多个线程去请求资源时通过CAS方式竞争获取资源,当一个线程获取到资源后,另外一个线程再次去获取时如果当前资源还能满足它的需要,则当前线程只需要使用CAS方式获取即可。比如Semaphore信号量,当一个线程通过acquire()方法获取信号量时,会首先看当前信号量个数是否满足需求,不满足则把当前线程放入阻塞队列,如果满足则通过自旋CAS获取信号量。

在独占方式下,获取与释放资源的流程如下:

  1. 当一个线程调用acquire(int arg)方法时,会首先使用tryAcquire方法尝试获取资源,具体是设置状态变量state的值,成功则直接返回,失败则将当前线程封装为类型Node.EXCLUSIVE的Node节点后插入到AQS阻塞队列的尾部,并调用LockSupport.park(this)方法挂起自己。

    1
    2
    3
    4
    5
    public final void acquire(int arg){
    if (!tryAcquire(arg) &&
    acquireQueued(addWaiter(Node.EXCLUSIVE), arg))
    selfInterrupt();
    }
  2. 当一个线程调用release(int arg)方法时会尝试使用tryRelease操作释放资源,这里是设置状态变量state的值,然后调用LockSupport.unpark(thread)方法激活AQS队列里面被阻塞的一个线程(thread)。被激活的线程则使用tryAcquire尝试,看当前状态变量state的值是否能满足自己的需求,满足则该线程被激活,然后继续向下运行,否则还是会被入AQS队列被挂起。

    1
    2
    3
    4
    5
    6
    7
    8
    9
    public final boolean release(int arg) {
    if (tryRelease(arg)) {
    Node h = head;
    if (h != null && h.waitStatus != 0)
    unparkSuccessor(h);
    return true;
    }
    return false;
    }

需要注意的是,AQS类并没有提供可用的tryAcquire和tryRelease方法,正如AQS是锁阻塞和同步器的基础框架一样,tryAcquire和tryRelease需要由具体的子类来实现。子类在实现tryAcquire和tryRelease时需要根据场景使用CAS算法尝试修改state状态值,成功则返回true。子类还需要定义,在调用acquire和release方法时state状态值的增减代表什么含义。

比如继承自AQS实现的独占锁ReetrantLock,定义当status为0时表示锁空闲,为1表示锁已经被占用。再重写tryAcquire时,在内部需要使用CAS算法查看当前state是否为0,如果为0则使用CAS设置为1,并设置当前锁的持有者为当前线程,而后返回true,如果CAS失败则返回false。

在共享方式下,获取与释放资源的流程如下:

  1. 当线程调用acquireShared(int arg)获取共享资源时,会首先使用 trγAcquireShared 尝试获取资源, 具体是设置状态变量 state 的值,成功则直接返回,失败则将当前线 程封装为类型为 Node.SHARED 的 Node 节点后插入到 AQS 阻塞 队列的尾部,并使用 LockSupport.park(this)方法挂起自己。

    1
    2
    3
    4
    public final void acquireShared(int arg) {
    if (tryAcquireShared(arg) < 0)
    doAcquireShared(arg);
    }
  2. 当一个线程调用 releaseShared(int arg)时会尝试使用 tryReleaseShared 操作释放资源,这里是设置状态变量state 的值,然后使用 LockSupport.unpark (thread)激活 AQS 队 列里面被阻塞的一个线程 (thread)。被激活的线程则使用 tryReleaseShared 查看当前状态变 量 state 的值是否能满足自己的需要,满足则该线程被撤活,然后继续向下运行,否则还 是会被放入 AQS 队列并被挂起。

    1
    2
    3
    4
    5
    6
    7
    public final boolean releaseShared(int arg) {
    if (tryReleaseShared(arg)) {
    doReleaseShared();
    return true;
    }
    return false;
    }

同样需要注意的是, AQS 类并没有提供可用的 t可AcquireShared 和 tryRelease Shared 方法,正如 AQS 是锁阻塞和同步器的基础框架一样,tryAcquireShared 和 tryReleaseShared 需要由具体的子类来实现。子类在实现 tryAcquireShared 和 tryReleaseShared 时要根据具体 场景使用CAS算法尝试修改 state 状态值,成功则返回 true否则返回 false。

比如继承自 AQS 实现的读写锁 ReentrantReadWriteLock 里面的读锁在重写 tryAcquireShared 时,首先查看写锁是否被其他线程持有,如果是则直接返回 false,否则使用 CAS 递增 state 的高16位 (在 ReentrantReadWriteLock 中,state 的 高 16 位为获取读锁的次数)。

比如继 承自 AQS 实现 的 读写也Ji ReentrantReadWriteLock 里面的 读锁在重写 t叩ReleaseShared 时,在内部需要使用 CAS 算法把当前 state 值的高 16 位减 1, 然后返回 true,如果 CAS 失败则返回 false。

基于 AQS 实现的锁除了 需要重写上面介绍的方法外,还需要重写 isHeldExclusively 方法,来判断锁是被当前线程独占还是被共享。

另外, 也许你会好奇,独占方式下的 void acquire(int arg)和void acquirelnterruptibly(int arg),与共享方式下的 void acquireShared(int arg)和 void acquireSharedlnterruptibly(int arg), 这两套函数中都有一个带有 Interruptibly 关键字的函数,那么带这个关键字和不带有什么 区别呢?我们来讲讲。

其实不带 Intenuptibly 关键字的方法的意思是不对中断进行响应,也就是线程在调用 不带 Interruptibly 关键字的方法获取资源时或者获取资源失败被挂起时,其他线程中断了 该线程, 那么该线程不会因为被中断而抛出异常,它还是继续获取资源或者被挂起,也就 是说不对中断进行响应,忽略中断。

而带 Interruptibly 关键字的方法要对中断进行l响应,也就是线程在调用带 Interruptibly 关键字的方法获取资源时或者获取资源失败被挂起时,其他线程中断了该线程,那么该线 程会抛出 InterruptedException 异常而返回。

最后,我们来看看如何维护 AQS 提供的队列,主要看入队操作。

入队操作: 当一个线程获取锁失败后该线程会被转换为 Node 节点,然后就会使用 enq(final Node node)方法将该节点插入到 AQS 的阻塞队列。

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
private Node enq(final Node node) {
for (;;) {
Node t = tail; // (1)
if (t == null) { // Must initialize
if (compareAndSetHead(new Node())) // (2)
tail = head;
} else {
node.prev = t; // (3)
if (compareAndSetTail(t, node)) { // (4)
t.next = node;
return t;
}
}
}
}

下面结合代码和节点图(下方图)来讲解入队的过程。 如上代码在第一次循环中, 当要在 AQS 队列尾部插入元素时, AQS 队列状态如下方图中(default)所示。 也就是队列 头、尾节点都指向 null ; 当执行代码 (1 )后节点 t 指向了尾部节点,这时候队列状态如下方图 中 (I)所示。

这时候 t 为 null ,故执行代码(2 ),使用 CAS 算法设置一个哨兵节点为头节点,如果 CAS 设置成功,则让尾部节点也指向哨兵节点,这时候队列状态如下方图中(II)所示。

到现在为止只插入了一个哨兵节点,还需要插入 node 节点,所以在第二次循环后执 行到代码 (1 ),这时候队列状态如下方图 (III)所示 ; 然后执行代码 (3 )设置 node 的 前驱节点为尾部节点,这时候队列状态如下方图 中 (IV)所示:然后通过 CAS 算法设置 node 节点为尾部节点, CAS 成功后队列状态如下方图 中 CV )所示: CAS 成功后再设置原 来的尾部节点的后驱节点为 node 这时候就完成了双向链表的插入,此时队列状态如下方图 中( VI) 所示。

image-20200423235313934.png

到这里AQS概述,就结束了,下节说条件变量的支持!

喜欢关注公众号:

qrcode

我的博客即将同步至腾讯云+社区,邀请大家一同入驻:
https://cloud.tencent.com/developer/support-plan?invite_code=33ptblcpxsis8

评论

You forgot to set the shortname for Disqus. Please set it in _config.yml.