0%

leveldb基本组件源码阅读


leveldb基本组件源码阅读

Slice

Slice是leveldb中的字符串类型,对C风格的字符串和C++的String做了简单封装。为什么要封装呢?

  • 相比于拷贝string,拷贝slice代价很小,只用拷贝头指针和长度
  • 修改的代价也小,同样是只用修改头指针和长度
  • 标准库不支持remove_prefix和starts_with等函数,不太方便

这里要注意的是,头指针指向的字符串并不是在Slice中的构造函数malloc或者new的分配的,可能提前分配好,也有可能创建Slice后交给某个函数分配。因此在使用过程中,要尤其注意某个Slice中的底层数据是否存在。

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
class LEVELDB_EXPORT Slice {
...

void remove_prefix(size_t n) {
assert(n <= size());
data_ += n;
size_ -= n;
}
// Return true iff "x" is a prefix of "*this"
bool starts_with(const Slice& x) const {
return ((size_ >= x.size_) && (memcmp(data_, x.data_, x.size_) == 0));
}

···

private:
const char* data_;
size_t size_;
}

Arena内存管理

首先Arena是一个内存管理工具,用于MemTable。那么这里,就不禁问一个问题:为什么不直接用malloc/free或者new/delete。

  • 数据库插入单个键值时,在大多数的情况下,一个键值所需要的内存较小,而malloc/free或者new/delete的调用开销较大,如果有大量的小空间分配请求,频繁的调用和分配就很慢。因此一个合理的思想就是:对于小块内存的分配,我一次性分配多一点,之后的分配如果剩余空间够用,就直接从里面取。

这种做法会导致内存碎片(如果第二次分配大于剩余空间,就会重新分配一个块),牺牲了内存利用率,提高了速度。但是在这种应用场景是合理的,数据库插入一个键值在大多数的情况下,所需要的内存较小。

要补充说明的是,Arena只在MemTable(SkipList)中使用,因为其他地方都是大块的分配,就不需要这种优化方式。

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
inline char* Arena::Allocate(size_t bytes) {
// The semantics of what to return are a bit messy if we allow
// 0-byte allocations, so we disallow them here (we don't need
// them for our internal use).
assert(bytes > 0);
// 如果当前块空间够,就直接取
if (bytes <= alloc_bytes_remaining_) {
char* result = alloc_ptr_;
alloc_ptr_ += bytes;
alloc_bytes_remaining_ -= bytes;
return result;
}
return AllocateFallback(bytes);
}

char* Arena::AllocateFallback(size_t bytes) {
// 如果分配的内存大于1kb(大块),直接分配一个块
if (bytes > kBlockSize / 4) {
// Object is more than a quarter of our block size. Allocate it separately
// to avoid wasting too much space in leftover bytes.
char* result = AllocateNewBlock(bytes);
return result;
}

// 如果分配小于等于1kb,分配一个4kb的块,更新指针
// We waste the remaining space in the current block.
alloc_ptr_ = AllocateNewBlock(kBlockSize);
alloc_bytes_remaining_ = kBlockSize;

char* result = alloc_ptr_;
alloc_ptr_ += bytes;
alloc_bytes_remaining_ -= bytes;
return result;
}

SkipList

Data Structure

SkipList本质是一个多层有序链表,在普通链表的基础上增加了多级索引,实现节点的快速跳跃,结构简单,且查询速度较快。

查找某个元素时,从图中的左上角开始,逐渐向右下方移动,无须遍历整个链表就可定位到目标元素的位置。

时间复杂度分析:查找、插入、删除都是O(logn),假设元素个数为n,每两个结点提取一个结点建立索引,那么每往下一层走相当于节点数目减少一半,走到最底层就逼近答案,粗暴的理解,查询时每次减少一半的元素的时间复杂度都是O(log n),比如二叉树的查找、二分法查找、归并排序、快速排序。

下面是leveldb中跳表源码的大致框架,对外开放的接口主要有Insert和Contain。

  • Insert用于向跳表中插入一个Key,本质上是插入一列。
  • Contain用于判断某个key是否存在。

为什么没有删除Key呢?不能有!因为删除的时候需要这个标记把之前的标记无效化,如果你在memtable里面加这个删除操作,那刷盘的时候就没这个key了,那之前的版本就能被读到了。

其他Seek,Valid,Next等操作放置于SkipList的内部Iterator中。在对Memtable进行读时,主要是对Memtable的迭代器进行使用,Memtable的迭代器类型取决于Memtable索引数据结构迭代器的类型,在leveldb中,使用的是跳表,这个是可以更换的;进行写时,调用Insert即可。

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
template <typename Key, class Comparator>
class SkipList {
private:
// 一列,包含一个值和若干指针,注意用的是struct,内部第一个成员为key,第二个成员为std::atomic<Node*> next[1],位于末尾是个可扩展数组
struct Node;

public:
explicit SkipList(Comparator cmp, Arena* arena);

void Insert(const Key& key);

bool Contains(const Key& key) const;

class Iterator;

private:
enum { kMaxHeight = 12 };

Comparator const compare_;
Arena* const arena_; // Arena used for allocations of nodes

Node* const head_;

std::atomic<int> max_height_; // Height of the entire list

};

Insert

下面说说跳表是如何实现Insert的。因为跳表是单向链表,所以对于链表的插入应该找到前驱节点,前面说过Insert操作本质上是插入一列,因此我们需要找到每一层Key的前驱节点。Insert中主要分为三步:

  • 找到每一层的前驱节点
  • 确定层数
  • 自底向上按照链表的方式进行插入

首先是通过FindGreaterOrEqual找到每一层的前驱节点,算法很简单,就是对每一层,找到一个prekey,满足prekey < key < prekey->next的位置。搜索起点从跳表的左上角开始,然后逐渐下沉,直到到最后一层,并不需要每一层都从起点开始,当前层的搜索是基于上一层的搜索结果作为起点的。

确定层数是选择大于原层数的一个随机层数,为什么选择随机,目的还不是很清楚。

最后就是插入,这里插入的方式是从最底层到最高层插入,目的是为了防止并发情况下的错误。如果先将NewNode从高层链入,假设此时有一个并发的search操作到来,当该search操作到达这个node之后,它可能会顺着该node向下移动,但此时NewNode下层的next指针还没有设置完,从而导致错误。(参考康巴尔)

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
template <typename Key, class Comparator>
void SkipList<Key, Comparator>::Insert(const Key& key) {
Node* prev[kMaxHeight];
Node* x = FindGreaterOrEqual(key, prev);

assert(x == nullptr || !Equal(key, x->key));

int height = RandomHeight();
if (height > GetMaxHeight()) {
for (int i = GetMaxHeight(); i < height; i++) {
prev[i] = head_;
}
// relax方式
max_height_.store(height, std::memory_order_relaxed);
}

x = NewNode(key, height);
for (int i = 0; i < height; i++) {
// NoBarrier_SetNext() suffices since we will add a barrier when
// we publish a pointer to "x" in prev[i].
// relax方式
x->NoBarrier_SetNext(i, prev[i]->NoBarrier_Next(i));
prev[i]->SetNext(i, x);
}
}

template <typename Key, class Comparator>
typename SkipList<Key, Comparator>::Node*
SkipList<Key, Comparator>::FindGreaterOrEqual(const Key& key, Node** prev) const {
Node* x = head_;
int level = GetMaxHeight() - 1;
while (true) {
Node* next = x->Next(level);
if (KeyIsAfterNode(key, next)) {
// Keep searching in this list
x = next;
} else {
if (prev != nullptr) prev[level] = x;
if (level == 0) {
return next;
} else {
// Switch to next list
level--;
}
}
}
}

Concurrency

在源码中,maxheightnode->next使用了原子变量std::atomic,对它们的读写使用了 std::memory_order_xxx,这些是什么?为什么要用这个?

  • 首先说目的,并发编程下,锁常常是性能瓶颈,当我们想绕开锁时,就必须得使用原子指令,但是原子指令假如要实现顺序一致性并没有想象中的快,所以可以做一些一致性的取舍。leveldb在此处通过了原子操作和内存顺序做到了无锁并发,提高了性能。
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
  // Accessors/mutators for links.  Wrapped in methods so we can
// add the appropriate barriers as necessary.
Node* Next(int n) {
assert(n >= 0);
return next_[n].load(std::memory_order_acquire);
}
void SetNext(int n, Node* x) {
assert(n >= 0);
next_[n].store(x, std::memory_order_release);
}

// No-barrier variants that can be safely used in a few locations.
Node* NoBarrier_Next(int n) {
assert(n >= 0);
return next_[n].load(std::memory_order_relaxed);
}
void NoBarrier_SetNext(int n, Node* x) {
assert(n >= 0);
next_[n].store(x, std::memory_order_relaxed);
}

以下说明参考了以下博客:

leveldb中的简单使用如下:

  • 在Insert函数中,有三处中使用的是std::memory_order_relaxedstd::memory_order_relaxed:只保证当前操作的原子性,也就是说在该条命令之后的命令可能被排到该命令之前,会影响线程间的同步,其他线程可能读到新值,也可能读到旧值。其中两处用在Insert函数中新节点的插入和设置,因为新节点的插入是不存在读到旧值的情况,读不到只是意味着还没插入成功,这个结果是可以接受的,所以可以用宽松一点的要求,还有一处用在Insert函数的最大高度设置,如果读到旧值,也就意味着对SkipList的操作不是从最高层开始进行的,下层的节点更多,并不会影响操作的正确性,因此也可以用宽松一点的要求。

  • 剩余的两处用在普通节点的读写(Next和SetNext),用的是Release-Acquire ordering模型,这种模型限制了自己线程内的指令重排store()使用memory_order_release,而load()使用memory_order_acquire。这种模型有三种效果。

    • store()之前的所有读写操作,不允许被移动到这个store()的后面。
    • load()之后的所有读写操作,不允许被移动到这个load()的前面。
    • 假设 Thread-1 store()的那个值,成功被 Thread-2 load()到了,那么 Thread-1 在store()之前对内存的所有写入操作,此时对 Thread-2 来说,都是可见的。

    例如下面例子,B永远不会被重排到A之前,C永远不会被重排到D之和,且C一旦load到之后,B之前的A是对C可见的。

    这样就保证了leveldb中,某个写之前的操作是对某个读操作是可见的,是合法的,对于一些读写操作,顺序不一致问题可以接受,但是接受不了出现一些中间态,比如说,我明明写了,但是还是读不到。这里虽然没有保证顺序一致性,但是提高了性能。

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
std::atomic<bool> ready{ false };
int data = 0;
//thread 1
void producer()
{
data = 100; // A
ready.store(true, std::memory_order_release); // B
}
//thread 2
void consumer()
{
while (!ready.load(std::memory_order_acquire)) // C
;
assert(data == 100); // never failed // D
}
std::thread t1(producer);
std::thread t2(consumer);
t1.join();
t2.join();

Status

用于记录leveldb中状态信息,保存错误码和对应的字符串错误信息。

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
 private:
enum Code {
kOk = 0,
kNotFound = 1,
kCorruption = 2,
kNotSupported = 3,
kInvalidArgument = 4,
kIOError = 5
};

Status::Status(Code code, const Slice& msg, const Slice& msg2) {
assert(code != kOk);
const uint32_t len1 = static_cast<uint32_t>(msg.size());
const uint32_t len2 = static_cast<uint32_t>(msg2.size());
const uint32_t size = len1 + (len2 ? (2 + len2) : 0);
char* result = new char[size + 5];
std::memcpy(result, &size, sizeof(size));
result[4] = static_cast<char>(code);
std::memcpy(result + 5, msg.data(), len);
if (len2) {
result[5 + len1] = ':';
result[6 + len1] = ' ';
std::memcpy(result + 7 + len1, msg2.data(), len2);
}
state_ = result;
}