java - 确认所有流的方法已成功发布到kafka
问题描述
这是我的 Spring Boot 代码:我使用了 Spring Boot Annotation @Async
@Async("concurrentTasks")
public void fetchAndPost(){
Stream<Sample> sampledata=sampleRepository.fetchSampleDatafromDb();
sampledata.forEach(sample->{
ListenableFuture<SendResult<String,Object>> future=kafkaTemplate.send(kafkaTopic,partitionKey,sample.toString());
handleResponse(future,partitionKey,sample)
});
}
public void handleResponse(Listenable<SendResult<String,Object>> future,String partitionkey,Sample sample){
future.addCallback(new ListenableFutureCallBack<SendResult<String,Object>>(){
@Override
public void onSuccess(SendResult<String,Object>){
log.info("Publised data successfully");
}
@Override
public void onFailure(SendResult<String,Object>){
log.info("Publised data failed");
}
});
}
这是配置类
@Configuration
public class AsyncConfig {
@Bean
public TaskExecutor concurrentTasks() {
ThreadPoolTaskExecutor executor = new ThreadPoolTaskExecutor();
executor.setCorePoolSize(4);
executor.setMaxPoolSize(4);
executor.setThreadNamePrefix("default_task_executor_thread");
executor.initialize();
return executor;
}
}
现在如何知道我的整个 Stream 已发布到 kafka。因为我必须通知客户我的 kafkaPosting 已完成,以便他们可以开始消费。要求就像我们必须通过电子邮件通知客户一样。所以,我该如何弄清楚。
我不能使用 kafkaTemplate.send(kafkaTopic,partitionKey,sample.toString()).get() 因为它使我的过程变得非常缓慢。
解决方案
推荐阅读
- android - OpenGL渲染器:戴维
- php - php会话使用分页自动增加会话值
- typescript - 我需要用类型替换签名中的原语吗?
- json - 如何重置 Visual Studio Code 的默认所有 json 设置?
- html - 如何在图像上叠加渐变
- java - 如何在后台运行代码或完整应用程序
- netlogo - Netlogo,在长度为 x 的列表中迭代 n 个项目(其中 n <= x)
- typescript - Typescript 中的类型定义中的方括号是什么意思?
- matlab - 为什么我不能在这里使用'scatter3'?
- javascript - 从用户输入自动创建 Javascript 表单