• 欢迎访问开心洋葱网站,在线教程,推荐使用最新版火狐浏览器和Chrome浏览器访问本网站,欢迎加入开心洋葱 QQ群
  • 为方便开心洋葱网用户,开心洋葱官网已经开启复制功能!
  • 欢迎访问开心洋葱网站,手机也能访问哦~欢迎加入开心洋葱多维思维学习平台 QQ群
  • 如果您觉得本站非常有看点,那么赶紧使用Ctrl+D 收藏开心洋葱吧~~~~~~~~~~~~~!
  • 由于近期流量激增,小站的ECS没能经的起亲们的访问,本站依然没有盈利,如果各位看如果觉着文字不错,还请看官给小站打个赏~~~~~~~~~~~~~!

并发条件队列之Condition 精讲

其他 雪中孤狼 2731次浏览 0个评论

1. 条件队列的意义

       Condition将Object监控器方法( wait , notify和notifyAll )分解为不同的对象,从而通过与任意Lock实现结合使用,从而使每个对象具有多个等待集。 Lock替换了synchronized方法和语句的使用,而Condition替换了Object监视器方法的使用。

       条件(也称为条件队列或条件变量)为一个线程暂停执行(“等待”)直到另一线程通知某些状态条件现在可能为真提供了一种方法。 由于对该共享状态信息的访问发生在不同的线程中,因此必须对其进行保护,因此某种形式的锁与该条件相关联。 等待条件提供的关键属性是它自动释放关联的锁并挂起当前线程,就像Object.wait一样。

       Condition实例从本质上绑定到锁。 要获取特定Lock实例的Condition实例,请使用其newCondition()方法

2. 条件队列原理

2.1 条件队列结构

       条件队列是一个单向链表,在该链表中我们使用nextWaiter属性来串联链表。但是,就像在同步队列中不会使用nextWaiter属性来串联链表一样,在条件队列是中,也并不会用到prev, next属性,它们的值都为null。

队列的信息包含以下几个部分:

  1. private transient Node firstWaiter;// 队列的头部结点
  2. private transient Node lastWaiter;// 队列的尾部节点

队列中节点的信息包含以下几个部分:

  1. 当前节点的线程 thread
  2. 当前节点的状态 waitStatus
  3. 当前节点的下一个节点指针 nextWaiter

结构图:

注意:

       在条件队列中,我们只需要关注一个值即可那就是CONDITION。它表示线程处于正常的等待状态,而只要waitStatus不是CONDITION,我们就认为线程不再等待了,此时就要从条件队列中出队。

2.2 入队原理

       每创建一个Condtion对象就会对应一个Condtion队列,每一个调用了Condtion对象的await方法的线程都会被包装成Node扔进一个条件队列中

3. 条件队列与同步队列

       一般情况下,等待锁的同步队列和条件队列条件队列是相互独立的,彼此之间并没有任何关系。但是,当我们调用某个条件队列的signal方法时,会将某个或所有等待在这个条件队列中的线程唤醒,被唤醒的线程和普通线程一样需要去争锁,如果没有抢到,则同样要被加到等待锁的同步队列中去,此时节点就从条件队列中被转移到同步队列中

1. 条件队列转向同步队列图


                            注意图中标红色的线

       但是,这里尤其要注意的是,node是被 一个一个转移过去的,哪怕我们调用的是signalAll()方法也是一个一个转移过去的,而不是将整个条件队列接在同步队列的末尾。
       同时要注意的是,我们在同步队列中只使用prev、next来串联链表,而不使用nextWaiter;我们在条件队列中只使用nextWaiter来串联链表,而不使用prev、next.事实上,它们就是两个使用了同样的Node数据结构的完全独立的两种链表。
       因此,将节点从条件队列中转移到同步队列中时,我们需要断开原来的链接(nextWaiter),建立新的链接(prev, next),这某种程度上也是需要将节点一个一个地转移过去的原因之一。

2. 条件队列与同步队列的区别

       同步队列是等待锁的队列,当一个线程被包装成Node加到该队列中时,必然是没有获取到锁;当处于该队列中的节点获取到了锁,它将从该队列中移除(事实上移除操作是将获取到锁的节点设为新的dummy head,并将thread属性置为null)。

       条件队列是等待在特定条件下的队列,因为调用await方法时,必然是已经获得了lock锁,所以在进入条件队列前线程必然是已经获取了锁;在被包装成Node扔进条件队列中后,线程将释放锁,然后挂起;当处于该队列中的线程被signal方法唤醒后,由于队列中的节点在之前挂起的时候已经释放了锁,所以必须先去再次的竞争锁,因此,该节点会被添加到同步队列中。因此,条件队列在出队时,线程并不持有锁。

3. 条件队列与同步队列锁关系

条件队列:入队时已经持有了锁 -> 在队列中释放锁 -> 离开队列时没有锁 -> 转移到同步队列

同步队列:入队时没有锁 -> 在队列中争锁 -> 离开队列时获得了锁

4. 实战用法

       例如,假设我们有一个有界缓冲区,它支持put和take方法。 如果尝试在空缓冲区上进行take ,则线程将阻塞,直到有可用项为止。 如果尝试在完整的缓冲区上进行put ,则线程将阻塞,直到有可用空间为止。 我们希望继续等待put线程,并在单独的等待集中take线程,以便我们可以使用仅当缓冲区中的项目或空间可用时才通知单个线程的优化。 这可以使用两个Condition实例来实现一个典型的生产者-消费者模型。这里在同一个lock锁上,创建了两个条件队列fullCondition, notFullCondition。当队列已满,没有存储空间时,put方法在notFull条件上等待,直到队列不是满的;当队列空了,没有数据可读时,take方法在notEmpty条件上等待,直到队列不为空,而notEmpty.signal()和notFull.signal()则用来唤醒等待在这个条件上的线程。

public class BoundedQueue {
  /**
   * 生产者容器
   */
  private LinkedList<Object> buffer;
  /**
   * 容器最大值是多少
   */
  private int maxSize;
  /**
   * 锁
   */
  private Lock lock;
  /**
   * 满了
   */
  private Condition fullCondition;
  /**
   * 不满
   */
  private Condition notFullCondition;
  BoundedQueue(int maxSize) {
    this.maxSize = maxSize;
    buffer = new LinkedList<Object>();
    lock = new ReentrantLock();
    fullCondition = lock.newCondition();
    notFullCondition = lock.newCondition();
  }
  /**
   * 生产者
   *
   * @param obj
   * @throws InterruptedException
   */
  public void put(Object obj) throws InterruptedException {
    //获取锁
    lock.lock();
    try {
      while (maxSize == buffer.size()) {
        System.out.println(Thread.currentThread().getName() + "此时队列满了,添加的线程进入等待状态");
        // 队列满了,添加的线程进入等待状态
        notFullCondition.await();
      }
      buffer.add(obj);
      //通知
      fullCondition.signal();
    } finally {
      lock.unlock();
    }
  }
  /**
   * 消费者
   *
   * @return
   * @throws InterruptedException
   */
  public Object take() throws InterruptedException {
    Object obj;
    lock.lock();
    try {
      while (buffer.size() == 0) {
        System.out.println(Thread.currentThread().getName() + "此时队列空了线程进入等待状态");
        // 队列空了线程进入等待状态
        fullCondition.await();
      }
      obj = buffer.poll();
      //通知
      notFullCondition.signal();
    } finally {
      lock.unlock();
    }
    return obj;
  }
  public static void main(String[] args) {
    // 初始化最大能放2个元素的队列
    BoundedQueue boundedQueue = new BoundedQueue(2);
    for (int i = 0; i < 3; i++) {
      Thread thread = new Thread(() -> {
        try {
          boundedQueue.put("元素");
          System.out.println(Thread.currentThread().getName() + "生产了元素");
        } catch (InterruptedException e) {
          e.printStackTrace();
        }
      });
      thread.setName("线程" + i);
      thread.start();
    }
    for (int i = 0; i < 3; i++) {
      Thread thread = new Thread(() -> {
        try {
          boundedQueue.take();
          System.out.println(Thread.currentThread().getName() + "消费了元素");
        } catch (InterruptedException e) {
          e.printStackTrace();
        }
      });
      thread.setName("线程" + i);
      thread.start();
    }
    try {
      Thread.sleep(3000);
    } catch (InterruptedException e) {
      e.printStackTrace();
    }
  }
}

输出结果:
并发条件队列之Condition 精讲

5. 源码分析

Condition接口中的方法

1. await()

       实现可中断条件等待,其实我们以上案例是利用ReentrantLock来实现的生产者消费者案例,进去看源码发现其实实现该方法的是 AbstractQueuedSynchronizer 中ConditionObject实现的
       将节点添加进同步队列中,并要么立即唤醒线程,要么等待前驱节点释放锁后将自己唤醒,无论怎样,被唤醒的线程要从哪里恢复执行呢?调用了await方法的地方

中断模式interruptMode这个变量记录中断事件,该变量有三个值:

  1. 0 : 代表整个过程中一直没有中断发生。
  2. THROW_IE : 表示退出await()方法时需要抛出InterruptedException,这种模式对应于中断发生在signal之前
  3. REINTERRUPT : 表示退出await()方法时只需要再自我中断以下,这种模式对应于中断发生在signal之后。
public final void await() throws InterruptedException {
    if (Thread.interrupted())
        throw new InterruptedException();
    // 添加节点到条件队列中
    Node node = addConditionWaiter();
     // 释放当前线程所占用的锁,保存当前的锁状态
    int savedState = fullyRelease(node);
    int interruptMode = 0;
    // 如果当前队列不在同步队列中,说明刚刚被await, 还没有人调用signal方法,
    // 则直接将当前线程挂起
    while (!isOnSyncQueue(node)) {
        LockSupport.park(this); // 线程挂起的地方
         // 线程将在这里被挂起,停止运行
        // 能执行到这里说明要么是signal方法被调用了,要么是线程被中断了
        // 所以检查下线程被唤醒的原因,如果是因为中断被唤醒,则跳出while循环
        if ((interruptMode = checkInterruptWhileWaiting(node)) != 0)
            break;
    }
   // 线程将在同步队列中利用进行acquireQueued方法进行“阻塞式”争锁,
   // 抢到锁就返回,抢不到锁就继续被挂起。因此,当await()方法返回时,
   // 必然是保证了当前线程已经持有了lock锁
    if (acquireQueued(node, savedState) && interruptMode != THROW_IE)
        interruptMode = REINTERRUPT;
    if (node.nextWaiter != null) // clean up if cancelled
        unlinkCancelledWaiters();
    if (interruptMode != 0)
        reportInterruptAfterWait(interruptMode);
}

addConditionWaiter() 方法是封装一个节点将该节点放入条件队列中

private Node addConditionWaiter() {
    Node t = lastWaiter;
    // If lastWaiter is cancelled, clean out.
    // 如果尾节点被cancel了,则先遍历整个链表,清除所有被cancel的节点
    if (t != null && t.waitStatus != Node.CONDITION) {
        unlinkCancelledWaiters();
        t = lastWaiter;
    }
    // 将当前线程包装成Node扔进条件队列
    Node node = new Node(Thread.currentThread(), Node.CONDITION);
    // 如果当前节点为空值那么新创建的node节点就是第一个等待节点
    if (t == null)
        firstWaiter = node;
    // 如果当前节点不为空值那么新创建的node节点就加入到当前节点的尾部节点的下一个
    else
        t.nextWaiter = node;
    lastWaiter = node; // 尾部节点指向当前节点
    return node; // 返回新加入的节点
}

注意:

  1. 节点加入条件队列时waitStatus的值为Node.CONDTION。
  2. 如果入队时发现尾节点已经取消等待了,那么我们就不应该接在它的后面,此时需要调用unlinkCancelledWaiters来清除那些已经取消等待的线程(条件队列从头部进行遍历的,同步队列是从尾部开始遍历的)
private void unlinkCancelledWaiters() {
    // 获取队列的头节点
    Node t = firstWaiter;
    Node trail = null;
    // 当前节点不为空 
    while (t != null) {
       // 获取下一个节点
        Node next = t.nextWaiter;
        // 如果当前节点不是条件节点
        if (t.waitStatus != Node.CONDITION) {
            // 在队列中取消当前节点
            t.nextWaiter = null;
            if (trail == null)
                // 队列的头节点是当前节点的下一个节点
                firstWaiter = next;
            else
                // trail的 nextWaiter 指向当前节点t的下一个节点
                // 因为此时t节点已经被取消了
                trail.nextWaiter = next;
                // 如果t节点的下一个节点为空那么lastWaiter指向trail
            if (next == null)
                lastWaiter = trail;
        }
        else
            // 如果是条件节点 trail 指向当前节点
            trail = t;
        // 循环赋值遍历
        t = next;
    }
}

fullyRelease(node) 方法释放当前线程所占用的锁

final int fullyRelease(Node node) {
    boolean failed = true;
    try {
        int savedState = getState();
        // 如果释放成功
        if (release(savedState)) {
            failed = false;
            return savedState;
        } else {
            throw new IllegalMonitorStateException();
        }
    } finally {
        if (failed)
            // 节点的状态被设置成取消状态,从同步队列中移除
            node.waitStatus = Node.CANCELLED;
    }
}
public final boolean release(int arg) {
    // 尝试获取锁,如果获取成功,唤醒后续线程
    if (tryRelease(arg)) {
        Node h = head;
        if (h != null && h.waitStatus != 0)
            unparkSuccessor(h);
        return true;
    }
    return false;
}

线程唤醒后利用checkInterruptWhileWaiting方法检测中断模式

  1. 情况一中断发生时,线程还没有被唤醒过

       这里假设已经发生过中断,则Thread.interrupted()方法必然返回true,接下来就是用transferAfterCancelledWait进一步判断是否发生了signal。

 // 检查是否有中断,如果在发出信号之前被中断,则返回THROW_IE;
 // 在发出信号之后,则返回REINTERRUPT;如果没有被中断,则返回0。
private int checkInterruptWhileWaiting(Node node) {
    return Thread.interrupted() ?
        (transferAfterCancelledWait(node) ? THROW_IE : REINTERRUPT) :
        0;
}
final boolean transferAfterCancelledWait(Node node) {
    if (compareAndSetWaitStatus(node, Node.CONDITION, 0)) {
        enq(node);
        return true;
    }
    while (!isOnSyncQueue(node))
        Thread.yield();
    return false;
}

       只要一个节点的waitStatus还是Node.CONDITION,那就说明它还没有被signal过。
       由于现在我们分析情况1,则当前节点的waitStatus必然是Node.CONDITION,则会成功执行compareAndSetWaitStatus(node, Node.CONDITION, 0),将该节点的状态设置成0,然后调用enq(node)方法将当前节点添加进同步队列中,然后返回true。

注意: 我们此时并没有断开node的nextWaiter,所以最后一定不要忘记将这个链接断开。
       再回到transferAfterCancelledWait调用处,可知,由于transferAfterCancelledWait将返回true,现在checkInterruptWhileWaiting将返回THROW_IE,这表示我们在离开await方法时应当要抛出THROW_IE异常。

   // ....
   while (!isOnSyncQueue(node)) {
        LockSupport.park(this); // 线程挂起的地方
         // 线程将在这里被挂起,停止运行
        // 能执行到这里说明要么是signal方法被调用了,要么是线程被中断了
        // 所以检查下线程被唤醒的原因,如果是因为中断被唤醒,则跳出while循环
        if ((interruptMode = checkInterruptWhileWaiting(node)) != 0)
            break;
    }
   // 线程将在同步队列中利用进行acquireQueued方法进行“阻塞式”争锁,
   // 抢到锁就返回,抢不到锁就继续被挂起。因此,当await()方法返回时,
   // 必然是保证了当前线程已经持有了lock锁
   
   // 我们这里假设它获取到了锁了,由于我们这时
   // 的interruptMode = THROW_IE,则会跳过if语句。
    if (acquireQueued(node, savedState) && interruptMode != THROW_IE)
        interruptMode = REINTERRUPT;
    // 遍历链表了,把链表中所有没有在等待的节点都拿出去,所以这里调用
    // unlinkCancelledWaiters方法,该方法我们在前面await()第一部分的分析
    // 的时候已经讲过了,它就是简单的遍历链表,找到所有waitStatus
    // 不为CONDITION的节点,并把它们从队列中移除
    if (node.nextWaiter != null) // clean up if cancelled
        unlinkCancelledWaiters();
    // 这里我们的interruptMode=THROW_IE,说明发生了中断,
    // 则将调用reportInterruptAfterWait
    if (interruptMode != 0)
        reportInterruptAfterWait(interruptMode);
    }
 }
// 在interruptMode=THROW_IE时,就是简单的抛出了一个InterruptedException
private void reportInterruptAfterWait(int interruptMode)
    throws InterruptedException {
    if (interruptMode == THROW_IE)
        throw new InterruptedException();
    else if (interruptMode == REINTERRUPT)
        selfInterrupt();
}

       interruptMode现在为THROW_IE,则我们将执行break,跳出while循环。接下来我们将执行acquireQueued(node, savedState)进行争锁,注意,这里传入的需要获取锁的重入数量是savedState,即之前释放了多少,这里就需要再次获取多少
情况一总结:

  1. 线程因为中断,从挂起的地方被唤醒
  2. 随后,我们通过transferAfterCancelledWait确认了线程的waitStatus值为Node.CONDITION,说明并没有signal发生过
  3. 然后我们修改线程的waitStatus为0,并通过enq(node)方法将其添加到同步队列中
  4. 接下来线程将在同步队列中以阻塞的方式获取,如果获取不到锁,将会被再次挂起
  5. 线程在同步队列中获取到锁后,将调用unlinkCancelledWaiters方法将自己从条件队列中移除,该方法还会顺便移除其他取消等待的锁
  6. 最后我们通过reportInterruptAfterWait抛出了InterruptedException

因此:

       由此可以看出,一个调用了await方法挂起的线程在被中断后不会立即抛出InterruptedException,而是会被添加到同步队列中去争锁,如果争不到,还是会被挂起;

       只有争到了锁之后,该线程才得以从同步队列和条件队列中移除,最后抛出InterruptedException。

       所以说,一个调用了await方法的线程,即使被中断了,它依旧还是会被阻塞住,直到它获取到锁之后才能返回,并在返回时抛出InterruptedException。中断对它意义更多的是体现在将它从条件队列中移除,加入到同步队列中去争锁,从这个层面上看,中断和signal的效果其实很像,所不同的是,在await()方法返回后,如果是因为中断被唤醒,则await()方法需要抛出InterruptedException异常,表示是它是被非正常唤醒的(正常唤醒是指被signal唤醒)

  1. 情况二中断发生时,线程已经被唤醒过包含以下两种情况
    a. 被唤醒时,已经发生了中断,但此时线程已经被signal过了
final boolean transferAfterCancelledWait(Node node) {
// 线程A执行到这里,CAS操作将会失败
    if (compareAndSetWaitStatus(node, Node.CONDITION, 0)) { 
        enq(node);
        return true;
    }
// 由于中断发生前,线程已经被signal了,则这里只需要等待线程成功进入同步即可
    while (!isOnSyncQueue(node))
        Thread.yield();
    return false;
}

       由于transferAfterCancelledWait返回了false,则checkInterruptWhileWaiting方法将返回REINTERRUPT,这说明我们在退出该方法时只需要再次中断因为signal后条件队列加入到了同步队列中所以node.nextWaiter为空了,所以直接走到了reportInterruptAfterWait(interruptMode)方法

    if (node.nextWaiter != null) // clean up if cancelled
        unlinkCancelledWaiters();
    // 这里我们的interruptMode=THROW_IE,说明发生了中断,
    // 则将调用reportInterruptAfterWait
    if (interruptMode != 0)
        reportInterruptAfterWait(interruptMode);
    }
 }
// 在interruptMode=THROW_IE时,就是简单的抛出了一个InterruptedException
private void reportInterruptAfterWait(int interruptMode)
    throws InterruptedException {
    if (interruptMode == THROW_IE)
        throw new InterruptedException();
   // 这里并没有抛出中断异常,而只是将当前线程再中断一次。
    else if (interruptMode == REINTERRUPT)
        selfInterrupt();
}

情况二中的第一种情况总结:

  1. 线程从挂起的地方被唤醒,此时既发生过中断,又发生过signal

  2. 随后,我们通过transferAfterCancelledWait确认了线程的waitStatus值已经不为Node.CONDITION,说明signal发生于中断之前

  3. 然后,我们通过自旋的方式,等待signal方法执行完成,确保当前节点已经被成功添加到同步队列中

  4. 接下来线程将在同步队列中以阻塞的方式获取锁,如果获取不到,将会被再次挂起

  5. 最后我们通过reportInterruptAfterWait将当前线程再次中断,但是不会抛出InterruptedException

    b. 被唤醒时,并没有发生中断,但是在抢锁的过程中发生了中断

       此情况就是已经被唤醒了那么isOnSyncQueue(node)返回true,在同步队列中了就,退出了while循环。
       退出while循环后接下来还是利用acquireQueued争锁,因为前面没有发生中断,则interruptMode=0,这时,如果在争锁的过程中发生了中断,则acquireQueued将返回true,则此时interruptMode将变为REINTERRUPT。
       接下是判断node.nextWaiter != null,由于在调用signal方法时已经将节点移出了队列,所有这个条件也不成立。
       最后就是汇报中断状态了,此时interruptMode的值为REINTERRUPT,说明线程在被signal后又发生了中断,这个中断发生在抢锁的过程中,这个中断来的太晚了,因此我们只是再次自我中断一下。

情况二中的第二种情况总结:

  1. 线程被signal方法唤醒,此时并没有发生过中断
  2. 因为没有发生过中断,我们将从checkInterruptWhileWaiting处返回,此时interruptMode=0
  3. 接下来我们回到while循环中,因为signal方法保证了将节点添加到同步队列中,此时while循环条件不成立,循环退出
  4. 接下来线程将在同步队列中以阻塞的方式获取,如果获取不到锁,将会被再次挂起
  5. 线程获取到锁返回后,我们检测到在获取锁的过程中发生过中断,并且此时interruptMode=0,这时,我们将interruptMode修改为REINTERRUPT
  6. 最后我们通过reportInterruptAfterWait将当前线程再次中断,但是不会抛出InterruptedException

3.情况三一直没发生中断
       直接正常返回

await方法总结

  1. 进入await()时必须是已经持有了锁
  2. 离开await()时同样必须是已经持有了锁
  3. 调用await()会使得当前线程被封装成Node扔进条件队列,然后释放所持有的锁
  4. 释放锁后,当前线程将在条件队列中被挂起,等待signal或者中断
  5. 线程被唤醒后会将会离开条件队列进入同步队列中进行抢锁
  6. 若在线程抢到锁之前发生过中断,则根据中断发生在signal之前还是之后记录中断模式
  7. 线程在抢到锁后进行善后工作(离开条件队列,处理中断异常)
  8. 线程已经持有了锁,从await()方法返回

           在这一过程中我们尤其要关注中断,如前面所说,中断和signal所起到的作用都是将线程从条件队列中移除,加入到同步队列中去争锁,所不同的是,signal方法被认为是正常唤醒线程,中断方法被认为是非正常唤醒线程,如果中断发生在signal之前,则我们在最终返回时,应当抛出InterruptedException;如果中断发生在signal之后,我们就认为线程本身已经被正常唤醒了,这个中断来的太晚了,我们直接忽略它,并在await()返回时再自我中断一下,这种做法相当于将中断推迟至await()返回时再发生。

2. awaitUninterruptibly

public final void awaitUninterruptibly() {
    Node node = addConditionWaiter();
    int savedState = fullyRelease(node);
    boolean interrupted = false;
    while (!isOnSyncQueue(node)) {
        LockSupport.park(this);
        if (Thread.interrupted())
       //  发生了中断后线程依旧留在了条件队列中,将会再次被挂起
            interrupted = true;
    }
    if (acquireQueued(node, savedState) || interrupted)
        selfInterrupt();
}

       由此可见,awaitUninterruptibly()全程忽略中断,即使是当前线程因为中断被唤醒,该方法也只是简单的记录中断状态,然后再次被挂起(因为并没有并没有任何操作将它添加到同步队列中)要使当前线程离开条件队列去争锁,则必须是发生了signal事件。
       最后,当线程在获取锁的过程中发生了中断,该方法也是不响应,只是在最终获取到锁返回时,再自我中断一下。可以看出,该方法和“中断发生于signal之后的”REINTERRUPT模式的await()方法很像

方法总结

  1. 中断虽然会唤醒线程,但是不会导致线程离开条件队列,如果线程只是因为中断而被唤醒,则他将再次被挂起
  2. 只有signal方法会使得线程离开条件队列
  3. 调用该方法时或者调用过程中如果发生了中断,仅仅会在该方法结束时再自我中断以下,不会抛出InterruptedException

3. awaitNanos

       该方法几乎和await()方法一样,只是多了超时时间的处理该方法的主要设计思想是,如果设定的超时时间还没到,我们就将线程挂起;超过等待的时间了,我们就将线程从条件队列转移到同步对列中。

public final long awaitNanos(long nanosTimeout)
        throws InterruptedException {
    if (Thread.interrupted())
        throw new InterruptedException();
    Node node = addConditionWaiter();
    int savedState = fullyRelease(node);
    final long deadline = System.nanoTime() + nanosTimeout;
    int interruptMode = 0;
    while (!isOnSyncQueue(node)) {
        if (nanosTimeout <= 0L) {
            transferAfterCancelledWait(node);
            break;
        }
        if (nanosTimeout >= spinForTimeoutThreshold)
            LockSupport.parkNanos(this, nanosTimeout);
        if ((interruptMode = checkInterruptWhileWaiting(node)) != 0)
            break;
        nanosTimeout = deadline - System.nanoTime();
    }
    if (acquireQueued(node, savedState) && interruptMode != THROW_IE)
        interruptMode = REINTERRUPT;
    if (node.nextWaiter != null)
        unlinkCancelledWaiters();
    if (interruptMode != 0)
        reportInterruptAfterWait(interruptMode);
    return deadline - System.nanoTime();
}

4. await(long time, TimeUnit unit)

       在awaitNanos(long nanosTimeout)的基础上多了对于超时时间的时间单位的设置,但是在内部实现上还是会把时间转成纳秒去执行。
       可以看出,这两个方法主要的差别就体现在返回值上面,awaitNanos(long nanosTimeout)的返回值是剩余的超时时间,如果该值大于0,说明超时时间还没到,则说明该返回是由signal行为导致的,而await(long time, TimeUnit unit)的返回值就是transferAfterCancelledWait(node)的值,我们知道,如果调用该方法时,node还没有被signal过则返回true,node已经被signal过了,则返回false。因此当await(long time, TimeUnit unit)方法返回true,则说明在超时时间到之前就已经发生过signal了,该方法的返回是由signal方法导致的而不是超时时间。

public final boolean await(long time, TimeUnit unit)
        throws InterruptedException {
    long nanosTimeout = unit.toNanos(time);
    if (Thread.interrupted())
        throw new InterruptedException();
    Node node = addConditionWaiter();
    int savedState = fullyRelease(node);
    final long deadline = System.nanoTime() + nanosTimeout;
    boolean timedout = false;
    int interruptMode = 0;
    while (!isOnSyncQueue(node)) {
        if (nanosTimeout <= 0L) {
            timedout = transferAfterCancelledWait(node);
            break;
        }
        if (nanosTimeout >= spinForTimeoutThreshold)
            LockSupport.parkNanos(this, nanosTimeout);
        if ((interruptMode = checkInterruptWhileWaiting(node)) != 0)
            break;
        nanosTimeout = deadline - System.nanoTime();
    }
    if (acquireQueued(node, savedState) && interruptMode != THROW_IE)
        interruptMode = REINTERRUPT;
    if (node.nextWaiter != null)
        unlinkCancelledWaiters();
    if (interruptMode != 0)
        reportInterruptAfterWait(interruptMode);
    return !timedout;
}

5. awaitUntil

       awaitUntil(Date deadline)方法与上面的几种带超时的方法也基本类似,所不同的是它的超时时间是一个绝对的时间

public final boolean awaitUntil(Date deadline)
        throws InterruptedException {
    long abstime = deadline.getTime();
    if (Thread.interrupted())
        throw new InterruptedException();
    Node node = addConditionWaiter();
    int savedState = fullyRelease(node);
    boolean timedout = false;
    int interruptMode = 0;
    while (!isOnSyncQueue(node)) {
        if (System.currentTimeMillis() > abstime) {
            timedout = transferAfterCancelledWait(node);
            break;
        }
        LockSupport.parkUntil(this, abstime);
        if ((interruptMode = checkInterruptWhileWaiting(node)) != 0)
            break;
    }
    if (acquireQueued(node, savedState) && interruptMode != THROW_IE)
        interruptMode = REINTERRUPT;
    if (node.nextWaiter != null)
        unlinkCancelledWaiters();
    if (interruptMode != 0)
        reportInterruptAfterWait(interruptMode);
    return !timedout;
}

6. signal

只唤醒一个节点

public final void signal() {
 // getExclusiveOwnerThread() == Thread.currentThread(); 当前线
 // 程是不是独占线程
    if (!isHeldExclusively())
        throw new IllegalMonitorStateException();
    // 获取第一个阻塞线程节点
    Node first = firstWaiter;
    // 条件队列是否为空
    if (first != null)
        doSignal(first);
}
// 遍历整个条件队列,找到第一个没有被cancelled的节点,并将它添加到条件队列的末尾
// 如果条件队列里面已经没有节点了,则将条件队列清空
private void doSignal(Node first) {
    do {
        // 将firstWaiter指向条件队列队头的下一个节点
        if ( (firstWaiter = first.nextWaiter) == null)
            lastWaiter = null;
        // 将条件队列原来的队头从条件队列中断开,则此时该节点成为一个孤立的节点
        first.nextWaiter = null;
    } while (!transferForSignal(first) &&
             (first = firstWaiter) != null);
}

方法总结:
       调用signal()方法会从当前条件队列中取出第一个没有被cancel的节点添加到sync队列的末尾。

7. signalAll

唤醒所有的节点

public final void signalAll() {
 // getExclusiveOwnerThread() == Thread.currentThread(); 当前线
 // 程是不是独占线程
    if (!isHeldExclusively())
        throw new IllegalMonitorStateException();
    // 获取第一个阻塞线程节点
    Node first = firstWaiter;
   // 条件队列是否为空
    if (first != null)
        doSignalAll(first);
}
// 移除并转移所有节点
private void doSignalAll(Node first) {
    // 清空队列中所有数据
    lastWaiter = firstWaiter = null;
    do {
        Node next = first.nextWaiter;
        first.nextWaiter = null;
        transferForSignal(first);
        first = next;
    } while (first != null);
}
// 将条件队列中的节点一个一个的遍历到同步队列中
final boolean transferForSignal(Node node) {
  // 如果该节点在调用signal方法前已经被取消了,则直接跳过这个节点
if (!compareAndSetWaitStatus(node, Node.CONDITION, 0))
    return false;
 // 利用enq方法将该节点添加至同步队列的尾部   
    Node p = enq(node); 
    // 返回的是前驱节点,将其设置SIGNAL之后,才会挂起
    // 当前节点
    int ws = p.waitStatus;
    if (ws > 0 || !compareAndSetWaitStatus(p, ws, Node.SIGNAL))
        LockSupport.unpark(node.thread);
    return true;
}

       在transferForSignal方法中,我们先使用CAS操作将当前节点的waitStatus状态由CONDTION设为0,如果修改不成功,则说明该节点已经被CANCEL了则我们直接返回操作下一个节点;如果修改成功,则说明我们已经将该节点从等待的条件队列中成功“唤醒”了,但此时该节点对应的线程并没有真正被唤醒,它还要和其他普通线程一样去争锁,因此它将被添加到同步队列的末尾等待获取锁 。
方法总结:

  1. 将条件队列清空(只是令lastWaiter = firstWaiter = null,队列中的节点和连接关系仍然还存在)
  2. 将条件队列中的头节点取出,使之成为孤立节点(nextWaiter,prev,next属性都为null)
  3. 如果该节点处于被Cancelled了的状态,则直接跳过该节点(由于是孤立节点,则会被GC回收)
  4. 如果该节点处于正常状态,则通过enq方法将它添加到同步队列的末尾
  5. 判断是否需要将该节点唤醒(包括设置该节点的前驱节点的状态为SIGNAL),如有必要,直接唤醒该节点
  6. 重复2-5,直到整个条件队列中的节点都被处理完

6. 总结

       以上便是Condition的分析,下一篇文章将是并发容器类的分析,如有错误之处,帮忙指出及时更正,谢谢, 如果喜欢谢谢点赞加收藏加转发(转发注明出处谢谢!!!)


开心洋葱 , 版权所有丨如未注明 , 均为原创丨未经授权请勿修改 , 转载请注明并发条件队列之Condition 精讲
喜欢 (0)

您必须 登录 才能发表评论!

加载中……