技术文章 > java >  java基础 > 正文

java中SynchronousQueue的核心方法

小妮浅浅

我们之前提过SynchronousQueue入队和出队的两种方法,其实它们都依托transfer方法得以实现。相比较而言,transfer可以同步进行入队和出队的操作,是SynchronousQueue中最重要的核心方法。下面我们就transfer概念、使用场景,以及在代码中增减元素的实例带来全面介绍。

1.transfer概念

进行匹配交换数据,SynchronousQueue内部使用Transferer来交换元素。

(1) 传入元素e,是生产者(put方法),

(2) 传入null,是消费者(take方法)。

2.使用场景

(1)当调用这个方法时,如果队列是空的,或者队列中的节点和当前的线程操作类型一致(如当前操作是 put 操作,而队列中的元素也都是写线程)。这种情况下,将当前线程加入到等待队列即可。

(2)如果队列中有等待节点,而且与当前操作可以匹配(如队列中都是读操作线程,当前线程是写操作线程,反之亦然)。这种情况下,匹配等待队列的队头,出队,返回相应数据。

3.实例

// TransferStack.transfer()方法
E transfer(E e, boolean timed, long nanos) {
    SNode s = null; // constructed/reused as needed
    // 根据e是否为null决定是生产者还是消费者
    int mode = (e == null) ? REQUEST : DATA;
    // 自旋+CAS,熟悉的套路,熟悉的味道
    for (;;) {
        // 栈顶元素
        SNode h = head;
        // 栈顶没有元素,或者栈顶元素跟当前元素是一个模式的
        // 也就是都是生产者节点或者都是消费者节点
        if (h == null || h.mode == mode) {  // empty or same-mode
            // 如果有超时而且已到期
            if (timed && nanos <= 0) {      // can't wait
                // 如果头节点不为空且是取消状态
                if (h != null && h.isCancelled())
                    // 就把头节点弹出,并进入下一次循环
                    casHead(h, h.next);     // pop cancelled node
                else
                    // 否则,直接返回null(超时返回null)
                    return null;
            } else if (casHead(h, s = snode(s, e, h, mode))) {
                // 入栈成功(因为是模式相同的,所以只能入栈)
                // 调用awaitFulfill()方法自旋+阻塞当前入栈的线程并等待被匹配到
                SNode m = awaitFulfill(s, timed, nanos);
                // 如果m等于s,说明取消了,那么就把它清除掉,并返回null
                if (m == s) {               // wait was cancelled
                    clean(s);
                    // 被取消了返回null
                    return null;
                }
 
                // 到这里说明匹配到元素了
                // 因为从awaitFulfill()里面出来要不被取消了要不就匹配到了
 
                // 如果头节点不为空,并且头节点的下一个节点是s
                // 就把头节点换成s的下一个节点
                // 也就是把h和s都弹出了
                // 也就是把栈顶两个元素都弹出了
                if ((h = head) != null && h.next == s)
                    casHead(h, s.next);     // help s's fulfiller
                // 根据当前节点的模式判断返回m还是s中的值
                return (E) ((mode == REQUEST) ? m.item : s.item);
            }
        } else if (!isFulfilling(h.mode)) { // try to fulfill
            // 到这里说明头节点和当前节点模式不一样
            // 如果头节点不是正在撮合中
 
            // 如果头节点已经取消了,就把它弹出栈
            if (h.isCancelled())            // already cancelled
                casHead(h, h.next);         // pop and retry
            else if (casHead(h, s=snode(s, e, h, FULFILLING|mode))) {
                // 头节点没有在撮合中,就让当前节点先入队,再让他们尝试匹配
                // 且s成为了新的头节点,它的状态是正在撮合中
                for (;;) { // loop until matched or waiters disappear
                    SNode m = s.next;       // m is s's match
                    // 如果m为null,说明除了s节点外的节点都被其它线程先一步撮合掉了
                    // 就清空栈并跳出内部循环,到外部循环再重新入栈判断
                    if (m == null) {        // all waiters are gone
                        casHead(s, null);   // pop fulfill node
                        s = null;           // use new node next time
                        break;              // restart main loop
                    }
                    SNode mn = m.next;
                    // 如果m和s尝试撮合成功,就弹出栈顶的两个元素m和s
                    if (m.tryMatch(s)) {
                        casHead(s, mn);     // pop both s and m
                        // 返回撮合结果
                        return (E) ((mode == REQUEST) ? m.item : s.item);
                    } else                  // lost match
                        // 尝试撮合失败,说明m已经先一步被其它线程撮合了
                        // 就协助清除它
                        s.casNext(m, mn);   // help unlink
                }
            }
        } else {                            // help a fulfiller
            // 到这里说明当前节点和头节点模式不一样
            // 且头节点是正在撮合中
 
            SNode m = h.next;               // m is h's match
            if (m == null)                  // waiter is gone
                // 如果m为null,说明m已经被其它线程先一步撮合了
                casHead(h, null);           // pop fulfilling node
            else {
                SNode mn = m.next;
                // 协助匹配,如果m和s尝试撮合成功,就弹出栈顶的两个元素m和s
                if (m.tryMatch(h))          // help match
                    // 将栈顶的两个元素弹出后,再让s重新入栈
                    casHead(h, mn);         // pop both h and m
                else                        // lost match
                    // 尝试撮合失败,说明m已经先一步被其它线程撮合了
                    // 就协助清除它
                    h.casNext(m, mn);       // help unlink
            }
        }
    }
}
 
// 三个参数:需要等待的节点,是否需要超时,超时时间
SNode awaitFulfill(SNode s, boolean timed, long nanos) {
    // 到期时间
    final long deadline = timed ? System.nanoTime() + nanos : 0L;
    // 当前线程
    Thread w = Thread.currentThread();
    // 自旋次数
    int spins = (shouldSpin(s) ?
                 (timed ? maxTimedSpins : maxUntimedSpins) : 0);
    for (;;) {
        // 当前线程中断了,尝试清除s
        if (w.isInterrupted())
            s.tryCancel();
 
        // 检查s是否匹配到了元素m(有可能是其它线程的m匹配到当前线程的s)
        SNode m = s.match;
        // 如果匹配到了,直接返回m
        if (m != null)
            return m;
 
        // 如果需要超时
        if (timed) {
            // 检查超时时间如果小于0了,尝试清除s
            nanos = deadline - System.nanoTime();
            if (nanos <= 0L) {
                s.tryCancel();
                continue;
            }
        }
        if (spins > 0)
            // 如果还有自旋次数,自旋次数减一,并进入下一次自旋
            spins = shouldSpin(s) ? (spins-1) : 0;
 
        // 后面的elseif都是自旋次数没有了
        else if (s.waiter == null)
            // 如果s的waiter为null,把当前线程注入进去,并进入下一次自旋
            s.waiter = w; // establish waiter so can park next iter
        else if (!timed)
            // 如果不允许超时,直接阻塞,并等待被其它线程唤醒,唤醒后继续自旋并查看是否匹配到了元素
            LockSupport.park(this);
        else if (nanos > spinForTimeoutThreshold)
            // 如果允许超时且还有剩余时间,就阻塞相应时间
            LockSupport.parkNanos(this, nanos);
    }
}
 
    // SNode里面的方向,调用者m是s的下一个节点
    // 这时候m节点的线程应该是阻塞状态的
    boolean tryMatch(SNode s) {
        // 如果m还没有匹配者,就把s作为它的匹配者
        if (match == null &&
            UNSAFE.compareAndSwapObject(this, matchOffset, null, s)) {
            Thread w = waiter;
            if (w != null) {    // waiters need at most one unpark
                waiter = null;
                // 唤醒m中的线程,两者匹配完毕
                LockSupport.unpark(w);
            }
            // 匹配到了返回true
            return true;
        }
        // 可能其它线程先一步匹配了m,返回其是否是s
        return match == s;
}

以上就是java中SynchronousQueue的核心方法,相信已经本篇对于transfer方法的学习,在有关入队和出队的操作上就会进行的比较顺利,学会后一定要加强这方面使用方法的记忆。

免费视频教程
本文原创发布python学习网,转载请注明出处,感谢您的尊重!
相关文章
 java ConcurrentLinkedQueue的用法整理
 java ConcurrentLinkedQueue元素获取操作
 java中SynchronousQueue是什么意思
 java中SynchronousQueue的原理
 SynchronousQueue在java中的元素增减
相关视频章节
 网络爬虫
 云端部署Web应用程序视频
 Web应用框架Flask和文件模板
 Web应用程序开发概述
 继承和多态
作者信息

小妮浅浅

认证0级讲师

最近文章
java编译命令是什么517
java的jdk是什么407
java Wrapper类的使用806
推荐视频
视频教程分类