ThreadPoolExecutor线程池源码解析与应用分析

拦截器 14天前 ⋅ 145 阅读

1 线程池介绍

 首先我们先来看一下线程池的完整结构示意图1.1:

图1.1 线程池完整流程示意图

 通过这个图我们可以看到线程池一共包含来几部分:

  • execute()执行任务方法

  • 核心池corePool

  • 最大线程数量池maximumPool

  • 阻塞队列BlockingQueue

  • 拒绝策略RejectedExecutionHandler

这里主线程是用来创建和调用线程池的,我们暂不归纳到线程池中。

同样我们可以看到线程池在执行过程中是:

1)  通过execute方法来提交任务

2)在核心线程池中创建线程

3)通过阻塞队列存取任务

4)在核心线程池外的最大数量线程池中创建线程

5)拒绝处理

那么线程池的这些属性与流程究竟是如何进行处理呢,可以在稍后往下看具体实现细节。

这里我们先来看ThreadPoolExecutor类的UML继承关系如图1.2

 

                         

图 1.2 线程池UML图

 

在这里可以看到ThreadPoolExecutor类主要继承了Executor接口   

该接口提供了一种优雅的方式去解耦任务处理机制中的任务提交和任务如何运行(也包含线程的使用,调度)。同理我们的线程池也是一种任务处理机制。

并且该类包含类5个内部类:

1)DiscardOldestPolicy(抛弃队列里面等待最久的一个线程)

2)DiscardPolicy(程序将抛弃当前执行的任务)

3)AbortPolicy(拒绝情况下会直接抛出异常运行时异常)

4)CallerRunsPolicy(使用调用者线程去执行该拒绝的任务)

5)Woker (工作者,是任务线程基本信息模型)

 

5个内部类中有4个是拒绝策略,1个是任务基本存储单元哪又何用的请继续往下看:

        

1.1 为什么要使用线程池

线程池解决了使用线程过程中常见的两种问题:

1) 如果需要使用线程来执行大量的任务,我们是否需要创建大量的线程,同时又销毁大量的线程呢,答案自然是不,因为对操作系统来说,创建一个线程的代价是十分昂贵的, 需要给它分配内存、列入调度,同时在线程切换的时候还要执行内存换页,CPU 的缓存被 清空,切换回来的时候还要重新从内存中读取信息,破坏了数据的局部性。这里通常在执行大量异步任务时候,使用线程池可以减少了每个任务的调用开销。节省大量的资源。

  2) 线程池提供了一种限制和管理资源的方法,包括线程,在执行任务集合时使用每个ThreadPoolExecutor 还维护一些基本的统计信息,如已完成的任务数。可以想象如果不使用线程池随意创建大量的线程,我们是很难对线程进行逐一管理和统计的。

 

1.2 如何创建线程池对象

比较方便的方式就是使用工具类Executors的工厂方法如

1)Executors类中的newCachedThreadPool方法

 

创建无限制数量的线程池,并带有自动回收功能的线程池。

通过此方法将创建无限制数量的线程,如果线程使用完毕空闲60S后将自动被回收。这种更适合任务线程执行时间较短,并且很短的间隔时间,不会积压过多线程导致程序无限创建线程而OOM,也可以复用线程,减少损耗。

 

1 ExecutorService newCachedThreadPool = Executors.newCachedThreadPool();

 

底层实现源码:

直接创建ThreadPoolExecutor对象。

设置了ThreadPoolExecutor类构造器的参数:

  • corePoolSize  、核心池大小为0

  • maximumPoolSize 最大线程为 Integer.MAX_VALUE

  • keepAliveTime 存活时间为60L

  • unit 存活时间单位为TimeUnit.SECONDS

  • workQueue 存储队列为SynchronousQueue对象

 

1public static ExecutorService newCachedThreadPool() {
2        return new ThreadPoolExecutor(0, Integer.MAX_VALUE,
3                                      60L, TimeUnit.SECONDS,
4                                      new SynchronousQueue());
5    }

 

2) Executors类中的newFixedThreadPool方法

 

 创建固定大小数量的线程池,超过数量的任务将在队列中等待。

这里我们创建具有3个固定数量线程的线程池,创建方式下:

 

1ExecutorService fixedThreadPool = Executors.newFixedThreadPool(3);

 

底层实现源码:

直接创建ThreadPoolExecutor对象。

设置了ThreadPoolExecutor类构造器的参数:

  • corePoolSize 核心池大小为方法参数nThreads

  • maximumPoolSize 最大线程为 nThreads

  • keepAliveTime 存活时间为0L

  • unit 存活时间单位为TimeUnit.MILLISECONDS

  • workQueue 存储队列为LinkedBlockingQueue对象

 

1 public static ExecutorService newFixedThreadPool(int nThreads) {
2        return new ThreadPoolExecutor(nThreads, nThreads,
3                                      0L, TimeUnit.MILLISECONDS,
4                                      new LinkedBlockingQueue());
5    }

 

3) Executors类中的 newSingleThreadExecutor

 

创建一个单线程化的线程池,它只会用唯一的工作线程来执行任务,保证所有任务按照指定顺序(FIFO, LIFO, 优先级)执行。示例代码如下:

 

1ExecutorService singleThreadExecutor = Executors.newSingleThreadExecutor();

 

底层实现源码:

直接创建ThreadPoolExecutor对象。

设置了ThreadPoolExecutor类构造器的参数:

  • corePoolSize 核心池大小为方法参数1

  • maximumPoolSize 最大线程为 1

  • keepAliveTime 存活时间为0L

  • unit 存活时间单位为TimeUnit.MILLISECONDS

  • workQueue 存储队列为LinkedBlockingQueue对象

 

1 public static ExecutorService newSingleThreadExecutor() {
2        return new FinalizableDelegatedExecutorService
3            (new ThreadPoolExecutor(11,
4                                    0L, TimeUnit.MILLISECONDS,
5                                    new LinkedBlockingQueue()));
6    }

 

通过以上代码我们可以看到工具类Executors中创建线程池都是通过传不同的参数实现不同应用场景的线程池,那么这些参数的意义究竟是什么含义,我们可以待会继续往下看。如果我们看完之后就会明白,这几个工厂方法针对最常见的使用情况 预先配置了线程池的参数,开发人员也可以自定义线程池,来手动创建线程池,实现自己需要场景的线程池。

1.3 了解线程池的构造方法

如果我们想要针对我们的系统定制化创建线程池该如何去做的呢(这种场景我们会经常遇到)定制化线程池需要通过传不同的参数来满足不同的场景,如果想灵活创建就必须了解线程池构造器几个参数的含义,如下代码是所有构造器中参数最全的一个 接下来详细看解释。

 

 1  /**
2 * 通过给定参数创建新的线程池
3 * parameters.
4 *
5 * @param corePoolSize 核心线程池参数
6 * @param maximumPoolSize 线程池中允许的最大线程数量
7 * @param keepAliveTime 大于核心线程池的线程的存活时间
8 * @param unit keepAliveTime 参数的时间单位
9 * @param workQueue 存储和传递任务的阻塞队列
10 * @param threadFactory 创建线程的工厂
11 * @param handler 超过线程池最大线程容量的拒绝策略
12 * @throws IllegalArgumentException 如果不满足下列情况将抛出此异常
13 * {@code corePoolSize < 0}
14 * {@code keepAliveTime < 0}
15 * {@code maximumPoolSize <= 0}
16 * {@code maximumPoolSize < corePoolSize}
17 * @throws NullPointerException if {@code workQueue}
18 * or {@code threadFactory} or {@code handler} is null
19 */
20 public ThreadPoolExecutor(int corePoolSize,
21 int maximumPoolSize,
22 long keepAliveTime,
23 TimeUnit unit,
24 BlockingQueueworkQueue,
25 ThreadFactory threadFactory,
26 RejectedExecutionHandler handler) {
27
28   if (corePoolSize < 0 ||
29        maximumPoolSize <= 0 ||
30        maximumPoolSize < corePoolSize ||
31        keepAliveTime < 0){
32        throw new IllegalArgumentException();
33    if (workQueue == null || threadFactory == null || handler == null)
34        throw new NullPointerException();
35    this.acc = System.getSecurityManager() == null ?
36            null :
37            AccessController.getContext();
38    this.corePoolSize = corePoolSize;
39    this.maximumPoolSize = maximumPoolSize;
40    this.workQueue = workQueue;
41    this.keepAliveTime = unit.toNanos(keepAliveTime);
42    this.threadFactory = threadFactory;
43    this.handler = handler;
44}

1)核心池大小corePoolSize参数

和最大线程数大小maximumPoolSize 参数

   构造器参数对应的成员变量

1 /**
2   * 核心池大小
3   */

4  private volatile int corePoolSize;
5
6  /**
7   * 最大池大小(包含核心池)
8   */

9  private volatile int maximumPoolSize;

在execute(Runnable)方法中提交新的任务时候:

  • 当正在运行的核心线程少于corePoolSize参数,将创建一个新线程来处理请求,即使线程池中其他工作线程处于空闲状态。

 

  • 如果核心线程数大于corePoolSize参数并且小于maximumPoolSize参数,新进任务将被放入队列。

 

  • 仅当队列已满时才会在核心池外创建一个新线程。

 

前面我们了解了

newCachedThreadPool中参数corePoolSize为0和参数maximumPoolSize为Integer.MAX_VALUE 代表的含义就是默认无线程如果有任务均会先进入队列,当队列满的时候(实际上这个线程池队列有元素就会满具体看阻塞队列参数SynchronousQueue)创建新的线程最大线程数量为Integer.MAX_VALUE,也就是说使用这个线程池我们可以无限制的创建线程。

newFixedThreadPool中参数corePoolSize为形式参数nThreads和

参数maximumPoolSize也为nThreads代表的含义就是默认是创建参数nThreads个线程如果有新的任务则进入队列,如果队列已满则无法创建线程执行拒绝策略(实际上这里队列是无边界的具体看队列参数含义)线程总数可以一直保持在参数nThreads数量。

newSingleThreadExecutor中参数corePoolSize为1和

参数maximumPoolSize也为1 代表的含义就是默认池中有一个线程,如果有任务则进入队列,当队列满(实际上这里队列不会满具体看阻塞队列LinkedBlockingQueue)的时候执行拒绝策略,线程总数可以一直保持一个线程。

通过设置corePoolSize和maximumPoolSize参数同样,您也可以自定义创建一个固定大小的线程池通过将maximumPoolSize参数。设置为本质上无限制的值例如:Integer.MAX_VALUE 线程池中将容纳任意数量的并发任务(实际任务全部被放在了队列中)一般情况下我们会通过构造器中设置核心corePoolSize和最大池大小maximumPoolSize参数 也有一部分场景会通过setCorePoolSize方法和setMaximumPoolSize来动态改变参数大小。

2) 参数存活时间keepAliveTime和TimeUnit  

参数对应的成员变量成员变量

1 /** 
2  * 等待工作的空闲线程超时(以纳秒为单位)  
3  */

4  private volatile long keepAliveTime;  

 

keepAliveTime是一个long型的数值,TimeUnit是keepAliveTime的单位。如果线程池中线程数量超过了核心线程池corePoolSize参数,超过核心线程池参数的线程在空闲存活时间后将会被终止,可以通过调用getKeepAliveTime(TimeUnit)方法查看存活时间,使用这种方式可以减少资源消耗,当线程空闲终止,当有需要更多线程时候线程又被重新创建。

3) 阻塞队列Queuing参数(BlockingQueue子类)

  阻塞队列参数对应成员变量成员变量workQueue

 

1  /** 
2   * 用于保存任务并将其传递给工作线程
3   */

4 private final BlockingQueue workQueue;   

 

参数阻塞队列类型用于传输和存储提交的任务,如果当前正在运行的线程数量少于核心线程池参数,则如果有新的任务总是创建一个新的线程而不是将任务进入阻塞队列如果当前运行线程总数超过了核心线程池参数大小,则新的任务进来后将进入阻塞队列。如果请求无法进入队列(队列已满)一个新的线程将被创建,如果这个情况下创建的新的线程数量超过了maximumPoolSize参数,则任务将被拒绝。

这里有三个常见BlockingQueue的子类可以使用如下:

  • 直接交接队列 如:SynchronousQueue

SynchronousQueue   一般的排队策略直接交接队列。是一个内部只能包含一个元素的队列。插入元素到队列的线程被阻塞,直到另一个线程从队列中获取了队列中存储的元素。同样,如果线程尝试获取元素并且当前不存在任何元素,则该线程将被阻塞,直到线程将元素插入队列。

在这里将任务交给线程,无需队列另外持有它们。如果没有立即可用的线程来运行任务,则尝试将任务排队将失败,因此将创建一个新线程,这种方式通常需要无限制的maximumPoolSizes来避免拒绝新提交的任务。从上面我们可以看到newCachedThreadPool设置了队列参数为SynchronousQueue来满足保证新进任务均会创建新的线程来处理任务。

  • 无界队列 如:LinkedBlockingQueue

队列的长度无限,如果核心线程池未满则会创建新的线程,如果核心线程池已满则新进任务均会进入队列,由于队列是无边界的,所以不会有其他缓冲线程被创建,在这里参数maximumPoolSize则相当于无用队列不会满则不会创建新的任务,也不会产生拒绝任务的情况。从上面我们可以知道newFixedThreadPool和newSingleThreadExecutor方法均使用了无界队列来创建线程池,在线程数量固定的情况下,如果任务无法及时处理则均在队列中进行排队。

  • 有界队列 如:ArrayBlockingQueue

与参数maximumPoolSize一起使用时,有助于防止资源耗尽。不过有界队列相对难以控制,队列大小和最大池大小maximumPoolSize可以相互折衷:

  • 队列长度大和maximumPoolSize参数小的时候可以最大程度地减少CPU使用率,操作系统资源和上下文切换开销,但可能导致人为降低吞吐量。

  • 队列长度小的时候通常需要更大的maximumPoolSize参数这会使CPU更加繁忙,可能会遇到无法接受的调度开销,这也会降低吞吐量

4) 线程工厂ThreadFactory参数

  对应成员变量

 

1/**
2  * 新线程的工厂。所有线程都是使用此工厂
3  *(通过方法addWorker)
4  * 
5  */

6private volatile ThreadFactory threadFactory;

 

 

线程的创建一般需要设置线程组,线程名字,线程Runnable对象,线程是否为守护线程,线程优先级等信息在线程池中创建的线程均使用ThreadFactory接口如下面代码的实现类对象来创建线程

 

 

1public interface ThreadFactory {
2
3    Thread newThread(Runnable r);
4}

 

如果没有特殊说明默认将使用Executors.defaultThreadFactory()方法创建线程工厂源码如下:Executors类下:

 

1public static ThreadFactory defaultThreadFactory() {  
2   return new DefaultThreadFactory();
3

 

DefaultThreadFactory是ThreadFactory的一个实现类,重写了线程工厂的newThread方法,如果使用这个线程工厂将创建具有如下性质的线程:

  • 相同线程组(ThreadGroup)的线程

     

  • 相同的优先级(NORM_PRIORITY)

     

  • 相同的进程状态为非守护进程状态的线程。

     

通过使用不同的ThreadFactory对象我们可以修改线程的名字,线程组,线程优先级,和线程守护状态等属性。

5) 拒绝策略

  对应的成员变量

1private volatile RejectedExecutionHandler handler;
2private static final RejectedExecutionHandler   defaultHandler = new AbortPolicy(); 

一般线程池已满无法不再接受新的任务或者线程池被关闭的时候均需要调用拒绝策略。Executor已经被关闭的时候新提交的任务将会被拒绝。Executor使用了最大线程数量限制同时使用了有界队列来限制了容量,并且已经饱和(无法再创建新的任务线程)在任一情况下execute方法将调用RejectedExecutionHandler类下的rejectedExecution(Runnable, ThreadPoolExecutor)方法来进行处理RejectedExecutionHandler接口如下

 

1public interface RejectedExecutionHandler {
2
3    void rejectedExecution(Runnable r, ThreadPoolExecutor executor);
4}

 

线程池中通过继承RejectedExecutionHandler接口实现了几个拒绝策略如下几个:

 

  • ThreadPoolExecutor.AbortPolicy内部类

 

1 public static class AbortPolicy implements RejectedExecutionHandler {
2        public void rejectedExecution(Runnable r, ThreadPoolExecutor e) {
3            throw new RejectedExecutionException("Task " + r.toString() +
4                                                 " rejected from " +
5                                                 e.toString());
6        }
7    }

 

这是一个默认的拒绝处理器,当配置为这个拒绝执行起的时候通过源码我们可以看到在遇到拒绝情况下会直接抛出异常运行时异常RejectedExecutionException

  • ThreadPoolExecutor.CallerRunsPolicy内部类

 

 1   public static class CallerRunsPolicy implements RejectedExecutionHandler {
2
3        public CallerRunsPolicy() { }
4
5
6        public void rejectedExecution(Runnable r, ThreadPoolExecutor e) {
7            if (!e.isShutdown()) {
8                r.run();
9            }
10        }
11    }

 

假设调用execute方法的线程为A,它提供了一种简单的回调控制机制,如果当前线程池还未关闭将调用任务的run方法,这种调用方式将会在线程A中执行run方法,可以降低新任务提交的速度。

  • ThreadPoolExecutor.DiscardPolicy内部类

 

1  public static class DiscardPolicy implements RejectedExecutionHandler {
2
3        public DiscardPolicy() { }
4
5        public void rejectedExecution(Runnable r, ThreadPoolExecutor e) {
6        }
7    }

 

通过源码可以看到这个内部什么也没做,程序将抛弃当前执行的任务。

  • ThreadPoolExecutor.DiscardOldestPolicy

 1 public static class DiscardOldestPolicy implements RejectedExecutionHandler {
2
3        public DiscardOldestPolicy() { }
4
5        public void rejectedExecution(Runnable r, ThreadPoolExecutor e) {
6            if (!e.isShutdown()) {
7                e.getQueue().poll();
8                e.execute(r);
9            }
10        }
11    }

 

通过源码可以看到,对拒绝任务不抛弃,而是抛弃队列里面等待最久的一个线程,然后把拒绝任务加到队列。

如果要自定义拒绝策略则自行实现RejectedExecutionHandler接口即可,注意只有在特定队列容量上限和最大线程缓冲参数maximumPoolSize有上限时候才会触发拒绝策略。

1.4 挂钩方法 Hook methods

  • 在每个任务执行之前和之后的挂钩方法

 

1protected void beforeExecute(Thread, Runnable)

 

1protected afterExecute(Runnable, Throwable)

 

挂钩方法在线程池中并没有具体的实现那挂钩方法有什么用呢?

这两个方法ThreadPoolExecutor类中是由protected修饰的方法(包内访问)如果直接使用ThreadPoolExecutor类来创建线程池的话这两个方法是无法访问到的,ThreadPoolExecutor类中这两个方法未进行具体的实现只是两个挂钩方法,如果要使用则自行扩展ThreadPoolExecutor类来重写方法可以在每个任务执行之前和执行之后执行,可以进行ThreadLocals变量的初始化,收集统计信息,或者进行日志添加。

  • 终止时候调用的挂钩方法terminated

 

1protected void terminated() { }

 

  终止钩子方法,线程池执行程序终止时调用的方法。

1.5 队列相关方法

 

1public BlockingQueue getQueue()

 

 如果想要访问工作中的队列可以调用getQueue方法(强烈建议不要将此方法用于任何其他目的)来用于监视和调试。

 

1public boolean remove(Runnable task)

  

移除尚未启动,仍旧在队列中的任务

 

1public void purge()

  

尝试从工作队列中删除所有已取消的 Future类型的任务。

1.6 如何终止线程池

  • 如果线程池不再有引用的对象并且没有剩余的线程将自动关闭用户希望在不调用shutdown方法情况下自动终止,则需要设置合适的存活时间keep-alive times,使用0个核心线程或设置allowCoreThreadTimeOut(boolean)allowCoreThreadTimeOut为true该值为true,则线程池数量最后销毁到0个。allowCoreThreadTimeOut为false销毁机制:超过核心线程数时,而且(超过最大值或者timeout过),就会销毁。

  • 调用了shutdown()方法 温柔的终止线程池。

  • 调用了shutdownNow() 强硬的终止线程池。

  • 调用了interruptWorkers() 很简单,循环对所有worker调用。

  • 调用interruptIfStarted(),其中会判断worker的AQS state是否大于0,即worker是否已经开始运作,再调用Thread.interrupt()
    需要注意的是,对于运行中的线程调用Thread.interrupt()并不能保证线程被终止,task.run()内部可能捕获了InterruptException,没有上抛,导致线程一直无法结束。

  • awaitTermination() 等待线程池终止。

 

1.7 ctl变量与线程运行生命周期

ctl:对线程池的运行状态和线程池中有效线程的数量进行控制的一个字段, 它包含两部分的信息: 线程池的运行状态 (runState) 和线程池内有效线程的数量 (workerCount),这里可以看到,使用了Integer类型来保存,高3位保存runState,低29位保存workerCount。

以下ctl变量

 

 1```
2//这里Integer.SIZE值为32
3private static final int COUNT_BITS = Integer.SIZE - 3;            //32-3=29 活跃线程支持29位来表示个数
4private static final int CAPACITY   = (1 << COUNT_BITS) - 1//1左移29位-1(1*2的29次方-1) )
5
6// runState is stored in the high-order bits
7// -1即11111111 左移29位后为11100000后面21位省略 表示运行状态(高3位)
8private static final int RUNNING    = -1 << COUNT_BITS; 
9
10//  0即00000000 左移29位后为00000000后面21位省略 表示关闭状态(高3位)
11private static final int SHUTDOWN   =  0 << COUNT_BITS; 
12
13//  1即00000001 左移29位后为00100000后面21位省略 表示停止状态(高3位)
14private static final int STOP       =  1 << COUNT_BITS;  
15
16//  2即00000010 左移29位后为01000000后面21位省略 表示关闭状态(高3位)
17private static final int TIDYING    =  2 << COUNT_BITS;  
18
19//  3即00000011 左移29位后为01100000后面21位省略 表示关闭状态(高3位)
20private static final int TERMINATED =  3 << COUNT_BITS; 
21
22
23不难发现,高3位很好的表示5种状态 000 SHUTDOWN,001 STOP,010 TIDYING,011 TERMINATED,111 RUNNING

线程池运行状态生命周期控件如下:

  

图1.2 线程池中线程生命周期
 

 

图1.3 线程池中线程生命周期(2)

1.8 线程池提交任务的两种方法

线程池提供了两种提交任务的方法execute方法和submit方法。

有哪些区别呢,这里提供三点:

1)接收的参数不一样,execute方法只支持Runnable类型,submit重载的方法支持Runnable类型和Callable类型。

2)submit有返回值,而execute没有。

3)execute在执行任务时,如果遇到异常会直接抛出,而submit不会直接抛出,只有在使用Future的get方法获取返回值时,才会抛出异常。

 

1.8.1 execute方法原理

 

 1   /**
2 * 在将来某个时候执行给定的任务。
3 * 任务可以在新线程中执行,也可以在现有的池线程中执行。
4 * 如果任务无法提交以供执行,则可能是因为执行器已关闭或已达到其容量。
5 * 如果任务无法接受执行根据判断抛出拒绝的执行异常RejectedExecutionHandler。
6 * @throws NullPointerException if {@code command} is null
7 * @param command 要执行的任务
8 *
9 */
10 public void execute(Runnable command) {
11    if (command == null)
12        throw new NullPointerException();
13    int c = ctl.get();
14    if (workerCountOf(c) < corePoolSize) {
15        if (addWorker(command, true))
16            return;
17         c = ctl.get();
18     }
19    if (isRunning(c) && workQueue.offer(command)) {
20        int recheck = ctl.get();
21        if (! isRunning(recheck) && remove(command))
22            reject(command);
23        else if (workerCountOf(recheck) == 0)
24            addWorker(nullfalse);
25    }
26    else if (!addWorker(command, false))
27        reject(command);
28}

当线程池创建完成之后通过调用execute方法提交runnable类型的任务流程如下:(注意addWorker成功之后将启动线程具体执行代码稍后在看,这里可以先理解为执行任务,无法启动的时候会根据情况添加到workQueue队列中)

  1. 如果运行的线程少于corePoolSize,将尝试以给定的命令作为第一个线程启动新线程任务。(执行addWorker方法的会调用自动检查runState线程池运行状态和workerCount线程数量,添加完成时会开启新的线程,如果添加线程失败时,则返回false)如果添加成功则直接返回了,不再往下继续执行。

     

  2. 上面添加失败了这里判断线程池状态是否仍旧是运行状态,如果是并且任务可以成功排队,那么我们仍然需要仔细检查我们是否应该添加一个线程(因为现有的自上次检查后线程池已经死亡)或自进入此方法后,池已关闭。所以我们重新检查状态,如果已经停止则需要回滚排队,如果没有,则启动新线程。

     

  3. 如果无法将任务排队,则尝试添加新的线程。如果失败了,我们知道线程被关闭了或线程已饱和则执行拒绝策略来选择如何拒绝这个任务。

2.12 线程池中任务存储数据结构

 

1    /**
2     * 包含池中所有工作线程的集。仅在保持主锁时访问。
3     */

4    private final HashSet workers = new HashSet();

 

1 /**
2 * 类Worker主要为线程运行任务来维持中断控制状态,以及其他需要记录的信息。
3 * 这个类适时的继承了AbstractQueuedSynchronizer类AQS(提供一个框架来实现阻塞锁和 * 相关的依赖于先进先出(FIFO)等待队列)
4 * 为了简化获取和释放每个任务执行。这样可以防止旨在唤醒等待任务的工作线程而是中断正在 * 运行的任务。我们实现了一个简单的不可重入互斥锁而不是使用可重入锁ReentrantLock
5 * 因为我们不希望工作任务能够重新获取锁在调用线程池中的控制方法时,如setCorePoolSize方法。另外抑制中断直到线程实际上开始运行任务,我们初始化锁声明为负值,并在开始时清除它(
6 */
7 private final class Worker
8 extends AbstractQueuedSynchronizer
9 implements Runnable
10 {
11 /**
12 * This class will never be serialized, but we provide a
13 * serialVersionUID to suppress a javac warning.
14 */
15 private static final long serialVersionUID = 6138294804551838833L;
16   
/** Thread this worker is running in.  Null if factory fails. */
17    final Thread thread;
18    /** Initial task to run.  Possibly null. */
19    Runnable firstTask;
20    /** Per-thread task counter */
21    volatile long completedTasks;
22
23    /**
24     * Creates with given first task and thread from ThreadFactory.
25     * @param firstTask the first task (null if none)
26     */

27    Worker(Runnable firstTask) {
28        setState(-1); // inhibit interrupts until runWorker
29        this.firstTask = firstTask;
30        this.thread = getThreadFactory().newThread(this);
31    }
32
33    /** Delegates main run loop to outer runWorker  */
34    public void run() {
35        runWorker(this);
36    }
37
38    // Lock methods
39    //
40    // The value 0 represents the unlocked state.
41    // The value 1 represents the locked state.
42
43    protected boolean isHeldExclusively() {
44        return getState() != 0;
45    }
46
47    protected boolean tryAcquire(int unused) {
48        if (compareAndSetState(01)) {
49            setExclusiveOwnerThread(Thread.currentThread());
50            return true;
51        }
52        return false;
53    }
54
55    protected boolean tryRelease(int unused) {
56        setExclusiveOwnerThread(null);
57        setState(0);
58        return true;
59    }
60
61    public void lock()        { acquire(1); }
62    public boolean tryLock()  return tryAcquire(1); }
63    public void unlock()      { release(1); }
64    public boolean isLocked() return isHeldExclusively(); }
65
66    void interruptIfStarted() {
67        Thread t;
68        if (getState() >= 0 && (t = thread) != null && !t.isInterrupted()) {
69            try {
70                t.interrupt();
71            } catch (SecurityException ignore) {
72            }
73        }
74    }
75}

 

 

AiLinkLife

                                                                欢迎关注微信公众号“拦截器”更多优质文章等你

 



 

 

全部评论: 0

    我有话说: