python - 只要有可用资源,就并行运行 Spark 代码,而不是按顺序运行
问题描述
我用 PySpark 创建了一个管道,它基本上循环一个查询列表,每个查询都使用 JDBC 连接器在 MySQL 数据库上运行,将结果存储在 Spark DataFrame 中,过滤其只有一个值的列,然后将其保存为Parquet 文件。
由于我使用 a 遍历查询列表for
,因此每个查询和列过滤过程都是按顺序完成的,因此我没有使用所有可用的 CPU。
我想要完成的是在我有可用 CPU 时启动一个新进程(查询 + 过滤器 + Parquet 持久性)。
注意:我每次都在处理不同的输入(查询),这与这里所要求的不同,不同的处理是在同一个输入中完成的。另外,我不想指定同时运行多少个进程,相反,我想使用第一个进程中可用的所有 CPU,如果仍有可用资源,则启动一个新的。如果仍有可用资源,请启动另一个资源,依此类推...
这是我正在运行的脚本:
# Imports
from pyspark.sql import SparkSession
from pyspark.sql.functions import isnull, when, count, countDistinct
from time import time
# Spark session initialization
spark = SparkSession \
.builder \
.appName('Filtering Columns') \
.config('spark.driver.memory', '16g') \
.config('spark.executor.memory', '16g') \
.config('spark.driver.extraClassPath',
'/path/to/mysql-connector-java-5.1.38.jar') \
.getOrCreate()
# JDBC config
jdbc_config = {
'url': 'jdbc:mysql://my_db_ip_address',
'properties': {
'user': 'my_db_user',
'password': 'my_db_password'
}
}
# My queries... Didn't put the real queries here, but
# they have nothing in special
queries_to_run = [
{
'table_name': 'table1',
'query': '''
(some query) as tmp
'''
},
{
'table_name': 'table2',
'query': '''
(some query) as tmp
'''
},
{
'table_name': 'table3',
'query': '''
(some query) as tmp
'''
},
...
]
# The function I'm using to filter the columns
def drop_constant_columns(df):
cols_to_drop_map = df.select([
when(countDistinct(column_name) == 1, True).alias(column_name)
for column_name in df.columns
]).first().asDict()
cols_to_drop = [
col for col, should_drop in cols_to_drop_map.iteritems()
if should_drop
]
return df.drop(*cols_to_drop)
# Here's the code that loops through the queries and, for each
# one of them:
# 1) Query a MySQL db
# 2) Store the result in a Spark DF
# 3) Filter the constant columns
# 4) Save the filtered DF in a Parquet format
for query in queries_to_run:
print('Querying {}'.format(query['table_name']))
df = spark.read.jdbc(table=query['query'], **jdbc_config)
print('Filtering {}'.format(query['table_name']))
n_cols = len(df.columns)
start = time()
df = drop_constant_columns(df)
elapsed = time() - start
n_cols_filtered = n_cols - len(df.columns)
print('Filtered {} of {} columns in {:.2f} secs'.format(n_cols_filtered, n_cols, elapsed))
print('Persisting {}'.format(query['table_name']))
df.write.mode('overwrite').parquet('./{}_test.parquet'.format(query['table_name']))
我在Ubuntu 上使用 PySpark 2.2.1
, Python 。2.7.12
16.04
解决方案
基本上您需要为 Spark 上下文设置 FAIR 调度模式,创建多个线程并在每个线程中执行一个 spark 操作以实现接近 100% 的集群饱和度(假设您的作业是 CPU 密集型的)。
尽管您提到您不想对线程数设置限制,但我还是建议您这样做。您可以在操作系统中创建的线程数量有限,它们都从驱动程序中占用宝贵的内存和 CPU 资源。例如,您不能创建一百万个线程,并且无论如何都必须使用某种排队(例如信号量和锁的组合)。
另一方面,当所有 executor 都 100% 忙碌并且调度程序没有接受新任务并且许多 Spark 作业只是闲置,等待 executor 可用时,就会出现收益递减点。Spark 调度是在任务级别完成的,即如果一个作业的任务正在某个执行器上运行,它不会被抢占。
您可以通过实验找出足够多的同时请求,从而为所有请求提供最佳的整体处理时间。
推荐阅读
- ios - UITextView:点击应该指向相同应用程序的通用深层链接,在 Safari 中打开后备 URL
- java - Maven中的重复直接依赖项
- javascript - win.toggleTabBar() 在电子中不起作用
- java - 在 Java Maven 项目中配置 Delta 构建
- bus - 在 LIN Slave 传输的帧通过 CAPL 到达 Master 之前应用审查
- postgresql - PostgreSQL:删除重复列
- excel - 如何将添加到表格中的 Excel 自动格式化日期更改为 MM-DD-YYY(自定义格式)?
- c# - C# datagrid 特定单元格可编辑
- java - 如何从 java.sql.Blob 类型的 zip 文件中读取和提取 zip 条目,而不将 FileInputStream 或文件路径作为字符串 java
- c# - C# GMap:如何在地图上绘制 120 度的光束角