scala - Malformed class name - Scala + Flink while reading from RabbitMQ
Problem description
I'm new to Scala and have been fighting this issue for the past week. I produce JSON messages and push them through RabbitMQ. My Flink instance receives the data, and I can read it as a String and deserialize it with json4s. However, when I try to map it, I get:
Exception in thread "main" java.lang.InternalError: Malformed class name
at java.lang.Class.getSimpleName(Class.java:1330)
at org.apache.flink.api.scala.typeutils.ScalaCaseClassSerializer$.$anonfun$lookupConstructor$1(ScalaCaseClassSerializer.scala:93)
at scala.Predef$.require(Predef.scala:277)
at org.apache.flink.api.scala.typeutils.ScalaCaseClassSerializer$.lookupConstructor(ScalaCaseClassSerializer.scala:97)
at org.apache.flink.api.scala.typeutils.ScalaCaseClassSerializer.<init>(ScalaCaseClassSerializer.scala:46)
at RabbitMQTest$$anon$2$$anon$1.<init>(RabbitMQTest.scala:41)
at RabbitMQTest$$anon$2.createSerializer(RabbitMQTest.scala:41)
at org.apache.flink.streaming.api.graph.StreamGraph.addOperator(StreamGraph.java:258)
at org.apache.flink.streaming.api.graph.StreamGraphGenerator.transformOneInputTransform(StreamGraphGenerator.java:652)
at org.apache.flink.streaming.api.graph.StreamGraphGenerator.transform(StreamGraphGenerator.java:251)
at org.apache.flink.streaming.api.graph.StreamGraphGenerator.generate(StreamGraphGenerator.java:210)
at org.apache.flink.streaming.api.environment.StreamExecutionEnvironment.getStreamGraph(StreamExecutionEnvironment.java:1540)
at org.apache.flink.streaming.api.environment.StreamExecutionEnvironment.execute(StreamExecutionEnvironment.java:1507)
at org.apache.flink.streaming.api.scala.StreamExecutionEnvironment.execute(StreamExecutionEnvironment.scala:654)
at RabbitMQTest$.main(RabbitMQTest.scala:49)
at RabbitMQTest.main(RabbitMQTest.scala)
Process finished with exit code 1
The application code is:
import org.apache.flink.api.common.serialization.SimpleStringSchema
import org.apache.flink.api.scala.createTypeInformation
import org.apache.flink.configuration.{ConfigConstants, Configuration, RestOptions}
import org.apache.flink.streaming.api.scala.{DataStream, StreamExecutionEnvironment}
import org.apache.flink.streaming.connectors.rabbitmq.{RMQSink, RMQSource}
import org.apache.flink.streaming.connectors.rabbitmq.common.RMQConnectionConfig
import org.json4s.DefaultFormats
import org.json4s.native.{JsonMethods, parseJson}
import org.json4s.native.JsonMethods._
import org.json4s._
object RabbitMQTest {
  def main(args: Array[String]): Unit = {
    case class Body(abc: String, qwe: String)
    implicit lazy val formats = org.json4s.DefaultFormats

    val config = new Configuration()
    config.setBoolean(ConfigConstants.LOCAL_START_WEBSERVER, true)
    config.setInteger(RestOptions.PORT, 8093)
    val env: StreamExecutionEnvironment = StreamExecutionEnvironment.createLocalEnvironmentWithWebUI(config)
    env.enableCheckpointing(100000)

    val connectionConfig = new RMQConnectionConfig.Builder()
      .setHost("hostname")
      .setPort(5672)
      .setUserName("user")
      .setPassword("password")
      .setVirtualHost("/")
      .build

    val stream = env
      .addSource(new RMQSource[String](
        connectionConfig,        // config for the RabbitMQ connection
        "enrichme",              // name of the RabbitMQ queue to consume
        false,                   // use correlation ids; can be false if only at-least-once is required
        new SimpleStringSchema)) // deserialization schema to turn messages into Java objects
      .setParallelism(1)         // non-parallel source is only required for exactly-once
      .flatMap(raw => JsonMethods.parse(raw).toOption)
      .map(_.extract[Body])

    stream.print()
    env.execute("Flink Test 1")
  }
}
If I comment out the line with map, I get normal output:
6> JObject(List((abc,JString(def)), (qwe,JString(rtl))))
which matches the produced input:
{'abc': 'def', 'qwe': 'rtl'}
Solution
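The stack trace points at Flink's ScalaCaseClassSerializer.lookupConstructor, which requires the case class to be static, that is, a top-level class or a member of a top-level object. In the question, Body is declared inside main, so it is a local class and that check fails. Building the failure message then calls Class.getSimpleName, which on Java 8 is known to throw InternalError: Malformed class name for some Scala-generated nested class names, and that seems to hide the real message. A likely fix is to move Body out of main. The following is a minimal sketch under stated assumptions, not a definitive implementation: it assumes the Flink Scala API and json4s-native are on the classpath, it replaces the RMQSource with env.fromElements so it can run without a broker, and the names RabbitMQTestFixed and parseBody are hypothetical.

```scala
import org.apache.flink.streaming.api.scala._
import org.json4s._
import org.json4s.native.JsonMethods

// Declared at the top level instead of inside main, so the class is static
// and Flink's case class serializer can look up its constructor.
case class Body(abc: String, qwe: String)

// Hypothetical object name, chosen for this sketch only.
object RabbitMQTestFixed {

  // Hypothetical helper: parses one raw message into a Body.
  // Invalid JSON or a missing field yields None instead of an exception.
  def parseBody(raw: String): Option[Body] = {
    implicit val formats: Formats = DefaultFormats
    JsonMethods.parseOpt(raw).flatMap(_.extractOpt[Body])
  }

  def main(args: Array[String]): Unit = {
    val env = StreamExecutionEnvironment.getExecutionEnvironment

    // Assumption: a fixed element stands in for the RMQSource[String] from the question.
    val raw: DataStream[String] = env.fromElements("""{"abc": "def", "qwe": "rtl"}""")

    // Messages that cannot be parsed are dropped; the rest become Body values.
    val stream: DataStream[Body] = raw.flatMap((r: String) => parseBody(r).toList)

    stream.print()
    env.execute("Flink Test 1")
  }
}
```

Declaring Body as a member of the RabbitMQTest object, outside main, should also satisfy the check, since the class is then a member of a top-level object. With the original pipeline, the same change would leave .map(_.extract[Body]) as it is and only relocate the case class.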