近日,小伙伴发现使用线程池,如果Runnable中存在异常且没有catch后会造成某一个线程的阻塞。
以下代码是根据具体业务改编。(不要喷 (•◡•) /)
public class RunnableTest implements Runnable{
private int i;
@Override
public void run() {
User xxx = new User(i);
System.out.println(Thread.currentThread().getName() + " Task " + i + "t start");
xxx = null;
xxx.getAge();
System.out.println(Thread.currentThread().getName() + " Task " + i + "t end");
}
public RunnableTest(int i) {
this.i = i;
}
public static void main(String[] args) {
ThreadPoolExecutor threadPoolExecutor = new ThreadPoolExecutor(1, 1,
60, TimeUnit.SECONDS,
new ArrayBlockingQueue(1),
Executors.defaultThreadFactory(),
new ThreadPoolExecutor.DiscardPolicy());
/*执行10个任务*/
for (int i = 0; i < 10; i++) {
threadPoolExecutor.execute(new RunnableTest(1));
}
}
}
执行结果:
pool-1-thread-1 Task 1 start
Exception in thread "pool-1-thread-1" pool-1-thread-2 Task 1 start
java.lang.NullPointerException
at com.mashibing.practice.runnable_t1.RunnableTest.run(RunnableTest.java:26)
at java.util.concurrent.ThreadPoolExecutor.runWorker(ThreadPoolExecutor.java:1149)
at java.util.concurrent.ThreadPoolExecutor$Worker.run(ThreadPoolExecutor.java:624)
at java.lang.Thread.run(Thread.java:748)
Exception in thread "pool-1-thread-2" java.lang.NullPointerException
at com.mashibing.practice.runnable_t1.RunnableTest.run(RunnableTest.java:26)
at java.util.concurrent.ThreadPoolExecutor.runWorker(ThreadPoolExecutor.java:1149)
at java.util.concurrent.ThreadPoolExecutor$Worker.run(ThreadPoolExecutor.java:624)
at java.lang.Thread.run(Thread.java:748)
看到这个结果,内心有这样几个问题
- 为什么xxx.getAge();之后的语句没有执行呢?
很明显是因为
xxx = null;
之后执行xxx.getAge();
就有NullPointerException问题,所以下边代码没有执行。 - 为什么就只有两个Task执行呢?
这就要从ThreadPoolExecutor中的各个参数说起了,先看看这个参数的具体含义。
- corePoolSize 线程池中最少的工作线程,不允许销毁,除非设置了 allowCoreThreadTimeOut 参数
- maximumPoolSize 线程池中最多工作线程数(最大可以是2^29-1个)
- workQueue 任务队列,用于保存任务和执行任务之间切换
- keepAliveTime: 如果 当前线程数量 > corePoolSize,多出来的线程会在keepAliveTime之后就被释放掉
- unit: keepAliveTime的时间单位,比如分钟,小时等
- threadFactory: 创建线程的工厂
- handler: 就是说当线程,队列都满了,之后采取的策略,比如抛出异常等策略
参数的具体含义明白后,再看看代码,发现队列慢了之后的策略是new ThreadPoolExecutor.DiscardPolicy()
,那明白了,也就是说,我一下在创建了10个任务同时启动,队列满了,触发DiscardPolicy
之后进来的任务都抛弃了。那我们再测试一下。
/*执行10个任务*/
for (int i = 0; i < 10; i++) {
threadPoolExecutor.execute(new RunnableTest(i));
try {
Thread.sleep(1000);
} catch (InterruptedException e) {
e.printStackTrace();
}
}
我向线程池扔一个任务后,睡1s,然后再扔,验证上边所说问题。
执行结果
结果显而易见,线程没有被阻塞,仍然正常的执行之后的任务,线程没有阻塞。
那接下来就有问题了,线上出现了这个问题,我们改怎么优化呢?
分析一下具体有哪些问题
- 问题1: 线程提交任务后,由于队列满了,再向线程池中扔Runnable,我们应该采取什么策略呢?
- 使用submit提交子任务,一定要获取返回值Future,通过get方法获取可能出现的异常,并且可以进行捕获(推荐),然后进一步处理。
- 扩大线程池中等待队列(请求处理队列)。这样看起来似乎可以,估计也是目前最简单粗暴的方案了。但是当并发较高时,系统内存显然是稀缺资源,可能会耗费非常大的内存,甚至OOM。那在不增加机器,不增加内存的情况下,如何让系统有更高的并发呢?
- 扩大线程池中
maximumPoolSize
,显然也有一些缺点,比如并发非常高时,可能会创建数量非常多的线程,甚至OOM。 - 还有一种方案就是调整拒绝策略。默认的拒绝策略看来够呛了。原因是默认的拒绝策略中
DiscardPolicy()
和DiscardOldestPolicy
直接把任务扔掉,这是不是有点粗鲁,在特定场景下,直接扔掉会造成业务数据异常。那我们看看AbortPolicy()
这个呢,还好点,最起码向主线程抛出异常,但这个姿势还是不好,根本问题没有解决,业务数据还是可能有异常的情况。那还有最后一个默认策略CallerRunsPolicy()
大概意思就是哪个线程向线程池中扔任务,那这个线程执行该Runnable。这个似乎还可以。但是这样如果队列满了,发起执行任务的线程就要执行Runnable,这样会大大降低发起执行任务的线程的并发。既然这样,我们不如自定义拒绝策略吧,当线程池中队列满了,我们将多出的Runnable扔到中间件(例如:KafKa)中,起几个消费者帮忙执行一下任务。根据业务场景如果数据可以有一定的延迟时,可以先把Runnable先存下来,等到队列中空闲时再把保存的Runnable扔到等待队列中。
static class MyHandler implements RejectedExecutionHandler {
@Override
public void rejectedExecution(Runnable r, ThreadPoolExecutor executor) {
// System.out.println();("r rejected")
// save r kafka mysql redis
// try 3 times
if (executor.getQueue().size() < 4) {
//try put again();
}
}
}
- 既然这样,我们为什么直接用中间件之间处理呢?这就得根据项目具体业务来确定了,来我们聊聊使用线程池和使用中间件的区别吧!
用线程池ExecutorService异步处理:我理解ExecutorService其实也是内部使用了队列(如LinkedBlockingQueue),所以从设计上,其实和使用中间价的消息队列是差不多一致的。只是这里应用服务器既充当生产者又充当消费者,也是消息队列中间价的实现者。这种应该适合非分布式的架构,比如简单的只有一台服务器
使用消息队列:消息队列(指activeMQ,rabbitMQ,kafaKa,Redis等)因为一般都是中间件,部署在其他机器,需要一定的网络消耗。
本着解耦的目的,使用后者更合理,因为应用服务器一般内存也不会太多,队列长度不易太长。让应用服务器只处理逻辑比较合理。适合分布式架构。
MQ也可以更加有扩展性, 支持的场景更多, 而且支持消息自动的持久化, 建议看看 RabbitMQ 和 AMQP 协议, JMS 可以学但是没 AMQP 更加通用, redis的MQ还是不要用了, 那只是一个附带的功能, kafka 是大数据领域的不适合做核心业务功能, 只适合数据统计类应用的发送数据, 因为他不确保消息100%不丢失, 如此大的数据量丢一条无所谓的, 不会对统计结果造成影响, 但速度和吞吐量高很多.
但是线程池就不一样了, 目前执行状态你无法知道, msg的消费率是多少都不知道, 消息转发啊, 消息拒绝啊, 都的自己实现, 而且是单机版的, 我目前用他来做一级转发, 就是用他来将 event 异步发送出去, 而不是让他异步做一些很繁重的工作, 举例:
注册用户service方法, 当事务结束后, 发送 RegisterUserEvent, 这个发送就是用java线程池(如spring的), 然后 RegisterUserListener 监听到了这个 event 就发送 msg 到 Rabbit MQ, 之后对注册用户这个Topic感兴趣的应用都可以订阅, 比如送积分的服务, 送优惠券的服务, 开辟云盘空间的服务等等。
– 问题2: 如何能捕获到快速捕获到线程池中线程的异常呢
1.异常统一捕获
- 在我们提供的Runnable的run方法中捕获任务代码可能抛出的所有异常,包括未检测异常
- 使用ExecutorService.submit执行任务,利用返回的Future对象的get方法接收抛出的异常,然后进行处理
3.重写ThreadPoolExecutor.afterExecute方法,处理传递到afterExecute方法中的异常
4.为工作者线程设置UncaughtExceptionHandler,在uncaughtException方法中处理异常 (不推荐)
2.当业务量比较复杂时,项目中使用的线程池比较多,能够快速定位问题显得格外重要,那就给线程池定义一个业务名称吧。
通过实现
ThreadFactory
接口,可以实现
如果线上机器突然宕机,线程池中阻塞队列中的请求怎么办?
必然会导致线程池里的积压的任务实际上来说都是会丢失的
如果说你要提交一个任务到线程池里去,在提交之前,麻烦你先在数据库里插入这个任务的信息,更新他的状态:未提交、已提交、已完成。提交成功之后,更新他的状态是已提交状态
系统重启,后台线程去扫描数据库里的未提交和已提交状态的任务,可以把任务的信息读取出来,重新提交到线程池里去,继续进行执行
由于本人水平有限,文章中如果有不严谨的地方还请提出来,愿闻其详。
还应学习的资料
- 源码 https://www.cnblogs.com/wang-meng/p/10588637.html
- ThreadPoolExcutor 线程池 异常处理 (下篇)
1、本站所有资源均从互联网上收集整理而来,仅供学习交流之用,因此不包含技术服务请大家谅解!
2、本站不提供任何实质性的付费和支付资源,所有需要积分下载的资源均为网站运营赞助费用或者线下劳务费用!
3、本站所有资源仅用于学习及研究使用,您必须在下载后的24小时内删除所下载资源,切勿用于商业用途,否则由此引发的法律纠纷及连带责任本站和发布者概不承担!
4、本站站内提供的所有可下载资源,本站保证未做任何负面改动(不包含修复bug和完善功能等正面优化或二次开发),但本站不保证资源的准确性、安全性和完整性,用户下载后自行斟酌,我们以交流学习为目的,并不是所有的源码都100%无错或无bug!如有链接无法下载、失效或广告,请联系客服处理!
5、本站资源除标明原创外均来自网络整理,版权归原作者或本站特约原创作者所有,如侵犯到您的合法权益,请立即告知本站,本站将及时予与删除并致以最深的歉意!
6、如果您也有好的资源或教程,您可以投稿发布,成功分享后有站币奖励和额外收入!
7、如果您喜欢该资源,请支持官方正版资源,以得到更好的正版服务!
8、请您认真阅读上述内容,注册本站用户或下载本站资源即您同意上述内容!
原文链接:https://www.shuli.cc/?p=12725,转载请注明出处。
评论0