node.js - Kafkajs - 获取统计信息(滞后)
问题描述
在我们的应用程序中,我们为 kafkanest.js
使用kafkajs客户端。我们需要获得机会监控统计数据。指标之一是lag
。
试图弄清楚kafkajs是否提供任何有趣的东西。(有效载荷中最有趣的是:、、、、、、timestamp
)offset
batchContext.firstOffset
batchContext.firstTimestamp
batchContext.maxTimestamp
问题
是否有任何想法如何记录lag
由提供的值和其他统计信息kafkajs
?
我是否应该考虑实施自己的统计监视器以在使用kafka.js
客户端的节点应用程序中收集所需的信息?
新细节 1
按照我可以获得的文档batch.highWatermark
,在哪里
batch.highWatermark
是主题分区中最后提交的偏移量。它可用于计算滞后。
试
await consumer.run({
eachBatchAutoResolve: true,
eachBatch: async (data) => {
console.log('Received data.batch.messages: ', data.batch.messages)
console.log('Received data.batch.highWatermark: ', data.batch.highWatermark)
},
})
我可以得到像下一个这样的信息:
Received data.batch.messages: [
{
magicByte: 2,
attributes: 0,
timestamp: '1628877419958',
offset: '144',
key: null,
value: <Buffer 68 65 6c 6c 6f 21>,
headers: {},
isControlRecord: false,
batchContext: {
firstOffset: '144',
firstTimestamp: '1628877419958',
partitionLeaderEpoch: 0,
inTransaction: false,
isControlBatch: false,
lastOffsetDelta: 2,
producerId: '-1',
producerEpoch: 0,
firstSequence: 0,
maxTimestamp: '1628877419958',
timestampType: 0,
magicByte: 2
}
},
{
magicByte: 2,
attributes: 0,
timestamp: '1628877419958',
offset: '145',
key: null,
value: <Buffer 6f 74 68 65 72 20 6d 65 73 73 61 67 65>,
headers: {},
isControlRecord: false,
batchContext: {
firstOffset: '144',
firstTimestamp: '1628877419958',
partitionLeaderEpoch: 0,
inTransaction: false,
isControlBatch: false,
lastOffsetDelta: 2,
producerId: '-1',
producerEpoch: 0,
firstSequence: 0,
maxTimestamp: '1628877419958',
timestampType: 0,
magicByte: 2
}
},
{
magicByte: 2,
attributes: 0,
timestamp: '1628877419958',
offset: '146',
key: null,
value: <Buffer 6d 6f 72 65 20 6d 65 73 73 61 67 65 73>,
headers: {},
isControlRecord: false,
batchContext: {
firstOffset: '144',
firstTimestamp: '1628877419958',
partitionLeaderEpoch: 0,
inTransaction: false,
isControlBatch: false,
lastOffsetDelta: 2,
producerId: '-1',
producerEpoch: 0,
firstSequence: 0,
maxTimestamp: '1628877419958',
timestampType: 0,
magicByte: 2
}
}
]
Received data.batch.highWatermark: 147
那么有什么想法可以batch.highWatermark
在标签计算中使用吗?
解决方案
一般来说,描述的配置可以正常工作。附加配置损坏的工作,使用eachMessage
如下属性:
await consumer.run({
eachMessage: async ({ topic, partition, message }) => {
console.log({
key: message.key.toString(),
value: message.value.toString(),
headers: message.headers,
})
},
})
所以同时消费者配置应该只配置一个属性eachBatch
或eachMessage
.
推荐阅读
- push - No manifest found when i run the Push demo code
- python - pyodbc procedure execute failure
- android - Maximum size for AppBundle for Playstore
- wireshark - 使用 scapy 和 wireshark 生成 STP 流量 [格式错误的数据包]
- javascript - SweetAlert 2 button cancel submitting form
- angular - Emit and Subscribe within Angular 9 Service don't work together
- android - Android INSTALL_PARSE_FAILED_UNEXPECTED_EXCEPTION while starting the app
- sql - chat table summary list pgsql
- azure-devops - Power Automate - Azure Devops - 创建带附件的工作项
- python - 我写了一个新闻网络爬虫代码,错误是 IndexError: list index out of range and it not resolve