Search code examples
pythonsubprocessreactivexrx-py

How to use subprocess in a RxPY map correctly?


I am trying to use RxPY to scan a list of IP addresses to see which hosts are up.

However, currently it returns empty.

If ping(ip) simply returns ip, the output is the list of IP addresses.

from reactivex import operators as ops
import reactivex as rx
import subprocess

# If I change to this version, it returns the list of IP addresses.
# def ping(ip):
#    return ip

def ping(ip):
    """Ping *ip* once and report whether the host answered.

    Runs the system ``ping`` command and returns *ip* when the command exits
    with status 0 (host is up); otherwise returns an empty string (host is
    down).
    """
    exit_code = subprocess.call(["ping", "-c1", "-n", "-i0.1", "-W1", ip])
    print("Here!")  # Reported by the asker as never being reached.
    return ip if exit_code == 0 else ""


if __name__ == "__main__":
    ip_list = ["192.168.1.1", "192.168.1.2"]
    # NOTE(review): rx.of() emits each of its positional arguments as one
    # item. The list is passed here as a single argument, so the stream
    # carries one item (the whole list) and ping() receives that list.
    source = rx.of(ip_list)
    pinged = source.pipe(ops.map(ping))
    pinged.subscribe(lambda result: print(list(result)))

The subprocess.call line appears to be skipped entirely.

I suspect it might be related to async; however, subprocess.call does not appear to be an async function.

How do I use subprocess correctly here? Any guidance would be appreciated!


Solution

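  • I found a solution. The idea is to wrap subprocess.call in an Observable. Note that the code below also calls rx.of(*ip_list) instead of rx.of(ip_list), so each IP address is emitted as its own item rather than the whole list being emitted as a single item.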

    After creating my custom operator based on the doc, now it works!

    import reactivex as rx
    import subprocess
    
    def ping():
        """Build a pipeable operator that pings every IP address it receives.

        For each item emitted by the source, the system ``ping`` command is
        run once (blocking until it exits). The operator then emits the
        address itself when the command exits with status 0 (host is up), or
        an empty string otherwise (host is down).

        Returns:
            A function that takes the source observable and returns a new
            observable, suitable for passing to ``pipe``.
        """
        def _ping(source):
            def subscribe(observer, scheduler=None):
                def on_next(ip):
                    try:
                        retval = subprocess.call(
                            ["ping", "-c1", "-n", "-i0.1", "-W1", ip])
                    except Exception as err:
                        # Failing to launch ping (missing binary, non-string
                        # item, ...) is reported through the stream instead
                        # of escaping from the source's emission loop.
                        observer.on_error(err)
                        return
                    # Exit status 0 means the host replied.
                    observer.on_next(ip if retval == 0 else "")

                # Forward the subscriber's scheduler upstream so the operator
                # does not silently drop it.
                return source.subscribe(
                    on_next,
                    observer.on_error,
                    observer.on_completed,
                    scheduler=scheduler)
            return rx.create(subscribe)
        return _ping
    
    if __name__ == "__main__":
        ip_list = ['192.168.1.1', '192.168.1.2']
        # Unpack the list so that every address is emitted as its own item.
        addresses = rx.of(*ip_list)
        results = addresses.pipe(ping())
        # The subscriber ignores the emitted values; the only visible output
        # is what the ping command itself writes to the terminal.
        results.subscribe(lambda result: result)
    

    It now prints something like this:

    PING 192.168.1.1 (192.168.1.1): 56 data bytes
    
    --- 192.168.1.1 ping statistics ---
    1 packets transmitted, 0 packets received, 100.0% packet loss
    PING 192.168.1.2 (192.168.1.2): 56 data bytes
    
    --- 192.168.1.2 ping statistics ---
    1 packets transmitted, 0 packets received, 100.0% packet loss