scala - 使用 IntelliJ 从 Azure EventHub 获取 Scala 中时间窗口的数据
问题描述
我想从 Azure eventthub 获取 1 天的数据并应用一些逻辑并将其复制到 cosmos DB。我能够从 eventthub 获取数据,但数据是流式传输的。我只需要在一个时间窗口内获取数据(假设只需要一天/或 5 小时)。
下面是我尝试从 Azure EventHub 获取数据的代码。
import org.apache.spark.eventhubs.{ ConnectionStringBuilder, EventHubsConf, EventPosition }
import org.apache.spark.sql.functions._
import org.apache.spark.sql.SparkSession
object FromEventHub{
val spark = SparkSession
.builder
.appName("FromEventHubToCosmos")
.getOrCreate()
import spark.implicits._
val connectionString = ConnectionStringBuilder()
.setNamespaceName("NAMESPACE_NAME")
.setEventHubName("EVENTHUB_NAME")
.setSasKeyName("KEY_NAME")
.setSasKey("KEY")
.build
val currTime = Instant.now
val ehConf = EventHubsConf(connectionString)
.setStartingPosition(EventPosition.fromEnqueuedTime(currTime.minus(Duration.ofHours(5))))
.setEndingPosition(EventPosition.fromEnqueuedTime(currTime))
val reader = spark
.read
.format("eventhubs")
.options(ehConf.toMap)
.load()
val newDF = reader.withColumn("Offset", $"offset".cast(LongType)).withColumn("Time (readable)", $"enqueuedTime".cast(TimestampType)).withColumn("Timestamp", $"enqueuedTime".cast(LongType)).withColumn("Body", $"body".cast(StringType)).select("Offset", "Time (readable)", "Timestamp", "Body")
newDF.show()
}
我之前使用过 setStartingPosition 5 小时,但在 scala 中,数据不断从 eventthub 流式传输。在执行代码之前,我只需要来自事件中心的数据。
有没有办法使用时间窗口或任何其他方式限制来自事件中心的数据?
如何管理数据框中可用的数据以应用一些逻辑。?
解决方案
我还遇到了 Spark 作业保持流式传输但未完成的问题。如果这有帮助,则可以通过在Azure Databricks笔记本中运行代码来解决问题。奇怪,但工作在那里完成。免费的Databricks Community Edition也可以。
推荐阅读
- css - $document->addStyleSheet 已弃用
- rest - 在给出 500 作为响应的同时通过测试用例
- dotnetnuke - DNN 重定向到 503 错误页面而不是 404 错误页面
- angular - 与 redux-thunk 相比,ngrx/effects 方法有哪些优势?
- java - 具有昂贵的自定义键功能的列表的最大值
- xpages - 获取在 Java 中触发提交的组件 ID
- python - 双击时Tkinter程序不起作用
- loops - 使用 2 个 for 循环时无法让我的 f# 工作
- java - 如何通过 REST 保证测试用例在 SONAR CUBE 报告中覆盖/即兴发挥百分比?
- javascript - 将数据库中的数据加载到没有 POST 的下拉选择中