apache-spark - pyspark 变更数据捕获实现
问题描述
我有一个基表,其中包含实际数据。下面是表结构
ID | 姓名 | 地址 | 年龄 | 日期 |
---|---|---|---|---|
A1 | {“fname”:“Alex”,“lname”:“Bhatt”} | {"lane": "Mac Street", "flat": ["24", "26", "27", "29"]} | 56 | 20201128 |
A2 | {“fname”:“鲍勃”,“lname”:“Natarajan”} | {“lane”:“Royd Street”,“flat”:[“22”,“23”,“27”],“pin”:“123514”} | 53 | 20201123 |
A1 | {“fname”:“Alex”,“lname”:“Bhattacharya”} | {"lane": "Mac Street", "flat": ["24", "26", "27", "29"]} | 56 | 20201228 |
A2 | {“fname”:“鲍勃”,“lname”:“Natarajan”} | {“lane”:“Royd Street”,“flat”:[“22”,“24”,“27”],“pin”:“123514”} | 53 | 20201228 |
上表中 A1 和 A2 的数据发生了变化。此更改的数据摘要由另一个表捕获并提供。表结构如下所述。
ID | 更改字段 | 日期 |
---|---|---|
A1 | 名称.lname | 20201228 |
A2 | 地址.flat[1] | 20201228 |
从上面的 2 个表中,我必须准备最终表,其中将捕获更改数据的详细信息。下面是预期的表格。
ID | 更改字段 | 新值 | 新日期 | 旧值 | 旧日期 |
---|---|---|---|---|---|
A1 | 名称.lname | 巴塔查里亚 | 20201228 | 巴特 | 20201128 |
A2 | 地址.flat[1] | 24 | 20201228 | 23 | 20201123 |
我曾尝试使用 spark sql 函数 get_json_object() 但它不起作用。任何建议都会非常有帮助
解决方案
我认为您需要创建另一个 json 列才能使用get_json_object
...请参阅下面的答案。
import pyspark.sql.functions as F
result = df1.select(
'id',
'date',
F.to_json(
F.struct(
F.from_json('name', 'fname string, lname string').alias('name'),
F.from_json('address', 'lane string, flat array<string>, pin string').alias('address')
)
).alias('jsoncol')
).join(
df2.withColumnRenamed('date', 'date2'), 'id'
).withColumn(
'new_value',
F.expr("get_json_object(jsoncol, '$.' || changed_field)")
).groupBy('id', 'changed_field').agg(
F.array_sort(
F.collect_list(
F.array('date', 'new_value')
)
).alias('values')
).select(
'id',
'changed_field',
F.col('values')[1][1].alias('new_value'),
F.col('values')[1][0].alias('newdate'),
F.col('values')[0][1].alias('old_value'),
F.col('values')[0][0].alias('olddate')
)
result.show(truncate=False)
+---+---------------+------------+--------+---------+--------+
|id |changed_field |new_value |newdate |old_value|olddate |
+---+---------------+------------+--------+---------+--------+
|A1 |name.lname |Bhattacharya|20201228|Bhatt |20201128|
|A2 |address.flat[1]|24 |20201228|23 |20201123|
+---+---------------+------------+--------+---------+--------+
推荐阅读
- aws-lambda - 调用 AWS Lambda 的代理背后的 WSO2 Api 网关
- c# - C# 在自身内部启动线程
- apache - 带有哈希“#”字符的 apache 2.4 proxypass url
- asp.net-core - 在浏览器窗口中将二进制字符串作为纯文本返回(而不是作为可下载文件)
- python - psycopg2:复制与执行获取
- powershell - 如何获取仅今天日期的多个文件的子项
- python - 根据具有不同行数的另一个数据框列向熊猫数据框添加新列
- javascript - 表格一次又一次地追加
- java - 将空数组字符串转换为数组
- c++ - 如何解决警告“忽略使用'warn_unused_result'属性声明的函数的返回值”