python - Composer 没有看到数据流作业成功
问题描述
我正在使用 Gcloud Composer 启动 Dataflow 作业。
我的 DAG 包含两个应一个接一个运行的 Dataflow 作业。
import datetime
from airflow.contrib.operators.dataflow_operator import DataflowTemplateOperator
from airflow import models
default_dag_args = {
'start_date': datetime.datetime(2019, 10, 23),
'dataflow_default_options': {
'project': 'myproject',
'region': 'europe-west1',
'zone': 'europe-west1-c',
'tempLocation': 'gs://somebucket/',
}
}
with models.DAG(
'some_name',
schedule_interval=datetime.timedelta(days=1),
default_args=default_dag_args) as dag:
parameters = {'params': "param1"}
t1 = DataflowTemplateOperator(
task_id='dataflow_example_01',
template='gs://path/to/template/template_001',
parameters=parameters,
dag=dag)
parameters2 = {'params':"param2"}
t2 = DataflowTemplateOperator(
task_id='dataflow_example_02',
template='gs://path/to/templates/template_002',
parameters=parameters2,
dag=dag
)
t1 >> t2
当我签入数据流时,作业已成功,它应该创建的所有文件都已创建,但它似乎在美国地区运行,云作曲家环境在欧洲西部。
在气流中,我可以看到第一个作业仍在运行,因此第二个作业未启动
我应该向 DAG 添加什么以使其成功?我如何在欧洲跑步?
任何有关如何进行的建议或解决方案将不胜感激。谢谢!
解决方案
我过去必须解决这个问题。在 Airflow 1.10.2(或更低版本)中,代码调用service.projects().templates().launch()
端点。这已在1.10.3中修复,其中使用区域性的:service.projects().locations().templates().launch()
.
截至 2019 年 10 月,可用于 Composer 环境的最新 Airflow 版本是 1.10.2。如果您立即需要解决方案,可以将修复程序反向移植到 Composer。
为此,我们可以覆盖DataflowTemplateOperator
我们自己的版本,称为RegionalDataflowTemplateOperator
:
class RegionalDataflowTemplateOperator(DataflowTemplateOperator):
def execute(self, context):
hook = RegionalDataFlowHook(gcp_conn_id=self.gcp_conn_id,
delegate_to=self.delegate_to,
poll_sleep=self.poll_sleep)
hook.start_template_dataflow(self.task_id, self.dataflow_default_options,
self.parameters, self.template)
现在,这将使用修改后的操作符RegionalDataFlowHook
的start_template_dataflow
方法DataFlowHook
来调用正确的端点:
class RegionalDataFlowHook(DataFlowHook):
def _start_template_dataflow(self, name, variables, parameters,
dataflow_template):
...
request = service.projects().locations().templates().launch(
projectId=variables['project'],
location=variables['region'],
gcsPath=dataflow_template,
body=body
)
...
return response
然后,我们可以使用我们的新运算符和 Google 提供的模板(用于测试目的)创建一个任务:
task = RegionalDataflowTemplateOperator(
task_id=JOB_NAME,
template=TEMPLATE_PATH,
parameters={
'inputFile': 'gs://dataflow-samples/shakespeare/kinglear.txt',
'output': 'gs://{}/europe/output'.format(BUCKET)
},
dag=dag,
)
完整的工作 DAG在这里。对于更清洁的版本,操作员可以移动到单独的模块中。
推荐阅读
- java - Java 11 到 14 迁移 - 使用 Jackson 对 Optionals 进行反序列化
- angular - Angular 8中对同一行为主题可观察/数据源的多次订阅间歇性失败
- laravel - 在 Laravel 的 Backpack 中删除过滤器的标签
- c# - 我怎样才能避免文件创建 2 次,只有一个文件的内容应该编码 base64 字符串
- libreoffice-calc - 会计下划线怎么做
- java - @Transactional 在休眠拦截器中不起作用
- postgresql - 启用 Istio 注入时无法连接到 Azure Database for PostgreSQL 服务器
- android-studio - 为 Wear OS 和普通应用程序构建的 Android Studio 项目,但共享源文件
- javascript - 如何编写正则表达式来验证 [1,65535] 之间的字符串
- keycloak - 需要如何设置客户端才能让 account_api 工作?