Java执行者:如何在不阻塞的情况下通知,何时完成任务?
假设我有一个队列充满了我需要提交给执行者服务的任务。 我希望他们一次处理一个。 我能想到的最简单的方法是:
不过,我试图避免完全阻止。 如果我有10,000个这样的队列,它们需要一次处理它们的任务,那么我将耗尽堆栈空间,因为它们中的大多数将持续阻塞线程。
我想提交一个任务并提供一个回调,当任务完成时会调用回调。 我将使用该回拨通知作为发送下一个任务的标志。 (functionaljava和jetlang显然使用了这种非阻塞算法,但我无法理解他们的代码)
我该如何使用JDK的java.util.concurrent来完成这项任务,而不是编写自己的执行程序服务?
(向我提供这些任务的队列本身可能会阻塞,但这是稍后需要解决的问题)
定义一个回调接口来接收你想在完成通知中传递的参数。 然后在任务结束时调用它。
你甚至可以为Runnable任务编写一个通用包装器,并将它们提交给ExecutorService
。 或者,请参阅下面有关内置于Java 8的机制。
class CallbackTask implements Runnable {
private final Runnable task;
private final Callback callback;
CallbackTask(Runnable task, Callback callback) {
this.task = task;
this.callback = callback;
}
public void run() {
task.run();
callback.complete();
}
}
借助CompletableFuture
,Java 8包含了一种更加复杂的方法来组合可以异步且有条件地完成过程的管道。 这是一个人为的但完整的通知例子。
import java.util.concurrent.CompletableFuture;
import java.util.concurrent.ThreadLocalRandom;
import java.util.concurrent.TimeUnit;
public class GetTaskNotificationWithoutBlocking {
public static void main(String... argv) throws Exception {
ExampleService svc = new ExampleService();
GetTaskNotificationWithoutBlocking listener = new GetTaskNotificationWithoutBlocking();
CompletableFuture<String> f = CompletableFuture.supplyAsync(svc::work);
f.thenAccept(listener::notify);
System.out.println("Exiting main()");
}
void notify(String msg) {
System.out.println("Received message: " + msg);
}
}
class ExampleService {
String work() {
sleep(7000, TimeUnit.MILLISECONDS); /* Pretend to be busy... */
char[] str = new char[5];
ThreadLocalRandom current = ThreadLocalRandom.current();
for (int idx = 0; idx < str.length; ++idx)
str[idx] = (char) ('A' + current.nextInt(26));
String msg = new String(str);
System.out.println("Generated message: " + msg);
return msg;
}
public static void sleep(long average, TimeUnit unit) {
String name = Thread.currentThread().getName();
long timeout = Math.min(exponential(average), Math.multiplyExact(10, average));
System.out.printf("%s sleeping %d %s...%n", name, timeout, unit);
try {
unit.sleep(timeout);
System.out.println(name + " awoke.");
} catch (InterruptedException abort) {
Thread.currentThread().interrupt();
System.out.println(name + " interrupted.");
}
}
public static long exponential(long avg) {
return (long) (avg * -Math.log(1 - ThreadLocalRandom.current().nextDouble()));
}
}
使用Guava的可监听未来API并添加回调。 参看 来自网站:
ListeningExecutorService service = MoreExecutors.listeningDecorator(Executors.newFixedThreadPool(10));
ListenableFuture<Explosion> explosion = service.submit(new Callable<Explosion>() {
public Explosion call() {
return pushBigRedButton();
}
});
Futures.addCallback(explosion, new FutureCallback<Explosion>() {
// we want this handler to run immediately after we push the big red button!
public void onSuccess(Explosion explosion) {
walkAwayFrom(explosion);
}
public void onFailure(Throwable thrown) {
battleArchNemesis(); // escaped the explosion!
}
});
在Java 8中,您可以使用CompletableFuture。 这里有一个例子,我在我的代码中使用它从我的用户服务中获取用户,将它们映射到我的视图对象,然后更新我的视图或显示错误对话框(这是一个GUI应用程序):
CompletableFuture.supplyAsync(
userService::listUsers
).thenApply(
this::mapUsersToUserViews
).thenAccept(
this::updateView
).exceptionally(
throwable -> { showErrorDialogFor(throwable); return null; }
);
它异步执行。 我使用两个私有方法: mapUsersToUserViews
和updateView
。
上一篇: Java executors: how to be notified, without blocking, when a task completes?
下一篇: Parallel.ForEach with a custom TaskScheduler to prevent OutOfMemoryException