首页 > 解决方案 > Python 多处理并行插入到 Oracle SQL

问题描述

我目前正在尝试在 Oracle SQL 数据库中创建一个表并插入值。我设法让它工作,df.to_sql(name=table_name, con=conn, if_exists='append', index=False)但上传一个只有 10000 行 * 5 列的 DataFrame 花了 1 小时 30 分钟。

这让我研究了多处理,所以我尝试按照Siddhi Kiran Bajracharya 在这个线程中给出的答案

结果是这样的:

import pandas as pd
from sqlalchemy import create_engine
import config

LOCATION = r"C:\Oracle\instantclient_19_6"
os.environ["PATH"] = LOCATION + ";" + os.environ["PATH"]
conn = create_engine('oracle+cx_oracle://' + config.user + ':' + config.pw +
                     '@' + config.host + ':' + config.port + '/?service_name=' + config.db +'?charset=latin-1')

import math
from multiprocessing.dummy import Pool as ThreadPool

def insert_df(df, *args, **kwargs):
    nworkers = 4 # number of workers that executes insert in parallel fashion

    chunk = math.floor(df.shape[0] / nworkers) # number of chunks
    chunks = [(chunk * i, (chunk * i) + chunk) for i in range(nworkers)]
    chunks.append((chunk * nworkers, df.shape[0]))
    pool = ThreadPool(nworkers)

    def worker(chunk):
        i, j = chunk
        df.iloc[i:j, :].to_sql(*args, **kwargs)

    pool.map(worker, chunks)
    pool.close()
    pool.join()

insert_df(df, f'{table_name}', conn, if_exists='append', index=False)

问题是最后这段代码运行了 20 分钟,只在表中插入了 9 行,然后引发了以下错误DatabaseError: (cx_Oracle.DatabaseError) ORA-00955: name is already used by an existing object

完整追溯:

---------------------------------------------------------------------------
DatabaseError                             Traceback (most recent call last)
C:\ProgramData\Anaconda3\lib\site-packages\sqlalchemy\engine\base.py in _execute_context(self, dialect, constructor, statement, parameters, *args)
   1248                     self.dialect.do_execute(
-> 1249                         cursor, statement, parameters, context
   1250                     )

C:\ProgramData\Anaconda3\lib\site-packages\sqlalchemy\engine\default.py in do_execute(self, cursor, statement, parameters, context)
    579     def do_execute(self, cursor, statement, parameters, context=None):
--> 580         cursor.execute(statement, parameters)
    581 

DatabaseError: ORA-00955: name is already used by an existing object

The above exception was the direct cause of the following exception:

DatabaseError                             Traceback (most recent call last)
<ipython-input-73-b50275447767> in <module>
     20 
     21 
---> 22 insert_df(df, f'{table_name}', conn, if_exists='append', index=False)

<ipython-input-73-b50275447767> in insert_df(df, *args, **kwargs)
     14         df.iloc[i:j, :].to_sql(*args, **kwargs)
     15 
---> 16     pool.map(worker, chunks)
     17     pool.close()
     18     pool.join()

C:\ProgramData\Anaconda3\lib\multiprocessing\pool.py in map(self, func, iterable, chunksize)
    266         in a list that is returned.
    267         '''
--> 268         return self._map_async(func, iterable, mapstar, chunksize).get()
    269 
    270     def starmap(self, func, iterable, chunksize=None):

C:\ProgramData\Anaconda3\lib\multiprocessing\pool.py in get(self, timeout)
    655             return self._value
    656         else:
--> 657             raise self._value
    658 
    659     def _set(self, i, obj):

C:\ProgramData\Anaconda3\lib\multiprocessing\pool.py in worker(inqueue, outqueue, initializer, initargs, maxtasks, wrap_exception)
    119         job, i, func, args, kwds = task
    120         try:
--> 121             result = (True, func(*args, **kwds))
    122         except Exception as e:
    123             if wrap_exception and func is not _helper_reraises_exception:

C:\ProgramData\Anaconda3\lib\multiprocessing\pool.py in mapstar(args)
     42 
     43 def mapstar(args):
---> 44     return list(map(*args))
     45 
     46 def starmapstar(args):

<ipython-input-73-b50275447767> in worker(chunk)
     12     def worker(chunk):
     13         i, j = chunk
---> 14         df.iloc[i:j, :].to_sql(*args, **kwargs)
     15 
     16     pool.map(worker, chunks)

C:\ProgramData\Anaconda3\lib\site-packages\pandas\core\generic.py in to_sql(self, name, con, schema, if_exists, index, index_label, chunksize, dtype, method)
   2710             chunksize=chunksize,
   2711             dtype=dtype,
-> 2712             method=method,
   2713         )
   2714 

C:\ProgramData\Anaconda3\lib\site-packages\pandas\io\sql.py in to_sql(frame, name, con, schema, if_exists, index, index_label, chunksize, dtype, method)
    516         chunksize=chunksize,
    517         dtype=dtype,
--> 518         method=method,
    519     )
    520 

C:\ProgramData\Anaconda3\lib\site-packages\pandas\io\sql.py in to_sql(self, frame, name, if_exists, index, index_label, schema, chunksize, dtype, method)
   1317             dtype=dtype,
   1318         )
-> 1319         table.create()
   1320         table.insert(chunksize, method=method)
   1321         if not name.isdigit() and not name.islower():

C:\ProgramData\Anaconda3\lib\site-packages\pandas\io\sql.py in create(self)
    654                 )
    655         else:
--> 656             self._execute_create()
    657 
    658     def _execute_insert(self, conn, keys, data_iter):

C:\ProgramData\Anaconda3\lib\site-packages\pandas\io\sql.py in _execute_create(self)
    636         # Inserting table into database, add to MetaData object
    637         self.table = self.table.tometadata(self.pd_sql.meta)
--> 638         self.table.create()
    639 
    640     def create(self):

C:\ProgramData\Anaconda3\lib\site-packages\sqlalchemy\sql\schema.py in create(self, bind, checkfirst)
    868         if bind is None:
    869             bind = _bind_or_error(self)
--> 870         bind._run_visitor(ddl.SchemaGenerator, self, checkfirst=checkfirst)
    871 
    872     def drop(self, bind=None, checkfirst=False):

C:\ProgramData\Anaconda3\lib\site-packages\sqlalchemy\engine\base.py in _run_visitor(self, visitorcallable, element, connection, **kwargs)
   2044     ):
   2045         with self._optional_conn_ctx_manager(connection) as conn:
-> 2046             conn._run_visitor(visitorcallable, element, **kwargs)
   2047 
   2048     class _trans_ctx(object):

C:\ProgramData\Anaconda3\lib\site-packages\sqlalchemy\engine\base.py in _run_visitor(self, visitorcallable, element, **kwargs)
   1613 
   1614     def _run_visitor(self, visitorcallable, element, **kwargs):
-> 1615         visitorcallable(self.dialect, self, **kwargs).traverse_single(element)
   1616 
   1617 

C:\ProgramData\Anaconda3\lib\site-packages\sqlalchemy\sql\visitors.py in traverse_single(self, obj, **kw)
    136             meth = getattr(v, "visit_%s" % obj.__visit_name__, None)
    137             if meth:
--> 138                 return meth(obj, **kw)
    139 
    140     def iterate(self, obj):

C:\ProgramData\Anaconda3\lib\site-packages\sqlalchemy\sql\ddl.py in visit_table(self, table, create_ok, include_foreign_key_constraints, _is_metadata_operation)
    824                 table,
    825                 include_foreign_key_constraints=  # noqa
--> 826                     include_foreign_key_constraints,
    827             )
    828             # fmt: on

C:\ProgramData\Anaconda3\lib\site-packages\sqlalchemy\engine\base.py in execute(self, object_, *multiparams, **params)
    986             raise exc.ObjectNotExecutableError(object_)
    987         else:
--> 988             return meth(self, multiparams, params)
    989 
    990     def _execute_function(self, func, multiparams, params):

C:\ProgramData\Anaconda3\lib\site-packages\sqlalchemy\sql\ddl.py in _execute_on_connection(self, connection, multiparams, params)
     70 
     71     def _execute_on_connection(self, connection, multiparams, params):
---> 72         return connection._execute_ddl(self, multiparams, params)
     73 
     74     def execute(self, bind=None, target=None):

C:\ProgramData\Anaconda3\lib\site-packages\sqlalchemy\engine\base.py in _execute_ddl(self, ddl, multiparams, params)
   1048             compiled,
   1049             None,
-> 1050             compiled,
   1051         )
   1052         if self._has_events or self.engine._has_events:

C:\ProgramData\Anaconda3\lib\site-packages\sqlalchemy\engine\base.py in _execute_context(self, dialect, constructor, statement, parameters, *args)
   1251         except BaseException as e:
   1252             self._handle_dbapi_exception(
-> 1253                 e, statement, parameters, cursor, context
   1254             )
   1255 

C:\ProgramData\Anaconda3\lib\site-packages\sqlalchemy\engine\base.py in _handle_dbapi_exception(self, e, statement, parameters, cursor, context)
   1471                 util.raise_from_cause(newraise, exc_info)
   1472             elif should_wrap:
-> 1473                 util.raise_from_cause(sqlalchemy_exception, exc_info)
   1474             else:
   1475                 util.reraise(*exc_info)

C:\ProgramData\Anaconda3\lib\site-packages\sqlalchemy\util\compat.py in raise_from_cause(exception, exc_info)
    396     exc_type, exc_value, exc_tb = exc_info
    397     cause = exc_value if exc_value is not exception else None
--> 398     reraise(type(exception), exception, tb=exc_tb, cause=cause)
    399 
    400 

C:\ProgramData\Anaconda3\lib\site-packages\sqlalchemy\util\compat.py in reraise(tp, value, tb, cause)
    150             value.__cause__ = cause
    151         if value.__traceback__ is not tb:
--> 152             raise value.with_traceback(tb)
    153         raise value
    154 

C:\ProgramData\Anaconda3\lib\site-packages\sqlalchemy\engine\base.py in _execute_context(self, dialect, constructor, statement, parameters, *args)
   1247                 if not evt_handled:
   1248                     self.dialect.do_execute(
-> 1249                         cursor, statement, parameters, context
   1250                     )
   1251         except BaseException as e:

C:\ProgramData\Anaconda3\lib\site-packages\sqlalchemy\engine\default.py in do_execute(self, cursor, statement, parameters, context)
    578 
    579     def do_execute(self, cursor, statement, parameters, context=None):
--> 580         cursor.execute(statement, parameters)
    581 
    582     def do_execute_no_params(self, cursor, statement, context=None):

DatabaseError: (cx_Oracle.DatabaseError) ORA-00955: name is already used by an existing object
[SQL: 
CREATE TABLE "TEST_TABLE_DELETE" (
    "id" CLOB, 
    "name" CLOB, 
    "var1" CLOB, 
    "var2" CLOB, 
    "var3" CLOB, 
    "var4" CLOB, 
    "var5" CLOB, 
    "var6" CLOB, 
    "var7" CLOB, 
    "var8" CLOB, 
    "var9" CLOB, 
    "var10" CLOB, 
    "var11" CLOB, 
    "var12" FLOAT, 
    "var13" CLOB, 
    "var14" CLOB
)

]
(Background on this error at: http://sqlalche.me/e/4xp6)

任何帮助我解决此问题的指针将不胜感激。

谢谢!

卢蒂

标签: pythonpython-3.xpandasoraclesqlalchemy

解决方案


如果您使用 to_sql,在数据框中使用字符串列,则最好执行以下操作:

dtyp = {c:types.VARCHAR(data[c].str.len().max()) for c in data.columns[data.dtypes == 'object'].tolist()}
data.to_sql('table_name.....',con=...,if_exists='append'
              , index=False
              , dtype = dtyp)

对于 10k 行,它应该非常快。


推荐阅读