
Non-locking threading code using atomic types when implementing a sliding window class for time


I am trying to understand this code from Yammer Metrics. The confusion starts with the trim method and the call to trim in both update and getSnapshot. Could someone explain the logic here, say for a 15-minute sliding window? Why would you want to clear the map before passing it into Snapshot (this is where the stats of the window are calculated)?

package com.codahale.metrics;

import java.util.concurrent.ConcurrentSkipListMap;
import java.util.concurrent.TimeUnit;
import java.util.concurrent.atomic.AtomicLong;


public class SlidingTimeWindowReservoir implements Reservoir {
    // allow for this many duplicate ticks before overwriting measurements
    private static final int COLLISION_BUFFER = 256;
    // only trim on updating once every N
    private static final int TRIM_THRESHOLD = 256;

    private final Clock clock;
    private final ConcurrentSkipListMap<Long, Long> measurements;
    private final long window;
    private final AtomicLong lastTick;
    private final AtomicLong count;

    public SlidingTimeWindowReservoir(long window, TimeUnit windowUnit) {
        this(window, windowUnit, Clock.defaultClock());
    }

    public SlidingTimeWindowReservoir(long window, TimeUnit windowUnit, Clock clock) {
        this.clock = clock;
        this.measurements = new ConcurrentSkipListMap<Long, Long>();
        this.window = windowUnit.toNanos(window) * COLLISION_BUFFER;
        this.lastTick = new AtomicLong();
        this.count = new AtomicLong();
    }

    @Override
    public int size() {
        trim();
        return measurements.size();
    }

    @Override
    public void update(long value) {
        if (count.incrementAndGet() % TRIM_THRESHOLD == 0) {
            trim();
        }
        measurements.put(getTick(), value);
    }

    @Override
    public Snapshot getSnapshot() {
        trim();
        return new Snapshot(measurements.values());
    }

    private long getTick() {
        for (; ; ) {
            final long oldTick = lastTick.get();
            final long tick = clock.getTick() * COLLISION_BUFFER;
            // ensure the tick is strictly incrementing even if there are duplicate ticks
            final long newTick = tick > oldTick ? tick : oldTick + 1;
            if (lastTick.compareAndSet(oldTick, newTick)) {
                return newTick;
            }
        }
    }

    private void trim() {
        measurements.headMap(getTick() - window).clear();
    }
}

Solution

  • Two bits of information from the documentation:

    ConcurrentSkipListMap is sorted according to the natural ordering of its keys.

    That's the data structure holding all measurements. The key is a long that is basically the current time (clock.getTick() multiplied by COLLISION_BUFFER), so measurements indexed by time are sorted by time.

    .headMap(K toKey) returns a view of the portion of this map whose keys are strictly less than toKey.
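Both properties can be checked with a few lines of plain Java. This is a small sketch (HeadMapDemo and headKeys are made-up names for illustration) that inserts keys out of order and shows that headMap excludes toKey itself:

```java
import java.util.ArrayList;
import java.util.List;
import java.util.concurrent.ConcurrentSkipListMap;

public class HeadMapDemo {
    // Builds a map with keys 10, 20, 30, 40 inserted out of order and returns
    // the keys of headMap(toKey); the skip list keeps them sorted by key.
    static List<Long> headKeys(long toKey) {
        ConcurrentSkipListMap<Long, Long> map = new ConcurrentSkipListMap<>();
        map.put(30L, 3L);
        map.put(10L, 1L);
        map.put(40L, 4L);
        map.put(20L, 2L);
        // headMap(toKey) is a view of the entries whose keys are strictly less than toKey
        return new ArrayList<>(map.headMap(toKey).keySet());
    }

    public static void main(String[] args) {
        System.out.println(headKeys(30L)); // [10, 20]  (30 itself is excluded)
        System.out.println(headKeys(31L)); // [10, 20, 30]
    }
}
```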

    The magic code in getTick makes sure that the same time value is never used twice (it simply takes oldTick + 1 if that would happen). COLLISION_BUFFER is a bit tricky to understand, but it basically ensures that even when Clock#getTick() returns the same value several times, you get new values that don't collide with the next tick from the clock.

    E.g. Clock.getTick() returns 0 -> modified to 0 * 256 = 0

    Clock.getTick() returns 1 -> modified to 1 * 256 = 256

    -> each clock tick gets room for 256 keys (0 through 255) before running into the next tick's value of 256.
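Replaying the arithmetic of getTick() single-threaded makes the numbers concrete. This sketch assumes a made-up sequence of raw clock readings (5, 5, 5, 6, 6, 7) in place of Clock#getTick(); TickScalingDemo and scaledTicks are hypothetical names:

```java
import java.util.ArrayList;
import java.util.List;

public class TickScalingDemo {
    // Same constant as in SlidingTimeWindowReservoir
    static final long COLLISION_BUFFER = 256;

    // Single-threaded replay of getTick()'s arithmetic for a sequence of raw
    // clock readings (hypothetical values standing in for Clock#getTick()).
    static List<Long> scaledTicks(long[] rawTicks) {
        List<Long> result = new ArrayList<>();
        long lastTick = 0; // AtomicLong() also starts at 0
        for (long raw : rawTicks) {
            long tick = raw * COLLISION_BUFFER;
            // strictly increasing: use the scaled tick if it is new, else bump by one
            long newTick = tick > lastTick ? tick : lastTick + 1;
            lastTick = newTick;
            result.add(newTick);
        }
        return result;
    }

    public static void main(String[] args) {
        // prints [1280, 1281, 1282, 1536, 1537, 1792]
        System.out.println(scaledTicks(new long[] {5, 5, 5, 6, 6, 7}));
    }
}
```

One consequence worth knowing: if more than 256 updates land on the same raw reading, the +1 bumps spill into the next reading's range (300 readings of 5 followed by one reading of 6 give 1580 for that last key, not 1536). Keys still stay unique and ordered; they just run slightly ahead of the real clock.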

    Now trim() does

    measurements.headMap(getTick() - window).clear();
    

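    This calculates the "current time", subtracts the time window, and uses the result to get the portion of the map that is older than "window ticks ago". (window was multiplied by COLLISION_BUFFER in the constructor, so both sides of the subtraction are in the same scaled units.) Clearing that view also removes those entries from the original map. It does not clear the whole map, only that older part.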

    -> trim removes values that are too old.
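The "clear only the old part" behavior is easy to verify in isolation. In this sketch the keys 100 to 400 stand in for scaled ticks and the now and window values are arbitrary; TrimViewDemo is a hypothetical name:

```java
import java.util.concurrent.ConcurrentSkipListMap;

public class TrimViewDemo {
    // Keys stand in for scaled ticks, values for measurements.
    // Returns the map after clearing everything older than (now - window).
    static ConcurrentSkipListMap<Long, Long> trimmed(long now, long window) {
        ConcurrentSkipListMap<Long, Long> measurements = new ConcurrentSkipListMap<>();
        measurements.put(100L, 1L);
        measurements.put(200L, 2L);
        measurements.put(300L, 3L);
        measurements.put(400L, 4L);
        // headMap returns a live view: clearing it removes those entries from the
        // backing map and leaves every key >= now - window untouched
        measurements.headMap(now - window).clear();
        return measurements;
    }

    public static void main(String[] args) {
        System.out.println(trimmed(450L, 200L)); // {300=3, 400=4}
    }
}
```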

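    Each time you update, you need to remove old values, or the map keeps growing. To keep this cheap, update only trims on every TRIM_THRESHOLD-th (256th) call. When creating the Snapshot the same thing happens (size() trims too), so old values are not included. So getSnapshot does not clear the map before building the Snapshot; it only drops the entries that have fallen out of the window.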
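To make the 15-minute case concrete, here is a simplified, hypothetical re-creation of the class that runs without the Metrics library. Assumptions: the clock is a java.util.function.LongSupplier returning nanoseconds (standing in for com.codahale.metrics.Clock), and snapshot() returns a List instead of a Snapshot. Values are recorded at minutes 0, 4, 10 and 20, and the snapshot is taken at minute 20:

```java
import java.util.ArrayList;
import java.util.List;
import java.util.concurrent.ConcurrentSkipListMap;
import java.util.concurrent.TimeUnit;
import java.util.concurrent.atomic.AtomicLong;
import java.util.function.LongSupplier;

// Hypothetical, simplified copy of SlidingTimeWindowReservoir for experimenting.
public class SimpleSlidingWindow {
    private static final int COLLISION_BUFFER = 256;
    private static final int TRIM_THRESHOLD = 256;

    private final LongSupplier clockNanos; // assumption: replaces Metrics' Clock
    private final ConcurrentSkipListMap<Long, Long> measurements = new ConcurrentSkipListMap<>();
    private final long window;
    private final AtomicLong lastTick = new AtomicLong();
    private final AtomicLong count = new AtomicLong();

    public SimpleSlidingWindow(long window, TimeUnit unit, LongSupplier clockNanos) {
        this.clockNanos = clockNanos;
        // scale the window like the ticks so the subtraction in trim() matches
        this.window = unit.toNanos(window) * COLLISION_BUFFER;
    }

    public void update(long value) {
        // amortize the cost: only trim on every TRIM_THRESHOLD-th update
        if (count.incrementAndGet() % TRIM_THRESHOLD == 0) {
            trim();
        }
        measurements.put(getTick(), value);
    }

    public List<Long> snapshot() {
        trim(); // drop anything older than the window before reading
        return new ArrayList<>(measurements.values());
    }

    private long getTick() {
        for (;;) {
            long oldTick = lastTick.get();
            long tick = clockNanos.getAsLong() * COLLISION_BUFFER;
            long newTick = tick > oldTick ? tick : oldTick + 1;
            if (lastTick.compareAndSet(oldTick, newTick)) {
                return newTick;
            }
        }
    }

    private void trim() {
        measurements.headMap(getTick() - window).clear();
    }

    public static void main(String[] args) {
        AtomicLong now = new AtomicLong(); // manually advanced fake clock
        SimpleSlidingWindow w = new SimpleSlidingWindow(15, TimeUnit.MINUTES, now::get);
        long[] minutes = {0, 4, 10, 20};
        for (int i = 0; i < minutes.length; i++) {
            now.set(TimeUnit.MINUTES.toNanos(minutes[i]));
            w.update(i + 1); // records values 1, 2, 3, 4
        }
        System.out.println(w.snapshot()); // [3, 4]
    }
}
```

At minute 20, trim() computes a cutoff of about minute 5 (in scaled ticks), so only the entries from minutes 10 and 20 survive. Note that trim() itself goes through getTick(), so every trim advances lastTick just like an update does.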

    The endless for loop in getTick is another trick: it uses the atomic compare-and-set method to ensure that, once you are ready to update the value, no other thread has changed it in between. If that happens, the whole loop starts over and refreshes its starting value. The basic scheme is:

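    // atomic is a java.util.concurrent.atomic.AtomicLong,
    // modify a java.util.function.LongUnaryOperator computing the new value
    static long casUpdate(AtomicLong atomic, LongUnaryOperator modify) {
        for (; ; ) {
            long expectedOldValue = atomic.get();
            // other threads can change the value of atomic here...

            long modified = modify.applyAsLong(expectedOldValue);

            // we can only set the new value if the old one is still the same;
            // otherwise another thread got in first, so loop and retry
            if (atomic.compareAndSet(expectedOldValue, modified)) {
                return modified;
            }
        }
    }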
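The point of the compare-and-set loop shows up under contention. This sketch (CasTickDemo and the frozen clock value 1_000 are made up for illustration) has several threads call a copy of the getTick() loop while the clock never advances, and counts how many distinct ticks come back:

```java
import java.util.Set;
import java.util.concurrent.ConcurrentHashMap;
import java.util.concurrent.atomic.AtomicLong;

public class CasTickDemo {
    private static final long COLLISION_BUFFER = 256;
    private final AtomicLong lastTick = new AtomicLong();

    // Same retry loop as getTick(), but with the clock frozen at one reading
    // (a made-up value) so that every call collides with the previous one.
    long nextTick(long frozenClock) {
        for (;;) {
            long oldTick = lastTick.get();
            long tick = frozenClock * COLLISION_BUFFER;
            long newTick = tick > oldTick ? tick : oldTick + 1;
            // fails if another thread moved lastTick after our get(); then retry
            if (lastTick.compareAndSet(oldTick, newTick)) {
                return newTick;
            }
        }
    }

    // Starts `threads` threads that each draw `perThread` ticks from one shared
    // instance and returns how many distinct tick values were handed out.
    static int distinctTicks(int threads, int perThread) {
        CasTickDemo demo = new CasTickDemo();
        Set<Long> seen = ConcurrentHashMap.newKeySet();
        Thread[] workers = new Thread[threads];
        for (int t = 0; t < threads; t++) {
            workers[t] = new Thread(() -> {
                for (int i = 0; i < perThread; i++) {
                    seen.add(demo.nextTick(1_000L));
                }
            });
            workers[t].start();
        }
        for (Thread w : workers) {
            try {
                w.join();
            } catch (InterruptedException e) {
                Thread.currentThread().interrupt();
                throw new IllegalStateException(e);
            }
        }
        return seen.size();
    }

    public static void main(String[] args) {
        // 8 threads x 10_000 calls: every call gets its own tick, so this prints 80000
        System.out.println(distinctTicks(8, 10_000));
    }
}
```

Replacing compareAndSet with a plain lastTick.set(newTick) would let two threads read the same oldTick and both return oldTick + 1; in the reservoir, the second put would then silently overwrite the first measurement. Since Java 8, AtomicLong.updateAndGet(LongUnaryOperator) packages the same retry pattern, though the original code spells the loop out.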