bitchat 是一個基於 Netty 的 IM 即時通訊框架
server.start();
啟動客戶端
public class DirectConnectServerClientApplication {
public static void main(String[] args) { Client client = SimpleClientFactory.getInstance() .newClient(ServerAttr.getLocalServer(8864)); client.connect(); doClientBiz(client); } }
客戶端連接上服務端後,將顯示如下信息:
體驗客戶端的功能
目前客戶端提供了三種 Func,分別是:登錄,查看在線用戶列表,發送單聊消息,每種 Func 有不同的命令格式。
-lo houyi 123456
查看在線用戶
-lu
發送單聊信息
-pc 1 hello,houyi
其中第二個參數數要發送消息給那個用戶的用戶id,第三個參數是消息內容
消息接收方,接收到消息:
客戶端斷線重連
整體架構
除了服務端和客戶端之外,還有三大中心:消息中心,用戶中心,鏈接中心。
集羣版
客戶端發送消息給另一個用戶,服務端接收到這個請求後,從 Connection中心中獲取目標用戶「掛」在哪個服務端下,如果在自己名下,那最簡單直接將消息推送給目標用戶即可,如果在其他服務端,則需要將該請求轉交給目標服務端,讓目標服務端將消息推送給目標用戶。
每個欄位的含義
public class IdleStateChecker extends IdleStateHandler { private static final int DEFAULT_READER_IDLE_TIME = 15; private int readerTime; public IdleStateChecker(int readerIdleTime) { super(readerIdleTime == 0 ? DEFAULT_READER_IDLE_TIME : readerIdleTime, 0, 0, TimeUnit.SECONDS); readerTime = readerIdleTime == 0 ? DEFAULT_READER_IDLE_TIME : readerIdleTime; } @Override protected void channelIdle(ChannelHandlerContext ctx, IdleStateEvent evt) { log.warn("[{}] Hasnt read data after {} seconds, will close the channel:{}", IdleStateChecker.class.getSimpleName(), readerTime, ctx.channel()); ctx.channel().close(); } }
另外,客戶端需要額外再維護一個健康檢查器,正常情況下他負責定時向服務端發送心跳,當鏈接的狀態變成 inActive 時,該檢查器將負責進行重連,如下所示:
public class HealthyChecker extends ChannelInboundHandlerAdapter {
private static final int DEFAULT_PING_INTERVAL = 5; private Client client; private int pingInterval; public HealthyChecker(Client client, int pingInterval) { Assert.notNull(client, "client can not be null"); this.client = client; this.pingInterval = pingInterval <= 0 ? DEFAULT_PING_INTERVAL : pingInterval; } @Override public void channelActive(ChannelHandlerContext ctx) throws Exception { super.channelActive(ctx); schedulePing(ctx); } private void schedulePing(ChannelHandlerContext ctx) { ctx.executor().schedule(() -> { Channel channel = ctx.channel(); if (channel.isActive()) { log.debug("[{}] Send a PingPacket", HealthyChecker.class.getSimpleName()); channel.writeAndFlush(new PingPacket()); schedulePing(ctx); } }, pingInterval, TimeUnit.SECONDS); } @Override public void channelInactive(ChannelHandlerContext ctx) throws Exception { ctx.executor().schedule(() -> { log.info("[{}] Try to reconnecting...", HealthyChecker.class.getSimpleName()); client.connect(); }, 5, TimeUnit.SECONDS); ctx.fireChannelInactive(); } }
業務線程池
public class ServerPacketDispatcher extends SimpleChannelInboundHandler<Packet> { @Override public void channelRead0(ChannelHandlerContext ctx, Packet request) { // if the packet should be handled async if (request.getAsync() == AsyncHandle.ASYNC) { EventExecutor channelExecutor = ctx.executor(); // create a promise Promise<Packet> promise = new DefaultPromise<>(channelExecutor); // async execute and get a future Future<Packet> future = executor.asyncExecute(promise, ctx, request); future.addListener(new GenericFutureListener<Future<Packet>>() { @Override public void operationComplete(Future<Packet> f) throws Exception { if (f.isSuccess()) { Packet response = f.get(); writeResponse(ctx, response); } } }); } else { // sync execute and get the response packet Packet response = executor.execute(ctx, request); writeResponse(ctx, response); } } }
不止是IM框架
原文:https://mp.weixin.qq.com/s?timestamp=1559194631&src=3&ver=1&signature=lcAYZC239JlrzbvdnNWgZIymFJAtw4y5Gcw7bfo4Qm1D9OB5GevhBdANEsE8-Lt9hb7mr3Y9gVqx34Zi7GltwygWedF7IX9FHzfiJSjK1eTL6Iztc8Yt9uxhCppFGkzcu-nrCwuOEQC5oWH-9ovUmx3xniVNBn4fjpeQvK*gH2k=
推薦閱讀: