apache-spark - 如何设置火花消费者缓存?修复“KafkaConsumer 缓存达到 64 的最大容量”错误?
问题描述
我正在使用 spark-sql 2.4.1、spark-cassandra-connector_2.11-2.4.1.jar 和 java8。在将数据从 kafka 主题插入 C*/Cassandra 表数据时。
我收到错误:
org.apache.spark.sql.kafka010.KafkaDataConsumer - KafkaConsumer cache hitting max capacity of 64, removing consumer for CacheKey(spark-kafka-source-33321dde-bfad-49f3-bdf7-09f95883b6e9--1249540122-executor)
如何解决这个问题?
第 2 节:
我使用以下选项
Dataset<Row> df = sparkSession
.readStream()
.format("kafka")
///other options
.option("startingOffsets", "latest")
.option("retries", 1)
.option("linger.ms", 10)
.option("enable.auto.commit", false)
.option("failOnDataLoss", false)
.option("maxOffsetsPerTrigger", 500)
.option("spark.streaming.kafka.consumer.cache.enabled",false)
.load();
我仍然收到错误:
org.apache.spark.sql.kafka010.KafkaDataConsumer - KafkaConsumer cache hitting max capacity of 64, removing consumer for CacheKey(spark-kafka-source-33321dde-bfad-49f3-bdf7-09f95883b6e9--1249540122-executor)
解决方案
我不确定你在这里期望什么,但我正在分享我的想法。
"spark.streaming.kafka.consumer.cache.enabled"
是一个 DStreams 标志,但提到的 API + 警告消息是结构化流。请不要混淆,因为这是 2 种完全不同的产品。由于您使用的是结构化流 API,因此假设这就是您最初想要的产品。请注意,在结构化流式处理 Kafka 消费者缓存中不能关闭,但可以使用
"spark.sql.kafkaConsumerCache.capacity"
(软限制)调整缓存大小。仅供参考,在 Spark 3.0 中,我们重写了整个机制。如果作业达到默认的 64 缓存大小,则意味着至少有 64 个线程正在尝试读取单个 JVM 中完全相同的主题分区。我几乎无法想象它在任何方面都是有效的。性能测试可以找出正确的数字。
我建议的是水平缩放(可以添加更多执行器)。由于我没有看到代码本身,我建议理解为什么 Spark 会尝试从这么多线程中读取完全相同的主题分区并对其进行限制。
推荐阅读
- jquery - 有没有办法用里面替换部分文本
?
- c++11 - 将 constexpr const 指针指向嵌入式应用程序的易失性内存位置的现代 C++ 方法是什么?
- c# - 从子类中获取元素名称
- c# - 使用 .NET SDK 的 Dropbox API 应用程序身份验证
- c# - Microsoft.Toolkit.Wpf.UI.Controls.WebView 运行时错误
- c - 上下文保存——如何读取 C 中的段寄存器和指令指针?
- python - 查找每周重复发生的事件
- mobile - 是否可以基于 AppSheet 中的键列嵌套值?
- python-3.x - 使用 boto3 将字符串写入 S3:“'dict' 对象没有属性 'put'”
- python - 具有组织增长的 Python While 循环未提供适当的输出