PS: the following describes a hypothetical scenario where I own a company that sells things to customers.
I have an Ecto query whose result set is so big that my machine cannot handle it. With billions of results returned, there is probably not enough RAM in the world to hold it all.
The solution here (or so my research indicates) is to use streams. Streams were made for potentially infinite sets of results, which would fit my use case.
https://hexdocs.pm/ecto/Ecto.Repo.html#c:stream/2
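For illustration, here is a minimal sketch of how Repo.stream/2 is used (the Users schema is just a placeholder): the stream is lazy and may only be enumerated inside a transaction, fetching rows from the database in batches.

import Ecto.Query

# Minimal sketch: Repo.stream/2 returns a lazy stream that may only be
# enumerated inside a transaction. Rows are fetched from the database
# in batches of :max_rows (500 by default), not all at once.
Repo.transaction(fn ->
  Users
  |> select([u], u.id)
  |> Repo.stream()
  |> Enum.each(&IO.inspect/1)
end)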
So let's imagine that I want to delete all users that bought a given item. Maybe that item was not really legal in their country, and now I, the poor guy in IT, have to fix things so the world doesn't come crashing down.
Naive way:
item_id = "123asdasd123"
purchase_ids =
Purchases
|> where([p], p.item_id == ^item_id)
|> select([p], p.id)
|> Repo.all()
Users
|> where([u], u.purchase_id in ^purchase_ids)
|> Repo.delete_all()
This is the naive way. I call it naive because of 2 issues:

1. purchase_ids comes straight from Repo.all, so the entire id list from the first query is loaded into memory at once, which is exactly what I cannot afford.
2. purchase_ids will likely have more than 100K ids, so the second query (where we delete things) will fail as it hits Postgres' parameter limit of 32K: https://stackoverflow.com/a/42251312/1337392 (a sketch of manual batching appears just below).

What can I say, our product is highly addictive and very well priced! Our customers simply can't get enough of it. Don't know why. Nope. No reason comes to mind. None at all.
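For completeness, manual batching could look something like this minimal sketch, assuming the purchase_ids list from the naive version (the batch size of 10_000 is arbitrary, just safely under the parameter limit). Note it only addresses the second issue; the full id list still sits in memory:

purchase_ids
|> Enum.chunk_every(10_000)    # split the ids into bounded batches
|> Enum.each(fn batch ->
  Users
  |> where([u], u.purchase_id in ^batch)
  |> Repo.delete_all()         # one bounded-size query per batch
end)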
With these problems in mind, I cannot help my customers and grow my [s]empire[/s], I mean, little home-owned business.
I did find this possible solution:
Stream way:
item_id = "123asdasd123"
purchase_ids =
Purchases
|> where([p], p.item_id == ^item_id)
|> select([p], p.id)
stream = Repo.stream(purchase_ids)
Repo.transaction(fn ->
ids = Enum.to_list(stream)
Users
|> where([u], u.purchase_id in ^ids)
|> Repo.delete_all()
end)
However, I am not convinced this will work:

1. I am calling Enum.to_list and saving everything into a variable, placing everything into memory again. So I am not gaining any advantage by using Repo.stream.
2. I still need to batch ids somehow, or my Repo.delete_all will blow up on the parameter limit just like before.

I guess the one advantage here is that this is now a transaction, so either everything goes or nothing goes.
So, the following questions arise:

1. How can I use streams in this scenario?
2. Is there a way to have the ids batched automatically, or do I have to manually batch them?
3. Can a stream be fed directly into Repo.delete_all?

One cannot directly feed Repo.delete_all/1 with a stream, but Stream.chunk_every/2 is your friend here. One can do something like the below (500 is the default value for :max_rows, hence we'd use it in chunk_every/2 as well):
Repo.transaction(fn ->
max_rows = 500
purchase_ids
|> Repo.stream(max_rows: max_rows)
|> Stream.chunk_every(max_rows)
|> Stream.each(fn ids ->
Users
|> where([u], u.purchase_id in ^ids)
|> Repo.delete_all()
end)
|> Stream.run()
end, timeout: :infinity)
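Two notes on the snippet above: Stream.each/2 is lazy, so the final Stream.run/1 is what actually forces the pipeline to execute (an eager Enum.each/2 would need no Stream.run/1). And timeout: :infinity is passed because everything runs inside a single transaction; the default timeout of 15 seconds would likely be exceeded when deleting this many rows.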