apache-kafka - 反应程序在将所有消息发送到 Kafka 之前提前退出
问题描述
这是上一个响应式 kafka 问题(发送数据通量到响应式 kafka 时发出的问题)的后续问题。
我正在尝试使用响应式方法将一些日志记录发送到 kafka。这是使用响应式 kafka 发送消息的响应式代码。
public class LogProducer {
private final KafkaSender<String, String> sender;
public LogProducer(String bootstrapServers) {
Map<String, Object> props = new HashMap<>();
props.put(ProducerConfig.BOOTSTRAP_SERVERS_CONFIG, bootstrapServers);
props.put(ProducerConfig.CLIENT_ID_CONFIG, "log-producer");
props.put(ProducerConfig.ACKS_CONFIG, "all");
props.put(ProducerConfig.KEY_SERIALIZER_CLASS_CONFIG, StringSerializer.class);
props.put(ProducerConfig.VALUE_SERIALIZER_CLASS_CONFIG, StringSerializer.class);
SenderOptions<String, String> senderOptions = SenderOptions.create(props);
sender = KafkaSender.create(senderOptions);
}
public void sendMessages(String topic, Flux<Logs.Data> records) throws InterruptedException {
AtomicInteger sentCount = new AtomicInteger(0);
sender.send(records
.map(record -> {
LogRecord lrec = record.getRecords().get(0);
String id = lrec.getId();
Thread.sleep(0, 5); // sleep for 5 ns
return SenderRecord.create(new ProducerRecord<>(topic, id,
lrec.toString()), id);
})).doOnNext(res -> sentCount.incrementAndGet()).then()
.doOnError(e -> {
log.error("[FAIL]: Send to the topic: '{}' failed. "
+ e, topic);
})
.doOnSuccess(s -> {
log.info("[SUCCESS]: {} records sent to the topic: '{}'", sentCount, topic);
})
.subscribe();
}
}
public class ExecuteQuery implements Runnable {
private LogProducer producer = new LogProducer("localhost:9092");
@Override
public void run() {
Flux<Logs.Data> records = ...
producer.sendMessages(kafkaTopic, records);
.....
.....
// processing related to the messages sent
}
}
因此,即使Thread.sleep(0, 5);
存在,有时它也不会将所有消息发送到 kafka,并且程序存在早期打印成功消息(log.info("[SUCCESS]: {} records sent to the topic: '{}'", sentCount, topic);
)。有没有更具体的方法来解决这个问题。例如,使用某种回调,以便该线程将等待所有消息发送成功。
我有一个弹簧控制台应用程序并通过调度程序以固定速率运行 ExecuteQuery,就像这样
public class Main {
private ScheduledExecutorService scheduler = Executors.newSingleThreadScheduledExecutor();
private ExecutorService executor = Executors.newFixedThreadPool(POOL_SIZE);
public static void main(String[] args) {
QueryScheduler scheduledQuery = new QueryScheduler();
scheduler.scheduleAtFixedRate(scheduledQuery, 0, 5, TimeUnit.MINUTES);
}
class QueryScheduler implements Runnable {
@Override
public void run() {
// preprocessing related to time
executor.execute(new ExecuteQuery());
// postprocessing related to time
}
}
}
解决方案
您Thread.sleep(0, 5); // sleep for 5 ns
对阻塞的主线程没有任何价值,因此它会在需要时退出,您ExecuteQuery
可能还没有完成它的工作。
目前尚不清楚如何启动应用程序,但我建议Thread.sleep()
完全在主线程中阻塞。准确地说,在public static void main(String[] args) {
方法 impl 中。
推荐阅读
- c++ - C++ 确保 2D 向量在内存中是紧凑的
- android - 在 Android Studio 中包含静态库存档
- python - django.core.exceptions.FieldError:无法将关键字“项目”解析为字段。选项有:Item、Item_id、id、order、ordered、quantity、user、user_id
- azure-devops - Azure DevOps:确保生产阶段只能从存储库的主分支发布
- spring - 如何使用证书对 Ribbon 负载均衡器和 Zuul 代理进行身份验证?
- firebase - 用户注册后获取用户UID
- reactjs - 我无法创建 React 项目
- javascript - jQuery - Safari - Mac OS - 单击一次时单击()事件不起作用
- sql - SQL/DB2 替代扩展 CASE 语句
- swift - 根据 UISearchBar 输入过滤 TableView (Swift)