I am trying to count JSON objects based on a flag. To do so, I created an ingest pipeline with two nested foreach processors that iterate over my objects.
I want to count all the objects in the "documents" array for which the field "count" is set to true.
POST _ingest/pipeline/_simulate
{
  "pipeline": {
    "description": "...",
    "processors": [
      {
        "set": {
          "field": "specific.docCount",
          "value": 0
        }
      },
      {
        "foreach": {
          "field": "data.status.transactions",
          "processor": {
            "foreach": {
              "field": "_ingest._value.documents",
              "processor": {
                "script": {
                  "lang": "painless",
                  "inline": "if (ctx.count) ctx.specific.docCount += 1"
                }
              }
            }
          }
        }
      }
    ]
  },
  "docs": [
    {
      "_source": {
        "data": {
          "status": {
            "transactions": [
              {
                "id": "123",
                "documents": [
                  {
                    "count": true
                  },
                  {
                    "count": false
                  }
                ]
              }
            ]
          }
        }
      }
    }
  ]
}
I'm getting the following error:
{
"docs": [
{
"error": {
"root_cause": [
{
"type": "exception",
"reason": "java.lang.IllegalArgumentException: ScriptException[runtime error]; nested: NullPointerException;",
"header": {
"processor_type": "foreach"
}
}
],
"type": "exception",
"reason": "java.lang.IllegalArgumentException: ScriptException[runtime error]; nested: NullPointerException;",
"caused_by": {
"type": "illegal_argument_exception",
"reason": "ScriptException[runtime error]; nested: NullPointerException;",
"caused_by": {
"type": "script_exception",
"reason": "runtime error",
"script_stack": [
"if (ctx.count) ",
" ^---- HERE"
],
"script": "if (ctx.count) ctx.stats.docCount += 1",
"lang": "painless",
"caused_by": {
"type": "null_pointer_exception",
"reason": null
}
}
},
"header": {
"processor_type": "foreach"
}
}
}
]
}
This foreach pipeline doc suggests using ctx to refer to the ingest document, but I am not sure how to use it in my case.
How can I retrieve the current "foreach entry" in my Painless script?
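I ended up doing the whole thing in a single Painless script processor, looping over the transactions and their documents inside the script instead of using nested foreach processors.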
POST _ingest/pipeline/_simulate
{
  "pipeline": {
    "description": "Counts the entries of data.status.transactions[].documents[] whose 'count' flag is true and stores the total in stats.docCount. Missing transactions, documents or count fields are skipped.",
    "processors": [
      {
        "set": {
          "field": "stats.docCount",
          "value": 0
        }
      },
      {
        "script": {
          "lang": "painless",
          "inline": "if (ctx.data != null && ctx.data.status != null && ctx.data.status.transactions != null) { for (def transaction : ctx.data.status.transactions) { if (transaction != null && transaction.documents != null) { for (def document : transaction.documents) { if (document != null && document.count != null && document.count) { ctx.stats.docCount += 1; } } } } }"
        }
      }
    ]
  },
  "docs": [
    {
      "_source": {
        "data": {
          "status": {
            "transactions": [
              {
                "id": "123",
                "documents": [
                  {
                    "count": true
                  },
                  {
                    "count": false
                  }
                ]
              }
            ]
          }
        }
      }
    },
    {
      "_source": {
        "data": {
          "status": {
            "transactions": [
              {
                "id": "234",
                "documents": [
                  {
                    "count": true
                  },
                  {
                    "count": true
                  }
                ]
              }
            ]
          }
        }
      }
    },
    {
      "_source": {
        "data": {
          "status": {
            "transactions": [
              {
                "id": "345",
                "documents": [
                  {},
                  {
                    "count": true
                  }
                ]
              }
            ]
          }
        }
      }
    }
  ]
}
Output:
{
"docs": [
{
"doc": {
"_id": "_id",
"_index": "_index",
"_type": "_type",
"_source": {
"data": {
"status": {
"transactions": [
{
"documents": [
{
"count": true
},
{
"count": false
}
],
"id": "123"
}
]
}
},
"stats": {
"docCount": 1
}
},
"_ingest": {
"timestamp": "2018-11-14T13:46:29.963Z"
}
}
},
{
"doc": {
"_id": "_id",
"_index": "_index",
"_type": "_type",
"_source": {
"data": {
"status": {
"transactions": [
{
"documents": [
{
"count": true
},
{
"count": true
}
],
"id": "234"
}
]
}
},
"stats": {
"docCount": 2
}
},
"_ingest": {
"timestamp": "2018-11-14T13:46:29.963Z"
}
}
},
{
"doc": {
"_id": "_id",
"_index": "_index",
"_type": "_type",
"_source": {
"data": {
"status": {
"transactions": [
{
"documents": [
{},
{
"count": true
}
],
"id": "345"
}
]
}
},
"stats": {
"docCount": 1
}
},
"_ingest": {
"timestamp": "2018-11-14T13:46:29.963Z"
}
}
}
]
}