
Find messages from one key to another while being able to remove stale keys


My problem

Let's say I want to hold my messages in some sort of data structure for a long-polling application:

1. "dude"
2. "where"
3. "is"
4. "my"
5. "car"

Asking for the messages at indices [4, 5] should return: "my", "car".

Next, let's assume that after a while I would like to purge old messages because they are no longer useful and I want to save memory. Say that after time x, messages 1 to 3 have become stale. I assume it would be most efficient to do the deletion in one batch every x seconds. My data structure should then contain:

4. "my"
5. "car"

My solution?

I was thinking of using a ConcurrentSkipListSet or a ConcurrentSkipListMap, and of deleting the old messages from a task running on a newSingleThreadScheduledExecutor. How would you implement this efficiently and in a thread-safe way, or is there a library I could use?
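A minimal sketch of the skip-list idea, assuming messages are keyed by a monotonically increasing long index. The class name MessageLog and its methods are hypothetical, chosen only for illustration; subMap and headMap are standard ConcurrentNavigableMap views.

```java
import java.util.ArrayList;
import java.util.List;
import java.util.concurrent.ConcurrentNavigableMap;
import java.util.concurrent.ConcurrentSkipListMap;
import java.util.concurrent.atomic.AtomicLong;

// Hypothetical helper class, not part of any library.
class MessageLog {
    private final ConcurrentNavigableMap<Long, String> messages = new ConcurrentSkipListMap<>();
    private final AtomicLong nextIndex = new AtomicLong(1);

    /** Appends a message and returns the index assigned to it. */
    long add(String message) {
        long index = nextIndex.getAndIncrement();
        // Note: a concurrent reader may briefly see index n+1 before index n is stored.
        messages.put(index, message);
        return index;
    }

    /** Returns the messages with indices in [from, to], in index order. Requires from <= to. */
    List<String> range(long from, long to) {
        return new ArrayList<>(messages.subMap(from, true, to, true).values());
    }

    /** Removes every message whose index is strictly below firstLiveIndex. */
    void purgeBefore(long firstLiveIndex) {
        messages.headMap(firstLiveIndex).clear();
    }

    int size() {
        return messages.size();
    }
}
```

With the five messages above, range(4, 5) yields "my" and "car", and purgeBefore(4) leaves only those two. The purge could be run from a task scheduled with scheduleAtFixedRate on the single-thread scheduled executor; how the cutoff index is chosen (for example from message age) is left open here.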


Solution

  • The big concern, as I gather it, is how to let certain elements expire after a period. I had a similar requirement, and I created a message class that implemented the Delayed interface (java.util.concurrent.Delayed). This class held everything I needed for a message and, through the Delayed interface, told me when it had expired.

    I stored instances of this class in a concurrent collection. You could use a ConcurrentMap, because it lets you key those objects by an integer.

    I reaped the collection every so often, removing items whose delay had passed. To test for expiration, call the getDelay method of the Delayed interface; a result of zero or less means the delay has elapsed:

    message.getDelay(TimeUnit.MILLISECONDS);
    

    I used a normal thread that would sleep for a period then reap the expired items. In my requirements it wasn't important that the items be removed as soon as their delay had expired. It seems that you have a similar flexibility.

    If you needed to remove items as soon as their delay expired, then instead of sleeping a set period in your reaping thread, you would sleep for the delay of the message that will expire first.

    Here's my delayed message class:

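    import java.util.Date;
    import java.util.concurrent.Delayed;
    import java.util.concurrent.TimeUnit;
    
    class DelayedMessage implements Delayed {
    
        final long endOfDelay;   // absolute expiry time, in milliseconds since the epoch
        final Date requestTime;  // creation time, used as a tie-breaker in compareTo
        final String message;
    
        // delay is given in milliseconds
        public DelayedMessage(String m, int delay) {
            requestTime = new Date();
            endOfDelay = System.currentTimeMillis() + delay;
            this.message = m;
        }
    
        // Remaining delay in the requested unit; zero or negative means expired.
        @Override
        public long getDelay(TimeUnit unit) {
            return unit.convert(
                    endOfDelay - System.currentTimeMillis(),
                    TimeUnit.MILLISECONDS);
        }
    
        // Orders messages by expiry time, earliest first.
        @Override
        public int compareTo(Delayed o) {
            if (!(o instanceof DelayedMessage)) {
                // Avoid a ClassCastException for other Delayed implementations.
                return Long.compare(getDelay(TimeUnit.MILLISECONDS),
                        o.getDelay(TimeUnit.MILLISECONDS));
            }
            DelayedMessage that = (DelayedMessage) o;
            int byExpiry = Long.compare(this.endOfDelay, that.endOfDelay);
            return byExpiry != 0 ? byExpiry : this.requestTime.compareTo(that.requestTime);
        }
    
        @Override
        public String toString() {
            return message;
        }
    }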
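The two reaping styles described above (sleeping a fixed period versus waiting for the message that expires first) might be sketched as follows. This is an illustration under assumptions, not the code I actually ran: the class Reaper and its method names are hypothetical, the methods accept any Delayed value such as DelayedMessage, and the eager variant swaps the hand-written sleep for the JDK's DelayQueue, whose take() blocks until the head element's delay has elapsed.

```java
import java.util.Map;
import java.util.concurrent.ConcurrentMap;
import java.util.concurrent.DelayQueue;
import java.util.concurrent.Delayed;
import java.util.concurrent.Executors;
import java.util.concurrent.ScheduledExecutorService;
import java.util.concurrent.TimeUnit;

// Hypothetical helper class; the names are illustrative only.
class Reaper {

    /** Periodic style: removes every entry whose delay has elapsed and returns the count. */
    static <K, V extends Delayed> int reapExpired(ConcurrentMap<K, V> messages) {
        int removed = 0;
        for (Map.Entry<K, V> entry : messages.entrySet()) {
            boolean expired = entry.getValue().getDelay(TimeUnit.MILLISECONDS) <= 0;
            // remove(key, value) succeeds only if the key still maps to this value
            if (expired && messages.remove(entry.getKey(), entry.getValue())) {
                removed++;
            }
        }
        return removed;
    }

    /** Runs reapExpired every periodMillis on a single background thread. */
    static <K, V extends Delayed> ScheduledExecutorService startPeriodic(
            ConcurrentMap<K, V> messages, long periodMillis) {
        ScheduledExecutorService executor = Executors.newSingleThreadScheduledExecutor();
        executor.scheduleWithFixedDelay(
                () -> reapExpired(messages), periodMillis, periodMillis, TimeUnit.MILLISECONDS);
        return executor; // the caller should shut this down when finished
    }

    /**
     * Eager style: blocks until the element that expires first has expired,
     * removes it from the map, and returns it (null if interrupted).
     * Assumes each message was added to both the map and the queue.
     */
    static <K, V extends Delayed> V reapNext(DelayQueue<V> expiryQueue, ConcurrentMap<K, V> messages) {
        try {
            V expired = expiryQueue.take();    // waits for the shortest remaining delay
            messages.values().remove(expired); // linear scan, acceptable for a sketch
            return expired;
        } catch (InterruptedException e) {
            Thread.currentThread().interrupt(); // preserve the interrupt status
            return null;
        }
    }
}
```

The periodic variant tolerates items lingering for up to one period after they expire, which matches the flexibility described above. The eager variant needs a second structure (the queue) kept in step with the map, so it is only worth the extra bookkeeping when prompt removal matters.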