scala - 如何在 Databricks 流中的 Scala 中运行 if else 语句
问题描述
我是 Scala 和 Databricks 流媒体的新手。我正在将流式事件读入数据帧,并且我想使用 if-else 语句根据数据帧是否为空来触发不同的笔记本。下面的简单代码(及其变体)
if(finalDF.isEmpty){
print("0")
}
else{
print("1")
}
持续导致以下错误
AnalysisException: Queries with streaming sources must be executed with writeStream.start();;
eventhubs
如何将 writeStream.start() 合并到上述代码中?或者,鉴于数据帧是通过流式事件填充到其中的,我如何评估数据帧内容并在此基础上采取一项或另一项行动?
解决方案
流 DF 在设计上不能为空或不能为空 - 流是无限的,如果你现在没有数据,那么你可以在下一秒得到新的东西。所以你的代码不起作用。
您可以使用foreachBatch处理数据的“当前”快照,您可以像使用“正常”、非流数据帧一样使用这些快照,但是您可能无法从其中触发笔记本,因此这两个条件的代码应该是在同一个功能里面,不在不同的笔记本里面。
推荐阅读
- c - 有一个卡在无限循环的测试你能找到它吗?
- sql - where 子句与 connect by And 创建查询的交互以获取层次结构中的下一个级别
- java - 在 JAVA 中登录到 sql server 报告服务
- apache-kafka - 我们也可以为生产者使用 APPLICATION_ID_CONFIG
- css - Prestashop - 如何在页脚和页面内容之间添加或删除空间
- haskell - 如何阅读haskell中的语法`Typ{..}`?
- google-api - 根据google api上的邮政编码和国家代码查找位置纬度和经度
- reactjs - 期望(jest.fn())[.not].toHaveBeenCalled()错误
- sql - 创建一个跨 1 年的公共时间段的修复表
- javascript - 反应:返回声明