python - Adding a custom operator on Google Cloud Composer
Problem description
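I'm trying to add a custom operator to Google Cloud Composer (Airflow), but the operator can't be found. I've spent a lot of time on this and tried the following:
I modified the code from the example to try to get the operator picked up.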
dags/my_dag.py
import datetime
from airflow import DAG
# from airflow.models import Variable
# from airflow.operators import StopInstanceOperator
from airflow.operators.my_operator import StopInstanceOperator
# [END dag_imports]
# [START dag_parameters]
INTERVAL = '@daily'
START_DATE = datetime.datetime(2018, 7, 16)
PROJECT = "project"
ZONE = "zone"
INSTANCE = "instance"
DISK = "disk"
# [END dag_parameters]
# [START dag]
dag1 = DAG('backup_vm_instance',
           description='Backup a Compute Engine instance using an Airflow DAG',
           schedule_interval=INTERVAL,
           start_date=START_DATE,
           catchup=False)
# [END dag]
## Compute Engine tasks
stop_instance = StopInstanceOperator(
    project=PROJECT, zone=ZONE, instance=INSTANCE, task_id='stop_instance')
# [END operators]
# Airflow DAG definition
begin >> stop_instance
plugins/my_operator.py
import datetime
import logging
import time
from airflow.models import BaseOperator
from airflow.plugins_manager import AirflowPlugin
from airflow.utils.decorators import apply_defaults
# [END imports]
class StopInstanceOperator(BaseOperator):
    """Stops the virtual machine instance."""
    @apply_defaults
    def __init__(self, project, zone, instance, *args, **kwargs):
        self.project = project
        self.zone = zone
        self.instance = instance
        super(StopInstanceOperator, self).__init__(*args, **kwargs)
    def execute(self, context):
        logging.info("Hello world")
# [END stop_oper_no_xcom]
class GoogleComputeEnginePlugin(AirflowPlugin):
    """Expose Airflow operators."""
    name = 'gce_commands_plugin'
    operators = [StopInstanceOperator]
This is the code/structure, and the error I get in Airflow is:
Broken DAG: [/home/airflow/gcs/dags/my_dag.py] No module named 'airflow.operators.my_operator'
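Solution
I tested your Composer files, and after cleaning up the code it seems to work fine.
First, make sure you have been granted the following permissions, which are required to add and update plugins: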
- storage.objectAdmin to upload files
- composer.environments.get to look up the DAG destination bucket. This permission is not required when you use the Cloud Storage API or gsutil.
Starting with plugins/my_operator.py:
# [START imports]
import datetime
import logging
import time
from airflow.models import BaseOperator
from airflow.plugins_manager import AirflowPlugin
from airflow.utils.decorators import apply_defaults
# [END imports]
class StopInstanceOperator(BaseOperator):
    """Stops the virtual machine instance."""
    @apply_defaults
    def __init__(self, project, zone, instance, *args, **kwargs):
        self.project = project
        self.zone = zone
        self.instance = instance
        super(StopInstanceOperator, self).__init__(*args, **kwargs)
    def execute(self, context):
        logging.info('Hello world!')
class GoogleComputeEnginePlugin(AirflowPlugin):
    """Expose Airflow operators."""
    name = 'gce_commands_plugin'
    operators = [StopInstanceOperator]
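This code is correct. Pay attention to the name = 'gce_commands_plugin' variable: it gives the plugin the internal name gce_commands_plugin (which is how you reference it from the DAG file) and attaches one operator to it (StopInstanceOperator).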
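To see why the import path follows name rather than the file name, here is a minimal sketch of the mechanism as it works in Airflow 1.x: the plugins manager builds a module called airflow.operators.<plugin name> that holds the plugin's operators and registers it in sys.modules. The sketch is an approximation, not Airflow's code: it uses a hypothetical in-memory fake_airflow package, a stand-in AirflowPlugin class, and a hypothetical register_plugin_operators() helper so that it runs without Airflow installed. (As far as I know, Airflow 2.0 dropped this import style; there you import the operator directly from its module in the plugins folder.)

```python
import importlib
import sys
import types


class AirflowPlugin:
    """Hypothetical stand-in for airflow.plugins_manager.AirflowPlugin."""
    name = None
    operators = []


class StopInstanceOperator:
    """Placeholder for the operator defined in plugins/my_operator.py."""


class GoogleComputeEnginePlugin(AirflowPlugin):
    name = 'gce_commands_plugin'
    operators = [StopInstanceOperator]


def _ensure_package(name):
    """Register an empty in-memory package so submodules can hang off it."""
    if name not in sys.modules:
        package = types.ModuleType(name)
        package.__path__ = []  # no directories on disk to search
        sys.modules[name] = package


def register_plugin_operators(plugin, package='fake_airflow.operators'):
    """Expose plugin.operators as the module '<package>.<plugin.name>'.

    The module name comes from the plugin's `name` attribute; the file the
    plugin class lives in (my_operator.py) plays no part in it.
    """
    parts = package.split('.')
    for i in range(1, len(parts) + 1):
        _ensure_package('.'.join(parts[:i]))
    module_name = f'{package}.{plugin.name}'
    module = types.ModuleType(module_name)
    for operator in plugin.operators:
        setattr(module, operator.__name__, operator)
    sys.modules[module_name] = module
    return module_name


module_name = register_plugin_operators(GoogleComputeEnginePlugin)
print(module_name)  # fake_airflow.operators.gce_commands_plugin

# Importing via the plugin name finds the registered module ...
plugin_module = importlib.import_module('fake_airflow.operators.gce_commands_plugin')
print(plugin_module.StopInstanceOperator is StopInstanceOperator)  # True

# ... while importing via the file name fails, like the Broken DAG error.
try:
    importlib.import_module('fake_airflow.operators.my_operator')
    file_name_error = None
except ModuleNotFoundError as exc:
    file_name_error = exc
print(file_name_error)
```

The last lookup fails with ModuleNotFoundError because nothing was ever registered under the file name, which mirrors the "No module named 'airflow.operators.my_operator'" error from the question.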
Then we have dags/my_dag.py:
# [START dag_imports]
import datetime
from airflow import DAG
from airflow.models import Variable
from airflow.operators.gce_commands_plugin import StopInstanceOperator
# [END dag_imports]
# [START dag_parameters]
INTERVAL = '@daily'
START_DATE = datetime.datetime(2018, 7, 16)
PROJECT = "project"
ZONE = "zone"
INSTANCE = "instance"
DISK = "disk"
# [END dag_parameters]
# [START dag]
dag1 = DAG('backup_vm_instance',
           description='Backup a Compute Engine instance using an Airflow DAG',
           schedule_interval=INTERVAL,
           start_date=START_DATE,
           catchup=False)
# [END dag]
# [START operators]
## Compute Engine tasks
stop_instance = StopInstanceOperator(
    project=PROJECT, zone=ZONE, instance=INSTANCE, task_id='stop_instance',
    dag=dag1)  # attach the task to dag1; without dag= it belongs to no DAG
# [END operators]
I noticed this line in your DAG:
from airflow.operators.my_operator import StopInstanceOperator
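Here you are referencing the plugin's file name, and that is what goes wrong. You have to reference the value of the name = 'gce_commands_plugin' variable from my_operator.py instead: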
from airflow.operators.gce_commands_plugin import StopInstanceOperator
You also cannot use the following snippet:
# Airflow DAG definition
begin >> stop_instance
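That is because you decided not to use dummy_operator, which is what defines the begin variable. Without it, begin is never assigned in your DAG file.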
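Airflow loads a DAG file by executing it, so an undefined name such as begin surfaces as a Broken DAG as soon as the import error is fixed. The sketch below reproduces that with plain Python: a hypothetical toy Task class that mimics Airflow's a >> b dependency syntax (it is not Airflow's BaseOperator) and exec() standing in for Airflow parsing the file.

```python
class Task:
    """Hypothetical toy task mimicking Airflow's `a >> b` dependency syntax."""

    def __init__(self, task_id):
        self.task_id = task_id
        self.downstream = []

    def __rshift__(self, other):
        # `self >> other` means: run `other` after `self`.
        self.downstream.append(other)
        return other


# The tail of the question's DAG file: `begin` is used but never assigned.
broken_dag_source = """
stop_instance = Task('stop_instance')
begin >> stop_instance
"""

error = None
try:
    exec(broken_dag_source, {'Task': Task})
except NameError as exc:
    error = exc
print(error)  # name 'begin' is not defined

# Defining `begin` first (in Airflow, e.g. a DummyOperator task) fixes it.
fixed_dag_source = """
begin = Task('begin')
stop_instance = Task('stop_instance')
begin >> stop_instance
"""
namespace = {'Task': Task}
exec(fixed_dag_source, namespace)
print([t.task_id for t in namespace['begin'].downstream])  # ['stop_instance']
```

So either drop the begin >> stop_instance line, as in the cleaned-up DAG, or define begin before using it.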
Once your files are ready, copy them to your Composer environment's bucket, and the DAG should then load in the Airflow UI:
gsutil cp $HOME/my_operator.py gs://<COMPOSER_BUCKET_NAME>/plugins
gsutil cp $HOME/my_dag.py gs://<COMPOSER_BUCKET_NAME>/dags
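If you redeploy often, the two commands above can be wrapped in a small script. This is a sketch, not an official tool: upload_commands() and deploy() are hypothetical helper names, and the bucket name must be replaced with your own environment's bucket. It uploads the plugin before the DAG, presumably reducing the chance that Composer parses the DAG while its import target is still missing, and it defaults to a dry run that only prints the commands.

```python
import subprocess


def upload_commands(bucket, plugin_file, dag_file):
    """Build the two `gsutil cp` commands: plugins go to plugins/, DAGs to dags/."""
    return [
        ['gsutil', 'cp', plugin_file, f'gs://{bucket}/plugins'],  # plugin first
        ['gsutil', 'cp', dag_file, f'gs://{bucket}/dags'],
    ]


def deploy(bucket, plugin_file, dag_file, dry_run=True):
    """Print the commands (dry run) or execute them, stopping on the first failure."""
    commands = upload_commands(bucket, plugin_file, dag_file)
    for cmd in commands:
        if dry_run:
            print(' '.join(cmd))
        else:
            subprocess.run(cmd, check=True)  # raises CalledProcessError on failure
    return commands


if __name__ == '__main__':
    # Hypothetical bucket name; pass dry_run=False to actually upload.
    deploy('my-composer-bucket', 'my_operator.py', 'my_dag.py')
```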
I hope this helps.