线程池ThreadPoolExecutor源码分析,看这一篇就够了
前言
多线程是我们日常工作中很少能接触到的技术,但是面试
的时候100%
会被问到,万一工作中用到了基本不会,本篇咱们就来深入分析线程池的实现类ThreadPoolExecutor
。
1、构造方法
构造方法中有4个方法,本质上都是调用的下面这个构造方法:
public ThreadPoolExecutor(int corePoolSize, int maximumPoolSize, long keepAliveTime, TimeUnit unit, BlockingQueue<Runnable> workQueue, ThreadFactory threadFactory, RejectedExecutionHandler handler) { if (corePoolSize < 0 || maximumPoolSize <= 0 || maximumPoolSize < corePoolSize || keepAliveTime < 0) throw new IllegalArgumentException(); if (workQueue == null || threadFactory == null || handler == null) throw new NullPointerException(); this.acc = System.getSecurityManager() == null ? null : AccessController.getContext(); //线程池的核心线程数目 this.corePoolSize = corePoolSize; //线程池的最大线程数目 this.maximumPoolSize = maximumPoolSize; //阻塞的队列(存储的是待运行的线程) this.workQueue = workQueue; //线程空闲等待时间 this.keepAliveTime = unit.toNanos(keepAliveTime); //线程工厂(主要作用是创建线程),一般是默认 this.threadFactory = threadFactory; //工作队列满了时候的饱和策略 this.handler = handler;}复制代码
2、饱和策略
上面的构造方法中,我们着重需要注意的是饱和策略,线程池中定义了四种饱和策略:
1、CallerRunsPolicy
public static class CallerRunsPolicy implements RejectedExecutionHandler { public CallerRunsPolicy() { } //使用主线程执行新任务 public void rejectedExecution(Runnable r, ThreadPoolExecutor e) { if (!e.isShutdown()) { //此方法相同于同步方法 r.run(); } }}复制代码
2、 AbortPolicy(线程池默认的策略)
public static class AbortPolicy implements RejectedExecutionHandler { public AbortPolicy() { }
public void rejectedExecution(Runnable r, ThreadPoolExecutor e) { //抛出 RejectedExecutionException来拒绝新任务的处理 throw new RejectedExecutionException("Task " + r.toString() + " rejected from " + e.toString()); }}复制代码
3、DiscardPolicy
public static class DiscardPolicy implements RejectedExecutionHandler { public DiscardPolicy() { } //不执行任何操作,丢弃新任务 public void rejectedExecution(Runnable r, ThreadPoolExecutor e) { }}复制代码
4、DiscardOldestPolicy
public static class DiscardOldestPolicy implements RejectedExecutionHandler { public DiscardOldestPolicy() { } //此策略将丢弃最早的未处理的任务 public void rejectedExecution(Runnable r, ThreadPoolExecutor e) { if (!e.isShutdown()) { e.getQueue().poll(); e.execute(r); } }}复制代码
3、阻塞队列
咱们看下ThreadPoolExecutor的源码:
public static ExecutorService newFixedThreadPool(int nThreads) { return new ThreadPoolExecutor(nThreads, nThreads,0L, TimeUnit.MILLISECONDS, new LinkedBlockingQueue<Runnable>());}复制代码
使用的是LinkedBlockingQueue
作为阻塞队列,LinkedBlockingQueue的默认构造函数允许的队列长度是Integer.MAX_VALUE,若堆积大量的请求,可能会造成OOM。
此处就是为什么《阿里巴巴 Java 开发手册》
中不推荐使用Executors工具类创建线程池的原因,要求使用 ThreadPoolExecutor
构造函数的方式,让写的同学更加明确线程池的运行规则,规避资源耗尽的风险。
4、execute方法
下面是执行流程图:

对照流程图,我们再来看源码:
//ctl中存放的是int值,int值得高低位保存了线程池运行的状态和有效线程的数量private final AtomicInteger ctl = new AtomicInteger(ctlOf(RUNNING, 0));private static int workerCountOf(int c) { return c & CAPACITY;}//任务队列private final BlockingQueue<Runnable> workQueue;public void execute(Runnable command) { //如果任务为null,则抛出异常 if (command == null) throw new NullPointerException(); //获取线程池状态和有效线程数 int c = ctl.get(); //以下有3步: //步骤1: //如果线程池工作的线程小于核心线程数 if (workerCountOf(c) < corePoolSize) { //则增加一个线程,并把该任务交给它去执行 if (addWorker(command, true)) //成功则返回 return; //这里说明创建核心线程失败,需要再次获取临时变量c c = ctl.get(); } //步骤2: // 走到这里说明创建新的核心线程失败,也就是当前工作线程数大于等于corePoolSize // 线程池的运行状态是RUNNING,并且尝试将新任务加入到阻塞队列,成功返回true if (isRunning(c) && workQueue.offer(command)) { //进入到这里,是已经向任务队列投放任务成功 //再次获取线程池状态和有效线程数 int recheck = ctl.get(); //如果线程池状态不是RUNNING(线程池异常终止了),将线程从工作队列中移除 if (! isRunning(recheck) && remove(command)) //执行饱和策略 reject(command); // 走到这里说明线程池状态可能是RUNNING // 也可能是移除线程任务失败了(失败的最大的可能是已经执行完毕了) //因为所有存活的工作线程有可能在最后一次检查之后已经终结,所以需要二次检查线程池工作线程的状态 //这里博主也是看了半天,大家好好体会下 else if (workerCountOf(recheck) == 0) //若当前线程池工作线程数为0,则新建一个线程并执行 addWorker(null, false); } //步骤3: // 如果任务队列已满,就需要创建非核心线程 // 如果新建非核心线程失败,则执行饱和策略 else if (!addWorker(command, false)) reject(command);}复制代码
上面的方法多次调用了addWorker方法,我们跟踪进去看下源码:
// 添加工作线程,返回true则创建和启动工作线程成功;返回false则没有新创建工作线程private boolean addWorker(Runnable firstTask, boolean core) { retry: for (;;) { //获取线程池对应的int值 int c = ctl.get(); //获取线程池状态 int rs = runStateOf(c); // Check if queue empty only if necessary. if (rs >= SHUTDOWN && ! (rs == SHUTDOWN && firstTask == null && ! workQueue.isEmpty())) return false; for (;;) { //获取工作线程数 int wc = workerCountOf(c); //工作线程数超过允许的“最大线程数”则返回false //core为true,“最大线程数”就是核心线程数,则表明创建核心线程数失败 if (wc >= CAPACITY || wc >= (core ? corePoolSize : maximumPoolSize)) return false; // 成功通过CAS更新工作线程数wc,则break到最外层的循环 if (compareAndIncrementWorkerCount(c)) break retry; c = ctl.get(); // Re-read ctl // 如果线程的状态改变了就跳到外层循环执行 if (runStateOf(c) != rs) continue retry; //如果CAS更新工作线程数wc失败,则可能是并发更新导致的失败,继续在内层循环重试即可 // else CAS failed due to workerCount change; retry inner loop } } // 标记工作线程是否启动成功 boolean workerStarted = false; //标记工作线程是否创建成功 boolean workerAdded = false; //工作线程 Worker w = null; try { //创建一个工作线程 w = new Worker(firstTask); final Thread t = w.thread; if (t != null) { final ReentrantLock mainLock = this.mainLock; //获取锁 mainLock.lock(); try { // Recheck while holding lock. // Back out on ThreadFactory failure or if // shut down before lock acquired. int rs = runStateOf(ctl.get()); // 再次确认"线程池状态" if (rs < SHUTDOWN || (rs == SHUTDOWN && firstTask == null)) { if (t.isAlive()) // precheck that t is startable throw new IllegalThreadStateException(); //把创建的工作线程实例添加到工作线程集合 workers.add(w); /更新当前工作线程的峰值容量largestPoolSize int s = workers.size(); if (s > largestPoolSize) largestPoolSize = s; workerAdded = true; } } finally { //释放锁 mainLock.unlock(); } //如果加入线程池成功 if (workerAdded) { //启动线程 t.start(); workerStarted = true; } } } finally { //如果线程启动失败,则需要从工作线程集合移除对应线程 if (! workerStarted) addWorkerFailed(w); } return workerStarted;}复制代码
5、shutdown方法
线程池不用了,要关闭线程池,下面是源码:
public void shutdown() { final ReentrantLock mainLock = this.mainLock; // 获取锁 mainLock.lock(); try { //校验是否有权限。 checkShutdownAccess(); //设置SHUTDOWN状态。 advanceRunState(SHUTDOWN); //中断线程池中所有空闲线程。 interruptIdleWorkers(); //钩子函数 onShutdown(); // hook for ScheduledThreadPoolExecutor } finally { //释放锁 mainLock.unlock(); } //尝试终止线程池 tryTerminate();}复制代码
结束语
本篇详细的分析了ThreadPoolExecutor的execute方法,耗费了不少时间。如果本文对你哪怕是有一点点的帮助,就值了。
赞 (0)