apache-spark - Spark drop 复制源代码
问题描述
我正在研究 Spark 源代码以了解dropDuplicates
方法是如何工作的。在方法定义中有一个方法Deduplicate
调用。但我找不到它的定义或参考。如果有人能指出我正确的方向,那就太好了。链接在这里。
解决方案
它在火花催化剂中,请参见此处。
由于实现有点混乱,我将添加一些解释。
目前的实现Deduplicate
是:
/** A logical plan for `dropDuplicates`. */
case class Deduplicate(
keys: Seq[Attribute],
child: LogicalPlan) extends UnaryNode {
override def output: Seq[Attribute] = child.output
}
目前尚不清楚这里发生了什么,但如果你看一下Optimizer
课堂,你会看到ReplaceDeduplicateWithAggregate
对象,然后它会变得更加清晰。
/**
* Replaces logical [[Deduplicate]] operator with an [[Aggregate]] operator.
*/
object ReplaceDeduplicateWithAggregate extends Rule[LogicalPlan] {
def apply(plan: LogicalPlan): LogicalPlan = plan transform {
case Deduplicate(keys, child) if !child.isStreaming =>
val keyExprIds = keys.map(_.exprId)
val aggCols = child.output.map { attr =>
if (keyExprIds.contains(attr.exprId)) {
attr
} else {
Alias(new First(attr).toAggregateExpression(), attr.name)(attr.exprId)
}
}
// SPARK-22951: Physical aggregate operators distinguishes global aggregation and grouping
// aggregations by checking the number of grouping keys. The key difference here is that a
// global aggregation always returns at least one row even if there are no input rows. Here
// we append a literal when the grouping key list is empty so that the result aggregate
// operator is properly treated as a grouping aggregation.
val nonemptyKeys = if (keys.isEmpty) Literal(1) :: Nil else keys
Aggregate(nonemptyKeys, aggCols, child)
}
}
底线,用于df
带列col1, col2, col3, col4
df.dropDuplicates("col1", "col2")
或多或少
df.groupBy("col1", "col2").agg(first("col3"), first("col4"))
推荐阅读
- scala - Apache Kafka - 如何等待订阅完成
- reactjs - ReactReduxContextValue 尝试访问调度类型错误
- reactjs - 在 react-native (fetch, axios 等) 中进行一个 API 调用的最佳方法?
- android - 如何在 Kotlin 中接收和解析 HTTP Json 文件
- python - 使用 matplotlib 在日志图中设置 yaxis 标签
- angular - 拖放表中的行而不交换
- angular - 某些打字稿库中使用的“@”前缀是什么意思
- kubernetes-helm - “helm list --all”的输出为空
- excel - 返回带有标题的已过滤 Excel 表格
- android-service - Android服务需要定期上网