java multithreading sockets stream bufferedreader

What character does a BufferedReader interpret as the end of the stream?


When reading from a socket using a BufferedReader, the documentation states that the readLine() method returns:

A String containing the contents of the line, not including any line-termination characters, or null if the end of the stream has been reached

How does it know that it has reached the end of the stream? What sequence of characters does it use to determine this?

I want to simulate sending the same sequence of characters to properly close another connection that uses PipedStreams.


Edit: Here is the code in question. From the responses, it looks like there is no such sequence, and calling close() on the PipedOutputStream should unblock the readLine() on the reading end of the pipe. That doesn't appear to be happening at the moment, which is why I was confused, so I think there may be a bug somewhere else.

What's happening is that the incomingEventIn.close() call appears to block while inputLine = incomingEventIn.readLine() is blocked on the other thread. If that readLine() isn't executing on the other thread, incomingEventIn.close() completes fine. Why is this happening?

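import java.io.BufferedReader;
import java.io.IOException;
import java.io.InputStreamReader;
import java.io.PipedInputStream;
import java.io.PipedOutputStream;
import java.io.PrintWriter;
import java.net.Socket;
import java.util.ArrayList;
import java.util.Hashtable;
import java.util.concurrent.ExecutorService;
import java.util.concurrent.Executors;

// Assumption: the JSON types come from the org.json library.
import org.json.JSONArray;
import org.json.JSONException;
import org.json.JSONObject;

// The event, listener and exception types used below are the asker's own classes (not shown).
public class SocketManager {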

    private Socket socket = null;
    private PrintWriter out = null;
    private BufferedReader in = null;

    private PipedOutputStream incomingEventOutStream = null;
    private PrintWriter incomingEventOut = null;
    private BufferedReader incomingEventIn = null;
    private PipedOutputStream incomingResponsOutStream = null;
    private PrintWriter incomingResponseOut = null;
    private BufferedReader incomingResponseIn = null;

    private ArrayList<AsteriskLiveComsEventListener> listeners = new ArrayList<AsteriskLiveComsEventListener>();
    private final ExecutorService eventsDispatcherExecutor;

    private String ip;
    private int port;

    private Object socketLock = new Object();

    public SocketManager(String ip, int port) {
        this.ip = ip;
        this.port = port;
        eventsDispatcherExecutor = Executors.newSingleThreadExecutor();
    }

    public void connect() throws UnableToConnectException, AlreadyConnectedException {
        synchronized(socketLock) {
            if (socket != null && !socket.isClosed()) {
                throw (new AlreadyConnectedException());
            }
            try {
                socket = new Socket(ip, port);
                out = new PrintWriter(socket.getOutputStream(), true);
                in = new BufferedReader(new InputStreamReader(socket.getInputStream()));

                incomingEventOutStream = new PipedOutputStream();
                incomingEventIn = new BufferedReader(new InputStreamReader(new PipedInputStream(incomingEventOutStream)));
                incomingEventOut = new PrintWriter(incomingEventOutStream);

                incomingResponsOutStream = new PipedOutputStream();
                incomingResponseIn = new BufferedReader(new InputStreamReader(new PipedInputStream(incomingResponsOutStream)));
                incomingResponseOut = new PrintWriter(incomingResponsOutStream);

            } catch (IOException e) {
                throw (new UnableToConnectException());
            }
            new Thread(new IncomingEventThread()).start();
            new Thread(new SocketThread()).start();
        }
    }

    public void disconnect() throws NotConnectedException {
        disconnect(false);
    }

    private void disconnect(boolean notRequested) throws NotConnectedException {
        synchronized(socketLock) {
            if (!isConnected()) {
                throw (new NotConnectedException());
            }

            try {
                incomingEventIn.close();
            } catch (IOException e2) {}
            // IT NEVER GETS TO HERE!
            incomingEventOut.close();
            try {
                incomingResponseIn.close();
            } catch (IOException e1) {}
            System.out.println("disconnecting");
            incomingResponseOut.close();
            try {
                socket.shutdownInput();
            } catch (IOException e) {}
            try {
                socket.shutdownOutput();
            } catch (IOException e) {}
            try {
                socket.close();
            } catch (IOException e) {}

            if (notRequested) {

                System.out.println("disconnecting event");
                dispatchEvent(new ConnectionLostEvent());
            }
        }
    }

    public boolean isConnected() {
        synchronized(socketLock) {
            return (socket != null && !socket.isClosed());
        }
    }

    public void addEventListener(AsteriskLiveComsEventListener a) {
        synchronized(listeners) {
            listeners.add(a);
        }
    }

    public void removeEventListener(AsteriskLiveComsEventListener a) {
        synchronized(listeners) {
            listeners.remove(a);
        }
    }

    private void dispatchEvent(final AsteriskLiveComsEvent e) {
        synchronized (listeners) {
            synchronized (eventsDispatcherExecutor) {
                eventsDispatcherExecutor.execute(new Runnable()
                {
                    public void run()
                    {
                        for(int i=0; i<listeners.size(); i++) {
                            listeners.get(i).onAsteriskLiveComsEvent(e);
                        }
                    }
                });
            }
        }
    }

    public JSONObject sendRequest(JSONObject request) throws JSONException, NotConnectedException {
        synchronized(socketLock) {
            System.out.println("sending request "+request.toString());
            out.println(request.toString());
            try {
                return new JSONObject(incomingResponseIn.readLine());
            } catch (IOException e) {
                // lets close the connection
                try {
                    disconnect(true);
                } catch (NotConnectedException e1) {}
                throw(new NotConnectedException());
            }
        }
    }

private class SocketThread implements Runnable {

    @Override
    public void run() {
        String inputLine = null;
        try {
            while((inputLine = in.readLine()) != null) {
                // determine if this is a response or event and send to necessary location
                JSONObject lineJSON = new JSONObject(inputLine);
                if (lineJSON.getString("type").equals("response")) {
                    incomingResponseOut.println(inputLine);
                    incomingResponseOut.flush();
                }
                else if (lineJSON.getString("type").equals("event")) {
                    incomingEventOut.println(inputLine);
                    incomingEventOut.flush();
                }
            }

            if (isConnected()) {
                try {
                    disconnect(true);
                } catch (NotConnectedException e) {}
            }
        } catch (IOException e) {
            // try and disconnect (if not already disconnected) and end thread
            if (isConnected()) {
                try {
                    disconnect(true);
                } catch (NotConnectedException e1) {}
            }
        }
    }

}

private class IncomingEventThread implements Runnable {

    @Override
    public void run() {
        String inputLine = null;
        try {
            while((inputLine = incomingEventIn.readLine()) != null) {
                JSONObject lineJSON = new JSONObject(inputLine);
                String eventType = lineJSON.getString("eventType");
                // determine what type of event it is and then fire one that represents it
                if (eventType.equals("channelAdded")) {
                    JSONObject a = lineJSON.getJSONObject("payload");
                    Hashtable<String,Object> data = new Hashtable<String,Object>();
                    Object[] keys = a.keySet().toArray();
                    for(int i=0; i<keys.length; i++) {
                        data.put((String) keys[i], a.get((String) keys[i]));
                    }
                    dispatchEvent(new ChannelAddedEvent(data));
                }
                else if (eventType.equals("channelRemoved")) {
                    dispatchEvent(new ChannelRemovedEvent(lineJSON.getJSONObject("payload").getInt("channelId")));
                }
                else if (eventType.equals("channelsToRoom")) {
                    ArrayList<Integer> data = new ArrayList<Integer>();
                    JSONObject a = lineJSON.getJSONObject("payload");
                    JSONArray ids = a.getJSONArray("channelIds");
                    for(int i=0; i<ids.length(); i++) {
                        data.add(ids.getInt(i));
                    }
                    dispatchEvent(new ChannelsToRoomEvent(data));
                }
                else if (eventType.equals("channelToHolding")) {
                    dispatchEvent(new ChannelToHoldingEvent(lineJSON.getJSONObject("payload").getInt("channelId")));
                }
                else if (eventType.equals("channelVerified")) {
                    dispatchEvent(new ChannelVerifiedEvent(lineJSON.getJSONObject("payload").getInt("channelId")));
                }
                else if (eventType.equals("serverResetting")) {
                    dispatchEvent(new ServerResettingEvent());
                }
            }
        } catch (IOException e) {}
        System.out.println("here");
    }

}

} // end of SocketManager

Edit 2: I think it's a deadlock somewhere, because if I put some breakpoints before it in the debugger, it runs fine and inputLine = incomingEventIn.readLine() returns null. If I run it normally, it locks up.

Edit 3: Solved thanks to Gray's answer. The input stream was being closed before the output stream, which caused the lock-up. It needs to be the other way around: closing the output stream first signals end of stream to the input side and unblocks the readLine() call.
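
The behaviour described in Edit 3 can be reproduced in isolation. The following is a minimal sketch, not the code from the question: the class name PipeEofDemo, the PAYLOAD constant and the "<EOF>" marker are made up for illustration. It sends a line containing Ctrl-D and Ctrl-Z through a pipe to show that such characters arrive as ordinary data, and that readLine() only returns null once the write side is closed.

```java
import java.io.BufferedReader;
import java.io.IOException;
import java.io.InputStreamReader;
import java.io.PipedInputStream;
import java.io.PipedOutputStream;
import java.io.PrintWriter;
import java.io.UncheckedIOException;
import java.util.List;
import java.util.concurrent.CopyOnWriteArrayList;

class PipeEofDemo {

    // Hypothetical payload: Ctrl-D (4) and Ctrl-Z (26) are often mistaken for "EOF characters".
    static final String PAYLOAD = "ctrl-d:" + (char) 4 + " ctrl-z:" + (char) 26;

    /** Writes PAYLOAD into a pipe, closes the write side, and returns what the reader saw. */
    static List<String> run() {
        List<String> seen = new CopyOnWriteArrayList<>();
        try {
            PipedOutputStream outStream = new PipedOutputStream();
            BufferedReader in = new BufferedReader(
                    new InputStreamReader(new PipedInputStream(outStream)));
            PrintWriter out = new PrintWriter(outStream);

            Thread reader = new Thread(() -> {
                try (BufferedReader r = in) {
                    String line;
                    while ((line = r.readLine()) != null) {
                        seen.add(line);       // control characters come through as data
                    }
                    seen.add("<EOF>");        // reached only after the write side is closed
                } catch (IOException e) {
                    seen.add("<IOException>");
                }
            });
            reader.start();

            out.println(PAYLOAD);
            out.close();                      // flushes, then closes the underlying pipe
            reader.join(5000);                // wait for the reader to observe end of stream
        } catch (IOException e) {
            throw new UncheckedIOException(e);
        } catch (InterruptedException e) {
            Thread.currentThread().interrupt();
            throw new IllegalStateException(e);
        }
        return seen;
    }

    public static void main(String[] args) {
        System.out.println(run().size() + " entries seen by the reader");
    }
}
```

run() is expected to return the payload line followed by the "<EOF>" marker. Nothing in the payload ends the stream; only out.close() does.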


Solution

  • How does it know that it's reached the end of the stream? What sequence of characters does it use to determine this?

    The answer is OS-dependent, but on the operating systems I'm familiar with, no EOF character is read. The OS returns to the underlying caller a value indicating that the stream (file descriptor) has reached EOF. The JVM sees that return value and returns the corresponding result (null, -1, ...) to the InputStream or Reader caller, depending on the method.

    I want to simulate sending the same sequence of characters to properly close another connection that uses PipedStreams.

    If you are reading from a PipedReader (or a PipedInputStream), close the associated PipedWriter (or PipedOutputStream). The Reader or InputStream will then return the appropriate EOF value to the caller.

    Edit:

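    Since your IncomingEventThread is reading from incomingEventIn, the disconnect() method should close incomingEventOut first. That delivers EOF to the reader, so the blocked readLine() returns null. Closing the reader first cannot work here, because BufferedReader's close() and readLine() synchronize on the same lock, so close() has to wait for the blocked read. The thread should then close the in side itself. After that, you should close the response out side.

    I would not have the thread call disconnect(...). It should only close its own reader and writer, not all of the streams.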
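
The ownership arrangement suggested above could look roughly like the sketch below. It is only an illustration under assumptions: EventPipe, open(), send(), shutdown() and received() are hypothetical names, not part of the question's SocketManager. The writing side closes only the writer, and the reading thread closes its own reader once it sees end of stream.

```java
import java.io.BufferedReader;
import java.io.IOException;
import java.io.InputStreamReader;
import java.io.PipedInputStream;
import java.io.PipedOutputStream;
import java.io.PrintWriter;
import java.io.UncheckedIOException;
import java.util.List;
import java.util.concurrent.CopyOnWriteArrayList;

final class EventPipe {
    private final PrintWriter out;
    private final Thread reader;
    private final List<String> received;

    private EventPipe(PrintWriter out, Thread reader, List<String> received) {
        this.out = out;
        this.reader = reader;
        this.received = received;
    }

    /** Creates the pipe and starts the thread that owns the read side. */
    static EventPipe open() {
        List<String> received = new CopyOnWriteArrayList<>();
        final BufferedReader in;
        final PrintWriter out;
        try {
            PipedOutputStream outStream = new PipedOutputStream();
            in = new BufferedReader(new InputStreamReader(new PipedInputStream(outStream)));
            out = new PrintWriter(outStream, true);   // auto-flush on println
        } catch (IOException e) {
            throw new UncheckedIOException(e);
        }
        Thread reader = new Thread(() -> {
            // The reading thread owns the reader and closes it itself on exit.
            try (BufferedReader r = in) {
                String line;
                while ((line = r.readLine()) != null) {
                    received.add(line);
                }
            } catch (IOException ignored) {
                // pipe broken: fall through and let the thread end
            }
        });
        reader.start();
        return new EventPipe(out, reader, received);
    }

    void send(String line) {
        out.println(line);
    }

    /** Closes the write side only, then waits for the reader to finish. */
    boolean shutdown(long timeoutMillis) {
        out.close();                                  // reader sees EOF and exits its loop
        try {
            reader.join(timeoutMillis);
        } catch (InterruptedException e) {
            Thread.currentThread().interrupt();
        }
        return !reader.isAlive();
    }

    List<String> received() {
        return received;
    }
}
```

With this split, shutdown() never touches the reader, so it cannot end up waiting on a thread that is blocked inside readLine().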