此文已由作者赵忠杰授权网易云社区发布。

欢迎访问网易云社区,了解更多网易技术产品运营经验。

1.需求背景

在做服务端开发时,我们经常会有这样的需求,需要把一个资料库中的数据经过一定的处理,落入到另一个资料库中。数据转化期间,

可能需要一系列的转化步骤,例如原始数据经过数据转化为目标资料库的格式,过滤不满足条件的数据,通过一些网路调用来增强数

据等等。这些都是数据同步/转换中的步骤,面对不同的需求,处理阶段可能会增加,也可能会减少。

当然,不局限于数据同步,开发中有很多场景都可以抽象为上述模型,即把整个流程抽象为不同的处理阶段。

2.面临的问题

在面对大数据量时,我们常常采用如下的多线程优化方式:

其中,S1-SN 为不同的处理阶段。问题在于:各个处理阶段(S1-SN)中可能充斥著大量的网路调用,IO操作等,即使采用了多线程的

优化方式,每个线程处理任务的效率仍然非常低下,无法充分利用CPU资源,从而造成整体任务效率低下。此时,pipeline模式就应用而生了。

3.pipeline架构示意图

如果我们把一个任务分成3个处理阶段,对于一个任务来说,任务是串列执行的,但对于多个任务来说,各个阶段是并行执行的,所以任务

从整体上看是并行执行的,由于是并行执行,所以能够充分利用CPU资源。

上述只是单线程流水线状态,我们可以对其进行线程池化的改造,来进一步提交效率。

在pipeline上,对于任务不同的处理阶段,我们可以进行非同步化,池化改造,从而实现不同阶段并行计算,同一阶段池化运算,充分利用多核CPU的处理能力,

提高整体任务的执行效率。

4.pipeline分类

从上图可知,pipeline分为线性pipeline和非线性pipeline,关键在于pipeline对不同处理阶段pipe是如何组装与路由的。针对不同的业务

场景,可以选择不同的pipeline类型。

5.pipeline类图

上图给出的是一个SimplePipeline的简易类图。对于不同的需求,可以针对具体情况做出相应的调整。

Pipe:处理阶段的抽象。负责对输入进行处理,并将输出作为下一个阶段的输入。Pipe可以理解为(输入,处理,输出)三元组。

process:用于接收前一阶段的处理结果,用作该处理阶段的输入。

init:初始化当前处理阶段对外提供的服务。

shutdown:关闭当前处理阶段对外提供的服务。

setNextPipe:设置当前处理阶段的下一个处理阶段。

ThreadPoolPipeDecorator:基于线程池的Pipe实现类。该类主要实现用线程池去执行对各个输入元素的处理。

AbstractPipe:Pipe的抽象实现类。

process: 接收前一阶段的处理结果作为输入,并调用子类的doProcess方法对元素进行处理,相应的处理结果会提交给

下一个阶段进行处理。

doProcess : 留给子类实现的抽象方法。

PipeContext:对各个处理阶段的计算环境的抽象,主要用于异常处理。

Pipeline: 对复合Pipe的抽象。一个Pipeline实例可以包含多个Pipe实例。

addPipe:往该Pipeline实例中添加一个Pipe实例。

SimplePipeline:基于AbstractPipe的Pipeline介面的一个简单实现类。

6.代码示例

AbstractPipe为Pipe介面的方法提供了默认实现,子类按需覆盖即可。process方法中,会调用子类的doProcess方法。

Pipe介面的具体子类实现真正的数据处理。

通过委托的方式,实现Pipe的池化处理。

Pipeline的简单实现。通过Pipeline实现Pipe的组装与初始化,生命周期管理等。

上述Pipeline的实现比较简单,如果有必要, 可以借助配置等形式以动态的形式来创建和组装Pipeline。

7.Pipeline模式考量

1.Pipeline模式可以对有依赖关系的任务实现并行处理,应用Pipeline后,对任务的处理整体是并行的。提高了并发性。

2.Pipeline虽然可以提高并发性,但背后也隐藏这代价。各个阶段的处理都有其时间和空间的开销,而且编程复杂,出现问题不易排查,因此pipeline模式

适合于处理规模较大的任务,否则可能得不偿失。

3.Pipeline的深度需要依赖于任务的性质而定。如果是CPU密集型,深度最好不要超过CPU个数;如果是IO密集型,深度最好不要超过2*CPU个数。

免费领取验证码、内容安全、简讯发送、直播点播体验包及云伺服器等套餐

更多网易技术、产品、运营经验分享请点击。

推荐阅读:

相关文章