Tags: c#, azure, .net-core, azure-application-insights, telemetry

When using Application Insights sampling, is it possible to preserve related entries along with excluded items?


I've been experimenting with telemetry sampling to avoid hitting the Application Insights daily cap. Ideally, I want to apply sampling to everything except exceptions, and preserve the traces related to those exceptions (same operation ID).

I've created a sample console app to test this, and so far I can sample successfully while preserving exceptions. However, the related traces get sampled as well.

I looked at implementing a custom ITelemetryProcessor, but it processes one item at a time, so I'm not sure this is even possible with a custom processor. Maybe there is something else that would help achieve the desired behavior.
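A dependency-free C# simulation illustrates the limitation, and what changes if the decision is deferred until all of an operation's items have been seen. It uses no Application Insights types: the tuple shape, `KeepPerItem`, `KeepPerOperation` and the `SamplingScore` hash are hypothetical stand-ins, assuming only that sampling keeps an item when a deterministic score derived from its operation ID falls below the sampling percentage.

```csharp
using System;
using System.Collections.Generic;
using System.Linq;

// Illustrative telemetry: (operation ID, is exception, message).
var stream = new List<(string OperationId, bool IsException, string Message)>
{
    ("op-ok", false, "Begin operation"),
    ("op-ok", false, "End operation"),
    ("op-err", false, "Begin operation"),
    ("op-err", true, "Error operation"),
    ("op-err", false, "End operation"),
};

// Per-item at 0%: only the excluded exception survives; op-err's traces are lost.
var perItem = stream.Where(i => KeepPerItem(i.OperationId, i.IsException, 0)).ToList();

// Per-operation at 0%: op-err is kept whole, op-ok is dropped.
var perOperation = KeepPerOperation(stream, 0);

Console.WriteLine($"per item: {perItem.Count}, per operation: {perOperation.Count}");
// prints "per item: 1, per operation: 3"

// Per-item decision, analogous to ITelemetryProcessor.Process: the current item is all it sees.
static bool KeepPerItem(string operationId, bool isException, double samplingPercentage)
    => isException || SamplingScore(operationId) < samplingPercentage;

// Deferred decision: needs every item of an operation before deciding.
static List<(string OperationId, bool IsException, string Message)> KeepPerOperation(
    IEnumerable<(string OperationId, bool IsException, string Message)> items,
    double samplingPercentage)
{
    var result = new List<(string OperationId, bool IsException, string Message)>();
    foreach (var operation in items.GroupBy(i => i.OperationId))
    {
        // An exception anywhere in the operation keeps the whole chain.
        if (operation.Any(i => i.IsException) || SamplingScore(operation.Key) < samplingPercentage)
        {
            result.AddRange(operation);
        }
    }
    return result;
}

// Hypothetical deterministic score in [0, 100): FNV-1a hash of the operation ID,
// so all items of one operation get the same score (not the SDK's algorithm).
static double SamplingScore(string operationId)
{
    uint hash = 2166136261;
    foreach (char c in operationId)
    {
        hash ^= c;
        hash = unchecked(hash * 16777619u);
    }
    return hash % 10000 / 100.0;
}
```

`KeepPerOperation` needs all of an operation's items up front, which a processor that receives one item per call can only obtain by buffering them.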

The Program.cs code is below

using Microsoft.ApplicationInsights;
using Microsoft.ApplicationInsights.Channel;
using Microsoft.ApplicationInsights.Extensibility;
using Microsoft.Extensions.DependencyInjection;
using Microsoft.Extensions.Logging;

const string appInsightConnString = "<connection string>";
const double samplingPercentage = 50;

var services = new ServiceCollection();

// add context
var context = new Context();
services.AddSingleton(context);

// configure application insights
services.AddApplicationInsightsTelemetryWorkerService(
    (options) =>
    {
        // disable adaptive sampling
        options.EnableAdaptiveSampling = false;
        options.ConnectionString = appInsightConnString;
    });

// configure logging
services.AddLogging(loggingBuilder =>
{
    loggingBuilder.ClearProviders();
    loggingBuilder.Services.AddSingleton<ILoggerProvider, ContextApplicationInsightsLoggerProvider>();
    loggingBuilder.AddConsole();
});

var serviceProvider = services.BuildServiceProvider();

// setup sampling
var telemetryConfiguration = serviceProvider.GetRequiredService<TelemetryConfiguration>();
var telemetryBuilder = telemetryConfiguration.DefaultTelemetrySink.TelemetryProcessorChainBuilder;
telemetryBuilder.UseSampling(samplingPercentage, excludedTypes: "Exception");
telemetryBuilder.Build();

// get logger
var logger = serviceProvider.GetRequiredService<ILogger<Program>>();

// do something important
DoWork(context, logger);

// Explicitly calling Flush() followed by a sleep is required in console apps.
// This ensures that telemetry is sent to the back end even if the application terminates.
var telemetryClient = serviceProvider.GetRequiredService<TelemetryClient>();
telemetryClient.Flush();
Task.Delay(10000).Wait();

Console.WriteLine("Flushed. Press any key to exit");
Console.ReadKey();


static void DoWork(Context context, ILogger logger)
{
    const int iterations = 50;
    const int errors = 15;

    // session Id to filter logs
    var sessionId = Guid.NewGuid().ToString();
    Console.WriteLine($"Session Id: {sessionId}");

    // randomize errors
    var random = new Random();
    var errorsHash = new HashSet<int>();
    while (errorsHash.Count < errors)
    {
        errorsHash.Add(random.Next(0, iterations));
    }

    // log
    for (var i = 0; i < iterations; i++)
    {
        context.CorrelationId = Guid.NewGuid().ToString();
        logger.LogInformation($"Begin operation: {context.CorrelationId}. Session Id: {sessionId}");
        if (errorsHash.Contains(i))
            logger.LogError(new Exception("test ex"), $"Error operation: {context.CorrelationId}. Session Id: {sessionId}");
        logger.LogInformation($"End operation: {context.CorrelationId}. Session Id: {sessionId}");
    }
}

Solution

  • I wanted to post what I've come up with so far after looking further into implementing a custom telemetry processor. The idea is to keep accumulating telemetry in per-operation buckets for a fixed time window before making the sampling decision: if a bucket contains an exception, the whole bucket is passed through; otherwise it is sampled. This introduces some lag, but it's the best I've got so far.

    using System;
    using System.Collections.Concurrent;
    using System.Linq;
    using System.Threading;
    using Microsoft.ApplicationInsights.Channel;
    using Microsoft.ApplicationInsights.DataContracts;
    using Microsoft.ApplicationInsights.Extensibility;
    using Microsoft.ApplicationInsights.WindowsServer.TelemetryChannel; // SamplingTelemetryProcessor
    using Microsoft.Extensions.Caching.Memory;
    using Microsoft.Extensions.Primitives;
    
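    /// <summary>
    /// Buffers telemetry per operation ID for a fixed time window. Operations that
    /// contain an exception are forwarded in full; all other operations are sampled.
    /// </summary>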
    public class SuccessfulSamplingTelemetryProcessor : ITelemetryProcessor
    {
        private readonly TimeSpan _bufferedTime;
        private readonly ITelemetryProcessor _next;
        private readonly SamplingTelemetryProcessor _samplingTelemetryProcessor;
        private readonly MemoryCache _cache;
    
        public SuccessfulSamplingTelemetryProcessor(double successfulSamplingPercentage, TimeSpan bufferedTime, ITelemetryProcessor next)
        {
            _bufferedTime = bufferedTime;
            _next = next;
            _samplingTelemetryProcessor = new SamplingTelemetryProcessor(next)
            {
                SamplingPercentage = successfulSamplingPercentage
            };
            _cache = new MemoryCache(new MemoryCacheOptions());
        }
    
        public void Process(ITelemetry item)
        {
            // All telemetry from one operation shares the same operation ID
            var operationId = item.Context.Operation.Id;
            if (string.IsNullOrEmpty(operationId))
            {
                // Without an operation ID there is nothing to group by, so sample right away
                _samplingTelemetryProcessor.Process(item);
                return;
            }
    
            var queue = _cache.GetOrCreate(operationId, entry =>
            {
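                // Time-based expiration is only evaluated when the cache is accessed, so the
                // eviction callback could fire late or never; a cancellation token that fires
                // after _bufferedTime triggers the eviction on time.
                // See https://stackoverflow.com/questions/42535408/net-core-memorycache-postevictioncallback-not-working-properly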
                var expirationToken = new CancellationChangeToken(new CancellationTokenSource(_bufferedTime).Token);
                entry.AbsoluteExpirationRelativeToNow = _bufferedTime;
                entry.Priority = CacheItemPriority.NeverRemove;
                entry.AddExpirationToken(expirationToken);
                entry.PostEvictionCallbacks.Add(new PostEvictionCallbackRegistration
                {
                    EvictionCallback = OnPostEviction
                });
                return new ConcurrentQueue<ITelemetry>();
            });
    
            // just add to the queue for now
            queue.Enqueue(item);
        }
    
        private void OnPostEviction(object key, object? value, EvictionReason reason, object? state)
        {
            // Ignore anything that is not one of this processor's buffers
            if (value is not ConcurrentQueue<ITelemetry> queue)
            {
                return;
            }
    
            // If the chain contains an exception, forward all of it unsampled
            var hasException = queue.Any(t => t is ExceptionTelemetry);
            while (queue.TryDequeue(out var telemetry))
            {
                if (hasException)
                {
                    // pass through
                    _next.Process(telemetry);
                }
                else
                {
                    // apply sampling
                    _samplingTelemetryProcessor.Process(telemetry);
                }
            }   
        }
    }
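A sketch of wiring the processor into the question's Program.cs in place of the `UseSampling` call, using `TelemetryProcessorChainBuilder.Use`, which takes a factory that receives the next processor in the chain. It assumes `telemetryConfiguration` and `samplingPercentage` are defined as in the question; the 30-second window is an arbitrary placeholder.

```csharp
// Replaces: telemetryBuilder.UseSampling(samplingPercentage, excludedTypes: "Exception");
var telemetryBuilder = telemetryConfiguration.DefaultTelemetrySink.TelemetryProcessorChainBuilder;
telemetryBuilder.Use(next => new SuccessfulSamplingTelemetryProcessor(
    successfulSamplingPercentage: samplingPercentage, // rate for exception-free operations
    bufferedTime: TimeSpan.FromSeconds(30),           // placeholder buffer window per operation
    next: next));
telemetryBuilder.Build();
```

Because items stay in the cache for `bufferedTime` before they reach the channel, a console app like the one in the question should wait at least that long before calling `Flush()`; anything still buffered when the process exits is never sent.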