
Missing characters while reading input with threads


Let's say we have a script that opens a file, reads it line by line, and prints each line to the terminal. We have a single-threaded and a multithreaded version.

The problem is that the output of the two scripts is almost the same, but not exactly. In the multithreaded version, about ten lines are missing their first 2 characters. For example, if the real line is "Stackoverflow rocks", I get "ackoverflow rocks".

I think this is related to some race condition, since if I tune the parameters to create many small workers I get more faults than if I use fewer, larger ones.

The single-threaded version looks like this:

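use strict;
use warnings;

my $file = "some/file.txt";
open(my $input, '<', $file) or die "Error: $!\n";

# read from the handle that was just opened (the original read <STDIN> by mistake)
while (my $line = <$input>) {
    print $line;
}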

The multithreaded version uses Thread::Queue, and its implementation is based on @ikegami's approach:

use threads            qw( async );
use Thread::Queue 3.01 qw( );

use constant NUM_WORKERS    => 4;
use constant WORK_UNIT_SIZE => 100000;

sub worker {
    my ($job) = @_;
    for (@$job) {
        print $_;
    }
}

my $q = Thread::Queue->new();

async { while (defined( my $job = $q->dequeue() )) { worker($job); } }
    for 1..NUM_WORKERS;

my $done = 0;

while (!$done) {
    my @lines;

    while (@lines < WORK_UNIT_SIZE) {
        my $line = <>;
        if (!defined($line)) {
            $done = 1;
            last;
        }

        push @lines, $line;
    }

    $q->enqueue(\@lines) if @lines;
}

$q->end();
$_->join for threads->list;

Solution

  • I tried your program and got similar (wrong) results. Instead of Thread::Semaphore, I put a lock from threads::shared around the print, since it's simpler to use than Thread::Semaphore:

    use threads;
    use threads::shared;
    ...
    my $mtx : shared;
    
    sub worker
    {
        my ($job) = @_;
        for (@$job) {
            lock($mtx); # blocks until no other thread holds the lock
            print $_;
                        # lock is released automatically here
        }
    }
    ...
    

    The global variable $mtx serves as a mutex. Its value doesn't matter; even undef (as here) is fine. The call to lock blocks and returns only when no other thread currently holds the lock on that variable. The lock is released automatically when it goes out of scope, which lets a lock call waiting in another thread return. In this sample that happens after every single iteration of the for loop, so there's no need for an extra {…} block.

    Now we have synchronized the print calls…

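    But this didn't work either, because print does buffered I/O (well, only the O): the lock only guards the print call, which copies the line into an output buffer, while the actual write to the terminal happens later, when the buffer is flushed, and no lock covers that. So I forced unbuffered output: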

    use threads;
    use threads::shared;
    ...
    my $mtx : shared;
    $| = 1;  # force unbuffered output
    
    sub worker
    {
        # as above
    }
    ...
    

    and then it worked. To my surprise, I could then remove the lock and it still worked, though perhaps only by accident. Note that your script will run significantly slower without buffering.

    My conclusion is: you're suffering from buffering.
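For reference, here is one way the two changes (the lock around print and unbuffered output) might be combined into a single self-contained program. It is a sketch under assumptions that are not from the original: the input is a generated list of 20,000 lines instead of a file, output goes to a temporary file (via File::Temp) so it can be read back and checked, WORK_UNIT_SIZE is reduced to 1000, and the names $print_lock, @input, $out and $out_path are made up for this example. It needs a perl built with ithreads.

```perl
use strict;
use warnings;
use threads;                  # must be loaded before threads::shared
use threads::shared;
use Thread::Queue 3.01;       # 3.01+ provides ->end()
use IO::Handle;               # for ->autoflush
use File::Temp qw(tempfile);

use constant NUM_WORKERS    => 4;
use constant WORK_UNIT_SIZE => 1000;   # assumption: smaller than in the question

my $print_lock :shared;       # hypothetical name; its value is unused, it is only locked

# Hypothetical input standing in for the file: 20,000 numbered lines.
my @input = map { "Stackoverflow rocks $_\n" } 1 .. 20_000;

# Assumption: write to a temp file so the result can be checked afterwards.
my ($out, $out_path) = tempfile();
$out->autoflush(1);           # set before creating threads, so each cloned handle inherits it

sub worker {
    my ($job) = @_;
    for (@$job) {
        lock($print_lock);    # only one thread prints at a time
        print {$out} $_;      # with autoflush, the line is written out before the lock is released
    }                         # lock released at the end of each iteration
}

my $q = Thread::Queue->new();

my @workers;
for (1 .. NUM_WORKERS) {
    push @workers, threads->create(sub {
        while (defined(my $job = $q->dequeue())) { worker($job); }
    });
}

# Enqueue the input in chunks of WORK_UNIT_SIZE lines.
for (my $i = 0; $i < @input; $i += WORK_UNIT_SIZE) {
    my $last = $i + WORK_UNIT_SIZE - 1;
    $last = $#input if $last > $#input;
    $q->enqueue([ @input[$i .. $last] ]);
}
$q->end();
$_->join for @workers;
close $out or die "close failed: $!\n";

# Read the output back: lines may be reordered, but each must arrive intact.
open(my $in, '<', $out_path) or die "Cannot read $out_path: $!\n";
my @got = <$in>;
close $in;
unlink $out_path;

my $intact = join('', sort @got) eq join('', sort @input);
print scalar(@got), " lines written, ", ($intact ? "all intact" : "CORRUPTED"), "\n";
```

Because autoflush is on and the lock is held for each print, every line goes out as its own write before the next thread may print. The cost is one system call per line, which is where the slowdown mentioned above comes from.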