Python Spark local parallelism

Problem Description

I'm running Python Spark locally to try an example found on the Spark website. I generate a random DataFrame to get a larger sample for performance testing.

I have set up my SparkSession and SparkContext like this:

spark = SparkSession.builder \
        .master("local[*]") \
        .appName("KMeansParallel") \
        .getOrCreate()
sc = spark.sparkContext

But the program does not seem to run in parallel as suggested there. The Task Manager shows only 10-25% processor usage, which makes me think Python is stuck on a single core (because of the GIL?).

What am I doing wrong? I have tried changing some parameters on the SparkSession:

.config("spark.executor.instances", 7) \
.config("spark.executor.cores", 3) \
.config("spark.default.parallelism", 7) \
.config("spark.driver.memory", "15g") \

I am running on 16 GB of RAM, 4 cores, 8 logical processors. I leave some resources for the OS as suggested there (even though a local setup may differ from a YARN configuration).

Full code:

from pyspark.sql import SparkSession, Row
from pyspark import SparkContext
from pyspark.ml.linalg import Vectors
from pyspark.ml.clustering import KMeans
from pyspark.ml.evaluation import ClusteringEvaluator
import numpy as np
import math
import time

def gaussianMixture(sc, spark, nPoints, nGaussian, gaussianVariance):
    """
    Returns a dataframe with <nPoints> points generated randomly
    around <nGaussian> centers by a normal distribution
    N(<a center chosen randomly>, <gaussianVariance>)    
    """

    #Generating centers
    meanPointsNumpy = np.random.rand(nGaussian, 2)

    def geneRandomChoice(nGaussian, nPoints):
        for i in range(nPoints):
            yield (i, np.random.choice(nGaussian, 1))

    #Generating points in a numpy ndarray
    dataNumpy = np.array([
        [t[0],
        np.random.normal(loc = meanPointsNumpy[t[1],0], scale = math.sqrt(gaussianVariance)),
        np.random.normal(loc = meanPointsNumpy[t[1],1], scale = math.sqrt(gaussianVariance))]
        for t in geneRandomChoice(nGaussian, nPoints)
    ])

    #Converting ndarray to RDD then to dataFrame
    dataRDD = sc \
        .parallelize(dataNumpy) \
        .map(lambda x: Row(label = int(x[0]), features = Vectors.dense(x[1].astype(float), x[2].astype(float))))

    data = spark.createDataFrame(dataRDD)

    return data

def kMeansParallel(sc, spark, nPoints, nGaussian, gaussianVariance):
    """
    Evaluates the clusters from the dataFrame created
    by the gaussianMixture function
    """

    dataset = gaussianMixture(sc, spark, nPoints, nGaussian, gaussianVariance)

    t1 = time.time()

    # Trains a k-means model.
    kmeans = KMeans().setK(nGaussian)#.setSeed(1)
    model = kmeans.fit(dataset)

    # Make predictions
    predictions = model.transform(dataset)

    # Evaluate clustering by computing Silhouette score
    evaluator = ClusteringEvaluator()

    silhouette = evaluator.evaluate(predictions)
    #print("Silhouette with squared euclidean distance = " + str(silhouette))

    return time.time() - t1

nPoints = 10000
nGaussian = 100
gaussianVariance = 0.1
nTests = 20

spark = SparkSession.builder \
    .master("local[*]") \
    .appName("KMeansParallel") \
    .getOrCreate()

sc = spark.sparkContext

meanTime = 0
for i in range(nTests):
    res = kMeansParallel(sc, spark, nPoints, nGaussian, gaussianVariance)
    meanTime += res

meanTime /= nTests
print("Mean Time : " + str(meanTime))

spark.stop()

Tags: python, apache-spark

Solution


The GIL is not the problem: Spark launches as many Python worker processes as it needs, one per executor when running distributed, and one per core when running locally (since everything runs inside the driver).
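
A quick sanity check (a sketch that reuses sc, spark, and the gaussianMixture function from the question's code) is to look at how many tasks Spark can actually schedule at once and how many partitions the generated data ends up with:

# How many tasks can run concurrently? With local[*] this usually equals the core count.
print("defaultParallelism :", sc.defaultParallelism)

# How many partitions does the generated DataFrame actually have?
dataset = gaussianMixture(sc, spark, nPoints, nGaussian, gaussianVariance)
print("dataset partitions :", dataset.rdd.getNumPartitions())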

Most likely the data size and/or the partition count is simply too low to keep all the cores busy.
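
A minimal sketch of two ways to raise the partition count, reusing names from the question's code (the factor of 4 is an arbitrary starting point, not something prescribed by this answer):

# Option 1: ask for more partitions when the NumPy data is parallelized
dataRDD = sc.parallelize(dataNumpy, numSlices=sc.defaultParallelism * 4)

# Option 2: repartition the DataFrame before the k-means fit so every core gets work
dataset = dataset.repartition(sc.defaultParallelism * 4)
model = KMeans().setK(nGaussian).fit(dataset)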

