首页>>后端>>java->Java多线程基础

Java多线程基础

时间:2023-12-05 本站 点击:0

前言

各位亲爱的读者朋友,我正在创作 Java多线程系列 文章,如果您觉得内容还不错,还请点赞支持一下。

在上一篇文章 中,我们回顾了线程生命周期、线程之间相互协作的知识,本篇我们继续挖掘,增强对线程的理解。

作者按:本篇按照自己有限的知识进行整理,如有谬误,还请读者在评论区不吝指出

了解系列以及总纲:Java多线程系列

重要声明:

出于 方便叙述 或 帮助基础尚且薄弱的读者理解文章内容 的目的,文中举了一些例子,但这些例子并 不能 百分百准确的对应Java中的概念,甚至有些幼稚。

读者朋友们应当注意到这一点,并且清晰的意识到自己的目标是理解Java中的概念与设计,而不必纠结于例子是否有失偏颇。

本篇博客的内容较为散碎,以下是内容大纲,您可以结合它挑选感兴趣的内容片段阅读、重新梳理知识

线程的创建与启动

在上一篇文章中,我们提到,调用 Thread#start() 即可启动该线程,而并未挖掘虚拟机 真正启动 一个线程的 具体过程。

可能会让您失望,这一篇依旧不会挖掘这一细节,因为它对设计、编写优质的多线程应用毫无帮助。

如果您对此感兴趣,以下文章可能会有帮助:

面试官问如何启动Java 线程 未查询到源头作者信息

从Java到C++,以JVM的角度看Java线程的创建与运行 作者Van96

先回归到概念:

操作系统中的Thread:是操作系统能够进行运算调度的最小单位。它被包含在进程之中,是进程中的实际运作单位

JVM中的Thread:A thread is a thread of execution in a program. The Java Virtual Machine allows an application to have multiple threads of execution running concurrently. -- 似乎解释了,又似乎没有

"编写应用程序时,不会为了展现自己已经掌握了启动线程的知识而去启动线程",即线程是手段,这一点并不难理解。应用程序使用线程的目的在于 完成既定任务 , 并且基于多线程并发能力提高程序的运行效率、或者基于线程的特性界定职责边界使程序有序运行。

举个例子,工厂接了一批订单,需要在能力一致的一批工人中选择一批完成订单的生产,不难理解:工厂老板在意的是订单的完成,而不是工人的名字、星座、爱好。

那么如何定义线程需要完成的任务呢?

在最初的JDK中,存在两种方式:

继承Thread类、覆写 run() 方法定义任务

class PrimeThread extends Thread {    long minPrime;    PrimeThread(long minPrime) {        this.minPrime = minPrime;    }    public void run() {        // compute primes larger than minPrime    }}

组合优于继承的典型例子:实现Runnable接口,作为Thread的任务

class Foo {    class PrimeRun implements Runnable {        long minPrime;        PrimeRun(long minPrime) {            this.minPrime = minPrime;        }        public void run() {            // compute primes larger than minPrime        }    }    foo() {        PrimeRun p = new PrimeRun(143);        new Thread(p).start();    }}

随着JDK的发展,也有更多的方式定义任务,我们将在后续的系列文章中展开。

至此,您应该已经意会了 JDK doc 中所说的 a Thread is a thread of execution in a program

线程池

回到前文举得例子,工厂经过长时间的运转,积累了足够的经验,老板突然顿悟:只要工人能够胜任工作,自己完全没有必要了解工人,只需要:

评估生产任务量

制定好生产计划

把任务和计划交给产线即可

完全不用在意是张三做还是李四做。

工人形如线程,产线便形如线程池。结合工厂的实际情况与任务的特性,可以凝练出 几种产线管理方式 。

在Java中,直接或者间接的依靠配置 ThreadPoolExecutor 获得线程池。

作者按:通过简单的搜索,可以发现大量的探讨线程池的博客,可能受面经影响,部分博客均围绕几个常见地面试问题展开。 但务必注意,线程池的知识内容远不止面试题题干所表现的那些内容!相比之下,理解线程池的设计更为重要。

而我的文字功力有限,无法像教科书那样,顺着严谨的大纲递进式展开,还让文字显得 深刻且有趣,只能尽可能推测读者的兴趣点,展开以下内容

接下来,让我们结合生活经验,以工厂产线为例子,反思推导线程池的设计,了解 ThreadPoolExecutor 最基本的知识。

ThreadPoolExecutor 核心设计

上文中,我们以 产线 类比 线程池 ,"工厂对工人的管理方式" 来类比 "线程池的管理设计" ,并且您一定注意到两处重点:任务 、 工人

在线程池中, 上岗工作的线程 可以类比为 工人 ,完成产线收到的任务。

注意,该类比并不完全准确

不难推测,线程池存在两个核心内容:

任务队列 BlockingQueue<Runnable> workQueue

工作者集合 HashSet<Worker> workers

任务队列用于存储任务,您应该已经注意到,它使用的是juc下的 BlockingQueue 接口。它的本质还是队列,附加了两种特殊的操作:

取 时满足 (或等待至满足) 队列非空

存 时满足 (或等待至满足) 队列有空余空间

既然是接口,自然可以有不同的实现,您可以使用不同的实现作为线程池的任务队列。

在线程池设计中,通过依赖抽象 即BlockingQueue 进行了解耦,只关心存取的时机。您可以自行决定队列的特性,诸如大小、存储方式、优先级排序等

在先前的系列文章中还未涉及 BlockingQueue ,计划将于后续系列文章中展开,故本文也不会围绕它展开内容

接下来,让我们看一看 一个人上岗成为产线工人的全过程 ,即 Thread 成为线程池 Worker 的过程

成为线程池中的工作者

产线确定了一个岗位,管理者把岗位信息给到人力资源部门,并申请配给人力:getThreadFactory().newThread(this)

人力资源部门派遣了一个 Thread 小T 给到产线,小T已经接受了技能培训,并且知道上岗后从产线的 任务队列 中取任务、出卖体力完成它即可, void runWorker(Worker w)

小T 就成为了一个 Worker

您可能意识到,线程池只关心线程的管理,并不关心线程的创建细节,所以再次依赖抽象,对线程创建细节进行了解耦:

public interface ThreadFactory {    Thread newThread(Runnable r);}

关于Worker的定义、职责,泛读以下源码即可了然于心:

public class ThreadPoolExecutor extends AbstractExecutorService {    private final class Worker extends AbstractQueuedSynchronizer implements Runnable {        /** Thread this worker is running in.  Null if factory fails. */        final Thread thread;        /** Initial task to run.  Possibly null. */        Runnable firstTask;        /** Per-thread task counter */        volatile long completedTasks;        /**         * Creates with given first task and thread from ThreadFactory.         * @param firstTask the first task (null if none)         */        Worker(Runnable firstTask) {            setState(-1); // inhibit interrupts until runWorker            this.firstTask = firstTask;            this.thread = getThreadFactory().newThread(this);        }        /** Delegates main run loop to outer runWorker. */        public void run() {            runWorker(this);        }        //其他略    }    public ThreadFactory getThreadFactory() {        return threadFactory;    }    final void runWorker(Worker w) {        Thread wt = Thread.currentThread();        Runnable task = w.firstTask;        w.firstTask = null;        w.unlock(); // allow interrupts        boolean completedAbruptly = true;        try {            while (task != null || (task = getTask()) != null) {                //锁处理和判断略                try {                    beforeExecute(wt, task);                    Throwable thrown = null;                    try {                        task.run();                    }                    //异常略                    finally {                        afterExecute(task, thrown);                    }                } finally {                    task = null;                    w.completedTasks++;                    w.unlock();                }            }            completedAbruptly = false;        } finally {            processWorkerExit(w, completedAbruptly);        }    }}

您可能留意到,近年来的一些社会不良事件中总会出现一个词:临时工。当然,我并没有任何在此讨论社会问题的意愿, 一条生产线有固定的岗位,也会有按需产生的临时工岗位,甚至完全是临时工岗位,这并不难理解。而线程池也有这样的设计。

public class ThreadPoolExecutor extends AbstractExecutorService {    /**     * Core pool size is the minimum number of workers to keep alive     * (and not allow to time out etc) unless allowCoreThreadTimeOut     * is set, in which case the minimum is zero.     */    private volatile int corePoolSize;    /**     * Maximum pool size. Note that the actual maximum is internally     * bounded by CAPACITY.     */    private volatile int maximumPoolSize;}

注意,线程池在管理时,对线程是一视同仁的,只关心核心线程数量、最大线程数量,并不会依据线程的特征将其分化为 核心/非核心。

产线如何确定一个岗位

上一节我们提到,产线确定了一个岗位后,向人力资源部门要人并安排成为产线工人。

那么产线是如何确定一个岗位的呢?即 线程池如何确定需要增加一个Worker?

产线不会无缘无故的安排工人上岗工作,必然是收到了生产任务,否则就成了合理摸鱼。

顺理成章的,线程池收到一个任务时,在相应的Size限制没有达到时,优先考虑安排线程进行处理,而不是丢到任务队列中等待。

在先前的系列文章中,我们已经了解到,启动线程是较为昂贵了,虽然线程池规划了 核心线程的数量最大线程数量 ,但也不会一开始就全员上岗,而是在任务抵达时逐步的安排线程上岗。

尝试 安排线程上岗 时,

需先判断线程池工作状态,如果线程池已经关闭,自然不会再增加线程,返回失败。比如产线准备停产了,已经接的任务会安排处理,但肯定不会再招工。

如果线程池正常工作,则检查线程数量是否可以继续增加

如果可以继续增加,则尝试更新线程总数,如果失败,则说明在其他线程中也触发了addWorker逻辑,那么线程池的工作状态也可能发生了改变,如果没有改变,则重复步骤2,否则回到步骤1继续检测

上一节中的内容,得到Thread实例并让其成为Worker,开始干活

整个过程中有CAS操作,鉴于有系列文章的撰写计划,文中不再展开,相应代码可参考gist:

参考Gist

向产线(线程池)下达任务

联想一下,市场部门小王拿到了一笔单子,来到产线找到负责人老张,让老张安排干掉

老张拿起了职工工作排期表,发现 还有固定岗位空着 ,则直接向人力资源要人上岗干活;

注意,招人上岗 可能失败 ,比如老张手上的信息不及时,现在已经满额了,也有可能厂长决定产线要停掉,通知人力资源不要再派人了

假如固定岗位招不来人,老张继续核实: 产线未停产 且 可纳入计划,在未停产且可纳入计划的情况下,把任务排进了计划

老张还是很严谨的,又再次核实产线生产状态,如果产线已经停产并且该任务没有被领取,则把任务拒掉

否则检查工人是否在岗,如没有工人在岗,则向人力资源部门要一个 临时工 处理 任务队列中的任务, 如果要不来人,任务也放着

如果产线停产了或者排不进计划了,老张精通人情世故,表示看看能不能拉个临时工来,能拿到人就直接处理,否则就只能拒绝了,

如果是产线停了或者达到了最大人数,则要不来人 -- 参考上个小节

否则临时工会处理该任务

而线程池中与此过程也非常类似,代码比较简短:

public class ThreadPoolExecutor extends AbstractExecutorService {    public void execute(Runnable command) {        if (command == null) // 小王在忽悠人            throw new NullPointerException();        int c = ctl.get();        //1.        if (workerCountOf(c) < corePoolSize) {            //1.i            if (addWorker(command, true))                return;            c = ctl.get();        }        //2.        if (isRunning(c) && workQueue.offer(command)) {            int recheck = ctl.get();            if (!isRunning(recheck) && remove(command)) // 2.i                reject(command);            else if (workerCountOf(recheck) == 0) // 2.ii                addWorker(null, false);        } else if (!addWorker(command, false)) // 3            reject(command); // 3.i    }}

当你拒绝任务

虽然打工人和老板都想任务及时可靠的被完成,但总有不如意的时候,某些情况下,线程池将不得不拒绝任务。

上文中已经提到:线程池已经关闭、任务队列已经排满。

当线程池拒绝任务时,事情总得有个说法,JDK设计了接口:

public interface RejectedExecutionHandler {    void rejectedExecution(Runnable r, ThreadPoolExecutor executor);}

在线程池实例化时,需要指明拒绝策略。

JDK中提供了4个策略:

CallerRunsPolicy -- 在线程池未关闭情况下,访问者线程直接负责处理

AbortPolicy -- 抛出 RejectedExecutionException 异常,这是运行时异常, 默认策略

DiscardPolicy -- 这个任务就此罢休

DiscardOldestPolicy -- 只要线程池没关闭,这件事情就非得干,把排在最前的任务踢掉,重走任务下达流程

蓦然回首,回看构造函数

ThreadPoolExecutor 提供了一系列重载构造函数用于获取特定实例

public class ThreadPoolExecutor extends AbstractExecutorService {    public ThreadPoolExecutor(            int corePoolSize,            int maximumPoolSize,            long keepAliveTime,            TimeUnit unit,            BlockingQueue<Runnable> workQueue,            ThreadFactory threadFactory,            RejectedExecutionHandler handler    ) {        //ignore 参见gist https://gitee.com/leobert_253/codes/wh495q63tvlipum2snca131    }}

以此为例

int corePoolSize, 核心线程数量

int maximumPoolSize, 最大线程数量

long keepAliveTime, 配合 unit 表示的时间,作为IDLE 线程等待任务的超时时间,核心线程如果不允许采用超时机制将一直等待任务(默认)

TimeUnit unit, 配合 keepAliveTime

BlockingQueue

workQueue 任务队列

ThreadFactory threadFactory 线程创建工厂

RejectedExecutionHandler handler 拒绝任务时的策略

当然,这些参数存在一些限制和校验,可参考 gist 进一步阅读,摘自JDK1.8。

线程池的状态标识-ctl的设计

前面的内容中已经提到了线程池Shutdown的状态,线程池具有5个状态,先看一眼代码: 相应的二进制补码已标识

注意,计算机中以补码表示数,如果是有符号数,最高位表示符号,1为负、0为非负,非负数其原码和补码一致,负数的补码: 取原码,符号位不变(保持1),其他位取反,然后加1 得到补码

public class ThreadPoolExecutor extends AbstractExecutorService {    private final AtomicInteger ctl = new AtomicInteger(ctlOf(RUNNING, 0));    /**     * 即29     * */    private static final int COUNT_BITS = Integer.SIZE - 3;    /**     * 0010 0000 00000000 00000000 00000000 -1 =>      * 0001 1111 11111111 11111111 11111111     *     * wc = ctl & CAPACITY 低29位存储wc     * state = ctl & ~CAPACITY 高三位存储状态     * */    private static final int CAPACITY = (1 << COUNT_BITS) - 1;    // runState is stored in the high-order bits    //补码 111 29个0    private static final int RUNNING = -1 << COUNT_BITS;    //补码 000 29个0    private static final int SHUTDOWN = 0 << COUNT_BITS;    //补码 001 29个0    private static final int STOP = 1 << COUNT_BITS;    //补码 010 29个0    private static final int TIDYING = 2 << COUNT_BITS;    //补码 011 29个0    private static final int TERMINATED = 3 << COUNT_BITS;    private static int ctlOf(int rs, int wc) {        return rs | wc;    }}

很显然,ThreadPoolExecutor 将32位的int分为不同的区域标识信息,在多线程背景下,使用AtomicInteger,但本质还是int

将 32位中的 高三位用于存储状态信息,低29位存储 worker-count(即wc),信息存储于ctl中,非常传统的位运算设计。

考虑到篇幅和阅读体验,其他相关的位运算API已经剥离到 gist , 通过位运算解出状态、wc,判断状态大小等

Running 运行状态(实例化后的默认状态)

Shutdown 不接收新任务,处理任务队列中的任务

Stop 不接收新任务,不处理任务队列中的任务,并且中断正在处理的任务

Tidying 所有的任务已终止时的一个暂态,随后将执行terminate(),成功后进入Terminated状态

Terminated 彻底终止

生命周期变化如下:

线程池目的

此时,请您想一想,创建线程池机制的目的是什么?

前文提到,线程池是对线程进行管理,显然还不是根源。

降低资源消耗。 重复利用已创建的线程,可降低线程创建和销毁造成的消耗。

提高响应速度。 线程池中有线程值守,当任务到达时,不需要每次都等待线程创建。注意,并不排除任务排队、必要的线程启动情况

提高线程的可管理性,对系统运行状态进行调优。 线程是稀缺资源,不能无限制的创建,使用线程池可以进行统一的分配、监控、调优。

面经常客,JDK中提供的线程池

您一定阅读过一些面经,其中包含线程池的题目。作者可能在引导您向着 "JDK中特定的API所提供的线程池特征" 方面展开作答,或者题目看起来就是这样,也许就是一个面试陷阱

作者按,不要单纯的为了应付面试和放弃了学习的初心。结合问题 讲清楚线程池的设计 要比 单纯的、枯燥的罗列通过调用Executors中的API得到的线程池对应的特征 有意义

在JDK1.5中,Java凝练了4种配置方式,可获得特定管理方式的线程池:

Java依据其特征作为Executors中的方法命名,借用它们作为这4类线程池的别名

FixedThreadPool 数量固定、线程可重用

SingleThreadExecutor 仅单个线程

CachedThreadPool 会根据需要创建新线程的线程池

ScheduledThreadPool 可定期或周期执行任务的线程池

前文已经提到,它们直接或者间接的使用了 ThreadPoolExecutor,而不是4个继承类!按照其API命名给了它们别名,但并不是类名!

FixedThreadPool

JDK中提供的包装方法如下:

public class Executors {    public static ExecutorService newFixedThreadPool(int nThreads) {        return new ThreadPoolExecutor(nThreads, nThreads,                0L, TimeUnit.MILLISECONDS,                new LinkedBlockingQueue<Runnable>());    }}

很显然,FixedThreadPool 是一个定额的池,nThreads 即为核心线程数量,亦为最大线程数量,注意值必须大于0。

这条产线的工人就很惨,活多了也不会加派人手,任务排队等待线程空闲;不来活也要在岗位上待着,不会释放线程。

LinkedBlockingQueue 作为任务队列,先到的任务先被处理,并且它是无界的。

SingleThreadExecutor

顾名思义,我们会得到一个单一线程的线程池。

class Foo {    class PrimeRun implements Runnable {        long minPrime;        PrimeRun(long minPrime) {            this.minPrime = minPrime;        }        public void run() {            // compute primes larger than minPrime        }    }    foo() {        PrimeRun p = new PrimeRun(143);        new Thread(p).start();    }}0

可能您会疑惑,

class Foo {    class PrimeRun implements Runnable {        long minPrime;        PrimeRun(long minPrime) {            this.minPrime = minPrime;        }        public void run() {            // compute primes larger than minPrime        }    }    foo() {        PrimeRun p = new PrimeRun(143);        new Thread(p).start();    }}1

已经得到了一个 核心线程数、最大线程数均为1的线程池,为啥要增加 FinalizableDelegatedExecutorService 的参与?

前文未提及但您可能知道,ThreadPoolExecutor是可以重新配置的!例如重新设置核心线程数量:

class Foo {    class PrimeRun implements Runnable {        long minPrime;        PrimeRun(long minPrime) {            this.minPrime = minPrime;        }        public void run() {            // compute primes larger than minPrime        }    }    foo() {        PrimeRun p = new PrimeRun(143);        new Thread(p).start();    }}2

FinalizableDelegatedExecutorService 继承自 DelegatedExecutorService,扩展了在 finalize() 时关闭线程池。 而后者是一个Wrapper,仅暴露 ExecutorService 接口的功能,通过委托的方式封闭了重新配置线程池的能力。

CachedThreadPool

该池将使用 "线程对象" 缓存方案,核心线程数量为0,全部为临时工,并且基于上文的知识:

如果池中没有Worker,则会新增Worker处理,否则任务放入任务队列等待。

非核心线程可以运用获取任务超时时间,当获取任务超时时,则 processWorkerExit 下岗

class Foo {    class PrimeRun implements Runnable {        long minPrime;        PrimeRun(long minPrime) {            this.minPrime = minPrime;        }        public void run() {            // compute primes larger than minPrime        }    }    foo() {        PrimeRun p = new PrimeRun(143);        new Thread(p).start();    }}3

ScheduledThreadPool

class Foo {    class PrimeRun implements Runnable {        long minPrime;        PrimeRun(long minPrime) {            this.minPrime = minPrime;        }        public void run() {            // compute primes larger than minPrime        }    }    foo() {        PrimeRun p = new PrimeRun(143);        new Thread(p).start();    }}4

不同于前三者,此时得到的线程池可 定时 处理任务。

为了实现这一点,ScheduledThreadPoolExecutor

使用 DelayedWorkQueue 改变了获取任务的具体实现

使用装饰模式包装原始任务,使得任务在满足 周期性 的条件时,能够重新进入任务队列

作者按:JDK中的源码实现非常精彩,值得深读。gist链接

相信您已经对线程池的设计有了一定的理解,JDK1.8之后也在线程池中增加了Future相关的内容,本文不再继续展开。通过Executors中API的源码,应当已经掌握得到的线程池的特征。

意犹未尽之处

行文至此,内容已经非常冗长,但也不得不告一段落。 文中的部分内容,例如Future、AtomicInteger、CAS等内容,计划在本系列的其他文章中具体展开,文中亦颇多回避。 而DelayedQueue、线程池生命周期变化时的具体细节、线程池的调优等内容,均需要结合代码、场景具体分析,限于文章主题未能尽兴,读者闲暇之余若能将源码再阅读一二,定能有更大的收获。

原文:https://juejin.cn/post/7098235227490746375


本文来自互联网用户投稿,该文观点仅代表作者本人,不代表本站立场。本站仅提供信息存储空间服务,不拥有所有权,不承担相关法律责任。
如若转载,请注明出处:/java/12229.html