apache-kafka - 基于kafka分区的结构化流式读取
问题描述
我正在使用 spark 结构化流来读取来自 Kafka 主题的传入消息并根据传入消息写入多个 parquet 表所以我创建了一个 readStream,因为 Kafka 源很常见,并且为每个 parquet 表在循环中创建了单独的写入流。这工作正常,但 readstream 正在创建一个瓶颈,因为它为每个 writeStream 创建一个 readStream 并且没有办法缓存已经读取的数据帧。
val kafkaDf=spark.readStream
.format("kafka")
.option("kafka.bootstrap.servers", conf.servers)
.option("subscribe", conf.topics)
// .option("earliestOffset","true")
.option("failOnDataLoss",false)
.load()
foreach table {
//filter the data from source based on table name
//write to parquet
parquetDf.writeStream.format("parquet")
.option("path", outputFolder + File.separator+ tableName)
.option("checkpointLocation", "checkpoint_"+tableName)
.outputMode("append")
.trigger(Trigger.Once())
.start()
}
现在每个写入流都在创建一个新的消费者组并从 Kafka 读取整个数据,然后进行过滤并写入 Parquet。这会产生巨大的开销。为了避免这种开销,我可以对 Kafka 主题进行分区,使其具有与表数量一样多的分区,然后 readstream 应该只从给定的分区中读取。但我没有看到将分区详细信息指定为 Kafka 读取流的一部分的方法。
解决方案
如果数据量不是很高,编写自己的接收器,收集每个微批次的数据,那么你应该能够缓存该数据帧并写入不同的位置,虽然需要一些调整,但它会起作用
推荐阅读
- node.js - Nodejs Cheerio DOM 解析器:有没有办法选择特定范围的元素?
- python - DDPG(Actor-Critic)跑到最小值/最大值
- c# - 如何查询特定日期的 EventLog?
- ruby-on-rails - Rails:如何在重定向时保留对象错误
- amazon-web-services - aws Kinesis steam 可以用作优先级队列吗?
- php - 为什么 PHP 会自动调用 Index 类中的 index 方法?
- python - 在python中配对元素
- java - 将自定义编解码器添加到 rxjava vertx EventBus
- java - 最终变量的理解
- php - 如何在 Godaddy 虚拟主机上使用 PhpMailer 通过 365 发送电子邮件