使用一个或多个池化线程来执行任务的ExecutorService
, 通常情况下使用Executors
类提供的工厂方法进行配置和创建。
An ExecutorService that executes each submitted task using one of possibly several pooled threads, normally configured using Executors factory methods.
线程池涉及到了两个不同的问题点: 人们常常使用大量的异步任务,通过最终归并每个任务的执行结果的方式去提升执行任务的性能。这种方式需要使用到一些有限且十分宝贵的资源,并对其进行管理。它们包括线程,被消费任务的集合;有的线程池还包含一些基本的统计项,比如已完成任务的数量等。
Thread pools address two different problems: they usually provide improved performance when executing large numbers of asynchronous tasks, due to reduced per-task invocation overhead, and they provide a means of bounding and managing the resources, including threads, consumed when executing a collection of tasks. Each ThreadPoolExecutor also maintains some basic statistics, such as the number of completed tasks.
为了更广泛的适用于各个场景,这个类提供了非常多的参数和拓展接口。虽然程序员们通常更习惯于去使用Executors
提供的一些更简单的工厂方法:Executors.newCachedThreadPool
(无界线程池,提供了自动扩容线程机制),Executors.newFixedThreadPool
(限定大小线程池),Executors.newSingleThreadExecutor
(单线程异步任务线程池)。这些方法提供了大多数应用场景下的线程池初始化配置方案。其他场景下,可以使用以下配置方法定制该类:
To be useful across a wide range of contexts, this class provides many adjustable parameters and extensibility hooks. However, programmers are urged to use the more convenient Executors factory methods Executors.newCachedThreadPool (unbounded thread pool, with automatic thread reclamation), Executors.newFixedThreadPool (fixed size thread pool) and Executors.newSingleThreadExecutor (single background thread), that preconfigure settings for the most common usage scenarios. Otherwise, use the following guide when manually configuring and tuning this class:
1. 核心线程数(Core)和最大线程数(Maxium)
线程池对象会通过核心线程数和最大线程数配置规则,自动扩容线程数量。当提交执行一个新的任务时,如果当前线程总数少于核心线程数,线程池会创建一个新的线程来执行这个任务,哪怕当前有线程正处于空闲(idle)状态。如果当前有超过核心线程数,但少于最大线程数的线程处于运行(Running)状态,当且仅当队列满时才会扩容线程。如果设置最大线程数和核心线程数为同一个值,你能够创建一个固定大小(fixed-size)的线程池。如果设置最大线程数为一个无界值(例如Integer.MAX_VALUE
),你就能够创建一个能同时执行任意并发数任务的线程池。通常情况下,核心线程数和最大线程数仅是在构造时候就进行了设置,但是他们也可以在创建后通过set方法进行动态地修改.
Core and maximum pool sizes
A ThreadPoolExecutor will automatically adjust the pool size (see getPoolSize) according to the bounds set by corePoolSize (see getCorePoolSize) and maximumPoolSize (see getMaximumPoolSize). When a new task is submitted in method execute(Runnable), and fewer than corePoolSize threads are running, a new thread is created to handle the request, even if other worker threads are idle. If there are more than corePoolSize but less than maximumPoolSize threads running, a new thread will be created only if the queue is full. By setting corePoolSize and maximumPoolSize the same, you create a fixed-size thread pool. By setting maximumPoolSize to an essentially unbounded value such as Integer.MAX_VALUE, you allow the pool to accommodate an arbitrary number of concurrent tasks. Most typically, core and maximum pool sizes are set only upon construction, but they may also be changed dynamically using setCorePoolSize and setMaximumPoolSize.
2. 按需构造
通常情况下,核心线程只会在新任务到来之际才进行初始化创建,但是我们可以通过使用方法prestartCoreThread
或prestartAllCoreThreads
来强制核心线程进行初始化。在使用一个非空的任务队列构造线程池的时候,你通常会需要先启动池中的核心线程。
On-demand construction
By default, even core threads are initially created and started only when new tasks arrive, but this can be overridden dynamically using method prestartCoreThread or prestartAllCoreThreads. You probably want to prestart threads if you construct the pool with a non-empty queue.
3. 创建新线程
新线程允许通过ThreadFactory
来创建。如果不指定,则会使用默认的方法Executors.defaultThreadFactory
来进行创建;在这种情况下所有线程均在一个ThreadGroup
下,拥有NORM_PRIORITY
的执行权重,且非守护线程状态。通过提供一个不同的ThreadFactory
,你将可以定制现成的组,执行权重以及守护进程状态等等属性。如果ThreadFactory
在调用过程中创建线程失败并返回了null,任务提交会成功返回,但是可能无法执行任何任务。所有线程必须具有modifyThread
的运行时权限。如果工作线程或调用线程不具备该权限,服务会被降级: 被修改的配置可能不会及时地生效,并且一个关闭状态的线程池,会在任务执行完毕后依然处于无法关闭的状态。
Creating new threads
New threads are created using a ThreadFactory. If not otherwise specified, a Executors.defaultThreadFactory is used, that creates threads to all be in the same ThreadGroup and with the same NORM_PRIORITY priority and non-daemon status. By supplying a different ThreadFactory, you can alter the thread’s name, thread group, priority, daemon status, etc. If a ThreadFactory fails to create a thread when asked by returning null from newThread, the executor will continue, but might not be able to execute any tasks. Threads should possess the “modifyThread” RuntimePermission. If worker threads or other threads using the pool do not possess this permission, service may be degraded: configuration changes may not take effect in a timely manner, and a shutdown pool may remain in a state in which termination is possible but not completed.
4. 保活时间
如果当前池线程数超过了核心线程数大小,超过部分线程如果最长保活时间(keepAliveTime)仍然处于休眠(idle)状态,则线程资源会被回收。这个特性旨在当处于线程将不再被使用状态的场景下,能有效降低消耗的资源。通常情况下,保活时间策略只会在当线程数超过核心线程数时才会生效。但是如果调用了方法allowCoreThreadTimeOut(boolean)
,只要保活时间为一个正数,则可以对核心线程也能激活这个保活策略。
Keep-alive times
If the pool currently has more than corePoolSize threads, excess threads will be terminated if they have been idle for more than the keepAliveTime (see getKeepAliveTime(TimeUnit)). This provides a means of reducing resource consumption when the pool is not being actively used. If the pool becomes more active later, new threads will be constructed. This parameter can also be changed dynamically using method setKeepAliveTime(long, TimeUnit). Using a value of Long.MAX_VALUE TimeUnit.NANOSECONDS effectively disables idle threads from ever terminating prior to shut down. By default, the keep-alive policy applies only when there are more than corePoolSize threads. But method allowCoreThreadTimeOut(boolean) can be used to apply this time-out policy to core threads as well, so long as the keepAliveTime value is non-zero.
5. 队列
任何一个BlockingQueue
都可以被用来承载提交和待消费的任务。对队列的使用会对队列大小敏感:如果当前运行线程少于核心线程数,线程池总是会倾向于添加一个线程而不是入队。如果超过了核心线程数,线程池总是会倾向于入队而不是创建线程。如果一个任务无法入队(队列已满并被拒绝入队)这时才会继续创建线程。这里有三类常用的入队策略:
直接阻断提交
略无界队列
使用无界队列(比如不设置容量的LinkedBlockingQueue
)时,如果核心线程全部处于忙碌状态,则新任务将全部入队等待消费。在这种情况下,出核心线程以外不会创建任何额外的线程。(最大线程数在这种场景下不会有任何效果。)这种场景适用于当每个任务都是独立与其他任务执行的时候,所以任务与任务的执行状态不会产生任何相互影响,比如一个应用页面服务器。这种类型的如对策略适用于平滑处理大量短暂而连续的请求,在这种场景下默认在任务的无限制增长过程中,其处理速度最终是快于生产速度的。有界队列
一个有界队列(比如ArrayBlockingQueue
)会帮助线程池避免耗尽最大线程数的线程资源,但是会导致调节控制过程变得更为复杂。队列大小和最大线程池可能会产生相互影响: 使容量大的队列和比较小的线程数可以降低CPU占用,系统资源以及线程上下文切换的损耗,但是理论上会导致性能低下。如果任务总是处于阻塞状态(比如IO等待),系统就能分配更多的CPU时间给其他线程。使用较小的队列大小自然而然地需要更大的线程数,这样会导致CPU进入忙碌状态,但是同样会增加上下文切换的调度开销,这样也是不可取的做法。
Queuing
Any BlockingQueue may be used to transfer and hold submitted tasks. The use of this queue interacts with pool sizing:
If fewer than corePoolSize threads are running, the Executor always prefers adding a new thread rather than queuing.
If corePoolSize or more threads are running, the Executor always prefers queuing a request rather than adding a new thread.
If a request cannot be queued, a new thread is created unless this would exceed maximumPoolSize, in which case, the task will be rejected.
There are three general strategies for queuing:
- Direct handoffs
A good default choice for a work queue is a SynchronousQueue that hands off tasks to threads without otherwise holding them. Here, an attempt to queue a task will fail if no threads are immediately available to run it, so a new thread will be constructed. This policy avoids lockups when handling sets of requests that might have internal dependencies. Direct handoffs generally require unbounded maximumPoolSizes to avoid rejection of new submitted tasks. This in turn admits the possibility of unbounded thread growth when commands continue to arrive on average faster than they can be processed.
- Unbounded queues
Using an unbounded queue (for example a LinkedBlockingQueue without a predefined capacity) will cause new tasks to wait in the queue when all corePoolSize threads are busy. Thus, no more than corePoolSize threads will ever be created. (And the value of the maximumPoolSize therefore doesn’t have any effect.) This may be appropriate when each task is completely independent of others, so tasks cannot affect each others execution; for example, in a web page server. While this style of queuing can be useful in smoothing out transient bursts of requests, it admits the possibility of unbounded work queue growth when commands continue to arrive on average faster than they can be processed.
- Bounded queues
A bounded queue (for example, an ArrayBlockingQueue) helps prevent resource exhaustion when used with finite maximumPoolSizes, but can be more difficult to tune and control. Queue sizes and maximum pool sizes may be traded off for each other: Using large queues and small pools minimizes CPU usage, OS resources, and context-switching overhead, but can lead to artificially low throughput. If tasks frequently block (for example if they are I/O bound), a system may be able to schedule time for more threads than you otherwise allow. Use of small queues generally requires larger pool sizes, which keeps CPUs busier but may encounter unacceptable scheduling overhead, which also decreases throughput.
6. 拒绝策略
当线程池关闭时,新提交的任务会拒绝执行;同样的当任务消耗尽了用户指定的资源大小,无论是最大线程数还是队列大小,让其处于饱和状态,这种情况下任务也会被拒绝执行。无论是两者中任何一种场景,都会调用RejectedExecutionHandler.rejectedExecution(Runnable, ThreadPoolExecutor)
方法来作为它拒绝策略。有四种预定义的拒绝策略方案:
默认的
ThreadPoolExecutor.AbortPolicy
, 会抛出一个运行时异常RejectedExecutionException
供用户捕获ThreadPoolExecutor.CallerRunsPolicy
, 会在当前线程直接执行任务。这样可以简单地反作用于控制逻辑来降低任务的提交速度ThreadPoolExecutor.DiscardPolicy
, 无法被执行的策略会被直接丢弃掉ThreadPoolExecutor.DiscardOldestPolicy
, 如果线程池没有被关闭, 队列头的任务会被舍弃掉, 然后再次尝试入队(如果仍然无法入队,则重试)
除此之外我们也可以自定义其他的拒绝策略。在做这件事的时候需要额外注意,拒绝策略需要工作在特定的队列大小或入队策略条件之上。
Rejected tasks
New tasks submitted in method execute(Runnable) will be rejected when the Executor has been shut down, and also when the Executor uses finite bounds for both maximum threads and work queue capacity, and is saturated. In either case, the execute method invokes the RejectedExecutionHandler.rejectedExecution(Runnable, ThreadPoolExecutor) method of its RejectedExecutionHandler. Four predefined handler policies are provided:
In the default
ThreadPoolExecutor.AbortPolicy
, the handler throws a runtime RejectedExecutionException upon rejection.In
ThreadPoolExecutor.CallerRunsPolicy
, the thread that invokes execute itself runs the task. This provides a simple feedback control mechanism that will slow down the rate that new tasks are submitted.In
ThreadPoolExecutor.DiscardPolicy
, a task that cannot be executed is simply dropped.In
ThreadPoolExecutor.DiscardOldestPolicy
, if the executor is not shut down, the task at the head of the work queue is dropped, and then execution is retried (which can fail again, causing this to be repeated.)It is possible to define and use other kinds of RejectedExecutionHandler classes. Doing so requires some care especially when policies are designed to work only under particular capacity or queuing policies.
7. 钩子方法
线程池类提供了一个支持重载的beforeExecute(Thread, Runnable)
和afterExecute(Runnable, Throwable)
方法,他们分别会在任务开始和结束时调用。这样在执行每个任务的时候,就能更有效的操作执行环境了。比如说,写入一个线程变量,收集统计信息,或者添加一条日志记录。更进一步,当方法调用结束时我们可以指定一个额外的任务,直到线程池关闭。如果钩子方法抛出了异常,原本的工作线程将中断结束。
Hook methods
This class provides protected overridable beforeExecute(Thread, Runnable) and afterExecute(Runnable, Throwable) methods that are called before and after execution of each task. These can be used to manipulate the execution environment; for example, reinitializing ThreadLocals, gathering statistics, or adding log entries. Additionally, method terminated can be overridden to perform any special processing that needs to be done once the Executor has fully terminated.
If hook or callback methods throw exceptions, internal worker threads may in turn fail and abruptly terminate.
8. 队列保护
队列的getQueue()
只允许任务消费队列查看或者debugging。任何其他意图的调用将被抛弃。remove(Runnable)
和purge
方法则只支持当大量队列中任务被取消时才允许被调用,以访问队列存储。
Queue maintenance
Method getQueue() allows access to the work queue for purposes of monitoring and debugging. Use of this method for any other purpose is strongly discouraged. Two supplied methods, remove(Runnable) and purge are available to assist in storage reclamation when large numbers of queued tasks become cancelled.
9. 结束回收
如果一个线程池会被自动关闭,仅当该线程池不再被程序引用,且不再具有工作线程的时候。如果希望即使用户忘记关闭线程池,也想确保该线程池正确回收,那么必须确保工作线程最终会被销毁。为了实现这个,可以设置合适的保活时间,设置核心线程数为0,或者允许核心线程保活超时。
Finalization
A pool that is no longer referenced in a program AND has no remaining threads will be shutdown automatically. If you would like to ensure that unreferenced pools are reclaimed even if users forget to call shutdown, then you must arrange that unused threads eventually die, by setting appropriate keep-alive times, using a lower bound of core threads and/or setting allowCoreThreadTimeOut(boolean).
10. 拓展样例
大多数拓展类需要重载一个或多个钩子方法。以如下为例,它拓展了一个简单的pause和resume特性
Extension example
Most extensions of this class override one or more of the protected hook methods. For example, here is a subclass that adds a simple pause/resume feature:
class PausableThreadPoolExecutor extends ThreadPoolExecutor {
private boolean isPaused;
private ReentrantLock pauseLock = new ReentrantLock();
private Condition unpaused = pauseLock.newCondition();
public PausableThreadPoolExecutor(...) { super(...); }
protected void beforeExecute(Thread t, Runnable r) {
super.beforeExecute(t, r);
pauseLock.lock();
try {
while (isPaused) unpaused.await();
} catch (InterruptedException ie) {
t.interrupt();
} finally {
pauseLock.unlock();
}
}
public void pause() {
pauseLock.lock();
try {
isPaused = true;
} finally {
pauseLock.unlock();
}
}
public void resume() {
pauseLock.lock();
try {
isPaused = false;
unpaused.signalAll();
} finally {
pauseLock.unlock();
}
}
}