首页 > 解决方案 > 如何使用kafka集成在春季批处理中调用StepExecutionListener?

问题描述

下面是 etl.xml 中的作业配置

<batch:job id="procuerJob">

<batch:step id="Produce">

    <batch:partition partitioner="partitioner">

        <batch:handler grid-size="${ partitioner.limit}"></batch:handler>

        <batch:step>

            <batch:tasklet>

                <batch:chunk reader="Reader" writer="kafkaProducer"
                             commit-interval="20000">

                </batch:chunk>

                <batch:listeners>

                    <batch:listener ref="producingListener" />

                    
                </batch:listeners>

            </batch:tasklet>

        </batch:step>

    </batch:partition>

</batch:step>

</batch:job>

下面是用于向主题发送消息的代码。

ListenableFuture<SendResult<String, message>> listenableFuture = kafkaTemplate.send(message);

listenableFuture.addCallback(new ListenableFutureCallback<SendResult<String, message >>() {

@Override
public void onSuccess(SendResult<String, message > result) {
    log.info("marking as SUCCESS");
    manager.updateStatus(“someTable”, KafkaResponse.SUCCESS);
}

@Override
public void onFailure(Throwable ex) {
    log.info("marking as FAILURE");
    manager.updateKafkaStatus(someTable, KafkaResponse.FAILURE);
}

}

一旦 kafkaTemplate.send(message) 被执行,监听器就会被调用并且作业完成。我看到 onSuccess()、onFailure() 在作业完成后被调用。如何更改作业的配置,以便在收到来自 kafka 主题的确认后调用侦听器?

标签: apache-kafkaspring-batchlistenerkafka-producer-apispring-batch-tasklet

解决方案


您想要一些您建议阻止等待未来的代码示例吗?这可能会有所帮助。

我没有尝试以下方法,但这是一个想法:

ListenableFuture<SendResult<String, Message>> future = kafkaTemplate.send(message);
try {
    SendResult<String, Message> sendResult = future.get();
    // inspect sendResult
    log.info("marking as SUCCESS");
    manager.updateStatus(“someTable”, KafkaResponse.SUCCESS);
} catch (Exception e) {
     log.info("marking as FAILURE");
     manager.updateKafkaStatus(someTable, KafkaResponse.FAILURE);
     // do something with e
}

推荐阅读