1. Java Future
FutureTask
在说FutureTask之前先说下Task,对于Java语言,Task代表一个可执行任务,而一个可执行任务往往是继承自Runnable接口或Future接口
假设如果我们直接交由一个线程执行Runnable,那么我们既不能停止这个任务也不能看到当前任务得到当前任务是否完成,如果完成我们也得不到执行结果,因此Java就抽象出一个接口,这个接口叫Future
Future表示一个异步执行的情况的结果,我们可以对这个cancel还可以get结果还可以判断它是否执行完成或是否取消等
FutureTask就是结合了Runnable和Future,即对一个可运行的任务,可以执行上述说的,查询结果或查看状态。
2. Netty Future
基本的Java Future前面已经说了,但基本的Java Future接口存在问题,表面上是异步,但实际想在执行完成后做些什么还得是同步的,即需要阻塞等待执行完成。也即传统的Java Future其问题是虽然可以得到当前执行的情况和获得执行的结果,但都不是真正上的异步,因此Netty扩充了Java Future的Futute功能,其补充功能如下:
//判断当前执行是否成功
boolean isSuccess();
//判断当前执行是否取消Future<V> addListener(GenericFutureListener<? extends Future<? super V>> listener);
boolean isCancellable();
//判断当前任务是否出错,如果出错返回出错信息
Throwable cause();
//添加监听器,在任务执行完成时回调
Future<V> addListener(GenericFutureListener<? extends Future<? super V>> listener);
Future<V> addListeners(GenericFutureListener<? extends Future<? super V>>... listeners);
//移除监听器
Future<V> removeListener(GenericFutureListener<? extends Future<? super V>> listener);
Future<V> removeListeners(GenericFutureListener<? extends Future<? super V>>... listeners);
//同步操作,会一直等到任务执行完成,如果任务执行失败则抛出失败原因
Future<V> sync() throws InterruptedException;
//同步操作,等待任务完成,如果任务中断会抛出异常
Future<V> await() throws InterruptedException;
//直接返回结果,不阻塞,如果任务未执行完会返回null
V getNow();
与Java Future最大的不同是Netty的Future多了监听器这一事件,监听器本质是一种回调,用于在任务完成时进行回调。
通常而言,异步和回调是成对出现的,也即我们不仅要异步的执行任务,还要通过回调函数在任务执行完后处理结果
ChannelFuture在Futute的基础上提供了channel()
的操作,也即ChannelFuture认为,其事件必然会绑定一个Channel,通过channel()
可以获得当前ChannelFuture绑定的Channel
不同的是,Netty不仅引入了Future,还引入了异步思想下的Promise
我们之前知道,Netty的future与java的不同有一点是包含isSuccess方法,也即Java的future是只关心干没干完的,到底成没成功我不知道,你别问我,但netty的future是可以返回这个任务是否成功的,因此我们就需要一个对当前任务是否成功的状态设置的功能,这就是Promise接口
首先Promise继承自netty的future,也即promise也具有future的这些功能,比如获得结果,注册监听器等
其次,promise还具有对结果的设置能力
//设置结果为成功
Promise<V> setSuccess(V result);
//设置结果为失败
Promise<V> setFailure(Throwable cause);
换言之,promise比futute强的地方是可以设置结果的成功或失败,这样在获取结果的时候就可以通过成功或失败来判断而不是是否完成来判断了
Netty下的ChannelPromise继承自ChannelFuture和Promise
ChannelPromise的一个默认实现类是DefaultChannelPromise
但DefaultChannelPromise又继承自DefaultPromise,这个类是一个很关键的类,它实现了很多我们上面说的接口,我们可以看下DefaultPromise做了什么
public class DefaultPromise<V> extends AbstractFuture<V> implements Promise<V> {
private static final Object SUCCESS = new Object();
private static final Object UNCANCELLABLE = new Object();
private volatile Object result;
private Object listeners;
}
setSuccess操作:
@Override
public Promise<V> setSuccess(V result) {
if (setSuccess0(result)) {
return this;
}
throw new IllegalStateException("complete already: " + this);
}
private boolean setSuccess0(V result) {
return setValue0(result == null ? SUCCESS : result);
}
setFailure操作:
@Override
public Promise<V> setFailure(Throwable cause) {
if (setFailure0(cause)) {
return this;
}
throw new IllegalStateException("complete already: " + this, cause);
}
private boolean setFailure0(Throwable cause) {
return setValue0(new CauseHolder(checkNotNull(cause, "cause")));
}
setValue0:
private boolean setValue0(Object objResult) {
if (RESULT_UPDATER.compareAndSet(this, null, objResult) ||
RESULT_UPDATER.compareAndSet(this, UNCANCELLABLE, objResult)) {
if (checkNotifyWaiters()) {
notifyListeners();
}
return true;
}
return false;
}
首先
可以看到setSuccess与setFailure操作相同就是将返回的结果设置进result字段,不同的是setSuccess设置的是一个V,而setFailure设置的是一个异常类CauseHolder
,CauseHolder
的封装很简单:
private static final class CauseHolder {
final Throwable cause;
CauseHolder(Throwable cause) {
this.cause = cause;
}
}
当我们使用get操作阻塞获得执行结果的时候
@Override
public V get() throws InterruptedException, ExecutionException {
Object result = this.result;
if (!isDone0(result)) {
await();
result = this.result;
}
if (result == SUCCESS || result == UNCANCELLABLE) {
return null;
}
Throwable cause = cause0(result);
if (cause == null) {
return (V) result;
}
if (cause instanceof CancellationException) {
throw (CancellationException) cause;
}
throw new ExecutionException(cause);
}
会先判断当前是否完成,如果未完成就挂起等待完成,等到挂起被唤醒就将result返回(此时被唤醒的节点就是当前结果已返回),如果当前已返回,就将result返回,如果运行结果是异常,就抛出这个异常
在回到setSuccess与setFailure
if (checkNotifyWaiters()) {
notifyListeners();
}
setValue0里的的这句话有两个意义,首先是唤醒waiters,其次是执行监听器
waiters是什么?其实就是等待的线程,我们可以再回来看get的内容:
if (!isDone0(result)) {
await();
result = this.result;
}
public Promise<V> await() throws InterruptedException {
if (isDone()) {
return this;
}
if (Thread.interrupted()) {
throw new InterruptedException(toString());
}
checkDeadLock();
synchronized (this) {
while (!isDone()) {
incWaiters();
try {
wait();
} finally {
decWaiters();
}
}
}
return this;
}
private void incWaiters() {
if (waiters == Short.MAX_VALUE) {
throw new IllegalStateException("too many waiters: " + this);
}
++waiters;
}
private void decWaiters() {
--waiters;
}
可以看到如果当前任务还在运行未返回,就会执行incWaiters和wait操作
其中incWaiters就是将当前属性里的waiters数量+1,但是这里会判断waiters的总数量,同时netty会会控制waiters的数量,如果大于Short.MAX(32767),就会返回异常,即不让那么多线程等待。
后面的wait就比较简单了,就是Object下实现的wait函数。
当一旦被放行后,代表当前线程不必等待,此时就将waiters--
可以看到这其实就是我们常见的锁,锁内有个队列标识当前阻塞的线程。
这里需要补充一点的是,很多同学看到
synchronized (this) {
while (!isDone()) {
incWaiters();
try {
wait();
} finally {
decWaiters();
}
}
}
synchronized
关键字,同时在synchronized
内又加了wait操作,既然wait了岂不是类似于死锁,其他所有线程无法进入synchronized
,这里其实很简单,就是JDK在Object.wait的时候会释放synchronized
,这个释放是JVM自动做的,大家不用担心。
我们再回来看
if (checkNotifyWaiters()) {
notifyListeners();
}
这两段代码:
private synchronized boolean checkNotifyWaiters() {
if (waiters > 0) {
notifyAll();
}
return listeners != null;
}
这里很简单,就是放行所有的阻塞线程。实现方式就是Object的notifyAll。
如果还不了解synchronized与await/notify的使用方式,读者可以类比Lock与Condition
这就是执行回调的内容了
private void notifyListeners() {
EventExecutor executor = executor();
if (executor.inEventLoop()) {
final InternalThreadLocalMap threadLocals = InternalThreadLocalMap.get();
final int stackDepth = threadLocals.futureListenerStackDepth();
if (stackDepth < MAX_LISTENER_STACK_DEPTH) {
threadLocals.setFutureListenerStackDepth(stackDepth + 1);
try {
notifyListenersNow();
} finally {
threadLocals.setFutureListenerStackDepth(stackDepth);
}
return;
}
}
safeExecute(executor, new Runnable() {
@Override
public void run() {
notifyListenersNow();
}
});
}
首先我们这里看的是ChannelPromise,对Netty有了解的同学知道Netty将线程分为IO线程和非IO线程
DefaultPromise的构造方法就需要传入一个EventExecutor,代表后续这些回调由谁来执行
public DefaultPromise(EventExecutor executor) {
this.executor = checkNotNull(executor, "executor");
}
而executor()
就是返回当前之前在构造方法处传入的EventExecutor,我们先假设这是个IO线程
final InternalThreadLocalMap threadLocals = InternalThreadLocalMap.get();
final int stackDepth = threadLocals.futureListenerStackDepth();
if (stackDepth < MAX_LISTENER_STACK_DEPTH) {
threadLocals.setFutureListenerStackDepth(stackDepth + 1);
try {
notifyListenersNow();
} finally {
threadLocals.setFutureListenerStackDepth(stackDepth);
}
return;
}
private void notifyListenersNow() {
Object listeners;
synchronized (this) {
// Only proceed if there are listeners to notify and we are not already notifying listeners.
if (notifyingListeners || this.listeners == null) {
return;
}
notifyingListeners = true;
listeners = this.listeners;
this.listeners = null;
}
for (;;) {
if (listeners instanceof DefaultFutureListeners) {
notifyListeners0((DefaultFutureListeners) listeners);
} else {
notifyListener0(this, (GenericFutureListener<?>) listeners);
}
synchronized (this) {
if (this.listeners == null) {
// Nothing can throw from within this method, so setting notifyingListeners back to false does not
// need to be in a finally block.
notifyingListeners = false;
return;
}
listeners = this.listeners;
this.listeners = null;
}
}
}
private static void notifyListener0(Future future, GenericFutureListener l) {
try {
l.operationComplete(future);
} catch (Throwable t) {
if (logger.isWarnEnabled()) {
logger.warn("An exception was thrown by " + l.getClass().getName() + ".operationComplete()", t);
}
}
}
这里就是回调执行的函数了,首先我们需要知道在Netty中IO线程会立即执行当前任务,因此这里回调是立即执行的
stackDepth
这一信息是Netty为避免回调过深而加的限制,默认最大回调深度是8,大于8的回调不再处理,应该是为了避免栈溢出吧(异步应该不会才对)
实际的执行回调的方法是notifyListenersNow
notifyingListeners
这个属性是用来标识当前是否正在进行通知操作,初始化的时候是false,如果为true代表当前正在通知或已经通知过了,没必要再通知。
实际通知的函数是notifyListener0
,这个函数很简单,就是执行listener里的operationComplete
,我们的回调函数就是继承listener然后里面去写operationComplete函数。
当然对于我们常用的sync,也是如此rethrowIfFailed主要是判断结果是否异常,如果异常会抛出异常
@Override
public Promise<V> sync() throws InterruptedException {
await();
rethrowIfFailed();
return this;
}
private void rethrowIfFailed() {
Throwable cause = cause();
if (cause == null) {
return;
}
PlatformDependent.throwException(cause);
}
监听器的注册也很简单:
public Promise<V> addListener(GenericFutureListener<? extends Future<? super V>> listener) {
checkNotNull(listener, "listener");
synchronized (this) {
addListener0(listener);
}
if (isDone()) {
notifyListeners();
}
return this;
}
private void addListener0(GenericFutureListener<? extends Future<? super V>> listener) {
if (listeners == null) {
listeners = listener;
} else if (listeners instanceof DefaultFutureListeners) {
((DefaultFutureListeners) listeners).add(listener);
} else {
listeners = new DefaultFutureListeners((GenericFutureListener<?>) listeners, listener);
}
}
只是将它赋值给了ChannelPromise内的listeners属性,不过netty允许对一个事件注册多个监听器,因此listeners是一个Object。
有了上面那些内容后,其实就不难理解Netty整套异步流程的实现方式了:
首先netty通过Future接口和Promise接口规定了所有异步需要做的工作,主要就是可以注册监听器异步回调也可以阻塞等待获得结果。
而ChannelPromise基本实现了这些功能,首先内部的listeners用来盛放当前注册的监听器,setSuccess和setFailure方法用于保证在任务执行完后将执行结果通知future,并且去唤醒那些阻塞等待的线程以及唤醒回调那些注册的监听器。
ChannelPromise内部有一个result字段,这个字段是个Object,用于存放结果,如果成功就装成功的结果,如果失败就存放一个异常,ChannelPromise中的失败结果以异常存放。
get()和await()以及sync()操作会阻塞直到当前任务执行完,阻塞的方式是通过synchronized和Object.wait实现,而唤醒是通过Object.notifyAll()实现。