TcpWrite

前面介紹網路的時候,介紹了TcpRead,那麼接下來將要介紹TcpWrite。顧名思義,這裡肯定是與發送數據有關。

int TcpIOThread::Init(const std::string & sListenIp,
const int iListenPort,
const int iIOThreadCount) {
// iIOThreadCount指定了創建線程的數目
// 無論是接收方,還是發送方,都需要有這麼多線程。
for (int i = 0; i < iIOThreadCount; i++) {
// .. 初始化TcpRead部分的代碼省略
// TcpWrite中也有一個EventLoop對象
// m_poNetWork也是用來初始化EventLoop對象的。
TcpWrite * poTcpWrite = new TcpWrite(m_poNetWork);
m_vecTcpWrite.push_back(poTcpWrite);
}
// 初始化TcpWrite部分
for (auto & poTcpWrite: m_vecTcpWrite) {
ret = poTcpWrite->Init();
}
return 0;
}

這裡再看一下TcpWrite:

TcpWrite::TcpWrite(NetWork * poNetWork)
: m_oTcpClient(&m_oEventLoop, poNetWork), m_oEventLoop(poNetWork) {
m_oEventLoop.SetTcpClient(&m_oTcpClient);
}

TcpWrite中會調用兩個類的初始構造函數TcpClientEventLoopEventLoop在前面已經介紹過了。那麼這裡就只介紹TcpClient

class TcpClient {
public:
TcpClient(
EventLoop * poEventLoop, // 指向TcpWrite中的EventLoop
NetWork * poNetWork); // 指向DefaultNetwork
private:
EventLoop * m_poEventLoop;
NetWork * m_poNetWork;
private:
std::map<uint64_t, MessageEvent *> m_mapEvent;
std::vector<MessageEvent *> m_vecEvent;
std::mutex m_oMutex;
};

可以看出,在構造函數時,只是將EventLoop & network指向了TcpWrite中的EventLoopDefaultNetwork。而在EventLoop類中,

void EventLoop::SetTcpClient(TcpClient * poTcpClient) {
m_poTcpClient = poTcpClient;
}

可以看出來,這裡就是相互引用了一下。到這裡,TcpWrite類的構造就完成了。那麼接下來再看一下TcpWriteInit()

int TcpWrite::Init() {
return m_oEventLoop.Init(20480);
}

在這裡,每個TcpWrite都會建立一個epoll file handler,其中每個epoll fd可以處理的文件句柄是20480。然後每個TcpWrite中會包含一個Notify,每個Notify都會有一個管道,管道的一頭交給epoll負責。

到這裡,TcpWrite的創建與初始化都完成了。

小結

那麼回顧一下整個的處理流程就變成了。

// main()
// |-> PhxEchoServer::RunPaxos()
// |-> Node::RunNode()
// |-> PNode::Init();
// |-> PNode()::InitLogStorage()
// |-> Step 1. LogStore::Init()
// | |_ step 1.1 創建目錄
// | |_ step 1.2 設置info log
// | |_ step 1.3 打開meta文件讀到文件id和offset.
// | |_ step 1.4 打開leveldb讀取log file id和offset
// | | |-> LogStore::RebuildIndex()
// | | |-> Step (1). 去leveldb中取詳細信息
// | | | |-> DataBase::GetMaxInstanceIDFileID()
// | | | |-> Database::GetMaxInstanceID()
// | | |-> Step (2). 從本地文件中拿最新信息
// | | |-> LogStore::RebuildIndexForOneFile()
// | |_ step 1.5 膨脹file_id文件,並且移動到offset位置
// | |- LogStore::ExpandFile()
// |-> step 2. InitNetwork()
// |-> DFNetWork::Init()
// |-> UDPRecv::Init()
// |-> UDPSend::Init()
// |-> TcpIOThread::Init()
// |-> TcpRead::TcpRead()
// | |-> EventLoop::EventLoop(DefaultNetWork)
// |-> TcpAcceptor::Listen()
// | |-> SocketAddress::SocketAddress()
// | |-> ServerSocket::listen(SocketAddress, 128)
// | |-> bind,listen
// |-> TcpRead::Init()
// | |-> epoll_fd = epoll_create()
// | |-> pipe(pipeFD[2])
// | |-> epoll_ctl(epoll_fd, pipeFD[0], EPOLL_ADD)
// |-> TcpWrite::TcpWrite()
// | |-> 生成TcpClient和EventLoop並相互引用
// |-> TcpWrite::Init()
// |-> 生成epoll fd
// |-> 生成Notify()
// |-> 開一個pipe,把fd[0]加到epoll中。

推薦閱讀:

相關文章