The Certain codebase is considerably more complex than paxoskv and takes more patience to read through. Its example is still a KV store, but Certain itself is designed as a generic PaxosLog-plus-DB implementation. Its main optimizations include batched commits, optimized consistent reads, and doing without a complex Paxos state-machine implementation. Overall, Certain is the more complete of the two: apart from the "PLog as DB" technique, which it cannot apply, most of the other details of the PaxosStore paper are reflected in it. The workflows and many of the finer points described in the paper map directly onto Certain's code.

As usual, we start with the directory layout and some basic structures.

├── build.sh
├── example
│ ├── BenchTool.cpp
│ ├── CardTool.cpp client-side test tool
│ ├── CertainUserImpl.cpp
│ ├── CertainUserImpl.h service configuration: roles, server addresses, etc.
│ ├── Client.cpp
│ ├── Client.h grpc client
│ ├── Coding.cpp
│ ├── Coding.h key encoding scheme for leveldb
│ ├── CoHashLock.cpp
│ ├── CoHashLock.h
│ ├── DBImpl.cpp
│ ├── DBImpl.h the example's implementation of the clsDBBase abstract class; persists the data
│ ├── DBTool.cpp
│ ├── DBType.h
│ ├── example.conf
│ ├── example.proto
│ ├── GrpcHelper.h grpc initialization and registration, wiring the classes together; grpc exposes the interface to clients
│ ├── PLogImpl.cpp
│ ├── PLogImpl.h the example's implementation of the Certain::clsPLogBase abstract class; persists the PLog
│ ├── RWStressTool.cpp
│ ├── Server.cpp main entry point; starts 50 UserWorker threads
│ ├── ServiceImpl.cpp
│ ├── ServiceImpl.h service handlers; implements the processing for each client command
│ ├── TemporaryTable.cpp
│ ├── TemporaryTable.h temporary table, used when merging batched commits
│ ├── UserWorker.cpp
│ ├── UserWorker.h takes requests off the grpc queue and invokes the ServiceImpl handlers; each UserWorker thread starts 50 coroutines
│ ├── UUIDGenerator.cpp
│ └── UUIDGenerator.h
├── include
│ └── certain
│ ├── Certain.h main interface
│ ├── CertainUserBase.h basic configuration
│ ├── DBBase.h DB abstract class
│ └── PLogBase.h PLog abstract class
├── network
│ ├── EpollIO.cpp
│ ├── EpollIO.h epoll wrapper
│ ├── InetAddr.h
│ ├── IOChannel.cpp
│ ├── IOChannel.h
│ ├── SocketHelper.cpp
│ └── SocketHelper.h
├── src
│ ├── AsyncPipeMng.cpp
│ ├── AsyncPipeMng.h the framework uses pipes to notify and wake up waiters
│ ├── AsyncQueueMng.cpp
│ ├── AsyncQueueMng.h manager of all the queues
│ ├── CatchUpWorker.cpp
│ ├── CatchUpWorker.h catch-up thread
│ ├── Certain.proto
│ ├── CertainWrapper.cpp Certain's main entry points, e.g. RunPaxos
│ ├── Command.cpp
│ ├── Command.h commands exchanged between nodes
│ ├── Common.h
│ ├── Configure.cpp
│ ├── Configure.h
│ ├── ConnWorker.cpp
│ ├── ConnWorker.h connection management; negotiates the serverid
│ ├── DBWorker.cpp
│ ├── DBWorker.h
│ ├── EntityInfoMng.cpp
│ ├── EntityInfoMng.h
│ ├── EntityWorker.cpp
│ ├── EntityWorker.h
│ ├── EntryInfoMng.cpp
│ ├── EntryInfoMng.h
│ ├── EntryState.cpp
│ ├── EntryState.h log-entry state machine
│ ├── GetAllWorker.cpp
│ ├── GetAllWorker.h
│ ├── IOWorker.cpp
│ ├── IOWorker.h network-IO threads: sends RspQueue items to the other servers; received network messages go through PutToIOReqQueue
│ ├── LeasePolicy.h the time-window lease described in the paper: if another master has committed for the Entity, the local node may not start a new paxos proposal within the window
│ ├── PLogWorker.cpp
│ ├── PLogWorker.h
│ ├── UUIDGroupMng.cpp
│ ├── UUIDGroupMng.h
│ └── WakeUpPipeMng.h
├── third
│ ├── autobuild.sh
│ ├── googletest
│ ├── grpc
│ ├── libco
│ ├── protobuf
│ └── rocksdb
└── utils
├── AOF.cpp
├── AOF.h
├── ArrayTimer.h
├── Assert.h
├── AutoHelper.h
├── CircleQueue.h circular queue
├── CRC32.cpp
├── CRC32.h
├── FixSizePool.cpp
├── FixSizePool.h
├── Hash.h
├── Header.h
├── Logger.cpp
├── Logger.h
├── LRUTable.h hashtable with LRU eviction
├── ObjReusedPool.h
├── OSSReport.cpp
├── OSSReport.h
├── Random.h
├── Singleton.h
├── Thread.cpp
├── Thread.h
├── Time.h
├── UseTimeStat.cpp
└── UseTimeStat.h

Let's start with clsAsyncQueueMng, the container that holds the queues; essentially every queue in the system is defined here. Understanding these queues is essential to following how messages are relayed through the codebase: queues are used heavily everywhere, and the Paxos protocol is already complex enough once it is mapped down to code.

typedef clsCircleQueue<clsCmdBase *> clsIOReqQueue;
typedef clsCircleQueue<clsCmdBase *> clsIORspQueue;
typedef clsCircleQueue<clsCmdBase *> clsPLogReqQueue;
typedef clsCircleQueue<clsCmdBase *> clsPLogRspQueue;
typedef clsCircleQueue<clsClientCmd *> clsDBReqQueue;
typedef clsCircleQueue<clsPaxosCmd *> clsGetAllReqQueue;
typedef clsCircleQueue<clsPaxosCmd *> clsGetAllRspQueue;
typedef clsCircleQueue<clsPaxosCmd *> clsCatchUpReqQueue;
typedef clsCircleQueue<clsPaxosCmd *> clsPLogWriteReqQueue;
class clsAsyncQueueMng : public clsSingleton<clsAsyncQueueMng>
{
private:
clsIOReqQueue **m_ppIOReqQueue; // request queue: local requests plus requests arriving from remote peers; consumed by clsEntityWorker::Run
clsIORspQueue **m_ppIORspQueue; // response queue; consumed by clsIOWorker::ConsumeIORspQueue
clsPLogReqQueue **m_ppPLogReqQueue; // consumed by clsPLogWorker::CoEpollTick, then handed off to multiple workers and processed by clsPLogWorker::PLogRoutine
clsPLogRspQueue **m_ppPLogRspQueue; // consumed by clsEntityWorker::Run, handed to DoWithPaxosCmdFromPLog
clsDBReqQueue **m_ppDBReqQueue; // consumed by clsDBWorker::DBSingle
clsGetAllReqQueue **m_ppGetAllReqQueue; // consumed by clsGetAllWorker::Run
clsGetAllRspQueue **m_ppGetAllRspQueue; // consumed by clsEntityWorker::Run
clsCatchUpReqQueue **m_ppCatchUpReqQueue; // consumed by the catch-up thread
clsPLogWriteReqQueue **m_ppPLogWriteReqQueue;
// ...
};
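All of these typedefs instantiate clsCircleQueue (utils/CircleQueue.h). As a rough mental model, it behaves like a fixed-size ring with a single consumer. Below is a minimal sketch of those semantics; it is illustrative only, not Certain's actual implementation (which, among other things, has to support pushes from multiple producer threads safely):

#include <atomic>
#include <cstdint>

// Illustrative single-producer/single-consumer ring, showing the
// TakeByOneThread contract used later by clsEntityWorker::Run.
// NOT Certain's actual clsCircleQueue.
template <typename T>
class clsSketchCircleQueue
{
public:
    explicit clsSketchCircleQueue(uint64_t iSize)
        : m_iSize(iSize), m_iHead(0), m_iTail(0), m_ptElt(new T[iSize]) {}
    ~clsSketchCircleQueue() { delete[] m_ptElt; }

    int Push(const T &tElt) // single producer in this sketch
    {
        uint64_t iTail = m_iTail.load(std::memory_order_relaxed);
        if (iTail - m_iHead.load(std::memory_order_acquire) >= m_iSize)
            return -1; // full: caller must retry or report overload
        m_ptElt[iTail % m_iSize] = tElt;
        m_iTail.store(iTail + 1, std::memory_order_release);
        return 0;
    }

    int TakeByOneThread(T *ptElt) // exactly one consumer thread may call this
    {
        uint64_t iHead = m_iHead.load(std::memory_order_relaxed);
        if (iHead == m_iTail.load(std::memory_order_acquire))
            return -1; // empty
        *ptElt = m_ptElt[iHead % m_iSize];
        m_iHead.store(iHead + 1, std::memory_order_release);
        return 0;
    }

private:
    uint64_t m_iSize;
    std::atomic<uint64_t> m_iHead, m_iTail;
    T *m_ptElt;
};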


Now let's turn to the entry point. The example uses grpc to serve client requests. Each grpc request is eventually dispatched by a clsUserWorker::UserRoutine coroutine, which ends up calling the concrete clsServiceImpl::InsertCard/UpdateCard/DeleteCard/SelectCard implementations. This is standard asynchronous grpc programming and is not covered in detail here.
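For readers unfamiliar with that pattern, here is a generic sketch of the async-grpc shape; this is background only, not Certain's code, and clsCall is a hypothetical per-request object:

#include <grpcpp/grpcpp.h>

struct clsCall { void Proceed() { /* decode request, invoke handler, re-arm */ } }; // hypothetical

// Generic completion-queue polling loop: each worker pulls finished events
// and resumes the per-request state machine, which is what eventually
// reaches the clsServiceImpl handlers.
void PollCompletionQueue(grpc::ServerCompletionQueue *poCQ)
{
    void *pTag = NULL;
    bool bOk = false;
    while (poCQ->Next(&pTag, &bOk)) // blocks until an event completes
    {
        if (!bOk) continue;
        static_cast<clsCall *>(pTag)->Proceed();
    }
}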

// Write command
int clsServiceImpl::InsertCard(grpc::ServerContext& oContext,
const example::CardRequest& oRequest,
example::CardResponse& oResponse)
{
return BatchFunc(example::OperCode::eInsertCard, oRequest, oResponse);
}

int clsServiceImpl::UpdateCard(grpc::ServerContext& oContext,
const example::CardRequest& oRequest,
example::CardResponse& oResponse)
{
return BatchFunc(example::OperCode::eUpdateCard, oRequest, oResponse);
}

int clsServiceImpl::DeleteCard(grpc::ServerContext& oContext,
const example::CardRequest& oRequest,
example::CardResponse& oResponse)
{
return BatchFunc(example::OperCode::eDeleteCard, oRequest, oResponse);
}

// Read command: if BatchFunc succeeds, read the result from the local DB
int clsServiceImpl::SelectCard(grpc::ServerContext& oContext,
const example::CardRequest& oRequest,
example::CardResponse& oResponse)
{
int iRet = BatchFunc(example::OperCode::eSelectCard, oRequest, oResponse);
if (iRet != 0) return iRet;

clsDBImpl* poDBEngine = dynamic_cast<clsDBImpl*>(
Certain::clsCertainWrapper::GetInstance()->GetDBEngine());
dbtype::DB *poDB = poDBEngine->GetDB();
clsTemporaryTable oTable(poDB);

std::string strKey;
EncodeInfoKey(strKey, oRequest.entity_id(), oRequest.card_id());
std::string strValue;
iRet = oTable.Get(strKey, strValue);
if (iRet == Certain::eRetCodeNotFound)
{
return example::StatusCode::eCardNotExist;
}

if (iRet == 0 && !oResponse.mutable_card_info()->ParseFromString(strValue)) {
return Certain::eRetCodeParseProtoErr;
}
return iRet;
}

clsServiceImpl::InsertCard/UpdateCard/DeleteCard/SelectCard all funnel into BatchFunc, so let's look at its code.

int clsServiceImpl::BatchFunc(int iOper,
const example::CardRequest& oRequest,
example::CardResponse& oResponse)
{
uint64_t iStartUS = Certain::GetCurrTimeUS();

// 1. Push in queue
uint64_t iPushStartUS = Certain::GetCurrTimeUS();
QueueItem_t *poItem = new QueueItem_t;
Certain::clsAutoDelete<QueueItem_t> oAutoDelete(poItem);
poItem->iOper = iOper;
poItem->iEntityID = oRequest.entity_id();
poItem->poRequest = (void*)&oRequest;
poItem->poResponse = (void*)&oResponse;
poItem->iRet = BatchStatus::WAITING;

{
Certain::clsThreadLock oLock(&m_poBatchMapMutex); // the critical section is tiny, so a plain pthread_mutex_lock suffices
m_oBatchMap[poItem->iEntityID].push(poItem);
}
uint64_t iPushEndUS = Certain::GetCurrTimeUS();

// 2. Lock
uint64_t iLockStartUS = Certain::GetCurrTimeUS();
Certain::clsAutoEntityLock oAuto(poItem->iEntityID); // this lock enables batching: requests for the same entity queue up and one of them executes the whole batch; leveldb has a similar write-merging optimization

if (poItem->iRet != BatchStatus::WAITING)
{
return poItem->iRet;
}

clsDBImpl* poDBEngine = dynamic_cast<clsDBImpl*>(
Certain::clsCertainWrapper::GetInstance()->GetDBEngine());
dbtype::DB *poDB = poDBEngine->GetDB();
clsTemporaryTable oTable(poDB); // because requests are merged, earlier writes must be visible to later commands before anything is persisted; reads therefore cannot go straight to the DB, and writes are buffered in clsTemporaryTable
uint64_t iLockEndUS = Certain::GetCurrTimeUS();

// 3. Pop all pending requests for the same entity.
uint64_t iPopStartUS = Certain::GetCurrTimeUS();
std::queue<QueueItem_t*> oQueue;
{
Certain::clsThreadLock oLock(&m_poBatchMapMutex);
auto iter = m_oBatchMap.find(poItem->iEntityID);
assert(iter != m_oBatchMap.end());
while (!iter->second.empty())
{
oQueue.push(iter->second.front());
iter->second.pop();
}
}
uint64_t iPopEndUS = Certain::GetCurrTimeUS();

// 4. EntityCatchUp: bring the DB state up to the latest committed entry.
uint64_t iCatchUpStartUS = Certain::GetCurrTimeUS();
Certain::clsCertainWrapper* poCertain = Certain::clsCertainWrapper::GetInstance();
uint64_t iEntry = 0, iMaxCommitedEntry = 0;
int iRet = poCertain->EntityCatchUp(poItem->iEntityID, iMaxCommitedEntry);
if (iRet != 0)
{
BatchReturn(&oQueue, iRet);
return iRet;
}
uint64_t iCatchUpEndUS = Certain::GetCurrTimeUS();

// 5. clsTemporaryTable: Handle requests.
uint64_t iBatchHandleStartUS = Certain::GetCurrTimeUS();
std::vector<uint64_t> vecUUID;
uint64_t iQueueSize = oQueue.size();
uint64_t iRead = 0, iWrite = 0;
while (iQueueSize > 0)
{
QueueItem_t* poItem = oQueue.front();
oQueue.pop();
--iQueueSize;

assert(poItem->iRet == BatchStatus::WAITING);
HandleSingleCommand(&oTable, poItem, &vecUUID); // handle one command; its data may have been changed by an earlier command in the batch, so intermediate results are buffered and the combined DB effect accumulates in clsTemporaryTable
if (poItem->iOper == example::OperCode::eSelectCard) iRead++; else iWrite++;
// Re-push items since they need to RunPaxos.
oQueue.push(poItem);
}
uint64_t iBatchHandleEndUS = Certain::GetCurrTimeUS();

// 6. RunPaxos: launch a paxos proposal.
uint64_t iRunPaxosStartUS = Certain::GetCurrTimeUS();
iEntry = iMaxCommitedEntry + 1;
iRet = poCertain->RunPaxos(poItem->iEntityID, iEntry, example::OperCode::eBatchFunc, vecUUID,
oTable.GetWriteBatchString());
BatchReturn(&oQueue, iRet);
uint64_t iRunPaxosEndUS = Certain::GetCurrTimeUS();

uint64_t iEndUS = Certain::GetCurrTimeUS();

if (iRet == 0)
{
poDBEngine->Commit(poItem->iEntityID, iEntry, oTable.GetWriteBatchString()); // persist directly; this is safe because EntityCatchUp already ran above
}

CertainLogInfo("iEntityID %lu iPushTime %lu iLockTime %lu iPopTime %lu "
"iCatchUpTime %lu iBatchHandleTime %lu iRunPaxos %lu iTotalUS %lu "
"iRead %lu iWrite %lu",
poItem->iEntityID,
iPushEndUS - iPushStartUS, iLockEndUS - iLockStartUS, iPopEndUS - iPopStartUS,
iCatchUpEndUS - iCatchUpStartUS, iBatchHandleEndUS - iBatchHandleStartUS,
iRunPaxosEndUS - iRunPaxosStartUS, iEndUS - iStartUS, iRead, iWrite);

iRet = poItem->iRet;
return iRet;
}
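clsTemporaryTable deserves a closer look, since both HandleSingleCommand and SelectCard depend on it. The idea: buffer the batch's writes in an in-memory overlay so that later commands see their predecessors' effects, then serialize the overlay once as the write batch that RunPaxos proposes and Commit applies. A minimal sketch of that idea follows; it is illustrative only, and the real class wraps dbtype::DB and builds an actual WriteBatch:

#include <map>
#include <string>

// Stand-in for the dbtype::DB read path in this sketch.
struct clsKVReader
{
    virtual ~clsKVReader() {}
    virtual int Get(const std::string &strKey, std::string *pstrValue) const = 0;
};

// Illustrative overlay table, NOT the real clsTemporaryTable.
class clsSketchTemporaryTable
{
public:
    explicit clsSketchTemporaryTable(const clsKVReader *poDB) : m_poDB(poDB) {}

    void Put(const std::string &strKey, const std::string &strValue)
    {
        m_oOverlay[strKey] = strValue; // visible to later Gets in the same batch
    }

    int Get(const std::string &strKey, std::string *pstrValue) const
    {
        auto iter = m_oOverlay.find(strKey);
        if (iter != m_oOverlay.end())
        {
            *pstrValue = iter->second; // read-your-writes within the batch
            return 0;
        }
        return m_poDB->Get(strKey, pstrValue); // fall back to the durable DB
    }

    // The real GetWriteBatchString() serializes the buffered writes, so the
    // exact same bytes are proposed via RunPaxos and later applied in Commit.

private:
    const clsKVReader *m_poDB;
    std::map<std::string, std::string> m_oOverlay;
};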

int clsCertainWrapper::RunPaxos(uint64_t iEntityID, uint64_t iEntry,
uint16_t hSubCmdID, const vector<uint64_t> &vecWBUUID, // UUIDs of the batched requests
const string &strWriteBatch) // strWriteBatch.size() == 0 <==> readonly
{
if (m_poConf->GetEnableLearnOnly()) // a learn-only node must not start paxos instances
{
CertainLogError("E(%lu, %lu) reject if learn only %u",
iEntityID, iEntry, m_poConf->GetEnableLearnOnly());
return eRetCodeRejectAll;
}

for (uint32_t i = 0; i < vecWBUUID.size(); ++i)
{
if (clsUUIDGroupMng::GetInstance()->IsUUIDExist(iEntityID, vecWBUUID[i]))
{
return eRetCodeDupUUID; // the same command must never execute twice
}
}

// It's estimated that one uint64_t uuid takes 32 bytes in pb, conservatively.
if (strWriteBatch.size() + vecWBUUID.size() * 32 > MAX_WRITEBATCH_SIZE) // a cmd must not exceed 20 MB
{
CertainLogError("BUG maybe strWriteBatch.size %lu vecWBUUID.size %lu",
strWriteBatch.size(), vecWBUUID.size());
return eRetCodeSizeExceed;
}

TIMERMS_START(iUseTimeMS);

uint64_t iUUID = 0;
if (strWriteBatch.size() == 0)
{
AssertEqual(0, vecWBUUID.size());
iUUID = clsCmdFactory::GetInstance()->GetNextUUID(); // generate a temporary UUID, used for reads (gets)
}
else
{
//AssertLess(0, vecWBUUID.size());
}

clsWriteBatchCmd *poWB = new clsWriteBatchCmd(hSubCmdID, iUUID,
vecWBUUID, strWriteBatch);
clsAutoDelete<clsWriteBatchCmd> oAuto(poWB);

poWB->SetTimestampUS(GetCurrTimeUS());

poWB->SetEntityID(iEntityID);
poWB->SetEntry(iEntry);
poWB->SetEvalOnly(true);
poWB->SetReadOnly(strWriteBatch.size() == 0);

int iRet = SyncWaitCmd(poWB); // push the command to IOReqQueue and wait synchronously for the result
TIMERMS_STOP(iUseTimeMS);
OSS::ReportRunPaxosTimeMS(iRet, iUseTimeMS);

return iRet;
}
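Note how the read path falls out of the same interface: per the comment above, an empty strWriteBatch marks the command read-only, so a consistent read is just a RunPaxos call that carries no data. Roughly what SelectCard's path reduces to, pieced together from the code above (iEntityID stands in for the request's entity id):

// Consistent read, assembled from the BatchFunc/RunPaxos code above.
Certain::clsCertainWrapper *poCertain = Certain::clsCertainWrapper::GetInstance();

uint64_t iMaxCommitedEntry = 0;
int iRet = poCertain->EntityCatchUp(iEntityID, iMaxCommitedEntry); // DB catches up first
if (iRet == 0)
{
    std::vector<uint64_t> vecUUID; // must stay empty: reads carry no request UUIDs
    iRet = poCertain->RunPaxos(iEntityID, iMaxCommitedEntry + 1,
                               example::OperCode::eBatchFunc, vecUUID,
                               "" /* empty write batch <=> read-only */);
    // iRet == 0 means this replica is confirmed up to date; now read the local DB.
}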

SyncWaitCmd mainly pushes the command onto IOReqQueue and then waits synchronously for the result, so its code is not reproduced here. IOReqQueue is consumed by clsEntityWorker::Run. As a side note, messages sent by remote paxos nodes are handled in clsIOWorker::HandleRead(clsFDBase *poFD) and are likewise pushed onto IOReqQueue.
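The synchronous wait relies on the pipe mechanism noted for AsyncPipeMng in the directory listing: the waiter blocks on a pipe's read end, and whoever finishes the command writes a byte to wake it. A bare-bones sketch of that pattern (illustrative only, NOT Certain's AsyncPipeMng; error handling omitted):

#include <unistd.h>

int g_aFD[2]; // g_aFD[0] = read end, g_aFD[1] = write end

void Init()   { (void)!pipe(g_aFD); }
void Wait()   { char c; (void)!read(g_aFD[0], &c, 1); }      // blocks until a byte arrives
void WakeUp() { char c = 0; (void)!write(g_aFD[1], &c, 1); } // wakes the waiter once the result is set

Because a pipe fd can also be registered with epoll, a waiting coroutine can yield instead of blocking its whole thread, which is presumably how these waits coexist with the libco-based workers.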

clsEntityWorker::Run() calls m_poIOReqQueue->TakeByOneThread(&poCmd) to pull commands off IOReqQueue and hands them to clsEntityWorker::DoWithIOReq. There, the clsWriteBatchCmd created in RunPaxos is passed on to DoWithClientCmd(poClientCmd). The reader is probably running low on patience by now: everything so far has merely relayed the client's request, and only from DoWithClientCmd onward does a Paxos proposal actually get under way. The concrete Paxos flow, and how the PLog and DB components plug into it, is left for the next post.
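As a quick orientation before that, the hand-off just described amounts to a consumption loop of roughly this shape (a simplified sketch, not Certain's actual Run(); it assumes the usual 0-on-success convention for TakeByOneThread, and the real loop also drains the PLog and GetAll response queues shown earlier instead of spinning):

void clsEntityWorker::Run()
{
    while (true)
    {
        Certain::clsCmdBase *poCmd = NULL;
        if (m_poIOReqQueue->TakeByOneThread(&poCmd) == 0)
        {
            // A clsWriteBatchCmd created in RunPaxos is routed from here into
            // DoWithClientCmd; peer paxos messages take the other branches.
            DoWithIOReq(poCmd);
        }
        // else: queue empty; the real loop services other queues, then waits.
    }
}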

