java - 具有单个执行器的发布订阅通道
问题描述
我有一个绑定到来自 Kafka 的云流输入通道的集成流。
然后消息通过 singleThreadExecutor 进入发布订阅通道。
从那里他们转到处理他们的一个订阅者。处理可能需要一些时间。
据我了解,处理是在 singleThreadExecutor 上完成的。云流的线程被释放以从 Kafka 获取另一条消息。
如果新消息到达但处理线程仍然忙,会发生什么?云流的线程会等待还是消息会被丢弃?如果要等多久呢?是否有一些默认超时值?
我猜测我们在这种情况下丢失了一些消息,因为我可以在 Kafka 中看到消息,但数据库中没有相应的更新......
但大多数消息都按预期处理。
public interface PointChannelProcessor {
@Input("point-channel")
MessageChannel pointChannel();
}
@Bean
public MessageChannel routeByKeyMessageChannel() {
return MessageChannels.publishSubscribe(Executors.newSingleThreadExecutor()).get();
}
@Bean
public IntegrationFlow retrievePointListFromKafka() {
return IntegrationFlows.from(pointChannelProcessor.pointChannel())
.transform(new JsonToObjectTransformer(PointKafkaImport.class))
.channel(ROUTE_BY_KEY_MESSAGE_CHANNEL)
.get();
}
@Bean
public IntegrationFlow routePointsByKey(){
return IntegrationFlows.from(ROUTE_BY_KEY_MESSAGE_CHANNEL)
.enrichHeaders(e -> e.header(MessageHeaders.ERROR_CHANNEL, failedPointsChannel(), true))
.route((GenericHandler<PointKafkaImport>) (payload, headers) -> {
//some routing logic
if (...) {
return ONE_CHANNEL;
} else if (...){
return NULL_CHANNEL;
} else {
return ANOTHER_CHANNEL;
}
})
.get();
}
//Other direct channels from routing which handles payload
@Bean
public IntegrationFlow savePoint(){
return IntegrationFlows.from(ONE_CHANNEL)
.handle((GenericHandler<PointKafkaImport>) (payload, headers) -> ...)
.get();
}
@Bean
public IntegrationFlow updatePoint(){
return IntegrationFlows.from(ANOTHER_CHANNEL)
.handle((GenericHandler<PointKafkaImport>) (payload, headers) -> ...)
.get();
}
解决方案
您需要更清楚地了解您的问题,但据我所知,请确保您找到 和之间的区别 。在Kafka中,您的代码负责从队列中提取消息,并且消息不会发送到您的代码。因此,您的代码不能忙于从队列中提取消息。publish-subscribe
Producer-Consumer
此外,有许多策略可以确保您从队列中读取和处理数据并且不会丢失任何内容。在消费者中,您只需读取消息并增加偏移量,如果处理失败,您可以再次读取消息。消息将通过您在设置 Kafka 时设置的策略删除。
推荐阅读
- settings - CLion 方法签名没有提示
- java - 使用 BulkWrite (DeleteManyModel) 删除 100 万条记录需要更长的时间
- math - 如何识别右手和左手手势张量流
- javascript - 如何使用可移动文本和徽标更新threejs纹理
- react-native - 从对象 React Native 中获取特定属性
- reactjs - 在反应中更新对象状态的正确方法(合并状态?传播运算符?)
- python - 使用 BayesSearchCV 时如何提取最佳特征?
- javascript - 在 GEE 中导出图像时没有显示错误但有一行像素错误
- android - 如何在真实设备上配置 Android zScaler Client Connector VPN
- tomcat - 服务器 http://localhost:8081 需要用户名和密码