apache-spark - PySpark:执行相同的操作,多列
问题描述
我对 Spark/PySpark 还是很陌生,并且一直在努力保持一致地遵循“最佳实践”,但不幸的是,有时我会恢复到像下面这样的 hacky 解决方案。
我正在研究一个推文数据集,其中包含基本推文、转推内容和引用内容。如果推文不是 retweeted_status 或quoted_status,则根据数据类型将每个子功能设置为 None 或空列表或其他一些变体。
我要做的是为这些功能中的每一个创建新列,如果它不是转发或引用状态,则使用来自基本功能的内容,或者使用转发的内容,或者使用基本功能+引用的内容。
def _shared_content(feature, df):
return df.withColumn(f'tweet_{feature.replace("entities.", "")}',
when((col(f'`retweeted_status.{feature}`').isNotNull()),
df[f'`retweeted_status.{feature}`'])
.when((col(f'`quoted_status.{feature}`').isNotNull()),
concat(
df[f'`{feature}`'],
lit(" "),
df[f'`quoted_status.{feature}`']))
.otherwise(df[f'`{feature}`']))
common_features = [
'text',
'entities.hashtags',
'entities.media',
'entities.urls',
]
for f in common_features:
df = df.transform(lambda df, f=f: _shared_content(f, df))
如您所见,这有点混乱,因此我编写了一些伪代码以提高可读性。在这里,我正在执行以下功能:
- 对于共同特征中的每个特征:
- 如果 retweet_status.[FEATURE] 不是 None,则将 new col tweet_[FEATURE] 设置为 retweet_status.[FEATURE]
- 如果quoted_status.[FEATURE] 不是None,则将new col tweet_[FEATURE] 设置为[FEATURE] + " " + quoted_status.[FEATURE]
- 否则,将 tweet_[FEATURE] 设置为 base [FEATURE]。
这个解决方案目前有效,但感觉非常糟糕,坦率地说难以辨认。我想知道是否有更类似于 Spark 的方法来消除大量冗余代码?我试图从列表中应用某种映射到函数,但有点迷失了。
最后澄清一下,我正在执行相同的转换,但唯一改变的是我正在处理的功能。
编辑:我通过以下方式使这变得更加清晰:
def _shared_content(f, df):
new_col = f'tweet_{f.replace("entities.", "")}'
retweet_cond = ((
col(f'`retweeted_status.{f}`').isNotNull()),
df[f'`retweeted_status.{f}`'])
quoted_cond = ((
col(f'`quoted_status.{f}`').isNotNull()),
concat(df[f'`{f}`'], lit(" "), df[f'`quoted_status.{f}`']))
return df.withColumn(
new_col,
when(*retweet_cond)
.when(*quoted_cond)
.otherwise(df[f'`{f}`'])
)
解决方案
我会写这样的东西:
def _shared_content(feature, df):
feat_col = col(feature)
retweet = col(f"retweeted_status.{feature}")
quoted = col(f"quoted_status.{feature}")
new_feat_name = f'tweet_{feature.replace("entities.", "")}'
return df.withColumn(
new_feat_name,
(
when(retweet.isNotNull(), retweet)
.when(quoted.isNotNull(), concat(feat_col, lit(" "), quoted))
.otherwise(feat_col)
),
)
在使用 Pyspark(或其他任何东西)编写代码时,我通常遵循的一些原则:
- 避免代码重复(您在多个地方重复了列名)
- 当它有助于可读性时,用变量名称替换原始值(如新功能名称的情况)
- 使用 Spark 列对象,而不是使用
df["<column name>"]
.
PS.:我不确定你为什么使用反引号。
推荐阅读
- sql - SSMS - 将长数据导出到 Excel 文件而不截断数据
- sql - SSMS 按日期时间字段过滤数据
- java - 当开发工具重新启动 Web 服务器时,计划任务不会停止
- angularjs - 尝试多次加载 AngularJS(从 1.5 升级到 1.7 后)
- sql-server - 带有日期帮助的 T-SQL SELECT 查询
- ruby-on-rails - 通过 Ruby 中的递归嵌套哈希
- excel - 更改另一个工作簿上的值时,Application.Volatile 不会重新计算函数
- scala - scala 聚合第一个函数给出了意想不到的结果
- python - 比较python中的列表是否相同,当它们有不同的大括号时
- sql - 有没有一种非常有效的方法可以从 SQLite 表中删除不在另一个 SQLite 表中的记录?