首页 > 解决方案 > .foreachPartition 命令在 Python 包内运行时挂起,但从包外的 Python 文件运行时有效

问题描述

我有一个非常简单的 pyspark 应用程序,我在 EMR Pyspark 3.0.0 上运行。参见下面的项目结构,process.py 是控制应用程序的流程并调用 file_processor 包内的代码。

process.py
file_processor
  config        
    spark.py
  repository        
    s3_repo.py
  structure        
    table_creator.py 

当我在一个小数据帧上运行 .foreachPartition 时,我看到了奇怪的行为。

当 process.py 调用位于 s3_repo.py 中的 .foreachPartition 代码时,该命令会挂起

当相同的 .foreachPartition 代码从 s3_repo.py 移动并放置在 process.py 中时,它运行得很好。

.foreachPartition 代码的位置有何不同?这是 Pyspark 错误还是我在这里遗漏了一些简单的东西?

process.py 代码

from file_processor.structure import table_creator
from file_processor.repository import s3_repo


def process():
    table_creator.create_table()
    s3_repo.save_to_s3()


if __name__ == '__main__':
    process()

spark.py 代码

from pyspark.sql import SparkSession
spark_session = SparkSession.builder.appName("Test").getOrCreate()

s3_repo.py 代码

from file_processor.config.spark import spark_session

def save_to_s3():
    spark_session.sql('SELECT * FROM rawFileData').toJSON().foreachPartition(_save_to_s3)

def _save_to_s3(iterator):   
    for record in iterator:
        print(record)

table_creator.py 代码

from file_processor.config.spark import spark_session
from pyspark.sql import Row


def create_table():
    file_contents = [
        {'line_num': 1, 'contents': 'line 1'},
        {'line_num': 2, 'contents': 'line 2'},
        {'line_num': 3, 'contents': 'line 3'}        
    ]
    spark_session.createDataFrame(Row(**row) for row in file_contents).cache().createOrReplaceTempView("rawFileData")

标签: pythonpysparkpackageamazon-emr

解决方案


推荐阅读