java多线程4:Callable,Future,FutureTask,CompletableFuture

Callable可以理解为一个加强版的Runnable,他可以有返回值,也可以抛出异常。与线程池Executor结合使用,有两种使用方式:

1. Callable + Future

public class CallableDemo implements Callable<String> {
 
    private String name;
     
    public CallableDemo(String name) {
        this.name = name;
    }
 
    public static void main(String[] args) throws InterruptedException, ExecutionException {
        ExecutorService executor = Executors.newFixedThreadPool(2);
        CallableDemo c1 = new CallableDemo("ningyu");
        CallableDemo c2 = new CallableDemo("zhanglu");
        Future<String> future1 = executor.submit(c1);
        Future<String> future2 = executor.submit(c2);
        executor.shutdown();
        System.out.println("--------------程序会阻塞到这里--------------");
        System.out.println(future1.get());
        System.out.println(future2.get());
    }
 
    @Override
    public String call() throws Exception {
        for (int i=0; i<10; i++) {
            System.out.println(name + " is waiting, step is " + i);
            Thread.sleep(1000);
        }
        return name + " has finished, haha";
    }
}

因为future.get()方法返回的是线程的return结果,所以当线程未完成时,该方法会阻塞。

2.Callable + FutureTask


public class CallableDemo implements Callable<String> {

    private String name;
    
    public CallableDemo(String name) {
        this.name = name;
    }

    public static void main(String[] args) throws InterruptedException, ExecutionException {
        ExecutorService executor = Executors.newFixedThreadPool(2);
        CallableDemo c1 = new CallableDemo("ningyu");
        CallableDemo c2 = new CallableDemo("zhanglu");
        FutureTask<String> task1 = new FutureTask<String>(c1);
        FutureTask<String> task2 = new FutureTask<String>(c2);
        executor.submit(task1);
        executor.submit(task2);
        executor.shutdown();
        System.out.println("--------------程序会阻塞到这里--------------");
        System.out.println(task1.get());
        System.out.println(task2.get());
    }

    @Override
    public String call() throws Exception {
        for (int i=0; i<10; i++) {
            System.out.println(name + " is waiting, step is " + i);
            Thread.sleep(1000);
        }
        return name + " has finished, haha";
    }
}

其实Callable+Future最终也是以Callable+FutureTask的形式实现的:
在第一种方式中调用了: Future future = executor.submit(task);
那就让我们看看executor.submit(task)的源码吧:


public <T> Future<T> submit(Callable<T> task) {
    if (task == null) throw new NullPointerException();
    RunnableFuture<T> ftask = newTaskFor(task);//可以看到源码中其实是在submit(Callable<T> task)内部创建了一个RunnableFuture<T>接口实现类
    execute(ftask);
    return ftask;
}

而FutureTask又是RunnableFuture的实现类,那就再看看newTaskFor(Callable callable)里面干了什么:


protected <T> RunnableFuture<T> newTaskFor(Callable<T> callable) {
    return new FutureTask<T>(callable);
}

两种方式最终是同样实现的。

3.CompletableFuture

上面例子中子线程结果是通过future.get()阻塞得到的,当然也可以通过轮询future.isDone()来实现异步,但是这样会消耗CPU资源,结果还不能及时得到。CompletableFuture提供了设置事件监听回调的方式,解决该问题。

还是上面的例子,LZ不想阻塞当前线程,但是还是想在子线程完成时输出结果,可以这么做:

public class FutureDemo {
    public static void main(String[] args) throws InterruptedException, ExecutionException {
        CompletableFuture.supplyAsync(() -> {
            String name = "ningyu";
            for (int i=0; i<10; i++) { 
                System.out.println(name + " is waiting, step is " + i); 
                try { 
                    Thread.sleep(1000); 
                } 
                catch (InterruptedException e) { 
                    e.printStackTrace(); 
                } 
            } 
            return name + " has finished, haha"; 
        }).whenComplete((v, e) -> {
            System.out.println(v);
        });
         
        System.out.println("I can do others..");
         
        // new Thread().start()起的是User Thread,但是CompletableFuture起的是Daemon Thread,所以LZ加了个死循环保持程序存活
        while (true) {
             
        }
    }
}

下面具体介绍下CompletableFuture提供的方法:


public T get()
public T get(long timeout, TimeUnit unit)
public T getNow(T valueIfAbsent)

前两个方法用法跟Future一样,都会阻塞。后面一个方法会立即返回:如果线程已经完成则返回结果,否则返回valueIfAbsent。


public static CompletableFuture<Void> runAsync(Runnable runnable)
public static CompletableFuture<Void> runAsync(Runnable runnable, Executor executor)
public static <U> CompletableFuture<U> supplyAsync(Supplier<U> supplier)
public static <U> CompletableFuture<U> supplyAsync(Supplier<U> supplier, Executor executor)

run开头的线程没有返回值,supply开头的线程有返回值。两个方法都可以传入线程池,如果不传默认使用ForkJoinPool.commonPool()系统级公共线程池。


public <U> CompletableFuture<U> thenApply(Function<? super T,? extends U> fn)
public <U> CompletableFuture<U> thenApplyAsync(Function<? super T,? extends U> fn)
public <U> CompletableFuture<U> thenApplyAsync(Function<? super T,? extends U> fn, Executor executor)
public CompletableFuture<Void> thenRun(Runnable action)
public CompletableFuture<Void> thenRunAsync(Runnable action)
public CompletableFuture<Void> thenRunAsync(Runnable action, Executor executor)
public CompletableFuture<Void> thenAccept(Consumer<? super T> action)
public CompletableFuture<Void> thenAcceptAsync(Consumer<? super T> action)
public CompletableFuture<Void> thenAcceptAsync(Consumer<? super T> action, Executor executor)

如果第一个线程完成后需要另一个线程进行处理,则使用上面这9个方法。包含apply的方法会接收第一个线程的返回值作为输入参数,结果要返回;包含run的则不会接收上个线程的返回值,结果也不返回;包含accept的会接收上一个线程的返回值作为参数,但自己不提供返回值。没有async结尾的方法会沿用上个线程资源,async结尾的会用一个新的线程资源(如果使用的是同一个线程池,即使是async结尾的也有可能拿到同一个线程资源哦)。例子如下:

public class FutureDemo {
    public static void main(String[] args) throws InterruptedException, ExecutionException {
        ExecutorService pool = Executors.newFixedThreadPool(2);
        CompletableFuture<String> future = CompletableFuture.supplyAsync(() -> {
            System.out.println("This is " + Thread.currentThread().getName());
            String name = "ningyu";
            for (int i=0; i<10; i++) { 
                System.out.println(name + " is waiting, step is " + i); 
                try { 
                    Thread.sleep(1000); 
                } catch (InterruptedException e) { 
                    e.printStackTrace(); 
                } 
            } 
            return name + " has finished, haha."; 
        }, pool).thenApplyAsync(v -> {
            System.out.println("That is " + Thread.currentThread().getName());
            return v + " But he must do another now.";
        });

        System.out.println("I can do others..");

        while (true) {
            
        }
    }
}

输出结果:

This is pool-1-thread-1
I can do others..
ningyu is waiting, step is 0
ningyu is waiting, step is 1
ningyu is waiting, step is 2
ningyu is waiting, step is 3
ningyu is waiting, step is 4
ningyu is waiting, step is 5
ningyu is waiting, step is 6
ningyu is waiting, step is 7
ningyu is waiting, step is 8
ningyu is waiting, step is 9
That is ForkJoinPool.commonPool-worker-1
public <U,V> CompletableFuture<V> thenCombine(CompletionStage<? extends U> other, BiFunction<? super T,? super U,? extends V> fn)
public <U,V> CompletableFuture<V> thenCombineAsync(CompletionStage<? extends U> other, BiFunction<? super T,? super U,? extends V> fn)
public <U,V> CompletableFuture<V> thenCombineAsync(CompletionStage<? extends U> other, BiFunction<? super T,? super U,? extends V> fn, Executor executor)

整合两个子线程的结果调用上面的方法。例子如下:

public class FutureDemo {
    public static void main(String[] args) throws InterruptedException, ExecutionException {        
        CompletableFuture.supplyAsync(() -> {
            for (int i=0; i<5; i++) { 
                System.out.println("first job step" + i); 
                try { 
                    Thread.sleep(1000); 
                } catch (InterruptedException e) { 
                    e.printStackTrace(); 
                } 
            } return "first result"; 
        }).thenCombine(CompletableFuture.supplyAsync(() -> {
            for (int i=0; i<3; i++) { 
                System.out.println("second job step" + i); 
                try { 
                    Thread.sleep(1000); 
                } catch (InterruptedException e) { 
                    e.printStackTrace(); 
                } 
            } 
            return "second result"; 
        }), (r1, r2) -> r1 + " and " + r2).thenAccept(r -> System.out.println(r));
        
        while (true) {
            
        }
    }
}

输出结果:

first job step0
second job step0
second job step1
first job step1
first job step2
second job step2
first job step3
first job step4
first result and second result

可以看出,整合的两个子线程其实是同步执行的。

public CompletableFuture<T> whenComplete(BiConsumer<? super T,? super Throwable> action)
public CompletableFuture<T> whenCompleteAsync(BiConsumer<? super T,? super Throwable> action)
public CompletableFuture<T> whenCompleteAsync(BiConsumer<? super T,? super Throwable> action, Executor executor)
public CompletableFuture<T> exceptionally(Function<Throwable,? extends T> fn)

当future完成或者抛出异常的时候回调用上面的方法。

public static CompletableFuture<Void> allOf(CompletableFuture<?>... cfs)
public static CompletableFuture<Object> anyOf(CompletableFuture<?>... cfs)

allOf是等待所有任务完成,构造后CompletableFuture完成
anyOf是只要有一个任务完成,构造后CompletableFuture就完成

public class FutureDemo {
    public static void main(String[] args) throws InterruptedException, ExecutionException {
        CompletableFuture<String> first = CompletableFuture.supplyAsync(() -> {
            try {
                Thread.sleep(5000);
            } catch (InterruptedException e) {
                e.printStackTrace();
            }
            return "first result";
        });
        CompletableFuture<String> second = CompletableFuture.supplyAsync(() -> {
            try {
                Thread.sleep(1000);
            } catch (InterruptedException e) {
                e.printStackTrace();
            }
            return "second result";
        });
        
        CompletableFuture.anyOf(new CompletableFuture[] {first, second}).whenComplete((v, e) -> {
            System.out.println(v);
        });
        
        CompletableFuture.allOf(new CompletableFuture[] {first, second});
        
        while (true) {
            
        }
    }
}

anyOf会输出second result,allOf在两个子线程完成之后才会被调用。

1 Reply to “java多线程4:Callable,Future,FutureTask,CompletableFuture”

发表评论