Filtering JSON array data in a Spark DataFrame

Problem description

I have a Spark DataFrame that I am converting to JSON format:

json = df.toJSON().collect()
print(json)

['{"lot_number":"4f19-9deb-0ef861c1a6a1","recipients":[{"account":"45678765457876545678","code":"user1","status":"pending"},{"account":"12354567897545678","code":"error2","status":"pending"}]}', 

'{"lot_number":"09ad-451e-8fb1-50bc185ef02f","recipients":[{"account":"4567654567876545678","code":"user3","status":"pending"},{"account":"12354567876545678","code":"user2","status":"pending"}]}']

I need to filter the data inside the array, i.e. keep only the recipients whose code is "user1".

I expect this result:

['{"lot_number":"4f19-9deb-0ef861c1a6a1","recipients":[{"account":"45678765457876545678","code":"user1","status":"pending"}]}']

Can anyone help me filter the data as shown above?

Tags: arrays, pyspark

Solution


First, you need to convert each JSON string into a dict, which gives you a list of dict objects.

import json

json_rdd = df.toJSON().collect()
json_ls = [json.loads(x) for x in json_rdd]
# Now you can filter using "user1"
final_json_ls = [x for x in json_ls if x.get("recipients")[0].get("code") == "user1"]
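
The list comprehension above only inspects the first recipient of each lot, so a lot where "user1" appears later in the array would be skipped. Below is a minimal sketch of a variant that checks every recipient. It uses the two sample rows from the question in place of df.toJSON().collect(). The helper name has_recipient is hypothetical and not part of the original answer.

```python
import json

# Sample rows copied from the question (what df.toJSON().collect() returned).
json_rdd = [
    '{"lot_number":"4f19-9deb-0ef861c1a6a1","recipients":[{"account":"45678765457876545678","code":"user1","status":"pending"},{"account":"12354567897545678","code":"error2","status":"pending"}]}',
    '{"lot_number":"09ad-451e-8fb1-50bc185ef02f","recipients":[{"account":"4567654567876545678","code":"user3","status":"pending"},{"account":"12354567876545678","code":"user2","status":"pending"}]}',
]
json_ls = [json.loads(x) for x in json_rdd]


def has_recipient(lot, code):
    """Return True if any recipient of the lot carries the given code."""
    # "or []" guards against a missing or null recipients field.
    return any(rec.get("code") == code for rec in lot.get("recipients") or [])


# Keeps whole lots (with all their recipients) that mention "user1" anywhere.
lots_with_user1 = [lot for lot in json_ls if has_recipient(lot, "user1")]
print([lot["lot_number"] for lot in lots_with_user1])
```

Note that this still returns each matching lot with its full recipients array. Trimming the array itself needs a per-recipient filter.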

If a lot has multiple recipients and you only want to keep the matching ones:

new_list = list()
for lot in json_ls:
    recs = lot.get('recipients')
    lot_recipients = [rec for rec in recs if rec.get("code") == "user1"]
    if lot_recipients:
        new_list.append({"lot_number": lot.get('lot_number'),
                         "recipients": lot_recipients})

# OUTPUT
# [{'lot_number': u'4f19-9deb-0ef861c1a6a1', 'recipients': [{u'status': u'pending', u'account': u'45678765457876545678', u'code': u'user1'}]}]
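
The question expects a list of JSON strings rather than dicts. The following sketch wraps the loop above in a function and serializes the result back with json.dumps. The function name filter_recipients and the variable names are assumptions made for illustration, and the sample input is the first row from the question.

```python
import json


def filter_recipients(lots, code):
    """Keep only recipients with the given code; drop lots left with none."""
    result = []
    for lot in lots:
        kept = [r for r in lot.get("recipients") or [] if r.get("code") == code]
        if kept:
            result.append({"lot_number": lot.get("lot_number"), "recipients": kept})
    return result


# One sample lot from the question, already parsed into a dict.
sample = [json.loads(
    '{"lot_number":"4f19-9deb-0ef861c1a6a1","recipients":['
    '{"account":"45678765457876545678","code":"user1","status":"pending"},'
    '{"account":"12354567897545678","code":"error2","status":"pending"}]}'
)]

filtered = filter_recipients(sample, "user1")
# separators=(",", ":") drops the spaces json.dumps adds by default,
# matching the compact style of the strings shown in the question.
as_strings = [json.dumps(lot, separators=(",", ":")) for lot in filtered]
print(as_strings)
```

Returning a new list instead of mutating the parsed lots keeps the original data available if you need to filter on a different code later.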

And since you want to convert it back to JSON to send a POST request:

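import requests

headers = {"Content-Type": "application/json"}  # assumed; not defined in the original
for item in new_list:
    lot = item.get("lot_number")
    # Placeholder URL; requests needs a scheme such as https:// in front
    url = "test.com/api/v1/notify/request/" + lot
    # Serialize the dict back to JSON; use requests.post if the endpoint expects POST
    response = requests.put(url, data=json.dumps(item), headers=headers)
    print(response.text)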
