首页 > 解决方案 > webflux:内部事件总线和异步,松散耦合的事件监听器

问题描述

如何实现内部事件总线以在 webflux spring 堆栈中执行异步操作?

我想要一个服务来发出一个事件:

@Service
class FeedServiceImpl(/*...dependencies...*/) : FeedService {
  override suspend fun deleteEntry(entryId: Long) {
    entryRepository.deleteById(entryId)
    publishEvent(
      FeedEntryDeletedEvent(
        timestamp = time.utcMillis(),
        entryId = entryId,
      )
    )
  }
}

并且发布者服务不知道的不同组件应该能够决定对该事件做出反应。

@Service
class CommentServiceImpl(/*...dependencies...*/): CommentService {
  override suspend fun onDeleteEntry(event: FeedEntryDeletedEvent) {
    // do stuff
  }
}

在 MVC 应用程序中,我将使用在处理程序 ( ) 上ApplicationEventPublisher发布事件 ( publishEvent) 和@EventListener+ 。@AsynconDeleteEntry

反应式堆栈中的等价物是什么?

我考虑的另一个选项是运行嵌入式消息传递服务,因为这应该意味着异步语义。但是对于一个简单的场景来说,这感觉像是很多开销。


我找到了这些 SO 线程

但他们没有回答这种情况,因为他们假设发布者知道听众。但我需要松散耦合。

我还发现了这些春季问题

特别是看到这个评论有希望表明这一点:

Mono.fromRunnable(() -> context.publishEvent(...))

据我了解,我可以使用@EventListener,因为我完全可以不传播反应式上下文。

但是请有人解释线程边界的含义以及这在反应堆栈中是否合法?


更新

从测试来看,这样做感觉很好:

@Service
class FeedServiceImpl(
  val applicationEventPublisher: ApplicationEventPublisher,
) : FeedService {
  @EventListener
  @Async
  override fun handle(e: FeedEntryDeletedEvent) {
    log.info("Handler started")
    runBlocking {
      // do stuff that takes some time
      delay(1000)
    }
    log.info("ThreadId: ${Thread.currentThread().id}")
    log.info("Handler done")
  }

  override suspend fun deleteEntry(entryId: Long) {
    entryRepository.deleteById(entryId)
    applicationEventPublisher.publishEvent(
      FeedEntryDeletedEvent(
        timestamp = time.utcMillis(),
        entryId = entryId,
      )
    )
    log.info("ThreadId: ${Thread.currentThread().id}")
    log.info("Publisher done")
  }
}

请注意,这handle不是挂起函数,因为@EventListener必须有一个参数,并且协程在后台引入了延续参数。然后处理程序启动一个新的阻塞协程范围,这很好,因为它位于不同的线程上,因为@Async.

输出是:

2021-05-13 12:15:20.755  INFO 20252 --- [-1 @coroutine#6] ...FeedServiceImpl  : ThreadId: 38
2021-05-13 12:15:20.755  INFO 20252 --- [         task-1] ...FeedServiceImpl   : Handler started
2021-05-13 12:15:20.755  INFO 20252 --- [-1 @coroutine#6] ...FeedServiceImpl   : Publisher done
2021-05-13 12:15:21.758  INFO 20252 --- [         task-1] ...FeedServiceImpl   : ThreadId: 54
2021-05-13 12:15:21.759  INFO 20252 --- [         task-1] ...FeedServiceImpl   : Handler done

更新 2

不使用 @Async 的另一种方法是:

  @EventListener
//  @Async
  override fun handle(e: FeedEntryDeletedEvent) {
    log.info("Handler start")
    log.info("Handler ThreadId: ${Thread.currentThread().id}")
    runBlocking {
      log.info("Handler block start")
      delay(1000)
      log.info("Handler block ThreadId: ${Thread.currentThread().id}")
      log.info("Handler block end")
    }
    log.info("Handler done")
  }

  override suspend fun deleteEntry(entryId: Long) {
    feedRepository.deleteById(entryId)
    Mono.fromRunnable<Unit> {
      applicationEventPublisher.publishEvent(
        FeedEntryDeletedEvent(
          timestamp = time.utcMillis(),
          entryId = entryId,
        )
      )
    }
      .subscribeOn(Schedulers.boundedElastic())
      .subscribe()
    log.info("Publisher ThreadId: ${Thread.currentThread().id}")
    log.info("Publisher done")
  }

2021-05-13 13:06:54.503  INFO 23326 --- [-1 @coroutine#6] ...FeedServiceImpl  : Publisher ThreadId: 38
2021-05-13 13:06:54.503  INFO 23326 --- [-1 @coroutine#6] ...FeedServiceImpl  : Publisher done
2021-05-13 13:06:54.504  INFO 23326 --- [oundedElastic-1] ...FeedServiceImpl  : Handler start
2021-05-13 13:06:54.504  INFO 23326 --- [oundedElastic-1] ...FeedServiceImpl  : Handler ThreadId: 53
2021-05-13 13:06:54.505  INFO 23326 --- [-1 @coroutine#7] ...FeedServiceImpl  : Handler block start
2021-05-13 13:06:55.539  INFO 23326 --- [-1 @coroutine#7] ...FeedServiceImpl  : Handler block ThreadId: 53
2021-05-13 13:06:55.539  INFO 23326 --- [-1 @coroutine#7] ...FeedServiceImpl  : Handler block end
2021-05-13 13:06:55.540  INFO 23326 --- [oundedElastic-1] ...FeedServiceImpl  : Handler done

但是,我仍然不了解负载下的应用程序的含义,并且将响应式操作与执行runBlocking { }.

标签: springspring-bootspring-webflux

解决方案



Reactor 提供 Sink。您可以像使用事件总线一样使用它。看看下面的例子。

@Configuration
public class EventNotificationConfig {

    @Bean
    public Sinks.Many<EventNotification> eventNotifications() {
        return Sinks.many().replay().latest();
    }

}

@Configuration你在一个类中创建一个 Sink 的 Bean 。这可以用来发出新事件,也可以变成订阅者的 Flux。

@Component
@RequiredArgsConstructor
@Slf4j
public class NotificationUsecase {

    private final @NonNull Sinks.Many<EventNotification> eventNotifications;


    /**
     * Provide a flux with our notifications.
     *
     * @return a Flux
     */
    public Flux<EventNotification> getNotifications() {
        return eventNotifications.asFlux();
    }

    /**
     * Emit a new event to the sink.
     *
     * @param eventId
     * @param status
     * @param payload
     */
    public void emitNotification(final String eventId, final EventNotification.Status status, final Map<String, Object> payload) {
        eventNotifications.tryEmitNext(EventNotification.builder()
          .eventId(eventId)
          .status(status)
          .payload(payload).build());
    }

}

您最多可以在应用程序中保留一个 Sink 实例。订阅不同类型的事件可以通过各种订阅者可以应用于 Flux 的过滤器来实现。


@Component
@RequiredArgsConstructor
@Slf4j
public class EventListener {

    private final @NonNull NotificationUsecase notificationUsecase;


    /**
     * Start listening to events as soon as class EventListener
     * has been constructed.
     *
     * Listening will continue until the Flux emits a 'completed'
     * signal.
     */
    @PostConstruct
    public void init() {

        this.listenToPings()
                .subscribe();
        this.listenToDataFetched()
                .subscribe();
    }


    public Flux<EventNotification> listenToPings() {
        return this.notificationUsecase.getNotifications()
                .subscribeOn(Schedulers.boundedElastic())
                .filter(notification -> notification.getStatus().equals(EventNotification.Status.PING))
                .doOnNext(notification -> log.info("received PING: {}", notification));
    }

    public Flux<EventNotification> listenToDataFetched() {
        return this.notificationUsecase.getNotifications()
                .subscribeOn(Schedulers.boundedElastic())
                .filter(notification -> notification.getStatus().equals(EventNotification.Status.DATA_FETCHED))
                .doOnNext(notification -> log.info("received data: {}", notification));
    }
}

    
    public Flux<EventNotification> listenToDataFetchedAndWriteToDatabase() {
        return this.notificationUsecase.getNotifications()
                .subscribeOn(Schedulers.boundedElastic())
                .flatMap(notification -> reactiveMongoRepository
                    .saveAndReturnNewObject(notification)
                    .doOnNext(log.info("I just saved something and returned an instance of NewObject!"))
                    .zipWith(Mono.just(notification)))
                .map(tuple->tuple.getT2())
                .filter(notification -> notification.getStatus().equals(PlanningNotification.Status.DATA_FETCHED))
                .doOnNext(notification -> log.info("received data: {} - saved ", notification));
    }

发出新事件同样简单。只需调用发射方法:



notificationUsecase.emitNotification(eventId, EventNotification.Status.PING, payload);



推荐阅读