apache-kafka - Kafka Stream 根据特定条件生成自定义消息列表
问题描述
我们有以下流处理要求。
Source Stream ->
transform(condition check - If (true) then generate MULTIPLE ADDITIONAL messages else just transform the incoming message) ->
output kafka topic
Example:
If condition is true for message B(D,E,F are the additional messages produced)
A,B,C -> A,D,E,F,C -> Sink Kafka Topic
If condition is false
A,B,C -> A,B,C -> Sink Kafka Topic
有没有办法在 Kafka 流中实现这一点?
解决方案
你可以使用flatMap()
orflatMapValues()
方法。这些方法采用一条记录并产生零、一条或多条记录。
flatMap()
flatMapValues()
可以在保留原始键的同时修改键、值及其数据类型,并改变值和值的数据类型。
这是一个示例伪代码,考虑到新消息“C”、“D”、“E”将有一个新密钥。
KStream<byte[], String> inputStream = builder.stream("inputTopic");
KStream<byte[], String> outStream = inputStream.flatMap(
(key,value)->{
List<KeyValue<byte[], String>> result = new LinkedList<>();
// If message value is "B". Otherwise place your condition based on data
if(value.equalsTo("B")){
result.add(KeyValue.pair("<new key for message C>","C"));
result.add(KeyValue.pair("<new key for message D>","D"));
result.add(KeyValue.pair("<new key for message E>","E"));
}else{
result.add(KeyValue.pair(key,value));
}
return result;
});
outStream.to("sinkTopic");
推荐阅读
- c# - WCF ServiceHost 上的多个服务器证书
- css - 缩放背景图片与背景尺寸封面
- python - Pygame 应用程序第一次没有关闭
- javascript - 简单的 vue 模板条件渲染问题
- linux - 如何通过 Linux 环境为 gcc 设置自定义预定义宏以避免一直传递它们
- r - 配置 r 三明治包以查找 felm 的聚类标准错误
- python-3.x - PySpark 不是 null 也不是 nan 值函数适用于所有类型的列
- sql - 检索最近 5 个不同日期的所有记录
- c++ - CreateWindow() 返回 nullptr 并且 GetLastError() 返回 1400
- android - TypeConverter 函数从未使用过房间数据库