Search code examples
javaqueueblockingqueue

Read messages which are 10 minutes old in java


There is a producer and consumer I am trying to implement. Where producer will keep pushing messages into queue. But while consumer has to read those messages only after 30 minutes from the time it reached the queue.

Suppose

m1 reaches at 10am 
m2 reaches at 10.10am
m3 reaches at 10.20am

Consumer has to pick them at

m1 at 10.30am
m2 at 10.40am
m3 at 10.50am 

is there any way to do in java.

I tried BlockingQueue but I don't think so we can archive it using BlockingQueue. In BlockingQueue consumer will read as soon as message is available in queue.

Any help will be greatly appreciated.


Solution

  • DelayQueue

    Java comes built with DelayQueue, an implementation of Queue and BlockingQueue that supplies an element only after its assigned delay period has expired.

    Your objects offered to the queue must carry an amount of time to wait, defined by implementing the Delayed interface. This interface requires one method: getDelay. This interface has one other requirement:

    An implementation of this interface must define a compareTo method that provides an ordering consistent with its getDelay method.

    Define your message class to meet both of those requirements.

    public class DelayedMessage implements Delayed , Comparable {…}
    

    Define your queue to carry objects of that type.

    BlockingQueue< DelayedMessage > queue = new DelayQueue<>() ;
    

    See this tutorial, and the tutorial by Oracle.

    Example

    Here is an example app.

    We define a Message class that implements Delayed and thereby also implements Comparable<Delayed>.

    Notice how we record a starting moment on the Message object, capturing the current moment and then adding the amount of time specified as a Duration object. Notice especially how the required getDelay method is dynamically recalculating time remaining between the moment of its execution and the moment when its delay expires. A Delayed element in the queue will not become available while its getDelay result is positive; the element becomes available only when getDelay returns a zero or negative number.

    package work.basil.example;
    
    import java.time.Duration;
    import java.time.Instant;
    import java.util.Objects;
    import java.util.concurrent.Delayed;
    import java.util.concurrent.TimeUnit;
    
    public class Message implements Delayed
    {
        // Member fields
        CharSequence message;
        Instant momentToRun;
    
        // Constructor
        public Message ( CharSequence message , Duration delay )
        {
            this.message = Objects.requireNonNull( message );
            this.momentToRun = Instant.now().plus( Objects.requireNonNull( delay ) );
        }
    
        // Getter
        public CharSequence getMessage ( ) { return this.message; }
    
        ;
    
        // Implements Delayed
        @Override
        public long getDelay ( TimeUnit timeUnit )
        {
            Duration timeRemaining = Duration.between( Instant.now() , this.momentToRun );
            long result = timeUnit.convert( timeRemaining );
            return result;
        }
    
        // Implements Comparable
        @Override
        public int compareTo ( Delayed o )
        {
            TimeUnit tu = TimeUnit.NANOSECONDS;
            return Long.compare( this.getDelay( tu ) , o.getDelay( tu ) );
        }
    
        // Object
    
        @Override
        public String toString ( )
        {
            return "Message{ " +
                    "message=" + message +
                    " | momentToRun=" + momentToRun +
                    " | getDelay (nanos) = " + this.getDelay( TimeUnit.NANOSECONDS ) +
                    " }";
        }
    }
    

    Next we use that Message class. We store Message objects in a DelayQueue. We dump that queue to see verify its contents.

    Then we create a background thread to check the queue every second. If we get null from the queue, we ignore it. If we get a Message object from the queue, we write the current moment in UTC and then write the toString output of that message object. We schedule that work on a ScheduledExecutorService (see tutorial by Oracle).

    Wait a full minute when running this app. After a minute expires, the executor service is shutdown, and a notification with current moment is written to the console to let you know the app has ended its run successfully.

    package work.basil.example;
    
    import java.time.Duration;
    import java.time.Instant;
    import java.util.Objects;
    import java.util.Queue;
    import java.util.concurrent.DelayQueue;
    import java.util.concurrent.Executors;
    import java.util.concurrent.ScheduledExecutorService;
    import java.util.concurrent.TimeUnit;
    
    public class DemoDelayQueue
    {
        public static void main ( String[] args )
        {
            DemoDelayQueue app = new DemoDelayQueue();
            app.demo();
        }
    
        private void demo ( )
        {
            System.out.println( "App beginning now: " + Instant.now() );
    
            BlockingQueue < Message > queue = new DelayQueue <>();
            queue.add( new Message( "Message delayed 5 seconds." , Duration.ofSeconds( 5 ) ) );
            queue.add( new Message( "Message delayed 47 seconds." , Duration.ofSeconds( 47 ) ) );
            queue.add( new Message( "Message delayed 3 seconds." , Duration.ofSeconds( 3 ) ) );
            queue.add( new Message( "Message delayed 12 seconds." , Duration.ofSeconds( 12 ) ) );
            System.out.println( "queue = " + queue );
    
            ScheduledExecutorService ses = Executors.newSingleThreadScheduledExecutor();
            ses.scheduleAtFixedRate(
                    ( ) -> {
                        Message m = queue.poll();
                        if ( Objects.nonNull( m ) )
                        {
                            System.out.println( "--------" );
                            System.out.println( "Retrieved `Message` object from queue at: " + Instant.now() );
                            System.out.println( m.toString() );
                        } 
                        else // Else `m` is null. 
                        {
                            // System.out.println( "No message available. Null returned by queue." ) ;
                        }
                    }
                    ,
                    0 ,
                    1 ,
                    TimeUnit.SECONDS
            );
    
            try
            {
                Thread.sleep( TimeUnit.MINUTES.toMillis( 1 ) );
            }
            catch ( InterruptedException e )
            {
                e.printStackTrace();
            }
            finally
            {
                ses.shutdown();
                System.out.println( "App ending now: " + Instant.now() );
            }
        }
    }
    

    When run, output looks something like this:

    App beginning now: 2020-06-12T21:26:23.256755Z
    queue = [Message{ message=Message delayed 3 seconds. | momentToRun=2020-06-12T21:26:26.271260Z | getDelay (nanos) = 2999677000 }, Message{ message=Message delayed 12 seconds. | momentToRun=2020-06-12T21:26:35.271324Z | getDelay (nanos) = 11976986000 }, Message{ message=Message delayed 5 seconds. | momentToRun=2020-06-12T21:26:28.271128Z | getDelay (nanos) = 4976537000 }, Message{ message=Message delayed 47 seconds. | momentToRun=2020-06-12T21:27:10.271205Z | getDelay (nanos) = 46976417000 }]
    --------
    Retrieved `Message` object from queue at: 2020-06-12T21:26:26.302270Z
    Message{ message=Message delayed 3 seconds. | momentToRun=2020-06-12T21:26:26.271260Z | getDelay (nanos) = -31295000 }
    --------
    Retrieved `Message` object from queue at: 2020-06-12T21:26:28.300048Z
    Message{ message=Message delayed 5 seconds. | momentToRun=2020-06-12T21:26:28.271128Z | getDelay (nanos) = -29093000 }
    --------
    Retrieved `Message` object from queue at: 2020-06-12T21:26:35.303619Z
    Message{ message=Message delayed 12 seconds. | momentToRun=2020-06-12T21:26:35.271324Z | getDelay (nanos) = -32412000 }
    --------
    Retrieved `Message` object from queue at: 2020-06-12T21:27:10.300950Z
    Message{ message=Message delayed 47 seconds. | momentToRun=2020-06-12T21:27:10.271205Z | getDelay (nanos) = -29863000 }
    App ending now: 2020-06-12T21:27:23.302958Z