json - FlinkKafkaConsumer - Scala 消费 json 消息
问题描述
我是 scala 和 flink 的新手。我正在使用以下代码来使用来自 kafka 流的 json 消息。
object Job {
def main(args: Array[String]) {
val properties = new Properties()
properties.setProperty("bootstrap.servers", "localhost:9092")
// only required for Kafka 0.8
properties.setProperty("zookeeper.connect", "localhost:2181")
val topic: String = "test"
val consumer = new FlinkKafkaConsumer[ObjectNode](topic, new JSONKeyValueDeserializationSchema(false), properties)
// set up the execution environment
val env = StreamExecutionEnvironment.getExecutionEnvironment
val stream = env.addSource(consumer).name("opendental")
.enableCheckpointing(5000) // checkpoint every 5000 msecs
.print()
// execute program
env.execute("Flink Scala API Skeleton")
}
}
我正在使用命令mvn clean package
将 scala 代码编译为 a.jar
但由于方法重载而构建失败。这是错误
[ERROR] (x$1: java.util.regex.Pattern,x$2: org.apache.flink.streaming.util.serialization.KeyedDeserializationSchema[com.fasterxml.jackson.databind.node.ObjectNode],x$3: java.util.Properties)org.apache.flink.streaming.connectors.kafka.FlinkKafkaConsumer[com.fasterxml.jackson.databind.node.ObjectNode] <and>
[ERROR] (x$1: java.util.regex.Pattern,x$2: (some other)org.apache.flink.api.common.serialization.DeserializationSchema(in <none>)[com.fasterxml.jackson.databind.node.ObjectNode],x$3: java.util.Properties)org.apache.flink.streaming.connectors.kafka.FlinkKafkaConsumer[com.fasterxml.jackson.databind.node.ObjectNode] <and>
[ERROR] (x$1: java.util.List[String],x$2: org.apache.flink.streaming.util.serialization.KeyedDeserializationSchema[com.fasterxml.jackson.databind.node.ObjectNode],x$3: java.util.Properties)org.apache.flink.streaming.connectors.kafka.FlinkKafkaConsumer[com.fasterxml.jackson.databind.node.ObjectNode] <and>
[ERROR] (x$1: java.util.List[String],x$2: org.apache.flink.api.common.serialization.DeserializationSchema(in <none>)[com.fasterxml.jackson.databind.node.ObjectNode],x$3: java.util.Properties)org.apache.flink.streaming.connectors.kafka.FlinkKafkaConsumer[com.fasterxml.jackson.databind.node.ObjectNode] <and>
[ERROR] (x$1: String,x$2: org.apache.flink.streaming.util.serialization.KeyedDeserializationSchema[com.fasterxml.jackson.databind.node.ObjectNode],x$3: java.util.Properties)org.apache.flink.streaming.connectors.kafka.FlinkKafkaConsumer[com.fasterxml.jackson.databind.node.ObjectNode] <and>
[ERROR] (x$1: String,x$2: org.apache.flink.api.common.serialization.DeserializationSchema[com.fasterxml.jackson.databind.node.ObjectNode],x$3: java.util.Properties)org.apache.flink.streaming.connectors.kafka.FlinkKafkaConsumer[com.fasterxml.jackson.databind.node.ObjectNode]
[ERROR] cannot be applied to (String, org.apache.flink.streaming.util.serialization.JSONKeyValueDeserializationSchema, java.util.Properties)
[ERROR] val consumer = new FlinkKafkaConsumer[ObjectNode](topic, new JSONKeyValueDeserializationSchema(false), properties)
[ERROR]
我已经尝试了几件事来解决重载的方法,但现在我碰壁了。
解决方案
推荐阅读
- c++ - 从 ENUM 中检索字符串值时遇到问题
- function - 在 MATLAB 中对具有默认值的多个参数使用 varargin
- python - 为什么我在 globals() 中找不到我的变量?
- rust - 将 u16(或 u32、u64)的数组(或向量)转换为 u8 的数组
- tensorflow - 试验数据集
- ms-access - 换行符访问用户表单和报告
- r - 如何用另一个替换一组 x 轴标签
- flutter - 如何解决 DefaultTabController has size error without height size?
- r - 如何将标题添加到格式表?
- c# - 如何旋转屏幕中心