LevelDB's source code is far more concise than RocksDB's, so if you want to understand these two KV storage engines through their source, LevelDB is the better place to start: you can get up to speed quickly and see how reads, writes, and the other mechanisms work at the code level.

First, the entry point for write operations:

// Default implementations of convenience methods that subclasses of DB
// can call if they wish

Status DB::Put(const WriteOptions& opt, const Slice& key, const Slice& value) {
  WriteBatch batch;
  batch.Put(key, value);
  return Write(opt, &batch);
}
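For orientation, here is a minimal usage sketch against the public API; the path /tmp/testdb and the key/value literals are just examples:

#include <cassert>

#include "leveldb/db.h"

int main() {
  leveldb::DB* db;
  leveldb::Options options;
  options.create_if_missing = true;
  // Open (or create) a database at an example path.
  leveldb::Status s = leveldb::DB::Open(options, "/tmp/testdb", &db);
  assert(s.ok());
  // Put() above wraps this single key-value pair in a WriteBatch
  // and forwards it to Write().
  s = db->Put(leveldb::WriteOptions(), "key1", "value1");
  assert(s.ok());
  delete db;
  return 0;
}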

LevelDB performs every write as a batch write (even a single insert), collecting the key-value pairs to be written in a WriteBatch; this is also what gives LevelDB writes their atomicity. A WriteBatch stores its data in a std::string member rep_, with the following format:
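The layout below follows the comment at the top of write_batch.cc:

WriteBatch::rep_ :=
   sequence: fixed64
   count: fixed32
   data: record[count]
record :=
   kTypeValue varstring varstring |
   kTypeDeletion varstring
varstring :=
   len: varint32
   data: uint8[len]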

count is the number of records in rep_; ValueType has two values, kTypeDeletion (0x00) and kTypeValue (0x01).

Keys and values are stored length-prefixed, as data.size() followed by data; when the ValueType is kTypeDeletion, the record contains only the key.
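Concretely, this is how WriteBatch::Put appends a record to rep_ (from write_batch.cc, with comments added here):

void WriteBatch::Put(const Slice& key, const Slice& value) {
  // Bump the record count stored in the header of rep_.
  WriteBatchInternal::SetCount(this, WriteBatchInternal::Count(this) + 1);
  // One type byte, then the length-prefixed key and value.
  rep_.push_back(static_cast<char>(kTypeValue));
  PutLengthPrefixedSlice(&rep_, key);
  PutLengthPrefixedSlice(&rep_, value);
}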

WriteBatch and Slice are two cleverly designed, purpose-built data structures in LevelDB; see the source for the details.
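As a rough sketch of the idea behind Slice (the real class, with its comparison helpers, lives in include/leveldb/slice.h): it is just a non-owning pointer-plus-length view, so keys and values can be passed around without copying bytes:

#include <cstddef>
#include <cstring>
#include <string>

// Simplified sketch of leveldb::Slice; not the full class.
class Slice {
 public:
  Slice() : data_(""), size_(0) {}
  Slice(const char* d, size_t n) : data_(d), size_(n) {}
  Slice(const std::string& s) : data_(s.data()), size_(s.size()) {}
  Slice(const char* s) : data_(s), size_(strlen(s)) {}

  const char* data() const { return data_; }
  size_t size() const { return size_; }

 private:
  const char* data_;  // not owned: the caller must keep the bytes alive
  size_t size_;
};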

Now the write path itself. DB is the base class and DBImpl the implementation, so Write corresponds to DBImpl::Write:

Status DBImpl::Write(const WriteOptions& options, WriteBatch* my_batch) {
  Writer w(&mutex_);
  w.batch = my_batch;
  w.sync = options.sync;
  w.done = false;

  MutexLock l(&mutex_);
  writers_.push_back(&w);
  while (!w.done && &w != writers_.front()) {
    w.cv.Wait();
  }
  if (w.done) {
    return w.status;
  }

  // May temporarily unlock and wait.
  Status status = MakeRoomForWrite(my_batch == nullptr);
  uint64_t last_sequence = versions_->LastSequence();
  Writer* last_writer = &w;
  if (status.ok() && my_batch != nullptr) {  // nullptr batch is for compactions
    WriteBatch* updates = BuildBatchGroup(&last_writer);
    WriteBatchInternal::SetSequence(updates, last_sequence + 1);
    last_sequence += WriteBatchInternal::Count(updates);

    // Add to log and apply to memtable. We can release the lock
    // during this phase since &w is currently responsible for logging
    // and protects against concurrent loggers and concurrent writes
    // into mem_.
    {
      mutex_.Unlock();
      status = log_->AddRecord(WriteBatchInternal::Contents(updates));
      bool sync_error = false;
      if (status.ok() && options.sync) {
        status = logfile_->Sync();
        if (!status.ok()) {
          sync_error = true;
        }
      }
      if (status.ok()) {
        status = WriteBatchInternal::InsertInto(updates, mem_);
      }
      mutex_.Lock();
      if (sync_error) {
        // The state of the log file is indeterminate: the log record we
        // just added may or may not show up when the DB is re-opened.
        // So we force the DB into a mode where all future writes fail.
        RecordBackgroundError(status);
      }
    }
    if (updates == tmp_batch_) tmp_batch_->Clear();

    versions_->SetLastSequence(last_sequence);
  }

  while (true) {
    Writer* ready = writers_.front();
    writers_.pop_front();
    if (ready != &w) {
      ready->status = status;
      ready->done = true;
      ready->cv.Signal();
    }
    if (ready == last_writer) break;
  }

  // Notify new head of write queue
  if (!writers_.empty()) {
    writers_.front()->cv.Signal();
  }

  return status;
}

The write logic is fairly clear. Each writing thread first constructs a Writer instance and pushes it onto the writers_ queue. Since the queue is fed by multiple threads, mutex_ provides the concurrency control; only the writer at the front of the queue actually performs the write (merging in the queued batches via BuildBatchGroup along the way), and once the batch has been applied to the memtable, the condition variables wake the other queued writer threads.
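The Writer that each thread queues up is a small per-write record, defined in db_impl.cc roughly as follows (comments added here):

struct DBImpl::Writer {
  explicit Writer(port::Mutex* mu)
      : batch(nullptr), sync(false), done(false), cv(mu) {}

  Status status;      // result, filled in by whichever thread did the write
  WriteBatch* batch;  // this thread's updates
  bool sync;          // WriteOptions::sync for this write
  bool done;          // set once the batch has been applied
  port::CondVar cv;   // signaled when done, or when promoted to the front
};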

LevelDB throttles write operations: when certain conditions are not met, it slows writes down by sleeping or blocks them outright. MakeRoomForWrite implements this throttling; here is its source:

// REQUIRES: mutex_ is held
// REQUIRES: this thread is currently at the front of the writer queue
Status DBImpl::MakeRoomForWrite(bool force) {
  mutex_.AssertHeld();
  assert(!writers_.empty());
  bool allow_delay = !force;
  Status s;
  while (true) {
    if (!bg_error_.ok()) {
      // Yield previous error
      s = bg_error_;
      break;
    } else if (allow_delay && versions_->NumLevelFiles(0) >=
                                  config::kL0_SlowdownWritesTrigger) {
      // We are getting close to hitting a hard limit on the number of
      // L0 files. Rather than delaying a single write by several
      // seconds when we hit the hard limit, start delaying each
      // individual write by 1ms to reduce latency variance. Also,
      // this delay hands over some CPU to the compaction thread in
      // case it is sharing the same core as the writer.
      mutex_.Unlock();
      env_->SleepForMicroseconds(1000);
      allow_delay = false;  // Do not delay a single write more than once
      mutex_.Lock();
    } else if (!force &&
               (mem_->ApproximateMemoryUsage() <= options_.write_buffer_size)) {
      // There is room in current memtable
      break;
    } else if (imm_ != nullptr) {
      // We have filled up the current memtable, but the previous
      // one is still being compacted, so we wait.
      Log(options_.info_log, "Current memtable full; waiting...\n");
      background_work_finished_signal_.Wait();
    } else if (versions_->NumLevelFiles(0) >= config::kL0_StopWritesTrigger) {
      // There are too many level-0 files.
      Log(options_.info_log, "Too many L0 files; waiting...\n");
      background_work_finished_signal_.Wait();
    } else {
      // Attempt to switch to a new memtable and trigger compaction of old
      assert(versions_->PrevLogNumber() == 0);
      uint64_t new_log_number = versions_->NewFileNumber();
      WritableFile* lfile = nullptr;
      s = env_->NewWritableFile(LogFileName(dbname_, new_log_number), &lfile);
      if (!s.ok()) {
        // Avoid chewing through file number space in a tight loop.
        versions_->ReuseFileNumber(new_log_number);
        break;
      }
      delete log_;
      delete logfile_;
      logfile_ = lfile;
      logfile_number_ = new_log_number;
      log_ = new log::Writer(lfile);
      imm_ = mem_;
      has_imm_.Release_Store(imm_);
      mem_ = new MemTable(internal_comparator_);
      mem_->Ref();
      force = false;  // Do not force another compaction if have room
      MaybeScheduleCompaction();
    }
  }
  return s;
}

The flow of the code above:

(1) Assert that the mutex is held and that the writer queue is non-empty.

(2) Enter the while loop. If delaying is allowed (allow_delay == true) and the number of level-0 SSTable files is at least kL0_SlowdownWritesTrigger (the constants are quoted after this list), unlock, sleep for 1 ms to yield the CPU to the compaction thread, set allow_delay = false, re-lock, and go around the loop again.

Note: step (2) executes at most once per write, i.e., the CPU is yielded to the level-0 compaction only a single time.

Even while step (2) has the mutex unlocked, no other thread can slip in a write, because this write is still at the front of the queue.

(3) If the mutable memtable still has room, break out of the loop and return; the write is allowed to proceed.

(4) If the mutable memtable has no room left and the immutable memtable is non-null, the previous flush of the immutable memtable to a level-0 SSTable is still in progress. Writes are arriving faster than memory can be flushed to disk; left unchecked this would exhaust memory, so the write blocks and waits.

(5) If the mutable memtable has no room left, the immutable memtable is null, but the number of level-0 SSTable files is at least kL0_StopWritesTrigger, there are too many level-0 files: writes are outpacing the background compaction of SSTables, and letting this continue would pile up level-0 SSTables and badly hurt read performance, so the write blocks and waits.

(6) If the mutable memtable has no room left, the immutable memtable is null, and the level-0 SSTable count is at a reasonable level, point the DB instance's immutable-memtable pointer imm_ at mem_, allocate a fresh memtable for mem_, and create a new WAL log file. Then call MaybeScheduleCompaction() to trigger compaction; among its steps it flushes the immutable memtable to disk, resets imm_ to NULL, and notifies waiting writer threads through the condition variable.
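For reference, the two level-0 thresholds used above are compile-time constants in db/dbformat.h; the memtable budget checked in step (3), options_.write_buffer_size, defaults to 4 MB in include/leveldb/options.h:

// db/dbformat.h, namespace leveldb::config
static const int kL0_CompactionTrigger = 4;      // level-0 files that trigger compaction
static const int kL0_SlowdownWritesTrigger = 8;  // soft limit: start the 1 ms write delays
static const int kL0_StopWritesTrigger = 12;     // hard limit: stop writes entirely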

That's all for the write path today; I'll fill in the rest of the flow when I have time. It's getting late...

For more on related topics, follow the WeChat public account bloomingTony.

