Nested document to elasticsearch using logstash

Hi all, I am trying to index documents from MS SQL Server into Elasticsearch using Logstash. I want my documents to be ingested as nested documents, but I am getting an aggregate exception error.

Here i place all my code

-- Departments. ID is an auto-incrementing identity column starting at 1.
CREATE TABLE department (
    ID INT IDENTITY(1,1) NOT NULL,
    Name VARCHAR(100)
);

-- Seed the two sample departments (the Employee rows refer to them as dept_Id 1 and 2).
INSERT INTO department (Name)
VALUES
    ('IT Application development'),
    ('HR & Marketing');

-- Employees. dept_Id is the column the final query joins against department.
CREATE TABLE Employee (
    ID INT IDENTITY(1,1) NOT NULL,
    emp_Name VARCHAR(100),
    dept_Id INT
);

-- Sample employees for department 1.
INSERT INTO Employee (emp_Name, dept_Id)
VALUES
    ('Mohan', 1),
    ('parthi', 1),
    ('vignesh', 1);

-- Sample employees for department 2.
INSERT INTO Employee (emp_Name, dept_Id)
VALUES
    ('Suresh', 2),
    ('Jithesh', 2),
    ('Venkat', 2);

Final select statement

-- One row per (department, employee) pair; departments without employees
-- still appear once, with NULL empid/empname, because of the LEFT JOIN.
-- Ordered by department so that all rows of one department are contiguous.
SELECT
    De.id         AS id,
    De.name       AS deptname,
    Emp.id        AS empid,
    Emp.emp_name  AS empname
FROM department De
LEFT JOIN employee Emp
    ON De.id = Emp.dept_Id
ORDER BY De.id, Emp.id

Result should be like this

enter image description here

My elastic search mapping

# Create the index with employee_details mapped as a nested field.
# NOTE(review): the sub-field types below are inferred from the sample output (numeric empid, string empname) — confirm.
PUT /departments
{
  "mappings": {
    "properties": {
      "employee_details": {
        "type": "nested",
        "properties": {
          "empid":   { "type": "integer" },
          "empname": { "type": "text" }
        }
      }
    }
  }
}

My logstash config file

# Pipeline: read one row per (department, employee) from SQL Server, fold the
# rows of each department into a single event carrying an `employee_details`
# array, and index that event into the `departments` index.
# The aggregate filter keeps its state in memory per task_id, so run this
# pipeline with a single worker (`-w 1`), otherwise rows can be processed out
# of order and departments get split.
input {
  jdbc {
    # Left empty as in the post; point this at the mssql-jdbc jar unless the
    # jar is already on Logstash's classpath.
    jdbc_driver_library => ""
    jdbc_driver_class => "com.microsoft.sqlserver.jdbc.SQLServerDriver"
    jdbc_connection_string => "jdbc:sqlserver://EC2AMAZ-J90JR4A\SQLEXPRESS:1433;databaseName=xxxx;"
    jdbc_user => "xxxx"
    jdbc_password => "xxxx"
    # ORDER BY is required: push_previous_map_as_event flushes a department as
    # soon as a row with a different id arrives, so its rows must be contiguous.
    statement => "SELECT De.id AS id, De.name AS deptname, Emp.id AS empid, Emp.emp_name AS empname
                  FROM department De LEFT JOIN employee Emp ON De.id = Emp.dept_Id
                  ORDER BY De.id, Emp.id"
  }
}

filter {
  aggregate {
    task_id => "%{id}"
    # Use the event.get API: direct access such as event['id'] is not
    # supported on Logstash 5+ and raises an exception inside this filter.
    # event.cancel() drops the per-row event so that only the aggregated map
    # is sent to the outputs.
    code => "
      map['id'] = event.get('id')
      map['deptname'] = event.get('deptname')
      map['employee_details'] ||= []
      map['employee_details'] << {'empid' => event.get('empid'), 'empname' => event.get('empname')}
      event.cancel()
    "
    push_previous_map_as_event => true
    # The last department has no following row to trigger its flush; it is
    # pushed when this timeout (seconds) expires and tagged 'aggregated'.
    timeout => 5
    timeout_tags => ['aggregated']
  }
}

output {
  stdout { codec => rubydebug }
  elasticsearch {
    # Elasticsearch URL (empty in the post), e.g. "http://localhost:9200".
    hosts => ""
    user => "elastic"
    password => "****"
    index => "departments"
    action => "index"
    # "_doc" matches the _type shown in the expected search result.
    document_type => "_doc"
    document_id => "%{id}"
  }
}


While running Logstash I am getting the error below:

enter image description here

Elasticsearch screenshot for reference:

enter image description here

My Elasticsearch output should look something like this:

{
  "took" : 398,
  "timed_out" : false,
  "_shards" : {
    "total" : 1,
    "successful" : 1,
    "skipped" : 0,
    "failed" : 0
  },
  "hits" : {
    "total" : {
      "value" : 1,
      "relation" : "eq"
    },
    "max_score" : 1.0,
    "hits" : [
      {
        "_index" : "departments",
        "_type" : "_doc",
        "_id" : "2",
        "_score" : 1.0,
        "_source" : {
          "id" : 1,
          "deptname" : "IT Application development",
          "employee_details" : [
            {
              "empid" : 1,
              "empname" : "Mohan"
            },
            {
              "empid" : 2,
              "empname" : "Parthi"
            },
            {
              "empid" : 3,
              "empname" : "Vignesh"
            }
          ]
        }
      }
    ]
  }
}

Could anyone please help me resolve this issue? I want the empname and empid of all employees to be inserted as nested documents under their respective department. Thanks in advance.


  • Instead of the aggregate filter I used the jdbc_streaming filter, and it is working fine. Posting it here in case it is helpful to someone looking at this post.

    # Pipeline: read one event per policy, then for each policy run a lookup
    # query for its claims and attach the result rows under `claim_details`.
    input {
      jdbc {
        jdbc_driver_library => "D:/Users/xxxx/Desktop/driver/mssql-jdbc-7.4.1.jre12-shaded.jar"
        jdbc_driver_class => "com.microsoft.sqlserver.jdbc.SQLServerDriver"
        jdbc_connection_string => "jdbc:sqlserver://EC2AMAZ-J90JR4A\SQLEXPRESS:1433;databaseName=xxx;"
        jdbc_user => "xxx"
        jdbc_password => "xxxx"
        statement => "Select Policyholdername,Age,Policynumber,Dob,Client_Address,is_active from policy"
      }
    }

    filter {
      jdbc_streaming {
        jdbc_driver_library => "D:/Users/xxxx/Desktop/driver/mssql-jdbc-7.4.1.jre12-shaded.jar"
        jdbc_driver_class => "com.microsoft.sqlserver.jdbc.SQLServerDriver"
        jdbc_connection_string => "jdbc:sqlserver://EC2AMAZ-J90JR4A\SQLEXPRESS:1433;databaseName=xxxx;"
        jdbc_user => "xxxx"
        jdbc_password => "xxxx"
        # :policynumber is bound from the event field named in `parameters`.
        statement => "select claimnumber,claimtype,is_active from claim where policynumber = :policynumber"
        parameters => {"policynumber" => "policynumber"}
        # The lookup rows are stored as an array in this field.
        target => "claim_details"
      }
    }

    output {
      elasticsearch {
        # Elasticsearch URL (empty in the post), e.g. "http://localhost:9200".
        hosts => ""
        user => "elastic"
        password => "xxxx"
        index => "xxxx"
        action => "index"
        document_type => "_doc"
        document_id => "%{policynumber}"
      }
      stdout { codec => rubydebug }
    }