Search code examples
c++opencvdata-distribution-service

Streams API For Fast-RTPS


I want to use Fast-RTPS to publish video (stream data) to a subscriber. I can publish ten consecutive JPEG files successfully, but every picture received by the subscriber takes a long time to process because I use the function get_byte_value to read the data one byte at a time.

Does anyone know how to publish and subscribe more efficiently with the Fast-RTPS middleware? (Create a new type? Something else?)

Below is my publisher's and subscriber's code:

Publisher.cpp

// Copyright 2016 Proyectos y Sistemas de Mantenimiento SL (eProsima).
//
// Licensed under the Apache License, Version 2.0 (the "License");

// Hpshboss modified code from eprosima's github example;
// Licensed under the Apache License, Version 2.0 (the "License");

/**
 * @file PicturePublisher.cpp
 *
 */

#include "Publisher.h"
#include <fastrtps/attributes/ParticipantAttributes.h>
#include <fastrtps/attributes/PublisherAttributes.h>
#include <fastrtps/publisher/Publisher.h>
#include <fastrtps/Domain.h>
#include <fastrtps/types/DynamicTypeBuilderFactory.h>
#include <fastrtps/types/DynamicDataFactory.h>
#include <fastrtps/types/DynamicTypeBuilder.h>
#include <fastrtps/types/DynamicTypeBuilderPtr.h>
#include <fastrtps/types/DynamicType.h>

#include <thread>
#include <time.h>
#include <vector>

#include <opencv2/opencv.hpp>

using namespace eprosima::fastrtps;
using namespace eprosima::fastrtps::rtps;
using namespace eprosima::fastrtps::types;

// using namespace cv;

/**
 * Constructs a publisher that owns no participant and no publisher yet and
 * whose dynamic type wrapper holds a null type; the constructor body does no
 * further work.
 */
PicturePublisher::PicturePublisher()
    : mp_participant(nullptr)
    , mp_publisher(nullptr)
    , m_DynType(DynamicType_ptr(nullptr))
{
}

bool PicturePublisher::init()
{
    cv::Mat image = cv::imread("drone.jpg", 1);
    std::vector<unsigned char> buffer;
    cv::imencode(".jpg", image, buffer);
    
    // Create basic builders
    DynamicTypeBuilder_ptr struct_type_builder(DynamicTypeBuilderFactory::get_instance()->create_struct_builder());

    DynamicType_ptr octet_type(DynamicTypeBuilderFactory::get_instance()->create_byte_type());
    DynamicTypeBuilder_ptr sequence_type_builder(DynamicTypeBuilderFactory::get_instance()->create_sequence_builder(octet_type, 3873715));
    DynamicType_ptr sequence_type = sequence_type_builder->build();
    
    // Add members to the struct. By the way, id must be consecutive starting by zero.
    struct_type_builder->add_member(0, "index", DynamicTypeBuilderFactory::get_instance()->create_uint32_type());
    struct_type_builder->add_member(1, "size", DynamicTypeBuilderFactory::get_instance()->create_uint32_type());
    struct_type_builder->add_member(2, "Picture", sequence_type);
    struct_type_builder->set_name("Picture"); // Need to be same with topic data type
    
    DynamicType_ptr dynType = struct_type_builder->build();
    m_DynType.SetDynamicType(dynType);
    m_DynHello = DynamicDataFactory::get_instance()->create_data(dynType);
    
    m_DynHello->set_uint32_value(0, 0);
    m_DynHello->set_uint32_value(buffer.size(), 1);
    
    MemberId id;
    // std::cout << "init: " << id << std::endl;
    
    DynamicData* sequence_data = m_DynHello->loan_value(2);
    for (int i = 0; i < buffer.size(); i++) {
        if (i == buffer.size() - 1) {
            std::cout << "Total Size: " << i + 1 << std::endl;
        }
        sequence_data->insert_byte_value(buffer[i], id);
    }
    m_DynHello->return_loaned_value(sequence_data);

    ParticipantAttributes PParam;
    PParam.rtps.setName("DynPicture_pub");
    mp_participant = Domain::createParticipant(PParam, (ParticipantListener*)&m_part_list);

    if (mp_participant == nullptr)
    {
        return false;
    }

    //REGISTER THE TYPE
    Domain::registerDynamicType(mp_participant, &m_DynType);

    //CREATE THE PUBLISHER
    PublisherAttributes Wparam;
    Wparam.topic.topicKind = NO_KEY;
    Wparam.topic.topicDataType = "Picture";
    Wparam.topic.topicName = "PictureTopic";

    mp_publisher = Domain::createPublisher(mp_participant, Wparam, (PublisherListener*)&m_listener);
    if (mp_publisher == nullptr)
    {
        return false;
    }

    return true;

}

/**
 * Tears the publisher down in three steps: removes the participant, hands the
 * sample back to the dynamic data factory and finally stops the Domain.
 */
PicturePublisher::~PicturePublisher()
{
    // Step 1: the participant goes first.
    Domain::removeParticipant(mp_participant);

    // Step 2: release the sample through the factory.
    auto* const data_factory = DynamicDataFactory::get_instance();
    data_factory->delete_data(m_DynHello);

    // Step 3: shut the Domain down.
    Domain::stopAll();
}

/**
 * Matching callback of the publisher listener.
 *
 * On MATCHED_MATCHING the n_matched counter is incremented and firstConnected
 * is set; any other status decrements the counter. Both cases are reported on
 * stdout.
 *
 * @param info Matching information; only its status is inspected.
 */
void PicturePublisher::PubListener::onPublicationMatched(
        Publisher* /*pub*/,
        MatchingInfo& info)
{
    if (info.status != MATCHED_MATCHING)
    {
        --n_matched;
        std::cout << "Publisher unmatched" << std::endl;
        return;
    }

    ++n_matched;
    firstConnected = true;
    std::cout << "Publisher matched" << std::endl;
}

/**
 * Discovery callback of the participant listener: prints the participant name
 * together with the event (discovered, removed or dropped). Other statuses
 * produce no output.
 *
 * @param info Discovery information carrying the status and participant name.
 */
void PicturePublisher::PartListener::onParticipantDiscovery(
        Participant*,
        ParticipantDiscoveryInfo&& info)
{
    // Suffix appended after the participant name; stays null for statuses
    // that are not reported.
    const char* event = nullptr;

    switch (info.status)
    {
        case ParticipantDiscoveryInfo::DISCOVERED_PARTICIPANT:
            event = " discovered";
            break;
        case ParticipantDiscoveryInfo::REMOVED_PARTICIPANT:
            event = " removed";
            break;
        case ParticipantDiscoveryInfo::DROPPED_PARTICIPANT:
            event = " dropped";
            break;
        default:
            break;
    }

    if (event != nullptr)
    {
        std::cout << "Participant " << info.info.m_participantName << event << std::endl;
    }
}

/**
 * Publishing loop executed on the worker thread started by run().
 *
 * Keeps calling publish() until `stop` is set or, when `samples` is non-zero,
 * until that many samples were published, logging the index and size stored
 * in the sample after each successful publish.
 *
 * @param samples Number of samples to publish; 0 means no limit.
 * @param sleep   Pause between publish attempts, in milliseconds.
 */
void PicturePublisher::runThread(
        uint32_t samples,
        uint32_t sleep)
{
    uint32_t i = 0;

    while (!stop && (i < samples || samples == 0))
    {
        if (publish(samples != 0))
        {
            // Zero-initialised so that a failed read prints 0 rather than an
            // indeterminate value.
            uint32_t index = 0;
            m_DynHello->get_uint32_value(index, 0);
            std::cout << "runThreading...; \tSample Index: " << index << "; \t";

            uint32_t size = 0;
            m_DynHello->get_uint32_value(size, 1);
            std::cout << "size: " << size << std::endl;


            if (i == 9){
                std::cout << "Structure message" << " with index: " << i + 1 << " SENT" << std::endl;
                // Avoid unmatched condition impact subscriber receiving message
                std::cout << "Wait within twenty second..." << std::endl;
                std::this_thread::sleep_for(std::chrono::milliseconds(10000));
            }
            ++i;
        }
        std::this_thread::sleep_for(std::chrono::milliseconds(sleep));
    }
}

/**
 * Runs the publishing loop on a worker thread and waits for it to finish.
 *
 * With `samples == 0` the loop has no sample limit and is stopped when the
 * user presses enter; otherwise the worker ends on its own after `samples`
 * samples.
 *
 * @param samples Number of samples to publish; 0 means "until enter is pressed".
 * @param sleep   Pause between publish attempts, in milliseconds.
 */
void PicturePublisher::run(
        uint32_t samples,
        uint32_t sleep)
{
    stop = false;
    std::thread worker(&PicturePublisher::runThread, this, samples, sleep);

    const bool bounded = (samples != 0);
    if (bounded)
    {
        std::cout << "Publisher running " << samples << " samples." << std::endl;
    }
    else
    {
        std::cout << "Publisher running. Please press enter to stop the Publisher at any time." << std::endl;
        std::cin.ignore();
        // Ask the worker loop to finish.
        stop = true;
    }

    worker.join();
}

/**
 * Increments the sample's index and writes the sample, provided publishing is
 * currently allowed.
 *
 * Publishing is allowed when a match has happened before, when the caller
 * does not want to wait for a listener, or when n_matched is positive.
 *
 * @param waitForListener When true, nothing is sent until one of the
 *                        match-related conditions above holds.
 * @return true when the sample was handed to the publisher, false when the
 *         call was skipped.
 */
bool PicturePublisher::publish(
        bool waitForListener)
{
    // std::cout << "m_listener.n_matched: " << m_listener.n_matched << std::endl;
    if (m_listener.firstConnected || !waitForListener || m_listener.n_matched > 0)
    {
        // Zero-initialised so that a failed read cannot feed an indeterminate
        // value into the next index.
        uint32_t index = 0;
        m_DynHello->get_uint32_value(index, 0);
        m_DynHello->set_uint32_value(index + 1, 0);

        mp_publisher->write((void*)m_DynHello);

        return true;
    }
    return false;
}

In PicturePublisher::init() function

Subscriber.cpp

// Copyright 2016 Proyectos y Sistemas de Mantenimiento SL (eProsima).
//
// Licensed under the Apache License, Version 2.0 (the "License");

// Hpshboss modified code from eprosima's github example;
// Licensed under the Apache License, Version 2.0 (the "License");

/**
 * @file Subscriber.cpp
 *
 */

#include "Subscriber.h"
#include <fastrtps/attributes/ParticipantAttributes.h>
#include <fastrtps/attributes/SubscriberAttributes.h>
#include <fastrtps/subscriber/Subscriber.h>
#include <fastrtps/Domain.h>
#include <fastrtps/types/DynamicTypeBuilderFactory.h>
#include <fastrtps/types/DynamicDataFactory.h>
#include <fastrtps/types/DynamicTypeBuilder.h>
#include <fastrtps/types/DynamicTypeBuilderPtr.h>
#include <fastrtps/types/DynamicType.h>

#include <vector>
#include <string>
#include <sstream>
#include <iterator>

#include <opencv2/opencv.hpp>

using namespace eprosima::fastrtps;
using namespace eprosima::fastrtps::rtps;
using namespace eprosima::fastrtps::types;

// using namespace cv;

/**
 * Constructs a subscriber that owns no participant and no subscriber yet and
 * whose dynamic type wrapper holds a null type; the constructor body does no
 * further work.
 */
PictureSubscriber::PictureSubscriber()
    : mp_participant(nullptr)
    , mp_subscriber(nullptr)
    , m_DynType(DynamicType_ptr(nullptr))
{
}

// File-scope state of the subscriber.
// begin, end and elapsed are not referenced by the functions in this listing.
struct timespec begin;
struct timespec end;
double elapsed;

// Encoded picture bytes collected by the data callback before decoding.
std::vector<unsigned char> buffer;

bool PictureSubscriber::init()
{

    ParticipantAttributes PParam;
    PParam.rtps.setName("DynPicture_sub");
    mp_participant = Domain::createParticipant(PParam, (ParticipantListener*)&m_part_list);
    if (mp_participant == nullptr)
    {
        return false;
    }

    // Create basic builders
    DynamicTypeBuilder_ptr struct_type_builder(DynamicTypeBuilderFactory::get_instance()->create_struct_builder());

    DynamicTypeBuilder_ptr octet_builder(DynamicTypeBuilderFactory::get_instance()->create_byte_builder());
    DynamicTypeBuilder_ptr sequence_type_builder(DynamicTypeBuilderFactory::get_instance()->create_sequence_builder(octet_builder.get(), 3873715));
    DynamicType_ptr sequence_type = sequence_type_builder->build();

    // Add members to the struct.
    struct_type_builder->add_member(0, "index", DynamicTypeBuilderFactory::get_instance()->create_uint32_type());
    struct_type_builder->add_member(1, "size", DynamicTypeBuilderFactory::get_instance()->create_uint32_type());
    struct_type_builder->add_member(2, "Picture", sequence_type);
    struct_type_builder->set_name("Picture");
    
    DynamicType_ptr dynType = struct_type_builder->build();
    m_DynType.SetDynamicType(dynType);
    m_listener.m_DynHello = DynamicDataFactory::get_instance()->create_data(dynType);

    //REGISTER THE TYPE
    Domain::registerDynamicType(mp_participant, &m_DynType);

    //CREATE THE SUBSCRIBER
    SubscriberAttributes Rparam;
    Rparam.topic.topicKind = NO_KEY;
    Rparam.topic.topicDataType = "Picture";
    Rparam.topic.topicName = "PictureTopic";

    mp_subscriber = Domain::createSubscriber(mp_participant, Rparam, (SubscriberListener*)&m_listener);

    if (mp_subscriber == nullptr)
    {
        return false;
    }


    return true;
}

/**
 * Tears the subscriber down in three steps: removes the participant, hands
 * the listener's sample back to the dynamic data factory and finally stops
 * the Domain.
 */
PictureSubscriber::~PictureSubscriber()
{
    // Step 1: the participant goes first.
    Domain::removeParticipant(mp_participant);

    // Step 2: release the listener's sample through the factory.
    auto* const data_factory = DynamicDataFactory::get_instance();
    data_factory->delete_data(m_listener.m_DynHello);

    // Step 3: shut the Domain down.
    Domain::stopAll();
}

/**
 * Matching callback of the subscriber listener.
 *
 * MATCHED_MATCHING increments the n_matched counter; any other status
 * decrements it. Both cases are reported on stdout.
 *
 * @param info Matching information; only its status is inspected.
 */
void PictureSubscriber::SubListener::onSubscriptionMatched(
        Subscriber* /*sub*/,
        MatchingInfo& info)
{
    if (info.status != MATCHED_MATCHING)
    {
        --n_matched;
        std::cout << "Subscriber unmatched" << std::endl;
        return;
    }

    ++n_matched;
    std::cout << "Subscriber matched" << std::endl;
}

/**
 * Discovery callback of the participant listener: prints the participant name
 * together with the event (discovered, removed or dropped). Other statuses
 * produce no output.
 *
 * @param info Discovery information carrying the status and participant name.
 */
void PictureSubscriber::PartListener::onParticipantDiscovery(
        Participant*,
        ParticipantDiscoveryInfo&& info)
{
    // Suffix appended after the participant name; stays null for statuses
    // that are not reported.
    const char* event = nullptr;

    switch (info.status)
    {
        case ParticipantDiscoveryInfo::DISCOVERED_PARTICIPANT:
            event = " discovered";
            break;
        case ParticipantDiscoveryInfo::REMOVED_PARTICIPANT:
            event = " removed";
            break;
        case ParticipantDiscoveryInfo::DROPPED_PARTICIPANT:
            event = " dropped";
            break;
        default:
            break;
    }

    if (event != nullptr)
    {
        std::cout << "Participant " << info.info.m_participantName << event << std::endl;
    }
}

/**
 * Data callback: takes the next sample, copies its picture bytes into the
 * file-scope `buffer`, decodes them and writes the image to
 * "<index>_droneNew.jpg".
 *
 * Samples that cannot be taken or are not ALIVE are ignored. A sample whose
 * picture data cannot be accessed or decoded is reported on stdout and no
 * file is written for it.
 *
 * @param sub Subscriber that signalled the new data.
 */
void PictureSubscriber::SubListener::onNewDataMessage(
        Subscriber* sub)
{
    if (!sub->takeNextData((void*)m_DynHello, &m_info))
    {
        return;
    }
    if (m_info.sampleKind != ALIVE)
    {
        return;
    }

    this->n_samples++;

    // Print your structure data here.
    uint32_t index = 0;
    m_DynHello->get_uint32_value(index, 0);
    std::cout << "index: " << index <<  "; \t";

    uint32_t size = 0;
    m_DynHello->get_uint32_value(size, 1);
    std::cout << "size: " << size <<  std::endl;

    DynamicData* sequence_data_temp = m_DynHello->loan_value(2);
    if (sequence_data_temp == nullptr)
    {
        std::cout << "Could not access the picture sequence." << std::endl;
        return;
    }

    // `buffer` outlives this call, so it must be emptied first; otherwise the
    // bytes of every earlier sample would precede the current picture.
    // clear() keeps the capacity, so later samples reuse the allocation.
    buffer.clear();
    for (uint32_t i = 0; i < size; ++i)
    {
        buffer.push_back(sequence_data_temp->get_byte_value(i));
    }
    m_DynHello->return_loaned_value(sequence_data_temp);

    const cv::Mat imageDecoded = cv::imdecode(buffer, 1);
    if (imageDecoded.empty())
    {
        // Writing an empty image would fail inside the listener callback.
        std::cout << "Could not decode the received picture." << std::endl;
        return;
    }

    cv::imwrite(std::to_string(index) + "_droneNew.jpg", imageDecoded);
}

/**
 * Prints a prompt and then blocks on standard input (std::cin.ignore()), so
 * the call returns once the user presses enter.
 */
void PictureSubscriber::run()
{
    std::cout << "Subscriber running. Please press enter to stop the Subscriber" << std::endl;
    std::cin.ignore();
}

/**
 * Blocks until the listener has counted at least `number` samples, polling
 * the counter every 500 ms.
 *
 * @param number Number of received samples to wait for.
 */
void PictureSubscriber::run(
        uint32_t number)
{
    // The leading space keeps the count and the word apart ("10 samples").
    std::cout << "Subscriber running until " << number << " samples have been received" << std::endl;
    while (number > this->m_listener.n_samples)
    {
        std::this_thread::sleep_for(std::chrono::milliseconds(500));
    }
}

In PictureSubscriber::SubListener::onNewDataMessage(Subscriber* sub) function


Solution

  • Here at eProsima, we have found some solutions to the problem you point out.

    Firstly, please note that you don't need to use Dynamic Types to define the type that contains the image you are going to transmit. The easiest thing to do in your case is to define your type through an IDL file. Using the IDL file and the Fast-DDS-Gen tool you can generate the code for access to the data type elements, as well as automatically generate the data serialization and deserialization functions. In the Picture.idl file you will find the type defined in IDL format that best suits the data type you have created with dynamic types. Here you can find a guide on how to use the Fast-DDS-Gen tool. In this documentation you will also find a complete example of how an IDL file can be used to generate a complete DDS publisher/subscriber application, as well as the supported formats for the data. Also below are the files Publisher.cpp and Subscriber.cpp which have been modified according to the new data type.

    We also recommend you to take a look at the example HelloWorldExample, as it is the one that best suits your needs. In this example you can also discover the new DDS API, included in the latest version of Fast DDS (2.1.0).

    As an additional comment, we recommend that, instead of transmitting an octet vector, you encode the image in string base64 format before transmitting it since it's one of the most widespread formats for image transmission.

    Picture.idl

    // Sample type exchanged between the publisher and the subscriber below.
    struct Picture
    {
        // Set to 0 by the publisher's init() and incremented before every write.
        unsigned long index;
        // The publisher stores the length of the encoded buffer here.
        unsigned long size;
        // Encoded (JPEG) image bytes, as produced by cv::imencode in the publisher.
        sequence<octet> picture;
    };
    

    Publisher.cpp

    // Copyright 2016 Proyectos y Sistemas de Mantenimiento SL (eProsima).
    //
    // Licensed under the Apache License, Version 2.0 (the "License");
    
    // Hpshboss modified code from eprosima's github example;
    // Licensed under the Apache License, Version 2.0 (the "License");
    
    /**
     * @file PicturePublisher.cpp
     *
     */
    
    #include "PicturePublisher.h"
    #include <fastrtps/attributes/ParticipantAttributes.h>
    #include <fastrtps/attributes/PublisherAttributes.h>
    #include <fastrtps/publisher/Publisher.h>
    #include <fastrtps/Domain.h>
    #include <fastrtps/types/DynamicTypeBuilderFactory.h>
    #include <fastrtps/types/DynamicDataFactory.h>
    #include <fastrtps/types/DynamicTypeBuilder.h>
    #include <fastrtps/types/DynamicTypeBuilderPtr.h>
    #include <fastrtps/types/DynamicType.h>
    
    #include <thread>
    #include <time.h>
    #include <vector>
    
    #include <opencv2/opencv.hpp>
    
    using namespace eprosima::fastrtps;
    using namespace eprosima::fastrtps::rtps;
    using namespace eprosima::fastrtps::types;
    
    // using namespace cv;
    
    /**
     * Constructs a publisher that owns no participant and no publisher yet;
     * the constructor body does no further work.
     */
    PicturePublisher::PicturePublisher()
        : mp_participant(nullptr)
        , mp_publisher(nullptr)
    {
    }
    
    /**
     * Loads "dog.jpg", shows it in a preview window, JPEG-encodes it into the
     * sample to publish and creates the participant and the publisher.
     *
     * @return true on success; false when the image cannot be read or encoded
     *         or when the participant or publisher cannot be created.
     */
    bool PicturePublisher::init()
    {
        cv::Mat image = cv::imread("dog.jpg", cv::IMREAD_COLOR);

        if(image.empty())
        {
            std::cout << "Could not read the image." << std::endl;
            return false;
        }

        // Preview; waitKey(0) holds init() until a key is pressed in the window.
        cv::imshow("Display window", image);
        cv::waitKey(0);

        std::vector<unsigned char> buffer;

        if(!cv::imencode(".jpg", image, buffer))
        {
            // Without encoded bytes there is nothing meaningful to publish.
            std::cout << "Image encoding failed" << std::endl;
            return false;
        }

        m_Picture.index(0);
        m_Picture.size(static_cast<uint32_t>(buffer.size()));
        m_Picture.picture(buffer);

        ParticipantAttributes PParam;
        PParam.rtps.setName("Picture_pub");
        mp_participant = Domain::createParticipant(PParam, &m_part_list);

        if (mp_participant == nullptr)
        {
            return false;
        }

        //REGISTER THE TYPE
        Domain::registerType(mp_participant, &m_type);
        // Domain::registerDynamicType(mp_participant, &m_DynType);

        //CREATE THE PUBLISHER
        PublisherAttributes Wparam;
        Wparam.topic.topicKind = NO_KEY;
        Wparam.topic.topicDataType = "Picture";
        Wparam.topic.topicName = "PictureTopic";

        mp_publisher = Domain::createPublisher(mp_participant, Wparam, (PublisherListener*)&m_listener);
        if (mp_publisher == nullptr)
        {
            return false;
        }

        return true;
    }
    
    /**
     * Removes the participant from the Domain; this is the only cleanup the
     * destructor performs.
     */
    PicturePublisher::~PicturePublisher()
    {
        Domain::removeParticipant(mp_participant);
    }
    
    /**
     * Matching callback of the publisher listener.
     *
     * On MATCHED_MATCHING the n_matched counter is incremented and
     * firstConnected is set; any other status decrements the counter. Both
     * cases are reported on stdout.
     *
     * @param info Matching information; only its status is inspected.
     */
    void PicturePublisher::PubListener::onPublicationMatched(
            Publisher* /*pub*/,
            MatchingInfo& info)
    {
        if (info.status != MATCHED_MATCHING)
        {
            --n_matched;
            std::cout << "Publisher unmatched" << std::endl;
            return;
        }

        ++n_matched;
        firstConnected = true;
        std::cout << "Publisher matched" << std::endl;
    }
    
    /**
     * Discovery callback of the participant listener: prints the participant
     * name together with the event (discovered, removed or dropped). Other
     * statuses produce no output.
     *
     * @param info Discovery information carrying the status and participant name.
     */
    void PicturePublisher::PartListener::onParticipantDiscovery(
            Participant*,
            ParticipantDiscoveryInfo&& info)
    {
        // Suffix appended after the participant name; stays null for statuses
        // that are not reported.
        const char* event = nullptr;

        switch (info.status)
        {
            case ParticipantDiscoveryInfo::DISCOVERED_PARTICIPANT:
                event = " discovered";
                break;
            case ParticipantDiscoveryInfo::REMOVED_PARTICIPANT:
                event = " removed";
                break;
            case ParticipantDiscoveryInfo::DROPPED_PARTICIPANT:
                event = " dropped";
                break;
            default:
                break;
        }

        if (event != nullptr)
        {
            std::cout << "Participant " << info.info.m_participantName << event << std::endl;
        }
    }
    
    /**
     * Publishing loop executed on the worker thread started by run().
     *
     * Keeps calling publish() until `stop` is set or, when `samples` is
     * non-zero, until that many samples were published, logging the index and
     * size of the sample after each successful publish.
     *
     * @param samples Number of samples to publish; 0 means no limit.
     * @param sleep   Pause between publish attempts, in milliseconds.
     */
    void PicturePublisher::runThread(
            uint32_t samples,
            uint32_t sleep)
    {
        for (uint32_t sent = 0; !stop && (samples == 0 || sent < samples); )
        {
            if (publish(samples != 0))
            {
                std::cout << "runThreading...; \tSample Index: " << m_Picture.index() << "; \t"
                          << "size: " << m_Picture.size() << std::endl;

                // Extra pause right after the tenth published sample.
                if (sent == 9)
                {
                    std::cout << "Structure message" << " with index: " << sent + 1 << " SENT" << std::endl;
                    // Avoid unmatched condition impact subscriber receiving message
                    std::cout << "Wait within twenty second..." << std::endl;
                    std::this_thread::sleep_for(std::chrono::seconds(10));
                }
                ++sent;
            }
            std::this_thread::sleep_for(std::chrono::milliseconds(sleep));
        }
    }
    
    /**
     * Runs the publishing loop on a worker thread and waits for it to finish.
     *
     * With `samples == 0` the loop has no sample limit and is stopped when the
     * user presses enter; otherwise the worker ends on its own after `samples`
     * samples.
     *
     * @param samples Number of samples to publish; 0 means "until enter is pressed".
     * @param sleep   Pause between publish attempts, in milliseconds.
     */
    void PicturePublisher::run(
            uint32_t samples,
            uint32_t sleep)
    {
        stop = false;
        std::thread worker(&PicturePublisher::runThread, this, samples, sleep);

        const bool bounded = (samples != 0);
        if (bounded)
        {
            std::cout << "Publisher running " << samples << " samples." << std::endl;
        }
        else
        {
            std::cout << "Publisher running. Please press enter to stop the Publisher at any time." << std::endl;
            std::cin.ignore();
            // Ask the worker loop to finish.
            stop = true;
        }

        worker.join();
    }
    
    bool PicturePublisher::publish(
            bool waitForListener)
    {
        // std::cout << "m_listener.n_matched: " << m_listener.n_matched << std::endl;
        if (m_listener.firstConnected || !waitForListener || m_listener.n_matched > 0)
        {
            m_Picture.index(m_Picture.index() + 1);
    
            mp_publisher->write((void*)&m_Picture);
    
            return true;
        }
        return false;
    }
    

    Subscriber.cpp

    // Copyright 2016 Proyectos y Sistemas de Mantenimiento SL (eProsima).
    //
    // Licensed under the Apache License, Version 2.0 (the "License");
    
    // Hpshboss modified code from eprosima's github example;
    // Licensed under the Apache License, Version 2.0 (the "License");
    
    /**
     * @file Subscriber.cpp
     *
     */
    
    #include "PictureSubscriber.h"
    #include <fastrtps/attributes/ParticipantAttributes.h>
    #include <fastrtps/attributes/SubscriberAttributes.h>
    #include <fastrtps/subscriber/Subscriber.h>
    #include <fastrtps/Domain.h>
    #include <fastrtps/types/DynamicTypeBuilderFactory.h>
    #include <fastrtps/types/DynamicDataFactory.h>
    #include <fastrtps/types/DynamicTypeBuilder.h>
    #include <fastrtps/types/DynamicTypeBuilderPtr.h>
    #include <fastrtps/types/DynamicType.h>
    
    #include <vector>
    #include <string>
    #include <sstream>
    #include <iterator>
    
    #include <opencv2/opencv.hpp>
    
    using namespace eprosima::fastrtps;
    using namespace eprosima::fastrtps::rtps;
    using namespace eprosima::fastrtps::types;
    
    // using namespace cv;
    
    /**
     * Constructs a subscriber that owns no participant and no subscriber yet;
     * the constructor body does no further work.
     */
    PictureSubscriber::PictureSubscriber()
        : mp_participant(nullptr)
        , mp_subscriber(nullptr)
    {
    }
    
    // File-scope variables; none of them is referenced by the functions in
    // this listing.
    struct timespec begin;
    struct timespec end;
    double elapsed;
    std::vector<unsigned char> buffer;
    
    bool PictureSubscriber::init()
    {
    
        ParticipantAttributes PParam;
        PParam.rtps.setName("Picture_sub");
        mp_participant = Domain::createParticipant(PParam, &m_part_list);
        if (mp_participant == nullptr)
        {
            return false;
        }
    
        //REGISTER THE TYPE
        Domain::registerType(mp_participant, &m_type);
    
        //CREATE THE SUBSCRIBER
        SubscriberAttributes Rparam;
        Rparam.topic.topicKind = NO_KEY;
        Rparam.topic.topicDataType = "Picture";
        Rparam.topic.topicName = "PictureTopic";
    
        mp_subscriber = Domain::createSubscriber(mp_participant, Rparam, (SubscriberListener*)&m_listener);
    
        if (mp_subscriber == nullptr)
        {
            return false;
        }
    
    
        return true;
    }
    
    /**
     * Removes the participant from the Domain; this is the only cleanup the
     * destructor performs.
     */
    PictureSubscriber::~PictureSubscriber()
    {
        Domain::removeParticipant(mp_participant);
    }
    
    /**
     * Matching callback of the subscriber listener.
     *
     * MATCHED_MATCHING increments the n_matched counter; any other status
     * decrements it. Both cases are reported on stdout.
     *
     * @param info Matching information; only its status is inspected.
     */
    void PictureSubscriber::SubListener::onSubscriptionMatched(
            Subscriber* /*sub*/,
            MatchingInfo& info)
    {
        if (info.status != MATCHED_MATCHING)
        {
            --n_matched;
            std::cout << "Subscriber unmatched" << std::endl;
            return;
        }

        ++n_matched;
        std::cout << "Subscriber matched" << std::endl;
    }
    
    /**
     * Discovery callback of the participant listener: prints the participant
     * name together with the event (discovered, removed or dropped). Other
     * statuses produce no output.
     *
     * @param info Discovery information carrying the status and participant name.
     */
    void PictureSubscriber::PartListener::onParticipantDiscovery(
            Participant*,
            ParticipantDiscoveryInfo&& info)
    {
        // Suffix appended after the participant name; stays null for statuses
        // that are not reported.
        const char* event = nullptr;

        switch (info.status)
        {
            case ParticipantDiscoveryInfo::DISCOVERED_PARTICIPANT:
                event = " discovered";
                break;
            case ParticipantDiscoveryInfo::REMOVED_PARTICIPANT:
                event = " removed";
                break;
            case ParticipantDiscoveryInfo::DROPPED_PARTICIPANT:
                event = " dropped";
                break;
            default:
                break;
        }

        if (event != nullptr)
        {
            std::cout << "Participant " << info.info.m_participantName << event << std::endl;
        }
    }
    
    /**
     * Data callback: takes the next sample, decodes its picture bytes and
     * writes the image to "<index>_dog_received.jpg".
     *
     * Samples that cannot be taken or are not ALIVE are ignored. A sample
     * whose bytes cannot be decoded is reported on stdout and no file is
     * written for it.
     *
     * @param sub Subscriber that signalled the new data.
     */
    void PictureSubscriber::SubListener::onNewDataMessage(
            Subscriber* sub)
    {
        std::cout << "Data received." << std::endl;

        if (sub->takeNextData((void*)&m_Picture, &m_info))
        {
            if (m_info.sampleKind == ALIVE)
            {
                this->n_samples++;

                // Print your structure data here.
                uint32_t index = m_Picture.index();
                std::cout << "index: " << index <<  "; \t";

                std::cout << "size: " << m_Picture.size() <<  std::endl;

                cv::Mat imageDecoded = cv::imdecode(m_Picture.picture(), 1);
                if (imageDecoded.empty())
                {
                    // Writing an empty image would fail inside the listener
                    // callback, so the sample is skipped instead.
                    std::cout << "Could not decode the received picture." << std::endl;
                    return;
                }
                cv::imwrite(std::to_string(index) + "_dog_received.jpg", imageDecoded);
            }
        }
    }
    
    /**
     * Prints a prompt and then blocks on standard input (std::cin.ignore()),
     * so the call returns once the user presses enter.
     */
    void PictureSubscriber::run()
    {
        std::cout << "Subscriber running. Please press enter to stop the Subscriber" << std::endl;
        std::cin.ignore();
    }
    
    /**
     * Blocks until the listener has counted at least `number` samples,
     * polling the counter every 500 ms.
     *
     * @param number Number of received samples to wait for.
     */
    void PictureSubscriber::run(
            uint32_t number)
    {
        // The leading space keeps the count and the word apart ("10 samples").
        std::cout << "Subscriber running until " << number << " samples have been received" << std::endl;
        while (number > this->m_listener.n_samples)
        {
            std::this_thread::sleep_for(std::chrono::milliseconds(500));
        }
    }