Spark RDD Output to a Kafka Topic

Problem Description

I have a pairRdd that is continuously receiving data. Every x minutes I want to write its contents to a Kafka topic and then clear them.

I have tried a few things, but each time I get this error:

    Exception in thread "Timer-1" org.apache.spark.SparkException: Task not serializable
        at org.apache.spark.util.ClosureCleaner$.ensureSerializable(ClosureCleaner.scala:298)
        at org.apache.spark.util.ClosureCleaner$.org$apache$spark$util$ClosureCleaner$$clean(ClosureCleaner.scala:288)
        at org.apache.spark.util.ClosureCleaner$.clean(ClosureCleaner.scala:108)
        at org.apache.spark.SparkContext.clean(SparkContext.scala:2287)
        at org.apache.spark.rdd.RDD$$anonfun$foreachPartition$1.apply(RDD.scala:925)
        at org.apache.spark.rdd.RDD$$anonfun$foreachPartition$1.apply(RDD.scala:924)
        at org.apache.spark.rdd.RDDOperationScope$.withScope(RDDOperationScope.scala:151)
        at org.apache.spark.rdd.RDDOperationScope$.withScope(RDDOperationScope.scala:112)
        at org.apache.spark.rdd.RDD.withScope(RDD.scala:362)
        at org.apache.spark.rdd.RDD.foreachPartition(RDD.scala:924)
        at org.apache.spark.api.java.JavaRDDLike$class.foreachPartition(JavaRDDLike.scala:219)
        at org.apache.spark.api.java.AbstractJavaRDDLike.foreachPartition(JavaRDDLike.scala:45)
        at SparkProcess$1.run(SparkProcess.java:94)
        at java.base/java.util.TimerThread.mainLoop(Timer.java:556)
        at java.base/java.util.TimerThread.run(Timer.java:506)
    Caused by: java.io.NotSerializableException: SparkProcess$1
    Serialization stack:
        - object not serializable (class: SparkProcess$1, value: SparkProcess$1@28612f9b)
        - element of array (index: 0)
        - array (class [Ljava.lang.Object;, size 1)
        - field (class: java.lang.invoke.SerializedLambda, name: capturedArgs, type: class [Ljava.lang.Object;)
        - object (class java.lang.invoke.SerializedLambda, SerializedLambda[capturingClass=class SparkProcess$1, functionalInterfaceMethod=org/apache/spark/api/java/function/VoidFunction.call:(Ljava/lang/Object;)V, implementation=invokeSpecial SparkProcess$1.lambda$run$e3b46054$1:(Ljava/util/Iterator;)V, instantiatedMethodType=(Ljava/util/Iterator;)V, numCaptured=1])
        - writeReplace data (class: java.lang.invoke.SerializedLambda)
        - object (class SparkProcess$1$$Lambda$105/77425562, SparkProcess$1$$Lambda$105/77425562@346d59fc)
        - field (class: org.apache.spark.api.java.JavaRDDLike$$anonfun$foreachPartition$1, name: f$12, type: interface org.apache.spark.api.java.function.VoidFunction)
        at org.apache.spark.serializer.SerializationDebugger$.improveException(SerializationDebugger.scala:40)
        at org.apache.spark.serializer.JavaSerializationStream.writeObject(JavaSerializer.scala:46)
        at org.apache.spark.serializer.JavaSerializerInstance.serialize(JavaSerializer.scala:100)
        at org.apache.spark.util.ClosureCleaner$.ensureSerializable(ClosureCleaner.scala:295)
        ... 14 more

My current function looks like this:

    public void cacheToTopic() {
        Timer t = new Timer();
        t.scheduleAtFixedRate(
                new TimerTask() {
                    public void run() {
                        pairRdd.foreachPartition(record -> {
                            Producer<String, String> producer = createKafkaProducer();
                            ProducerRecord<String, String> data = new ProducerRecord<String, String>(
                                    "output", DataObjectFactory.getRawJSON(record));
                            producer.send(data);
                        });
                    }
                },
                3000,   // run first occurrence after three seconds
                3000);  // run every three seconds
    }

Tags: java, apache-spark, apache-kafka

Solution
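
The stack trace points at the real problem: Spark must serialize the function passed to foreachPartition before shipping it to the executors, and this one is not serializable. Because the lambda calls the instance method createKafkaProducer(), it captures the enclosing anonymous TimerTask (the SparkProcess$1 named in the "object not serializable" line), and java.util.TimerTask is not Serializable. There is a second issue: foreachPartition hands the function an Iterator over every record in the partition, not a single record, so passing record directly to DataObjectFactory.getRawJSON is not doing what the code intends.

One way to untangle this is to move the per-partition work into a static nested class that references nothing on the enclosing instance. The following is a minimal sketch, not a drop-in fix: it assumes pairRdd is a JavaPairRDD<String, Status> of twitter4j statuses, and the PublishPartition class name and the producer properties are illustrative placeholders.

    import java.util.Iterator;
    import java.util.Properties;
    import java.util.Timer;
    import java.util.TimerTask;

    import org.apache.kafka.clients.producer.KafkaProducer;
    import org.apache.kafka.clients.producer.Producer;
    import org.apache.kafka.clients.producer.ProducerRecord;
    import org.apache.spark.api.java.JavaPairRDD;
    import org.apache.spark.api.java.function.VoidFunction;

    import scala.Tuple2;
    import twitter4j.Status;
    import twitter4j.json.DataObjectFactory;

    public class SparkProcess {

        private JavaPairRDD<String, Status> pairRdd; // assumed: populated elsewhere

        // Static, so PublishPartition can call it without capturing a
        // SparkProcess instance. The properties here are placeholders.
        private static Producer<String, String> createKafkaProducer() {
            Properties props = new Properties();
            props.put("bootstrap.servers", "localhost:9092");
            props.put("key.serializer",
                    "org.apache.kafka.common.serialization.StringSerializer");
            props.put("value.serializer",
                    "org.apache.kafka.common.serialization.StringSerializer");
            return new KafkaProducer<>(props);
        }

        // A static nested class holds no hidden reference to the enclosing
        // TimerTask or SparkProcess, so Spark's ClosureCleaner can serialize
        // it (VoidFunction already extends java.io.Serializable).
        private static class PublishPartition
                implements VoidFunction<Iterator<Tuple2<String, Status>>> {
            @Override
            public void call(Iterator<Tuple2<String, Status>> partition) {
                // Created on the executor, once per partition, never serialized.
                Producer<String, String> producer = createKafkaProducer();
                while (partition.hasNext()) {
                    Tuple2<String, Status> record = partition.next();
                    producer.send(new ProducerRecord<>("output",
                            DataObjectFactory.getRawJSON(record._2())));
                }
                producer.flush();
                producer.close();
            }
        }

        public void cacheToTopic() {
            Timer t = new Timer();
            t.scheduleAtFixedRate(
                    new TimerTask() {
                        public void run() {
                            // Only PublishPartition is shipped to the executors;
                            // the TimerTask itself stays on the driver.
                            pairRdd.foreachPartition(new PublishPartition());
                        }
                    },
                    3000,
                    3000);
        }
    }

Two caveats. RDDs are immutable, so "deleting its contents" afterwards really means replacing pairRdd with a fresh RDD on the driver (and unpersisting the old one), not clearing it in place. Also, twitter4j's DataObjectFactory.getRawJSON reads from a thread-local cache filled on the thread that fetched the status, so it can return null on executor threads; if it does, you will need to serialize the Status to JSON yourself.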

