I have a Logstash pipeline that fetches data from an MS SQL view, which joins tables A and B, and puts the denormalised data into ES.
Initially, INSERTs or UPDATEs could happen only in table A. Therefore, to configure Logstash to pick up only the records inserted or updated since the last iteration of the polling loop, I defined the tracking_column field, which refers to the updatedDate timestamp column in table A:
jdbc {
    # Program Search
    jdbc_connection_string => "jdbc:sqlserver://__DB_LISTNER__"
    jdbc_user => "admin"
    jdbc_password => "admin"
    jdbc_driver_class => "com.microsoft.sqlserver.jdbc.SQLServerDriver"
    jdbc_driver_library => "/usr/share/logstash/drivers/mssql-jdbc-6.2.2.jre8.jar"
    sql_log_level => "info"
    tracking_column => "updated_date_timestamp"
    use_column_value => true
    tracking_column_type => "timestamp"
    schedule => "*/1 * * * *"
    statement => "select *, updateDate as updated_date_timestamp from dbo.MyView where (updateDate > :sql_last_value and updateDate < getdate()) order by updateDate ASC"
    last_run_metadata_path => "/usr/share/logstash/myfolder/.logstash_jdbc_last_run"
}
Now, UPDATEs can also happen in table B. With this new requirement, I am not sure how to configure Logstash to also track changes in table B within the same pipeline. Can I define multiple tracking_columns for the same pipeline?
I have two other options in mind, but I am not sure about them:
updateDate fields of tables A and B, which would be referenced by the tracking_column. But I am not sure what the SQL query should look like in that case.
Please advise me on how to proceed.
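One way to track a combined updateDate of tables A and B with a single tracking_column is to compute the later of the two dates inside the view itself. The sketch below is a hypothetical version of dbo.MyView: the table names dbo.A and dbo.B, the join key A.bId = B.id, the LEFT JOIN, and the column names aUpdateDate, bUpdateDate and lastModified are all assumptions, and CREATE OR ALTER requires SQL Server 2016 SP1 or later. It uses a plain CASE expression, so it does not depend on GREATEST being available.

```sql
-- Hypothetical sketch: expose one "last modified" column from the view so that
-- Logstash only has to track a single column.
-- Assumed names: dbo.A, dbo.B, A.id, A.bId, B.id.
-- Assumes A.updateDate is NOT NULL; B.updateDate may be NULL (LEFT JOIN).
CREATE OR ALTER VIEW dbo.MyView
AS
SELECT
    A.id,
    A.updateDate AS aUpdateDate,
    B.updateDate AS bUpdateDate,
    CASE
        -- fall back to A's date when there is no B date, otherwise take the later one
        WHEN B.updateDate IS NULL OR A.updateDate >= B.updateDate
            THEN A.updateDate
        ELSE B.updateDate
    END AS lastModified
FROM dbo.A AS A
LEFT JOIN dbo.B AS B
    ON B.id = A.bId;
```

With such a view, the Logstash statement only needs to filter and order on lastModified (aliased as updated_date_timestamp), so the existing tracking_column setting can stay as it is. An update to a row of B then moves lastModified forward for every joined row of A, and those rows are picked up on the next poll.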
I found an ES discussion that suggests using a function in the SQL query to select the greatest of the provided dates. SQL Server has a GREATEST function, but it is not recognised by the SQL Server version I am currently using (GREATEST is only available from SQL Server 2022 and in Azure SQL). Long story short, as a workaround I found the IIF() function, which I use to compare the dates. So my SQL query looks like this:
select *,
    iif(A.updatedDate > B.updatedDate, A.updatedDate, B.updatedDate) as updated_date_timestamp
from dbo.MyView
where (
    iif(A.updatedDate > B.updatedDate, A.updatedDate, B.updatedDate) > :sql_last_value
    and iif(A.updatedDate > B.updatedDate, A.updatedDate, B.updatedDate) < getdate()
)
order by iif(A.updatedDate > B.updatedDate, A.updatedDate, B.updatedDate) ASC, id ASC
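The IIF expression in this query is repeated four times, and it has a NULL pitfall: if B.updatedDate is NULL (for example, a row of A with no matching row in B under an outer join), the comparison evaluates to UNKNOWN, IIF returns B.updatedDate, i.e. NULL, and the row can never pass the > :sql_last_value filter. A minimal sketch of an alternative statement, assuming the view exposes the two dates as separate columns named aUpdateDate and bUpdateDate plus an id column (hypothetical names), computes the greatest date once with CROSS APPLY over a VALUES list and MAX. MAX ignores NULLs and works on SQL Server versions that lack GREATEST.

```sql
-- Sketch of the Logstash "statement" (Logstash substitutes :sql_last_value).
-- aUpdateDate, bUpdateDate and id are assumed (hypothetical) columns of dbo.MyView.
SELECT
    v.*,
    g.updated_date_timestamp
FROM dbo.MyView AS v
CROSS APPLY (
    -- MAX ignores NULLs, so a missing B date falls back to the A date
    SELECT MAX(d) FROM (VALUES (v.aUpdateDate), (v.bUpdateDate)) AS x(d)
) AS g(updated_date_timestamp)
WHERE g.updated_date_timestamp > :sql_last_value
  AND g.updated_date_timestamp < GETDATE()
ORDER BY g.updated_date_timestamp ASC, v.id ASC
```

On SQL Server 2022 or Azure SQL, GREATEST(v.aUpdateDate, v.bUpdateDate) could replace the CROSS APPLY, since it also ignores NULL arguments. The tracking_column setting stays "updated_date_timestamp" either way.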