
How to access OfferAsync method of Source.Queue when using Akka.Net Graphs?


I am working on an Akka.NET Streams application that uses the Graph API. I would like to feed source data into the stream via the OfferAsync method of Source.Queue.

How do I access the ISourceQueueWithComplete.OfferAsync method after creating the graph, so that I can add data to the stream?

Here is the code that I am using to create the graph:


    // Create a graph
    var runnableGraph = RunnableGraph.FromGraph(GraphDsl.Create(
        builder =>
        {
            // create source
            var sourceQueue = Source.Queue<int>(100, OverflowStrategy.Fail);

            // use builder to configure the graph
            ...
        }));

    // run the graph
    runnableGraph.Run(materializer);

Here is the code that I want to use to get data for the stream:

    while (true)
    {
        var events = GetEventsFromExternalSource();
        foreach (var singleEvent in events)
        {
            sourceQueueWithComplete.OfferAsync(singleEvent);
        }
    }

Solution

  • Pass the Source.Queue<T> into GraphDsl.Create as an input graph when you build the graph. The graph then takes the queue as its materialized value, which you get back when you materialize (run) the graph:

    using System;
    using System.Linq;
    using Akka.Actor;
    using Akka.Streams;
    using Akka.Streams.Dsl;

    var actorSystem = ActorSystem.Create("Test");

    // create source
    var sourceQueue = Source.Queue<int>(100, OverflowStrategy.Fail);

    // import the source so that its queue becomes the graph's materialized value
    var graph = GraphDsl.Create(sourceQueue, (builder, source) =>
    {
        // connected shapes
        var flow = builder.Add(Flow.Create<int>().Select(i => i * 10));
        var sink = builder.Add(Sink.ForEach<int>(i => Console.WriteLine(i)));

        builder.From(source).To(flow);
        builder.From(flow).To(sink);

        return ClosedShape.Instance;
    });

    // materializing the graph returns the queue
    ISourceQueueWithComplete<int> queueSource = actorSystem.Materializer().Materialize(graph);

    foreach (var i in Enumerable.Range(0, 10))
    {
        await queueSource.OfferAsync(i);
    }
    

    Executing this program will result in:

    0
    10
    20
    30
    40
    50
    60
    70
    80
    90
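
If you would rather keep the RunnableGraph.FromGraph(...).Run(materializer) shape from the question, the same approach should carry over, because Run returns the graph's materialized value. The sketch below is only an illustration under that assumption: OfferAllAsync is a hypothetical helper (it is not part of Akka.NET), and the check on the value returned by OfferAsync is one possible way to react when an element is not accepted.

```csharp
using System;
using System.Collections.Generic;
using System.Linq;
using System.Threading.Tasks;
using Akka.Actor;
using Akka.Streams;
using Akka.Streams.Dsl;

public static class Program
{
    // Hypothetical helper: offers each element in turn and stops at the
    // first one that the queue does not report as enqueued.
    private static async Task<bool> OfferAllAsync(
        ISourceQueueWithComplete<int> queue, IEnumerable<int> events)
    {
        foreach (var singleEvent in events)
        {
            IQueueOfferResult result = await queue.OfferAsync(singleEvent);
            if (!(result is QueueOfferResult.Enqueued))
            {
                Console.WriteLine($"Element {singleEvent} was not enqueued: {result}");
                return false;
            }
        }
        return true;
    }

    public static async Task Main()
    {
        var actorSystem = ActorSystem.Create("Test");
        var materializer = actorSystem.Materializer();

        // Importing the queue source into GraphDsl.Create makes the queue
        // the materialized value of the whole graph.
        var graph = GraphDsl.Create(
            Source.Queue<int>(100, OverflowStrategy.Fail),
            (builder, source) =>
            {
                var sink = builder.Add(Sink.ForEach<int>(i => Console.WriteLine(i)));
                builder.From(source).To(sink);
                return ClosedShape.Instance;
            });

        // Run returns the materialized value, which here is the queue.
        ISourceQueueWithComplete<int> queue =
            RunnableGraph.FromGraph(graph).Run(materializer);

        await OfferAllAsync(queue, Enumerable.Range(0, 10));

        // Signal that no more elements will be offered, then wait for the
        // source to finish. The sink may still be draining at this point.
        queue.Complete();
        await queue.WatchCompletionAsync();
    }
}
```

In the polling loop from the question, the events returned by GetEventsFromExternalSource() could be passed to a helper like this in place of the bare OfferAsync call, so that dropped elements or a closed queue do not go unnoticed.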