scala - Iterate over a Spark DataFrame and store each row's values in variables of another class
Question
I want to iterate over a Spark DataFrame and store the values of each row in data members (global variables) of another class.
Code:
import org.apache.log4j.{Level, Logger}
import org.apache.spark.sql.SparkSession
import org.apache.spark.sql.functions._
import org.apache.spark.sql.types.{StructType, StructField, StringType, IntegerType}
import org.apache.spark.sql.Row

object Main {
  class Input_Class {
    var name: String = ""
    var age: String = ""
    var gender: String = ""

    def setter(src: Row) {
      var row = src.toSeq
      var i = 0
      name = row(i).toString
      i += 1
      age = row(i).toString
      i += 1
      gender = row(i).toString
    }
  }

  class Manager {
    var inputObj = new Input_Class()

    def inputSetter(src: Row) = {
      inputObj.setter(src)
    }
  }

  def main(args: Array[String]) {
    val spark = SparkSession.builder.appName("App").config("spark.master", "local").getOrCreate()
    val df = spark.read.csv("data.csv")
    var ManagerObj = new Manager()
    df.rdd.map(row => {
      ManagerObj.inputSetter(row)
    })
    spark.stop()
  }
}
I am not sure whether this code is correct. Am I using the map operator wrongly? As the error says, something is not serializable. Please help me here; I am new to this and do not have much experience, and if there is a better or different way to achieve what I am doing, please recommend it.
This is the error I get:
20/06/03 17:44:13 WARN SparkSession$Builder: Using an existing SparkSession; some configuration may not take effect.
org.apache.spark.SparkException: Task not serializable
at org.apache.spark.util.ClosureCleaner$.ensureSerializable(ClosureCleaner.scala:403)
at org.apache.spark.util.ClosureCleaner$.clean(ClosureCleaner.scala:393)
at org.apache.spark.util.ClosureCleaner$.clean(ClosureCleaner.scala:162)
at org.apache.spark.SparkContext.clean(SparkContext.scala:2326)
at org.apache.spark.rdd.RDD.$anonfun$map$1(RDD.scala:371)
at org.apache.spark.rdd.RDDOperationScope$.withScope(RDDOperationScope.scala:151)
at org.apache.spark.rdd.RDDOperationScope$.withScope(RDDOperationScope.scala:112)
at org.apache.spark.rdd.RDD.withScope(RDD.scala:363)
at org.apache.spark.rdd.RDD.map(RDD.scala:370)
at Main$.main(Untitled-2.scala:57)
... 51 elided
Caused by: java.io.NotSerializableException: Main$Manager
Serialization stack:
- object not serializable (class: Main$Manager, value: Main$Manager@108f206f)
- field (class: scala.runtime.ObjectRef, name: elem, type: class java.lang.Object)
- object (class scala.runtime.ObjectRef, Main$Manager@108f206f)
- element of array (index: 0)
- array (class [Ljava.lang.Object;, size 1)
- field (class: java.lang.invoke.SerializedLambda, name: capturedArgs, type: class [Ljava.lang.Object;)
- object (class java.lang.invoke.SerializedLambda, SerializedLambda[capturingClass=class Main$, functionalInterfaceMethod=scala/Function1.apply:(Ljava/lang/Object;)Ljava/lang/Object;, implementation=invokeStatic Main$.$anonfun$main$1$adapted:(Lscala/runtime/ObjectRef;Lorg/apache/spark/sql/Row;)Ljava/lang/Object;, instantiatedMethodType=(Lorg/apache/spark/sql/Row;)Ljava/lang/Object;, numCaptured=1])
- writeReplace data (class: java.lang.invoke.SerializedLambda)
- object (class Main$$$Lambda$2851/2090377899, Main$$$Lambda$2851/2090377899@7722c8e6)
at org.apache.spark.serializer.SerializationDebugger$.improveException(SerializationDebugger.scala:41)
at org.apache.spark.serializer.JavaSerializationStream.writeObject(JavaSerializer.scala:46)
at org.apache.spark.serializer.JavaSerializerInstance.serialize(JavaSerializer.scala:100)
at org.apache.spark.util.ClosureCleaner$.ensureSerializable(ClosureCleaner.scala:400)
... 60 more
Thanks!
Solution
You are using an instance of the Manager class inside the closure. Make Manager extend the Serializable interface:
class Manager extends Serializable {
  var inputObj = new Input_Class()

  def inputSetter(src: Row) = {
    inputObj.setter(src)
  }
}
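Two further points about the original code are worth noting. First, `df.rdd.map(...)` is lazy: without an action such as `count` or `collect`, the closure never runs at all. Second, even when it does run, each executor mutates its own deserialized copy of `ManagerObj`, so the object on the driver is never updated. If the goal is to read each row's values into driver-side objects and the data fits in driver memory, collecting the rows avoids closure serialization entirely. A minimal sketch under those assumptions, reusing the names from the question:

```scala
import org.apache.spark.sql.{Row, SparkSession}

object Main {
  class Input_Class {
    var name: String = ""
    var age: String = ""
    var gender: String = ""

    def setter(src: Row): Unit = {
      // spark.read.csv infers every column as StringType,
      // so getString is safe here
      name = src.getString(0)
      age = src.getString(1)
      gender = src.getString(2)
    }
  }

  def main(args: Array[String]): Unit = {
    val spark = SparkSession.builder.appName("App").master("local").getOrCreate()
    val df = spark.read.csv("data.csv")

    // collect() is an action: it runs the job and brings all rows
    // back to the driver, so no closure needs to be serialized.
    // Suitable only when the data fits in driver memory.
    df.collect().foreach { row =>
      val input = new Input_Class()
      input.setter(row)
      // use input.name, input.age, input.gender on the driver here
    }

    spark.stop()
  }
}
```

For larger data sets that must stay distributed, the per-row work should be expressed as a transformation whose result is returned (e.g. mapping rows to case class instances with `df.as[...]`), rather than mutating shared driver-side state from inside a closure.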