ThreadPoolExecutor和ThreadPoolTaskExecutor

ThreadPoolExecutor

简介

![[Executor.png|800x500]]

这个类是JDK中的线程池类,继承自Executor, Executor 顾名思义是专门用来处理多线程相关的一个接口,所有线程相关的类都实现了这个接口,里面有一个execute()方法,用来执行线程,线程池主要提供一个线程队列,队列中保存着所有等待状态的线程。避免了创建与销毁的额外开销,提高了响应的速度。相关的继承实现类图例如ScheduledThreadPoolExecutor。

线程池接口

ExecutorService为线程池接口,提供了线程池生命周期方法,继承自Executor接口,ThreadPoolExecutor为线程池实现类,提供了线程池的维护操作等相关方法,继承自AbstractExecutorService,AbstractExecutorService实现了ExecutorService接口。

java.util.concurrent.Executor 负责线程的使用和调度的根接口

1
2
3
4
|--ExecutorService 子接口: 线程池的主要接口
|--ThreadPoolExecutor 线程池的实现类
|--ScheduledExceutorService 子接口: 负责线程的调度
|--ScheduledThreadPoolExecutor : 继承ThreadPoolExecutor,实现了ScheduledExecutorService

工具类:Executors

Executors为线程池工具类,相当于一个工厂类,用来创建合适的线程池,返回ExecutorService类型的线程池。有如下方法。

ExecutorService newFixedThreadPool() : 创建固定大小的线程池

ExecutorService newCachedThreadPool() : 缓存线程池,线程池的数量不固定,可以根据需求自动的更改数量。

ExecutorService newSingleThreadExecutor() : 创建单个线程池。 线程池中只有一个线程

ScheduledExecutorService newScheduledThreadPool() : 创建固定大小的线程,可以延迟或定时的执行任务

其中AbstractExecutorService是他的抽象父类,继承自ExecutorService,ExecutorService 接口扩展Executor接口,增加了生命周期方法。

实际应用中我一般都比较喜欢使用Exectuors工厂类来创建线程池,里面有五个方法,分别创建不同的线程池,如上,创建一个制定大小的线程池,Exectuors工厂实际上就是调用的ExectuorPoolService的构造方法,传入默认参数。

ThreadPoolExecutor的使用

👉🏻 阿里的规范。是不允许直接使用Executors去创建线程池的,我们可以使用ThreadPoolExecutor

1
2
3
4
5
6
7
8
9
10
package com.xiaowo;

import java.util.concurrent.*;

public class ThreadDemo {
public static void main(String[] args) {
ExecutorService es = new ThreadPoolExecutor(1, 1, 0L, TimeUnit.MILLISECONDS,
new LinkedBlockingQueue<Runnable>(10), Executors.defaultThreadFactory(), new ThreadPoolExecutor.DiscardPolicy());
}
}

首先是静态方法newSingleThreadExecutor()、newFixedThreadPool(int nThreads)、newCachedThreadPool()。我们来看一下其源码实现(基于JDK8)

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
public class Executors {
public static ExecutorService newSingleThreadExecutor() {
return new FinalizableDelegatedExecutorService
(new ThreadPoolExecutor(1, 1,
0L, TimeUnit.MILLISECONDS,
new LinkedBlockingQueue<Runnable>()));
}
public static ExecutorService newFixedThreadPool(int nThreads) {
return new ThreadPoolExecutor(nThreads, nThreads,
0L, TimeUnit.MILLISECONDS,
new LinkedBlockingQueue<Runnable>());
}

public static ExecutorService newCachedThreadPool() {
return new ThreadPoolExecutor(0, Integer.MAX_VALUE,
60L, TimeUnit.SECONDS,
new SynchronousQueue<Runnable>());
}

}

通过查看源码我们知道上述三种静态方法的内部实现均使用了ThreadPoolExecutor类。难怪阿里会建议通过ThreadPoolExecutor的方式实现,原来Executors类的静态方法也是用的它,只不过帮我们配了一些参数而已。
第二是ThreadPoolExecutor类的构造方法。既然现在要直接使用ThreadPoolExecutor类了,那么其中的初始化参数就要我们自己配了,了解其构造方法势在必行。

ThreadPoolExecutor类一共有四个构造方法,我们只需要了解之中的一个就可以了,因为其他三种构造方法只是帮我们配置了一些默认参数,最后还是调用了它。

1
2
3
4
5
6
7
public ThreadPoolExecutor(int corePoolSize,
int maximumPoolSize,
long keepAliveTime,
TimeUnit unit,
BlockingQueue<Runnable> workQueue,
ThreadFactory threadFactory,
RejectedExecutionHandler handler) {}
  1. corePoolSize与maximumPoolSize的关系

    首先corePoolSize肯定是 <= maximumPoolSize。

    其他关系如下:

    若当前线程池中线程数 < corePoolSize,则每来一个任务就创建一个线程去执行;
    若当前线程池中线程数 >= corePoolSize,会尝试将任务添加到任务队列。如果添加成功,则任务会等待空闲线程将其取出并执行;
    若队列已满,且当前线程池中线程数 < maximumPoolSize,创建新的线程,这类线程又叫救急线程;
    若当前线程池中线程数 >= maximumPoolSize,则会采用拒绝策略(JDK提供了四种,下面会介绍到)。
    注意:关系3是针对的有界队列,无界队列永远都不会满,所以只有前2种关系。

  2. workQueue

    参数workQueue是指提交但未执行的任务队列。若当前线程池中线程数>=corePoolSize时,就会尝试将任务添加到任务队列中。主要有以下几种:

    • SynchronousQueue:直接提交队列。SynchronousQueue没有容量,所以实际上提交的任务不会被添加到任务队列,总是将新任务提交给线程执行,如果没有空闲的线程,则尝试创建新的线程,如果线程数量已经达到最大值(maximumPoolSize),则执行拒绝策略。
    • LinkedBlockingQueue:无界的任务队列。当有新的任务来到时,若系统的线程数小于corePoolSize,线程池会创建新的线程执行任务;当系统的线程数量等于corePoolSize后,因为是无界的任务队列,总是能成功将任务添加到任务队列中,所以线程数量不再增加。若任务创建的速度远大于任务处理的速度,无界队列会快速增长,直到内存耗尽。
      ![[Policy.png|700x600]]
  3. handler

    JDK内置了四种拒绝策略

    • DiscardOldestPolicy策略:丢弃任务队列中最早添加的任务,并尝试提交当前任务;
    • CallerRunsPolicy策略:调用主线程执行被拒绝的任务,这提供了一种简单的反馈控制机制,将降低新任务的提交速度;
    • DiscardPolicy策略:默默丢弃无法处理的任务,不予任何处理;
    • AbortPolicy策略:直接抛出异常,阻止系统正常工作。
  4. 处理流程

    1. 查看核心线程池是否已满,不满就创建一条线程执行任务,否则执行第二步。
    2. 查看任务队列是否已满,不满就将任务存储在任务队列中,否则执行第三步。
    3. 查看线程池是否已满,即就是是否达到最大线程池数,不满就创建一条线程执行任务,否则就按照策略处理无法执行的任务。

ThreadPoolTaskExecutor

该线程池一般结合异步方法来使用

需要了解Async注解 需要先在启动类上加上@EnableAsync注解

创建线程池

一般使用自定义线程池

使用Java代码结合@Configuration进行配置(推荐使用)

编写异步方法

线程池的提交任务方法

execute

  1. 提交任务

无返回值的任务使用public void execute(Runnable command) 方法提交

子线程可能在主线程结束之后结束,由于子线程比较耗时,主线程结束后子线程还没有执行完

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
@RequestMapping("/execute")
public String execute(){
System.out.println("进入方法");
threadPoolTaskExecutor.execute(new Runnable() {
@Override
public void run() {
try {
Thread.sleep(20000);
System.out.println("sleep后");
} catch (InterruptedException e) {
e.printStackTrace();
}
}
});
System.out.println("执行提交后");
return "aa";
}

//执行结果
//进入方法
//执行提交后
//sleep后
  1. 处理异常

Runnable执行run时遇到异常并不会抛出

  1. 多线程和事务回滚

execute遇到异常主线程无法感知,也不会触发回滚

submit

  1. 提交任务

有返回值的任务使用public <T> Future<T> submit(Callable) 方法提交

提交任务后有个取数据的过程,在从Future取数据的过程中,Callable自带的阻塞机制,这个机制保证主线程一定在子线程结束之后结束。反之如果没有取数据,子线程可能会在主线程结束之后才结束。

  1. 处理异常

Callable执行call时遇到异常会抛出

  1. 多线程和事务回滚

如果在事务中调用了多线程,submit遇到异常会抛出且必须被捕获,不会触发回滚;