scala - How to compare a Spark dataframe column with the values of another dataframe column
Problem description
I have two dataframes as below:
df1: has a few values as below. This is dynamic.
+--------------+
|tags |
+--------------+
|first_name |
|last_name |
|primary_email |
|other_email |
+--------------+
df2: the second dataframe has a few predefined combinations, as below:
+---------------------------------------------------------------------------------------------+
|combinations |
+---------------------------------------------------------------------------------------------+
|last_name, first_name, primary_email |
|last_name, first_name, other_email |
|last_name, primary_email, primary_phone |
|last_name, primary_email, secondary_phone |
|last_name, address_line1, address_line2,city_name, state_name,postal_code, country_code, guid|
+---------------------------------------------------------------------------------------------+
Expected result DF: Now I want to find every valid combination I can form from the tags in my dataframe. The result should contain all the rows of combinations whose elements all appear in df1.
resultDF:
+---------------------------------------------------------------------------------------------+
|combinations |
+---------------------------------------------------------------------------------------------+
|last_name, first_name, primary_email |
|last_name, first_name, other_email |
+---------------------------------------------------------------------------------------------+
I tried an approach where I convert both dataframes to lists and compare them, but I always get 0 combinations.
The Scala code I tried:
val combinationList = combinations.map(r => r.getString(0)).collect.toList
var combList: Seq[Seq[String]] = Seq.empty
for (comb <- combinationList) {
  var tmp: Seq[String] = Seq.empty
  tmp = tmp :+ comb
  combList = combList :+ tmp
}
val result = combList.filter(
  list => df1.filter(df1.col("tags").isin(list: _*)).count == list.size
)
println(result.size)
This always returns 0; the answer should be 2.
Can someone guide me to the best approach?
Solution
Try this: collect df1, then add a new array column to df2 containing df1's values. With Spark 2.4, use array_except to compare the two arrays; it returns the elements of the first array that are not in the second. Then filter for rows where the size of that difference is 0.
scala> val df1 = Seq(
| "first_name",
| "last_name",
| "primary_email",
| "other_email"
| ).toDF("tags")
df1: org.apache.spark.sql.DataFrame = [tags: string]
scala>
scala> val df2 = Seq(
| Seq("last_name", "first_name", "primary_email"),
| Seq("last_name", "first_name", "other_email"),
| Seq("last_name", "primary_email", "primary_phone"),
| Seq("last_name", "primary_email", "secondary_phone"),
| Seq("last_name", "address_line1", "address_line2", "city_name", "state_name", "postal_code", "country_code", "guid")
| ).toDF("combinations")
df2: org.apache.spark.sql.DataFrame = [combinations: array<string>]
scala>
scala> df1.show(false)
+-------------+
|tags |
+-------------+
|first_name |
|last_name |
|primary_email|
|other_email |
+-------------+
scala>
scala> df2.show(false)
+-------------------------------------------------------------------------------------------------+
|combinations |
+-------------------------------------------------------------------------------------------------+
|[last_name, first_name, primary_email] |
|[last_name, first_name, other_email] |
|[last_name, primary_email, primary_phone] |
|[last_name, primary_email, secondary_phone] |
|[last_name, address_line1, address_line2, city_name, state_name, postal_code, country_code, guid]|
+-------------------------------------------------------------------------------------------------+
scala>
scala> val df1tags = df1.collect.map(r => r.getString(0))
df1tags: Array[String] = Array(first_name, last_name, primary_email, other_email)
scala>
scala> val df3 = df2.withColumn("tags", lit(df1tags))
df3: org.apache.spark.sql.DataFrame = [combinations: array<string>, tags: array<string>]
scala> df3.show(false)
+-------------------------------------------------------------------------------------------------+---------------------------------------------------+
|combinations |tags |
+-------------------------------------------------------------------------------------------------+---------------------------------------------------+
|[last_name, first_name, primary_email] |[first_name, last_name, primary_email, other_email]|
|[last_name, first_name, other_email] |[first_name, last_name, primary_email, other_email]|
|[last_name, primary_email, primary_phone] |[first_name, last_name, primary_email, other_email]|
|[last_name, primary_email, secondary_phone] |[first_name, last_name, primary_email, other_email]|
|[last_name, address_line1, address_line2, city_name, state_name, postal_code, country_code, guid]|[first_name, last_name, primary_email, other_email]|
+-------------------------------------------------------------------------------------------------+---------------------------------------------------+
scala>
scala> val df4 = df3.withColumn("combMinusTags", array_except($"combinations", $"tags"))
df4: org.apache.spark.sql.DataFrame = [combinations: array<string>, tags: array<string> ... 1 more field]
scala> df4.show(false)
+-------------------------------------------------------------------------------------------------+---------------------------------------------------+--------------------------------------------------------------------------------------+
|combinations |tags |combMinusTags |
+-------------------------------------------------------------------------------------------------+---------------------------------------------------+--------------------------------------------------------------------------------------+
|[last_name, first_name, primary_email] |[first_name, last_name, primary_email, other_email]|[] |
|[last_name, first_name, other_email] |[first_name, last_name, primary_email, other_email]|[] |
|[last_name, primary_email, primary_phone] |[first_name, last_name, primary_email, other_email]|[primary_phone] |
|[last_name, primary_email, secondary_phone] |[first_name, last_name, primary_email, other_email]|[secondary_phone] |
|[last_name, address_line1, address_line2, city_name, state_name, postal_code, country_code, guid]|[first_name, last_name, primary_email, other_email]|[address_line1, address_line2, city_name, state_name, postal_code, country_code, guid]|
+-------------------------------------------------------------------------------------------------+---------------------------------------------------+--------------------------------------------------------------------------------------+
scala>
scala>
scala> df4.filter(size($"combMinusTags") === 0).show(false)
+--------------------------------------+---------------------------------------------------+-------------+
|combinations |tags |combMinusTags|
+--------------------------------------+---------------------------------------------------+-------------+
|[last_name, first_name, primary_email]|[first_name, last_name, primary_email, other_email]|[] |
|[last_name, first_name, other_email] |[first_name, last_name, primary_email, other_email]|[] |
+--------------------------------------+---------------------------------------------------+-------------+
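The steps above can also be condensed into a single filter. A minimal sketch, assuming Spark 2.4+, the same df1/df2 as in the transcript, and an active spark session in scope:

```scala
import org.apache.spark.sql.functions._
import spark.implicits._

// Collect the dynamic tag list once on the driver, then keep only the
// combinations whose elements are all contained in it.
val tags = df1.as[String].collect()
val resultDF = df2.filter(size(array_except($"combinations", lit(tags))) === 0)
resultDF.show(false)
```

This avoids materializing the intermediate tags and combMinusTags columns, at the cost of being harder to debug step by step.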
Spark 2.3
Write your own equivalent of array_except as a udf.
scala> def array_expt[T](a: Seq[T], b:Seq[T]):Seq[T] = {
| a.diff(b)
| }
array_expt: [T](a: Seq[T], b: Seq[T])Seq[T]
scala>
scala> val myUdf = udf { array_expt[String] _ }
myUdf: org.apache.spark.sql.expressions.UserDefinedFunction = UserDefinedFunction(<function2>,ArrayType(StringType,true),Some(List(ArrayType(StringType,true), ArrayType(StringType,true))))
scala>
scala> val df4 = df3.withColumn("combMinusTags", myUdf($"combinations", $"tags"))
df4: org.apache.spark.sql.DataFrame = [combinations: array<string>, tags: array<string> ... 1 more field]
scala> df4.show(false)
+-------------------------------------------------------------------------------------------------+---------------------------------------------------+--------------------------------------------------------------------------------------+
|combinations |tags |combMinusTags |
+-------------------------------------------------------------------------------------------------+---------------------------------------------------+--------------------------------------------------------------------------------------+
|[last_name, first_name, primary_email] |[first_name, last_name, primary_email, other_email]|[] |
|[last_name, first_name, other_email] |[first_name, last_name, primary_email, other_email]|[] |
|[last_name, primary_email, primary_phone] |[first_name, last_name, primary_email, other_email]|[primary_phone] |
|[last_name, primary_email, secondary_phone] |[first_name, last_name, primary_email, other_email]|[secondary_phone] |
|[last_name, address_line1, address_line2, city_name, state_name, postal_code, country_code, guid]|[first_name, last_name, primary_email, other_email]|[address_line1, address_line2, city_name, state_name, postal_code, country_code, guid]|
+-------------------------------------------------------------------------------------------------+---------------------------------------------------+--------------------------------------------------------------------------------------+
scala>
scala> df4.filter(size($"combMinusTags") === 0).show(false)
+--------------------------------------+---------------------------------------------------+-------------+
|combinations |tags |combMinusTags|
+--------------------------------------+---------------------------------------------------+-------------+
|[last_name, first_name, primary_email]|[first_name, last_name, primary_email, other_email]|[] |
|[last_name, first_name, other_email] |[first_name, last_name, primary_email, other_email]|[] |
+--------------------------------------+---------------------------------------------------+-------------+
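As for why the original list-based attempt always returned 0: each comma-separated combination string was appended to tmp whole, so every inner list held a single string like "last_name, first_name, primary_email", which can never equal an individual tag in df1. Splitting on the delimiter fixes the driver-side comparison. A plain-Scala sketch, with the collected dataframe contents inlined so it is self-contained:

```scala
// Collected contents of df2 and df1, inlined for illustration.
val combinationList = List(
  "last_name, first_name, primary_email",
  "last_name, first_name, other_email",
  "last_name, primary_email, primary_phone",
  "last_name, primary_email, secondary_phone"
)
val tags = Set("first_name", "last_name", "primary_email", "other_email")

// Split each combination on commas (with optional trailing spaces) and
// keep it only if every element is one of the available tags.
val result = combinationList.filter(_.split(",\\s*").forall(tags.contains))

println(result.size)  // 2
```

Note that this pulls everything to the driver, which is fine for a small list of combinations but does not scale the way the array_except solution does.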