LZ项目中用到JUC的线程池,发现调用shutdown和shutdownNow方法,池中的线程还是会继续执行。只是调用shutdownNow方法时,池中线程的阻塞方法(LZ代码中是sleep)会抛出InterruptedException。因此LZ要分析下这里面的具体实现。
1.shutdown
已经submit的task会继续执行(即使该task还未开始执行),不再接收新的task的submit。
1 2 3 4 5 6 7 8 9 10 11 12 13 14 |
public void shutdown() { final ReentrantLock mainLock = this.mainLock; mainLock.lock(); try { checkShutdownAccess(); advanceRunState(SHUTDOWN); interruptIdleWorkers(); onShutdown(); // hook for ScheduledThreadPoolExecutor } finally { mainLock.unlock(); } tryTerminate(); } |
- checkShutdownAccess():java安全管理器检查应用程序,是否有操作线程的权限。关于java安全管理器。
- advanceRunState(SHUTDOWN):修改线程池状态,这里设置为SHUTDOWN(线程池状态机)。
- interruptIdleWorkers():中断空闲的workers。
1 2 3 4 5 6 7 8 9 10 11 12 13 14 15 16 17 18 19 20 21 22 23 24 25 26 |
private void interruptIdleWorkers() { interruptIdleWorkers(false); } private void interruptIdleWorkers(boolean onlyOne) { final ReentrantLock mainLock = this.mainLock; mainLock.lock(); try { for (Worker w : workers) { Thread t = w.thread; if (!t.isInterrupted() && w.tryLock()) { try { t.interrupt(); } catch (SecurityException ignore) { } finally { w.unlock(); } } if (onlyOne) break; } } finally { mainLock.unlock(); } } |
因为线程池中的每个线程,都保存在Worker中(ThreadPoolExecutor内部类Worker),1对1的关系。所以遍历Worker的集合,判断其中哪个线程可以中断。
PS:这里明确两个概念:
- workQueue:BlockingQueue
。这个阻塞队列存储的是调用submit提交的那个Runnable类,在workers里面没有空闲worker来处理task的时候,会将task放入这里面。 -
workers:HashSet
。这个集合来存储worker,worker这个类其实是对提交的Runnable类的封装,而且它自己也实现了Runnable接口,它的run方法里面调用的就是Runnable类的run方法。workers里worker的数量,取决于定义的ThreadPool的类型和大小。关于Worker类。
- t.isInterrupted():如果线程已经中断过了,就不用中断了。
- w.tryLock():这个方法用来判断worker是否空闲。
1 2 3 4 5 6 7 8 9 10 |
public boolean tryLock() { return tryAcquire(1); } protected boolean tryAcquire(int unused) { if (compareAndSetState(0, 1)) { setExclusiveOwnerThread(Thread.currentThread()); return true; } return false; } |
compareAndSetState(0, 1):比较当前state是否是0(空闲),是0就返回true,然后将state设置为1,否则返回false。
这个方法使用Unsafe的CAS操作保证了原子性。关于Unsafe类。
Worker在开始执行task之后,会将state设置为1。
1 2 3 4 5 6 7 8 9 10 11 12 13 14 15 16 17 18 19 20 21 22 23 24 25 26 27 28 29 30 31 32 33 34 35 36 37 38 39 40 41 42 43 44 |
final void runWorker(Worker w) { Thread wt = Thread.currentThread(); Runnable task = w.firstTask; w.firstTask = null; w.unlock(); // allow interrupts boolean completedAbruptly = true; try { while (task != null || (task = getTask()) != null) { w.lock(); // If pool is stopping, ensure thread is interrupted; // if not, ensure thread is not interrupted. This // requires a recheck in second case to deal with // shutdownNow race while clearing interrupt if ((runStateAtLeast(ctl.get(), STOP) || (Thread.interrupted() && runStateAtLeast(ctl.get(), STOP))) && !wt.isInterrupted()) wt.interrupt(); try { beforeExecute(wt, task); Throwable thrown = null; try { task.run(); } catch (RuntimeException x) { thrown = x; throw x; } catch (Error x) { thrown = x; throw x; } catch (Throwable x) { thrown = x; throw new Error(x); } finally { afterExecute(task, thrown); } } finally { task = null; w.completedTasks++; w.unlock(); } } completedAbruptly = false; } finally { processWorkerExit(w, completedAbruptly); } } |
- w.lock():开始执行task,state设置为1。
- w.unlock():task执行结束(完成or异常),state设置为0。
1 2 3 4 5 6 7 8 |
public void lock() { acquire(1); } public final void acquire(int arg) { if (!tryAcquire(arg) && acquireQueued(addWaiter(Node.EXCLUSIVE), arg)) selfInterrupt(); } |
lock():调用AQS的acquire,acquire里面调用tryAcquire,就是实现了AQS的子类的tryAcquire,这里就是Worker类。tryAcquire将state设置为1(源码见上面)。关于AQS。
判断当前线程未被中断,且worker是空闲状态,调用t.interrupt()。getTask()里面的阻塞方法(读取阻塞队列)会响应这次中断,抛出InterruptedException,跳出while循环,执行processWorkerExit。关于线程中断:《Java并发编程实战》。
1 2 3 4 5 6 7 8 9 10 11 12 13 14 15 16 17 18 19 20 21 22 23 24 25 26 27 28 |
private void processWorkerExit(Worker w, boolean completedAbruptly) { if (completedAbruptly) // If abrupt, then workerCount wasn't adjusted decrementWorkerCount(); final ReentrantLock mainLock = this.mainLock; mainLock.lock(); try { completedTaskCount += w.completedTasks; workers.remove(w); } finally { mainLock.unlock(); } tryTerminate(); int c = ctl.get(); if (runStateLessThan(c, STOP)) { if (!completedAbruptly) { int min = allowCoreThreadTimeOut ? 0 : corePoolSize; if (min == 0 && ! workQueue.isEmpty()) min = 1; if (workerCountOf(c) >= min) return; // replacement not needed } addWorker(null, false); } } |
- decrementWorkerCount():减少ThreadPool的容量(worker的最大数量)。
- workers.remove(w):移除worker。
阻塞队列里面的task虽然还未执行,但是没有被删除掉,所以还是会执行。
2.shutdownNow
已经submit并且执行task会继续执行,如果task里面有阻塞方法,会抛出InterruptedException。已经submit但尚未执行的task,就不会再执行了。
1 2 3 4 5 6 7 8 9 10 11 12 13 14 15 16 |
public List<Runnable> shutdownNow() { List<Runnable> tasks; final ReentrantLock mainLock = this.mainLock; mainLock.lock(); try { checkShutdownAccess(); advanceRunState(STOP); interruptWorkers(); tasks = drainQueue(); } finally { mainLock.unlock(); } tryTerminate(); return tasks; } |
- advanceRunState(STOP):线程池状态设置为STOP。
- interruptWorkers():中断所有workers。
- drainQueue():清除workQueue里面的task。
1 2 3 4 5 6 7 8 9 10 11 |
private void interruptWorkers() { final ReentrantLock mainLock = this.mainLock; mainLock.lock(); try { for (Worker w : workers) w.interruptIfStarted(); } finally { mainLock.unlock(); } } |
workers里面所有的worker都进行中断,所以提交的Runnable类中的阻塞方法都会抛出InterruptedException。但是具体要不要中断该线程,就要自己进行处理了。
1 2 3 4 5 6 7 8 9 10 11 12 13 |
private List<Runnable> drainQueue() { BlockingQueue<Runnable> q = workQueue; ArrayList<Runnable> taskList = new ArrayList<Runnable>(); q.drainTo(taskList); if (!q.isEmpty()) { for (Runnable r : q.toArray(new Runnable[0])) { if (q.remove(r)) taskList.add(r); } } return taskList; } |
清除了workQueue里面已经submit但尚未执行的任务,所以这些任务就不会再执行了。
7 Replies to “JUC线程池源码阅读1:ThreadPool的shutdown”