前面一篇文章介绍了一种将非同步调用改成c++20协程的通用套路,该套路有点像一个公式,可以将任意非同步调用改成协程。但是对于协程机制进一步理解以后,随便什么非同步调用都能改成协程了,就算没有回调函数也可以,这里再举一个grpc的例子,刚好这个例子中没有使用回调函数。

grpc越来越火了,最近见到好几个项目都用了它,所以我也赶紧花时间了解一下。但是它的c++非同步介面很难用,所以如果针对它封装一套协程框架是不错的选择。c++协程库很多,所以选择也很多。我决定尝试用c++20的coroutine来封装。

于是我花了几天时间看grpc的源码,想想怎么在上面封一套协程库,但是grpc有点复杂,之前又没用过,所以花了几天还没摸清楚啥套路。后来一想,不如直接看其例子的用法,不对库作任何封装,直接在例子里面改,说不定会更简单。于是就找了helloword里面的greeter_async_client2,花了十几分钟就修改成功了。

greeter_async_client2这个例子是在主线程中发送100个字元串,从"world 0"到"word 99",发送是连续的,不等伺服器回包;伺服器针对每个请求在字元串前面加个一个"hello",所以结果是收到"hello world 0"到"hello world 99",收取是在另一个线程中收的,收到一个就列印一个结果。

这个例子是比较典型的非同步调用,但是体验很差的就是发包和收包的逻辑是分开的,改成协程以后就会变成在同一个函数中发包、收回包、发下一个包、收下一个包的逻辑,代码写起来和同步是一样的体验。怎么改呢?根据前一篇教程,需要在发包的逻辑里面找一个类,添加上await_read/await_resume/await_suspend这3个函数。我是在AsyncClientCall这个结构体中添加,添加如下代码(为了篇幅简短,我把注释删掉了):

struct AsyncClientCall {
bool await_ready() const { return false; }
Status await_resume() { return status; }
void await_suspend(std::experimental::coroutine_handle<> h)
{
handle = h;
}
void Resume()
{
handle.resume();
}
std::experimental::coroutine_handle<> handle;

HelloReply reply;
ClientContext context;
Status status;
std::unique_ptr<ClientAsyncResponseReader<HelloReply>> response_reader;
};

添加完上面的代码,就可以针对AsyncClientCall类型的对象调用co_await了。

发包的SayHello函数我也稍微作了一点点改动,返回AsyncClientCall对象的指针。

AsyncClientCall* SayHello(const std::string& user) {
HelloRequest request;
request.set_name(user);
AsyncClientCall* call = new AsyncClientCall;
call->response_reader =
stub_->PrepareAsyncSayHello(&call->context, request, &cq_);
call->response_reader->StartCall();
call->response_reader->Finish(&call->reply, &call->status, (void*)call);
return call;
}

在收发包的地方就可以改成下面的「同步」调用了,先发送一个请求,然后再同一个函数里面收回包:

Task HelloCoroutine(GreeterClient& greeter, std::string user)
{
std::cout<<"send request==>: "<<user<<std::endl;
auto* call = greeter.SayHello(user); // The actual RPC call!
Status status = co_await *call;

if (call->status.ok())
std::cout << "Greeter received: " << call->reply.message() << std::endl;
else
std::cout << "RPC failed" << std::endl;

// Once were complete, deallocate the call object.
delete call;
}

上面的Task可以参见之前的文章。其实c++20里面关于std::task已经有一个提案了,有兴趣的参见这里。clang里面曾经有一个实现,我今天去看最新代码里面又把它删掉了。其实自己实现一个最简单的也很没什么工作量,就几行代码。

单个协程的内容改好了,再将主循环的内容改成同时启动100个协程:

for (int i = 0; i < 100; i++) { // 开100个协程
std::string user("world " + std::to_string(i));
HelloCoroutine(greeter, user);
}

协程的恢复是在另一个线程中,即收包的函数中,加一个resume即可,如下:

void AsyncCompleteRpc() {
void* got_tag;
bool ok = false;

while (cq_.Next(&got_tag, &ok))
AsyncClientCall* call = static_cast<AsyncClientCall*>(got_tag);
GPR_ASSERT(ok);
call->Resume(); // 收到包了,在这里恢复协程
}
}

总之,采用c++20的协程,不需要任何额外的event_loop,就可以轻松地将难用的grpc变得很好用。完全的源码在这里,保留了所有的注释,如果用beyond compare来比较与之前非同步方式的差异,会看得更清楚。

这个例子当然也可以用重载co_await运算符的方法来实现,完整源码在这里。可见,c++20的协程使用非常方便灵活。

struct awaiter {
explicit awaiter(AsyncClientCall* call):call_(call){}
bool await_ready() const { return false; }
Status await_resume() { return call_->status; }
void await_suspend(std::experimental::coroutine_handle<> h)
{
call_->handle = h;
}

AsyncClientCall* call_;
};

awaiter operator co_await(AsyncClientCall& call)
{
return std::move(awaiter(&call));
}

grpc还有好几个其它的例子,要全部改成协程应该也很快,以后有时间了再改。改完我会上传到github同一个目录中。

推荐阅读:

相关文章