服务器之家:专注于服务器技术及软件下载分享
分类导航

PHP教程|ASP.NET教程|JAVA教程|ASP教程|编程技术|正则表达式|C/C++|

服务器之家 - 编程语言 - JAVA教程 - 并发编程之ThreadPoolExecutor线程池原理解析

并发编程之ThreadPoolExecutor线程池原理解析

2020-12-08 23:51今日头条一角钱技术 JAVA教程

在介绍线程池之前,我们先回顾下线程的基本知识。其中线程池包括ThreadPoolExecutor 默认线程和ScheduledThreadPoolExecutor 定时线程池 ,本篇重点介绍ThreadPoolExecutor线程池。

 前言

在介绍线程池之前,我们先回顾下线程的基本知识。其中线程池包括ThreadPoolExecutor 默认线程和ScheduledThreadPoolExecutor 定时线程池 ,本篇重点介绍ThreadPoolExecutor线程池。

线程

线程是调度CPU资源的最小单位,线程模型分为KLT模型与ULT模型,JVM使用的是KLT模型,Java线程与OS线程保持 1:1 的映射关系,也就是说有一个Java线程也会在操作系统里有一个对应的线程。

内核线程模型

并发编程之ThreadPoolExecutor线程池原理解析

内核线程(KLT):系统内核管理线程(KLT),内核保存线程的状态和上下文信息,线程阻塞不会引起进程阻塞。在多处理器系统上,多线程在多处理器上并行运行。线程的创建、调度和管理由内核完成,效率比ULT要慢,比进程操作快。

用户线程模型

并发编程之ThreadPoolExecutor线程池原理解析

用户线程(ULT):用户程序实现,不依赖操作系统核心,应用提供创建、同步、调度和管理线程的函数来控制用户线程。不需要用户态/内核态切换,速度快。内核对ULT无感知,线程阻塞则进程(包括它的所有线程)阻塞。

Java线程生命状态

Java线程有多种生命状态:

  • NEW ,新建
  • RUNNABLE ,运行
  • BLOCKED ,阻塞
  • WAITING ,等待
  • TIMED_WAITING ,超时等待
  • TERMINATED,终结

状态切换如下图所示:

并发编程之ThreadPoolExecutor线程池原理解析

Java线程实现方式

Java线程实现方式主要有四种:

  • 继承Thread类
  • 实现Runnable接口、
  • 实现Callable接口通过FutureTask包装器来创建Thread线程、
  • 使用ExecutorService、Callable、Future实现有返回结果的多线程。

其中前两种方式线程执行完后都没有返回值,后两种是带返回值的。

继承Thread类创建线程

Thread类本质上是实现了Runnable接口的一个实例,代表一个线程的实例。启动线程的唯一方法就是通过Thread类的start()实例方法。start()方法是一个native方法,它将启动一个新线程,并执行run()方法。这种方式实现多线程很简单,通过自己的类直接extend Thread,并复写run()方法,就可以启动新线程并执行自己定义的run()方法。例如:

  1. public class MyThread extends Thread {   
  2.   public void run() {   
  3.    System.out.println("关注一角钱技术,获取Java架构资料");   
  4.   }   
  5. }   
  6.   
  7. MyThread myThread1 = new MyThread();   
  8. MyThread myThread2 = new MyThread();   
  9. myThread1.start();   
  10. myThread2.start(); 

实现Runnable接口创建线程

如果自己的类已经extends另一个类,就无法直接extends Thread,此时,可以实现一个Runnable接口,如下:

  1. // 实现Runnable接口的类将被Thread执行,表示一个基本的任务 
  2. public interface Runnable { 
  3.     // run方法就是它所有的内容,就是实际执行的任务 
  4.     public abstract void run(); 

  1. public class MyThread implements Runnable {   
  2.   public void run() {   
  3.    System.out.println("关注一角钱技术,获取Java架构资料");   
  4.   }   

为了启动MyThread,需要首先实例化一个Thread,并传入自己的MyThread实例:

  1. MyThread myThread = new MyThread();   
  2. Thread thread = new Thread(myThread);   
  3. thread.start();   

事实上,当传入一个Runnable target参数给Thread后,Thread的run()方法就会调用target.run(),参考JDK源代码:

  1. public void run() {   
  2.   if (target != null) {   
  3.    target.run();   
  4.   }   

实现Callable接口通过FutureTask包装器来创建Thread线程

Callable接口(也只有一个方法)定义如下:

  1. public interface Callable<V> {  
  2.  V call() throws Exception;    
  3. }  

  1. //Callable同样是任务,与Runnable接口的区别在于它接收泛型,同时它执行任务后带有返回内容 
  2. public class SomeCallable<V> implements Callable<V> { 
  3.  // 相对于run方法的带有返回值的call方法 
  4.     @Override 
  5.     public V call() throws Exception { 
  6.         // TODO Auto-generated method stub 
  7.         return null
  8.     } 
  9.  

  1. Callable<V> oneCallable = new SomeCallable<V>();    
  2. //由Callable<Integer>创建一个FutureTask<Integer>对象:    
  3. FutureTask<V> oneTask = new FutureTask<V>(oneCallable); 
  4. //注释:FutureTask<Integer>是一个包装器,它通过接受Callable<Integer>来创建,它同时实现了Future和Runnable接口。 
  5. //由FutureTask<Integer>创建一个Thread对象:    
  6. Thread oneThread = new Thread(oneTask);    
  7. oneThread.start();    
  8. //至此,一个线程就创建完成了。 

使用ExecutorService、Callable、Future实现有返回结果的线程

ExecutorService、Callable、Future三个接口实际上都是属于Executor框架。返回结果的线程是在JDK1.5中引入的新特征,有了这种特征就不需要再为了得到返回值而大费周折了。而且自己实现了也可能漏洞百出。(下部分来讲线程池了)

  • 可返回值的任务必须实现Callable接口。
  • 类似的,无返回值的任务必须实现Runnable接口。

执行Callable任务后,可以获取一个Future的对象,在该对象上调用get就可以获取到Callable任务返回的Object了。

  • 注意:get方法是阻塞的,即:线程无返回结果,get方法会一直等待。

再结合线程池接口ExecutorService就可以实现传说中有返回结果的多线程了。

下面提供了一个完整的有返回结果的多线程测试例子。代码如下:

  1. package com.niuh.thread.v4; 
  2.  
  3. import java.util.ArrayList; 
  4. import java.util.Date
  5. import java.util.List; 
  6. import java.util.concurrent.Callable; 
  7. import java.util.concurrent.ExecutionException; 
  8. import java.util.concurrent.ExecutorService; 
  9. import java.util.concurrent.Executors; 
  10. import java.util.concurrent.Future; 
  11.  
  12. /** 
  13.  * <p> 
  14.  * 使用ExecutorService、Callable、Future实现有返回结果的线程 
  15.  * </p> 
  16.  */ 
  17. public class MyThread { 
  18.      
  19.     public static void main(String[] args) throws ExecutionException, 
  20.             InterruptedException { 
  21.  
  22.         System.out.println(("----程序开始运行----")); 
  23.         Date date1 = new Date(); 
  24.  
  25.         int taskSize = 5; 
  26.         // 创建一个线程池 
  27.         ExecutorService pool = Executors.newFixedThreadPool(taskSize); 
  28.         // 创建多个有返回值的任务 
  29.         List<Future> list = new ArrayList<Future>(); 
  30.         for (int i = 0; i < taskSize; i++) { 
  31.             Callable c = new MyCallable(i + " "); 
  32.             // 执行任务并获取Future对象 
  33.             Future f = pool.submit(c); 
  34.             // System.out.println(">>>" + f.get().toString()); 
  35.             list.add(f); 
  36.         } 
  37.         // 关闭线程池 
  38.         pool.shutdown(); 
  39.  
  40.         // 获取所有并发任务的运行结果 
  41.         for (Future f : list) { 
  42.             // 从Future对象上获取任务的返回值,并输出到控制台 
  43.             System.out.println(">>>" + f.get().toString()); 
  44.         } 
  45.  
  46.         Date date2 = new Date(); 
  47.         System.out.println("----程序结束运行----,程序运行时间【" 
  48.                 + (date2.getTime() - date1.getTime()) + "毫秒】"); 
  49.     } 
  50.  
  51. class MyCallable implements Callable<Object> { 
  52.     private String taskNum; 
  53.  
  54.     MyCallable(String taskNum) { 
  55.         this.taskNum = taskNum; 
  56.     } 
  57.  
  58.     public Object call() throws Exception { 
  59.         System.out.println(">>>" + taskNum + "任务启动"); 
  60.         Date dateTmp1 = new Date(); 
  61.         Thread.sleep(1000); 
  62.         Date dateTmp2 = new Date(); 
  63.         long time = dateTmp2.getTime() - dateTmp1.getTime(); 
  64.         System.out.println(">>>" + taskNum + "任务终止"); 
  65.         return taskNum + "任务返回运行结果,当前任务时间【" + time + "毫秒】"
  66.     } 

协程

协程(纤程,用户级线程),目的是为了追求最大力度的发挥硬件性能和提升软件的速度,协程基本原理是:在某个点挂起当前的任务,并且保存栈信息,去执行另一个任务;等完成或达到某个条件时,再还原原来的栈信息并继续执行(整个过程不需要上下文切换)。

协程的概念很早就提出来了,但直到最近几年才在某些语言(如Lua)中得到广泛应用。

协程的目的:当我们在使用多线程的时候,如果存在长时间的I/O操作。这个时候线程一直处于阻塞状态,如果线程很多的时候,会存在很多线程处于空闲状态,造成了资源应用不彻底。相对的协程不一样了,在单线程中多个任务来回执行如果出现长时间的I/O操作,让其让出目前的协程调度,执行下一个任务。当然可能所有任务,全部卡在同一个点上,但是这只是针对于单线程而言,当所有数据正常返回时,会同时处理当前的I/O操作。

Java原生不支持协程,在纯java代码里需要使用协程的话需要引入第三方包,如:quasar

  1. <dependency> 
  2.  <groupId>co.paralleluniverse</groupId> 
  3.  <artifactId>quasar-core</artifactId> 
  4.  <version>0.8.0</version> 
  5.  <classifier>jdk8</classifier> 
  6. </dependency> 

线程池

“线程池”,顾名思义就是一个线程缓存,线程是稀缺资源,如果被无限制的创建,不仅会消耗系统资源,还会降低系统的稳定性,因此 Java 中提供线程池对线程进行同一分配、调优和监控。

线程池介绍

在web开发中,服务器需要接受并处理请求,所以会为一个请求分配一个线程来进行处理。如果每次请求都创建一个线程的话实现起来非常简单,但是存在一个问题:如果并发的请求数量非常多,但每个线程执行的时间很短,这样就会频繁的创建和销毁线程,如此一来会大大降低系统的效率。可能出现服务器在为每个请求创建新线程和销毁线程上花费的时间和消耗的系统资源要比处理实际的用户请求的时间和资源更多。

那么有没有一种办法使执行完一个任务,并不被销毁,而是可以继续执行其他的任务呢?

这就是线程池的目的。线程池为线程生命周期的开销和资源不足问题提供了解决方案。通过对多个任务重用线程,线程创建的开销被分摊到多个任务上。

什么时候使用线程池?

  • 单个任务处理时间比较短;
  • 需要处理的任务数量很大。

线程池优势

  • 重用存在的线程。减少线程黄金、消亡的开销,提高性能;
  • 提高响应速度。当任务到达时,任务可以不需要等待线程创建就能立即执行;
  • 提高线程的可管理性。线程是稀缺资源,如果无限制的创建,不仅会消耗系统资源,还会降低系统的稳定性,使用线程池可以进行同一的分配、调优和监控。

Executor框架

Executor接口是线程池框架中最基础的部分,定义来一个用于执行 Runnable 的 execute 方法。下面为它的继承与实现

并发编程之ThreadPoolExecutor线程池原理解析

ExecutorService接口

从图中可以看出 Executor 下有一个重要的子接口 ExecutorService ,其中定义来线程池的具体行为

并发编程之ThreadPoolExecutor线程池原理解析
  • execute(Runnable command):履行Ruannable类型的任务;
  • submit(task):可用来提交Callable或Runnable任务,并返回代表此任务的Future对象;
  • shutdown():在完成已提交的任务后封闭办事,不再接管新任务;
  • shutdownNow():停止所有正在履行的任务并封闭办事;
  • isTerminated():测试是否所有任务都履行完毕了;
  • isShutdown():测试是否该ExecutorService已被关闭;
  • awaitTermination(long,TimeUnit):接收timeout和TimeUnit两个参数,用于设定超时时间及单位。当等待超过设定时间时,会监测ExecutorService是否已经关闭,若关闭则返回true,否则返回false。一般情况下会和shutdown方法组合使用;
  • invokeAll :作用是等待所有的任务执行完成后统一返回;
  • invokeAny :将第一个得到的结果作为返回值,然后立刻终止所有的线程。如果设置了超时时间,未超时完成则正常返回结果,如果超时未完成则报超时异常。

AbstractExcutorService抽象类

此类的定义并没有特殊的意义仅仅是实现了ExecutorService接口

并发编程之ThreadPoolExecutor线程池原理解析
  1. public abstract class AbstractExecutorService implements ExecutorService { 
  2.     //此方法很简单就是对runnable保证,将其包装为一个FutureTask 
  3.     protected <T> RunnableFuture<T> newTaskFor(Runnable runnable, T value) { 
  4.         return new FutureTask<T>(runnable, value); 
  5.     } 
  6.     //包装callable为FutureTask 
  7.     //FutureTask其实就是对Callable的一个封装 
  8.     protected <T> RunnableFuture<T> newTaskFor(Callable<T> callable) { 
  9.         return new FutureTask<T>(callable); 
  10.     } 
  11.     //提交一个Runnable类型的任务 
  12.     public Future<?> submit(Runnable task) { 
  13.         //如果为null则抛出NPE 
  14.         if (task == null) throw new NullPointerException(); 
  15.         //包装任务为一个Future 
  16.         RunnableFuture<Void> ftask = newTaskFor(task, null); 
  17.         //将任务丢给执行器,而此处会抛出拒绝异常,在讲述ThreadPoolExecutor的时候有讲述,不记得的读者可以去再看看 
  18.         execute(ftask); 
  19.         return ftask; 
  20.     } 
  21.  
  22.     //与上方方法相同只不过指定了返回结果 
  23.     public <T> Future<T> submit(Runnable task, T result) { 
  24.         if (task == null) throw new NullPointerException(); 
  25.         RunnableFuture<T> ftask = newTaskFor(task, result); 
  26.         execute(ftask); 
  27.         return ftask; 
  28.     } 
  29.     //与上方方法相同只是换成了callable 
  30.     public <T> Future<T> submit(Callable<T> task) { 
  31.         if (task == null) throw new NullPointerException(); 
  32.         RunnableFuture<T> ftask = newTaskFor(task); 
  33.         execute(ftask); 
  34.         return ftask; 
  35.     } 
  36.  
  37.     //执行集合tasks结果是最后一个执行结束的任务结果 
  38.     //可以设置超时 timed为true并且nanos是未来的一个时间 
  39.     //任何一个任务完成都将会返回结果 
  40.     private <T> T doInvokeAny(Collection<? extends Callable<T>> tasks, 
  41.                               boolean timed, long nanos) 
  42.         throws InterruptedException, ExecutionException, TimeoutException { 
  43.         //传入的任务集合不能为null 
  44.         if (tasks == null
  45.             throw new NullPointerException(); 
  46.         //传入的任务数不能是0 
  47.         int ntasks = tasks.size(); 
  48.         if (ntasks == 0) 
  49.             throw new IllegalArgumentException(); 
  50.         //满足上面的校验后将任务分装到一个ArrayList中 
  51.         ArrayList<Future<T>> futures = new ArrayList<Future<T>>(ntasks); 
  52.         //并且创建一个执行器传入this 
  53.         //这里简单讲述他的执行原理,传入this会使用传入的this(类型为Executor)作为执行器用于执行任务,当submit提交任务的时候回将任务 
  54.         //封装为一个内部的Future并且重写他的done而此方法就是在future完成的时候调用的,而他的写法则是将当前完成的future添加到esc 
  55.         //维护的结果队列中 
  56.         ExecutorCompletionService<T> ecs = 
  57.             new ExecutorCompletionService<T>(this); 
  58.  
  59.         try { 
  60.             //创建一个执行异常,以便后面抛出 
  61.             ExecutionException ee = null
  62.             //如果开启了超时则计算死线时间如果时间是0则代表没有开启执行超时 
  63.             final long deadline = timed ? System.nanoTime() + nanos : 0L; 
  64.             //获取任务的迭代器 
  65.             Iterator<? extends Callable<T>> it = tasks.iterator(); 
  66.             //先获取迭代器中的第一个任务提交给前面创建的ecs执行器 
  67.             futures.add(ecs.submit(it.next())); 
  68.             //前面记录的任务数减一 
  69.             --ntasks; 
  70.             //当前激活数为1 
  71.             int active = 1; 
  72.             //进入死循环 
  73.             for (;;) { 
  74.                 //获取刚才提价的任务是否完成如果完成则f不是null否则为null 
  75.                 Future<T> f = ecs.poll(); 
  76.                 //如果为null则代表任务还在继续 
  77.                 if (f == null) { 
  78.                     //如果当前任务大于0 说明除了刚才的任务还有别的任务存在 
  79.                     if (ntasks > 0) { 
  80.                         //则任务数减一 
  81.                         --ntasks; 
  82.                         //并且再次提交新的任务 
  83.                         futures.add(ecs.submit(it.next())); 
  84.                         //当前的存活的执行任务加一 
  85.                         ++active; 
  86.                     } 
  87.                     //如果当前存活任务数是0则代表没有任务在执行了从而跳出循环 
  88.                     else if (active == 0) 
  89.                         break; 
  90.                     //如果当前任务执行设置了超时时间 
  91.                     else if (timed) { 
  92.                         //则设置指定的超时时间获取 
  93.                         f = ecs.poll(nanos, TimeUnit.NANOSECONDS); 
  94.                         //等待执行超时还没有获取到则抛出超时异常 
  95.                         if (f == null
  96.                             throw new TimeoutException(); 
  97.                         //否则使用当前时间计算剩下的超时时间用于下一个循环使用 
  98.                         nanos = deadline - System.nanoTime(); 
  99.                     } 
  100.                     //如果没有设置超时则直接获取任务 
  101.                     else 
  102.                         f = ecs.take(); 
  103.                 } 
  104.                 //如果获取到了任务结果f!=null 
  105.                 if (f != null) { 
  106.                     //激活数减一 
  107.                     --active; 
  108.                     try { 
  109.                         //返回获取到的结果 
  110.                         return f.get(); 
  111.                         //如果获取结果出错则包装异常 
  112.                     } catch (ExecutionException eex) { 
  113.                         ee = eex; 
  114.                     } catch (RuntimeException rex) { 
  115.                         ee = new ExecutionException(rex); 
  116.                     } 
  117.                 } 
  118.             } 
  119.             //如果异常不是null则抛出如果是则创建一个 
  120.             if (ee == null
  121.                 ee = new ExecutionException(); 
  122.             throw ee; 
  123.  
  124.         } finally { 
  125.             //其他任务则设置取消 
  126.             for (int i = 0, size = futures.size(); i < size; i++) 
  127.                 futures.get(i).cancel(true); 
  128.         } 
  129.     } 
  130.     //对上方方法的封装 
  131.     public <T> T invokeAny(Collection<? extends Callable<T>> tasks) 
  132.         throws InterruptedException, ExecutionException { 
  133.         try { 
  134.             return doInvokeAny(tasks, false, 0); 
  135.         } catch (TimeoutException cannotHappen) { 
  136.             assert false
  137.             return null
  138.         } 
  139.     } 
  140.     //对上方法的封装 
  141.     public <T> T invokeAny(Collection<? extends Callable<T>> tasks, 
  142.                            long timeout, TimeUnit unit) 
  143.         throws InterruptedException, ExecutionException, TimeoutException { 
  144.         return doInvokeAny(tasks, true, unit.toNanos(timeout)); 
  145.     } 
  146.     //相对于上一个方法执行成功任何一个则返回结果而此方法是全部执行完然后统一返回结果 
  147.     public <T> List<Future<T>> invokeAll(Collection<? extends Callable<T>> tasks) 
  148.         throws InterruptedException { 
  149.         //传入的任务集合不能是null 
  150.         if (tasks == null
  151.             throw new NullPointerException(); 
  152.         //创建一个集合用来保存获取到的执行future 
  153.         ArrayList<Future<T>> futures = new ArrayList<Future<T>>(tasks.size()); 
  154.         //任务是否执行完成 
  155.         boolean done = false
  156.         try { 
  157.             //遍历传入的任务并且调用执行方法将创建的future添加到集合中 
  158.             for (Callable<T> t : tasks) { 
  159.                 RunnableFuture<T> f = newTaskFor(t); 
  160.                 futures.add(f); 
  161.                 execute(f); 
  162.             } 
  163.             //遍历获取到的future 
  164.             for (int i = 0, size = futures.size(); i < size; i++) { 
  165.                 Future<T> f = futures.get(i); 
  166.                 //如果当前任务没有成功则进行f.get方法等待此方法执行成功,如果方法执行异常或者被取消将忽略异常 
  167.                 if (!f.isDone()) { 
  168.                     try { 
  169.                         f.get(); 
  170.                     } catch (CancellationException ignore) { 
  171.                     } catch (ExecutionException ignore) { 
  172.                     } 
  173.                 } 
  174.             } 
  175.             //到这一步则代表所有的任务都已经有了确切的结果 
  176.             done = true
  177.             //返回任务结果集合 
  178.             return futures; 
  179.         } finally { 
  180.             //如果不是truefalse 则代表执行过程中被中断了则需要对任务进行取消操作,如果正常完成则不会被取消 
  181.             if (!done) 
  182.                 for (int i = 0, size = futures.size(); i < size; i++) 
  183.                     futures.get(i).cancel(true); 
  184.         } 
  185.     } 
  186.     //与上方方法的区别在于对于任务集合可以设置超时时间 
  187.     //这里会针对差异进行讲解 
  188.     public <T> List<Future<T>> invokeAll(Collection<? extends Callable<T>> tasks, 
  189.                                          long timeout, TimeUnit unit) 
  190.         throws InterruptedException { 
  191.         if (tasks == null
  192.             throw new NullPointerException(); 
  193.         //计算设置时长的纳秒时间 
  194.         long nanos = unit.toNanos(timeout); 
  195.         ArrayList<Future<T>> futures = new ArrayList<Future<T>>(tasks.size()); 
  196.         boolean done = false
  197.         try { 
  198.             for (Callable<T> t : tasks) 
  199.                 futures.add(newTaskFor(t)); 
  200.             //计算最终计算的确切时间点,运行时长不能超过此时间也就是时间死线 
  201.             //这里是个细节future创建的时间并没有算作执行时间 
  202.             final long deadline = System.nanoTime() + nanos; 
  203.             //获取当前结果数 
  204.             final int size = futures.size(); 
  205.             //遍历将任务进行执行 
  206.             for (int i = 0; i < size; i++) { 
  207.                 execute((Runnable)futures.get(i)); 
  208.                 //并且每次都计算死线 
  209.                 nanos = deadline - System.nanoTime(); 
  210.                 //如果时间已经超过则返回结果 
  211.                 if (nanos <= 0L) 
  212.                     return futures; 
  213.             } 
  214.             //否则遍历future确定每次执行都获取到了结果 
  215.             for (int i = 0; i < size; i++) { 
  216.                 Future<T> f = futures.get(i); 
  217.                 if (!f.isDone()) { 
  218.                     //如果在等待过程中已经超时则返回当前等待结合 
  219.                     if (nanos <= 0L) 
  220.                         return futures; 
  221.                     try { 
  222.                         //如果没有超过死线则设置从future中获取结果的时间如果超过则会派出timeout 
  223.                         f.get(nanos, TimeUnit.NANOSECONDS); 
  224.                     } catch (CancellationException ignore) { 
  225.                     } catch (ExecutionException ignore) { 
  226.                     } catch (TimeoutException toe) { 
  227.                         //抛出了异常则会返回当前的列表 
  228.                         return futures; 
  229.                     } 
  230.                     //计算最新的超时时间 
  231.                     nanos = deadline - System.nanoTime(); 
  232.                 } 
  233.             } 
  234.             //之前的返回都没有设置为true所以在finally中都会设置为取消唯独正常执行完成到此处返回的结果才是最终的结果 
  235.             done = true
  236.             return futures; 
  237.         } finally { 
  238.             if (!done) 
  239.                 for (int i = 0, size = futures.size(); i < size; i++) 
  240.                     futures.get(i).cancel(true); 
  241.         } 
  242.     } 
  243.  

线程池的具体实现

并发编程之ThreadPoolExecutor线程池原理解析
  • ThreadPoolExecutor 默认线程池
  • ScheduledThreadPoolExecutor 定时线程池 (下篇再做介绍)

ThreadPoolExecutor

线程池重点属性

  1. //用来标记线程池状态(高3位),线程个数(低29位) 
  2. //默认是RUNNING状态,线程个数为0 
  3. private final AtomicInteger ctl = new AtomicInteger(ctlOf(RUNNING, 0)); 
  4.  
  5. //线程个数掩码位数,并不是所有平台int类型是32位,所以准确说是具体平台下Integer的二进制位数-3后的剩余位数才是线程的个数, 
  6. private static final int COUNT_BITS = Integer.SIZE - 3; 
  7.  
  8. //线程最大个数(低29位)000 11111111111111111111111111111 
  9. private static final int CAPACITY   = (1 << COUNT_BITS) - 1; 

ctl 是对线程池的运行状态和线程池中有效线程的数量进行控制的一个字段, 它包含两部分的信息: 线程池的运行状态 (runState) 和线程池内有效线程的数量 (workerCount),这里可以看到,使用了Integer类型来保存,高3位保存runState,低29位保存workerCount。COUNT_BITS 就是29,CAPACITY就是1左移29位减1(29个1),这个常量表示workerCount的上限值,大约是5亿。

ctl相关方法

  • runStateOf:获取运行状态;
  • workerCountOf:获取活动线程数;
  • ctlOf:获取运行状态和活动线程数的值。
  1. / 获取高三位 运行状态 
  2. private static int runStateOf(int c)     { return c & ~CAPACITY; } 
  3.  
  4. //获取低29位 线程个数 
  5. private static int workerCountOf(int c)  { return c & CAPACITY; } 
  6.  
  7. //计算ctl新值,线程状态 与 线程个数 
  8. private static int ctlOf(int rs, int wc) { return rs | wc; } 

线程池存在5种状态

  1. //运行中 111 00000000000000000000000000000 
  2. private static final int RUNNING    = -1 << COUNT_BITS; 
  3. //关闭 000 00000000000000000000000000000 
  4. private static final int SHUTDOWN   =  0 << COUNT_BITS; 
  5. //停止 001 00000000000000000000000000000 
  6. private static final int STOP       =  1 << COUNT_BITS; 
  7. //整理 010 00000000000000000000000000000 
  8. private static final int TIDYING    =  2 << COUNT_BITS; 
  9. //终止 011 00000000000000000000000000000 
  10. private static final int TERMINATED =  3 << COUNT_BITS; 

使用一个整形,前3位表示状态,后29位表示线程容量,也就是说线程最多有 230−1 个

并发编程之ThreadPoolExecutor线程池原理解析

也可以看出当ctl小于零表示线程池仍在运行

RUNNING

  • 状态说明:线程池处在RUNNING状态时,能够接收新任务,以及对已添加的任务进行处理。
  • 状态切换:线程池的初始化状态是RUNNING。换句话说,线程池被一旦被创建,就处于RUNNING状态,并且线程池中的任务数为0!

SHUTDOWN

  • 状态说明:线程池处在SHUTDOWN状态时,不接收新任务,但能处理已添加的任务。
  • 状态切换:调用线程池的shutdown()接口时,线程池由RUNNING -> SHUTDOWN。

STOP

  • 状态说明:线程池处在STOP状态时,不接收新任务,不处理已添加的任务,并且会中断正在处理的任务。
  • 状态切换:调用线程池的shutdownNow()接口时,线程池由(RUNNING or SHUTDOWN ) -> STOP。

TIDYING

  • 状态说明:当所有的任务已终止,ctl记录的”任务数量”为0,线程池会变为TIDYING状态。当线程池变为TIDYING状态时,会执行钩子函数terminated()。terminated()在ThreadPoolExecutor类中是空的,若用户想在线程池变为TIDYING时,进行相应的处理;可以通过重载terminated()函数来实现。
  • 状态切换:当线程池在SHUTDOWN状态下,阻塞队列为空并且线程池中执行的任务也为空时,就会由 SHUTDOWN -> TIDYING。 当线程池在STOP状态下,线程池中执行的任务为空时,就会由STOP -> TIDYING。

TERMINATED

  • 状态说明:线程池彻底终止,就变成TERMINATED状态。
  • 状态切换:线程池处在TIDYING状态时,执行完terminated()之后,就会由 TIDYING -> TERMINATED。

进入TERMINATED的条件如下:

  • 线程池不是RUNNING状态;
  • 线程池状态不是TIDYING状态或TERMINATED状态;
  • 如果线程池状态是SHUTDOWN并且workerQueue为空;
  • workerCount为0;
  • 设置TIDYING状态成功。
并发编程之ThreadPoolExecutor线程池原理解析

线程池参数

corePoolSize

线程池中的核心线程数,当提交一个任务时,线程池创建一个新线程执行任务,直到当前线程数等于corePoolSize;如果当前线程数为corePoolSize,继续提交的任务被保存到阻塞队列中,等待被执行;如果执行了线程池的prestartAllCoreThreads()方法,线程池会提前创建并启动所有核心线程。

maximumPoolSize

线程池中允许的最大线程数。如果当前阻塞队列满了,且继续提交任务,则创建新的线程执行任务,前提是当前线程数小于maximumPoolSize;

keepAliveTim

线程池维护线程所允许的空闲时间。当线程池中的线程数量大于corePoolSize的时候,如果这时没有新的任务提交,核心线程外的线程不会立即销毁,而是会等待,直到等待的时间超过了keepAliveTime;

unit

keepAliveTime的单位;

workQueue

用来保存等待被执行的任务的阻塞队列,且任务必须实现Runable接口,在JDK中提供了如下阻塞队列:

1、ArrayBlockingQueue:基于数组结构的有界阻塞队列,按FIFO排序任务;

2、LinkedBlockingQuene:基于链表结构的阻塞队列,按FIFO排序任务,吞吐量通常要高于ArrayBlockingQuene;

3、SynchronousQuene:一个不存储元素的阻塞队列,每个插入操作必须等到另一个线程调用移除操作,否则插入操作一直处于阻塞状态,吞吐量通常要高于LinkedBlockingQuene;

4、priorityBlockingQuene:具有优先级的无界阻塞队列;

threadFactory

它是ThreadFactory类型的变量,用来创建新线程。默认使用Executors.defaultThreadFactory() 来创建线程。使用默认的ThreadFactory来创建线程时,会使新创建的线程具有相同的NORM_PRIORITY优先级并且是非守护线程,同时也设置了线程的名称。

handler

线程池的饱和策略,当阻塞队列满了,且没有空闲的工作线程,如果继续提交任务,必须采取一种策略处理该任务,线程池提供了4种策略:

  1. AbortPolicy:直接抛出异常,默认策略;
  2. CallerRunsPolicy:用调用者所在的线程来执行任务;
  3. DiscardOldestPolicy:丢弃阻塞队列中靠最前的任务,并执行当前任务;
  4. DiscardPolicy:直接丢弃任务;

上面的4种策略都是ThreadPoolExecutor的内部类。

并发编程之ThreadPoolExecutor线程池原理解析

当然也可以根据应用场景实现RejectedExecutionHandler接口,自定义饱和策略,如记录日志或持久化存储不能处理的任务。

线程池的创建

有四个构造函数,其他三个都是调用下面代码中的这个构造函数

  1. public ThreadPoolExecutor(int corePoolSize, 
  2.                           int maximumPoolSize, 
  3.                           long keepAliveTime, 
  4.                           TimeUnit unit, 
  5.                           BlockingQueue<Runnable> workQueue, 
  6.                           ThreadFactory threadFactory, 
  7.                           RejectedExecutionHandler handler)  

线程池监控

  1. public long getTaskCount() //线程池已执行与未执行的任务总数 
  2. public long getCompletedTaskCount() //已完成的任务数 
  3. public int getPoolSize() //线程池当前的线程数 
  4. public int getActiveCount() //线程池中正在执行任务的线程数量 

线程池原理

并发编程之ThreadPoolExecutor线程池原理解析

核心方法分析

由于篇幅有限,核心方法解析请阅读文末的扩展链接。

PS:以上代码提交在 Github :

https://github.com/Niuh-Study/niuh-juc-final.git

原文地址:https://www.toutiao.com/i6903537887346819595/

延伸 · 阅读

精彩推荐