多线程系列之线程池

多线程系列之线程池

ThreadPoolExecutor

public ThreadPoolExecutor(int corePoolSize, int maximumPoolSize,long keepAliveTime,TimeUnit unit,
BlockingQueue<Runnable> workQueue,
ThreadFactory threadFactory,
RejectedExecutionHandler handler)

核心参数:

  • corePoolSize: 线程池核心线程数最大值
  • maximumPoolSize: 线程池最大线程数大小
  • keepAliveTime: 线程池中非核心线程空闲的存活时间大小
  • unit: 线程空闲存活时间单位
  • workQueue: 存放任务的阻塞队列
  • threadFactory: 用于设置创建线程的工厂,例如可以给创建的线程设置有意义的名字
  • handler: 任务拒绝策略

说明:

  1. 运行线程数小于核心数,创建新线程处理请求
  2. 运行线程数大于核心数,小于最大线程数,当队列满的的时候创建新的线程
  3. 只有当线程池中的线程数大于corePoolSize时, keepAliveTime才会起作用 - 如果线程空闲时间达到keepAliveTime (unit)时,会终止线程,直到线程池中的线程数不超过corePoolSize
  4. 当线程数达到了maximumPoolSize,新任务过来,执行拒绝策略

队列

  • ArrayBlockingQueue

    数组实现的有界队列

  • LinkedBlockingQueue

    可设置容量,默认值为无限大

  • DelayQueue

    延迟队列,ScheduledThreadPool 使用此类队列

  • PriorityBlockingQueue

    具有优先级的无界阻塞队列

  • SynchronousQueue

    不存储元素的阻塞队列,每个插入操作必须等到另一个线程调用移除,否则处于阻塞

使用无界队列的线程池(如fixedThreadPool)可能导致OOM - 线程获取任务后,执行时间较长,队列任务积累。

拒绝任务策略

  • AbortPolicy

    抛出异常 (默认)

  • CallerRunsPolicy

    交个线程池的调用 的所在线程进行处理

  • DiscardPolicy

    直接丢弃

  • DiscardOldestPolicy

    丢弃队列最老得任务,将当前任务继续提交给线程池处理

Callable VS Runnable

  • Runnable 没有返回值,不能抛出checked exception
  • Callable 有返回值

Submit VS Execute

execute没有返回值,适用于不需要知道线程的执行结果

submit返回Future对象,调用Future get方法获取执行结果(同步阻塞)

异常处理

  1. try-catch
  2. UncaughtExceptionHandler,实现void uncaughtException(Thread t, Throwable e); ,作为handler传入线程池ThreadFactory
  3. Future,future 可以handle 返回结果,也可以handle 异常
  4. 重写ThreadPoolExecutor的afterExecute方法

线程池关闭

  • shutdown

    不接受新任务,之前提交的任务继续执行结束后关闭线程池

  • shutdownNow

    不接受新任务,并尝试停止线程池中的任务,返回未处理的任务

Executors

工厂类

创建ThreadPoolExecutor

  • newFixedThreadPool : corePoolSize 和 maximumPoolSize 相同, workQueue为LinkedBlockingQueue(无界队列)

  • newCachedThreadPool:

    corePoolSize 为0, maximumPoolSize无限大,workQueue为SynchronousQueue

    任务提交时,queue不存储,无限创建非核心线程

  • SingleThreadExecutor

    corePoolSize 为1, maximumPoolSize 为1, workQueue为LinkedBlockingQueue(无界队列)

创建ScheduleThreadPoolExecutor

创建ForkJoinPool

查看 Post not found: 多线程之线程池-forkjoinpool 多线程之线程池-forkjoinpool


线程池参数配置相关

1. 线程池大小

影响因素

  • 硬件,尤其是cpu(core)数
  • task type: CPU intensive VS IO Intensive - better utilization of CPU, 对于IO intensive , cpu could be idle when watiing on IO operation.
Screen Shot 2020-08-12 at 11.24.33 AM

如果线程数数设置过小,任务再队列堆积,资源消耗(甚至oom),cpu利用率低

如果线程数设置过大,CPU 频繁上下文切换(interleaving/context switch),执行效率较低

  • CPU 密集型任务(N+1): 这种任务消耗的主要是 CPU 资源,可以将线程数设置为 N(CPU 核心数)+1,比 CPU 核心数多出来的一个线程是为了防止线程偶发的缺页中断,或者其它原因导致的任务暂停而带来的影响。一旦任务暂停,CPU 就会处于空闲状态,而在这种情况下多出来的一个线程就可以充分利用 CPU 的空闲时间。
  • I/O 密集型任务(2N): 这种任务应用起来,系统会用大部分的时间来处理 I/O 交互,而线程在处理 I/O 的时间段内不会占用 CPU 来处理,这时就可以将 CPU 交出给其它线程使用。因此在 I/O 密集型任务的应用中,我们可以多配置一些线程,具体的计算方法是 2N。

如何判断任务是IO bound 还是 CPU bound?

对于大多涉及到网络,IO的操作,cpu计算耗费的时间相较于等待IO操作完成的时间来说很少,大部分时间任务处于wating 状态等待IO 完成唤醒。

2. 队列

建议有界队列,避免资源耗尽

3. 拒绝策略

默认采用的是AbortPolicy,抛出运行时异常RejectedExecutionException, 应在程序中捕获并处理

CallerRunsPolicy, 一般会导致主线程在一定时间内无法继续提交新任务

自定义拒绝策略,实现RejectedExecutionHandler

如果任务不是特别重要,可考虑discard 策略

4. misc

4.1 不推荐使用 Executors提供的线程池

  • FixedThreadPool和SingleThreadExecutor => 允许的请求队列长度为Integer.MAX_VALUE,可能会堆积大量的请求,从而引起OOM异常
  • CachedThreadPool => 允许创建的线程数为Integer.MAX_VALUE,可能会创建大量的线程,从而引起OOM异常

实际使用中需要根据自己机器的性能、业务场景来手动配置线程池的参数比如核心线程数、使用的任务队列、饱和策略等等; 应该显示地给我们的线程池命名,这样有助于定位问题。

Alibaba P3c introspection result about Executors:

https://github.com/alibaba/p3c

A thread pool should be created by ThreadPoolExecutor rather than Executors. These would make the parameters of the thread pool understandable. It would also reduce the risk of running out of system resource. Note: Below are the problems created by usage of Executors for thread pool creation:
1) FixedThreadPool and SingleThreadPool:
 Maximum request queue size Integer.MAX_VALUE. A large number of requests might cause OOM.
2) CachedThreadPool:
 The number of threads which are allowed to be created is Integer.MAX_VALUE. Creating too many threads might lead to OOM.

Positive example 1:
//org.apache.commons.lang3.concurrent.BasicThreadFactory
ScheduledExecutorService executorService = new ScheduledThreadPoolExecutor(1,
new BasicThreadFactory.Builder().namingPattern("example-schedule-pool-%d").daemon(true).build());



Positive example 2:
ThreadFactory namedThreadFactory = new ThreadFactoryBuilder()
.setNameFormat("demo-pool-%d").build();

//Common Thread Pool
ExecutorService pool = new ThreadPoolExecutor(5, 200,
0L, TimeUnit.MILLISECONDS,
new LinkedBlockingQueue<Runnable>(1024), namedThreadFactory, new ThreadPoolExecutor.AbortPolicy());

pool.execute(()-> System.out.println(Thread.currentThread().getName()));
pool.shutdown();//gracefully shutdown



Positive example 3:
<bean id="userThreadPool"
class="org.springframework.scheduling.concurrent.ThreadPoolTaskExecutor">
<property name="corePoolSize" value="10" />
<property name="maxPoolSize" value="100" />
<property name="queueCapacity" value="2000" />

<property name="threadFactory" value= threadFactory />
<property name="rejectedExecutionHandler">
<ref local="rejectedExecutionHandler" />
</property>
</bean>
//in code
userThreadPool.execute(thread);

4.2 监测线程池

4.3 不同类别的任务使用单独线程池

4.4 线程池命名

默认: pool-1-thread-n, 可通过threadFactory 指定naming pattern