前言

本文隶属于专栏《100个问题搞定Java并发》,该专栏为笔者原创,引用请注明来源,不足和错误之处请在评论区帮忙指出,谢谢!

本专栏目录结构和参考文献请见100个问题搞定Java并发

正文

CompletableFuture 是 Java8 新增的一个超大型工具类。

为什么说它大呢?

因为它实现了 Future 接口,而更重要的是,它也实现了 CompletionStage 接口。

CompletionStage 接口也是 Java8 中新增的,它拥有多达约 40 种方法!

是的,你没有看错,这看起来完全不符合设计中所谓的“单方法接口”原则,但是在这里,它就这么存在了。

这个接口拥有如此众多的方法,是为函数式编程中的流式调用准备的。

通过 Completionstage 接口,我们可以在个执行结果上进行多次流式调用,以此可以得到最终结果。

比如,你可以在一个 CompletionStage 接口上进行如下调用:

stage.thenApply(x -> square(x)).thenAccept(x -> System.out.print(x)).thenRun (()-> System.out.println());

这一连串的调用就会依次执行。

1、完成了就通知我

CompletableFuture 和 Future 一样,可以作为函数调用的契约。

向 CompletableFuture 请求一个数据,如果数据还没有准备好,请求线程就会等待。

而让人惊喜的是,我们可以手动设置 CompletableFuture 的完成状态。

package com.shockang.study.java.concurrent.completable;

import java.util.concurrent.CompletableFuture;

/**
 * 脱离线程池的使用,仅作为一个契约
 *
 * @author Shockang
 */
public class CFutureMain1 {
	public static class AskThread implements Runnable {
		CompletableFuture<Integer> re = null;

		public AskThread(CompletableFuture<Integer> re) {
			this.re = re;
		}

		@Override
		public void run() {
			int myRe = 0;
			try {
				myRe = re.get() * re.get();
			} catch (Exception e) {
			}
			System.out.println(myRe);
		}
	}

	public static void main(String[] args) throws InterruptedException {
		final CompletableFuture<Integer> future = new CompletableFuture<>();
		new Thread(new AskThread(future)).start();
		// 模拟长时间其他调用
		Thread.sleep(1000);
		// 告知完成结果
		future.complete(60);
	}
}

3600

2、异步执行任务

通过 CompletableFuture 提供的进一步封装,我们很容易实现 Future 模式那样的异步调用。

package com.shockang.study.java.concurrent.completable;

import java.util.concurrent.CompletableFuture;
import java.util.concurrent.ExecutionException;

/**
 * 完成普通future的工作
 * <p>
 * 以下几个函数可以执行(创建)一个CompletableFuture任务
 * static <U> CompletableFuture<U> supplyAsync(Supplier<U> supplier);
 * static <U> CompletableFuture<U> supplyAsync(Supplier<U> supplier, Executor executor);
 * static CompletableFuture<Void> runAsync(Runnable runnable);
 * static CompletableFuture<Void> runAsync(Runnable runnable, Executor executor);
 *
 * @author Shockang
 */
public class CFutureMain2 {
	public static Integer calc(Integer para) {
		try {
			// 模拟一个长时间的执行
			Thread.sleep(1000);
		} catch (InterruptedException e) {
		}
		return para * para;
	}

	public static void main(String[] args) throws InterruptedException, ExecutionException {
		final CompletableFuture<Integer> future =
				CompletableFuture.supplyAsync(() -> calc(50));
		System.out.println(future.get());
	}
}

2500

在 CompletableFuture 中,类似的工厂方法如下所示。

    /**
     * Returns a new CompletableFuture that is asynchronously completed
     * by a task running in the {@link ForkJoinPool#commonPool()} with
     * the value obtained by calling the given Supplier.
     *
     * @param supplier a function returning the value to be used
     * to complete the returned CompletableFuture
     * @param <U> the function's return type
     * @return the new CompletableFuture
     */
    public static <U> CompletableFuture<U> supplyAsync(Supplier<U> supplier) {
        return asyncSupplyStage(asyncPool, supplier);
    }

    /**
     * Returns a new CompletableFuture that is asynchronously completed
     * by a task running in the given executor with the value obtained
     * by calling the given Supplier.
     *
     * @param supplier a function returning the value to be used
     * to complete the returned CompletableFuture
     * @param executor the executor to use for asynchronous execution
     * @param <U> the function's return type
     * @return the new CompletableFuture
     */
    public static <U> CompletableFuture<U> supplyAsync(Supplier<U> supplier,
                                                       Executor executor) {
        return asyncSupplyStage(screenExecutor(executor), supplier);
    }

    /**
     * Returns a new CompletableFuture that is asynchronously completed
     * by a task running in the {@link ForkJoinPool#commonPool()} after
     * it runs the given action.
     *
     * @param runnable the action to run before completing the
     * returned CompletableFuture
     * @return the new CompletableFuture
     */
    public static CompletableFuture<Void> runAsync(Runnable runnable) {
        return asyncRunStage(asyncPool, runnable);
    }

    /**
     * Returns a new CompletableFuture that is asynchronously completed
     * by a task running in the given executor after it runs the given
     * action.
     *
     * @param runnable the action to run before completing the
     * returned CompletableFuture
     * @param executor the executor to use for asynchronous execution
     * @return the new CompletableFuture
     */
    public static CompletableFuture<Void> runAsync(Runnable runnable,
                                                   Executor executor) {
        return asyncRunStage(screenExecutor(executor), runnable);
    }

其中 supplyAsync() 方法用于那些需要返回值的场景,比如计算某个数据等。

而 runAsync() 方法用于没有返回值的场景,比如,仅仅是简单地执行某一个异步动作。

在这两对方法中,都有一个方法可以接收一个 Executor 参数。

这就使我们可以让 Supplier< U > 或者 Runnable 在指定的线程池中工作。

如果不指定,则在默认的系统公共的 ForkJoinPool.common 线程池中执行。

注意:在 Java8 中,新増了 ForkJoinPool.commonPool() 方法。 它可以获得一个公共的 ForkJoin 线程池。 这个公共线程池中的所有线程都是 Daemon 线程。 这意味着如果主线程退出,这些线程无论是否执行完毕,都会退出系统。

3、流式调用

在前文中我已经提到, CompletionStage 的 40 个接口是为函数式编程做准备的。

在这里,就让我们看一下,如何使用这些接口进行函数式的流式 API 调用。

package com.shockang.study.java.concurrent.completable;

import java.util.concurrent.CompletableFuture;
import java.util.concurrent.ExecutionException;

/**
 * 完成普通future的工作
 * <p>
 * 以下几个函数可以执行(创建)一个CompletableFuture任务
 * thenApply 转换
 * thenAccept 最后处理
 *
 * @author Shockang
 */
public class CFutureMain3 {
	public static Integer calc(Integer para) {
		try {
			// 模拟一个长时间的执行
			Thread.sleep(1000);
		} catch (InterruptedException e) {
		}
		return para * para;
	}

	public static void main(String[] args) throws InterruptedException, ExecutionException {
		CompletableFuture<Void> fu = CompletableFuture.supplyAsync(() -> calc(50))
				.thenApply((i) -> Integer.toString(i))
				.thenApply((str) -> "\"" + str + "\"")
				.thenAccept(System.out::println);
		fu.get();
	}
}

"2500"

4、异常处理

如果 CompletableFuture 在执行过程中遇到异常,那么我们可以用函数式编程的风格来优雅地处理这些异常。

CompletableFuture 提供了一个异常处理方法 exceptionally() :

package com.shockang.study.java.concurrent.completable;

import java.util.concurrent.CompletableFuture;
import java.util.concurrent.ExecutionException;

/**
 * 完成普通future的工作
 * <p>
 * exceptionally 异常处理 发生异常进行处理,如果没有异常,则它返回原有的结果
 *
 * @author Shockang
 */
public class CFutureMain4 {

	public static Integer calc(Integer para) {
		return para / 0;
	}

	public static void main(String[] args) throws InterruptedException, ExecutionException {
		CompletableFuture<Void> fu = CompletableFuture
				.supplyAsync(() -> calc(50))
				.exceptionally(ex -> {
					System.out.println(ex.toString());
					return 0;
				})
				.thenApply((i) -> Integer.toString(i))
				.thenApply((str) -> "\"" + str + "\"")
				.thenAccept(System.out::println);
		fu.get();
	}

}
java.util.concurrent.CompletionException: java.lang.ArithmeticException: / by zero
"0"

5、组合多个 CompletableFuture

CompletableFuture 还允许你将多个 CompletableFuture 进行组合。

thenCompose()

一种方法是使用 thenCompose() 方法,它的签名如下:

public <U> CompletableFuture<U> thenCompose(
        Function<? super T, ? extends CompletionStage<U>> fn)

一个 CompletableFuture 可以在执行完成后,将执行结果通过 Function 接口传递给下个 CompletionStage 实例进行处理。

Function 接口返回新的 CompletionStage 实例。

package com.shockang.study.java.concurrent.completable;

import java.util.concurrent.CompletableFuture;
import java.util.concurrent.ExecutionException;

/**
 * 完成普通future的工作
 * <p>
 * thenCompose
 *
 * @author Shockang
 */
public class CFutureMain5 {

	public static Integer calc(Integer para) {
		return para / 2;
	}

	public static void main(String[] args) throws InterruptedException, ExecutionException {
		CompletableFuture<Void> fu =
				CompletableFuture.supplyAsync(() -> calc(50))
						.thenCompose((i) -> CompletableFuture.supplyAsync(() -> calc(i)))
						.thenApply((str) -> "\"" + str + "\"").thenAccept(System.out::println);
		fu.get();
	}

}

"12"

thenCombine()

另外一种组合多个 CompletableFuture 的方法是 thenCombine() 方法,它的签名如下:

public <U,V> CompletableFuture<V> thenCombine(
        CompletionStage<? extends U> other,
        BiFunction<? super T,? super U,? extends V> fn)

方法 thenCombine() 首先完成当前 CompletableFuture 和 other 的执行。
接着,将这两者的执行结果传递给 BiFunction (该接口接收两个参数,并有一个返回值),并返回代表 BiFunction 实例的 CompletableFuture 对象。

package com.shockang.study.java.concurrent.completable;

import java.util.concurrent.CompletableFuture;
import java.util.concurrent.ExecutionException;

/**
 * 完成普通future的工作
 * <p>
 * thenCombine 合并结果
 *
 * @author Shockang
 */
public class CFutureMain6 {

	public static Integer calc(Integer para) {
		return para / 2;
	}

	public static void main(String[] args) throws InterruptedException, ExecutionException {
		CompletableFuture<Integer> intFuture = CompletableFuture.supplyAsync(() -> calc(50));
		CompletableFuture<Integer> intFuture2 = CompletableFuture.supplyAsync(() -> calc(25));

		CompletableFuture<Void> fu = intFuture.thenCombine(intFuture2, (i, j) -> (i + j))
				.thenApply((str) -> "\"" + str + "\"")
				.thenAccept(System.out::println);
		fu.get();
	}

}
"37"

6、支持 timeout(JDK9)

在 JDK9 以后 CompletableFuture 增加了 timeout功能。

如果一个任务在给定时间内没有完成,则直接抛出异常。

package com.shockang.study.java.concurrent.completable;

import java.util.concurrent.CompletableFuture;
import java.util.concurrent.TimeUnit;

public class CFutureMain7 {

	public static Integer calc(Integer para) {
		return para / 2;
	}

	public static void main(String[] args) {
		CompletableFuture.supplyAsync(() -> {
			try {
				Thread.sleep(2000);
			} catch (InterruptedException e) {
			}
			return calc(50);

		}).orTimeout(1, TimeUnit.SECONDS).exceptionally(e -> {
			System.err.println(e);
			return 0;
		}).thenAccept(System.out::println);

		try {
			Thread.sleep(2000);
		} catch (InterruptedException e) {
		}
	}
}

java.util.concurrent.TimeoutException
0
上一篇 下一篇