python - Python 多处理并行插入到 Oracle SQL
问题描述
我目前正在尝试在 Oracle SQL 数据库中创建一个表并插入值。我设法让它工作,df.to_sql(name=table_name, con=conn, if_exists='append', index=False)
但上传一个只有 10000 行 * 5 列的 DataFrame 花了 1 小时 30 分钟。
这让我研究了多处理,所以我尝试按照Siddhi Kiran Bajracharya 在这个线程中给出的答案
结果是这样的:
import pandas as pd
from sqlalchemy import create_engine
import config
LOCATION = r"C:\Oracle\instantclient_19_6"
os.environ["PATH"] = LOCATION + ";" + os.environ["PATH"]
conn = create_engine('oracle+cx_oracle://' + config.user + ':' + config.pw +
'@' + config.host + ':' + config.port + '/?service_name=' + config.db +'?charset=latin-1')
import math
from multiprocessing.dummy import Pool as ThreadPool
def insert_df(df, *args, **kwargs):
nworkers = 4 # number of workers that executes insert in parallel fashion
chunk = math.floor(df.shape[0] / nworkers) # number of chunks
chunks = [(chunk * i, (chunk * i) + chunk) for i in range(nworkers)]
chunks.append((chunk * nworkers, df.shape[0]))
pool = ThreadPool(nworkers)
def worker(chunk):
i, j = chunk
df.iloc[i:j, :].to_sql(*args, **kwargs)
pool.map(worker, chunks)
pool.close()
pool.join()
insert_df(df, f'{table_name}', conn, if_exists='append', index=False)
问题是最后这段代码运行了 20 分钟,只在表中插入了 9 行,然后引发了以下错误DatabaseError: (cx_Oracle.DatabaseError) ORA-00955: name is already used by an existing object
完整追溯:
---------------------------------------------------------------------------
DatabaseError Traceback (most recent call last)
C:\ProgramData\Anaconda3\lib\site-packages\sqlalchemy\engine\base.py in _execute_context(self, dialect, constructor, statement, parameters, *args)
1248 self.dialect.do_execute(
-> 1249 cursor, statement, parameters, context
1250 )
C:\ProgramData\Anaconda3\lib\site-packages\sqlalchemy\engine\default.py in do_execute(self, cursor, statement, parameters, context)
579 def do_execute(self, cursor, statement, parameters, context=None):
--> 580 cursor.execute(statement, parameters)
581
DatabaseError: ORA-00955: name is already used by an existing object
The above exception was the direct cause of the following exception:
DatabaseError Traceback (most recent call last)
<ipython-input-73-b50275447767> in <module>
20
21
---> 22 insert_df(df, f'{table_name}', conn, if_exists='append', index=False)
<ipython-input-73-b50275447767> in insert_df(df, *args, **kwargs)
14 df.iloc[i:j, :].to_sql(*args, **kwargs)
15
---> 16 pool.map(worker, chunks)
17 pool.close()
18 pool.join()
C:\ProgramData\Anaconda3\lib\multiprocessing\pool.py in map(self, func, iterable, chunksize)
266 in a list that is returned.
267 '''
--> 268 return self._map_async(func, iterable, mapstar, chunksize).get()
269
270 def starmap(self, func, iterable, chunksize=None):
C:\ProgramData\Anaconda3\lib\multiprocessing\pool.py in get(self, timeout)
655 return self._value
656 else:
--> 657 raise self._value
658
659 def _set(self, i, obj):
C:\ProgramData\Anaconda3\lib\multiprocessing\pool.py in worker(inqueue, outqueue, initializer, initargs, maxtasks, wrap_exception)
119 job, i, func, args, kwds = task
120 try:
--> 121 result = (True, func(*args, **kwds))
122 except Exception as e:
123 if wrap_exception and func is not _helper_reraises_exception:
C:\ProgramData\Anaconda3\lib\multiprocessing\pool.py in mapstar(args)
42
43 def mapstar(args):
---> 44 return list(map(*args))
45
46 def starmapstar(args):
<ipython-input-73-b50275447767> in worker(chunk)
12 def worker(chunk):
13 i, j = chunk
---> 14 df.iloc[i:j, :].to_sql(*args, **kwargs)
15
16 pool.map(worker, chunks)
C:\ProgramData\Anaconda3\lib\site-packages\pandas\core\generic.py in to_sql(self, name, con, schema, if_exists, index, index_label, chunksize, dtype, method)
2710 chunksize=chunksize,
2711 dtype=dtype,
-> 2712 method=method,
2713 )
2714
C:\ProgramData\Anaconda3\lib\site-packages\pandas\io\sql.py in to_sql(frame, name, con, schema, if_exists, index, index_label, chunksize, dtype, method)
516 chunksize=chunksize,
517 dtype=dtype,
--> 518 method=method,
519 )
520
C:\ProgramData\Anaconda3\lib\site-packages\pandas\io\sql.py in to_sql(self, frame, name, if_exists, index, index_label, schema, chunksize, dtype, method)
1317 dtype=dtype,
1318 )
-> 1319 table.create()
1320 table.insert(chunksize, method=method)
1321 if not name.isdigit() and not name.islower():
C:\ProgramData\Anaconda3\lib\site-packages\pandas\io\sql.py in create(self)
654 )
655 else:
--> 656 self._execute_create()
657
658 def _execute_insert(self, conn, keys, data_iter):
C:\ProgramData\Anaconda3\lib\site-packages\pandas\io\sql.py in _execute_create(self)
636 # Inserting table into database, add to MetaData object
637 self.table = self.table.tometadata(self.pd_sql.meta)
--> 638 self.table.create()
639
640 def create(self):
C:\ProgramData\Anaconda3\lib\site-packages\sqlalchemy\sql\schema.py in create(self, bind, checkfirst)
868 if bind is None:
869 bind = _bind_or_error(self)
--> 870 bind._run_visitor(ddl.SchemaGenerator, self, checkfirst=checkfirst)
871
872 def drop(self, bind=None, checkfirst=False):
C:\ProgramData\Anaconda3\lib\site-packages\sqlalchemy\engine\base.py in _run_visitor(self, visitorcallable, element, connection, **kwargs)
2044 ):
2045 with self._optional_conn_ctx_manager(connection) as conn:
-> 2046 conn._run_visitor(visitorcallable, element, **kwargs)
2047
2048 class _trans_ctx(object):
C:\ProgramData\Anaconda3\lib\site-packages\sqlalchemy\engine\base.py in _run_visitor(self, visitorcallable, element, **kwargs)
1613
1614 def _run_visitor(self, visitorcallable, element, **kwargs):
-> 1615 visitorcallable(self.dialect, self, **kwargs).traverse_single(element)
1616
1617
C:\ProgramData\Anaconda3\lib\site-packages\sqlalchemy\sql\visitors.py in traverse_single(self, obj, **kw)
136 meth = getattr(v, "visit_%s" % obj.__visit_name__, None)
137 if meth:
--> 138 return meth(obj, **kw)
139
140 def iterate(self, obj):
C:\ProgramData\Anaconda3\lib\site-packages\sqlalchemy\sql\ddl.py in visit_table(self, table, create_ok, include_foreign_key_constraints, _is_metadata_operation)
824 table,
825 include_foreign_key_constraints= # noqa
--> 826 include_foreign_key_constraints,
827 )
828 # fmt: on
C:\ProgramData\Anaconda3\lib\site-packages\sqlalchemy\engine\base.py in execute(self, object_, *multiparams, **params)
986 raise exc.ObjectNotExecutableError(object_)
987 else:
--> 988 return meth(self, multiparams, params)
989
990 def _execute_function(self, func, multiparams, params):
C:\ProgramData\Anaconda3\lib\site-packages\sqlalchemy\sql\ddl.py in _execute_on_connection(self, connection, multiparams, params)
70
71 def _execute_on_connection(self, connection, multiparams, params):
---> 72 return connection._execute_ddl(self, multiparams, params)
73
74 def execute(self, bind=None, target=None):
C:\ProgramData\Anaconda3\lib\site-packages\sqlalchemy\engine\base.py in _execute_ddl(self, ddl, multiparams, params)
1048 compiled,
1049 None,
-> 1050 compiled,
1051 )
1052 if self._has_events or self.engine._has_events:
C:\ProgramData\Anaconda3\lib\site-packages\sqlalchemy\engine\base.py in _execute_context(self, dialect, constructor, statement, parameters, *args)
1251 except BaseException as e:
1252 self._handle_dbapi_exception(
-> 1253 e, statement, parameters, cursor, context
1254 )
1255
C:\ProgramData\Anaconda3\lib\site-packages\sqlalchemy\engine\base.py in _handle_dbapi_exception(self, e, statement, parameters, cursor, context)
1471 util.raise_from_cause(newraise, exc_info)
1472 elif should_wrap:
-> 1473 util.raise_from_cause(sqlalchemy_exception, exc_info)
1474 else:
1475 util.reraise(*exc_info)
C:\ProgramData\Anaconda3\lib\site-packages\sqlalchemy\util\compat.py in raise_from_cause(exception, exc_info)
396 exc_type, exc_value, exc_tb = exc_info
397 cause = exc_value if exc_value is not exception else None
--> 398 reraise(type(exception), exception, tb=exc_tb, cause=cause)
399
400
C:\ProgramData\Anaconda3\lib\site-packages\sqlalchemy\util\compat.py in reraise(tp, value, tb, cause)
150 value.__cause__ = cause
151 if value.__traceback__ is not tb:
--> 152 raise value.with_traceback(tb)
153 raise value
154
C:\ProgramData\Anaconda3\lib\site-packages\sqlalchemy\engine\base.py in _execute_context(self, dialect, constructor, statement, parameters, *args)
1247 if not evt_handled:
1248 self.dialect.do_execute(
-> 1249 cursor, statement, parameters, context
1250 )
1251 except BaseException as e:
C:\ProgramData\Anaconda3\lib\site-packages\sqlalchemy\engine\default.py in do_execute(self, cursor, statement, parameters, context)
578
579 def do_execute(self, cursor, statement, parameters, context=None):
--> 580 cursor.execute(statement, parameters)
581
582 def do_execute_no_params(self, cursor, statement, context=None):
DatabaseError: (cx_Oracle.DatabaseError) ORA-00955: name is already used by an existing object
[SQL:
CREATE TABLE "TEST_TABLE_DELETE" (
"id" CLOB,
"name" CLOB,
"var1" CLOB,
"var2" CLOB,
"var3" CLOB,
"var4" CLOB,
"var5" CLOB,
"var6" CLOB,
"var7" CLOB,
"var8" CLOB,
"var9" CLOB,
"var10" CLOB,
"var11" CLOB,
"var12" FLOAT,
"var13" CLOB,
"var14" CLOB
)
]
(Background on this error at: http://sqlalche.me/e/4xp6)
任何帮助我解决此问题的指针将不胜感激。
谢谢!
卢蒂
解决方案
如果您使用 to_sql,在数据框中使用字符串列,则最好执行以下操作:
dtyp = {c:types.VARCHAR(data[c].str.len().max()) for c in data.columns[data.dtypes == 'object'].tolist()}
data.to_sql('table_name.....',con=...,if_exists='append'
, index=False
, dtype = dtyp)
对于 10k 行,它应该非常快。
推荐阅读
- linux - openembedded-core 和 poky 的区别
- php - 如何使用 ajax 在 codeigniter 中创建实时计数?
- java - 如何在android中的firebase实时聊天中实现交付状态
- ios - 在“开始开发 iOS 应用程序 (Swift)”教程中,从 Xcode 11.0 开始,Hello World 屏幕出现在画布顶部,将其隐藏
- dynamics-crm - 用于 Dynamics 365 学习的沙盒环境
- android - 在另一个线程中访问 ArrayList 数据
- c++ - 如何在 C++ 中修复运行时错误“分段错误”
- python - 无法在 Python 中的类中获取详细信息
- primefaces - 如何过滤primefaces中的动态数据表?它不工作
- react-native - 如何在 react-native-maps 上的每个折线段上绘制箭头