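When designing a server today, the most common approach is the Reactor model: a single listener thread listens on the port while several worker threads wait behind it. Once a connection is established, the listener hands that connection to a worker thread, which then processes it. When more requests arrive than the workers can take on immediately, they are placed in a queue, and the worker threads take them from there.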
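The model described above can be sketched with plain java.net sockets: one acceptor thread, plus a fixed thread pool whose internal queue holds connections waiting for a free worker. This is a minimal illustration under my own assumptions (blocking I/O, a one-line echo handler, and the class name AcceptorWorkerPool are all illustrative), not the implementation of any particular framework.

```java
import java.io.*;
import java.net.*;
import java.nio.charset.StandardCharsets;
import java.util.concurrent.ExecutorService;
import java.util.concurrent.Executors;

// Sketch: one acceptor thread plus a fixed pool of worker threads (names are illustrative).
public class AcceptorWorkerPool {
    private final ServerSocket server;
    // newFixedThreadPool keeps extra work in an unbounded LinkedBlockingQueue:
    // this is the queue that idle workers take connections from.
    private final ExecutorService workers = Executors.newFixedThreadPool(4);

    public AcceptorWorkerPool() throws IOException {
        // Port 0 lets the OS pick a free port on the loopback interface.
        server = new ServerSocket(0, 50, InetAddress.getLoopbackAddress());
        Thread acceptor = new Thread(this::acceptLoop, "acceptor");
        acceptor.setDaemon(true);
        acceptor.start();
    }

    public int port() {
        return server.getLocalPort();
    }

    // The acceptor only accepts and hands each connection to the pool; it does no I/O itself.
    private void acceptLoop() {
        try {
            while (true) {
                Socket conn = server.accept();
                workers.submit(() -> handle(conn));
            }
        } catch (IOException e) {
            // The ServerSocket was closed: stop accepting.
        }
    }

    // Worker side: echo one line back, then close the connection.
    private static void handle(Socket conn) {
        try (Socket s = conn;
             BufferedReader in = new BufferedReader(
                     new InputStreamReader(s.getInputStream(), StandardCharsets.UTF_8));
             PrintWriter out = new PrintWriter(
                     new OutputStreamWriter(s.getOutputStream(), StandardCharsets.UTF_8), true)) {
            String line = in.readLine();
            if (line != null) {
                out.println(line);
            }
        } catch (IOException ignored) {
            // A broken client connection only affects this one worker task.
        }
    }

    public void close() throws IOException {
        server.close();
        workers.shutdown();
    }

    // Small blocking client for the demo: send one line, return the echoed line.
    public static String echo(int port, String msg) throws IOException {
        try (Socket s = new Socket(InetAddress.getLoopbackAddress(), port)) {
            s.setSoTimeout(5000);
            PrintWriter out = new PrintWriter(
                    new OutputStreamWriter(s.getOutputStream(), StandardCharsets.UTF_8), true);
            BufferedReader in = new BufferedReader(
                    new InputStreamReader(s.getInputStream(), StandardCharsets.UTF_8));
            out.println(msg);
            return in.readLine();
        }
    }

    public static void main(String[] args) throws IOException {
        AcceptorWorkerPool srv = new AcceptorWorkerPool();
        System.out.println(echo(srv.port(), "hello"));
        srv.close();
    }
}
```

With blocking I/O, every connection a worker is serving occupies that worker's thread until the handler returns; selector-based designs avoid that by letting one thread watch many channels.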

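Judging from the documentation, my guess is that Netty's threading model looks like this: a server thread listens for new connections on a selector, and after accepting a connection it hands it to a worker thread. Each worker thread also has its own selector and handles the subsequent events on that channel, such as reads. Is that really how it works? Let's analyze it.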
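To make the guess concrete, here is a hypothetical java.nio sketch of that layout: a boss thread accepts on its own Selector and hands each SocketChannel, round-robin, to a worker thread that owns a second Selector. The handoff goes through a queue plus Selector.wakeup(), so each channel is registered on its worker's own thread. BossWorkerReactor, Worker, and the echo handler are illustrative names; this is not Netty code.

```java
import java.io.*;
import java.net.*;
import java.nio.ByteBuffer;
import java.nio.channels.*;
import java.nio.charset.StandardCharsets;
import java.util.*;
import java.util.concurrent.ConcurrentLinkedQueue;

// Hypothetical boss/worker layout: one boss Selector accepts, each worker owns its own Selector.
public class BossWorkerReactor {
    static final class Worker implements Runnable {
        final Selector selector;
        // Channels handed over by the boss, waiting to be registered on this worker's thread.
        final Queue<SocketChannel> pending = new ConcurrentLinkedQueue<>();
        Worker() throws IOException { selector = Selector.open(); }
        // Called on the boss thread: enqueue the channel, then wake up this worker's select().
        void assign(SocketChannel ch) {
            pending.add(ch);
            selector.wakeup();
        }
        @Override public void run() {
            try {
                while (true) {
                    selector.select();
                    for (SocketChannel ch; (ch = pending.poll()) != null; ) { // register on this thread
                        ch.configureBlocking(false);
                        ch.register(selector, SelectionKey.OP_READ, ByteBuffer.allocate(256));
                    }
                    Iterator<SelectionKey> it = selector.selectedKeys().iterator();
                    while (it.hasNext()) {
                        SelectionKey key = it.next();
                        it.remove();
                        echo(key);
                    }
                }
            } catch (IOException e) { /* selector failure: the worker thread exits */ }
        }
        // Read handler: write back whatever arrived; close the channel on EOF or error.
        private static void echo(SelectionKey key) {
            SocketChannel ch = (SocketChannel) key.channel();
            ByteBuffer buf = (ByteBuffer) key.attachment();
            try {
                if (ch.read(buf) < 0) { ch.close(); return; }
                buf.flip();
                while (buf.hasRemaining()) ch.write(buf);
                buf.clear();
            } catch (IOException e) {
                try { ch.close(); } catch (IOException ignored) { }
            }
        }
    }

    // Starts one boss thread and workerCount worker threads; returns the bound loopback port.
    public static int start(int workerCount) throws IOException {
        Worker[] workers = new Worker[workerCount];
        for (int i = 0; i < workerCount; i++) {
            workers[i] = new Worker();
            daemon(workers[i], "worker-" + i);
        }
        ServerSocketChannel server = ServerSocketChannel.open();
        server.bind(new InetSocketAddress(InetAddress.getLoopbackAddress(), 0));
        server.configureBlocking(false);
        Selector boss = Selector.open();
        server.register(boss, SelectionKey.OP_ACCEPT);
        daemon(() -> {
            long next = 0;
            try {
                while (true) {
                    boss.select();
                    boss.selectedKeys().clear();
                    // Accept everything pending; hand each connection to the next worker in turn.
                    for (SocketChannel ch; (ch = server.accept()) != null; ) {
                        workers[(int) (next++ % workers.length)].assign(ch);
                    }
                }
            } catch (IOException e) { /* boss selector or server channel closed */ }
        }, "boss");
        return server.socket().getLocalPort();
    }

    private static void daemon(Runnable r, String name) {
        Thread t = new Thread(r, name);
        t.setDaemon(true);
        t.start();
    }

    // Blocking demo client: send msg and read back exactly as many bytes.
    public static String echoOnce(int port, String msg) throws IOException {
        byte[] sent = msg.getBytes(StandardCharsets.UTF_8);
        try (Socket s = new Socket(InetAddress.getLoopbackAddress(), port)) {
            s.setSoTimeout(5000);
            s.getOutputStream().write(sent);
            byte[] got = new byte[sent.length];
            new DataInputStream(s.getInputStream()).readFully(got);
            return new String(got, StandardCharsets.UTF_8);
        }
    }
}
```

For example, `int port = BossWorkerReactor.start(2);` followed by `BossWorkerReactor.echoOnce(port, "hello")` should return "hello". Registering inside the worker thread, rather than from the boss, avoids calling register() on a selector that another thread is blocked in.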

Let's start with the user code on the server side:

ChannelFuture f = bootstrap.bind("localhost", 8086).sync();

Let's look only at this bind operation and ignore everything else for now. In the AbstractBootstrap class we find the doBind method:

private ChannelFuture doBind(final SocketAddress localAddress) {
    final ChannelFuture regFuture = initAndRegister();
    final Channel channel = regFuture.channel();
    if (regFuture.cause() != null) {
        return regFuture;
    }
    if (regFuture.isDone()) {
        // At this point we know that the registration was complete and successful.
        ChannelPromise promise = channel.newPromise();
        doBind0(regFuture, channel, localAddress, promise);
        return promise;
    } else {
        // Registration future is almost always fulfilled already, but just in case its not.
        final PendingRegistrationPromise promise = new PendingRegistrationPromise(channel);
        regFuture.addListener(new ChannelFutureListener() {
            @Override
            public void operationComplete(ChannelFuture future) throws Exception {
                Throwable cause = future.cause();
                if (cause != null) {
                    // Registration on the EventLoop failed so fail the ChannelPromise directly to not cause an
                    // IllegalStateException once we try to access the EventLoop of the Channel.
                    promise.setFailure(cause);
                } else {
                    // Registration was successful, so set the correct executor to use.
                    // See https://github.com/netty/netty/issues/2586
                    promise.registered();

                    doBind0(regFuture, channel, localAddress, promise);
                }
            }
        });
        return promise;
    }
}
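The control flow of doBind (act immediately if registration has already finished, otherwise return a pending promise and do the work from a listener) can be mimicked with java.util.concurrent.CompletableFuture standing in for ChannelFuture. In this sketch bindAfterRegister and bindAction are hypothetical names; bindAction plays the role of doBind0, which in Netty does not bind synchronously but schedules the bind on the channel's event loop. One deliberate difference: when registration has already failed, Netty returns regFuture itself, while this sketch returns a failed promise.

```java
import java.util.concurrent.CompletableFuture;
import java.util.function.Supplier;

// Sketch of doBind's control flow, with CompletableFuture standing in for ChannelFuture.
// bindAfterRegister and bindAction are hypothetical names, not Netty APIs.
public class BindAfterRegister {
    static CompletableFuture<String> bindAfterRegister(CompletableFuture<Void> registerFuture,
                                                       Supplier<String> bindAction) {
        // Fast path: registration already finished successfully, so bind right away.
        if (registerFuture.isDone() && !registerFuture.isCompletedExceptionally()) {
            return CompletableFuture.completedFuture(bindAction.get());
        }
        // Otherwise return a pending promise and finish it when registration completes
        // (a listener on an already-failed future fires immediately and fails the promise).
        CompletableFuture<String> promise = new CompletableFuture<>();
        registerFuture.whenComplete((ignored, cause) -> {
            if (cause != null) {
                promise.completeExceptionally(cause);  // registration failed: fail the bind
            } else {
                promise.complete(bindAction.get());    // registration succeeded: bind now
            }
        });
        return promise;
    }

    public static void main(String[] args) {
        CompletableFuture<Void> registered = new CompletableFuture<>();
        CompletableFuture<String> bound = bindAfterRegister(registered, () -> "bound to localhost:8086");
        System.out.println(bound.isDone());   // false: registration is still pending
        registered.complete(null);
        System.out.println(bound.join());     // bound to localhost:8086
    }
}
```

Returning a promise in both branches means the caller (here, `bind(...).sync()`) never has to care which path was taken.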

Let's focus on the doBind0 method. Inside it we find:

channel.eventLoop().execute(new Runnable() {
    @Override
    public void run() {
        if (regFuture.isSuccess()) {
            channel.bind(localAddress, promise).addListener(ChannelFutureListener.CLOSE_ON_FAILURE);
        } else {
            promise.setFailure(regFuture.cause());
        }
    }
});

Here we see a call to the event loop's execute method. EventLoop is only an interface, so we have to go down to the SingleThreadEventExecutor class to find the implementation:

public void execute(Runnable task) {
    if (task == null) {
        throw new NullPointerException("task");
    }

    boolean inEventLoop = inEventLoop();
    addTask(task);
    if (!inEventLoop) {
        startThread();
        if (isShutdown()) {
            boolean reject = false;
            try {
                if (removeTask(task)) {
                    reject = true;
                }
            } catch (UnsupportedOperationException e) {
                // The task queue does not support removal so the best thing we can do is to just move on and
                // hope we will be able to pick-up the task before its completely terminated.
                // In worst case we will log on termination.
            }
            if (reject) {
                reject();
            }
        }
    }

    if (!addTaskWakesUp && wakesUpForTask(task)) {
        wakeup(inEventLoop);
    }
}
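The core of the execute method above (always enqueue first, then lazily start the single loop thread when the caller is outside it) fits in a small standalone class. This is a sketch under simplifying assumptions: MiniEventLoop is a hypothetical name, it has no shutdown or rejection handling, and its loop blocks on a BlockingQueue rather than a Selector, so it needs no explicit wakeup.

```java
import java.util.concurrent.BlockingQueue;
import java.util.concurrent.CountDownLatch;
import java.util.concurrent.LinkedBlockingQueue;
import java.util.concurrent.atomic.AtomicBoolean;

// Minimal single-threaded executor loosely modeled on SingleThreadEventExecutor.execute:
// enqueue first, then lazily start the one loop thread when called from outside.
// MiniEventLoop and its methods are illustrative names, not Netty's API.
public class MiniEventLoop {
    private final BlockingQueue<Runnable> taskQueue = new LinkedBlockingQueue<>();
    private final AtomicBoolean started = new AtomicBoolean();
    private volatile Thread thread;

    public boolean inEventLoop() {
        return Thread.currentThread() == thread;
    }

    public void execute(Runnable task) {
        if (task == null) {
            throw new NullPointerException("task");
        }
        boolean inEventLoop = inEventLoop();
        taskQueue.add(task);          // 1. always queue the task first
        if (!inEventLoop) {
            startThread();            // 2. caller is outside the loop: make sure the thread runs
        }
        // 3. No explicit wakeup: the loop blocks in take(), which returns when a task arrives.
        //    Netty's NIO loop blocks in Selector.select() instead, hence its wakeup(...) call.
    }

    private void startThread() {
        // compareAndSet lets exactly one caller create and start the loop thread.
        if (started.compareAndSet(false, true)) {
            Thread t = new Thread(this::run, "mini-event-loop");
            t.setDaemon(true);
            thread = t;               // publish before start() so inEventLoop() works inside tasks
            t.start();
        }
    }

    private void run() {
        try {
            while (true) {
                taskQueue.take().run();
            }
        } catch (InterruptedException e) {
            Thread.currentThread().interrupt();
        }
    }

    public static void main(String[] args) throws InterruptedException {
        MiniEventLoop loop = new MiniEventLoop();
        CountDownLatch done = new CountDownLatch(1);
        loop.execute(() -> {
            System.out.println("in loop? " + loop.inEventLoop());   // in loop? true
            done.countDown();
        });
        done.await();
    }
}
```

A task submitted from inside the loop (for example a nested `loop.execute(...)`) skips startThread entirely, which mirrors the `inEventLoop` check in the original.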

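In execute, the first thing checked is whether the calling thread is the EventLoop's own thread. Here it clearly is not, because the bind operation came in from the main method rather than from an EventLoop thread. So after addTask has put the task into the task queue, startThread is called, which starts Netty's EventLoop thread if it is not running yet. That thread's entry point is its run method (for NIO, NioEventLoop.run). Simplified to its three main functions, it looks like this: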

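@Override
protected void run() {
    for (;;) {
        try {
            try {
                switch (selectStrategy.calculateStrategy(selectNowSupplier, hasTasks())) {
                case SelectStrategy.CONTINUE:
                    continue;

                case SelectStrategy.BUSY_WAIT:
                    // fall-through to SELECT since the busy-wait is not supported with NIO
                case SelectStrategy.SELECT:
                    // 1. wait on the selector for I/O events (or a wakeup)
                    select(wakenUp.getAndSet(false));

                    if (wakenUp.get()) {
                        selector.wakeup();
                    }
                    // fall through
                default:
                    // 2. handle the I/O events of the channels that are ready
                    processSelectedKeys();
                    // 3. run the tasks queued through execute()
                    runAllTasks();
                }
            } catch (IOException e) {
                // selector error handling omitted in this simplified excerpt
            }
        } catch (Throwable t) {
            // loop exception handling omitted in this simplified excerpt
        }
    }
}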
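The three phases of this loop can be reproduced with plain java.nio. This is a hypothetical sketch, not NioEventLoop: MiniNioLoop, registerRead, and the one-second select timeout are my own choices, and it calls selector.wakeup() on every execute() instead of Netty's wakenUp bookkeeping. Registration is submitted as a task so that it always runs on the loop thread.

```java
import java.io.IOException;
import java.io.UncheckedIOException;
import java.nio.channels.SelectableChannel;
import java.nio.channels.SelectionKey;
import java.nio.channels.Selector;
import java.util.Iterator;
import java.util.Queue;
import java.util.concurrent.ConcurrentLinkedQueue;

// Stripped-down NIO loop with three phases per iteration:
// select -> process selected keys -> run queued tasks. All names are illustrative.
public class MiniNioLoop implements Runnable {
    private final Selector selector;
    private final Queue<Runnable> tasks = new ConcurrentLinkedQueue<>();
    private volatile boolean running = true;

    public MiniNioLoop() throws IOException {
        selector = Selector.open();
    }

    // Any thread may submit work; wakeup() makes a blocked (or the next) select() return.
    public void execute(Runnable task) {
        tasks.add(task);
        selector.wakeup();
    }

    // Registration is itself a task, so it always runs on the loop thread.
    public void registerRead(SelectableChannel ch, Runnable onReadable) {
        execute(() -> {
            try {
                ch.configureBlocking(false);
                ch.register(selector, SelectionKey.OP_READ, onReadable);
            } catch (IOException e) {
                throw new UncheckedIOException(e);
            }
        });
    }

    public void stop() {
        running = false;
        selector.wakeup();
    }

    @Override
    public void run() {
        try {
            while (running) {
                // Phase 1: poll without blocking if tasks are waiting, else block up to 1 s.
                if (tasks.isEmpty()) {
                    selector.select(1000);
                } else {
                    selector.selectNow();
                }
                // Phase 2: dispatch ready channels to the handler stored as the key's attachment.
                Iterator<SelectionKey> it = selector.selectedKeys().iterator();
                while (it.hasNext()) {
                    SelectionKey key = it.next();
                    it.remove();
                    ((Runnable) key.attachment()).run();
                }
                // Phase 3: drain the task queue.
                for (Runnable task; (task = tasks.poll()) != null; ) {
                    task.run();
                }
            }
            selector.close();
        } catch (IOException e) {
            throw new UncheckedIOException(e);
        }
    }
}
```

Because a wakeup() issued while no select is in progress makes the next select return immediately, a task added between the isEmpty() check and select(1000) is not left waiting for the full timeout.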

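As we can see, the main work is to call select on the selector to wait for I/O events, then process the I/O events that are ready on the registered channels (processSelectedKeys), and finally run all queued tasks (runAllTasks). Let's look at the select method: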

// Excerpt from the body of select(boolean oldWakenUp); oldWakenUp is its parameter.
int selectCnt = 0;
long currentTimeNanos = System.nanoTime();
long selectDeadLineNanos = currentTimeNanos + delayNanos(currentTimeNanos);

for (;;) {
    long timeoutMillis = (selectDeadLineNanos - currentTimeNanos + 500000L) / 1000000L;
    if (timeoutMillis <= 0) {
        if (selectCnt == 0) {
            selector.selectNow();
            selectCnt = 1;
        }
        break;
    }
    if (hasTasks() && wakenUp.compareAndSet(false, true)) {
        selector.selectNow();
        selectCnt = 1;
        break;
    }

    int selectedKeys = selector.select(timeoutMillis);
    selectCnt ++;

    if (selectedKeys != 0 || oldWakenUp || wakenUp.get() || hasTasks() || hasScheduledTasks()) {
        break;
    }
    // ... the loop continues

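This select method really covers three cases. timeoutMillis is the time left until the deadline of this thread's next scheduled task, rounded to the nearest millisecond. If that deadline has already been reached, or if a task has been submitted, select calls selectNow, which checks the channels registered on the selector without blocking, and then leaves the loop. Otherwise it blocks in select(timeoutMillis) for up to timeoutMillis, waiting for I/O on the channels, and leaves the loop once keys are selected, the loop is woken up, or tasks or scheduled tasks are pending.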
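The decision at the top of each loop iteration can be isolated as a pure function, which makes the rounding easy to check. This sketch keeps the arithmetic from the excerpt but drops two details (the selectCnt == 0 check and the wakenUp compare-and-set); SelectDecision, Action, and decide are hypothetical names.

```java
// Sketch of the branch taken at the top of each select() iteration. The deadline is the next
// scheduled task's due time; SelectDecision, Action and decide() are hypothetical names.
public class SelectDecision {
    public enum Action { SELECT_NOW_AND_BREAK, BLOCKING_SELECT }

    // Same arithmetic as in select(): nanoseconds left, rounded to the nearest millisecond.
    public static long timeoutMillis(long deadlineNanos, long nowNanos) {
        return (deadlineNanos - nowNanos + 500_000L) / 1_000_000L;
    }

    public static Action decide(long deadlineNanos, long nowNanos, boolean hasTasks) {
        if (timeoutMillis(deadlineNanos, nowNanos) <= 0) {
            return Action.SELECT_NOW_AND_BREAK;  // a scheduled task is due: don't block
        }
        if (hasTasks) {
            return Action.SELECT_NOW_AND_BREAK;  // queued tasks should not wait for I/O
        }
        return Action.BLOCKING_SELECT;           // wait in select(timeoutMillis) for I/O
    }

    public static void main(String[] args) {
        System.out.println(timeoutMillis(1_500_000L, 0L));      // 2
        System.out.println(timeoutMillis(1_499_999L, 0L));      // 1
        System.out.println(decide(400_000L, 0L, false));        // SELECT_NOW_AND_BREAK
        System.out.println(decide(10_000_000L, 0L, false));     // BLOCKING_SELECT
    }
}
```

Adding 500,000 ns before dividing is what turns truncation into round-to-nearest, so a deadline 0.4 ms away already counts as "due" and never results in a blocking select(0), which would block indefinitely.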

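The select method also works around the well-known JDK epoll empty-polling (spin) bug, in which Selector.select keeps returning immediately with no selected keys and drives the CPU to 100%. Each select call increments selectCnt; if the call returned only because the full timeout elapsed, the counter is reset. Once selectCnt reaches SELECTOR_AUTO_REBUILD_THRESHOLD, Netty assumes it has hit the bug and replaces the selector with a newly built one: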

long time = System.nanoTime();
if (time - TimeUnit.MILLISECONDS.toNanos(timeoutMillis) >= currentTimeNanos) {
    // timeoutMillis elapsed without anything selected.
    selectCnt = 1;
} else if (SELECTOR_AUTO_REBUILD_THRESHOLD > 0 &&
        selectCnt >= SELECTOR_AUTO_REBUILD_THRESHOLD) {
    // The code exists in an extra method to ensure the method is not too big to inline as this
    // branch is not very likely to get hit very frequently.
    selector = selectRebuildSelector(selectCnt);
    selectCnt = 1;
    break;
}
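Both halves of the workaround, counting premature returns and moving registrations to a fresh selector, can be sketched with plain java.nio. SpinGuard, afterSelect, and rebuild are hypothetical names and the threshold is left to the caller; the rebuild keeps each key's interest set and attachment, which is the essential idea, but it is a simplified stand-in rather than a copy of Netty's own rebuild logic.

```java
import java.io.IOException;
import java.nio.channels.SelectionKey;
import java.nio.channels.Selector;
import java.util.concurrent.TimeUnit;

// Sketch of the epoll-spin workaround: count select() calls that return before their timeout,
// and once the count reaches a threshold, move all registrations to a fresh Selector.
// SpinGuard and its methods are hypothetical names; the threshold value is up to the caller.
public class SpinGuard {
    private final int threshold;
    private int selectCnt;   // like the local counter in select(): calls since the last reset

    public SpinGuard(int threshold) {
        this.threshold = threshold;
    }

    // Call after each select(timeoutMillis) that returned with nothing to do.
    // Returns true when the caller should rebuild its selector.
    public boolean afterSelect(long startNanos, long nowNanos, long timeoutMillis) {
        selectCnt++;
        if (nowNanos - startNanos >= TimeUnit.MILLISECONDS.toNanos(timeoutMillis)) {
            selectCnt = 1;    // the full timeout elapsed: a normal return, not a spin
            return false;
        }
        if (threshold > 0 && selectCnt >= threshold) {
            selectCnt = 1;    // too many premature returns in a row: assume the bug
            return true;
        }
        return false;
    }

    // Re-register every valid key on a new Selector (same interest ops and attachment),
    // then close the old one. Must run on the thread that owns the old selector.
    public static Selector rebuild(Selector old) throws IOException {
        Selector fresh = Selector.open();
        for (SelectionKey key : old.keys()) {
            if (!key.isValid()) {
                continue;
            }
            int ops = key.interestOps();
            Object attachment = key.attachment();
            key.cancel();
            key.channel().register(fresh, ops, attachment);
        }
        old.close();
        return fresh;
    }
}
```

Cancelling a key does not remove it from old.keys() until the next selection operation, so iterating the key set while cancelling is safe here.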

After that, what remains for the thread is to process the result of select (the selected keys) and then the queued tasks.
