akka, akka-stream, akka.net

Run every N minutes or if item differs from average


I have an actor that receives WeatherConditions and pushes each item to a source (using OfferAsync). Currently the stream runs for every item it receives and stores it in the database.

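using Akka.Actor;
using Akka.Streams;
using Akka.Streams.Dsl;

public class StoreConditionsActor : ReceiveActor
{
    public StoreConditionsActor(ITemperatureDataProvider temperatureDataProvider)
    {
        var materializer = Context.Materializer();
        // Queue-backed source with a buffer of 10; when full, DropTail discards the newest buffered item
        var source = Source.Queue<WeatherConditions>(10, OverflowStrategy.DropTail);

        // Running the graph materializes the queue handle; every offered item is stored
        var graph = source
            .To(Sink.ForEach<WeatherConditions>(conditions => temperatureDataProvider.Store(conditions)))
            .Run(materializer);

        // Forward each incoming message into the stream (the offer result is not awaited)
        Receive<WeatherConditions>(i =>
        {
            graph.OfferAsync(i);
        });
    }
}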

What I would like to achieve is:

  1. Run it only once every N minutes and store the average of all WeatherConditions items received in that N-minute window.
  2. If a received item matches a certain condition (e.g. its temperature is 30% higher than the previous item's temperature), run it immediately, even though the item would otherwise be "hidden" in the time window.

I've tried ConflateWithSeed, Buffer and Throttle, but none of them seems to work (I'm new to Akka and Akka Streams, so I may be missing something basic).


Solution

  • This answer uses Akka Streams and Scala, but perhaps it will inspire your Akka.NET solution.

    The groupedWithin method could meet your first requirement:

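    import akka.actor.ActorSystem
    import akka.stream.OverflowStrategy
    import akka.stream.scaladsl.{Keep, Sink, Source}
    import scala.concurrent.duration._

    // Assumes Akka 2.6+, where an implicit ActorSystem supplies the materializer
    implicit val system: ActorSystem = ActorSystem("example")

    val queue =
      Source.queue[Int](10, OverflowStrategy.dropTail)
        .groupedWithin(10, 1.second)               // close a group after 10 elements or 1 second
        .map(group => group.sum / group.size)      // integer average of the group
        .toMat(Sink.foreach(println))(Keep.left)   // keep the SourceQueue as the materialized value
        .run()

    // Offer up to 10 integers per second to the queue
    Source(1 to 10000)
      .throttle(10, 1.second)
      .mapAsync(1)(queue.offer(_))
      .runWith(Sink.ignore)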
    

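    In the above example, up to 10 integers per second are offered to the SourceQueue. groupedWithin collects the incoming elements into groups of at most 10 elements or one second, whichever limit is reached first, and the map stage emits the average of each group. Because the elements are Ints, group.sum / group.size is an integer division. For an N-minute window, pass a duration of N minutes and a size limit high enough that the timer, not the element count, closes each group.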
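
    For the WeatherConditions case, the averaging step could look like the sketch below. The fields temperature and humidity are assumptions, since the question does not show the class, and the helper name average is made up for illustration. It relies only on the documented behaviour that groupedWithin does not emit empty groups.

```scala
// Hypothetical model: the question does not show the fields of WeatherConditions.
final case class WeatherConditions(temperature: Double, humidity: Double)

// Averages one group of readings field by field.
// Uses Double division, so no precision is lost as with Int averages.
def average(group: Seq[WeatherConditions]): WeatherConditions = {
  require(group.nonEmpty, "cannot average an empty group")
  val n = group.size.toDouble
  WeatherConditions(
    temperature = group.map(_.temperature).sum / n,
    humidity    = group.map(_.humidity).sum / n
  )
}
```

    In a stream this would replace the map stage, for example .groupedWithin(maxItems, 5.minutes).map(average), where maxItems is a placeholder for a limit larger than the number of readings expected in one window.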

    As for your second requirement, you could use sliding to compare an element with the previous element. The below example passes an element downstream only if it is at least 30% greater than the previous element:

    val source: Source[Int, _] = ???
    
    source
      .sliding(2, 1)
      .collect {
        case Seq(a, b) if b >= 1.3 * a => b
      }
      .runForeach(println)
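
    The same sliding and collect combination exists on plain Scala collections, so the comparison logic can be tried without running a stream. This is only a sketch: the function name spikes and the factor parameter are made up for illustration, and each reading is compared with the previous raw reading, not with a window average.

```scala
// Returns every reading that is at least `factor` times the reading before it.
// sliding(2, 1) yields overlapping pairs (previous, current); a lone trailing
// window with fewer than two elements simply fails the pattern match.
def spikes(readings: Seq[Double], factor: Double = 1.3): Seq[Double] =
  readings
    .sliding(2, 1)
    .collect { case Seq(previous, current) if current >= factor * previous => current }
    .toList

// 28.0 is more than 30% above 21.0, and 40.0 is more than 30% above 29.0
println(spikes(Seq(20.0, 21.0, 28.0, 29.0, 40.0))) // prints List(28.0, 40.0)
```

    Note that the first element of a stream has no predecessor, so it can never be reported as a spike with this approach.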