Tags: postgresql, elasticsearch, logstash, devops, etl

Custom offset condition in Logstash for Postgres


I use the Logstash jdbc input plugin to load part of the data from Postgres into Elasticsearch. Is it possible to configure Logstash to paginate with a WHERE condition instead of OFFSET?

My config:

input {
  file {
    path => "/var/log/logstash/logstash-plain.log"
    type => "logstash-logs"
    start_position => "beginning"
  }

  jdbc {
    jdbc_driver_library => "/usr/share/logstash/external_jars/postgresql-42.5.4.jar"
    jdbc_driver_class => "org.postgresql.Driver"
    jdbc_connection_string => "jdbc:postgresql://172.17.0.1:5432/tyver_stage"
    jdbc_user => "tyver_stage"
    jdbc_password => "password"
    schedule => "*/5 * * * *"
    statement => "
      SELECT
        c.*,
        COALESCE(c.updated_at, c.created_at) AS order_column,
        CASE
          WHEN ARRAY_AGG(ucv.user_id) = ARRAY[null]::integer[] THEN ARRAY[]::integer[]
          ELSE ARRAY_AGG(ucv.user_id)
        END AS viewed_by
      FROM
        creatives as c
          LEFT JOIN
        user_creative_views ucv ON ucv.creative_id = c.id
      WHERE
        (c.updated_at >= :sql_last_value OR (c.updated_at IS NULL AND c.created_at >= :sql_last_value))
      GROUP BY
        c.id
      ORDER BY
        COALESCE(c.updated_at, c.created_at) ASC
    "
    use_column_value => true
    tracking_column => "order_column"
    tracking_column_type => "timestamp"
    jdbc_paging_enabled => true
    jdbc_page_size => 10000
    record_last_run => true
    clean_run => false
  }
}

filter {

}

output {
  elasticsearch {
    hosts => ["172.17.0.1:9200"]
    index => "tyver_index_creatives"
    document_id => "%{id}"
  }
}

As a result, Logstash generates SQL queries like this:

 SELECT * FROM (
      SELECT
        c.*,
        COALESCE(c.updated_at, c.created_at) AS order_column,
        CASE
          WHEN ARRAY_AGG(ucv.user_id) = ARRAY[null]::integer[] THEN ARRAY[]::integer[]
          ELSE ARRAY_AGG(ucv.user_id)
        END AS viewed_by
      FROM
        creatives as c
          LEFT JOIN
        user_creative_views ucv ON ucv.creative_id = c.id
      WHERE
        (c.updated_at >= '1970-01-01 00:00:00.000000+0000' OR (c.updated_at IS NULL AND c.created_at >= '1970-01-01 00:00:00.000000+0000'))
      GROUP BY
        c.id
      ORDER BY
        COALESCE(c.updated_at, c.created_at) ASC
    ) AS "t1" LIMIT 10000 OFFSET 20000

This is very expensive in resources, because a query with LIMIT 10000 OFFSET 20000 still has to read 30000 rows. The table has 35M+ rows and 100GB+ of data, so loading it this way is too heavy. Is it possible to configure Logstash to paginate with a WHERE condition instead of OFFSET, like WHERE COALESCE(c.updated_at, c.created_at) > :the_last_order_value?
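For illustration, the keyset-style query I would like Logstash to produce might look roughly like this (a sketch with the same tables and columns as above; `:last_order_value` is a placeholder for the highest `order_column` value from the previous batch):

```sql
-- OFFSET pagination: the server reads and discards the first 20000 rows
-- before returning the batch.
-- SELECT ... LIMIT 10000 OFFSET 20000;

-- Keyset pagination: the server can seek straight to the next batch,
-- provided there is an index on the ordering expression.
SELECT
    c.*,
    COALESCE(c.updated_at, c.created_at) AS order_column
FROM creatives AS c
WHERE COALESCE(c.updated_at, c.created_at) > :last_order_value
ORDER BY COALESCE(c.updated_at, c.created_at) ASC
LIMIT 10000;

-- An expression index is needed for the seek to be fast, e.g.:
-- CREATE INDEX ON creatives ((COALESCE(updated_at, created_at)));
```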


Solution

  • Try this SQL query:

    SELECT
        c.*,
        COALESCE(c.updated_at, c.created_at) AS order_column,
        CASE
            WHEN ARRAY_AGG(ucv.user_id) = ARRAY[null]::integer[] THEN ARRAY[]::integer[]
            ELSE ARRAY_AGG(ucv.user_id)
        END AS viewed_by
    FROM
        creatives AS c
    LEFT JOIN
        user_creative_views ucv ON ucv.creative_id = c.id
    WHERE
        COALESCE(c.updated_at, c.created_at) > :sql_last_value
    GROUP BY
        c.id
    ORDER BY
        COALESCE(c.updated_at, c.created_at) ASC
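With a statement like this, the jdbc input's paging wrapper can be turned off so the query runs as-is instead of being wrapped in LIMIT/OFFSET; the tracking column then advances `:sql_last_value` between scheduled runs. A minimal sketch of the relevant input settings (driver and connection options as in the question; the trailing LIMIT is an assumption to cap the batch size per run):

```
jdbc {
  # ... jdbc_driver_library, jdbc_connection_string, credentials as above ...
  schedule => "*/5 * * * *"
  statement => "
    -- the query from the answer, plus ORDER BY on the tracking expression
    -- and an optional LIMIT to bound each scheduled run
    ... ORDER BY COALESCE(c.updated_at, c.created_at) ASC LIMIT 10000
  "
  use_column_value => true
  tracking_column => "order_column"
  tracking_column_type => "timestamp"
  # no jdbc_paging_enabled, so Logstash does not add LIMIT/OFFSET itself
  record_last_run => true
  clean_run => false
}
```

One caveat: with a strict `>` comparison, rows whose timestamp exactly equals the last tracked value can be skipped between runs; with `>=` they are re-read instead, which is harmless here because `document_id => "%{id}"` makes the Elasticsearch output idempotent.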