0%

leveldb源码阅读----读写流程


leveldb源码阅读—-读写流程

在我看完几个组件的数据结构,感觉对leveldb的脉络把握的还不是很清楚,因此现在阅读源码的方式直接瞄向了用户接口,然后不断扩展,因此这篇博客是想要从leveldb暴露出最基本的读写接口出发,以此为脉络,介绍leveldb数据库的读写过程,并介绍一些相关的组件。


写流程

leveldb中有一个db.h,里面包含一个抽象类,提供作为数据库所需要的具体接口。而leveldb的具体实现则写在db_impl.h中,db_impl类继承于抽象基类db。

对于一个键对存储系统,所提供的写操作只有两类,一类是Put,一类是Delete。在LSM结构中,两个函数接口的实现基本一样,这主要因为LSM结构采用的是仅追加的方式进行写操作,因此对于Delete操作只需要通过Put一个“tombstone”的键对来完成,使用一个“tombstone”来对所有的旧版本进行覆盖。所以,leveldb中的Put和Delete中调用的是同一个函数——Write,只是构建的WriteBatch不同。下面则主要围绕WriteBatch和Write函数进行说明。


WriteBatch

首先谈谈WtriteBatch类,我觉得WriteBatch这个设计的思路特别好。我对WtriteBatch的理解是,它是一个写请求的数据的封装,允许多个Write请求的数据放到一个WriteBatch中,每个写请求的信息全部存储在std::string rep_中。

rep_的结构如下:

  • squence:首先用八个字节(fixed64位)存当前写请求的版本号。
  • count:然后用四个字节(fixed32位)存当前写请求的kv数目。
  • data:然后跟的就是kv数据,kv数据分为两种,两种数据的长度不一样。
    • kTypeValue:表示插入数据,具体数据是:一个字节表示类型 + 变长的key + 变长的value
    • kTypeDeletion:表示删除数据,具体数据是:一个字节表示类型 + 变长的key

为什么这样设计好?因为在并发写的过程中,写的时候需要加锁,每次加锁后如果只写一个kv,那么写的效率会大大地被频繁的加锁解锁所限制,所以为了提高并发度,每次加锁后的写,尽量多写一些,所以通过将多个写请求封装到同一个WriteBatch中,一次性写入MemTable中。

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
class LEVELDB_EXPORT WriteBatch {
public:
WriteBatch();

// Intentionally copyable.
WriteBatch(const WriteBatch&) = default;
WriteBatch& operator=(const WriteBatch&) = default;

~WriteBatch();

void Put(const Slice& key, const Slice& value);

void Delete(const Slice& key);

void Clear();

size_t ApproximateSize() const;

void Append(const WriteBatch& source);

Status Iterate(Handler* handler) const;

private:
friend class WriteBatchInternal; //包含了内部接口

// WriteBatch::rep_ :=
// sequence: fixed64
// count: fixed32
// data: record[count]
// record :=
// kTypeValue + varstring + varstring |
// kTypeDeletion + varstring
// varstring :=
// len: varint32
// data: uint8[len]
std::string rep_; // See comment in write_batch.cc for the format of rep_
};

Write

Writer是一个写请求,在获得请求队列的锁之后,加到请求队列末尾,然后等待。等待至自己被完成,或者自己到达了队头。

如果自己被完成,则返回完成结果,如果自己在队头并还未开始执行写,则开始写。

怎么写呢?

先执行MakeRoomForWrite,顾明思议,就是保证memtable中有足够空间用来写,这个过程可能会有immutable memtable的落盘,后台的compaction等。

然后执行BuildBatchGroup,该函数所作的是从当前的请求开始,向后遍历请求,将他们包装到同一个WriteBatch中,在包装时,如果遇到sync需要特判,包装的写请求也有做大小限制。

然后写入log,如果sync为true,则立即刷盘。

然后写入MemTable中。

最后把写入的请求从请求队列弹出,并唤醒相关线程。

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
Status DBImpl::Write(const WriteOptions& options, WriteBatch* updates) {
Writer w(&mutex_);
w.batch = updates;
w.sync = options.sync;
w.done = false;

MutexLock l(&mutex_);
writers_.push_back(&w);
// 如果在队头了就开始执行
while (!w.done && &w != writers_.front()) {
w.cv.Wait();
}

// 不是特判,是当前w在别的writebatch整合里面已经被写了
if (w.done) {
return w.status;
}

// May temporarily unlock and wait.
// 整个过程会导致数据库内部的一系列变化,例如:immutable memtable的落盘,后台的compaction等等。最后的结果是active的memtable中应当有足够的位置来容纳接下来的batch write。
Status status = MakeRoomForWrite(updates == nullptr);
uint64_t last_sequence = versions_->LastSequence();
Writer* last_writer = &w;
if (status.ok() && updates != nullptr) { // nullptr batch is for compactions
// 这个函数的目的主要是对队列后的一些请求作整合,有做大小限制,并对sync的请求有单独处理。
WriteBatch* write_batch = BuildBatchGroup(&last_writer); // last_writer会变
WriteBatchInternal::SetSequence(write_batch, last_sequence + 1);
// 一个writebatch里可能有多个record
last_sequence += WriteBatchInternal::Count(write_batch);

// Add to log and apply to memtable. We can release the lock
// during this phase since &w is currently responsible for logging
// and protects against concurrent loggers and concurrent writes
// into mem_.
{
mutex_.Unlock();
// 写入log
status = log_->AddRecord(WriteBatchInternal::Contents(write_batch));
bool sync_error = false;
if (status.ok() && options.sync) {
// 立即把log刷盘
status = logfile_->Sync();
if (!status.ok()) {
sync_error = true;
}
}
// 写入memtable
if (status.ok()) {
status = WriteBatchInternal::InsertInto(write_batch, mem_);
}
mutex_.Lock();
if (sync_error) {
// The state of the log file is indeterminate: the log record we
// just added may or may not show up when the DB is re-opened.
// So we force the DB into a mode where all future writes fail.
RecordBackgroundError(status);
}
}
if (write_batch == tmp_batch_) tmp_batch_->Clear();

versions_->SetLastSequence(last_sequence);
}

while (true) {
Writer* ready = writers_.front();
writers_.pop_front();
if (ready != &w) {
ready->status = status;
ready->done = true;
ready->cv.Signal();
}
if (ready == last_writer) break;
}

// Notify new head of write queue
if (!writers_.empty()) {
writers_.front()->cv.Signal();
}

return status;
}

读流程

Get

leveldb的读就只有一个Get函数。逻辑很简单,大体就是先读MemTable,读不到就再读Immutable,再读不到就读SSTable。读MemTable就是在Skiplist里面查询,这里就不赘述了,Immutable类似。

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
Status DBImpl::Get(const ReadOptions& options, const Slice& key,
std::string* value) {
Status s;
MutexLock l(&mutex_);
SequenceNumber snapshot;
if (options.snapshot != nullptr) {
snapshot =
static_cast<const SnapshotImpl*>(options.snapshot)->sequence_number();
} else {
snapshot = versions_->LastSequence();
}

MemTable* mem = mem_;
MemTable* imm = imm_;
Version* current = versions_->current();
// 引用计数
mem->Ref();
if (imm != nullptr) imm->Ref();
current->Ref();

bool have_stat_update = false;
Version::GetStats stats;

// Unlock while reading from files and memtables
{
mutex_.Unlock();
// First look in the memtable, then in the immutable memtable (if any).
LookupKey lkey(key, snapshot);

if (mem->Get(lkey, value, &s)) { // 读memtable
// Done
} else if (imm != nullptr && imm->Get(lkey, value, &s)) { //读immutable
// Done
} else {
s = current->Get(options, lkey, value, &stats); // 读sstable
have_stat_update = true;
}
mutex_.Lock();
}

if (have_stat_update && current->UpdateStats(stats)) {
MaybeScheduleCompaction();
}
mem->Unref();
if (imm != nullptr) imm->Unref();
current->Unref();
return s;
}

读SSTable时,调用了一个叫做current的变量,current的类型是Version,在DBImpl中有一个变量叫做versions_,其类型是VersionSetVersionSet就是一个双向循环链表,维护了多个Version,每个Version维护一个DB的版本,如下图,leveldb允许旧版本的存在和访问,current便取自于此,代表最新版本。利用current即可对所有的SSTable进行查找

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
class Version{
···
VersionSet* vset_; // VersionSet to which this Version belongs
Version* next_; // Next version in linked list
Version* prev_; // Previous version in linked list
int refs_; // Number of live refs to this version

// List of files per level
std::vector<FileMetaData*> files_[config::kNumLevels];// 二维数组,所有level文件的元数据

// Next file to compact based on seek stats.
FileMetaData* file_to_compact_; // 当文件查找到一定次数时,就需要执行合并操作
int file_to_compact_level_;

// Level that should be compacted next and its compaction score.
// Score < 1 means compaction is not strictly needed. These fields
// are initialized by Finalize().
double compaction_score_; // 当score>=1时,也需要进行合并
int compaction_level_;
};

struct FileMetaData {
FileMetaData() : refs(0), allowed_seeks(1 << 30), file_size(0) {}

int refs;
int allowed_seeks; // 允许最大的查找次数 --!! SSTable为什么查找一定次数后要compaction。
uint64_t number; // 文件编号
uint64_t file_size;
InternalKey smallest; // 最大值和最小值提高查找效率
InternalKey largest;
};

从Version的Get函数,可以看到查找操作主要用了两个函数,如下。Get中的Match函数,作用是打开相应的SSTable,加载到缓冲中,然后进行查找,有了level和f中的number便可以唯一确定一个SSTable;ForEachOverlapping函数则对查找逻辑进行说明。文件查找SSTable的大体逻辑,就是先从level0里面找,由于level0里面的文件可能有重复键值,因此需要对每个文件做出判断,由于内部维护了元数据(最大key和最小key),我们可以对每个SSTable做出提前筛选,空间换时间,提高了查找效率。如果level0找不到,再依次向下面的level中寻找,这次由于文件之间没有重复键值,所以每个level只用查找一个文件。

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
Status Version::Get(const ReadOptions& options, const LookupKey& k,
std::string* value, GetStats* stats) {
stats->seek_file = nullptr;
stats->seek_file_level = -1;

// 主要是对查询状态的封装,包括查找结果,上一个level是多少,option,key等
struct State{
···
// arg变量本质上是State类型,为什么要写成void*?
// 因为State结构体是在该函数内声明的,在ForEachOverlapping里想用这个结构就不能用,所以用void*类型来表示。
// 该函数的功能就是打开相应的SSTable,加载到缓冲中,然后进行查找。有了level和f中的number便可以唯一确定一个SSTable文件
static bool Match(void* arg, int level, FileMetaData* f);
};

State state;
state.found = false;
state.stats = stats;
state.last_file_read = nullptr;
state.last_file_read_level = -1;

state.options = &options;
state.ikey = k.internal_key();
state.vset = vset_;

state.saver.state = kNotFound;
state.saver.ucmp = vset_->icmp_.user_comparator();
state.saver.user_key = k.user_key();
state.saver.value = value;

ForEachOverlapping(state.saver.user_key, state.ikey, &state, &State::Match);

return state.found ? state.s : Status::NotFound(Slice());
}

void Version::ForEachOverlapping(Slice user_key, Slice internal_key, void* arg,
bool (*func)(void*, int, FileMetaData*)) {
const Comparator* ucmp = vset_->icmp_.user_comparator();

// Search level-0 in order from newest to oldest.
// 因为第0层可能有重复元素,所以需要对每个文件做出判断。
std::vector<FileMetaData*> tmp;
tmp.reserve(files_[0].size());
for (uint32_t i = 0; i < files_[0].size(); i++) {
FileMetaData* f = files_[0][i];
if (ucmp->Compare(user_key, f->smallest.user_key()) >= 0 &&
ucmp->Compare(user_key, f->largest.user_key()) <= 0) {
tmp.push_back(f);
}
}
if (!tmp.empty()) {
std::sort(tmp.begin(), tmp.end(), NewestFirst);
for (uint32_t i = 0; i < tmp.size(); i++) {
// 调用Match函数
if (!(*func)(arg, 0, tmp[i])) {
return;
}
}
}

// Search other levels.
for (int level = 1; level < config::kNumLevels; level++) {
size_t num_files = files_[level].size();
if (num_files == 0) continue;

// Binary search to find earliest index whose largest key >= internal_key.
// 对其他level只用找一个可能的文件,这里的FindFile内部使用的是二分查找,找到相关文件
uint32_t index = FindFile(vset_->icmp_, files_[level], internal_key);
if (index < num_files) {
FileMetaData* f = files_[level][index];
if (ucmp->Compare(user_key, f->smallest.user_key()) < 0) {
// All of "f" is past any data for user_key
} else {
if (!(*func)(arg, level, f)) {
return;
}
}
}
}
}