Search code examples
kurento

Kurento : strange log is generated continuously "disconnectAll()"


I have been working on kurento past couple of months and today suddenly I started getting the following log:

[0x00007f40b9f6d700]   debug KurentoMediaElementImpl   MediaElementImpl.cpp:660 disconnectAll() <kmswebrtcendpoint3>  Retry disconnect all <kmshubport3>

this log is generated continuously and causing 100% cpu usage with 100+mb of multiple log file getting created.

I also got this log in another log file

2016-04-15 20:05:47,914497 23751 [0x00007fd903de1700]   error KurentoWorkerPool         WorkerPool.cpp:41 workerThreadLoop()  Unexpected error while running the server: boost::filesystem::last_write_time: No such file or directory: "/var/log/kurento-media-server/media-server_2016-04-15_20-05-18.00049.pid23751.log"
2016-04-15 20:05:47,914513 23751 [0x00007fd90f7fe700]   debug KurentoMediaSet           MediaSet.cpp:470 async_delete()  Destroying HubPort -> d1975230-7fcb-498e-8d4a-bfc7afbf8bf2_kurento.MediaPipeline/528783df-ac14-47bc-a7ad-8044721563b9_kurento.HubPort
2016-04-15 20:05:47,914593 23751 [0x00007fd903de1700]   debug KurentoWorkerPool         WorkerPool.cpp:37 workerThreadLoop()  Working thread starting
2016-04-15 20:05:47,917583 23751 [0x00007fd903de1700]   debug KurentoMediaSet           MediaSet.cpp:470 async_delete()  Destroying Composite -> d1975230-7fcb-498e-8d4a-bfc7afbf8bf2_kurento.MediaPipeline/63c84bc4-0d42-4299-ae22-219c0a603837_kurento.Composite
2016-04-15 20:06:10,067442 23751 [0x00007fd9cbfff700]   debug KurentoMediaSet           MediaSet.cpp:131 doGarbageCollection()  Running garbage collector
2016-04-15 20:10:10,067626 23751 [0x00007fd9cbfff700]   debug KurentoMediaSet           MediaSet.cpp:131 doGarbageCollection()  Running garbage collector
2016-04-15 20:14:10,067789 23751 [0x00007fd9cbfff700]   debug KurentoMediaSet           MediaSet.cpp:131 doGarbageCollection()  Running garbage collector

Note:

I am using customized group call java example and I have written a kms filter where I have used vadfilter filter to get the non silent audio buffer, copy it into a gst_buffer object until silence is detected and then send it to a webservice using curl_easy_perform.

also when I use sudo netstat -np | grep "CLOSE_WAIT"

I see that the machine's which I am using for the group call meeting and the webservice IP address are listed in CLOSE_WAIT state but I am not sure if they are in CLOSE_WAIT because of the crash in my plugin or something else.

P.s: I have used gst_buffer_copy_deep() gst_buffer_get_memory gst_buffer_append_memory to copy subsequent audio buffers until silence is detected.

It will be really helpful if someone can point out anything related to the errors I am facing.

And there is more... I have enter link description here which you can refer to see the errors which I am getting. Thank you.

Edit 1:

namespace kurento
{
  namespace module
  {
    namespace vadcustomfilter
    {
      void VADCustomFilterImpl::newBufferHandler (GstBuffer * buffer)
      {
        try
        {
            //instead of doing audio_buffer=null we set a flag audio_buffer_null to TRUE when we want to make a new copy of audio_buffer
            if (!audio_buffer_null )
            {
                // dest, source, flag, offset, size
                //gst_buffer_copy_into (audio_buffer, buffer, GST_BUFFER_COPY_MEMORY, 0, -1);
                GstMemory *inbuf_memory = gst_buffer_get_memory(buffer,0);
                gst_buffer_append_memory(audio_buffer,inbuf_memory);
            }
            else
            {
              audio_buffer = gst_buffer_new ();
              audio_buffer = gst_buffer_copy_deep (buffer);
              audio_buffer_null = FALSE;
            }
        }
        catch (std::bad_weak_ptr & e)
        {
          GST_ERROR ("EXCEPTION: new buffer");
        }
      }


      void VADCustomFilterImpl::busMessage (GstMessage * message)
      {
        try
        {
            switch (message->type)
            {
                case GST_MESSAGE_EOS:
                {
                    GST_WARNING ("---------------> GST_MESSAGE_EOS");
                }
                break;

                case GST_MESSAGE_ELEMENT:
                {
                    const GstStructure *structure = gst_message_get_structure (message);
                    if (structure == NULL) return;

                    const gchar *name = gst_structure_get_name (structure);
                    if (name == NULL || strlen(name) == 0) return;


                    if (strcmp (name, "vadfilter") == 0)
                    {
                        //messages are voice , silence, max_voice_reached
                        const GValue *msg_value = NULL;
                        gint64 silence_buffer = 0;

                        gst_structure_get_int64(structure, "silence", &silence_buffer);
                        silence_buffer /= sizeof(gint16);

                        //silence_buffer is the size of buffer which was emitted in chunks before silence was detected
                        //use to check for min voice size

                        if (silence_buffer > 0)
                        {
                            //min_voice_buffer_size set from java server to set a limit of min buffer size which can be sent to the web service
                            if (audio_buffer != NULL && (silence_buffer > (gint64)min_voice_buffer_size))
                            {   
                                audio_buffer_copy = NULL;
                                audio_buffer_copy = gst_buffer_copy_deep(audio_buffer);

                                push_buffer_in_queue ();
                            }
                            return;
                        }

                        gint64 max_voice = 0;
                        gst_structure_get_int64 (structure, "max_voice_reached", &max_voice);

                        if (audio_buffer != NULL && (max_voice > 0))
                          {
                            audio_buffer_copy = NULL;
                            audio_buffer_copy = gst_buffer_copy_deep(audio_buffer);
                            push_buffer_in_queue ();
                          }
                        } /* vadfilter */
                    } /* message_element */
                    break;

                default:
                    break;

                } /* switch */
            }
          catch (std::bad_weak_ptr & e)
            {
              GST_ERROR("EXCEPTION: bus message");
            }
        } /* bus message */


      void VADCustomFilterImpl::push_buffer_in_queue ()
      {
        try 
        {
            GstMapInfo map;         
            if (audio_buffer_copy != NULL)
            {

                if (gst_buffer_map (audio_buffer_copy, &map, GST_MAP_READ))
                {
                    AUDIO_STREAM *pcm = new AUDIO_STREAM;
                    pcm->size = (unsigned int) map.size;
                    pcm->data = new unsigned char[pcm->size + 1];
                    std::memcpy (pcm->data, (unsigned char *) map.data, pcm->size);

                    gst_buffer_unmap (audio_buffer_copy, &map);

                    //set the flag for audio_buffer to create a new copy
                    audio_buffer_null = TRUE;
                    fetch_response(pcm);
                    if (pcm->data != NULL)
                    {               
                        pcm->size = 0;
                        pcm->data = NULL;
                    }

                    if (pcm != NULL)
                    {           
                        delete pcm;
                        pcm = NULL;
                    }               
                }
                else
                    GST_ERROR("push_buffer_in_queue: Invalid buffer");


            }   
        }
         catch (std::bad_weak_ptr & e)
        {
          GST_ERROR ("EXCEPTION::push_buffer_in_queue ");
        } 
      }

      VADCustomFilterImpl::VADCustomFilterImpl (const boost::property_tree::ptree &conf, 
                                                std::shared_ptr<MediaPipeline> mediaPipeline, 
                                                int minVoiceBufferSize, 
                                                int maxVoiceBufferSize, 
                                                int64_t minSilenceTime, 
                                                const std::string &url, 
                                                const std::string &language):FilterImpl
                                                (conf,
                                                 std::dynamic_pointer_cast < MediaObjectImpl > (mediaPipeline))
      {

        try
        {
            min_voice_buffer_size = (gsize) minVoiceBufferSize;
            curl_url = url ;   
            accept_language = (!language.empty()) ? ("Accept-Language: " + language) : ("Accept-Language: en-US");

            g_object_set (element, "filter-factory", "vadfilter", NULL);

            g_object_get(G_OBJECT(element), "filter", &vadfilter, NULL);

            if (vadfilter  == NULL)
              {
                throw KurentoException (MEDIA_OBJECT_NOT_AVAILABLE, "Media Object not available");
              }


            g_object_set(G_OBJECT(vadfilter), "minimum-silence-time", (guint64)minSilenceTime, NULL); //2000000000
            g_object_set(G_OBJECT(vadfilter), "max-voice-buffer-size", (guint)maxVoiceBufferSize, NULL);

            g_object_unref(vadfilter);

            g_async_queue_create ();

            curl_init ();
        }
        catch (std::bad_weak_ptr & e)
        {
          GST_ERROR("EXCEPTION: constructor... ");
        } 
      }

      void VADCustomFilterImpl::postConstructor ()
      {
        try
        {
            GstBus *bus;

            std::shared_ptr < MediaPipelineImpl > pipe;

            FilterImpl::postConstructor ();

            pipe = std::dynamic_pointer_cast < MediaPipelineImpl >(getMediaPipeline ());

            bus = gst_pipeline_get_bus (GST_PIPELINE (pipe->getPipeline ()));

            /* register bus message handler */
            bus_handler_id = register_signal_handler (G_OBJECT (bus),
                                  "message",
                                  std::function <
                                  void (GstElement *,
                                    GstMessage *) >
                                  (std::
                                   bind
                                   (&VADCustomFilterImpl::
                                    busMessage, this,
                                    std::placeholders::_2)),
                                  std::dynamic_pointer_cast <
                                  VADCustomFilterImpl >
                                  (shared_from_this ()));

            /* register new_buffer handler */
            remove_silence_id = register_signal_handler(G_OBJECT(vadfilter),
                                     "buffer",
                                     std::function <
                                     void (GstElement *,
                                       GstBuffer *) >
                                     (std::
                                      bind
                                      (&VADCustomFilterImpl::
                                       newBufferHandler, this,
                                       std::placeholders::
                                       _2)),
                                     std::
                                     dynamic_pointer_cast <
                                     VADCustomFilterImpl >
                                     (shared_from_this ()));

            if (bus)
                g_object_unref (bus);   
        }
          catch (std::bad_weak_ptr & e)
        {
          GST_ERROR("EXCEPTION: post constructor... ");
        } 
      }

      VADCustomFilterImpl::~VADCustomFilterImpl ()
      {
        std::shared_ptr < MediaPipelineImpl > pipe;

        try
        {           
            if (bus_handler_id > 0)
          {
            pipe = std::dynamic_pointer_cast < MediaPipelineImpl > (getMediaPipeline ());
            GstBus *bus = gst_pipeline_get_bus (GST_PIPELINE (pipe->getPipeline ()));
            if (bus)
            {
                unregister_signal_handler (bus, bus_handler_id);
                g_object_unref (bus);
            }

          }

          /*if (remove_silence_id > 0)
            {
              unregister_signal_handler (element, remove_silence_id);
            }
            */

          /* curl cleanup */
          curl_cleanup ();

          /* buffer queue cleanup */
            if (g_async_buffer_queue)
            {
                g_async_queue_unref (g_async_buffer_queue);
                 g_async_buffer_queue = NULL;
            }


          /* audio_buffer cleanup */
            if (audio_buffer_copy != NULL)
            {
                audio_buffer_copy = NULL;   
            }

            if (audio_buffer != NULL)
            {
                audio_buffer = NULL;
                audio_buffer_null = TRUE;
            }

        }
        catch (std::bad_weak_ptr & e)
        {
          remove_silence_id = -1;
          GST_ERROR("EXCEPTION: destructor ");
        }
      }


      size_t VADCustomFilterImpl::curl_callback (char *contents, size_t size,
                         size_t nmemb, void *userp)
      {
        size_t realsize = 0;  
        try
        {
            realsize = size * nmemb;    /* calculate buffer size */
            ((std::string *) userp)->append ((char *) contents, realsize);
        }
        catch (std::bad_weak_ptr & e)
        {
          GST_ERROR("EXCEPTION: curl_callback");
        }
        return realsize;
      }

      int VADCustomFilterImpl::curl_init ()
      {
        /* init curl handle */
        if ((curl_handle = curl_easy_init ()) == NULL)
            return 1;


        /* set content type */
        headers = curl_slist_append (headers, "Accept: application/xml");

        headers = curl_slist_append (headers, "Transfer-Encoding: chunked");
        headers = curl_slist_append (headers, accept_language.c_str());     

        /* set curl options */
        curl_easy_setopt (curl_handle, CURLOPT_VERBOSE, 0);
        curl_easy_setopt (curl_handle, CURLOPT_POST, 1);
        curl_easy_setopt (curl_handle, CURLOPT_HTTP_VERSION, CURL_HTTP_VERSION_1_1);
        curl_easy_setopt (curl_handle, CURLOPT_HTTPHEADER, headers);
        curl_easy_setopt (curl_handle, CURLOPT_TIMEOUT, 10);          
        curl_easy_setopt (curl_handle, CURLOPT_USERAGENT, "libcurl-agent/1.0"); /* set default user agent */                
        curl_easy_setopt (curl_handle, CURLOPT_URL, curl_url.c_str ()); /* set url to fetch */      

        return 0;
      }


      int VADCustomFilterImpl::curl_fetch (AUDIO_STREAM * pcm)
      {
        curl_op_in_progress = TRUE;

        if (curl_handle == NULL)
            return 1;

        CURLcode rcode;

        try
        {
            response.clear();
            curl_easy_setopt (curl_handle, CURLOPT_WRITEFUNCTION, curl_callback); /* set calback function */        
            curl_easy_setopt(curl_handle, CURLOPT_WRITEDATA, &response);/* pass fetch struct pointer */
            curl_easy_setopt (curl_handle, CURLOPT_POSTFIELDSIZE, pcm->size);
            curl_easy_setopt (curl_handle, CURLOPT_POSTFIELDS,  pcm->data);

            rcode = curl_easy_perform(curl_handle);

            handle_curl_response(rcode);
        }   
        catch (std::bad_weak_ptr & e)
        {
          GST_ERROR("EXCEPTION: curl_fetch ");
        }


        return 1;
      }

    void VADCustomFilterImpl::handle_curl_response(CURLcode rcode)
    {
        curl_op_in_progress = FALSE;
        switch (rcode)
        {
            case CURLE_OK:
            {

                if (!response.empty())
                {
                    SendresponseReceivedEvent(response);
                    response.clear();
                }
            }
                break;

            case CURLE_OPERATION_TIMEDOUT:
            case CURLE_HTTP_RETURNED_ERROR: /* HTTP Error, >= 400*/
            {
                GST_ERROR ("ERROR: handle_curl_response: (%s)", curl_easy_strerror (rcode));

                goto pop_pcm_from_queue;
            }
            break;

            default:
                GST_ERROR ("ERROR: Failed to fetch url (%s) - curl said: %s\n", curl_url.c_str (), curl_easy_strerror (rcode));
                break;
        } /* switch */



        pop_pcm_from_queue:
            if (g_async_queue_length (g_async_buffer_queue) > 0)
              {
                AUDIO_STREAM *pcm_popped = reinterpret_cast < AUDIO_STREAM * >(g_async_queue_pop (g_async_buffer_queue));
                curl_fetch (pcm_popped);

              }
            else
                GST_ERROR ("handle_curl_response: queue is empty, no pcm to pop out");
    }

    void VADCustomFilterImpl::curl_cleanup ()
    {
        if (curl_handle)
            curl_easy_cleanup (curl_handle);

        /* free headers */
        if (headers)
            curl_slist_free_all (headers);

        curl_url.clear ();
    }

    /* fetch the response from web service, if it is already fetching the push the current audio buffer in queue */
    void VADCustomFilterImpl::fetch_response(AUDIO_STREAM * pcm)
    {
    //check queue size if > 0 then push else hit curl
    if (!curl_op_in_progress)
      {
        curl_fetch (pcm);
      }
    else
      { 
        try 
        {           
            if (g_async_buffer_queue == NULL)
              g_async_queue_create ();

            g_async_queue_push (g_async_buffer_queue, pcm);
        }
        catch (std::bad_weak_ptr & e)
        {
          GST_ERROR("EXCEPTION: curl_fetch ");
        }

      }
    }

      void VADCustomFilterImpl::g_async_queue_create ()
      {
        g_async_buffer_queue = g_async_queue_new ();
      }
      }         /* vadcustomfilter */
   }        /* module */
}       /* kurento */

Solution

  • Giving the information you provide here and in this other related question. I'm pretty sure that the problem is that your filter is blocking the media flow (probably because of the fact that it is sending http requests). In addition all the errors you are having related with buffers and mini objects, indicate that you are doing something wrong in the filter while processing the media.

    You should review your code, and potentially consult with an expert to know the source of the problem. It could be hard to find just guessing with the information you provide without access to the source code. Even with the source code it could be difficult to determine what is causing the deadlock.