Search code examples
c++reactivexrxcpp

RxCpp calls copy constructor a lot


I am trying to include RxCpp in my program and I noticed, that the framework calls the copy constructor of emitted objects quite a lot.

#include <iostream>
#include <rxcpp/rx.hpp>

class Foo
{
public:
    Foo() = default;

    Foo(Foo const &other)
    {
        std::cout << "Copy constructor called" << std::endl;
    };

    Foo(Foo &&other) noexcept
    {
        std::cout << "Move constructor called" << std::endl;
    };
};

int main()
{
    Foo bar;
    rxcpp::sources::just(std::move(bar))
            .subscribe(
                    [](Foo const &baz)
                    {
                        std::cout << "Foo received" << std::endl;
                    }
            );

    return 0;
}

Running this outputs

Move constructor called
Copy constructor called
Move constructor called
Move constructor called
Move constructor called
Move constructor called
Move constructor called
Copy constructor called
Copy constructor called
Copy constructor called
Copy constructor called
Copy constructor called
Copy constructor called
Copy constructor called
Copy constructor called
Copy constructor called
Copy constructor called
Foo received

I noticed this first on a subject with which I wanted to publish an object that was created on stack after the completion of a network operation. In that case, the copy constructor was called (only?) 4 times, but no operators were acting in between and the subject only had one subscriber.

I understand, that calling the copy constructor is necessary, since multiple observers might be listening and they can not share a moved object. I also expect that each operator on an observable is just like another subscriber.

However, I don't understand why it happens so much internally especially in this example. This feels like I did something wrong. Is there some way to optimize this? Also is there a good reason why the move constructor is not used if there is only one subscriber?

Is it generally a good idea to use std::shared_ptr to emit larger objects through an observable to avoid copy constructor calls?


Solution

  • Yes, rxcpp does a lot of copies. The burden lies on the value to be cheaply copyable.

    PR’s are welcome, but must retain the existing model where each subscribe() is allowed to be called multiple times.

    This means each call to subscribe creates a subscription and must make a copy of the value for that subscription.

    Subscribe itself does an extra step if it is the first subscribe on a thread (as in this case). It uses the current thread scheduler to take ownership of the thread. This most likely copies the sender into the scheduled work. This is a case where some copies could be saved.

    just() itself probably schedules the call to on_next using the specified scheduler arg, which is defaulted to current thread scheduler in this case, with another copy of the value.