首页 > 解决方案 > 参数不匹配;org.joda.time.Instant 无法转换为 org.apache.beam.sdk.transforms.SerializableFunction

问题描述

我试图在创建管道时在无界数据集上应用窗口,而且我是 Apache Kafka 的新手,所以我尝试了各种“即时”,但没有任何效果!这是代码片段

pipeline
        .apply(
                KafkaIO.<Long, String>read()
                        .withBootstrapServers(bootstrapServers)
                        .withTopic(inputTopic)
                        .withKeyDeserializer(LongDeserializer.class)
                        .withValueDeserializer(StringDeserializer.class)
                        .withoutMetadata()
              )
        .apply(Values.<String>create())
        .apply("append event time", WithTimestamps.of(...)
           .apply("extract message string", MapElements
                      .into(TypeDescriptors.strings())
                      .via(Record::getMessage))
           .apply("apply window", Window
                                .<String>into(FixedWindows.of(Duration.standardMinutes(5)))
                                .withAllowedLateness(Duration.standardMinutes(5))
                                .triggering(AfterWatermark.pastEndOfWindow())
                                .accumulatingFiredPanes()
                 ));

我不知道 WithTimestamps.of(??) 里面有什么,如果有人能帮助我,我将不胜感激,谢谢。

标签: apache-kafkaapache-beam

解决方案


推荐阅读