Tags: java, sockets, multicast, multicastsocket

Java Get MulticastSocket.receive to Throw ClosedByInterruptException


I have some code that reads data from a multicast socket until a user-defined end time. I would also like to stop reading data if the thread is interrupted via a call to Thread.interrupt (or by any other user-initiated action you can come up with). I can't figure out how to get notification when the thread is interrupted. The existing code is as follows:

// These are the constants I am given
final int         mcastPort = ...;
final InetAddress mcastIP   = ...;
final InetAddress ifaceIP   = ...; // Null indicates all interfaces should be used
final Instant     endTime   = ...; // Time at which to stop processing

// Initialize a datagram
final byte[] buffer = new byte[1000];
final DatagramPacket packet = new DatagramPacket(buffer, buffer.length);

// Process incoming datagram packets
try (final MulticastSocket socket = new MulticastSocket(mcastPort)) {
    socket.joinGroup(mcastIP);
    if (ifaceIP != null)
        socket.setInterface(ifaceIP);

    do {
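        // setSoTimeout takes an int in milliseconds; 0 means "block forever"
        // and a negative value throws, so stop once the end time has passed
        final long remainingMs = Duration.between(Instant.now(), endTime).toMillis();
        if (remainingMs <= 0)
            break;
        socket.setSoTimeout((int) Math.min(remainingMs, Integer.MAX_VALUE));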
        socket.receive(packet);

        // Process packet
        ...
    } while (true);
} catch (final SocketTimeoutException e) {
    // Normal condition... the recording time has ended
} catch (final ClosedByInterruptException e) {
    // Uh-oh... this never happens
} ...

I saw that there was a DatagramSocket.getChannel method that returns a DatagramChannel, so I naturally assumed that type was used to read/write to the underlying socket. That assumption was incorrect, which means that MulticastSocket doesn't implement InterruptibleChannel. Because of this, MulticastSocket.receive never throws a ClosedByInterruptException.

I have searched online for examples, but can't figure out how to modify the above code to use a DatagramChannel instead of a MulticastSocket. The problems I need help with are:

  1. How do I set the SO_TIMEOUT parameter on a DatagramChannel?
  2. How do I convert an InetAddress into a NetworkInterface object?

The following is my best guess on how to convert my implementation from MulticastSocket to DatagramChannel to meet my requirements:

// Initialize a buffer
final ByteBuffer buffer = ByteBuffer.allocate(1000);

try (final DatagramChannel mcastChannel = DatagramChannel.open()) {
    mcastChannel.setOption(StandardSocketOptions.SO_REUSEADDR, true);
    mcastChannel.connect(new InetSocketAddress(mcastPort));
    mcastChannel.join(mcastIP);
    if (ifaceIP != null)
        // HELP: this option requires a NetworkInterface object,
        //       but I only have access to an InetAddress object
        mcastChannel.setOption(StandardSocketOptions.IP_MULTICAST_IF, ifaceIP);

    do {
        final Duration soTimeout = Duration.between(Instant.now(), endTime);
        // HELP: SO_TIMEOUT is not a member of StandardSocketOptions
        mcastChannel.setOption(SO_TIMEOUT, ???);
        mcastChannel.receive(buffer);

        // Process packet
        ...
    } while (true);
} ...

Will this approach even work? DatagramChannel.receive doesn't list SocketTimeoutException as one of the exceptions it is able to throw. If this will work, then please let me know how I need to change the second implementation to be equivalent to the first, but with the ability to throw a ClosedByInterruptException when a client calls Thread.interrupt. If not, then does anyone have any other ideas as to how I can meet the requirement of stopping datagram reception at a pre-defined time, while also providing a way to stop execution via user interaction?


Solution

    How do I set the SO_TIMEOUT parameter on a DatagramChannel?

    By calling channel.socket().setSoTimeout().

    How do I convert an InetAddress into a NetworkInterface object?

    You enumerate the network interfaces until you find one with the required address.

    DatagramChannel.receive() doesn't list SocketTimeoutException

    It doesn't have to. It lists IOException, and SocketTimeoutException extends IOException.

    Your second piece of code should call bind(), not connect(), and it should set the interface before calling join(), not after. Apart from that, it should work as expected once you fix the network interface issue, and it will throw ClosedByInterruptException if interrupted.
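
The points above (bind instead of connect, look up the NetworkInterface, set the interface before joining) can be put together roughly as follows. This is only a sketch under stated assumptions, not a drop-in replacement. It deliberately swaps SO_TIMEOUT for a Selector with a timeout, because the timeout set through channel.socket() appears to govern only reads made through that socket adapter and not channel.receive(). With a Selector, an interrupt makes select() return with the thread's interrupt status set rather than raising ClosedByInterruptException. The class name McastReceiver and the helpers remainingMillis and receiveUntil are hypothetical, and ifaceIP is assumed to be non-null because DatagramChannel.join needs an explicit interface.

```java
import java.io.IOException;
import java.net.Inet6Address;
import java.net.InetAddress;
import java.net.InetSocketAddress;
import java.net.NetworkInterface;
import java.net.StandardProtocolFamily;
import java.net.StandardSocketOptions;
import java.nio.ByteBuffer;
import java.nio.channels.DatagramChannel;
import java.nio.channels.SelectionKey;
import java.nio.channels.Selector;
import java.time.Duration;
import java.time.Instant;

// Hypothetical illustration class; not part of the original question or answer.
class McastReceiver {

    /** Milliseconds from now until endTime; zero or negative once the deadline has passed. */
    static long remainingMillis(Instant now, Instant endTime) {
        return Duration.between(now, endTime).toMillis();
    }

    /**
     * Reads datagrams until endTime passes or the calling thread is interrupted.
     * Returns true if it stopped because of an interrupt (the interrupt status stays set).
     */
    static boolean receiveUntil(InetAddress mcastIP, int mcastPort,
                                InetAddress ifaceIP, Instant endTime) throws IOException {
        // Assumption: ifaceIP is non-null. getByInetAddress searches the
        // interfaces for the one that owns this address.
        NetworkInterface iface = NetworkInterface.getByInetAddress(ifaceIP);
        if (iface == null)
            throw new IOException("No network interface has address " + ifaceIP);

        // Open the channel with the protocol family that matches the group address.
        StandardProtocolFamily family = (mcastIP instanceof Inet6Address)
                ? StandardProtocolFamily.INET6 : StandardProtocolFamily.INET;
        ByteBuffer buffer = ByteBuffer.allocate(1000);

        try (DatagramChannel channel = DatagramChannel.open(family);
             Selector selector = Selector.open()) {
            channel.setOption(StandardSocketOptions.SO_REUSEADDR, true);
            channel.bind(new InetSocketAddress(mcastPort));                  // bind, not connect
            channel.setOption(StandardSocketOptions.IP_MULTICAST_IF, iface); // before join
            channel.join(mcastIP, iface);
            channel.configureBlocking(false);
            channel.register(selector, SelectionKey.OP_READ);

            while (true) {
                long timeout = remainingMillis(Instant.now(), endTime);
                if (timeout <= 0)
                    return false;                 // end time reached
                selector.select(timeout);         // returns early on data or interrupt
                if (Thread.currentThread().isInterrupted())
                    return true;                  // stopped by Thread.interrupt()
                selector.selectedKeys().clear();
                buffer.clear();
                if (channel.receive(buffer) != null) {
                    buffer.flip();
                    // Process the datagram held in buffer here
                }
            }
        }
    }
}
```

A caller would run receiveUntil on a worker thread and call interrupt() on that thread to stop early; the method then returns true and leaves the interrupt status set for the caller to inspect or clear. If the "all interfaces" case (a null ifaceIP) is needed, one option is to call join once per multicast-capable interface, which this sketch does not attempt.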