python - 仅当列存在时,带有列表达式的 pyspark
问题描述
%python
def has_column(df, col):
try:
df[col]
return True
except AnalysisException:
return False
df = spark.createDataFrame([ \
("C","I"), \
("I","I"), \
("C","B"), \
], ["B2B","E1J"])
df.show()
+---+---+
|B2B|E1J|
+---+---+
| C| I|
| I| I|
| C| B|
+---+---+
现在这就是我想要做的:检查一个列是否存在并且只有它存在,然后检查它的值并基于它为标志列分配一个值。只要在有效列上完成检查,这工作正常, 如下
df.withColumn("flag",when( ((lit(has_column(df, "B2B"))) & (col("B2B")=="C") ) , 1).otherwise(0)).show()
+---+---+----+
|B2B|E1J|flag|
+---+---+----+
| C| I| 1|
| I| I| 0|
| C| B| 1|
+---+---+----+
我遇到的问题是这些检查条件不是静态的,而是它们是从外部文件中读取并动态生成的,它可能具有实际数据框没有的列并导致错误如下。
有什么想法可以解决这个问题吗?
例如:
df.withColumn("flag", \
when( \
(lit(has_column(df, "GBC"))) & (col("GBC")=="C") | \
(lit(has_column(df, "B2B"))) & (col("B2B")=="C") \
, 1)) \
.otherwise(0).show()
org.apache.spark.sql.AnalysisException: cannot resolve '`GBC`' given input columns: [B2B, E1J];;
解决方案
错误是由col('GBC')
. 您可以使用以下代码对可能不存在的列进行预测。
import pyspark.sql.functions as F
def for_exist_column(df, col, pre):
if col in df.columns:
return pre(df[col])
else:
return F.lit(False)
df = spark.createDataFrame([ \
("C","I"), \
("I","I"), \
("C","B"), \
], ["B2B","E1J"])
df.show()
df.withColumn("flag",F.when(for_exist_column(df, 'B2B', lambda c: c=='C'), 1).otherwise(0)).show()
df.withColumn("flag", F.when(for_exist_column(df, 'GBC', lambda c: c=='C') | for_exist_column(df, 'B2B', lambda c: c=='C'), 1).otherwise(0)).show()
推荐阅读
- node.js - Sequelize transaction - 保存事务以在下一个请求中提交
- hadoop - 如何知道 Hadoop 中已删除文件的列表?
- python - 在 Python 中运行其他代码时如何让代码循环?
- c# - 如何从 mongo DB 中的文档中获取值
- html - 如何降低透明度
,我指定的位置是固定的? - elasticsearch - Elasticsearch search_after,不要将_id用于tie_breaker_field?
- java - Centos 7 启动 DMS Agorum Core 时出现问题
- html - 导航栏修改
- cypress - 赛普拉斯:在动态定位元素方面需要帮助
- javascript - 如果 checkValidity() 方法返回 false,则显示表单错误