首页 > 解决方案 > 如何使用套接字数据源优化流式聚合?

问题描述

我在 4 个 CPU 内核和 8 个线程上使用 Spark 2.4.0 和 Scala 2.11。

我写了以下应用程序:

package demos.spark

object WordCounter {

  def main(args: Array[String]): Unit = {
    import org.apache.spark.sql.SparkSession
    val spark = SparkSession
      .builder
      .master("local[4]")
      .getOrCreate
    import spark.implicits._
    spark
      .readStream
      .format("socket")
      .option("host", "localhost")
      .option("port", 9999)
      .load
      .as[String]
      .flatMap(_.split("\\W+"))
      .groupBy("value")
      .count
      .writeStream
      .outputMode("complete")
      .format("console")
      .start
      .awaitTermination
  }
}

应用程序的处理时间local[1]约为 60 秒。因为local[8]它下降到约 15 秒,这是我得到的最小值。

我总是通过套接字发送一两个句子作为输入。

这是预期的行为吗?如何优化应用程序以拥有 1 秒的处理时间?

编辑:在这个问题上花了很长时间后,我终于找到了解决方案。问题在于 Spark 默认使用的分区太多(几百个)。添加 spark.sql.shuffle.partitions 选项设置为 8(我机器上的核心数)后,数据处理的持续时间已下降到 300-400 ms

val spark = SparkSession
  .builder
  .master("local[*]")
  .config("spark.sql.shuffle.partitions", 8)
  .getOrCreate

我还不知道,这个数字是否应该保持不变,如果 Spark 应用程序将在可能发生变化的基础设施(Spark、Kubernetes、AWS、自动缩放)上运行会怎样?

标签: scalaapache-sparkapache-spark-sqlspark-structured-streaming

解决方案


4个CPU核心和8个线程。

使用local[*]和 Spark 将使用与内核一样多的处理线程,即 4。如果这 8 个线程是虚拟内核,Spark 将看到 8 个“CPU 内核”,因此 8 是用于处理的最大线程数。

这正是您的测试所证明的,即

因为local[8]它下降到约 15 秒,这是我得到的最小值。

这是预期的行为吗?

是的,除非您更改处理逻辑,即结构化查询本身,否则几乎不可能赶上时间。这就是我通常说要考虑算法的地方(根据要处理的数据可能会有所不同)。您受到可用 CPU 内核数量的限制。

如何优化应用程序以拥有 1 秒的处理时间?

更改您的结构化查询(“算法”)或它在幕后工作的方式。

以下操作为处理逻辑:

.flatMap(_.split("\\W+"))
.groupBy("value")
.count

flatMap价格便宜,并且可以与 CPU 内核一样快。你对此无能为力。

您还使用流式聚合groupBy,然后count更改执行所需的任务数(在您的情况下,它将从 8 变为默认的随机分区数,即 200)。

您可以计算在 8 个内核上运行 200 个任务所需的 CPU 滴答数,您将需要那么多时间来计算结果。

问题在于 Spark 默认使用的分区太多(几百个)。添加 spark.sql.shuffle.partitions 选项设置为 8(我机器上的核心数)后,数据处理的持续时间已下降到 300-400 ms

当然,这在这种特殊情况下有所帮助,如果这是您可能拥有的唯一硬件,那也没关系。你完成了。

那么其他内核数量可能会更高的环境呢?

如果这个数字应该保持不变,如果 Spark 应用程序将在可能发生变化的基础设施(Spark、Kubernetes、AWS、自动缩放)上运行怎么办?

这是最难回答的问题。欢迎来到非常动态/高度可配置的 Apache Spark 世界。影响最终结果的因素太多了,通常你所拥有的就是你最终应该得到的,或者你开始调整许多配置选项,你将不得不花费数小时或数周来确定最佳配置应该是什么。想想您的流式查询将处理的不同数据(数据形状、数量和速度)。它增加了混乱。

戴上咨询的帽子,在某些时候,您将不得不决定应用程序性能是否足够好,或者您将花费数周时间希望您可以做得比您已经取得的更好(并且有人必须为此付费)。

这个数字是否应该是常数

如果您知道将要处理的所有数据,那么您就可以做出如此艰难的假设。

它通常不应该,这就是为什么 Spark 为您提供自适应查询执行视频)。


推荐阅读