前面一篇文章介紹了一種將非同步調用改成c++20協程的通用套路,該套路有點像一個公式,可以將任意非同步調用改成協程。但是對於協程機制進一步理解以後,隨便什麼非同步調用都能改成協程了,就算沒有回調函數也可以,這裡再舉一個grpc的例子,剛好這個例子中沒有使用回調函數。

grpc越來越火了,最近見到好幾個項目都用了它,所以我也趕緊花時間瞭解一下。但是它的c++非同步介面很難用,所以如果針對它封裝一套協程框架是不錯的選擇。c++協程庫很多,所以選擇也很多。我決定嘗試用c++20的coroutine來封裝。

於是我花了幾天時間看grpc的源碼,想想怎麼在上面封一套協程庫,但是grpc有點複雜,之前又沒用過,所以花了幾天還沒摸清楚啥套路。後來一想,不如直接看其例子的用法,不對庫作任何封裝,直接在例子裡面改,說不定會更簡單。於是就找了helloword裡面的greeter_async_client2,花了十幾分鐘就修改成功了。

greeter_async_client2這個例子是在主線程中發送100個字元串,從"world 0"到"word 99",發送是連續的,不等伺服器回包;伺服器針對每個請求在字元串前面加個一個"hello",所以結果是收到"hello world 0"到"hello world 99",收取是在另一個線程中收的,收到一個就列印一個結果。

這個例子是比較典型的非同步調用,但是體驗很差的就是發包和收包的邏輯是分開的,改成協程以後就會變成在同一個函數中發包、收回包、發下一個包、收下一個包的邏輯,代碼寫起來和同步是一樣的體驗。怎麼改呢?根據前一篇教程,需要在發包的邏輯裡面找一個類,添加上await_read/await_resume/await_suspend這3個函數。我是在AsyncClientCall這個結構體中添加,添加如下代碼(為了篇幅簡短,我把注釋刪掉了):

struct AsyncClientCall {
bool await_ready() const { return false; }
Status await_resume() { return status; }
void await_suspend(std::experimental::coroutine_handle<> h)
{
handle = h;
}
void Resume()
{
handle.resume();
}
std::experimental::coroutine_handle<> handle;

HelloReply reply;
ClientContext context;
Status status;
std::unique_ptr<ClientAsyncResponseReader<HelloReply>> response_reader;
};

添加完上面的代碼,就可以針對AsyncClientCall類型的對象調用co_await了。

發包的SayHello函數我也稍微作了一點點改動,返回AsyncClientCall對象的指針。

AsyncClientCall* SayHello(const std::string& user) {
HelloRequest request;
request.set_name(user);
AsyncClientCall* call = new AsyncClientCall;
call->response_reader =
stub_->PrepareAsyncSayHello(&call->context, request, &cq_);
call->response_reader->StartCall();
call->response_reader->Finish(&call->reply, &call->status, (void*)call);
return call;
}

在收發包的地方就可以改成下面的「同步」調用了,先發送一個請求,然後再同一個函數裡面收回包:

Task HelloCoroutine(GreeterClient& greeter, std::string user)
{
std::cout<<"send request==>: "<<user<<std::endl;
auto* call = greeter.SayHello(user); // The actual RPC call!
Status status = co_await *call;

if (call->status.ok())
std::cout << "Greeter received: " << call->reply.message() << std::endl;
else
std::cout << "RPC failed" << std::endl;

// Once were complete, deallocate the call object.
delete call;
}

上面的Task可以參見之前的文章。其實c++20裡面關於std::task已經有一個提案了,有興趣的參見這裡。clang裡面曾經有一個實現,我今天去看最新代碼裡面又把它刪掉了。其實自己實現一個最簡單的也很沒什麼工作量,就幾行代碼。

單個協程的內容改好了,再將主循環的內容改成同時啟動100個協程:

for (int i = 0; i < 100; i++) { // 開100個協程
std::string user("world " + std::to_string(i));
HelloCoroutine(greeter, user);
}

協程的恢復是在另一個線程中,即收包的函數中,加一個resume即可,如下:

void AsyncCompleteRpc() {
void* got_tag;
bool ok = false;

while (cq_.Next(&got_tag, &ok))
AsyncClientCall* call = static_cast<AsyncClientCall*>(got_tag);
GPR_ASSERT(ok);
call->Resume(); // 收到包了,在這裡恢復協程
}
}

總之,採用c++20的協程,不需要任何額外的event_loop,就可以輕鬆地將難用的grpc變得很好用。完全的源碼在這裡,保留了所有的注釋,如果用beyond compare來比較與之前非同步方式的差異,會看得更清楚。

這個例子當然也可以用重載co_await運算符的方法來實現,完整源碼在這裡。可見,c++20的協程使用非常方便靈活。

struct awaiter {
explicit awaiter(AsyncClientCall* call):call_(call){}
bool await_ready() const { return false; }
Status await_resume() { return call_->status; }
void await_suspend(std::experimental::coroutine_handle<> h)
{
call_->handle = h;
}

AsyncClientCall* call_;
};

awaiter operator co_await(AsyncClientCall& call)
{
return std::move(awaiter(&call));
}

grpc還有好幾個其它的例子,要全部改成協程應該也很快,以後有時間了再改。改完我會上傳到github同一個目錄中。

推薦閱讀:

相關文章