java - Activiti Job Executor problem with async serviceTasks (activiti >= 5.17)
Problem description
Please consider the following diagram
MyProcess.bpmn
<?xml version="1.0" encoding="UTF-8"?>
<definitions xmlns="http://www.omg.org/spec/BPMN/20100524/MODEL" xmlns:xsi="http://www.w3.org/2001/XMLSchema-instance" xmlns:xsd="http://www.w3.org/2001/XMLSchema" xmlns:activiti="http://activiti.org/bpmn" xmlns:bpmndi="http://www.omg.org/spec/BPMN/20100524/DI" xmlns:omgdc="http://www.omg.org/spec/DD/20100524/DC" xmlns:omgdi="http://www.omg.org/spec/DD/20100524/DI" typeLanguage="http://www.w3.org/2001/XMLSchema" expressionLanguage="http://www.w3.org/1999/XPath" targetNamespace="http://www.activiti.org/test">
  <process id="myProcess" name="My process" isExecutable="true">
    <startEvent id="startevent1" name="Start"></startEvent>
    <userTask id="evl" name="Evaluation"></userTask>
    <boundaryEvent id="timer_event_autocomplete" name="Timer" attachedToRef="evl" cancelActivity="false">
      <timerEventDefinition>
        <timeDate>PT2S</timeDate>
      </timerEventDefinition>
    </boundaryEvent>
    <serviceTask id="timer_service" name="Timed Autocomplete" activiti:async="true" activiti:class="com.example.service.TimerService"></serviceTask>
    <serviceTask id="store_docs_service" name="Store Documents" activiti:async="true" activiti:class="com.example.service.StoreDocsService"></serviceTask>
    <sequenceFlow id="flow1" sourceRef="startevent1" targetRef="evl"></sequenceFlow>
    <sequenceFlow id="flow2" sourceRef="timer_event_autocomplete" targetRef="timer_service"></sequenceFlow>
    <sequenceFlow id="flow3" sourceRef="evl" targetRef="store_docs_service"></sequenceFlow>
    <sequenceFlow id="flow4" sourceRef="store_docs_service" targetRef="endevent1"></sequenceFlow>
    <endEvent id="endevent1" name="End"></endEvent>
  </process>
</definitions>
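To describe it in words: there is a user task (Evaluation) with a timer attached to it (configured to fire after 2 seconds). When the timer fires, the Timed Autocomplete async service task, in its Java delegate TimerService, tries to complete the user task (Evaluation). Once the user task (Evaluation) is completed, the flow moves to another async service task (Store Documents), which calls its Java delegate, StoreDocsService, and then the process ends.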
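The value PT2S in the timer definition is an ISO 8601 duration. As a rough illustration of what "fire after 2 seconds" means, the sketch below parses that value with the JDK's java.time API and computes a due time. It does not use Activiti and says nothing about how the engine parses timers internally; the class name TimerDurationSketch and the fixed start instant are made up for the example.

```java
import java.time.Duration;
import java.time.Instant;

final class TimerDurationSketch {
    // Parses an ISO 8601 duration such as the "PT2S" used in the timer definition.
    static Duration parseTimer(String iso) {
        return Duration.parse(iso);
    }

    // Computes when a timer created at 'createdAt' would become due.
    static Instant dueAt(Instant createdAt, String iso) {
        return createdAt.plus(parseTimer(iso));
    }

    public static void main(String[] args) {
        // Hypothetical creation time of the user task, chosen only for the example.
        Instant created = Instant.parse("2020-01-01T00:00:00Z");
        System.out.println(parseTimer("PT2S").getSeconds()); // prints 2
        System.out.println(dueAt(created, "PT2S"));          // prints 2020-01-01T00:00:02Z
    }
}
```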
TimerService.java
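import org.activiti.engine.delegate.DelegateExecution;
import org.activiti.engine.delegate.JavaDelegate;
import org.activiti.engine.task.Task;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;

public class TimerService implements JavaDelegate {
    private static final Logger LOGGER = LoggerFactory.getLogger(TimerService.class);

    @Override
    public void execute(DelegateExecution execution) throws Exception {
        LOGGER.info("*** Executing Timer autocomplete ***");
        // Look up the single active task and complete it on behalf of the timer
        Task task = execution.getEngineServices().getTaskService().createTaskQuery().active().singleResult();
        execution.getEngineServices().getTaskService().complete(task.getId());
        LOGGER.info("*** Task: {} autocompleted by timer ***", task.getId());
    }
}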
StoreDocsService.java
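import org.activiti.engine.delegate.DelegateExecution;
import org.activiti.engine.delegate.JavaDelegate;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;

public class StoreDocsService implements JavaDelegate {
    private static final Logger LOGGER = LoggerFactory.getLogger(StoreDocsService.class);

    @Override
    public void execute(DelegateExecution execution) throws Exception {
        LOGGER.info("*** Executing Store Documents ***");
    }
}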
App.java
import java.util.HashMap;
import java.util.Map;

import org.activiti.engine.ProcessEngine;
import org.activiti.engine.ProcessEngineConfiguration;
import org.activiti.engine.RepositoryService;
import org.activiti.engine.RuntimeService;
import org.activiti.engine.TaskService;
import org.activiti.engine.impl.cfg.StandaloneProcessEngineConfiguration;
import org.activiti.engine.repository.Deployment;
import org.activiti.engine.repository.ProcessDefinition;
import org.activiti.engine.runtime.ProcessInstance;

public class App {
    public static void main(String[] args) throws Exception {
        // DefaultAsyncJobExecutor demoAsyncJobExecutor = new DefaultAsyncJobExecutor();
        // demoAsyncJobExecutor.setCorePoolSize(10);
        // demoAsyncJobExecutor.setMaxPoolSize(50);
        // demoAsyncJobExecutor.setKeepAliveTime(10000);
        // demoAsyncJobExecutor.setMaxAsyncJobsDuePerAcquisition(50);
        ProcessEngineConfiguration cfg = new StandaloneProcessEngineConfiguration()
                .setJdbcUrl("jdbc:h2:mem:activiti;DB_CLOSE_DELAY=1000")
                .setJdbcUsername("sa")
                .setJdbcPassword("")
                .setJdbcDriver("org.h2.Driver")
                .setDatabaseSchemaUpdate(ProcessEngineConfiguration.DB_SCHEMA_UPDATE_TRUE)
                // .setAsyncExecutorActivate(true)
                // .setAsyncExecutorEnabled(true)
                // .setAsyncExecutor(demoAsyncJobExecutor)
                .setJobExecutorActivate(true)
                ;
        ProcessEngine processEngine = cfg.buildProcessEngine();
        String pName = processEngine.getName();
        String ver = ProcessEngine.VERSION;
        System.out.println("ProcessEngine [" + pName + "] Version: [" + ver + "]");

        // Deploy the process definition from the classpath
        RepositoryService repositoryService = processEngine.getRepositoryService();
        Deployment deployment = repositoryService.createDeployment()
                .addClasspathResource("MyProcess.bpmn").deploy();
        ProcessDefinition processDefinition = repositoryService.createProcessDefinitionQuery()
                .deploymentId(deployment.getId()).singleResult();
        System.out.println(
                "Found process definition ["
                        + processDefinition.getName() + "] with id ["
                        + processDefinition.getId() + "]");

        // Start a process instance
        final Map<String, Object> variables = new HashMap<String, Object>();
        final RuntimeService runtimeService = processEngine.getRuntimeService();
        ProcessInstance id = runtimeService.startProcessInstanceByKey("myProcess", variables);
        System.out.println("Started Process Id: " + id.getId());

        try {
            final TaskService taskService = processEngine.getTaskService();
            // List<Task> tasks = taskService.createTaskQuery().active().list();
            // while (!tasks.isEmpty()) {
            //     Task task = tasks.get(0);
            //     taskService.complete(task.getId());
            //     tasks = taskService.createTaskQuery().active().list();
            // }
        } catch (Exception e) {
            System.out.println(e.getMessage());
        } finally {
        }

        // Busy-wait until no executions remain, i.e. the process instance has ended
        while (!runtimeService.createExecutionQuery().list().isEmpty()) {
        }
        processEngine.close();
    }
}
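The empty while loop at the end of App.java spins forever if the process never ends, which is exactly the failure described further down. One possible way to make such a hang observable is to bound the wait with a timeout. The following is only a sketch under assumptions: the helper BoundedWait.waitUntil is hypothetical (it is not part of Activiti), and the timeout and poll interval are arbitrary.

```java
import java.util.function.BooleanSupplier;

final class BoundedWait {
    // Polls 'done' every pollMillis until it returns true or timeoutMillis elapses.
    // Returns true if the condition was met, false on timeout or interruption.
    static boolean waitUntil(BooleanSupplier done, long timeoutMillis, long pollMillis) {
        long deadline = System.nanoTime() + timeoutMillis * 1_000_000L;
        while (!done.getAsBoolean()) {
            if (System.nanoTime() - deadline >= 0) {
                return false; // gave up: the condition never became true in time
            }
            try {
                Thread.sleep(pollMillis);
            } catch (InterruptedException e) {
                Thread.currentThread().interrupt(); // preserve the interrupt flag
                return false;
            }
        }
        return true;
    }

    public static void main(String[] args) {
        long start = System.currentTimeMillis();
        // Stand-in condition that becomes true after about 200 ms. In App.java it would be
        // () -> runtimeService.createExecutionQuery().list().isEmpty()
        boolean finished = waitUntil(() -> System.currentTimeMillis() - start >= 200, 5_000, 50);
        System.out.println("finished = " + finished);
    }
}
```

With this in place, App.java could log that the process did not finish within the timeout and still call processEngine.close(), instead of hanging.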
Activiti 5.15
When the timer fires, execution proceeds as described above. We use Activiti's DefaultJobExecutor,
as we can see in the logs:
[main] INFO org.activiti.engine.impl.ProcessEngineImpl - ProcessEngine default created
[main] INFO org.activiti.engine.impl.jobexecutor.JobExecutor - Starting up the JobExecutor[org.activiti.engine.impl.jobexecutor.DefaultJobExecutor].
[Thread-1] INFO org.activiti.engine.impl.jobexecutor.AcquireJobsRunnable - JobExecutor[org.activiti.engine.impl.jobexecutor.DefaultJobExecutor] starting to acquire jobs
ProcessEngine [default] Version: [5.15]
[main] INFO org.activiti.engine.impl.bpmn.deployer.BpmnDeployer - Processing resource MyProcess.bpmn
Found process definition [My process] with id [myProcess:1:3]
Started Process Id: 4
[pool-1-thread-1] INFO com.example.service.TimerService - *** Executing Timer autocomplete ***
[pool-1-thread-1] INFO com.example.service.TimerService - *** Task: 9 autocompleted by timer ***
[pool-1-thread-1] INFO com.example.service.StoreDocsService - *** Executing Store Documents ***
[main] INFO org.activiti.engine.impl.jobexecutor.JobExecutor - Shutting down the JobExecutor[org.activiti.engine.impl.jobexecutor.DefaultJobExecutor].
[Thread-1] INFO org.activiti.engine.impl.jobexecutor.AcquireJobsRunnable - JobExecutor[org.activiti.engine.impl.jobexecutor.DefaultJobExecutor] stopped job acquisition
Activiti >= 5.17
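After changing only the Activiti version in pom.xml to 5.17.0 or later (tested up to 5.22.0) and running the same code, the flow executes the timer's Java delegate, TimerService, which completes the user task (Evaluation), but the Store Documents Java delegate, StoreDocsService, is never called. In addition, the process apparently never ends: the execution stays stuck at the Store Documents async service task.
Logs: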
[main] INFO org.activiti.engine.impl.ProcessEngineImpl - ProcessEngine default created
[main] INFO org.activiti.engine.impl.jobexecutor.JobExecutor - Starting up the JobExecutor[org.activiti.engine.impl.jobexecutor.DefaultJobExecutor].
[Thread-1] INFO org.activiti.engine.impl.jobexecutor.AcquireJobsRunnableImpl - JobExecutor[org.activiti.engine.impl.jobexecutor.DefaultJobExecutor] starting to acquire jobs
ProcessEngine [default] Version: [5.17.0.2]
[main] INFO org.activiti.engine.impl.bpmn.deployer.BpmnDeployer - Processing resource MyProcess.bpmn
Found process definition [My process] with id [myProcess:1:3]
Started Process Id: 4
[pool-1-thread-2] INFO com.example.service.TimerService - *** Executing Timer autocomplete ***
[pool-1-thread-2] INFO com.example.service.TimerService - *** Task: 9 autocompleted by timer ***
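Switching to the async job executor: a feature introduced in version 5.17 is the new async job executor (although the old non-async executor is still the default). So we tried enabling the async executor in App.java with the following lines: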
DefaultAsyncJobExecutor demoAsyncJobExecutor = new DefaultAsyncJobExecutor();
demoAsyncJobExecutor.setCorePoolSize(10);
demoAsyncJobExecutor.setMaxPoolSize(50);
demoAsyncJobExecutor.setKeepAliveTime(10000);
demoAsyncJobExecutor.setMaxAsyncJobsDuePerAcquisition(50);
ProcessEngineConfiguration cfg = new StandaloneProcessEngineConfiguration()
        .setJdbcUrl("jdbc:h2:mem:activiti;DB_CLOSE_DELAY=1000")
        .setJdbcUsername("sa")
        .setJdbcPassword("")
        .setJdbcDriver("org.h2.Driver")
        .setDatabaseSchemaUpdate(ProcessEngineConfiguration.DB_SCHEMA_UPDATE_TRUE)
        .setAsyncExecutorActivate(true)
        .setAsyncExecutorEnabled(true)
        .setAsyncExecutor(demoAsyncJobExecutor)
        ;
The flow now seems to execute correctly: StoreDocsService is called after TimerService. But the process never ends (the while(!runtimeService.createExecutionQuery().list().isEmpty()) condition in App.java is always true)!
Logs:
[main] INFO org.activiti.engine.impl.ProcessEngineImpl - ProcessEngine default created
[main] INFO org.activiti.engine.impl.asyncexecutor.DefaultAsyncJobExecutor - Starting up the default async job executor [org.activiti.engine.impl.asyncexecutor.DefaultAsyncJobExecutor].
[main] INFO org.activiti.engine.impl.asyncexecutor.DefaultAsyncJobExecutor - Creating thread pool queue of size 100
[main] INFO org.activiti.engine.impl.asyncexecutor.DefaultAsyncJobExecutor - Creating executor service with corePoolSize 10, maxPoolSize 50 and keepAliveTime 10000
[Thread-1] INFO org.activiti.engine.impl.asyncexecutor.AcquireTimerJobsRunnable - {} starting to acquire async jobs due
[Thread-2] INFO org.activiti.engine.impl.asyncexecutor.AcquireAsyncJobsDueRunnable - {} starting to acquire async jobs due
ProcessEngine [default] Version: [5.17.0.2]
[main] INFO org.activiti.engine.impl.bpmn.deployer.BpmnDeployer - Processing resource MyProcess.bpmn
Found process definition [My process] with id [myProcess:1:3]
Started Process Id: 4
[pool-1-thread-2] INFO com.example.service.TimerService - *** Executing Timer autocomplete ***
[pool-1-thread-2] INFO com.example.service.TimerService - *** Task: 9 autocompleted by timer ***
[pool-1-thread-3] INFO com.example.service.StoreDocsService - *** Executing Store Documents ***
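For readers unfamiliar with the pool settings passed to DefaultAsyncJobExecutor above (corePoolSize 10, maxPoolSize 50, keepAliveTime 10000) and the queue size of 100 echoed in the log, the sketch below builds a plain JDK ThreadPoolExecutor with the same numbers. It assumes these settings map onto a standard thread pool with a bounded queue, which the log lines suggest but this page does not confirm. The class name PoolSketch is made up for the example.

```java
import java.util.concurrent.ArrayBlockingQueue;
import java.util.concurrent.ThreadPoolExecutor;
import java.util.concurrent.TimeUnit;

final class PoolSketch {
    // Builds a pool with the values from the configuration and log above:
    // 10 core threads, at most 50 threads, 10000 ms keep-alive, queue capacity 100.
    static ThreadPoolExecutor newPool() {
        return new ThreadPoolExecutor(
                10, 50, 10_000, TimeUnit.MILLISECONDS,
                new ArrayBlockingQueue<Runnable>(100));
    }

    public static void main(String[] args) throws InterruptedException {
        ThreadPoolExecutor pool = newPool();
        // Submit one stand-in job; a real engine would submit acquired jobs here.
        pool.execute(() -> System.out.println("job ran on " + Thread.currentThread().getName()));
        pool.shutdown();
        pool.awaitTermination(5, TimeUnit.SECONDS);
    }
}
```

In a pool like this, threads beyond the core size are only created once the queue is full, so with a queue of 100 the extra 40 threads rarely come into play for a single process instance.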
!!! UPDATE !!!
Activiti 6.0.0
We tried the same scenario with Activiti version 6.0.0. A change was needed in TimerService, because EngineServices can no longer be obtained from DelegateExecution:
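import org.activiti.engine.delegate.DelegateExecution;
import org.activiti.engine.delegate.JavaDelegate;
import org.activiti.engine.impl.context.Context;
import org.activiti.engine.task.Task;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;

public class TimerService implements JavaDelegate {
    private static final Logger LOGGER = LoggerFactory.getLogger(TimerService.class);

    @Override
    public void execute(DelegateExecution execution) {
        LOGGER.info("*** Executing Timer autocomplete ***");
        // In 6.0.0 the TaskService is reached through the engine configuration
        Task task = Context.getProcessEngineConfiguration().getTaskService().createTaskQuery().active().singleResult();
        Context.getProcessEngineConfiguration().getTaskService().complete(task.getId());
        // Task task = execution.getEngineServices().getTaskService().createTaskQuery().active().singleResult();
        // execution.getEngineServices().getTaskService().complete(task.getId());
        LOGGER.info("*** Task: {} autocompleted by timer ***", task.getId());
    }
}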
This version has only the async executor, so the ProcessEngineConfiguration in App.java changes to:
ProcessEngineConfiguration cfg = new StandaloneProcessEngineConfiguration()
        .setJdbcUrl("jdbc:h2:mem:activiti;DB_CLOSE_DELAY=1000")
        .setJdbcUsername("sa")
        .setJdbcPassword("")
        .setJdbcDriver("org.h2.Driver")
        .setDatabaseSchemaUpdate(ProcessEngineConfiguration.DB_SCHEMA_UPDATE_TRUE)
        .setAsyncExecutorActivate(true)
        // .setAsyncExecutorEnabled(true)
        // .setAsyncExecutor(demoAsyncJobExecutor)
        // .setJobExecutorActivate(true)
        ;
As we can see in the logs, with version 6.0.0 and the async executor the process completes successfully:
[main] INFO org.activiti.engine.impl.ProcessEngineImpl - ProcessEngine default created
[main] INFO org.activiti.engine.impl.asyncexecutor.DefaultAsyncJobExecutor - Starting up the default async job executor [org.activiti.engine.impl.asyncexecutor.DefaultAsyncJobExecutor].
[main] INFO org.activiti.engine.impl.asyncexecutor.DefaultAsyncJobExecutor - Creating thread pool queue of size 100
[main] INFO org.activiti.engine.impl.asyncexecutor.DefaultAsyncJobExecutor - Creating executor service with corePoolSize 2, maxPoolSize 10 and keepAliveTime 5000
[Thread-1] INFO org.activiti.engine.impl.asyncexecutor.AcquireAsyncJobsDueRunnable - {} starting to acquire async jobs due
[Thread-2] INFO org.activiti.engine.impl.asyncexecutor.AcquireTimerJobsRunnable - {} starting to acquire async jobs due
[Thread-3] INFO org.activiti.engine.impl.asyncexecutor.ResetExpiredJobsRunnable - {} starting to reset expired jobs
ProcessEngine [default] Version: [6.0.0.4]
Found process definition [My process] with id [myProcess:1:3]
Started Process Id: 4
[activiti-async-job-executor-thread-2] INFO com.example.service.TimerService - *** Executing Timer autocomplete ***
[activiti-async-job-executor-thread-2] INFO com.example.service.TimerService - *** Task: 10 autocompleted by timer ***
[activiti-async-job-executor-thread-2] INFO com.example.service.StoreDocsService - *** Executing Store Documents ***
[main] INFO org.activiti.engine.impl.asyncexecutor.DefaultAsyncJobExecutor - Shutting down the default async job executor [org.activiti.engine.impl.asyncexecutor.DefaultAsyncJobExecutor].
[activiti-reset-expired-jobs] INFO org.activiti.engine.impl.asyncexecutor.ResetExpiredJobsRunnable - {} stopped resetting expired jobs
[activiti-acquire-timer-jobs] INFO org.activiti.engine.impl.asyncexecutor.AcquireTimerJobsRunnable - {} stopped async job due acquisition
[activiti-acquire-async-jobs] INFO org.activiti.engine.impl.asyncexecutor.AcquireAsyncJobsDueRunnable - {} stopped async job due acquisition
Process finished with exit code 0
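Two questions:
- We have upgraded Activiti from 5.15 to 5.22.0 and we do not use the async job executor. Is there any way to make this diagram behave the way it did in 5.15?
- If switching to the async job executor is unavoidable, what are we missing for the process to complete successfully?
The sample project for the above can be found at: https://github.com/pleft/DemoActiviti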
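Solution
Without explicitly answering your question, which would require setting up the environment and debugging, I would suggest that you at least migrate to Activiti 6. The 5.x branch of Activiti has not been maintained for more than 5 years and is effectively dead. Even the 6.x line is all but abandoned, since the core developers have moved to the "Flowable" project. If you choose to stay on Activiti 5.x, your options are:
- Maintain the code base yourself (and hopefully contribute any changes/enhancements back to the project).
- Contract Activiti support services. Several vendors offer such services.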