java并发发送请求的示例

java

import java.util.ArrayList;

import java.util.List;

import java.util.concurrent.Callable;

import java.util.concurrent.ExecutionException;

import java.util.concurrent.ExecutorService;

import java.util.concurrent.Executors;

import java.util.concurrent.Future;

import org.apache.log4j.Logger;

public class ConcurrentRuleSyncUtil {

private static final int MAX_THREAD = 10; //最多启动的线程数

private static final ExecutorService exec = Executors.newFixedThreadPool(MAX_THREAD);

private static final Logger logger = Logger.getLogger(ConcurrentRuleSyncUtil.class);

/**

* 并发来主动通知所有应用来更新规则 最多启动

*

* @param strUrl

* 需要通知的应用ip列表

* @return

*/

public static List<String> activeAppList(List<String> strUrl, String queryString) {

if (strUrl == null || strUrl.size() == 0) {

return null;

}

List<Callable<List<String>>> runThreads = GeneralThreadInfo(strUrl,queryString);

List<String> resList = new ArrayList<String>();

try {

List<Future<List<String>>> fetures = exec.invokeAll(runThreads);

for(Future<List<String>> feture: fetures){

List<String> tmp = null;

try {

tmp = feture.get();

} catch (ExecutionException e) {

logger.error("ConcurrentRuleSyncUtil thread activeAppList get error:" + e.getMessage(), e);

}

if(tmp != null){

resList.addAll(tmp);

}

}

} catch (InterruptedException e) {

logger.error("ConcurrentRuleSyncUtil activeAppList get error:" + e.getMessage(), e);

}

return resList;

}

public static List<Callable<List<String>>> GeneralThreadInfo(List<String> strUrl, String queryString) {

List<Callable<List<String>>> runThreads = new ArrayList<Callable<List<String>>>();

int size = strUrl.size() / MAX_THREAD + 1;

int i = 0;

int len = strUrl.size();

for(; i + size <= len; i += size){

runThreads.add(new ActiveSyncThread(strUrl.subList(i, i + size), queryString));

}

if(i < len){

runThreads.add(new ActiveSyncThread(strUrl.subList(i,len), queryString));

}

return runThreads;

}

static class ActiveSyncThread implements Callable<List<String>>{

private List<String> hostlistUrl; //分配到当前线程的host列表

private String queryString; //参数串

public ActiveSyncThread(List<String> strUrl, String queryString){

this.hostlistUrl = strUrl;

this.queryString = queryString;

}

@Override

public List<String> call() throws Exception {

return getResponseText(hostlistUrl+queryString);

}

}

/**

* 根据绑定去查询绑定地址的url html,HttpUrlConnection调用方式

*

* @param url

* 查询的url

* @throws IOException

*/

public static String getResponseText(String queryUrl) {

return getResponseText(queryUrl, null, null);

}

/**

* 根据绑定去查询绑定地址的url html,HttpUrlConnection调用方式

*

* @param url

* 查询的url

* @param host

* 需要绑定的host

* @param ip

* 对应host绑定的ip

* @throws IOException

*/

public static String getResponseText(String queryUrl,String host,String ip) {

InputStream is = null;

BufferedReader br = null;

StringBuffer res = new StringBuffer();

try {

HttpURLConnection httpUrlConn = null;

URL url = new URL(queryUrl);

if(ip!=null){

String str[] = ip.split("\\.");

byte[] b =new byte[str.length];

for(int i=0,len=str.length;i<len;i++){

b[i] = (byte)(Integer.parseInt(str[i],10));

}

Proxy proxy = new Proxy(Proxy.Type.HTTP,

new InetSocketAddress(InetAddress.getByAddress(b), 80));

httpUrlConn = (HttpURLConnection) url

.openConnection(proxy);

}else{

httpUrlConn = (HttpURLConnection) url

.openConnection();

}

httpUrlConn.setRequestMethod("GET");

httpUrlConn.setDoOutput(true);

httpUrlConn.setConnectTimeout(DEFAULT_TIMEOUT);

httpUrlConn.setReadTimeout(DEFAULT_TIMEOUT);

httpUrlConn.setDefaultUseCaches(false);

httpUrlConn.setUseCaches(false);

is = httpUrlConn.getInputStream();

int responseCode = httpUrlConn.getResponseCode();

// 如果返回的结果是400以上,那么就说明出问题了

if (responseCode > 400) {

logger.error("getResponseText for queryurl:" + queryUrl + " got responseCode :" + responseCode);

return "getResponseText for queryurl:" + queryUrl + " got responseCode :" + responseCode;

}

// 需要自动识别页面的编码,通过从context-type中解析得到,默认为UTF-8

String encoding = "UTF-8";

String contextType = httpUrlConn.getContentType();

if (StringUtilsExt.isNotBlank(contextType)) {

int pos = contextType.lastIndexOf("=");

if (pos > -1) {

encoding = contextType.substring(pos + 1);

}

}

// System.out.println(encoding);

br = new BufferedReader(new InputStreamReader(is, encoding));

String data = null;

while ((data = br.readLine()) != null) {

res.append(data + "\n");

}

return res.toString();

} catch (IOException e) {

logger.error(e.getMessage(), e);

return "failed: " + e.getMessage();

} catch (Exception e) {

logger.error(e.getMessage(), e);

return "failed: " + e.getMessage();

} finally {

if (br != null) {

try {

br.close();

} catch (IOException e) {

logger.error(e.getMessage(), e);

res.append(e.getMessage());

}

}

}

}

}


以上是 java并发发送请求的示例 的全部内容, 来源链接: utcz.com/z/392697.html

回到顶部