How to convert text logs that partially contain JSON strings into structured data in PySpark?

Problem description

I am trying to create a DataFrame from unstructured logs that partially contain JSON:

2020-09-24T08:03:01.633Z 10.1.20.1 {"EventTime":"2020-09-24 13:33:01","sourcename":"local","Keys":-9serverkey,"Type":"status"}
2020-09-24T08:03:01.633Z 10.1.20.1 {"EventTime":"2020-09-24 13:34:01","sourcename":"local","Keys":-9serverkey,"Type":"status"}
2020-09-24T08:03:01.633Z 10.1.20.1 {"EventTime":"2020-09-24 13:35:01","sourcename":"local","Keys":-9serverkey,"Type":"status"}

Here is what I tried:

from pyspark.sql import Row

rdd = session.sparkContext.textFile("F:\\mypath\\rdd_test_log.txt")

# Splitting on ":" also matches the colons inside the timestamp,
# which is why EventTime comes out wrong below.
dataFrame = rdd.map(lambda data: Row(time=data.split(" ")[0],
                                     ip=data.split(" ")[1],
                                     EventTime=data.split(":")[2])).toDF()


The result is:


+------------------------------+---------+------------------------+
|EventTime                     |ip       |time                    |
+------------------------------+---------+------------------------+
|01.633Z 10.1.20.1 {"EventTime"|10.1.20.1|2020-09-24T08:03:01.633Z|
|01.633Z 10.1.20.1 {"EventTime"|10.1.20.1|2020-09-24T08:03:01.633Z|
|01.633Z 10.1.20.1 {"EventTime"|10.1.20.1|2020-09-24T08:03:01.633Z|
+------------------------------+---------+------------------------+

Expected:

time                     |ip        |eventtime          |sourcename|Keys        |Type
2020-09-24T08:03:01.633Z |10.1.20.1 |2020-09-24 13:33:01|local     |-9serverkey |status     

So how can I parse this JSON string within the RDD? Or what would be the right approach?

Any help is appreciated.

Thanks

Tags: python-3.x, apache-spark, pyspark, apache-spark-sql, rdd

Solution


You can use find('{') on each line to get the index at which the JSON text starts, take the substring from there, and then parse that JSON.

from pyspark.sql import functions as f

dataFrame = (
    rdd.map(lambda l: (l.split(" "), l))
    .map(
        lambda data: Row(
            # data[0] holds the space-split tokens, data[1] the full line;
            # everything from the first "{" onward is the JSON payload.
            time=data[0][0], ip=data[0][1], EventTime=data[1][data[1].find("{"):]
        )
    )
    .toDF()
    .select(
        "time",
        "ip",
        # Quote the invalid unquoted Keys value so the payload becomes valid JSON.
        f.regexp_replace(f.col("EventTime"), '"Keys":(.*),', '"Keys":"$1",').alias(
            "EventTime"
        ),
    )
)

dataFrame.show(1, False)

This shows:

+------------------------+---------+---------------------------------------------------------------------------------------------+
|time                    |ip       |EventTime                                                                                    |
+------------------------+---------+---------------------------------------------------------------------------------------------+
|2020-09-24T08:03:01.633Z|10.1.20.1|{"EventTime":"2020-09-24 13:33:01","sourcename":"local","Keys":"-9serverkey","Type":"status"}|
+------------------------+---------+---------------------------------------------------------------------------------------------+
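
If you prefer to do all of the parsing inside the RDD instead (a minimal sketch, not part of the original answer), you can repair and parse each record with Python's json module rather than regexp_replace; the fix-up regex below assumes the unquoted Keys value is the only invalid token:

import json
import re

from pyspark.sql import Row

def parse_line(line):
    # Split off the timestamp and IP; the remainder is the JSON payload.
    time, ip, payload = line.split(" ", 2)
    # Hypothetical fix-up: quote the invalid unquoted Keys value.
    payload = re.sub(r'"Keys":([^",}]+)', r'"Keys":"\1"', payload)
    event = json.loads(payload)
    return Row(time=time, ip=ip, **event)

dataFrame = rdd.map(parse_line).toDF()
dataFrame.show(truncate=False)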

Then you can parse EventTime into a struct that can be further expanded into individual columns:

from pyspark.sql.types import StructType, StructField, StringType

parsed = dataFrame.select(
    "time",
    "ip",
    f.from_json(
        "EventTime",
        StructType(
            [
                StructField("EventTime", StringType()),
                StructField("sourcename", StringType()),
                StructField("Keys", StringType()),
                StructField("Type", StringType()),
            ]
        ),
    ).alias("eventdetails"),
)
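
Aside: a single select with the struct.* wildcard flattens the struct in one step (a sketch against the parsed DataFrame above, equivalent to the withColumn chain that follows):

# One-shot alternative: expand every field of the eventdetails
# struct into its own top-level column.
flattened = parsed.select("time", "ip", "eventdetails.*")
flattened.show()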

Now create the individual columns from the struct:

parsed = (
    # Pull each field out of the eventdetails struct, then drop the struct.
    parsed.withColumn("eventtime", parsed["eventdetails"].getItem("EventTime"))
    .withColumn("sourcename", parsed["eventdetails"].getItem("sourcename"))
    .withColumn("Keys", parsed["eventdetails"].getItem("Keys"))
    .withColumn("Type", parsed["eventdetails"].getItem("Type"))
    .drop("eventdetails")
)

parsed.show()

This gives:

+--------------------+---------+-------------------+----------+-----------+------+
|                time|       ip|          eventtime|sourcename|       Keys|  Type|
+--------------------+---------+-------------------+----------+-----------+------+
|2020-09-24T08:03:...|10.1.20.1|2020-09-24 13:33:01|     local|-9serverkey|status|
|2020-09-24T08:03:...|10.1.20.1|2020-09-24 13:34:01|     local|-9serverkey|status|
|2020-09-24T08:03:...|10.1.20.1|2020-09-24 13:35:01|     local|-9serverkey|status|
+--------------------+---------+-------------------+----------+-----------+------+

Note that I assumed your JSON is valid. "Keys":-9serverkey is not a valid key/value pair, so the regexp_replace above rewrites your data to "Keys":"-9serverkey".
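
If you want to check that rewrite locally, the same substitution can be run with Python's re module (a standalone sketch; re uses \1 where Spark's regexp_replace uses $1 for the capture group):

import re

s = '{"EventTime":"2020-09-24 13:33:01","sourcename":"local","Keys":-9serverkey,"Type":"status"}'
# The greedy group backtracks to the last comma, so it captures just -9serverkey.
print(re.sub(r'"Keys":(.*),', r'"Keys":"\1",', s))
# {"EventTime":"2020-09-24 13:33:01","sourcename":"local","Keys":"-9serverkey","Type":"status"}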

