LevelDB's source code is far more concise than RocksDB's. If you want to understand these two KV storage engines at the source level, LevelDB is the better place to start: you can get up to speed quickly and learn its read/write mechanics directly from the code.
Let's start with the entry point of the write path:
```cpp
// Default implementations of convenience methods that subclasses of DB
// can call if they wish
Status DB::Put(const WriteOptions& opt, const Slice& key, const Slice& value) {
  WriteBatch batch;
  batch.Put(key, value);
  return Write(opt, &batch);
}
```
LevelDB performs every write as a batch write (even a single-key insert): the key-value pairs to be written are first placed in a WriteBatch, which is also what gives LevelDB writes their atomicity. A WriteBatch stores its data in a std::string member rep_, laid out as an 8-byte sequence number, a 4-byte count, and then the records themselves.
Count is the number of records in rep_; ValueType has two values, kTypeDeletion (0x0) and kTypeValue (0x1).
Each key and value is stored length-prefixed, i.e. as data.size() followed by data; when the ValueType is kTypeDeletion, the record contains only the key.
WriteBatch and Slice are two data structures that LevelDB designed very cleverly for its specific needs; see the source for details.
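To make the layout concrete, here is a minimal sketch that encodes a one-record batch in the same shape as rep_. The helper names (PutFixed64, PutFixed32, PutLengthPrefixed, EncodeBatch) are simplified stand-ins written for this post, not LevelDB's actual coding utilities:

```cpp
#include <cassert>
#include <cstdint>
#include <string>

// Hypothetical stand-ins for LevelDB's coding helpers, to show the layout.
static void PutFixed64(std::string* dst, uint64_t v) {
  for (int i = 0; i < 8; i++)
    dst->push_back(static_cast<char>((v >> (8 * i)) & 0xff));  // little-endian
}
static void PutFixed32(std::string* dst, uint32_t v) {
  for (int i = 0; i < 4; i++)
    dst->push_back(static_cast<char>((v >> (8 * i)) & 0xff));
}
static void PutLengthPrefixed(std::string* dst, const std::string& s) {
  // LevelDB uses a varint32 length prefix; sizes under 128 fit in one byte.
  uint32_t n = static_cast<uint32_t>(s.size());
  while (n >= 0x80) { dst->push_back(static_cast<char>(n | 0x80)); n >>= 7; }
  dst->push_back(static_cast<char>(n));
  dst->append(s);
}

enum ValueType { kTypeDeletion = 0x0, kTypeValue = 0x1 };

// Encodes rep_ = sequence(8) | count(4) | records, where a kTypeValue record
// is: type byte | keylen | key | valuelen | value.
std::string EncodeBatch(uint64_t sequence,
                        const std::string& key, const std::string& value) {
  std::string rep;
  PutFixed64(&rep, sequence);
  PutFixed32(&rep, 1);  // one record in this sketch
  rep.push_back(static_cast<char>(kTypeValue));
  PutLengthPrefixed(&rep, key);
  PutLengthPrefixed(&rep, value);
  return rep;
}
```

The 12-byte header (sequence + count) is why a WriteBatch consumes one sequence number per record: the head of the batch fixes the starting sequence, and Count tells the reader how many consecutive sequence numbers the batch occupies.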
Now for the write itself. DB is the base class and DBImpl the implementing class, so Write corresponds to DBImpl::Write:
```cpp
Status DBImpl::Write(const WriteOptions& options, WriteBatch* my_batch) {
  Writer w(&mutex_);
  w.batch = my_batch;
  w.sync = options.sync;
  w.done = false;

  MutexLock l(&mutex_);
  writers_.push_back(&w);
  while (!w.done && &w != writers_.front()) {
    w.cv.Wait();
  }
  if (w.done) {
    return w.status;
  }

  // May temporarily unlock and wait.
  Status status = MakeRoomForWrite(my_batch == nullptr);
  uint64_t last_sequence = versions_->LastSequence();
  Writer* last_writer = &w;
  if (status.ok() && my_batch != nullptr) {  // nullptr batch is for compactions
    WriteBatch* updates = BuildBatchGroup(&last_writer);
    WriteBatchInternal::SetSequence(updates, last_sequence + 1);
    last_sequence += WriteBatchInternal::Count(updates);

    // Add to log and apply to memtable.  We can release the lock
    // during this phase since &w is currently responsible for logging
    // and protects against concurrent loggers and concurrent writes
    // into mem_.
    {
      mutex_.Unlock();
      status = log_->AddRecord(WriteBatchInternal::Contents(updates));
      bool sync_error = false;
      if (status.ok() && options.sync) {
        status = logfile_->Sync();
        if (!status.ok()) {
          sync_error = true;
        }
      }
      if (status.ok()) {
        status = WriteBatchInternal::InsertInto(updates, mem_);
      }
      mutex_.Lock();
      if (sync_error) {
        // The state of the log file is indeterminate: the log record we
        // just added may or may not show up when the DB is re-opened.
        // So we force the DB into a mode where all future writes fail.
        RecordBackgroundError(status);
      }
    }
    if (updates == tmp_batch_) tmp_batch_->Clear();

    versions_->SetLastSequence(last_sequence);
  }

  while (true) {
    Writer* ready = writers_.front();
    writers_.pop_front();
    if (ready != &w) {
      ready->status = status;
      ready->done = true;
      ready->cv.Signal();
    }
    if (ready == last_writer) break;
  }

  // Notify new head of write queue
  if (!writers_.empty()) {
    writers_.front()->cv.Signal();
  }

  return status;
}
```
The write logic is fairly clear: each call constructs a Writer instance and appends it to the write queue writers_. Since multiple threads write concurrently, mutex_ provides the synchronization. The writer at the front of the queue performs the actual write (folding queued writes into one batch via BuildBatchGroup); once the batch has been appended to the log and applied to the memtable, the condition variables wake the queued threads whose writes were completed along with it, as well as the new head of the queue.
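The queue-head protocol above can be sketched in portable C++ as follows. This is a minimal model, not LevelDB's code: it uses std::mutex/std::condition_variable instead of the port wrappers, one shared condition variable instead of one per Writer, and it batches the whole queue without the size cap that BuildBatchGroup applies:

```cpp
#include <condition_variable>
#include <deque>
#include <mutex>
#include <string>
#include <thread>
#include <vector>

struct Writer {
  std::string data;
  bool done = false;
};

class WriteQueue {
 public:
  void Write(Writer* w) {
    std::unique_lock<std::mutex> l(mu_);
    writers_.push_back(w);
    // Wait until either a previous head finished our write for us,
    // or we have become the head of the queue.
    cv_.wait(l, [&] { return w->done || w == writers_.front(); });
    if (w->done) return;

    // Head of queue: fold every queued writer's data into one batch.
    std::vector<Writer*> group(writers_.begin(), writers_.end());
    std::string batch;
    for (Writer* x : group) batch += x->data;
    log_ += batch;  // stands in for log_->AddRecord + memtable insert

    // Mark the whole group done and wake followers plus the new head.
    for (Writer* x : group) {
      writers_.pop_front();
      x->done = true;
    }
    cv_.notify_all();
  }

  std::string log() {
    std::lock_guard<std::mutex> l(mu_);
    return log_;
  }

 private:
  std::mutex mu_;
  std::condition_variable cv_;
  std::deque<Writer*> writers_;
  std::string log_;
};
```

The design point this captures is group commit: only one thread at a time touches the log and memtable, yet concurrent writers still make progress because the head completes their work on their behalf.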
Under normal operation LevelDB throttles writes: when certain conditions are not met, it slows writes down by sleeping, or blocks them outright. MakeRoomForWrite implements this throttling:
```cpp
// REQUIRES: mutex_ is held
// REQUIRES: this thread is currently at the front of the writer queue
Status DBImpl::MakeRoomForWrite(bool force) {
  mutex_.AssertHeld();
  assert(!writers_.empty());
  bool allow_delay = !force;
  Status s;
  while (true) {
    if (!bg_error_.ok()) {
      // Yield previous error
      s = bg_error_;
      break;
    } else if (allow_delay && versions_->NumLevelFiles(0) >=
                                  config::kL0_SlowdownWritesTrigger) {
      // We are getting close to hitting a hard limit on the number of
      // L0 files.  Rather than delaying a single write by several
      // seconds when we hit the hard limit, start delaying each
      // individual write by 1ms to reduce latency variance.  Also,
      // this delay hands over some CPU to the compaction thread in
      // case it is sharing the same core as the writer.
      mutex_.Unlock();
      env_->SleepForMicroseconds(1000);
      allow_delay = false;  // Do not delay a single write more than once
      mutex_.Lock();
    } else if (!force &&
               (mem_->ApproximateMemoryUsage() <= options_.write_buffer_size)) {
      // There is room in current memtable
      break;
    } else if (imm_ != nullptr) {
      // We have filled up the current memtable, but the previous
      // one is still being compacted, so we wait.
      Log(options_.info_log, "Current memtable full; waiting...\n");
      background_work_finished_signal_.Wait();
    } else if (versions_->NumLevelFiles(0) >= config::kL0_StopWritesTrigger) {
      // There are too many level-0 files.
      Log(options_.info_log, "Too many L0 files; waiting...\n");
      background_work_finished_signal_.Wait();
    } else {
      // Attempt to switch to a new memtable and trigger compaction of old
      assert(versions_->PrevLogNumber() == 0);
      uint64_t new_log_number = versions_->NewFileNumber();
      WritableFile* lfile = nullptr;
      s = env_->NewWritableFile(LogFileName(dbname_, new_log_number), &lfile);
      if (!s.ok()) {
        // Avoid chewing through file number space in a tight loop.
        versions_->ReuseFileNumber(new_log_number);
        break;
      }
      delete log_;
      delete logfile_;
      logfile_ = lfile;
      logfile_number_ = new_log_number;
      log_ = new log::Writer(lfile);
      imm_ = mem_;
      has_imm_.Release_Store(imm_);
      mem_ = new MemTable(internal_comparator_);
      mem_->Ref();
      force = false;  // Do not force another compaction if have room
      MaybeScheduleCompaction();
    }
  }
  return s;
}
```
The flow of the code above:
(1) Assert that the mutex is held and that the writer queue is non-empty.
(2) Enter the while loop. If delaying is allowed (allow_delay == true) and the number of level-0 SSTable files has reached the configured kL0_SlowdownWritesTrigger, unlock, sleep for 1 ms to yield the CPU to the compaction thread, set allow_delay = false, re-lock, and go around the loop again.
Note: step (2) runs at most once per write, i.e. the CPU is yielded to the level-0 compaction only a single time.
Even though step (2) releases the lock, no other thread can sneak in a write, because this write is still at the front of the queue.
(3) If the mutable memtable still has free space, break out of the loop and return: the write may proceed.
(4) If the mutable memtable is full and the immutable memtable is non-null, a flush of the immutable memtable to a level-0 SSTable is still in progress. Writes are arriving faster than memory can be flushed to disk; continuing would exhaust memory, so the write blocks and waits.
(5) If the mutable memtable is full, the immutable memtable is null, but the number of level-0 SSTables has reached the configured kL0_StopWritesTrigger, there are too many level-0 files: writes are outpacing the background compaction. Letting this continue would pile up level-0 SSTables and badly hurt read performance, so the write blocks and waits.
(6) If the mutable memtable is full, the immutable memtable is null, and the level-0 SSTable count is at a reasonable level, point the DB's immutable-memtable pointer imm_ at mem_, allocate a fresh memtable for mem_, and create a new WAL log file. Then call MaybeScheduleCompaction() to trigger compaction; one of its steps flushes the immutable memtable to disk, sets imm_ back to nullptr, and signals the condition variable to wake the other waiting writers.
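The decision ladder in steps (2)-(6) can be condensed into a pure function. This is only a sketch: the enum names are illustrative, and the bg_error_ check and the force flag from the real code are omitted. The default thresholds shown do match LevelDB's (kL0_SlowdownWritesTrigger = 8, kL0_StopWritesTrigger = 12, write_buffer_size = 4 MB):

```cpp
#include <cassert>
#include <cstddef>

// Illustrative names for the five outcomes of MakeRoomForWrite's loop body.
enum class WriteGate {
  kSlowdown,       // sleep 1 ms once, then re-evaluate
  kProceed,        // room in the mutable memtable: write goes ahead
  kWaitImmFlush,   // immutable memtable still being flushed: block
  kWaitL0Compact,  // too many level-0 files: block
  kSwitchMemtable  // rotate memtable + WAL, schedule compaction
};

WriteGate Classify(bool allow_delay, int l0_files, size_t mem_usage,
                   bool has_imm,
                   size_t write_buffer_size = 4 * 1024 * 1024,
                   int slowdown_trigger = 8, int stop_trigger = 12) {
  if (allow_delay && l0_files >= slowdown_trigger) return WriteGate::kSlowdown;
  if (mem_usage <= write_buffer_size) return WriteGate::kProceed;
  if (has_imm) return WriteGate::kWaitImmFlush;
  if (l0_files >= stop_trigger) return WriteGate::kWaitL0Compact;
  return WriteGate::kSwitchMemtable;
}
```

Note the ordering: the slowdown check comes before the memtable-room check, so even a write that has memtable room pays the 1 ms delay when level-0 starts piling up. That is deliberate, spreading small delays across many writes instead of stalling one write for seconds at the hard limit.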
That's it for the write path for today; I'll fill in the rest of the flow when I have time. It's getting late...
For more on this topic, search for and follow the WeChat official account bloomingTony.