java - 如何识别哪些消息成功发布到 kafka 主题以及哪些消息失败
问题描述
将消息列表发布到 apache kafka。任何人都可以使用 kafka api 提供示例代码,展示如何识别哪些消息成功发布到主题以及哪些消息从响应中失败?(请注意,我在一个请求中将消息列表作为一批发送。)
解决方案
该KafkaProducer.send()
方法采用单个 ProducerRecord(消息)。
有两种方法可以检查集群是否成功接收到此消息:
使用回调:
send()
可以将回调作为第二个参数ProducerRecord<String, String> record = new ProducerRecord<>(topic, key, value); producer.send(record, new Callback() { @Override public void onCompletion(RecordMetadata metadata, Exception exception) { // If Exception is null, the record was sent successfully } });
使用未来:
send()
返回一个Future
ProducerRecord<String, String> record = new ProducerRecord<>(topic, key, value); Future<RecordMetadata> future = producer.send(record); try { RecordMetadata rm = future.get(); // The record was sent successfully } catch (ExecutionException e) { // The record failed }
推荐阅读
- laravel - Laravel Nova 只能从特定守卫访问
- sql - 通过 SQL 查询或函数获取依赖对象列表
- android - 设置通知音量
- powershell - 如何在 ubuntu 16.04 上使用 ansible 安装 powershell、powercli、powernsx
- apache - .htaccess 拒绝访问图像的文件夹,但允许访问查看图像
- node.js - 在客户端获取当前的 windows 用户
- python - 无法使用 djongo“数组模型字段”将数据添加到 MongoDB
- python - 如何(可靠地)在嵌入式(无头)Linux 中读取 USB 条码扫描器?
- assembly - 通过汇编程序制作无限文件,但在不应该时停止
- linux - 模式匹配在 bash 脚本中不起作用