首页 > 解决方案 > 如何设置火花消费者缓存?修复“KafkaConsumer 缓存达到 64 的最大容量”错误?

问题描述

我正在使用 spark-sql 2.4.1、spark-cassandra-connector_2.11-2.4.1.jar 和 java8。在将数据从 kafka 主题插入 C*/Cassandra 表数据时。

我收到错误:

 org.apache.spark.sql.kafka010.KafkaDataConsumer - KafkaConsumer cache hitting max capacity of 64, removing consumer for CacheKey(spark-kafka-source-33321dde-bfad-49f3-bdf7-09f95883b6e9--1249540122-executor)

如何解决这个问题?

第 2 节:

我使用以下选项

Dataset<Row> df = sparkSession
                      .readStream()
                      .format("kafka")
                      ///other options
                      .option("startingOffsets", "latest")
                      .option("retries", 1)
                      .option("linger.ms", 10)
                      .option("enable.auto.commit", false)
                      .option("failOnDataLoss", false)
                      .option("maxOffsetsPerTrigger", 500)
                   .option("spark.streaming.kafka.consumer.cache.enabled",false)
                      .load(); 

我仍然收到错误:

 org.apache.spark.sql.kafka010.KafkaDataConsumer - KafkaConsumer cache hitting max capacity of 64, removing consumer for CacheKey(spark-kafka-source-33321dde-bfad-49f3-bdf7-09f95883b6e9--1249540122-executor)

标签: apache-sparkapache-kafkaspark-structured-streaming

解决方案


我不确定你在这里期望什么,但我正在分享我的想法。

  1. "spark.streaming.kafka.consumer.cache.enabled"是一个 DStreams 标志,但提到的 API + 警告消息是结构化流。请不要混淆,因为这是 2 种完全不同的产品。

  2. 由于您使用的是结构化流 API,因此假设这就是您最初想要的产品。请注意,在结构化流式处理 Kafka 消费者缓存中不能关闭,但可以使用"spark.sql.kafkaConsumerCache.capacity"(软限制)调整缓存大小。仅供参考,在 Spark 3.0 中,我们重写了整个机制。

  3. 如果作业达到默认的 64 缓存大小,则意味着至少有 64 个线程正在尝试读取单个 JVM 中完全相同的主题分区。我几乎无法想象它在任何方面都是有效的。性能测试可以找出正确的数字。

我建议的是水平缩放(可以添加更多执行器)。由于我没有看到代码本身,我建议理解为什么 Spark 会尝试从这么多线程中读取完全相同的主题分区并对其进行限制。


推荐阅读