跳到主要内容

线程池源码

一. Java 构建线程的方式

  • 继承 Thread: Thread 类已经实现了 Runnable,继承 Thread 类时本质也是间接实现了 Runnable
  • 实现 Runnable: Java 构建线程本质上只有 Runnable 方式
  • 实现 Callable: 手动实现的话,将 Callable 的实现传入到 FutureTask,将 FutureTask 传入 Thread 的有参构造方法,FutureTask 本身也间接的实现了 Runnable 接口。
  • 线程池: 线程池构建工作线程时,构建的 Worker 对象也实现了 Runnable 接口
  • Lambda

二. 线程池的七个参数

  • 为什么要使用线程池(多线程的目的是为了充分发挥硬件资源的性能)
    • 场景: 接口优化(串行 -> 并行)
    • 管理线程(控制线程数量): 线程需要占用内存资源,线程需要被 CPU 调度,如果不控制线程数量,可能造成业务处理速度变慢
    • 重复利用(重复创建问题): 每次线程的构建都需要分配内存资源,使用完成后需要归还内存资源,成本很高,利用连接池的思想构建好反复用
  • JDK 提供了 Executors 的工具类,方便构建常用的线程池,由于 Executors 封装好之后,很多参数无法主动设置,无法更好管理线程池,使用线程池时强烈推荐手动构建 ThreadPoolExecutor

七个参数

线程池参数

  • corePoolSize: 核心线程数,核心线程默认情况下不会被销毁,一直留在线程池中
  • maximumPoolSize: 最大线程数,核心线程数 + 非核心线程数
  • keepAliveTime: 非核心线程的最大空闲时间
  • TimeUnit: 最大空闲时间单位
  • BlockingQueue: 阻塞队列,有任务时先让核心线程处理,核心线程处理不过来时,堆放在 BlockingQueue 中,实在无法处理时,临时构建非核心线程处理 workQueue 中的任务
  • threadFactory: 构建 thread 对象的工厂
  • RejectedExecutionHandler: 拒绝策略,所有线程没有空闲,并且阻塞队列放满时,对新任务的拒绝策略

三. 线程池属性标识 & 线程池的状态

3.1 核心属性

  • 线程池中的核心属性是 int 类型的 ctl,ctl 的高 3 位维护了线程池状态,低 29 位维护了工作线程个数

    //线程池的核心属性 ctl
    private final AtomicInteger ctl = new AtomicInteger(ctlOf(RUNNING, 0));
    // ctl 属性标识了线程池的两个信息
    // int 是一个 32 个 bit 位的数值,其中高 3 位表示线程池状态,低 29 位表示工作线程个数(工作线程的最大值:2^29-1)

    // Integer.SIZE 是获取 int 类型占用的位数,Integer.SIZE = 32
    private static final int COUNT_BITS = Integer.Size - 3

    // CAPACITY 表示线程池中最大的工作线程个数
    private static final int CAPACITY = (1 << COUNT_BITS) - 1
    // 00000000 00000000 00000000 00000001 (1)
    // 00100000 00000000 00000000 00000000 (1 << 29)
    // 00011111 11111111 11111111 11111111 (1 << 29) - 1

线程池的状态

  • RUNNING: 一切正常,任务正常处理
    SHUTDOWN: 不接收新任务,但是正在处理的任务要完成,阻塞队列的任务要处理完毕
    STOP: 不接收新任务,中断正在处理的任务,阻塞队列的任务全部丢掉
    线程池的状态
    // 线程池的 5 个状态  
    private static final int RUNNING = -1 << COUNT_BITS; // 111
    private static final int SHUTDOWN = 0 << COUNT_BITS; // 000
    private static final int STOP = 1 << COUNT_BITS; // 001
    private static final int TIDYING = 2 << COUNT_BITS; // 010
    private static final int TERMINATED = 3 << COUNT_BITS; // 011

四. 线程池的执行流程

  • 分析执行流程需要查看 execute 方法 执行流程
    // 提交任务到线程池的方法  
    public void execute(Runnable command){
    // 非空判断
    if (command = null)
    throw new NullPointerException();
    // 获取核心属性
    int c = ctl.get();
    // workerCountOf: 获取工作线程个数
    // 当前工作线程个数 < 核心线程数
    if (workerCountOf(c) < corePoolSize ) {
    // 添加 1 个工作线程
    // true: 核心线程 false: 非核心线程
    // 添加工作线程会有并发问题,添加成功返回true,添加失败返回false
    if (addWorker(command, true))
    // 如果添加成功,返回
    return;
    // 添加失败,重新获取 ctl,判断情况
    c = ctl.get();
    }
    // inRunning: 线程池状态是否为 RUNNING
    // 如果是 RUNNING,就把任务扔到阻塞队列中排队
    if (inRunning(c) && workQueue.offer(command)){
    // 再次获取 ctl 属性
    int recheck = ctl.get();
    // 再次确认线程池状态是否 RUNNING,如果不是 RUNNING,把任务从阻塞队列移除
    if (!isRunning(recheck) && remove(command))
    // 执行拒绝策略
    reject(command);
    // 阻塞队列中有任务但是没有工作线程
    else if (workerCountOf(recheck) == 0)
    // 创建一个非核心线程去处理阻塞队列中的任务
    addWorker(null, false);
    }
    // 如果任务添加到阻塞队列失败,添加非核心线程处理任务
    else if (!addWorker(command, false))
    // 如果添加非核心线程失败,执行拒绝策略
    reject(command);
    }

五. 添加工作线程的流程

  • 添加工作线程本质就是 addWorker 的流程

    private boolean addWorker(Runnable firstTask, boolean core) {
    // ==================线程池状态 & 工作线程个数判断==================
    retry: // 标识,用于在内层 for 循环退出外层 for 循环
    for (;;) {
    // 拿到 ctl 并拿到高三位的值
    int c = ctl.get();
    int rs = runStateOf(c); // 线程池状态
    // 状态 >= SHUTDOWN (如果状态不是 RUNNING,不需要添加工作线程)
    if (rs >= SHUTDOWN &&
    // 阻塞队列有任务,没有工作线程,会 addWorker(null, false);
    // 线程池状态时 SHUTDOWN,并且阻塞队列有任务,没有工作线程可以处理
    // 针对 addWorker(null, false) 的放行
    ! (rs == SHUTDOWN && firstTask == null && ! workQueue.isEmpty()))
    // 走到这一步就无法处理,线程池状态不对,直接退出
    return false;

    // 判断工作线程个数
    for (;;) {
    int wc = workerCountOf(c); // 获取工作线程个数
    if (wc >= CAPACITY || // 如果大于 低29位,直接退出
    wc >= (core ? corePoolSize : maximumPoolSize)) // 根据core 决定线程数判断依据(核心线程数/最大线程数)
    return false; // 超过直接返回false
    if (compareAndIncrementWorkerCount(c)) // 基于 CAS 对 clt + 1
    break retry; // 如果成功,直接跳出外层循环
    // 没有跳出循环说明 CAS 失败,出现并发情况,重新获取 ctl
    c = ctl.get();
    if (runStateOf(c) != rs) // 如果状态改变了,重新进行外层 for 循环
    continue retry;
    // else CAS failed due to workerCount change; retry inner loop
    }
    }
    // ==================创建工作线程 & 启动工作线程==================
    boolean workerStarted = false; // 任务是否成功启动标识
    boolean workerAdded = false; // 任务是否添加成功标识
    Worker w = null;
    try {
    w = new Worker(firstTask); // new 一个工作线程,将任务扔进去
    final Thread t = w.thread; // 拿到工作线程的 Thread
    // Thread 是 ThreadFactory 构建出来的,如果构建的是 null,表示程序员提供的 ThreadFactory 有问题
    if (t != null) {
    // 加锁,因为后面会对 HashSet 以及 int 进行操作,保证线程安全
    final ReentrantLock mainLock = this.mainLock;
    mainLock.lock();
    try {
    int c = ctl.get();
    int rs = runStateOf(c);
    // 状态是否 RUNNING
    if (rs < SHUTDOWN ||
    (rs == SHUTDOWN && firstTask == null)) {
    if (t.isAlive()) // 线程是否启动,避免程序员提供的 ThreadFactory 有问题
    throw new IllegalThreadStateException();
    workers.add(w); // 工作线程添加到 Set 集合,用 HashSet 维护工作线程
    int s = workers.size();
    // largestPoolSize 记录线程池中工作线程的最大值
    if (s > largestPoolSize)
    largestPoolSize = s;
    workerAdded = true; // 添加成功标识
    }
    } finally {
    mainLock.unlock();
    }
    if (workerAdded) {
    // 添加工作线程成功,启动工作线程,并且设置 workerStarted 为 True
    t.start();
    workerStarted = true;
    }
    }
    } finally {
    if (! workerStarted)
    addWorkerFailed(w);
    }
    return workerStarted;
    }

六. Worker 的封装 & 后续任务的处理

  • 调用 Worker 的有参构造方法时,会基于 ThreadFactory 构建线程,并且将 Worker 对象本身作为 Runnable 传入到 Thread 对象中,当执行 Thread 的 start 方法时,会执行 Worker 的 run 方法,最终执行的是 run 方法内部的 runWorker 方法

    final void runWorker(Worker w){
    Thread wt = Thread.currentThread();
    Runnable task = w.firstTask;
    w.firstTask = null;
    w.unlock()
    boolean completedAbruptly = true;
    try {
    // 任务的处理方式有 2 种
    // 1. 直接基于 addWorker 携带过来的任务,优先处理
    // 2. 如果 addWorker 携带的任务处理完毕或者没携带任务,直接从阻塞队列中获取
    while(task != null || (task = getTask()) != null){
    // 处理任务!!!省略
    task.run();
    }
    completedAbruptly = false;
    } finally {
    processWorkerExit(w, completedAbruptly);
    }
    }

    // 省略部分代码
    private Runnable getTask() {
    for(;;) {
    try {
    Runnable r = timed ?
    // 从阻塞队列获取任务
    // 如果执行 poll,代表非核心线程,等待一会,没任务则退出(有最大空闲时间)
    workQueue.poll(keepAliveTime, TimeUnit.NANOSECONDS) :
    // 如果执行 take,代表是核心线程,死等
    workQueue.take();
    if (r != null)
    return r;
    } catch (InterruptedException retry) {
    timeOut = false;
    }
    }
    }