elasticsearch, logstash, logstash-jdbc

Logstash not loading the exact number of records into Elasticsearch, and results keep changing on every hit


Problem statement: Logstash does not load all records from the database into Elasticsearch correctly, and every time I hit the same API I get different results. The results are sometimes correct, but they change on every hit and show only a subset of the records under the nested salutations field. The loading looks sporadic and inconsistent, especially in the one-to-many scenario. The endpoint I query is http://localhost:9200/staffsalutation/_search. I am observing this behaviour with Logstash 7.8.0 (logstash-7.8.0) while loading records from 2 tables with the query and configuration below.

Query :
select s.update_time, s.staff_id as staff_id, birth_date, first_name, last_name, gender, hire_date, st.title AS title_nm, st.from_date AS title_frm_dt, st.to_date AS title_to_dt from staff s LEFT JOIN salutation st ON s.staff_id = st.staff_id order by s.update_time

    input {
        
        jdbc {

            jdbc_connection_string => "jdbc:postgresql://localhost:5432/postgres"
            jdbc_driver_library => "C:\\Users\\NS\\.m2\\repository\\org\\postgresql\\postgresql\\42.2.11\\postgresql-42.2.11.jar"
            jdbc_user => "postgres"
            jdbc_password => "postgres"
            jdbc_driver_class => "org.postgresql.Driver"
            schedule => "* * * * *"     
            statement => "select  e.update_time, e.emp_no as staff_id, birth_date, first_name, last_name, gender, hire_date, t.title AS title_nm, t.from_date AS title_frm_dt, t.to_date AS title_to_dt 
            from employees e 
            LEFT JOIN titles t 
            ON e.emp_no  = t.emp_no  
            order by e.update_time"

            add_field => { "doctype" => "employee" }
            tracking_column_type => "timestamp"
            use_column_value => true
            tracking_column => "update_time"
            
            jdbc_fetch_size => "50000"
        }

    }
    filter {
        aggregate {
            task_id => "%{staff_id}"
            code => "
                map['staff_id'] = event.get('staff_id')
                map['birth_date'] = event.get('birth_date')
                map['first_name'] = event.get('first_name')
                map['last_name'] = event.get('last_name')
                map['gender'] = event.get('gender')
                map['hire_date'] = event.get('hire_date')
                map['salutations'] ||= []
                map['salutations'] << {
                    'title_nm' => event.get('title_nm'),
                    'title_frm_dt' => event.get('title_frm_dt'),
                    'title_to_dt' => event.get('title_to_dt')
                }
                event.cancel()
            "
            push_previous_map_as_event => true
            timeout => 30
        }
    }
    output {
        elasticsearch {
            document_id => "%{staff_id}"
            index => "staffsalutation"
        }
        file {
            path => "test.log"
            codec => line
        }
    }

Solution

  • Found the solution!

    1. Use an order by clause on emp_no in the query, so that all rows for the same employee arrive consecutively and Logstash can aggregate the dependent entities such as titles (the "many" side of the one-to-many relation) into a single event.

    select e.update_time, e.emp_no as staff_id, birth_date, first_name, last_name, gender, hire_date, t.title AS title_nm, t.from_date AS title_frm_dt, t.to_date AS title_to_dt from employees e LEFT JOIN titles t ON e.emp_no = t.emp_no order by e.emp_no

    2. Because the aggregate filter is used, the records must be processed by a single worker thread; otherwise events are aggregated out of order, which is what causes the random results on repeated calls to the search URL above. Running only 1 worker thread looks like a performance hit, but it can be mitigated by running multiple Logstash config files over disjoint sets of records, e.g. the first 100 emp_no values in one file and the next 100 in another, so that Logstash can process them in parallel. Run it like this:

    logstash -f logstash_config.conf -w 1
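
To see why the sort order in step 1 matters, here is a minimal Python sketch of what push_previous_map_as_event roughly does. It is only a simplified model, not Logstash code: the function names aggregate_push_previous and index_by_id and the sample rows are made up for illustration. It assumes the current map is pushed as an event whenever the task id (staff_id) changes, and that the elasticsearch output overwrites any earlier document with the same document_id.

```python
def aggregate_push_previous(rows):
    """Simplified model of the aggregate filter with push_previous_map_as_event:
    the current map is emitted as an event as soon as a row with a different
    staff_id arrives."""
    events = []
    current_id, current_map = None, None
    for row in rows:
        if row["staff_id"] != current_id:
            if current_map is not None:
                events.append(current_map)  # previous map pushed as an event
            current_id = row["staff_id"]
            current_map = {"staff_id": current_id, "salutations": []}
        current_map["salutations"].append(row["title_nm"])
    if current_map is not None:
        events.append(current_map)  # in Logstash the last map is flushed on timeout
    return events


def index_by_id(events):
    """Simplified model of the elasticsearch output with document_id => staff_id:
    a later event with the same id replaces the earlier document."""
    index = {}
    for event in events:
        index[event["staff_id"]] = event
    return index


# Hypothetical rows in the order an "order by update_time" query might return them:
# the two titles of staff 1 are separated by a row for staff 2.
rows_by_update_time = [
    {"staff_id": 1, "title_nm": "Engineer"},
    {"staff_id": 2, "title_nm": "Staff"},
    {"staff_id": 1, "title_nm": "Senior Engineer"},
]
# The same rows as an "order by emp_no" query would return them.
rows_by_staff_id = sorted(rows_by_update_time, key=lambda r: r["staff_id"])

unsorted_index = index_by_id(aggregate_push_previous(rows_by_update_time))
sorted_index = index_by_id(aggregate_push_previous(rows_by_staff_id))

print(unsorted_index[1]["salutations"])  # ['Senior Engineer']
print(sorted_index[1]["salutations"])    # ['Engineer', 'Senior Engineer']
```

With the unsorted input, staff 1 is emitted twice and the second, partial document replaces the first one, which matches the "subset of records under salutations" symptom. With more than one worker thread the interleaving also varies from run to run, so the subset changes between loads.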