How to pass proxy configuration to an Airflow S3 connection

Problem description

I have an EC2 server running Airflow, and I must use a proxy for all outbound HTTPS requests. The following function is used in a DAG.

import boto3
from botocore.config import Config

def get_files(**context):
    # Route HTTPS traffic through the corporate proxy (hard-coded here).
    s3 = boto3.client('s3', config=Config(proxies={'https': 'mycorpsproxy.com:3128'}))
    s3_bucket = "some_bucket"
    paginator = s3.get_paginator("list_objects")
    page_iterator = paginator.paginate(
        Bucket=s3_bucket, Prefix="folder_like_prefix/"
    )
    current_files = []
    for page in page_iterator:
        # Pages with no matching keys have no 'Contents' entry.
        if 'Contents' not in page:
            continue
        for obj in page['Contents']:
            current_files.append(obj['Key'])
    return current_files

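But I don't want to hard-code the proxy in every DAG. I would like to use the S3 hook, but I can't find what to put in the Extra field of the Airflow connection to make it work.

I tried a few variations, such as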

[Screenshot: Airflow connection web UI, pre-filled for S3]

But when I run the DAG I get a JSON error:

[2019-11-04 14:08:01,266] {logging_mixin.py:112} INFO - [2019-11-04 14:08:01,266] {connection.py:296} ERROR - Expecting value: line 1 column 1 (char 0)
Traceback (most recent call last):
  File "/usr/local/lib/python3.6/dist-packages/airflow/models/connection.py", line 294, in extra_dejson
    obj = json.loads(self.extra)
  File "/usr/lib/python3.6/json/__init__.py", line 354, in loads
    return _default_decoder.decode(s)
  File "/usr/lib/python3.6/json/decoder.py", line 339, in decode
    obj, end = self.raw_decode(s, idx=_w(s, 0).end())
  File "/usr/lib/python3.6/json/decoder.py", line 357, in raw_decode
    raise JSONDecodeError("Expecting value", s, err.value) from None
json.decoder.JSONDecodeError: Expecting value: line 1 column 1 (char 0)
[2019-11-04 14:08:01,267] {logging_mixin.py:112} INFO - [2019-11-04 14:08:01,266] {connection.py:297} ERROR - Failed parsing the json for conn_id s3_airflow
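
The message in this traceback is the one Python's `json.loads` raises when its input does not start with a JSON value, for example an empty string or bare text. A minimal sketch that reproduces it outside Airflow follows. The Extra values used here are hypothetical, since the actual content of the field is not shown above, and `parse_extra` is an illustrative helper, not an Airflow function.

```python
import json


def parse_extra(extra):
    """Parse a connection Extra string; return (parsed_dict, error_message)."""
    try:
        return json.loads(extra), None
    except json.JSONDecodeError as err:
        return {}, str(err)


# Hypothetical Extra values: an empty field and a non-JSON string.
_, err_empty = parse_extra("")
_, err_text = parse_extra("proxies=mycorpsproxy.com:3128")

# A strict JSON object (double-quoted keys and strings) parses cleanly.
parsed, err_ok = parse_extra(
    '{"config_kwargs": {"proxies": {"https": "mycorpsproxy.com:3128"}}}'
)

print(err_empty)
print(err_text)
print(parsed)
```

Both failing inputs report the error at line 1 column 1, which suggests the Extra field in the failing run did not begin with a JSON object at all.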

Tags: airflow

Solution


The documentation states: config_kwargs: additional kwargs used to construct a botocore.config.Config that is passed to boto3.client and boto3.resource.

See: https://airflow.apache.org/docs/apache-airflow-providers-amazon/stable/connections/aws.html

So you can add the extra parameters in the UI like this (the Extra field is parsed as JSON, so every string must use double quotes): {"host": "https://target:port", "aws_access_key_id": "my_key_id", "aws_secret_access_key": "my_access_key", "config_kwargs": {"proxies": {"http": "my.insane.proxy:8080", "https": "my.insane.proxy:8080"}}}
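
With the proxy stored on the connection, the function from the question could be reduced to a hook call. The following is a minimal sketch, not a definitive implementation. It assumes the connection id `s3_airflow` seen in the log, an installed Amazon provider package, and that `S3Hook.list_keys` is an acceptable replacement for the hand-written paginator. The `EXTRA` string is a trimmed copy of the value above, checked with `json.loads` because that is how Airflow reads the field.

```python
import json

# Trimmed copy of the Extra value, written as strict JSON (double quotes only).
EXTRA = (
    '{"config_kwargs": {"proxies": {'
    '"http": "my.insane.proxy:8080", "https": "my.insane.proxy:8080"}}}'
)

# Airflow parses Extra with json.loads, so this must not raise.
config_kwargs = json.loads(EXTRA)["config_kwargs"]


def get_files(**context):
    """List keys under a prefix; the proxy comes from the connection, not the DAG."""
    # Imported inside the function so this file loads without Airflow installed.
    from airflow.providers.amazon.aws.hooks.s3 import S3Hook

    # Assumption: a connection named "s3_airflow" exists with the Extra above.
    hook = S3Hook(aws_conn_id="s3_airflow")
    keys = hook.list_keys(bucket_name="some_bucket", prefix="folder_like_prefix/")
    # Older hook versions may return None when nothing matches.
    return list(keys or [])
```

In a DAG, `get_files` would be passed as the `python_callable` of a `PythonOperator`, so no proxy settings remain in the DAG file itself.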

