Pyspark: Store a dataframe as JSON in a MySQL table column

Problem description

I have a Spark dataframe that needs to be stored as a JSON-formatted column value in a MySQL table (alongside other string-type values in their respective columns).

Something like this:

Column 1    Column 2
value 1     [{"name":"Peter G", "age":44, "city":"Quahog"}, {"name":"John G", "age":30, "city":"Quahog"}, {...}, ...]
value 1     [{"name":"Stewie G", "age":3, "city":"Quahog"}, {"name":"Ron G", "age":41, "city":"Quahog"}, {...}, ...]
...         ...

[{"name":"Peter G", "age":44, "city":"Quahog"}, {"name":"John G", "age":30, "city":"Quahog"}, {...}, ...] 是存储为dict 列表的一个数据帧的结果

I can do:

str(dataframe_object.toJSON().collect())

and then store that in the MySQL table column, but that means loading the entire data into memory before writing it to the table. Is there a better/optimal way to achieve this, i.e. without using collect()?

Tags: mysql, python-3.x, apache-spark, pyspark

Solution


I think you can convert the StructType column to a JSON string, then use spark.write.jdbc to write to MySQL. As long as your MySQL table defines that column as JSON type, you should be all set.

# My sample data
{
    "c1": "val1",
    "c2": [
        { "name": "N1", "age": 100 },
        { "name": "N2", "age": 101 }
    ]
}

from pyspark.sql import functions as F
from pyspark.sql import types as T

schema = T.StructType([
    T.StructField('c1', T.StringType()),
    T.StructField('c2', T.ArrayType(T.StructType([
        T.StructField('name', T.StringType()),
        T.StructField('age', T.IntegerType())
    ])))
])

df = spark.read.json('a.json', schema=schema, multiLine=True)
df.show(10, False)
df.printSchema()

+----+----------------------+
|c1  |c2                    |
+----+----------------------+
|val1|[{N1, 100}, {N2, 101}]|
+----+----------------------+

root
 |-- c1: string (nullable = true)
 |-- c2: array (nullable = true)
 |    |-- element: struct (containsNull = true)
 |    |    |-- name: string (nullable = true)
 |    |    |-- age: integer (nullable = true)

df.withColumn('j', F.to_json('c2')).show(10, False)
+----+----------------------+-------------------------------------------------+
|c1  |c2                    |j                                                |
+----+----------------------+-------------------------------------------------+
|val1|[{N1, 100}, {N2, 101}]|[{"name":"N1","age":100},{"name":"N2","age":101}]|
+----+----------------------+-------------------------------------------------+
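To finish the pipeline without collect(), hand the dataframe with the serialized column to spark.write.jdbc, which writes partition-by-partition from the executors rather than through the driver. A minimal sketch; the JDBC URL, table name, and credentials below are placeholders, and the MySQL Connector/J jar must be on the Spark classpath:

```python
from pyspark.sql import functions as F

# Replace the struct column with its JSON string form;
# no data is ever collected to the driver.
out = df.withColumn('c2', F.to_json('c2'))

# Hypothetical connection details -- adjust for your environment.
out.write.jdbc(
    url='jdbc:mysql://localhost:3306/mydb',
    table='my_table',
    mode='append',
    properties={
        'user': 'myuser',
        'password': 'mypassword',
        'driver': 'com.mysql.cj.jdbc.Driver',
    },
)
```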

Edit #1:

# My sample data
{
    "c1": "val1",
    "c2": "[{ \"name\": \"N1\", \"age\": 100 },{ \"name\": \"N2\", \"age\": 101 }]"
}

from pyspark.sql import functions as F
from pyspark.sql import types as T

df = spark.read.json('a.json', multiLine=True)
df.show(10, False)
df.printSchema()

+----+-----------------------------------------------------------+
|c1  |c2                                                         |
+----+-----------------------------------------------------------+
|val1|[{ "name": "N1", "age": 100 },{ "name": "N2", "age": 101 }]|
+----+-----------------------------------------------------------+

root
 |-- c1: string (nullable = true)
 |-- c2: string (nullable = true)

schema = T.ArrayType(T.StructType([
    T.StructField('name', T.StringType()),
    T.StructField('age', T.IntegerType())
]))

df2 = df.withColumn('j', F.from_json('c2', schema))
df2.show(10, False)
df2.printSchema()

+----+-----------------------------------------------------------+----------------------+
|c1  |c2                                                         |j                     |
+----+-----------------------------------------------------------+----------------------+
|val1|[{ "name": "N1", "age": 100 },{ "name": "N2", "age": 101 }]|[{N1, 100}, {N2, 101}]|
+----+-----------------------------------------------------------+----------------------+

root
 |-- c1: string (nullable = true)
 |-- c2: string (nullable = true)
 |-- j: array (nullable = true)
 |    |-- element: struct (containsNull = true)
 |    |    |-- name: string (nullable = true)
 |    |    |-- age: integer (nullable = true)
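Per row, from_json does essentially what Python's json.loads does; as a plain-Python sanity check on the sample string from Edit #1 (illustrative only, outside Spark):

```python
import json

# The c2 value from the Edit #1 sample data
s = '[{ "name": "N1", "age": 100 },{ "name": "N2", "age": 101 }]'

parsed = json.loads(s)
print(parsed)  # [{'name': 'N1', 'age': 100}, {'name': 'N2', 'age': 101}]
```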
