什么是 CompletionService?
当我们使用 ExecutorService 启动多个 Callable 时,每个 Callable 返回一个 Future,而当我们执行 Future 的 get 方法获取结果时,可能拿到的 Future 并不是第一个执行完成的 Callable 的 Future,就会进行阻塞,从而不能获取到第一个完成的 Callable 结果,那么这样就造成了很严重的性能损耗问题。而 CompletionService 正是为了解决这个问题,它是 Java8 的新增接口,它的实现类是 ExecutorCompletionService。CompletionService 会根据线程池中 Task 的执行结果按执行完成的先后顺序排序,任务先完成的可优先获取到。
常用方法
- 构造方法:构建 ExecutorCompletionService 对象
- executor:关联的线程池
- completionQueue:自定义的结果存储队列
1 2
| ExecutorCompletionService(Executor executor); ExecutorCompletionService(Executor executor, BlockingQueue<Future<V>> completionQueue);
|
- submit:提交一个 Callable 或者 Runnable 类型的任务,并返回 Future
1 2
| Future<V> submit(Callable<V> task); Future<V> submit(Runnable task, V result);
|
- take:阻塞方法,从结果队列中获取并移除一个已经执行完成的任务的结果,如果没有就阻塞,直到任务完成返回结果
1
| Future<V> take() throws InterruptedException;
|
- poll:从结果队列中获取并移除一个已经执行完成的任务的结果,如果没有就会返回 null,该方法不会阻塞
- timeout:最多等待多长时间
- unit:时间单位
1 2
| Future<V> poll(); Future<V> poll(long timeout, TimeUnit unit);
|
使用案例
1 2 3 4 5 6 7 8 9 10 11 12 13 14 15 16 17 18 19 20 21 22 23 24 25 26 27 28 29 30 31 32 33 34 35
| public class CompletionServiceExample { public static void main(String[] args) throws InterruptedException, ExecutionException { ExecutorService executorService = Executors.newFixedThreadPool(2);
List<Callable<Integer>> callables = Arrays.asList( ()->{ mySleep(20); System.out.println("=============20 end=============="); return 20; }, ()->{ mySleep(10); System.out.println("=============10 end=============="); return 10; } ); List<Future<Integer>> futures = new ArrayList<>(); futures.add(executorService.submit(callables.get(0))); futures.add(executorService.submit(callables.get(1))); for (Future future:futures) { System.out.println("结果: "+future.get()); } System.out.println("============main end============="); } private static void mySleep(int seconds){ try { TimeUnit.SECONDS.sleep(seconds); } catch (InterruptedException e) { e.printStackTrace(); } } }
|
1 2 3 4 5 6 7 8 9 10 11 12 13 14 15 16 17 18 19 20 21 22 23 24 25 26 27 28 29 30 31 32 33 34 35 36 37 38 39 40 41 42 43 44 45
| public class CompletionServiceExample { public static void main(String[] args) throws InterruptedException, ExecutionException { ExecutorService executorService = Executors.newFixedThreadPool(2);
List<Callable<Integer>> callables = Arrays.asList( ()->{ mySleep(20); System.out.println("=============20 end=============="); return 20; }, ()->{ mySleep(10); System.out.println("=============10 end=============="); return 10; } ); CompletionService completionService = new ExecutorCompletionService(executorService); completionService.submit(callables.get(0)); completionService.submit(callables.get(1));
Future<Integer> pollFuture = completionService.poll(); System.out.println(pollFuture); Future<Integer> pollTimeOutFuture = completionService.poll(3,TimeUnit.SECONDS); System.out.println(pollTimeOutFuture); for(int i=0;i<callables.size();i++){ System.out.println(completionService.take().get()); } System.out.println("============main end============="); } private static void mySleep(int seconds){ try { TimeUnit.SECONDS.sleep(seconds); } catch (InterruptedException e) { e.printStackTrace(); } } }
|
CompletionService 和 ExecutorService
特性 |
CompletionService |
ExecutorService |
任务管理方式 |
提供任务完成队列,任务完成后即可获取 |
需要遍历 Future 逐个获取任务结果 |
结果获取顺序 |
按完成顺序返回任务结果 |
按提交顺序返回 Future 需 get() |
是否自动管理任务 |
是,take()直接返回已完成任务 |
否,需手动调用 future.get() |
使用场景 |
适用于并发批量任务,如 Web 爬虫、批量查询等 |
适用于简单的并行任务 |