scala - 如何更新自定义 Spark 数据源中的“写入字节数”?
问题描述
我创建了一个 Spark 数据源,它使用“较旧”的 DataSource V1 API 以我们的测量设备和某些软件需要的特定二进制格式写入数据,即 my DefaultSource
extends CreatableRelationProvider
。
在适当的createRelation
方法中,我调用自己的自定义方法从DataFrame
传入的数据中写入数据。我在 Hadoop 的 FileSystem API 的帮助下执行此操作,从 Hadoop 初始化,Configuration
可以从提供的数据中提取DataFrame
:
def createRelation(sqlContext: SQLContext,
mode : SaveMode,
parameters: Map[String, String],
data : DataFrame): BaseRelation = {
val path = ... // get from parameters; in real here is more preparation code, checking save mode etc.
MyCustomWriter.write(data, path)
EchoingRelation(data) // small class that just wraps the data frame into a BaseRelation with TableScan
}
在MyCustomWriter
我然后做各种各样的事情,最后,我将数据作为副作用保存到map
,mapPartitions
并foreachPartition
调用集群的执行程序,如下所示:
val confBytes = conf.toByteArray // implicit I wrote turning Hadoop Writables to Byte Array, as Configuration isn't serializable
data.
select(...).
where(...).
// much more
as[Foo].
mapPartitions { it =>
val conf = confBytes.toWritable[Configuration] // vice-versa like toByteArray
val writeResult = customWriteRecords(it, conf) // writes data to the disk using Hadoop FS API
writeResult.iterator
}.
// do more stuff
虽然这种方法运行良好,但我注意到在运行此方法时,Output
Spark 作业 UI 中的列未更新。是否可以以某种方式传播此信息,或者我是否必须将数据包装在Writable
s 中并使用 HadoopFileOutputFormat
方法?
解决方案
我发现了一个 hacky 方法。
在 RDD/DF 操作中,您可以获得OutputMetrics
:
val metrics = TaskContext.get().taskMetrics().outputMetrics
这具有字段bytesWritten
和recordsWritten
. 但是,setter 对于org.apache.spark.executor
. 所以,我在包中创建了一个“突破对象”:
package org.apache.spark.executor
object OutputMetricsBreakout {
def setRecordsWritten(outputMetrics: OutputMetrics,
recordsWritten: Long): Unit =
outputMetrics.setRecordsWritten(recordsWritten)
def setBytesWritten(outputMetrics: OutputMetrics,
bytesWritten: Long): Unit =
outputMetrics.setBytesWritten(bytesWritten)
}
然后我可以使用:
val myBytesWritten = ... // calculate written bytes
OutputMetricsBreakout.setBytesWritten(metrics, myBytesWritten + metrics.bytesWritten)
这是一种技巧,但我能想出的唯一“简单”方法。
推荐阅读
- angularjs - ngRoute 中的 AngularJs(1.x) 路由问题?
- sublime-text-plugin - Sublime tmPreference 文件:如何拥有多个范围?
- react-native - 后台通知数据获取
- java - 自定义视图 edittext 调用了 setontouchlistener 但不覆盖 performclick
- java - 如何使用 Spring Boot Data JPA 在一对多映射的子实体中设置 parentId
- mysql - 确保 cron 作业不会两次执行相同的作业
- scala - akka 流 CPU 使用率监控
- dart - immutable StatefulWidget 有什么用,和 Flutter 中的 State 一样,但是可以只做一个 mutable StatefulWidget 没有 state
- javascript - 如何更新 jinja 模板中的全局变量?
- c# - 通过 C# 导出到 CSV 文件时将特定的前导零数字列转换为字符串