1.Netty總體架構

Netty 是 JBoss 出品的高效的 Java NIO 開發框架,這個圖是Netty的總體結構圖,多多眼熟幾遍先。

2.網路模型

Netty是典型的Reactor模型結構,關於Reactor的詳盡闡釋,可參考POSA2,這裡不做概念性的解釋。而應用Java NIO構建Reactor模式,Doug Lea(就是那位讓人無限景仰的大爺)在「Scalable IO in Java」中給了很好的闡述。這裡截取其PPT中經典的圖例說明 Reactor模式的典型實現:

1、這是最簡單的單Reactor單線程模型。Reactor線程是個多面手,負責多路分離套接字,Accept新連接,並分派請求到處理器鏈中。該模型 適用於處理器鏈中業務處理組件能快速完成的場景。不過,這種單線程模型不能充分利用多核資源,所以實際使用的不多。

2、相比上一種模型,該模型在處理器鏈部分採用了多線程(線程池),也是後端程序常用的模型。

3、 第三種模型比起第二種模型,是將Reactor分成兩部分,mainReactor負責監聽server socket,accept新連接,並將建立的socket分派給subReactor。subReactor負責多路分離已連接的socket,讀寫網 絡數據,對業務處理功能,其扔給worker線程池完成。通常,subReactor個數上可與CPU個數等同。

說完Reacotr模型的三種形式,那麼Netty是哪種呢?其實,我還有一種Reactor模型的變種沒說,那就是去掉線程池的第三種形式的變種,這也 是Netty NIO的默認模式。在實現上,Netty中的Boss類充當mainReactor,NioWorker類充當subReactor(默認 NioWorker的個數是Runtime.getRuntime().availableProcessors())。在處理新來的請求 時,NioWorker讀完已收到的數據到ChannelBuffer中,之後觸發ChannelPipeline中的ChannelHandler流。

Netty是事件驅動的,可以通過ChannelHandler鏈來控制執行流向。因為ChannelHandler鏈的執行過程是在 subReactor中同步的,所以如果業務處理handler耗時長,將嚴重影響可支持的並發數。這種模型適合於像Memcache這樣的應用場景,但 對需要操作資料庫或者和其他模塊阻塞交互的系統就不是很合適。Netty的可擴展性非常好,而像ChannelHandler線程池化的需要,可以通過在 ChannelPipeline中添加Netty內置的ChannelHandler實現類–ExecutionHandler實現,對使用者來說只是 添加一行代碼而已。對於ExecutionHandler需要的線程池模型,Netty提供了兩種可 選:1) MemoryAwareThreadPoolExecutor 可控制Executor中待處理任務的上限(超過上限時,後續進來的任務將被阻 塞),並可控制單個Channel待處理任務的上限;2) OrderedMemoryAwareThreadPoolExecutor 是 MemoryAwareThreadPoolExecutor 的子類,它還可以保證同一Channel中處理的事件流的順序性,這主要是控制事件在非同步處 理模式下可能出現的錯誤的事件順序,但它並不保證同一Channel中的事件都在一個線程中執行(通常也沒必要)。一般來 說,OrderedMemoryAwareThreadPoolExecutor 是個很不錯的選擇,當然,如果有需要,也可以DIY一個。

3、 buffer

org.jboss.netty.buffer包的介面及類的結構圖如下:

該包核心的介面是ChannelBuffer和ChannelBufferFactory,下面予以簡要的介紹。

Netty使用ChannelBuffer來存儲並操作讀寫的網路數據。ChannelBuffer除了提供和ByteBuffer類似的方法,還提供了 一些實用方法,具體可參考其API文檔。ChannelBuffer的實現類有多個,這裡列舉其中主要的幾個:

1)HeapChannelBuffer:這是Netty讀網路數據時默認使用的ChannelBuffer,這裡的Heap就是Java堆的意思,因為 讀SocketChannel的數據是要經過ByteBuffer的,而ByteBuffer實際操作的就是個byte數組,所以 ChannelBuffer的內部就包含了一個byte數組,使得ByteBuffer和ChannelBuffer之間的轉換是零拷貝方式。根據網路字 節續的不同,HeapChannelBuffer又分為BigEndianHeapChannelBuffer和 LittleEndianHeapChannelBuffer,默認使用的是BigEndianHeapChannelBuffer。Netty在讀網路 數據時使用的就是HeapChannelBuffer,HeapChannelBuffer是個大小固定的buffer,為了不至於分配的Buffer的 大小不太合適,Netty在分配Buffer時會參考上次請求需要的大小。

2)DynamicChannelBuffer:相比於HeapChannelBuffer,DynamicChannelBuffer可動態自適應大 小。對於在DecodeHandler中的寫數據操作,在數據大小未知的情況下,通常使用DynamicChannelBuffer。

3)ByteBufferBackedChannelBuffer:這是directBuffer,直接封裝了ByteBuffer的 directBuffer。

對於讀寫網路數據的buffer,分配策略有兩種:1)通常出於簡單考慮,直接分配固定大小的buffer,缺點是,對一些應用來說這個大小限制有時是不 合理的,並且如果buffer的上限很大也會有內存上的浪費。2)針對固定大小的buffer缺點,就引入動態buffer,動態buffer之於固定 buffer相當於List之於Array。

buffer的寄存策略常見的也有兩種(其實是我知道的就限於此):1)在多線程(線程池) 模型下,每個線程維護自己的讀寫buffer,每次處理新的請求前清空buffer(或者在處理結束後清空),該請求的讀寫操作都需要在該線程中完成。 2)buffer和socket綁定而與線程無關。兩種方法的目的都是為了重用buffer。

Netty對buffer的處理策略是:讀 請求數據時,Netty首先讀數據到新創建的固定大小的HeapChannelBuffer中,當HeapChannelBuffer滿或者沒有數據可讀 時,調用handler來處理數據,這通常首先觸發的是用戶自定義的DecodeHandler,因為handler對象是和ChannelSocket 綁定的,所以在DecodeHandler裏可以設置ChannelBuffer成員,當解析數據包發現數據不完整時就終止此次處理流程,等下次讀事件觸 發時接著上次的數據繼續解析。就這個過程來說,和ChannelSocket綁定的DecodeHandler中的Buffer通常是動態的可重用 Buffer(DynamicChannelBuffer),而在NioWorker中讀ChannelSocket中的數據的buffer是臨時分配的 固定大小的HeapChannelBuffer,這個轉換過程是有個位元組拷貝行為的。

對ChannelBuffer的創建,Netty內部使用的是ChannelBufferFactory介面,具體的實現有 DirectChannelBufferFactory和HeapChannelBufferFactory。對於開發者創建 ChannelBuffer,可使用實用類ChannelBuffers中的工廠方法。

4、Channel

和Channel相關的介面及類結構圖如下:

從該結構圖也可以看到,Channel主要提供的功能如下:

1)當前Channel的狀態信息,比如是打開還是關閉等。

2)通過ChannelConfig可以得到的Channel配置信息。

3)Channel所支持的如read、write、bind、connect等IO操作。

4)得到處理該Channel的ChannelPipeline,既而可以調用其做和請求相關的IO操作。

在Channel實現方面,以通常使用的nio socket來說,Netty中的NioServerSocketChannel和NioSocketChannel分別封裝了java.nio中包含的 ServerSocketChannel和SocketChannel的功能。

5、ChannelEvent

如前所述,Netty是事件驅動的,其通過ChannelEvent來確定事件流的方向。一個ChannelEvent是依附於Channel的 ChannelPipeline來處理,並由ChannelPipeline調用ChannelHandler來做具體的處理。下面是和 ChannelEvent相關的介面及類圖:

對於使用者來說,在ChannelHandler實現類中會使用繼承於ChannelEvent的MessageEvent,調用其 getMessage()方法來獲得讀到的ChannelBuffer或被轉化的對象。

6、ChannelPipeline

Netty 在事件處理上,是通過ChannelPipeline來控制事件流,通過調用註冊其上的一系列ChannelHandler來處理事件,這也是典型的攔截 器模式。下面是和ChannelPipeline相關的介面及類圖:

事件流有兩種,upstream事件和downstream事件。在ChannelPipeline中,其可被註冊的ChannelHandler既可以 是 ChannelUpstreamHandler 也可以是ChannelDownstreamHandler ,但事件在ChannelPipeline傳遞過程中只會調用匹配流的ChannelHandler。在事件流的過濾器鏈 中,ChannelUpstreamHandler或ChannelDownstreamHandler既可以終止流程,也可以通過調用 ChannelHandlerContext.sendUpstream(ChannelEvent)或 ChannelHandlerContext.sendDownstream(ChannelEvent)將事件傳遞下去。下面是事件流處理的圖示:

從上圖可見,upstream event是被Upstream Handler們自底向上逐個處理,downstream event是被Downstream Handler們自頂向下逐個處理,這裡的上下關係就是向ChannelPipeline裏添加Handler的先後順序關係。簡單的理 解,upstream event是處理來自外部的請求的過程,而downstream event是處理向外發送請求的過程。

服務端處 理請求的過程通常就是解碼請求、業務邏輯處理、編碼響應,構建的ChannelPipeline也就類似下面的代碼片斷:

ChannelPipeline pipeline = Channels.pipeline();

pipeline.addLast("decoder", new MyProtocolDecoder());pipeline.addLast("encoder", new MyProtocolEncoder());pipeline.addLast("handler", new MyBusinessLogicHandler());

其中,MyProtocolDecoder是ChannelUpstreamHandler類型,MyProtocolEncoder是 ChannelDownstreamHandler類型,MyBusinessLogicHandler既可以是 ChannelUpstreamHandler類型,也可兼ChannelDownstreamHandler類型,視其是服務端程序還是客戶端程序以及 應用需要而定。

補充一點,Netty對抽象和實現做了很好的解耦。像org.jboss.netty.channel.socket包, 定義了一些和socket處理相關的介面,而org.jboss.netty.channel.socket.nio、 org.jboss.netty.channel.socket.oio等包,則是和協議相關的實現。

7、codec framework

對於請求協議的編碼解碼,當然是可以按照協議格式自己操作ChannelBuffer中的位元組數據。另一方面,Netty也做了幾個很實用的codec helper,這裡給出簡單的介紹。

1)FrameDecoder:FrameDecoder內部維護了一個 DynamicChannelBuffer成員來存儲接收到的數據,它就像個抽象模板,把整個解碼過程模板寫好了,其子類只需實現decode函數即可。 FrameDecoder的直接實現類有兩個:(1)DelimiterBasedFrameDecoder是基於分割符 (比如
)的解碼器,可在構造函數中指定分割符。(2)LengthFieldBasedFrameDecoder是基於長度欄位的解碼器。如果協 議 格式類似「內容長度」+內容、「固定頭」+「內容長度」+動態內容這樣的格式,就可以使用該解碼器,其使用方法在API DOC上詳盡的解釋。

2)ReplayingDecoder: 它是FrameDecoder的一個變種子類,它相對於FrameDecoder是非阻塞解碼。也就是說,使用 FrameDecoder時需要考慮到讀到的數據有可能是不完整的,而使用ReplayingDecoder就可以假定讀到了全部的數據。

3)ObjectEncoder 和ObjectDecoder:編碼解碼序列化的Java對象。

4)HttpRequestEncoder和 HttpRequestDecoder:http協議處理。

下面來看使用FrameDecoder和ReplayingDecoder的兩個例子:

public class IntegerHeaderFrameDecoder extends FrameDecoder {

protected Object decode(ChannelHandlerContext ctx, Channel channel, ChannelBuffer buf) throws Exception { if (buf.readableBytes() < 4) { return null; } buf.markReaderIndex(); int length = buf.readInt(); if (buf.readableBytes() < length) { buf.resetReaderIndex(); return null; } return buf.readBytes(length); }}

而使用ReplayingDecoder的解碼片斷類似下面的,相對來說會簡化很多。

public class IntegerHeaderFrameDecoder2 extends ReplayingDecoder {

protected Object decode(ChannelHandlerContext ctx, Channel channel, ChannelBuffer buf, VoidEnum state) throws Exception { return buf.readBytes(buf.readInt()); }}

就實現來說,當在ReplayingDecoder子類的decode函數中調用ChannelBuffer讀數據時,如果讀失敗,那麼 ReplayingDecoder就會catch住其拋出的Error,然後ReplayingDecoder接手控制權,等待下一次讀到後續的數據後繼 續decode。


推薦閱讀:
相關文章