Search code examples
javaspringrabbitmqhazelcastdistributed-caching

Creating a list for the clustered environment


In my application there is a list publisherPostListenerList which receives the real time user posts from the RabbitMQ queue to be sent to the subscribers/consumers. The list is a property of ApplicationListener class which listens to the events of the pubsub queue. The below controller method fetches the list elements via getter method & based on a logic pushes the posts to the subscribers.

The flow is as follows

User writes a post -> Post gets into DB + Queue -> Message from Queue is added in a list which is publisherPostListenerListto be pushed to the subscribers of the user.

As we can see the publisherPostListenerList is a common list for n concurrent requests due to ApplicationListener being a singleton. For a single instance the setup works fine but will fail in a clustered environment as each node will have its own individual publisherPostListenerList list.

How do I deal with this situation? I can't make ApplicationListener class stateless I need the list to store the post elements received from the queue. Do I put the list in a distributed in memory cache? Or there is any other conventional way?

ApplicationListener.java

@Component
public class ApplicationEventListener {

    private List<Post> publisherPostListenerList = new CopyOnWriteArrayList<Post>();

    private static final Logger logger = Logger.getLogger(ApplicationEventListener.class);

    @EventListener
    public void postSubmissionEventHandler(PostSubmissionEvent event) throws IOException {
        Post post = event.getPost();
        logger.debug("application published user post received " + post);
        publisherPostListenerList.add(post);
    }

    public List<Post> getPublisherPostListenerList() {
        return publisherPostListenerList;
    }

    public void setPublisherPostListenerList(List<Post> publisherPostListenerList) {
        this.publisherPostListenerList = publisherPostListenerList;
    }
}

Controller method for pushing the message to the subscriber

@RequestMapping(value="/getRealTimeServerPushUserPosts")
    public SseEmitter getRealTimeServerPushUserPosts(@RequestParam("userId") int userId){
        SseEmitter sseEmitter = new SseEmitter();
        CustomUserDetail myUserDetails = currentUserAccessor.getCurrentLoggedInUser();
        User loggedInUser=myUserDetails.getUser();

        List<Integer> userPublisherIDList = this.userService.loadUserPublisherIdListWhichLoggedInUserFollows(loggedInUser);
        List<Post> postList =eventListener.getPublisherPostListenerList();


        for(Integer userPublisherId : userPublisherIDList){
            for(Post post:postList){
                    if((userPublisherId.intValue()) == (post.getUser().getUserId().intValue())){
                        try {
                        sseEmitter.send(post);
                        postList.remove(post); //removes the post for all the subscribers as the list acts as a global list.
                    } catch (IOException e) {
                        logger.error(e);
                    }
                }
             }
         }
        return sseEmitter;
    }

Solution

  • You can use Hazelcast IList. It follows j.u.List semantics and suitable for distributed / clustered environments.

    You can find a documentation here and examples here. Another option is to use distributed map aka IMap.

    Let me know if you have specific questions regarding implementation details.

    Thank you