node.js - 消费者是否应该处理消息,然后将消息发送回 Kafka
问题描述
我想用 topic1 处理来自消费者的数据,然后将消息发送回 Kafka 到 topic2
Kafka --> Consumer (processing messages) from topic1, then call a Producer to send processed message to topic2 --> Kafka
我的尝试:
consumer.on('message', (message) => {
let processedMsg = processMessage(message);
payloads = [
{ topic: 'topic2', messages: processedMsg }
];
producer.on('ready', function () {
producer.send(payloads, function (err, data) {
console.log(data);
});
});
producer.on('error', function (err) {})
});
但是,Producer 无法将处理后的消息发送到 Kafka。我得到的错误
MaxListenersExceededWarning: Possible EventEmitter memory leak detected. 11 ready listeners added. Use emitter.setMaxListeners() to increase limit
我使用节点模块Kafka-node
解决方案
您需要切换生产者就绪监听器和消费者消息监听器的顺序。
否则,您将为每条使用的消息设置就绪侦听器
例如
producer.on('ready', function () {
consumer.on('message', (message) => {
let processedMsg = processMessage(message);
payloads = [
{ topic: 'topic2', messages: processedMsg }
];
producer.send(payloads, function (err, data) {
console.log(data);
});
});
不过,如果主要处理和转发到新主题https://github.com/nodefluent/kafka-streams/,我建议查看这个库
推荐阅读
- xamarin.forms - Xamarin iOS 搜索栏自定义
- python - imread 在 Spyder 中有效,但在 VS Code 中无效
- android - app-release 版本与 app-debug 版本有何不同?
- python - Super:超出最大递归深度
- python - 执行代码时如何打开或关闭 Visual Studio python (Jupyter) 交互式窗口 (2019)?
- jenkins - 如何通过外部 groovy 脚本 CURL 语句访问 jenkins 凭证
- python - 如何使用 pandas 将 1M OHLC 转换为 5M OHLC
- javascript - 等待但从未解决/拒绝的承诺内存使用情况
- javascript - 从数组中以名称和值的形式创建一个带有键的对象并返回它
- python - 如何将数据框的所有列与另一个数据框列进行比较并获得增量