首页 > 解决方案 > 使用 kafka 流根据消息密钥向主题发送消息

问题描述

我希望能够根据消息键的键将 Kafkastream 中的所有记录发送到不同的主题。前任。Kafka 中的流包含名称作为键和记录作为值。我想根据记录的键将这些记录扇出到不同的主题

数据:(jhon -> {jhonsRecord}),(sean -> {seansRecord}),(mary -> {marysRecord}),(jhon -> {jhonsRecord2}),预期

下面是我现在这样做的方式,但由于名称列表很长,所以速度很慢。另外,即使有几个名字的记录,我也需要遍历整个列表请提出修复

    for( String name : names )
    {
        recordsByName.filterNot(( k, v ) -> k.equalsIgnoreCase(name)).to(name);
    } 

标签: javaapache-kafkaapache-kafka-streams

解决方案


我认为你应该使用KStream::to(final TopicNameExtractor<K, V> topicExtractor)函数。它使您能够计算每条消息的主题名称。

示例代码:

final KStream<String, String> stream = ???;
stream.to((key, value, recordContext) -> key);

推荐阅读