Tags: azure, kubernetes, azure-eventhub, keda

KEDA is unable to get eventhub metadata: no storage connection string given


I am trying to use KEDA to start a job when an event is received from Azure Event Hub. When I deploy my ScaledJob, kubectl describe shows this message:

Warning  KEDAScalerFailed      4m29s (x17 over 9m57s)  scale-handler  unable to get eventhub metadata: no storage connection string given

Here is the ScaledJob I'm using; it is based on the sample from KEDA's website:

apiVersion: keda.sh/v1alpha1
kind: ScaledJob
metadata:
  name: pi-az-consumer
  namespace: default
spec:
  jobTargetRef:
    template:
      spec:
        containers:
        - name: pi
          image: perl:5.34.0
          command: ["perl",  "-Mbignum=bpi", "-wle", "print bpi(2000)"]
        restartPolicy: Never
    backoffLimit: 4  
  pollingInterval: 10             # Optional. Default: 30 seconds
  maxReplicaCount: 30             # Optional. Default: 100
  successfulJobsHistoryLimit: 3   # Optional. Default: 100. How many completed jobs should be kept.
  failedJobsHistoryLimit: 2       # Optional. Default: 100. How many failed jobs should be kept.
  scalingStrategy:
    strategy: "custom"                        # Optional. Default: default. Which Scaling Strategy to use.
    customScalingQueueLengthDeduction: 1      # Optional. A parameter to optimize custom ScalingStrategy.
    customScalingRunningJobPercentage: "0.5"  # Optional. A parameter to optimize custom ScalingStrategy.
  triggers:
  - type: azure-eventhub
    metadata:
      connectionFromEnv: "Endpoint=sb://{...}.servicebus.windows.net/;SharedAccessKeyName={...};SharedAccessKey={...};EntityPath={...}"
      storageConnectionFromEnv: "DefaultEndpointsProtocol=https;AccountName={...};AccountKey={...};EndpointSuffix=core.windows.net"
      consumerGroup: $Default
      unprocessedEventThreshold: '64'
      activationUnprocessedEventThreshold: '10'
      blobContainer: '{...}'

From what I've found on the internet, these are the connection settings I should be using. Is the problem with my connections, or is it elsewhere? I was expecting a connection or authentication error, so I do not understand why I am told that I did not provide the connection string.

Thank you if you have any idea where the problem can come from, I will be keeping this question updated if I find the answer.


Solution

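  • The error no storage connection string given comes from how the connection strings are referenced in your ScaledJob. connectionFromEnv and storageConnectionFromEnv expect the name of an environment variable defined on the job's container, not the connection string itself. Since no environment variable with that name exists, the lookup yields nothing and KEDA reports that no connection string was given.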
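
To make the lookup concrete, here is a minimal Python sketch of the idea. It is a simplified model for illustration, not KEDA's actual implementation; the helper names `resolve_from_env` and `missing_connections` and the placeholder connection strings are hypothetical.

```python
def resolve_from_env(metadata, container_env, field):
    """Look up the env var named by metadata[field]; return "" if it is not set."""
    env_var_name = metadata.get(field, "")
    return container_env.get(env_var_name, "")


def missing_connections(metadata, container_env):
    """Return the *FromEnv fields that do not resolve to a non-empty value."""
    fields = ("connectionFromEnv", "storageConnectionFromEnv")
    return [f for f in fields if not resolve_from_env(metadata, container_env, f)]


# Environment of the job's container (placeholder values, assumed for the example).
container_env = {
    "EVENTHUB_CONNECTION_STRING": "Endpoint=sb://example.servicebus.windows.net/;<rest>",
    "STORAGE_CONNECTION_STRING": "DefaultEndpointsProtocol=https;AccountName=example;<rest>",
}

# Trigger from the question: the connection string itself is used as the variable name.
broken = {
    "connectionFromEnv": "Endpoint=sb://example.servicebus.windows.net/;<rest>",
    "storageConnectionFromEnv": "DefaultEndpointsProtocol=https;AccountName=example;<rest>",
}

# Corrected trigger: the fields hold the names of environment variables.
fixed = {
    "connectionFromEnv": "EVENTHUB_CONNECTION_STRING",
    "storageConnectionFromEnv": "STORAGE_CONNECTION_STRING",
}

print(missing_connections(broken, container_env))
print(missing_connections(fixed, container_env))
```

With the question's metadata both lookups come back empty, which matches the "no ... connection string given" symptom; with variable names, both resolve.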

    You can follow the steps below to set this up end to end. You will need:

    An Event Hub namespace

    az eventhubs namespace create --name $EVENTHUB_NAMESPACE --resource-group $RESOURCE_GROUP --location eastus
    

    An Event Hub

    az eventhubs eventhub create --name $EVENTHUB_NAME --namespace-name $EVENTHUB_NAMESPACE --resource-group $RESOURCE_GROUP
    

    A Storage Account

    az storage account create --name $STORAGE_ACCOUNT_NAME --resource-group $RESOURCE_GROUP --location eastus --sku Standard_LRS
    

    A Blob Container

    az storage container create --name $CONTAINER_NAME --account-name $STORAGE_ACCOUNT_NAME
    

    Once created, retrieve the connection strings for the Event Hub and the Storage Account.

    EVENTHUB_CONNECTION_STRING=$(az eventhubs namespace authorization-rule keys list --resource-group $RESOURCE_GROUP --namespace-name $EVENTHUB_NAMESPACE --name RootManageSharedAccessKey --query primaryConnectionString --output tsv)
    
    
    STORAGE_CONNECTION_STRING=$(az storage account show-connection-string --name $STORAGE_ACCOUNT_NAME --query connectionString --output tsv)
    
    
    echo $EVENTHUB_CONNECTION_STRING
    echo $STORAGE_CONNECTION_STRING
    

    Store the connection strings in a Kubernetes secret.

    kubectl create secret generic eventhub-secrets \
      --from-literal=eventhub-connection-string="$EVENTHUB_CONNECTION_STRING" \
      --from-literal=storage-connection-string="$STORAGE_CONNECTION_STRING" \
      --namespace default

    Install KEDA in the cluster (for example, with the official Helm chart).

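    Create the ScaledJob. The container exposes the two secret values as environment variables, and the trigger's connectionFromEnv and storageConnectionFromEnv refer to those variables by name: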

    apiVersion: keda.sh/v1alpha1
    kind: ScaledJob
    metadata:
      name: pi-az-consumer
      namespace: default
    spec:
      jobTargetRef:
        template:
          spec:
            containers:
            - name: pi
              image: perl:5.34.0
              command: ["perl",  "-Mbignum=bpi", "-wle", "print bpi(2000)"]
              env:
              - name: EVENTHUB_CONNECTION_STRING
                valueFrom:
                  secretKeyRef:
                    name: eventhub-secrets
                    key: eventhub-connection-string
              - name: STORAGE_CONNECTION_STRING
                valueFrom:
                  secretKeyRef:
                    name: eventhub-secrets
                    key: storage-connection-string
            restartPolicy: Never
        backoffLimit: 4  
      pollingInterval: 10             # Optional. Default: 30 seconds
      maxReplicaCount: 30             # Optional. Default: 100
      successfulJobsHistoryLimit: 3   # Optional. Default: 100. How many completed jobs should be kept.
      failedJobsHistoryLimit: 2       # Optional. Default: 100. How many failed jobs should be kept.
      scalingStrategy:
        strategy: "custom"                        # Optional. Default: default. Which Scaling Strategy to use.
        customScalingQueueLengthDeduction: 1      # Optional. A parameter to optimize custom ScalingStrategy.
        customScalingRunningJobPercentage: "0.5"  # Optional. A parameter to optimize custom ScalingStrategy.
      triggers:
      - type: azure-eventhub
        metadata:
          eventHubName: "arkoeventhub"            # Add this line
          connectionFromEnv: "EVENTHUB_CONNECTION_STRING"
          storageConnectionFromEnv: "STORAGE_CONNECTION_STRING"
          consumerGroup: "$Default"
          unprocessedEventThreshold: '64'
          activationUnprocessedEventThreshold: '10'
          blobContainer: 'arkocontainer'
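
A note on the eventHubName line: the connection string retrieved earlier comes from the namespace-level RootManageSharedAccessKey rule, so it carries no EntityPath segment, and the hub therefore has to be named separately in the trigger. The Python sketch below shows one way to check a connection string for this. The helper names `parse_connection_string` and `needs_event_hub_name` are hypothetical, and the strings are placeholders rather than real credentials.

```python
def parse_connection_string(conn_str):
    """Split 'Key=Value;Key=Value' into a dict, splitting each pair on the first '='."""
    parts = {}
    for pair in conn_str.strip().strip(";").split(";"):
        if not pair:
            continue
        # Shared access keys are base64 and may end in '=', so only split once.
        key, _, value = pair.partition("=")
        parts[key.strip()] = value
    return parts


def needs_event_hub_name(conn_str):
    """True when the connection string has no EntityPath (namespace-level key)."""
    return "EntityPath" not in parse_connection_string(conn_str)


# Placeholder connection strings (assumed values for illustration only).
namespace_level = (
    "Endpoint=sb://example.servicebus.windows.net/;"
    "SharedAccessKeyName=RootManageSharedAccessKey;SharedAccessKey=abc123="
)
hub_level = namespace_level + ";EntityPath=examplehub"

print(needs_event_hub_name(namespace_level))  # namespace-level string, no EntityPath
print(needs_event_hub_name(hub_level))        # EntityPath present
```

If the check returns True for your string, keep eventHubName in the trigger metadata as in the manifest above.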
    
    
    

    Apply the manifest with kubectl apply -f scaledjob.yaml. You can then send test events to the Event Hub.

    As an example, I use the following Python script to send the test events:

    from azure.eventhub import EventHubProducerClient, EventData
    
    # Replace with your Event Hub connection string and Event Hub name
    connection_str = 'Endpoint=sb://arkoeventhubns.servicebus.windows.net/;SharedAccessKeyName=RootManageSharedAccessKey;SharedAccessKey=RQd12345iDO7JOEpfL3khbNGlpZ0QhnP+abcd=;EntityPath=arkoeventhub'
    eventhub_name = 'arkoeventhub'
    
    producer = EventHubProducerClient.from_connection_string(conn_str=connection_str, eventhub_name=eventhub_name)
    
    # The context manager closes the producer's connection when the block exits
    with producer:
        # Collect the events in a batch so they are sent in a single call
        event_data_batch = producer.create_batch()
    
        for i in range(10):
            event_data_batch.add(EventData(f"Test message {i+1}"))
    
        producer.send_batch(event_data_batch)
    
    print("Messages sent successfully.")
    

    Reference: KEDA eventhub