Tags: ruby-on-rails, ruby, events, enumerable, observable

Ruby on Rails: Observe model changes from controller action


I want to emit server-sent events whenever the update action in a controller is called (or whenever my model is updated). I currently have a working watch action in the same controller with a dummy emitter:

def watch
    self.response.headers["Content-Type"] = "text/event-stream"
    self.response.headers["Last-Modified"] = Time.now.httpdate
    self.response_body = Enumerator.new do |y|
        100.times do |i|
            sleep 5
            y << ["event: message", "data: #{i}\n\n"].join("\n")
        end
    end
    # TODO catch IO error when client disconnects
end

How can I get an enumerable object that yields/returns a value when update is called? (NB: It doesn't truly have to be an Enumerable, but it does have to respond to #each for this streaming technique to work.) In a sense, I guess I'm trying to implement an event-driven architecture in Rails.

I'm aware of Observable but I can't figure out how to get my observer to be enumerable as required for this... or how to put an observer in the Enumerator (as above) without having a loop and sleep timer.

The purpose of this is to have changes that are made to the database sent to all other users currently logged in, such that each user always has a current reflection of the database.

Thanks-


Solution

  • This is not necessarily the best solution, but I ended up building a message queue on top of the database. I created a new table ("SSE"), and in the model that I wanted to observe I added these callbacks:

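    class MyModel < ActiveRecord::Base
      # Queue an event for every connected user whenever a record changes.
      after_save :queue_sse
      after_destroy :queue_sse # ActiveRecord has no after_delete callback
      # ...

      private

      # connected_users is pseudocode for a mutex-synchronized list of user ids.
      def queue_sse
        connected_users.each do |user|
          SSE.create(:changed_record_id => id, :user_id => user)
        end
      end
    end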
    

    Then in the watch action:

    def watch
      connected_users << current_user # pseudo for a mutex-synched accessor
      self.response.headers["Content-Type"] = "text/event-stream"
      self.response.headers["Last-Modified"] = Time.now.httpdate
      self.response_body = Enumerator.new do |y|
        loop do
          sleep 5
          ActiveRecord::Base.uncached do
            updates = SSE.find_all_by_user_id(current_user)
            updates.each do |update|
              puts "update found: #{update.id}\n"
              y << ["event: message", "data: #{update.id}\n\n"].join("\n")
              update.destroy
            end
          end
        end
      end
      # TODO add error catching, and on IOError, remove the current_user
    end
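
The event frames in the watch action are assembled by hand. As a minimal sketch, a small helper can keep that formatting in one place. `sse_frame` is a hypothetical name, not something Rails provides; it relies only on the server-sent events wire format, in which every line of the payload gets its own `data:` prefix and a blank line ends the event.

```ruby
# Hypothetical helper (not part of Rails): build one server-sent event frame.
def sse_frame(data, event = "message")
  lines = ["event: #{event}"]

  # Each line of the payload needs its own "data:" prefix.
  payload_lines = data.to_s.split("\n", -1)
  payload_lines = [""] if payload_lines.empty?
  payload_lines.each { |line| lines << "data: #{line}" }

  # A blank line terminates the event.
  lines.join("\n") + "\n\n"
end
```

Inside the enumerator, the line that builds the frame could then read `y << sse_frame(update.id)`.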
    

    This hits the database a lot, though: every connected client polls the SSE table every 5 seconds. It should probably be built on memcached, a mutex-protected class variable, or similar.

    (NB -- requires threading, e.g. config.threadsafe! and a threaded server of course.)
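
As a rough sketch of the "mutex'ed class variable" idea, assuming a single threaded server process: each watcher gets its own `Queue`, and the response body blocks on `Queue#pop` instead of sleeping and polling. `SseBroker`, `SseStream`, and the `:close` sentinel are hypothetical names made up for this illustration; only `Mutex` and `Queue` come from Ruby itself.

```ruby
require "thread" # Mutex and Queue (loaded by default on modern Rubies)

# Hypothetical in-process broker: one Queue per connected client.
class SseBroker
  def initialize
    @mutex = Mutex.new
    @queues = []
  end

  # Register a new client and return the Queue it should read from.
  def subscribe
    queue = Queue.new
    @mutex.synchronize { @queues << queue }
    queue
  end

  def unsubscribe(queue)
    @mutex.synchronize { @queues.delete(queue) }
  end

  # Push a payload to every connected client.
  def publish(payload)
    @mutex.synchronize { @queues.each { |q| q << payload } }
  end

  def subscriber_count
    @mutex.synchronize { @queues.size }
  end
end

# Hypothetical response body: responds to #each and blocks until
# something is published, so no sleep/poll loop is needed.
class SseStream
  def initialize(broker)
    @broker = broker
    @queue = broker.subscribe
  end

  def each
    loop do
      payload = @queue.pop # blocks the current thread until a message arrives
      break if payload == :close
      yield "event: message\ndata: #{payload}\n\n"
    end
  ensure
    # Runs on normal exit and when a write to a disconnected client raises.
    @broker.unsubscribe(@queue)
  end
end
```

In the controller this might be wired up as `self.response_body = SseStream.new(BROKER)`, with the model callback calling `BROKER.publish(id)`, where `BROKER` is a hypothetical process-wide constant. Two caveats under these assumptions: events only reach clients connected to the same server process, and a dropped connection is only noticed the next time a frame is written to it.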