Is there a way to insert data from an array using FORALL?

Problem description

I'm running Oracle 19c and I want to get the best insert performance. Currently I insert using INSERT /*+APPEND*/ ..., which is fine, but not the speed I was hoping for.
I've read that using FORALL is much faster, but I couldn't really find any examples. Here is the code snippet (Python 3):

connection = pool.acquire()
cursor = connection.cursor()
cursor.executemany("INSERT /*+APPEND*/ INTO RANDOM VALUES (:1, :2, :3)", list(random))
connection.commit()
cursor.close()
connection.close()

Tags: python, oracle, plsql

Solution


I was really interested in whether something faster exists, so I tested a few possible approaches to compare them:

  • Plain executemany, with no tricks.
  • The same, but with the APPEND_VALUES hint inside the statement.
  • The union all approach you tried in another question. This should be slower than the above, because it generates a very large statement (which may need more network traffic than the data itself). It then has to be parsed on the database side, which also consumes a lot of time and negates all the benefits (not to mention the potential size limit). I then ran it with executemany in chunks instead of building a single statement for 100k records. I did not concatenate values inside the statement, because I wanted to keep it safe.
  • insert all. The same downsides, but without the unions. Compare it with the union version.
  • Serializing the data as JSON and deserializing it on the DB side with json_table. Potentially good performance, since it is a single short statement and a single data transfer with little JSON overhead.
  • The FORALL approach you asked about, in a PL/SQL wrapper procedure (a condensed example follows this list). It should behave like executemany, since it does the same thing but on the database side, plus the overhead of transforming the data into a collection.
  • The same FORALL, but with a columnar approach to passing the data: plain lists of column values instead of a complex type. This should be much faster than FORALL with a collection, since there is no need to serialize the data into the collection's type.
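
Since the question asked for a FORALL example, here is the columnar variant boiled down to its essentials, distilled from the full test code at the end of this answer. It assumes the rand table and the pkg_test package (created in the setup() method below) already exist:

import cx_Oracle as db
import os

#Connect the same way as in the test code below
con = db.connect(
  os.environ["ora_cloud_usr"],
  os.environ["ora_cloud_pwd"],
  "test_low"
)
cur = con.cursor()

rows = [(i, "Test {i}".format(i=i), i / 7.0) for i in range(10)]
#One plain Python list per column
ids, strs, vals = map(list, zip(*rows))

#cx_Oracle binds each list to the matching PL/SQL index-by table
#parameter; the procedure runs a single FORALL ... INSERT (and
#commits) on the database side
cur.callproc("pkg_test.write_data_columnar", [ids, strs, vals])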

I used an Oracle Autonomous Database in Oracle Cloud with a free account. Each method was executed in a loop 10 times with the same input dataset of 100k records, and the table was recreated before each test. These are the results I got. Preparation and execution times here are the client-side data transformation and the database call itself, respectively.

>>> t = PerfTest(100000)
>>> t.run("exec_many", 10)
Method:  exec_many.
    Duration, avg: 2.3083874 s
    Preparation time, avg: 0.0 s
    Execution time, avg: 2.3083874 s
>>> t.run("exec_many_append", 10)
Method: exec_many_append.
    Duration, avg: 2.6031369 s
    Preparation time, avg: 0.0 s
    Execution time, avg: 2.6031369 s
>>> t.run("union_all", 10, 10000)
Method:  union_all.
    Duration, avg: 27.9444233 s
    Preparation time, avg: 0.0408773 s
    Execution time, avg: 27.8457551 s
>>> t.run("insert_all", 10, 10000)
Method: insert_all.
    Duration, avg: 70.6442494 s
    Preparation time, avg: 0.0289269 s
    Execution time, avg: 70.5541995 s
>>> t.run("json_table", 10)
Method: json_table.
    Duration, avg: 10.4648237 s
    Preparation time, avg: 9.7907693 s
    Execution time, avg: 0.621006 s
>>> t.run("forall", 10)
Method:     forall.
    Duration, avg: 5.5622837 s
    Preparation time, avg: 1.8972456000000002 s
    Execution time, avg: 3.6650380999999994 s
>>> t.run("forall_columnar", 10)
Method: forall_columnar.
    Duration, avg: 2.6702698000000002 s
    Preparation time, avg: 0.055710800000000005 s
    Execution time, avg: 2.6105702 s
>>> 

The fastest method is plain executemany, which is not much of a surprise. What is interesting is that APPEND_VALUES did not improve the query and on average took more time, so it needs more investigation.
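
If you want to dig into that yourself, one possible starting point (not something I tested) is to compare the cursor statistics of the two insert statements in v$sql. A sketch, assuming the connecting user can query v$sql, which may be restricted on a free-tier Autonomous Database:

import cx_Oracle as db
import os

con = db.connect(
  os.environ["ora_cloud_usr"],
  os.environ["ora_cloud_pwd"],
  "test_low"
)
cur = con.cursor()

#Compare parse and execution statistics of the two INSERT variants
cur.execute("""
  select sql_text, parse_calls, executions,
    elapsed_time / 1e6 as elapsed_s,
    cpu_time / 1e6 as cpu_s
  from v$sql
  where lower(sql_text) like 'insert%into rand%'
""")
for sql_text, parses, execs, elapsed_s, cpu_s in cur:
  print(sql_text[:50], parses, execs, elapsed_s, cpu_s)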

About FORALL: as expected, a separate array per column takes less time, since there is no data preparation. It is more or less comparable with executemany, but I think the PL/SQL overhead plays some role here.

Another part that is interesting to me is JSON: most of the time was spent writing the LOB into the database and on serialization, but the query itself was very fast. Maybe the write operation can be improved with a chunk size or some other way of passing LOB data into the select statement, but as far as my code is concerned, it is far from the very simple and straightforward executemany.
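
One variation I did not test: instead of creating a temporary LOB and writing the JSON into it, let the driver bind the string as a CLOB directly via setinputsizes. Whether this actually reduces the LOB-write overhead measured above is an open question. A sketch, written as a drop-in body for the json_table method in the test code below:

#Untested variant of json_table: bind the JSON string as a CLOB
#directly instead of creating and writing a temporary LOB
new_inp = json.dumps([
  { "id":t[0], "str":t[1], "val":t[2]} for t in self.inp
])

self._cur.setinputsizes(json=db.DB_TYPE_CLOB)
self._cur.execute("""
insert into rand(id, str, val)
select id, str, val
from json_table(
  :json, '$[*]'
  columns
    id int,
    str varchar2(100),
    val number
)
""", json=new_inp)
self._con.commit()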

There are also approaches without Python that, being native tools for external data, should be faster, but I did not test them (for example, SQL*Loader or external tables).

Below is the code I used for testing.

import cx_Oracle as db
import os, random, json
import datetime as dt


class PerfTest:
  
  def __init__(self, size):
    self._con = db.connect(
      os.environ["ora_cloud_usr"],
      os.environ["ora_cloud_pwd"],
      "test_low",
      encoding="UTF-8"
    )
    self._cur = self._con.cursor()
    self.inp = [(i, "Test {i}".format(i=i), random.random()) for i in range(size)]
  
  def __del__(self):
    if self._con:
      self._con.rollback()
      self._con.close()
 
#Create objects
  def setup(self):
    try:
      self._cur.execute("drop table rand")
      #print("table dropped")
    except db.DatabaseError:
      #the table may not exist yet on the first run
      pass
  
    self._cur.execute("""create table rand(
      id int,
      str varchar2(100),
      val number
    )""")
    
    self._cur.execute("""create or replace package pkg_test as
  type ts_test is record (
    id rand.id%type,
    str rand.str%type,
    val rand.val%type
  );
  type tt_test is table of ts_test index by pls_integer;
  
  type tt_ids is table of rand.id%type index by pls_integer;
  type tt_strs is table of rand.str%type index by pls_integer;
  type tt_vals is table of rand.val%type index by pls_integer;
  
  procedure write_data(p_data in tt_test);
  procedure write_data_columnar(
    p_ids in tt_ids,
    p_strs in tt_strs,
    p_vals in tt_vals
  );

end;""")
    self._cur.execute("""create or replace package body pkg_test as
  procedure write_data(p_data in tt_test)
  as
  begin
    forall i in indices of p_data
      insert into rand(id, str, val)
      values (p_data(i).id, p_data(i).str, p_data(i).val)
    ;
    
    commit;

  end;
  
  procedure write_data_columnar(
    p_ids in tt_ids,
    p_strs in tt_strs,
    p_vals in tt_vals
  ) as
  begin
    forall i in indices of p_ids
      insert into rand(id, str, val)
      values (p_ids(i), p_strs(i), p_vals(i))
    ;
    
    commit;
    
  end;

end;
""")

 
  def build_union(self, size):
      return """insert into rand(id, str, val)
    select id, str, val from rand where 1 = 0 union all
    """ + """ union all """.join(
      ["select :{}, :{}, :{} from dual".format(i*3+1, i*3+2, i*3+3)
        for i in range(size)]
    )
 
 
  def build_insert_all(self, size):
      return """
      """.join(
      ["into rand(id, str, val) values (:{}, :{}, :{})".format(i*3+1, i*3+2, i*3+3)
        for i in range(size)]
    )


#Test case with executemany
  def exec_many(self):
    start = dt.datetime.now()
    self._cur.executemany("insert into rand(id, str, val) values (:1, :2, :3)", self.inp)
    self._con.commit()
    
    return (dt.timedelta(0), dt.datetime.now() - start)
 
 
#The same as above, but with the APPEND_VALUES hint
  def exec_many_append(self):
    start = dt.datetime.now()
    self._cur.executemany("insert /*+APPEND_VALUES*/ into rand(id, str, val) values (:1, :2, :3)", self.inp)
    self._con.commit()
    
    return (dt.timedelta(0), dt.datetime.now() - start)


#Union All approach (chunked). Should have large parse time
  def union_all(self, size):
##Chunked list of big tuples
    start_prepare = dt.datetime.now()
    new_inp = [
      tuple([item for t in r for item in t])
      for r in list(zip(*[iter(self.inp)]*size))
    ]
    new_stmt = self.build_union(size)
    
    dur_prepare = dt.datetime.now() - start_prepare
    
    #Execute unions
    start_exec = dt.datetime.now()
    self._cur.executemany(new_stmt, new_inp)
    dur_exec = dt.datetime.now() - start_exec

##Handle the remainder if size does not evenly divide the input
    remainder = len(self.inp) % size
    if remainder > 0 :
      start_prepare = dt.datetime.now()
      new_stmt = self.build_union(remainder)
      new_inp = tuple([
        item for t in self.inp[-remainder:] for item in t
      ])
      dur_prepare += dt.datetime.now() - start_prepare
      
      start_exec = dt.datetime.now()
      self._cur.execute(new_stmt, new_inp)
      dur_exec += dt.datetime.now() - start_exec

    self._con.commit()
    
    return (dur_prepare, dur_exec)


#The same as union all, but with no need to union something
  def insert_all(self, size):
##Chunked list of big tuples
    start_prepare = dt.datetime.now()
    new_inp = [
      tuple([item for t in r for item in t])
      for r in list(zip(*[iter(self.inp)]*size))
    ]
    new_stmt = """insert all
    {}
    select * from dual"""
    dur_prepare = dt.datetime.now() - start_prepare
    
    #Execute
    start_exec = dt.datetime.now()
    self._cur.executemany(
      new_stmt.format(self.build_insert_all(size)),
      new_inp
    )
    dur_exec = dt.datetime.now() - start_exec

##Handle the remainder if size does not evenly divide the input
    remainder = len(self.inp) % size
    if remainder > 0 :
      start_prepare = dt.datetime.now()
      new_inp = tuple([
        item for t in self.inp[-remainder:] for item in t
      ])
      dur_prepare += dt.datetime.now() - start_prepare
      
      start_exec = dt.datetime.now()
      self._cur.execute(
        new_stmt.format(self.build_insert_all(remainder)),
        new_inp
      )
      dur_exec += dt.datetime.now() - start_exec

    self._con.commit()
    
    return (dur_prepare, dur_exec)

    
#Serialize at client side and deserialize at DB side with json_table
  def json_table(self):
    start_prepare = dt.datetime.now()
    new_inp = json.dumps([
      { "id":t[0], "str":t[1], "val":t[2]} for t in self.inp
    ])
    
    lob_var = self._con.createlob(db.DB_TYPE_CLOB)
    lob_var.write(new_inp)
    
    start_exec = dt.datetime.now()
    self._cur.execute("""
    insert into rand(id, str, val)
    select id, str, val
    from json_table(
      to_clob(:json), '$[*]'
      columns
        id int,
        str varchar2(100),
        val number
    )
    """, json=lob_var)
    dur_exec = dt.datetime.now() - start_exec
    
    self._con.commit()
    
    return (start_exec - start_prepare, dur_exec)


#PL/SQL with FORALL
  def forall(self):
    start_prepare = dt.datetime.now()
    collection_type = self._con.gettype("PKG_TEST.TT_TEST")
    record_type = self._con.gettype("PKG_TEST.TS_TEST")
    
    def recBuilder(x):
      rec = record_type.newobject()
      rec.ID = x[0]
      rec.STR = x[1]
      rec.VAL = x[2]
      
      return rec

    inp_collection = collection_type.newobject([
      recBuilder(i) for i in self.inp
    ])
    
    start_exec = dt.datetime.now()
    self._cur.callproc("pkg_test.write_data", [inp_collection])
    dur_exec = dt.datetime.now() - start_exec
    
    return (start_exec - start_prepare, dur_exec)


#PL/SQL with FORALL and plain collections
  def forall_columnar(self):
    start_prepare = dt.datetime.now()
    ids, strs, vals = map(list, zip(*self.inp))
    start_exec = dt.datetime.now()
    self._cur.callproc("pkg_test.write_data_columnar", [ids, strs, vals])
    dur_exec = dt.datetime.now() - start_exec
    
    return (start_exec - start_prepare, dur_exec)

  
#Run test
  def run(self, method, iterations, *args):
    #Cleanup schema
    self.setup()

    start = dt.datetime.now()
    runtime = []
    for i in range(iterations):
      single_run = getattr(self, method)(*args)
      runtime.append(single_run)
    
    dur = dt.datetime.now() - start
    dur_prep_total = sum([i.total_seconds() for i, _ in runtime])
    dur_exec_total = sum([i.total_seconds() for _, i in runtime])
    
    print("""Method: {meth}.
    Duration, avg: {run_dur} s
    Preparation time, avg: {prep} s
    Execution time, avg: {ex} s""".format(
      meth=method,
      run_dur=dur.total_seconds() / iterations,
      prep=dur_prep_total / iterations,
      ex=dur_exec_total / iterations
    ))

