apache-spark - 如何在“startingOffsets”中同时包含“最新”和“具有特定偏移量的 JSON”,同时将数据从 Kafka 导入 Spark 结构化流
问题描述
我有一个流式查询将数据保存到文件接收器中。我正在使用 .option("startingOffsets", "latest") 和检查点位置。如果 Spark 上有任何停机时间并且当流式查询再次开始时,我不想在查询停止时开始处理查询中断的地方,而不是这种情况,我还想添加 ("startingOffsets", """ { "topicA":{"0":23,"1":-1},"topicB":{"0":-2}} """) 通过指定需要处理的用户定义的偏移量。
我尝试用不同的程序来做这件事,但我需要在一个程序中实现这一点
import org.apache.spark.sql.SparkSession
import org.apache.spark.sql.functions._
import org.apache.spark.sql.streaming.Trigger
object OSB_offset_kafkaToSpark {
def main(args: Array[String]): Unit = {
val spark = SparkSession.
builder().
appName("OSB_kafkaToSpark").
config("spark.mongodb.output.uri", "spark.mongodb.output.uri=mongodb://somemongodb.com:27018").
getOrCreate()
println("SparkSession -> "+spark)
import spark.implicits._
val df = spark
.readStream
.format("kafka")
.option("kafka.bootstrap.servers", "somekafkabroker:9092, somekafkabroker:9092")
.option("subscribe", "someTopic")
.option("startingOffsets", "latest")
.option("startingOffsets",""" {"someTopic":{"0":438521}}, "someTopic":{"1":438705}}, "someTopic":{"2":254180}}""")
.option("endingOffsets",""" {"someTopic":{"0":-1}}, "someTopic":{"1":-1}}, "someTopic":{"2":-1}} """)
.option("failOnDataLoss", "false")
.load()
val dfs = df.selectExpr("CAST(value AS STRING)")
val data = dfs.withColumn("splitted", split($"value", "/"))
.select($"splitted".getItem(4).alias("region"), $"splitted".getItem(5).alias("service"), col("value"))
.withColumn("service_type", regexp_extract($"service", """.*(Inbound|Outbound|Outound).*""", 1))
.withColumn("region_type", concat(
when(col("region").isNotNull, col("region")).otherwise(lit("null")), lit(" "),
when(col("service").isNotNull, col("service_type")).otherwise(lit("null"))))
.withColumn("datetime", regexp_extract($"value", """\d{4}-[01]\d-[0-3]\d [0-2]\d:[0-5]\d:[0-5]\d""", 0))
val extractedDF = data.filter(
col("region").isNotNull &&
col("service").isNotNull &&
col("value").isNotNull &&
col("service_type").isNotNull &&
col("region_type").isNotNull &&
col("datetime").isNotNull)
.filter("region != ''")
.filter("service != ''")
.filter("value != ''")
.filter("service_type != ''")
.filter("region_type != ''")
.filter("datetime != ''")
val pathstring = "/user/spark_streaming".concat(args(0))
val query = extractedDF.writeStream
.format("json")
.option("path", pathstring)
.option("checkpointLocation", "/user/some_checkpoint")
.outputMode("append")
.trigger(Trigger.ProcessingTime("5 seconds"))
.start()
query.awaitTermination()
}
}
我需要使用 .option("startingOffsets", "latest") 和 .option("startingOffsets",""" {"someTopic":{"0":438521}}, "someTopic":{" 运行一个程序1":438705}},"someTopic":{"2":254180}}""")。
我不确定这是否可以实现
解决方案
推荐阅读
- java - 刚刚安装了 Eclipse Helios (eclipse-jee-helios-SR1-win32-x86_64),但我不断收到错误
- node.js - 如何将带有返回的 if else 函数转换为迭代?
- vue.js - 如何在vscode的lerna项目中使用vite项目调试vue3?
- r - 测试集和训练集过于相似的问题
- unit-testing - Go GORM Mocking 预期开始
- python-3.x - Pandas 通过组合分隔符拆分列
- javascript - 数组内部获取数组的问题
- vue.js - 带有单选按钮的 V 绑定布尔值
- html - 使用自定义验证器填充响应式表单
- reactjs - 路由更改时组件被卸载/销毁?