作者:RickyHuo

本文轉載自公眾號「大道至簡bigdata」。

原文鏈接:優秀的數據工程師,怎麼用Spark在TiDB上做OLAP分析

TiDB 是一款定位於在線事務處理/在線分析處理的融合型資料庫產品,實現了一鍵水平伸縮,強一致性的多副本數據安全,分散式事務,實時 OLAP 等重要特性。

TiSpark 是 PingCAP 為解決用戶複雜 OLAP 需求而推出的產品。它藉助 Spark 平台,同時融合 TiKV 分散式集群的優勢。直接使用 TiSpark 完成 OLAP 操作需要了解 Spark,還需要一些開發工作。那麼,有沒有一些開箱即用的工具能幫我們更快速地使用 TiSpark 在 TiDB 上完成 OLAP 分析呢?目前開源社區上有一款工具 Waterdrop,可以基於 Spark,在 TiSpark 的基礎上快速實現 TiDB 數據讀取和 OLAP 分析。項目地址:github.com/InterestingL

使用 Waterdrop 操作 TiDB

在我們線上有這麼一個需求,從 TiDB 中讀取某一天的網站訪問數據,統計每個域名以及服務返回狀態碼的訪問次數,最後將統計結果寫入 TiDB 另外一個表中。 我們來看看 Waterdrop 是如何實現這麼一個功能的。

Waterdrop

Waterdrop 是一個非常易用,高性能,能夠應對海量數據的實時數據處理產品,它構建在 Spark 之上。Waterdrop 擁有著非常豐富的插件,支持從 TiDB、Kafka、HDFS、Kudu 中讀取數據,進行各種各樣的數據處理,然後將結果寫入 TiDB、ClickHouse、Elasticsearch 或者 Kafka 中。

準備工作

1. TiDB 表結構介紹

  • Input(存儲訪問日誌的表)

CREATE TABLE access_log (
domain VARCHAR(255),
datetime VARCHAR(63),
remote_addr VARCHAR(63),
http_ver VARCHAR(15),
body_bytes_send INT,
status INT,
request_time FLOAT,
url TEXT
)
+-----------------+--------------+------+------+---------+-------+
| Field | Type | Null | Key | Default | Extra |
+-----------------+--------------+------+------+---------+-------+
| domain | varchar(255) | YES | | NULL | |
| datetime | varchar(63) | YES | | NULL | |
| remote_addr | varchar(63) | YES | | NULL | |
| http_ver | varchar(15) | YES | | NULL | |
| body_bytes_send | int(11) | YES | | NULL | |
| status | int(11) | YES | | NULL | |
| request_time | float | YES | | NULL | |
| url | text | YES | | NULL | |
+-----------------+--------------+------+------+---------+-------+

  • Output(存儲結果數據的表)

CREATE TABLE access_collect (
date VARCHAR(23),
domain VARCHAR(63),
status INT,
hit INT
)
+--------+-------------+------+------+---------+-------+
| Field | Type | Null | Key | Default | Extra |
+--------+-------------+------+------+---------+-------+
| date | varchar(23) | YES | | NULL | |
| domain | varchar(63) | YES | | NULL | |
| status | int(11) | YES | | NULL | |
| hit | int(11) | YES | | NULL | |
+--------+-------------+------+------+---------+-------+

2. 安裝 Waterdrop

有了 TiDB 輸入和輸出表之後, 我們需要安裝 Waterdrop,安裝十分簡單,無需配置系統環境變數

1) 準備 Spark 環境

2) 安裝 Waterdrop

3) 配置 Waterdrop

以下是簡易步驟,具體安裝可以參照 Quick Start。

# 下載安裝Spark
cd /usr/local
wget https://archive.apache.org/dist/spark/spark-2.1.0/spark-2.1.0-bin-hadoop2.7.tgz
tar -xvf https://archive.apache.org/dist/spark/spark-2.1.0/spark-2.1.0-bin-hadoop2.7.tgz
wget
# 下載安裝Waterdrop
https://github.com/InterestingLab/waterdrop/releases/download/v1.2.0/waterdrop-1.2.0.zip
unzip waterdrop-1.2.0.zip
cd waterdrop-1.2.0

vim config/waterdrop-env.sh
# 指定Spark安裝路徑
SPARK_HOME=${SPARK_HOME:-/usr/local/spark-2.1.0-bin-hadoop2.7}

實現 Waterdrop 處理流程

我們僅需要編寫一個 Waterdrop 配置文件即可完成數據的讀取、處理、寫入。

Waterdrop 配置文件由四個部分組成,分別是 SparkInputFilterOutputInput 部分用於指定數據的輸入源,Filter 部分用於定義各種各樣的數據處理、聚合,Output 部分負責將處理之後的數據寫入指定的資料庫或者消息隊列。

整個處理流程為 Input -> Filter -> Output,整個流程組成了 Waterdrop 的處理流程(Pipeline)。

以下是一個具體配置,此配置來源於線上實際應用,但是為了演示有所簡化。

  • Input (TiDB)

這裡部分配置定義輸入源,如下是從 TiDB 一張表中讀取數據。

input {
tidb {
database = "nginx"
pre_sql = "select * from nginx.access_log"
table_name = "spark_nginx_input"
}
}

  • Filter

在 Filter 部分,這裡我們配置一系列的轉化, 大部分數據分析的需求,都是在 Filter 完成的。Waterdrop 提供了豐富的插件,足以滿足各種數據分析需求。這裡我們通過 SQL 插件完成數據的聚合操作。

filter {
sql {
table_name = "spark_nginx_log"
sql = "select count(*) as hit, domain, status, substring(datetime, 1, 10) as date from spark_nginx_log where substring(datetime, 1, 10)=2019-01-20 group by domain, status, substring(datetime, 1, 10)"
}
}

  • Output (TiDB)

最後, 我們將處理後的結果寫入 TiDB 另外一張表中。TiDB Output 是通過 JDBC 實現的。

output {
tidb {
url = "jdbc:mysql://127.0.0.1:4000/nginx?useUnicode=true&characterEncoding=utf8"
table = "access_collect"
user = "username"
password = "password"
save_mode = "append"
}
}

  • Spark

這一部分是 Spark 的相關配置,主要配置 Spark 執行時所需的資源大小以及其他 Spark 配置。

我們的 TiDB Input 插件是基於 TiSpark 實現的,而 TiSpark 依賴於 TiKV 集群和 Placement Driver (PD)。因此我們需要指定 PD 節點信息以及 TiSpark 相關配置spark.tispark.pd.addressesspark.sql.extensions

spark {
spark.app.name = "Waterdrop-tidb"
spark.executor.instances = 2
spark.executor.cores = 1
spark.executor.memory = "1g"
# Set for TiSpark
spark.tispark.pd.addresses = "localhost:2379"
spark.sql.extensions = "org.apache.spark.sql.TiExtensions"
}

運行 Waterdrop

我們將上述四部分配置組合成我們最終的配置文件conf/tidb.conf

spark {
spark.app.name = "Waterdrop-tidb"
spark.executor.instances = 2
spark.executor.cores = 1
spark.executor.memory = "1g"
# Set for TiSpark
spark.tispark.pd.addresses = "localhost:2379"
spark.sql.extensions = "org.apache.spark.sql.TiExtensions"
}
input {
tidb {
database = "nginx"
pre_sql = "select * from nginx.access_log"
table_name = "spark_table"
}
}
filter {
sql {
table_name = "spark_nginx_log"
sql = "select count(*) as hit, domain, status, substring(datetime, 1, 10) as date from spark_nginx_log where substring(datetime, 1, 10)=2019-01-20 group by domain, status, substring(datetime, 1, 10)"
}
}
output {
tidb {
url = "jdbc:mysql://127.0.0.1:4000/nginx?useUnicode=true&characterEncoding=utf8"
table = "access_collect"
user = "username"
password = "password"
save_mode = "append"
}
}

執行命令,指定配置文件,運行 Waterdrop ,即可實現我們的數據處理邏輯。

  • Local

./bin/start-waterdrop.sh --config config/tidb.conf --deploy-mode client --master local[2]

  • yarn-client

./bin/start-waterdrop.sh --config config/tidb.conf --deploy-mode client --master yarn

  • yarn-cluster

./bin/start-waterdrop.sh --config config/tidb.conf --deploy-mode cluster -master yarn

如果是本機測試驗證邏輯,用本地模式(Local)就可以了,一般生產環境下,都是使用yarn-client或者yarn-cluster模式。

檢查結果

mysql> select * from access_collect;
+------------+--------+--------+------+
| date | domain | status | hit |
+------------+--------+--------+------+
| 2019-01-20 | b.com | 200 | 63 |
| 2019-01-20 | a.com | 200 | 85 |
+------------+--------+--------+------+
2 rows in set (0.21 sec)

總結

在這篇文章中,我們介紹了如何使用 Waterdrop 從 TiDB 中讀取數據,做簡單的數據處理之後寫入 TiDB 另外一個表中。僅通過一個配置文件便可快速完成數據的導入,無需編寫任何代碼。

除了支持 TiDB 數據源之外,Waterdrop 同樣支持 Elasticsearch,Kafka,Kudu, ClickHouse 等數據源。

與此同時,我們正在研發一個重要功能,就是在 Waterdrop 中,利用 TiDB 的事務特性,實現從 Kafka 到 TiDB 流式數據處理,並且支持端(Kafka)到端(TiDB)的 Exactly-Once 數據一致性。

希望了解 Waterdrop 和 TiDB,ClickHouse、Elasticsearch、Kafka 結合使用的更多功能和案例,可以直接進入項目主頁:github.com/InterestingL ,或者聯繫項目負責人: Garyelephan(微信: garyelephant)、RickyHuo (微信: chodomatte1994)。


推薦閱讀:
相关文章