apache-kafka - NestJs - Kafka 对有效负载进行过滤
问题描述
我将 NestJs 与 KafkaJs 一起使用,我有两个微服务订阅相同的 EventPattern,但是其中一个微服务只需要 EventPattern 的单一类型的有效负载,例如:
事件模式::DoThing
value: {
jobType: "job1",
otherStuff: {}
}
value: {
jobType: "job2",
otherStuff: {}
}
value: {
jobType: "job3",
otherStuff: {}
}
在微服务 1 中,我只关心 DoThing -> job1,但是
需要 DoThing -> job2、job3 等。
有没有一种方法可以从嵌套中进行过滤,而不是在消费完成后进行简单的过滤。
欢迎任何建议。
解决方案
看起来所有 EventPattern 对象都属于一个主题。
为不同的微服务维护不同的消费者组,让微服务决定它必须处理哪些消息(使用KafkaJS,例子就像..)
const consumer = kafka.consumer({ groupId: 'microservice-1' })
await consumer.connect()
await consumer.subscribe({ topic: 'topic-A' })
await consumer.run({
eachMessage: async ({ topic, partition, message }) => {
if( shouldProcess(message.value) ) {
// process them
}
})
},
})
您可能需要将消息转换为 JSON 并event.jobType
相应地检查和处理。
如果您想避免if
每种作业类型的条件,请拥有一个微服务将感兴趣的作业类型白名单(或)使用正则表达式来匹配作业类型。
对另一个微服务重复同样的事情,除了在group.id
推荐阅读
- python - 使用“in”运算符求解方程
- c# - C# 将值附加到空数组 int[] 不起作用
- xcode13 - Xcode 13 导航栏导致 TabView 上的额外填充
- java - Android 应用程序不会从登录或注册活动中进入下一个活动
- python - 熊猫小数点后的位数
- java - 如何从多行读取值并将它们保存到不同的数组中,因为用户在 Java 中输入它们
- python - 一个Python函数修改数据库中信息的问题
- javascript - DOM 事件委托与否,哪个资源最好?
- laravel - 如何将对象模型存储到会话中的数组中,并在 laravel 中保持与数据库的关系更新?
- ios - 错误 React Native CLI 对本机依赖项使用自动链接 - RN iOS