phxpaxos源碼分析5. init network (下)
TcpWrite
前面介紹網路的時候,介紹了TcpRead
,那麼接下來將要介紹TcpWrite
。顧名思義,這裡肯定是與發送數據有關。
int TcpIOThread::Init(const std::string & sListenIp,
const int iListenPort,
const int iIOThreadCount) {
// iIOThreadCount指定了創建線程的數目
// 無論是接收方,還是發送方,都需要有這麼多線程。
for (int i = 0; i < iIOThreadCount; i++) {
// .. 初始化TcpRead部分的代碼省略
// TcpWrite中也有一個EventLoop對象
// m_poNetWork也是用來初始化EventLoop對象的。
TcpWrite * poTcpWrite = new TcpWrite(m_poNetWork);
m_vecTcpWrite.push_back(poTcpWrite);
}
// 初始化TcpWrite部分
for (auto & poTcpWrite: m_vecTcpWrite) {
ret = poTcpWrite->Init();
}
return 0;
}
這裡再看一下TcpWrite
:
TcpWrite::TcpWrite(NetWork * poNetWork)
: m_oTcpClient(&m_oEventLoop, poNetWork), m_oEventLoop(poNetWork) {
m_oEventLoop.SetTcpClient(&m_oTcpClient);
}
在TcpWrite
中會調用兩個類的初始構造函數TcpClient
和EventLoop
。EventLoop
在前面已經介紹過了。那麼這裡就只介紹TcpClient
。
class TcpClient {
public:
TcpClient(
EventLoop * poEventLoop, // 指向TcpWrite中的EventLoop
NetWork * poNetWork); // 指向DefaultNetwork
private:
EventLoop * m_poEventLoop;
NetWork * m_poNetWork;
private:
std::map<uint64_t, MessageEvent *> m_mapEvent;
std::vector<MessageEvent *> m_vecEvent;
std::mutex m_oMutex;
};
可以看出,在構造函數時,只是將EventLoop & network
指向了TcpWrite
中的EventLoop
和DefaultNetwork
。而在EventLoop類中,
void EventLoop::SetTcpClient(TcpClient * poTcpClient) {
m_poTcpClient = poTcpClient;
}
可以看出來,這裡就是相互引用了一下。到這裡,TcpWrite
類的構造就完成了。那麼接下來再看一下TcpWrite
的Init()
。
int TcpWrite::Init() {
return m_oEventLoop.Init(20480);
}
在這裡,每個TcpWrite
都會建立一個epoll file handler
,其中每個epoll fd
可以處理的文件句柄是20480
。然後每個TcpWrite
中會包含一個Notify
,每個Notify
都會有一個管道,管道的一頭交給epoll
負責。
到這裡,TcpWrite
的創建與初始化都完成了。
小結
那麼回顧一下整個的處理流程就變成了。
// main()
// |-> PhxEchoServer::RunPaxos()
// |-> Node::RunNode()
// |-> PNode::Init();
// |-> PNode()::InitLogStorage()
// |-> Step 1. LogStore::Init()
// | |_ step 1.1 創建目錄
// | |_ step 1.2 設置info log
// | |_ step 1.3 打開meta文件讀到文件id和offset.
// | |_ step 1.4 打開leveldb讀取log file id和offset
// | | |-> LogStore::RebuildIndex()
// | | |-> Step (1). 去leveldb中取詳細信息
// | | | |-> DataBase::GetMaxInstanceIDFileID()
// | | | |-> Database::GetMaxInstanceID()
// | | |-> Step (2). 從本地文件中拿最新信息
// | | |-> LogStore::RebuildIndexForOneFile()
// | |_ step 1.5 膨脹file_id文件,並且移動到offset位置
// | |- LogStore::ExpandFile()
// |-> step 2. InitNetwork()
// |-> DFNetWork::Init()
// |-> UDPRecv::Init()
// |-> UDPSend::Init()
// |-> TcpIOThread::Init()
// |-> TcpRead::TcpRead()
// | |-> EventLoop::EventLoop(DefaultNetWork)
// |-> TcpAcceptor::Listen()
// | |-> SocketAddress::SocketAddress()
// | |-> ServerSocket::listen(SocketAddress, 128)
// | |-> bind,listen
// |-> TcpRead::Init()
// | |-> epoll_fd = epoll_create()
// | |-> pipe(pipeFD[2])
// | |-> epoll_ctl(epoll_fd, pipeFD[0], EPOLL_ADD)
// |-> TcpWrite::TcpWrite()
// | |-> 生成TcpClient和EventLoop並相互引用
// |-> TcpWrite::Init()
// |-> 生成epoll fd
// |-> 生成Notify()
// |-> 開一個pipe,把fd[0]加到epoll中。
推薦閱讀: