apache-spark - 如何将一些 pyspark 数据框的列转换为具有列名的 dict 并将它们组合成 json 列?
问题描述
我有以下格式的数据,我想使用带有两列('tag'和'data')的pyspark更改其格式。'tag'列值是唯一的,'data'列值是从原始列'date、stock、price'获取的json字符串,其中'stock'和'price'组合为'A'列值,将 'date' 和 'num' 组合为 'B' 列的值。
我没有找到或编写好的函数来实现这种效果。
我的火花版本是 2.1.0
原始数据框
date, stock, price, tag, num
1388534400, GOOG, 50, a, 1
1388534400, FB, 60, b, 2
1388534400, MSFT, 55, c, 3
1388620800, GOOG, 52, d, 4
我期望输出:
新数据框
tag| data
'a'| "{'A':{'stock':'GOOD', 'price': 50}, B:{'date':1388534400, 'num':1}"
'b'| "{'A':{'stock':'FB', 'price': 60}, B:{'date':1388534400, 'num':2}"
'c'| "{'A':{'stock':'MSFT', 'price': 55}, B:{'date':1388534400, 'num':3}"
'd'| "{'A':{'stock':'GOOG', 'price': 52}, B:{'date':1388620800, 'num':4}"
+--+---------------------------------- ----------------+
from pyspark.sql import SparkSession
from pyspark.sql.functions import create_map
spark = SparkSession.builder.appName("example").getOrCreate()
df = spark.createDataFrame([
(1388534400, "GOOG", 50, 'a', 1),
(1388534400, "FB", 60, 'b', 2),
(1388534400, "MSFT", 55, 'c', 3),
(1388620800, "GOOG", 52, 'd', 4)]
).toDF("date", "stock", "price", 'tag', 'num')
df.show()
tag_cols = {'A':['stock', 'price'], 'B':['date', 'num']}
# todo, change the Dataframe columns format
解决方案
IIUC,只需使用pyspark.sql.functions.struct和pyspark.sql.functions.to_json(两者都应该在 spark 2.1 中可用)
from pyspark.sql import functions as F
# skip df initialization[enter link description here][1]
df_new = df.withColumn('A', F.struct('stock', 'price')) \
.withColumn('B', F.struct('date', 'num')) \
.select('tag', F.to_json(F.struct('A', 'B')).alias('data'))
>>> df_new.show(5,0)
+---+-----------------------------------------------------------------+
|tag|data |
+---+-----------------------------------------------------------------+
|a |{"A":{"stock":"GOOG","price":50},"B":{"date":1388534400,"num":1}}|
|b |{"A":{"stock":"FB","price":60},"B":{"date":1388534400,"num":2}} |
|c |{"A":{"stock":"MSFT","price":55},"B":{"date":1388534400,"num":3}}|
|d |{"A":{"stock":"GOOG","price":52},"B":{"date":1388620800,"num":4}}|
+---+-----------------------------------------------------------------+
推荐阅读
- django - DJango:我们可以在 raw() 函数中使用 ALTER TABLE 查询吗
- orocommerce - 删除访客访问时无法访问图像
- linux - 在 GDB 中获取线程的堆栈区域
- graphdb - 我们如何在 GDB 工作台中对存储库进行排序?
- firebase - How can I get the dowload url from images that are resized with firestorage image-resize extension?
- c - 当 arg 不同时,为什么这段代码的结果是一样的?
- javascript - Uncaught SyntaxError: Invalid left-hand side in assignment - 解决方案?
- reactjs - Material-ui MuiThemeProvider 不适用
- python - ValueError:无法将操作数与形状一起广播到矢量化函数
- node.js - 无法安装 mongodb-client-encryption(libmongocrypt 的 Node.js 包装器)