airflow - HdfsSensor never detects the directory
Problem description
I am using Airflow's HdfsSensor to detect an HDFS directory. We have a kerberized cluster. My task keeps poking for the directory without ever detecting it, as shown below:
[2020-08-25 13:57:19,808] {hdfs_sensor.py:100} INFO - Poking for file /tmp/ayush/hive/sensor/event_date=2020-08-25
[2020-08-25 13:58:19,871] {hdfs_sensor.py:100} INFO - Poking for file /tmp/ayush/hive/sensor/event_date=2020-08-25
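For context, those repeated log lines are the expected sensor behavior: the sensor calls its poke check every poke_interval seconds and only succeeds once the check returns true, so the same "Poking for file" line repeats until the path is found or the sensor times out. A minimal, Airflow-free sketch of that loop (function and parameter names here are mine, not Airflow's API):

```python
import time

def poke_until_found(filepath, check_exists, poke_interval=60, timeout=300,
                     clock=time.monotonic, sleep=time.sleep):
    """Repeatedly 'poke' for filepath until check_exists(filepath) is True,
    mirroring (in simplified form) how an Airflow sensor waits on a target."""
    start = clock()
    while True:
        print(f"Poking for file {filepath}")
        if check_exists(filepath):
            return True
        if clock() - start > timeout:
            raise TimeoutError(f"Sensor timed out waiting for {filepath}")
        sleep(poke_interval)
```

The point for debugging: if the check itself is misconfigured (wrong connection, wrong host), it simply keeps returning false and the loop pokes forever, which is exactly the symptom in the logs above.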
Here is my code:
from airflow import DAG
from datetime import datetime, timedelta
from airflow.operators.dummy_operator import DummyOperator
from airflow.operators.python_operator import PythonOperator, BranchPythonOperator
from airflow.operators.hive_operator import HiveOperator
from airflow.operators.email_operator import EmailOperator
from airflow.sensors.hdfs_sensor import HdfsSensor
from airflow.operators.bash_operator import BashOperator
DAG_ID = 'Sensor_Test'
args = {
    'owner': 'Airflow',
    'start_date': datetime(year=2020, month=8, day=20)
}

dag = DAG(dag_id=DAG_ID,
          default_args=args,
          schedule_interval='30 6 * * *',
          catchup=False)

source_data_sensor = HdfsSensor(
    task_id='source_data_sensor',
    filepath='/tmp/ayush/hive/sensor/event_date={{ ds }}',
    dag=dag
)
Is this a Kerberos issue or something else?
For hdfs_conn_id I am using the default hdfs_default connection.
I can also see the directory using the hostname I provided in that connection.
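One way to narrow this down is to inspect what the sensor is actually using from the Airflow CLI. A sketch, assuming Airflow 1.10.x (which matches the hdfs_sensor.py log lines above); the subcommand shape changed in 2.x:

```shell
# Airflow 1.10.x: list connections and confirm hdfs_default points at the
# right NameNode host/port for the kerberized cluster.
airflow connections --list

# Airflow 2.x equivalent:
# airflow connections get hdfs_default
```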
Solution
Provide the HDFS connection ID explicitly in the HdfsSensor task:
hdfs_sense_open = HdfsSensor(
    task_id='hdfs_sense_open',
    filepath='/user/xxxx/hosts',
    hdfs_conn_id='hdfs_folder',  # add this
    dag=dag)
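For this to work, the hdfs_folder connection must exist in Airflow's metadata database. A sketch of creating it from the CLI, assuming Airflow 1.10.x flags; the connection ID is from the answer above, but the host, port, and login values are placeholders for your own NameNode:

```shell
# Airflow 1.10.x syntax; replace host/port/login with your cluster's details.
airflow connections --add \
    --conn_id hdfs_folder \
    --conn_type hdfs \
    --conn_host namenode.example.com \
    --conn_port 8020 \
    --conn_login hdfs_user

# Airflow 2.x equivalent:
# airflow connections add hdfs_folder --conn-type hdfs \
#     --conn-host namenode.example.com --conn-port 8020 --conn-login hdfs_user
```

The same connection can also be created through the Airflow web UI under Admin → Connections.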