Let's say we have a script which open a file, then read it line by line and print the line to the terminal. We have a sigle thread and a multithread version.
The problem is than the resulting output of both scripts is almost the same, but not exactly. In the multithread versions there are about ten lines which missed the first 2 chars. I mean, if the real line is something line "Stackoverflow rocks", I obtain "ackoverflow rocks".
I think that this is related to some race condition since if I adjust the parameters to create a lot of little workers, I get more faults than If I use less and bigger workers.
The single thread is like this:
$file = "some/file.txt";
open (INPUT, $file) or die "Error: $!\n";
while ($line = <STDIN>) {
print $line;
}
The multithread version make's use of the thread queue and this implementation is based on the @ikegami approach:
use threads qw( async );
use Thread::Queue 3.01 qw( );
use constant NUM_WORKERS => 4;
use constant WORK_UNIT_SIZE => 100000;
sub worker {
my ($job) = @_;
for (@$job) {
print $_;
}
}
my $q = Thread::Queue->new();
async { while (defined( my $job = $q->dequeue() )) { worker($job); } }
for 1..NUM_WORKERS;
my $done = 0;
while (!$done) {
my @lines;
while (@lines < WORK_UNIT_SIZE) {
my $line = <>;
if (!defined($line)) {
$done = 1;
last;
}
push @lines, $line;
}
$q->enqueue(\@lines) if @lines;
}
$q->end();
$_->join for threads->list;
I tried your program and got similar (wrong) results. Instead of Thread::Semaphore
I used lock
from threads::shared
around the print
as it's simpler to use than T::S, i.e.:
use threads;
use threads::shared;
...
my $mtx : shared;
sub worker
{
my ($job) = @_;
for (@$job) {
lock($mtx); # (b)locks
print $_;
# autom. unlocked here
}
}
...
The global variable $mtx
serves as a mutex. Its value doesn't matter, even undef
(like here) is ok.
The call to lock
blocks and returns only if no other threads currently holds the lock on that variable.
It automatically unlocks (and thus makes lock
return) when it goes out of scope. In this sample that happens
after every single iteration of the for
loop; there's no need for an extra {…}
block.
Now we have syncronized the print
calls…
But this didn't work either, because print
does buffered I/O (well, only O). So I forced unbuffered output:
use threads;
use threads::shared;
...
my $mtx : shared;
$| = 1; # force unbuffered output
sub worker
{
# as above
}
...
and then it worked. To my surprise I could then remove the lock
and it still worked. Perhaps by accident. Note that your script will run significantly slower without buffering.
My conclusion is: you're suffering from buffering.