(譯)剖析C#中的非同步方法
本文翻譯自 https://blogs.msdn.microsoft.com/seteplia/2017/11/30/dissecting-the-async-methods-in-c/
前言
c#語言對於開發人員的工作效率非常友好,我很高興最近的努力使它更適合於高性能應用程序。 下面是一個例子:C#5引入了「async」方法。從用戶的角度來看,該特性非常有用,因為它有助於將幾個基於任務的操作組合成一個操作。但是這種抽象是有代價的。任務是引用類型,在創建它們的任何地方都會導致堆分配,即使在「async」方法同步完成的情況下也是如此。使用C#7,非同步方法可以返回類似於任務的類型,比如ValueTask,以減少堆分配的數量,或者在某些場景中完全避免它們。 為了理解所有這些是如何實現的,我們需要深入瞭解非同步方法是如何實現的。
歷史回顧
但首先,讓我們回顧一下歷史。 類Task和Task是在.Net 4.0中引入的,在我看來,在.Net中的非同步和並行編程領域發生了巨大的思想轉變。與以前的非同步模式(如.Net 1.0中的BeginXXX/EndXXX模式,也稱為「非同步編程模型」)或基於事件的非同步模式(如.Net 2.0中的BackgroundWorker類)不同,任務是可組合的。 任務表示一個工作單元,承諾在將來返回結果。這個承諾可以由IO-operation支持,也可以表示計算密集型的操作。沒關係。重要的是,行動的結果是自給自足的,是一等公民(笑)。您可以傳遞一個方法:您可以將它存儲在一個變數中,從一個方法返回它,或者將它傳遞給另一個方法。您可以將兩個方法合併(委託鏈)在一起形成另一個,您可以同步等待結果,或者通過在方法中添加continuation來「等待」結果。僅僅使用task實例,您就可以決定如果操作成功、出現故障或被取消該怎麼辦。
任務並行庫(TPL)改變了我們對並發性的看法,C#5語言通過引入async/await向前邁出了一步。async/await有助於組合任務,使用戶能夠使用熟悉的結構,如try/catch、using等。但是像其他抽象一樣,非同步/等待特性也有它的代價。要想知道成本是多少,你就得看看編譯器底層的東西。
非同步方法內部機制
常規方法只有一個入口點和一個出口點(它可以有多個返回語句,但在運行時,給定調用只存在一個點)。但是非同步方法和迭代器(具有返回的方法)是不同的。在非同步方法的情況下,方法調用者幾乎可以立即獲得結果(例如Task或Task),然後通過生成的任務「等待」方法的實際結果。 我們將術語「async方法」定義為使用上下文關鍵字async標記的方法。這並不一定意味著方法是非同步執行的。這也不意味著該方法是非同步的。它只意味著編譯器對方法執行一些特殊的轉換。
讓我們考慮以下非同步方法:
class StockPrices
{
private Dictionary<string, decimal> _stockPrices;
public async Task<decimal> GetStockPriceForAsync(string companyId)
{
await InitializeMapIfNeededAsync();
_stockPrices.TryGetValue(companyId, out var result);
return result;
}
private async Task InitializeMapIfNeededAsync()
{
if (_stockPrices != null)
return;
await Task.Delay(42);
// Getting the stock prices from the external source and cache in memory.
_stockPrices = new Dictionary<string, decimal> { { "MSFT", 42 } };
}
}
方法GetStockPriceForAsync確保_stockPrices字典已初始化,然後從緩存中獲取值。 為了更好地理解編譯器的功能,讓我們嘗試手工編寫一個轉換。
手寫非同步方法
class GetStockPriceForAsync_StateMachine
{
enum State { Start, Step1, }
private readonly StockPrices @this;
private readonly string _companyId;
private readonly TaskCompletionSource<decimal> _tcs;
private Task _initializeMapIfNeededTask;
private State _state = State.Start;
public GetStockPriceForAsync_StateMachine(StockPrices @this, string companyId)
{
this.@this = @this;
_companyId = companyId;
}
public void Start()
{
try
{
if (_state == State.Start)
{
// The code from the start of the method to the first await.
if (string.IsNullOrEmpty(_companyId))
throw new ArgumentNullException();
_initializeMapIfNeededTask = @this.InitializeMapIfNeeded();
// Update state and schedule continuation
_state = State.Step1;
_initializeMapIfNeededTask.ContinueWith(_ => Start());
}
else if (_state == State.Step1)
{
// Need to check the error and the cancel case first
if (_initializeMapIfNeededTask.Status == TaskStatus.Canceled)
_tcs.SetCanceled();
else if (_initializeMapIfNeededTask.Status == TaskStatus.Faulted)
_tcs.SetException(_initializeMapIfNeededTask.Exception.InnerException);
else
{
// The code between first await and the rest of the method
@this._store.TryGetValue(_companyId, out var result);
_tcs.SetResult(result);
}
}
}
catch (Exception e)
{
_tcs.SetException(e);
}
}
public Task<decimal> Task => _tcs.Task;
}
public Task<decimal> GetStockPriceForAsync(string companyId)
{
var stateMachine = new GetStockPriceForAsync_StateMachine(this, companyId);
stateMachine.Start();
return stateMachine.Task;
}
代碼冗長,但相對簡單。所有來自GetStockPriceForAsync的邏輯都被移動到GetStockPriceForAsync_StateMachine。使用「延續傳遞樣式」的Start方法。非同步轉換的一般演算法是將原始方法在await邊界處分割為多個塊。第一個塊是從方法開始到第一個await的代碼。第二個塊:從第一個await到第二個await。第三個塊:從上面的代碼到第三個塊或者直到方法結束,以此類推:
// Step 1 來自生成的非同步狀態機:
if (string.IsNullOrEmpty(_companyId)) throw new ArgumentNullException();
_initializeMapIfNeededTask = @this.InitializeMapIfNeeded();
現在,每個等待的任務都成為狀態機的一個欄位,Start方法將自己訂閱為每個任務的延續:
_state = State.Step1;
_initializeMapIfNeededTask.ContinueWith(_ => Start());
然後,當任務完成時,返回Start方法,並檢查_state欄位,以瞭解我們所處的階段。然後邏輯檢查任務`是否成功完成`、`是否被取消`或`是否成功`。在後一種情況下,狀態機向前移動並運行下一個代碼塊。當所有操作完成後,狀態機將`TaskCompletionSource<T>`實例的結果設置為完成,從`GetStockPricesForAsync`返回的結果任務將其狀態更改為完成
// The code between first await and the rest of the method
@this._stockPrices.TryGetValue(_companyId, out var result);
_tcs.SetResult(result); // The caller gets the result back
這種「實現」有幾個嚴重的缺點:
1. 大量堆分配:1個分配給狀態機,1個分配給TaskCompletionSource<T>,1個分配給TaskCompletionSource<T>中的任務,1個分配給延續委託。
2. 缺少「熱路徑優化」:如果等待的任務已經完成,就沒有理由創建延續。
3. 缺乏可擴展性:實現與基於任務的類緊密耦合,這使得不可能與其他場景一起使用,比如等待其他類型或返回Task或Task<T>之外的其他類型。
現在讓我們看一下實際的非同步機制,以瞭解如何解決這些問題。
真正的非同步機制
編譯器對非同步方法轉換所採用的總體方法與上面提到的方法非常相似。為了得到想要的行為,編譯器依賴於以下類型:
1.生成的狀態機,其作用類似於非同步方法的堆棧框架,並包含來自原始非同步方法的所有邏輯。2.AsyncTaskMethodBuilder<T>,它保存完成的任務(非常類似於TaskCompletionSource<T>類型),並管理狀態機的狀態轉換。
3.TaskAwaiter<T>,它封裝了一個任務,並在需要時安排任務的延續。
4.調用IAsyncStateMachine的MoveNextRunner。在正確的執行上下文中使用MoveNextmethod。
`生成的狀態機是處於調試模式的類和處於發布模式的結構。`所有其他類型(MoveNextRunner類除外)都在BCL中定義為struct。
編譯器為狀態機生成一個類似於<YourMethodNameAsync>d_ 1的類型名。為了避免名稱衝突,生成的名稱包含無效的標識符字元,這些字元不能由用戶定義或引用。但是為了簡化下面所有示例,我將使用有效標識符,方法是用_替換<和>字元,並使用更容易理解的名稱。
原始的方法
原始的「非同步」方法創建一個狀態機實例,用捕獲的狀態初始化它(如果方法不是靜態的,包括這個指針),然後通過調用AsyncTaskMethodBuilder<T>開始執行。從引用傳遞的狀態機實例開始
[AsyncStateMachine(typeof(_GetStockPriceForAsync_d__1))]
public Task<decimal> GetStockPriceFor(string companyId)
{
_GetStockPriceForAsync_d__1 _GetStockPriceFor_d__;
_GetStockPriceFor_d__.__this = this;
_GetStockPriceFor_d__.companyId = companyId;
_GetStockPriceFor_d__.__builder = AsyncTaskMethodBuilder<decimal>.Create();
_GetStockPriceFor_d__.__state = -1;
var __t__builder = _GetStockPriceFor_d__.__builder;
__t__builder.Start<_GetStockPriceForAsync_d__1>(ref _GetStockPriceFor_d__);
return _GetStockPriceFor_d__.__builder.Task;
}
通過引用傳遞是一個重要的優化,因為狀態機往往是相當大的結構體(>100位元組),通過引用傳遞它可以避免冗餘的複製。
狀態機
struct _GetStockPriceForAsync_d__1 : IAsyncStateMachine
{
public StockPrices __this;
public string companyId;
public AsyncTaskMethodBuilder<decimal> __builder;
public int __state;
private TaskAwaiter __task1Awaiter;
public void MoveNext()
{
decimal result;
try
{
TaskAwaiter awaiter;
if (__state != 0)
{
// State 1 of the generated state machine:
if (string.IsNullOrEmpty(companyId))
throw new ArgumentNullException();
awaiter = __this.InitializeLocalStoreIfNeededAsync().GetAwaiter();
// Hot path optimization: if the task is completed,
// the state machine automatically moves to the next step
if (!awaiter.IsCompleted)
{
__state = 0;
__task1Awaiter = awaiter;
// The following call will eventually cause boxing of the state machine.
__builder.AwaitUnsafeOnCompleted(ref awaiter, ref this);
return;
}
}
else
{
awaiter = __task1Awaiter;
__task1Awaiter = default(TaskAwaiter);
__state = -1;
}
// GetResult returns void, but itll throw if the awaited task failed.
// This exception is catched later and changes the resulting task.
awaiter.GetResult();
__this._stocks.TryGetValue(companyId, out result);
}
catch (Exception exception)
{
// Final state: failure
__state = -2;
__builder.SetException(exception);
return;
}
// Final state: success
__state = -2;
__builder.SetResult(result);
}
void IAsyncStateMachine.SetStateMachine(IAsyncStateMachine stateMachine)
{
__builder.SetStateMachine(stateMachine);
}
}
生成的狀態機看起來很複雜,但在本質上,它非常類似於我們手工創建的狀態機。
儘管狀態機類似於手工製作的狀態機,但它有幾個非常重要的區別:
生成狀態機與我們手工製作狀態機區別
1.熱路徑優化
與我們的簡單方法不同,生成的狀態機知道等待的任務可能已經完成。
awaiter = __this.InitializeLocalStoreIfNeededAsync().GetAwaiter();
// Hot path optimization: if the task is completed,
// the state machine automatically moves to the next step
if (!awaiter.IsCompleted)
{
// Irrelevant stuff
// The following call will eventually cause boxing of the state machine.
__builder.AwaitUnsafeOnCompleted(ref awaiter, ref this);
return;
}
如果等待的任務已經完成(成功與否),狀態機將進入下一步:
// GetResult returns void, but itll throw if the awaited task failed.
// This exception is catched later and changes the resulting task.
awaiter.GetResult();
__this._stocks.TryGetValue(companyId, out result);
這意味著,如果所有等待的任務都已經完成,那麼整個狀態機將留在堆棧上。即使在今天,如果所有等待的任務已經完成或將同步完成,非同步方法的內存開銷也可能非常小。剩下的唯一分配將是任務本身!
2.錯誤處理
沒有什麼特殊的邏輯來覆蓋所等待任務的故障或取消狀態。狀態機調用一個awaiter.getresult(),如果任務被取消,它將拋出TaskCancelledException,如果任務失敗,則拋出另一個異常。這是一個很好的解決方案,因為GetResult()在錯誤處理方面與task.Wait()或task.Result稍有不同。
wait()和task。即使只有一個異常導致任務失敗,也要拋出AggregateException。原因很簡單:任務不僅可以表示通常只有一次失敗的io綁定操作,還可以表示並行計算的結果。在後一種情況下,操作可能有多個錯誤,AggregateException被設計為在一個地方攜帶所有這些錯誤。
但是async/await模式是專門為非同步操作設計的,非同步操作通常最多有一個錯誤。因此,該語言的作者決定,如果awaiter.GetResult()將「打開」AggregateException並拋出第一個失敗,則更有意義。這個設計決策並不完美,在下一篇文章中,我們將看到這個抽象什麼時候會泄漏。
非同步狀態機只表示拼圖的一部分。要了解整個情況,我們需要知道狀態機實例如何與 TaskAwaiter<T>和AsyncTaskMethodBuilder<T> 交互。