单线程消费
package com.itplh.queue;

import lombok.Data;
import lombok.extern.slf4j.Slf4j;

import java.time.LocalDateTime;
import java.time.ZoneOffset;
import java.util.concurrent.DelayQueue;
import java.util.concurrent.Delayed;
import java.util.concurrent.TimeUnit;

/**
 * Single-threaded demo: tasks placed in a {@link DelayQueue} are consumed in due-time order,
 * and a task is never handed out before its due time.
 *
 * @author: tanpeng
 * @since: 2020-06-08 15:55
 */
@Slf4j
public class TestDelayQueue {

    /** How long to idle when the head of the queue is not due yet. */
    private static final long IDLE_SLEEP_SECONDS = 2L;

    /**
     * Enqueues one task that is already overdue, one due now and one due in ten seconds,
     * then drains the queue, logging each task as it becomes due.
     *
     * @param args unused
     */
    public static void main(String[] args) {
        // All due times are epoch milliseconds, the unit MyTask#getDelay works in.
        long now = System.currentTimeMillis();
        long ago = now - 10_000L;
        long future = now + 10_000L;

        DelayQueue<MyTask> delayQueue = new DelayQueue<>();
        delayQueue.put(new MyTask("task1-now", now));
        delayQueue.put(new MyTask("task2-ago", ago));
        delayQueue.put(new MyTask("task3-future", future));

        while (!delayQueue.isEmpty()) {
            // poll() returns the head only once its delay has expired, otherwise null,
            // so no separate wall-clock comparison is needed here.
            MyTask dueTask = delayQueue.poll();
            if (dueTask != null) {
                log.info("消费了:{}", dueTask);
                continue;
            }

            // Head task is not due yet: wait a little and try again.
            try {
                log.info("...");
                TimeUnit.SECONDS.sleep(IDLE_SLEEP_SECONDS);
            } catch (InterruptedException e) {
                // Keep the interrupt visible to callers and stop consuming.
                Thread.currentThread().interrupt();
                return;
            }
        }
    }
}

/**
 * A task that becomes available from a {@link DelayQueue} once its due time has passed.
 */
@Data
class MyTask implements Delayed {

    private String taskId;

    /** Due time in epoch milliseconds (same clock as {@link System#currentTimeMillis()}). */
    private long time;

    /**
     * @param taskId identifier shown when the task is logged
     * @param time   due time in epoch milliseconds
     */
    public MyTask(String taskId, long time) {
        this.taskId = taskId;
        this.time = time;
    }

    /**
     * Returns the time remaining until this task is due, expressed in {@code unit}.
     * The result is zero or negative once the task is due.
     */
    @Override
    public long getDelay(TimeUnit unit) {
        return unit.convert(time - System.currentTimeMillis(), TimeUnit.MILLISECONDS);
    }

    /**
     * Orders tasks by due time, earliest first; tasks with the same due time compare as equal.
     */
    @Override
    public int compareTo(Delayed o) {
        if (o == this) {
            return 0;
        }
        if (o instanceof MyTask) {
            // Comparing the stored due times avoids reading the clock twice.
            return Long.compare(this.time, ((MyTask) o).time);
        }
        return Long.compare(getDelay(TimeUnit.MILLISECONDS), o.getDelay(TimeUnit.MILLISECONDS));
    }
}
控制台输出
12:44:31.700 [main] INFO com.itplh.queue.TestDelayQueue - 消费了:MyTask(taskId=task2-ago, time=1592282661)
12:44:31.704 [main] INFO com.itplh.queue.TestDelayQueue - 消费了:MyTask(taskId=task1-now, time=1592282671)
12:44:31.704 [main] INFO com.itplh.queue.TestDelayQueue - ...
12:44:33.704 [main] INFO com.itplh.queue.TestDelayQueue - ...
12:44:35.704 [main] INFO com.itplh.queue.TestDelayQueue - ...
12:44:37.704 [main] INFO com.itplh.queue.TestDelayQueue - ...
12:44:39.704 [main] INFO com.itplh.queue.TestDelayQueue - ...
12:44:41.704 [main] INFO com.itplh.queue.TestDelayQueue - 消费了:MyTask(taskId=task3-future, time=1592282681)
从上面的 demo 可得出结论:
- 消费任务按时间顺序进行。
- 时间不到的任务不会提前消费,只有到了任务的时间点才会消费。
package com.itplh.queue;

import lombok.Data;
import lombok.extern.slf4j.Slf4j;
import org.apache.commons.lang3.RandomStringUtils;

import java.time.Instant;
import java.time.LocalDateTime;
import java.time.ZoneOffset;
import java.util.Date;
import java.util.concurrent.*;
import java.util.concurrent.atomic.LongAdder;

/**
 * Multi-threaded demo: several workers drain one shared {@link DelayQueue}, each task being
 * consumed exactly once and only after its due time.
 *
 * @author: tanpeng
 * @since: 2020-06-16 11:05
 */
@Slf4j
public class TestDelayQueue {

    /** Number of tasks enqueued, and therefore the number of consumptions to wait for. */
    private static final int TOTAL = 1000;

    /** Number of worker threads draining the queue. */
    private static final int WORKER_COUNT = 5;

    /** How long a worker idles when the head of the queue is not due yet. */
    private static final long IDLE_SLEEP_SECONDS = 2L;

    private static final DelayQueue<MyTask> delayQueue = new DelayQueue<>();

    /** Reaches zero once every enqueued task has been consumed. */
    private static final CountDownLatch countDownLatch = new CountDownLatch(TOTAL);

    /** Counts consumed tasks across all workers. */
    private static final LongAdder longAdder = new LongAdder();

    /**
     * Enqueues {@code TOTAL} tasks due at random offsets of 0-9999 ms from now, starts the
     * workers, waits until every task has been consumed, then logs the consumed count.
     *
     * @param args unused
     * @throws InterruptedException if interrupted while waiting for the workers to finish
     */
    public static void main(String[] args) throws InterruptedException {
        long now = System.currentTimeMillis();
        for (int i = 1; i <= TOTAL; i++) {
            // randomNumeric(4) yields four decimal digits, i.e. an offset of 0..9999 ms.
            long time = now + Long.parseLong(RandomStringUtils.randomNumeric(4));
            delayQueue.put(new MyTask("task" + i, time));
        }

        ExecutorService executorService = Executors.newFixedThreadPool(WORKER_COUNT);
        try {
            for (int i = 0; i < WORKER_COUNT; i++) {
                executorService.submit(() -> consumer(delayQueue));
            }
            countDownLatch.await();
        } finally {
            // Always release the pool threads, even if await() is interrupted.
            executorService.shutdown();
        }
        log.info("{}", longAdder.sum());
    }

    /**
     * Worker loop: repeatedly takes due tasks from {@code delayQueue} until it is empty.
     *
     * <p>No external lock is used: {@link DelayQueue#poll()} atomically removes and returns the
     * head only if its delay has expired, and returns {@code null} otherwise.
     * NOTE(review): {@code MyTask} is declared outside this listing; this relies on its
     * {@code getDelay} being non-positive once {@code time} (epoch milliseconds) has passed.
     *
     * @param delayQueue the shared queue to drain
     */
    private static void consumer(DelayQueue<MyTask> delayQueue) {
        while (!delayQueue.isEmpty()) {
            MyTask task = delayQueue.poll();
            if (task != null) {
                log.info(" 消费了:{}", task);
                // Count first, then release the latch, so the total read after await()
                // already includes this task.
                longAdder.increment();
                countDownLatch.countDown();
                continue;
            }

            // Nothing is due yet (or another worker took the head): idle, then retry.
            try {
                log.info("...");
                TimeUnit.SECONDS.sleep(IDLE_SLEEP_SECONDS);
            } catch (InterruptedException e) {
                // Keep the interrupt visible and let this worker exit.
                Thread.currentThread().interrupt();
                return;
            }
        }
    }
}
控制台输出
12:20:09.145 [pool-1-thread-1] INFO com.itplh.queue.TestDelayQueue - 消费了:MyTask(taskId=task207, time=1592281209106)12:20:09.148 [pool-1-thread-1] INFO com.itplh.queue.TestDelayQueue - 消费了:MyTask(taskId=task929, time=1592281209147)12:20:09.148 [pool-1-thread-1] INFO com.itplh.queue.TestDelayQueue - ...12:20:09.148 [pool-1-thread-5] INFO com.itplh.queue.TestDelayQueue - ...12:20:09.148 [pool-1-thread-4] INFO com.itplh.queue.TestDelayQueue - ...12:20:09.148 [pool-1-thread-3] INFO com.itplh.queue.TestDelayQueue - ...12:20:09.148 [pool-1-thread-2] INFO com.itplh.queue.TestDelayQueue - ...12:20:11.148 [pool-1-thread-3] INFO com.itplh.queue.TestDelayQueue - 消费了:MyTask(taskId=task294, time=1592281209148)12:20:11.148 [pool-1-thread-3] INFO com.itplh.queue.TestDelayQueue - 消费了:MyTask(taskId=task944, time=1592281209187)12:20:11.150 [pool-1-thread-4] INFO com.itplh.queue.TestDelayQueue - 消费了:MyTask(taskId=task350, time=1592281209189)12:20:11.150 [pool-1-thread-4] INFO com.itplh.queue.TestDelayQueue - 消费了:MyTask(taskId=task919, time=1592281209190)12:20:11.160 [pool-1-thread-4] INFO com.itplh.queue.TestDelayQueue - ...12:20:11.160 [pool-1-thread-2] INFO com.itplh.queue.TestDelayQueue - ...12:20:11.160 [pool-1-thread-5] INFO com.itplh.queue.TestDelayQueue - ...12:20:11.160 [pool-1-thread-1] INFO com.itplh.queue.TestDelayQueue - ...12:20:11.160 [pool-1-thread-3] INFO com.itplh.queue.TestDelayQueue - ...12:20:13.160 [pool-1-thread-3] INFO com.itplh.queue.TestDelayQueue - 消费了:MyTask(taskId=task809, time=1592281211162)12:20:13.160 [pool-1-thread-3] INFO com.itplh.queue.TestDelayQueue - 消费了:MyTask(taskId=task7, time=1592281211162)12:20:13.167 [pool-1-thread-5] INFO com.itplh.queue.TestDelayQueue - 消费了:MyTask(taskId=task357, time=1592281213163)12:20:13.167 [pool-1-thread-5] INFO com.itplh.queue.TestDelayQueue - ...12:20:13.167 [pool-1-thread-4] INFO com.itplh.queue.TestDelayQueue - ...12:20:13.167 [pool-1-thread-2] INFO com.itplh.queue.TestDelayQueue - ...12:20:13.167 [pool-1-thread-1] 
INFO com.itplh.queue.TestDelayQueue - ...12:20:13.167 [pool-1-thread-3] INFO com.itplh.queue.TestDelayQueue - ...12:20:15.167 [pool-1-thread-1] INFO com.itplh.queue.TestDelayQueue - 消费了:MyTask(taskId=task247, time=1592281213169)12:20:15.167 [pool-1-thread-1] INFO com.itplh.queue.TestDelayQueue - 消费了:MyTask(taskId=task505, time=1592281213176)12:20:15.167 [pool-1-thread-1] INFO com.itplh.queue.TestDelayQueue - 消费了:MyTask(taskId=task548, time=1592281213185)12:20:15.172 [pool-1-thread-4] INFO com.itplh.queue.TestDelayQueue - 消费了:MyTask(taskId=task519, time=1592281215151)12:20:15.172 [pool-1-thread-4] INFO com.itplh.queue.TestDelayQueue - ...12:20:15.172 [pool-1-thread-2] INFO com.itplh.queue.TestDelayQueue - ...12:20:15.172 [pool-1-thread-3] INFO com.itplh.queue.TestDelayQueue - ...12:20:15.172 [pool-1-thread-5] INFO com.itplh.queue.TestDelayQueue - ...12:20:15.173 [pool-1-thread-1] INFO com.itplh.queue.TestDelayQueue - ...12:20:17.172 [pool-1-thread-4] INFO com.itplh.queue.TestDelayQueue - 消费了:MyTask(taskId=task596, time=1592281215173)12:20:17.172 [pool-1-thread-4] INFO com.itplh.queue.TestDelayQueue - 消费了:MyTask(taskId=task949, time=1592281215191)12:20:17.176 [pool-1-thread-5] INFO com.itplh.queue.TestDelayQueue - 消费了:MyTask(taskId=task577, time=1592281217171)12:20:17.176 [pool-1-thread-5] INFO com.itplh.queue.TestDelayQueue - ...12:20:17.176 [pool-1-thread-2] INFO com.itplh.queue.TestDelayQueue - ...12:20:17.176 [pool-1-thread-3] INFO com.itplh.queue.TestDelayQueue - ...12:20:17.176 [pool-1-thread-1] INFO com.itplh.queue.TestDelayQueue - ...12:20:17.176 [pool-1-thread-4] INFO com.itplh.queue.TestDelayQueue - ...12:20:19.178 [pool-1-thread-5] INFO com.itplh.queue.TestDelayQueue - 消费了:MyTask(taskId=task690, time=1592281219072)12:20:19.178 [pool-1-thread-5] INFO com.itplh.queue.TestDelayQueue - 消费了:MyTask(taskId=task703, time=1592281219083)12:20:19.179 [main] INFO com.itplh.queue.TestDelayQueue - 1000