pyspark - Pyspark RDD 消除值中的无?
问题描述
我是 pyspark RDD 的新手,并且有一个从 JSON 文件中获取的数据框:
行(created_at='2021-05-05 23:37:51', hash_tags=None, id=1390088382659895296, replyto_id=None, replyto_user_id=None, retweet_id=1390027514332991489, retweet_user_id=807095, text='RT @times: Breaking Newsny : 拜登政府将支持取消对 Covid-19 疫苗的专利保护,这是 glob 的一个突破……', user_id=17799542, user_mentions=[Row(id=807095, indices=[3, 11])])
这是我的所有代码:
spark = SparkSession \
.builder \
.appName("Python Spark SQL basic example") \
.config("spark.some.config.option", "some-value") \
.getOrCreate()
data_rdd = spark.read.option("multiline","true")\
.json("tweets.json")
print(data_rdd.collect()[0])
def extractColumns(record):
return (record[8],[record[4], record[6]])
ddata_frame = data_rdd.rdd.map(extractColumns)\
.groupByKey()\
.map(lambda r: (r[0], list(r[1])))
我以如下形式获得了RDD数据:[(17799542, [[None, 807095]]),
...
(3094649957,[[无,3094649957],[无,无],[无,3094649957],[无,无],[无,3094649957],[无,无]])]
如何消除 None 以实现以下值:[(17799542,[807095]),
...
(3094649957, [3094649957, 3094649957, 3094649957])]
我在下面尝试过但没有工作:
def eliminateNone(record):
s = list(filter(lambda s: each != None for each in s))
return (record[0], s)
data_frame.mapValues(eliminateNone)
print(data_frame.collect())
我很感激任何帮助。
解决方案
spark = SparkSession \
.builder \
.appName("Python Spark SQL basic example") \
.config("spark.some.config.option", "some-value") \
.getOrCreate()
data_rdd = spark.read.option("multiline","true")\
.json("tweets.json")
print(data_rdd.collect()[0])
def extractColumns(record):
return (record[8],[record[3], record[5]])
def merge_values(data):
result = []
for l in data:
for x in l:
if x != None:
result.append(x)
return result
data_frame = data_rdd.rdd.map(extractColumns)\
.groupByKey()\
.map(lambda r: (r[0], list(r[1])))
data_frame = data_frame.mapValues(merge_values)
print(data_frame.collect())
你可以试试这个。
推荐阅读
- matlab - MATLAB:使用逻辑索引将矩阵中的值替换为其值除以 2?
- php - 在 Woocommerce 3 中检查最近未完成订单的状态
- c# - 时间:2019-04-10 标签:c#sort HTTP request headers
- c++ - 使用睡眠控制循环时间
- python - 使用 keras.utils.Sequence 时,keras predict_generator 正在改组其输出
- python - MLP 无法学习线性方程组
- amazon-dynamodb - 在 DynamoDB 中存储不同分辨率的时间序列数据
- r - 使用 blogdown 渲染单个 Rmd 文件的最佳方法
- python - Django 2.0 访问模型 (CREATE/REMOVE/FILTER) 独立 [无 manage.py shell]
- sql-server - 如何在不使用 cmdshel 的情况下将 SQL 查询的结果存储到特定文件夹中的 XML 文件