Presto適合PB級海量數據複雜分析,互動式SQL查詢,?持跨數據源查詢。本文就將詳細解析Client提交查詢請求到PrestoServer端響應的細節,以及還原Presto資源組校驗的實現過程。

Presto調度模塊源碼解析-Client提交請求

當客戶端以控制台Console、腳本--execute等方式提交SQL作業時,Presto的Client會將作業相關的參數和腳本封裝成一個Restful請求,提交給PrestoServer端。然後再進行後續的諸如解析執行計劃、拆分Stage、調度task到Worker端執行等操作。下面就介紹一下Client提交作業到服務端部分的源碼。

Main函數

客戶端提交作業的代碼是從Presto的main函數開始的:

在Console的run方法中,如果入參中含有--execute,會直接將值取出作為待執行的SQL語句。否則認為是通過--file指定了SQL文件,此時會通過文件IO讀取該文件中的SQL腳本。這種情況對應著通過腳本提交作業的情況,而如果--execute和--file都沒有指定,則認為是通過控制台Console的方式提交SQL。

通過腳本提交作業

通過腳本的方式會直接執行Console類的executeCommand方法。按照「;」切分出SQL語句,並依次調用Console類的process方法來提交作業。

通過控制台提交作業

這種情況相對麻煩一些,他會執行Console.runConsole方法處理客戶提交的請求。Presto為這種方式設置了一個AtomicBoolean existing變數來判斷Client是否存在,如果不存在則不再提交後續的SQL(對應在控制台中輸入了多條sql語句,並用;間隔,當前邊的語句正在執行時退出Console,此時後續的sql就不會被提交了)。

在runConsole方法中可以看到,他會有一個while循環不斷的循環處理LineReader對象讀取到的命令,LineReader繼承自jline.console.ConsoleReader,是一個專門處理控制台輸入的Java類庫(官方網址是jline.github.io/)。這個類每讀取一行輸入就會將值傳遞給一個名為buffer的StringBuilder對象,然後根據「;」和「\G」來識別一個完整的SQL,並將SQL交給process方法進行調度。最後會將剩下不完整的語句賦值給重新初始化的buffer對象作為下一條SQL的開頭。

構建請求並發送

下面我們看一下process方法,這個方法中最重要的部分是他會在try with resources中調用QueryRunner的startQuery方法,如下:

這個語法表示小括弧中創建的對象如果實現了closable介面,則無論是否出現異常,都會在try catch結束後調用其close方法。

如下是startQuery方法:

然後我們一路點進去,經過QueryRunner的startInternalQuery、StatementClientFactory的newStatementClient方法之後,我們來到了StatementClientV1的構造函數:

在buildQueryRequest方法中,會構建一個目標Rest地址為/v1/statement的請求。

隨後在JsonResponse.execute中會發起這個請求。然後我們搜索一下Rest地址/v1/statement,發現他的目標服務類為StatementResouce。

以上就是Client提交查詢請求,到PrestoServer端響應的過程。

Presto調度模塊源碼解析-服務端響應-資源組選擇(2)

當用戶提交一個SQL作業時,Presto客戶端會封裝一個Request通過Restful介面將請求發送到服務端,下面就詳細講解一下服務端的處理過程。

Client端發送請求的地址是/v1/statement,對應到StatementResource的createQuery方法。在該方法中會調用Query的static方法create,在create方法中new了一個Query對象,然後會調用SqlQueryManager的createQuery方法。

在createQuery方法中首先會創建QueryId,生成規則是:

然後presto會判斷集群是否有可用節點,其中isIncludeCoordinator變數對應config.properties配置文件中的node-scheduler.include-coordinator配置項,表示是否允許調度task到coordinator節點進行計算。

如果集群可用節點小於最小值1(參數query-manager.initialization-required-workers),則給出熟悉的報錯信息「Cluster is still initializing……」。

除此之外presto還對sql長度做了限制,要求不能超過query.max-length(默認1_000_000_000,表示10億)。

然後presto會根據提交作業的客戶端信息選擇資源組。

configurationManager的類型是AtomicReference<ResourceGroupConfigurationManager>。selectGroup方法實現如下:

然後我們點進match方法,來到了ResourceGroupConfigurationManager介面中,我們看到這個方法的實現類有如下三個:

那麼問題來了,當我們調用match方法時,執行的是這三個實現類中的哪一個呢?

我們首先看一下configurationManager初始化時的值,如下圖所示初始化時其類型為LegacyResourceGroupConfigurationManager:

然後我們搜一下configurationManager的引用,發現在InternalResourceGroupManager類的setConfigurationManager方法中修改了他的值。如下圖:

該方法在同一個類的loadConfigurationManager方法中被調用。loadConfigurationManager方法會判斷常量RESOURCE_GROUPS_CONFIGURATION對應的etc/resource-groups.properties文件是否存在,如果存在會讀取文件中配置的resource-groups.configuration-manager參數為Key值,到configurationManagerFactories中取出對應的ResourceGroupConfigurationManagerFactory對象,然後調用其create方法構造一個ResourceGroupConfigurationManager對象,最終賦值給configurationManager。方法的實現如下圖:

而loadConfigurationManager方法又在PrestoServer類的初始化方法中被調用。

PS:ResourceGroupManager的實現類型是在CoordinatorModule這個類中被注入的:

也就是說,當PrestoServer通過其main方法調用run方法進行初始化時, 會讀取etc/resource-groups.properties文件中的配置項resource-groups.configuration-manager,再以它為Key值讀取configurationManagerFactories中對應的ResourceGroupConfigurationManagerFactory,然後調用讀取出來的工廠類的create方法構建ResourceGroupConfigurationManager對象,最後賦值給InternalResourceGroupManager類的configurationManager。

另一個問題出現了,configurationManagerFactories這個Map<String,ResourceGroupConfigurationManagerFactory>類型的全局變數是在什麼時候賦值的,裡邊都有哪些值呢?

我們還是搜索一下它的引用,發現在InternalResourceGroupManager的addConfigurationManagerFactory方法中對其進行了putIfAbsent操作(不存在則put)。

搜索引用發現,在PluginManager的installPlugin方法中調用了這個方法:

然後我們看一下plugin.getResourceGroupConfigurationManagerFactories方法的定義,發現他有兩個實現類:

ResourceGroupManagerPlugin的實現如下:

H2ResourceGroupManagerPlugin的實現如下:

我們在addConfigurationManagerFactory方法中可以看到,添加到configurationManagerFactories這個Map中時,是以factory的name作為Key值,factory為Value的:

所以我們看一下這三個實現類對應的name值,也就是resource-groups.configuration-manager參數的可選值:

db:DbResourceGroupConfigurationManagerFactory

h2:H2ResourceGroupConfigurationManagerFactory

file:FileResourceGroupConfigurationManagerFactory

然後,我們回過頭來看一下PluginManager的installPlugin方法,該方法在同類的loadPlugin方法中被調用:

loadPlugin方法又在該類中再次被調用:

再往上是loadPlugins方法:

再次向上查找,原來loadPlugins方法是在PrestoServer的run方法中,先與loadConfigurationManager方法被調用的:

也就是說,Presto默認是按照LegacyResourceGroupConfigurationManager進行資源組管理的。

在PrestoServer調用run方法進行初始化時,首先會執行PluginManager的loadPlugins方法,向InternalResourceGroupManager中一個存放ResourceGroupManagerFactory類型元素的Map添加可用的資源組管理工廠類。

然後會調用InternalResourceGroupManager的loadConfigurationManager方法,判斷是否配置了參數resource-groups.configuration-manager,如果配置了則會按照配置的manager類型從這個Map中根據ResourceGroupFactory的name取出相應的factory。

最後會根據取出的factory對象create一個ResourceGroupConfigurationManager,並將其賦值給configurationManager。

在Presto的官方文檔中我們看到,presto只描述了一種name為file的ResourceGroupManagerFactory,對應FileResourceGroupConfigurationManagerFactory。看來這是官方比較推薦的類型。

接下來我們看一下FileResourceGroupConfigurationManager類的match方法,如下圖:

入參SelectionCriteria是從session中取得的用戶信息,如下圖:

從match方法可以看到他會從selectors中找到跟session中用戶信息相匹配的ResourceGroupSelector,如果得到的Optional對象不存在value,則給出熟悉的異常信息Query did not match any selection rule,如果存在value則作業繼續向下執行。

selectors對象是從resource-groups.config-file配置項指定的文件中解析得到的ResourceGroup配置信息。其初始化的代碼是在FileResourceGroupConfigurationManager的構造函數中。

其中,config.getConfigFile方法對應配置項resource-groups.config-file:

在buildSelectors方法中可以看到selectors中添加的對象類型是StaticSelector,這樣在match方法的lambda表達式s -> s.match中,s對象就是StaticSelector類型的了。

在StaticSelector的match方法中我們看到,它會根據json文件中讀取到的信息與客戶端信息依次做校驗,如校驗不通過則返回一個沒有值的Optional對象,以便selectGroup方法拋出異常。如果全部校驗通過,最終會封裝一個SelectionContext類型的Optional對象返回。

以上就是Presto資源組校驗的代碼,後續將繼續整理服務端響應作業提交請求的代碼。

關於Presto你有什麼想說?歡迎在評論區留言。

一個彩蛋

歡迎參加2018易觀A10大數據應用峰會·技術專場詳情請看??

2018易觀A10大數據應用峰會?

www.huodongxing.com圖標
推薦閱讀:

相关文章