首页 > 解决方案 > Pyspark 从外部文件导入字典

问题描述

我正在尝试在代码中使用字典来检查我的列数据类型。

conversions = {
    "COL1": lambda c: f.col(c).cast("string"),
    "COL2": lambda c: f.from_unixtime(f.unix_timestamp(c, dateFormat)).cast("date"),
    "COL3": lambda c: f.from_unixtime(f.unix_timestamp(c, dateFormat)).cast("date"),
    "COL4": lambda c: f.col(c).cast("float"),
    "COL5": lambda c: f.col(c).cast("string"),
    "COL6": lambda c: f.col(c).cast("string"),
}

validateDF = inputDF.withColumn(
    "dataTypeValidations",
    f.concat_ws(
        ",",
        *[
            f.when(
                v(k).isNull() & f.col(k).isNotNull(), f.lit(k + " not valid")
            ).otherwise(f.lit("None"))
            for k, v in conversions.items()
        ]
    ),
)

如果字典嵌入在同一个程序中,上面的代码可以正常工作。但是,如果我要从不同的文件中导入它,例如

from dataTypeDictionary import conversions

这失败并出现错误

 f.concat_ws(",",
AttributeError: '_io.TextIOWrapper' object has no attribute 'concat_ws'

我将如何处理外部化字典集?

标签: pythonapache-sparkpyspark

解决方案


错误是AttributeError: '_io.TextIOWrapper' object has no attribute 'concat_ws'
如果你看看你有这条线的地方,它是 : f.concat_ws

这仅仅意味着f定义错误。它不是应该pyspark.sql.functions的,但它是一个对象TextIOWrapper

你在某处做with open() as f吗?或者是其他东西 ?您正在覆盖f代码中的某个位置。

作为测试,您可以在定义from pyspark.sql import functions as f之前添加。validateDF

请使用 IDE。它可以帮助跟踪您重新定义对象的位置。


推荐阅读