Tags: java, sockets, serversocket, objectinputstream, objectoutputstream

Continuous Socket connection, ObjectInputStream EOF on second request


I am using Java 21 sockets and allowing for multiple connections.
Reading and writing through ObjectInputStream and ObjectOutputStream produces an EOFException on the second request.

I have a Client side, a (middleman) Bridge Server and an Inventory server.

  • The Client refers to end users.
  • The Bridge Server is the authorization system and acts as a kind of gateway to the Inventory Server.

The sequence diagram is the following:
[Figure: Communication flow between components]

Now for the problem:

  1. Start BridgeServer.
  2. Start UserClient
    and observe that the connection has been established.
  3. Send any type of request
    (object) from the options shown in the terminal.
    In my runs I sent the AuthenticationRequest.
  4. The AuthenticationRequest is processed correctly.
  5. Resend the AuthenticationRequest, or send any other request.
  6. An EOFException is thrown by the BridgeServer:
java.io.EOFException
    at java.base/java.io.ObjectInputStream$PeekInputStream.readFully(ObjectInputStream.java:2933)
    at java.base/java.io.ObjectInputStream$BlockDataInputStream.readShort(ObjectInputStream.java:3428)
    at java.base/java.io.ObjectInputStream.readStreamHeader(ObjectInputStream.java:985)
    at java.base/java.io.ObjectInputStream.<init>(ObjectInputStream.java:416)
    at com.pe.distributed.system.bridge.BridgeServer.run(BridgeSide.java:44)
    at com.pe.distributed.system.bridge.BridgeMain.main(BridgeMain.java:5)

For context and reproducibility:

UserClient


import java.io.IOException;
import java.io.ObjectInputStream;
import java.io.ObjectOutputStream;
import java.io.Serializable;
import java.net.Socket;
import java.util.Map;
import java.util.Scanner;
import java.util.logging.Level;
import java.util.logging.Logger;

public record AuthenticationRequest(String username, String password) implements Serializable {
}

public record InventoryQuery() implements Serializable {
}

public record ItemOrder(String itemCode, int quantity) implements Serializable {
}

public class UserClient {
    private static final Logger logger = Logger.getLogger(UserClient.class.getName());
    private static final String EXCEPTION_OCCURRED = "Exception occurred";

    private UserClient() {
    }

    @SuppressWarnings("java:S2189")
    public static void run() {
        Scanner scanner = new Scanner(System.in);
        while (true) {
            try (Socket bridgeSocket = new Socket("127.0.0.1", 5550)) {
                logger.log(Level.INFO, "Connected successfully to bridge server");

                while (true) {
                    displayMainMenu();
                    int choice = scanner.nextInt();
                    scanner.nextLine();

                    // Handle user choice
                    switch (choice) {
                        case 1:
                            authenticate(scanner, bridgeSocket);
                            break;
                        case 2:
                            checkInventory(bridgeSocket);
                            break;
                        case 3:
                            placeOrder(scanner, bridgeSocket);
                            break;
                        default:
                            logger.log(Level.WARNING, "Invalid choice. Please enter a valid option.");
                    }
                }
            } catch (IOException | ClassNotFoundException e) {
                logger.log(Level.SEVERE, EXCEPTION_OCCURRED, e);
            } finally {
                scanner.close();
            }
        }
    }

    @SuppressWarnings("java:S106")
    private static void displayMainMenu() {
        System.out.println("What would you like to do:");
        System.out.println("1. Authenticate");
        System.out.println("2. Check Inventory");
        System.out.println("3. Place an Order");
        System.out.print("Enter your choice: ");
    }

    @SuppressWarnings("java:S106")
    private static void authenticate(Scanner scanner, Socket socket) throws IOException, ClassNotFoundException {
        Map.Entry<String, String> authCreds = getUserAuthenticationInput(scanner);
        AuthenticationRequest authenticationRequest = new AuthenticationRequest(authCreds.getKey(),
                authCreds.getValue());

        ObjectOutputStream bridgeIn = new ObjectOutputStream(socket.getOutputStream());
        bridgeIn.writeObject(authenticationRequest);
        bridgeIn.flush();

        ObjectInputStream bridgeOut = new ObjectInputStream(socket.getInputStream());
        Object response = bridgeOut.readObject();
        logger.log(Level.FINE, "Received response from bridge: {0}", response);
        System.out.println(response);
    }

    @SuppressWarnings("java:S106")
    private static Map.Entry<String, String> getUserAuthenticationInput(Scanner scanner) {
        System.out.println("Please provide credentials.\n");
        System.out.print("Username: ");
        String username = scanner.nextLine();
        System.out.print("Password: ");
        String password = scanner.nextLine();
        return Map.entry(username, password);
    }

    @SuppressWarnings("java:S106")
    private static void checkInventory(Socket socket) throws IOException, ClassNotFoundException {
        ObjectOutputStream bridgeIn = new ObjectOutputStream(socket.getOutputStream());
        bridgeIn.writeObject(new InventoryQuery());
        bridgeIn.flush();

        ObjectInputStream bridgeOut = new ObjectInputStream(socket.getInputStream());
        Object response = bridgeOut.readObject();
        logger.log(Level.FINE, "Received response from bridge: {0}", response);
        System.out.println(response);
    }

    private static void placeOrder(Scanner scanner, Socket socket) throws IOException, ClassNotFoundException {
        Map.Entry<String, Integer> orderDetails = getUserOrderInput(scanner);
        ItemOrder itemOrder = new ItemOrder(orderDetails.getKey(), orderDetails.getValue());

        ObjectOutputStream bridgeIn = new ObjectOutputStream(socket.getOutputStream());
        bridgeIn.writeObject(itemOrder);
        bridgeIn.flush();

        ObjectInputStream bridgeOut = new ObjectInputStream(socket.getInputStream());
        Object response = bridgeOut.readObject();
        logger.log(Level.FINE, "Received response from bridge: {0}", response);
    }

    @SuppressWarnings("java:S106")
    private static Map.Entry<String, Integer> getUserOrderInput(Scanner scanner) {
        System.out.println("Place your order!\n");

        String itemCode;
        int desiredQuantity;

        // Input validation loop for item code
        do {
            System.out.print("Item-code: ");
            itemCode = scanner.nextLine();
        } while (itemCode == null || itemCode.isEmpty());

        // Input validation loop for quantity
        do {
            System.out.print("Quantity: ");
            while (!scanner.hasNextInt()) {
                System.out.println("Invalid input. Please enter a valid number.");
                scanner.next(); // Consume the invalid input
            }
            desiredQuantity = scanner.nextInt();
            scanner.nextLine(); // Consume newline character
        } while (desiredQuantity <= 0);

        return Map.entry(itemCode, desiredQuantity);
    }
}

and the BridgeServer

import lombok.Getter;
import lombok.Setter;

import java.io.IOException;
import java.io.ObjectInputStream;
import java.io.ObjectOutputStream;
import java.io.Serializable;
import java.net.ServerSocket;
import java.net.Socket;
import java.util.Map;
import java.util.concurrent.ConcurrentHashMap;
import java.util.concurrent.ExecutorService;
import java.util.concurrent.Executors;
import java.util.logging.Level;
import java.util.logging.Logger;

@Getter
@Setter
public class User {

    String username;
    String password;

    public User(String username, String password) {
        this.username = username;
        this.password = password;
    }
}

public record AuthenticationResponse(String message, Boolean isAuthenticated) implements Serializable {
}

public record AuthenticationRequest(String username, String password) implements Serializable {
}

public class BridgeServer {
    private static final Logger logger = Logger.getLogger(BridgeServer.class.getName());
    private static final Map<String, User> users;

    static {
        users = new ConcurrentHashMap<>();
        // Populate the user store with sample users
        users.put("user1", new User("user1", "pass1"));
        users.put("user2", new User("user2", "pass2"));
        users.put("user3", new User("user3", "pass3"));
    }

    @SuppressWarnings("java:S2189")
    public static void run() {
        try (ExecutorService executorService = Executors.newCachedThreadPool();
             ServerSocket serverSocket = new ServerSocket(5550)) {
            logger.info("Bridge server started, waiting for connections...");

            //noinspection InfiniteLoopStatement
            while (true) {
                Socket userSocket = serverSocket.accept();
                String clientKey = userSocket.getInetAddress().toString() + ":" + userSocket.getPort();
                logger.log(Level.INFO, "User client with key {0} connected.", clientKey);

                executorService.submit(new BridgeConnectionHandler(userSocket, clientKey));
            }
        } catch (IOException e) {
            logger.log(Level.SEVERE, "IOException occurred", e);
        }
    }

    @SuppressWarnings("java:S2189")
    private record BridgeConnectionHandler(Socket userSocket, String clientKey) implements Runnable {
        @Override
        public void run() {
            try (ObjectInputStream in = new ObjectInputStream(userSocket.getInputStream());
                 ObjectOutputStream out = new ObjectOutputStream(userSocket.getOutputStream())) {

                boolean isAuthenticated = false;
                //noinspection InfiniteLoopStatement
                while (true) {
                    Object request = in.readObject();
                    // If the client is authenticated, handle user requests
                    if (isAuthenticated) {
                        if (!(request instanceof AuthenticationRequest)) {
                            handleUserRequest(in, out);
                        } else {
                            out.writeObject(new AuthenticationResponse("Client already authenticated with user "
                                    + users.get(clientKey), true));
                            out.flush();
                        }
                    } else if (request instanceof AuthenticationRequest authenticationRequest) {
                        AuthenticationResponse authenticationResponse = handleAuthenticationRequest(authenticationRequest);
                        out.writeObject(authenticationResponse);
                        out.flush();

                        isAuthenticated = authenticationResponse.isAuthenticated();
                    } else {
                        out.writeObject(new AuthenticationResponse("Not authenticated", false));
                        out.flush();
                    }
                }
            } catch (IOException | ClassNotFoundException e) {
                logger.log(Level.SEVERE, "Exception occurred", e);
            }
        }

        private AuthenticationResponse handleAuthenticationRequest(AuthenticationRequest authenticationRequest) {
            boolean authenticated = authenticate(authenticationRequest.username(), authenticationRequest.password());
            return new AuthenticationResponse(authenticated ? "Authentication successful" : "Authentication failed", authenticated);
        }

        private static synchronized boolean authenticate(String username, String password) {
            User user = users.get(username);
            return user != null && user.getPassword().equals(password);
        }

        private void handleUserRequest(ObjectInputStream bridgeIn, ObjectOutputStream userOut) throws IOException {
            try (Socket inventorySocket = new Socket("127.0.0.1", 12346);
                 ObjectOutputStream inventoryOut = new ObjectOutputStream(inventorySocket.getOutputStream());
                 ObjectInputStream inventoryIn = new ObjectInputStream(inventorySocket.getInputStream())) {

                Object request;
                while ((request = bridgeIn.readObject()) != null) {
                    logger.log(Level.INFO, "Received request from user: {0}", request);

                    // Forward request to inventory side
                    inventoryOut.writeObject(request);
                    inventoryOut.flush();

                    // Receive response from inventory side
                    Object response = inventoryIn.readObject();
                    logger.log(Level.INFO, "Received response from inventory: {0}", response);

                    // Send response back to user side
                    userOut.writeObject(response);
                    userOut.flush();
                }
            } catch (ClassNotFoundException e) {
                logger.log(Level.SEVERE, "Exception occurred", e);
            }
        }
    }

    public static void main(String[] args) {
        run();
    }

}

Specifically, the EOFException is thrown in BridgeServer, in BridgeConnectionHandler, at the line Object request = in.readObject(); the second time a request comes through.

I've been thorough in flush()ing every time an object is written.

  1. Some answers say that the socket/connection is closing therefore the ObjectInputStream produces EOFException as expected.
  2. Others say that the ObjectInputStream object(s) and the actual Socket's InputStream may be out of sync.

Maybe I have misunderstood the capabilities of sockets, but

  • I want my server to keep the connection alive till the client closes it.
  • And for all of the requests to be processed correctly.

And of course no EOFException errors.

Would love some feedback!


Solution

  • Turns out the issue was that the client did not pass around and reuse a single ObjectInputStream and ObjectOutputStream created once per connection, the way the server does upon the initial connection:

    try (ObjectInputStream in = new ObjectInputStream(userSocket.getInputStream());
         ObjectOutputStream out = new ObjectOutputStream(userSocket.getOutputStream()))

    Instead, the client created new instances in every call of the methods authenticate, checkInventory and placeOrder, e.g.:

    ObjectOutputStream bridgeIn = new ObjectOutputStream(socket.getOutputStream());
    bridgeIn.writeObject(authenticationRequest);
    bridgeIn.flush();
    ObjectInputStream bridgeOut = new ObjectInputStream(socket.getInputStream());


    Why this matters:

    ObjectOutputStream writes a stream header when it is created, and ObjectInputStream reads that header when it is created.
    If you create a new ObjectOutputStream on the same OutputStream (i.e., socket.getOutputStream()) for every request, each new stream writes its own header.
    The single ObjectInputStream on the other side has already consumed one header and from then on expects a continuous sequence of serialized objects.
    A second header in the middle of the stream is not a valid object, so the read fails. This can surface as a StreamCorruptedException or, when the peer then closes the connection, as an EOFException.

    That's what was causing the EOF. Once I created the input and output streams once per connection and passed those same instances to the methods, the error went away.
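
The header behaviour described above can be reproduced without any sockets. The following is a minimal sketch, not the code from the question: the class name StreamHeaderDemo and its method names are made up for illustration, and an in-memory ByteArrayOutputStream stands in for socket.getOutputStream(). Over a real socket the symptom may differ (for example the EOFException from the question), but the cause is the same extra header.

```java
import java.io.ByteArrayInputStream;
import java.io.ByteArrayOutputStream;
import java.io.IOException;
import java.io.ObjectInputStream;
import java.io.ObjectOutputStream;
import java.io.StreamCorruptedException;

public class StreamHeaderDemo {

    /** Number of bytes an ObjectOutputStream emits just by being constructed. */
    static int headerLength() throws IOException {
        ByteArrayOutputStream sink = new ByteArrayOutputStream();
        ObjectOutputStream out = new ObjectOutputStream(sink);
        out.flush();
        return sink.size();
    }

    /** Broken pattern: a new ObjectOutputStream per message, read by a single ObjectInputStream. */
    static boolean secondReadFailsWithNewWriterPerMessage() throws IOException, ClassNotFoundException {
        ByteArrayOutputStream sink = new ByteArrayOutputStream();
        for (String message : new String[] {"first", "second"}) {
            ObjectOutputStream out = new ObjectOutputStream(sink); // writes another header
            out.writeObject(message);
            out.flush();
        }
        ObjectInputStream in = new ObjectInputStream(new ByteArrayInputStream(sink.toByteArray()));
        in.readObject(); // "first" is read normally
        try {
            in.readObject(); // runs into the second header instead of an object
            return false;
        } catch (StreamCorruptedException e) {
            return true;
        }
    }

    /** Fixed pattern: one ObjectOutputStream for all messages, one ObjectInputStream for all reads. */
    static String secondMessageWithSharedWriter() throws IOException, ClassNotFoundException {
        ByteArrayOutputStream sink = new ByteArrayOutputStream();
        ObjectOutputStream out = new ObjectOutputStream(sink); // header written exactly once
        out.writeObject("first");
        out.writeObject("second");
        out.flush();
        ObjectInputStream in = new ObjectInputStream(new ByteArrayInputStream(sink.toByteArray()));
        in.readObject();
        return (String) in.readObject();
    }

    public static void main(String[] args) throws Exception {
        System.out.println("header bytes: " + headerLength());                              // header bytes: 4
        System.out.println("new writer per message fails: "
                + secondReadFailsWithNewWriterPerMessage());                                // true
        System.out.println("shared writer, second message: "
                + secondMessageWithSharedWriter());                                         // second
    }
}
```

Applied to UserClient, the same idea would mean creating one ObjectOutputStream and one ObjectInputStream right after the Socket is opened, and handing those two objects to authenticate, checkInventory and placeOrder instead of the Socket.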