python - 结合 SQLAlchemy yield_per 和 group_by
问题描述
我有一个。SQLAlchemy 数据库表跨越 24 小时,每小时最多 1,000,000 行。下面的示例表。
from sqlalchemy import Column, Integer, String, create_engine
from sqlalchemy.ext.declarative import declatative_base
from sqlalchemy.orm import sessionmaker
from random import choice
import pandas as pd
Base = declarative_base()
class WebsiteData(Base):
__tablename__ = 'hourly_website_table'
id = Column(Integer, primary_key=True)
user = Column(String(600), index=True)
website = Column(String(600))
time_secs = Column(Integer, index=True)
class DataBaseManager:
def __init__(self, db_loc='sqlite:////home/test/database.db'):
self.engine = create_engine(db_loc, echo=False)
self.table = WebsiteData
def get_session(self):
Session = sessionmaker(bind=self.engine)
session = Session()
Base.metadata.create_all(self.engine)
return session
def get_db_info(self):
session = self.get_session()
rows = session.query(self.table).count()
session.close()
return rows
def df_to_hourly_db(self, table_name, df, time_secs):
conn = self.engine.raw_connection()
df['hour'] = time_secs
query = "INSERT OR REPLACE INTO %s (user,website,time_secs) VALUES (?,?,?)" %\
table_name
conn.executemany(query, df[['user', 'website', 'hour']].to_records(index=False))
conn.commit()
conn.close()
def create_df(time_secs=0, users=10000, rows_per_user=100):
user_arr = [("u%d" % i) for i in range(users)] * rows_per_user
web_arr = [("www.website_%d" % (time_secs + i)) for i in xrange(rows_per_user * users)]
return pd.DataFrame({'user': user_arr, 'website': web_arr})
DBM = DataBaseManager()
for hour in range(24):
time_secs = (60 * 24 * 3600) + (hour * 3600)
df = create_df(time_secs=time_secs, rows_per_user=choice(range(100)))
DBM.df_to_hourly_db(df, time_secs)
每小时的行数是可变的。为了避免一次将整个表加载到内存中,我想group_by(table.time_secs)
对数据执行 a ,然后按顺序流式传输每个组。是否有可能以某种方式结合 SQLAlchemygroup_by
和yield_per
方法来实现这一目标?我知道yield_per
允许您一次产生一定数量的行,但是每次迭代是否可以产生不同数量的行?如果没有,有没有其他方法可以做类似的事情?
解决方案
推荐阅读
- c# - 使用 DependencyProperties 链接两个用户控件(图形编辑器)
- reactjs - 如何在我的 React-Redux 客户端使用保存在我的数据库中的枚举?
- java - 在 java 日志格式化程序中使用简单的类名
- spring - 如何使用不明确的 pom 启用 log4j2?
- javascript - javascript Uncaught TypeError for map
- dart - 如何修复 Flutter 本地身份验证的平台异常错误“auth_in_progress”?
- python - 需要帮助使用 bs4 和 python 从幻灯片中抓取图像
- php - Symfony 检查输入数据是否唯一
- ubuntu - 如何查看 cron 修改时间?
- django - 在模板的表单操作中使用子流程