apache-kafka - 在 Kafka Streams DSL 中使用内部连接获取记录密钥
问题描述
join
有没有办法从Kafka Stream DSL 连接中的部分传入或访问消息密钥?
我现在有这样的事情:
KStream<String, GenericRecord> completedEventsStream = inputStartKStream.
join(
inputEndKStream,
(leftValue, rightValue) -> customLambda((Record) leftValue, (Record) rightValue),
JoinWindows.of(windowDuration),
Joined.with(stringSerde, genericAvroSerde, genericAvroSerde)
);
但是,传入的leftValue
and记录无权访问 kafka 消息密钥,因为这是一个单独的字符串。他们拥有的唯一内容是消息本身,而不是密钥。rightValue
customLambda
有没有办法从连接 lambda 内部访问密钥?我可以做的一件事是简单地将消息键添加为消息本身的一部分,并将其作为常规字段访问,但我想知道框架是否提供了直接访问它的方法?
解决方案
大多数情况下,key 也可以在记录的 value 中使用,您的应用程序不是这样吗?
看起来该ValueJoiner
接口已作为 KIP-149 的一部分进行了改进,但尚未像该 KIP 中的其他方法一样完成:ValueTransformer
和ValueMapper
.
您可以在加入之前添加一个步骤以提取密钥并将其包含在消息的值中,然后再使用ValueMapperWithKey
.
推荐阅读
- javascript - 如何对具有相同键的对象值求和并返回一个对象?
- r - 按组嵌套数据框,但在每个组中包含额外的行
- docker - 从两个不同的容器映射端口
- system-verilog - 有没有办法在测试结束时获得断言触发(失败)的次数
- sql - 如何在 BigQuery 中获取两个数组的交集
- java - java.sql.SQLSyntaxErrorException:用户缺少权限或找不到对象:YEAR
- cudd - “相同”的值出现在 ADD 的叶子中
- json - Akka HTTP:从抽象类型创建 HTTPResponse
- c++ - 如何在运行时正确地将指向派生类的 void* 转换为指向基类的 void*
- java - 使用 EasyMock 模拟新的 PrintWriter(String fileName)