Search code examples
c++dbussystemdlibuv

Event loop handling for sd-bus in libuv


We have an event loop from libuv that handles Unix sockets and TCP sockets. The program must now also handle D-Bus, and we decided to use sd-bus for that.

Lennart wrote on his blog:

Note that our APIs, including sd-bus, integrate nicely into sd-event
event loops, but do not require it, and may be integrated into other
event loops too, as long as they support watching for time and I/O events.

So I assume it must be possible.

I can get the dbus socket fd via sd_bus_get_fd (sd_bus *bus). But I can't find any obvious way to stop sd-bus from using its bus_poll method to wait for events internally.

For example, calling a method with sd_bus_call(...) blocks in ppoll.

So: How do I handle the dbus events in libuv?


Solution

  • I figured it out; here's an example of how to combine C++, libuv and sd-bus:

    I recommend that you read http://0pointer.de/blog/the-new-sd-bus-api-of-systemd.html to understand sd-bus in general.

    These are code snippets from my implementation at https://github.com/TheJJ/horst

    Method calls can be made with sd_bus_call_async, which does not block (as opposed to sd_bus_call). Don't forget to call update_events() after sd_bus_call_async so that the call is actually sent out over the socket!

    /**
     * libuv poll callback for the dbus socket.
     *
     * Drains everything sd-bus can currently process and then lets the
     * owning DBusConnection (stored in handle->data) refresh what it
     * watches for.
     *
     * The status and events arguments supplied by libuv are ignored.
     */
    static void on_dbus_ready(uv_poll_t *handle, int /*status*/, int /*events*/) {
        auto *self = static_cast<DBusConnection *>(handle->data);
        sd_bus *const bus = self->get_bus();

        // Each sd_bus_process() call may dispatch one of the
        // vtable-registered functions. A positive result means progress was
        // made and more work may be pending; zero means there is nothing
        // left to do until the next callback; negative is an errno-style
        // failure.
        int progress;
        do {
            progress = sd_bus_process(bus, nullptr);
        } while (progress > 0);

        if (progress < 0) {
            printf("[dbus] Failed to process bus: %s", strerror(-progress));
        }

        // refresh the set of events watched for on the socket,
        // regardless of how the processing loop ended.
        self->update_events();
    }
    
    /**
     * Translate poll(2)-style flags, as handed out by sd-bus,
     * into the matching libuv poll event mask.
     *
     * POLLIN and POLLPRI both map to UV_READABLE, POLLOUT maps to
     * UV_WRITABLE. Every other bit is dropped. UV_DISCONNECT has no
     * poll(2) counterpart here and is never set by this function.
     */
    int poll_to_libuv_events(int pollflags) {
        const bool wants_read = (pollflags & (POLLIN | POLLPRI)) != 0;
        const bool wants_write = (pollflags & POLLOUT) != 0;

        return (wants_read ? UV_READABLE : 0)
             | (wants_write ? UV_WRITABLE : 0);
    }
    
    
    class DBusConnection {
    public:
        DBusConnection(Satellite *sat);
    
        virtual ~DBusConnection() = default;
    
        /** connect to dbus */
        int connect() {
            int r = sd_bus_open_system(&this->bus);
    
            if (r < 0) {
                printf("[dbus] Failed to connect to bus: %s", strerror(-r));
                goto clean_return;
            }
    
            r = sd_bus_add_object_vtable(
                this->bus,
                &this->bus_slot,
                "/rofl/lol",      // object path
                "rofl.lol",       // interface name
                your_vtable,
                this              // this is the userdata that'll be passed
                                  // to the dbus methods
            );
    
            if (r < 0) {
                printf("[dbus] Failed to install the horst sdbus object: %s", strerror(-r));
                goto clean_return;
            }
    
            // register our service name
            r = sd_bus_request_name(this->bus, "moveii.horst", 0);
            if (r < 0) {
                printf("[dbus] Failed to acquire service name: %s", strerror(-r));
                goto clean_return;
            }
    
            // register the filedescriptor from
            // sd_bus_get_fd(bus) to libuv
            uv_poll_init(this->loop, &this->connection, sd_bus_get_fd(this->bus));
    
    
            // make `this` reachable in callbacks.
            this->connection.data = this;
    
            // init the dbus-event-timer
            uv_timer_init(this->loop, &this->timer);
            this->timer.data = this;
    
            // process initial events and set up the
            // events and timers for subsequent calls
            on_dbus_ready(&this->connection, 0, 0);
    
            printf("[dbus] Listener initialized");
            return 0;
    
        clean_return:
            sd_bus_slot_unref(this->bus_slot);
            sd_bus_unref(this->bus);
            this->bus = nullptr;
    
            return 1;
        }
    
    
    
        /** update the events watched for on the filedescriptor */
        void update_events() {
            sd_bus *bus = this->get_bus();
    
            // prepare the callback for calling us the next time.
            int new_events = poll_to_libuv_events(
                sd_bus_get_events(bus)
            );
    
            uint64_t usec;
            int r = sd_bus_get_timeout(bus, &usec);
    
            if (not r) {
                // if the timer is running already, it is stopped automatically
                // inside uv_timer_start.
                uv_timer_start(
                    &this->timer,
                    [] (uv_timer_t *handle) {
                        // yes, handle is not a poll_t, but
                        // we just care for its -> data member anyway.
                        on_dbus_ready((uv_poll_t *)handle, 0, 0);
                    },
                    usec / 1000, // time in milliseconds, sd_bus provides µseconds
                    0            // don't repeat
                );
            }
    
            // always watch for disconnects:
            new_events |= UV_DISCONNECT;
    
            // activate the socket watching,
            // and if active, invoke the callback function
            uv_poll_start(&this->connection, new_events, &on_dbus_ready);
        }
    
    
        /** close the connections */
        int close() {
            // TODO: maybe this memoryerrors when the loop actually
            //       does the cleanup. we have to wait for the callback.
            uv_close((uv_handle_t *) &this->timer, nullptr);
    
            uv_poll_stop(&this->connection);
    
            sd_bus_close(this->bus);
            sd_bus_slot_unref(this->bus_slot);
            sd_bus_unref(this->bus);
            return 0;
        }
    
        /**
          * Return the bus handle.
          */
        sd_bus *get_bus() const {
            return this->bus;
        }
    
    protected:
        /**
          * loop handle
          */
        uv_loop_t *loop;
    
        /**
          * polling object for dbus events
          */
        uv_poll_t connection;
    
        /**
          * dbus also wants to be called periodically
          */
        uv_timer_t timer;
    
        /**
          * dbus bus handle
          */
        sd_bus *bus;
    
        /**
          * dbus slot handle
          */
        sd_bus_slot *bus_slot;
    };