mysql - Pyspark: store a DataFrame as JSON in a MySQL table column
Problem description
I have a Spark DataFrame that needs to be stored in a MySQL table as a column value in JSON format (alongside other string-type values in their respective columns).
Something like this:
| Column 1 | Column 2 |
|---|---|
| Value 1 | [{"name":"Peter G", "age":44, "city":"Quahog"}, {"name":"John G", "age":30, "city":"Quahog"}, {...}, ...] |
| Value 1 | [{"name":"Stewie G", "age":3, "city":"Quahog"}, {"name":"Ron G", "age":41, "city":"Quahog"}, {...}, ...] |
| ... | ... |
这[{"name":"Peter G", "age":44, "city":"Quahog"}, {"name":"John G", "age":30, "city":"Quahog"}, {...}, ...]
是存储为dict 列表的一个数据帧的结果
I can do:
str(dataframe_object.toJSON().collect())
and then store that into the MySQL table column, but this means loading the entire dataset into memory before writing it to the table. Is there a better/more optimal way to achieve this, i.e. without using collect()?
Solution
I suppose you can convert the StructType column to a JSON string and then use spark.write.jdbc to write to MySQL. As long as that column in your MySQL table is of JSON type, you should be all set.
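A minimal sketch of those two steps (assuming a DataFrame `df` with an array-of-struct column `c2`; the JDBC URL, table name, and credentials are placeholders you must supply, and the MySQL Connector/J jar needs to be on the Spark classpath):

```python
def write_with_json_column(df, url, table, user, password):
    """Serialize the array<struct> column `c2` to a JSON string, then write via JDBC.

    Placeholder sketch: `url` (e.g. 'jdbc:mysql://host:3306/mydb'), `table`,
    `user` and `password` are assumptions supplied by the caller; the target
    MySQL column should be of JSON type.
    """
    from pyspark.sql import functions as F  # deferred so this module imports without Spark

    (df.withColumn('c2', F.to_json('c2'))   # array<struct> -> JSON string
       .write
       .jdbc(url=url,
             table=table,
             mode='append',
             properties={'user': user,
                         'password': password,
                         'driver': 'com.mysql.cj.jdbc.Driver'}))
```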
# My sample data
{
"c1": "val1",
"c2": [
{ "name": "N1", "age": 100 },
{ "name": "N2", "age": 101 }
]
}
from pyspark.sql import functions as F
from pyspark.sql import types as T
schema = T.StructType([
T.StructField('c1', T.StringType()),
T.StructField('c2', T.ArrayType(T.StructType([
T.StructField('name', T.StringType()),
T.StructField('age', T.IntegerType())
])))
])
df = spark.read.json('a.json', schema=schema, multiLine=True)
df.show(10, False)
df.printSchema()
+----+----------------------+
|c1 |c2 |
+----+----------------------+
|val1|[{N1, 100}, {N2, 101}]|
+----+----------------------+
root
|-- c1: string (nullable = true)
|-- c2: array (nullable = true)
| |-- element: struct (containsNull = true)
| | |-- name: string (nullable = true)
| | |-- age: integer (nullable = true)
df.withColumn('j', F.to_json('c2')).show(10, False)
+----+----------------------+-------------------------------------------------+
|c1 |c2 |j |
+----+----------------------+-------------------------------------------------+
|val1|[{N1, 100}, {N2, 101}]|[{"name":"N1","age":100},{"name":"N2","age":101}]|
+----+----------------------+-------------------------------------------------+
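As a quick sanity check (plain Python, no Spark needed), the compact string that to_json produced in column j above is valid JSON, which is what MySQL's JSON column type expects:

```python
import json

# The value shown in column `j` above
j = '[{"name":"N1","age":100},{"name":"N2","age":101}]'

rows = json.loads(j)  # would raise ValueError if the string were not valid JSON
assert rows[0]["name"] == "N1"
assert rows[1]["age"] == 101
```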
Edit #1:
# My sample data
{
"c1": "val1",
"c2": "[{ \"name\": \"N1\", \"age\": 100 },{ \"name\": \"N2\", \"age\": 101 }]"
}
from pyspark.sql import functions as F
from pyspark.sql import types as T
df = spark.read.json('a.json', multiLine=True)
df.show(10, False)
df.printSchema()
+----+-----------------------------------------------------------+
|c1 |c2 |
+----+-----------------------------------------------------------+
|val1|[{ "name": "N1", "age": 100 },{ "name": "N2", "age": 101 }]|
+----+-----------------------------------------------------------+
root
|-- c1: string (nullable = true)
|-- c2: string (nullable = true)
schema = T.ArrayType(T.StructType([
T.StructField('name', T.StringType()),
T.StructField('age', T.IntegerType())
]))
df2 = df.withColumn('j', F.from_json('c2', schema))
df2.show(10, False)
df2.printSchema()
+----+-----------------------------------------------------------+----------------------+
|c1 |c2 |j |
+----+-----------------------------------------------------------+----------------------+
|val1|[{ "name": "N1", "age": 100 },{ "name": "N2", "age": 101 }]|[{N1, 100}, {N2, 101}]|
+----+-----------------------------------------------------------+----------------------+
root
|-- c1: string (nullable = true)
|-- c2: string (nullable = true)
|-- j: array (nullable = true)
| |-- element: struct (containsNull = true)
| | |-- name: string (nullable = true)
| | |-- age: integer (nullable = true)
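One side effect worth noting: going through from_json and back out with to_json normalizes the string, so the extra whitespace in the original c2 disappears. The same round trip in plain Python (a stand-in for the Spark behaviour, assuming the field order matches the schema):

```python
import json

# The raw string stored in c2 in the Edit #1 sample data
raw = '[{ "name": "N1", "age": 100 },{ "name": "N2", "age": 101 }]'

# loads + dumps compacts the string, mirroring a from_json -> to_json round trip
compact = json.dumps(json.loads(raw), separators=(',', ':'))
assert compact == '[{"name":"N1","age":100},{"name":"N2","age":101}]'
```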