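scala - How to fix a "Task not serializable" exception in Spark Streaming
Problem description
I want to use Spark Streaming to summarize web logs. I have converted the log data into a Map, but the computation step fails with an error.
I set the Spark serialization configuration to avro, but that did not help.
Here is the code: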
...
val sc = new SparkContext(conf)
...
val lines = kafkaStream.map(_._2)
  .map { _.split("\\|") }
  .map { arr =>
    Map(
      ...
    )
  }
lines.print() // this works
lines.map { clearMap => // the exception points to this line
  ...
  val filter = new RowFilter(CompareOp.EQUAL, new RegexStringComparator("^\\d+_" + uvid + "_.*$"))
  val r = HBaseUtils.queryFromHBase(sc, "flux", zerotime.getBytes, nowtime.getBytes, filter)
  val uv = if (r.count() == 0) 1 else 0
  val sscount = clearMap("sscount")
  val vv = if (sscount == "0") 1 else 0
  val cip = clearMap("cip")
  val filter2 = new RowFilter(CompareOp.EQUAL, new RegexStringComparator("^\\d+_\\d+_\\d+_" + cip + "_.*$"))
  val r2 = HBaseUtils.queryFromHBase(sc, "flux", zerotime.getBytes, nowtime.getBytes, filter2)
  val newip = if (r2.count() == 0) 1 else 0
  val filter3 = new RowFilter(CompareOp.EQUAL, new RegexStringComparator("^\\d+_" + uvid + "_.*$"))
  val r3 = HBaseUtils.queryFromHBase(sc, "flux", null, nowtime.getBytes, filter3)
  val newcust = if (r3.count() == 0) 1 else 0
  (nowtime, pv, uv, vv, newip, newcust)
}
...
Here is the exception message:
Exception in thread "main" org.apache.spark.SparkException: Task not serializable
at org.apache.spark.util.ClosureCleaner$.ensureSerializable(ClosureCleaner.scala:298)
at org.apache.spark.util.ClosureCleaner$.org$apache$spark$util$ClosureCleaner$$clean(ClosureCleaner.scala:288)
at org.apache.spark.util.ClosureCleaner$.clean(ClosureCleaner.scala:108)
at org.apache.spark.SparkContext.clean(SparkContext.scala:2056)
at org.apache.spark.streaming.dstream.DStream$$anonfun$map$1.apply(DStream.scala:546)
at org.apache.spark.streaming.dstream.DStream$$anonfun$map$1.apply(DStream.scala:546)
at org.apache.spark.rdd.RDDOperationScope$.withScope(RDDOperationScope.scala:151)
at org.apache.spark.rdd.RDDOperationScope$.withScope(RDDOperationScope.scala:112)
at org.apache.spark.SparkContext.withScope(SparkContext.scala:679)
at org.apache.spark.streaming.StreamingContext.withScope(StreamingContext.scala:264)
at org.apache.spark.streaming.dstream.DStream.map(DStream.scala:545)
at cn.tedu.flux.fluxdriver$.main(fluxdriver.scala:73)
at cn.tedu.flux.fluxdriver.main(fluxdriver.scala)
Caused by: java.io.NotSerializableException: org.apache.spark.SparkContext
Serialization stack:
- object not serializable (class: org.apache.spark.SparkContext, value: org.apache.spark.SparkContext@3fc08eec)
- field (class: cn.tedu.flux.fluxdriver$$anonfun$main$2, name: sc$1, type: class org.apache.spark.SparkContext)
- object (class cn.tedu.flux.fluxdriver$$anonfun$main$2, <function1>)
at org.apache.spark.serializer.SerializationDebugger$.improveException(SerializationDebugger.scala:40)
at org.apache.spark.serializer.JavaSerializationStream.writeObject(JavaSerializer.scala:46)
at org.apache.spark.serializer.JavaSerializerInstance.serialize(JavaSerializer.scala:100)
at org.apache.spark.util.ClosureCleaner$.ensureSerializable(ClosureCleaner.scala:295)
... 12 more
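Solution
I have solved the problem. When the SparkContext is defined as a local value inside a function, the closure passed to map captures it (the sc$1 field in the serialization stack above), and SparkContext cannot be serialized. So I tried defining it as an attribute of the object instead, like this: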
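import org.apache.spark.SparkContext

object driver {
  // Held as a field of the object, so closures do not capture it as a local value
  var sc: SparkContext = null
  def main(arg: Array[String]): Unit = {
    sc = new SparkContext()
    ....
  }
}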
It worked!
Before, it looked like this:
object driver {
  def main(arg: Array[String]): Unit = {
    // Local value: any closure that uses it captures it as a field (sc$1)
    val sc = new SparkContext
    ....
  }
}
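The difference between the two versions can be reproduced without Spark. The following is a minimal sketch that uses plain Java serialization in place of Spark's closure check; `FakeContext`, `Holder`, `CaptureDemo`, and `serializes` are hypothetical names made up for this illustration, and `FakeContext` only stands in for a non-serializable class such as SparkContext.

```scala
import java.io.{ByteArrayOutputStream, NotSerializableException, ObjectOutputStream}

// Hypothetical stand-in for SparkContext: a class that is not Serializable.
class FakeContext { def lookup(key: String): Int = key.length }

// Hypothetical holder object, mirroring "var sc" declared on the driver object.
object Holder {
  var ctx: FakeContext = null
}

object CaptureDemo {
  // Returns true if Java serialization accepts the object, false if it
  // rejects it with NotSerializableException.
  def serializes(obj: AnyRef): Boolean =
    try {
      new ObjectOutputStream(new ByteArrayOutputStream()).writeObject(obj)
      true
    } catch {
      case _: NotSerializableException => false
    }

  def main(args: Array[String]): Unit = {
    val local = new FakeContext   // local value, like "val sc" inside main
    Holder.ctx = new FakeContext  // object field, like "var sc" on the object

    // This closure has to carry `local` with it as a captured value.
    val capturesLocal: String => Int = s => local.lookup(s)
    // This closure reaches the context through the Holder singleton instead.
    val usesObjectField: String => Int = s => Holder.ctx.lookup(s)

    println(s"capturesLocal serializes: ${serializes(capturesLocal)}")
    println(s"usesObjectField serializes: ${serializes(usesObjectField)}")
  }
}
```

The first closure should be rejected for the same reason as in the stack trace, because the captured value is not serializable, while the second carries no captured state. Note that this only illustrates why the exception goes away: fields of an object are not shipped along with the closure, so on a different JVM `Holder.ctx` would start out as null.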