Netty内部的io.netty.util.concurrent.Future<V> 继承自java.util.concurrent.Future<V>,而Promise<V>是前者的一个特殊实现。
Java并发编程包下提供了Future<V>接口。Future在异步编程中表示该异步操做的结果,经过Future<V>的内部方法能够实现状态检查、取消执行、获取执行结果等操做。内部的方法以下:
// 尝试取消执行
boolean cancel(boolean mayInterruptIfRunning);
// 是否已经被取消执行
boolean isCancelled();
// 是否已经执行完毕
boolean isDone();
// 阻塞获取执行结果
V get() throws InterruptedException, ExecutionException;
// 阻塞获取执行结果或超时后返回
V get(long timeout, TimeUnit unit) throws InterruptedException, ExecutionException, TimeoutException;
原生的Future<V>功能比较有限,Netty扩展了Future<V>并增长了如下方法:
// 判断是否执行成功
boolean isSuccess();
// 判断是否能够取消执行
boolean isCancellable();
Throwable cause();
// 增长回调方法
Future<V> addListener(GenericFutureListener<? extends Future<? super V>> listener);
// 增长多个回调方法
Future<V> addListeners(GenericFutureListener<? extends Future<? super V>>... listeners);
// 删除回调方法
Future<V> removeListener(GenericFutureListener<? extends Future<? super V>> listener);
// 删除多个回调方法
Future<V> removeListeners(GenericFutureListener<? extends Future<? super V>>... listeners);
// 阻塞等待,且若是失败抛出异常
Future<V> sync() throws InterruptedException;
// 同上,区别是不可中断阻塞等待过程
Future<V> syncUninterruptibly();
// 阻塞等待
Future<V> await() throws InterruptedException;
// 同上,区别是不可中断阻塞等待过程
Future<V> awaitUninterruptibly();
netty中future的特色:
ps:future只有两种状态,uncompleted和completed。
completedfuture表示已经完成异步操做,该类在异步操做结束时建立,用户使用addlistener()方法提供异步操做方法。
略。
// 设置成功状态并回调
Promise<V> setSuccess(V result);
boolean trySuccess(V result);
// 设置失败状态并回调
Promise<V> setFailure(Throwable cause);
boolean tryFailure(Throwable cause);
// 设置为不可取消状态
boolean setUncancellable();
可见,Promise<V>做为一个特殊的Future<V>,只是增长了一些状态设置方法。因此它经常使用于传入I/O业务代码中,用于I/O结束后设置成功(或失败)状态,并回调方法。
以客户端链接的注册过程为例,调用链路以下:
io.netty.bootstrap.Bootstrap.connect()
--> io.netty.bootstrap.Bootstrap.doResolveAndConnect()
---->io.netty.bootstrap.AbstractBootstrap.initAndRegister()
------>io.netty.channel.MultithreadEventLoopGroup.register()
-------->io.netty.channel.SingleThreadEventLoop.register()
一直跟踪到SingleThreadEventLoop中,会看到这段代码:
@Override
public ChannelFuture register(Channel channel) {
return register(new DefaultChannelPromise(channel, this));
}
此处新建了一个DefaultChannelPromise,构造函数传入了当前的channel以及当前所在的线程this。从第一节的类图咱们知道,DefaultChannelPromise同时实现了Future和Promise,具备上述提到的全部方法。
而后继续将该promise传递进另一个register方法中:
@Override
public ChannelFuture register(final ChannelPromise promise) {
ObjectUtil.checkNotNull(promise, "promise");
promise.channel().unsafe().register(this, promise);
return promise;
}
在该register方法中,继续将promise传递到Unsafe的register方法中,而当即返回了以ChannelFuture的形式返回了该promise。显然这里是一个异步回调处理:上层的业务能够拿到返回的ChannelFuture阻塞等待结果或者设置回调方法,而继续往下传的Promise能够用于设置执行状态而且回调设置的方法。
咱们继续往下debug能够看到:
// io.netty.channel.AbstractChannel.AbstractUnsafe.java
@Override
public final void register(EventLoop eventLoop, final ChannelPromise promise) {
if (eventLoop == null) {
throw new NullPointerException("eventLoop");
}
if (isRegistered()) {
// 若是已经注册过,则置为失败
promise.setFailure(new IllegalStateException("registered to an event loop already"));
return;
}
if (!isCompatible(eventLoop)) {
// 若是线程类型不兼容,则置为失败
promise.setFailure(
new IllegalStateException("incompatible event loop type: " + eventLoop.getClass().getName()));
return;
}
AbstractChannel.this.eventLoop = eventLoop;
if (eventLoop.inEventLoop()) {
register0(promise);
} else {
try {
eventLoop.execute(new Runnable() {
@Override
public void run() {
register0(promise);
}
});
} catch (Throwable t) {
logger.warn(
"Force-closing a channel whose registration task was not accepted by an event loop: {}",
AbstractChannel.this, t);
closeForcibly();
closeFuture.setClosed();
// 出现异常状况置promise为失败
safeSetFailure(promise, t);
}
}
}
private void register0(ChannelPromise promise) {
try {
// 注册以前,先将promise置为不可取消转态
if (!promise.setUncancellable() || !ensureOpen(promise)) {
return;
}
boolean firstRegistration = neverRegistered;
doRegister();
neverRegistered = false;
registered = true;
pipeline.invokeHandlerAddedIfNeeded();
// promise置为成功
safeSetSuccess(promise);
pipeline.fireChannelRegistered();
if (isActive()) {
if (firstRegistration) {
pipeline.fireChannelActive();
} else if (config().isAutoRead()) {
beginRead();
}
}
} catch (Throwable t) {
// Close the channel directly to avoid FD leak.
closeForcibly();
closeFuture.setClosed();
// 出现异常状况置promise为失败
safeSetFailure(promise, t);
}
}
可见,底层的I/O操做成功与否均可以经过Promise设置状态,并使得外层的ChannelFuture能够感知获得I/O操做的结果。
咱们再来看看被返回的ChannelFuture的用途:
// io.netty.bootstrap.AbstractBootstrap.java
final ChannelFuture initAndRegister() {
//...
ChannelFuture regFuture = config().group().register(channel);
// 若是异常不为null,则意味着底层的I/O已经失败,而且promise设置了失败异常
if (regFuture.cause() != null) {
if (channel.isRegistered()) {
channel.close();
} else {
channel.unsafe().closeForcibly();
}
}
return regFuture;
}
这里经过检查失败异常栈是否为空,能够提早检查到I/O是否失败。继续回溯,还能够看到:
// io.netty.bootstrap.AbstractBootstrap.java
private ChannelFuture doBind(final SocketAddress localAddress) {
final ChannelFuture regFuture = initAndRegister();
final Channel channel = regFuture.channel();
if (regFuture.cause() != null) {
return regFuture;
}
if (regFuture.isDone()) {
// 若是注册已经成功
ChannelPromise promise = channel.newPromise();
doBind0(regFuture, channel, localAddress, promise);
return promise;
} else {
// 若是注册还没有完成
// ...
}
}
此处,经过ChannelFuture#isDone()方法能够知道底层的注册是否完成,若是完成,则继续进行bind操做。
可是由于注册是个异步操作,若是此时注册可能还没完成,那就会进入以下逻辑:
// io.netty.bootstrap.AbstractBootstrap.java
//...
else {
// Registration future is almost always fulfilled already, but just in case it's not.
final PendingRegistrationPromise promise = new PendingRegistrationPromise(channel);
regFuture.addListener(new ChannelFutureListener() {
@Override
public void operationComplete(ChannelFuture future) throws Exception {
Throwable cause = future.cause();
if (cause != null) {
// Registration on the EventLoop failed so fail the ChannelPromise directly to not cause an
// IllegalStateException once we try to access the EventLoop of the Channel.
promise.setFailure(cause);
} else {
// Registration was successful, so set the correct executor to use.
// See https://github.com/netty/netty/issues/2586
promise.registered();
doBind0(regFuture, channel, localAddress, promise);
}
}
});
return promise;
}
这里新建了一个新的PendingRegistrationPromise,并为原来的ChannelFuture对象添加了一个回调方法,并在回调中更改PendingRegistrationPromise的状态,并且PendingRegistrationPromise会继续被传递到上层。当底层的Promise状态被设置而且回调,就会进入该回调方法。从而将I/O状态继续向外传递。
咱们已经了解清楚了Promise和Future的异步模型。再来看看底层是如何实现的。以最经常使用的DefaultChannelPromise为例,内部很是简单,咱们主要看它的父类DefaultPromise:
// result字段的原子更新器
@SuppressWarnings("rawtypes")
private static final AtomicReferenceFieldUpdater<DefaultPromise, Object> RESULT_UPDATER =
AtomicReferenceFieldUpdater.newUpdater(DefaultPromise.class, Object.class, "result");
// 缓存执行结果的字段
private volatile Object result;
// promise所在的线程
private final EventExecutor executor;
// 一个或者多个回调方法
private Object listeners;
// 阻塞线程数量计数器
private short waiters;
以设置成功状态为例(setSuccess):
@Override
public Promise<V> setSuccess(V result) {
if (setSuccess0(result)) {
// 调用回调方法
notifyListeners();
return this;
}
throw new IllegalStateException("complete already: " + this);
}
private boolean setSuccess0(V result) {
return setValue0(result == null ? SUCCESS : result);
}
private boolean setValue0(Object objResult) {
// 原子修改result字段为objResult
if (RESULT_UPDATER.compareAndSet(this, null, objResult) ||
RESULT_UPDATER.compareAndSet(this, UNCANCELLABLE, objResult)) {
checkNotifyWaiters();
return true;
}
return false;
}
private synchronized void checkNotifyWaiters() {
if (waiters > 0) {
// 若是有其余线程在等待该promise的结果,则唤醒他们
notifyAll();
}
}
设置promise的状态其实就是原子地修改result字段为传入的执行结果。值得注意的是,result字段带有volatile关键字来确保多线程之间的可见性。另外,设置完毕状态后,会尝试唤醒全部在阻塞等待该promise返回结果的线程。
其余设置状态方法再也不赘言,基本上大同小异。
上文提到其余线程会阻塞等待该promise返回结果,具体实现以sync方法为例:
@Override
public Promise<V> sync() throws InterruptedException {
// 阻塞等待
await();
// 若是有异常则抛出
rethrowIfFailed();
return this;
}
@Override
public Promise<V> await() throws InterruptedException {
if (isDone()) {
// 若是已经完成,直接返回
return this;
}
// 能够被中断
if (Thread.interrupted()) {
throw new InterruptedException(toString());
}
//检查死循环
checkDeadLock();
synchronized (this) {
while (!isDone()) {
// 递增计数器(用于记录有多少个线程在等待该promise返回结果)
incWaiters();
try {
// 阻塞等待结果
wait();
} finally {
// 递减计数器
decWaiters();
}
}
}
return this;
}
全部调用sync方法的线程,都会被阻塞,直到promise被设置为成功或者失败。这也解释了为什么Netty客户端或者服务端启动的时候通常都会调用sync方法,本质上都是阻塞当前线程而异步地等待I/O结果返回,以下:
Bootstrap bootstrap = new Bootstrap();
ChannelFuture future = bootstrap.group(new NioEventLoopGroup(10))
.channel(NioSocketChannel.class)
.handler(new ChannelInitializer<SocketChannel>() {
@Override
protected void initChannel(SocketChannel ch) throws Exception {
// 管道中添加基于换行符分割字符串的解析器
ch.pipeline().addLast(new LineBasedFrameDecoder(1024));
// 管道中添加字符串编码解码器
ch.pipeline().addLast(new StringDecoder(Charset.forName("UTF-8")));
ch.pipeline().addLast(new StringEncoder(Charset.forName("UTF-8")));
// 管道中添加服务端处理逻辑
ch.pipeline().addLast(new MyClientEchoHandler());
}
}).connect("127.0.0.1", 98).sync();
future.channel().closeFuture().sync();
@Override
public Promise<V> addListener(GenericFutureListener<? extends Future<? super V>> listener) {
checkNotNull(listener, "listener");
synchronized (this) {
// 添加回调方法
addListener0(listener);
}
if (isDone()) {
// 若是I/O操做已经结束,直接触发回调
notifyListeners();
}
return this;
}
private void addListener0(GenericFutureListener<? extends Future<? super V>> listener) {
if (listeners == null) {
// 只有一个回调方法直接赋值
listeners = listener;
} else if (listeners instanceof DefaultFutureListeners) {
// 将回调方法添加到DefaultFutureListeners内部维护的listeners数组中
((DefaultFutureListeners) listeners).add(listener);
} else {
// 若是有多个回调方法,新建一个DefaultFutureListeners以保存更多的回调方法
listeners = new DefaultFutureListeners((GenericFutureListener<?>) listeners, listener);
}
}
从上边能够看到,添加回调方法完成以后,会当即检查promise是否已经完成;若是promise已经完成,则立刻调用回调方法。
Netty的Promise和Future机制是基于Java并发包下的Future<V>开发的。其中Future支持阻塞等待、添加回调方法、判断执行状态等,而Promise主要是支持状态设置相关方法。当底层I/O操做经过Promise改变执行状态,咱们能够经过同步等待的Future当即获得结果。
所以,就像永顺大牛标题所言,在Netty的异步模型里,Promise和Future就像是双子星通常紧密相连。但我以为这二者更像是量子纠缠里的两个电子,由于改变其中一个方的状态,另一方可以立刻感知。
至此,Promise和Future的核心原理已经分析完毕。
扩展文章:
因篇幅问题不能全部显示,请点此查看更多更全内容
Copyright © 2019- gamedaodao.com 版权所有 湘ICP备2022005869号-6
违法及侵权请联系:TEL:199 18 7713 E-MAIL:2724546146@qq.com
本站由北京市万商天勤律师事务所王兴未律师提供法律服务