首页 > 解决方案 > 如何使用多线程持久化 Spring Transactions 和 EntityManager 实体?

问题描述

我在对一些简单的应用程序进行多线程测试时遇到了问题。应用程序是用 spring-boot 编写的。我想征求意见。在你阅读我的代码之前——我很抱歉它的质量,这可能有点像意大利面条代码。我正在测试很多选项...

我有一个服务类,它的唯一工作是在实体存在时更新行数,或者使用递增的行数创建新的。与正常情况的唯一区别是 primaryKey 是一个字符串并且不是自动生成的。此外,主要规则是不要松开 updateCount 的单次执行(因此没有回滚)。

@Service
public class SomeEntityServiceRepository {

    @PersistenceContext
    private EntityManager entityManager;

    @Transactional(propagation = Propagation.REQUIRES_NEW, isolation = Isolation.REPEATABLE_READ)
    public synchronized SomeEntity updateCount(String primaryKeyString) {

        SomeEntity entity = entityManager.find(SomeEntity.class, primaryKeyString, LockModeType.PESSIMISTIC_WRITE);

        if (entity == null) {
            try {
                entity = createNewEntity(login);
                return entity;
            } catch (Exception e){
                entityManager.refresh(entity);
                entity.incrementCount();
                entityManager.persist(entity);
                entityManager.flush();
                return entity;
            }
        }

        entityManager.refresh(entity);
        entity.incrementCount();
        entityManager.persist(entity);
        entityManager.flush();
        return entity;
    }

    @Transactional(propagation = Propagation.REQUIRES_NEW, isolation = Isolation.REPEATABLE_READ)
    synchronized SomeEntity createNewEntity(String primaryKeyString) {
        SomeEntity entity = new SomeEntity(primaryKeyString);
        entity.incrementCount();
        entityManager.persist(entity);
        entityManager.flush();
        return entity;
    }
}

我的实体类是:

@Entity
public class SomeEntity {

    @Id
    private String primaryKeyString;

    @Column(name = "COUNT", nullable = false)
    private long count;

    public void incrementCount(){
        this.count++;
    }

    //rest of the code

我的测试用例是(THREADS_COUNT 设置为 100,服务存储库是自动装配的):

@Test
    void testPessimisticLock() throws InterruptedException {
        ExecutorService service = Executors.newFixedThreadPool(THREADS_COUNT + 10); 
        CountDownLatch latch = new CountDownLatch(THREADS_COUNT);
        SomeEntity entity = null;

        for (int i = 1; i <= THREADS_COUNT; i++) {
            service.submit(() -> {
                try {
                    serviceRepository.updateCount("SOMETHING");
                } catch (Exception e) {
                    e.printStackTrace(); 
                }
                latch.countDown();
            });
        }
        latch.await();

        entity = serviceRepository.updateCount("SOMETHING"); // one additional execute for return
        Assertions.assertEquals(THREADS_COUNT + 1, entity.getCount());

    }

它在 90% 的时间里都能正常工作。同步效果很好,在大多数情况下事务管理也是如此。但是当由两个线程管理的两个事务想要创建新实体时 - 出现错误,因为第一个线程无法足够快地提交到数据库,第二个启动新事务并且 entityManager.find() 方法发现 null。

所以错误日志就像:

  1. “唯一索引或主键违规:”
  2. java.lang.IllegalArgumentException:尝试使用空对象生成刷新事件
  3. org.opentest4j.AssertionFailedError:预期:<101> 但原为:<100>

我的 entityManager 是基于上下文的,事务由容器管理。

我不知道如何防止来自两个不同线程的两个事务在彼此之后完全执行。据我了解,METHOD 是同步的,但事务可能比方法本身长一点,因为它由外部容器方法和进程管理,因此 DBMS 中的实际状态在刷新后不是即时的。

我很高兴得到任何帮助,因为这对我的技能来说有点复杂。我正在使用 Apache Derby Embedded(文件)数据库。

标签: javaspringmultithreadinghibernatetransactions

解决方案


测试在单个事务中完成,最后回滚。所以你需要这样的东西:

@RunWith(SpringRunner.class)
@ContextConfiguration(...)
@DataJpaTest
@DirtiesContext(classMode = DirtiesContext.ClassMode.BEFORE_EACH_TEST_METHOD)
public class ConcurrentTest {

    ....

    @Autowired
    PlatformTransactionManager transactionManager;

    @Before
    public void setup() {
        final TransactionTemplate template = new TransactionTemplate(transactionManager);
        template.setPropagationBehavior(TransactionDefinition.PROPAGATION_REQUIRES_NEW);

        template.executeWithoutResult(transactionStatus -> {
            // init data
        });
    }

    @Test
    public void testConcurrent() throws Exception {
        System.out.println("testConcurrent");

        final int COUNT = 10;
        ExecutorService executorService = Executors.newFixedThreadPool(COUNT);
        List<Callable<Void>> tasks = new ArrayList<>();

        for (int i = 0; i < COUNT; i++) {
            tasks.add(() -> {
                final TransactionTemplate template = new TransactionTemplate(transactionManager);
                template.setPropagationBehavior(TransactionDefinition.PROPAGATION_REQUIRES_NEW);

                try {
                    // code 
                } catch (Exception exc) {
                    exc.printStackTrace();
                    throw exc;
                }


                return null;
            });
        }

        executorService.invokeAll(tasks);
    }

}

推荐阅读