netty源码解析系列
前言
我们完成了服务端启动,那么服务端启动完成后,客户端接入以及读I/O 事件是怎么哪里开始的?以及 netty 的 boss 线程接收到客户端 TCP 连接请求后如何将链路注册到 worker 线程池?带着这些疑问,我们开始客户端连接接入及读写 I/O 解析。
1.NioEventLoop run()开始
processSelectedKeys();复制代码
private void processSelectedKeys() { if (selectedKeys != null) { processSelectedKeysOptimized(selectedKeys.flip()); } else { processSelectedKeysPlain(selector.selectedKeys()); }}复制代码
根据 selectedKeys 是否为空,判断是否采用优化后的 selectedKeys ,进到 processSelectedKeysOptimized。
private void processSelectedKeysOptimized(SelectionKey[] selectedKeys) { for (int i = 0;; i ++) { final SelectionKey k = selectedKeys[i]; if (k == null) { break; } selectedKeys[i] = null; final Object a = k.attachment(); if (a instanceof AbstractNioChannel) { processSelectedKey(k, (AbstractNioChannel) a); } else { @SuppressWarnings("unchecked") NioTasktask = (NioTask ) a; processSelectedKey(k, task); } ...}}复制代码
k.attachment() 获取附加的对象,那我们是在哪里附加上去的呢?上一篇注册时 attach 上去的对象,其实就是 NioServerSocketChannel 自身。
@Overrideprotected void doRegister() throws Exception { boolean selected = false; for (;;) { ... selectionKey = javaChannel().register(eventLoop().selector, 0, this); ... }}复制代码
我们再回到 k.attachment() ,在取出附加对象后,判断类型是否为 AbstractNioChannel ,从这里我们可以看到,不是附加 AbstractNioChannel 类型,那么就是附加的 NioTask 对象,在这里我们只看关于 AbstractNioChannel 的,进到 processSelectedKey() 方法。
private static void processSelectedKey(SelectionKey k, AbstractNioChannel ch) { final NioUnsafe unsafe = ch.unsafe(); ... int readyOps = k.readyOps(); if ((readyOps & (SelectionKey.OP_READ | SelectionKey.OP_ACCEPT)) != 0 || readyOps == 0) { unsafe.read(); if (!ch.isOpen()) { return; } if ((readyOps & SelectionKey.OP_WRITE) != 0) { ch.unsafe().forceFlush(); } if ((readyOps & SelectionKey.OP_CONNECT) != 0) { int ops = k.interestOps(); ops &= ~SelectionKey.OP_CONNECT; k.interestOps(ops); unsafe.finishConnect(); } ...}复制代码
当操作类型是读操作或者连接操作,进入 unsafe.read() ,有两个类实现了这个方法,一个是 AbstractNioByteChannel 的内部类 NioByteUnsafe ,一个是 AbstractNioMessageChannel 的内部类 NioMessageUnsafe ,这两个类都是 NioUnsafe 实现类 AbstractNioChannel 的子类,那到底是哪一个子类?我们看看 NioServerSocketChannel 创建时是创建的 NioByteUnsafe 还是 NioMessageUnsafe。
public class NioServerSocketChannel extends AbstractNioMessageChannel implements io.netty.channel.socket.ServerSocketChannel { public NioServerSocketChannel() { this(newSocket(DEFAULT_SELECTOR_PROVIDER)); }}复制代码
public NioServerSocketChannel(ServerSocketChannel channel) { super(null, channel, SelectionKey.OP_ACCEPT); config = new NioServerSocketChannelConfig(this, javaChannel().socket());}复制代码
public abstract class AbstractNioMessageChannel extends AbstractNioChannel { protected AbstractNioMessageChannel(Channel parent, SelectableChannel ch, int readInterestOp) { super(parent, ch, readInterestOp); }}复制代码
public abstract class AbstractNioChannel extends AbstractChannel { protected AbstractNioChannel(Channel parent, SelectableChannel ch, int readInterestOp) { super(parent); }}复制代码
public abstract class AbstractChannel extends DefaultAttributeMap implements Channel { protected AbstractChannel(Channel parent) { this.parent = parent; unsafe = newUnsafe(); pipeline = new DefaultChannelPipeline(this); }}复制代码
NioServerSocketChannel 是 AbstractNioMessageChannel 的子类,AbstractNioMessageChannel 是 AbstractNioChannel 的子类,newUnsafe() 是 AbstractChannel 的抽象方法,那么我们从这里就知道,AbstractNioMessageChannel 实现了 AbstractChannel的newUnsafe() 抽象方法,由此判断,我们选择 AbstractNioMessageChannel 的内部类 NioMessageUnsafe 的 read()。
private final class NioMessageUnsafe extends AbstractNioUnsafe { private final List
这里分两部分,一个是处理消息,一个是处理事件。
1.处理消息@Overrideprotected int doReadMessages(Listbuf) throws Exception { SocketChannel ch = javaChannel().accept(); ... buf.add(new NioSocketChannel(this, ch)); return 1; ...}复制代码
接受了一个客户端 SocketChannel,封装到NioSocketChannel,添加到list集合中,我们看看new NioSocketChannel()。
public class NioSocketChannel extends AbstractNioByteChannel implements io.netty.channel.socket.SocketChannel { public NioSocketChannel(Channel parent, SocketChannel socket) { super(parent, socket); config = new NioSocketChannelConfig(this, socket.socket()); }}复制代码
public abstract class AbstractNioByteChannel extends AbstractNioChannel { protected AbstractNioByteChannel(Channel parent, SelectableChannel ch) { super(parent, ch, SelectionKey.OP_READ); } @Override protected AbstractNioUnsafe newUnsafe() { return new NioByteUnsafe(); } protected class NioByteUnsafe extends AbstractNioUnsafe { @Override public final void read() { ... } }}复制代码
AbstractNioByteChannel 也继承了 AbstractNioChannel ,并实现了 newUnsafe() 方法,由此我们可以推断出当客户端第一次连接时,走的是 AbstractNioMessageChannel 的子类 NioMessageUnsafe的read() ,当客户端发送数据时,走的是 AbstractNioByteChannel 的内部类 AbstractNioUnsafe 的 read() 方法。
2.处理事件for (int i = 0; i < size; i ++) { pipeline.fireChannelRead(readBuf.get(i)); }复制代码
@Overridepublic ChannelPipeline fireChannelRead(Object msg) { head.fireChannelRead(msg); return this;}复制代码
@Overridepublic ChannelHandlerContext fireChannelRead(final Object msg) { final AbstractChannelHandlerContext next = findContextInbound(); EventExecutor executor = next.executor(); if (executor.inEventLoop()) { next.invokeChannelRead(msg); } else { executor.execute(new OneTimeTask() { @Override public void run() { next.invokeChannelRead(msg); } }); } return this;}复制代码
从 next的 debug可以看出,当前 handler是 ServerBootstrapAcceptor这个处理器来处理 ChannelRead() 方法,如果看了 就会知道,这是在 init() 方法中 pipeline.addLast(new ServerBootstrapAcceptor())。为什么不是 p.addLast(new ChannelInitializer())? 因为在 ChannelInitializer.channelRegistered() 会删除当前 initChannel 处理器。
public final void channelRegistered(ChannelHandlerContext ctx) throws Exception { initChannel((C) ctx.channel()); ctx.pipeline().remove(this); ctx.fireChannelRegistered();}复制代码
我们继续看ServerBootstrapAcceptor的ChannelRead() 方法。
@Override@SuppressWarnings("unchecked")public void channelRead(ChannelHandlerContext ctx, Object msg) { final Channel child = (Channel) msg; child.pipeline().addLast(childHandler); for (Entry, Object> e: childOptions) { try { if (!child.config().setOption((ChannelOption ) e.getKey(), e.getValue())) { logger.warn("Unknown channel option: " + e); } } catch (Throwable t) { logger.warn("Failed to set a channel option: " + child, t); } } for (Entry , Object> e: childAttrs) { child.attr((AttributeKey ) e.getKey()).set(e.getValue()); } try { childGroup.register(child).addListener(new ChannelFutureListener() { @Override public void operationComplete(ChannelFuture future) throws Exception { if (!future.isSuccess()) { forceClose(child, future.cause()); } } }); } catch (Throwable t) { forceClose(child, t); }}复制代码
这里分三个步骤
(1) 将childHandler添加到处理器上,这个从哪里来?就是从最开始设置serverBootstrap.childHandler(new IOChannelInitialize())。 (2) 设置一些参数。 (3) work线程池register客户端的channel。@Overridepublic ChannelFuture register(Channel channel) { return next().register(channel);}复制代码
@Overridepublic EventLoop next() { return (EventLoop) super.next();}复制代码
@Overridepublic EventExecutor next() { return chooser.next();}复制代码
private final class GenericEventExecutorChooser implements EventExecutorChooser { @Override public EventExecutor next() { return children[Math.abs(childIndex.getAndIncrement() % children.length)]; }}复制代码
从work线程池选一个线程来执行register。
@Overridepublic ChannelFuture register(Channel channel) { return register(channel, new DefaultChannelPromise(channel, this));}复制代码
@Overridepublic ChannelFuture register(final Channel channel, final ChannelPromise promise) { ... channel.unsafe().register(this, promise); return promise;}复制代码
@Overridepublic final void register(EventLoop eventLoop, final ChannelPromise promise) { ... AbstractChannel.this.eventLoop = eventLoop; if (eventLoop.inEventLoop()) { register0(promise); } else { try { eventLoop.execute(new OneTimeTask() { @Override public void run() { register0(promise); } }); } catch (Throwable t) { ... } }}复制代码
@Overrideprotected void doRegister() throws Exception { ... selectionKey = javaChannel().register(eventLoop().selector, 0, this); ...}复制代码
后面的流程和的注册流程是一样的,区别在于服务启动时注册是在boss线程池任务队列中执行注册,客户端新接入注册是在work线程池任务队列中执行register0() 方法,并将work线程池的selector注册到Java NIO 到这里,我们就可以回答开篇的的几个问题:客户端是如何接入?netty的boss线程接收到客户端TCP连接请求后如何将链路注册到worker线程池? 现在我们还剩下一个问题:读写I/O事件是怎么哪里开始的?
我们回到文章开头private void processSelectedKeysOptimized(SelectionKey[] selectedKeys) { for (int i = 0;; i ++) { final SelectionKey k = selectedKeys[i]; if (k == null) { break; } selectedKeys[i] = null; final Object a = k.attachment(); if (a instanceof AbstractNioChannel) { processSelectedKey(k, (AbstractNioChannel) a); } else { @SuppressWarnings("unchecked") NioTasktask = (NioTask ) a; processSelectedKey(k, task); } ... } }复制代码
前面boss线程池在这里完成了客户端连接接入,并将链路注册到worker线程池任务队列,添加了read事件的监听,那么现在work线程不停循环selectedKeys中有没有待处理的事件,当有待处理事件,那么会执行processSelectedKey() 方法。
private static void processSelectedKey(SelectionKey k, AbstractNioChannel ch) { ... int readyOps = k.readyOps(); if ((readyOps & (SelectionKey.OP_READ | SelectionKey.OP_ACCEPT)) != 0 || readyOps == 0) { unsafe.read(); ... } ...}复制代码
在这里unsafe.read() 选择AbstractNioByteChannel的read()。
@Overridepublic final void read() { final ChannelConfig config = config(); if (!config.isAutoRead() && !isReadPending()) { // ChannelConfig.setAutoRead(false) was called in the meantime removeReadOp(); return; } final ChannelPipeline pipeline = pipeline(); final ByteBufAllocator allocator = config.getAllocator(); final int maxMessagesPerRead = config.getMaxMessagesPerRead(); RecvByteBufAllocator.Handle allocHandle = this.allocHandle; if (allocHandle == null) { this.allocHandle = allocHandle = config.getRecvByteBufAllocator().newHandle(); } ByteBuf byteBuf = null; int messages = 0; boolean close = false; try { int totalReadAmount = 0; boolean readPendingReset = false; do { byteBuf = allocHandle.allocate(allocator); int writable = byteBuf.writableBytes(); int localReadAmount = doReadBytes(byteBuf); if (localReadAmount <= 0) { // not was read release the buffer byteBuf.release(); byteBuf = null; close = localReadAmount < 0; break; } if (!readPendingReset) { readPendingReset = true; setReadPending(false); } pipeline.fireChannelRead(byteBuf); byteBuf = null; if (totalReadAmount >= Integer.MAX_VALUE - localReadAmount) { totalReadAmount = Integer.MAX_VALUE; break; } totalReadAmount += localReadAmount; if (!config.isAutoRead()) { break; } if (localReadAmount < writable) { break; } } while (++ messages < maxMessagesPerRead); pipeline.fireChannelReadComplete(); allocHandle.record(totalReadAmount); if (close) { closeOnRead(pipeline); close = false; } } catch (Throwable t) { handleReadException(pipeline, byteBuf, t, close); } finally { if (!config.isAutoRead() && !isReadPending()) { removeReadOp(); } } }}复制代码
把这一大段代码分解成几部分
1.设置循环读,16次,未读完则会等到下一轮select 继续读取,maxMessagesPerRead默认等于16。 2.获取缓存操作handler,config.getRecvByteBufAllocator().newHandle()。 3.申请缓存空间,allocHandle.allocate(allocator)。 4.从socket中读取数据到byteBuf中。 5.传递读事件到下一个handler处理器。 6.读完之后发送读完时间到下一个handler处理器 我们只看读事件,其他细节后面的文章再详细解析。@Overridepublic ChannelPipeline fireChannelRead(Object msg) { head.fireChannelRead(msg); return this;}复制代码
@Overridepublic ChannelHandlerContext fireChannelRead(final Object msg) { if (msg == null) { throw new NullPointerException("msg"); } final AbstractChannelHandlerContext next = findContextInbound(); EventExecutor executor = next.executor(); if (executor.inEventLoop()) { next.invokeChannelRead(msg); } else { executor.execute(new OneTimeTask() { @Override public void run() { next.invokeChannelRead(msg); } }); } return this;}复制代码Handler事件顺序是 HeadContextHandler --> IdleStateHandler -->IOHandler --> TailContext
private void invokeChannelRead(Object msg) { try { ((ChannelInboundHandler) handler()).channelRead(this, msg); } catch (Throwable t) { notifyHandlerException(t); }}复制代码
进到IdleStateHandler
@Overridepublic void channelRead(ChannelHandlerContext ctx, Object msg) throws Exception { if (readerIdleTimeNanos > 0 || allIdleTimeNanos > 0) { reading = true; firstReaderIdleEvent = firstAllIdleEvent = true; } ctx.fireChannelRead(msg);}复制代码
设置读事件为true,为后面状态检测做准备,继续向下传递读事件,这次是IOHandler的读事件。
public class IOHandler extends ChannelInboundHandlerAdapter { @Override public void channelRead(ChannelHandlerContext ctx, Object msg) throws Exception { super.channelRead(ctx, msg); System.out.println(msg.toString()); } ...}复制代码
交给用户自定义handler处理读事件,自此读I/O事件是怎么哪里开始,如何交给用户handler处理已解析完毕。
总结:
1.boss线程处理NioServerSocketChannel的accept事件,并将客户端添加到work任务队列,任务队列执行redister0()方法, 将read事件注册到work线程的selector。 2.work线程轮询selectkeys,当有事件上来时,将缓存数据发送到用户handler 。