Activiti Job Executor problem with async serviceTasks (activiti >= 5.17)

Problem description

Please consider the following diagram:

[Image: process diagram]

MyProcess.bpmn

<?xml version="1.0" encoding="UTF-8"?>
<definitions xmlns="http://www.omg.org/spec/BPMN/20100524/MODEL" xmlns:xsi="http://www.w3.org/2001/XMLSchema-instance" xmlns:xsd="http://www.w3.org/2001/XMLSchema" xmlns:activiti="http://activiti.org/bpmn" xmlns:bpmndi="http://www.omg.org/spec/BPMN/20100524/DI" xmlns:omgdc="http://www.omg.org/spec/DD/20100524/DC" xmlns:omgdi="http://www.omg.org/spec/DD/20100524/DI" typeLanguage="http://www.w3.org/2001/XMLSchema" expressionLanguage="http://www.w3.org/1999/XPath" targetNamespace="http://www.activiti.org/test">
  <process id="myProcess" name="My process" isExecutable="true">
    <startEvent id="startevent1" name="Start"></startEvent>
    <userTask id="evl" name="Evaluation"></userTask>
    <boundaryEvent id="timer_event_autocomplete" name="Timer" attachedToRef="evl" cancelActivity="false">
      <timerEventDefinition>
        <timeDate>PT2S</timeDate>
      </timerEventDefinition>
    </boundaryEvent>
    <serviceTask id="timer_service" name="Timed Autocomplete" activiti:async="true" activiti:class="com.example.service.TimerService"></serviceTask>
    <serviceTask id="store_docs_service" name="Store Documents" activiti:async="true" activiti:class="com.example.service.StoreDocsService"></serviceTask>
    <sequenceFlow id="flow1" sourceRef="startevent1" targetRef="evl"></sequenceFlow>
    <sequenceFlow id="flow2" sourceRef="timer_event_autocomplete" targetRef="timer_service"></sequenceFlow>
    <sequenceFlow id="flow3" sourceRef="evl" targetRef="store_docs_service"></sequenceFlow>
    <sequenceFlow id="flow4" sourceRef="store_docs_service" targetRef="endevent1"></sequenceFlow>
    <endEvent id="endevent1" name="End"></endEvent>
  </process>

</definitions>

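To describe it in words: there is a user task (Evaluation) with a timer attached to it (configured to fire after 2 seconds). When the timer fires, the Timed Autocomplete async service task runs, and its Java delegate, TimerService, tries to complete the user task (Evaluation). Once the user task (Evaluation) is completed, the flow moves to another async service task (Store Documents), which invokes its Java delegate, StoreDocsService, and then the process ends.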
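
The timer value PT2S in the BPMN above is written in ISO-8601 duration notation. As a quick illustration of what that string denotes, here is a minimal sketch using the JDK's java.time.Duration. The class name TimerExpressionDemo and the helper toSeconds are made up for this example and are not part of the sample project; how Activiti itself evaluates the expression is not shown here.

```java
import java.time.Duration;

public class TimerExpressionDemo {

    // Parses an ISO-8601 duration string and returns its length in whole seconds.
    public static long toSeconds(String isoDuration) {
        return Duration.parse(isoDuration).getSeconds();
    }

    public static void main(String[] args) {
        // "PT2S" is the value used by the boundary timer in MyProcess.bpmn.
        System.out.println(toSeconds("PT2S"));    // prints 2
        // A second, unrelated value, only to show the notation: 1 minute 30 seconds.
        System.out.println(toSeconds("PT1M30S")); // prints 90
    }
}
```

This requires Java 8 or later, since java.time is not available in older JDKs.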

TimerService.java

public class TimerService implements JavaDelegate {
    Logger LOGGER = LoggerFactory.getLogger(TimerService.class);

    @Override
    public void execute(DelegateExecution execution) throws Exception {
        LOGGER.info("*** Executing Timer autocomplete ***");
        Task task = execution.getEngineServices().getTaskService().createTaskQuery().active().singleResult();
        execution.getEngineServices().getTaskService().complete(task.getId());
        LOGGER.info("*** Task: {} autocompleted by timer ***", task.getId());
    }
}

StoreDocsService.java

public class StoreDocsService implements JavaDelegate {
    Logger LOGGER = LoggerFactory.getLogger(StoreDocsService.class);

    @Override
    public void execute(DelegateExecution execution) throws Exception {
        LOGGER.info("*** Executing Store Documents ***");
    }
}

App.java

public class App
{
    public static void main( String[] args ) throws Exception
    {

//        DefaultAsyncJobExecutor demoAsyncJobExecutor = new DefaultAsyncJobExecutor();
//        demoAsyncJobExecutor.setCorePoolSize(10);
//        demoAsyncJobExecutor.setMaxPoolSize(50);
//        demoAsyncJobExecutor.setKeepAliveTime(10000);
//        demoAsyncJobExecutor.setMaxAsyncJobsDuePerAcquisition(50);

        ProcessEngineConfiguration cfg = new StandaloneProcessEngineConfiguration()
                .setJdbcUrl("jdbc:h2:mem:activiti;DB_CLOSE_DELAY=1000")
                .setJdbcUsername("sa")
                .setJdbcPassword("")
                .setJdbcDriver("org.h2.Driver")
                .setDatabaseSchemaUpdate(ProcessEngineConfiguration.DB_SCHEMA_UPDATE_TRUE)
//                .setAsyncExecutorActivate(true)
//                .setAsyncExecutorEnabled(true)
//                .setAsyncExecutor(demoAsyncJobExecutor)
                .setJobExecutorActivate(true)
                ;
        ProcessEngine processEngine = cfg.buildProcessEngine();
        String pName = processEngine.getName();
        String ver = ProcessEngine.VERSION;
        System.out.println("ProcessEngine [" + pName + "] Version: [" + ver + "]");

        RepositoryService repositoryService = processEngine.getRepositoryService();
        Deployment deployment = repositoryService.createDeployment()
                .addClasspathResource("MyProcess.bpmn").deploy();
        ProcessDefinition processDefinition = repositoryService.createProcessDefinitionQuery()
                .deploymentId(deployment.getId()).singleResult();
        System.out.println(
                "Found process definition ["
                        + processDefinition.getName() + "] with id ["
                        + processDefinition.getId() + "]");

        final Map<String, Object> variables = new HashMap<String, Object>();
        final RuntimeService runtimeService = processEngine.getRuntimeService();

        ProcessInstance id = runtimeService.startProcessInstanceByKey("myProcess", variables);
        System.out.println("Started Process Id: "+id.getId());
        try {
            final TaskService taskService = processEngine.getTaskService();
//            List<Task> tasks = taskService.createTaskQuery().active().list();
//            while (!tasks.isEmpty()) {
//                Task task = tasks.get(0);
//                taskService.complete(task.getId());
//                tasks = taskService.createTaskQuery().active().list();
//            }

        } catch (Exception e) {
            System.out.println(e.getMessage());
        } finally {

        }

        while(!runtimeService.createExecutionQuery().list().isEmpty()) {
        }
        processEngine.close();
    }


}
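
The empty while loop at the end of App.java spins without pausing and never gives up, so a stuck process hangs the program. As a rough sketch of an alternative, the helper below polls a condition with a sleep between checks and a timeout. BoundedWait and waitUntil are hypothetical names, not Activiti APIs, and the conditions in main are stand-ins so that the example runs without an engine.

```java
import java.util.concurrent.TimeUnit;
import java.util.function.BooleanSupplier;

public class BoundedWait {

    /**
     * Polls the condition until it is true or the timeout elapses.
     * Returns true if the condition became true in time, false otherwise
     * (including when the waiting thread is interrupted).
     */
    public static boolean waitUntil(BooleanSupplier condition, long timeoutMillis, long pollMillis) {
        long deadline = System.nanoTime() + TimeUnit.MILLISECONDS.toNanos(timeoutMillis);
        while (!condition.getAsBoolean()) {
            if (System.nanoTime() - deadline >= 0) {
                return false; // gave up: condition still false at the deadline
            }
            try {
                Thread.sleep(pollMillis); // yield the CPU between checks
            } catch (InterruptedException e) {
                Thread.currentThread().interrupt(); // preserve the interrupt flag
                return false;
            }
        }
        return true;
    }

    public static void main(String[] args) {
        long start = System.currentTimeMillis();
        // Stand-in condition: becomes true roughly 200 ms after start.
        boolean done = waitUntil(() -> System.currentTimeMillis() - start >= 200, 5_000, 50);
        System.out.println("finished in time: " + done);
        // Stand-in condition that never holds, to show the timeout path.
        boolean stuck = waitUntil(() -> false, 300, 50);
        System.out.println("finished in time: " + stuck);
    }
}
```

In App.java the condition would presumably be () -> runtimeService.createExecutionQuery().list().isEmpty(), with a timeout chosen comfortably above the 2-second timer. A false return would then indicate the stuck state described in this question instead of hanging forever. The lambda syntax assumes Java 8 or later.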

Activiti 5.15

When the timer fires, the process executes as depicted in the diagram above. We use Activiti's DefaultJobExecutor, and the logs show:

[main] INFO  org.activiti.engine.impl.ProcessEngineImpl  - ProcessEngine default created
[main] INFO  org.activiti.engine.impl.jobexecutor.JobExecutor  - Starting up the JobExecutor[org.activiti.engine.impl.jobexecutor.DefaultJobExecutor].
[Thread-1] INFO  org.activiti.engine.impl.jobexecutor.AcquireJobsRunnable  - JobExecutor[org.activiti.engine.impl.jobexecutor.DefaultJobExecutor] starting to acquire jobs
ProcessEngine [default] Version: [5.15]
[main] INFO  org.activiti.engine.impl.bpmn.deployer.BpmnDeployer  - Processing resource MyProcess.bpmn
Found process definition [My process] with id [myProcess:1:3]
Started Process Id: 4
[pool-1-thread-1] INFO  com.example.service.TimerService  - *** Executing Timer autocomplete ***
[pool-1-thread-1] INFO  com.example.service.TimerService  - *** Task: 9 autocompleted by timer ***
[pool-1-thread-1] INFO  com.example.service.StoreDocsService  - *** Executing Store Documents ***
[main] INFO  org.activiti.engine.impl.jobexecutor.JobExecutor  - Shutting down the JobExecutor[org.activiti.engine.impl.jobexecutor.DefaultJobExecutor].
[Thread-1] INFO  org.activiti.engine.impl.jobexecutor.AcquireJobsRunnable  - JobExecutor[org.activiti.engine.impl.jobexecutor.DefaultJobExecutor] stopped job acquisition

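Activiti >= 5.17

After changing only the Activiti version in pom.xml to 5.17.0 or later (tested up to 5.22.0) and running the same code, the process executes the timer's Java delegate, TimerService, which completes the user task (Evaluation), but the Store Documents Java delegate, StoreDocsService, is never invoked. What is more, the process never seems to end: the execution stays stuck at the Store Documents async service task.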

Logs:

[main] INFO  org.activiti.engine.impl.ProcessEngineImpl  - ProcessEngine default created
[main] INFO  org.activiti.engine.impl.jobexecutor.JobExecutor  - Starting up the JobExecutor[org.activiti.engine.impl.jobexecutor.DefaultJobExecutor].
[Thread-1] INFO  org.activiti.engine.impl.jobexecutor.AcquireJobsRunnableImpl  - JobExecutor[org.activiti.engine.impl.jobexecutor.DefaultJobExecutor] starting to acquire jobs
ProcessEngine [default] Version: [5.17.0.2]
[main] INFO  org.activiti.engine.impl.bpmn.deployer.BpmnDeployer  - Processing resource MyProcess.bpmn
Found process definition [My process] with id [myProcess:1:3]
Started Process Id: 4
[pool-1-thread-2] INFO  com.example.service.TimerService  - *** Executing Timer autocomplete ***
[pool-1-thread-2] INFO  com.example.service.TimerService  - *** Task: 9 autocompleted by timer ***

Switching to the async job executor, with the configuration in App.java changed to:

        DefaultAsyncJobExecutor demoAsyncJobExecutor = new DefaultAsyncJobExecutor();
        demoAsyncJobExecutor.setCorePoolSize(10);
        demoAsyncJobExecutor.setMaxPoolSize(50);
        demoAsyncJobExecutor.setKeepAliveTime(10000);
        demoAsyncJobExecutor.setMaxAsyncJobsDuePerAcquisition(50);

        ProcessEngineConfiguration cfg = new StandaloneProcessEngineConfiguration()
                .setJdbcUrl("jdbc:h2:mem:activiti;DB_CLOSE_DELAY=1000")
                .setJdbcUsername("sa")
                .setJdbcPassword("")
                .setJdbcDriver("org.h2.Driver")
                .setDatabaseSchemaUpdate(ProcessEngineConfiguration.DB_SCHEMA_UPDATE_TRUE)
                .setAsyncExecutorActivate(true)
                .setAsyncExecutorEnabled(true)
                .setAsyncExecutor(demoAsyncJobExecutor)
                ;

The process appears to execute correctly, since StoreDocsService is invoked after TimerService, but it never ends (the while(!runtimeService.createExecutionQuery().list().isEmpty()) statement in App.java is always true)!

Logs:

[main] INFO  org.activiti.engine.impl.ProcessEngineImpl  - ProcessEngine default created
[main] INFO  org.activiti.engine.impl.asyncexecutor.DefaultAsyncJobExecutor  - Starting up the default async job executor [org.activiti.engine.impl.asyncexecutor.DefaultAsyncJobExecutor].
[main] INFO  org.activiti.engine.impl.asyncexecutor.DefaultAsyncJobExecutor  - Creating thread pool queue of size 100
[main] INFO  org.activiti.engine.impl.asyncexecutor.DefaultAsyncJobExecutor  - Creating executor service with corePoolSize 10, maxPoolSize 50 and keepAliveTime 10000
[Thread-1] INFO  org.activiti.engine.impl.asyncexecutor.AcquireTimerJobsRunnable  - {} starting to acquire async jobs due
[Thread-2] INFO  org.activiti.engine.impl.asyncexecutor.AcquireAsyncJobsDueRunnable  - {} starting to acquire async jobs due
ProcessEngine [default] Version: [5.17.0.2]
[main] INFO  org.activiti.engine.impl.bpmn.deployer.BpmnDeployer  - Processing resource MyProcess.bpmn
Found process definition [My process] with id [myProcess:1:3]
Started Process Id: 4
[pool-1-thread-2] INFO  com.example.service.TimerService  - *** Executing Timer autocomplete ***
[pool-1-thread-2] INFO  com.example.service.TimerService  - *** Task: 9 autocompleted by timer ***
[pool-1-thread-3] INFO  com.example.service.StoreDocsService  - *** Executing Store Documents ***

!!!! UPDATE !!!!

Activiti 6.0.0

Tried the same scenario with Activiti version 6.0.0. A change was needed in TimerService, because EngineServices can no longer be obtained from DelegateExecution:

public class TimerService implements JavaDelegate {
    Logger LOGGER = LoggerFactory.getLogger(TimerService.class);

    @Override
    public void execute(DelegateExecution execution) {
        LOGGER.info("*** Executing Timer autocomplete ***");
        Task task = Context.getProcessEngineConfiguration().getTaskService().createTaskQuery().active().singleResult();
        Context.getProcessEngineConfiguration().getTaskService().complete(task.getId());
//        Task task = execution.getEngineServices().getTaskService().createTaskQuery().active().singleResult();
//        execution.getEngineServices().getTaskService().complete(task.getId());
        LOGGER.info("*** Task: {} autocompleted by timer ***", task.getId());
    }
}

This version has only the async executor, so the ProcessEngineConfiguration in App.java changes to:

        ProcessEngineConfiguration cfg = new StandaloneProcessEngineConfiguration()
                .setJdbcUrl("jdbc:h2:mem:activiti;DB_CLOSE_DELAY=1000")
                .setJdbcUsername("sa")
                .setJdbcPassword("")
                .setJdbcDriver("org.h2.Driver")
                .setDatabaseSchemaUpdate(ProcessEngineConfiguration.DB_SCHEMA_UPDATE_TRUE)
                .setAsyncExecutorActivate(true)
//                .setAsyncExecutorEnabled(true)
//                .setAsyncExecutor(demoAsyncJobExecutor)
//                .setJobExecutorActivate(true)
                ;

With version 6.0.0 and the async executor, the process completes successfully, as the logs show:

[main] INFO  org.activiti.engine.impl.ProcessEngineImpl  - ProcessEngine default created
[main] INFO  org.activiti.engine.impl.asyncexecutor.DefaultAsyncJobExecutor  - Starting up the default async job executor [org.activiti.engine.impl.asyncexecutor.DefaultAsyncJobExecutor].
[main] INFO  org.activiti.engine.impl.asyncexecutor.DefaultAsyncJobExecutor  - Creating thread pool queue of size 100
[main] INFO  org.activiti.engine.impl.asyncexecutor.DefaultAsyncJobExecutor  - Creating executor service with corePoolSize 2, maxPoolSize 10 and keepAliveTime 5000
[Thread-1] INFO  org.activiti.engine.impl.asyncexecutor.AcquireAsyncJobsDueRunnable  - {} starting to acquire async jobs due
[Thread-2] INFO  org.activiti.engine.impl.asyncexecutor.AcquireTimerJobsRunnable  - {} starting to acquire async jobs due
[Thread-3] INFO  org.activiti.engine.impl.asyncexecutor.ResetExpiredJobsRunnable  - {} starting to reset expired jobs
ProcessEngine [default] Version: [6.0.0.4]
Found process definition [My process] with id [myProcess:1:3]
Started Process Id: 4
[activiti-async-job-executor-thread-2] INFO  com.example.service.TimerService  - *** Executing Timer autocomplete ***
[activiti-async-job-executor-thread-2] INFO  com.example.service.TimerService  - *** Task: 10 autocompleted by timer ***
[activiti-async-job-executor-thread-2] INFO  com.example.service.StoreDocsService  - *** Executing Store Documents ***
[main] INFO  org.activiti.engine.impl.asyncexecutor.DefaultAsyncJobExecutor  - Shutting down the default async job executor [org.activiti.engine.impl.asyncexecutor.DefaultAsyncJobExecutor].
[activiti-reset-expired-jobs] INFO  org.activiti.engine.impl.asyncexecutor.ResetExpiredJobsRunnable  - {} stopped resetting expired jobs
[activiti-acquire-timer-jobs] INFO  org.activiti.engine.impl.asyncexecutor.AcquireTimerJobsRunnable  - {} stopped async job due acquisition
[activiti-acquire-async-jobs] INFO  org.activiti.engine.impl.asyncexecutor.AcquireAsyncJobsDueRunnable  - {} stopped async job due acquisition

Process finished with exit code 0

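Two questions:

  1. We have upgraded Activiti from 5.15 to 5.22.0 and we do not use the async job executor. Is there any way to make this diagram keep behaving the way it did in 5.15?
  2. If switching to the async job executor is unavoidable, what are we missing to make this process complete successfully?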

The sample project for the example above can be found at: https://github.com/pleft/DemoActiviti

Tags: java, activiti, bpmn

Solution


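Without explicitly answering your question, which would require setting up an environment and debugging, I would recommend that you at least migrate to Activiti 6. The 5.x branch of Activiti has not been maintained for over 5 years and is effectively dead. Even the 6.x line is all but abandoned, because the core developers have moved to the "Flowable" project. If you choose to stay on Activiti 5.x, your options are:

  1. Maintain the codebase yourself (and hopefully contribute any changes or enhancements back to the project).
  2. Contract Activiti support services. Several vendors offer such services.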
