FlinkKafkaConsumer - Consuming JSON Messages in Scala

Problem Description

I am new to Scala and Flink. I am using the following code to consume JSON messages from a Kafka stream.

import java.util.Properties

import com.fasterxml.jackson.databind.node.ObjectNode
import org.apache.flink.streaming.api.scala._
import org.apache.flink.streaming.connectors.kafka.FlinkKafkaConsumer
import org.apache.flink.streaming.util.serialization.JSONKeyValueDeserializationSchema

object Job {
  def main(args: Array[String]) {
    val properties = new Properties()
    properties.setProperty("bootstrap.servers", "localhost:9092")
    // only required for Kafka 0.8
    properties.setProperty("zookeeper.connect", "localhost:2181")
    val topic: String = "test"

    val consumer = new FlinkKafkaConsumer[ObjectNode](topic, new JSONKeyValueDeserializationSchema(false), properties)
    // set up the execution environment
    val env = StreamExecutionEnvironment.getExecutionEnvironment
    val stream = env.addSource(consumer).name("opendental")
      .enableCheckpointing(5000) // checkpoint every 5000 msecs
      .print()

    // execute program
    env.execute("Flink Scala API Skeleton")
  }
}

I am compiling the Scala code into a jar with the command mvn clean package, but the build fails because of method overloading. Here is the error:

[ERROR]   (x$1: java.util.regex.Pattern,x$2: org.apache.flink.streaming.util.serialization.KeyedDeserializationSchema[com.fasterxml.jackson.databind.node.ObjectNode],x$3: java.util.Properties)org.apache.flink.streaming.connectors.kafka.FlinkKafkaConsumer[com.fasterxml.jackson.databind.node.ObjectNode] <and>
[ERROR]   (x$1: java.util.regex.Pattern,x$2: (some other)org.apache.flink.api.common.serialization.DeserializationSchema(in <none>)[com.fasterxml.jackson.databind.node.ObjectNode],x$3: java.util.Properties)org.apache.flink.streaming.connectors.kafka.FlinkKafkaConsumer[com.fasterxml.jackson.databind.node.ObjectNode] <and>
[ERROR]   (x$1: java.util.List[String],x$2: org.apache.flink.streaming.util.serialization.KeyedDeserializationSchema[com.fasterxml.jackson.databind.node.ObjectNode],x$3: java.util.Properties)org.apache.flink.streaming.connectors.kafka.FlinkKafkaConsumer[com.fasterxml.jackson.databind.node.ObjectNode] <and>
[ERROR]   (x$1: java.util.List[String],x$2: org.apache.flink.api.common.serialization.DeserializationSchema(in <none>)[com.fasterxml.jackson.databind.node.ObjectNode],x$3: java.util.Properties)org.apache.flink.streaming.connectors.kafka.FlinkKafkaConsumer[com.fasterxml.jackson.databind.node.ObjectNode] <and>
[ERROR]   (x$1: String,x$2: org.apache.flink.streaming.util.serialization.KeyedDeserializationSchema[com.fasterxml.jackson.databind.node.ObjectNode],x$3: java.util.Properties)org.apache.flink.streaming.connectors.kafka.FlinkKafkaConsumer[com.fasterxml.jackson.databind.node.ObjectNode] <and>
[ERROR]   (x$1: String,x$2: org.apache.flink.api.common.serialization.DeserializationSchema[com.fasterxml.jackson.databind.node.ObjectNode],x$3: java.util.Properties)org.apache.flink.streaming.connectors.kafka.FlinkKafkaConsumer[com.fasterxml.jackson.databind.node.ObjectNode]
[ERROR]  cannot be applied to (String, org.apache.flink.streaming.util.serialization.JSONKeyValueDeserializationSchema, java.util.Properties)
[ERROR]     val consumer = new FlinkKafkaConsumer[ObjectNode](topic, new JSONKeyValueDeserializationSchema(false), properties)
[ERROR]

I have tried several things to resolve the overloaded method, but at this point I have hit a wall.

Tags: json, scala, apache-kafka, apache-flink, flink-streaming

Solution
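The "(in &lt;none&gt;)" next to DeserializationSchema in the compiler output suggests that scalac cannot resolve that class on the classpath. That usually points to mixing Flink artifacts from different versions: the constructor overloads listed in the error expect the older KeyedDeserializationSchema, while in newer Flink releases JSONKeyValueDeserializationSchema implements KafkaDeserializationSchema instead, so the argument matches none of them. A likely fix is to pin every Flink artifact to one version and one Scala binary suffix. A sketch of the pom.xml alignment (the 1.9.1 version and 2.11 suffix below are placeholders, not values from the question):

```xml
<properties>
  <!-- placeholder: pick ONE version for ALL Flink artifacts -->
  <flink.version>1.9.1</flink.version>
  <scala.binary.version>2.11</scala.binary.version>
</properties>

<dependencies>
  <dependency>
    <groupId>org.apache.flink</groupId>
    <artifactId>flink-streaming-scala_${scala.binary.version}</artifactId>
    <version>${flink.version}</version>
  </dependency>
  <!-- universal Kafka connector: its FlinkKafkaConsumer constructors
       accept DeserializationSchema or KafkaDeserializationSchema -->
  <dependency>
    <groupId>org.apache.flink</groupId>
    <artifactId>flink-connector-kafka_${scala.binary.version}</artifactId>
    <version>${flink.version}</version>
  </dependency>
  <!-- provides JSONKeyValueDeserializationSchema -->
  <dependency>
    <groupId>org.apache.flink</groupId>
    <artifactId>flink-json</artifactId>
    <version>${flink.version}</version>
  </dependency>
</dependencies>
```

With all Flink artifacts on the same version, the call new FlinkKafkaConsumer[ObjectNode](topic, new JSONKeyValueDeserializationSchema(false), properties) should match exactly one constructor. Note also that enableCheckpointing is a method of StreamExecutionEnvironment, not of the stream, so it belongs on env (e.g. env.enableCheckpointing(5000) before addSource) rather than in the operator chain.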

