首页 > 解决方案 > Spring integration kafka - 监听器没有响应 - 我怎么能找出原因?

问题描述

这是关于使用 spring-batch 和 spring-kafka 的其他几个 SO 问题的后续。

意图:

目的是建立这样的调用链(简化视图):

主调用从步骤:

<master job> -> partitioner (MessageChannelPartitionHandler) +aggregator -> messagingTemplate -> outbound-requests (Channel) -> request-outbound-staging (KafkaProducerMessageHandler) -> kafka

Kafka 侦听器响应消息并触发从属工作步骤

kafka -> inbound-request-listener (MessageDrivenChannelAdapter) -> inbound-requests (channel) -> worker-container (KafkaMessageListenerContainer) -> stepExecutionRequestHandler <slave step>

Spring批量回复返回Kafka

stepExecutionRequestHandler <slave step> -> stepMessagingTemplate -> outbound-replies (Channel) -> reply-outbound-staging (KafkaProducerMessageHandler) -> kafka

Kafka 监听器返回对聚合器和分区器的回复

kafka -> inbound-replies (MessageDrivenChannelAdapter) -> partitioner (MessageChannelPartitionHandler) +aggregator -> <master job>

历史:

在最初使用 kafka 完成 spring-integration 的配置之后,进程从属端的 spring-batch 组件在侦听器被启动时没有找到步骤。

我们重构了 spring-batch 组件和驱动它们的 spring-integration,最终将它们和侦听器组件从 java DSL 中移出并进入从属步骤 XML。

当前状态:

重构后,kafka 侦听器似乎不再响应。唯一的症状是从属进程没有响应,并且聚合器超时。

Java DSL 配置:

@Configuration
@Order(6)
@EnableIntegration
@EnableKafka
@IntegrationComponentScan
public class QueueingConfig {
    private static final int MXMODULE = 400;
    private static final String JOB_CONTROL_TOPIC = "job.control";
    private static final String STEP_EXECUTION_TOPICS = "job.step";
    private static final String STEP_REPLY_TOPICS = "job.step.reply";

XML 配置片段:

<bean id="worker-container" class="org.springframework.kafka.listener.KafkaMessageListenerContainer">
    <constructor-arg>
        <bean class="org.springframework.kafka.core.DefaultKafkaConsumerFactory">
            <constructor-arg>
                <map>
                <entry key="bootstrap.servers" value="192.168.2.127:9092" />  <!-- needs to come from factory bean -->
                <entry key="key.deserializer" value="org.apache.kafka.common.serialization.IntegerDeserializer"/>
                <entry key="value.deserializer" value="org.springframework.kafka.support.serializer.JsonDeserializer"/>
                <entry key="group.id" value="batch"/>
                <entry key="spring.json.trusted.packages" value="com.mypackage,org.springframework.batch.integration.partition"/>
                <entry key="max.poll.records" value="10"/>
                </map>
            </constructor-arg>
        </bean>
    </constructor-arg>
    <constructor-arg>
        <bean class="org.springframework.kafka.listener.ContainerProperties">
            <constructor-arg name="topics" value="job.step" />
        </bean>
    </constructor-arg>
</bean>


<int-kafka:message-driven-channel-adapter
    id="inboundKafkaRequests"
    send-timeout="5000"
    mode="record"
    channel="inbound-requests"
    auto-startup="true"
    listener-container="worker-container" 
    />

先前的研究:

  1. Spring Integration Kafka Consumer Listener 不接收消息

  2. 如何将此弹簧集成配置从 XML 转换为 Java?

  3. kafka 经纪人在开始时不可用


编辑:更新

在 linux 补丁更新期间,kafka 配置文件被默认文件覆盖。我恢复了正确的配置,kafka 恢复了运行。

在编写这个问题的练习中,我将大部分弹簧接线形式化了。形式化过程有助于识别无线集成组件的一些问题。

标签: spring-integrationspring-batch

解决方案


推荐阅读