Iterator的設計

LevelDB中有很多的Iterator,這種設計的好處是使用起來很方便。非常符合順序遍歷的習慣與使用方式。 比如在SST文件中經常用到的一個用法是:

  • 查看所有的entry/record
  • 移動到下一個條目(即entry/record)

一種簡單的操作方式可能是直接把這些key/value通過hash_map存起來。但是這裡作者並沒有這麼操作。而是採用了Iterator的設計。 也就是說,利用類似指針的思路來進行操作。

聲明

class Block {
Iterator* NewIterator(const Comparator* comparator);
class Iter;
};

需要注意的是:

  • 返回的是Iterator*而不是像std::vector<>那樣直接返回std::vector<T>::iterator,這裡可以看做是不兼容的一方面。
  • 只有New,但是沒有Delete,這個內存釋放是如何處理的呢?

這個需要使用者自己釋放。使用方式就是

auto iter = Block::NewIterator(cmp);
...
delete iter;

在看Iterator的時候,也需要注意與Block的結構圖進行對比。

生成Iterator

// 新生成一個Iterator
Iterator* Block::NewIterator(const Comparator* cmp) {
// 如果block的size_連一個整形數的大小都沒有,那麼肯定是出錯了。
// 也就是說,這裡至少需要4個bytes.
// 因為block的TAIL裡面記錄的是number of restarts
// 假設所有的restarts都沒有。那麼存放的應該是0
if (size_ < sizeof(uint32_t)) {
return NewErrorIterator(Status::Corruption("bad block contents"));
}
// 取得restarts的數目
// 這裡是如何操作的呢?
const uint32_t num_restarts = NumRestarts();

if (num_restarts == 0) {
return NewEmptyIterator();
} else {
return new Iter(cmp,
data_,
restart_offset_,
num_restarts
);
}
}

那麼,NumRestarts()應該就是去取得相應的num_restarts

inline uint32_t Block::NumRestarts() const {
assert(size_ >= sizeof(uint32_t));
return DecodeFixed32(data_ + size_ - sizeof(uint32_t));
}

除此之外,遇到的第二個問題就是restart_offset_這個變數是如何設置的。這個就需要回到Block類的函數設置裡面了。

Block::Block(const BlockContents& contents)
// 設置內存數據區
: data_(contents.data.data()),
// 內存數據區的size
size_(contents.data.size()),
// 是否擁有的這個內存的生命周期
owned_(contents.heap_allocated) {
// 如果內存數據區的size小於int32
// 那麼出錯,出錯的設置為size_ = 0
if (size_ < sizeof(uint32_t)) {
size_ = 0; // Error marker
} else {
// 檢查num_restarts的數目
// 如果把整個內存區都用來存放restarts
// 這個最多的restart是多少
size_t max_restarts_allowed = (size_-sizeof(uint32_t)) / sizeof(uint32_t);
// 如果數目比這種情況還要大,那麼就出錯
if (NumRestarts() > max_restarts_allowed) {
// The size is too small for NumRestarts()
size_ = 0;
} else {
// NumRestarts() * sizeof(uint32_t)
// 就是整個restarts數據區的大小
// 1就是存放number_restarts的大小
restart_offset_ = size_ - (1 + NumRestarts()) * sizeof(uint32_t);
}
}
}

總結一下計算restart_offset_的過程:

  • 首先取得number_restarts_
  • 然後每個restart長度,以及TAIL長度都是sizeof(uint32_t)
  • 利用size_ - restart*sizeof(uint32_t) - sizeof(uint32_t)就可以得到restart_offset_的位置。

這裡通過查看整個Block的圖也可以看明白。

內部數據

首先看一下Block::Iter內部的數據結構。通過Iter可以依次遍歷一個Block裡面的key:value

class Block::Iter : public Iterator {
// key比較器
const Comparator* const comparator_;
// 指向內存區域
const char* const data_; // underlying block contents
// restarts_在內存裡面的超始偏移位置
uint32_t const restarts_; // Offset of restart array (list of fixed32)
// restarts的數目
uint32_t const num_restarts_; // Number of uint32_t entries in restart array

// current_ is offset in data_ of current entry. >= restarts_ if !Valid
// current_指向的是每個key:value pair,也就是圖中每個record的開頭
uint32_t current_;
// 向在指向哪個index了
uint32_t restart_index_; // Index of restart block in which current_ falls
// 當前iterator指向的key, 注意這裡的key是完整的,
// shared_key + non_shared_key一起
std::string key_;
// 當前iterator指向的value
Slice value_;
// 狀態
Status status_;
};

大部分這些數據,並不是在生成Iter的時候去解析內存,那樣的話,會有很多的工作非常重複。所以在構造函數裡面如下:

public:
Iter(const Comparator* comparator,
const char* data,
uint32_t restarts,
uint32_t num_restarts)
: comparator_(comparator),
data_(data),
restarts_(restarts),
num_restarts_(num_restarts),
current_(restarts_),
restart_index_(num_restarts_) {
assert(num_restarts_ > 0);
}

基礎函數

首先看一些最基本的函數。

// valid只會查看當前的offset超過了restarts_
bool Valid() const { return current_ < restarts_; }
virtual Status status() const { return status_; }
// 拿到iter對應的key
virtual Slice key() const {
assert(Valid());
return key_;
}

// 拿到對應的value.
virtual Slice value() const {
assert(Valid());
return value_;
}

比較器

inline int Compare(const Slice& a, const Slice& b) const {
return comparator_->Compare(a, b);
}

Compare是比較兩個Slice的大小,這裡可以使用用戶自定義的比較器。

奇怪的命名

// Return the offset in data_ just past the end of the current entry.
inline uint32_t NextEntryOffset() const {
return (value_.data() + value_.size()) - data_;
}

實際上,這個函數名字個人覺得取得一點都不好。且聽我說為什麼?直觀意義上是在解如下圖的結構:

+------------------------+
| |
| shared_key_length |
+------------------------+
| |
| non_shared_key_len |
| |
+------------------------+
| |
| value_length |
| |
| |
+------------------------+
| |
| |
| non_shared_key content|
| |
| |
+------------------------+ <-+------+ value.data()
| | |
| | |
| | |
| value content | +----> value.size
| | |
| | |
| | |
+---------> +------------------------+ <-+
next record

可以看到value_.data() + value_.size()就是一個內存指針,指向了next entry的起始位置。那麼再減去data_就可以得到next entryoffset

但是,事情並不總是這樣。比如SeekToRestartPoint:

// 移動到下一個point
void SeekToRestartPoint(uint32_t index) {
// 因為要移動到一個restart index.
// 所以舊有的key不再有用,清除掉
key_.clear();
// 移動restart index
restart_index_ = index;
// current_ will be fixed by ParseNextKey();

// 拿到restart index對應到數據區的offset
// ParseNextKey() starts at the end of value_, so set value_ accordingly
uint32_t offset = GetRestartPoint(index);
// 注意:此時value裡面只是引用到了restart point
// 這個位置的內存,但是value的長度卻是設置為0
value_ = Slice(data_ + offset, 0);
}

問題就在於value_ = Slice(data_ + offset, 0);這裡移動到一個restart point之後,並沒有立馬開始解析這個restart point開始的key/val,而是把key/val都清空了。value指針指向了這個起始位置。

所以,一但有代碼是如下:

// 移動到index指向的內存位置
SeekToRestartPoint(index);
// 注意:這裡拿的並不是下一個entry的offset
// 而仍然是前面給的index 對應的offset所在位置。
offset = NextEntryOffset();

這樣的代碼讀起來就會比較奇怪。

  • SeekToRestartPoint()之後,並立即解析出key/value
  • NextEntryOffset()也並不是去拿到下一個entry的offset,因為有可能當前的value.size是0.根本就不會移動!

並且,如果在寫代碼的時候,一旦SeekToRestartPoint(),如果沒有立解析key/value,那麼拿到的key/value都是空的。所以如下代碼就是非法的。

iter.SeekToRestartPoint(index);
iter.key(); // 拿到空key
iter.value(); // 拿到空val

並且其他帶SeekXXX函數名稱裡面,Seek之後都會立馬把key/value解析出來。就這個SeekToRestartPoint()不會。

第一條記錄

說了那麼多,首先還是看一下如何讀取第一條記錄吧。

virtual void SeekToFirst() {
// 移動到第一個restart index
SeekToRestartPoint(0);
// 立馬解析key/value
// 因為SeekToRestartPoint()只會移動指針
// 不會解析

// 解析key/value.
ParseNextKey();
}

那麼接下來看一下ParseNextKey是如何實現的。

bool ParseNextKey() {
// 並不一定是跳過當前的key/value
// 如果是SeekToRestartPoint,那麼就是從當前指針位置
// 開始解析。那麼拿以的就是當前位置的offset
current_ = NextEntryOffset();
// 拿到應該解析的內存起始位置。
const char* p = data_ + current_;
// 不應該超出的內存位置
const char* limit = data_ + restarts_; // Restarts come right after data
// 如果超出內存位置
// 這個內存位置就是用restarts_內存所在位置來限制的
if (p >= limit) {
// No more entries to return. Mark as invalid.
current_ = restarts_;
restart_index_ = num_restarts_;
return false;
}

// Decode next entry
uint32_t shared, non_shared, value_length;
// 依次解析出三個長度
// 這個在前面一篇文章已經介紹過了
p = DecodeEntry(p, limit, &shared, &non_shared, &value_length);
// 出錯處理
if (p == nullptr || key_.size() < shared) {
CorruptionError();
return false;
} else {
// 注意:這裡使用resize,則會在上一個key的
// 基礎上,把non_shared 的部分剪掉
key_.resize(shared);
// 然後再追加上non shared的key部分
key_.append(p, non_shared);
// 至此,整個key就是完整的,接下來去取完整的value
value_ = Slice(p + non_shared, value_length);
// 移動完成之後,需要看一下是不是需要移動restart_index
while (restart_index_ + 1 < num_restarts_ &&
GetRestartPoint(restart_index_ + 1) < current_) {
++restart_index_;
}
return true;
}
}

這裡大部分代碼都已經有了注釋。那麼這裡只需要重點說一下:

while (restart_index_ + 1 < num_restarts_ &&
GetRestartPoint(restart_index_ + 1) < current_) {
++restart_index_;
}

這裡主要是處理以下情況:

+----------------------------+ <--------+ restart A
| |
| Record a |
| |
+----------------------------+
| |
| Record b |
| |
+----------------------------+
| |
| Record c |
| |
+----------------------------+
| |
| Record d |
| |
| |
+----------------------------+ <--------+ restart B

  • 假設現在指針剛好指向restart A這個位置。出現這種情況,其中一種原因就是SeekToRestartPoint(index)調用之後,current_也會指向restart A。那麼while循環肯定是不會成立的。
  • 假設再調用一次ParseNextKey(),那麼current_就移動到了Record b的開頭位置。此時while也不會成立。
  • 直到current移動到了restart B,此時也把key/value解析出來了。那麼while成立,這個時候就需要移動restart_index_了。

既然講解清楚了ParseNextKey()函數,那麼Next()函數就一目了然了。

virtual void Next() {
assert(Valid());
ParseNextKey();
}

最後一條記錄

SeekToLast並不是說直接移動到restart_offset_這裡來。因為這裡並不是最後一個record。所以SeekToLast的含義是移動到最後一個record

如果用下圖來打比方,就是移動到***位置處,也就是Record d開始位置。

+----------------------------+ <--------+last restart
| |
| Record a |
| |
+----------------------------+
| |
| Record b |
| |
+----------------------------+
| |
| Record c |
| |
+----------------------------+ ***
| |
| Record d |
| |
| |
+----------------------------+ <---- restarts_
virtual void SeekToLast() {
// 首先移動到last restart這裡
// last restart = num_restarts_ - 1
SeekToRestartPoint(num_restarts_ - 1);
// 移動之後,解析出key/value
// 然後再看一下當前這個entry結束的位置是不是小於restarts_
// 偏移處
while (ParseNextKey() && NextEntryOffset() < restarts_) {
// Keep skipping
}
// 最終,NextEntryOffset()得到的值會是指向record d的尾部
// 這個時候也就是== restarts_
// 這個時候跳出循環,那麼也就完成了`SeekToLast`這個目標。
}

向前移一個Record

std::prev()裡面,就是拿到當前這個iterator的前向一個Iterator。那麼採用同樣的方式,iter.Prev則是把自己往前移動了。 意思一樣,本質是不一樣的。那麼看一下這個Iter是如何實現的。

virtual void Prev() {
assert(Valid());

// Scan backwards to a restart point before current_
// 這裡拿到當前record的 offset
const uint32_t original = current_;
// Seg 1.
while (GetRestartPoint(restart_index_) >= original) {
if (restart_index_ == 0) {
// No more entries
current_ = restarts_;
restart_index_ = num_restarts_;
return;
}
restart_index_--;
}
// Seg 2.
SeekToRestartPoint(restart_index_);
do {
// Loop until end of current entry hits the start of original entry
} while (ParseNextKey() && NextEntryOffset() < original);
}

這裡的代碼比較繞,首先看Seg 1這裡。

// Seg 1.
while (GetRestartPoint(restart_index_) >= original) {
// 如果restart_index_ == 0
// 本質上相當於SeekToLast();
// 代碼可以改寫成
// if (0 == restart_index_) return SeekToLast();
if (restart_index_ == 0) {
// No more entries
current_ = restarts_;
restart_index_ = num_restarts_;
return;
}
// 這裡什麼時候要--?
restart_index_--;
}

那麼while循環裡面什麼時候需要restart_index_自減?看下圖:從直覺上來說,當current指向一個restart index的開頭第一個record的時候。而GetRestartPoint(restart_index_) == current_是成立的。所以需要自減。

current
+-----> +----------------------------+ <--------+ restart index offset
| |
| Record a |
| |
+----------------------------+
| |
| Record b |
| |
+----------------------------+
| |
| Record c |
| |
+----------------------------+
| |
| Record d |
| |
| |
+----------------------------+

那麼什麼時候會出現GetRestartPoint(restart_index_) > current_的情況呢?個人感覺真實情況應該比較少。

接著再看Seg 2,這裡就是移動到上一個/當前這個restart的開頭。

  • 如果current_指向restart的開頭,比如Record a,那麼就需要移動到上一個restart index
  • 如果current_指向record b/c/d,那麼就只需要移動到開頭處.

這就是SeekToRestartPoint(restart_index_);的作用。

// 前面移動到了restart_index_處。並且沒有解析key/value
do {
// Loop until end of current entry hits the start of original entry
} while (ParseNextKey() && NextEntryOffset() < original);

考慮這種情況。current_指向Record d。那麼:

  • SeekToRestartPoint(restart_index_);移動到restart index offset指向的內存位置。
  • ParseNextKey()從這個內存位置開始解析。得到Record a。很明顯Record a並不是Record d的上一個Record。即NextEntryOffset() < original成立。那麼繼續移動。
  • 直到NextEntryOffset() = original成立,也就是得到Record c。這個時候就跳出循環。

移動到特定的key

移動到特定的key的時候。需要注意的是:

  • 只會利用每個restart index開頭的那個record來進行二分。
  • 每個開頭位置的key都是完整的。不需要拼接。可以直接用來比較。
  • 二分找的是restart index
  • 找到restart index之後,移動到這個resart index。然後取出裡面的record,直接record.key的值大於等於target.

virtual void Seek(const Slice& target) {
// Binary search in restart array to find the last restart point
// with a key < target
uint32_t left = 0;
uint32_t right = num_restarts_ - 1;
while (left < right) {
uint32_t mid = (left + right + 1) / 2;
uint32_t region_offset = GetRestartPoint(mid);
uint32_t shared, non_shared, value_length;
const char* key_ptr = DecodeEntry(data_ + region_offset,
data_ + restarts_,
&shared, &non_shared, &value_length);
if (key_ptr == nullptr || (shared != 0)) {
CorruptionError();
return;
}
Slice mid_key(key_ptr, non_shared);
if (Compare(mid_key, target) < 0) {
// Key at "mid" is smaller than "target". Therefore all
// blocks before "mid" are uninteresting.
left = mid;
} else {
// Key at "mid" is >= "target". Therefore all blocks at or
// after "mid" are uninteresting.
right = mid - 1;
}
}

// Linear search (within restart block) for first key >= target
SeekToRestartPoint(left);
while (true) {
// 解析key/value出錯
if (!ParseNextKey()) {
return;
}
// 如果發現當前的record.key >= target
// 那麼跳出來。
if (Compare(key_, target) >= 0) {
return;
}
}
}

推薦閱讀:

相关文章