关于PriorityBlockingQueue中队列操作

编者按:笔者在使用priorityblockingqueue实现按照优先级处理任务时遇到一类npe问题,经过分析发现根本原因是在任务出队列时调用比较器异常,进而导致后续任务出队列抛出nullpointerexception。本文通过完整的案例复现来演示在什么情况会触发该问题,同时给出了处理建议。希望读者在编程时加以借鉴,避免再次遇到此类问题。
背景知识 priorityblockingqueue是一个无界的基于数组的优先级阻塞队列,使用一个全局reentrantlock来控制某一时刻只有一个线程可以进行元素出队和入队操作,并且每次出队都返回优先级别最高的或者最低的元素。priorityblockingqueue通过以下两种方式实现元素优先级排序:
入队元素实现comparable接口来比较元素优先级; priorityblockingqueue构造函数指定comparator来比较元素优先级; 关于priorityblockingqueue中队列操作的部分,基本和priorityqueue逻辑一致,只不过在操作时加锁了。在本文中我们主要关注priorityblockingqueue出队的take方法,该方法通过调用dequeue方法将元素出队列。当没有元素可以出队的时候,线程就会阻塞等待。
public e take() throws interruptedexception {    final reentrantlock lock = this.lock;    lock.lockinterruptibly();    e result;    try {        // 尝试获取最小元素,即小顶堆第一个元素,然后重新排序,如果不存在表示队列暂无元素,进行阻塞等待。        while ( (result = dequeue()) == null)            notempty.await();    } finally {        lock.unlock();    }    return result;} 现象 在某个业务服务中使用priorityblockingqueue实现按照优先级处理任务,某一天环境中的服务突然间不处理任务了,查看后台日志,发现一直抛出nullpointerexception。将进程堆dump出来,使用mat发现某个priorityblockingqueue中的size值比实际元素个数多1个(入队时已经对任务进行非空校验)。
异常堆栈如下:
java.lang.nullpointerexception at java.util.concurrent.priorityblockingqueue.siftdowncomparable(priorityblockingqueue.java:404) at java.util.concurrent.priorityblockingqueue.dequeue(priorityblockingqueue.java:333) at java.util.concurrent.priorityblockingqueue.take(priorityblockingqueue.java:548)        ... mat结果:
原因分析 在此我们分析下priorityblockingqueue是如何出队列的,priorityblockingqueue最终通过调用dequeue方法出队列,dequeue方法处理逻辑如下:
将根节点(array[0])赋值给result; array[n] 赋值给 arrary[0]; 将 array[n] 设置为 null; 调用siftdowncomparable或siftdownusingcomparator对队列元素重新排序; size大小减1; 返回result; 如果在第4步中出现异常,就会出现队列中的元素个数比实际的元素个数多1个的现象。此时size未发生改变,arry[n]已经被置为null,再进行siftdown操作时就会抛出nullpointerexception。继续分析第4步中在什么情况下会出现异常,通过代码走读我们可以发现只有在调用comparable#compareto或者comparator#compare方法进行元素比较的时候才可能出现异常。这块代码的处理逻辑和业务相关,如果业务代码处理不当抛出异常,就会导致上述现象。
    /**     * mechanics for poll().  call only while holding lock.     */    private e dequeue() {        int n = size - 1;        if (n < 0)            return null;        else {            object[] array = queue;            e result = (e) array[0];     //step1            e x = (e) array[n];     //step2            array[n] = null;        //step3            comparator cmp = comparator;            if (cmp == null)        //step4 如果指定了comparator,就按照指定的comparator来比较。否则就按照默认的                siftdowncomparable(0, x, array, n);            else                siftdownusingcomparator(0, x, array, n, cmp);            size = n;       //step5            return result;      //step6        }    }private static  void siftdowncomparable(int k, t x, object[] array, int n) {    if (n > 0) {        comparable key = (comparable)x;        int half = n >>> 1;        while (k < half) {            int child = (k << 1) + 1;             object c = array[child];            int right = child + 1;            if (right < n && ((comparable) c).compareto((t) array[right]) > 0)                 c = array[child = right];            if (key.compareto((t) c) <= 0)                 break;            array[k] = c;            k = child;        }        array[k] = key;    }}private static  void siftdownusingcomparator(int k, t x, object[] array, int n, comparator cmp) {    if (n > 0) {        int half = n >>> 1;        while (k < half) {            int child = (k << 1) + 1;            object c = array[child];            int right = child + 1;            if (right  0)                c = array[child = right];            if (cmp.compare(x, (t) c) <= 0)                break;            array[k] = c;            k = child;        }        array[k] = x;    }} 复现代码 import java.util.arraylist;import java.util.list;import java.util.concurrent.priorityblockingqueue;public class priorityblockingqueuetest {    static class entity implements comparable {        private int id;        private string name;        private boolean flag;        public void setflag(boolean flag) {            this.flag = flag;        }        public entity(int id, string name) {            this.id = id;            this.name = name;        }        @override        public int compareto(entity entity) {            if(flag) {                throw new runtimeexception(test exception);            }            if (entity == null || this.id > entity.id) {                return 1;            }            return this.id == entity.id ? 0 : -1;        }    }    public static void main(string[] args) {        int num = 5;        priorityblockingqueue priorityblockingqueue = new priorityblockingqueue();        list entities = new arraylist();        for (int i = 0; i < num; i++) {            entity entity = new entity(i, entity + i);            entities.add(entity);            priorityblockingqueue.offer(entity);        }        entities.get(num - 1).setflag(true);        int size = entities.size();        for (int i = 0; i = (cap = (array = queue).length))  trygrow(array, cap); try {  comparator cmp = comparator;  if (cmp == null)   siftupcomparable(n, e, array);  else   siftupusingcomparator(n, e, array, cmp);  size = n + 1;  notempty.signal(); } finally {  lock.unlock(); } return true;}private static  void siftupcomparable(int k, t x, object[] array) { comparable key = (comparable) x; while (k > 0) {  int parent = (k - 1) >>> 1;  object e = array[parent];  if (key.compareto((t) e) >= 0)   break;  array[k] = e;  k = parent; } array[k] = key;} 这也是常见的比较器调用异常案例,本文不再赘述,可自行参考其他文章。
总结 在使用priorityblockingqueue时,注意在比较器中做好异常处理,避免出现类似问题。


小米11镜头排布设计细节曝光
一文接下基带芯片与手机信号的关系
小米CEO雷军的2021两会建议
特斯拉将在2022年达到100万辆交付量
武汉新芯易主 前中芯国际COO杨士宁或将操盘
关于PriorityBlockingQueue中队列操作
致动器研究出现突破 小于零点一毫米的机器人诞生
陆芯IGBT应用在电机驱动产品上
贴片芯片点焊的目的是什么?
综合能源服务到底是什么,它能为我们带来什么
Digi公司XBee Pro SX模块的通讯方式
m12连接器接口优势及应用
部分化工巨头2020上半年销售额
室外消火栓监测方案
LUNALUXX悬浮台灯:会发光的小飞碟
亨通光电拟购买华为子公司51%股权!
什么是电池
如何实现纯电动汽车电机驱动系统三相线滤波磁环的设计?
中国移动宋可为:推出三大产品确保互联网金融安全
新型存储器是怎样创造AI算力的