Dear freindly helpers,
I have an index that is fed by a database via Kafka. Now this database holds a field that aggregates a couple of pieces of information like so key/value; key/value; (don't ask for the reason, I have no idea who designed it liked that and why ;-) )
93/4; 34/12;
it can be empty, or it can hold 1..n key/value pairs.
I want to use an ingest pipeline and ideally have a "nested" field which holds all values that are in tha field.
Probably like this:
{"categories": { "93": 7, "82": 4 } }
The use case is the following: we want to visualize the sum of a filtered number of these categories (they tell me how many minutes a specific process took longer) and relate them in ranges.
Example: I filter categories x, y ,z and then group how many documents for the day had no delay, which had a delay up to 5 minutes and which had a delay between 5 and 15 minutes.
I have tried to get the fields neatly separated with the kv processor and wanted to work from there on but it was a complete wrong approach I guess.
"kv": {
"field": "IncomingField",
"field_split": ";",
"value_split": "/",
"target_field": "delays",
"ignore_missing": true,
"trim_key": "\\s",
"trim_value": "\\s",
"ignore_failure": true
}
When I test the pipeline it seems ok
"delays": {
"62": "3",
"86": "2"
}
but there are two things that don't work.
I have looked into other processors (dorexpander) but can't really get my head around how to get this working.
I hope my question is clear (I lack english skills, sorry) and that someone can point me at the right direction.
Thank you very much!
You should rather structure them as an array of objects with shared accessors, for instance:
[ {key: 93, value: 7}, ...]
That way, you'll be able to aggregate on categories.key
and categories.value
.
So this means iterating the categories' entrySet()
using a custom script processor like so:
POST _ingest/pipeline/_simulate
{
"pipeline": {
"description": "extracts k/v pairs",
"processors": [
{
"script": {
"source": """
def categories = ctx.categories;
def kv_pairs = new ArrayList();
for (def pair : categories.entrySet()) {
def k = pair.getKey();
def v = pair.getValue();
kv_pairs.add(["key": k, "value": v]);
}
ctx.categories = kv_pairs;
"""
}
}
]
},
"docs": [
{
"_source": {
"categories": {
"82": 4,
"93": 7
}
}
}
]
}
P.S.: Do make sure your categories
field is mapped as nested
b/c otherwise you'll lose the connections between the keys & the values (also called flattening).