前言

本文隶属于专栏《100个问题搞定Java并发》,该专栏为笔者原创,引用请注明来源,不足和错误之处请在评论区帮忙指出,谢谢!

本专栏目录结构和参考文献请见100个问题搞定Java并发

正文

WHAT

队列 Queue 是常用的数据结构之一。

在 JDK 中提供了一个 ConcurrentLinkedQueue 类用来实现高并发的队列。

从名字可以看到,这个队列使用链表作为其数据结构。

有关 ConcurrentLinkedQueue 类的性能测试大家可以自行尝试,这里限于篇幅就不再给出性能测试的代码了。

大家只要知道 ConcurrentLinkedQueue 类应该算是在高并发环境中性能最好的队列就可以了。

源码分析

它之所以能有很好的性能,是因为其内部复杂的实现。

在这里,我更加愿意花一些篇幅来简单介绍一下 ConcurrentLinkedQueue 类的具体实现细节。

Node

作为一个链表,自然需要定义有关链表内的节点,在 ConcurrentLinkedQueue 类中,定义的节点 Node 源码如下:

	private static class Node<E> {
        volatile E item;
        volatile Node<E> next;

        /**
         * Constructs a new node.  Uses relaxed write because item can
         * only be seen after publication via casNext.
         */
        Node(E item) {
            UNSAFE.putObject(this, itemOffset, item);
        }

        boolean casItem(E cmp, E val) {
            return UNSAFE.compareAndSwapObject(this, itemOffset, cmp, val);
        }

        void lazySetNext(Node<E> val) {
            UNSAFE.putOrderedObject(this, nextOffset, val);
        }

        boolean casNext(Node<E> cmp, Node<E> val) {
            return UNSAFE.compareAndSwapObject(this, nextOffset, cmp, val);
        }

        // Unsafe mechanics

        private static final sun.misc.Unsafe UNSAFE;
        private static final long itemOffset;
        private static final long nextOffset;

        static {
            try {
                UNSAFE = sun.misc.Unsafe.getUnsafe();
                Class<?> k = Node.class;
                itemOffset = UNSAFE.objectFieldOffset
                    (k.getDeclaredField("item"));
                nextOffset = UNSAFE.objectFieldOffset
                    (k.getDeclaredField("next"));
            } catch (Exception e) {
                throw new Error(e);
            }
        }
    }

其中 item 是用来表示目标元素的。

比如,当列表中存放 String 时, item 就是 String 类型。

字段 next 表示当前 Node 的下一个元素,这样每个 Node 就能环环相扣,串在一起了。

方法 casItem 表示设置当前 Node 的 item 值。

它需要两个参数,第一个参数为期望值,第二个参数为设置目标值。

当当前值等于 cmp 期望值时,就会将目标设置为 val 。

同样 casNext 方法也是类似的,但是它用于设置 next 字段,而不是 item 字段。

head/tail


    /**
     * A node from which the first live (non-deleted) node (if any)
     * can be reached in O(1) time.
     * Invariants:
     * - all live nodes are reachable from head via succ()
     * - head != null
     * - (tmp = head).next != tmp || tmp != head
     * Non-invariants:
     * - head.item may or may not be null.
     * - it is permitted for tail to lag behind head, that is, for tail
     *   to not be reachable from head!
     */
    private transient volatile Node<E> head;

    /**
     * A node from which the last node on list (that is, the unique
     * node with node.next == null) can be reached in O(1) time.
     * Invariants:
     * - the last node is always reachable from tail via succ()
     * - tail != null
     * Non-invariants:
     * - tail.item may or may not be null.
     * - it is permitted for tail to lag behind head, that is, for tail
     *   to not be reachable from head!
     * - tail.next may or may not be self-pointing to tail.
     */
    private transient volatile Node<E> tail;

ConcurrentLinkedQueue 类内部有两个重要的字段, head 和 tail ,分别表示链表的头部和尾部,它们都是 Node 类型。

对于 head 来说,它水远不会为 null ,并且通过 head 及 succ() 后继方法一定能完整地遍历整个链表。

对于 tail 来说,它自然应该表示队列的末尾。

但 ConcurrentLinkedQueue 类的内部实现非常复杂,它允许在运行时链表处于多个不同的状态。

以 tail 为例,一般来说,我们期望 tail 总是为链表的末尾,但实际上, tail 的更新并不是及时的,可能会产生拖延现象(即每次更新可能会跳过几个元素)。

offer

下面就是在 ConcurrentLinkedQueue 类中向队列中添加元素的 offer() 方法。

 	/**
     * Inserts the specified element at the tail of this queue.
     * As the queue is unbounded, this method will never return {@code false}.
     *
     * @return {@code true} (as specified by {@link Queue#offer})
     * @throws NullPointerException if the specified element is null
     */
    public boolean offer(E e) {
        checkNotNull(e);
        final Node<E> newNode = new Node<E>(e);

        for (Node<E> t = tail, p = t;;) {
            Node<E> q = p.next;
            if (q == null) {
                // p is last node
                if (p.casNext(null, newNode)) {
                    // Successful CAS is the linearization point
                    // for e to become an element of this queue,
                    // and for newNode to become "live".
                    if (p != t) // hop two nodes at a time
                        casTail(t, newNode);  // Failure is OK.
                    return true;
                }
                // Lost CAS race to another thread; re-read next
            }
            else if (p == q)
                // We have fallen off list.  If tail is unchanged, it
                // will also be off-list, in which case we need to
                // jump to head, from which all live nodes are always
                // reachable.  Else the new tail is a better bet.
                p = (t != (t = tail)) ? t : head;
            else
                // Check for tail updates after two hops.
                p = (p != t && t != (t = tail)) ? t : q;
        }
    }

首先值得注意的是,这个方法没有任何锁操作。

线程安全完全由 CAS 操作和队列的算法来保证。

整个方法的核心是 for 循环,这个循环没有出口,直到尝试成功,这也符合 CAS 操作的流程。

当第一次加入元素时,由于队列为空,因此 p.next 为 null 。

将 p 的 next 节点赋值为 newNode ,也就是将新的元素加入队列中。 此时 p = t 成立,因此不会更新 tail 末尾。

如果 casNext() 方法成功,则程序直接返回,如果失败,则再进行一次循环尝试,直到成功。

因此,增加一个元素后, tail 并不会被更新。

当程序试图增加第 2 个元素时,由于 t 还在 head 的位置上,因此 p . next 指向实际的第一个元素,因此 q != null 表示 q 不是最后的节点。

由于往队列中增加元素需要最后一个节点的位置,因此,循环开始查找最后一个节点。

程序会获得最后一个节点,此时, p 实际上是指向链表中的第一个元素,而它的 next 为 null ,故在第 2 个循环时, p 更新自己的 next ,让它指向新加入的节点。

如果成功,由于此时 p != t 成功,则会更新 t 的所在位置,将 t 移动到链表最后。

对于 p == q 的情况, 这种情况是由于遇到了哨兵( sentinel )节点导致的。

所谓哨兵节点,就是 next 指向自己的节点。 这种节点在队列中的存在价值不大,主要表示要删除的节点,或者空节点。

当遇到哨兵节点时,由于无法通过 next 取得后续的节点,因此很可能直接返回 head ,期望通过从链表头部开始遍历,进一步找到链表末尾。

一且在执行过程中发生 tail 被其他线程修改的情况,则进行一次“打赌”,使用新的 tail 作为链表末尾(这样就避免了重新查找 tail 的开销)。

t != t

如果大家对 Java 不是特别熟悉,则可能会对类似下面的代码产生疑惑:

p = (t != (t = tail)) ? t : head;

这句代码虽然只有短短一行,但是包含的信息比较多。

首先“!=”并不是原子操作,它是可以被中断的。

也就是说,在执行“!=”时,程序会先取得 t 的值,再执行 t = tail ,并取得新的 t 的值,然后比较这两个值是否相等。

在单线程时, t !这种语句显然不会成立。

但是在并发环境中,有可能在获得左边的 t 值后,右边的 t 值被其他线程修改。

这样, t !=t 就可能成立了,这里就是这种情况。

如果在比较过程中, tail 被其他线程修改,当它再次赋值给 t 时,就会导致等式左边的 t 和右边的 t 不同。

如果两个 t 不相同,表示 tail 在中途被其他线程篡改。

这时,我们就可以用新的 tail 作为链表末尾,也就是这里等式右边的 t 。

但如果 tail 没有被修改,则返回 head ,要求从头部开始,重新查找尾部。

作为简化问题,我们考察 t != t 的字节码(注意这里假设 t 为静态整型变量)。

 11: getstatic #10 	// Field t:I
 14: getstatic #10 	// Field t:I
 17: if_icmpeq 24

可以看到,在字节码层面, t 被先后取了两次,在多线程环境下,我们自然无法保证两次对 t 的取值是相同的。

poll

下面我们来看一下哨兵节点是如何产生的

ConcurrentLinkedQueue<String> q=new ConcurrentLinkedQueue<String>();
q.add("1");
q.poll();
q.add("3");

上述代码第3行弹出队列内的元素,其执行过程如下:

	public E poll() {
        restartFromHead:
        for (;;) {
            for (Node<E> h = head, p = h, q;;) {
                E item = p.item;

                if (item != null && p.casItem(item, null)) {
                    // Successful CAS is the linearization point
                    // for item to be removed from this queue.
                    if (p != h) // hop two nodes at a time
                        updateHead(h, ((q = p.next) != null) ? q : p);
                    return item;
                }
                else if ((q = p.next) == null) {
                    updateHead(h, p);
                    return null;
                }
                else if (p == q)
                    continue restartFromHead;
                else
                    p = q;
            }
        }
    }

由于队列中只有一个元素,根据前文的描述,此时 tail 并没有更新,而是指向和 head 相同的位置。

而此时, head 本身的 item 域为 null ,其 next 为列表第一个元素。

故在第一个循环中,代码直接 将 p 赋值为 q ,而 q 就是 p . next ,也是当前列表中的第一个元素。

接着,在第 2 轮循环中, p.item 显然不为 null (为字符串 1 )。

因此,代码应该可以顺利进入第 10 行(如果 CAS 操作成功)。

进入第 10 行,也意味着 p 的 item 域被设置为 null (因为这是弹出元素,自然需要删除)。

同时,此时 p 和 h 是不相等的(因为 p 已经指向原有的第一个元素了)。

故执行了第 11 行的 updateHead 方法,其实现如下:

	/**
     * Tries to CAS head to p. If successful, repoint old head to itself
     * as sentinel for succ(), below.
     */
    final void updateHead(Node<E> h, Node<E> p) {
        if (h != p && casHead(h, p))
            h.lazySetNext(h);
    }

可以看到,在 updateHead 中将 p 作为新的链表头部(通过 casHead 实现),而原有的 head 就被设置为哨兵(通过 h.lazySetNext 方法实现)。

这样一个哨兵节点就产生了,而由于此时原有的 head 头部和 tail 实际上是同一个元素。

因此,再次用 offer 方法插入元素时,就会遇到这个 tail ,也就是哨兵。

小结

通过这些说明,大家应该可以明显感觉到,不使用锁而单纯地使用 CAS 操作会要求在应用层面保证线程安全,并处理一些可能存在的不一致问题,大大增加了程序设计和实现的难度。

它带来的好处就是可以使性能飞速提升,因此,在有些场合也是值得的。

上一篇 下一篇