Search code examples
amazon-web-services, go, amazon-cloudwatch, amazon-kinesis

Unable to decode CloudWatch Logs from Kinesis stream


I am trying to "funnel" my CloudWatch logs through Kinesis and then to Lambda for processing; however, I cannot find a way to decode/parse the incoming logs. So far I have tried this:

Method 1 using cloudwatch "class"

// function is the handler for incoming Kinesis events. For every record it
// prints the event name and the raw payload as a string, then wraps that
// payload in an events.CloudwatchLogsRawData and prints both the result and
// the error returned by its Parse method. Errors are only printed; the
// handler itself always returns nil.
func function(request events.KinesisEvent) error {
    for i := range request.Records {
        rec := request.Records[i]
        fmt.Println(rec.EventName)

        // The same string conversion feeds both the debug print and Parse.
        payload := string(rec.Kinesis.Data)
        fmt.Println(payload)

        raw := events.CloudwatchLogsRawData{Data: payload}
        parsed, parseErr := raw.Parse()
        fmt.Println(parsed)
        fmt.Println(parseErr)
    }
    return nil
}

// main hands function to lambda.Start so that it is used as the handler
// (lambda.Start comes from the aws-lambda-go package, not shown here).
func main() {
    lambda.Start(function)
}

Method 2 manual decoding

// logData is the package-level value that function passes to Parse and then
// prints. NOTE(review): Parse takes its argument by value, so this variable
// is not modified by that call.
var logData events.CloudwatchLogsData

// Base64Decode decodes message using the standard (padded) Base64 alphabet.
//
// On success it returns exactly the decoded bytes and a nil error. If
// message is not valid Base64, the decoding error is returned together with
// the full-length scratch buffer, whose contents should not be relied upon.
func Base64Decode(message []byte) (b []byte, err error) {
    // DecodedLen is an upper bound; the slice is trimmed to the real length below.
    b = make([]byte, base64.StdEncoding.DecodedLen(len(message)))

    n, err := base64.StdEncoding.Decode(b, message)
    if err != nil {
        return b, err
    }
    return b[:n], nil
}

// Parse Base64-decodes rawData, gunzips the result and JSON-decodes the
// decompressed stream into d. It returns the first error hit along the way:
// from Base64Decode, from creating the gzip reader, or from the JSON decoder.
//
// NOTE(review): d is received by value, so the JSON is decoded into a local
// copy and the caller's variable is left untouched.
func Parse(rawData []byte, d events.CloudwatchLogsData) (err error) {
    decoded, err := Base64Decode(rawData)
    if err != nil {
        return err
    }

    zr, err := gzip.NewReader(bytes.NewBuffer(decoded))
    if err != nil {
        return err
    }
    defer zr.Close()

    // Debug output of the gzip reader itself.
    fmt.Println(zr)

    return json.NewDecoder(zr).Decode(&d)
}

// function is the handler for incoming Kinesis events. For every record it
// prints the event name and the raw payload as a string, runs the payload
// through Parse, and prints the resulting error followed by logData.
// Errors are only printed; the handler itself always returns nil.
func function(request events.KinesisEvent) error {
    for _, record := range request.Records {
        fmt.Println(record.EventName)
        fmt.Println(string(record.Kinesis.Data))

        // err must be declared here; a plain assignment to an undeclared
        // err does not compile.
        // NOTE(review): Parse receives logData by value, so the logData
        // printed below is not populated by this call.
        err := Parse(record.Kinesis.Data, logData)
        fmt.Println(err)
        fmt.Println(logData)
    }
    return nil
}

// main hands function to lambda.Start so that it is used as the handler
// (lambda.Start comes from the aws-lambda-go package, not shown here).
func main() {
    lambda.Start(function)
}

With both of them I get the same error:

illegal base64 data at input byte 0

As I understand it, the logs are received Base64-encoded and compressed, but I cannot find anything online about decoding them specifically in Go.

EDIT:

Added logData type

// CloudwatchLogsData is an unmarshal'd, ungzip'd, cloudwatch logs event.
// Each field is filled from the JSON key named in its struct tag.
type CloudwatchLogsData struct {
    // Owner is read from the "owner" key (a numeric string in the sample payload).
    Owner               string                   `json:"owner"`
    // LogGroup is read from the "logGroup" key.
    LogGroup            string                   `json:"logGroup"`
    // LogStream is read from the "logStream" key.
    LogStream           string                   `json:"logStream"`
    // SubscriptionFilters is read from the "subscriptionFilters" array of strings.
    SubscriptionFilters []string                 `json:"subscriptionFilters"`
    // MessageType is read from the "messageType" key (e.g. "DATA_MESSAGE").
    MessageType         string                   `json:"messageType"`
    // LogEvents is read from the "logEvents" array; in the sample payload each
    // entry carries an id, a timestamp and a message.
    LogEvents           []CloudwatchLogsLogEvent `json:"logEvents"`
}

The Base64 decoded and decompressed data is formatted as JSON with the following structure: (According to AWS: https://docs.aws.amazon.com/AmazonCloudWatch/latest/logs/SubscriptionFilters.html)

{
    "owner": "111111111111",
    "logGroup": "logGroup_name",
    "logStream": "111111111111_logGroup_name_us-east-1",
    "subscriptionFilters": [
        "Destination"
    ],
    "messageType": "DATA_MESSAGE",
    "logEvents": [
        {
            "id": "31953106606966983378809025079804211143289615424298221568",
            "timestamp": 1432826855000,
            "message": "{\"eventVersion\":\"1.03\",\"userIdentity\":{\"type\":\"Root\"}"
        },
        {
            "id": "31953106606966983378809025079804211143289615424298221569",
            "timestamp": 1432826855000,
            "message": "{\"eventVersion\":\"1.03\",\"userIdentity\":{\"type\":\"Root\"}"
        },
        {
            "id": "31953106606966983378809025079804211143289615424298221570",
            "timestamp": 1432826855000,
            "message": "{\"eventVersion\":\"1.03\",\"userIdentity\":{\"type\":\"Root\"}"
        }
    ]
}

Solution

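  • OK, it turns out that I did not have to decode from Base64 at all; I simply had to decompress the data. record.Kinesis.Data is a []byte, and Go's JSON unmarshalling of the incoming event already Base64-decodes such fields, so only the gzip layer remains: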

    
    
    // Unzip treats data as a gzip stream, decompresses it fully and prints the
    // decompressed bytes to standard output as a string.
    //
    // It returns an error if data does not start with a valid gzip header or if
    // reading the compressed stream fails (for example, truncated input or a
    // checksum mismatch); otherwise it returns nil.
    func Unzip(data []byte) error {
        rdata := bytes.NewReader(data)
        r, err := gzip.NewReader(rdata)
        if err != nil {
            return err
        }
        // Release the decompressor once the payload has been read.
        defer r.Close()

        uncompressedData, err := ioutil.ReadAll(r)
        if err != nil {
            return err
        }
        fmt.Println(string(uncompressedData))
        return nil
    }
    
    

    The uncompressedData is the JSON string of the cloudwatch log