0%

leveldb源码阅读----读SSTable


leveldb源码阅读—-读SSTable

续上篇博客介绍了leveldb在Get时,会通过Version维护的元数据,具体找到一个SSTable来进行查找。这里就想通过这个Get过程,来介绍一下读SSTable时会用到的Cache组件以及SSTable的组成结构。由于之前只介绍了跳表,因此文章结尾顺便也对MemTable作出了补充说明。

Cache

TableCache

在总的VersionSet类中,维护了一个table_cache_,该变量的作用就是一个缓冲区,将打开的SSTable文件读入缓冲区后进行查找。Version中的Get函数便调用了了该table_cache内部的Get函数。这里设计到的类比较多,就不详细地通过赋源码的方式进行说明,我简单画了个图,来说明各个设计到的类的关系。

下面介绍一下SSTable查找一个key-value过程的调用链。

从table_cache_的Get出发,核心逻辑就是FindTable以及InternalGetFindTable的功能是打开ldb文件,解析出data block以及index block然后加载到内部缓存中,InternalGet则是在内部缓存中完成查找。

1
2
3
4
5
6
7
8
9
10
11
12
13
Status TableCache::Get(const ReadOptions& options, uint64_t file_number,
uint64_t file_size, const Slice& k, void* arg,
void (*handle_result)(void*, const Slice&,
const Slice&)) {
Cache::Handle* handle = nullptr;
Status s = FindTable(file_number, file_size, &handle);// 确保SSTable已经加载到Cache中
if (s.ok()) {
Table* t = reinterpret_cast<TableAndFile*>(cache_->Value(handle))->table;// LRUCache中的LRUhandle的value本质类型是TableAndFile
s = t->InternalGet(options, k, arg, handle_result);// 用Table的接口进行查找key-value
cache_->Release(handle);
}
return s;
}

让我们更深一步看FindTable是如何工作的,先是cache_->Lookup,看看想要的文件是否已经加载到缓存中;如果不在,则通过env提供的接口打开文件,然后通过Table::Open解析文件并加载到临时变量中,最后调用cache_->Inser,将解析后的结果载入缓存。

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
Status TableCache::FindTable(uint64_t file_number, uint64_t file_size,
Cache::Handle** handle) {
Status s;
char buf[sizeof(file_number)];
EncodeFixed64(buf, file_number);
Slice key(buf, sizeof(buf));
*handle = cache_->Lookup(key);// 先看看想要的文件是否已经加载到缓存中
if (*handle == nullptr) {
// ldb和sst都是SSTable文件的有效后缀? 那为什么要分两种---只是新版本的源码扩展名为“.ldb”,同时向下兼容,支持老版本的扩展名“.sst”
std::string fname = TableFileName(dbname_, file_number);
RandomAccessFile* file = nullptr;
Table* table = nullptr;
s = env_->NewRandomAccessFile(fname, &file); // 打开文件
if (!s.ok()) {
std::string old_fname = SSTTableFileName(dbname_, file_number);
if (env_->NewRandomAccessFile(old_fname, &file).ok()) {
s = Status::OK();
}
}
// 根据file打开文件并解析,加载到table中
if (s.ok()) {
s = Table::Open(options_, file, file_size, &table);
}

if (!s.ok()) {
assert(table == nullptr);
delete file;
// We do not cache error results so that if the error is transient,
// or somebody repairs the file, we recover automatically.
} else {
TableAndFile* tf = new TableAndFile;
tf->file = file;
tf->table = table;
// key是文件号的编码值
*handle = cache_->Insert(key, tf, 1, &DeleteEntry);
}
}
return s;
}

对于env相关函数想在之后单开一篇博客来进行阅读讲解。Table::Open这里不就展开说明,在对SSTable的文件结构了解后应该显而易见。因此这里就只借cache_->Lookup函数和cache_->Insert函数来对SharedLRUCacheLRUCache来进行展开介绍。


SharedLRUCache

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
class ShardedLRUCache : public Cache {
private:
LRUCache shard_[kNumShards];
port::Mutex id_mutex_;
uint64_t last_id_;

···

public:
// 根据之前的调用可知,value的类型其实是TableAndFile* ,内部包含了env的文件和一个table
Handle* Insert(const Slice& key, void* value, size_t charge,
void (*deleter)(const Slice& key, void* value)) override {
const uint32_t hash = HashSlice(key);
return shard_[Shard(hash)].Insert(key, hash, value, charge, deleter);
}
Handle* Lookup(const Slice& key) override {
const uint32_t hash = HashSlice(key);
return shard_[Shard(hash)].Lookup(key, hash);
}

···
};

SharedLRUCache组成比较简单,它就是一个LRUCache的封装,为什么要进行这样的封装呢?原因很简单,因为levelDB是多线程的,每个线程如果直接访问Cache的时候都会锁住。为了多线程访问,尽可能快速,减少锁开销,对外封装成ShardedLRUCache访问,其内部有16个LRUCache。查找Key时首先计算key属于哪一个分片,需要注意的是,这个key是文件编号,是对Cache中存各个文件的一个key,而不是user想要查找的那个key-value的那个key,分片的计算方法是取32位hash值的高4位,然后在相应的LRUCache中进行查找,这样就大大减少了多线程的访问锁的开销。这种设计思路类似cmu15445中Lab1的Parallel Buffer Pool的思路。


LRUCache

在最开头给了个图,里面说明了LRUCache中的链表每个元素类型是LRUHandle,其实LRUHandle这个类就说明了整个Cache里面存的到底是什么内容, 由LRUCache中Insert函数构造LRUHandle的过程可知,value的类型仍然是TableAndFile*,所以,Cache中存的最核心的东西就是SSTable经过解析后的一整个table(此处仅针对最初的Version中的table_cache_),value设计成void*也是使代码能够有更好的扩展性吧。在没细看代码前,我还认为也许存的是具体table的key-value。在前面对于key-value的具体查找,则交付给Table的内部查找接口完成。其实到这里,就已经把SSTable读key-value的全过程已经说明白了:无非就是先看缓存里面有没有对应的文件,有就直接查,没有就去磁盘中读,解析,然后加载到缓存中,再查

1
2
3
4
5
6
7
8
9
10
Cache::Handle* LRUCache::Insert(const Slice& key, uint32_t hash, void* value,
size_t charge,
void (*deleter)(const Slice& key,
void* value)) {
MutexLock l(&mutex_);
LRUHandle* e =
reinterpret_cast<LRUHandle*>(malloc(sizeof(LRUHandle) - 1 + key.size()));
e->value = value;
···
}

最后将LRUCache和LRUHandle补充说明完,LRUCache和LRUHandle源码如下,其实基于LRU的Cache设计思路还是很简单,就不做过多说明了,就是两个链表,也可以是一个链表。其中,有一个点不理解,一个是LRUHandle为什么要有bool in_cache这个变量,有引用计数值不也能有同样的作用?另外,需要注意的是,LRUCache中除了有两个链表,还有一个Table存了所有数据。lru_链表意味着里面元素是可以牺牲的,in_use_链表意味着里面的元素不可牺牲。那Table是拿来干嘛的呢?Table中因为有所有的元素,这里用了一个hash的方法可以快速查找某一个LRUHandle,避免了查找时遍历两个链表的速度缓慢,也是一个优化。

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
struct LRUHandle {
void* value; // 由LRUCache中Insert函数构造LRUHandle的过程可知,这个value的类型仍然是TableAndFile* ,内部包含了env的文件和一个table
void (*deleter)(const Slice&, void* value);
LRUHandle* next_hash;
LRUHandle* next;
LRUHandle* prev;
size_t charge; // 占用cache空间数目,目前始终为1
size_t key_length;
bool in_cache; // 表示当前元素是否在cache中,false表示回收内存
uint32_t refs; // 当引用计数为0时就要删除
uint32_t hash; // Hash of key(); used for fast sharding and comparisons
char key_data[1]; // key值,同样是末尾,意味着可变长

Slice key() const {
// next is only equal to this if the LRU handle is the list head of an
// empty list. List heads never have meaningful keys.
assert(next != this);

return Slice(key_data, key_length);
}
};
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
class LRUCache {
public:
LRUCache();
~LRUCache();

// Separate from constructor so caller can easily make an array of LRUCache
void SetCapacity(size_t capacity) { capacity_ = capacity; }

// Like Cache methods, but with an extra "hash" parameter.
Cache::Handle* Insert(const Slice& key, uint32_t hash, void* value,
size_t charge,
void (*deleter)(const Slice& key, void* value));
Cache::Handle* Lookup(const Slice& key, uint32_t hash);
void Release(Cache::Handle* handle);
void Erase(const Slice& key, uint32_t hash);
void Prune();
size_t TotalCharge() const {
MutexLock l(&mutex_);
return usage_;
}

private:
void LRU_Remove(LRUHandle* e);
void LRU_Append(LRUHandle* list, LRUHandle* e);
void Ref(LRUHandle* e);
void Unref(LRUHandle* e);
bool FinishErase(LRUHandle* e) EXCLUSIVE_LOCKS_REQUIRED(mutex_);

// Initialized before use.
size_t capacity_;//容量 超过这个容量后就要移除旧数据

// mutex_ protects the following state.
mutable port::Mutex mutex_;
size_t usage_ GUARDED_BY(mutex_);//当前cache使用量

// Dummy head of LRU list.
// 稳定状态下 所有元素都存在这个链表中 这个链表中元素refs一定等于1
LRUHandle lru_ GUARDED_BY(mutex_);

// Dummy head of in-use list.
// 当用户查询某条记录时 会将元素从lru_移动到这个链表中 这个链表中元素refs一定大于等于2 当使用完毕后
// refs自减 然后将元素移回到lru_中
LRUHandle in_use_ GUARDED_BY(mutex_);

HandleTable table_ GUARDED_BY(mutex_);/* 元素始终存在hashtable 只有从LRUCache删除时才会吧hashtable中元素删除 *
};

SSTable

上面只是粗略地说将SSTable读到table_cache_中,但没具体说是如何解析.ldb文件的,也没有说在Table中是如何具体查询一个key的。因此下面主要围绕这两点,来展开说明。


Format

.ldb文件即SSTable的文件格式如上,它是一个自解释的文件。自解释文件的关键是什么,就是文件固定位置要有固定的内容,Footer就是位于文件尾一个固定字长(48字节)的一个区域。下面对上图做一个整体的解释:

  • Footer:保存两个BlockHandler(包含两个内容,offset和size),存放Meta Index Block和Index Block的位置信息

  • Index Block:保存了每个Data Block的索引,存的内容实际是一些k-v,每个key大于等于对应的Data Block中最大的key,value为Data Block的Block Handler

  • Meta Index Block:保存Filter Block的索引,存法同上

  • Filter Block:存储该SSTable的过滤器,可以快速判断一个key是否在该SST中

  • Data Block:存储实际的k-v数据

代码中,Index Block、Meta Block和Data Block使用相同的数据结构,Footer和Filter Block有其自己的数据结构。下面具体说他们的数据结构:

Footer的大小固定为48个字节,offset和size的类型是varint64,一个varint64最大是10个字节,所以两个BlockHandler最长40个字节,不够就padding,最后加上8个字节的魔数。

Index Block、Meta Block和Data Block如上图。因为SST中键都是排好序的,所以相邻的键很有可能包含相同的前缀,考虑到这个,Data Block做了优化,采用了前缀压缩,也就是后面一个键只需要记录一个Group中第一个键不同的部分,以及和第一个键相同部分的长度,这样就可以通过第一个键恢复出一个键,节省空间。LevelDB中每16个Kv是一个Group,并且还保存了每个Group的Offset,为什么要存这个Offset,有了这个Offset就可以快速定位每个Group第一个键的值,然后在查找值时,可以比较判断是否属于这个Group,以此为Check函数可以做一个Data Block内部的二分查找。对于Meta Block和Index Block,其内部存储的kv并未采用压缩,只需要设置shared size=0即可,当然,查找过程同样可以使用二分。

Filter Block结构如上图,每个Filter Offset是4个字节,那最后一位是拿来干嘛的呢?主要是用于定位一个DataBlock其自身的filter的值。下面的KeyMayMatch用于判断一个Key是否在某个DataBlock中。

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
bool FilterBlockReader::KeyMayMatch(uint64_t block_offset, const Slice& key) {
// 找到对应DataBlock的filter的Index
uint64_t index = block_offset >> base_lg_;
if (index < num_) {
// 每个offset四个字节
uint32_t start = DecodeFixed32(offset_ + index * 4);
uint32_t limit = DecodeFixed32(offset_ + index * 4 + 4);
if (start <= limit && limit <= static_cast<size_t>(offset_ - data_)) {
Slice filter = Slice(data_ + start, limit - start);
return policy_->KeyMayMatch(key, filter);
} else if (start == limit) {
// Empty filters do not match any keys
return false;
}
}
return true; // Errors are treated as potential matches
}

Analyse .ldb File

当从TableCache中想要Get一个KV时,如果Table没有保存在Cache中,就需要解析对应的.ldb文件,这个逻辑已经在上面说过了。解析过程的代码主要在Table::Open中实现。

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
Status Table::Open(const Options& options, RandomAccessFile* file,
uint64_t size, Table** table) {
*table = nullptr;
if (size < Footer::kEncodedLength) {
return Status::Corruption("file is too short to be an sstable");
}

char footer_space[Footer::kEncodedLength];
Slice footer_input;
// 读Footer
Status s = file->Read(size - Footer::kEncodedLength, Footer::kEncodedLength,
&footer_input, footer_space);
if (!s.ok()) return s;

Footer footer;
s = footer.DecodeFrom(&footer_input);
if (!s.ok()) return s;

// 读index block
BlockContents index_block_contents;
ReadOptions opt;
if (options.paranoid_checks) {
opt.verify_checksums = true;
}
s = ReadBlock(file, opt, footer.index_handle(), &index_block_contents);

if (s.ok()) {
// We've successfully read the footer and the index block: we're
// ready to serve requests.
// 构建Table
Block* index_block = new Block(index_block_contents);
Rep* rep = new Table::Rep;
rep->options = options;
rep->file = file;
rep->metaindex_handle = footer.metaindex_handle();
rep->index_block = index_block;
rep->cache_id = (options.block_cache ? options.block_cache->NewId() : 0);
rep->filter_data = nullptr;
rep->filter = nullptr;
*table = new Table(rep);
// 读MetaBlock
// 根据Footer读出MetaBlock,
// 然后再根据Meta Block读对应FilterPolicy的Filter Block
// 然后读Filter Block并保存到 rep->filter
(*table)->ReadMeta(footer);
}

return s;
}

How to Get

解析完.ldb,就可以支持查找了。主要逻辑于备注中。

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
Status Table::InternalGet(const ReadOptions& options, const Slice& k, void* arg,
void (*handle_result)(void*, const Slice&,
const Slice&)) {
Status s;
Iterator* iiter = rep_->index_block->NewIterator(rep_->options.comparator); // 创建IndexBlock的iter
iiter->Seek(k);// 根据Index Block找到Key在哪个DataBlock中,内部是二分
if (iiter->Valid()) {
Slice handle_value = iiter->value();
FilterBlockReader* filter = rep_->filter;
BlockHandle handle;
if (filter != nullptr && handle.DecodeFrom(&handle_value).ok() &&
!filter->KeyMayMatch(handle.offset(), k)) { // 通过布隆过滤器判断DataBlock中是否有可能包含Key
// Not found
} else {
Iterator* block_iter = BlockReader(this, options, iiter->value());// 创建对应DataBlock的iter
block_iter->Seek(k);// 查找Key,内部是二分
if (block_iter->Valid()) {
(*handle_result)(arg, block_iter->key(), block_iter->value());
}
s = block_iter->status();// 保存查找结果
delete block_iter;
}
}
if (s.ok()) {
s = iiter->status();
}
delete iiter;
return s;
}

可以看到,关键的函数是Block::Iter的Seek函数,Seek函数的逻辑如下:

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
void Seek(const Slice& target) override {
// Binary search in restart array to find the last restart point
// with a key < target
// left right指的是在第几个Group
uint32_t left = 0;
uint32_t right = num_restarts_ - 1;
int current_key_compare = 0;

// 优化1
// 首先通过当前的record判断目标key所在的record是在当前record的前面(record > target)还是后面(record < target)
if (Valid()) {
// If we're already scanning, use the current position as a starting
// point. This is beneficial if the key we're seeking to is ahead of the
// current position.
current_key_compare = Compare(key_, target);
if (current_key_compare < 0) {
// key_ is smaller than target
left = restart_index_;
} else if (current_key_compare > 0) {
right = restart_index_;
} else {
// We're seeking to the key we're already at.
return;
}
}


while (left < right) {
uint32_t mid = (left + right + 1) / 2;
uint32_t region_offset = GetRestartPoint(mid);// 找到mid所属Group的起点
uint32_t shared, non_shared, value_length;
const char* key_ptr =
DecodeEntry(data_ + region_offset, data_ + restarts_, &shared,
&non_shared, &value_length);
if (key_ptr == nullptr || (shared != 0)) {
CorruptionError();
return;
}
Slice mid_key(key_ptr, non_shared);// 求出所属Group起点的key的公共部分
if (Compare(mid_key, target) < 0) {
// Key at "mid" is smaller than "target". Therefore all
// blocks before "mid" are uninteresting.
left = mid;
} else {
// Key at "mid" is >= "target". Therefore all blocks at or
// after "mid" are uninteresting.
right = mid - 1;
}
}

// We might be able to use our current position within the restart block.
// This is true if we determined the key we desire is in the current block
// and is after than the current key.

// 优化2
// 假如二分查找到的还原点是此时迭代器所在的那个restart_index_,并且迭代器当前指向的key< target,那么可以从此时迭代器的位置开始向后遍历查找数据,
// 这样快一些,而不是从这个recordGroup的第一个entry开始遍历。
assert(current_key_compare == 0 || Valid());
bool skip_seek = left == restart_index_ && current_key_compare < 0;
if (!skip_seek) {
SeekToRestartPoint(left);
}

// Linear search (within restart block) for first key >= target
// 最后进行线性搜索,直到知道第一个大于等于target的key,其中ParseNextKey起到向后移动一格并且将key_更新为当前key值得作用
while (true) {
if (!ParseNextKey()) {
return;
}
if (Compare(key_, target) >= 0) {
return;
}
}
}

现在重新梳理一下leveldb的整个读流程,主要如下:

  • 读Memtable:跳表的查询
  • 读immutable:跳表的查询
  • 读SStable:
    • 根据version元数据找到key属于哪个sst(二分)
    • 判断该sst是否在cache中,不在就从文件读到cache中
    • 然后在cache中查找
    • 先根据index block,找到key在哪个data block中(二分)
    • 利用布隆过滤器判断key是否在data block中
    • 如果可能在,就在data block中继续查(二分)
    • 返回结果。

MemTable

memtable是leveldb的一个存储组件,是一个内存型的数据结构,它需要满足 sortedmap 的基本要求:有序,快速查找。

像STL中的std::set 就属于一个sortedmap ,其底层是用红黑树实现的,但memtable中使用的是跳表进行实现,跳表实现起来简单,当然也可以用户自定义更换,满足接口要求即可。

由于在上篇博客已经详细介绍了SkipList的具体实现,所以这里主要谈两个东西,一个是memtable具体的结构,另一个是编码规则。


Data Structure

四个成员:

  • table_:跳表
  • arena_:内存管理器,本质上是跳表中的Arena
  • ref_:引用计数器
  • comparator_:key比较器

支持的操作:

  • Add:插入键对
  • Get:查询键对
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
class MemTable {
public:
explicit MemTable(const InternalKeyComparator& comparator);

MemTable(const MemTable&) = delete;
MemTable& operator=(const MemTable&) = delete;

//返回Arena中大致内存的使用量,为什么是大致的,因为在并发下使用的是std::memory_order_relaxed
size_t ApproximateMemoryUsage();

void Ref();

void Unref();

Iterator* NewIterator();

void Add(SequenceNumber seq, ValueType type, const Slice& key,
const Slice& value);

bool Get(const LookupKey& key, std::string* value, Status* s);

private:
friend class MemTableIterator;
friend class MemTableBackwardIterator;

~MemTable(); // 由于是私有的,所以只有党ref_=0时,才自动析构

typedef SkipList<const char*, KeyComparator> Table;

KeyComparator comparator_;
int refs_; // 引用计数,ref_=0时自动析构
Arena arena_;
Table table_;
};

How to encode

MemTable 中保存的数据是 key 和 value 编码成的一个字符串,由四个部分组成,这里需要说明的是虽然存的是key和value,但是在跳表查询过程只用到了key。

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
void MemTable::Add(SequenceNumber s, ValueType type, const Slice& key,
const Slice& value) {
// Format of an entry is concatenation of:
// key_size : varint32 of internal_key.size()
// key bytes : char[internal_key.size()]
// tag : uint64((sequence << 8) | type)
// value_size : varint32 of value.size()
// value bytes : char[value.size()]
size_t key_size = key.size();
size_t val_size = value.size();
size_t internal_key_size = key_size + 8;
const size_t encoded_len = VarintLength(internal_key_size) +
internal_key_size + VarintLength(val_size) +
val_size;
// 这里分配的是key所占的内存
char* buf = arena_.Allocate(encoded_len);
// 编码
char* p = EncodeVarint32(buf, internal_key_size);
std::memcpy(p, key.data(), key_size);
p += key_size;
EncodeFixed64(p, (s << 8) | type);
p += 8;
p = EncodeVarint32(p, val_size);
std::memcpy(p, value.data(), val_size);
assert(p + val_size == buf + encoded_len);
table_.Insert(buf);
}

bool MemTable::Get(const LookupKey& key, std::string* value, Status* s) {
Slice memkey = key.memtable_key();
Table::Iterator iter(&table_);
iter.Seek(memkey.data());
if (iter.Valid()) {
const char* entry = iter.key();// 这个key不是key,而是key+value的全部数据
uint32_t key_length;
// 解码后比较
const char* key_ptr = GetVarint32Ptr(entry, entry + 5, &key_length);
if (comparator_.comparator.user_comparator()->Compare(
Slice(key_ptr, key_length - 8), key.user_key()) == 0) {
// Correct user key
const uint64_t tag = DecodeFixed64(key_ptr + key_length - 8);
switch (static_cast<ValueType>(tag & 0xff)) {
case kTypeValue: {
Slice v = GetLengthPrefixedSlice(key_ptr + key_length);
value->assign(v.data(), v.size());
return true;
}
case kTypeDeletion:
*s = Status::NotFound(Slice());
return true;
}
}
}
return false;
}