python - 使用 Pandas 和 SQLAlchemy 将数据加载到 SQL Server 时出现问题
问题描述
我正在尝试使用 sqlalchemy 将数据加载到 SQL Server 数据库中,但出现如下错误
execute 的第一个参数必须是字符串或 unicode 查询(The first argument to execute must be a string or unicode query)。
我已经设置了 index=False,数据格式也是正确的,并且显式指定了各列对应的数据类型,但仍然报错。任何帮助将不胜感激。
下面是代码:
# Append the DataFrame to the table "df" in schema "dbo", writing 500 rows
# per batch and without emitting the DataFrame index as a column.
df.to_sql(
    name="df",
    con=engine,
    if_exists='append',
    schema='dbo',
    index=False,
    chunksize=500,
    # Explicit SQL column type for every DataFrame column.
    dtype={
        "Address": Text,
        "City": Text,
        "Brand": Text,
        "Lat": Float,
        "Long": Float,
        "Name": Text,
        "Phone": Text,
        "Services": Text,
        "Zip": Text,
        "Account_ID": Text,
        "Upload_Dt": DateTime,
        "Country": Text,
    },
)
以下是我得到的错误:
Empty Traceback (most recent call last)
~\AppData\Local\Continuum\anaconda3\lib\site-packages\sqlalchemy\pool.py in _do_get(self)
725 self._overflow >= self._max_overflow
--> 726 return self._pool.get(wait, self._timeout)
727 except sqla_queue.SAAbort as aborted:
~\AppData\Local\Continuum\anaconda3\lib\site-packages\sqlalchemy\util\queue.py in get(self, block, timeout)
153 if self._empty():
--> 154 raise Empty
155 elif timeout is None:
Empty:
During handling of the above exception, another exception occurred:
TypeError Traceback (most recent call last)
<ipython-input-51-f1810c7ff198> in <module>
21 "Account_ID": String,
22 "Upload_Dt": DateTime,
---> 23 "Country": String})
24
25 '''cursor=conn.cursor()
~\AppData\Local\Continuum\anaconda3\lib\site-packages\pandas\core\generic.py in to_sql(self, name, con, schema, if_exists, index, index_label, chunksize, dtype, method)
2710 chunksize=chunksize,
2711 dtype=dtype,
-> 2712 method=method,
2713 )
2714
~\AppData\Local\Continuum\anaconda3\lib\site-packages\pandas\io\sql.py in to_sql(frame, name, con, schema, if_exists, index, index_label, chunksize, dtype, method)
516 chunksize=chunksize,
517 dtype=dtype,
--> 518 method=method,
519 )
520
~\AppData\Local\Continuum\anaconda3\lib\site-packages\pandas\io\sql.py in to_sql(self, frame, name, if_exists, index, index_label, schema, chunksize, dtype, method)
1317 dtype=dtype,
1318 )
-> 1319 table.create()
1320 table.insert(chunksize, method=method)
1321 if not name.isdigit() and not name.islower():
~\AppData\Local\Continuum\anaconda3\lib\site-packages\pandas\io\sql.py in create(self)
639
640 def create(self):
--> 641 if self.exists():
642 if self.if_exists == "fail":
643 raise ValueError(
~\AppData\Local\Continuum\anaconda3\lib\site-packages\pandas\io\sql.py in exists(self)
626
627 def exists(self):
--> 628 return self.pd_sql.has_table(self.name, self.schema)
629
630 def sql_schema(self):
~\AppData\Local\Continuum\anaconda3\lib\site-packages\pandas\io\sql.py in has_table(self, name, schema)
1342 def has_table(self, name, schema=None):
1343 return self.connectable.run_callable(
-> 1344 self.connectable.dialect.has_table, name, schema or self.meta.schema
1345 )
1346
~\AppData\Local\Continuum\anaconda3\lib\site-packages\sqlalchemy\engine\base.py in run_callable(self, callable_, *args, **kwargs)
2423
2424 """
-> 2425 conn = self.contextual_connect()
2426 try:
2427 return conn.run_callable(callable_, *args, **kwargs)
~\AppData\Local\Continuum\anaconda3\lib\site-packages\sqlalchemy\engine\base.py in contextual_connect(self, close_with_result, **kwargs)
2489
2490 return self._connection_cls(self,
-> 2491 self.pool.connect(),
2492 close_with_result=close_with_result,
2493 **kwargs)
~\AppData\Local\Continuum\anaconda3\lib\site-packages\sqlalchemy\pool.py in connect(self)
234 """
235 if not self._use_threadlocal:
--> 236 return _ConnectionFairy(self).checkout()
237
238 try:
~\AppData\Local\Continuum\anaconda3\lib\site-packages\sqlalchemy\pool.py in __init__(self, pool)
399 self._echo = _echo = pool._should_log_debug()
400 try:
--> 401 rec = self._connection_record = pool._do_get()
402 conn = self.connection = self._connection_record.get_connection()
403 rec.fairy = weakref.ref(
~\AppData\Local\Continuum\anaconda3\lib\site-packages\sqlalchemy\pool.py in _do_get(self)
744 return self._do_get()
745 else:
--> 746 con = self._create_connection()
747 self._overflow += 1
748 return con
~\AppData\Local\Continuum\anaconda3\lib\site-packages\sqlalchemy\pool.py in _create_connection(self)
187 """Called by subclasses to create a new ConnectionRecord."""
188
--> 189 return _ConnectionRecord(self)
190
191 def recreate(self):
~\AppData\Local\Continuum\anaconda3\lib\site-packages\sqlalchemy\pool.py in __init__(self, pool)
285 pool.dispatch.first_connect.\
286 for_modify(pool.dispatch).\
--> 287 exec_once(self.connection, self)
288 pool.dispatch.connect(self.connection, self)
289
~\AppData\Local\Continuum\anaconda3\lib\site-packages\sqlalchemy\event.py in exec_once(self, *args, **kw)
375
376 if not self._exec_once:
--> 377 self(*args, **kw)
378 self._exec_once = True
379
~\AppData\Local\Continuum\anaconda3\lib\site-packages\sqlalchemy\event.py in __call__(self, *args, **kw)
384 fn(*args, **kw)
385 for fn in self.listeners:
--> 386 fn(*args, **kw)
387
388 # I'm not entirely thrilled about the overhead here,
~\AppData\Local\Continuum\anaconda3\lib\site-packages\sqlalchemy\engine\strategies.py in first_connect(dbapi_connection, connection_record)
166 c._has_events = False
167
--> 168 dialect.initialize(c)
169 event.listen(pool, 'first_connect', first_connect)
170
~\AppData\Local\Continuum\anaconda3\lib\site-packages\sqlalchemy\connectors\pyodbc.py in initialize(self, connection)
133
134 # run other initialization which asks for user name, etc.
--> 135 super(PyODBCConnector, self).initialize(connection)
136
137 def _dbapi_version(self):
~\AppData\Local\Continuum\anaconda3\lib\site-packages\sqlalchemy\dialects\mssql\base.py in initialize(self, connection)
1162
1163 def initialize(self, connection):
-> 1164 super(MSDialect, self).initialize(connection)
1165 if self.server_version_info[0] not in list(range(8, 17)):
1166 # FreeTDS with version 4.2 seems to report here
~\AppData\Local\Continuum\anaconda3\lib\site-packages\sqlalchemy\engine\default.py in initialize(self, connection)
175 try:
176 self.default_schema_name = \
--> 177 self._get_default_schema_name(connection)
178 except NotImplementedError:
179 self.default_schema_name = None
~\AppData\Local\Continuum\anaconda3\lib\site-packages\sqlalchemy\dialects\mssql\base.py in _get_default_schema_name(self, connection)
1178
1179 def _get_default_schema_name(self, connection):
-> 1180 user_name = connection.scalar("SELECT user_name() as user_name;")
1181 if user_name is not None:
1182 # now, get the default schema
~\AppData\Local\Continuum\anaconda3\lib\site-packages\sqlalchemy\engine\base.py in scalar(self, object, *multiparams, **params)
1381 """
1382
-> 1383 return self.execute(object, *multiparams, **params).scalar()
1384
1385 def execute(self, object, *multiparams, **params):
~\AppData\Local\Continuum\anaconda3\lib\site-packages\sqlalchemy\engine\base.py in execute(self, object, *multiparams, **params)
1447 object,
1448 multiparams,
-> 1449 params)
1450 else:
1451 raise exc.InvalidRequestError(
~\AppData\Local\Continuum\anaconda3\lib\site-packages\sqlalchemy\engine\base.py in _execute_text(self, statement, multiparams, params)
1626 statement,
1627 parameters,
-> 1628 statement, parameters
1629 )
1630 if self._has_events:
~\AppData\Local\Continuum\anaconda3\lib\site-packages\sqlalchemy\engine\base.py in _execute_context(self, dialect, constructor, statement, parameters, *args)
1689 statement,
1690 parameters,
-> 1691 context)
1692 except Exception as e:
1693 self._handle_dbapi_exception(
~\AppData\Local\Continuum\anaconda3\lib\site-packages\sqlalchemy\engine\default.py in do_execute(self, cursor, statement, parameters, context)
331
332 def do_execute(self, cursor, statement, parameters, context=None):
--> 333 cursor.execute(statement, parameters)
334
335 def do_execute_no_params(self, cursor, statement, context=None):
TypeError: The first argument to execute must be a string or unicode query.
解决方案
先创建引擎(engine),再执行插入并提交;同时确保 df 各列的数据类型与目标表对应列的数据类型一致,否则请将表中该列的类型设为 MAX(如 VARCHAR(MAX))。
# SQLAlchemy engine that pandas uses to write the DataFrame.
engine = create_engine('mssql+pyodbc://username:password@localhost/database_to_commit?driver=SQL+Server+Native+Client+11.0')
# Separate raw pyodbc connection, kept as in the original answer.
conn = pyodbc.connect('Driver={SQL Server}; Server=localhost;uid=username; pwd=password; Database = database_name; Trusted_Connection=No;')

# Append the DataFrame rows to dbo.table_name without writing the index.
df.to_sql(name='table_name', con=engine, index=False, schema='dbo', if_exists='append')
conn.commit()

sql = "Your SQL insert query"
# engine.begin() opens a transaction that is committed when the block exits
# normally and rolled back if it raises. The statement must be indented
# inside the `with` body; unindented, the snippet is a syntax error.
with engine.begin() as conne:
    conne.execute(sql)
conn.commit()
推荐阅读
- spring-boot - 在 s3-outbound-gateway 中使用 object-acl-expression
- c# - IdentityServer4 管理界面
- gatling - Gatling 脚本编译错误说值“检查”不是成员
- swift - 我在 viewDidLoad 中展示了 firebase AuthViewController 并收到此错误。什么原因?
- r - 在连续行中查找值
- indexing - 如何理解“以顶点为中心的索引不能加速特定标签的无约束遍历”。
- c++ - 从在单独线程中运行的 C++ 插件调用 Node.js 函数
- python - 想要从 txt 文件中读取单词并将该单词插入到包含字符“p”的列表 (p_words) 中
- php - 如何在 Laravel 中循环引导模式?
- javascript - 如何使用 grunt 更改我的 HTML 文件以引用我缩小的 JS 和 CSS 而不是未压缩的?