elasticsearch, elasticsearch-7

ElasticSearch Ingest Pipeline: create and update timestamp field


To create a timestamp field on my indices, following this answer, I created an ingest pipeline that runs only on specific indices:

PUT _ingest/pipeline/auto_now_add
{
  "description": "Assigns the current date if not yet present and if the index name is whitelisted",
  "processors": [
    {
      "script": {
        "source": """
          // skip if not whitelisted
          if (![ "my_index_1",
                 "my_index_2"
              ].contains(ctx['_index'])) { return; }
          
          // always update updated_at
          ctx['updated_at'] = new SimpleDateFormat("yyyy-MM-dd HH:mm:ss").format(new Date());
          
        """
      }
    }
  ]
}

Then I set it as the default pipeline in the settings of all indices:

PUT _all/_settings
{
  "index": {
    "default_pipeline": "auto_now_add"
  }
}

After that, I start indexing my objects into those indices. When I query an indexed item, its updated_at field holds the time of indexing, for example:

{
  _index: 'my_index_1',
  _type: '_doc',
  _id: 'r1285044056',
  _version: 11,
  _seq_no: 373,
  _primary_term: 2,
  found: true,
  _source: {
    updated_at: '2021-07-07 04:35:39',
    ...
  }
}

Now I would like to add a created_at field that is set only the first time a document is indexed, so I tried to update the script above like this:

PUT _ingest/pipeline/auto_now_add
{
  "description": "Assigns the current date if not yet present and if the index name is whitelisted",
  "processors": [
    {
      "script": {
        "source": """
          // skip if not whitelisted
          if (![ "my_index_1",
                 "my_index_2",
                 "..."
              ].contains(ctx['_index'])) { return; }
          
          // always update updated_at
          ctx['updated_at'] = new SimpleDateFormat("yyyy-MM-dd HH:mm:ss").format(new Date());
          // don't overwrite if present
          if (ctx != null && ctx['created_at'] != null) { return; }
          
          ctx['created_at'] = new SimpleDateFormat("yyyy-MM-dd HH:mm:ss").format(new Date());
        """
      }
    }
  ]
}

But this solution does not work: the condition

if (ctx != null && ctx['created_at'] != null) { return; }

never triggers the early return, so created_at is overwritten on every update of the object, just like updated_at, which makes it useless. How can I prevent that and make sure created_at is preserved once the ingest pipeline has set it?


Solution

  • As described by @Val in this answer:

    ... the ingest pipeline processor(s) will only operate within the context of the document you're sending, not the one stored (if any).

    As such, you won't have access to the underlying _source nor doc because ingest pipelines were designed for the ingest phase, not the update phase.
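
The point in the quote can be illustrated with a small toy model in Python (this is not Elasticsearch code). The names `store`, `index_document`, and `created_at_if_absent` are hypothetical, and the timestamps are passed in by hand to keep the run deterministic. It only sketches the behaviour described above: the pipeline step receives the incoming payload and never the stored document, so the null check from the question has nothing to find.

```python
from copy import deepcopy


def created_at_if_absent(ctx, now):
    """Stand-in for the pipeline script: it sees only the incoming payload."""
    ctx["updated_at"] = now
    if ctx.get("created_at") is None:  # the null check from the question
        ctx["created_at"] = now
    return ctx


def index_document(store, doc_id, payload, now):
    """Toy index request: run the pipeline on the payload, then replace
    whatever was stored under doc_id with the result."""
    ctx = created_at_if_absent(deepcopy(payload), now)
    store[doc_id] = ctx
    return ctx


store = {}  # hypothetical in-memory "index"
first = index_document(store, "some_id", {"some": "param"}, "2021-07-08 10:35:13")
second = index_document(store, "some_id", {"some": "param"}, "2021-07-08 10:49:44")

print(first["created_at"])   # 2021-07-08 10:35:13
print(second["created_at"])  # 2021-07-08 10:49:44 (reset: the payload carried no created_at)
```

In this model the only way to keep the original value is for the client to send created_at back in the payload, which is exactly the case the pipeline can detect.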


    You can of course keep your auto_now_add pipeline to auto-add updated_at, and you can extend it with created_at (if not already present in the ingest payload) by checking ctx.containsKey, since ctx is essentially a Java Map:

    PUT _ingest/pipeline/auto_now_add
    {
      "description": "Assigns the current date if not yet present and if the index name is whitelisted",
      "processors": [
        {
          "script": {
            "source": """
              // skip if not whitelisted
              if (![ "my_index_1",
                     "my_index_2",
                     "..."
                  ].contains(ctx['_index'])) { return; }
              
              def now = new SimpleDateFormat("yyyy-MM-dd HH:mm:ss").format(new Date());
              
              // guarantee updated_at
              ctx['updated_at'] = now;
              
              // add created_at only if nonexistent in the payload
              if (!ctx.containsKey('created_at')) {
                ctx['created_at'] = now;
              }  
            """
          }
        }
      ]
    }
    

    However, this works only the first time you ingest a document: indexing the same ID again with a payload that lacks created_at sets the field anew.

    Running:

    POST my_index_1/_doc/some_id
    { 
      "some": "param"
    }
    

    will yield:

    {
      "some" : "param",
      "updated_at" : "2021-07-08 10:35:13",
      "created_at" : "2021-07-08 10:35:13"
    }
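
The branches of the pipeline script can be traced with a plain Python rendering. This is a sketch under assumptions, not the Painless runtime: `auto_now_add`, `WHITELIST`, and the injectable `clock` argument are hypothetical names, and the clock is fixed so the output is reproducible. The `strftime` pattern is chosen to produce the same layout as `yyyy-MM-dd HH:mm:ss`.

```python
from datetime import datetime

WHITELIST = {"my_index_1", "my_index_2"}  # mirrors the index names in the script


def auto_now_add(ctx, clock):
    """Python rendering of the pipeline script; `clock` is a hypothetical
    injectable time source used only to make the example deterministic."""
    if ctx["_index"] not in WHITELIST:
        return ctx  # skip if not whitelisted
    now = clock().strftime("%Y-%m-%d %H:%M:%S")
    ctx["updated_at"] = now          # always set
    if "created_at" not in ctx:      # counterpart of !ctx.containsKey('created_at')
        ctx["created_at"] = now
    return ctx


def fixed_clock():
    return datetime(2021, 7, 8, 10, 35, 13)


fresh = auto_now_add({"_index": "my_index_1", "some": "param"}, fixed_clock)
resent = auto_now_add(
    {"_index": "my_index_1", "created_at": "2021-07-01 00:00:00"}, fixed_clock
)
skipped = auto_now_add({"_index": "other_index", "some": "param"}, fixed_clock)

print(fresh["created_at"])      # 2021-07-08 10:35:13
print(resent["created_at"])     # 2021-07-01 00:00:00 (kept, it was in the payload)
print("updated_at" in skipped)  # False
```

The second case shows what the key check buys you: a payload that already carries created_at keeps its value, while updated_at is still refreshed.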
    

    Now, to refresh updated_at each time you update a document, you'll need one more script, this time stored under _scripts rather than _ingest/pipeline:

    PUT _scripts/incement_update_at__plus_new_params
    {
      "script": {
        "lang": "painless", 
        "source": """
          // add whatever is in the params
          ctx._source.putAll(params);
          
          // increment updated_at no matter what was in the params
          ctx._source['updated_at'] = new SimpleDateFormat("yyyy-MM-dd HH:mm:ss").format(new Date());
        """
      }
    }
    

    Then, when you run your _update call, reference the stored script by its ID:

    POST my_index_1/_update/some_id
    {
      "script": {
        "id": "incement_update_at__plus_new_params",
        "params": {
          "your": "new params"
        }
      }
    }
    

    which refreshes updated_at without touching created_at and adds the other params:

    {
       "some":"param",
       "updated_at":"2021-07-08 10:49:44",    <--
       "created_at":"2021-07-08 10:39:55",
       "your":"new params"                    <--
    }
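
What the stored update script does to the document can be mimicked in a few lines of Python. Again this is only a sketch: `increment_updated_at` is a hypothetical name, `dict.update` stands in for `ctx._source.putAll(params)`, and the timestamp is passed in explicitly. It also surfaces one caveat that follows from the merge step: a param named created_at would overwrite the stored value.

```python
def increment_updated_at(source, params, now):
    """Python rendering of the stored update script: merge params, then stamp."""
    source.update(params)       # counterpart of ctx._source.putAll(params)
    source["updated_at"] = now  # written last, so params cannot override it
    return source


stored = {
    "some": "param",
    "updated_at": "2021-07-08 10:39:55",
    "created_at": "2021-07-08 10:39:55",
}

result = increment_updated_at(dict(stored), {"your": "new params"}, "2021-07-08 10:49:44")
print(result["created_at"])  # 2021-07-08 10:39:55 (untouched)
print(result["updated_at"])  # 2021-07-08 10:49:44

# Caveat: the merge copies every param, including one called created_at.
clobbered = increment_updated_at(
    dict(stored), {"created_at": "1970-01-01 00:00:00"}, "2021-07-08 10:49:44"
)
print(clobbered["created_at"])  # 1970-01-01 00:00:00
```

If callers cannot be trusted to leave created_at out of the params, removing that key before the merge would be one way to guard against it.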
    

    Shameless plug: I discuss pipelines & scripts in great detail in my Elasticsearch Handbook.