Logstash with HDFS: writing to different folders for a particular duration


Hi, I am new to Logstash. I have managed to read data from TCP and write it to HDFS, so that part is done, but now I want to write the data to 4 different HDFS folders.

Here is the sample code:

input {
  tcp {
    host => "X.X.X.X"
    port => 5051
    codec => json_lines
  }
}

filter {
  # Drop fields that are not needed in the output
  mutate {
    remove_field => [ "@version", "path", "host", "logger_name", "tags", "stack_info", "level", "port", "type" ]
  }
  mutate {
    add_field => { "count" => "1" }
  }
}

output {
  webhdfs {
    host => "127.0.0.1"
    port => 50070
    path => "/folder/%{+YYYY-MM-dd_HH-mm}.csv"
    user => "hduser"
    codec => line { format => "%{message}" }
  }
}

The issue is that I currently write to a single folder, but I want to write to 3 more folders (folder1, folder2, folder3), each for a certain duration of time.


Solution

  • It is possible, but you will need some mutate filters and some conditionals.

    First, get the minute from the event's @timestamp and store it in a new field. You can put it under [@metadata], which can be used for filtering and conditionals but is not included in the output event.

    mutate {
        add_field => { "[@metadata][minute]" => "%{+mm}" }
    }
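
    Because [@metadata] fields are left out of the output event, it can be hard to tell whether the minute was actually extracted. A minimal sketch for checking this, assuming you can run the pipeline in a test environment: a temporary stdout output using the rubydebug codec with its metadata option enabled prints the [@metadata] fields alongside the rest of the event.

    ```
    output {
        # Temporary debugging output; remove it once the field looks right
        stdout { codec => rubydebug { metadata => true } }
    }
    ```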
    

    Then you need to specify which minute is saved to which folder.

    For example, if you want something like this (times are minutes:seconds within the hour):

    00:00 to 00:59 - folder1
    01:00 to 01:59 - folder2
    02:00 to 02:59 - folder3
    03:00 to 03:59 - folder4

    To start again with folder1 in the next minute (04:00 to 04:59), you will need something like this, which covers only the first 8 minutes:

    if [@metadata][minute] in ["00", "04"] {
        mutate {
            add_field => {"[@metadata][folder]" => "folder1" }
        }
    }
    if [@metadata][minute] in ["01", "05"] {
        mutate {
            add_field => {"[@metadata][folder]" => "folder2" }
        }
    }
    if [@metadata][minute] in ["02", "06"] {
        mutate {
           add_field => {"[@metadata][folder]" => "folder3" }
        }
    }
    if [@metadata][minute] in ["03", "07"] {
        mutate {
            add_field => {"[@metadata][folder]" => "folder4" }
        }
    }
    

    Then, in your output, reference [@metadata][folder] in the path. Field references are only interpolated inside %{...}:

    path => "/%{[@metadata][folder]}/%{+YYYY-MM-dd_HH-mm}.csv"
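
    For context, here is a sketch of how the complete output section might look. It reuses the host, port, user, and codec values from the question; the only change is the folder segment of the path, which is filled in per event from [@metadata][folder].

    ```
    output {
        webhdfs {
            host => "127.0.0.1"
            port => 50070
            # The folder comes from the field set in the filter section
            path => "/%{[@metadata][folder]}/%{+YYYY-MM-dd_HH-mm}.csv"
            user => "hduser"
            codec => line { format => "%{message}" }
        }
    }
    ```

    Note that if an event reaches the output without [@metadata][folder] set, the reference is not replaced and the literal text %{[@metadata][folder]} ends up in the path, so make sure every minute is covered by one of the conditionals.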
    

    You just need to extend the conditionals to cover the remaining minutes of the hour.
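
    If listing all 60 minutes by hand feels too verbose, one possible alternative (not part of the approach above, and assuming the ruby filter plugin is available in your installation) is to compute the folder from the minute with a modulo. This sketch would replace both the mutate that extracts the minute and the conditionals; the folder1 to folder4 names are the ones used in the example above.

    ```
    filter {
        ruby {
            # Minute of the event's @timestamp (UTC), 0 to 59
            # minute % 4 cycles 0, 1, 2, 3, which maps to folder1 .. folder4
            code => '
                minute = event.get("@timestamp").time.min
                event.set("[@metadata][folder]", "folder#{(minute % 4) + 1}")
            '
        }
    }
    ```

    Since 60 is divisible by 4, the cycle also continues cleanly across the hour boundary: minute 59 goes to folder4 and minute 00 of the next hour goes to folder1.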