编者按:笔者在使用priorityblockingqueue实现按照优先级处理任务时遇到一类npe问题,经过分析发现根本原因是在任务出队列时调用比较器异常,进而导致后续任务出队列抛出nullpointerexception。本文通过完整的案例复现来演示在什么情况会触发该问题,同时给出了处理建议。希望读者在编程时加以借鉴,避免再次遇到此类问题。
背景知识 priorityblockingqueue是一个无界的基于数组的优先级阻塞队列,使用一个全局reentrantlock来控制某一时刻只有一个线程可以进行元素出队和入队操作,并且每次出队都返回优先级别最高的或者最低的元素。priorityblockingqueue通过以下两种方式实现元素优先级排序:
入队元素实现comparable接口来比较元素优先级; priorityblockingqueue构造函数指定comparator来比较元素优先级; 关于priorityblockingqueue中队列操作的部分,基本和priorityqueue逻辑一致,只不过在操作时加锁了。在本文中我们主要关注priorityblockingqueue出队的take方法,该方法通过调用dequeue方法将元素出队列。当没有元素可以出队的时候,线程就会阻塞等待。
public e take() throws interruptedexception { final reentrantlock lock = this.lock; lock.lockinterruptibly(); e result; try { // 尝试获取最小元素,即小顶堆第一个元素,然后重新排序,如果不存在表示队列暂无元素,进行阻塞等待。 while ( (result = dequeue()) == null) notempty.await(); } finally { lock.unlock(); } return result;} 现象 在某个业务服务中使用priorityblockingqueue实现按照优先级处理任务,某一天环境中的服务突然间不处理任务了,查看后台日志,发现一直抛出nullpointerexception。将进程堆dump出来,使用mat发现某个priorityblockingqueue中的size值比实际元素个数多1个(入队时已经对任务进行非空校验)。
异常堆栈如下:
java.lang.nullpointerexception at java.util.concurrent.priorityblockingqueue.siftdowncomparable(priorityblockingqueue.java:404) at java.util.concurrent.priorityblockingqueue.dequeue(priorityblockingqueue.java:333) at java.util.concurrent.priorityblockingqueue.take(priorityblockingqueue.java:548) ... mat结果:
原因分析 在此我们分析下priorityblockingqueue是如何出队列的,priorityblockingqueue最终通过调用dequeue方法出队列,dequeue方法处理逻辑如下:
将根节点(array[0])赋值给result; array[n] 赋值给 arrary[0]; 将 array[n] 设置为 null; 调用siftdowncomparable或siftdownusingcomparator对队列元素重新排序; size大小减1; 返回result; 如果在第4步中出现异常,就会出现队列中的元素个数比实际的元素个数多1个的现象。此时size未发生改变,arry[n]已经被置为null,再进行siftdown操作时就会抛出nullpointerexception。继续分析第4步中在什么情况下会出现异常,通过代码走读我们可以发现只有在调用comparable#compareto或者comparator#compare方法进行元素比较的时候才可能出现异常。这块代码的处理逻辑和业务相关,如果业务代码处理不当抛出异常,就会导致上述现象。
/** * mechanics for poll(). call only while holding lock. */ private e dequeue() { int n = size - 1; if (n < 0) return null; else { object[] array = queue; e result = (e) array[0]; //step1 e x = (e) array[n]; //step2 array[n] = null; //step3 comparator cmp = comparator; if (cmp == null) //step4 如果指定了comparator,就按照指定的comparator来比较。否则就按照默认的 siftdowncomparable(0, x, array, n); else siftdownusingcomparator(0, x, array, n, cmp); size = n; //step5 return result; //step6 } }private static void siftdowncomparable(int k, t x, object[] array, int n) { if (n > 0) { comparable key = (comparable)x; int half = n >>> 1; while (k < half) { int child = (k << 1) + 1; object c = array[child]; int right = child + 1; if (right < n && ((comparable) c).compareto((t) array[right]) > 0) c = array[child = right]; if (key.compareto((t) c) <= 0) break; array[k] = c; k = child; } array[k] = key; }}private static void siftdownusingcomparator(int k, t x, object[] array, int n, comparator cmp) { if (n > 0) { int half = n >>> 1; while (k < half) { int child = (k << 1) + 1; object c = array[child]; int right = child + 1; if (right 0) c = array[child = right]; if (cmp.compare(x, (t) c) <= 0) break; array[k] = c; k = child; } array[k] = x; }} 复现代码 import java.util.arraylist;import java.util.list;import java.util.concurrent.priorityblockingqueue;public class priorityblockingqueuetest { static class entity implements comparable { private int id; private string name; private boolean flag; public void setflag(boolean flag) { this.flag = flag; } public entity(int id, string name) { this.id = id; this.name = name; } @override public int compareto(entity entity) { if(flag) { throw new runtimeexception(test exception); } if (entity == null || this.id > entity.id) { return 1; } return this.id == entity.id ? 0 : -1; } } public static void main(string[] args) { int num = 5; priorityblockingqueue priorityblockingqueue = new priorityblockingqueue(); list entities = new arraylist(); for (int i = 0; i < num; i++) { entity entity = new entity(i, entity + i); entities.add(entity); priorityblockingqueue.offer(entity); } entities.get(num - 1).setflag(true); int size = entities.size(); for (int i = 0; i = (cap = (array = queue).length)) trygrow(array, cap); try { comparator cmp = comparator; if (cmp == null) siftupcomparable(n, e, array); else siftupusingcomparator(n, e, array, cmp); size = n + 1; notempty.signal(); } finally { lock.unlock(); } return true;}private static void siftupcomparable(int k, t x, object[] array) { comparable key = (comparable) x; while (k > 0) { int parent = (k - 1) >>> 1; object e = array[parent]; if (key.compareto((t) e) >= 0) break; array[k] = e; k = parent; } array[k] = key;} 这也是常见的比较器调用异常案例,本文不再赘述,可自行参考其他文章。
总结 在使用priorityblockingqueue时,注意在比较器中做好异常处理,避免出现类似问题。
小米11镜头排布设计细节曝光
一文接下基带芯片与手机信号的关系
小米CEO雷军的2021两会建议
特斯拉将在2022年达到100万辆交付量
武汉新芯易主 前中芯国际COO杨士宁或将操盘
关于PriorityBlockingQueue中队列操作
致动器研究出现突破 小于零点一毫米的机器人诞生
陆芯IGBT应用在电机驱动产品上
贴片芯片点焊的目的是什么?
综合能源服务到底是什么,它能为我们带来什么
Digi公司XBee Pro SX模块的通讯方式
m12连接器接口优势及应用
部分化工巨头2020上半年销售额
室外消火栓监测方案
LUNALUXX悬浮台灯:会发光的小飞碟
亨通光电拟购买华为子公司51%股权!
什么是电池
如何实现纯电动汽车电机驱动系统三相线滤波磁环的设计?
中国移动宋可为:推出三大产品确保互联网金融安全
新型存储器是怎样创造AI算力的