Akka streams: Simulating a failed stream with OverflowStrategy.fail()


I'm new to Akka and Akka Streams. I created a dummy flow and expected it to fail with an exception, because my map() function is very slow and I set the buffer size to 1.

So my question has two parts:

  1. Why does this code work without a failure?
  2. How can I simulate an overflow? (for learning purposes)

    import akka.NotUsed;
    import akka.actor.ActorSystem;
    import akka.stream.OverflowStrategy;
    import akka.stream.javadsl.Sink;
    import akka.stream.javadsl.Source;

    public class Application {

        public static void main(String[] args) {
            final ActorSystem system = ActorSystem.create("reactive-test");
            Source<Integer, NotUsed> source =
                    Source.range(0, 10000000)
                    .buffer(1, OverflowStrategy.fail())
                    .map(Application::doubleInt);      
            source.runWith(Sink.foreach(a -> System.out.println(a)), system);
        }

        private static Integer doubleInt(int i) {
            try {
                Thread.sleep(2_000);
            } catch (Exception e) {
                System.out.println(e);
            }
            return 2 * i;
        }
    }

Solution

  • Why does this code work without a failure?

    The reason is back-pressure. A Source won't produce more elements than the downstream stages can consume, so the slow map() stage directly limits how fast elements are produced. As a result, your buffer never overflows.

  • How can I simulate an overflow? (for learning purposes)

    You need a downstream that is eager to consume but slow at the same time. This can be emulated by adding grouped(1000), which collects elements into a list of 1000 and passes that list downstream.

    import akka.NotUsed;
    import akka.actor.ActorSystem;
    import akka.stream.OverflowStrategy;
    import akka.stream.javadsl.Sink;
    import akka.stream.javadsl.Source;
    
    public class StreamsBufJava {
    
        public static void main(String[] args) {
            final ActorSystem system = ActorSystem.create("reactive-test");
            Source<Integer, NotUsed> source =
                    Source.range(0, 10000000)
                            .buffer(1, OverflowStrategy.fail())
                            .grouped(1000)
                            .mapConcat(list -> list)
                            .map(StreamsBufJava::doubleInt);
    
    
            source.runWith(Sink.foreach(System.out::println), system);
        }
    
        private static Integer doubleInt(int i) {
            try {
                Thread.sleep(2_000);
            } catch (Exception e) {
                System.out.println(e);
            }
    
            return 2 * i;
        }
    }
    
    

    This produces:

    0
    2
    [ERROR] [04/17/2020 09:40:47.671] [reactive-test-akka.actor.default-dispatcher-5] [Buffer(akka://reactive-test)] Failing because buffer is full and overflowStrategy is: [Fail] in stream [class akka.stream.impl.fusing.Buffer$$anon$26]
    4
    6
    8
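
The difference between the two cases can also be sketched without Akka. The following is a plain-Java analogy only, not how Akka implements its Buffer stage: the names `FailingBuffer`, `runWithBackPressure` and `runWithoutBackPressure` are hypothetical and exist purely for illustration. It assumes a buffer of size 1 that throws when it is full, and contrasts a producer that emits only when the consumer has taken the previous element with a producer that emits regardless of demand.

```java
import java.util.ArrayDeque;
import java.util.Queue;

public class BackPressureSketch {

    /** Hypothetical bounded buffer that fails on overflow, loosely mimicking OverflowStrategy.fail(). */
    private static final class FailingBuffer {
        private final Queue<Integer> queue = new ArrayDeque<>();
        private final int capacity;

        FailingBuffer(int capacity) {
            this.capacity = capacity;
        }

        void push(int element) {
            if (queue.size() >= capacity) {
                throw new IllegalStateException("Buffer is full");
            }
            queue.add(element);
        }

        int pull() {
            return queue.remove();
        }
    }

    /** Demand-driven: each element is consumed before the next one is produced. */
    public static int runWithBackPressure(int elements) {
        FailingBuffer buffer = new FailingBuffer(1);
        int consumed = 0;
        for (int i = 0; i < elements; i++) {
            buffer.push(i); // producer emits exactly one element per unit of demand
            buffer.pull();  // consumer (however slow) takes it before asking again
            consumed++;
        }
        return consumed;
    }

    /** Push-driven: the producer ignores demand. Returns how many elements were accepted before the failure. */
    public static int runWithoutBackPressure(int elements) {
        FailingBuffer buffer = new FailingBuffer(1);
        int accepted = 0;
        try {
            for (int i = 0; i < elements; i++) {
                buffer.push(i); // nobody pulls, so the second push overflows
                accepted++;
            }
        } catch (IllegalStateException e) {
            System.out.println("Failed after " + accepted + " element(s): " + e.getMessage());
        }
        return accepted;
    }

    public static void main(String[] args) {
        System.out.println("With back-pressure, consumed: " + runWithBackPressure(5));
        System.out.println("Without back-pressure, accepted: " + runWithoutBackPressure(5));
    }
}
```

Running `main` prints `With back-pressure, consumed: 5`, then `Failed after 1 element(s): Buffer is full`, then `Without back-pressure, accepted: 1`. The first case corresponds to the original question, where the buffer never holds more than it can; the second corresponds to a situation where elements reach the buffer faster than they are taken out of it.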