Java Producer and Consumer Implementation using Blocking Queue

'Java Consumer Producer' example - This is one of frequently asked questions to senior core java developer. Java concurrency producer and consumer solution is demonstrated below. 
Java Concurrency Queuing options:
 The java concurrent executors and task holding queues can be configured in three ways-
Direct handoffs :  A good default choice for a work queue is a SynchronousQueue that hands off tasks to threads without otherwise holding them. Here, an attempt to queue a task will fail if no threads are immediately available to run it, so a new thread will be constructed. This policy avoids lockups when handling sets of requests that might have internal dependencies. Direct handoffs generally require unbounded maximumPoolSizes to avoid rejection of new submitted tasks. This in turn admits the possibility of unbounded thread growth when commands continue to arrive on average faster than they can be processed.

Unbounded queues : Using an unbounded queue (for example a LinkedBlockingQueue without a predefined capacity) will cause new tasks to wait in the queue when all corePoolSize threads are busy. Thus, no more than corePoolSize threads will ever be created. (And the value of the maximumPoolSize therefore doesn't have any effect.) This may be appropriate when each task is completely independent of others, so tasks cannot affect each others execution; for example, in a web page server. While this style of queuing can be useful in smoothing out transient bursts of requests, it admits the possibility of unbounded work queue growth when commands continue to arrive on average faster than they can be processed.

Bounded queues : A bounded queue (for example, an ArrayBlockingQueue) helps prevent resource exhaustion when used with finite maximumPoolSizes, but can be more difficult to tune and control. Queue sizes and maximum pool sizes may be traded off for each other: Using large queues and small pools minimizes CPU usage, OS resources, and context-switching overhead, but can lead to artificially low throughput. If tasks frequently block (for example if they are I/O bound), a system may be able to schedule time for more threads than you otherwise allow. Use of small queues generally requires larger pool sizes, which keeps CPUs busier but may encounter unacceptable scheduling overhead, which also decreases throughput.

   Example of PriorityBlockingQueue and see how the comparable is used with priority of task which depends on implementation of compare method in task.
From Java Docs-

The Blocking queue here is bounded with 11 as initial capacity as default constructor.  The queue is based on priority and least priority data is processed 

" An unbounded blocking queue that uses the same ordering rules as class PriorityQueue and supplies blocking retrieval operations. While this queue is logically unbounded, attempted additions may fail due to resource exhaustion (causing OutOfMemoryError). This class does not permit null elements. A priority queue relying on natural ordering also does not permit insertion of non-comparable objects (doing so results in ClassCastException).
This class and its iterator implement all of the optional methods of the Collection and Iterator interfaces. The Iterator provided in method iterator() is not guaranteed to traverse the elements of the PriorityBlockingQueue in any particular order. If you need ordered traversal, consider using Arrays.sort(pq.toArray()). Also, methoddrainTo can be used to remove some or all elements in priority order and place them in another collection."




Output
 Consumed Data [number=2, name=two]
 producer 0
 Consumed Data [number=10, name=ten]
 producer 1
 Consumed Data [number=20, name=twenty]
 producer 2
 Consumed Data [number=0, name=0]
 producer 3
 Consumed Data [number=1, name=1]
 producer 4
 Consumed Data [number=2, name=2]
 producer 5
 Consumed Data [number=3, name=3]


Consumer Producer Solution with Synchronisation:
This implementation with Synchronisation needs great care and it is more complicated in implementation:


Output 
The output will confim that there is no concurrency issue on putting data into same queue and wait() and notify() works perfectly fine.
 Got: 53613
 Put: 53614
 Got: 53614
 Put: 53615
 Got: 53615
 Put: 53616
 Got: 53616
 Put: 53617
 Got: 53617

No comments:

Post a Comment