How do I group data into chunks with a pyspark query?

Problem description

I have a Spark DataFrame and I need to add a column to it under the following conditions:

  1. Randomly select chunks of data (chunk size = 1000) without replacement.
  2. Each chunk should contain the same number of rows, identified by a common value in the added column.

I will then have a column that lets me use groupby to run some operation on each chunk. Could you help me with this? Below is an example of the kind of query I am after:


import pandas as pd
from scipy.stats import pearsonr
from sklearn.ensemble import RandomForestRegressor as RFR

from pyspark.sql.functions import pandas_udf, PandasUDFType
from pyspark.sql.types import StructType, StructField, LongType, DoubleType

# setup the spark data frame as a table
boston_sp.createOrReplaceTempView("boston")

# add train/test label and expand the data set by 3x (each num trees parameter)
full_df = spark.sql("""
  select *
  from (
    select *, case when rand() < 0.8 then 1 else 0 end as training 
    from boston
  ) b
  cross join (
      select 11 as trees union all select 20 as trees union all select 50 as trees)
""")

schema = StructType([StructField('trees', LongType(), True),
                     StructField('r_squared', DoubleType(), True)])  

@pandas_udf(schema, PandasUDFType.GROUPED_MAP)
def train_RF(boston_pd):
    trees = boston_pd['trees'].unique()[0]

    # get the train and test groups 
    boston_train = boston_pd[boston_pd['training'] == 1]
    boston_test = boston_pd[boston_pd['training'] == 0] 
        
    # create data and label groups 
    y_train = boston_train['target']
    X_train = boston_train.drop(['target'], axis=1)
    y_test = boston_test['target']
    X_test = boston_test.drop(['target'], axis=1)
   
    # train a regressor
    rf = RFR(n_estimators=trees)
    model = rf.fit(X_train, y_train)

    # make predictions
    y_pred = model.predict(X_test)
    r = pearsonr(y_pred, y_test)
    
    # return the number of trees, and the R value 
    return pd.DataFrame({'trees': trees, 'r_squared': (r[0]**2)}, index=[0])
  
# use the Pandas UDF
results = full_df.groupby('trees').apply(train_RF)

Tags: python, apache-spark, pyspark, apache-spark-sql

Solution


Here is a small example that does what (I think) you want:

df = spark.range(10)

df2 = df.selectExpr(
    "*",
    "int(rand() * 3) trees",
    "case when rand() < 0.8 then 1 else 0 end training"
)

df2.show()
+---+-----+--------+
| id|trees|training|
+---+-----+--------+
|  0|    1|       1|
|  1|    1|       1|
|  2|    2|       1|
|  3|    0|       1|
|  4|    1|       1|
|  5|    0|       1|
|  6|    2|       0|
|  7|    1|       1|
|  8|    2|       0|
|  9|    2|       1|
+---+-----+--------+

Since you are already on Spark, you may want to consider using the Spark ML library instead of running scikit-learn on grouped data frames. For example, if you want to do random forest regression, you can look at this example and this one from the documentation.
