首页 > 解决方案 > Python Peewee SqliteQueueDatabase 表未创建

问题描述

我正在使用 SQLite 数据库开发多线程应用程序。我做了一些研究,似乎 SqliteQueueDatabase 可以提供所需的并发处理。我已经浏览了文档,但似乎我还没有设法了解如何启动和启动数据库的全貌。

from peewee import *
from playhouse.sqliteq import SqliteQueueDatabase

db = SqliteQueueDatabase(':memory:')


class Prime(Model):
    num = IntegerField()

    class Meta:
        database = db


db.start()
db.connect()
db.create_tables([Prime])
print db.get_tables()  # prints []
db.stop()

在上面的示例中启动数据库后,我尝试为我的模型创建表,但它没有被创建。我想念什么?我试图找到一个涵盖整个生命周期但无法找到的 peewee & SqliteQueueDatabase 示例。

标签: pythonsqlitepeewee

解决方案


我最终做的是我没有使用 peewee、SqliteQueueDatabase 或任何 ORM,但 sqlite3 和线程。

使用一种单例技巧,我基本上有一个具有连接属性的对象实例,因此一个连接实例与所有线程共享。我必须check_same_thread=False在连接到 sqlite 时进行设置,否则不同的线程不能共享相同的连接。

下面是一个简化版:

import sqlite3
import threading

class Singleton(type):
    _instances = {}

    def __call__(cls, *args, **kwargs):
        if cls not in cls._instances:
            cls._instances[cls] = super(Singleton, cls).__call__(*args, **kwargs)
        return cls._instances[cls]

class Dao(object, metaclass=Singleton):

    def __init__(self, conf=None):
        self.lock = threading.Lock()
        self.conn = sqlite3.connect(
                conf.db,
                check_same_thread=False
            )

我第一次Dao在主线程中实例化了该类并将配置传递给__init__. 稍后,无论哪个线程需要使用数据库,它都只是创建了一个Dao对象。然而,由于 Singleton 技巧,调用者只是获得了对已经存在的实例的引用,该实例还包括已经建立的连接。

我将所有数据库操作作为方法添加到此类Dao。为了避免重复锁定,我使用wraps.

from functools import wraps

def transaction_read_write(fn):
    @wraps(fn)
    def wrapper(self, *args, **kwargs):
        self.lock.acquire()
        # execute wrapped method and perform commit
        try:
            ret = fn(self, *args, **kwargs)
            self.conn.commit()
        except Exception as e:
            # perform rollback in case of an error
            # also in real world application do some logging here
            self.conn.rollback()
            raise e
        finally:
            # release acquired lock
            self.lock.release()
        return ret
    return wrapper

我有一个类似的只读操作包装器,但没有提交/回滚。我使它可配置为是否在只读操作期间执行锁定——只是为了能够在没有新版本的情况下调整锁定行为,如果在产品中发生数据库并发问题。

现在我所要做的就是将我的自定义事务装饰器添加到Dao方法中。(在现实生活中,一个事务当然可以由几个 SQL 命令组成。因此,我有一些没有事务注释的原子方法,它们从未从外部直接调用。它们仅由Dao在单个事务中执行多个调用的其他一些方法调用所以这些复杂的方法有事务注释。我非常小心这些事务的大小和速度,因为在我的情况下,锁定机制基本上阻止了其他线程同时使用数据库。)

所以一个Dao方法可能看起来像:

@transaction_read_write
def set_processed_files(self, id, num_files):
    cur = self.conn.cursor()
    cur.execute("UPDATE jobs SET num_files = ? WHERE job_id = ?", (num_files, id))

最后我决定不使用 peewee,但我希望在我的示例中有一些有用的东西。


推荐阅读