首页 > 解决方案 > 无法让 sqlalchemy.orm 在多线程期间使用 Session.execute() 将数据插入到我的 postgres 数据库表中

问题描述

首先,我对使用 sqlite 以外的数据库是完全陌生的,我只使用了一点,python 也是我只使用了大约 6 个月的东西,所以如果我错过了一些明显的东西或完全没有,请耐心等待误解了什么。

我有很多我正在抓取的历史市场数据(三个区域约 15000 个项目)并且为了有效地做到这一点,我试图通过为每个区域使用一个进程然后对每个进程进行多线程来获取所有项目来做到这一点。我从每个项目的抓取中得到的响应是一个字典列表,然后我想使用Session.execute(). 我还没有让它工作(如果你知道一种方法,请指导我正确的方向),所以现在我只使用多线程,因为我已经成功地使用它来将数据插入到 regionid 和 typeid 表中。当我运行我的脚本时,我仍然没有将数据插入到我的 history_data 表中并且没有错误。我尝试启用 sqlalchemy 日志记录

import logging

logging.basicConfig()
logging.getLogger('sqlalchemy.engine').setLevel(logging.INFO)

这向我展示了我对在 main 期间调用从 regionid 和 typeid 表获取数据的期望,但在那之后什么都没有,这是否意味着我在多线程后没有与数据库的连接,或者记录器只是不擅长处理多线程?

使用 regionid 和 typeid 表,我使用 Session.merge() 并使用 for 循环处理每个项目的数据,所以我猜这是我的使用Session.execute()关闭了吗?

我尝试使用 sqlalchemy.orm 将我所有的历史数据插入到 postgres 数据库中我用来尝试插入数据的实际脚本如下:

if __name__ == '__main__':
    print("Start database session")
    Base.metadata.create_all(engine)
    Session = scoped_session(session_factory)
    ini_params()
    print("Get typeids and regionids from database")
    typeids = get_typeids() #get all typeids from typeid table
    regionids = get_regionids() #get all regionids from regionid table
    typeids = typeid_test_list #uncomment for debug
    print(typeids)
    for idx, regionid in enumerate(regionids):
        no_data = data_to_db(regionid, typeids, idx)
        #no_data = multi_processing(regionid, typeids, idx)
    print(no_data)

def data_to_db(regionid, typeids, idx):
    ini_params()
    position = int(idx)
    no_data_typeids = []
    prefix = 'Getting data for region ' + str(regionid)
    typeid_length = len(typeids)
    with tqdm(typeids, total=typeid_length, desc=prefix, position=position) as pbar:
        with concurrent.futures.ThreadPoolExecutor(max_workers=50) as executor:
            futures = {executor.submit(multithread_func, typeid, regionid): typeid for typeid in typeids}
            for future in concurrent.futures.as_completed(futures):
                pbar.update(1)
    return no_data_typeids

def multithread_func(typeid, regionid):
    today = datetime.date.today()
    history = get_market_history(regionid, typeid) #URL-scraper
    if history != "error":
        import_full_history(history)
    else:
        return typeid
    return 0

def import_full_history(history):
    get_data_session = Session()
    print(type(history))
    get_data_session.execute(historical_data.insert(), item_dict)
    get_data_session.commit()
    Session.remove()
    return 0

我的数据库是这样构建的

根据

engine = create_engine('postgresql://user@localhost:5432/historic_market_data', pool_size=12, max_overflow=0)
session_factory = sessionmaker(bind = engine)
Base = declarative_base()

区域类

Session = scoped_session(session_factory)
class Regionid(Base):
    __tablename__ = 'regionids'
    regionid = Column(Integer, primary_key=True)
    query = Session.query_property()
    def __init__(self, regionid):
        self.regionid = regionid

类型标识

Session = scoped_session(session_factory)
class Typeid(Base):
    __tablename__ = 'typeids'
    typeid = Column(Integer, primary_key=True)
    query = Session.query_property()
    def __init__(self, typeid):
        self.typeid = typeid

历史数据

class Historical_data(Base):
    __tablename__ = 'historical_data'
    
    id = Column(Integer, primary_key=True)
    typeid = Column('typeid', Integer, ForeignKey('typeids.typeid'))
    regionid = Column('regionid', Integer, ForeignKey('regionids.regionid'))
    date = Column(Date)
    average = Column(Float)
    highest = Column(Float)
    lowest = Column(Float)
    order_count = Column(Integer)
    volume = Column(Integer)
    buy_weighted_avg = Column(Float)
    buy_maxval = Column(Float)
    buy_minval = Column(Float)
    buy_stddev = Column(Float)
    buy_median = Column(Float)
    buy_volume = Column(Float)
    buy_numorders = Column(Integer)
    buy_fivepercent = Column(Float)
    sell_weighted_avg = Column(Float)
    sell_maxval = Column(Float)
    sell_minval = Column(Float)
    sell_stddev = Column(Float)
    sell_median = Column(Float)
    sell_volume = Column(Float)
    sell_numorders = Column(Integer)
    sell_fivepercent = Column(Float)
    
def __init__(self, title, release_date):
    self.typeid = typeid
    self.regionid = regionid
    self.date = date
    self.average = average
    self.highest = highest
    self.lowest = lowest
    self.order_count = order_count
    self.volume = volume
    self.buy_weighted_avg = buy_weighted_avg
    self.buy_maxval = buy_maxval
    self.buy_minval = buy_minval
    self.buy_stddev = buy_stddev
    self.buy_median = buy_median
    self.buy_volume = buy_volume
    self.buy_numorders = buy_numorders
    self.buy_fivepercent = buy_fivepercent
    self.sell_weighted_avg = sell_weighted_avg
    self.sell_maxval = sell_maxval
    self.sell_minval = sell_minval
    self.sell_stddev = sell_stddev
    self.sell_median = sell_median
    self.sell_volume = sell_volume
    self.sell_numorders = sell_numorders
    self.sell_fivepercent = sell_fivepercent

标签: pythonpostgresqlsqlalchemy

解决方案


我已经设法通过使用 so 来让它工作,bulk_insert_mappings()只需将我的更改import_full_history()

def import_full_esi_history(history):
    get_data_session = Session()
    get_data_session.bulk_insert_mappings(Historical_data, history)
    get_data_session.commit()
    Session.remove()
    return 0

我得到它来插入数据。IT 还可以像我最初打算的那样结合多线程和多处理。似乎Session.insert()一次只能处理一列,而我在列表中的字典是针对整行的


推荐阅读