python - PySpark: flatten an Array column
Question
I am parsing Azure EventHub Avro messages. The last column is an array, and I am trying to flatten it.
Before:
{"records":[{"time":"2020-01-28T04:50:20.0975886Z","resourceId":"/SUBSCRIPTIONS/xxxxxxxxxxxx/RESOURCEGROUPS/xxxxx-xxxxxxxI/PROVIDERS/MICROSOFT.COMPUTE/DISKS/7C3E07DE8xxxxxxx-0-SCRATCHVOLUME","operationName":"MICROSOFT.COMPUTE/DISKS/DELETE","category":"Administrative","resultType":"Start","resultSignature":"Started.","durationMs":"0","callerIpAddress":"43.121.152.99","correlationId":"xxxxxxx"},{"time":"2020-01-28T04:50:20.1122888Z","resourceId":"/SUBSCRIPTIONS/xxxxxxxxxxxx/RESOURCEGROUPS/xxxxx-xxxxxxxI/PROVIDERS/MICROSOFT.COMPUTE/DISKS/7C3E07DE8xxxxxxx-0-SCRATCHVOLUME","operationName":"MICROSOFT.COMPUTE/DISKS/DELETE","category":"Administrative","resultType":"Success","resultSignature":"Succeeded.NoContent","durationMs":"14","callerIpAddress":"43.121.152.99","correlationId":"xxxxxxx"}]}
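This is what I came up with, and I think I am very close. I got the struct and was able to strip off the outer "records" key, but I cannot handle the array inside it.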
from pyspark.sql.functions import col, explode, flatten, from_json
from pyspark.sql.types import ArrayType, BooleanType, IntegerType, StringType, StructField, StructType
# Create a DataFrame from the Avro file at the given path
df = spark.read.format("avro").load("/mnt/test/xxxxxx/xxxxxxxx/31.avro")
# Cast the binary Body column to a string
df = df.withColumn("Body", col("Body").cast("string"))
sourceSchema = StructType([
    StructField("records", ArrayType(
        StructType([
            StructField("time", StringType(), True),
            StructField("resourceId", StringType(), True),
            StructField("operationName", StringType(), True),
            StructField("category", StringType(), True),
            StructField("resultType", StringType(), True),
            StructField("resultSignature", StringType(), True),
            StructField("durationMs", StringType(), True),
            StructField("callerIpAddress", StringType(), True),
            StructField("correlationId", StringType(), True)
        ])
    ), True)
])
df = df.withColumn("Body", from_json(df.Body, sourceSchema))
# Flatten Body
for c in df.schema['Body'].dataType:
    df2 = df.withColumn(c.name, col("Body." + c.name))
display(df2)
After:
[{"time":"2020-01-28T04:50:20.0975886Z","resourceId":"/SUBSCRIPTIONS/xxxxxxxxxxxx/RESOURCEGROUPS/xxxxx-xxxxxxxI/PROVIDERS/MICROSOFT.COMPUTE/DISKS/7C3E07DE8xxxxxxx-0-SCRATCHVOLUME","operationName":"MICROSOFT.COMPUTE/DISKS/DELETE","category":"Administrative","resultType":"Start","resultSignature":"Started.","durationMs":"0","callerIpAddress":"43.121.152.99","correlationId":"xxxxxxx"},{"time":"2020-01-28T04:50:20.1122888Z","resourceId":"/SUBSCRIPTIONS/xxxxxxxxxxxx/RESOURCEGROUPS/xxxxx-xxxxxxxI/PROVIDERS/MICROSOFT.COMPUTE/DISKS/7C3E07DE8xxxxxxx-0-SCRATCHVOLUME","operationName":"MICROSOFT.COMPUTE/DISKS/DELETE","category":"Administrative","resultType":"Success","resultSignature":"Succeeded.NoContent","durationMs":"14","callerIpAddress":"43.121.152.99","correlationId":"xxxxxxx"}]
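Why the loop stops at the array: `df.schema['Body'].dataType` is a struct with a single field, `records`, so the loop only copies out that one field, which is still an `ArrayType` of structs. (Each iteration also rebuilds `df2` from `df`, so only the last column added would survive anyway.) Turning array elements into rows is what `explode` does; in PySpark the usual pattern is `df.select(explode(col("Body.records")).alias("record")).select("record.*")`, where `record` is just an illustrative alias. The sketch below shows the same reshaping in plain Python so the expected shape is easy to check without a Spark cluster. Assumptions: the payload is the question's data trimmed to two fields per record, and `flatten_records` is a hypothetical helper name.

```python
import json

# The question's Body string, trimmed to two fields per record for brevity (assumption).
body = (
    '{"records":['
    '{"time":"2020-01-28T04:50:20.0975886Z","resultType":"Start"},'
    '{"time":"2020-01-28T04:50:20.1122888Z","resultType":"Success"}'
    ']}'
)


def flatten_records(body_json):
    """Hypothetical helper: return one flat dict (row) per element of "records".

    Plain-Python analogue of from_json + explode + select("record.*"):
    parse the string, step into the "records" array, emit each struct as a row.
    """
    parsed = json.loads(body_json)                  # ~ from_json(col("Body"), sourceSchema)
    records = parsed.get("records") or []           # missing/null array -> no rows
    return [dict(rec) for rec in records]           # ~ explode(...) then "record.*"


rows = flatten_records(body)
for row in rows:
    print(row["time"], row["resultType"])
```

Each element of the array becomes its own row with `time` and `resultType` as columns, which is the shape `explode` followed by `select("record.*")` produces in Spark.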
Answer
Maybe try this:
import pandas as pd
from pandas import json_normalize  # pandas.io.json.json_normalize is deprecated since pandas 1.0 and removed in 2.0
s = {"records":[{"time":"2020-01-28T04:50:20.0975886Z","resourceId":"/SUBSCRIPTIONS/xxxxxxxxxxxx/RESOURCEGROUPS/xxxxx-xxxxxxxI/PROVIDERS/MICROSOFT.COMPUTE/DISKS/7C3E07DE8xxxxxxx-0-SCRATCHVOLUME","operationName":"MICROSOFT.COMPUTE/DISKS/DELETE","category":"Administrative","resultType":"Start","resultSignature":"Started.","durationMs":"0","callerIpAddress":"43.121.152.99","correlationId":"xxxxxxx"},{"time":"2020-01-28T04:50:20.1122888Z","resourceId":"/SUBSCRIPTIONS/xxxxxxxxxxxx/RESOURCEGROUPS/xxxxx-xxxxxxxI/PROVIDERS/MICROSOFT.COMPUTE/DISKS/7C3E07DE8xxxxxxx-0-SCRATCHVOLUME","operationName":"MICROSOFT.COMPUTE/DISKS/DELETE","category":"Administrative","resultType":"Success","resultSignature":"Succeeded.NoContent","durationMs":"14","callerIpAddress":"43.121.152.99","correlationId":"xxxxxxx"}]}
json_normalize(s).values
The result you get is:
array([[list([{'time': '2020-01-28T04:50:20.0975886Z', 'resourceId': '/SUBSCRIPTIONS/xxxxxxxxxxxx/RESOURCEGROUPS/xxxxx-xxxxxxxI/PROVIDERS/MICROSOFT.COMPUTE/DISKS/7C3E07DE8xxxxxxx-0-SCRATCHVOLUME', 'operationName': 'MICROSOFT.COMPUTE/DISKS/DELETE', 'category': 'Administrative', 'resultType': 'Start', 'resultSignature': 'Started.', 'durationMs': '0', 'callerIpAddress': '43.121.152.99', 'correlationId': 'xxxxxxx'}, {'time': '2020-01-28T04:50:20.1122888Z', 'resourceId': '/SUBSCRIPTIONS/xxxxxxxxxxxx/RESOURCEGROUPS/xxxxx-xxxxxxxI/PROVIDERS/MICROSOFT.COMPUTE/DISKS/7C3E07DE8xxxxxxx-0-SCRATCHVOLUME', 'operationName': 'MICROSOFT.COMPUTE/DISKS/DELETE', 'category': 'Administrative', 'resultType': 'Success', 'resultSignature': 'Succeeded.NoContent', 'durationMs': '14', 'callerIpAddress': '43.121.152.99', 'correlationId': 'xxxxxxx'}])]],
dtype=object)
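The result above is still a single cell holding the whole list, because `json_normalize` only flattens nested dicts by default and leaves lists as-is. Passing `record_path="records"` tells it to emit one row per element of that list. A minimal sketch, assuming pandas 1.0 or later (where the function is exposed as `pd.json_normalize`) and using the question's payload trimmed to three fields per record:

```python
import pandas as pd

# The question's payload, trimmed to three fields per record for brevity (assumption).
s = {
    "records": [
        {"time": "2020-01-28T04:50:20.0975886Z",
         "operationName": "MICROSOFT.COMPUTE/DISKS/DELETE",
         "resultType": "Start"},
        {"time": "2020-01-28T04:50:20.1122888Z",
         "operationName": "MICROSOFT.COMPUTE/DISKS/DELETE",
         "resultType": "Success"},
    ]
}

# record_path="records": treat each element of s["records"] as its own row
# instead of keeping the whole list in a single "records" cell.
flat = pd.json_normalize(s, record_path="records")
print(flat)
```

Note that this runs on a local Python object on the driver; for data that is already in a Spark DataFrame, `explode` keeps the flattening distributed.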