SpringBoot(8-事务监听器) | 总字数: 2.4k | 阅读时长: 10分钟 | 浏览量: |
使用背景
在完成某些数据的入库后发布一个事件,并使用 @EventListener 监听该事件,然后在监听方法中查询刚才入库的数据,以完成后续操作(例如:数据入库 => 对入库数据进行查询审核)。这时可能会发现查询不到刚才入库的数据:@EventListener 在事件发布时就会立即执行,此时发布方的事务还没有提交,如果监听逻辑不在同一个事务/连接中执行(例如异步监听,或者开启了新的事务),就读不到这条尚未提交的数据。由此引出下面的解决方式:
@TransactionalEventListener
事务同步管理器 TransactionSynchronizationManager
注解属性
1 2 3 4 5 6 7 8 9 10 11 12 13 14 15 16 17 18 19 20 21 22 23 @Target({ElementType.METHOD, ElementType.ANNOTATION_TYPE}) @Retention(RetentionPolicy.RUNTIME) @Documented @EventListener public @interface TransactionalEventListener { TransactionPhase phase () default TransactionPhase.AFTER_COMMIT; boolean fallbackExecution () default false ; @AliasFor(annotation = EventListener.class, attribute = "classes") Class<?>[] value() default {}; @AliasFor(annotation = EventListener.class, attribute = "classes") Class<?>[] classes() default {}; String condition () default "" ; }
/**
 * Transaction phases that a transactional event listener can be bound to.
 */
public enum TransactionPhase {

    /** The listener runs before the transaction commits. */
    BEFORE_COMMIT,

    /** The listener runs after the transaction has committed. */
    AFTER_COMMIT,

    /** The listener runs after the transaction has rolled back. */
    AFTER_ROLLBACK,

    /** The listener runs after the transaction has completed. */
    AFTER_COMPLETION

}
基本使用
1 2 3 4 5 6 7 8 9 10 11 12 13 14 15 16 17 18 19 20 21 22 23 24 25 26 27 28 29 30 31 32 33 34 35 36 37 38 39 40 41 42 43 44 45 46 47 48 49 50 51 52 53 54 55 @Slf4j @Service public class HelloServiceImpl implements HelloService { @Autowired private JdbcTemplate jdbcTemplate; @Autowired private ApplicationEventPublisher applicationEventPublisher; @Transactional @Override public Object hello (Integer id) { String sql = "insert into user (id,name,age) values (" + id + ",'fsx',21)" ; jdbcTemplate.update(sql); applicationEventPublisher.publishEvent(new MyAfterTransactionEvent ("我是和事务相关的事件,请事务提交后执行我~~~" , id)); return "service hello" ; } @Slf4j @Component private static class MyTransactionListener { @Autowired private JdbcTemplate jdbcTemplate; @TransactionalEventListener(phase = TransactionPhase.AFTER_COMMIT) private void onHelloEvent (HelloServiceImpl.MyAfterTransactionEvent event) { Object source = event.getSource(); Integer id = event.getId(); String query = "select count(1) from user where id = " + id; Integer count = jdbcTemplate.queryForObject(query, Integer.class); log.info(source + ":" + count.toString()); } } private static class MyAfterTransactionEvent extends ApplicationEvent { private Integer id; public MyAfterTransactionEvent (Object source, Integer id) { super (source); this .id = id; } public Integer getId () { return id; } } }
实现原理
Spring 对事务各阶段回调的处理逻辑定义在 TransactionSynchronization 接口中:
1 2 3 4 5 6 7 8 9 10 11 12 13 14 15 16 17 18 19 20 21 22 23 24 25 26 27 28 29 30 31 32 33 34 35 36 37 public interface TransactionSynchronization extends Ordered , Flushable { int STATUS_COMMITTED = 0 ; int STATUS_ROLLED_BACK = 1 ; int STATUS_UNKNOWN = 2 ; default int getOrder () { return Integer.MAX_VALUE; } default void suspend () { } default void resume () { } default void flush () { } default void beforeCommit (boolean readOnly) { } default void beforeCompletion () { } default void afterCommit () { } default void afterCompletion (int status) { } }
Spring 会注册一个 TransactionalEventListenerFactory 类型的 bean 到 Spring 容器中。TransactionalEventListenerFactory 实现了 EventListenerFactory 接口,主要作用是先判断目标方法上是否标注了 @TransactionalEventListener 注解(supportsMethod),如果是,则为该方法生成一个事务监听器(createApplicationListener);Spring 会在 bean 初始化之后调用这些方法来生成监听器
1 2 3 4 5 6 7 8 9 10 11 12 13 14 15 16 17 18 19 20 21 22 23 24 25 26 27 28 public class TransactionalEventListenerFactory implements EventListenerFactory , Ordered { private int order = 50 ; public void setOrder (int order) { this .order = order; } @Override public int getOrder () { return this .order; } @Override public boolean supportsMethod (Method method) { return (AnnotationUtils.findAnnotation(method, TransactionalEventListener.class) != null ); } @Override public ApplicationListener<?> createApplicationListener(String beanName, Class<?> type, Method method) { return new ApplicationListenerMethodTransactionalAdapter (beanName, type, method); } }
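TransactionalApplicationListenerMethodAdapter(Spring 5.3 之前的版本中对应的类是上面工厂方法里创建的 ApplicationListenerMethodTransactionalAdapter)在监听到发布的事件之后会生成一个 TransactionSynchronization 对象,并且将该对象注册到当前事务中;如果当前没有活动的事务,则只有在 fallbackExecution 为 true 时才会直接处理事件,否则跳过该事件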
1 2 3 4 5 6 7 8 9 10 11 12 13 14 15 16 17 18 19 20 21 22 23 24 25 26 27 28 29 30 31 32 33 34 35 36 37 38 39 40 41 42 43 public class TransactionalApplicationListenerMethodAdapter extends ApplicationListenerMethodAdapter implements TransactionalApplicationListener <ApplicationEvent> { private final TransactionalEventListener annotation; private final TransactionPhase transactionPhase; private final List<TransactionalApplicationListener.SynchronizationCallback> callbacks = new CopyOnWriteArrayList (); public TransactionalApplicationListenerMethodAdapter (String beanName, Class<?> targetClass, Method method) { super (beanName, targetClass, method); TransactionalEventListener ann = (TransactionalEventListener)AnnotatedElementUtils.findMergedAnnotation(method, TransactionalEventListener.class); if (ann == null ) { throw new IllegalStateException ("No TransactionalEventListener annotation found on method: " + method); } else { this .annotation = ann; this .transactionPhase = ann.phase(); } } public TransactionPhase getTransactionPhase () { return this .transactionPhase; } public void addCallback (TransactionalApplicationListener.SynchronizationCallback callback) { Assert.notNull(callback, "SynchronizationCallback must not be null" ); this .callbacks.add(callback); } public void onApplicationEvent (ApplicationEvent event) { if (TransactionSynchronizationManager.isSynchronizationActive() && TransactionSynchronizationManager.isActualTransactionActive()) { TransactionSynchronizationManager.registerSynchronization(new TransactionalApplicationListenerSynchronization (event, this , this .callbacks)); } else if (this .annotation.fallbackExecution()) { if (this .annotation.phase() == TransactionPhase.AFTER_ROLLBACK && this .logger.isWarnEnabled()) { this .logger.warn("Processing " + event + " as a fallback execution on AFTER_ROLLBACK phase" ); } this .processEvent(event); } else if (this .logger.isDebugEnabled()) { this .logger.debug("No transaction is active - skipping " + 
event); } } }
1 2 3 4 5 6 7 8 9 10 11 12 13 14 15 16 17 18 19 20 21 22 23 24 25 26 27 28 29 30 31 32 33 34 35 36 37 38 39 40 41 42 43 44 45 private static class TransactionSynchronizationEventAdapter extends TransactionSynchronizationAdapter { private final ApplicationListenerMethodAdapter listener; private final ApplicationEvent event; private final TransactionPhase phase; public TransactionSynchronizationEventAdapter (ApplicationListenerMethodAdapter listener, ApplicationEvent event, TransactionPhase phase) { this .listener = listener; this .event = event; this .phase = phase; } @Override public int getOrder () { return this .listener.getOrder(); } public void beforeCommit (boolean readOnly) { if (this .phase == TransactionPhase.BEFORE_COMMIT) { processEvent(); } } public void afterCompletion (int status) { if (this .phase == TransactionPhase.AFTER_COMMIT && status == STATUS_COMMITTED) { processEvent(); } else if (this .phase == TransactionPhase.AFTER_ROLLBACK && status == STATUS_ROLLED_BACK) { processEvent(); } else if (this .phase == TransactionPhase.AFTER_COMPLETION) { processEvent(); } } protected void processEvent () { this .listener.processEvent(this .event); } }
1 2 3 4 5 6 7 8 9 10 11 12 13 14 15 16 17 18 19 20 21 22 23 24 25 26 27 28 29 30 31 32 33 34 35 36 37 38 39 40 41 42 43 44 45 46 47 48 49 50 51 52 53 54 55 56 57 58 59 60 61 62 63 64 65 66 67 68 69 70 71 72 73 74 75 76 77 78 79 80 81 public void processEvent (ApplicationEvent event) { Object[] args = resolveArguments(event); if (shouldHandle(event, args)) { Object result = doInvoke(args); if (result != null ) { handleResult(result); } else { logger.trace("No result object given - no result to handle" ); } } } protected Object[] resolveArguments(ApplicationEvent event) { ResolvableType declaredEventType = getResolvableType(event); if (declaredEventType == null ) { return null ; } if (this .method.getParameterCount() == 0 ) { return new Object [0 ]; } Class<?> eventClass = declaredEventType.getRawClass(); if ((eventClass == null || !ApplicationEvent.class.isAssignableFrom(eventClass)) && event instanceof PayloadApplicationEvent) { return new Object [] {((PayloadApplicationEvent) event).getPayload()}; } else { return new Object [] {event}; } } private boolean shouldHandle (ApplicationEvent event, @Nullable Object[] args) { if (args == null ) { return false ; } String condition = getCondition(); if (StringUtils.hasText(condition)) { Assert.notNull(this .evaluator, "EventExpressionEvaluator must no be null" ); EvaluationContext evaluationContext = this .evaluator.createEvaluationContext( event, this .targetClass, this .method, args, this .applicationContext); return this .evaluator.condition(condition, this .methodKey, evaluationContext); } return true ; } protected void handleResult (Object result) { if (result.getClass().isArray()) { Object[] events = ObjectUtils.toObjectArray(result); for (Object event : events) { publishEvent(event); } } else if (result instanceof Collection<?>) { Collection<?> events = (Collection<?>) result; for (Object event : events) { publishEvent(event); } } else { publishEvent(result); } }