首页 > 解决方案 > 仅当列存在时,带有列表达式的 pyspark

问题描述

%python

def has_column(df, col):
    try:
        df[col]
        return True
    except AnalysisException:
        return False
      
df = spark.createDataFrame([ \
    ("C","I"), \
    ("I","I"), \
    ("C","B"), \
], ["B2B","E1J"])

df.show()
    
+---+---+
|B2B|E1J|
+---+---+
|  C|  I|
|  I|  I|
|  C|  B|
+---+---+

现在这就是我想要做的:检查一个列是否存在并且只有它存在,然后检查它的值并基于它为标志列分配一个值。只要在有效列上完成检查,这工作正常, 如下

df.withColumn("flag",when( ((lit(has_column(df, "B2B"))) & (col("B2B")=="C") ) , 1).otherwise(0)).show()


+---+---+----+
|B2B|E1J|flag|
+---+---+----+
|  C|  I|   1|
|  I|  I|   0|
|  C|  B|   1|
+---+---+----+

我遇到的问题是这些检查条件不是静态的,而是它们是从外部文件中读取并动态生成的,它可能具有实际数据框没有的列并导致错误如下。

有什么想法可以解决这个问题吗?

例如:

df.withColumn("flag", \
              when( \
                  (lit(has_column(df, "GBC"))) & (col("GBC")=="C") |   \
                  (lit(has_column(df, "B2B"))) & (col("B2B")=="C")     \
                , 1))   \
              .otherwise(0).show()


org.apache.spark.sql.AnalysisException: cannot resolve '`GBC`' given input columns: [B2B, E1J];;

标签: pythonapache-sparkpyspark

解决方案


错误是由col('GBC'). 您可以使用以下代码对可能不存在的列进行预测。

import pyspark.sql.functions as F

def for_exist_column(df, col, pre):
    if col in df.columns:
        return pre(df[col])
    else:
        return F.lit(False)


df = spark.createDataFrame([ \
    ("C","I"), \
    ("I","I"), \
    ("C","B"), \
], ["B2B","E1J"])

df.show()

df.withColumn("flag",F.when(for_exist_column(df, 'B2B', lambda c: c=='C'), 1).otherwise(0)).show()

df.withColumn("flag", F.when(for_exist_column(df, 'GBC', lambda c: c=='C') | for_exist_column(df, 'B2B', lambda c: c=='C'), 1).otherwise(0)).show()

推荐阅读