scala - 如何在while中使用withColumn Spark Dataframe scala
问题描述
这是我的函数应用规则,col mdp_codcat,mdp_idregl, usedRef 根据数组 bRef 中的数据变化。
def withMdpCodcat(bRef: Broadcast[Array[RefRglSDC]])(dataFrame: DataFrame):DataFrame ={var matchRule = false
var i = 0
while (i < bRef.value.size && !matchRule) {
if ((bRef.value(i).sensop.isEmpty || bRef.value(i).sensop.equals(col("signe")))
&& (bRef.value(i).cdopcz.isEmpty || Lib.matchCdopcz(strTail(col("cdopcz")).toString(), bRef.value(i).cdopcz))
&& (bRef.value(i).libope.isEmpty || Lib.matchRule(col("lib_ope").toString(), bRef.value(i).libope))
&& (bRef.value(i).qualib.isEmpty || Lib.matchRule(col("qualif_lib_ope").toString(), bRef.value(i).qualib))) {
matchRule = true
dataFrame.withColumn("mdp_codcat", lit(bRef.value(i).codcat))
dataFrame.withColumn("mdp_idregl", lit(bRef.value(i).idregl))
dataFrame.withColumn("usedRef", lit("SDC"))
}else{
dataFrame.withColumn("mdp_codcat", lit("NOT_CATEGORIZED"))
dataFrame.withColumn("mdp_idregl", lit("-1"))
dataFrame.withColumn("usedRef", lit(""))
}
i += 1
}
dataFrame
}
dataFrame : "cdenjp", "cdguic", "numcpt", "mdp_codcat", "mdp_idregl" , mdp_codcat","mdp_idregl","usedRef" if match add mdp_idregl, mdp_idregl,mdp_idregl with value bRef
示例 - 我的数据框:
val DF = Seq(("tt", "aa","bb"),("tt1", "aa1","bb2"),("tt1", "aa1","bb2")).toDF("t","a","b)
+---+---+---+---+
| t| a| b| c|
+---+---+---+---+
| tt| aa| bb| cc|
|tt1|aa1|bb2|cc3|
+---+---+---+---+
文件.文本内容:
,aa,bb,cc
,aa1,bb2,cc3
tt4,aa4,bb4,cc4
tt1,aa1,,cc6
case class TOTO(a: String, b:String, c: String, d:String)
val text = sc.textFile("file:///home/X176616/file")
val bRef= textFromCsv.map(row => row.split(",", -1))
.map(c => TOTO(c(0), c(1), c(2), c(3))).collect().sortBy(_.a)
def withMdpCodcat(bRef: Broadcast[Array[RefRglSDC]])(dataFrame: DataFrame):DataFrame
dataframe.withColumn("mdp_codcat_new", "NOT_FOUND") //first init not found, change if while if match
var matchRule = false
var i = 0
while (i < bRef.value.size && !matchRule) {
if ((bRef.value(i).a.isEmpty || bRef.value(i).a.equals(signe))
&& (bRef.value(i).b.isEmpty || Lib.matchCdopcz(col(b), bRef.value(i).b))
&& (bRef.value(i).c.isEmpty || Lib.matchRule(col(c), bRef.value(i).c))
)) {
matchRule = true
dataframe.withColumn("mdp_codcat_new", bRef.value(i).d)
dataframe.withColumn("mdp_mdp_idregl_new" = bRef.value(i).e
}
i += 1
}
最后 df 如果条件为真
bRef.value(i).a.isEmpty || bRef.value(i).a.equals(signe))
&& (bRef.value(i).b.isEmpty || Lib.matchCdopcz(b.substring(1).toInt.toString, bRef.value(i).b))
&& (bRef.value(i).c.isEmpty || Lib.matchRule(c, bRef.value(i).c)
+---+---+---+---+-----------+----------+
| t| a| b| c|mdp_codcat |mdp_idregl|
+---+---+---+---+-----------|----------+
| tt| aa| bb| cc|cc | other |
| ab|aa1|bb2|cc3|cc4 | toto | from bRef if true in while
| cd|aa1|bb2|cc3|cc4 | titi |
| b|a1 |b2 |c3 |NO_FOUND |NO_FOUND | (not_found if conditional false)
+---+---+---+---+----------------------+
+---+---+---+---+----------------------+
解决方案
您不能根据运行时值创建数据框架构。我会尝试做的更简单。首先,我将使用默认值创建三列:
dataFrame.withColumn("mdp_codcat", lit(""))
dataFrame.withColumn("mdp_idregl", lit(""))
dataFrame.withColumn("usedRef", lit(""))
然后您可以使用带有广播值的 udf:
def mdp_codcat(bRef: Broadcast[Array[RefRglSDC]]) = udf { (field: String) =>
{
// Your while and if stuff
// return your update data
}}
并将每个 udf 应用于每个字段:
dataframe.withColumn("mdp_codcat_new", mdp_codcat(bRef)("mdp_codcat"))
也许它可以帮助
推荐阅读
- firebase - 网络上的 Firebase 身份验证内部错误“注册被阻止”
- javascript - 谷歌图 - 在 x 和 y 标签上拉伸图
- mysql - 用于简单多选的 mySQL 锁定
- terraform - 如何在不破坏和重新创建的情况下将资源移动到新的 tf 文件和状态?
- c - #define 具有多个值 - C
- macos - ESP-IDF 没有规则来制作目标`/Applications/Xcode.app/Contents/Developer/Toolchains/XcodeDefault.xctoolchain/usr/lib/clang/9.0.0/include/limits.h'
- scala - 使用 Spark Structured Streaming 时限制 kafka 批量大小
- c# - 将 dataGridViewRows 列表添加到 DataGridView
- linux - “意外标记 `)' 附近的语法错误”?
- powershell - 如何在 Start-Job 中通过引用传递变量?