eventbus是vert.x的神經網路,連接vert.x的單元(Handler | Verticle)關鍵組件。

如果是verticle每個孤立的小島,那麼eventBus就是連接這些島嶼的橋樑。

copy from https://www.jdon.com/soa/reactive-microservices-with-eclipse-vertx.html
copy from https://blog.csdn.net/king866/article/details/55215337

我們可以看到,verticle之間的交互是通過eventBus完成的,而不是直接的方法調用。即使在不同的機器上,也可以通過集羣模式的EventBus完成通訊。

通過eventBus調用的好處,可以保證線程安全,eventBus已經把線程安全問題解決了。例如JsonObject發送的時候會經過一次copy。

eventbus提供的功能是跟消息隊列非常相似的,有如下幾種模式。

  1. 點對點通訊模式。
  2. 廣播模式。
  3. request-response模式(基於點對點模式)

但是eventBus是不保證消息一定達到。

下面是幾個小demo。簡單說說eventBus是怎麼使用的。

consumer

consumer方法,定義一個consumer,如果是集羣模式的話,會廣播到其他節點。localConsumer方法,定義本地的consumer,如果是集羣模式的話,不會廣播到其他節點。 其中兩個方法中的lambda表達式,是接收到回復消息的業務處理。

sender

第一個send方法,是不需要接收回復的發送消息。 第二個send方法,是接收到回復消息之後,進行業務處理。

sender方法,先定義一個sender對象,定義該方法時,需要目標地址的參數,其中上例中的「demo」就是目標地址。然後就可以利用sender對象,持續的往該目標地址發送消息。 該方式與send方法的區別是, sender對象是有流控的。而send方法是沒有流控的。

publish方法,廣播。 往目標地址廣播。 上例中, 所有定義為「demo」的comsumer,都可以收到廣播。

接下來看看eventBus是怎麼實現的。


先來看看EventBus的核心api

public interface EventBus extends Measured {
//發送消息,不需要回復
EventBus send(String address, Object message);

//發送消息, 需要回復
<T> EventBus send(String address, Object message, Handler<AsyncResult<Message<T>>> replyHandler);

//發送消息, 自定義發送選項。(例如修改默認的超時時間)
<T> EventBus send(String address, Object message, DeliveryOptions options, Handler<AsyncResult<Message<T>>> replyHandler);

//廣播
EventBus publish(String address, Object message, DeliveryOptions options);

//消費者
<T> MessageConsumer<T> consumer(String address, Handler<Message<T>> handler);
<T> MessageConsumer<T> localConsumer(String address, Handler<Message<T>> handler);

//sender
<T> MessageProducer<T> sender(String address, DeliveryOptions options);

//往當前eventBus註冊消息編解碼器
EventBus registerCodec(MessageCodec codec);
<T> EventBus registerDefaultCodec(Class<T> clazz, MessageCodec<T, ?> codec);

//其他非主分支api,也比較簡單,就不再一一列舉了
}

接下來,先來分析消息編解碼器。 eventBus發送和接收參數的時候,都經過編解碼器處理。 其中基本類型和對應的包裝類型, String, Json都已經有了對應的編解碼器並默認添加到EventBus中。 當我們需要發自定義的POJO,那麼就需要實現MessageCodec,並註冊到EventBus中。

接下來繼續看MessageCodec api

public interface MessageCodec<S, R> {

//把參數s進行序列化添加到buffer中,該方法用於集羣模式
void encodeToWire(Buffer buffer, S s);

//從buffer讀取數據, 反序列化出對象R, 該方法用於集羣模式
R decodeFromWire(int pos, Buffer buffer);

//對消息參數進行處理. 例如發送Json參數發生copy,正是用該參數處理的。
//該方法主要對參數進行線程安全處理
R transform(S s);

//消息編解碼器的名稱, 該名稱必須唯一。
String name();

/**
* Used to identify system codecs. Should always return -1 for a user codec.
*
* @return -1 for a user codec.
*/
byte systemCodecID();
}

其中encodeToWire方法和decodeFromWire方法,下一篇講到集羣模式時,再詳細分析。

下面列舉一些eventBus的自實現的MessageCodec

public class StringMessageCodec implements MessageCodec<String, String> {
@Override
public void encodeToWire(Buffer buffer, String s) {
byte[] strBytes = s.getBytes(CharsetUtil.UTF_8);
buffer.appendInt(strBytes.length);
buffer.appendBytes(strBytes);
}
@Override
public String decodeFromWire(int pos, Buffer buffer) {
int length = buffer.getInt(pos);
pos += 4;
byte[] bytes = buffer.getBytes(pos, pos + length);
return new String(bytes, CharsetUtil.UTF_8);
}

@Override
public String transform(String s) {
// Strings are immutable so just return it
return s;
}
@Override
public String name() {
return "string";
}
@Override
public byte systemCodecID() {
return 9;
}
}

因為String是immutable對象,所以transform方法直接返回即可。 所有的包裝類型的消息編解碼器也是如此,因為包裝類型跟String一樣,也是不可變的。

public class JsonObjectMessageCodec implements MessageCodec<JsonObject, JsonObject> {

@Override
public void encodeToWire(Buffer buffer, JsonObject jsonObject) {
Buffer encoded = jsonObject.toBuffer();
buffer.appendInt(encoded.length());
buffer.appendBuffer(encoded);
}

@Override
public JsonObject decodeFromWire(int pos, Buffer buffer) {
int length = buffer.getInt(pos);
pos += 4;
return new JsonObject(buffer.slice(pos, pos + length));
}

@Override
public JsonObject transform(JsonObject jsonObject) {
return jsonObject.copy();
}
@Override
public String name() {
return "jsonobject";
}

@Override
public byte systemCodecID() {
return 13;
}
}

Json對象不是線程安全的, 調用send方法發送參數時,consumer與send方法對應的線程可能不是同一個。如果兩個線程直接參數進行處理(例如。josn#put方法),是會發生線程安全問題。 vert.x為了避免這種情況發生,就對json對象進行copy。致使兩個線程json不共享參數。 那麼就可以保證線程安全了。 JsonArray也是如此。

下一步分析註冊註冊消息編解碼器

先來看看EventBus的實現,EventBusImpl中的屬性。

public class EventBusImpl implements EventBus, MetricsProvider {

//攔截器,在發送消息之前,經過攔截器處理,由於攔截器用的來,就不詳細介紹了
private final List<Handler<SendContext>> interceptors = new CopyOnWriteArrayList<>();

//接收消息時,需要往本地臨時註冊一個consumer,
//那麼這個replySequence用於生成地址。
private final AtomicLong replySequence = new AtomicLong(0);

//保存consumer的map,其中key就是定義consumre時的地址。
protected final ConcurrentMap<String, Handlers> handlerMap = new ConcurrentHashMap<>();

//編解碼器管理器
protected final CodecManager codecManager = new CodecManager();

//eventBus是否啟動標記
protected volatile boolean started;

//去掉非重要參數
}

這裡重點提一下handlerMap屬性。當我們用consumer方法註冊消費者就是往這個handlerMap添加consumer。然後send方法根據地址(key)從這個handlerMap找consumer。

所以: eventBus發送消息就是本地Map定址過程。

下面codecManager纔是消息編解碼器的重點。

//eventBusImpl關於messgeCodec的處理代理給codecManager
public class EventBusImpl implements EventBus, MetricsProvider {
public EventBus registerCodec(MessageCodec codec) {
codecManager.registerCodec(codec);
return this;
}
public <T> EventBus registerDefaultCodec(Class<T> clazz, MessageCodec<T, ?> codec) {
codecManager.registerDefaultCodec(clazz, codec);
return this;
}
}

public class CodecManager {
private final MessageCodec[] systemCodecs;
//
private final ConcurrentMap<String, MessageCodec> userCodecMap = new ConcurrentHashMap<>();

private final ConcurrentMap<Class, MessageCodec> defaultCodecMap = new ConcurrentHashMap<>();
public CodecManager() {
this.systemCodecs = codecs(NULL_MESSAGE_CODEC, PING_MESSAGE_CODEC, STRING_MESSAGE_CODEC, BUFFER_MESSAGE_CODEC, JSON_OBJECT_MESSAGE_CODEC, JSON_ARRAY_MESSAGE_CODEC,
BYTE_ARRAY_MESSAGE_CODEC, INT_MESSAGE_CODEC, LONG_MESSAGE_CODEC, FLOAT_MESSAGE_CODEC, DOUBLE_MESSAGE_CODEC,
BOOLEAN_MESSAGE_CODEC, SHORT_MESSAGE_CODEC, CHAR_MESSAGE_CODEC, BYTE_MESSAGE_CODEC, REPLY_EXCEPTION_MESSAGE_CODEC);
}

public void registerCodec(MessageCodec codec) {
userCodecMap.put(codec.name(), codec);
}

public <T> void registerDefaultCodec(Class<T> clazz, MessageCodec<T, ?> codec) {
defaultCodecMap.put(clazz, codec);
userCodecMap.put(codec.name(), codec);
}
//去挑o 去掉非關鍵代碼
}

所以registerCodec方法只是往userCodecMap添加。 而registerDefaultCodec除了會往userCodecMap添加,同時也會往defaultCodecMap添加。

CodecManager最為關鍵的方法,lookupCodec方法。其中參數body就是消息參數,codecName參數在發送消息時,指定發送選項DeliveryOptions中的codecName(默認null)

方法執行邏輯:

  1. 從userCodecMap找messageCodec如果存在codecName的情況下。
  2. 找不到就根據消息參數類型逐一比較默認的消息編解碼器。
  3. 找不到最後再去defaultCodecMap
  4. 最後找不到則拋出異常。

public MessageCodec lookupCodec(Object body, String codecName) {
MessageCodec codec;
if (codecName != null) {
codec = userCodecMap.get(codecName);
if (codec == null) {
throw new IllegalArgumentException("No message codec for name: " + codecName);
}
} else if (body == null) {
codec = NULL_MESSAGE_CODEC;
} else if (body instanceof String) {
codec = STRING_MESSAGE_CODEC;
} else if (body instanceof Buffer) {
codec = BUFFER_MESSAGE_CODEC;
} else if (body instanceof JsonObject) {
codec = JSON_OBJECT_MESSAGE_CODEC;
} else if (body instanceof JsonArray) {
codec = JSON_ARRAY_MESSAGE_CODEC;
} else if (body instanceof byte[]) {
codec = BYTE_ARRAY_MESSAGE_CODEC;
} else if (body instanceof Integer) {
codec = INT_MESSAGE_CODEC;
} else if (body instanceof Long) {
codec = LONG_MESSAGE_CODEC;
} else if (body instanceof Float) {
codec = FLOAT_MESSAGE_CODEC;
} else if (body instanceof Double) {
codec = DOUBLE_MESSAGE_CODEC;
} else if (body instanceof Boolean) {
codec = BOOLEAN_MESSAGE_CODEC;
} else if (body instanceof Short) {
codec = SHORT_MESSAGE_CODEC;
} else if (body instanceof Character) {
codec = CHAR_MESSAGE_CODEC;
} else if (body instanceof Byte) {
codec = BYTE_MESSAGE_CODEC;
} else if (body instanceof ReplyException) {
codec = defaultCodecMap.get(body.getClass());
if (codec == null) {
codec = REPLY_EXCEPTION_MESSAGE_CODEC;
}
} else {
codec = defaultCodecMap.get(body.getClass());
if (codec == null) {
throw new IllegalArgumentException("No message codec for type: " + body.getClass());
}
}
return codec;
}

好了,消息編解碼器已經講完。 接下來分析consumer邏輯。

Consumer實現

public <T> MessageConsumer<T> consumer(String address, Handler<Message<T>> handler) {
Objects.requireNonNull(handler, "handler");
MessageConsumer<T> consumer = consumer(address);
consumer.handler(handler);
return consumer;
}

public <T> MessageConsumer<T> consumer(String address) {
checkStarted();
Objects.requireNonNull(address, "address");
return new HandlerRegistration<>(vertx, metrics, this, address, null, false, null, -1);
}

在調用consumer方法時,創建了HandlerRegistration實例,並調用其handler方法。

HandlerRegistration中包括了流控相關的代碼,還包括send方法註冊的臨時consumer,所以HandlerRegistration代碼邏輯感覺有點混亂。

繼續分析HandlerRegistration

public class HandlerRegistration<T> implements MessageConsumer<T>, Handler<Message<T>> {

public static final int DEFAULT_MAX_BUFFERED_MESSAGES = 1000;

private final Vertx vertx;
private final EventBusMetrics metrics;
private final EventBusImpl eventBus;

//註冊地址
private final String address;

//send方法臨時consumer或者reply的時候用的,發送的目標地址
//個人覺得叫做sendAddress更為適合點
private final String repliedAddress;

//是否僅註冊到本地的consumer中,用於localConsumer方法中
//
private final boolean localOnly;

//send方法相關的
private final Handler<AsyncResult<Message<T>>> asyncResultHandler;

//定時任務對應的id,send方法的超時時間實現。
//在構造方法中, 如果timeout != -1的情況下,創建定時任務
private long timeoutID = -1;

//是否完成了註冊
private boolean registered;

//consumer對應的業務處理handler
private Handler<Message<T>> handler;

//consumer所在的Context
private Context handlerContext;

//集羣模式相關的
private AsyncResult<Void> result;
private Handler<AsyncResult<Void>> completionHandler;

//流控相關的
private Handler<Void> endHandler;
private Handler<Message<T>> discardHandler;
private int maxBufferedMessages = DEFAULT_MAX_BUFFERED_MESSAGES;
private final Queue<Message<T>> pending = new ArrayDeque<>(8);
private boolean paused;
private Object metric;

public HandlerRegistration(Vertx vertx, EventBusMetrics metrics, EventBusImpl eventBus, String address,
String repliedAddress, boolean localOnly,
Handler<AsyncResult<Message<T>>> asyncResultHandler, long timeout) {
this.vertx = vertx;
this.metrics = metrics;
this.eventBus = eventBus;
this.address = address;
this.repliedAddress = repliedAddress;
this.localOnly = localOnly;
this.asyncResultHandler = asyncResultHandler;

//send方法或者reply方法也註冊回調的時候, timeout != -1的情況
if (timeout != -1) {
timeoutID = vertx.setTimer(timeout, tid -> {
if (metrics != null) {
metrics.replyFailure(address, ReplyFailure.TIMEOUT);
}
sendAsyncResultFailure(ReplyFailure.TIMEOUT, "Timed out after waiting " + timeout + "(ms) for a reply. address: " + address + ", repliedAddress: " + repliedAddress);
});
}
}

在consumer方法的情況下, 創建HandlerRegistration。

address參數就是consumer方法中定義的, repliedAddress = null, localOnly = false, asyncResultHandler = null, timeout = -1;

此時只是創建HandlerRegistration對象,還未添加到eventBus中。

在調用handler方法時,才註冊到eventBus中。

//這些方法都在eventBusImpl中

//設置handler,並添加到eventBus中
public synchronized MessageConsumer<T> handler(Handler<Message<T>> handler) {
this.handler = handler;
if (this.handler != null && !registered) {
registered = true;
eventBus.addRegistration(address, this, repliedAddress != null, localOnly);
} else if (this.handler == null && registered) {
// This will set registered to false
this.unregister();
}
return this;
}

//添加到eventBus中
protected <T> void addRegistration(String address, HandlerRegistration<T> registration,
boolean replyHandler, boolean localOnly) {

boolean newAddress = addLocalRegistration(address, registration, replyHandler, localOnly);
addRegistration(newAddress, address, replyHandler, localOnly, registration::setResult);
}

//集羣模式用
protected <T> void addRegistration(boolean newAddress, String address,
boolean replyHandler, boolean localOnly,
Handler<AsyncResult<Void>> completionHandler) {
completionHandler.handle(Future.succeededFuture());
}

//添加到eventBus中
protected <T> boolean addLocalRegistration(String address, HandlerRegistration<T> registration,
boolean replyHandler, boolean localOnly) {
Objects.requireNonNull(address, "address");

//獲取當前的context
Context context = Vertx.currentContext();
boolean hasContext = context != null;
if (!hasContext) {
// Embedded
context = vertx.getOrCreateContext();
}

//設置當前consumer的context
registration.setHandlerContext(context);

boolean newAddress = false;

//對consumer進行封裝, 主要封裝consumer刪除邏輯
HandlerHolder holder = new HandlerHolder<>(metrics, registration, replyHandler, localOnly, context);

//handlerMap前面分析過了, handlers就是相同的address的consumer集合。
Handlers handlers = handlerMap.get(address);

if (handlers == null) {
handlers = new Handlers();
Handlers prevHandlers = handlerMap.putIfAbsent(address, handlers);
if (prevHandlers != null) {
handlers = prevHandlers;
}
newAddress = true;
}
//最後往這個consumer集合中添加consumer。
handlers.list.add(holder);

if (hasContext) {
HandlerEntry entry = new HandlerEntry<>(address, registration);
context.addCloseHook(entry);
}

return newAddress;
}

詳情說說addLocalRegistration方法。

獲取Context,然後設置到consumer實例中,那麼就可以實現Consumer一直執行在定義consumer所在的eventLoop中。 關於Context,之前寫過文章分析過了。

接著從EventBus中的handlerMap屬性中,根據address為key,找到consumer集合。所以eventBus是可以添加到多個相同address的consumer的。

然後把consumer添加到handlers,即添加到consumer集合中。

public class Handlers {
private final AtomicInteger pos = new AtomicInteger(0);
public final List<HandlerHolder> list = new CopyOnWriteArrayList<>();
public HandlerHolder choose() {
while (true) {
int size = list.size();
if (size == 0) {
return null;
}
int p = pos.getAndIncrement();
if (p >= size - 1) {
pos.set(0);
}
try {
return list.get(p);
} catch (IndexOutOfBoundsException e) {
// Can happen
pos.set(0);
}
}
}
}

Handlers對象實現也很簡單,就是list。 其中choose方法就是輪詢選取一個consumer。 所以當eventBus中有多個相同address的consumer的話,這些consumer將會輪詢執行。

至此,consumer實現分析完了。

由於篇幅太長了。 所以send方法,sender方法, publish方法, Message相關,就換個章節分析。

good night.


推薦閱讀:
相關文章