I'm trying to implement a crawler that visits a URL, collects new relative URLs from it, and builds a report. I'm trying to do it concurrently using Crystal fibers and channels, like this:
```crystal
urls = [...] # of String
visited_urls = [] of String # an empty array literal needs an element type in Crystal
pool_size.times do
  spawn do
    loop do
      url = urls.shift?
      break if url.nil?
      channel.send(url) if some_condition
    end
  end
end
# TODO: here the problem!
loop do
  url = channel.receive?
  break if url.nil? || channel.closed?
  visited_urls << url
end
puts visited_urls.inspect
```
But here I have a problem: the second loop never ends. It calls `channel.receive?` until the last item in the channel has been read and then waits for a new message that never arrives. This happens because I never know how many items will actually go through the channel, so I can't do what the Concurrency section of the Crystal language guides proposes.
Are there good practices for receiving from a channel when we don't know in advance how many items it will carry? Thanks!
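For context, `receive?` only returns `nil` once the channel has been closed; nothing in the question's code calls `close`, so the loop keeps waiting. The minimal sketch below (the values `"a"`, `"b"`, `"c"` and the name `received` are made up for illustration) shows a single producer closing the channel when it is finished, which lets the receiver loop end without knowing the item count.

```crystal
# A producer fiber sends a few values and then closes the channel.
channel = Channel(String).new

spawn do
  ["a", "b", "c"].each { |url| channel.send(url) }
  channel.close # tells the receiver that no more values will come
end

received = [] of String
# receive? returns nil once the channel is closed, so this loop
# terminates without knowing in advance how many values there are.
while url = channel.receive?
  received << url
end

puts received.inspect
```

This prints `["a", "b", "c"]`. With several producers, though, no single fiber knows when the others are finished, which is why the approaches below are needed.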
A common solution is to use a kill value (a sentinel), sent either as part of the main data flow, like this:
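```crystal
# POOL_SIZE and has_work? stand for your own pool size and work check.
results = Channel(String|Symbol).new(POOL_SIZE * 2)

POOL_SIZE.times do
  spawn do
    while has_work?
      results.send "some work result"
    end
    results.send :done # kill value: this worker has finished
  end
end

done_workers = 0
loop do
  message = results.receive
  if message == :done
    done_workers += 1
    # Each worker sent :done after its last result, so nothing is left.
    break if done_workers == POOL_SIZE
  elsif message.is_a? String
    puts "Got: #{message}"
  end
end
```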
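Here is a self-contained, runnable sketch of the kill-value approach. It assumes a hypothetical shared `jobs` array in place of `has_work?` and sets `POOL_SIZE = 3` arbitrarily; because fibers run on a single thread by default, `jobs.shift?` is not interrupted by another fiber mid-call.

```crystal
POOL_SIZE = 3

# Hypothetical work list standing in for has_work?.
jobs = (1..10).to_a

results = Channel(String | Symbol).new(POOL_SIZE * 2)

POOL_SIZE.times do
  spawn do
    while job = jobs.shift?
      results.send "result #{job}"
    end
    results.send :done # kill value: this worker has finished
  end
end

collected = [] of String
done_workers = 0
loop do
  message = results.receive
  if message == :done
    done_workers += 1
    break if done_workers == POOL_SIZE
  elsif message.is_a?(String)
    collected << message
  end
end

puts collected.size
```

Running it should print `10`. A channel delivers values in the order they were sent, and each worker sends `:done` only after its last result, so once all `POOL_SIZE` kill values have arrived every result has already been received.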
Or via a secondary channel that signals when each worker finishes:
```crystal
results = Channel(String).new(POOL_SIZE * 2)
done = Channel(Nil).new(POOL_SIZE) # one slot per worker, so done.send never blocks

POOL_SIZE.times do
  spawn do
    while has_work?
      results.send "some work result"
    end
    done.send nil # signal that this worker has finished
  end
end

done_workers = 0
loop do
  # Block until one of the two channels has a value.
  select
  when message = results.receive
    puts "Got: #{message}"
  when done.receive
    done_workers += 1
    break if done_workers == POOL_SIZE
  end
end
```
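A variation on the secondary-channel idea keeps the question's `receive?` loop: a separate coordinator fiber waits for every `done` signal and then closes `results`, which makes `receive?` return `nil`. This is a sketch only; the `jobs` array, `collected` and `POOL_SIZE = 3` are hypothetical stand-ins, as in the previous example.

```crystal
POOL_SIZE = 3

# Hypothetical work list standing in for has_work?.
jobs = (1..10).to_a

results = Channel(String).new # unbuffered: send returns only once received
done = Channel(Nil).new(POOL_SIZE)

POOL_SIZE.times do
  spawn do
    while job = jobs.shift?
      results.send "result #{job}"
    end
    done.send nil # this worker has finished
  end
end

# Coordinator fiber: once every worker has reported, no more sends can
# happen, so closing results is safe and ends the receive? loop below.
spawn do
  POOL_SIZE.times { done.receive }
  results.close
end

collected = [] of String
while message = results.receive?
  collected << message
end

puts collected.size
```

Running it should print `10`. The results channel is deliberately unbuffered here: each `send` completes only after the consumer has taken the value, so by the time a worker signals `done`, all of its results have been delivered and the coordinator's `close` cannot discard anything.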