Custom thread pool in Java 8 parallel stream
Is it possible to specify a custom thread pool for Java 8 parallel stream? I can not find it anywhere.
Imagine that I have a server application and I would like to use parallel streams. But the application is large and multi-threaded so I want to compartmentalize it. I do not want a slow running task in one module of the applicationblock tasks from another module.
If I can not use different thread pools for different modules, it means I can not safely use parallel streams in most of real world situations.
Try the following example. There are some CPU intensive tasks executed in separate threads. The tasks leverage parallel streams. The first task is broken, so each step takes 1 second (simulated by thread sleep). The issue is that other threads get stuck and wait for the broken task to finish. This is contrived example, but imagine a servlet app and someone submitting a long running task to the shared fork join pool.
public class ParallelTest {
public static void main(String[] args) throws InterruptedException {
ExecutorService es = Executors.newCachedThreadPool();
es.execute(() -> runTask(1000)); //incorrect task
es.execute(() -> runTask(0));
es.execute(() -> runTask(0));
es.execute(() -> runTask(0));
es.execute(() -> runTask(0));
es.execute(() -> runTask(0));
es.shutdown();
es.awaitTermination(60, TimeUnit.SECONDS);
}
private static void runTask(int delay) {
range(1, 1_000_000).parallel().filter(ParallelTest::isPrime).peek(i -> Utils.sleep(delay)).max()
.ifPresent(max -> System.out.println(Thread.currentThread() + " " + max));
}
public static boolean isPrime(long n) {
return n > 1 && rangeClosed(2, (long) sqrt(n)).noneMatch(divisor -> n % divisor == 0);
}
}
There actually is a trick how to execute a parallel operation in a specific fork-join pool. If you execute it as a task in a fork-join pool, it stays there and does not use the common one.
ForkJoinPool forkJoinPool = new ForkJoinPool(2);
forkJoinPool.submit(() ->
//parallel task here, for example
IntStream.range(1, 1_000_000).parallel().filter(PrimesPrint::isPrime).collect(toList())
).get();
The trick is based on ForkJoinTask.fork which specifies: "Arranges to asynchronously execute this task in the pool the current task is running in, if applicable, or using the ForkJoinPool.commonPool() if not inForkJoinPool()"
The parallel streams use the default ForkJoinPool.commonPool
which by default has one less threads as you have processors, as returned by Runtime.getRuntime().availableProcessors()
(This means that parallel streams use all your processors because they also use the main thread):
For applications that require separate or custom pools, a ForkJoinPool may be constructed with a given target parallelism level; by default, equal to the number of available processors.
This also means if you have nested parallel streams or multiple parallel streams started concurrently, they will all share the same pool. Advantage: you will never use more than the default (number of available processors). Disadvantage: you may not get "all the processors" assigned to each parallel stream you initiate (if you happen to have more than one). (Apparently you can use a ManagedBlocker to circumvent that.)
To change the way parallel streams are executed, you can either
yourFJP.submit(() -> stream.parallel().forEach(soSomething)).get();
or System.setProperty("java.util.concurrent.ForkJoinPool.common.parallelism", "20")
for a target parallelism of 20 threads. Example of the latter on my machine which has 8 processors. If I run the following program:
long start = System.currentTimeMillis();
IntStream s = IntStream.range(0, 20);
//System.setProperty("java.util.concurrent.ForkJoinPool.common.parallelism", "20");
s.parallel().forEach(i -> {
try { Thread.sleep(100); } catch (Exception ignore) {}
System.out.print((System.currentTimeMillis() - start) + " ");
});
The output is:
215 216 216 216 216 216 216 216 315 316 316 316 316 316 316 316 415 416 416 416
So you can see that the parallel stream processes 8 items at a time, ie it uses 8 threads. However, if I uncomment the commented line, the output is:
215 215 215 215 215 216 216 216 216 216 216 216 216 216 216 216 216 216 216 216
This time, the parallel stream has used 20 threads and all 20 elements in the stream have been processed concurrently.
除了在您自己的forkJoinPool中触发并行计算的技巧,您还可以将该池传递给CompletableFuture.supplyAsync方法,如下所示:
ForkJoinPool forkJoinPool = new ForkJoinPool(2);
CompletableFuture<List<Integer>> primes = CompletableFuture.supplyAsync(() ->
//parallel task here, for example
range(1, 1_000_000).parallel().filter(PrimesPrint::isPrime).collect(toList()),
forkJoinPool
);
链接地址: http://www.djcxy.com/p/79414.html
上一篇: 蟒蛇中的whoami
下一篇: Java 8并行流中的自定义线程池