python - 如何为 Pyspark createDataFrame(rdd, schema) 定义模式?
问题描述
我将 gziped json 读入 rdd
rdd1 =sc.textFile('s3://cw-milenko-tests/Json_gzips/ticr_calculated_2_2020-05-27T11-59-06.json.gz')
我想将其转换为火花数据框。链接的 SO 问题中的第一种方法不起作用。这是文件的第一行
{"code_event": "1092406", "code_event_system": "LOTTO", "company_id": "2", "date_event": "2020-05-27 12:00:00.000", "date_event_real": "0001-01-01 00:00:00.000", "ecode_class": "", "ecode_event": "183", "eperiod_event": "", "etl_date": "2020-05-27", "event_no": 1, "group_no": 0, "name_event": "Ungaria Putto - 8/20", "name_event_short": "Ungaria Putto - 8/20", "odd_coefficient": 1, "odd_coefficient_entry": 1, "odd_coefficient_user": 1, "odd_ekey": "11", "odd_name": "11", "odd_status": "", "odd_type": "11", "odd_voidfactor": 0, "odd_win_types": "", "special_bet_value": "", "ticket_id": "899M-E2X93P", "id_update": 8000001036823656, "topic_group": "cwg5", "kafka_key": "899M-E2X93P", "kafka_epoch": 1590580609424, "kafka_partition": 0, "kafka_topic": "tickets-calculated_2"}
如何推断模式?
所以答案说
schema = StructType([StructField(str(i), StringType(), True) for i in range(32)])
为什么是范围(32)?
解决方案
要回答您的问题,range(32) 仅指示 StrucField 类可应用于所需架构的列数。在您的情况下,有 30 列。根据您的数据,我能够使用以下逻辑创建数据框:
from pyspark.sql.functions import *
from pyspark.sql.types import *
data_json = {"code_event": "1092406", "code_event_system": "LOTTO", "company_id": "2", "date_event": "2020-05-27 12:00:00.000",
"date_event_real": "0001-01-01 00:00:00.000", "ecode_class": "", "ecode_event": "183", "eperiod_event": "",
"etl_date": "2020-05-27", "event_no": 1, "group_no": 0, "name_event": "Ungaria Putto - 8/20", "name_event_short": "Ungaria Putto - 8/20",
"odd_coefficient": 1, "odd_coefficient_entry": 1, "odd_coefficient_user": 1, "odd_ekey": "11", "odd_name": "11", "odd_status": "",
"odd_type": "11", "odd_voidfactor": 0, "odd_win_types": "", "special_bet_value": "", "ticket_id": "899M-E2X93P", "id_update": 8000001036823656,
"topic_group": "cwg5", "kafka_key": "899M-E2X93P", "kafka_epoch": 1590580609424, "kafka_partition": 0, "kafka_topic": "tickets-calculated_2"}
column_names = [x for x in data_json.keys()]
row_data = [([x for x in data_json.values()])]
input = []
for i in column_names:
if str(type(data_json[i])).__contains__('str') :
input.append(StructField(str(i), StringType(), True))
elif str(type(data_json[i])).__contains__('int') and len(str(data_json[i])) <= 8:
input.append(StructField(str(i), IntegerType(), True))
else :
input.append(StructField(str(i), LongType(), True))
schema = StructType(input)
data = spark.createDataFrame(row_data, schema)
data.show()
输出
# +----------+-----------------+----------+--------------------+--------------------+-----------+-----------+-------------+----------+--------+--------+--------------------+--------------------+---------------+---------------------+--------------------+--------+--------+----------+--------+--------------+-------------+-----------------+-----------+----------------+-----------+-----------+-------------+---------------+--------------------+
# |code_event|code_event_system|company_id| date_event| date_event_real|ecode_class|ecode_event|eperiod_event| etl_date|event_no|group_no| name_event| name_event_short|odd_coefficient|odd_coefficient_entry|odd_coefficient_user|odd_ekey|odd_name|odd_status|odd_type|odd_voidfactor|odd_win_types|special_bet_value| ticket_id| id_update|topic_group| kafka_key| kafka_epoch|kafka_partition| kafka_topic|
# +----------+-----------------+----------+--------------------+--------------------+-----------+-----------+-------------+----------+--------+--------+--------------------+--------------------+---------------+---------------------+--------------------+--------+--------+----------+--------+--------------+-------------+-----------------+-----------+----------------+-----------+-----------+-------------+---------------+--------------------+
# | 1092406| LOTTO| 2|2020-05-27 12:00:...|0001-01-01 00:00:...| | 183| |2020-05-27| 1| 0|Ungaria Putto - 8/20|Ungaria Putto - 8/20| 1| 1| 1| 11| 11| | 11| 0| | |899M-E2X93P|8000001036823656| cwg5|899M-E2X93P|1590580609424| 0|tickets-calculated_2|
# +----------+-----------------+----------+--------------------+--------------------+-----------+-----------+-------------+----------+--------+--------+--------------------+--------------------+---------------+---------------------+--------------------+--------+--------+----------+--------+--------------+-------------+-----------------+-----------+----------------+-----------+-----------+-------------+---------------+--------------------+
推荐阅读
- django - django-auditlog 不适用于用户模型
- python - Python 4路视频流程序QThread提速问题
- mysql - 如何从两个不同模式的两个表中找到增量
- python-3.x - Python 3将加密的字节字符串写入文件
- sql-server - 无法使用安全套接字层 (SSL) 建立安全连接的 MsSQL 驱动程序?
- html - 这段代码中的高度和最小高度有什么区别?
- android - 创建回收站视图项目动画
- reactjs - 如何等到 useContext 中的状态完全加载?
- go - 如何从错误本身获取错误代码“os.ErrNotExist”?
- c# - Blazor、blazor js 和其他 js 抛出错误