引言

熟悉Presto的人都知道,Presto有一套非常丰富的Connector spi,可以非常灵活自由地让开发人员去接入各种存储、数据源。在这些Connector spi中,有一个介面可能是大家所陌生,但是却非常实用。那就是本文今天将要介绍的SystemTable。

SystemTable

在Connector中,我们看到有一个介面叫getSystemTables。开发人员可以通过该介面对connector的SystemTable进行扩展。

Connector.java:
/**
* @return the set of system tables provided by this connector
*/
default Set<SystemTable> getSystemTables()
{
return emptySet();
}

SystemTable的定义

首先,我们先来看一下SystemTable的定义。一个SystemTable要包含3个要素: 1. 表的数据分布(,ALL_COORDINATORS,) SINGLE_COORDINATOR: 仅在接收当前查询的coordinator中被执行,适用于查询connector的元数据信息。 ALL_COORDINATORS: SystemTable对应的Split将被下发至所有coordinator执行。适用于收集coordinator节点的查询信息,例如: 系统当前执行的query、QPS等等 ALL_NODES: SystemTable对应的Split将被下发至coordinator、worker执行。适用于收集所有节点的runtime信息,例如: 内存使用、cpu使用等等。 2.表的schema定义 与普通表一样,通过ConnectorTableMetadata,定义了该表有那些列,列的类型,以及表的属性等等 3.系统表的数据载入 与普通标一样,系统表的数据载入,也提供了cursor、pageSource两种模式。可以将数据按行返回,也可以将数据以page为粒度进行返回。

SystemTable.java:
public interface SystemTable
{
enum Distribution
{
ALL_NODES, ALL_COORDINATORS, SINGLE_COORDINATOR
}

Distribution getDistribution();

ConnectorTableMetadata getTableMetadata();

/**
* Create a cursor for the data in this table.
*
* @param session the session to use for creating the data
* @param constraint the constraints for the table columns (indexed from 0)
*/
default RecordCursor cursor(ConnectorTransactionHandle transactionHandle, ConnectorSession session, TupleDomain<Integer> constraint)
{
...
}

/**
* Create a page source for the data in this table.
*
* @param session the session to use for creating the data
* @param constraint the constraints for the table columns (indexed from 0)
*/
default ConnectorPageSource pageSource(ConnectorTransactionHandle transactionHandle, ConnectorSession session, TupleDomain<Integer> constraint)
{
...
}
}

系统内嵌的SystemTable的执行

通过Presto-client执行show schemas from system,我们可以看到系统system connector中,自带了4个系统库,其中runtime库内置了nodes、queries、tasks、transactions4张表。

presto:runtime> show schemas from system;
show schemas from system;
Schema
--------------------
information_schema
jdbc
metadata
runtime
(4 rows)

presto:runtime> show tables from system.runtime;
show tables from system.runtime;
Table
--------------
nodes
queries
tasks
transactions
(4 rows)

我们进一步探究下, nodes表中,包含了哪些内容?

presto:runtime> select * from nodes;
select * from nodes;
node_id | http_uri | node_version | coordinator | state
--------------------------------------+-------------------------+--------------+-------------+--------
ffffffff-ffff-ffff-ffff-ffffffffffff | http://127.0.0.1:8080 | testversion | true | active
(1 row)

不难发现,nodes表里面,包含的是当前系统运行的节点相关的信息,包括node_id,版本,http_uri等。 select * from nodes 查询的执行Plan,如下图所示。 我们可以看到,该查询的plan与普通查询的plan基本没啥区别,只是TableScan Operator是从System Connector的PageSource中载入数据。

实际上,SystemTable的执行与普通表的执行,在运行态是没有区别的。运算元、函数、调度等等,都是复用的,另外SystemTable与普遍表之间,可以发生计算。 我们将system.runtime.nodes表与tpch.sf1.nation表进行union all,其中都适用了lower、upper两个字元函数。

presto:runtime> select lower(node_id), upper(node_id) from nodes union all (select lower(name), upper(name) from tpch.sf1.nation limit 1 );
select lower(node_id), upper(node_id) from nodes union all (select lower(name), upper(name) from tpch.sf1.nation limit 1 );
_col0 | _col1
--------------------------------------+--------------------------------------
ffffffff-ffff-ffff-ffff-ffffffffffff | FFFFFFFF-FFFF-FFFF-FFFF-FFFFFFFFFFFF
algeria | ALGERIA
(2 rows)

在tpch connector中扩展SystemTable

自定义SystemTable

首先我们再tpch connector中,定义一张SystemTable,表名为metrics,schema为tpch_system_tables。包含3个栏位,queryId、type、runtime。

public class TpchMetricsTable
implements SystemTable
{
@Override
public Distribution getDistribution()
{
return Distribution.SINGLE_COORDINATOR;
}

@Override
public ConnectorTableMetadata getTableMetadata()
{
SchemaTableName schemaTableName = new SchemaTableName("tpch_system_tables", "metrics");
ColumnMetadata queryId = new ColumnMetadata("queryId", VarcharType.VARCHAR);
ColumnMetadata type = new ColumnMetadata("type", VarcharType.VARCHAR);
ColumnMetadata runtime = new ColumnMetadata("runtime", BigintType.BIGINT);

return new ConnectorTableMetadata(schemaTableName, ImmutableList.of(queryId, type, runtime));
}
}

注册SystemTable

通过getSystemTables介面,将定义的TpchMetricsTable通过Connector注册到Presto中。

TpchConnectorFactory.java:
@Override
public Set<SystemTable> getSystemTables()
{
return ImmutableSet.of(new TpchMetricsTable());
}

载入SystemTable的数据

TpchMetricsTable中,我们再重载cursor方法,返回metrics表的数据。

@Override
public RecordCursor cursor(ConnectorTransactionHandle transactionHandle, ConnectorSession session, TupleDomain<Integer> constraint)
{
return new InMemoryRecordSet.InMemoryRecordCursor(
ImmutableList.of(VarcharType.VARCHAR, VarcharType.VARCHAR, BigintType.BIGINT),
ImmutableList.of(
ImmutableList.of("query_ddl_1", "ddl", 13),
ImmutableList.of("qeury_select_2", "query", 1203)
).iterator()
);
}

到这里,扩展的SystemTable接入Presto就全部完成了。我们已经可以通过tpch connector访问到我们扩展的metrics表了。

presto:runtime> show schemas from tpch;
show schemas from tpch;
Schema
--------------------
information_schema
sf1
sf100
sf1000
sf10000
sf100000
sf300
sf3000
sf30000
tiny
tpch_system_tables

presto:runtime> select * from tpch.tpch_system_tables.metrics;
select * from tpch.tpch_system_tables.metrics;
queryid | type | runtime
----------------+-------+---------
query_ddl_1 | ddl | 13
qeury_select_2 | query | 1203
(2 rows)

SystemTable应用

在实际应用中,我们可以扩展SystemTable,很方便的将接入Presto的存储相关的元信息,通过SystemTable展现。也可以将系统内部的查询状态,系统的节点负载,查询进度等等信息,以表的形式进行展现。

结语

Presto作为计算引擎,可以适配任意的存储,系统,甚至是http服务,这得益于其简单、灵活、易用的spi。通过本文大家可以了解到,通过spi扩展SystemTable仅需要简单的几个步骤就可以完成。

推荐阅读:

相关文章