elasticsearch | logstash | logstash-grok

elasticsearch - import csv using logstash, date is not parsed as datetime type


I am trying to import a CSV into Elasticsearch using Logstash. I have tried two ways:

  1. Using CSV
  2. Using grok filter

1) For CSV, below is my Logstash config file:

input {
  file {
    path => "path_to_my_csv.csv"
    start_position => "beginning"
    sincedb_path => "/dev/null"
  }
}
filter {
  csv {
        separator => ","
        columns => ["col1","col2_datetime"]
  }
  mutate {convert => [ "col1", "float" ]}
  date {
        locale => "en"
        match => ["col2_datetime", "ISO8601"] # also tried: match => ["col2_datetime", "yyyy-MM-dd HH:mm:ss"]
        timezone => "Asia/Kolkata"
        target => "@timestamp" # also tried: target => "col2_datetime"
   }
}
output {
   elasticsearch {
     hosts => "http://localhost:9200"
     index => "my_collection"

  }
  stdout {}
}
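
One thing worth knowing here: whenever the date filter fails to parse a value, Logstash adds a _dateparsefailure tag to the event, so any such events can be found with a query like the one below (assuming the default local setup and the index name above):

curl -s "http://localhost:9200/my_collection/_search?q=tags:_dateparsefailure&pretty"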

2) Using grok filter:

For the grok filter, below is my Logstash config file:

input {
  file {
    path => "path_to_my_csv.csv"
    start_position => "beginning"
    sincedb_path => "/dev/null"
  }
}
filter {
  grok {
    match => { "message" => "(?<col1>(?:%{BASE10NUM})),(%{TIMESTAMP_ISO8601:col2_datetime})"}
    remove_field => [ "message" ]
  }
  date {
        match => ["col2_datetime", "yyyy-MM-dd HH:mm:ss"]
   }
}
output {
   elasticsearch {
     hosts => "http://localhost:9200"
     index => "my_collection_grok"

  }
  stdout {}
}
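
A quick way to sanity-check what the grok pattern extracts is to pipe a sample line through Logstash with an inline config (a sketch, assuming Logstash is run from its installation directory):

echo '1234365,2016-12-02 19:00:52' | bin/logstash -e '
  input { stdin {} }
  filter {
    grok {
      match => { "message" => "(?<col1>(?:%{BASE10NUM})),(%{TIMESTAMP_ISO8601:col2_datetime})" }
    }
  }
  output { stdout { codec => rubydebug } }
'

The printed event should show col1 and col2_datetime as separate fields; if the pattern does not match, the event is tagged with _grokparsefailure instead.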

PROBLEM:

When I run each file individually, I am able to import the data into Elasticsearch. But my date field is not parsed as a datetime type; it is saved as a string instead, and because of that I am not able to run date filters on it.
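
For reference, the mapping can be inspected with a request like the one below (assuming the default local setup); a field that was indexed as a plain string shows up there as text with a keyword sub-field rather than as date:

curl -s "http://localhost:9200/my_collection/_mapping?pretty"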

Can someone help me figure out why this is happening? My Elasticsearch version is 5.4.1.

Thanks in advance


Solution

  • There are two changes I made to your config file:

    1) Remove the underscore in the column name, i.e. rename col2_datetime to col2.

    2) Add a target to the date filter.

    Here is what my config file looks like:

    vi logstash.conf
    
    input {
      file {
        path => "/config-dir/path_to_my_csv.csv"
        start_position => "beginning"
        sincedb_path => "/dev/null"
      }
    }
    filter {
      csv {
            separator => ","
            columns => ["col1","col2"]
      }
      mutate {convert => [ "col1", "float" ]}
      date {
            locale => "en"
            match => ["col2",  "yyyy-MM-dd HH:mm:ss"]
            target => "col2"
       }
    }
    output {
       elasticsearch {
         hosts => "http://172.17.0.1:9200"
         index => "my_collection"
    
      }
      stdout {}
    }
    

    Here is the data file:

    vi path_to_my_csv.csv
    
    1234365,2016-12-02 19:00:52 
    1234368,2016-12-02 15:02:02 
    1234369,2016-12-02 15:02:07
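
    With this config, col2 is stored as a parsed date rather than a string. A quick way to verify it (a sketch, assuming the same host and index name as above) is to check the mapping and run a date-range query; the format option tells Elasticsearch how to read the gte/lte values:

    curl -s "http://172.17.0.1:9200/my_collection/_mapping?pretty"

    curl -s "http://172.17.0.1:9200/my_collection/_search?pretty" -H 'Content-Type: application/json' -d '
    {
      "query": {
        "range": {
          "col2": {
            "gte": "2016-12-02 15:00:00",
            "lte": "2016-12-02 16:00:00",
            "format": "yyyy-MM-dd HH:mm:ss"
          }
        }
      }
    }'

    The mapping should now list col2 with "type": "date". Note that without a timezone option in the date filter, Logstash parses the values in the host's local timezone and stores them in UTC, so the query bounds may need to be adjusted accordingly.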