首页 > 解决方案 > 具有单个执行器的发布订阅通道

问题描述

我有一个绑定到来自 Kafka 的云流输入通道的集成流。

然后消息通过 singleThreadExecutor 进入发布订阅通道。

从那里他们转到处理他们的一个订阅者。处理可能需要一些时间。

据我了解,处理是在 singleThreadExecutor 上完成的。云流的线程被释放以从 Kafka 获取另一条消息。

如果新消息到达但处理线程仍然忙,会发生什么?云流的线程会等待还是消息会被丢弃?如果要等多久呢?是否有一些默认超时值?

我猜测我们在这种情况下丢失了一些消息,因为我可以在 Kafka 中看到消息,但数据库中没有相应的更新......

但大多数消息都按预期处理。

public interface PointChannelProcessor {
    @Input("point-channel")
    MessageChannel pointChannel();
}

@Bean
public MessageChannel routeByKeyMessageChannel() {
    return MessageChannels.publishSubscribe(Executors.newSingleThreadExecutor()).get();
}

@Bean
public IntegrationFlow retrievePointListFromKafka() {
    return IntegrationFlows.from(pointChannelProcessor.pointChannel())
        .transform(new JsonToObjectTransformer(PointKafkaImport.class))
        .channel(ROUTE_BY_KEY_MESSAGE_CHANNEL)
        .get();
}

@Bean
public IntegrationFlow routePointsByKey(){
    return IntegrationFlows.from(ROUTE_BY_KEY_MESSAGE_CHANNEL)
        .enrichHeaders(e -> e.header(MessageHeaders.ERROR_CHANNEL, failedPointsChannel(), true))
        .route((GenericHandler<PointKafkaImport>) (payload, headers) -> {
              //some routing logic
              if (...) {
                  return ONE_CHANNEL;
              } else if (...){
                  return NULL_CHANNEL;
              } else {
                  return ANOTHER_CHANNEL;
              }
        })
        .get();
}

//Other direct channels from routing which handles payload
@Bean
public IntegrationFlow savePoint(){
    return IntegrationFlows.from(ONE_CHANNEL)
        .handle((GenericHandler<PointKafkaImport>) (payload, headers) -> ...)
        .get();
}

@Bean
public IntegrationFlow updatePoint(){
    return IntegrationFlows.from(ANOTHER_CHANNEL)
        .handle((GenericHandler<PointKafkaImport>) (payload, headers) -> ...)
        .get();
}

标签: javaspring-integrationpublish-subscribe

解决方案


您需要更清楚地了解您的问题,但据我所知,请确保您找到 和之间的区别 。在Kafka中,您的代码负责从队列中提取消息,并且消息不会发送到您的代码。因此,您的代码不能忙于从队列中提取消息。publish-subscribeProducer-Consumer

此外,有许多策略可以确保您从队列中读取和处理数据并且不会丢失任何内容。在消费者中,您只需读取消息并增加偏移量,如果处理失败,您可以再次读取消息。消息将通过您在设置 Kafka 时设置的策略删除。


推荐阅读