elasticsearch, monitoring, jaeger

How to generate new documents in Elasticsearch when a document is inserted?


I currently monitor my applications with Jaeger, which uses Elasticsearch as its backend. I would like to aggregate this information into new documents, based on some criteria, in order to generate reports.

The starting point is the insertion of a document that has the tag “phase=end”, like the one below:

{
  "_index": "jaeger-span-2022-03-30",
  "_type": "_doc",
  "_id": "7fso238BngwX41T6Cr9y",
  "_version": 1,
  "_score": null,
  "fields": {
    "traceID": [
      "0cefe26bed7464436c43519e7fcbf6c2"
    ],
    "duration": [
      1898679
    ],
    "spanID": [
      "74b463687cfaf503"
    ],
    "startTimeMillis": [
      "2022-03-30T14:08:09.644Z"
    ],
    "references": [
      {
        "spanID": [
          "8657c748a0508e8b"
        ],
        "traceID": [
          "0cefe26bed7464436c43519e7fcbf6c2"
        ],
        "refType": [
          "CHILD_OF"
        ]
      }
    ],
    "process.serviceName": [
      "a09-002"
    ],
    "startTime": [
      1648649289644801
    ],
    "operationName": [
      "br.com.flow.items.FinalOperation.execute"
    ],
    "tags": [
      {
        "type": [
          "string"
        ],
        "value": [
          "end"
        ],
        "key": [
          "phase"
        ]
      }
    ]
  }
}

And a document created at the beginning of the trace like the one below:

{
  "_index": "jaeger-span-2022-03-30",
  "_type": "_doc",
  "_id": "7fso238BngwX41T6Cr9y",
  "_version": 1,
  "_score": null,
  "fields": {
    "traceID": [
      "0cefe26bed7464436c43519e7fcbf6c2"
    ],
    "duration": [
      1898679
    ],
    "spanID": [
      "74b463687cfaf503"
    ],
    "startTimeMillis": [
      "2022-03-30T14:08:09.642Z"
    ],
    "references": [
      {
        "spanID": [
          "8657c748a0508e8b"
        ],
        "traceID": [
          "0cefe26bed7464436c43519e7fcbf6c2"
        ],
        "refType": [
          "CHILD_OF"
        ]
      }
    ],
    "process.serviceName": [
      "a09-002"
    ],
    "startTime": [
      1648649289642801
    ],
    "operationName": [
      "br.com.flow.items.InitialOperation.execute"
    ]
  }
}

I would like to join the data from both documents into a new document like the one below:

{
    "fields": {
        "traceID": [
            "0cefe26bed7464436c43519e7fcbf6c2"
        ],
        "duration": [
            2000
        ],
        "startTime": [
            1648649289642801
        ],
        "endTime": [
            1648649289644801
        ],
        "process.serviceName": [
            "a09-002"
        ]
    }
}

To do that, I need to perform the following steps:

1 - Observe the insertion of a document that has the phase=end tag.

2 - Calculate the endTime, which is the startTime + duration of the document that contains the phase=end tag.

3 - Calculate the duration. This requires finding the startTime of the first inserted document that has the same traceID as the tagged document, and then subtracting that startTime from the endTime calculated in step 2.
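
As a minimal sketch of steps 2 and 3, assuming both documents have already been fetched and have the `fields` shape shown above, the arithmetic could look like this in Python. The function name `consolidate` is hypothetical, and how the two documents are retrieved is deliberately left out.

```python
def consolidate(first_span: dict, end_span: dict) -> dict:
    """Build the consolidated document from the first span of a trace
    and the span tagged phase=end (both in the `fields` shape above)."""
    first = first_span["fields"]
    end = end_span["fields"]

    # Step 2: endTime = startTime + duration of the phase=end span (microseconds).
    end_time = end["startTime"][0] + end["duration"][0]
    # Step 3: duration = endTime - startTime of the first span of the trace.
    duration = end_time - first["startTime"][0]

    return {
        "fields": {
            "traceID": end["traceID"],
            "duration": [duration],
            "startTime": first["startTime"],
            "endTime": [end_time],
            "process.serviceName": end["process.serviceName"],
        }
    }


# Values copied from the two sample documents (only the fields that are used).
first_span = {"fields": {
    "traceID": ["0cefe26bed7464436c43519e7fcbf6c2"],
    "startTime": [1648649289642801],
}}
end_span = {"fields": {
    "traceID": ["0cefe26bed7464436c43519e7fcbf6c2"],
    "startTime": [1648649289644801],
    "duration": [1898679],
    "process.serviceName": ["a09-002"],
}}

result = consolidate(first_span, end_span)
```

With the sample values this gives an endTime of 1648649291543480 and a duration of 1900679 microseconds. The sample target document shows 2000 instead because its endTime equals the end span's startTime rather than startTime + duration.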

How can I perform these tasks and generate the new document? Initially I thought about using change data capture (CDC) for this, but it seems Elasticsearch doesn't support that feature.


Solution

  • I ended up going another way: I sent the traces to Kafka and built a stream that analyzes the data and then saves the consolidated result in a database.
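
A rough sketch of that kind of stream consolidation, in plain Python with no Kafka client, might look like the following. It assumes each message has already been decoded into a flat dict with `traceID`, `startTime` and `duration` (microseconds), `process.serviceName`, and an optional `tags` list. That record shape and the names `has_tag` and `consolidate_stream` are illustrative assumptions, not the implementation actually used.

```python
from typing import Dict, Iterable, Iterator


def has_tag(span: dict, key: str, value: str) -> bool:
    """Return True if the span carries a tag with the given key and value."""
    return any(
        t.get("key") == key and t.get("value") == value
        for t in span.get("tags", [])
    )


def consolidate_stream(spans: Iterable[dict]) -> Iterator[dict]:
    """Yield one consolidated record per trace when its phase=end span arrives."""
    earliest_start: Dict[str, int] = {}  # traceID -> smallest startTime seen so far

    for span in spans:
        trace_id = span["traceID"]
        start = span["startTime"]
        earliest_start[trace_id] = min(start, earliest_start.get(trace_id, start))

        if has_tag(span, "phase", "end"):
            end_time = start + span["duration"]
            # Drop the per-trace state once the trace has been consolidated.
            first_start = earliest_start.pop(trace_id)
            yield {
                "traceID": trace_id,
                "process.serviceName": span["process.serviceName"],
                "startTime": first_start,
                "endTime": end_time,
                "duration": end_time - first_start,
            }


# Stand-in for messages read from a topic, using values from the question.
spans = [
    {"traceID": "0cefe26bed7464436c43519e7fcbf6c2", "startTime": 1648649289642801,
     "duration": 1898679, "process.serviceName": "a09-002"},
    {"traceID": "0cefe26bed7464436c43519e7fcbf6c2", "startTime": 1648649289644801,
     "duration": 1898679, "process.serviceName": "a09-002",
     "tags": [{"key": "phase", "type": "string", "value": "end"}]},
]

results = list(consolidate_stream(spans))
```

This version assumes the phase=end span arrives after the first span of its trace. A real stream would also need to handle out-of-order messages and expire state for traces that never finish.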