0%

leveldb源码阅读----WAL和Compaction


leveldb源码阅读4——WAL和Compaction

前面已经把leveldb的读流程已经说得非常清楚了,接下来我便想从写流程来阅读leveldb的源码,这其中会涉及到,WAL的使用,文件的合并等。


WAL

log文件的目的主要是为了系统发生故障时用于恢复。leveldb中log文件是在写Memtable之前进行写入的,主要调用了Writer::AddRecord函数。一个Writer绑定一个日志文件。每个日志文件由多个32KB的block组成,每个block有多条不定长的record组成,如下图:

每条Record的格式为:CRC(4)+Length(2)+Type(1)+Record Content,如果Block末尾不够七个字节,则会padding。在黑皮书《数据库系统实现》中,也说到,对于这种比较长的记录,在一个块中装不下,通常就会设计一个Type表示它是开始中间或者是结尾,这里也是如此。

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
class Writer {
public:
···
Status AddRecord(const Slice& slice);

private:
Status EmitPhysicalRecord(RecordType type, const char* ptr, size_t length);

WritableFile* dest_; // 日志文件
int block_offset_; // 写到哪了
uint32_t type_crc_[kMaxRecordType + 1];// 用于计算CRC
};

Status Writer::AddRecord(const Slice& slice) {
const char* ptr = slice.data();
size_t left = slice.size();

// Fragment the record if necessary and emit it. Note that if slice
// is empty, we still want to iterate once to emit a single
// zero-length record
Status s;
bool begin = true;
do {
const int leftover = kBlockSize - block_offset_;
assert(leftover >= 0);
if (leftover < kHeaderSize) { // 不够七个字节
// Switch to a new block
if (leftover > 0) {
// Fill the trailer (literal below relies on kHeaderSize being 7)
static_assert(kHeaderSize == 7, "");
dest_->Append(Slice("\x00\x00\x00\x00\x00\x00", leftover));
}
block_offset_ = 0;
}

// Invariant: we never leave < kHeaderSize bytes in a block.
assert(kBlockSize - block_offset_ - kHeaderSize >= 0);

const size_t avail = kBlockSize - block_offset_ - kHeaderSize;// 除去头部还有多少空间
const size_t fragment_length = (left < avail) ? left : avail;// 能装多少就装多少

RecordType type;
const bool end = (left == fragment_length);
if (begin && end) {
type = kFullType;// 当前Log Block里的空间足以容纳写入的数据
} else if (begin) {
type = kFirstType;// 当剩余空间不够,且刚开始装数据,则当前record为第一个
} else if (end) {
type = kLastType;// 当剩余空间够,且不是第一次装数据了,则当前record为末尾
} else {
type = kMiddleType;
}

s = EmitPhysicalRecord(type, ptr, fragment_length);// 写入文件
ptr += fragment_length;
left -= fragment_length;
begin = false;
} while (s.ok() && left > 0);
return s;
}

Compaction

leveldb在写过程,写入Memtable之前,也需要保证Memtable中有足够的位置来容纳WriteBatch的数据,因此leveldb中封装了一个函数MakeRoomForWrite来完成腾出空间这一操作,这会导致数据库内部的一系列变化,例如:immutable memtable的落盘,后台的compaction等等。

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
// REQUIRES: mutex_ is held
// REQUIRES: this thread is currently at the front of the writer queue
Status DBImpl::MakeRoomForWrite(bool force) {
mutex_.AssertHeld();
assert(!writers_.empty());
bool allow_delay = !force;
Status s;
while (true) {
// 后台出现错误
if (!bg_error_.ok()) {
s = bg_error_;
break;
// 如果level0有太多文件,则睡眠一会儿等待合并再执行一会儿
} else if (allow_delay && versions_->NumLevelFiles(0) >=
config::kL0_SlowdownWritesTrigger) {
mutex_.Unlock();
env_->SleepForMicroseconds(1000);
allow_delay = false; // Do not delay a single write more than once
mutex_.Lock();
// 有足够空间
} else if (!force &&
(mem_->ApproximateMemoryUsage() <= options_.write_buffer_size)) {
// There is room in current memtable
break;
// imm还没彻底落盘,需要等待
} else if (imm_ != nullptr) {
Log(options_.info_log, "Current memtable full; waiting...\n");
background_work_finished_signal_.Wait();
// level0还是有太多文件,则等待合并完成
} else if (versions_->NumLevelFiles(0) >= config::kL0_StopWritesTrigger) {
Log(options_.info_log, "Too many L0 files; waiting...\n");
background_work_finished_signal_.Wait();
} else {
// Memtable满了,则创建新的log文件和新的Memtable,并将原Memtable标记为imm,并创建一个后台线程开始尝试compaction
assert(versions_->PrevLogNumber() == 0);
uint64_t new_log_number = versions_->NewFileNumber();
WritableFile* lfile = nullptr;
s = env_->NewWritableFile(LogFileName(dbname_, new_log_number), &lfile);
if (!s.ok()) {
versions_->ReuseFileNumber(new_log_number);
break;
}
delete log_;
delete logfile_;
logfile_ = lfile;
logfile_number_ = new_log_number;
log_ = new log::Writer(lfile);
imm_ = mem_;
has_imm_.store(true, std::memory_order_release);
mem_ = new MemTable(internal_comparator_);
mem_->Ref();
force = false; // Do not force another compaction if have room
MaybeScheduleCompaction();
}
}
return s;
}

尝试合并的过程,就是判断当前是否需要compaction(判断是否已经有compaction线程,系统是否有错,系统是否已经关闭,某些level是否需要合并),如果需要就开一个后台线程进行compaction,compaction完还需要再次尝试合并一下。这些逻辑的判断,就不展示代码了 ,这里主要说明是如何进行合并的。合并的函数主要在void DBImpl::BackgroundCompaction()中。leveldb中,compaction分为2大类:minor compaction,major compaction。前者是将memdb中的数据落入sst文件,后者则是将sst文件进行归并,提高文件的检索效率。

因为牵扯的类和函数的代码体量太大了,下面就重点说一下思路:

在BackgroundCompaction()中主要完成了如下几件事:

  • 先尝试minor compaction
  • 如果有手动compaction,则手动compaction
  • 如果没有,则尝试major compaction
    • 如果这些合并的文件不需要合并,直接移动下层就行,那就直接移动
    • 合并
  • 删除无用文件

minor compaction

minor compaction主要完成两件事:

  • Build一个Table并写入SST中

  • 选择将SST落入第几层

    • 在策略上,尽量要将新的compact的文件推到高level,因为在level 0 如果控制文件过多,compaction IO和查找都比较耗费。

      另一方面也不能推至过高level,控制查找该文件中的kv的查找次数。

      如果新生成的sstable和Level 0的sstable有重叠,新产生的sstable就直接加入level 0;如果与level1不重叠,且与level2重叠得不多,则加入level1;如果与level2不重叠,且与level3重叠得不多,则加入level2。最高推到Level2。

      为什么有这个重叠得不多?在compaction时,避免与下层合并时的文件过大过多。


major compaction

major compaction进行的条件有两个:

  • Size Compaction:文件数目太多或者某一层级文件总大小过大。
  • Seek Compaction:某个文件seek次数太多。为什么seek次数太多也要合并?LevelDB认为如果一个 sst 文件在 level i 中总是miss seek,而是在 level i+1 中seek success,那么就认为该文件应该在level i+1中,否则每次都会到该文件查一下又查不到。

显而易见,Size Compaction优先级更高,应该先执行,因为这是LSM结构的核心逻辑。

Seek Compaction在选择文件时,就单单针对那些Seek次数过多的文件。

Size Compaction则需要对一整个level进行合并,按key顺序进行合并,直到Size小于阈值。由于level0的特殊性,故分为以下两种:

  • Level0 –> Level1
    1)选择一个level0文件,考虑上一次compaction做到了哪个key,然后大于该key的第一个文件即为level 0的所选文件。
    2)再找到一个和该level0有重复key的level1文件。
    3)再查找出所有和这个level1文件有重复key的level0文件。
    4)将所有level0文件和level1文件合并成一个新的level1文件。

  • Level N –> Level N +1
    1)选择一个 Level N的文件,考虑上一次compaction做到了哪个key,然后大于该key的第一个文件即为level N的所选文件。
    2)查找所有和该文件由重复key的Level N +1的文件。
    3)compaction,生成新的Level N + 1的文件。