Java 8并行流中的自定义线程池

是否可以为Java 8并行流指定自定义线程池? 我无法在任何地方找到它。

想象一下,我有一个服务器应用程序,我想使用并行流。 但是这个应用程序很大且多线程,所以我想划分它。 我不想在另一个模块的应用程序块任务的一个模块中执行运行缓慢的任务。

如果我不能为不同的模块使用不同的线程池,这意味着我不能在大多数真实世界的情况下安全地使用并行流。

试试下面的例子。 有一些CPU密集型任务在不同的线程中执行。 这些任务利用并行流。 第一项任务被破坏,所以每一步都需要1秒(由线程休眠模拟)。 问题在于其他线程卡住了,等待中断的任务完成。 这是一个人为的例子,但想象一下servlet应用程序和某人向共享fork连接池提交长时间运行的任务。

public class ParallelTest {
    public static void main(String[] args) throws InterruptedException {
        ExecutorService es = Executors.newCachedThreadPool();

        es.execute(() -> runTask(1000)); //incorrect task
        es.execute(() -> runTask(0));
        es.execute(() -> runTask(0));
        es.execute(() -> runTask(0));
        es.execute(() -> runTask(0));
        es.execute(() -> runTask(0));


        es.shutdown();
        es.awaitTermination(60, TimeUnit.SECONDS);
    }

    private static void runTask(int delay) {
        range(1, 1_000_000).parallel().filter(ParallelTest::isPrime).peek(i -> Utils.sleep(delay)).max()
                .ifPresent(max -> System.out.println(Thread.currentThread() + " " + max));
    }

    public static boolean isPrime(long n) {
        return n > 1 && rangeClosed(2, (long) sqrt(n)).noneMatch(divisor -> n % divisor == 0);
    }
}

实际上有一个技巧是如何在特定的fork-join池中执行并行操作的。 如果您将其作为fork-join池中的任务执行,则它将停留在此处并且不会使用常见的任务。

ForkJoinPool forkJoinPool = new ForkJoinPool(2);
forkJoinPool.submit(() ->
    //parallel task here, for example
    IntStream.range(1, 1_000_000).parallel().filter(PrimesPrint::isPrime).collect(toList())
).get();

技巧基于ForkJoinTask.fork,它指定:“如果适用,安排异步执行当前任务运行的池中的任务,如果不是inForkJoinPool(),则使用ForkJoinPool.commonPool()”


并行流使用默认的ForkJoinPool.commonPool ,默认情况下,由于您有处理器,所以缺省少一个线程,如Runtime.getRuntime().availableProcessors()所返回的Runtime.getRuntime().availableProcessors() (这意味着并行流使用所有处理器,因为它们也使用主线程):

对于需要单独或自定义池的应用程序,可以使用给定的目标并行性级别构建ForkJoinPool; 默认情况下,等于可用处理器的数量。

这也意味着,如果您嵌套并行流或并行启动多个并行流,它们将共享同一个池。 优点:您永远不会使用超过默认值(可用处理器数量)。 缺点:您可能无法获得分配给您启动的每个并行流的“所有处理器”(如果您碰巧有多个处理器)。 (显然你可以使用ManagedBlocker来规避这一点。)

要改变并行流的执行方式,你也可以

  • 将并行流执行提交给您自己的ForkJoinPool: yourFJP.submit(() -> stream.parallel().forEach(soSomething)).get(); 要么
  • 您可以使用系统属性更改公共池的大小: System.setProperty("java.util.concurrent.ForkJoinPool.common.parallelism", "20")用于20个线程的目标并行度。

  • 后者在我的机器上有8个处理器的例子。 如果我运行以下程序:

    long start = System.currentTimeMillis();
    IntStream s = IntStream.range(0, 20);
    //System.setProperty("java.util.concurrent.ForkJoinPool.common.parallelism", "20");
    s.parallel().forEach(i -> {
        try { Thread.sleep(100); } catch (Exception ignore) {}
        System.out.print((System.currentTimeMillis() - start) + " ");
    });
    

    输出是:

    215 216 216 216 216 216 216 216 315 316 316 316 316 316 316 316 415 416 416 416

    所以你可以看到并行流一次处理8个项目,即它使用8个线程。 但是,如果我取消评论行的注释,则输出为:

    215 215 215 215 215 216 216 216 216 216 216 216 216 216 216 216 216 216 216

    这一次,并行流使用了20个线程,流中的所有20个元素都被同时处理。


    除了在您自己的forkJoinPool中触发并行计算的技巧,您还可以将该池传递给CompletableFuture.supplyAsync方法,如下所示:

    ForkJoinPool forkJoinPool = new ForkJoinPool(2);
    CompletableFuture<List<Integer>> primes = CompletableFuture.supplyAsync(() ->
        //parallel task here, for example
        range(1, 1_000_000).parallel().filter(PrimesPrint::isPrime).collect(toList()), 
        forkJoinPool
    );
    
    链接地址: http://www.djcxy.com/p/79413.html

    上一篇: Custom thread pool in Java 8 parallel stream

    下一篇: threading, concurrency and parallelism on a multicore processor