首页 > 解决方案 > 如何在 Databricks 流中的 Scala 中运行 if else 语句

问题描述

我是 Scala 和 Databricks 流媒体的新手。我正在将流式事件读入数据帧,并且我想使用 if-else 语句根据数据帧是否为空来触发不同的笔记本。下面的简单代码(及其变体)

if(finalDF.isEmpty){ 
  print("0")
}
else{
  print("1")
}

持续导致以下错误

AnalysisException: Queries with streaming sources must be executed with writeStream.start();;
eventhubs

如何将 writeStream.start() 合并到上述代码中?或者,鉴于数据帧是通过流式事件填充到其中的,我如何评估数据帧内容并在此基础上采取一项或另一项行动?

标签: scalastreamingdatabricks

解决方案


流 DF 在设计上不能为空或不能为空 - 流是无限的,如果你现在没有数据,那么你可以在下一秒得到新的东西。所以你的代码不起作用。

您可以使用foreachBatch处理数据的“当前”快照,您可以像使用“正常”、非流数据帧一样使用这些快照,但是您可能无法从其中触发笔记本,因此这两个条件的代码应该是在同一个功能里面,不在不同的笔记本里面。


推荐阅读