sqlite - 无法在循环内构建 rusqlite 事务:使用移动值并且不能一次多次借用可变值
问题描述
为了加快使用rusqlite插入 SQLite DB 的速度,我想在 for 循环中构建一个事务,并且只提交每 N 次迭代。
以下代码可以编译,但它会构建一个事务并一次性提交所有事务:
use rusqlite::{Connection, Result, NO_PARAMS};
fn main() -> Result<()> {
let mut conn = Connection::open_in_memory()?;
conn.execute(
"CREATE TABLE entry (
id INTEGER PRIMARY KEY,
data INTEGER
)",
NO_PARAMS,
)?;
let tx = conn.transaction()?;
for i in 0..20 {
tx.execute("INSERT INTO entry (data) VALUES (?1)", &[i])?;
}
tx.commit()?;
Ok(())
}
我的用例需要构建一个包含数百万个插入的事务,所以我想做的是累积事务,并在它到达transaction_size
提交时重新开始一个新事务。非编译版本如下所示:
let transaction_size = 5;
let tx = conn.transaction()?;
for i in 0..20 {
if (i % transaction_size) == (transaction_size - 1) {
tx.commit()?;
let tx = conn.transaction()?;
}
tx.execute("INSERT INTO entry (data) VALUES (?1)", &[i])?;
}
借阅检查器不允许这样做有两个原因。
error[E0382]: use of moved value: `tx`
--> src/main.rs:18:13
|
15 | let tx = conn.transaction()?;
| -- move occurs because `tx` has type `rusqlite::transaction::Transaction<'_>`, which does not implement the `Copy` trait
...
18 | tx.commit()?;
| ^^ value moved here, in previous iteration of loop
error[E0499]: cannot borrow `conn` as mutable more than once at a time
--> src/main.rs:19:22
|
15 | let tx = conn.transaction()?;
| ---- first mutable borrow occurs here
...
19 | let tx = conn.transaction()?;
| ^^^^ second mutable borrow occurs here
20 | }
21 | tx.execute("INSERT INTO entry (data) VALUES (?1)", &[i])?;
| -- first borrow later used here
第一个抱怨对我来说很有意义。第二个不是那么多,因为以下将编译(但我每个事务只插入一行):
for i in 0..20 {
let tx = conn.transaction()?;
tx.execute("INSERT INTO entry (data) VALUES (?1)", &[i])?;
tx.commit()?;
}
我试过let tx = if cond { tx.commit()?; conn.transaction()? }
在循环内使用 a ,但你需要一个 else 子句来进行类型检查。
我不知道如何在让编译器满意的同时实现我的目标。也许有一些方法可以使用不安全的功能,但我对 Rust 很陌生。
编辑
我忘了提到我想将我的迭代器视为一次性使用。
使用将构建事务的逻辑do_batch
与@Sébastien Renauld 分离的想法,我制作了这个版本,它将累积必须使用可变向量添加到事务中的数据。然后它以 size 的块构建并提交事务transaction_size
。
use rusqlite::{Connection, Result, Transaction, NO_PARAMS};
use std::vec::Vec;
fn do_batch<'a>(tx: &Transaction<'a>, transaction_accum: &Vec<i32>) -> Result<()> {
for i in transaction_accum.iter() {
tx.execute("INSERT INTO entry (data) values (?1)", &[i])?;
}
Ok(())
}
fn main() -> Result<()> {
let mut conn = Connection::open_in_memory()?;
conn.execute(
"CREATE TABLE entry (
id INTEGER PRIMARY KEY,
data INTEGER
)",
NO_PARAMS,
)?;
let transaction_size = 5;
let mut transaction_accum: Vec<i32> = Vec::new();
for i in 1..20 {
transaction_accum.push(i);
if (i % transaction_size) == (transaction_size - 1) {
let tx = conn.transaction()?;
do_batch(&tx, &transaction_accum)?;
transaction_accum.clear();
tx.commit()?;
}
}
Ok(())
}
编辑 2
在@Sébastien Renauld 的另一个建议之后,我偶然发现了 itertools 板条箱,它可以让您将来自迭代器的输出分块,从而提供以下漂亮而干净的解决方案。我唯一担心的是,为了制作块,整个迭代器在调用chunks
. 是这样吗?
use rusqlite::{Connection, Result, Transaction, NO_PARAMS};
use std::vec::Vec;
use itertools::Itertools;
fn do_batch<'a>(tx: &Transaction<'a>, transaction_accum: &Vec<i32>) -> Result<()> {
for i in transaction_accum.iter() {
tx.execute("INSERT INTO entry (data) values (?1)", &[i])?;
}
Ok(())
}
fn main() -> Result<()> {
let mut conn = Connection::open_in_memory()?;
conn.execute(
"CREATE TABLE entry (
id INTEGER PRIMARY KEY,
data INTEGER
)",
NO_PARAMS,
)?;
let transaction_size = 5;
let my_iter = 1..20; // this is really a WalkDir from the walkdir crate
for chunk in &my_iter.into_iter().chunks(transaction_size) {
let tx = conn.transaction()?;
do_batch(&tx, &chunk.collect())?;
tx.commit()?;
}
Ok(())
}
解决方案
这是一个 SQL 问题,而不是 Rust 问题,但我会解释你为什么会遇到这个问题,以及它是如何在 Rust 中出现的。
这一切都源于对事务数据库的基本误解,它适用于支持事务的每一个 RDBMS。事务的重点是在服务器上打开可视为单独的内容;然后,您对其进行状态更改,例如添加或删除行,然后将单独的平板转换为服务器的“真实”状态。根据您使用的数据库引擎,这将以不同的方式实现,但对于我们今天的目的,您的问题,这个类比就可以了。
而不是这样做,你打开你的交易,做一个插入,然后立即用commit()
. 注意它的签名:
fn commit(self) -> Result<()>
正如我们所期望的那样,commit()
需要self
,而不是&mut self
。通过提交(或回滚),您告诉服务器您已完成此事务。
要解决此问题,您需要根据数据库决定如何处理它。批处理是一个好主意,您已经发现了这一点,但您需要确保您有能力承受一批失败并重复。因此,我们将把事情分开一点。
首先,我们将构建我们的批处理生成器。我们需要这个,特别是如果我们打算重放一批:
fn do_batch<'a>(tx: &mut Transaction<'a>) -> Result<(), rusqlite::Error> {
for i in 0..20 {
tx.execute("INSERT INTO entry (data) values (?1", &[i])?;
}
Ok(())
}
然后,我们围绕它构建结构:
fn do_tx(mut conn: Connection) -> Result<(), rusqlite::Error> {
for i in 0..20 {
// Open the TX
let mut tx = conn.transaction()?;
do_batch(&mut tx)?;
// Do your error handling here. If the batch fails, you want to decide whether to retry or abort.
tx.commit()?;
}
Ok(())
}
如果可能,分离关注点总是值得的,如果需要,总是值得传递事务;这就是他们的目的。让您的函数构建批处理,然后在某种总体结构中处理提交/回滚行为。
正如您在评论中提到的,您正在走一棵树。为此,我只是假设您已经展平了迭代器(即,您的 N 维树由一维迭代器表示),并且该迭代器位于tree_walker
.
当前没有chunks()
在迭代器上定义方法,这是您需要的。为简洁起见,我们将collect()
使用Vec::chunks()
. 对于大多数工作负载,这应该不是问题,但是如果您发现此分配的大小太大,您可以相对轻松地自己重新实现它。
use rusqlite::Error;
use rusqlite::{Connection, Transaction};
fn do_batch<'a>(tx: &Transaction<'a>, transaction_accum: &[i32]) -> Result<(), rusqlite::Error> {
for i in transaction_accum.iter() {
tx.execute("INSERT INTO entry (data) values (?1)", &[i])?;
}
Ok(())
}
fn commit(
mut conn: Connection,
tree_walker: impl Iterator<Item = i32>,
batch_size: usize,
) -> Result<(), rusqlite::Error> {
let collected: Vec<i32> = tree_walker.collect();
collected
.chunks(batch_size)
.fold(Ok(()), |current, elements| {
current.and_then(|_| {
let tx = conn.transaction()?;
do_batch(&tx, &elements)?;
tx.commit()
})
})
}
推荐阅读
- css - 如何部分填充 SVG 形状中的正空间和负空间?
- c# - 如何在 PostgreSQL 中对一组 ID 进行分层子计数(不是单个 ID)
- vba - 在 MS Access 365 上循环访问 MS 现代图表的 ChartSeriesCollection
- css - 为什么 chrome devtools 显示这个?正文颜色不起作用
- python - 包含字符串的多个 DataFrame 的逐项加权平均值
- python - 从 JSON 写入 CSV,只导入给定的键
- firebase - 如何设置存储规则以便我可以运行 Firebase 存储模拟器?
- postgresql - postgreSQL - unixtime 每小时值的每日平均值
- javascript - Reactjs-我不断收到无法 POST 错误
- python - 如何在 fastapi 中进行部分更新?