首页 > 解决方案 > 每个 Micro Batch 中的最大偏移量

问题描述

我在默认触发器中执行流式传输。我的目标是限制每次执行中的读取量,以避免大量的微批处理。有时我的 Spark Jobs 整个周末都会停止,所以当我重新启动它们时,它们需要很长时间才能完成第一个。我还保留了 Dataframes,因为这是写在 2 个数据库中的。测试了两种方法。

官方文档说maxOffsetsPerTrigger限制每个触发间隔处理的偏移量,但这对我不起作用。我误解了这个参数的含义吗?

  val read = spark
  .readStream
  .format("kafka")
  .option("kafka.bootstrap.servers", kafkaServers)
  .option("subscribe", kafkaTopic)
  .option("startingOffsets", "latest")
  .option("failOnDataLoss", "false")
  .option("maxOffsetsPerTrigger", "1")
  .load()

另外,我读了这个答案,但我不知道在哪里以及如何正确设置max.poll.records。我尝试了 readStream 的选项,但没有成功。下面的代码:

  val read = spark
  .readStream
  .format("kafka")
  .option("kafka.bootstrap.servers", kafkaServers)
  .option("subscribe", kafkaTopic)
  .option("startingOffsets", "latest")
  .option("failOnDataLoss", "false")
  .option("max.poll.records", "1")
  .load()

主功能:

override def execute(spark: SparkSession, args: Array[String]): Unit = {
    val basePath: String = args(0)
    val kafkaServers: String = args(1)
    val kafkaTopic: String = args(2)
    val checkpoint: String = args(3)

    val read = spark
      .readStream
      .format("kafka")
      .option("kafka.bootstrap.servers", kafkaServers)
      .option("subscribe", kafkaTopic)
      .option("startingOffsets", "latest")
      .option("failOnDataLoss", "false")
      .option("maxOffsetsPerTrigger", "1")
      .load()

    val transformed = read
      .transform(applySchema)
      .transform(conversions)
      .transform(dropDuplicates)
      .transform(partitioning)

    val sink = new FileSystemSink(basePath)

    val query = transformed
      .writeStream
      .outputMode(OutputMode.Append)
      .foreachBatch(sink.writeOnS3 _)
      .option("checkpointLocation", f"$basePath/checkpoints/$checkpoint")
      .start()

    query.awaitTermination()
  }

除了上面的问题,限制偏移量的写入方式是什么?

火花版本:2.4.5。

标签: apache-sparkapache-kafkaspark-structured-streaming

解决方案


我再次测试并且maxOffsetsPerTrigger工作得很好。我误解了触发器的结果,现在它是有道理的。参数表示读取的总偏移量,而不是每个分区的偏移量。


推荐阅读