LevelDB中有很多的Iterator,這種設計的好處是使用起來很方便。非常符合順序遍歷的習慣與使用方式。 比如在SST文件中經常用到的一個用法是:
LevelDB
Iterator
SST
一種簡單的操作方式可能是直接把這些key/value通過hash_map存起來。但是這裡作者並沒有這麼操作。而是採用了Iterator的設計。 也就是說,利用類似指針的思路來進行操作。
hash_map
class Block { Iterator* NewIterator(const Comparator* comparator); class Iter; };
需要注意的是:
Iterator*
std::vector<>
std::vector<T>::iterator
New
Delete
這個需要使用者自己釋放。使用方式就是
auto iter = Block::NewIterator(cmp); ... delete iter;
在看Iterator的時候,也需要注意與Block的結構圖進行對比。
Block
// 新生成一個Iterator Iterator* Block::NewIterator(const Comparator* cmp) { // 如果block的size_連一個整形數的大小都沒有,那麼肯定是出錯了。 // 也就是說,這裡至少需要4個bytes. // 因為block的TAIL裡面記錄的是number of restarts // 假設所有的restarts都沒有。那麼存放的應該是0 if (size_ < sizeof(uint32_t)) { return NewErrorIterator(Status::Corruption("bad block contents")); } // 取得restarts的數目 // 這裡是如何操作的呢? const uint32_t num_restarts = NumRestarts();
if (num_restarts == 0) { return NewEmptyIterator(); } else { return new Iter(cmp, data_, restart_offset_, num_restarts ); } }
那麼,NumRestarts()應該就是去取得相應的num_restarts。
NumRestarts()
num_restarts
inline uint32_t Block::NumRestarts() const { assert(size_ >= sizeof(uint32_t)); return DecodeFixed32(data_ + size_ - sizeof(uint32_t)); }
除此之外,遇到的第二個問題就是restart_offset_這個變數是如何設置的。這個就需要回到Block類的函數設置裡面了。
restart_offset_
Block::Block(const BlockContents& contents) // 設置內存數據區 : data_(contents.data.data()), // 內存數據區的size size_(contents.data.size()), // 是否擁有的這個內存的生命周期 owned_(contents.heap_allocated) { // 如果內存數據區的size小於int32 // 那麼出錯,出錯的設置為size_ = 0 if (size_ < sizeof(uint32_t)) { size_ = 0; // Error marker } else { // 檢查num_restarts的數目 // 如果把整個內存區都用來存放restarts // 這個最多的restart是多少 size_t max_restarts_allowed = (size_-sizeof(uint32_t)) / sizeof(uint32_t); // 如果數目比這種情況還要大,那麼就出錯 if (NumRestarts() > max_restarts_allowed) { // The size is too small for NumRestarts() size_ = 0; } else { // NumRestarts() * sizeof(uint32_t) // 就是整個restarts數據區的大小 // 1就是存放number_restarts的大小 restart_offset_ = size_ - (1 + NumRestarts()) * sizeof(uint32_t); } } }
總結一下計算restart_offset_的過程:
number_restarts_
restart
sizeof(uint32_t)
size_ - restart*sizeof(uint32_t) - sizeof(uint32_t)
這裡通過查看整個Block的圖也可以看明白。
首先看一下Block::Iter內部的數據結構。通過Iter可以依次遍歷一個Block裡面的key:value。
Block::Iter
Iter
key:value
class Block::Iter : public Iterator { // key比較器 const Comparator* const comparator_; // 指向內存區域 const char* const data_; // underlying block contents // restarts_在內存裡面的超始偏移位置 uint32_t const restarts_; // Offset of restart array (list of fixed32) // restarts的數目 uint32_t const num_restarts_; // Number of uint32_t entries in restart array
// current_ is offset in data_ of current entry. >= restarts_ if !Valid // current_指向的是每個key:value pair,也就是圖中每個record的開頭 uint32_t current_; // 向在指向哪個index了 uint32_t restart_index_; // Index of restart block in which current_ falls // 當前iterator指向的key, 注意這裡的key是完整的, // shared_key + non_shared_key一起 std::string key_; // 當前iterator指向的value Slice value_; // 狀態 Status status_; };
大部分這些數據,並不是在生成Iter的時候去解析內存,那樣的話,會有很多的工作非常重複。所以在構造函數裡面如下:
public: Iter(const Comparator* comparator, const char* data, uint32_t restarts, uint32_t num_restarts) : comparator_(comparator), data_(data), restarts_(restarts), num_restarts_(num_restarts), current_(restarts_), restart_index_(num_restarts_) { assert(num_restarts_ > 0); }
首先看一些最基本的函數。
// valid只會查看當前的offset超過了restarts_ bool Valid() const { return current_ < restarts_; } virtual Status status() const { return status_; } // 拿到iter對應的key virtual Slice key() const { assert(Valid()); return key_; }
// 拿到對應的value. virtual Slice value() const { assert(Valid()); return value_; }
inline int Compare(const Slice& a, const Slice& b) const { return comparator_->Compare(a, b); }
Compare是比較兩個Slice的大小,這裡可以使用用戶自定義的比較器。
Compare
Slice
// Return the offset in data_ just past the end of the current entry. inline uint32_t NextEntryOffset() const { return (value_.data() + value_.size()) - data_; }
實際上,這個函數名字個人覺得取得一點都不好。且聽我說為什麼?直觀意義上是在解如下圖的結構:
+------------------------+ | | | shared_key_length | +------------------------+ | | | non_shared_key_len | | | +------------------------+ | | | value_length | | | | | +------------------------+ | | | | | non_shared_key content| | | | | +------------------------+ <-+------+ value.data() | | | | | | | | | | value content | +----> value.size | | | | | | | | | +---------> +------------------------+ <-+ next record
可以看到value_.data() + value_.size()就是一個內存指針,指向了next entry的起始位置。那麼再減去data_就可以得到next entry的offset。
value_.data() + value_.size()
next entry
data_
offset
但是,事情並不總是這樣。比如SeekToRestartPoint:
SeekToRestartPoint
// 移動到下一個point void SeekToRestartPoint(uint32_t index) { // 因為要移動到一個restart index. // 所以舊有的key不再有用,清除掉 key_.clear(); // 移動restart index restart_index_ = index; // current_ will be fixed by ParseNextKey();
// 拿到restart index對應到數據區的offset // ParseNextKey() starts at the end of value_, so set value_ accordingly uint32_t offset = GetRestartPoint(index); // 注意:此時value裡面只是引用到了restart point // 這個位置的內存,但是value的長度卻是設置為0 value_ = Slice(data_ + offset, 0); }
問題就在於value_ = Slice(data_ + offset, 0);這裡移動到一個restart point之後,並沒有立馬開始解析這個restart point開始的key/val,而是把key/val都清空了。value指針指向了這個起始位置。
value_ = Slice(data_ + offset, 0);
restart point
key/val
value
所以,一但有代碼是如下:
// 移動到index指向的內存位置 SeekToRestartPoint(index); // 注意:這裡拿的並不是下一個entry的offset // 而仍然是前面給的index 對應的offset所在位置。 offset = NextEntryOffset();
這樣的代碼讀起來就會比較奇怪。
key/value
並且,如果在寫代碼的時候,一旦SeekToRestartPoint(),如果沒有立解析key/value,那麼拿到的key/value都是空的。所以如下代碼就是非法的。
SeekToRestartPoint()
iter.SeekToRestartPoint(index); iter.key(); // 拿到空key iter.value(); // 拿到空val
並且其他帶SeekXXX函數名稱裡面,Seek之後都會立馬把key/value解析出來。就這個SeekToRestartPoint()不會。
SeekXXX
Seek
說了那麼多,首先還是看一下如何讀取第一條記錄吧。
virtual void SeekToFirst() { // 移動到第一個restart index SeekToRestartPoint(0); // 立馬解析key/value // 因為SeekToRestartPoint()只會移動指針 // 不會解析
// 解析key/value. ParseNextKey(); }
那麼接下來看一下ParseNextKey是如何實現的。
ParseNextKey
bool ParseNextKey() { // 並不一定是跳過當前的key/value // 如果是SeekToRestartPoint,那麼就是從當前指針位置 // 開始解析。那麼拿以的就是當前位置的offset current_ = NextEntryOffset(); // 拿到應該解析的內存起始位置。 const char* p = data_ + current_; // 不應該超出的內存位置 const char* limit = data_ + restarts_; // Restarts come right after data // 如果超出內存位置 // 這個內存位置就是用restarts_內存所在位置來限制的 if (p >= limit) { // No more entries to return. Mark as invalid. current_ = restarts_; restart_index_ = num_restarts_; return false; }
// Decode next entry uint32_t shared, non_shared, value_length; // 依次解析出三個長度 // 這個在前面一篇文章已經介紹過了 p = DecodeEntry(p, limit, &shared, &non_shared, &value_length); // 出錯處理 if (p == nullptr || key_.size() < shared) { CorruptionError(); return false; } else { // 注意:這裡使用resize,則會在上一個key的 // 基礎上,把non_shared 的部分剪掉 key_.resize(shared); // 然後再追加上non shared的key部分 key_.append(p, non_shared); // 至此,整個key就是完整的,接下來去取完整的value value_ = Slice(p + non_shared, value_length); // 移動完成之後,需要看一下是不是需要移動restart_index while (restart_index_ + 1 < num_restarts_ && GetRestartPoint(restart_index_ + 1) < current_) { ++restart_index_; } return true; } }
這裡大部分代碼都已經有了注釋。那麼這裡只需要重點說一下:
while (restart_index_ + 1 < num_restarts_ && GetRestartPoint(restart_index_ + 1) < current_) { ++restart_index_; }
這裡主要是處理以下情況:
+----------------------------+ <--------+ restart A | | | Record a | | | +----------------------------+ | | | Record b | | | +----------------------------+ | | | Record c | | | +----------------------------+ | | | Record d | | | | | +----------------------------+ <--------+ restart B
restart A
SeekToRestartPoint(index)
current_
while循環肯定是不會成立的。
ParseNextKey()
Record b
while
current
restart B
restart_index_
既然講解清楚了ParseNextKey()函數,那麼Next()函數就一目了然了。
Next()
virtual void Next() { assert(Valid()); ParseNextKey(); }
SeekToLast並不是說直接移動到restart_offset_這裡來。因為這裡並不是最後一個record。所以SeekToLast的含義是移動到最後一個record。
SeekToLast
record
如果用下圖來打比方,就是移動到***位置處,也就是Record d開始位置。
***
Record d
+----------------------------+ <--------+last restart | | | Record a | | | +----------------------------+ | | | Record b | | | +----------------------------+ | | | Record c | | | +----------------------------+ *** | | | Record d | | | | | +----------------------------+ <---- restarts_ virtual void SeekToLast() { // 首先移動到last restart這裡 // last restart = num_restarts_ - 1 SeekToRestartPoint(num_restarts_ - 1); // 移動之後,解析出key/value // 然後再看一下當前這個entry結束的位置是不是小於restarts_ // 偏移處 while (ParseNextKey() && NextEntryOffset() < restarts_) { // Keep skipping } // 最終,NextEntryOffset()得到的值會是指向record d的尾部 // 這個時候也就是== restarts_ // 這個時候跳出循環,那麼也就完成了`SeekToLast`這個目標。 }
在std::prev()裡面,就是拿到當前這個iterator的前向一個Iterator。那麼採用同樣的方式,iter.Prev則是把自己往前移動了。 意思一樣,本質是不一樣的。那麼看一下這個Iter是如何實現的。
std::prev()
iterator
iter.Prev
virtual void Prev() { assert(Valid());
// Scan backwards to a restart point before current_ // 這裡拿到當前record的 offset const uint32_t original = current_; // Seg 1. while (GetRestartPoint(restart_index_) >= original) { if (restart_index_ == 0) { // No more entries current_ = restarts_; restart_index_ = num_restarts_; return; } restart_index_--; } // Seg 2. SeekToRestartPoint(restart_index_); do { // Loop until end of current entry hits the start of original entry } while (ParseNextKey() && NextEntryOffset() < original); }
這裡的代碼比較繞,首先看Seg 1這裡。
Seg 1
// Seg 1. while (GetRestartPoint(restart_index_) >= original) { // 如果restart_index_ == 0 // 本質上相當於SeekToLast(); // 代碼可以改寫成 // if (0 == restart_index_) return SeekToLast(); if (restart_index_ == 0) { // No more entries current_ = restarts_; restart_index_ = num_restarts_; return; } // 這裡什麼時候要--? restart_index_--; }
那麼while循環裡面什麼時候需要restart_index_自減?看下圖:從直覺上來說,當current指向一個restart index的開頭第一個record的時候。而GetRestartPoint(restart_index_) == current_是成立的。所以需要自減。
restart index
GetRestartPoint(restart_index_) == current_
current +-----> +----------------------------+ <--------+ restart index offset | | | Record a | | | +----------------------------+ | | | Record b | | | +----------------------------+ | | | Record c | | | +----------------------------+ | | | Record d | | | | | +----------------------------+
那麼什麼時候會出現GetRestartPoint(restart_index_) > current_的情況呢?個人感覺真實情況應該比較少。
GetRestartPoint(restart_index_) > current_
接著再看Seg 2,這裡就是移動到上一個/當前這個restart的開頭。
Seg 2
Record a
record b/c/d
這就是SeekToRestartPoint(restart_index_);的作用。
SeekToRestartPoint(restart_index_);
// 前面移動到了restart_index_處。並且沒有解析key/value do { // Loop until end of current entry hits the start of original entry } while (ParseNextKey() && NextEntryOffset() < original);
考慮這種情況。current_指向Record d。那麼:
restart index offset
Record
NextEntryOffset() < original
NextEntryOffset() = original
Record c
移動到特定的key的時候。需要注意的是:
key
resart index
record.key
target
virtual void Seek(const Slice& target) { // Binary search in restart array to find the last restart point // with a key < target uint32_t left = 0; uint32_t right = num_restarts_ - 1; while (left < right) { uint32_t mid = (left + right + 1) / 2; uint32_t region_offset = GetRestartPoint(mid); uint32_t shared, non_shared, value_length; const char* key_ptr = DecodeEntry(data_ + region_offset, data_ + restarts_, &shared, &non_shared, &value_length); if (key_ptr == nullptr || (shared != 0)) { CorruptionError(); return; } Slice mid_key(key_ptr, non_shared); if (Compare(mid_key, target) < 0) { // Key at "mid" is smaller than "target". Therefore all // blocks before "mid" are uninteresting. left = mid; } else { // Key at "mid" is >= "target". Therefore all blocks at or // after "mid" are uninteresting. right = mid - 1; } }
// Linear search (within restart block) for first key >= target SeekToRestartPoint(left); while (true) { // 解析key/value出錯 if (!ParseNextKey()) { return; } // 如果發現當前的record.key >= target // 那麼跳出來。 if (Compare(key_, target) >= 0) { return; } } }
推薦閱讀: