首页 > 解决方案 > Malformed class name - Scala+Flink while reading from rabbitmq

问题描述

I'm newbie to Scala, but I'm fighting with that issue for last week now, I'm producing json and pushing them via rabbitmq, data is being received by my Flink instance, and I can read it as String and Deserialize using json4s, however, when I'm trying to map it - I'm getting

Exception in thread "main" java.lang.InternalError: Malformed class name
    at java.lang.Class.getSimpleName(Class.java:1330)
    at org.apache.flink.api.scala.typeutils.ScalaCaseClassSerializer$.$anonfun$lookupConstructor$1(ScalaCaseClassSerializer.scala:93)
    at scala.Predef$.require(Predef.scala:277)
    at org.apache.flink.api.scala.typeutils.ScalaCaseClassSerializer$.lookupConstructor(ScalaCaseClassSerializer.scala:97)
    at org.apache.flink.api.scala.typeutils.ScalaCaseClassSerializer.<init>(ScalaCaseClassSerializer.scala:46)
    at RabbitMQTest$$anon$2$$anon$1.<init>(RabbitMQTest.scala:41)
    at RabbitMQTest$$anon$2.createSerializer(RabbitMQTest.scala:41)
    at org.apache.flink.streaming.api.graph.StreamGraph.addOperator(StreamGraph.java:258)
    at org.apache.flink.streaming.api.graph.StreamGraphGenerator.transformOneInputTransform(StreamGraphGenerator.java:652)
    at org.apache.flink.streaming.api.graph.StreamGraphGenerator.transform(StreamGraphGenerator.java:251)
    at org.apache.flink.streaming.api.graph.StreamGraphGenerator.generate(StreamGraphGenerator.java:210)
    at org.apache.flink.streaming.api.environment.StreamExecutionEnvironment.getStreamGraph(StreamExecutionEnvironment.java:1540)
    at org.apache.flink.streaming.api.environment.StreamExecutionEnvironment.execute(StreamExecutionEnvironment.java:1507)
    at org.apache.flink.streaming.api.scala.StreamExecutionEnvironment.execute(StreamExecutionEnvironment.scala:654)
    at RabbitMQTest$.main(RabbitMQTest.scala:49)
    at RabbitMQTest.main(RabbitMQTest.scala)

Process finished with exit code 1

Code of app is:

import org.apache.flink.api.common.serialization.SimpleStringSchema
import org.apache.flink.api.scala.createTypeInformation
import org.apache.flink.configuration.{ConfigConstants, Configuration, RestOptions}
import org.apache.flink.streaming.api.scala.{DataStream, StreamExecutionEnvironment}
import org.apache.flink.streaming.connectors.rabbitmq.{RMQSink, RMQSource}
import org.apache.flink.streaming.connectors.rabbitmq.common.RMQConnectionConfig
import org.json4s.DefaultFormats
import org.json4s.native.{JsonMethods, parseJson}
import org.json4s.native.JsonMethods._
import org.json4s._

object RabbitMQTest {
  def main(args: Array[String]): Unit = {
    case class Body(abc: String, qwe: String)
    implicit lazy val formats = org.json4s.DefaultFormats
    val config = new Configuration()
    config.setBoolean(ConfigConstants.LOCAL_START_WEBSERVER, true)
    config.setInteger(RestOptions.PORT, 8093)
    val env: StreamExecutionEnvironment = StreamExecutionEnvironment.createLocalEnvironmentWithWebUI(config)
    env.enableCheckpointing(100000)
    val connectionConfig = new RMQConnectionConfig.Builder()
      .setHost(“hostname”)
      .setPort(5672)
      .setUserName(“user”)
      .setPassword(“password’s”rd)
      .setVirtualHost("/")
      .build


    val stream = env
      .addSource(new RMQSource[String](
        connectionConfig,            // config for the RabbitMQ connection
        "enrichme",                 // name of the RabbitMQ queue to consume
        false,                        // use correlation ids; can be false if only at-least-once is required
        new SimpleStringSchema))     // deserialization schema to turn messages into Java objects
      .setParallelism(1)               // non-parallel source is only required for exactly-once
      .flatMap(raw => JsonMethods.parse(raw).toOption)
      .map(_.extract[Body])

    stream.print()


    env.execute("Flink Test 1")
     }

}

If I will comment line with map I'm getting normal output

6> JObject(List((abc,JString(def)), (qwe,JString(rtl))))

according to produced input

{'abc': 'def', 'qwe': 'rtl'}

标签: scalarabbitmqapache-flink

解决方案


推荐阅读