Airflow: OracleHook: bulk_insert_rows: Task exited with return code Negsignal.SIGKILL

Problem description

I am loading data from S3 into Oracle by looping over the file in chunks of 10,000 rows, but after processing some of the rows the task runs into memory problems.

Error: Task exited with return code Negsignal.SIGKILL

    def gen_chunks(self, reader, chunksize=10000):
        # Buffer rows from the reader and yield them as lists of tuples,
        # at most `chunksize` rows per list.
        chunk = []
        for i, line in enumerate(reader):
            if (i % chunksize == 0 and i > 0):
                yield list(map(tuple, chunk))
                chunk = []
            chunk.append(line)
        yield list(map(tuple, chunk))
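The chunking generator above can be exercised on its own. A minimal standalone sketch, with a plain list of rows standing in for the `csv.reader` (this is an illustration, not the author's exact code):

```python
def gen_chunks(reader, chunksize=10000):
    """Yield lists of tuples, at most `chunksize` rows per list."""
    chunk = []
    for i, line in enumerate(reader):
        if i % chunksize == 0 and i > 0:
            yield list(map(tuple, chunk))
            chunk = []
        chunk.append(line)
    yield list(map(tuple, chunk))  # flush the final partial chunk

# 25 fake two-column rows, chunked 10 at a time -> batches of 10, 10, 5
rows = [[str(n), "x"] for n in range(25)]
chunks = list(gen_chunks(rows, chunksize=10))
print([len(c) for c in chunks])  # → [10, 10, 5]
```

Note that every chunk is fully materialized as a Python list before being handed to `bulk_insert_rows`, so peak memory grows with the chunk size.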

    def execute(self, context):
        self.s3_key = self.get_s3_key(self.s3_key)
        s3_hook = S3Hook()
        s3_client = s3_hook.get_conn()
        snflk_hook = SnowflakeHook(snowflake_conn_id=self.snowflake_conn_id)
        orcl = OracleHook(oracle_conn_id=self.oracle_conn_id)
        try:
            with tempfile.NamedTemporaryFile() as f_source:
                s3_client.download_file(self.s3_bucket, self.s3_key, f_source.name)
                logger.info('Source file downloaded successfully')
                with open(f_source.name, 'r') as f:
                    csv_reader = csv.reader(f, delimiter='|')
                    for chunk in self.gen_chunks(csv_reader):
                        orcl.bulk_insert_rows(table=self.oracle_table, rows=chunk,
                                              target_fields=self.target_fields, commit_every=10000)
        except Exception as ex:
            raise AirflowException(f'Error: {ex}')

Edit / solution: fixed by reducing commit_every to 3000.

Tags: python-3.x, airflow

Solution
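Per the asker's edit, the fix was to lower `commit_every` (and, correspondingly, the chunk size) from 10000 to 3000, so fewer rows are buffered in the worker's memory at once. A minimal runnable sketch of that loop, assuming a `fake_bulk_insert_rows` stand-in for `OracleHook.bulk_insert_rows` (the stand-in is hypothetical, for illustration only):

```python
import csv
import io

def gen_chunks(reader, chunksize=3000):
    """Same chunking logic as the question, with the reduced chunk size."""
    chunk = []
    for i, line in enumerate(reader):
        if i % chunksize == 0 and i > 0:
            yield list(map(tuple, chunk))
            chunk = []
        chunk.append(line)
    yield list(map(tuple, chunk))

# Stand-in for orcl.bulk_insert_rows: just record the batch sizes.
inserted_batches = []
def fake_bulk_insert_rows(rows, commit_every):
    inserted_batches.append(len(rows))

# 7000 pipe-delimited rows, as in the question's CSV format.
data = "\n".join(f"{n}|val{n}" for n in range(7000))
reader = csv.reader(io.StringIO(data), delimiter="|")
for chunk in gen_chunks(reader, chunksize=3000):
    fake_bulk_insert_rows(chunk, commit_every=3000)

print(inserted_batches)  # → [3000, 3000, 1000]
```

Keeping `chunksize` and `commit_every` at the same value means each yielded chunk is committed as a whole, which bounds how many rows the process holds at any point.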
