scala - Scala spark:从数据集映射操作创建数据集列表
问题描述
假设我想在转换另一个数据集后创建 2 种类型的 metric:metricA 或 metricB。如果满足某个条件,它将同时生成 metricA 和 B,如果不满足条件,则仅生成 metric A。想法是将 2 个 metric 写入 2 个不同的路径(pathA,pathB)。
我采用的方法是创建一个 GeneralMetric 的数据集,然后根据里面的内容,写入不同的路径,但显然它不起作用,因为数据集中的模式匹配不起作用
val s: SparkSession = SparkSession
.builder()
.appName("Metric")
.getOrCreate()
import s.implicits._
case class original (id : Int, units: List[Double])
case class MetricA (a: Int, b: Int, filtered_unit: List[Double])
case class MetricB (a: Int, filtered_unit: List[Double])
case class GeneralMetric(metricA: MetricA, metricB: Option[MetricB])
def createA: MetricA = {
MetricA(1, 1, List(1.0, 2.0)
}
def createB: MetricB = {
MetricB(1, List(10.0, 20.0)
}
def create (isBoth: Boolean): GeneralMetric = {
if(isBoth) {
val a: MetricA = createA()
val b: MetricB = createB()
GeneralMetric(a, Some(b))
}
else {
val a: MetricA = createA()
GeneralMetric(a, None)
}
}
val originalDF: DataFrame
val result : Dataset[GeneralMetric] =
originalDF.as[original]
.map { r =>
if(r.id == 21) create(true)
else create(false)
}
val pathA: String = "s3://pathA"
val pathB: String = "s3://pathB"
//below code obviously wouldn't work
result.map(x => {
case (metricA, Some(metricB)) => {
metricA.write.parquet(pathA)
metricB.write.parquet(pathB)
}
case (metricA, None) => metricA.write.parquet(pathA)
})
我想到的下一个方法是将结果放在 List[GeneralMetric] 中,其中 GeneralMetric 是一个密封的轨迹,由 MetricA 和 MetricB 扩展,但我怎样才能使数据集转换返回一个 GeneralMetric 列表。
任何想法都会有所帮助
解决方案
为什么不
result.map({
case (metricA, Some(metricB)) =>
metricA.write.parquet(pathA)
metricB.write.parquet(pathB)
case (metricA, None) => metricA.write.parquet(pathA)
})
在你的情况下工作?这只是语法问题吗?
另外:您似乎是独立发送指标(或至少在此示例中)。您可以将其建模为:
sealed trait Metric {
def write
}
case class MetricA (a: Int, b: Int, filtered_unit: List[Double]) extends Metric {
override def write: Unit = ???
}
case class MetricB (a: Int, filtered_unit: List[Double]) extends Metric {
override def write: Unit = ???
}
并打电话
implicit val enc: Encoder[Metric] = Encoders.kryo[Metric]
val result: Dataset[Metric] =
originalDF.as[original]
.flatMap { r =>
if (r.id == 21) createA :: createB :: Nil
else createA :: Nil
}
result.foreach(metric.write.parquet())
推荐阅读
- python - python tsne.transform 不存在?
- python - 找到所有的块
- c# - C# 媒体播放器(VLC 或 WMP)不播放
- python-2.7 - 没有Python预建函数的Netwon的方法:梯度和Hessian的计算
- python - 为什么使用 python 移动文件比直接在终端中移动文件要慢?
- html - 有没有办法使用引导程序将行中的 div 元素对齐到特定列中?
- python - Python Flask:TypeError:视图函数没有返回有效响应
- ffmpeg - FFMPEG 输入区域失败
- mysql - 两列记录在mysql中显示为null
- excel - Excel VBA 数据验证下拉菜单,当前突出显示的项目