Search code examples
ruby-on-railsrubymessage-bus

Is there a Ruby Message Bus gem?


I am currently building an architechture with a rails process and multiple worker processes which need to be informed of certain events (like the creation of an object).

Rails
  |         API   Worker  
  +----------o------o--------o------ - - -
                        Some other
                          daemon

I'd like to do the following

class Article
  after_creation do
    MessageBus.send type: "article-created", id: self.id
  end
end

While the processes (API, Worker, Daemons, ...) just subscribe to the message bus and a block is called when a message comes in.

MessageBus.subscribe do |msg|
  if msg['type'] == 'article-created'
    # if this is my websocket daemon, I would push the article to the browser
    # if this is my indexing daemon, I would reindex the full-text search
    # if this is ... you get the deal.
  end
end

Currently I am using a local unix domain socket where I push JSON in with UNIXSocket and get it with EventMachine.start_unix_domain_server. But that allows only two-way communication. I also thought about using resque, but this is more a message queue while I need a bus. And it depends on redis. I am quite sure there must be a gem, that implements some message bus in ruby, but googling did not lead to any result


Solution

  • Finally I hacked a quick own solution using Eventmachine Channels.

    This is my server. Basically a clients connect to /tmp/messagebus.sock and send data. Everything that is pushed into the socket is sent to all other clients.

    require 'rubygems'
    require 'eventmachine'
    
    module Messagebus
      class Server
        attr_accessor :connections
        attr_accessor :channel
    
        def initialize
          @connections = []
          @channel = EventMachine::Channel.new
        end
    
        def start
          @signature = EventMachine.start_unix_domain_server '/tmp/messagebus.sock', Connection do |conn|
            conn.server = self
          end
        end
    
        def stop
          EventMachine.stop_server(@signature)
    
          unless wait_for_connections_and_stop
            EventMachine.add_periodic_timer(1) { wait_for_connections_and_stop }
          end
        end
    
        def wait_for_connections_and_stop
          if @connections.empty?
            EventMachine.stop
            true
          else
            puts "Waiting for #{@connections.size} connection(s) to finish ..."
            false
          end
        end
      end
    
      class Connection < EventMachine::Connection
        attr_accessor :server
    
        def post_init
          log "Connection opened"
        end
    
        def server=(server)
          @server = server
    
          @subscription = server.channel.subscribe do |data|
            self.log "Sending #{data}"
            self.send_data data 
          end
        end
    
        def receive_data(data)
          log "Received #{data}"
          server.channel.push data
        end
    
        def unbind
          server.channel.unsubscribe @subscription
          server.connections.delete(self)
          log "Connection closed"
        end
    
        def log(msg)
          puts "[#{self.object_id}] #{msg}"
        end
      end
    end
    
    EventMachine::run {
      s = Messagebus::Server.new
      s.start
      puts "New server listening"
    }