前言:

看了連接池的源碼感覺很爽,並且發現自己原來不懂的太多,趁著這次再多想想自己還有那些不懂。是不是以前想當然的事情,實際自己根本就不懂,故有了以下的思考。

一、服務端是如何維護長連接的

從客戶端的角度來講,維持一個連接池管理連接,用的時候拿出來即可,不用的時候不用對這個連接做任何操作。那服務端呢?服務端是怎麼能維持這個連接一直可使用呢?如果服務端也使用連接池的方式管理socket連接好像不對啊,它必然會監聽每個連接的,怎麼監聽的呢?總之,一個疑問:服務端是如何維護長連接的?

1.1 Thrift的服務端的Accept後幹了什麼

通過上述連接池的信息我們知道,當客戶端創建一個長連接的時候,可以一直使用這個連接,那麼服務端是怎麼維護這個連接的呢?

我們還是通過看源碼了解:

func (p *TSimpleServer) Serve() error {
err := p.Listen()
if err != nil {
return err
}
p.AcceptLoop()
return nil
}

這是thrift的啟動監控連接的代碼,首選創建監聽的對象,然後啟動循環,監聽客戶端發送的請求。

我們來看下AcceptLoop幹了什麼

func (p *TSimpleServer) AcceptLoop() error {
for {
client, err := p.serverTransport.Accept()
if err != nil {
select {
case <-p.quit:
return nil
default:
}
return err
}
if client != nil {
go func() {
if err := p.processRequests(client); err != nil {
log.Println("error processing request:", err)
}
}()
}
}
}

理解透這段代碼真廢了好大的力氣(之前了解的socket編程知識非常少)

最初調用Accept的時候,會發生阻塞,當有客戶端有建立連接請求的時候,Accept會返回一個連接,如果沒什麼錯誤,就會創建一個協程,處理該連接。 ok,繼續追代碼,如何處理這個處理這個連接的。

func (p *TSimpleServer) processRequests(client TTransport) error {
processor := p.processorFactory.GetProcessor(client)
inputTransport := p.inputTransportFactory.GetTransport(client)
outputTransport := p.outputTransportFactory.GetTransport(client)
inputProtocol := p.inputProtocolFactory.GetProtocol(inputTransport)
outputProtocol := p.outputProtocolFactory.GetProtocol(outputTransport)
defer func() {
if e := recover(); e != nil {
log.Printf("panic in processor: %s: %s", e, debug.Stack())
}
}()

if inputTransport != nil {
defer inputTransport.Close()
}
if outputTransport != nil {
defer outputTransport.Close()
}
for {
ok, err := processor.Process(inputProtocol, outputProtocol)
if err, ok := err.(TTransportException); ok && err.TypeId() == END_OF_FILE {
return nil
} else if err != nil {
log.Printf("error processing request: %s", err)
return err
}
if err, ok := err.(TApplicationException); ok && err.TypeId() == UNKNOWN_METHOD {
continue
}
if !ok {
break
}
}
return nil
}

來看這個處理的過程,首先會根據client讀取一個processor,這個processor其實就是我們自己的業務邏輯處理函數的集合。然後就是獲取inputProtocol和outputProtocol,這倆玩意其實就是讀數據和寫數據的連接,其實他們是一個東西,這裡不詳細追源碼了。重點在下面,看到了一個for循環,循環體最重要的函數是processor.Process(inputProtocol, outputProtocol),它的參數為讀連接與寫連接,接下來就是讀取請求的數據,處理,然後返回結果,然後循環繼續。。。好吧,終於搞明白長連接在服務端到底是一個怎樣形式的存在,原來就是為每個連接開啟一個協程,協程循環的從連接中讀取數據、處理、返回結果。

1.2 Thrift如何建立連接的,為什麼沒看到TCP三次握手的代碼部分?—>netFD介紹

為什麼又冒出來一個netFD,聽我慢慢道來。 我們還是先從服務端入手,通過上述我們知道服務端的監聽是通過調用TSimpleServer的AcceptLoop函數進行的,而創建連接調用的是p.serverTransport.Accept(),這個serverTransport就是我們創建ThriftServe的時候需要初始化的,創建方式有多種,比如thrift.NewTServerSocketTimeout(),它返回一個帶有超時時間的TServerSocket。p.serverTransport.Accept()這個Accept()實際上調用的是TServerSocket的,我們看下這個函數幹了什麼?

func (p *TServerSocket) Accept() (TTransport, error) {
p.mu.RLock()
interrupted := p.interrupted
p.mu.RUnlock()

if interrupted {
return nil, errTransportInterrupted
}
if p.listener == nil {
return nil, NewTTransportException(NOT_OPEN, "No underlying server socket")
}
conn, err := p.listener.Accept()
if err != nil {
return nil, NewTTransportExceptionFromError(err)
}
return NewTSocketFromConnTimeout(conn, p.clientTimeout), nil
}

首先它返回了TTransport類型的對象,這個對象是誰TServerSocket類型的listener變數的Accept()創建,那麼這個listener在在哪呢?

我們找到TServerSocket它還有一個Listen()函數

func (p *TServerSocket) Listen() error {
if p.IsListening() {
return nil
}
l, err := net.Listen(p.addr.Network(), p.addr.String())
if err != nil {
return err
}
p.listener = l
return nil
}

這個listener實際是從net包的Listen創建的,就這樣一直追代碼,最終找到了

一個netFD的結構體

// Network file descriptor.
type netFD struct {
// locking/lifetime of sysfd + serialize access to Read and Write methods
fdmu fdMutex

// immutable until Close
sysfd int
family int
sotype int
isStream bool
isConnected bool
net string
laddr Addr
raddr Addr

// writev cache.
iovecs *[]syscall.Iovec

// wait server
pd pollDesc
}

也就是說在AcceptLoop()中的Accept實際上是調用的netFD的Accept函數,這個netFD實際是在啟動服務的時候調用TSimpleServer的Listen(),可以認為它就是socket連接。

注意:1.socket是通過套接字來找到對方的,套接字就是ip+埠+協議組成,基礎知識就不扯了。每個socket會佔用一個套接字,每個socket會佔用一個套接字!!! 2.TSimpleServer的Listen()創建的socket連接佔用的就是我們經常對外提供的埠,客戶端發送建立連接請求的時候也是打到這個埠,也就是被這個連接捕獲到,當這個連接發現有客戶端連接請求的時候,會為這個客戶端重新創建一個新的socket連接,注意,這個新的socket當然會佔用一個新的埠,這個埠當然是未被佔用的,然後使用這個新建的連接進行數據通信。總結來說就是,Listen()的時候創建一個socket監聽我們對外提供的埠,收到請求後重新創建一個socket連接,然後進行數據通信。 ok,我們繼續,還是沒有看到TCP三次握手在哪啊! 那就繼續追代碼,我們來看下net的accept都幹了什麼?

func (fd *netFD) accept() (netfd *netFD, err error) {
/*省略*/
for {
s, rsa, err = accept(fd.sysfd)
if err != nil {
nerr, ok := err.(*os.SyscallError)
if !ok {
return nil, err
}
switch nerr.Err {
case syscall.EAGAIN:
if err = fd.pd.waitRead(); err == nil {
continue
}
case syscall.ECONNABORTED:
continue
}
return nil, err
}
break
}

if netfd, err = newFD(s, fd.family, fd.sotype, fd.net); err != nil {
closeFunc(s)
return nil, err
}
if err = netfd.init(); err != nil {
fd.Close()
return nil, err
}
lsa, _ := syscall.Getsockname(netfd.sysfd)
netfd.setAddr(netfd.addrFunc()(lsa), netfd.addrFunc()(rsa))
return netfd, nil
}

把accept的前半部分代碼刪掉了,不影響我們看主要的代碼,我們看到for循環中一直在調用accept函數,如果成功則重新創建了一個socket連接netfd,注意在創建新的netFD後,設置了兩個地址,lsa和rsa,之前一直以為是左連接和右連接,其實是localadder和remoteadder的意思,我們看到rsa是accept得到的地址,而lsa是根據netfd.sysfd調用Getsockname得到的,這個netfd就是新建的socket,新建這個socket的時候newFD(s, fd.family, fd.sotype, fd.net) 中的fd就是監控socket連接了,那自然就是本地的地址了。

如果沒有成功,當錯誤類型是syscall.EAGAIN的時候,就調用waitRead()等待新的連接,如果返回錯誤為nil,則繼續循環,這種機制其實是golang中把非阻塞轉換為阻塞形式的方法,涉及到linux的I/O多路復用epoll,這個稍後會做詳細的介紹。 總結來說,netFD的accept會阻塞等待一個連接請求,當一個新的連接請求到來時,會新建一個連接,然後返回該連接。

ok,好像還沒到TCP三次握手啊,那就繼續追代碼,我們來看看netFD中的accept調用的accept(fd.sysfd)到底在幹什麼?

func accept(s int) (int, syscall.Sockaddr, error) {
ns, sa, err := acceptFunc(s)
if err == nil {
syscall.CloseOnExec(ns)
}
if err != nil {
return -1, nil, os.NewSyscallError("accept", err)
}
if err = syscall.SetNonblock(ns, true); err != nil {
closeFunc(ns)
return -1, nil, os.NewSyscallError("setnonblock", err)
}
return ns, sa, nil
}

繼續看acceptFunc(s)

package net

import "syscall"
var (
acceptFunc func(int) (int, syscall.Sockaddr, error) = syscall.Accept
)

繼續追

func Accept(fd int) (nfd int, sa Sockaddr, err error) {
var rsa RawSockaddrAny
var len _Socklen = SizeofSockaddrAny
nfd, err = accept(fd, &rsa, &len)
if err != nil {
return
}
if runtime.GOOS == "darwin" && len == 0 {
// Accepted socket has no address.
// This is likely due to a bug in xnu kernels,
// where instead of ECONNABORTED error socket
// is accepted, but has no address.
Close(nfd)
return 0, nil, ECONNABORTED
}
sa, err = anyToSockaddr(&rsa)
if err != nil {
Close(nfd)
nfd = 0
}
return
}

再繼續

func accept(s int, rsa *RawSockaddrAny, addrlen *_Socklen) (fd int, err error) {
r0, _, e1 := Syscall(SYS_ACCEPT, uintptr(s), uintptr(unsafe.Pointer(rsa)), uintptr(unsafe.Pointer(addrlen)))
fd = int(r0)
if e1 != 0 {
err = errnoErr(e1)
}
return
}

package syscall

import (
"internal/race"
"runtime"
"sync"
"unsafe"
)

var (
Stdin = 0
Stdout = 1
Stderr = 2
)

const (
darwin64Bit = runtime.GOOS == "darwin" && sizeofPtr == 8
dragonfly64Bit = runtime.GOOS == "dragonfly" && sizeofPtr == 8
netbsd32Bit = runtime.GOOS == "netbsd" && sizeofPtr == 4
solaris64Bit = runtime.GOOS == "solaris" && sizeofPtr == 8
)

func Syscall(trap, a1, a2, a3 uintptr) (r1, r2 uintptr, err Errno)
func Syscall6(trap, a1, a2, a3, a4, a5, a6 uintptr) (r1, r2 uintptr, err Errno)
func RawSyscall(trap, a1, a2, a3 uintptr) (r1, r2 uintptr, err Errno)
func RawSyscall6(trap, a1, a2, a3, a4, a5, a6 uintptr) (r1, r2 uintptr, err Errno)

最終調到了Syscall了,也就是所謂的系統調用了,好吧,對於服務端來說,三次握手的過程,封裝在了系統調用中了。 ok,我們再看一個東西,netFD的accept中的accept函數(好彆扭)返回的值:s, rsa, err,其中s為int類型,它是什麼呢?我們 看它後續在創建新的socket連接的時候用到了newFD(s, fd.family, fd.sotype, fd.net),最終對應到netFD結構體的sysfd(fd意思是file description),其實它就是這個socket的套接字描述符,因為對於linux來說,一切接文件,所以對於socket來說,也被當成一個文件,那麼創建socket連接的時候,自然要為它來生成一個文件描述符,這也是我們最初的時候說創建連接消耗資源的其中一個原因之一,再看下accept(fd.sysfd)傳進去的也不是我們監控連接的套接字,而是其對應的描述符,我們的系統調用函數,只認識文件描述符(關於描述符,文章的最後會做介紹)。

如果是從客戶端追代碼是一樣的,你會發現最終是調用了socketFunc並且再connectFunc,喜歡的同學可以看下源碼。

1.3 套接字

上述過程中,我們提到了套接字,嗯….不說了,繼續給自己掃盲。 首先套接字是網路通信埠的抽象概念,你可以認為它就像打電話的時候的手機號一樣,A、B應用程序通信,他們通過套接字可以相互把信息精準的傳輸到對方。 所謂套接字就是ip+協議+埠,在網路通信中,ip只能定位一個主機,而加上埠目的是為了唯一的定位應用程序,為什麼加上協議,通信雙方得使用相同的語言吧,我說中文,你說英文,怎麼交流,我使用TCP,你使用UDP,雙方還怎麼解析對方的數據,TCP怎麼保證可靠性…,肯定要使用同樣的協議。 注意: 1.一個完整的套接字是由協議+本地ip+本地埠+遠程ip+遠程埠唯一確認。 2.因為系統會為每個套接字分配一個發送和接受緩衝區。 3.對於linux來說一切皆文件,socket也不例外,系統會為每個套接字分配一個文件描述符,稱之為套接字描述符socketfd,是一個整數,應用程序每次訪問socket讀取數據或者發送數據的時候,都是把它當成一個文件通過socketfd訪問,稍後會詳細描述。4.每個套接字不一定會獨佔一個埠(這個稍後介紹原因)

1.4 服務端最大空閑時間

在跟另外一個下游服務-拼車運營活動配置服務對接的時候,同樣發現在聯調時總是報EOF錯誤,後來得知下游服務設置了最大空閑時間。這個時間是針對服務端的:我們知道,服務端收到客戶端的連接請求,會跟客戶端創建一個連接,這個連接會一直保持著用於雙方通信,但是如果出現異常情況,比如客戶端斷網了導致連接沒有被正常關閉,或者客戶端使用連接後,忘記關閉連接,就會使得服務端浪費資源在維護這個連接,那麼服務端就設置一個時間,當某個連接在超過一定時間都沒有被使用,就主動關閉這個連接。嘮叨再多不如看源碼,先說下我們看源碼的目的是找到什麼?我們要找到那段實際從socket連接中讀取數據,並且要判斷是否超時的代碼,明確目的後,下面開始追代碼了:

func (p *TSimpleServer) AcceptLoop() error {
for {
client, err := p.serverTransport.Accept()
if err != nil {
select {
case <-p.quit:
return nil
default:
}
return err
}
if client != nil {
go func() {
if err := p.processRequests(client); err != nil {
log.Println("error processing request:", err)
}
}()
}
}
}

還是這段Thrift的server端的AcceptLoop的代碼,我們看到,當為連接請求創建一個新的socket連接後,開啟了一個協程去處理,處理的函數processRequests,processRequests函數內部實際處理這個是,我們ok,err:=processor.Process(inputProtocol, outputProtocol)(在上述的時候我們看到過這個函數),這個processor是我們的業務邏輯的處理函數集合一個對象,ok,我們以GCS的代碼為例來看看這個對象的Process函數幹了什麼

func (p *GCSProcessor) Process(iprot, oprot thrift.TProtocol) (success bool, err thrift.TException) {
name, _, seqId, err := iprot.ReadMessageBegin()
if err != nil {
return false, err
}
if processor, ok := p.GetProcessorFunction(name); ok {
return processor.Process(seqId, iprot, oprot)
}
iprot.Skip(thrift.STRUCT)
iprot.ReadMessageEnd()
x110 := thrift.NewTApplicationException(thrift.UNKNOWN_METHOD, "Unknown function "+name)
oprot.WriteMessageBegin(name, thrift.EXCEPTION, seqId)
x110.Write(oprot)
oprot.WriteMessageEnd()
oprot.Flush()

return false, x110

}

解釋下這個函數,iprot, oprot實際就是從socket讀取數據和從socket寫返回數據的兩個入口,實際他們最終指向的是同一個socket連接(就是我們跟客戶端建立的連接),iprot.ReadMessageBegin是在讀取一些頭部信息,返回的是方法名字,還有一個sqlId後面我們再討論。p.GetProcessorFunction(name)這個應該好理解,根據方法名字,找到對應的處理函數processor,接著調用processor的Process處理即可。處理完畢後就是寫返回值了。 ok,我們就看看ReadMessageBegin幹了什麼

func (p *TBinaryProtocol) ReadMessageBegin() (name string, typeId TMessageType, seqId int32, err error) {
size, e := p.ReadI32()
if e != nil {
return "", typeId, 0, NewTProtocolException(e)
}
if size < 0 {
typeId = TMessageType(size & 0x0ff)
version := int64(int64(size) & VERSION_MASK)
if version != VERSION_1 {
return name, typeId, seqId, NewTProtocolExceptionWithType(BAD_VERSION, fmt.Errorf("Bad version in ReadMessageBegin"))
}
name, e = p.ReadString()
if e != nil {
return name, typeId, seqId, NewTProtocolException(e)
}
seqId, e = p.ReadI32()
if e != nil {
return name, typeId, seqId, NewTProtocolException(e)
}
return name, typeId, seqId, nil
}
if p.strictRead {
return name, typeId, seqId, NewTProtocolExceptionWithType(BAD_VERSION, fmt.Errorf("Missing version in ReadMessageBegin"))
}
name, e2 := p.readStringBody(size)
if e2 != nil {
return name, typeId, seqId, e2
}
b, e3 := p.ReadByte()
if e3 != nil {
return name, typeId, seqId, e3
}
typeId = TMessageType(b)
seqId, e4 := p.ReadI32()
if e4 != nil {
return name, typeId, seqId, e4
}
return name, typeId, seqId, nil
}

大概解釋下這個函數的流程,p.ReadI32(),首先從連接中讀取一個int32的數字,這個數字如果是版本號的話,二進位的首位是為1的,那麼也就是小於0的,我們回去判斷跟當前的Thrift的版本是否對應,接著回去讀取一個stirng,也就是我們的name,和sqlId,然後返回即可。 strictRead和strictWrite的意思其實就是嚴格度和嚴格寫,指的就是是否要判斷版本號和寫版本號,Thrift默認創建的NewTBinaryProtocolFactoryDefault()的時候分別置為false和true

func NewTBinaryProtocolFactoryDefault() *TBinaryProtocolFactory {
return NewTBinaryProtocolFactory(false, true)
}

這是題外話,ok,我們繼續看讀取數據的操作,以ReadI32為例,看看裡面發生了什麼,追代碼追到了這

func (p *TBinaryProtocol) readAll(buf []byte) error {
_, err := io.ReadFull(p.reader, buf)
return NewTProtocolException(err)
}
func ReadFull(r Reader, buf []byte) (n int, err error) {
return ReadAtLeast(r, buf, len(buf))
}
func ReadAtLeast(r Reader, buf []byte, min int) (n int, err error) {
if len(buf) < min {
return 0, ErrShortBuffer
}
for n < min && err == nil {
var nn int
nn, err = r.Read(buf[n:])
n += nn
}
if n >= min {
err = nil
} else if n > 0 && err == EOF {
err = ErrUnexpectedEOF
}
return
}

我們先看最下面的函數ReadAtLeast,裡面讀取數據的調用的是傳入的r.Read(),這個應該就是我們最終想找的讀數據的地方,但是r是哪來的?看第一個函數,喔,是TBinaryProtocol.reader,那麼這個reader是從哪來的呢?

func NewTBinaryProtocol(t TTransport, strictRead, strictWrite bool) *TBinaryProtocol {
p := &TBinaryProtocol{origTransport: t, strictRead: strictRead, strictWrite: strictWrite}
if et, ok := t.(TRichTransport); ok {
p.trans = et
} else {
p.trans = NewTRichTransport(t)
}
p.reader = p.trans
p.writer = p.trans
return p
}

ok,reader是在New的時候傳入的TTransport轉的,那什麼時候調用了NewTBinaryProtocol呢?我們找到了這個函數:

func (p *TBinaryProtocolFactory) GetProtocol(t TTransport) TProtocol {
return NewTBinaryProtocol(t, p.strictRead, p.strictWrite)
}

看起來很熟悉啊,這玩意不就是在處理新建的連接的函數processRequests中調用的函數嗎?不記得了?貼代碼

processor := p.processorFactory.GetProcessor(client)
inputTransport := p.inputTransportFactory.GetTransport(client)
outputTransport := p.outputTransportFactory.GetTransport(client)
inputProtocol := p.inputProtocolFactory.GetProtocol(inputTransport)
outputProtocol := p.outputProtocolFactory.GetProtocol(outputTransport)

喔,其實這個r最終來源還是client。ok,我們再看看創建這個client的函數

func (p *TServerSocket) Accept() (TTransport, error) {
p.mu.RLock()
interrupted := p.interrupted
p.mu.RUnlock()

if interrupted {
return nil, errTransportInterrupted
}
if p.listener == nil {
return nil, NewTTransportException(NOT_OPEN, "No underlying server socket")
}
conn, err := p.listener.Accept()
if err != nil {
return nil, NewTTransportExceptionFromError(err)
}
return NewTSocketFromConnTimeout(conn, p.clientTimeout), nil
}

看到最後一行代碼沒有,創建這個新的socket連接的時候把timeout傳進去了,他返回的是TSocket類型,ok,我們先來看看這個timeout是從哪來的,p.clientTimeout,這個p就是我們的TServerSocket,還記得AcceptLoop嗎?

func (p *TSimpleServer) AcceptLoop() error {
for {
client, err := p.serverTransport.Accept()
....
}

這個裡面的p.serverTransport也就是我們在init ThriftServer的時候調用的

ss, err = thrift.NewTServerSocketTimeout(taddr, self.ttimeoutms)

這個timeout才是對應到我們配置文件的timeout。

timeout的來源我們搞清楚了,我們再看看TSocket是不是真的有Read函數,很幸運,它是有的,看一眼乾了什麼

func (p *TSocket) Read(buf []byte) (int, error) {
if !p.IsOpen() {
return 0, NewTTransportException(NOT_OPEN, "Connection not open")
}
p.pushDeadline(true, false)
n, err := p.conn.Read(buf)
fmt.Println(err)
fmt.Println(time.Now().Unix())
return n, NewTTransportExceptionFromError(err)
}
func (p *TSocket) pushDeadline(read, write bool) {
var t time.Time
if p.timeout > 0 {
t = time.Now().Add(time.Duration(p.timeout))
}
if read && write {
p.conn.SetDeadline(t)
} else if read {
p.conn.SetReadDeadline(t)
} else if write {
p.conn.SetWriteDeadline(t)
}
}

pushDeadline實際就是把timeout設置生效,設置生效的方式是調用TSocket的conn的SetDeadline、SetReadDeadline、SetWriteDeadline,然後調用了,TSocket的conn的Read,看來實際操作的是這個conn,通過追代碼我們知道這個conn就是TCPListener,看下他的結構

type TCPListener struct {
fd *netFD
}
func (l *TCPListener) SetDeadline(t time.Time) error {
if !l.ok() {
return syscall.EINVAL
}
if err := l.fd.setDeadline(t); err != nil {
return &OpError{Op: "set", Net: l.fd.net, Source: nil, Addr: l.fd.laddr, Err: err}
}
return nil
}

那就是netFD嘛,繼續追代碼看到了

func (fd *netFD) setDeadline(t time.Time) error {
return setDeadlineImpl(fd, t, r+w)
}

func (fd *netFD) setReadDeadline(t time.Time) error {
return setDeadlineImpl(fd, t, r)
}

func (fd *netFD) setWriteDeadline(t time.Time) error {
return setDeadlineImpl(fd, t, w)
}

來看下netFD的Read函數

func (fd *netFD) Read(p []byte) (n int, err error) {
...
if err := fd.pd.prepareRead(); err != nil {
return 0, err
}
...
for {
n, err = syscall.Read(fd.sysfd, p)
if err != nil {
n = 0
if err == syscall.EAGAIN {
if err = fd.pd.waitRead(); err == nil {
continue
}
}
}
err = fd.eofError(n, err)
break
}
if _, ok := err.(syscall.Errno); ok {
err = os.NewSyscallError("read", err)
}
return
}

我們來看下實際Read調用的是syscall,是系統調用,這是非阻塞的,如果沒讀到數據的話,會調用waitRead(),這個東西東西實際是golang的網路輪詢器,我們稍後介紹golang是如何把系統非阻塞方式轉換為socket的阻塞方式的,採用的是linux的epoll。我們現在暫且認為是等待讀取,直到返回的有error,這個error實際就有可能會是超時,稍後會詳細講解。

到這裡,我們基本上弄明白了最大空閑時間是怎麼生效的,其實就是讀取數據的時候,超過這個時間了的話,就會返回error,我們再看下processRequests裡面的那個for循環

if inputTransport != nil {
defer inputTransport.Close()
}
if outputTransport != nil {
defer outputTransport.Close()
}
for {
ok, err := processor.Process(inputProtocol, outputProtocol)
if err, ok := err.(thrift.TTransportException); ok && err.TypeId() == thrift.END_OF_FILE {
return nil
} else if err != nil {
log.Printf("error processing request: %s", err)
return err
}
if err, ok := err.(thrift.TApplicationException); ok && err.TypeId() == thrift.UNKNOWN_METHOD {
continue
}
if !ok {
break
}
}

如果處理的時候返回錯誤,循環就不會繼續了,直接返回,這個socket隨之也被關閉。

1.5 客戶端的超時時間

其實客戶端的超時時間跟服務端的最大空閑時間沒什麼區別,追到最後,發現都是socket讀取數據的時候的最大等待時長。

二、為什麼要使用長連接

一般查閱資料都會得到一個結論:建立連接需要消耗很大的資源,如果只使用短連接,當QPS過大的時候,對伺服器壓力太大。 問題來了,建立連接到底需要什麼資源呢? 1.我們知道TCP建立連接需要三次握手。 2.一個套接字需要佔用埠,如果socket數過多,會有埠耗盡的風險(稍後會詳細解釋,這裡有坑) 3.socket的連接釋放是需要一定的時間。 4.每個socket會佔用內存的,比如分配的發送緩衝區和接受緩衝區,如果socket短連接短時間過高並發,來不及釋放,會使得內存消耗過大,使得系統變慢,這裡注意下,socket的緩衝區一定會佔用內存嗎?實際上不是的,只有網路的數據到達緩衝區的時候,才會佔用內存,當數據被讀取完畢後,內存會被釋放。詳情見一個大神的文章:zhuanlan.zhihu.com/p/25 5.linux中,一切接文件,需要給每一個套接字分配一個文件描述符。

三、socket連接數過多問題

做服務端開發一定對於下面的報警再熟悉不過狀態:XX故障名稱:net.sockets.used大於7W指標:net.sockets.used主機:XXXX節點:XXXXXX當前值:121880說明:happen(net.sockets.used,#6,3)>70000恢復時間:2018-04-03 16:01:50詳情:XXXXXXXX

遇到這個問題老司機一看首先想到的是連接是不是泄露了,是不是連接池的連接使用了沒有put嗎,或者短連接使用了後沒有close。但是有沒有想過,我們為什麼要設置這個報警呢,又為什麼設置到7W呢?跟著以下幾個問題,我又給自己掃了一頓盲。

3.1 每個socket會獨佔埠嗎?

之前一直有一個錯誤的認知,每個套接字會獨佔一個埠,因為之前以為套接字是由ip+協議+埠,而且認為我們的socket數量限制就是因為這個原因。但是實際上這是個片面的認知,注意套接字不僅由本地ip和埠唯一確認,還是由遠程的ip和埠組成,那麼我們如果兩個套接字佔用同樣的本地埠也是可以的,只要遠程ip和埠不同即可。 其實再反過頭來想想也是,linux的 socket使用16bit無符號整型表示埠號,最大到65535,而且有些埠是不能隨便用的,如果每個套接字佔用一個埠,那我們設置net.sockets.used的報警為7W本身是不對的。

3.2 服務端Accept新建的socket會佔用新的埠嗎?

又是之前的一個錯誤認知,我們知道服務端會有一個AcceptLoop不斷的Accept,這個Accept會創建一個新的socket。我以為每創建一個socket就會佔用本機的一個埠,現在想想,如果這麼搞的話,那麼太容易出問題了,伺服器的接受連接數一旦超過埠可用數,就會出現伺服器不可用的問題,不能再接受新的連接請求。 剛剛我們了解到,套接字是根據本地信息和遠程的信息共同組成的,所以其實在服務端Accept之後,並不會佔用本地的新的埠,還是佔用服務啟動的時候監聽的那個埠。我們再回過頭看下netFD的Accept函數

func (fd *netFD) accept() (netfd *netFD, err error) {
...
for {
s, rsa, err = accept(fd.sysfd)
...
break
}

if netfd, err = newFD(s, fd.family, fd.sotype, fd.net); err != nil {
closeFunc(s)
return nil, err
}
...
lsa, _ := syscall.Getsockname(netfd.sysfd)
netfd.setAddr(netfd.addrFunc()(lsa), netfd.addrFunc()(rsa))
return netfd, nil
}

在文章之處我們了解到lsa是本地的地址,其實這個地址是ip+埠,我們來看下,netfd.addrFunc()返回的是什麼?

func (fd *netFD) addrFunc() func(syscall.Sockaddr) Addr {
switch fd.family {
case syscall.AF_INET, syscall.AF_INET6:
switch fd.sotype {
case syscall.SOCK_STREAM:
return sockaddrToTCP
case syscall.SOCK_DGRAM:
return sockaddrToUDP
case syscall.SOCK_RAW:
return sockaddrToIP
}
case syscall.AF_UNIX:
switch fd.sotype {
case syscall.SOCK_STREAM:
return sockaddrToUnix
case syscall.SOCK_DGRAM:
return sockaddrToUnixgram
case syscall.SOCK_SEQPACKET:
return sockaddrToUnixpacket
}
}
return func(syscall.Sockaddr) Addr { return nil }
}

返回的是一個函數,我們看一個TCP的函數長什麼樣

func sockaddrToTCP(sa syscall.Sockaddr) Addr {
switch sa := sa.(type) {
case *syscall.SockaddrInet4:
return &TCPAddr{IP: sa.Addr[0:], Port: sa.Port}
case *syscall.SockaddrInet6:
return &TCPAddr{IP: sa.Addr[0:], Port: sa.Port, Zone: zoneToString(int(sa.ZoneId))}
}
return nil
}

看到了吧,這個函數返回的是一個Addr包括了ip+埠。

3.3 埠復用問題以及我們為什麼要設置socket數量限制?

在3.1中描述的其實就是埠復用的情況,所以理論上來講我們的socket連接數並不會受埠數量的直接影響。那麼socket數量就可以無上限嗎?其實不是。 1.我們知道套接字=協議+本地ipport+遠程ipport,那麼在極端情況下,一台伺服器,如果我的上游某台客戶端機器一直跟我建立連接,知道打到埠上限,這樣的話這台客戶端機器再跟服務端建立連接就會失敗,當然這是極端情況,而且也只會影響這一台機器,理論上不會影響其他客戶端對埠的使用。 2.這樣的話,我是不是可以無限制的擴容客戶端機器,來請求下游呢?不是的,原因是套接字是佔用一定量內存的。其實之前還有一個疑問,就是如果socket過多的時候,我需要搜索某個socket的時候怎麼辦,會不會很慢呢?但是思考了下感覺沒有問題,因為我們讀取socket的時候,是按照socket描述符來找到socket的,描述符實際上就是進程文件描述符表的下表,而且,每個進程打開文件的數量是有限制的。
推薦閱讀:
相关文章