首页 > 解决方案 > PySpark-在独立集群模式下运行时访问 udf 中的广播变量时​​出错

问题描述

@f.pandas_udf(returnType= DoubleType())
def square(r : pd.Series) -> pd.Series:
    print('In pandas Udf square')
    offset_value = offset.value
    return (r * r ) + 10

if __name__ == "__main__":
    spark = SparkSession.builder.appName("Spark").getOrCreate()

    sc = spark.sparkContext
    
    spark.conf.set("spark.sql.execution.arrow.pyspark.enabled", "true")
    spark.conf.set("spark.sql.execution.arrow.enabled", "true")
    
    offset = sc.broadcast(10)
    
    x = pd.Series(range(0,100))

    df = spark.createDataFrame(pd.DataFrame(x, columns=["x"]))
    df = df.withColumn('sq',square(df.x)).withColumn('sqsq', square(f.col('sq')))
    start_time = datetime.datetime.now()   

   
    df.show()

    offset.unpersist()
    offset.destroy()

    spark.stop()

如果我在本地模式下运行 pyspark submit 命令,上面的代码运行良好 Submit.cmd --master local[*] test.py

相同的代码,如果我尝试在独立集群模式下运行,即 Submit.cmd --master spark://xx.xx.0.24:7077 test.py 访问 udf 中的广播变量时​​出现错误

java.io.IOException:无法删除原始文件 'C:\Users\xxx\AppData\Local\Temp\spark-bf6b4553-f30f-4e4a-a7f7-ef117329985c\executor-3922c28f-ed1e-4348-baa4-4ed08e042b76\spark -b59e518c-a20a-4a11-b96b-b7657b1c79ea\broadcast6537791588721535439' 复制到 'C:\Users\xxx\AppData\Local\Temp\spark-bf6b4553-f30f-4e4a-a7f7-ef117329985c\executor-39285c\executor-343482 baa4-4ed08e042b76\blockmgr-ee27f0f0-ee8b-41ea-86d6-8f923845391e\37\broadcast_0_python' 在 org.apache.commons.io.FileUtils.moveFile(FileUtils.java:2835) 在 org.apache.spark.storage.DiskStore。 moveFileToBlock(DiskStore.scala:133) at org.apache.spark.storage.BlockManager$TempFileBasedBlockStoreUpdater.saveToDiskStore(BlockManager.scala:424) at org.apache.spark.storage.BlockManager$BlockStoreUpdater.$anonfun$save$1(BlockManager. scala:343) 在 org.apache.spark.storage.BlockManager。org$apache$spark$storage$BlockManager$$doPut(BlockManager.scala:1298)

在不访问 Udf 中的广播变量的情况下,此代码可以正常工作。

标签: apache-sparkpyspark

解决方案


推荐阅读