
How to read all the items from the Channel when you don't know their actual count


I'm trying to implement a crawler that visits a URL, collects the new relative URLs found on it and builds a report. I want to do this concurrently using Crystal fibers and channels, like this:

urls = [...] # of String
visited_urls = [] of String
channel = Channel(String).new

pool_size.times do
  spawn do
    loop do
      url = urls.shift?
      break if url.nil?

      channel.send(url) if some_condition
    end
  end
end

# TODO: here is the problem!
loop do
  url = channel.receive?
  break if url.nil? || channel.closed?

  visited_urls << url
end

puts visited_urls.inspect

But here I have a problem: the second loop never ends. It calls channel.receive? until the last item in the channel has been consumed and then waits for a new message that never arrives. The issue exists because I never know how many items will actually be in the channel, so I can't do what the Concurrency section of the Crystal language guides proposes.

Are there good practices for working with a channel when we don't know how many items it will carry and we need to receive all of them? Thanks!
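For context, `Channel#receive?` returns `nil` only once the channel has been closed and everything already sent to it has been received. Nothing in the code above ever calls `channel.close`, so after the workers stop sending, `receive?` simply keeps waiting. The minimal sketch below (the values `"a"` and `"b"` are arbitrary) shows that a loop over `receive?` does terminate once the channel is closed:

```crystal
# A buffered channel so the main fiber can send without a receiver waiting.
channel = Channel(String).new(2)
channel.send "a"
channel.send "b"

# Without this close, the third receive? below would block forever.
channel.close

received = [] of String
# Values sent before close are still delivered; after that receive? returns nil.
while value = channel.receive?
  received << value
end

puts received.inspect
```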


Solution

  • A common solution is to use a kill value (a sentinel meaning "this worker is finished"), either as part of the main data flow, like this:

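    # The channel carries either a work result (String) or the kill value (:done).
    results = Channel(String | Symbol).new(POOL_SIZE * 2)
    
    POOL_SIZE.times do
      spawn do
        # has_work? is a placeholder for your own "is there more to do?" check.
        while has_work?
          results.send "some work result"
        end
    
        # Kill value: tells the consumer that this worker has finished.
        results.send :done
      end
    end
    
    done_workers = 0
    
    loop do
      message = results.receive
      if message == :done
        # Stop once every worker has sent its kill value.
        done_workers += 1
        break if done_workers == POOL_SIZE
      elsif message.is_a? String
        puts "Got: #{message}"
      end
    end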
    

    Or use a secondary channel to signal that a worker has finished:

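    results = Channel(String).new(POOL_SIZE * 2)
    # One slot per worker, so no worker blocks when reporting that it is done.
    done = Channel(Nil).new(POOL_SIZE)
    
    POOL_SIZE.times do
      spawn do
        # has_work? is a placeholder for your own "is there more to do?" check.
        while has_work?
          results.send "some work result"
        end
    
        done.send nil
      end
    end
    
    done_workers = 0
    loop do
      # Wait on whichever channel has something ready.
      select
      when message = results.receive
        puts "Got: #{message}"
      when done.receive
        done_workers += 1
        break if done_workers == POOL_SIZE
      end
    end
    
    # Defensive: non-blocking drain of any results still buffered, in case
    # select took the last `done` before them.
    loop do
      select
      when message = results.receive
        puts "Got: #{message}"
      else
        break
      end
    end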
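The kill-value approach can be run end to end as the sketch below, assuming a hypothetical `jobs` array as the source of work in place of the answer's `has_work?` placeholder, and an arbitrary `POOL_SIZE` of 3. It relies on each worker sending its `:done` only after all of its results, and on the channel delivering values in order:

```crystal
POOL_SIZE = 3                # hypothetical pool size
jobs      = (1..10).to_a     # hypothetical work queue replacing has_work?

results = Channel(String | Symbol).new(POOL_SIZE * 2)

POOL_SIZE.times do
  spawn do
    # With the default single-threaded scheduler, fibers only switch at
    # blocking points, so shift? is not interrupted by another worker.
    while job = jobs.shift?
      results.send "result #{job}"
    end
    results.send :done # kill value for this worker
  end
end

collected    = [] of String
done_workers = 0

loop do
  message = results.receive
  case message
  when :done
    done_workers += 1
    break if done_workers == POOL_SIZE
  when String
    collected << message
  end
end

puts collected.size
```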
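A related technique, not part of the answer above, is to close the results channel once every worker is finished. The consumer can then keep the question's `while url = channel.receive?` loop unchanged, because `receive?` returns `nil` after the close. In this sketch `urls`, `pool_size` and the `includes?("skip")` filter are hypothetical stand-ins for the crawler's real data and `some_condition`; a separate coordinator fiber collects one signal per worker and then calls `close`:

```crystal
urls      = ["/a", "/b", "/skip", "/c"] # hypothetical input
pool_size = 2                           # hypothetical pool size

channel  = Channel(String).new
finished = Channel(Nil).new(pool_size)

pool_size.times do
  spawn do
    while path = urls.shift?
      # Stand-in for the question's some_condition.
      channel.send(path) unless path.includes?("skip")
    end
    finished.send nil # this worker has sent everything it will send
  end
end

# Coordinator fiber: once every worker has reported, close the channel.
spawn do
  pool_size.times { finished.receive }
  channel.close
end

visited_urls = [] of String
# The loop ends when receive? returns nil, i.e. after the close.
while url = channel.receive?
  visited_urls << url
end

puts visited_urls.inspect
```

The order of `visited_urls` depends on how the fibers are scheduled; only its contents are fixed. Closing the channel from a single coordinator, rather than from a worker, avoids one worker closing it while another is still sending.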