I'm doing some processing of large input data split over several files. Trying to separate the processing algorithms from the I/O, I set everything up using generators. This works pretty well except when I want to do some intermediate manipulations of the data passing through the generators. Here's an example that hits the important points:
import numpy as np
from itertools import izip, tee
# Have two input matrices. In reality they're very large, so data is provided
# one row at a time via generators.
M, N = 100, 3
def gen_data_rows(m,n):
    for i in range(m):
        yield np.random.normal(size=n)
rows1 = gen_data_rows(M,N)
rows2 = gen_data_rows(M,N)
# Signal processing operates on chunks of the input, e.g. blocks of rows and
# yields results at a reduced rate. Here's a simple example.
def foo_rows(rows):
    i = 0
    for row in rows:
        if i % 5 == 0:
            yield row
        i += 1
# But what if we want to do some transformations between the raw input data
# and the processing?
def fun1(x, y):
    return x + y
def fun2(x, y):
    return (x + y)**2
def foo_transformed_rows(rows1, rows2):
    # Define a generator that consumes both inputs at the same time and
    # produces two streams of output I'd like to send to foo_rows().
    def gen_transformed_rows(rows1, rows2):
        for x, y in izip(rows1, rows2):
            yield fun1(x,y), fun2(x,y)
    # Do I really need to tee the above and define separate generators to pick
    # off each result?
    def pick_generator_idx(gen, i):
        for vals in gen:
            yield vals[i]
    gen_xformed_rows, dupe = tee(gen_transformed_rows(rows1, rows2))
    gen_foo_fun1 = foo_rows(pick_generator_idx(gen_xformed_rows, 0))
    gen_foo_fun2 = foo_rows(pick_generator_idx(dupe, 1))
    for foo1, foo2 in izip(gen_foo_fun1, gen_foo_fun2):
        yield foo1, foo2
for foo1, foo2 in foo_transformed_rows(rows1, rows2):
    print foo1, foo2
I think the main complication here is that I've got two inputs that I want to combine into two intermediate generators (I/O is the bottleneck, so I really don't want to run through the data twice). Is there a better way to implement the foo_transformed_rows() function? Having to tee() the desired data and define generators just to pick items out of a tuple seems like overkill.
Edit: I modified the example slightly in response to a comment, but unfortunately it's still pretty long in order to remain complete. The essential problem is dealing with a multi-input-multi-output (MIMO) data stream. I guess I'd like something like a yield statement that produces multiple generators, e.g.
def two_streams(gen_a, gen_b):
    "Consumes two generators, produces two results."
    for a, b in itertools.izip(gen_a, gen_b):
        c, d = foo(a, b)
        yield c, d
# This doesn't work. You get one generator of tuples instead of
# two generators of singletons.
gen_c, gen_d = two_streams(gen_a, gen_b)
I thought maybe there would be some itertools magic to do the equivalent.
I agree with @ShadowRanger's comment, and I don't see why you want to avoid the tee. It works well for this purpose.
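To illustrate how tee can turn one generator of tuples into several generators of single values, here is a minimal sketch. It assumes Python 3, where the built-in zip and map are lazy like izip and imap; `unzip` is a hypothetical helper name, not something itertools provides, and the arithmetic inside `two_streams` is just a stand-in for the question's `foo`.

```python
from itertools import tee
from operator import itemgetter

def unzip(tuples, n):
    """Split one iterator of n-tuples into n iterators, one per position.

    Hypothetical helper: tee makes n independent views of the same
    stream, and each view lazily picks out a single tuple position.
    """
    copies = tee(tuples, n)
    return tuple(map(itemgetter(i), copy) for i, copy in enumerate(copies))

def two_streams(gen_a, gen_b):
    """Consumes two iterables, yields one (c, d) tuple per input pair."""
    for a, b in zip(gen_a, gen_b):
        # Stand-in for foo(a, b) from the question.
        yield a + b, (a + b) ** 2

gen_c, gen_d = unzip(two_streams(range(4), range(4)), 2)
pairs = list(zip(gen_c, gen_d))
print(pairs)
```

The underlying stream is still consumed only once. tee buffers whatever one output has read and the other has not, so memory stays small as long as the outputs are consumed roughly in step.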
However, it seems simpler and more intuitive to me to tee the original generators:
def transform_rows(fun, rows1, rows2):
    for x, y in izip(rows1, rows2):
        yield fun(x,y)
rows1a, rows1b = tee(rows1)
rows2a, rows2b = tee(rows2)
gen_foo_fun1 = foo_rows(transform_rows(fun1, rows1a, rows2a))
gen_foo_fun2 = foo_rows(transform_rows(fun2, rows1b, rows2b))
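As a rough check that teeing the inputs still reads each source only once, the following self-contained sketch runs the same pipeline on small integer inputs. It assumes Python 3 (zip instead of izip); `counted_rows` and the `reads` list are hypothetical additions that stand in for the real file I/O and record every row pulled from an input.

```python
from itertools import tee

def foo_rows(rows):
    """Stand-in for the processing step: keep every 5th row."""
    for i, row in enumerate(rows):
        if i % 5 == 0:
            yield row

def transform_rows(fun, rows1, rows2):
    """Apply fun to paired rows from the two inputs."""
    for x, y in zip(rows1, rows2):
        yield fun(x, y)

def fun1(x, y):
    return x + y

def fun2(x, y):
    return (x + y) ** 2

reads = []  # one entry per row pulled from an input

def counted_rows(name, m):
    """Hypothetical input: yields 0..m-1 and logs each read."""
    for i in range(m):
        reads.append(name)
        yield i

rows1a, rows1b = tee(counted_rows("rows1", 20))
rows2a, rows2b = tee(counted_rows("rows2", 20))

gen_foo_fun1 = foo_rows(transform_rows(fun1, rows1a, rows2a))
gen_foo_fun2 = foo_rows(transform_rows(fun2, rows1b, rows2b))

# Consume both outputs in lockstep so the tee buffers stay short.
results = list(zip(gen_foo_fun1, gen_foo_fun2))
print(results)
print(reads.count("rows1"), reads.count("rows2"))
```

Each input is read 20 times in total, not 40, because the second branch is served from tee's buffer. The cost is that `fun1` and `fun2` each iterate their own copy of the inputs, so the two output streams should be consumed together rather than one after the other, otherwise tee ends up buffering the whole input.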