Efficient message factory and handler in C++


Our company is rewriting most of its legacy C code in C++11, which also means I am a C programmer learning C++. I need advice on message handlers.

We have a distributed system: a server process sends a packed message over TCP to a client process.

In the C code this was done as follows:

- parse the message based on type and subtype, which are always the first 2 fields

- call a handler as handler[type](Message *msg)

- the handler creates a temporary struct, say tmp_struct, to hold the parsed values and ...

- calls subhandler[type][subtype](tmp_struct)

There is only one handler per type/subtype.
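
For reference, the legacy two-level dispatch described above can be sketched roughly as follows. This is only an illustration under assumptions: the wire layout (one byte each for type and subtype, then one payload byte), `RawMessage`, `TmpStruct`, and the table sizes are hypothetical and not taken from the real code.

```cpp
#include <cassert>
#include <cstddef>
#include <cstdint>

// Hypothetical raw buffer: byte 0 = type, byte 1 = subtype, byte 2 = payload.
struct RawMessage {
  const std::uint8_t* data;
  std::size_t size;
};

// Hypothetical temporary struct holding the parsed values.
struct TmpStruct {
  int value;
};

constexpr std::size_t kTypes = 2;
constexpr std::size_t kSubtypes = 2;

using SubHandler = int (*)(const TmpStruct&);
using Handler = bool (*)(const RawMessage&, int*);

int sub_0_0(const TmpStruct& t) { return t.value; }
int sub_0_1(const TmpStruct& t) { return t.value * 2; }
int sub_1_0(const TmpStruct& t) { return -t.value; }

// Exactly one sub-handler per (type, subtype); nullptr marks an unused slot.
SubHandler subhandler[kTypes][kSubtypes] = {
  {sub_0_0, sub_0_1},
  {sub_1_0, nullptr},
};

// Parses into a temporary struct and forwards to the sub-handler.
bool parse_and_call(std::size_t type, const RawMessage& msg, int* result) {
  assert(result != nullptr);
  if (msg.size < 3) return false;
  const std::size_t subtype = msg.data[1];
  if (subtype >= kSubtypes || subhandler[type][subtype] == nullptr) return false;
  TmpStruct tmp = { msg.data[2] };
  *result = subhandler[type][subtype](tmp);
  return true;
}

bool handle_type0(const RawMessage& m, int* r) { return parse_and_call(0, m, r); }
bool handle_type1(const RawMessage& m, int* r) { return parse_and_call(1, m, r); }

// First-level table, indexed by type.
Handler handler[kTypes] = { handle_type0, handle_type1 };

// Entry point: reads the type field and calls handler[type].
bool dispatch(const RawMessage& msg, int* result) {
  if (msg.size < 2) return false;
  const std::size_t type = msg.data[0];
  if (type >= kTypes) return false;
  return handler[type](msg, result);
}
```

Calling `dispatch` on a buffer `{0, 1, 21}` goes through `handler[0]` and then `subhandler[0][1]`; an unregistered combination such as `{1, 1, ...}` is rejected.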

We are moving to C++11 and a multi-threaded environment. The basic idea I had was:

1) Register a processor object for each type/subtype combination. This is
actually a vector of vectors of processors, indexed by type and then subtype.

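class MsgProcessor {
public:
    virtual ~MsgProcessor() {}

    // Factory function
    virtual Message *create() = 0;
    // const, because the message stores a const MsgProcessor *
    virtual void Handler(Message *msg) const = 0;
};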

This will be inherited by different message processors

class AMsgProcessor : public MsgProcessor {
public:
      Message *create() override;
      void Handler(Message *msg) const override;
};

2) Get the processor with a lookup into the vector of vectors, then get the message from the overridden create() factory function, so that the actual message and the parsed values are kept inside the message.

3) Now a bit of a hack: this message should be sent to other threads for the heavy processing. To avoid another lookup in the vector, I added a pointer to the processor inside the message.
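
A minimal sketch of steps 1) and 2), assuming the table stores non-owning pointers and is indexed by type and then subtype. `Registry`, `AMsg`, `receive`, and the `int` return value of `handle` are hypothetical names introduced only for illustration.

```cpp
#include <cassert>
#include <cstddef>
#include <memory>
#include <vector>

struct Message {
  virtual ~Message() {}
};

class MsgProcessor {
public:
  virtual ~MsgProcessor() {}
  virtual std::unique_ptr<Message> create() const = 0;  // factory function
  virtual int handle(Message& msg) const = 0;
};

struct AMsg : Message {
  int payload = 0;
};

class AMsgProcessor : public MsgProcessor {
public:
  std::unique_ptr<Message> create() const override {
    return std::unique_ptr<Message>(new AMsg());
  }
  int handle(Message& msg) const override {
    // Safe only because this processor created the message itself.
    return static_cast<AMsg&>(msg).payload + 1;
  }
};

// Vector of vectors of non-owning processor pointers.
class Registry {
public:
  void add(std::size_t type, std::size_t subtype, const MsgProcessor& proc) {
    if (table_.size() <= type) table_.resize(type + 1);
    if (table_[type].size() <= subtype) table_[type].resize(subtype + 1, nullptr);
    table_[type][subtype] = &proc;
  }
  // Returns nullptr for an unknown (type, subtype) combination.
  const MsgProcessor* find(std::size_t type, std::size_t subtype) const {
    if (type >= table_.size() || subtype >= table_[type].size()) return nullptr;
    return table_[type][subtype];
  }
private:
  std::vector<std::vector<const MsgProcessor*>> table_;
};

// Receive path: one lookup, then create and handle through the same processor.
int receive(const Registry& reg, std::size_t type, std::size_t subtype) {
  const MsgProcessor* proc = reg.find(type, subtype);
  if (proc == nullptr) return -1;  // nothing registered
  std::unique_ptr<Message> msg = proc->create();
  assert(msg != nullptr);
  return proc->handle(*msg);
}
```

Bounds checks in `find` matter here because type and subtype come straight off the network.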

class Message {
public:
    const MsgProcessor *proc; // set to processor, 
                              // which we got from the first lookup
                              // to get factory function.
};

So other threads will just do

msg->proc->Handler(msg);   // msg is a Message *

This looks bad, but my hope is that it will help separate the message handler from the factory. This is for the case where multiple type/subtype combinations want to create the same Message but handle it differently.

I was searching about this and came across:
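
To make the back-pointer idea concrete, here is a small sketch in which two processors create the same `Message` type but handle it differently. `LogProcessor`, `DoubleProcessor`, `process`, and the string results are hypothetical and exist only to make the behaviour observable.

```cpp
#include <cassert>
#include <memory>
#include <string>

class MsgProcessor;

struct Message {
  const MsgProcessor* proc = nullptr;  // set once, when the message is created
  int value = 0;
};

class MsgProcessor {
public:
  virtual ~MsgProcessor() {}
  // Shared factory: every processor here produces the same Message type and
  // stamps itself into it, so no second table lookup is needed later.
  std::unique_ptr<Message> create() const {
    std::unique_ptr<Message> msg(new Message());
    msg->proc = this;
    return msg;
  }
  virtual std::string handle(const Message& msg) const = 0;
};

class LogProcessor : public MsgProcessor {
public:
  std::string handle(const Message& msg) const override {
    return "log:" + std::to_string(msg.value);
  }
};

class DoubleProcessor : public MsgProcessor {
public:
  std::string handle(const Message& msg) const override {
    return "double:" + std::to_string(msg.value * 2);
  }
};

// What a worker thread would run: it only needs the message.
std::string process(const Message& msg) {
  assert(msg.proc != nullptr);
  return msg.proc->handle(msg);
}
```

The processors must outlive every message that points at them; in this sketch that is the caller's responsibility.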

http://www.drdobbs.com/cpp/message-handling-without-dependencies/184429055?pgno=1

It provides a way to completely separate the message from the handler. But I was wondering whether my simple scheme above would be considered an acceptable design. Or is this the wrong way of achieving what I want?

Efficiency, as in speed, is the most important requirement for this application. We are already doing a couple of memory jumps: 2 vectors plus a virtual function call to create the message. There are 2 dereferences to get to the handler, which I guess is not good from a caching point of view.
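
If the two dependent vector loads turn out to matter, one possible alternative is a single contiguous table indexed by `type * kMaxSubtypes + subtype` that stores plain function pointers, which also avoids the virtual call. This is a sketch, not a measured recommendation: `FlatTable`, `kMaxTypes`, and `kMaxSubtypes` are hypothetical, and whether it is actually faster would have to be benchmarked on the real workload.

```cpp
#include <array>
#include <cassert>
#include <cstddef>

struct Message {
  int value;
};

// Hypothetical limits; the real values depend on the protocol.
constexpr std::size_t kMaxTypes = 8;
constexpr std::size_t kMaxSubtypes = 16;

using HandlerFn = int (*)(const Message&);

class FlatTable {
public:
  FlatTable() : slots_() {}  // value-initialises every slot to nullptr

  bool set(std::size_t type, std::size_t subtype, HandlerFn fn) {
    if (type >= kMaxTypes || subtype >= kMaxSubtypes) return false;
    slots_[type * kMaxSubtypes + subtype] = fn;
    return true;
  }

  // One bounds check and one indexed load; nullptr means "not registered".
  HandlerFn get(std::size_t type, std::size_t subtype) const {
    if (type >= kMaxTypes || subtype >= kMaxSubtypes) return nullptr;
    return slots_[type * kMaxSubtypes + subtype];
  }

private:
  std::array<HandlerFn, kMaxTypes * kMaxSubtypes> slots_;
};

int twice(const Message& m) { return m.value * 2; }
int negate(const Message& m) { return -m.value; }

// Looks up and calls the handler; the caller must have registered one.
int call(const FlatTable& table, std::size_t type, std::size_t subtype,
         const Message& m) {
  HandlerFn fn = table.get(type, subtype);
  assert(fn != nullptr);
  return fn(m);
}
```

The trade-off is a fixed-size table that wastes slots when the type/subtype space is sparse.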


Solution

  • Though your requirements are unclear, I think I have a design that might be what you are looking for.

    Check out http://coliru.stacked-crooked.com/a/f7f9d5e7d57e6261 for the fully fledged example.

    It has the following components:

    1. IMessageProcessor: an interface class for message processors.
    2. Message: a base class representing a message.
    3. Registrator: a registration class, essentially a singleton, that stores the message processor for each (Type, Subtype) pair. It keeps the mapping in an unordered_map, which you can tweak for better performance. All of Registrator's exposed APIs are protected by a std::mutex.
    4. AMsgProcessor and BMsgProcessor: concrete implementations of IMessageProcessor.
    5. simulate: a function that shows how it all fits together.

    Pasting the code here as well:

    /*
     * http://stackoverflow.com/questions/40230555/efficient-message-factory-and-handler-in-c
     */
    
    #include <iostream>
    #include <vector>
    #include <tuple>
    #include <mutex>
    #include <memory>
    #include <cassert>
    #include <unordered_map>
    
    class Message;
    
    class IMessageProcessor
    {
    public:
      virtual Message* create() = 0;
      virtual void handle_message(Message*) = 0;
      virtual ~IMessageProcessor() {};
    };
    
    /*
     * Base message class
     */
    class Message
    {
    public:
      virtual void populate() = 0;
      virtual ~Message() {};
    };
    
    using Type = int;
    using SubType = int;
    using TypeCombo = std::pair<Type, SubType>;
    using IMsgProcUptr = std::unique_ptr<IMessageProcessor>;
    
    /*
     * Registrator class maintains all the registrations in an
     * unordered_map.
     * This class owns the MessageProcessor instance inside the
     * unordered_map.
     */
    class Registrator
    {
    public:
      static Registrator* instance();
    
      // Disable other types of construction
      Registrator(const Registrator&) = delete;
      void operator=(const Registrator&) = delete;
    
    public:
      // TypeCombo assumed to be cheap to copy
      template <typename ProcT, typename... Args>
      std::pair<bool, IMsgProcUptr> register_proc(TypeCombo typ, Args&&... args)
      {
        // std::make_unique needs C++14; under strict C++11 use
        // IMsgProcUptr proc(new ProcT(std::forward<Args>(args)...));
        IMsgProcUptr proc = std::make_unique<ProcT>(std::forward<Args>(args)...);
        std::lock_guard<std::mutex> _(lock_);
        if (registrations_.find(typ) != registrations_.end()) {
          // Key already registered. Check before moving proc into the map:
          // once moved into a temporary pair it is gone even if insert fails.
          // Return the heap allocated instance back to the caller,
          // who now owns the Processor.
          return std::make_pair(false, std::move(proc));
        }
        registrations_.emplace(typ, std::move(proc));
        return std::make_pair(true, IMsgProcUptr());
      }
    
      // Get the processor corresponding to TypeCombo
      // IMessageProcessor passed is non-owning pointer
      // i.e the caller SHOULD not delete it or own it
      std::pair<bool, IMessageProcessor*> processor(TypeCombo typ)
      {
        std::lock_guard<std::mutex> _(lock_);
    
        auto fitr = registrations_.find(typ);
        if (fitr == registrations_.end()) {
          return std::make_pair(false, nullptr);
        }
        return std::make_pair(true, fitr->second.get());
      }
    
      // TypeCombo assumed to be cheap to copy
      bool is_type_used(TypeCombo typ)
      {
        std::lock_guard<std::mutex> _(lock_);
        return registrations_.find(typ) != registrations_.end();
      }
    
      bool deregister_proc(TypeCombo typ)
      {
        std::lock_guard<std::mutex> _(lock_);
        return registrations_.erase(typ) == 1;
      }
    
    private:
      Registrator() = default;
    
    private:
      std::mutex lock_;
      /*
       * Should be replaced with a concurrent map if at all this
       * data structure is the main contention point (which I find
       * very unlikely).
       */
      struct HashTypeCombo
      {
      public:
        std::size_t operator()(const TypeCombo& typ) const noexcept
        {
          return std::hash<decltype(typ.first)>()(typ.first) ^ 
                 std::hash<decltype(typ.second)>()(typ.second);
        }
      };
    
      std::unordered_map<TypeCombo, IMsgProcUptr, HashTypeCombo> registrations_;
    };
    
    Registrator* Registrator::instance()
    {
      static Registrator inst;
      return &inst;
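      /*
       * C++11 guarantees thread-safe initialisation of this local static.
       * Use some other DCLP (double-checked locking pattern) based instance
       * creation only if lifetime or creation of the static is an issue.
       */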
    }
    
    
    // Define some message processors
    
    class AMsgProcessor final : public IMessageProcessor
    {
    public:
      class AMsg final : public Message 
      {
      public:
        void populate() override {
          std::cout << "Working on AMsg\n";
        }
    
        AMsg() = default;
        ~AMsg() = default;
      };
    
      Message* create() override
      {
        std::unique_ptr<AMsg> ptr(new AMsg);
        return ptr.release();
      }
    
      void handle_message(Message* msg) override
      {
        assert (msg);
        auto my_msg = static_cast<AMsg*>(msg);
    
        //.... process my_msg ?
        //.. probably being called in some other thread
        // Who owns the msg ??
        (void)my_msg; // only for suppressing warning
    
        delete my_msg;
    
        return;
      }
    
      ~AMsgProcessor();
    };
    
    AMsgProcessor::~AMsgProcessor()
    {
    }
    
    class BMsgProcessor final : public IMessageProcessor
    {
    public:
      class BMsg final : public Message
      {
      public:
        void populate() override {
          std::cout << "Working on BMsg\n";
        }
    
        BMsg() = default;
        ~BMsg() = default;
      };
    
      Message* create() override
      {
        std::unique_ptr<BMsg> ptr(new BMsg);
        return ptr.release();
      }
    
      void handle_message(Message* msg) override
      {
        assert (msg);
        auto my_msg = static_cast<BMsg*>(msg);
    
        //.... process my_msg ?
        //.. probably being called in some other thread
        //Who owns the msg ??
        (void)my_msg; // only for suppressing warning
    
        delete my_msg;
    
        return;
      }
    
      ~BMsgProcessor();
    };
    
    BMsgProcessor::~BMsgProcessor()
    {
    }
    
    
    TypeCombo read_from_network()
    {
      return {1, 2};
    }
    
    
    struct ParsedData {
    };
    
    Message* populate_message(Message* msg, ParsedData& pdata)
    {
      // Do something with the message
      // Calling a dummy populate method now
      msg->populate();
      (void)pdata;
      return msg;
    }
    
    void simulate()
    {
      TypeCombo typ = read_from_network();
      bool ok;
      IMessageProcessor* proc = nullptr;
    
      std::tie(ok, proc) = Registrator::instance()->processor(typ);
      if (!ok) {
        std::cerr << "FATAL!!!" << std::endl;
        return;
      }
    
      ParsedData parsed_data;
      //..... populate parsed_data here ....
    
      proc->handle_message(populate_message(proc->create(), parsed_data));
      return;
    }
    
    
    int main() {
    
      /*
       * TODO: The return values of register_proc are not checked here;
       * checking them is a must in production code!!
       */
      // Register AMsgProcessor
      Registrator::instance()->register_proc<AMsgProcessor>(std::make_pair(1, 1));
      Registrator::instance()->register_proc<BMsgProcessor>(std::make_pair(1, 2));
    
      simulate();
    
      return 0;
    }
    

    UPDATE 1

    The major source of confusion here seems to be that the architecture of the event system is unknown.

    Any self-respecting event system architecture would look something like this:

    1. A pool of threads polling on the socket descriptors.
    2. A pool of threads for handling timer related events.
    3. A comparatively small number (depending on the application) of threads to do long blocking jobs.

    So, in your case:

    1. You will get a network event on the thread doing epoll_wait, select or poll.
    2. Read the packet completely and get the processor using the Registrator::processor call. NOTE: the processor call can be made without any locking if one can guarantee that the underlying unordered_map does not get modified, i.e. no new inserts are made once we start receiving events.
    3. Using the obtained processor we can get the Message and populate it.
    4. Now, this is the part where I am not sure how you want it to work. At this point we have the processor, on which you can call handle_message either from the current thread, i.e. the thread doing epoll_wait, or from another thread by posting the job (Processor and Message) to that thread's receive queue.
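
The last option in step 4, posting the (Processor, Message) pair to another thread's queue, could look roughly like the sketch below. It is an assumption-laden illustration rather than part of the linked example: `Job`, `JobQueue`, `SumProcessor`, `worker_loop`, and `spawn_worker` are hypothetical names, and `handle_message` takes a `std::unique_ptr` here (unlike the raw pointer in the code above) so that ownership of the message travels with the job.

```cpp
#include <atomic>
#include <cassert>
#include <condition_variable>
#include <functional>
#include <memory>
#include <mutex>
#include <queue>
#include <thread>
#include <utility>

struct Message {
  explicit Message(int v) : value(v) {}
  int value;
};

class IMessageProcessor {
public:
  virtual ~IMessageProcessor() {}
  virtual void handle_message(std::unique_ptr<Message> msg) = 0;
};

// Toy processor: adds up the values it sees.
class SumProcessor : public IMessageProcessor {
public:
  SumProcessor() : total_(0) {}
  void handle_message(std::unique_ptr<Message> msg) override {
    total_ += msg->value;
  }
  int total() const { return total_.load(); }
private:
  std::atomic<int> total_;
};

struct Job {
  Job() : proc(nullptr) {}
  Job(IMessageProcessor* p, std::unique_ptr<Message> m)
      : proc(p), msg(std::move(m)) {}
  IMessageProcessor* proc;       // non-owning; the registry owns processors
  std::unique_ptr<Message> msg;  // owned by whoever holds the job
};

class JobQueue {
public:
  void push(Job job) {
    {
      std::lock_guard<std::mutex> guard(lock_);
      jobs_.push(std::move(job));
    }
    ready_.notify_one();
  }
  // Blocks until a job is available; returns false once closed and drained.
  bool pop(Job& out) {
    std::unique_lock<std::mutex> guard(lock_);
    ready_.wait(guard, [this] { return closed_ || !jobs_.empty(); });
    if (jobs_.empty()) return false;
    out = std::move(jobs_.front());
    jobs_.pop();
    return true;
  }
  void close() {
    {
      std::lock_guard<std::mutex> guard(lock_);
      closed_ = true;
    }
    ready_.notify_all();
  }
private:
  std::mutex lock_;
  std::condition_variable ready_;
  std::queue<Job> jobs_;
  bool closed_ = false;
};

// Body of a worker thread: no registry lookup, the job carries the processor.
void worker_loop(JobQueue& queue) {
  Job job;
  while (queue.pop(job)) {
    assert(job.proc != nullptr);
    job.proc->handle_message(std::move(job.msg));
  }
}

// Starts a worker; the caller must close() the queue and join() the thread.
std::thread spawn_worker(JobQueue& queue) {
  return std::thread(worker_loop, std::ref(queue));
}
```

The network thread would call `push` after populating the message, and shutdown would be `close()` followed by `join()` on the thread returned from `spawn_worker`.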