首页 > 解决方案 > 只要有可用资源,就并行运行 Spark 代码,而不是按顺序运行

问题描述

我用 PySpark 创建了一个管道,它基本上循环一个查询列表,每个查询都使用 JDBC 连接器在 MySQL 数据库上运行,将结果存储在 Spark DataFrame 中,过滤其只有一个值的列,然后将其保存为Parquet 文件。

由于我使用 a 遍历查询列表for,因此每个查询和列过滤过程都是按顺序完成的,因此我没有使用所有可用的 CPU。

我想要完成的是在我有可用 CPU 时启动一个新进程(查询 + 过滤器 + Parquet 持久性)。

注意:我每次都在处理不同的输入(查询),这与这里所要求的不同,不同的处理是在同一个输入中完成的。另外,我不想指定同时运行多少个进程,相反,我想使用第一个进程中可用的所有 CPU,如果仍有可用资源,则启动一个新的。如果仍有可用资源,请启动另一个资源,依此类推...

这是我正在运行的脚本:

# Imports
from pyspark.sql import SparkSession
from pyspark.sql.functions import isnull, when, count, countDistinct
from time import time


# Spark session initialization
spark = SparkSession \
    .builder \
    .appName('Filtering Columns') \
    .config('spark.driver.memory', '16g') \
    .config('spark.executor.memory', '16g') \
    .config('spark.driver.extraClassPath',
            '/path/to/mysql-connector-java-5.1.38.jar') \
    .getOrCreate()


# JDBC config
jdbc_config = {
    'url': 'jdbc:mysql://my_db_ip_address',
    'properties': {
        'user': 'my_db_user',
        'password': 'my_db_password'
    }
}


# My queries... Didn't put the real queries here, but
# they have nothing in special
queries_to_run = [
    {
        'table_name': 'table1',
        'query': '''
             (some query) as tmp
        '''
    },
    {
        'table_name': 'table2',
        'query': '''
             (some query) as tmp
        '''
    },
    {
        'table_name': 'table3',
        'query': '''
             (some query) as tmp
        '''
    },
    ...
]


# The function I'm using to filter the columns
def drop_constant_columns(df):
    cols_to_drop_map = df.select([
        when(countDistinct(column_name) == 1, True).alias(column_name)
        for column_name in df.columns
    ]).first().asDict()

    cols_to_drop = [
        col for col, should_drop in cols_to_drop_map.iteritems()
        if should_drop
    ]

    return df.drop(*cols_to_drop)


# Here's the code that loops through the queries and, for each 
# one of them:
# 1) Query a MySQL db
# 2) Store the result in a Spark DF
# 3) Filter the constant columns
# 4) Save the filtered DF in a Parquet format
for query in queries_to_run:
    print('Querying {}'.format(query['table_name']))
    df = spark.read.jdbc(table=query['query'], **jdbc_config)

    print('Filtering {}'.format(query['table_name']))
    n_cols = len(df.columns)

    start = time()
    df = drop_constant_columns(df)
    elapsed = time() - start

    n_cols_filtered = n_cols - len(df.columns)
    print('Filtered {} of {} columns in {:.2f} secs'.format(n_cols_filtered, n_cols, elapsed))

    print('Persisting {}'.format(query['table_name']))
    df.write.mode('overwrite').parquet('./{}_test.parquet'.format(query['table_name']))

我在Ubuntu 上使用 PySpark 2.2.1, Python 。2.7.1216.04

标签: pythonpyspark

解决方案


基本上您需要为 Spark 上下文设置 FAIR 调度模式,创建多个线程并在每个线程中执行一个 spark 操作以实现接近 100% 的集群饱和度(假设您的作业是 CPU 密集型的)。

尽管您提到您不想对线程数设置限制,但我还是建议您这样做。您可以在操作系统中创建的线程数量有限,它们都从驱动程序中占用宝贵的内存和 CPU 资源。例如,您不能创建一百万个线程,并且无论如何都必须使用某种排队(例如信号量和锁的组合)。

另一方面,当所有 executor 都 100% 忙碌并且调度程序没有接受新任务并且许多 Spark 作业只是闲置,等待 executor 可用时,就会出现收益递减点。Spark 调度是在任务级别完成的,即如果一个作业的任务正在某个执行器上运行,它不会被抢占。

您可以通过实验找出足够多的同时请求,从而为所有请求提供最佳的整体处理时间。


推荐阅读