apache-spark - PySpark + AWS EMR:df.count() 需要很长时间才能完成
问题描述
我正在使用该操作count()
来触发我的 udf 函数运行。这可行,但是在我的 udf 函数完成运行很久之后, df.count() 需要几天才能完成。数据框本身并不大,大约有 30k 到 100k 行。
AWS 集群设置:
- 1 m5.4xlarge 用于主节点
- 2 m5.4xlarge 用于工作节点。
Spark 变量和设置(这些是用于运行脚本的 spark 变量)
--executor-cores 4
--conf spark.sql.execution.arrow.enabled=true
'spark.sql.inMemoryColumnarStorage.batchSize', 2000000(在 pyspark 脚本中设置)
伪代码
这是我们脚本的实际结构。自定义 pandas udf 函数为每一行调用 PostGres 数据库。
from pyspark.sql.functions import pandas_udf, PandasUDFType
# udf_schema: A function that returns the schema for the dataframe
def main():
# Define pandas udf for calculation
# To perform this calculation, every row in the
# dataframe needs information pulled from our PostGres DB
# which does take some time, ~2-3 hours
@pandas_udf(udf_schema(), PandasUDFType.GROUPED_MAP)
def calculate_values(local_df):
local_df = run_calculation(local_df)
return local_df
# custom function that pulls data from our database and
# creates the dataframe
df = get_df()
df = df\
.groupBy('some_unique_id')\
.apply(calculate_values)
print(f'==> finished running calculation for {df.count()} rows!')
return
解决方案
推荐阅读
- playwright - 剧作家组件测试
- javascript - 在 typescript 中导入 JSON 文件
- scala - Spark-Scala:如何比较不同长度字符串中的日期并返回最小值?
- sql-server - 使用 Microsoft SQL Server 的 SELECT 语句中的组计数子查询
- reactjs - 后退按钮未加载上一个组件
- ubuntu-18.04 - Gazebo 无法启动:Ubuntu 18.04 ROS 旋律
- amazon-cloudfront - 'script-src' 没有明确设置,但它是?
- typescript - 使用graphql代码生成器时如何将自定义标量类型映射到打字稿类型?
- python - 没有console.py的cmd操作
- regex - 从hackerrank bash实践中获得的结果与我的实际bash不同