java spring applicationcontext threadpoolexecutor

Passing a parameter to a ThreadPoolExecutor


I have been reviewing multithreading and tried to implement an application that creates separate threads to run a collection process. The main method in the process requires an ArrayList, and I am trying to figure out a way to pass that ArrayList to each thread.

ApplicationContext context = new AnnotationConfigApplicationContext(AppConfig.class);
ThreadPoolTaskExecutor taskExecutor = (ThreadPoolTaskExecutor) context.getBean("taskExecutor");

MainTask mxTask = (MainTask) context.getBean("MainTask");
mxTask.setName("Thread 1");
taskExecutor.execute(mxTask);

MainTask mxTask2 = (MainTask) context.getBean("MainTask");
mxTask2.setName("Thread 2");
taskExecutor.execute(mxTask2);

The code above is the method that creates the tasks. MainTask is the class whose run() method is executed by each thread and which then calls the other main methods.

@Override
public void run() {
    System.out.println(name + " is running.");
    getConfigurations();
    try {
        mainRun();
    } catch (MessagingException e1) {
        // TODO Auto-generated catch block
        e1.printStackTrace();
    } catch (IOException e) {
        // TODO Auto-generated catch block
        e.printStackTrace();
    }
}

The getConfigurations() method reads in all the necessary config variables. Below is the main process that each thread needs to run.

public static void mainRun() throws IOException {

    ArrayList<String> filterList = new ArrayList<String>();
    ArrayList<String> brokerList = new ArrayList<String>();


    if(kafkaServers1 != null) {
        brokerList.add(kafkaServers1);
    } else if(!kafkaServers2.isEmpty()) {
        brokerList.add(kafkaServers2);
    } else if(!kafkaServers3.isEmpty()) {
        brokerList.add(kafkaServers3);
    }

    ArrayList<String> serverList = new ArrayList<String>();

    for(int x=0;x<brokerList.size();x++){
        String[] serverBrokers = brokerList.get(x).split(",");
        serverList.add(serverBrokers[0]);
        serverList.add(serverBrokers[1]);
        serverList.add(serverBrokers[2]);

    }


    try {

        while(true){
            for (String temp : serverList) {
                kafkaServer = temp;
                hostName = kafkaServer;
                InetAddress addr = InetAddress.getByName(hostName);
                hostName = addr.getHostName();
                kafkaServer= hostName;
                retrieveData(hostName);

...

The variable kafkaServers1 contains a comma-separated list of 3 IPs, which get split and each added to the serverList ArrayList. What I am trying to do is assign only one IP to each thread. Is this possible? Can someone please advise?


Solution

  • Rather than making your Runnable object a bean, you could retrieve the Kafka configuration first, then create a new Runnable for each host and submit each one to the executor.

    MyRunnable infoGetter = new MyRunnable(hostInfo);
    taskExecutor.execute(infoGetter);
    

    A CompletableFuture may be a better option if you need to do something with the results, or a fork/join RecursiveTask<V> if you need to merge them.
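
A minimal sketch of that idea, under some assumptions: the class and method names (PerHostTasks, HostTask, splitHosts, collectPerHost) and the addresses are made up for illustration, and the println stands in for the real per-host work such as retrieveData(host) from the question. To stay self-contained it uses a plain java.util.concurrent ExecutorService instead of Spring's ThreadPoolTaskExecutor; the Spring executor also accepts a Runnable through execute(), so the same pattern should carry over. The second method shows one way the CompletableFuture variant could look when each host produces a result.

```java
import java.util.ArrayList;
import java.util.List;
import java.util.concurrent.CompletableFuture;
import java.util.concurrent.ExecutorService;
import java.util.concurrent.Executors;
import java.util.concurrent.TimeUnit;

class PerHostTasks {

    /** Hypothetical task: each instance owns exactly one host, passed in via the constructor. */
    static class HostTask implements Runnable {
        private final String host;

        HostTask(String host) {
            this.host = host;
        }

        @Override
        public void run() {
            // Stand-in for the real per-host work (retrieveData(host) in the question).
            System.out.println(Thread.currentThread().getName() + " handling " + host);
        }
    }

    /** Splits a comma-separated broker string into trimmed, non-empty host names. */
    static List<String> splitHosts(String brokers) {
        List<String> hosts = new ArrayList<>();
        if (brokers == null) {
            return hosts;
        }
        for (String part : brokers.split(",")) {
            String host = part.trim();
            if (!host.isEmpty()) {
                hosts.add(host);
            }
        }
        return hosts;
    }

    /** Variant that returns one value per host by running a supplier on the given pool. */
    static List<String> collectPerHost(List<String> hosts, ExecutorService pool) {
        List<CompletableFuture<String>> futures = new ArrayList<>();
        for (String host : hosts) {
            // The lambda is a placeholder for work that produces a result for this host.
            futures.add(CompletableFuture.supplyAsync(() -> "checked " + host, pool));
        }
        List<String> results = new ArrayList<>();
        for (CompletableFuture<String> future : futures) {
            results.add(future.join()); // blocks until that host's task has finished
        }
        return results;
    }

    public static void main(String[] args) throws InterruptedException {
        // Made-up addresses; in the question these would come from kafkaServers1.
        List<String> hosts = splitHosts("10.0.0.1, 10.0.0.2, 10.0.0.3");
        ExecutorService pool = Executors.newFixedThreadPool(hosts.size());

        // One Runnable per host, so each thread only ever sees its own host.
        for (String host : hosts) {
            pool.execute(new HostTask(host));
        }

        System.out.println(collectPerHost(hosts, pool));

        pool.shutdown();
        pool.awaitTermination(10, TimeUnit.SECONDS);
    }
}
```

Because the host is a final field set in the constructor, no state is shared between tasks. That differs from the static fields (kafkaServer, hostName) in the question's mainRun(), which every thread would read and overwrite.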