In my application there is a list, publisherPostListenerList, which receives real-time user posts from a RabbitMQ queue to be sent to the subscribers/consumers. The list is a property of the ApplicationListener class, which listens to the events of the pub/sub queue. The controller method below fetches the list elements via a getter and, based on some logic, pushes the posts to the subscribers.
The flow is as follows:
User writes a post -> Post goes into DB + Queue -> Message from the queue is added to publisherPostListenerList, to be pushed to the subscribers of the user.
As we can see, publisherPostListenerList is a single list shared by n concurrent requests, because ApplicationListener is a singleton. With a single instance the setup works fine, but it will fail in a clustered environment, as each node will have its own publisherPostListenerList.
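To make the failure mode concrete, here is a minimal sketch in plain Java. It assumes that each node holds its own instance of the listener and that a given post reaches only one node. NodeListener and the String posts are hypothetical stand-ins for ApplicationEventListener and Post.

```java
import java.util.List;
import java.util.concurrent.CopyOnWriteArrayList;

// Hypothetical stand-in for the singleton listener: one instance exists per JVM (node).
class NodeListener {
    private final List<String> publisherPostListenerList = new CopyOnWriteArrayList<>();

    // Called when a post event arrives on this node.
    void onPost(String post) {
        publisherPostListenerList.add(post);
    }

    List<String> getPublisherPostListenerList() {
        return publisherPostListenerList;
    }
}

class ClusterProblemDemo {
    public static void main(String[] args) {
        // Two objects simulate two nodes of a cluster (assumption: no shared state).
        NodeListener nodeA = new NodeListener();
        NodeListener nodeB = new NodeListener();

        // The post is received on node A only.
        nodeA.onPost("post-1");

        System.out.println("node A sees: " + nodeA.getPublisherPostListenerList()); // [post-1]
        System.out.println("node B sees: " + nodeB.getPublisherPostListenerList()); // []
    }
}
```

A subscriber whose SSE request lands on node B would never receive post-1, which is the behaviour I am worried about.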
How do I deal with this situation? I can't make the ApplicationListener class stateless; I need the list to store the posts received from the queue. Do I put the list in a distributed in-memory cache? Or is there another conventional way?
ApplicationListener.java
@Component
public class ApplicationEventListener {

    private List<Post> publisherPostListenerList = new CopyOnWriteArrayList<Post>();
    private static final Logger logger = Logger.getLogger(ApplicationEventListener.class);

    @EventListener
    public void postSubmissionEventHandler(PostSubmissionEvent event) throws IOException {
        Post post = event.getPost();
        logger.debug("application published user post received " + post);
        publisherPostListenerList.add(post);
    }

    public List<Post> getPublisherPostListenerList() {
        return publisherPostListenerList;
    }

    public void setPublisherPostListenerList(List<Post> publisherPostListenerList) {
        this.publisherPostListenerList = publisherPostListenerList;
    }
}
Controller method for pushing the message to the subscriber
@RequestMapping(value = "/getRealTimeServerPushUserPosts")
public SseEmitter getRealTimeServerPushUserPosts(@RequestParam("userId") int userId) {
    SseEmitter sseEmitter = new SseEmitter();
    CustomUserDetail myUserDetails = currentUserAccessor.getCurrentLoggedInUser();
    User loggedInUser = myUserDetails.getUser();
    List<Integer> userPublisherIDList = this.userService.loadUserPublisherIdListWhichLoggedInUserFollows(loggedInUser);
    List<Post> postList = eventListener.getPublisherPostListenerList();
    for (Integer userPublisherId : userPublisherIDList) {
        for (Post post : postList) {
            if (userPublisherId.intValue() == post.getUser().getUserId().intValue()) {
                try {
                    sseEmitter.send(post);
                    postList.remove(post); // removes the post for all the subscribers as the list acts as a global list.
                } catch (IOException e) {
                    logger.error(e);
                }
            }
        }
    }
    return sseEmitter;
}
You can use Hazelcast's IList. It follows java.util.List semantics and is suitable for distributed/clustered environments.
You can find the documentation here and examples here.
Another option is to use a distributed map, a.k.a. IMap.
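Because IList is a java.util.List, one way to prepare for it is to have the listener depend only on the List interface and receive the backing list from outside. The sketch below is a simplified illustration, not your actual classes: PostBuffer and the String posts are hypothetical stand-ins, and a CopyOnWriteArrayList plays the role of the clustered list so the example runs without Hazelcast on the classpath.

```java
import java.util.ArrayList;
import java.util.List;
import java.util.concurrent.CopyOnWriteArrayList;

// Hypothetical, simplified listener: the backing list is injected, not created inline.
class PostBuffer {
    private final List<String> posts;

    PostBuffer(List<String> posts) {
        this.posts = posts;
    }

    void add(String post) {
        posts.add(post);
    }

    // Returns a copy so callers can iterate without touching the shared list.
    List<String> snapshot() {
        return new ArrayList<>(posts);
    }
}

class SharedListDemo {
    public static void main(String[] args) {
        // Local stand-in for the clustered list. With Hazelcast this could instead be:
        //   HazelcastInstance hz = Hazelcast.newHazelcastInstance();
        //   List<String> shared = hz.getList("publisherPostListenerList");
        // The list name is an assumption chosen for illustration.
        List<String> shared = new CopyOnWriteArrayList<>();

        // Two buffers simulate the listener on two different nodes.
        PostBuffer nodeA = new PostBuffer(shared);
        PostBuffer nodeB = new PostBuffer(shared);

        nodeA.add("post-1");
        System.out.println(nodeB.snapshot()); // [post-1]
    }
}
```

With a real IList the elements travel between nodes, so Post would need to be serializable in a form Hazelcast supports, and each list operation becomes a network call rather than a local one.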
Let me know if you have specific questions regarding implementation details.
Thank you