I am currently building an architecture with a Rails process and multiple worker processes that need to be informed of certain events (like the creation of an object).
Rails
|         API   Worker
+----------o------o--------o------ - - -
                      Some other
                        daemon
I'd like to do the following:
class Article
  # after_create is the ActiveRecord callback that runs once the record has been saved
  after_create do
    MessageBus.send type: "article-created", id: self.id
  end
end
The other processes (API, workers, daemons, ...) would just subscribe to the message bus, and a block would be called whenever a message comes in:
MessageBus.subscribe do |msg|
  if msg['type'] == 'article-created'
    # if this is my websocket daemon, I would push the article to the browser
    # if this is my indexing daemon, I would reindex the full-text search
    # if this is ... you get the deal.
  end
end
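To make the intended semantics concrete, here is a minimal in-process sketch of that publish/subscribe behaviour. `LocalBus` and its `publish` method are hypothetical names used only for illustration; a real bus would have to cross process boundaries, which this does not.

```ruby
require 'json'

# Hypothetical in-process stand-in for the bus. It only illustrates the
# semantics: every subscriber block sees every published message.
class LocalBus
  def initialize
    @subscribers = []
  end

  # Register a block that is called with every published message.
  def subscribe(&block)
    @subscribers << block
    block
  end

  # Deliver one message (a Hash) to every subscriber.
  def publish(message)
    # Round-trip through JSON so subscribers get string keys, as they
    # would after the message has travelled over a socket.
    wire = JSON.parse(JSON.generate(message))
    @subscribers.each { |subscriber| subscriber.call(wire) }
  end
end

bus = LocalBus.new
indexed = [] # stands in for the indexing daemon
pushed  = [] # stands in for the websocket daemon

bus.subscribe { |msg| indexed << msg['id'] if msg['type'] == 'article-created' }
bus.subscribe { |msg| pushed  << msg['id'] if msg['type'] == 'article-created' }

bus.publish(type: 'article-created', id: 42)
bus.publish(type: 'comment-created', id: 7) # ignored by both subscribers
```

Unlike a queue, nothing is consumed here: both subscribers receive the same message, which is the property I am after.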
Currently I am using a local Unix domain socket: I push JSON in with UNIXSocket and read it with EventMachine.start_unix_domain_server. But that only allows communication between two processes. I also thought about using Resque, but that is more of a message queue, while I need a bus, and it depends on Redis. I am quite sure there must be a gem that implements some kind of message bus in Ruby, but googling did not turn up anything.
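For reference, the current setup boils down to something like the following sketch. It uses `UNIXSocket.pair` from the standard library instead of a socket file and EventMachine, and it assumes one JSON document per line as the framing, so treat it as an approximation rather than my exact code.

```ruby
require 'socket'
require 'json'

# UNIXSocket.pair returns two connected endpoints without needing a socket
# file. In the real setup one end would be the Rails process and the other
# end the daemon.
sender, receiver = UNIXSocket.pair

# One JSON document per line keeps message boundaries intact on a stream
# socket (this framing is an assumption of the sketch).
sender.puts JSON.generate(type: 'article-created', id: 1)
sender.puts JSON.generate(type: 'article-created', id: 2)
sender.close

# Read until the sender closes its end and parse each line.
received = receiver.each_line.map { |line| JSON.parse(line) }
receiver.close
```

Only the single `receiver` ever sees the messages, which is exactly the limitation: there is no way for a third process to listen in.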
In the end I hacked together a quick solution of my own using EventMachine channels.
This is my server. Basically, clients connect to /tmp/messagebus.sock and send data. Everything that is pushed into the socket is sent to all connected clients (the sender included, since its own connection is subscribed to the channel as well).
require 'rubygems'
require 'eventmachine'

module Messagebus
  class Server
    attr_accessor :connections
    attr_accessor :channel

    def initialize
      @connections = []
      @channel = EventMachine::Channel.new
    end

    def start
      @signature = EventMachine.start_unix_domain_server '/tmp/messagebus.sock', Connection do |conn|
        conn.server = self
      end
    end

    def stop
      EventMachine.stop_server(@signature)
      unless wait_for_connections_and_stop
        EventMachine.add_periodic_timer(1) { wait_for_connections_and_stop }
      end
    end

    def wait_for_connections_and_stop
      if @connections.empty?
        EventMachine.stop
        true
      else
        puts "Waiting for #{@connections.size} connection(s) to finish ..."
        false
      end
    end
  end
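  class Connection < EventMachine::Connection
    attr_reader :server

    def post_init
      log "Connection opened"
    end

    # Called from the block passed to start_unix_domain_server.
    def server=(server)
      @server = server
      # Register with the server so that Server#stop can wait for open connections.
      server.connections << self
      # Forward everything pushed into the channel to this client.
      @subscription = server.channel.subscribe do |data|
        log "Sending #{data}"
        send_data data
      end
    end

    # Broadcast whatever this client sent to all subscribed connections.
    def receive_data(data)
      log "Received #{data}"
      server.channel.push data
    end

    def unbind
      server.channel.unsubscribe @subscription
      server.connections.delete(self)
      log "Connection closed"
    end

    def log(msg)
      puts "[#{object_id}] #{msg}"
    end
  end
end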

EventMachine.run do
  s = Messagebus::Server.new
  s.start
  puts "New server listening"
end
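
On the client side, something along these lines should be enough. This is a sketch, not tested against the server above: `BusClient` is a hypothetical name, and it assumes every message is a single line of JSON. The server itself just relays bytes and does not enforce any framing, so all clients would have to agree on this convention.

```ruby
require 'socket'
require 'json'

# Hypothetical client for the message bus. It wraps any IO-like object so
# that it can also be exercised without a running server.
class BusClient
  def initialize(io)
    @io = io
  end

  # Connect to the socket path used by the server.
  def self.connect(path = '/tmp/messagebus.sock')
    new(UNIXSocket.new(path))
  end

  # Send one message (a Hash) as a single line of JSON.
  def publish(message)
    @io.write(JSON.generate(message) + "\n")
  end

  # Block and yield each incoming message until the connection is closed.
  def subscribe
    @io.each_line { |line| yield JSON.parse(line) }
  end

  def close
    @io.close
  end
end

# Demo without the server: two directly connected endpoints.
a, b = UNIXSocket.pair
publisher  = BusClient.new(a)
subscriber = BusClient.new(b)

publisher.publish(type: 'article-created', id: 42)
publisher.close

seen = []
subscriber.subscribe { |msg| seen << msg }
subscriber.close
```

With the server running, each daemon would call `BusClient.connect` instead and run `subscribe` in its own thread or process, since that call blocks until the connection is closed.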