apache-kafka - 如何使用kafka集成在春季批处理中调用StepExecutionListener?
问题描述
下面是 etl.xml 中的作业配置
<batch:job id="procuerJob">
<batch:step id="Produce">
<batch:partition partitioner="partitioner">
<batch:handler grid-size="${ partitioner.limit}"></batch:handler>
<batch:step>
<batch:tasklet>
<batch:chunk reader="Reader" writer="kafkaProducer"
commit-interval="20000">
</batch:chunk>
<batch:listeners>
<batch:listener ref="producingListener" />
</batch:listeners>
</batch:tasklet>
</batch:step>
</batch:partition>
</batch:step>
</batch:job>
下面是用于向主题发送消息的代码。
ListenableFuture<SendResult<String, message>> listenableFuture = kafkaTemplate.send(message);
listenableFuture.addCallback(new ListenableFutureCallback<SendResult<String, message >>() {
@Override
public void onSuccess(SendResult<String, message > result) {
log.info("marking as SUCCESS");
manager.updateStatus(“someTable”, KafkaResponse.SUCCESS);
}
@Override
public void onFailure(Throwable ex) {
log.info("marking as FAILURE");
manager.updateKafkaStatus(someTable, KafkaResponse.FAILURE);
}
}
一旦 kafkaTemplate.send(message) 被执行,监听器就会被调用并且作业完成。我看到 onSuccess()、onFailure() 在作业完成后被调用。如何更改作业的配置,以便在收到来自 kafka 主题的确认后调用侦听器?
解决方案
您想要一些您建议阻止等待未来的代码示例吗?这可能会有所帮助。
我没有尝试以下方法,但这是一个想法:
ListenableFuture<SendResult<String, Message>> future = kafkaTemplate.send(message);
try {
SendResult<String, Message> sendResult = future.get();
// inspect sendResult
log.info("marking as SUCCESS");
manager.updateStatus(“someTable”, KafkaResponse.SUCCESS);
} catch (Exception e) {
log.info("marking as FAILURE");
manager.updateKafkaStatus(someTable, KafkaResponse.FAILURE);
// do something with e
}
推荐阅读
- python - 将我的输入与数据库中的值进行比较
- angular - AG-Grid:无法更新 Grid 中的字段,因为它是只读的
- java - Spring DI for Repository (JAVA)
- pascal - 程序在 FPC 和 ObjFPC 中的行为不同
- c - 如何使用 OpenSSL libcrypto 生成 DSA 密钥对?
- android - 授予写入权限,但在执行时被拒绝
- python - 在给定的时间段内按天计算
- java - 侦听器接口如何或为什么工作?除了作为侦听器之外,接口还有其他用途吗?
- xml - 使用 xmlstarlet 编辑 XML 中的值
- mysql - Mysql IF THEN 没有 ELSE