I have an expensive method to call for creating a batch of source items:
private Task<List<SourceItem>> GetUnprocessedBatch(int batchSize)
I want to populate new items only when there is no item to process(or it falls below a certain threshold). I couldn't figure out which Source method to use so far.
I have implemented a crude stream that would keep returning new items:
public class Stream
private readonly Queue<SourceItem> scrapeAttempts;
private int batchSize = 100;
private int minItemCount = 10;
public Stream()
scrapeAttempts = new Queue<SourceItem>();
public async Task<SourceItem> Next()
if (scrapeAttempts.Count < minItemCount)
var entryScrapeAttempts = await GetUnprocessedBatch(batchSize);
entryScrapeAttempts.ForEach(attempt => scrapeAttempts.Enqueue(attempt));
return scrapeAttempts.Dequeue();
I expected Source.Task
would work but it looks like it calls it only once. How can I create a source for this scenario?
So, conceptually what you want is a Source stage, that fetches elements asynchronously in batches, buffers the batch and propagates events downstream one by one. When the buffer is close to being empty, we want to eagerly call the next fetch on the side thread (but not more than once), so it could complete while we're emptying current batch.
This sort of behavior will require building a custom GraphStage. One that could look like this:
sealed class PreFetch<T> : GraphStage<SourceShape<T>>
private readonly int threshold;
private readonly Func<Task<IEnumerable<T>>> fetch;
private readonly Outlet<T> outlet = new Outlet<T>("prefetch");
public PreFetch(int threshold, Func<Task<IEnumerable<T>>> fetch)
this.threshold = threshold;
this.fetch = fetch;
this.Shape = new SourceShape<T>(this.outlet);
public override SourceShape<T> Shape { get; }
protected override GraphStageLogic CreateLogic(Attributes inheritedAttributes) => new Logic(this);
private sealed class Logic : GraphStageLogic
public Logic(PreFetch<T> stage) : base(stage.Shape)
// queue for batched elements
var queue = new Queue<T>();
// flag which indicates, that pull from downstream was made,
// but we didn't have any elements at that moment
var wasPulled = false;
// determines if fetch was already called
var fetchInProgress = false;
// in order to cooperate with async calls without data races,
// we need to register async callbacks for success and failure scenarios
var onSuccess = this.GetAsyncCallback<IEnumerable<T>>(batch =>
foreach (var item in batch) queue.Enqueue(item);
if (wasPulled)
// if pull was requested but not fulfilled, we need to push now, as we have elements
// it assumes that fetch returned non-empty batch
Push(stage.outlet, queue.Dequeue());
wasPulled = false;
fetchInProgress = false;
var onFailure = this.GetAsyncCallback<Exception>(this.FailStage);
SetHandler(stage.outlet, onPull: () => {
if (queue.Count < stage.threshold && !fetchInProgress)
// if queue occupation reached bellow expected capacity
// call fetch on a side thread and handle its result asynchronously
stage.fetch().ContinueWith(task =>
// depending on if task was failed or not, we call corresponding callback
if (task.IsFaulted || task.IsCanceled)
onFailure(task.Exception as Exception ?? new TaskCanceledException(task));
else onSuccess(task.Result);
fetchInProgress = true;
// if queue is empty, we cannot push immediatelly, so we only mark
// that pull request has been made but not fulfilled
if (queue.Count == 0)
wasPulled = true;
Push(stage.outlet, queue.Dequeue());
wasPulled = false;