Error when executing the S3Hook list_keys or read_key methods

Problem Description

I'm getting this error message: {logging_mixin.py:112} INFO - [2020-03-22 12:34:53,672] {local_task_job.py:103} INFO - Task exited with return code -6 when I use the list_keys or read_key methods of the S3Hook. The get_credentials method works fine, though. I've searched around and can't find out why this is happening.

I'm using apache-airflow==1.10.9, boto3==1.12.21, and botocore==1.15.21.

Here is the code for my custom operator that uses the S3Hook:

from airflow.hooks.base_hook import BaseHook
from airflow.hooks.S3_hook import S3Hook
from airflow.models import BaseOperator
from airflow.utils.decorators import apply_defaults


class SASValueToRedshiftOperator(BaseOperator):
    """Custom Operator for extracting data from SAS source code.
    Attributes:
        ui_color (str): color code for task in Airflow UI.
    """
    ui_color = '#358150'

    @apply_defaults
    def __init__(self,
                 aws_credentials_id="",
                 redshift_conn_id="",
                 table="",
                 s3_bucket="",
                 s3_key="",
                 sas_value="",
                 columns="",
                 *args, **kwargs):
        """Extracts label mappings from SAS source code and store as Redshift table
        Args:
            aws_credentials_id (str): Airflow connection ID for AWS key and secret.
            redshift_conn_id (str): Airflow connection ID for redshift database.
            table (str): Name of table to load data to.
            s3_bucket (str): S3 bucket name where the SAS source code is stored.
            s3_key (str): S3 Key Name for SAS source code.
            sas_value (str): value to search for in sas file for extraction of data.
            columns (list): resulting data column names.
        Returns:
            None
        """
        super(SASValueToRedshiftOperator, self).__init__(*args, **kwargs)
        self.aws_credentials_id = aws_credentials_id
        self.redshift_conn_id = redshift_conn_id
        self.table = table
        self.s3_bucket = s3_bucket
        self.s3_key = s3_key
        self.sas_value = sas_value
        self.columns = columns

    def execute(self, context):
        """Executes task for staging to redshift.
        Args:
            context (:obj:`dict`): Dict with values to apply on content.
        Returns:
            None   
        """
        s3 = S3Hook(self.aws_credentials_id)
        redshift_conn = BaseHook.get_connection(self.redshift_conn_id)

        self.log.info(s3)
        self.log.info(s3.get_credentials())
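        # This is the call that exits with return code -6; get_credentials above works fine.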
        self.log.info(s3.list_keys(self.s3_bucket))
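
For reference, here is a minimal sketch of how an operator like this might be wired into a DAG. Every name below (the dag_id, connection IDs, bucket, key, sas_value, and columns) is a hypothetical placeholder:

from datetime import datetime

from airflow import DAG

# Hypothetical wiring for the operator defined above; adjust the
# connection IDs and S3 coordinates to your own environment.
with DAG(dag_id='sas_to_redshift_example',
         start_date=datetime(2020, 3, 1),
         schedule_interval=None) as dag:

    extract_sas_labels = SASValueToRedshiftOperator(
        task_id='extract_sas_labels',
        aws_credentials_id='aws_default',
        redshift_conn_id='redshift_default',
        table='sas_labels',
        s3_bucket='my-bucket',
        s3_key='sas/source_code.sas',
        sas_value='value_label',
        columns=['code', 'label'],
    )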

Tags: python, boto3, airflow

Solution


Call list_keys with the bucket name, prefix, and delimiter passed as explicit keyword arguments:

s3 = S3Hook(self.aws_credentials_id)
s3.list_keys(bucket_name=s3_bucket, prefix=s3_path, delimiter=delimiter)
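
Applied to the operator above, the failing call in execute becomes something like the sketch below. The prefix and delimiter values are assumptions for illustration, not part of the original answer:

    def execute(self, context):
        s3 = S3Hook(aws_conn_id=self.aws_credentials_id)
        # Name every argument explicitly; in Airflow 1.10.x the signature is
        # list_keys(bucket_name, prefix='', delimiter='', ...).
        keys = s3.list_keys(bucket_name=self.s3_bucket,
                            prefix=self.s3_key,  # assumption: restrict the listing to the SAS key's prefix
                            delimiter='/')       # assumption: use '/' as the delimiter
        self.log.info(keys)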

