首页 > 解决方案 > NestJs - Kafka 对有效负载进行过滤

问题描述

我将 NestJs 与 KafkaJs 一起使用,我有两个微服务订阅相同的 EventPattern,但是其中一个微服务只需要 EventPattern 的单一类型的有效负载,例如:

事件模式::DoThing

value: {
    jobType: "job1",
    otherStuff: {}
}

value: {
    jobType: "job2",
    otherStuff: {}
}

value: {
    jobType: "job3",
    otherStuff: {}
}

在微服务 1 中,我只关心 DoThing -> job1,但是
需要 DoThing -> job2、job3 等。

有没有一种方法可以从嵌套中进行过滤,而不是在消费完成后进行简单的过滤。

欢迎任何建议。

标签: apache-kafkamicroservicesnestjskafkajs

解决方案


看起来所有 EventPattern 对象都属于一个主题。

为不同的微服务维护不同的消费者组,让微服务决定它必须处理哪些消息(使用KafkaJS,例子就像..)

const consumer = kafka.consumer({ groupId: 'microservice-1' })
await consumer.connect()
await consumer.subscribe({ topic: 'topic-A' })
await consumer.run({
    eachMessage: async ({ topic, partition, message }) => {
            if( shouldProcess(message.value) ) {
                // process them
            }
        })
    },
})

您可能需要将消息转换为 JSON 并event.jobType相应地检查和处理。

如果您想避免if每种作业类型的条件,请拥有一个微服务将感兴趣的作业类型白名单(或)使用正则表达式来匹配作业类型。

对另一个微服务重复同样的事情,除了在group.id


推荐阅读