apache-kafka - 如何将 *windowed* KTable 实体化为 Kafka 主题
问题描述
我正在编写一个 KafkaStreams 应用程序,它从一个主题中获取字符串值,我想输出过去 5 分钟的某个键的值的串联,每分钟更新到另一个(压缩的)Kafka 主题。我有一种感觉,我快到了,但我还没有成功。我用一个简单的方法进行了测试:
grouped_transactions.toStream().foreach((key, value) -> {
System.out.println(key.window().toString()+ key.key() + " "+ value);
});
这给了我类似于您在下面看到的内容(我已按源主题键 00909 过滤以简化调试)我不想要的是具有相同连接值的所有不同 Windows,我只想要我的扩展字符串连接。
Window{start=1525437120000, end=1525437420000}00909 "ABC",-554.53
Window{start=1525437360000, end=1525437660000}00909 "ABC",-554.53
Window{start=1525437240000, end=1525437540000}00909 "ABC",-554.53
Window{start=1525437300000, end=1525437600000}00909 "ABC",-554.53
Window{start=1525437180000, end=1525437480000}00909 "ABC",-554.53
Window{start=1525437120000, end=1525437420000}00909 "ABC",-554.53;"ABC",646.03
Window{start=1525437180000, end=1525437480000}00909 "ABC",-554.53;"ABC",646.03
Window{start=1525437240000, end=1525437540000}00909 "ABC",-554.53;"ABC",646.03
Window{start=1525437300000, end=1525437600000}00909 "ABC",-554.53;"ABC",646.03
Window{start=1525437360000, end=1525437660000}00909 "ABC",-554.53;"ABC",646.03
以下是所有代码。任何人都知道如何做到这一点?提前致谢!
Properties props = new Properties();
props.put(StreamsConfig.BOOTSTRAP_SERVERS_CONFIG, "localhost:9092");
props.put(StreamsConfig.CACHE_MAX_BYTES_BUFFERING_CONFIG, 0);
props.put(StreamsConfig.DEFAULT_KEY_SERDE_CLASS_CONFIG, Serdes.String().getClass().getName());
props.put(StreamsConfig.DEFAULT_VALUE_SERDE_CLASS_CONFIG, Serdes.String().getClass().getName());
StreamsBuilder builder = new StreamsBuilder();
long windowSizeMs = TimeUnit.MINUTES.toMillis(5); // 5 * 60 * 1000L
long advanceMs = TimeUnit.MINUTES.toMillis(1); // 1 * 60 * 1000L
TimeWindows window = TimeWindows.of(windowSizeMs).advanceBy(advanceMs);
KTable<Windowed<String>, String> grouped_transactions = source
.filter((k,v)->k.equals("00909"))
.groupByKey()
.windowedBy(window)
.reduce((v1, v2) -> v1 + ";" + v2, Materialized.as("grouped_transactions_5_1"));
// THIS FAILS on runtime with
// java.lang.ClassCastException: org.apache.kafka.streams.kstream.Windowed
// cannot be cast to java.lang.String
grouped_transactions.toStream().to(GROUPEDTRANSACTIONS);
final KafkaStreams streams = new KafkaStreams(builder.build(), props);
解决方案
我不想要的是所有不同的 Windows 具有相同的连接值,我只想要我的扩展字符串连接。
因为您指定重叠窗口,所以单个记录可以包含在多个窗口实例中。也许,您想指定不重叠的窗口,即带有size == advance
.
推荐阅读
- javascript - 如何在 react native + expo 的导航选项卡中使用自定义字体?
- python - 编程错误:使用 Python 连接到 MySQL 时,“字段列表”中的未知列“nan”
- git - 使用 capistrano 在服务器上部署时如何解决“无法锁定 ref”错误?
- rest - 使用 OAuth2 保护单体私有 REST api?
- java - Spring 属性热重载
- python - 在控制器/视图架构中获取/设置 Tkinter 变量——属性错误
- json - 如何为 Scala/Spark 中的数据帧中的每一行编写一个 Json 文件并重命名文件
- c# - .net 核心模拟 dbcontext 不起作用(静态问题?)
- php - Yii2 创建漂亮的 URL,例如 /index.php/viewName/actionName/var1/value1/var2/value2
- java - 在 Java 中,您如何称呼这种模式/习语?