Tags: c++, multithreading, boost, pipeline

C++ Filter Pipeline


I want to develop a filter pipeline for my application. The pipeline should consist of any number of filters.

For the filters I declare an abstract base class like this:

#include <string>

struct AbstractFilter {
    virtual void execute(const std::string& message) = 0;
    virtual ~AbstractFilter() = default;
};

Each filter should inherit from this base class and implement the execute method, like so:

#include <iostream>

struct PrintMessage : public AbstractFilter {
    void execute(const std::string& message) override {
        std::cout << "Filter A " << message << '\n';
        // hand over message to next filter
    }
};

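#include <cctype>

struct Upper : public AbstractFilter {
    void execute(const std::string& message) override {
        std::string new_line;
        // cast to unsigned char: std::toupper is undefined for negative char values
        for (char c : message)
            new_line.push_back(static_cast<char>(std::toupper(static_cast<unsigned char>(c))));
        // hand over new_line to next filter
    }
};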

#include <fstream>

struct WriteToFile : public AbstractFilter {
    void execute(const std::string& message) override {
        std::ofstream of{"test.txt"};  // opening truncates any previous content
        of << message;
        of.close();
    }
};

EDIT 1:

The message should be sent from one filter to the next in the pipeline. For example, if the pipeline looks like this:

Upper -- PrintMessage -- WriteToFile

The message should pass through all three filters (for example, once Upper has finished its work, the message should be sent to PrintMessage, and so on).

In the example above, if the message "Hello World" is sent to the pipeline, the output should be:

Console:
HELLO WORLD
test.txt:
HELLO WORLD

EDIT 2:

A filter only changes the content of the given message; the type is not changed. Every filter should work with, for example, strings or a given class. The message is only forwarded to one recipient.
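Because every filter keeps the message type and forwards to exactly one recipient, one way to express this is a class template parameterised on the message type, where `execute` returns the (possibly modified) message instead of forwarding it itself. This is a sketch under that assumption; `Filter<T>`, `UpperFilter`, `PrefixFilter` and `runPipeline` are hypothetical names, not part of the question's code.

```cpp
#include <cctype>
#include <memory>
#include <string>
#include <utility>
#include <vector>

// Hypothetical generic filter: takes a message of type T and returns a
// message of the same type, so filters can be chained in any order.
template <typename T>
struct Filter {
    virtual T execute(T message) = 0;
    virtual ~Filter() = default;
};

// Example filter for std::string messages: converts to upper case.
struct UpperFilter : Filter<std::string> {
    std::string execute(std::string message) override {
        for (char& c : message)
            c = static_cast<char>(std::toupper(static_cast<unsigned char>(c)));
        return message;
    }
};

// Second example filter of the same message type: prepends a fixed prefix.
struct PrefixFilter : Filter<std::string> {
    explicit PrefixFilter(std::string prefix) : prefix_(std::move(prefix)) {}
    std::string execute(std::string message) override { return prefix_ + message; }
private:
    std::string prefix_;
};

// Passes the message through every filter in order; each filter's output
// becomes the next filter's input (exactly one recipient per filter).
template <typename T>
T runPipeline(const std::vector<std::unique_ptr<Filter<T>>>& filters, T message) {
    for (const auto& f : filters)
        message = f->execute(std::move(message));
    return message;
}
```

With this shape the pipeline owns the ordering, so an individual filter never needs to know which filter comes after it.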

My question now is: how do I connect these filters?

My first guess was to use queues, so every filter gets an input queue and an output queue. For this, I think every filter should run inside its own thread and be notified when data is added to its input queue. (The output queue of, for example, FilterA is also the input queue of FilterB.)
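A minimal sketch of the queue-based idea, assuming C++17 and only the standard library (no Boost). `BlockingQueue` and `startStage` are hypothetical helpers: each stage owns one thread, blocks on its input queue through a condition variable, and closing a queue serves as the end-of-stream signal that shuts the next stage down.

```cpp
#include <condition_variable>
#include <deque>
#include <mutex>
#include <optional>
#include <thread>
#include <utility>

// Minimal blocking queue (hypothetical helper). pop() waits until an item is
// available or the queue is closed; it returns std::nullopt once the queue is
// closed and drained, which tells the consumer to stop.
template <typename T>
class BlockingQueue {
public:
    void push(T value) {
        {
            std::lock_guard<std::mutex> lock(mutex_);
            items_.push_back(std::move(value));
        }
        cv_.notify_one();
    }
    void close() {
        {
            std::lock_guard<std::mutex> lock(mutex_);
            closed_ = true;
        }
        cv_.notify_all();
    }
    std::optional<T> pop() {
        std::unique_lock<std::mutex> lock(mutex_);
        cv_.wait(lock, [this] { return closed_ || !items_.empty(); });
        if (items_.empty()) return std::nullopt;  // closed and drained
        T value = std::move(items_.front());
        items_.pop_front();
        return value;
    }
private:
    std::mutex mutex_;
    std::condition_variable cv_;
    std::deque<T> items_;
    bool closed_ = false;
};

// One pipeline stage on its own thread: reads from `in`, applies `filter`,
// writes to `out`. The output queue of one stage is the input of the next.
template <typename T, typename F>
std::thread startStage(F filter, BlockingQueue<T>& in, BlockingQueue<T>& out) {
    return std::thread([filter = std::move(filter), &in, &out] {
        while (auto msg = in.pop())
            out.push(filter(std::move(*msg)));
        out.close();  // propagate end-of-stream to the next stage
    });
}
```

Wiring Upper -- PrintMessage -- WriteToFile this way takes four queues and three stages. Message order is preserved because each stage has exactly one thread and the queues are FIFO; the price is one thread per filter per pipeline.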

My second guess was to use the Chain of Responsibility pattern and boost::signals2, so FilterB, for example, connects to the signal of FilterA. FilterA calls this filter when it has finished its work.
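A synchronous sketch of the second idea, assuming a plain `std::function` callback as the link between filters (since EDIT 2 says each message has exactly one recipient). `setNext`, `forward` and `connect` are hypothetical additions to the question's `AbstractFilter`. With Boost, a `boost::signals2::signal<void(const std::string&)>` member could take the callback's place, at the cost of multi-slot support that this pipeline does not need.

```cpp
#include <cctype>
#include <functional>
#include <iostream>
#include <string>
#include <utility>

// The question's AbstractFilter, extended with a single "next" callback
// (hypothetical; a boost::signals2 signal could play this role instead).
struct AbstractFilter {
    virtual void execute(const std::string& message) = 0;
    virtual ~AbstractFilter() = default;

    // Connect this filter's output to whatever should receive it next.
    void setNext(std::function<void(const std::string&)> next) { next_ = std::move(next); }

protected:
    // Hand the message to the next filter, if one is connected.
    void forward(const std::string& message) {
        if (next_) next_(message);
    }

private:
    std::function<void(const std::string&)> next_;
};

struct Upper : AbstractFilter {
    void execute(const std::string& message) override {
        std::string upper;
        for (char c : message)
            upper.push_back(static_cast<char>(std::toupper(static_cast<unsigned char>(c))));
        forward(upper);
    }
};

struct PrintMessage : AbstractFilter {
    explicit PrintMessage(std::ostream& os) : os_(os) {}
    void execute(const std::string& message) override {
        os_ << message << '\n';
        forward(message);
    }
private:
    std::ostream& os_;
};

// Links two filters: everything `from` forwards is executed by `to`.
inline void connect(AbstractFilter& from, AbstractFilter& to) {
    from.setNext([&to](const std::string& m) { to.execute(m); });
}
```

Everything here runs on the calling thread: `execute` on the first filter returns only after the whole chain has finished. That makes the chain easy to reason about, but one slow filter blocks the caller.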

Which of the two solutions is more flexible? Or is there an even better way to connect the filters?

An additional question: is it also possible to run the whole pipeline inside a thread, so that I can start multiple pipelines? (In the example, have three instances of the FilterA-FilterB-FilterD pipeline up and running?)
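This generally works as long as each pipeline instance has its own filter objects, or the filters are stateless. A minimal sketch, assuming a pipeline is represented as a vector of `std::function` filters (a hypothetical simplification of the `AbstractFilter` hierarchy) and that each running pipeline instance gets one `std::thread` and its own batch of messages:

```cpp
#include <cstddef>
#include <functional>
#include <string>
#include <thread>
#include <utility>
#include <vector>

// Hypothetical representation: a pipeline is an ordered list of filters.
using Pipeline = std::vector<std::function<std::string(std::string)>>;

// Runs one message through all filters in order.
std::string runPipeline(const Pipeline& pipeline, std::string message) {
    for (const auto& filter : pipeline)
        message = filter(std::move(message));
    return message;
}

// Starts one thread per pipeline instance; instance i processes batches[i]
// sequentially and writes only to results[i], so no lock is needed.
std::vector<std::vector<std::string>> runConcurrently(
        const Pipeline& pipeline,
        const std::vector<std::vector<std::string>>& batches) {
    std::vector<std::vector<std::string>> results(batches.size());
    std::vector<std::thread> threads;
    for (std::size_t i = 0; i < batches.size(); ++i) {
        threads.emplace_back([&, i] {
            for (const auto& msg : batches[i])
                results[i].push_back(runPipeline(pipeline, msg));
        });
    }
    for (auto& t : threads)
        t.join();
    return results;
}
```

Shared side effects still need care: three WriteToFile filters all writing `test.txt` concurrently would race, so such a filter would need a per-pipeline file name or a mutex.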


Solution

  • I would proceed this way: create a list with all the implemented versions of the abstract filter. So, following your example, after reading the input file I would get a list with:

    [0]:Upper 
    [1]:PrintMessage
    [2]:WriteToFile
    

    Then a single thread (or a thread pool, if you need to process many strings at a time) waits for a string in an input queue. When a new string appears in the queue, the thread loops over the filter list and finally posts the result to an output queue.

    If you want to run it in parallel, you need to find a way to preserve the order of the input strings in the output strings as well.
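A sketch of this answer's design, assuming the filters are plain `std::function<std::string(std::string)>` callables (hypothetical; the question's `AbstractFilter` objects could be wrapped in lambdas). The answer leaves ordering open; one option for keeping output order equal to input order is to keep one `std::future` per input and read the futures back in submission order. Here `std::async` with `std::launch::async` stands in for a real thread pool.

```cpp
#include <functional>
#include <future>
#include <string>
#include <utility>
#include <vector>

using FilterFn = std::function<std::string(std::string)>;

// Applies every filter in list order, as the answer describes.
std::string applyAll(const std::vector<FilterFn>& filters, std::string message) {
    for (const auto& f : filters)
        message = f(std::move(message));
    return message;
}

// Processes inputs in parallel but returns results in input order: each
// input gets a future, and the futures are read in the order they were
// created, regardless of which task finishes first.
std::vector<std::string> processInOrder(const std::vector<FilterFn>& filters,
                                        const std::vector<std::string>& inputs) {
    std::vector<std::future<std::string>> pending;
    pending.reserve(inputs.size());
    for (const auto& in : inputs)
        pending.push_back(std::async(std::launch::async, applyAll, std::cref(filters), in));

    std::vector<std::string> outputs;
    outputs.reserve(pending.size());
    for (auto& fut : pending)
        outputs.push_back(fut.get());  // blocks until that particular input is done
    return outputs;
}
```

In a long-running program the two vectors would become the input and output queues, and a fixed-size pool would replace `std::async`, which here may start one thread per input.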