首页 > 解决方案 > 如何更新自定义 Spark 数据源中的“写入字节数”?

问题描述

我创建了一个 Spark 数据源,它使用“较旧”的 DataSource V1 API 以我们的测量设备和某些软件需要的特定二进制格式写入数据,即 my DefaultSourceextends CreatableRelationProvider

在适当的createRelation方法中,我调用自己的自定义方法从DataFrame传入的数据中写入数据。我在 Hadoop 的 FileSystem API 的帮助下执行此操作,从 Hadoop 初始化,Configuration可以从提供的数据中提取DataFrame

  def createRelation(sqlContext: SQLContext,
                     mode      : SaveMode,
                     parameters: Map[String, String],
                     data      : DataFrame): BaseRelation = {
    val path = ... // get from parameters; in real here is more preparation code, checking save mode etc.
    MyCustomWriter.write(data, path)
    EchoingRelation(data) // small class that just wraps the data frame into a BaseRelation with TableScan
  }

MyCustomWriter我然后做各种各样的事情,最后,我将数据作为副作用保存到mapmapPartitionsforeachPartition调用集群的执行程序,如下所示:

val confBytes = conf.toByteArray // implicit I wrote turning Hadoop Writables to Byte Array, as Configuration isn't serializable

data.
  select(...).
  where(...).
  // much more
  as[Foo].
  mapPartitions { it =>
    val conf = confBytes.toWritable[Configuration] // vice-versa like toByteArray
    val writeResult = customWriteRecords(it, conf) // writes data to the disk using Hadoop FS API
    writeResult.iterator
  }.
  // do more stuff

虽然这种方法运行良好,但我注意到在运行此方法时,OutputSpark 作业 UI 中的列未更新。是否可以以某种方式传播此信息,或者我是否必须将数据包装在Writables 中并使用 HadoopFileOutputFormat方法?

标签: scalaapache-spark

解决方案


我发现了一个 hacky 方法。

在 RDD/DF 操作中,您可以获得OutputMetrics

val metrics = TaskContext.get().taskMetrics().outputMetrics

这具有字段bytesWrittenrecordsWritten. 但是,setter 对于org.apache.spark.executor. 所以,我在包中创建了一个“突破对象”:

package org.apache.spark.executor

object OutputMetricsBreakout {

  def setRecordsWritten(outputMetrics: OutputMetrics,
                        recordsWritten: Long): Unit =
    outputMetrics.setRecordsWritten(recordsWritten)

  def setBytesWritten(outputMetrics: OutputMetrics,
                      bytesWritten: Long): Unit =
    outputMetrics.setBytesWritten(bytesWritten)
}

然后我可以使用:

val myBytesWritten = ... // calculate written bytes

OutputMetricsBreakout.setBytesWritten(metrics, myBytesWritten + metrics.bytesWritten)

这是一种技巧,但我能想出的唯一“简单”方法。


推荐阅读