Search code examples
amazon-kinesisaws-xray

How can use the Java Kinesis Client Library together with X-Ray?


When adding aws-xray-recorder-sdk-aws-sdk-instrumentor the KCL raises a SegmentNotFoundException.

As I understand this is because the KCL is starting their own threads so my calls to AWSXRay.beginSegment() do not apply to the instrumented requests that are run on those threads but I have not direct control over those threads so I cannot set the context / segment.

Is there any workaround for this?


Solution

  • The KCL allows to provide the your own instantiated AmazonKinesis client, AmazonDynamoDb client and AmazonCloudWatch client.

    You can instantiate your own clients (using AmazonKinesisClientBuilder,etc) , add a withRequestHandler() and provide a IRequestHandler2 instance that calls AWSXRay.beginSegment() on the beforeRequest() and calls AWSXRay.endSegment() on afterResponse(). In that way the request handler gives the opportunity to run your own user code on the threads that the different KCL ExecutorService instances are creating.

    An example of such request handler:

    class XRayTracingHandler extends TracingHandler {
        private final String name;
    
        XRayTracingHandler(String name) {
            super(AWSXRay.getGlobalRecorder());
            this.name = name;
        }
    
        @Override
        public void beforeRequest(Request<?> request) {
            AWSXRay.beginSegment(name);
            super.beforeRequest(request);
        }
    
        @Override
        public void afterResponse(Request<?> request, Response<?> response) {
            super.afterResponse(request, response);
            AWSXRay.endSegment();
        }
    }
    

    That you can use like this:

      final KinesisClientLibConfiguration config = new KinesisClientLibConfiguration(
        "my-app-name",
        "my-kinesis-stream",
        new DefaultAWSCredentialsProviderChain(), UUID.randomUUID());
    
    
      final IRecordProcessorFactory recordProcessorFactory = ...
    
    
      final AmazonCloudWatch cloudWatchClient = AmazonCloudWatchClientBuilder
        .standard()
        .withRequestHandlers(new XRayTracingHandler("my-segment-listener"))
        .build();
    
      final AmazonKinesis kinesisClient = AmazonKinesisClientBuilder
        .standard()
        .withRequestHandlers(new XRayTracingHandler("my-segment-listener"))
        .build();
    
      final AmazonDynamoDB dynamoDBClient = AmazonDynamoDBClientBuilder
        .standard()
        .withRequestHandlers(new XRayTracingHandler("my-segment-listener"))
        .build();
    
      final Worker worker = new Worker.Builder()
        .recordProcessorFactory(recordProcessorFactory)
        .config(config)
        .kinesisClient(kinesisClient)
        .dynamoDBClient(dynamoDBClient)
        .cloudWatchClient(cloudWatchClient)
        .build();