實時流處理一般是將業務系統產生的數據進行實時收集,交由流處理框架進行數據清洗,統計,入庫,並可以通過可視化的方式對統計結果進行實時的展示。傳統的面向靜態數據表的計算引擎無法勝任流數據領域的分析和計算任務。在金融交易、物聯網、互聯網/移動互聯網等應用場景中,複雜的業務需求對大數據處理的實時性提出了更高的要求。對於這一類高實時性需求的場景,需要一個快速、高效、靈活可控的流式大數據處理平台來支撐。

DolphinDB內置的流數據框架支持流數據發布、訂閱、流數據預處理、實時內存計算、複雜指標的滾動窗口計算等,是一個運行高效,使用便捷的流數據處理框架。

與其它流數據系統相比,DolphinDB流數據處理系統的優點在於:

  • 吞吐量大,低延遲
  • 與時序資料庫及數據倉庫集成,一站式解決方案
  • 天然具備流表對偶性,支持SQL語句數據注入和查詢分析

本教程包含以下內容:

  • DolphinDB流數據框架及概念
  • 使用DolphinDB流數據
  • 使用Java API來訂閱DolphinDB流數據
  • 監控流數據運行狀態
  • 流數據性能調優
  • 與開源系統Grafana結合使用

1. DolphinDB流數據框架及概念

流數據框架對流數據的管理和應用是基於發布-訂閱-消費的模式,通過流數據表來發布數據,數據節點或者第三方的應用可以通過DolphinDB腳本或者 API來訂閱消費流數據。

上圖展示了DolphinDB的流數據處理框架。把實時數據注入到發布節點流數據表後,發布的數據可以同時供多方訂閱消費:

  • 可由數據倉庫訂閱並保存,作為分析系統與報表系統的數據源。
  • 可以由聚合引擎訂閱,進行聚合計算,並將聚合結果輸出到流數據表。聚合結果既可以由Grafana等流數據展示平台展示,也可以作為數據源再次發布出去,供二次訂閱做事件處理。
  • 也可由API訂閱,例如第三方的Java應用程序可以通過Java API訂閱流數據,應用到業務系統中。

1.1 實時流數據表

DolphinDB實時流數據表可以作為發布和訂閱流數據的載體,發布一條消息等價於往流數據表插入一條記錄,同時它也可以通過SQL來進行查詢和分析。

1.2 發布和訂閱

DolphinDB的流數據框架使用了經典的訂閱發布模式。每當有新的流數據寫入時,發布方會通知所有的訂閱方去處理新的流數據。數據節點通過subscribeTable函數來訂閱發布的流數據。

1.3 實時聚合引擎

實時聚合引擎指的是專門用於處理流數據實時計算和分析的模塊。DolphinDB提供createStreamAggregator函數用於持續地對流數據做實時聚合計算,並且將計算結果持續輸出到指定的數據表中,具體如何使用聚合引擎可以參考流數據聚合引擎教程。

2. 使用DolphinDB流數據

要開啟DolphinDB支持流數據功能的模塊,需要對DolphinDB數據節點增加配置項。

對於發布節點需要的配置項:

maxPubConnections:發布信息節點能夠連接的最大節點數。如果maxPubConnections>0,節點可以作為信息發布節點。默認值為0。
persisitenceDir:共享的流數據表保存的路徑。如果需要保存流數據表,必須指定該參數。
persistenceWorkerNum:負責以非同步模式保存流數據表的工作線程數。默認值為0。
maxPersistenceQueueDepth:非同步保存流數據表時,允許的最大消息隊列深度。
maxMsgNumPerBlock:當伺服器發布或組合消息時,消息塊中的最大記錄數。
maxPubQueueDepthPerSite:發布節點可容許的最大消息隊列深度。

對於訂閱節點需要的配置項:

subPort:訂閱線程監聽的埠號。當節點作為訂閱節點時,該參數必須指定。默認值為0。
subExecutors:訂閱節點中消息處理線程的數量。默認值為0,表示解析消息線程也處理消息。
maxSubConnections:伺服器能夠接收的最大的訂閱連接數。默認值是64。
subExecutorPooling: 表示執行流計算的線程是否處於pooling模式的布爾值。默認值是false。
maxSubQueueDepth:訂閱節點可容許的最大消息隊列深度。

2.1 流數據發布

定義一個streamTable,向其寫入數據即意味著發布流數據,由於流數據表需要被不同的會話訪問,所以要使用share,將流數據表進行共享。下面的例子中,定義並共享流數據表pubTable,向pubTable表寫入數據即意味著發布數據:

share streamTable(10000:0,`timestamp`temperature, [TIMESTAMP,DOUBLE]) as pubTable

2.2 流數據訂閱

訂閱數據通過subscribeTable函數來實現。

語法如下:

subscribeTable([server], tableName, [actionName], [offset=-1], handler, [msgAsTable=false], [batchSize=0], [throttle=1], [hash=-1])

參數說明:

  • 只有tableName和handler兩個參數是必需的。其他所有參數都是可選參數。
  • server:字元串,表示伺服器的別名或流數據所在的xdb連接伺服器。如果它沒有指定,或者為空字元串,表示伺服器是本地實例。

實際情況中,發布者與訂閱者的關係有三種可能。下面的例子解釋這三種情況的server參數如何構造:

  • 發布者與訂閱者是同一節點。

subscribeTable(, "pubTable", "actionName", 0, subTable , true)

發布者與訂閱者是同一集群內的不同節點。此處發布節點別名為「NODE2」。

subscribeTable("NODE2", "pubTable", "actionName", 0, subTable , true)

發布者與訂閱者不在同一個集群內。此處發布者節點為 (host="192.168.1.13",port=8891)。

pubNodeHandler=xdb("192.168.1.13",8891)
subscribeTable(pubNodeHandler, "pubTable", "actionName", 0, subTable , true)

  • tableName:被訂閱的數據表名。

share streamTable(10000:0,`ts`temp, [TIMESTAMP,DOUBLE]) as subTable
subscribeTable(, "pubTable", "actionName", 0, subTable , true)

  • actionName:流數據可以針對各種場景分別訂閱消費。同一份流數據,可用於實時聚合運算,同時亦可將其存儲到數據倉庫供第三方應用做批處理。subscribeTable函數提供了actionName參數以區分同一個流數據表被訂閱用於不同場景的情況。

share streamTable(10000:0,`ts`temp, [TIMESTAMP,DOUBLE]) as subTable
topic1 = subscribeTable(, "pubTable", "actionName_realtimeAnalytics", 0, subTable , true)
topic2 = subscribeTable(, "pubTable", "actionName_saveToDataWarehouse", 0, subTable , true)

subscribeTable函數的返回值是訂閱主題,它是訂閱表所在節點的別名,流數據表名稱和訂閱任務名稱(如果指定了actionName)的組合,使用下劃線分隔。如果訂閱主題已經存在,函數將會拋出異常。當前節點別名為:NODE1,上述例子返回的兩個topic內容如下:

topic1:

NODE1/pubTable/actionName_realtimeAnalytics

topic2:

NODE1/pubTable/actionName_saveToDataWarehouse

  • offset:訂閱任務開始後的第一條消息所在的位置。消息是流數據表中的行。如果沒有指定offset,或為負數,或超過了流數據表的記錄行數,訂閱將會從流數據表的當前行開始。offset與流數據表創建時的第一行對應。如果某些行因為內存限制被刪除,在決定訂閱開始的位置時,這些行仍然考慮在內。 下面的示例說明offset的作用,向pubTable寫入100行數據,建立三個訂閱,分別從102,-1,50行開始訂閱:

share streamTable(10000:0,`timestamp`temperature, [TIMESTAMP,DOUBLE]) as pubTable
share streamTable(10000:0,`ts`temp, [TIMESTAMP,DOUBLE]) as subTable1
share streamTable(10000:0,`ts`temp, [TIMESTAMP,DOUBLE]) as subTable2
share streamTable(10000:0,`ts`temp, [TIMESTAMP,DOUBLE]) as subTable3
vtimestamp = 1..100
vtemp = norm(2,0.4,100)
tableInsert(pubTable,vtimestamp,vtemp)
topic1 = subscribeTable(, "pubTable", "actionName1", 102,subTable1 , true)
topic1 = subscribeTable(, "pubTable", "actionName2", -1, subTable2 , true)
topic1 = subscribeTable(, "pubTable", "actionName3", 50,subTable3 , true)//50

從結果看到,subTable1,subTable2都沒有數據,subTable3有50條數據,說明只有當offset在從0到數據集記錄數之間才能正常起作用,否則訂閱會從當前行開始,只有當新數據進入發布表時才能訂閱到數據。

  • handler:一元函數或表。它用於處理訂閱數據。若它是函數,其唯一的參數是訂閱到的數據。訂閱數據可以是一個數據表或元組,訂閱數據表的每個列是元組的一個元素。我們經常需要把訂閱數據插入到數據表。為了方便使用,handler也可以是一個數據表,並且訂閱數據可以直接插入到該表中。 下面的示例展示handler的兩種用途,subTable1直接把訂閱數據寫入目標table,subTable2通過自定義函數myHandler將數據進行過濾後寫入。

def myhandler(msg){
t = select * from msg where temperature>0.2
if(size(t)>0)
subTable2.append!(t)
}
share streamTable(10000:0,`timestamp`temperature, [TIMESTAMP,DOUBLE]) as pubTable
share streamTable(10000:0,`ts`temp, [TIMESTAMP,DOUBLE]) as subTable1
share streamTable(10000:0,`ts`temp, [TIMESTAMP,DOUBLE]) as subTable2
topic1 = subscribeTable(, "pubTable", "actionName1", -1, subTable1 , true)
topic1 = subscribeTable(, "pubTable", "actionName2", -1, myhandler , true)

vtimestamp = 1..10
vtemp = 2.0 2.2 2.3 2.4 2.5 2.6 2.7 0.13 0.23 2.9
tableInsert(pubTable,vtimestamp,vtemp)

從結果可以看到pubTable寫入10條數據,subTable1是全部接收了,而subTable2經過myhandler過濾掉了0.13,收到9條數據。

  • msgAsTable:表示訂閱的數據是否為表的布爾值。默認值是false,表示訂閱數據是由列組成的元組。 訂閱數據格式的不同通過下面的示例展示:

def myhandler1(table){
subTable1.append!(table)
}
def myhandler2(tuple){
tableInsert(subTable2,tuple[0],tuple[1])
}
share streamTable(10000:0,`timestamp`temperature, [TIMESTAMP,DOUBLE]) as pubTable
share streamTable(10000:0,`ts`temp, [TIMESTAMP,DOUBLE]) as subTable1
share streamTable(10000:0,`ts`temp, [TIMESTAMP,DOUBLE]) as subTable2
//msgAsTable = true
topic1 = subscribeTable(, "pubTable", "actionName1", -1, myhandler1 , true)
//msgAsTable = false
topic2 = subscribeTable(, "pubTable", "actionName2", -1, myhandler2 , false)

vtimestamp = 1..10
vtemp = 2.0 2.2 2.3 2.4 2.5 2.6 2.7 0.13 0.23 2.9
tableInsert(pubTable,vtimestamp,vtemp)

  • batchSize:一個整數,表示批處理的消息的行數。如果它是正數,直到消息的數量達到batchSize時,handler才會處理進來的消息。如果它沒有指定或者是非正數,消息一進來,handler就會處理消息。 下面示例展示當batchSize設置為11時,先向pubTable寫入10條數據,觀察訂閱表,然後再寫入1條數據,再觀察數據。

//batchSize
share streamTable(10000:0,`timestamp`temperature, [TIMESTAMP,DOUBLE]) as pubTable
share streamTable(10000:0,`ts`temp, [TIMESTAMP,DOUBLE]) as subTable1
topic1 = subscribeTable(, "pubTable", "actionName1", -1, subTable1 , true, 11)
vtimestamp = 1..10
vtemp = 2.0 2.2 2.3 2.4 2.5 2.6 2.7 0.13 0.23 2.9
tableInsert(pubTable,vtimestamp,vtemp)

print size(subTable1)//0
insert into pubTable values(11,3.1)
print size(subTable1)//11

從結果可以看到,當發布數據累計到11條時,數據才進入到subTable1。

  • throttle:一個整數,表示handler處理進來的消息之前等待的時間,以秒為單位。默認值為1。如果沒有指定batchSize,throttle將不會起作用。

batchSize是用來做數據緩衝使用,有時候流數據的寫入頻率非常高,當消費能力跟不上數據進入的速度時,需要進行流量控制,否者訂閱端緩衝區很快會堆積數據並耗光所有的內存。 throttle設定一個時間,根據訂閱端的消費速度定時放一批數據進來,保障訂閱端的緩衝區數據量穩定。

  • hash:一個非負整數,指定某個訂閱線程處理進來的消息。如果沒有指定該參數,系統會自動分配一個線程。如果需要使用一個線程來處理多個訂閱任務的消息,把訂閱任務的hash設置為相同的值。當需要在兩個或多個訂閱的處理過程中保持消息數據的同步,可以將多個訂閱的hash值設置成相同,這樣就能使用同一個線程來同步處理多個數據源,不會出現數據處理有先後導致結果誤差。

2.3 取消訂閱

每一次訂閱都由一個訂閱主題topic作為唯一標識。如果訂閱時topic已存在,那麼會訂閱失敗。這時需要通過unsubscribeTable函數取消訂閱才能再次訂閱。取消訂閱示例如下:

//unsubscribe a local table
unsubscribeTable(,"pubTable","actionName1")

//unsubscribe a remote table
unsubscribeTable("NODE_1","pubTable","actionName1")

若要刪除共享的流數據表,可以使用undef函數。

undef("pubStreamTable",SHARED)

2.4 數據持久化

默認情況下,流計算的表把所有數據保存在內存中。隨著流數據持續寫入,系統可能會出現內存不足的情況。為了避免這個問題,我們可以設置流數據持久化到磁碟。如果流數據表的行數達到設定的界限值,前面一半的記錄行會從內存轉移到磁碟。持久化的數據支持重訂閱,當訂閱指定數據下標時,下標的計算包含持久化的數據。流數據持久化另一個重要的功能是流數據的備份和回復,當節點出現異常重啟時,持久化的數據會在重啟時自動載入到流數據表。

要啟動數據持久化,首先要在節點的配置文件中添加持久化路徑:

persisitenceDir = /data/streamCache

在腳本中使用enableTablePersistence函數設置針對某一個流數據表啟用持久化。 下面的示例針對pubTable表啟用持久化,其中asyn=true,compress=true, cacheSize=1000000,即當流數據表達到100萬行數據時啟用持久化,採用非同步方式壓縮保存。

對於持久化是否啟用非同步,需要對持久化數據一致性和性能之間作權衡,當流數據的一致性要求極高時,可以使用同步方式,這樣只有保證持久化做完,數據才會進入發布隊列;若對實時性要求極高,不希望磁碟IO影響到流數據的實時性,那麼可以啟用非同步方式,只有啟用非同步方式時,持久化工作線程數persistenceWorkerNum配置項才會起作用,當有多個publisher表需要持久化,增加persistenceWorkerNum可以提升非同步保存的效率。

enableTablePersistence(pubTable,true, true, 1000000)

當不需要保存在磁碟上的流數據時,通過clearTablePersistence函數可以刪除持久化數據。

clearTablePersistence(pubTable)

當整個流數據寫入結束時,可以使用disableTablePersistence命令關閉持久化。

disableTablePersistence(pubTable)

3. 使用Java API來訂閱DolphinDB流數據

當流數據進入DolphinDB並發布之後,數據的消費者可能是DolphinDB本身的聚合引擎,也可能是第三方的消息隊列或者第三方程序。所以DolphinDB提供了Streaming API供第三方程序來訂閱流數據。當有新數據進入時,這些通過API的訂閱者能夠及時的接收到通知,這使得DolphinDB的流數據框架可與第三方的應用做深入的整合。目前DolphinDB提供Java流數據API,後續會逐步支持C++、C#等流數據API。

Java API處理數據的方式有兩種:輪詢方式(Polling)和事件方式(EventHandler)。

輪詢方式示例代碼(Java):

PollingClient client = new PollingClient(subscribePort);
TopicPoller poller1 = client.subscribe(serverIP, serverPort, tableName, offset);

while (true) {
ArrayList<IMessage> msgs = poller1.poll(1000);
if (msgs.size() > 0) {
BasicInt value = msgs.get(0).getEntity(2); //取數據中第一行第二個欄位
}
}

當每次流數據表有新數據發布時,poller1會拉取到新數據,否則會阻塞在poller1.poll方法這裡等待。

事件方式示例代碼:

Java API使用預先設定的MessageHandler獲取和處理新數據。 首先需要調用者先定義數據處理器Handler,Handler需要實現com.xxdb.streaming.client.MessageHandler介面。

Handler實現示例如下:

public class MyHandler implements MessageHandler {
public void doEvent(IMessage msg) {
BasicInt qty = msg.getValue(2);
//..處理數據
}
}

在啟動訂閱時,把handler實例作為參數傳入訂閱函數。

ThreadedClient client = new ThreadedClient(subscribePort);
client.subscribe(serverIP, serverPort, tableName, new MyHandler(), offsetInt);

當每次流數據表有新數據發布時,API會調用MyHandler方法,並將新數據通過msg參數傳入。

4. 監控流數據運行狀態

當流數據通過訂閱方式進行數據的實時處理,所有的計算都在後台進行,用戶無法直觀的看到運行的情況。DolphinDB提供getStreamingStat函數,可以全方位監控流數據狀態。

getStreamingStat函數返回的是一個tuple結構,其中包含了pubConns, subConns, persistWorker, subWorkers四個表。

4.1 pubConns

pubConns表是流數據發布者狀態監控。每個發布者線程的最大隊列深度默認是1000萬。

列名稱 說明
client 發布端信息,記錄發布端IP和埠
queueDepthLimit 發布端數據緩衝區隊列最大限制
queueDepth 發布端數據緩衝區隊列深度
tables 發布的流數據表,多表通過,號分隔

查看錶內容:

getStreamingStat().pubConns

client queueDepthLimit queueDepth tables
192.168.1.61:8086 10,000,000 0 st1,st

pubConns表會列出當前所有的publisher節點,發布隊列情況,以及發布的流數據表名稱。

4.2 subConns

subConns表是流數據訂閱者鏈接狀態監控。

列名稱 說明
publisher 發布端信息,記錄發布端IP和埠
cumMsgCount 累計訂閱消息數
cumMsgLatency 累計消息延遲時間(毫秒)
lastMsgLatency 最後一次訂閱數據延遲時間(毫秒)
lastUpdate 最後一次數據更新時間

查看錶內容:

getStreamingStat().subConns

publisher cumMsgCount cumMsgLatency lastMsgLatency lastUpdate
local8081 199,980 19,799 10,990 2018.11.21T07:19:59.767945044

這張表列出所有非本地訂閱方的鏈接狀態和消息統計信息。

4.3 persistWorkers

persistWorkers 表是持久化工作線程監控。每個持久化工作線程的最大隊列深度默認是1000萬。

列名稱 說明
workerId worker編號
queueDepthLimit 隊列深度限制
queueDepth 隊列深度
tables 持久化表

只有當持久化啟用時,才能通過getStreamingStat獲取這張表,這裡記錄了所有持久化的表信息,這張表的記錄數等於persistenceWorkerNum配置數。比如持久化兩張數據表,可以通過getStreamingStat().persistWorkers查看。

當persistenceWorkerNum=1時,結果為:

getStreamingStat().persistWorkers

workerId queueDepthLimit queueDepth tables
0 10,000,000 0 st1,st

當persistenceWorkerNum=3時,結果為:

getStreamingStat().persistWorkers

workerId queueDepthLimit queueDepth tables
0 10,000,000 0 st
1 10,000,000 0 st1
2 10,000,000 0

從結果可以看出,persistenceWorkerNum為持久化數據表提供並行化能力。

4.4 subWorkers

subWorkers表是流數據訂閱者工作線程監控,這張表每條記錄代表一個訂閱工作線程。每個訂閱者線程的最大隊列深度默認是1000萬。

列名稱 說明
workerId worker編號
queueDepthLimit 訂閱端數據緩衝區隊列最大限制
queueDepth 訂閱端數據緩衝區隊列深度
processedMsgCount 已處理消息數量
failedMsgCount 處理失敗消息數量
lastErrMsg 上次失敗的消息
topics 已訂閱主題

配置項subExecutors, subExecutorPooling這兩個配置項的對流數據處理的影響,在這張表上可以得到充分的展現。在訂閱兩張流數據表st、st1時,可以通過getStreamingStat().subWorkers查看。

當subExecutorPooling=false, subExecutors=1時,結果如下:

getStreamingStat().subWorkers

workerId queueDepthLimit queueDepth processedMsgCount failedMsgCount lastErrMsg topics
0 10,000,000 0 0 0 local8081/st/st_act,local8081/st1/st1_act

此時,所有表的訂閱消息共用一個線程隊列。

當subExecutorPooling=false, subExecutors=2時,結果如下:

getStreamingStat().subWorkers

workerId queueDepthLimit queueDepth processedMsgCount failedMsgCount lastErrMsg topics
0 10,000,000 0 0 0 local8081/st/st_act
0 10,000,000 0 0 0 local8081/st/st_act

此時,各個表訂閱消息分配到兩個線程隊列獨立處理。

當subExecutorPooling=true, subExecutors=2時,結果如下:

getStreamingStat().subWorkers

workerId queueDepthLimit queueDepth processedMsgCount failedMsgCount lastErrMsg topics
0 10,000,000 0 0 0 local8081/st/st_act,local8081/st1/st1_act
0 10,000,000 0 0 0 local8081/st/st_act,local8081/st1/st1_act

此時,各個表的訂閱消息共享由兩個線程組成的線程池。

當有流數據進入時,可以通過這個表觀察到已處理數據量等信息:

getStreamingStat().subWorkers

workerId queueDepthLimit queueDepth processedMsgCount failedMsgCount lastErrMsg topics
0 10,000,000 0 100,621 0 local8081/st/st_act,local8081/st1/st1_act
0 10,000,000 0 99,359 0 local8081/st/st_act,local8081/st1/st1_act

5. 流數據性能調優

當數據流量極大而系統來不及處理時,系統監控中會看到訂閱端subWorkers表的queueDepth數值極高,此時系統會按照從訂閱端隊列-發布端隊列-數據注入端逐級反饋數據壓力。當訂閱端隊列深度達到上限時開始阻止發布端數據進入,此時發布端的隊列開始累積,當發布端的隊列深度達到上限時,系統會阻止流數據注入端寫入數據。這時可以通過以下幾種方式來調整,使得系統對流數據的處理性能達到最優化。

  • 調整訂閱參數中的batchSize和throttle參數,來組織數據的批處理和控制接收數據的流量,平衡發布端和訂閱端的緩存,讓數據處理速度和流數據輸入速度達到一個動態的平衡。batchSize可以設定等待流數據積累到一定量時才進行消費,可以充分發揮數據批量處理的性能優勢,但是這樣會帶來一定程度的內存佔用;而當batchSize比較大的時候,特殊情況下會發生數據量一直沒有達到batchSize而長期滯留在緩衝區的情況,throttle參數值是一個時間間隔,它的作用是即使batchSize未滿足,也能將緩衝區的數據消費掉。
  • 可以通過調整subExecutors配置參數,增加訂閱端計算的並行度,來加快訂閱端隊列的消費速度。系統默認採用哈希演算法為每一個訂閱分配一個executor。 在訂閱處理過程中,如果需要確保兩個訂閱用同一個executor來處理,可以在訂閱函數subscribeTable中指定參數hash的值。兩個訂閱使用相同的hash值,來指定用同一個線程來處理這兩個訂閱數據流,這樣可以保證這兩個流數據表的時序同步。當有多個executor存在時,如果不同訂閱的數據流頻率不均或者處理複雜度差異很大,容易導致低負載的executor資源閑置。通過設置subExecutorPooling=true,可以讓所有executor作為一個共享線程池,共同處理所有訂閱的消息。在這種共享池模式下,所有訂閱的消息進入同一個隊列,多個executor從隊列中讀取消息並行處理。共享線程池處理流數據的一個副作用是不能保證消息按到達的時間順序處理。當實際場景對消息處理的時間順序有嚴格要求時,不能開啟此設置。
  • 若流數據表(stream table)啟用同步持久化,那麼磁碟的IO可能會成為瓶頸。一種處理方法是參考2.4採用非同步方式持久化數據,同時設置一個合理的持久化隊列(maxPersistenceQueueDepth,默認值10000000條消息)。當然也可以通過更換硬體,提供更高寫入性能的存儲設備比如SSD硬碟來提高寫入性能。持久化路徑通過參數persistence來設置。
  • 如果數據發布端(publisher)成為系統的瓶頸,譬如訂閱的客戶端太多可能導致發布瓶頸,可以採用幾種處理辦法。首先可以通過多級級聯降低每一個發布節點的訂閱數量,對延遲不敏感的應用可以訂閱二級甚至三級的發布節點。其次可以調整部分參數來平衡延遲和吞吐量兩個指標。參數maxMsgNumBlock設置批量發送消息時批的大小,默認值是1024。一般情況下,批的值較大,吞吐量能提升,但網路延遲會增加。
  • 若輸入流數據的流量波動較大,高峰期導致消費隊列積壓至隊列峰值(默認1000萬),那麼可以修改配置項maxPubQueueDepthPerSite和maxSubQueueDepth來增加發布端和訂閱端的最大隊列深度,提高系統抵抗數據流大幅波動的能力。隊列深度增加時,內存消耗會增加,要正確估算內存的使用量,合理配置內存。

6. 流數據的展示

流數據可視化按功能可以分為兩種可視化類型:

  • 一種是實時值監控,用滑動窗口固定一個時間區間,把流數據聚合為一個值,並定時刷新,通常用於指標的監控和預警;
  • 另一種是趨勢監控,把新產生的數據附加到原有的數據上並以可視化圖表的方式漸進更新,通常用於做數據全局分析。

現有很多數據可視化的平台都能支持流數據的實時監控,比如當前流行的開源數據可視化框架Grafana, 它可以設定固定時間間隔去請求流數據表,並把數據以動態更新的數字或圖表方式展示出來。DolphinDB已經實現了Grafana的服務端和客戶端的介面,具體將配置可以參考教程Grafana如何連接DolphinDB。

推薦閱讀:

相关文章