airflow - 将 presto 与气流集成
问题描述
我想将 Airflow 与 presto 集成。在 bash shell 上,命令 - */opt/presto/bin/presto --server 10.0.0.15:8190 --catalog hive --schema cvm_db --execute "select * from cvm_db.cvm_weekly_rech limit 10"* 工作正常,但在python 气流脚本,它抛出错误无效的 sythax 并突出显示命令。请问什么是最好的方法来做到这一点。下面是气流脚本。
import airflow
from airflow.models import DAG
from airflow.operators.bash_operator import BashOperator
from airflow.operators.dummy_operator import DummyOperator
from datetime import datetime, timedelta, date
default_args = {
'owner': 'daasuser',
'depends_on_past': False,
'start_date': airflow.utils.dates.days_ago(0),
'email': ['a.olabamidele@ligadata.com'],
'email_on_failure': True,
#'email_on_retry': True,
'retries': 5,
'retry_delay': timedelta(minutes = 10),
#'queue': 'bash_queue',
#'pool': 'backfill',
#'priority_weight': 10,
#'end_date': datetime(2016, 1, 1),
#'wait_for_downstream': False,
#'dag': dag,
#'sla': timedelta(hours = 2),
#'execution_timeout': timedelta(seconds = 300),
#'on_failure_callback': some_function,
#'on_success_callback': some_other_function,
#'on_retry_callback': another_function,
}
dag = DAG(
'cvm_weekly_datamart',
default_args=default_args,
description='To insert records into cvm datamart weekly',
schedule_interval='0 5 * * 0')
#date1='/opt/presto/bin/presto --server 10.0.0.15:8190 --catalog hive --schema cvm_db --execute "select * from cvm_db.cvm_weekly_rech limit 10"
t1 = BashOperator(
task_id='print_date',
bash_command=/opt/presto/bin/presto --server 54.242.0.153:8180 --catalog hive --schema cvm_db --execute "select * from cvm_db.cvm_weekly_rech limit 10",
dag=dag)
print1='echo "Completed"'
t2 = BashOperator(
task_id='print',
bash_command=print1,
#retries=1,
dag=dag)
t1>>t2
解决方案
命令应该是:
bash_command='/opt/presto/bin/presto --server 54.242.0.153:8180 --catalog hive --schema cvm_db --execute "select * from cvm_db.cvm_weekly_rech limit 10"'
命令应该是一个字符串。
推荐阅读
- ios - 如何在商店中没有应用的情况下配置 App Clip
- flutter - 孩子和括号的问题
- javascript - 如何单击卡片并在单击下一步按钮时将根据所选卡片重定向?
- c# - 在 c# 控制台应用程序中更新 Excel 文件后无法保存它
- javascript - React Router v6:无法读取未定义的属性“用户名”
- javascript - 当javascript中的变量为“否”时,循环ajax调用以退出
- apache-kafka - Kafka KTable Materialized-State-Store 控制
- calculation - DialogFlow CX 计算值
- powershell - PowerShell 导出
- r - 如何根据变量的首字母对新变量进行分组?