Search code examples
elasticsearchlogstashlogstash-configurationlogstash-jdbc

Nested document to elasticsearch using logstash


Hi All i am trying to index the documents from MSSQL server to elasticsearch using logstash. I wanted my documents to ingest as nested documents but i am getting aggregate exception error

Here i place all my code

Create table department(
ID Int identity(1,1) not null,
Name varchar(100)
)

Insert into department(Name)
Select 'IT Application development'

union all

Select 'HR & Marketing'

Create table Employee(
ID Int identity(1,1) not null,
emp_Name varchar(100),
dept_Id int
)

Insert into Employee(emp_Name,dept_Id)
Select 'Mohan',1
union all
Select 'parthi',1
union all
Select 'vignesh',1

Insert into Employee(emp_Name,dept_Id)
Select 'Suresh',2
union all
Select 'Jithesh',2
union all
Select 'Venkat',2

Final select statement

SELECT 
De.id AS id,De.name AS deptname,Emp.id AS empid,Emp.emp_name AS empname
FROM  department De LEFT JOIN employee Emp ON De.id = Emp.dept_Id
ORDER BY De.id

Result should be like this

enter image description here

My elastic search mapping

PUT /departments
{
 "mappings": {
   "properties": {
     "id":{
       "type":"integer"
     },
     "deptname":{
       "type":"text"
     },
     "employee_details":{
       "type": "nested",
       "properties": {
         "empid":{
           "type":"integer"
         },
         "empname":{
           "type":"text"
         }
       }
     }
   }
 }
}

My logstash config file

input {
jdbc {
jdbc_driver_library => ""
jdbc_driver_class => "com.microsoft.sqlserver.jdbc.SQLServerDriver"
jdbc_connection_string => "jdbc:sqlserver://EC2AMAZ-J90JR4A\SQLEXPRESS:1433;databaseName=xxxx;"
jdbc_user => "xxxx"
jdbc_password => "xxxx"
statement => "SELECT 
De.id AS id,De.name AS deptname,Emp.id AS empid,Emp.emp_name AS empname
FROM  department De LEFT JOIN employee Emp ON De.id = Emp.dept_Id
ORDER BY De.id"
}
}
filter{
        aggregate {
        task_id => "%{id}"
        code => "
        map['id'] = event['id']
        map['deptname'] = event['deptname']
        map['employee_details'] ||= []
        map['employee_details'] << {'empId' => event['empid'], 'empname' => event['empname'] }
        "
        push_previous_map_as_event => true
        timeout => 5
        timeout_tags => ['aggregated']
        } 
    }
    output{
    stdout{ codec => rubydebug }
    elasticsearch{
            hosts => "https://d9bc7cbca5ec49ea96a6ea683f70caca.eastus2.azure.elastic-cloud.com:4567"
            user => "elastic"
            password => "****"
            index => "departments"
            action => "index"
            document_type => "departments"
            document_id => "%{id}"
           }

}

while running logstash i am getting below error

enter image description here

Elastic search scrrenshot for reference

enter image description here

my elasticsearch output should be something like this

{
  "took" : 398,
  "timed_out" : false,
  "_shards" : {
    "total" : 1,
    "successful" : 1,
    "skipped" : 0,
    "failed" : 0
  },
  "hits" : {
    "total" : {
      "value" : 1,
      "relation" : "eq"
    },
    "max_score" : 1.0,
    "hits" : [
      {
        "_index" : "departments",
        "_type" : "_doc",
        "_id" : "2",
        "_score" : 1.0,
        "_source" : {
          "id" : 1,
          "deptname" : "IT Application development"
          "employee_details" : [
          {
          "empid" : 1,
          "empname" : "Mohan"
           },
          {
          "empid" : 2,
          "empname" : "Parthi"
          },
          {
          "empid" : 3,
          "empname" : "Vignesh"
           }
          ]
         }
      }
    ]
  }
}

Could any one please help me to resolve this issue? i want empname and empid of all the employees should get inserted as nested document for respective department. Thanks in advance


Solution

  • Instead of aggregate filter i used JDBC_STREAMING it is working fine might be helpful to some one looking at this post.

    input {
    jdbc {
    jdbc_driver_library => "D:/Users/xxxx/Desktop/driver/mssql-jdbc-7.4.1.jre12-shaded.jar"
    jdbc_driver_class => "com.microsoft.sqlserver.jdbc.SQLServerDriver"
    jdbc_connection_string => "jdbc:sqlserver://EC2AMAZ-J90JR4A\SQLEXPRESS:1433;databaseName=xxx;"
    jdbc_user => "xxx"
    jdbc_password => "xxxx"
    statement => "Select Policyholdername,Age,Policynumber,Dob,Client_Address,is_active from policy"
    }
    }
    filter{
    jdbc_streaming {
    jdbc_driver_library => "D:/Users/xxxx/Desktop/driver/mssql-jdbc-7.4.1.jre12-shaded.jar"
    jdbc_driver_class => "com.microsoft.sqlserver.jdbc.SQLServerDriver"
    jdbc_connection_string => "jdbc:sqlserver://EC2AMAZ-J90JR4A\SQLEXPRESS:1433;databaseName=xxxx;"
    jdbc_user => "xxxx"
    jdbc_password => "xxxx"
    statement => "select claimnumber,claimtype,is_active from claim where policynumber = :policynumber"
    parameters => {"policynumber" => "policynumber"}
    target => "claim_details"
    }
    }
    output {
    elasticsearch {
    hosts => "https://e5a4a4a4de7940d9b12674d62eac9762.eastus2.azure.elastic-cloud.com:9243"
    user => "elastic"
    password => "xxxx"
    index => "xxxx"
    action => "index"
    document_type => "_doc"
    document_id => "%{policynumber}"
    
    }
    stdout { codec => rubydebug }
    }