json - pyspark - 从模式中删除标点符号
问题描述
我有一个可以通过以下方式加载的 json 文件:
df = spark.read.json(fpath)
json 是嵌套的,一些嵌套的列名中有标点符号。当我尝试创建非托管表时,这会产生问题。我可以通过创建具有有效列名的模式来解决这个问题,但这是一个劳动密集型的过程,因为我有很多文件,每个文件都有很多名称。
我希望能够读取 json 文件,通过从列名中删除任何标点符号来修改文件架构,然后使用新架构来保存文件。那可能吗?
这是一个示例 json 文件:
[{"cursor": "eyJfaW", "node": {"person": {"_id": "5cca", "display": "66"}, "completedQueues": ["STATEMENT_QUERYBUILDERCACHE_QUEUE", "STATEMENT_PERSON_QUEUE", "STATEMENT_FORWARDING_QUEUE"], "processingQueues": [], "deadForwardingQueue": [], "pendingForwardingQueue": [], "completedForwardingQueue": [], "failedForwardingLog": [], "_id": "5ce372", "hasGeneratedId": false, "organisation": "5b6803e", "lrs_id": "5c9bf", "client": "5c9", "active": true, "voided": false, "timestamp": "2019-05-21T03:36:34.199Z", "stored": "2019-05-21T03:36:34.345Z", "hash": "531c7", "agents": ["mailto:test@gmail.com"], "relatedAgents": ["mailto:test@gmail.com", "mailto:hello@test.net"], "registrations": [], "verbs": ["http://test.gov/expapi/verbs/completed"], "activities": ["https://test.test.org/wgua/pf/spNext/page1"], "relatedActivities": ["https://test.test.org/test/pf/spNext/page1"], "statement": {"id": "b389190", "timestamp": "2019-05-21T03:36:34.199Z", "actor": {"objectType": "Agent", "mbox": "mailto:test@gmail.com", "name": "66"}, "verb": {"id": "http://test.gov/expapi/verbs/completed", "display": {"en-US": "completed"}}, "result": {"extensions": {"http://test.org/xapi/extension/timein": 6.863}}, "object": {"id": "https://test.test.org/wgua/pf/spNext/page1", "objectType": "Activity", "definition": {"type": "http://test.gov/expapi/activities/page", "name": {"en-US": "Strategic Planning - Intro - Page 1"}, "description": {"en-US": "Strategic Planning Introduction Page 1"}, "extensions": {"http://test.org/xapi/extension/path": {"project": "wgua", "course": "test", "section": "spNext", "page": "page1", "object": null}}}}, "stored": "2019-05-21T03:36:34.345Z", "authority": {"objectType": "Agent", "name": "New Client", "mbox": "mailto:hello@test.net"}, "version": "1.0.0"}, "__v": 1, "personaIdentifier": "5cc"}}]
解决方案
尝试这个:
- 将 json 文件加载到 DataFrame
import json
import re
from pyspark.sql.types import *
df=spark.read.json("t.json")
df.printSchema()
这将是架构中使用示例数据的前几个字段,请注意字段“__v”
root
|-- cursor: string (nullable = true)
|-- node: struct (nullable = true)
| |-- __v: long (nullable = true)
| |-- _id: string (nullable = true)
| |-- active: boolean (nullable = true)
| |-- activities: array (nullable = true)
| | |-- element: string (containsNull = true)
...
- 要从
__
“__v”中删除,让我们将模式转换为字符串并使用re
模块
olds = df.schema
jsonStr = olds.json() # get json string for the schema
jsonStrCleaned = re.sub('_', '', jsonStr) # remove '_'
jsonDataCleaned = json.loads(jsonStrCleaned) # convert the string back to json object
news = StructType.fromJson(jsonDataCleaned) # construct new schema from the cleaned json
- 现在使用清理后的模式创建一个新的 DataFrame
df1 = spark.createDataFrame(df.rdd, news)
df1.printSchema()
您可以看到“__v”字段已更改:
root
|-- cursor: string (nullable = true)
|-- node: struct (nullable = true)
| |-- v: long (nullable = true)
| |-- id: string (nullable = true)
| |-- active: boolean (nullable = true)
| |-- activities: array (nullable = true)
| | |-- element: string (containsNull = true)
...
现在您可以将 DataFrame 保存到具有新架构的文件中。
推荐阅读
- java - SmartHomeApp:Java 上的 reportState 总是返回 INVALID_ARGUMENT
- mobile - 移动版 Orbeon 表格
- r - 使用 pyspark 如何使用边缘列表中的选定对创建单向图
- ruby-on-rails - 通过 RoR 应用程序访问上传到 heroku 的图像时出现问题
- vue.js - VUEJS forEach 数组抛出 this is undefined
- java - 如何在java中按距离排序在elasticsearch中设置Ignore_unmapped true?
- wordpress - 我可以在不使用 WordPress 重定向的情况下在 URL 上强制使用斜杠吗?
- android - Firebase:如何从集合中只获取一个文档?防止从数据库中读取的数量
- firebase - 如何减少 Firebase Cloud Firestore 上的读取操作?
- node.js - jenkins作业运行时如何为nodejs项目创建.env文件