首页 > 解决方案 > 是什么导致 Node Kafka Streams 出现此间歇性问题?

问题描述

我有一个 Kafka 生产者和消费者。

制作人这样做:

    const returnMessage = {
        prop1: 'some string',
        prop2: 'another string',
        prop3: nestedObject
    };
    console.log(JSON.stringify(returnMessage))
    await stream.writeToStream(JSON.stringify(returnMessage));

消费者这样做:

incomingStream.forEach(
                message => {
                        console.log(message.value)
                        let messageObject = message.value;
                        ...other stuff...
                }
            );

现在,在生产者方面,返回消息始终记录为正确的字符串,一切都很好。但在消费者方面,首先,message.value 是一个正确的字符串,可以从中解析 JSON,但在后续请求中,它会遇到“[object Object]”。如果

我觉得我在这里遗漏了一些重要的东西……如果您有任何见解,请提供帮助。

标签: apache-kafkakafka-consumer-apikafka-producer-apinode-kafka-streams

解决方案


好的,我明白了。发生在路由内部,而incomingStream.forEach流在控制器级别被实例化。我通过将 移动到控制器级别来修复它forEach,并让它在每条消息上发出一个解析的消息,然后订阅路由内的事件。


推荐阅读