首页 > 解决方案 > Kedro 条件管道(或替代方法)

问题描述

我目前正在为我们的管道研究不同的设计模式选项。Kedro 框架似乎是一个不错的选择(允许模块化设计模式、可视化方法等)。

管道应该由许多模块创建,这些模块要么将输出写入文件,要么将其传送到下一个模块(取决于条件)。在第二种情况下(到下一个模块的管道),kedro 失败了,因为它将整个输出读入内存然后转发到下一步(或者是否有可能是 unix 类型的管道)?我正在使用大数据,所以这个适合我。为什么这个工作流程与通常的 unix 管道不同?- unix 管道正在读取特定的缓冲区大小并立即转发它(我猜这会被交换到磁盘而不是保存在内存中?)。如果您能指出另一个允许此类功能的框架,我将不胜感激(我也不介意从头开始实现 DP)。

编辑:我的节点主要依赖于外部二进制文件,因此,我想实现类 Unix 管道。

标签: pythondesign-patternspipepipelinekedro

解决方案


Kedro-Accelerator是一个 Kedro 插件,它为 Kedro 带来了一些 Unix 管道语义。具体来说,TeePlugin允许在内存中的节点之间传递数据(作为MemoryDataSets),同时在后台将输出写入磁盘/文件。

一旦你使用了MemoryDataSets,缓冲就会被委托给底层框架。例如,对于DataFrame对象,默认的复制模式是赋值,因此行为类似于按顺序运行语句而不进行任何加载/保存:

from kedro.extras.datasets.pandas import CSVDataSet

node1_in = CSVDataSet(filepath="data.csv").load()  # Read data from a CSVDataSet as input to the first node.
node1_out = node1_in.dropna()  # The first node performs some operations on the input before returning.
node2_in = node1_out  # If the output of the first node/input to the second node is a MemoryDataSet, no data is passed, just references.
...

有关实现细节(从 Kedro 0.17.0 开始),请参阅https://github.com/quantumblacklabs/kedro/blob/0.17.0/kedro/io/memory_data_set.py#L105-L130


推荐阅读