本節介紹flink的外部數據存儲的非同步I/O的API相關使用內容。

註:有關非同步I/O實用程序的設計和實現的詳細信息,請參閱提議和設計文檔FLIP-12:非同步I / O設計和實現。cwiki.apache.org/conflu

需要非同步I / O操作

當與外部系統交互時(例如,當使用存儲在資料庫中的數據來豐富流事件時),需要注意與外部系統的通信延遲不會影響流應用程序的整體工作。

直接訪問外部資料庫中的數據,例如在MapFunction中,通常意味著同步交互:向資料庫發送請求,並且MapFunction等待直到收到響應。 在許多情況下,這種等待佔據了函數的絕大部分時間。

與資料庫的非同步交互意味著單個並行函數實例可以同時處理許多請求並同時接收響應。 這樣,可以通過發送其他請求和接收響應來覆蓋等待時間。 至少,等待時間在多個請求上均攤。 這會使得大多數情況下流量吞吐量更高。

注意:通過將MapFunction擴展到非常高的並行度來提高吞吐量在某些情況下也是可能的, 但通常需要非常高的資源成本:擁有更多並行MapFunction實例意味著更多的任務,線程,Flink內部網路連接,資料庫鏈接,緩衝區和通用內部bookkeeping開銷。

先決條件

如上一節所示,對資料庫(或key/value存儲)實現適當的非同步I/O需要客戶端訪問支持非同步請求的資料庫。 許多流行的資料庫提供這樣的客戶端

在沒有這樣的客戶端的情況下,可以通過創建多個客戶端並使用線程池處理同步調用來嘗試將同步客戶端轉變為有限的並發客戶端。 但是,這種方法通常比適當的非同步客戶端效率低。

非同步I / O API

Flink的Async I/O API允許用戶將非同步請求客戶端與數據流一起使用。 API處理與數據流的集成,以及處理順序,事件時間,容錯等。

假設有一個目標資料庫的非同步客戶端,則需要三個部分來實現對資料庫的非同步I/O流轉換:

  • 調度請求的AsyncFunction的實現
  • 一個回調,它接受操作的結果並將其交給ResultFuture
  • 在DataStream上應用非同步I/O操作作為轉換

以下代碼示例說明了基本模式:

// This example implements the asynchronous request and callback with Futures that have the
// interface of Java 8s futures (which is the same one followed by Flinks Future)

/**
* An implementation of the AsyncFunction that sends requests and sets the callback.
*/
class AsyncDatabaseRequest extends RichAsyncFunction<String, Tuple2<String, String>> {

/** The database specific client that can issue concurrent requests with callbacks */
private transient DatabaseClient client;

@Override
public void open(Configuration parameters) throws Exception {
client = new DatabaseClient(host, post, credentials);
}

@Override
public void close() throws Exception {
client.close();
}

@Override
public void asyncInvoke(String key, final ResultFuture<Tuple2<String, String>> resultFuture) throws Exception {

// issue the asynchronous request, receive a future for result
final Future<String> result = client.query(key);

// set the callback to be executed once the request by the client is complete
// the callback simply forwards the result to the result future
CompletableFuture.supplyAsync(new Supplier<String>() {

@Override
public String get() {
try {
return result.get();
} catch (InterruptedException | ExecutionException e) {
// Normally handled explicitly.
return null;
}
}
}).thenAccept( (String dbResult) -> {
resultFuture.complete(Collections.singleton(new Tuple2<>(key, dbResult)));
});
}
}

// create the original stream
DataStream<String> stream = ...;

// apply the async I/O transformation
DataStream<Tuple2<String, String>> resultStream =
AsyncDataStream.unorderedWait(stream, new AsyncDatabaseRequest(), 1000, TimeUnit.MILLISECONDS, 100);

重要說明:ResultFuture在第一次調用ResultFuture.complete時完成。 隨後的所有完整調用都將被忽略。

以下兩個參數控制非同步操作:

  • 超時:超時定義非同步請求在被視為失敗之前可能需要多長時間。 此參數可防止dead/failed的請求。
  • 容量:此參數定義可能同時有多少非同步請求正在進行中。 儘管非同步I/O方法通常會帶來更好的吞吐量,但操作運算元仍然可能成為流應用程序的瓶頸。 限制並發請求的數量可確保操作運算元不會累積不斷增加的待處理請求積壓,但一旦容量耗盡就會觸發反壓。

超時處理

當非同步I/O請求超時時,默認情況下會引發異常並重新啟動作業。如果要處理超時,可以覆蓋AsyncFunction#timeout方法。

結果的順序

AsyncFunction發出的並發請求經常以某種未定義的順序完成,具體取決於首先完成的請求。為了控制發出結果記錄的順序,Flink提供了兩種模式:

  • Unordered:非同步請求完成後立即發出結果記錄。在非同步I/O運算符之後,流中記錄的順序與之前不同。當使用處理時間作為基本時間特性時,此模式具有最低延遲和最低開銷。 對此模式使用AsyncDataStream.unorderedWait(...)。
  • Ordered:在這種情況下,保留流順序。結果記錄的發出順序與觸發非同步請求的順序相同(運算符輸入記錄的順序)。 為此,操作運算元緩衝結果記錄,直到其所有先前記錄被發出(或超時)。這通常會在檢查點中引入一些額外的延遲和一些開銷,因為與無序模式相比,記錄或結果在檢查點狀態下保持更長的時間。 對此模式使用AsyncDataStream.orderedWait(...)。

事件時間

當流應用程序與事件時間一起工作時,非同步I / O操作符將正確處理watermark。這意味著兩種順序模式具體如下:

  • Unordered:watermark不會超過記錄,反之亦然,這意味著watermark建立了一個順序邊界。記錄僅在watermark之間無序發出。 只有在發出watermark後才會發出某個watermark後發生的記錄。反過來,只有在輸入的所有結果記錄發出之後才會發出watermark。

這意味著在存在watermark的情況下,無序模式會引入一些與有序模式相同的延遲和管理開銷。開銷量取決於watermark頻率。

  • Ordered:保留記錄的watermark順序,就像保留記錄之間的順序一樣。與處理時間相比,開銷沒有顯著變化。

請記住,注入時間是事件時間的一種特殊情況,其中自動生成的watermark基於源處理時間。

容錯保證

非同步I/O運算符提供完全一次的容錯保證。它將檢查點中的傳輸中非同步請求的記錄存儲起來,並在從故障中恢復時恢復/重新觸發請求。

實現技巧

對於有一個Executor(或scala的執行上下文-ExecutionContext)Future回調的實現,建議使用一個DirectExecutor,因為回調通常做最少的工作,此外DirectExecutor避免了額外的線程到線程切換的開銷。 回調通常只將結果傳遞給ResultFuture,後者將其添加到輸出緩衝區。從那裡開始,包括記錄發射和與檢查點簿記交互的重要邏輯無論如何都發生在專用線程池中。

DirectExecutor可以通過org.apache.flink.runtime.concurrent.Executors.directExecutor()或com.google.common.util.concurrent.MoreExecutors.directExecutor()獲得。

警告

AsyncFunction不能叫做多線程

想在這裡明確指出的常見混淆是AsyncFunction不是以多線程方式調用的。只存在一個AsyncFunction實例,並且為流的相應分區中的每個記錄順序調用它。 除非asyncInvoke(...)方法返回快速並依賴於回調(由客戶端),否則它將不會導致正確的非同步I/O.

例如,以下模式導致阻塞asyncInvoke(...)函數,從而使非同步行為無效:

  • 使用lookup/query方法調用資料庫客戶端會阻塞直到收到結果為止
  • 阻塞/等待非同步客戶端在asyncInvoke(...)方法中返回的future-type對象

推薦閱讀:

相关文章