What is BlockingQueue in the Java concurrent package? How does it work?

The Java BlockingQueue interface in the java.util.concurrent package represents a thread-safe queue. It supports operations that wait for the queue to become non-empty when retrieving an element, and wait for space to become available in the queue when inserting an element. BlockingQueue implementations are designed primarily for producer-consumer queues: typically one thread produces objects and another thread consumes them.

The producing thread keeps producing new objects and inserting them into the queue until the queue reaches its upper limit, that is, its capacity. If the blocking queue is full, the producing thread is blocked while trying to insert the new object with put. It remains blocked until a consuming thread takes an object out of the queue. A BlockingQueue may be capacity bounded, but it does not have to be. At any given time a bounded queue has a remainingCapacity, beyond which no additional elements can be put without blocking. A BlockingQueue without any intrinsic capacity constraints always reports a remaining capacity of Integer.MAX_VALUE.
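As a minimal sketch of how remainingCapacity behaves, the following compares a bounded ArrayBlockingQueue with a PriorityBlockingQueue, which has no capacity limit. The class and method names (CapacityDemo, boundedRemaining, unboundedRemaining) are made up for this illustration.

```java
import java.util.concurrent.ArrayBlockingQueue;
import java.util.concurrent.BlockingQueue;
import java.util.concurrent.PriorityBlockingQueue;

class CapacityDemo {

    // Bounded queue with capacity 2: one insert leaves room for one more element.
    static int boundedRemaining() {
        BlockingQueue<String> bounded = new ArrayBlockingQueue<>(2);
        bounded.offer("a");
        return bounded.remainingCapacity();
    }

    // PriorityBlockingQueue has no capacity constraint, so the reported
    // remaining capacity does not shrink when elements are added.
    static int unboundedRemaining() {
        BlockingQueue<String> unbounded = new PriorityBlockingQueue<>();
        unbounded.offer("a");
        return unbounded.remainingCapacity();
    }

    public static void main(String[] args) {
        System.out.println("bounded remaining   : " + boundedRemaining());
        System.out.println("unbounded remaining : " + unboundedRemaining());
    }
}
```

Note that a LinkedBlockingQueue created with its no-argument constructor is technically bounded, with a capacity of Integer.MAX_VALUE, so in practice a producer will not block on it.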

The consuming thread keeps taking objects out of the blocking queue, and processes the items. If the consuming thread tries to take an object out of an empty queue, the consuming thread is blocked until a producing thread puts an object into the queue.
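The blocking behaviour of take can be sketched as follows. The consumer starts first and finds the queue empty; it only continues once the main thread puts an element. The class name BlockingTakeDemo, the value "ticket-1" and the 200 ms pause are assumptions chosen for the illustration.

```java
import java.util.concurrent.BlockingQueue;
import java.util.concurrent.LinkedBlockingQueue;

class BlockingTakeDemo {

    // Returns the element the consumer received, or null if interrupted.
    static String run() {
        BlockingQueue<String> queue = new LinkedBlockingQueue<>();
        String[] received = new String[1];

        Thread consumer = new Thread(() -> {
            try {
                // The queue is empty here, so take() waits for an element.
                received[0] = queue.take();
            } catch (InterruptedException e) {
                Thread.currentThread().interrupt();
            }
        });
        consumer.start();

        try {
            Thread.sleep(200);      // give the consumer time to reach take()
            queue.put("ticket-1");  // this wakes the waiting consumer
            consumer.join();        // after join, received[0] is safely visible
        } catch (InterruptedException e) {
            Thread.currentThread().interrupt();
        }
        return received[0];
    }

    public static void main(String[] args) {
        System.out.println("Consumer received: " + run());
    }
}
```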

BlockingQueue Methods

BlockingQueue methods come in four forms, with different ways of handling operations that cannot be satisfied immediately, but may be satisfied at some point in the future.

1. Throws Exception:  If the attempted operation is not possible immediately, an exception is thrown.

2. Special Value: If the attempted operation is not possible immediately, a special value is returned (null or false, depending on the operation).

3. Blocks: If the attempted operation is not possible immediately, the method call blocks until it is.

4. Times Out: If the attempted operation is not possible immediately, the method call blocks until it is, but waits no longer than the given timeout. It then returns a special value telling whether the operation succeeded or not (false for offer, null for poll).

Summary of BlockingQueue methods:

          Throws Exception   Special Value   Blocks           Times Out
Insert    add(o)             offer(o)        put(o)           offer(o, timeout, timeunit)
Remove    remove()           poll()          take()           poll(timeout, timeunit)
Examine   element()          peek()          not applicable   not applicable
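The different forms can be tried out on a queue of capacity 1, as in the sketch below. It first calls the removal methods on an empty queue, then fills the queue and calls the insertion methods. The class name FourFormsDemo, the log strings and the 50 ms timeouts are assumptions made for this illustration.

```java
import java.util.ArrayList;
import java.util.List;
import java.util.NoSuchElementException;
import java.util.concurrent.ArrayBlockingQueue;
import java.util.concurrent.BlockingQueue;
import java.util.concurrent.TimeUnit;

class FourFormsDemo {

    static List<String> run() {
        List<String> log = new ArrayList<>();
        BlockingQueue<Integer> queue = new ArrayBlockingQueue<>(1);
        try {
            // Empty queue: removal cannot succeed immediately.
            try {
                queue.remove();
            } catch (NoSuchElementException e) {
                log.add("remove() threw NoSuchElementException");
            }
            log.add("poll() returned " + queue.poll());
            log.add("timed poll returned " + queue.poll(50, TimeUnit.MILLISECONDS));

            queue.put(1); // does not block: there is room for one element

            // Full queue: insertion cannot succeed immediately.
            try {
                queue.add(2);
            } catch (IllegalStateException e) {
                log.add("add(2) threw IllegalStateException");
            }
            log.add("offer(2) returned " + queue.offer(2));
            log.add("timed offer returned " + queue.offer(2, 50, TimeUnit.MILLISECONDS));
        } catch (InterruptedException e) {
            Thread.currentThread().interrupt();
        }
        return log;
    }

    public static void main(String[] args) {
        run().forEach(System.out::println);
    }
}
```

The blocking forms put and take are left out of the failing cases on purpose: calling put on the full queue or take on the empty one from a single thread would wait forever.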

All BlockingQueue implementations are thread-safe. All queuing methods achieve their effects atomically using internal locks or other forms of concurrency control. However, the bulk Collection operations addAll, containsAll, retainAll and removeAll are not necessarily performed atomically unless specified otherwise in an implementation. So it is possible, for example, for addAll(c) to fail (throwing an exception) after adding only some of the elements in c.

A BlockingQueue does not accept null elements. Implementations throw NullPointerException on attempts to add, put or offer a null. A null is used as a sentinel value to indicate failure of poll operations.
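Both points can be illustrated with a small sketch, assuming an ArrayBlockingQueue of capacity 2. The class and method names (BulkAndNullDemo, partialAddAll, rejectsNull) are hypothetical.

```java
import java.util.ArrayList;
import java.util.Arrays;
import java.util.List;
import java.util.concurrent.ArrayBlockingQueue;
import java.util.concurrent.BlockingQueue;

class BulkAndNullDemo {

    // addAll is not atomic: the first two elements are added before
    // the third one fails because the queue is full.
    static List<Integer> partialAddAll() {
        BlockingQueue<Integer> queue = new ArrayBlockingQueue<>(2);
        try {
            queue.addAll(Arrays.asList(1, 2, 3));
        } catch (IllegalStateException e) {
            // Thrown for the third element; the first two stay in the queue.
        }
        return new ArrayList<>(queue);
    }

    // Inserting null is rejected with a NullPointerException.
    static boolean rejectsNull() {
        BlockingQueue<Integer> queue = new ArrayBlockingQueue<>(2);
        try {
            queue.offer(null);
            return false;
        } catch (NullPointerException e) {
            return true;
        }
    }

    public static void main(String[] args) {
        System.out.println("After failed addAll: " + partialAddAll());
        System.out.println("Rejects null: " + rejectsNull());
    }
}
```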

Java BlockingQueue Example

Here is a Java BlockingQueue example that uses the LinkedBlockingQueue implementation of the BlockingQueue interface. First comes the TicketProducer class, which produces 10 tickets. Then comes the TicketConsumer class, which consumes the tickets.

Producer Thread

import java.util.concurrent.BlockingQueue;

/**
 * Producer: puts ten tickets into the shared queue.
 */
class TicketProducer implements Runnable {

    private final BlockingQueue<Integer> tickets;

    public TicketProducer(BlockingQueue<Integer> tickets) {
        this.tickets = tickets;
    }

    @Override
    public void run() {
        for (int i = 1; i <= 10; i++) {
            try {
                System.out.println("Ticket produced : " + i);
                // put() blocks if the queue is full
                tickets.put(i);
            } catch (InterruptedException ex) {
                // Restore the interrupt flag and stop producing
                Thread.currentThread().interrupt();
                return;
            }
        }
    }
}

Consumer Thread

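import java.util.concurrent.BlockingQueue;

/**
 * Consumer: takes tickets from the shared queue until interrupted.
 */
class TicketConsumer implements Runnable {

    private final BlockingQueue<Integer> tickets;

    public TicketConsumer(BlockingQueue<Integer> tickets) {
        this.tickets = tickets;
    }

    @Override
    public void run() {
        while (true) {
            try {
                // take() blocks while the queue is empty
                System.out.println("Ticket consumed : " + tickets.take());
            } catch (InterruptedException ex) {
                // Restore the interrupt flag and stop consuming
                Thread.currentThread().interrupt();
                return;
            }
        }
    }
}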

Main Class

import java.util.concurrent.BlockingQueue;
import java.util.concurrent.LinkedBlockingQueue;

public class TicketProducerConsumerMain {

    public static void main(String[] args) {
        // Shared queue between the producer and the consumer
        BlockingQueue<Integer> tickets = new LinkedBlockingQueue<>();

        TicketProducer producer = new TicketProducer(tickets);
        TicketConsumer consumer = new TicketConsumer(tickets);

        Thread ticketProducer = new Thread(producer, "TicketProducerThread");
        Thread ticketConsumer = new Thread(consumer, "TicketConsumerThread");

        ticketProducer.start();
        ticketConsumer.start();
    }
}

Output (the exact interleaving of lines may differ between runs):

Ticket produced : 1
Ticket produced : 2
Ticket consumed : 1
Ticket produced : 3
Ticket consumed : 2
Ticket produced : 4
Ticket consumed : 3
Ticket produced : 5
Ticket consumed : 4
Ticket produced : 6
Ticket consumed : 5
Ticket produced : 7
Ticket consumed : 6
Ticket produced : 8
Ticket consumed : 7
Ticket produced : 9
Ticket consumed : 8
Ticket produced : 10
Ticket consumed : 9
Ticket consumed : 10
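
In the example above the TicketConsumer loops forever, so the program does not exit on its own after the tenth ticket. One common way to let the consumer stop is to have the producer put a special end-of-stream element as its last item. The following is only a sketch of that idea, not part of the original example: the class name PoisonPillDemo and the sentinel value -1 are assumptions.

```java
import java.util.ArrayList;
import java.util.List;
import java.util.concurrent.BlockingQueue;
import java.util.concurrent.LinkedBlockingQueue;

class PoisonPillDemo {

    // Hypothetical sentinel: a value never used for a real ticket.
    static final Integer END_OF_TICKETS = -1;

    // Returns the tickets the consumer processed before it saw the sentinel.
    static List<Integer> run() {
        BlockingQueue<Integer> tickets = new LinkedBlockingQueue<>();
        List<Integer> consumed = new ArrayList<>();

        Thread producer = new Thread(() -> {
            try {
                for (int i = 1; i <= 10; i++) {
                    tickets.put(i);
                }
                tickets.put(END_OF_TICKETS); // signal that nothing more will come
            } catch (InterruptedException e) {
                Thread.currentThread().interrupt();
            }
        });

        Thread consumer = new Thread(() -> {
            try {
                while (true) {
                    Integer ticket = tickets.take();
                    if (ticket.equals(END_OF_TICKETS)) {
                        break; // stop instead of waiting forever
                    }
                    consumed.add(ticket);
                }
            } catch (InterruptedException e) {
                Thread.currentThread().interrupt();
            }
        });

        producer.start();
        consumer.start();
        try {
            producer.join();
            consumer.join(); // after join, the consumed list is safely visible
        } catch (InterruptedException e) {
            Thread.currentThread().interrupt();
        }
        return consumed;
    }

    public static void main(String[] args) {
        System.out.println("Consumed: " + run());
    }
}
```

With several consumers, the producer would need to put one sentinel per consumer, since each sentinel is taken by exactly one thread.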
