Implementing multithreading in Spring Boot when calling an API method to update records

Problem description

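I am writing a Spring Boot application that fetches records from a database and then calls an external REST API to update those records into another table. This code is complete and works as expected. I now need to improve its performance, so I am trying to make the API calls multithreaded so that I can send several records at a time.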

Structure:

Fetch records from a table and Store it in a list ---> Loop over list ---> multi threaded call to API

ProvRecordProcessing.java: this class fetches the records from the database, builds a list, and calls ProvRecordService.java.

ProvRecordService.java: this class handles all of the API logic.

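After some research, I tried the following to make it multithreaded:

  1. Make the ProvRecordService class implement Runnable and override the void run() method
  2. Instead of calling the method directly, call executorService.execute(new ProvRecordService(record));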

ProvRecordProcessing.java :

I have removed the other business logic from the code and kept only the part that calls the API method.

  @Component 
  public class ProvRecordProcessing {
  
  .....Code to fetch records from database....
      
      List<UpdateProvider> provRecords = jdbcTemplate.query(sqlApiSelectQuery, new ProvRecordMapper());
      
      //added for multithreading
      ExecutorService executorService = Executors.newFixedThreadPool(2);
 
     //looping over list records and calling API to process records
     for(UpdateProvider record : provRecords) {
            
            executorService.execute(new ProvRecordService(record));
     }  
     executorService.shutdown();        
    }
}

ProvRecordService.java

To make it multithreaded, I added a few pieces to the code below, each marked with the comment: // added for multithreading

package com.emerald.paymentengineapi.service;

import java.security.KeyManagementException;
import java.security.KeyStoreException;
import java.security.NoSuchAlgorithmException;
import java.util.Arrays;
import java.util.List;

import javax.sql.DataSource;

import org.slf4j.Logger;
import org.slf4j.LoggerFactory;


@Service
public class ProvRecordService implements IFxiProviderService, Runnable {

    @Autowired
    RestSslException restSslTemplate;
    
    @Autowired
    DbConfig dbConfig;
    
    @Autowired
    UpdateProvider updateProvider; // added for multithreading

    @Autowired
    JdbcTemplate jdbcTemplate;

    @Autowired
    TokenService tokenService;
    
    @Value("${SHIELD_API_URL}")
    private String SHIELD_API_URL;

    @Value("${token_expire_time}")
    private String token_expire;
    
    RestTemplate restTemplate;
    DataSource dataSource;
    
    UpdateProvider record; // added for multithreading

    Logger logger = LoggerFactory.getLogger(ProvRecordService.class);
    

    private static String FETCH_OPTIONS_SQL = "select OPTION_NAME, OPTION_VALUE from FSG.FSG_PRCB_PE_API_REQ_CONFIG";
    
    public ProvRecordService(UpdateProvider record) { // added for multithreading
        // TODO Auto-generated constructor stub
        this.record = record;
    }

    @Override
    public void run() { // added for multithreading
        updateProvider(record);
    }

    @Scheduled(fixedRateString = "token_expire")
    public ResponseEntity<String> runTokenScheduler() throws KeyManagementException, KeyStoreException, NoSuchAlgorithmException {
        logger.info("Fetching Token..." + token_expire);

        ResponseEntity<String> response = tokenService.getOauth2Token();
        return response;
    }

    @Override
    public ResponseEntity<String> updateProvider(UpdateProvider updateProviderRequest) {
        
        dataSource = dbConfig.dataSource();
        JdbcTemplate jdbcTemplate = new JdbcTemplate(dataSource);
        
        try {
            restTemplate = restSslTemplate.restTemplate();
        } catch (KeyManagementException | KeyStoreException | NoSuchAlgorithmException e) {
            // TODO Auto-generated catch block
            e.printStackTrace();
        }
        
        ResponseEntity<String> response = null;
        
        try {

            if (null == TokenService.TOKEN_VALUE.get(ConfigConstants.ACCESS_TOKEN))
                runTokenScheduler();

            HttpHeaders headers = new HttpHeaders();
            headers.setAccept(Arrays.asList(MediaType.APPLICATION_JSON));
            System.out.println("value :" + TokenService.TOKEN_VALUE.get(ConfigConstants.TOKEN_TYPE));
            System.out.println("access_token :" + TokenService.TOKEN_VALUE.get(ConfigConstants.ACCESS_TOKEN));

            headers.add(ConfigConstants.AUTHORIZATION, TokenService.TOKEN_VALUE.get(ConfigConstants.TOKEN_TYPE) + " "
                    + TokenService.TOKEN_VALUE.get(ConfigConstants.ACCESS_TOKEN));
            headers.add(ConfigConstants.CLIENT_CODE, ConfigConstants.CSP_PROVIDER_BATCH);
            

            List<RequestOptions> customers = jdbcTemplate.query(FETCH_OPTIONS_SQL,new BeanPropertyRowMapper(RequestOptions.class));

            updateProviderRequest.getXpfRequestData().setRequestOptions(customers);

            HttpEntity<UpdateProvider> entity = new HttpEntity<UpdateProvider>(updateProviderRequest, headers);

            response = restTemplate.exchange(SHIELD_API_URL, HttpMethod.PUT, entity, String.class);
            
            if (response.getStatusCode() == HttpStatus.NO_CONTENT) {
                logger.info(updateProviderRequest.getXpfRequestData().getGroupRecord().getProviderData().getTaxId());
                logger.info(updateProviderRequest.getXpfRequestData().getGroupRecord().getProviderData().getProviderId());
                
                updateStatusInDB(String.valueOf(response.getStatusCodeValue()), "NO_CONTENT",
                        updateProviderRequest.getXpfRequestData().getGroupRecord().getProviderData().getTaxId(),
                        updateProviderRequest.getXpfRequestData().getGroupRecord().getProviderData().getProviderId());
                logger.info("Provider has been updated successfully");

            } else if (response.getStatusCode() == HttpStatus.INTERNAL_SERVER_ERROR) {
                updateStatusInDB(String.valueOf(response.getStatusCodeValue()), "INTERNAL_SERVER_ERROR",
                        updateProviderRequest.getXpfRequestData().getGroupRecord().getProviderData().getTaxId(),
                        updateProviderRequest.getXpfRequestData().getGroupRecord().getProviderData().getProviderId());
                logger.error("Internal Server error occures");
                
            } else if (response.getStatusCode() == HttpStatus.NOT_FOUND) {
                updateStatusInDB(String.valueOf(response.getStatusCodeValue()), "NOT_FOUND",
                        updateProviderRequest.getXpfRequestData().getGroupRecord().getProviderData().getTaxId(),
                        updateProviderRequest.getXpfRequestData().getGroupRecord().getProviderData().getProviderId());
                logger.error("Provider not found");

            }

        } catch (TokenServiceException ex) {
            logger.error("Exception occures in calling Token API");
            updateStatusInDB(ex.getMessage(), ex.getLocalizedMessage(),
                    updateProviderRequest.getXpfRequestData().getGroupRecord().getProviderData().getTaxId(),
                    updateProviderRequest.getXpfRequestData().getGroupRecord().getProviderData().getProviderId());
            //throw new RuntimeException("Exception occures in API " + ex);
            
        } catch (HttpClientErrorException ex) {
            logger.error("HttpClientErrorException occures in calling API");
            updateStatusInDB(ex.getStatusText(), ex.getStatusText(),
                    updateProviderRequest.getXpfRequestData().getGroupRecord().getProviderData().getTaxId(),
                    updateProviderRequest.getXpfRequestData().getGroupRecord().getProviderData().getProviderId());
            //throw new HttpClientErrorException(ex.getStatusCode(), ex.getStatusText());
            
        } catch (Exception ex) {
            logger.error("Exception occures in calling API");
            updateStatusInDB(ex.getMessage(), ex.getMessage(),
                    updateProviderRequest.getXpfRequestData().getGroupRecord().getProviderData().getTaxId(),
                    updateProviderRequest.getXpfRequestData().getGroupRecord().getProviderData().getProviderId());

            //throw new RuntimeException("Exception occures in API " + ex);
        }
        return response;

    }

    private int updateStatusInDB(String errorCode, String errorMessage, String taxId, String providerId) {
        return jdbcTemplate.update(
                "update FSG_WRK.FSG_PRCB_PE_API_REQUEST set ERRORCODE = ?, ERRORMESSAGE = ? where TAXID = ? and PROVIDERID= ?",
                errorCode, errorMessage, taxId, providerId);

    }

}

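I debugged this code. Execution reaches the void run() method and the record is populated, but after that it does not go on to process the record in the updateProvider method, and I get the following error: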

 Exception in thread "pool-2-thread-1" java.lang.NullPointerException
    at com.emerald.paymentengineapi.service.ProvRecordService.updateProvider(ProvRecordService.java:92)
    at com.emerald.paymentengineapi.service.ProvRecordService.run(ProvRecordService.java:78)
    at java.base/java.util.concurrent.ThreadPoolExecutor.runWorker(ThreadPoolExecutor.java:1128)
    at java.base/java.util.concurrent.ThreadPoolExecutor$Worker.run(ThreadPoolExecutor.java:628)
    at java.base/java.lang.Thread.run(Thread.java:834)
Exception in thread "pool-2-thread-2" java.lang.NullPointerException
    at com.emerald.paymentengineapi.service.ProvRecordService.updateProvider(ProvRecordService.java:92)
    at com.emerald.paymentengineapi.service.ProvRecordService.run(ProvRecordService.java:78)
    at java.base/java.util.concurrent.ThreadPoolExecutor.runWorker(ThreadPoolExecutor.java:1128)
    at java.base/java.util.concurrent.ThreadPoolExecutor$Worker.run(ThreadPoolExecutor.java:628)
    at java.base/java.lang.Thread.run(Thread.java:834)
Exception in thread "pool-2-thread-3" java.lang.NullPointerException
    at com.emerald.paymentengineapi.service.ProvRecordService.updateProvider(ProvRecordService.java:92)
    at com.emerald.paymentengineapi.service.ProvRecordService.run(ProvRecordService.java:78)
    at java.base/java.util.concurrent.ThreadPoolExecutor.runWorker(ThreadPoolExecutor.java:1128)
    at java.base/java.util.concurrent.ThreadPoolExecutor$Worker.run(ThreadPoolExecutor.java:628)
    at java.base/java.lang.Thread.run(Thread.java:834)
Exception in thread "pool-2-thread-5" java.lang.NullPointerException
    at com.emerald.paymentengineapi.service.ProvRecordService.updateProvider(ProvRecordService.java:92)
    at com.emerald.paymentengineapi.service.ProvRecordService.run(ProvRecordService.java:78)
    at java.base/java.util.concurrent.ThreadPoolExecutor.runWorker(ThreadPoolExecutor.java:1128)
    at java.base/java.util.concurrent.ThreadPoolExecutor$Worker.run(ThreadPoolExecutor.java:628)
    at java.base/java.lang.Thread.run(Thread.java:834)

Update:

After more debugging, I found that the problem occurs on the following lines:

dataSource = dbConfig.dataSource();
JdbcTemplate jdbcTemplate = new JdbcTemplate(dataSource);

I am trying to set the dataSource here, and this worked fine before I added the multithreading code. I cannot work out the reason. Please advise.

Tags: java, multithreading, spring-boot

Solution


The code here is wrong in two ways:

  1. There is no need to create a new ExecutorService for each request execution.

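A better approach is to create it only once and keep it as a field of the ProvRecordProcessing component. Creating threads is expensive. In addition, with your approach you cannot tell how many threads may exist at the same time: if this method is called by many users in parallel and each call creates its own thread pool, it can become really expensive.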

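Beyond that, if you use a thread pool executor, you should shut it down when the application shuts down, so do not forget to call shutdown() on it from a @PreDestroy method or something similar.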

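  2. Do not create the service with the new keyword. Spring cannot manage an object created that way and will not process any of its annotations (@Autowired, @Value, and so on), so injected fields such as dbConfig stay null. That explains the NullPointerException at dbConfig.dataSource(). So this code is wrong: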
     for(UpdateProvider record : provRecords) {
            
            executorService.execute(new ProvRecordService(record));
     }  
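
To make the failure concrete, here is a minimal plain-Java sketch of what happens, with no Spring involved. The class names InjectionDemo, DbConfig and Service and the method containerManaged are stand-ins invented for this illustration, not the real classes from the question. The sketch assumes only that field injection amounts to "create the object, then set its fields", which is the step that gets skipped when the object is created with new.

```java
public class InjectionDemo {

    /** Hypothetical stand-in for a dependency such as DbConfig. */
    static class DbConfig {
        String dataSource() {
            return "dataSource";
        }
    }

    /** Hypothetical stand-in for ProvRecordService. */
    static class Service {
        DbConfig dbConfig; // in the real class this would be an @Autowired field

        String updateProvider(String record) {
            // Throws NullPointerException when dbConfig was never injected.
            return dbConfig.dataSource() + ":" + record;
        }
    }

    /** Mimics a container: create the object, then fill in its dependencies. */
    static Service containerManaged() {
        Service service = new Service();
        service.dbConfig = new DbConfig();
        return service;
    }

    public static void main(String[] args) {
        // The managed instance has its dependency and works.
        System.out.println(containerManaged().updateProvider("r1"));

        // The instance created with new has a null dependency and fails.
        try {
            new Service().updateProvider("r2");
        } catch (NullPointerException e) {
            System.out.println("NullPointerException: dbConfig was never injected");
        }
    }
}
```

The second call fails on the same kind of line the question identifies: the first use of a field that nobody ever set.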

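Instead, inject the service as a singleton into the ProvRecordProcessing component and, from a Runnable or Callable, call its method that is responsible for sending the HTTP request. Here is a schematic example of what I mean: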

@Component
class ProvRecordProcessing {

   @Autowired
   private ProvRecordService provRecordService;

   ....

    for(UpdateProvider record : provRecords) {
            
            executorService.execute(() -> provRecordService.updateHttpOrWhatever(record));
     }
} 

With this approach, ProvRecordService becomes a regular Spring-managed bean.
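
Both points can be combined in one runnable sketch. It is written in plain Java so that it compiles without Spring, and the names SharedPoolDemo, RecordService and RecordProcessing are hypothetical stand-ins for the classes in the question. The pool size of 2 and the 30-second timeout are arbitrary choices for illustration. In a real Spring bean, the constructor argument would be injected and the shutdown method could carry @PreDestroy.

```java
import java.util.List;
import java.util.concurrent.ConcurrentLinkedQueue;
import java.util.concurrent.ExecutorService;
import java.util.concurrent.Executors;
import java.util.concurrent.TimeUnit;

public class SharedPoolDemo {

    /** Hypothetical stand-in for the singleton ProvRecordService. */
    static class RecordService {
        final ConcurrentLinkedQueue<String> processed = new ConcurrentLinkedQueue<>();

        void updateProvider(String record) {
            // The real method would call the REST API and update the status table.
            processed.add(record);
        }
    }

    /** Hypothetical stand-in for ProvRecordProcessing. */
    static class RecordProcessing {
        private final RecordService service;
        // Created once and reused for every batch, instead of once per call.
        private final ExecutorService executor = Executors.newFixedThreadPool(2);

        RecordProcessing(RecordService service) {
            this.service = service;
        }

        void process(List<String> records) {
            for (String record : records) {
                // One shared service instance; only the record differs per task.
                executor.execute(() -> service.updateProvider(record));
            }
        }

        // In a Spring bean this method could be annotated with @PreDestroy.
        void shutdown() {
            executor.shutdown();
            try {
                if (!executor.awaitTermination(30, TimeUnit.SECONDS)) {
                    executor.shutdownNow();
                }
            } catch (InterruptedException e) {
                executor.shutdownNow();
                Thread.currentThread().interrupt();
            }
        }
    }

    public static void main(String[] args) {
        RecordService service = new RecordService();
        RecordProcessing processing = new RecordProcessing(service);
        processing.process(List.of("a", "b", "c"));
        processing.process(List.of("d"));
        processing.shutdown();
        System.out.println(service.processed.size() + " records processed");
    }
}
```

Because the service instance is shared by all tasks, any state it keeps has to be thread-safe. In the question's class, that means fields such as record, restTemplate and dataSource should not be reassigned per call.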

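There are also more advanced solutions, namely @Async methods, which eliminate the need to maintain the thread pool "manually". For example, see this tutorial... Since you did not show any of that in the question, I assume it is beyond the scope of what you asked, so just keep in mind that it exists. Of course, if you implement your code correctly, the approach above will do just fine.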

