首页 > 解决方案 > 使用 IntelliJ 从 Azure EventHub 获取 Scala 中时间窗口的数据

问题描述

我想从 Azure eventthub 获取 1 天的数据并应用一些逻辑并将其复制到 cosmos DB。我能够从 eventthub 获取数据,但数据是流式传输的。我只需要在一个时间窗口内获取数据(假设只需要一天/或 5 小时)。

下面是我尝试从 Azure EventHub 获取数据的代码。

import org.apache.spark.eventhubs.{ ConnectionStringBuilder, EventHubsConf, EventPosition }
import org.apache.spark.sql.functions._
import org.apache.spark.sql.SparkSession


object FromEventHub{



 val spark = SparkSession
    .builder
    .appName("FromEventHubToCosmos")
    .getOrCreate()

  import spark.implicits._

  val connectionString = ConnectionStringBuilder()
    .setNamespaceName("NAMESPACE_NAME")
    .setEventHubName("EVENTHUB_NAME")
    .setSasKeyName("KEY_NAME")
    .setSasKey("KEY")
    .build

  val currTime = Instant.now
  val ehConf = EventHubsConf(connectionString)
  .setStartingPosition(EventPosition.fromEnqueuedTime(currTime.minus(Duration.ofHours(5))))
  .setEndingPosition(EventPosition.fromEnqueuedTime(currTime))

  val reader =  spark
    .read
    .format("eventhubs")
    .options(ehConf.toMap)
    .load()


val newDF = reader.withColumn("Offset", $"offset".cast(LongType)).withColumn("Time (readable)", $"enqueuedTime".cast(TimestampType)).withColumn("Timestamp", $"enqueuedTime".cast(LongType)).withColumn("Body", $"body".cast(StringType)).select("Offset", "Time (readable)", "Timestamp", "Body")

newDF.show()


}

我之前使用过 setStartingPosition 5 小时,但在 scala 中,数据不断从 eventthub 流式传输。在执行代码之前,我只需要来自事件中心的数据。

  1. 有没有办法使用时间窗口或任何其他方式限制来自事件中心的数据?

  2. 如何管理数据框中可用的数据以应用一些逻辑。?

标签: scalaapache-sparkapache-spark-sqlspark-structured-streamingazure-eventhub

解决方案


我还遇到了 Spark 作业保持流式传输但未完成的问题。如果这有帮助,则可以通过在Azure Databricks笔记本中运行代码来解决问题。奇怪,但工作在那里完成。免费的Databricks Community Edition也可以。


推荐阅读