Tars服務模型圖如下:
服務端:
非同步客戶端:
後續會根據上面的框架,去逐步分析其中的一些細節,探究Tars高性能的一些「祕密」
借用官方說法:
TARS編碼協議是一種數據編解碼規則,它將整形、枚舉值、字元串、序列、字典、自定義結構體等數據類型按照一定的規則編碼到二進位數據流中。對端接收到二進位數據流之後,按照相應的規則反序列化可得到原始數值。
簡單理解,TARS編碼協議提供了一種將數據序列化、反序列化的方法。其角色和我們認識的protobuf、json、xml等同。
一般客戶端到服務端的數據交互流程如下:
現在來看Tars 官方RPC源碼中是怎麼實現上面第3、4步的:
首先是服務端將數據序列化:
//位置:cpp/servant/libservant/TarsCurrent.cpp 221 void TarsCurrent::sendResponse(int iRet, const vector<char>& buffer, const map<string, string>& status, const string & sResultDesc) { //省略部分代碼 ………………
TarsOutputStream<BufferWriter> os; if (_request.iVersion != TUPVERSION) { //將數據放到ResponsePacket結構中 ResponsePacket response;
response.iRequestId = _request.iRequestId; response.iMessageType = _request.iMessageType; response.cPacketType = TARSNORMAL; response.iVersion = TARSVERSION; response.status = status; response.sBuffer = buffer; response.sResultDesc = sResultDesc; response.context = _responseContext; response.iRet = iRet;
TLOGINFO("[TARS]TarsCurrent::sendResponse :" << response.iMessageType << "|" << _request.sServantName << "|" << _request.sFuncName << "|" << response.iRequestId << endl);
//調用序列化方法,response中的數據都保存在了os中 response.writeTo(os); }
//省略部分代碼 …………………………
//獲取內容長度 tars::Int32 iHeaderLen = htonl(sizeof(tars::Int32) + os.getLength());
string s = "";
//返回的s的格式是內容長度+內容 s.append((const char*)&iHeaderLen, sizeof(tars::Int32));
s.append(os.getBuffer(), os.getLength());
_servantHandle->sendResponse(_uid, s, _ip, _port, _fd); }
再來看客戶端怎樣解析來自服務端的返回:
//位置:cpp/servant/libservant/Transceiver.cpp 331 int TcpTransceiver::doResponse(list<ResponsePacket>& done) { ………… if(!_recvBuffer.IsEmpty()) { try { //接收到的服務端的序列化好的數據 const char* data = _recvBuffer.ReadAddr(); size_t len = _recvBuffer.ReadableSize(); size_t pos = 0;
//獲取協議封裝類 ProxyProtocol& proto = _adapterProxy->getObjProxy()->getProxyProtocol();
if (proto.responseExFunc) { long id = _adapterProxy->getId(); //將data反序列化到done中 pos = proto.responseExFunc(data, len, done, (void*)id); } ………… }
}
這裡的responseExFunc來自ProxyProtocol::tarsResponse(cpp/servant/AppProtocal.h 398)
template<uint32_t iMaxLength> static size_t tarsResponseLen(const char* recvBuffer, size_t length, list<ResponsePacket>& done) { ………… TarsInputStream<BufferReader> is; //將數據放入is中 is.setBuffer(recvBuffer + pos + sizeof(tars::Int32), iHeaderLen - sizeof(tars::Int32)); pos += iHeaderLen; //將is中的數據進行反序列化,填充到rsp中 ResponsePacket rsp; rsp.readFrom(is); ………… }
從上面代碼中可以看出:
獲取代碼
下載代碼後,進入tup目錄
這個demo就是從9.2.2節中的內容直接抽取出來形成的,可以很方便的進行跟蹤調試。
把結構化數據序列化,用大白話解釋就是想辦法把不同類型的數據按照順序放在一個字元串裏。反序列化就是還能從這個字元串裏把類型和數據正確解析出來。一般來說,要達成正確的效果,有三個因素是必須考慮的:
Tars協議也跳不出這個基本規則,它的數據是由兩部分組成:
| HEAD | BUF |
我們再具體看下HEAD中包括的內容:
| TAG1(4 bits) | TYPE(4 bits) | TAG2(1 byte或者8 bits)
//位置:/cpp/servant/tup/Tars.h 60行 //數據頭類型 #define TarsHeadeChar 0 #define TarsHeadeShort 1 #define TarsHeadeInt32 2 #define TarsHeadeInt64 3 #define TarsHeadeFloat 4 #define TarsHeadeDouble 5 #define TarsHeadeString1 6 #define TarsHeadeString4 7 #define TarsHeadeMap 8 #define TarsHeadeList 9 #define TarsHeadeStructBegin 10 #define TarsHeadeStructEnd 11 #define TarsHeadeZeroTag 12 #define TarsHeadeSimpleList 13
//位置:/cpp/servant/tup/Tars.h 96行 #define TarsWriteToHead(os, type, tag) do { if (tars_likely((tag) < 15)) { //只有TAG1 TarsWriteUInt8TTypeBuf( os, (type) + ((tag)<<4) , (os)._len); } else { //TAG1 TarsWriteUInt8TTypeBuf( os, (type) + (240) , (os)._len); //TAG2 TarsWriteUInt8TTypeBuf( os, (tag), (os)._len); } } while(0)
00010010 換算為10進位是18。 前4位為TAG1,後4位為TYPE,從這樣的拼接方式中可以看到,相當於是把TAG1右移了4位再加上TYPE
從上面貼出的代碼中我們也可以看到拼接方式的表示:"(type) + ((tag)<<4)",即:HEAD = 2 + (1<<4) = 2 + 16 = 18。就這樣Tars協議可以用1byte同時表示數據類型和數據位置
11110010 11001000 用10進位表示為242 200 前八位中,1111代表TAG1的值240(即11110000),0010代表TYPE的值2(即0010)。後八位為TAG2的值200(即11001000)
BUF的內容和所佔據的位元組數根據不同的TYPE而有所不同
(1)基本類型(Short、UInt16、Int32、UInt32、Int64、Float、Double等)
//位置:cpp/servant/tup/Tars.h 1718行 void write(Short n, uint8_t tag) { //if(n >= CHAR_MIN && n <= CHAR_MAX){ if (n >= (-128) && n <= 127) { write((Char) n, tag); } else { /* DataHead h(DataHead::eShort, tag); h.writeTo(*this); n = htons(n); this->writeBuf(&n, sizeof(n)); */ //定義HEAD TarsWriteToHead(*this, TarsHeadeShort, tag); n = htons(n); //定義BUF TarsWriteShortTypeBuf(*this, n, (*this)._len); } }
具體BUF佔用大小在TarsWriteShortTypeBuf中
//位置:cpp/servant/tup/Tars.h 165行 #define TarsWriteShortTypeBuf(os, val, osLen) do { TarsReserveBuf(os, (osLen)+sizeof(Short)); (*(Short *)((os)._buf + (osLen))) = (val); (osLen) += sizeof(Short); } while(0)
其他基本類型都可以在Tars.h找到對應代碼,可自行參照
(2)數字0
//位置:cpp/servant/tup/Tars.h 1690行 void write(Char n, uint8_t tag) { /* DataHead h(DataHead::eChar, tag); if(n == 0){ h.setType(DataHead::eZeroTag); h.writeTo(*this); }else{ h.writeTo(*this); this->writeBuf(&n, sizeof(n)); } */ if (tars_unlikely(n == 0)) { //當n為0時候,僅需要在HEAD中保存TarsHeadeZeroTag類型即可,不需要BUF TarsWriteToHead(*this, TarsHeadeZeroTag, tag); } else { //寫HEAD TarsWriteToHead(*this, TarsHeadeChar, tag); //寫BUF TarsWriteCharTypeBuf(*this, n, (*this)._len); } }
(3)字元串,參見Tars.h中函數:
void write(const std::string& s, uint8_t tag) 1801行
(4)map,參見Tars.h中函數:
void write(const std::map& m, uint8_t tag) 1837行
(5)vector,參見Tars.h中函數:
void write(const std::vector& v, uint8_t tag) 1853行 void write(const std::vector& v, uint8_t tag) 1877行
(6)其他類型
以 9.2.3 節中的demo為例,舉例說明數據是怎樣被序列化的
//learn-tars/tup/main.cpp 14 TarsOutputStream<BufferWriter> os; 15 16 string res = "I am ok"; 17 18 vector<char> buffer; 19 20 buffer.assign(res.begin(),res.end()); 21 22 map<string, string> status; 23 24 status["test"] = "test"; 25 26 map<string, string> _responseContext; 27 28 _responseContext["test1"] = "test1"; 29 30 ResponsePacket response; 31 32 response.iRequestId = 1; 33 response.iMessageType = 0; 34 response.cPacketType = TARSNORMAL; 35 response.iVersion = TARSVERSION; 36 response.status = status; 37 response.sBuffer = buffer; 38 response.sResultDesc = "123"; 39 response.context = _responseContext; 40 response.iRet = 0; 41 42 response.writeTo(os);
response.writeTo方法是在RequestF.h中實現的,繼續看代碼:
//learn-tars/tup/RequestF.h 147 template<typename WriterT> 148 void writeTo(tars::TarsOutputStream<WriterT>& _os) const 149 { 150 _os.write(iVersion, 1); 151 _os.write(cPacketType, 2); 152 _os.write(iRequestId, 3); 153 _os.write(iMessageType, 4); 154 _os.write(iRet, 5); 155 _os.write(sBuffer, 6); 156 _os.write(status, 7); 157 if (sResultDesc != "") 158 { 159 _os.write(sResultDesc, 8); 160 } 161 if (context.size() > 0) 162 { 163 _os.write(context, 9); 164 } 165 }
注意:iVersion為Short,當值在-128和127之間時,是當作Char類型進行處理的,見如下代碼:
//learn-tars/tup/Tars.h 1753 void write(Short n, uint8_t tag) 1754 { 1755 std::cout<<"write Short "<<n<<" tag is "<<(int)tag<<std:: endl; 1756 //if(n >= CHAR_MIN && n <= CHAR_MAX){ 1757 if (n >= (-128) && n <= 127) 1758 { 1759 write((Char) n, tag); 1760 } 1761 else 1762 { 1763 /* 1764 DataHead h(DataHead::eShort, tag); 1765 h.writeTo(*this); 1766 n = htons(n); 1767 this->writeBuf(&n, sizeof(n)); 1768 */ 1769 TarsWriteToHead(*this, TarsHeadeShort, tag); 1770 n = htons(n); 1771 TarsWriteShortTypeBuf(*this, n, (*this)._len); 1772 } 1773 }
而在Char類型處理時候,n為0時候,TYPE取值TarsHeadeZeroTag,不為0時候,TYPE取值TarsHeadeChar,見如下代碼:
//learn-tars/tup/Tars.h 1722 void write(Char n, uint8_t tag) 1723 { 1724 std::cout<<"write Char "<<n<<" tag is "<<(int)tag<<std::e ndl; 1725 /* 1726 DataHead h(DataHead::eChar, tag); 1727 if(n == 0){ 1728 h.setType(DataHead::eZeroTag); 1729 h.writeTo(*this); 1730 }else{ 1731 h.writeTo(*this); 1732 this->writeBuf(&n, sizeof(n)); 1733 } 1734 */ 1735 if (tars_unlikely(n == 0)) 1736 { 1737 std::cout<<"write n == 0 "<<n<<std::endl; 1738 TarsWriteToHead(*this, TarsHeadeZeroTag, tag); 1739 } 1740 else 1741 { 1742 std::cout<<"write n != 0 "<<n<<std::endl; 1743 TarsWriteToHead(*this, TarsHeadeChar, tag); 1744 TarsWriteCharTypeBuf(*this, n, (*this)._len); 1745 } 1746 }
根據上面的邏輯圖,可以得到序列化後的真實數據
在調試9.2.3節demo時候,列印出來的數據默認是字元的,所以可以看到iVersion第一個列印出來的字元是^P(即整數16)
理解了協議的序列化,反序列化就比較簡單了,假設已經知道iVersion的反序列化數據為^P^A,其中|HEAD|為^P(注意,HEAD只佔一個Byte)
再複習下9.2.3節的代碼中是如何反序列化的
//learn-tars/tup/main.cpp 56 //反序列化 57 58 list<ResponsePacket> done; 59 60 ProxyProtocol _proxyProtocol; 61 62 _proxyProtocol.responseFunc = ProxyProtocol::tarsResponse; 63 64 const char* data = s.c_str(); 65 66 size_t len = s.size(); 67 //反序列化方法,反序列化完的數據放入done中 68 size_t pos = _proxyProtocol.responseFunc(data, len, done);
//learn-tars/tup/AppProtocal.h 451 ResponsePacket rsp; 452 rsp.readFrom(is);
//learn-tars/tup/RequestF.h 166 template<typename ReaderT> 167 void readFrom(tars::TarsInputStream<ReaderT>& _is) 168 { 169 resetDefautlt(); 170 _is.read(iVersion, 1, true); 171 _is.read(cPacketType, 2, true); 172 _is.read(iRequestId, 3, true); 173 _is.read(iMessageType, 4, true); 174 _is.read(iRet, 5, true); 175 _is.read(sBuffer, 6, true); 176 _is.read(status, 7, true); 177 _is.read(sResultDesc, 8, false); 178 _is.read(context, 9, false); 179 }
同樣以iVersion為例,簡要說明代碼處理流程:
//learn-tars/tup/Tars.h 1105 void read(Short& n, uint8_t tag, bool isRequire = true) 1106 { 1107 std::cout<<"read Short "<<std::endl; 1108 uint8_t headType = 0, headTag = 0; 1109 bool skipFlag = false; //TarsSkipToTag方法會確認TYPE和TAG的值 1110 TarsSkipToTag(skipFlag, tag, headType, headTag); 1111 if (tars_likely(skipFlag)) 1112 { //根據headType的值確定後面|BUF|內容的長度 1113 switch (headType) 1114 { 1115 case TarsHeadeZeroTag: 1116 std::cout<<"read Short TarsHeadeZeroTag"<<std::endl; 1117 n = 0; 1118 break; 1119 case TarsHeadeChar: 1120 std::cout<<"read Char TarsHeadeChar"<<std::endl; 1121 TarsReadTypeBuf(*this, n, Char); 1122 break; 1123 case TarsHeadeShort: 1124 std::cout<<"read Short TarsHeadeShort"<<std::endl; 1125 TarsReadTypeBuf(*this, n, Short); 1126 n = ntohs(n); 1127 break; 1128 default: 1129 { 1130 char s[64]; 1131 snprintf(s, sizeof(s), "read Short type mism atch, tag: %d, get type: %d.", tag, headType); 1132 throw TarsDecodeMismatch(s); 1133 } 1134 } 1135 } 1136 else if (tars_unlikely(isRequire)) 1137 { 1138 char s[64]; 1139 snprintf(s, sizeof(s), "require field not exist, tag: %d, headTag: %d", tag, headTag); 1140 throw TarsDecodeRequireNotExist(s); 1141 } 1142 std::cout<<"read Short n is "<<n<<std::endl; 1143 1144 }
再看下TarsSkipToTag是怎麼處理的:
//learn-tars/tup/Tars.h 335 #define TarsSkipToTag(flag, tag, retHeadType, retHeadTag) 336 do { 337 try 338 { 339 uint8_t nextHeadType, nextHeadTag; 340 while (!ReaderT::hasEnd()) 341 { 342 size_t len = 0; //TarsPeekFromHead裏進行了運算,得到TYPE和TAGE 343 TarsPeekFromHead(*this, nextHeadType, nextHeadTag, len); 344 if (tars_unlikely(nextHeadType == TarsHeadeStructEnd || ta g < nextHeadTag)) 345 { 346 break; 347 } 348 if (tag == nextHeadTag) 349 { 350 std::cout<<"TarsSkipToTag tag == nextHeadTag"<<std::en dl; 351 (retHeadType) = nextHeadType; 352 (retHeadTag) = nextHeadTag; 353 TarsReadHeadSkip(*this, len); 354 (flag) = true; 355 break; 356 } 357 std::cout<<"TarsSkipToTag tag != nextHeadTag"<<std::endl;
358 TarsReadHeadSkip(*this, len); 359 skipField(nextHeadType); 360 } 361 } 362 catch (TarsDecodeException& e) 363 { 364 } 365 } while(0)
//learn-tars/tup/Tars.h 255 #define TarsPeekFromHead(is, type, tag, n) 256 do { 257 std::cout<<"TarsPeekFromHead begin"<<std::endl; 258 (n) = 1; 259 uint8_t typeTag, tmpTag; 260 TarsPeekTypeBuf(is, typeTag, 0, uint8_t); 261 std::cout<<"TarsPeekFromHead typeTag "<<(int)typeTag<<std::endl; 262 tmpTag = typeTag >> 4; 263 std::cout<<"TarsPeekFromHead tmpTag "<<(int)tmpTag<<std::endl; 264 (type) = (typeTag & 0x0F); 265 std::cout<<"TarsPeekFromHead type "<<(int)type<<std::endl; 266 if(tars_unlikely(tmpTag == 15)) 267 { 268 std::cout<<"TarsPeekFromHead unlikely tmpTag "<<(int)tmpTag<<s td::endl; 269 TarsPeekTypeBuf(is, tag, 1, uint8_t); 270 (n) += 1; 271 } 272 else 273 { 274 (tag) = tmpTag; 275 } 276 } while(0)
推薦閱讀: