首页 > 解决方案 > 从配置动态构建数据框名称 - PySpark

问题描述

我需要根据配置动态构建最终数据帧名称(加入 final_df 和后缀)。当我运行最后提到的代码时,我收到错误 - “SyntaxError:无法分配给运算符”。但是,如果我将 each["final_df"]+'_'+ each["suffix"] 替换为任何其他名称,它就可以工作。

数据 :

df_source_1 = spark.createDataFrame(
        [
          (123,10),
          (123,15),
          (123,20)
        ],
        ("cust_id", "value")
    )

配置:

config = """
                [ 
                  {
                      "source_df":"df_source_1",
                      "suffix": "new", 
                      "group":["cust_id"],
                      "final_df": "df_taregt_1"
                  }
                ]
                """   

代码:

import json   
for each in json.loads(config):
    print("Before=",each['final_df'] ) # str object
    print(each["final_df"]+'_'+ each["suffix"]) # df_taregt_1_new , print statement works
    each["final_df"]+'_'+ each["suffix"] = eval(each["source_df"]).groupBy(each["group"]).agg(sum("value")) # Errors out. Here I need to assign the dataframe to df_taregt_1_new

任何人都可以帮忙。

标签: pythonpython-3.xpyspark

解决方案


你用 dict 编码:

df_dict = {}
df_dict["df_source_1"] = spark.createDataFrame(
    [(123, 10), (123, 15), (123, 20)], ("cust_id", "value")
)

for each in json.loads(config):
    df_dict[each["final_df"] + "_" + each["suffix"]] = (
        df_dict[each["source_df"]].groupBy(each["group"]).agg(sum("value"))
    )

您不必使用应该是动态创建的对象,而是使用一个字典来存储所有这些对象及其动态名称。您甚至可以测试您的 dict 以了解对象是否存在。


推荐阅读