apache-spark - Spark结构化流式作业:流静态连接未更新
问题描述
我正在使用 Spark 结构化流从 Azure Eventhub 读取事件,通过执行流静态连接来丰富事件,并将丰富的事件输出到其他源。
问题是,几个小时后,流作业停止检测静态表中的更新。因此请注意,静态表中的更新最初确实会被捕获和更新,但在未知的时间之后,情况似乎不再如此。静态数据帧是存储在 dbfs 中的 Delta 表,每 30 分钟更新一次。
有谁知道为什么它一开始按预期工作(即处理静态增量表中的更改)但一段时间后它停止检测静态表中的更改?
请看我的简化代码
# Read eventhub
metricbeat_df = spark \
.readStream \
.format("eventhubs") \
.options(**eh_conf) \
.load()
# Join event stream with facts
joined_df = join_with_facts(metricbeat_df)
trigger = {"processingTime": "30 seconds"}
for_each_stream = joined_df \
.writeStream \
.trigger(**trigger) \
.foreach(CustomParser()) # This just outputs each event to the desired output
for_each_stream.start()
哪里join_with_facts
看起来像这样:
def join_with_facts(dataframe):
facts_df = spark.table("my_db.facts_table")
joined_facts = dataframe \
.join(facts_df, "ID", how='inner')
return joined_facts
解决方案
推荐阅读
- tensorflow - Some folds in k-fold cross-validation containing 0 True/False positives
- ios - Remove Select Photos... option on PHPhotoLibrary.requestAuthorization
- javascript - 尝试添加动态输入和标签
- java - Local Tomcat not running on intellij
- javascript - 如何处理 Net Core 中的 net::ERR_INTERNET_DISCONNECTED 错误
- javascript - 使用本地护照时如何判断客户端是否经过身份验证?
- reactjs - React.FunctionComponent with generics in typescript
- sql - Why do I get a "data type conversion error" with ExecuteNonQuery()?
- javascript - 反应只是有时会抛出无法映射未定义的属性'map'
- python-3.x - RuntimeError:无法启动新线程