首页 > 解决方案 > 消费者是否应该处理消息,然后将消息发送回 Kafka

问题描述

我想用 topic1 处理来自消费者的数据,然后将消息发送回 Kafka 到 topic2

Kafka --> Consumer (processing messages) from topic1, then call a Producer to send processed message to topic2 --> Kafka

我的尝试:

 consumer.on('message', (message) => {
     let processedMsg = processMessage(message);

     payloads = [
        { topic: 'topic2', messages: processedMsg }
     ];

     producer.on('ready', function () {
         producer.send(payloads, function (err, data) {
             console.log(data);
         });
     });

     producer.on('error', function (err) {})
});

但是,Producer 无法将处理后的消息发送到 Kafka。我得到的错误

MaxListenersExceededWarning: Possible EventEmitter memory leak detected. 11 ready listeners added. Use emitter.setMaxListeners() to increase limit

我使用节点模块Kafka-node

标签: node.jsapache-kafkakafka-consumer-apikafka-producer-api

解决方案


您需要切换生产者就绪监听器和消费者消息监听器的顺序。

否则,您将为每条使用的消息设置就绪侦听器

例如

 producer.on('ready', function () {
    consumer.on('message', (message) => {
      let processedMsg = processMessage(message);

      payloads = [
       { topic: 'topic2', messages: processedMsg }
      ];

       producer.send(payloads, function (err, data) {
         console.log(data);
       });
 });

不过,如果主要处理和转发到新主题https://github.com/nodefluent/kafka-streams/,我建议查看这个库


推荐阅读