python - Pyspark 展平 RDD 错误:: 要解压的值太多
问题描述
我正在尝试展平 RDD 中的数据。RDD 被构造为一个 4 元组列表,其中第一个元素 - primary_id ,第二个元素 - 一个字典列表,第三个和第四个元素每个都包含一个包含字典的列表。
rdd= [('xxxxx99', [{'cov_id':'Q', 'cov_cd':'100','cov_amt':'100', 'cov_state':'AZ'},
{'cov_id':'Q', 'cov_cd':'33','cov_amt':'200', 'cov_state':'AZ'},
{'cov_id':'Q', 'cov_cd':'64','cov_amt':'10', 'cov_state':'AZ'}],
[{'pol_cat_id':'234','pol_dt':'20100220'}],
[{'qor_pol_id':'23492','qor_cd':'30'}]),
('xxxxx86', [{'cov_id':'R', 'cov_cd':'20','cov_amt':'100', 'cov_state':'TX'},
{'cov_id':'R', 'cov_cd':'44','cov_amt':'500', 'cov_state':'TX'},
{'cov_id':'R', 'cov_cd':'66','cov_amt':'50', 'cov_state':'TX'}],
[{'pol_cat_id':'532','pol_dt':'20091020'}],
[{'qor_pol_id':'49320','qor_cd':'21'}]) ]
我想展平数据,使其以格式显示
我将如何在 Pyspark 中做到这一点?
这是我尝试过的,但这给了我一个错误:Too many tuples to unpack
def flatten_map(record):
try:
yield(record)
# Unpack items
id, items, line, pls = record
pol_id = pls["pol_cat_id"]
pol_dt = pls["pol_dt"]
qor_id = pls["qor_pol_id"]
for item in items:
yield (id,item["cov_id"],item["cov_cd"], item["cov_amt"], item["cov_state"], pol_id, pol_dt, qor_id), 1
except Exception as e:
pass
result = (rdd
# Expand data
.flatMap(flatten_map)
# Flatten tuples
.map(lambda x: x[0], ))
如果需要,我可以发布完整的错误,但为了简洁起见,
ValueError: too many values to unpack (expected 2)
注意:转换为 pandas 不起作用,因为 RDD 太大
解决方案
IIUC,您可以运行 flatMap() 通过使用列表推导来遍历 4 项元组的第二项(1 个字符串 + 3 个列表),例如:
from pyspark.sql import Row
myrdd = sc.parallelize(rdd)
myrdd.flatMap(lambda x: [ ({'primary_id':x[0]}, z, x[2][0], x[3][0]) for z in x[1] ] ).collect()
#[({'primary_id': 'xxxxx99'},
# {'cov_id': 'Q', 'cov_cd': '100', 'cov_amt': '100', 'cov_state': 'AZ'},
# {'pol_cat_id': '234', 'pol_dt': '20100220'},
# {'qor_pol_id': '23492', 'qor_cd': '30'}),
# ......
简短说明:在 flatMap 函数的列表理解中,除了迭代第二项x[1]
(因为z
它是字典)之外,我还将第一个字符串项x[0]转换为具有一个条目的字典:并取x[{"primary_id":x[0]}
的第一项2]和x[3],它们都是字典。
因此在运行上面的 flatMap 函数后,RDD 元素变成了 4 个字典的元组,你接下来要做的就是将它们合并。下面是我将 4 字典的元组映射到 Row 对象的示例代码,您可能必须更改如何处理异常和缺失字段的逻辑以满足您自己的要求。
cols = ['primary_id', 'cov_id', 'cov_cd', 'cov_amt', 'cov_state', 'pol_cat_id', 'pol_dt', 'qor_pol_id', 'qor_cd']
def merge_dict(arr, cols):
row = {}
try:
for e in arr:
if type(e) is dict: row.update(e)
except:
pass
finally:
return Row(**dict({ c:row.get(c, None) for c in cols })) if row else None
myrdd.flatMap(lambda x: [ ({'primary_id':x[0]}, z, x[2][0], x[3][0]) for z in x[1] ] ) \
.map(lambda x: merge_dict(x, cols)) \
.filter(bool) \
.toDF() \
.show()
+-------+------+------+---------+----------+--------+----------+------+----------+
|cov_amt|cov_cd|cov_id|cov_state|pol_cat_id| pol_dt|primary_id|qor_cd|qor_pol_id|
+-------+------+------+---------+----------+--------+----------+------+----------+
| 100| 100| Q| AZ| 234|20100220| xxxxx99| 30| 23492|
| 200| 33| Q| AZ| 234|20100220| xxxxx99| 30| 23492|
| 10| 64| Q| AZ| 234|20100220| xxxxx99| 30| 23492|
| 100| 20| R| TX| 532|20091020| xxxxx86| 21| 49320|
| 500| 44| R| TX| 532|20091020| xxxxx86| 21| 49320|
| 50| 66| R| TX| 532|20091020| xxxxx86| 21| 49320|
+-------+------+------+---------+----------+--------+----------+------+----------+
顺便提一句。如果您想让您的原始功能正常工作,请检查以下 5 行,其中包含#<--
:
def flatten_map(record):
try:
#yield(record) #<-- comment this out, no need unprocessed data in output
# Unpack items
id, items, line, pls = record
pol_id = line[0]["pol_cat_id"] #<-- from line[0] not pls
pol_dt = line[0]["pol_dt"] #<-- from line[0] not pls
qor_id = pls[0]["qor_pol_id"] #<-- from pls[0] not pls
for item in items:
#<-- below line removed the ending ", 1", thus no need the last map() function to flatten tuples
yield (id,item["cov_id"],item["cov_cd"], item["cov_amt"], item["cov_state"], pol_id, pol_dt, qor_id)
except Exception as e:
pass
推荐阅读
- forms - 将已编译绑定与 Prism 一起使用
- wordpress - 如何在 WordPress 网站后台播放广播?
- angular - 从 CustomForm 组件获取验证器类型
- fortran - 如何修复(“monomds”未解决)素食主义者的错误。使用 metaMDS() 函数的问题
- floating-point - 浮点乘法的错误界限
- java - 如何获取 Web 应用程序的上下文实例。语境?
- python-3.x - if 和 else 语句的语法无效
- makefile - 在 Makefile 中递归添加库文件夹
- c++ - UE4中如何正确使用多个析构函数
- wordpress - 在 Yoast 生成的 JSON-LD 模式中更改作者 @id