python - 在谷歌云作曲家气流 DAG 中导入错误
问题描述
我在谷歌作曲家(气流)中有一个 DAG 可以导入:
从气流.contrib.sensors.gcs_sensor 导入 GoogleCloudStorageObjectSensor
当我运行 DAG 时,出现此错误:
“ImportError:没有名为 sensors.base_sensor_operator 的模块”
基本上我想在做其他事情之前检查一个文件是否存在于存储桶中。
这是完整的python代码:
from datetime import datetime,timedelta
from airflow import DAG
from airflow.operators.dummy_operator import DummyOperator
from airflow.operators.subdag_operator import SubDagOperator
from airflow.operators.python_operator import PythonOperator
from airflow.contrib.operators.gcs_to_bq import
GoogleCloudStorageToBigQueryOperator
from airflow.contrib.sensors.gcs_sensor import
GoogleCloudStorageObjectSensor
CONNECTION_ID = 'something'
default_args = {
'owner': 'airflow',
'depends_on_past': False,
'start_date': datetime(2018, 8, 21, 0, 0),
}
def print_hello():
return 'youtube folder exists!!'
with DAG('DATA_TRANSFER_GCP_BUCKET_TO_BQ2',
schedule_interval=timedelta(days=1),
default_args=default_args ) as dag:
gcp_sensorBucket=GoogleCloudStorageObjectSensor(
task_id='gcp_sensorbucket',
bucket='/aa_youtube_new/2018/06/04/',
#bucket='{{var.value.gcp_youtube_video_bucket}}/2018/06/04/',
object='*.csv',
google_cloud_conn_id=CONNECTION_ID
)
hello_operator = PythonOperator(task_id='hello_task',
python_callable=print_hello)
hello_operator.set_upstream(gcp_sensorBucket)
解决方案
I see that the import from base_sensor_operator was added in the version 1.10 of Airflow. This was not present in versions 1.8 and 1.9. Instead, the import was done as follows:
from airflow.operators.sensors import BaseSensorOperator
So, checking the sensors, the related sensor for Airflow 1.10 is under sensors but for Airflow 1.9 and 1.8, the sensor is under operators.
So, this issue seems to be related to the versioning of Composer and Airflow, but Airflow 1.10 wasn't available for Composer on August. In fact, this issue was reported on Google's Issue Tracker and the response is to solve this by using Apache Airflow version 1.10, which can be included on the version 1.3 of Composer.
推荐阅读
- angular6 - 如何在 Angular 6 中生成子组件
- c - 在C中将ASCII字符存储在数组中
- mysql - 通过命令行进行的 Flyway 迁移不起作用
- c - 如何使用 esp_bt_gap_read_rssi_delta 函数从 ESP32 获取蓝牙经典 RSSI?
- angular - Ionic 3项目和base64图像中http帖子中的数据参数
- python - Cefpython cx_freeze 错误
- java - 使用 Java 创建条形图
- spring - spring restservice 异步日志记录功能
- javascript - 如何通过类名获取“this.parent”元素值?
- linux - 如何从文件夹中的脚本文件中运行 100 个 csv 文件(一次一个文件)?