python - 使用检查点数据框覆盖表失败并出现 FileNotFoundException
问题描述
我df
在 pySpark 中有一些数据框,它来自于调用:
df = spark.sql("select A, B from org_table")
df = df.stuffIdo
我想org_table
在我的脚本末尾覆盖。由于禁止覆盖输入表,我检查了我的数据:
sparkContext.setCheckpointDir("hdfs:/directoryXYZ/PrePro_temp")
checkpointed = df.checkpoint(eager=True)
血统现在应该被打破,我也可以用checkpointed.show()
(作品)查看我的检查点数据。什么不起作用是写表:
checkpointed.write.format('parquet')\
.option("checkpointLocation", "hdfs:/directoryXYZ/PrePro_temp")\
.mode('overwrite').saveAsTable('org_table')
这会导致错误:
引起:java.io.FileNotFoundException:文件不存在:hdfs://org_table_path/org_table/part-00081-4e9d12ea-be6a-4a01-8bcf-1e73658a54dd-c000.snappy.parquet
我已经尝试了几件事,比如在写作之前刷新 org_table 等,但我在这里感到困惑。我该如何解决这个错误?
解决方案
我会小心转换输入是新输出的此类操作。这样做的原因是,如果出现任何错误,您可能会丢失数据。假设您的转换逻辑有问题,并且您生成了无效数据。但你只在一天后看到了这一点。此外,要修复错误,您不能使用刚刚转换的数据。您需要转换前的数据。您如何使数据再次保持一致?
另一种方法是:
- 暴露视图
- 在每批你都在写一个新表,最后你只用这个新表替换视图
- 几天后,您还可以计划一项清理工作,该工作将删除过去 X 天的表格
如果您想继续使用您的解决方案,为什么不简单地这样做而不是处理检查点呢?
df.write.parquet("hdfs:/directoryXYZ/PrePro_temp")\
.mode('overwrite')
df.load("hdfs:/directoryXYZ/PrePro_temp").write.format('parquet').mode('overwrite').saveAsTable('org_table')
当然,您将读取数据两次,但它看起来不像带有检查点的那次那样笨拙。此外,您每次都可以将“中间”数据存储在不同的目录中,因此您可以解决我在开始时暴露的问题。即使您有错误,您仍然可以通过简单地选择一个好的目录并执行.write.format(...)
org_table 来带来有效版本的数据。
推荐阅读
- regex - 如何在第二个和第三个正斜杠之间取任何值
- deep-learning - ReLU 什么时候会杀死神经元?
- web-services - 如何使用groovy在soapui中获取标签的值
- git - 在单一存储库中构建微服务的最佳策略是什么?
- angular - “Promise”类型上不存在属性“令牌”
' - python - Python make list of dicts from list of tuples
- android - 片段无故失去对活动的引用
- python - Pandas 中的 Excel 查询自动化
- javascript - 使用 phantomJS 抓取 javascript 网页
- python-3.x - AttributeError:“str”对象没有属性“ndim”,无法使用 model.predict()