首页 > 解决方案 > Pyspark:json对象中的rdd

问题描述

这是家庭作业

hw2-files-10mb.txt 是 json 数据

rdd = sc.textFile('./hw2-files-10mb.txt')
rdd = rdd.map(json.loads)

Output of rdd.take(1):

[{'created_at': 'Tue Feb 23 17:42:31 +0000 2016',
  'user': {'id': 470520068,
    'id_str': '470520068',
    'name': 'Marni Halasa',
    'screen_name': 'MarniHalasa1',
    'location': 'NYC',
....]

问题:

1)如何以rdd方式在'user'中选择'id_str'?

我尝试了将 rdd 映射到返回 field_list = ['user.id_str'] 的函数的方法,但它不起作用。

field_list = ['user.id_str', 'text']
def f(x):
d = {}
for k in x:
    if k in field_list:
        d[k] = x[k]
return d

rdd1 = rdd.map(f)

Output:
[{'text': "I'm voting 4 #BernieSanders bc he doesn't ride a CAPITALIST PIG adorned w/ #GoldmanSachs $. SYSTEM RIGGED CLASS WAR"}]

2) 如何删除不存在“created_at”字段的推文?

我下面的代码不起作用。当我使用 rdd.count() 时它返回错误

rdd = rdd.filter(lambda row: row['created_at'] is not None)

我的预期输出:

1)删除所有损坏的推文('created_at'字段为空)

2) (user_id, text) 的一对 RDD,其中 user_id 是用户字典的 'id_str' 数据字段。

谢谢。

标签: pythonpyspark

解决方案


先解决2)点,如提取'user.id_str'和后'text',就没有created_at剩下要过滤的字段了。

2)只需检查是否'created_at'存在。此外,您可以一次完成所有操作。

rdd = sc.textFile('./hw2-files-10mb.txt')\
            .map(json.loads)\
            .filter(lambda row: 'created_at' in row.keys())

1)您不需要函数来从 json 对象中提取值。

rdd = rdd.map(lambda row:(row['user']['id_str'], 'text'))
print rdd.take(1)

推荐阅读