你清楚如何动态的调整动态调整corePoolSize与maximumPoolSize吗?
示例demo
public class ThreadChangeTest { public static void main(String[] args) throws InterruptedException {
ThreadPoolExecutor executor = new ThreadPoolExecutor(3,
10,
10l, TimeUnit.SECONDS,
new LinkedBlockingQueue<>(10));
int count = 0;
while (true) {
Thread.sleep(1000l);
for (int i = 0; i < 9; i++) {
executor.execute(() -> {
/*try {
Thread.sleep(1l);
} catch (InterruptedException e) {
e.printStackTrace();
}*/
System.out.println("------------core: " + executor.getCorePoolSize() + " active: " + executor.getActiveCount() + " max: " + executor.getMaximumPoolSize());
});
}
count++;
if (count == 20) {
executor.setCorePoolSize(2);
executor.setMaximumPoolSize(9);
System.out.println("----------------------------------------");
}
if (count == 100) {
executor.shutdown();
System.out.println("=============================================");
break;
}
}
Thread.currentThread().join();
}
}
在程序运行中动态修改线程池corePoolSize
与maximumPoolSize
的值
源码分析
线程池参数调大
public void setCorePoolSize(int corePoolSize) { if (corePoolSize < 0)
throw new IllegalArgumentException();
int delta = corePoolSize - this.corePoolSize;
this.corePoolSize = corePoolSize;
//核心线程调小,中断空闲任务,否则线程池的当前任务结束,自动调小
if (workerCountOf(ctl.get()) > corePoolSize)
interruptIdleWorkers();
//核心线程数调大后,从队列取任务
else if (delta > 0) {
// We don"t really know how many new threads are "needed".
// As a heuristic, prestart enough new workers (up to new
// core size) to handle the current number of tasks in
// queue, but stop if queue becomes empty while doing so.
//队列大小是否可以取任务
int k = Math.min(delta, workQueue.size());
//队列有任务就取,否则break
while (k-- > 0 && addWorker(null, true)) {
if (workQueue.isEmpty())
break;
}
}
}
public void setMaximumPoolSize(int maximumPoolSize) {
if (maximumPoolSize <= 0 || maximumPoolSize < corePoolSize)
throw new IllegalArgumentException();
this.maximumPoolSize = maximumPoolSize;
//中断空闲任务,否则线程池的当前任务结束,自动调小
if (workerCountOf(ctl.get()) > maximumPoolSize)
interruptIdleWorkers();
}
源码看出:线程池的调节时直接设置corePoolSize
与maximumPoolSize
的值
其中
workerCountOf(ctl.get())
代表工作任务线程数,参考我的博客JDK8线程池-ThreadPoolExecutor源码解析
调大corePoolSize
与maximumPoolSize
,线程池运行过程中自动生效,线程池处理逻辑增强。
线程池调小
调小corePoolSize
与maximumPoolSize
均会执行
interruptIdleWorkers();
跟踪interruptIdleWorkers源码
private void interruptIdleWorkers() { interruptIdleWorkers(false);
}
private void interruptIdleWorkers(boolean onlyOne) {
final ReentrantLock mainLock = this.mainLock;
mainLock.lock();
try {
//workers是所有已存在的线程,包括空闲线程
for (Worker w : workers) {
Thread t = w.thread;
//这里注意,非常关键,加锁w.tryLock()
if (!t.isInterrupted() && w.tryLock()) {
try {
t.interrupt();
} catch (SecurityException ignore) {
} finally {
w.unlock();
}
}
//从上面的参数onlyOne is false
if (onlyOne)
break;
}
} finally {
mainLock.unlock();
}
}
这里的workers
注意:是一个HashSet
,存放规则:
核心线程优先占满,即使核心线程有空闲,新任务来了会优先开启新的线程而不是复用,核心线程仅在占满才会复用,然后使用队列,最后使用max
线程,max
线程数对应的workers
会动态变化,
参考我的博客JDK8
线程池-ThreadPoolExecutor
源码解析
线程池任务执行源码
我们看ThreadPoolExecutor
执行任务的源码,参考我的博客JDK8
线程池-ThreadPoolExecutor
源码解析
final void runWorker(Worker w) { Thread wt = Thread.currentThread();
Runnable task = w.firstTask;
w.firstTask = null;
w.unlock(); // allow interrupts
boolean completedAbruptly = true;
try {
while (task != null || (task = getTask()) != null) {
//这里注意,加锁了,非常关键
w.lock();
// If pool is stopping, ensure thread is interrupted;
// if not, ensure thread is not interrupted. This
// requires a recheck in second case to deal with
// shutdownNow race while clearing interrupt
if ((runStateAtLeast(ctl.get(), STOP) ||
(Thread.interrupted() &&
runStateAtLeast(ctl.get(), STOP))) &&
!wt.isInterrupted())
wt.interrupt();
try {
beforeExecute(wt, task);
Throwable thrown = null;
try {
//任务执行
task.run();
} catch (RuntimeException x) {
thrown = x; throw x;
} catch (Error x) {
thrown = x; throw x;
} catch (Throwable x) {
thrown = x; throw new Error(x);
} finally {
afterExecute(task, thrown);
}
} finally {
task = null;
w.completedTasks++;
w.unlock();
}
}
completedAbruptly = false;
} finally {
processWorkerExit(w, completedAbruptly);
}
}
可以看出在任务拿出来后,立即加锁
包括任务执行的过程都是加锁的。
加锁分析
private final class Worker extends AbstractQueuedSynchronizer implements Runnable { Worker(Runnable firstTask) {
setState(-1); // inhibit interrupts until runWorker
this.firstTask = firstTask;
this.thread = getThreadFactory().newThread(this);
}
/** Delegates main run loop to outer runWorker */
public void run() {
runWorker(this);
}
protected boolean tryAcquire(int unused) {
if (compareAndSetState(0, 1)) {
setExclusiveOwnerThread(Thread.currentThread());
return true;
}
return false;
}
protected boolean tryRelease(int unused) {
setExclusiveOwnerThread(null);
setState(0);
return true;
}
public void lock() { acquire(1); }
public boolean tryLock() { return tryAcquire(1); }
public void unlock() { release(1); }
public boolean isLocked() { return isHeldExclusively(); }
使用了AQS,自定义了加锁方式CAS模式
public abstract class AbstractQueuedSynchronizer extends AbstractOwnableSynchronizer
implements java.io.Serializable {
public final void acquire(int arg) {
if (!tryAcquire(arg) &&
acquireQueued(addWaiter(Node.EXCLUSIVE), arg))
selfInterrupt();
}
public final boolean release(int arg) {
if (tryRelease(arg)) {
Node h = head;
if (h != null && h.waitStatus != 0)
unparkSuccessor(h);
return true;
}
return false;
}
}
可以看出使用tryAcquire
和tryRelease
,均重写方法
protected boolean tryAcquire(int unused) { if (compareAndSetState(0, 1)) {
setExclusiveOwnerThread(Thread.currentThread());
return true;
}
return false;
}
compareAndSetState(0, 1)
使用上面的代码加锁,意味着线程执行过程中都是加锁的,不会被销毁,只会销毁空闲线程,或者当前线程执行结束销毁。
线程池调小corePoolSize
与maximumPoolSize
对当前正在执行的任务没有影响。
调节队列大小
队列是不可以动态调整的。
private final int capacity;
总结
线程池
corePoolSize
与maximumPoolSize
调大注意max
线程数不要调过大,计算机资源是有限的。线程池的队列初始化大小注意,不能动态调节,队列占用的是堆内存,注意JVM的内存大小与GC能力,尽量减小大对象的存在。
线程池
corePoolSize
与maximumPoolSize
和队列调小注意,线程池的处理能力减弱,可能会执行拒绝策略。
参考地址
- https://blog.csdn.net/fenglllle/article/details/84473345
如果大家喜欢我的文章,可以关注个人订阅号。欢迎随时留言、交流。如果想加入微信群的话一起讨论的话,请加管理员简栈文化-小助手(lastpass4u),他会拉你们进群。
以上是 你清楚如何动态的调整动态调整corePoolSize与maximumPoolSize吗? 的全部内容, 来源链接: utcz.com/z/515094.html