I'm writing a worker to add lots of users to a group. I'm wondering whether it's better to run one big task that handles all the users, to batch them (say 100 users per task), or to process them one by one, one task per user.
For the moment, here is my code:
class AddUsersToGroupWorker
  include Sidekiq::Worker
  sidekiq_options :queue => :group_utility

  def perform(store_id, group_id, user_ids_to_add)
    begin
      store = Store.find store_id
      group = Group.find group_id
    rescue ActiveRecord::RecordNotFound => e
      Airbrake.notify e
      return
    end

    users_to_process = store.users.where(id: user_ids_to_add)
                            .where.not(id: group.user_ids)
    group.users += users_to_process

    users_to_process.map(&:id).each do |user_to_process_id|
      UpdateLastUpdatesForUserWorker.perform_async store.id, user_to_process_id
    end
  end
end
Maybe it's better to have something like this in my method:
def add_users
  users_to_process = store.users.where(id: user_ids_to_add)
                          .where.not(id: group.user_ids)

  users_to_process.map(&:id).each do |user_to_process_id|
    AddUserToGroupWorker.perform_async group_id, user_to_process_id
    UpdateLastUpdatesForUserWorker.perform_async store.id, user_to_process_id
  end
end
But that means so many find requests. What do you think?
I have a Sidekiq Pro license if needed (for batches, for example).
Here are my thoughts.
1. Do a single SQL query instead of N queries
This line: group.users += users_to_process is likely to produce N SQL queries (where N is users_to_process.count). I assume that you have a many-to-many association between users and groups (with a groups_users join table/model), so you should use a mass-insert technique:
users_to_process_ids = store.users.where(id: user_ids_to_add)
                            .where.not(id: group.user_ids)
                            .pluck(:id)
sql_values = users_to_process_ids.map { |i| "(#{i.to_i}, #{group.id.to_i}, NOW(), NOW())" }
Group.connection.execute("
  INSERT INTO groups_users (user_id, group_id, created_at, updated_at)
  VALUES #{sql_values.join(",")}
")
Yes, it's raw SQL. And it's fast.
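One practical caveat with the raw INSERT above is that an empty id list produces invalid SQL, and a very long list produces one enormous statement. Here is a minimal sketch of a helper that handles both, in plain Ruby so it can be tried outside Rails. The helper name build_bulk_insert_sql and the slice_size default are my own assumptions, not part of any library; the table and column names are taken from the query above.

```ruby
# Hypothetical helper (not a Rails or Sidekiq API): builds multi-row INSERT
# statements for the groups_users join table, one statement per slice of ids.
def build_bulk_insert_sql(group_id, user_ids, slice_size: 1000)
  gid = Integer(group_id) # raises on non-numeric input instead of silently giving 0
  user_ids.each_slice(slice_size).map do |slice|
    values = slice.map { |id| "(#{Integer(id)}, #{gid}, NOW(), NOW())" }
    "INSERT INTO groups_users (user_id, group_id, created_at, updated_at) " \
      "VALUES #{values.join(', ')}"
  end
end

# An empty id list yields no statements, so nothing invalid reaches the database.
statements = build_bulk_insert_sql(7, [1, 2, 3], slice_size: 2)
statements.each { |sql| puts sql }
```

In a Rails app, each returned string would be passed to Group.connection.execute in turn.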
2. Use pluck(:id) instead of map(&:id)
pluck is much quicker, because:
Doing SQL is cheap. Creating Ruby objects is really expensive.
3. Use horizontal parallelization instead of vertical parallelization
What I mean here is that if you need to do sequential tasks A -> B -> C for a dozen records, there are two major ways to split the work:
- Vertical segmentation: AWorker does A(1), A(2), A(3); BWorker does B(1), etc.; CWorker does all C(i) jobs;
- Horizontal segmentation: UniversalWorker does A(1)+B(1)+C(1).
Use the latter (horizontal) way.
It's a statement from experience, not from some theoretical point of view (where both ways are feasible).
Why should you do that?
With vertical segmentation, the B and C workers will just waste RAM while your A workers do the job. Then your A and C workers will waste RAM while the B workers are at work. And so on. If you use horizontal segmentation, your resource drain will even itself out.
Applying that advice to your specific case: for starters, don't call perform_async in another async task.
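To make the horizontal layout concrete, here is a minimal sketch in plain Ruby. UniversalWorker is the name used above; the step_a, step_b and step_c methods are hypothetical placeholders for the real A, B and C work, and the class deliberately does not include Sidekiq::Worker so it runs standalone.

```ruby
# Horizontal segmentation: one worker runs A, B and C for a single record,
# calling each step directly instead of enqueueing a follow-up job.
class UniversalWorker
  def perform(record_id)
    [step_a(record_id), step_b(record_id), step_c(record_id)]
  end

  private

  # Placeholder steps (assumptions for illustration only).
  def step_a(id)
    "A(#{id})"
  end

  def step_b(id)
    "B(#{id})"
  end

  def step_c(id)
    "C(#{id})"
  end
end

puts UniversalWorker.new.perform(1).join("+")
```

In a real Sidekiq setup the class would include Sidekiq::Worker and be enqueued once per record or per batch.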
4. Process in batches
Answering your original question: yes, do process in batches. Creating and managing an async task consumes resources in itself, so there's no need to create too many of them.
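As a rough illustration of how batching reduces the number of jobs, here is a small sketch using Enumerable#each_slice. The helper name batches_for is hypothetical, and the batch size of 100 is just the figure from the question, not a tuned value.

```ruby
# Hypothetical helper: splits a list of ids into batches; each batch would
# become the argument of one async job.
def batches_for(ids, batch_size = 100)
  ids.each_slice(batch_size).to_a
end

ids = (1..250).to_a
batches = batches_for(ids)

# One job per user would mean ids.size jobs; batching needs far fewer.
puts "jobs without batching: #{ids.size}"
puts "jobs with batching:    #{batches.size}"
```

The last batch is simply shorter than the others when the id count is not a multiple of the batch size.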
TL;DR So in the end, your code could look something like this:
# model code
BATCH_SIZE = 100

def add_users
  users_to_process_ids = store.users.where(id: user_ids_to_add)
                              .where.not(id: group.user_ids)
                              .pluck(:id)
  # Nothing to insert; an empty VALUES list would be invalid SQL
  return if users_to_process_ids.empty?

  # With 100,000 users the performance of this query should be acceptable,
  # so it can run synchronously
  sql_values = users_to_process_ids.map { |i| "(#{i.to_i}, #{group.id.to_i}, NOW(), NOW())" }
  Group.connection.execute("
    INSERT INTO groups_users (user_id, group_id, created_at, updated_at)
    VALUES #{sql_values.join(",")}
  ")

  users_to_process_ids.each_slice(BATCH_SIZE) do |batch|
    # store.id is passed along because the worker needs it below
    AddUserToGroupWorker.perform_async store.id, group.id, batch
  end
end

# add_user_to_group_worker.rb
def perform(store_id, group_id, user_ids_to_add)
  group = Group.find group_id
  # Do some heavy load with a batch as a whole
  # ...
  # ...
  # If nothing here is left, call UpdateLastUpdatesForUserWorker from the model instead
  user_ids_to_add.each do |id|
    # do it synchronously: we already parallelized the job
    # by splitting it into slices in the model above
    UpdateLastUpdatesForUserWorker.new.perform store_id, id
  end
end