python-3.x - 如何将包含部分json字符串的文本日志转换为pyspark中的结构化?
问题描述
我正在尝试从部分包含 json 的非结构化日志创建数据框
2020-09-24T08:03:01.633Z 10.1.20.1 {"EventTime":"2020-09-24 13:33:01","sourcename":"local","Keys":-9serverkey,"Type":"status"}
2020-09-24T08:03:01.633Z 10.1.20.1 {"EventTime":"2020-09-24 13:34:01","sourcename":"local","Keys":-9serverkey,"Type":"status"}
2020-09-24T08:03:01.633Z 10.1.20.1 {"EventTime":"2020-09-24 13:35:01","sourcename":"local","Keys":-9serverkey,"Type":"status"}
这是我尝试过的
rdd = session.sparkContext.textFile("F:\\mypath\\rdd_test_log.txt")
dataFrame = rdd.map(lambda data: Row(time= data.split(" ")[0],
ip= data.split(" ")[1],
EventTime=data.split(":")[2])).toDF()
结果是
---------+------------------------+
|EventTime |ip |time |
+------------------------------+---------+------------------------+
|01.633Z 10.1.20.1 {"EventTime"|10.1.20.1|2020-09-24T08:03:01.633Z|
|01.633Z 10.1.20.1 {"EventTime"|10.1.20.1|2020-09-24T08:03:01.633Z|
|01.633Z 10.1.20.1 {"EventTime"|10.1.20.1|2020-09-24T08:03:01.633Z|
+------------------------------+---------+------------------------+
预期的:
time |ip |eventtime |sourcename|Keys |Type
2020-09-24T08:03:01.633Z |10.1.20.1 |2020-09-24 13:33:01|local |-9serverkey |status
那么如何将此json字符串解析为rdd?或者应该是什么方法?
感谢您的帮助..
谢谢
解决方案
您可以find('{')
在字符串上使用来获取一个索引,您可以从中获取 JSON 文本的子字符串,然后解析该 JSON。
dataFrame = (
rdd.map(lambda l: (l.split(" "), l))
.map(
lambda data: Row(
time=data[0][0], ip=data[0][1], EventTime=data[1][data[1].find("{") :]
)
)
.toDF()
.select(
"time",
"ip",
f.regexp_replace(f.col("EventTime"), '"Keys":(.*),', '"Keys":"$1",').alias(
"EventTime"
),
)
)
dataFrame.show(1, False)
节目
+------------------------+---------+---------------------------------------------------------------------------------------------+
|time |ip |EventTime |
+------------------------+---------+---------------------------------------------------------------------------------------------+
|2020-09-24T08:03:01.633Z|10.1.20.1|{"EventTime":"2020-09-24 13:33:01","sourcename":"local","Keys":"-9serverkey","Type":"status"}|
+------------------------+---------+---------------------------------------------------------------------------------------------+
然后你可以解析EventTime
成一个可以进一步扩展到许多列的映射:
parsed = dataFrame.select(
"time",
"ip",
f.from_json(
"EventTime",
StructType(
[
StructField("EventTime", StringType()),
StructField("sourcename", StringType()),
StructField("Keys", StringType()),
StructField("Type", StringType()),
]
),
).alias("eventdetails"),
)
现在从地图创建单独的列
parsed = (
parsed.withColumn("eventtime", parsed["eventdetails"].getItem("EventTime"))
.withColumn("sourcename", parsed["eventdetails"].getItem("sourcename"))
.withColumn("Keys", parsed["eventdetails"].getItem("Keys"))
.withColumn("Type", parsed["eventdetails"].getItem("Type"))
.drop("eventdetails")
)
parsed.show()
这使:
+--------------------+---------+-------------------+----------+-----------+------+
| time| ip| eventtime|sourcename| Keys| Type|
+--------------------+---------+-------------------+----------+-----------+------+
|2020-09-24T08:03:...|10.1.20.1|2020-09-24 13:33:01| local|-9serverkey|status|
|2020-09-24T08:03:...|10.1.20.1|2020-09-24 13:34:01| local|-9serverkey|status|
|2020-09-24T08:03:...|10.1.20.1|2020-09-24 13:35:01| local|-9serverkey|status|
+--------------------+---------+-------------------+----------+-----------+------+
请注意,我假设您的 JSON 是有效的。"Keys":-9serverkey
是无效的键/值对,因此我将您的数据编辑为"Keys":"-9serverkey"
推荐阅读
- python - 当我的启动脚本与 VM 实例一起运行时,出现“net/url:URL 中的无效控制字符”错误
- python-3.x - 等待套接字连接的一定时间
- java - Akka 打字比 Classic 慢
- node.js - module.export 中的 nodejs 函数语法
- node.js - Winston 记录器分析器时间单位
- python - 为什么两个 __dict__s 之间的比较是假的?
- wordpress - (未知) Uncaught ReferenceError: u is not defined
- yum - 使用 Centos 6 时,我无法再运行 'yum update',我收到此错误 'Cannot find a valid baseurl for repo: base'
- sql - 重新排序 SQL Server 表
- mariadb - Wildfly 22.0.1 无法使用 MariaDB 数据源部署 EAR