amazon-web-services apache-kafka apache-kafka-connect confluent-platform amazon-kinesis

Kinesis Kafka Source Connector - Credentials


I am working on a POC with Confluent Platform and am trying to send data from Kinesis in my AWS account to Kafka running on Confluent Platform (set up using Docker Compose). I am using the AWS Kinesis connector available from Confluent, on the trial version that is valid for 30 days.

I have set up the KinesisSourceConnector plugin from https://www.confluent.io/hub/confluentinc/kafka-connect-kinesis

The source connector configuration has credential settings for the AWS Access Key Id and the AWS Secret Key Id. However, it does not have a configuration parameter for the AWS Session Token. Is there any way to set this up, since my AWS account can only be accessed using STS?

I have tried adding an additional property aws_access_key_id but with no success.

Error description - The provided credentials are invalid: The security token included in the request is invalid. (Service: AWSSecurityTokenService; Status Code: 403; Error Code: InvalidClientTokenId; Request ID: d893039b-d4f3-4de3-95ef-ede233b0885c)


Solution

  • Thanks to @OneCricketeer for helping find an answer

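    For security reasons, pass the credentials as environment variables to the Connect worker's Java process, or place a ~/.aws/credentials file on the Connect worker servers. The AWS SDK's default credentials provider chain reads both sources, including the session token.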

    1. Create a .env file in the folder where you will run Kafka Connect
    2. Set up the AWS credentials in the .env file (AWS_SESSION_TOKEN, AWS_SECRET_ACCESS_KEY, AWS_ACCESS_KEY_ID, AWS_DEFAULT_REGION)
    3. Modify the docker-compose yml file to add the environment variables to the Kafka Connect service
    connect:
        image: cnfldemos/cp-server-connect-datagen:0.5.3-7.1.0
        hostname: connect
        container_name: connect
        depends_on:
          - broker
          - schema-registry
        ports:
          - "8083:8083"
    
        environment:
          ...
          AWS_SESSION_TOKEN: '${AWS_SESSION_TOKEN}'
          AWS_SECRET_ACCESS_KEY: '${AWS_SECRET_ACCESS_KEY}'
          AWS_ACCESS_KEY_ID: '${AWS_ACCESS_KEY_ID}'
          AWS_DEFAULT_REGION: '${AWS_DEFAULT_REGION}'
    
    4. Restart Kafka Connect
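
Step 2 can be scripted. Below is a minimal sketch, not part of the original answer, that turns the `Credentials` object of an STS response (fields `AccessKeyId`, `SecretAccessKey`, `SessionToken`) into the `KEY=value` lines that Docker Compose reads from a .env file. The function name `render_env` and all values shown are hypothetical placeholders. How you obtain the STS credentials depends on your login flow and is assumed to happen elsewhere.

```python
from typing import Mapping

# Field names as they appear in the "Credentials" object of an STS response,
# mapped to the environment variable names used in the compose file above.
STS_TO_ENV = {
    "AccessKeyId": "AWS_ACCESS_KEY_ID",
    "SecretAccessKey": "AWS_SECRET_ACCESS_KEY",
    "SessionToken": "AWS_SESSION_TOKEN",
}


def render_env(credentials: Mapping[str, str], region: str) -> str:
    """Return .env file content for the four variables used by the connect service.

    `render_env` is a hypothetical helper written for illustration only.
    """
    missing = [key for key in STS_TO_ENV if not credentials.get(key)]
    if missing:
        raise ValueError(f"missing credential fields: {', '.join(missing)}")
    lines = [f"{env}={credentials[sts]}" for sts, env in STS_TO_ENV.items()]
    lines.append(f"AWS_DEFAULT_REGION={region}")
    return "\n".join(lines) + "\n"


if __name__ == "__main__":
    # Placeholder values only; never commit real credentials.
    example = {
        "AccessKeyId": "ASIAEXAMPLEKEYID",
        "SecretAccessKey": "example-secret",
        "SessionToken": "example-session-token",
    }
    print(render_env(example, "us-east-1"), end="")
```

Redirect the output to .env in the folder that holds the compose file. STS credentials are temporary, so the file has to be regenerated and Kafka Connect restarted once they expire.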