线程池
线程池就是首先创建一些线程,它们的集合称为线程池。使用线程池可以很好地提高性能,线程池在系统启动时即创建大量空闲的线程,程序将一个任务传给线程池,线程池就会启动一条线程来执行这个任务,执行结束以后,该线程并不会死亡,而是再次返回线程池中成为空闲状态,等待执行下一个任务。
为什么要使用线程池:
1.减少了创建和销毁线程的次数,每个工作线程都可以被重复
使用,或利用,可以并发执行多个任务
2.可以根据系统的承受能力,调整线程池中的工作线程数目
防止因为消耗过多的内存,而使服务器宕机(down)
结论:就是java给提供一批api方法,用于更好的管理线程
程序员只需要专注于写run方法(线程的逻辑)
自定义线程池
1 2 3 4 5 6 7 8 9 10 11 12 13 14 15 16 17 18 19 20 21 22 23 24 25 26 27 28 29 30 31 32 33 34 35 36 37 38 39 40 41 42 43 44 45 46 47 48 49 50 51 52 53 54 55 56 57 58 59 60 61 62 63
| package thread.pool; //阻塞式队列 import java.util.concurrent.ArrayBlockingQueue; import java.util.concurrent.RejectedExecutionHandler; import java.util.concurrent.ThreadPoolExecutor; import java.util.concurrent.TimeUnit; /** * 此类演示手动创建线程池 * 核心:线程池执行器 * @author gavino * */ public class TestExecutorService { public static void main(String[] args) { //手动创建一个线程池 ThreadPoolExecutor threadPool = new ThreadPoolExecutor(5, 10, 30L, TimeUnit.SECONDS, new ArrayBlockingQueue<Runnable>(5),new RejectedExecutionHandler() { //回调方法 @Override public void rejectedExecution(Runnable r, ThreadPoolExecutor executor) { System.out.println("线程数超过了线程池的容量,拒绝执行任务-->"+r); } }); //创建线程对象,并把线程对象放在线程池中,并执行 (这样并不等于执行run ) threadPool.execute(new TestThread(1)); threadPool.execute(new TestThread(2)); threadPool.execute(new TestThread(3)); threadPool.execute(new TestThread(4)); threadPool.execute(new TestThread(5)); threadPool.execute(new TestThread(6)); threadPool.execute(new TestThread(7)); threadPool.execute(new TestThread(8)); threadPool.execute(new TestThread(9)); threadPool.execute(new TestThread(10)); threadPool.execute(new TestThread(11)); threadPool.execute(new TestThread(12)); threadPool.execute(new TestThread(13)); threadPool.execute(new TestThread(14)); threadPool.execute(new TestThread(15)); threadPool.execute(new TestThread(16)); } } /** * 写线程的任务逻辑 * @author gavino * */ class TestThread implements Runnable{ private int num; public TestThread(int num) { this.num=num; } @Override public void run() { try { System.out.println("第"+num+"号线程开始执行"); Thread.sleep(3000); System.out.println("第"+num+"号线程执行结束"); } catch (InterruptedException e) { e.printStackTrace(); } }}
|
对上面代码进行分析
- ThreadPoolExecutor 及其相关参数
new ThreadPoolExecutor(corePoolSize, maximumPoolSize, keepAliveTime, unit, workQueue);
corePoolSize:核心池大小
maximumPoolSize:最大池大小
keepAliveTime:空闲的多余线程保持时间
unit:时间单位
workQueue:阻塞式队列
handler:拒绝服务助手
- 在上面的代码中,一共创建了16个线程对象
核心线程满了,线程会进入阻塞队列,阻塞式队列满了,才开临时线程
能执行的线程总数为: 核心线程数量 + 阻塞队列中的线程数量 + 临时线程数量
在上面的代码中,corePoolSize为5,maximumPoolSize为10,
ThreadPoolExecutor线程池再分析
- 刚开始时线程池是空的,当有线程任务来临时则先用核心线程来处理
- 如果有新任务,并且之前有空闲线程,但不足核心池大小则创建新的线程
- 如果有新任务,核心线程数已经达到核心池大小,并且有空闲线程,则使用空闲线程来处理
- 如果有新任务,但核心线程都忙着,则把线程放在队列中,等核心线程有空闲了,再从队列中取出线程来处理
- 如果核心线程都忙,并且队列也满了,则需要开临时线程处理新任务,但不能超过最大线程数目
- 如果核心线程有空闲,队列中也没有阻塞的任务,则根据线程存活时间来销毁线程池中的线程(被销毁的线程有可能是核心线程,也有可能是临时线程),直到等于核心线程数为止
- 如果核心线程数、队列中的线程数、临时线程数都是最大值时,则回调handler提示
注意
- ThreadPoolExecutor 将根据corePoolSize和maximumPoolSize设置的边界自动调整池大小
- 当新任务在方法 execute(java.lang.Runnable) 中提交时,如果运行的线程少于 corePoolSize,则创建新线程来处理请求,即使其他辅助线程是空闲的
- 如果运行的线程多于corePoolSize而少于maximumPoolSize,则仅当队列满时才创建新线程
- 如果设置的corePoolSize和maximumPoolSize相同,则创建了固定大小的线程池
- 如果将maximumPoolSize设置为基本的无界值(如Integer.MAX_VALUE),则允许池适应任意数量的并发任务。
- 在大多数情况下,核心和最大池大小仅基于构造来设置,不过也可以使用setCorePoolSize(int)和setMaximumPoolSize(int)进行动态更改。
线程池的使用:
有一个Executors的线程工具类,此类提供了若干静态方法
这些静态方法用于生成线程池的对象
Executors.newSingleThreadExecutor();
创建一个单线程的线程池,这个线程池只有一个线程在工作即单线程执行任务,如果这个唯一的线程因为异常结束,那么就会有一个新的线程来替代它,,因此线程池保证所有的任务是按照任务的提交顺序来执行
Executors.newFixedThreadPool();
创建固定大小的线程池,每次提交一个任务就创建一个线程直到线程达到线程池的最大的大小,线程池的大小一旦达到最大就会保持不变,如果某个线程因为执行异常而结束,那么久会补充一个新的线程
Executors.newCachedThreadPool();
创建了一个可以缓冲的线程池,如果线程大小超过处理任务所需要的线程,那么就回收部分线程,当线程数增加的时候,此线程池由可以智能添加新的线程来处理任务,此线程不会对线程池大小做限制,线程池的大小完全依赖操作系统能够创建的最大大小
Executors.newScheduledThreadPool()
创建一个大小无限制的线程池,此线程池支持定时以及周期性的执行任务的需求
Executors.newWorkStealingPool()
(JDK8新增)会根据所需的并发数来动态创建和关闭线程。能够合理的使用CPU进行对任务进行并发操作,所以适合使用在很耗时的任务。
注意返回的是ForkJoinPool对象。
什么是ForkJoinPool:
使用一个无限队列来保存需要执行的任务,可以传入线程的数量,
不传入,则默认使用当前计算机中可用的cpu数量,
使用分治法来解决问题,使用fork()和join()来进行调用
ThreadPoolExector源码分析
1 2 3 4 5 6 7 8 9 10 11 12 13 14 15 16 17 18 19 20 21 22 23 24 25 26 27 28 29 30 31 32 33 34 35 36 37 38 39 40 41 42 43
| // public static ExecutorService newSingleThreadExecutor() { return new FinalizableDelegatedExecutorService (new ThreadPoolExecutor(1, 1, 0L, TimeUnit.MILLISECONDS, new LinkedBlockingQueue<Runnable>())); } // public static ExecutorService newFixedThreadPool(int nThreads) { return new ThreadPoolExecutor(nThreads, nThreads, 0L, TimeUnit.MILLISECONDS, new LinkedBlockingQueue<Runnable>()); } // public static ExecutorService newCachedThreadPool() { return new ThreadPoolExecutor(0, Integer.MAX_VALUE, 60L, TimeUnit.SECONDS, new SynchronousQueue<Runnable>()); } // public static ScheduledExecutorService newScheduledThreadPool(int corePoolSize) { return new ScheduledThreadPoolExecutor(corePoolSize); } // public static ExecutorService newWorkStealingPool(int parallelism) { return new ForkJoinPool (parallelism, ForkJoinPool.defaultForkJoinWorkerThreadFactory, null, true); }
//ForkJoinPool: public ForkJoinPool(int parallelism, ForkJoinWorkerThreadFactory factory, UncaughtExceptionHandler handler, boolean asyncMode) { this(checkParallelism(parallelism), checkFactory(factory), handler, asyncMode ? FIFO_QUEUE : LIFO_QUEUE, "ForkJoinPool-" + nextPoolId() + "-worker-"); checkPermission(); }
|
源码结构分析
- Executor 是一个运行新任务的简单接口
- ExecutorService 扩展了Executor接口。添加了一些用来管理执行器生命周期和任务生命周期的方法
- AbstractExecutorService 对ExecutorService接口的抽象类实现。
- ThreadPoolExecutor 是Java线程池的核心实现。
在向线程池提交任务时,会通过两个方法:execute和submit。
execute方法:
execute方法最主要的逻辑是在addWorker方法中实现的
addWorker方法
addWorker方法的主要工作是在线程池中创建一个新的线程并执行
因为workers是HashSet类型的,不能保证线程安全。所以需要持有mainLock。
Worker.java
Worker 继承了AQS并发框架还实现了Runnable。证明它还是一个线程任务类。那我们调用t.start()事实上就是调用了该类重写的run方法.
其 run 方法又调用了runWorker方法:
1 2 3
| public void run() { runWorker(this); }
|
runWorker方法:
runWorker方法的执行过程:
- while循环中,不断地通过getTask()方法从workerQueue中获取任务
- 如果线程池正在停止,则中断线程。否则调用3.
- 调用task.run()执行任务;
- 如果task为null则跳出循环,执行processWorkerExit()方法,销毁线程workers.remove(w);