Luigi global variables

Problem description

I want to set some target paths as global variables in Luigi.

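The reason is that the target paths I use depend on the last run of a given numerical weather prediction (NWP) model, and obtaining that value takes some time. Once I have checked the last run, I create a path under which I put several target files (all sharing the same parent folder).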
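A minimal sketch of that layout, assuming a hypothetical `find_latest_run()` that stands in for the slow check of the last NWP run, and hypothetical folder and file names: the run is resolved once, and every target file is derived from the same parent folder.

```python
from pathlib import Path


def find_latest_run(product_id: str, date: str) -> str:
    """Hypothetical stand-in for the slow query of the last NWP run."""
    return "12"  # e.g. the 12 UTC run


def build_target_paths(product_id: str, date: str, base: Path) -> dict:
    """Resolve the latest run once and derive all target paths from it."""
    run_hr = find_latest_run(product_id, date)
    parent = base / f"{date}_{run_hr}"  # shared parent folder for this run
    return {
        "nwp": parent / f"nwp_{product_id}.nc",
        "nwp_gfs": parent / f"gfs_{product_id}.nc",
        "predict": parent / f"predict_{product_id}.csv",
    }


paths = build_target_paths("p1", "20240101", Path("nwp_data"))
print(paths["nwp"])
```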

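At the moment I repeat a similar call in several tasks to obtain the parent path, so storing this path in a global variable would be much more efficient. I tried to define the global variables inside a function (get_target_path) that is called by the Luigi classes, but when control returns to the Luigi pipeline, the global variables do not seem to persist.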

This is what my code looks like:

from datetime import datetime

import luigi


class GetNWP(luigi.Task):
    """
    Download the NWP data.
    """
    product_id = luigi.Parameter()
    date = luigi.Parameter(default=datetime.today().strftime('%Y%m%d'))
    run_hr = luigi.Parameter(default='latest')

    def requires(self):
        return None
    def output(self):
        path = get_target_path(self.product_id, self.date, self.run_hr,
                               type='getNWP')
        return luigi.LocalTarget(path)
    def run(self):
        download_nwp_data(self.product_id, self.date, self.run_hr)


class GetNWP_GFS(luigi.Task):
    """
    GFS data.
    """
    product_id = luigi.Parameter()
    date = luigi.Parameter(default=datetime.today().strftime('%Y%m%d'))
    run_hr = luigi.Parameter(default='latest')

    def requires(self):
        return None
    def output(self):
        path = get_target_path(self.product_id, self.date, self.run_hr,
                               type='getNWP_GFS')
        return luigi.LocalTarget(path)
    def run(self):
        download_nwp_data(self.product_id, self.date, self.run_hr,
                          type='getNWP_GFS')


class Predict(luigi.Task):
    """
    Create forecast.
    """
    product_id = luigi.Parameter(default=None)
    date = luigi.Parameter(default=datetime.today().strftime('%Y%m%d'))
    run_hr = luigi.Parameter(default='latest')
    horizon = luigi.Parameter(default='DA')

    def requires(self):
        return [
                GetNWP_GFS(self.product_id, self.date, self.run_hr),
                GetNWP(self.product_id, self.date, self.run_hr)
                ]
    def output(self):
        path = get_target_path(self.product_id, self.date, self.run_hr,
                               type='predict', horizon=self.horizon)
        return luigi.LocalTarget(path)
    def run(self):
        get_forecast(self.product_id, self.date, self.run_hr)

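The function get_target_path defines the target path from the input parameters. I would like this function to set global variables that can be accessed from Luigi, for example as follows (only the code for the getNWP task is shown):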

from pathlib import Path


def get_target_path(product_id, date, run_hr, type=None, horizon='DA'):
    """
    Obtain target path.
    """
    # A `global` statement must precede any use of the name in the function,
    # otherwise Python raises a SyntaxError.
    global path_nwp_gfs, path_nwp_model, path_predict
    if type == 'getNWP_GFS':
        if 'path_nwp_gfs' in globals():
            return path_nwp_gfs
        else:
            ...
    elif type == 'getNWP':
        if 'path_nwp_model' in globals():
            return path_nwp_model
        else:
            # nwp_model, ext, db_dflt and create_directory are defined elsewhere.
            filename = f'{nwp_model}_{date}_{run_hr}_{horizon}.{ext}'
            path = Path(db_dflt['app_data']['nwp_folder'])
            create_directory(path)
            path_nwp_model = path / filename
            return path_nwp_model
    elif type == 'predict':
        if 'path_predict' in globals():
            return path_predict
        else:
            ...

When control returns to Luigi, the global variables defined in this function do not exist.
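One plausible explanation, assuming get_target_path lives in a different module from the Luigi tasks: `global` and `globals()` always refer to the namespace of the module in which the function is defined, not the module that calls it. The sketch below builds two throwaway modules with `types.ModuleType` (the names `utils` and `pipeline` are hypothetical) to show that the variable appears in `utils` but never in `pipeline`.

```python
import types

# Hypothetical helper module that defines get_target_path().
utils = types.ModuleType("utils")
exec(
    "def get_target_path():\n"
    "    global path_nwp_model\n"
    "    path_nwp_model = 'nwp/latest.nc'\n"
    "    return path_nwp_model\n",
    utils.__dict__,
)

# Hypothetical Luigi script that imports and calls the helper.
pipeline = types.ModuleType("pipeline")
pipeline.get_target_path = utils.get_target_path
pipeline.get_target_path()

print(hasattr(utils, "path_nwp_model"))     # → True
print(hasattr(pipeline, "path_nwp_model"))  # → False
```

In that situation the value would have to be read as `utils.path_nwp_model` at use time; `from utils import path_nwp_model` copies the binding once, at import time.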

Any ideas on how to solve this would be greatly appreciated!

Tags: python-3.x, luigi

Solution


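Since there seems to be no built-in way to store the paths of Luigi targets, I eventually decided to create a class that holds all the information related to the Luigi targets/paths. The tasks use this class whenever they call external functions that need to know the target paths.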
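The idea can be reduced to a small sketch: one object computes the paths on first use, keeps them as attributes, and is handed to any helper that needs them, so the helpers never recompute them. `PathStore`, `resolve_run` and `save_forecast` are hypothetical names for illustration, and the hard-coded run stands in for the slow NWP query.

```python
from pathlib import Path
from typing import Optional


class PathStore:
    """Hypothetical holder that resolves target paths once and keeps them."""

    def __init__(self, base: Path) -> None:
        self.base = base
        self.run_hr: Optional[str] = None
        self.resolutions = 0  # how many times the slow lookup ran

    def resolve_run(self) -> str:
        if self.run_hr is None:  # only the first call does the slow work
            self.resolutions += 1
            self.run_hr = "12"  # stand-in for querying the last NWP run
        return self.run_hr

    def forecast_path(self, product_id: str) -> Path:
        return self.base / self.resolve_run() / f"forecast_{product_id}.csv"


def save_forecast(product_id: str, paths: PathStore) -> Path:
    """Hypothetical external helper: it reads the path from the shared object."""
    target = paths.forecast_path(product_id)
    return target  # a real helper would write the forecast file here


paths = PathStore(Path("data"))
first = save_forecast("p1", paths)
second = save_forecast("p2", paths)
print(paths.resolutions)  # → 1
```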

This class is imported in the main Luigi script and instantiated before the tasks are defined:

from datetime import datetime

import luigi

from .utils import Targets

paths = Targets()

class GetNWP(luigi.Task):
    """Download NWP data required to prepare the prediction."""

    product_id = luigi.Parameter()
    date = luigi.Parameter(default=datetime.today().strftime('%Y%m%d'))
    run_hr = luigi.Parameter(default='latest')

    def requires(self):
        return GetProductInfo(self.product_id)
    def output(self):
        path = paths.getpath_nwp(self.product_id, self.date, self.run_hr)
        path_gfs = paths.getpath_nwp_GFS(self.product_id, self.date, self.run_hr)
        return [luigi.LocalTarget(path),
                luigi.LocalTarget(path_gfs)]
    def run(self):
        download_nwp_data(self.product_id, date=self.date, run_hr=self.run_hr,
                          paths=paths, nwp_model=paths.nwp_model)
        download_nwp_data(self.product_id, date=self.date, run_hr=self.run_hr,
                          paths=paths, nwp_model=paths.gfs_model)

class Predict(luigi.Task):
    """Create forecast based on the product information and NWP data."""

    product_id = luigi.Parameter()
    date = luigi.Parameter(default=datetime.today().strftime('%Y%m%d'))
    run_hr = luigi.Parameter(default='latest')

    def requires(self):
        return GetNWP(self.product_id, self.date, self.run_hr)
    def output(self):
        path = paths.getpath_predict(self.product_id, self.date, self.run_hr)
        path_gfs = paths.getpath_predict_GFS(self.product_id, self.date,
                                             self.run_hr)
        return [luigi.LocalTarget(path),
                luigi.LocalTarget(path_gfs)]
    def run(self):
        get_forecast(product_id=self.product_id, date=self.date,
                     run_hr=self.run_hr, paths=paths, nwp_model=paths.nwp_model)
        get_forecast(product_id=self.product_id, date=self.date,
                     run_hr=self.run_hr, paths=paths, nwp_model=paths.gfs_model)
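Because output() here returns a list of two targets, Luigi's default Task.complete() treats each task as done only when every one of those targets exists; if a path shifted between runs, the task would look incomplete again. A plain-Python approximation of that check, with temporary files standing in for the NWP and GFS targets (this mimics the behaviour and is not Luigi's own code):

```python
import tempfile
from pathlib import Path


def outputs_complete(outputs: list) -> bool:
    """Approximate Luigi's default completeness check: all outputs must exist."""
    return bool(outputs) and all(Path(p).exists() for p in outputs)


with tempfile.TemporaryDirectory() as tmp:
    nwp = Path(tmp) / "nwp.nc"
    gfs = Path(tmp) / "gfs.nc"
    nwp.touch()
    only_one = outputs_complete([nwp, gfs])  # GFS file still missing
    gfs.touch()
    both = outputs_complete([nwp, gfs])

print(only_one, both)  # → False True
```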

where the Targets class has the following structure:

class Targets:
    """Store Luigi's target paths."""

    def __init__(self):
        """Initialize paths and variables."""
        self.prod_id = None
        self.path_1 = None
        self.path_2 = None
        self.path_3 = None

    def update_object(self, product_id, date=None, run_hr=None):
        """Update object based on inputs."""
        if self.prod_id is None:
            self.prod_id = product_id
        # Each path is computed only the first time it is needed.
        if self.path_1 is None:
            self.get_path_1(product_id)
        if self.path_2 is None:
            self.get_path_2(product_id)
        if self.path_3 is None:
            self.get_path_3(product_id)

    def get_path_1(self, product_id):
        """Generate a path 1 for a luigi Task."""
        ...  # define self.path_1 (further arguments elided)

    def get_path_2(self, product_id):
        """Generate a path 2 for a luigi Task."""
        ...  # define self.path_2 (further arguments elided)

    def get_path_3(self, product_id):
        """Generate a path 3 for a luigi Task."""
        ...  # define self.path_3 (further arguments elided)

The main idea is to set the target paths only once and pass them as input arguments in every Luigi task. This allows:

  • faster task execution, and
  • avoiding errors if the target paths change because a new NWP run becomes available.
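The second point can be illustrated with a small sketch, assuming a hypothetical module-level `latest_run` that changes while the pipeline is running: recomputing the path on every call lets an upstream and a downstream task disagree, whereas an object that resolved the path once keeps pointing at the file that was actually written.

```python
from pathlib import Path

latest_run = "06"  # hypothetical: the newest NWP run available right now


def lookup_path() -> Path:
    """Recompute the path on every call (the original approach)."""
    return Path("nwp") / latest_run / "model.nc"


class FrozenPaths:
    """Resolve the path once and keep it for the whole pipeline run."""

    def __init__(self) -> None:
        self.nwp = lookup_path()


frozen = FrozenPaths()
written_by_download = lookup_path()  # upstream task writes here
latest_run = "12"                    # a new NWP run becomes available meanwhile
read_by_predict = lookup_path()      # downstream task now looks elsewhere

print(written_by_download == read_by_predict)  # → False
print(frozen.nwp == written_by_download)       # → True
```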
