amazon-web-services grafana-loki promtail

When storing AWS load balancer logs in S3, how do I pass the S3 prefix to Loki when using lambda-promtail?


I am ingesting Application Load Balancer logs into Loki using lambda-promtail. This works well; however, the default labels only include the account ID and the load balancer ID. I would like to use the S3 prefix as a label so that I can use it as a variable in Grafana (I could filter on the domain instead, but that doesn't really work for my use case).

I have searched through the docs and GitHub but have been unable to find a way to do this.


Solution

  • lambda-promtail does not send the S3 key or prefix as a label by default, so you need to modify the Lambda code yourself. Here is an example implementation; note that I haven't tested it.

    First, extract the prefix from the S3 event in getLabels:

    func getLabels(record events.S3EventRecord) (map[string]string, error) {
    
        labels := make(map[string]string)
    
        labels["key"] = record.S3.Object.Key
    
        // Capture everything before "AWSLogs" as the prefix (including any trailing slash)
        prefixRegex := regexp.MustCompile(`^(.*?)AWSLogs`)
        // FindStringSubmatch returns nil when nothing matches, so check the length
        // before indexing. Index 0 is the entire match, index 1 the captured group.
        if prefixMatch := prefixRegex.FindStringSubmatch(labels["key"]); len(prefixMatch) > 1 {
            labels["prefix"] = prefixMatch[1]
        }
    
        labels["bucket"] = record.S3.Bucket.Name
        labels["bucket_owner"] = record.S3.Bucket.OwnerIdentity.PrincipalID
        labels["bucket_region"] = record.AWSRegion
        for key, p := range parsers {
            if p.filenameRegex.MatchString(labels["key"]) {
                if labels["type"] == "" {
                    labels["type"] = key
                }
                match := p.filenameRegex.FindStringSubmatch(labels["key"])
                for i, name := range p.filenameRegex.SubexpNames() {
                    if i != 0 && name != "" && match[i] != "" {
                        labels[name] = match[i]
                    }
                }
            }
        }
        if labels["type"] == "" {
            return labels, fmt.Errorf("type of S3 event could not be determined for object %q", record.S3.Object.Key)
        }
        return labels, nil
    }
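
    The prefix extraction on its own can be sketched as a small standalone helper, which makes it easier to test outside the Lambda. This is a minimal sketch, not part of lambda-promtail: the name extractPrefix is hypothetical, it assumes the key contains an "AWSLogs/" segment as in the regex above, and unlike that regex it trims the trailing slash. It also decodes the key first, since object keys in S3 event notifications are URL-encoded.

```go
package main

import (
	"fmt"
	"net/url"
	"strings"
)

// extractPrefix is a hypothetical helper (not part of lambda-promtail).
// It returns the part of an S3 object key that precedes the "AWSLogs/"
// segment, without the trailing slash. It returns "" when the key has no
// prefix or does not contain "AWSLogs/".
func extractPrefix(key string) string {
	// Keys in S3 event notifications are URL-encoded; decode when possible
	// and fall back to the raw key otherwise.
	if decoded, err := url.QueryUnescape(key); err == nil {
		key = decoded
	}
	idx := strings.Index(key, "AWSLogs/")
	if idx <= 0 {
		// Either no "AWSLogs/" segment, or the key starts with it (no prefix).
		return ""
	}
	return strings.TrimSuffix(key[:idx], "/")
}

func main() {
	// Example keys are made up for illustration.
	fmt.Println(extractPrefix("team-a/prod/AWSLogs/123456789012/elasticloadbalancing/x.log.gz"))
	fmt.Println(extractPrefix("AWSLogs/123456789012/elasticloadbalancing/x.log.gz") == "")
}
```

    Returning an empty string instead of indexing into a nil match avoids a panic for objects that were written without a prefix.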
    

    Then add the prefix as a label in parseS3Log:

    func parseS3Log(ctx context.Context, b *batch, labels map[string]string, obj io.ReadCloser, log *log.Logger) error {
        parser, ok := parsers[labels["type"]]
        if !ok {
            if labels["type"] == CLOUDTRAIL_DIGEST_LOG_TYPE {
                return nil
            }
            return fmt.Errorf("could not find parser for type %s", labels["type"])
        }
        gzreader, err := gzip.NewReader(obj)
        if err != nil {
            return err
        }
    
        scanner := bufio.NewScanner(gzreader)
    
        ls := model.LabelSet{
            model.LabelName("__aws_log_type"):                                   model.LabelValue(parser.logTypeLabel),
            model.LabelName("__aws_s3_prefix"):                                  model.LabelValue(labels["prefix"]),
            model.LabelName(fmt.Sprintf("__aws_%s", parser.logTypeLabel)):       model.LabelValue(labels["src"]),
            model.LabelName(fmt.Sprintf("__aws_%s_owner", parser.logTypeLabel)): model.LabelValue(labels[parser.ownerLabelKey]),
        }
    
        ls = applyLabels(ls)
    
        // ...
    }
    

    By default, lambda-promtail sends each log line from the S3 object together with its attached labels, for example:

    • __aws_log_type = s3_lb: The service that created the log.
    • __aws_s3_lb = my-alb: The resource that created the log.
    • __aws_s3_lb_owner = 123456789: The AWS account ID of the S3 bucket.

    You can also attach static, hardcoded labels through the EXTRA_LABELS environment variable.
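
    To illustrate how such a static label list can be interpreted, here is a minimal sketch. It assumes EXTRA_LABELS holds a comma-separated list of alternating names and values ("name1,value1,name2,value2"), which is how I understand the lambda-promtail documentation; check this against the version you deploy. The function parseExtraLabels and the example values are hypothetical and are not lambda-promtail's own code.

```go
package main

import (
	"fmt"
	"strings"
)

// parseExtraLabels is a hypothetical helper. It turns a string of the
// assumed form "name1,value1,name2,value2" into a map of label names
// to label values.
func parseExtraLabels(raw string) (map[string]string, error) {
	labels := make(map[string]string)
	if strings.TrimSpace(raw) == "" {
		return labels, nil // nothing configured
	}
	parts := strings.Split(raw, ",")
	if len(parts)%2 != 0 {
		return nil, fmt.Errorf("expected name,value pairs, got %d fields", len(parts))
	}
	for i := 0; i < len(parts); i += 2 {
		labels[strings.TrimSpace(parts[i])] = strings.TrimSpace(parts[i+1])
	}
	return labels, nil
}

func main() {
	// Example value is made up for illustration.
	labels, err := parseExtraLabels("environment,production,team,platform")
	if err != nil {
		fmt.Println("error:", err)
		return
	}
	fmt.Println(labels["environment"], labels["team"])
}
```

    Because these values are fixed per Lambda deployment, they cannot carry a per-object value such as the S3 prefix, which is why the code change above is needed.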