Producer–consumer is a classic concurrent programming design pattern in which processes are designated as either producers or consumers. The producers are responsible for adding to some shared resource and the consumers are responsible for removing from that resource. The resource accessed by producers and consumers must be thread-safe. In other words, only one producer or one consumer can access the structure at any given time.
This blog post will help you understand how we can implement the producer–consumer pattern using a thread-safe queue and wait/notify methods that belong to the Java Object class. To follow this tutorial, you need to have a basic knowledge of threads in Java, as well as any version of Java installed on your machine.
A good real-life example is Lotto. Let us say that for one draw – which occurs every week – our game can receive about 1,000,000 bets. For the sake of simplicity, we will write those bets to a file and read them back from it.
The first class we are going to create is BetsSimulator.
There is nothing special about this class: it just generates random bets and writes them to a file.
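The generator described above might look roughly like the following. This is a minimal sketch, not the original listing: the game rules (six distinct numbers from 1 to 45), the file layout (one comma-separated bet per line) and the method names `randomBet` and `writeBets` are all assumptions made for illustration.

```java
import java.io.BufferedWriter;
import java.io.IOException;
import java.nio.file.Files;
import java.nio.file.Path;
import java.util.ArrayList;
import java.util.Collections;
import java.util.List;
import java.util.Random;
import java.util.stream.Collectors;

class BetsSimulator {
    // Assumed game rules: 6 distinct numbers drawn from 1..45.
    static final int NUMBERS_PER_BET = 6;
    static final int MAX_NUMBER = 45;

    // Builds one random bet as a sorted list of distinct numbers.
    static List<Integer> randomBet(Random random) {
        List<Integer> pool = new ArrayList<>();
        for (int n = 1; n <= MAX_NUMBER; n++) {
            pool.add(n);
        }
        Collections.shuffle(pool, random);
        List<Integer> bet = new ArrayList<>(pool.subList(0, NUMBERS_PER_BET));
        Collections.sort(bet);
        return bet;
    }

    // Writes `count` random bets to `file`, one comma-separated bet per line.
    static void writeBets(Path file, int count, Random random) throws IOException {
        try (BufferedWriter writer = Files.newBufferedWriter(file)) {
            for (int i = 0; i < count; i++) {
                String line = randomBet(random).stream()
                        .map(String::valueOf)
                        .collect(Collectors.joining(","));
                writer.write(line);
                writer.newLine();
            }
        }
    }
}
```

Calling `BetsSimulator.writeBets(Paths.get("bets.txt"), 1_000_000, new Random())` would then produce a file of the size discussed above; the file name is only an example.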
The next step is to implement a producer–consumer pattern – and to do that, we will start with a queue class and call it BetQueue.
The first method that we are going to define is the add method.
As mentioned before, the queue must be thread-safe. To guarantee that, we put the synchronized keyword between the access modifier and the return type. The add method receives a bet as an input parameter, adds it to the queue, prints a message saying that the bet was added, marks the queue as no longer empty and notifies the other thread about it.
It is now time for another method, which is called remove.
Just like add, remove uses the synchronized keyword.
First, we check whether the queue is empty and not yet marked as isDone. If both conditions are true, we must wait until someone adds a bet to the queue and notifies us that it is no longer empty. Once a bet is available, we take it from the queue and return it.
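The add and remove methods described above could be sketched as follows. Several details are assumptions: a bet is represented as a plain `String`, emptiness is read from the underlying collection rather than from a separate flag, `notifyAll` is used where the text speaks of notifying the other thread, and `remove` returns `null` once the queue is both empty and done.

```java
import java.util.LinkedList;
import java.util.Queue;

class BetQueue {
    // Bets are plain strings in this sketch; the real bet type is an assumption.
    private final Queue<String> bets = new LinkedList<>();
    private boolean isDone = false;

    // Adds a bet and wakes up any thread waiting in remove().
    public synchronized void add(String bet) {
        bets.add(bet);
        System.out.println("Added to queue: " + bet);
        notifyAll();
    }

    // Called by the producer when there is nothing more to add.
    public synchronized void done() {
        isDone = true;
        notifyAll();
    }

    // Blocks while the queue is empty and production is still running.
    // Returns null once the queue is empty and done() has been called.
    public synchronized String remove() throws InterruptedException {
        while (bets.isEmpty() && !isDone) {
            wait();
        }
        return bets.poll();
    }
}
```

The condition is checked in a `while` loop rather than a single `if` because a waiting thread can wake up without the condition having changed, so it should re-check before proceeding.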
The Producer class – in our case BetProducer – has two responsibilities: to read bets from the file and to add them to the queue. When there are no more bets in the file, we notify the queue that we have finished producing bets by calling its done method.
The Consumer class contains the business logic, which is defined in the isBetWinning method. This method checks whether a bet is winning or not: we mark a bet as winning if more than four of its numbers match the numbers in the winning bet (the winningCombination property). In our case, the business logic is simple and would execute very quickly, so we add a sleep to simulate some work. The run method has an infinite loop in which bets are removed from the queue and checked for winning or losing; when there are no more bets in the queue, the loop breaks.
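The winning rule can be illustrated in isolation. The sketch below assumes bets are lists of distinct integers and uses a hypothetical holder class `BetChecker`; only the names `isBetWinning` and `winningCombination` come from the text, and the simulated sleep is left out.

```java
import java.util.HashSet;
import java.util.List;
import java.util.Set;

class BetChecker {
    private final Set<Integer> winningCombination;

    BetChecker(List<Integer> winningCombination) {
        this.winningCombination = new HashSet<>(winningCombination);
    }

    // A bet wins when more than four of its numbers are in the winning combination.
    boolean isBetWinning(List<Integer> bet) {
        long matches = bet.stream()
                .filter(winningCombination::contains)
                .count();
        return matches > 4;
    }
}
```

With a winning combination of 1, 2, 3, 4, 5, 6, a bet that matches five of those numbers wins, while a bet that matches only four does not.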
The last class that we are going to define is the entry-point MainApplication class that has the main method.
BetProducer and BetConsumer are handled polymorphically: they have the same parent class, so we can define an array of that type and put both of them inside it. The next step is to start new threads by iterating over the array and calling the start method. Because our goal is to save winning bets inside a file until further processing, we must wait for all running threads to finish, and that is where we use the join method. When all threads have finished, we simply take the winning bets and write them to a file.
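The wiring described above can be sketched end to end. This is an illustration under several assumptions: the common parent class is taken to be `Thread`, the producer reads from an in-memory list instead of a file, the winning check is passed in as a `Predicate`, and the helper `process` (a hypothetical name) returns the winning bets instead of writing them to a file. A compact copy of the queue is included only so that the block compiles on its own.

```java
import java.util.ArrayList;
import java.util.Collections;
import java.util.LinkedList;
import java.util.List;
import java.util.Queue;
import java.util.function.Predicate;

// Compact copy of the queue so that this block compiles on its own.
class BetQueue {
    private final Queue<String> bets = new LinkedList<>();
    private boolean isDone = false;

    synchronized void add(String bet) { bets.add(bet); notifyAll(); }
    synchronized void done() { isDone = true; notifyAll(); }
    synchronized String remove() throws InterruptedException {
        while (bets.isEmpty() && !isDone) {
            wait();
        }
        return bets.poll(); // null means: empty and done
    }
}

class BetProducer extends Thread {
    private final BetQueue queue;
    private final List<String> source; // stands in for the bets file

    BetProducer(BetQueue queue, List<String> source) {
        this.queue = queue;
        this.source = source;
    }

    @Override
    public void run() {
        for (String bet : source) {
            queue.add(bet);
        }
        queue.done(); // no more bets will be produced
    }
}

class BetConsumer extends Thread {
    private final BetQueue queue;
    private final Predicate<String> isBetWinning;
    private final List<String> winningBets;

    BetConsumer(BetQueue queue, Predicate<String> isBetWinning, List<String> winningBets) {
        this.queue = queue;
        this.isBetWinning = isBetWinning;
        this.winningBets = winningBets;
    }

    @Override
    public void run() {
        try {
            String bet;
            // Loop until remove() signals that the queue is empty and done.
            while ((bet = queue.remove()) != null) {
                if (isBetWinning.test(bet)) {
                    winningBets.add(bet);
                }
            }
        } catch (InterruptedException e) {
            Thread.currentThread().interrupt();
        }
    }
}

class MainApplication {
    static List<String> process(List<String> bets, Predicate<String> isBetWinning)
            throws InterruptedException {
        BetQueue queue = new BetQueue();
        List<String> winningBets = Collections.synchronizedList(new ArrayList<>());
        // Both workers share the parent class Thread, so one array can hold them.
        Thread[] workers = {
            new BetProducer(queue, bets),
            new BetConsumer(queue, isBetWinning, winningBets)
        };
        for (Thread worker : workers) {
            worker.start();
        }
        for (Thread worker : workers) {
            worker.join(); // wait until producer and consumer have finished
        }
        return winningBets;
    }
}
```

A main method would call `process` with the bets and the winning rule and then write the returned list to a file. The synchronized list only matters once several consumers are added, since they would all append to it concurrently.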
Now, since we are done with coding, let us run our code to see what will happen. The output should be similar to the following image:
As we can see, the producer finishes first: its logic just reads records from the file and adds them to the queue, while the consumer has business logic that takes time to execute. What is wrong with this code? Imagine if we had many more bets – we might run out of memory, because the producer is a lot faster than the consumer and could overload our queue. Let’s solve this by extending the add method of BetQueue with a capacity check at the top of the method.
So, if the queue size is equal to CAPACITY, we wait until someone removes a bet from the queue and lets us know.
The last thing we should do is update the remove method so that, just before the return statement, it notifies the waiting threads that a bet has been removed.
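Both changes, the capacity check in add and the notification in remove, could be sketched as below. The value of `CAPACITY`, the `String` bet type and the `size` helper are assumptions for illustration, and `notifyAll` is used so that the sketch stays correct with more than one producer or consumer.

```java
import java.util.LinkedList;
import java.util.Queue;

class BetQueue {
    static final int CAPACITY = 10; // assumed value, pick what fits your memory budget

    private final Queue<String> bets = new LinkedList<>();
    private boolean isDone = false;

    public synchronized void add(String bet) throws InterruptedException {
        // New: block the producer while the queue is full.
        while (bets.size() == CAPACITY) {
            wait();
        }
        bets.add(bet);
        notifyAll(); // wake up consumers waiting for a bet
    }

    public synchronized void done() {
        isDone = true;
        notifyAll();
    }

    public synchronized String remove() throws InterruptedException {
        while (bets.isEmpty() && !isDone) {
            wait();
        }
        String bet = bets.poll();
        // New: a slot was freed, so wake up a producer waiting in add().
        notifyAll();
        return bet;
    }

    // Helper for inspection only; not part of the pattern itself.
    public synchronized int size() {
        return bets.size();
    }
}
```

Because add can now block, it has to declare `InterruptedException`, so the producer's run method needs to handle it in the same way the consumer does.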
Let’s run the code again and see what happens.
As you can see, the producer now produces only as many bets as the capacity we defined for the queue and then waits for the consumer to consume them. Furthermore, you can add more consumers in the MainApplication class to speed up consumption, but I will let you try that yourself.
I hope you enjoyed this article and that it helped you understand how the producer–consumer pattern works.