spring - webflux:内部事件总线和异步,松散耦合的事件监听器
问题描述
如何实现内部事件总线以在 webflux spring 堆栈中执行异步操作?
我想要一个服务来发出一个事件:
@Service
class FeedServiceImpl(/*...dependencies...*/) : FeedService {
override suspend fun deleteEntry(entryId: Long) {
entryRepository.deleteById(entryId)
publishEvent(
FeedEntryDeletedEvent(
timestamp = time.utcMillis(),
entryId = entryId,
)
)
}
}
并且发布者服务不知道的不同组件应该能够决定对该事件做出反应。
@Service
class CommentServiceImpl(/*...dependencies...*/): CommentService {
override suspend fun onDeleteEntry(event: FeedEntryDeletedEvent) {
// do stuff
}
}
在 MVC 应用程序中,我将使用在处理程序 ( ) 上ApplicationEventPublisher
发布事件 ( publishEvent
) 和@EventListener
+ 。@Async
onDeleteEntry
反应式堆栈中的等价物是什么?
我考虑的另一个选项是运行嵌入式消息传递服务,因为这应该意味着异步语义。但是对于一个简单的场景来说,这感觉像是很多开销。
我找到了这些 SO 线程
但他们没有回答这种情况,因为他们假设发布者知道听众。但我需要松散耦合。
我还发现了这些春季问题
- https://github.com/spring-projects/spring-framework/issues/21025
- https://github.com/spring-projects/spring-framework/issues/21831
特别是看到这个评论有希望表明这一点:
Mono.fromRunnable(() -> context.publishEvent(...))
据我了解,我可以使用@EventListener
,因为我完全可以不传播反应式上下文。
但是请有人解释线程边界的含义以及这在反应堆栈中是否合法?
更新
从测试来看,这样做感觉很好:
@Service
class FeedServiceImpl(
val applicationEventPublisher: ApplicationEventPublisher,
) : FeedService {
@EventListener
@Async
override fun handle(e: FeedEntryDeletedEvent) {
log.info("Handler started")
runBlocking {
// do stuff that takes some time
delay(1000)
}
log.info("ThreadId: ${Thread.currentThread().id}")
log.info("Handler done")
}
override suspend fun deleteEntry(entryId: Long) {
entryRepository.deleteById(entryId)
applicationEventPublisher.publishEvent(
FeedEntryDeletedEvent(
timestamp = time.utcMillis(),
entryId = entryId,
)
)
log.info("ThreadId: ${Thread.currentThread().id}")
log.info("Publisher done")
}
}
请注意,这handle
不是挂起函数,因为@EventListener
必须有一个参数,并且协程在后台引入了延续参数。然后处理程序启动一个新的阻塞协程范围,这很好,因为它位于不同的线程上,因为@Async
.
输出是:
2021-05-13 12:15:20.755 INFO 20252 --- [-1 @coroutine#6] ...FeedServiceImpl : ThreadId: 38
2021-05-13 12:15:20.755 INFO 20252 --- [ task-1] ...FeedServiceImpl : Handler started
2021-05-13 12:15:20.755 INFO 20252 --- [-1 @coroutine#6] ...FeedServiceImpl : Publisher done
2021-05-13 12:15:21.758 INFO 20252 --- [ task-1] ...FeedServiceImpl : ThreadId: 54
2021-05-13 12:15:21.759 INFO 20252 --- [ task-1] ...FeedServiceImpl : Handler done
更新 2
不使用 @Async 的另一种方法是:
@EventListener
// @Async
override fun handle(e: FeedEntryDeletedEvent) {
log.info("Handler start")
log.info("Handler ThreadId: ${Thread.currentThread().id}")
runBlocking {
log.info("Handler block start")
delay(1000)
log.info("Handler block ThreadId: ${Thread.currentThread().id}")
log.info("Handler block end")
}
log.info("Handler done")
}
override suspend fun deleteEntry(entryId: Long) {
feedRepository.deleteById(entryId)
Mono.fromRunnable<Unit> {
applicationEventPublisher.publishEvent(
FeedEntryDeletedEvent(
timestamp = time.utcMillis(),
entryId = entryId,
)
)
}
.subscribeOn(Schedulers.boundedElastic())
.subscribe()
log.info("Publisher ThreadId: ${Thread.currentThread().id}")
log.info("Publisher done")
}
2021-05-13 13:06:54.503 INFO 23326 --- [-1 @coroutine#6] ...FeedServiceImpl : Publisher ThreadId: 38
2021-05-13 13:06:54.503 INFO 23326 --- [-1 @coroutine#6] ...FeedServiceImpl : Publisher done
2021-05-13 13:06:54.504 INFO 23326 --- [oundedElastic-1] ...FeedServiceImpl : Handler start
2021-05-13 13:06:54.504 INFO 23326 --- [oundedElastic-1] ...FeedServiceImpl : Handler ThreadId: 53
2021-05-13 13:06:54.505 INFO 23326 --- [-1 @coroutine#7] ...FeedServiceImpl : Handler block start
2021-05-13 13:06:55.539 INFO 23326 --- [-1 @coroutine#7] ...FeedServiceImpl : Handler block ThreadId: 53
2021-05-13 13:06:55.539 INFO 23326 --- [-1 @coroutine#7] ...FeedServiceImpl : Handler block end
2021-05-13 13:06:55.540 INFO 23326 --- [oundedElastic-1] ...FeedServiceImpl : Handler done
但是,我仍然不了解负载下的应用程序的含义,并且将响应式操作与执行runBlocking { }
.
解决方案
Reactor 提供 Sink。您可以像使用事件总线一样使用它。看看下面的例子。
@Configuration
public class EventNotificationConfig {
@Bean
public Sinks.Many<EventNotification> eventNotifications() {
return Sinks.many().replay().latest();
}
}
@Configuration
你在一个类中创建一个 Sink 的 Bean 。这可以用来发出新事件,也可以变成订阅者的 Flux。
@Component
@RequiredArgsConstructor
@Slf4j
public class NotificationUsecase {
private final @NonNull Sinks.Many<EventNotification> eventNotifications;
/**
* Provide a flux with our notifications.
*
* @return a Flux
*/
public Flux<EventNotification> getNotifications() {
return eventNotifications.asFlux();
}
/**
* Emit a new event to the sink.
*
* @param eventId
* @param status
* @param payload
*/
public void emitNotification(final String eventId, final EventNotification.Status status, final Map<String, Object> payload) {
eventNotifications.tryEmitNext(EventNotification.builder()
.eventId(eventId)
.status(status)
.payload(payload).build());
}
}
您最多可以在应用程序中保留一个 Sink 实例。订阅不同类型的事件可以通过各种订阅者可以应用于 Flux 的过滤器来实现。
@Component
@RequiredArgsConstructor
@Slf4j
public class EventListener {
private final @NonNull NotificationUsecase notificationUsecase;
/**
* Start listening to events as soon as class EventListener
* has been constructed.
*
* Listening will continue until the Flux emits a 'completed'
* signal.
*/
@PostConstruct
public void init() {
this.listenToPings()
.subscribe();
this.listenToDataFetched()
.subscribe();
}
public Flux<EventNotification> listenToPings() {
return this.notificationUsecase.getNotifications()
.subscribeOn(Schedulers.boundedElastic())
.filter(notification -> notification.getStatus().equals(EventNotification.Status.PING))
.doOnNext(notification -> log.info("received PING: {}", notification));
}
public Flux<EventNotification> listenToDataFetched() {
return this.notificationUsecase.getNotifications()
.subscribeOn(Schedulers.boundedElastic())
.filter(notification -> notification.getStatus().equals(EventNotification.Status.DATA_FETCHED))
.doOnNext(notification -> log.info("received data: {}", notification));
}
}
public Flux<EventNotification> listenToDataFetchedAndWriteToDatabase() {
return this.notificationUsecase.getNotifications()
.subscribeOn(Schedulers.boundedElastic())
.flatMap(notification -> reactiveMongoRepository
.saveAndReturnNewObject(notification)
.doOnNext(log.info("I just saved something and returned an instance of NewObject!"))
.zipWith(Mono.just(notification)))
.map(tuple->tuple.getT2())
.filter(notification -> notification.getStatus().equals(PlanningNotification.Status.DATA_FETCHED))
.doOnNext(notification -> log.info("received data: {} - saved ", notification));
}
发出新事件同样简单。只需调用发射方法:
notificationUsecase.emitNotification(eventId, EventNotification.Status.PING, payload);
推荐阅读
- c# - 在 Hangfire 上每 9 分钟执行一次的 cron 作业不工作
- node.js - 在 Mongoose 中跨模块管理连接
- c# - 从 postman 发送 post 请求返回状态码 415
- java - CardView 未显示在 RecyclerView 上(在片段中)
- javascript - 尽管配置了正确的 CORS 标头,但 301 响应“跨源请求被阻止”
- java - 如何解析 REST API 流
- javascript - div中的onclick没有调用函数
- visual-studio-2019 - MSB4018“ResolvePackageAssets”任务意外失败
- python - sklearn 中 lbfgs 求解器的 MLPRegressor learning_rate_init
- docker - docker 上的 asp.net core 3.1 antiforgery - 异常反序列化令牌