python - Python Spark 本地并行性
问题描述
我在本地运行 Python Spark 以运行在 Spark 网站上找到的示例。我生成了一个随机数据帧,以获得更大的样本来进行性能测试。
我已经像这样设置了我的 SparkSession 和 SparkContext:
spark = SparkSession.builder \
.master("local[*]") \
.appName("KMeansParallel") \
.getOrCreate()
sc = spark.sparkContext
但该程序似乎并不像这里建议的那样在并行进程上运行。我在任务管理器上看到只使用了 10-25% 的处理器,这让我认为 Python 卡在一个内核上(通过 GIL?)。
我做错了什么?我尝试更改 SparkSession 上的一些参数:
.config("spark.executor.instances", 7) \
.config("spark.executor.cores", 3) \
.config("spark.default.parallelism", 7) \
.config("spark.driver.memory", "15g") \
我使用 16GB 内存、4 个内核、8 个逻辑处理器运行。我按照这里的建议为操作系统留下了一些资源(即使本地可能与 YARN 配置不同)。
完整代码:
from pyspark.sql import SparkSession, Row
from pyspark import SparkContext
from pyspark.ml.linalg import Vectors
from pyspark.ml.clustering import KMeans
from pyspark.ml.evaluation import ClusteringEvaluator
import numpy as np
import math
import time
def gaussianMixture(sc, spark, nPoints, nGaussian, gaussianVariance):
"""
Returns a dataframe with <nPoints> points generated randomly
around <nGaussian> centers by a normal distribution
N(<a center chosen randomly>, <gaussianVariance>)
"""
#Generating centers
meanPointsNumpy = np.random.rand(nGaussian, 2)
def geneRandomChoice(nGaussian, nPoints):
for i in range(nPoints):
yield (i, np.random.choice(nGaussian, 1))
#Generating points in a numpy ndarray
dataNumpy = np.array([
[t[0],
np.random.normal(loc = meanPointsNumpy[t[1],0], scale = math.sqrt(gaussianVariance)),
np.random.normal(loc = meanPointsNumpy[t[1],1], scale = math.sqrt(gaussianVariance))]
for t in geneRandomChoice(nGaussian, nPoints)
])
#Converting ndarray to RDD then to dataFrame
dataRDD = sc \
.parallelize(dataNumpy) \
.map(lambda x: Row(label = int(x[0]), features = Vectors.dense(x[1].astype(float), x[2].astype(float))))
data = spark.createDataFrame(dataRDD)
return data
def kMeansParallel(sc, spark, nPoints, nGaussian, gaussianVariance):
"""
Evaluates the clusters from the dataFrame created
by the gaussianMixture function
"""
dataset = gaussianMixture(sc, spark, nPoints, nGaussian, gaussianVariance)
t1 = time.time()
# Trains a k-means model.
kmeans = KMeans().setK(nGaussian)#.setSeed(1)
model = kmeans.fit(dataset)
# Make predictions
predictions = model.transform(dataset)
# Evaluate clustering by computing Silhouette score
evaluator = ClusteringEvaluator()
silhouette = evaluator.evaluate(predictions)
#print("Silhouette with squared euclidean distance = " + str(silhouette))
return time.time() - t1
nPoints = 10000
nGaussian = 100
gaussianVariance = 0.1
nTests = 20
spark = SparkSession.builder \
.master("local[*]") \
.appName("KMeansParallel") \
.getOrCreate()
sc = spark.sparkContext
meanTime = 0
for i in range(nTests):
res = kMeansParallel(sc, spark, nPoints, nGaussian, gaussianVariance)
meanTime += res
meanTime /= nTests
print("Mean Time : " + str(meanTime))
spark.stop()
解决方案
GIL 没有问题,因为 spark 会根据需要运行多个 python 实例。分布式运行时每个执行程序一个,本地运行时每个内核一个(因为它都在驱动程序中运行)。
很可能是数据大小/分区计数太低
推荐阅读
- laravel - 如何在招摇中插入外部文件作为示例
- keras - Yolo to keras to coreml : 获得信心和坐标作为输出
- python - python中n个整数中的最大整数
- python - 将角度和速度转换为 x,y 变化的问题
- batch-file - 批量 zip 提取并使用 zip 文件名重命名
- excel - 在 VBA excel 中使用 application.worksheetfunction.Vlookup 时出现范围错误
- ssl - 证书密钥使用警告
- javascript - node.js] 如何在 server.js 中捕获 GET?
- python - 在数据框中使用 RegexpTokenizer 拆分句子
- vba - Office 365 中保存事件的 Word VBA 事件处理 - 何时触发?