定时器的简单讨论

轮子所用的定时器方案大概定型了,觉得可以对一下思路,讨论一下逐步扩展的实现

需求

要设计任何一个东西其实要基于需求比较好,虽然本质上我就是想自己造轮子,但是需求能给人一种方向感,以及能说服自己为什么要这么写

  • 功能相对丰富
  • 性能不太拉跨
  • 线程安全
  • 接口友善
  • 代码简洁

功能与接口

接口的友善基于两点:user define literalbuilder模式

在说明前我先介绍在设计之初就构思的两个类:TimerEventTimer

TimerEvent包含时间参数和可调用对象,Timer封装调度用的容器

具体操作就是构造TimerEvent然后丢给Timer去调度

在功能的提供方面,我至少确定了:

  • 什么时候开始(when
  • 调度的内容(what
  • 允许多次(atMost
  • 多次调度的间隔(interval
  • 相同时间下可能的优先级(ticket

user define literal

这个是C++11比较少见的特性,但是用在定时器上是一个非常exciting的搭配

基本解决了怎么传时间参数的痛点

比如你要设计一个runAfter(/*传入时间*/)的接口,一般要怎么设计

是通过一个runAfter(int timeMs)这样的声明

还是构造一堆的class Second / Minute / Hour然后重载多个runAfter(Second / Minute / Hour)再通过runAfter(Second.valueOf(2))来实现

前面用整型语义的问题是不同时间单位难表示,调用方也无法清晰明白是基于什么单位

后面的基本解决了上面提到的两个问题,但是user define literal可以更好地完成

用户自定义字面值只需这样就足够了:runAfter(2s) / runAfter(1h)

并且不需要考虑重载,std::chrono已经帮你搞定了,使用上参数类型选择std::chrono::nanoseconds就可以兼容ns/ms/us/s/min/h

Builder

TimerEvent参数越来越多时自然会考虑一个构造函数的参数顺序问题

现在的TimerEvent大概是这样

struct TimerEvent {
    Timestamp _when;
    Nanosecond _interval;
    uint64_t _atMost;
    uint64_t _ticket;
    Callable _what;
};

当需要一个构造函数时,哪些参数放在前面,哪些放在后面,哪些提供默认可忽略都难以抉择

所以干脆点也用户自行选择

class Timer {
    // ...
    struct TimerHelper {
        Timestamp _when;
        Nanosecond _interval;
        uint64_t _atMost;
        Timer *_thisTimer;

        template <typename ...Args>
        TimerHelper& with(Args &&...args) { 
            _thisTimer->append(_when, 
                Callable::make(std::forward<Args>(args)...), 
                _interval, _atMost); 
            return *this;
        }

        TimerHelper& per(Nanosecond interval) {
            _interval = interval;
            return *this;
        }
        TimerHelper& atMost(uint64_t times) {
            _atMost = times;
            return *this;
        }
        TimerHelper at(Timestamp when) {
            _when = when;
            return *this;
        }

        TimerHelper priority(URGENCY urgency) {
            _ticket = urgency;
            return *this;
        }
    };

    // usage:
    // runAfter(2s).per(2s).atMost(3).with(func, arg0, arg1);
    // runEvery(500ms).at(now()+1s).with([]{std::cerr<<"b";});

    TimerHelper runAt(Timestamp when) 
        { return TimerHelper{when, Millisecond::zero(), 1, this}; }
    TimerHelper runAfter(Nanosecond interval) 
        { return TimerHelper{nowAfter(interval), Millisecond::zero(), 1, this}; }
    TimerHelper runEvery(Nanosecond interval) 
        { return TimerHelper{now(), interval, std::numeric_limits<uint64_t>::max(), this}; }
};

实现在Timer中甚至不需要知道TimerEvent的存在,只需由timer方调用即可

选择的权利-续

Builder是一种将选择留给用户的做法,在定时器的启动函数run()中也需要这种考虑

在最初我的设计是

class Timer {
    // ...
    void run() {
        for(; !_stop; ) {
            // 处理TimerEvent
        }
    }
};

这个问题很明显,一旦run()就必须强占(至少)一个线程

尽管我觉得大部分简单使用还是for完事,但是很明显是否强占线程应该留下余地

class Timer {
    // ...
    void run() {
        // 处理一次到来的TimerEvent
    }
};

// 调用方
int func() {
    // ...
    for(;;) timer.run();
}

更进一步的,应该尽可能提供足够多的信息

class Timer {
    // ...
    Millisecond run() {
        // 处理一次到来的TimerEvent
        // return 距离下一次调度的最近时间差
    }
};

这个会在与epoll搭配时起作用

void loop() {
    // ...
    auto timeout = timer.run();
    epoller.poll(std::max(timeout, 1ms));
}

至少可省下为epoll提供wakeup的操作

性能和简洁

封装

定时器常见的三种算法实现之间的对比并不是讨论重点,我是毫不犹豫使用堆来实现的

原因是堆本身性能足够好,并且STL可以非常简洁的做到这个实现

class Timer {
    // ...
    using EventHeap = std::priority_queue<
            TimerEvent, std::vector<TimerEvent>, std::greater<TimerEvent>>;
    EventHeap _container;
};

容器的封装三行就完成了(其实还需要operator>,不过也很简洁)

实现

接下来讨论run的具体实现

该实现依赖于轮子已有的Callable,可以翻我前面讨论回调接口的文章查看实现

这是一个朴素实现

// 不会阻塞,只会执行到不符合条件的时间就返回
Millisecond run() {
    if(_container.empty()) return Millisecond::zero();
    Timestamp current = now();
    while(!_container.empty()) {
        auto &e = _container.top();
        if(e._when > current) break;
        Defer _ {[this] { _container.pop(); }}; // 避免手残忘记、call抛出异常而失效
        if(e._atMost > 0) e._what.call();
        if(e._atMost > 1) {
            _reenterables.push_back(std::move(const_cast<TimerEvent&>(e)));
            auto &b = _reenterables.back();
            b.next(); // 一个更新when atMost等成员为下一次调度做准备的函数
        }
    }
    for(auto &e : _reenterables) _container.push(std::move(e));
    _reenterables.clear();
    if(_container.empty()) return Millisecond::zero();
    return std::chrono::duration_cast<Millisecond>(_container.top()._when - now());
}

该实现非常浅显易懂,得益于堆的性质,直接把时间按最早到最晚pop对比当前时间,合适的就调用call,并即时更新、抛弃TimerEvent,最后插回原来的堆中

那接下来考虑线程安全问题,最简单上锁就可以了

Millisecond run() {
    std::lock_guard<std::mutex> _ {_mutex};
    if(_container.empty()) return Millisecond::zero();
    Timestamp current = now();
    // 跟前面一样....
}

考虑优化:能不能把持锁的粒度缩小一点?

考虑参考muduo中使用swap来减小锁粒度,可行不?

具体就是:

  1. 实现依然和前面一样不需变更
  2. 当mainThread用到timer时,swap(timer, localTimer)
  3. 调度操作全在localTimer,其它线程的TimerEvnet添加往timer里加

很遗憾,timer最后是需要合并的,因为调度完成后还是会timerlocalTimer依然持有TimerEvent

而STL优先队列并不是快速的可并堆,swap的技巧并不适用

但是我们可以考虑把可能最耗时的call移出到其它线程中

在这里使用传入传出引用来避免不必要的vector构造

Millisecond run(std::vector<TimerEvent> &tasks) {
    std::lock_guard<std::mutex> _ {_mutex};
    Timestamp current = now();
    for(size_t n = tasks.size(); n; --n) {
        auto &task = tasks.back();
        if(task._atMost > 1) {
            task.next();
            _container.emplace(std::move(task));
        }
        tasks.pop_back();
    }
    if(_container.empty()) return Millisecond::zero();
    while(!_container.empty()) {
        auto &task = _container.top();
        if(task._when > current) break;
        tasks.emplace_back(std::move(std::move(const_cast<TimerEvent&>(task))));
        _container.pop();
    }
    if(_container.empty()) return Millisecond::zero();
    return std::chrono::duration_cast<Millisecond>(_container.top()._when - now());
}

考虑优化2:这种情况,真的需要全部push回原来的堆中吗

确实有些不需要,我们杂糅一下链表的做法,使部分需要$O(logn)$的地方降为$O(1)$

并且_reenterables仍是成员变量,就是说capacity是单调递增的,不用每次都分配

Millisecond run(std::vector<TimerEvent> &tasks) {
    std::lock_guard<std::mutex> _ {_mutex};
    Timestamp current = now();
    for(size_t n = tasks.size(); n; --n) {
        auto &task = tasks.back();
        if(task._atMost > 1) {
            task.next();
            if(task._when > current) {
                _container.emplace(std::move(task));
            } else {
                _reenterables.emplace_back(std::move(task));
            }
        }
        tasks.pop_back();
    }
    while(!reenterables.empty()) {
        auto &task = reenterables.back();
        tasks.emplace_back(std::move(task));
        reenterables.pop_back();
    }
    if(_container.empty()) return Millisecond::zero();
    // ...略
}

考虑优化3:reenter的转移是必要的吗

注意到必然为空的性质,swap一波就好,部分$O(n)$降为$O(1)$

Millisecond run(std::vector<TimerEvent> &tasks) {
    std::lock_guard<std::mutex> _ {_mutex};
    Timestamp current = now();
    for(size_t n = tasks.size(); n; --n) {
        auto &task = tasks.back();
        if(task._atMost > 1) {
            task.next();
            if(task._when > current) {
                _container.emplace(std::move(task));
            } else {
                _reenterables.emplace_back(std::move(task));
            }
        }
        tasks.pop_back();
    }
     _reenterables.swap(tasks);
    if(_container.empty()) return Millisecond::zero();
    // ...略
}

杂碎的讨论:优先级

说下ticket实现的优先级

这个优先级是指代相同的时间内,优先处理谁的意思

本来我的想法是让ticket只供内部使用,是用于解决每次相同时间内第$i$个event总是第$j$次调用的问题

// TimerEvent内
bool operator > (const TimerEvent &rhs) const {
    if(_when != rhs._when) return _when > rhs._when;
    return _ticket > rhs._ticket;
}

void next() {
    _when += _interval;
    _atMost--;
    if(!(_atMost & 7)) _ticket = random<uint64_t>(); // shuffle一波
}

可是后来想到,虽然初衷是要只用于解决这种场合

但是第一次调度的优先级是可以通过ticket得到保证的,因此在TimerHelper中开放了prioritysetter

进一步的其实还可以考虑这样扩展优先级的定义:同一次调度中,尽可能地优先处理

也就是说,在单次的while(!container.empty())...调度中,只要优先级够高,就会优先处理

如果期望$n$个这样的TimerEvent的额外调度成本是$O(n)$,有没有一种容易扩展的方法?

其实还是很容易做到的,首先看我们接口最后的定案Millisecond run(std::vector<TimerEvent> &tasks)

如果tasks也是一个priority_queue,并且operator>重载只按照ticket比较,是不是可以了呢?

不过可惜的是这种$n$次插入的复杂度是$O(nlogn)$的,因为严格基于比较的做法的复杂度必然在$O(nlogn)$量级

如果不在乎优先级的精度且仍要求严格优先,可以进行桶排序并记录一个size来及时剪枝

能不能即使是std::vector<TimerEvent> &tasks也可以做到$O(n)$并排好优先级?

给出Hint

  1. 已有数据上直接构造堆是$O(n)$的复杂度
  2. 对堆进行$BFS$尽管不太严格,但能实现高优先级的一半严格优先于低优先级
  3. 最高的优先级永远得到严格保证

杂碎的讨论:定时与调度

Timer的定位永远就只是一个Timer吗,能不能在前面的run实现发现点什么?

也许不太明显,但是可以提示一下muduo的线程安全是如何实现的:把event放到loop线程下

那跟Timer有什么关系?

void loop() {
    for(std::vector<TimerEvent> tasks; !stop;) {
        auto timeout = timer.run(tasks);
        for(auto &&task : tasks) task._what();
        epoller.poll(std::max(timeout, 1ms));
        // ...
    }
}

这下是不是明显了?Timer同样是一个用于调度eventloop线程的工具

也就是说,在这种实现下,不仅完成了一个Timer,还间接做到了runInLoop的功能

THE END

总的来说基本定时器的方案就是这样了,并不需要一个特别复杂实现的实例,只是简单分享一个简洁并且用到一点技巧的实现方案


2021.1 Update.

这好吗?这不好

当我意识到在开发过程中timer更多地应用于线程调度时,我就为它的插入性能感到捉急

对于线程调度而言,大量定时时间为0并且可能会尝试retry,但是在堆中却要经历漫长的push up是不应该的

因此我决定有空再造一个时间轮的轮子,谁不搀它的插入性能呢

发表评论

邮箱地址不会被公开。 必填项已用*标注