scala - 如何使用套接字数据源优化流式聚合?
问题描述
我在 4 个 CPU 内核和 8 个线程上使用 Spark 2.4.0 和 Scala 2.11。
我写了以下应用程序:
package demos.spark
object WordCounter {
def main(args: Array[String]): Unit = {
import org.apache.spark.sql.SparkSession
val spark = SparkSession
.builder
.master("local[4]")
.getOrCreate
import spark.implicits._
spark
.readStream
.format("socket")
.option("host", "localhost")
.option("port", 9999)
.load
.as[String]
.flatMap(_.split("\\W+"))
.groupBy("value")
.count
.writeStream
.outputMode("complete")
.format("console")
.start
.awaitTermination
}
}
应用程序的处理时间local[1]
约为 60 秒。因为local[8]
它下降到约 15 秒,这是我得到的最小值。
我总是通过套接字发送一两个句子作为输入。
这是预期的行为吗?如何优化应用程序以拥有 1 秒的处理时间?
编辑:在这个问题上花了很长时间后,我终于找到了解决方案。问题在于 Spark 默认使用的分区太多(几百个)。添加 spark.sql.shuffle.partitions 选项设置为 8(我机器上的核心数)后,数据处理的持续时间已下降到 300-400 ms
val spark = SparkSession
.builder
.master("local[*]")
.config("spark.sql.shuffle.partitions", 8)
.getOrCreate
我还不知道,这个数字是否应该保持不变,如果 Spark 应用程序将在可能发生变化的基础设施(Spark、Kubernetes、AWS、自动缩放)上运行会怎样?
解决方案
4个CPU核心和8个线程。
使用local[*]
和 Spark 将使用与内核一样多的处理线程,即 4。如果这 8 个线程是虚拟内核,Spark 将看到 8 个“CPU 内核”,因此 8 是用于处理的最大线程数。
这正是您的测试所证明的,即
因为
local[8]
它下降到约 15 秒,这是我得到的最小值。这是预期的行为吗?
是的,除非您更改处理逻辑,即结构化查询本身,否则几乎不可能赶上时间。这就是我通常说要考虑算法的地方(根据要处理的数据可能会有所不同)。您受到可用 CPU 内核数量的限制。
如何优化应用程序以拥有 1 秒的处理时间?
更改您的结构化查询(“算法”)或它在幕后工作的方式。
以下操作为处理逻辑:
.flatMap(_.split("\\W+"))
.groupBy("value")
.count
flatMap
价格便宜,并且可以与 CPU 内核一样快。你对此无能为力。
您还使用流式聚合groupBy
,然后count
更改执行所需的任务数(在您的情况下,它将从 8 变为默认的随机分区数,即 200)。
您可以计算在 8 个内核上运行 200 个任务所需的 CPU 滴答数,您将需要那么多时间来计算结果。
问题在于 Spark 默认使用的分区太多(几百个)。添加 spark.sql.shuffle.partitions 选项设置为 8(我机器上的核心数)后,数据处理的持续时间已下降到 300-400 ms
当然,这在这种特殊情况下有所帮助,如果这是您可能拥有的唯一硬件,那也没关系。你完成了。
那么其他内核数量可能会更高的环境呢?
如果这个数字应该保持不变,如果 Spark 应用程序将在可能发生变化的基础设施(Spark、Kubernetes、AWS、自动缩放)上运行怎么办?
这是最难回答的问题。欢迎来到非常动态/高度可配置的 Apache Spark 世界。影响最终结果的因素太多了,通常你所拥有的就是你最终应该得到的,或者你开始调整许多配置选项,你将不得不花费数小时或数周来确定最佳配置应该是什么。想想您的流式查询将处理的不同数据(数据形状、数量和速度)。它增加了混乱。
戴上咨询的帽子,在某些时候,您将不得不决定应用程序性能是否足够好,或者您将花费数周时间希望您可以做得比您已经取得的更好(并且有人必须为此付费)。
这个数字是否应该是常数
如果您知道将要处理的所有数据,那么您就可以做出如此艰难的假设。
推荐阅读
- java - 求Java内置Object类的解释
- webassembly - 如何在 Wasm 模块中使用 Javascript 对象属性
- c# - 使用正则表达式从站点抓取链接时出错
- asp.net - 使用 ON DELETE NO ACTION 引入 FOREIGN KEY 约束不起作用
- python - 在 Python 中组合多个正则表达式
- angular - Angular:输入时提交的表单不适用于 updateOn:'submit'
- javascript - 类型“any[]”上不存在属性“push”
- java - 获取类的静态变量值
- regex - 以下代码在 AIX 服务器中显示错误,但在 Red Hat 服务器中工作正常
- c++ - 有没有办法告诉 g++ 编译器,不要在某个 -I 路径中查找包含头文件?