首页 > 技术文章 > [peewee]笔记

watalo 2021-03-14 22:49 原文

Peewee学习笔记

peewee是一个ORM数据库处理的库,我翻译了从快速入门开始的核心部分,后面还有API文档和扩展库的文档没有翻译。

2021年3月22日完成了翻译,中间也间断的学习了一部分。给我的感受是peewee正是我要找的那种可以像操作类的属性一样调用数据的库。在此之前,我也自学了一个叫TinyDB的库,它不用数据库文件,而是使用JSON文件保存数据。感觉还是peewee更专业一点。

后面的内容,预计花1个月的时间,将这个复杂的缩减到一个文档中。这就是所谓的书从厚读薄吧。

摘要:peewee 核心

  • Database

  • Model --> 创建表

  • Field -->表的字段

  • Query --> 查询

使用的顺序:

  • 初始化数据库
  • 创建表
  • 增、删、改、查

一、初始化数据库

在执行下面所有命令之前,我们需要初始化一个数据库(Database)、表(Model)和它的字段(field)。以下是官方文档的标准范式,抄就完了,别问为什么?

from peewee import *

# db = SqliteDatabase(':memory:')  <-- 在内存里面搞一个数据库
db = SqliteDatabase('my_database.db') # <--在当前目录下生成一个my_database.db文件

class BaseModel(Model): # 标准写法
    class Meta:         # 下面的Model全部继承BaseModel
        database = db   # 连接数据库,Meta子类还可以设置其他的东西,这里别写

class User(BaseModel):
    name = CharField()
    message = TextField()

db.connect() # 生成/连接数据库
db.close()   # 关闭 数据库连接

二、创建表

在使用数据库之前,需要用已经声明的Model来在数据库里面创建表:

db.connect() # 没有必要,但是还是显示的建立连接会比较方便后面判断错误。
db.create_tables([User])

print(User.__dict__.keys())

测试下,没有问题。

PS:数据迁移问题

使用playhouse.migrate模块可以进行迁移(在已经存在的表里增加一个字段)

from playhouse.migrate import *
migrator = SqliteMigrator(db)

price_field = IntegerField(default=0)
migrate(migrator.add_column('product', 'price', price_field))

这里需要注意了,你增加了字段,那表的Model就要跟着改。如果你非要在同一个py文件中这么操作,peewee作者的答案是:

It's unusual to do this online:

1.declare a model/schema

2.migrate it

3.use the new schema

Because if you know you need a price field just put it on the model class from the start.

If you really need to do this, then you can call this after you've run the migration:

Product._meta.add_field('price', price_field)

数据库my_database.db文件里面实际上已经有了price这个字段,但是在同一个py文件中是无法调用的,为什么,因为我们定义的Model里面没有,所以,要修改前面的Model。

  • 直接在Model里面写个字段

  • 使用 pwiz 生成新文件

    python -m pwiz -e sqlite path/to/sqlite_database.db
    

三、增删改查

1.创建或插入新记录

  • 类方法
    • Model.create()
    • Model.insert().execute()
    • 通过类方法不需要使用 .save()保存
  • 实例属性
    • user = User(name = 'xxx')
    • user.name = 'xxx'
    • 通过实例属性都要使用 .save()保存

1.1 create()

创建一条记录,并返回一个实例

user = User.create(name = 'watalo', message= '11111')
print(user)

1.2 insert()

插入一条记录,返回主键

User.insert(name = 'donggua', message = 'wangwangwang').execute()

1.3 传参的实例

user = User(name = 'jianhetao', message = 'houhouhou')
user.save()
# query = User.select().where(User.name == 'jianhetao')
# for q in query:
#     print(q.message)

1.4 不传参的实例

user = User()
user.name = 'jiajia' # 提示 NOT NULL constraint failed: user.message
user.message = '555555555555555' # 如果不写这条
user.save() 

1.5 insert_many()

不管什么情况都使用db.atomic()会极大的提高运行速度。

数据源可以用两种类型:

  • 字典列表:[{‘name’:‘ xxx’, 'message':'jjj'},]

    data_source = [
        {'name':'aaa', 'message':'jjj'},
        {'name':'bbb', 'message':'jjj'},
        {'name':'ccc', 'message':'jjj'},
        {'name':'ddd', 'message':'jjj'},
        {'name':'eee', 'message':'jjj'},
        {'name':'fff', 'message':'jjj'},
        {'name':'ggg', 'message':'jjj'},
        ] # 字典列表形式的数据源
    with db.atomic():
        User.insert_many(data_source).execute()
    
  • 元组列表:[('xxx','jjj')]

    data_source = [
        ('hhh', 'jjj'),
        ('hhh', 'jjj'),
        ('hhh', 'jjj'),
        ] # 元组列表形式的数据源
    with db.atomic():
        # 这里需要制定fields
        User.insert_many(data_source, fields=[User.name, User.message]).execute()
    

数据源的量特别大的时候,SQLite会有批处理的数据量的上限,通常是999或这32776,这种情况使用分块会比较好:

  • chunked(): 把大量数据分成固定数量的块,一个一个的循环

    from datetime import datetime
    data_source = []
    for i in range(40000):
        data_cell = {'name':'aaa'+str(i), 'message':'message'+str(i*2)}
        data_source.append(data_cell)
    
    print(datetime.now())
    from peewee import chunked
    with db.atomic():
        for batch in chunked(data_source, 100):
            User.insert_many(batch).execute()
    print(datetime.now())
    

    用时:0.7089990000000004

    还可以直接写个循环:

    with db.atomic():
        for idx in range(0, len(data_source), 100):
            User.insert_many(data_source[idx:idx+100]).execute()
    

    用时:0.7069879999999955

    从用时上看,这两种方法都差不多。

1.6 insert_from()

从其他的表里面直接导入数据。没有试。

User_back.insert_from(User.select(User.name),fields=[User_back.name,]).execute()

1.7 其他选项

  • bulk_create() 没有试。

    with db.atomic():
        User.bulk_create(users, batch_size=100)
    

    参数batch_size

  • bulk_update() 没有试。

    with database.atomic():
        User.bulk_update(list_of_users, fields=[`username`], batch_size=50)
    

2.删除记录

2.1 delete_instance()

删除一个实例,在实例后调用。

watalo = User.get(User.name == 'watalo')
watalo.delete_instance()

2.2 delete()

在类后面使用

hhh_to_be_delete = User.delete().where(User.name == 'hhh') 
hhh_to_be_delete.execute()

3.更新记录

3.1 修改单条记录

一旦实例有了主键,再进行save()操作就是 update,而不是 insert

aaa = User.select().where(User.name == 'aaa').get()
aaa.message = '我改了'
aaa.save()

3.2 更新多条记录

使用update查询,直接进行批量更新

query = User.update(message = '').where(User.name.startswith('aaa'))
query.execute()

3.3 原子更新

atomic update 这个单词不知道具体是什么意思,感觉就是对某个很细微的数据进行更新。有三个案例可以学习下:

  • 简单计数
query = Stat.update(counter=Stat.counter + 1).where(Stat.url == request.url)
query.execute()
  • 复杂查询
query = Employee.update(bonus=(Employee.bonus + (Employee.salary * .1)))
query.execute()  # Give everyone a bonus!
  • 使用子查询
subquery = Tweet.select(fn.COUNT(Tweet.id)).where(Tweet.user == User.id)
update = User.update(num_tweets=subquery)
update.execute()

3.4 插入

  • replace()

    3.24之前的SQLite和MySQL可以使用。作用是直接替换某条记录,而且可以忽略限制条件。这里就不讨论了

    class User(Model):
        username = TextField(unique=True)
        last_login = DateTimeField(null=True)
    
    # Insert or update the user. The "last_login" value will be updated
    # regardless of whether the user existed previously.
    user_id = (User
               .replace(username=`the-user`, last_login=datetime.now())
               .execute())
    
  • on_conflict_replace()

    用这个命令可以忽略限制条件。

    user_id = (User
               .insert(username=`the-user`, last_login=datetime.now())
               .on_conflict_replace()
               .execute())
    

4.查询记录

4.1 查询单条记录

7种调用方式:

  • get() 如果没查到会跳出 DoesNotExist异常

    User.get(User.id == 6)
    
  • get_by_id()

    User.get_by_id(7)
    
  • User[8]

  • User.get(User.id == 9).name

  • User.get(User.name == 'aaa')

  • User.select().where(User.name == 'bbb').get()

  • get_or_none() 如果没有查到会直接返回None

    User.select().where(User.name == 'bbb').get()
    
  • get_or_create()如果没有就创建一条记录,并且返回一个元组(实例,布尔值)

    User.get_or_create(name = 'wocalie',defaults = {'message':'successed'})
    # (<User: 120017>, True)
    

    这个函数的逻辑如下:

    try:
        with db.atomic():
            return User.create(username=username)
    except peewee.IntegrityError:
        # `username` is a unique column, so this username already exists,
        # making it safe to call .get().
        return User.get(User.username == username)
    

4.2 查询多条记录

4.2.1 select(): 查询所有
query = User.select()
print(query)

select()结果可以进行迭代:

  • User.select().iterator()

    query = User.select()
    
    for row in query.iterator():
        print(row)
    

    在调用大型数据的时候,会节省内存。

还可以字典化、元组化,进一步加快速度。而且这些都可以进行iterator()

  • User.select().dicts()

    query = User.select().dicts().iterator()
    
  • User.select().tuple()

    query = User.select().tuples().iterator()
    
  • User.select().nametuple()

    query = User.select().namedtuples().iterator()
    

在做联表查询的大型迭代时,peewee实际上重构了model,把数据写入model,再读取。这样很慢,所以peewee提供了一个方法:

  • User.select().objects()

    query = User.select()
    
    for q in query.objects():
        print(q.name)
    

4.3 筛选查询记录

4.3.1 where()方法
  • User.select().where()

对单一Model的条件查询

User.select().where(User.name == 'bbb')
  • User.select().join(Tweet).where()

对联表Mode的条件查询

User.select().join(Tweet).where(Tweet.is_pulished = True)
4.3.2 条件设置

复杂查询时,可采用以下方式:

  • 圆括号和python的按位操作符
(User.username == `Charlie`) | (User.username == `Peewee Herman`)
  • 函数表达式
fn.Lower(fn.Substr(User.username, 1, 1)) == a
  • 比较表达式
Employee.salary < (Employee.tenure * 1000) + 40000
  • 嵌套查询:in_()
a_users = User.select().where(fn.Lower(fn.Substr(User.username, 1, 1)) == `a`)
a_user_tweets = Tweet.select().where(Tweet.user.in_(a_users))

4.4 排序order_by()

  • 基本用法

    • 正序(+):不用调整

    • 倒序(-):Tweet.created_date.desc() =- Tweet.created_date

query = Tweet.select().join(Member).where(Tweet.is_published).order_by(Tweet.created_date.desc())
query = Tweet.select().join(Member).where(Tweet.is_published).order_by(- Tweet.created_date)
  • 高级用法
ntweets = fn.COUNT(Tweet.id)
query = (User
         .select(User.username, ntweets.alias(`num_tweets`))
         .join(Tweet, JOIN.LEFT_OUTER)
         .group_by(User.username)
         .order_by(ntweets.desc())

4.5 分页paginate()

for tweet in Tweet.select().paginate(5, 10):
    print(tweet.id)
41
42
43
44
45
46
47
48
49
50

4.6 随机取值Random()

for tweet in Tweet.select().order_by(fn.Random()).limit(5):
    print(tweet.id)

4.7 计数count()

  • 带条件
Tweet.select().where(Tweet.id > 40).count()
  • 不带条件
Tweet.select().count()

4.8 聚合group_by()

我理解的所谓聚合,是各种查询后打包成一个新的数据,然后再来利用,不知道对不对。

举例:查一个计数的结果,然后打包到Member里面:

query = (Member
         .select(Member, fn.Count(Tweet.id).alias('count'))
         .join(Tweet, JOIN.LEFT_OUTER)
         .group_by(Member))
for q in query:
    print(q.id, q.count)

推荐阅读