org.apache.spark.sql.AnalysisException: cannot resolve UDF(df["columnName"])

Problem Description

I need to dynamically generate a select expression based on several conditions (using the coalesce function and a UDF, checkNullUdf). I wrote the following code for this:

column_expr = ''
for row in PRIORITIZATION.rdd.collect():
    x = row.__fields__       # column names of the PRIORITIZATION row
    colName = row.Element    # the target column to coalesce across sources
    i = 0
    lst = ['']
    for col in row:
        # columns 0 and 1 are metadata; a non-null cell holds that source's priority
        if col is not None and i != 0 and i != 1:
            dfColName = "checkNullUdf(" + x[i] + "[\"" + colName + "\"])"
            lst.insert(int(col) - 1, dfColName)  # place by priority order
        i += 1
    lst.remove('')
    print(lst)
    column_expr = column_expr + "coalesce(" + ",".join(lst) + "),"
column_expr = column_expr[:-1]  # drop the trailing comma
print("Final String is: " + column_expr)

This produces the following output, as expected:

Final String is: coalesce(checkNullUdf(GOLDEN_RECORD["Well_Name"]),checkNullUdf(SAP["Well_Name"])),coalesce(checkNullUdf(GOLDEN_RECORD["Country"])),coalesce(checkNullUdf(SAP["City"])),coalesce(checkNullUdf(SAP["State"])),coalesce(checkNullUdf(BITPULSE["Spud_Date"])),coalesce(checkNullUdf(BITPULSE["Plug_Date"])),coalesce(checkNullUdf(WELLDB["Water_Depth"]),checkNullUdf(WEBBIT["Water_Depth"]),checkNullUdf(BITPULSE["Water_Depth"])),coalesce(checkNullUdf(WELLDB["County"])),coalesce(checkNullUdf(GOLDEN_RECORD["Operator"])),coalesce(checkNullUdf(GOLDEN_RECORD["Field_Name"])),coalesce(checkNullUdf(GOLDEN_RECORD["Long"]))

But when I pass this string to the select expression below, I get an error:

RESULT_REC = GOLDEN_RECORD \
    .join(BITPULSE, GOLDEN_RECORD.BitPulse_rec_Id == BITPULSE.UUID, "left_outer") \
    .join(SAP, GOLDEN_RECORD.SAP_rec_Id == SAP.UUID, "left_outer") \
    .join(WEBBIT, GOLDEN_RECORD.MDM_Well_Id == WEBBIT.UUID, "left_outer") \
    .join(WELLDB, GOLDEN_RECORD.WellDB_rec_Id == WELLDB.UUID, "left_outer") \
    .select(column_expr)

The error is as follows:

Py4JJavaError                             Traceback (most recent call last)
C:\spark\spark-2.4.5-bin-hadoop2.7\python\lib\pyspark.zip\pyspark\sql\utils.py in deco(*a, **kw)
     62         try:
---> 63             return f(*a, **kw)
     64         except py4j.protocol.Py4JJavaError as e:

C:\spark\spark-2.4.5-bin-hadoop2.7\python\lib\py4j-0.10.7-src.zip\py4j\protocol.py in get_return_value(answer, gateway_client, target_id, name)
    327                     "An error occurred while calling {0}{1}{2}.\n".
--> 328                     format(target_id, ".", name), value)
    329             else:

Py4JJavaError: An error occurred while calling o75.select.
: org.apache.spark.sql.AnalysisException: cannot resolve '`coalesce(checkNullUdf(GOLDEN_RECORD["Well_Name"]),checkNullUdf(SAP["Well_Name"])),coalesce(checkNullUdf(GOLDEN_RECORD["Country"])),coalesce(checkNullUdf(SAP["City"])),coalesce(checkNullUdf(SAP["State"])),coalesce(checkNullUdf(BITPULSE["Spud_Date"])),coalesce(checkNullUdf(BITPULSE["Plug_Date"])),coalesce(checkNullUdf(WELLDB["Water_Depth"]),checkNullUdf(WEBBIT["Water_Depth"]),checkNullUdf(BITPULSE["Water_Depth"])),coalesce(checkNullUdf(WELLDB["County"])),coalesce(checkNullUdf(GOLDEN_RECORD["Operator"])),coalesce(checkNullUdf(GOLDEN_RECORD["Field_Name"])),coalesce(checkNullUdf(GOLDEN_RECORD["Long"]))`' given input columns: [State, Country, UUID, County, City, WellDB_rec_Id, Spud_Date, UUID, Well_Name, Water_Depth, State, Spud_Date, UUID, SAP_rec_Id, Country, Plug_Date, County, State, UUID, Spud_Date, Field_Name, BitPulse_rec_Id, Operator, Well_Name, Water_Depth, Country, MDM_Well_Id, UUID, Water_Depth, Field_Name, Well_Name, Long];;
'Project ['coalesce(checkNullUdf(GOLDEN_RECORD["Well_Name"]),checkNullUdf(SAP["Well_Name"])),coalesce(checkNullUdf(GOLDEN_RECORD["Country"])),coalesce(checkNullUdf(SAP["City"])),coalesce(checkNullUdf(SAP["State"])),coalesce(checkNullUdf(BITPULSE["Spud_Date"])),coalesce(checkNullUdf(BITPULSE["Plug_Date"])),coalesce(checkNullUdf(WELLDB["Water_Depth"]),checkNullUdf(WEBBIT["Water_Depth"]),checkNullUdf(BITPULSE["Water_Depth"])),coalesce(checkNullUdf(WELLDB["County"])),coalesce(checkNullUdf(GOLDEN_RECORD["Operator"])),coalesce(checkNullUdf(GOLDEN_RECORD["Field_Name"])),coalesce(checkNullUdf(GOLDEN_RECORD["Long"]))]
+- Join LeftOuter, (WellDB_rec_Id#14 = UUID#124)
   :- Join LeftOuter, (MDM_Well_Id#11 = UUID#108)
   :  :- Join LeftOuter, (SAP_rec_Id#12 = UUID#88)
   :  :  :- Join LeftOuter, (BitPulse_rec_Id#13 = UUID#40)
   :  :  :  :- Relation[UUID#10,MDM_Well_Id#11,SAP_rec_Id#12,BitPulse_rec_Id#13,WellDB_rec_Id#14,Well_Name#15,Country#16,Operator#17,Field_Name#18,Long#19] csv
   :  :  :  +- Relation[UUID#40,Spud_Date#41,Plug_Date#42,Water_Depth#43,Field_Name#44,State#45,County#46] csv
   :  :  +- Relation[UUID#88,Well_Name#89,Country#90,City#91,State#92] csv
   :  +- Relation[UUID#108,Spud_Date#109,Water_Depth#110] csv
   +- Relation[UUID#124,Well_Name#125,State#126,County#127,Country#128,Spud_Date#129,Water_Depth#130] csv
20/04/07 00:53:33 INFO SparkContext: Invoking stop() from shutdown hook

However, if I copy and paste the final string output directly into the select expression, it works fine and I get the expected result.

I'm not sure why the error occurs only when column_expr is passed as a variable.

Tags: python, dataframe, apache-spark, pyspark

Solution


You can run the expression as a SQL string with spark.sql("SQL expression as a string"). The select() call fails because DataFrame.select() treats a plain string argument as a single column name, so Spark tries to resolve your entire generated expression as one (nonexistent) column — which is exactly what the backticked name in the AnalysisException shows. spark.sql(), by contrast, parses the string as SQL. For example (in Scala):

val employees = spark.createDataFrame(Seq(
  ("E1", 100.0, "a,b"),
  ("E2", 200.0, "e,f"),
  (null, 300.0, "c,d")
)).toDF("employee", "salary", "clubs")

employees.createTempView("employees")
spark.sql("select coalesce(employee, salary) as emp_or_sal from employees").show()

Result:

+----------+
|emp_or_sal|
+----------+
|        E1|
|        E2|
|     300.0|
+----------+
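
Since the question uses PySpark, here is the same example translated to Python — a sketch, assuming an active SparkSession named spark:

from pyspark.sql import SparkSession

spark = SparkSession.builder.getOrCreate()

# Same data as the Scala example above; None plays the role of null.
employees = spark.createDataFrame(
    [("E1", 100.0, "a,b"), ("E2", 200.0, "e,f"), (None, 300.0, "c,d")],
    ["employee", "salary", "clubs"])

employees.createOrReplaceTempView("employees")
spark.sql("select coalesce(employee, salary) as emp_or_sal from employees").show()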

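Applied to the question, two adjustments are needed before spark.sql can parse the generated string: checkNullUdf must be registered under a SQL-callable name, and the generator must emit SQL column references (GOLDEN_RECORD.Well_Name) instead of the Python-style GOLDEN_RECORD["Well_Name"]. A minimal sketch under those assumptions — the UDF body here is a hypothetical placeholder:

from pyspark.sql.types import StringType

# Placeholder body -- substitute the real checkNullUdf logic.
spark.udf.register("checkNullUdf", lambda v: v, StringType())

# Expose each source DataFrame under the name the generator uses.
for name, df in [("GOLDEN_RECORD", GOLDEN_RECORD), ("BITPULSE", BITPULSE),
                 ("SAP", SAP), ("WEBBIT", WEBBIT), ("WELLDB", WELLDB)]:
    df.createOrReplaceTempView(name)

RESULT_REC = spark.sql(
    "SELECT " + column_expr + " FROM GOLDEN_RECORD"
    " LEFT OUTER JOIN BITPULSE ON GOLDEN_RECORD.BitPulse_rec_Id = BITPULSE.UUID"
    " LEFT OUTER JOIN SAP ON GOLDEN_RECORD.SAP_rec_Id = SAP.UUID"
    " LEFT OUTER JOIN WEBBIT ON GOLDEN_RECORD.MDM_Well_Id = WEBBIT.UUID"
    " LEFT OUTER JOIN WELLDB ON GOLDEN_RECORD.WellDB_rec_Id = WELLDB.UUID")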
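Alternatively, if generating a string is not a hard requirement, the loop can build Column objects directly and pass them to select(), which avoids string parsing altogether. A sketch under the question's naming assumptions, where joined_df stands for the four-way join and checkNullUdf is the Python UDF object itself:

from pyspark.sql.functions import coalesce

dfs = {"GOLDEN_RECORD": GOLDEN_RECORD, "BITPULSE": BITPULSE,
       "SAP": SAP, "WEBBIT": WEBBIT, "WELLDB": WELLDB}

select_cols = []
for row in PRIORITIZATION.rdd.collect():
    fields = row.__fields__
    colName = row.Element
    by_priority = {}
    for i, priority in enumerate(row):
        if priority is not None and i > 1:
            # the cell value is this source's priority within the coalesce
            by_priority[int(priority)] = checkNullUdf(dfs[fields[i]][colName])
    ordered = [by_priority[p] for p in sorted(by_priority)]
    select_cols.append(coalesce(*ordered).alias(colName))

RESULT_REC = joined_df.select(*select_cols)

This keeps the prioritization logic identical but removes the need for temp views or UDF registration.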