One Producer multiple Consumers - using BlockingQueue

In Computer Science Producer Consumer Problem also known as the bounded-buffer problem) is a classic example of a multi-process synchronization problem. The problem describes two processes, the producer and the consumer, who share a common, fixed-size buffer used as a queue. The producer's job is to generate data, put it into the buffer, and start again. At the same time, the consumer is consuming the data.

We have already articles for single producer and consumer in Multi-Threading Section. Check this article for a Single Producer and Single Consumer ProblemOr in our Java Concurrent section we have used BlockingQueue implementations to describe producer consumer problems: https://www.tuturself.com/category?categoryId=134

But here in this article we will discuss about single producer and multiple consumers’ problem. Where a producer will produce items in a common place like a Queue. And there are multiple consumers which are processing these records one by one parallel.  Thus we can achieve more throughput.

A real life example of such a scenario is a Bank. Consider the bank has 5 counters for serving the customers. And the bank serves maximum of 100 tokens / day. There is one token vending machine which is generating token continuously for each customer. When a customer arrives in bank for some work, he /she needs to collect a token from the token vending machine (The Producer). Now the customer needs to wait for some time to be served from one of the 5 counters (The Consumers). Is not it simple. But wait, there are some challenges to implement it.

Typical Problems for this kind of problems are:

  • Thread safe data access for Producer and Consumer.
  • Producer will keep adding elements to Queue (although a maximum of 100 in our case) regardless or Queue capacity.
  • Producer is not aware of the status of items being processed by the consumer.
  • Consumer will run an infinite loop to see if items are available in queue to process.
  • How to tell the consumer that all the items are processed? Now stop execution.

Possible Solutions are:

  • Use Thread Safe Queue.
  • Producer should enter into sleep mode when queue size is full. This can be achieved by using a Blocking queue of fixed size. Thus the producer will wait for the consumers to consume some items when the queue capacity is full.
  • Consumer should go to sleep more when buffer is empty and resume its operation when elements are available in queue for processing. This also can have achieved by a Blocking Queue.
  • Consumer should be notified to end processing when producer completes producing elements, and no more elements will be added to Queue. This can be achieved by using a CountDownLatch only when you know the total number of records your producer is going to produce. Create a CountDownLatch with the number of records, your producer is going to produce. And after consuming each record, decrement the count by One. And put the awaits() method only when all your consumer have started. Thus it will cross the awaits() only all your records are processed. Now you can shut down your consumers. We have used the similar technique in our example. Check the code for detail implementation.

BlockingCollection<T> is a thread-safe collection class that provides the following features: An implementation of the Producer-Consumer pattern. Concurrent adding and taking of elements from multiple threads. Optional maximum capacity. Insertion and removal operations that block when the collection is empty or full. To read more about BlockingQueue read here

Following is our Model Class for Token, which is produced by the Token Vending machine:

package com.tuturself.multiconsumer;

import java.util.UUID;

public class Token {

	private UUID id;
	private int number;
	private String description;

	public UUID getId() {
		return id;
	}

	public void setId(UUID id) {
		this.id = id;
	}

	public int getNumber() {
		return number;
	}

	public void setNumber(int number) {
		this.number = number;
	}

	public String getDescription() {
		return description;
	}

	public void setDescription(String description) {
		this.description = description;
	}

	@Override
	public String toString() {
		return "Token [id=" + id + ", number=" + number + ", description=" + description + "]";
	}
}

Now we have the Producer which is the TokenVendingMachine:

package com.tuturself.multiconsumer;

import java.util.UUID;
import java.util.concurrent.BlockingQueue;

public class TokenVendingMachine implements Runnable {

	private BlockingQueue<Token> blockingQueue;

	public TokenVendingMachine(BlockingQueue<Token> blockingQueue) {
		this.blockingQueue = blockingQueue;
	}

	@Override
	public void run() {
		for (int i = 0; i <= 100; i++) {
			Token token = new Token();
			token.setId(UUID.randomUUID());
			token.setNumber(i);
			token.setDescription("Some Description");
			/**
			 * Insert the token element in the Queue. Wait if no space is
			 * available
			 */
			try {
				System.out.println("New token issued :" + token);
				blockingQueue.put(token);
			} catch (InterruptedException e) {
				// TODO Auto-generated catch block
				e.printStackTrace();
			}
		}
	}
}

Now we have the Consumer, which is a Counter for serving the Customer with a Token:

package com.tuturself.multiconsumer;

import java.util.concurrent.BlockingQueue;
import java.util.concurrent.CountDownLatch;

public class TokenServingCounter implements Runnable {

	private BlockingQueue<Token> blockingQueue;
	private CountDownLatch countDownLatch;

	public TokenServingCounter(BlockingQueue<Token> blockingQueue, 
                 CountDownLatch countDownLatch) {
		this.blockingQueue = blockingQueue;
		this.countDownLatch = countDownLatch;
	}

	@Override
	public void run() {
		/**
		 * Serving token one by one in a infinite loop.
		 * The Loop will break while there are no more
		 * token to serve
		 */
		while (true) {
			if (countDownLatch.getCount() == 0) {
				break;
			}
			try {
				// Serving the customer with the token
				Token token = blockingQueue.take();
				System.out.println("Serving Token :" + token);
			} catch (InterruptedException e) {
				e.printStackTrace();
			} finally {
				/*Decrementing count from the Countdown Latch
				as the token is served*/
				countDownLatch.countDown();
			}
		}
	}
}

Now let us test our Single Producer and multiple consumer example. Here we will start the producer First. Then we will Start our 5 token serving counters. And all will be shut down , where there are no more token to serve.

package com.tuturself.multiconsumer;

import java.util.concurrent.BlockingQueue;
import java.util.concurrent.CountDownLatch;
import java.util.concurrent.ExecutorService;
import java.util.concurrent.Executors;
import java.util.concurrent.LinkedBlockingQueue;

public class TestMultipleConsumer {

	public static void main(String[] args) throws InterruptedException {

		BlockingQueue<Token> blockingQueue = new LinkedBlockingQueue<>();
		
		/**
		 * Create and START the vending Machine. The Machine will create 
		 * 100 token/day. Which will be served in 5 counters
		 */
		TokenVendingMachine tokenVendingMachine = new TokenVendingMachine(
                                         blockingQueue);
		new Thread(tokenVendingMachine).start();
		CountDownLatch countDownLatch = new CountDownLatch(100);
		
		// Here we have the Token consumer. We have 5 counter
		TokenServingCounter tokenConsumer = new TokenServingCounter(blockingQueue,
                           countDownLatch);
		ExecutorService executor = Executors.newFixedThreadPool(5);
		for (int i = 1; i <= 5; i++) {
			executor.submit(tokenConsumer);
		}
		countDownLatch.await();
		System.out.println("Stopped");
		executor.shutdown();
	}
}

You can Run the TestMultipleConsumer  class to test the Program. Debugging line by line will also help you to have a detailed understanding about the flow. Happy learning.

 

core java 12 Java Concurrent Packages 12

FOLLOW US ON LinkedIn



Explore Tutu'rself