前言
Spring 从 3.x
开始支持事件机制。在 Spring 的事件机制中,我们可以令一个事件类继承 ApplicationEvent
类,然后将实现了 ApplicationListener
的 Bean
注册到 spring 容器,最后向 ApplicationEventPublisher
推送事件对象即可令所有订阅者收到事件。在 4.2
以后,甚至不需要实现 ApplicationListener
接口,仅需在 Bean
中方法标记 @EventListener
注解即可。
笔者将基于 Spring 源码的 5.2.x
分支,分析该功能是如何实现的。
本文是其中的第二篇文章,将分析事件是如何通过广播器推送,并被监听器接收并处理的。
在开始前,推荐先阅读前文了解一点 Spring 的注解机制或者事务机制,这将更有利于流程与一些代码的理解。
相关文章:
一、事件的推送
1、将事件推送到上下文
当我们借助 Spring 发送一个事件对象的时候,一般都通过 ApplicationEventPublisher
完成,在默认情况下,通过容器获得的 ApplicationEventPublisher
单例实际上就是 ApplicationContext
本身。
ApplicationEventPublisher
提供了两个 publishEvent
方法,一个用于发布 ApplicationEvent
事件,另一个用于发布其他事件,在 AbstractApplicationContext
中,它们都通过同一个私有方法实现:
1 | // 推送ApplicationEvent事件 |
这一步方法总共做了这些事:
- 如果事件对象没有继承
ApplicationEvent
,则将其包装为PayloadApplicationEvent
; - 若早期事件列表为空,说明还在上下文已有可用的广播器,直接通过广播器推送事件,否则就先把事件加入早期事件列表,等到广播器初始化完成后再推送;
- 如果上下文存在父上下文,则向父上下文也推送事件;
针对早期事件列表,在容器调用 AbstractApplicationContext.refresh
方法进行初始化的过程中,早期事件列表在整个容器启动的第一个步骤 prepareRefresh
中被创建,而在非常靠后的 registerListeners
步骤中才被清空。
也就是说,当 registerListeners
还没执行前,任何向上下文推送的事件实际上都不会立刻执行,而是延迟到 registerListeners
这一步才会推送,在这一步后,向上下文推送的事件都会立刻被推送。
2、将事件推送到广播器
当上下文将事件推送到广播器时,需要调用 ApplicationEventMulticaster.multicastEvent
方法,我们以默认的实现类 SimpleApplicationEventMulticaster
为例:
1 | public void multicastEvent(final ApplicationEvent event, ResolvableType eventType) { |
这一步共干了四件事:
- 获取事件的实际类型;
- 获取广播器中配置的任务执行器;
- 通过事件的实际类型获取对应的监听器;
- 遍历监听器,在执行器中调用监听器;
更简单的概况,就是:找到事件对应的监听器,然后依次放到执行器执行。
下面我们将详细分析监听器查询与执行的过程。
二、监听器的检索
在广播器广播事件时,会调用 getApplicationListeners
方法:
1 | protected Collection<ApplicationListener<?>> getApplicationListeners( |
1、监听器检索器
默认的广播器 SimpleApplicationEventMulticaster
中维护了两个检索器内部类,用于管理注册到广播器的监听器,它们有不同的用途:
DefaultListenerRetriever
:用于支持监听器的注册功能;CachedListenerRetriever
:用于提供基于事件类型快速查询对应监听器的缓存功能;
DefaultListenerRetriever
DefaultListenerRetriever
主要用于支持监听器的注册,他唯一的作用就是提供 getApplicationListeners
方法:
1 | private class DefaultListenerRetriever { |
该检索器支持直接注册 Bean
实例,或者只注册 BeanName
,当调用 getApplicationListeners
将会全量的获得已注册的监听器实例。
CachedListenerRetriever
CachedListenerRetriever
用于提供基于事件类型快速查询对应监听器的缓存功能,它总是在 DefaultListenerRetriever
创建后,向广播器推送事件的时候才会被创建,而当向广播器注册监听器时,如果已有该缓存检索器,则它们全部都会被销毁,等待下一次推送时间时再被创建,从而实现刷新缓存的功能:
1 | private class CachedListenerRetriever { |
而每一个 CachedListenerRetriever
都会有一个对应的 ListenerCacheKey
,这个类重写了 hashCode
与 equals
方法,用来在 Map 集合中根据事件类型与事件 source 作为 key:
1 | private static final class ListenerCacheKey implements Comparable<ListenerCacheKey> { |
2、获取监听器
在 getApplicationListeners
方法中,主要用于创建一个 ListenerCacheKey
对应的 CachedListenerRetriever
缓存实例,不过实际上此时 CachedListenerRetriever
仍然还是空的,需要在最后通过 retrieveApplicationListeners
方法从 DefaultListenerRetriever
中找到对应的监听器,然后将他们都刷进缓存:
1 | private Collection<ApplicationListener<?>> retrieveApplicationListeners( |
而这里多次调用的 support
方法按顺序依次则有:
1 | protected boolean supportsEvent( |
这里的逻辑有点长,不过主要目标还是很清晰的:
- 先根据事件的类型和
source
获取CachedListenerRetriever
缓存,如果没缓存就先创建; - 从
DefaultListenerRetriever
获取全部监听器:- 如果能获取实例,就直接获得实例;
- 不能获得实例,就通过
BeanName
从BeanFactory
里取;
- 判断这些监听器支不支持处理当前推送的事件:
- 如果能拿到实例,并且如果实现了
GenericApplicationListener
,就直接调用supportsEventType
和supportsSourceType
方法判断; - 如果不能拿到实例,那就尝试拿到实际的类型,并且通过泛型确认是否支持当前推送的事件;
- 如果能拿到实例,并且如果实现了
- 获得全部支持该事件的监听器后,再将其刷入
CachedListenerRetriever
缓存,下次再来就直接从缓存里头取;
三、监听器的执行
监听器的执行分为两步,第一步是在广播器中调用监听器的 onApplicationEvent
,第二步是在 onApplicationEvent
调用真正的处理逻辑,这里根据监听器类型的不同而具有不同的实现,这里我们重点关注注解式监听器。
1、广播器的广播
监听器的执行对应 multicastEvent
中的这一段代码:
1 | // 获取任务执行器, |
这里的逻辑非常简单:
- 获取全部支持处理该事件的监听器,然后依次调用
ApplicationListener.onApplicationEvent
方法; - 如果广播器有设置线程池,则调用监听器的时候就在线程池里调用,否则在当前线程里调用;
- 当调用发生异常时,就调用广播器中注册的异常处理器
ErrorHandler
;
2、普通监听器的执行
我们以最基本的注解式监听器 ApplicationListenerMethodAdapter
为例,当调用它的 onApplicationEvent
方法时:
1 |
|
条件判断
此过程平平无奇,其中 shouldHandle
方法用于根据在 @EventListener
中的 condition
属性指定的 SpEL
表达式确定是否需要真正调用注解方法:
1 | private boolean shouldHandle(ApplicationEvent event, Object[] args){ |
至于 evaluator
则是 Spring 基于 SpEL
表达式封装的一个用于事件的工具类 EventExpressionEvaluator
,关于这个可以单独了解 SpelExpressionParser
、StandardEvaluationContext
、 Expression
这三个类,或者直接了解 Spring 的 SpEL
表达式相关功能。
3、返回值支持
相比于普通的编程式监听器,注解式监听器还会多处一步对返回值的处理。我们以 ApplicationListenerMethodAdapter
为例,当 doInvoke
以后,若注解方法返回值不为null,则会尝试通过 handleResult
对返回值进行处理:
1 | protected void handleResult(Object result) { |
当监听器方法有返回值的时候,这里有三种处理:
- 返回值是
CompletionStage
,继续完成下一步异步调用; - 返回值是
ListenableFuture
,调用回调方法; - 返回值返回值是对象、数组或集合,尝试作为将其作为事件对象发送;
返回值是CompletionStage
当看到了 CompletionStage
的时候,很容易联想到基于它实现的 CompletableFuture
。它表示一个待完成的异步任务,在 ApplicationListenerMethodAdapter
中,监听器会通过如下代码,为其追加任务完成后的回调:
1 | // 当这一步完成后,获取执行结构 |
返回值是ListenableFuture
ListenableFuture
也是一个异步任务回调接口,它的用法与 Guava 中的 ListenableFuture
几乎完全一致,处理的逻辑与上文的返回值是 CompletionStage
的情况也完全一致,就是追加任务完成后的回调:
1 | ((ListenableFuture<?>) result).addCallback(this::publishEvents, this::handleAsyncError); |
当任务的返回值不为 null 的时候,就尝试处理它的返回值。
返回值是普通对象、数组或集合
当对象是普通对象的时候,监听器会尝试将返回值也作为一个事件推送。而如果是数组或者集合,会先将其摊平,然后把其中的每一个元素都取出尝试作为事件推送:
1 |
|
4、事务支持
由于编程式监听器天生就是支持事务的,因此 Spring 只单纯为注解式监听器专门准备的事务监听器方法适配器 ApplicationListenerMethodTransactionalAdapter
,它由监听器工厂 TransactionalEventListenerFactory
生产,与一个 @TransactionalEventListener
注解对应。
@TransactionalEventListener
而 @TransactionalEventListener
是一个基于 Spring 元注解机制扩展的注解,它在 @EventListener
的基础上扩展了一些事务相关的配置:
1 |
|
ApplicationListenerMethodTransactionalAdapter
事务监听器方法适配器 ApplicationListenerMethodTransactionalAdapter
继承了非事务的适配器 ApplicationListenerMethodAdapter
,但是基于 TransactionSynchronization
在监听器的 onApplicationEvent
方法做了一些事务的处理:
1 | class ApplicationListenerMethodTransactionalAdapter extends ApplicationListenerMethodAdapter { |
这里的逻辑也很直白:
- 当调用方法时
onApplicationEvent
方法时,判断事务是否可用; - 事务不可用,并且
@TransactionalEventListener
的fallbackExecution
属性为true
,则直接调用注解放方法; - 若事务可用,则创建一个实现了
TransactionSynchronization
接口的事务同步操作TransactionSynchronizationEventAdapter
,并将其注册到TransactionSynchronizationManager
; TransactionSynchronizationManager
在事务进行到指定阶段后,会调用TransactionSynchronizationEventAdapter
的对应回调方法;TransactionSynchronizationEventAdapter
在回调方法中,再确认当前事务阶段与在@TransactionalEventListener
中phase
属性指定的阶段是否一致,若一致则调用注解方法;
因此此处也不难理解为什么广播器进行广播的时候,若指定了线程池则事务会失效了,因为具体到监听器适配器调用时,通过 TransactionSynchronizationManager
注册的事务是当前线程池中的工作线程的事务,而调用广播器的主线程的事务与其不是一个事务,因此监听器中事务回滚不会令主线程的事务一并回滚。
总结
当我们向 ApplicationContext
推送事件时:
ApplicationContext
会将事件推送至内部持有的广播器实例SimpleApplicationEventMulticaster
;SimpleApplicationEventMulticaster
在内部维护了两个检索器,分别是DefaultListenerRetriever
与CachedListenerRetriever
,前者用于提供监听器注册和全量查询功能,后者用于提供基于事件类型与事件源类型对监听器进行快速查询的缓存,
当推送事件时,会先通过
DefaultListenerRetriever
全量查询,然后将其加入CachedListenerRetriever
缓存,下次查询时直接从缓存中获取订阅了事件的监听器;当获得订阅了事件的监听器后,会尝试调用监听器的
onApplicationEvent
方法:- 若广播器中注册了线程池,则会直接把操作提交到线程池中执行;
- 若广播器中没有注册线程,则会直接在当前线程执行;
监听器被调用的时候,处理基本内的事件处理,而注解时监听器还额外支持一些功能,比如:
- 如果使用了
@TransactionalEventListener
注解会生成支持事务的监听器ApplicationListenerMethodTransactionalAdapter
,该监听器会在调用前向事务管理器注册同步事务,从而获得事务支持; - 默认的注解式监听器生成
ApplicationListenerMethodAdapter
这种类型的监听器,该类监听器在绑定的注解方法有返回值时,会尝试将返回值也作为一个事件发送,而如果是集合或者数组,则会摊平后尝试将每一个元素都作为事件发生;
- 如果使用了