首页 > 解决方案 > Kafka Stream 根据特定条件生成自定义消息列表

问题描述

我们有以下流处理要求。

Source Stream -> 
 transform(condition check - If (true) then generate MULTIPLE ADDITIONAL messages else just transform the incoming message) ->
 output kafka topic

Example:
If condition is true for message B(D,E,F are the additional messages produced)
A,B,C -> A,D,E,F,C -> Sink Kafka Topic
If condition is false     
A,B,C -> A,B,C -> Sink Kafka Topic

有没有办法在 Kafka 流中实现这一点?

标签: apache-kafkakafka-consumer-apiapache-kafka-streamsapache-kafka-connectconfluent-platform

解决方案


你可以使用flatMap()orflatMapValues()方法。这些方法采用一条记录并产生零、一条或多条记录。

flatMap()flatMapValues()可以在保留原始键的同时修改键、值及其数据类型,并改变值和值的数据类型。

这是一个示例伪代码,考虑到新消息“C”、“D”、“E”将有一个新密钥。

KStream<byte[], String> inputStream = builder.stream("inputTopic");
KStream<byte[], String> outStream = inputStream.flatMap( 
           (key,value)->{
            List<KeyValue<byte[], String>> result = new LinkedList<>();  
                // If message value is "B". Otherwise place your condition based on data     
                if(value.equalsTo("B")){ 
                      result.add(KeyValue.pair("<new key for message C>","C"));
                      result.add(KeyValue.pair("<new key for message D>","D"));
                      result.add(KeyValue.pair("<new key for message E>","E"));

                 }else{
                         result.add(KeyValue.pair(key,value));
                 }
            return result;
});
outStream.to("sinkTopic");

您可以阅读更多相关信息: https ://docs.confluent.io/current/streams/developer-guide/dsl-api.html#streams-developer-guide-dsl-transformations-stateless


推荐阅读