eventbus是vert.x的神經網路,連接vert.x的單元(Handler | Verticle)關鍵組件。
如果是verticle每個孤立的小島,那麼eventBus就是連接這些島嶼的橋樑。
我們可以看到,verticle之間的交互是通過eventBus完成的,而不是直接的方法調用。即使在不同的機器上,也可以通過集羣模式的EventBus完成通訊。
通過eventBus調用的好處,可以保證線程安全,eventBus已經把線程安全問題解決了。例如JsonObject發送的時候會經過一次copy。
eventbus提供的功能是跟消息隊列非常相似的,有如下幾種模式。
但是eventBus是不保證消息一定達到。
下面是幾個小demo。簡單說說eventBus是怎麼使用的。
consumer方法,定義一個consumer,如果是集羣模式的話,會廣播到其他節點。localConsumer方法,定義本地的consumer,如果是集羣模式的話,不會廣播到其他節點。 其中兩個方法中的lambda表達式,是接收到回復消息的業務處理。
第一個send方法,是不需要接收回復的發送消息。 第二個send方法,是接收到回復消息之後,進行業務處理。
sender方法,先定義一個sender對象,定義該方法時,需要目標地址的參數,其中上例中的「demo」就是目標地址。然後就可以利用sender對象,持續的往該目標地址發送消息。 該方式與send方法的區別是, sender對象是有流控的。而send方法是沒有流控的。
publish方法,廣播。 往目標地址廣播。 上例中, 所有定義為「demo」的comsumer,都可以收到廣播。
接下來看看eventBus是怎麼實現的。
先來看看EventBus的核心api
public interface EventBus extends Measured { //發送消息,不需要回復 EventBus send(String address, Object message);
//發送消息, 需要回復 <T> EventBus send(String address, Object message, Handler<AsyncResult<Message<T>>> replyHandler);
//發送消息, 自定義發送選項。(例如修改默認的超時時間) <T> EventBus send(String address, Object message, DeliveryOptions options, Handler<AsyncResult<Message<T>>> replyHandler);
//廣播 EventBus publish(String address, Object message, DeliveryOptions options);
//消費者 <T> MessageConsumer<T> consumer(String address, Handler<Message<T>> handler); <T> MessageConsumer<T> localConsumer(String address, Handler<Message<T>> handler);
//sender <T> MessageProducer<T> sender(String address, DeliveryOptions options);
//往當前eventBus註冊消息編解碼器 EventBus registerCodec(MessageCodec codec); <T> EventBus registerDefaultCodec(Class<T> clazz, MessageCodec<T, ?> codec);
//其他非主分支api,也比較簡單,就不再一一列舉了 }
接下來,先來分析消息編解碼器。 eventBus發送和接收參數的時候,都經過編解碼器處理。 其中基本類型和對應的包裝類型, String, Json都已經有了對應的編解碼器並默認添加到EventBus中。 當我們需要發自定義的POJO,那麼就需要實現MessageCodec,並註冊到EventBus中。
接下來繼續看MessageCodec api
public interface MessageCodec<S, R> {
//把參數s進行序列化添加到buffer中,該方法用於集羣模式 void encodeToWire(Buffer buffer, S s);
//從buffer讀取數據, 反序列化出對象R, 該方法用於集羣模式 R decodeFromWire(int pos, Buffer buffer);
//對消息參數進行處理. 例如發送Json參數發生copy,正是用該參數處理的。 //該方法主要對參數進行線程安全處理 R transform(S s);
//消息編解碼器的名稱, 該名稱必須唯一。 String name();
/** * Used to identify system codecs. Should always return -1 for a user codec. * * @return -1 for a user codec. */ byte systemCodecID(); }
其中encodeToWire方法和decodeFromWire方法,下一篇講到集羣模式時,再詳細分析。
下面列舉一些eventBus的自實現的MessageCodec
public class StringMessageCodec implements MessageCodec<String, String> { @Override public void encodeToWire(Buffer buffer, String s) { byte[] strBytes = s.getBytes(CharsetUtil.UTF_8); buffer.appendInt(strBytes.length); buffer.appendBytes(strBytes); } @Override public String decodeFromWire(int pos, Buffer buffer) { int length = buffer.getInt(pos); pos += 4; byte[] bytes = buffer.getBytes(pos, pos + length); return new String(bytes, CharsetUtil.UTF_8); }
@Override public String transform(String s) { // Strings are immutable so just return it return s; } @Override public String name() { return "string"; } @Override public byte systemCodecID() { return 9; } }
因為String是immutable對象,所以transform方法直接返回即可。 所有的包裝類型的消息編解碼器也是如此,因為包裝類型跟String一樣,也是不可變的。
public class JsonObjectMessageCodec implements MessageCodec<JsonObject, JsonObject> {
@Override public void encodeToWire(Buffer buffer, JsonObject jsonObject) { Buffer encoded = jsonObject.toBuffer(); buffer.appendInt(encoded.length()); buffer.appendBuffer(encoded); }
@Override public JsonObject decodeFromWire(int pos, Buffer buffer) { int length = buffer.getInt(pos); pos += 4; return new JsonObject(buffer.slice(pos, pos + length)); }
@Override public JsonObject transform(JsonObject jsonObject) { return jsonObject.copy(); } @Override public String name() { return "jsonobject"; }
@Override public byte systemCodecID() { return 13; } }
Json對象不是線程安全的, 調用send方法發送參數時,consumer與send方法對應的線程可能不是同一個。如果兩個線程直接參數進行處理(例如。josn#put方法),是會發生線程安全問題。 vert.x為了避免這種情況發生,就對json對象進行copy。致使兩個線程json不共享參數。 那麼就可以保證線程安全了。 JsonArray也是如此。
先來看看EventBus的實現,EventBusImpl中的屬性。
public class EventBusImpl implements EventBus, MetricsProvider {
//攔截器,在發送消息之前,經過攔截器處理,由於攔截器用的來,就不詳細介紹了 private final List<Handler<SendContext>> interceptors = new CopyOnWriteArrayList<>();
//接收消息時,需要往本地臨時註冊一個consumer, //那麼這個replySequence用於生成地址。 private final AtomicLong replySequence = new AtomicLong(0);
//保存consumer的map,其中key就是定義consumre時的地址。 protected final ConcurrentMap<String, Handlers> handlerMap = new ConcurrentHashMap<>();
//編解碼器管理器 protected final CodecManager codecManager = new CodecManager();
//eventBus是否啟動標記 protected volatile boolean started;
//去掉非重要參數 }
這裡重點提一下handlerMap屬性。當我們用consumer方法註冊消費者就是往這個handlerMap添加consumer。然後send方法根據地址(key)從這個handlerMap找consumer。
所以: eventBus發送消息就是本地Map定址過程。
下面codecManager纔是消息編解碼器的重點。
//eventBusImpl關於messgeCodec的處理代理給codecManager public class EventBusImpl implements EventBus, MetricsProvider { public EventBus registerCodec(MessageCodec codec) { codecManager.registerCodec(codec); return this; } public <T> EventBus registerDefaultCodec(Class<T> clazz, MessageCodec<T, ?> codec) { codecManager.registerDefaultCodec(clazz, codec); return this; } }
public class CodecManager { private final MessageCodec[] systemCodecs; // private final ConcurrentMap<String, MessageCodec> userCodecMap = new ConcurrentHashMap<>();
private final ConcurrentMap<Class, MessageCodec> defaultCodecMap = new ConcurrentHashMap<>(); public CodecManager() { this.systemCodecs = codecs(NULL_MESSAGE_CODEC, PING_MESSAGE_CODEC, STRING_MESSAGE_CODEC, BUFFER_MESSAGE_CODEC, JSON_OBJECT_MESSAGE_CODEC, JSON_ARRAY_MESSAGE_CODEC, BYTE_ARRAY_MESSAGE_CODEC, INT_MESSAGE_CODEC, LONG_MESSAGE_CODEC, FLOAT_MESSAGE_CODEC, DOUBLE_MESSAGE_CODEC, BOOLEAN_MESSAGE_CODEC, SHORT_MESSAGE_CODEC, CHAR_MESSAGE_CODEC, BYTE_MESSAGE_CODEC, REPLY_EXCEPTION_MESSAGE_CODEC); }
public void registerCodec(MessageCodec codec) { userCodecMap.put(codec.name(), codec); }
public <T> void registerDefaultCodec(Class<T> clazz, MessageCodec<T, ?> codec) { defaultCodecMap.put(clazz, codec); userCodecMap.put(codec.name(), codec); } //去挑o 去掉非關鍵代碼 }
所以registerCodec方法只是往userCodecMap添加。 而registerDefaultCodec除了會往userCodecMap添加,同時也會往defaultCodecMap添加。
CodecManager最為關鍵的方法,lookupCodec方法。其中參數body就是消息參數,codecName參數在發送消息時,指定發送選項DeliveryOptions中的codecName(默認null)
方法執行邏輯:
public MessageCodec lookupCodec(Object body, String codecName) { MessageCodec codec; if (codecName != null) { codec = userCodecMap.get(codecName); if (codec == null) { throw new IllegalArgumentException("No message codec for name: " + codecName); } } else if (body == null) { codec = NULL_MESSAGE_CODEC; } else if (body instanceof String) { codec = STRING_MESSAGE_CODEC; } else if (body instanceof Buffer) { codec = BUFFER_MESSAGE_CODEC; } else if (body instanceof JsonObject) { codec = JSON_OBJECT_MESSAGE_CODEC; } else if (body instanceof JsonArray) { codec = JSON_ARRAY_MESSAGE_CODEC; } else if (body instanceof byte[]) { codec = BYTE_ARRAY_MESSAGE_CODEC; } else if (body instanceof Integer) { codec = INT_MESSAGE_CODEC; } else if (body instanceof Long) { codec = LONG_MESSAGE_CODEC; } else if (body instanceof Float) { codec = FLOAT_MESSAGE_CODEC; } else if (body instanceof Double) { codec = DOUBLE_MESSAGE_CODEC; } else if (body instanceof Boolean) { codec = BOOLEAN_MESSAGE_CODEC; } else if (body instanceof Short) { codec = SHORT_MESSAGE_CODEC; } else if (body instanceof Character) { codec = CHAR_MESSAGE_CODEC; } else if (body instanceof Byte) { codec = BYTE_MESSAGE_CODEC; } else if (body instanceof ReplyException) { codec = defaultCodecMap.get(body.getClass()); if (codec == null) { codec = REPLY_EXCEPTION_MESSAGE_CODEC; } } else { codec = defaultCodecMap.get(body.getClass()); if (codec == null) { throw new IllegalArgumentException("No message codec for type: " + body.getClass()); } } return codec; }
好了,消息編解碼器已經講完。 接下來分析consumer邏輯。
Consumer實現
public <T> MessageConsumer<T> consumer(String address, Handler<Message<T>> handler) { Objects.requireNonNull(handler, "handler"); MessageConsumer<T> consumer = consumer(address); consumer.handler(handler); return consumer; }
public <T> MessageConsumer<T> consumer(String address) { checkStarted(); Objects.requireNonNull(address, "address"); return new HandlerRegistration<>(vertx, metrics, this, address, null, false, null, -1); }
在調用consumer方法時,創建了HandlerRegistration實例,並調用其handler方法。
HandlerRegistration中包括了流控相關的代碼,還包括send方法註冊的臨時consumer,所以HandlerRegistration代碼邏輯感覺有點混亂。
繼續分析HandlerRegistration
public class HandlerRegistration<T> implements MessageConsumer<T>, Handler<Message<T>> {
public static final int DEFAULT_MAX_BUFFERED_MESSAGES = 1000;
private final Vertx vertx; private final EventBusMetrics metrics; private final EventBusImpl eventBus;
//註冊地址 private final String address;
//send方法臨時consumer或者reply的時候用的,發送的目標地址 //個人覺得叫做sendAddress更為適合點 private final String repliedAddress;
//是否僅註冊到本地的consumer中,用於localConsumer方法中 // private final boolean localOnly;
//send方法相關的 private final Handler<AsyncResult<Message<T>>> asyncResultHandler;
//定時任務對應的id,send方法的超時時間實現。 //在構造方法中, 如果timeout != -1的情況下,創建定時任務 private long timeoutID = -1;
//是否完成了註冊 private boolean registered;
//consumer對應的業務處理handler private Handler<Message<T>> handler;
//consumer所在的Context private Context handlerContext;
//集羣模式相關的 private AsyncResult<Void> result; private Handler<AsyncResult<Void>> completionHandler;
//流控相關的 private Handler<Void> endHandler; private Handler<Message<T>> discardHandler; private int maxBufferedMessages = DEFAULT_MAX_BUFFERED_MESSAGES; private final Queue<Message<T>> pending = new ArrayDeque<>(8); private boolean paused; private Object metric;
public HandlerRegistration(Vertx vertx, EventBusMetrics metrics, EventBusImpl eventBus, String address, String repliedAddress, boolean localOnly, Handler<AsyncResult<Message<T>>> asyncResultHandler, long timeout) { this.vertx = vertx; this.metrics = metrics; this.eventBus = eventBus; this.address = address; this.repliedAddress = repliedAddress; this.localOnly = localOnly; this.asyncResultHandler = asyncResultHandler;
//send方法或者reply方法也註冊回調的時候, timeout != -1的情況 if (timeout != -1) { timeoutID = vertx.setTimer(timeout, tid -> { if (metrics != null) { metrics.replyFailure(address, ReplyFailure.TIMEOUT); } sendAsyncResultFailure(ReplyFailure.TIMEOUT, "Timed out after waiting " + timeout + "(ms) for a reply. address: " + address + ", repliedAddress: " + repliedAddress); }); } }
在consumer方法的情況下, 創建HandlerRegistration。
address參數就是consumer方法中定義的, repliedAddress = null, localOnly = false, asyncResultHandler = null, timeout = -1;
此時只是創建HandlerRegistration對象,還未添加到eventBus中。
在調用handler方法時,才註冊到eventBus中。
//這些方法都在eventBusImpl中 //設置handler,並添加到eventBus中 public synchronized MessageConsumer<T> handler(Handler<Message<T>> handler) { this.handler = handler; if (this.handler != null && !registered) { registered = true; eventBus.addRegistration(address, this, repliedAddress != null, localOnly); } else if (this.handler == null && registered) { // This will set registered to false this.unregister(); } return this; }
//添加到eventBus中 protected <T> void addRegistration(String address, HandlerRegistration<T> registration, boolean replyHandler, boolean localOnly) {
boolean newAddress = addLocalRegistration(address, registration, replyHandler, localOnly); addRegistration(newAddress, address, replyHandler, localOnly, registration::setResult); }
//集羣模式用 protected <T> void addRegistration(boolean newAddress, String address, boolean replyHandler, boolean localOnly, Handler<AsyncResult<Void>> completionHandler) { completionHandler.handle(Future.succeededFuture()); }
//添加到eventBus中 protected <T> boolean addLocalRegistration(String address, HandlerRegistration<T> registration, boolean replyHandler, boolean localOnly) { Objects.requireNonNull(address, "address");
//獲取當前的context Context context = Vertx.currentContext(); boolean hasContext = context != null; if (!hasContext) { // Embedded context = vertx.getOrCreateContext(); }
//設置當前consumer的context registration.setHandlerContext(context);
boolean newAddress = false;
//對consumer進行封裝, 主要封裝consumer刪除邏輯 HandlerHolder holder = new HandlerHolder<>(metrics, registration, replyHandler, localOnly, context);
//handlerMap前面分析過了, handlers就是相同的address的consumer集合。 Handlers handlers = handlerMap.get(address);
if (handlers == null) { handlers = new Handlers(); Handlers prevHandlers = handlerMap.putIfAbsent(address, handlers); if (prevHandlers != null) { handlers = prevHandlers; } newAddress = true; } //最後往這個consumer集合中添加consumer。 handlers.list.add(holder);
if (hasContext) { HandlerEntry entry = new HandlerEntry<>(address, registration); context.addCloseHook(entry); }
return newAddress; }
詳情說說addLocalRegistration方法。
獲取Context,然後設置到consumer實例中,那麼就可以實現Consumer一直執行在定義consumer所在的eventLoop中。 關於Context,之前寫過文章分析過了。
接著從EventBus中的handlerMap屬性中,根據address為key,找到consumer集合。所以eventBus是可以添加到多個相同address的consumer的。
然後把consumer添加到handlers,即添加到consumer集合中。
public class Handlers { private final AtomicInteger pos = new AtomicInteger(0); public final List<HandlerHolder> list = new CopyOnWriteArrayList<>(); public HandlerHolder choose() { while (true) { int size = list.size(); if (size == 0) { return null; } int p = pos.getAndIncrement(); if (p >= size - 1) { pos.set(0); } try { return list.get(p); } catch (IndexOutOfBoundsException e) { // Can happen pos.set(0); } } } }
Handlers對象實現也很簡單,就是list。 其中choose方法就是輪詢選取一個consumer。 所以當eventBus中有多個相同address的consumer的話,這些consumer將會輪詢執行。
至此,consumer實現分析完了。
由於篇幅太長了。 所以send方法,sender方法, publish方法, Message相關,就換個章節分析。
good night.