首页 > 解决方案 > 多次调用返回 DeferredResults 的异步服务而不增加执行时间

问题描述

我的应用程序应该有 2 个核心端点:pushpull用于发送和获取数据。

拉取操作应该异步工作并产生 DeferredResult。当用户调用 pull service over rest 时,会创建新的 DeferedResult 并将其存储到Map<Long, DefferedResult> results = new ConcurrentHashMap<>()等待新数据的位置或直到超时到期。

推送操作也会调用用户休息,并且此操作会检查结果映射以查找此操作推送的数据的接收者。当 map 包含接收者的结果时,这些数据被设置为他的结果,DefferedResult 被返回。

这是基本代码:

@Service
public class FooServiceImpl {
    Map<Long, DefferedResult> results = new ConcurrentHashMap<>();

    @Transactional
    @Override
    public DeferredResult<String> pull(Long userId) {
        // here is database call, String data = fooRepository.getNewData(); where I check if there are some new data in database, and if there are, just return it, if not add deferred result into collection to wait for it
        DeferredResult<String> newResult = new DeferredResult<>(5000L);
        results.putIfAbsent(userId, newResult);
        newResult.onCompletion(() -> results.remove(userId));

        // if (data != null)
        //      newResult.setResult(data);

        return newResult;
    }

    @Transactional
    @Override
    public void push(String data, Long recipientId) {
        // fooRepository.save(data, recipientId);
        if (results.containsKey(recipientId)) {
            results.get(recipientId).setResult(data);
        }
    }
}

代码按我预期的那样工作,问题是它也应该适用于多个用户。我猜将调用拉操作的最大活动用户最多为 1000。因此,每次调用 pull 最多需要 5 秒,正如我在 DeferedResult 中设置的那样,但事实并非如此。

正如您在图片中看到的,如果我立即从我的 javascript 客户端多次调用其余的 pull 操作,您可以看到任务将按顺序执行而不是同时执行。我上次触发的任务大约需要 25 秒,但我需要当 1000 个用户同时执行拉操作时,该操作最多需要 5 秒 + 延迟。

在此处输入图像描述

如何配置我的应用程序以同时执行这些任务并确保每个任务大约 5 秒或更短(当另一个用户向等待用户发送内容时)?我尝试将此配置添加到属性文件中:

server.tomcat.max-threads=1000

还有这个配置:

@Configuration
public class AsyncConfig extends AsyncSupportConfigurer {

    @Override
    protected AsyncTaskExecutor getTaskExecutor() {
        ThreadPoolTaskExecutor taskExecutor = new ThreadPoolTaskExecutor();
        taskExecutor.setCorePoolSize(1000);
        taskExecutor.initialize();
        return taskExecutor;
    }
}

但这并没有帮助,仍然是相同的结果。你能帮我配置一下吗?

编辑:

这就是我从角度调用此服务的方式:

this.http.get<any>(this.url, {params})
  .subscribe((data) => {
    console.log('s', data);
  }, (error) => {
    console.log('e', error);
  });

当我尝试使用这样的纯 JS 代码多次调用它时:

function httpGet()
{
    var xmlHttp = new XMLHttpRequest();
    xmlHttp.open( "GET", 'http://localhost:8080/api/pull?id=1', true );
    xmlHttp.send( null );
    return xmlHttp.responseText;
}
setInterval(httpGet, 500);

它将更快地执行每个请求调用(大约 7 秒)。我预计增加是导致数据库调用服务,但它仍然优于 25 秒。以角度调用此服务有什么问题吗?

编辑2:

我尝试了另一种形式的测试,而不是浏览器,我使用了 jMeter。我在 100 个线程中执行 100 个请求,结果如下:

在此处输入图像描述

如您所见,请求将按 10 进行,达到 50 个请求后应用程序抛出异常:

java.sql.SQLTransientConnectionException: HikariPool-1 - Connection is not available, request timed out after 30000ms.
    at com.zaxxer.hikari.pool.HikariPool.createTimeoutException(HikariPool.java:667) ~[HikariCP-2.7.8.jar:na]
    at com.zaxxer.hikari.pool.HikariPool.getConnection(HikariPool.java:183) ~[HikariCP-2.7.8.jar:na]
    at com.zaxxer.hikari.pool.HikariPool.getConnection(HikariPool.java:148) ~[HikariCP-2.7.8.jar:na]
    at com.zaxxer.hikari.HikariDataSource.getConnection(HikariDataSource.java:128) ~[HikariCP-2.7.8.jar:na]
    at org.hibernate.engine.jdbc.connections.internal.DatasourceConnectionProviderImpl.getConnection(DatasourceConnectionProviderImpl.java:122) ~[hibernate-core-5.2.16.Final.jar:5.2.16.Final]
    at org.hibernate.internal.NonContextualJdbcConnectionAccess.obtainConnection(NonContextualJdbcConnectionAccess.java:35) ~[hibernate-core-5.2.16.Final.jar:5.2.16.Final]
    at org.hibernate.resource.jdbc.internal.LogicalConnectionManagedImpl.acquireConnectionIfNeeded(LogicalConnectionManagedImpl.java:106) ~[hibernate-core-5.2.16.Final.jar:5.2.16.Final]
    at org.hibernate.resource.jdbc.internal.LogicalConnectionManagedImpl.getPhysicalConnection(LogicalConnectionManagedImpl.java:136) ~[hibernate-core-5.2.16.Final.jar:5.2.16.Final]
    at org.hibernate.internal.SessionImpl.connection(SessionImpl.java:523) ~[hibernate-core-5.2.16.Final.jar:5.2.16.Final]
    at sun.reflect.GeneratedMethodAccessor61.invoke(Unknown Source) ~[na:na]
    at sun.reflect.DelegatingMethodAccessorImpl.invoke(DelegatingMethodAccessorImpl.java:43) ~[na:1.8.0_171]
    at java.lang.reflect.Method.invoke(Method.java:498) ~[na:1.8.0_171]
    at org.springframework.util.ReflectionUtils.invokeMethod(ReflectionUtils.java:223) ~[spring-core-5.0.5.RELEASE.jar:5.0.5.RELEASE]
    at org.springframework.util.ReflectionUtils.invokeMethod(ReflectionUtils.java:207) ~[spring-core-5.0.5.RELEASE.jar:5.0.5.RELEASE]
    at org.springframework.orm.jpa.vendor.HibernateJpaDialect$HibernateConnectionHandle.doGetConnection(HibernateJpaDialect.java:391) ~[spring-orm-5.0.5.RELEASE.jar:5.0.5.RELEASE]
    at org.springframework.orm.jpa.vendor.HibernateJpaDialect.beginTransaction(HibernateJpaDialect.java:154) ~[spring-orm-5.0.5.RELEASE.jar:5.0.5.RELEASE]
    at org.springframework.orm.jpa.JpaTransactionManager.doBegin(JpaTransactionManager.java:400) ~[spring-orm-5.0.5.RELEASE.jar:5.0.5.RELEASE]
    at org.springframework.transaction.support.AbstractPlatformTransactionManager.getTransaction(AbstractPlatformTransactionManager.java:378) ~[spring-tx-5.0.5.RELEASE.jar:5.0.5.RELEASE]
    at org.springframework.transaction.interceptor.TransactionAspectSupport.createTransactionIfNecessary(TransactionAspectSupport.java:474) ~[spring-tx-5.0.5.RELEASE.jar:5.0.5.RELEASE]
    at org.springframework.transaction.interceptor.TransactionAspectSupport.invokeWithinTransaction(TransactionAspectSupport.java:289) ~[spring-tx-5.0.5.RELEASE.jar:5.0.5.RELEASE]
    at org.springframework.transaction.interceptor.TransactionInterceptor.invoke(TransactionInterceptor.java:98) ~[spring-tx-5.0.5.RELEASE.jar:5.0.5.RELEASE]
    at org.springframework.aop.framework.ReflectiveMethodInvocation.proceed(ReflectiveMethodInvocation.java:185) ~[spring-aop-5.0.5.RELEASE.jar:5.0.5.RELEASE]
    at org.springframework.aop.interceptor.ExposeInvocationInterceptor.invoke(ExposeInvocationInterceptor.java:92) ~[spring-aop-5.0.5.RELEASE.jar:5.0.5.RELEASE]
    at org.springframework.aop.framework.ReflectiveMethodInvocation.proceed(ReflectiveMethodInvocation.java:185) ~[spring-aop-5.0.5.RELEASE.jar:5.0.5.RELEASE]
    at org.springframework.aop.framework.CglibAopProxy$DynamicAdvisedInterceptor.intercept(CglibAopProxy.java:689) ~[spring-aop-5.0.5.RELEASE.jar:5.0.5.RELEASE]
    at sk.moe.zoya.service.impl.FooServiceImpl$$EnhancerBySpringCGLIB$$ebab570a.pull(<generated>) ~[classes/:na]
    at sk.moe.zoya.web.FooController.pull(FooController.java:25) ~[classes/:na]
    at sun.reflect.GeneratedMethodAccessor60.invoke(Unknown Source) ~[na:na]
    at sun.reflect.DelegatingMethodAccessorImpl.invoke(DelegatingMethodAccessorImpl.java:43) ~[na:1.8.0_171]
    at java.lang.reflect.Method.invoke(Method.java:498) ~[na:1.8.0_171]
    at org.springframework.web.method.support.InvocableHandlerMethod.doInvoke(InvocableHandlerMethod.java:209) ~[spring-web-5.0.5.RELEASE.jar:5.0.5.RELEASE]
    at org.springframework.web.method.support.InvocableHandlerMethod.invokeForRequest(InvocableHandlerMethod.java:136) ~[spring-web-5.0.5.RELEASE.jar:5.0.5.RELEASE]
    at org.springframework.web.servlet.mvc.method.annotation.ServletInvocableHandlerMethod.invokeAndHandle(ServletInvocableHandlerMethod.java:102) ~[spring-webmvc-5.0.5.RELEASE.jar:5.0.5.RELEASE]
    at org.springframework.web.servlet.mvc.method.annotation.RequestMappingHandlerAdapter.invokeHandlerMethod(RequestMappingHandlerAdapter.java:877) ~[spring-webmvc-5.0.5.RELEASE.jar:5.0.5.RELEASE]
    at org.springframework.web.servlet.mvc.method.annotation.RequestMappingHandlerAdapter.handleInternal(RequestMappingHandlerAdapter.java:783) ~[spring-webmvc-5.0.5.RELEASE.jar:5.0.5.RELEASE]
    at org.springframework.web.servlet.mvc.method.AbstractHandlerMethodAdapter.handle(AbstractHandlerMethodAdapter.java:87) ~[spring-webmvc-5.0.5.RELEASE.jar:5.0.5.RELEASE]
    at org.springframework.web.servlet.DispatcherServlet.doDispatch(DispatcherServlet.java:991) ~[spring-webmvc-5.0.5.RELEASE.jar:5.0.5.RELEASE]
    at org.springframework.web.servlet.DispatcherServlet.doService(DispatcherServlet.java:925) ~[spring-webmvc-5.0.5.RELEASE.jar:5.0.5.RELEASE]
    at org.springframework.web.servlet.FrameworkServlet.processRequest(FrameworkServlet.java:974) ~[spring-webmvc-5.0.5.RELEASE.jar:5.0.5.RELEASE]
    at org.springframework.web.servlet.FrameworkServlet.doGet(FrameworkServlet.java:866) ~[spring-webmvc-5.0.5.RELEASE.jar:5.0.5.RELEASE]
    at javax.servlet.http.HttpServlet.service(HttpServlet.java:635) ~[tomcat-embed-core-8.5.29.jar:8.5.29]
    at org.springframework.web.servlet.FrameworkServlet.service(FrameworkServlet.java:851) ~[spring-webmvc-5.0.5.RELEASE.jar:5.0.5.RELEASE]
    at javax.servlet.http.HttpServlet.service(HttpServlet.java:742) ~[tomcat-embed-core-8.5.29.jar:8.5.29]
    at org.apache.catalina.core.ApplicationFilterChain.internalDoFilter(ApplicationFilterChain.java:231) ~[tomcat-embed-core-8.5.29.jar:8.5.29]
    at org.apache.catalina.core.ApplicationFilterChain.doFilter(ApplicationFilterChain.java:166) ~[tomcat-embed-core-8.5.29.jar:8.5.29]
    at org.apache.tomcat.websocket.server.WsFilter.doFilter(WsFilter.java:52) ~[tomcat-embed-websocket-8.5.29.jar:8.5.29]
    at org.apache.catalina.core.ApplicationFilterChain.internalDoFilter(ApplicationFilterChain.java:193) ~[tomcat-embed-core-8.5.29.jar:8.5.29]
    at org.apache.catalina.core.ApplicationFilterChain.doFilter(ApplicationFilterChain.java:166) ~[tomcat-embed-core-8.5.29.jar:8.5.29]
    at org.springframework.web.filter.RequestContextFilter.doFilterInternal(RequestContextFilter.java:99) ~[spring-web-5.0.5.RELEASE.jar:5.0.5.RELEASE]
    at org.springframework.web.filter.OncePerRequestFilter.doFilter(OncePerRequestFilter.java:107) ~[spring-web-5.0.5.RELEASE.jar:5.0.5.RELEASE]
    at org.apache.catalina.core.ApplicationFilterChain.internalDoFilter(ApplicationFilterChain.java:193) ~[tomcat-embed-core-8.5.29.jar:8.5.29]
    at org.apache.catalina.core.ApplicationFilterChain.doFilter(ApplicationFilterChain.java:166) ~[tomcat-embed-core-8.5.29.jar:8.5.29]
    at org.springframework.web.filter.HttpPutFormContentFilter.doFilterInternal(HttpPutFormContentFilter.java:109) ~[spring-web-5.0.5.RELEASE.jar:5.0.5.RELEASE]
    at org.springframework.web.filter.OncePerRequestFilter.doFilter(OncePerRequestFilter.java:107) ~[spring-web-5.0.5.RELEASE.jar:5.0.5.RELEASE]
    at org.apache.catalina.core.ApplicationFilterChain.internalDoFilter(ApplicationFilterChain.java:193) ~[tomcat-embed-core-8.5.29.jar:8.5.29]
    at org.apache.catalina.core.ApplicationFilterChain.doFilter(ApplicationFilterChain.java:166) ~[tomcat-embed-core-8.5.29.jar:8.5.29]
    at org.springframework.web.filter.HiddenHttpMethodFilter.doFilterInternal(HiddenHttpMethodFilter.java:81) ~[spring-web-5.0.5.RELEASE.jar:5.0.5.RELEASE]
    at org.springframework.web.filter.OncePerRequestFilter.doFilter(OncePerRequestFilter.java:107) ~[spring-web-5.0.5.RELEASE.jar:5.0.5.RELEASE]
    at org.apache.catalina.core.ApplicationFilterChain.internalDoFilter(ApplicationFilterChain.java:193) ~[tomcat-embed-core-8.5.29.jar:8.5.29]
    at org.apache.catalina.core.ApplicationFilterChain.doFilter(ApplicationFilterChain.java:166) ~[tomcat-embed-core-8.5.29.jar:8.5.29]
    at org.springframework.web.filter.CharacterEncodingFilter.doFilterInternal(CharacterEncodingFilter.java:200) ~[spring-web-5.0.5.RELEASE.jar:5.0.5.RELEASE]
    at org.springframework.web.filter.OncePerRequestFilter.doFilter(OncePerRequestFilter.java:107) ~[spring-web-5.0.5.RELEASE.jar:5.0.5.RELEASE]
    at org.apache.catalina.core.ApplicationFilterChain.internalDoFilter(ApplicationFilterChain.java:193) ~[tomcat-embed-core-8.5.29.jar:8.5.29]
    at org.apache.catalina.core.ApplicationFilterChain.doFilter(ApplicationFilterChain.java:166) ~[tomcat-embed-core-8.5.29.jar:8.5.29]
    at org.apache.catalina.core.StandardWrapperValve.invoke(StandardWrapperValve.java:198) ~[tomcat-embed-core-8.5.29.jar:8.5.29]
    at org.apache.catalina.core.StandardContextValve.invoke(StandardContextValve.java:96) [tomcat-embed-core-8.5.29.jar:8.5.29]
    at org.apache.catalina.authenticator.AuthenticatorBase.invoke(AuthenticatorBase.java:496) [tomcat-embed-core-8.5.29.jar:8.5.29]
    at org.apache.catalina.core.StandardHostValve.invoke(StandardHostValve.java:140) [tomcat-embed-core-8.5.29.jar:8.5.29]
    at org.apache.catalina.valves.ErrorReportValve.invoke(ErrorReportValve.java:81) [tomcat-embed-core-8.5.29.jar:8.5.29]
    at org.apache.catalina.core.StandardEngineValve.invoke(StandardEngineValve.java:87) [tomcat-embed-core-8.5.29.jar:8.5.29]
    at org.apache.catalina.connector.CoyoteAdapter.service(CoyoteAdapter.java:342) [tomcat-embed-core-8.5.29.jar:8.5.29]
    at org.apache.coyote.http11.Http11Processor.service(Http11Processor.java:803) [tomcat-embed-core-8.5.29.jar:8.5.29]
    at org.apache.coyote.AbstractProcessorLight.process(AbstractProcessorLight.java:66) [tomcat-embed-core-8.5.29.jar:8.5.29]
    at org.apache.coyote.AbstractProtocol$ConnectionHandler.process(AbstractProtocol.java:790) [tomcat-embed-core-8.5.29.jar:8.5.29]
    at org.apache.tomcat.util.net.NioEndpoint$SocketProcessor.doRun(NioEndpoint.java:1459) [tomcat-embed-core-8.5.29.jar:8.5.29]
    at org.apache.tomcat.util.net.SocketProcessorBase.run(SocketProcessorBase.java:49) [tomcat-embed-core-8.5.29.jar:8.5.29]
    at java.util.concurrent.ThreadPoolExecutor.runWorker(ThreadPoolExecutor.java:1149) [na:1.8.0_171]
    at java.util.concurrent.ThreadPoolExecutor$Worker.run(ThreadPoolExecutor.java:624) [na:1.8.0_171]
    at org.apache.tomcat.util.threads.TaskThread$WrappingRunnable.run(TaskThread.java:61) [tomcat-embed-core-8.5.29.jar:8.5.29]
    at java.lang.Thread.run(Thread.java:748) [na:1.8.0_171]

2018-06-02 13:21:47.163  WARN 26978 --- [io-8080-exec-48] o.h.engine.jdbc.spi.SqlExceptionHelper   : SQL Error: 0, SQLState: null
2018-06-02 13:21:47.163  WARN 26978 --- [io-8080-exec-40] o.h.engine.jdbc.spi.SqlExceptionHelper   : SQL Error: 0, SQLState: null
2018-06-02 13:21:47.163 ERROR 26978 --- [io-8080-exec-48] o.h.engine.jdbc.spi.SqlExceptionHelper   : HikariPool-1 - Connection is not available, request timed out after 30000ms.
2018-06-02 13:21:47.163 ERROR 26978 --- [io-8080-exec-40] o.h.engine.jdbc.spi.SqlExceptionHelper   : HikariPool-1 - Connection is not available, request timed out after 30000ms.
2018-06-02 13:21:47.164 ERROR 26978 --- [io-8080-exec-69] o.a.c.c.C.[.[.[/].[dispatcherServlet]    : Servlet.service() for servlet [dispatcherServlet] in context with path [] threw exception [Request processing failed; nested exception is org.springframework.transaction.CannotCreateTransactionException: Could not open JPA EntityManager for transaction; nested exception is org.hibernate.exception.JDBCConnectionException: Unable to acquire JDBC Connection] with root cause

我还注释了我使用存储库的代码,以确保数据库没有任何内容,并且结果相同。此外,我使用 AtomicLong 类为每个请求设置了唯一的 userId。

编辑 3:

当我发表评论时,我发现@Transactional一切正常!那么你能告诉我如何在不增加延迟的情况下为大量操作设置spring的事务吗?

我添加spring.datasource.maximumPoolSize=1000了我认为应该增加的池大小,所以唯一的问题是如何使用@Transactional 加速方法。

每个对 pull 方法的调用都用 @Transactional 注释,因为我首先需要从数据库加载数据并检查是否有新数据,因为是的,我不必创建等待延迟结果。push 方法也必须使用 @Transaction 进行注释,因为我首先需要将接收到的数据存储在数据库中,然后将该值设置为等待结果。对于我的数据,我使用的是 Postgres。

标签: javaspringspring-bootspring-transactionsspring-async

解决方案


这里的问题似乎是您的数据库池中的连接用完了。

您已将方法标记为,@Transaction但您的控制器也期望方法的结果,即DeferredResult尽快交付,以便释放线程。

现在,这是运行请求时发生的情况:

  • @Transaction功能在 Spring 代理中实现,该代理必须打开连接,调用您的主题方法,然后提交或回滚事务。
  • 因此,当您的控制器调用该fooService.pull方法时,它实际上是在调用一个代理。
  • 代理必须首先从池中请求连接,然后调用您的服务方法,该方法在该事务中执行一些数据库操作。最后,它必须提交或回滚事务并最终将连接返回到池中。
  • 毕竟,您的方法返回 a DeferredResult,然后将其传递给控制器​​以使其返回。

现在,问题在于它DeferredResult的设计方式应该是异步使用的。换句话说,promise 预计稍后会在其他线程中解决,我们应该尽快释放请求线程。

事实上,关于DeferredResult的 Spring 文档说:

@GetMapping("/quotes")
@ResponseBody
public DeferredResult<String> quotes() {
    DeferredResult<String> deferredResult = new DeferredResult<String>();
    // Save the deferredResult somewhere..
    return deferredResult;
}

// From some other thread...
deferredResult.setResult(data);

您的代码中的问题正是DeferredResult在同一个请求线程中解决。

所以,问题是当 Spring 代理请求连接到数据库池时,当您进行重负载测试时,许多请求会发现池已满并且没有可用的连接。所以请求被搁置,但此时你的DeferredResult还没有被创建,所以它的超时功能不存在。

您的请求基本上是在等待来自数据库池的一些连接可用。因此,假设 5 秒过去了,然后请求获得了连接,现在您获得DeferredResult了控制器用来处理响应的连接。最终,5 秒后超时。因此,您必须添加等待池连接的时间和等待DeferredResult解决的时间。

这就是为什么您可能会看到,当您使用 JMeter 进行测试时,请求时间会随着数据库池中的连接耗尽而逐渐增加。

您可以通过添加以下 application.properties 文件来为线程池启用一些日志记录:

logging.level.com.zaxxer.hikari=DEBUG

您还可以配置数据库池的大小,甚至添加一些 JMX 支持,以便您可以从 Java Mission Control 监控它:

spring.datasource.hikari.maximumPoolSize=10
spring.datasource.hikari.registerMbeans=true

使用 JMX 支持,您将能够看到数据库池是如何耗尽的。

这里的技巧在于将解决承诺的逻辑移动到另一个线程:

@Override
public DeferredResult pull(Long previousId, String username) {


    DeferredResult result = createPollingResult(previousId, username);

    CompletableFuture.runAsync(() -> {
        //this is where you encapsulate your db transaction
        List<MessageDTO> messages = messageService.findRecents(previousId, username); // should be final or effective final
        if (messages.isEmpty()) {
           pollingResults.putIfAbsent(username, result);
        } else {
           result.setResult(messages);
        }
    });

    return result;
}

通过这样做,您DeferredResult会立即返回,并且 Spring 可以在释放宝贵的 Tomcat 线程的同时发挥其异步请求处理的魔力。


推荐阅读