Java并发基石——所谓“阻塞”:Object Monitor和AQS(2)

一寸光阴,半亩情 1年前 ⋅ 358 阅读

(接上文《Java并发基石——所谓“阻塞”:Object Monitor和AQS(1)》)

3、AQS

上文我们较为详细的介绍了Java线程调度中的Object Monitor机制以及其工作情况,本文我们开始介绍Java线程调度中的另一种实现机制AQS,包括它的使用方式和底层工作原理。

3.1、AQS介绍

JAVA中的AQS队列从根本上来讲是基于CAS的典型实现(也是使用volatile关键字的典型案例)。从技术层面的依赖关系上讲它首先依赖于java中的java.util.concurrent.locks.LockSupport类,这个类在本专题之前的文章中已经介绍过,专门用来实现应用程序级别对硬件级别的“同步多线程(SMT)”技术的封装。

同步多线程(SMT) 技术是一种硬件层面的技术,具体来说是直接由CPU提供的技术实现。而Java语言中,使用了sun.misc.Unsafe类完成了对这个技术的调用封装。一旦使用Unsafe中的park调用,则当我们使用jstack查看可能的线程阻塞状态时,就会看到类似WATING(parking)这个的提示信息。

接着AQS队列的实现还直接依赖于java中的sun.misc.Unsafe类,这个类在本专题之前的文章中也同样介绍过——它是Java语言中实现乐观锁CAS技术的基础,其底层完全依赖于java本地方法JNI完成操作系统级别的调用。另外。如果读者自己观察源代码就会发现,实际上LockSupport类依赖的底层类也是sun.misc.Unsafe,所以Java中ASQ队列技术的整个依赖关系可以如下图所示:

3.2、AQS的主要概念和使用方式

3.2.1、AbstractQueuedSynchronizer中的主要概念

要使用好AbstractQueuedSynchronizer,就需要了解AbstractQueuedSynchronizer中几个关键概念。

  • AbstractQueuedSynchronizer中的state状态

上文已经提到AbstractQueuedSynchronizer的抽象类中保持了一个任务队列,并且在AbstractQueuedSynchronizer中使用了一个“private volatile int state;”变量,协调这个队列中各个节点的调度(下文会详细讲述如何进行协调)。AbstractQueuedSynchronizer中取得或者设置state状态的方法包括getState()、setState()和compareAndSetState()。注意:AQS队列的state状态初始化值为0

// ...... AbstractQueuedSynchronizer前端代码省略
/** * Head of the wait queue, lazily initialized. Except for * initialization, it is modified only via method setHead. Note: * If head exists, its waitStatus is guaranteed not to be * CANCELLED. */
private transient volatile Node head;
/** * Tail of the wait queue, lazily initialized. Modified only via * method enq to add new wait node. */
private transient volatile Node tail;
/** * The synchronization state. */
private volatile int state;
// ...... AbstractQueuedSynchronizer后端代码省略
  • AbstractQueuedSynchronizer有两种资源共享方式:Exclusive(独占式) 和 Share(共享式)。所谓独占式是指依据AQS中的state控制状态,只有一个线程能够进行工作(其它参与调度的线程会进入阻塞状态);共享式是指,依据AQS中的state控制状态,可以有多个满足条件的线程同时工作。

3.2.2、AQS中的重要方法

AQS在java中的实现表现为java.util.concurrent.locks.AbstractQueuedSynchronizer类(这个类的简称就是本文提到的AQS,后文我们将直接称为AQS),这是一个抽象类它包含了AQS实现的所有基本过程,不过要真正基于AQS完成完整功能,还需要实现AQS类中的四个方法:tryAcquire(int)、tryRelease(int)、tryAcquireShared(int)、tryReleaseShared(int)和isHeldExclusively();目前java中原生的AQS实现有很多,例如CountDownLatch、ReentrantLock、ReentrantReadWriteLock、Semaphore等,这些类的内部类都根据自身的锁模式完成了对AQS类的方法实现。

  • protected boolean tryAcquire(int arg) :在独占模式下,该线程尝试获取资源,获取成功则返回true,失败则返回false。

  • protected boolean tryRelease(int arg) :在独占模式下,该线程尝试释放资源,释放成功则返回true,失败则返回false。

  • protected int tryAcquireShared(int arg) :在共享模式下,尝试获取资源。负数表示失败;0表示成功,但没有剩余可用资源;正数表示成功,且有剩余资源。

  • protected boolean tryReleaseShared(int):在共享模式下,尝试释放资源。成功则返回true,失败则返回false。

  • protected boolean isHeldExclusively():该线程是否正在独占资源,如果返回true,则表示当前线程正在独占资源。

除了以上可以由开发人员覆盖的方法外,还有以下这些关键方法,进行AQS开发的技术人员必须清楚,才能顺利完成AQS的具体实现:

  • getState()、setState()和compareAndSetState() 三个方法:getState方法获取当前AQS队列的状态,既是上文3.2.1小节图中表示的state状态;setState方法用于为AQS队列设置新的状态,由于AQS中state变量采用了volatile关键字,所以一旦state变量发生了变化,这个变化将在第一时间被所有线程知晓;compareAndSetState方法采用CAS思想,在比对成功后为state状态进行赋值:例如compareAndSetState(0, 1)的语句,意义为如果当前AQS队列的state状态值为0,则设置state状态为1并且返回true。这里请特别注意compareAndSetState方法,在独占模式下进行锁请求时经常使用——因为是独占模式所以肯定不允许多个线程同时更改State的状态。这个问题将在后文的代码分析过程中进行详细介绍。

  • sync.acquire(int)、sync.acquireInterruptibly(int) 方法:sync.acquire(int)方法表示在独占模式下尝试获取锁,并忽略线程的中断信号(关于中断信号,在本专题之前的文章已经介绍过,请进行查看)。如果没有获取到则当前线程将在AQS中进行排队(这个问题将在后续文章中进行介绍),并且当前试图获取锁的线程将会继续阻塞(WAITING PARKING)或者终止阻塞。sync.acquireInterruptibly(int)方法和sync.acquire(int)方法类似,都表示在独占模式下尝试获取锁。只是该方法还会检查当前操作线程的中断信号。

  • sync.acquireShared(int)、sync.acquireSharedInterruptibly(int) 方法:sync.acquireShared(int)方法表示在共享模式下尝试获取锁,并忽略线程的中断信号。如果没有获取到则当前线程将在AQS队列中排队,并且当前试图获取锁的线程将继续阻塞(WAITING PARKING)或者终止阻塞。sync.acquireSharedInterruptibly(int)方法和sync.acquireShared(int)方法类似,都表示在共享模式下尝试获取锁。只是该方法还会检查当前操作线程的中断信号。以上四个方法中,传入的参数都将分别传入到开发者负责重写的tryAcquire、tryAcquireShared方法中。

  • sync.release(int)、sync.releaseShared(int) 方法:sync.release(int)方法/sync.releaseShared(int)方法分别表示在独占模式下/在共享模式下释放当前线程的锁,当前线程将推出AQS队列。

当然为了理解AQS的工作原理,并编写良好的代码,读者还需要了解AQS中以下这些方法的功能作用:

  • setExclusiveOwnerThread(currentThread) 方法:这个方法向AQS队列说明当前获得独占操作权线程是哪一个线程,如果当前没有任何线程获得了独占操作权,则调用该方法并传入null。

3.2.3、如何使用AQS

当具体实现一个AQS的功能时,实际上并不需要将上一小节提到的所有方法全部实现(或者全部使用到),根据开发者所需要使用的独占方式或者共享方式,可以重写不同的方法。实际上AQS的抽象类在文档说明中已经为开发者者列举了多个使用AQS进行具体功能实现的示例,要学习AQS的实现只需要完全理解这几个示例即可。有了上文我们对AQS中若干重要方法的说明后,下面本文就可以为读者详细讲解AQS官方文档中几个比较典型的示例:

  • 使用AQS实现的BooleanLatch

BooleanLatch是官方文档中最简单的一个AQS实现效果——一个共享模式的实现效果,其在多线程协调场景中的工作特点是:多个线程调用BooleanLatch.await方法时,程序将检查AQS队列的状态:如果状态为0则表示当前线程获取锁失败(返回-1),当前线程将会被阻塞(WAITING PARKING);如果状态不为0,则表示当前线程获取锁成功(返回1,返回1表示程序会在AQS队列的节点中继续进行“获取锁是否成功”的判断)。

如果有一个线程在某个时候调用了BooleanLatch.signal方法,则通过BooleanLatch中实现的程序代码,AQS将会将状态置为1——这时其它处于阻塞状态的便会解除阻塞。具体的代码注解参看如下

class BooleanLatch {
  private static class Sync extends AbstractQueuedSynchronizer {
    /** * */
    private static final long serialVersionUID = -179644010405536918L;
    boolean isSignalled() {
      return getState() != 0; 
    }
    /* * 重写了tryAcquireShared方法:该方法在共享模式下,尝试获取资源。负数表示失败;0表示成功,但没有剩余可用资源;正数表示成功,且有剩余资源。 * 这个接口的入参可以是开发人员喜欢的任意参数 * * 这个方法返回值,有三种表达: * 1、如果返回值为负数,说明这个线程获取共享锁失败 * 2、如果返回值为0,说明当前线程获取共享锁(操作权)成功,但是后续AQS等待队列中的其它线程不能再获取共享锁(操作权) * 3、如果返回值大于0,说明当前线程获取共享锁(操作权)成功,且AQS等待队列中的后续线程还可以继续获取共享锁(操作权) * */
    @Override
    protected int tryAcquireShared(int ignore) {
      // 按照该方法的解释,如果当前AQS的state不为0,则返回1(表示获取操作权成功)
      // 否则返回-1,表示当前线程没有没有获取到操作权。
      return isSignalled() ? 1 : -1;
    }
    /* * 实现了tryReleaseShared方法:该方法在共享模式下,尝试释放资源。成功则返回true,失败则返回false。 * 这个接口的入参和tryAcquireShared类似,可以是开发人员喜欢的任意参数,实际上就是由acquireShared传来的参数,也就是由sync.releaseShared(1);传入的参数 * * 这个方法返回值是一个boolean形态: * 1、如果返回值为true,表示当前线程释放资源(操作权)成功 * 2、如果返回值为false,表示当前线程释放资源(操作权)失败 * */
    @Override
    protected boolean tryReleaseShared(int ignore) {
      // 设置当前AQS队列的状态为1;然后直接返回true—表示在共享模式下,解除锁状态成功。
      setState(1);
      return true;
    }
  }
  
  private final Sync sync = new Sync();
  /** * 提供给BooleanLatch的使用者,通过判定AQS队列的state状态,确认是否已经发出“信号” * @return */
  public boolean isSignalled() {
    return sync.isSignalled();
  }
  /** * 提供给BooleanLatch的使用者,发出信号,终止其它线程的阻塞状态 */
  public void signal() {
    sync.releaseShared(1);
  }
  /** * 提供给BooleanLatch的使用者,用来在AQS状态不为1的情况下,让当前线程进入阻塞状态 * @throws InterruptedException */
  public void await() throws InterruptedException {
    sync.acquireSharedInterruptibly(1);
  }
}

========================================
(接下文)


文章来源:CSDN

全部评论: 0

    我有话说: