Callable可以理解为一个加强版的Runnable,他可以有返回值,也可以抛出异常。与线程池Executor结合使用,有两种使用方式:
1. Callable + Future
public class CallableDemo implements Callable<String> { private String name; public CallableDemo(String name) { this.name = name; } public static void main(String[] args) throws InterruptedException, ExecutionException { ExecutorService executor = Executors.newFixedThreadPool(2); CallableDemo c1 = new CallableDemo("ningyu"); CallableDemo c2 = new CallableDemo("zhanglu"); Future<String> future1 = executor.submit(c1); Future<String> future2 = executor.submit(c2); executor.shutdown(); System.out.println("--------------程序会阻塞到这里--------------"); System.out.println(future1.get()); System.out.println(future2.get()); } @Override public String call() throws Exception { for (int i=0; i<10; i++) { System.out.println(name + " is waiting, step is " + i); Thread.sleep(1000); } return name + " has finished, haha"; } }
因为future.get()方法返回的是线程的return结果,所以当线程未完成时,该方法会阻塞。
2.Callable + FutureTask
public class CallableDemo implements Callable<String> { private String name; public CallableDemo(String name) { this.name = name; } public static void main(String[] args) throws InterruptedException, ExecutionException { ExecutorService executor = Executors.newFixedThreadPool(2); CallableDemo c1 = new CallableDemo("ningyu"); CallableDemo c2 = new CallableDemo("zhanglu"); FutureTask<String> task1 = new FutureTask<String>(c1); FutureTask<String> task2 = new FutureTask<String>(c2); executor.submit(task1); executor.submit(task2); executor.shutdown(); System.out.println("--------------程序会阻塞到这里--------------"); System.out.println(task1.get()); System.out.println(task2.get()); } @Override public String call() throws Exception { for (int i=0; i<10; i++) { System.out.println(name + " is waiting, step is " + i); Thread.sleep(1000); } return name + " has finished, haha"; } }
其实Callable+Future最终也是以Callable+FutureTask的形式实现的:
在第一种方式中调用了: Future future = executor.submit(task);
那就让我们看看executor.submit(task)的源码吧:
public <T> Future<T> submit(Callable<T> task) { if (task == null) throw new NullPointerException(); RunnableFuture<T> ftask = newTaskFor(task);//可以看到源码中其实是在submit(Callable<T> task)内部创建了一个RunnableFuture<T>接口实现类 execute(ftask); return ftask; }
而FutureTask又是RunnableFuture的实现类,那就再看看newTaskFor(Callable callable)里面干了什么:
protected <T> RunnableFuture<T> newTaskFor(Callable<T> callable) { return new FutureTask<T>(callable); }
两种方式最终是同样实现的。
3.CompletableFuture
上面例子中子线程结果是通过future.get()阻塞得到的,当然也可以通过轮询future.isDone()来实现异步,但是这样会消耗CPU资源,结果还不能及时得到。CompletableFuture提供了设置事件监听回调的方式,解决该问题。
还是上面的例子,LZ不想阻塞当前线程,但是还是想在子线程完成时输出结果,可以这么做:
public class FutureDemo { public static void main(String[] args) throws InterruptedException, ExecutionException { CompletableFuture.supplyAsync(() -> { String name = "ningyu"; for (int i=0; i<10; i++) { System.out.println(name + " is waiting, step is " + i); try { Thread.sleep(1000); } catch (InterruptedException e) { e.printStackTrace(); } } return name + " has finished, haha"; }).whenComplete((v, e) -> { System.out.println(v); }); System.out.println("I can do others.."); // new Thread().start()起的是User Thread,但是CompletableFuture起的是Daemon Thread,所以LZ加了个死循环保持程序存活 while (true) { } } }
下面具体介绍下CompletableFuture提供的方法:
public T get() public T get(long timeout, TimeUnit unit) public T getNow(T valueIfAbsent)
前两个方法用法跟Future一样,都会阻塞。后面一个方法会立即返回:如果线程已经完成则返回结果,否则返回valueIfAbsent。
public static CompletableFuture<Void> runAsync(Runnable runnable) public static CompletableFuture<Void> runAsync(Runnable runnable, Executor executor) public static <U> CompletableFuture<U> supplyAsync(Supplier<U> supplier) public static <U> CompletableFuture<U> supplyAsync(Supplier<U> supplier, Executor executor)
run开头的线程没有返回值,supply开头的线程有返回值。两个方法都可以传入线程池,如果不传默认使用ForkJoinPool.commonPool()系统级公共线程池。
public <U> CompletableFuture<U> thenApply(Function<? super T,? extends U> fn) public <U> CompletableFuture<U> thenApplyAsync(Function<? super T,? extends U> fn) public <U> CompletableFuture<U> thenApplyAsync(Function<? super T,? extends U> fn, Executor executor) public CompletableFuture<Void> thenRun(Runnable action) public CompletableFuture<Void> thenRunAsync(Runnable action) public CompletableFuture<Void> thenRunAsync(Runnable action, Executor executor) public CompletableFuture<Void> thenAccept(Consumer<? super T> action) public CompletableFuture<Void> thenAcceptAsync(Consumer<? super T> action) public CompletableFuture<Void> thenAcceptAsync(Consumer<? super T> action, Executor executor)
如果第一个线程完成后需要另一个线程进行处理,则使用上面这9个方法。包含apply的方法会接收第一个线程的返回值作为输入参数,结果要返回;包含run的则不会接收上个线程的返回值,结果也不返回;包含accept的会接收上一个线程的返回值作为参数,但自己不提供返回值。没有async结尾的方法会沿用上个线程资源,async结尾的会用一个新的线程资源(如果使用的是同一个线程池,即使是async结尾的也有可能拿到同一个线程资源哦)。例子如下:
public class FutureDemo { public static void main(String[] args) throws InterruptedException, ExecutionException { ExecutorService pool = Executors.newFixedThreadPool(2); CompletableFuture<String> future = CompletableFuture.supplyAsync(() -> { System.out.println("This is " + Thread.currentThread().getName()); String name = "ningyu"; for (int i=0; i<10; i++) { System.out.println(name + " is waiting, step is " + i); try { Thread.sleep(1000); } catch (InterruptedException e) { e.printStackTrace(); } } return name + " has finished, haha."; }, pool).thenApplyAsync(v -> { System.out.println("That is " + Thread.currentThread().getName()); return v + " But he must do another now."; }); System.out.println("I can do others.."); while (true) { } } }
输出结果:
1 2 3 4 5 6 7 8 9 10 11 12 13 |
This is pool-1-thread-1 I can do others.. ningyu is waiting, step is 0 ningyu is waiting, step is 1 ningyu is waiting, step is 2 ningyu is waiting, step is 3 ningyu is waiting, step is 4 ningyu is waiting, step is 5 ningyu is waiting, step is 6 ningyu is waiting, step is 7 ningyu is waiting, step is 8 ningyu is waiting, step is 9 That is ForkJoinPool.commonPool-worker-1 |
public <U,V> CompletableFuture<V> thenCombine(CompletionStage<? extends U> other, BiFunction<? super T,? super U,? extends V> fn) public <U,V> CompletableFuture<V> thenCombineAsync(CompletionStage<? extends U> other, BiFunction<? super T,? super U,? extends V> fn) public <U,V> CompletableFuture<V> thenCombineAsync(CompletionStage<? extends U> other, BiFunction<? super T,? super U,? extends V> fn, Executor executor)
整合两个子线程的结果调用上面的方法。例子如下:
public class FutureDemo { public static void main(String[] args) throws InterruptedException, ExecutionException { CompletableFuture.supplyAsync(() -> { for (int i=0; i<5; i++) { System.out.println("first job step" + i); try { Thread.sleep(1000); } catch (InterruptedException e) { e.printStackTrace(); } } return "first result"; }).thenCombine(CompletableFuture.supplyAsync(() -> { for (int i=0; i<3; i++) { System.out.println("second job step" + i); try { Thread.sleep(1000); } catch (InterruptedException e) { e.printStackTrace(); } } return "second result"; }), (r1, r2) -> r1 + " and " + r2).thenAccept(r -> System.out.println(r)); while (true) { } } }
输出结果:
1 2 3 4 5 6 7 8 9 |
first job step0 second job step0 second job step1 first job step1 first job step2 second job step2 first job step3 first job step4 first result and second result |
可以看出,整合的两个子线程其实是同步执行的。
public CompletableFuture<T> whenComplete(BiConsumer<? super T,? super Throwable> action) public CompletableFuture<T> whenCompleteAsync(BiConsumer<? super T,? super Throwable> action) public CompletableFuture<T> whenCompleteAsync(BiConsumer<? super T,? super Throwable> action, Executor executor) public CompletableFuture<T> exceptionally(Function<Throwable,? extends T> fn)
当future完成或者抛出异常的时候回调用上面的方法。
public static CompletableFuture<Void> allOf(CompletableFuture<?>... cfs) public static CompletableFuture<Object> anyOf(CompletableFuture<?>... cfs)
allOf是等待所有任务完成,构造后CompletableFuture完成
anyOf是只要有一个任务完成,构造后CompletableFuture就完成
public class FutureDemo { public static void main(String[] args) throws InterruptedException, ExecutionException { CompletableFuture<String> first = CompletableFuture.supplyAsync(() -> { try { Thread.sleep(5000); } catch (InterruptedException e) { e.printStackTrace(); } return "first result"; }); CompletableFuture<String> second = CompletableFuture.supplyAsync(() -> { try { Thread.sleep(1000); } catch (InterruptedException e) { e.printStackTrace(); } return "second result"; }); CompletableFuture.anyOf(new CompletableFuture[] {first, second}).whenComplete((v, e) -> { System.out.println(v); }); CompletableFuture.allOf(new CompletableFuture[] {first, second}); while (true) { } } }
anyOf会输出second result,allOf在两个子线程完成之后才会被调用。
楼下是疯子。哈哈