I have installed logstash and elasticSearch on my windows machine with below config (which polls records from customer table in 1 mins interval as configured)
#1 Logstash config file loads customer table data and index it logstash-config.conf
input {
jdbc {
jdbc_connection_string => "jdbc:postgresql://localhost:5432/postgres"
jdbc_driver_library => "C:\\org\\postgresql\\postgresql\\42.2.11\\postgresql-42.2.11.jar"
jdbc_user => "test"
jdbc_password => "test"
jdbc_driver_class => "org.postgresql.Driver"
schedule => "* * * * *"
statement => "SELECT * FROM public.customer where id >:sql_last_value"
tracking_column_type => "numeric"
use_column_value =>true
tracking_column => id
}
}
output {
elasticsearch {
index => "customer_index"
document_type => "customer"
document_id => "%{id}"
hosts =>"localhost:9200"
}
stdout {
codec =>rubydebug
}
}
#2 Create table in db with some records
create table customer (id integer,name varchar);
select * from customer;
insert into customer values (1,'test1');
insert into customer values (2,'test2');
#3 Start logstash e:\Software\logstash-7.8.0\bin>logstash -f logstash-config.conf http://localhost:9600/
#4 start elasticSearch e:\Software\elasticsearch-7.8.0\bin>elasticsearch.bat http://localhost:9200/_cat/indices
#4 Hit get API 4.1: Does not return any record http://localhost:9200/customer_index/_search?q=1
e.g.
"took": 86,
"timed_out": false,
"_shards": {
"total": 1,
"successful": 1,
"skipped": 0,
"failed": 0
},
"hits": {
"total": {
"value": 0,
"relation": "eq"
},
"max_score": null,
"hits": []
}
}
4.2 Returns record
http://localhost:9200/customer_index/_search?q=2
e.g.
{
"took": 371,
"timed_out": false,
"_shards": {
"total": 1,
"successful": 1,
"skipped": 0,
"failed": 0
},
"hits": {
"total": {
"value": 1,
"relation": "eq"
},
"max_score": 1.0,
"hits": [
{
"_index": "customer_index",
"_type": "customer",
"_id": "%{customer_id}",
"_score": 1.0,
"_source": {
"name": "test2",
"id": 17,
"@timestamp": "2020-07-06T06:41:00.343Z",
"@version": "1"
}
}
]
}
}
**Also what is customer_id here and how can I get the whole record as I indexed whole row i.e. select * from customer (Which means I should get all columns)**
4.3
Looks like index contains only the record which was added to index last
e.g. if I will execute in db
insert into customer values (2,'test2');
http://localhost:9200/customer_index/_search?q=2
will not return record
4.4 however Returns record
http://localhost:9200/customer_index/_search?q=3
jdbc {
jdbc_connection_string => "jdbc:postgresql://localhost:5432/postgres"
jdbc_driver_library => "C:\\org\\postgresql\\postgresql\\42.2.11\\postgresql-42.2.11.jar"
jdbc_user => "test"
jdbc_password => "test"
jdbc_driver_class => "org.postgresql.Driver"
schedule => "* * * * *"
statement => "SELECT * FROM public.customer where id >:sql_last_value"
tracking_column_type => "numeric"
use_column_value =>true
tracking_column => id
**last_run_metadata_path => "C:\\logstash\\.sch_id_tracker_file"**
}
}
Here last_run_metadata_path captures the value of a given field when it ran last time so that As per schedule when it runs next time records with value
i.e. id >:sql_last_value would be considered to process and push to elasticsearch