javascript - Kafka 节点 - 如何检索压缩主题上的所有消息
问题描述
我正在尝试使用 kafka-node 从 kafka 主题中读取压缩消息。
问题是最近插入的消息留在 EOL 上方,并且在插入其他消息之前无法访问。实际上,在 EOL 和 High Water Offset 之间存在差距,这会阻止阅读最新消息。尚不清楚为什么会这样。
已创建一个主题
kafka-topics.sh --zookeeper ${KAFKA_HOST}:2181 --create --topic atopic --config "cleanup.policy=compact" --config "delete.retention.ms=100" --config "segment.ms=100" --config "min.cleanable.dirty.ratio=0" --partitions 1 --replication-factor 1
主题中产生了许多关键值。有些钥匙是一样的。
var client = new kafka.KafkaClient({kafkaHost: "<host:port>",autoConnect: true})
var producer = new HighLevelProducer(client);
producer.send(payload, function(error, result) {
debug('Sent payload to Kafka: ', payload);
if (error) {
console.error(error);
} else {
res(true)
}
client.close()
});
});
这是插入的键和值
key - 1
key2 - 1
key3 - 1
key - 2
key2 - 2
key3 - 2
key1 - 3
key - 3
key2 - 3
key3 - 3
然后请求了一组主题键。
var options = {
id: 'consumer1',
kafkaHost: "<host:port>",
groupId: "consumergroup1",
sessionTimeout: 15000,
protocol: ['roundrobin'],
fromOffset: 'earliest'
};
var consumerGroup = new ConsumerGroup(options, topic);
consumerGroup.on('error', onError);
consumerGroup.on('message', onMessage);
consumerGroup.on('done', function(message) {
consumerGroup.close(true,function(){ });
})
function onError (error) {
console.error(error);
}
function onMessage (message) {)
console.log('%s read msg Topic="%s" Partition=%s Offset=%d HW=%d', this.client.clientId, message.topic, message.partition, message.offset, message.highWaterOffset, message.value);
}
})
consumer1 read msg Topic="atopic" Partition=0 Offset=4 highWaterOffset=10 Key=key2 value={"name":"key2","url":"2"}
consumer1 read msg Topic="atopic" Partition=0 Offset=5 highWaterOffset=10 Key=key3 value={"name":"key3","url":"2"}
consumer1 read msg Topic="atopic" Partition=0 Offset=6 highWaterOffset=10 Key=key1 value={"name":"key1","url":"3"}
consumer1 read msg Topic="atopic" Partition=0 Offset=7 highWaterOffset=10 Key=key value={"name":"key","url":"3"}
consumer1 read msg Topic="atopic" Partition=0 Offset=0 highWaterOffset=10 Key= value=
consumer1 read msg Topic="atopic" Partition=0 Offset=0 highWaterOffset=10 Key= value=
consumer1 read msg Topic="atopic" Partition=0 Offset=0 highWaterOffset=10 Key= value=
consumer1 read msg Topic="atopic" Partition=0 Offset=0 highWaterOffset=10 Key= value=
有一个高水位偏移量,代表最新值 10。然而,消费者看到的偏移量值只有 7。不知何故,压缩阻止了消费者看到最新消息。
目前尚不清楚如何避免这种约束并允许消费者看到最新消息。
任何建议表示赞赏。谢谢。
解决方案
在使用 kafka 进行更多工作之后,似乎 kafka-node api 具有以下行为(我认为这实际上源自 kafka 本身)。
当在 highWaterOff 之前查询消息时,只有达到 highWaterOffset 的消息才会返回到 ConsumerGroup。如果消息没有被复制,这是有道理的,因为组中的另一个消费者不一定会看到这些消息。
仍然可以使用 Consumer 而不是 ConsumerGroup 并通过查询特定分区来请求和接收超出 highWaterOffset 的消息。
此外,当偏移量不一定在 latestOffset 时,似乎会触发“完成”事件。在这种情况下,有必要在 message.offset+1 处提交进一步的查询。如果您继续这样做,您可以获得最新偏移量的所有消息。
我不清楚为什么 kafka 有这种行为,但可能有一些较低级别的细节可以显示这种紧急行为。
推荐阅读
- sql - 如何在 PostgreSQL 中通过数组正确创建多个条目?
- arrays - 通过 Int 随机播放结构
- python - 计算由字典组成的给定列表中的城市数量?
- wordpress - 如何修复linkedin随机不显示共享的帖子图片?
- go - fmt.Println 中的可变变量展开
- python - 如何应用于具有多索引列的数据框中的一组列
- android - 使用 Observable.zip 调用多个 API
- swift - 有什么方法可以对数组内的自定义对象进行排序?
- coldfusion - 在 ColdFusion 8 到 ColdFusion 11 的导入设置步骤中看不到迁移向导
- sql - 在多个案例条件下加入 2 个表