databricks - Koalas 数据框随着 Deltalake 的更新而实时更新
问题描述
我正在研究一种解决方案,该解决方案使用以下代码通过给定索引更新 delta 湖:
dataframe = ks.read_table('data')
subdataframe = dataframe .loc[dataframe ['status']== 1,:]
for index,column in subdataframe.iterrows():
#get values for a given row
record= subdataframe.loc[index].to_dict()
if(record_needs_updating):
#update deltalake
dataframe.loc[dataframe['file']==record['file'],'status'] = 0
dataframe.to_delta('fileloc', partition_cols='pull',mode='overwrite')
#update databricks table
spark.sql("DROP TABLE IF EXISTS data")
spark.sql("CREATE TABLE data USING DELTA LOCATION fileloc)
spark.sql("OPTIMIZE data")
我遇到的问题是尝试在 for 循环中索引子数据帧时出现的关键错误。
这似乎是因为数据框本身在更新 delta 湖后被更新为不包含任何 status = 0 的记录,这意味着索引发生了变化,从而给出了一个关键错误。
有什么方法可以将子数据帧变成一个非实时数据帧,随着 deltalake 的更新,该数据帧不会被更新?
还要注意我需要在代码运行时更新,而不是在所有代码运行后只更新一次。
谢谢!
解决方案
推荐阅读
- reactjs - 传递给子组件的道具未定义,但 console.log 显示道具的值
- gradle - 使用 Gradle 和 JavaFX 创建一个使用自定义 JVM 参数的可运行 jar
- oop - ByRef 或 ByVal 用于传递参数,当参数是对象时。伪代码
- javascript - 使用Javascript计算文本宽度而不进行四舍五入
- excel - 格式功能已更改且无法工作
- node.js - 带有 express 和 nodejs 的空正文
- scala - 在 Spark 应用程序中使用 log4j 时如何忽略导入的库日志
- python - 如何根据 Python Dataframe 中不同行中 2 个变量的变化添加二进制状态变量?
- floating-point - 浮点运算:导出摆动
- java - Docker 映像用完覆盖空间并导致 pod 驱逐 ok GKE