自学内容网 自学内容网

多层时间轮实现延迟消息

单层时间轮的弊端

对于单层时间轮来说,如果要拉长延迟的时间,要么增加时间轮中槽的数量,要么增大前进指针的时间间隔。但是这两种方式都有弊端

  1. 增加槽的数量:占用内存较高
  2. 增大前进指针的时间间隔:延迟时间不精确
    为了解决上面的问题,我们采用钟表中的秒针、分针、时针一样,通过多层时间轮来挂载延迟任务,这样就可以用少量的内存实现超长的延迟时间。

多层时间轮的结构

时间槽:每个时间轮包含若干时间槽,每个槽代表一个时间片段。
时间轮:多层时间轮通常包含多个时间轮,每一层的精度和范围不同。例如,第一层(细粒度)可以管理毫秒级任务,第二层(中等粒度)可以管理秒级任务,第三层(粗粒度)可以管理分钟级任务。
指针:每个时间轮都有一个指针,用于指向当前时间槽,随着时间的推移指针向前移动。
任务链表:每个时间槽中包含一个任务链表,用于存储定时任务。

代码实战

任务

public class TimerTask implements Runnable{


    private long delayTime;

    private Runnable task;


    public TimerTask(long delayTime, Runnable task) {
        this.delayTime = delayTime;
        this.task = task;
    }

    public long getDelayTime() {
        return delayTime;
    }

    public Runnable getTask() {
        return task;
    }

    @Override
    public void run() {
        task.run();
    }
}

时间轮

public class TimeWheel {

    private int slots;

    private long interval;
    private int currentSlot = 0;
    private List<List<TimerTask>> wheel = new ArrayList<>();


    public TimeWheel(int slots, long interval) {
        this.slots = slots;
        this.interval = interval;
        for (int i = 0; i < slots; i++) {
            wheel.add(new ArrayList<>());
        }
    }

    public int getCurrentSlot() {
        return currentSlot;
    }

    /**
     *
     * @param task
     */
    public void addTask(TimerTask task) {
        //计算置放槽位
        long delayTime = task.getDelayTime() > interval * slots ? task.getDelayTime() - interval * slots : task.getDelayTime();
        int slot = (int) ((delayTime / interval) + currentSlot) % slots;
        //添加任务
        wheel.get(slot).add(task);
    }

    public List<TimerTask> advance() {
        //前进一个槽位
        currentSlot = (currentSlot + 1) % slots;
        //取任务
        List<TimerTask> tasks = new ArrayList<>(wheel.get(currentSlot));
        wheel.get(currentSlot).clear();
        return tasks;

    }
}

多层时间轮

public class MultiLevelTimeWheel {


    TimeWheel smallWheel = new TimeWheel(100, 100);//100个槽,每个槽间隔100ms
    TimeWheel middleWheel = new TimeWheel(10, 10000);//10个槽,每个槽间隔10s(小时间轮的一个周期)
    TimeWheel largeWheel = new TimeWheel(10, 100000);//10个槽,每个槽间隔100s(中时间轮的一个周期)


    public void addTask(TimerTask task) {
        long delayTime = task.getDelayTime();
        if (delayTime < 10000) {
            smallWheel.addTask(task);
        } else if (delayTime < 100000) {
            middleWheel.addTask(task);
        } else if (delayTime < 1000000) {
            largeWheel.addTask(task);
        } else {
            System.out.println("不支持这么大的延迟");
        }
    }

    public void advance() {
        List<TimerTask> tasks = smallWheel.advance();
        for (TimerTask task : tasks) {
            task.run();
        }
        //小时间轮转一圈,需要让中时间轮前进一格,并将对应的任务下放至小时间轮的零刻度,待小时间轮再转一整圈时处理
        if (smallWheel.getCurrentSlot() == 0) {
            List<TimerTask> middleTasks = middleWheel.advance();
            for (TimerTask middleTask : middleTasks) {
                smallWheel.addTask(middleTask);
            }
        }
        //中时间轮转一圈,需要让大时间轮前进一格,并将对应的任务下放至中时间轮的零刻度
        if (smallWheel.getCurrentSlot() == 0 && middleWheel.getCurrentSlot() == 0) {
            List<TimerTask> largeTasks = largeWheel.advance();
            for (TimerTask largeTask : largeTasks) {
                middleWheel.addTask(largeTask);
            }
        }
    }
}

运行

public static void main(String[] args) {
        long start = System.currentTimeMillis();
        TimerTask timerTask1 = new TimerTask(3000, () -> System.out.println("延迟毫秒数:" + (System.currentTimeMillis() - start)));
        TimerTask timerTask2 = new TimerTask(33000, () -> System.out.println("延迟毫秒数:" + (System.currentTimeMillis() - start)));
        TimerTask timerTask3 = new TimerTask(333000, () -> System.out.println("延迟毫秒数:" + (System.currentTimeMillis() - start)));

        MultiLevelTimeWheel multiLevelTimeWheel = new MultiLevelTimeWheel();
        multiLevelTimeWheel.addTask(timerTask1);
        multiLevelTimeWheel.addTask(timerTask2);
        multiLevelTimeWheel.addTask(timerTask3);

        while (true) {
            try {
                Thread.sleep(100);
            } catch (InterruptedException e) {
                throw new RuntimeException(e);
            }
            multiLevelTimeWheel.advance();
        }
    }

这里的代码实现只是一个demo,主要是为了了解其中的思想。


原文地址:https://blog.csdn.net/u013978512/article/details/142534665

免责声明:本站文章内容转载自网络资源,如本站内容侵犯了原著者的合法权益,可联系本站删除。更多内容请关注自学内容网(zxcms.com)!