apache-kafka - 参数不匹配;org.joda.time.Instant 无法转换为 org.apache.beam.sdk.transforms.SerializableFunction
问题描述
我试图在创建管道时在无界数据集上应用窗口,而且我是 Apache Kafka 的新手,所以我尝试了各种“即时”,但没有任何效果!这是代码片段
pipeline
.apply(
KafkaIO.<Long, String>read()
.withBootstrapServers(bootstrapServers)
.withTopic(inputTopic)
.withKeyDeserializer(LongDeserializer.class)
.withValueDeserializer(StringDeserializer.class)
.withoutMetadata()
)
.apply(Values.<String>create())
.apply("append event time", WithTimestamps.of(...)
.apply("extract message string", MapElements
.into(TypeDescriptors.strings())
.via(Record::getMessage))
.apply("apply window", Window
.<String>into(FixedWindows.of(Duration.standardMinutes(5)))
.withAllowedLateness(Duration.standardMinutes(5))
.triggering(AfterWatermark.pastEndOfWindow())
.accumulatingFiredPanes()
));
我不知道 WithTimestamps.of(??) 里面有什么,如果有人能帮助我,我将不胜感激,谢谢。
解决方案
推荐阅读
- azure-web-app-service - 选择节点版本时权限被拒绝的部署问题
- android - 应用程序关闭后,工作变量被销毁
- mongodb - 当 MongoDB 的文档说 ObjectID“可能是唯一的”时,它是什么意思?
- javascript - 防止 Webpack 删除 jQuery 函数?
- asp.net-core - 配置 OData 测试服务器
- ios - Scenekit 场景需要太长时间才能显示在视图上
- react-native - React Native 在上下文状态更改后不会重新渲染
- ruby-on-rails - 附加普通 png 文件时出现 ActiveStorage::InvariableError?
- python - 当要求从文件导入时,Jupyter 使用过时版本的函数
- xcode - Cocoapods 无法在新的 m1 mac Big Sur Xcode 上运行