
C# TPL: Possible to restart a failed Pipeline at an arbitrary step?


I have a data processing job that consists of about 20 sequential steps. The steps all fall under one of three categories:

  1. do some file manipulation
  2. import / export data from a database
  3. make a call to a 3rd party web API

I've refactored the code from one long, awful-looking method to a pipeline pattern, using examples here and here. All of the steps are TransformBlock instances, such as:

    var stepThirteenPostToWebApi = new TransformBlock<FileInfo, System.Guid>(async csv =>
    {
        dynamic task = await ApiUtils.SubmitData(csv.FullName);
        return task.guid;
    });

The code works most of the time, but occasionally a step in the pipeline fails for whatever reason - let's say a corrupt file can't be read in step 6 of 20 (just an example - any step could fail). The pipeline stops running further tasks, as it should.

However, the 3rd party web API introduces a challenge - we are charged for each job we initiate whether we execute all 20 steps or just the first one.

I would like to be able to fix whatever went wrong in the problem step (again, for our example let's say I fix the corrupt file in step 6 of 20), then pick back up at step 6. The 3rd party web API has a GUID for each job, and is asynchronous, so that should be fine - after the problem is fixed, it will happily let a job resume with remaining steps.

My question: Is it possible (and if so advisable?) to design a pipeline that could begin at any step, assuming the pre-requisites for that step were valid?

It would look something like:

  1. job fails on step 6 and logs step 5 as the last successful step
  2. a human comes along and fixes whatever caused step 6 to fail
  3. a new pipeline is started at step 6

I realize a brute-force way would be to have StartAtStep2(), StartAtStep3(), StartAtStep4() methods. That doesn't seem like a good design, but I'm a bit new to this pattern so maybe that's acceptable.


Solution

  • The brute-force way is not that bad. For example, your code above would just need to be

    bool StartAtStepThirteen(FileInfo csv) 
    { 
        return stepThirteenPostToWebApi.Post(csv); 
    }
    

    Setting up the chain should be done in a separate method from executing it. You should save stepThirteenPostToWebApi in a class-level variable in a class that represents the entire chain; the setup of the chain could be done in the class's constructor.

    Here is a simple 3-step version of the process. When an error happens, instead of faulting the task chain I log the error and pass null along the chain for invalid entries. You could make that log method raise an event, and then the user can decide what to do with the bad entry.

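    using System;
    using System.IO;
    using System.Threading.Tasks;
    using System.Threading.Tasks.Dataflow;

    public class WorkChain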
    {
        private readonly TransformBlock<string, FileInfo> stepOneGetFileInfo;
        private readonly TransformBlock<FileInfo, System.Guid?> stepTwoPostToWebApi;
        private readonly ActionBlock<System.Guid?> stepThreeDisplayIdToUser;
    
        public WorkChain()
        {
            stepOneGetFileInfo = new TransformBlock<string, FileInfo>(new Func<string, FileInfo>(GetFileInfo));
            stepTwoPostToWebApi = new TransformBlock<FileInfo, System.Guid?>(new Func<FileInfo, Task<Guid?>>(PostToWebApi));
            stepThreeDisplayIdToUser = new ActionBlock<System.Guid?>(new Action<Guid?>(DisplayIdToUser));
    
            stepOneGetFileInfo.LinkTo(stepTwoPostToWebApi, new DataflowLinkOptions() {PropagateCompletion = true});
            stepTwoPostToWebApi.LinkTo(stepThreeDisplayIdToUser, new DataflowLinkOptions() {PropagateCompletion = true});
        }
    
        public void PostToStepOne(string path)
        {
            bool result = stepOneGetFileInfo.Post(path);
            if (!result)
            {
                throw new InvalidOperationException("Failed to post to stepOneGetFileInfo");
            }
        }
    
        public void PostToStepTwo(FileInfo csv)
        {
            bool result = stepTwoPostToWebApi.Post(csv);
            if (!result)
            {
                throw new InvalidOperationException("Failed to post to stepTwoPostToWebApi");
            }
        }
    
        public void PostToStepThree(Guid id)
        {
            bool result = stepThreeDisplayIdToUser.Post(id);
            if (!result)
            {
                throw new InvalidOperationException("Failed to post to stepThreeDisplayIdToUser");
            }
        }
    
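        // Signals that no more items will be posted to step one. Completion then
        // propagates down the chain because the links set PropagateCompletion, so
        // items posted directly to later steps after that point may be rejected.
        public void CompleteAdding()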
        {
            stepOneGetFileInfo.Complete();
        }
    
        public Task Completion { get { return stepThreeDisplayIdToUser.Completion; } }
    
    
        private FileInfo GetFileInfo(string path)
        {
            try
            {
                return new FileInfo(path);
            }
            catch (Exception ex)
            {
                LogGetFileInfoError(ex, path);
                return null;
            }
        }
    
        private async Task<Guid?> PostToWebApi(FileInfo csv)
        {
            if (csv == null)
                return null;
            try
            {
                dynamic task = await ApiUtils.SubmitData(csv.FullName);
                return task.guid;
            }
            catch (Exception ex)
            {
                LogPostToWebApiError(ex, csv);
                return null;
            }
        }
    
        private void DisplayIdToUser(Guid? id)
        {
            // Null means an earlier step failed and was already logged.
            if (id == null)
                return;

            Console.WriteLine(id.Value);
        }
    
    }
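
To make the "raise an event, then repost to a later step" idea concrete, here is a minimal, self-contained sketch. It is not the code above: ResumableChain, StepFailedEventArgs and the submit delegate are hypothetical names introduced for illustration, and submit stands in for the real ApiUtils.SubmitData call so the example can run without the 3rd party API. The failure is simulated with a flag that is flipped to represent a human fixing the problem.

```csharp
using System;
using System.Collections.Generic;
using System.IO;
using System.Threading.Tasks;
using System.Threading.Tasks.Dataflow;

// Hypothetical event payload: the failed step, its input, and the exception.
public sealed class StepFailedEventArgs : EventArgs
{
    public StepFailedEventArgs(string step, FileInfo input, Exception error)
    {
        Step = step;
        Input = input;
        Error = error;
    }

    public string Step { get; }
    public FileInfo Input { get; }
    public Exception Error { get; }
}

public sealed class ResumableChain
{
    private readonly TransformBlock<string, FileInfo> stepOne;
    private readonly TransformBlock<FileInfo, Guid?> stepTwo;
    private readonly ActionBlock<Guid?> stepThree;

    // Raised instead of faulting the chain, so the caller decides what to do.
    public event EventHandler<StepFailedEventArgs> StepFailed;

    // IDs that made it through the last step.
    public List<Guid> Results { get; } = new List<Guid>();

    // 'submit' is an assumed stand-in for the real web API call.
    public ResumableChain(Func<FileInfo, Task<Guid>> submit)
    {
        stepOne = new TransformBlock<string, FileInfo>(path => new FileInfo(path));
        stepTwo = new TransformBlock<FileInfo, Guid?>(async csv =>
        {
            try
            {
                Guid id = await submit(csv);
                return id;
            }
            catch (Exception ex)
            {
                StepFailed?.Invoke(this, new StepFailedEventArgs("stepTwo", csv, ex));
                return null; // pass null along so the last step skips this entry
            }
        });
        stepThree = new ActionBlock<Guid?>(id =>
        {
            if (id.HasValue) Results.Add(id.Value);
        });

        var link = new DataflowLinkOptions { PropagateCompletion = true };
        stepOne.LinkTo(stepTwo, link);
        stepTwo.LinkTo(stepThree, link);
    }

    public bool PostToStepOne(string path) => stepOne.Post(path);
    public bool PostToStepTwo(FileInfo csv) => stepTwo.Post(csv);
    public void CompleteAdding() => stepOne.Complete();
    public Task Completion => stepThree.Completion;
}

public static class Program
{
    // Returns the number of jobs that reached the last step.
    public static async Task<int> RunDemoAsync()
    {
        bool apiDown = true; // simulates the problem a human later fixes
        var failure = new TaskCompletionSource<FileInfo>(
            TaskCreationOptions.RunContinuationsAsynchronously);

        var chain = new ResumableChain(csv => apiDown
            ? Task.FromException<Guid>(new IOException("API unavailable"))
            : Task.FromResult(Guid.NewGuid()));
        chain.StepFailed += (sender, e) => failure.TrySetResult(e.Input);

        chain.PostToStepOne("data.csv");            // fails at step two
        FileInfo failedInput = await failure.Task;  // wait for the failure report

        apiDown = false;                            // the "fix"
        chain.PostToStepTwo(failedInput);           // resume at step two

        chain.CompleteAdding();
        await chain.Completion;
        return chain.Results.Count;
    }

    public static async Task Main()
    {
        Console.WriteLine(await RunDemoAsync());
    }
}
```

Run as written, this should print 1: the first attempt is reported through StepFailed and dropped, and the reposted entry goes through steps two and three only. Note the ordering: the retry is posted before CompleteAdding() is called, because once completion has propagated down the chain the later blocks stop accepting new items. In a real job the handler would more likely persist the failed step and input somewhere durable so a later process can repost it.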