1

I'm receiving a sequence of messages, and I want to process them in their sequential order. Each message has a sequence number. There's a pool of threads receiving them. I want to put them into a blocking queue like a PriorityBlockingQueue, and read them in the right order, blocking until the next consecutive message is available.

E.g. given this code:

ConsecutiveBlockingQueue<Integer> q = new ConsecutiveBlockingQueue<>();

new Thread (()->{ q.put(0); q.put(2); }).start();
new Thread (()->{ q.put(1); q.put(3); }).start();

ArrayList<Integer> ordered = new ArrayList<>(4);
for (int i = 0; i < 4; i++) {
    ordered.add(q.take());
}
System.out.println(ordered);

I want it to print [0, 1, 2, 3]

3
  • Yes, that's what PriorityBlockingQueue does. Your question? Commented Mar 9, 2021 at 6:13
  • @user207421 That's not what PriorityBlockingQueue does. PriorityBlockingQueue only blocks when it's empty or full. I need a queue that also blocks after 0 and 1 are taken, 3 is in it, but 2 is missing. If you add a Tread.sleep to t2 in my example, you will see that the result will not be [0, 1, 2, 3] Commented Mar 9, 2021 at 7:48
  • There's nothing in the JDK that does what you want, as far I as know. There may be a library out there though. When it comes right down to it, however, it shouldn't be too difficult to implement this yourself. At least not if you only want basic behavior. You could even make use of PriorityQueue internally. Commented Mar 9, 2021 at 8:44

1 Answer 1

0

Here's a minimally tested class that seems to do what I want. Comments welcome.

package com.ciphercloud.sdp.common;

import java.util.AbstractQueue;
import java.util.Collection;
import java.util.Iterator;
import java.util.PriorityQueue;
import java.util.concurrent.BlockingQueue;
import java.util.concurrent.LinkedBlockingQueue;
import java.util.concurrent.TimeUnit;
import java.util.concurrent.locks.ReentrantLock;
import java.util.function.ToIntFunction;

public class ConsecutiveBlockingQueue<E> extends AbstractQueue<E> implements BlockingQueue<E> {
    private final ToIntFunction <E> ixFunction;
    // blocking queue for consecutive items. Take operations will look into this queue
    LinkedBlockingQueue <E> bQueue = new LinkedBlockingQueue<>();

    // buffering/ordering queue for items that are out of sequence
    PriorityQueue <E> pQueue = new PriorityQueue<>();

    ReentrantLock lock = new ReentrantLock();

    private int nextIx;

    ConsecutiveBlockingQueue(ToIntFunction <E> ixFunction) {
        this(0, ixFunction);
    }

    ConsecutiveBlockingQueue(int startIx, ToIntFunction <E> ixFunction) {
        nextIx = startIx;
        this.ixFunction = ixFunction;
    }

    @Override
    public Iterator <E> iterator() {
        return bQueue.iterator();
    }

    @Override
    public int size() {
        return bQueue.size();
    }

    protected BlockingQueue <E> delegate() {
        return bQueue;
    }

    @Override
    public int remainingCapacity() {
        return Integer.MAX_VALUE;
    }

    @Override
    public int drainTo(Collection <? super E> c) {
        return bQueue.drainTo(c);
    }

    @Override
    public int drainTo(Collection <? super E> c, int maxElements) {
        return bQueue.drainTo(c, maxElements);
    }

    @Override
    public void put(E e) {
        offer(e);
    }

    @Override
    public boolean offer(E e, long timeout, TimeUnit unit) throws InterruptedException {
        offer(e);
        return true;
    }

    @Override
    public boolean offer(E e) {
        lock.lock();
        try {
            if (ixFunction.applyAsInt(e) == nextIx) {
                // offered item is the next consecutive expected one
                // put it directly into the blocking queue
                bQueue.offer(e);
                nextIx++;

                // if there are any buffered items in the pQueue, move them
                // into the blocking queue while they follow consecutively
                while(true) {
                    E next = pQueue.peek();
                    if(next == null || ixFunction.applyAsInt(next) != nextIx) {
                        // no more items in pQueue, or next item is not consecutive
                        break;
                    }
                    pQueue.poll();
                    bQueue.offer(next);
                    nextIx++;
                }
            } else {
                // offered item is not consecutively next. Buffer it in pQueue
                pQueue.offer(e);
            }
        } finally {
            lock.unlock();
        }

        return true;
    }


    @Override
    public E take() throws InterruptedException {
        return bQueue.take();
    }

    @Override
    public E poll(long timeout, TimeUnit unit) throws InterruptedException {
        return bQueue.poll(timeout, unit);
    }


    @Override
    public E poll() {
        return bQueue.poll();
    }

    @Override
    public E peek() {
        return bQueue.peek();
    }
}
Sign up to request clarification or add additional context in comments.

Comments

Your Answer

By clicking “Post Your Answer”, you agree to our terms of service and acknowledge you have read our privacy policy.

Start asking to get answers

Find the answer to your question by asking.

Ask question

Explore related questions

See similar questions with these tags.