Greetings.
I thought I’d share a technique I like to use to have my threads work together. Here is the plot: you have a pool of working threads that do some heaving lifting operations, time consuming. They are here waiting for the next job to process, probably monitoring a queue or something. Then you have another set of Threads that is going to take an order for processing, add it to the queue, wait for the processing to be complete and finalize the order. Those "teller" threads are usually lightweight and need little memory/CPU time if any.
In that scenario the threads will have to coordinate so that the teller Thread can complete the order as soon as the processing is over. That is the focus of this article: how do you "pause" the teller Thread and "resume" it when the processing Thread has finished working?
There used to be a "suspend" and a "resume" method on the Thread class. There are now deprecated because inherently deadlock-prone. So we’ll use the methods "wait()" and "notify()" of Object to have the same behaviour.
For our example, we need some kind of Thread that does a time consuming job. Let’s have one that figures out if a number is prime or not, called PrimeNumberThread, offering 2 methods: "enqueue" to add a number to the processing queue; and "dequeue" that fetches the result.
The Teller Thread
The idea is to have each incoming request served by a "teller" thread. Let’s call it SchedulerThread. It deals with the comunication with the customer: it takes the order (the number to process), schedules it for processing, waits for the processing to complete and then gets back to the customer with the response. There will be as many SchedulerThreads as there are "requests" (in our case there is no limitation, in real life you need a max).
public class SchedulerThread extends Thread {
static Integer counter = 1;
long num;
public SchedulerThread(long num) {
this.num = num;
}
public void run() {
PrimeNumberThread.enqueue(num, this);
try {
// wait for somebody to notify this thread
synchronized (this) {
wait();
}
} catch (InterruptedException e) {
e.printStackTrace();
return;
}
synchronized (counter) {
System.out.println((counter++) + "- " + num + " is "
+ (PrimeNumberThread.dequeue(num) ? "" : "not ") + "prime.");
}
}
}To request the processing of a number N, all you have to do is:
new SchedulerThread(N).start();This will add N to the processing queue and then call wait(). From the Javadoc: "wait() causes the current thread to wait until another thread invokes the notify() method or the notifyAll() method for this object.". This is the key. Once the number is processed, we need the PrimeNumberThread to invoke notify() on the SchedulerThread object, so that it can resume and complete the request. For that reason we need to provide the thread object itself along with the number to enqueue.
The Processing Thread
We need a processing Queue. It has to be a static Queue because it will be shared by all the PrimeNumberThreads. Note that any usage of this Queue will have to be synchronized.
We will also use a Map to store the result and another Map to hold the teller Thread. But let’s see the code first, and then dive more into the details:
public class PrimeNumberThread extends Thread {
// the numbers queue awaiting processing
static List<Long> numbers = new ArrayList<Long>();
// a map that stores the results
static Map<Long, Boolean> results = new HashMap<Long, Boolean>();
// a map that holds the thread that are waiting
static Map<Long, Thread> waitObjects = new HashMap<Long, Thread>();
@Override
public void run() {
try {
while (true) {
boolean empty = false;
Long num = null;
synchronized (numbers) {
if (numbers.isEmpty()) {
empty = true;
} else {
num = numbers.remove(0);
}
}
if (empty) {
// if empty, just wait and retry
Thread.sleep(50);
} else {
// most of the COU is spent here:
boolean prime = isPrime(num);
// stores the result
synchronized (results) {
results.put(num, prime);
}
// notify end of processing
synchronized (waitObjects) {
Thread waitObject = waitObjects.remove(num);
synchronized (waitObject) {
waitObject.notify();
}
}
}
}
} catch (InterruptedException e) {
e.printStackTrace();
}
}
public static void enqueue(long num, Thread waitObject) {
synchronized (waitObjects) {
waitObjects.put(num, waitObject);
}
synchronized (numbers) {
numbers.add(num);
}
}
public static boolean dequeue(long num) {
synchronized (results) {
return results.remove(num);
}
}
// inefficient code to find prime numbers
public static boolean isPrime(long num) {
boolean prime = true;
long limit = (long) Math.sqrt(num);
for (long i = 2; i <= limit; i++) {
if (num % i == 0) {
prime = false;
break;
}
}
return prime;
}
}So we have a isPrime(long) method that does the heavy work, an enqueue and a dequeue method that allow to add a new number to process and to retrieve the result. Most of the complexity here is to keep the Collections synchronized. The "run" method does the actual job to get the next number to process (if any), process it, update the results Map and notify the SchedulerThread stored in the waitObjects Map.
In action
Here is the code to process 100 primes numbers (randomly generated) with 8 processing threads:
public static void main(String[] args) {
int nbPrimeThreads = 8; // set to number of cores
int nbNumbersToAnalyze = 100;
// starts the processing threads
for (int i = 0; i < nbPrimeThreads; i++) {
new PrimeNumberThread().start();
}
// generate numbers
Random r = new Random();
long[] numbers = new long[nbNumbersToAnalyze];
for (int i = 0; i < nbNumbersToAnalyze; i++) {
numbers[i] = (long) (r.nextDouble() * (Long.MAX_VALUE - 1)) + 1;
}
// schedule all the numbers!
for (long num : numbers) {
new SchedulerThread(num).start();
}
}The point of using a pool of processing threads is to keep control of how much of your resources are used. Here most of the processing is heavy on the CPU so it is a good idea to keep the number of processing threads equal to you number of available cores of you CPU.
The output:
1- 372281293763079169 is not prime.
10- 1106522284603503617 is not prime.
11- 4505642750654164993 is not prime.
12- 6295184702160283649 is not prime.
16- 3690429631313886209 is not prime.
17- 6637291068156567553 is not prime.
18- 2976140522999570433 is not prime.
(...)
93- 7951101352683392001 is not prime.
94- 7446226462970158081 is not prime.
95- 6810305470554089473 is not prime.
96- 314272017611928577 is prime.
97- 3531645579569108993 is prime.
98- 6201886719324952577 is prime.
99- 6564108545411756033 is prime.
100- 8033214365761463297 is prime.Note that as it often the case when working with thread, your output is not ordered. Also note that you will be likely to see your prime numbers at the end because it takes much more time to figure out that they are actually prime numbers.
Peace.
