此文已由作者趙忠傑授權網易雲社區發布。

歡迎訪問網易雲社區,了解更多網易技術產品運營經驗。

1.需求背景

在做服務端開發時,我們經常會有這樣的需求,需要把一個資料庫中的數據經過一定的處理,落入到另一個資料庫中。數據轉化期間,

可能需要一系列的轉化步驟,例如原始數據經過數據轉化為目標資料庫的格式,過濾不滿足條件的數據,通過一些網路調用來增強數

據等等。這些都是數據同步/轉換中的步驟,面對不同的需求,處理階段可能會增加,也可能會減少。

當然,不局限於數據同步,開發中有很多場景都可以抽象為上述模型,即把整個流程抽象為不同的處理階段。

2.面臨的問題

在面對大數據量時,我們常常採用如下的多線程優化方式:

其中,S1-SN 為不同的處理階段。問題在於:各個處理階段(S1-SN)中可能充斥著大量的網路調用,IO操作等,即使採用了多線程的

優化方式,每個線程處理任務的效率仍然非常低下,無法充分利用CPU資源,從而造成整體任務效率低下。此時,pipeline模式就應用而生了。

3.pipeline架構示意圖

如果我們把一個任務分成3個處理階段,對於一個任務來說,任務是串列執行的,但對於多個任務來說,各個階段是並行執行的,所以任務

從整體上看是並行執行的,由於是並行執行,所以能夠充分利用CPU資源。

上述只是單線程流水線狀態,我們可以對其進行線程池化的改造,來進一步提交效率。

在pipeline上,對於任務不同的處理階段,我們可以進行非同步化,池化改造,從而實現不同階段並行計算,同一階段池化運算,充分利用多核CPU的處理能力,

提高整體任務的執行效率。

4.pipeline分類

從上圖可知,pipeline分為線性pipeline和非線性pipeline,關鍵在於pipeline對不同處理階段pipe是如何組裝與路由的。針對不同的業務

場景,可以選擇不同的pipeline類型。

5.pipeline類圖

上圖給出的是一個SimplePipeline的簡易類圖。對於不同的需求,可以針對具體情況做出相應的調整。

Pipe:處理階段的抽象。負責對輸入進行處理,並將輸出作為下一個階段的輸入。Pipe可以理解為(輸入,處理,輸出)三元組。

process:用於接收前一階段的處理結果,用作該處理階段的輸入。

init:初始化當前處理階段對外提供的服務。

shutdown:關閉當前處理階段對外提供的服務。

setNextPipe:設置當前處理階段的下一個處理階段。

ThreadPoolPipeDecorator:基於線程池的Pipe實現類。該類主要實現用線程池去執行對各個輸入元素的處理。

AbstractPipe:Pipe的抽象實現類。

process: 接收前一階段的處理結果作為輸入,並調用子類的doProcess方法對元素進行處理,相應的處理結果會提交給

下一個階段進行處理。

doProcess : 留給子類實現的抽象方法。

PipeContext:對各個處理階段的計算環境的抽象,主要用於異常處理。

Pipeline: 對複合Pipe的抽象。一個Pipeline實例可以包含多個Pipe實例。

addPipe:往該Pipeline實例中添加一個Pipe實例。

SimplePipeline:基於AbstractPipe的Pipeline介面的一個簡單實現類。

6.代碼示例

AbstractPipe為Pipe介面的方法提供了默認實現,子類按需覆蓋即可。process方法中,會調用子類的doProcess方法。

Pipe介面的具體子類實現真正的數據處理。

通過委託的方式,實現Pipe的池化處理。

Pipeline的簡單實現。通過Pipeline實現Pipe的組裝與初始化,生命周期管理等。

上述Pipeline的實現比較簡單,如果有必要, 可以藉助配置等形式以動態的形式來創建和組裝Pipeline。

7.Pipeline模式考量

1.Pipeline模式可以對有依賴關係的任務實現並行處理,應用Pipeline後,對任務的處理整體是並行的。提高了並發性。

2.Pipeline雖然可以提高並發性,但背後也隱藏這代價。各個階段的處理都有其時間和空間的開銷,而且編程複雜,出現問題不易排查,因此pipeline模式

適合於處理規模較大的任務,否則可能得不償失。

3.Pipeline的深度需要依賴於任務的性質而定。如果是CPU密集型,深度最好不要超過CPU個數;如果是IO密集型,深度最好不要超過2*CPU個數。

免費領取驗證碼、內容安全、簡訊發送、直播點播體驗包及雲伺服器等套餐

更多網易技術、產品、運營經驗分享請點擊。

推薦閱讀:

相关文章