Error when using tables (DataStream) in Flink

Problem description

I am using Flink 1.9.0 and cannot import or resolve the Table classes.

I have tried importing the various SBT dependencies related to it.

import org.apache.flink.streaming.api.scala._
import org.apache.flink.table.api.Types
import org.apache.flink.table.api.scala._
import org.apache.flink.table.sources.CsvTableSource

// Enclosing object name is illustrative; the original snippet omitted it
object CsvTableJob {

  def main(args: Array[String]): Unit = {
    val env = StreamExecutionEnvironment.getExecutionEnvironment
    val tEnv = StreamTableEnvironment.create(env)

    // CSV source matching the two columns of the sample file
    val tempSource = CsvTableSource.builder()
      .path("/home/amulya/Desktop/csvForBroadcast/CSV.csv")
      .fieldDelimiter(",")
      .field("locationID", Types.STRING())
      .field("temp", Types.DOUBLE())
      .build()

    tEnv.registerTableSource("Temperatures", tempSource)

    // Keep only readings of at least 50 degrees. The field is "temp",
    // not "Temperature", and string expressions take plain field names,
    // not the Scala symbol quote.
    val askTable = tEnv
      .scan("Temperatures")
      .where("temp >= 50")
      .select("locationID, temp")

    // Convert the Table back into a DataStream of case-class events
    tEnv.toAppendStream[Events](askTable).print()

    env.execute()
  }

  // temp is declared DOUBLE in the source, so the case class uses Double
  case class Events(locationID: String, temp: Double)
}


I have some simple data in CSV format:

locationID,temp
"1",25
"2",25
"3",35
"4",45
"5",55

Here is the error:

Error:scalac: missing or invalid dependency detected while loading class file 'ScalaCaseClassSerializer.class'.
Could not access type SelfResolvingTypeSerializer in object org.apache.flink.api.common.typeutils.TypeSerializerConfigSnapshot,
because it (or its dependencies) are missing. Check your build definition for
missing or conflicting dependencies. (Re-run with `-Ylog-classpath` to see the problematic classpath.)
A full rebuild may help if 'ScalaCaseClassSerializer.class' was compiled against an incompatible version of org.apache.flink.api.common.typeutils.TypeSerializerConfigSnapshot.

I am trying to perform CEP on this basic data in order to get started with Apache Flink. Any kind of help would be greatly appreciated.

Tags: scala, csv, sbt, apache-flink, flink-cep

Solution


Try flink-table-api-java-bridge.

At the moment, the Scala bridge module only provides the basic conversions between tables and DataSets/DataStreams.
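If you are building with sbt, the snippet below is a minimal sketch of what such a dependency set might look like for Flink 1.9.0. The exact module list beyond the bridge itself, in particular flink-table-api-scala-bridge and flink-table-planner, is an assumption based on a standard Flink 1.9 setup, not part of the original answer:

// build.sbt -- minimal sketch, assuming Flink 1.9.0 and sbt's %% Scala-suffix convention
val flinkVersion = "1.9.0"

libraryDependencies ++= Seq(
  // core Scala DataStream API
  "org.apache.flink" %% "flink-streaming-scala" % flinkVersion,
  // the bridge module suggested above
  "org.apache.flink" %% "flink-table-api-java-bridge" % flinkVersion,
  // Scala bridge for toAppendStream / fromDataStream with case classes (assumption)
  "org.apache.flink" %% "flink-table-api-scala-bridge" % flinkVersion,
  // planner that executes Table programs in Flink 1.9 (assumption)
  "org.apache.flink" %% "flink-table-planner" % flinkVersion
)

After changing the dependencies, do a full clean rebuild, as the compiler message itself suggests: mixing Table modules from different Flink versions is a typical cause of the ScalaCaseClassSerializer error above.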

