Search code examples

Logstash field is never shown after aggregation

I have logstash version 7.8.0 Can someone tell me why the aggregation below never shown THREAD_ID field into documents please ? My field : thread_id is added in the end of aggregation ..


2024-12-14 12:00:01 thread-1 SOAP message <<Envelope 011>>
2024-12-14 12:00:02 thread-1 SOAP message >>Envelope 012<<
2024-12-14 12:05:03 thread-2 SOAP message <<Envelope 021>>
2024-12-14 12:05:04 thread-2 SOAP message >>Envelope 022<<

Filter GROK from logstash:

filter {
  grok {
    match => {
      "message" => [
        '%{TIMESTAMP_ISO8601:log_timestamp} thread-%{INT:thread_id} SOAP message <<(?<soap_in>.*?)>>',
        '%{TIMESTAMP_ISO8601:log_timestamp} thread-%{INT:thread_id} SOAP message >>(?<soap_out>.*?)<<'
  aggregate {
  task_id => "%{thread_id}"
  code => "
    map['soap_in'] ||= []
    map['soap_out'] ||= []

    # Capture soap_in with timestamp if exists
    if event.get('soap_in')
      map['soap_in'] << {'soap_in' => event.get('soap_in'), 'log_timestamp' => event.get('log_timestamp')}

    # Capture soap_out with timestamp if exists
    if event.get('soap_out')
      map['soap_out'] << {'soap_out' => event.get('soap_out'), 'log_timestamp' => event.get('log_timestamp')}

   # Once both soap_in and soap_out are available, emit the aggregated event
    if map['soap_in'] && map['soap_out']
      event.set('soap_in', map['soap_in'])
      event.set('soap_out', map['soap_out'])
      event.set('thread_id', event.get('thread_id'))
  push_previous_map_as_event => true
  timeout => 3
  mutate {
    remove_field => ["message"]

Result never shown thread_id

  "took" : 0,
  "timed_out" : false,
  "_shards" : {
    "total" : 1,
    "successful" : 1,
    "skipped" : 0,
    "failed" : 0
  "hits" : {
    "total" : {
      "value" : 2,
      "relation" : "eq"
    "max_score" : 1.0,
    "hits" : [
        "_index" : "app-aggregate-2024.12.13",
        "_type" : "_doc",
        "_id" : "qop5wJMB81mNBoMqWzzC",
        "_score" : 1.0,
        "_source" : {
          "@version" : "1",
          "soap_out" : [
              "log_timestamp" : "2024-12-14 12:00:02",
              "soap_out" : "Envelope 012"
          "@timestamp" : "2024-12-13T14:43:18.985Z",
          "soap_in" : [
              "log_timestamp" : "2024-12-14 12:00:01",
              "soap_in" : "Envelope 011"
        "_index" : "app-aggregate-2024.12.13",
        "_type" : "_doc",
        "_id" : "rop5wJMB81mNBoMqbTwN",
        "_score" : 1.0,
        "_source" : {
          "@version" : "1",
          "soap_out" : [
              "log_timestamp" : "2024-12-14 12:05:04",
              "soap_out" : "Envelope 022"
          "@timestamp" : "2024-12-13T14:43:23.504Z",
          "soap_in" : [
              "log_timestamp" : "2024-12-14 12:05:03",
              "soap_in" : "Envelope 021"

So in result elasticsearch, we can see that we have 2 values instead of 4, that's great but i still don't know why this never shown the field thread_id even if it is mentioned in aggregate Thank you in advance,


  • It wasn't simple to find out how to solve this but finally i get the solution :

        filter {
      grok {
        match => {
          "message" => [
            '%{TIMESTAMP_ISO8601:log_timestamp} thread-%{INT:thread_id} SOAP message <<(?<soap_in>.*?)>>',
            '%{TIMESTAMP_ISO8601:log_timestamp} thread-%{INT:thread_id} SOAP message >>(?<soap_out>.*?)<<'
      aggregate {
      task_id => "%{thread_id}"
      code => "
        map['soap_in'] ||= []
        map['soap_out'] ||= []
        map['thread_id'] ||= []
        map['thread_id'] = event.get('thread_id')
        if event.get('soap_in')
          map['soap_in'] << {'soap_in' => event.get('soap_in'), 'log_timestamp' => event.get('log_timestamp')}
        if event.get('soap_out')
          map['soap_out'] << {'soap_out' => event.get('soap_out'), 'log_timestamp' => event.get('log_timestamp')}
        if map['soap_in'] && map['soap_out']
          event.set('thread_id', map['thread_id'])
          event.set('soap_in', map['soap_in'])
          event.set('soap_out', map['soap_out'])
      push_previous_map_as_event => true
      timeout => 3
      mutate {
        remove_field => ["message"]