java多线程4:Callable,Future,FutureTask,CompletableFuture

Callable可以理解为一个加强版的Runnable,他可以有返回值,也可以抛出异常。与线程池Executor结合使用,有两种使用方式:

1. Callable + Future

public class CallableDemo implements Callable<String> {
 
    private String name;
     
    public CallableDemo(String name) {
        this.name = name;
    }
 
    public static void main(String[] args) throws InterruptedException, ExecutionException {
        ExecutorService executor = Executors.newFixedThreadPool(2);
        CallableDemo c1 = new CallableDemo("ningyu");
        CallableDemo c2 = new CallableDemo("zhanglu");
        Future<String> future1 = executor.submit(c1);
        Future<String> future2 = executor.submit(c2);
        executor.shutdown();
        System.out.println("--------------程序会阻塞到这里--------------");
        System.out.println(future1.get());
        System.out.println(future2.get());
    }
 
    @Override
    public String call() throws Exception {
        for (int i=0; i<10; i++) {
            System.out.println(name + " is waiting, step is " + i);
            Thread.sleep(1000);
        }
        return name + " has finished, haha";
    }
}

因为future.get()方法返回的是线程的return结果,所以当线程未完成时,该方法会阻塞。

2.Callable + FutureTask


public class CallableDemo implements Callable<String> {

    private String name;
    
    public CallableDemo(String name) {
        this.name = name;
    }

    public static void main(String[] args) throws InterruptedException, ExecutionException {
        ExecutorService executor = Executors.newFixedThreadPool(2);
        CallableDemo c1 = new CallableDemo("ningyu");
        CallableDemo c2 = new CallableDemo("zhanglu");
        FutureTask<String> task1 = new FutureTask<String>(c1);
        FutureTask<String> task2 = new FutureTask<String>(c2);
        executor.submit(task1);
        executor.submit(task2);
        executor.shutdown();
        System.out.println("--------------程序会阻塞到这里--------------");
        System.out.println(task1.get());
        System.out.println(task2.get());
    }

    @Override
    public String call() throws Exception {
        for (int i=0; i<10; i++) {
            System.out.println(name + " is waiting, step is " + i);
            Thread.sleep(1000);
        }
        return name + " has finished, haha";
    }
}

其实Callable+Future最终也是以Callable+FutureTask的形式实现的:
在第一种方式中调用了: Future future = executor.submit(task);
那就让我们看看executor.submit(task)的源码吧:


public <T> Future<T> submit(Callable<T> task) {
    if (task == null) throw new NullPointerException();
    RunnableFuture<T> ftask = newTaskFor(task);//可以看到源码中其实是在submit(Callable<T> task)内部创建了一个RunnableFuture<T>接口实现类
    execute(ftask);
    return ftask;
}

而FutureTask又是RunnableFuture的实现类,那就再看看newTaskFor(Callable callable)里面干了什么:


protected <T> RunnableFuture<T> newTaskFor(Callable<T> callable) {
    return new FutureTask<T>(callable);
}

两种方式最终是同样实现的。

3.CompletableFuture

上面例子中子线程结果是通过future.get()阻塞得到的,当然也可以通过轮询future.isDone()来实现异步,但是这样会消耗CPU资源,结果还不能及时得到。CompletableFuture提供了设置事件监听回调的方式,解决该问题。

还是上面的例子,LZ不想阻塞当前线程,但是还是想在子线程完成时输出结果,可以这么做:

public class FutureDemo {
    public static void main(String[] args) throws InterruptedException, ExecutionException {
        CompletableFuture.supplyAsync(() -> {
            String name = "ningyu";
            for (int i=0; i<10; i++) { 
                System.out.println(name + " is waiting, step is " + i); 
                try { 
                    Thread.sleep(1000); 
                } 
                catch (InterruptedException e) { 
                    e.printStackTrace(); 
                } 
            } 
            return name + " has finished, haha"; 
        }).whenComplete((v, e) -> {
            System.out.println(v);
        });
         
        System.out.println("I can do others..");
         
        // new Thread().start()起的是User Thread,但是CompletableFuture起的是Daemon Thread,所以LZ加了个死循环保持程序存活
        while (true) {
             
        }
    }
}

下面具体介绍下CompletableFuture提供的方法:


public T get()
public T get(long timeout, TimeUnit unit)
public T getNow(T valueIfAbsent)

前两个方法用法跟Future一样,都会阻塞。后面一个方法会立即返回:如果线程已经完成则返回结果,否则返回valueIfAbsent。


public static CompletableFuture<Void> runAsync(Runnable runnable)
public static CompletableFuture<Void> runAsync(Runnable runnable, Executor executor)
public static <U> CompletableFuture<U> supplyAsync(Supplier<U> supplier)
public static <U> CompletableFuture<U> supplyAsync(Supplier<U> supplier, Executor executor)

run开头的线程没有返回值,supply开头的线程有返回值。两个方法都可以传入线程池,如果不传默认使用ForkJoinPool.commonPool()系统级公共线程池。


public <U> CompletableFuture<U> thenApply(Function<? super T,? extends U> fn)
public <U> CompletableFuture<U> thenApplyAsync(Function<? super T,? extends U> fn)
public <U> CompletableFuture<U> thenApplyAsync(Function<? super T,? extends U> fn, Executor executor)
public CompletableFuture<Void> thenRun(Runnable action)
public CompletableFuture<Void> thenRunAsync(Runnable action)
public CompletableFuture<Void> thenRunAsync(Runnable action, Executor executor)
public CompletableFuture<Void> thenAccept(Consumer<? super T> action)
public CompletableFuture<Void> thenAcceptAsync(Consumer<? super T> action)
public CompletableFuture<Void> thenAcceptAsync(Consumer<? super T> action, Executor executor)

如果第一个线程完成后需要另一个线程进行处理,则使用上面这9个方法。包含apply的方法会接收第一个线程的返回值作为输入参数,结果要返回;包含run的则不会接收上个线程的返回值,结果也不返回;包含accept的会接收上一个线程的返回值作为参数,但自己不提供返回值。没有async结尾的方法会沿用上个线程资源,async结尾的会用一个新的线程资源(如果使用的是同一个线程池,即使是async结尾的也有可能拿到同一个线程资源哦)。例子如下:

public class FutureDemo {
    public static void main(String[] args) throws InterruptedException, ExecutionException {
        ExecutorService pool = Executors.newFixedThreadPool(2);
        CompletableFuture<String> future = CompletableFuture.supplyAsync(() -> {
            System.out.println("This is " + Thread.currentThread().getName());
            String name = "ningyu";
            for (int i=0; i<10; i++) { 
                System.out.println(name + " is waiting, step is " + i); 
                try { 
                    Thread.sleep(1000); 
                } catch (InterruptedException e) { 
                    e.printStackTrace(); 
                } 
            } 
            return name + " has finished, haha."; 
        }, pool).thenApplyAsync(v -> {
            System.out.println("That is " + Thread.currentThread().getName());
            return v + " But he must do another now.";
        });

        System.out.println("I can do others..");

        while (true) {
            
        }
    }
}

输出结果:

public <U,V> CompletableFuture<V> thenCombine(CompletionStage<? extends U> other, BiFunction<? super T,? super U,? extends V> fn)
public <U,V> CompletableFuture<V> thenCombineAsync(CompletionStage<? extends U> other, BiFunction<? super T,? super U,? extends V> fn)
public <U,V> CompletableFuture<V> thenCombineAsync(CompletionStage<? extends U> other, BiFunction<? super T,? super U,? extends V> fn, Executor executor)

整合两个子线程的结果调用上面的方法。例子如下:

public class FutureDemo {
    public static void main(String[] args) throws InterruptedException, ExecutionException {        
        CompletableFuture.supplyAsync(() -> {
            for (int i=0; i<5; i++) { 
                System.out.println("first job step" + i); 
                try { 
                    Thread.sleep(1000); 
                } catch (InterruptedException e) { 
                    e.printStackTrace(); 
                } 
            } return "first result"; 
        }).thenCombine(CompletableFuture.supplyAsync(() -> {
            for (int i=0; i<3; i++) { 
                System.out.println("second job step" + i); 
                try { 
                    Thread.sleep(1000); 
                } catch (InterruptedException e) { 
                    e.printStackTrace(); 
                } 
            } 
            return "second result"; 
        }), (r1, r2) -> r1 + " and " + r2).thenAccept(r -> System.out.println(r));
        
        while (true) {
            
        }
    }
}

输出结果:

可以看出,整合的两个子线程其实是同步执行的。

public CompletableFuture<T> whenComplete(BiConsumer<? super T,? super Throwable> action)
public CompletableFuture<T> whenCompleteAsync(BiConsumer<? super T,? super Throwable> action)
public CompletableFuture<T> whenCompleteAsync(BiConsumer<? super T,? super Throwable> action, Executor executor)
public CompletableFuture<T> exceptionally(Function<Throwable,? extends T> fn)

当future完成或者抛出异常的时候回调用上面的方法。

public static CompletableFuture<Void> allOf(CompletableFuture<?>... cfs)
public static CompletableFuture<Object> anyOf(CompletableFuture<?>... cfs)

allOf是等待所有任务完成,构造后CompletableFuture完成
anyOf是只要有一个任务完成,构造后CompletableFuture就完成

public class FutureDemo {
    public static void main(String[] args) throws InterruptedException, ExecutionException {
        CompletableFuture<String> first = CompletableFuture.supplyAsync(() -> {
            try {
                Thread.sleep(5000);
            } catch (InterruptedException e) {
                e.printStackTrace();
            }
            return "first result";
        });
        CompletableFuture<String> second = CompletableFuture.supplyAsync(() -> {
            try {
                Thread.sleep(1000);
            } catch (InterruptedException e) {
                e.printStackTrace();
            }
            return "second result";
        });
        
        CompletableFuture.anyOf(new CompletableFuture[] {first, second}).whenComplete((v, e) -> {
            System.out.println(v);
        });
        
        CompletableFuture.allOf(new CompletableFuture[] {first, second});
        
        while (true) {
            
        }
    }
}

anyOf会输出second result,allOf在两个子线程完成之后才会被调用。

1 Reply to “java多线程4:Callable,Future,FutureTask,CompletableFuture”

发表评论