在 Spring JPA 中结合多线程发送 HTTP 请求

我正在使用 Spring JPA 和 HTTP POST 请求:逐行从数据库读取数据,然后通过 HTTP 请求把每条数据发送到 API。这种方式运行正常,但现在我要处理大量数据,因此必须使用多线程。我是 Java 和 Spring 的新手,该如何实现用 10 个线程并行处理、每个线程每次读取 1000 行?

我查阅过多线程方面的资料,设想用 10 个线程、每个线程每次读取 1000 行;我的数据库中大约有 1000 万条记录。

AccessingDataJpaApplication类:

@SpringBootApplication

public class AccessingDataJpaApplication implements CommandLineRunner {

private static final Logger logger = LoggerFactory.getLogger(AccessingDataJpaApplication.class);

@Autowired

private Bulk_repositoryRepository bulk_repositoryRepository;

public static void main(String[] args) {

SpringApplication.run(AccessingDataJpaApplication.class);

}

Date currentDate = new Date();

@Override

public void run(String... args) throws Exception {

RestTemplate restTemplate = new RestTemplate();

HttpHeaders headers = new HttpHeaders();

headers.setAccept(Arrays.asList(MediaType.APPLICATION_JSON));

headers.setBasicAuth("user", "pass");

while(true) {

Date currentDate = new Date();

logger.info("Just Started");

for (Bulk_repository churnss : bulk_repositoryRepository.findAllByStatusAndCampTypeAndCampStartDateLessThanEqualAndCampEndDateGreaterThanEqual(0,2,currentDate,currentDate)) {

System.out.print(churnss);

logger.info(churnss.toString());

AddOfferRequest AddOffer = new AddOfferRequest("113", churnss.getMsisdn(),churnss.getParam1());

logger.info(AddOffer.toString());

HttpEntity<AddOfferRequest> entity = new HttpEntity<AddOfferRequest>(AddOffer,headers);

ResponseEntity<String> responseEntity = restTemplate.exchange(

"api link", HttpMethod.POST, entity, String.class);

if(responseEntity.getStatusCode() == HttpStatus.OK){

String response = responseEntity.getBody();

churnss.setStatus(1);

churnss.setProcessDate(new Date());

churnss.setFulfilment_status(response);

logger.info(churnss.toString() + ", Response: " + response);

bulk_repositoryRepository.save(churnss);

}else {

logger.warn("Record Id: " + churnss.getId() + ", Http Failed Response: " + responseEntity.getStatusCode());

}

}

Thread.sleep(1000);

}

}

}

Bulk_repository类:

/**
 * JPA entity mapped to the BULK_REPOSITORY table. Each field maps to the column named in its
 * {@code @Column} annotation. Accessors and toString are elided in this listing.
 */
@Entity

@Table(name = "BULK_REPOSITORY")

public class Bulk_repository {

// Primary key; the generation strategy is left to the persistence provider (GenerationType.AUTO).
@Id

@GeneratedValue(strategy=GenerationType.AUTO)

@Column(name = "id")

private long id;

// Passed as the second constructor argument of AddOfferRequest by the runner.
@Column(name = "msisdn")

private String msisdn;

// Lower bound of the campaign window; the repository finder filters on campStartDate <= now.
@Column(name = "camp_start_date")

private Date campStartDate;

// Upper bound of the campaign window; the repository finder filters on campEndDate >= now.
@Column(name = "camp_end_date")

private Date campEndDate;

// The runner only selects rows whose campType is 2.
@Column(name = "camp_type")

private int campType;

@Column(name = "camp_cd")

private String camp_cd;

// The runner selects rows with status 0 and sets status to 1 after a successful POST.
@Column(name = "status")

private int status;

// Set to the current time by the runner when it marks a row as processed.
@Column(name = "process_date")

private Date processDate;

@Column(name = "entry_date")

private Date entryDate;

@Column(name = "entry_user")

private String entry_user;

// Passed as the third constructor argument of AddOfferRequest by the runner.
@Column(name = "param1")

private String param1;

@Column(name = "param2")

private String param2;

@Column(name = "param3")

private String param3;

@Column(name = "param4")

private String param4;

@Column(name = "param5")

private String param5;

@Column(name = "error_desc")

private String error_desc;

// NOTE(review): declared as int, but the runner calls setFulfilment_status with the String
// response body. The setter is not shown in this listing - confirm the intended type.
@Column(name = "fulfilment_status")

private int fulfilment_status;

##then getter and setters and tostring

Bulk_repositoryRepository类:

/**
 * Spring Data repository for {@link Bulk_repository} rows, keyed by {@code Long}.
 */
public interface Bulk_repositoryRepository extends CrudRepository<Bulk_repository, Long> {

// Interface fields are implicitly public static final, so this value is fixed when the interface
// is initialized and does not track the current date. Nothing in the visible code reads it.
Date today = new Date();

/**
 * Derived query. By Spring Data's method-name convention it selects rows whose status and
 * campType equal the given values, whose campStartDate is on or before {@code today0} and whose
 * campEndDate is on or after {@code today1}. The whole result is returned as one list.
 */
List<Bulk_repository>findAllByStatusAndCampTypeAndCampStartDateLessThanEqualAndCampEndDateGreaterThanEqual(int status, int campType,Date today0, Date today1);

/**
 * Looks up a single row by its id, returning the entity directly rather than an Optional.
 * Presumably yields null when no row matches - confirm against the Spring Data version in use.
 */
Bulk_repository findById(long id);

}

AddOfferRequest类:

/**
 * Payload object sent as the body of the add-offer POST request. Accessors and toString are
 * elided in this listing.
 */
public class AddOfferRequest {

    // Used when no channel id is supplied.
    private String ChannelID = "113";

    private String MSISDN;

    private String ServiceID;

    /** Creates a request with the default channel id and no MSISDN or service id. */
    public AddOfferRequest() {
    }

    /**
     * Creates a fully populated request.
     *
     * @param channelID channel id to send; {@code null} keeps the default of "113"
     * @param mSISDN value stored in the MSISDN field
     * @param serviceID value stored in the ServiceID field
     */
    public AddOfferRequest(String channelID, String mSISDN, String serviceID) {
        // The argument used to be dropped silently, so every request went out with "113".
        if (channelID != null) {
            this.ChannelID = channelID;
        }
        this.MSISDN = mSISDN;
        this.ServiceID = serviceID;
    }

## then getter and setter and tostring

我创建了AsyncConfiguration类:

package com.example.accessingdatajpa;

import java.util.concurrent.Executor;
import java.util.concurrent.ThreadPoolExecutor;

import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
import org.springframework.context.annotation.Bean;
import org.springframework.context.annotation.Configuration;
import org.springframework.scheduling.annotation.EnableAsync;
import org.springframework.scheduling.concurrent.ThreadPoolTaskExecutor;

/**
 * Enables Spring's asynchronous execution support and defines the thread pool that submitted
 * tasks run on.
 */
@Configuration
@EnableAsync
public class AsyncConfiguration {

    private static final Logger LOGGER = LoggerFactory.getLogger(AsyncConfiguration.class);

    /**
     * Builds the executor registered under the bean name "taskExecutor": a fixed pool of two
     * threads backed by a queue of 1000 pending tasks. Raise the two pool-size settings together
     * to run more workers in parallel.
     *
     * @return the initialized thread-pool executor
     */
    @Bean(name = "taskExecutor")
    public Executor taskExecutor() {
        LOGGER.debug("Creating Async Task Executor");
        final ThreadPoolTaskExecutor executor = new ThreadPoolTaskExecutor();
        executor.setCorePoolSize(2);
        executor.setMaxPoolSize(2);
        executor.setQueueCapacity(1000);
        executor.setThreadNamePrefix("CarThread-");
        // With the default abort policy a producer that outruns the workers gets a
        // TaskRejectedException once the queue is full. Running the task on the submitting thread
        // instead slows the producer down until the pool catches up.
        executor.setRejectedExecutionHandler(new ThreadPoolExecutor.CallerRunsPolicy());
        // Let queued tasks finish when the context closes rather than discarding them.
        executor.setWaitForTasksToCompleteOnShutdown(true);
        executor.initialize();
        return executor;
    }
}

但到目前为止,我仍然不明白如何把 findBy 查询、HTTP POST 与多线程结合起来。

回答:

重写您的代码:让查询方法返回 Stream<Bulk_repository>,而不是 List<Bulk_repository>。这样记录会从数据库中惰性加载,而不是一次性全部读入内存。注意:返回 Stream 的查询方法必须在事务中调用,并且使用完毕后要关闭该 Stream。

然后使用 TaskExecutor 并行执行各个请求:只需把任务提交给它,一旦有空闲线程,任务就会被执行。

@SpringBootApplication

public class AccessingDataJpaApplication implements CommandLineRunner {

private static final Logger logger = LoggerFactory.getLogger(AccessingDataJpaApplication.class);

@Autowired

private Bulk_repositoryRepository bulk_repositoryRepository;

@Autowired

private AsyncTaskExecutor executor;

@Autowired

private RestTemplate rest;

public static void main(String[] args) {

SpringApplication.run(AccessingDataJpaApplication.class);

}

@Override

public void run(String... args) throws Exception {

Date currentDate = new Date();

Stream< Bulk_repository> results = Bulk_repository churnss : bulk_repositoryRepository.findAllByStatusAndCampTypeAndCampStartDateLessThanEqualAndCampEndDateGreaterThanEqual(0,2,currentDate,currentDate);

results.forEach(it -> executor.submit(this.process(it)));

Thread.sleep(1000);

}

private void process(RestTemplate rest, Bulk_repository churnss) {

AddOfferRequest AddOffer = new AddOfferRequest("113", churnss.getMsisdn(),churnss.getParam1());

HttpEntity<AddOfferRequest> entity = new HttpEntity<AddOfferRequest>(AddOffer,headers);

try {

ResponseEntity<String> responseEntity = restTemplate.exchange(

"api link", HttpMethod.POST, entity, String.class);

if(responseEntity.getStatusCode() == HttpStatus.OK){

String response = responseEntity.getBody();

churnss.setStatus(1);

churnss.setProcessDate(new Date());

churnss.setFulfilment_status(response);

bulk_repositoryRepository.save(churnss);

}else {

logger.warn("Record Id: {}, Http Failed Response: {}",churnss.getId(), responseEntity.getStatusCode());

}

} catch (RestClientException rce) {

logger.warn("Record Id: {} Http Failed. ", churnss.getId(), rce);

}

}

}

以上代码是凭记忆写的,未经测试,但应该能提供一些思路。

以上是 与https请求多线程spring的jpa 的全部内容, 来源链接: utcz.com/qa/408492.html

回到顶部