
Java: ThreadPoolExecutor - use list of jobs to submit jobs?


The following code consists of a customer class and a job manager. Customers have a name, an address and an accountBalance. Jobs are money transfers from one customer to another. This is a ThreadPoolExecutor training program. The version below works, and I submit the jobs one by one.

customer.java

public class customer {

    private String name;
    private String adress;
    private Double accountBalance;

    public customer(String name, String adress, Double accountBalance)
    {
        this.name = name;
        this.adress = adress;
        this.accountBalance = accountBalance;
    }

    public String getName() { return name; }

    public String getAdress()
    {
        return adress;
    }

    public Double getAccountBalance(){return accountBalance;}

    public void setAccountBalance(double accountBalance){this.accountBalance=accountBalance;}

    @Override
    public String toString(){

        return "[" + name+"; " +adress+"; "+accountBalance+"]";
    }
}

customerOrganizer.java

import java.util.ArrayList;
import java.util.List;

public class customerOrganizer {

    private static final customerOrganizer myJobOrganizer = new customerOrganizer();

    public static customerOrganizer getJobOrganizer(){
        return myJobOrganizer;
    }

    private List<customer> customerList = new ArrayList<customer>();

    public void add_customer(customer kunde)
    {
        this.customerList.add(kunde);
    }

    public Iterable<customer> all_customers()
    {
        return this.customerList;
    }

    public static customerOrganizer getInstance()
    {
        return myJobOrganizer;
    }

}

job.java

public class job implements Runnable {
    private customer kunde1;
    private customer kunde2;
    private Double transfer;

    public job(customer kunde1, customer kunde2, Double transfer) {
        this.kunde1 = kunde1;
        this.kunde2 = kunde2;
        this.transfer = transfer;
    }

    @Override
    public String toString(){

        return "[" + kunde1+"; " +kunde2+"; "+transfer+"]";
    }

    public void run() {

        System.out.println("Starting transfer");

        Double geber = this.kunde1.getAccountBalance();
        Double nehmer = this.kunde2.getAccountBalance();

        Double geberNeu = geber - this.transfer;
        this.kunde1.setAccountBalance(geberNeu);

        Double nehmerNeu = nehmer + this.transfer;
        this.kunde2.setAccountBalance(nehmerNeu);
        System.out.println("Transfer done");
    }
}

jobOrganizer.java

import java.util.ArrayList;
import java.util.List;

public class jobOrganizer {

    private static final jobOrganizer myJobOrganizer = new jobOrganizer();

    public static jobOrganizer getMyJobOrganizer() {
        return myJobOrganizer;
    }

    private List<job> jobList = new ArrayList<job>();

    public int getAmount(){ return jobList.size();}

    public void add_job(job newJob) {
        this.jobList.add(newJob);
    }

    public Iterable<job> all_jobs() {
        return this.jobList;
    }

    public static jobOrganizer getInstance() {
        return myJobOrganizer;
    }

}

Main.java

import java.util.concurrent.ExecutorService;
import java.util.concurrent.Executors;

public class Main {

    public static void main(String[] args) {
        customerOrganizer myCustomerOrganizer = new customerOrganizer();
        jobOrganizer myJobOrganizer= new jobOrganizer();

        customer mueller = new customer("Tim Mueller", "Strasse 1", 1077.00);
        customer john = new customer("John Doe", "Strasse 2",503.00);
        customer meier = new customer("John Meier", "Strasse 3", 8500.50);
        customer wurst = new customer("Hans Wurst", "Strasse 4", 1000.00);

        myCustomerOrganizer.add_customer(mueller);
        myCustomerOrganizer.add_customer(john);
        myCustomerOrganizer.add_customer(meier);
        myCustomerOrganizer.add_customer(wurst);

        job transfer1= new job(meier,wurst,500.50);
        job transfer2= new job(mueller,john,77.00);

        myJobOrganizer.add_job(transfer1);
        myJobOrganizer.add_job(transfer2);

        // this works:
        ExecutorService executor = Executors.newFixedThreadPool(2);
        executor.submit(transfer1);
        executor.submit(transfer2);
        executor.shutdown();
    }
}

So, I do have a jobList, and my idea is to use it. Instead of submitting the jobs one by one, I would like to submit them based on the job list. I thought of something like this to begin with:

 int threads = myJobOrganizer.getAmount();
        ExecutorService executor = Executors.newFixedThreadPool(threads);
        for (int i = 0; i <threads+1; i++){
            //submit jobs? execute?
        }

Furthermore, would myJobOrganizer then need to implement Runnable? I also saw solutions like this:

for(condition){

        executor.execute(new Runnable() {

            @Override
            public void run() {
              submit the jobs?
                }
            }}

But I really don't know how to do it. Basically, I don't see how to extract the jobs from my jobList in the right way so that I can submit them to the executor service >.<

Update concerning thread safety

So, I followed the link provided by Rab and used a CompletionService. The last part of Main.java now looks like this:

int threads = myJobOrganizer.getAmount();

System.out.println(myCustomerOrganizer.all_customers().toString());
// executor service   
ExecutorService executor = Executors.newFixedThreadPool(threads);
// completion service is applied on executor
CompletionService service = new ExecutorCompletionService(executor);

for(Callable<Job> myJob : myJobOrganizer.all_jobs()){
    service.submit(myJob);
}
executor.shutdown();
// pause the main for control printout -> not nice yet, I am working on 
// joining threads
try {
    Thread.sleep(1000);
} catch (InterruptedException e) {
    e.printStackTrace();
}
// control print
System.out.println(myCustomerOrganizer.all_customers().toString());

Please note that this edit was added for completeness and is still wrong (sadly). The answers provided refer to the original question and do NOT concern thread safety.

Thanks for your time and effort!


Solution

  • ExecutorService handles how tasks are distributed among the workers. All you have to do is pass in the jobs one by one.

    for (job jobObj : myJobOrganizer.all_jobs()) 
        executor.submit(jobObj);
    

    Notice that submit returns a Future<?>, which is used to track whether your tasks have completed or failed (and also to fetch the task result, but a Runnable has no result). If you care about these things, you will want to collect the futures in some sort of container, like a List.
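As a rough illustration of collecting the futures, here is a minimal sketch. The class name FutureListSketch, the helper runAll and the lambda tasks are made up for this example and stand in for the job objects from the question. Future.get() blocks until the task is done and throws an ExecutionException if the task threw.

```java
import java.util.ArrayList;
import java.util.List;
import java.util.concurrent.ExecutionException;
import java.util.concurrent.ExecutorService;
import java.util.concurrent.Executors;
import java.util.concurrent.Future;

public class FutureListSketch {

    // Hypothetical helper: submits every task, waits for all of them,
    // and returns how many finished without throwing.
    static int runAll(List<Runnable> tasks) {
        ExecutorService executor = Executors.newFixedThreadPool(2);
        List<Future<?>> futures = new ArrayList<>();
        for (Runnable task : tasks) {
            futures.add(executor.submit(task));
        }
        executor.shutdown(); // accept no new tasks; submitted ones still run

        int completed = 0;
        for (Future<?> future : futures) {
            try {
                future.get(); // blocks until this task has finished
                completed++;
            } catch (ExecutionException e) {
                // the task itself threw; the original exception is the cause
                System.out.println("Task failed: " + e.getCause());
            } catch (InterruptedException e) {
                Thread.currentThread().interrupt(); // restore the flag and stop waiting
                break;
            }
        }
        return completed;
    }

    public static void main(String[] args) {
        List<Runnable> tasks = new ArrayList<>();
        tasks.add(() -> System.out.println("transfer 1"));
        tasks.add(() -> { throw new IllegalStateException("insufficient funds"); });
        tasks.add(() -> System.out.println("transfer 3"));
        System.out.println(runAll(tasks) + " of " + tasks.size() + " tasks completed");
    }
}
```

Waiting on the futures this way also replaces a fixed Thread.sleep before printing results, since the loop only ends once every task has finished or failed.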


    If you change the job into a Callable<Void>, submitting becomes even easier. Callable is similar to Runnable, but its call() method lets the task return a result upon completion (and throw checked exceptions). Since your transfer has no result, using java.lang.Void as a filler type for the generic parameter is fine; call() then simply returns null.

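    Now, calling executor.invokeAll(myJobOrganizer.all_jobs()) is sufficient. Note that invokeAll expects a Collection, so all_jobs() would have to return the List (or another Collection) rather than an Iterable. invokeAll also blocks until every task has finished, so you don't have to wait for the results yourself. It might save a few context switches as well, which could matter because your tasks are all very small.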
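A minimal sketch of the Callable<Void> variant might look like the following. TransferJob and runTransfers are hypothetical names, and the accounts are simplified to a plain double[] (one balance per index) to keep the example short; the amounts are taken from the question. The two jobs touch different accounts, so no extra synchronization is needed here.

```java
import java.util.ArrayList;
import java.util.List;
import java.util.concurrent.Callable;
import java.util.concurrent.ExecutorService;
import java.util.concurrent.Executors;

public class InvokeAllSketch {

    // Hypothetical stand-in for the question's job class, written as a Callable<Void>.
    static class TransferJob implements Callable<Void> {
        private final double[] balances; // simplified accounts: one balance per index
        private final int from;
        private final int to;
        private final double amount;

        TransferJob(double[] balances, int from, int to, double amount) {
            this.balances = balances;
            this.from = from;
            this.to = to;
            this.amount = amount;
        }

        @Override
        public Void call() {
            balances[from] -= amount;
            balances[to] += amount;
            return null; // Void has no instances, so null is the only possible value
        }
    }

    // Runs the two transfers from the question and returns the final balances.
    static double[] runTransfers() {
        double[] balances = {8500.50, 1000.00, 1077.00, 503.00};
        List<Callable<Void>> jobs = new ArrayList<>();
        jobs.add(new TransferJob(balances, 0, 1, 500.50));
        jobs.add(new TransferJob(balances, 2, 3, 77.00));

        ExecutorService executor = Executors.newFixedThreadPool(2);
        try {
            executor.invokeAll(jobs); // blocks until every job has finished
        } catch (InterruptedException e) {
            Thread.currentThread().interrupt();
        } finally {
            executor.shutdown();
        }
        return balances;
    }

    public static void main(String[] args) {
        for (double balance : runTransfers()) {
            System.out.println(balance);
        }
    }
}
```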


    BTW, you should be aware that concurrent access needs proper synchronization, and your code has none. Your accounts could end up in a wrong state if different jobs involve the same account. Also, by convention classes are named in UpperCamelCase and methods in lowerCamelCase.
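One possible way to add the missing synchronization is sketched below; it is only one option among several (a single global lock or java.util.concurrent.locks would also work). The Account class, its id field and the demo method are assumptions made for this example. Each transfer locks both accounts, always in the order of their ids, so that two opposite transfers cannot deadlock.

```java
import java.util.concurrent.ExecutorService;
import java.util.concurrent.Executors;
import java.util.concurrent.TimeUnit;

public class SynchronizedTransferSketch {

    // Hypothetical account class; the id exists only to give locks a fixed order.
    static class Account {
        final int id;
        private double balance;

        Account(int id, double balance) {
            this.id = id;
            this.balance = balance;
        }

        synchronized double getBalance() {
            return balance;
        }
    }

    static void transfer(Account from, Account to, double amount) {
        // Always lock the account with the smaller id first, so that
        // transfer(a, b) and transfer(b, a) acquire the locks in the same order.
        Account first = from.id < to.id ? from : to;
        Account second = from.id < to.id ? to : from;
        synchronized (first) {
            synchronized (second) {
                from.balance -= amount;
                to.balance += amount;
            }
        }
    }

    // Runs many opposite transfers concurrently and returns both final balances.
    static double[] demo() {
        Account a = new Account(1, 1000.0);
        Account b = new Account(2, 1000.0);
        ExecutorService executor = Executors.newFixedThreadPool(4);
        for (int i = 0; i < 1000; i++) {
            executor.submit(() -> transfer(a, b, 1.0));
            executor.submit(() -> transfer(b, a, 1.0));
        }
        executor.shutdown();
        try {
            executor.awaitTermination(10, TimeUnit.SECONDS);
        } catch (InterruptedException e) {
            Thread.currentThread().interrupt();
        }
        return new double[] {a.getBalance(), b.getBalance()};
    }

    public static void main(String[] args) {
        double[] result = demo();
        System.out.println(result[0] + " " + result[1]);
    }
}
```

Because every transfer from a to b is matched by one from b to a, both balances should end where they started; without the synchronized blocks, lost updates could make them drift.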