How to invoke actions on a continuous stream of JSON records using jq


I have a process that writes one JSON record to stdout every second. How can I use jq to trigger an action after every x records while still printing each record as it arrives?

To allow easy replay, the process is simulated here by a bash read loop with a sleep:
( read line ; while [[ -n "${line}" ]] ; do echo "${line}"; sleep 1 ; read line ; done ) < json.data
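
A slightly more defensive form of this replay loop, as a sketch only: the helper name `replay` and its optional delay argument are assumptions, not part of the original setup. Unlike the loop above, it runs to the end of the file rather than stopping at the first empty line.

```shell
# replay FILE [DELAY]: hypothetical helper that emits FILE line by line,
# pausing DELAY seconds (default 1) after each line.
replay() {
    local file=$1 delay=${2:-1} line
    # IFS= and -r keep leading whitespace and backslashes intact;
    # the extra test also emits a last line that lacks a trailing newline.
    while IFS= read -r line || [ -n "$line" ]; do
        printf '%s\n' "$line"
        sleep "$delay"
    done < "$file"
}
```

It would be called as `replay json.data` in place of the parenthesized loop, or as `replay json.data 0` to replay without delay while testing.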

The action after every x records is to count the records in that batch, ignoring records that contain null values.

When x is 1, that is, when every line is processed on its own, the solution is straightforward:

( read line ; while [[ -n "${line}" ]] ; do echo "${line}"; sleep 1 ; read line ; done ) < json.data | \
jq -c '. , ([.] | del( .[] | select(to_entries[].value == null)) | length)'
{"date":230415072207,"kwL1Tl":0,"kwL1":0.022,"kwL2Tl":0,"kwL2":0.075,"kwL3Tl":0,"kwL3":0}
1
{"date":230415072311,"kwL1Tl":0,"kwL1":0.023,"kwL2Tl":0,"kwL2":0.08,"kwL3Tl":0,"kwL3":0}  
1
{"date":230415072312,"kwL1Tl":null,"kwL1":null,"kwL2Tl":null,"kwL2":null,"kwL3Tl":null,"kwL3":null}  
0
{"date":230415072415,"kwL1Tl":0,"kwL1":0.023,"kwL2Tl":0,"kwL2":0.08,"kwL3Tl":0,"kwL3":0}  
1
{"date":230415072416,"kwL1Tl":0,"kwL1":0.022,"kwL2Tl":0,"kwL2":0.08,"kwL3Tl":0,"kwL3":0}  
1
...

But what should I do when I want this for every 5 or 100 records?

The expected output for x=5:

$ ( read line ; while [[ -n "${line}" ]] ; do echo "${line}"; sleep 1 ; read line ; done ) < json.data | \
alert
{"date":230415072207,"kwL1Tl":0,"kwL1":0.022,"kwL2Tl":0,"kwL2":0.075,"kwL3Tl":0,"kwL3":0}
{"date":230415072311,"kwL1Tl":0,"kwL1":0.023,"kwL2Tl":0,"kwL2":0.08,"kwL3Tl":0,"kwL3":0}
{"date":230415072312,"kwL1Tl":null,"kwL1":null,"kwL2Tl":null,"kwL2":null,"kwL3Tl":null,"kwL3":null}
{"date":230415072415,"kwL1Tl":0,"kwL1":0.023,"kwL2Tl":0,"kwL2":0.08,"kwL3Tl":0,"kwL3":0}
{"date":230415072416,"kwL1Tl":0,"kwL1":0.022,"kwL2Tl":0,"kwL2":0.08,"kwL3Tl":0,"kwL3":0}
4
{"date":230415072519,"kwL1Tl":0,"kwL1":0.021,"kwL2Tl":0,"kwL2":0.08,"kwL3Tl":0,"kwL3":0}
{"date":230415072520,"kwL1Tl":0,"kwL1":0.021,"kwL2Tl":0,"kwL2":0.08,"kwL3Tl":0,"kwL3":0}
{"date":230415072623,"kwL1Tl":0,"kwL1":0.022,"kwL2Tl":0,"kwL2":0.075,"kwL3Tl":0,"kwL3":0}
{"date":230415072624,"kwL1Tl":0,"kwL1":0.022,"kwL2Tl":0,"kwL2":0.075,"kwL3Tl":0,"kwL3":0}
{"date":230415072727,"kwL1Tl":0,"kwL1":0.022,"kwL2Tl":0,"kwL2":0.076,"kwL3Tl":0,"kwL3":0}
5
{"date":230415072728,"kwL1Tl":0,"kwL1":0.023,"kwL2Tl":0,"kwL2":0.076,"kwL3Tl":0,"kwL3":0}
{"date":230415072831,"kwL1Tl":0,"kwL1":0.023,"kwL2Tl":0,"kwL2":0.075,"kwL3Tl":0,"kwL3":0}
...

The bash function "alert" should contain a working jq command.


Solution

  • The following "alert" function does what is expected:

    alert(){
        #
        # send alerts if needed
        jq -nc --argjson m "5" '
            def alerts(r): r | del( .[] | select(to_entries[].value == null)) | length ; 
            foreach inputs as $in (
                 {"recs":[]};
                 if (.recs|length) < $m then .recs += [$in] else .recs=[$in] end;
                 .recs[-1], if (.recs|length) == $m then alerts(.recs) else empty end)
        '
    }
    

    Explaining it a bit:

    • "foreach inputs" iterates over the records as they arrive on stdin; the "-n" option keeps jq from consuming the first record itself.
    • The first "if" appends each incoming JSON record to the array "recs" until it holds $m records ($m is 5 here, set by the "--argjson m 5" option of jq). Once the array is full, the next record replaces it, starting a new array with that record as its first element.
    • ".recs[-1]" writes the most recently received record to the output ("$in" would work as well).
    • The second "if" calls the local function "alerts" once the array "recs" holds $m records.
    • The local jq function "alerts" outputs the number of records left in "recs" after removing every record in which one or more fields are null.
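
The steps above can also be sketched as a variant that takes the batch size as an argument. The name `alert_every` is hypothetical, and two things differ from the function above by assumption rather than necessity: the null check uses `all` instead of `del`, and `--unbuffered` is added so that each record is flushed immediately even when stdout is a pipe or a file.

```shell
# alert_every N: hypothetical variant of alert; N is the batch size (default 5).
alert_every() {
    # --unbuffered flushes after every output instead of waiting for a full buffer.
    jq -nc --unbuffered --argjson m "${1:-5}" '
        # Count the records in which no field is null.
        def alerts: map(select(all(.[]; . != null))) | length;
        foreach inputs as $in (
            [];                                            # state: current batch
            if length < $m then . + [$in] else [$in] end;  # grow or restart batch
            $in, (if length == $m then alerts else empty end))
    '
}
```

Piping the record stream into `alert_every 100` would then print every record as it arrives and a count after each 100th one.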