Logstash: multiple tracking_columns for the same pipeline


I have a Logstash pipeline that fetches data from an MS SQL view, which joins tables A and B, and puts the denormalised data into ES.

Initially, INSERTs and UPDATEs could happen only in table A. Therefore, to make Logstash pick up only the records inserted or updated since the last iteration of the polling loop, I defined the tracking_column field, which refers to the updatedDate timestamp column in table A:

jdbc {
  #Program Search
  jdbc_connection_string => "jdbc:sqlserver://__DB_LISTNER__"
  jdbc_user => "admin"
  jdbc_password => "admin"
  jdbc_driver_class => "com.microsoft.sqlserver.jdbc.SQLServerDriver"
  jdbc_driver_library => "/usr/share/logstash/drivers/mssql-jdbc-6.2.2.jre8.jar"
  sql_log_level => "info"
  tracking_column => "updated_date_timestamp"
  use_column_value => true
  tracking_column_type => "timestamp"
  schedule => "*/1 * * * *"
  statement => "select *, updateDate as updated_date_timestamp from dbo.MyView where (updateDate > :sql_last_value and updateDate < getdate()) order by updateDate ASC"
  last_run_metadata_path => "/usr/share/logstash/myfolder/.logstash_jdbc_last_run"
}

Now UPDATEs can also happen in table B. With this new requirement, I am not sure how to configure Logstash to track changes in table B as well within the same pipeline. Can I define multiple tracking_columns for the same pipeline?

Two other options I have in mind, but am not sure about, are:

  1. Generate a composite value from the updateDate fields of tables A and B, and reference it in tracking_column. But I am not sure what the SQL query should look like in that case.
  2. Create another pipeline that tracks changes for table B only. The drawback I see with this approach is that the existing and new pipelines will do duplicate work on their initial iterations, since each has to process all the records from the DB view.

Please advise how I should proceed from here.


Solution

  • I found this ES discussion that suggests using a function in the SQL query that selects the greatest of the provided dates. SQL Server has a GREATEST function, but it is not recognised by the SQL Server version I am currently using. Long story short, as a workaround I found the IIF() function, which I use to compare the dates. So my SQL query looks like this:

    select *,
           iif(A.updatedDate>B.updatedDate, A.updatedDate, B.updatedDate) as updated_date_timestamp
    from dbo.MyView
    where (iif(A.updatedDate>B.updatedDate, A.updatedDate, B.updatedDate) > :sql_last_value
           and iif(A.updatedDate>B.updatedDate, A.updatedDate, B.updatedDate) < getdate())
    order by iif(A.updatedDate>B.updatedDate, A.updatedDate, B.updatedDate) ASC, id ASC
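
For reference, here is a rough sketch of how a query like the one above might be plugged into the jdbc input from the question, so that the pipeline still uses a single tracking_column. It is only an illustration under a few assumptions: the view is assumed to expose both timestamps under the hypothetical column names a_updated_date and b_updated_date, plus an id column, and the connection settings are simply copied from the question. Instead of repeating IIF() four times, the sketch computes the composite value once with CROSS APPLY and MAX over a VALUES list. MAX skips NULLs, whereas IIF(A > B, A, B) returns NULL whenever the B value is NULL, which would silently exclude such rows from the WHERE filter.

```
input {
  jdbc {
    # Connection settings copied from the question.
    jdbc_connection_string => "jdbc:sqlserver://__DB_LISTNER__"
    jdbc_user => "admin"
    jdbc_password => "admin"
    jdbc_driver_class => "com.microsoft.sqlserver.jdbc.SQLServerDriver"
    jdbc_driver_library => "/usr/share/logstash/drivers/mssql-jdbc-6.2.2.jre8.jar"
    schedule => "*/1 * * * *"

    # Still one tracking column: the computed alias from the statement below.
    tracking_column => "updated_date_timestamp"
    use_column_value => true
    tracking_column_type => "timestamp"
    last_run_metadata_path => "/usr/share/logstash/myfolder/.logstash_jdbc_last_run"

    # a_updated_date and b_updated_date are hypothetical column names;
    # replace them with whatever the view actually exposes for tables A and B.
    statement => "
      SELECT v.*, m.updated_date_timestamp
      FROM dbo.MyView AS v
      CROSS APPLY (
        SELECT MAX(d) AS updated_date_timestamp
        FROM (VALUES (v.a_updated_date), (v.b_updated_date)) AS dates(d)
      ) AS m
      WHERE m.updated_date_timestamp > :sql_last_value
        AND m.updated_date_timestamp < GETDATE()
      ORDER BY m.updated_date_timestamp ASC, v.id ASC
    "
  }
}
```

Because the composite value is the later of the two timestamps, an update to either table moves the row past :sql_last_value, so it is picked up on the next scheduled run without needing a second pipeline.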