spring-integration - Spring integration kafka - 监听器没有响应 - 我怎么能找出原因?
问题描述
这是关于使用 spring-batch 和 spring-kafka 的其他几个 SO 问题的后续。
意图:
目的是建立这样的调用链(简化视图):
主调用从步骤:
<master job> -> partitioner (MessageChannelPartitionHandler) +aggregator -> messagingTemplate -> outbound-requests (Channel) -> request-outbound-staging (KafkaProducerMessageHandler) -> kafka
Kafka 侦听器响应消息并触发从属工作步骤
kafka -> inbound-request-listener (MessageDrivenChannelAdapter) -> inbound-requests (channel) -> worker-container (KafkaMessageListenerContainer) -> stepExecutionRequestHandler <slave step>
Spring批量回复返回Kafka
stepExecutionRequestHandler <slave step> -> stepMessagingTemplate -> outbound-replies (Channel) -> reply-outbound-staging (KafkaProducerMessageHandler) -> kafka
Kafka 监听器返回对聚合器和分区器的回复
kafka -> inbound-replies (MessageDrivenChannelAdapter) -> partitioner (MessageChannelPartitionHandler) +aggregator -> <master job>
历史:
在最初使用 kafka 完成 spring-integration 的配置之后,进程从属端的 spring-batch 组件在侦听器被启动时没有找到步骤。
我们重构了 spring-batch 组件和驱动它们的 spring-integration,最终将它们和侦听器组件从 java DSL 中移出并进入从属步骤 XML。
当前状态:
重构后,kafka 侦听器似乎不再响应。唯一的症状是从属进程没有响应,并且聚合器超时。
Java DSL 配置:
@Configuration
@Order(6)
@EnableIntegration
@EnableKafka
@IntegrationComponentScan
public class QueueingConfig {
private static final int MXMODULE = 400;
private static final String JOB_CONTROL_TOPIC = "job.control";
private static final String STEP_EXECUTION_TOPICS = "job.step";
private static final String STEP_REPLY_TOPICS = "job.step.reply";
XML 配置片段:
<bean id="worker-container" class="org.springframework.kafka.listener.KafkaMessageListenerContainer">
<constructor-arg>
<bean class="org.springframework.kafka.core.DefaultKafkaConsumerFactory">
<constructor-arg>
<map>
<entry key="bootstrap.servers" value="192.168.2.127:9092" /> <!-- needs to come from factory bean -->
<entry key="key.deserializer" value="org.apache.kafka.common.serialization.IntegerDeserializer"/>
<entry key="value.deserializer" value="org.springframework.kafka.support.serializer.JsonDeserializer"/>
<entry key="group.id" value="batch"/>
<entry key="spring.json.trusted.packages" value="com.mypackage,org.springframework.batch.integration.partition"/>
<entry key="max.poll.records" value="10"/>
</map>
</constructor-arg>
</bean>
</constructor-arg>
<constructor-arg>
<bean class="org.springframework.kafka.listener.ContainerProperties">
<constructor-arg name="topics" value="job.step" />
</bean>
</constructor-arg>
</bean>
<int-kafka:message-driven-channel-adapter
id="inboundKafkaRequests"
send-timeout="5000"
mode="record"
channel="inbound-requests"
auto-startup="true"
listener-container="worker-container"
/>
先前的研究:
编辑:更新
在 linux 补丁更新期间,kafka 配置文件被默认文件覆盖。我恢复了正确的配置,kafka 恢复了运行。
在编写这个问题的练习中,我将大部分弹簧接线形式化了。形式化过程有助于识别无线集成组件的一些问题。
解决方案
推荐阅读
- html - 如何在移动视图中的表格中的单元格元素之间具有相等的间距?
- javascript - 如何从javascript中的选择选项中获取多个值?
- powershell - 批处理文件中的循环迭代缓慢
- php - 在 PHP 中使用 DOMXPath 调用 XML 数据
- jquery - 如何以 Unix 时间戳格式控制台记录 jQuery UI 的 datepicker 日期?
- intersection-observer - 如何去抖动或限制 Intersection Observer?
- javascript - 无法为预出价槽复制 JS 变量
- c++ - C++ std::stoul 不抛出异常
- python - Django - 按表单更新用户实例
- python - 阅读连接查询 Sqlalchemy Jinja