pyspark - 如何从另一个 df 值中补充一个 df
问题描述
我有 2 个数据帧,所以一个 df 具有具有良好格式的唯一值,而另一个 df 具有错误值的值,那么我如何才能用相对于另一个数据帧的错误值来补充 df?
示例:具有正确和唯一值的 df
+----------------------------------------+--------------+
|company_id |company_name |
+----------------------------------------+--------------+
|8f642dc67fccf861548dfe1c761ce22f795e91f0|Muebles |
|cbf1c8b09cd5b549416d49d220a40cbd317f952e|MiPasajefy |
+----------------------------------------+--------------+
具有错误值的示例 df:
+----------------------------------------+------------+
|company_id |company_name|
+----------------------------------------+------------+
|******* |MiPasajefy |
|cbf1c8b09cd5b549416d49d220a40cbd317f952e|NaN |
|NaN |MiPasajefy |
+----------------------------------------+------------+
列:company_id 和 company_name 是关键列,因此具有更正值的错误 df 必须是:
+----------------------------------------+------------+
|company_id |company_name|
+----------------------------------------+------------+
|cbf1c8b09cd5b549416d49d220a40cbd317f952e|MiPasajefy |
|cbf1c8b09cd5b549416d49d220a40cbd317f952e|MiPasajefy |
|cbf1c8b09cd5b549416d49d220a40cbd317f952e|MiPasajefy |
+----------------------------------------+------------+
解决方案
from datetime import datetime
from pyspark.sql import *
from collections import *
from pyspark.sql.functions import udf,explode
from pyspark.sql.types import StringType
table_schema = StructType([StructField('key1', StringType(), True),
StructField('key2', IntegerType(), True),
StructField('list1', ArrayType(StringType()), False),
StructField('list2', ArrayType(StringType()), False),
StructField('list3', ArrayType(IntegerType()), False),
StructField('list4', StringType(), False),
StructField('list5', ArrayType(FloatType()), False),
StructField('list6', ArrayType(StringType()), False)
])
df= spark.createDataFrame(
[
("8f642dc67fccf861548dfe1c761ce22f795e91f0","Muebles"),
("cbf1c8b09cd5b549416d49d220a40cbd317f952e","MiPasajefy")
],("company_id","company_name")
)
df2= spark.createDataFrame(
[
( "*****" ,"MiPasajefy" ),
("cbf1c8b09cd5b549416d49d220a40cbd317f952e","NaN" ),
("NaN","MiPasajefy")
],("company_id","company_name")
)
df.createOrReplaceTempView("A")
df2.createOrReplaceTempView("B")
spark.sql("select a.Company_name,a.company_id from B b left join A a on (a.company_id=b.company_id or a.Company_name=b.Company_name )").show(truncate=False)
+------------+----------------------------------------+
|Company_name|company_id |
+------------+----------------------------------------+
|MiPasajefy |cbf1c8b09cd5b549416d49d220a40cbd317f952e|
|MiPasajefy |cbf1c8b09cd5b549416d49d220a40cbd317f952e|
|MiPasajefy |cbf1c8b09cd5b549416d49d220a40cbd317f952e|
+------------+----------------------------------------+
推荐阅读
- dart - 你如何在 Dart 中打印警告
- java - 外部化 spring 配置和日志
- laravel-8 - 将 laravel 8 项目推送到 GitHub 仓库后,点击登录或注册,提示未找到
- python - 如何在熊猫数据框中按顺序获取最高、次高等数字的索引和列?
- java - 不在子查询中更改为 HQL 中的左外连接
- amazon-s3 - 将 Pyspark Dataframe 作为 Parquet 写入 Databricks 上的 S3 只是挂在中间
- c# - 创建一个允许 button.Enable 更改的方法,除非所有字段都是 NotEmptyOrNull Windows Application Form C#
- javascript - reactJs中的Javascirpt数组排序问题
- jquery - 根据选择的单选按钮设置列表项颜色 jquery
- json - 比较和映射 JOLT 中两个对象数组之间的值