python - 无法让 sqlalchemy.orm 在多线程期间使用 Session.execute() 将数据插入到我的 postgres 数据库表中
问题描述
首先,我对使用 sqlite 以外的数据库是完全陌生的,我只使用了一点,python 也是我只使用了大约 6 个月的东西,所以如果我错过了一些明显的东西或完全没有,请耐心等待误解了什么。
我有很多我正在抓取的历史市场数据(三个区域约 15000 个项目)并且为了有效地做到这一点,我试图通过为每个区域使用一个进程然后对每个进程进行多线程来获取所有项目来做到这一点。我从每个项目的抓取中得到的响应是一个字典列表,然后我想使用Session.execute()
. 我还没有让它工作(如果你知道一种方法,请指导我正确的方向),所以现在我只使用多线程,因为我已经成功地使用它来将数据插入到 regionid 和 typeid 表中。当我运行我的脚本时,我仍然没有将数据插入到我的 history_data 表中并且没有错误。我尝试启用 sqlalchemy 日志记录
import logging
logging.basicConfig()
logging.getLogger('sqlalchemy.engine').setLevel(logging.INFO)
这向我展示了我对在 main 期间调用从 regionid 和 typeid 表获取数据的期望,但在那之后什么都没有,这是否意味着我在多线程后没有与数据库的连接,或者记录器只是不擅长处理多线程?
使用 regionid 和 typeid 表,我使用 Session.merge() 并使用 for 循环处理每个项目的数据,所以我猜这是我的使用Session.execute()
关闭了吗?
我尝试使用 sqlalchemy.orm 将我所有的历史数据插入到 postgres 数据库中我用来尝试插入数据的实际脚本如下:
if __name__ == '__main__':
print("Start database session")
Base.metadata.create_all(engine)
Session = scoped_session(session_factory)
ini_params()
print("Get typeids and regionids from database")
typeids = get_typeids() #get all typeids from typeid table
regionids = get_regionids() #get all regionids from regionid table
typeids = typeid_test_list #uncomment for debug
print(typeids)
for idx, regionid in enumerate(regionids):
no_data = data_to_db(regionid, typeids, idx)
#no_data = multi_processing(regionid, typeids, idx)
print(no_data)
def data_to_db(regionid, typeids, idx):
ini_params()
position = int(idx)
no_data_typeids = []
prefix = 'Getting data for region ' + str(regionid)
typeid_length = len(typeids)
with tqdm(typeids, total=typeid_length, desc=prefix, position=position) as pbar:
with concurrent.futures.ThreadPoolExecutor(max_workers=50) as executor:
futures = {executor.submit(multithread_func, typeid, regionid): typeid for typeid in typeids}
for future in concurrent.futures.as_completed(futures):
pbar.update(1)
return no_data_typeids
def multithread_func(typeid, regionid):
today = datetime.date.today()
history = get_market_history(regionid, typeid) #URL-scraper
if history != "error":
import_full_history(history)
else:
return typeid
return 0
def import_full_history(history):
get_data_session = Session()
print(type(history))
get_data_session.execute(historical_data.insert(), item_dict)
get_data_session.commit()
Session.remove()
return 0
我的数据库是这样构建的
根据
engine = create_engine('postgresql://user@localhost:5432/historic_market_data', pool_size=12, max_overflow=0)
session_factory = sessionmaker(bind = engine)
Base = declarative_base()
区域类
Session = scoped_session(session_factory)
class Regionid(Base):
__tablename__ = 'regionids'
regionid = Column(Integer, primary_key=True)
query = Session.query_property()
def __init__(self, regionid):
self.regionid = regionid
类型标识
Session = scoped_session(session_factory)
class Typeid(Base):
__tablename__ = 'typeids'
typeid = Column(Integer, primary_key=True)
query = Session.query_property()
def __init__(self, typeid):
self.typeid = typeid
历史数据
class Historical_data(Base):
__tablename__ = 'historical_data'
id = Column(Integer, primary_key=True)
typeid = Column('typeid', Integer, ForeignKey('typeids.typeid'))
regionid = Column('regionid', Integer, ForeignKey('regionids.regionid'))
date = Column(Date)
average = Column(Float)
highest = Column(Float)
lowest = Column(Float)
order_count = Column(Integer)
volume = Column(Integer)
buy_weighted_avg = Column(Float)
buy_maxval = Column(Float)
buy_minval = Column(Float)
buy_stddev = Column(Float)
buy_median = Column(Float)
buy_volume = Column(Float)
buy_numorders = Column(Integer)
buy_fivepercent = Column(Float)
sell_weighted_avg = Column(Float)
sell_maxval = Column(Float)
sell_minval = Column(Float)
sell_stddev = Column(Float)
sell_median = Column(Float)
sell_volume = Column(Float)
sell_numorders = Column(Integer)
sell_fivepercent = Column(Float)
def __init__(self, title, release_date):
self.typeid = typeid
self.regionid = regionid
self.date = date
self.average = average
self.highest = highest
self.lowest = lowest
self.order_count = order_count
self.volume = volume
self.buy_weighted_avg = buy_weighted_avg
self.buy_maxval = buy_maxval
self.buy_minval = buy_minval
self.buy_stddev = buy_stddev
self.buy_median = buy_median
self.buy_volume = buy_volume
self.buy_numorders = buy_numorders
self.buy_fivepercent = buy_fivepercent
self.sell_weighted_avg = sell_weighted_avg
self.sell_maxval = sell_maxval
self.sell_minval = sell_minval
self.sell_stddev = sell_stddev
self.sell_median = sell_median
self.sell_volume = sell_volume
self.sell_numorders = sell_numorders
self.sell_fivepercent = sell_fivepercent
解决方案
我已经设法通过使用 so 来让它工作,bulk_insert_mappings()
只需将我的更改import_full_history()
为
def import_full_esi_history(history):
get_data_session = Session()
get_data_session.bulk_insert_mappings(Historical_data, history)
get_data_session.commit()
Session.remove()
return 0
我得到它来插入数据。IT 还可以像我最初打算的那样结合多线程和多处理。似乎Session.insert()
一次只能处理一列,而我在列表中的字典是针对整行的