apache-kafka - toStream() 不适用于窗口化 KTable
问题描述
我想编写一个小型 Kafka Streams 应用程序,它减少输入流的时间窗口,对值进行一些映射,然后将生成的 toStream() 更改日志发送到另一个主题。使用我的代码,我在 toStream() 操作中收到以下错误:
Compilation failure
[ERROR] StreamFilter.java:[39,86] incompatible types: org.apache.kafka.streams.kstream.KStream<org.apache.kafka.streams.kstream.Windowed<java.lang.Integer>,filterExample.SensorMessage> cannot be converted to org.apache.kafka.streams.kstream.KStream<java.lang.Integer,filterExample.SensorMessage>
我在某处读到默认 Serdes 可能是问题所在,但到目前为止,明确包括它们Consumed.with
并没有解决问题。
public static void runStreamFilter(String broker) throws Exception {
final SensorMessageSerializer serializer = new SensorMessageSerializer();
final SensorMessageDeserializer deserializer = new SensorMessageDeserializer();
Properties props = new Properties();
props.put(StreamsConfig.APPLICATION_ID_CONFIG, "stream-filter");
props.put(StreamsConfig.BOOTSTRAP_SERVERS_CONFIG, broker);
props.put(StreamsConfig.DEFAULT_KEY_SERDE_CLASS_CONFIG, Serdes.Integer().getClass());
props.put(StreamsConfig.DEFAULT_VALUE_SERDE_CLASS_CONFIG, Serdes.serdeFrom(serializer, deserializer));
final StreamsBuilder builder = new StreamsBuilder();
final KStream<Integer, SensorMessage> input = builder.stream(KafkaConstants.TOPIC_IN, Consumed.with(Serdes.Integer(), Serdes.serdeFrom(serializer, deserializer)));
final KStream<Integer, SensorMessage> output = input
.filter((k,v) -> v.getValue() > 19)
.groupByKey(Grouped.with(Serdes.Integer(), Serdes.serdeFrom(serializer, deserializer)))
.windowedBy(TimeWindows.of(Duration.ofMillis(500)))
.reduce((aggValue, newValue) -> avgReducer(aggValue, newValue))
.mapValues(value -> latencyMapper(value))
.toStream();
output.to(KafkaConstants.TOPIC_OUT);
解决方案
代码
.windowedBy(TimeWindows.of(Duration.ofMillis(500))).reduce(..)
返回KTable<Windowed<K>, V>
。
为了将结果转换为KStream<Integer, SensorMessage>
,您需要从Windowed
对象中提取值,因此您需要在之后添加以下代码toStream()
:
.map((key, value) -> KeyValue.pair(key.key(), value));
推荐阅读
- javascript - 如何使用数据表以相同的颜色显示重复的行
- oracle - 为什么 truncate table 不会更改 Last DDL Date?
- java - 显示软键盘时如何仅调整布局的一部分(android)?
- go - 客户端应自动发送消息而无需用户交互
- python - 为什么字符串列表的python排序功能不起作用
- python - 如何在python中合并同一数据框中的行?
- java - 我将如何用“你想再试一次吗?![是/否]”的问题循环这个 java 程序?
- c++ - 如何在此“if 块”中定义其他无效输入的值,以便在 C++ 中将字符串转换为对象?
- azure - Azure 计算机视觉 API 可以访问 AWS S3 中的图像文件吗?
- memory-leaks - 太多 TimerQueueTimer 对象被分配并消耗大量内存