首页 > 解决方案 > Kafkajs - 获取统计信息(滞后)

问题描述

在我们的应用程序中,我们为 kafkanest.js使用kafkajs客户端。我们需要获得机会监控统计数据。指标之一是lag

试图弄清楚kafkajs是否提供任何有趣的东西。(有效载荷中最有趣的是:、、、、、、timestampoffsetbatchContext.firstOffsetbatchContext.firstTimestampbatchContext.maxTimestamp

问题

是否有任何想法如何记录lag由提供的值和其他统计信息kafkajs

我是否应该考虑实施自己的统计监视器以在使用kafka.js客户端的节点应用程序中收集所需的信息?

新细节 1

按照我可以获得的文档batch.highWatermark,在哪里

batch.highWatermark是主题分区中最后提交的偏移量。它可用于计算滞后。

  await consumer.run({
    eachBatchAutoResolve: true,
    eachBatch: async (data) => {
      console.log('Received data.batch.messages: ', data.batch.messages)
      console.log('Received data.batch.highWatermark: ', data.batch.highWatermark)
    },
  })

我可以得到像下一个这样的信息:

Received data.batch.messages:  [
  {
    magicByte: 2,
    attributes: 0,
    timestamp: '1628877419958',
    offset: '144',
    key: null,
    value: <Buffer 68 65 6c 6c 6f 21>,
    headers: {},
    isControlRecord: false,
    batchContext: {
      firstOffset: '144',
      firstTimestamp: '1628877419958',
      partitionLeaderEpoch: 0,
      inTransaction: false,
      isControlBatch: false,
      lastOffsetDelta: 2,
      producerId: '-1',
      producerEpoch: 0,
      firstSequence: 0,
      maxTimestamp: '1628877419958',
      timestampType: 0,
      magicByte: 2
    }
  },
  {
    magicByte: 2,
    attributes: 0,
    timestamp: '1628877419958',
    offset: '145',
    key: null,
    value: <Buffer 6f 74 68 65 72 20 6d 65 73 73 61 67 65>,
    headers: {},
    isControlRecord: false,
    batchContext: {
      firstOffset: '144',
      firstTimestamp: '1628877419958',
      partitionLeaderEpoch: 0,
      inTransaction: false,
      isControlBatch: false,
      lastOffsetDelta: 2,
      producerId: '-1',
      producerEpoch: 0,
      firstSequence: 0,
      maxTimestamp: '1628877419958',
      timestampType: 0,
      magicByte: 2
    }
  },
  {
    magicByte: 2,
    attributes: 0,
    timestamp: '1628877419958',
    offset: '146',
    key: null,
    value: <Buffer 6d 6f 72 65 20 6d 65 73 73 61 67 65 73>,
    headers: {},
    isControlRecord: false,
    batchContext: {
      firstOffset: '144',
      firstTimestamp: '1628877419958',
      partitionLeaderEpoch: 0,
      inTransaction: false,
      isControlBatch: false,
      lastOffsetDelta: 2,
      producerId: '-1',
      producerEpoch: 0,
      firstSequence: 0,
      maxTimestamp: '1628877419958',
      timestampType: 0,
      magicByte: 2
    }
  }
]
Received data.batch.highWatermark:  147

那么有什么想法可以batch.highWatermark在标签计算中使用吗?

标签: node.jsapache-kafkastatisticsnestjskafkajs

解决方案


一般来说,描述的配置可以正常工作。附加配置损坏的工作,使用eachMessage如下属性:

await consumer.run({
    eachMessage: async ({ topic, partition, message }) => {
        console.log({
            key: message.key.toString(),
            value: message.value.toString(),
            headers: message.headers,
        })
    },
})

所以同时消费者配置应该只配置一个属性eachBatcheachMessage.


推荐阅读