How to use SQL on a Flink Kafka stream?

Problem description

I have loaded a rules table from a PostgreSQL DB as a Flink table. I then read Kafka messages and classify each message against those rules. The code looks like this:

    val senv = StreamExecutionEnvironment.getExecutionEnvironment
    senv.enableCheckpointing(5000)
    val stenv = StreamTableEnvironment.create(senv)
    val streamsource = senv.createInput(inputFormat)
    stenv.registerDataStream("rules", streamsource)

    val properties = new Properties()
    properties.setProperty("bootstrap.servers", KAFKA_BROKER)
    properties.setProperty("zookeeper.connect", ZOOKEEPER_HOST)
    properties.setProperty("group.id", TRANSACTION_GROUP)
    val fkp = new FlinkKafkaProducer010[String](TOPIC1, new SimpleStringSchema(), properties)
    val fkc = new FlinkKafkaConsumer010[String](TOPIC, new SimpleStringSchema(), properties)
    val stream = senv.addSource(fkc).setParallelism(3)
    val jsons = stream.map { r =>
      val sub = JSON.parseObject(r.toString)
      val value = sub.getDouble("value")
      val time = sub.getLong("time")
      val tag = sub.getString("name")
      val error = sub.getString("error")
      val t = stenv.sqlQuery("select * from rules").where("nodeid=" + tag) // error is here
      // todo
    }

The error looks like this:

Exception in thread "main" org.apache.flink.api.common.InvalidProgramException: Task not serializable
    at org.apache.flink.api.scala.ClosureCleaner$.ensureSerializable(ClosureCleaner.scala:408)
    at org.apache.flink.api.scala.ClosureCleaner$.org$apache$flink$api$scala$ClosureCleaner$$clean(ClosureCleaner.scala:400)
    at org.apache.flink.api.scala.ClosureCleaner$.clean(ClosureCleaner.scala:168)
    at org.apache.flink.streaming.api.scala.StreamExecutionEnvironment.scalaClean(StreamExecutionEnvironment.scala:686)
    at org.apache.flink.streaming.api.scala.DataStream.clean(DataStream.scala:1143)
    at org.apache.flink.streaming.api.scala.DataStream.map(DataStream.scala:617)
    at cettest$.main(cettest.scala:63)
    at cettest.main(cettest.scala)
Caused by: java.io.NotSerializableException: org.apache.flink.table.api.scala.internal.StreamTableEnvironmentImpl
    at java.io.ObjectOutputStream.writeObject0(ObjectOutputStream.java:1184)
    at java.io.ObjectOutputStream.defaultWriteFields(ObjectOutputStream.java:1548)
    at java.io.ObjectOutputStream.writeSerialData(ObjectOutputStream.java:1509)
    at java.io.ObjectOutputStream.writeOrdinaryObject(ObjectOutputStream.java:1432)
    at java.io.ObjectOutputStream.writeObject0(ObjectOutputStream.java:1178)
    at java.io.ObjectOutputStream.writeObject(ObjectOutputStream.java:348)
    at org.apache.flink.util.InstantiationUtil.serializeObject(InstantiationUtil.java:586)
    at org.apache.flink.api.scala.ClosureCleaner$.ensureSerializable(ClosureCleaner.scala:406)
    ... 7 more

I have tried many ways to solve this, but all of them failed!

Tags: scala, streaming, apache-flink

Solution


Welcome to Stack Overflow! It would help if you could list what you have tried so far, but the solution to your problem looks fairly simple: Flink has to serialize everything your map closure captures so it can ship the function to the task managers, and here the closure captures stenv, whose concrete type StreamTableEnvironmentImpl does not extend the Serializable trait: https://www.oreilly.com/library/view/scala-cookbook/9781449340292/ch12s08.html
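
On the Scala side of that link, making your own type serializable is a one-liner; the Rule classes below are made-up examples only, just to illustrate the point:

    // A plain class has to opt in to serialization explicitly...
    class Rule(val nodeId: String, val threshold: Double) extends Serializable

    // ...while a case class is serializable out of the box.
    case class RuleRow(nodeId: String, threshold: Double)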

That said, reaching into one of Flink's @Internal classes from your own function does not seem right anyway. I would rather create my own serializable class, or, more likely, a case class, which is serializable by default.
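
For example, one way to restructure the job is to register the Kafka stream as a second table and express the classification as a SQL join, so that sqlQuery is never called inside an operator closure. This is a rough, untested sketch that reuses your stream and stenv; the Msg case class, its field names, and the join condition are assumptions, so adapt them to your actual rules schema:

    import com.alibaba.fastjson.JSON
    import org.apache.flink.streaming.api.scala._
    import org.apache.flink.table.api.scala._ // brings in the 'symbol field expressions

    // Parsed Kafka payload; a case class is serializable by default.
    case class Msg(name: String, metricValue: Double, ts: Long, error: String)

    val msgs: DataStream[Msg] = stream.map { r =>
      val sub = JSON.parseObject(r)
      Msg(sub.getString("name"), sub.getDouble("value"), sub.getLong("time"), sub.getString("error"))
    }

    // Register the Kafka stream as a table next to the "rules" table...
    stenv.registerDataStream("msgs", msgs, 'name, 'metricValue, 'ts, 'error)

    // ...and let Flink SQL do the classification, completely outside any closure.
    val classified = stenv.sqlQuery(
      "SELECT m.name, m.metricValue, m.ts, r.* FROM msgs m JOIN rules r ON r.nodeid = m.name")

Keep in mind that a regular join like this keeps both inputs in state; with a bounded rules table read from JDBC that is usually fine, but it is worth checking against your data volumes.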

Hope that helps!

