本文翻譯自 blogs.msdn.microsoft.com

前言

c#語言對於開發人員的工作效率非常友好,我很高興最近的努力使它更適合於高性能應用程序。 下面是一個例子:C#5引入了「async」方法。從用戶的角度來看,該特性非常有用,因為它有助於將幾個基於任務的操作組合成一個操作。但是這種抽象是有代價的。任務是引用類型,在創建它們的任何地方都會導致堆分配,即使在「async」方法同步完成的情況下也是如此。使用C#7,非同步方法可以返回類似於任務的類型,比如ValueTask,以減少堆分配的數量,或者在某些場景中完全避免它們。 為了理解所有這些是如何實現的,我們需要深入瞭解非同步方法是如何實現的。

歷史回顧

但首先,讓我們回顧一下歷史。 類Task和Task是在.Net 4.0中引入的,在我看來,在.Net中的非同步和並行編程領域發生了巨大的思想轉變。與以前的非同步模式(如.Net 1.0中的BeginXXX/EndXXX模式,也稱為「非同步編程模型」)或基於事件的非同步模式(如.Net 2.0中的BackgroundWorker類)不同,任務是可組合的。 任務表示一個工作單元,承諾在將來返回結果。這個承諾可以由IO-operation支持,也可以表示計算密集型的操作。沒關係。重要的是,行動的結果是自給自足的,是一等公民(笑)。您可以傳遞一個方法:您可以將它存儲在一個變數中,從一個方法返回它,或者將它傳遞給另一個方法。您可以將兩個方法合併(委託鏈)在一起形成另一個,您可以同步等待結果,或者通過在方法中添加continuation來「等待」結果。僅僅使用task實例,您就可以決定如果操作成功、出現故障或被取消該怎麼辦。

任務並行庫(TPL)改變了我們對並發性的看法,C#5語言通過引入async/await向前邁出了一步。async/await有助於組合任務,使用戶能夠使用熟悉的結構,如try/catch、using等。但是像其他抽象一樣,非同步/等待特性也有它的代價。要想知道成本是多少,你就得看看編譯器底層的東西。

非同步方法內部機制

常規方法只有一個入口點和一個出口點(它可以有多個返回語句,但在運行時,給定調用只存在一個點)。但是非同步方法和迭代器(具有返回的方法)是不同的。在非同步方法的情況下,方法調用者幾乎可以立即獲得結果(例如Task或Task),然後通過生成的任務「等待」方法的實際結果。 我們將術語「async方法」定義為使用上下文關鍵字async標記的方法。這並不一定意味著方法是非同步執行的。這也不意味著該方法是非同步的。它只意味著編譯器對方法執行一些特殊的轉換。

讓我們考慮以下非同步方法:

class StockPrices
{
private Dictionary<string, decimal> _stockPrices;
public async Task<decimal> GetStockPriceForAsync(string companyId)
{
await InitializeMapIfNeededAsync();
_stockPrices.TryGetValue(companyId, out var result);
return result;
}

private async Task InitializeMapIfNeededAsync()
{
if (_stockPrices != null)
return;

await Task.Delay(42);
// Getting the stock prices from the external source and cache in memory.
_stockPrices = new Dictionary<string, decimal> { { "MSFT", 42 } };
}
}

方法GetStockPriceForAsync確保_stockPrices字典已初始化,然後從緩存中獲取值。 為了更好地理解編譯器的功能,讓我們嘗試手工編寫一個轉換。

手寫非同步方法

class GetStockPriceForAsync_StateMachine
{
enum State { Start, Step1, }
private readonly StockPrices @this;
private readonly string _companyId;
private readonly TaskCompletionSource<decimal> _tcs;
private Task _initializeMapIfNeededTask;
private State _state = State.Start;

public GetStockPriceForAsync_StateMachine(StockPrices @this, string companyId)
{
this.@this = @this;
_companyId = companyId;
}

public void Start()
{
try
{
if (_state == State.Start)
{
// The code from the start of the method to the first await.

if (string.IsNullOrEmpty(_companyId))
throw new ArgumentNullException();

_initializeMapIfNeededTask = @this.InitializeMapIfNeeded();

// Update state and schedule continuation
_state = State.Step1;
_initializeMapIfNeededTask.ContinueWith(_ => Start());
}
else if (_state == State.Step1)
{
// Need to check the error and the cancel case first
if (_initializeMapIfNeededTask.Status == TaskStatus.Canceled)
_tcs.SetCanceled();
else if (_initializeMapIfNeededTask.Status == TaskStatus.Faulted)
_tcs.SetException(_initializeMapIfNeededTask.Exception.InnerException);
else
{
// The code between first await and the rest of the method

@this._store.TryGetValue(_companyId, out var result);
_tcs.SetResult(result);
}
}
}
catch (Exception e)
{
_tcs.SetException(e);
}
}

public Task<decimal> Task => _tcs.Task;
}

public Task<decimal> GetStockPriceForAsync(string companyId)
{
var stateMachine = new GetStockPriceForAsync_StateMachine(this, companyId);
stateMachine.Start();
return stateMachine.Task;
}

代碼冗長,但相對簡單。所有來自GetStockPriceForAsync的邏輯都被移動到GetStockPriceForAsync_StateMachine。使用「延續傳遞樣式」的Start方法。非同步轉換的一般演算法是將原始方法在await邊界處分割為多個塊。第一個塊是從方法開始到第一個await的代碼。第二個塊:從第一個await到第二個await。第三個塊:從上面的代碼到第三個塊或者直到方法結束,以此類推:

// Step 1 來自生成的非同步狀態機:

if (string.IsNullOrEmpty(_companyId)) throw new ArgumentNullException();
_initializeMapIfNeededTask = @this.InitializeMapIfNeeded();

現在,每個等待的任務都成為狀態機的一個欄位,Start方法將自己訂閱為每個任務的延續:

_state = State.Step1;
_initializeMapIfNeededTask.ContinueWith(_ => Start());

然後,當任務完成時,返回Start方法,並檢查_state欄位,以瞭解我們所處的階段。然後邏輯檢查任務`是否成功完成`、`是否被取消`或`是否成功`。在後一種情況下,狀態機向前移動並運行下一個代碼塊。當所有操作完成後,狀態機將`TaskCompletionSource<T>`實例的結果設置為完成,從`GetStockPricesForAsync`返回的結果任務將其狀態更改為完成

// The code between first await and the rest of the method

@this._stockPrices.TryGetValue(_companyId, out var result);
_tcs.SetResult(result); // The caller gets the result back

這種「實現」有幾個嚴重的缺點:

1. 大量堆分配:1個分配給狀態機,1個分配給TaskCompletionSource<T>,1個分配給TaskCompletionSource<T>中的任務,1個分配給延續委託。

2. 缺少「熱路徑優化」:如果等待的任務已經完成,就沒有理由創建延續。

3. 缺乏可擴展性:實現與基於任務的類緊密耦合,這使得不可能與其他場景一起使用,比如等待其他類型或返回Task或Task<T>之外的其他類型。

現在讓我們看一下實際的非同步機制,以瞭解如何解決這些問題。

真正的非同步機制

編譯器對非同步方法轉換所採用的總體方法與上面提到的方法非常相似。為了得到想要的行為,編譯器依賴於以下類型:

1.生成的狀態機,其作用類似於非同步方法的堆棧框架,並包含來自原始非同步方法的所有邏輯。

2.AsyncTaskMethodBuilder<T>,它保存完成的任務(非常類似於TaskCompletionSource<T>類型),並管理狀態機的狀態轉換。

3.TaskAwaiter<T>,它封裝了一個任務,並在需要時安排任務的延續。

4.調用IAsyncStateMachine的MoveNextRunner。在正確的執行上下文中使用MoveNextmethod。

`生成的狀態機是處於調試模式的類和處於發布模式的結構。`所有其他類型(MoveNextRunner類除外)都在BCL中定義為struct。

編譯器為狀態機生成一個類似於<YourMethodNameAsync>d_ 1的類型名。為了避免名稱衝突,生成的名稱包含無效的標識符字元,這些字元不能由用戶定義或引用。但是為了簡化下面所有示例,我將使用有效標識符,方法是用_替換<和>字元,並使用更容易理解的名稱。

原始的方法

原始的「非同步」方法創建一個狀態機實例,用捕獲的狀態初始化它(如果方法不是靜態的,包括這個指針),然後通過調用AsyncTaskMethodBuilder<T>開始執行。從引用傳遞的狀態機實例開始

[AsyncStateMachine(typeof(_GetStockPriceForAsync_d__1))]
public Task<decimal> GetStockPriceFor(string companyId)
{
_GetStockPriceForAsync_d__1 _GetStockPriceFor_d__;
_GetStockPriceFor_d__.__this = this;
_GetStockPriceFor_d__.companyId = companyId;
_GetStockPriceFor_d__.__builder = AsyncTaskMethodBuilder<decimal>.Create();
_GetStockPriceFor_d__.__state = -1;
var __t__builder = _GetStockPriceFor_d__.__builder;
__t__builder.Start<_GetStockPriceForAsync_d__1>(ref _GetStockPriceFor_d__);
return _GetStockPriceFor_d__.__builder.Task;
}

通過引用傳遞是一個重要的優化,因為狀態機往往是相當大的結構體(>100位元組),通過引用傳遞它可以避免冗餘的複製。

狀態機

struct _GetStockPriceForAsync_d__1 : IAsyncStateMachine
{
public StockPrices __this;
public string companyId;
public AsyncTaskMethodBuilder<decimal> __builder;
public int __state;
private TaskAwaiter __task1Awaiter;

public void MoveNext()
{
decimal result;
try
{
TaskAwaiter awaiter;
if (__state != 0)
{
// State 1 of the generated state machine:
if (string.IsNullOrEmpty(companyId))
throw new ArgumentNullException();

awaiter = __this.InitializeLocalStoreIfNeededAsync().GetAwaiter();

// Hot path optimization: if the task is completed,
// the state machine automatically moves to the next step
if (!awaiter.IsCompleted)
{
__state = 0;
__task1Awaiter = awaiter;

// The following call will eventually cause boxing of the state machine.
__builder.AwaitUnsafeOnCompleted(ref awaiter, ref this);
return;
}
}
else
{
awaiter = __task1Awaiter;
__task1Awaiter = default(TaskAwaiter);
__state = -1;
}

// GetResult returns void, but itll throw if the awaited task failed.
// This exception is catched later and changes the resulting task.
awaiter.GetResult();
__this._stocks.TryGetValue(companyId, out result);
}
catch (Exception exception)
{
// Final state: failure
__state = -2;
__builder.SetException(exception);
return;
}

// Final state: success
__state = -2;
__builder.SetResult(result);
}

void IAsyncStateMachine.SetStateMachine(IAsyncStateMachine stateMachine)
{
__builder.SetStateMachine(stateMachine);
}
}

生成的狀態機看起來很複雜,但在本質上,它非常類似於我們手工創建的狀態機。

儘管狀態機類似於手工製作的狀態機,但它有幾個非常重要的區別:

生成狀態機與我們手工製作狀態機區別

1.熱路徑優化

與我們的簡單方法不同,生成的狀態機知道等待的任務可能已經完成。

awaiter = __this.InitializeLocalStoreIfNeededAsync().GetAwaiter();

// Hot path optimization: if the task is completed,
// the state machine automatically moves to the next step
if (!awaiter.IsCompleted)
{
// Irrelevant stuff

// The following call will eventually cause boxing of the state machine.
__builder.AwaitUnsafeOnCompleted(ref awaiter, ref this);
return;
}

如果等待的任務已經完成(成功與否),狀態機將進入下一步:

// GetResult returns void, but itll throw if the awaited task failed.
// This exception is catched later and changes the resulting task.
awaiter.GetResult();
__this._stocks.TryGetValue(companyId, out result);

這意味著,如果所有等待的任務都已經完成,那麼整個狀態機將留在堆棧上。即使在今天,如果所有等待的任務已經完成或將同步完成,非同步方法的內存開銷也可能非常小。剩下的唯一分配將是任務本身!

2.錯誤處理

沒有什麼特殊的邏輯來覆蓋所等待任務的故障或取消狀態。狀態機調用一個awaiter.getresult(),如果任務被取消,它將拋出TaskCancelledException,如果任務失敗,則拋出另一個異常。這是一個很好的解決方案,因為GetResult()在錯誤處理方面與task.Wait()或task.Result稍有不同。

wait()和task。即使只有一個異常導致任務失敗,也要拋出AggregateException。原因很簡單:任務不僅可以表示通常只有一次失敗的io綁定操作,還可以表示並行計算的結果。在後一種情況下,操作可能有多個錯誤,AggregateException被設計為在一個地方攜帶所有這些錯誤。

但是async/await模式是專門為非同步操作設計的,非同步操作通常最多有一個錯誤。因此,該語言的作者決定,如果awaiter.GetResult()將「打開」AggregateException並拋出第一個失敗,則更有意義。這個設計決策並不完美,在下一篇文章中,我們將看到這個抽象什麼時候會泄漏。

非同步狀態機只表示拼圖的一部分。要了解整個情況,我們需要知道狀態機實例如何與 TaskAwaiter<T>和AsyncTaskMethodBuilder<T> 交互。

圖表看起來過於複雜,但每一塊都設計得很好,發揮著重要作用。最有趣的協作發生在一個等待的任務沒有完成的時候(圖中用棕色矩形標記):

狀態機調用_builder。`AwaitUnsafeOnCompleted`(ref awaiter, ref this);將自己註冊為任務的延續。

構建器確保任務完成時使用`IAsyncStateMachine`。調用`MoveNext`方法:

構建器捕獲當前ExecutionContext並創建MoveNextRunner實例來將其與當前狀態機實例關聯。然後它從MoveNextRunner創建一個Action實例。運行它將在捕獲的執行上下文中向前移動狀態機。

構建器調用TaskAwaiter.UnsafeOnCompleted(action),它將給定的操作調度為等待任務的延續。

當等待的任務完成時,調用給定的回調函數,狀態機運行非同步方法的下一個代碼塊。

執行上下文

有人可能會問:執行上下文是什麼,為什麼我們需要這麼複雜的東西?

在同步世界中,每個線程都將環境信息保存在線程本地存儲中。它可以是與安全相關的信息、特定於文化的數據或其他東西。當在一個線程中依次調用3個方法時,這些信息在所有方法之間自然地流動。但是對於非同步方法不再是這樣了。非同步方法的每個「部分」都可以在不同的線程中執行,這使得線程本地信息不可用。

執行上下文保存一個邏輯控制流的信息,即使它跨越多個線程。

方法類似的任務。或ThreadPool運行。QueueUserWorkItem自動執行此操作。的任務。Run方法從調用線程捕獲ExecutionContext,並將其與任務實例一起存儲。當與任務關聯的任務調度程序運行給定的委託時,它通過ExecutionContext運行委託。使用存儲的上下文運行。

我們可以使用AsyncLocal<T>來演示這個概念:

static Task ExecutionContextInAction()
{
var li = new AsyncLocal<int>();
li.Value = 42;

return Task.Run(() =>
{
// Task.Run restores the execution context
Console.WriteLine("In Task.Run: " + li.Value);
}).ContinueWith(_ =>
{
// The continuation restores the execution context as well
Console.WriteLine("In Task.ContinueWith: " + li.Value);
});
}

在這些情況下,執行上下文通過Task.Run然後Task.ContinueWith方法。如果你運行這個方法,你會看到

In Task.Run: 42

In Task.ContinueWith: 42

但並不是BCL中的所有方法都會自動捕獲並恢復執行上下文。兩個例外是TaskAwaiter<T>.UnsafeOnComplete和AsyncMethodBuilder<T>.AwaitUnsafeOnComplete。看起來很奇怪,語言作者決定使用AsyncMethodBuilder<T>和movenextr手動添加「不安全」方法來輪流執行上下文,而不是依賴於像AwaitTaskContinuation這樣的內置工具。我懷疑現有實現存在一些性能原因或其他限制。

下面是一個例子來說明兩者的區別:

static async Task ExecutionContextInAsyncMethod()
{
var li = new AsyncLocal<int>();
li.Value = 42;
await Task.Delay(42);

// The context is implicitely captured. li.Value is 42
Console.WriteLine("After first await: " + li.Value);

var tsk2 = Task.Yield();
tsk2.GetAwaiter().UnsafeOnCompleted(() =>
{
// The context is not captured: li.Value is 0
Console.WriteLine("Inside UnsafeOnCompleted: " + li.Value);
});

await tsk2;

// The context is captured: li.Value is 42
Console.WriteLine("After second await: " + li.Value);
}

輸出為

After first await: 42

Inside UnsafeOnCompleted: 0

After second await: 42

結論

非同步方法與同步方法非常不同。

編譯器為每個方法生成一個狀態機,並將原始方法的所有邏輯移到那裡。

生成的代碼針對同步場景進行了高度優化:如果所有等待的任務都完成了,那麼非同步方法的開銷就很小。

如果等待的任務沒有完成,則邏輯依賴於許多helper類型來完成任務。

推薦閱讀:

相關文章