machine-learning - 尝试在 Spark Structured Streaming 中使用 sklearn 库
问题描述
我想使用 Kafka 和 Spark 结构化流将 sklearn.preprocessing 的标签编码器功能应用于流数据。到目前为止的想法是:
当我每次从 Kafka 源接收一批数据时,我想在该批上实现一个函数,如下所示:
def use_label_encoder(label_encoder, y):
return label_encoder.transform(y) + 1
to_transform_class_val = udf(use_label_encoder, IntegerType())
这是架构:
schema = StructType([
StructField("sepal_length_in_cm", StringType()), \
StructField("sepal_width_in_cm", StringType()), \
StructField("petal_length_in_cm", StringType()), \
StructField("petal_width_in_cm", StringType()), \
StructField("class", StringType())
])
df = df.selectExpr("CAST(value AS STRING)")
df1 = df.select(from_json(df.value, schema).alias("json"))
当我尝试定义 label_encoder 时:
label_encoder = enc.fit(df1.select(to_upper("json.class")))
它给出了一个错误“输入形状错误”
我用于非流数据的等效代码是:
y = df['class'].values
enc = LabelEncoder()
label_encoder = enc.fit(y)
y = label_encoder.transform(y) + 1
谁能帮助我了解如何将 sklearn 方法应用于流数据?
解决方案
以后可以加1吗?你的火花代码会变成
def use_label_encoder(label_encoder, y):
return label_encoder.transform(y)
to_transform_class_val = udf(use_label_encoder, IntegerType())
df = df.withColumn('new_col', to_transform_class_val(label_encoder, 'old_column'))
df = df.withColumn('label_enc', col('new_col') + lit(1))
推荐阅读
- python - 如何读取泡菜文件
- excel - DAO.DBEngine 类不再使用 Windows 10 在 MS Access 2016 中注册
- python - 验证是否可以通过在来自两个不同列表的值之间来回循环来构造单词
- android-studio - 您无法移除所有正式版 APK 和 Android App Bundle
- flutter - 如何在任何 Flutter/Dart http 客户端上删除重定向时的授权标头
- python - 无法得到预期的情节
- firebase - 如何在颤振中使用提供者获取 Firebase 收集文档详细信息
- sql - SQL 查询计算两个附加列
- python - 如何在 Tkinter 中使图像可点击?
- visual-studio-code - 在 vscode 中无法连接到 jupyter 服务器