python - .foreachPartition 命令在 Python 包内运行时挂起,但从包外的 Python 文件运行时有效
问题描述
我有一个非常简单的 pyspark 应用程序,我在 EMR Pyspark 3.0.0 上运行。参见下面的项目结构,process.py 是控制应用程序的流程并调用 file_processor 包内的代码。
process.py
file_processor
config
spark.py
repository
s3_repo.py
structure
table_creator.py
当我在一个小数据帧上运行 .foreachPartition 时,我看到了奇怪的行为。
当 process.py 调用位于 s3_repo.py 中的 .foreachPartition 代码时,该命令会挂起
当相同的 .foreachPartition 代码从 s3_repo.py 移动并放置在 process.py 中时,它运行得很好。
.foreachPartition 代码的位置有何不同?这是 Pyspark 错误还是我在这里遗漏了一些简单的东西?
process.py 代码
from file_processor.structure import table_creator
from file_processor.repository import s3_repo
def process():
table_creator.create_table()
s3_repo.save_to_s3()
if __name__ == '__main__':
process()
spark.py 代码
from pyspark.sql import SparkSession
spark_session = SparkSession.builder.appName("Test").getOrCreate()
s3_repo.py 代码
from file_processor.config.spark import spark_session
def save_to_s3():
spark_session.sql('SELECT * FROM rawFileData').toJSON().foreachPartition(_save_to_s3)
def _save_to_s3(iterator):
for record in iterator:
print(record)
table_creator.py 代码
from file_processor.config.spark import spark_session
from pyspark.sql import Row
def create_table():
file_contents = [
{'line_num': 1, 'contents': 'line 1'},
{'line_num': 2, 'contents': 'line 2'},
{'line_num': 3, 'contents': 'line 3'}
]
spark_session.createDataFrame(Row(**row) for row in file_contents).cache().createOrReplaceTempView("rawFileData")
解决方案
推荐阅读
- python - 在 pandas 中定义一个函数以更快地执行
- flutter - Flutter 中的“[xcb] Unknown sequence number while processing queue”错误
- javascript - TypeError:无法读取未定义 Sequelize 模型的属性“长度”
- mongodb - Mongoose:使用一个用户的管理员权限在不同的 Mongo 数据库上进行身份验证
- c - 在 C 中具有多个进程的 TCP Echo 服务器
- python - Python 中可变嵌套字典的类型提示
- macos - 在 macOS 上构建支持 X11 的 Firefox
- c# - 如何使用 C# 函数返回列表数组?
- wordpress - 命名空间时扩展 WC_Product 变量
- flutter - 文件选择器和文件选择器交叉构建失败,即使尝试了所有命令也没有接受许可证