python - Reading a multi-line JSON string into a Spark DataFrame with Python
Problem description
I am using PySpark code in a Databricks notebook to read the contents of an API into a DataFrame. I have validated the JSON payload, and the string is valid JSON. I suspect the error is caused by the multi-line JSON string; the code below works fine with other JSON API payloads.
Spark version < 2.2
import requests
user = "usr"
password = "aBc!23"
response = requests.get('https://myapi.com/allcolor', auth=(user, password))
jsondata = response.json()
from pyspark.sql import *
df = spark.read.json(sc.parallelize([jsondata]))
df.show()
JSON payload:
{
  "colors": [
    {
      "color": "black",
      "category": "hue",
      "type": "primary",
      "code": {
        "rgba": [255, 255, 255, 1],
        "hex": "#000"
      }
    },
    {
      "color": "white",
      "category": "value",
      "code": {
        "rgba": [0, 0, 0, 1],
        "hex": "#FFF"
      }
    },
    {
      "color": "red",
      "category": "hue",
      "type": "primary",
      "code": {
        "rgba": [255, 0, 0, 1],
        "hex": "#FF0"
      }
    },
    {
      "color": "blue",
      "category": "hue",
      "type": "primary",
      "code": {
        "rgba": [0, 0, 255, 1],
        "hex": "#00F"
      }
    },
    {
      "color": "yellow",
      "category": "hue",
      "type": "primary",
      "code": {
        "rgba": [255, 255, 0, 1],
        "hex": "#FF0"
      }
    },
    {
      "color": "green",
      "category": "hue",
      "type": "secondary",
      "code": {
        "rgba": [0, 255, 0, 1],
        "hex": "#0F0"
      }
    }
  ]
}
Error:
pyspark.sql.dataframe.DataFrame = [_corrupt_record: string]
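This `_corrupt_record` result is typical of Spark versions before 2.2, where `spark.read.json` expects one complete JSON document per line; the first line of a pretty-printed payload is just `{`, which cannot be parsed on its own. A minimal pure-Python sketch of why each individual line fails:

```python
import json

# Pretty-printed payload: one JSON document spread over many lines.
payload = """{
  "colors": [
    {"color": "black", "category": "hue"}
  ]
}"""

# Spark < 2.2 parses each line independently, so the first line "{"
# is not valid JSON on its own -- Spark files it under _corrupt_record.
try:
    json.loads(payload.splitlines()[0])
    first_line_ok = True
except json.JSONDecodeError:
    first_line_ok = False
print(first_line_ok)  # False

# Collapsed to a single line, the document is one valid "record" again.
one_line = json.dumps(json.loads(payload))
print("\n" in one_line)  # False
```

(On Spark 2.2+, `spark.read.json(path, multiLine=True)` handles pretty-printed files directly when reading from storage.)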
Modified code:
spark.sql("set spark.databricks.delta.preview.enabled=true")
spark.sql("set spark.databricks.delta.retentionDurationCheck.preview.enabled=false")
import json
import requests
from requests.auth import HTTPDigestAuth
import pandas as pd

user = "username"
password = "password"
myResponse = requests.get('https://myapi.com/allcolor', auth=(user, password))
if(myResponse.ok):
    jData = json.loads(myResponse.content)
    s1 = json.dumps(jData)
    # load data from api
    x = json.loads(s1)
    data = pd.read_json(json.dumps(x))
    # create dataframe
    spark_df = spark.createDataFrame(data)
    spark_df.show()
    spark.conf.set("fs.azure.account.key.<your-storage-account-name>.blob.core.windows.net", "<your-storage-account-access-key>")
    spark_df.write.mode("overwrite").json("wasbs://<container>@<storage-account-name>.blob.core.windows.net/<directory>/")
else:
    myResponse.raise_for_status()
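The shape change comes from the pandas round trip: `pd.read_json` creates one DataFrame row per element of the `colors` array, with each element dict sitting in the `colors` column, and `DataFrame.write.json` then emits line-delimited JSON, one `{"colors": {...}}` object per row. A small pure-Python sketch of that transformation (a simplified stand-in for the pandas/Spark steps, using an abbreviated payload):

```python
import json

payload = {
    "colors": [
        {"color": "black", "category": "hue"},
        {"color": "white", "category": "value"},
    ]
}

# pd.read_json turns the top-level array into one row per element, with
# the element dict stored in the "colors" column; write.json then emits
# line-delimited JSON: one {"colors": {...}} object per row.
rows = [{"colors": item} for item in payload["colors"]]
lines = [json.dumps(r) for r in rows]
for line in lines:
    print(line)

# The array wrapper is gone, so no single line reproduces the source.
print(json.loads(lines[0]) == payload)  # False
```

This is exactly the mismatch seen in the ADLS output below: each object wraps a single color rather than the original array.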
The output format does not match the source.
Modified output (different from the source):
{
  "colors": {
    "color": "black",
    "category": "hue",
    "type": "primary",
    "code": {
      "rgba": [255, 255, 255, 1],
      "hex": "#000"
    }
  }
}
{
  "colors": {
    "color": "white",
    "category": "value",
    "code": {
      "rgba": [0, 0, 0, 1],
      "hex": "#FFF"
    }
  }
}
Can you point out where I went wrong? The output file I store in ADLS Gen2 does not match the source API JSON payload.
Solution
Remove the newlines before calling spark.read.json (this assumes jsondata holds the raw JSON text, e.g. from response.text, since response.json() returns a dict):
df = spark.read.json(sc.parallelize([jsondata.replace('\n','')]))
df.show(truncate=False)
+----------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------+
|colors |
+----------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------+
|[[hue, [#000, [255, 255, 255, 1]], black, primary], [value, [#FFF, [0, 0, 0, 1]], white,], [hue, [#FF0, [255, 0, 0, 1]], red, primary], [hue, [#00F, [0, 0, 255, 1]], blue, primary], [hue, [#FF0, [255, 255, 0, 1]], yellow, primary], [hue, [#0F0, [0, 255, 0, 1]], green, secondary]]|
+----------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------+
df.printSchema()
root
|-- colors: array (nullable = true)
| |-- element: struct (containsNull = true)
| | |-- category: string (nullable = true)
| | |-- code: struct (nullable = true)
| | | |-- hex: string (nullable = true)
| | | |-- rgba: array (nullable = true)
| | | | |-- element: long (containsNull = true)
| | |-- color: string (nullable = true)
| | |-- type: string (nullable = true)
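An alternative worth noting (an assumption about the surrounding notebook code, since `response.json()` returns a Python dict rather than a string): serializing the dict with `json.dumps` already yields a single-line string, so no newline stripping is needed at all:

```python
import json

# Stand-in for response.json(), which returns a Python dict.
jsondata = {"colors": [{"color": "black", "category": "hue"}]}

# json.dumps serializes the dict to a single-line JSON string, safe to
# hand to spark.read.json with no newline stripping required.
single_line = json.dumps(jsondata)
print("\n" in single_line)  # False

# In the notebook (spark and sc come from the Databricks session):
# df = spark.read.json(sc.parallelize([single_line]))
```

Either way, the key point is that each element handed to `sc.parallelize` must be a complete, single-line JSON document.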