scala - 在 flink 中使用表(数据流)时出错
问题描述
我正在使用 flink 1.9.0,无法导入或获取表文件
我尝试导入与之相关的不同 SBT
def main(args: Array[String]): Unit = {
val env = StreamExecutionEnvironment.getExecutionEnvironment
val tEnv = StreamTableEnvironment.create(env)
val tempSource = CsvTableSource.builder()
.path("/home/amulya/Desktop/csvForBroadcast/CSV.csv")
.fieldDelimiter(",")
.field("locationID", Types.STRING())
.field("temp", Types.DOUBLE())
.build()
tEnv.registerTableSource("Temperatures", tempSource)
val askTable = tEnv
.scan("Temperatures")
.where(" 'Temperature >= 50")
.select("'locationID, 'temp")
val stream = tEnv.toAppendStream[Events](askTable)
.print()
env.execute()
}
case class Events(locationID: String, temp: Long)
}
我有一个简单的 CSV 格式数据:-
locationID,temp
"1",25
"2",25
"3",35
"4",45
"5",55
这是错误:-
Error:scalac: missing or invalid dependency detected while loading class file 'ScalaCaseClassSerializer.class'.
Could not access type SelfResolvingTypeSerializer in object org.apache.flink.api.common.typeutils.TypeSerializerConfigSnapshot,
because it (or its dependencies) are missing. Check your build definition for
missing or conflicting dependencies. (Re-run with `-Ylog-classpath` to see the problematic classpath.)
A full rebuild may help if 'ScalaCaseClassSerializer.class' was compiled against an incompatible version of org.apache.flink.api.common.typeutils.TypeSerializerConfigSnapshot.
我正在尝试对这些基本数据执行 CEP,以便开始使用 apache flink,任何形式的帮助都将不胜感激
解决方案
试试 flink-table-api-java-bridge。
目前 scala 桥接模块仅提供表和数据集/数据流之间的基本转换。
推荐阅读
- javascript - 加载 DOM 页面后将所有复选框设置为 true
- scala - 从 UDF 中的 Dataframe 创建一列 ArrayType[StructType]
- hololens - HoloLens2:我想在用户正在看的表面网格上绘制一个目标
- python - 类如何从基类继承
- mysql - 如何简化我的代码以阻止它使服务器崩溃?
- matlab - 如何在图像上旋转多行
- javascript - 我可以使用父函数的参数来命名它的子函数吗?
- r - 更改 ggplot 中变量名称的字体大小
- algorithm - 如何从给定的递归函数中找到递归关系
- python - 无法使用 PyMySQL 与远程 MySQL 服务器建立 TLS TCP 连接,其他工具可以工作