scala - 如何在 flink kafka 流中使用 sql?
问题描述
我已经从 postgresql DB 加载了一个规则表作为 Flink 表。然后读取 kafka msg 并通过这些规则对 msg 进行分类。代码是这样的
val senv = StreamExecutionEnvironment.getExecutionEnvironment
senv.enableCheckpointing(5000)
val stenv=StreamTableEnvironment.create(senv)
val streamsource=senv.createInput(inputFormat)
stenv.registerDataStream("rules",streamsource)
val properties = new Properties()
properties.setProperty("bootstrap.servers", KAFKA_BROKER)
properties.setProperty("zookeeper.connect", ZOOKEEPER_HOST)
properties.setProperty("group.id", TRANSACTION_GROUP)
val fkp = new FlinkKafkaProducer010[String](TOPIC1, new SimpleStringSchema(), properties)
val fkc = new FlinkKafkaConsumer010[String](TOPIC, new SimpleStringSchema(), properties)
val stream = senv.addSource(fkc).setParallelism(3)
val jsons = stream.map {
{
r => {
val sub = JSON.parseObject(r.toString)
val value = sub.getDouble("value")
val time = sub.getLong("time")
val tag = sub.getString("name")
val error = sub.getString("error")
val t = stenv.sqlQuery("select * from rules").where("nodeid=" + tag) //error is here
//todo
}
}
错误是这样的
Exception in thread "main" org.apache.flink.api.common.InvalidProgramException: Task not serializable
at org.apache.flink.api.scala.ClosureCleaner$.ensureSerializable(ClosureCleaner.scala:408)
at org.apache.flink.api.scala.ClosureCleaner$.org$apache$flink$api$scala$ClosureCleaner$$clean(ClosureCleaner.scala:400)
at org.apache.flink.api.scala.ClosureCleaner$.clean(ClosureCleaner.scala:168)
at org.apache.flink.streaming.api.scala.StreamExecutionEnvironment.scalaClean(StreamExecutionEnvironment.scala:686)
at org.apache.flink.streaming.api.scala.DataStream.clean(DataStream.scala:1143)
at org.apache.flink.streaming.api.scala.DataStream.map(DataStream.scala:617)
at cettest$.main(cettest.scala:63)
at cettest.main(cettest.scala)
Caused by: java.io.NotSerializableException: org.apache.flink.table.api.scala.internal.StreamTableEnvironmentImpl
at java.io.ObjectOutputStream.writeObject0(ObjectOutputStream.java:1184)
at java.io.ObjectOutputStream.defaultWriteFields(ObjectOutputStream.java:1548)
at java.io.ObjectOutputStream.writeSerialData(ObjectOutputStream.java:1509)
at java.io.ObjectOutputStream.writeOrdinaryObject(ObjectOutputStream.java:1432)
at java.io.ObjectOutputStream.writeObject0(ObjectOutputStream.java:1178)
at java.io.ObjectOutputStream.writeObject(ObjectOutputStream.java:348)
at org.apache.flink.util.InstantiationUtil.serializeObject(InstantiationUtil.java:586)
at org.apache.flink.api.scala.ClosureCleaner$.ensureSerializable(ClosureCleaner.scala:406)
... 7 more
我尝试了很多方法来解决问题。但失败了!
解决方案
欢迎来到堆栈溢出!如果您可以列出到目前为止的尝试,那将会很有帮助,但是您的问题的解决方案似乎相当简单——看起来StreamTableEnvironmentImpl没有扩展 Serializable 特征:https ://www.oreilly.com/library/查看/scala-cookbook/9781449340292/ch12s08.html
但是,使用 Flink 的 @Internal 类似乎并不正确。我宁愿创建自己的可序列化类,或者很可能是一个案例类,默认情况下它是可序列化的。
希望能帮助到你!
推荐阅读
- reactjs - 通过博览会运行的本机应用程序中的守望者警告
- angular - 当我将自定义 PropertyValidator 添加到我的儿子元数据验证器数组时,RadDataForm 失败并出现数组索引插入错误
- python - 为什么这个带有记忆功能的 LCS(最长公共子序列)Python 实现表现不佳?
- c# - 访问 Microsoft.Extensions.HostingServices 时出错。无法解析 JSON 文件
- node.js - nodejs中的JSON并没有给我所有信息
- c++ - Eclipse 中的 C++ - 正在运行的程序部分现在冻结
- java - 仅从 DataFrame 中的列中的时间戳中提取日期 - Java 中的 Spark
- html - 在不修改父布局的情况下将 div 内的内容作为列对齐
- c# - C# 从 Excel 中读取实际单元格值,其中日期格式为长日期,日历类型为西方
- c# - 如何从高度计算弹丸的射程?