apache-spark - 每个 Micro Batch 中的最大偏移量
问题描述
我在默认触发器中执行流式传输。我的目标是限制每次执行中的读取量,以避免大量的微批处理。有时我的 Spark Jobs 整个周末都会停止,所以当我重新启动它们时,它们需要很长时间才能完成第一个。我还保留了 Dataframes,因为这是写在 2 个数据库中的。测试了两种方法。
官方文档说maxOffsetsPerTrigger限制每个触发间隔处理的偏移量,但这对我不起作用。我误解了这个参数的含义吗?
val read = spark
.readStream
.format("kafka")
.option("kafka.bootstrap.servers", kafkaServers)
.option("subscribe", kafkaTopic)
.option("startingOffsets", "latest")
.option("failOnDataLoss", "false")
.option("maxOffsetsPerTrigger", "1")
.load()
另外,我读了这个答案,但我不知道在哪里以及如何正确设置max.poll.records。我尝试了 readStream 的选项,但没有成功。下面的代码:
val read = spark
.readStream
.format("kafka")
.option("kafka.bootstrap.servers", kafkaServers)
.option("subscribe", kafkaTopic)
.option("startingOffsets", "latest")
.option("failOnDataLoss", "false")
.option("max.poll.records", "1")
.load()
主功能:
override def execute(spark: SparkSession, args: Array[String]): Unit = {
val basePath: String = args(0)
val kafkaServers: String = args(1)
val kafkaTopic: String = args(2)
val checkpoint: String = args(3)
val read = spark
.readStream
.format("kafka")
.option("kafka.bootstrap.servers", kafkaServers)
.option("subscribe", kafkaTopic)
.option("startingOffsets", "latest")
.option("failOnDataLoss", "false")
.option("maxOffsetsPerTrigger", "1")
.load()
val transformed = read
.transform(applySchema)
.transform(conversions)
.transform(dropDuplicates)
.transform(partitioning)
val sink = new FileSystemSink(basePath)
val query = transformed
.writeStream
.outputMode(OutputMode.Append)
.foreachBatch(sink.writeOnS3 _)
.option("checkpointLocation", f"$basePath/checkpoints/$checkpoint")
.start()
query.awaitTermination()
}
除了上面的问题,限制偏移量的写入方式是什么?
火花版本:2.4.5。
解决方案
我再次测试并且maxOffsetsPerTrigger工作得很好。我误解了触发器的结果,现在它是有道理的。参数表示读取的总偏移量,而不是每个分区的偏移量。
推荐阅读
- r - 在 xtable 中设置不同的数字
- amazon-web-services - 监控 ec2 实例的 Linux 日志
- sql - 如何在单行中获取多个列值?
- html - bootstrap 4 导航栏折叠不起作用(使用 JQuery - popper - bootstrap
- flutter - 在flutter_bloc中为(注销)功能添加单独的块
- python - 使用 BeautifulSoup 获取图像 url,其中 src= data:image/gif;base64,
- python - 我想将自定义 Django 视图转换为 Django-Rest_framework Endpoint
- flutter - How to set custom tickMarkShape in flutter slider widget
- javascript - Angular 部署到 azure 失败
- javascript - 如何在服务器上创建日期(有 utc 时区)并在客户端正确显示?