Luigi dependency specification issue with separate tasks

Problem description

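I have 3 Luigi tasks: the first generates a Hadoop file, the second takes that output and writes it to Elasticsearch, and the third also writes to Elasticsearch. The third task is fairly disconnected from the first two, but I want it to run after the first two have completed. The first task can involve multiple files (of the same type), so in the second task I specified the dependency like this: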

def requires(self):
    return [SeqrVCFToMTTask()]

where SeqrVCFToMTTask is the first task. This works fine, so the first two tasks run well. Now, when I try to specify the dependency in the same way in the third task:

def requires(self):
    return [SeqrMTToESTask()]

(`SeqrMTToESTask` - name of the second task).

it fails with the error:

raise HTTP_EXCEPTIONS.get(status_code, TransportError)(status_code, error_message, additional_info)
elasticsearch.exceptions.RequestError: TransportError(400, '')

The final Luigi task output looks like this:

===== Luigi Execution Summary =====

Scheduled 3 tasks of which:
* 1 complete ones were encountered:
    - 1 SeqrVCFToMTTask(...)
* 1 failed:
    - 1 SeqrMTToESTask(...)
* 1 were left pending, among these:
    * 1 had failed dependencies:
        - 1 SeqrGenesQCToESTask(...)

This progress looks :( because there were failed tasks

===== Luigi Execution Summary =====

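I know this has nothing to do with Elasticsearch itself, because when the dependency is omitted (just commenting out requires), the 3rd task, SeqrGenesQCToESTask, runs fine on its own. How should I specify the dependency properly here? The only thing I need is for the 3rd task to start running after the first two have completed.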

Update

More detailed code:

class SeqrVCFToMTTask(HailMatrixTableTask):
    reference_ht_path = luigi.Parameter(...
    ...
    ...

    def run(self):
        self.read_vcf_write_mt()

    def read_vcf_write_mt(self, schema_cls=SeqrVariantsAndGenotypesSchema):
        ...
        ...

class SeqrMTToESTask(HailElasticSearchTask):
    dest_file = luigi.Parameter()
    def __init__(self, *args, **kwargs):
        # TODO: instead of hardcoded index, generate from project_guid, etc.
        super().__init__(*args, **kwargs)

    def requires(self):
        return [SeqrVCFToMTTask()]

    def output(self):
        filename = self.dest_file
        return getTarget(filename)

    def run(self):
        mt = self.import_mt()
        row_table = SeqrVariantsAndGenotypesSchema.elasticsearch_row(mt)
        self.export_table_to_elasticsearch(row_table, self._mt_num_shards(mt))

class SeqrGenesQCToESTask(luigi.Task):
    source_path = luigi.Parameter(...
    ...
    ...

    def __init__(self, *args, **kwargs):
        super().__init__(*args, **kwargs)
        self._es = ElasticsearchClient(host=self.es_host, port=self.es_port)

    def requires(self):
        return [SeqrMTToESTask()]

    def output(self):
        # TODO: Use https://luigi.readthedocs.io/en/stable/api/luigi.contrib.esindex.html.
        filename = self.dest_file
        return getTarget(filename)

    def run(self):
        # Do some data transformations, then
        # export the data to Elasticsearch
        ...


class HailMatrixTableTask(luigi.Task):
    source_paths = luigi.Parameter(...
    ...
    ...

    def __init__(self, *args, **kwargs):
        super().__init__(*args, **kwargs)

        try:
            # Locally it should be '_FrozenOrderedDict' but on AWS for an unknown reason only 'FrozenOrderedDict' works
            self.source_paths = list(json.loads(self.source_paths, object_pairs_hook=luigi.parameter._FrozenOrderedDict))
        except json.JSONDecodeError:
            self.source_paths = [self.source_paths]

    def requires(self):
        # We only exclude globs in source path here so luigi does not check if the file exists
        return [VcfFile(filename=s) for s in self.source_paths if '*' not in s]

    def output(self):
        filename = self.dest_path
        return getTarget(filename)

    def complete(self):
        # Complete is called by Luigi to check if the task is done and will skip if it is.
        # By default it checks to see that the output exists, but we want to check for the
        # _SUCCESS file to make sure it was not terminated halfway.
        filename = self.dest_path
        full_path = os.path.join(filename, '_SUCCESS')
        return getTarget(full_path).exists()

    def run(self):
        # Import the file, then output it in a different format
        ...

class HailElasticSearchTask(luigi.Task):
    project_guid = luigi.Parameter(...
    ...
    ...

    def __init__(self, *args, **kwargs):
        super().__init__(*args, **kwargs)
        if not self.es_index:
            self.es_index = compute_index_name(args=public_class_props(self))
        self._es = ElasticsearchClient(host=self.es_host, port=self.es_port)

    def requires(self):
        return [VcfFile(filename=self.source_path)]

    def run(self):
        mt = self.import_mt()
        # TODO: Load into ES

    def import_mt(self):
        return hl.read_matrix_table(self.input()[0].path)

    def export_table_to_elasticsearch(self, table, num_shards):
        # Exports to the ES index
        ...

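Update

I suspect it happens because the 3rd task, SeqrGenesQCToESTask, opens a connection to Elasticsearch, and then the 2nd task, SeqrMTToESTask, cannot write to Elasticsearch. The error occurs there, in SeqrMTToESTask, on this line: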

self.export_table_to_elasticsearch(row_table, self._mt_num_shards(mt))

Tags: luigi

Solution


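I was eventually able to solve it. There were multiple issues with the Luigi pipeline setup rather than with Elasticsearch.

  1. The reported error occurred because I was invoking the 3rd task, SeqrGenesQCToESTask, with arguments that the second task also needed to receive: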

    LUIGI_CONFIG_PATH=luigi_pipeline/configs/seqr-loading-local-GRCh38.cfg python -u gcloud_dataproc/submit.py --cpu-limit 4 --num-executors 2 --driver-memory 2g --executor-memory=4g --hail-version 0.2 --run-locally luigi_pipeline/seqr_loading.py SeqrGenesQCToESTask --local-scheduler --spark-home $SPARK_HOME --project-guid example

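Previously I had been running SeqrMTToESTask rather than SeqrGenesQCToESTask, and the rest of the command stayed the same. By changing the task, I ended up omitting the project-guid parameter for the second task, since it was not defined in luigi_pipeline/configs/seqr-loading-local-GRCh38.cfg.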
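One way to catch this kind of omission before launching is to check that each task-level parameter is actually present in the config file. The following is a minimal sketch, assuming the INI-style `[TaskName]` sections that Luigi reads for per-task parameters. The helper name `missing_task_params`, the sample config contents, and the list of required options are all hypothetical and not taken from the real pipeline.

```python
import configparser


def missing_task_params(cfg_text, required):
    """Return the (section, option) pairs from `required` that cfg_text lacks."""
    parser = configparser.ConfigParser()
    parser.read_string(cfg_text)
    return [
        (section, option)
        for section, option in required
        # has_option() is False for a missing section as well as a missing option.
        if not parser.has_option(section, option)
    ]


# Hypothetical config contents: project_guid is not set for SeqrMTToESTask.
SAMPLE_CFG = """
[SeqrVCFToMTTask]
dest_path = /tmp/example.mt

[SeqrMTToESTask]
dest_file = /tmp/example_es_done.txt
"""

# Hypothetical list of parameters each task is expected to get from the config.
REQUIRED = [
    ("SeqrVCFToMTTask", "dest_path"),
    ("SeqrMTToESTask", "dest_file"),
    ("SeqrMTToESTask", "project_guid"),
]

missing = missing_task_params(SAMPLE_CFG, REQUIRED)
print(missing)
```

In Luigi, a bare `--project-guid` on the command line applies to the task named in the command. A value for an upstream task would instead have to come from its own config section or be passed explicitly where the task is instantiated in `requires()`.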

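  2. After fixing the missing parameter, the pipeline still failed, because I needed to explicitly write an output file at the end of the run method of SeqrMTToESTask. I had assumed that Luigi did this automatically: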

    with self.output().open('w') as out_file:
        out_file.write('{} task complete!'.format(type(self).__name__))
    

Here self.output() returns an hdfs.target:

https://luigi.readthedocs.io/en/stable/_modules/luigi/contrib/hdfs/target.html
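The explicit write matters because Luigi's default completeness check looks at whether a task's output exists, and nothing creates that output on the task's behalf. Below is a plain-Python sketch of that idea, using a local temporary directory rather than HDFS. The class `MarkerTask` and its `write_marker` flag are hypothetical stand-ins for illustration and are not part of Luigi's API.

```python
import os
import tempfile


class MarkerTask:
    """Toy stand-in for a task whose only output is a marker file."""

    def __init__(self, marker_path, write_marker):
        self.marker_path = marker_path
        self.write_marker = write_marker

    def complete(self):
        # Same idea as the default check: done means the output exists.
        return os.path.exists(self.marker_path)

    def run(self):
        # Real work (for example, exporting to Elasticsearch) would happen here.
        if self.write_marker:
            with open(self.marker_path, "w") as out_file:
                out_file.write("{} task complete!".format(type(self).__name__))


with tempfile.TemporaryDirectory() as tmp_dir:
    # run() finishes, but no output is written, so the task never looks done.
    silent = MarkerTask(os.path.join(tmp_dir, "silent.txt"), write_marker=False)
    silent.run()
    silent_done = silent.complete()

    # run() writes the marker, so the completeness check passes.
    explicit = MarkerTask(os.path.join(tmp_dir, "explicit.txt"), write_marker=True)
    explicit.run()
    explicit_done = explicit.complete()

print(silent_done, explicit_done)
```

With the marker missing, a downstream task would still see its dependency as unfinished even though the upstream work itself succeeded.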

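After fixing these two issues, the pipeline ran successfully, so the dependency specification in SeqrGenesQCToESTask had actually been correct from the start: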

def requires(self):
    return [SeqrMTToESTask()]
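Requiring only the second task is enough because dependencies are transitive: the second task already requires the first. As a rough illustration of the resulting order, here is a toy model using the standard library's `graphlib` (Python 3.9+). It is not how Luigi schedules internally, and the `VcfFile` inputs are left out for brevity.

```python
from graphlib import TopologicalSorter

# Each task maps to the tasks it requires, mirroring the requires() methods.
requires = {
    "SeqrVCFToMTTask": [],
    "SeqrMTToESTask": ["SeqrVCFToMTTask"],
    "SeqrGenesQCToESTask": ["SeqrMTToESTask"],
}

# static_order() yields every task after all of its requirements.
run_order = list(TopologicalSorter(requires).static_order())
print(run_order)
```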
