What is LinkedBlockingQueue? How to use LinkedBlockingQueue?

The LinkedBlockingQueue class implements the BlockingQueue interface. See BlockingQueue for more information about the interface. The LinkedBlockingQueue is an optionally-bounded blocking queue based on linked nodes. This queue orders elements FIFO (first-in-first-out). The head of the queue is the element that has been on the queue for the longest time, and the tail is the element that has been on the queue for the shortest time. New elements are inserted at the tail of the queue, and the queue retrieval operations obtain elements at the head of the queue. Linked queues typically have higher throughput than array-based queues but less predictable performance in most concurrent applications.
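The FIFO ordering described above can be checked with a minimal single-threaded sketch. The class name FifoOrderDemo and the element values are made up for illustration; only offer() and poll() from LinkedBlockingQueue are used.

```java
import java.util.ArrayList;
import java.util.List;
import java.util.concurrent.LinkedBlockingQueue;

// Hypothetical demo class, not part of the ticket example.
class FifoOrderDemo {

  // Inserts three elements at the tail, then removes from the head
  // until the queue is empty, and returns the removal order.
  static List<String> insertThenDrain() {
    LinkedBlockingQueue<String> queue = new LinkedBlockingQueue<>();
    queue.offer("first");
    queue.offer("second");
    queue.offer("third");

    List<String> removed = new ArrayList<>();
    String head;
    while ((head = queue.poll()) != null) {
      removed.add(head);
    }
    return removed;
  }

  public static void main(String[] args) {
    System.out.println(insertThenDrain()); // prints [first, second, third]
  }
}
```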

There are three ways to create a LinkedBlockingQueue:

// Creates a LinkedBlockingQueue with a capacity of Integer.MAX_VALUE.
LinkedBlockingQueue();

// Creates a LinkedBlockingQueue with a capacity of Integer.MAX_VALUE,
// initially containing the elements of the given collection, 
// added in traversal order of the collection's iterator.
LinkedBlockingQueue(Collection<? extends E> c);

// Creates a LinkedBlockingQueue with the given (fixed) capacity.
LinkedBlockingQueue(int capacity);
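As a rough illustration of the three constructors, the sketch below creates one queue with each and reports remainingCapacity(). The class name ConstructorDemo and the sample elements are assumptions made for this example.

```java
import java.util.Arrays;
import java.util.List;
import java.util.concurrent.LinkedBlockingQueue;

// Hypothetical demo class, not part of the ticket example.
class ConstructorDemo {

  // Returns the remaining capacity of a queue built with each constructor.
  static int[] remainingCapacities() {
    // No-argument constructor: capacity is Integer.MAX_VALUE.
    LinkedBlockingQueue<String> unbounded = new LinkedBlockingQueue<>();

    // Collection constructor: capacity is Integer.MAX_VALUE,
    // and the two given elements are already in the queue.
    LinkedBlockingQueue<String> fromCollection =
        new LinkedBlockingQueue<>(List.of("A", "B"));

    // Capacity constructor: at most 10 elements.
    LinkedBlockingQueue<String> bounded = new LinkedBlockingQueue<>(10);

    return new int[] {
      unbounded.remainingCapacity(),
      fromCollection.remainingCapacity(),
      bounded.remainingCapacity()
    };
  }

  public static void main(String[] args) {
    // prints [2147483647, 2147483645, 10]
    System.out.println(Arrays.toString(remainingCapacities()));
  }
}
```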

The LinkedBlockingQueue keeps its elements internally in a linked structure (linked nodes). This structure can optionally have an upper bound. If no upper bound is specified, Integer.MAX_VALUE is used as the upper bound. Linked nodes are created dynamically on each insertion unless this would bring the queue above capacity. This class and its iterator implement all of the optional methods of the Collection and Iterator interfaces.
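One of the optional methods mentioned above is Iterator.remove(). A small sketch, assuming a hypothetical class IteratorRemoveDemo and made-up elements, shows an element being removed from the middle of the queue while iterating:

```java
import java.util.ArrayList;
import java.util.Iterator;
import java.util.List;
import java.util.concurrent.LinkedBlockingQueue;

// Hypothetical demo class, not part of the ticket example.
class IteratorRemoveDemo {

  // Removes "B" through the iterator and returns what is left in the queue.
  static List<String> removeViaIterator() {
    LinkedBlockingQueue<String> queue =
        new LinkedBlockingQueue<>(List.of("A", "B", "C"));

    Iterator<String> it = queue.iterator();
    while (it.hasNext()) {
      if ("B".equals(it.next())) {
        it.remove(); // optional Iterator operation, supported by this queue
      }
    }
    return new ArrayList<>(queue);
  }

  public static void main(String[] args) {
    System.out.println(removeViaIterator()); // prints [A, C]
  }
}
```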

The following are some important methods of LinkedBlockingQueue and their behaviour:

a. offer(E e) : Inserts the element at the tail of the queue if this does not exceed the queue's capacity. Returns true on success and false if the queue is full. It does not wait.
b. put(E e) : Inserts the element at the tail of the queue, waiting for space to become available if necessary.
c. peek() : Retrieves the head of the queue without removing it. Returns null if the queue is empty.
d. poll() : Retrieves and removes the head of the queue. Returns null if the queue is empty.
e. remove(Object o) : Removes a single instance of the specified element from the queue, if it is present. Returns true if the queue changed.
f. take() : Retrieves and removes the head of the queue, waiting if necessary until an element becomes available.
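The methods listed above can be tried out in a single thread with a small bounded queue. This is only a sketch: the class name QueueMethodsDemo, the capacity of 2 and the element names are assumptions for illustration, and the calls are ordered so that put() and take() never have to wait.

```java
import java.util.concurrent.LinkedBlockingQueue;

// Hypothetical demo class, not part of the ticket example.
class QueueMethodsDemo {

  // Calls each method once and records the results in a single string.
  static String describe() throws InterruptedException {
    LinkedBlockingQueue<String> queue = new LinkedBlockingQueue<>(2);
    StringBuilder log = new StringBuilder();

    log.append(queue.offer("T1")).append(' ');  // true: there is space
    queue.put("T2");                            // space left, returns at once
    log.append(queue.offer("T3")).append(' ');  // false: queue is full
    log.append(queue.peek()).append(' ');       // T1, which stays in the queue
    log.append(queue.remove("T2")).append(' '); // true: T2 was present
    log.append(queue.take()).append(' ');       // T1, removed from the head
    log.append(queue.poll());                   // null: queue is now empty
    return log.toString();
  }

  public static void main(String[] args) throws InterruptedException {
    System.out.println(describe()); // prints true false T1 true T1 null
  }
}
```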

Here is how to instantiate and use a LinkedBlockingQueue

As in our previous example about ArrayBlockingQueue, we use a Producer-Consumer model to exercise the LinkedBlockingQueue. In this example a TicketProducer puts 15 tickets into a LinkedBlockingQueue with a capacity of 10. There are two other classes: TicketConsumer, which takes tickets from the queue, and TicketWatcher, which prints the current contents of the queue.

TicketProducer.java

package com.tuturself.pc;

import java.util.concurrent.LinkedBlockingQueue;

public class TicketProducer implements Runnable {

  private final LinkedBlockingQueue<String> queue;
  // volatile so that other threads see the update made in run().
  private volatile boolean running;

  public TicketProducer(LinkedBlockingQueue<String> queue) {
	this.queue = queue;
	running = true;
  }

  // Other threads need to know whether the producer
  // is still running; this method returns the
  // state (running/stopped).
  public boolean isRunning() {
	return running;
  }

  @Override
  public void run() {

	// We add tickets using put(), which waits until
	// space becomes available if the queue is full.
	for (int i = 0; i < 15; i++) {
		String element = "Ticket" + i;

		try {
			queue.put(element);
			System.out.println("Ticket added: " + element);
			Thread.sleep(1000);

		} catch (InterruptedException e) {
			// Restore the interrupt flag and stop producing.
			Thread.currentThread().interrupt();
			break;
		}
	}
	System.out.println("TicketProducer Completed.");
	running = false;
  }

}

TicketConsumer.java

package com.tuturself.pc;

import java.util.concurrent.LinkedBlockingQueue;

public class TicketConsumer implements Runnable {
  private final LinkedBlockingQueue<String> queue;
  private final TicketProducer ticketProducer;

  public TicketConsumer(LinkedBlockingQueue<String> queue,
		TicketProducer ticketProducer) {
	this.queue = queue;
	this.ticketProducer = ticketProducer;
  }

  @Override
  public void run() {

	// As long as the producer is running,
	// we remove tickets from the queue.
	// take() waits while the queue is empty.
	while (ticketProducer.isRunning()) {

	  try {
		System.out.println("Removing Ticket: " + queue.take());

		Thread.sleep(2000);
	  } catch (InterruptedException e) {
		// Restore the interrupt flag and stop consuming.
		Thread.currentThread().interrupt();
		break;
	  }
	}

	System.out.println("TicketConsumer completed.");
  }
}

TicketWatcher.java

package com.tuturself.pc;

import java.util.concurrent.LinkedBlockingQueue;

public class TicketWatcher implements Runnable {

  private final LinkedBlockingQueue<String> queue;
  private final TicketProducer ticketProducer;

  public TicketWatcher(LinkedBlockingQueue<String> queue, TicketProducer ticketProducer) {
	this.queue = queue;
	this.ticketProducer = ticketProducer;
  }

  @Override
  public void run() {

	// As long as the producer is running,
	// we want to check for tickets.
	while (ticketProducer.isRunning()) {
		System.out.println("Tickets right now: " + queue);

		try {
			Thread.sleep(2000);
		} catch (InterruptedException e) {
			// Restore the interrupt flag and stop watching.
			Thread.currentThread().interrupt();
			break;
		}
	}

	System.out.println("TicketWatcher Completed.");
	System.out.println("Final Tickets in the queue: " + queue);
  }
}

BlockingQueueTest.java

package com.tuturself.pc;

import java.util.concurrent.LinkedBlockingQueue;

public class BlockingQueueTest {

  public static void main(String[] args) throws InterruptedException {
	LinkedBlockingQueue<String> queue = new LinkedBlockingQueue<>(10);

	TicketProducer producer = new TicketProducer(queue);
	TicketWatcher watcher = new TicketWatcher(queue, producer);
	TicketConsumer consumer = new TicketConsumer(queue, producer);

	Thread producerThread = new Thread(producer);
	Thread watcherThread = new Thread(watcher);
	Thread consumerThread = new Thread(consumer);

	producerThread.start();
	Thread.sleep(2000);
	consumerThread.start();
	Thread.sleep(2000);
	watcherThread.start();
  }
}

The program produces output similar to the following (the exact interleaving depends on thread scheduling):

Ticket added: Ticket0
Ticket added: Ticket1
Ticket added: Ticket2
Removing Ticket: Ticket0
Ticket added: Ticket3
Removing Ticket: Ticket1
Tickets right now: [Ticket2, Ticket3]
Ticket added: Ticket4
Ticket added: Ticket5
Tickets right now: [Ticket2, Ticket3, Ticket4, Ticket5]
Removing Ticket: Ticket2
Ticket added: Ticket6
Ticket added: Ticket7
Tickets right now: [Ticket3, Ticket4, Ticket5, Ticket6, Ticket7]
Removing Ticket: Ticket3
Ticket added: Ticket8
Ticket added: Ticket9
Removing Ticket: Ticket4
Tickets right now: [Ticket5, Ticket6, Ticket7, Ticket8, Ticket9]
Ticket added: Ticket10
Ticket added: Ticket11
Tickets right now: [Ticket5, Ticket6, Ticket7, Ticket8, Ticket9, Ticket10, Ticket11]
Removing Ticket: Ticket5
Ticket added: Ticket12
Ticket added: Ticket13
Tickets right now: [Ticket6, Ticket7, Ticket8, Ticket9, Ticket10, Ticket11, Ticket12, Ticket13]
Removing Ticket: Ticket6
Ticket added: Ticket14
TicketProducer Completed.
TicketWatcher Completed.
TicketConsumer completed.
Final Tickets in the queue: [Ticket7, Ticket8, Ticket9, Ticket10, Ticket11, Ticket12, Ticket13, Ticket14]

As you can see, by running three threads simultaneously we relied on the thread-safety of LinkedBlockingQueue: no explicit locking around the queue was needed. The only thing we had to do ourselves was keep track of whether the producer thread was still running. Note that the consumer and the watcher stop as soon as the producer completes, which is why eight tickets are still in the queue at the end. By checking the output you can clearly see the effect of every thread and the final result.
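One caveat of tracking the producer state this way: take() waits while the queue is empty, so a consumer that is waiting inside take() when the producer finishes never gets to re-check isRunning(). A possible variation, sketched below under assumptions, uses the timed poll(long, TimeUnit) so the loop condition is re-evaluated periodically and the queue is drained before the consumer stops. The class name TimedPollDemo, the producerRunning flag, the capacity of 2 and the 100 ms timeout are all made up for this sketch and are not part of the example above.

```java
import java.util.ArrayList;
import java.util.List;
import java.util.concurrent.LinkedBlockingQueue;
import java.util.concurrent.TimeUnit;

// Hypothetical demo class, not part of the ticket example.
class TimedPollDemo {

  // Assumed flag for this sketch; volatile so the consumer sees the update.
  private static volatile boolean producerRunning;

  // Produces `count` tickets and returns them in the order they were consumed.
  static List<String> produceAndConsume(int count) throws InterruptedException {
    producerRunning = true;
    LinkedBlockingQueue<String> queue = new LinkedBlockingQueue<>(2);
    List<String> consumed = new ArrayList<>();

    Thread producer = new Thread(() -> {
      try {
        for (int i = 0; i < count; i++) {
          queue.put("Ticket" + i); // waits while the queue is full
        }
      } catch (InterruptedException e) {
        Thread.currentThread().interrupt();
      } finally {
        producerRunning = false; // set only after the last put()
      }
    });

    Thread consumer = new Thread(() -> {
      try {
        // Keep going while tickets may still arrive or are still queued.
        while (producerRunning || !queue.isEmpty()) {
          // Waits at most 100 ms, then returns null if nothing arrived.
          String ticket = queue.poll(100, TimeUnit.MILLISECONDS);
          if (ticket != null) {
            consumed.add(ticket);
          }
        }
      } catch (InterruptedException e) {
        Thread.currentThread().interrupt();
      }
    });

    producer.start();
    consumer.start();
    producer.join();
    consumer.join(); // after join, `consumed` is safe to read here
    return consumed;
  }

  public static void main(String[] args) throws InterruptedException {
    // prints [Ticket0, Ticket1, Ticket2, Ticket3, Ticket4]
    System.out.println(produceAndConsume(5));
  }
}
```

With a single consumer the tickets come out in insertion order, and the consumer thread terminates on its own once the producer is done and the queue is empty.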
