I want to read the output of a tcpdump subprocess line by line in (near) real time, but I also need to be able to check whether the pipe (and therefore the queue) is empty. The thread waits 0.5 seconds, fetches all queued lines of output, processes them (e.g. the mean distribution of packets over those 0.5 seconds) and returns a result.
Minimal non-working example:
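    import subprocess
    import threading
    import time
    from subprocess import Popen, STDOUT
    from threading import Thread
    try:
        from queue import Queue, Empty  # Python 3
    except ImportError:
        from Queue import Queue, Empty  # Python 2

    # current wall-clock time in milliseconds
    millis = lambda: int(round(time.time() * 1000))

    def enqueue_output(out, queue):
        # blocks on readline() and timestamps each line as it arrives
        for line in iter(out.readline, b''):
            print(millis())
            print(line)
            queue.put(line)
        out.close()

    def infiniteloop1():
        p = Popen(['sudo', 'tcpdump', '-i', 'wlan0', '-nn', '-s0', '-q', '-l', '-p', '-S'],
                  stdout=subprocess.PIPE, stderr=STDOUT)
        q = Queue()
        t = Thread(target=enqueue_output, args=(p.stdout, q))
        t.daemon = True  # thread dies with the program
        t.start()
        while True:
            # drain everything that is currently queued
            while True:
                # read line without blocking
                try:
                    row = q.get_nowait()  # or q.get(timeout=.1)
                except Empty:
                    print('empty')
                    break
                else:
                    pass
            time.sleep(0.5)

    thread1 = threading.Thread(target=infiniteloop1)
    thread1.daemon = True
    thread1.start()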
The output while capturing a continuous stream of packets:
[...]
1552905183422
10:33:03.334167 IP 192.168.1.2.36189 > a.b.c.d.443: tcp 437
1552905183422
10:33:03.357215 IP a.b.c.d.443 > 192.168.1.2.36189: tcp 0
1552905183423
10:33:03.385145 IP 192.168.1.2.36189 > a.b.c.d.443: tcp 437
empty
empty
1552905184438
10:33:03.408408 IP a.b.c.d.443 > 192.168.1.2.36189: tcp 0
1552905184439
10:33:03.428045 IP 192.168.1.2.36189 > a.b.c.d.443: tcp 437
1552905184439
10:33:03.451235 IP a.b.c.d.443 > 192.168.1.2.36189: tcp 0
[...]
Notice the two consecutive "empty" lines. The last packet before the first "empty" was captured by tcpdump at 10:33:03.385145 and delivered to the queue at 1552905183423, which took 38 ms. Between the two "empty" lines, no packets are delivered to the queue. The first packet after the second "empty" was captured at 10:33:03.408408 and delivered at 1552905184438, i.e. about 1 second after the previous packet, even though it was captured between the two "empty" lines. Why was it not delivered between them? This is not a rare occurrence: every second attempt to drain the queue yields no packets. Why is that?
> The first packet after the second "empty" was captured at 10:33:03.408408 and delivered at 1552905184438; it was delivered 1 second after the previous packet but captured between the "empty" lines.
Given your code, the system timestamps are only calculated and printed when the iterator in `for line in iter(out.readline, b'')` returns a new item, so this is where the delay seems to come from.
I suspect stdio buffering to be the culprit. On Linux (i.e. libc/glibc), if the STDOUT descriptor refers to a TTY, line buffering is enabled. If it refers to something else (e.g. a pipe), the STDOUT descriptor is fully buffered; your process needs to fill 4096 bytes (default on Linux) before the write system call is invoked.
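To see the effect in isolation, here is a minimal sketch that uses a small Python child process as a stand-in for tcpdump (an assumption for illustration; `CHILD_CODE` and `arrival_gap` are made-up names). Python applies its own stdout buffering rather than glibc's, but it follows the same rule: block-buffered when stdout is a pipe. The child prints a line, sleeps one second, and prints another; the parent measures how far apart the two lines arrive.

```python
import os
import subprocess
import sys
import time

# Stand-in child (an assumption for illustration): prints, pauses, prints again.
CHILD_CODE = "import time; print('hello'); time.sleep(1.0); print('bye')"


def arrival_gap(unbuffered):
    """Return the seconds between the arrival of the child's two lines on a pipe."""
    # Drop PYTHONUNBUFFERED so the child uses its default buffering policy.
    env = {k: v for k, v in os.environ.items() if k != "PYTHONUNBUFFERED"}
    argv = [sys.executable] + (["-u"] if unbuffered else []) + ["-c", CHILD_CODE]
    proc = subprocess.Popen(argv, stdout=subprocess.PIPE, env=env)
    arrivals = []
    for _line in iter(proc.stdout.readline, b""):
        arrivals.append(time.monotonic())
    proc.stdout.close()
    proc.wait()
    return arrivals[1] - arrivals[0]


print("block-buffered gap: %.3f s" % arrival_gap(unbuffered=False))
print("unbuffered gap:     %.3f s" % arrival_gap(unbuffered=True))
```

With default buffering both lines should show up together when the child exits, so the gap is close to zero; with `-u` the gap should be roughly the one-second sleep. That is the same burst pattern as in the question, just on a smaller scale.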
Very roughly calculated, based on the output you show here, your subprocess seems to generate ~65 bytes every ~0.025 seconds. Given a 4 kB buffer, it would take ~1.6 seconds to fill it up and trigger the flush/write.
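The back-of-the-envelope estimate can be written out as follows. The inputs (65 bytes per line, one line every 0.025 s, a 4096-byte buffer) are rough readings from the output above, not measurements, and `seconds_to_fill` is just a helper name chosen for this sketch.

```python
def seconds_to_fill(buffer_bytes, bytes_per_line, seconds_per_line):
    """Estimate how long a writer needs to fill a stdio buffer of the given size."""
    lines_needed = buffer_bytes / float(bytes_per_line)
    return lines_needed * seconds_per_line


estimate = seconds_to_fill(buffer_bytes=4096, bytes_per_line=65, seconds_per_line=0.025)
print(round(estimate, 2))  # 1.58
```

So a flush roughly every 1.6 seconds, which is in the same ballpark as the gaps of about one second between bursts in the question.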
Reading from that `subprocess.PIPE` and sending the output to your main process's stdout takes a lot less time. Hence you see bursts: tcpdump output lines that were captured ~25 ms apart are printed (received from the stdout iterator) within a few microseconds of each other, and your program then waits until the next 4 kB are flushed.
If you are able to install third-party packages (and use Python >= 2.7), you might want to look at pexpect. Child processes spawned by that package are connected to a PTY, which makes the system treat them like interactive programs, so their stdout is line-buffered.
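If installing pexpect is not an option, the same idea can be sketched with the standard library's `pty` module instead (this is a swapped-in technique, not pexpect's API). The function name `iter_lines_via_pty` and the stand-in child command are assumptions for illustration; for the question, the argument list would be the tcpdump command line. Cleanup is simplified: the sketch assumes the child exits on its own, and it is only meant for POSIX systems.

```python
import os
import pty
import subprocess
import sys


def iter_lines_via_pty(argv):
    """Run argv with its stdout on a pseudo-terminal and yield each output line."""
    master_fd, slave_fd = pty.openpty()
    proc = subprocess.Popen(argv, stdout=slave_fd, stderr=subprocess.STDOUT)
    os.close(slave_fd)  # the child holds its own copy; the parent reads the master end
    pending = b""
    try:
        while True:
            try:
                chunk = os.read(master_fd, 4096)
            except OSError:
                break  # Linux reports EIO once the child has closed the terminal
            if not chunk:
                break  # other platforms signal end-of-file with an empty read
            pending += chunk
            while b"\n" in pending:
                line, pending = pending.split(b"\n", 1)
                yield line.rstrip(b"\r")  # the terminal turns "\n" into "\r\n"
        if pending:
            yield pending
    finally:
        os.close(master_fd)
        proc.wait()


# Stand-in child; for the question this would be the tcpdump command line.
demo_argv = [sys.executable, "-c", "print('first'); print('second')"]
for line in iter_lines_via_pty(demo_argv):
    print(line)
```

Because the child sees a terminal on its stdout, a stdio-based program should fall back to line buffering and each line becomes readable as soon as it is written. The lines could then be fed into the queue from the reader thread exactly as `enqueue_output` does now.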