
Bash/awk parallel: select a process for each line of a huge file


I am trying to send different lines of a very big file to different processes. To show my problem I have built a toy example where I have a file with 10 categories and I want to compute the standard deviation (sd) of the second column for each category. Please keep in mind that my real file has millions of very long lines, and the sd computation is in fact a more complex computation.

STEP 1: building a test file:

seq 1 1000 | awk '{print int(10*rand()),int(100*rand())}' > testfile

STEP 2: splitting according to column 1 (I want to compute the standard deviation of the second column for each distinct value of the first field):

cat testfile | awk '{print $2 >> "file"$1}'
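
Note: this awk keeps one output file open per category; with thousands of categories (2377 in my real data) some awk implementations may hit the open-files limit. A possible workaround (a sketch; slower, since the file is reopened for every line) is to close after each write:

awk '{f = "file" $1; print $2 >> f; close(f)}' testfile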

STEP 3

So now I can compute each standard deviation in parallel:

for i in $(seq 0 9); do
    cat file$i | awk '{s+=$1;ss+=$1*$1}END{a=s/NR;print sqrt((ss-a*a)/NR)}' > sd$i &
done
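
Since each awk runs in the background, a wait is needed before reading the results:

wait             # let all ten background jobs finish
cat sd{0..9}     # one standard deviation per category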

What I would like to do is to skip the file$i step and send my numbers directly to 10 processes while reading the initial file.

In a way it's a bit like using parallel, but instead of sending blocks of lines to processes it uses a field to send specific lines to specific processes.

To give an idea of the latest data I had to deal with: I have 13 million lines in 2377 categories. Each line has 30K fields on which I am computing stats using a specific bash command.

Please also help me formulate my question!


Solution

  • Parallelize stream processing using sed

    Yes, bash is the correct tool for running and interconnecting many subtasks.

    So in this sample, I will

    • create a dedicated sed script that will run, in parallel, as many subtasks as there are filters, then
    • run sed to distribute each line of input, according to the filter conditions, to the matching filter (or ignore it).

    Note: as the question doesn't ask about outputs, nothing is done here to manage them. You may have to handle them differently depending on your needs (separate files, line-by-line concatenation, simple concatenation, merging by target, splitting to different log files by a subsequent sed script, etc.).

    (Full bash script using sed at the end of this post!)

    Using sed for stream filtering

    a bit like using parallel but instead of sending blocks of lines to processes it's using a field to send some specific lines to specific processes.

    In this use case, having to filter a stream and distribute it to many subtasks, sed should be the quickest way: sed is a lot lighter than perl, and parallel is a perl script. Using sed will be noticeably quicker and will consume fewer resources! (See the comparison at the end of this answer.)
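
    To see the mechanism before generating the full command line, here is a hand-written two-tag miniature of the same construction (a sketch: it only counts lines per tag, to stay short):

    # lines tagged 0 go to fd 4, lines tagged 1 go to fd 5;
    # each fd is a process substitution running its own awk
    sed -n \
        -e '/^0/{s/^. //;w/dev/fd/4' -e '}' \
        -e '/^1/{s/^. //;w/dev/fd/5' -e '}' \
        4> >(exec awk '{c++}END{print "tag 0:",c,"lines"}') \
        5> >(exec awk '{c++}END{print "tag 1:",c,"lines"}') <testfile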

    First, prepare the sed command line:

    printf -v sedcmd ' -e \47};/^%d/{s/^. //;w/dev/fd/%d\47 %d> >(exec \
        awk \47{c++;s+=$1;ss+=$1*$1}END{a=s/c;print %d,sqrt((ss-a*a)/c)}\47) ' $(
        for ((i=0;i<10;i++)) { echo $i $((i+4)) $((i+4)) $i  ; })
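
    This works because printf recycles its format string as long as arguments remain; a quick standalone check (hypothetical values):

    printf -v demo '[tag=%s fd=%d] ' zero 4 one 5
    echo "$demo"    # [tag=zero fd=4] [tag=one fd=5]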
    

    Then the command is: eval sed -n "${sedcmd/\};} -e '};'" (the ${sedcmd/\};} expansion strips the spurious leading };, which has no open block to close, and the appended -e '};' closes the last block):

    eval sed -n "${sedcmd/\};} -e '};'" <testfile
    

    or

    eval sed -n "${sedcmd/\};} -e '};'" <testfile | cat
    

    Where $sedcmd looks like:

    $ echo -- "$sedcmd"
    --  -e '};/^0/{s/^. //;w/dev/fd/4' 4> >(exec \
        awk '{c++;s+=$1;ss+=$1*$1}END{a=s/c;print 0,sqrt((ss-a*a)/c)}')  -e '};/^1/{s/^. //;w/dev/fd/5' 5> >(exec \
        awk '{c++;s+=$1;ss+=$1*$1}END{a=s/c;print 1,sqrt((ss-a*a)/c)}')  -e '};/^2/{s/^. //;w/dev/fd/6' 6> >(exec \
        awk '{c++;s+=$1;ss+=$1*$1}END{a=s/c;print 2,sqrt((ss-a*a)/c)}')  -e '};/^3/{s/^. //;w/dev/fd/7' 7> >(exec \
        awk '{c++;s+=$1;ss+=$1*$1}END{a=s/c;print 3,sqrt((ss-a*a)/c)}')  -e '};/^4/{s/^. //;w/dev/fd/8' 8> >(exec \
        awk '{c++;s+=$1;ss+=$1*$1}END{a=s/c;print 4,sqrt((ss-a*a)/c)}')  -e '};/^5/{s/^. //;w/dev/fd/9' 9> >(exec \
        awk '{c++;s+=$1;ss+=$1*$1}END{a=s/c;print 5,sqrt((ss-a*a)/c)}')  -e '};/^6/{s/^. //;w/dev/fd/10' 10> >(exec \
        awk '{c++;s+=$1;ss+=$1*$1}END{a=s/c;print 6,sqrt((ss-a*a)/c)}')  -e '};/^7/{s/^. //;w/dev/fd/11' 11> >(exec \
        awk '{c++;s+=$1;ss+=$1*$1}END{a=s/c;print 7,sqrt((ss-a*a)/c)}')  -e '};/^8/{s/^. //;w/dev/fd/12' 12> >(exec \
        awk '{c++;s+=$1;ss+=$1*$1}END{a=s/c;print 8,sqrt((ss-a*a)/c)}')  -e '};/^9/{s/^. //;w/dev/fd/13' 13> >(exec \
        awk '{c++;s+=$1;ss+=$1*$1}END{a=s/c;print 9,sqrt((ss-a*a)/c)}') 
    

    Where

    • 4> >(exec awk ...) tells bash to open file descriptor 4 on a process substitution and run awk there,
    • -e "/^0/{s/^. //;w/dev/fd/4" -e "}" tells sed to strip the leading tag and space from lines which begin with 0 and write the rest to fd/4.
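
    The N> >(cmd) idiom can be tried standalone; a minimal demo (assuming bash; the fd number 4 is arbitrary):

    # open fd 4 on a process substitution, then point stdout at fd 4
    echo hello 4> >(tr a-z A-Z) >&4    # prints HELLO (possibly after the prompt returns)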

    parallel.sh full bash script (draft)

    Here is a full parallel-filtering bash script using sed:

    #!/bin/bash
    # parallel.sh - bash script for filtering/parallelising using sed.
    # (C) 2023 Felix Hauri - [email protected]
    # Licensed under terms of GPL v3. www.gnu.org
    
    prog=${0##*/}
    usage() {
        cat <<-EOUsage
            Usage: $prog -t <tags> [-b <re>] [-a <re>] command args
              -h                 show this
            -t <tags>   comma separated list of tags to send to separate tasks
                             or a single tag; '-t' option may be submitted multiple times
              -b <re>     sed regex to match before tags
              -a <re>     sed regex to match after tags
              command     Any command to be run once for each tag.
                            Special string "<RE>" will be replaced by current tag.
            EOUsage
    }
    die() {
        echo >&2 "ERROR $prog: $*"
        exit 1
    }
    
    while getopts "ht:a:b:" opt; do
        case $opt in
            h ) usage; exit ;;
        t ) IFS=, read -a crttags <<<"$OPTARG"
            tags+=("${crttags[@]}");;
            b ) before=$OPTARG ;;
            a ) after=$OPTARG ;;
            *) die Wrong argument. ;;
        esac
    done
    shift $((OPTIND-1))
    
    [[ -v tags ]] || die "No tags submitted"
    (( $# )) || die "No command submitted"
    
    sedcmd='' paren=''
    declare -i crtFd=4
    for re in "${tags[@]}";do
        printf -v crtcmd '%q ' "${@//\<RE\>/$re}"
        printf -v crtcmd ' -e \47%s/%s/{s/%s//;w/dev/fd/%d\47 %d> >(exec %s) ' \
               "$paren" "$before$re$after"{,} $crtFd $crtFd "$crtcmd"
        paren='};'
        sedcmd+="$crtcmd" crtFd+=1
    done
    sedcmd+=" -e '$paren'"
    
    eval sed -n "$sedcmd" 
    
    Running ./parallel.sh -h prints:

    Usage: parallel.sh -t <tags> [-b <re>] [-a <re>] command args
      -h          show this
      -t <tags>   comma separated list of tags to send to separate tasks
                    or a single tag; '-t' option may be submitted multiple times
      -b <re>     sed regex to match before tags
      -a <re>     sed regex to match after tags
      command     Any command to be run once for each tag.
                    Special string "<RE>" will be replaced by current tag.
    

    This script can be found here: parallel.sh.

    Tested with your use case:

     ./parallel.sh -t{0..9} -b ^ awk '{c++;s+=$1;ss+=$1*$1}END{a=s/c;print <RE>,sqrt((ss-a*a)/c)}' <testfile
    

    Notice the only change from your command line is print <RE>,sqrt..., where <RE> will be replaced by each tag (-t) in its respective subtask.

    9 55.6751
    8 58.0447
    7 55.6755
    6 58.3663
    5 58.696
    4 58.2724
    3 54.9797
    2 57.5355
    1 54.6131
    0 57.1334
    

    Comparison with GNU parallel

    Of course this is about line-buffered filtering; it is not suitable for block-buffered distribution!

    I've tested with a simple 1000-line random file:

    for ((i=1000;i--;)){ echo $((RANDOM%10)) $((RANDOM%100));} >testfile
    

    then using parallel:

    sd() {
      awk '{s+=$'$1';ss+=$'$1'*$'$1'}END{a=s/NR;print sqrt((ss-a*a)/NR)}'
    }
    export -f sd
    time parallel -j10 --colsep ' ' --bin 1 --pipe \
        --tagstring {%} sd 2 <testfile |sort 
    
    10      58.3703
    1       50.7911
    2       56.9009
    3       55.0832
    4       52.5365
    5       65.0864
    6       61.4079
    7       55.5353
    8       62.337
    9       51.2512
    
    real    0m0.488s
    user    0m1.158s
    sys     0m0.272s
    

    and using sed + bash:

    time ./parallel.sh -t{0..9} -b ^ awk \
      '{c++;s+=$1;ss+=$1*$1}END{a=s/c;print <RE>,sqrt((ss-a*a)/c)}' <testfile |
        sort
    
    0 58.3703
    1 50.7911
    2 56.9009
    3 55.0832
    4 52.5365
    5 65.0864
    6 61.4079
    7 55.5353
    8 62.337
    9 51.2512
    
    real    0m0.010s
    user    0m0.009s
    sys     0m0.000s
    

    Fortunately the computed results are the same! (The parallel version outputs 10 instead of 0.)

    Compared with GNU parallel, the bash+sed version

    • uses tags instead of job-slot numbers
    • uses a lot fewer system resources
    • is noticeably quicker

    Tests with bigger and smaller files:

                       Number of lines     Real     User   System
    parallel.sh            100'000'000   73.117   72.598    0.416
    parallel (perl)        100'000'000  129.264  383.701   36.319
    
    parallel.sh              1'000'000    0.744    0.728    0.013
    parallel (perl)          1'000'000    1.798    5.571    0.613
    
    parallel.sh                 10'000    0.018    0.007    0.009
    parallel (perl)             10'000    0.523    1.148    0.269
    

    Here is the output of ps --tty pts/4 fw while parallel.sh was running in pts/4:

       5352 pts/4    Ss     0:00 -bash
       5983 pts/4    S+     0:00  \_ /bin/bash ./parallel.sh -t0 -t1 -t2..
       5985 pts/4    R+     0:13  |   \_ sed -n -e /^0/{s/^0//;w/dev/fd/..
       5986 pts/4    S+     0:00  |       \_ awk {c++;s+=$1;ss+=$1*$1}EN..
       5987 pts/4    S+     0:00  |       \_ awk {c++;s+=$1;ss+=$1*$1}EN..
       5988 pts/4    S+     0:00  |       \_ awk {c++;s+=$1;ss+=$1*$1}EN..
       5989 pts/4    S+     0:00  |       \_ awk {c++;s+=$1;ss+=$1*$1}EN..
       5990 pts/4    S+     0:00  |       \_ awk {c++;s+=$1;ss+=$1*$1}EN..
       5991 pts/4    S+     0:00  |       \_ awk {c++;s+=$1;ss+=$1*$1}EN..
       5992 pts/4    S+     0:00  |       \_ awk {c++;s+=$1;ss+=$1*$1}EN..
       5993 pts/4    S+     0:00  |       \_ awk {c++;s+=$1;ss+=$1*$1}EN..
       5994 pts/4    S+     0:00  |       \_ awk {c++;s+=$1;ss+=$1*$1}EN..
       5995 pts/4    S+     0:00  |       \_ awk {c++;s+=$1;ss+=$1*$1}EN..
       5984 pts/4    S+     0:00  \_ sort
    

    Nothing snipped, that's all folks!

    Here, bash executes sed, which runs 10 awk processes, piped to sort. Looks OK!

    Here is the output of ps --tty pts/4 fw while parallel (perl) was running:

       5352 pts/4    Ss     0:00 -bash
       5777 pts/4    S+     0:00  \_ /usr/bin/perl /usr/bin/parallel -j1..
       5780 pts/4    R+     0:17  |   \_ /usr/bin/perl /usr/bin/parallel..
       5956 pts/4    R      0:16  |   |   \_ perl -e  use B; my $sep = s..
       5957 pts/4    R      0:16  |   |   \_ perl -e  use B; my $sep = s..
    snip 7 lines
       5965 pts/4    R      0:16  |   |   \_ perl -e  use B; my $sep = s..
       5793 pts/4    S      0:00  |   \_ /usr/bin/bash -c perl -e '{use ..
       5794 pts/4    S      0:00  |   |   \_ perl -e {use POSIX qw(:errn..
       5795 pts/4    S      0:00  |   |   \_ /usr/bin/bash -c perl -e '{..
       5796 pts/4    S      0:01  |   |       \_ awk {s+=$2;ss+=$2*$2}EN..
    snip 33 lines
       5852 pts/4    S      0:00  |   \_ /usr/bin/bash -c perl -e '{use ..
       5867 pts/4    S      0:00  |       \_ perl -e {use POSIX qw(:errn..
       5868 pts/4    S      0:00  |       \_ /usr/bin/bash -c perl -e '{..
       5870 pts/4    S      0:01  |           \_ awk {s+=$2;ss+=$2*$2}EN..
       5778 pts/4    S+     0:00  \_ sort
    

    Well!! 52 processes are executed in order to fork one stream to 10 subprocesses!! Does each subprocess really require 5 subtasks?!

    Use case:

    Quick demo on a log file:

    { tags=($(cut -d\[ -f 1 | cut -d\  -f 5 | sort -u)) ;} <daemon.log 
    ./parallel.sh "${tags[@]/#/-t}" -b \\b -a \\[ bash -c \
        $'printf \47 - %-20s  %8d %8d %8d\\n\47 "$1" $(wc)' -- "<RE>" <daemon.log  |
        sort
    

    This will run as many tasks as there are tags, then output wc counts for each sub-stream.

    Note the syntax: ${tags[@]/#/-t} will be expanded to -tdhclient -tdnsmasq -tsystemd ....
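
    A quick way to check that expansion:

    $ tags=(dhclient dnsmasq systemd)
    $ echo "${tags[@]/#/-t}"
    -tdhclient -tdnsmasq -tsystemd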

     - accton                      14      154     1165
     - dbus-daemon                 80     1273    13731
     - dhclient                  6480    79920   542160
     - dnsmasq                   6480    49680   401760
     - systemd                 154608  1474418 10664639
    ...
    

    But you could create different filters for different targets:

    tags=( dhclient dnsmasq systemd )
    ./parallel.sh ${tags[@]/#/-t} -b \\b -a \\[ \
            "./filter-<RE>.sh" <daemon.log
    

    This will run 3 different tasks, ./filter-dnsmasq.sh, ./filter-dhclient.sh and ./filter-systemd.sh, then parse the log file to send the watched lines to their specific tasks.
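
    Each filter is simply an executable reading stdin; a hypothetical ./filter-dnsmasq.sh could be as small as:

    #!/bin/bash
    # filter-dnsmasq.sh (hypothetical example): summarize the dnsmasq sub-stream
    awk '{n++} END{printf " - dnsmasq: %d lines\n", n}'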

    Remark about parallel output:

    Regarding Ole Tange's comment, this seems clear to me: if you ask many tasks to write together to the same unique STDOUT, you may observe strange mixed lines!

    If your filters have to output continuously, you have to manage their output in a proper way!

    ./parallel.sh -t{0..9} -b ^ -a ' ' <testfile sh \
        -c 'sed "s/^/group <RE> /" >/tmp/file-<RE>.txt'
    cat /tmp/file-?.txt
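
    Each /tmp/file-<tag>.txt then holds that tag's values one per line, prefixed by the group, e.g. (hypothetical values):

    group 0 42
    group 0 7
    ...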
    

    Note: I've made a sample in the spirit of the comments:
    parallel.sh ... | sort | uniq -c....
    For this to run, the script would be modified by adding FIFOs, to be merged by a parcat at the end of the script, something like:

    +++ parallel.sh   2023-03-07 19:17:08.976802098 +0100
    @@ -40,5 +40,8 @@
     for re in "${tags[@]}";do
    +    pasteFd=/tmp/fifo-pp-r$crtFd
    +    mkfifo $pasteFd
    +    pasteFds+=($pasteFd)
         printf -v crtcmd '%q ' "${@//\<RE\>/$re}"
    -    printf -v crtcmd ' -e \47%s/%s/{s/%s//;w/dev/fd/%d\47 %d> >(exec %s) ' \
    -    "$paren" "$before$re$after"{,} $crtFd $crtFd "$crtcmd"
    +    printf -v crtcmd ' -e \47%s/%s/{s/%s//;w/dev/fd/%d\47 %d> >(exec %s >%s) '\
    +    "$paren" "$before$re$after"{,} $crtFd $crtFd "$crtcmd" $pasteFd
         paren='};'
    @@ -47,3 +50,4 @@
     sedcmd+=" -e '$paren'"
    -
    -eval sed -n "$sedcmd" 
    +eval sed -n "$sedcmd" &
    +parcat ${pasteFds[@]}
    +rm ${pasteFds[@]}
    

    I will probably add some options for doing this properly in the final script (on my website).