python - Apache Airflow - 连接到 AWS S3 错误
问题描述
我正在尝试使用 Connection 对象在 Apache Airflow 中获取 S3 挂钩。
它看起来像这样:
class S3ConnectionHandler:
def __init__():
# values are read from configuration class, which loads from env. variables
self._s3 = Connection(
conn_type="s3",
conn_id=config.AWS_CONN_ID,
login=config.AWS_ACCESS_KEY_ID,
password=config.AWS_SECRET_ACCESS_KEY,
extra=json.dumps({"region_name": config.AWS_DEFAULT_REGION}),
)
@property
def s3(self) -> Connection:
return get_live_connection(self.logger, self._s3)
@property
def s3_hook(self) -> S3Hook:
return self.s3.get_hook()
我收到一个错误:
Broken DAG: [...] Traceback (most recent call last):
File "/home/airflow/.local/lib/python3.8/site-packages/airflow/models/connection.py", line 282, in get_hook
return hook_class(**{conn_id_param: self.conn_id})
File "/home/airflow/.local/lib/python3.8/site-packages/airflow/providers/amazon/aws/hooks/base_aws.py", line 354, in __init__
raise AirflowException('Either client_type or resource_type must be provided.')
airflow.exceptions.AirflowException: Either client_type or resource_type must be provided.
为什么会这样?据我了解,S3Hook 从父类 AwsHook 调用构造函数,并将 client_type 作为“s3”字符串传递。我怎样才能解决这个问题?
我从这里把这个配置用于钩子。
编辑:直接创建 S3 挂钩时,我什至遇到同样的错误:
@property
def s3_hook(self) -> S3Hook:
#return self.s3.get_hook()
return S3Hook(
aws_conn_id=config.AWS_CONN_ID,
region_name=self.config.AWS_DEFAULT_REGION,
client_type="s3",
config={"aws_access_key_id": self.config.AWS_ACCESS_KEY_ID, "aws_secret_access_key": self.config.AWS_SECRET_ACCESS_KEY}
)
``
解决方案
如果您使用的是 Airflow 2,请参阅新文档- 这可能有点棘手,因为大多数谷歌搜索会将您重定向到旧文档。
在我的情况下,我正在使用AwsHook
并且必须切换到,AwsBaseHook
因为它似乎是版本 2 中唯一且正确的。我也必须切换导入路径,现在 aws 的东西contrib
不再在它下面了providers
正如您在新文档中看到的那样,您可以将 client_type 或 resource_type 作为 AwsBaseHook 参数传递,具体取决于您要使用的参数。一旦你这样做,你的问题应该得到解决
推荐阅读
- mysql - MSQL 从带有子查询和 1 个值的选择语句中插入多行
- reactjs - 这是使用 React-Three-Fiber 每秒渲染精灵位置的最佳方式吗?
- excel - 使用 Kotlin 阅读在线 Excel 电子表格
- kotlin - 使用 swagger codegen 使 Kotlin 模型可序列化
- flutter - 如何在颤动的提升按钮中创建渐变颜色?
- reactjs - echarts-for-react 以编程方式展开和折叠树中的叶子
- sql - Postgres 通过赋予特定列更多优先级来获取值
- ios - 单元测试模块导入引发未定义符号错误
- java - switchMapSingle() 不会取消先前的请求
- html - 响应式 flexbox 填充所有浏览器高度