[java] 线程池

java

简单线程池的设计

一个典型的线程池,应该包括如下几个部分:
1、线程池管理器(ThreadPool),用于启动、停用,管理线程池
2、工作线程(WorkThread),线程池中的线程
3、请求接口(WorkRequest),创建请求对象,以供工作线程调度任务的执行
4、请求队列(RequestQueue),用于存放和提取请求
5、结果队列(ResultQueue),用于存储请求执行后返回的结果

线程池管理器,通过添加请求的方法(putRequest)向请求队列(RequestQueue)添加请求,这些请求事先需要实现请求接口,即传递工作函数、参数、结果处理函数、以及异常处理函数。之后初始化一定数量的工作线程,这些线程通过轮询的方式不断查看请求队列(RequestQueue),只要有请求存在,则会提取出请求,进行执行。然后,线程池管理器调用方法(poll)查看结果队列(resultQueue)是否有值,如果有值,则取出,调用结果处理函数执行。通过以上讲述,不难发现,这个系统的核心资源在于请求队列和结果队列,工作线程通过轮询requestQueue获得人物,主线程通过查看结果队列,获得执行结果。因此,对这个队列的设计,要实现线程同步,以及一定阻塞和超时机制的设计,以防止因为不断轮询而导致的过多cpu开销。

java代码:

线程池基本代码:

import com.google.common.collect.Queues;

import com.google.common.util.concurrent.Monitor;

import org.apache.commons.collections.CollectionUtils;

import org.slf4j.Logger;

import org.slf4j.LoggerFactory;

import java.util.concurrent.BlockingQueue;

import java.util.concurrent.atomic.AtomicInteger;

public class SimpleThreadPool extends Thread {

private final Logger logger = LoggerFactory.getLogger(SimpleThreadPool.class);

/**

* 工作线程列表

*/

private BlockingQueue<WorkerEntry> workers;

/**

* 工作线程的数量

*/

private AtomicInteger workerCounter = new AtomicInteger(0);

/**

* 初始线程ID

*/

private int initWorkID = 0;

/**

* 初始线程队列长度

*/

private final static int initWorkQueueSize = 3;

private Monitor monitor = new Monitor();

private Monitor.Guard guard = new Monitor.Guard(monitor) {

@Override

public boolean isSatisfied() {

return CollectionUtils.isNotEmpty(workers);

}

};

private Monitor.Guard other = new Monitor.Guard(monitor) {

@Override

public boolean isSatisfied() {

return !CollectionUtils.isFull(workers);

}

};

private int queueSize = initWorkQueueSize;

public SimpleThreadPool() {

this(initWorkQueueSize);

}

public SimpleThreadPool(int initWorkQueueSize) {

this.queueSize = initWorkQueueSize;

workers = Queues.newArrayBlockingQueue(queueSize);

}

public void run() {

try {

for (; ; ) {

try {

monitor.enterWhen(guard);

WorkerEntry workerEntry = workers.remove();

// 可运行的任务

if (WorkerStatus.RUNNABLE == workerEntry.getWorkerStatus()

&& -1 != workerEntry.getWorkerId()) {

logger.info("运行任务 ID:{}", workerEntry.getWorkerId());

workerEntry.setWorkerStatus(WorkerStatus.RUNNING);

workerEntry.setDaemon(true);

workerEntry.start();

}

// 当前任务已经运行结束或者被取消

if (workerEntry.isDone()) {

logger.info("任务ID:{}已运行完成,当前状态是:{}",

workerEntry.getWorkerId(), workerEntry.getWorkerStatus());

}

workerCounter.decrementAndGet();

// 如果遇到workId为-1,则标识清空队列

if (-1 == workerEntry.getWorkerId()) {

destroy();

break;

}

} finally {

monitor.leave();

}

}

} catch (Exception ex) {

logger.error("{}", ex);

destroy();

}

}

public void destroy() {

while (CollectionUtils.isNotEmpty(workers)) {

WorkerEntry workerEntry = workers.remove();

workerCounter.decrementAndGet();

logger.info("任务ID:{}已销毁,当前状态是:{}",

workerEntry.getWorkerId(), workerEntry.getWorkerStatus());

}

}

public void addWorker(Runnable runnable) {

WorkerEntry workerEntry = new WorkerEntry(runnable, WorkerStatus.RUNNABLE, initWorkID++);

try {

monitor.enterWhen(other);

workers.offer(workerEntry);

workerCounter.incrementAndGet();

} catch (InterruptedException e) {

} finally {

monitor.leave();

}

}

public void putDeathWorker() {

workers.add(new WorkerEntry(null, WorkerStatus.RUNNABLE, -1));

}

public int getQueueSize() {

return queueSize;

}

public static void main(String[] args) throws InterruptedException {

SimpleThreadPool simpleThreadPool = new SimpleThreadPool();

simpleThreadPool.addWorker(new Runnable() {

@Override

public void run() {

int index = 0;

try {

while (index++ < 10) {

System.out.println("当前线程---Tom");

sleep(100L);

}

} catch (Exception ex) {

}

}

});

simpleThreadPool.addWorker(new Runnable() {

@Override

public void run() {

int index = 0;

while (index++ < 10) {

try {

sleep(100L);

System.out.println("当前线程---Toms");

} catch (Exception ex) {

}

}

}

});

simpleThreadPool.start();

simpleThreadPool.addWorker(new Runnable() {

@Override

public void run() {

int index = 0;

while (index++ < 10) {

try {

sleep(100L);

System.out.println("当前线程---Death");

} catch (Exception ex) {

}

}

}

});

Thread.currentThread().sleep(1000L);

simpleThreadPool.putDeathWorker();

}

}

import org.apache.commons.lang3.builder.ToStringBuilder;

import org.apache.commons.lang3.builder.ToStringStyle;

import org.slf4j.Logger;

import org.slf4j.LoggerFactory;

public class WorkerEntry extends Thread {

private final Logger logger = LoggerFactory.getLogger(WorkerEntry.class);

/**

* 实际任务列表

*/

private Runnable runnable;

/**

* 工作状态

*/

private WorkerStatus workerStatus;

/**

* 线程ID

*/

private int workerId;

public WorkerEntry() {

}

public WorkerEntry(Runnable runnable, WorkerStatus workerStatus, int workerId) {

this.runnable = runnable;

this.workerStatus = workerStatus;

this.workerId = workerId;

}

public Runnable getRunnable() {

return runnable;

}

public void setRunnable(Runnable runnable) {

this.runnable = runnable;

}

public WorkerStatus getWorkerStatus() {

return workerStatus;

}

public void setWorkerStatus(WorkerStatus workerStatus) {

this.workerStatus = workerStatus;

}

public int getWorkerId() {

return workerId;

}

public void setWorkerId(int workerId) {

this.workerId = workerId;

}

public boolean isDone() {

return workerStatus == WorkerStatus.DONE

|| workerStatus == WorkerStatus.SHUTDOWN

|| workerStatus == WorkerStatus.EXCEPTION;

}

@Override

public String toString() {

return ToStringBuilder.reflectionToString(this, ToStringStyle.SHORT_PREFIX_STYLE);

}

@Override

public void run() {

try {

runnable.run();

workerStatus = WorkerStatus.DONE;

} catch (Exception ex) {

logger.error("ID:{}运行异常", workerId, ex);

workerStatus = WorkerStatus.EXCEPTION;

}

}

}

  线程状态:

import org.apache.commons.lang3.builder.ToStringBuilder;

import org.apache.commons.lang3.builder.ToStringStyle;

public enum WorkerStatus {

RUNNABLE(-1, "可运行"), RUNNING(0, "运行中"), DONE(1, "运行结束"), SHUTDOWN(2, "取消"),

EXCEPTION(-2, "运行异常");

private final int code;

private final String desc;

WorkerStatus(int code, String desc) {

this.code = code;

this.desc = desc;

}

public int getCode() {

return code;

}

public String getDesc() {

return desc;

}

@Override

public String toString() {

return ToStringBuilder.reflectionToString(this, ToStringStyle.SHORT_PREFIX_STYLE);

}

}

  

  

参考文档:

http://www.cnblogs.com/coser/archive/2012/03/10/2389264.html

以上是 [java] 线程池 的全部内容, 来源链接: utcz.com/z/393563.html

回到顶部