java - 在springboot中通过API调用方法更新记录时实现多线程
问题描述
我正在 Springboot 中编写一个应用程序以从数据库中获取记录,然后调用外部 rest api 将记录更新到其他表中。此代码已完成并按预期工作。因为我也需要提高性能。我正在尝试在调用 API 时实现多线程,这样我就可以一次发送多条记录。
结构 :
Fetch records from a table and Store it in a list ---> Loop over list ---> multi threaded call to API
ProvRecordProcessing.java:此调用将从数据库中获取记录并创建一个列表并调用 ProvRecordService.java ProvRecordService.java:此调用将处理所有 API 逻辑。
经过一番研究,我尝试在下面实现使其成为多线程:
- 使 ProvRecordService 类实现 Runnable 并覆盖 void run 方法
- 而不是调用方法,而是调用 executorService.execute(new ProvRecordService(record));
ProvRecordProcessing.java :
我已经从代码中删除了其他业务逻辑,只保留调用 API 方法的部分。
@Component
public class ProvRecordProcessing {
.....Code to fetch records from database....
List<UpdateProvider> provRecords = jdbcTemplate.query(sqlApiSelectQuery, new ProvRecordMapper());
//added for multithreading
ExecutorService executorService = Executors.newFixedThreadPool(2);
//looping over list records and calling API to process records
for(UpdateProvider record : provRecords) {
executorService.execute(new ProvRecordService(record));
}
executorService.shutdown();
}
}
ProvRecordService.java
只是为了使其成为多线程,我在下面的代码中添加了几个部分并带有注释://为多线程添加
package com.emerald.paymentengineapi.service;
import java.security.KeyManagementException;
import java.security.KeyStoreException;
import java.security.NoSuchAlgorithmException;
import java.util.Arrays;
import java.util.List;
import javax.sql.DataSource;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
@Service
public class ProvRecordService implements IFxiProviderService, Runnable {
@Autowired
RestSslException restSslTemplate;
@Autowired
DbConfig dbConfig;
@Autowired
UpdateProvider updateProvider; // added for multithreading
@Autowired
JdbcTemplate jdbcTemplate;
@Autowired
TokenService tokenService;
@Value("${SHIELD_API_URL}")
private String SHIELD_API_URL;
@Value("${token_expire_time}")
private String token_expire;
RestTemplate restTemplate;
DataSource dataSource;
UpdateProvider record; // added for multithreading
Logger logger = LoggerFactory.getLogger(ProvRecordService.class);
private static String FETCH_OPTIONS_SQL = "select OPTION_NAME, OPTION_VALUE from FSG.FSG_PRCB_PE_API_REQ_CONFIG";
public ProvRecordService(UpdateProvider record) { // added for multithreading
// TODO Auto-generated constructor stub
this.record = record;
}
@Override
public void run() { // added for multithreading
updateProvider(record);
}
@Scheduled(fixedRateString = "token_expire")
public ResponseEntity<String> runTokenScheduler() throws KeyManagementException, KeyStoreException, NoSuchAlgorithmException {
logger.info("Fetching Token..." + token_expire);
ResponseEntity<String> response = tokenService.getOauth2Token();
return response;
}
@Override
public ResponseEntity<String> updateProvider(UpdateProvider updateProviderRequest) {
dataSource = dbConfig.dataSource();
JdbcTemplate jdbcTemplate = new JdbcTemplate(dataSource);
try {
restTemplate = restSslTemplate.restTemplate();
} catch (KeyManagementException | KeyStoreException | NoSuchAlgorithmException e) {
// TODO Auto-generated catch block
e.printStackTrace();
}
ResponseEntity<String> response = null;
try {
if (null == TokenService.TOKEN_VALUE.get(ConfigConstants.ACCESS_TOKEN))
runTokenScheduler();
HttpHeaders headers = new HttpHeaders();
headers.setAccept(Arrays.asList(MediaType.APPLICATION_JSON));
System.out.println("value :" + TokenService.TOKEN_VALUE.get(ConfigConstants.TOKEN_TYPE));
System.out.println("access_token :" + TokenService.TOKEN_VALUE.get(ConfigConstants.ACCESS_TOKEN));
headers.add(ConfigConstants.AUTHORIZATION, TokenService.TOKEN_VALUE.get(ConfigConstants.TOKEN_TYPE) + " "
+ TokenService.TOKEN_VALUE.get(ConfigConstants.ACCESS_TOKEN));
headers.add(ConfigConstants.CLIENT_CODE, ConfigConstants.CSP_PROVIDER_BATCH);
List<RequestOptions> customers = jdbcTemplate.query(FETCH_OPTIONS_SQL,new BeanPropertyRowMapper(RequestOptions.class));
updateProviderRequest.getXpfRequestData().setRequestOptions(customers);
HttpEntity<UpdateProvider> entity = new HttpEntity<UpdateProvider>(updateProviderRequest, headers);
response = restTemplate.exchange(SHIELD_API_URL, HttpMethod.PUT, entity, String.class);
if (response.getStatusCode() == HttpStatus.NO_CONTENT) {
logger.info(updateProviderRequest.getXpfRequestData().getGroupRecord().getProviderData().getTaxId());
logger.info(updateProviderRequest.getXpfRequestData().getGroupRecord().getProviderData().getProviderId());
updateStatusInDB(String.valueOf(response.getStatusCodeValue()), "NO_CONTENT",
updateProviderRequest.getXpfRequestData().getGroupRecord().getProviderData().getTaxId(),
updateProviderRequest.getXpfRequestData().getGroupRecord().getProviderData().getProviderId());
logger.info("Provider has been updated successfully");
} else if (response.getStatusCode() == HttpStatus.INTERNAL_SERVER_ERROR) {
updateStatusInDB(String.valueOf(response.getStatusCodeValue()), "INTERNAL_SERVER_ERROR",
updateProviderRequest.getXpfRequestData().getGroupRecord().getProviderData().getTaxId(),
updateProviderRequest.getXpfRequestData().getGroupRecord().getProviderData().getProviderId());
logger.error("Internal Server error occures");
} else if (response.getStatusCode() == HttpStatus.NOT_FOUND) {
updateStatusInDB(String.valueOf(response.getStatusCodeValue()), "NOT_FOUND",
updateProviderRequest.getXpfRequestData().getGroupRecord().getProviderData().getTaxId(),
updateProviderRequest.getXpfRequestData().getGroupRecord().getProviderData().getProviderId());
logger.error("Provider not found");
}
} catch (TokenServiceException ex) {
logger.error("Exception occures in calling Token API");
updateStatusInDB(ex.getMessage(), ex.getLocalizedMessage(),
updateProviderRequest.getXpfRequestData().getGroupRecord().getProviderData().getTaxId(),
updateProviderRequest.getXpfRequestData().getGroupRecord().getProviderData().getProviderId());
//throw new RuntimeException("Exception occures in API " + ex);
} catch (HttpClientErrorException ex) {
logger.error("HttpClientErrorException occures in calling API");
updateStatusInDB(ex.getStatusText(), ex.getStatusText(),
updateProviderRequest.getXpfRequestData().getGroupRecord().getProviderData().getTaxId(),
updateProviderRequest.getXpfRequestData().getGroupRecord().getProviderData().getProviderId());
//throw new HttpClientErrorException(ex.getStatusCode(), ex.getStatusText());
} catch (Exception ex) {
logger.error("Exception occures in calling API");
updateStatusInDB(ex.getMessage(), ex.getMessage(),
updateProviderRequest.getXpfRequestData().getGroupRecord().getProviderData().getTaxId(),
updateProviderRequest.getXpfRequestData().getGroupRecord().getProviderData().getProviderId());
//throw new RuntimeException("Exception occures in API " + ex);
}
return response;
}
private int updateStatusInDB(String errorCode, String errorMessage, String taxId, String providerId) {
return jdbcTemplate.update(
"update FSG_WRK.FSG_PRCB_PE_API_REQUEST set ERRORCODE = ?, ERRORMESSAGE = ? where TAXID = ? and PROVIDERID= ?",
errorCode, errorMessage, taxId, providerId);
}
}
我调试了这段代码,它正在使用 void run 方法,并且记录也被填充,但在那之后,它没有进入 updateProvider 方法进行处理,我得到以下错误:
Exception in thread "pool-2-thread-1" java.lang.NullPointerException
at com.emerald.paymentengineapi.service.ProvRecordService.updateProvider(ProvRecordService.java:92)
at com.emerald.paymentengineapi.service.ProvRecordService.run(ProvRecordService.java:78)
at java.base/java.util.concurrent.ThreadPoolExecutor.runWorker(ThreadPoolExecutor.java:1128)
at java.base/java.util.concurrent.ThreadPoolExecutor$Worker.run(ThreadPoolExecutor.java:628)
at java.base/java.lang.Thread.run(Thread.java:834)
Exception in thread "pool-2-thread-2" java.lang.NullPointerException
at com.emerald.paymentengineapi.service.ProvRecordService.updateProvider(ProvRecordService.java:92)
at com.emerald.paymentengineapi.service.ProvRecordService.run(ProvRecordService.java:78)
at java.base/java.util.concurrent.ThreadPoolExecutor.runWorker(ThreadPoolExecutor.java:1128)
at java.base/java.util.concurrent.ThreadPoolExecutor$Worker.run(ThreadPoolExecutor.java:628)
at java.base/java.lang.Thread.run(Thread.java:834)
Exception in thread "pool-2-thread-3" java.lang.NullPointerException
at com.emerald.paymentengineapi.service.ProvRecordService.updateProvider(ProvRecordService.java:92)
at com.emerald.paymentengineapi.service.ProvRecordService.run(ProvRecordService.java:78)
at java.base/java.util.concurrent.ThreadPoolExecutor.runWorker(ThreadPoolExecutor.java:1128)
at java.base/java.util.concurrent.ThreadPoolExecutor$Worker.run(ThreadPoolExecutor.java:628)
at java.base/java.lang.Thread.run(Thread.java:834)
Exception in thread "pool-2-thread-5" java.lang.NullPointerException
at com.emerald.paymentengineapi.service.ProvRecordService.updateProvider(ProvRecordService.java:92)
at com.emerald.paymentengineapi.service.ProvRecordService.run(ProvRecordService.java:78)
at java.base/java.util.concurrent.ThreadPoolExecutor.runWorker(ThreadPoolExecutor.java:1128)
at java.base/java.util.concurrent.ThreadPoolExecutor$Worker.run(ThreadPoolExecutor.java:628)
at java.base/java.lang.Thread.run(Thread.java:834)
更新 :
经过更多调试后,我知道,问题发生在以下行:
dataSource = dbConfig.dataSource();
JdbcTemplate jdbcTemplate = new JdbcTemplate(dataSource);
我正在尝试在此处设置 dataSource ,当我没有添加多线程代码时,这工作正常。我无法得到原因。请建议。
解决方案
这里的代码是错误的:
- 没有必要为每个请求执行创建新的 ExecutorService。
更好的方法是只创建一次并将其保留为ProvRecordProcessing
组件的数据字段。创建线程很昂贵+在您的方法中,您不知道可以同时创建多少个线程(如果此方法被许多用户并行调用 - 如果每个用户都创建线程池,那么它可能真的很昂贵)。
除了上述之外,如果您使用线程池执行器,您最好在应用程序关闭时关闭它,所以不要忘记在 predestroy 上调用 close 或其他东西。
- 不要使用
new
关键字创建服务,Spring 将无法管理它并且不会“处理”它的任何注释(Autowired、Value 等),所以这段代码是错误的:
for(UpdateProvider record : provRecords) {
executorService.execute(new ProvRecordService(record));
}
相反,将服务作为单例注入到ProvRecordProcessing
组件中,并调用其负责从可运行/可调用发送 http 请求的方法。这是我的意思的示意图示例:
@Component
class ProvRecordProcessing {
@Autowired
private ProvRecordService provRecordService;
....
for(UpdateProvider record : provRecords) {
executorService.execute(() -> provRecordService.updateHttpOrWhatever(record));
}
}
使用这种方法,ProvRecordService
成为一个常规的 spring 托管 bean。
对此有更高级的解决方案,即使用@Async
可以消除“手动”维护线程池的需要的方法。例如,请参阅本教程...由于您没有在问题中显示这些内容,因此我认为它超出了您所要求的范围,因此请记住它也存在。当然,如果你正确地实现你的代码,它会做得很好。
推荐阅读
- javascript - 如何使用 supertest 附加文件并将 JSON 数据发送到请求正文?
- codeigniter - 如何将数据传递给引导模式?
- database - 在 MS Access 中从一个表减去另一个表
- java - 从表 JavaFX 打开文件
- java - Java @requestBody 不起作用,dto 为空
- reporting-services - SSRS - 报表生成器:使用工具提示在柱形图中显示组中的项目
- java - 如何将 docx base64 编码流转换为 PDFA base64 编码流
- ag-grid - Ag Grid 将自定义标题组件向右对齐
- php - 如何在wordpress中隐藏子类别
- kendo-treelist - 在 Html.Kendo().TreeList 中过滤