Starvation in ForkJoinPool with managed blocking

In my application I am submitting two kinds of tasks to the commonPool at different rates.

task-1:

ForkJoinPool.managedBlock(
//...
Uninterruptibles.putUninterruptibly(blockingQueueQithMaxSize50, "a")
//...
);

task-2:

List<String> list = Lists.newLinkedList();
ForkJoinPool.managedBlock(
//...
Queues.drainUninterruptibly(blockingQueueWithMaxSize50, list, 1, 1, SECONDS);
//...
);

In some scenarios, when the rate at which tasks of type task-1 are submitted to the pool is too high, and the blockingQueue is full, all the threads running tasks of type task-1 are blocking on the put (The number of threads is around 52). But the new tasks of type task-1 and task-2 which are still being submitted to the pool are not leading to new workers being spawned in the pool which is causing all subsequent tasks to be enqueued in the work queues leading to starvation and deadlock causing app to freeze.

Can some one please help me understand what I am doing wrong here?

After some digging I found these:

  • ForkJoinPool parallelism=1 deadlock
  • https://bugs.openjdk.java.net/browse/JDK-7035020
  • But looks like this bug was fixed in java 7 itself.

    Env:

  • JDK: oracle-j2sdk1.8 | 1.8.0+update20
  • Arch: amd64
  • OS: Debian Wheezy
  • Common pool configuration:

  • Available Processors = 2
  • Which means parallelism = 1
  • all other configs are default
  • UPDATE 1

    Some more info:

    I am submit ting the runnables to the ForkJoinPool which internally calling externalPush method:

    final void externalPush(ForkJoinTask<?> task) {
        WorkQueue q; int m, s, n, am; ForkJoinTask<?>[] a;
        int r = ThreadLocalRandom.getProbe();
        int ps = plock;
        WorkQueue[] ws = workQueues;
        if (ps > 0 && ws != null && (m = (ws.length - 1)) >= 0 &&
            (q = ws[m & r & SQMASK]) != null && r != 0 &&
            U.compareAndSwapInt(q, QLOCK, 0, 1)) { // lock
            if ((a = q.array) != null &&
                (am = a.length - 1) > (n = (s = q.top) - q.base)) {
                int j = ((am & s) << ASHIFT) + ABASE;
                U.putOrderedObject(a, j, task);
                q.top = s + 1;                     // push on to deque
                q.qlock = 0;
                if (n <= 1)
                    signalWork(ws, q);
                return;
            }
            q.qlock = 0;
        }
        fullExternalPush(task);
    }
    

    When I do a remote debug, the execution reaches

    q.top = s + 1; // push on to deque
    q.qlock = 0;
    if (n <= 1)
        signalWork(ws, q);
    return;
    

    But n is very large, not less than or equal to 1, so signalWork method is not being called which interally calls the tryAddWorker method.

    链接地址: http://www.djcxy.com/p/35396.html

    上一篇: 高效地实现Java本地接口摄像头馈送

    下一篇: 使用受管理的阻止在ForkJoinPool中导致饥饿