java - Spark Rdd 输出到 Kafka 主题
问题描述
我有一个pairRdd
不断获取的数据,我想在每 x 分钟内将其内容输出到一个 kafka 主题,然后删除它的内容。
我尝试了一些东西,但每次都会出现这个错误。
线程“Timer-1”中的异常 org.apache.spark.SparkException:在 org.apache.spark.util.ClosureCleaner$ 的 org.apache.spark.util.ClosureCleaner$.ensureSerializable(ClosureCleaner.scala:298) 中的任务不可序列化.org$apache$spark$util$ClosureCleaner$$clean(ClosureCleaner.scala:288) at org.apache.spark.util.ClosureCleaner$.clean(ClosureCleaner.scala:108) at org.apache.spark.SparkContext.clean (SparkContext.scala:2287) at org.apache.spark.rdd.RDD$$anonfun$foreachPartition$1.apply(RDD.scala:925) at org.apache.spark.rdd.RDD$$anonfun$foreachPartition$1.apply (RDD.scala:924) 在 org.apache.spark.rdd.RDDOperationScope$.withScope(RDDOperationScope.scala:151) 在 org.apache.spark.rdd.RDDOperationScope$.withScope(RDDOperationScope.scala:112) 在 org. apache.spark.rdd.RDD.withScope(RDD.scala:362) 在 org.apache.spark.api.java.JavaRDDLike$class.foreachPartition(JavaRDDLike.scala:219) 在 org.apache.spark 的 org.apache.spark.rdd.RDD.foreachPartition(RDD.scala:924) .api.java.AbstractJavaRDDLike.foreachPartition(JavaRDDLike.scala:45) at SparkProcess$1.run(SparkProcess.java:94) at java.base/java.util.TimerThread.mainLoop(Timer.java:556) at java.base /java.util.TimerThread.run(Timer.java:506) 引起:java.io.NotSerializableException:SparkProcess$1 序列化堆栈:-对象不可序列化(类:SparkProcess$1,值:SparkProcess$1@28612f9b)-数组元素(索引:0) - 数组(类 [Ljava.lang.Object;,大小 1) - 字段(类:java.lang.invoke.SerializedLambda,名称:capturedArgs,类型:类 [Ljava.lang.Object;) - 对象(类 java.lang.invoke.SerializedLambda,SerializedLambda[capturingClass=class SparkProcess$1,functionalInterfaceMethod=org/apache/spark/api/java/function/VoidFunction.call:(Ljava/lang/Object;)V, implementation=invokeSpecial SparkProcess$1.lambda$run$e3b46054$1:( Ljava/util/Iterator;)V, instantiatedMethodType=(Ljava/util/Iterator;)V, numCaptured=1]) - writeReplace 数据(类:java.lang.invoke.SerializedLambda) - 对象(类 SparkProcess$1$$Lambda$105 /77425562,SparkProcess$1$$Lambda$105/77425562@346d59fc) - 字段(类:org.apache.spark.api.java.JavaRDDLike$$anonfun$foreachPartition$1,名称:f$12,类型:接口 org.apache.spark .api.java.function.VoidFunction) - org.apache.spark.serializer.SerializationDebugger$.improveException(SerializationDebugger.scala:40) 在 org.apache.spark.serializer.JavaSerializerInstance.serialize(JavaSerializer.scala:100) 在 org.apache.spark.util.ClosureCleaner 的 org.apache.spark.serializer.JavaSerializationStream.writeObject(JavaSerializer.scala:46) $.ensureSerializable(ClosureCleaner.scala:295) ... 还有 14 个
我当前的功能如下所示;
public void cacheToTopic(){
Timer t = new Timer();
t.scheduleAtFixedRate(
new TimerTask()
{
public void run()
{
pairRdd.foreachPartition(record->{
Producer<String, String> producer=createKafkaProducer();
ProducerRecord<String, String> data = new ProducerRecord<String, String>("output"
, DataObjectFactory.getRawJSON(record));
producer.send(data);
});
}
},
3000, // run first occurrence after three seconds
3000); // run every three seconds
}
解决方案
推荐阅读
- smtp - 使用 GMAIL 的 smtp-relay 从 GCP 计算实例发送电子邮件
- flutter - GridView 中的切换按钮
- service - 程序员在开发自己的 oAuth 服务时应该考虑哪些技术细节?
- mysql - MySQL RDS 聚合函数的性能
- spring-mvc - REST URL 调用抛出文件未找到异常
- javascript - 我不能在模态部分给出 id 值
- reactjs - 使用 React 钩子和 onScroll 或 onWheel 防止重新渲染
- excel - 剪切复制选定范围在一列的最后一个单元格处
- r - 按组更改ggplot中标签的默认颜色
- java - 更改项目目录和模块名称后无法解析活动名称