Search code examples
elasticsearch, logstash, logstash-jdbc

Elasticsearch fields from one index appearing in another index


I have created a few indices in Elasticsearch, with a separate Logstash config file per Elasticsearch index. I am using the JDBC driver to fetch data from two different tables of a database. When I restarted Logstash after changing the mapping of one of the indices, fields from one index started appearing in the second index.

The configs for both indices are given below.

# file: contacts-index-logstash.conf
# Pipeline fragment: reads company rows from MySQL through the JDBC input and
# indexes each row into the "clodura" Elasticsearch index.
input {
    jdbc {
        jdbc_connection_string =>
        "jdbc:mysql://xxxx.com:3306/xxxx_engine?useSSL=false&autoReconnect=true&useUnicode=yes"
        jdbc_user => "email"
        jdbc_password => "xxxxxxxy"
        jdbc_validate_connection => true
        # Fetch the result set in pages of 500 rows.
        jdbc_paging_enabled => true
        jdbc_page_size => 500
        jdbc_driver_library => "/home/clodura/mysql-connector-java-5.1.46-bin.jar"
        jdbc_driver_class => "com.mysql.jdbc.Driver"
        # Cron-style schedule: run the statement once every minute.
        schedule => "* * * * *"
        # Selects companies added after 2018-03-01, with optional Calais and
        # scraped-website data (left outer joins) and required website and
        # geocoded-address rows (implicit inner joins in the WHERE clause).
        statement => "select c.id, c.name, c.description, c.industry, c.comp_size_range, c.specialities, ccd.industry_tags, ccd.social_tags, d.company_type, cw.website, d.menu, d.header, d.cleaned_page_text, cga.city, cga.state, cga.country from companies c left outer join company_calais_data ccd on c.id = ccd.company_id left outer join website_scraped_data d on c.id = d.company_id, company_websites cw, company_geocode_address cga where c.id = cw.company_id and c.id = cga.company_id and c.date_added > '2018-03-01'"
    }
}
output {
    # NOTE(review): this output has no conditional. If Logstash loads this file
    # into the same pipeline as other config files, events from their inputs
    # will also reach this output -- confirm how the files are loaded.
    elasticsearch {
#        protocol => http
        index => "clodura"
        document_type => "companies"
        # The row's `id` column is used as the document id, so a row fetched
        # again on a later run targets the same document id.
        document_id => "%{id}"
        hosts => ["localhost:9200"]
    }
}

Here is the second config

# file: contacts-position-logstash.conf
# Pipeline fragment: reads company/person position rows from MySQL through the
# JDBC input and indexes each row into the "contactposition" Elasticsearch index.
input {
    jdbc {
        jdbc_connection_string =>
        "jdbc:mysql://xxxxxx.com:3306/xxxxxx_engine?useSSL=false&autoReconnect=true&useUnicode=yes"
        jdbc_user => "email"
        jdbc_password => "xxxxxxxy"
        jdbc_validate_connection => true
        # Fetch the result set in pages of 500 rows.
        jdbc_paging_enabled => true
        jdbc_page_size => 500
        jdbc_driver_library => "/home/clodura/mysql-connector-java-5.1.46-bin.jar"
        jdbc_driver_class => "com.mysql.jdbc.Driver"
        # Cron-style schedule: run the statement once every minute.
        schedule => "* * * * *"
        # No WHERE clause: every scheduled run selects the whole company_person table.
        statement => "select company_id, person_id, position from company_person"
    }
}
output {
    # NOTE(review): this output has no conditional. If Logstash loads this file
    # into the same pipeline as other config files, events from their inputs
    # will also reach this output -- confirm how the files are loaded.
    elasticsearch {
        index => "contactposition"
        document_type => "positions"
        # NOTE(review): the two ids are concatenated without a separator, so
        # distinct pairs can produce the same document id (e.g. company 1 /
        # person 23 and company 12 / person 3 both yield "123").
        document_id => "%{company_id}%{person_id}"
        hosts => ["localhost:9200"]
    }
}

After about an hour, the mapping for the contactposition index changes to this:

{
  "contactposition" : {
    "mappings" : {
      "positions" : {
        "properties" : {
          "@timestamp" : {
            "type" : "date",
            "format" : "strict_date_optional_time||epoch_millis"
          },
          "@version" : {
            "type" : "string"
          },
          "city" : {
            "type" : "string"
          },
          "cleaned_page_text" : {
            "type" : "string"
          },
          "comp_size_range" : {
            "type" : "string"
          },
          "company_id" : {
            "type" : "string",
            "index" : "not_analyzed"
          },
          "company_type" : {
            "type" : "string"
          },
          "country" : {
            "type" : "string"
          },
          "description" : {
            "type" : "string"
          },
          "header" : {
            "type" : "string"
          },
          "id" : {
            "type" : "string"
          },
          "industry" : {
            "type" : "string"
          },
          "industry_tags" : {
            "type" : "string"
          },
          "menu" : {
            "type" : "string"
          },
          "name" : {
            "type" : "string"
          },
          "person_id" : {
            "type" : "string",
            "index" : "not_analyzed"
          },
          "position" : {
            "type" : "string"
          },
          "social_tags" : {
            "type" : "string"
          },
          "specialities" : {
            "type" : "string"
          },
          "state" : {
            "type" : "string"
          },
          "website" : {
            "type" : "string"
          }
        }
      }
    }
  }
}

How are fields from the clodura index appearing in the contactposition index? Please help.


Solution

  • You need to put conditionals in your outputs. Logstash does not treat the config files independently: it merges them into a single pipeline, so events from every input go to every output.

    # Tag every event at the input so the output can tell which source produced it.
    input {
        jdbc {
            ...
            # `tags` is an option of the input plugin, so it must sit inside the
            # jdbc block rather than directly under `input`.
            tags => ["contactposition"]
        }
    }
    
    output {
      # Only events carrying the tag are written to the contactposition index.
      if "contactposition" in [tags] {
        elasticsearch {
            index => "contactposition"
            document_type => "positions"
            document_id => "%{company_id}%{person_id}"
            hosts => ["localhost:9200"]
        }
      }
    }
    # Do the same in the other config file with its own tag (for example
    # "clodura"); otherwise its unguarded output still receives these events.