Search code examples
gnuradiocircular-buffergnuradio-companion

GNU Radio circular buffer manipulation


I encountered the following error

gr::log :WARN: tpb_thread_body - asynchronous message buffer overflowing, dropping message

Out of serendipity, I ran into this GNU Radio presentation on Youtube.

The presenter mentioned an OOT block he called "buffer" that is capable of eliminating the "buffer overflowing" error. Apparently, this block plays with different sample rates and the so-called "circular buffers". I haven't worked with circular buffers myself. Any ideas on circular buffers or any hints on how to build this buffer block are welcome.

Flowgraph

EDIT

Below is the flowgraph that generates the error. As it was suggested in the comments, the culprits could be the message processing blocks (red-circled) namely generateCADU (for generating standard CCSDS frames) and processCADU (for extracting CADUs from a data stream). flowgraph The implementation file of the generateCADU block is given below

#ifdef HAVE_CONFIG_H
#include "config.h"
#endif

#include <gnuradio/io_signature.h>
#include "generateCADU_impl.h"
#include "fec/ReedSolomon/ReedSolomon.h"
#include "fec/Scrambler/Scrambler.h"

namespace gr {
  namespace ccsds {

generateCADU::sptr
generateCADU::make(int frameLength,std::string sync, int scramble, int rs, int intDepth)
{
  return gnuradio::get_initial_sptr
    (new generateCADU_impl(frameLength, sync, scramble, rs, intDepth));
}

/*
 * The private constructor
 */
generateCADU_impl::generateCADU_impl(int frameLength,std::string sync, int scramble, int rs, int intDepth)
  : gr::sync_block("generateCADU",
          gr::io_signature::make(1, 1, sizeof(unsigned char)),
           gr::io_signature::make(0, 0, 0)),
d_frameLength(frameLength),d_scramble(scramble == 1),d_rs(rs >= 1), d_basis(rs >= 2), d_intDepth(intDepth)

{
  set_output_multiple(d_frameLength);
  //Registering output port
  message_port_register_out(pmt::mp("out"));
  d_sync = parse_string(sync);
}

/*
 * Our virtual destructor.
 */
generateCADU_impl::~generateCADU_impl()
{
}

unsigned char
generateCADU_impl::parse_hex(char c)
{
  if ('0' <= c && c <= '9') return c - '0';
  if ('A' <= c && c <= 'F') return c - 'A' + 10;
  if ('a' <= c && c <= 'f') return c - 'a' + 10;
  std::abort();
}

std::vector<unsigned char>
generateCADU_impl::parse_string(const std::string & s)
{
  if (s.size() % 2 != 0) std::abort();
  std::vector<unsigned char> result(s.size() / 2);

  for (std::size_t i = 0; i != s.size() / 2; ++i)
    result[i] = 16 * parse_hex(s[2 * i]) + parse_hex(s[2 * i + 1]);

  return result;
}
int
generateCADU_impl::work(int noutput_items,
    gr_vector_const_void_star &input_items,
    gr_vector_void_star &output_items)
{
  const unsigned char *in = (const unsigned char *) input_items[0];

  //Reed-Solomon and Scrambler objects
  ReedSolomon RS(16,d_intDepth,d_basis);// False = conventional, True = dual-basis
  Scrambler S;

  //Buffers
  unsigned char *frameBuffer1 = (unsigned char*)malloc(d_frameLength*sizeof(unsigned char));
  std::vector<unsigned char> frameBuffer2;

  //The work function engine
  for(int i = 0; (i + d_frameLength) < noutput_items; i += d_frameLength)
{
  //Copying data from input stream
  memcpy(frameBuffer1,in + i + d_frameLength,d_frameLength);

  //Copying frame into std::vector buffer
  frameBuffer2.insert(frameBuffer2.begin(),frameBuffer1, frameBuffer1 + d_frameLength);

  //Optional scrambling and Reed-Solomon
  if (d_rs) RS.Encode_RS(frameBuffer2);
  if (d_scramble) S.Scramble(frameBuffer2);

  //Insert sync word
  frameBuffer2.insert(frameBuffer2.begin(), d_sync.begin(), d_sync.end());

  //Transmitting PDU
  pmt::pmt_t pdu(pmt::cons(pmt::PMT_NIL,pmt::make_blob(frameBuffer2.data(),frameBuffer2.size())));
  message_port_pub(pmt::mp("out"), pdu);

  //Clear buffer
  frameBuffer2.clear();
}

  // Tell runtime system how many output items we produced.
  return noutput_items;
}

} /* namespace ccsds */
} /* namespace gr */

And here is the processCADU block. This block uses tags generated by the synchronizeCADU (which is simply a wrapper for the correlate_access_tag block) to extract CADUs

#ifdef HAVE_CONFIG_H
#include "config.h"
#endif

#include <gnuradio/io_signature.h>
#include "processCADU_impl.h"
#include "fec/ReedSolomon/ReedSolomon.h"
#include "fec/Scrambler/Scrambler.h"

namespace gr {
   namespace ccsds {

processCADU::sptr
processCADU::make(int frameLength, int scramble, int rs, int intDepth, std::string tagName)
{
  return gnuradio::get_initial_sptr
    (new processCADU_impl(frameLength, scramble, rs, intDepth, tagName));
}

/*
 * The private constructor
 */
processCADU_impl::processCADU_impl(int frameLength, int scramble, int rs, int intDepth, std::string tagName)
  : gr::sync_block("processCADU",
           gr::io_signature::make(1, 1, sizeof(unsigned char)),
           gr::io_signature::make(0, 0, 0)),
d_frameLength(frameLength),d_scramble(scramble == 1),d_rs(rs >= 1), d_basis(rs >= 2), d_intDepth(intDepth)
{
  //Multiple input
  set_output_multiple(d_frameLength * 8);

  //Registering output port
  message_port_register_out(pmt::mp("out"));

  if (d_rs) d_frameLength += 32 * d_intDepth;
  //SEtting tag name
  key = pmt::mp(tagName);
}

/*
 * Our virtual destructor.
 */
processCADU_impl::~processCADU_impl()
{
  delete d_pack;
}

int
processCADU_impl::work(int noutput_items,
           gr_vector_const_void_star &input_items,
           gr_vector_void_star &output_items)
{
  const unsigned char *in = (const unsigned char *) input_items[0];
  unsigned char *out = (unsigned char *) output_items[0];

  void *msg_data = NULL;
  unsigned char frame_data[d_frameLength];
  unsigned char frame_len = 0;
  std::vector<unsigned char> frameBuffer;

  //Reed-Solomon and Scrambler objects
  ReedSolomon RS(16,d_intDepth,d_basis);// False = conventional, True = dual-basis
  std::vector<int> errors;//errors.push_back(0);
  Scrambler S;

  d_tags.clear();
  d_pack = new blocks::kernel::pack_k_bits(8);

  this->get_tags_in_window(d_tags, 0, 0, noutput_items,key);
  for(d_tags_itr = d_tags.begin(); d_tags_itr != d_tags.end(); d_tags_itr++) {
// Check that we have enough data for a full frame
if ((d_tags_itr->offset - this->nitems_read(0)) > (noutput_items - (d_frameLength) * 8))
  {
    return (d_tags_itr->offset - this->nitems_read(0) - 1);
  }
//Pack bits into bytes
d_pack->pack(frame_data, &in[d_tags_itr->offset - this->nitems_read(0)], d_frameLength);

//Copying frame into std::vector buffer
frameBuffer.insert(frameBuffer.begin(),frame_data, frame_data + d_frameLength);

//Optional scrambling and Reed-Solomon
if (d_scramble) S.Scramble(frameBuffer);
//if (d_rs) RS.Decode_RS(frameBuffer,errors);
//If there is Reed-Solomon decoding

if(d_rs)
  {
    RS.Decode_RS(frameBuffer,errors);
    if (RS.Success(errors)) // Success
      {
    //std::cout << "Success" << std::endl;
    pmt::pmt_t pdu(pmt::cons(pmt::PMT_NIL,pmt::make_blob(frameBuffer.data(),frameBuffer.size())));
        message_port_pub(pmt::mp("out"), pdu);
    /*for(int i=0; i < errors.size(); i++)
      {

        //std::cout << "Number of Errors : " << errors.at(i) << std::endl << std::endl;
        }*/
      }
    else // Failure
      {
    std::cout << "RS failure" << std::endl;
      }
  }
  else{
  pmt::pmt_t pdu(pmt::cons(pmt::PMT_NIL,pmt::make_blob(frameBuffer.data(),frameBuffer.size())));
  message_port_pub(pmt::mp("out"), pdu);
  }

//Clear buffers
frameBuffer.clear();
errors.clear();
  }

  // Tell runtime system how many output items we produced.
  return noutput_items;
}

} /* namespace ccsds */
} /* namespace gr */

Regards, M


Solution

  • Thanks to @MarcusMüller suggestion, using the tagged_stream paradigma as opposed to PDUs solved the problem. I was able to transmit 47 terabytes of data without any problems. Below is the code for the newly implemented block.

    #ifdef HAVE_CONFIG_H
    #include "config.h"
    #endif
    
    #include <gnuradio/io_signature.h>
    #include "genCADU_impl.h"
    
    
    namespace gr {
      namespace ccsds {
    
    genCADU::sptr
    genCADU::make(int frameLength,std::string sync, int scramble, int rs, int intDepth, std::string len_tag_key)
    {
      return gnuradio::get_initial_sptr
        (new genCADU_impl(frameLength, sync, scramble, rs, intDepth, len_tag_key));
    }
    
    /*
     * The private constructor
     */
    genCADU_impl::genCADU_impl(int frameLength,std::string sync, int scramble, int rs, int intDepth, std::string len_tag_key)
      : gr::tagged_stream_block("genCADU",
              gr::io_signature::make(1, 1, sizeof(unsigned char)),
              gr::io_signature::make(1, 1, sizeof(unsigned char)),len_tag_key),
        d_frameLength(frameLength),d_scramble(scramble == 1),d_rs(rs >= 1), d_basis(rs >= 2), d_intDepth(intDepth)
    {
        //Synchronization pattern
        d_sync = parse_string(sync);
    
        //Reed-Solomon and Scrambler objects
    RS = new ReedSolomon(16,d_intDepth,d_basis);// False = conventional, True = dual-basis
        S = new Scrambler();
    }
    
    /*
     * Our virtual destructor.
     */
    genCADU_impl::~genCADU_impl()
    {
      delete RS;
      delete S;
    }
    
    int
    genCADU_impl::calculate_output_stream_length(const gr_vector_int &ninput_items)
    {
      int noutput_items = (d_rs) ? d_frameLength + 32*d_intDepth + d_sync.size() : d_frameLength + d_sync.size();
      return noutput_items ;
    }
    
    unsigned char
    genCADU_impl::parse_hex(char c)
    {
      if ('0' <= c && c <= '9') return c - '0';
      if ('A' <= c && c <= 'F') return c - 'A' + 10;
      if ('a' <= c && c <= 'f') return c - 'a' + 10;
      std::abort();
    }
    
    std::vector<unsigned char>
    genCADU_impl::parse_string(const std::string & s)
    {
      if (s.size() % 2 != 0) std::abort();
      std::vector<unsigned char> result(s.size() / 2);
    
      for (std::size_t i = 0; i != s.size() / 2; ++i)
        result[i] = 16 * parse_hex(s[2 * i]) + parse_hex(s[2 * i + 1]);
    
      return result;
    }
    
    int
    genCADU_impl::work (int noutput_items,
                       gr_vector_int &ninput_items,
                       gr_vector_const_void_star &input_items,
                       gr_vector_void_star &output_items)
    {
      const unsigned char *in = (const unsigned char *) input_items[0];
      unsigned char *out = (unsigned char *) output_items[0];
    
      int total_len;
    
      //Copy pdu from circular buffer to local buffer
      buffer.insert(buffer.end(), in, in +  d_frameLength);
    
      //Optional scrambling and Reed-Solomon. TO DO: Turbo and LDPC
      if (d_rs) RS->Encode_RS(buffer);
      if (d_scramble) S->Scramble(buffer);
    
      //Insert sync word
      buffer.insert(buffer.begin(), d_sync.begin(), d_sync.end());
    
      //Copy from local buffer to circular buffer
      std::copy(buffer.begin(),buffer.end(),out);
    
      //Clear the local buffer
      total_len = buffer.size();
      buffer.clear();
    
      // Tell runtime system how many output items we produced.
      return total_len;
    }
    
    } /* namespace ccsds */
    } /* namespace gr */
    

    Regards,

    M.