python - 如何更改 pyspark 上的 JSON 结构?
问题描述
我有两个由 kafka 读取的 json 文件,这是他们的 printSchema ()
JSON1 打印模式:
root
|-- _id: string (nullable = true)
|-- Data: string (nullable = true)
|-- NomeAzienda: string (nullable = true)
|-- Valori_Di_Borsa: struct (nullable = false)
| |-- PrezzoUltimoContratto: double (nullable = true)
| |-- Var%: double (nullable = true)
| |-- VarAssoluta: double (nullable = true)
| |-- OraUltimoContratto: string (nullable = true)
| |-- QuantitaUltimo: double (nullable = true)
| |-- QuantitaAcquisto: double (nullable = true)
| |-- QuantitaVendita: double (nullable = true)
| |-- QuantitaTotale: double (nullable = true)
| |-- NumeroContratti: double (nullable = true)
| |-- MaxOggi: double (nullable = true)
| |-- MinOggi: double (nullable = true)
JSON2 打印模式():
root
|-- _id: string (nullable = true)
|-- News: struct (nullable = false)
| |-- TitoloNews: string (nullable = true)
| |-- TestoNews: string (nullable = true)
| |-- DataNews: string (nullable = true)
| |-- OraNews: long (nullable = true)
| |-- SoggettoNews: string (nullable = true)
加入两个 JSON,我得到了这个 printSchema():
root
|-- _id: string (nullable = true)
|-- Data: string (nullable = true)
|-- NomeAzienda: string (nullable = true)
|-- Valori_Di_Borsa: struct (nullable = false)
| |-- PrezzoUltimoContratto: double (nullable = true)
| |-- Var%: double (nullable = true)
| |-- VarAssoluta: double (nullable = true)
| |-- OraUltimoContratto: string (nullable = true)
| |-- QuantitaUltimo: double (nullable = true)
| |-- QuantitaAcquisto: double (nullable = true)
| |-- QuantitaVendita: double (nullable = true)
| |-- QuantitaTotale: double (nullable = true)
| |-- NumeroContratti: double (nullable = true)
| |-- MaxOggi: double (nullable = true)
| |-- MinOggi: double (nullable = true)
|-- _id: string (nullable = true)
|-- News: struct (nullable = false)
| |-- TitoloNews: string (nullable = true)
| |-- TestoNews: string (nullable = true)
| |-- DataNews: string (nullable = true)
| |-- OraNews: long (nullable = true)
| |-- SoggettoNews: string (nullable = true)
但我想要的结果是这样的:
更新根:
-- _id: string (nullable = true)
-- Data: string (nullable = true)
-- NomeAzienda: string (nullable = true)
-- Valori_Di_Borsa: struct (nullable = false)
|-- PrezzoUltimoContratto: double (nullable = true)
|-- Var%: double (nullable = true)
|-- VarAssoluta: double (nullable = true)
|-- OraUltimoContratto: string (nullable = true)
|-- QuantitaUltimo: double (nullable = true)
|-- QuantitaAcquisto: double (nullable = true)
|-- QuantitaVendita: double (nullable = true)
|-- QuantitaTotale: double (nullable = true)
|-- NumeroContratti: double (nullable = true)
|-- MaxOggi: double (nullable = true)
|-- MinOggi: double (nullable = true)
|-- News: struct (nullable = false)
|-- id: string (nullable = true)
|-- TitoloNews: string (nullable = true)
|-- TestoNews: string (nullable = true)
|-- DataNews: string (nullable = true)
|-- OraNews: long (nullable = true)
|-- SoggettoNews: string (nullable = true)
我怎样才能使用 pyspark 做到这一点?
这是我的代码:
df_borsa = spark.readStream.format("kafka") \
.option("kafka.bootstrap.servers", kafka_broker) \
.option("startingOffsets", "latest") \
.option("subscribe","Be_borsa") \
.load() \
.selectExpr("CAST(value AS STRING)")
df_news = spark.readStream.format("kafka") \
.option("kafka.bootstrap.servers", kafka_broker) \
.option("startingOffsets", "latest") \
.option("subscribe","Ita_news") \
.load() \
.selectExpr("CAST(value AS STRING)")
df_borsa =df_borsa.withColumn("Valori_Di_Borsa",F.struct(F.col("PrezzoUltimoContratto"),F.col("Var%"),F.col("VarAssoluta"),F.col("OraUltimoContratto"),F.col("QuantitaUltimo"),F.col("QuantitaAcquisto"),F.col("QuantitaVendita"),F.col("QuantitaTotale"),F.col("NumeroContratti"),F.col("MaxOggi"),F.col("MinOggi")))
df_borsa.printSchema()
df_news = df_news.withColumn("News",F.struct(F.col("TitoloNews"),F.col("TestoNews"),F.col("DataNews"),F.col("OraNews"),F.col("SoggettoNews")))
df_news.printSchema()
df_join = df_borsa.join(df_news)
df_join.printSchema()
解决方案
检查下面的代码。
提取结构Valori_Di_Borsa
列,添加News
列并重建结构。
df_join = df_borsa.join(df_news)
.withColumn("Valori_Di_Borsa",F.struct(F.col("Valori_Di_Borsa.*"),F.col("News"))))
推荐阅读
- sql-server - SQL Server 递归 - 查询帮助以计算当前和总体总计
- python - 使用递归调用构建模式
- spring - 如何为休眠实体选择正确的级联类型?
- pygame - PyGame中的窗口在前台不起作用
- powerbi - Power BI 通过 URL 传递参数并使用所述参数构建数据集
- google-data-studio - 如何在 Google 数据工作室日期范围过滤器中包含所有数据
- google-data-studio - 在 Google Data Studio 中按类别而不是日期创建饼图
- python - 使用 Python 和 Cx_Oracle 调用带有 XMLTYPE 输入和输出参数的 Oracle 存储过程
- vue.js - 使用 vue.js 从 textfield 和 textarea 中删除 null
- sql - 如何查询三列并且总共只有一列?