9.1 Tars RPC服務模型概覽

Tars服務模型圖如下:

服務端:

  • 1 TC_EpollServer中可以配置NetThread的數量,默認為1個,最多為15個。0號NetThread負責處理綁定和監聽文件操作符,並保存在_listenners中
  • 2 所有NetThread線程中的epoll會通過_epoller.add將_listeners加入到自己的事件集中
  • 3 所有NetThread都有可能接收客戶端連接,生成Connection,Connection按照一定規則分配到不同的NetThread上
  • 4 不同NetThread通過Connection,將接收到的內容放到同一個隊列r_queue中(代碼裏叫做recv_queue _rbuffer)
  • 5 多個業務處理線程ServantHandleThread會從r_queue中取出(pop_front)內容進行處理
  • 6 ServantHandleThread處理完業務邏輯後會按照一定規則將結果push_back到不同的NetThread的隊列(s_queue)中(代碼裏叫做send_queue _sbuffer;),最後在通過Connection將結果返回給客戶端

非同步客戶端:

  • 1 CommunicatorEpollThread的數目也是可配的,最低1個,最多64個。AsyncThread數目默認3個,最多1024個。客戶端的請求封裝在ReqMessage中,放入到隊列ReqInfoQueue中,並通知CommunicatorEpollThread,這裡ReqInfoQueue的數目對應了CommunicatorEpollThread的數目
  • 2 CommunicatorEpollThread從ReqInfoQueue中獲取請求數據ReqMessage
  • 3 CommunicatorEpollThread將ReqMessage發送到服務端,同時把ReqMessage保存在_timeoutQueue中。接收到來自服務端的結果後,從_timeoutQueue中拿到對應的ReqMessage,把結果放入ReqMessage中。如果是同步客戶端,到這一步就完成了,如果是非同步客戶端,繼續下面步驟
  • 4 CommunicatorEpollThread將保存返回結果的ReqMessage放入ReqInfoQueue中,這裡的ReqInfoQueue屬於每個CommunicatorEpollThread
  • 5 AsyncThread線程從ReqInfoQueue中獲取ReqMessage進行回調函數的處理

後續會根據上面的框架,去逐步分析其中的一些細節,探究Tars高性能的一些「祕密」

9.2 Tars協議

9.2.1 是什麼

借用官方說法:

TARS編碼協議是一種數據編解碼規則,它將整形、枚舉值、字元串、序列、字典、自定義結構體等數據類型按照一定的規則編碼到二進位數據流中。對端接收到二進位數據流之後,按照相應的規則反序列化可得到原始數值。

簡單理解,TARS編碼協議提供了一種將數據序列化、反序列化的方法。其角色和我們認識的protobuf、json、xml等同。

9.2.2 怎麼用

一般客戶端到服務端的數據交互流程如下:

  • 1、客戶端原始請求數據---->序列化---->服務端
  • 2、服務端---->反序列化---->原始請求數據
  • 3、服務端原始返回數據---->序列化----->客戶端
  • 4、客戶端----->反序列化----->原始返回數據

現在來看Tars 官方RPC源碼中是怎麼實現上面第3、4步的:

首先是服務端將數據序列化:

//位置:cpp/servant/libservant/TarsCurrent.cpp 221
void TarsCurrent::sendResponse(int iRet, const vector<char>& buffer, const map<string, string>& status, const string & sResultDesc)
{
//省略部分代碼
………………

TarsOutputStream<BufferWriter> os;
if (_request.iVersion != TUPVERSION)
{
//將數據放到ResponsePacket結構中
ResponsePacket response;

response.iRequestId = _request.iRequestId;
response.iMessageType = _request.iMessageType;
response.cPacketType = TARSNORMAL;
response.iVersion = TARSVERSION;
response.status = status;
response.sBuffer = buffer;
response.sResultDesc = sResultDesc;
response.context = _responseContext;
response.iRet = iRet;

TLOGINFO("[TARS]TarsCurrent::sendResponse :"
<< response.iMessageType << "|"
<< _request.sServantName << "|"
<< _request.sFuncName << "|"
<< response.iRequestId << endl);

//調用序列化方法,response中的數據都保存在了os中
response.writeTo(os);
}

//省略部分代碼
…………………………

//獲取內容長度
tars::Int32 iHeaderLen = htonl(sizeof(tars::Int32) + os.getLength());

string s = "";

//返回的s的格式是內容長度+內容
s.append((const char*)&iHeaderLen, sizeof(tars::Int32));

s.append(os.getBuffer(), os.getLength());

_servantHandle->sendResponse(_uid, s, _ip, _port, _fd);
}

再來看客戶端怎樣解析來自服務端的返回:

//位置:cpp/servant/libservant/Transceiver.cpp 331
int TcpTransceiver::doResponse(list<ResponsePacket>& done)
{
…………
if(!_recvBuffer.IsEmpty())
{
try
{
//接收到的服務端的序列化好的數據
const char* data = _recvBuffer.ReadAddr();
size_t len = _recvBuffer.ReadableSize();
size_t pos = 0;

//獲取協議封裝類
ProxyProtocol& proto = _adapterProxy->getObjProxy()->getProxyProtocol();

if (proto.responseExFunc)
{
long id = _adapterProxy->getId();
//將data反序列化到done中
pos = proto.responseExFunc(data, len, done, (void*)id);
}
…………
}

}

這裡的responseExFunc來自ProxyProtocol::tarsResponse(cpp/servant/AppProtocal.h 398)

template<uint32_t iMaxLength>
static size_t tarsResponseLen(const char* recvBuffer, size_t length, list<ResponsePacket>& done)
{
…………
TarsInputStream<BufferReader> is;
//將數據放入is中
is.setBuffer(recvBuffer + pos + sizeof(tars::Int32), iHeaderLen - sizeof(tars::Int32));
pos += iHeaderLen;
//將is中的數據進行反序列化,填充到rsp中
ResponsePacket rsp;
rsp.readFrom(is);
…………
}

從上面代碼中可以看出:

  • 序列化數據使用的是:ResponsePacket.writeTo()
  • 反序列化數據使用的是:ResponsePacket.readFrom()

9.2.3 一個獨立的可編譯調試的demo

獲取代碼

下載代碼後,進入tup目錄

  • 執行./rush.sh,可生成proto-demo
  • 執行./proto-demo > tmp.txt,可在tmp.txt看到相關調試內容(我自己已經加了一些調試內容)

這個demo就是從9.2.2節中的內容直接抽取出來形成的,可以很方便的進行跟蹤調試。

9.2.4 協議序列化分析-HEAD

把結構化數據序列化,用大白話解釋就是想辦法把不同類型的數據按照順序放在一個字元串裏。反序列化就是還能從這個字元串裏把類型和數據正確解析出來。一般來說,要達成正確的效果,有三個因素是必須考慮的:

  • 標記數據的位置。例如是位於字元串頭部還是字元串末尾,或者中間某個部分
  • 標記數據的類型,例如int char float vector等
  • 標記數據內容

Tars協議也跳不出這個基本規則,它的數據是由兩部分組成:

| HEAD | BUF |

  • HEAD為頭部信息(包含了數據位置和數據類型),BUF為實際數據。注意BUF裏可以繼續嵌套| HEAD | BUF |這樣的類型,以滿足複雜數據結構的需要
  • 像char、short、int之類的簡單類型時,只需要:| HEAD | BUF |
  • 當數據類型為vector< char >時,就變為了| HEAD1 | HEAD2 | BUF |。這時候HEAD1 存儲vector類型,HEAD2 存儲char類型

我們再具體看下HEAD中包括的內容:

| TAG1(4 bits) | TYPE(4 bits) | TAG2(1 byte或者8 bits)

  • TYPE表示類型,用4個二進位位表示,取值範圍是0~15,用來標識數據類型。下面的Tars官方代碼標明瞭具體數據類型的TYPE值

//位置:/cpp/servant/tup/Tars.h 60行
//數據頭類型
#define TarsHeadeChar 0
#define TarsHeadeShort 1
#define TarsHeadeInt32 2
#define TarsHeadeInt64 3
#define TarsHeadeFloat 4
#define TarsHeadeDouble 5
#define TarsHeadeString1 6
#define TarsHeadeString4 7
#define TarsHeadeMap 8
#define TarsHeadeList 9
#define TarsHeadeStructBegin 10
#define TarsHeadeStructEnd 11
#define TarsHeadeZeroTag 12
#define TarsHeadeSimpleList 13

  • TAG由TAG1和TAG2一起組成,標識數據的位置。當TAG值不超過14時候,只需要TAG1,當TAG值超過14時候,TAG1為240,TAG2標識TAG的值。下面的代碼標明瞭這個邏輯

//位置:/cpp/servant/tup/Tars.h 96行
#define TarsWriteToHead(os, type, tag)
do {
if (tars_likely((tag) < 15))
{
//只有TAG1
TarsWriteUInt8TTypeBuf( os, (type) + ((tag)<<4) , (os)._len);
}
else
{
//TAG1
TarsWriteUInt8TTypeBuf( os, (type) + (240) , (os)._len);
//TAG2
TarsWriteUInt8TTypeBuf( os, (tag), (os)._len);
}
} while(0)

  • 具體看個TAG小於14的例子:TAG1為1,TYPE為TarsHeadeInt32(2),用二進位表示的話,TAG1為0001,TYPE為0010,HEAD組成方式是將TAG1二進位和TYPE二進位拼接起來,即:

00010010 換算為10進位是18。 前4位為TAG1,後4位為TYPE,從這樣的拼接方式中可以看到,相當於是把TAG1右移了4位再加上TYPE

從上面貼出的代碼中我們也可以看到拼接方式的表示:"(type) + ((tag)<<4)",即:HEAD = 2 + (1<<4) = 2 + 16 = 18。就這樣Tars協議可以用1byte同時表示數據類型和數據位置

  • 再看個TAG大於14的例子:TAG1為240(固定值),TAG2為200,TYPE為TarsHeadeInt32(2),HEAD的二進位表示為:

11110010 11001000 用10進位表示為242 200 前八位中,1111代表TAG1的值240(即11110000),0010代表TYPE的值2(即0010)。後八位為TAG2的值200(即11001000)

9.2.5 協議序列化分析-BUF

| HEAD | BUF |

BUF的內容和所佔據的位元組數根據不同的TYPE而有所不同

(1)基本類型(Short、UInt16、Int32、UInt32、Int64、Float、Double等)

  • 以Short為例:當值在-128和127之間,借用Char來保存BUF,即BUF僅佔用sizeof(Char)(一般為1byte)。當值不在上述區間,BUF佔用sizeof(Short)(一般為2byte)。具體代碼如下:

//位置:cpp/servant/tup/Tars.h 1718行
void write(Short n, uint8_t tag)
{
//if(n >= CHAR_MIN && n <= CHAR_MAX){
if (n >= (-128) && n <= 127)
{
write((Char) n, tag);
}
else
{
/*
DataHead h(DataHead::eShort, tag);
h.writeTo(*this);
n = htons(n);
this->writeBuf(&n, sizeof(n));
*/
//定義HEAD
TarsWriteToHead(*this, TarsHeadeShort, tag);
n = htons(n);
//定義BUF
TarsWriteShortTypeBuf(*this, n, (*this)._len);
}
}

具體BUF佔用大小在TarsWriteShortTypeBuf中

//位置:cpp/servant/tup/Tars.h 165行
#define TarsWriteShortTypeBuf(os, val, osLen)
do {
TarsReserveBuf(os, (osLen)+sizeof(Short));
(*(Short *)((os)._buf + (osLen))) = (val);
(osLen) += sizeof(Short);
} while(0)

其他基本類型都可以在Tars.h找到對應代碼,可自行參照

(2)數字0

  • 數字0比較特殊,HEAD拼好後,不需要BUF。參見下面代碼。

//位置:cpp/servant/tup/Tars.h 1690行
void write(Char n, uint8_t tag)
{
/*
DataHead h(DataHead::eChar, tag);
if(n == 0){
h.setType(DataHead::eZeroTag);
h.writeTo(*this);
}else{
h.writeTo(*this);
this->writeBuf(&n, sizeof(n));
}
*/
if (tars_unlikely(n == 0))
{
//當n為0時候,僅需要在HEAD中保存TarsHeadeZeroTag類型即可,不需要BUF
TarsWriteToHead(*this, TarsHeadeZeroTag, tag);
}
else
{
//寫HEAD
TarsWriteToHead(*this, TarsHeadeChar, tag);
//寫BUF
TarsWriteCharTypeBuf(*this, n, (*this)._len);
}
}

(3)字元串,參見Tars.h中函數:

void write(const std::string& s, uint8_t tag) 1801行

(4)map,參見Tars.h中函數:

void write(const std::map& m, uint8_t tag) 1837行

(5)vector,參見Tars.h中函數:

void write(const std::vector& v, uint8_t tag) 1853行 void write(const std::vector& v, uint8_t tag) 1877行

(6)其他類型

9.2.6 協議序列化實例

以 9.2.3 節中的demo為例,舉例說明數據是怎樣被序列化的

//learn-tars/tup/main.cpp
14 TarsOutputStream<BufferWriter> os;
15
16 string res = "I am ok";
17
18 vector<char> buffer;
19
20 buffer.assign(res.begin(),res.end());
21
22 map<string, string> status;
23
24 status["test"] = "test";
25
26 map<string, string> _responseContext;
27
28 _responseContext["test1"] = "test1";
29
30 ResponsePacket response;
31
32 response.iRequestId = 1;
33 response.iMessageType = 0;
34 response.cPacketType = TARSNORMAL;
35 response.iVersion = TARSVERSION;
36 response.status = status;
37 response.sBuffer = buffer;
38 response.sResultDesc = "123";
39 response.context = _responseContext;
40 response.iRet = 0;
41
42 response.writeTo(os);

  • 14行聲明瞭序列化容器
  • 16-28行準備了測試數據
  • 30-40行對response進行了數據填充
  • 42行調用了response.writeTo方法進行序列化

response.writeTo方法是在RequestF.h中實現的,繼續看代碼:

//learn-tars/tup/RequestF.h
147 template<typename WriterT>
148 void writeTo(tars::TarsOutputStream<WriterT>& _os) const
149 {
150 _os.write(iVersion, 1);
151 _os.write(cPacketType, 2);
152 _os.write(iRequestId, 3);
153 _os.write(iMessageType, 4);
154 _os.write(iRet, 5);
155 _os.write(sBuffer, 6);
156 _os.write(status, 7);
157 if (sResultDesc != "")
158 {
159 _os.write(sResultDesc, 8);
160 }
161 if (context.size() > 0)
162 {
163 _os.write(context, 9);
164 }
165 }

  • 150-164行說明瞭序列化的順序

注意:iVersion為Short,當值在-128和127之間時,是當作Char類型進行處理的,見如下代碼:

//learn-tars/tup/Tars.h
1753 void write(Short n, uint8_t tag)
1754 {
1755 std::cout<<"write Short "<<n<<" tag is "<<(int)tag<<std::
endl;
1756 //if(n >= CHAR_MIN && n <= CHAR_MAX){
1757 if (n >= (-128) && n <= 127)
1758 {
1759 write((Char) n, tag);
1760 }
1761 else
1762 {
1763 /*
1764 DataHead h(DataHead::eShort, tag);
1765 h.writeTo(*this);
1766 n = htons(n);
1767 this->writeBuf(&n, sizeof(n));
1768 */
1769 TarsWriteToHead(*this, TarsHeadeShort, tag);
1770 n = htons(n);
1771 TarsWriteShortTypeBuf(*this, n, (*this)._len);
1772 }
1773 }

而在Char類型處理時候,n為0時候,TYPE取值TarsHeadeZeroTag,不為0時候,TYPE取值TarsHeadeChar,見如下代碼:

//learn-tars/tup/Tars.h
1722 void write(Char n, uint8_t tag)
1723 {
1724 std::cout<<"write Char "<<n<<" tag is "<<(int)tag<<std::e
ndl;
1725 /*
1726 DataHead h(DataHead::eChar, tag);
1727 if(n == 0){
1728 h.setType(DataHead::eZeroTag);
1729 h.writeTo(*this);
1730 }else{
1731 h.writeTo(*this);
1732 this->writeBuf(&n, sizeof(n));
1733 }
1734 */
1735 if (tars_unlikely(n == 0))
1736 {
1737 std::cout<<"write n == 0 "<<n<<std::endl;
1738 TarsWriteToHead(*this, TarsHeadeZeroTag, tag);
1739 }
1740 else
1741 {
1742 std::cout<<"write n != 0 "<<n<<std::endl;
1743 TarsWriteToHead(*this, TarsHeadeChar, tag);
1744 TarsWriteCharTypeBuf(*this, n, (*this)._len);
1745 }
1746 }

  • 根據上面思路,可以看到其他類型的數據也都對應著一套HEAD和BUF的拼裝方法,由此可以整理出demo中數據的邏輯處理順序(如果圖片看不清楚,可右鍵選擇「在新標籤頁中打開圖片」)

根據上面的邏輯圖,可以得到序列化後的真實數據

在調試9.2.3節demo時候,列印出來的數據默認是字元的,所以可以看到iVersion第一個列印出來的字元是^P(即整數16)

9.2.7 協議反序列化分析

理解了協議的序列化,反序列化就比較簡單了,假設已經知道iVersion的反序列化數據為^P^A,其中|HEAD|為^P(注意,HEAD只佔一個Byte)

  • ^P轉為十進位整數為16,轉為二進位為00010000
  • 將數據00010000右移4位為00000001,這就是TAG的值1
  • 將數據00010000做與運算:00010000 & 0x0F = 0, 這就是TYPE的值0,即TarsHeadeChar
  • 知道了TYPE的值,後面|BUF|所佔長度可由TYPE得到,本例中也佔一個位元組,及^A,轉為十進位整數為1
  • 這樣就完成了iVersion的反序列化

9.2.8 協議反序列化實例

再複習下9.2.3節的代碼中是如何反序列化的

//learn-tars/tup/main.cpp
56 //反序列化
57
58 list<ResponsePacket> done;
59
60 ProxyProtocol _proxyProtocol;
61
62 _proxyProtocol.responseFunc = ProxyProtocol::tarsResponse;
63
64 const char* data = s.c_str();
65
66 size_t len = s.size();
67
//反序列化方法,反序列化完的數據放入done中
68 size_t pos = _proxyProtocol.responseFunc(data, len, done);

  • 這裡的responseExFunc來自ProxyProtocol::tarsResponse,其中最主要的邏輯是

//learn-tars/tup/AppProtocal.h
451 ResponsePacket rsp;
452 rsp.readFrom(is);

  • rsp.readFrom(is)中readFrom的實現代碼如下:

//learn-tars/tup/RequestF.h
166 template<typename ReaderT>
167 void readFrom(tars::TarsInputStream<ReaderT>& _is)
168 {
169 resetDefautlt();
170 _is.read(iVersion, 1, true);
171 _is.read(cPacketType, 2, true);
172 _is.read(iRequestId, 3, true);
173 _is.read(iMessageType, 4, true);
174 _is.read(iRet, 5, true);
175 _is.read(sBuffer, 6, true);
176 _is.read(status, 7, true);
177 _is.read(sResultDesc, 8, false);
178 _is.read(context, 9, false);
179 }

同樣以iVersion為例,簡要說明代碼處理流程:

  • 上面170行代碼會調用:

//learn-tars/tup/Tars.h
1105 void read(Short& n, uint8_t tag, bool isRequire = true)
1106 {
1107 std::cout<<"read Short "<<std::endl;
1108 uint8_t headType = 0, headTag = 0;
1109 bool skipFlag = false;
//TarsSkipToTag方法會確認TYPE和TAG的值
1110 TarsSkipToTag(skipFlag, tag, headType, headTag);
1111 if (tars_likely(skipFlag))
1112 {
//根據headType的值確定後面|BUF|內容的長度
1113 switch (headType)
1114 {
1115 case TarsHeadeZeroTag:
1116 std::cout<<"read Short TarsHeadeZeroTag"<<std::endl;
1117 n = 0;
1118 break;
1119 case TarsHeadeChar:
1120 std::cout<<"read Char TarsHeadeChar"<<std::endl;
1121 TarsReadTypeBuf(*this, n, Char);
1122 break;
1123 case TarsHeadeShort:
1124 std::cout<<"read Short TarsHeadeShort"<<std::endl;
1125 TarsReadTypeBuf(*this, n, Short);
1126 n = ntohs(n);
1127 break;
1128 default:
1129 {
1130 char s[64];
1131 snprintf(s, sizeof(s), "read Short type mism
atch, tag: %d, get type: %d.", tag, headType);
1132 throw TarsDecodeMismatch(s);
1133 }
1134 }
1135 }
1136 else if (tars_unlikely(isRequire))
1137 {
1138 char s[64];
1139 snprintf(s, sizeof(s), "require field not exist, tag:
%d, headTag: %d", tag, headTag);
1140 throw TarsDecodeRequireNotExist(s);
1141 }
1142 std::cout<<"read Short n is "<<n<<std::endl;
1143
1144 }

  • 1110行函數TarsSkipToTag會確認TYPE和TAG的值
  • 1113行會根據headType的值確定後面|BUF|內容的長度

再看下TarsSkipToTag是怎麼處理的:

//learn-tars/tup/Tars.h
335 #define TarsSkipToTag(flag, tag, retHeadType, retHeadTag)
336 do {
337 try
338 {
339 uint8_t nextHeadType, nextHeadTag;
340 while (!ReaderT::hasEnd())
341 {
342 size_t len = 0;
//TarsPeekFromHead裏進行了運算,得到TYPE和TAGE
343 TarsPeekFromHead(*this, nextHeadType, nextHeadTag, len);
344 if (tars_unlikely(nextHeadType == TarsHeadeStructEnd || ta
g < nextHeadTag))
345 {
346 break;
347 }
348 if (tag == nextHeadTag)
349 {
350 std::cout<<"TarsSkipToTag tag == nextHeadTag"<<std::en
dl;
351 (retHeadType) = nextHeadType;
352 (retHeadTag) = nextHeadTag;
353 TarsReadHeadSkip(*this, len);
354 (flag) = true;
355 break;
356 }
357 std::cout<<"TarsSkipToTag tag != nextHeadTag"<<std::endl;

358 TarsReadHeadSkip(*this, len);
359 skipField(nextHeadType);
360 }
361 }
362 catch (TarsDecodeException& e)
363 {
364 }
365 } while(0)

  • 343 行TarsPeekFromHead函數裏得到了nextHeadType和nextHeadTag,對應著9.2.7節中的運算

//learn-tars/tup/Tars.h
255 #define TarsPeekFromHead(is, type, tag, n)
256 do {
257 std::cout<<"TarsPeekFromHead begin"<<std::endl;
258 (n) = 1;
259 uint8_t typeTag, tmpTag;
260 TarsPeekTypeBuf(is, typeTag, 0, uint8_t);
261 std::cout<<"TarsPeekFromHead typeTag "<<(int)typeTag<<std::endl;
262 tmpTag = typeTag >> 4;
263 std::cout<<"TarsPeekFromHead tmpTag "<<(int)tmpTag<<std::endl;
264 (type) = (typeTag & 0x0F);
265 std::cout<<"TarsPeekFromHead type "<<(int)type<<std::endl;
266 if(tars_unlikely(tmpTag == 15))
267 {
268 std::cout<<"TarsPeekFromHead unlikely tmpTag "<<(int)tmpTag<<s
td::endl;
269 TarsPeekTypeBuf(is, tag, 1, uint8_t);
270 (n) += 1;
271 }
272 else
273 {
274 (tag) = tmpTag;
275 }
276 } while(0)

推薦閱讀:

相關文章