My question is simple, but I find it difficult to get the point straight, so please allow me to explain step by step.
Suppose I have N
items and N
corresponding indices.
Each item can be loaded using the corresponding index.
def load_item(index: int) -> ItemType:
# Mostly just reading, but very slow.
return item
Also I have a function that takes two (loaded) items and calculates a score.
def calc_score(item_a: ItemType, item_b: ItemType) -> ScoreType:
# Much faster than load function.
return score
Note that calc_score(a, b) == calc_score(b, a)
.
What I want to do is calculate the score for all 2-item combinations and find (at least) one combination that gives the maximum score.
This can be implemented as follows:
def dumb_solution(n: int) -> Tuple[int, int]:
best_score = 0
best_combination = None
for index_a, index_b in itertools.combinations(range(n), 2):
item_a = load_item(index_a)
item_b = load_item(index_b)
score = calc_score(item_a, item_b)
if score > best_score:
best_score = score
best_combination = (index_a, index_b)
return best_combination
However, this solution calls the load_item
function 2*C(N,2) = N*(N-1)
times, which is the bottleneck for this function.
This can be resolved by using a cache. Unfortunately, however, the items are so large that it is impossible to keep all items in memory. Therefore, we need to use a size-limited cache.
from functools import lru_cache
@lru_cache(maxsize=M)
def load(index: int) -> ItemType:
# Very slow process.
return item
Note that M
(cache size) is much smaller than N
(approx. N // 10
to N // 2
).
The problem is that the typical sequence of combinations is not ideal for the LRU cache.
For instance, when N=6, M=3
, itertools.combinations
generates the following sequence, and the number of calls of the load_item
function is 17.
[
(0, 1), # 1, 2
(0, 2), # -, 3
(0, 3), # -, 4
(0, 4), # -, 5
(0, 5), # -, 6
(1, 2), # 7, 8
(1, 3), # -, 9
(1, 4), # -, 10
(1, 5), # -, 11
(2, 3), # 12, 13
(2, 4), # -, 14
(2, 5), # -, 15
(3, 4), # 16, 17
(3, 5), # -, -
(4, 5), # -, -
]
However, if I rearrange the above sequence as follows, the number of calls will be 10.
[
(0, 1), # 1, 2
(0, 2), # -, 3
(1, 2), # -, -
(0, 3), # -, 4
(2, 3), # -, -
(0, 4), # -, 5
(3, 4), # -, -
(0, 5), # -, 6
(4, 5), # -, -
(1, 4), # 7, -
(1, 5), # -, -
(1, 3), # -, 8
(3, 5), # -, -
(2, 5), # 9, -
(2, 4), # -, 10
]
How can I generate a sequence of 2-item combinations that maximizes the cache hit rate?
The solution I came up with is to prioritize items that are already in the cache.
from collections import OrderedDict
def prioritizes_item_already_in_cache(n, cache_size):
items = list(itertools.combinations(range(n), 2))
cache = OrderedDict()
reordered = []
def update_cache(x, y):
cache[x] = cache[y] = None
cache.move_to_end(x)
cache.move_to_end(y)
while len(cache) > cache_size:
cache.popitem(last=False)
while items:
# Find a pair where both are cached.
for i, (a, b) in enumerate(items):
if a in cache and b in cache:
reordered.append((a, b))
update_cache(a, b)
del items[i]
break
else:
# Find a pair where one of them is cached.
for i, (a, b) in enumerate(items):
if a in cache or b in cache:
reordered.append((a, b))
update_cache(a, b)
del items[i]
break
else:
# Cannot find item in cache.
a, b = items.pop(0)
reordered.append((a, b))
update_cache(a, b)
return reordered
For N=100, M=10
, this sequence resulted in 1660 calls, which is about 1/3 of the typical sequence. For N=100, M=50
there are only 155 calls. So I think I can say that this is a promising approach.
Unfortunately, this function is too slow and useless for large N
.
I was not able to finish for N=1000
, but the actual data is in the tens of thousands.
Also, it does not take into account how to select an item when no cached item is found.
Therefore, even if it is fast, it is doubtful that it is theoretically the best solution (so please note my question is not how to make the above function faster).
(Edited) Here is the complete code including everyone's answers and the test and benchmark code.
import functools
import itertools
import math
import time
from collections import Counter, OrderedDict
from itertools import chain, combinations, product
from pathlib import Path
from typing import Callable, Iterable, Tuple
import joblib
import matplotlib.pyplot as plt
import numpy as np
import pandas as pd
from PIL import Image, ImageDraw
ItemType = int
ScoreType = int
def load_item(index: int) -> ItemType:
return int(index)
def calc_score(item_a: ItemType, item_b: ItemType) -> ScoreType:
return abs(item_a - item_b)
class LRUCacheWithCounter:
def __init__(self, maxsize: int):
def wrapped_func(key):
self.load_count += 1
return load_item(key)
self.__cache = functools.lru_cache(maxsize=maxsize)(wrapped_func)
self.load_count = 0
def __call__(self, key: int) -> int:
return self.__cache(key)
def basic_loop(iterator: Iterable[Tuple[int, int]], cached_load: Callable[[int], int]):
best_score = 0
best_combination = None
for i, j in iterator:
a = cached_load(i)
b = cached_load(j)
score = calc_score(a, b)
if score > best_score:
best_score = score
best_combination = (i, j)
return best_score, best_combination
def baseline(n, _):
return itertools.combinations(range(n), 2)
def prioritizes(n, cache_size):
items = list(itertools.combinations(range(n), 2))
cache = OrderedDict()
reordered = []
def update_cache(x, y):
cache[x] = cache[y] = None
cache.move_to_end(x)
cache.move_to_end(y)
while len(cache) > cache_size:
cache.popitem(last=False)
while items:
# Find a pair where both are cached.
for i, (a, b) in enumerate(items):
if a in cache and b in cache:
reordered.append((a, b))
update_cache(a, b)
del items[i]
break
else:
# Find a pair where one of them is cached.
for i, (a, b) in enumerate(items):
if a in cache or b in cache:
reordered.append((a, b))
update_cache(a, b)
del items[i]
break
else:
# Cannot find item in cache.
a, b = items.pop(0)
reordered.append((a, b))
update_cache(a, b)
return reordered
def Matt_solution(n: int, cache_size: int) -> Iterable[Tuple[int, int]]:
dest = []
def findPairs(lo1: int, n1: int, lo2: int, n2: int):
if n1 < 1 or n2 < 1:
return
if n1 == 1:
for i in range(max(lo1 + 1, lo2), lo2 + n2):
dest.append((lo1, i))
elif n2 == 1:
for i in range(lo1, min(lo1 + n1, lo2)):
dest.append((i, lo2))
elif n1 >= n2:
half = n1 // 2
findPairs(lo1, half, lo2, n2)
findPairs(lo1 + half, n1 - half, lo2, n2)
else:
half = n2 // 2
findPairs(lo1, n1, lo2, half)
findPairs(lo1, n1, lo2 + half, n2 - half)
findPairs(0, n, 0, n)
return dest
def Kelly_solution(n: int, cache_size: int) -> Iterable[Tuple[int, int]]:
k = cache_size // 2
r = range(n)
return chain.from_iterable(combinations(r[i : i + k], 2) if i == j else product(r[i : i + k], r[j : j + k]) for i in r[::k] for j in r[i::k])
def Kelly_solution2(n: int, cache_size: int) -> Iterable[Tuple[int, int]]:
k = cache_size - 2
r = range(n)
return chain.from_iterable(combinations(r[i : i + k], 2) if i == j else product(r[i : i + k], r[j : j + k]) for i in r[::k] for j in r[i::k])
def diagonal_block(lower, upper):
for i in range(lower, upper + 1):
for j in range(i + 1, upper + 1):
yield i, j
def strip(i_lower, i_upper, j_lower, j_upper):
for i in range(i_lower, i_upper + 1):
for j in range(j_lower, j_upper + 1):
yield i, j
def btilly_solution(n: int, cache_size: int):
i_lower = 0
i_upper = n - 1
k = cache_size - 2
is_asc = True
while i_lower <= i_upper:
# Handle a k*k block first. At the end that is likely loaded.
if is_asc:
upper = min(i_lower + k - 1, i_upper)
yield from diagonal_block(i_lower, upper)
j_lower = i_lower
j_upper = upper
i_lower = upper + 1
else:
lower = max(i_lower, i_upper - k + 1)
yield from diagonal_block(lower, i_upper)
j_lower = lower
j_upper = i_upper
i_upper = lower - 1
yield from strip(i_lower, i_upper, j_lower, j_upper)
is_asc = not is_asc
def btilly_solution2(n: int, cache_size: int):
k = cache_size - 2
for top in range(0, n, k):
bottom = top + k
# Diagonal part.
for y in range(top, min(bottom, n)): # Y-axis Top to Bottom
for x in range(y + 1, min(bottom, n)): # X-axis Left to Right
yield y, x
# Strip part.
# Stripping right to left works well when cache_size is very small, but makes little difference when it is not.
for x in range(n - 1, bottom - 1, -1): # X-axis Right to Left
for y in range(top, min(bottom, n)): # Y-axis Top to Bottom
yield y, x
def btilly_solution3(n: int, cache_size: int):
k = cache_size - 2
r = range(n)
for i in r[::k]:
yield from combinations(r[i : i + k], 2)
yield from product(r[i + k :], r[i : i + k])
def btilly_solution4(n: int, cache_size: int):
def parts():
k = cache_size - 2
r = range(n)
for i in r[::k]:
yield combinations(r[i : i + k], 2)
yield product(r[i + k :], r[i : i + k])
return chain.from_iterable(parts())
def plot(df, series, ignore, y, label, title):
df = df[df["name"].isin(series)]
# plt.figure(figsize=(10, 10))
for name, group in df.groupby("name"):
plt.plot(group["n"], group[y], label=name)
y_max = df[~df["name"].isin(ignore)][y].max()
plt.ylim(0, y_max * 1.1)
plt.xlabel("n")
plt.ylabel(label)
plt.title(title)
plt.legend(loc="upper left")
plt.tight_layout()
plt.grid()
plt.show()
def run(func, n, cache_ratio, output_dir: Path):
cache_size = int(n * cache_ratio / 100)
output_path = output_dir / f"{n}_{cache_ratio}_{func.__name__}.csv"
if output_path.exists():
return
started = time.perf_counter()
for a, b in func(n, cache_size):
pass
elapsed_iterate = time.perf_counter() - started
# test_combinations(func(n, cache_size), n)
started = time.perf_counter()
cache = LRUCacheWithCounter(cache_size)
basic_loop(iterator=func(n, cache_size), cached_load=cache)
elapsed_cache = time.perf_counter() - started
output_path.write_text(f"{func.__name__},{n},{cache_ratio},{cache_size},{cache.load_count},{elapsed_iterate},{elapsed_cache}")
def add_lower_bound(df):
def calc_lower_bound(ni, mi):
n = ni
m = n * mi // 100
return m + math.ceil((math.comb(n, 2) - math.comb(m, 2)) / (m - 1))
return pd.concat(
[
df,
pd.DataFrame(
[
{"name": "lower_bound", "n": ni, "m": mi, "count": calc_lower_bound(ni, mi)}
for ni, mi in itertools.product(df["n"].unique(), df["m"].unique())
]
),
]
)
def benchmark(output_dir: Path):
log_dir = output_dir / "log"
log_dir.mkdir(parents=True, exist_ok=True)
candidates = [
baseline,
prioritizes,
Matt_solution,
Kelly_solution,
Kelly_solution2,
btilly_solution,
btilly_solution2,
btilly_solution3,
btilly_solution4,
]
nc = np.linspace(100, 500, num=9).astype(int)
# nc = np.linspace(500, 10000, num=9).astype(int)[1:]
# nc = np.linspace(10000, 100000, num=9).astype(int).tolist()[1:]
print(nc)
mc = np.linspace(10, 50, num=2).astype(int)
print(mc)
joblib.Parallel(n_jobs=1, verbose=5, batch_size=1)([joblib.delayed(run)(func, ni, mi, log_dir) for ni in nc for mi in mc for func in candidates])
def plot_graphs(output_dir: Path):
log_dir = output_dir / "log"
results = []
for path in log_dir.glob("*.csv"):
results.append(path.read_text().strip())
(output_dir / "stat.csv").write_text("\n".join(results))
df = pd.read_csv(output_dir / "stat.csv", header=None, names=["name", "n", "m", "size", "count", "time", "time_full"])
df = add_lower_bound(df)
df = df.sort_values(["name", "n", "m"])
for m in [10, 50]:
plot(
df[df["m"] == m],
series=[
baseline.__name__,
prioritizes.__name__,
Matt_solution.__name__,
Kelly_solution.__name__,
Kelly_solution2.__name__,
btilly_solution.__name__,
"lower_bound",
],
ignore=[
baseline.__name__,
prioritizes.__name__,
],
y="count",
label="load count",
title=f"cache_size = {m}% of N",
)
plot(
df[df["m"] == 10],
series=[
baseline.__name__,
prioritizes.__name__,
Matt_solution.__name__,
Kelly_solution.__name__,
Kelly_solution2.__name__,
btilly_solution.__name__,
btilly_solution2.__name__,
btilly_solution3.__name__,
btilly_solution4.__name__,
],
ignore=[
prioritizes.__name__,
Matt_solution.__name__,
],
y="time",
label="time (sec)",
title=f"cache_size = {10}% of N",
)
class LRUCacheForTest:
def __init__(self, maxsize: int):
self.cache = OrderedDict()
self.maxsize = maxsize
self.load_count = 0
def __call__(self, key: int) -> int:
if key in self.cache:
value = self.cache[key]
self.cache.move_to_end(key)
else:
if len(self.cache) == self.maxsize:
self.cache.popitem(last=False)
value = load_item(key)
self.cache[key] = value
self.load_count += 1
return value
def hit(self, i, j):
count = int(i in self.cache)
self(i)
count += int(j in self.cache)
self(j)
return count
def visualize():
# Taken from https://stackoverflow.com/a/77024514/18125313 and modified.
n, m = 100, 30
func = btilly_solution2
pairs = func(n, m)
cache = LRUCacheForTest(m)
# Create the images, save as animated png.
images = []
s = 5
img = Image.new("RGB", (s * n, s * n), (255, 255, 255))
draw = ImageDraw.Draw(img)
colors = [(255, 0, 0), (255, 255, 0), (0, 255, 0)]
for step, (i, j) in enumerate(pairs):
draw.rectangle((s * j, s * i, s * j + s - 2, s * i + s - 2), colors[cache.hit(i, j)])
if not step % 17:
images.append(img.copy())
images += [img] * 40
images[0].save(f"{func.__name__}_{m}.gif", save_all=True, append_images=images[1:], optimize=False, duration=30, loop=0)
def test_combinations(iterator: Iterable[Tuple[int, int]], n: int):
# Note that this function is not suitable for large N.
expected = set(frozenset(pair) for pair in itertools.combinations(range(n), 2))
items = list(iterator)
actual = set(frozenset(pair) for pair in items)
assert len(actual) == len(items), f"{[item for item, count in Counter(items).items() if count > 1]}"
assert actual == expected, f"dup={actual - expected}, missing={expected - actual}"
def test():
n = 100 # N
cache_size = 30 # M
def run(func):
func(n, cache_size)
# Measure generation performance.
started = time.perf_counter()
for a, b in func(n, cache_size):
pass
elapsed = time.perf_counter() - started
# Test generated combinations.
test_combinations(func(n, cache_size), n)
# Measure cache hit (load count) performance.
cache = LRUCacheWithCounter(cache_size)
_ = basic_loop(iterator=func(n, cache_size), cached_load=cache)
print(f"{func.__name__}: {cache.load_count=}, {elapsed=}")
candidates = [
baseline,
prioritizes,
Matt_solution,
Kelly_solution,
Kelly_solution2,
btilly_solution,
btilly_solution2,
btilly_solution3,
btilly_solution4,
]
for f in candidates:
run(f)
def main():
test()
visualize()
output_dir = Path("./temp2")
benchmark(output_dir)
plot_graphs(output_dir)
if __name__ == "__main__":
main()
I have no problem with you not using the above test code or changing the behavior of basic_loop
or LRUCacheWithCounter
.
Additional Note:
Thank you for reading this long post to the end.
Thanks to btilly's answer and help with Kelly's visualization, I have come to the conclusion that btilly's solution is the best and (possibly) optimal one.
Here is a theoretical explanation (although I am not very good at math, so it could be wrong).
Let N
represent the number of indexes, M
the cache size, and C
the number of combinations (same as math.comb
).
Consider a situation where the cache is full and no further combinations can be generated without loading.
If we add a new index at this point, the only combinations that can be generated are combinations of the newly added index and the remaining indexes in the cache.
This pattern holds for each subsequent iteration.
Hence, while the cache is full, the maximum number of combinations can be generated per load is M - 1
.
This logic holds if the cache isn't full as well.
If M'
indexes are currently in the cache, then the next index can generate at most M'
combinations.
The subsequent index can generate at most M' + 1
combinations, and so forth.
In total, at most C(M,2)
combinations can be generated before the cache is full.
Thus, to generate C(N,2)
combinations, at least M
loads are required to fill the cache, at least (C(N,2) - C(M,2)) / (M - 1)
loads are required after the cache is filled.
From above, the load counts complexity of this problem is Ω(N^2 / M)
.
I have plotted this formula as a lower_bound
in the graphs below.
Note that it is only a lower bound and no guarantee that it can actually be achieved.
As an aside, Kelly's solution needs to configure k
to maximize its performance.
For M = 50% of N
, it's about M * 2/3
.
For M = 30% of N
, it's about M * 5/6
.
Although I couldn't figure out how to calculate it.
As a general configuration, I use k = M - 2
(which is not best, but relatively good) in the Kelly_solution2
in the graphs below.
For M = 10% of N
:
For M = 50% of N
:
Note that, in these graphs, it looks like O(N)
, but this is because I determined M
based on N
. When M
does not change, it is O(N^2)
as described above.
Here is an animation visualizing the cache hit rate of btilly_solution2
, composed by a modified version of Kelly's code.
Each pixel represents a combination, with red representing combinations where both indexes are loaded, yellow where one index is loaded, and green where neither index is loaded.
In addition, since I'm looking for the optimal sequence, execution time doesn't matter much. But just in case anyone is curious, here is a comparison of execution times (iteration only).
btilly_solution4
(btilly's solution modified by Kelly) is almost as fast as itertools.combinations
, which should be optimal in this case.
Note, however, that even without the modification, it took only 112 nanoseconds per combination.
That's it. Thanks to everyone involved.
Here is a simple approach that depends on the cache and gets 230 on your benchmark.
def diagonal_block (lower, upper):
for i in range(lower, upper + 1):
for j in range(i, upper + 1):
yield (i, j)
def strip (i_lower, i_upper, j_lower, j_upper):
for i in range(i_lower, i_upper+1):
for j in range (j_lower, j_upper + 1):
yield (i, j)
# def your_solution_here(n: int, cache_size: int) -> Iterable[Tuple[int, int]]:
def your_solution_here(n: int, cache_size: int):
i_lower = 0
i_upper = n-1
k = cache_size - 2
is_asc = True
while i_lower <= i_upper:
# Handle a k*k block first. At the end that is likely loaded.
if is_asc:
upper = min(i_lower + k - 1, i_upper)
yield from diagonal_block(i_lower, upper)
j_lower = i_lower
j_upper = upper
i_lower = upper + 1
else:
lower = max(i_lower, i_upper - k + 1)
yield from diagonal_block(lower, i_upper)
j_lower = lower
j_upper = i_upper
i_upper = lower - 1
yield from strip(i_lower, i_upper, j_lower, j_upper)
is_asc = not is_asc
A comment about how I thought this one up.
We want to compare a group of objects with every other uncompared object. The group should be everything that fits in the cache except one.
So we start with the first k
objects, compare them with each other, then just proceed along in a strip to the end.
And now we need our second group. Well, we already have the last object, and we don't need the rest. So we take k
objects from the end, make that a group. Compare the group with itself, then proceed along a strip to the first object outside of our original group.
Now reverse direction, and so on.
At all points, i_lower
represents the first object still needing comparing, and i_upper
represents the last. If we're going forward, we take k
objects starting at i_lower
. If we're going backwards we take k
objects starting at i_upper
and go backwards.
When I was implementing it, there were two complications. The first is that we have to worry about the edge condition when we meet in the middle. The second is that we might have to do the strip in 2 directions.
I chose to only do the strip ascending. This is actually a bug. On most of the ascending loads, I did not get the first element in my cache. Oops. But it is still pretty good.