在上一篇文章中分析了eventBus consumer方法的實質就是往eventBus中map添加handler,其中address就是key。

接著上一篇文章,繼續分析send相關的源碼。

先分析send方法。

//
public <T> EventBus send(String address, Object message, DeliveryOptions options, Handler<AsyncResult<Message<T>>> replyHandler) {
sendOrPubInternal(createMessage(true, address, options.getHeaders(), message, options.getCodecName()), options, replyHandler);
return this;
}

  • address:發送的地址,consumer註冊到eventBus的地址。
  • message: 發送的參數。
  • options: 發送選項,如果未指定,就創建一個默認的。
  • replyHandler:消息返回時處理的handler。

在EventBusImpl 104行中, 所有send方法最終都是到這裡來。 在這個方法裡面,先調用createMessage方法創建一個Message。 然後再調用sendOrPubInternal方法發送這個新創建的message。

protected MessageImpl createMessage(boolean send, String address, MultiMap headers, Object body, String codecName) {
MessageCodec codec = codecManager.lookupCodec(body, codecName);
MessageImpl msg = new MessageImpl(address, null, headers, body, codec, send, this);
return msg;
}

  • 在createMessage方法裡面,lookupCodec方法是上一篇文章分析過的。用於查找消息編解碼器。
  • 然後根據查找到的消息編解碼器,再創建一個MessageImpl實例。
  • 分析這個messageImpl類。

public class MessageImpl<U, V> implements Message<V> {

protected MessageCodec<U, V> messageCodec; //消息編解碼器
protected EventBusImpl bus; //eventBus實例
protected String address; //發送的目標地址
protected String replyAddress; //消息回復時的地址
protected MultiMap headers; //消息header
protected U sentBody; //消息參數
protected V receivedBody; //經過消息編解碼器處理過的消息參數
protected boolean send; //true: 發送消息, false: 廣播

public MessageImpl(String address, String replyAddress, MultiMap headers, U sentBody,
MessageCodec<U, V> messageCodec,
boolean send, EventBusImpl bus) {
this.messageCodec = messageCodec;
this.address = address;
this.replyAddress = replyAddress;
this.headers = headers;
this.sentBody = sentBody;
this.send = send;
this.bus = bus;
}
}

創建message的過程比較簡單。 注意一個參數: replyAddress。 此時replyAddress == null,下一步的時候,會給這個replyAddress賦值。接著往下走。

private <T> void sendOrPubInternal(MessageImpl message, DeliveryOptions options,
Handler<AsyncResult<Message<T>>> replyHandler) {
HandlerRegistration<T> replyHandlerRegistration = createReplyHandlerRegistration(message, options, replyHandler);
SendContextImpl<T> sendContext = new SendContextImpl<>(message, options, replyHandlerRegistration);
sendContext.next();
}

sendOrPubInternal是個內部方法, 方法裡面createReplyHandlerRegistration方法的調用是判斷replyHandler是否為空,如果不為空,就臨時註冊一個consumer,用於接收消息回復。

private <T> HandlerRegistration<T> createReplyHandlerRegistration(MessageImpl message,
DeliveryOptions options,
Handler<AsyncResult<Message<T>>> replyHandler) {
if (replyHandler != null) {
long timeout = options.getSendTimeout(); //
String replyAddress = generateReplyAddress(); //生成回復地址
message.setReplyAddress(replyAddress); //把回復地址設置到message
Handler<Message<T>> simpleReplyHandler = convertHandler(replyHandler);
//創建consumer,然後註冊到本地eventBus中
HandlerRegistration<T> registration =
new HandlerRegistration<>(vertx, metrics, this, replyAddress, message.address, true, replyHandler, timeout);
registration.handler(simpleReplyHandler);
return registration;
} else {
return null;
}
}

首先先獲取發送選項中的timeout,用於創建consumer。

然後調用generateReplyAddress方法生成replyAddress,並設置到message中,然後調用message#reply方法時,就可以找到這個臨時註冊的consuemr。

用生成的replyAddress作為address創建一個臨時的consumer

創建並註冊consumer上一篇文章分析過了,就不再分析了。

創建好consumer之後。

SendContextImpl<T> sendContext =newSendContextImpl<>(message, options, replyHandlerRegistration); sendContext.next();
sendContext.next();

創建一個sendContext,然後調用其next方法,sendContext比較簡單,就不分析了。相關攔截器也是在sendContext和next方法中實現的。

public void next() {
if (iter.hasNext()) {
Handler<SendContext> handler = iter.next();
try {
handler.handle(this);
} catch (Throwable t) {
log.error("Failure in interceptor", t);
}
} else {
sendOrPub(this);
}
}

其中iter是攔截器的Iterator,一般都是空。直接在看sendOrPub方法。

protected <T> void sendOrPub(SendContextImpl<T> sendContext) {
//刪除非主分支代碼
deliverMessageLocally(sendContext);
}

//這裡主要調用deliverMessageLocally方法,返回false,代表未找到consumer。
protected <T> void deliverMessageLocally(SendContextImpl<T> sendContext) {
if (!deliverMessageLocally(sendContext.message)) {
// no handlers
if (metrics != null) {
metrics.replyFailure(sendContext.message.address, ReplyFailure.NO_HANDLERS);
}
//回調通知send方法定義的handler(臨時註冊的consumer)未找到目標consumer。
//其實如果走到這個分支,整個過程目前來看還是同步的。
if (sendContext.handlerRegistration != null) {
sendContext.handlerRegistration.sendAsyncResultFailure(ReplyFailure.NO_HANDLERS, "No handlers for address "
+ sendContext.message.address);
}
}
}

sendOrPub方法調用deliverMessageLocally方法。

其中deliverMessageLocally方法主要處理髮送之後的處理。如果發送成功,就不處理任何操作,send發送的調用過程也完成了。如果發送失敗(未找到consumer的失敗),那麼回調通知handler。如果是失敗分支的話,可以看得出來,這個分支的過程是同步的。即同步回調handler。

接著分析最重要的方法。deliverMessageLocally

//刪除一些指標監控的代碼
protected <T> boolean deliverMessageLocally(MessageImpl msg) {
msg.setBus(this);
//map中查找handler, handlers是多個handler集合。handler就是consumer。
Handlers handlers = handlerMap.get(msg.address());
if (handlers != null) {
//發送send,否則就是廣播
if (msg.isSend()) {
//選擇一個handler。輪詢
HandlerHolder holder = handlers.choose();
//調用handler處理message
if (holder != null) {
deliverToHandler(msg, holder);
}
} else {
//廣播
//即調用所有的handler處理。所以整個send和publish的區別主要在這。
for (HandlerHolder holder: handlers.list) {
deliverToHandler(msg, holder);
}
}
return true;
} else {
//未找到handler。
return false;
}
}

private <T> void deliverToHandler(MessageImpl msg, HandlerHolder<T> holder) {
//對message進行copy。
Message<T> copied = msg.copyBeforeReceive();
//通過context,代碼執行在consumer所在的線程。
holder.getContext().runOnContext((v) -> {
try {
if (!holder.isRemoved()) {
holder.getHandler().handle(copied);
}
} finally {
//如果是臨時註冊的consumer, 調用完之後,就註銷掉。
if (holder.isReplyHandler()) {
holder.getHandler().unregister();
}
}
});
}

首先就是從eventBus的屬性handlerMap中查找handlers,即handler集合。該handler集合中包含了所有相同address的consumer。

然後判斷message是send模式 or publish模式。

  • send模式: 就是handlers(handler集合)中輪詢的方式選取出一個handler。執行deliverToHandler方法。
  • publish模式: 廣播, 遍歷所有的handler執行deliverToHandler。所以分析完send模式,publish模式也就基本差不多流程了。

deliverToHandler方法: 對message進行copy,然後根據handler中的context,執行到consumer所在的線程,如果consumer在Verticle中調用的,那麼context就是來自Verticle的Context。

繼續看Message copy。

Message<T> copied = msg.copyBeforeReceive();

//這裡copy就是創建一個新的Message
public MessageImpl<U, V> copyBeforeReceive() {
return new MessageImpl<>(this);
}

protected MessageImpl(MessageImpl<U, V> other) {
this.bus = other.bus;
this.address = other.address;
this.replyAddress = other.replyAddress;
this.messageCodec = other.messageCodec;
//對header進行copy
if (other.headers != null) {
List<Map.Entry<String, String>> entries = other.headers.entries();
this.headers = new CaseInsensitiveHeaders();
for (Map.Entry<String, String> entry: entries) {
this.headers.add(entry.getKey(), entry.getValue());
}
}
if (other.sentBody != null) {
this.sentBody = other.sentBody;
//上一篇文章說到消息編解碼器,對消息參數進行copy,就發現在這裡
this.receivedBody = messageCodec.transform(other.sentBody);
}
this.send = other.send;
}

message copy進行創建一個新的message,其中最重要的部分就是messageCodec.transform(other.sentBody),對消息參數進行線程安全保障處理。

因為接下來存在跨線程調用,為了保證線程安全, 對線程不安全的參數就進行copy處理。 簡單有效的處理線程安全問題。

holder.getHandler().handle(copied)這個方法裡面,包括流控處理和調用consumer中定義的handler。

holder.getHandler()返回的是consumer實現類,定義其handle方法。此時調用該方法不再是原來的send方法所在的線程了。通過context#runOnContext回到consumer所在的線程。

public void handle(Message<T> message) {
Handler<Message<T>> theHandler;
synchronized (this) {
//consumer暫時了,
if (paused) {
//緩衝隊列未超過最大, 默認是1000,進行隊列中
if (pending.size() < maxBufferedMessages) {
pending.add(message);
} else {
//否則就調用discardHandler處理, discardHandler == null ,就丟棄。
if (discardHandler != null) {
discardHandler.handle(message);
} else {
log.warn("Discarding message as more than " + maxBufferedMessages + " buffered in paused consumer. address: " + address);
}
}
return;
} else {
//先進先出。即如果存在之前到的message,那麼要先進行處理。
if (pending.size() > 0) {
pending.add(message);
message = pending.poll();
}
theHandler = handler;
}
}
//
deliver(theHandler, message);
}

private void deliver(Handler<Message<T>> theHandler, Message<T> message) {
// Handle the message outside the sync block
// https://bugs.eclipse.org/bugs/show_bug.cgi?id=473714
checkNextTick(); //流控相關。
boolean local = true;
//集羣相關忽略。
if (message instanceof ClusteredMessage) {
// A bit hacky
ClusteredMessage cmsg = (ClusteredMessage)message;
if (cmsg.isFromWire()) {
local = false;
}
}

//刪除指標監控相關代碼
//流控相關。 直接調用
String creditsAddress = message.headers().get(MessageProducerImpl.CREDIT_ADDRESS_HEADER_NAME);
if (creditsAddress != null) {
eventBus.send(creditsAddress, 1);
}

//theHandler就是在consumer中定義的handler。 調用我
try {
theHandler.handle(message);
} catch (Exception e) {
log.error("Failed to handleMessage. address: " + message.address(), e);
throw e;
}
}

其中,流控相關的代碼,接下來的sender方法中分析。

至此, send方法的流程就講完了。 那麼包括publish方法也大同小異了。就不再講解了。

另外還有一個額外的分支,timeout。如果send方法臨時註冊的consumer在超過timeout未收到回復message,那麼就會收到一個timeout的異常。

//在創建臨時consumer時, timeout != -1
public HandlerRegistration(Vertx vertx, EventBusMetrics metrics, EventBusImpl eventBus, String address,
String repliedAddress, boolean localOnly,
Handler<AsyncResult<Message<T>>> asyncResultHandler, long timeout) {
this.vertx = vertx;
this.metrics = metrics;
this.eventBus = eventBus;
this.address = address;
this.repliedAddress = repliedAddress;
this.localOnly = localOnly;
this.asyncResultHandler = asyncResultHandler;
//臨時註冊的consumer,會定義一個定時器
if (timeout != -1) {
timeoutID = vertx.setTimer(timeout, tid -> {
if (metrics != null) {
metrics.replyFailure(address, ReplyFailure.TIMEOUT);
}
sendAsyncResultFailure(ReplyFailure.TIMEOUT, "Timed out after waiting " + timeout + "(ms) for a reply. address: " + address + ", repliedAddress: " + repliedAddress);
});
}
}

因為send方法中臨時創建的consumer timeout != -1, 那麼會定義一個定時器。當timeout時間內未收到回復。就會執行timeout操作。回調通知send方法中定義的handler。其中參數是timeoutException。並執行註銷操作。

前面分析過程中, 說到臨時註冊consumer,調用完之後,就會執行註銷操作, 其中註銷操作就包括取消這個定時器。


接下來分析Message#reply

絕大多數情況下,都是調用這個reply方法。

public void reply(Object message) {
reply(message, new DeliveryOptions(), null);
}

最終reply方法都是調用的是這個。

public <R> void reply(Object message, DeliveryOptions options, Handler<AsyncResult<Message<R>>> replyHandler) {
if (replyAddress != null) {
sendReply(bus.createMessage(true, replyAddress, options.getHeaders(), message, options.getCodecName()), options, replyHandler);
}
}

如果replyAddress == null,那麼reply不會執行。 那什麼情況下replyAddress == null呢

根據前面的分析,send方法未定義handler時。即調用EventBus這兩個方法時。

  • EventBus send(String address, Object message);
  • EventBus send(String address, Object message, DeliveryOptions options);

這裡 createMessage方法跟前面分析一樣。其中replyAddress就是這個新創建message的目標地址。

接著sendReply方法。

protected <T> void sendReply(MessageImpl replyMessage, MessageImpl replierMessage, DeliveryOptions options,
Handler<AsyncResult<Message<T>>> replyHandler) {
if (replyMessage.address() == null) {
throw new IllegalStateException("address not specified");
} else {
//這裡跟前面分析的一樣
HandlerRegistration<T> replyHandlerRegistration = createReplyHandlerRegistration(replyMessage, options, replyHandler);
//
new ReplySendContextImpl<>(replyMessage, options, replyHandlerRegistration, replierMessage).next();
}
}

createReplyHandlerRegistration方法,就是前面分析創建臨時consumer,大多數情況, reply的時候都不再創建收到消息的consumer。所以這裡一般是null

接著創建ReplySendContextImpl實例, 並調用其next方法。 這個ReplySendContextImpl根據前面分析過的SendContextImpl差不多, ReplySendContextImpl繼承自SendContextImpl。

next方法跟SendContextImpl基本一樣,留了一些拓展方法,當前版本還未有拓展。所以可以認為ReplySendContextImpl跟SendContextImpl。那麼next方法又回到上面分析過的流程了。


最後來看看Sender方法。

先看來sender方法。就是創建MessageProducerImpl實例。

public <T> MessageProducer<T> sender(String address) {
return new MessageProducerImpl<>(vertx, address, true, new DeliveryOptions());
}

public <T> MessageProducer<T> sender(String address, DeliveryOptions options) {
return new MessageProducerImpl<>(vertx, address, true, options);
}

MessageProducer就是

public MessageProducerImpl(Vertx vertx, String address, boolean send, DeliveryOptions options) {
this.vertx = vertx;
this.bus = vertx.eventBus();
this.address = address;
this.send = send;
this.options = options;
if (send) {
String creditAddress = UUID.randomUUID().toString() + "-credit";

creditConsumer = bus.consumer(creditAddress, msg -> {
doReceiveCredit(msg.body());
});
//往option#header添加參數, value就是上面
options.addHeader(CREDIT_ADDRESS_HEADER_NAME, creditAddress);
} else {
creditConsumer = null;
}
}

public class MessageProducerImpl<T> implements MessageProducer<T> {

public static final String CREDIT_ADDRESS_HEADER_NAME = "__vertx.credit";

private final Vertx vertx;
private final EventBus bus;
private final boolean send; //true: send模式, false: publish模式
private final String address;
private final Queue<T> pending = new ArrayDeque<>(); //緩衝隊列
private final MessageConsumer<Integer> creditConsumer; //構造方法中創建的consumer
private DeliveryOptions options; //發送選項 調用該類的send相關方法,都是使用這個options

//緩衝隊列最大size,默認1000, 用於調用drainHandler
private int maxSize = DEFAULT_WRITE_QUEUE_MAX_SIZE;

//翻譯成令牌 比較適合, 當credits > 0才能send,否則先加到緩衝區。
private int credits = DEFAULT_WRITE_QUEUE_MAX_SIZE;

private Handler<Void> drainHandler;

public MessageProducerImpl(Vertx vertx, String address, boolean send, DeliveryOptions options) {
this.vertx = vertx;
this.bus = vertx.eventBus();
this.address = address;
this.send = send;
this.options = options;
//send模式
if (send) {
String creditAddress = UUID.randomUUID().toString() + "-credit";
//本地註冊一個consumer
creditConsumer = bus.consumer(creditAddress, msg -> {
doReceiveCredit(msg.body());
});
//往option#header添加參數, value就是上面consumer的地址
options.addHeader(CREDIT_ADDRESS_HEADER_NAME, creditAddress);
} else {
creditConsumer = null;
}
}

先看構造方法中定義的consumer。其中handler執行了doReceiveCredit方法。

MessageProducerImpl所有send方法,都會用到裡面定義的options。其中options的header中就包括一個特殊的參數。key:CREDIT_ADDRESS_HEADER_NAME。 value:構造方法中定義的consumer地址。

在前面分析過程中, 調用consumer的邏輯中, 有一段這樣的代碼。

String creditsAddress = message.headers().get(MessageProducerImpl.CREDIT_ADDRESS_HEADER_NAME);
if (creditsAddress != null) {
eventBus.send(creditsAddress, 1);
}

從message#header中取key:CREDIT_ADDRESS_HEADER_NAME。 如果取的到, 說明該消息是從MessageProducerImpl發出來的。 然後對這個MessageProducer中的consumer發送消息。

最終調用到MessageProducerImpl#doReceiveCredit方法。

//最終調用到該方法
private synchronized void doReceiveCredit(int credit) {
//增加credits
credits += credit;
//credits > 0代表可能發送消息, 如果緩衝隊列中還有消息,就取出來發送。
while (credits > 0) {
T data = pending.poll();
if (data == null) {
break;
} else {
credits--;
bus.send(address, data, options);
}
}
checkDrained();
}

增加credits。

public MessageProducer<T> send(T message) {
doSend(message, null);
return this;
}

public synchronized MessageProducer<T> write(T data) {
if (send) {
doSend(data, null);
} else {
bus.publish(address, data, options);
}
return this;
}

private synchronized <R> void doSend(T data, Handler<AsyncResult<Message<R>>> replyHandler) {
if (credits > 0) {
credits--;
//這裡bus.send就是上面分析過的了。
if (replyHandler == null) {
bus.send(address, data, options);
} else {
bus.send(address, data, options, replyHandler);
}
} else {
//這裡直接丟掉replyHandler,目測應該是個bug。
pending.add(data);
}
}

最終調用到doSend, 如果credits > 0, 就發送, 否則就加到pending隊列中。

所以說sender方法,在調用send或者write方法,是包含流控的。

至此,local模式的EventBus已經分析完了。

其中, 流控相關的代碼, WriteStream, ReadStream、Pump另起一篇文章詳細說說。 MessageProducer也正是繼承WriteStream,MessageConsumer繼承ReadStream。所以他兩必須實現流控。關於EventBus的流控相關代碼, 未詳細深入。

good night.


推薦閱讀:
相關文章