Java线程池、Executor原理分析

本文出自:【InTheWorld的博客】 (欢迎留言、交流)thread_pool1. 线程池作用与基本知识

在开始之前,我们先来讨论下“线程池”这个概念。“线程池”,顾名思义就是一个线程缓存。它是一个或者多个线程的集合,用户可以把需要执行的任务简单地扔给线程池,而不用过多的纠结与执行的细节。那么线程池有哪些作用?或者说与直接用Thread相比,有什么优势?我简单总结了以下几点:

  • 1. 减小线程创建和销毁带来的消耗

对于Java Thread的实现,我在前面的一篇blog中进行了分析。Java Thread与内核线程是1:1(Linux)的,再加上Thread在Java层与C++层都有不少成员数据,所以Java Thread其实是比较重的。创建和销毁一个Java Thread需要OS和JVM都做不少工作,因此如果将Java Thread缓存起来,可以实现一定的效率提升。

  • 2. 更加方便和透明的实现计算资源控制

讨论这一条,可能需要举一些例子。以非常闻名的web服务器Nginx为例,Nginx以强大的并发能力和低资源消耗而著称。Nginx为了实现这些严格的要求,它严格地限定了工作线程的数目(worker线程一般等于CPU数目)。这种设计的着眼点就是降低线程切换带来的性能损失,这条优化方式对Java同样适用。倘若,每来一个任务就新建一个Thread来运算,那最终的结果就是程序资源难以控制(某个功能把CPU跑满了),而且整体的执行速度也比较慢。 而Java线程池提供了FixedThreadPool,你可以使用它实现线程最大数目的控制。

上面说了这么多的“废话”,还是来结合Java线程池的实现来分析一下吧!Java的线程池有一下几种实现:

  • cached ThreadPool

缓存线程池的特点是它会缓存之前的线程,新提交的任务可以运行在缓存的线程中,即实现了前文所述的第一个优势。

  • fixed ThreadPool

cachedThreadPool的一个特点是——新提交的任务没有空闲线程可以执行了,就会创建一个新的线程。而fixedThreadPool不会这样,它会将任务保存起来,等到有空闲线程再执行。即实现了前文所述的第二个优势。

  • scheduled ThreadPool

scheduled ThreadPool的特点是可以实现任务的调度,比如任务的延迟执行和周期执行。

出了上面三种,Java还实现了newWorkStealingPool,这个是基于Fork/Join框架的。目前我还没研究这个,所以就先不管它了。Java的并发支持中,使用了Executor来包装各种线程池,“执行器”这个名称其实挺贴切的,线程池可不就是个执行器嘛!

 

2. cached ThreadPool、fixed ThreadPool的实现

从前文的描述就可以看出,这两种线程池非常类似。的确是这样,事实上它们是同时实现的,不行我们来看实际例子:

 ThreadPoolExecutor executor1 = (ThreadPoolExecutor)Executors.newCachedThreadPool();

    ThreadPoolExecutor executor2 = (ThreadPoolExecutor)Executors.newFixedThreadPool(4);

这是两种线程池的新建方法,看起来很像吧!如果你不这么认为,我只能让你看看真相了。

    public static ExecutorService newCachedThreadPool() {
        return new ThreadPoolExecutor(0, Integer.MAX_VALUE,
                                      60L, TimeUnit.SECONDS,
                                      new SynchronousQueue());
    }

    public static ExecutorService newFixedThreadPool(int nThreads) {
        return new ThreadPoolExecutor(nThreads, nThreads,
                                      0L, TimeUnit.MILLISECONDS,
                                      new LinkedBlockingQueue());
    }

是的,它们调用了同一个构造函数,只是参数略有不同。那么我们来看看这些参数的含义,以及两组参数的区别。首先还是需要贴一下ThreadPoolExecutor的构造函数了。

    public ThreadPoolExecutor(int corePoolSize,
                              int maximumPoolSize,
                              long keepAliveTime,
                              TimeUnit unit,
                              BlockingQueue workQueue) {
        this(corePoolSize, maximumPoolSize, keepAliveTime, unit, workQueue,
             Executors.defaultThreadFactory(), defaultHandler);
    }

为了看起来清爽,再一层的构造函数我就不贴了,而且那个构造函数也只是简单的赋值而已。这里的函数原型已经能给我们很多很多信息了,不得不说JDK的代码命名确实好,简直就像注释一样。

  • maximumPoolSize就是线程池的最大线程数;对于cached ThreadPool来说,这个值是Integer.MAX_VALUE,基本相当于无穷大了,什么样的机器能跑几十亿线程!!对于fixed ThreadPool来讲,这个值就是用户设定的线程池的数目。
  • keepAliveTime和unit决定了线程的缓存过期时间;对于cached ThreadPool来讲,线程的缓存过期时间是一分钟,换言之,一个工作线程如果一分钟都无事可干,就把它撤销掉以节省资源。fixed ThreadPool传入的时间是0,这里的含义是fixed ThreadPool中的工作线程是永远不过期的。
  • corePoolSize是线程池的最小线程数;对于cached ThreadPool,这个值为0,因为在完全没有任务的情况下,cached ThreadPool的确会成为“光杆司令”。至于fixed ThreadPool,这个fixed已经表明corePoolSize是等于线程总数的。

接下来,我们根据一个简单的使用例子,来看看一下cached ThreadPool的流程。

public class Task implements Callable {
    
    private String name;
    public Task(String name) {
        this.name = name;
    }
    @Override
    public String call() throws Exception {
        System.out.printf("%s: Starting at : %s\n", this.name, new Date());
        return "hello, world";
    }
    public static void main(String[] args) {
        ThreadPoolExecutor executor = (ThreadPoolExecutor)Executors.newCachedThreadPool();
        Task task = new Task("test");
        Future result = executor.submit(task);
        try {
            System.out.printf("%s\n", result.get());
        } catch (InterruptedException | ExecutionException e) {
            e.printStackTrace();
        }
        executor.shutdown();
        System.out.printf("Main ends at : %s\n", new Date());
    }
}

首先,来看看executor.submit(task),这其实调用了ThreadPoolExecutor.execute(Runnable command)方法,这个方法的代码如下,整段代码的逻辑是这样的。首先检查线程池的线程数是否不够corePoolSize,如果不够就直接新建线程并把command添加进去;如果线程数已经够了或者添加失败(多个线程增加添加的情况),就尝试把command添加到队列中(workQueue.offer(command)),如果添加失败了,就reject掉cmd。大体的逻辑是这样的,这段代码有很多基于线程安全的设计,这里为了不跑题,就先忽略细节了。

    public void execute(Runnable command) {
        if (command == null)
            throw new NullPointerException();
        int c = ctl.get();
        if (workerCountOf(c) < corePoolSize) {
            if (addWorker(command, true))
                return;
            c = ctl.get();
        }
        if (isRunning(c) && workQueue.offer(command)) {
            int recheck = ctl.get();
            if (! isRunning(recheck) && remove(command))
                reject(command);
            else if (workerCountOf(recheck) == 0)
                addWorker(null, false);
        }
        else if (!addWorker(command, false))
            reject(command);
    }

到这里,看起来线程池实现的整体思路其实也没多么复杂。但是还有一个问题——一个普通的Thread在执行完自己的run方法后会自动退出。那么线程池是如何实现Worker线程不断的干活,甚至在没有任务的时候。其实答案很简单,就是Worker其实在跑大循环,Worker实际运行方法如下:

    final void runWorker(Worker w) {
        Thread wt = Thread.currentThread();
        Runnable task = w.firstTask;
        w.firstTask = null;
        w.unlock(); // allow interrupts
        boolean completedAbruptly = true;
        try {
            while (task != null || (task = getTask()) != null) {
                w.lock();
        /***/
                try {
                    beforeExecute(wt, task);
                    Throwable thrown = null;
                    try {
                        task.run();
                    /***/
                    } finally {
                        afterExecute(task, thrown);
                    }
                } finally {
                    task = null;
                    w.completedTasks++;
                    w.unlock();
                }
            }
            completedAbruptly = false;
        } finally {
            processWorkerExit(w, completedAbruptly);
        }
    }

关键就在这个while的判断条件,对于需要cached线程的情况下,getTask()会阻塞起来,如果缓存的时间过期,就会返回一个null,然后Worker就退出了,也就结束了它的服役周期。而在有任务的情况下,Woker会把task拿出来,然后调用task.run()执行任务,并通过Future通知客户线程(即future.get()返回)。这样一个简单的线程池使用过程就完了。。。

当然,线程池的很多精髓知识——基于线程安全的设计,我都没有分析。有兴趣可以自己分析一下,也可以和我讨论。此外Scheduled ThreadPool这里也没有分析,它的要点其实是调度,主要是根据时间最小堆来驱动的。

 

 

 

好吧!这篇博客就到这里!

(写博客的时候在单曲循环——“黑夜无情孤独仰望,亲爱的你是那最远的星吗?~~~”)

已有4条评论 发表评论

  1. 123 /

    6666先顶后看

    1. lshw4320814 / 本文作者

      小🐎,好!

  2. 陈照俊 /

    看完再顶

    1. lshw4320814 / 本文作者

      原来是Rank,尼嚎👋

回复给lshw4320814 取消回复