Java 多线程:线程池实现原理

前言

文章主要来自:点这里 。这也是博主的博客,主要分享了自己接触过的一些后端技术,有不对的地方希望可以提出。

线程池的相关类

我们都知道,所谓线程池,那么就是相当于有一个池子,线程就放在这个池子中进行重复利用,能够减去了线程的创建和销毁所带来的代价。但是这样并不能很好的解释线程池的原理,下面从代码的角度分析一下线程池的实现。

对于原理,在 Java 中,有几个接口,类 值得我们关注:

  1. Executor

  2. ExecutorService

  3. AbstractExecutorService

  4. ThreadPoolExecutor

Executor

public interface Executor {    void execute(Runnable command);}

Executor 接口只有一个 方法,execute,并且需要 传入一个 Runnable 类型的参数。那么它的作用自然是 具体的执行参数传入的任务。

ExecutorService

public interface ExecutorService extends Executor {    void shutdown();    List shutdownNow();    boolean isShutdown();     Future submit(Callable task);     Future submit(Runnable task, T result);    Future submit(Runnable task);     List> invokeAll(Collection> tasks)        throws InterruptedException;    ......}

ExecutorService 接口继承了 Executor,并且提供了一些其他的方法,比如说:

  1. shutdownNow : 关闭线程池,返回放入了线程池,但是还没开始执行的线程。

  2. submit : 执行的任务 允许拥有返回值。

  3. invokeAll : 运行把任务放进集合中,进行批量的执行,并且能有返回值

这三个方法也可以说是这个接口重点扩展的方法。

Ps:execute 和 submit 区别:

  1. submit 有返回值,execute 没有返回值。 所以说可以根据任务有无返回值选择对应的方法。

  2. submit 方便异常的处理。 如果任务可能会抛出异常,而且希望外面的调用者能够感知这些异常,那么就需要调用 submit 方法,通过捕获 Future.get 抛出的异常。

AbstractExecutorService

AbstractExecutorService 是一个抽象类,主要完成了 对 submit 方法,invokeAll 方法 的实现。 但是其实它的内部还是调用了 execute 方法,例如:

public Future submit(Runnable task) {    if (task == null) throw new NullPointerException();        RunnableFuture ftask = newTaskFor(task, null);        execute(ftask);        return ftask;    }

ThreadPoolExecutor

ThreadPoolExecutor 继承了 AbstractExecutorService,并且实现了最重要的 execute 方法,是我们主要需要研究的类。另外,整个线程池是如何实现的呢?

在该类中,有两个成员变量 非常的重要:

private final HashSet workers = new HashSet();private final BlockingQueue workQueue;

对于 workers 变量,主要存在了线程对象 Worker,Worker 实现了 Runnable 接口。而对于 workQueue 变量,主要存放了需要执行的任务。 这样其实可以猜到, 整个线程池的实现原理应该是 workQueue 中不断的取出需要执行的任务,放在 workers 中进行处理。

另外,当线程池中的线程用完了之后,多余的任务会等待,那么这个等待的过程是 怎么实现的呢? 其实如果熟悉 BlockingQueue,那么就会马上知道,是利用了 BlockingQueue 的take 方法进行处理

下面具体代码分析:

   public void execute(Runnable command) {        ......        if (workerCountOf(c) 1. 对于 maximumPoolSize:指的是 线程池中最多允许有多少个线程。1. 对于 corePoolSize: 指的是线程池中正在运行的线程。在 线程池中,有这样的设定,我们加入一个任务进行执行,1. 如果现在线程池中正在运行的线程数量大于 corePoolSize 指定的值而 小于maximumPoolSize 指定的值,那么就会创建一个线程对该任务进行执行,一旦一个线程被创建运行。1. 如果线程池中的线程数量大于corePoolSize,那么这个任务执行完毕后,该线程会被回收;如果 小于corePoolSize,那么该线程即使空闲,也不会被回收。下个任务过来,那么就使用这个空闲线程。对于上述代码,首先有:if (workerCountOf(c) 也就是说,判断现在的线程数量是否小于corePoolSize,如果小于,那么就创建一个线程执行该任务,也就是执行addWorker(command, true)如果大于,那么就把该任务放进队列当中,即workQueue.offer(command)那么,addWorker 是干什么的呢?

private boolean addWorker(Runnable firstTask, boolean core) {
boolean workerStarted = false;
boolean workerAdded = false;
Worker w = null;
try {
w = new Worker(firstTask);
final Thread t = w.thread;
if (t != null) {
final ReentrantLock mainLock = this.mainLock;
mainLock.lock();
try {
int rs = runStateOf(ctl.get());

            if (rs  largestPoolSize)                        largestPoolSize = s;                    workerAdded = true;                }         ......}
在这里可以看到一些关键代码,例如 w = new Worker(firstTask), 以及 workers.add(w); 从这里 我们就可以看到,创建 线程对象 并且加入到 线程 队列中。但是,我们现在还没有看到具体是怎么执行任务的,继续追踪w = new Worker(firstTask),如下代码:

private final class Worker
extends AbstractQueuedSynchronizer
implements Runnable
{
......

    final Thread thread;    Runnable firstTask;    Worker(Runnable firstTask) {        setState(-1); // inhibit interrupts until runWorker        this.firstTask = firstTask;        this.thread = getThreadFactory().newThread(this);    }    public void run() {        runWorker(this);    }    ......
对于 runWorker 方法:

final void runWorker(Worker w) {
Thread wt = Thread.currentThread();
Runnable task = w.firstTask;
w.firstTask = null;
w.unlock(); // allow interrupts
boolean completedAbruptly = true;
try {
while (task != null || (task = getTask()) != null) {
w.lock();
if ((runStateAtLeast(ctl.get(), STOP) ||
(Thread.interrupted() &&
runStateAtLeast(ctl.get(), STOP))) &&
!wt.isInterrupted())
wt.interrupt();
try {
beforeExecute(wt, task);
Throwable thrown = null;
try {
task.run();
} catch (RuntimeException x) {
thrown = x; throw x;
} catch (Error x) {
thrown = x; throw x;
} catch (Throwable x) {
thrown = x; throw new Error(x);
} finally {
afterExecute(task, thrown);
}
}
......
}

在这段代码中,就有很多关键的信息,比如说,Runnable task = w.firstTask;如果为空,那么就 执行 task = getTask(),如果不为空,那么就 进行 task.run(); 调用其方法,**这里也就是具体的执行的任务**。现在知道了是怎么样执行具体的任务,那么假如任务的数量 大于 线程池的数量,那么是怎么实现等待的呢,这里就需要看到getTask() 的具体实现了,如下:

private Runnable getTask() {
for (;;) {
......
try {
Runnable r = timed ?
workQueue.poll(keepAliveTime, TimeUnit.NANOSECONDS) :
workQueue.take();
if (r != null)
return r;
timedOut = true;
} catch (InterruptedException retry) {
timedOut = false;
}
}
}

这里可以看到, 一个 for 死循环,以及 Runnable r = timed ? workQueue.poll(keepAliveTime, TimeUnit.NANOSECONDS) : workQueue.take();而 workQueue 是 BlockingQueue 类型,也就是带有阻塞的功能。这就是 线程如何等待执行的。# 总结现在就可以知道,大致的线程池实现原理:首先,各自存放线程和任务,其中,任务带有阻塞。

private final HashSet workers = new HashSet();
private final BlockingQueue workQueue;

然后,在 execute 方法中 进行 addWorker(command,true),也就是创建一个线程,把任务放进去执行;或者是直接把任务放入到任务队列中。接着 如果是 addWorker,那么就会 new Worker(task) -》调用其中 run() 方法,在Worker 的run() 方法中,调用 runWorker(this); 方法 -》在该方法中就会具体执行我们的任务 task.run(); 同时这个 runWorker方法相当于是个死循环,正常情况下就会一直取出 任务队列中的任务来执行,这就保证了线程 不会销毁。所以,这也是为什么常说的线程池可以避免线程的频繁创建和 销毁带来的性能消耗。# 写在最后1. 写出来,说出来才知道对不对,知道不对才能改正,改正了才能成长。1. 在技术方面,希望大家眼里都容不得沙子。如果有不对的地方或者需要改进的地方希望可以指出,万分感谢。#java、线程池、多线程、线程#


本文来自互联网用户投稿,文章观点仅代表作者本人,不代表本站立场,不承担相关法律责任。如若转载,请注明出处。 如若内容造成侵权/违法违规/事实不符,请点击【内容举报】进行投诉反馈!

相关文章

立即
投稿

微信公众账号

微信扫一扫加关注

返回
顶部