How to create an ArrayBlockingQueue ? Producer Consumer problem by ArrayBlockingQueue explained.

In this post we are going to present the producer consumer implementation using ArrayBlockingQueue class, which implements the BlockingQueue interface. The main reason to use the ArrayBlockingQueue class is that it is thread-safe, in the sense that it can be used concurrently between different threads without any risk. Moreover, it has a specific capacity limit, which can be used in our advantage, e.g. when we try to add an element and there are no remaining spots for it, we can use the appropriate method to block the insertion until it can be achieved.

The ArrayBlockingQueue has the following constructors:
 

ArrayBlockingQueue(int capacity)
// Creates an ArrayBlockingQueue with the given (fixed) capacity and default access policy.

ArrayBlockingQueue(int capacity, boolean fair)
// Creates an ArrayBlockingQueue with the given (fixed) capacity and the specified access policy.

ArrayBlockingQueue(int capacity, boolean fair, Collection<? extends E> c)
// Creates an ArrayBlockingQueue with the given (fixed) capacity, the specified access policy and 
//  initially containing the elements of the given collection, added in traversal order of the collection's iterator.

Importance of A fairness algorithm which is used in last 2 constructors:

ArrayBlockingQueue is a queue of a fixed size. So if you set the size at 10, and attempt to insert an 11th element, the insert statement will block until another thread removes an element. The fairness issue is what happens if multiple threads try to insert and remove at the same time (in other words during the period when the Queue was blocked). A fairness algorithm ensures that the first thread that asks is the first thread that gets. Otherwise, a given thread may wait longer than other threads, causing unpredictable behaviour (sometimes one thread will just take several seconds because other threads that started later got processed first). The trade-off is that it takes overhead to manage the fairness, slowing down the throughput.

Consider the following producer consumer problem. Here we have a ArrayBlockingQueue of size 12. We have a BlockingQueueProducer class which adds 10 elements in the Queue. And we started 2 such producers together. And we have 1 consumer as BlockingQueueConsumer, that consumes one by one item from the Queue. Now we are going to start the producers and consumer in the following fashion:

// producer 1 STARTED
new Thread(queueProducer1).start();
// producer 2 STARTED
new Thread(queueProducer2).start();
// Now wait for 3500 ms
Thread.sleep(3500);
// Single Consumer 2 STARTED
new Thread(queueConsumer).start();

Now the output will comes in following manner :

1. Both the producer starts producing items one by one until 12 items.
2. Now the operations is blocked as the Queue is full.
3. The consumer started after some time and consumes an item.
4. Now there is empty space in the Queue, so one of the producer produces an item and the consumer consumes it and it continues until both producers produces all the items and the single consumer consumes it all.

BlockingQueueProducer.java

package com.tuturself.pc;

import java.util.concurrent.BlockingQueue;

public class BlockingQueueProducer implements Runnable {

	protected BlockingQueue<String> blockingQueue;
	private int producerNo;

	public BlockingQueueProducer(BlockingQueue<String> queue,int producerNo) {
		this.blockingQueue = queue;
		this.producerNo = producerNo;
	}

	@Override
	public void run() {
		try {
			for (int i = 0; i < 10; i++) {
				String item = "ITEM - " + (i + 1) + producerNo;
				blockingQueue.put(item);
				System.out.println("Item produced >> by producr :" + producerNo + " = " + item);
				Thread.sleep(200);	
			}
		} catch (InterruptedException e) {
			e.printStackTrace();
		}
	}
}

BlockingQueueConsumer.java

package com.tuturself.pc;

import java.util.concurrent.BlockingQueue;

public class BlockingQueueConsumer implements Runnable {

	protected BlockingQueue<String> blockingQueue;

	public BlockingQueueConsumer(BlockingQueue<String> queue) {
		this.blockingQueue = queue;
	}

	@Override
	public void run() {
		while (true) {
			try {
				// take/consume from blockingQueue.
				System.out.println("BlockingQueueConsumer consumed : " + blockingQueue.take());
				Thread.sleep(3500);
			} catch (InterruptedException ex) {

			}
		}
	}
}

BlockingQueueTest.java

package com.tuturself.pc;

import java.util.concurrent.ArrayBlockingQueue;
import java.util.concurrent.BlockingQueue;

public class BlockingQueueTest {

	public static void main(String[] args) throws InterruptedException {
		BlockingQueue<String> blockingQueue = new ArrayBlockingQueue<String>(12,true);

		BlockingQueueProducer queueProducer1 = new BlockingQueueProducer(blockingQueue,1);
		BlockingQueueProducer queueProducer2 = new BlockingQueueProducer(blockingQueue,2);
		BlockingQueueConsumer queueConsumer = new BlockingQueueConsumer(blockingQueue);

		new Thread(queueProducer1).start();
		new Thread(queueProducer2).start();
		Thread.sleep(3500);
		new Thread(queueConsumer).start();
	}
}

Now the output will be:

Item produced >> by producr :1 = ITEM - 11
Item produced >> by producr :2 = ITEM - 12
Item produced >> by producr :1 = ITEM - 21
Item produced >> by producr :2 = ITEM - 22
Item produced >> by producr :1 = ITEM - 31
Item produced >> by producr :2 = ITEM - 32
Item produced >> by producr :1 = ITEM - 41
Item produced >> by producr :2 = ITEM - 42
Item produced >> by producr :2 = ITEM - 52
Item produced >> by producr :1 = ITEM - 51
Item produced >> by producr :2 = ITEM - 62
Item produced >> by producr :1 = ITEM - 61

// Wait here for the consumer to STARTs as the Queue is full

BlockingQueueConsumer consumed : ITEM - 11
Item produced >> by producr :1 = ITEM - 71
BlockingQueueConsumer consumed : ITEM - 12
Item produced >> by producr :2 = ITEM - 72
BlockingQueueConsumer consumed : ITEM - 21
Item produced >> by producr :1 = ITEM - 81
BlockingQueueConsumer consumed : ITEM - 22
Item produced >> by producr :2 = ITEM - 82
BlockingQueueConsumer consumed : ITEM - 31
Item produced >> by producr :1 = ITEM - 91
BlockingQueueConsumer consumed : ITEM - 32
Item produced >> by producr :2 = ITEM - 92
BlockingQueueConsumer consumed : ITEM - 41
Item produced >> by producr :1 = ITEM - 101
BlockingQueueConsumer consumed : ITEM - 42
Item produced >> by producr :2 = ITEM - 102
BlockingQueueConsumer consumed : ITEM - 52
BlockingQueueConsumer consumed : ITEM - 51
BlockingQueueConsumer consumed : ITEM - 62
BlockingQueueConsumer consumed : ITEM - 61
BlockingQueueConsumer consumed : ITEM - 71
BlockingQueueConsumer consumed : ITEM - 72
BlockingQueueConsumer consumed : ITEM - 81
BlockingQueueConsumer consumed : ITEM - 82
BlockingQueueConsumer consumed : ITEM - 91
BlockingQueueConsumer consumed : ITEM - 92
BlockingQueueConsumer consumed : ITEM - 101
BlockingQueueConsumer consumed : ITEM - 102

 

core java 12 Java Concurrent Packages 12

FOLLOW US ON LinkedIn



Explore Tutu'rself