I am fully aware that there are tonnes of articles explaining the inner workings of parent-child process dynamics. I have gone through them and got my program working almost the way I want it to. But there is one thing which is bugging me, and I am not able to understand it despite multiple tries.
Problem: Despite reaping the children, main is not waiting for all children to finish and exits prematurely. I believe I did make a proper exit from the child process and I have installed the REAPER in the child process - so how is main exiting before the child finishes?
Not looking for a solution here - but I need a new direction where I could bang my head for the next week. As of now, I feel I have exhausted my options; I have tried a great many things, but to no avail.
Some background about what I am trying to achieve:
All in all - I want all the children to finish, and only then do I want to proceed to do something further. Each child process spawns a bunch of threads, and those threads are properly joined by the said child process, which then proceeds to exit with exit(0).
The additional hoopla you might observe in the program is nothing but our requirement to hit 5 APIs (engines), but only with a fixed batch size at a time, say 10 for each. I launch a child process for each engine and a thread for each request; I then wait for all threads to finish, join them, and only then does the child process exit. Only then can I submit the next batch of requests to the same engine, and I do this for all engines until I exhaust my total requests, say 10000.
Each request may take anywhere from 1 second to 2 hours - basically they are CSV reports being fetched from an HTTP API.
My issue is that when I have exhausted my total set of requests, I am not able to make MAIN wait until all child processes have finished. This is weird and is the issue which I am trying to tackle.
Any ideas?
My Program OUTPUT:
[compuser@lenovoe470:little-stuff]$ perl 07--20190526-batch-processing-using-threads-with-busy-pool-detection-2.pl 12
26710: STARTING TASKS IN BATCHES
26710: RUNNING batch_engine 1_e1 tasks (1 2)
26710: RUNNING batch_engine 2_e2 tasks (3 4)
26710: RUNNING batch_engine 3_e3 tasks (5 6 7)
26710: BUSY_ENGINE: e1.
26710: BUSY_ENGINE: e2.
26710: BUSY_ENGINE: e3.
26710: BUSY_ENGINE: e1.
26710: BUSY_ENGINE: e2.
26710:26712: TASK_ORCHESTRATOR: >> finished batch_engine (2_e2) tasks (3 4)
26710: PID (26712) has finished with status (0). updating proc hash
26710: BUSY_ENGINE: e3.
26710:26713: TASK_ORCHESTRATOR: >> finished batch_engine (3_e3) tasks (5 6 7)
26710:26711: TASK_ORCHESTRATOR: >> finished batch_engine (1_e1) tasks (1 2)
26710: PID (26713) has finished with status (0). updating proc hash
26710: BUSY_ENGINE: e1.
26710: PID (26711) has finished with status (0). updating proc hash
26710: RUNNING batch_engine 4_e2 tasks (8 9)
26710: RUNNING batch_engine 5_e3 tasks (10 11 12)
26710: FINISHED TASKS IN BATCHES
[compuser@lenovoe470:little-stuff]$ 1:26722: TASK_ORCHESTRATOR: >> finished batch_engine (5_e3) tasks (10 11 12)
1:26721: TASK_ORCHESTRATOR: >> finished batch_engine (4_e2) tasks (8 9)
In above output:
My program:
#!/usr/bin/env perl
use strict;
use warnings;
use Data::Dumper;
use POSIX ':sys_wait_h';
use Thread qw(async);
STDOUT->autoflush(1);
# doesn't work
sub reaper {
my $reaped;
while (($reaped = waitpid (-1,&WNOHANG) > 0)) {
print "$$: reaped: $reaped\n";
sleep(1);
}
$SIG{CHLD} = \&reaper;
}
# doesn't work
my @total_tasks = (1 .. shift || 9);
my @engines = (qw/e1 e2 e3/);
my $sizes = { e1 => 2, e2 => 2, e3 => 3, };
my $proc_hash;
my $global_string = "ENGINE";
# source: https://duyanghao.github.io/ways_avoid_zombie_process/
#
sub REAPER {
local ($!, $?);
while ( (my $reaped_pid = waitpid(-1, WNOHANG)) > 0 ) {
if ( WIFEXITED($?) )
{
# my
my $ret_code = WEXITSTATUS($?);
print "$$: PID ($reaped_pid) has finished with status ($ret_code). updating proc hash\n";
my $engine_name = $proc_hash->{$reaped_pid};
delete ($proc_hash->{$reaped_pid});
delete ($proc_hash->{$engine_name});
# my
# original
#my $ret_code = WEXITSTATUS($?);
#print "child process:$pid exit with code:$ret_code\n";
# original
}
}
}
#
$SIG{CHLD} = \&REAPER;
sub random_sleep_time {
return (int(rand(5)+1))
#return (sprintf "%.2f",(rand(1)+1))
}
sub task_runner {
my @args = @_;
my ($batch_engine, $task) = ($args[0]->[0],$args[0]->[1]);
STDOUT->autoflush(1);
my $task_time = random_sleep_time();
sleep ($task_time);
threads->exit(0);
#print "$$:".(threads->tid()).": TASK_RUNNER: $global_string ($batch_engine) task ($task) finished in $task_time seconds\n";
#return;
};
sub task_orchestrator {
my ($batch_engine, @tasks) = @_;
my $engine = (split (/_/,$batch_engine))[1];
my $task_orch_pid = fork();
die "Failed to fork task_orchestrator\n" if not defined $task_orch_pid;
if ($task_orch_pid != 0) {
$proc_hash->{$engine} = $task_orch_pid;
$proc_hash->{$task_orch_pid} = $engine;
}
if ($task_orch_pid == 0) {
STDOUT->autoflush(1);
my @tids;
for (my $i=1 ; $i <= $#tasks ; $i++) { push (@tids,$i) }
foreach my $task_number (0 .. $#tasks) {
$tids [$task_number] = threads->create (
\&task_runner,[$batch_engine,$tasks [$task_number]]
);
}
my $ppid = getppid();
foreach my $tid (@tids) {$tid->join()}
print "$ppid:$$: TASK_ORCHESTRATOR: >> finished batch_engine ($batch_engine) tasks (@tasks)\n";
exit (0);
}
}
sub update_proc_hash {
my $finished_pid = waitpid (-1, POSIX->WNOHANG);
if ($finished_pid > 0) {
print "$$: PID ($finished_pid) has finished. updating proc hash\n";
my $engine_name = $proc_hash->{$finished_pid};
delete ($proc_hash->{$finished_pid});
delete ($proc_hash->{$engine_name});
}
}
my $batch=1;
print "$$: STARTING TASKS IN BATCHES\n";
while (@total_tasks) {
foreach my $engine (@engines) {
update_proc_hash();
if (exists $proc_hash->{$engine}) {
print "$$: BUSY_ENGINE: $engine.\n";
sleep (1);
next;
}
else {
my @engine_tasks;
my $engine_max_tasks = $sizes->{$engine};
while ($engine_max_tasks-- != 0) {
my $task = shift @total_tasks;
push (@engine_tasks,$task) if $task;
}
if (@engine_tasks) {
my $batch_engine = $batch.'_'.$engine;
print "$$: RUNNING batch_engine $batch_engine tasks (@engine_tasks)\n";
task_orchestrator ("$batch_engine",@engine_tasks);
$batch++;
}
}
}
}
REAPER();
print "$$: FINISHED TASKS IN BATCHES\n";
__END__
Update after 3 days: Thank you SO community. Once again, I am grateful to all of you who have taken out their time to look into this and helped spot and correct the problem. Thank you so much.
Allow me to share the new output with the final program for everyone's reference.
OUTPUT after using the fix:
User@Host:/cygdrive/c/bash-home> perl test.pl
22044: STARTING TASKS IN BATCHES
22044: MAIN: engine (e1) is RUNNING batch #1 tasks: (1 2)
22044: MAIN: engine (e2) is RUNNING batch #2 tasks: (3 4 5)
22044: MAIN: engine (e3) is RUNNING batch #3 tasks: (6 7)
41456: TASK_RUNNER: engine (e1) finished batch #1 task #1 in (1.80) seconds
41456: TASK_RUNNER: engine (e1) finished batch #1 task #2 in (1.31) seconds
41456: TASK_ORCHESTRATOR: engine (e1) finished batch #1 tasks in (1.00) seconds.
22044: REAPER: TASK_ORCHESTRATOR pid (41456) has finished with status (0).
18252: TASK_RUNNER: engine (e2) finished batch #2 task #3 in (1.04) seconds
18252: TASK_RUNNER: engine (e2) finished batch #2 task #4 in (1.91) seconds
18252: TASK_RUNNER: engine (e2) finished batch #2 task #5 in (1.63) seconds
18252: TASK_ORCHESTRATOR: engine (e2) finished batch #2 tasks in (1.00) seconds.
22044: REAPER: TASK_ORCHESTRATOR pid (18252) has finished with status (0).
14544: TASK_RUNNER: engine (e3) finished batch #3 task #6 in (1.42) seconds
14544: TASK_RUNNER: engine (e3) finished batch #3 task #7 in (1.84) seconds
14544: TASK_ORCHESTRATOR: engine (e3) finished batch #3 tasks in (1.00) seconds.
22044: REAPER: TASK_ORCHESTRATOR pid (14544) has finished with status (0).
22044: MAIN: engine (e1) is RUNNING batch #4 tasks: (8 9)
22044: MAIN: engine (e2) is RUNNING batch #5 tasks: (10)
37612: TASK_RUNNER: engine (e1) finished batch #4 task #8 in (1.19) seconds
37612: TASK_RUNNER: engine (e1) finished batch #4 task #9 in (1.31) seconds
37612: TASK_ORCHESTRATOR: engine (e1) finished batch #4 tasks in (1.00) seconds.
16300: TASK_RUNNER: engine (e2) finished batch #5 task #10 in (1.53) seconds
16300: TASK_ORCHESTRATOR: engine (e2) finished batch #5 tasks in (1.00) seconds.
22044: ALL ORCHESTRATORS HAVE FINISHED
22044: FINISHED TASKS IN BATCHES
FINAL Working PROGRAM:
#!/usr/bin/env perl
use strict;
use warnings;
use Data::Dumper;
use POSIX ':sys_wait_h';
use threads;
STDOUT->autoflush(1);
my @total_tasks = (1 .. 10);
my $sleep_time = 1;
my @engines = (qw/e1 e2 e3/);
my $sizes = {
e1 => 2,
e2 => 3,
e3 => 2,
};
my $proc_hash;
my $global_string = "engine";
sub REAPER {
local ($!, $?);
while ( (my $reaped_pid = waitpid(-1, WNOHANG)) > 0 ) {
if ( WIFEXITED($?) ) {
my $ret_code = WEXITSTATUS($?);
print "$$: REAPER: TASK_ORCHESTRATOR pid ($reaped_pid) has finished with status ($ret_code).\n";
my $engine_name = $proc_hash->{$reaped_pid};
delete ($proc_hash->{$reaped_pid});
delete ($proc_hash->{$engine_name});
}
}
}
$SIG{CHLD} = \&REAPER;
sub random_sleep_time { return sprintf ("%.2f",(rand ($sleep_time||5) + 1)) }
sub task_runner {
STDOUT->autoflush(1);
my @args = @_;
my ($batch_engine, $task) = ($args[0]->[0],$args[0]->[1]);
my ($batch, $engine) = split (/_/,$batch_engine);
my $task_time = random_sleep_time();
sleep ($task_time);
print "$$: TASK_RUNNER: $global_string ($engine) finished batch #$batch task #$task in ($task_time) seconds\n";
threads->exit(0);
};
sub task_orchestrator {
my ($batch_engine, @tasks) = @_;
my ($batch, $engine) = split (/_/,$batch_engine);
my $task_orch_pid = fork();
die "Failed to fork task_orchestrator\n" if not defined $task_orch_pid;
if ($task_orch_pid != 0) {
$proc_hash->{$engine} = $task_orch_pid;
$proc_hash->{$task_orch_pid} = $engine;
}
if ($task_orch_pid == 0) {
STDOUT->autoflush(1);
my @tids;
my $start_time = time;
for (my $i=1 ; $i <= $#tasks ; $i++) { push (@tids,$i) }
foreach my $task_number (0 .. $#tasks) {
$tids [$task_number] = threads->create (
\&task_runner,[$batch_engine,$tasks [$task_number]]
);
}
foreach my $tid (@tids) {$tid->join()}
my $end_time = time;
my $total_time = sprintf ("%.2f",($end_time - $start_time));
print "$$: TASK_ORCHESTRATOR: engine ($engine) finished batch #$batch tasks in ($total_time) seconds.\n";
exit (0);
}
}
my $batch=1;
print "$$: STARTING TASKS IN BATCHES\n";
while (@total_tasks)
{
foreach my $engine (@engines)
{
if (exists $proc_hash->{$engine})
{
sleep (1);
next;
}
else
{
my @engine_tasks;
my $engine_max_tasks = $sizes->{$engine};
while ($engine_max_tasks-- != 0)
{
my $task = shift @total_tasks;
push (@engine_tasks,$task) if $task;
}
if (@engine_tasks)
{
my $batch_engine = $batch.'_'.$engine;
print "$$: MAIN: engine ($engine) is RUNNING batch #$batch tasks: (@engine_tasks)\n";
task_orchestrator ($batch_engine,@engine_tasks);
$batch++;
}
}
}
}
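# All 3 below work properly
# NOTE: the built-in sleep truncates fractional seconds, so sleep (.2) does not
# pause at all unless a fractional sleep is imported (use Time::HiRes qw(sleep);)
#sleep (.2) while ((waitpid(-1, WNOHANG)) >= 0);
#sleep (.2) while ((waitpid(-1, WNOHANG)) != -1);
sleep (.2) while ((waitpid(-1, WNOHANG)) > -1);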
print "$$: ALL ORCHESTRATORS HAVE FINISHED\n";
print "$$: FINISHED TASKS IN BATCHES\n";
__END__
waitpid can return 0 if there are child processes matching PID but none have terminated yet. With -1 this applies to any child process, so your code with multiple children will surely encounter a zero return from the non-blocking waitpid in REAPER; this is precisely how we get to wait as long as there are non-terminated child processes. But your while loop exits at the first such zero.
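To make the three kinds of return value concrete, here is a minimal sketch. It assumes a POSIX-like system and a single child that lives for about two seconds; the variable names are made up for illustration. It captures what waitpid hands back while the child is still running, once it has exited, and when no children remain.

```perl
use strict;
use warnings;
use POSIX ':sys_wait_h';

# Fork one child that stays alive for a couple of seconds.
my $pid = fork // die "Can't fork: $!";
if ($pid == 0) {
    sleep 2;
    exit 0;
}

# Child still running: the non-blocking call returns 0 right away.
my $while_running = waitpid(-1, WNOHANG);

# A blocking call returns the child's PID once it has terminated.
my $after_exit = waitpid($pid, 0);

# No children left to wait for: the return is -1.
my $no_children = waitpid(-1, WNOHANG);

print "while running: $while_running\n";
print "after exit:    ", ($after_exit == $pid ? "the child's pid" : $after_exit), "\n";
print "no children:   $no_children\n";
```

A loop written as while (waitpid(-1, WNOHANG) > 0) stops at the first of these cases, which is why the parent falls through while children are still working.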
One way around this is to poll for non-negative returns:
use warnings;
use strict;
use feature 'say';
use POSIX ':sys_wait_h';
use Time::HiRes qw(sleep);

for (1..4) {
    my $pid = fork // die "Can't fork: $!";
    if ($pid == 0) {
        sleep rand 4;
        say "\tkid $$ exiting";
        exit;
    }
}

while ( (my $kid = waitpid -1, WNOHANG) > -1 ) {
    say "got $kid" if $kid > 0;
    sleep 0.2;
}
Prints
	kid 12687 exiting
got 12687
	kid 12690 exiting
got 12690
	kid 12689 exiting
got 12689
	kid 12688 exiting
got 12688
Please adjust the polling period as suitable. Note that since this catches any child processes it is possible for it to interfere with yet other forks, if there were any unwaited ones by that point.
Or you can block on the wait:
while ( (my $kid = waitpid -1, 0) > -1 ) {
    say "got $kid";
}
where you can now also test for > 0, as there will be no 0 returns here since the call blocks. As before, we only need the loop to terminate once -1 comes back (no more processes out there).
The major difference is that the block executes only once a child process actually exited, so if you need to keep tabs on what some long-running children are doing (and perhaps limit their run times or protect against hung jobs) that is not as easy in this form; you want a non-blocking operation for that.
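As a rough sketch of what the non-blocking form buys you, the loop below reaps children as they finish and also sends TERM to any child that has run longer than a limit. The limit ($max_runtime), the hash names, and the two stand-in children (one quick, one that sleeps long enough to count as hung) are all assumptions made for illustration, not part of the original program.

```perl
use strict;
use warnings;
use feature 'say';
use POSIX ':sys_wait_h';
use Time::HiRes qw(sleep time);

my $max_runtime = 1.0;    # hypothetical limit in seconds; tune to the job
my (%started, %signalled, %outcome);

for my $nap (0.2, 30) {   # one quick child, one standing in for a hung job
    my $pid = fork // die "Can't fork: $!";
    if ($pid == 0) { sleep $nap; exit 0 }
    $started{$pid} = time();
}

while (%started) {
    # Reap whatever has terminated so far; this never blocks.
    while ((my $kid = waitpid(-1, WNOHANG)) > 0) {
        my $sig = $? & 127;   # non-zero if a signal ended the child
        $outcome{$kid} = $sig
            ? "killed by signal $sig"
            : 'exited with status ' . ($? >> 8);
        say "$kid $outcome{$kid}";
        delete $started{$kid};
    }
    # Ask overdue children to stop (once each); a later pass reaps them.
    for my $pid (keys %started) {
        next if $signalled{$pid} || time() - $started{$pid} < $max_runtime;
        kill 'TERM', $pid;
        $signalled{$pid} = 1;
    }
    sleep 0.2;
}
```

With a blocking waitpid the parent would only get control back when some child exits, so there would be no natural place to run the timeout check.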
Note that some of the details, in particular those relating to return values, may vary across systems.
The naive version of this is to wait only for the specific PIDs, collected as you fork:
foreach my $pid (@pids) {
    my $gone = waitpid $pid, 0;
    say "Process $gone exited with $?" if $gone > 0;  # -1 if reaped already
}
which blocks with waitpid for each process. The problem with this is that if one process runs much longer than the others (or hangs), the loop is stuck waiting on it. And, in general, we would rather have child processes reaped as they exit than in the order in which they were started.
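One possible middle ground, sketched here under the assumption that you record each PID in a hash as you fork, is to block on any child but only count the ones you started. The %pending hash and the job labels are invented for this example; the children are arranged so that the last one forked finishes first.

```perl
use strict;
use warnings;
use feature 'say';

my %pending;    # pid => label, filled in as we fork (hypothetical bookkeeping)
for my $n (1 .. 3) {
    my $pid = fork // die "Can't fork: $!";
    if ($pid == 0) {
        sleep(4 - $n);    # later forks finish sooner
        exit $n;
    }
    $pending{$pid} = "job$n";
}

my @order;      # labels in the order the children actually exited
while (%pending) {
    my $gone = waitpid(-1, 0);             # blocks until any child exits
    last if $gone == -1;                   # nothing left to wait for
    next unless exists $pending{$gone};    # some other fork, not one of ours
    push @order, $pending{$gone};
    say "$pending{$gone} (pid $gone) exited with status ", $? >> 8;
    delete $pending{$gone};
}
```

Children are handled in the order they exit rather than the order they were forked, and the loop ends as soon as every tracked PID is accounted for.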