pyspark - 如何在 RDD 上运行过滤器功能并一次性使用不同的数据帧模式创建输出
问题描述
我正在阅读带有火花的专有二进制格式(rosbags),需要进行一些反序列化。完成后,我将获得具有固定数量的不同模式的数据。我想编写输出文件,每个不同的模式都有一个。
通过按类型过滤和反序列化,我设法为每个模式创建了一个数据帧,但是所有原始数据都被重复读取。
下面的示例演示了json.loads()
用于输入和collect()
输出的问题。
import json
from pyspark import SparkContext, SparkConf
from pyspark.sql import SparkSession
from pyspark.sql.types import *
def transform(data):
print("transforming", data)
return json.loads(data[1])
def filter_by_type(data, type_):
print("filtering %s == %s" % (data[0], type_))
return data[0] == type_
spark = SparkSession.builder.getOrCreate()
sc = spark.sparkContext
sc.setLogLevel("ERROR")
dd = sc.parallelize([
['type1', '"string1"'],
['type2', '2'],
])
print(spark.createDataFrame(dd.filter(lambda x: filter_by_type(x, "type1")).map(transform), StringType()).collect())
print(spark.createDataFrame(dd.filter(lambda x: filter_by_type(x, "type2")).map(transform), IntegerType()).collect())
输出
filtering type1 == type1
transforming ['type1', '"string1"']
filtering type2 == type1
[Row(value='string1')]
filtering type1 == type2
filtering type2 == type2
transforming ['type2', '2']
[Row(value=2)]
解决方案
推荐阅读
- javascript - 如何发送 Post 请求
- python - 使用 IPython 显示(markdown())后 Jupyter 笔记本输入()不显示
- java - 处理页面排名问题时出错。Mapreduce 错误
- flutter - 颤振中的 App 级 Gradel 问题。安卓工作室
- swift - swiftui错误'在范围内找不到'ContentView''
- matlab - 如何在matlab中对样条图求微分
- td-engine - TDengine 无法通过别名选择订单?
- r - 计算 R 中多列的毒性特征变量并将该信息保存在数据集中
- php - 使用准备好的语句插入 Mysqli 错误
- mdriven - 自定义服务中调用 AsTajson 的派生属性