apache-kafka - Kafka 是否支持按记录获取
问题描述
截至2019 年 10月,使用Kafka v2.3.0和Zookeeper v3.5.5
设想:
考虑以下场景,消费者可以根据以下假设从主题中获取记录;
- Fetch 的设计方式使其可以在 1 次拍摄中获取N条记录,具体取决于每个容量(以字节为单位)的获取。这是以配置参数的形式实现和提供的,例如
max.partition.fetch.bytes
. - Fetch 的设计方式是它可以根据每个Number of Records的获取来获取N条记录。因此,如果我将该参数设置为 1 ,这意味着它将在每次获取尝试时获取 1 条记录。这就是卡夫卡现在所缺少的。这可能是由于设计范式和对流、并发、分区功能等的支持。
问题:
- 目前有什么方法可以让我每次 Fetch 只获取 1 条记录并相应地工作?
- 我的应用程序需要记录的实时流式传输和每次获取 1 条记录。两者都可以使用 Kafka 实现,还是我最终必须使用 2 个不同的消息传递系统,如 Kafka + ActiveMQ 等等?
请指导我。非常感谢你。
解决方案
NPM 包kafkajs确实支持并帮助您使用fetch_per_record机制,下面是实现此目的的示例代码,
const { Kafka } = require('kafkajs')
const kafka = new Kafka({
clientId: 'my-app',
brokers: ['192.168.1.172:9092']
})
const consumer = kafka.consumer({ groupId: 'test-group' })
const run = async () => {
await consumer.connect()
await consumer.subscribe({ topic: 'test', fromBeginning: false })
await consumer.run({
eachMessage: async ({ topic, partition, message }) => {
console.log({
partition,
offset: message.offset,
value: message.value.toString(),
})
},
})
}
run().catch(console.error);
在上面的代码中,您有eachMessage
帮助我们实现这种机制的方法
推荐阅读
- javascript - 如何自动更新子输入jquery
- karate - 如何在空手道 DSL 的场景大纲中传递动态变量
- javascript - 使用 react 从 google taxonomy api 获取数据?
- r - 如何在 R 中删除数据框中的原始和复制案例?
- javascript - 如何在 QML 的窗格中排列一个矩形?
- reactjs - React 下拉菜单中的百万选项
- c++ - 在抛出 'std::system_error' 的实例后调用终止
- javascript - 从 RSS Feed XML 中提取文本标签(使用 Javascript/React)
- python - 是否可以使用 python 正则表达式找到非连续重复字符?
- php - 在两个表上左连接并希望从特定日期获取结果