SockJS Java Client auto reconnect

I am using SockJs Java client to connect with websocket running on different server. Every thing works fine like server publishes message and my Java client receives it,but if server is restarted then I am not able to receive any reply.But when I restart the client then again everything work ok. So I want to implement the re-connection logic with restarting the SockJs Java client. My code is as below:

public class Application {

    private final static WebSocketHttpHeaders headers = new WebSocketHttpHeaders();

    private static Logger logger = Logger.getLogger(Application.class);

    public static void main(String[] args) throws InterruptedException, ExecutionException {, args);

        Transport webSocketTransport = new WebSocketTransport(new StandardWebSocketClient());
        List<Transport> transports = Collections.singletonList(webSocketTransport);

        SockJsClient sockJsClient = new SockJsClient(transports);
        sockJsClient.setMessageCodec(new Jackson2SockJsMessageCodec());

        WebSocketStompClient stompClient = new WebSocketStompClient(sockJsClient);

        String url = "ws://{host}:{port}/hello";

        ListenableFuture<StompSession> f = stompClient.connect(url, headers, new MyWebSocketHandler(), "localhost", 9090);
        StompSession stompSession = f.get();"Subscribing to greeting topic using session " + stompSession);


    public static void subscribeGreetings(StompSession stompSession) throws ExecutionException, InterruptedException {

        stompSession.subscribe("/topic/greetings", new StompFrameHandler() {

            public Type getPayloadType(StompHeaders stompHeaders) {
                return byte[].class;

            public void handleFrame(StompHeaders stompHeaders, Object o) {
      "Received greeting " + new String((byte[]) o));


  • Some of my java code, but is for a raw websocket, dont using STOMP. Based on this javascript recconnecting implementation:

    Java code:

     * Problemas conocidos:
     *  Si se realiza la instancia dos veces consecutivas, se generan dos conexiones. Esto es debido a que el Executor
     *   crea dos tareas y no limita.
     * Una posible solucion seria convertir esta API de reconexion en un Singleton, sin embargo hacer eso significa solo
     *  poder crear una coneccion.
     * Una forma de solucionar esto es crear una fabrica de websockets. Y por otro lado tener la API como un singleton. 
     * Para reproducir el problema:
     *      ReconnectWebsocket ws = new ReconnectWebsocket(new LogicWsExternal() {
                public void onOpen(Session session) {
                }   // This is myPersonalEndPoint
                },uri, settings
            ws = new ReconnectWebsocket(new LogicWsExternal() {
                public void onOpen(Session session) {
                }   // This is myPersonalEndPoint
                },uri, settings
            // .. dos conexiones...
     * @author gas
    public class ReconnectWebsocket implements ReconnectObserver{
        private final Logger LOG = LoggerFactory.getLogger(ReconnectWebsocket.class);
        public static final String ANSI_RESET = "\u001B[0m";
        public static final String ANSI_BLUE = "\u001B[34m";
        public static final String ANSI_WHITE = "\u001B[37m";
        // private static ReconnectWebsocketTest instance = null;
        Boolean debug;
        Boolean automaticOpen;
        Integer reconnectInterval;
        Integer maxReconnectionInterval;
        Float reconnectDecay;
        Integer timeoutInterval;
        Integer maxConnectAttempts;
        String binaryType;
        // These should be treated as read-only properties
        /** The URL as resolved by the constructor. This is always an absolute URL. Reas only. */
        URI path;
        /** The number of attempted reconnects since starting, or the last successful connection. Read only. */
        int connectAttemptsCount;
        * The current state of the connection.
        * Can be one of: WebSocket.CONNECTING, WebSocket.OPEN, WebSocket.CLOSING, WebSocket.CLOSED
        * Read only.
        WebSocketStates readyState = WebSocketStates.CLOSED;
         * A string indicating the name of the sub-protocol the server selected; this will be one of
         * the strings specified in the protocols parameter when creating the WebSocket object.
         * Read only.
        // TODO
        //this.protocol = null;
        // Private state variables
        //ReconnectWebsocket self = this;
        Session session;                    // In Javascript implementation is ws variable.
        boolean forcedClose = false;
        boolean timeOutFlag = false;
        Future<Boolean> timerReconnectionFuture;            // Usado para controlar el timer para las reconexnes.
        //private ReconnectObservable observable;
        LogicWsExternal logicExternal;
        ClientManager client;
        LogicWs wsLogic;
        static ScheduledExecutorService executor;
        static ScheduledExecutorService executor2;
        Future openFuture;
        AtomicBoolean openFlag;
         * Tyrus estates:
         *      org.glassfish.tyrus.core.TyrusWebSocket.State
         * */
        public static enum WebSocketStates {                //  Tyrus:
            CONNECTING("CONNECTING",0)      //      NEW
            ,OPEN("OPEN",1)                 //      CONNECTED
            ,CLOSING("CLOSING",2)           //      CLOSING
            ,CLOSED("CLOSED",3)             //      CLOSED
            String desc;
            Integer statusInt;
            WebSocketStates(String desc,Integer statusInt){
            public String getDescription() {
                return desc;
            public Integer getStatusInt() {
                return statusInt;
            public void printDescription(){
                System.out.println("PrinterStatus: "+desc);
        public ReconnectWebsocket(URI path) throws DeploymentException, IOException {
            this(new LogicWsExternal(){
                public void onOpen(Session session) { }},path, null);
        // Consturctor whith Only package visivility
        public ReconnectWebsocket(LogicWsExternal logicWsExernal, URI path) throws DeploymentException, IOException {
            this(logicWsExernal,path, null);
        public ReconnectWebsocket(LogicWsExternal logicWsExternal, URI path, ReconnectSettings settings) {
            // Default setting
            // Overwrite and define settings with options if they exist.
            /** Wheter this instance should log debug mesages. */
            this.debug = settings.getDebug()!=null ? settings.getDebug() : true;
            /** Wheter or not the websocket should attempt to connect immediately upon instantiation. */
            this.automaticOpen = settings.getAutomaticOpen() !=null ? settings.getAutomaticOpen() : true;
            /** The number of milliseconds to delay before attempting to reconnect. */
            this.reconnectInterval = settings.getReconnectInterval()!=null ? settings.getReconnectInterval() : 1000;
            /** The maximum number of milliseconds to delay a reconnection attempt. Timeout to reconnect */
            this.maxReconnectionInterval = settings.getMaxReconnectionInterval()!=null ? settings.getMaxReconnectionInterval() : 10000;
            /** The rate of increase of the reconnect delay. Allows reconnect attemps to back off when problems persist. */
            this.reconnectDecay = settings.getReconnectDecay()!=null ? settings.getReconnectDecay() : (float) 1.3;
            /** The maximum time in milliseconds to wait for a connection to succeed before closing and retrying */
            this.timeoutInterval = settings.getTimeoutInterval()!=null ? settings.getTimeoutInterval() : 5000;
            /** The number of connection attempts to make before to stop. Unlimited if value is zero.
            this.maxConnectAttempts = settings.getMaxConnectAttempts()!=null ? settings.getMaxConnectAttempts() : 0;
            /** The binary type, possible values 'blob' or 'arraybuffer', default 'blob'. */
            this.binaryType = settings.getBinaryType()!=null ? settings.getBinaryType() : "blob";
            //settings.put("idStateEvenbusChannel", "false");
            //settings.put("idStateEvenbusChannel", "false");
            // These should be treated as read-only properties
            /** The URL as resolved by the constructor. This is always an absolute URL. Reas only. */
            this.path = path;
            /** The number of attempted reconnects since starting, or the last successful connection. Read only. */
            this.connectAttemptsCount = 0;
            * The current state of the connection.
            * Can be one of: WebSocket.CONNECTING, WebSocket.OPEN, WebSocket.CLOSING, WebSocket.CLOSED
            * Read only.
            this.readyState = WebSocketStates.CLOSED;
             * A string indicating the name of the sub-protocol the server selected; this will be one of
             * the strings specified in the protocols parameter when creating the WebSocket object.
             * Read only.
            // TODO 
             // "has a" rather than "is a" observable
            //observable = new ReconnectObservable();
            this.logicExternal = logicWsExternal;
            wsLogic = new LogicWs();
            //client =  ClientManager.createClient();//GLiszli lient by default
            // Java 7 cient. 
            client = ClientManager.createClient(JdkClientContainer.class.getName());
            // By default initialize the executors.
            executor = Executors.newScheduledThreadPool(1);
            executor2 = Executors.newScheduledThreadPool(1);
            openFlag = new AtomicBoolean(true);
            // Wheher or not to create a websocket upon instantiation
            if (this.automaticOpen) {
        public void open() {
            if (readyState == WebSocketStates.CONNECTING || readyState == WebSocketStates.OPEN ) {
            if (executor.isShutdown()) {
                executor = Executors.newScheduledThreadPool(1);
            if (executor2.isShutdown()) {
                 * Este poolthread se apaga cuando se manda a llamar la funcion close() de la API.
                 * El apagado se realiza porque se considera que ya no se va o volver a conectar.
                executor2 = Executors.newScheduledThreadPool(1);
             * Resetear variables
            AtomicInteger counter = new AtomicInteger(0);
            connectAttemptsCount = 0;
            readyState = WebSocketStates.CONNECTING;
            // Ejecutar funciones en metodo OnConnecting
            //String reconnectReason = e.getMessage();
            //self.update(new InternalMessageWs(WsEventType.ONCONNECTING,new OnConnectingEvent(reconnectReason)));
            update(new InternalMessageWs(WsEventType.ONCONNECTING,new OnConnectingEvent("First Connect")));
            Runnable openRun = () -> {
                    if (debug) {
                        System.out.println("DEBUG: ReconnectingWebSocket attempt-connect# "+(connectAttemptsCount+1)+" of "+(maxConnectAttempts==0?"infinite":maxConnectAttempts)+" URI="+path.getPath());
                    Callable<Session> task1 = new Callable<Session>() {
                        public Session call() throws Exception {
                            // Avizar al API que se esta intentando realizar una reconexión. Patron productor-consumidor.
                            // TODO Deberia de ser un hilo?? ESto ya que podria se r que algo en el metodo externo sea de tipo bloqueante.
                            Session session = client.connectToServer(wsLogic,path);
                            //self.session = client.connectToServer(wsLogic,self.path);
                            //System.out.println("ReconnectWebsocket:: client.connectToServer(...) is null:"+(session==null?"true":"false"));
                            //System.out.println("ReconnectWebsocket:: client.connectToServer(...) is open:"+session.isOpen());
                            return session;
                    Future<Session> future = executor.submit(task1);
                    try {
                        // Tiempo de espera antes de interrumpir volver a intentarlo.
                        //Session s = future.get(self.timeoutInterval,TimeUnit.MILLISECONDS);
                        //Session s = future.get(30,TimeUnit.SECONDS);
                        //return s;
                        //self.session = future.get(30,TimeUnit.SECONDS);
                        session = future.get(timeoutInterval,TimeUnit.MILLISECONDS);
                    } catch (InterruptedException | ExecutionException | TimeoutException e) {
                        // TODO Auto-generated catch block
                        //e.printStackTrace();          //For debug only
                        // Calculate Back off time.
                        float timeout = (float) (reconnectInterval * Math.pow(reconnectDecay,connectAttemptsCount));
                        if (maxConnectAttempts > 0 && ( connectAttemptsCount >= maxConnectAttempts )) {
                        int maxTimeReconnect = (int) (timeout > maxReconnectionInterval ? maxReconnectionInterval : timeout);
                        Callable<Boolean> timerReconnection = new Callable<Boolean>() {
                            public Boolean call() {
                                while(counter.get() >= 0) {
                                    System.out.println("Time next reconection: "+counter.get()+" seconds");
                                    System.out.println("ThreadId: "+Thread.currentThread().getId() );
                                    // Avizar a la API el tiempo para la sig. reconexión.. Patron productor-consumidor.
                                    // TODO Deberia de ser un hilo?? ESto ya que podria se r que algo en el metodo externo sea de tipo bloqueante.
                                    if (counter.get() == 0 && debug ) {
                                        System.out.println("DEBUG: ReconnectingWebSocket connection-timeout: "+path.getPath());
                                    try {
                                    } catch (InterruptedException e) {
                                        // TODO Auto-generated catch block
                                return false;
                         *          scheduleAtFixedRate(task, initialDelay, period, TimeUnit.SECONDS)
                         * 0 -> initialDelay,  this time is includen in the backoff algorithm calc, then this value is zero.
                         * timeout -> the delay between the termination of one execution and the commencement of the next
                        timerReconnectionFuture = executor.submit(timerReconnection);
                        try {
                            Boolean delayTime = timerReconnectionFuture.get();
                        } catch (InterruptedException | ExecutionException e1) {
                            // TODO Auto-generated catch block
                }while (!Thread.currentThread().isInterrupted()); //END while
                //while (openFlag.get()); //END while   
            };// END Runnable
            openFuture = executor2.submit(openRun);
        public void send(String str) {
            // Convertir el texto a objeto Message para enviarlo.
            if (readyState ==  WebSocketStates.OPEN) {
                if (debug) {
                    LOG.debug("Sending to URL:\"{}\", Data:\n\"{}\" ",path.getPath(),JsonWriter.formatJson(str));
                    //System.out.println("DEBUG: ReconnectingWebSocket sending to "+path.getPath()+": "+str);
                try {
                } catch (IOException e) {
                    LOG.error("Sending to URL:\"{}\", Data:\"{}\" {}",path.getPath(),str,e);
         * @param j objeto Json a enviar por Websocket.
         * @throws EncodeException 
         * @throws IOException 
        public void send(Json j) {
            //System.out.println("ReconnectWebsocket:: send(Json INI)");
            send(new Message(j));
            //System.out.println("ReconnectWebsocket:: send(Json END)");
         * Transmits data to the server over Websocket connection.
         * @param data a text string, ArrayBuffer or Blob to send to the server.
         * @throws IOException 
         * @throws EncodeException 
        public void send(Message data) {
            //System.out.println("ReconnectWebsocket:: send(Message INI)");
            if (readyState ==  WebSocketStates.OPEN) {
                if (debug) {
                    System.out.println("DEBUG: ReconnectingWebSocket send "+path.getPath()+": "+data);
                //System.out.println("ReconnectWebSocket::send(Message msg - Before)" );
                //System.out.println("ReconnectWebSocket::send(Message msg - Before - session is null="+(session==null?"true":"false" ));
                //System.out.println("ReconnectWebSocket::send(Message msg - Before - session is open="+session.isOpen());
                try {
                    LOG.debug("Sending to URL:\"{}\", Data:\"{}\" ",path.getPath(),data);
                    //System.out.println("REconnectWebSocket::send(Message msg -  After)" );
                } catch (IOException | EncodeException e) {
                    LOG.error("Sending to URL:\"{}\", Data:\"{}\" {}",path.getPath(),data,e);
            // Deberia de detenerse la reconeccion en estos casos?, es decir detener despues de intentar enviar una
            //  cadena de texto pero que ha fallado.
            //System.out.println("ReconnectWebsocket:: send(Message END)");
        public void close(String reason) {
        public void close() {
        public void close(CloseReason.CloseCodes code) {
        /** Closes the Websocket connection or connection attempt, if any.
         *  If the connection is already CLOSED, this method does nothing.
        public void close(CloseReason.CloseCodes code, String reason) {
            if (readyState == WebSocketStates.CLOSED) {
            CloseReason closeReason;
            forcedClose = true;
             * Status code:     1000
             * Name:            CLOSE_NORMAL
             * Description:     The connection successfully completed whatever purpose for
             *                   which it was created.
            // Default CLOSE_NORMAL code
            if (code==null) {
                closeReason = new CloseReason(CloseReason.CloseCodes.NORMAL_CLOSURE,"ReconnectingWebSocket STOP");
            } else if (reason!=null) {
                closeReason = new CloseReason(code,reason);
            } else {
                closeReason = new CloseReason(code,"ReconnectingWebSocket STOP");
            if ( (readyState == WebSocketStates.OPEN || readyState == WebSocketStates.CONNECTING) ) {
                // Change readyState status:
                readyState = WebSocketStates.CLOSED;
                if (session==null) {
                     * readyState == WebSocketStates.CONNECTING && session == null
                     * This ocurr when the server is off and the client is in a loop trying to connect.
                } else {
                     * readyState == WebSocketStates.OPEN && session != null
                     *  or
                     * readyState == WebSocketStates.CONNECTING && session != null
                    try { // Permanent close. Called via the Close method.
                        if (session.isOpen()) {
                             * Session is previously closed when has connected at less one time, after the server shutdown
                             * and the reconnection beging. During the reconnection if you try to close (forced close)
                             * then session.close will thorwn a error.
                             * To fix we have verificate if the session is closed.
                    } catch (IOException e) {
                        // TODO Auto-generated catch block
                        LOG.error("Error cerrando sesion. {}",e);
         * Additional public API method to refresh the connection if still is open.
         * After close, the websocket will try to reconnect.
         * For example, if the app suspects bad data / missed heart beats, it can try to refresh.
        public void refresh() {
            if (readyState == WebSocketStates.OPEN) {
                try {
                    session.close(new CloseReason(CloseCodes.SERVICE_RESTART, "refresh!!"));
                } catch (IOException e) {
                    LOG.error("Error cerrando sesion. {}",e);
            } else {
                // Stop timer of reconnection.
                if (readyState == WebSocketStates.CONNECTING) {
                    close(CloseCodes.SERVICE_RESTART, "refresh!!");
                    // Reset variables.
                    connectAttemptsCount = 0;
        public Session getSession(){
            return session;
        public WebSocketStates getReadyState() {
            return readyState;
         * El observador de los cambios en el clente Websocket interno.
        public void update(InternalMessageWs msg) {
            switch (msg.getType()) {
            case ONOPEN:
                // Cambiar estado de la conexión
                readyState = WebSocketStates.OPEN;
                if (debug ) {
                    System.out.println("DEBUG: ReconnectingWebSocket onOpen: "+path.getPath());
                // Ejecutar las funciones onOPen que el usuario ha definido.
                logicExternal.onOpen( ((OnOpenEvent)msg.getEvent()).getSession() );
            case ONMESSAGE:
                if (debug) {
                    System.out.println("DEBUG: ReconnectingWebSocket onMessage: "+path.getPath());
                OnMessageEvent evtMsg = (OnMessageEvent)msg.getEvent();
                // Ejecutar las funciones OnMessage que el usuario ha definido.
            case ONCLOSE:
                if (debug ) {
                    System.out.println("DEBUG: ReconnectingWebSocket onClose: "+path.getPath()+" forcedClose="+forcedClose);
                // Cambiar estado de la conexión
                readyState = WebSocketStates.CLOSED;
                OnCloseEvent evtClose = (OnCloseEvent)msg.getEvent();
                // Ejecutar las funciones OnClose que el usuario ha definido.
                 * Determinar si se debe vlver a conectar o no.
                 * Si forcedClose = true, entonces detener.
                 * Si forcedClose = false, entonces reconectar.
                if (!forcedClose) {
                    if (debug ) {
                        System.out.println("DEBUG: ReconnectingWebSocket STOP Reconnectiing: "+path.getPath());
                    forcedClose = false;
            case ONERROR:
                if (debug ) {
                    System.out.println("DEBUG: ReconnectingWebSocket onError: "+path.getPath());
                // Cambiar estado de la conexión
                readyState = WebSocketStates.CLOSED;
                OnErrorEvent evtError = (OnErrorEvent)msg.getEvent();
                // Ejecutar las funciones OnError que el usuario ha definido.
                // Volver a iniciar secuencia de conectar.
                // Algunos prfieren cerrar la conexion.
            case ONCONNECTING:
                if (debug ) {
                    System.out.println("DEBUG: ReconnectingWebSocket onConnecting: "+path.getPath());
                OnConnectingEvent evtConnecting = (OnConnectingEvent)msg.getEvent();
                // Ejecutar las funciones OnConnecting que el usuario ha definido.
        public void watcherReconnectionTry() {
        public void watcherTimeLeft(int timeLeft) {

    One interface:

    public interface ReconnectObserver {
        public void update(InternalMessageWs msg);
        public void watcherReconnectionTry();
        public void watcherTimeLeft(int timeLeft);

    ReconnectObservable class:

    import java.util.ArrayList;
    public class ReconnectObservable implements ReconnectSubject {
        private ArrayList<ReconnectObserver> observers;
        public ReconnectObservable() {
            observers = new ArrayList<ReconnectObserver>();
        public void addObserver(ReconnectObserver observer) {
        public void notifyObservers(InternalMessageWs msg) {
            for(ReconnectObserver observer : observers) {

    ReconnectSubject interface:

    public interface ReconnectSubject {
        public void addObserver(ReconnectObserver observer);
        //public void notifyObservers();
        public void notifyObservers(InternalMessageWs msg);

    InternalMessageWs class:

    import javax.websocket.CloseReason;
    import javax.websocket.Session;
    public class InternalMessageWs {
        WsEventType type;
        Object event;
        InternalMessageWs(WsEventType type) {
            this.type = type;
            this.event = null;
        InternalMessageWs(WsEventType type, Object event) {
            this.type = type;
        public WsEventType getType() {
            return type;
        public void setType(WsEventType type) {
            this.type = type;
        public Object getEvent() {
            return event;
        public void setEvent(Object event) {
            this.event = event;
    class OnOpenEvent {
        Session session;
        public OnOpenEvent(Session session) {
            this.session = session;
        public Session getSession() {
            return session;
        public void setSession(Session session) {
            this.session = session;
    class OnMessageEvent {
        Message message;
        Session session;
        public OnMessageEvent(Session session, Message message) {
            this.message = message;
            this.session = session;
        public Message getMessage() {
            return message;
        public void setMessage(Message message) {
            this.message = message;
        public Session getSession() {
            return session;
        public void setSession(Session session) {
            this.session = session;
    class OnCloseEvent {
        Session session;
        CloseReason reason;
        public OnCloseEvent(Session session, CloseReason reason) {
            this.session = session;
            this.reason = reason;
        public Session getSession() {
            return session;
        public void setSession(Session session) {
            this.session = session;
        public CloseReason getReason() {
            return reason;
        public void setReason(CloseReason reason) {
            this.reason = reason;
    class OnErrorEvent {
        Session session;
        Throwable t;
        public OnErrorEvent(Session session,Throwable t) {
            this.t = t;
        public Throwable getT() {
            return t;
        public void setT(Throwable t) {
            this.t = t;
        public Session getSession() {
            return session;
        public void setSession(Session session) {
            this.session = session;
    class OnConnectingEvent {
        String reason;
        public OnConnectingEvent(String reason) {
            this.reason = reason;
        public String getReason() {
            return reason;
        public void setReason(String reason) {
            this.reason = reason;
    enum WsEventType {
        ONCONNECTING            // Using in Reconnecting state of the Websocket client.

    You need use Java 8 JDK because im usign callables, etc.