
non-blocking read of a pipe while doing other things


As a follow-up to process hangs when writing large data to pipe, I need to implement a way for a parent process to read from a pipe being written to by its child process, while doing other things until the child has completed.

More specifically, the parent is returning a response to a client over HTTP. The response consists of the string <PING/>, followed by the string <DONE/> when it is done pinging, followed by the actual content. This is being done to keep the connection alive until the actual response is ready.

My questions:

1) I'm mostly just looking for general feedback. Do you see any issues with this code?

2) Will I achieve my goal of a non-blocking read? In particular, once all of the currently available data has been read (but the writer is still writing more), will my code be able to move on from while ( my $line = <$pipe_reader> ) {? And will it work properly after the pipe has been closed but before the child terminates?

3) The documentation for IO::Select says that add() takes an IO::Handle object. I keep seeing IO::Handle everywhere, but I don't know how to determine if a pipe created in this way counts as an IO::Handle object. perl -e "pipe(my $r, my $w); print(ref($r))" just gives me GLOB...

4) The Perl documentation for select (on which I assume IO::Select is based) warns

WARNING: One should not attempt to mix buffered I/O (like read or readline) with select, except as permitted by POSIX, and even then only on POSIX systems. You have to use sysread instead.

Does this mean it is a problem to have $writer->write('<PING/>'); in the same loop?

Perl code

use POSIX qw(WNOHANG);
use IO::Select;

pipe(my $pipe_reader, my $pipe_writer);
$pipe_writer->autoflush(1);

my $pid = fork;

if ( $pid ) {

    # parent
    close $pipe_writer;

    my $s = IO::Select->new();
    $s->add($pipe_reader);

    my $response  = "";
    my $startTime = time;
    my $interval  = 25;
    my $pings     = 0;

    while ( waitpid(-1, WNOHANG) <= 0 ) {

        if ( time > $startTime + ($interval * $pings) ) {
            $pings++;
            $writer->write('<PING/>');
        }

        if ( $s->can_read(0) ) {

            while ( my $line = <$pipe_reader> ) {
                $response .= $line;
            }
        }
    }

    $writer->write('<DONE/>');
    $writer->write($response);
    close $pipe_reader;
    $writer->close();

} else {

    #child
    die "cannot fork: $!" unless defined $pid;
    close $pipe_reader;

    #...do writes here...

    close $pipe_writer;
}

Regarding $writer: it may be irrelevant to this question, but the overall solution follows the pattern in the second code sample here.

Since we aren't ready with the entire HTTP body yet, we return a callback to PSGI which gives us a $responder object. We give it just the HTTP status and content type, then it gives us a $writer to write the body later.

We use $writer in the above code to write our ping values and the eventual body. All of the above code is in the callback returned to PSGI but I omitted that for brevity.
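For reference, the delayed-response pattern looks roughly like this minimal sketch (the status, content type, and placement of the fork/ping loop are from this question; the callback/$responder/$writer interface is the standard PSGI streaming one):

my $app = sub {
    my $env = shift;

    return sub {                 # the callback returned to PSGI
        my $responder = shift;

        # send status and headers now; stream the body later
        my $writer = $responder->([ 200, [ 'Content-Type' => 'text/xml' ] ]);

        # ... fork the child and run the ping/read loop shown above,
        # using $writer for <PING/>, <DONE/> and the final content ...

        $writer->close();
    };
};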


Solution

  • The first issue here is that of the non-blocking operation. Other questions are addressed below.

    As you quote, with select (or IO::Select) one should not use buffered I/O, especially here where you want non-blocking and non-buffered operation. Reading with <> here gets terribly confused.

    Note that "buffering" is a multi-layered business. Some of it can be turned on and off by a simple program instruction, some is far more difficult to mess with, and some is a matter of implementation; it lives in the language, libraries, OS, and hardware. The least we can do is use the recommended tools.

    Thus read from the select-manipulated handle using sysread, not readline (which is what <> uses). It returns 0 on EOF, so one can tell when the writing end has been closed.

    use warnings;
    use strict;
    use feature 'say';
    
    use Time::HiRes qw(sleep);
    use IO::Select; 
    
    my $sel = IO::Select->new;
    
    pipe my $rd, my $wr;
    $sel->add($rd); 
    
    my $pid = fork // die "Can't fork: $!";
    
    if ($pid == 0) {
        close $rd; 
        $wr->autoflush;
        for (1..4) {
            sleep 1;
            say "\tsending data";
            say $wr 'a' x (120*1024);
        }
        say "\tClosing writer and exiting";
        close $wr;
        exit; 
    }   
    close $wr;    
    say "Forked and will read from $pid";
    
    my @recd;
    READ: while (1) {
        if (my @ready = $sel->can_read(0)) {  # beware of signal handlers
            foreach my $handle (@ready) {
                my $buff;
                my $rv = sysread $handle, $buff, 64*1024;
                if (not $rv) {  # error (undef) or closed writer (==0)
                    if (not defined $rv) {
                        warn "Error reading: $!";
                    }
                    last READ;  # single pipe (see text)
                }
                say "Got ", length $buff, " characters";
                push @recd, length $buff; 
            }
        }
        else {
            say "Doing else ... ";
            sleep 0.5; 
        }
    }   
    close $rd;
    my $gone = waitpid $pid, 0;
    say "Reaped pid $gone";
    say "Have data: @recd"
    

    This assumes that the parent doesn't do a lot of processing in the else branch, or the pipe checks would have to wait for it. In such a case you need to fork yet another process for those long jobs, as sketched below.
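    For instance, a minimal sketch of off-loading slow work (long_job() is a hypothetical stand-in for the actual processing):

    # run the long job in its own child so the select loop stays responsive
    my $worker_pid = fork // die "Can't fork: $!";
    if ($worker_pid == 0) {
        long_job();    # hypothetical slow work that would otherwise block
        exit;
    }
    # the parent returns to the can_read loop right away;
    # reap $worker_pid later, e.g. with waitpid($worker_pid, WNOHANG)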

    Some comments

    • I ask for a lot of data from sysread since that is the most efficient way to use it, and since you expect big writes from the child. You can see from the prints (a sample is below) how that works out.

    • An undefined return from sysread indicates an error. The pipe may still be readable, and if we went back to sysread via while we might end up in an infinite loop of errors, so we exit the loop. The error might not repeat on the next read, but counting on that risks an infinite loop; alternatively, introduce a counter and retry a few times.

    • On exceptional return (writer closed or error) the code exits the loop, as there is nothing more to do here. But with more complex IPC (more pipes, all of this inside another loop taking new connections, signal handlers, etc.) we would need to remove the handle from the list of those being watched, and the handling of a read error would differ from that of a closed writer.

    • In this simple example the error handling is simple (really just last READ if not $rv;). But in general a read error is a different matter from an orderly closed writer, and they are handled separately. (For one, on a read error we'd want to retry a fixed number of times, as sketched below.)
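      For instance, the read branch could distinguish the two cases along these lines (the $errors counter and $max_retries limit are illustrative, not from the original):

      my ($errors, $max_retries) = (0, 3);   # declared before the READ loop

      # ... inside the foreach over @ready:
      my $rv = sysread $handle, $buff, 64*1024;
      if (not defined $rv) {                 # read error
          warn "Error reading: $!";
          last READ if ++$errors >= $max_retries;
          next READ;                         # give it another chance
      }
      elsif ($rv == 0) {                     # orderly EOF: the writer closed
          last READ;
      }
      $errors = 0;                           # a successful read resets the count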

    • All data can be collected into $buff by using OFFSET, the fourth argument of sysread, set to length $buff. Then every read is placed at the end of $buff, which gets extended.

      my $rv = sysread $handle, $buff, 64*1024, length $buff;
      

    In this case there is no need for @recd. This is a common way to collect data; a sketch of the loop adjusted this way follows.
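      A minimal sketch of the read loop adjusted this way (same structure as the program above, with $buff declared outside the loop):

      my $buff = '';    # accumulates everything that is read

      READ: while (1) {
          if (my @ready = $sel->can_read(0)) {
              foreach my $handle (@ready) {
                  # the OFFSET of length $buff appends each read
                  my $rv = sysread $handle, $buff, 64*1024, length $buff;
                  if (not $rv) {
                      warn "Error reading: $!" if not defined $rv;
                      last READ;
                  }
                  say "Have ", length $buff, " characters so far";
              }
          }
          else { sleep 0.5 }    # the "doing other things" part
      }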

    • Signals are part and parcel of any IPC. A limited discussion follows.

    "Safe signals" generally protect I/O from being interrupted by a signal. But select may be affected

    Note that whether select gets restarted after signals (say, SIGALRM) is implementation-dependent.

    and thus calls that use it, like can_read, may not be safe either. In my experience can_read can return false when a SIGCHLD is handled by the program. This simple example is safe, for a few reasons:

    • If can_read returns empty as a signal is handled, the while brings it right back to that handle, which is still readable.

    • A signal can affect select if it arrives while the program is blocked at select. But here the operation is non-blocking, and the chance that a signal comes in right while select is checking the handles is minuscule.

    • Finally, I don't know whether a SIGCHLD for a process that wrote to a pipe can affect select on the other end of that very pipe, but even if it can, the odds are astronomically small.

    With more complex code (if can_read isn't directly in a loop like the above), consider whether its faulty return (due to a signal) can affect the program flow. If that is a concern, add code to check false returns from can_read: if it was caused by a signal, $! is EINTR. This can be checked using %!, which loads Errno when used, so you can test whether can_read returned because of an interrupt with if $!{EINTR}. For instance

    if (my @ready = $sel->can_read(0)) {
       ...
    }
    elsif ($!{EINTR}) { 
       # interrupted by signal, transfer control as suitable
       next READ;
    }
    

    Again, the program above returns to while promptly anyway (by assumption that the else block isn't meant for long-running jobs, for which there should be another process).

    A different matter is the SIGPIPE signal, which by default kills the program. Since you are dealing with pipes, it is only prudent to handle it by installing a signal handler

    $SIG{PIPE} = \&handle_sigpipe;
    

    where sub handle_sigpipe can do whatever the program needs. For example, it can set a global flag used to check the validity of the pipe, so that once the pipe has raised an error we don't try to read or write with it again. The very fact that we have assigned to $SIG{PIPE} protects us from that signal. However, unless the handler is 'IGNORE', can_read needs to be restarted as discussed above. See the follow-up post. A minimal handler is sketched below.
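    A minimal sketch of such a handler (the $pipe_broken flag is illustrative, not from the original):

    # mark the pipe as broken so later code can skip reads/writes on it
    our $pipe_broken = 0;

    sub handle_sigpipe {
        # a write hit a pipe whose read end has closed
        $pipe_broken = 1;
        warn "SIGPIPE caught\n";
    }
    $SIG{PIPE} = \&handle_sigpipe;

    # later, before writing:
    # print $wr $data unless $pipe_broken;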

    Comments on questions

    • Your code fragment won't be able to "move on" as intended since it uses <> to read. (Besides, you have a while loop over <> there, which does block. Once it reads what is available it will sit and wait until more comes. You want a single read instead, but again not with <>.)

    • Every filehandle is an IO::Handle (or IO::File) object, or at least gets blessed into those classes on demand. See (the second part of) this post; a quick check is sketched below.
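      For instance, one can peek at the IO slot of the glob (a quick sketch; on modern perls the IO object is blessed into IO::File, which inherits from IO::Handle):

      use feature 'say';
      use IO::File;    # loads IO::Handle too; a method call would autoload it anyway

      pipe(my $r, my $w);

      say ref $r;          # GLOB: the variable holds a glob reference
      say ref *$r{IO};     # IO::File: the IO object inside that glob
      say *$r{IO}->isa('IO::Handle') ? 'an IO::Handle' : 'not';  # an IO::Handle

      $r->blocking(1);     # so IO::Handle methods (and IO::Select->add) work on it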

    • The warning on not mixing buffered I/O with select relates to the filehandles that select watches. While it is crucial for the pipe, writing to that other service via $writer is unrelated.

    • Code comment: there is no need to condition all work on the child's exit. You need to watch for when the child closes the pipe; reap the process (collect its exit status) later.

    Another way to handle similar needs is to do each part of the work in its own fork, so the 'keep-alive' with your HTTP would run in yet another separate process. Then all child processes can be managed more simply by the parent, communicating over a socketpair, as sketched below.
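    A bare-bones socketpair sketch (the handle names are illustrative; see perlipc for the full treatment):

    use Socket;

    # a bidirectional channel, unlike the one-way pipe above
    socketpair(my $child_sock, my $parent_sock, AF_UNIX, SOCK_STREAM, PF_UNSPEC)
        or die "socketpair: $!";

    my $pid = fork // die "Can't fork: $!";
    if ($pid == 0) {
        close $child_sock;
        $parent_sock->autoflush(1);
        print $parent_sock "child says hi\n";
        close $parent_sock;
        exit;
    }
    close $parent_sock;
    my $msg = <$child_sock>;
    print "parent got: $msg";
    close $child_sock;
    waitpid $pid, 0;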

    See this post for a comparison of read and sysread that includes many relevant points.


    The code above prints

    Forked and will read from 4171
    Doing else ... 
    Doing else ... 
    Doing else ... 
            sending data
    Got 65536 characters
    Got 57345 characters
    Doing else ... 
    Doing else ... 
            sending data
    Got 65536 characters
    Got 57345 characters
    Doing else ... 
    Doing else ... 
            sending data
    Doing else ... 
    Got 65536 characters
    Got 40960 characters
    Got 16385 characters
    Doing else ... 
    Doing else ... 
            sending data
    Got 65536 characters
    Got 24576 characters
            Closing writer and exiting
    Got 32769 characters
    Doing else ... 
    Reaped pid 4171
    Have data: 65536 57345 65536 57345 65536 40960 16385 65536 24576 32769