Is it ever safe to combine select(2) and buffered IO for file handles?


I am using IO::Select to keep track of a variable number of file handles for reading. Documentation I've come across strongly suggests not to combine the select statement with <> (readline) for reading from the file handles.

My situation:

I will only ever use each file handle once, i.e. when the select offers me the file handle, it will be completely used and then removed from the select. I will be receiving a hash and a variable number of files. I do not mind if this blocks for a time.

For more context, I am a client sending information to be processed by my servers. Each file handle is a different server I'm talking to. Once the server is finished, a hash result will be sent back to me from each one. Inside that hash is a number indicating the number of files to follow.

I wish to use readline in order to integrate with existing project code for transferring Perl objects and files.

Sample code:

my $read_set = IO::Select->new;
my $count = @agents_to_run; #array comes as an argument

for my $agent ( @agents_to_run ) {
    my ( $sock, $peerhost, $peerport )
        = server( $config_settings{ $agent }->{ 'Host' },
                  $config_settings{ $agent }->{ 'Port' } );
    $read_set->add( $sock );
}

while ( $count > 0 ) {
    my @rh_set = $read_set->can_read();

    for my $rh ( @rh_set ) {

            my %results = <$rh>;
            my $num_files = $results{'numFiles'};
            my @files = ();
            for (my $i = 0; $i < $num_files; $i++) {
                    $files[$i] = <$rh>;
            }
            #process results, close fh, decrement count, etc
    }
}

Solution

  • Using readline (aka <>) is quite wrong for two reasons: It's buffered, and it's blocking.


    Buffering is bad

    More precisely, buffering using buffers that cannot be inspected is bad.

    The system can do all the buffering it wants, since you can peek into its buffers using select.

    Perl's IO system cannot be allowed to do any buffering because you cannot peek into its buffers.

    Let's look at an example of what can happen using readline in a select loop.

    • "abc\ndef\n" arrives on the handle.
    • select notifies you that there is data to read.
    • readline will try to read a chunk from the handle.
    • "abc\ndef\n" will be placed in Perl's buffer for the handle.
    • readline will return "abc\n".

    At this point, you call select again, and you want it to let you know that there is more to read ("def\n"). However, select will report that there is nothing to read: select is a system call that only sees the system's buffers, and the data has already been moved out of them into Perl's buffer. That means you will have to wait for more data to come in before being able to read "def\n".

    The following program illustrates this:

    use IO::Select qw( );
    use IO::Handle qw( );
    
    sub producer {
        my ($fh) = @_;
        for (;;) {
            print($fh time(), "\n") or die;
            print($fh time(), "\n") or die;
            sleep(3);
        }
    }
    
    sub consumer {
        my ($fh) = @_;
        my $sel = IO::Select->new($fh);
        while ($sel->can_read()) {
            my $got = <$fh>;
            last if !defined($got);
            chomp $got;
            print("It took ", (time()-$got), " seconds to get the msg\n");
        }
    }
    
    pipe(my $rfh, my $wfh) or die;
    $wfh->autoflush(1);
    fork() ? producer($wfh) : consumer($rfh);
    

    Output:

    It took 0 seconds to get the msg
    It took 3 seconds to get the msg
    It took 0 seconds to get the msg
    It took 3 seconds to get the msg
    It took 0 seconds to get the msg
    ...
    

    This can be fixed using non-buffered IO:

    sub consumer {
        my ($fh) = @_;
        my $sel = IO::Select->new($fh);
        my $buf = '';
        while ($sel->can_read()) {
            sysread($fh, $buf, 64*1024, length($buf)) or last;
            # s/// returns the substitution count, not the capture, so use $1.
            while ($buf =~ s/^(.*)\n//) {
                my $got = $1;
                print("It took ", (time()-$got), " seconds to get the msg\n");
            }
        }
    }
    

    Output:

    It took 0 seconds to get the msg
    It took 0 seconds to get the msg
    It took 0 seconds to get the msg
    It took 0 seconds to get the msg
    It took 0 seconds to get the msg
    It took 0 seconds to get the msg
    ...
    

    Blocking is bad

    Let's look at an example of what can happen using readline in a select loop.

    • "abcdef" arrives on the handle.
    • select notifies you that there is data to read.
    • readline will try to read a chunk from the handle.
    • "abcdef" will be placed in Perl's buffer for the handle.
    • readline hasn't received a newline, so it tries to read another chunk from the handle.
    • There is no more data currently available, so it blocks.

    This defeats the purpose of using select.

    [ Demo code forthcoming ]
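
Until then, here is a minimal sketch of what such a demo might look like. It is not the author's code. It assumes a pipe and a forked child standing in for the remote end; the 2-second delay and the variable names ($blocked_for and so on) are made up for illustration. The child sends "abc" with no newline, waits, then sends the rest of the line. select reports the handle as readable as soon as "abc" arrives, yet readline still blocks until the newline shows up.

```perl
use strict;
use warnings;
use IO::Select ();
use IO::Handle ();
use Time::HiRes qw( time sleep );

pipe(my $rfh, my $wfh) or die "pipe: $!";
$wfh->autoflush(1);

my $pid = fork() // die "fork: $!";
if (!$pid) {
    # Child: plays the role of the slow sender.
    close($rfh);
    print($wfh "abc");        # Partial line, no newline yet.
    sleep(2);                 # Illustrative delay.
    print($wfh "def\n");      # The rest of the line arrives later.
    close($wfh);
    exit(0);
}
close($wfh);

my $sel = IO::Select->new($rfh);
$sel->can_read();             # Returns once "abc" is available.

my $start = time();
my $line  = <$rfh>;           # Blocks until the newline arrives.
my $blocked_for = time() - $start;

waitpid($pid, 0);
printf("readline blocked for about %.0f seconds\n", $blocked_for);
```

In a real select loop, every other handle would go unserviced for the whole time readline is stuck waiting here.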


    Solution

    You have to implement a version of readline that doesn't block, and only uses buffers you can inspect. The second part is easy because you can inspect the buffers you create.

    • Create a buffer for each handle.
    • When data arrives from a handle, read it but no more. When data is waiting (as we know from select), sysread will return what's available without waiting for more to arrive. That makes sysread perfect for this task.
    • Append the data read to the appropriate buffer.
    • For each complete message in the buffer, extract it and process it.

    Adding a handle:

    $select->add($fh);
    $clients{fileno($fh)} = {
        buf  => '',
        ...
    };
    

    select loop:

    use experimental qw( refaliasing declared_refs );
    
    while (my @ready = $select->can_read) {
        for my $fh (@ready) {
            my $client = $clients{fileno($fh)};
            my \$buf = \($client->{buf});  # Make $buf an alias for $client->{buf}
    
            my $rv = sysread($fh, $buf, 64*1024, length($buf));
            if (!$rv) {
                delete $clients{fileno($fh)};
                $select->remove($fh);
    
                if (!defined($rv)) {
                    ... # Handle error
                }
                elsif (length($buf)) {
                    ... # Handle eof with partial message
                }
                else {
                    ... # Handle eof
                }
    
                next;
            }
    
            # s/// returns the substitution count, not the capture, so use $1.
            while ($buf =~ s/^(.*)\n//) {
                my $msg = $1;
                ... # Process message.
            }
        }
    }
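
For readers who want something they can run as-is, the following is a self-contained sketch of the same approach under some loud assumptions: two forked children writing to pipes stand in for the servers, the names "alpha" and "beta" and the message text are invented, and a message is taken to be a newline-terminated line. It avoids the experimental aliasing feature by passing $client->{buf} to sysread directly.

```perl
use strict;
use warnings;
use IO::Select ();
use IO::Handle ();

my $select = IO::Select->new;
my %clients;    # fileno => { name => ..., buf => '' }
my @pids;

# Hypothetical setup: each child deliberately splits a line across two writes.
for my $name (qw( alpha beta )) {
    pipe(my $rfh, my $wfh) or die "pipe: $!";
    my $pid = fork() // die "fork: $!";
    if (!$pid) {
        close($rfh);
        $wfh->autoflush(1);
        print($wfh "$name line 1\n$name li");   # Ends mid-line.
        sleep(1);
        print($wfh "ne 2\n");                   # Completes the second line.
        close($wfh);
        exit(0);
    }
    close($wfh);
    push @pids, $pid;
    $select->add($rfh);
    $clients{fileno($rfh)} = { name => $name, buf => '' };
}

my @messages;
while ($select->count) {
    for my $fh ($select->can_read) {
        my $fd     = fileno($fh);
        my $client = $clients{$fd};

        # Append whatever is available right now to this handle's buffer.
        my $rv = sysread($fh, $client->{buf}, 64*1024, length($client->{buf}));
        die "sysread: $!" if !defined($rv);

        # Extract every complete line; a trailing partial line stays buffered.
        while ($client->{buf} =~ s/^(.*)\n//) {
            push @messages, "$client->{name}: $1";
        }

        if ($rv == 0) {    # EOF
            warn "$client->{name}: partial message discarded\n"
                if length($client->{buf});
            delete $clients{$fd};
            $select->remove($fh);
            close($fh);
        }
    }
}

waitpid($_, 0) for @pids;
print "$_\n" for @messages;
```

The order in which messages from different handles are collected depends on timing, so it should not be relied on.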
    

    By the way, this is much easier to do using threads, and the code above doesn't even handle writers!


    Note that IPC::Run can do all the hard work for you if you're communicating with a child process, and that asynchronous IO can be used as an alternative to select.