博客
关于我
强烈建议你试试无所不能的chatGPT,快点击我
这两天学的线程池归纳
阅读量:5971 次
发布时间:2019-06-19

本文共 8533 字,大约阅读时间需要 28 分钟。

1 线程池 ---》接口 Executor

2 线程池 ---》接口 ExecutorService

final ExecutorService poolExecutor = Executors.newFixedThreadPool(threadNum);

poolExecutor.submit(new ConvertOne(latch,seperateMap,dbname));

poolExecutor.shutdown();

shutdown()方法在终止前允许执行以前提交的任务,而 shutdownNow() 方法阻止等待任务的启动并试图停止当前正在执行的任务。在终止后,执行程序没有任务在执行,也没有任务在等待执行,并且无法提交新任务。应该关闭未使用的 ExecutorService以允许回收其资源。
例子:网络服务的简单结构

3 线程池 ---》类 ThreadPoolExecutor
由于内容较多没有一一研究,只看了较常用的 ThreadPoolExecutor ,所以在这里做个介绍。ThreadPoolExecutor 的继承关系如下。
Executor->ExecutorService->AbstractExecutorService->ThreadPoolExecutor
已实现的接口: Executor, ExecutorService

每个 ThreadPoolExecutor 还维护着一些基本的统计数据,如完成的任务数。为了便于跨大量上下文使用,此类提供了很多可调整的参数和扩展钩子 (hook)。但是,强烈建议程序员使用较为方便的 Executors 工厂方法 Executors.newCachedThreadPool()(无界线程池,可以进行自动线程回收)、Executors.newFixedThreadPool(int)(固定大小线程池)和 Executors.newSingleThreadExecutor()(单个后台线程)。

关于ThreadPoolExecutor的 execute方法:

public void execute(Runnable command)
这里给出源码:

private final AtomicInteger ctl = new AtomicInteger(ctlOf(RUNNING, 0));ctl是一个重要的变量,主要包装两个重要的概念:一是workerCount:effective number of threads,二是runState:  indicating whether running, shutting down RUNNING可以接受新的task,并且可以处理queue中的task,SHUTDOWN不可以接受新的task,但是可以处理queue中的task,其他的全都不可以。int c = ctl.get();if (workerCountOf(c) < corePoolSize) { 判断线程数是否小于corePoolSize。否则进入②if (addWorker(command, true))return;c = ctl.get();}if (isRunning(c) && workQueue.offer(command)) { //② 判断线程是否是running状态,并且阻塞队列的容量没有达到上限int recheck = ctl.get();if (! isRunning(recheck) && remove(command))reject(command);//如果此处状态不是RUNNING,也不是SHUTDOWN,或者removeFirstOccurrence(队列中移除第一个阻塞的任务,返回ture)那么,则拒绝任务else if (workerCountOf(recheck) == 0)addWorker(null, false);//由于任务放到了BlockingQueue中,此处,在Worker中,不添加task,而是运行任务时,从queue取出task}else if (!addWorker(command, false))//除了以上情况以外,比如BlockingQueue饱和了,线程池容量也饱和了,执行饱和策略,默认为AbortPolicy,拒绝任务reject(command);}

  

常用两个阶段来关闭ExecutorServise

( 1 shutdown 拒绝传入任务 ,当所有已提交任务执行完后,就关闭。如果已经关闭,则调用没有其他作用。 抛出:
        SecurityException - 如果安全管理器存在并且关闭,此 ExecutorService 可能操作某些不允许调用者修改的线程。
(2 在!pool.awaitTermination(60, TimeUnit.SECONDS) 任务还没有执行完,取消所有遗留的任务 shutdownNow

关于构造的参数 corePoolSize ,maximumPoolSize的说明

注意1:在新任务被提交时,如果运行的core线程少于corePoolSize,才创建新core线程。并不是一开始就创建corePoolSize个core线程。
注意2:"如果运行的线程多于corePoolSize 而少于 maximumPoolSize,则仅当队列满时才创建新线程"
注意3:如果两个相同则固定大小的线程池(这时候可以用blockingQueue 进行排队,用来传输和保持提交的任务,使用队列与线程池的大小进行交互)
注意4: maximumPoolSize设置成 Integer.MAX_VALUE,则允许使用任何数量的并发任务。无界
注意5:核心线程即core线程,只有当前线程数小于等于corePoolSize时,这时的线程才叫核心线程。

构造函数:

(1)public ThreadPoolExecutor(int corePoolSize, //池中所保存的线程数,包括空闲线程。                          int maximumPoolSize,//池中允许的最大线程数。                          long keepAliveTime,//当线程数大于核心时,此为终止前多余的空闲线程等待新任务的最长时间。                          TimeUnit unit,//参数的时间单位。                          BlockingQueue
workQueue, ThreadFactory threadFactory)使用ThreadFactory创建新线程,如果没有另外说明,则使用 Executors.defaultThreadFactory() 创建线程,他们在同一个ThreadGroup中, 并且这些线程具有相同的 NORM_PRIORITY 优先级和非守护进程状态。(2)public ThreadPoolExecutor(int corePoolSize, int maximumPoolSize, long keepAliveTime, TimeUnit unit, BlockingQueue
workQueue)使用workQueue - 执行前用于保持任务的队列。此队列仅保持由 execute 方法提交的 Runnable 任务。 线程池所使用的缓冲队列,改缓冲队列的长度决定了能够缓冲的最大数量。拒绝任务:拒绝任务是指当线程池里面的线程数量达到 maximumPoolSize 且 workQueue 队列已满的情况下被尝试添加进来的任务。(3)public ThreadPoolExecutor(int corePoolSize, int maximumPoolSize, long keepAliveTime, TimeUnit unit, BlockingQueue
workQueue, RejectedExecutionHandler handler)使用handler - 由于超出线程范围和队列容量而使执行被阻塞时所使用的处理程序。

关于线程处于非活动状态如何减少资源消耗:【空闲线程的回收】

注意1:setKeepAliveTime(long, java.util.concurrent.TimeUnit)用于设置空闲线程最长的活动时间,
    即如果空闲时间超过设定值,就停掉该线程,对该线程进行回收。
    该策略默认只对非内核线程有用(即当前线程数大于corePoolSize),
    可以调用allowCoreThreadTimeOut(boolean)方法将此超时策略扩大到核心线程。 设置适当保持活动时间,使用0核心线程的下边界和/或设置 allowCoreThreadTimeOut(boolean)。
注意2:如果把值设为Long.MAX_VALUE TimeUnit.NANOSECONDS的话,空闲线程不会被回收直到ThreadPoolExecutor为Terminate。

关于blockingQueue 进行排队与线程池大小进行交互:

为什么要与线程池大小进行交互,用于传输和保持提交的任务,

(1、 corePoolSize, maximumPoolSize的设置
    * 如果运行的线程少于 corePoolSize,则 Executor 始终首选添加新的线程,而不进行排队。
         * 如果运行的线程等于或多于 corePoolSize,则 Executor 始终首选将请求加入队列,而不添加新的线程。
         * 如果无法将请求加入队列,则创建新的线程,除非创建此线程超出 maximumPoolSize,在这种情况下,任务将被拒绝。
(2、排队的三种通用策略
  直接提交【 默认选项 synchronousQueue 】此策略允许线程无界的增长。
任务直接提交给线程,而不保持它们。
如果不存在立即运行任务的线程,会构造新的线程,通常需要设置maximumPoolSize无界,否则无法创建新线程,导致拒绝新任务的提交。
  无界队列【例如,没有预定容量的LinkedBlockingQueue】此策略适合web服务器类型,每个任务独立于其他任务。任务互不影响。允许队列无线的增长,适合处理瞬态突发请求。
创建的线程永远不会超过corePoolSize 但是maximumPoolSize就失效了。
  有界队列【例如,ArrayBlocking】
 队列大小和最大池大小可能需要相互折衷:使用大型队列和小型池可以最大限度地降低 CPU 使用率、操作系统资源和上下文切换开销,但是可能导致人工降低吞吐量。如果任务频繁阻塞(例如,如果它们是 I/O 边界),则系统可能为超过您许可的更多线程安排时间。
 使用小型队列通常要求较大的池大小,CPU 使用率较高,但是可能遇到不可接受的调度开销,这样也会降低吞吐量。Queue】此策略有助于防止资源耗尽,但是可能较难调整和控制。

Java 线程池 ThreadPoolExecutor.的拒绝策略

在 ThreadPoolExecutor 里面定义了 4 种 handler 策略,分别是

(1)CallerRunsPolicy :这个策略重试添加当前的任务,他会自动重复调用 execute() 方法,直到成功。只要线程池未关闭,该策略直接在调用者线程中运行当前被丢弃的任务。显然这样不会真的丢弃任务,但是,调用者线程性能可能急剧下降。
(2)AbortPolicy :对拒绝任务抛弃处理,并且抛出异常。(默认的)RejectedExecutionException
(3)DiscardPolicy :对拒绝任务直接无声抛弃,没有异常信息。
(4)DiscardOldestPolicy :对拒绝任务不抛弃,而是抛弃队列里面等待最久的一个线程,然后把拒绝任务加到队列。
关于这个给出源码:

public void rejectedExecution(Runnable r, ThreadPoolExecutor e) {if (!e.isShutdown()) {e.getQueue().poll();e.execute(r);}}

扩展RejectedExecutioHandler接口,自定义拒绝策略

先看下RejectedExecutionHandler接口吧:

public interfaceRejectedExecutionHandler{
    voidrejectedExecution(Runnable r,ThreadPoolExecutor executor);
}
再举个例子吧:

1 public classTestRejectHandler { 2     class MyTask implements Runnable{ 3        @Override 4        public void run() { 5            System.out.println("Thread ID:"+Thread.currentThread().getId()); 6            try{ 7               Thread.sleep(1000); 8            }catch(InterruptedException e){ 9               e.printStackTrace();10            }11        }12     }13     public void test(){14        ThreadPoolExecutor executor= newThreadPoolExecutor(5, 5,15               0L, TimeUnit.MILLISECONDS,16               newLinkedBlockingQueue(10),17               Executors.defaultThreadFactory(),18               newRejectedExecutionHandler() {  19                   @Override20                   public voidrejectedExecution(Runnable r, ThreadPoolExecutor executor) {21                      System.out.println(r.toString()+" 被抛弃了");22                   }23               });24        MyTask task= newMyTask();25        for(int i=0;i<20;i++){26            executor.submit(task);27        }28        executor.shutdown();29     }

最后给出一个简单的利用线程池的例子

public class ThreadPoolTermination {	public static ThreadPoolExecutor threadPool = new ThreadPoolExecutor(10, 10, 10L, TimeUnit.SECONDS,			new LinkedBlockingQueue
(10), new SolrjNamedThreadFactory("Termination"), new ThreadPoolExecutor.CallerRunsPolicy()); //SolrjNamedThreadFactory监控线程 //CallerRunsPolicy 由于超出线程范围和队列容量而使执行被阻塞时所使用的处理程序 static class TestThread implements Runnable { int i; public TestThread(int i) { this.i = i; } @Override public void run() { try { String thrdName = Thread.currentThread().getName(); //dosomethind-------------这里写线程需要做的事 System.out.println("当前线程:"+thrdName+" "+i); } catch (Exception e) { e.printStackTrace(); } } } public static void main(String[] args) { for (int i = 0; i < 200; i++) { threadPool.execute(new TestThread(i)); } /*下列方法分两个阶段关闭 ExecutorService。第一阶段调用 shutdown 拒绝传入任务,然后等10秒后, 任务还没执行完成,就调用 shutdownNow(如有必要)取消所有遗留的任务*/ threadPool.shutdown(); try { while (!threadPool.awaitTermination(10, TimeUnit.MILLISECONDS)) { System.out.println("waiting for thread pool to terminate..."); } } catch (InterruptedException e) { e.printStackTrace(); threadPool.shutdownNow(); } System.out.println("done"); }}

  总结一下:

一个任务通过 execute(Runnable) 方法被添加到线程池,任务就是一个 Runnable 类型的对象,任务的执行方法就是 Runnable 类型对象的 run() 方法。

当一个任务通过 execute(Runnable) 方法欲添加到线程池时,线程池采用的策略如下:

1. 如果此时线程池中的数量小于 corePoolSize ,即使线程池中的线程都处于空闲状态,也要创建新的线程来处理被添加的任务。

2. 如果此时线程池中的数量等于 corePoolSize ,但是缓冲队列 workQueue 未满,那么任务被放入缓冲队列。

3. 如果此时线程池中的数量大于 corePoolSize ,缓冲队列 workQueue 满,并且线程池中的数量小于maximumPoolSize ,建新的线程来处理被添加的任务。

4. 如果此时线程池中的数量大于 corePoolSize ,缓冲队列 workQueue 满,并且线程池中的数量等于maximumPoolSize ,那么通过 handler 所指定的策略来处理此任务。

处理任务的优先级为:

核心线程 corePoolSize 、任务队列 workQueue 、最大线程 maximumPoolSize ,如果三者都满了,使用 handler处理被拒绝的任务。当线程池中的线程数量大于 corePoolSize 时,如果某线程空闲时间超过 keepAliveTime ,线程将被终止。这样,线程池可以动态的调整池中的线程数。

转载于:https://www.cnblogs.com/zyxs/p/8820351.html

你可能感兴趣的文章
即将上线的Hive服务器面临的一系列填坑笔记
查看>>
转:Mosquitto用户认证配置
查看>>
『计算机视觉』Mask-RCNN_锚框生成
查看>>
SpringBoot上传文件到本服务器 目录与jar包同级
查看>>
知物由学 | 内容平台、社交媒体如何应对虚假新闻?
查看>>
tensorflow模型量化
查看>>
ThreadLocal的使用及原理分析
查看>>
centos6.8下l2tp客户端xl2tpd的安装配置
查看>>
关于c++中如何调整输出格式的讲解!!!
查看>>
(原創) 如何在Windows XP使用微軟正黑體? (OS) (Windows)
查看>>
拓扑排序 soj1075
查看>>
wav文件格式分析
查看>>
[New Portal]Windows Azure Storage (13) 本地冗余存储 vs 地理冗余存储 (下)
查看>>
利用OCR识别扫描的jpg、tif文件的文字
查看>>
(转)超过 130 个你需要了解的 vim 命令
查看>>
用反射解决水果篮问题 [Design, C#]
查看>>
网络协议栈4:bind()函数
查看>>
POJ 3468 A Simple Problem with Integers (伸展树区间更新求和操作 , 模板)
查看>>
pycharm 注册码/License server 2017年最新
查看>>
json序列化对象
查看>>