python - Python Peewee SqliteQueueDatabase 表未创建
问题描述
我正在使用 SQLite 数据库开发多线程应用程序。我做了一些研究,似乎 SqliteQueueDatabase 可以提供所需的并发处理。我已经浏览了文档,但似乎我还没有设法了解如何启动和启动数据库的全貌。
from peewee import *
from playhouse.sqliteq import SqliteQueueDatabase
db = SqliteQueueDatabase(':memory:')
class Prime(Model):
num = IntegerField()
class Meta:
database = db
db.start()
db.connect()
db.create_tables([Prime])
print db.get_tables() # prints []
db.stop()
在上面的示例中启动数据库后,我尝试为我的模型创建表,但它没有被创建。我想念什么?我试图找到一个涵盖整个生命周期但无法找到的 peewee & SqliteQueueDatabase 示例。
解决方案
我最终做的是我没有使用 peewee、SqliteQueueDatabase 或任何 ORM,但 sqlite3 和线程。
使用一种单例技巧,我基本上有一个具有连接属性的对象实例,因此一个连接实例与所有线程共享。我必须check_same_thread=False
在连接到 sqlite 时进行设置,否则不同的线程不能共享相同的连接。
下面是一个简化版:
import sqlite3
import threading
class Singleton(type):
_instances = {}
def __call__(cls, *args, **kwargs):
if cls not in cls._instances:
cls._instances[cls] = super(Singleton, cls).__call__(*args, **kwargs)
return cls._instances[cls]
class Dao(object, metaclass=Singleton):
def __init__(self, conf=None):
self.lock = threading.Lock()
self.conn = sqlite3.connect(
conf.db,
check_same_thread=False
)
我第一次Dao
在主线程中实例化了该类并将配置传递给__init__
. 稍后,无论哪个线程需要使用数据库,它都只是创建了一个Dao
对象。然而,由于 Singleton 技巧,调用者只是获得了对已经存在的实例的引用,该实例还包括已经建立的连接。
我将所有数据库操作作为方法添加到此类Dao
。为了避免重复锁定,我使用wraps
.
from functools import wraps
def transaction_read_write(fn):
@wraps(fn)
def wrapper(self, *args, **kwargs):
self.lock.acquire()
# execute wrapped method and perform commit
try:
ret = fn(self, *args, **kwargs)
self.conn.commit()
except Exception as e:
# perform rollback in case of an error
# also in real world application do some logging here
self.conn.rollback()
raise e
finally:
# release acquired lock
self.lock.release()
return ret
return wrapper
我有一个类似的只读操作包装器,但没有提交/回滚。我使它可配置为是否在只读操作期间执行锁定——只是为了能够在没有新版本的情况下调整锁定行为,如果在产品中发生数据库并发问题。
现在我所要做的就是将我的自定义事务装饰器添加到Dao
方法中。(在现实生活中,一个事务当然可以由几个 SQL 命令组成。因此,我有一些没有事务注释的原子方法,它们从未从外部直接调用。它们仅由Dao
在单个事务中执行多个调用的其他一些方法调用所以这些复杂的方法有事务注释。我非常小心这些事务的大小和速度,因为在我的情况下,锁定机制基本上阻止了其他线程同时使用数据库。)
所以一个Dao
方法可能看起来像:
@transaction_read_write
def set_processed_files(self, id, num_files):
cur = self.conn.cursor()
cur.execute("UPDATE jobs SET num_files = ? WHERE job_id = ?", (num_files, id))
最后我决定不使用 peewee,但我希望在我的示例中有一些有用的东西。
推荐阅读
- python - 基于 3 个列表创建矩阵
- elixir - 在 Elixir 中更好地使用 Enum.reduce
- android - Android PresetReverb 和 EnvironmentalReverb 不起作用
- python - 使用熊猫适用于 if 条件
- elasticsearch - 我可以在弹性搜索中创建多少个索引?
- questdb - 我可以让 QuestDB 将列数据添加到现有行吗?
- javascript - 将对象插入到基于属性匹配的数组中的特定位置
- amazon-web-services - ssh AWS ec2 堡垒权限被拒绝
- php - Laravel 工匠使用 PHP_CLI_SERVER_WORKERS 服务
- php - PHP domdocument 最终具有意想不到的价值