image - 如何将 BufferedImage RDD 保存为 HDFS 文件
问题描述
我需要从 HDFS 读取图像,进行一些处理并将图像保存回 HDFS。此处理必须在 spark 中完成。我将图像文件作为 sc.binaryFiles 读取,然后将它们转换为缓冲图像并执行一些操作。但是当我尝试将 RDD[BufferedImage] 保存到 FSDataOutputStream 时出现“任务不可序列化”错误
//read binary files from RDD
val images = sc.binaryFiles("/tmp/images/")
//images: org.apache.spark.rdd.RDD[(String, org.apache.spark.input.PortableDataStream)] = /tmp/images/
//get BufferedImageRDD
val bufImages = images.map(x => ImageIO.read(x._2.open))
//bufImages: org.apache.spark.rdd.RDD[java.awt.image.BufferedImage] = MapPartitionsRDD[1]
//try saving in local directory
bufImages.foreach(x => UtilImageIO.saveImage(x,"Mean3.jpg"))
//success
//try saving in hdfs
val conf = new Configuration()
val fileSystem = FileSystem.get(conf);
val out = fileSystem.create(new Path("/tmp/img1.png"));
//out: org.apache.hadoop.fs.FSDataOutputStream = org.apache.hadoop.hdfs.client.HdfsDataOutputStream@440f55ad
bufImages.foreach(x => ImageIO.write(x,"png", out))
上面的代码抛出以下错误
org.apache.spark.SparkException: Task not serializable
at org.apache.spark.util.ClosureCleaner$.ensureSerializable(ClosureCleaner.scala:403)
at org.apache.spark.util.ClosureCleaner$.org$apache$spark$util$ClosureCleaner$$clean(ClosureCleaner.scala:393)
at org.apache.spark.util.ClosureCleaner$.clean(ClosureCleaner.scala:162)
at org.apache.spark.SparkContext.clean(SparkContext.scala:2326)
at org.apache.spark.rdd.RDD$$anonfun$foreach$1.apply(RDD.scala:926)
at org.apache.spark.rdd.RDD$$anonfun$foreach$1.apply(RDD.scala:925)
at org.apache.spark.rdd.RDDOperationScope$.withScope(RDDOperationScope.scala:151)
at org.apache.spark.rdd.RDDOperationScope$.withScope(RDDOperationScope.scala:112)
at org.apache.spark.rdd.RDD.withScope(RDD.scala:363)
at org.apache.spark.rdd.RDD.foreach(RDD.scala:925)
... 49 elided
Caused by: java.io.NotSerializableException: org.apache.hadoop.hdfs.client.HdfsDataOutputStream
Serialization stack:
- object not serializable (class: org.apache.hadoop.hdfs.client.HdfsDataOutputStream, value: org.apache.hadoop.hdfs.client.HdfsDataOutputStream@440f55ad)
- field (class: $iw, name: out, type: class org.apache.hadoop.fs.FSDataOutputStream)
- object (class $iw, $iw@13c2b782)
- field (class: $iw, name: $iw, type: class $iw)
- object (class $iw, $iw@28aedf6e)
- field (class: $iw, name: $iw, type: class $iw)
- object (class $iw, $iw@14d0c3ff)
- field (class: $iw, name: $iw, type: class $iw)
- object (class $iw, $iw@48eb05e9)
- field (class: $iw, name: $iw, type: class $iw)
- object (class $iw, $iw@6b9ba1a6)
- field (class: $iw, name: $iw, type: class $iw)
- object (class $iw, $iw@53d519cb)
- field (class: $iw, name: $iw, type: class $iw)
- object (class $iw, $iw@45d7e92)
- field (class: $iw, name: $iw, type: class $iw)
- object (class $iw, $iw@79c1301b)
- field (class: $line49.$read, name: $iw, type: class $iw)
- object (class $line49.$read, $line49.$read@1a714d1)
- field (class: $iw, name: $line49$read, type: class $line49.$read)
- object (class $iw, $iw@79ef07b3)
- field (class: $iw, name: $outer, type: class $iw)
- object (class $iw, $iw@2dd246ff)
- field (class: $anonfun$1, name: $outer, type: class $iw)
- object (class $anonfun$1, <function1>)
at org.apache.spark.serializer.SerializationDebugger$.improveException(SerializationDebugger.scala:40)
at org.apache.spark.serializer.JavaSerializationStream.writeObject(JavaSerializer.scala:46)
at org.apache.spark.serializer.JavaSerializerInstance.serialize(JavaSerializer.scala:100)
at org.apache.spark.util.ClosureCleaner$.ensureSerializable(ClosureCleaner.scala:400)
... 58 more
请让我知道是否有任何具体的方法可以实现。
解决方案
rdd 上的 foreach 方法只需要可序列化的参数。因此,只需为ImageIO.write(x,"png", out)编写一个带有可序列化参数的包装器,我就能完成这项工作。
推荐阅读
- session - Webdriver 3.14 IE11:单击打开窗口/弹出窗口的链接/按钮时会话丢失
- model-view-controller - Yii 1.1:我可以有多个基本控制器吗?
- linux - 使用现有 Ansible 角色创建自定义 Docker 映像
- ballerina - JMS 消息内容为 XML
- spring - 在春季云数据流 kubernetes 中查找 pod 和查看工作日志的更简单方法
- node.js - MongoDB ReDOS 测试
- typescript - Angular 6数据绑定问题
- javascript - 如何根据数据值集显示/隐藏额外的 div?
- python - for循环中的浮点转换失败(python)
- python - 来自父类的类型提示