python sockets blocking python-2.6

Merging multiple blocking generator functions in Python


I have two iterators. Each represents a possibly infinite stream of data coming from a blocking resource, like a socket.

I want to merge the data from the two iterators in the order it arrives, i.e. non-deterministically. In more detail, given iterators iter1 and iter2, I want the result to be an iterator equivalent to merged below:

iter1 : 1 2 3     4   5 ...
iter2 :       1 2   3   ... 
merged: 1 2 3 1 2 4 3 5 ...

   ---> increasing time --->

I assume I'll need a concurrent program, but I'm not sure whether there's a Pythonic way to do this. I would strongly prefer an answer that works in Python 2.6.

For example, let's say I have two iterators which are "under the hood" reading from a socket. Here's a quick server "listener" which repeatedly echoes the date/time of client connection:

==> message.sh <==
#!/usr/bin/env bash
set -e;

# Repeatedly echo the date/time of client connection
MSG=$(date)
while true; do
  echo $MSG;
  sleep 1;
done

==> server.sh <==
#!/usr/bin/env bash
socat TCP-LISTEN:8008,reuseaddr,fork system:"./message.sh"

You can run the server with ./server.sh.

Below is an example Python script that tries to merge messages from the two sockets. However, it is not correct: it must receive a value from each iterator in turn before it can continue. Using the example above, the "merged" result would be:

iter1 : 1 2 3     4   5 ...
iter2 :       1 2   3   ... 
merged: 1     1 2 2 3 3 4     ...

Here is the script:

#!/usr/bin/env python2
import socket
import time

HOST = "127.0.0.1"
PORT = 8008


def iterate_socket(sock):
    while True:
        yield sock.recv(1024)


def merge(xs, ys):
    iters = [xs, ys]
    while True:
        for it in iters:
            try:
                i = it.next()
                yield i
            except StopIteration:
                pass

sock1 = socket.socket(socket.AF_INET, socket.SOCK_STREAM)
sock1.connect((HOST, PORT))
time.sleep(1)
sock2 = socket.socket(socket.AF_INET, socket.SOCK_STREAM)
sock2.connect((HOST, PORT))

iter1 = iterate_socket(sock1)
iter2 = iterate_socket(sock2)

for msg in merge(iter1, iter2):
    print msg,
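
The lockstep behaviour can be reproduced without the socat server. The following is only a sketch: `ticker` is a hypothetical stand-in for a blocking source (it sleeps instead of calling `recv`), and `lockstep_merge` is a round-robin merge in the spirit of `merge` above, changed so that it terminates once every source is exhausted. It uses `next(it)` rather than `it.next()` so that it runs on both Python 2.6 and Python 3.

```python
import time


def ticker(label, count, delay):
    """Hypothetical blocking source: waits `delay` seconds before each item."""
    for n in range(1, count + 1):
        time.sleep(delay)  # stands in for a blocking recv()
        yield "%s%d" % (label, n)


def lockstep_merge(*iterables):
    """Round-robin merge: takes one item from each source in turn."""
    pending = [iter(it) for it in iterables]
    while pending:
        for it in list(pending):  # iterate over a copy so removal is safe
            try:
                # Blocks on this source even if another one already has data.
                yield next(it)
            except StopIteration:
                pending.remove(it)


fast = ticker("fast", 3, 0.0)   # always ready immediately
slow = ticker("slow", 3, 0.05)  # ready only every 50 ms
merged = list(lockstep_merge(fast, slow))
# merged == ['fast1', 'slow1', 'fast2', 'slow2', 'fast3', 'slow3']
```

Although `fast` could deliver all three items at once, each one is held back until `slow` has produced its next item, which is exactly the problem described above.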

Finally: I get the iterators from a library, so please assume for the purposes of this question that I have to deal with iterators, and I can't do something like set a socket to non-blocking and poll.


Solution

  • You could move the socket iteration into background threads, and then use a Queue to send the data received by each to your main thread. Then your main thread can just consume data from the queue as it comes in:

    import socket
    import time
    from Queue import Queue
    from threading import Thread
    
    HOST = "127.0.0.1"
    PORT = 8008
    
    
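    def iterate_socket(sock):
        while True:
            data = sock.recv(1024)
            if not data:  # recv() returns '' once the peer closes the connection
                return
            yield data
    
    _DONE = object()  # Unique end-of-stream marker; cannot collide with real data
    
    def consume(q, it):
        # Runs in a background thread: forward every item, then signal the end.
        # This works for any iterator, not only ones that yield '' when done.
        try:
            for i in it:
                q.put(i)
        finally:
            q.put(_DONE)
    
    def merge(xs, ys):
        q = Queue()
        iters = [xs, ys]
        for it in iters:
            t = Thread(target=consume, args=(q, it))
            t.daemon = True  # Don't keep the process alive once the main thread exits
            t.start()
    
        done = 0
        while done < len(iters):  # Stop when all iters are done
            out = q.get()  # Blocks until any of the iterators delivers something
            if out is _DONE:
                done += 1
            else:
                yield out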
    
    sock1 = socket.socket(socket.AF_INET, socket.SOCK_STREAM)
    sock1.connect((HOST, PORT))
    time.sleep(1)
    sock2 = socket.socket(socket.AF_INET, socket.SOCK_STREAM)
    sock2.connect((HOST, PORT))
    
    iter1 = iterate_socket(sock1)
    iter2 = iterate_socket(sock2)
    
    for msg in merge(iter1, iter2):
        print msg,
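
To try the thread-and-queue idea without the socat server, here is a self-contained sketch that runs on Python 2.6 and Python 3. The names `merge_by_arrival`, `_pump`, `_DONE` and `ticker` are made up for this illustration; `ticker` simulates a blocking source with `time.sleep`. Unlike the version above, it accepts any number of iterators and marks the end of each stream with a private sentinel object, so it does not depend on what the iterators yield.

```python
import threading
import time

try:
    from Queue import Queue  # Python 2
except ImportError:
    from queue import Queue  # Python 3

_DONE = object()  # private end-of-stream marker (illustrative name)


def _pump(it, q):
    """Worker thread body: push every item onto the queue, then the marker."""
    try:
        for item in it:
            q.put(item)
    finally:
        q.put(_DONE)  # sent even if the iterator raises


def merge_by_arrival(*iterables):
    """Yield items from all iterables in the order they become available."""
    q = Queue()
    for it in iterables:
        t = threading.Thread(target=_pump, args=(it, q))
        t.daemon = True  # do not block interpreter exit
        t.start()
    remaining = len(iterables)
    while remaining:
        item = q.get()  # blocks until some worker delivers something
        if item is _DONE:
            remaining -= 1
        else:
            yield item


def ticker(label, delays):
    """Hypothetical blocking source: sleeps `delay` seconds before each item."""
    for n, delay in enumerate(delays):
        time.sleep(delay)
        yield "%s%d" % (label, n + 1)


merged = list(merge_by_arrival(
    ticker("a", [0.0, 0.4, 0.4]),  # items ready at about 0.0 s, 0.4 s, 0.8 s
    ticker("b", [0.2, 0.4]),       # items ready at about 0.2 s, 0.6 s
))
```

With these delays the items should normally come out as a1, b1, a2, b2, a3, but the exact interleaving depends on timing and thread scheduling. What is guaranteed is that every item appears exactly once and that the items of each individual source keep their relative order.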