前言

本文隶属于专栏《100个问题搞定Java并发》,该专栏为笔者原创,引用请注明来源,不足和错误之处请在评论区帮忙指出,谢谢!

本专栏目录结构和参考文献请见100个问题搞定Java并发

正文

AQS 的设计初衷

Doug Lea 曾经介绍过 AQS 的设计初衷。

Doug Lea,这个不用多说,大家可以搜一搜 JDK 的源码,JDK5 的并发包就是他开发的,真 Java 大神。

从原理上,一种同步结构往往是可以利用其他的结构实现的,但是,对某种同步结构的倾向,会导致复杂、晦涩的实现逻辑,所以,他选择了将基础的同步相关操作抽象在 AbstractQueuedSynchronizer 中,利用 AQS 为我们构建同步结构提供了范本。

AQS 源码(JDK8)

4 大核心

state

一个 volatile 的整数成员表征状态,同时提供了 setState 和 getState 方法

    /**
     * The synchronization state.
     */
    private volatile int state;

队列

一个先入先出(FIFO)的等待线程队列,以实现多线程间竞争和等待,这是 AQS 机制的核心之一。


    /**
     * 等待队列的头,延迟初始化。
     * 除初始化外,它仅通过方法setHead进行修改。
     * 注意:如果head存在,则保证其 waitStatus 不是 CANCELLED
     */
    private transient volatile Node head;

    /**
     * 等待队列的尾部,延迟初始化。
     * 仅通过方法enq修改以添加新的等待节点。
     */
    private transient volatile Node tail;

从源码可以看出来,AQS 通过双链表来实现 FIFO 队列

CAS

各种基于 CAS 的基础操作方法


    /**
     * 如果当前状态值等于预期值,则自动将同步状态设置为给定的更新值。
     * 此操作具有易失性读写的内存语义。 
     * 参数: expect–期望值 
     * 			update–新值 
     * 返回: 如果成功,则为true。假返回表示实际值不等于预期值。
     */
    protected final boolean compareAndSetState(int expect, int update) {
        // See below for intrinsics setup to support this
        return unsafe.compareAndSwapInt(this, stateOffset, expect, update);
    }

    /**
     * CAS head field. Used only by enq.
     */
    private final boolean compareAndSetHead(Node update) {
        return unsafe.compareAndSwapObject(this, headOffset, null, update);
    }

    /**
     * CAS tail field. Used only by enq.
     */
    private final boolean compareAndSetTail(Node expect, Node update) {
        return unsafe.compareAndSwapObject(this, tailOffset, expect, update);
    }

    /**
     * CAS waitStatus field of a node.
     */
    private static final boolean compareAndSetWaitStatus(Node node,
                                                         int expect,
                                                         int update) {
        return unsafe.compareAndSwapInt(node, waitStatusOffset,
                                        expect, update);
    }

    /**
     * CAS next field of a node.
     */
    private static final boolean compareAndSetNext(Node node,
                                                   Node expect,
                                                   Node update) {
        return unsafe.compareAndSwapObject(node, nextOffset, expect, update);
    }

从源码中可以看出来,CAS 操作的对象是 state 或 双链表Node节点。

acquire/release

各种期望具体同步结构去实现的 acquire/release 方法


    /**
     * 以独占模式获取,忽略中断。
     * 通过调用至少一次tryAcquire并在成功时返回来实现。否则线程将			  
     * 排队,可能会重复阻塞和取消阻塞,调用tryAcquire直到成功。
     * 此方法可用于实现Lock.Lock方法。
     *  参数: arg–acquire参数。此值会传递给tryAcquire,但在其他方面不会被解释,并且可以
     * 表示您喜欢的任何内容。
     */
    public final void acquire(int arg) {
        if (!tryAcquire(arg) &&
            acquireQueued(addWaiter(Node.EXCLUSIVE), arg))
            selfInterrupt();
    }

    /**
     * 以独占模式发布。如果tryRelease返回true,则通过取消阻止一个或多个线程来实现。此方法可
     * 用于实现方法Lock.unlock。 
     * 参数: arg–释放参数。该值会传递给tryRelease,但在其他方面并不奇怪,可以表示您喜欢的
     * 任何内容。 
     * 返回: 从tryRelease返回的值
     */
    public final boolean release(int arg) {
        if (tryRelease(arg)) {
            Node h = head;
            if (h != null && h.waitStatus != 0)
                unparkSuccessor(h);
            return true;
        }
        return false;
    }

利用 AQS 实现一个同步结构,至少要实现两个基本类型的方法,分别是 acquire 操作,获取资源的独占权;还有就是 release 操作,释放对某个资源的独占

ReentrantLock

以 ReentrantLock 为例,它内部通过扩展 AQS 实现了 Sync 类型,以 AQS 的 state 来反映锁的持有情况。

    /**
     * 此锁是同步控制的基础。分为下面的公平和非公平版本。
     * 使用AQS的 state 表示锁上的保留数。
     */
    abstract static class Sync extends AbstractQueuedSynchronizer {...}

下面是 ReentrantLock 对应 acquire 和 release 操作。

lock


    /**
     * 获得锁。 
     * 如果锁未被另一个线程持有,则获取锁并立即返回,将锁持有计数设置为1。 
     * 如果当前线程已经持有锁,那么持有计数将增加1,并且方法立即返回。 
     * 如果锁由另一个线程持有,则当前线程出于线程调度目的将被禁用,并处于休眠状态,直
     * 到获得锁为止,此时锁持有计数设置为1。 
     */
    public void lock() {
        sync.lock();
    }

ReentrantLock 的 lock 内部调用 Sync 的 lock方法


    /**
     * Performs {@link Lock#lock}. The main reason for subclassing
     * is to allow fast path for nonfair version.
     */
    abstract void lock();

Sync 是个抽象类,lock 分别由它的两个子类 公平锁:FaireSync 和 非公平锁:NonFairSync 实现

FaireSync.lock


    final void lock() {
          acquire(1);
      }

NonFairSync.lock

    /**
     * Performs lock.  Try immediate barge, backing up to normal
     * acquire on failure.
     */
    final void lock() {
        if (compareAndSetState(0, 1))
            setExclusiveOwnerThread(Thread.currentThread());
        else
            acquire(1);
    }

在 ReentrantLock 中,tryAcquire 逻辑实现在 NonfairSync 和 FairSync 中,分别提供了进一步的非公平或公平性方法,而 AQS 内部 tryAcquire 仅仅是个接近未实现的方法(直接抛异常),这是留给实现者自己定义的

FaireSync.tryAcquire


    /**
     * Fair version of tryAcquire.  Don't grant access unless
     * recursive call or no waiters or is first.
     */
    protected final boolean tryAcquire(int acquires) {
        final Thread current = Thread.currentThread();
        int c = getState();
        if (c == 0) {
            if (!hasQueuedPredecessors() &&
                compareAndSetState(0, acquires)) {
                setExclusiveOwnerThread(current);
                return true;
            }
        }
        else if (current == getExclusiveOwnerThread()) {
            int nextc = c + acquires;
            if (nextc < 0)
                throw new Error("Maximum lock count exceeded");
            setState(nextc);
            return true;
        }
        return false;
    }
}

NonFaireSync.tryAcquire

    protected final boolean tryAcquire(int acquires) {
        return nonfairTryAcquire(acquires);
    }


    /**
     * Performs non-fair tryLock.  tryAcquire is implemented in
     * subclasses, but both need nonfair try for trylock method.
     */
    final boolean nonfairTryAcquire(int acquires) {
        final Thread current = Thread.currentThread();
        int c = getState();
        if (c == 0) {
            if (compareAndSetState(0, acquires)) {
                setExclusiveOwnerThread(current);
                return true;
            }
        }
        else if (current == getExclusiveOwnerThread()) {
            int nextc = c + acquires;
            if (nextc < 0) // overflow
                throw new Error("Maximum lock count exceeded");
            setState(nextc);
            return true;
        }
        return false;
    }

非公平的 tryAcquire 内部实现了如何配合状态与 CAS 获取锁,注意,对比公平版本的 tryAcquire,它在锁无人占有时,并不检查是否有其他等待者,这里体现了非公平的语义。

acquireQueued


    /**
     * 已在队列中的线程以独占不间断模式获取。
     * 用于条件等待方法和获取。
     * 参数: node–节点 
     * 	arg–acquire参数 
     * 返回: 如果在等待时被中断,则为true 
     */
    final boolean acquireQueued(final Node node, int arg) {
        boolean failed = true;
        try {
            boolean interrupted = false;
            for (;;) {
                final Node p = node.predecessor();
                if (p == head && tryAcquire(arg)) {
                    setHead(node);
                    p.next = null; // help GC
                    failed = false;
                    return interrupted;
                }
                if (shouldParkAfterFailedAcquire(p, node) &&
                    parkAndCheckInterrupt())
                    interrupted = true;
            }
        } finally {
            if (failed)
                cancelAcquire(node);
        }
    }

如果前面的 tryAcquire 失败,代表着锁争抢失败,进入排队竞争阶段。
这里就是我们所说的,利用 FIFO 队列,实现线程间对锁的竞争的部分,算是是
AQS 的核心逻辑。
当前线程会被包装成为一个排他模式的节点(EXCLUSIVE),通过 addWaiter 方法
添加到队列中。
acquireQueued 的逻辑,简要来说,就是如果当前节点的前面是头节点,则试图获
取锁,一切顺利则成为新的头节点;
否则,有必要则等待。

到这里线程试图获取锁的过程基本展现出来了,tryAcquire 是按照特定场景需要开发者去实现的部分,而线程间竞争则是 AQS 通过 Waiter 队列与 acquireQueued 提供的,在 release 方法中,同样会对队列进行对应操作。

unlock

	/**
	 * 试图释放此锁。 
	 * 如果当前线程是此锁的保持器,则保持计数将减少。
	 * 如果保持计数现在为零,则释放锁。
	 * 如果当前线程不是此锁的持有者,则抛出IllegalMonitorStateException。 
	 * 抛出: IllegalMonitorStateException–如果当前线程未持有此锁
	 */
    public void unlock() {
        sync.release(1);
    }

 	/**
     * 以独占模式发布。
     * 如果tryRelease返回true,则通过取消阻止一个或多个线程来实现。
     * 此方法可用于实现方法Lock.unlock。 
     * 参数: arg–释放参数。该值会传递给tryRelease,但在其他方面并不奇怪,可以表示您喜欢的
     * 任何内容。 
     * 返回: 从tryRelease返回的值
     */
    public final boolean release(int arg) {
        if (tryRelease(arg)) {
            Node h = head;
            if (h != null && h.waitStatus != 0)
                unparkSuccessor(h);
            return true;
        }
        return false;
    }


	protected final boolean tryRelease(int releases) {
        int c = getState() - releases;
        if (Thread.currentThread() != getExclusiveOwnerThread())
            throw new IllegalMonitorStateException();
        boolean free = false;
        if (c == 0) {
            free = true;
            setExclusiveOwnerThread(null);
        }
        setState(c);
        return free;
    }
上一篇 下一篇