当前位置: 首页 > 图灵资讯 > 技术篇> 【JUC】DelayQueue了解一下

【JUC】DelayQueue了解一下

来源:图灵教育
时间:2023-06-06 09:34:08

单线程消费

package com.itplh.queue;import lombok.Data;import lombok.extern.slf4j.Slf4j;import java.time.LocalDateTime;import java.time.ZoneOffset;import java.util.concurrent.DelayQueue;import java.util.concurrent.Delayed;import java.util.concurrent.TimeUnit;/** * 使用延时队列DelayQueue,按时间顺序消费任务 * * @author: tanpeng * @since: 2020-06-08 15:55 */ @Slf4jpublic class TestDelayQueue {    public static void main(String[] args) {        // 现在的时间        long now = LocalDateTime.now().toEpochSecond(ZoneOffset.of(+8);        // 过去的时间        long ago = now - 10L;        // 未来的时间        long future = now + 10L;        DelayQueue<MyTask> delayQueue = new DelayQueue();        delayQueue.put(new MyTask(task1-now", now));        delayQueue.put(new MyTask(task2-ago", ago));        delayQueue.put(new MyTask(task3-future", future));        while (true) {            // 检查队伍的首要元素            MyTask checkTask = delayQueue.peek();            if (checkTask == null) {                break;            }            // 到了执行任务的时间,然后消耗这个任务            LocalDateTime taskExecutorTime = LocalDateTime.ofEpochSecond(checkTask.getTime(), 0, ZoneOffset.of(+8);            if (LocalDateTime.now().isAfter(taskExecutorTime)) {                log.info(消费:{} delayQueue.poll());                continue;            }            // 这里的执行说明了未到任务的执行时间,sleep2秒后继续消费任务            try {                log.info("...");                TimeUnit.SECONDS.sleep(2);            } catch (InterruptedException e) {                e.printStackTrace();            }        }    }    }@Dataclass MyTask implements Delayed {    private String taskId;    private long time;    public MyTask(String taskId, long time) {        this.taskId = taskId;        this.time = time;    }    @Override    public long getDelay(TimeUnit unit) {        return time - System.currentTimeMillis();    }    @Override    public int compareTo(Delayed o) {        MyTask task = (MyTask) o;        long diff = this.time - task.getTime();        if (diff <= 0) {            return -1;        }        return 1;    }}

控制台输出

12:44:31.700 [main] INFO com.itplh.queue.TestDelayQueue - 消费了:MyTask(taskId=task2-ago, time=1592282661)12:44:31.704 [main] INFO com.itplh.queue.TestDelayQueue - 消费了:MyTask(taskId=task1-now, time=1592282671)12:44:31.704 [main] INFO com.itplh.queue.TestDelayQueue - ...12:44:33.704 [main] INFO com.itplh.queue.TestDelayQueue - ...12:44:35.704 [main] INFO com.itplh.queue.TestDelayQueue - ...12:44:37.704 [main] INFO com.itplh.queue.TestDelayQueue - ...12:44:39.704 [main] INFO com.itplh.queue.TestDelayQueue - ...12:44:41.704 [main] INFO com.itplh.queue.TestDelayQueue - 消费了:MyTask(taskId=task3-future, time=1592282681)

从上面的demo可得出结论:

  • 消费任务按时间顺序进行。
  • 时间不到的任务不会提前消费,只有到了任务的时间点才会消费。
多线程消费
package com.itplh.queue;import lombok.Data;import lombok.extern.slf4j.Slf4j;import org.apache.commons.lang3.RandomStringUtils;import java.time.Instant;import java.time.LocalDateTime;import java.time.ZoneOffset;import java.util.Date;import java.util.concurrent.*;import java.util.concurrent.atomic.LongAdder;/** * 使用延时队列DelayQueue,按时间顺序消费任务 * 多线程版本 * @author: tanpeng * @since: 2020-06-16 11:05 */@Slf4jpublic class TestDelayQueue {    private static int total = 1000;    private static DelayQueue<MyTask> delayQueue = new DelayQueue();    private static CountDownLatch countDownLatch = new CountDownLatch(total);    private static LongAdder longAdder = new LongAdder();    public static void main(String[] args) throws InterruptedException {// 将元素放入队列中        long now = System.currentTimeMillis();        for (int i = 1; i <= total; i++) {            long time = now + Long.valueOf(RandomStringUtils.randomNumeric(4));            delayQueue.put(new MyTask("task" + i, time));        }        ExecutorService executorService = Executors.newFixedThreadPool(5);        for (int i = 0; i < 5; i++) {            executorService.submit(() -> consumer(delayQueue));        }        countDownLatch.await();        executorService.shutdown();        log.info({}, longAdder.sum());    }    private static void consumer(DelayQueue<MyTask> delayQueue) {        while (true) {            // 保证原子性,检查元素与消费元素相同            synchronized ("") {                // 检查队伍的首要元素                MyTask checkTask = delayQueue.peek();                if (checkTask == null) {                    break;                }                // 到了执行任务的时间,然后消耗这个任务                Instant instant = new Date(checkTask.getTime()).toInstant();                LocalDateTime taskExecutorTime = LocalDateTime.ofInstant(instant, ZoneOffset.of(+8);                if (LocalDateTime.now().isAfter(taskExecutorTime)) {                    MyTask task = delayQueue.poll();                    log.info(" 消费了:" + task);                    countDownLatch.countDown();                    longAdder.add(1);                    continue;                }            }            // 这里的执行说明了未到任务的执行时间,sleep2秒后继续消费任务            try {                log.info("...");                TimeUnit.SECONDS.sleep(2);            } catch (InterruptedException e) {                e.printStackTrace();            }        }    }}

控制台输出

12:20:09.145 [pool-1-thread-1] INFO com.itplh.queue.TestDelayQueue -  消费了:MyTask(taskId=task207, time=1592281209106)12:20:09.148 [pool-1-thread-1] INFO com.itplh.queue.TestDelayQueue -  消费了:MyTask(taskId=task929, time=1592281209147)12:20:09.148 [pool-1-thread-1] INFO com.itplh.queue.TestDelayQueue -  ...12:20:09.148 [pool-1-thread-5] INFO com.itplh.queue.TestDelayQueue -  ...12:20:09.148 [pool-1-thread-4] INFO com.itplh.queue.TestDelayQueue -  ...12:20:09.148 [pool-1-thread-3] INFO com.itplh.queue.TestDelayQueue -  ...12:20:09.148 [pool-1-thread-2] INFO com.itplh.queue.TestDelayQueue -  ...12:20:11.148 [pool-1-thread-3] INFO com.itplh.queue.TestDelayQueue -  消费了:MyTask(taskId=task294, time=1592281209148)12:20:11.148 [pool-1-thread-3] INFO com.itplh.queue.TestDelayQueue -  消费了:MyTask(taskId=task944, time=1592281209187)12:20:11.150 [pool-1-thread-4] INFO com.itplh.queue.TestDelayQueue -  消费了:MyTask(taskId=task350, time=1592281209189)12:20:11.150 [pool-1-thread-4] INFO com.itplh.queue.TestDelayQueue -  消费了:MyTask(taskId=task919, time=1592281209190)12:20:11.160 [pool-1-thread-4] INFO com.itplh.queue.TestDelayQueue -  ...12:20:11.160 [pool-1-thread-2] INFO com.itplh.queue.TestDelayQueue -  ...12:20:11.160 [pool-1-thread-5] INFO com.itplh.queue.TestDelayQueue -  ...12:20:11.160 [pool-1-thread-1] INFO com.itplh.queue.TestDelayQueue -  ...12:20:11.160 [pool-1-thread-3] INFO com.itplh.queue.TestDelayQueue -  ...12:20:13.160 [pool-1-thread-3] INFO com.itplh.queue.TestDelayQueue -  消费了:MyTask(taskId=task809, time=1592281211162)12:20:13.160 [pool-1-thread-3] INFO com.itplh.queue.TestDelayQueue -  消费了:MyTask(taskId=task7, time=1592281211162)12:20:13.167 [pool-1-thread-5] INFO com.itplh.queue.TestDelayQueue -  消费了:MyTask(taskId=task357, time=1592281213163)12:20:13.167 [pool-1-thread-5] INFO com.itplh.queue.TestDelayQueue -  ...12:20:13.167 [pool-1-thread-4] INFO com.itplh.queue.TestDelayQueue -  ...12:20:13.167 [pool-1-thread-2] INFO com.itplh.queue.TestDelayQueue -  ...12:20:13.167 [pool-1-thread-1] INFO com.itplh.queue.TestDelayQueue -  ...12:20:13.167 [pool-1-thread-3] INFO com.itplh.queue.TestDelayQueue -  ...12:20:15.167 [pool-1-thread-1] INFO com.itplh.queue.TestDelayQueue -  消费了:MyTask(taskId=task247, time=1592281213169)12:20:15.167 [pool-1-thread-1] INFO com.itplh.queue.TestDelayQueue -  消费了:MyTask(taskId=task505, time=1592281213176)12:20:15.167 [pool-1-thread-1] INFO com.itplh.queue.TestDelayQueue -  消费了:MyTask(taskId=task548, time=1592281213185)12:20:15.172 [pool-1-thread-4] INFO com.itplh.queue.TestDelayQueue -  消费了:MyTask(taskId=task519, time=1592281215151)12:20:15.172 [pool-1-thread-4] INFO com.itplh.queue.TestDelayQueue -  ...12:20:15.172 [pool-1-thread-2] INFO com.itplh.queue.TestDelayQueue -  ...12:20:15.172 [pool-1-thread-3] INFO com.itplh.queue.TestDelayQueue -  ...12:20:15.172 [pool-1-thread-5] INFO com.itplh.queue.TestDelayQueue -  ...12:20:15.173 [pool-1-thread-1] INFO com.itplh.queue.TestDelayQueue -  ...12:20:17.172 [pool-1-thread-4] INFO com.itplh.queue.TestDelayQueue -  消费了:MyTask(taskId=task596, time=1592281215173)12:20:17.172 [pool-1-thread-4] INFO com.itplh.queue.TestDelayQueue -  消费了:MyTask(taskId=task949, time=1592281215191)12:20:17.176 [pool-1-thread-5] INFO com.itplh.queue.TestDelayQueue -  消费了:MyTask(taskId=task577, time=1592281217171)12:20:17.176 [pool-1-thread-5] INFO com.itplh.queue.TestDelayQueue -  ...12:20:17.176 [pool-1-thread-2] INFO com.itplh.queue.TestDelayQueue -  ...12:20:17.176 [pool-1-thread-3] INFO com.itplh.queue.TestDelayQueue -  ...12:20:17.176 [pool-1-thread-1] INFO com.itplh.queue.TestDelayQueue -  ...12:20:17.176 [pool-1-thread-4] INFO com.itplh.queue.TestDelayQueue -  ...12:20:19.178 [pool-1-thread-5] INFO com.itplh.queue.TestDelayQueue -  消费了:MyTask(taskId=task690, time=1592281219072)12:20:19.178 [pool-1-thread-5] INFO com.itplh.queue.TestDelayQueue -  消费了:MyTask(taskId=task703, time=1592281219083)12:20:19.179 [main] INFO com.itplh.queue.TestDelayQueue - 1000