线程池ThreadPoolExecutor源码解析
创始人
2024-05-13 02:27:32
0

参考视频

首先回顾一下创建线程等的三种方式

第一个是直接继承Thread类,重写run方法,这个其实内部也是继承了Runnable接口重写run方法。

比如:

public class MyThread extends Thread{@Overridepublic void run() {System.out.println("这是我自己的线程");}public static void main(String[] args) {new MyThread().start();}
}

Thread类也是继承Runable接口,内部也是重写了run方法,

public
class Thread implements Runnable 但是内部的run方法就是直接调用target的run,这个target也就是一个Runable接口的实现类。
@Overridepublic void run() {if (target != null) {target.run();}}

第二种就是直接继承Runnable接口重写Run方法了。
第三种就是重写Callable接口然后用FutureTask进行包装,最后利用Thread进行启动。这种方式创建的线程可以抛出异常并且是有返回值的。

public class MyCallable implements Callable {@Overridepublic Object call() throws Exception {System.out.println("这是call方法,我自己写的");return "call function";}public static void main(String[] args) {MyCallable myCallable = new MyCallable();try {FutureTask futureTask = new FutureTask<>(myCallable);new Thread(futureTask).start();
//futureTask.get()这个方法会阻塞当前线程的执行。System.out.println(futureTask.get());} catch (Exception e) {e.printStackTrace();}}
}

为什么会出现线程池?

试想一下,如果我们一个比价大型的项目,,每次需要用到多线程都new一个线程,那么这就会导致线程的创建数量我们不好管理和控制,另外创建和销毁线程的开销还是比较大的,甚至有的时候都已经超过了我们这个线程本身执行这个任务所需要的资源耗,这就是非常不合理的了。也有可能会存在线程切换之间的开销过大,资源浪费严重,因此线程池就应运而生了。

我们先看一下这个ThreadPoolExecutor
在这里插入图片描述

这个线程的构造方法中可以传递7个参数,具体的作用和意义这里就不再过多赘述了。这里主要讲解具体这些参数的功能是怎么样实现的?比如为什么线程能够复用,为什么能够保活?

public ThreadPoolExecutor(int corePoolSize,int maximumPoolSize,long keepAliveTime,TimeUnit unit,BlockingQueue workQueue,ThreadFactory threadFactory,RejectedExecutionHandler handler) {

我们在介绍这个类的时候首先先来看一下这个类中的一些属性,避免直接看源码啥也看不懂。

这个ctl是整个线程池的状态控制参数,用这个32位的整数来表示两个状态

  • 第一个就是线程池的运行状态,用最高的三位来表示。
  • 第二个是线程池中的工作线程的数量,用低的29位来表示。
private final AtomicInteger ctl = new AtomicInteger(ctlOf(RUNNING, 0));//Integer.SIZE - 3 = 29private static final int COUNT_BITS = Integer.SIZE - 3;//线程池定义的最大工作线程的数量2^29-1private static final int CAPACITY   = (1 << COUNT_BITS) - 1;// runState is stored in the high-order bits// 下面是线程池的运行状态,保存在ctl的高三位。private static final int RUNNING    = -1 << COUNT_BITS;private static final int SHUTDOWN   =  0 << COUNT_BITS;private static final int STOP       =  1 << COUNT_BITS;private static final int TIDYING    =  2 << COUNT_BITS;private static final int TERMINATED =  3 << COUNT_BITS;//ctlOf其实就是按位异或private static int ctlOf(int rs, int wc) { return rs | wc; }

这里顺便介绍一些-1的二进制,我们可以用下面的代码查看-1的二进制

//11111111111111111111111111111111
原码取反变反码
反码加1变成补码
-1的原码:1000 0001
反码:1111 1110
补码:1111 1111
而-1使用的时候为1111 1111
System.out.println(Integer.toBinaryString(-1));

在这个线程池的类中还有一个属性,这个属性如果为false(默认值),则核心线程即使在空闲时也保持活动状态。如果为true,则核心线程使用keepAliveTime超时等待工作。也就是核心线程也会在超时时间过后被销毁。

private volatile boolean allowCoreThreadTimeOut;

下面我们再介绍几个方法

这个方法是用来计算核心线程数的,调用的时候传递的是ctl的值,有没有发现这里用按位与运算就可以实现,所以上面定义工作线程的最大数量的时候是不是很巧妙?这样运算好像效率更高。

private static int workerCountOf(int c)  { return c & CAPACITY; }

上面这个方法再下面这个方法中被调用,当然,下面这个方法也是线程池中的方法。
源码中的注释:启动核心线程,使其空闲等待工作。这将覆盖仅在执行新任务时启动核心线程的默认策略。
如果所有核心线程都已启动,则此方法将返回false。
返回值:如果启动了线程,则为true

public boolean prestartCoreThread() {return workerCountOf(ctl.get()) < corePoolSize &&addWorker(null, true);}

动态设置核心线程数。
这里

public void setCorePoolSize(int corePoolSize) {if (corePoolSize < 0)throw new IllegalArgumentException();int delta = corePoolSize - this.corePoolSize;this.corePoolSize = corePoolSize;//这里如果发现设置的核心线程数比原来的核心线程数小就会//调用interruptIdleWorkers打断闲置的线程if (workerCountOf(ctl.get()) > corePoolSize)interruptIdleWorkers();//如果这里发现要设置的线程数大于已经存在的核心线程数,那么就会在阻塞队列不为空的情况下//不断低循环创建核心线程,直到队列为空或者核心线程数达到要求。else if (delta > 0) {// We don't really know how many new threads are "needed".// As a heuristic, prestart enough new workers (up to new// core size) to handle the current number of tasks in// queue, but stop if queue becomes empty while doing so.int k = Math.min(delta, workQueue.size());while (k-- > 0 && addWorker(null, true)) {if (workQueue.isEmpty())break;}}}

interruptIdleWorkers内补调用了下面的代码,这里面有个可重入锁,这是线程池类中的一个属性,源码中是这样解释的:
锁定对工人集合和相关簿记的访问。虽然我们可以使用某种类型的并发集合,但事实证明,使用锁通常更可取。其中一个原因是,它序列化了interruptIdleWorkers,这避免了不必要的中断风暴,尤其是在关机期间。否则,退出线程将同时中断那些尚未中断的线程。它还简化了largestPoolSize等的一些相关统计记账。我们还保留mainLock on shutdown和shutdown Now,以确保工人设置稳定,同时分别检查中断和实际中断的权限。

这代码里面有个Worker ,其实就是对工作线程的一个包装,感兴趣的话可以看一下源码。

private final ReentrantLock mainLock = new ReentrantLock();private void interruptIdleWorkers() {interruptIdleWorkers(false);}private void interruptIdleWorkers(boolean onlyOne) {final ReentrantLock mainLock = this.mainLock;mainLock.lock();// 这里首先获取到锁然后对工人workers进行遍历,将所有的工作线程的打断状态设置位truetry {for (Worker w : workers) {Thread t = w.thread;if (!t.isInterrupted() && w.tryLock()) {try {t.interrupt();} catch (SecurityException ignore) {} finally {w.unlock();}}if (onlyOne)break;}} finally {mainLock.unlock();}}

下面这个是提供了两个钩子函数,可以在线程池执行某一个线程的前后进行相关的操作

在给定线程中执行给定Runnable之前调用的方法。此方法由将执行任务r的线程t调用,可用于重新初始化
ThreadLocals或执行日志记录。
这个实现什么都不做,但可以在子类中定制。注意:为了正确嵌套多个重写,子类通常应该在这个方法的
末尾调用super.beforeExecute。
参数:
t–将运行任务的线程r–将执行的任务protected void beforeExecute(Thread t, Runnable r) { }protected void afterExecute(Runnable r, Throwable t) { }

这两个方法会在Worker 类的run方法中被调用

private final class Workerextends AbstractQueuedSynchronizerimplements Runnablepublic void run() {runWorker(this);}

我们可以看一下这个方法,我们可以发现这个线程里面不断地调用阻塞队列里面的线程,然后调用它的run方法执行任务,所以实际上没有创建新的线程,你可能比较好奇为这个执行run方法的线程是哪里来的。其实就是addWorker方法创建出来的核心线程来执行的,那为什么核心线程不会start结束呢?为什么核心线程能够复用呢?

final void runWorker(Worker w) {Thread wt = Thread.currentThread();Runnable task = w.firstTask;w.firstTask = null;w.unlock(); // allow interruptsboolean completedAbruptly = true;try {while (task != null || (task = getTask()) != null) {w.lock();// If pool is stopping, ensure thread is interrupted;// if not, ensure thread is not interrupted.  This// requires a recheck in second case to deal with// shutdownNow race while clearing interruptif ((runStateAtLeast(ctl.get(), STOP) ||(Thread.interrupted() &&runStateAtLeast(ctl.get(), STOP))) &&!wt.isInterrupted())wt.interrupt();try {beforeExecute(wt, task);Throwable thrown = null;try {task.run();} catch (RuntimeException x) {thrown = x; throw x;} catch (Error x) {thrown = x; throw x;} catch (Throwable x) {thrown = x; throw new Error(x);} finally {afterExecute(task, thrown);}} finally {task = null;w.completedTasks++;w.unlock();}}completedAbruptly = false;} finally {processWorkerExit(w, completedAbruptly);}}

我们看一下addWorker这个方法,这个方法你会发现会调用了Thread 类的start方法,这里你可能还不明白为什么么,我们看一下前一句的构造方法,w = new Worker(firstTask);

  • Worker(Runnable firstTask) { setState(-1); // inhibit interrupts until runWorker this.firstTask = firstTask; this.thread = getThreadFactory().newThread(this); }

可以看到这个构造方法赋值给Worker类的thread的成员变量就是Worker类自己,因为Worker类实现了Runnable接口,所以这里下面调用的start方法就是调用了上run方法里面的runWorker方法。

private boolean addWorker(Runnable firstTask, boolean core) {/******///* 此处省略创建核心线程之前的判断 *//*****/boolean workerStarted = false;boolean workerAdded = false;Worker w = null;try {w = new Worker(firstTask);final Thread t = w.thread;if (t != null) {final ReentrantLock mainLock = this.mainLock;mainLock.lock();try {// Recheck while holding lock.// Back out on ThreadFactory failure or if// shut down before lock acquired.int rs = runStateOf(ctl.get());if (rs < SHUTDOWN ||(rs == SHUTDOWN && firstTask == null)) {if (t.isAlive()) // precheck that t is startablethrow new IllegalThreadStateException();workers.add(w);int s = workers.size();if (s > largestPoolSize)largestPoolSize = s;workerAdded = true;}} finally {mainLock.unlock();}if (workerAdded) {t.start();workerStarted = true;}}} finally {if (! workerStarted)addWorkerFailed(w);}return workerStarted;}

这里又回到了上面的runWorker方法,为什么不会执行完呢,如果while (task != null || (task = getTask()) != null)这个判断为false了核心线程不就没了吗?所以复用的原理就出来了,我们看一下这个getTask方法,从阻塞队列里面获取任务。这个getTask会不断循环获取,除非当前线程大于核心线程数才会返回null不然就会循环判断。

private Runnable getTask() {boolean timedOut = false; // Did the last poll() time out?for (;;) {int c = ctl.get();int rs = runStateOf(c);// Check if queue empty only if necessary.if (rs >= SHUTDOWN && (rs >= STOP || workQueue.isEmpty())) {decrementWorkerCount();return null;}int wc = workerCountOf(c);// Are workers subject to culling?boolean timed = allowCoreThreadTimeOut || wc > corePoolSize;if ((wc > maximumPoolSize || (timed && timedOut))&& (wc > 1 || workQueue.isEmpty())) {if (compareAndDecrementWorkerCount(c))return null;continue;}try {Runnable r = timed ?workQueue.poll(keepAliveTime, TimeUnit.NANOSECONDS) :workQueue.take();if (r != null)return r;timedOut = true;} catch (InterruptedException retry) {timedOut = false;}}}

相关内容

热门资讯

美国不提安卓系统华为,迈向自主... 华为与美国:一场关于技术、市场与政策的较量在当今这个数字化的世界里,智能手机已经成为我们生活中不可或...
安卓系统怎么打开ppt,选择文... 你有没有遇到过这种情况:手里拿着安卓手机,突然需要打开一个PPT文件,却怎么也找不到方法?别急,今天...
谷歌退回到安卓系统,探索创新未... 你知道吗?最近科技圈可是炸开了锅,谷歌竟然宣布要退回到安卓系统!这可不是一个简单的决定,背后肯定有着...
安卓系统待机耗电多少,深度解析... 你有没有发现,手机电量总是不经用?尤其是安卓系统,有时候明明没怎么用,电量就“嗖”的一下子就下去了。...
小米主题安卓原生系统,安卓原生... 亲爱的手机控们,你是否曾为手机界面单调乏味而烦恼?想要给手机换换“衣服”,让它焕然一新?那就得聊聊小...
voyov1安卓系统,探索创新... 你有没有发现,最近你的手机是不是变得越来越流畅了?没错,我要说的就是那个让手机焕发青春的Vivo V...
电脑刷安卓tv系统,轻松打造智... 你有没有想过,家里的安卓电视突然变得卡顿,反应迟钝,是不是时候给它来个“大保健”了?没错,今天就要来...
安卓系统即将要收费,未来手机应... 你知道吗?最近有个大消息在科技圈里炸开了锅,那就是安卓系统可能要开始收费了!这可不是开玩笑的,这可是...
雷凌车载安卓系统,智能出行新体... 你有没有发现,现在的汽车越来越智能了?这不,我最近就体验了一把雷凌车载安卓系统的魅力。它就像一个聪明...
怎样拍照好看安卓系统,轻松拍出... 拍照好看,安卓系统也能轻松搞定!在这个看脸的时代,拍照已经成为每个人生活中不可或缺的一部分。无论是记...
安卓车机系统音频,安卓车机系统... 你有没有发现,现在越来越多的汽车都开始搭载智能车机系统了?这不,咱们就来聊聊安卓车机系统在音频方面的...
老苹果手机安卓系统,兼容与创新... 你手里那台老苹果手机,是不是已经陪你走过了不少风风雨雨?现在,它竟然还能装上安卓系统?这可不是天方夜...
安卓系统7.dns,优化网络连... 你有没有发现,你的安卓手机最近是不是有点儿“慢吞吞”的?别急,别急,让我来给你揭秘这可能与你的安卓系...
安卓手机系统怎么加速,安卓手机... 你有没有发现,你的安卓手机最近变得有点“慢吞吞”的?别急,别急,今天就来给你支几招,让你的安卓手机瞬...
小米note安卓7系统,探索性... 你有没有发现,手机更新换代的速度简直就像坐上了火箭呢?这不,小米Note这款手机,自从升级到了安卓7...
安卓和鸿蒙系统游戏,两大系统游... 你有没有发现,最近手机游戏界可是热闹非凡呢!安卓和鸿蒙系统两大巨头在游戏领域展开了一场激烈的较量。今...
安卓手机没有系统更,揭秘潜在风... 你有没有发现,现在安卓手机的品牌和型号真是五花八门,让人挑花了眼。不过,你知道吗?尽管市面上安卓手机...
充值宝带安卓系统,安卓系统下的... 你有没有发现,最近手机上的一款充值宝APP,在安卓系统上可是火得一塌糊涂呢!这不,今天就来给你好好扒...
安卓系统8.0镜像下载,轻松打... 你有没有想过,想要给你的安卓手机升级到最新的系统,却不知道从哪里下载那个神秘的安卓系统8.0镜像呢?...
安卓系统修改大全,全方位修改大... 你有没有想过,你的安卓手机其实是个大宝藏,里面藏着无数可以让你手机焕然一新的秘密?没错,今天就要来个...