Tags: elasticsearch, logstash

Indexing multiple CSV files into one index with nested fields/objects


I want to load data from multiple CSV files (Users, Scores, Messages) into one index via Logstash. All of the CSV files share the same "userId" field, which ties their data together.

My goal is to end up with a user index that contains the data from the Users CSV file as simple fields and the data from the Scores and Messages files as nested fields.

Is there a way to somehow achieve this?

One user can have multiple messages and scores.
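
For illustration, a resulting user document should look roughly like this (the values are made up):

{
    "userId": "u1",
    "username": "jane",
    "email": "jane@example.com",
    "scores": [
        { "field": "math", "score": "87" },
        { "field": "history", "score": "42" }
    ],
    "messages": [
        { "message": "hello", "tag": "greeting" }
    ]
}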

I am not sure I've got the merging approach right; here's the Logstash config I tried.

input {
    file {
        path => "C:/resources/files/users.csv"
        start_position => "beginning"
        sincedb_path => "NUL"
    }
    file {
        path => "C:/resources/files/scores.csv"
        start_position => "beginning"
        sincedb_path => "NUL"
    }
    file {
        path => "C:/resources/files/messages.csv"
        start_position => "beginning"
        sincedb_path => "NUL"
    }
}

filter {

    if [log][file][path] == "C:/resources/files/users.csv" {
        csv {
            separator => ","
            columns => ["userId", "username", "email"]
        }
        mutate {
            # drop bookkeeping fields that aren't needed in the indexed document
            remove_field => ["[event][original]", "[log][file][path]", "[message]"]
        }
    }
    
    if [log][file][path] == "C:/resources/files/scores.csv" {
        csv {
            separator => ","
            columns => ["userId", "field", "score"]
        }

        translate {
            destination => "[@metadata][scores]"
            dictionary_path => "C:/resources/files/scores.csv"
            field => "userId"
        }

        dissect {
            mapping => {
                "[@metadata][scores]" => "%{field};%{score}"
            }
        }
    }
    
    if [log][file][path] == "C:/resources/files/messages.csv" {
        csv {
            separator => ","
            columns => ["userId", "message", "tag"]
        }
        
        translate {
            destination => "[@metadata][messages]"
            dictionary_path => "C:/resources/files/messages.csv"
            field => "userId"
        }

        dissect {
            mapping => {
                "[@metadata][messages]" => "%{message};%{tag}"
            }
        }
    }
        
}

output {
    elasticsearch {
        action => "create"
        hosts => "localhost:9200"
        index => "users-index"
    }
}
    


Solution

  • It's not going to work the way you expect. You first need to load the scores and messages data into separate indexes.
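
    For example, you could route each file to its own index with conditionals in the Logstash output (scores-index and messages-index are placeholder names):

        output {
            if [log][file][path] == "C:/resources/files/users.csv" {
                elasticsearch {
                    hosts => "localhost:9200"
                    index => "users-index"
                }
            } else if [log][file][path] == "C:/resources/files/scores.csv" {
                elasticsearch {
                    hosts => "localhost:9200"
                    index => "scores-index"
                }
            } else if [log][file][path] == "C:/resources/files/messages.csv" {
                elasticsearch {
                    hosts => "localhost:9200"
                    index => "messages-index"
                }
            }
        }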

    Then, from those two indexes, you can build an enrich policy for each of them.
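
    Assuming the index names above, the two match policies could look like this (the policy names are placeholders); running _execute builds the internal enrich index each policy needs:

        PUT /_enrich/policy/scores-policy
        {
          "match": {
            "indices": "scores-index",
            "match_field": "userId",
            "enrich_fields": ["field", "score"]
          }
        }

        POST /_enrich/policy/scores-policy/_execute

        PUT /_enrich/policy/messages-policy
        {
          "match": {
            "indices": "messages-index",
            "match_field": "userId",
            "enrich_fields": ["message", "tag"]
          }
        }

        POST /_enrich/policy/messages-policy/_execute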

    Next, you can create an ingest pipeline that leverages the two enrich policies built during the previous step.
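
    A sketch of such a pipeline (the pipeline name and target fields are placeholders; max_matches lets each enrich processor collect up to 128 matching scores/messages per user instead of just one):

        PUT _ingest/pipeline/users-enrich-pipeline
        {
          "processors": [
            {
              "enrich": {
                "policy_name": "scores-policy",
                "field": "userId",
                "target_field": "scores",
                "max_matches": 128
              }
            },
            {
              "enrich": {
                "policy_name": "messages-policy",
                "field": "userId",
                "target_field": "messages",
                "max_matches": 128
              }
            }
          ]
        }

    Note that the enrich processor writes plain arrays of objects; for scores and messages to be true nested fields, declare them with type nested in the users-index mapping.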

    Finally, you can modify your Logstash configuration to use the ingest pipeline created during the previous step in order to enrich each user record with the appropriate scores and messages for that user.
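
    Concretely, the users branch of the output would point at that pipeline via the elasticsearch output's pipeline option:

        output {
            if [log][file][path] == "C:/resources/files/users.csv" {
                elasticsearch {
                    hosts => "localhost:9200"
                    index => "users-index"
                    pipeline => "users-enrich-pipeline"
                }
            }
        }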