python - Creating savepoints and starting from a specific node in Apache Beam
Problem Description
Creating savepoints
Currently, savepoints are created by saving the pipeline output to a CSV file after each transformation. I am doing this by having a wrapper transform object to which I can provide a transformation function and an optional savepoint parameter. The transform object creates a savepoint if one is provided:
class SavePointNode(beam.PTransform):
    def __init__(self, transform, savepoint=None, **kwargs):
        # note: super() must be called on SavePointNode, not beam.PTransform
        super(SavePointNode, self).__init__(**kwargs)
        self.transform = transform
        self.savepoint = savepoint

    def expand(self, p):
        # apply the wrapped transformation
        result = p | beam.ParDo(self.transform)
        # write a savepoint as a side output if a path was provided,
        # but return the transformed PCollection either way so that
        # downstream transforms receive the data, not the write result
        if self.savepoint:
            result | beam.io.WriteToText(self.savepoint)
        return result
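The pattern the wrapper implements ("apply a function, then optionally persist the result") can be illustrated without Beam at all. The following is a minimal pure-Python sketch of that same idea; the function name `apply_with_savepoint` and the CSV layout are hypothetical, chosen only for this illustration:

```python
import csv
import os
import tempfile

def apply_with_savepoint(records, transform, savepoint=None):
    """Apply `transform` to each record; if `savepoint` is a file path,
    also persist the results there as CSV (Beam-free sketch)."""
    result = [transform(r) for r in records]
    if savepoint:
        with open(savepoint, "w", newline="") as f:
            writer = csv.writer(f)
            for row in result:
                # wrap scalars so every row is a sequence for csv.writer
                writer.writerow(row if isinstance(row, (list, tuple)) else [row])
    return result

# example: transform a small batch and save a checkpoint of the output
out_path = os.path.join(tempfile.mkdtemp(), "step1.csv")
doubled = apply_with_savepoint([1, 2, 3], lambda x: x * 2, savepoint=out_path)
# doubled == [2, 4, 6], and step1.csv now holds the savepoint
```

In the Beam version, the list comprehension corresponds to the `beam.ParDo` step and the CSV write to `beam.io.WriteToText`; the key design point in both is that the persisted copy is a side effect and the transformed data is what flows onward.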
My first question is the following:
- Is there already a default way to create savepoints in Apache Beam?
Loading savepoints
By creating savepoints I want to be able to restart the pipeline at a specific node, saving time by not re-running every step up to that savepoint. For example, if an error occurred in the third transform step, I want to be able to restart the pipeline from the savepoint created by the second transform step.
My questions regarding this are the following:
- Is there currently a default way to do this in Apache Beam?
- If not are there any other good ways to achieve this behaviour?
- Have I been going about this the wrong way, and does Apache Beam provide any other way to reduce re-processing time after an error?
Solution