What is PriorityBlockingQueue ? And how to use PriorityBlockingQueue ?

PriorityBlockingQueue was included in concurrent package from JDK 5 see here. It belongs to java.util.concurrent package. It is an unbounded blocking queue backed by a priority heap and implementation of BlockingQueue and it also contains the feature of PriorityQueue. PriorityBlockingQueue uses the ordering rule same as PriorityQueue and in addition supports blocking retrieval operations.The PriorityBlockingQueue is logically unbounded, attempted additions may fail due to resource exhaustion (causing OutOfMemoryError). We can not add null elements in a PriorityBlockingQueue. All the elements added in PriorityBlockingQueue should implements Comparable to ensure the natural ordering of elements. If we try to add non-comparable objects then it will throw ClassCastException.

Operations on this class make no guarantees about the ordering of elements with equal priority. If you need to enforce an ordering, you can define custom classes or comparators that use a secondary key to break ties in primary priority values.

Following are some operation of PriorityBlockingQueue:

  1. add(E e) : Adds the given element in the PriorityBlockingQueue. Here add(E e), offer(E e) and put(E e) will behave in the same way as PriorityBlockingQueue is unbounded.
  2. peek( ) : Retrieves the head of the queue if available otherwise null. It does not remove the head.
  3. poll( ): Retrieves as well remove the head of queue if available otherwise returns null.
  4. remove(Object o): Removes the specified element.
  5. take(): Retrieves and removes the head of the queue and wait if element is not available.

We are going to develop a order delivery system for an on-line shopping company. We have a PriorityBlockingQueue named orderQueue where each ordered are stored when placed. And we have a consumer that picks the order one by one from the Queue and deliver it to the end user. The orders are picked on the natural order of arriving. But in between if some order comes with higher priority (express delivery/ one day delivery as Amazon) then the consumer will ensure to pick the order on priority.

We have a Enum. Which is defining the priority of delivery for each Order. Where 1 is the NORMAL delivery, 2 is the EXPRESS delivery(within 2 days) and 3 is with highest priority SMAE_DAY_DELIVERY of the order.

package com.tuturself.pbq;

public enum PRIORITY {

 NORMAL(1), EXPRESS(2), SMAE_DAY_DELIVERY(3);

 private final Integer value;

 PRIORITY(int v) {
  value = v;
 }

 public Integer value() {
  return value;
 }

 public static PRIORITY fromValue(int v) {
  for (PRIORITY priority: PRIORITY.values()) {
   if (priority.value == v) {
    return priority;
   }
  }
  throw new IllegalArgumentException(String.valueOf(v));
 }
}

Now we have a Inventory of items, which is the inventory of our on-line shopping company.

package com.tuturself.pbq;

import java.util.Random;

public class Inventory {

 private static final String[] items = {
  "shoes",
  "hanger",
  "blanket",
  "soap",
  "sticky note",
  "sketch pad",
  "television",
  "nail file",
  "clothes"
 };

 public static String getRandomItem() {
  int index = (new Random()).nextInt(items.length);
  if (index == items.length)
   index = (items.length - 1);
  return items[index];
 }
}

Following is our Order class:

package com.tuturself.pbq;

public class Order implements Comparable < Order > {

 private Integer orderId;
 private String item;
 private String shippingAddress;
 private PRIORITY priority;


 /**
  * @param orderId
  * @param item
  * @param priority
  */
 public Order(Integer orderId, String item, PRIORITY priority) {
  super();
  this.orderId = orderId;
  this.item = item;
  this.priority = priority;
 }

 public String getShippingAddress() {
  return shippingAddress;
 }

 public void setShippingAddress(String shippingAddress) {
  this.shippingAddress = shippingAddress;
 }

 public Integer getOrderId() {
  return orderId;
 }

 public String getItem() {
  return item;
 }

 public PRIORITY getPriority() {
  return priority;
 }

 @Override
 public int compareTo(Order o) {
  return this.priority.value().compareTo(o.getPriority().value());
 }

 @Override
 public String toString() {
  return "Order [orderId=" + orderId + ", item=" + item + ", shippingAddress=" + shippingAddress + ", priority=" + priority + "]";
 }
}

Here we will take the placed order and put it in the Priority blocking Queue by AddOrderThread

package com.tuturself.pbq;

import java.util.Random;
import java.util.concurrent.BlockingQueue;

public class AddOrderThread implements Runnable {

 BlockingQueue < Order > orderQueue;
 public AddOrderThread(BlockingQueue < Order > orderQueue) {
  this.orderQueue = orderQueue;
 }

 @Override
 public void run() {
  while (true) {
   try {
    Order order = getNextPalcedOrder();
    System.out.println(" ------------------------- New Order ------------------------- ");
    System.out.println("Adding to Queue: " + order);
    orderQueue.put(order);
    System.out.println(" ------------------------------------------------------------- ");
    Thread.sleep(500);
   } catch (InterruptedException e) {
    e.printStackTrace();
   }
  }
 }

 /**
  * This is Just a Mock method which is returning a 
  * Order with random Data
  * @return
  */
 private Order getNextPalcedOrder() {
  Integer orderId = getRandomInRange(10000, 1);
  String item = Inventory.getRandomItem();
  Integer priority = getRandomInRange(3, 1);
  priority = priority == 0 ? PRIORITY.NORMAL.value() : priority;
  Order order = new Order(orderId, item, PRIORITY.fromValue(priority));
  order.setShippingAddress("Shipping Address");
  return order;
 }

 private int getRandomInRange(int maximum, int minimum) {
  return (new Random()).nextInt(maximum - minimum + 1) + minimum;
 }
}

Here is our consumer which takes the order from the Queue and ship the same. ShippingOrderThread

package com.tuturself.pbq;

import java.util.concurrent.BlockingQueue;

public class ShippingOrderThread implements Runnable {

 BlockingQueue < Order > orderQueue;
 public ShippingOrderThread(BlockingQueue < Order > orderQueue) {
  this.orderQueue = orderQueue;
 }

 @Override
 public void run() {
  while (true) {
   try {
    Order order = orderQueue.take();
    System.out.println(" ------------------------- SHIPPING ------------------------- ");
    System.out.println(" Order shipped " + order);
    System.out.println(" ------------------------------------------------------------- ");
    System.out.println();
    Thread.sleep(2000);
   } catch (InterruptedException e) {
    e.printStackTrace();
   }
  }
 }
}

Now let us test the Order shipment system.TestPriorityBlockingQueue

package com.tuturself.pbq;

import java.util.concurrent.BlockingQueue;
import java.util.concurrent.LinkedBlockingQueue;

public class TestPriorityBlockingQueue {

 public static void main(String[] args) {
  final BlockingQueue < Order > priorityBlockingQueue = new LinkedBlockingQueue < Order > ();

  AddOrderThread queueProducer = new AddOrderThread(priorityBlockingQueue);
  new Thread(queueProducer).start();

  ShippingOrderThread queueConsumer = new ShippingOrderThread(priorityBlockingQueue);
  new Thread(queueConsumer).start();
 }
}

Now let us check the output where orders are added one by one. And the Shipment system is shipping the order from the queue considering the priority of each order. Check order with orderId 2494 is shipped first as it is a EXPRESS order, while there are other orders present is the Queue with NORMAL priority.

------------------------- New Order ------------------------- 
Adding to Queue: Order [orderId=3933, item=blanket, shippingAddress=Shipping Address, priority=NORMAL]
 ------------------------------------------------------------- 
 ------------------------- SHIPPING ------------------------- 
 Order shipped Order [orderId=3933, item=blanket, shippingAddress=Shipping Address, priority=NORMAL]
 ------------------------------------------------------------- 

 ------------------------- New Order ------------------------- 
Adding to Queue: Order [orderId=2494, item=soap, shippingAddress=Shipping Address, priority=EXPRESS]
 ------------------------------------------------------------- 
 ------------------------- New Order ------------------------- 
Adding to Queue: Order [orderId=26, item=hanger, shippingAddress=Shipping Address, priority=NORMAL]
 ------------------------------------------------------------- 
 ------------------------- New Order ------------------------- 
Adding to Queue: Order [orderId=7220, item=hanger, shippingAddress=Shipping Address, priority=NORMAL]
 ------------------------------------------------------------- 
 ------------------------- SHIPPING ------------------------- 
 Highest priority item is shipped first
 Order shipped Order [orderId=2494, item=soap, shippingAddress=Shipping Address, priority=EXPRESS]
 ------------------------------------------------------------- 

 

core java 12 Java Concurrent Packages 12

FOLLOW US ON LinkedIn



Explore Tutu'rself