Skip to content

Java 线程池原理与实践

· 11 min

为什么使用线程池#

使用线程池的好处有三点:

  1. 降低资源的消耗,可以重复利用已经创建的线程。
  2. 提高响应速度,任务来的时候不用等待先创建线程在执行了。
  3. 提高可管理性,线程的调度、创建都统一交给了线程池进行管理

Executors 的四种创建线程池的方法#

这四种方法是不推荐使用的,因为有的是最大线程数没有限制,有的是无界阻塞队列,都可能会造成OOM。

newSingleThreadExecutor “单线程化线程池”#

适用场景:任务按照顺序一个个执行

public static ExecutorService newSingleThreadExecutor() {
return new FinalizableDelegatedExecutorService
(new ThreadPoolExecutor(1, 1,0L, TimeUnit.MILLISECONDS,
new LinkedBlockingQueue<Runnable>()));
}

newFixedThreadPool “固定数量的线程池”#

弊端:无界队列:如果大量的任务进来,会导致队列中的空间无限增大,浪费系统资源

public static ExecutorService newFixedThreadPool(int nThreads) {
return new ThreadPoolExecutor(nThreads, nThreads,
0L, TimeUnit.MILLISECONDS,
new LinkedBlockingQueue<Runnable>());
}

newCachedThreadPool “可缓存线程池”#

弊端:最大线程数没有限制,大量异步任务过来的时候会导致大量创建线程,资源耗尽

public static ExecutorService newCachedThreadPool() {
return new ThreadPoolExecutor(0, Integer.MAX_VALUE,
60L, TimeUnit.SECONDS,
new SynchronousQueue<Runnable>());
}

newScheduledThreadPool “可调度线程池”#

支持定时和周期性任务执行的线程池的方法。使用于延迟执行任务、定期重复执行任务

public ScheduledThreadPoolExecutor(int corePoolSize,
ThreadFactory threadFactory) {
super(corePoolSize, Integer.MAX_VALUE, 0, NANOSECONDS,
new DelayedWorkQueue(), threadFactory);
}

ThreadPoolExecutor 详解#

参数说明#

public ThreadPoolExecutor(
int corePoolSize,
int maximumPoolSize,
long keepAliveTime,
TimeUnit unit,
BlockingQueue<Runnable> workQueue,
ThreadFactory threadFactory,
RejectedExecutionHandler handler
)

执行流程#

当一个任务到达线程池之后会经历以下流程:

  1. 判断核心线程数是否满了,如果没有满直接创建新线程执行任务
  2. 如果核心线程满了则判断阻塞队列是不是满了,如果队列没有满,放到队列里面。
  3. 如果阻塞队列也满了,就再判断一下最大线程数是不是已经满了。如果满了就触发拒绝策略
  4. 如果最大线程数还有空余,就创建新线程执行任务

如何确定线程池线程数#

IO密集型#

核心线程数 = CPU * 2

最大线程数 = CPU * 4 ~ CPU * 10

CPU 密集型#

核心线程数 = CPU

最大线程数 = CPU

混合型#

核心线程数 = CPU * ((线程等待时间 / CPU 时间) + 1)

阻塞队列#

  1. ArrayBlockingQueue:数组实现的有界阻塞队列,创建时需要指定大小
  2. LinkedBlockingQueue:基于链表实现的无界阻塞队列,也可以通过有参构造使其有界
  3. PriorityBlockingQueue:具有优先级的无界阻塞队列
  4. DelayBlockingQueue:无界阻塞延迟队列,只有过期的元素才会出队。适合于定时任务或者延迟执行的任务
  5. SynchronousQueue:同步阻塞队列,不存储任何任务,也就是说任务到阻塞队列后,直接根据最大线程数去判断是创建还是拒绝

ThreadFactory 线程工厂#

class CustomeThreadFactory implements ThreadFactory{
static AtomicInteger threadNo = new AtomicInteger(1);
//实现其唯一的创建线程方法
@Override
public Thread newThread(Runnable target) {
String threadName = "simpleThread-" + threadNo.get();
threadNo.incrementAndGet();
//设置线程名称
Thread thread = new Thread(target, threadName);
//设置为守护线程
thread.setDaemon(true);
return thread;
}
}

线程池的拒绝策略#

当线程池被关闭或者是工作队列满了并且最大线程数也满了,那么新来的任务就会被拒绝

  1. AbortPolicy 拒绝,并且抛出异常
  2. DiscardPolicy 安静的拒绝
  3. DiscardOldestPolicy :把队列中最老的元素移除,腾出空间
  4. CallerRunsPolicy: 调用者执行策略,交给上层线程调用。简单来说就是与当前线程池线程无关了,哪个线程提交的任务,哪个线程来执行。这样做的坏处就是会导致新任务提交的速度降低,影响整体性能
  5. 自定义策略,我们可以实现RejectExecutionHandler 然后 自定义一个策略

调度器钩子方法#

ThreadPoolExecutor 方法里面定义了三个钩子方法,我们可以在线程终止时,执行前、执行后调用。

@Override
protected void beforeExecute(Thread t, Runnable r) {
super.beforeExecute(t, r);
}
@Override
protected void afterExecute(Runnable r, Throwable t) {
super.afterExecute(r, t);
}
@Override
protected void terminated() {
super.terminated();
}

线程池的状态#

优雅关闭线程池#

实践#

//配置类
@Configuration
public class ThreadPoolConfig {
@Autowired
private ThreadPoolRegistry registry;
@Bean
public ExecutorService orderPool() {
ThreadPoolExecutor pool = new ThreadPoolExecutor(
4, 8,
60L, TimeUnit.SECONDS,
new ArrayBlockingQueue<>(100),
r -> new Thread(r, "OrderWorker"),
new ThreadPoolExecutor.CallerRunsPolicy()
);
return registry.register(pool);
}
}
//管理类
@Component
public class ThreadPoolRegistry implements DisposableBean {
private final List<ExecutorService> executors = new CopyOnWriteArrayList<>();
public ExecutorService register(ExecutorService executor) {
executors.add(executor);
return executor;
}
@Override
public void destroy() {
executors.forEach(e -> {
e.shutdown();
try {
if (!e.awaitTermination(10, TimeUnit.SECONDS)) {
e.shutdownNow();
}
} catch (InterruptedException ex) {
e.shutdownNow();
Thread.currentThread().interrupt();
}
});
}
}

ThreadLocal原理#

ThreadLocal的作用就是存储不同线程的独立的值,我们可以想象成是个Map集合,里面存放的是Thread类和对应的Value

ThreadLocal 的结构#

ThreadLocal 为什么会导致内存泄漏#