Atmosphere: Multiple subscriptions over single HttpConnection


I'm using Atmosphere in my Spring MVC app to facilitate push, using a streaming transport.

Throughout the lifecycle of my app, the client will subscribe and unsubscribe for many different topics.

Atmosphere seems to use one HTTP connection per subscription, i.e., every call to $.atmosphere.subscribe(request) creates a new connection. This quickly exhausts the number of connections the browser allows to the Atmosphere server.

Instead of creating a new resource each time, I'd like to be able to add the AtmosphereResource to broadcasters, and remove it from them, after its initial creation.

However, as the AtmosphereResource is a one-to-one representation of the inbound request, each time the client sends a request to the server it arrives on a new AtmosphereResource. That leaves me with no way to reference the original resource and add it to the topic's Broadcaster.

I've tried using both $.atmosphere.subscribe(request) and calling atmosphereResource.push(request) on the resource returned from the original subscribe() call. However, this made no difference.

What is the correct way to approach this?


Solution

  • Here's how I got it working:

    First, when the client makes its initial connection, expose the Atmosphere-specific headers to the browser (via Access-Control-Expose-Headers) before calling suspend():

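    @RequestMapping("/subscribe")
    public ResponseEntity<HttpStatus> connect(AtmosphereResource resource)
    {
        // Let client-side script read the Atmosphere headers, including the tracking ID
        resource.getResponse().setHeader("Access-Control-Expose-Headers", ATMOSPHERE_TRACKING_ID + "," + X_CACHE_DATE);
        resource.suspend();
        // Assumption: getOkResponse() is the same helper used by addSubscription()
        return getOkResponse();
    }
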

    Then, when the client sends additional subscribe requests, each one arrives on a different resource but still carries the ATMOSPHERE_TRACKING_ID of the original resource. This allows you to look the original up via the resourceFactory:

    @RequestMapping(value="/subscribe", method=RequestMethod.POST)
    public ResponseEntity<HttpStatus> addSubscription(AtmosphereResource resource, @RequestParam("topic") String topic)
    {
        // The tracking ID identifies the original, long-lived connection
        String atmosphereId = resource.getResponse().getHeader(ATMOSPHERE_TRACKING_ID);
        if (atmosphereId == null || atmosphereId.isEmpty())
        {
            log.error("Cannot add subscription, as the atmosphere tracking ID was not found");
            return new ResponseEntity<HttpStatus>(HttpStatus.BAD_REQUEST);
        }
        // Look up the suspended resource created by the initial connect
        AtmosphereResource originalResource = resourceFactory.find(atmosphereId);
        if (originalResource == null)
        {
            log.error("The provided Atmosphere tracking ID is not associated with a known resource");
            return new ResponseEntity<HttpStatus>(HttpStatus.BAD_REQUEST);
        }

        // Create the topic's broadcaster on demand and attach the original resource to it
        Broadcaster broadcaster = broadcasterFactory.lookup(topic, true);
        broadcaster.addAtmosphereResource(originalResource);
        log.info("Added subscription to {} for atmosphere resource {}", topic, atmosphereId);

        return getOkResponse();
    }
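
    The same lookup should work in the other direction for unsubscribing. As a rough illustration of the idea only, here is a plain-Java model of the pattern: one long-lived connection per tracking ID, attached to and detached from any number of topics. Nothing in it is Atmosphere API; SubscriptionRegistry, Connection and all method names are hypothetical stand-ins for the resourceFactory, the suspended AtmosphereResource and the per-topic Broadcaster.

```java
import java.util.Collections;
import java.util.List;
import java.util.Map;
import java.util.Set;
import java.util.concurrent.ConcurrentHashMap;
import java.util.concurrent.CopyOnWriteArrayList;

/** Hypothetical model of "one connection, many topics"; not an Atmosphere class. */
public class SubscriptionRegistry {

    /** Stands in for the suspended resource created by the initial connect. */
    public static final class Connection {
        private final String trackingId;
        private final List<String> received = new CopyOnWriteArrayList<>();

        Connection(String trackingId) {
            this.trackingId = trackingId;
        }

        public String trackingId() {
            return trackingId;
        }

        /** Messages pushed down this connection so far. */
        public List<String> received() {
            return received;
        }
    }

    // tracking ID -> long-lived connection (the role of the resource factory)
    private final Map<String, Connection> connections = new ConcurrentHashMap<>();
    // topic -> tracking IDs subscribed to it (the role of one broadcaster per topic)
    private final Map<String, Set<String>> topics = new ConcurrentHashMap<>();

    /** Initial connect: register the single long-lived connection. */
    public Connection connect(String trackingId) {
        return connections.computeIfAbsent(trackingId, Connection::new);
    }

    /** Later request: attach the ORIGINAL connection, found by tracking ID, to a topic. */
    public boolean subscribe(String trackingId, String topic) {
        if (trackingId == null || !connections.containsKey(trackingId)) {
            return false; // unknown tracking ID, comparable to the BAD_REQUEST branches
        }
        topics.computeIfAbsent(topic, t -> ConcurrentHashMap.newKeySet()).add(trackingId);
        return true;
    }

    /** Detach the original connection from a topic without closing it. */
    public boolean unsubscribe(String trackingId, String topic) {
        Set<String> subscribers = topics.get(topic);
        return subscribers != null && subscribers.remove(trackingId);
    }

    /** Deliver a message to every connection on the topic; returns the delivery count. */
    public int broadcast(String topic, String message) {
        int delivered = 0;
        for (String id : topics.getOrDefault(topic, Collections.<String>emptySet())) {
            Connection connection = connections.get(id);
            if (connection != null) {
                connection.received.add(message);
                delivered++;
            }
        }
        return delivered;
    }

    public static void main(String[] args) {
        SubscriptionRegistry registry = new SubscriptionRegistry();
        Connection connection = registry.connect("abc");
        registry.subscribe("abc", "news");
        registry.subscribe("abc", "sports");
        registry.broadcast("news", "first");
        registry.unsubscribe("abc", "news");
        registry.broadcast("news", "second"); // no longer delivered
        registry.broadcast("sports", "third");
        System.out.println(connection.received());
    }
}
```

    In the real handler, the unsubscribe step would presumably be the mirror image of addSubscription(): find the original resource by its tracking ID, look up the topic's Broadcaster, and call removeAtmosphereResource(originalResource) on it. I haven't shown that against the Atmosphere API here, so treat it as an untested assumption.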