apache-kafka - Kafka Streams 中的状态过滤/flatMapValues?
问题描述
我正在尝试编写一个简单的 Kafka Streams 应用程序(针对 Kafka 2.2/Confluent 5.2),以将具有至少一次语义的输入主题转换为精确一次的输出流。我想编码以下逻辑:
- 对于具有给定密钥的每条消息:
- 从消息值中的字符串字段读取消息时间戳
- 从本地状态存储中检索我们之前看到的此密钥的最大时间戳
- 如果消息时间戳小于或等于状态存储中的时间戳,则不发出任何内容
- 如果时间戳大于状态存储中的时间戳,或者状态存储中不存在密钥,则发出消息并使用消息的密钥/时间戳更新状态存储
(这可以保证根据我们从上游系统获得的排序保证提供正确的结果;我不想在这里做任何神奇的事情。)
起初我以为我可以使用Kafka StreamsflatMapValues
运算符来做到这一点,它可以让您将每个输入消息映射到具有相同键的零个或多个输出消息。但是,该文档明确警告:
这是一个无状态的逐条记录操作(参见 transformValues(ValueTransformerSupplier, String...) 用于有状态的值转换)。
这听起来很有希望,但transformValues
文档并没有明确说明如何为每个输入消息发出零个或一个输出消息。除非这就是// or null
示例中的旁白想要表达的意思吗?
flatTransform
看起来也很有希望,但我不需要操纵密钥,如果可能的话,我想避免重新分区。
任何人都知道如何正确执行这种过滤?
解决方案
您可以Transformer
用于实现如上所述的有状态操作。为了不向下游传播消息,您需要null
从方法返回,这在java doctransform
中提到。Transformer
您可以通过processorContext.forward(key, value)
. 下面提供的简化示例
kStream.transform(() -> new DemoTransformer(stateStoreName), stateStoreName)
public class DemoTransformer implements Transformer<String, String, KeyValue<String, String>> {
private ProcessorContext processorContext;
private String stateStoreName;
private KeyValueStore<String, String> keyValueStore;
public DemoTransformer(String stateStoreName) {
this.stateStoreName = stateStoreName;
}
@Override
public void init(ProcessorContext processorContext) {
this.processorContext = processorContext;
this.keyValueStore = (KeyValueStore) processorContext.getStateStore(stateStoreName);
}
@Override
public KeyValue<String, String> transform(String key, String value) {
String existingValue = keyValueStore.get(key);
if (/* your condition */) {
processorContext.forward(key, value);
keyValueStore.put(key, value);
}
return null;
}
@Override
public void close() {
}
}
推荐阅读
- javascript - 显示需要用户名和需要密码的消息是什么?
- c++ - 无法输出可用和已占用的座位
- java - 休息服务器应如何管理用户启用?
- javascript - 可编辑数据的编辑器
- java - 从 Java 中用逗号和标题分隔的文件中读取时间
- javascript - 减少对象数组
- javascript - 如何将徽标添加到路由器视图转换?
- vue.js - 如何在组件中设置道具值时更新道具值
- regex - Regex and Bash file - Extract data from another file and store in variable
- scala - 使用 spark scala 向空数据框添加一行