什么是 CompletionService?
当我们使用 ExecutorService 启动多个 Callable 时,每个 Callable 返回一个 Future,而当我们执行 Future 的 get 方法获取结果时,可能拿到的 Future 并不是第一个执行完成的 Callable 的 Future,就会进行阻塞,从而不能获取到第一个完成的 Callable 结果,那么这样就造成了很严重的性能损耗问题。而 CompletionService 正是为了解决这个问题,它是 Java8 的新增接口,它的实现类是 ExecutorCompletionService。CompletionService 会根据线程池中 Task 的执行结果按执行完成的先后顺序排序,任务先完成的可优先获取到。
常用方法
- 构造方法:构建 ExecutorCompletionService 对象
 
- executor:关联的线程池
 
- completionQueue:自定义的结果存储队列
 
1 2
   | ExecutorCompletionService(Executor executor); ExecutorCompletionService(Executor executor, BlockingQueue<Future<V>> completionQueue);
   | 
 
- submit:提交一个 Callable 或者 Runnable 类型的任务,并返回 Future
 
1 2
   | Future<V> submit(Callable<V> task); Future<V> submit(Runnable task, V result);
   | 
 
- take:阻塞方法,从结果队列中获取并移除一个已经执行完成的任务的结果,如果没有就阻塞,直到任务完成返回结果
 
1
   | Future<V> take() throws InterruptedException;
   | 
 
- poll:从结果队列中获取并移除一个已经执行完成的任务的结果,如果没有就会返回 null,该方法不会阻塞
 
- timeout:最多等待多长时间
 
- unit:时间单位
 
1 2
   | Future<V> poll(); Future<V> poll(long timeout, TimeUnit unit);
   | 
 
使用案例
1 2 3 4 5 6 7 8 9 10 11 12 13 14 15 16 17 18 19 20 21 22 23 24 25 26 27 28 29 30 31 32 33 34 35
   | public class CompletionServiceExample {     public static void main(String[] args) throws InterruptedException, ExecutionException {         ExecutorService executorService = Executors.newFixedThreadPool(2);
          List<Callable<Integer>> callables = Arrays.asList(                     ()->{                         mySleep(20);                         System.out.println("=============20 end==============");                         return 20;                     },                     ()->{                         mySleep(10);                         System.out.println("=============10 end==============");                         return 10;                     }                 );         List<Future<Integer>> futures = new ArrayList<>();                  futures.add(executorService.submit(callables.get(0)));         futures.add(executorService.submit(callables.get(1)));                  for (Future future:futures) {             System.out.println("结果: "+future.get());         }         System.out.println("============main end=============");     }          private static void mySleep(int seconds){         try {             TimeUnit.SECONDS.sleep(seconds);         } catch (InterruptedException e) {             e.printStackTrace();         }     } }
  | 
 
1 2 3 4 5 6 7 8 9 10 11 12 13 14 15 16 17 18 19 20 21 22 23 24 25 26 27 28 29 30 31 32 33 34 35 36 37 38 39 40 41 42 43 44 45
   | public class CompletionServiceExample {     public static void main(String[] args) throws InterruptedException, ExecutionException {         ExecutorService executorService = Executors.newFixedThreadPool(2);
          List<Callable<Integer>> callables = Arrays.asList(                     ()->{                         mySleep(20);                         System.out.println("=============20 end==============");                         return 20;                     },                     ()->{                         mySleep(10);                         System.out.println("=============10 end==============");                         return 10;                     }                 );                  CompletionService completionService = new ExecutorCompletionService(executorService);                  completionService.submit(callables.get(0));         completionService.submit(callables.get(1));
                   Future<Integer> pollFuture = completionService.poll();                  System.out.println(pollFuture);                  Future<Integer> pollTimeOutFuture = completionService.poll(3,TimeUnit.SECONDS);                  System.out.println(pollTimeOutFuture);                  for(int i=0;i<callables.size();i++){             System.out.println(completionService.take().get());         }         System.out.println("============main end=============");     }          private static void mySleep(int seconds){         try {             TimeUnit.SECONDS.sleep(seconds);         } catch (InterruptedException e) {             e.printStackTrace();         }     } }
  | 
 
CompletionService 和 ExecutorService
| 特性 | 
CompletionService | 
ExecutorService | 
| 任务管理方式 | 
提供任务完成队列,任务完成后即可获取 | 
需要遍历 Future 逐个获取任务结果 | 
| 结果获取顺序 | 
按完成顺序返回任务结果 | 
按提交顺序返回 Future 需 get() | 
| 是否自动管理任务 | 
是,take()直接返回已完成任务 | 
否,需手动调用 future.get() | 
| 使用场景 | 
适用于并发批量任务,如 Web 爬虫、批量查询等 | 
适用于简单的并行任务 |