luigi - Problem specifying a Luigi dependency on a separate task
Problem description
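I have 3 Luigi tasks: the first generates a Hadoop file, the second writes it to Elasticsearch, and the third also writes to Elasticsearch. The third task is fairly disconnected from the first two, but I want it to run after the first two have completed. The first task can have multiple files (of the same type), so in the second task I specified the dependency like this: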
```python
def requires(self):
    return [SeqrVCFToMTTask()]
```
with `SeqrVCFToMTTask` being the first task. This works, and the first two tasks run fine. Now, when I try to specify the dependency the same way in the third task:
```python
def requires(self):
    return [SeqrMTToESTask()]
```
(`SeqrMTToESTask` - name of the second task).
it fails with the error:
```
raise HTTP_EXCEPTIONS.get(status_code, TransportError)(status_code, error_message, additional_info)
elasticsearch.exceptions.RequestError: TransportError(400, '')
```
The final Luigi output looks like this:
```
===== Luigi Execution Summary =====
Scheduled 3 tasks of which:
* 1 complete ones were encountered:
    - 1 SeqrVCFToMTTask(...)
* 1 failed:
    - 1 SeqrMTToESTask(...)
* 1 were left pending, among these:
    * 1 had failed dependencies:
        - 1 SeqrGenesQCToESTask(...)
This progress looks :( because there were failed tasks
===== Luigi Execution Summary =====
```
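I know this has nothing to do with Elasticsearch, because when the dependency is omitted (just commenting out the `requires` method), the 3rd task, `SeqrGenesQCToESTask`, runs fine on its own. How should I specify the dependency correctly here? All I need is for the 3rd task to start running after the first two have finished.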
Update
More detailed code:
```python
import json
import os

import hail as hl
import luigi


class SeqrVCFToMTTask(HailMatrixTableTask):
    reference_ht_path = luigi.Parameter(...)
    ...

    def run(self):
        self.read_vcf_write_mt()

    def read_vcf_write_mt(self, schema_cls=SeqrVariantsAndGenotypesSchema):
        ...


class SeqrMTToESTask(HailElasticSearchTask):
    dest_file = luigi.Parameter()

    def __init__(self, *args, **kwargs):
        # TODO: instead of hardcoded index, generate from project_guid, etc.
        super().__init__(*args, **kwargs)

    def requires(self):
        return [SeqrVCFToMTTask()]

    def output(self):
        filename = self.dest_file
        return getTarget(filename)

    def run(self):
        mt = self.import_mt()
        row_table = SeqrVariantsAndGenotypesSchema.elasticsearch_row(mt)
        self.export_table_to_elasticsearch(row_table, self._mt_num_shards(mt))


class SeqrGenesQCToESTask(luigi.Task):
    source_path = luigi.Parameter(...)
    ...

    def __init__(self, *args, **kwargs):
        super().__init__(*args, **kwargs)
        self._es = ElasticsearchClient(host=self.es_host, port=self.es_port)

    def requires(self):
        return [SeqrMTToESTask()]

    def output(self):
        # TODO: Use https://luigi.readthedocs.io/en/stable/api/luigi.contrib.esindex.html.
        filename = self.dest_file
        return getTarget(filename)

    def run(self):
        # Doing some data transformations, then
        # exporting data to Elasticsearch
        ...


class HailMatrixTableTask(luigi.Task):
    source_paths = luigi.Parameter(...)
    ...

    def __init__(self, *args, **kwargs):
        super().__init__(*args, **kwargs)
        try:
            # Locally it should be '_FrozenOrderedDict' but on AWS for an unknown reason only 'FrozenOrderedDict' works
            self.source_paths = list(json.loads(self.source_paths, object_pairs_hook=luigi.parameter._FrozenOrderedDict))
        except json.JSONDecodeError:
            self.source_paths = [self.source_paths]

    def requires(self):
        # We only exclude globs in source path here so luigi does not check if the file exists
        return [VcfFile(filename=s) for s in self.source_paths if '*' not in s]

    def output(self):
        filename = self.dest_path
        return getTarget(filename)

    def complete(self):
        # Complete is called by Luigi to check if the task is done and will skip if it is.
        # By default it checks to see that the output exists, but we want to check for the
        # _SUCCESS file to make sure it was not terminated halfway.
        filename = self.dest_path
        full_path = os.path.join(filename, '_SUCCESS')
        return getTarget(full_path).exists()

    def run(self):
        # Import file, then output in different format
        ...


class HailElasticSearchTask(luigi.Task):
    project_guid = luigi.Parameter(...)
    ...

    def __init__(self, *args, **kwargs):
        super().__init__(*args, **kwargs)
        if not self.es_index:
            self.es_index = compute_index_name(args=public_class_props(self))
        self._es = ElasticsearchClient(host=self.es_host, port=self.es_port)

    def requires(self):
        return [VcfFile(filename=self.source_path)]

    def run(self):
        mt = self.import_mt()
        # TODO: Load into ES

    def import_mt(self):
        return hl.read_matrix_table(self.input()[0].path)

    def export_table_to_elasticsearch(self, table, num_shards):
        # Exports to ES index
        ...
```
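Two details of `HailMatrixTableTask` can be illustrated without Luigi or Hail: `source_paths` may arrive either as a JSON list or as a single bare path, and `complete()` treats an output directory as finished only when the `_SUCCESS` marker that Hadoop-style writers create at the end of a write is present. A stdlib-only sketch, assuming local filesystem paths instead of `getTarget(...)`; both function names are hypothetical:

```python
import json
import os
import tempfile


def parse_source_paths(raw: str) -> list:
    """Accept either a JSON list of paths or one bare path (hypothetical helper)."""
    try:
        parsed = json.loads(raw)
    except json.JSONDecodeError:
        # A bare path such as gs://bucket/file.vcf is not valid JSON.
        return [raw]
    # Anything valid as JSON but not a list (e.g. "123") is kept as one path.
    return list(parsed) if isinstance(parsed, list) else [raw]


def is_complete(dest_path: str) -> bool:
    """Mirror of the idea in complete(): an output directory left behind by a
    job killed halfway has no _SUCCESS marker and so counts as incomplete."""
    return os.path.exists(os.path.join(dest_path, "_SUCCESS"))


if __name__ == "__main__":
    print(parse_source_paths('["a.vcf", "b.vcf"]'))  # ['a.vcf', 'b.vcf']
    out_dir = tempfile.mkdtemp()                      # directory exists, no marker yet
    print(is_complete(out_dir))                       # False
    open(os.path.join(out_dir, "_SUCCESS"), "w").close()
    print(is_complete(out_dir))                       # True
```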
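Update
I suspect this happens because the 3rd task, `SeqrGenesQCToESTask`, opened a connection to Elasticsearch, and the 2nd task, `SeqrMTToESTask`, then could not write its output to Elasticsearch; the error occurs in `SeqrMTToESTask` on this line:
```python
self.export_table_to_elasticsearch(row_table, self._mt_num_shards(mt))
```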
Solution
I was eventually able to solve it. There were several problems with the Luigi pipeline setup, none of them related to Elasticsearch:
The reported error occurred because I was calling the 3rd task, `SeqrGenesQCToESTask`, with parameters that the second task should also receive:
```
LUIGI_CONFIG_PATH=luigi_pipeline/configs/seqr-loading-local-GRCh38.cfg python -u gcloud_dataproc/submit.py --cpu-limit 4 --num-executors 2 --driver-memory 2g --executor-memory=4g --hail-version 0.2 --run-locally luigi_pipeline/seqr_loading.py SeqrGenesQCToESTask --local-scheduler --spark-home $SPARK_HOME --project-guid example
```
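Previously I had been running `SeqrMTToESTask` instead of `SeqrGenesQCToESTask` with the same command. Luigi applies un-prefixed command-line parameters only to the root task named on the command line, so by switching the root task I left `SeqrMTToESTask` without the `project-guid` parameter, which is not set in `luigi_pipeline/configs/seqr-loading-local-GRCh38.cfg`.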
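After fixing the missing parameter, the pipeline still failed, because I needed to explicitly write a small marker file at the end of the `run` method of `SeqrMTToESTask`; I had assumed Luigi did this automatically:
```python
with self.output().open('w') as out_file:
    out_file.write('{} task complete!'.format(type(self).__name__))
```
`self.output()` returns an HDFS target (`luigi.contrib.hdfs.target`):
https://luigi.readthedocs.io/en/stable/_modules/luigi/contrib/hdfs/target.html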
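After fixing both issues the pipeline ran successfully, so the dependency in `SeqrGenesQCToESTask` had actually been specified correctly from the start:
```python
def requires(self):
    return [SeqrMTToESTask()]
```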