
Problems with updating :sql_last_value


I want to synchronize data between PostgreSQL and Elasticsearch, and I am using Logstash to do it. This is my Logstash configuration file:


input {
    jdbc {
        jdbc_connection_string => "jdbc:postgresql://postgres:5432/onlinestore"
        jdbc_user => "postgres"
        jdbc_password => "1234"
        jdbc_driver_class => "org.postgresql.Driver"
        jdbc_driver_library => "/usr/share/logstash/libs/postgresql-42.7.3.jar"
        statement => "
            SELECT * 
            FROM products 
            WHERE updated_date > :sql_last_value
            ORDER BY updated_date ASC
            LIMIT 1000
        "
        use_column_value => true
        tracking_column => "updated_date" 
        last_run_metadata_path => "/usr/share/logstash/.logstash_jdbc_last_run"
        schedule => "* * * * *" 
        # clean_run => true
    }
}

# Add an extra timestamp field to record when the row was processed
filter {
  ruby {
    code => "
      event.set('sql_last_value_log', event.get('sql_last_value'))
      event.set('tracking_column_log', event.get('updated_date'))
    "
  }
  mutate {
    add_field => { "timestamp" => "%{@timestamp}" }
  }
}


output {
    elasticsearch{
        hosts => ["http://elasticsearch:9200"]
        index => "products"
        document_id => "%{id}"
    }
}

When the query runs, I get this error log:

/usr/share/logstash/vendor/bundle/jruby/2.5.0/gems/rufus-scheduler-3.0.9/lib/rufus/scheduler/cronline.rb:77: warning: constant ::Fixnum is deprecated
[2024-07-07T10:06:04,030][ERROR][logstash.inputs.jdbc     ][main][c96da82d0fe625ff4baf1477649ffdc670902119c45fc017f20bc336f857bcfe] Java::OrgPostgresqlUtil::PSQLException: ERROR: operator does not exist: timestamp without time zone > integer
  Hint: No operator matches the given name and argument types. You might need to add explicit type casts.
  Position: 82: 
            SELECT * 
            FROM products 
            WHERE updated_date > 0
            ORDER BY updated_date ASC
            LIMIT 1000
        
[2024-07-07T10:06:04,117][WARN ][logstash.inputs.jdbc     ][main][c96da82d0fe625ff4baf1477649ffdc670902119c45fc017f20bc336f857bcfe] Exception when executing JDBC query {:exception=>Sequel::DatabaseError, :message=>"Java::OrgPostgresqlUtil::PSQLException: ERROR: operator does not exist: timestamp without time zone > integer\n  Hint: No operator matches the given name and argument types. You might need to add explicit type casts.\n  Position: 82", :cause=>"org.postgresql.util.PSQLException: ERROR: operator does not exist: timestamp without time zone > integer\n  Hint: No operator matches the given name and argument types. You might need to add explicit type casts.\n  Position: 82"}

How can I solve this problem?

I have tried using COALESCE and NULLIF in the Postgres query. With that change the query executes, but the value of :sql_last_value is always zero and is never updated:

        statement => "
            SELECT * 
            FROM products 
            WHERE updated_date > COALESCE(NULLIF(:sql_last_value::text, '0')::timestamp, '1970-01-01 00:00:00'::timestamp)
            ORDER BY updated_date ASC
            LIMIT 1000
        "

Solution

  • Set tracking_column_type to "timestamp". Its default is "numeric", so :sql_last_value starts out as 0, and PostgreSQL cannot compare the timestamp column updated_date with an integer.

    See the tracking_column_type option in the Logstash JDBC input plugin documentation.

    jdbc {
        jdbc_connection_string => "jdbc:postgresql://postgres:5432/onlinestore"
        jdbc_user => "postgres"
        jdbc_password => "1234"
        jdbc_driver_class => "org.postgresql.Driver"
        jdbc_driver_library => "/usr/share/logstash/libs/postgresql-42.7.3.jar"
        statement => "
            SELECT *
            FROM products
            WHERE updated_date > :sql_last_value
            ORDER BY updated_date ASC
            LIMIT 1000
        "
        use_column_value => true
        tracking_column => "updated_date"
        last_run_metadata_path => "/usr/share/logstash/.logstash_jdbc_last_run"
        schedule => "* * * * *"
        tracking_column_type => "timestamp"
        # clean_run => true
    }
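
    If an earlier run already wrote a numeric 0 to the file at last_run_metadata_path, it may help to reset the stored state once after switching the type. The fragment below is only a sketch under that assumption. It reuses the paths from the question and the documented clean_run option. Whether a reset is needed at all depends on the plugin version and on what was actually persisted.

```
input {
    jdbc {
        # Connection settings, statement and schedule stay the same as in the question.
        use_column_value => true
        tracking_column => "updated_date"
        tracking_column_type => "timestamp"
        last_run_metadata_path => "/usr/share/logstash/.logstash_jdbc_last_run"
        # Assumption: enable this for one pipeline start only, to discard any
        # previously stored value, then comment it out again. While it stays
        # enabled, each pipeline start discards the saved state and begins
        # again from the default start value.
        clean_run => true
    }
}
```

    With the timestamp type and no stored state, :sql_last_value should start at 1970-01-01 and then follow the updated_date of the last row read in each run.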