首页 > 解决方案 > 透明地批量存储

问题描述

我们正在使用以下框架和版本:

我有一个问题,我们的一些业务逻辑被划分为如下所示的逻辑单元:

在代码中,这看起来大致如下:

TransactionRecord transaction = transactionRepository.create();
transaction.create(creationCommand);`

Transaction#create(以事务方式运行)中,会发生以下情况:

storeTransaction();
storePayments();
storeProducts();
// ... other relevant information

给定的交易可以有许多不同类型的产品和属性,所有这些都被存储。其中许多属性会导致UPDATE陈述,而有些可能会导致INSERT陈述 - 很难提前完全了解。

例如,该storeProducts方法大致如下所示:

products.forEach(product -> {
    ProductRecord record = productRepository.findProductByX(...);
    if (record == null) {
        record = productRepository.create();
        record.setX(...);
        record.store();
    } else {
      // do something else
    }
});

如果产品是新的,它们会被INSERT编辑。否则,可能会发生其他计算。根据事务的大小,这个单用户事务显然可能导致O(n)数据库调用/往返,甚至更多取决于存在的其他属性。在存在大量属性的事务中,这可能会导致针对单个请求(!)的数百个数据库调用。我想尽可能地降低它,O(1)以便在我们的数据库上有更多可预测的负载。

自然,这里会想到批量和批量插入/更新。我想做的是使用 将所有这些语句批处理成一个批处理jOOQ,并在提交之前成功调用方法后执行。我发现了几篇(SO PostjOOQ APIjOOQ GitHub Feature Request)帖子隐含地提到了这个主题,还有一个用户组帖子似乎与我的问题明确相关。

由于我Spring与 一起使用jOOQ,我相信我的理想解决方案(最好是声明性的)将如下所示:

@Batched(100) // batch size as parameter, potentially
@Transactional
public void createTransaction(CreationCommand creationCommand) {
    // all inserts/updates above are added to a batch and executed on successful invocation
}

为此,我想我需要管理一个范围(//ThreadLocal范围)资源,该资源可以跟踪当前批次,以便:TransactionalSession

  1. 在进入方法之前,如果方法是 ,则创建一个空批次@Batched
  2. 通过 DI 提供的自定义DSLContext(可能是扩展)具有一个标志,用于跟踪是否应该对任何当前语句进行批处理,如果是的话DefaultDSLContextThreadLocal
  3. 拦截调用并将它们添加到当前批处理中,而不是立即执行它们。

但是,第 3 步将需要从(IMO)相对可读的代码中重写大部分代码:

records.forEach(record -> {
    record.setX(...);
    // ...
    record.store();
}

至:

userObjects.forEach(userObject -> {
    dslContext.insertInto(...).values(userObject.getX(), ...).execute();
}

这将首先破坏具有这种抽象的目的,因为第二种形式也可以使用DSLContext#batchStoreor重写DSLContext#batchInsert。然而,IMO,批处理和批量插入不应该由单个开发人员来决定,并且应该能够在更高级别(例如由框架)透明地处理。

我发现jOOQAPI 的可读性是使用它的一个惊人的好处,但是它似乎不适合(据我所知)在诸如此类的情况下很好地拦截/扩展。jOOQ 3.11.1使用(甚至当前的)API,是否有可能通过透明的批处理/批量处理获得类似于前者的行为?这会带来什么?


编辑:

为了实现商店的透明批处理,想到的一种可能但非常hacky的解决方案如下:

  1. 创建一个RecordListener并将其作为默认值添加到Configuration启用批处理时。
  2. RecordListener#storeStart中,将查询添加到当前事务的批次中(例如在 a 中ThreadLocal<List>
  3. AbstractRecord一个changed标志,在存储之前检查 ( org.jooq.impl.UpdatableRecordImpl#store0, )。org.jooq.impl.TableRecordImpl#addChangedValues重置它(并将其保存以备后用)使存储操作成为空操作。
  4. 最后,在成功的方法调用但在提交之前:

据我所知,这种方法理论上应该可行。显然,如果代码依赖反射来工作,库内部可能随时发生变化,因此它非常hacky并且容易被破坏。

有谁知道更好的方法,只使用公共jOOQAPI?

标签: springspring-bootspring-jdbcjooq

解决方案


jOOQ 3.14 解决方案

您已经发现了相关的功能请求 #3419,它将从 jOOQ 3.14 开始在 JDBC 级别上解决这个问题。您可以BatchedConnection直接使用包装自己的连接来实现以下内容,也可以使用此 API:

ctx.batched(c -> {

    // Make sure all records are attached to c, not ctx, e.g. by fetching from c.dsl()
    records.forEach(record -> {
        record.setX(...);
        // ...
        record.store();
    }
});

jOOQ 3.13 及之前的解决方案

目前,在实现 #3419 之前(在 jOOQ 3.14 中将是),您可以自己实现它作为一种解决方法。您必须代理 JDBCConnection并且PreparedStatement...

...拦截所有:

  • 调用Connection.prepareStatement(String),如果 SQL 字符串与最后一个准备好的语句相同,则返回一个缓存的代理语句,或者批量执行最后一个准备好的语句并创建一个新的。
  • 调用PreparedStatement.executeUpdate()and execute(),并用调用替换那些PreparedStatement.addBatch()

...代表所有:

  • 调用其他 API,例如Connection.createStatement(),应该刷新上述缓冲批次,然后调用委托 API。

我不建议围绕 jOOQRecordListener和其他 SPI 进行破解,我认为这是缓冲数据库交互的错误抽象级别。此外,您还需要批处理其他语句类型。

请注意,默认情况下,jOOQ 会UpdatableRecord尝试获取生成的标识值(请参阅参考资料Settings.returnIdentityOnUpdatableRecord),这会阻止批处理。此类store()调用必须立即执行,因为您可能希望标识值可用。


推荐阅读