Tags: python, ruby, node.js, api, tornado

How can I create a Twitter-like streaming API with parameterized filtering?


I am trying to develop a data streaming API with the same functionality as Twitter’s streaming API (https://dev.twitter.com/streaming/reference/post/statuses/filter), namely a data stream with filtering capabilities. I am generating a lot of data and want to serve it to clients.

I understand how to make an app that serves all clients the same data. That’s relatively easy. The difficulty I’m having comes from allowing clients to specify data filters and serving unique data to each client.

[Image: crude diagram]

My thoughts:

First, I thought of using streaming HTTP requests (like Twitter). I could create an endpoint that accepts GET requests with parameters (e.g. https://stream.example.com/v1/filter.json?track=twitter). According to this answer to "Streaming API vs Rest API?", this doesn’t scale easily and requires a lot of resources.
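Under the GET-parameter approach, each long-lived request carries its own filter in the query string, so the server only needs to turn that query string into a per-connection filter. A minimal sketch in Ruby (the language of the answer below), using only the standard `uri` library; `filter_params` and `matches_track?` are hypothetical helpers, and the case-insensitive substring match is a simplification, not Twitter's actual `track` semantics.

```ruby
require 'uri'

# Hypothetical helper: collect the query parameters of a Twitter-style filter
# URL into a hash of comma-separated values, e.g. "track" => ["twitter"].
def filter_params(url)
  params = {}
  URI.decode_www_form(URI.parse(url).query.to_s).each do |key, value|
    (params[key] ||= []).concat(value.split(','))
  end
  params
end

# Simplified keyword match (case-insensitive substring). Twitter's real
# matching rules for `track` are more involved.
def matches_track?(text, params)
  params.fetch('track', []).any? { |kw| text.downcase.include?(kw.downcase) }
end

params = filter_params('https://stream.example.com/v1/filter.json?track=twitter,ruby')
puts matches_track?('I love Ruby', params)  # prints "true"
```

The parsed hash would be stored with the open request and consulted before each item is written to that client's response.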

Then I thought of using WebSockets and letting the client supply a filter message (e.g. locations=-122.75,36.8,-121.75,37.8). However, I can’t find a good example of a WS server dishing out unique data to each client. What would this class look like if it inherited from tornado.websocket.WebSocketHandler or a similar implementation?
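Whatever the transport, the server-side pattern is the same: each connection stores its own parsed filter and checks every outgoing item against it. Below is a minimal Ruby sketch of such a per-client filter for a `locations` message. It assumes the comma-separated, southwest-corner-first `lon,lat` order that Twitter documents for its `locations` parameter. `LocationFilter` is a hypothetical name, and bounding boxes that cross the antimeridian are not handled.

```ruby
# Hypothetical per-client filter for a Twitter-style `locations` value:
# one or more bounding boxes, each given as sw_lon,sw_lat,ne_lon,ne_lat.
class LocationFilter
  def initialize(spec)
    coords = spec.split(',').map { |c| Float(c) }  # Float() raises ArgumentError on junk
    if coords.empty? || coords.size % 4 != 0
      raise ArgumentError, 'locations must contain groups of 4 numbers'
    end
    @boxes = coords.each_slice(4).to_a
  end

  # True if the point (lon, lat) falls inside any of the bounding boxes.
  def match?(lon, lat)
    @boxes.any? do |sw_lon, sw_lat, ne_lon, ne_lat|
      lon.between?(sw_lon, ne_lon) && lat.between?(sw_lat, ne_lat)
    end
  end
end

bay_area = LocationFilter.new('-122.75,36.8,-121.75,37.8')
puts bay_area.match?(-122.42, 37.77)  # San Francisco, prints "true"
puts bay_area.match?(-74.0, 40.7)     # New York, prints "false"
```

In a WebSocket handler, the connection would build one such filter when the client's filter message arrives (closing the connection if it raises `ArgumentError`) and call `match?` before sending each item.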

I also considered pushing the data to a messaging service (RabbitMQ) or database (Redis) and subscribing clients to their own unique channel as they connect (I think this is similar to the question "Any ideas how to create parameterised streaming api?"). I don’t know of an efficient way to create the unique channels, and this also seems overly complicated.
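One way to avoid creating a unique channel per client is to keep a single shared stream and attach a filter predicate to each subscriber, evaluating the filters as items are published. A minimal in-process Ruby sketch using only the core `Queue` and `Mutex` classes; `FilterHub` and its methods are hypothetical and not part of RabbitMQ, Redis, or any other library.

```ruby
# Hypothetical in-process fan-out hub: one shared stream, and each subscriber
# registers a predicate instead of getting its own named channel.
class FilterHub
  def initialize
    @subscribers = {}  # Queue => predicate
    @lock = Mutex.new
  end

  # Returns a Queue that receives only items for which the block returns true.
  def subscribe(&predicate)
    queue = Queue.new
    @lock.synchronize { @subscribers[queue] = predicate }
    queue
  end

  def unsubscribe(queue)
    @lock.synchronize { @subscribers.delete(queue) }
  end

  # Called once per generated item; evaluates every subscriber's filter.
  def publish(item)
    targets = @lock.synchronize { @subscribers.to_a }
    targets.each { |queue, predicate| queue << item if predicate.call(item) }
  end
end

hub = FilterHub.new
sf_only = hub.subscribe { |item| item[:city] == 'SF' }
everything = hub.subscribe { |_item| true }
hub.publish({ city: 'SF', text: 'hello' })
hub.publish({ city: 'NYC', text: 'hi' })
puts sf_only.size     # prints "1"
puts everything.size  # prints "2"
```

Each connection handler would then drain its own queue and forward items to its client. Publishing costs one predicate call per subscriber per item; with many clients it would likely be worth indexing filters (for example by keyword) or grouping clients with identical filters onto one broker channel.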

I would prefer to do this in Python but I would also consider using Ruby and JS implementations.


Solution

  • I'm not too familiar with Python, but I think this should be possible using WebSockets. Here's my take on it in Ruby; hopefully it is of some help. These are stripped-down versions with much of the WebSocket functionality removed, just to demonstrate.

    However, I'm afraid I can't be of much help regarding best practices for streaming APIs.

    Server

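    require 'em-websocket'
    require 'json'
    
    # Send data for one client's filter every 2 seconds. A periodic timer runs on
    # the EventMachine reactor; calling ws.send from a separate Thread (as a
    # blocking loop would need) is unsafe, because EventMachine is not thread-safe.
    def filtered_stream(filter, ws)
      EM::PeriodicTimer.new(2) do
        # do something with filter, transform or send different kinds of data
        ws.send "#{filter} - hello"
      end
    end
    
    EM.run {
      EM::WebSocket.run(:host => "127.0.0.1", :port => 9999) do |ws|
        timer = nil  # this client's stream, kept so it can be stopped later
    
        ws.onopen { |handshake|
          # We can access some information in the handshake object such as headers
          # Perhaps even add the client to a list / table
        }
    
        ws.onmessage { |msg|
          # use ws.close to close the connection if needed,
          # for example if the client sends an invalid message
          begin
            filter = JSON.parse(msg)['locations']
          rescue JSON::ParserError
            ws.close
            next
          end
    
          timer.cancel if timer  # a new filter message replaces the previous one
          timer = filtered_stream(filter, ws)
        }
    
        # Stop streaming when the client disconnects
        ws.onclose { timer.cancel if timer }
      end
    }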
    

    Client

    require 'json'
    require 'websocket-eventmachine-client'
    
    EM.run do
      ws = WebSocket::EventMachine::Client.connect(:uri => 'ws://localhost:9999')
    
      ws.onopen do
        # Unsure how best to supply the filter; headers are an option too
        ws.send({ 'locations' => '-122.75,36.8,-121.75,37.8' }.to_json)
      end
    
      ws.onmessage do |msg|
        p msg
      end
    end