透彻了解线程池

线程池

1.1 构造参数

参数名

类型

含义

corePoolSize

int

核心线程数,在线程池完成初始化后,默认情况下,线程池中并没有任务线程,线程池会等待有任务到来时,再去创建新线程去执行任务

maximumPoolSize

int

线程池在核心线程数的基础上,额外增加一些线程,但是新增加的线程数有一个上限,最大量就是maximumPoolSize

keepAliveTime

long

保持存活时间,如果线程池当前的线程数多于corePoolSize,当这些多余线程空闲时间超过keepAliveTime,它们就会被终止

workQueue

BlockingQueue

任务存储队列

threadFactory

ThreadFactory

当线程池需要新的线程的时候,会使用threadFactory来生成新的线程,默认使用
Executors.defaultThreadFactory(),创建出来的线程都在同一个线程组,拥有同样的优先级并且都不是守护线程

handler

RejectedExecutionHandler

由于线程池无法接受你所提交的任务的拒绝策略

1.2 添加线程规则

  1. 如果线程数小于corePoolSize,即使其他工作线程处于空闲,也会创建一个新线程来运行任务
  2. 如果线程数等于(或大于)corePoolSize但少于maximumPoolSize,则将任务放入队列
  3. 如果队列已满,并且线程数少于maximumPoolSize,则创建一个新线程来运行任务
  4. 如果列队已满,并且线程数大于或者等于maximumPoolSize,则拒绝该任务

工作队列有3种类型:

  1. 直接交接:SynchronousQueue, 无容量,无缓冲
  2. 无界队列:LinkedBlockingQueue,可以无限制添加,直到oom,maximumPoolSize将无意义
  3. 有界队列:ArrayBlockingQueue

1.3 四种线程池

  1. newFixedThreadPool
public static ExecutorService newFixedThreadPool(int nThreads) { return new ThreadPoolExecutor(nThreads, nThreads, 0L, TimeUnit.MILLISECONDS, new LinkedBlockingQueue<Runnable>()); }

由于传进去的LinkedBlockingQueue是没有容量上限的,所以请求数越来越多,并且无法及时处理完毕的时候,也就是请求堆积的时候,会容易造成占用大量的内存,可能导致OOM

  1. newSingleThreadExecutor
   public static ExecutorService newSingleThreadExecutor() {       return new FinalizableDelegatedExecutorService           (new ThreadPoolExecutor(1, 1,                                   0L, TimeUnit.MILLISECONDS,                                   new LinkedBlockingQueue<Runnable>()));   }

可以看出,这里和刚才的newFixedThreadPool的原理基本一样,只不过线程数直接设置成了1,所以这也会导致同样的问题,也就是当请求堆积的时候,可能会占用大量的内存

  1. newCachedThreadPool
public static ExecutorService newCachedThreadPool() { return new ThreadPoolExecutor(0, Integer.MAX_VALUE, 60L, TimeUnit.SECONDS, new SynchronousQueue<Runnable>()); }

maximumPoolSize被设置成了Integer.MAX_VALUE,这可能会创建数量非常多的线程,甚至导致OOM

  1. newScheduledThreadPool
   public static ScheduledExecutorService newScheduledThreadPool(int corePoolSize) {       return new ScheduledThreadPoolExecutor(corePoolSize);   }   public ScheduledThreadPoolExecutor(int corePoolSize) {       super(corePoolSize, Integer.MAX_VALUE,             DEFAULT_KEEPALIVE_MILLIS, MILLISECONDS,             new DelayedWorkQueue());   }
ScheduledExecutorService scheduledExecutorService = Executors.newScheduledThreadPool(5); scheduledExecutorService.scheduleAtFixedRate(()->{ System.out.println('delay 1 seconds, and excute every 3s.'); }, 1, 3, TimeUnit.SECONDS);
   Tue Oct 05 18:21:54 CST 2021delay 1 seconds, and excute every 3s.   Tue Oct 05 18:21:57 CST 2021delay 1 seconds, and excute every 3s.   Tue Oct 05 18:22:00 CST 2021delay 1 seconds, and excute every 3s.   Tue Oct 05 18:22:03 CST 2021delay 1 seconds, and excute every 3s.   Tue Oct 05 18:22:06 CST 2021delay 1 seconds, and excute every 3s.   Tue Oct 05 18:22:09 CST 2021delay 1 seconds, and excute every 3s.   Tue Oct 05 18:22:12 CST 2021delay 1 seconds, and excute every 3s.   Tue Oct 05 18:22:15 CST 2021delay 1 seconds, and excute every 3s.   Tue Oct 05 18:22:18 CST 2021delay 1 seconds, and excute every 3s.

1.4 手动创建线程池

1.4.1 线程数设置

  1. CPU密集型(加密,计算hash等):最佳线程数为CPU核心数的1~2倍左右
  2. 耗时IO型(读写数据库、文件、网络读写等):最佳线程数一般会大于CPU核心数很多倍,以JVM线程监控显示繁忙情况为依据,保证线程空闲可以衔接上,参考Brain Goetz推荐的计算方法:线程数 = CPU核心数 * (1 + 平均等待时间 / 平均工作时间)

1.4.2 初始化参数

ThreadPoolExecutor poolExecutor = new ThreadPoolExecutor(初始化参数);
public ThreadPoolExecutor(int corePoolSize,                          int maximumPoolSize,                          long keepAliveTime,                          TimeUnit unit,                          BlockingQueue<Runnable> workQueue,                          ThreadFactory threadFactory,                          RejectedExecutionHandler handler) {    if (corePoolSize < 0 ||        maximumPoolSize <= 0 ||        maximumPoolSize < corePoolSize ||        keepAliveTime < 0)        throw new IllegalArgumentException();    if (workQueue == null || threadFactory == null || handler == null)        throw new NullPointerException();    this.corePoolSize = corePoolSize;    this.maximumPoolSize = maximumPoolSize;    this.workQueue = workQueue;    this.keepAliveTime = unit.toNanos(keepAliveTime);    this.threadFactory = threadFactory;    this.handler = handler;}

1.4.3 提交任务

//1.无返回值poolExecutor.execute(() -> { System.out.println('hello world');});//2.有返回值Future<?> future = poolExecutor.submit(() -> { System.out.println('hello world');});

1.4.4 关闭线程池

  1. shutdown():线程不会马上终止,直到执行完任务
  2. shutdownNow():中断运行的线程,返回任务队列

1.4.5 拒绝策略

  • 当Executor关闭时,提交新任务会被拒绝
  • 当Executor对最大线程和工作队列容量使用有限边界并且已经饱和时

策略

操作

AbortPolicy

抛出异常

DiscardPolicy

丢弃任务,不会通知

DiscardOldestPolicy

将当前处于等待队列列头的等待任务强行取出,然后再试图将当前被拒绝的任务提交到线程池执行

CallerRunsPolicy

返回给提交任务的线程执行

1.4.6 实现线程池暂停和恢复

public class PauseableThreadPool extends ThreadPoolExecutor {    private boolean isPaused;    private Lock lock = new ReentrantLock();    private Condition unpaused = lock.newCondition();    public PauseableThreadPool(int corePoolSize, int maximumPoolSize, long keepAliveTime, TimeUnit unit, BlockingQueue<Runnable> workQueue) {        super(corePoolSize, maximumPoolSize, keepAliveTime, unit, workQueue);    }    public PauseableThreadPool(int corePoolSize, int maximumPoolSize, long keepAliveTime, TimeUnit unit, BlockingQueue<Runnable> workQueue, ThreadFactory threadFactory) {        super(corePoolSize, maximumPoolSize, keepAliveTime, unit, workQueue, threadFactory);    }    public PauseableThreadPool(int corePoolSize, int maximumPoolSize, long keepAliveTime, TimeUnit unit, BlockingQueue<Runnable> workQueue, RejectedExecutionHandler handler) {        super(corePoolSize, maximumPoolSize, keepAliveTime, unit, workQueue, handler);    }    public PauseableThreadPool(int corePoolSize, int maximumPoolSize, long keepAliveTime, TimeUnit unit, BlockingQueue<Runnable> workQueue, ThreadFactory threadFactory, RejectedExecutionHandler handler) {        super(corePoolSize, maximumPoolSize, keepAliveTime, unit, workQueue, threadFactory, handler);    }    @Override    protected void beforeExecute(Thread t, Runnable r) {        super.beforeExecute(t, r);        lock.lock();        try {            while (isPaused) {                unpaused.await();            }        }catch (InterruptedException e) {            e.printStackTrace();        }finally {            lock.unlock();        }    }    public void resume() {        lock.lock();        try {            isPaused = false;            unpaused.signalAll();        } finally {            lock.unlock();        }    }    public void pause() {        lock.lock();        try {            isPaused = true;        } finally {            lock.unlock();        }    }    public static void main(String[] args) throws InterruptedException {        PauseableThreadPool pool = new PauseableThreadPool(10, 20, 20L, TimeUnit.SECONDS, new LinkedBlockingQueue<>());        for (int i = 0; i < 10000; i++) {            pool.execute(() -> {System.out.println('我被执行了');                try {                    Thread.sleep(100L);                } catch (InterruptedException e) {                    e.printStackTrace();                }            });        }        Thread.sleep(1500L);        pool.pause();        System.out.println('线程池被暂停了');        Thread.sleep(5000L);        pool.resume();        System.out.println('线程池恢复');        Thread.sleep(5000L);        pool.pause();        System.out.println('线程池被暂停了');        Thread.sleep(5000L);        pool.resume();        System.out.println('线程池恢复');    }}

1.4.7 线程池状态

running

接受新任务并处理队列任务

shutdown

不接受新任务,但处理队列任务

stop

不接受新任务,也不处理队列任务,并中断正在进行的任务

tidying

所有任务都已经终止,workerCount为0,并运行terminate()

terminated

terminate()运行完成

1.4.8 线程池复用原理

  1. 线程池提交任务execute(Runnable command) ,addWorker添加任务
public void execute(Runnable command) { if (command == null) throw new NullPointerException(); int c = ctl.get(); if (workerCountOf(c) < corePoolSize) { if (addWorker(command, true)) return; c = ctl.get(); } if (isRunning(c) && workQueue.offer(command)) { int recheck = ctl.get(); if (! isRunning(recheck) && remove(command)) reject(command); else if (workerCountOf(recheck) == 0) addWorker(null, false); } else if (!addWorker(command, false)) reject(command); }
  1. addWorker(), w = new Worker(firstTask), 创建一次相当于创建线程
   private boolean addWorker(Runnable firstTask, boolean core) {       retry:       for (int c = ctl.get();;) {           // Check if queue empty only if necessary.           if (runStateAtLeast(c, SHUTDOWN)               && (runStateAtLeast(c, STOP)                   || firstTask != null                   || workQueue.isEmpty()))               return false;           for (;;) {               if (workerCountOf(c)                   >= ((core ? corePoolSize : maximumPoolSize) & COUNT_MASK))                   return false;               if (compareAndIncrementWorkerCount(c))                   break retry;               c = ctl.get();  // Re-read ctl               if (runStateAtLeast(c, SHUTDOWN))                   continue retry;               // else CAS failed due to workerCount change; retry inner loop           }       }       boolean workerStarted = false;       boolean workerAdded = false;       Worker w = null;       try {           w = new Worker(firstTask);           final Thread t = w.thread;           if (t != null) {               final ReentrantLock mainLock = this.mainLock;               mainLock.lock();               try {                   // Recheck while holding lock.                   // Back out on ThreadFactory failure or if                   // shut down before lock acquired.                   int c = ctl.get();                   if (isRunning(c) ||                       (runStateLessThan(c, STOP) && firstTask == null)) {                       if (t.isAlive()) // precheck that t is startable                           throw new IllegalThreadStateException();                       workers.add(w);                       int s = workers.size();                       if (s > largestPoolSize)                           largestPoolSize = s;                       workerAdded = true;                   }               } finally {                   mainLock.unlock();               }               if (workerAdded) {                   t.start();                   workerStarted = true;               }           }       } finally {           if (! workerStarted)               addWorkerFailed(w);       }       return workerStarted;   }
  1. Worker类实现了Runnable接口
public void run() { runWorker(this); }
  1. runWorker(this)不断执行task.run(),也就是任务的run方法,就是我们提交任务的run方法。简而言之,就是通过Worker的run方法执行我们提交的任务,注意,这里有个循环哦
    Thread wt = Thread.currentThread();       Runnable task = w.firstTask;       w.firstTask = null;       w.unlock(); // allow interrupts       boolean completedAbruptly = true;       try {           while (task != null || (task = getTask()) != null) {               w.lock();               // If pool is stopping, ensure thread is interrupted;               // if not, ensure thread is not interrupted.  This               // requires a recheck in second case to deal with               // shutdownNow race while clearing interrupt               if ((runStateAtLeast(ctl.get(), STOP) ||                    (Thread.interrupted() &&                     runStateAtLeast(ctl.get(), STOP))) &&                   !wt.isInterrupted())                   wt.interrupt();               try {                   beforeExecute(wt, task);                   try {                       task.run();                       afterExecute(task, null);                   } catch (Throwable ex) {                       afterExecute(task, ex);                       throw ex;                   }               } finally {                   task = null;                   w.completedTasks++;                   w.unlock();               }           }           completedAbruptly = false;       } finally {           processWorkerExit(w, completedAbruptly);       }   }

1.4.9 注意点

  • 避免任务堆积
  • 避免线程数过度增加
  • 排查线程泄漏
(0)

相关推荐

  • java开发技术之Executors创建线程池的弊端

    java开发技术之Executors创建线程池的弊端

  • 面试官:怎样去运用线程池?工作中如何使用?

    面试官:怎样去运用线程池?工作中如何使用? 工作中,我们有时候需要实现一些耗时的任务.比如:将 Word 转换成 PDF 存储的需求. 假设我们不使用线程池.那么每次请求都会开启新的线程,如果请求过多 ...

  • 万字长文爆肝线程池

    加个"星标",及时接收最新文章 这是程序员cxuan 的第 59 篇原创文章 更多文章见 https://github.com/crisxuan/bestJavaer 我们知道,线 ...

  • 线程池全整理(附面试题)

    本文总结一下线程池是怎么回事,分以下几个部分,对哪个部分感兴趣,可以直接跳到对应的章节第一部分:线程池类的结构介绍第二部分:线程池的使用第三部分:线程池的创建流程第四部分:线程池的应用场景第五部分:线 ...

  • 如何合理地估算线程池大小?

    这个问题虽然看起来很小,却并不那么容易回答. 大家如果有更好的方法欢迎赐教,先来一个天真的估算方法: 假设要求一个系统的TPS(Transaction Per Second或者Task Per Sec ...

  • C#线程学习笔记三:线程池中的I/O线程

    本笔记摘抄自:https://www.cnblogs.com/zhili/archive/2012/07/20/MultiThreads.html,记录一下学习过程以备后续查用.     一.I/O线 ...

  • 多线程之旅(ThreadPool 线程池)

    一.什么是ThreadPool 线程池(源码) 1.线程池顾名思义,有我们的系统创建一个容器装载着我们的线程,由CLR控制的所有AppDomain共享.线程池可用于执行任务.发送工作项.处理异步 I/ ...

  • Java主线程等待子线程、线程池

    print public class TestThread extends Thread { public void run() { System.out.println(this.getName() ...

  • 线程池ThreadPoolExecutor源码分析,看这一篇就够了

    前言 多线程是我们日常工作中很少能接触到的技术,但是面试的时候100%会被问到,万一工作中用到了基本不会,本篇咱们就来深入分析线程池的实现类ThreadPoolExecutor. 1.构造方法 构造方 ...

  • 分析源码,学会正确使用 Java 线程池

    在日常的开发工作当中,线程池往往承载着一个应用中最重要的业务逻辑,因此我们有必要更多地去关注线程池的执行情况,包括异常的处理和分析等.本文主要聚焦在如何正确使用线程池上,以及提供一些实用的建议.文中会 ...

  • 5000字、12 连环炮、一张图快速搞定线程池

    回复"000"获取大量电子书 写在前面 前面文章中,我们总结了JVM18连环炮.并发并最基础的12连环炮,建议先阅读: 连环炮继续走起,今天我给大家总结了线程池的12连环炮. 1. ...

  • 详述Java线程池实现原理

    优质文章,第一时间送达一.写在前面1.1 线程池是什么线程池(Thread Pool) 是一种池化思想管理线程的工具,经常出现在多线程服务器中,如MySQL.线程过多会带来额外的开销,其中包括创建销毁 ...

  • 10问10答:你真的了解线程池吗?

    <Java开发手册>中强调,线程资源必须通过线程池提供,而创建线程池必须使用ThreadPoolExecutor.手册主要强调利用线程池避免两个问题,一是线程过渡切换,二是避免请求过多时造 ...