摘要: DelayQueue的使用场景及介绍
问题背景
在最近的一个业务中,遇到了一个问题,一个用户动作,会产生A和B两种行为,分别通过相应的esb消息总线发送。
我们的业务线监控AB的两个esb消息队列(两个过程),同步更新数据。
在正常过程中,A行为先于B行为,A行为优先于B行为,数据最终会根据A行为更新。
但在实际应用中,存在并发问题,数据最终根据B行为更新,覆盖A行为。
首先,通过redis缓存上锁。收到a消息时,在redis中添加key,处理后删除key 。在处理过程中收到B消息,直接返回。
但在测试过程中发现不可用,可以先收到B消息,再收到A消息, 但首先更新A数据,然后更新B数据,或者覆盖。
另一种方法是通过自定义sql修改底层代码,然后比较update 。
问题分析
此外,我们还在考虑是否还有其他方法。问题的原因是A和B的消息队列基本上同时获取数据,导致程序并发操作。
如果能将B的消息队列全部延迟一个时间点,保证两个消息队列不在同一时间点获取数据,这个问题基本可以解决。
于是开始在网上搜索,找到了延迟队列DelayQueue。
虽然我们不能让公司的消息队列延迟发送,但我们可以延迟处理。收到消息时,不要先处理,放入延迟消息队列中,然后从延迟队列中获取另一个线程的数据。
类介绍
public classDelayQueue<EextendsDelayed> extendsAbstractQueue<E> implements BlockingQueue<E>
DelayQueue 是Delayed 只有在延迟期满时,才能提取元素的无界阻塞队列。队列的头部 延迟期满后保存时间最长 Delayed 元素。如果延迟还没到期,队列就没有头了,而且 poll 将返回 null。作为一个元素 getDelay(TimeUnit.NANOSECONDS) 返回一个小于等于等于的方法 0 值时,将发生到期。即使不能使用 take 或 poll 将未到期元素移除,也不会将这些元素视为正常元素。例如,size 该方法还返回到期元素和未到期元素的计数。不允许使用这个队列 null 元素。
将DelayQueue放入Delay的对象需要实现Delayed接口。
public interfaceDelayedextendsComparable<Delayed> { /** * Returns the remaining delay associated with this object, in the * given time unit. * * @param unit the time unit * @return the remaining delay; zero or negative values indicate * that the delay has already elapsed */ longgetDelay(TimeUnit unit);}public interfaceDelayedextendsComparable<Delayed> { /** * Returns the remaining delay associated with this object, in the * given time unit. * * @param unit the time unit * @return the remaining delay; zero or negative values indicate * that the delay has already elapsed */ longgetDelay(TimeUnit unit);}
测试demo
import java.util.concurrent.DelayQueue;import java.util.concurrent.Delayed;import java.util.concurrent.TimeUnit;/** * @author lujianing01@58.com * @Description: * @date 2016/6/21 */public classDelayQueueTest{ publicstaticvoidmain(String[] args){ DelayQueue<DelayedElement> delayQueue = new DelayQueue<DelayedElement>(); //生产者 producer(delayQueue); //消费者 consumer(delayQueue); while (true){ try { TimeUnit.HOURS.sleep(1); } catch (InterruptedException e) { e.printStackTrace(); } } } /** * 每100毫秒创建一个对象,放入延迟队列,延迟时间为1毫秒 * @param delayQueue */ privatestaticvoidproducer(final DelayQueue<DelayedElement> delayQueue){ new Thread(new Runnable() { @Override publicvoidrun(){ while (true){ try { TimeUnit.MILLISECONDS.sleep(100); } catch (InterruptedException e) { e.printStackTrace(); } DelayedElement element = new DelayedElement(1000,"test"); delayQueue.offer(element); } } }).start(); /** * 每秒打印延迟队列中对象的数量 */ new Thread(new Runnable() { @Override publicvoidrun(){ while (true){ try { TimeUnit.MILLISECONDS.sleep(1000); } catch (InterruptedException e) { e.printStackTrace(); } System.out.println("delayQueue size:"+delayQueue.size()); } } }).start(); } /** * 消费者,从延迟队列中获取数据,进行处理 * @param delayQueue */ privatestaticvoidconsumer(final DelayQueue<DelayedElement> delayQueue){ new Thread(new Runnable() { @Override publicvoidrun(){ while (true){ DelayedElement element = null; try { element = delayQueue.take(); } catch (InterruptedException e) { e.printStackTrace(); } System.out.println(System.currentTimeMillis()+"---"+element); } } }).start(); }}classDelayedElementimplementsDelayed{ private final long delay; ///延迟时间 private final long expire; //到期时间 private final String msg; //数据 private final long now; //创建时间 publicDelayedElement(long delay, String msg){ this.delay = delay; this.msg = msg; expire = System.currentTimeMillis() + delay; //到期时间 = 当前时间+延迟时间 now = System.currentTimeMillis(); } /** * 接口需要实现,延迟时间 过期时间-当前时间 * @param unit * @return */ @Override publiclonggetDelay(TimeUnit unit){ return unit.convert(this.expire - System.currentTimeMillis() , TimeUnit.MILLISECONDS); } /** * 用于延迟队列内部的比较排序 延迟当前时间 - 比较对象的延迟时间 * @param o * @return */ @Override publicintcompareTo(Delayed o){ return (int) (this.getDelay(TimeUnit.MILLISECONDS) -o.getDelay(TimeUnit.MILLISECONDS)); } @Override public String toString(){ final StringBuilder sb = new StringBuilder("DelayedElement{"); sb.append("delay=").append(delay); sb.append(", expire=").append(expire); sb.append(", msg='").append(msg).append(\''); sb.append(", now=").append(now); sb.append('}'); return sb.toString(); }}import java.util.concurrent.DelayQueue;import java.util.concurrent.Delayed;import java.util.concurrent.TimeUnit;/** * @author lujianing01@58.com * @Description: * @date 2016/6/21 */public classDelayQueueTest{ publicstaticvoidmain(String[] args){ DelayQueue<DelayedElement> delayQueue = new DelayQueue<DelayedElement>(); //生产者 producer(delayQueue); //消费者 consumer(delayQueue); while (true){ try { TimeUnit.HOURS.sleep(1); } catch (InterruptedException e) { e.printStackTrace(); } } } /** * 每100毫秒创建一个对象,放入延迟队列,延迟时间为1毫秒 * @param delayQueue */ privatestaticvoidproducer(final DelayQueue<DelayedElement> delayQueue){ new Thread(new Runnable() { @Override publicvoidrun(){ while (true){ try { TimeUnit.MILLISECONDS.sleep(100); } catch (InterruptedException e) { e.printStackTrace(); } DelayedElement element = new DelayedElement(1000,"test"); delayQueue.offer(element); } } }).start(); /** * 每秒打印延迟队列中对象的数量 */ new Thread(new Runnable() { @Override publicvoidrun(){ while (true){ try { TimeUnit.MILLISECONDS.sleep(1000); } catch (InterruptedException e) { e.printStackTrace(); } System.out.println("delayQueue size:"+delayQueue.size()); } } }).start(); } /** * 消费者,从延迟队列中获取数据,进行处理 * @param delayQueue */ privatestaticvoidconsumer(final DelayQueue<DelayedElement> delayQueue){ new Thread(new Runnable() { @Override publicvoidrun(){ while (true){ DelayedElement element = null; try { element = delayQueue.take(); } catch (InterruptedException e) { e.printStackTrace(); } System.out.println(System.currentTimeMillis()+"---"+element); } } }).start(); }}classDelayedElementimplementsDelayed{ private final long delay; ///延迟时间 private final long expire; //到期时间 private final String msg; //数据 private final long now; //创建时间 publicDelayedElement(long delay, String msg){ this.delay = delay; this.msg = msg; expire = System.currentTimeMillis() + delay; //到期时间 = 当前时间+延迟时间 now = System.currentTimeMillis(); } /** * 接口需要实现,延迟时间 过期时间-当前时间 * @param unit * @return */ @Override publiclonggetDelay(TimeUnit unit){ return unit.convert(this.expire - System.currentTimeMillis() , TimeUnit.MILLISECONDS); } /** * 用于延迟队列内部的比较排序 延迟当前时间 - 比较对象的延迟时间 * @param o * @return */ @Override publicintcompareTo(Delayed o){ return (int) (this.getDelay(TimeUnit.MILLISECONDS) -o.getDelay(TimeUnit.MILLISECONDS)); } @Override public String toString(){ final StringBuilder sb = new StringBuilder("DelayedElement{"); sb.append("delay=").append(delay); sb.append(", expire=").append(expire); sb.append(", msg='").append(msg).append(\''); sb.append(", now=").append(now); sb.append('}'); return sb.toString(); }}
补充说明
1.参考一些网上的例子,一些 compareto方法是错误的, 要么在队列中造成数据积压,要么不能延迟。因此,我们必须通过自己的用例测试来确保没有问题。
2.需要考虑楼主的使用场景。如果过程关闭,则在完成过程之前,应等待当地延迟队列中的数据处理。