首页 > 解决方案 > Kafka 是否支持按记录获取

问题描述

截至2019 年 10月,使用Kafka v2.3.0Zookeeper v3.5.5

设想:

考虑以下场景,消费者可以根据以下假设从主题中获取记录;

  1. Fetch 的设计方式使其可以在 1 次拍摄中获取N条记录,具体取决于每个容量(以字节为单位)的获取。这是以配置参数的形式实现和提供的,例如max.partition.fetch.bytes.
  2. Fetch 的设计方式是它可以根据每个Number of Records的获取来获取N条记录。因此,如果我将该参数设置为 1 ,这意味着它将在每次获取尝试时获取 1 条记录。这就是卡夫卡现在所缺少的。这可能是由于设计范式和对流、并发、分区功能等的支持。

问题:

  1. 目前有什么方法可以让我每次 Fetch 只获取 1 条记录并相应地工作?
  2. 我的应用程序需要记录的实时流式传输和每次获取 1 条记录。两者都可以使用 Kafka 实现,还是我最终必须使用 2 个不同的消息传递系统,如 Kafka + ActiveMQ 等等?

请指导我。非常感谢你。

标签: apache-kafka

解决方案


NPM 包kafkajs确实支持并帮助您使用fetch_per_record机制,下面是实现此目的的示例代码,

const { Kafka } = require('kafkajs')

const kafka = new Kafka({
  clientId: 'my-app',
  brokers: ['192.168.1.172:9092']
})

const consumer = kafka.consumer({ groupId: 'test-group' })

const run = async () => {

  await consumer.connect()
  await consumer.subscribe({ topic: 'test', fromBeginning: false })

  await consumer.run({
    eachMessage: async ({ topic, partition, message }) => {
      console.log({
        partition,
        offset: message.offset,
        value: message.value.toString(),
      })
    },
  })
}

run().catch(console.error);

在上面的代码中,您有eachMessage帮助我们实现这种机制的方法


推荐阅读