spark-structured-streaming - 我们可以在 spark 结构化流批处理模式下从特定偏移量从 Kafka 获取数据吗
问题描述
在 kafka 中,我动态地获取新主题,我必须使用来自特定偏移量的火花流来处理它。是否有可能从变量传递 json 值。例如考虑下面的代码
val df = spark
.read
.format("kafka")
.option("kafka.bootstrap.servers", "host1:port1,host2:port2")
.option("subscribePattern", "topic.*")
.option("startingOffsets", """{"topic1":{"0":23,"1":-2},"topic2":{"0":-2}}""")
.load()
在这个我想动态更新startingOffsets的值......我试图传递字符串中的值并调用它但它没有工作......如果我在startingOffsets中给出相同的值它正在工作。在这种情况下如何使用变量?
val start_offset= """{"topic1":{"0":23,"1":-2},"topic2":{"0":-2}}"""
val df = spark
.read
.format("kafka")
.option("kafka.bootstrap.servers", "host1:port1,host2:port2")
.option("subscribePattern", "topic.*")
.option("startingOffsets", start_offset)
.load()
java.lang.IllegalArgumentException: Expected e.g. {"topicA":{"0":23,"1":-1},"topicB":{"0":-2}}, got """{"topicA":{"0":23,"1":-1},"topicB":{"0":-2}}"""
解决方案
def main(args: Array[String]) {
val conf = new SparkConf().setMaster("local[*]").setAppName("ReadSpecificOffsetFromKafka");
val spark = SparkSession.builder().config(conf).getOrCreate();
spark.sparkContext.setLogLevel("error");
import spark.implicits._;
val start_offset = """{"first_topic" : {"0" : 15, "1": -2, "2": 6}}"""
val fromKafka = spark.readStream.format("kafka")
.option("kafka.bootstrap.servers", "localhost:9092, localhost:9093")
.option("subscribe", "first_topic")
// .option("startingOffsets", "earliest")
.option("startingOffsets", start_offset)
.load();
val selectedValues = fromKafka.selectExpr("cast(value as string)", "cast(partition as integer)");
selectedValues.writeStream
.format("console")
.outputMode("append")
// .trigger(Trigger.Continuous("3 seconds"))
.start()
.awaitTermination();
}
这是使用 spark 结构化流和 scala 从 kafka 获取特定偏移量的确切代码
推荐阅读
- java - 使用动态 TCP 端口与 SQL 服务器的 JDBC 连接
- mvvm - MVVM - 自定义模型的 MutableLiveData 没有通过数据绑定更新到 ViewModel 并且始终为空
- elasticsearch - ElasticSearch搜索,获取返回产品的唯一类别
- javascript - 将日期作为字符串写入 Odata 模型 0CALDAY?
- angular - SORTED Angular 6 两个组件之间的通信服务
- powershell - 用户配置文件名称和大小到 CSV 文件
- sql-server - 这个 DB2 游标是一个循环吗?
- html - 如何创建平滑渐变背景动画?
- angular - 如何在 agm 中的特定位置自动放大我的地图。请注意所有标记彼此非常接近
- r - R中的ifelse函数