Tags: python, stream, twisted, frp

How can I map asynchronous operations to an ordered stream of data and obtain an identically-ordered result?


I'm currently designing an application using the Twisted framework, and I've hit a bit of a roadblock in my planning.

Formal Description of the Problem

My application has the following constraints:

  1. Data arrive in order, but asynchronously. I cannot know when the next piece of my data will arrive.
  2. The order in which data arrive must be preserved throughout the lifespan of the application process.
  3. Additional asynchronous operations must be mapped onto this "stream" of data.

The description of my problem may remind people of the Functional Reactive Programming (FRP) paradigm, and that's a fair comparison. In fact, I think my problem is well-described in those terms and my question can be pretty accurately summarized thusly: "How can I leverage Twisted in such a way as to reason in terms of data streams?"

More concretely, this is what I have figured out:

  1. A datum arrives and is unpacked into an instance of a custom class, henceforth referred to as "datum instance"
  2. The newly-arrived datum instance is appended to a collections.deque object, encapsulated by a custom Stream class.
  3. The Stream class exposes methods such as Stream.map that apply non-blocking computations asynchronously to:

    1. All elements already present in the Stream instance's deque.
    2. All future elements, as they arrive.
  4. Results of the operations performed in item 3 are appended to a new Stream object. This is because it's important to preserve the original data, as it will often be necessary to map several callables to a given stream.
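
For reference, items 2 and 3 above might look roughly like the following. This is only a minimal sketch: the `subscribe` method and the `_subscribers` list are hypothetical names, not part of any existing code, and the callbacks here are plain synchronous callables rather than functions returning a Deferred.

```python
from collections import deque


class Stream:
    """Append-only sequence that replays stored items to new subscribers."""

    def __init__(self):
        self._items = deque()      # original data, never mutated by consumers
        self._subscribers = []     # callables notified of each new item

    def append(self, item):
        """Store a newly arrived item and notify every subscriber."""
        self._items.append(item)
        for callback in list(self._subscribers):
            callback(item)

    def subscribe(self, callback):
        """Call `callback` for all stored items, then for every future one."""
        for item in list(self._items):
            callback(item)
        self._subscribers.append(callback)

    def __iter__(self):
        return iter(self._items)


source = Stream()
source.append(1)                 # arrives before anyone subscribes
seen = []
source.subscribe(seen.append)    # replays 1 immediately
source.append(2)                 # delivered as it arrives
```

Because the original deque is only ever appended to, several callables can subscribe to the same stream independently.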

At the risk of beating a dead horse, I want to insist upon the fact that the computations being mapped to a Stream instance are expected to return instances of Deferred.

The Question

Incidentally, this is precisely where I'm stuck: I can implement items 1, 2 & 3 quite trivially, but I'm struggling with how to handle populating the results Stream. The difficulty stems from the fact that I have no guarantees of stream length, so it's completely possible for data to arrive while I'm waiting for some asynchronous operations to complete. It's also entirely possible for async operation O_i to complete after O_{i+n}, so I can't just add deque.append as a callback.

So how should I approach this problem? Is there some nifty, hidden feature of Twisted I have yet to discover? Do any twisty-fingered developers have any ideas or patterns I could apply?


Solution

  • I don't know of any neat tricks that will help you with this. I think you probably just have to implement the re-ordering (or order-maintaining, depending on how you look at it) logic in your Stream.map implementation.

    If operation i + 1 completes before operation i then Stream.map will probably just have to hold on to that result until operation i completes. Then it can add results i and i + 1 to the output Stream.
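
As a rough illustration of that hold-and-release idea, here is a framework-free sketch. The `ReorderBuffer` class and its `reserve` and `deliver` methods are hypothetical names invented for this example. The argument order of `deliver(result, index)` is chosen so that it could be attached to a Deferred with `d.addCallback(buf.deliver, index)`, since Twisted passes the result first and any extra arguments after it. Failure handling is deliberately left out.

```python
from collections import deque


class ReorderBuffer:
    """Release results in submission order, whatever order they finish in."""

    def __init__(self, output):
        self._output = output      # anything with append(), e.g. a deque
        self._pending = {}         # index -> result that finished early
        self._next_index = 0       # next position to hand out
        self._next_to_emit = 0     # lowest position not yet emitted

    def reserve(self):
        """Claim the next position in the output order."""
        index = self._next_index
        self._next_index += 1
        return index

    def deliver(self, result, index):
        """Record a finished result, then flush everything now in order."""
        self._pending[index] = result
        while self._next_to_emit in self._pending:
            self._output.append(self._pending.pop(self._next_to_emit))
            self._next_to_emit += 1
        return result              # pass the value along a callback chain


out = deque()
buf = ReorderBuffer(out)
i0, i1, i2 = buf.reserve(), buf.reserve(), buf.reserve()
buf.deliver("b", i1)   # operation 1 finishes first and is held back
buf.deliver("a", i0)   # operation 0 finishes, releasing 0 and then 1
buf.deliver("c", i2)   # already in order, emitted immediately
```

Inside Stream.map, one would presumably call `reserve()` when each input element is seen, start the operation, and hand `deliver` to the resulting Deferred along with the reserved index.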

    This suggests you may also want to support back-pressure on your input. The re-ordering requirement means you have an extra buffer in your application. You don't want to allow that buffer to grow without bounds, so when it reaches some maximum size you probably want to tell whoever is sending you inputs that you can't keep up and they should back off. The IProducer and IConsumer interfaces in Twisted are the standard way to do this now (though something called "tubes" has been in development for a while to replace these interfaces with something easier to use, but I won't suggest that you should hold your breath on that).
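
One possible shape for that back-pressure logic is sketched below, again without importing Twisted. The `BackPressureGate` class, its `started` and `emitted` methods, and the `high_water` threshold are all hypothetical. The only thing borrowed from Twisted is the pair of method names `pauseProducing` and `resumeProducing`, which are what a push producer exposes. `FakeProducer` is a stand-in that merely records calls.

```python
class BackPressureGate:
    """Pause a producer while too many results are unfinished or held back."""

    def __init__(self, producer, high_water=3):
        self._producer = producer
        self._high_water = high_water   # assumed limit on outstanding items
        self._outstanding = 0           # started but not yet emitted in order
        self._paused = False

    def started(self):
        """Call when an operation is started for a new input element."""
        self._outstanding += 1
        if not self._paused and self._outstanding >= self._high_water:
            self._paused = True
            self._producer.pauseProducing()

    def emitted(self, count=1):
        """Call when `count` results have been appended to the output."""
        self._outstanding -= count
        if self._paused and self._outstanding < self._high_water:
            self._paused = False
            self._producer.resumeProducing()


class FakeProducer:
    """Stand-in for a real producer; only records what it was asked to do."""

    def __init__(self):
        self.calls = []

    def pauseProducing(self):
        self.calls.append("pause")

    def resumeProducing(self):
        self.calls.append("resume")


producer = FakeProducer()
gate = BackPressureGate(producer, high_water=2)
gate.started()       # one outstanding: below the limit
gate.started()       # two outstanding: limit reached, producer paused
gate.emitted(2)      # both results emitted in order, producer resumed
```

The counter covers both operations still in flight and results held back for re-ordering, since either kind occupies memory until it is emitted.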