首页 > 解决方案 > KafkaTableSink:如何使用它?

问题描述

这是表格:

val res: Table = tenv.sqlQuery(
 """
   |select event.ID,event.locationID, event.temp
   |from event
   |JOIN patt
   |ON event.ID = patt.ID
   |AND event.temp >= patt.temperature
   |""".stripMargin
)

这是我想要的架构:

 res.toAppendStream[Event].print("Alert for these location")

 case class Event(ID: Int, locationID: String, temp: Double)

我想做Kafka010TableSink:

  val tableSink = new Kafka010TableSink("ask","Output", properties, new FlinkFixedPartitioner[])

架构和序列化架构中发生了什么,我在使用 FlinkFixedPartitioner 时遇到错误。

标签: scalaapache-kafkasbtapache-flinkflink-sql

解决方案


推荐阅读