Java并发基石——所谓“阻塞”:Object Monitor和AQS(3)

九音引魂箫 1年前 ⋅ 338 阅读

====================================
(接上文《Java并发基石——所谓“阻塞”:Object Monitor和AQS(2)》)

  • 使用AQS实现的Mutex

现在我们来看看在AbstractQueuedSynchronizer官方文档中(实际上就是JDK源代码上的注释说明)给出的另一个示例性代码Mutex。这是一个使用AQS实现互斥锁的例子,完全可以用在正式环境下。以下代码片段中,英文注释信息是示例中自带的,中文信息是笔者加上的详细功能说明:

// ........省略掉import区域的信息
/** * 这是一个java AbstractQueuedSynchronizer中自带的Mutex互斥锁的实现,实际上就是基于AQS的独占机制实现。本文将详细讲解它的工作过程。 * @author Doug Lea */
class Mutex implements Lock, Serializable {
  // Our internal helper class
  private static class Sync extends AbstractQueuedSynchronizer {
    /** * Reports whether in locked state<br> * isHeldExclusively该方被重写,这个方法用于向当前线程反馈,是否AQS正处于独占操作状态下。<br> * 当AQS目前state状态为1时,就说明当前AQS队列正处于独占操作状态。 */
    protected boolean isHeldExclusively() {
      return getState() == 1;
    }
    /** * Acquires the lock if state is zero<br> * tryAcquire也在示例中被重写,当前线程试图获取AQS独占操作权时,该方法将被调用。 * */
    public boolean tryAcquire(int acquires) {
      assert acquires == 1;// Otherwise unused
      /* * 在上篇文章中已经介绍过compareAndSetState这个方法非常重要,多线程下状态的线程安全性主要也在这里体现 * ——因为在多线程抢占执行compareAndSetState这句代码时,只有一个线程能够执行成功 * 放在这个示例代码段落中,具体的工作特点是:比较当前AQS的state状态,如果为0,则设置为state状态为1,并返回true; * 其它情况下放弃对state的设定操作,并返回false。 * */
      if (compareAndSetState(0, 1)) {
        // 如果成功设置当前AQS的state状态为1,则说明当前线程获得了AQS的独占操作权
        // 那么通过setExclusiveOwnerThread方法,设定AQS的独占操作线程为当前线程
        setExclusiveOwnerThread(Thread.currentThread());
        return true;
      }
      return false;
    }
    
    /** * Releases the lock by setting state to zero * tryRelease在示例中被重写,当前线程释放AQS队列的独占操作权,该方法被调用 * */
    protected boolean tryRelease(int releases) {
      assert releases == 1; // Otherwise unused
      // 如果调用该方法时,发现state状态不为1,就抛出异常
      // 这是为什么呢?这是因为,至少有一个线程在没有获得独占操作权的情况下,就调用了释放操作权的操作。
      if (getState() == 0) {
        throw new IllegalMonitorStateException();
      }
      
      // 通过setExclusiveOwnerThread方法,设定AQS的独占操作线程为null
      // 并且设定state状态为0
      setExclusiveOwnerThread(null);
      setState(0);
      return true;
    }

    // Provides a Condition
    // 关于java中的java.util.concurrent.locks.Condition接口,请参见后文介绍
    Condition newCondition() {
      return new ConditionObject();
    }
  }

  // The sync object does all the hard work. We just forward to it.
  private final Sync sync = new Sync();

  // 该方法进行加锁处理,实际上就是调用AQS的acquire方法。这里的参数1,会出现在tryAcquire方法中
  // 一旦加锁成功(获取操作权的操作成功),其它试图加锁(试图获取操作权)但又没有成功的线程,则进入阻塞状态。
  public void lock() {
    sync.acquire(1); 
  }
  // acquireInterruptibly方法和acquire方法的区别,在上文中已经有了说明
  public void lockInterruptibly() throws InterruptedException {
    sync.acquireInterruptibly(1);
  }
  // 尝试加锁(尝试获取独占操作权),如果成功了(也就是说state因为当前线程的这个操作被置为1),则返回true;
  // 如果加锁操作没有成功,则返回false,这时当前线程还可以继续动作。
  public boolean tryLock() {
    return sync.tryAcquire(1); 
  }
  // 在一段持续的时间内,尝试加锁(尝试获取独占操作权),如果成功则返回true;
  // 如果加锁操作没有成功,则返回false,这是当前线程还可以继续动作。
  public boolean tryLock(long timeout, TimeUnit unit) throws InterruptedException {
    return sync.tryAcquireNanos(1, unit.toNanos(timeout));
  }
  // 解除独占状态,AQS将在执行过程中调用Mutex.Sync中的tryRelease方法。
  public void unlock() {
    sync.release(1); 
  }
  public Condition newCondition() {
    return sync.newCondition();
  }
  // 查询当前AQS队列是否已经被某个线程独占。
  // 实际上就是调用isHeldExclusively方法,看状态是否为1,为1就是说明被独占了
  public boolean isLocked() {
    return sync.isHeldExclusively(); 
  }
  // 查询AQS中是否有任何线程正在等待获取。当AQS中的head != tail时,说明AQS中有等待队列
  public boolean hasQueuedThreads() {
    return sync.hasQueuedThreads(); 
  }
}

示例代码中的注释信息已经非常清楚了,这里就不再赘述。

  • Mutex的运行使用效果

接下来我们看一下Mutex的运行效果,既然是基于AQS中的独占模式进行的实现,那么在某一个线程获得独占操作权的情况下,其它线程就不能获得操作权了——要么进入阻塞等待获得独占操作权,要么放弃抢占操作权继续执行当前线程的后续操作。

情况一:某个线程抢得了操作权,其它线程等待

public static void main(String[] args) {
  final Mutex mutex = new Mutex();
  for(int index = 0 ; index < 3 ; index++) {
    new Thread(() -> {
      try {
        mutex.lock();
      } finally {
        // 如果不调用unlock,那么独占状态将不会解除,其它线程将一直处于阻塞状态
        mutex.unlock();
      }
    } , "index" + index).start();
  } 
}

情况二:某个线程抢得了操作权,其它线程尝试抢占操作权一段时间后,放弃抢占操作权的操作

public static void main(String[] args) {
  final Mutex mutex = new Mutex();
  for(int index = 0 ; index < 3 ; index++) {
    new Thread(() -> {
      boolean isMutex = false;
      try {
        // 尝试获得独占操作权(10秒的时间内)
        if(mutex.tryLock(10, TimeUnit.SECONDS)) {
          isMutex = true;
          System.out.println("本线程尝试获取独占操作权成功");
        } else {
          System.out.println("本线程尝试获取独占操作权失败");
        }
      } catch(InterruptedException e) {
        e.printStackTrace(System.out);
      } finally {
        if(isMutex) {
          mutex.unlock();
        }
      }
    } , "index" + index).start();
  }
}

3.3、AQS代码分析

本届中我们主要分析AQS中几个关键方法,包括独占工作模式下的acquire()方法和release()方法,以及共享工作模式下的acquireShared()方法和releaseShared()方法。

3.3.1、AQS中的Node和Node构成的AQS队列

要理解AQS内部如何基于CAS原理和LockSupport工具完成工作的,就需要先把AQS中关于Node的关键定义搞清楚:

上图是对上文中AQS队列中,每一个Node节点结构图的细化说明。这个队列是一个双向的带头、尾标识的队列,在AQS队列初始化时,其中的Tail节点为null。在一个Node节点涉及到的独占模式工作效果中,主要包括了几个重要的部分:

static final class Node {
	volatile int waitStatus;
	volatile Node prev;
	volatile Node next;
	volatile Thread thread;
	// 。。。。。先省略,后续还有其它补充
}	
  • next:指向当前Node节点的下一个节点,如果当前节点为尾节点,则next属性为null。

  • prev:指向当前Node节点的上一个Node节点,如果当前节点为head节点,则prev属性为null。

  • thread:AQS的双向队列说白了是对进程中若干Thread集合的管理,那么Node节点就需要一个属性来标识其管理的/代表的线程对象。

  • waitStatus:节点等待状态,这个状态信息非常重要:包括这些状态标识:

    • 1:CANCELLED,指示当前节点所代表的线程由于等待超时或者interrupt中断信号的原因,已经取消了其在AQS队列中的抢占操作,处于这种标识状态下的节点不会再被本AQS队列继续阻塞;
    • -1:SIGNAL,指示当前节点的后续节点所代表的线程,已经被(或者很快将被)阻塞(通过parking模式),也就是说当前后续节点所代表的线程需要被执行unpark后,才能解除阻塞。
    • -2:CONDITION,指示当前对象所代表的线程,需要等待一个Condition对象的激活——在Condition队列中被阻塞。
    • -3:PROPAGATE,当节点处于“共享”模式下,该值指示共享操作权的释放信号还可以扩散到AQS队列中的其它节点上去。
    • 0:节点不处于以上任何状态。

这里我们只是介绍了waitStatus状态属性的字面含义,实际上这些字面含义只要查看java.util.concurrent.locks.AbstractQueuedSynchronizer.Node的源代码就能看到,最重要的还是这些状态如何在Node的工作过程中发挥作用。

3.3.2、独占模式下的acquire()的工作过程

有了以上小节的对Node中各个属性的基础描述作为铺垫,我们就可以首先开始分析AQS独占模式下的acquire()方法、release()工作过程。当我们调用AbstractQueuedSynchronizer.acquire(int)方法时,AQS做了什么工作呢?首先上代码——acquire(int)方法中的详细代码片段:

//......
public final void acquire(int arg) {
  // 请注意这个 Node.EXCLUSIVE 值,从源代码可以看到这个对象代表的就是一个null值
  if (!tryAcquire(arg) && acquireQueued(addWaiter(Node.EXCLUSIVE), arg))
    selfInterrupt();
}
//......

acquire 方法中一共执行了三个关键方法:

  • tryAcquire(int) 这个方法在上文中都已经介绍过,就是需要开发人员在使用AQS独占模式之前需要重写的方法。当这个方法返回true,就表示当前线程获得了独占操作权;从acquire方法的代码详情可以看出,如果tryAcquire(int)方法返回true,则整个方法就结束,不再进行任何处理——包括也不需要进行AQS队列的任何变动了。

  • addWaiter(Node) 方法的作用创建一个Node节点,并试图将这个节点挂载到AQS的队尾(以CAS线程安全的方式)。根据AQS的工作模式(独占或者共享)创建的节点特点是不一样的。这里我们先介绍在独占模式下创建节点的过程。

/** * Creates and enqueues node for current thread and given mode. * @param mode Node.EXCLUSIVE for exclusive, Node.SHARED for shared */
private Node addWaiter(Node mode) {
  // 根据源代码的,当独占模式的acquire方法调用addWaiter这个方法的时候,其传参的mode的值为null
  Node node = new Node(Thread.currentThread(), mode);
  // Try the fast path of enq; backup to full enq on failure
  // 最常见的this.tail为null的情况就是当前AQS队列中还没有节点
  Node pred = this.tail;
  if (pred != null) {
    node.prev = pred;
    // 这个CAS方法的含义是,判断当前AQS的尾部节点是pred,如果是则重新设置为当前node
    // 设定成功,则退出
    if (compareAndSetTail(pred, node)) {
      pred.next = node;
      return node;
    }
  }
  enq(node);
  return node;
}

这个addWaiter方法中的CAS操作只进行了一次,并不是如果没有成功就进入enq方法。enq方法实际上就是addWaiter方法的“无限循环”操作,也就是说不将新增的节点添加到AQS队列中决不罢休。代码如下:

/** * Inserts node into queue, initializing if necessary. See picture above. * 反正笔者是不知道picture在哪里 * @param node the node to insert */
private Node enq(final Node node) {
  // 一直试图添加,直到添加到了AQS队列的头部或者尾部未知
  for (;;) {
    Node t = tail;
	if (t == null) { // Must initialize
	  if (compareAndSetHead(new Node()))
	    tail = head;
	} else {
	  node.prev = t;
	  if (compareAndSetTail(t, node)) {
		t.next = node;
	    return t;
	  }
	}
  }
}
  • acquireQueued(Node , int) 通过addWaiter方法我们将一个代表线程的AQS节点进行了创建,并且加入到了AQS队列中,但是我们还没有让这个节点所代表的线程进入阻塞状态,也没有让这个节点和AQS队列的整体建立交互机制(参与独占操作权的抢占)。所以这个acquireQueued方法就是让这个新加入AQS队列的节点根据调用者的设定逻辑进入不同的阻塞状态,并且让这个节点所代表的线程参与独占权抢占。代码详情如下:
/** * Acquires in exclusive uninterruptible mode for thread already in * queue. Used by condition wait methods as well as acquire. * @return true:if interrupted while waiting */
final boolean acquireQueued(final Node node, int arg) {
  boolean failed = true;
  try {
    boolean interrupted = false;
    for (;;) {
      // 取得当前的节点的前驱节点(前置节点)
      final Node p = node.predecessor();
      // 这个if段落的代码有得一读。仔细看
      // 如果当前节点的前驱节点是头结点,那么再次试图执行tryAcquire方法。
      // 如果执行成功则当前节点为头部节点,通过if段落中的代码去掉当前的头结点信息
      if (p == head && tryAcquire(arg)) {
        setHead(node);
        p.next = null; // help GC
        failed = false;
        return interrupted;
      }
      
      // 如果当前节点前驱节点不是头部节点,或者执行tryAcquire再次失败,那么就执行到这个if块
      if (shouldParkAfterFailedAcquire(p, node) && parkAndCheckInterrupt()) 
        interrupted = true;
    }
  } finally {
    if (failed) 
      cancelAcquire(node);
  }
}

acquireQueued方法中如果当前节点的前驱节点是AQS队列的head节点,且再次试图执行tryAcquire方法又成功的特殊情况这里就不再进行赘述了,我们主要讲解后面的两个方法shouldParkAfterFailedAcquire和parkAndCheckInterrupt。

shouldParkAfterFailedAcquire方法是判断当前节点是否在获取独占操作权失败后进入阻塞状态;parkAndCheckInterrupt方法用于让当前节点所代表的线程进入阻塞状态,并监控其是否收到了“中断”信号。先来看一下shouldParkAfterFailedAcquire方法的代码详情:

// 参数为node:当前节点;pred:当前节点的前置节点
private static boolean shouldParkAfterFailedAcquire(Node pred, Node node) {
  int ws = pred.waitStatus;
  // 前置节点Node的waitStatus状态属性为SIGNAL(-1),就说明可以进行阻塞,于是返回true
  if (ws == Node.SIGNAL)
    // This node has already set status asking a release to signal it, so it can safely park.
    return true;
  // waitStatus状态属性大于0的,就只有一种叫做CANCELLED的状态
  // 文档中对于这种状态的描述也非常清楚,就是这些节点由于超时或者终止的原因不在需要进入阻塞状态。
  // 也不会再参与独占操作权的抢占操作,于是循环执行以下代码,将这些节点剔除
  if (ws > 0) {
    // Predecessor was cancelled. Skip over predecessors and indicate retry.
    do {
      node.prev = pred = pred.prev;
    } while (pred.waitStatus > 0);
    pred.next = node;
  } else {
    // waitStatus must be 0 or PROPAGATE. Indicate that we need a signal, but don't park yet.
    // Caller will need to retry to make sure it cannot acquire before parking.
    // 这个代码很明确,不再说明了
    compareAndSetWaitStatus(pred, ws, Node.SIGNAL);
  }
  return false;
}

一旦shouldParkAfterFailedAcquire方法的返回值为false,则重新进入acquireQueued方法的“for ( ; ; ) 死循环”;如果返回值为true,则调用parkAndCheckInterrupt方法。parkAndCheckInterrupt方法的详情如下:

/** * Convenience method to park and then check if interrupted * @return {@code true} if interrupted */
private final boolean parkAndCheckInterrupt() {
  LockSupport.park(this);
  return Thread.interrupted();
}

================================================================
(接下文)


文章来源:CSDN

全部评论: 0

    我有话说: