I want to emit server-sent events whenever the update
action in a controller is called (or whenever my model is updated). I currently have a working watch
action in the same controller with a dummy emitter:
def watch
self.response.headers["Content-Type"] = "text/event-stream"
self.response.headers["Last-Modified"] = Time.now.ctime.to_json
self.response_body = Enumerator.new do |y|
100.times do |i|
sleep 5
y << ["event: message", "data: #{i}\n\n"].join("\n")
end
end
# TODO catch IO error when client disconnects
end
How can I get an enumerable object that yields/returns a value when update
is called? (NB: It doesn't truly have to be an Enumerable; but it does have to respond to #each
for this streaming technique to work.) In some senses, I'm trying to implement an event-driven architecture in Rails I guess.
I'm aware of Observable but I can't figure out how to get my observer to be enumerable as required for this... or how to put an observer in the Enumerator (as above) without having a loop and sleep timer.
The purpose of this is to have changes that are made to the database sent to all other users currently logged in, such that each user always has a current reflection of the database.
Thanks-
This is not necessarily the best solution, but I ended up creating a message queue as follows: I created a new table ("SSE") in my database, and in the model that I wanted to observe, I added the callbacks:
class MyModel < ActiveRecord::Base
after_save :queue_sse
after_delete :queue_sse
# ...
private
def queue_sse
connected_users.each do |user|
SSE.create(:changed_record_id => this.id, :user_id => user)
end
end
end
Then in the watch
action:
def watch
connected_users << current_user # pseudo for a mutex-synched accessor
self.response.headers["Content-Type"] = "text/event-stream"
self.response.headers["Last-Modified"] = Time.now.ctime.to_json
self.response_body = Enumerator.new do |y|
loop do
sleep 5
ActiveRecord::Base.uncached do
updates = SSE.find_all_by_user_id(current_user)
updates.each do |update|
puts "update found: #{update.id}\n"
y << ["event: message", "data: #{update.id}\n\n"].join("\n")
update.destroy
end
end
end
end
# TODO add error catching, and on IOError, remove the current_user
end
This hits the database a lot though. It should probably be built on memcached, a mutex'ed class variable or similar.
(NB -- requires threading, e.g. config.threadsafe!
and a threaded server of course.)