apache-spark - 使用数据框列值作为输入来选择表达式
问题描述
我有一系列用于将原始 JSON 数据映射到规范化列数据的表达式。我正在尝试想一种方法来有效地将其应用于每一行,因为要考虑多种模式。
现在,我有一个巨大的 CASE 语句(动态构建),它被解释为 SQL,如下所示:
SELECT
CASE
WHEN schema = 'A' THEN CONCAT(get_json_object(payload, '$.FirstName'), ' ', get_json_object(payload, '$.LastName'))
WHEN schema = 'B' THEN get_json_object(payload, '$.Name')
END as name,
CASE
WHEN schema = 'A' THEN get_json_object(payload, '$.Telephone')
WHEN schema = 'B' THEN get_json_object(payload, '$.PhoneNumber')
END as phone_number
这行得通,我只是担心模式和列数量增加时的性能。我想看看是否有另一种方法,这是我的想法。
我有一个expressions_df
有效 SparkSQL 表达式的 DataFrame。
图式 | 柱子 | 列表达式 |
---|---|---|
一个 | 姓名 | CONCAT(get_json_object(payload, '$.FirstName'), '', get_json_object(payload, '$.LastName')) |
一个 | 电话号码 | get_json_object(有效负载,'$.Telephone') |
乙 | 姓名 | get_json_object(有效负载,'$.Name') |
乙 | 电话号码 | get_json_object(有效负载,'$.PhoneNumber') |
此 DataFrame 用作针对 DataFrame 的各种查找表raw_df
:
图式 | 有效载荷 |
---|---|
一个 | {“名”:“约翰”,“姓”:“Doe”,“电话”:“123-456-7890”} |
乙 | {“姓名”:“Jane Doe”,“电话号码”:“123-567-1234”} |
我想做这样的事情,其中column_expression
传递F.expr
并用于解释 SQL 并返回适当的值。
from pyspark.sql import functions as F
(
raw_df
.join(expressions_df, 'schema')
.select(
F.expr(column_expression)
)
.dropDuplicates()
)
所需的最终结果将是这样的,因此无论原始模式是什么,都使用 SQL 或expressions_df
.
| name | phone_number |
| -------- | ------------ |
| John Doe | 123-456-7890 |
| Jane Doe | 123-567-1234 |
解决方案
您不能直接将 DataFrame 列值用作带有expr
函数的表达式。您必须将所有表达式收集到一个 python 对象中,以便能够将它们作为参数传递给expr
.
这是一种方法,将表达式收集到一个字典中,然后对于每个模式,我们应用不同的选择表达式。最后,合并所有数据帧以获得所需的输出:
from collections import defaultdict
from functools import reduce
import pyspark.sql.functions as F
exprs = defaultdict(list)
for r in expressions_df.collect():
exprs[r.schema].append(F.expr(r.column_expression).alias(r.column))
schemas = [r.schema for r in raw_df.select("schema").distinct().collect()]
final_df = reduce(DataFrame.union, [raw_df.filter(f"schema='{s}'").select(*exprs[s]) for s in schemas])
final_df.show()
#+--------+------------+
#| name|phone_number|
#+--------+------------+
#|Jane Doe|123-567-1234|
#|John Doe|123-456-7890|
#+--------+------------+
推荐阅读
- python - 如何使用 findall 将文件拆分为多个文件
- c++ - 将项目从 Debian 8 Xenomai 2.x 迁移到 Debain 9 Xenomai 3.x
- symfony - 我在录制 Php bin/console make:entity 时出错
- notepad++ - Notepad ++正则表达式:在最后一次出现单词'$apple'之后插入带有文本'found'的新行
- r - packrat 无法检索 Bioconductor 包裹
- csv - 使用 apache-nifi 从 csv 中的 json 对象获取数据
- javascript - if-else change bg color depending what is the letter in the div
- python - 如何解决“名称'imdb'未定义”错误
- xamarin - 使用设备锁的应用程序锁
- python-3.x - 如何在多元时间序列数据中使用特征缩放?