首页 > 解决方案 > 反应程序在将所有消息发送到 Kafka 之前提前退出

问题描述

这是上一个响应式 kafka 问题(发送数据通量到响应式 kafka 时发出的问题)的后续问题。

我正在尝试使用响应式方法将一些日志记录发送到 kafka。这是使用响应式 kafka 发送消息的响应式代码。

public class LogProducer {

    private final KafkaSender<String, String> sender;

    public LogProducer(String bootstrapServers) {

        Map<String, Object> props = new HashMap<>();
        props.put(ProducerConfig.BOOTSTRAP_SERVERS_CONFIG, bootstrapServers);
        props.put(ProducerConfig.CLIENT_ID_CONFIG, "log-producer");
        props.put(ProducerConfig.ACKS_CONFIG, "all");
        props.put(ProducerConfig.KEY_SERIALIZER_CLASS_CONFIG, StringSerializer.class);
        props.put(ProducerConfig.VALUE_SERIALIZER_CLASS_CONFIG, StringSerializer.class);
        SenderOptions<String, String> senderOptions = SenderOptions.create(props);

        sender = KafkaSender.create(senderOptions);
    }

    public void sendMessages(String topic, Flux<Logs.Data> records) throws InterruptedException {
    
        AtomicInteger sentCount = new AtomicInteger(0);
        sender.send(records
        .map(record -> {
            LogRecord lrec = record.getRecords().get(0);
            String id = lrec.getId();
            Thread.sleep(0, 5); // sleep for 5 ns
            return SenderRecord.create(new ProducerRecord<>(topic, id,
                    lrec.toString()), id);
        })).doOnNext(res -> sentCount.incrementAndGet()).then()
        .doOnError(e -> {
            log.error("[FAIL]: Send to the topic: '{}' failed. "
                    + e, topic);
        })
        .doOnSuccess(s -> {
            log.info("[SUCCESS]: {} records sent to the topic: '{}'", sentCount, topic);
        })
        .subscribe();
    }

}


public class ExecuteQuery implements Runnable {

    private LogProducer producer = new LogProducer("localhost:9092");

    @Override
    public void run() {
        Flux<Logs.Data> records = ...
        producer.sendMessages(kafkaTopic, records);
        .....
        .....
        // processing related to the messages sent
    }

}

因此,即使Thread.sleep(0, 5);存在,有时它也不会将所有消息发送到 kafka,并且程序存在早期打印成功消息(log.info("[SUCCESS]: {} records sent to the topic: '{}'", sentCount, topic);)。有没有更具体的方法来解决这个问题。例如,使用某种回调,以便该线程将等待所有消息发送成功。

我有一个弹簧控制台应用程序并通过调度程序以固定速率运行 ExecuteQuery,就像这样

public class Main {

private ScheduledExecutorService scheduler = Executors.newSingleThreadScheduledExecutor();
private ExecutorService executor = Executors.newFixedThreadPool(POOL_SIZE);

public static void main(String[] args) {
     QueryScheduler scheduledQuery = new QueryScheduler();
     scheduler.scheduleAtFixedRate(scheduledQuery, 0, 5, TimeUnit.MINUTES);
}

class QueryScheduler implements Runnable {

  @Override
  public void run() {
      // preprocessing related to time
      executor.execute(new ExecuteQuery());
      // postprocessing related to time
  }

}
}

标签: apache-kafkaproject-reactorfluxreactorreactive-kafka

解决方案


Thread.sleep(0, 5); // sleep for 5 ns对阻塞的主线程没有任何价值,因此它会在需要时退出,您ExecuteQuery可能还没有完成它的工作。

目前尚不清楚如何启动应用程序,但我建议Thread.sleep()完全在主线程中阻塞。准确地说,在public static void main(String[] args) {方法 impl 中。


推荐阅读