python - 气流 - 如何使用来自另一个传感器任务的传感器
问题描述
我正在尝试检查远程服务器上是否存在文件,如果存在,请检查行数是否为 0。如果行数大于 0,则管道应继续,如果不存在,我希望传感器继续检查(文件名称中有日期,所以第二天可能新文件不为空)
任何人都可以帮助阐明如何实现这一点吗?我在想我可以在检查行的python函数中使用SFTP传感器吗?如果是这样,我怎么能使用另一个传感器?非常感谢
解决方案
您可以制作一个实现这两项任务的常规传感器,这里是如何实现此任务的概述,您必须将此文件放在气流内的插件文件夹中,然后您可以将其作为 DAG 的一部分导入并使用。
from airflow.operators.sensors import BaseSensorOperator
from airflow.utils.decorators import apply_defaults
from airflow.plugins_manager import AirflowPlugin
import requests
import logging
import json
DEFAULT_CONNECTION_DETAILS = { "host": "127.0.0.1", "password": "wololo" }
log = logging.getLogger( __name__ )
class Remote_File_Row_Sensor( BaseSensorOperator ):
@apply_defaults
def __init__( self, file_name, connection_details= DEFAULT_CONNECTION_DETAILS, *args, **kwargs ):
super( Remote_File_Row_Sensor, self ).__init__( *args, **kwargs )
self.connection_details = connection_details
self.file_name = file_name
def poke( self, context ):
connection_details = self.connection_details
file_name = self.file_name
ROW_COUNT = 0
# Your code here to connect using SFTP and read the file for the row count
if ROW_COUNT == 0:
return False
else:
return True
class Remote_File_Row_Plugin( AirflowPlugin ):
name = "remote_file_row_sensor"
operators = [ Remote_File_Row_Sensor ]
推荐阅读
- java - System.out.print() 不起作用,log4j 仍然可以正常工作
- php - 在 codeigniter 控制器中覆盖第三方/帮助程序/库函数
- postgresql - postgres lo_unlink 不删除对象
- wordpress - 获得货币的价格尊重
- mysql - Docker compose 等待数据库服务初始化
- android - Android ROM(LineageOS 16)上的蓝牙损坏
- html - R - kableExtra - 如何尽可能多地加密表格的 html 输出,然后将其转换为单词
- c# - 从 RestResponse 中提取 Location 标头的最佳方法
- jwplayer - IE11 中 JWPlayer 8 上的 HLS 流式传输
- regex - vim 替换 crontab 调度部分