Search code examples
javamultithreadingudp

Multithreaded UDP Server Java


I have two threads, one is listening on a socket and adding to the queue, the other subtracting from the queue and submitting for processing. The second thread has a sleep when the queue is empty. This sleep somehow affects the first thread, that is, if you remove sleep or make it larger, then the delay in socket.receive of the first thread increases. If I keep the sleep as low as possible, it gets better, but not perfect. What am I doing wrong?

private DatagramSocket socket;
private boolean running, listenthreadwork;
private byte[] buf = new byte[256];
private List<IUDPListener> listeners = new ArrayList<IUDPListener>();
private Thread runnerThread, listenerThread;
private ConcurrentLinkedQueue<MyObject> list = new ConcurrentLinkedQueue<MyObject>();

public void init(int port) throws SocketException
{
    socket = new DatagramSocket(port);
    
    runnerThread = new Thread(this::listenLoopUDP);
    runnerThread.setName("listenLoopUDP");
    
    listenerThread = new Thread(this::listenerThreadUDP);
    listenerThread.setName("listenerThreadUDP");
    
    running = true;
    listenthreadwork = true;
    runnerThread.start();
    listenerThread.start();
}

private void listenerThreadUDP() {
    while (listenthreadwork == true) {
        MyObjectinfo = null;
        synchronized (list) {
            if (!list.isEmpty()) {
                info = list.poll();
            }
        }
        if (info != null) {
            for (IUDPListener listener : listeners) {
                listener.msgReceived(info);
            }
        } else {
            try {
                Thread.sleep(10);//Somehow affects listenLoopUDP
            } catch (InterruptedException e) {
                Log.write(e);
            }
        }
    }
}

public void listenLoopUDP() {
    while (running) {
        DatagramPacket packet = new DatagramPacket(buf, buf.length);

        try {
            socket.receive(packet);
        } catch (IOException e) {
            if (socket.isClosed()) {
                running = false;
                continue;
            } else {
                Log.write(e);
                socket.close();
            }
        }
        
        String received = new String(packet.getData());

        MyObject info = new MyObject(received);
        synchronized (list) {
            list.offer(info);
        }
    }

    listenthreadwork = false;
    try {
        listenerThread.join();
    } catch (InterruptedException e) {
        Log.write(e);
    }
}

I have a simple client with timer

public class TestClient {
    private DatagramSocket socket;
    private InetAddress address;
    private int count = 0;    

    public TestClient() {
        try {
            socket = new DatagramSocket();
        } catch (SocketException e) {
            // TODO Auto-generated catch block
            e.printStackTrace();
        }
        try {
            address = InetAddress.getByName("localhost");
        } catch (UnknownHostException e) {
            // TODO Auto-generated catch block
            e.printStackTrace();
        }
    }

    public void sendEcho(String msg) {
        msg = msg + count++;
        ByteBuffer bb = ByteBuffer.allocate(256);
        bb.order(ByteOrder.LITTLE_ENDIAN);
        bb.put(msg.getBytes());
        DatagramPacket packet = new DatagramPacket(bb.array(), bb.capacity(), address, 15000);
        try {
            socket.send(packet);
        } catch (IOException e) {
            // TODO Auto-generated catch block
            e.printStackTrace();
        }
    }

    public void close() {
        socket.close();
    }
}

    TestClient client = new TestClient();
    Timer timer = new Timer(false);
    timer.schedule(new TimerTask() {
    
        @Override
        public void run() {
            client.send("hello");
        }
    
    }, 1, 1);

But in server I get message something like this
2021-05-17 20:04:48.320
2021-05-17 20:04:48.335
2021-05-17 20:04:48.351
2021-05-17 20:04:48.367
2021-05-17 20:04:48.382

It doesn't even help to remove it or not run listenerThread

        synchronized (list) {
            list.offer(info);
        }

Solution

  • Actually you don't need to sleep, use proper queue classes instead, like LinkedBlockingQueue, I also remove the flags since you don't need them also, use interrupt() to stop a thread blocked waiting for a queue element:

    private DatagramSocket socket;
    private byte[] buf = new byte[256];
    private List<IUDPListener> listeners = new ArrayList<IUDPListener>();
    private Thread runnerThread, listenerThread;
    private LinkedBlockingQueue<MyObject> list = new LinkedBlockingQueue<MyObject>();
    
    public void init(int port) throws SocketException
    {
        socket = new DatagramSocket(port);
        
        runnerThread = new Thread(this::listenLoopUDP);
        runnerThread.setName("listenLoopUDP");
        
        listenerThread = new Thread(this::listenerThreadUDP);
        listenerThread.setName("listenerThreadUDP");
        
        runnerThread.start();
        listenerThread.start();
    }
    
    private void listenerThreadUDP() {
        try {
            while (true) {
                MyObject info=list.take();
                for (IUDPListener listener : listeners) {
                    listener.msgReceived(info);
                }
            }
        } catch (InterruptedException ex) {
            //Just quit
        }
    }
    
    public void listenLoopUDP() {
        try {
            while (true) {
                DatagramPacket packet = new DatagramPacket(buf, buf.length);
                socket.receive(packet);
                String received = new String(packet.getData());
                MyObject info = new MyObject(received);
                list.put(info);
            }
        } catch (IOException e) {
            Log.write(e);
        } catch (InterruptedException e) {
            Log.write(e);
        } finally {
            //Any exception above (or a runtime one) will activate this block where we do the cleanup and interrupt the other running thread
            listenerThread.interrupt();
            socket.close();
        }
    }
    

    I did a test with your 1ms client, printing both sent and received message and I have a perfect interleaving between the message, so the bottleneck is not in the receiving thread; with perfect interleaving I mean that in the console I get, as expected, the sent message from the client immediately followed from the received message.