首页 > 解决方案 > 如何在从 Kafka 读取消息时为 Spark Structured Streaming 设置最佳配置值 - 触发时间、maxOffsetsPerTrigger?

问题描述

我有一个结构化流应用程序从 Kafka 读取消息。每天的消息总数约为 180 亿条,每分钟的峰值消息数 = 12,500,000。最大消息大小为 2 KB。

如何确保我的结构化流式处理应用程序能够处理如此多的数据量和速度?基本上,我只想知道如何设置最佳触发时间、maxOffsetsPerTrigger 或任何其他使工作顺利进行并能够处理故障和重新启动的配置。

标签: apache-sparkapache-kafkaspark-streamingspark-structured-streaming

解决方案


您可以以固定间隔微批处理或连续运行 spark 结构化流应用程序。以下是一些可用于调整流应用程序的选项。

卡夫卡配置:

Kafka中的分区数:

您可以增加 Kafka 中的分区数量。因此,更多的消费者可以同时读取数据。根据输入速率和引导服务器的数量将此设置为适当的数字。

火花流配置:

驱动程序和执行程序内存配置:

计算每批数据的大小(#records * 每条消息的大小)并相应地设置内存。

执行人数量:

将执行者的数量设置为 kafka 主题中的分区数。这增加了并行性。同时读取数据的任务数。

限制偏移量:

每个触发间隔处理的最大偏移数的速率限制。指定的总偏移量将按比例分配到不同卷的主题分区中。

  val df = spark
    .read
    .format("kafka")
    .option("kafka.bootstrap.servers", "host1:port1,host2:port2")
    .option("subscribe", "topicName")
    .option("startingOffsets", "latest")
    .option("maxOffsetsPerTrigger", "1000000")
    .load()

使用检查点从故障中恢复:

如果发生故障或故意关闭,您可以恢复先前查询的先前进度和状态,并从中断处继续。这是使用检查点和预写日志完成的。

finalDF
  .writeStream
  .outputMode("complete")
  .option("checkpointLocation", "path/to/HDFS/dir")
  .format("memory")
  .start()

扳机:

流式查询的触发设置定义了流式数据处理的时间,查询是作为具有固定批处理间隔的微批处理查询还是作为连续处理查询执行。


推荐阅读