node.js - 卡夫卡消费者在暂停和恢复后不消费消息
问题描述
我正在使用这个node-rdkafka
库来实现带有消费者暂停和恢复方法的节点 kafka 来处理背压。我已经创建了我可以的小演示,pause the consumer
但resume the consumer
问题是在resume the consumer
它停止消费消息之后。
这是我的代码。
const Kafka = require('node-rdkafka');
const topic = 'create_user_channel';
const log_divider = '-----------------------------------';
const consumer = new Kafka.KafkaConsumer({
'group.id':'gsuite_consumer',
'metadata.broker.list': '*******',
'sasl.mechanisms': 'PLAIN',
'sasl.username': '********',
'sasl.password': '********',
'security.protocol': 'SASL_SSL',
'enable.auto.commit':false
}, {});
// Connect the consumer.
consumer.connect({timeout: "1000ms"}, (err) => {
if (err) {
console.log(`Error connecting to Kafka broker: ${err}`);
process.exit(-1);
}
console.log("Connected to Kafka broker");
});
consumer.on('disconnected', (args) => {
console.error(`Consumer got disconnected: ${JSON.stringify(args)}`);
});
let max_queue_size = 3;
let current_queue = [];
let is_pause = false;
// register ready handler.
consumer.on('ready', (arg)=>{
console.log('consumer ready.' + JSON.stringify(arg));
console.log('Consumer is ready');
consumer.subscribe([topic]);
setInterval(function() {
console.log('consumer has consume on :'+timeMs());
consumer.consume();
}, 1000);
});
consumer.on('data',async (data)=>{
console.log('************consumer is consuming data***********:'+timeMs());
if(!is_pause) {
is_pause = true;
if(data && typeof data !== 'undefined') {
try {
console.log('consumer has received the data:'+timeMs());
consumer.pause([topic]);
console.log('consumer has pause the consuming:'+timeMs());
await processMessage(data);
console.log('consumer is resumed:'+timeMs());
consumer.resume([topic]);
console.log(log_divider);
is_pause = false;
} catch(error) {
console.log('data consuming error');
console.log(error);
}
} else {
is_pause = false;
}
}
});
async function processMessage(data) {
// await print_bulk(data);
await processData(0,data);
}
async function print_bulk(data) {
for(var i=0;i<data.length;i++) {
await processData(i,data[i]);
}
}
/**
* Wait specified number of milliseconds.
* @param ms
*/
async function wait(ms) {
console.log('wait for the 3 sec');
return new Promise((resolve) => setTimeout(resolve, ms));
}
var timeMs = ()=> {
var d = new Date();
var h = addZero(d.getHours(), 2);
var m = addZero(d.getMinutes(), 2);
var s = addZero(d.getSeconds(), 2);
var ms = addZero(d.getMilliseconds(), 3);
return h + ":" + m + ":" + s + ":" + ms;
}
var addZero = (x, n)=> {
while (x.toString().length < n) {
x = "0" + x;
}
return x;
}
async function processData(i,m) {
if (m) {
console.log('processing a data start:'+timeMs());
console.log('Received a message:');
console.log(' message: ' + m.value.toString());
console.log(' key: ' + m.key);
console.log(' size: ' + m.size);
console.log(' topic: ' + m.topic);
console.log(' offset: ' + m.offset);
console.log(' partition: ' + m.partition);
consumer.commitMessage(m);
}
await wait(3000);
console.log('process a data completed:'+timeMs());
// delete current_queue[i];
// console.log('after delting lenght of current queue:'+current_queue.length);
// console.log(log_divider);
return true;
}
任何人都可以帮助我,恢复消费者时我做错了什么?当我启动消费者时,它只收到一条消息,并且在恢复之后仍然没有消费任何进一步的消息。
解决方案
我已经弄清楚了这个问题。除了consumer.pause()
&consumer.resume()
方法,我还需要使用该consumer.assignments()
方法。
所以会是这样
consumer.pause(consumer.assignments());
consumer.resume(consumer.assignments());
推荐阅读
- hyperledger-fabric - 错误:出现意外状态:BAD_REQUEST - 将配置更新应用到现有频道时出错
- reactjs - 从一个组件单击到另一个组件时如何在 url 中保留参数
- php - 权限被拒绝在本地运行我的项目 Zend Framework
- node.js - 响应获取替换为问号 Node js
- angular - 尽管发现错误,rxjs 仍发生取消订阅
- quickbooks - QuickBooks SDK 与 QuickBooks 桌面集成
- java - 如何将@SpringBootTest(webEnvironment) 与@DataJpaTest 一起使用?
- symfony - 使安全性可用于功能测试中的 onFlush 原则
- c# - 添加多对多对象 EF Core
- haskell - 堆栈退出失败 1