首页 > 解决方案 > 我们如何管理 Spark Structured Streaming 中的偏移量?( _spark_metadata 的问题)

问题描述

背景: 我编写了一个简单的 spark 结构化蒸汽应用程序,用于将数据从 Kafka 移动到 S3。发现为了支持一次性保证,spark 创建了 _spark_metadata 文件夹,该文件夹最终变得太大,当流式应用程序运行很长时间时,元数据文件夹变得如此之大,以至于我们开始出现 OOM 错误。我想摆脱 Spark Structured Streaming 的元数据和检查点文件夹并自己管理偏移量。

我们如何在 Spark Streaming 中管理偏移量: 我使用 val offsetRanges = rdd.asInstanceOf[HasOffsetRanges].offsetRanges 来获取 Spark Structured Streaming 中的偏移量。但是想知道如何使用 Spark Structured Streaming 获取偏移量和其他元数据来管理我们自己的检查点。你有任何实现检查点的示例程序吗?

我们如何管理 Spark Structured Streaming 中的偏移量? 查看这个 JIRA https://issues-test.apache.org/jira/browse/SPARK-18258。看起来没有提供偏移量。我们应该怎么做?

问题是在 6 小时内元数据的大小增加到 45MB,并且一直增长到接近 13GB。分配的驱动程序内存为 5GB。当时系统因OOM而崩溃。想知道如何避免让这些元数据变得如此庞大?如何使元数据不记录这么多信息。

代码:

1. Reading records from Kafka topic
  Dataset<Row> inputDf = spark \
  .readStream \
  .format("kafka") \
  .option("kafka.bootstrap.servers", "host1:port1,host2:port2") \
  .option("subscribe", "topic1") \
  .option("startingOffsets", "earliest") \
  .load()
2. Use from_json API from Spark to extract your data for further transformation in a dataset.
   Dataset<Row> dataDf = inputDf.select(from_json(col("value").cast("string"), EVENT_SCHEMA).alias("event"))
       ....withColumn("oem_id", col("metadata.oem_id"));
3. Construct a temp table of above dataset using SQLContext
   SQLContext sqlContext = new SQLContext(sparkSession);
   dataDf.createOrReplaceTempView("event");
4. Flatten events since Parquet does not support hierarchical data.
5. Store output in parquet format on S3
   StreamingQuery query = flatDf.writeStream().format("parquet")

在此处输入图像描述

数据集 dataDf = inputDf.select(from_json(col("value").cast("string"), EVENT_SCHEMA).alias("event")) .select("event.metadata", "event.data", "event .connection", "event.registration_event","event.version_event" ); SQLContext sqlContext = new SQLContext(sparkSession); dataDf.createOrReplaceTempView("事件"); 数据集 flatDf = sqlContext .sql("select " + " date, time, id, " + flattenSchema(EVENT_SCHEMA, "event") + " from event"); StreamingQuery query = flatDf .writeStream() .outputMode("append") .option("compression", "snappy") .format("parquet") .option("checkpointLocation", checkpointLocation) .option("

标签: apache-sparkspark-streamingspark-structured-streaming

解决方案


对于非批量 Spark Structured Streaming KAFKA 集成:

引用:

结构化流式处理忽略 Apache Kafka 中的偏移量提交。

相反,它依赖于驱动程序端自己的偏移量管理,该驱动程序负责将偏移量分配给执行器,并在处理轮次(epoch 或 micro-batch)结束时对它们进行检查点。

如果您遵循 Spark KAFKA 集成指南,您不必担心。

优秀参考:https ://www.waitingforcode.com/apache-spark-structured-streaming/apache-spark-structured-streaming-apache-kafka-offsets-management/read

对于批处理,情况不同,您需要自己管理并存储偏移量。

更新根据评论,我建议问题略有不同,并建议您查看Spark Structured Streaming Checkpoint Cleanup。除了您更新的评论和没有错误的事实之外,我建议您在 Spark Structured Streaming https://www.waitingforcode.com/apache-spark-structured-streaming/checkpoint-storage-structured-流式传输/读取。查看代码,与我的风格不同,但看不到任何明显的错误。


推荐阅读