Search code examples
pythonalgorithmcachingcombinations

How to maximize the cache hit rate of the 2-element combinations?


My question is simple, but I find it difficult to get the point straight, so please allow me to explain step by step.

Suppose I have N items and N corresponding indices. Each item can be loaded using the corresponding index.

def load_item(index: int) -> ItemType:
    # Mostly just reading, but very slow.
    return item

Also I have a function that takes two (loaded) items and calculates a score.

def calc_score(item_a: ItemType, item_b: ItemType) -> ScoreType:
    # Much faster than load function.
    return score

Note that calc_score(a, b) == calc_score(b, a).

What I want to do is calculate the score for all 2-item combinations and find (at least) one combination that gives the maximum score.

This can be implemented as follows:

def dumb_solution(n: int) -> Tuple[int, int]:
    best_score = 0
    best_combination = None
    for index_a, index_b in itertools.combinations(range(n), 2):
        item_a = load_item(index_a)
        item_b = load_item(index_b)
        score = calc_score(item_a, item_b)
        if score > best_score:
            best_score = score
            best_combination = (index_a, index_b)
    return best_combination

However, this solution calls the load_item function 2*C(N,2) = N*(N-1) times, which is the bottleneck for this function.

This can be resolved by using a cache. Unfortunately, however, the items are so large that it is impossible to keep all items in memory. Therefore, we need to use a size-limited cache.

from functools import lru_cache

@lru_cache(maxsize=M)
def load(index: int) -> ItemType:
    # Very slow process.
    return item

Note that M (cache size) is much smaller than N (approx. N // 10 to N // 2).

The problem is that the typical sequence of combinations is not ideal for the LRU cache.

For instance, when N=6, M=3, itertools.combinations generates the following sequence, and the number of calls of the load_item function is 17.

[
    (0, 1),  # 1, 2
    (0, 2),  # -, 3
    (0, 3),  # -, 4
    (0, 4),  # -, 5
    (0, 5),  # -, 6
    (1, 2),  # 7, 8
    (1, 3),  # -, 9
    (1, 4),  # -, 10
    (1, 5),  # -, 11
    (2, 3),  # 12, 13
    (2, 4),  # -, 14
    (2, 5),  # -, 15
    (3, 4),  # 16, 17
    (3, 5),  # -, -
    (4, 5),  # -, -
]

However, if I rearrange the above sequence as follows, the number of calls will be 10.

[
    (0, 1),  # 1, 2
    (0, 2),  # -, 3
    (1, 2),  # -, -
    (0, 3),  # -, 4
    (2, 3),  # -, -
    (0, 4),  # -, 5
    (3, 4),  # -, -
    (0, 5),  # -, 6
    (4, 5),  # -, -
    (1, 4),  # 7, -
    (1, 5),  # -, -
    (1, 3),  # -, 8
    (3, 5),  # -, -
    (2, 5),  # 9, -
    (2, 4),  # -, 10
]

Question:

How can I generate a sequence of 2-item combinations that maximizes the cache hit rate?


What I tried:

The solution I came up with is to prioritize items that are already in the cache.

from collections import OrderedDict


def prioritizes_item_already_in_cache(n, cache_size):
    items = list(itertools.combinations(range(n), 2))
    cache = OrderedDict()
    reordered = []

    def update_cache(x, y):
        cache[x] = cache[y] = None
        cache.move_to_end(x)
        cache.move_to_end(y)
        while len(cache) > cache_size:
            cache.popitem(last=False)

    while items:
        # Find a pair where both are cached.
        for i, (a, b) in enumerate(items):
            if a in cache and b in cache:
                reordered.append((a, b))
                update_cache(a, b)
                del items[i]
                break
        else:
            # Find a pair where one of them is cached.
            for i, (a, b) in enumerate(items):
                if a in cache or b in cache:
                    reordered.append((a, b))
                    update_cache(a, b)
                    del items[i]
                    break
            else:
                # Cannot find item in cache.
                a, b = items.pop(0)
                reordered.append((a, b))
                update_cache(a, b)

    return reordered

For N=100, M=10, this sequence resulted in 1660 calls, which is about 1/3 of the typical sequence. For N=100, M=50 there are only 155 calls. So I think I can say that this is a promising approach.

Unfortunately, this function is too slow and useless for large N. I was not able to finish for N=1000, but the actual data is in the tens of thousands. Also, it does not take into account how to select an item when no cached item is found. Therefore, even if it is fast, it is doubtful that it is theoretically the best solution (so please note my question is not how to make the above function faster).

(Edited) Here is the complete code including everyone's answers and the test and benchmark code.

import functools
import itertools
import math
import time
from collections import Counter, OrderedDict
from itertools import chain, combinations, product
from pathlib import Path
from typing import Callable, Iterable, Tuple

import joblib
import matplotlib.pyplot as plt
import numpy as np
import pandas as pd
from PIL import Image, ImageDraw

ItemType = int
ScoreType = int


def load_item(index: int) -> ItemType:
    return int(index)


def calc_score(item_a: ItemType, item_b: ItemType) -> ScoreType:
    return abs(item_a - item_b)


class LRUCacheWithCounter:
    def __init__(self, maxsize: int):
        def wrapped_func(key):
            self.load_count += 1
            return load_item(key)

        self.__cache = functools.lru_cache(maxsize=maxsize)(wrapped_func)
        self.load_count = 0

    def __call__(self, key: int) -> int:
        return self.__cache(key)


def basic_loop(iterator: Iterable[Tuple[int, int]], cached_load: Callable[[int], int]):
    best_score = 0
    best_combination = None
    for i, j in iterator:
        a = cached_load(i)
        b = cached_load(j)
        score = calc_score(a, b)
        if score > best_score:
            best_score = score
            best_combination = (i, j)
    return best_score, best_combination


def baseline(n, _):
    return itertools.combinations(range(n), 2)


def prioritizes(n, cache_size):
    items = list(itertools.combinations(range(n), 2))
    cache = OrderedDict()
    reordered = []

    def update_cache(x, y):
        cache[x] = cache[y] = None
        cache.move_to_end(x)
        cache.move_to_end(y)
        while len(cache) > cache_size:
            cache.popitem(last=False)

    while items:
        # Find a pair where both are cached.
        for i, (a, b) in enumerate(items):
            if a in cache and b in cache:
                reordered.append((a, b))
                update_cache(a, b)
                del items[i]
                break
        else:
            # Find a pair where one of them is cached.
            for i, (a, b) in enumerate(items):
                if a in cache or b in cache:
                    reordered.append((a, b))
                    update_cache(a, b)
                    del items[i]
                    break
            else:
                # Cannot find item in cache.
                a, b = items.pop(0)
                reordered.append((a, b))
                update_cache(a, b)

    return reordered


def Matt_solution(n: int, cache_size: int) -> Iterable[Tuple[int, int]]:
    dest = []

    def findPairs(lo1: int, n1: int, lo2: int, n2: int):
        if n1 < 1 or n2 < 1:
            return
        if n1 == 1:
            for i in range(max(lo1 + 1, lo2), lo2 + n2):
                dest.append((lo1, i))
        elif n2 == 1:
            for i in range(lo1, min(lo1 + n1, lo2)):
                dest.append((i, lo2))
        elif n1 >= n2:
            half = n1 // 2
            findPairs(lo1, half, lo2, n2)
            findPairs(lo1 + half, n1 - half, lo2, n2)
        else:
            half = n2 // 2
            findPairs(lo1, n1, lo2, half)
            findPairs(lo1, n1, lo2 + half, n2 - half)

    findPairs(0, n, 0, n)
    return dest


def Kelly_solution(n: int, cache_size: int) -> Iterable[Tuple[int, int]]:
    k = cache_size // 2
    r = range(n)
    return chain.from_iterable(combinations(r[i : i + k], 2) if i == j else product(r[i : i + k], r[j : j + k]) for i in r[::k] for j in r[i::k])


def Kelly_solution2(n: int, cache_size: int) -> Iterable[Tuple[int, int]]:
    k = cache_size - 2
    r = range(n)
    return chain.from_iterable(combinations(r[i : i + k], 2) if i == j else product(r[i : i + k], r[j : j + k]) for i in r[::k] for j in r[i::k])


def diagonal_block(lower, upper):
    for i in range(lower, upper + 1):
        for j in range(i + 1, upper + 1):
            yield i, j


def strip(i_lower, i_upper, j_lower, j_upper):
    for i in range(i_lower, i_upper + 1):
        for j in range(j_lower, j_upper + 1):
            yield i, j


def btilly_solution(n: int, cache_size: int):
    i_lower = 0
    i_upper = n - 1
    k = cache_size - 2
    is_asc = True
    while i_lower <= i_upper:
        # Handle a k*k block first. At the end that is likely loaded.
        if is_asc:
            upper = min(i_lower + k - 1, i_upper)
            yield from diagonal_block(i_lower, upper)
            j_lower = i_lower
            j_upper = upper
            i_lower = upper + 1
        else:
            lower = max(i_lower, i_upper - k + 1)
            yield from diagonal_block(lower, i_upper)
            j_lower = lower
            j_upper = i_upper
            i_upper = lower - 1
        yield from strip(i_lower, i_upper, j_lower, j_upper)
        is_asc = not is_asc


def btilly_solution2(n: int, cache_size: int):
    k = cache_size - 2
    for top in range(0, n, k):
        bottom = top + k
        # Diagonal part.
        for y in range(top, min(bottom, n)):  # Y-axis Top to Bottom
            for x in range(y + 1, min(bottom, n)):  # X-axis Left to Right
                yield y, x
        # Strip part.
        # Stripping right to left works well when cache_size is very small, but makes little difference when it is not.
        for x in range(n - 1, bottom - 1, -1):  # X-axis Right to Left
            for y in range(top, min(bottom, n)):  # Y-axis Top to Bottom
                yield y, x


def btilly_solution3(n: int, cache_size: int):
    k = cache_size - 2
    r = range(n)
    for i in r[::k]:
        yield from combinations(r[i : i + k], 2)
        yield from product(r[i + k :], r[i : i + k])


def btilly_solution4(n: int, cache_size: int):
    def parts():
        k = cache_size - 2
        r = range(n)
        for i in r[::k]:
            yield combinations(r[i : i + k], 2)
            yield product(r[i + k :], r[i : i + k])

    return chain.from_iterable(parts())


def plot(df, series, ignore, y, label, title):
    df = df[df["name"].isin(series)]
    # plt.figure(figsize=(10, 10))
    for name, group in df.groupby("name"):
        plt.plot(group["n"], group[y], label=name)

    y_max = df[~df["name"].isin(ignore)][y].max()
    plt.ylim(0, y_max * 1.1)

    plt.xlabel("n")
    plt.ylabel(label)
    plt.title(title)
    plt.legend(loc="upper left")
    plt.tight_layout()
    plt.grid()
    plt.show()


def run(func, n, cache_ratio, output_dir: Path):
    cache_size = int(n * cache_ratio / 100)
    output_path = output_dir / f"{n}_{cache_ratio}_{func.__name__}.csv"
    if output_path.exists():
        return

    started = time.perf_counter()
    for a, b in func(n, cache_size):
        pass
    elapsed_iterate = time.perf_counter() - started

    # test_combinations(func(n, cache_size), n)

    started = time.perf_counter()
    cache = LRUCacheWithCounter(cache_size)
    basic_loop(iterator=func(n, cache_size), cached_load=cache)
    elapsed_cache = time.perf_counter() - started

    output_path.write_text(f"{func.__name__},{n},{cache_ratio},{cache_size},{cache.load_count},{elapsed_iterate},{elapsed_cache}")


def add_lower_bound(df):
    def calc_lower_bound(ni, mi):
        n = ni
        m = n * mi // 100
        return m + math.ceil((math.comb(n, 2) - math.comb(m, 2)) / (m - 1))

    return pd.concat(
        [
            df,
            pd.DataFrame(
                [
                    {"name": "lower_bound", "n": ni, "m": mi, "count": calc_lower_bound(ni, mi)}
                    for ni, mi in itertools.product(df["n"].unique(), df["m"].unique())
                ]
            ),
        ]
    )


def benchmark(output_dir: Path):
    log_dir = output_dir / "log"
    log_dir.mkdir(parents=True, exist_ok=True)

    candidates = [
        baseline,
        prioritizes,
        Matt_solution,
        Kelly_solution,
        Kelly_solution2,
        btilly_solution,
        btilly_solution2,
        btilly_solution3,
        btilly_solution4,
    ]

    nc = np.linspace(100, 500, num=9).astype(int)
    # nc = np.linspace(500, 10000, num=9).astype(int)[1:]
    # nc = np.linspace(10000, 100000, num=9).astype(int).tolist()[1:]
    print(nc)

    mc = np.linspace(10, 50, num=2).astype(int)
    print(mc)

    joblib.Parallel(n_jobs=1, verbose=5, batch_size=1)([joblib.delayed(run)(func, ni, mi, log_dir) for ni in nc for mi in mc for func in candidates])


def plot_graphs(output_dir: Path):
    log_dir = output_dir / "log"

    results = []
    for path in log_dir.glob("*.csv"):
        results.append(path.read_text().strip())
    (output_dir / "stat.csv").write_text("\n".join(results))

    df = pd.read_csv(output_dir / "stat.csv", header=None, names=["name", "n", "m", "size", "count", "time", "time_full"])
    df = add_lower_bound(df)
    df = df.sort_values(["name", "n", "m"])

    for m in [10, 50]:
        plot(
            df[df["m"] == m],
            series=[
                baseline.__name__,
                prioritizes.__name__,
                Matt_solution.__name__,
                Kelly_solution.__name__,
                Kelly_solution2.__name__,
                btilly_solution.__name__,
                "lower_bound",
            ],
            ignore=[
                baseline.__name__,
                prioritizes.__name__,
            ],
            y="count",
            label="load count",
            title=f"cache_size = {m}% of N",
        )

    plot(
        df[df["m"] == 10],
        series=[
            baseline.__name__,
            prioritizes.__name__,
            Matt_solution.__name__,
            Kelly_solution.__name__,
            Kelly_solution2.__name__,
            btilly_solution.__name__,
            btilly_solution2.__name__,
            btilly_solution3.__name__,
            btilly_solution4.__name__,
        ],
        ignore=[
            prioritizes.__name__,
            Matt_solution.__name__,
        ],
        y="time",
        label="time (sec)",
        title=f"cache_size = {10}% of N",
    )


class LRUCacheForTest:
    def __init__(self, maxsize: int):
        self.cache = OrderedDict()
        self.maxsize = maxsize
        self.load_count = 0

    def __call__(self, key: int) -> int:
        if key in self.cache:
            value = self.cache[key]
            self.cache.move_to_end(key)
        else:
            if len(self.cache) == self.maxsize:
                self.cache.popitem(last=False)
            value = load_item(key)
            self.cache[key] = value
            self.load_count += 1
        return value

    def hit(self, i, j):
        count = int(i in self.cache)
        self(i)
        count += int(j in self.cache)
        self(j)
        return count


def visualize():
    # Taken from https://stackoverflow.com/a/77024514/18125313 and modified.
    n, m = 100, 30
    func = btilly_solution2

    pairs = func(n, m)
    cache = LRUCacheForTest(m)

    # Create the images, save as animated png.
    images = []
    s = 5
    img = Image.new("RGB", (s * n, s * n), (255, 255, 255))
    draw = ImageDraw.Draw(img)

    colors = [(255, 0, 0), (255, 255, 0), (0, 255, 0)]
    for step, (i, j) in enumerate(pairs):
        draw.rectangle((s * j, s * i, s * j + s - 2, s * i + s - 2), colors[cache.hit(i, j)])
        if not step % 17:
            images.append(img.copy())

    images += [img] * 40

    images[0].save(f"{func.__name__}_{m}.gif", save_all=True, append_images=images[1:], optimize=False, duration=30, loop=0)


def test_combinations(iterator: Iterable[Tuple[int, int]], n: int):
    # Note that this function is not suitable for large N.
    expected = set(frozenset(pair) for pair in itertools.combinations(range(n), 2))
    items = list(iterator)
    actual = set(frozenset(pair) for pair in items)
    assert len(actual) == len(items), f"{[item for item, count in Counter(items).items() if count > 1]}"
    assert actual == expected, f"dup={actual - expected}, missing={expected - actual}"


def test():
    n = 100  # N
    cache_size = 30  # M

    def run(func):
        func(n, cache_size)

        # Measure generation performance.
        started = time.perf_counter()
        for a, b in func(n, cache_size):
            pass
        elapsed = time.perf_counter() - started

        # Test generated combinations.
        test_combinations(func(n, cache_size), n)

        # Measure cache hit (load count) performance.
        cache = LRUCacheWithCounter(cache_size)
        _ = basic_loop(iterator=func(n, cache_size), cached_load=cache)
        print(f"{func.__name__}: {cache.load_count=}, {elapsed=}")

    candidates = [
        baseline,
        prioritizes,
        Matt_solution,
        Kelly_solution,
        Kelly_solution2,
        btilly_solution,
        btilly_solution2,
        btilly_solution3,
        btilly_solution4,
    ]
    for f in candidates:
        run(f)


def main():
    test()
    visualize()

    output_dir = Path("./temp2")
    benchmark(output_dir)
    plot_graphs(output_dir)


if __name__ == "__main__":
    main()

I have no problem with you not using the above test code or changing the behavior of basic_loop or LRUCacheWithCounter.

Additional Note:

  • The score calculation cannot be pruned using neighbor scores.
  • The score calculation cannot be pruned using only a portion of the item.
  • It is impossible to guess where the best combination will be.
  • Using faster media is one option, but I'm already at my limit, so I'm looking for a software solution.

Thank you for reading this long post to the end.


Edit:

Thanks to btilly's answer and help with Kelly's visualization, I have come to the conclusion that btilly's solution is the best and (possibly) optimal one.

Here is a theoretical explanation (although I am not very good at math, so it could be wrong).


Let N represent the number of indexes, M the cache size, and C the number of combinations (same as math.comb).

Consider a situation where the cache is full and no further combinations can be generated without loading. If we add a new index at this point, the only combinations that can be generated are combinations of the newly added index and the remaining indexes in the cache. This pattern holds for each subsequent iteration. Hence, while the cache is full, the maximum number of combinations can be generated per load is M - 1.

This logic holds if the cache isn't full as well. If M' indexes are currently in the cache, then the next index can generate at most M' combinations. The subsequent index can generate at most M' + 1 combinations, and so forth. In total, at most C(M,2) combinations can be generated before the cache is full.

Thus, to generate C(N,2) combinations, at least M loads are required to fill the cache, at least (C(N,2) - C(M,2)) / (M - 1) loads are required after the cache is filled.

From above, the load counts complexity of this problem is Ω(N^2 / M).


I have plotted this formula as a lower_bound in the graphs below. Note that it is only a lower bound and no guarantee that it can actually be achieved.

As an aside, Kelly's solution needs to configure k to maximize its performance. For M = 50% of N, it's about M * 2/3. For M = 30% of N, it's about M * 5/6. Although I couldn't figure out how to calculate it. As a general configuration, I use k = M - 2 (which is not best, but relatively good) in the Kelly_solution2 in the graphs below.

For M = 10% of N:

n_to_load_count_graph_10

For M = 50% of N:

n_to_load_count_graph_50

Note that, in these graphs, it looks like O(N), but this is because I determined M based on N. When M does not change, it is O(N^2) as described above.

Here is an animation visualizing the cache hit rate of btilly_solution2, composed by a modified version of Kelly's code. Each pixel represents a combination, with red representing combinations where both indexes are loaded, yellow where one index is loaded, and green where neither index is loaded.

visualization_of_btilly_solution2

In addition, since I'm looking for the optimal sequence, execution time doesn't matter much. But just in case anyone is curious, here is a comparison of execution times (iteration only).

n_to_time_graph

btilly_solution4 (btilly's solution modified by Kelly) is almost as fast as itertools.combinations, which should be optimal in this case. Note, however, that even without the modification, it took only 112 nanoseconds per combination.

That's it. Thanks to everyone involved.


Solution

  • Here is a simple approach that depends on the cache and gets 230 on your benchmark.

    def diagonal_block (lower, upper):
        for i in range(lower, upper + 1):
            for j in range(i, upper + 1):
                yield (i, j)
    
    def strip (i_lower, i_upper, j_lower, j_upper):
        for i in range(i_lower, i_upper+1):
            for j in range (j_lower, j_upper + 1):
                yield (i, j)
    
    # def your_solution_here(n: int, cache_size: int) -> Iterable[Tuple[int, int]]:
    def your_solution_here(n: int, cache_size: int):
        i_lower = 0
        i_upper = n-1
        k = cache_size - 2
        is_asc = True
        while i_lower <= i_upper:
            # Handle a k*k block first. At the end that is likely loaded.
            if is_asc:
                upper = min(i_lower + k - 1, i_upper)
                yield from diagonal_block(i_lower, upper)
                j_lower = i_lower
                j_upper = upper
                i_lower = upper + 1
            else:
                lower = max(i_lower, i_upper - k + 1)
                yield from diagonal_block(lower, i_upper)
                j_lower = lower
                j_upper = i_upper
                i_upper = lower - 1
            yield from strip(i_lower, i_upper, j_lower, j_upper)
            is_asc = not is_asc
    

    A comment about how I thought this one up.

    We want to compare a group of objects with every other uncompared object. The group should be everything that fits in the cache except one.

    So we start with the first k objects, compare them with each other, then just proceed along in a strip to the end.

    And now we need our second group. Well, we already have the last object, and we don't need the rest. So we take k objects from the end, make that a group. Compare the group with itself, then proceed along a strip to the first object outside of our original group.

    Now reverse direction, and so on.

    At all points, i_lower represents the first object still needing comparing, and i_upper represents the last. If we're going forward, we take k objects starting at i_lower. If we're going backwards we take k objects starting at i_upper and go backwards.

    When I was implementing it, there were two complications. The first is that we have to worry about the edge condition when we meet in the middle. The second is that we might have to do the strip in 2 directions.

    I chose to only do the strip ascending. This is actually a bug. On most of the ascending loads, I did not get the first element in my cache. Oops. But it is still pretty good.