I'm doing an exercise about Java concurrency using wait and notify to study for an exam. The exam will be written, so the code does have to be perfect since we can't compile it to check for errors.
This is the text of the exercise:
This is the code that I produced:
public class Downloader{
private Queue downloadQueue;
private HashMap urlData;
private final static THREADS_NUMBER;
public Downloader(){
this.downloadQueue = new Queue();
this.urlData = new HashMap();
for(int i = 0; i < THREADS_NUMBER; i++){
new DownTh(this.downloadQueue, this.urlData).start();
}
}
void syncronized download(String URL){
downloadQueue.add(url);
notifyAll();
return;
}
byte[] syncronized getData(String URL){
while(urlData.get(URL) == null ){
wait()
}
return urlData.get(URL);
}
}
public class DownTh extend Thread{
private Queue downloadQueue;
private HashMap urlData;
public DownTh(Queue downloadQueue, HashMap urlData){
this.downloadQueue = downloadQueue
this.urlData = urlData;
}
public void run(){
while(true){
syncronized{
while(queue.isEmpty()){
wait()
}
String url = queue.remove();
urlData.add(url, Util.download(url))
notifyAll()
}
}
}
}
Can you help me and tell me if the logic is right?
Let's assume for a second that all those great classes in Java that handle synchronization do not exist, because this is a synthetic task, and all you have to work with is synchronized, wait and notify.
The first question to answer in simple words is: "Who is going to wait on what?"
What does this mean in detail? We need at least one synchronization element between the caller and the download thread (your urlData). There should also be one data object that holds the download data itself, both for convenience and to check whether the download has completed yet.
So the detailed steps that will happen are:
1. Caller requests new download.
   - create: DownloadResult
   - write: urlData(url -> DownloadResult)
   - wake up 1 thread on urlData
2. Thread X must find data to download and process it, or else fall asleep again.
   - read: urlData (find first unprocessed DownloadResult, otherwise wait on urlData)
   - write: DownloadResult (acquire it)
   - write: DownloadResult (download result)
   - notify: anyone waiting on DownloadResult
   - repeat
3. Caller must be able to asynchronously check/wait for download result.
   - read: urlData
   - read: DownloadResult (wait on DownloadResult if required)
As there are reads and writes from different threads on those objects, synchronization is required when accessing the objects urlData or DownloadResult.
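Also there will be a wait/notify association: download threads wait on urlData and are notified by the caller that requests a download, while callers wait on a DownloadResult and are notified by the thread that completes it.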
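The pattern behind such an association is the guarded wait: the waiter checks its condition in a loop while holding the monitor, and the notifier changes the state and calls notifyAll while holding the same monitor. Here is a minimal sketch, independent of the downloader; Box, put and take are hypothetical names used only for illustration.

```java
class WaitNotifyDemo {
    /** Hypothetical one-shot holder; callers wait on the Box object itself. */
    static final class Box {
        private String value; // guarded by this Box's monitor

        synchronized void put(String v) {
            value = v;
            notifyAll(); // wake every thread waiting on this Box
        }

        synchronized String take() throws InterruptedException {
            while (value == null) { // re-check after every wake-up
                wait();             // releases the monitor while waiting
            }
            return value;
        }
    }

    public static void main(String[] args) throws InterruptedException {
        Box box = new Box();
        Thread producer = new Thread(() -> box.put("done"));
        producer.start();
        System.out.println(box.take()); // blocks until put() has run
        producer.join();
    }
}
```

Note that wait and notifyAll are called on the same object whose monitor is held. Waiting on one object and notifying on another never wakes anybody up.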
After careful analysis the following code would fulfill the requirements:
import java.net.URL;

public class DownloadResult {
    protected final URL url; // this is for convenience
    protected boolean inProgress; // true once a download thread has claimed this task
    protected byte[] result; // null until the download has finished

    public DownloadResult(final URL url) {
        this.url = url;
        this.inProgress = false;
    }

    /* Try to claim this task; returns false if another thread has already acquired it. */
    public synchronized boolean acquire() {
        if (this.inProgress) {
            return false;
        }
        this.inProgress = true;
        return true;
    }

    public void download() {
        // Done outside the synchronized block so the monitor is not held during the slow download.
        final byte[] downloadedBytes = Util.download(this.url);
        synchronized (this) {
            this.result = downloadedBytes;
            this.notifyAll(); // wake up ALL waiting callers
        }
    }

    public synchronized byte[] getResult() throws InterruptedException {
        while (this.result == null) {
            this.wait();
        }
        return this.result;
    }
}
import java.net.URL;
import java.util.Map;

// Package-private: a top-level class cannot be declared protected.
class DownTh extends Thread {
    protected final Map<URL, DownloadResult> urlData;

    public DownTh(final Map<URL, DownloadResult> urlData) {
        this.urlData = urlData;
        this.setDaemon(true); // this allows the JVM to shut down despite DownTh threads still running
    }

    /* Must be called while holding the urlData monitor. */
    protected DownloadResult getTask() {
        for (final DownloadResult downloadResult : urlData.values()) {
            if (downloadResult.acquire()) {
                return downloadResult;
            }
        }
        return null;
    }

    @Override
    public void run() {
        DownloadResult downloadResult;
        try {
            while (true) {
                synchronized (urlData) {
                    while ((downloadResult = this.getTask()) == null) {
                        urlData.wait(); // sleep until download() adds a new entry
                    }
                }
                downloadResult.download(); // outside the lock, so other threads can pick up tasks
            }
        } catch (InterruptedException ex) {
            // can be ignored: the thread simply ends
        } catch (Error e) {
            // log here
        }
    }
}
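import java.net.URL;
import java.util.HashMap;
import java.util.Map;

public class Downloader {
    protected final Map<URL, DownloadResult> urlData = new HashMap<>();

    // insert constructor that creates the threads here

    public DownloadResult download(final URL url) {
        synchronized (urlData) {
            DownloadResult result = urlData.get(url);
            if (result == null) {
                // First request for this URL: register it and wake a worker.
                result = new DownloadResult(url);
                urlData.put(url, result);
                urlData.notify(); // only one thread needs to wake up
            }
            // Always return the object stored in the map; a fresh, unregistered
            // DownloadResult would never be downloaded and getResult() would block forever.
            return result;
        }
    }

    public byte[] getData(final URL url) throws InterruptedException {
        DownloadResult result;
        synchronized (urlData) {
            result = urlData.get(url);
        }
        if (result != null) {
            return result.getResult(); // waits on the DownloadResult, not on urlData
        } else {
            throw new IllegalStateException("URL " + url + " not requested.");
        }
    }
}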
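One detail of Map.putIfAbsent matters for download: if the key is already present, the map keeps the existing value and returns it, and the new object is not stored. A small sketch of that behaviour, where storedValue and the keys and values are hypothetical and only serve the illustration:

```java
import java.util.HashMap;
import java.util.Map;

class PutIfAbsentDemo {
    /** Returns the value stored under key, inserting candidate only if the key is new. */
    static <K, V> V storedValue(Map<K, V> map, K key, V candidate) {
        V existing = map.putIfAbsent(key, candidate); // null if the key was absent
        return existing != null ? existing : candidate;
    }

    public static void main(String[] args) {
        Map<String, String> map = new HashMap<>();
        System.out.println(storedValue(map, "url", "first"));  // stores and returns "first"
        System.out.println(storedValue(map, "url", "second")); // key exists, still "first"
    }
}
```

So a caller should be handed the DownloadResult that is actually in the map, since only that one will ever be completed by a download thread.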
In real Java code this would be done differently, using the java.util.concurrent classes and/or the Atomic... classes, so this is for educational purposes only. For further reading, look up "Callable" and "Future".
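As a rough illustration of that direction, here is a minimal sketch using an ExecutorService and a Future. It is not a drop-in replacement for the classes above: fakeDownload is a hypothetical stand-in for the exercise's Util.download, and the URL string and the pool size of 4 are arbitrary.

```java
import java.nio.charset.StandardCharsets;
import java.util.concurrent.Callable;
import java.util.concurrent.ExecutorService;
import java.util.concurrent.Executors;
import java.util.concurrent.Future;

class FutureDownloadDemo {
    /** Hypothetical stand-in for Util.download: just returns the URL's bytes. */
    static byte[] fakeDownload(String url) {
        return url.getBytes(StandardCharsets.UTF_8);
    }

    public static void main(String[] args) throws Exception {
        ExecutorService pool = Executors.newFixedThreadPool(4); // replaces the DownTh threads
        try {
            Callable<byte[]> task = () -> fakeDownload("http://example.com/a");
            Future<byte[]> future = pool.submit(task); // returns immediately
            byte[] data = future.get();                // blocks until the task has finished
            System.out.println(data.length);
        } finally {
            pool.shutdown(); // let the worker threads exit
        }
    }
}
```

Here the Future plays the role of DownloadResult, and the executor's internal queue takes over what urlData and the wait/notify calls do by hand.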