apache-spark - Spark Local 获得比集群模式更好的性能
问题描述
我目前正在使用 Spark 和 DataFrames 为每一行发出一系列 HTTP GET 请求。我目前正在使用 Apache HttpClient 4.5.x 依赖项来发出 GET 请求。我注意到,当我在本地模式 ( --master local[256]
) 下运行时,我能够获得相当多的 GET/秒(大约 100/秒)。但是,(不出所料),当我尝试扩展超过 256 个本地驱动程序线程时,性能开始下降。由于来自 HttpClient 的长 IO 阻塞,我能够在我的 4 核笔记本电脑上运行 256 个本地线程。
然后,我切换到 AWS EMR 上的 5 个工作节点 Spark 集群(每个工作节点的内核数量是我的笔记本电脑的两倍),希望模拟相同的 256 个线程每个工作节点设置来尝试并获得 5 倍的吞吐量。当我将设置设置为高于 YARN 报告的可用 vCore 数spark-submit
时无法启动。--executor-cores
有没有一种方法可以轻松地在 5 个工作人员上复制本地 [256] 行为以使吞吐量提高大约 5 倍?
我尝试了一些结果低于标准的选项。
- 我尝试设置一个 Dockerized Spark Standalone 集群,并将 SPARK_WORKER_CORES 更新到一个非常高的数字,但成功有限。尽管我现在可以为每个工作线程启动 128 个线程,但性能却很糟糕。自尝试以来已经有几个小时了,所以我不记得是否发生了大量的洗牌。
- 我更新了我的代码以实现异步滑动窗口模式。这是一篇很棒的文章,看起来很有希望,但是即使设置了 5 个工作节点,我仍然无法执行尽可能多的 GET
local[256]
(尽管它很接近)。旁注:该帖子解释了我无法转换的更新的“滑动迭代器”(他后来更改了他的代码以使用谷歌提供的 Futures 库,我试图坚持使用 scala.concurrent 版本)。 - 我也尝试过使用 Apache HttpAsyncClient 4.1.x 库,但似乎没有太大区别。
这是为 5-worker-node 设置编写的当前代码库。local[256]
设置很幼稚df.map(row => getRow(row)
package com.me.bot
import java.util.concurrent.{Executors, TimeUnit}
import org.apache.http.client.methods.{CloseableHttpResponse, HttpGet}
import org.apache.http.client.protocol.HttpClientContext
import org.apache.http.impl.client.HttpClients
import org.apache.http.impl.conn.PoolingHttpClientConnectionManager
import org.apache.spark.sql.{Row, SparkSession}
object Bot {
val MaxConcurrency: Int = 128
def main(args: Array[String]) = {
val spark = SparkSession.builder().appName("Bot").getOrCreate()
import spark.implicits._
val df = spark.read.option("header", "true").csv("/path/to/urls.csv")
df.rdd.
map(row => ThreadedConcurrentContext.executeAsync(getRow(row))).
mapPartitions(it => ThreadedConcurrentContext.awaitSliding(it, MaxConcurrency)).
toDF().write.mode("append").parquet("/path/to/output")
spark.stop()
}
def getRow(row: Row) = {
val url = row.getAs[String]("url")
GetResult(Browser.getStatusCode(url), url)
}
}
case class GetResult(statusCode: Int, url: String)
object Browser extends Serializable {
lazy val cm = {
val manager = new PoolingHttpClientConnectionManager()
manager.setMaxTotal(Bot.MaxConcurrency)
manager.setDefaultMaxPerRoute(Bot.MaxConcurrency)
manager
}
lazy val httpClient = HttpClients.custom().setConnectionManager(cm).setConnectionTimeToLive(5, TimeUnit.MINUTES).build()
val context = new ThreadLocal[HttpClientContext] {
override def initialValue = HttpClientContext.create()
}
def getStatusCode(url: String) = {
var response: CloseableHttpResponse = null
try {
val httpget = new HttpGet(url)
response = httpClient.execute(httpget, context.get())
response.getStatusLine.getStatusCode
}
catch {
case ex: Exception => -1
case _: Throwable => -1
}
// soooo not Scala... sorry!
finally {
if (response != null) {
response.close()
}
}
}
}
// From http://www.russellspitzer.com/2017/02/27/Concurrency-In-Spark/
/** A singleton object that controls the parallelism on a Single Executor JVM */
object ThreadedConcurrentContext {
import scala.concurrent._
import scala.concurrent.duration.Duration
import scala.concurrent.duration.Duration._
implicit val ec = ExecutionContext.fromExecutorService(Executors.newWorkStealingPool(Bot.MaxConcurrency))
/** Wraps a code block in a Future and returns the future */
def executeAsync[T](f: => T): Future[T] = Future(f)
/** Awaits only a set of elements at a time. Instead of waiting for the entire batch
* to finish waits only for the head element before requesting the next future*/
def awaitSliding[T](it: Iterator[Future[T]], batchSize: Int, timeout: Duration = Inf): Iterator[T] = {
val slidingIterator = it.sliding(batchSize - 1) //Our look ahead (hasNext) will auto start the nth future in the batch
val (initIterator, tailIterator) = slidingIterator.span(_ => slidingIterator.hasNext)
initIterator.map( futureBatch => Await.result(futureBatch.head, timeout)) ++
tailIterator.flatMap( lastBatch => Await.result(Future.sequence(lastBatch), timeout))
}
def awaitAll[T](it: Iterator[Future[T]], timeout: Duration = Inf) = {
Await.result(Future.sequence(it), timeout)
}
}
我想使用 5 个工作节点获得 5 倍吞吐量。我没有计划扩大规模,因为我的网络服务器可能无法处理更多。
解决方案
推荐阅读
- javascript - 将字符串数组作为 URLSearchParams 发送
- dynamics-crm - Dynamics 365 V9.0:无法解码错误日志
- php - 在 DOM 中获取以下节点
- javascript - 双击多选选项编辑
- sql - 将数据从 Dev 复制到 QA 的存储过程
- angular - 我无法在 Angular 中动态更改语言
- sql - 插入后从另一个表中查找最近记录的数据库触发器
- docker-compose - docker-compose deploy 上的 Azure App Service 网络名称不明确错误
- swiftui - 多级导航 SwiftUI
- r - 如何合并 2 列并将它们分开?