深入解析 Java 线程池

深入解析 Java 线程池

文章目录

  !版权声明:本博客内容均为原创,每篇博文作为知识积累,写博不易,转载请注明出处。

一、什么是线程池

线程池 (Thread Pool) 是一种常见的并发编程技术,也是一种用于管理和复用线程的机制。它可以在应用程序启动时预先创建一定数量的线程,并将这些线程放入一个池中,等待分配任务。当有任务需要执行时,就可以从线程池中获取一个空闲的线程来执行任务,任务执行完后将线程放回池中,以便下次使用。这样就避免了频繁创建和销毁线程的开销,提高了系统的性能和效率。

二、线程池的优点

线程池的优点主要包括以下几点:

  • 节省资源,实现线程复用,减少线程创建销毁开销。
  • 快速响应,利用待命线程,新任务到达即可执行。
  • 强化管理,集中监控线程,灵活调整策略,保障系统稳定。
  • 动态扩容,依据负载调整线程数,提升系统灵活性和扩展性。
  • 代码清晰,分离线程控制与业务逻辑,易于维护。
  • 并发控制,合理限制并发数,防止单位时间内任务过多压垮系统。
  • 灵活调度,支持多种任务调度方式,实现复杂逻辑更简便。

三、相关的一些概念

有一些与线程池相关的一些概念,本人认为了解这些概念,可以加深对线程池的理解。

3.1 同步和异步 (Sync And ASync)

同步 (Sync) 和异步 (Async) 是两种不同的执行方式和编程模型,用于描述任务的执行和结果的处理方式。

同步指的是一种阻塞式执行方式,即在执行某个操作时,必须在前一个任务完成后才开始执行,并且执行时会按照顺序依次执行。同步方式适用于顺序执行和依赖前置任务结果的场景。它可以确保任务按照特定的顺序执行,简化了并发控制和资源管理。然而,同步方式可能导致任务之间的等待和阻塞,影响整体的执行效率。

异步则是一种非阻塞式执行方式,即在执行某个操作时,任务可以并发执行,不需要等待前一个任务完成就可以开始执行,但是不能保证任务的执行顺序。异步方式适用于并行执行和提高系统的响应性能的场景。它可以充分利用 CPU 和 IO 资源,减少任务之间的等待时间,提高系统的并发能力。然而,异步方式可能需要处理并发控制、线程安全和结果处理的复杂性。

无论是同步还是异步,它们各自的优势和局限性决定了在不同场景下的应用选择:

  • 同步任务: 适用于需要严格按照顺序执行、确保数据一致性和流程连贯性的场合,如数据库事务处理、文件系统操作等。
  • 异步任务: 当面对高并发请求、需要最大化系统资源利用率时,异步模型成为首选,常见于 Web 服务、大数据处理和实时通信系统中。
  • 异步回调: 在事件驱动的环境中,异步回调机制 (如事件监听器、Promise/async/await) 提供了一种优雅的方式来处理异步操作的结果,避免了“回调地狱”,简化了错误处理和流程控制。

在实际开发中,同步和异步的选择通常会根据具体的业务场景和需求进行权衡和选择。不过高效的应用往往是在两者之间寻找最佳平衡点,利用异步机制提升性能的同时,通过适当的同步策略确保业务逻辑的正确性和数据的一致性。

3.2 携程 (Coroutine)

携程 (Coroutine) 是一种轻量级的并发编程模型和协作式的并发机制,能够在一个线程内支持多个并发执行流程。通过协作式调度实现非抢占式多任务调度,在不同的协程间进行灵活切换,达到近乎并发的效果,而无需承担高昂的线程切换成本。

协程的核心优势在于其轻量级属性,这使得它能在单个线程中承载大量并发任务,避免了多线程环境下的上下文切换开销。相比于重量级的线程,协程的创建、切换和销毁成本极低,从而显著提升了程序的执行效率和响应速度。此外,协程的非抢占式调度机制确保了执行流程的可控性,开发者可以通过显式调用挂起和恢复点,精确控制协程的执行流程,实现更加灵活的任务调度和资源管理。

携程的特点总的来说可以概况如下:

  • 轻量级: 携程是一种轻量级的并发编程模型,它能够在单线程中实现多个协程的并发执行,避免了线程切换的开销。
  • 高效性: 协程间的切换由应用程序自身控制,无需操作系统干预,极大地加快了切换速度。
  • 灵活性: 协程能够在任何指定点暂停和恢复执行,为复杂控制流设计提供了更多可能性。
  • 可读性: 携程的代码结构更加清晰和易读,可以避免回调地狱和复杂的同步代码。

携程的实现方式有很多种,不同的语言也提供了不同的携程库。在 Java 语言中,携程最常见的实现是通过第三方库或框架,比如 Project Loom、Quasar、CoroutineIO、Kotlin Coroutines 等。这些库提供了携程的语法和运行时支持,使得在 Java 中使用携程变得更加简单和方便。

协程作为一种前沿的并发编程模型,正逐步改变着我们处理并发问题的方式。其轻量级、高效性和灵活性的特性,使其在异步编程、并发控制和高性能计算等领域展现出广阔的应用前景。随着技术的不断演进,协程有望成为构建高性能、高响应性软件系统的关键技术之一。

携程的优点在于能够提高程序的并发性能,简化程序的开发难度,同时避免传统多线程开发中的一些问题,例如死锁和竞态条件。需要注意的是,携程的实现方式是通过协作式调度来实现的,需要程序员在编写代码时显式地控制何时让出 CPU,否则可能导致程序阻塞。

四、线程池创建工具 Executors

4.1 Executors 是什么

Executors 是一个位于 java.util.concurrent 包下的线程池创建工具,在该工具中提供了一系列工厂方法,可以用于创建各种类型的线程池。这些方法简化了线程池的创建过程,使得开发者无需深入理解线程池的复杂构造,就能轻松获取到符合项目需求的线程池实例。

4.2 Executors 方法

Executors 类中提供了多个方法用于创建不同类型的线程池,下面是一些常用的方法:

  • newFixedThreadPool(int nThreads): 创建一个固定大小的线程池,其中包含指定数量的线程。
  • newCachedThreadPool(): 创建一个可缓存的线程池,线程池中的线程数量会根据需要进行动态调整。
  • newSingleThreadExecutor(): 创建一个只有一个线程的线程池,用于顺序执行任务。
  • newWorkStealingPool(): 创建一个工作窃取线程池,适用于处理大量的细粒度任务。
  • newScheduledThreadPool(int corePoolSize): 创建一个可以调度任务的线程池,可以延迟执行任务或定期执行任务。
  • defaultThreadFactory(): 创建并返回一个默认的线程工厂。
  • callable(Runnable task, T result): 将一个 Runnable 对象转换为一个 Callable 对象。

除了以上提到的方法外,在 Executors 类中还提供了其它一些辅助方法,例如 unconfigurableExecutorService(ExecutorService executor) 方法额可以用于创建一个不可配置的线程池,防止外部代码修改线程池的配置;newFixedThreadPool(int nThreads, ThreadFactory threadFactory) 方法允许自定义线程工厂,用于创建具有特定属性的线程,如线程名前缀、优先级等。

4.2 使用 Executors 时的注意事项

使用 Executors 工具类来创建线程池虽然方便快捷,但它也有一些需要注意的事项,以避免潜在的问题,特别是那些可能引起性能下降或内存溢出的情况。以下是使用 Executors 创建线程池时的一些重要考虑点:

⑴ 避免使用默认的线程工厂

Executors 默认的线程工厂可能不会给线程提供有意义的名字,这对于调试和问题定位不利。建议使用自定义的 ThreadFactory,为线程提供更具描述性的名称。

⑵ 明确线程池的运行规则

使用 Executors 的静态工厂方法创建线程池时,你可能无法完全控制线程池的行为。例如,newFixedThreadPool 和 newSingleThreadExecutor 使用了 LinkedBlockingQueue 作为任务队列,其容量是 Integer.MAX_VALUE,这可能导致在高负载下内存溢出。因此,推荐使用 ThreadPoolExecutor 直接控制参数,如队列类型、拒绝策略、核心线程数等。

⑶ 控制线程池的最大大小

newCachedThreadPool 没有固定的线程上限,这可能导致在高并发场景下创建大量线程,从而耗尽系统资源。应当谨慎使用或考虑自定义线程池的大小限制。

⑷ 设置合理的拒绝策略

当线程池满载且任务队列已满时,Executors的默认行为是抛出RejectedExecutionException。你应该根据实际情况设置一个合理的拒绝策略,如丢弃最老的任务、使用 CallerRunsPolicy 让调用者线程执行任务,或是自定义策略。

⑸ 注意线程池的关闭:

在不再需要线程池时,务必调用 shutdown 或 shutdownNow 方法来关闭线程池,释放资源。否则,线程可能持续运行,导致内存泄露或进程无法正常退出。

⑹ 监控和调整:

根据应用的实际负载和性能指标,定期监控线程池的运行情况,并根据需要调整线程池的参数,如核心线程数、最大线程数、队列大小等。

五、Java 线程池类的关联关系

在 Java 线程池的实现中,一般都是从 Executor 接口作为启动,然后 ExecutorService 继承该接口,对其功能进行扩展。

Java 线程池类图

通过线程池的关系结构可以了解到,ExecutorService 是线程池的核心接口,而 ThreadPoolExecutor 是线程池的核心实现类,整体上是通过实现层层接口和继承抽象类,实现了从最基础的任务执行到复杂的线程池管理功能。其中的每个类的作用介绍如下:

⑴ Executor

Executor 是一个顶层接口,代表一个执行任务的对象。它仅定义了一个方法 void execute(Runnable command),用于提交一个 Runnable 任务用于执行,但不关注任务的执行结果和返回值。

⑵ ExecutorService

ExecutorService 继承了 Executor 接口,并对其功能进行了扩展,提供了更丰富的管理任务执行的方法。比如提交 Callable 任务、批量执行任务、关闭线程池等操作。

⑶ AbstractExecutorService

为了减少实现 ExecutorService 接口的冗余代码,抽象类 AbstractExecutorService 应运而生。它实现了 ExecutorService 接口中的部分方法,为子类提供了一系列可复用的模板逻辑,提升了代码的复用性和可维护性。

⑷ ThreadPoolExecutor

ThreadPoolExecutor 是一个真正实现线程池功能的类,在该类中提供了灵活的线程池配置,包括核心线程数、最大线程数、线程空闲存活时间、任务队列、拒绝策略等。此外,ThreadPoolExecutor 还实现了高效的任务调度与线程复用机制,是进行线程管理和任务调度的强力工具。

⑸ ScheduledExecutorService

ScheduledExecutorService 是一个扩展了 ExecutorService 的接口,专门用于支持定时和周期性任务的执行。它提供了如 schedule、scheduleAtFixedRate 和 scheduleWithFixedDelay 等方法,允许任务在特定延迟后执行或按照固定的周期重复执行,极大地拓宽了线程池的应用场景。

⑹ ScheduledThreadPoolExecutor

ScheduledThreadPoolExecutor 实现了 ScheduledExecutorService 接口,并且也继承了 ThreadPoolExecutor 类,因此该类具有所有关于线程池配置和管理的灵活性,并在此基础上加入了对定时任务的支持。通过维护一个 DelayedWorkQueue (一个基于优先级队列的实现,用于存放待执行的 Delayed 任务),使得其能够准确地控制任务的执行时机,是执行定时任务和周期性任务的理想选择。

六、ThreadPoolExecutor 线程池

通过上面的类关系和描述可以了解到,ThreadPoolExecutor 才是线程池的核心,这里对其进行详细介绍。

6.1 ThreadPoolExecutor 介绍

ThreadPoolExecutor 是 Java 并发编程中一个非常核心的类,位于 java.util.concurrent 包下,它提供了一个强大且灵活的线程池实现。作为 ExecutorService 接口的实现,ThreadPoolExecutor 允许开发者自定义和管理线程池的各个方面,以高效地执行并行或异步任务。

6.2 ThreadPoolExecutor 方法

在 ThreadPoolExecutor 中存在很多方法,大体上可以分为 线程池操作相关方法线程池状态查询相关方法。如下:

(1) 线程池操作相关方法

  • execute(Runnable command): 将任务提交给线程池执行。该方法接收一个实现了 Runnable 接口的任务,并将其放入任务队列中等待执行。如果有空闲线程,将会立即执行任务,否则任务会等待执行。
  • submit(Callable task): 将 Callable 任务提交给线程池执行,并返回一个表示任务执行结果的 Future 对象。Future 对象可以用于获取任务的执行结果或取消任务的执行。
  • shutdown(): 关闭线程池。该方法执行后将停止接收新的任务,并尝试将队列中的任务全部执行完毕。已提交但未开始执行的任务将被取消。
  • shutdownNow(): 立即关闭线程池。该方法将尝试中断当前正在执行的任务,并返回队列中等待执行的任务列表。
  • purge(): 清理线程池中已经被取消或者已完成的任务。
  • isShutdown(): 判断线程池是否已经关闭。
  • isTerminated(): 判断线程池是否已经完全终止,即所有任务都已经执行完成并且线程池已经关闭。
  • prestartCoreThread(): 预启动一个核心线程。
  • prestartAllCoreThreads(): 启动线程池中的所有的核心线程,即预先创建线程池中的全部核心线程。
  • awaitTermination(long timeout, TimeUnit unit): 等待线程池的关闭。该方法阻塞调用线程,直到线程池关闭或超过指定的等待时间。
  • setThreadFactory(ThreadFactory threadFactory): 设置线程池的线程工厂。
  • setCorePoolSize(int corePoolSize): 设置线程池的核心线程数,即线程池中保留的最小线程数。
  • setKeepAliveTime(long time, TimeUnit unit): 设置线程池中空闲线程的存活时间。
  • setMaximumPoolSize(int maximumPoolSize): 设置线程池最大线程数,即当等待队列满了之后可以创建的最大线程数量。
  • allowCoreThreadTimeOut(boolean value): 设置线程池中核心线程是否允许闲置且达到空闲线程的存活时间后被释放掉。
  • setRejectedExecutionHandler(RejectedExecutionHandler handler): 设置线程池的拒绝策略。当线程池和任务队列都满了后,对新提交的任务的处理方式。可选的拒绝策略有 AbortPolicy、CallerRunsPolicy、DiscardPolicy 和 DiscardOldestPolicy。

这些方法提供了线程池的基本管理功能,可以根据具体需求进行选择和配置,使用这些方法可以帮助我们更好地管理线程池。

(2) 线程池监控相关方法

  • getActiveCount(): 获取线程池中正在执行任务的线程数量。
  • getCompletedTaskCount(): 获取线程池中已经完成的任务数量。
  • getCorePoolSize(): 获取线程池中核心线程数量。
  • getLargestPoolSize(): 获取线程池曾经达到的最大线程数量。
  • getMaximumPoolSize(): 获取线程池允许的最大线程数。
  • getPoolSize(): 获取线程池中线程数量。
  • getQueue(): 获取线程池中的任务队列。
  • getQueueSize(): 获取任务队列中等待执行的任务数量。
  • getTaskCount(): 获取线程池中提交过的任务数量,包括已经完成的和正在执行的任务。
  • getRejectedExecutionHandler(): 获取线程池的拒绝策略。

通过这些监控方法,可以监测线程池的工作情况、任务的执行进度以及线程的利用率,有了这些数据可以便于我们开发人员进行性能分析、调优和故障排查。

不过需要注意的是,监控方法返回的数据是瞬时状态,随着线程池的运行和任务的执行会发生变化。因此,在分析和处理这些数据时,需要综合考虑线程池的动态变化和实际需求。

6.3 ThreadPoolExecutor 配置参数

ThreadPoolExecutor 提供了多个构造函数,可以根据需要选择不同的参数来创建线程池。其中可配置的参数如下:

  • corePoolSize (核心线程数): 线程池中一直保持活动状态的核心线程数量。即使这些线程处于空闲状态,也不会被销毁。默认情况下核心线程数为 0。
  • maximumPoolSize (最大线程数): 线程池中允许存在的最大线程数量。如果任务数量超过核心线程数并且工作队列已满,则会创建新的线程来执行任务,但不会超过最大线程数。
  • keepAliveTime (线程空闲时间): 非核心线程在没有任务执行时保持活动的时间。当线程池中的线程数量超过核心线程数 corePoolSize 时,多余的线程在没有任务执行后就变成了空闲线程,空闲线程会在指定的时间后被销毁。
  • unit (时间单位): 即 keepAliveTime 参数的时间单位。
  • workQueue (任务队列): 用于存储待执行任务的队列。常见的有无界队列 (LinkedBlockingQueue)、有界队列 (ArrayBlockingQueue) 和同步移交队列 (SynchronousQueue) 等。
  • threadFactory (线程工厂): 用于创建新线程的工厂。可以自定义线程工厂来定制线程的创建方式,如线程的名称、优先级等。
  • handler (拒绝策略): 当线程池已满无法接收新任务时,如何处理新提交的任务的策略。常见的有 AbortPolicy(中止策略)、CallerRunsPolicy(调用者运行策略)、DiscardPolicy(直接丢弃策略) 和 DiscardOldestPolicy(丢弃最老任务策略) 等。

不同的参数有不同的作用,它们常搭配组合进行配置,下面是一些详细的参数介绍:

⑴ corePoolSize 和 maximumPoolSize

其中,corePoolSize 和 maximumPoolSize 是非常重要的参数,它们决定了线程池的大小和容量。corePoolSize 表示在任务队列为空时一直保持活动的线程数,maximumPoolSize 表示在任务队列已满时可以创建的最大线程数。如果任务数量超过了 maximumPoolSize,则会根据拒绝策略来处理任务。

⑵ keepAliveTime 和 unit

keepAliveTime 和 unit 是用于控制线程空闲时间的参数。keepAliveTime 表示线程在没有任务执行时保持活动的时间,unit 表示 keepAliveTime 参数的时间单位。如果线程在 keepAliveTime 时间内没有任务执行,则会被回收。

⑶ workQueue

workQueue 是用于存储等待执行的任务的队列。ThreadPoolExecutor 支持多种任务队列,包括 ArrayBlockingQueue、LinkedBlockingQueue、SynchronousQueue 等。其中,ArrayBlockingQueue 和 LinkedBlockingQueue 是常用的任务队列,它们具有不同的特性和优缺点。ArrayBlockingQueue 是一个固定大小的队列,它的容量是固定的,可以根据需要动态调整。LinkedBlockingQueue 是一个动态大小的队列,它的容量是可变的,可以根据需要自动扩容。

⑷ threadFactory

threadFactory 是用于创建新的线程的工厂类。ThreadPoolExecutor 支持多种线程工厂,包括 DefaultThreadFactory、CustomizableThreadFactory 等。其中,DefaultThreadFactory 是默认的线程工厂类,它可以根据线程的名称、优先级、守护线程等参数来创建新的线程。CustomizableThreadFactory 是可定制的线程工厂类,它可以根据需要来定制线程的名称、优先级、守护线程等参数。

⑸ handler

handler 是用于处理线程池中的异常的异常处理器。ThreadPoolExecutor 支持多种异常处理器,包括 AbortPolicy、CallerRunsPolicy、DiscardOldestPolicy、DiscardPolicy 等。其中,AbortPolicy 表示线程池会抛出异常,CallerRunsPolicy 表示线程池会调用任务的 execute() 方法,DiscardOldestPolicy 表示线程池会丢弃最老的任务,DiscardPolicy 表示线程池会丢弃任务。

6.4 ThreadPoolExecutor 运行机制

ThreadPoolExecutor 的计遵循了生产者-消费者模型,这使其能够有效地分离了任务的提交(生产)和任务的执行(消费)。这种设计极大提高了系统对于任务处理的灵活性和效率,同时也便于管理和控制线程资源。总的来说,ThreadPoolExecutor 的运行机制可以大致划分为 任务管理线程管理 俩个部分,如下:

(1) 任务管理

任务管理存在以下几个流程:

  • 任务提交: 应用程序通过调用 execute(Runnable task) 或 submit(Callable task) 方法向线程池提交任务。提交时,线程池会执行一系列策略来决策如何处理这个任务。
  • 直接执行: 如果当前线程池中的线程数量小于核心线程数 (corePoolSize),那么就会创建一个新的线程去执行新提交的任务,即使此时工作队列为空。
  • 缓冲到队列: 当线程池中的线程数量等于或者超过了核心线程数时,新提交的任务会被放置到任务队列中进行缓存。默认情况下,如果队列未满,则任务可以加入到队列并进行排队。
  • 拒绝策略: 如果任务队列已满,并且当前线程数达到了最大线程数 (maximumPoolSize),那么线程池将按照预设的拒绝策略来处理这个任务,比如抛出异常、直接丢弃任务或由调用者线程自己执行任务。

(2) 线程管理

线程管理存在以下几个流程:

  • 线程创建: 线程池会根据需要动态创建线程,直到达到核心线程数。核心线程在无任务执行时不会被销毁,除非显式调用了线程池的关闭方法。
  • 线程复用: 一旦线程被创建,它们会在执行完一个任务后尝试从队列中获取下一个任务继续执行,而不是被销毁,从而实现线程复用。
  • 线程扩展: 当队列已满并且核心线程都在忙碌时,线程池会增加非核心线程 (线程数可以达到maximumPoolSize),以应对临时的高峰负载。
  • 线程回收: 非核心线程在空闲一段时间后会被回收 (空闲等待时间由keepAliveTime参数决定),以减少资源占用。核心线程是否回收也取决于配置。
  • 线程池关闭: 通过调用 shutdown() 或 shutdownNow() 方法可以平滑地关闭线程池。前者不允许提交新任务,但会等待所有已提交任务执行完毕;后者尝试停止所有正在执行的任务,并不再接收新提交的任务。

ThreadPoolExecutor 运行机制

6.5 ThreadPoolExecutor 线程池状态

在 ThreadPoolExecutor 类中定义了五种线程池运行状态,分别为:

  • RUNNING(运行中): 线程池处于活动状态,能够接收新的任务,也可以处理任务队列中的任务。
  • SHUTDOWN(关闭中): 线程池不再接收新的任务,但仍然会处理已添加到队列中的任务。当所有任务都执行完毕后,线程池会进入 TERMINATED 状态。
  • STOP(停止中): 线程池不再接收新的任务,并且会尝试中断正在执行的任务。它会通过调用 Thread.interrupt() 方法来中断线程。当所有任务都执行完毕后,线程池会进入 TERMINATED 状态。
  • TIDYING(整理中): 线程池中的任务都执行完毕后就会进入这个状态。线程池会执行一些清理操作,例如中断处于空闲状态的工作线程。当清理完成后,线程池会进入 TERMINATED 状态。
  • TERMINATED(已终止): 线程池已经完全终止,不再接收新的任务,也不再处理已添加的任务。此时,线程池中的所有线程都已经销毁。

其生命周期转换如下入所示:

而且,ThreadPoolExecutor 类中的运行状态是通过变量 ctl(control) 来记录和管理的。ctl 是一个 AtomicInteger 类型变量,存储的是一个 32 位的整型数值,该值是一个由 运行状态(unState)工作线程数量(workerCount) 组合而成,其中,高 3 位用于表示线程池的运行状态,低 29 位用于表示线程池中的工作线程数量。

具体来说变量 ctl 的位分布如下:

  • 高3位(高29-31位): 表示线程池的运行状态。其中,最高位 (第31位) 表示是否处于 SHUTDOWN 状态,次高位 (第30位) 表示是否处于 STOP 状态,次次高位 (第29位) 表示是否处于 TIDYING 或 TERMINATED 状态。这三个位不能同时为 1。
  • 低29位(低0-28位): 表示线程池中的工作线程数量。
1private final AtomicInteger ctl = new AtomicInteger(ctlOf(RUNNING, 0));

可能大家都会有疑问,问什么使用 ctl 一个变量去存储运行状态和工作线程数量两个值?这里列举几个原因:

  • 一致性: 通过使用一个变量,可以确保运行状态和工作线程数量的一致性。这意味着在做相关决策时,不会出现两个值不一致的情况。不需要为了维护两个变量的一致性而占用额外的锁资源。
  • 性能优化: 使用 AtomicInteger 类型的 ctl 变量可以支持原子操作,避免了在多线程环境中对状态和数量进行操作时的竞态条件。这提高了线程池的并发性能。
  • 代码简化: 将运行状态和工作线程数量合并为一个变量,简化了线程池状态的管理和判断逻辑。不再需要额外的变量或标志位来记录线程池的状态。

所以,在 ThreadPoolExecutor 类中,是通过使用 AtomicInteger 类型的 ctl 变量来存储运行状态和工作线程数量,这样可以确保数据一致性,也能一定程度上保证性能,并简化代码逻辑。这种设计优化使得 ThreadPoolExecutor 类更高效、可靠和易于维护。

6.6 ThreadPoolExecutor 任务缓冲机制

在 ThreadPoolExecutor 线程池中,使用了一个阻塞队列BlockingQueue 来缓存提交待执行的任务。通过使用阻塞队列,线程和任务能够有效地解耦,同时实现了任务的延迟执行和调度。

当线程池中的核心线程都在忙于执行任务时,没有空闲的线程来执行新提交的任务。这时,新提交的任务会被暂时存储在队列中进行缓冲操作。待线程有空闲时,它们会按照队列的顺序从队列中获取待执行的任务,并开始执行。

使用阻塞队列的好处是它提供了一种线程间通信的机制。当线程池中的线程都在忙碌时,新提交的任务不会立即被执行,而是会进入阻塞队列等待。这样可以避免任务的丢失,并且在任务数量过多时,能够有效地平衡线程的执行速度和任务的产生速度,避免资源耗尽和系统过载。

不同类型的阻塞队列有不同的特性,可以根据具体的需求,选择合适的阻塞队列类型来控制任务的缓冲能力和线程池的行为。通过合理配置阻塞队列的容量和选择合适的阻塞策略,可以优化线程池的性能和资源利用效率。

Java 中提供了很多类型的阻塞队列,常用的有以下几种:

  • ArrayBlockingQueue: 使用数组实现的有界队列。在创建时需要指定队列的容量,当队列满时,新提交的任务会被阻塞,直到有空闲线程来执行任务。
  • LinkedBlockingQueue: 使用链表实现的无界/或有界队列(如果没有配置队列大小就属于无界队列,否则属于有界队列)。它可以缓存任意数量的任务,当线程池中的线程都在执行任务时,新提交的任务会被放入队列尾部等待执行。
  • SynchronousQueue: 一个没有存储能力的阻塞队列。每个插入操作必须等待一个对应的删除操作,反之亦然。当线程池中的线程都在执行任务时,新提交的任务会被直接交给线程处理,而不会缓存在队列中。
  • DelayQueue: 一个支持延迟执行的阻塞队列,可以设置任务的延迟执行时间。
  • PriorityBlockingQueue: 一个支持优先级的阻塞队列,可以按照优先级来执行任务。

通过选择不同类型的任务队列,可以根据需要调整线程池的行为。比如使用无界队列可以接受任意数量的任务,但可能会导致内存资源的耗尽。使用有界队列可以限制任务的数量,但可能会导致任务被拒绝。使用同步队列可以实现零缓冲的任务提交,但可能会导致线程池中的线程不断创建和销毁。所以,需要根据实际情况选择合适类型的队列,设置合适的大小,才能保证任务稳定的执行。

6.7 ThreadPoolExecutor 任务拒绝机制

ThreadPoolExecutor 任务拒绝机制是指当线程池中的线程数达到最大值,且阻塞队列已满时,新提交的任务如何拒绝的一种机制。这种情况下,线程池会根据预先设置的拒绝策略来处理这些被拒绝的任务。常见的拒绝策略包括抛出异常、丢弃任务、丢弃队列中最早的任务等。开发人员可以根据实际业务需求选择合适的拒绝策略。

ThreadPoolExecutor 提供了几种内置的任务拒绝策略,可以通过调用 setRejectedExecutionHandler() 方法进行设置。以下是常用的拒绝策略:

  • AbortPolicy(默认策略):拒绝并抛出拒绝执行异常 (RejectedExecutionException)。这是默认的拒绝策略,当线程池无法处理新任务时,会立即抛出异常。
  • CallerRunsPolicy:将任务交给提交当前任务的线程去执行,这样可以保证任务一定能够被执行,但会影响到任务提交线程的性能。
  • DiscardPolicy:直接丢弃被拒绝的任务,不抛出异常也不执行任务,即直接将任务丢弃,不进行任何处理。
  • DiscardOldestPolicy:丢弃任务队列中最早提交的一个任务,为新任务腾出位置,然后尝试重新提交被拒绝的任务。

除了上述内置的拒绝策略,还可以通过实现 RejectedExecutionHandler 接口自定义拒绝策略,根据业务需求来决定如何处理被拒绝的任务。自定义的拒绝策略需要实现 RejectedExecutionHandler 接口的 rejectedExecution() 方法,该方法接收被拒绝的任务和线程池作为参数,在方法中可以自定义处理逻辑,如记录日志、返回特定结果等。

比如,可以自定义一个拒绝策略来记录被拒绝的任务信息:

1class CustomRejectedExecutionHandler implements RejectedExecutionHandler {
2
3    @Override
4    public void rejectedExecution(Runnable runnable, ThreadPoolExecutor executor) {
5        System.out.println("Task rejected: " + runnable.toString());
6        // 可以根据需求进行其他处理
7    }
8    
9}

然后在创建线程池时,将自定义拒绝策略 CustomRejectedExecutionHandler 作为线程池的拒绝策略:

1ThreadPoolExecutor executor = new ThreadPoolExecutor(
2    corePoolSize,
3    maximumPoolSize,
4    keepAliveTime,
5    unit,
6    workQueue,
7    new CustomRejectedExecutionHandler()
8);

通过设置适合的任务拒绝策略,可以更好地控制任务的处理方式,避免任务被丢失或导致系统异常。选择合适的拒绝策略要根据具体业务场景和需

七、ThreadPoolExecutor 执行过程

7.1 提交任务与调度

当需要使用 ThreadPoolExecutor 线程池执行任务时,可以通过调用线程池中的 execute() 或者 submit() 方法向线程池中提交任务,然后线程池会根据 线程池的运行状态工作线程的数量 以及 配置的策略,决定接下来的执行的流程,是直接申请线程执行,还是将任务加入到队列中进行缓冲,亦或者是直接拒绝该任务。其执行流程大概如下:

  • ① 首先检查线程池的运行状态是否为 RUNNING,如果不是则直接拒绝执行任务。
  • ② 如果 workerCount < corePoolSize,即当前工作线程数小于核心线程数,则创建并启动一个核心工作线程去执行新提交的任务。
  • ③ 如果 workerCount >= corePoolSize,即工作线程数等于或超过核心线程数,并且线程池内的任务队列未满,则将任务添加到该队列中进行缓冲。
  • ④ 如果 workerCount >= corePoolSize && workerCount < maximumPoolSize,即工作线程数已达核心线程数上限,任务队列也已满,且线程总数小于最大线程数,则创建并启动一个非核心工作线程去执行新提交的任务。
  • ⑤ 如果 workerCount >= maximumPoolSize,即工作现成数已经超过所允许的最大线程数,并且线程池内的任务队列已满,无法容纳更多任务时, 线程池会根据设置的拒绝策略对任务进行处理。默认的拒绝策略是 AbortPolicy,该策略的处理方式是直接抛异常。

下图直观地展示了上述任务调度流程:

任务调度流程

7.2 增加工作线程

为了掌握线程的状态并维护线程的生命周期,在 ThreadPoolExecutor 中增加了一个内部私有类 Worker,它继承了 AQS 抽象类,还实现了 Runnable 接口,并持有一个线程 thread 和任务 firstTask。每个 Worker 对象都表示一个 工作线程,负责从任务队列中获取任务并执行。该类利用 AQS 提供的同步机制来管理工作线程的生命周期。Worker 对象的创建和销毁由 ThreadPoolExecutor 内部进行管理,可以根据线程池的配置和任务提交情况动态创建或销毁工作线程。

1private final class Worker extends AbstractQueuedSynchronizer implements Runnable {
2    // 持有的线程
3    final Thread thread;
4    // 初始化任务(可以为null)
5    Runnable firstTask;
6}

工作线程执行流程

工作线程的主要职责是执行任务并处理异常。具体的执行逻辑包含在 run() 方法中。一旦工作线程启动,它将不断循环地从任务队列中获取任务并执行。在执行任务的过程中,如果任务执行出现异常,则会将异常上抛,并通过 processWorkerExit() 方法进行处理。无论任务是否执行成功或异常,工作线程都会继续从任务队列中获取下一个任务,直到线程池关闭或任务队列为空。

而工作线程的创建和销毁,则是由线程池内部进行自动管理的,它们的数量会根据线程池的配置和任务负载情况进行动态调整。当线程池中的线程数量超过核心线程数时,空闲的工作线程会根据设置的 keepAliveTime 时间进行回收,以节省系统资源。当线程池关闭时,所有的工作线程会被停止并销毁。

通过 Worker 类的设计和线程池的管理,ThreadPoolExecutor 实现了灵活的线程调度和任务执行机制,能够高效地管理线程资源,并根据实际需求自动调整线程数量,提供了可靠的任务执行环境。这种设计模式使得线程池能够有效地执行大量任务,并提供了对任务执行的监控和管理功能。

7.3 线程申请任务

在任务提交到线程池 ThreadPoolExecutor 后,ThreadPoolExecutor 会按照 "任务提交" 规则执行任务,要么创建一个工作线程 Worker 去执行,要么将任务加入到任务队列中进行缓存等待执行,或者按照拒绝策略拒绝执行。其中工作线程有两种获取任务方式:

  • ⑴ 新创建工作线程 Worker 时,直接设置当前线程第一次要执行的任务;
  • ⑵ 尝试从任务队列中获取任务,获取任务成功后则执行该任务,然后重复执行这个过程;

第 ⑴ 种情况仅出现在线程初始创建的时候。第 ⑵ 种是线程获取任务的绝大多数情况,线程需要从任务队列中不断地取任务执行,实现线程管理模块和任务管理模块之间的通信。这部分策略由 getTask() 方法实现,其执行流程如下图所示:

getTask() 方法执行流程

getTask() 方法中进行了多次判断,为的是控制工作线程 Worker 的数量,使其保持在一定数目,避免过多或者过少,保证线程池正常运行。比如,当前工作线程数量超过核心线程数时,则会返回 null,表示当前没有可以执行的任务。

7.4 工作线程回收

线程池中的线程销毁依赖于 JVM 自动回收,而线程池的主要工作是根据当前线程池的状态来维护一定数量的线程引用,以防止这些线程被 JVM 回收。当线程池决定回收某些线程时,只需将其引用消除即可。

工作线程 Worker 在创建后会不断循环尝试获取任务去执行,如果在 keepAliveTime 时间内没有获取到任务,则会判断当前工作现成数量是否小于 corePoolSize 核心线程数,或者是否设置允许回收核心线程,如果满足其中一个条件,则标记当前线程超时,返回一个 null,表示当前没有可以执行的任务。

当工作线程成功获取任务并执行时,将继续循环执行下一个任务。如果获取的任务为 null,即没有可以执行的任务时,工作线程就会结束循环并主动消除自身在线程池内的引用,并从工作线程集合中移除,等待被 GC 回收。

下面是工作线程循环执行任务的简要源码:

 1final void runWorker(Worker w) {
 2    // 标记线程是否突然中断,默认设置为 true
 3    boolean completedAbruptly = true;
 4    try {
 5        // 循环获取任务并执行
 6        while (task != null || (task = getTask()) != null) {
 7            // 执行任务
 8        }
 9        // 线程没有获取到任务,结束循环,并标记 completedAbruptly 为 false,
10        // 表示并发非意外情况导致线程中断
11        completedAbruptly = false;
12    } finally {
13        // 调用 processWorkerExit 方法,进行工作线程回收前的处理
14        processWorkerExit(w, completedAbruptly);
15    }
16}

从源码中可以看出,线程的回收工作是在 processWorkerExit 方法中进行的。该方法会将线程从线程池的线程集合中移除,并将线程数量减 1,从而完成线程的销毁工作。然而,由于线程销毁的原因可能有多种,线程池还需要判断是什么原因引发了销毁,是否需要改变线程池的状态,并根据新状态重新分配线程等操作。

7.5 线程池关闭

当线程池完成了其生命周期中的任务处理,或者应用程序需要释放与线程池相关的资源时,可以通过调用线程池提供的关闭方法来优雅地关闭线程池。Java 中的 ExecutorService 接口提供了 shutdown()shutdownNow() 两个用于关闭线程池的方法。这两个方法作用如下:

  • shutdown(): 这个方法会将线程池置于终止状态,不再接受新的任务,但是已经提交的任务将继续执行直到完成。一旦所有已提交的任务都执行完毕,线程池将自动关闭。这是线程池关闭的一种较为平滑的方式,因为它允许线程池完成当前正在处理的任务,从而避免数据丢失或处理不完整的情况。
  • shutdownNow(): shutdownNow() 与 shutdown() 方法不同,该方法执行时会尝试立即停止所有正在执行的任务,并取消所有未开始执行的任务。它会返回一个包含那些尚未执行的任务列表,然后线程池将进入关闭状态。这种方法适用于需要立即释放资源或紧急停止线程池的情况,但是需要注意的是,强制停止正在执行的任务可能会导致数据不一致或其他副作用,因此当前方法应当谨慎使用。

在实际应用中,选择哪种关闭方式取决于具体的需求和上下文。如果可以等待任务的自然完成,并且想要避免数据损失,那可以使用 shutdown() 方法;如果需要立即释放资源,或者在异常情况下需要迅速清理线程池,那么使用 shutdownNow() 方法可能是更好的选择。但是,无论采用哪种关闭线程池的方式,都应该在关闭线程池后检查返回的状态,以确保线程池确实已经关闭,并适当地处理任何未完成的任务或异常情况。


  !版权声明:本博客内容均为原创,每篇博文作为知识积累,写博不易,转载请注明出处。