airflow - 如何将代理配置传递到 Airflow S3 连接
问题描述
我有一个运行气流的 ec2 服务器,我必须为所有外部 https 请求使用代理。以下函数在 dag 中使用。
import boto3
from botocore.config import Config
def get_files(**context):
s3 = boto3.client('s3',config=Config(proxies={'https': 'mycorpsproxy.com:3128'}))
s3_bucket = "some_bucket"
paginator = s3.get_paginator("list_objects")
page_iterator = paginator.paginate(
Bucket=s3_bucket, Prefix="folder_like_prefix/"
current_files = []
for page in page_iterator:
if 'Contents' not in page:
continue
for object in page['Contents']:
current_files.append(object['Key'])
return current_files
但我不想在每个 dag 中都对代理进行硬编码。我想使用 s3 挂钩,但找不到在气流连接的额外字段中放置什么以使其工作。
我尝试了一些变化,例如
但是当我运行 dag 时出现 json 错误
[2019-11-04 14:08:01,266] {logging_mixin.py:112} INFO - [2019-11-04 14:08:01,266] {connection.py:296} ERROR - Expecting value: line 1 column 1 (char 0)
Traceback (most recent call last):
File "/usr/local/lib/python3.6/dist-packages/airflow/models/connection.py", line 294, in extra_dejson
obj = json.loads(self.extra)
File "/usr/lib/python3.6/json/__init__.py", line 354, in loads
return _default_decoder.decode(s)
File "/usr/lib/python3.6/json/decoder.py", line 339, in decode
obj, end = self.raw_decode(s, idx=_w(s, 0).end())
File "/usr/lib/python3.6/json/decoder.py", line 357, in raw_decode
raise JSONDecodeError("Expecting value", s, err.value) from None
json.decoder.JSONDecodeError: Expecting value: line 1 column 1 (char 0)
[2019-11-04 14:08:01,267] {logging_mixin.py:112} INFO - [2019-11-04 14:08:01,266] {connection.py:297} ERROR - Failed parsing the json for conn_id s3_airflow
解决方案
在文档中写道:config_kwargs:用于构造传递给 boto3.client 和 boto3.resource 的 botocore.config.Config 的附加 kwargs。
请参阅:https ://airflow.apache.org/docs/apache-airflow-providers-amazon/stable/connections/aws.html
因此,您可以像这样在 UI 中添加额外的参数: {"host":"https://target:port", "aws_access_key_id":"my_key_id", "aws_secret_access_key": "my_access_key", "config_kwargs": {"proxies" :{"http": 'my.insane.proxy:8080', "https": 'my.insane.proxy:8080'}}}
推荐阅读
- google-chrome - engine: 'chrome' 和 engine: 'chromy' 在 backstop.config 有什么区别?
- javascript - 如何从事务保存搜索/Suitescript 搜索中获取销售订单“shipaddresslist”值?
- javascript - 为什么我的循环只返回 HTML 集合中的第一个值
- opencl - OpenCL:clSetKernelArg 与 clSetKernelArg + clEnqueueWriteBuffer
- r - 在pdf输出中编织时如何在R Markdown中证明(两边)文本
- python - 告诉 LabelEnocder 忽略新标签?
- mysql - 通过 magmi 导入的产品不会自动保存
- r - 如何使用 write_tsv 保存的 tibble 可以被 read_tsv 读取
- c - 使用 gcc 在 64 位中使用数学内核库编译程序?
- javascript - 如何防止在for循环中自动提交表单