通过滑动窗口来优化vector

最近在写一个库,有一个场景是std::vector存储一些对象,每次都是往后面添加,但是内部的元素可能会随机地失效(且不可恢复),这种场合下需要针对性地优化vector:

  1. 尽可能回收这些占坑的无效元素,避免容器支离破碎的碎片
  2. 降低添加过程的响应时间,避免扩充时造成的$O(n)$卡顿(就是要求严格$O(1)$,非均摊$O(1)$)

其中要求一是这个特殊容器的特定条件,要求二是这个容器需要达成的目的,不希望加入两个元素之间有过大的性能差异(这会对其它操作有影响),而扩容是vector的通病

一个场景的模拟

 ---------------------------
|x| | | | |x| | |x|x| | |x| |
 ---------------------------

这是一个容器的内部,其中x就是已经失效但是占坑的元素,甚至扩容也会把这些碎片copy过去,洁癖必须处理

常规做法

不考虑碎片问题的话,就是上来就为vector保留一个较大的capacity(好像是effective系列提到的做法)

但这么做又有问题了,

  1. 我不想有碎片,但为什么上来就造一个可能很大又用不着的capacity,这浪费可能比碎片还大
  2. 这个容器最终的大小无法估计,预留只能简单缓解前N次的扩容
  3. 如果capacity过大可以shrink_to_fit,但shrink_to_fit也是一个相当"卡顿"的操作,而且时机不好把控

常规做法2

用链表取代vector容器,总不会不合理的扩容了

但是碎片处理依然无法下手(见下面GC)

而且我需要连续的一块空间(暂不谈cache友好这种玄学)

GC的思路

不妨以消灭碎片为前提来考虑,假如每次扩容前都check一下有哪些index是已经失效的,搬过去的时候不copy这些失效元素,是否更加合理

这种做法是有一定依据,完全回收了过去失效的碎片,也让下一次扩容的时机延迟了(个数少了,就更少机会触发扩容)

但这又有问题

  1. check碎片怎么check,遍历吗?不也“卡顿”?
  2. 那我记一个表?失效的时候先写个index到里面去?那么要为这个GC思路买单是不是不太划算,首先额外空间是$O(n)$的,其次对设计是侵入式的,接口很别扭,元素自己失效必须要调用这个容器的某个接口,万一元素本身就有一个失效又不会记下映射表的可能?比如unique_ptr.reset()

可行但不够优雅

是不是应该从简单的例子发现点什么优雅的做法

设想

先从简单的场景设想一下,万一碎片是这样子的,是不是很好处理

 ---------------------------
| | | |x|x|x|x|x| | | | | | |
 ---------------------------

就是说碎片总是连续一大块的,既只有一块,也是连续的块

那就简单了,记下一对碎片下标的区间[begin,end],每次分配都先复用它们,维护beginend就好了,低时延,少碎片

进一步为了方便维护,更加理想的碎片分布应该是这样

 ---------------------------
| | | | | | | | | |x|x|x|x|x|
 -----------------·---------
                  begin

这个时候我只维护一个begin就足够了,end永远等于capacity

swap

设想美好,那就尽可能凑成这种样子

假如此时有一个元素随机失效了,像这样

 ---------------------------
| | | |x| | | | | |x|x|x|x|x|
 -----------------·---------
                  begin

那我们可以通过swap的技巧来实现,把begin-1换过去,只要右值成本足够廉价

 ---------------------------
| | | |-| | | | |x|x|x|x|x|x|
 ---------------·-----------
                begin

BUT

这只是理想,根据前面的要求,我没有维护一个表,失效是元素自身的操作,我不知道哪里会有x出现

这个时候需要一定的算法,比如定时check一小部分连续空间,来做到严格$O(1)$的查找,

比如第一次查下标[0,3],第二次查[4,7],就是算法傻了点

因为有可能存在这种情况:查[4,7]时发现6是失效的,那把begin-1下标swap过去吧!

万一begin-1也是随机无效的?那begin-2吧!......

swap这一步就可能陷于一个while,虽然通过各种方法可以凑成一个高效可行的方法(比如顺便把begin更新了、比如也限制固定的操作数、或者牺牲一点无视这种情况),但不够优雅,别问为什么,问就是我试过了

那基于这个做法有什么优雅点的改进

维护窗口

尽可能利用有效信息,在前面的每次查找一小部分连续空间算法中,我们能得到的有效信息是:这一块我检测过了

为了维护信息方便用于下一次的查找,维护一个窗口,内部是一个近似维护检测过的信息,在这里我选择维护已经无效的元素

就是说在窗口$[l,r]$中表示从$l$到$r$这一块的区间总是无效的,只维护一块区间窗口

问题就来了,怎么确保碎片总是聚在一块,和前面的固定次数的遍历又有什么联系

swap和滑动这两个技巧就知道了

  1. 零大小窗口:如果遍历总是找不到一个失效元素,保持$r = l-1$,$l,r$保持向右滑动,仍为未开启状态($r < l$)
  2. 启动窗口:当找到第一个失效元素时$(vec[l] == \phi)$,令$r = l$,表示启动一个大小为1的窗口
  3. 交换:当启动窗口后发现一个有效元素时($vec[r+1]<>\phi$),$swap(vec[l],vec[r+1])$,并整体右移窗口,继续维护$[l,r]$的性质
  4. 结束:当窗口和$begin$可以拼在一块时,直接令$begin = l$,窗口关闭,复位为$[0,-1]$,并重新开始下一边轮询

(就是一个会交换的滑动窗口,很直观吧,关键操作就是第三点)

Code

这里试给出一个不严格的实现

    void updateReusableIndex() {
        auto resetWindow = [this] {
            _window.left = 0;
            _window.right = -1;
        };
        if(_reusableIndex != 0) { // need GC
            for(int _ = 0; _ < step; ++_) {
                if(_window.left > _window.right) { // 未启动
                    if(_container[_window.left] == nullptr) { // 可以启动
                        ++_window.right;
                    } else { // 零大小窗口整体右滑,仍未启动
                        ++_window.left;
                        ++_window.right;
                        if(_window.left == _reusableIndex) { // TODO 没有准确算过
                            resetWindow();
                        }
                    }
                    continue;
                }
                // 直到_right >= _left 说明container[left]是一个nullptr

                // left/right < reuseIndex
                if(_window.right + 1 == _reusableIndex) { // merge, [left,right] + [reuse,size()) 全是nullptr,可回收
                    _reusableIndex = _window.left;
                    resetWindow(); // close window
                } else if(_container[_window.right + 1] == nullptr) {
                    ++_window.right;
                } else { // 没有触发到reuse,但是找到一个可用连接
                    std::swap(_container[++_window.right],
                         _container[_window.left++]); // R+1 和 L交换,且窗口右滑
                }

            }
        }
    }

// 一些成员
    int _reusableIndex; // 第一个可用的index
    struct { int left, right; } _window; // 用于维护回收连接的窗口
    constexpr static int step = 2;

这种做法可以满足:

  1. 尽量回收碎片(非严格,但是窗口永远是合法的)
  2. 尽量延迟了扩容,碎片越多越容易更新begin
  3. 算法严格$O(1)$非均摊,额外空间成本也是$O(1)$
  4. 不需要修改元素的接口,只需要在容器中定义好什么算失效即可

-O3且添加N=10^8次的添加操作,随机失效的测试中,数据为

滑动窗口
30.7963s // 表示处理N=1e8需要的整体时间
0.087155s // 表示添加元素之间最大的时间差

耿直的vector
32.3052s
0.217208s

可以看出这个优化是相当管用的,卡顿是成倍的差距很好理解,因为最卡顿的时候就是扩容,每次都是倍增

这个简单的策略以后会汇总于我的轮子说明文档中

THE END

附录

比较简单的测试用例

#include <bits/stdc++.h>

template <typename T>
struct Vector {

    void ban(int pos) {
        _container[pos].reset();
    }

    void add(const T &val) {
        if(_container.size() > 128) updateReusableIndex();
        auto ptr = std::make_unique<T>(val);
        if(reusable()) {
             int pos = _reusableIndex++;
            _container[pos] = std::move(ptr);
        } else {
            _container.emplace_back(std::move(ptr));
        }
    }

    Vector(int size = 16): _container(size), _reusableIndex(0), _window{0,-1} { }

    bool reusable() { return _reusableIndex != _container.size(); }

    void updateReusableIndex() {
        auto resetWindow = [this] {
            _window.left = 0;
            _window.right = -1;
        };
        if(_reusableIndex != 0) { // need GC
            for(int _ = 0; _ < step; ++_) {
                if(_window.left > _window.right) { // 未启动
                    if(_container[_window.left] == nullptr) { // 可以启动
                        ++_window.right;
                    } else { // 零大小窗口整体右滑,仍未启动
                        ++_window.left;
                        ++_window.right;
                        if(_window.left == _reusableIndex) { // TODO 没有准确算过
                            resetWindow();
                        }
                    }
                    continue;
                }
                // 直到_right >= _left 说明container[left]是一个nullptr

                // left/right < reuseIndex
                if(_window.right + 1 == _reusableIndex) { // merge, [left,right] + [reuse,size()) 全是nullptr,可回收
                    _reusableIndex = _window.left;
                    resetWindow(); // close window
                } else if(_container[_window.right + 1] == nullptr) {
                    ++_window.right;
                } else { // 没有触发到reuse,但是找到一个可用连接
                    std::swap(_container[++_window.right],
                         _container[_window.left++]); // R+1 和 L交换,且窗口右滑
                }

            }
        }
    }
    std::vector<std::unique_ptr<T>> _container;

    int _reusableIndex; // 第一个可用的index
    struct { int left, right; } _window; // 用于维护回收连接的窗口
    constexpr static int step = 2;
};

int main() {
    // Vector<int> v;
    const int N = 1e8;
    auto t1 = clock();
    long m1 = 0, m2 = 0;
    long maxDelta = 0;
    auto counter = [&]() {
        if(m1 == 0) {
            m1 = clock();
        } else {
            m1 = m2;
            m2 = clock();
            maxDelta = std::max(maxDelta, m2 - m1);
        }
    };
    // for(int i = 0; i < N; ++i) {
    //     int j = random();
    //     v.add(j);
    //     if(j &1) {
    //         v.ban(j % v._reusableIndex);
    //     }
    //     counter();
    // }
    std::vector<std::unique_ptr<int>> v(1);
    for(int i = 0; i < N; ++i) {
        int j = random();
        v.emplace_back(std::make_unique<int>(j));
        if(j &1) {
            v[j % v.size()].reset();
        }
        counter();
    }
    auto t2 = clock();
    std::cout << (1.0*t2-t1)/(CLOCKS_PER_SEC) << std::endl;
    std::cout << 1.0*maxDelta/CLOCKS_PER_SEC << std::endl;
    return 0;
}

对了

最近看的书比较晦涩,作者总是喜欢用恰当的比喻来简化理解

那我也试着用一个比喻来说一下这个特殊容器到底是干什么的

想象一下这个容器是一个部门,图里的x是工位,

而工位是必须要存在的,且最好是相邻,但是频繁变动也是常有的事情,甚至拆开也没问题,只要有坑在能找到人就好了

反正找人也是随便找到一个人就行了

至于为什么会随机失效,被裁人没了,工位不就没了么(你说随机,什么随机)

发表评论

邮箱地址不会被公开。 必填项已用*标注