JMSConnection serialization failure

Problem description

I'm currently building an application that reads messages (transactions in JSON) from a Kafka topic and sends them to IBM MQ in production. I'm having some trouble with serialization in a JMS class and am a bit lost on how to fix it. My code is:

// Third-party imports (assuming the Spark 1.x Kafka 0.8 integration); the
// project's own classes (KafkaConfig, KafkaManager, MQConfig, MQService,
// ErrorReport, ErrorReportService, ZookeeperConn, printConfig) are not shown.
import com.typesafe.config.ConfigFactory
import javax.jms.TextMessage
import org.apache.spark.SparkConf
import org.apache.spark.streaming.kafka.HasOffsetRanges
import org.apache.spark.streaming.{Seconds, StreamingContext}
import org.slf4j.LoggerFactory

import scala.util.Try

object DispatcherMqApp extends Serializable {

  private val logger = LoggerFactory.getLogger(this.getClass)
  val config = ConfigFactory.load()

  def inicialize(transactionType: String) = {

    val spark = new SparkConf()
      .setAppName("Dispatcher MQ Categorization")
      .set("spark.serializer", "org.apache.spark.serializer.KryoSerializer")
      .set("spark.streaming.stopGracefullyOnShutDown", "true")
    logger.debug(s"Loading configuration at ${printConfig(config).head} =>\n${printConfig(config)(1)}")

    val kafkaConfig = KafkaConfig.buildFromConfiguration(config, "dispatcher-mq")
    val streamCtx = new StreamingContext(spark, Seconds(kafkaConfig.streamingInterval))

    sys.ShutdownHookThread {
      logger.warn("Stopping the application ...")
      streamCtx.stop(stopSparkContext = true, stopGracefully = true)
      logger.warn("Application finished successfully")
    }

    val topic = config.getString(s"conf.dispatcher-mq.consumer-topic.$transactionType")
    logger.info(s"Topic: $topic")
    val zkdir = s"${kafkaConfig.zookeeperBaseDir}$transactionType-$topic"
    val kafkaManager = new KafkaManager(kafkaConfig)
    val stream = kafkaManager.createStreaming(streamCtx, kafkaConfig.offset, topic, zkdir)
    val kafkaSink = streamCtx.sparkContext.broadcast(kafkaManager.createProducer())
    val mqConfig = MQConfig(config.getString("conf.mq.name"),
      config.getString("conf.mq.host"),
      config.getInt("conf.mq.port"),
      config.getString("conf.mq.channel"),
      config.getString("conf.mq.queue-manager"),
      config.getInt("conf.mq.retries"),
      config.getString("conf.mq.app-name"),
      Try(config.getString("conf.mq.user")).toOption,
      Try(config.getString("conf.mq.password")).toOption,
      config.getString("conf.dispatcher-mq.send.category_file"))

    val queueConn = new MQService(mqConfig)

    (stream, queueConn, streamCtx, kafkaSink, zkdir)
  }

  def main(args: Array[String]): Unit = {
    val transactionType = args.head
    if (transactionType == "account" || transactionType == "credit") {
      val (messages, queueConn, sc, kafkaSink, zkdir) = inicialize(transactionType)
      val fieldsType = config.getString(s"conf.dispatcher-mq.send.fields.$transactionType")
      val source = config.getString("conf.dispatcher-mq.parameters.source")
      val mqVersion = config.getString(s"conf.dispatcher-mq.parameters.version.$transactionType")
      val topicError = config.getString("conf.kafka.topic_error")

      messages.foreachRDD(rdd => {
        val offsetRanges = rdd.asInstanceOf[HasOffsetRanges].offsetRanges
        rdd.map(_._2).filter(_.toUpperCase.contains("BYCATEGORIZER"))
          .foreach(message => {
            val msg:Option[TextMessage] = try {
              Some(queueConn.createOutputMq(message, fieldsType, source, mqVersion))
            } catch {
              case ex: Exception =>
                logger.error(s"[ERROR] input: [[$message]]\n$ex")
                val errorReport = ErrorReport("GENERAL", "DISPATCHER-MQ", transactionType.toString, ex.getMessage, None, Option(ex.toString))
                ErrorReportService.sendError(errorReport, topicError, kafkaSink.value)
                None
            }
            msg.foreach(queueConn.submit)
          })
        logger.info(s"Save Offset in  $zkdir...\n${offsetRanges.toList.to}")
        ZookeeperConn.saveOffsets(zkdir, offsetRanges)
      })


      sc.start()
      sc.awaitTermination()

    } else
      logger.error(s"${args.head} is not a valid argument. ( account or credit ) !!! ")
  }
}

I'm getting a serialization error on the JMSConnection, which is used inside the createOutputMq method (package names in the stack trace are redacted as HIDDEN). The error is:

20/09/04 17:21:00 ERROR JobScheduler: Error running job streaming job 1599250860000 ms.0
org.apache.spark.SparkException: Task not serializable
    at org.apache.spark.util.ClosureCleaner$.ensureSerializable(ClosureCleaner.scala:304)
    at org.apache.spark.util.ClosureCleaner$.org$apache$spark$util$ClosureCleaner$$clean(ClosureCleaner.scala:294)
    at org.apache.spark.util.ClosureCleaner$.clean(ClosureCleaner.scala:122)
    at org.apache.spark.SparkContext.clean(SparkContext.scala:2054)
    at org.apache.spark.rdd.RDD$$anonfun$foreach$1.apply(RDD.scala:918)
    at org.apache.spark.rdd.RDD$$anonfun$foreach$1.apply(RDD.scala:917)
    at org.apache.spark.rdd.RDDOperationScope$.withScope(RDDOperationScope.scala:150)
    at org.apache.spark.rdd.RDDOperationScope$.withScope(RDDOperationScope.scala:111)
    at org.apache.spark.rdd.RDD.withScope(RDD.scala:323)
    at org.apache.spark.rdd.RDD.foreach(RDD.scala:917)
    at br.com.HIDDEN.dispatcher.DispatcherMqApp$$anonfun$main$1.apply(DispatcherMqApp.scala:80)
    at br.com.HIDDEN.dispatcher.DispatcherMqApp$$anonfun$main$1.apply(DispatcherMqApp.scala:76)
    at org.apache.spark.streaming.dstream.DStream$$anonfun$foreachRDD$1$$anonfun$apply$mcV$sp$3.apply(DStream.scala:661)
    at org.apache.spark.streaming.dstream.DStream$$anonfun$foreachRDD$1$$anonfun$apply$mcV$sp$3.apply(DStream.scala:661)
    at org.apache.spark.streaming.dstream.ForEachDStream$$anonfun$1$$anonfun$apply$mcV$sp$1.apply$mcV$sp(ForEachDStream.scala:50)
    at org.apache.spark.streaming.dstream.ForEachDStream$$anonfun$1$$anonfun$apply$mcV$sp$1.apply(ForEachDStream.scala:50)
    at org.apache.spark.streaming.dstream.ForEachDStream$$anonfun$1$$anonfun$apply$mcV$sp$1.apply(ForEachDStream.scala:50)
    at org.apache.spark.streaming.dstream.DStream.createRDDWithLocalProperties(DStream.scala:426)
    at org.apache.spark.streaming.dstream.ForEachDStream$$anonfun$1.apply$mcV$sp(ForEachDStream.scala:49)
    at org.apache.spark.streaming.dstream.ForEachDStream$$anonfun$1.apply(ForEachDStream.scala:49)
    at org.apache.spark.streaming.dstream.ForEachDStream$$anonfun$1.apply(ForEachDStream.scala:49)
    at scala.util.Try$.apply(Try.scala:161)
    at org.apache.spark.streaming.scheduler.Job.run(Job.scala:39)
    at org.apache.spark.streaming.scheduler.JobScheduler$JobHandler$$anonfun$run$1.apply$mcV$sp(JobScheduler.scala:227)
    at org.apache.spark.streaming.scheduler.JobScheduler$JobHandler$$anonfun$run$1.apply(JobScheduler.scala:227)
    at org.apache.spark.streaming.scheduler.JobScheduler$JobHandler$$anonfun$run$1.apply(JobScheduler.scala:227)
    at scala.util.DynamicVariable.withValue(DynamicVariable.scala:57)
    at org.apache.spark.streaming.scheduler.JobScheduler$JobHandler.run(JobScheduler.scala:226)
    at java.util.concurrent.ThreadPoolExecutor.runWorker(ThreadPoolExecutor.java:1145)
    at java.util.concurrent.ThreadPoolExecutor$Worker.run(ThreadPoolExecutor.java:615)
    at java.lang.Thread.run(Thread.java:745)
Caused by: java.io.NotSerializableException: com.ibm.msg.client.jms.JmsConnection
Serialization stack:

    at org.apache.spark.serializer.SerializationDebugger$.improveException(SerializationDebugger.scala:40)
    at org.apache.spark.serializer.JavaSerializationStream.writeObject(JavaSerializer.scala:47)
    at org.apache.spark.serializer.JavaSerializerInstance.serialize(JavaSerializer.scala:101)
    at org.apache.spark.util.ClosureCleaner$.ensureSerializable(ClosureCleaner.scala:301)
    ... 30 more
20/09/04 17:21:00 ERROR ApplicationMaster: User class threw exception: org.apache.spark.SparkException: Task not serializable
org.apache.spark.SparkException: Task not serializable
    ... (identical stack trace to the JobScheduler error above)
Caused by: java.io.NotSerializableException: com.ibm.msg.client.jms.JmsConnection

Does anyone have any ideas on how to fix it? The lines referenced in the error message (76 and 80) are my messages.foreachRDD(rdd => { and .foreach(message => { respectively. Thanks in advance.

Tags: scala, jms, spark-streaming, ibm-mq, jms-serializer

Solution
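
Spark has to serialize the function passed to rdd.foreach in order to ship it to the executors. That closure captures queueConn, the MQService built on the driver, and with it the live com.ibm.msg.client.jms.JmsConnection, which is not serializable; hence the Task not serializable failure at the foreachRDD/foreach lines (76 and 80). Marking DispatcherMqApp as Serializable does not help, because the connection object itself can never cross the wire. The connection has to be created where it is used.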

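The usual fix is to open the connection on the executors, inside the closure, typically once per partition. Below is a minimal sketch of the foreachPartition variant; it assumes MQConfig is a plain serializable case class, that inicialize is adjusted to return mqConfig instead of the already-connected queueConn, and it uses scala.util.{Try, Success, Failure}:

// Minimal sketch: build the MQ connection on the executor, once per partition,
// so only the serializable mqConfig travels through the closure serializer.
messages.foreachRDD { rdd =>
  val offsetRanges = rdd.asInstanceOf[HasOffsetRanges].offsetRanges
  rdd.map(_._2)
    .filter(_.toUpperCase.contains("BYCATEGORIZER"))
    .foreachPartition { partition =>
      // Created here, on the executor: the JmsConnection never leaves this JVM.
      val queueConn = new MQService(mqConfig)
      partition.foreach { message =>
        Try(queueConn.createOutputMq(message, fieldsType, source, mqVersion)) match {
          case Success(msg) => queueConn.submit(msg)
          case Failure(ex) =>
            val errorReport = ErrorReport("GENERAL", "DISPATCHER-MQ", transactionType,
              ex.getMessage, None, Option(ex.toString))
            ErrorReportService.sendError(errorReport, topicError, kafkaSink.value)
        }
      }
    }
  ZookeeperConn.saveOffsets(zkdir, offsetRanges)
}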

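Alternatively, if you prefer to keep passing a service object around, wrap the connection in a serializable holder whose field is a @transient lazy val: the live connection is skipped during serialization and re-created on first use in whichever JVM the holder lands. MQServiceHolder below is a hypothetical name, not part of the original code:

// Sketch of the @transient lazy val pattern (assumed MQService shape).
class MQServiceHolder(mqConfig: MQConfig) extends Serializable {
  // @transient: the live JMS connection is never serialized.
  // lazy val: each JVM (driver or executor) opens its own connection on first use.
  @transient lazy val queueConn: MQService = new MQService(mqConfig)
}

// Usage: capture the holder in the closure instead of queueConn itself,
// e.g. holder.queueConn.submit(msg) inside the foreach.

This is the same trick commonly used to ship Kafka producers to executors in Spark Streaming, and likely what the broadcast kafkaSink in the code above already relies on.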