
Problem description

I am trying to add a custom operator to Google Cloud Composer (Airflow), but it cannot seem to find the operator. I have spent a lot of time on this and have tried the following:

I modified the code from the sample to try to get the operator picked up.

dags/my_dag.py

import datetime
from airflow import DAG
# from airflow.models import Variable
# from airflow.operators import StopInstanceOperator
from airflow.operators.my_operator import StopInstanceOperator
# [END dag_imports]

# [START dag_parameters]
INTERVAL = '@daily'
START_DATE = datetime.datetime(2018, 7, 16)
PROJECT = "project"
ZONE = "zone"
INSTANCE = "instance"
DISK = "disk"
# [END dag_parameters]

# [START dag]
dag1 = DAG('backup_vm_instance',
           description='Backup a Compute Engine instance using an Airflow DAG',
           schedule_interval=INTERVAL,
           start_date=START_DATE,
           catchup=False)
# [END dag]

## Compute Engine tasks
stop_instance = StopInstanceOperator(
    project=PROJECT, zone=ZONE, instance=INSTANCE, task_id='stop_instance')
# [END operators]

# Airflow DAG definition
begin >> stop_instance

plugins/my_operator.py

import datetime
import logging
import time
from airflow.models import BaseOperator
from airflow.plugins_manager import AirflowPlugin
from airflow.utils.decorators import apply_defaults
# [END imports]


class StopInstanceOperator(BaseOperator):
  """Stops the virtual machine instance."""

  @apply_defaults
  def __init__(self, project, zone, instance, *args, **kwargs):
    self.project = project
    self.zone = zone
    self.instance = instance
    super(StopInstanceOperator, self).__init__(*args, **kwargs)

  def execute(self, context):
    logging.info("Hello world")
    # [END stop_oper_no_xcom]

class GoogleComputeEnginePlugin(AirflowPlugin):
  """Expose Airflow operators."""

  name = 'gce_commands_plugin'
  operators = [StopInstanceOperator]

This is the code/structure, and the error I get in Airflow is:

Broken DAG: [/home/airflow/gcs/dags/my_dag.py] No module named 'airflow.operators.my_operator'

Tags: python, airflow, google-cloud-composer

Solution

I have tested your Composer files, and after cleaning up the code they appear to work fine.

First, make sure you have been granted the following permissions, which are required to add and update plugins:

  • storage.objectAdmin — to upload the files
  • composer.environments.get — to look up the target Cloud Storage bucket for DAGs. This permission is not needed when using the Cloud Storage API or gsutil.

plugins/my_operator.py

# [START imports]
import datetime
import logging
import time
from airflow.models import BaseOperator
from airflow.plugins_manager import AirflowPlugin
from airflow.utils.decorators import apply_defaults
# [END imports]

class StopInstanceOperator(BaseOperator):
  """Stops the virtual machine instance."""

  @apply_defaults
  def __init__(self, project, zone, instance, *args, **kwargs):
    self.project = project
    self.zone = zone
    self.instance = instance
    super(StopInstanceOperator, self).__init__(*args, **kwargs)

  def execute(self, context):
    logging.info('Hello world!')

class GoogleComputeEnginePlugin(AirflowPlugin):
  """Expose Airflow operators."""

  name = 'gce_commands_plugin'
  operators = [StopInstanceOperator]
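Note that in this sample execute only logs a message. In the full tutorial the operator actually stops the instance through the Compute Engine API. A minimal sketch of what the body could delegate to, assuming the google-api-python-client library; the helper name stop_instance_request is my own, not from the tutorial:

```python
def stop_instance_request(compute, project, zone, instance):
    """Issue a stop request for a Compute Engine instance.

    `compute` is a client built with
    googleapiclient.discovery.build('compute', 'v1'); the call returns
    a zone operation dict that can be polled for completion.
    """
    return compute.instances().stop(
        project=project, zone=zone, instance=instance).execute()
```

Inside StopInstanceOperator.execute you would build the client once and call this helper with self.project, self.zone and self.instance.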

The code is correct. Pay attention to the name = 'gce_commands_plugin' variable: it gives the plugin the internal name gce_commands_plugin (which is what you reference in the DAG file), and the operators list registers StopInstanceOperator under it.

Then, we have dags/my_dag.py:

# [START dag_imports]
import datetime
from airflow import DAG
from airflow.models import Variable
from airflow.operators.gce_commands_plugin import StopInstanceOperator
# [END dag_imports]

# [START dag_parameters]
INTERVAL = '@daily'
START_DATE = datetime.datetime(2018, 7, 16)
PROJECT = "project"
ZONE = "zone"
INSTANCE = "instance"
DISK = "disk"
# [END dag_parameters]

# [START dag]
dag1 = DAG('backup_vm_instance',
           description='Backup a Compute Engine instance using an Airflow DAG',
           schedule_interval=INTERVAL,
           start_date=START_DATE,
           catchup=False)
# [END dag]

# [START operators]
## Compute Engine tasks
stop_instance = StopInstanceOperator(
    project=PROJECT, zone=ZONE, instance=INSTANCE, task_id='stop_instance')
# [END operators]

What I noticed here is:

from airflow.operators.my_operator import StopInstanceOperator

You are trying to reference the plugin by its file name, which is what went wrong. You must reference the name = 'gce_commands_plugin' variable defined in my_operator.py instead:

from airflow.operators.gce_commands_plugin import StopInstanceOperator
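The reason the plugin's name (and not its file name) appears in the import path is that Airflow's plugins_manager builds a synthetic airflow.operators.&lt;plugin_name&gt; module at startup and attaches each registered operator class to it. Roughly like this — a simplified standard-library sketch of the mechanism, not Airflow's actual code:

```python
import sys
import types

def expose_plugin(plugin_name, operators):
    """Register operator classes under a synthetic module, the way
    Airflow's plugins_manager exposes plugins (simplified sketch)."""
    module = types.ModuleType('airflow.operators.%s' % plugin_name)
    for op in operators:
        setattr(module, op.__name__, op)
    # Once the module is in sys.modules, normal imports can resolve it.
    sys.modules[module.__name__] = module
    return module

class StopInstanceOperator:  # stand-in for the real operator class
    pass

expose_plugin('gce_commands_plugin', [StopInstanceOperator])

# The import used in the DAG file now resolves:
from airflow.operators.gce_commands_plugin import StopInstanceOperator
```

This is why importing by the plugin file name (my_operator) fails: no module is ever registered under that name.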

You cannot use the following snippet:

# Airflow DAG definition
begin >> stop_instance

That is because you decided not to use a dummy_operator, which is what would define the begin variable.
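The quickest fix is either to drop that line, or to define begin yourself (for example with a DummyOperator bound to dag1). The >> syntax itself is just operator overloading: each task implements __rshift__ to record downstream dependencies. A simplified standard-library sketch of the mechanism, not Airflow's real classes:

```python
class Task:
    """Minimal stand-in for an Airflow task to illustrate `>>` chaining."""
    def __init__(self, task_id):
        self.task_id = task_id
        self.downstream = []  # tasks scheduled to run after this one

    def __rshift__(self, other):
        # `a >> b` records b as a downstream dependency of a
        self.downstream.append(other)
        return other  # returning `other` allows a >> b >> c

begin = Task('begin')
stop_instance = Task('stop_instance')
begin >> stop_instance  # stop_instance now runs after begin
```

In the real DAG, both operands must be actual Airflow operators attached to the DAG; referencing an undefined begin raises a NameError when the DAG file is parsed, which surfaces as a broken DAG.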

Once your files are ready, you can copy them to the Composer environment's bucket and see the positive result in the Airflow UI:

gsutil cp $HOME/my_operator.py gs://<COMPOSER_BUCKET_NAME>/plugins

gsutil cp $HOME/my_dag.py gs://<COMPOSER_BUCKET_NAME>/dags

I hope this helps.

