在上一篇文章中我們通過 findBest 已經找到了最 best 的 rel,雖然之前的 rel 已經滿足的 enumerable 等 convention,但要讓 rel 實際能執行還需要一步 implement 的過程,這部分邏輯實際已經不是 sql 優化,而是轉換到執行器的過程,本文代碼從這裡開始, 還是接著上一篇的 demo sql:

select * from EMPS where name = John

和當前要實現的 rel 樹是這樣:

EnumerableInterpreter: rowcount = 100.0, cumulative cost = {50.0 rows, 50.0 cpu, 0.0 io}, id = 33
BindableTableScan(table=[[SALES, EMPS]], filters=[[=($1, John)]]): rowcount = 100.0, cumulative cost = {0.5 rows, 0.505 cpu, 0.0 io}, id = 20

implement 過程最主要用的目的是生成 bindable, Bindable 可以理解為一個 prepare 好的 plan 執行的時候直接 bind 一個當前 context 就可以拿到一個 Enumerable 然後通過推懂 Enumerable 就可以執行(這個就是大家熟悉的 Volcano 執行器模型這裡不多解釋)。

這裡我們假設 requireTrait 的 convention 還是 enumerable, 這樣 implement 主要邏輯是調用 EnumerableInterpretable.toBindable。

RelImplementor

要想 implement 那就先來一隻 RelImplementor, 因為我們這裡希望的 require trait 是 enumerable, 所以這裡我們將用的是 EnumerableRelImplementor(其他的 repo 里還有 spark相關的 RelImplementor 不過這裡先不管), 這個 relImplementor 是一個 JavaRelImplementor 所以他的實現在調用 implementRoot 後會返回一個 ClassDeclaration, 這是個一個 linq4j 中的一個 Expression Tree 元素, 之後 expression tree 會轉為 java 文本代碼並進行編譯載入例從而獲取 Binable 實現,所以 implementRoot 就是一個產生實現 Bindable 的 Expression Tree 的過程。

對於 EnumerableRElImplemtor 生成的過程是從 root rel 向下調用 implement 方法(EnumerableConvention 的 trait 都有實現 EnumerableRel介面)返回一個包裝了 expression tree, 返回行類型和結果類型的 Result 對象。

LINQ Expression Tree

LINQ 是 .Net 3.5 引入的大家都非常喜歡的特性, 除了明面上看到的各種 LINQ to XX,外 Expression Tree 也是 LINQ 一個非常基礎和重要組件, 對於 .NET 的 Expression Tree 可以看下微軟官方文檔 或 R大的文章。calcite 作者自己 port 了 LINQ 到 java 叫 linq4j,並且也 port 了 Expression Tree 的多數功能所以 calcite 在需要生成 java 代碼的場景都會使用 linq4j 的 expression tree 來進行代碼的生成, 相比 spark 等直接拼字元串模板看上去更 Declarative 一些- -?

這裡我們就以 demo sql 的 best rel 的 root EnumerableInterpreter 來看看, 這段代碼會生成

{
final org.apache.calcite.interpreter.Interpreter interpreter = new org.apache.calcite.interpreter.Interpreter(
root,
v1stashed);
return interpreter;
}

等價的 BlockStatement, Expression Tree 在構建過程中會也會進行一些 optimize, 在返回到 RelEnumerableImplementor 後會幫 rel 生成過程中 stashed 過的變數生成從 root 取出代碼變數的邏輯, 並生成用於實現 Bindable 介面的兩個方法,生成的類的名字都加 Baz, 最後用 Expressions.toString 轉換出類的成員代碼, 結果類似這樣

public org.apache.calcite.linq4j.Enumerable bind(final org.apache.calcite.DataContext root) {
final org.apache.calcite.rel.RelNode v1stashed = (org.apache.calcite.rel.RelNode) root.get("v1stashed");
final org.apache.calcite.interpreter.Interpreter interpreter = new org.apache.calcite.interpreter.Interpreter(
root,
v1stashed);
return interpreter;
}

public Class getElementType() {
return java.lang.Object[].class;
}

Get Bindable & Execute

這裡先不考慮 calcite 對接 spark, 接下來的處理就將剛才用 Expression Tree 生成的 member 代碼片段,編譯、載入和實例了, calcite 這裡用了一個叫 janio 的庫(基於javax.tools.JavaCompiler)來執行這個過程, 代碼位於這裡, 除了 janio 的調用, 如果生成代碼里沒有 static 欄位,這裡還會用 guava 做一個 bindable 的 cache。

之後通過 AvaticaResultSet -> AvcaticaConnection -> CalcitePrepare#enumerable , 最終調用 Bindable 的 bind 方法調剛生成代碼, 最後創建 Interpreter,之前提到過 interpreter 是會解釋執行下面的 rel, 會發現在創建的時候會用 HepPlanner 對下面的 rel 過幾個規則, 並用 Node.Compiler 對 rel compile 出可以解釋執行的 Node, 對於我們這個 demo 會生成 TableScanNode 並支持從 CsvFilterable 獲取 enumerator.

稍微複雜 SQL 的 Code Gen 例子

上面 demo SQL 比較簡單(被優化之後 project消除了, filter 也被推到 tableScan,並且也是 interpreter 執行所以沒啥), 所以我們本節來看一個稍微複雜點的 sql, 看看 Code Gen 的效果和為什麼要 Code Gen:

首先, 我們將前面例子中使用的 CsvFilterableTableScan 替換為 CsvTranslatableTable, 在Csv Example 使用文章中我們提到過 CsvTranslatableTable 能被轉換為實現了 EnumerableRel 的 CsvTableScan, 這樣使得 tableScan 不像前面例子那樣解釋執行而是預先生成代碼。

也講 sql 替換成這個:

select emps.empno, depts.name
from emps join depts on emps.name = depts.name
where depts.deptno = 2;

有 join 有 filter 有 project, 開始優化之前的 rel 是這樣(Translate 的 toRel 是在 findBest 之前就調用了的)。

LogicalProject(EMPNO=[$0], NAME=[$3]): rowcount = 225.0, cumulative cost = {2250.0 rows, 2352.0 cpu, 0.0 io}, id = 155
LogicalFilter(condition=[=($2, 2)]): rowcount = 225.0, cumulative cost = {2025.0 rows, 1902.0 cpu, 0.0 io}, id = 154
LogicalJoin(condition=[=($1, $3)], joinType=[inner]): rowcount = 1500.0, cumulative cost = {1800.0 rows, 402.0 cpu, 0.0 io}, id = 153
LogicalProject(EMPNO=[$0], NAME=[$1]): rowcount = 100.0, cumulative cost = {200.0 rows, 301.0 cpu, 0.0 io}, id = 152
CsvTableScan(table=[[SALES, EMPS]], fields=[[0, 1, 2, 3, 4, 5, 6, 7, 8, 9]]): rowcount = 100.0, cumulative cost = {100.0 rows, 101.0 cpu, 0.0 io}, id = 129
CsvTableScan(table=[[SALES, DEPTS]], fields=[[0, 1]]): rowcount = 100.0, cumulative cost = {100.0 rows, 101.0 cpu, 0.0 io}, id = 130

完成優化後的 cheapest rels 是這樣:

EnumerableProject(EMPNO=[$2], NAME=[$1]): rowcount = 225.0, cumulative cost = {739.9540863498665 rows, 684.6666666666666 cpu, 0.0 io}, id = 472
EnumerableJoin(condition=[=($1, $3)], joinType=[inner]): rowcount = 225.0, cumulative cost = {514.9540863498665 rows, 234.66666666666666 cpu, 0.0 io}, id = 471
EnumerableFilter(condition=[=($0, 2)]): rowcount = 15.0, cumulative cost = {115.0 rows, 201.0 cpu, 0.0 io}, id = 470
CsvTableScan(table=[[SALES, DEPTS]], fields=[[0, 1]]): rowcount = 100.0, cumulative cost = {100.0 rows, 101.0 cpu, 0.0 io}, id = 1
CsvTableScan(table=[[SALES, EMPS]], fields=[[0, 1]]): rowcount = 100.0, cumulative cost = {33.33333333333333 rows, 33.666666666666664 cpu, 0.0 io}, id = 242

沒有解釋執行 Rel 都是 Enumerable 正是我們希望的情況。

  • 2 個 CsvTableScan: 會分別生成 empsdepts 的 getTable 調用並調用 CsvTableScan#project 來只獲取 0, 1 列的數據(project已經被下推到 tablescan 減少返回寬) 的 exp tree
  • EnumerableFilter: 這個運算元其實會在完成 volcano 之後被最後一個 program 會將 project 和 filter 轉換為同時可以 filter + project 的 cal 運算元, 所以到 implement 這裡實際看到的是 EnumerableCal, 對於 filter deptno=2 會在 tableScan expression tree 基礎上生成生成一個 AbstractEnumerable 的 Exp Tree, 主要在 moveNext 方法實現上對 condition 檢查
  • EnumerableJoin: Calcite 中還有其他幾個 join 實現這裡優化後選擇的是 enumerableJoin(後面應該會單獨來一篇介紹下 calcite / linq4j 中的 join 優化和執行), 他的 implement 就是生成將兩個 input enumerable 調用 EnumerableDefaults#join_ 的調用 exp tree, 他的 linq4j 實現是一個經典的 hashjoin 實現, 將 inner 構建 Lookup(也就是 value 是 list 的 hashmap), 然後用 outer 去 probes,不過生成代碼部分只是生成了對 linq4j 的調用。
  • EnumerableProject: 最上面的 project 也會被後置 hep 轉換為 EnumerableCalc, 這裡同樣再在 join 結果的 Enumerable 的基礎上再生成一個段再套一層 AbstractEnumerator 來完成 project 結果兩列的操作。

最後我們得到的 java 代碼是這樣:

final org.apache.calcite.linq4j.Enumerable _inputEnumerable = ((org.apache.calcite.adapter.csv.CsvTranslatableTable) root.getRootSchema().getSubSchema("SALES").getTable("DEPTS")).project(root, new int[] {0,1});
final org.apache.calcite.linq4j.AbstractEnumerable left = new org.apache.calcite.linq4j.AbstractEnumerable(){
public org.apache.calcite.linq4j.Enumerator enumerator() {
return new org.apache.calcite.linq4j.Enumerator(){
public final org.apache.calcite.linq4j.Enumerator inputEnumerator = _inputEnumerable.enumerator();
public void reset() {
inputEnumerator.reset();
}

public boolean moveNext() {
while (inputEnumerator.moveNext()) {
final Integer inp0_ = (Integer) ((Object[]) inputEnumerator.current())[0];
if (inp0_ != null && inp0_.intValue() == 2) {
return true;
}
}
return false;
}

public void close() {
inputEnumerator.close();
}

public Object current() {
final Object[] current = (Object[]) inputEnumerator.current();
return new Object[] {
current[0],
current[1]};
}
};
}

};
final org.apache.calcite.linq4j.Enumerable _inputEnumerable0 = left.join(
((org.apache.calcite.adapter.csv.CsvTranslatableTable) root.getRootSchema().getSubSchema("SALES").getTable("EMPS")).project(root, new int[] {0,1}), new org.apache.calcite.linq4j.function.Function1() {
public String apply(Object[] v1) {
return v1[1] == null ? (String) null : v1[1].toString();
}
public Object apply(Object v1) {
return apply(
(Object[]) v1);
}
}
, new org.apache.calcite.linq4j.function.Function1() {
public String apply(Object[] v1) {
return v1[1] == null ? (String) null : v1[1].toString();
}
public Object apply(Object v1) {
return apply(
(Object[]) v1);
}
}
, new org.apache.calcite.linq4j.function.Function2() {
public Object[] apply(Object[] left, Object[] right) {
return new Object[] {
left[0],
left[1],
right[0],
right[1]};
}
public Object[] apply(Object left, Object right) {
return apply(
(Object[]) left,
(Object[]) right);
}
}
, null, false, false);
return new org.apache.calcite.linq4j.AbstractEnumerable(){
public org.apache.calcite.linq4j.Enumerator enumerator() {
return new org.apache.calcite.linq4j.Enumerator(){
public final org.apache.calcite.linq4j.Enumerator inputEnumerator = _inputEnumerable0.enumerator();
public void reset() {
inputEnumerator.reset();
}

public boolean moveNext() {
return inputEnumerator.moveNext();
}

public void close() {
inputEnumerator.close();
}

public Object current() {
final Object[] current = (Object[]) inputEnumerator.current();
return new Object[] {current[2], current[1]};
}
};
}

};

可以對照上上面的分析過程, 所以我們看到 calcite 自帶的 EnumerableRel 通過 Code Gen 生成了運算元和運算元鏈接代碼, 但並沒有像 hyper 論文 或 spark (whole stage) 那樣 collapse 多個運算元間的調用, 還是通過多級嵌套 enumerable, 執行時還是會有 moveNext 和 current 調用, 當然相比解釋執行 Enumerable 還是少了很多 visit 虛函數調用和判斷,之後就只能靠 janio 和 JVM JIT 看有沒有概率能被優化了? 不過在 mailist 中也有個討論好像 drill 有類似 spark 的 CodeGen 實現? 後面有空看看- -~

表達式的 Code Gen

雖然在運算元間目前 calcite 的 code gen 沒做 collapse,我們可以來看下有表達式的情況~ 這樣一個 SQL

select ((deptno *3) + abs(age))/4 from emps

find best 後 projection 運算元同樣會被轉換成 enumerableCal:

rel#21:EnumerableCalc.ENUMERABLE.[](input=HepRelVertex#18,expr#0..9={inputs},expr#10=3,expr#11=*($t2, $t10),expr#12=ABS($t6),expr#13=+($t11, $t12),expr#14=4,expr#15=/($t13, $t14),EXPR$0=$t15)

生成的代碼是主要是 cal 對應的 enumerable 的 current 方法, 執行邏輯位於這裡 RexToLixTranslator, 也是對 rex 樹遍歷, 然後轉換 expression 的過程, 最後和之前運算元的 expression tree 放一塊和 Project 運算元一塊去編譯,java 代碼結果是這樣:

public Object current() {
final Object[] current = (Object[]) inputEnumerator.current();
final Integer inp2_ = (Integer) current[2];
final Integer inp6_ = (Integer) current[6];
return inp2_ == null || inp6_ == null ? (Integer) null : Integer.valueOf((inp2_.intValue() * 3 + org.apache.calcite.runtime.SqlFunctions.abs(inp6_.intValue())) / 4);
}

通過對表達式生成 java 代碼並依靠編譯器編譯,會比那些沒有 compiler as service 且難以動態載入的語言中對表達式的解釋實現好(因為少了太多的虛函數調用類型判斷和複雜實現)。

小結

這篇比較"簡短", 沒有花太多精力去深入 LINQ4J 和 janio 的實現和優化細節,因為自己目前用的語言連泛型都沒有(攤手),所以先只看使用不看源碼了,不過如果後面有機會用 java,並有生成 java 代碼並編譯的需求應該會想起他倆(其實自己很喜歡 .Net 逃~); 此外看的過程中還有個疑問為什麼作者選擇 Expression Tree 生成 java 代碼再編譯而不是 Expression Tree 後用 cglib 或 bytebuddy 之類的生成位元組碼的方案(drill 貌似可選 bytecode 或 plain code) - -?

到這裡基本過了一遍從 SQL 進入 avcatica 到最終獲取能讀取底層 csv 內容的 enumerator 的流程,中途跳了一些細節比如子查詢處理等,demo sql也非常簡單沒有太展現出 calcite rule 實力,後面會繼續學習補充很多細節~


推薦閱讀:
相关文章