首页 > 解决方案 > 如何使用 Luigi 动态检查输出

问题描述

我意识到我可能需要使用动态需求来完成以下任务,但是我无法理解这在实践中会是什么样子。

目标是使用 Luigi 生成数据并将其添加到数据库中,而无需提前知道将生成什么数据。

以使用 mongodb 为例:

import luigi
from uuid import uuid4
from luigi.contrib import mongodb
import pymongo

# Make up IDs, though in practice the IDs may be generated from an API
class MakeID(luigi.Task):
    def run(self):
        with self.output().open('w') as f:
            f.write(','.join([str(uuid4()) for e in range(10)]))

    # Write the data to file
    def output(self):
        return luigi.LocalTarget('data.csv')


class ToDataBase(luigi.Task):
    def requires(self):
        return MakeID()

    def run(self):
        with self.input().open('r') as f:
            ids = f.read().split(',')

        # Add some fake data to simulate generating new data 
        count_data = {key: value for value, key in enumerate(ids)}
        # Add data to the database
        self.output().write(count_data)

    def output(self):
        # Attempt to read non-existent file to get the IDs to check if task is complete
        with self.input().open('r') as f:
            valid_ids = f.read().split(',')
        client = pymongo.MongoClient('localhost',
                                     27017,
                                     ssl=False)

        return mongodb.MongoRangeTarget(client,
                                        'myDB',
                                        'myData',
                                        valid_ids,
                                        'myField')


if __name__ == '__main__':
    luigi.run()

目标是获取数据,对其进行修改,然后将其添加到数据库中。

上面的代码在运行时失败,因为方法在output方法ToDataBase之前运行,require所以当函数可以访问输入时,输入还不存在。无论如何,我仍然需要检查以确保数据已添加到数据库中。

这个github 问题与我正在寻找的问题很接近,尽管正如我所提到的,在实践中我无法弄清楚这个用例的动态需求。

标签: pythonluigi

解决方案


解决方案是创建第三个任务(在示例中Dynamic),该任务产生等待动态输入的任务,并使依赖项成为参数而不是requires方法。

class ToDatabase(luigi.Task):
    fp = luigi.Parameter()

    def output(self):
        with open(self.fp, 'r') as f:
            valid_ids = [str(e) for e in f.read().split(',')]
        client = pymongo.MongoClient('localhost', 27017, ssl=False)
        return mongodb.MongoRangeTarget(client, 'myDB', 'myData',
                                        valid_ids, 'myField')

    def run(self):
        with open(self.fp, 'r') as f:
            valid_ids = [str(e) for e in f.read().split(',')]
        self.output().write({k: 5 for k in valid_ids})


class Dynamic(luigi.Task):
    def output(self):
        return self.input()

    def requires(self):
        return MakeIDs()

    def run(self):
        yield(AddToDatabase(fp=self.input().path))

推荐阅读