首页 > 图灵资讯 > 技术篇>正文

Java延时队列DelayQueue的使用

2023-05-29 13:55:54

摘要: DelayQueue的使用场景及介绍

问题背景

在最近的一个业务中,遇到了一个问题,一个用户动作,会产生A和B两种行为,分别通过相应的esb消息总线发送。

我们的业务线监控AB的两个esb消息队列(两个过程),同步更新数据。

在正常过程中,A行为先于B行为,A行为优先于B行为,数据最终会根据A行为更新。

但在实际应用中,存在并发问题,数据最终根据B行为更新,覆盖A行为。

首先,通过redis缓存上锁。收到a消息时,在redis中添加key,处理后删除key 。在处理过程中收到B消息,直接返回。

但在测试过程中发现不可用,可以先收到B消息,再收到A消息, 但首先更新A数据,然后更新B数据,或者覆盖。

另一种方法是通过自定义sql修改底层代码,然后比较update 。

问题分析

此外,我们还在考虑是否还有其他方法。问题的原因是A和B的消息队列基本上同时获取数据,导致程序并发操作。

如果能将B的消息队列全部延迟一个时间点,保证两个消息队列不在同一时间点获取数据,这个问题基本可以解决。

于是开始在网上搜索,找到了延迟队列DelayQueue。

虽然我们不能让公司的消息队列延迟发送,但我们可以延迟处理。收到消息时,不要先处理,放入延迟消息队列中,然后从延迟队列中获取另一个线程的数据。

类介绍

public classDelayQueue<EextendsDelayed> extendsAbstractQueue<E>    implements BlockingQueue<E>

DelayQueue 是Delayed 只有在延迟期满时,才能提取元素的无界阻塞队列。队列的头部 延迟期满后保存时间最长 Delayed 元素。如果延迟还没到期,队列就没有头了,而且 poll 将返回 null。作为一个元素 getDelay(TimeUnit.NANOSECONDS) 返回一个小于等于等于的方法 0 值时,将发生到期。即使不能使用 take 或 poll 将未到期元素移除,也不会将这些元素视为正常元素。例如,size 该方法还返回到期元素和未到期元素的计数。不允许使用这个队列 null 元素。

将DelayQueue放入Delay的对象需要实现Delayed接口。

public interfaceDelayedextendsComparable<Delayed> {    /**     * Returns the remaining delay associated with this object, in the     * given time unit.     *     * @param unit the time unit     * @return the remaining delay; zero or negative values indicate     * that the delay has already elapsed     */    longgetDelay(TimeUnit unit);}public interfaceDelayedextendsComparable<Delayed> {    /**     * Returns the remaining delay associated with this object, in the     * given time unit.     *     * @param unit the time unit     * @return the remaining delay; zero or negative values indicate     * that the delay has already elapsed     */    longgetDelay(TimeUnit unit);}

Java延时队列DelayQueue的使用_数据库

测试demo

import java.util.concurrent.DelayQueue;import java.util.concurrent.Delayed;import java.util.concurrent.TimeUnit;/** * @author lujianing01@58.com * @Description: * @date 2016/6/21 */public classDelayQueueTest{    publicstaticvoidmain(String[] args){        DelayQueue<DelayedElement> delayQueue = new DelayQueue<DelayedElement>();        //生产者        producer(delayQueue);        //消费者        consumer(delayQueue);        while (true){            try {                TimeUnit.HOURS.sleep(1);            } catch (InterruptedException e) {                e.printStackTrace();            }        }    }    /**     * 每100毫秒创建一个对象,放入延迟队列,延迟时间为1毫秒     * @param delayQueue     */    privatestaticvoidproducer(final DelayQueue<DelayedElement> delayQueue){        new Thread(new Runnable() {            @Override            publicvoidrun(){                while (true){                    try {                        TimeUnit.MILLISECONDS.sleep(100);                    } catch (InterruptedException e) {                        e.printStackTrace();                    }                    DelayedElement element = new DelayedElement(1000,"test");                    delayQueue.offer(element);                }            }        }).start();        /**         * 每秒打印延迟队列中对象的数量         */        new Thread(new Runnable() {            @Override            publicvoidrun(){                while (true){                    try {                        TimeUnit.MILLISECONDS.sleep(1000);                    } catch (InterruptedException e) {                        e.printStackTrace();                    }                    System.out.println("delayQueue size:"+delayQueue.size());                }            }        }).start();    }    /**     * 消费者,从延迟队列中获取数据,进行处理     * @param delayQueue     */    privatestaticvoidconsumer(final DelayQueue<DelayedElement> delayQueue){        new Thread(new Runnable() {            @Override            publicvoidrun(){                while (true){                    DelayedElement element = null;                    try {                        element =  delayQueue.take();                    } catch (InterruptedException e) {                        e.printStackTrace();                    }                    System.out.println(System.currentTimeMillis()+"---"+element);                }            }        }).start();    }}classDelayedElementimplementsDelayed{    private final long delay; ///延迟时间    private final long expire;  //到期时间    private final String msg;   //数据    private final long now; //创建时间    publicDelayedElement(long delay, String msg){        this.delay = delay;        this.msg = msg;        expire = System.currentTimeMillis() + delay;    //到期时间 = 当前时间+延迟时间        now = System.currentTimeMillis();    }    /**     * 接口需要实现,延迟时间   过期时间-当前时间     * @param unit     * @return     */    @Override    publiclonggetDelay(TimeUnit unit){        return unit.convert(this.expire - System.currentTimeMillis() , TimeUnit.MILLISECONDS);    }    /**     * 用于延迟队列内部的比较排序   延迟当前时间 - 比较对象的延迟时间     * @param o     * @return     */    @Override    publicintcompareTo(Delayed o){        return (int) (this.getDelay(TimeUnit.MILLISECONDS) -o.getDelay(TimeUnit.MILLISECONDS));    }    @Override    public String toString(){        final StringBuilder sb = new StringBuilder("DelayedElement{");        sb.append("delay=").append(delay);        sb.append(", expire=").append(expire);        sb.append(", msg='").append(msg).append(\'');        sb.append(", now=").append(now);        sb.append('}');        return sb.toString();    }}import java.util.concurrent.DelayQueue;import java.util.concurrent.Delayed;import java.util.concurrent.TimeUnit;/** * @author lujianing01@58.com * @Description: * @date 2016/6/21 */public classDelayQueueTest{    publicstaticvoidmain(String[] args){        DelayQueue<DelayedElement> delayQueue = new DelayQueue<DelayedElement>();        //生产者        producer(delayQueue);        //消费者        consumer(delayQueue);        while (true){            try {                TimeUnit.HOURS.sleep(1);            } catch (InterruptedException e) {                e.printStackTrace();            }        }    }    /**     * 每100毫秒创建一个对象,放入延迟队列,延迟时间为1毫秒     * @param delayQueue     */    privatestaticvoidproducer(final DelayQueue<DelayedElement> delayQueue){        new Thread(new Runnable() {            @Override            publicvoidrun(){                while (true){                    try {                        TimeUnit.MILLISECONDS.sleep(100);                    } catch (InterruptedException e) {                        e.printStackTrace();                    }                    DelayedElement element = new DelayedElement(1000,"test");                    delayQueue.offer(element);                }            }        }).start();        /**         * 每秒打印延迟队列中对象的数量         */        new Thread(new Runnable() {            @Override            publicvoidrun(){                while (true){                    try {                        TimeUnit.MILLISECONDS.sleep(1000);                    } catch (InterruptedException e) {                        e.printStackTrace();                    }                    System.out.println("delayQueue size:"+delayQueue.size());                }            }        }).start();    }    /**     * 消费者,从延迟队列中获取数据,进行处理     * @param delayQueue     */    privatestaticvoidconsumer(final DelayQueue<DelayedElement> delayQueue){        new Thread(new Runnable() {            @Override            publicvoidrun(){                while (true){                    DelayedElement element = null;                    try {                        element =  delayQueue.take();                    } catch (InterruptedException e) {                        e.printStackTrace();                    }                    System.out.println(System.currentTimeMillis()+"---"+element);                }            }        }).start();    }}classDelayedElementimplementsDelayed{    private final long delay; ///延迟时间    private final long expire;  //到期时间    private final String msg;   //数据    private final long now; //创建时间    publicDelayedElement(long delay, String msg){        this.delay = delay;        this.msg = msg;        expire = System.currentTimeMillis() + delay;    //到期时间 = 当前时间+延迟时间        now = System.currentTimeMillis();    }    /**     * 接口需要实现,延迟时间   过期时间-当前时间     * @param unit     * @return     */    @Override    publiclonggetDelay(TimeUnit unit){        return unit.convert(this.expire - System.currentTimeMillis() , TimeUnit.MILLISECONDS);    }    /**     * 用于延迟队列内部的比较排序   延迟当前时间 - 比较对象的延迟时间     * @param o     * @return     */    @Override    publicintcompareTo(Delayed o){        return (int) (this.getDelay(TimeUnit.MILLISECONDS) -o.getDelay(TimeUnit.MILLISECONDS));    }    @Override    public String toString(){        final StringBuilder sb = new StringBuilder("DelayedElement{");        sb.append("delay=").append(delay);        sb.append(", expire=").append(expire);        sb.append(", msg='").append(msg).append(\'');        sb.append(", now=").append(now);        sb.append('}');        return sb.toString();    }}

补充说明

1.参考一些网上的例子,一些 compareto方法是错误的, 要么在队列中造成数据积压,要么不能延迟。因此,我们必须通过自己的用例测试来确保没有问题。

2.需要考虑楼主的使用场景。如果过程关闭,则在完成过程之前,应等待当地延迟队列中的数据处理。

上一篇 探讨生产环境下缓存雪崩的几种场景及解决方案
下一篇 最长回文子串 ( manacher算法 ) HDU3068

文章素材均来源于网络,如有侵权,请联系管理员删除。