首页 > 解决方案 > Pandas 无法使用 SQLAlchemy 将所有表和行写入 MySQL

问题描述

我正在尝试在 MySQL 的本地磁盘上创建我的大型 MSSQL Server 数据库的一小部分,以便使用 Python 进行分析/ML。我成功地将数据读入 Pandas 数据帧,将所有对象数据类型的列转换为字符串,这样我只处理字符串、float64、int64 和 bool 数据类型。然后我使用 dataframe.to_sql. 我SQLAlchemy用来创建引擎,我必须Mysql-python-connector用 Python 做所有事情。

问题:写入 MySQL 本地数据库时,它将创建所有表,但有些表最终为空,有些表缺少行。如果我将相同的数据帧写入 SQLite,我就不会遇到任何这些问题。增加chunchsize有助于减少丢失的行,增加pool_recycle有助于减少空表,但并没有完全解决这两个问题。特别是有一些从未填充过的特定表。在 MySQL 中分析表显示utf8mb4_0900_ai_ci为表排序规则,这基本上意味着它正在使用utf8mb4字符集。表并不大,我拥有的最大的表不到 200K 行和 50 列。整个22张表的读写时间不长,不到10分钟。

代码:

import pandas as pd
import pyodbc 
import sqlalchemy
from sqlalchemy import create_engine
from sqlalchemy.ext.declarative import declarative_base
import mysql.connector
import gc

def update_mysql_db(local_db_name:"database filename",
                    tables_to_update:"lists names of all the tables and their data"):

    '''Receives a list of pairs in the form of 
    (table name, table data in a datframe).
    Creates new tables in the local database with the data provided'''




    print(f"{len(tables_to_update)} tables will be updated/created")
    for table in tables_to_update:
        print('\n')
        print(f"reading {table[0]} dataframe")        

        try:
            table[1].to_sql(table[0].lower(),
                           create_engine('mysql+mysqlconnector://demo:Demo@localhost:3306/sqlalchemy',
                                        pool_recycle=36000, echo = False),
                            if_exists='replace',
                            index='True',
                            chunksize=5000,
                           index_label ='Index')

            print('Table {}.{} Created'.format(local_db_name,table[0].lower()))
        except ValueError as vx:
             print('Value Error : \n\n  ', vx)
        except Exception as ex:
              print('Exception : \n\n  ', ex)        


    gc.collect() 
    print('\n')
    print("Finished updating the local database")
    return None

def main():

    Local_db = 'sqlalchemy'
    update_mysql_db(Local_db,list_of_tablesnames_and_data)
    gc.collect()



if __name__ == "__main__":
    main()

我得到的错误:

reading SeminarEvaluationResponses dataframe
Exception during reset or similar
Traceback (most recent call last):
  File "C:\Sam\Anaconda\lib\site-packages\sqlalchemy\pool\base.py", line 693, in _finalize_fairy
    fairy._reset(pool)
  File "C:\Sam\Anaconda\lib\site-packages\sqlalchemy\pool\base.py", line 880, in _reset
    pool._dialect.do_rollback(self)
  File "C:\Sam\Anaconda\lib\site-packages\sqlalchemy\dialects\mysql\base.py", line 2302, in do_rollback
    dbapi_connection.rollback()
  File "C:\Sam\Anaconda\lib\site-packages\mysql\connector\connection_cext.py", line 386, in rollback
    self._cmysql.rollback()
_mysql_connector.MySQLInterfaceError: MySQL server has gone away
Exception : 

   MySQL server has gone away

还有什么: 不幸的是,我无法在此处上传任何实际的数据/数据库/表格以使代码可行。我希望你们中的一些大师可以给我一些提示或指出正确的方向。

标签: pythonmysqldatabasepandassqlalchemy

解决方案


看起来这个过程可能需要太长时间并且连接超时。根据你给我写这些数据需要多长时间的时间框架,情况似乎就是这样。有两个选项不是互斥的,因此如果需要,您可以同时执行这两个选项。第一个是为你的 MySQL 服务器增加 wait_timeout。您可以在 Python 或 MySQL Workbench 中执行此操作,此答案会告诉您如何以两种方式执行此操作。

第二个选项,涉及更多一点,是您可以从使用 SQLAlchemy 切换到直接使用 pyodbc 执行插入操作。这本身并不快,但它允许您将尝试推送的 DataFrame 转换为元组列表,这比推送带有.to_sql(). 对于 SQLAlchemy 自动处理的事情,可能需要做一些额外的编码,但它是提高性能的一个选项。也就是说,我强烈建议您先尝试第一个,以确保在您尝试采用此策略之前甚至有必要这样做。

对于丢失的行,我真的不知道为什么一组记录只会部分插入到表中。自动提交是否在任何地方设置为 True?

最后,这个答案涵盖了很多基础,我没有准确评估这个问题。


推荐阅读