apache-spark - Spark Streaming 指定起始偏移量
问题描述
我有一个场景,我想使用 Spark DStreams 重新处理来自 Kafka 的特定批次数据。
假设我想重新处理以下批次的数据。
主题分区1-{1000,2000} 主题分区2-{500-600}
下面是我拥有的代码片段,我可以在其中指定起始偏移量。
val inputDStream = KafkaUtils.createDirectStream[String,String](
ssc,
LocationStrategies.PreferConsistent,
ConsumerStrategies.Assign[String, String](
topic-partition-list,
kafkaProps,
starting-offset-ranges))
但是,我想知道他们是否也可以指定结束偏移量,例如结构化流批处理模式。
所以本质上,它应该处理这个小批量并停止工作流程。
注意:我不想在这个用例中使用结构化流。只想使用 DStreams。
解决方案
找到了一种方法来做到这一点。
val offsetRanges = Array(
// topic, partition, inclusive starting offset, exclusive ending offset
OffsetRange("test", 0, 0, 100),
OffsetRange("test", 1, 0, 100)
)
val rdd = KafkaUtils.createRDD[String, String](sparkContext, kafkaParams, offsetRanges, PreferConsistent)
推荐阅读
- scala - 使用 sbt-native-packager 澄清 dockerExposedPorts 的范围委托
- laravel - 更好的解决方案而不是 $wire.set 为 livewire 传递计算的 Alpine 值
- java - ant matcher 如何在 Spring Security 中工作
- node.js - Spotify API 的 CORS 重定向问题?
- c++ - 如何正确编写向量的特征?
- c# - 需要根据批量大小准备查询
- python - 如何从包含文件路径的变量中删除文件扩展名
- go - 如何忽略模块中的 .go 文件?
- layout - android Layout 内容位置变化
- c++ - 如何多次继承同一个类?