Presto适合PB级海量数据复杂分析,互动式SQL查询,?持跨数据源查询。本文就将详细解析Client提交查询请求到PrestoServer端响应的细节,以及还原Presto资源组校验的实现过程。

Presto调度模块源码解析-Client提交请求

当客户端以控制台Console、脚本--execute等方式提交SQL作业时,Presto的Client会将作业相关的参数和脚本封装成一个Restful请求,提交给PrestoServer端。然后再进行后续的诸如解析执行计划、拆分Stage、调度task到Worker端执行等操作。下面就介绍一下Client提交作业到服务端部分的源码。

Main函数

客户端提交作业的代码是从Presto的main函数开始的:

在Console的run方法中,如果入参中含有--execute,会直接将值取出作为待执行的SQL语句。否则认为是通过--file指定了SQL文件,此时会通过文件IO读取该文件中的SQL脚本。这种情况对应著通过脚本提交作业的情况,而如果--execute和--file都没有指定,则认为是通过控制台Console的方式提交SQL。

通过脚本提交作业

通过脚本的方式会直接执行Console类的executeCommand方法。按照「;」切分出SQL语句,并依次调用Console类的process方法来提交作业。

通过控制台提交作业

这种情况相对麻烦一些,他会执行Console.runConsole方法处理客户提交的请求。Presto为这种方式设置了一个AtomicBoolean existing变数来判断Client是否存在,如果不存在则不再提交后续的SQL(对应在控制台中输入了多条sql语句,并用;间隔,当前边的语句正在执行时退出Console,此时后续的sql就不会被提交了)。

在runConsole方法中可以看到,他会有一个while循环不断的循环处理LineReader对象读取到的命令,LineReader继承自jline.console.ConsoleReader,是一个专门处理控制台输入的Java类库(官方网址是jline.github.io/)。这个类每读取一行输入就会将值传递给一个名为buffer的StringBuilder对象,然后根据「;」和「\G」来识别一个完整的SQL,并将SQL交给process方法进行调度。最后会将剩下不完整的语句赋值给重新初始化的buffer对象作为下一条SQL的开头。

构建请求并发送

下面我们看一下process方法,这个方法中最重要的部分是他会在try with resources中调用QueryRunner的startQuery方法,如下:

这个语法表示小括弧中创建的对象如果实现了closable介面,则无论是否出现异常,都会在try catch结束后调用其close方法。

如下是startQuery方法:

然后我们一路点进去,经过QueryRunner的startInternalQuery、StatementClientFactory的newStatementClient方法之后,我们来到了StatementClientV1的构造函数:

在buildQueryRequest方法中,会构建一个目标Rest地址为/v1/statement的请求。

随后在JsonResponse.execute中会发起这个请求。然后我们搜索一下Rest地址/v1/statement,发现他的目标服务类为StatementResouce。

以上就是Client提交查询请求,到PrestoServer端响应的过程。

Presto调度模块源码解析-服务端响应-资源组选择(2)

当用户提交一个SQL作业时,Presto客户端会封装一个Request通过Restful介面将请求发送到服务端,下面就详细讲解一下服务端的处理过程。

Client端发送请求的地址是/v1/statement,对应到StatementResource的createQuery方法。在该方法中会调用Query的static方法create,在create方法中new了一个Query对象,然后会调用SqlQueryManager的createQuery方法。

在createQuery方法中首先会创建QueryId,生成规则是:

然后presto会判断集群是否有可用节点,其中isIncludeCoordinator变数对应config.properties配置文件中的node-scheduler.include-coordinator配置项,表示是否允许调度task到coordinator节点进行计算。

如果集群可用节点小于最小值1(参数query-manager.initialization-required-workers),则给出熟悉的报错信息「Cluster is still initializing……」。

除此之外presto还对sql长度做了限制,要求不能超过query.max-length(默认1_000_000_000,表示10亿)。

然后presto会根据提交作业的客户端信息选择资源组。

configurationManager的类型是AtomicReference<ResourceGroupConfigurationManager>。selectGroup方法实现如下:

然后我们点进match方法,来到了ResourceGroupConfigurationManager介面中,我们看到这个方法的实现类有如下三个:

那么问题来了,当我们调用match方法时,执行的是这三个实现类中的哪一个呢?

我们首先看一下configurationManager初始化时的值,如下图所示初始化时其类型为LegacyResourceGroupConfigurationManager:

然后我们搜一下configurationManager的引用,发现在InternalResourceGroupManager类的setConfigurationManager方法中修改了他的值。如下图:

该方法在同一个类的loadConfigurationManager方法中被调用。loadConfigurationManager方法会判断常量RESOURCE_GROUPS_CONFIGURATION对应的etc/resource-groups.properties文件是否存在,如果存在会读取文件中配置的resource-groups.configuration-manager参数为Key值,到configurationManagerFactories中取出对应的ResourceGroupConfigurationManagerFactory对象,然后调用其create方法构造一个ResourceGroupConfigurationManager对象,最终赋值给configurationManager。方法的实现如下图:

而loadConfigurationManager方法又在PrestoServer类的初始化方法中被调用。

PS:ResourceGroupManager的实现类型是在CoordinatorModule这个类中被注入的:

也就是说,当PrestoServer通过其main方法调用run方法进行初始化时, 会读取etc/resource-groups.properties文件中的配置项resource-groups.configuration-manager,再以它为Key值读取configurationManagerFactories中对应的ResourceGroupConfigurationManagerFactory,然后调用读取出来的工厂类的create方法构建ResourceGroupConfigurationManager对象,最后赋值给InternalResourceGroupManager类的configurationManager。

另一个问题出现了,configurationManagerFactories这个Map<String,ResourceGroupConfigurationManagerFactory>类型的全局变数是在什么时候赋值的,里边都有哪些值呢?

我们还是搜索一下它的引用,发现在InternalResourceGroupManager的addConfigurationManagerFactory方法中对其进行了putIfAbsent操作(不存在则put)。

搜索引用发现,在PluginManager的installPlugin方法中调用了这个方法:

然后我们看一下plugin.getResourceGroupConfigurationManagerFactories方法的定义,发现他有两个实现类:

ResourceGroupManagerPlugin的实现如下:

H2ResourceGroupManagerPlugin的实现如下:

我们在addConfigurationManagerFactory方法中可以看到,添加到configurationManagerFactories这个Map中时,是以factory的name作为Key值,factory为Value的:

所以我们看一下这三个实现类对应的name值,也就是resource-groups.configuration-manager参数的可选值:

db:DbResourceGroupConfigurationManagerFactory

h2:H2ResourceGroupConfigurationManagerFactory

file:FileResourceGroupConfigurationManagerFactory

然后,我们回过头来看一下PluginManager的installPlugin方法,该方法在同类的loadPlugin方法中被调用:

loadPlugin方法又在该类中再次被调用:

再往上是loadPlugins方法:

再次向上查找,原来loadPlugins方法是在PrestoServer的run方法中,先与loadConfigurationManager方法被调用的:

也就是说,Presto默认是按照LegacyResourceGroupConfigurationManager进行资源组管理的。

在PrestoServer调用run方法进行初始化时,首先会执行PluginManager的loadPlugins方法,向InternalResourceGroupManager中一个存放ResourceGroupManagerFactory类型元素的Map添加可用的资源组管理工厂类。

然后会调用InternalResourceGroupManager的loadConfigurationManager方法,判断是否配置了参数resource-groups.configuration-manager,如果配置了则会按照配置的manager类型从这个Map中根据ResourceGroupFactory的name取出相应的factory。

最后会根据取出的factory对象create一个ResourceGroupConfigurationManager,并将其赋值给configurationManager。

在Presto的官方文档中我们看到,presto只描述了一种name为file的ResourceGroupManagerFactory,对应FileResourceGroupConfigurationManagerFactory。看来这是官方比较推荐的类型。

接下来我们看一下FileResourceGroupConfigurationManager类的match方法,如下图:

入参SelectionCriteria是从session中取得的用户信息,如下图:

从match方法可以看到他会从selectors中找到跟session中用户信息相匹配的ResourceGroupSelector,如果得到的Optional对象不存在value,则给出熟悉的异常信息Query did not match any selection rule,如果存在value则作业继续向下执行。

selectors对象是从resource-groups.config-file配置项指定的文件中解析得到的ResourceGroup配置信息。其初始化的代码是在FileResourceGroupConfigurationManager的构造函数中。

其中,config.getConfigFile方法对应配置项resource-groups.config-file:

在buildSelectors方法中可以看到selectors中添加的对象类型是StaticSelector,这样在match方法的lambda表达式s -> s.match中,s对象就是StaticSelector类型的了。

在StaticSelector的match方法中我们看到,它会根据json文件中读取到的信息与客户端信息依次做校验,如校验不通过则返回一个没有值的Optional对象,以便selectGroup方法抛出异常。如果全部校验通过,最终会封装一个SelectionContext类型的Optional对象返回。

以上就是Presto资源组校验的代码,后续将继续整理服务端响应作业提交请求的代码。

关于Presto你有什么想说?欢迎在评论区留言。

一个彩蛋

欢迎参加2018易观A10大数据应用峰会·技术专场详情请看??

2018易观A10大数据应用峰会?

www.huodongxing.com图标
推荐阅读:

相关文章