Search code examples
javamultithreadingblockingqueue

Turning a program using multi-threading into a program that uses blocking queues


I am currently trying to turn a program that is currently using multi-threading, locks etc. into a program that using blocking queues instead. The point is to remove all the multi-threading code. There will be 100 producer threads. The producer threads are all busy producing transaction objects (each one different) and putting them to the queue. The one consumer thread will continuously remove transactions from the queue and process them. In other words, loop forever. However, the looping forever part in the consumer thread is not working out. I want to only use one consumer thread. And with my code I am only getting one line of output when the lines of output should go on forever until the program is stopped. Here is my code any help will be appreciated and thanks in advance.

import java.util.*;
import java.util.concurrent.*;

public class SynchBankTest
{
public static final int NACCOUNTS = 100;
public static final double INITIAL_BALANCE = 1000;
public static final int DELAY = 10;
public static final Transaction tr = new Transaction();

public static void main(String[] args)
{
  BlockingQueue<Transaction> queue = new ArrayBlockingQueue<>(30);
  Bank bank = new Bank(NACCOUNTS, INITIAL_BALANCE);

  for (int i = 0; i < NACCOUNTS; i++)
  {
     Runnable p = () -> {
        try
        {
           while (true)
           {
                queue.put(tr);
                Thread.sleep((int) (DELAY * Math.random()));
           }
        }
        catch (InterruptedException e)
        {
        }
     };
     Thread pt = new Thread(p);
     pt.start();
  }

     Runnable c = () -> {
        double amount = 0;
        int fromAcct = 0;
        int toAcct = 0;
        amount = tr.getAmount();
        fromAcct = tr.getFromAccount();
        toAcct = tr.getToAccount();
        bank.transfer(fromAcct, toAcct, amount);
     };
     Thread ct = new Thread(c);
     ct.start();
   }
  }
 class Transaction
 {
 private double amount;
 private int toAccount;
 private int fromAccount;
 private static final double MAX_AMOUNT = 1000;
 private Bank bank = new Bank(100, 1000);

 public Transaction()
 {
 }
 public int getToAccount()
 {
    toAccount = (int) (bank.size() * Math.random());
    return toAccount;
 }
 public int getFromAccount()
 {
    for (int i = 0; i < 100; i++)
    {
        fromAccount = i;
    }
    return fromAccount;

 }
 public double getAmount()
 {
    amount = MAX_AMOUNT * Math.random();
    return amount;
 }
}
class Bank
{
 private final double[] accounts;

 public Bank(int n, double initialBalance)
 {
   accounts = new double[n];
   Arrays.fill(accounts, initialBalance);
 }

 public void transfer(int from, int to, double amount)
 {
     if (accounts[from] < amount) return;
     System.out.print(Thread.currentThread());
     accounts[from] -= amount;
     System.out.printf(" %10.2f from %d to %d", amount, from, to);
     accounts[to] += amount;
     System.out.printf(" Total Balance: %10.2f%n", getTotalBalance());
 }

 public double getTotalBalance()
 {
  try
  {
     double sum = 0;

     for (double a : accounts)
        sum += a;

     return sum;
  }
  finally
  {
  }
}

public int size()
{
   return accounts.length;
}
}

Solution

  • Like said above, the consumer thread needs to have an infinite loop, which I did not have. This solved my issue:

         Runnable c = () -> {
            try{
                while(true)
                {
                    tr = queue.take();
                    double amount = 0;
                    int fromAcct = 0;
                    int toAcct = 0;
                    amount = tr.getAmount();
                    fromAcct = tr.getFromAccount();
                    toAcct = tr.getToAccount();
                    bank.transfer(fromAcct, toAcct, amount);
                }
            }
            catch(InterruptedException e)
            {
            }
         };
         Thread ct = new Thread(c);
         ct.start();