首页 > 解决方案 > 确认所有流的方法已成功发布到kafka

问题描述

这是我的 Spring Boot 代码:我使用了 Spring Boot Annotation @Async

     @Async("concurrentTasks")
    
        public void fetchAndPost(){
        Stream<Sample> sampledata=sampleRepository.fetchSampleDatafromDb();
        sampledata.forEach(sample->{
        
        ListenableFuture<SendResult<String,Object>> future=kafkaTemplate.send(kafkaTopic,partitionKey,sample.toString());
        handleResponse(future,partitionKey,sample)
        });
        
        }
        
        public void handleResponse(Listenable<SendResult<String,Object>> future,String partitionkey,Sample sample){
        
        future.addCallback(new ListenableFutureCallBack<SendResult<String,Object>>(){
        
        @Override
        public void onSuccess(SendResult<String,Object>){
        log.info("Publised data successfully");
        
        }
        @Override
        public void onFailure(SendResult<String,Object>){
        log.info("Publised data failed");
        
        }
        
        
        });
        
        }


  

这是配置类

       @Configuration
        public class AsyncConfig {
            @Bean
            public TaskExecutor concurrentTasks() {
                ThreadPoolTaskExecutor executor = new ThreadPoolTaskExecutor();
                executor.setCorePoolSize(4);
                executor.setMaxPoolSize(4);
                executor.setThreadNamePrefix("default_task_executor_thread");
                executor.initialize();
                return executor;
            }
        }

现在如何知道我的整个 Stream 已发布到 kafka。因为我必须通知客户我的 kafkaPosting 已完成,以便他们可以开始消费。要求就像我们必须通过电子邮件通知客户一样。所以,我该如何弄清楚。

我不能使用 kafkaTemplate.send(kafkaTopic,partitionKey,sample.toString()).get() 因为它使我的过程变得非常缓慢。

标签: javaspring-bootapache-kafkakafka-producer-api

解决方案


推荐阅读