
How do you do sliced scrolls in Elasticsearch using golang?


I'm using go-elasticsearch/v7, and a normal (non-sliced) scroll works:

import es "github.com/elastic/go-elasticsearch/v7"
[...]

type Query struct {
    Size  int          `json:"size"`
    Slice *QuerySlice  `json:"slice,omitempty"`
    Aggs  *Aggs        `json:"aggs,omitempty"`
    Query *QueryFilter `json:"query,omitempty"`
    Sort  []string     `json:"sort,omitempty"`
}

type QuerySlice struct {
    ID  int `json:"id"`
    Max int `json:"max"`
}

[...]

type QueryFilter struct {
    Bool QFBool `json:"bool"`
}

type QFBool struct {
    Filter Filter `json:"filter"`
}

type Filter []map[string]map[string]interface{}

func (q *Query) AsBody() (*bytes.Reader, error) {
    queryBytes, err := json.Marshal(q)
    if err != nil {
        return nil, err
    }

    return bytes.NewReader(queryBytes), nil
}

[...]

type Result struct {
    ScrollID     string `json:"_scroll_id"`
    Took         int
    TimedOut     bool   `json:"timed_out"`
    HitSet       HitSet `json:"hits"`
    Aggregations Aggregations
}

type HitSet struct {
    Total struct {
        Value int
    }
    Hits []Hit
}

[...]

func main() {
    query := &Query{
        Size:  maxSize,
        Sort:  []string{"_doc"},
        Query: &QueryFilter{Bool: QFBool{Filter: filter}},
    }

    qbody, err := query.AsBody()

    resp, err := client.Search(
        client.Search.WithIndex(index),
        client.Search.WithBody(qbody),
        client.Search.WithSize(maxSize),
        client.Search.WithScroll(scrollTime),
    )

    result, err := parseResponse(resp)
}

I get result.HitSet.Total.Value of 432734 for an example query (that I can't use aggregations for). I'm then able to use client.Scroll to get all the hits, but it takes over 1 minute, so I'd like to try slices and do each slice concurrently.

So I tried:

total := 0

for i := 0; i < maxSlices; i++ {
    query := &Query{
        Size: maxSize,
        Sort: []string{"_doc"},
        Slice: &QuerySlice{
            ID:  i,
            Max: maxSlices,
        },
        Query: &QueryFilter{Bool: QFBool{Filter: filter}},
    }

    qbody, err := query.AsBody()
    if err != nil {
        return nil, err
    }

    resp, err := client.Search(
        client.Search.WithIndex(index),
        client.Search.WithBody(qbody),
        client.Search.WithSize(maxSize),
        client.Search.WithScroll(scrollTime),
    )
    if err != nil {
        return nil, err
    }

    result, err := parseResponse(resp)
    if err != nil {
        return nil, err
    }

    fmt.Printf("slice total: %d; hits: %d\n", result.HitSet.Total.Value, len(result.HitSet.Hits))

    total += result.HitSet.Total.Value
}

fmt.Printf("overall total: %d\n", total)

When maxSlices = 2, I get expected output:

slice total: 194104; hits: 10000
slice total: 238630; hits: 10000
overall total: 432734

But with maxSlices of 3 I get:

slice total: 125374; hits: 10000
slice total: 80754; hits: 10000
slice total: 125374; hits: 10000
overall total: 331502

And for 6 I get:

slice total: 10117; hits: 10000
slice total: 11253; hits: 10000
slice total: 114486; hits: 10000
slice total: 0; hits: 0
slice total: 0; hits: 0
slice total: 0; hits: 0
overall total: 135856

And despite the fact that my example query should always return the same results (and does so, day after day, with the normal scroll query), what I get back for a given maxSlices value varies over time. For example, yesterday maxSlices of 3 actually worked correctly.

Have I misunderstood sliced scrolls? Am I using it wrong?


Solution

  • By default, in Elasticsearch v7.x, any number you get in hits.total.value above 10,000 is just a lower bound. 11,253 means that while Elasticsearch was searching your data it encountered at least 11,253 matching documents, but due to various optimizations it could have skipped segments containing thousands more hits that it didn't count. To avoid that behavior you need to set the track_total_hits flag to true.

    What aggregation issue did you run into?