首页 > 解决方案 > 在 Kafka Streams DSL 中使用内部连接获取记录密钥

问题描述

join有没有办法从Kafka Stream DSL 连接中的部分传入或访问消息密钥?

我现在有这样的事情:

    KStream<String, GenericRecord> completedEventsStream = inputStartKStream.
        join(
            inputEndKStream,
            (leftValue, rightValue) -> customLambda((Record) leftValue, (Record) rightValue),
            JoinWindows.of(windowDuration),
            Joined.with(stringSerde, genericAvroSerde, genericAvroSerde)
        );

但是,传入的leftValueand记录无权访问 kafka 消息密钥,因为这是一个单独的字符串。他们拥有的唯一内容是消息本身,而不是密钥。rightValuecustomLambda

有没有办法从连接 lambda 内部访问密钥?我可以做的一件事是简单地将消息键添加为消息本身的一部分,并将其作为常规字段访问,但我想知道框架是否提供了直接访问它的方法?

标签: apache-kafkaapache-kafka-streams

解决方案


大多数情况下,key 也可以在记录的 value 中使用,您的应用程序不是这样吗?

看起来该ValueJoiner接口已作为 KIP-149 的一部分进行了改进,但尚未像该 KIP 中的其他方法一样完成:ValueTransformerValueMapper.

您可以在加入之前添加一个步骤以提取密钥并将其包含在消息的值中,然后再使用ValueMapperWithKey.


推荐阅读