I am working on an Akka.NET Streams application that uses the Graph API. I would like to feed source data into the stream via the OfferAsync method of Source.Queue.
How do I access the ISourceQueueWithComplete.OfferAsync method after creating a graph, so that I can add data to the stream?
Here is the code that I am using to create the graph:
// Create a graph
var runnableGraph = RunnableGraph.FromGraph(GraphDsl.Create(
    builder =>
    {
        // create source
        var sourceQueue = Source.Queue<int>(100, OverflowStrategy.Fail);
        // use builder to configure the graph
        ...
    }));
// run the graph
runnableGraph.Run(materializer);
Here is the code that I want to use to get data for the stream:
while (true)
{
    var events = GetEventsFromExternalSource();
    foreach (var singleEvent in events)
    {
        sourceQueueWithComplete.OfferAsync(singleEvent);
    }
}
You just need to pass the Source.Queue<T> in as an input argument when you initially call GraphDsl.Create. This gives the graph a materialized value type (the queue), which you can access once the graph is materialized:
var actorSystem = ActorSystem.Create("Test");

// create source
var sourceQueue = Source.Queue<int>(100, OverflowStrategy.Fail);

// passing sourceQueue to Create makes its queue the graph's materialized value
var graph = GraphDsl.Create(sourceQueue, (builder, source) =>
{
    // connected shapes
    var flow = builder.Add(Flow.Create<int>().Select(i => i * 10));
    var sink = builder.Add(Sink.ForEach<int>(i => Console.WriteLine(i)));

    builder.From(source).To(flow);
    builder.From(flow).To(sink);

    return ClosedShape.Instance;
});

// materializing the graph returns the queue
ISourceQueueWithComplete<int> queueSource = actorSystem.Materializer().Materialize(graph);

foreach (var i in Enumerable.Range(0, 10))
{
    await queueSource.OfferAsync(i);
}
Executing this program will result in:
0
10
20
30
40
50
60
70
80
90
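Since OfferAsync returns a result object rather than just completing, it may be worth checking whether each element was actually accepted. Below is a minimal sketch of that, assuming the queue was obtained as shown above and that the Akka.Streams NuGet package is referenced. The QueueFeeder class and TryOfferAsync method are hypothetical names used only for illustration; the QueueOfferResult types come from the Akka.Streams namespace.

```csharp
using System;
using System.Threading.Tasks;
using Akka.Streams;

public static class QueueFeeder
{
    // Hypothetical helper: offers one element and reports whether the queue accepted it.
    public static async Task<bool> TryOfferAsync(ISourceQueueWithComplete<int> queue, int element)
    {
        IQueueOfferResult result = await queue.OfferAsync(element);

        switch (result)
        {
            case QueueOfferResult.Enqueued _:
                // the element was accepted by the queue
                return true;
            case QueueOfferResult.Dropped _:
                // the element was dropped according to the overflow strategy
                Console.WriteLine($"Element {element} was dropped");
                return false;
            case QueueOfferResult.Failure failure:
                // the stream failed; Cause carries the underlying exception
                Console.WriteLine($"Offer failed: {failure.Cause.Message}");
                return false;
            case QueueOfferResult.QueueClosed _:
                // the stream completed before the element could be enqueued
                Console.WriteLine("Queue is closed");
                return false;
            default:
                return false;
        }
    }
}
```

In the loop from the question, this would replace the bare OfferAsync call, for example `if (!await QueueFeeder.TryOfferAsync(queueSource, singleEvent)) break;`, so the producer stops once the stream can no longer accept elements.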