使用背景
在调用第三方接口或者使用 MQ 时,会出现网络抖动、连接超时等网络异常,所以需要重试。为了使处理更加健壮并且不太容易出现故障,后续的尝试操作,有时候会帮助失败的操作最后执行成功。一般情况下,需要我们自行实现重试机制
使用案例
基本使用
1 2 3 4 <dependency > <groupId > org.springframework.retry</groupId > <artifactId > spring-retry</artifactId > </dependency >
1 2 3 4 5 6 7 8 @SpringBootApplication @EnableRetry public class Application { public static void main (String[] args) { SpringApplication.run(Application.class, args); } }
1 2 3 4 5 6 7 8 9 10 11 12 13 @Service @Slf4j public class DoRetryService { @Retryable(value = Exception.class, maxAttempts = 4, backoff = @Backoff(delay = 2000L, multiplier = 1.5)) public boolean doRetry (boolean isRetry) throws Exception { log.info("开始通知下游系统" ); if (isRetry) { throw new RuntimeException ("通知下游系统异常" ); } return true ; } }
配置文件获取参数
在 application.properties 文件中定义属性
1 2 retry.maxAttempts =2 # 最大尝试次数 retry.maxDelay =100 # 延迟
在 @Retryable 注入 Properties 文件中定义的 retry.maxAttempts 和 retry.maxDelay
1 2 3 4 5 6 7 8 @Service public class MyService { @Retryable(retryFor = SQLException.class, maxAttemptsExpression = "${retry.maxAttempts}", backoff = @Backoff(delayExpression = "${retry.maxDelay}")) public void retryServiceWithExternalConfiguration (String sql) throws SQLException { } }
相关注解
@Retryable
1 2 3 4 5 6 7 8 9 10 11 12 13 14 15 16 17 18 19 20 21 22 23 24 25 26 @Target({ElementType.METHOD, ElementType.TYPE}) @Retention(RetentionPolicy.RUNTIME) @Documented public @interface Retryable { String interceptor () default "" ; Class<? extends Throwable >[] value() default {}; Class<? extends Throwable >[] include() default {}; Class<? extends Throwable >[] exclude() default {}; String label () default "" ; boolean stateful () default false ; int maxAttempts () default 3 ; String maxAttemptsExpression () default "" ; Backoff backoff () default @Backoff ; String exceptionExpression () default "" ; String[] listeners() default {}; }
用于标记当前方法会使用重试机制
参数信息:
interceptor:将 interceptor 的 bean 名称应用到 retryable(),和其他的属性互斥
value:可重试的异常类型,抛出指定异常才会重试
include:哪些异常可以触发重试,和 value 一样,默认为空
exclude:哪些异常将不会触发重试,默认为空,如果和 include 属性同时为空,则所有的异常都将会触发重试
label:统计报告的唯—标签。如果没有提供,调用者可以选择忽略它,或者提供默认值
maxAttempts:最大重试次数,默认 3 次(包括第一次调用)
backoff:重试等待策略,默认使用@Backoff
Retry 配置的优先级规则
方法级别配置:如果某个配置在方法上定义了,则该方法上的配置会覆盖类级别的配置和全局配置。
类级别配置:如果某个配置在类上定义了,并且该类的方法没有单独定义配置,则使用类级别的配置。
1 2 3 4 5 6 7 8 9 10 11 12 13 14 15 16 17 18 19 20 21 22 23 24 25 26 27 28 29 30 31 @Service @Retryable( value = {RuntimeException.class}, maxAttempts = 3, backoff = @Backoff(delay = 1000) // 类级别的配置 ) public class MyService { @Retryable( value = {RuntimeException.class}, maxAttempts = 5, backoff = @Backoff(delay = 500) // 方法级别的配置 ) public void retryableMethodWithSpecificConfig () { System.out.println("Retrying with specific config..." ); throw new RuntimeException ("Simulated exception" ); } @Retryable( value = {RuntimeException.class} ) public void retryableMethodWithoutSpecificDelay () { System.out.println("Retrying without specific delay..." ); throw new RuntimeException ("Simulated exception" ); } public void nonRetryableMethod () { System.out.println("This method does not retry." ); throw new RuntimeException ("Simulated exception" ); } }
全局配置:如果没有在方法或类上定义配置,则使用全局配置。(例如:定义 RetryTemplate 配置类)
@Backoff
1 2 3 4 5 6 7 8 9 10 11 12 13 14 15 16 17 18 19 20 @Target({ElementType.TYPE}) @Retention(RetentionPolicy.RUNTIME) @Documented public @interface Backoff { long value () default 1000L ; long delay () default 0L ; long maxDelay () default 0L ; double multiplier () default 0.0 ; String delayExpression () default "" ; String maxDelayExpression () default "" ; String multiplierExpression () default "" ; boolean random () default false ; }
重试回退策略(立即重试还是等待一会再重试)
参数信息:
value:隔多少毫秒后重试,默认为 1000L
delay:和 value 一样,但是默认为 0
maxDelay:重试之间的最大等待时间(以毫秒为单位)
multiplier(指定延迟倍数):默认为 0,表示固定暂停 1 秒后进行重试,如果把 multiplier 设置为 1.5,则第一次重试为 2 秒,第二次为 3 秒,第三次为 4.5 秒
delayExpression:重试之间的等待时间表达式
maxDelayExpression:重试之间的最大等待时间表达式
multiplierExpression:指定延迟的倍数表达式
random:随机指定延迟时间
@Recover
当重试次数耗尽时,RetryOperations 可以将控制传递给另一个回调,即 RecoveryCallback。@Recover 用于@Retryable 重试失败后处理方法,此方法里的异常一定要是@Retryable 方法里抛出的异常,否则不会调用这个方法
1 2 3 4 5 6 7 8 9 10 11 12 13 14 15 16 17 18 19 20 21 22 23 24 25 @Service @Slf4j public class DoRetryService { @Retryable(value = Exception.class, maxAttempts = 4, backoff = @Backoff(delay = 2000L, multiplier = 1.5)) public boolean doRetry (boolean isRetry) throws Exception { log.info("开始通知下游系统" ); if (isRetry) { throw new RuntimeException ("通知下游系统异常" ); } return true ; } @Recover public boolean doRecover (Throwable e, boolean isRetry) throws ArithmeticException { log.info("全部重试失败,执行doRecover" ); return false ; } }
注意
方法的返回值必须与@Retryable 方法一致
方法的第一个参数,必须是 Throwable 类型的,建议是与@Retryable 配置的异常一致,其余的参数需要与@Retryable 方法的参数一致
回调方法和重试方法写在同一个实现类里面,且被@Retryable 标记的方法不能有返回值,这样 Recover 方法才会生效
@Recover 不生效问题
对于@EnableRetry 中的 proxyTargetClass 参数,是控制是否对使用接口实现的 bean 开启代理类,默认下是不开启的
当使用接口实现的 bean 时,需要将 EnableRetry 的参数改为 true,非接口的实现类可以使用
由于 retry 用到了 aspect 增强,所有会有 aspect 的坑,就是方法内部调用,会使 aspect 增强失效,那么 retry 当然也会失效
1 2 3 4 5 6 7 8 9 10 11 public class demo { public void A () { B(); } @Retryable(Exception.class) public void B () { throw new RuntimeException ("retry..." ); } }
RetryTemplate
RetryOperations
1 2 3 4 5 6 7 8 9 10 public interface RetryOperations { <T, E extends Throwable > T execute (RetryCallback<T, E> retryCallback) throws E; <T, E extends Throwable > T execute (RetryCallback<T, E> retryCallback, RecoveryCallback<T> recoveryCallback) throws E; <T, E extends Throwable > T execute (RetryCallback<T, E> retryCallback, RetryState retryState) throws E, ExhaustedRetryException; <T, E extends Throwable > T execute (RetryCallback<T, E> retryCallback, RecoveryCallback<T> recoveryCallback, RetryState retryState) throws E; }
RetryCallback
允许插入需要在失败时重试的业务逻辑
1 2 3 4 public interface RetryCallback <T, E extends Throwable > { T doWithRetry (RetryContext context) throws E; }
RetryPolicy 重试策略
NeverRetryPolicy:只允许调用 RetryCallback 一次,不允许重试
AlwaysRetryPolicy:允许无限重试,直到成功
SimpleRetryPolicy(默认策略):固定次数重试策略,默认重试最大次数为 3 次
TimeoutRetryPolicy:超时时间重试策略,默认超时时间为 1 秒,在指定的时间内允许重试
CircuitBreakerRetryPolicy:有熔断功能的重试策略,需设置 3 个参数 openTimeout、resetTimeout 和 delegate
CompositeRetryPolicy:组合重试策略,但不管哪种组合方式,组合中的每一个策略都会执行
乐观组合重试策略:只要有一个策略允许重试即可以
悲观组合重试策略:只要有一个策略不允许重试即不可以
BackOffPolicy 退避策略
策略类
描述
FixedBackOffPolicy
间隔固定时间重试,直接 Thread.sleep 固定时间
NoBackOffPolicy
无等待,立马重试
UniformRandomBackOffPolicy
在一个设置的时间区间内。随机等待后重试
ExponentialBackOffPolicy
在一个设置的时间区间内,等待时长为上一次时长的递增
ExponentialRandomBackOffPolicy
乘数随机的 ExponentialBackOffPolicy
基本使用
1 2 3 4 5 6 7 8 9 10 11 12 13 14 15 16 17 18 19 20 @Configuration public class RetryConfig { @Bean public RetryTemplate retryTemplate () { RetryTemplate retryTemplate = new RetryTemplate (); FixedBackOffPolicy fixedBackOffPolicy = new FixedBackOffPolicy (); fixedBackOffPolicy.setBackOffPeriod(2000l ); retryTemplate.setBackOffPolicy(fixedBackOffPolicy); SimpleRetryPolicy retryPolicy = new SimpleRetryPolicy (); retryPolicy.setMaxAttempts(2 ); retryTemplate.setRetryPolicy(retryPolicy); return retryTemplate; } }
调用 retryTemplate.execute() 方法来运行带有重试处理功能的代码
1 2 3 4 5 6 7 retryTemplate.execute(new RetryCallback <Void, RuntimeException>() { @Override public Void doWithRetry (RetryContext arg0) { myService.templateRetryService(); } });
可以通过 setRetryPolicy()方法来为 RetryTemplate 设置重试策略
监听器
Listener(监听器)会在重试时提供额外的回调,可以利用这些回调处理不同重试中的各种横切关注点(cross-cutting concerns)。
添加回调
1 2 3 4 5 6 7 8 9 10 11 12 13 14 15 16 17 18 19 20 21 22 23 public class DefaultListenerSupport extends RetryListenerSupport { @Override public <T, E extends Throwable > void close (RetryContext context, RetryCallback<T, E> callback, Throwable throwable) { logger.info("onClose" ); super .close(context, callback, throwable); } @Override public <T, E extends Throwable > void onError (RetryContext context, RetryCallback<T, E> callback, Throwable throwable) { logger.info("onError" ); super .onError(context, callback, throwable); } @Override public <T, E extends Throwable > boolean open (RetryContext context, RetryCallback<T, E> callback) { logger.info("onOpen" ); return super .open(context, callback); } }
open 和 close 回调在整个重试前后进行,而 onError 则适用于单个 RetryCallback 调用。
注册监听器
向 RetryTemplate 实例化的 Bean 注册监听器(DefaultListenerSupport)
1 2 3 4 5 6 7 8 9 10 11 @Configuration public class RetryConfig { @Bean public RetryTemplate retryTemplate () { RetryTemplate retryTemplate = new RetryTemplate (); retryTemplate.registerListener(new DefaultListenerSupport ()); return retryTemplate; } }
基本使用
1 2 3 4 5 6 7 8 9 10 11 12 13 14 15 16 17 18 @RunWith(SpringJUnit4ClassRunner.class) @ContextConfiguration(classes = AppConfig.class, loader = AnnotationConfigContextLoader.class) public class SpringRetryIntegrationTest { @Autowired private MyService myService; @Autowired private RetryTemplate retryTemplate; @Test(expected = RuntimeException.class) public void test () { retryTemplate.execute(arg0 -> { myService.templateRetryService(); return null ; }); } }
实现原理
使用@EnableRetry 注解后会导入 RetryConfiguration 配置类
1 2 3 4 5 6 7 8 9 @Target(ElementType.TYPE) @Retention(RetentionPolicy.RUNTIME) @EnableAspectJAutoProxy(proxyTargetClass = false) @Import(RetryConfiguration.class) @Documented public @interface EnableRetry { boolean proxyTargetClass () default false ; }
RetryConfiguration 通过 init()方法构建切点和通知
buildPointcut 构建切点:切点类型为使用 Retryable 注解的类或方法(org.springframework.retry.annotation.RetryConfiguration.AnnotationClassOrMethodPointcut)
buildAdvice 构建通知:创建通知 org.springframework.retry.annotation.AnnotationAwareRetryOperationsInterceptor
1 2 3 4 5 6 7 8 9 10 11 12 13 14 15 16 17 18 19 20 21 22 23 24 25 26 27 28 29 30 31 32 33 34 35 36 37 38 39 40 41 42 43 44 45 46 47 48 49 50 51 52 53 54 55 56 57 @Configuration public class RetryConfiguration extends AbstractPointcutAdvisor implements IntroductionAdvisor , BeanFactoryAware { private Advice advice; private Pointcut pointcut; @Autowired(required = false) private RetryContextCache retryContextCache; @Autowired(required = false) private List<RetryListener> retryListeners; @Autowired(required = false) private MethodArgumentsKeyGenerator methodArgumentsKeyGenerator; @Autowired(required = false) private NewMethodArgumentsIdentifier newMethodArgumentsIdentifier; @Autowired(required = false) private Sleeper sleeper; private BeanFactory beanFactory; @PostConstruct public void init () { Set<Class<? extends Annotation >> retryableAnnotationTypes = new LinkedHashSet <Class<? extends Annotation >>(1 ); retryableAnnotationTypes.add(Retryable.class); this .pointcut = buildPointcut(retryableAnnotationTypes); this .advice = buildAdvice(); if (this .advice instanceof BeanFactoryAware) { ((BeanFactoryAware) this .advice).setBeanFactory(beanFactory); } } protected Advice buildAdvice () { AnnotationAwareRetryOperationsInterceptor interceptor = new AnnotationAwareRetryOperationsInterceptor (); if (retryContextCache != null ) { interceptor.setRetryContextCache(retryContextCache); } if (retryListeners != null ) { interceptor.setListeners(retryListeners); } if (methodArgumentsKeyGenerator != null ) { interceptor.setKeyGenerator(methodArgumentsKeyGenerator); } if (newMethodArgumentsIdentifier != null ) { interceptor.setNewItemIdentifier(newMethodArgumentsIdentifier); } if (sleeper != null ) { interceptor.setSleeper(sleeper); } return interceptor; } }
切面方法拦截器 AnnotationAwareRetryOperationsInterceptor 会拦截目标方法,查找方法上是否有@Retryable 注解
如果存在方法拦截器委派者,调用委派者 invoke 方法
否则直接调用目标方法,即:不具备可重试功能
1 2 3 4 5 6 7 8 9 10 11 12 13 14 15 16 17 18 19 20 21 22 23 24 25 26 27 28 29 30 31 32 33 34 35 36 37 38 39 40 41 42 43 44 45 46 47 48 49 50 51 52 53 54 55 56 57 58 59 60 61 62 63 64 65 66 67 68 69 70 71 72 73 74 75 76 77 78 79 80 81 82 83 84 85 86 87 88 89 90 91 92 93 94 95 96 97 98 99 100 101 102 103 104 105 106 107 108 109 110 111 112 113 114 115 116 117 118 119 120 121 122 123 124 125 126 127 128 public class AnnotationAwareRetryOperationsInterceptor implements IntroductionInterceptor , BeanFactoryAware { @Override public Object invoke (MethodInvocation invocation) throws Throwable { MethodInterceptor delegate = getDelegate(invocation.getThis(), invocation.getMethod()); if (delegate != null ) { return delegate.invoke(invocation); } else { return invocation.proceed(); } } private MethodInterceptor getDelegate (Object target, Method method) { if (!this .delegates.containsKey(target) || !this .delegates.get(target).containsKey(method)) { synchronized (this .delegates) { if (!this .delegates.containsKey(target)) { this .delegates.put(target, new HashMap <Method, MethodInterceptor>()); } Map<Method, MethodInterceptor> delegatesForTarget = this .delegates.get(target); if (!delegatesForTarget.containsKey(method)) { Retryable retryable = AnnotationUtils.findAnnotation(method, Retryable.class); if (retryable == null ) { retryable = AnnotationUtils.findAnnotation(method.getDeclaringClass(), Retryable.class); } if (retryable == null ) { retryable = findAnnotationOnTarget(target, method); } if (retryable == null ) { return delegatesForTarget.put(method, null ); } MethodInterceptor delegate; if (StringUtils.hasText(retryable.interceptor())) { delegate = this .beanFactory.getBean(retryable.interceptor(), MethodInterceptor.class); } else if (retryable.stateful()) { delegate = getStatefulInterceptor(target, method, retryable); } else { delegate = getStatelessInterceptor(target, method, retryable); } delegatesForTarget.put(method, delegate); } } } return this .delegates.get(target).get(method); } private Retryable findAnnotationOnTarget (Object target, Method method) { try { Method targetMethod = target.getClass().getMethod(method.getName(), method.getParameterTypes()); Retryable retryable = AnnotationUtils.findAnnotation(targetMethod, Retryable.class); if (retryable == null ) { retryable = AnnotationUtils.findAnnotation(targetMethod.getDeclaringClass(), Retryable.class); } return retryable; } catch (Exception e) { return null ; } } private MethodInterceptor getStatelessInterceptor (Object target, Method method, Retryable retryable) { RetryTemplate template = createTemplate(retryable.listeners()); template.setRetryPolicy(getRetryPolicy(retryable)); template.setBackOffPolicy(getBackoffPolicy(retryable.backoff())); return RetryInterceptorBuilder.stateless() .retryOperations(template) .label(retryable.label()) .recoverer(getRecoverer(target, method)) .build(); } private MethodInterceptor getStatefulInterceptor (Object target, Method method, Retryable retryable) { RetryTemplate template = createTemplate(retryable.listeners()); template.setRetryContextCache(this .retryContextCache); CircuitBreaker circuit = AnnotationUtils.findAnnotation(method, CircuitBreaker.class); if (circuit!=null ) { RetryPolicy policy = getRetryPolicy(circuit); CircuitBreakerRetryPolicy breaker = new CircuitBreakerRetryPolicy (policy); breaker.setOpenTimeout(getOpenTimeout(circuit)); breaker.setResetTimeout(getResetTimeout(circuit)); template.setRetryPolicy(breaker); template.setBackOffPolicy(new NoBackOffPolicy ()); String label = circuit.label(); if (!StringUtils.hasText(label)) { label = method.toGenericString(); } return RetryInterceptorBuilder.circuitBreaker() .keyGenerator(new FixedKeyGenerator ("circuit" )) .retryOperations(template) .recoverer(getRecoverer(target, method)) .label(label) .build(); } RetryPolicy policy = getRetryPolicy(retryable); template.setRetryPolicy(policy); template.setBackOffPolicy(getBackoffPolicy(retryable.backoff())); String label = retryable.label(); return RetryInterceptorBuilder.stateful() .keyGenerator(this .methodArgumentsKeyGenerator) .newMethodArgumentsIdentifier(this .newMethodArgumentsIdentifier) .retryOperations(template) .label(label) .recoverer(getRecoverer(target, method)) .build(); } }
RetryOperationsInterceptor 或 StatefulRetryOperationsInterceptor 拦截器进行拦截
1 2 3 4 5 6 7 8 9 10 11 12 13 14 15 16 17 18 19 20 21 22 23 24 25 26 27 28 29 30 31 32 33 34 35 36 37 38 39 40 41 42 43 44 45 public class RetryOperationsInterceptor implements MethodInterceptor { public Object invoke (final MethodInvocation invocation) throws Throwable { String name; if (StringUtils.hasText(label)) { name = label; } else { name = invocation.getMethod().toGenericString(); } final String label = name; RetryCallback<Object, Throwable> retryCallback = new RetryCallback <Object, Throwable>() { public Object doWithRetry (RetryContext context) throws Exception { context.setAttribute(RetryContext.NAME, label); if (invocation instanceof ProxyMethodInvocation) { try { return ((ProxyMethodInvocation) invocation).invocableClone().proceed(); } catch (Exception e) { throw e; } catch (Error e) { throw e; } catch (Throwable e) { throw new IllegalStateException (e); } } else { throw new IllegalStateException ( "MethodInvocation of the wrong type detected - this should not happen with Spring AOP, " + "so please raise an issue if you see this exception" ); } } }; if (recoverer != null ) { ItemRecovererCallback recoveryCallback = new ItemRecovererCallback ( invocation.getArguments(), recoverer); return this .retryOperations.execute(retryCallback, recoveryCallback); } return this .retryOperations.execute(retryCallback); } }
调用 RetryTemplate 重试模板类进行重试
1 2 3 4 5 6 7 8 9 10 11 12 13 14 15 16 17 18 19 20 21 22 23 24 25 26 27 28 29 30 31 32 33 34 35 36 37 38 39 40 41 42 43 44 45 46 47 48 49 50 51 52 53 54 55 56 57 58 59 60 61 62 63 64 65 66 67 68 69 70 71 72 73 74 75 76 77 78 79 80 81 82 83 84 85 86 87 88 89 90 91 92 93 94 95 96 97 98 99 100 101 public class RetryTemplate implements RetryOperations { protected <T, E extends Throwable > T doExecute (RetryCallback<T, E> retryCallback, RecoveryCallback<T> recoveryCallback, RetryState state) throws E, ExhaustedRetryException { RetryPolicy retryPolicy = this .retryPolicy; BackOffPolicy backOffPolicy = this .backOffPolicy; RetryContext context = open(retryPolicy, state); if (this .logger.isTraceEnabled()) { this .logger.trace("RetryContext retrieved: " + context); } RetrySynchronizationManager.register(context); Throwable lastException = null ; boolean exhausted = false ; try { boolean running = doOpenInterceptors(retryCallback, context); if (!running) { throw new TerminatedRetryException ( "Retry terminated abnormally by interceptor before first attempt" ); } BackOffContext backOffContext = null ; Object resource = context.getAttribute("backOffContext" ); if (resource instanceof BackOffContext) { backOffContext = (BackOffContext) resource; } if (backOffContext == null ) { backOffContext = backOffPolicy.start(context); if (backOffContext != null ) { context.setAttribute("backOffContext" , backOffContext); } } while (canRetry(retryPolicy, context) && !context.isExhaustedOnly()) { try { if (this .logger.isDebugEnabled()) { this .logger.debug("Retry: count=" + context.getRetryCount()); } lastException = null ; return retryCallback.doWithRetry(context); } catch (Throwable e) { lastException = e; try { registerThrowable(retryPolicy, state, context, e); } catch (Exception ex) { throw new TerminatedRetryException ("Could not register throwable" , ex); } finally { doOnErrorInterceptors(retryCallback, context, e); } if (canRetry(retryPolicy, context) && !context.isExhaustedOnly()) { try { backOffPolicy.backOff(backOffContext); } catch (BackOffInterruptedException ex) { lastException = e; if (this .logger.isDebugEnabled()) { this .logger .debug("Abort retry because interrupted: count=" + context.getRetryCount()); } throw ex; } } if (this .logger.isDebugEnabled()) { this .logger.debug( "Checking for rethrow: count=" + context.getRetryCount()); } if (shouldRethrow(retryPolicy, context, state)) { if (this .logger.isDebugEnabled()) { this .logger.debug("Rethrow in retry for policy: count=" + context.getRetryCount()); } throw RetryTemplate.<E>wrapIfNecessary(e); } } if (state != null && context.hasAttribute(GLOBAL_STATE)) { break ; } } if (state == null && this .logger.isDebugEnabled()) { this .logger.debug( "Retry failed last attempt: count=" + context.getRetryCount()); } exhausted = true ; return handleRetryExhausted(recoveryCallback, context, state); } catch (Throwable e) { throw RetryTemplate.<E>wrapIfNecessary(e); } finally { close(retryPolicy, context, state, lastException == null || exhausted); doCloseInterceptors(retryCallback, context, lastException); RetrySynchronizationManager.clear(); } } }
通过 RetryPolicy 的实现类判断是否进行重试
1 2 3 4 5 6 7 8 9 10 11 public class SimpleRetryPolicy implements RetryPolicy { @Override public boolean canRetry (RetryContext context) { Throwable t = context.getLastThrowable(); return (t == null || retryForException(t)) && context.getRetryCount() < maxAttempts; } }