python - 如何通过pyspark查询将数据分组为多个块?
问题描述
我有 Spark DataFrame,我需要在某些条件下向它添加一列。
- 随机选择一块数据(chunk = 1000)而不进行替换
- 每个数据块在添加的列中应该有相同的数量。
然后我将有一列,通过在该列上使用 groupby,我可以对每个数据块执行一些操作。请你帮我解决这个问题?下面有一个示例,我想要一些类似的查询:
from pyspark.sql.functions import pandas_udf, PandasUDFType
from pyspark.sql.types import *
# setup the spark data frame as a table
boston_sp.createOrReplaceTempView("boston")
# add train/test label and expand the data set by 3x (each num trees parameter)
full_df = spark.sql("""
select *
from (
select *, case when rand() < 0.8 then 1 else 0 end as training
from boston
) b
cross join (
select 11 as trees union all select 20 as trees union all select 50 as trees)
""")
schema = StructType([StructField('trees', LongType(), True),
StructField('r_squared', DoubleType(), True)])
@pandas_udf(schema, PandasUDFType.GROUPED_MAP)
def train_RF(boston_pd):
trees = boston_pd['trees'].unique()[0]
# get the train and test groups
boston_train = boston_pd[boston_pd['training'] == 1]
boston_test = boston_pd[boston_pd['training'] == 0]
# create data and label groups
y_train = boston_train['target']
X_train = boston_train.drop(['target'], axis=1)
y_test = boston_test['target']
X_test = boston_test.drop(['target'], axis=1)
# train a classifier
rf= RFR(n_estimators = trees)
model = rf.fit(X_train, y_train)
# make predictions
y_pred = model.predict(X_test)
r = pearsonr(y_pred, y_test)
# return the number of trees, and the R value
return pd.DataFrame({'trees': trees, 'r_squared': (r[0]**2)}, index=[0])
# use the Pandas UDF
results = full_df.groupby('trees').apply(train_RF)
解决方案
这是一个小例子,可以做你想做的(我认为):
df = spark.range(10)
df2 = df.selectExpr(
"*",
"int(rand() * 3) trees",
"case when rand() < 0.8 then 1 else 0 end training"
)
df2.show()
+---+-----+--------+
| id|trees|training|
+---+-----+--------+
| 0| 1| 1|
| 1| 1| 1|
| 2| 2| 1|
| 3| 0| 1|
| 4| 1| 1|
| 5| 0| 1|
| 6| 2| 0|
| 7| 1| 1|
| 8| 2| 0|
| 9| 2| 1|
+---+-----+--------+
如果您使用的是 Spark,您可能需要考虑使用 Spark ML 库,而不是在分组数据帧上使用 scikit-learn。例如,如果你想做随机森林回归,你可以看看这个例子和文档中的这个例子。
推荐阅读
- python - 获取在函数内部创建的变量值
- sql - SQLite Sum 和 CASE 语句
- composer-php - 如何限制 PHP Deployer 作曲家的内存?
- c# - HRESULT 的异常:Excel Interop v2.0.50727 和 VS2017 上的 0x800A03EC
- angular - 如何检查 Angular 中的占位符内容?
- arrays - 将 char[] 转换为 char*[] 后出现分段错误
- sql - 将两个 select 语句的结果连接在一起
- bash - Bash Jobs:如何输出作业过程信息以及错误?
- c++ - 我如何在 C++ 中调用 talib macd?
- r - 如何分隔包含 JSON 数据的 r data.frame 中的行?