首页 > 解决方案 > Composer 没有看到数据流作业成功

问题描述

我正在使用 Gcloud Composer 启动 Dataflow 作业。

我的 DAG 包含两个应一个接一个运行的 Dataflow 作业。

import datetime

from airflow.contrib.operators.dataflow_operator import DataflowTemplateOperator
from airflow import models


default_dag_args = {

    'start_date': datetime.datetime(2019, 10, 23),
    'dataflow_default_options': {
               'project': 'myproject',
               'region': 'europe-west1',
               'zone': 'europe-west1-c',
               'tempLocation': 'gs://somebucket/',
               }
}

with models.DAG(
        'some_name',
        schedule_interval=datetime.timedelta(days=1),
        default_args=default_dag_args) as dag:

    parameters = {'params': "param1"}

    t1 = DataflowTemplateOperator(
        task_id='dataflow_example_01',
        template='gs://path/to/template/template_001',
        parameters=parameters,
        dag=dag)

    parameters2 = {'params':"param2"}

    t2 = DataflowTemplateOperator(
        task_id='dataflow_example_02',
        template='gs://path/to/templates/template_002',
        parameters=parameters2,
        dag=dag
    )

    t1 >> t2

当我签入数据流时,作业已成功,它应该创建的所有文件都已创建,但它似乎在美国地区运行,云作曲家环境在欧洲西部。

在气流中,我可以看到第一个作业仍在运行,因此第二个作业未启动

在此处输入图像描述

我应该向 DAG 添加什么以使其成功?我如何在欧洲跑步?

任何有关如何进行的建议或解决方案将不胜感激。谢谢!

标签: pythongoogle-cloud-dataflowgoogle-cloud-composer

解决方案


我过去必须解决这个问题。在 Airflow 1.10.2(或更低版本)中,代码调用service.projects().templates().launch()端点。这已在1.10.3中修复,其中使用区域性的:service.projects().locations().templates().launch().

截至 2019 年 10 月,可用于 Composer 环境的最新 Airflow 版本是 1.10.2。如果您立即需要解决方案,可以将修复程序反向移植到 Composer。

为此,我们可以覆盖DataflowTemplateOperator我们自己的版本,称为RegionalDataflowTemplateOperator

class RegionalDataflowTemplateOperator(DataflowTemplateOperator):
  def execute(self, context):
    hook = RegionalDataFlowHook(gcp_conn_id=self.gcp_conn_id,
                        delegate_to=self.delegate_to,
                        poll_sleep=self.poll_sleep)

    hook.start_template_dataflow(self.task_id, self.dataflow_default_options,
                                 self.parameters, self.template)

现在,这将使用修改后的操作符RegionalDataFlowHookstart_template_dataflow方法DataFlowHook来调用正确的端点:

class RegionalDataFlowHook(DataFlowHook):
  def _start_template_dataflow(self, name, variables, parameters,
                               dataflow_template):
      ...
      request = service.projects().locations().templates().launch(
          projectId=variables['project'],
          location=variables['region'],
          gcsPath=dataflow_template,
          body=body
      )
      ...
      return response

然后,我们可以使用我们的新运算符和 Google 提供的模板(用于测试目的)创建一个任务:

task = RegionalDataflowTemplateOperator(
    task_id=JOB_NAME,
    template=TEMPLATE_PATH,
    parameters={
        'inputFile': 'gs://dataflow-samples/shakespeare/kinglear.txt',
        'output': 'gs://{}/europe/output'.format(BUCKET)
    },
    dag=dag,
)

完整的工作 DAG在这里。对于更清洁的版本,操作员可以移动到单独的模块中。


推荐阅读