I am trying to "funnel" my CloudWatch logs through Kinesis and then on to Lambda for processing; however, I cannot find a way to decode/parse the incoming logs. So far I have tried this:
// function walks over every record of the incoming Kinesis event. For each
// record it prints the event name and the payload as a string, wraps that
// string in an events.CloudwatchLogsRawData, calls its Parse method, and
// prints both the parsed value and the error.
//
// It always returns nil; a parse failure is only printed.
func function(request events.KinesisEvent) error {
	for i := range request.Records {
		rec := request.Records[i]
		payload := string(rec.Kinesis.Data)

		fmt.Println(rec.EventName)
		fmt.Println(payload)

		raw := events.CloudwatchLogsRawData{Data: payload}
		decoded, parseErr := raw.Parse()
		fmt.Println(decoded)
		fmt.Println(parseErr)
	}
	return nil
}
// main hands function to lambda.Start, which presumably runs it as the
// Lambda handler (the lambda package itself is not shown here).
func main() {
lambda.Start(function)
}
// logData is the package-level value that function passes to Parse and then
// prints.
var logData events.CloudwatchLogsData
// Base64Decode decodes message using the standard base64 encoding.
//
// On success it returns the decoded bytes, trimmed to the number of bytes
// actually written, and a nil error. If decoding fails it returns the error
// from base64.StdEncoding.Decode together with the untrimmed destination
// buffer.
func Base64Decode(message []byte) (b []byte, err error) {
	enc := base64.StdEncoding
	b = make([]byte, enc.DecodedLen(len(message)))

	n, err := enc.Decode(b, message)
	if err == nil {
		// Only the first n bytes of the buffer hold decoded data.
		b = b[:n]
	}
	return b, err
}
// Parse base64-decodes rawData with Base64Decode, gunzips the result, prints
// the gzip reader, and JSON-decodes the decompressed stream into d.
//
// It returns the first error hit while base64-decoding, opening the gzip
// stream, or decoding the JSON.
//
// NOTE: d is received by value, so the decoded fields are written to this
// function's own copy; the variable the caller passed in is not modified.
func Parse(rawData []byte, d events.CloudwatchLogsData) (err error) {
	var decoded []byte
	if decoded, err = Base64Decode(rawData); err != nil {
		return err
	}

	var zr *gzip.Reader
	if zr, err = gzip.NewReader(bytes.NewBuffer(decoded)); err != nil {
		return err
	}
	defer zr.Close()

	fmt.Println(zr)
	return json.NewDecoder(zr).Decode(&d)
}
// function walks over every record of the incoming Kinesis event. For each
// record it prints the event name and the payload as a string, runs the
// payload through Parse together with the package-level logData, and then
// prints the returned error followed by logData.
//
// It always returns nil; a Parse failure is only printed.
func function(request events.KinesisEvent) error {
	for _, record := range request.Records {
		fmt.Println(record.EventName)
		fmt.Println(string(record.Kinesis.Data))

		// err must be declared here: this function has no err variable of its
		// own, so a plain assignment (err = ...) does not compile.
		err := Parse(record.Kinesis.Data, logData)
		fmt.Println(err)
		fmt.Println(logData)
	}
	return nil
}
// main hands function to lambda.Start, which presumably runs it as the
// Lambda handler (the lambda package itself is not shown here).
func main() {
lambda.Start(function)
}
With both of them I get the same error:
illegal base64 data at input byte 0
So, as I understand it, the log data received is Base64-encoded and compressed, but I cannot find anything online specifically for Go.
Added logData type
// CloudwatchLogsData is an unmarshal'd, ungzip'd, cloudwatch logs event
//
// Each field carries a json tag naming the key it is decoded from.
type CloudwatchLogsData struct {
// Owner is decoded from the "owner" key.
Owner string `json:"owner"`
// LogGroup is decoded from the "logGroup" key.
LogGroup string `json:"logGroup"`
// LogStream is decoded from the "logStream" key.
LogStream string `json:"logStream"`
// SubscriptionFilters is decoded from the "subscriptionFilters" array of strings.
SubscriptionFilters []string `json:"subscriptionFilters"`
// MessageType is decoded from the "messageType" key.
MessageType string `json:"messageType"`
// LogEvents holds one CloudwatchLogsLogEvent per entry of the "logEvents" array.
LogEvents []CloudwatchLogsLogEvent `json:"logEvents"`
}
{
"owner": "111111111111",
"logGroup": "logGroup_name",
"logStream": "111111111111_logGroup_name_us-east-1",
"subscriptionFilters": [
"Destination"
],
"messageType": "DATA_MESSAGE",
"logEvents": [
{
"id": "31953106606966983378809025079804211143289615424298221568",
"timestamp": 1432826855000,
"message": "{\"eventVersion\":\"1.03\",\"userIdentity\":{\"type\":\"Root\"}"
},
{
"id": "31953106606966983378809025079804211143289615424298221569",
"timestamp": 1432826855000,
"message": "{\"eventVersion\":\"1.03\",\"userIdentity\":{\"type\":\"Root\"}"
},
{
"id": "31953106606966983378809025079804211143289615424298221570",
"timestamp": 1432826855000,
"message": "{\"eventVersion\":\"1.03\",\"userIdentity\":{\"type\":\"Root\"}"
}
]
}
OK, it turns out that I did not have to decode from Base64 at all; I simply had to uncompress the data:
// Unzip gunzips data and prints the decompressed bytes, as a string, to
// standard output.
//
// It returns the error from opening the gzip stream or from reading it to the
// end, and nil on success. The decompressed data is only printed, not
// returned.
func Unzip(data []byte) error {
	r, err := gzip.NewReader(bytes.NewReader(data))
	if err != nil {
		return err
	}
	// Release the gzip reader on every return path.
	defer r.Close()

	uncompressedData, err := ioutil.ReadAll(r)
	if err != nil {
		return err
	}
	fmt.Println(string(uncompressedData))
	return nil
}
The uncompressedData value is the JSON string of the CloudWatch log.