Search code examples
c# mpi message-passing

MPI.NET Send/ImmediateProbe not working when using multiple hosts


I developed an MPI test program where the master node distributes work to the worker nodes. The worker node uses comm.Send() to request work and the master node checks with comm.ImmediateProbe if any of the worker nodes wants to request some work. If a request is available it is read with comm.Receive and the work is sent to the worker for processing.

When I run my test program with mpiexec.exe on a single host (either localhost or a remote host), everything works as expected. But when I run it on two hosts at the same time, the Send on the remote host blocks and the master node's ImmediateProbe never sees the message sent by the worker on the remote host.

I run the program with mpiexec.exe -wdir \\DESKTOP-58QONBS\MPITest -hosts 2 DESKTOP-58QONBS 2 LAPTOP-L8F7AN5R 1 MPITest.exe

I'm new to MPI, so maybe I am doing something wrong; I just have not yet figured out why it behaves like this when two hosts are used at the same time.

The full code is below:

using System;
using System.Collections.Concurrent;
using System.Linq;
using System.Threading;

namespace MPITest
{
    /// <summary>
    /// Base class for a master/worker program built on MPI.NET. Rank 0 is the master
    /// and hands out work items; every other rank is a worker that requests work,
    /// processes it and sends the result back to rank 0.
    /// </summary>
    /// <remarks>
    /// Message protocol between a worker and the master:
    /// <list type="bullet">
    /// <item>Worker sends an <c>int</c> with the request tag to ask for work.</item>
    /// <item>Master answers on the request tag with an <c>int</c> flag: "work follows"
    /// (followed by the <typeparamref name="TWork"/> item), "no work right now", or
    /// "terminate".</item>
    /// <item>Worker sends the <typeparamref name="TResult"/> on the result tag.</item>
    /// </list>
    /// NOTE(review): all MPI calls are issued from a thread created by this class, not
    /// from the thread that entered <c>MPI.Environment.Run</c>. Confirm that the MPI
    /// threading level in use permits this.
    /// </remarks>
    /// <typeparam name="TWork">Type of a work item sent from the master to a worker.</typeparam>
    /// <typeparam name="TResult">Type of a result sent from a worker to the master.</typeparam>
    public abstract class MPIMasterWorkerBase<TWork, TResult>
        where TWork : class
        where TResult : class
    {
        /// <summary>Rank of the process that acts as the master.</summary>
        private const int MasterRank = 0;

        /// <summary>Tag for work requests and for the master's answers to them.</summary>
        private const int RequestTag = 0;

        /// <summary>Tag for results sent from a worker to the master.</summary>
        private const int ResultTag = 1;

        /// <summary>Answer flag: the worker must stop.</summary>
        private const int TerminateFlag = -1;

        /// <summary>Answer flag: no work is available right now; ask again later.</summary>
        private const int NoWorkFlag = 0;

        /// <summary>Answer flag: a work item follows in the next message.</summary>
        private const int WorkFollowsFlag = 1;

        /// <summary>Delay between polling rounds (master) and between work requests (worker).</summary>
        private const int PollIntervalMs = 1000;

        /// <summary>Called once per process before its master or worker thread is started.</summary>
        /// <param name="isMaster"><c>true</c> on rank 0, <c>false</c> on every other rank.</param>
        protected abstract void Initialize(bool isMaster);

        /// <summary>
        /// Called on rank 0 only, on the thread that called <see cref="Run"/>, while the
        /// master thread is already distributing work. When it returns (or throws),
        /// workers are told to terminate once no more work is available.
        /// </summary>
        protected abstract void Main();

        /// <summary>Called on the master thread for every result received from a worker.</summary>
        /// <param name="result">The result that was received.</param>
        protected abstract void ProcessResult(TResult result);

        /// <summary>Called on the worker thread for every work item received from the master.</summary>
        /// <param name="work">The work item to process.</param>
        /// <returns>The result that is sent back to the master.</returns>
        protected abstract TResult ProcessWork(TWork work);

        /// <summary>
        /// Called on the master thread to fetch the next work item. It runs concurrently
        /// with <see cref="Main"/>, so any state shared between the two must be thread-safe.
        /// </summary>
        /// <returns>The next work item, or <c>null</c> when none is available right now.</returns>
        protected abstract TWork GetWork();

        // Set on rank 0 after Main() has finished; volatile because it is written by the
        // thread running Run() and read by the master thread.
        private volatile bool terminate = false;
        private Thread thread;
        private MPI.Intracommunicator comm;

        /// <summary>
        /// Starts the MPI environment and runs this process as master (rank 0) or worker
        /// (any other rank). Returns after the master or worker thread has finished.
        /// </summary>
        /// <param name="args">Command-line arguments, passed on to MPI.</param>
        public void Run(string[] args)
        {
            MPI.Environment.Run(ref args, comm =>
            {
                this.comm = comm;

                if (comm.Size < 2)
                {
                    Console.WriteLine("At least 2 processes are required.");
                    return;
                }

                if (comm.Rank == MasterRank)
                {
                    Initialize(isMaster: true);

                    thread = new Thread(MasterThread);
                    thread.Start();

                    try
                    {
                        Main();
                    }
                    finally
                    {
                        // Even if Main() throws, the master thread must be told to wind
                        // down and must be joined; otherwise it keeps issuing MPI calls
                        // while the environment is being torn down and the workers never
                        // receive the terminate flag.
                        terminate = true;
                        thread.Join();
                    }
                }
                else
                {
                    Initialize(isMaster: false);

                    thread = new Thread(WorkerThread);
                    thread.Start();
                    thread.Join();
                }
            });
        }

        /// <summary>
        /// Master loop: polls every worker for work requests and results until every
        /// worker has been told to terminate.
        /// </summary>
        private void MasterThread()
        {
            Console.WriteLine($"MasterStart {MPI.Environment.ProcessorName}");

            // done[i] becomes true once worker i has been sent the terminate flag.
            var done = new bool[comm.Size];
            done[MasterRank] = true;

            while (!done.All(isDone => isDone))
            {
                for (int i = 1; i < comm.Size; i++)
                {
                    if (comm.ImmediateProbe(i, RequestTag) != null)
                    {
                        Console.WriteLine($"Receive: {i}");
                        if (AnswerWorkRequest(i))
                        {
                            done[i] = true;
                        }
                    }

                    // Checked after the request on purpose: a worker sends its result
                    // before its next request, so a pending result is picked up in the
                    // same round in which the worker may be told to terminate.
                    if (comm.ImmediateProbe(i, ResultTag) != null)
                    {
                        var result = comm.Receive<TResult>(i, ResultTag);
                        ProcessResult(result);
                    }
                }

                Thread.Sleep(PollIntervalMs);
            }

            Console.WriteLine("MasterStop");
        }

        /// <summary>
        /// Receives the pending work request of <paramref name="workerRank"/> and answers
        /// it with a work item, "no work", or "terminate".
        /// </summary>
        /// <param name="workerRank">Rank of the worker whose request is pending.</param>
        /// <returns><c>true</c> if the worker was told to terminate.</returns>
        private bool AnswerWorkRequest(int workerRank)
        {
            comm.Receive<int>(workerRank, RequestTag);

            // Read the flag BEFORE asking for work. terminate is only set after Main()
            // has returned, so "terminating and GetWork() returned null" then really
            // means Main() has nothing more to hand out. Reading it afterwards could
            // terminate a worker although Main() enqueued work in between.
            bool terminating = terminate;

            var work = GetWork();
            if (work != null)
            {
                comm.Send(WorkFollowsFlag, workerRank, RequestTag);
                comm.Send(work, workerRank, RequestTag);
                return false;
            }

            if (terminating)
            {
                comm.Send(TerminateFlag, workerRank, RequestTag);
                return true;
            }

            comm.Send(NoWorkFlag, workerRank, RequestTag);
            return false;
        }

        /// <summary>
        /// Worker loop: periodically asks the master for work, processes it and sends
        /// the result back, until the master answers with the terminate flag.
        /// </summary>
        private void WorkerThread()
        {
            Console.WriteLine($"WorkerStart: {comm.Rank} {MPI.Environment.ProcessorName}");

            // This class only sets terminate on rank 0, so on a worker the loop ends
            // through the terminate flag received from the master.
            while (!terminate)
            {
                Thread.Sleep(PollIntervalMs);
                Console.WriteLine($"Send: {comm.Rank}");
                comm.Send(0, MasterRank, RequestTag);

                var flag = comm.Receive<int>(MasterRank, RequestTag);
                if (flag == TerminateFlag)
                {
                    break;
                }

                if (flag == NoWorkFlag)
                {
                    continue;
                }

                var work = comm.Receive<TWork>(MasterRank, RequestTag);
                var result = ProcessWork(work);
                comm.Send(result, MasterRank, ResultTag);
            }

            Console.WriteLine($"WorkerStop: {comm.Rank}");
        }
    }

    /// <summary>
    /// Unit of work exchanged between master and workers. <see cref="MPITest"/> uses it
    /// both as the work type and as the result type.
    /// </summary>
    /// <remarks>
    /// NOTE(review): presumably marked <c>[Serializable]</c> so MPI.NET can serialize
    /// instances for Send/Receive - confirm against the MPI.NET version in use.
    /// </remarks>
    [Serializable]
    public class WorkItem
    {
        /// <summary>Identifier of the work item; <c>MPITest.Main</c> assigns consecutive values.</summary>
        public int Id { get; set; }
    }

    /// <summary>
    /// Sample master/worker program: the master produces <see cref="WorkItem"/>s for
    /// about ten seconds, workers echo each item back as its own result.
    /// </summary>
    public class MPITest : MPIMasterWorkerBase<WorkItem, WorkItem>
    {
        // Filled by Main() and drained by GetWork(), which run on different threads.
        private ConcurrentQueue<WorkItem> pending = new ConcurrentQueue<WorkItem>();

        // Id given to the next work item that is created.
        private int nextId;

        /// <summary>Nothing to set up for this sample, on master or worker.</summary>
        protected override void Initialize(bool isMaster)
        {
        }

        /// <summary>
        /// Enqueues two new work items, then sleeps one second; repeats until ten
        /// seconds have elapsed since the method was entered.
        /// </summary>
        protected override void Main()
        {
            const int itemsPerRound = 2;
            var runFor = TimeSpan.FromSeconds(10);
            var startedAt = DateTime.UtcNow;

            while (DateTime.UtcNow - startedAt < runFor)
            {
                var produced = 0;
                while (produced < itemsPerRound)
                {
                    var item = new WorkItem { Id = nextId };
                    nextId++;
                    pending.Enqueue(item);
                    produced++;
                }

                Thread.Sleep(1000);
            }
        }

        /// <summary>Takes the oldest queued work item, or returns <c>null</c> if the queue is empty.</summary>
        protected override WorkItem GetWork()
        {
            return pending.TryDequeue(out var next) ? next : null;
        }

        /// <summary>Logs the item's id and returns the same item as its result.</summary>
        protected override WorkItem ProcessWork(WorkItem work)
        {
            var processed = work;
            Console.WriteLine($"Processing Work {processed.Id}");
            return processed;
        }

        /// <summary>Logs the id of a result received from a worker.</summary>
        protected override void ProcessResult(WorkItem result)
        {
            Console.WriteLine($"Process Result {result.Id}");
        }
    }

    /// <summary>Process entry point for the MPI test program.</summary>
    class Program
    {
        /// <summary>Creates the test program and runs it with the command-line arguments.</summary>
        /// <param name="args">Command-line arguments, passed on unchanged.</param>
        static void Main(string[] args)
        {
            var program = new MPITest();
            program.Run(args);
        }
    }
}

Solution

  • The comm.Send was blocking, but after some minutes of waiting the program started to work.

    The issues were caused by the VirtualBox Host-Only Network Adapter that was also installed on the system. Disabling this adapter in the network settings resolved all the issues.