你清楚如何动态的调整动态调整corePoolSize与maximumPoolSize吗?

编程

示例demo

public class ThreadChangeTest {

public static void main(String[] args) throws InterruptedException {

ThreadPoolExecutor executor = new ThreadPoolExecutor(3,

10,

10l, TimeUnit.SECONDS,

new LinkedBlockingQueue<>(10));

int count = 0;

while (true) {

Thread.sleep(1000l);

for (int i = 0; i < 9; i++) {

executor.execute(() -> {

/*try {

Thread.sleep(1l);

} catch (InterruptedException e) {

e.printStackTrace();

}*/

System.out.println("------------core: " + executor.getCorePoolSize() + " active: " + executor.getActiveCount() + " max: " + executor.getMaximumPoolSize());

});

}

count++;

if (count == 20) {

executor.setCorePoolSize(2);

executor.setMaximumPoolSize(9);

System.out.println("----------------------------------------");

}

if (count == 100) {

executor.shutdown();

System.out.println("=============================================");

break;

}

}

Thread.currentThread().join();

}

}

在程序运行中动态修改线程池corePoolSizemaximumPoolSize的值

源码分析

线程池参数调大

public void setCorePoolSize(int corePoolSize) {

if (corePoolSize < 0)

throw new IllegalArgumentException();

int delta = corePoolSize - this.corePoolSize;

this.corePoolSize = corePoolSize;

//核心线程调小,中断空闲任务,否则线程池的当前任务结束,自动调小

if (workerCountOf(ctl.get()) > corePoolSize)

interruptIdleWorkers();

//核心线程数调大后,从队列取任务

else if (delta > 0) {

// We don"t really know how many new threads are "needed".

// As a heuristic, prestart enough new workers (up to new

// core size) to handle the current number of tasks in

// queue, but stop if queue becomes empty while doing so.

//队列大小是否可以取任务

int k = Math.min(delta, workQueue.size());

//队列有任务就取,否则break

while (k-- > 0 && addWorker(null, true)) {

if (workQueue.isEmpty())

break;

}

}

}

public void setMaximumPoolSize(int maximumPoolSize) {

if (maximumPoolSize <= 0 || maximumPoolSize < corePoolSize)

throw new IllegalArgumentException();

this.maximumPoolSize = maximumPoolSize;

//中断空闲任务,否则线程池的当前任务结束,自动调小

if (workerCountOf(ctl.get()) > maximumPoolSize)

interruptIdleWorkers();

}

源码看出:线程池的调节时直接设置corePoolSizemaximumPoolSize的值

其中

workerCountOf(ctl.get())

代表工作任务线程数,参考我的博客JDK8线程池-ThreadPoolExecutor源码解析

调大corePoolSizemaximumPoolSize,线程池运行过程中自动生效,线程池处理逻辑增强。

线程池调小

调小corePoolSizemaximumPoolSize均会执行

interruptIdleWorkers();

跟踪interruptIdleWorkers源码

private void interruptIdleWorkers() {

interruptIdleWorkers(false);

}

private void interruptIdleWorkers(boolean onlyOne) {

final ReentrantLock mainLock = this.mainLock;

mainLock.lock();

try {

//workers是所有已存在的线程,包括空闲线程

for (Worker w : workers) {

Thread t = w.thread;

//这里注意,非常关键,加锁w.tryLock()

if (!t.isInterrupted() && w.tryLock()) {

try {

t.interrupt();

} catch (SecurityException ignore) {

} finally {

w.unlock();

}

}

//从上面的参数onlyOne is false

if (onlyOne)

break;

}

} finally {

mainLock.unlock();

}

}

这里的workers注意:是一个HashSet,存放规则:

核心线程优先占满,即使核心线程有空闲,新任务来了会优先开启新的线程而不是复用,核心线程仅在占满才会复用,然后使用队列,最后使用max线程,max线程数对应的workers会动态变化,

参考我的博客JDK8线程池-ThreadPoolExecutor源码解析

线程池任务执行源码

我们看ThreadPoolExecutor执行任务的源码,参考我的博客JDK8线程池-ThreadPoolExecutor源码解析

final void runWorker(Worker w) {

Thread wt = Thread.currentThread();

Runnable task = w.firstTask;

w.firstTask = null;

w.unlock(); // allow interrupts

boolean completedAbruptly = true;

try {

while (task != null || (task = getTask()) != null) {

//这里注意,加锁了,非常关键

w.lock();

// If pool is stopping, ensure thread is interrupted;

// if not, ensure thread is not interrupted. This

// requires a recheck in second case to deal with

// shutdownNow race while clearing interrupt

if ((runStateAtLeast(ctl.get(), STOP) ||

(Thread.interrupted() &&

runStateAtLeast(ctl.get(), STOP))) &&

!wt.isInterrupted())

wt.interrupt();

try {

beforeExecute(wt, task);

Throwable thrown = null;

try {

//任务执行

task.run();

} catch (RuntimeException x) {

thrown = x; throw x;

} catch (Error x) {

thrown = x; throw x;

} catch (Throwable x) {

thrown = x; throw new Error(x);

} finally {

afterExecute(task, thrown);

}

} finally {

task = null;

w.completedTasks++;

w.unlock();

}

}

completedAbruptly = false;

} finally {

processWorkerExit(w, completedAbruptly);

}

}

可以看出在任务拿出来后,立即加锁

包括任务执行的过程都是加锁的。

加锁分析

private final class Worker extends AbstractQueuedSynchronizer implements Runnable {

Worker(Runnable firstTask) {

setState(-1); // inhibit interrupts until runWorker

this.firstTask = firstTask;

this.thread = getThreadFactory().newThread(this);

}

/** Delegates main run loop to outer runWorker */

public void run() {

runWorker(this);

}

protected boolean tryAcquire(int unused) {

if (compareAndSetState(0, 1)) {

setExclusiveOwnerThread(Thread.currentThread());

return true;

}

return false;

}

protected boolean tryRelease(int unused) {

setExclusiveOwnerThread(null);

setState(0);

return true;

}

public void lock() { acquire(1); }

public boolean tryLock() { return tryAcquire(1); }

public void unlock() { release(1); }

public boolean isLocked() { return isHeldExclusively(); }

使用了AQS,自定义了加锁方式CAS模式

public abstract class AbstractQueuedSynchronizer 

extends AbstractOwnableSynchronizer

implements java.io.Serializable {

public final void acquire(int arg) {

if (!tryAcquire(arg) &&

acquireQueued(addWaiter(Node.EXCLUSIVE), arg))

selfInterrupt();

}

public final boolean release(int arg) {

if (tryRelease(arg)) {

Node h = head;

if (h != null && h.waitStatus != 0)

unparkSuccessor(h);

return true;

}

return false;

}

}

可以看出使用tryAcquiretryRelease,均重写方法

protected boolean tryAcquire(int unused) {

if (compareAndSetState(0, 1)) {

setExclusiveOwnerThread(Thread.currentThread());

return true;

}

return false;

}

compareAndSetState(0, 1)

使用上面的代码加锁,意味着线程执行过程中都是加锁的,不会被销毁,只会销毁空闲线程,或者当前线程执行结束销毁。

线程池调小corePoolSizemaximumPoolSize对当前正在执行的任务没有影响。

调节队列大小

队列是不可以动态调整的。

private final int capacity;

总结

  • 线程池corePoolSizemaximumPoolSize调大注意max线程数不要调过大,计算机资源是有限的。

  • 线程池的队列初始化大小注意,不能动态调节,队列占用的是堆内存,注意JVM的内存大小与GC能力,尽量减小大对象的存在。

  • 线程池corePoolSizemaximumPoolSize和队列调小注意,线程池的处理能力减弱,可能会执行拒绝策略。

参考地址

  • https://blog.csdn.net/fenglllle/article/details/84473345

如果大家喜欢我的文章,可以关注个人订阅号。欢迎随时留言、交流。如果想加入微信群的话一起讨论的话,请加管理员简栈文化-小助手(lastpass4u),他会拉你们进群。

以上是 你清楚如何动态的调整动态调整corePoolSize与maximumPoolSize吗? 的全部内容, 来源链接: utcz.com/z/515094.html

回到顶部