首页 > 解决方案 > PySpark:执行相同的操作,多列

问题描述

我对 Spark/PySpark 还是很陌生,并且一直在努力保持一致地遵循“最佳实践”,但不幸的是,有时我会恢复到像下面这样的 hacky 解决方案。

我正在研究一个推文数据集,其中包含基本推文、转推内容和引用内容。如果推文不是 retweeted_status 或quoted_status,则根据数据类型将每个子功能设置为 None 或空列表或其他一些变体。

我要做的是为这些功能中的每一个创建新列,如果它不是转发或引用状态,则使用来自基本功能的内容,或者使用转发的内容,或者使用基本功能+引用的内容。

def _shared_content(feature, df):
    return df.withColumn(f'tweet_{feature.replace("entities.", "")}',
        when((col(f'`retweeted_status.{feature}`').isNotNull()),
              df[f'`retweeted_status.{feature}`'])
        .when((col(f'`quoted_status.{feature}`').isNotNull()),
                concat(
                       df[f'`{feature}`'],
                       lit(" "),
                       df[f'`quoted_status.{feature}`']))
        .otherwise(df[f'`{feature}`']))

common_features = [
    'text',
    'entities.hashtags',
    'entities.media',
    'entities.urls',
]

for f in common_features:
    df = df.transform(lambda df, f=f: _shared_content(f, df))

如您所见,这有点混乱,因此我编写了一些伪代码以提高可读性。在这里,我正在执行以下功能:

这个解决方案目前有效,但感觉非常糟糕,坦率地说难以辨认。我想知道是否有更类似于 Spark 的方法来消除大量冗余代码?我试图从列表中应用某种映射到函数,但有点迷失了。

最后澄清一下,我正在执行相同的转换,但唯一改变的是我正在处理的功能。

编辑:我通过以下方式使这变得更加清晰:

def _shared_content(f, df):
    new_col = f'tweet_{f.replace("entities.", "")}'

    retweet_cond = ((
        col(f'`retweeted_status.{f}`').isNotNull()),
                    df[f'`retweeted_status.{f}`'])
    quoted_cond = ((
        col(f'`quoted_status.{f}`').isNotNull()),
                   concat(df[f'`{f}`'], lit(" "), df[f'`quoted_status.{f}`']))

    return df.withColumn(
        new_col,
        when(*retweet_cond)
        .when(*quoted_cond)
        .otherwise(df[f'`{f}`'])
    )

标签: apache-sparkpyspark

解决方案


我会写这样的东西:

def _shared_content(feature, df):
    feat_col = col(feature)
    retweet = col(f"retweeted_status.{feature}")
    quoted = col(f"quoted_status.{feature}")
    new_feat_name = f'tweet_{feature.replace("entities.", "")}'

    return df.withColumn(
        new_feat_name,
        (
            when(retweet.isNotNull(), retweet)
            .when(quoted.isNotNull(), concat(feat_col, lit(" "), quoted))
            .otherwise(feat_col)
        ),
    )

在使用 Pyspark(或其他任何东西)编写代码时,我通常遵循的一些原则:

  • 避免代码重复(您在多个地方重复了列名)
  • 当它有助于可读性时,用变量名称替换原始值(如新功能名称的情况)
  • 使用 Spark 列对象,而不是使用df["<column name>"].

PS.:我不确定你为什么使用反引号。


推荐阅读