python - Applying MinMaxScaler to multiple columns in PySpark
Problem description
I want to apply MinMaxScaler to multiple columns of a PySpark DataFrame df. So far, I only know how to apply it to a single column, e.g. x:
import pandas as pd
from pyspark.ml.feature import MinMaxScaler

pdf = pd.DataFrame({'x':range(3), 'y':[1,2,5], 'z':[100,200,1000]})
df = spark.createDataFrame(pdf)
scaler = MinMaxScaler(inputCol="x", outputCol="x")
scalerModel = scaler.fit(df)
scaledData = scalerModel.transform(df)
What if I have 100 columns? Is there a way to min-max scale many columns in PySpark at once?
Update:
Also, how do I apply MinMaxScaler to integer or double values? It throws the following error:
java.lang.IllegalArgumentException: requirement failed: Column length must be of type struct<type:tinyint,size:int,indices:array<int>,values:array<double>> but was actually int.
Solution
Question 1:
Here is how to change your example so that it runs correctly. You need to assemble the data into a vector column, which is the input format the transformer works on.
import pandas as pd
from pyspark.ml.feature import MinMaxScaler, VectorAssembler
from pyspark.ml import Pipeline

pdf = pd.DataFrame({'x':range(3), 'y':[1,2,5], 'z':[100,200,1000]})
df = spark.createDataFrame(pdf)
assembler = VectorAssembler(inputCols=["x"], outputCol="x_vec")
scaler = MinMaxScaler(inputCol="x_vec", outputCol="x_scaled")
pipeline = Pipeline(stages=[assembler, scaler])
scalerModel = pipeline.fit(df)
scaledData = scalerModel.transform(df)
Question 2:
To run MinMaxScaler on multiple columns, you can use a pipeline that receives a list of transformation stages prepared with list comprehensions:
from pyspark.ml import Pipeline
from pyspark.ml.feature import MinMaxScaler, VectorAssembler
columns_to_scale = ["x", "y", "z"]
assemblers = [VectorAssembler(inputCols=[col], outputCol=col + "_vec") for col in columns_to_scale]
scalers = [MinMaxScaler(inputCol=col + "_vec", outputCol=col + "_scaled") for col in columns_to_scale]
pipeline = Pipeline(stages=assemblers + scalers)
scalerModel = pipeline.fit(df)
scaledData = scalerModel.transform(df)
See this example pipeline in the official documentation.
In the end, you will get results in the following format:
>>> scaledData.printSchema()
root
|-- x: long (nullable = true)
|-- y: long (nullable = true)
|-- z: long (nullable = true)
|-- x_vec: vector (nullable = true)
|-- y_vec: vector (nullable = true)
|-- z_vec: vector (nullable = true)
|-- x_scaled: vector (nullable = true)
|-- y_scaled: vector (nullable = true)
|-- z_scaled: vector (nullable = true)
>>> scaledData.show()
+---+---+----+-----+-----+--------+--------+--------+--------------------+
| x| y| z|x_vec|y_vec| z_vec|x_scaled|y_scaled| z_scaled|
+---+---+----+-----+-----+--------+--------+--------+--------------------+
| 0| 1| 100|[0.0]|[1.0]| [100.0]| [0.0]| [0.0]| [0.0]|
| 1| 2| 200|[1.0]|[2.0]| [200.0]| [0.5]| [0.25]|[0.1111111111111111]|
| 2| 5|1000|[2.0]|[5.0]|[1000.0]| [1.0]| [1.0]| [1.0]|
+---+---+----+-----+-----+--------+--------+--------+--------------------+
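The scaled values above follow the standard min-max formula, (x - min) / (max - min), applied per column. A minimal pure-Python sketch of that formula (the helper name is ours, not part of Spark), which reproduces the z_scaled column:

```python
def min_max_scale(values):
    """Rescale a sequence of numbers to [0, 1] using (x - min) / (max - min)."""
    lo, hi = min(values), max(values)
    return [(v - lo) / (hi - lo) for v in values]

print(min_max_scale([100, 200, 1000]))  # [0.0, 0.1111..., 1.0], as in z_scaled
```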
Extra post-processing:
You can rename the scaled columns back to their original names with some post-processing. For example:
from pyspark.sql import functions as f
names = {x + "_scaled": x for x in columns_to_scale}
scaledData = scaledData.select([f.col(c).alias(names[c]) for c in names.keys()])
The output will be:
scaledData.show()
+------+-----+--------------------+
| y| x| z|
+------+-----+--------------------+
| [0.0]|[0.0]| [0.0]|
|[0.25]|[0.5]|[0.1111111111111111]|
| [1.0]|[1.0]| [1.0]|
+------+-----+--------------------+