首页 > 解决方案 > 如何在 apache nifi 中为 kafka 生产者指定密钥?

问题描述

我有使用 apache nifi 的简单管道,我想使用现有的 kafka puplisher 处理器在 kafka 主题中发布一些消息。 在此处输入图像描述

问题是如何使用 apache nifi 表达式语言指定 kafka 密钥?我厌倦了类似的东西,${message:jsonPath('$.key')}但是,当然,我得到了一个错误,因为对象message不存在。 在此处输入图像描述

我还尝试使用filename类似于输入消息的默认对象名称的对象,但它没有帮助


使用另一个 kafka 发布者处理器可以通过设置message key field属性来实现,但是PublishKafka处理器呢?

标签: jsonapache-kafkaapache-nifi

解决方案


NiFi表达语言只能引用流文件属性,不能直接引用内容(这是故意的)。

因此,如果您想使用 json 文档中某个字段的值作为键,那么您需要首先使用另一个处理器(例如 EvaluateJsonPath)将该字段的值提取到流文件属性中。

假设您的 json 文档中有一个字段“foo”,您可以使用 EvaluateJsonPath 和目标设置为“流文件属性”,然后添加一个动态属性,例如:

foo = $.foo

然后在 PublishKafka 中将 key 属性设置为 ${foo}。

请记住,这仅在每个流文件有一个 json 文档时才有意义,否则如果您有多个,则不清楚关键是什么,因为流文件只能有一个“foo”属性,但许多“foo " 流文件内容中的字段。


推荐阅读