Tags: java, spring, redis, long-polling, spring-data-redis

Using Redis Stream to Block HTTP response via HTTP long polling in Spring Boot App


I have a Spring Boot web application that can update an entity called StudioLinking. This entity describes a temporary, mutable, logical link between two IoT devices for which my web app is the cloud service. The links between these devices are ephemeral, but the StudioLinking entity persists in the database for reporting purposes. StudioLinking is stored in a SQL datastore in the conventional way using Spring Data/Hibernate. From time to time a StudioLinking is updated with new information through a REST API. When a link is updated, the devices need to respond (change colors, volume, etc.). Right now this is handled by polling every 5 seconds, but that creates lag between a human user entering an update and the IoT devices actually applying it: anywhere from a millisecond up to 5 seconds. Polling more frequently is clearly unsustainable, and the vast majority of the time there are no updates at all!

So I am trying to develop another REST API in this same application, using HTTP long polling, which returns when a given StudioLinking entity is updated or after a timeout. The listeners do not support WebSocket or similar, which leaves me with long polling. Long polling has a race condition to account for: with consecutive messages, one may be "lost" if it arrives between HTTP requests (while the connection is closing and reopening, a new update might come in and go unnoticed if I used Pub/Sub).
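One common way to close that gap, independent of Redis, is to number every update and have the client send the last version it saw, so an update that lands between two requests is still visible on the next one. The sketch below uses only the JDK. `VersionedSlot`, `publish`, and `awaitNewerThan` are hypothetical names, and the in-memory slot merely stands in for whatever store actually holds the updates.

```java
import java.util.Optional;
import java.util.concurrent.TimeUnit;
import java.util.concurrent.locks.Condition;
import java.util.concurrent.locks.ReentrantLock;

/** Latest value plus a version counter (hypothetical helper, not a Spring or Redis API). */
final class VersionedSlot<T> {

    /** Immutable pair of a version number and the value published under it. */
    static final class Snapshot<V> {
        final long version;
        final V value;

        Snapshot(long version, V value) {
            this.version = version;
            this.value = value;
        }
    }

    private final ReentrantLock lock = new ReentrantLock();
    private final Condition changed = lock.newCondition();
    private long version = 0L;
    private T value;

    /** Stores a new value, bumps the version, and wakes every waiting poller. */
    long publish(T newValue) {
        lock.lock();
        try {
            value = newValue;
            version++;
            changed.signalAll();
            return version;
        } finally {
            lock.unlock();
        }
    }

    /**
     * Returns the latest snapshot as soon as its version exceeds lastSeen,
     * or Optional.empty() if nothing newer arrives before the timeout.
     */
    Optional<Snapshot<T>> awaitNewerThan(long lastSeen, long timeout, TimeUnit unit) {
        long nanosLeft = unit.toNanos(timeout);
        lock.lock();
        try {
            // The version check happens before waiting, so an update published
            // while no request was open is returned immediately.
            while (version <= lastSeen) {
                if (nanosLeft <= 0L) {
                    return Optional.empty();
                }
                nanosLeft = changed.awaitNanos(nanosLeft);
            }
            return Optional.of(new Snapshot<>(version, value));
        } catch (InterruptedException e) {
            Thread.currentThread().interrupt(); // preserve the interrupt for the caller
            return Optional.empty();
        } finally {
            lock.unlock();
        }
    }
}
```

Because the comparison is against a version rather than against "did a notification fire while I was listening", a client that reconnects late still receives the newest state, and it only ever receives the newest one.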

It is important to note that this "subscribe to updates" API should only ever return the LATEST and CURRENT version of the StudioLinking, and should only do so when there is an actual update or when an update has happened since the last check-in. The "subscribe to updates" client will initially POST an API request to set up a new listening session and then pass that session along on each request so the server knows who it is. This matters because multiple devices may need to monitor updates to the same StudioLinking entity. I believe I can accomplish this by using separately named consumers in the Redis XREADGROUP (the consumer-group form of XREAD). (Keep this in mind for later in the question.)
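The session idea can be sketched without any Redis at all: the server remembers, per session token, which version it last handed out, so each device gets the newest state exactly once per change. This is only an illustration under assumptions. `LinkUpdateSessions` and its methods are hypothetical, a `String` stands in for the StudioLinking, and a new session is assumed to start at the current version (so it only sees later updates).

```java
import java.util.HashMap;
import java.util.Map;
import java.util.Optional;
import java.util.UUID;

/** Tracks, per listening session, the last version delivered (hypothetical helper). */
final class LinkUpdateSessions {
    private long version = 0L;
    private String latestState; // stand-in for the current StudioLinking
    private final Map<String, Long> lastDelivered = new HashMap<>();

    /** Registers a new listener and returns its token; it starts at the current version. */
    synchronized String openSession() {
        String sessionId = UUID.randomUUID().toString();
        lastDelivered.put(sessionId, version);
        return sessionId;
    }

    /** Records a new state; intermediate states are overwritten, only the latest is kept. */
    synchronized void update(String newState) {
        latestState = newState;
        version++;
    }

    /** Returns the latest state if this session has not received it yet, else empty. */
    synchronized Optional<String> pollIfChanged(String sessionId) {
        Long seen = lastDelivered.get(sessionId);
        if (seen == null) {
            throw new IllegalArgumentException("unknown session: " + sessionId);
        }
        if (seen == version) {
            return Optional.empty();
        }
        lastDelivered.put(sessionId, version);
        return Optional.of(latestState);
    }

    /** Forgets a session, e.g. when the link is torn down. */
    synchronized void closeSession(String sessionId) {
        lastDelivered.remove(sessionId);
    }
}
```

Two devices that open separate sessions on the same link each see an update independently, and a device that misses several updates in a row still receives only the most recent state.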

After hours of research I believe the way to accomplish this is using Redis Streams.

I have found these two links regarding Redis Streams in Spring Data Redis:

https://www.vinsguru.com/redis-reactive-stream-real-time-producing-consuming-streams-with-spring-boot/
https://medium.com/@amitptl.in/redis-stream-in-action-using-java-and-spring-data-redis-a73257f9a281

I have also read this article about long polling. Its examples just use a sleep timer during the long poll, which is fine for demonstration purposes, but obviously I want to do something useful.

https://www.baeldung.com/spring-deferred-result

These links were very helpful. Right now I have no problem figuring out how to publish the updates to the Redis Stream (this is untested "pseudo-code", but I don't anticipate any issues implementing it):

// In my StudioLinking Entity

@PostUpdate
public void postToRedis() {
    StudioLinking link = this;
    ObjectRecord<String, StudioLinking> record = StreamRecords.newRecord()
            .ofObject(link)
            .withStreamKey(streamKey); //I am creating a stream for each individual linking probably?
    this.redisTemplate
            .opsForStream()
            .add(record)
            .subscribe(System.out::println);
    atomicInteger.incrementAndGet();
}

But I fall flat when it comes to subscribing to said stream. Here is basically what I want to do. Please excuse the butchered pseudocode; it is for idea purposes only. I am well aware that the code is in no way indicative of how the language and framework actually behave :)

// linkId identifies the StudioLinking that the requester wants to monitor
// updatesId is a unique token used to track individual consumers in Redis
@GetMapping("/subscribe-to-updates/{linkId}/{updatesId}")
public DeferredResult<ResponseEntity<?>> subscribeToUpdates(@PathVariable("linkId") Integer linkId, @PathVariable("updatesId") Integer updatesId) {
    LOG.info("Received async-deferredresult request");
    DeferredResult<ResponseEntity<?>> output = new DeferredResult<>(5000L);

    // If nothing arrives within 5 seconds, answer with 408 so the client polls again
    output.onTimeout(() ->
      output.setErrorResult(
        ResponseEntity.status(HttpStatus.REQUEST_TIMEOUT)
          .body("IT WAS NOT UPDATED!")));

    ForkJoinPool.commonPool().submit(() -> {
        //----------------------------------------------
        // Made up stuff... here is where I want to subscribe to a stream and block!
        //----------------------------------------------
        LOG.info("Processing in separate thread");
        // Subscribe to Redis Stream, get any updates that happened between long-polls
        // then block until/if a new message comes over the stream
        var subscription = listenerContainer.receiveAutoAck(
            Consumer.from(String.valueOf(linkId), String.valueOf(updatesId)),
            StreamOffset.create(String.valueOf(linkId), ReadOffset.lastConsumed()),
            streamListener);
        listenerContainer.start();
        output.setResult(ResponseEntity.ok("IT WAS UPDATED!"));
    });

    LOG.info("servlet thread freed");
    return output;
}
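The "park the request until something happens or a timeout fires" part of the pseudocode above can be modelled with the JDK alone, which may make the moving parts easier to see before wiring in Spring or Redis. In this sketch `UpdateWaiters`, `waitForUpdate`, and `notifyUpdate` are hypothetical names, a `CompletableFuture` plays the role of the `DeferredResult`, and `completeOnTimeout` requires Java 9 or later.

```java
import java.util.Collections;
import java.util.List;
import java.util.Map;
import java.util.concurrent.CompletableFuture;
import java.util.concurrent.ConcurrentHashMap;
import java.util.concurrent.CopyOnWriteArrayList;
import java.util.concurrent.TimeUnit;

/** Registry of parked long-poll requests, keyed by link id (hypothetical helper). */
final class UpdateWaiters {
    static final String NOT_UPDATED = "IT WAS NOT UPDATED!";

    private final Map<Integer, List<CompletableFuture<String>>> waiters =
            new ConcurrentHashMap<>();

    /** Parks a request: the future completes on the next update or with NOT_UPDATED on timeout. */
    CompletableFuture<String> waitForUpdate(int linkId, long timeout, TimeUnit unit) {
        CompletableFuture<String> future = new CompletableFuture<>();
        List<CompletableFuture<String>> parked =
                waiters.computeIfAbsent(linkId, id -> new CopyOnWriteArrayList<>());
        parked.add(future);
        // Deregister once the future is done, whether by update or by timeout.
        future.whenComplete((result, error) -> parked.remove(future));
        return future.completeOnTimeout(NOT_UPDATED, timeout, unit);
    }

    /** Wakes every request currently parked on the link and returns how many were woken. */
    int notifyUpdate(int linkId, String payload) {
        int woken = 0;
        List<CompletableFuture<String>> parked =
                waiters.getOrDefault(linkId, Collections.emptyList());
        for (CompletableFuture<String> future : parked) {
            if (future.complete(payload)) {
                woken++;
            }
        }
        return woken;
    }
}
```

Note that this only wakes requests that are parked at the moment of the update, which is exactly the lost-message gap described earlier in the question. A real handler would first check whether the requester has already missed an update before parking.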

So is there a good explanation of how I would go about this? I think the answer lies within https://docs.spring.io/spring-data/redis/docs/current/api/org/springframework/data/redis/core/ReactiveRedisTemplate.html but I am not a big enough Spring power user to really understand the terminology in the Javadoc (the Spring documentation is really good, but the Javadoc is written in dense technical language which I appreciate but don't quite understand yet).

There are two more hurdles to my implementation:

  1. My understanding of Spring is not at 100% yet. I haven't reached that a-ha moment where I fully understand why all these beans are floating around. I think this is the key to why I am not getting things here... The Redis configuration is floating around in the Spring ether and I am not grasping how to just call it. I really need to keep investigating this (it is a huge hurdle to Spring for me).
  2. These StudioLinking entities are short-lived, so I need to do some cleanup too. I will implement this later once I get the whole thing off the ground, but I do know it will be needed.

Solution

  • Why not use a simple blocking read? There is no need for the fancier parts of spring-data-redis. Just issue a blocking stream read with a 5-second timeout, so this call can take a little over 5 seconds (around 6) when nothing arrives. You can decrease or increase the blocking timeout.

    // Minimal response body; the getter lets Jackson serialize the flag
    class LinkStatus {
        private final boolean updated;

        LinkStatus(boolean updated) {
            this.updated = updated;
        }

        public boolean isUpdated() {
            return updated;
        }
    }

    // linkId identifies the StudioLinking that the requester wants to monitor
    // updatesId is a unique token to track individual consumers in Redis
    @GetMapping("/subscribe-to-updates/{linkId}/{updatesId}")
    public LinkStatus subscribeToUpdates(
            @PathVariable("linkId") Integer linkId, @PathVariable("updatesId") Integer updatesId) {
        StreamOperations<String, String, String> op = redisTemplate.opsForStream();

        // Placeholder names; the group must already exist on the stream (XGROUP CREATE)
        Consumer consumer = Consumer.from("test-group", "test-consumer");
        // Auto-acknowledged blocking stream read: at most 1 record, 5-second timeout
        StreamReadOptions readOptions =
                StreamReadOptions.empty().autoAcknowledge().block(Duration.ofSeconds(5)).count(1);
        // lastConsumed() is the ">" offset that consumer-group reads need;
        // "$" (StreamOffset.latest) is rejected by XREADGROUP
        List<MapRecord<String, String, String>> records =
                op.read(consumer, readOptions, StreamOffset.create("test-stream", ReadOffset.lastConsumed()));
        return new LinkStatus(!CollectionUtils.isEmpty(records));
    }