前言

本文隶属于专栏《100个问题搞定Java并发》,该专栏为笔者原创,引用请注明来源,不足和错误之处请在评论区帮忙指出,谢谢!

本专栏目录结构和参考文献请见100个问题搞定Java并发

正文

WHAT

DelayQueue 是一个无界阻塞队列( BlockingQueue ),用于放置实现了 Delayed 接口的对象,其中的对象只能在其到期时才能从队列中取走。

关于 BlockingQueue 请参考我的博客——BlockingQueue 源码解析(JDK8)

这种队列是有序的,因此队首对象的延迟到期的时间最长。

如果没有任何延迟到期,那么就不会有队首元素,并且 poll() 将返回 null (正因为这样,你不能将 null 放置到这种队列中)。

实践

下面是一个示例,其中的 Delayed 对象自身就是任务,而 DelayedTaskConsumer 将最“紧急”的任务(到期时间最长的任务)从队列中取出,然后运行它。

注意的是这样的 DelayQueue 就成为了优先级队列的一种变体。

package com.shockang.study.java.concurrent.blocking_queue;

import java.util.ArrayList;
import java.util.List;
import java.util.Random;
import java.util.concurrent.DelayQueue;
import java.util.concurrent.Delayed;
import java.util.concurrent.TimeUnit;
import java.util.stream.Collectors;
import java.util.stream.Stream;

import static java.util.concurrent.TimeUnit.MILLISECONDS;
import static java.util.concurrent.TimeUnit.NANOSECONDS;

class DelayedTask implements Runnable, Delayed {
	private static int counter = 0;
	private final int id = counter++;
	private final int delta;
	private final long trigger;
	protected static List<DelayedTask> sequence = new ArrayList<>();

	DelayedTask(int delayInMilliseconds) {
		delta = delayInMilliseconds;
		trigger = System.nanoTime() + NANOSECONDS.convert(delta, MILLISECONDS);
		sequence.add(this);
	}

	@Override
	public long getDelay(TimeUnit unit) {
		return unit.convert(trigger - System.nanoTime(), NANOSECONDS);
	}

	@Override
	public int compareTo(Delayed arg) {
		DelayedTask that = (DelayedTask) arg;
		if (trigger < that.trigger) return -1;
		if (trigger > that.trigger) return 1;
		return 0;
	}

	@Override
	public void run() {
		System.out.print(this + " ");
	}

	@Override
	public String toString() {
		return String.format("[%d] Task %d", delta, id);
	}

	public String summary() {
		return String.format("(%d:%d)", id, delta);
	}

	public static class EndTask extends DelayedTask {
		EndTask(int delay) {
			super(delay);
		}

		@Override
		public void run() {
			sequence.forEach(dt -> System.out.println(dt.summary()));
		}
	}
}

public class DelayQueueDemo {
	public static void
	main(String[] args) throws Exception {
		DelayQueue<DelayedTask> tasks = Stream.concat(
						new Random(47).ints(20, 0, 4000).mapToObj(DelayedTask::new),
						Stream.of(new DelayedTask.EndTask(4000)))
				.collect(Collectors.toCollection(DelayQueue::new));
		while (tasks.size() > 0)
			tasks.take().run();
	}
}

[128] Task 12 [429] Task 6 [551] Task 13 [555] Task 2 [693] Task 3 [809] Task 15 [961] Task 5 [1258] Task 1 [1258] Task 20 [1520] Task 19 [1861] Task 4 [1998] Task 17 [2200] Task 8 [2207] Task 10 [2288] Task 11 [2522] Task 9 [2589] Task 14 [2861] Task 18 [2868] Task 7 [3278] Task 16 (0:4000)
(1:1258)
(2:555)
(3:693)
(4:1861)
(5:961)
(6:429)
(7:2868)
(8:2200)
(9:2522)
(10:2207)
(11:2288)
(12:128)
(13:551)
(14:2589)
(15:809)
(16:3278)
(17:1998)
(18:2861)
(19:1520)
(20:1258)

DelayedTask 包含一个称为 sequence 的 List< DelayedTask>,它保存了任务被创建的顺序,因此我们可以看到排序是按照实际发生的顺序执行的

Delay 接口有一个方法, getDelay(),该方法用来告知延迟到期有多长时间,或者延迟在多长时间之前已经到期了。

这个方法强制我们去使用 Timeunit 类,因为这就是参数类型。

这会产生一个非常方便的类,因为你可以很容易地转换单位而无需作任何声明。

例如, delta 的值是以毫秒为单位存储的,但是 System.nanotime ()产生的时间则是以纳秒为单位的。

你可以转换 delta 的值,方法是声明它的单位以及你希望以什么单位来表示,就像下面这样。

NANOSECONDS.convert(delta, MILLISECONDS);

在 getDelay()中,所希望的单位是作为 unit 参数传递进来的,你使用它将当前时间与触发时间之间的差转换为调用者要求的单位,而无需知道这些单位是什么(这是策略设计模式的一个简单示例,在这种模式中,算法的一部分是作为参数传递进来的)。

为了排序, Delayed 接口还继承了 Comparable 接口,因此必须实现 compareTo(),使其可以产生合理的比较。

从输出中可以看到,任务创建的顺序对执行顺序没有任何影响-相反,任务是按照所期望的延迟顺序所执行的。

上一篇 下一篇