spring - jpa 与 https 请求多线程 spring
问题描述
我正在使用springJPA
和HTTP post
request,逐行获取数据,然后将数据发布到API的HTTP请求中,它对我来说工作得很好,但我在这里处理大量数据,所以我必须使用多线程但是我是 java 和 spring 的新手,我如何实现与 10 个线程一起工作,并且每个线程每次并行读取 1k?
我已经阅读了有关 10 个线程的多线程的内容,每个线程每次读取 1k 行,我的数据库中有大约 1000 万条记录
AccessingDataJpaApplication 类:
@SpringBootApplication
public class AccessingDataJpaApplication implements CommandLineRunner {
private static final Logger logger = LoggerFactory.getLogger(AccessingDataJpaApplication.class);
@Autowired
private Bulk_repositoryRepository bulk_repositoryRepository;
public static void main(String[] args) {
SpringApplication.run(AccessingDataJpaApplication.class);
}
Date currentDate = new Date();
@Override
public void run(String... args) throws Exception {
RestTemplate restTemplate = new RestTemplate();
HttpHeaders headers = new HttpHeaders();
headers.setAccept(Arrays.asList(MediaType.APPLICATION_JSON));
headers.setBasicAuth("user", "pass");
while(true) {
Date currentDate = new Date();
logger.info("Just Started");
for (Bulk_repository churnss : bulk_repositoryRepository.findAllByStatusAndCampTypeAndCampStartDateLessThanEqualAndCampEndDateGreaterThanEqual(0,2,currentDate,currentDate)) {
System.out.print(churnss);
logger.info(churnss.toString());
AddOfferRequest AddOffer = new AddOfferRequest("113", churnss.getMsisdn(),churnss.getParam1());
logger.info(AddOffer.toString());
HttpEntity<AddOfferRequest> entity = new HttpEntity<AddOfferRequest>(AddOffer,headers);
ResponseEntity<String> responseEntity = restTemplate.exchange(
"api link", HttpMethod.POST, entity, String.class);
if(responseEntity.getStatusCode() == HttpStatus.OK){
String response = responseEntity.getBody();
churnss.setStatus(1);
churnss.setProcessDate(new Date());
churnss.setFulfilment_status(response);
logger.info(churnss.toString() + ", Response: " + response);
bulk_repositoryRepository.save(churnss);
}else {
logger.warn("Record Id: " + churnss.getId() + ", Http Failed Response: " + responseEntity.getStatusCode());
}
}
Thread.sleep(1000);
}
}
}
Bulk_repository 类:
@Entity
@Table(name = "BULK_REPOSITORY")
public class Bulk_repository {
@Id
@GeneratedValue(strategy=GenerationType.AUTO)
@Column(name = "id")
private long id;
@Column(name = "msisdn")
private String msisdn;
@Column(name = "camp_start_date")
private Date campStartDate;
@Column(name = "camp_end_date")
private Date campEndDate;
@Column(name = "camp_type")
private int campType;
@Column(name = "camp_cd")
private String camp_cd;
@Column(name = "status")
private int status;
@Column(name = "process_date")
private Date processDate;
@Column(name = "entry_date")
private Date entryDate;
@Column(name = "entry_user")
private String entry_user;
@Column(name = "param1")
private String param1;
@Column(name = "param2")
private String param2;
@Column(name = "param3")
private String param3;
@Column(name = "param4")
private String param4;
@Column(name = "param5")
private String param5;
@Column(name = "error_desc")
private String error_desc;
@Column(name = "fulfilment_status")
private int fulfilment_status;
##then getter and setters and tostring
Bulk_repositoryRepository 类:
public interface Bulk_repositoryRepository extends CrudRepository<Bulk_repository, Long> {
Date today = new Date();
List<Bulk_repository>findAllByStatusAndCampTypeAndCampStartDateLessThanEqualAndCampEndDateGreaterThanEqual(int status, int campType,Date today0, Date today1);
Bulk_repository findById(long id);
}
AddOfferRequest 类:
public class AddOfferRequest {
private String ChannelID="113";
private String MSISDN;
private String ServiceID;
public AddOfferRequest() {
}
public AddOfferRequest(String channelID,String mSISDN,String serviceID ) {
this.MSISDN = mSISDN;
this.ServiceID = serviceID;
}
## then getter and setter and tostring
我创建了 AsyncConfiguration 类:
package com.example.accessingdatajpa;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
import org.springframework.context.annotation.Bean;
import org.springframework.context.annotation.Configuration;
import org.springframework.scheduling.annotation.EnableAsync;
import org.springframework.scheduling.concurrent.ThreadPoolTaskExecutor;
import java.util.concurrent.Executor;
@Configuration
@EnableAsync
public class AsyncConfiguration {
private static final Logger LOGGER = LoggerFactory.getLogger(AsyncConfiguration.class);
@Bean (name = "taskExecutor")
public Executor taskExecutor() {
LOGGER.debug("Creating Async Task Executor");
final ThreadPoolTaskExecutor executor = new ThreadPoolTaskExecutor();
executor.setCorePoolSize(2);
executor.setMaxPoolSize(2);
executor.setQueueCapacity(1000);
executor.setThreadNamePrefix("CarThread-");
executor.initialize();
return executor;
}
}
但直到现在我还无法理解如何将 findby 和 http 帖子与多线程结合起来
解决方案
重写你的代码。而不是List<Bulk_repository>
返回 a Stream<Bulk_repository>
。这将懒惰地从数据库中加载记录,而不是试图一次做所有事情。
然后使用TaskExecutor
来执行每个线程的不同请求,只需给它一个任务,它就会在有空闲线程时执行。
@SpringBootApplication
public class AccessingDataJpaApplication implements CommandLineRunner {
private static final Logger logger = LoggerFactory.getLogger(AccessingDataJpaApplication.class);
@Autowired
private Bulk_repositoryRepository bulk_repositoryRepository;
@Autowired
private AsyncTaskExecutor executor;
@Autowired
private RestTemplate rest;
public static void main(String[] args) {
SpringApplication.run(AccessingDataJpaApplication.class);
}
@Override
public void run(String... args) throws Exception {
Date currentDate = new Date();
Stream< Bulk_repository> results = Bulk_repository churnss : bulk_repositoryRepository.findAllByStatusAndCampTypeAndCampStartDateLessThanEqualAndCampEndDateGreaterThanEqual(0,2,currentDate,currentDate);
results.forEach(it -> executor.submit(this.process(it)));
Thread.sleep(1000);
}
private void process(RestTemplate rest, Bulk_repository churnss) {
AddOfferRequest AddOffer = new AddOfferRequest("113", churnss.getMsisdn(),churnss.getParam1());
HttpEntity<AddOfferRequest> entity = new HttpEntity<AddOfferRequest>(AddOffer,headers);
try {
ResponseEntity<String> responseEntity = restTemplate.exchange(
"api link", HttpMethod.POST, entity, String.class);
if(responseEntity.getStatusCode() == HttpStatus.OK){
String response = responseEntity.getBody();
churnss.setStatus(1);
churnss.setProcessDate(new Date());
churnss.setFulfilment_status(response);
bulk_repositoryRepository.save(churnss);
}else {
logger.warn("Record Id: {}, Http Failed Response: {}",churnss.getId(), responseEntity.getStatusCode());
}
} catch (RestClientException rce) {
logger.warn("Record Id: {} Http Failed. ", churnss.getId(), rce);
}
}
}
注意:这是从我的头顶输入的,未经测试。不过应该提供一些指导。
推荐阅读
- python - 如何停止在 Jupyter Notebook 上运行两个 Python3 版本?
- php - 如何修复我的 url 参数?
- html - Bootstrap 4 HTML 导航栏图标文本
- c# - 什么是 opaqueRect 在 XAML 弹出模板中使用
- windows - xperf 调用堆栈函数:“System\Interrupts + DPCs”
- python - 如何在 tkinter 中扩展画布内的框架?
- winapi - MFC:使用 CString 加载字符串
- swift - 以编程方式更改为特定的 ViewController
- php - SSL强制通过.htaccess导致wordpress上的重定向循环
- c++ - sclite (SCTK)、C++ 模板参数、Filter::Filter* 无效。赛格温