通过前面的文章,我们对Netty的Pipeline已经有了初步的了解。我们再回顾一下。
一个Channel,拥有一个ChannelPipeline,作为一个ChannelHandler的容器。但是一个ChannelHandler,不能直接放进ChannelPipeline中,必须包裹AbstractChannelHandlerContext的上下文环境。在初始化Netty的handler时,需要将Handler加载到Pipeline中。
假定加载三个Handler,分别负责解码、业务、编码。三个Handler加载到ChannelPipeline的参考代码如下:
@Override
public void initChannel(SocketChannel ch) throws Exception
{
ChannelPipeline p = ch.pipeline();
p.addLast(new DecoderHandler());
p.addLast(new BusinessHandler());
p.addLast(new EncoderHandler());
}
Netty中Channel加完Hander之后,Pipeline的容器内容图如下:
Pipeline不直接加入Handler,而是需要进行包裹,对应于Decoder、Business、Encoder三个Hander,分别创建三个默认的上下文包裹器(DefaultContext )。DefaultContext 的具体实现类,在Netty中,是DefaultChannelHandlerContext。
除此之外,Pipeline的头尾,各有一个特别的上下文Context 。这两个Hander Context ,不是默认的DefaultContext 。分别有自己的类型。
在pipeline中头尾,分别各有一个特别的HandlerContext——简称Head和Tail。Head的类型是HeadContext。Tail的类型是TailContext。这两种类型,和上文说的DefaultChannelHandlerContext类型都是AbstractChannelHandlerContext的子类。
Head上下文包裹器的主要作用: 主要作为入站处理的起点。数据从Channel读入之后,一个入站数据包从Channel的事件发送出来,首先从Head开始,被后面的所有的入站处理器,逐个进行入站处理。
大致的入站流程如下图:
注意,TailContext,其实也是一个入站处理器。先按下不表,待会详细阐述。
Tail上下文包裹器的主要作用: 主要是作为出站处理的起点。当所有的入站处理器,都处理完成后,开始出站流程。需要出站的数据包,首先从Tail开始,被所有的出站处理器上下文Context中的Hander逐个进行处理。然后将处理结果,写入Channel中。
注意,HeadContext,其实也是一个出站处理器。先按下不表,待会详细阐述。Tail和Head内部,没有包裹其他的内部Handler成员。这一点,是与默认的上下文包裹器DefaultChannelHandlerContext不同的地方。
注意的点:
inbound 入站事件:
outbound 出站事件:
前面我们总体回顾了一下Pipeline,下面我们要从源码角度分析Pipeline。
相信大家都已经知道,在Netty 中每个Channel 都有且仅有一个ChannelPipeline 与之对应,它们的组成关系如下:
通过上图我们可以看到, 一个Channel 包含了一个ChannelPipeline , 而ChannelPipeline 中又维护了一个由ChannelHandlerContext 组成的双向链表。这个链表的头是HeadContext,链表的尾是TailContext,并且每个ChannelHandlerContext 中又关联着一个ChannelHandler。上图给了我们ChannelPipeline一个直观的认识。我们接下来通过源码看看Channel初始化过程以及Channel初始化后做了什么。前面文章我们已经分析了过了,接下来跳过子类的构造器调用步骤,直接查看AbstractChannel 构造器:
protected AbstractChannel(Channel parent) {
this.parent = parent;
id = newId();
unsafe = newUnsafe();
pipeline = newChannelPipeline();
}
AbstractChannel 有一个pipeline 字段,在构造器中会初始化它为DefaultChannelPipeline 的实例。这里的代码就印证了一点:每个Channel 都有一个ChannelPipeline。接着我们跟踪一下DefaultChannelPipeline 的初始化过程,首先进入到DefaultChannelPipeline 构造器中:
protected DefaultChannelPipeline(Channel channel) {
this.channel = ObjectUtil.checkNotNull(channel, "channel");
succeededFuture = new SucceededChannelFuture(channel, null);
voidPromise = new VoidChannelPromise(channel, true);
tail = new TailContext(this);
head = new HeadContext(this);
head.next = tail;
tail.prev = head;
}
在DefaultChannelPipeline 构造器中, 首先将与之关联的Channel 保存到字段channel 中。然后实例化两个ChannelHandlerContext:一个是HeadContext 实例head,另一个是TailContext 实例tail。接着将head 和tail 互相指向, 构成一个双向链表。
特别要注意的是:我们在开始的示意图中head 和tail 并没有包含ChannelHandler,为什么呢?因为HeadContext 和TailContext继承于AbstractChannelHandlerContext 的同时也实现了ChannelHandler 接口了,因此它们有Context 和Handler的双重属性。
前面我们对ChannelPipeline的初始化有了一个基本的了解,不过当时重点没有关注ChannelPipeline,因此没有深入地分析它的初始化过程。
接下来我们要做什么呢?——我们具体的来看一下ChannelPipeline初始化过程中做了哪些事。前面我们已经说过,在实例化Channel过程中,会伴随着ChanelPipeline的实例化,并且在ChanelPipeline的实例化过程中,会将Channel与之绑定,,这一点可以通过NioSocketChannel 的父类AbstractChannel 的构造器予以佐证(这里就不贴源码了,可以自行查看)。
当实例化一个NioSocketChannel 的时候,其pipeline 字段就是我们新创建的DefaultChannelPipeline 对象。可以看到,在DefaultChannelPipeline 的构造方法中,将传入的channel 赋值给字段this.channel,接着又实例化了两个特殊的字段:tail 与head,这两个字段是一个双向链表的头和尾。前面说过在DefaultChannelPipeline 中,维护了一个以AbstractChannelHandlerContext 为节点的双向链表,这个链表是Netty 实现Pipeline 机制的关键。我们回顾一下head和tail 的类层次结构:
从类层次结构图中可以很清楚地看到,head 实现了ChannelInboundHandler与ChannelOutboundHandler,而tail 实现了ChannelOutboundHandler 接口,并且它们都实现了ChannelHandlerContext 接口, 因此可以说head 和tail 即是一个ChannelHandler,又是一个ChannelHandlerContext。接着看HeadContext与TailContext 构造器中的代码:
HeadContext(DefaultChannelPipeline pipeline) {
super(pipeline, (EventExecutor)null, DefaultChannelPipeline.HEAD_NAME, false, true);
this.unsafe = pipeline.channel().unsafe();
this.setAddComplete();
}
TailContext(DefaultChannelPipeline pipeline) {
super(pipeline, (EventExecutor)null, DefaultChannelPipeline.TAIL_NAME, true, false);
this.setAddComplete();
}
head:
tail:
通过前面的分析,我们知道ChannelPipeline中含有head和tail两个ChannelHandlerContext(同时也是ChannelHandler),但是这个Pipeline 并不能实现什么特殊的功能,因为这个时候我们还没有给它添加自定义的ChannelHandler。通常来说,我们在初始化Bootstrap的时候,会添加我们自定义的ChannelHandler,就以我们具体的客户端启动代码片段来举例:
Bootstrap client = new Bootstrap();
client.group(group)
.channel(NioSocketChannel.class)
.handler(new ChannelInitializer() {
@Override
protected void initChannel(Channel ch) throws Exception {
//接收课客户端请求的处理流程
ChannelPipeline pipeline = ch.pipeline();
int fieldLength = 4;
//通用解码器设置
pipeline.addLast(new LengthFieldBasedFrameDecoder(Integer.MAX_VALUE,0,fieldLength,0,fieldLength));
//通用编码器
pipeline.addLast(new LengthFieldPrepender(fieldLength));
//对象编码器
pipeline.addLast("encoder",new ObjectEncoder());
//对象解码器
pipeline.addLast("decoder",new ObjectDecoder(Integer.MAX_VALUE, ClassResolvers.cacheDisabled(null)));
pipeline.addLast("handler",consumerHandler);
}
})
.option(ChannelOption.TCP_NODELAY, true);
从上面的代码可以看出:在调用handler 时,传入了ChannelInitializer 对象,它提供了一个initChannel()方法给我我们初始化ChannelHandler。最后将这个匿名的Handler保存到AbstractBootstrap中。那么这个初始化过程是怎样的呢?下面我们来揭开它的神秘面纱。
ChannelInitializer实现了ChannelHandler接口,那么它是在什么时候添加到ChannelPipeline 中的呢?通过代码跟踪,我们发现它是在Bootstrap 的init()方法中添加到ChannelPipeline 中的,追踪调用过程如下(以客户端为例):
其代码如下:
void init(Channel channel) throws Exception {
ChannelPipeline p = channel.pipeline();
p.addLast(new ChannelHandler[]{this.config.handler()});
。。。。。。
}
//AbstractBootstrapConfig
public final ChannelHandler handler() {
return this.bootstrap.handler();
}
//AbstractBootstrap
final ChannelHandler handler() {
return this.handler;
}
从上面的代码可见,将handler()返回的ChannelHandler 添加到Pipeline 中,而handler()返回的其实就是我们在初始化Bootstrap 时通过handler()方法设置的ChannelInitializer 实例,因此这里就是将ChannelInitializer 插入到了Pipeline的末端。此时Pipeline 的结构如下图所示:
这时候,有小伙伴可能就有疑惑了,我明明插入的是一个ChannelInitializer 实例,为什么在ChannelPipeline 中的双向链表中的元素却是一个ChannelHandlerContext 呢?我们继续去源码中寻找答案。
刚才,我们提到,在Bootstrap 的init()方法中会调用p.addLast()方法,将ChannelInitializer 插入到链表的末端:
public final ChannelPipeline addLast(EventExecutorGroup group, String name, ChannelHandler handler) {
final AbstractChannelHandlerContext newCtx;
synchronized(this) {
checkMultiplicity(handler);
newCtx = this.newContext(group, this.filterName(name, handler), handler);
this.addLast0(newCtx);
}
private AbstractChannelHandlerContext newContext(EventExecutorGroup group, String name, ChannelHandler handler) {
return new DefaultChannelHandlerContext(this, this.childExecutor(group), name, handler);
}
addLast()有很多重载的方法,我们只需关注这个比较重要的方法就行。上面的addLast()方法中,首先检查ChannelHandler 的名字是否是重复,如果不重复,则调用newContex()方法为这个Handler 创建一个对应的DefaultChannelHandlerContext 实例,并与之关联起来(Context 中有一个handler 属性保存着对应的Handler 实例)。为了添加一个handler 到pipeline 中,必须把此handler 包装成ChannelHandlerContext。因此在上面的代码中我们可以看到新实例化了一个newCtx 对象,并将handler 作为参数传递到构造方法中。那么我们来看一下实例化的DefaultChannelHandlerContext 到底有什么玄机吧。首先看它的构造器:
DefaultChannelHandlerContext(
DefaultChannelPipeline pipeline, EventExecutor executor, String name, ChannelHandler handler) {
super(pipeline, executor, name, isInbound(handler), isOutbound(handler));
if (handler == null) {
throw new NullPointerException("handler");
}
this.handler = handler;
}
我们 可以看到在上面的代码中调用了isInbound()方法和isOutbound()方法。他们是干什么的呢?
——追踪这两个方法的源码中可以看到,当一个handler 实现了ChannelInboundHandler 接口,则isInbound 返回true;类似地,当一个handler 实现了ChannelOutboundHandler 接口,则isOutbound 就返回true。而这两个boolean 变量会传递到父类AbstractChannelHandlerContext 中,并初始化父类的两个字段:inbound 与outbound。
——那么下一个问题就来了,ChannelInitializer所对应的DefaultChannelHandlerContext 的inbound 与outbound 字段分别是什么呢?其实看一下ChannelInitializer 到底实现了哪个接口就可以了,下面是ChannelInitializer 的类层次结构图:
从上图可以看出:ChannelInitializer是实现了ChannelInboundHandler接口的。
——所以这里实例化的DefaultChannelHandlerContext 的inbound = true,outbound = false。
为什么我们要着重介绍这里呢?
——这两个字段关系到pipeline的事件流向和分类,所以是特别重要的,不过我在这里先卖个关子, 后面我们再来详细分析这两个字段所起的作用。至此, 我们暂时先记住一个结论:ChannelInitializer 所对应的DefaultChannelHandlerContext 的inbound =true,outbound = false。当创建好Context 之后,就将这个Context 插入到Pipeline 的双向链表中。
private void addLast0(AbstractChannelHandlerContext newCtx) {
AbstractChannelHandlerContext prev = this.tail.prev;
newCtx.prev = prev;
newCtx.next = this.tail;
prev.next = newCtx;
this.tail.prev = newCtx;
}
添加完ChannelInitializer的Pipeline是这样子的:
前面我们已经分析了ChannelInitializer 是如何插入到Pipeline 中的,接下来就来探讨ChannelInitializer 在哪里被调用,ChannelInitializer 的作用以及我们自定义的ChannelHandler 是如何插入到Pipeline 中的。先简单复习一下Channel 的注册过程:
而我们自定义ChannelHandler 的添加过程,发生在AbstractUnsafe 的register0()方法中,在这个方法中调用了pipeline.fireChannelRegistered()方法,其代码实现如下:
private void register0(ChannelPromise promise) {
boolean firstRegistration = this.neverRegistered;
AbstractChannel.this.doRegister();
this.neverRegistered = false;
AbstractChannel.this.registered = true;
AbstractChannel.this.pipeline.invokeHandlerAddedIfNeeded();
this.safeSetSuccess(promise);
AbstractChannel.this.pipeline.fireChannelRegistered();
}
public final ChannelPipeline fireChannelRegistered() {
AbstractChannelHandlerContext.invokeChannelRegistered(this.head);
return this;
}
再看AbstractChannelHandlerContext 的invokeChannelRegistered()方法:
static void invokeChannelRegistered(final AbstractChannelHandlerContext next) {
EventExecutor executor = next.executor();
if (executor.inEventLoop()) {
next.invokeChannelRegistered();
} else {
executor.execute(new Runnable() {
public void run() {
next.invokeChannelRegistered();
}
});
}
}
很显然,这个代码会从head 开始遍历Pipeline 的双向链表,然后 findContextInbound() 找到第一个属性inbound 为true 的ChannelHandlerContext 实例。看代码:
public void channelRegistered(ChannelHandlerContext ctx) throws Exception {
DefaultChannelPipeline.this.invokeHandlerAddedIfNeeded();
ctx.fireChannelRegistered();
}
public ChannelHandlerContext fireChannelRegistered() {
invokeChannelRegistered(this.findContextInbound());
return this;
}
想起来了没?我们在前面分析ChannelInitializer 时,花了大量的篇幅来分析了inbound和outbound 属性,现在这里就用上了。回想一下,ChannelInitializer 实现了ChannelInboudHandler,因此它所对应的ChannelHandlerContext 的inbound 属性就是true,因此这里返回就是ChannelInitializer 实例所对应的ChannelHandlerContext 对象,如下图所示:
当获取到inbound 的Context 后,就调用它的invokeChannelRegistered()方法:
private void invokeChannelRegistered() {
if (this.invokeHandler()) {
try {
((ChannelInboundHandler)this.handler()).channelRegistered(this);
} catch (Throwable var2) {
this.notifyHandlerException(var2);
}
} else {
this.fireChannelRegistered();
}
}
我们已经知道,每个ChannelHandler 都和一个ChannelHandlerContext 关联,我们可以通过ChannelHandlerContext获取到对应的ChannelHandler。因此很明显,这里handler()返回的对象其实就是一开始我们实例化的ChannelInitializer 对象,并接着调用了ChannelInitializer 的channelRegistered()方法。看到这里, 应该会觉得有点眼熟了。ChannelInitializer 的channelRegistered()这个方法我们在一开始的时候已经接触到了,但是我们并没有深入地分析这个方法的调用过程。下面我们来看这个方法中到底有什么玄机,继续看代码:
public abstract class ChannelInitializer<C extends Channel> extends ChannelInboundHandlerAdapter {
protected abstract void initChannel(C ch) throws Exception;
@Override
@SuppressWarnings("unchecked")
public final void channelRegistered(ChannelHandlerContext ctx) throws Exception {
if (initChannel(ctx)) {
ctx.pipeline().fireChannelRegistered();
removeState(ctx);
} else {
ctx.fireChannelRegistered();
}
}
private boolean initChannel(ChannelHandlerContext ctx) throws Exception {
if (initMap.add(ctx)) { // Guard against re-entrance.
try {
initChannel((C) ctx.channel());
} catch (Throwable cause) {
exceptionCaught(ctx, cause);
} finally {
ChannelPipeline pipeline = ctx.pipeline();
if (pipeline.context(this) != null) {
pipeline.remove(this);
}
}
return true;
}
return false;
}
}
initChannel((C) ctx.channel())这个方法我们也很熟悉,它就是我们在初始化Bootstrap 时,调用handler 方法传入的匿名内部类所实现的方法:
protected void initChannel(SocketChannel ch) throws Exception {
ChannelPipeline pipeline = ch.pipeline();
pipeline.addLast("handler", new MyClient());
}
红字部分说的就是这里啦:
因此,当调用这个方法之后, 我们自定义的ChannelHandler 就插入到了Pipeline,此时Pipeline 的状态如下图所示:
当添加完成自定义的ChannelHandler 后,在finally 代码块会删除自定义的ChannelInitializer,也就是remove(ctx)最终调用ctx.pipeline().remove(this),因此最后的Pipeline 的状态如下:
到这里,自定义ChannelHandler 的添加过程也分析得差不多了,我们好好阅读,才能理解的更透彻。
不知道大家注意到没有,pipeline.addXXX 都有一个重载的方法,例如addLast()它有一个重载的版本是:ChannelPipeline addLast(String name, ChannelHandler handler);第一个参数指定添加的handler 的名字(更准确地说是ChannelHandlerContext 的名字,说成handler 的名字更便于理解)。那么handler 的名字有什么用呢?如果我们不设置name,那么handler 默认的名字是怎样呢?带着这些疑问,我们依旧还是去源码中找到答案。还是以addLast()方法为例:
public final ChannelPipeline addLast(String name, ChannelHandler handler) {
return addLast(null, name, handler);
}
这个方法会调用重载的addLast()方法:
public final ChannelPipeline addLast(EventExecutorGroup group, String name, ChannelHandler handler) {
final AbstractChannelHandlerContext newCtx;
synchronized (this) {
checkMultiplicity(handler);
newCtx = newContext(group, filterName(name, handler), handler);
addLast0(newCtx);return this;
}
第一个参数被设置为null,我们不用关心它。第二参数就是这个handler 的名字。看代码可知,在添加一个handler之前,需要调用checkMultiplicity()方法来确定新添加的handler 名字是否与已添加的handler 名字重复。
如果我们调用的是如下的addLast()方法:ChannelPipeline addLast(ChannelHandler... handlers);那么Netty 就会调用generateName()方法为新添加的handler 自动生成一个默认的名字:
private String filterName(String name, ChannelHandler handler) {
if (name == null) {
return generateName(handler);
}
checkDuplicateName(name);
return name;
}
private String generateName(ChannelHandler handler) {
Map<Class<?>, String> cache = nameCaches.get();
Class<?> handlerType = handler.getClass();
String name = cache.get(handlerType);
if (name == null) {
name = generateName0(handlerType);
cache.put(handlerType, name);
}
if (context0(name) != null) {
String baseName = name.substring(0, name.length() - 1); // Strip the trailing '0'.
for (int i = 1;; i ++) {
String newName = baseName + i;
if (context0(newName) == null) {
name = newName;
break;
}
}
}
return name;
}
而generateName()方法会接着调用generateName0()方法来实际生成一个新的handler 名字:
private static String generateName0(Class<?> handlerType) {
return StringUtil.simpleClassName(handlerType) + "#0";
}
其实默认命名的规则很简单,就是用反射获取handler 的simpleName 加上"#0",因此我们自定义ChatClientHandler 的名字就是"ChatClientHandler#0"。
前面章节中,我们已经知道AbstractChannelHandlerContext 中有inbound 和outbound 两个boolean 变量,分别用于标识Context 所对应的handler 的类型,即:
这里大家肯定还有很多疑惑,不知道这两个字段到底有什么作用? 这还要从ChannelPipeline 的事件传播类型说起。Netty 中的传播事件可以分为两种:Inbound 事件和Outbound 事件。如下是从Netty 官网针对这两个事件的说明:
从上图可以看出,inbound 事件和outbound 事件的流向是不一样的:
Inbound 事件传播方法有:
public interface ChannelInboundHandler extends ChannelHandler {
void channelRegistered(ChannelHandlerContext var1) throws Exception;
void channelUnregistered(ChannelHandlerContext var1) throws Exception;
void channelActive(ChannelHandlerContext var1) throws Exception;
void channelInactive(ChannelHandlerContext var1) throws Exception;
void channelRead(ChannelHandlerContext var1, Object var2) throws Exception;
void channelReadComplete(ChannelHandlerContext var1) throws Exception;
void userEventTriggered(ChannelHandlerContext var1, Object var2) throws Exception;
void channelWritabilityChanged(ChannelHandlerContext var1) throws Exception;
void exceptionCaught(ChannelHandlerContext var1, Throwable var2) throws Exception;
}
Outbound 事件传播方法有:
public interface ChannelOutboundHandler extends ChannelHandler {
void bind(ChannelHandlerContext var1, SocketAddress var2, ChannelPromise var3) throws Exception;
void connect(ChannelHandlerContext var1, SocketAddress var2, SocketAddress var3, ChannelPromise var4) throws Exception;
void disconnect(ChannelHandlerContext var1, ChannelPromise var2) throws Exception;
void close(ChannelHandlerContext var1, ChannelPromise var2) throws Exception;
void deregister(ChannelHandlerContext var1, ChannelPromise var2) throws Exception;
void read(ChannelHandlerContext var1) throws Exception;
void write(ChannelHandlerContext var1, Object var2, ChannelPromise var3) throws Exception;
void flush(ChannelHandlerContext var1) throws Exception;
}
通过上面的代码和介绍,我们应该发现了这个规律:inbound 类似于是事件回调(响应请求的事件),而outbound 类似于主动触发(发起请求的事件)。注意,如果我们捕获了一个事件,并且想让这个事件继续传递下去,那么需要调用Context 对应的传播方法 fireXXX,例如:
public class MyInboundHandler extends ChannelInboundHandlerAdapter {
@Override
public void channelActive(ChannelHandlerContext ctx) throws Exception {
System.out.println("连接成功");
ctx.fireChannelActive();
}
}
Outbound 事件都是请求事件(request event),即请求某件事情的发生,然后通过Outbound 事件进行通知。Outbound 事件的传播方向是tail -> customContext -> head。我们接下来以connect 事件为例,分析一下Outbound 事件的传播机制。首先,当用户调用了Bootstrap 的connect()方法时,就会触发一个Connect 请求事件,我们就发现AbstractChannel 的connect()其实由调用了DefaultChannelPipeline 的connect()方法:
public ChannelFuture connect(SocketAddress remoteAddress) {
return pipeline.connect(remoteAddress);
}
而pipeline.connect()方法的实现如下:
public final ChannelFuture connect(SocketAddress remoteAddress) {
return tail.connect(remoteAddress);
}
从上面我们可以看到,当outbound 事件(这里是connect 事件)传递到Pipeline 后,它其实是以tail 为起点开始传播的。而tail.connect()其实调用的是AbstractChannelHandlerContext 的connect()方法:
public ChannelFuture connect(
final SocketAddress remoteAddress, final SocketAddress localAddress, final ChannelPromise promise) {
final AbstractChannelHandlerContext next = findContextOutbound(MASK_CONNECT);
EventExecutor executor = next.executor();
if (executor.inEventLoop()) {
next.invokeConnect(remoteAddress, localAddress, promise);return promise;
}
findContextOutbound()方法顾名思义,它的作用是以当前Context 为起点,向Pipeline 中的Context 双向链表的前端寻找第一个outbound 属性为true 的Context(即关联ChannelOutboundHandler 的Context),然后返回。
findContextOutbound()方法代码实现如下:
private AbstractChannelHandlerContext findContextOutbound(int mask) {
AbstractChannelHandlerContext ctx = this;
do {
ctx = ctx.prev;
} while ((ctx.executionMask & mask) == 0);
return ctx;
}
为什么这么说呢,上面代码中的“while ((ctx.executionMask & mask) == 0);”难以理解,我在网上查找了其他版本,其他版本就好理解了:
private AbstractChannelHandlerContext findContextOutbound() {
AbstractChannelHandlerContext ctx = this;
do {
ctx = ctx.prev;
} while (!ctx.outbound);
return ctx;
}
当我们找到了一个outbound 的Context 后,就调用它的invokeConnect()方法,这个方法中会调用Context 其关联的ChannelHandler 的connect()方法:
private void invokeConnect(SocketAddress remoteAddress, SocketAddress localAddress, ChannelPromise promise) {
if (invokeHandler()) {
try {
((ChannelOutboundHandler) handler()).connect(this, remoteAddress, localAddress, promise);
} catch (Throwable t) {
notifyOutboundHandlerException(t, promise);
}
} else {
connect(remoteAddress, localAddress, promise);
}
}
如果用户没有重写ChannelHandler 的connect()方法,那么会调用ChannelOutboundHandlerAdapter 的connect()实现:
public void connect(ChannelHandlerContext ctx, SocketAddress remoteAddress,
SocketAddress localAddress, ChannelPromise promise) throws Exception {
ctx.connect(remoteAddress, localAddress, promise);
}
我们看到,ChannelOutboundHandlerAdapter 的connect()仅仅调用了ctx.connect(),而这个调用又回到了:Context.connect -> Connect.findContextOutbound -> next.invokeConnect -> handler.connect -> Context.connect这样的循环中,直到connect 事件传递到DefaultChannelPipeline 的双向链表的头节点,即head 中。为什么会传递到head 中呢?回想一下,head 实现了ChannelOutboundHandler,因此它的outbound 属性是true。因为head 本身既是一个ChannelHandlerContext,又实现了ChannelOutboundHandler 接口,因此当connect()消息传递到head 后,会将消息转递到对应的ChannelHandler 中处理,而head 的handler()方法返回的就是head 本身:
public ChannelHandler handler() {
return this;
}
因此最终connect()事件是在head 中被处理。以下是head 的connect()事件处理逻辑如下:
@Override
public void connect(
ChannelHandlerContext ctx,
SocketAddress remoteAddress, SocketAddress localAddress,
ChannelPromise promise) throws Exception {
unsafe.connect(remoteAddress, localAddress, promise);
}
到这里, 整个connect()请求事件就结束了。下图中描述了整个connect()请求事件的处理过程:
Inbound 事件和Outbound 事件的处理过程是类似的,只是传播方向不同。Inbound 事件是一个通知事件,即某件事已经发生了,然后通过Inbound 事件进行通知。Inbound 通常发生在Channel的状态的改变或IO 事件就绪。Inbound 的特点是它传播方向是head -> customContext -> tail。上面我们分析了connect()这个Outbound 事件,那么接着分析connect()事件后会发生什么Inbound 事件,并最终找到Outbound 和Inbound 事件之间的联系。当connect()这个Outbound 传播到unsafe 后,其实是在AbstractNioChannel类中类AbstractNioUnsafe的connect()方法中进行处理的:
public final void connect(
final SocketAddress remoteAddress, final SocketAddress localAddress, final ChannelPromise promise) {
boolean wasActive = isActive();
if (doConnect(remoteAddress, localAddress)) {
fulfillConnectPromise(promise, wasActive);
} else {
}
}
在AbstractNioUnsafe 的connect()方法中,首先调用doConnect()方法进行实际上的Socket 连接,当连接上后会调用fulfillConnectPromise()方法:
private void fulfillConnectPromise(ChannelPromise promise, boolean wasActive) {
......
if (!wasActive && active) {
pipeline().fireChannelActive();
}
......
}
我们看到,在fulfillConnectPromise()中,会通过调用pipeline().fireChannelActive()方法将通道激活的消息(即Socket 连接成功)发送出去。而这里,当调用pipeline.fireXXX 后,就是Inbound 事件的起点。因此当调用pipeline().fireChannelActive()后,就产生了一个ChannelActive Inbound 事件,我们就从这里开始看看这个Inbound事件是怎么传播的?
@Override
public final ChannelPipeline fireChannelActive() {
AbstractChannelHandlerContext.invokeChannelActive(head);
return this;
}
果然, 在fireChannelActive()方法中,调用的是head.invokeChannelActive(),因此可以证明Inbound 事件在Pipeline中传输的起点是head。那么,在head.invokeChannelActive()中又做了什么呢?
static void invokeChannelActive(final AbstractChannelHandlerContext next) {
EventExecutor executor = next.executor();
if (executor.inEventLoop()) {
next.invokeChannelActive();
} else {
executor.execute(new Runnable() {
@Override
public void run() {
next.invokeChannelActive();
}
});
}
}
接下去的调用流程是invokeChannelActive():
private void invokeChannelActive() {
if (this.invokeHandler()) {
try {
((ChannelInboundHandler)this.handler()).channelActive(this);
} catch (Throwable var2) {
this.notifyHandlerException(var2);
}
} else {
this.fireChannelActive();
}
}
public void channelActive(ChannelHandlerContext ctx) throws Exception {
ctx.fireChannelActive();
this.readIfIsAutoRead();
}
public ChannelHandlerContext fireChannelActive() {
AbstractChannelHandlerContext next = this.findContextInbound();
invokeChannelActive(next);
return this;
}
上面的代码应该很熟悉了。回想一下在Outbound 事件(例如connect()事件)的传输过程中时,我们也有类似的操作:
invokeChannelActive()方法源码如下:
private void invokeChannelActive() {
if (invokeHandler()) {
try {
((ChannelInboundHandler) handler()).channelActive(this);
} catch (Throwable t) {
notifyHandlerException(t);
}
} else {
fireChannelActive();
}
}
这个方法和Outbound 的对应方法(如:invokeConnect()方法)如出一辙。与Outbound 一样,如果用户没有重写channelActive() 方法,那就会调用ChannelInboundHandlerAdapter 的channelActive()方法:
public void channelActive(ChannelHandlerContext ctx) throws Exception {
ctx.fireChannelActive();
}
同样地, 在ChannelInboundHandlerAdapter 的channelActive()中,仅仅调用了ctx.fireChannelActive()方法,因此就会进入Context.fireChannelActive() -> Connect.findContextInbound() -> nextContext.invokeChannelActive() ->nextHandler.channelActive() -> nextContext.fireChannelActive()这样的循环中。同理,tail 本身既实现了ChannelInboundHandler 接口,又实现了ChannelHandlerContext 接口,因此当channelActive()消息传递到tail 后,会将消息转递到对应的ChannelHandler 中处理,而tail 的handler()返回的就是tail 本身:
public ChannelHandler handler() {
return this;
}
因此channelActive Inbound 事件最终是在tail 中处理的,我们看一下它的处理方法:
public void channelActive(ChannelHandlerContext ctx) throws Exception {
}
TailContext 的channelActive()方法是空的。如果大家自行查看TailContext 的Inbound 处理方法时就会发现,它们的实现都是空的。可见,如果是Inbound,当用户没有实现自定义的处理器时,那么默认是不处理的。下图描述了Inbound事件的传输过程:
Outbound 事件总结:
Inbound 事件总结:
outbound 和inbound 事件设计上十分相似,并且Context 与Handler 直接的调用关系也容易混淆,因此我们在阅读这里的源码时,需要特别的注意。
ChannelHandlerContext:
每个ChannelHandler 被添加到ChannelPipeline 后,都会创建一个ChannelHandlerContext 并与之创建的ChannelHandler 关联绑定。ChannelHandlerContext 允许ChannelHandler 与其他的ChannelHandler 实现进行交互。ChannelHandlerContext 不会改变添加到其中的ChannelHandler,因此它是安全的。下图描述了ChannelHandlerContext、ChannelHandler、ChannelPipeline 的关系:
Netty 有一个简单但强大的状态模型,并完美映射到ChannelInboundHandler 的各个方法。下面是Channel 生命周期中四个不同的状态:
一个Channel 正常的生命周期如下图所示。随着状态发生变化相应的事件产生。这些事件被转发到ChannelPipeline中的ChannelHandler 来触发相应的操作。
因篇幅问题不能全部显示,请点此查看更多更全内容
Copyright © 2019- gamedaodao.com 版权所有 湘ICP备2022005869号-6
违法及侵权请联系:TEL:199 18 7713 E-MAIL:2724546146@qq.com
本站由北京市万商天勤律师事务所王兴未律师提供法律服务