首页 > 解决方案 > 气流 - 如何使用来自另一个传感器任务的传感器

问题描述

我正在尝试检查远程服务器上是否存在文件,如果存在,请检查行数是否为 0。如果行数大于 0,则管道应继续,如果不存在,我希望传感器继续检查(文件名称中有日期,所以第二天可能新文件不为空)

任何人都可以帮助阐明如何实现这一点吗?我在想我可以在检查行的python函数中使用SFTP传感器吗?如果是这样,我怎么能使用另一个传感器?非常感谢

标签: pythonairflow

解决方案


您可以制作一个实现这两项任务的常规传感器,这里是如何实现此任务的概述,您必须将此文件放在气流内的插件文件夹中,然后您可以将其作为 DAG 的一部分导入并使用。

from airflow.operators.sensors import BaseSensorOperator
from airflow.utils.decorators  import apply_defaults
from airflow.plugins_manager   import AirflowPlugin

import requests
import logging
import json

DEFAULT_CONNECTION_DETAILS = { "host": "127.0.0.1", "password": "wololo" }

log = logging.getLogger( __name__ )

class Remote_File_Row_Sensor( BaseSensorOperator ):

    @apply_defaults
    def __init__( self, file_name, connection_details= DEFAULT_CONNECTION_DETAILS, *args, **kwargs ):
        super( Remote_File_Row_Sensor, self ).__init__( *args, **kwargs )
        self.connection_details = connection_details
        self.file_name          = file_name

    def poke( self, context ):
        connection_details   = self.connection_details
        file_name            = self.file_name

        ROW_COUNT = 0

        # Your code here to connect using SFTP and read the file for the row count

        if ROW_COUNT == 0:
            return False
        else:
            return True

class Remote_File_Row_Plugin( AirflowPlugin ):
    name      = "remote_file_row_sensor"
    operators = [ Remote_File_Row_Sensor ]

推荐阅读