首页 > 解决方案 > 按列有效地对事件日志数据库进行分类

问题描述

情况

我正在使用带有内置 sqlite3 模块的 Python 3.7.2。(sqlite3.version == 2.6.0)

我有一个如下所示的 sqlite 数据库:

| user_id | action | timestamp  |
| ------- | ------ | ---------- |
| Alice   |      0 | 1551683796 |
| Alice   |     23 | 1551683797 |
| James   |      1 | 1551683798 |
| ....... | ...... | .......... |

其中,user_id是一个任意的,是一个代表 UNIX 时间。TEXTactionINTEGERtimestampINTEGER

数据库有 200M 行,有 70K distinct user_ids。

目标

我需要制作一个看起来像这样的 Python 字典:

{
    "Alice":[(0, 1551683796), (23, 1551683797)],
    "James":[(1, 1551683798)],
    ...
}

user_ids 作为键,将相应的事件日志作为值,它们是 tuples 的列表(action, timestamp)。希望每个列表都按升序排序timestamp,但即使不是,我认为可以通过在制作字典后对每个列表进行排序来轻松实现。

努力

我有以下代码来查询数据库。它首先查询用户列表(带有user_list_cursor),然后查询属于该用户的所有行。

import sqlite3
connection = sqlite3.connect("database.db")
user_list_cursor = connection.cursor()
user_list_cursor.execute("SELECT DISTINCT user_id FROM EVENT_LOG")
user_id = user_list_cursor.fetchone()

classified_log = {}
log_cursor = connection.cursor()
while user_id:
    user_id = user_id[0] # cursor.fetchone() returns a tuple
    query = (
        "SELECT action, timestamp"
        " FROM TABLE"
        " WHERE user_id = ?"
        " ORDER BY timestamp ASC"
    )
    parameters = (user_id,)
    local_cursor.execute(query, parameters) # Here is the bottleneck
    classified_log[user_id] = list()
    for row in local_cursor.fetchall():
        classified_log[user_id].append(row)
        user_id = user_list_cursor.fetchone()

问题

每个用户的查询执行速度太慢。每行代码(被注释为瓶颈)大约需要 10 秒user_id。我认为我对查询采取了错误的方法。实现目标的正确方法是什么?

我尝试使用关键字“按列分类 db”、“按列分类 sql”、“sql 日志到字典 python”进行搜索,但似乎没有什么与我的情况相匹配。我认为这不会是一个罕见的需求,所以也许我错过了正确的搜索关键字。

再现性

如果有人愿意用 200M 行的 sqlite 数据库重现这种情况,下面的代码将创建一个 5GB 的数据库文件。

但我希望有人熟悉这种情况并知道如何编写正确的查询。

import sqlite3
import random

connection = sqlite3.connect("tmp.db")
cursor = connection.cursor()
cursor.execute(
    "CREATE TABLE IF NOT EXISTS EVENT_LOG (user_id TEXT, action INTEGER, timestamp INTEGER)"
)
query = "INSERT INTO EVENT_LOG VALUES (?, ?, ?)"
parameters = []
for timestamp in range(200_000_000):
    user_id = f"user{random.randint(0, 70000)}"
    action = random.randint(0, 1_000_000)
    parameters.append((user_id, action, timestamp))
cursor.executemany(query, parameters)
connection.commit()
cursor.close()
connection.close()

标签: pythonmysqlsqlite

解决方案


非常感谢@Strawberry 和@Solarflare 在评论中提供的帮助。

以下解决方案实现了 70 倍以上的性能提升,因此为了完整起见,我将我所做的作为答案。

正如他们建议的那样,我使用了索引并查询了整个表。

import sqlite3
from operators import attrgetter

connection = sqlite3.connect("database.db")

# Creating index, thanks to @Solarflare
cursor = connection.cursor()
cursor.execute("CREATE INDEX IF NOT EXISTS idx_user_id ON EVENT_LOG (user_id)")
cursor.commit()

# Reading the whole table, then make lists by user_id. Thanks to @Strawberry
cursor.execute("SELECT user_id, action, timestamp FROM EVENT_LOG ORDER BY user_id ASC")
previous_user_id = None
log_per_user = list()
classified_log = dict()
for row in cursor:
    user_id, action, timestamp = row
    if user_id != previous_user_id:
        if previous_user_id:
            log_per_user.sort(key=itemgetter(1))
            classified_log[previous_user_id] = log_per_user[:]
        log_per_user = list()
    log_per_user.append((action, timestamp))
    previous_user_id = user_id

所以要点是

  • 索引user_idORDER BY user_id ASC在可接受的时间内执行。
  • 读取整个表,然后按 分类user_id,而不是对每个 进行单独查询user_id
  • 迭代cursor以逐行读取,而不是cursor.fetchall().

推荐阅读