Tags: shell, unix, curl, grep, xargs

How to process continuous stream output with the grep utility?


I have a requirement where my curl command receives continuous output from a streaming HTTP service. The stream never ends. I want to grep for a string in the stream and pipe the matches to another utility such as xargs (with echo, for example) for further continuous processing.

This is the output of the continuous stream, which keeps arriving until I stop the curl command.

curl -X "POST" "http://localhost:8088/query" --header "Content-Type: application/json" -d $'{"ksql": "select * from SENSOR_S EMIT CHANGES;","streamsProperties": {"ksql.streams.auto.offset.reset": "earliest"}}' -s -N 

[{"header":{"queryId":"none","schema":"`ROWTIME` BIGINT, `ROWKEY` STRING, `SENSOR_ID` STRING, `TEMP` BIGINT, `HUM` BIGINT"}},
{"row":{"columns":[1599624891102,"S2","S2",40,20]}},
{"row":{"columns":[1599624891113,"S1","S1",90,80]}},
{"row":{"columns":[1599624909117,"S2","S2",40,20]}},
{"row":{"columns":[1599624909125,"S1","S1",90,80]}},
{"row":{"columns":[1599625090320,"S2","S2",40,20]}},

Now, when I pipe the output to grep, it works as expected and I keep receiving new events as they arrive.

curl -X "POST" "http://localhost:8088/query" --header "Content-Type: application/json" -d $'{"ksql": "select * from SENSOR_S EMIT CHANGES;","streamsProperties": {"ksql.streams.auto.offset.reset": "earliest"}}' -s -N | grep S1

{"row":{"columns":[1599624891113,"S1","S1",90,80]}},
{"row":{"columns":[1599624909125,"S1","S1",90,80]}},

But when I pipe the grep output on to xargs and echo, nothing moves at all.

curl -X "POST" "http://localhost:8088/query" --header "Content-Type: application/json" -d $'{"ksql": "select * from SENSOR_S EMIT CHANGES;","streamsProperties": {"ksql.streams.auto.offset.reset": "earliest"}}' -s -N | grep S1 | xargs -I {} echo {} 
^C

When I remove grep from the middle, it works as expected.

curl -X "POST" "http://localhost:8088/query" --header "Content-Type: application/json" -d $'{"ksql": "select * from SENSOR_S EMIT CHANGES;","streamsProperties": {"ksql.streams.auto.offset.reset": "earliest"}}' -s -N | xargs -I {} echo {} 

[{header:{queryId:none,schema:`ROWTIME` BIGINT, `ROWKEY` STRING, `SENSOR_ID` STRING, `TEMP` BIGINT, `HUM` BIGINT}},
{row:{columns:[1599624891102,S2,S2,40,20]}},
{row:{columns:[1599624891113,S1,S1,90,80]}},
{row:{columns:[1599624909117,S2,S2,40,20]}},
{row:{columns:[1599624909125,S1,S1,90,80]}},
{row:{columns:[1599625090320,S2,S2,40,20]}},
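
Note that the double quotes disappear in this output: xargs performs its own quote processing on its input. With GNU xargs (an assumption; BSD xargs behaves differently), adding -d '\n' splits the input on newlines only and disables quote processing, so the JSON would pass through intact:

curl -X "POST" "http://localhost:8088/query" --header "Content-Type: application/json" -d $'{"ksql": "select * from SENSOR_S EMIT CHANGES;","streamsProperties": {"ksql.streams.auto.offset.reset": "earliest"}}' -s -N | xargs -d '\n' -I {} echo {}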

It looks like grep waits for its input to end before passing anything along. When I test the same pipeline with a finite input, it works as expected.

ls | grep sh | xargs -I {} echo {};

abcd.sh
123.sh
pqr.sh
xyz.sh
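
For what it's worth, the stall can be reproduced without curl at all. A minimal sketch using a synthetic never-ending stream (assuming GNU grep and a POSIX shell):

while true; do echo "S1 event"; sleep 1; done | grep S1 | xargs -I {} echo {}

Nothing appears for a long time, even though dropping the final xargs stage makes a match print every second.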

So, my questions are: Is my understanding correct? Is there a way for grep to keep passing output to subsequent commands in real time? I want to keep basic filtering logic out of the downstream scripting, hence my wish to make grep work here.

Thanks in advance!

Anurag


Solution

  • As suggested by @larsks, grep's --line-buffered option ("flush output on every line") works fine when I test a similar requirement to yours. By default, grep block-buffers its output when stdout is a pipe rather than a terminal, so matches only appear once the buffer fills; --line-buffered forces a flush after every line.

    So the command would be:

    curl -X "POST" "http://localhost:8088/query" --header "Content-Type: application/json" -d $'{"ksql": "select * from SENSOR_S EMIT CHANGES;","streamsProperties": {"ksql.streams.auto.offset.reset": "earliest"}}' -s -N | grep S1 --line-buffered | xargs -I {} echo {}

    I tested on the "/var/log/messages" file, which gets continuously updated, as follows:

    [root@project1-master ~]# tail -f /var/log/messages | grep journal --line-buffered | xargs -I {} echo {}
    Sep 11 11:15:47 project1-master journal: I0911 15:15:47.448254 1 node_lifecycle_controller.go:1429] Initializing eviction metric for zone:
    Sep 11 11:15:52 project1-master journal: I0911 15:15:52.448704 1 node_lifecycle_controller.go:1429] Initializing eviction metric for zone:
    Sep 11 11:15:54 project1-master journal: 2020-09-11 15:15:54.006 [INFO][46] felix/int_dataplane.go 1300: Applying dataplane updates
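
  • If a filter in the pipeline has no --line-buffered equivalent, GNU coreutils' stdbuf can force line-buffered stdout from the outside; a sketch of the same test under that assumption:

    tail -f /var/log/messages | stdbuf -oL grep journal | xargs -I {} echo {}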