首页 > 解决方案 > 使用 Pandas 和 SQLAlchemy 将数据加载到 SQL Server 时出现问题

问题描述

我正在尝试使用 sqlalchemy 将数据加载到 SQL Server 数据库中,但出现如下错误

要执行的第一个参数必须是字符串或 unicode 查询。

虽然我已经完成了 index=False。我有正确格式的数据,并明确提到了它们各自的数据类型。任何帮助将不胜感激。

下面是代码:

df.to_sql("df",
               engine,
               if_exists='append',
               schema='dbo',
               index=False,
               chunksize=500,
               dtype={"Address": Text,
                      "City": Text,
                      "Brand": Text,
                      "Lat":  Float,
                      "Long": Float,
                      "Name": Text,
                      "Phone": Text,
                      "Services": Text,
                      "Zip": Text,
                      "Account_ID": Text,
                      "Upload_Dt": DateTime,
                      "Country": Text})

以下是我得到的错误:

Empty                                     Traceback (most recent call last)
~\AppData\Local\Continuum\anaconda3\lib\site-packages\sqlalchemy\pool.py in _do_get(self)
    725                         self._overflow >= self._max_overflow
--> 726             return self._pool.get(wait, self._timeout)
    727         except sqla_queue.SAAbort as aborted:

~\AppData\Local\Continuum\anaconda3\lib\site-packages\sqlalchemy\util\queue.py in get(self, block, timeout)
    153                 if self._empty():
--> 154                     raise Empty
    155             elif timeout is None:

Empty: 

During handling of the above exception, another exception occurred:

TypeError                                 Traceback (most recent call last)
<ipython-input-51-f1810c7ff198> in <module>
     21                       "Account_ID": String,
     22                       "Upload_Dt": DateTime,
---> 23                       "Country": String})
     24 
     25 '''cursor=conn.cursor()

~\AppData\Local\Continuum\anaconda3\lib\site-packages\pandas\core\generic.py in to_sql(self, name, con, schema, if_exists, index, index_label, chunksize, dtype, method)
   2710             chunksize=chunksize,
   2711             dtype=dtype,
-> 2712             method=method,
   2713         )
   2714 

~\AppData\Local\Continuum\anaconda3\lib\site-packages\pandas\io\sql.py in to_sql(frame, name, con, schema, if_exists, index, index_label, chunksize, dtype, method)
    516         chunksize=chunksize,
    517         dtype=dtype,
--> 518         method=method,
    519     )
    520 

~\AppData\Local\Continuum\anaconda3\lib\site-packages\pandas\io\sql.py in to_sql(self, frame, name, if_exists, index, index_label, schema, chunksize, dtype, method)
   1317             dtype=dtype,
   1318         )
-> 1319         table.create()
   1320         table.insert(chunksize, method=method)
   1321         if not name.isdigit() and not name.islower():

~\AppData\Local\Continuum\anaconda3\lib\site-packages\pandas\io\sql.py in create(self)
    639 
    640     def create(self):
--> 641         if self.exists():
    642             if self.if_exists == "fail":
    643                 raise ValueError(

~\AppData\Local\Continuum\anaconda3\lib\site-packages\pandas\io\sql.py in exists(self)
    626 
    627     def exists(self):
--> 628         return self.pd_sql.has_table(self.name, self.schema)
    629 
    630     def sql_schema(self):

~\AppData\Local\Continuum\anaconda3\lib\site-packages\pandas\io\sql.py in has_table(self, name, schema)
   1342     def has_table(self, name, schema=None):
   1343         return self.connectable.run_callable(
-> 1344             self.connectable.dialect.has_table, name, schema or self.meta.schema
   1345         )
   1346 

~\AppData\Local\Continuum\anaconda3\lib\site-packages\sqlalchemy\engine\base.py in run_callable(self, callable_, *args, **kwargs)
   2423 
   2424         """
-> 2425         conn = self.contextual_connect()
   2426         try:
   2427             return conn.run_callable(callable_, *args, **kwargs)

~\AppData\Local\Continuum\anaconda3\lib\site-packages\sqlalchemy\engine\base.py in contextual_connect(self, close_with_result, **kwargs)
   2489 
   2490         return self._connection_cls(self,
-> 2491                                     self.pool.connect(),
   2492                                     close_with_result=close_with_result,
   2493                                     **kwargs)

~\AppData\Local\Continuum\anaconda3\lib\site-packages\sqlalchemy\pool.py in connect(self)
    234         """
    235         if not self._use_threadlocal:
--> 236             return _ConnectionFairy(self).checkout()
    237 
    238         try:

~\AppData\Local\Continuum\anaconda3\lib\site-packages\sqlalchemy\pool.py in __init__(self, pool)
    399         self._echo = _echo = pool._should_log_debug()
    400         try:
--> 401             rec = self._connection_record = pool._do_get()
    402             conn = self.connection = self._connection_record.get_connection()
    403             rec.fairy = weakref.ref(

~\AppData\Local\Continuum\anaconda3\lib\site-packages\sqlalchemy\pool.py in _do_get(self)
    744                     return self._do_get()
    745                 else:
--> 746                     con = self._create_connection()
    747                     self._overflow += 1
    748                     return con

~\AppData\Local\Continuum\anaconda3\lib\site-packages\sqlalchemy\pool.py in _create_connection(self)
    187         """Called by subclasses to create a new ConnectionRecord."""
    188 
--> 189         return _ConnectionRecord(self)
    190 
    191     def recreate(self):

~\AppData\Local\Continuum\anaconda3\lib\site-packages\sqlalchemy\pool.py in __init__(self, pool)
    285         pool.dispatch.first_connect.\
    286                     for_modify(pool.dispatch).\
--> 287                     exec_once(self.connection, self)
    288         pool.dispatch.connect(self.connection, self)
    289 

~\AppData\Local\Continuum\anaconda3\lib\site-packages\sqlalchemy\event.py in exec_once(self, *args, **kw)
    375 
    376         if not self._exec_once:
--> 377             self(*args, **kw)
    378             self._exec_once = True
    379 

~\AppData\Local\Continuum\anaconda3\lib\site-packages\sqlalchemy\event.py in __call__(self, *args, **kw)
    384             fn(*args, **kw)
    385         for fn in self.listeners:
--> 386             fn(*args, **kw)
    387 
    388     # I'm not entirely thrilled about the overhead here,

~\AppData\Local\Continuum\anaconda3\lib\site-packages\sqlalchemy\engine\strategies.py in first_connect(dbapi_connection, connection_record)
    166                 c._has_events = False
    167 
--> 168                 dialect.initialize(c)
    169             event.listen(pool, 'first_connect', first_connect)
    170 

~\AppData\Local\Continuum\anaconda3\lib\site-packages\sqlalchemy\connectors\pyodbc.py in initialize(self, connection)
    133 
    134         # run other initialization which asks for user name, etc.
--> 135         super(PyODBCConnector, self).initialize(connection)
    136 
    137     def _dbapi_version(self):

~\AppData\Local\Continuum\anaconda3\lib\site-packages\sqlalchemy\dialects\mssql\base.py in initialize(self, connection)
   1162 
   1163     def initialize(self, connection):
-> 1164         super(MSDialect, self).initialize(connection)
   1165         if self.server_version_info[0] not in list(range(8, 17)):
   1166             # FreeTDS with version 4.2 seems to report here

~\AppData\Local\Continuum\anaconda3\lib\site-packages\sqlalchemy\engine\default.py in initialize(self, connection)
    175         try:
    176             self.default_schema_name = \
--> 177                             self._get_default_schema_name(connection)
    178         except NotImplementedError:
    179             self.default_schema_name = None

~\AppData\Local\Continuum\anaconda3\lib\site-packages\sqlalchemy\dialects\mssql\base.py in _get_default_schema_name(self, connection)
   1178 
   1179     def _get_default_schema_name(self, connection):
-> 1180         user_name = connection.scalar("SELECT user_name() as user_name;")
   1181         if user_name is not None:
   1182             # now, get the default schema

~\AppData\Local\Continuum\anaconda3\lib\site-packages\sqlalchemy\engine\base.py in scalar(self, object, *multiparams, **params)
   1381         """
   1382 
-> 1383         return self.execute(object, *multiparams, **params).scalar()
   1384 
   1385     def execute(self, object, *multiparams, **params):

~\AppData\Local\Continuum\anaconda3\lib\site-packages\sqlalchemy\engine\base.py in execute(self, object, *multiparams, **params)
   1447                                                 object,
   1448                                                 multiparams,
-> 1449                                                 params)
   1450         else:
   1451             raise exc.InvalidRequestError(

~\AppData\Local\Continuum\anaconda3\lib\site-packages\sqlalchemy\engine\base.py in _execute_text(self, statement, multiparams, params)
   1626             statement,
   1627             parameters,
-> 1628             statement, parameters
   1629         )
   1630         if self._has_events:

~\AppData\Local\Continuum\anaconda3\lib\site-packages\sqlalchemy\engine\base.py in _execute_context(self, dialect, constructor, statement, parameters, *args)
   1689                                     statement,
   1690                                     parameters,
-> 1691                                     context)
   1692         except Exception as e:
   1693             self._handle_dbapi_exception(

~\AppData\Local\Continuum\anaconda3\lib\site-packages\sqlalchemy\engine\default.py in do_execute(self, cursor, statement, parameters, context)
    331 
    332     def do_execute(self, cursor, statement, parameters, context=None):
--> 333         cursor.execute(statement, parameters)
    334 
    335     def do_execute_no_params(self, cursor, statement, context=None):

TypeError: The first argument to execute must be a string or unicode query.

标签: pythonsql-serversqlalchemy

解决方案


在提交后创建一个引擎插入并确保您的 df 数据类型和表数据类型匹配,否则将其设为 MAX

engine = create_engine('mssql+pyodbc://username:password@localhost/database_to_commit?driver=SQL+Server+Native+Client+11.0') 

conn = pyodbc.connect('Driver={SQL Server}; Server=localhost;uid=username; pwd=password; Database = database_name; Trusted_Connection=No;')


df.to_sql(name='table_name',con=engine, index= False, schema='dbo', if_exists= 'append')
    conn.commit()


sql = "Your SQL insert query"
 with engine.begin() as conne:
      conne.execute(sql)
      conn.commit()

推荐阅读