基本使用

异步调用

在使用@Async之前,我们需要确保已经启用了Spring的异步任务执行功能,这通常通过在配置类上添加@EnableAsync注解来完成

1
2
3
4
5
6
7
8
@SpringBootApplication
// 开启异步调用
@EnableAsync
public class Async01Application {
public static void main(String[] args) {
SpringApplication.run(Async01Application.class, args);
}
}
  1. 未使用@Async
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
@Component
public class Task1 {
public static Random random = new Random();

public void doTaskOne() throws InterruptedException {
System.out.println("start 任务1....");
long start = System.currentTimeMillis();
Thread.sleep(random.nextInt(10000));
long end = System.currentTimeMillis();
System.out.println("任务1耗时:" + (end - start));
}

public void doTaskTwo() throws InterruptedException {
System.out.println("start 任务2....");
long start = System.currentTimeMillis();
Thread.sleep(random.nextInt(10000));
long end = System.currentTimeMillis();
System.out.println("任务2耗时:" + (end - start));
}

public void doTaskThree() throws InterruptedException {
System.out.println("start 任务3....");
long start = System.currentTimeMillis();
Thread.sleep(random.nextInt(10000));
long end = System.currentTimeMillis();
System.out.println("任务3耗时:" + (end - start));
}
}
  1. 使用@Async
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
@Component
public class Task2 {
public static Random random = new Random();

@Async
public void doTaskOne() throws InterruptedException {
System.out.println("start 任务1...");
long start = System.currentTimeMillis();
Thread.sleep(random.nextInt(10000));
long end = System.currentTimeMillis();
System.out.println("任务1,耗时" + (end - start));
}

@Async
public void doTaskTwo() throws InterruptedException {
System.out.println("start 任务2...");
long start = System.currentTimeMillis();
Thread.sleep(random.nextInt(10000));
long end = System.currentTimeMillis();
System.out.println("任务2,耗时" + (end - start));
}

@Async
public void doTaskThree() throws InterruptedException {
System.out.println("start 任务3...");
long start = System.currentTimeMillis();
Thread.sleep(random.nextInt(10000));
long end = System.currentTimeMillis();
System.out.println("任务3,耗时" + (end - start));
}
}

使用两个方法进行比较

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
@SpringBootTest
class Async01ApplicationTests {

@Autowired
private Task1 task1;

@Autowired
private Task2 task2;

@Test
void test1() throws InterruptedException {
task1.doTaskOne();
task1.doTaskTwo();
task1.doTaskThree();
}

@Test
void test2() throws InterruptedException {
task2.doTaskOne();
task2.doTaskTwo();
task2.doTaskThree();
TimeUnit.SECONDS.sleep(10);
}
}
  1. 未使用@Async执行结果
1
2
3
4
5
6
start 任务1....
任务1耗时:1498
start 任务2....
任务2耗时:1967
start 任务3....
任务3耗时:9994
  1. 使用@Async执行结果
1
2
3
4
5
6
start 任务1...
start 任务3...
start 任务2...
任务2,耗时664
任务1,耗时2378
任务3,耗时4973

可以看出,未使用@Async时所有任务是串行执行的,只能等上一个任务执行完毕才能执行下一个,而使用@Async后任务是并发执行的

异步返回结果

注意:被@Async标注的方法必须是void类型的,且不能有返回值,除非返回类型是Future,这样可以通过Future获取异步操作的结果

1
2
3
4
5
6
7
8
9
@Component
public class AsyncTask {

@Async
public Future<String> asyncTask2() {
System.out.println("asyncTask2 start.....");
return new AsyncResult<String>("hello world");
}
}
1
2
3
4
5
6
7
8
9
10
11
12
13
14
@Autowired
private AsyncTask asyncTask;

@Test
void test3() throws ExecutionException, InterruptedException {
Future<String> future = asyncTask.asyncTask2();
while (true){
//判断是否执行完毕
if (future.isDone()) {
System.out.println(future.get());
break;
}
}
}

异常处理

1
2
3
4
5
6
7
8
9
10
11
@Component
public class AsyncExceptionHandler implements AsyncUncaughtExceptionHandler {
private final Logger logger = LoggerFactory.getLogger(AsyncExceptionHandler.class);

// 异常捕获
@Override
public void handleUncaughtException(Throwable ex, Method method, Object... params) {
logger.error("@Async调用异常,出现异常的方法:{},参数:{},异常信息:{}", method.getDeclaringClass().getName() + "."
+ method.getName(), JSON.toJSONString(params), ex.getMessage());
}
}
1
2
3
4
5
6
7
8
9
10
11
@Configuration
public class AsyncConfig extends AsyncConfigurerSupport {
@Autowired
private AsyncExceptionHandler asyncExceptionHandler;

// 将异常处理类注入容器
@Override
public AsyncUncaughtExceptionHandler getAsyncUncaughtExceptionHandler() {
return asyncExceptionHandler;
}
}
1
2
3
4
5
6
7
8
9
10
11
12
13
14
@Component
public class Task3 {
public static Random random = new Random();

@Async
public void doTaskOne() throws InterruptedException {
System.out.println("start 任务1...");
int i = 1 / 0;
long start = System.currentTimeMillis();
Thread.sleep(random.nextInt(10000));
long end = System.currentTimeMillis();
System.out.println("任务1,耗时" + (end - start));
}
}
1
2
3
4
5
6
7
8
@Autowired
private Task3 task3;

@Test
void test4() throws InterruptedException {
task3.doTaskOne();
TimeUnit.SECONDS.sleep(10);
}
1
2
3
// 执行结果:
start 任务1...
2024-11-22 09:12:45.828 ERROR 20716 --- [ task-1] c.e.a.d.config.AsyncExceptionHandler : @Async调用异常,出现异常的方法:com.example.async.demos.config.Task3.doTaskOne,参数:[],异常信息:/ by zero

线程池配置

默认线程池

SpringBoot从2.1开始使用的是ThreadPoolTaskExecutor线程池,之前使用的是SimpleAsyncTaskExecutor,查看AsyncExecutionInterceptor源码中的getDefaultExecutor方法,会先去找名称为taskExecutor的Bean,如果找不到才使用SimpleAsyncTaskExecutor

注意:名称为taskExecutor的Bean在TaskExecutionAutoConfiguration会被实例化

  1. ThreadPoolTaskExecutor

此线程池的默认参数(由SpringBoot配置TaskExecutionProperties),核心线程数8,队列容量不限,最大线程数不限。如果业务逻辑需要执行的时间比较长,或者由于代码缺陷导致核心线程不能被释放,那么队列中的任务会越来越多且不会被执行。因此使用@Async必须配置自定义线程池,或者修改默认线程池参数

  1. SimpleAsyncTaskExecutor

此线程池会一直创建新的线程,失去了线程池的优势,不推荐使用,若系统中不断地创建线程,最终会导致系统占用内存过高,引发OutOfMemoryError错误。

针对线程创建问题,SimpleAsyncTaskExecutor提供了限流机制,查看ConcurrencyThrottleSupport源码中的beforeAccess方法,通过concurrencyLimit属性来控制开关,当concurrencyLimit>=0时开启限流机制,默认关闭限流机制即concurrencyLimit=-1,当关闭情况下,会不断创建新的线程来处理任务。基于默认配置,SimpleAsyncTaskExecutor并不是严格意义的线程池,达不到线程复用的功能

线程池修改

配置文件修改

1
2
3
4
5
6
7
8
9
10
#核心线程数
spring.task.execution.pool.core-size=200
#最大线程数
spring.task.execution.pool.max-size=1000
#空闲线程保留时间
spring.task.execution.pool.keep-alive=3s
#队列容量
spring.task.execution.pool.queue-capacity=1000
#线程名称前缀
spring.task.execution.thread-name-prefix=test-thread-

@Async异步方法默认使用Spring创建ThreadPoolTaskExecutor(参考TaskExecutionAutoConfiguration)

  1. 默认核心线程数:8
  2. 最大线程数:Integet.MAX_VALUE
  3. 队列使用:LinkedBlockingQueue
  4. 容量是:Integet.MAX_VALUE
  5. 空闲线程保留时间:60s
  6. 线程池拒绝策略:AbortPolicy

自定义线程池

1
2
3
4
task.pool.corePoolSize=20
task.pool.maxPoolSize=40
task.pool.keepAliveSeconds=300
task.pool.queueCapacity=50
1
2
3
4
5
6
7
@SpringBootApplication
@EnableAsync
public class Async02Application {
public static void main(String[] args) {
SpringApplication.run(Async02Application.class, args);
}
}
1
2
3
4
5
6
7
8
9
10
11
12
@Component
@ConfigurationProperties(prefix = "task.pool")
@Data
public class TaskThreadPoolConfig {
private int corePoolSize;

private int maxPoolSize;

private int keepAliveSeconds;

private int queueCapacity;
}
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
@Configuration
public class TaskExecutePool {

@Autowired
private TaskThreadPoolConfig config;

@Bean
public Executor myTaskAsyncPool() {
ThreadPoolTaskExecutor executor = new ThreadPoolTaskExecutor();
//核心线程池大小
executor.setCorePoolSize(config.getCorePoolSize());
//最大线程数
executor.setMaxPoolSize(config.getMaxPoolSize());
//队列容量
executor.setQueueCapacity(config.getQueueCapacity());
//活跃时间
executor.setKeepAliveSeconds(config.getKeepAliveSeconds());
//线程名字前缀
executor.setThreadNamePrefix("myExecutor-");
//setRejectedExecutionHandler:当pool已经达到max size的时候,如何处理新任务
//CallerRunsPolicy:不在新线程中执行任务,而是由调用者所在的线程来执行
executor.setRejectedExecutionHandler(new ThreadPoolExecutor.CallerRunsPolicy());

//用来设置线程池关闭的时候等待所有任务都完成再继续销毁其他的Bean,这样这些异步任务的销毁就会先于Redis线程池的销毁
executor.setWaitForTasksToCompleteOnShutdown(true);
//用来设置线程池中任务的等待时间,如果超过这个时间还没有销毁就强制销毁,以确保应用最后能够被关闭,而不是阻塞住
executor.setAwaitTerminationSeconds(60);
//线程初始化
executor.initialize();
return executor;
}
}

通过指定@Async中的值去找对应的Bean,从而获取自定义的线程池

1
2
3
4
5
6
7
8
9
10
@Component
public class AsyncTask {
private final Logger logger = LoggerFactory.getLogger(AsyncTask.class);

//myTaskAsynPool即配置线程池的方法名,此处如果不写自定义线程池的方法名,会使用默认的线程池
@Async("myTaskAsyncPool")
public void doTask1(int i) {
logger.info("task:" + i + " start");
}
}
1
2
3
4
5
6
7
8
9
10
11
12
13
@SpringBootTest
class Async02ApplicationTests {

@Autowired
private AsyncTask asyncTask;

@Test
void contextLoads() {
for (int i = 0; i < 100; i++) {
asyncTask.doTask1(i);
}
}
}

实现接口AsyncConfigurer

AsyncConfigurer接口是Spring框架用于全局配置异步执行器(即线程池)的核心接口。当我们的Spring应用需要统一管理所有异步任务的执行环境时,可以选择实现此接口

使用@EnableAsync后会导入AsyncConfigurationSelector类,根据代理类型返回对应的类(默认为PROXY,即ProxyAsyncConfiguration类),ProxyAsyncConfiguration会实例化AsyncAnnotationBeanPostProcessor,并注入对应的实现AsyncConfigurer接口的类

1
2
3
4
5
6
7
8
9
10
11
public interface AsyncConfigurer {
@Nullable
default Executor getAsyncExecutor() {
return null;
}

@Nullable
default AsyncUncaughtExceptionHandler getAsyncUncaughtExceptionHandler() {
return null;
}
}
  1. getAsyncExecutor():用于实现自定义线程池,控制并发数
  • 在getAsyncExecutor()中创建线程池的时候,必须使用executor.initialize(),不然在调用时会报线程池未初始化的异常
  • 如果使用threadPoolTaskExecutor()来定义bean,则不需要初始化
  1. getAsyncUncaughtExceptionHandler():用于处理异步方法的异常
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
@Configuration
@Slf4j
public class NativeAsyncTaskExecutePool implements AsyncConfigurer {

@Autowired
private TaskThreadPoolConfig config;

@Override
public Executor getAsyncExecutor() {
ThreadPoolTaskExecutor executor = new ThreadPoolTaskExecutor();
//核心线程池大小
executor.setCorePoolSize(config.getCorePoolSize());
//最大线程数
executor.setMaxPoolSize(config.getMaxPoolSize());
//队列容量
executor.setQueueCapacity(config.getQueueCapacity());
//活跃时间
executor.setKeepAliveSeconds(config.getKeepAliveSeconds());
//线程名字前缀
executor.setThreadNamePrefix("MyExecutor-");
//CallerRunsPolicy:不在新线程中执行任务,而是由调用者所在的线程来执行
executor.setRejectedExecutionHandler(new ThreadPoolExecutor.CallerRunsPolicy());
//线程初始化
executor.initialize();
return executor;
}

// @Bean
// public ThreadPoolTaskExecutor threadPoolTaskExecutor(){
// ThreadPoolTaskExecutor executor = new ThreadPoolTaskExecutor();
// executor.setCorePoolSize(10);
// executor.setMaxPoolSize(100);
// executor.setQueueCapacity(100);
// return executor;
// }

/**
* 异步任务中异常处理
* @return
*/
@Override
public AsyncUncaughtExceptionHandler getAsyncUncaughtExceptionHandler() {
return new AsyncUncaughtExceptionHandler() {
@Override
public void handleUncaughtException(Throwable ex, Method method, Object... params) {
log.error("@Async调用异常,出现异常的方法:{},参数:{},异常信息:{}", method.getDeclaringClass().getName() + "."
+ method.getName(), JSON.toJSONString(params), ex.getMessage());
}
};
}
}

继承AsyncConfigurerSupport

1
2
3
4
5
6
7
8
9
10
11
12
13
public class AsyncConfigurerSupport implements AsyncConfigurer {
public AsyncConfigurerSupport() {
}

public Executor getAsyncExecutor() {
return null;
}

@Nullable
public AsyncUncaughtExceptionHandler getAsyncUncaughtExceptionHandler() {
return null;
}
}

@Async注意点

失效情况

  1. 异步方法使用static修饰
  2. 异步类没有使用@Component注解导致Spring无法扫描到异步类
  3. 需要在启动类上添加@EnableAsync注解
  4. 在Async方法上标注@Transactional是没用的,在Async方法调用的方法上添加@Transactional有效
  5. 异步方法在同一个类调用

异常情况

  1. 异步方法中抛出的异常不能直接捕获,因为调用者将无法获取到异常。建议使用Future或CompletableFuture来捕获异步方法的异常并进行处理
  2. 异步方法的执行是非阻塞的,它们可能以任意顺序完成。如果需要按照特定的顺序处理结果,可以使用CompletableFuture的thenApply方法或者使用@Async的order属性来指定顺序