python - 如何在 PySpark ML 中创建自定义 SQLTransformer 以透视数据
问题描述
我有一个类似于以下结构的数据框:
# Prepare training data
training = spark.createDataFrame([
(990011, 1001, 01, "Salary", 1000, 0.0),
(990011, 1002, 02, "POS Purchase", 50, 0.0),
(990022, 1003, 01, "Cash Withdrawl", 500, 1.0),
(990022, 1004, 02, "Interest Charge", 35, 1.0)
], ["customer_id", "transaction_id", "week_of_year", "category", "amount", "label"])
我可以使用 PySpark 动态地旋转这些数据,这消除了每周和类别的硬代码案例语句的需要:
# Attempt 1
tx_pivot = training \
.withColumn("week_of_year", sf.concat(sf.lit("T"), sf.col("week_of_year"))) \
.groupBy("customer_id") \
.pivot("week_of_year") \
.sum("amount")
tx_pivot.show(20)
我想开发一个自定义 Transformer 来动态透视数据,这样我就可以将这个自定义 Transform 阶段合并到 Spark ML Pipeline 中。不幸的是,Spark/PySpark 中的当前 SQLTransfomer 仅支持 SQL,例如“SELECT ... FROM THIS ”(请参阅 https://github.com/apache/spark/blob/master/python/pyspark/ml/feature.py)。
任何有关如何创建自定义 Transformer 以动态透视数据的指导将不胜感激。
解决方案
实现一个接受数据帧并返回另一个数据帧的自定义转换器非常简单。在你的情况下:
import pyspark.ml.pipeline.Transformer as Transformer
class PivotTransformer(Transformer):
def _transform(self, data):
return data.withColumn("week_of_year",sf.concat(sf.lit("T"),\
sf.col("week_of_year"))) \
.groupBy("customer_id") \
.pivot("week_of_year") \
.sum("amount")
推荐阅读
- css - css 根据屏幕大小不同地定位两个 div
- python - 在图像上卷积一个小型神经网络?
- javascript - 在继续之前是否可以等待内部的`await`?
- python-3.x - 如何使用 Selenium 获取 CSS 类的文本
- c# - C# BadImageFormatException 仅在一台 PC 上?
- wpf - 用于移动网格内容的 WPF 锚点网格 OpacityMask
- python - 如果在字符串变量中发现多个字符串,则选择第二个特定字符串
- sql - 在 Hive 中将字符串转换为时间戳
- python - 在测试集中使用带有新功能的 Sklearn Pipeline(文本分类)
- php - 如何隐藏未登录用户的店面搜索框?