Search code examples
c#.netsystem.reactiveidisposablec#-ziparchive

Reactive Extensions: Creating a pipeline with Rx that works with files


I have a process pipeline with three steps:

  1. video to images: I have a video that is converted to still images (frames)
  2. frames to zip file: When all the frames in a video have been processed, I should create a Zip File with them.
  3. zip file => Upload to FTP

It involves two disposables: the video capture and the zip file.

How could I handle it using Rx? I don't know how to start.


Solution

  • Do you have to pass along the raw objects from each step. It should be fine that the video capture or the zip file are "disposed", because I am sure there would be some side effect form the action (a MemoryStream, a file written to disk etc.). You can just pass on the pointers (Uri's?) to the results of each action to the next part of the pipeline?

    void Main()
    {
        var video = new Uri("https://www.youtube.com/watch?v=Tp5mRlHwZ7M");
        var query = from frames in TranscodeVideoToImages(video)
            from zipFile in ZipFiles(frames)
            from uploadLocation in UploadFile(zipFile)
            select uploadLocation;
        query.Subscribe(...)
    }
    private IObservable<Uri[]> TranscodeVideoToImages(Uri imageSource)
    {
        //Do some long running (async) work here.
        // Save work to disk
        // Return location of saved work
    }
    private IObservable<Uri> ZipFiles(Uri[] files)
    {
        //Run the zip process on all of the files
        //  Return the location of the zip file
    }
    private IObservable<Uri> UploadFile(Uri source)
    {
        //Upload the File. 
        //Probably as simple as as task based operation with .ToObservable()
    }