最近在写一个库,有一个场景是std::vector
存储一些对象,每次都是往后面添加,但是内部的元素可能会随机地失效(且不可恢复),这种场合下需要针对性地优化vector:
- 尽可能回收这些占坑的无效元素,避免容器支离破碎的碎片
- 降低添加过程的响应时间,避免扩充时造成的$O(n)$卡顿(就是要求严格$O(1)$,非均摊$O(1)$)
其中要求一是这个特殊容器的特定条件,要求二是这个容器需要达成的目的,不希望加入两个元素之间有过大的性能差异(这会对其它操作有影响),而扩容是vector
的通病
一个场景的模拟
---------------------------
|x| | | | |x| | |x|x| | |x| |
---------------------------
这是一个容器的内部,其中x
就是已经失效但是占坑的元素,甚至扩容也会把这些碎片copy过去,洁癖必须处理
常规做法
不考虑碎片问题的话,就是上来就为vector保留一个较大的capacity
(好像是effective系列提到的做法)
但这么做又有问题了,
- 我不想有碎片,但为什么上来就造一个可能很大又用不着的
capacity
,这浪费可能比碎片还大 - 这个容器最终的大小无法估计,预留只能简单缓解前N次的扩容
- 如果capacity过大可以
shrink_to_fit
,但shrink_to_fit
也是一个相当"卡顿"的操作,而且时机不好把控
常规做法2
用链表取代vector
容器,总不会不合理的扩容了
但是碎片处理依然无法下手(见下面GC)
而且我需要连续的一块空间(暂不谈cache友好这种玄学)
GC的思路
不妨以消灭碎片为前提来考虑,假如每次扩容前都check一下有哪些index是已经失效的,搬过去的时候不copy这些失效元素,是否更加合理
这种做法是有一定依据,完全回收了过去失效的碎片,也让下一次扩容的时机延迟了(个数少了,就更少机会触发扩容)
但这又有问题
- check碎片怎么check,遍历吗?不也“卡顿”?
- 那我记一个表?失效的时候先写个index到里面去?那么要为这个GC思路买单是不是不太划算,首先额外空间是$O(n)$的,其次对设计是侵入式的,接口很别扭,元素自己失效必须要调用这个容器的某个接口,万一元素本身就有一个失效又不会记下映射表的可能?比如
unique_ptr.reset()
?
可行但不够优雅
是不是应该从简单的例子发现点什么优雅的做法
设想
先从简单的场景设想一下,万一碎片是这样子的,是不是很好处理
---------------------------
| | | |x|x|x|x|x| | | | | | |
---------------------------
就是说碎片总是连续一大块的,既只有一块,也是连续的块
那就简单了,记下一对碎片下标的区间[begin,end]
,每次分配都先复用它们,维护begin
和end
就好了,低时延,少碎片
进一步为了方便维护,更加理想的碎片分布应该是这样
---------------------------
| | | | | | | | | |x|x|x|x|x|
-----------------·---------
begin
这个时候我只维护一个begin
就足够了,end
永远等于capacity
swap
设想美好,那就尽可能凑成这种样子
假如此时有一个元素随机失效了,像这样
---------------------------
| | | |x| | | | | |x|x|x|x|x|
-----------------·---------
begin
那我们可以通过swap
的技巧来实现,把begin-1
换过去,只要右值成本足够廉价
---------------------------
| | | |-| | | | |x|x|x|x|x|x|
---------------·-----------
begin
BUT
这只是理想,根据前面的要求,我没有维护一个表,失效是元素自身的操作,我不知道哪里会有x
出现
这个时候需要一定的算法,比如定时check一小部分连续空间,来做到严格$O(1)$的查找,
比如第一次查下标[0,3],第二次查[4,7],就是算法傻了点
因为有可能存在这种情况:查[4,7]时发现6是失效的,那把begin-1
下标swap过去吧!
万一begin-1
也是随机无效的?那begin-2
吧!......
在swap
这一步就可能陷于一个while
,虽然通过各种方法可以凑成一个高效可行的方法(比如顺便把begin更新了、比如也限制固定的操作数、或者牺牲一点无视这种情况),但不够优雅,别问为什么,问就是我试过了
那基于这个做法有什么优雅点的改进
维护窗口
尽可能利用有效信息,在前面的每次查找一小部分连续空间算法中,我们能得到的有效信息是:这一块我检测过了
为了维护信息方便用于下一次的查找,维护一个窗口,内部是一个近似维护检测过的信息,在这里我选择维护已经无效的元素
就是说在窗口$[l,r]$中表示从$l$到$r$这一块的区间总是无效的,只维护一块区间窗口
问题就来了,怎么确保碎片总是聚在一块,和前面的固定次数的遍历又有什么联系
用swap
和滑动这两个技巧就知道了
- 零大小窗口:如果遍历总是找不到一个失效元素,保持$r = l-1$,$l,r$保持向右滑动,仍为未开启状态($r < l$)
- 启动窗口:当找到第一个失效元素时$(vec[l] == \phi)$,令$r = l$,表示启动一个大小为1的窗口
- 交换:当启动窗口后发现一个有效元素时($vec[r+1]<>\phi$),$swap(vec[l],vec[r+1])$,并整体右移窗口,继续维护$[l,r]$的性质
- 结束:当窗口和$begin$可以拼在一块时,直接令$begin = l$,窗口关闭,复位为$[0,-1]$,并重新开始下一边轮询
(就是一个会交换的滑动窗口,很直观吧,关键操作就是第三点)
Code
这里试给出一个不严格的实现
void updateReusableIndex() {
auto resetWindow = [this] {
_window.left = 0;
_window.right = -1;
};
if(_reusableIndex != 0) { // need GC
for(int _ = 0; _ < step; ++_) {
if(_window.left > _window.right) { // 未启动
if(_container[_window.left] == nullptr) { // 可以启动
++_window.right;
} else { // 零大小窗口整体右滑,仍未启动
++_window.left;
++_window.right;
if(_window.left == _reusableIndex) { // TODO 没有准确算过
resetWindow();
}
}
continue;
}
// 直到_right >= _left 说明container[left]是一个nullptr
// left/right < reuseIndex
if(_window.right + 1 == _reusableIndex) { // merge, [left,right] + [reuse,size()) 全是nullptr,可回收
_reusableIndex = _window.left;
resetWindow(); // close window
} else if(_container[_window.right + 1] == nullptr) {
++_window.right;
} else { // 没有触发到reuse,但是找到一个可用连接
std::swap(_container[++_window.right],
_container[_window.left++]); // R+1 和 L交换,且窗口右滑
}
}
}
}
// 一些成员
int _reusableIndex; // 第一个可用的index
struct { int left, right; } _window; // 用于维护回收连接的窗口
constexpr static int step = 2;
这种做法可以满足:
- 尽量回收碎片(非严格,但是窗口永远是合法的)
- 尽量延迟了扩容,碎片越多越容易更新
begin
- 算法严格$O(1)$非均摊,额外空间成本也是$O(1)$
- 不需要修改元素的接口,只需要在容器中定义好什么算失效即可
在-O3
且添加N=10^8
次的添加操作,随机失效的测试中,数据为
滑动窗口
30.7963s // 表示处理N=1e8需要的整体时间
0.087155s // 表示添加元素之间最大的时间差
耿直的vector
32.3052s
0.217208s
可以看出这个优化是相当管用的,卡顿是成倍的差距很好理解,因为最卡顿的时候就是扩容,每次都是倍增
这个简单的策略以后会汇总于我的轮子说明文档中
THE END
附录
比较简单的测试用例
#include <bits/stdc++.h>
template <typename T>
struct Vector {
void ban(int pos) {
_container[pos].reset();
}
void add(const T &val) {
if(_container.size() > 128) updateReusableIndex();
auto ptr = std::make_unique<T>(val);
if(reusable()) {
int pos = _reusableIndex++;
_container[pos] = std::move(ptr);
} else {
_container.emplace_back(std::move(ptr));
}
}
Vector(int size = 16): _container(size), _reusableIndex(0), _window{0,-1} { }
bool reusable() { return _reusableIndex != _container.size(); }
void updateReusableIndex() {
auto resetWindow = [this] {
_window.left = 0;
_window.right = -1;
};
if(_reusableIndex != 0) { // need GC
for(int _ = 0; _ < step; ++_) {
if(_window.left > _window.right) { // 未启动
if(_container[_window.left] == nullptr) { // 可以启动
++_window.right;
} else { // 零大小窗口整体右滑,仍未启动
++_window.left;
++_window.right;
if(_window.left == _reusableIndex) { // TODO 没有准确算过
resetWindow();
}
}
continue;
}
// 直到_right >= _left 说明container[left]是一个nullptr
// left/right < reuseIndex
if(_window.right + 1 == _reusableIndex) { // merge, [left,right] + [reuse,size()) 全是nullptr,可回收
_reusableIndex = _window.left;
resetWindow(); // close window
} else if(_container[_window.right + 1] == nullptr) {
++_window.right;
} else { // 没有触发到reuse,但是找到一个可用连接
std::swap(_container[++_window.right],
_container[_window.left++]); // R+1 和 L交换,且窗口右滑
}
}
}
}
std::vector<std::unique_ptr<T>> _container;
int _reusableIndex; // 第一个可用的index
struct { int left, right; } _window; // 用于维护回收连接的窗口
constexpr static int step = 2;
};
int main() {
// Vector<int> v;
const int N = 1e8;
auto t1 = clock();
long m1 = 0, m2 = 0;
long maxDelta = 0;
auto counter = [&]() {
if(m1 == 0) {
m1 = clock();
} else {
m1 = m2;
m2 = clock();
maxDelta = std::max(maxDelta, m2 - m1);
}
};
// for(int i = 0; i < N; ++i) {
// int j = random();
// v.add(j);
// if(j &1) {
// v.ban(j % v._reusableIndex);
// }
// counter();
// }
std::vector<std::unique_ptr<int>> v(1);
for(int i = 0; i < N; ++i) {
int j = random();
v.emplace_back(std::make_unique<int>(j));
if(j &1) {
v[j % v.size()].reset();
}
counter();
}
auto t2 = clock();
std::cout << (1.0*t2-t1)/(CLOCKS_PER_SEC) << std::endl;
std::cout << 1.0*maxDelta/CLOCKS_PER_SEC << std::endl;
return 0;
}