首页 > 解决方案 > 为什么对 kafka 主题的查询进行流式连接需要这么长时间?

问题描述

我正在使用 Spark Structured Streaming 并加入来自 Kafka 主题的两个流。

工作的 DAG

我注意到每条记录的流式查询大约需要 15 秒。在下面的屏幕截图中,阶段 id 2 需要 15 秒。为什么会这样?

每个阶段花费的时间

代码如下:

  val kafkaTopic1 = "demo2"
  val kafkaTopic2 = "demo3"
  val bootstrapServer = "localhost:9092"

  val spark = SparkSession
    .builder
    .master("local")
    .getOrCreate

  import spark.implicits._

  val df1 = spark
    .readStream
    .format("kafka")
    .option("kafka.bootstrap.servers", bootstrapServer)
    .option("subscribe", kafkaTopic1)
    .option("failOnDataLoss", false)
    .load

  val df2 = spark
    .readStream
    .format("kafka")
    .option("kafka.bootstrap.servers", bootstrapServer)
    .option("subscribe", kafkaTopic2)
    .option("failOnDataLoss", false)
    .load

  val order_details = df1
    .withColumn(...)
    .select(...)

  val invoice_details = df2
    .withColumn(...)
    .where(...)

  order_details
    .join(invoice_details)
    .where(order_details.col("s_order_id") === invoice_details.col("order_id"))
    .select(...)
    .writeStream
    .format("console")
    .option("truncate", false)
    .start
    .awaitTermination()

代码方面一切正常。唯一的问题是加入两个流的时间。如何优化此查询?

标签: scalaapache-sparkspark-structured-streaming

解决方案


给定主 URL,即.master("local"). local[*]至少将其更改为,您应该更快地找到加入。


推荐阅读