RocksDB's Pipelined Write


Overview

Version: v5.15.10

We previously analyzed RocksDB's default write path. Setting enable_pipelined_write in the options switches to the pipelined write path. In the default path, a batch must finish both its WAL write and its memtable write before the next leader is chosen. In the pipelined path there is no need to wait for the memtable write: as soon as the WAL write finishes, the next leader can be chosen and start the next batch, which is what produces the pipelining effect.
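For reference, the option is set on the Options/DBOptions used to open the database. A minimal sketch (the database path and key/value below are made up for illustration):

#include <cassert>
#include "rocksdb/db.h"
#include "rocksdb/options.h"

int main() {
  rocksdb::Options options;
  options.create_if_missing = true;
  // Enable the pipelined write path analyzed below (off by default).
  options.enable_pipelined_write = true;
  // Optional: allow writers to insert into the memtable concurrently, which
  // the pipelined path can take advantage of.
  options.allow_concurrent_memtable_write = true;

  rocksdb::DB* db = nullptr;
  rocksdb::Status s = rocksdb::DB::Open(options, "/tmp/pipelined_demo", &db);
  assert(s.ok());
  s = db->Put(rocksdb::WriteOptions(), "key", "value");
  assert(s.ok());
  delete db;
  return 0;
}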

Implementation

The pipelined write path is branched to from inside the default write path. Let's look directly at its implementation in db/db_impl_write.cc:PipelinedWriteImpl():

Status DBImpl::PipelinedWriteImpl(const WriteOptions& write_options,
                                  WriteBatch* my_batch, WriteCallback* callback,
                                  uint64_t* log_used, uint64_t log_ref,
                                  bool disable_memtable, uint64_t* seq_used) {
  PERF_TIMER_GUARD(write_pre_and_post_process_time);
  StopWatch write_sw(env_, immutable_db_options_.statistics.get(), DB_WRITE);

  WriteContext write_context;

  // Construct a new writer
  WriteThread::Writer w(write_options, my_batch, callback, log_ref,
                        disable_memtable);
  // Add the new writer to the batch queue
  write_thread_.JoinBatchGroup(&w);
  // Check whether the state is STATE_GROUP_LEADER, i.e. the leader of a batch
  if (w.state == WriteThread::STATE_GROUP_LEADER) {
    // ...
    // The leader picks other writers to join the batch
    last_batch_group_size_ =
        write_thread_.EnterAsBatchGroupLeader(&w, &wal_write_group);
    const SequenceNumber current_sequence =
        write_thread_.UpdateLastSequence(versions_->LastSequence()) + 1;
    size_t total_count = 0;
    size_t total_byte_size = 0;
    // ...
    // The WAL needs to be written
    if (w.ShouldWriteToWAL()) {
      // ...
      // Write the WAL
      w.status = WriteToWAL(wal_write_group, log_writer, log_used,
                            need_log_sync, need_log_dir_sync, current_sequence);
    }

    // After the WAL write is done, the batch leader exits; this function is
    // covered below
    write_thread_.ExitAsBatchGroupLeader(wal_write_group, w.status);
  }

  // If the state is STATE_MEMTABLE_WRITER_LEADER, i.e. this writer is the
  // leader of a memtable batch
  WriteThread::WriteGroup memtable_write_group;
  if (w.state == WriteThread::STATE_MEMTABLE_WRITER_LEADER) {
    PERF_TIMER_GUARD(write_memtable_time);
    assert(w.status.ok());
    // The leader picks other writers to join the batch
    write_thread_.EnterAsMemTableWriter(&w, &memtable_write_group);
    if (memtable_write_group.size > 1 &&
        immutable_db_options_.allow_concurrent_memtable_write) {
      // If concurrent memtable writes are allowed, the leader wakes up the
      // other writers so each finishes its own memtable insert
      write_thread_.LaunchParallelMemTableWriters(&memtable_write_group);
    } else {
      // Otherwise the leader of the memtable batch performs the memtable
      // write for the whole group
      memtable_write_group.status = WriteBatchInternal::InsertInto(
          memtable_write_group, w.sequence, column_family_memtables_.get(),
          &flush_scheduler_, write_options.ignore_missing_column_families,
          0 /*log_number*/, this, seq_per_batch_);
      versions_->SetLastSequence(memtable_write_group.last_sequence);
      write_thread_.ExitAsMemTableWriter(&w, memtable_write_group);
    }
  }

  // If the state is STATE_PARALLEL_MEMTABLE_WRITER, i.e. an ordinary writer
  // inside a memtable batch
  if (w.state == WriteThread::STATE_PARALLEL_MEMTABLE_WRITER) {
    assert(w.ShouldWriteToMemtable());
    ColumnFamilyMemTablesImpl column_family_memtables(
        versions_->GetColumnFamilySet());
    // Write the memtable
    w.status = WriteBatchInternal::InsertInto(
        &w, w.sequence, &column_family_memtables, &flush_scheduler_,
        write_options.ignore_missing_column_families, 0 /*log_number*/, this,
        true /*concurrent_memtable_writes*/);
    // Check whether all writers in the memtable batch have finished writing
    if (write_thread_.CompleteParallelMemTableWriter(&w)) {
      // If they all have, act as the memtable batch leader: update the other
      // writers' states and pick the next memtable batch leader
      MemTableInsertStatusCheck(w.status);
      versions_->SetLastSequence(w.write_group->last_sequence);
      write_thread_.ExitAsMemTableWriter(&w, *w.write_group);
    }
  }
  if (seq_used != nullptr) {
    *seq_used = w.sequence;
  }

  assert(w.state == WriteThread::STATE_COMPLETED);
  return w.FinalStatus();
}
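The states tested above come from the writer state machine defined in db/write_thread.h. A simplified excerpt, with the roles annotated as I understand them (exact values and comments may differ slightly between versions):

// Simplified from db/write_thread.h: each Writer moves through these states.
enum State : uint8_t {
  STATE_INIT = 1,                      // just constructed, not yet processed
  STATE_GROUP_LEADER = 2,              // leader of a WAL write group
  STATE_MEMTABLE_WRITER_LEADER = 4,    // leader of a memtable write group
  STATE_PARALLEL_MEMTABLE_WRITER = 8,  // follower doing its own memtable insert
  STATE_COMPLETED = 16,                // batch finished, the writer may return
  STATE_LOCKED_WAITING = 32,           // waiting on a lock/condition elsewhere
};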

Now let's analyze ExitAsBatchGroupLeader():

void WriteThread::ExitAsBatchGroupLeader(WriteGroup& write_group,
                                         Status status) {
  Writer* leader = write_group.leader;
  Writer* last_writer = write_group.last_writer;
  assert(leader->link_older == nullptr);

  // Propagate memtable write error to the whole group.
  if (status.ok() && !write_group.status.ok()) {
    status = write_group.status;
  }

  // Pipelined write path
  if (enable_pipelined_write_) {
    // Notify writers that don't write to the memtable to exit.
    for (Writer* w = last_writer; w != leader;) {
      Writer* next = w->link_older;
      w->status = status;
      // If a writer has nothing to insert into the memtable, mark it
      // STATE_COMPLETED so it can exit
      if (!w->ShouldWriteToMemtable()) {
        CompleteFollower(w, write_group);
      }
      w = next;
    }
    // If the leader itself has nothing to insert into the memtable, mark it
    // STATE_COMPLETED as well
    if (!leader->ShouldWriteToMemtable()) {
      CompleteLeader(write_group);
    }

    Writer* next_leader = nullptr;

    // Pick the leader of the next batch. First check whether any new writer
    // has arrived in the waiting queue; if not, push a dummy (empty) writer
    // onto the queue
    Writer dummy;
    Writer* expected = last_writer;
    bool has_dummy = newest_writer_.compare_exchange_strong(expected, &dummy);
    if (!has_dummy) {
      // The CAS failed, i.e. new writers already exist in the waiting queue,
      // so pick the next batch leader among them
      next_leader = FindNextLeader(expected, last_writer);
      assert(next_leader != nullptr && next_leader != last_writer);
    }

    // If the group is non-empty, the leader links itself and the other
    // writers into the memtable batch queue
    if (write_group.size > 0) {
      if (LinkGroup(write_group, &newest_memtable_writer_)) {
        // The leader can now be different from current writer.
        SetState(write_group.leader, STATE_MEMTABLE_WRITER_LEADER);
      }
    }

    // If the dummy was pushed successfully, i.e. no new writer existed at the
    // time, pick the next batch leader relative to the dummy
    if (has_dummy) {
      assert(next_leader == nullptr);
      expected = &dummy;
      bool has_pending_writer =
          !newest_writer_.compare_exchange_strong(expected, nullptr);
      if (has_pending_writer) {
        next_leader = FindNextLeader(expected, &dummy);
        assert(next_leader != nullptr && next_leader != &dummy);
      }
    }

    // If a next batch leader was found
    if (next_leader != nullptr) {
      next_leader->link_older = nullptr;
      // Wake it up and set its state to STATE_GROUP_LEADER
      SetState(next_leader, STATE_GROUP_LEADER);
    }
    // The leader waits to be woken up
    AwaitState(leader,
               STATE_MEMTABLE_WRITER_LEADER | STATE_PARALLEL_MEMTABLE_WRITER |
                   STATE_COMPLETED,
               &eabgl_ctx);
  } else {
    // Default write path
    // ...
  }
}
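It is worth noting how newest_writer_ is manipulated: writers link themselves into a lock-free list purely with compare-and-swap, which is also why ExitAsBatchGroupLeader can CAS a dummy into it. Below is a standalone sketch of that pattern; it mirrors WriteThread::LinkOne in spirit rather than reproducing the actual RocksDB code:

#include <atomic>

// Standalone illustration of the lock-free linked-list push behind
// newest_writer_: each incoming writer links itself in front of the current
// head with a CAS loop.
struct Writer {
  Writer* link_older = nullptr;  // points toward older writers
};

bool LinkOne(Writer* w, std::atomic<Writer*>* newest_writer) {
  Writer* writers = newest_writer->load(std::memory_order_relaxed);
  while (true) {
    w->link_older = writers;
    if (newest_writer->compare_exchange_weak(writers, w)) {
      // True iff the list was empty, i.e. this writer becomes the leader.
      return (writers == nullptr);
    }
    // On failure, "writers" has been refreshed to the current head; retry.
  }
}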

A Pipelined Bug

Upstream once fixed a bug in the pipelined write path. As seen in ExitAsBatchGroupLeader() above, after each batch finishes, a dummy writer is pushed onto the queue before looking for the next leader.

To see why the dummy is needed, consider the key logic the pipelined path uses to find the next leader: it walks back from the newest writer to this batch's last_writer and promotes the writer that comes right after last_writer to be the next batch's leader. In certain edge cases, however, last_writer's pointer address can be identical to the address of some writer in a later batch, so that writer and every writer between it and last_writer are skipped, and the pipeline stalls.

The fix is to place a dummy writer in between: instead of comparing against last_writer as before, every comparison is now made against the dummy.
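The root cause is essentially address reuse (an ABA-style problem): WriteThread::Writer objects live on each caller's stack, so a later write can construct its Writer at exactly the address that last_writer used to have. The following hypothetical demo (not RocksDB code) shows how easily consecutive stack allocations end up at the same address:

#include <cstdint>
#include <cstdio>

// Hypothetical demo: objects allocated on the stack by two consecutive calls
// often reuse the same address, which is why remembering last_writer's
// address and comparing it against later writers is unsafe.
struct Writer { int payload; };

std::uintptr_t AddressOfLocalWriter(int payload) {
  Writer w{payload};  // lives on this call's stack frame
  return reinterpret_cast<std::uintptr_t>(&w);
}

int main() {
  std::uintptr_t first = AddressOfLocalWriter(1);
  std::uintptr_t second = AddressOfLocalWriter(2);
  // On typical compilers/ABIs both calls place Writer in the same stack slot,
  // so the stale address from the first call equals the second, unrelated one.
  std::printf("address reused: %s\n", first == second ? "yes" : "no");
  return 0;
}

Because the dummy is a live object owned by ExitAsBatchGroupLeader itself, its address cannot collide with any other writer while the search for the next leader is running.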
