首页 > 解决方案 > 控制微批量结构化 Spark Streaming

问题描述

我正在从 Kafka 主题中读取数据,并以分区模式将其放入 Azure ADLS(类似 HDFS)。

我的代码如下:

val df = spark
      .readStream
      .format("kafka")
      .option("kafka.bootstrap.servers", bootstrapServers)
      .option("subscribe", topic)
      .option("failOnDataLoss", false)
      .load()
      .selectExpr(/*"CAST(key AS STRING)",*/ "CAST(value AS STRING)").as(Encoders.STRING)
df.writeStream
      .partitionBy("year", "month", "day", "hour", "minute")
      .format("parquet")
      .option("path", outputDirectory)
      .option("checkpointLocation", checkpointDirectory)
      .outputMode("append")
      .start()
      .awaitTermination()

我有大约 2000 条记录/秒,我的问题是 Spark 每 45 秒插入一次数据,我希望立即插入数据。

有人知道如何控制微批次的大小吗?

标签: scalaazureapache-sparkapache-kafkaspark-structured-streaming

解决方案


从 Spark 2.3 版本开始,可以使用连续处理模式。在官方文档中。你可以读到这个模式只支持三个sink,并且只有Kafka sink准备好生产,并且“以Kafka作为source和sink,可以最好地观察端到端的低延迟处理

df
.writeStream
.format("kafka")
.option("kafka.bootstrap.servers", "localhost:9092")
.option("checkpointLocation", "/tmp/0")
.option("topic", "output0")
.trigger(Trigger.Continuous("0 seconds"))
.start()

因此,目前看来,您不能使用连续模式将 HDFS 用作接收器。在您的情况下,也许您可​​以测试 Akka Streams 和 Alpakka连接器


推荐阅读