博客
关于我
强烈建议你试试无所不能的chatGPT,快点击我
Netty 源码解析系列-客户端连接接入及读I/O解析
阅读量:7115 次
发布时间:2019-06-28

本文共 14978 字,大约阅读时间需要 49 分钟。

netty源码解析系列

前言

     我们完成了服务端启动,那么服务端启动完成后,客户端接入以及读I/O 事件是怎么哪里开始的?以及 nettyboss 线程接收到客户端 TCP 连接请求后如何将链路注册到 worker 线程池?带着这些疑问,我们开始客户端连接接入及读写 I/O 解析。

1.NioEventLoop run()开始

processSelectedKeys();复制代码
private void processSelectedKeys() {    if (selectedKeys != null) {        processSelectedKeysOptimized(selectedKeys.flip());    } else {        processSelectedKeysPlain(selector.selectedKeys());    }}复制代码

     根据 selectedKeys 是否为空,判断是否采用优化后的 selectedKeys ,进到 processSelectedKeysOptimized

private void processSelectedKeysOptimized(SelectionKey[] selectedKeys) {    for (int i = 0;; i ++) {        final SelectionKey k = selectedKeys[i];        if (k == null) {            break;        }        selectedKeys[i] = null;        final Object a = k.attachment();        if (a instanceof AbstractNioChannel) {               processSelectedKey(k, (AbstractNioChannel) a);        } else {               @SuppressWarnings("unchecked")               NioTask
task = (NioTask
) a; processSelectedKey(k, task); } ...}}复制代码

     k.attachment() 获取附加的对象,那我们是在哪里附加上去的呢?上一篇注册时 attach 上去的对象,其实就是 NioServerSocketChannel 自身。

@Overrideprotected void doRegister() throws Exception {    boolean selected = false;    for (;;) {	...	selectionKey = javaChannel().register(eventLoop().selector, 0, this);	...            }}复制代码

     我们再回到 k.attachment() ,在取出附加对象后,判断类型是否为 AbstractNioChannel ,从这里我们可以看到,不是附加 AbstractNioChannel 类型,那么就是附加的 NioTask 对象,在这里我们只看关于 AbstractNioChannel 的,进到 processSelectedKey() 方法。

private static void processSelectedKey(SelectionKey k, AbstractNioChannel ch) {    final NioUnsafe unsafe = ch.unsafe();    ...    int readyOps = k.readyOps();    if ((readyOps & (SelectionKey.OP_READ | SelectionKey.OP_ACCEPT)) != 0 || readyOps == 0) {         unsafe.read();    if (!ch.isOpen()) {         return;    }    if ((readyOps & SelectionKey.OP_WRITE) != 0) {          ch.unsafe().forceFlush();    }    if ((readyOps & SelectionKey.OP_CONNECT) != 0) {          int ops = k.interestOps();          ops &= ~SelectionKey.OP_CONNECT;          k.interestOps(ops);          unsafe.finishConnect();    }    ...}复制代码

     当操作类型是读操作或者连接操作,进入 unsafe.read() ,有两个类实现了这个方法,一个是 AbstractNioByteChannel 的内部类 NioByteUnsafe ,一个是 AbstractNioMessageChannel 的内部类 NioMessageUnsafe ,这两个类都是 NioUnsafe 实现类 AbstractNioChannel 的子类,那到底是哪一个子类?我们看看 NioServerSocketChannel 创建时是创建的 NioByteUnsafe 还是 NioMessageUnsafe

public class NioServerSocketChannel extends AbstractNioMessageChannel                             implements io.netty.channel.socket.ServerSocketChannel {        public NioServerSocketChannel() {                this(newSocket(DEFAULT_SELECTOR_PROVIDER));        }}复制代码
public NioServerSocketChannel(ServerSocketChannel channel) {        super(null, channel, SelectionKey.OP_ACCEPT);        config = new NioServerSocketChannelConfig(this, javaChannel().socket());}复制代码
public abstract class AbstractNioMessageChannel extends AbstractNioChannel {    protected AbstractNioMessageChannel(Channel parent, SelectableChannel ch, int readInterestOp) {          super(parent, ch, readInterestOp);      }}复制代码
public abstract class AbstractNioChannel extends AbstractChannel {	protected AbstractNioChannel(Channel parent, SelectableChannel ch, int readInterestOp) {    		super(parent);	}}复制代码
public abstract class AbstractChannel extends DefaultAttributeMap implements Channel {        protected AbstractChannel(Channel parent) {                this.parent = parent;                unsafe = newUnsafe();                pipeline = new DefaultChannelPipeline(this);        }}复制代码

     NioServerSocketChannelAbstractNioMessageChannel 的子类,AbstractNioMessageChannelAbstractNioChannel 的子类,newUnsafe()AbstractChannel 的抽象方法,那么我们从这里就知道,AbstractNioMessageChannel 实现了 AbstractChannel的newUnsafe() 抽象方法,由此判断,我们选择 AbstractNioMessageChannel 的内部类 NioMessageUnsaferead()

private final class NioMessageUnsafe extends AbstractNioUnsafe {    private final List readBuf = new ArrayList();    @Override    public void read() {        ...        for (;;) {           int localRead = doReadMessages(readBuf);           ...    }    setReadPending(false);    int size = readBuf.size();    for (int i = 0; i < size; i ++) {            pipeline.fireChannelRead(readBuf.get(i));    }    readBuf.clear();    pipeline.fireChannelReadComplete();    ...}复制代码

     这里分两部分,一个是处理消息,一个是处理事件。

          1.处理消息

@Overrideprotected int doReadMessages(List buf) throws Exception {    SocketChannel ch = javaChannel().accept();    ...    buf.add(new NioSocketChannel(this, ch));    return 1;    ...}复制代码

     接受了一个客户端 SocketChannel,封装到NioSocketChannel,添加到list集合中,我们看看new NioSocketChannel()

public class NioSocketChannel extends AbstractNioByteChannel implements io.netty.channel.socket.SocketChannel {	public NioSocketChannel(Channel parent, SocketChannel socket) {    		super(parent, socket);    		config = new NioSocketChannelConfig(this, socket.socket());	}}复制代码
public abstract class AbstractNioByteChannel extends AbstractNioChannel {	protected AbstractNioByteChannel(Channel parent, SelectableChannel ch) {    		super(parent, ch, SelectionKey.OP_READ);	}    @Override    protected AbstractNioUnsafe newUnsafe() {    	return new NioByteUnsafe();    }    protected class NioByteUnsafe extends AbstractNioUnsafe {	    @Override	    public final void read() {		    ...	    }    }}复制代码

     AbstractNioByteChannel 也继承了 AbstractNioChannel ,并实现了 newUnsafe() 方法,由此我们可以推断出当客户端第一次连接时,走的是 AbstractNioMessageChannel 的子类 NioMessageUnsafe的read() ,当客户端发送数据时,走的是 AbstractNioByteChannel 的内部类 AbstractNioUnsaferead() 方法。

         2.处理事件

for (int i = 0; i < size; i ++) {    	   pipeline.fireChannelRead(readBuf.get(i));     }复制代码
@Overridepublic ChannelPipeline fireChannelRead(Object msg) {    head.fireChannelRead(msg);    return this;}复制代码
@Overridepublic ChannelHandlerContext fireChannelRead(final Object msg) {    final AbstractChannelHandlerContext next = findContextInbound();    EventExecutor executor = next.executor();    if (executor.inEventLoop()) {        next.invokeChannelRead(msg);    } else {        executor.execute(new OneTimeTask() {            @Override            public void run() {                next.invokeChannelRead(msg);            }        });    }    return this;}复制代码

    

     从
next
debug可以看出,当前
handler
ServerBootstrapAcceptor这个处理器来处理
ChannelRead() 方法,如果看了 就会知道,这是在
init() 方法中
pipeline.addLast(new ServerBootstrapAcceptor())。为什么不是
p.addLast(new ChannelInitializer())? 因为在
ChannelInitializer.channelRegistered() 会删除当前
initChannel 处理器。

public final void channelRegistered(ChannelHandlerContext ctx) throws Exception {    initChannel((C) ctx.channel());    ctx.pipeline().remove(this);    ctx.fireChannelRegistered();}复制代码

     我们继续看ServerBootstrapAcceptorChannelRead() 方法。

@Override@SuppressWarnings("unchecked")public void channelRead(ChannelHandlerContext ctx, Object msg) {    final Channel child = (Channel) msg;    child.pipeline().addLast(childHandler);    for (Entry
, Object> e: childOptions) { try { if (!child.config().setOption((ChannelOption
) e.getKey(), e.getValue())) { logger.warn("Unknown channel option: " + e); } } catch (Throwable t) { logger.warn("Failed to set a channel option: " + child, t); } } for (Entry
, Object> e: childAttrs) { child.attr((AttributeKey
) e.getKey()).set(e.getValue()); } try { childGroup.register(child).addListener(new ChannelFutureListener() { @Override public void operationComplete(ChannelFuture future) throws Exception { if (!future.isSuccess()) { forceClose(child, future.cause()); } } }); } catch (Throwable t) { forceClose(child, t); }}复制代码

     这里分三个步骤

         (1)childHandler添加到处理器上,这个从哪里来?就是从最开始设置serverBootstrap.childHandler(new IOChannelInitialize())
         (2) 设置一些参数。
         (3) work线程池register客户端的channel

@Overridepublic ChannelFuture register(Channel channel) {    return next().register(channel);}复制代码
@Overridepublic EventLoop next() {    return (EventLoop) super.next();}复制代码
@Overridepublic EventExecutor next() {    return chooser.next();}复制代码
private final class GenericEventExecutorChooser implements EventExecutorChooser {    @Override    public EventExecutor next() {        return children[Math.abs(childIndex.getAndIncrement() % children.length)];    }}复制代码

     从work线程池选一个线程来执行register

@Overridepublic ChannelFuture register(Channel channel) {    return register(channel, new DefaultChannelPromise(channel, this));}复制代码
@Overridepublic ChannelFuture register(final Channel channel, final ChannelPromise promise) {	 ...        channel.unsafe().register(this, promise);        return promise;}复制代码
@Overridepublic final void register(EventLoop eventLoop, final ChannelPromise promise) {	 ...     AbstractChannel.this.eventLoop = eventLoop;     if (eventLoop.inEventLoop()) {     register0(promise);     } else {          try {              eventLoop.execute(new OneTimeTask() {              @Override              public void run() {                 register0(promise);              }              });           } catch (Throwable t) {	            ...           }     }}复制代码
@Overrideprotected void doRegister() throws Exception {	...	selectionKey = javaChannel().register(eventLoop().selector, 0, this);	...}复制代码

     后面的流程和的注册流程是一样的,区别在于服务启动时注册是在boss线程池任务队列中执行注册,客户端新接入注册是在work线程池任务队列中执行register0() 方法,并将work线程池的selector注册到Java NIO 到这里,我们就可以回答开篇的的几个问题:客户端是如何接入?nettyboss线程接收到客户端TCP连接请求后如何将链路注册到worker线程池? 现在我们还剩下一个问题:读写I/O事件是怎么哪里开始的?

     我们回到文章开头

private void processSelectedKeysOptimized(SelectionKey[] selectedKeys) {    for (int i = 0;; i ++) {        final SelectionKey k = selectedKeys[i];        if (k == null) {            break;        }        selectedKeys[i] = null;        final Object a = k.attachment();        if (a instanceof AbstractNioChannel) {               processSelectedKey(k, (AbstractNioChannel) a);        } else {               @SuppressWarnings("unchecked")               NioTask
task = (NioTask
) a; processSelectedKey(k, task); } ... } }复制代码

     前面boss线程池在这里完成了客户端连接接入,并将链路注册到worker线程池任务队列,添加了read事件的监听,那么现在work线程不停循环selectedKeys中有没有待处理的事件,当有待处理事件,那么会执行processSelectedKey() 方法。

private static void processSelectedKey(SelectionKey k, AbstractNioChannel ch) {	...	int readyOps = k.readyOps();	if ((readyOps & (SelectionKey.OP_READ | SelectionKey.OP_ACCEPT)) != 0 || readyOps == 0) {    		unsafe.read();    		...	}	...}复制代码

     在这里unsafe.read() 选择AbstractNioByteChannelread()

@Overridepublic final void read() {    final ChannelConfig config = config();    if (!config.isAutoRead() && !isReadPending()) {        // ChannelConfig.setAutoRead(false) was called in the meantime        removeReadOp();        return;    }    final ChannelPipeline pipeline = pipeline();    final ByteBufAllocator allocator = config.getAllocator();    final int maxMessagesPerRead = config.getMaxMessagesPerRead();    RecvByteBufAllocator.Handle allocHandle = this.allocHandle;    if (allocHandle == null) {       this.allocHandle = allocHandle = config.getRecvByteBufAllocator().newHandle();    }    ByteBuf byteBuf = null;    int messages = 0;    boolean close = false;    try {       int totalReadAmount = 0;       boolean readPendingReset = false;       do {          byteBuf = allocHandle.allocate(allocator);          int writable = byteBuf.writableBytes();          int localReadAmount = doReadBytes(byteBuf);          if (localReadAmount <= 0) {           // not was read release the buffer              byteBuf.release();              byteBuf = null;              close = localReadAmount < 0;              break;           }          if (!readPendingReset) {               readPendingReset = true;               setReadPending(false);          }          pipeline.fireChannelRead(byteBuf);          byteBuf = null;          if (totalReadAmount >= Integer.MAX_VALUE - localReadAmount) {               totalReadAmount = Integer.MAX_VALUE;               break;          }          totalReadAmount += localReadAmount;          if (!config.isAutoRead()) {               break;          }          if (localReadAmount < writable) {              break;          }       } while (++ messages < maxMessagesPerRead);         pipeline.fireChannelReadComplete();         allocHandle.record(totalReadAmount);        if (close) {            closeOnRead(pipeline);            close = false;        }     } catch (Throwable t) {          handleReadException(pipeline, byteBuf, t, close);     } finally {         if (!config.isAutoRead() && !isReadPending()) {                removeReadOp();          }     }    }}复制代码

     把这一大段代码分解成几部分

         1.设置循环读,16次,未读完则会等到下一轮select 继续读取,maxMessagesPerRead默认等于16。
         2.获取缓存操作handlerconfig.getRecvByteBufAllocator().newHandle()
         3.申请缓存空间,allocHandle.allocate(allocator)
         4.从socket中读取数据到byteBuf中。
         5.传递读事件到下一个handler处理器。
         6.读完之后发送读完时间到下一个handler处理器 我们只看读事件,其他细节后面的文章再详细解析。

@Overridepublic ChannelPipeline fireChannelRead(Object msg) {    head.fireChannelRead(msg);    return this;}复制代码
@Overridepublic ChannelHandlerContext fireChannelRead(final Object msg) {    if (msg == null) {        throw new NullPointerException("msg");    }    final AbstractChannelHandlerContext next = findContextInbound();    EventExecutor executor = next.executor();    if (executor.inEventLoop()) {        next.invokeChannelRead(msg); } else {        executor.execute(new OneTimeTask() {            @Override            public void run() {                next.invokeChannelRead(msg);            }        });    }    return this;}复制代码

    
Handler事件顺序是
HeadContextHandler --> IdleStateHandler -->IOHandler --> TailContext

private void invokeChannelRead(Object msg) {    try {        ((ChannelInboundHandler) handler()).channelRead(this, msg);    } catch (Throwable t) {        notifyHandlerException(t);    }}复制代码

     进到IdleStateHandler

@Overridepublic void channelRead(ChannelHandlerContext ctx, Object msg) throws Exception {    if (readerIdleTimeNanos > 0 || allIdleTimeNanos > 0) {        reading = true;        firstReaderIdleEvent = firstAllIdleEvent = true;    }    ctx.fireChannelRead(msg);}复制代码

     设置读事件为true,为后面状态检测做准备,继续向下传递读事件,这次是IOHandler的读事件。

public class IOHandler extends ChannelInboundHandlerAdapter {    @Override    public void channelRead(ChannelHandlerContext ctx, Object msg) throws Exception {        super.channelRead(ctx, msg);        System.out.println(msg.toString());    }	...}复制代码

     交给用户自定义handler处理读事件,自此读I/O事件是怎么哪里开始,如何交给用户handler处理已解析完毕。

总结:

     1.boss线程处理NioServerSocketChannelaccept事件,并将客户端添加到work任务队列,任务队列执行redister0()方法, 将read事件注册到work线程的selector
     2.work线程轮询selectkeys,当有事件上来时,将缓存数据发送到用户handler

转载于:https://juejin.im/post/5cecfd09f265da1ba647ccf2

你可能感兴趣的文章
拥抱阳光下的产业--光伏新技术、新产品、新工艺发布会顺利召开
查看>>
我的友情链接
查看>>
要考虑
查看>>
文件的上传
查看>>
Oracle日常巡检维护中常用的一些STUFF
查看>>
我的友情链接
查看>>
css 实现小三角
查看>>
Nginx的配置
查看>>
手势 UIPinchGestureRecognizer 捏合手势
查看>>
vim配置
查看>>
[Swift]UIKit学习之滑块控件UISlider的用法
查看>>
我的友情链接
查看>>
nginx+tomcat+memcached构建session共享集群
查看>>
回看Java环境变量classpath
查看>>
mysql数据库Explain详解 .
查看>>
python 多线程插入mysql
查看>>
数据库索引学习相关资料汇总
查看>>
equals和hashcode详解
查看>>
简单使用jumpserver
查看>>
利用碎片时间,TURBOMAIL飞邮手机客户端助你抓住每一个机遇
查看>>