Search code examples
c, raspberry-pi, kaa

Sending log schema to kaa server from RPI3 with optional fields


I am trying to send a log record from a Raspberry Pi C application to a back-end Kaa server. Here is the log schema:

{
"type" : "record",
"name" : "RemoteSensorLog",
"namespace" : "org.kaa.iot.log.sensor",
"fields" : [ {
"name" : "deviceId",
"type" : {
  "type" : "string",
  "avro.java.string" : "String"
}
}, {
  "name" : "temperature",
  "type" : [ "double", "null" ]
}, {
  "name" : "humidity",
  "type" : [ "long", "null" ]
}, {
  "name" : "batteryLevel",
  "type" : [ "int", "null" ]
} ],
"version" : 1,
"dependencies" : [ ],
"displayName" : "RemoteSensorLog",
"description" : "This is the log sent by remote sensors"
}

Some of the fields in the log schema are optional. Here is the initialization function:

/**
 * Sets up Kaa log collection for a client.
 *
 * Creates the log storage and the upload strategy, sets the strategy's
 * record-count threshold, initializes the client's log collector with both,
 * and registers the delivery callbacks. On any failure a message is printed
 * and the function returns early.
 *
 * NOTE(review): the storage and strategy contexts created here are not
 * released on the early-return error paths — TODO confirm ownership and add
 * cleanup if the collector has not taken them over yet.
 *
 * @param context  Pointer to the kaa_client_t to configure; NULL is ignored.
 */
void kaaLogInitializing(void *context)
{
    void *log_storage_context = NULL;
    void *log_upload_strategy_context = NULL;

    printf("Initializing the Kaa log\n");

    kaa_client_t *kaa_client_context = context;

    if (context == NULL) {
        return;
    }

    /* Log delivery listener callbacks. Each callback is called whenever something happens with a log bucket. */
    kaa_log_delivery_listener_t log_listener = {
        .on_success = success_log_delivery_callback, /* Called if log delivered successfully */
        .on_failed  = failed_log_delivery_callback,  /* Called if delivery failed */
        .on_timeout = timeout_log_delivery_callback, /* Called if timeout occurs */
        .ctx        = kaa_client_context,            /* Optional context */
    };

    /* The internal memory log storage distributed with Kaa SDK */
    kaa_error_t error_code = ext_unlimited_log_storage_create(
            &log_storage_context,
            kaa_client_get_context(kaa_client_context)->logger);

    if (error_code) {
        printf("Failed to create Kaa log storage %d\r\n", error_code);
        return;
    }

    error_code = ext_log_upload_strategy_create(
            kaa_client_get_context(kaa_client_context),
            &log_upload_strategy_context,
            KAA_LOG_UPLOAD_VOLUME_STRATEGY);

    if (error_code) {
        printf("Failed to create log upload strategy, error code %d\r\n", error_code);
        return;
    }

    error_code = ext_log_upload_strategy_set_threshold_count(log_upload_strategy_context,
                                                             LOG_UPLOAD_THRESHOLD);

    if (error_code) {
        printf("Failed to set threshold log record count, error code %d\r\n", error_code);
        return;
    }

    /*
     * NOTE(review): the same strategy context is handed to kaa_logging_init()
     * below as well — confirm against the SDK that registering it twice is
     * safe (i.e. that init does not release a previously set strategy).
     */
    error_code = kaa_logging_set_strategy(kaa_client_get_context(kaa_client_context)->log_collector,
                                          log_upload_strategy_context);

    if (error_code) {
        printf("Failed to set log upload strategy, error code %d\r\n", error_code);
        return;
    }

    /* Specify log bucket size constraints */
    kaa_log_bucket_constraints_t bucket_sizes = {
        .max_bucket_size      = MAX_LOG_BUCKET_SIZE, /* Bucket size in bytes */
        .max_bucket_log_count = MAX_LOG_COUNT,       /* Maximum log count in one bucket */
    };

    /* Initialize the log storage and strategy (by default it is not set) */
    error_code = kaa_logging_init(kaa_client_get_context(kaa_client_context)->log_collector,
                                  log_storage_context,
                                  log_upload_strategy_context,
                                  &bucket_sizes);

    if (error_code) {
        printf("Failed to initialize Kaa log %d\r\n", error_code);
        return;
    }

    /*
     * Add listeners to the log collector. The result is checked so that a
     * failed registration is reported instead of silently leaving the
     * application without success/failure/timeout notifications.
     */
    error_code = kaa_logging_set_listeners(kaa_client_get_context(kaa_client_context)->log_collector,
                                           &log_listener);

    if (error_code) {
        printf("Failed to set log delivery listeners, error code %d\r\n", error_code);
        return;
    }
}

Here is the function I use to send the log:

void sendLog(void *context)
{
    kaa_client_t * kaa_client_context = context;
    float temperature = 25.5;

    if (context == NULL) {
            return;
    }

    logDelivered = LOG_DELIVERY_DELIVERING;

            printf("Start attempt to send Log\n");

    kaa_logging_remote_sensor_log_t *log_record = kaa_logging_remote_sensor_log_create();

    log_record->device_id = kaa_string_copy_create("Dev1");
    log_record->temperature = kaa_logging_union_double_or_null_branch_0_create();
    log_record->temperature->data = &temperature; /* create subobject */

    log_record->humidity = kaa_logging_union_long_or_null_branch_1_create();
    log_record->battery_level = kaa_logging_union_int_or_null_branch_1_create();

            printf("Log record created\n");
    /* Log information. Populated when log is added via kaa_logging_add_record() */
    kaa_log_record_info_t log_info;

    kaa_error_t error = kaa_logging_add_record(
            kaa_client_get_context(kaa_client_context)->log_collector,
            log_record, &log_info);

    if (error) {
            printf("Failed to add log record, error code\r\n");
            kaa_client_stop(kaa_client_context);
            return;
    }

    //log_record->destroy(log_record);
}

I have 2 problems

  • Problem 1: If I uncomment the last line of the sendLog function, log_record->destroy(log_record);, I get this error: double free or corruption (out): 0x7efe05
  • Problem 2: After commenting out that line and running the application, I never get any error, and the server never receives the log. Nothing seems to happen: I receive no notification that the log was sent successfully, failed, or timed out.

Solution

  • You need to manually allocate memory to store the temperature value. It will be freed in the log_record->destroy(log_record) call.

    So, you need to do something like this:

    /* The payload must be heap-allocated: log_record->destroy() frees it. */
    double *p_temperature = KAA_MALLOC(sizeof *p_temperature);
    if (!p_temperature) {
        // error handling: bail out here; execution must not fall through to the dereference below
    }
    
    *p_temperature = 25.5;
    log_record->temperature->data = p_temperature;