Search code examples
javacometd

Flooding of message at side client from server channel and wrong message with CometD frame work


I am developing a client-server application, where I wanted to have a persistent connection between client-server, and I chose the CometD framework for the same. I successfully created the CometD application.

Client -

import java.util.Arrays;
import java.util.HashMap;
import java.util.List;
import java.util.Map;

import org.cometd.bayeux.Channel;
import org.cometd.bayeux.Message;
import org.cometd.bayeux.client.ClientSessionChannel;
import org.cometd.client.BayeuxClient;
import org.cometd.client.transport.LongPollingTransport;
import org.eclipse.jetty.client.HttpClient;
import org.eclipse.jetty.util.ssl.SslContextFactory;

import com.synacor.idm.auth.LdapAuthenticator;
import com.synacor.idm.resources.LdapResource;

public class CometDClient {
    private volatile BayeuxClient client;
    private final AuthListner authListner = new AuthListner();
    private LdapResource ldapResource;
public static void main(String[] args) throws Exception {

    org.eclipse.jetty.util.log.Log.getProperties().setProperty("org.eclipse.jetty.LEVEL", "ERROR");
    org.eclipse.jetty.util.log.Log.getProperties().setProperty("org.eclipse.jetty.util.log.announce", "false");
    org.eclipse.jetty.util.log.Log.getRootLogger().setDebugEnabled(false);
    CometDClient client = new CometDClient();
client.run();
}

public void run()  {
    String url = "http://localhost:1010/cometd";
    HttpClient httpClient = new HttpClient();

    try {
        httpClient.start();
    } catch (Exception e) {
        // TODO Auto-generated catch block
        e.printStackTrace();
    }
    
    client = new BayeuxClient(url, new LongPollingTransport(null, httpClient));
    client.getChannel(Channel.META_HANDSHAKE).addListener(new InitializerListener());
    client.getChannel(Channel.META_CONNECT).addListener(new ConnectionListener());
    client.getChannel("/ldapAuth").addListener(new AuthListner());
    
    
    
    client.handshake();
    boolean success = client.waitFor(1000, BayeuxClient.State.CONNECTED);
    if (!success) {
        System.err.printf("Could not handshake with server at %s%n", url);
        return;
    }

}

private void initialize() {
    client.batch(() -> {

        
        ClientSessionChannel authChannel = client.getChannel("/ldapAuth");
        authChannel.subscribe(authListner);

    });
}

private class InitializerListener implements ClientSessionChannel.MessageListener {
    @Override
    public void onMessage(ClientSessionChannel channel, Message message) {
        if (message.isSuccessful()) {
            initialize();
        }
    }
}

private class ConnectionListener implements ClientSessionChannel.MessageListener {
    private boolean wasConnected;
    private boolean connected;

    @Override
    public void onMessage(ClientSessionChannel channel, Message message) {
        if (client.isDisconnected()) {
            connected = false;
            connectionClosed();
            return;
        }

        wasConnected = connected;
        connected = message.isSuccessful();
        if (!wasConnected && connected) {
            connectionEstablished();
        } else if (wasConnected && !connected) {
            connectionBroken();
        }
    }
}
private void connectionEstablished() {
    System.err.printf("system: Connection to Server Opened%n");
}

private void connectionClosed() {
    System.err.printf("system: Connection to Server Closed%n");
}

private void connectionBroken() {
    System.err.printf("system: Connection to Server Broken%n");
}


private class AuthListner implements ClientSessionChannel.MessageListener{

    @Override
    public void onMessage(ClientSessionChannel channel, Message message) {
        Object data2 = message.getData();
        System.err.println("Authentication String     " + data2 );
        if(data2 != null && data2.toString().indexOf("=")>0) {
        String[] split = data2.toString().split(",");
        String userString = split[0];
        String passString = split[1];
        String[] splitUser = userString.split("=");
        String[] splitPass = passString.split("=");
        LdapAuthenticator authenticator = new LdapAuthenticator(ldapResource);
        if(authenticator.authenticateToLdap(splitUser[1], splitPass[1])) {
//          client.getChannel("/ldapAuth").publish("200:success from client "+user);
//          channel.publish("200:Success "+user);
            Map<String, Object> data = new HashMap<>();
            // Fill in the structure, for example:
            data.put(splitUser[1], "Authenticated");
            channel.publish(data, publishReply -> {
                if (publishReply.isSuccessful()) {
                    System.out.print("message sent successfully on server");
                }
            });
        }
        }
        
    }
    
}

}

Server - Service Class

import java.util.List;
import java.util.concurrent.BlockingQueue;

import org.cometd.bayeux.MarkedReference;
import org.cometd.bayeux.Promise;
import org.cometd.bayeux.server.BayeuxServer;
import org.cometd.bayeux.server.ConfigurableServerChannel;
import org.cometd.bayeux.server.ServerChannel;
import org.cometd.bayeux.server.ServerMessage;
import org.cometd.bayeux.server.ServerSession;
import org.cometd.server.AbstractService;
import org.cometd.server.ServerMessageImpl;

import com.synacor.idm.resources.AuthenticationResource;
import com.synacor.idm.resources.AuthenticationResource.AuthC;


public class AuthenticationService extends AbstractService implements AuthenticationResource.Listener {

    String authParam;
    BayeuxServer bayeux;
    BlockingQueue<String> sharedResponseQueue;
    public AuthenticationService(BayeuxServer bayeux) {

        super(bayeux, "ldapagentauth");
        addService("/ldapAuth", "ldapAuthentication");  
        this.bayeux = bayeux;
    }
    public void ldapAuthentication(ServerSession session, ServerMessage message) {
        System.err.println("********* inside auth service ***********");
        Object data = message.getData();
        System.err.println("****** got data back from client " +data.toString());
        sharedResponseQueue.add(data.toString());
    }
    @Override
    public void onUpdates(List<AuthC> updates) {
        System.err.println("********* inside auth service listner ***********");

        MarkedReference<ServerChannel> createChannelIfAbsent = bayeux.createChannelIfAbsent("/ldapAuth", new ConfigurableServerChannel.Initializer() {
            public void configureChannel(ConfigurableServerChannel channel)
            {
                channel.setPersistent(true);
                channel.setLazy(true);
            }
        });
        ServerChannel reference = createChannelIfAbsent.getReference();
        for (AuthC authC : updates) {

            authParam = authC.getAuthStr();
            this.sharedResponseQueue= authC.getsharedResponseQueue();
            ServerChannel channel = bayeux.getChannel("/ldapAuth");
            ServerMessageImpl serverMessageImpl = new ServerMessageImpl();
            serverMessageImpl.setData(authParam);

            reference.setBroadcastToPublisher(false);
            reference.publish(getServerSession(), authParam, Promise.noop());
        }

    }


}

Event trigger class-

public class AuthenticationResource implements Runnable{
      private final JerseyClientBuilder clientBuilder;
      private final BlockingQueue<String> sharedQueue; 
      private final BlockingQueue<String> sharedResponseQueue;
      private boolean isAuthCall = false; 
      private String userAuth;
        private final List<Listener> listeners = new CopyOnWriteArrayList<Listener>();
        Thread runner;

    public AuthenticationResource(JerseyClientBuilder clientBuilder,BlockingQueue<String> sharedQueue,BlockingQueue<String> sharedResponseQueue) {
        super();
        this.clientBuilder = clientBuilder;
        this.sharedQueue = sharedQueue;
        this.sharedResponseQueue= sharedResponseQueue;
          this.runner = new Thread(this);
            this.runner.start();
    }
  public List<Listener> getListeners()
  {
      return listeners;
  }
  

    @Override
    public void run() {
      List<AuthC> updates = new ArrayList<AuthC>();

//      boolean is =  true;
      while(true){
        if(sharedQueue.size()<=0) {
            continue;
        }
          try {
             userAuth  = sharedQueue.take();
             // Notify the listeners
             for (Listener listener : listeners)
               
             {
               updates.add(new AuthC(userAuth,sharedResponseQueue));
                 listener.onUpdates(updates);
             }
             updates.add(new AuthC(userAuth,sharedResponseQueue));
                  System.out.println("****** Auth consume ******** " +  userAuth);

             if(userAuth != null) {
               isAuthCall = true;
             }

          } catch (Exception err) {
             err.printStackTrace();
          break;
          }
//          if (sharedQueue.size()>0) {
//              is = false;
//          }
          
      } 

    }
    
  public static class AuthC
  {
      private final String authStr;
      private final BlockingQueue<String> sharedResponseQueue;

      public AuthC(String authStr,BlockingQueue<String> sharedResponseQueue)
      {
          this.authStr = authStr;
          this.sharedResponseQueue=sharedResponseQueue;

      }


      public String getAuthStr()
      {
          return authStr;
      }

      public BlockingQueue<String> getsharedResponseQueue()
      {
          return sharedResponseQueue;
      }

  }
    
  public interface Listener extends EventListener
  {
      void onUpdates(List<AuthC> updates);
  }

}

I have successfully established a connection between client and server. Problems -

1- When I am sending a message from the server to the Client, the same message is sent out multiple times. I only expecting one request-response mechanism. In my case- server is sending user credentila I am expecting result, whether the user is authenticated or not.

you can see in image how it is flooding with same string at client side -

enter image description here

2- There was other problem looping up of message between client and server, that I can be able to resolve by adding, but still some time looping of message is happening.

serverChannel.setBroadcastToPublisher(false);

3- If I change the auth string on sever, at client side it appears to be old one. For example -

  • 1 request from server - auth string -> user=foo,pass=bar -> at client side - user=foo,pass=bar
  • 2 request from server - auth string user=myuser,pass=mypass -> at client side - user=foo,pass=bar

this are the three problems, please guide me and help me to resolve this.


Solution

  • CometD offer a request/response style of messaging using remote calls, both on the client and on the server (you want to use annotated services on the server).

    Channel /ldapAuth has 2 subscribers: the remote client (which subscribes with authChannel.subscribe(...)), and the server-side AuthenticationService (which subscribes with addService("/ldapAuth", "ldapAuthentication")).

    Therefore, every time you publish to that channel from AuthenticationService.onUpdates(...), you publish to the remote client, and then back to AuthenticationService, and that is why calling setBroadcastToPublisher(false) helps.

    For authentication messages, it's probably best that you stick with remote calls, because they have a natural request/response semantic, rather than a broadcasting semantic. Please read about how applications should interact with CometD.

    About other looping, there are no loops triggered by CometD. You have loops in your application (in AuthenticationService.onUpdates(...)) and you take from a queue that may have the same information multiple times (in AuthenticationResource.run() -- which by the way it's a spin loop that will likely spin a CPU core to 100% utilization -- you should fix that).

    The fact that you see stale data it's likely not a CometD issue, since CometD does not store messages anywhere so it cannot make up user-specific data.

    I recommend that you clean up your code using remote calls and annotated services. Also, clean up your own code from spin loops.

    If you still have the problem after the suggestions above, look harder for application mistakes, it's unlikely that this is a CometD issue.