Hiredis publisher only sending first message in a while loop


I am quite new to Hiredis/Redis and can't get a publisher to work inside a while loop in a Pub/Sub architecture.
I managed to write a publisher that sends a single message and then exits, but I want a publisher that sends messages at regular intervals. Here is my publisher:

#include <signal.h>
#include <iostream>
#include <stdio.h>
#include <string.h>
#include <unistd.h>

#include <hiredis/hiredis.h>
#include <hiredis/async.h>
#include <hiredis/adapters/libevent.h>

using namespace std;

void pubCallback(redisAsyncContext *c, void *r, void *privdata) {

  redisReply *reply = (redisReply*)r;
  if (reply == NULL){
    cout<<"No reply received"<<endl;
    return;
  }
  cout<<"message published"<<endl;
  redisAsyncDisconnect(c);
}

int main(int argc, char* argv[])
{
    signal(SIGPIPE, SIG_IGN);
    struct event_base* base = event_base_new();
    int status;
    int i = 0;
    redisAsyncContext* _redisContext = redisAsyncConnect("172.17.0.2", 6379);

    if (_redisContext->err) {
        /* Let context leak for now... */
        cout<<"Error: "<< _redisContext->errstr<<endl;
        return 1;
    }
    redisLibeventAttach(_redisContext,base);

    while(1) {
        string command ("publish ");
        command.append("test_channel");
        command.append (" ");
        command.append(to_string(i));
        cout << command << endl;
        status = redisAsyncCommand(_redisContext, 
            pubCallback, 
            (char*)"pub", command.c_str()
        );
        event_base_dispatch(base);
        i+=1;
        usleep(1000000);
    }
}

With this publisher, only the first message "0" is received and subsequent commands seem to be ignored.
Is it possible to create a publisher publishing in a while loop? Do I have to create a new connection or disconnect/reconnect for each message?


Solution

  • I have managed to make it work.
    First of all, the redisAsyncDisconnect call in the pubCallback function closed the connection after the first reply, which prevented my program from sending subsequent messages. This line needed to be removed.
    Removing it exposed another issue: the program started hanging after publishing the first message. With the connection still open, event_base_dispatch keeps waiting for new events and never returns to the while loop. I needed a way to leave the event loop as soon as the message is published.
    The way to do that is to call event_base_loopbreak in pubCallback.

    Here's the working code:

    #include <signal.h>
    #include <iostream>
    #include <stdio.h>
    #include <string>   // std::string, std::to_string
    #include <unistd.h>
    
    #include <hiredis/hiredis.h>
    #include <hiredis/async.h>
    #include <hiredis/adapters/libevent.h>
    
    using namespace std;
    
    void pubCallback(redisAsyncContext *c, void *r, void *privdata) {
    
      redisReply *reply = (redisReply*)r;
      if (reply == NULL){
        cout<<"No reply received"<<endl;
        return;
      }
      cout<<"message published"<<endl;
    
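      // c->ev.data holds the adapter state set up by redisLibeventAttach.
      // Break out of the event loop so main() can queue the next message.
      redisLibeventEvents *e = (redisLibeventEvents*) c->ev.data;
      event_base_loopbreak(e->base);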
    }
    
    void connectCallback(const redisAsyncContext *c, int status) {
        if (status != REDIS_OK) {
            printf("Error: %s\n", c->errstr);
            return;
        }
        printf("Connected...\n");
    }
    
    void disconnectCallback(const redisAsyncContext *c, int status) {
        if (status != REDIS_OK) {
            printf("Error: %s\n", c->errstr);
            return;
        }
        printf("Disconnected...\n");
    }
    
    int main(int argc, char* argv[])
    {
        signal(SIGPIPE, SIG_IGN);
        
        int status;
        int i = 0;
        
        redisAsyncContext* _redisContext = redisAsyncConnect("172.17.0.2", 6379);
        if (_redisContext->err) {
            /* Let context leak for now... */
            cout<<"Error: "<< _redisContext->errstr<<endl;
            return 1;
        }
        struct event_base* base = event_base_new();
    
        redisAsyncSetConnectCallback(_redisContext,connectCallback);
        redisAsyncSetDisconnectCallback(_redisContext,disconnectCallback);
        redisLibeventAttach(_redisContext,base);
    
        while(1) {
            string command ("publish ");
            command.append("test_channel");
            command.append (" ");
            command.append(to_string(i));
            cout << command << endl;
    
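            // Note: hiredis treats this string as a printf-style format,
            // so it must not contain '%' characters.
            status = redisAsyncCommand(_redisContext, 
                pubCallback, 
                (char*)"pub", command.c_str()
            );
            if (status != REDIS_OK) {
                cout<<"Error: could not queue command"<<endl;
                return 1;
            }
    
            // Blocks until pubCallback calls event_base_loopbreak.
            event_base_dispatch(base);
            i+=1;
            usleep(500000);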
        }
    }
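
To make the reasoning behind the fix easier to follow, here is a small toy model of the dispatch behaviour described above. It is only a sketch and does not use libevent or hiredis: ToyLoop, add_persistent, dispatch, loopbreak and run_publisher are all hypothetical names invented for this illustration. The model assumes that an open connection keeps one event registered, and it caps the number of loop iterations so that the "hang" can be observed without actually blocking.

```cpp
#include <cassert>
#include <functional>
#include <utility>
#include <vector>

// Toy stand-in for an event loop. This is NOT libevent; every name here is
// hypothetical and only mimics the behaviour discussed in the answer.
struct ToyLoop {
    // Events that stay registered (like the read event of an open connection).
    std::vector<std::function<void(ToyLoop&)>> persistent;
    bool break_requested = false;

    void add_persistent(std::function<void(ToyLoop&)> cb) {
        persistent.push_back(std::move(cb));
    }

    // Ask the loop to return after the current iteration.
    void loopbreak() { break_requested = true; }

    // Runs while events are registered and no break was requested.
    // max_iterations keeps the demo from spinning forever.
    // Returns the number of iterations that were executed.
    int dispatch(int max_iterations) {
        assert(max_iterations > 0);
        break_requested = false;
        int iterations = 0;
        while (!persistent.empty() && !break_requested &&
               iterations < max_iterations) {
            ++iterations;
            for (auto& cb : persistent) cb(*this);
        }
        return iterations;
    }
};

// Simulates the publisher's while loop. Returns how many messages were
// "published" before the loop got stuck in dispatch (or all of them).
int run_publisher(bool break_in_callback, int messages) {
    const int kMaxSpins = 1000;  // reaching this cap stands for "dispatch hangs"
    ToyLoop loop;
    int pending_replies = 0;
    int published = 0;

    // The connection's event: fires the reply callback when a reply is pending.
    loop.add_persistent([&](ToyLoop& l) {
        if (pending_replies > 0) {
            --pending_replies;
            ++published;
            if (break_in_callback) l.loopbreak();  // the fix from the answer
        }
    });

    for (int i = 0; i < messages; ++i) {
        ++pending_replies;  // queue one PUBLISH
        int spins = loop.dispatch(kMaxSpins);
        if (spins == kMaxSpins) return published;  // never returned on its own
    }
    return published;
}
```

In this model, run_publisher(false, 3) returns 1 because the first dispatch never comes back on its own, while run_publisher(true, 3) returns 3 because each callback breaks the loop and lets the caller queue the next message.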