python-3.x - 气流:OracleHook:bulk_insert_rows:任务退出并返回代码 Negsignal.SIGKILL
问题描述
我通过循环 10000 行的块大小将数据从 S3 加载到 Oracle 中,但在循环某些行后会导致内存问题
错误:Task exited with return code Negsignal.SIGKILL
def gen_chunks(self,reader, chunksize=10000):
chunk = []
for i, line in enumerate(reader):
if (i % chunksize == 0 and i > 0):
yield list(map(tuple, chunk))
chunk = []
chunk.append(line)
yield list(map(tuple, chunk))
def execute(self, context):
self.s3_key = self.get_s3_key(self.s3_key)
s3_hook = S3Hook()
s3_client = s3_hook.get_conn()
snflk_hook = SnowflakeHook(snowflake_conn_id=self.snowflake_conn_id)
orcl = OracleHook(oracle_conn_id=self.oracle_conn_id)
try:
with tempfile.NamedTemporaryFile() as f_source:
s3_client.download_file(self.s3_bucket, self.s3_key, f_source.name)
logger.info('Source file downloaded succesfully')
with open(f_source.name, 'r') as f:
csv_reader = csv.reader(f, delimiter='|')
for chunk in self.gen_chunks(csv_reader):
orcl.bulk_insert_rows(table=self.oracle_table,rows=chunk, target_fields=self.target_fields, commit_every=10000)
except Exception as ex:
raise AirflowException(f'Error: {ex}')
编辑解决方案:通过减少
commit_very = 3000
解决方案
推荐阅读
- ocaml - OCaml 物理和结构平等
- python - Google Colab:本地 PC 上没有这样的文件或目录
- prism - 棱镜使用发现的服务
- python - 查找与特定字符匹配的行
- firebase - Firebase 限制我将我的数据库永久设置为一个区域,我如何以低延迟为来自其他区域的人提供服务?
- python - 我需要一种更好的方法来为这个 while 循环实现条件
- javascript - Firebase 过滤结果
- kubernetes - 将 ACR 集成到 AKS 的预期方式是什么?
- c++ - 为什么这个 char 数组需要是静态的?
- java - 如何为 Bukkit 插件从 10 倒计时?