Integrating Presto with Airflow

Problem description

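I want to integrate Airflow with Presto. In a bash shell, the command */opt/presto/bin/presto --server 10.0.0.15:8190 --catalog hive --schema cvm_db --execute "select * from cvm_db.cvm_weekly_rech limit 10"* works fine, but in the Python Airflow script it throws an "invalid syntax" error and highlights the command. What is the best way to do this? The Airflow script is below.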

            import airflow
            from airflow.models import DAG
            from airflow.operators.bash_operator import BashOperator
            from airflow.operators.dummy_operator import DummyOperator
            from datetime import datetime, timedelta, date
            default_args = {
              'owner': 'daasuser',
              'depends_on_past': False,
              'start_date': airflow.utils.dates.days_ago(0),
              'email': ['a.olabamidele@ligadata.com'],
              'email_on_failure': True,
              #'email_on_retry': True,
              'retries': 5,
              'retry_delay': timedelta(minutes = 10),
              #'queue': 'bash_queue',
              #'pool': 'backfill',
              #'priority_weight': 10,
              #'end_date': datetime(2016, 1, 1),
              #'wait_for_downstream': False,
              #'dag': dag,
              #'sla': timedelta(hours = 2),
              #'execution_timeout': timedelta(seconds = 300),
              #'on_failure_callback': some_function,
              #'on_success_callback': some_other_function,
              #'on_retry_callback': another_function,
            }
            dag = DAG(
                'cvm_weekly_datamart',
                default_args=default_args,
                description='To insert records into cvm datamart weekly',
                schedule_interval='0 5 * * 0')
            #date1='/opt/presto/bin/presto --server 10.0.0.15:8190 --catalog hive --schema cvm_db --execute "select * from cvm_db.cvm_weekly_rech limit 10"
            t1 = BashOperator(
                task_id='print_date',
                bash_command=/opt/presto/bin/presto --server 54.242.0.153:8180 --catalog hive --schema cvm_db --execute "select * from cvm_db.cvm_weekly_rech limit 10",
                dag=dag)
            print1='echo "Completed"'
            t2 = BashOperator(
                task_id='print',
                bash_command=print1,
                #retries=1,
                dag=dag)
            t1>>t2

Tags: airflow, presto, trino

Solution


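The command should be:

bash_command='/opt/presto/bin/presto --server 54.242.0.153:8180 --catalog hive --schema cvm_db --execute "select * from cvm_db.cvm_weekly_rech limit 10"'

The command must be a string. Without the surrounding quotes, Python tries to parse the shell command as Python code, which is what raises the invalid syntax error. Single quotes are used on the outside because the command itself contains double quotes around the query.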
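If hand-written nested quotes become hard to read, one option is to build the command string from a list of arguments and let Python's standard `shlex` module do the shell quoting. The sketch below is only an illustration, not part of the original answer: the names `PRESTO_ARGS` and `presto_cmd` are hypothetical, and `shlex.join` requires Python 3.8 or later.

```python
import shlex

# Hypothetical names for illustration; the values are copied from the question.
PRESTO_ARGS = [
    "/opt/presto/bin/presto",
    "--server", "54.242.0.153:8180",
    "--catalog", "hive",
    "--schema", "cvm_db",
    "--execute", "select * from cvm_db.cvm_weekly_rech limit 10",
]

# shlex.join quotes each argument as needed and returns a single string,
# which is the type that bash_command expects.
presto_cmd = shlex.join(PRESTO_ARGS)

print(presto_cmd)
```

In the DAG file, the resulting string would then be passed as `bash_command=presto_cmd`, the same way the script already passes `print1` to the second task.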

