Tags: amazon-web-services, aws-lambda, serverless-framework, aws-msk

Connect to Amazon MSK cluster


I’m trying to set up an Amazon MSK cluster and connect to it from a Lambda function. The Lambda function will be a producer of messages, not a consumer.

I am using the Serverless Framework to provision everything, and in my serverless.yml I have added the following, which seems to be working fine.

    MSK:
      Type: AWS::MSK::Cluster
      Properties:
        ClusterName: kafkaOne
        KafkaVersion: 2.2.1
        NumberOfBrokerNodes: 3
        BrokerNodeGroupInfo:
          InstanceType: kafka.t3.small
          ClientSubnets:
            - Ref: PrivateSubnet1
            - Ref: PrivateSubnet2
            - Ref: PrivateSubnet3

But when trying to connect to this cluster to actually send messages, I am unsure how to get the connection string. I presume it should be the ZookeeperConnectString? I’m new to Kafka/MSK, so maybe I am not seeing something obvious.

Any advice much appreciated. Cheers.


Solution

  • I don't know what kind of code base you are using, so I will add my code, which I wrote in Go.

    In essence, you should connect to an MSK cluster the same way you would connect to a standalone Kafka instance: producers write to the cluster through its broker endpoints, not through Zookeeper. So the ZookeeperConnectString is not what you want — the client connection string is the bootstrap broker list, which you can fetch with the GetBootstrapBrokers API (e.g. `aws kafka get-bootstrap-brokers --cluster-arn <your-cluster-arn>`), since CloudFormation does not expose it as an attribute on the cluster resource.

    I'm using the segmentio/kafka-go library. My function for sending an event to the MSK cluster looks like this:

    // Add event
    // Requires: context, encoding/json, fmt, os, time, and github.com/segmentio/kafka-go
    func addEvent(ctx context.Context, requestBody RequestBodyType) (bool, error) {
    
        // Prepare dialer
        dialer := &kafka.Dialer{
            Timeout:   2 * time.Second,
            DualStack: true,
        }
    
        brokers := []string{
            os.Getenv("KAFKA_BROKER_1"),
            os.Getenv("KAFKA_BROKER_2"),
            os.Getenv("KAFKA_BROKER_3"),
            os.Getenv("KAFKA_BROKER_4"),
        }
    
        // Prepare writer config
        kafkaConfig := kafka.WriterConfig{
            Brokers:  brokers,
            Topic:    os.Getenv("KAFKA_TOPIC"),
            Balancer: &kafka.Hash{},
            Dialer:   dialer,
        }
    
        // Prepare writer and make sure it is closed when we are done
        w := kafka.NewWriter(kafkaConfig)
        defer w.Close()
    
        // Convert struct to JSON (json.Marshal already returns []byte)
        event, err := json.Marshal(requestBody)
        if err != nil {
            return false, fmt.Errorf("converting struct to JSON for writing to Kafka failed: %w", err)
        }
    
        // Write message
        writeErr := w.WriteMessages(ctx,
            kafka.Message{
                Key:   []byte(requestBody.Event),
                Value: event,
            },
        )
        if writeErr != nil {
            return false, fmt.Errorf("could not write message to Kafka: %w", writeErr)
        }
    
        return true, nil
    }
    
    

    My serverless.yml

    The code above (addEvent) belongs to functions -> postEvent in serverless.yml. If you are consuming from Kafka, check functions -> processEvent instead. Consuming events is fairly simple, but setting everything up for producing to Kafka is crazy. We have probably been working on this for a month and a half and are still figuring out how everything should be set up. Sadly, serverless does not do everything for you, so you will have to "click through" some things manually in AWS, but we compared it to other frameworks and serverless is still the best right now.

    provider:
      name: aws
      runtime: go1.x
      stage: dev
      profile: ${env:AWS_PROFILE}
      region: ${env:REGION}
      apiName: my-app-${sls:stage}
      lambdaHashingVersion: 20201221
      environment:
        ENV: ${env:ENV}
        KAFKA_TOPIC: ${env:KAFKA_TOPIC}
        KAFKA_BROKER_1: ${env:KAFKA_BROKER_1}
        KAFKA_BROKER_2: ${env:KAFKA_BROKER_2}
        KAFKA_BROKER_3: ${env:KAFKA_BROKER_3}
        KAFKA_BROKER_4: ${env:KAFKA_BROKER_4}
        KAFKA_ARN: ${env:KAFKA_ARN}
        ACCESS_CONTROL_ORIGINS: ${env:ACCESS_CONTROL_ORIGINS}
        ACCESS_CONTROL_HEADERS: ${env:ACCESS_CONTROL_HEADERS}
        ACCESS_CONTROL_METHODS: ${env:ACCESS_CONTROL_METHODS}
        BATCH_SIZE: ${env:BATCH_SIZE}
        SLACK_API_TOKEN: ${env:SLACK_API_TOKEN}
        SLACK_CHANNEL_ID: ${env:SLACK_CHANNEL_ID}
      httpApi:
        cors: true
      apiGateway:
        resourcePolicy:
          - Effect: Allow
            Action: '*'
            Resource: '*'
            Principal: '*'
      vpc:
        securityGroupIds:
          - sg-*********
        subnetIds:
          - subnet-******
          - subnet-*******
    
    functions:
      postEvent:
        handler: bin/postEvent
        package:
          patterns:
            - bin/postEvent
        events:
          - http:
              path: event
              method: post
              cors:
                origin: ${env:ACCESS_CONTROL_ORIGINS}
                headers:
                  - Content-Type
                  - Content-Length
                  - Accept-Encoding
                  - Origin
                  - Referer
                  - Authorization
                  - X-CSRF-Token
                  - X-Amz-Date
                  - X-Api-Key
                  - X-Amz-Security-Token
                  - X-Amz-User-Agent
                allowCredentials: false
                methods:
                  - OPTIONS
                  - POST
      processEvent:
        handler: bin/processEvent
        package:
          patterns:
            - bin/processEvent
        events:
          - msk:
              arn: ${env:KAFKA_ARN}
              topic: ${env:KAFKA_TOPIC}
              batchSize: ${env:BATCH_SIZE}
              startingPosition: LATEST
    resources:
      Resources:
        GatewayResponseDefault4XX:
          Type: 'AWS::ApiGateway::GatewayResponse'
          Properties:
            ResponseParameters:
              gatewayresponse.header.Access-Control-Allow-Origin: "'*'"
              gatewayresponse.header.Access-Control-Allow-Headers: "'*'"
            ResponseType: DEFAULT_4XX
            RestApiId:
              Ref: 'ApiGatewayRestApi'
        myDefaultRole:
          Type: AWS::IAM::Role
          Properties:
            Path: /
            RoleName: my-app-dev-eu-serverless-lambdaRole-${sls:stage} # required if you want to use 'serverless deploy --function' later on
            AssumeRolePolicyDocument:
              Version: '2012-10-17'
              Statement:
                - Effect: Allow
                  Principal:
                    Service:
                      - lambda.amazonaws.com
                  Action: sts:AssumeRole
            # note that these rights are needed if you want your function to be able to communicate with resources within your vpc
            ManagedPolicyArns:
              - arn:aws:iam::aws:policy/service-role/AWSLambdaVPCAccessExecutionRole
              - arn:aws:iam::aws:policy/service-role/AWSLambdaMSKExecutionRole
    

    I must warn you that we spent a lot of time figuring out how to properly set up the VPC and other networking/permission stuff. My colleague will write a blog post once he arrives back from vacation. :) I hope this helps you somehow. Best of luck ;)

    UPDATE

    If you are using JavaScript, then you would connect to Kafka with kafkajs similar to this:

    const { Kafka } = require('kafkajs')
    
    const kafka = new Kafka({
      clientId: 'order-app',
      brokers: [
        'broker1:port',
        'broker2:port',
      ],
      ssl: true, // MSK's TLS listener; set to false only for the plaintext listener
    })
    
    // Producing then works through a producer instance
    const run = async () => {
      const producer = kafka.producer()
      await producer.connect()
      await producer.send({
        topic: 'my-topic',
        messages: [{ key: 'key1', value: 'hello' }],
      })
      await producer.disconnect()
    }