c# async-await httpclient http-live-streaming m3u8

How to make sure that the data of multiple Async downloads are saved in the order they were started?


I'm writing a basic HTTP Live Streaming (HLS) downloader that re-downloads an m3u8 media playlist at the interval specified by "#EXT-X-TARGETDURATION" and then downloads the *.ts segments as they become available.

This is what the m3u8 media playlist might look like when first downloaded.

#EXTM3U
#EXT-X-VERSION:3
#EXT-X-TARGETDURATION:12
#EXT-X-MEDIA-SEQUENCE:1
#EXTINF:7.975,
http://website.com/segment_1.ts
#EXTINF:7.941,
http://website.com/segment_2.ts
#EXTINF:7.975,
http://website.com/segment_3.ts

I'd like to download these *.ts segments all at the same time with HttpClient async/await. The segments do not have the same size, so even though the download of "segment_1.ts" is started first, it might finish after the other two segments.

These segments are all part of one large video, so it's important that the data of the downloaded segments is written in the order the downloads were started, NOT in the order they finished.

My code below works perfectly fine if the segments are downloaded one after another, but not when multiple segments are downloaded at the same time, because sometimes they don't finish in the order they were started.

I thought about using Task.WhenAll, which guarantees the correct order, but I don't want to keep the downloaded segments in memory unnecessarily, because they can be a few megabytes in size. If the download of "segment_1.ts" does finish first, then it should be written to disk right away, without having to wait for the other segments to finish. Writing all the *.ts segments to separate files and joining them at the end is not an option either, because it would require double the disk space and the total video can be a few gigabytes in size.

I have no idea how to do this and I'm wondering if somebody can help me with that. I'm looking for a way that doesn't require me to create threads manually or block a ThreadPool thread for a long period of time.

Some of the code and exception handling have been removed to make it easier to see what is going on.

// Async BlockingCollection from the AsyncEx library
private AsyncCollection<byte[]> segmentDataQueue = new AsyncCollection<byte[]>();

public void Start()
{
    RunConsumer();
    RunProducer();
}

private async void RunProducer()
{
    while (!_isCancelled)
    {
        var response = await _client.GetAsync(_playlistBaseUri + _playlistFilename, _cts.Token).ConfigureAwait(false);
        var data = await response.Content.ReadAsStringAsync().ConfigureAwait(false);

        string[] lines = data.Split(new string[] { "\n" }, StringSplitOptions.RemoveEmptyEntries);
        if (!lines.Any() || lines[0] != "#EXTM3U")
            throw new Exception("Invalid m3u8 media playlist.");

        for (var i = 1; i < lines.Length; i++)
        {
           var line = lines[i];
           if (line.StartsWith("#EXT-X-TARGETDURATION"))
           {
               ParseTargetDuration(line);
           }
           else if (line.StartsWith("#EXT-X-MEDIA-SEQUENCE"))
           {
               ParseMediaSequence(line);
           }
           else if (!line.StartsWith("#"))
           {
               if (_isNewSegment)
               {

                   // Fire and forget
                   DownloadTsSegment(line);

               }
           }
        }

        // Wait until it's time to reload the m3u8 media playlist again
        await Task.Delay(_targetDuration * 1000, _cts.Token).ConfigureAwait(false);
    }
}

// async void. We never await this method, so we can download multiple segments at once
private async void DownloadTsSegment(string tsUrl)
{
    var response = await _client.GetAsync(tsUrl, _cts.Token).ConfigureAwait(false);
    var data = await response.Content.ReadAsByteArrayAsync().ConfigureAwait(false);

    // Add the downloaded segment data to the AsyncCollection
    await segmentDataQueue.AddAsync(data, _cts.Token).ConfigureAwait(false);
}

private async void RunConsumer()
{
    using (FileStream fs = new FileStream(_filePath, FileMode.Create, FileAccess.Write, FileShare.Read))
    {
        while (!_isCancelled)
        {
            // Wait until new segment data is added to the AsyncCollection and write it to disk
            var data = await segmentDataQueue.TakeAsync(_cts.Token).ConfigureAwait(false);
            await fs.WriteAsync(data, 0, data.Length).ConfigureAwait(false);
        }
    }
}

Solution

  • I don't think you need a producer/consumer queue at all here. However, I do think you should avoid "fire and forget".

    You can start them all at the same time and then await them in the order they were started, writing each segment as soon as it and every segment before it have completed.

    First, define how to download a single segment:

    private async Task<byte[]> DownloadTsSegmentAsync(string tsUrl)
    {
      var response = await _client.GetAsync(tsUrl, _cts.Token).ConfigureAwait(false);
      return await response.Content.ReadAsByteArrayAsync().ConfigureAwait(false);
    }
    

    Then add the parsing of the playlist which results in a list of segment downloads (which are all in progress already):

    private List<Task<byte[]>> DownloadTasks(string data)
    {
      var result = new List<Task<byte[]>>();
      string[] lines = data.Split(new string[] { "\n" }, StringSplitOptions.RemoveEmptyEntries);
      if (!lines.Any() || lines[0] != "#EXTM3U")
        throw new Exception("Invalid m3u8 media playlist.");
      ...
               if (_isNewSegment)
               {
                 result.Add(DownloadTsSegmentAsync(line));
               }
      ...
      return result;
    }
    

    Consume this list one at a time (in order) by writing to a file:

    private async Task RunConsumerAsync(List<Task<byte[]>> downloads)
    {
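      // Append rather than Create: this method runs once per playlist reload, and
      // FileMode.Create would truncate the file each time. Delete any stale file before the first run.
      using (FileStream fs = new FileStream(_filePath, FileMode.Append, FileAccess.Write, FileShare.Read))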
      {
        foreach (var task in downloads)
        {
          var data = await task.ConfigureAwait(false);
          await fs.WriteAsync(data, 0, data.Length).ConfigureAwait(false);
        }
      }
    }
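
    The ordering behaviour of that loop can be checked without any network access. The following is only a minimal sketch: `OrderedWriteDemo` and `FakeDownloadAsync` are hypothetical names, `Task.Delay` stands in for the real HTTP download, and a `MemoryStream` stands in for the file.

    ```csharp
    using System;
    using System.Collections.Generic;
    using System.IO;
    using System.Threading.Tasks;

    public static class OrderedWriteDemo
    {
        // Hypothetical stand-in for a segment download: waits delayMs, then returns one byte.
        private static async Task<byte[]> FakeDownloadAsync(byte id, int delayMs)
        {
            await Task.Delay(delayMs).ConfigureAwait(false);
            return new[] { id };
        }

        public static async Task<byte[]> RunAsync()
        {
            // All three "downloads" start immediately; segment 1 is the slowest.
            var downloads = new List<Task<byte[]>>
            {
                FakeDownloadAsync(1, 300),
                FakeDownloadAsync(2, 100),
                FakeDownloadAsync(3, 200),
            };

            using (var output = new MemoryStream())
            {
                // Awaiting in list order writes 1, 2, 3 regardless of completion order.
                foreach (var task in downloads)
                {
                    var data = await task.ConfigureAwait(false);
                    await output.WriteAsync(data, 0, data.Length).ConfigureAwait(false);
                }
                return output.ToArray();
            }
        }

        public static async Task Main()
        {
            var bytes = await RunAsync();
            Console.WriteLine(string.Join(",", bytes)); // prints "1,2,3"
        }
    }
    ```

    Segments 2 and 3 finish before segment 1 here, so their data stays in memory until segment 1 has been written. That is the trade-off of awaiting in start order.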
    

    And kick it all off with a producer:

    public async Task RunAsync()
    {
      // TODO: consider CancellationToken instead of a boolean.
      while (!_isCancelled)
      {
        var response = await _client.GetAsync(_playlistBaseUri + _playlistFilename, _cts.Token).ConfigureAwait(false);
        var data = await response.Content.ReadAsStringAsync().ConfigureAwait(false);
    
        var tasks = DownloadTasks(data);
        await RunConsumerAsync(tasks).ConfigureAwait(false);
    
        await Task.Delay(_targetDuration * 1000, _cts.Token).ConfigureAwait(false);
      }
    }
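
    The TODO in that loop suggests a CancellationToken instead of the boolean flag. One possible shape for such a loop is sketched below. `CancellableLoopDemo`, `PollAsync` and the reload counter are hypothetical and only illustrate the control flow; the counter stands in for downloading and processing the playlist.

    ```csharp
    using System;
    using System.Threading;
    using System.Threading.Tasks;

    public static class CancellableLoopDemo
    {
        // Hypothetical polling loop: counts "reloads" until the token is cancelled.
        public static async Task<int> PollAsync(TimeSpan interval, CancellationToken token)
        {
            var reloads = 0;
            try
            {
                while (!token.IsCancellationRequested)
                {
                    reloads++; // stands in for downloading and processing the playlist
                    await Task.Delay(interval, token).ConfigureAwait(false);
                }
            }
            catch (OperationCanceledException)
            {
                // Task.Delay throws when cancelled mid-wait; treat that as a normal stop.
            }
            return reloads;
        }
    }
    ```

    Cancelling the token source ends the wait immediately, so the loop does not have to sit out the rest of the target duration before it notices the stop request.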
    

    Note that this solution does run all downloads concurrently, and this can cause memory pressure. If this is a problem, I recommend you restructure to use TPL Dataflow, which has built-in support for throttling.
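
    A rough sketch of what a throttled, order-preserving TPL Dataflow pipeline could look like is shown below. It is an illustration under assumptions rather than a drop-in replacement: `ThrottledPipelineDemo` and `FakeDownloadAsync` are hypothetical, the limits of 2 and 4 are arbitrary, and depending on the target framework the `System.Threading.Tasks.Dataflow` NuGet package may need to be referenced.

    ```csharp
    using System;
    using System.IO;
    using System.Threading.Tasks;
    using System.Threading.Tasks.Dataflow;

    public static class ThrottledPipelineDemo
    {
        // Hypothetical stand-in for DownloadTsSegmentAsync; no network involved.
        private static async Task<byte[]> FakeDownloadAsync(int id)
        {
            // Earlier segments take longer, so they tend to finish after later ones.
            await Task.Delay(50 * (6 - id)).ConfigureAwait(false);
            return new[] { (byte)id };
        }

        public static async Task<byte[]> RunAsync()
        {
            var output = new MemoryStream(); // stands in for the FileStream

            // Downloads run at most two at a time. TransformBlock emits results
            // in input order by default (EnsureOrdered is true).
            var download = new TransformBlock<int, byte[]>(
                id => FakeDownloadAsync(id),
                new ExecutionDataflowBlockOptions
                {
                    MaxDegreeOfParallelism = 2,
                    BoundedCapacity = 4 // caps queued, in-flight and finished segments
                });

            // Single writer (default parallelism of 1), so writes never interleave.
            var write = new ActionBlock<byte[]>(
                async data => await output.WriteAsync(data, 0, data.Length).ConfigureAwait(false),
                new ExecutionDataflowBlockOptions { BoundedCapacity = 1 });

            download.LinkTo(write, new DataflowLinkOptions { PropagateCompletion = true });

            // SendAsync waits asynchronously whenever the download block is full.
            for (var id = 1; id <= 5; id++)
                await download.SendAsync(id).ConfigureAwait(false);

            download.Complete();
            await write.Completion.ConfigureAwait(false);
            return output.ToArray();
        }
    }
    ```

    In this sketch `BoundedCapacity` is what limits how many segments are held in memory at once, and `MaxDegreeOfParallelism` limits the number of simultaneous downloads. Both values would need tuning for real segment sizes.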