透彻了解线程池
线程池
1.1 构造参数
参数名 |
类型 |
含义 |
corePoolSize |
int |
核心线程数,在线程池完成初始化后,默认情况下,线程池中并没有任务线程,线程池会等待有任务到来时,再去创建新线程去执行任务 |
maximumPoolSize |
int |
线程池在核心线程数的基础上,额外增加一些线程,但是新增加的线程数有一个上限,最大量就是maximumPoolSize |
keepAliveTime |
long |
保持存活时间,如果线程池当前的线程数多于corePoolSize,当这些多余线程空闲时间超过keepAliveTime,它们就会被终止 |
workQueue |
BlockingQueue |
任务存储队列 |
threadFactory |
ThreadFactory |
当线程池需要新的线程的时候,会使用threadFactory来生成新的线程,默认使用 |
handler |
RejectedExecutionHandler |
由于线程池无法接受你所提交的任务的拒绝策略 |
1.2 添加线程规则
- 如果线程数小于corePoolSize,即使其他工作线程处于空闲,也会创建一个新线程来运行任务
- 如果线程数等于(或大于)corePoolSize但少于maximumPoolSize,则将任务放入队列
- 如果队列已满,并且线程数少于maximumPoolSize,则创建一个新线程来运行任务
- 如果列队已满,并且线程数大于或者等于maximumPoolSize,则拒绝该任务
工作队列有3种类型:
- 直接交接:SynchronousQueue, 无容量,无缓冲
- 无界队列:LinkedBlockingQueue,可以无限制添加,直到oom,maximumPoolSize将无意义
- 有界队列:ArrayBlockingQueue
1.3 四种线程池
- newFixedThreadPool
public static ExecutorService newFixedThreadPool(int nThreads) { return new ThreadPoolExecutor(nThreads, nThreads, 0L, TimeUnit.MILLISECONDS, new LinkedBlockingQueue<Runnable>()); }
由于传进去的LinkedBlockingQueue是没有容量上限的,所以请求数越来越多,并且无法及时处理完毕的时候,也就是请求堆积的时候,会容易造成占用大量的内存,可能导致OOM
- newSingleThreadExecutor
public static ExecutorService newSingleThreadExecutor() { return new FinalizableDelegatedExecutorService (new ThreadPoolExecutor(1, 1, 0L, TimeUnit.MILLISECONDS, new LinkedBlockingQueue<Runnable>())); }
可以看出,这里和刚才的newFixedThreadPool的原理基本一样,只不过线程数直接设置成了1,所以这也会导致同样的问题,也就是当请求堆积的时候,可能会占用大量的内存
- newCachedThreadPool
public static ExecutorService newCachedThreadPool() { return new ThreadPoolExecutor(0, Integer.MAX_VALUE, 60L, TimeUnit.SECONDS, new SynchronousQueue<Runnable>()); }
maximumPoolSize被设置成了Integer.MAX_VALUE,这可能会创建数量非常多的线程,甚至导致OOM
- newScheduledThreadPool
public static ScheduledExecutorService newScheduledThreadPool(int corePoolSize) { return new ScheduledThreadPoolExecutor(corePoolSize); } public ScheduledThreadPoolExecutor(int corePoolSize) { super(corePoolSize, Integer.MAX_VALUE, DEFAULT_KEEPALIVE_MILLIS, MILLISECONDS, new DelayedWorkQueue()); }
ScheduledExecutorService scheduledExecutorService = Executors.newScheduledThreadPool(5); scheduledExecutorService.scheduleAtFixedRate(()->{ System.out.println('delay 1 seconds, and excute every 3s.'); }, 1, 3, TimeUnit.SECONDS);
Tue Oct 05 18:21:54 CST 2021delay 1 seconds, and excute every 3s. Tue Oct 05 18:21:57 CST 2021delay 1 seconds, and excute every 3s. Tue Oct 05 18:22:00 CST 2021delay 1 seconds, and excute every 3s. Tue Oct 05 18:22:03 CST 2021delay 1 seconds, and excute every 3s. Tue Oct 05 18:22:06 CST 2021delay 1 seconds, and excute every 3s. Tue Oct 05 18:22:09 CST 2021delay 1 seconds, and excute every 3s. Tue Oct 05 18:22:12 CST 2021delay 1 seconds, and excute every 3s. Tue Oct 05 18:22:15 CST 2021delay 1 seconds, and excute every 3s. Tue Oct 05 18:22:18 CST 2021delay 1 seconds, and excute every 3s.
1.4 手动创建线程池
1.4.1 线程数设置
- CPU密集型(加密,计算hash等):最佳线程数为CPU核心数的1~2倍左右
- 耗时IO型(读写数据库、文件、网络读写等):最佳线程数一般会大于CPU核心数很多倍,以JVM线程监控显示繁忙情况为依据,保证线程空闲可以衔接上,参考Brain Goetz推荐的计算方法:线程数 = CPU核心数 * (1 + 平均等待时间 / 平均工作时间)
1.4.2 初始化参数
ThreadPoolExecutor poolExecutor = new ThreadPoolExecutor(初始化参数);
public ThreadPoolExecutor(int corePoolSize, int maximumPoolSize, long keepAliveTime, TimeUnit unit, BlockingQueue<Runnable> workQueue, ThreadFactory threadFactory, RejectedExecutionHandler handler) { if (corePoolSize < 0 || maximumPoolSize <= 0 || maximumPoolSize < corePoolSize || keepAliveTime < 0) throw new IllegalArgumentException(); if (workQueue == null || threadFactory == null || handler == null) throw new NullPointerException(); this.corePoolSize = corePoolSize; this.maximumPoolSize = maximumPoolSize; this.workQueue = workQueue; this.keepAliveTime = unit.toNanos(keepAliveTime); this.threadFactory = threadFactory; this.handler = handler;}
1.4.3 提交任务
//1.无返回值poolExecutor.execute(() -> { System.out.println('hello world');});//2.有返回值Future<?> future = poolExecutor.submit(() -> { System.out.println('hello world');});
1.4.4 关闭线程池
- shutdown():线程不会马上终止,直到执行完任务
- shutdownNow():中断运行的线程,返回任务队列
1.4.5 拒绝策略
- 当Executor关闭时,提交新任务会被拒绝
- 当Executor对最大线程和工作队列容量使用有限边界并且已经饱和时
策略 |
操作 |
AbortPolicy |
抛出异常 |
DiscardPolicy |
丢弃任务,不会通知 |
DiscardOldestPolicy |
将当前处于等待队列列头的等待任务强行取出,然后再试图将当前被拒绝的任务提交到线程池执行 |
CallerRunsPolicy |
返回给提交任务的线程执行 |
1.4.6 实现线程池暂停和恢复
public class PauseableThreadPool extends ThreadPoolExecutor { private boolean isPaused; private Lock lock = new ReentrantLock(); private Condition unpaused = lock.newCondition(); public PauseableThreadPool(int corePoolSize, int maximumPoolSize, long keepAliveTime, TimeUnit unit, BlockingQueue<Runnable> workQueue) { super(corePoolSize, maximumPoolSize, keepAliveTime, unit, workQueue); } public PauseableThreadPool(int corePoolSize, int maximumPoolSize, long keepAliveTime, TimeUnit unit, BlockingQueue<Runnable> workQueue, ThreadFactory threadFactory) { super(corePoolSize, maximumPoolSize, keepAliveTime, unit, workQueue, threadFactory); } public PauseableThreadPool(int corePoolSize, int maximumPoolSize, long keepAliveTime, TimeUnit unit, BlockingQueue<Runnable> workQueue, RejectedExecutionHandler handler) { super(corePoolSize, maximumPoolSize, keepAliveTime, unit, workQueue, handler); } public PauseableThreadPool(int corePoolSize, int maximumPoolSize, long keepAliveTime, TimeUnit unit, BlockingQueue<Runnable> workQueue, ThreadFactory threadFactory, RejectedExecutionHandler handler) { super(corePoolSize, maximumPoolSize, keepAliveTime, unit, workQueue, threadFactory, handler); } @Override protected void beforeExecute(Thread t, Runnable r) { super.beforeExecute(t, r); lock.lock(); try { while (isPaused) { unpaused.await(); } }catch (InterruptedException e) { e.printStackTrace(); }finally { lock.unlock(); } } public void resume() { lock.lock(); try { isPaused = false; unpaused.signalAll(); } finally { lock.unlock(); } } public void pause() { lock.lock(); try { isPaused = true; } finally { lock.unlock(); } } public static void main(String[] args) throws InterruptedException { PauseableThreadPool pool = new PauseableThreadPool(10, 20, 20L, TimeUnit.SECONDS, new LinkedBlockingQueue<>()); for (int i = 0; i < 10000; i++) { pool.execute(() -> {System.out.println('我被执行了'); try { Thread.sleep(100L); } catch (InterruptedException e) { e.printStackTrace(); } }); } Thread.sleep(1500L); pool.pause(); System.out.println('线程池被暂停了'); Thread.sleep(5000L); pool.resume(); System.out.println('线程池恢复'); Thread.sleep(5000L); pool.pause(); System.out.println('线程池被暂停了'); Thread.sleep(5000L); pool.resume(); System.out.println('线程池恢复'); }}
1.4.7 线程池状态
running |
接受新任务并处理队列任务 |
shutdown |
不接受新任务,但处理队列任务 |
stop |
不接受新任务,也不处理队列任务,并中断正在进行的任务 |
tidying |
所有任务都已经终止,workerCount为0,并运行terminate() |
terminated |
terminate()运行完成 |
1.4.8 线程池复用原理
- 线程池提交任务execute(Runnable command) ,addWorker添加任务
public void execute(Runnable command) { if (command == null) throw new NullPointerException(); int c = ctl.get(); if (workerCountOf(c) < corePoolSize) { if (addWorker(command, true)) return; c = ctl.get(); } if (isRunning(c) && workQueue.offer(command)) { int recheck = ctl.get(); if (! isRunning(recheck) && remove(command)) reject(command); else if (workerCountOf(recheck) == 0) addWorker(null, false); } else if (!addWorker(command, false)) reject(command); }
- addWorker(), w = new Worker(firstTask), 创建一次相当于创建线程
private boolean addWorker(Runnable firstTask, boolean core) { retry: for (int c = ctl.get();;) { // Check if queue empty only if necessary. if (runStateAtLeast(c, SHUTDOWN) && (runStateAtLeast(c, STOP) || firstTask != null || workQueue.isEmpty())) return false; for (;;) { if (workerCountOf(c) >= ((core ? corePoolSize : maximumPoolSize) & COUNT_MASK)) return false; if (compareAndIncrementWorkerCount(c)) break retry; c = ctl.get(); // Re-read ctl if (runStateAtLeast(c, SHUTDOWN)) continue retry; // else CAS failed due to workerCount change; retry inner loop } } boolean workerStarted = false; boolean workerAdded = false; Worker w = null; try { w = new Worker(firstTask); final Thread t = w.thread; if (t != null) { final ReentrantLock mainLock = this.mainLock; mainLock.lock(); try { // Recheck while holding lock. // Back out on ThreadFactory failure or if // shut down before lock acquired. int c = ctl.get(); if (isRunning(c) || (runStateLessThan(c, STOP) && firstTask == null)) { if (t.isAlive()) // precheck that t is startable throw new IllegalThreadStateException(); workers.add(w); int s = workers.size(); if (s > largestPoolSize) largestPoolSize = s; workerAdded = true; } } finally { mainLock.unlock(); } if (workerAdded) { t.start(); workerStarted = true; } } } finally { if (! workerStarted) addWorkerFailed(w); } return workerStarted; }
- Worker类实现了Runnable接口
public void run() { runWorker(this); }
- runWorker(this)不断执行task.run(),也就是任务的run方法,就是我们提交任务的run方法。简而言之,就是通过Worker的run方法执行我们提交的任务,注意,这里有个循环哦
Thread wt = Thread.currentThread(); Runnable task = w.firstTask; w.firstTask = null; w.unlock(); // allow interrupts boolean completedAbruptly = true; try { while (task != null || (task = getTask()) != null) { w.lock(); // If pool is stopping, ensure thread is interrupted; // if not, ensure thread is not interrupted. This // requires a recheck in second case to deal with // shutdownNow race while clearing interrupt if ((runStateAtLeast(ctl.get(), STOP) || (Thread.interrupted() && runStateAtLeast(ctl.get(), STOP))) && !wt.isInterrupted()) wt.interrupt(); try { beforeExecute(wt, task); try { task.run(); afterExecute(task, null); } catch (Throwable ex) { afterExecute(task, ex); throw ex; } } finally { task = null; w.completedTasks++; w.unlock(); } } completedAbruptly = false; } finally { processWorkerExit(w, completedAbruptly); } }
1.4.9 注意点
- 避免任务堆积
- 避免线程数过度增加
- 排查线程泄漏