python - 有没有办法使用 FORALL 从数组中插入数据?
问题描述
我正在运行 oracle 19c,我想获得最好的插入性能。目前,我插入使用INSERT /*+APPEND */ ...
which 很好,但不是我想要的速度。
我读到使用FORALL
速度要快得多,但我真的找不到任何例子。这是代码片段(python 3):
connection = pool.acquire()
cursor = connection.cursor()
cursor.executemany("INSERT /*+APPEND*/ INTO RANDOM VALUES (:1, :2, :3)", list(random))
connection.commit()
cursor.close()
connection.close()
解决方案
我真的对更快的东西很感兴趣,所以我测试了一些可能的方法来比较它们:
- 简单
executemany
,没有技巧。 APPEND_VALUES
与语句中的提示相同。union all
您在另一个问题中尝试过的方法。这应该比上面的要慢,因为它会生成一个非常大的语句(这可能需要比数据本身更多的网络)。然后应该在数据库端解析它,这也会消耗大量时间并忽略所有好处(不是谈论潜在的大小限制)。然后我executemany
用块来测试它,而不是为 100k 记录构建单个语句。我没有在语句中使用值连接,因为想保证它的安全。insert all
. 同样的缺点,但没有工会。将其与union
版本进行比较。- 序列化 JSON 中的数据,并在 DB 端使用
json_table
. 单个简短语句和单个数据传输具有潜在的良好性能,JSON 开销很小。 - 您
FORALL
在 PL/SQL 包装程序中提出的建议。应该和executemany
since一样,但是在数据库端。将数据转换为集合的开销。 - 相同
FORALL
,但使用列式方法来传递数据:传递简单的列值列表而不是复杂类型。应该比FORALL
集合快得多,因为不需要将数据序列化为集合的类型。
我在 Oracle Cloud 中使用免费帐户的 Oracle 自治数据库。每个方法循环执行 10 次,使用相同的 100k 记录输入数据集,在每次测试之前重新创建表。这是我得到的结果。这里的准备和执行时间分别是客户端数据库调用本身的数据转换。
>>> t = PerfTest(100000)
>>> t.run("exec_many", 10)
Method: exec_many.
Duration, avg: 2.3083874 s
Preparation time, avg: 0.0 s
Execution time, avg: 2.3083874 s
>>> t.run("exec_many_append", 10)
Method: exec_many_append.
Duration, avg: 2.6031369 s
Preparation time, avg: 0.0 s
Execution time, avg: 2.6031369 s
>>> t.run("union_all", 10, 10000)
Method: union_all.
Duration, avg: 27.9444233 s
Preparation time, avg: 0.0408773 s
Execution time, avg: 27.8457551 s
>>> t.run("insert_all", 10, 10000)
Method: insert_all.
Duration, avg: 70.6442494 s
Preparation time, avg: 0.0289269 s
Execution time, avg: 70.5541995 s
>>> t.run("json_table", 10)
Method: json_table.
Duration, avg: 10.4648237 s
Preparation time, avg: 9.7907693 s
Execution time, avg: 0.621006 s
>>> t.run("forall", 10)
Method: forall.
Duration, avg: 5.5622837 s
Preparation time, avg: 1.8972456000000002 s
Execution time, avg: 3.6650380999999994 s
>>> t.run("forall_columnar", 10)
Method: forall_columnar.
Duration, avg: 2.6702698000000002 s
Preparation time, avg: 0.055710800000000005 s
Execution time, avg: 2.6105702 s
>>>
最快的方法就是executemany
,没有那么多惊喜。有趣的是,这APPEND_VALUES
并没有改善查询并且平均获得更多时间,因此需要更多调查。
关于FORALL
:正如预期的那样,每列的单个数组花费的时间更少,因为没有数据准备。它或多或少与 可比executemany
,但我认为 PL/SQL 开销在这里起了一些作用。
对我来说另一个有趣的部分是 JSON:大部分时间都花在将 LOB 写入数据库和序列化上,但查询本身非常快。也许可以使用 chuncsize 或其他方式将 LOB 数据传递到 select 语句中以某种方式改进写入操作,但就我的代码而言,它远非非常简单直接的executemany
.
也有可能没有 Python 的方法作为外部数据的本机工具应该更快,但我没有测试它们:
下面是我用于测试的代码。
import cx_Oracle as db
import os, random, json
import datetime as dt
class PerfTest:
def __init__(self, size):
self._con = db.connect(
os.environ["ora_cloud_usr"],
os.environ["ora_cloud_pwd"],
"test_low",
encoding="UTF-8"
)
self._cur = self._con.cursor()
self.inp = [(i, "Test {i}".format(i=i), random.random()) for i in range(size)]
def __del__(self):
if self._con:
self._con.rollback()
self._con.close()
#Create objets
def setup(self):
try:
self._cur.execute("drop table rand")
#print("table dropped")
except:
pass
self._cur.execute("""create table rand(
id int,
str varchar2(100),
val number
)""")
self._cur.execute("""create or replace package pkg_test as
type ts_test is record (
id rand.id%type,
str rand.str%type,
val rand.val%type
);
type tt_test is table of ts_test index by pls_integer;
type tt_ids is table of rand.id%type index by pls_integer;
type tt_strs is table of rand.str%type index by pls_integer;
type tt_vals is table of rand.val%type index by pls_integer;
procedure write_data(p_data in tt_test);
procedure write_data_columnar(
p_ids in tt_ids,
p_strs in tt_strs,
p_vals in tt_vals
);
end;""")
self._cur.execute("""create or replace package body pkg_test as
procedure write_data(p_data in tt_test)
as
begin
forall i in indices of p_data
insert into rand(id, str, val)
values (p_data(i).id, p_data(i).str, p_data(i).val)
;
commit;
end;
procedure write_data_columnar(
p_ids in tt_ids,
p_strs in tt_strs,
p_vals in tt_vals
) as
begin
forall i in indices of p_ids
insert into rand(id, str, val)
values (p_ids(i), p_strs(i), p_vals(i))
;
commit;
end;
end;
""")
def build_union(self, size):
return """insert into rand(id, str, val)
select id, str, val from rand where 1 = 0 union all
""" + """ union all """.join(
["select :{}, :{}, :{} from dual".format(i*3+1, i*3+2, i*3+3)
for i in range(size)]
)
def build_insert_all(self, size):
return """
""".join(
["into rand(id, str, val) values (:{}, :{}, :{})".format(i*3+1, i*3+2, i*3+3)
for i in range(size)]
)
#Test case with executemany
def exec_many(self):
start = dt.datetime.now()
self._cur.executemany("insert into rand(id, str, val) values (:1, :2, :3)", self.inp)
self._con.commit()
return (dt.timedelta(0), dt.datetime.now() - start)
#The same as above but with prepared statement (no parsing)
def exec_many_append(self):
start = dt.datetime.now()
self._cur.executemany("insert /*+APPEND_VALUES*/ into rand(id, str, val) values (:1, :2, :3)", self.inp)
self._con.commit()
return (dt.timedelta(0), dt.datetime.now() - start)
#Union All approach (chunked). Should have large parse time
def union_all(self, size):
##Chunked list of big tuples
start_prepare = dt.datetime.now()
new_inp = [
tuple([item for t in r for item in t])
for r in list(zip(*[iter(self.inp)]*size))
]
new_stmt = self.build_union(size)
dur_prepare = dt.datetime.now() - start_prepare
#Execute unions
start_exec = dt.datetime.now()
self._cur.executemany(new_stmt, new_inp)
dur_exec = dt.datetime.now() - start_exec
##In case the size is not a divisor
remainder = len(self.inp) % size
if remainder > 0 :
start_prepare = dt.datetime.now()
new_stmt = self.build_union(remainder)
new_inp = tuple([
item for t in self.inp[-remainder:] for item in t
])
dur_prepare += dt.datetime.now() - start_prepare
start_exec = dt.datetime.now()
self._cur.execute(new_stmt, new_inp)
dur_exec += dt.datetime.now() - start_exec
self._con.commit()
return (dur_prepare, dur_exec)
#The same as union all, but with no need to union something
def insert_all(self, size):
##Chunked list of big tuples
start_prepare = dt.datetime.now()
new_inp = [
tuple([item for t in r for item in t])
for r in list(zip(*[iter(self.inp)]*size))
]
new_stmt = """insert all
{}
select * from dual"""
dur_prepare = dt.datetime.now() - start_prepare
#Execute
start_exec = dt.datetime.now()
self._cur.executemany(
new_stmt.format(self.build_insert_all(size)),
new_inp
)
dur_exec = dt.datetime.now() - start_exec
##In case the size is not a divisor
remainder = len(self.inp) % size
if remainder > 0 :
start_prepare = dt.datetime.now()
new_inp = tuple([
item for t in self.inp[-remainder:] for item in t
])
dur_prepare += dt.datetime.now() - start_prepare
start_exec = dt.datetime.now()
self._cur.execute(
new_stmt.format(self.build_insert_all(remainder)),
new_inp
)
dur_exec += dt.datetime.now() - start_exec
self._con.commit()
return (dur_prepare, dur_exec)
#Serialize at server side and do deserialization at DB side
def json_table(self):
start_prepare = dt.datetime.now()
new_inp = json.dumps([
{ "id":t[0], "str":t[1], "val":t[2]} for t in self.inp
])
lob_var = self._con.createlob(db.DB_TYPE_CLOB)
lob_var.write(new_inp)
start_exec = dt.datetime.now()
self._cur.execute("""
insert into rand(id, str, val)
select id, str, val
from json_table(
to_clob(:json), '$[*]'
columns
id int,
str varchar2(100),
val number
)
""", json=lob_var)
dur_exec = dt.datetime.now() - start_exec
self._con.commit()
return (start_exec - start_prepare, dur_exec)
#PL/SQL with FORALL
def forall(self):
start_prepare = dt.datetime.now()
collection_type = self._con.gettype("PKG_TEST.TT_TEST")
record_type = self._con.gettype("PKG_TEST.TS_TEST")
def recBuilder(x):
rec = record_type.newobject()
rec.ID = x[0]
rec.STR = x[1]
rec.VAL = x[2]
return rec
inp_collection = collection_type.newobject([
recBuilder(i) for i in self.inp
])
start_exec = dt.datetime.now()
self._cur.callproc("pkg_test.write_data", [inp_collection])
dur_exec = dt.datetime.now() - start_exec
return (start_exec - start_prepare, dur_exec)
#PL/SQL with FORALL and plain collections
def forall_columnar(self):
start_prepare = dt.datetime.now()
ids, strs, vals = map(list, zip(*self.inp))
start_exec = dt.datetime.now()
self._cur.callproc("pkg_test.write_data_columnar", [ids, strs, vals])
dur_exec = dt.datetime.now() - start_exec
return (start_exec - start_prepare, dur_exec)
#Run test
def run(self, method, iterations, *args):
#Cleanup schema
self.setup()
start = dt.datetime.now()
runtime = []
for i in range(iterations):
single_run = getattr(self, method)(*args)
runtime.append(single_run)
dur = dt.datetime.now() - start
dur_prep_total = sum([i.total_seconds() for i, _ in runtime])
dur_exec_total = sum([i.total_seconds() for _, i in runtime])
print("""Method: {meth}.
Duration, avg: {run_dur} s
Preparation time, avg: {prep} s
Execution time, avg: {ex} s""".format(
inp_s=len(self.inp),
meth=method,
run_dur=dur.total_seconds() / iterations,
prep=dur_prep_total / iterations,
ex=dur_exec_total / iterations
))
推荐阅读
- sql-server - 如何将数据库从 SQL Server 导入 Azure db?
- visual-studio - iPhone 模拟器未在 Visual Studio for Mac 中以调试模式显示。Xamarin.IOS
- android - 如何更改导航抽屉项目的颜色
- python - 来自多个列表(两个以上)的 Python 总和值
- php - laravel - 更新数据库
- git - 在 git 中推送提交时,src refspec master 不匹配任何内容(带有提交)
- python - -Python- 根据匹配名称,使用 Glob 或 Shutil 将文件夹中的所有 PDF 文件移动到 NewDirectory
- java - 如何使用 javaParser 在 java 文件中搜索?
- testing - Testcafe 不保存 cookie,导致 JWT 格式错误
- android - 在活动和服务中注册接收者的缺点?