Vert.x的EventBus是一個簡單易用的消息匯流排,和Verticle配合使用更是錦上添花,但是一個channel上只能接受單個類型的消息,如果一個需要讓一個Verticle處理多種類型的消息則需要大量的手工重複勞動,並且容易出錯,所以我在Vert.x/Kotlin的基礎上用EventBus實現了一個極簡的RPC框架。

選擇Kotlin的原因無他,就因為糖多,而且Kotlin對於coroutine的支持十分完善,對於RPC這種天生非同步的功能更是絕配,Scala在糖多這點上比起Kotlin有過之而無不及,但是Coroutine的支持還不完善,從早年間的shift/reset到現在的Scala coroutine,並沒有一個真正達到了可用的程度,而且Vert.x也沒有支持。另外Scala和Java的interop一直是巨大的PITA,寫純Scala還好,一旦遇到要使用Java包的場景總會讓人噁心一陣子。

回到正題,一個RPC庫,如果拋開management/discovery/governance之類的上層建築,剩下的功能無非就是序列化/反序列化、請求傳輸、以及服務端的分派,序列化反序列化有無數現成的庫所以不是重點,我們重點看一下客戶端的ServiceProxy以及服務端的Dynamic Dispatch。

作為輕量級的RPC框架,要求用戶寫IDL之類的東西是會被人打的,所以我們會通過Kotlin/Java提供的反射功能動態的處理所有這些雜事。

讓我們先定義請求/響應的包,這玩意兒其實簡單到沒啥好說:

data class RpcRequest(val service: String = "",
val method: String = "",
val args: Array<out Any?> = arrayOf())

data class RpcResponse(val response: Any? = null)

先來看看服務端如何處理一個RpcRequest並返回一個RpcResponse。

suspend fun processRequest(request: RpcRequest): RpcResponse {
val ret = impl::class.members.first {
// TODO: Check signature to support overloading
it.name == request.method
}.callSuspend(impl, *(request.args))
return RpcResponse(ret)

這部分其實挺簡單的,Kotlin提供了足夠多的反射功能,並且支持suspend,所以我們就需要在服務實體中找到對應的method並且把參數都傳給它即可,然後拿到返回值並打包成RpcResponse。

服務端的工作還剩一點,就是如何host這些service,最簡單的辦法就是創建一個CoroutineVerticle,然後接受message並分派到指定的service,代碼寫出來大概就是這個樣子,其中services這個成員就是個map:

for (msg in vertx.eventBus().consumer<ByteArray>(channel).toChannel(vertx)) {
// Start a new coroutine to handle the incoming request to support recursive call
launch(vertx.dispatcher()) {
try {
with(msg.body().toRpcRequest()) {
msg.reply(services[service]?.processRequest(this)?.toBytes()
?: throw NoSuchElementException("Service $service not found"))
}
} catch (e: Throwable) {
msg.fail(1, e.message)
}
}
}

然後來看客戶端如何把一個請求打包成RpcRequest並處理服務端的RpcReponse。

在實現這一部分之前,我們先要規範一下客戶端。

一般而言,對於一個RPC service,無論服務端如何實現,客戶端需要有一個明確的定義比如IDL或者interface之類,為了簡單我們這裡選擇interface。另外由於RPC的天性,所有的調用都是非同步的,所以一個Service interface寫出來大概是這個樣子,每個method都是suspend:

interface HelloSvc {
suspend fun hello(name: String): String
}

有了這個interface,我們只需要為它動態創建一個Invocation proxy,然後在裡面塞進我們自己的代碼就好了:

inline fun <reified T : Any> getServiceProxy(vertx: Vertx, channel: String, name: String) =
Proxy.newProxyInstance(T::class.java.classLoader, arrayOf(T::class.java)) { _, method, args: Array<Any?> ->
val lastArg = args.lastOrNull()
if (lastArg is Continuation<*>) {
// The last argument of a suspend function is the Continuation object
@Suppress("UNCHECKED_CAST") val cont = lastArg as Continuation<Any?>
val argsButLast = args.take(args.size - 1)
// Send request to the given channel on the event bus
vertx.eventBus().send(channel, RpcRequest(name, method.name, argsButLast.toTypedArray()).toBytes(),
Handler<AsyncResult<Message<ByteArray>>> { event ->
// Resume the suspended coroutine on reply
if (event?.succeeded() == true) {
cont.resume(event.result().body().toRpcResponse().response)
} else {
cont.resumeWithException(event?.cause() ?: Exception("Unknown error"))
}
})
// Suspend the coroutine to wait for the reply
COROUTINE_SUSPENDED
} else {
// The function is not suspend
null
}
} as T

這段代碼有點繞,我們分開看。

最外面是為指定的interface創建一個Proxy,我們需要定義一個InvocationHandler,這個handler會截獲每一個Proxy的方法調用,對於上面那個HelloSvc來說就是那個hello。

前面我們說過interface中的每個方法都是suspend,Kotlin中的suspend function在Java中看起來不一樣,以上面那個hello為例,Java中看到的是這樣:

void hello(String name, Continuation<String> cont);

可以看到參數列表中多了一個continuation,這是因為Kotlin編譯器為每個suspend函數做了CPS變換,這樣就可以讓一個函數返回到由continuation指定的位置。

在InvocationHandler中調用這個函數時沒有Kotlin編譯器來幫忙,因為這一坨都是Java的東西,它們對coroutine一無所知,所以我們需要手工處理suspend function的suspend和resume。

首先我們把結尾處的continuation從參數列表中取出來,因為服務端對這個參數一無所知,服務端要的是除掉最後一個之外的所有參數,有了方法名字和參數列表,我們就可以把請求打包成RpcRequest了。

接下來就是發送消息並設置Result handler,然後我們返回COROUTINE_SUSPENDED讓這個coroutine暫停,這個coroutine會在收到返回後由剛才發送的message的reply handler恢復,並且我們通過cont.resume把剛才收到的返回值塞回suspend function裏。

剩下的部分就只有在EventBus上發送這個message並等待reply了,在Kotlin coroutine的加持下沒啥好說的。

有了這些東西之後,我們就可以寫RPC service/client了:

// Server
vertx.deployVerticle(RpcServerVerticle("some-channel").register("hello-svc", object {
// 這裡有沒有suspend都無所謂,callSuspend也支持普通函數
fun hello(name: String) = "Hello, $name!"
}))

// Client
interface HelloSvc {
// 這裡必須有suspend
suspend fun hello(name: String)
}

val svc = getServiceProxy(vertx, "some-channel", "hello-svc")
assertEquals("Hello, world!", svc.hello("world"))

其實這個框架還可以很簡單的擴展到HTTP上,代碼都差不多,就是把發送/接受消息的部分換成HTTP Server/Client,沒難度,不說了。

一個寫好能用的庫在這裡:

windoze/vertx-kotlin-rpc?

github.com
圖標

這裡是碼雲上的鏡像:

Windoze/vertx-kotlin-rpc?

gitee.com
圖標

推薦閱讀:
相關文章