Java之ThreadPoolExcutor和四种常见的线程池
一、ThreadPoolExcutors的作用
java提供了ThreadPoolExcutors来创建一个线程池,我们为什么要用线程池呢?
1.降低资源的消耗:通过重复利用已经创建好的线程降低线程的创建和销毁带来的损耗
2.提高响应速度:因为线程池中的线程处于等待分配任务的状态,当任务来时无需创建新的线程就能执行
3.提高线程的可管理性
二、ThreadPoolExecutor构造函数参数详细介绍
public ThreadPoolExecutor(int corePoolSize,
int maximumPoolSize,
long keepAliveTime,
TimeUnit unit,
BlockingQueue<Runnable> workQueue,
ThreadFactory threadFactory,
RejectedExecutionHandler handler)
1.int corePoolSize(核心线程数)
线程池新建线程的时候,如果当前线程总数小于corePoolSize,则新建的是核心线程,如果超过corePoolSize,则新建的是非核心线程;核心线程默认情况下会一直存活在线程池中,即使这个核心线程啥也不干(闲置状态);如果设置了 allowCoreThreadTimeOut 为 true,那么核心线程如果不干活(闲置状态)的话,超过一定时间(时长下面参数决定),就会被销毁掉。
2.int maximumPoolSize(线程池能容纳的最大线程数量)
线程总数 = 核心线程数 + 非核心线程数。
3.long keepAliveTime(非核心线程空闲存活时长)
非核心线程空闲时长超过该时长将会被回收,主要应用在缓存线程池中,当设置了 allowCoreThreadTimeOut 为 true 时,对核心线程同样起作用。
4.TimeUnit unit
空闲线程的存活时间(keepAliveTime 的单位)它是一个枚举类型,常用的如:TimeUnit.SECONDS(秒)、TimeUnit.MILLISECONDS(毫秒)。
5.BlockingQueue workQueue(任务队列)
当所有的核心线程都在干活时,新添加的任务会被添加到这个队列中等待处理,如果队列满了,则新建非核心线程执行任务
常用的workQueue类型:
(1)SynchornousQueue:这队列接收到任务时会直接交给线程处理,而不保留它,如果所有的线程都在工作,那就创建一 个新的线程来处理这个任务,为了保证不出现线程数达到maxnumPoolSize而不能新建线程的错误,所以使用这个类型 的队列时,maxnumPoolSize一般指定成Integer.MAX_VALUE,即无限大,然后核心线程数corePoolSize一般是0.
(2)LinkedBlockingQueue:这个队列接收到任务的时候,如果当前线程数小于核心线程数,则新建线程(核心线程)处理任务;如果当前线程数等于核心线程数,则进入队列等待。由于这个队列没有最大值限制,即所有超过核心线程数的任务都将被添加到队列中,这也就导致了 maximumPoolSize 的设定失效,因为总线程数永远不会超过 corePoolSize。
(3)ArrayBlockingQueue:可以限定队列的长度,接收到任务的时候,如果没有达到 corePoolSize 的值,则新建线程(核心线程)执行任务,如果达到了,则入队等候,如果队列已满,则新建线程(非核心线程)执行任务,又如果总线程数到了 maximumPoolSize,并且队列也满了,则发生错误。
(4)DelayQueue:队列内元素必须实现 Delayed 接口,这就意味着你传进去的任务必须先实现 Delayed 接口。这个队列接收到任务时,首先先入队,只有达到了指定的延时时间,才会执行任务。
6.ThreadFactory threadFactory(线程工厂):用来创建线程池中的线程,通常用默认的即可
7.RejectedExecutionHandler handler(拒绝策略):在线程池已经关闭的情况下和任务太多导致最大线程数和任务队列已经饱和,无法再接收新的任务,在上面两种情况下,只要满足其中一种时,在使用 execute() 来提交新的任务时将会拒绝,线程池提供了以下 4 种策略:
AbortPolicy:默认策略,在拒绝任务时,会抛出RejectedExecutionException。
CallerRunsPolicy:只要线程池未关闭,该策略直接在调用者线程中,运行当前的被丢弃的任务。
DiscardOldestPolicy:该策略将丢弃最老的一个请求,也就是即将被执行的任务,并尝试再次提交当前任务。
DiscardPolicy:该策略默默的丢弃无法处理的任务,不予任何处理。
线程池遵循的原则:
其会优先创建核心线程,执行任务,当核心线程增加CorePoolSize后,我们会把任务添加到work Queue中,当work Queue里面的任务也塞满了,线程池就会创建非核心线程执行去执行任务,当线程达到maximumPoolSize时候和work queue也达最大值时候我们会执行对应的拒绝策略
三、4类常见的线程池
newCachedThreadPool创建一个可缓存线程池,如果线程池长度超过处理需要,可灵活回收空闲线程,若无可回收,则新建线程。
newFixedThreadPool 创建一个定长线程池,可控制线程最大并发数,超出的线程会在队列中等待。
newScheduledThreadPool 创建一个定长线程池,支持定时及周期性任务执行。
newSingleThreadExecutor 创建一个单线程化的线程池,它只会用唯一的工作线程来执行任务,保证所有任务按照指定顺序(FIFO, LIFO, 优先级)执行。
其实他们都是通过ThreadPoolExcutors创建的
1.CachedThreadPool
由Executors的newCachedThreadPool方法创建,不存在核心线程,只存在数量不定的非核心线程,而且其数量最大值为Integer.MAX_VALUE。当线程池中的线程都处于活动时(全满),线程池会创建新的线程来处理新的任务,否则就会利用新的线程来处理新的任务,线程池中的空闲线程都有超时机制,默认超时时长为60s,超过60s的空闲线程就会被回收。和FixedThreadPool不同的是,CachedThreadPool的任务队列其实相当于一个空的集合,这将导致任何任务都会被执行,因为在这种场景下SynchronousQueue是不能插入任务的,SynchronousQueue是一个特殊的队列,在很多情况下可以理解为一个无法储存元素的队列。从CachedThreadPool的特性看,这类线程比较适合执行大量耗时较小的任务。当整个线程池都处于闲置状态时,线程池中的线程都会因为超时而被停止回收,几乎是不占任何系统资源。实现方式如下:
public static ExecutorService newCachedThreadPool(ThreadFactory threadFactory) {
return new ThreadPoolExecutor(0, Integer.MAX_VALUE,
60L, TimeUnit.SECONDS,
new SynchronousQueue<Runnable>(),
threadFactory);
}
2.FixedThreadPool
由Executors的newFixedThreadPool方法创建。它是一种线程数量固定的线程池,当线程处于空闲状态时,他们并不会被回收,除非线程池被关闭。当所有的线程都处于活动状态时,新的任务都会处于等待状态,直到有线程空闲出来。FixedThreadPool只有核心线程,且该核心线程都不会被回收,这意味着它可以更快地响应外界的请求。jdk实现如下:FixedThreadPool没有额外线程,只存在核心线程,而且核心线程没有超时机制,而且任务队列没有长度的限制。
public static ExecutorService newFixedThreadPool(int nThreads, ThreadFactory threadFactory) {
return new ThreadPoolExecutor(nThreads, nThreads,
0L, TimeUnit.MILLISECONDS,
new LinkedBlockingQueue<Runnable>(),
threadFactory);
}
3.ScheduledThreadPool
通过Executors的newScheduledThreadPool方式创建,核心线程数量是固定的,而非核心线程是没有限制的,并且当非核心线程闲置时它会被立即回收,ScheduledThreadPool这类线程池主要用于执行定时任务和具有固定时期的重复任务,实现方法如下:
public static ScheduledExecutorService newScheduledThreadPool(
int corePoolSize, ThreadFactory threadFactory) {
return new ScheduledThreadPoolExecutor(corePoolSize, threadFactory);
}
public ScheduledThreadPoolExecutor(int corePoolSize,
ThreadFactory threadFactory) {
super(corePoolSize, Integer.MAX_VALUE,
DEFAULT_KEEPALIVE_MILLIS, MILLISECONDS,
new DelayedWorkQueue(), threadFactory);
}
4.SingleThreadPool
通过Executors的newSingleThreadExecutor方法来创建。这类线程池内部只有一个核心线程,它确保所有的任务都在同一个线程中按顺序执行。SingleThreadExecutor的意义在于统一所有外界任务一个线程中,这使得这些任务之间不需要处理线程同步的问题,实现方式如下:
public static ExecutorService newSingleThreadExecutor(ThreadFactory threadFactory) {
return new FinalizableDelegatedExecutorService
(new ThreadPoolExecutor(1, 1,
0L, TimeUnit.MILLISECONDS,
new LinkedBlockingQueue<Runnable>(),
threadFactory));
}
四、为什么《阿里巴巴Java开发手册》上要禁止使用Executors来创建线程池
Executors创建出来的线程池使用的全都是无界队列,而使用无界队列会带来很多弊端,最重要的就是,它可以无限保存任务,因此很有可能造成OOM异常。同时在某些类型的线程池里面,使用无界队列还会导致maxinumPoolSize、keepAliveTime、handler等参数失效
五、我们用代码测试下
import java.util.*;
import java.lang.*;
import java.io.Serializable;
import java.util.concurrent.ArrayBlockingQueue;
import java.util.concurrent.ThreadPoolExecutor;
import java.util.concurrent.TimeUnit;
class Rextester
{
private static int produceTaskSleepTime = 5;
private static int consumeTaskSleepTime = 1000;
private static int taskMaxNumber = 10; //定义最大添加10个线程到线程池中
public static void main(String args[]) {
//构造一个线程池
//该策略默默的丢弃无法处理的任务,不予任何处理。
ThreadPoolExecutor threadPool = getTpe("DiscardPolicy");
//该策略将丢弃最老的一个请求,也就是即将被执行的任务,并尝试再次提交当前任务。
//ThreadPoolExecutor threadPool = getTpe("DiscardOldestPolicy");
for (int i = 1; i <= taskMaxNumber; i++) {
try {
//添加任务到线程池,我们要知道添加的任务是Runnable
String value = "value is: " + i;
System.out.println("put:" + value);
threadPool.execute(new ThreadPoolTask(value));
//线程休息一段时间,方便我们分析结果
Thread. sleep(produceTaskSleepTime);
} catch (Exception e) {
e.printStackTrace();
}
}
}
/**
* 线程池执行的任务
*/
public static class ThreadPoolTask implements Runnable {
//保存任务所需要的数据
private String value;
public ThreadPoolTask(String value) {
this.value = value;
}
public void run() {
//打印语句
System. out.println("start------" + value);
try {
Thread. sleep(consumeTaskSleepTime);
} catch (Exception e) {
e.printStackTrace();
}
value = "";
}
}
/**
*初始化线程池
*/
public static ThreadPoolExecutor getTpe(String type) {
if ("DiscardOldestPolicy".equals(type)) {
return new ThreadPoolExecutor(2, 4, 3,
TimeUnit. SECONDS, new ArrayBlockingQueue<Runnable>(3),
new ThreadPoolExecutor.DiscardOldestPolicy());
} else {
return new ThreadPoolExecutor(2, 4, 3,
TimeUnit. SECONDS, new ArrayBlockingQueue<Runnable>(3),
new ThreadPoolExecutor.DiscardPolicy());
}
//DiscardPolicy DiscardOldestPolicy
}
}
运行结果:
put:value is: 1
start------value is: 1
put:value is: 2
start------value is: 2
put:value is: 3
put:value is: 4
put:value is: 5
put:value is: 6
start------value is: 6
put:value is: 7
start------value is: 7
put:value is: 8
put:value is: 9
put:value is: 10
start------value is: 3
start------value is: 4
start------value is: 5
我们把上面的生成线程池的代码不//
ThreadPoolExecutor threadPool = getTpe("DiscardOldestPolicy");
运行结果如下
put:value is: 1
start------value is: 1
put:value is: 2
start------value is: 2
put:value is: 3
put:value is: 4
put:value is: 5
put:value is: 6
start------value is: 6
put:value is: 7
start------value is: 7
put:value is: 8
put:value is: 9
put:value is: 10
start------value is: 8
start------value is: 9
start------value is: 10
部分内容参考了博客:https://blog.csdn.net/qq_34835058/article/details/80497602