Bit of a newbie and am completely baffled.
I have a worker that Imports CSV Contacts to save to the Contacts Table (prior to saving, it checks to see if it already exists based on phone or email (in that case, the Contact is not saved but the details are appended).
While the code below logically works (the CSV file processes correctly), the entire database tables get locked while this job runs, so no user of the app can make updates (e.g. save a new contact via the app front end) until the job finishes.
So my question is this: why/what locks the entire contacts, phones and emails tables? Am I violating some principle of Ruby on Rails? What do I need to do to fix this?
key points:
When we first wrote this code, email and phone were part of the Contact model and it worked fine. We then extracted them into child models (since we had multiple emails & phone numbers per contact) — is it possible the array-within-array causes the lock?
When we first wrote the code, it would save after every smart row was checked (and we could see the counters incrementing). Now it seems to save all the contacts only after the full CSV file has been processed.
Something else altogether?
WORKER:
# Background-job entry point for CSV contact imports: loads the CsvFile
# record by id, runs the import, then sends/records the summary report.
class ImportCsvFileWorker
  def self.perform(csv_file_id)
    file = CsvFile.find(csv_file_id)
    file.import!
    file.send_report!
  end
end
# Parses an uploaded CSV file row-by-row, merging each row into an
# existing contact (via ContactMergingService) or creating a new one.
# Progress counters are persisted on the CsvFile record as it runs.
class CsvParsingService
attr_accessor :csv_file, :contact
# csv_file - the CsvFile record being imported. @contact holds the
# contact produced by the most recently processed row.
def initialize(csv_file)
@csv_file = csv_file
@contact = nil
end
# Entry point: processes every row, then marks the import finished.
def perform
Rails.logger.info "[CSV.parsing] Processing new csv file..."
process_csv
csv_file.finish_import!
end
# Iterates the parsed rows. Per-row failures are recorded and skipped;
# a parser-level failure aborts and is stored on the CsvFile.
# NOTE(review): increment! issues one UPDATE on the csv_files row per
# CSV row, so a large file produces many small writes to the same row.
def process_csv
parser = ::ImportData::SmartCsvParser.new(csv_file.file_url)
parser.each do |smart_row|
csv_file.increment!(:total_parsed_records)
begin
self.contact = process_row(smart_row)
rescue => e
row_parse_error(smart_row, e)
end
end
rescue => e # parser error or unexpected error
csv_file.save_import_error(e)
end
private
# Builds/merges the contact for one row, then saves or rejects it.
def process_row(smart_row)
new_contact, existing_records = smart_row.to_contact
self.contact = ContactMergingService.new(csv_file.user, new_contact, existing_records).perform
init_contact_info self.contact
if contact_valid?
save_imported_contact(new_contact)
else
reject_imported_contact(new_contact, smart_row)
end
end
# A contact is importable when it has at least one of: a first name, a
# last name, an email address or a phone number.
def contact_valid?
self.contact.first_name || self.contact.last_name ||
self.contact.email_addresses.first || self.contact.phone_numbers.first
end
# Persists the merged/new contact and bumps the imported counter.
def save_imported_contact(new_contact)
self.contact.save!
csv_file.increment!(:total_imported_records)
log_processed_contacts new_contact
end
# Records a row that lacked the minimum identifying fields.
def reject_imported_contact(new_contact, smart_row)
Rails.logger.info "[CSV.parsing] Contact rejected. Missing name, email or phone number."
csv_file.increment!(:total_failed_records)
csv_file.invalid_records.create!(
original_row: smart_row.row.to_csv,
contact_errors: ["Contact rejected. Missing name, email or phone number"]
)
log_processed_contacts new_contact
false
end
# Records a row that raised while being processed, attaching any
# validation errors captured on the current contact.
def row_parse_error(smart_row, e)
csv_file.increment!(:total_failed_records)
csv_file.invalid_records.create!(
original_row: smart_row.row.to_csv,
contact_errors: contact.try(:errors).try(:full_messages)
)
end
# Stamps ownership and import source on contacts that are not yet saved
# (contacts merged into an existing record keep their original owner).
def init_contact_info(contact)
unless contact.persisted?
contact.user = csv_file.user
contact.created_by_user = csv_file.user
contact.import_source = csv_file
end
contact.required_salutations_to_set = true # will be used for envelope/letter salutation
end
# Logs running counters plus a snapshot of the parsed (new) contact and
# the contact that will actually be saved (self.contact).
def log_processed_contacts(new_contact)
Rails.logger.info(
"[CSV.parsing] Records parsed:: parsed: #{csv_file.total_parsed_records}"\
" : imported: #{csv_file.total_imported_records} : failed: "\
"#{csv_file.total_failed_records}"
)
Rails.logger.info(
"[CSV.parsing] Contact- New : #{new_contact.email_addresses.map(&:email)}"\
" : #{new_contact.first_name} : #{new_contact.last_name} "\
"#{new_contact.phone_numbers.map(&:number)} :: Old : "\
"#{self.contact.email_addresses.map(&:email)} :"\
"#{self.contact.phone_numbers.map(&:number)}\n"
)
end
end
The following is the Merge Service
# Merges a contact freshly parsed from a CSV row into an existing
# contact owned by the same user. A "match" is an already-persisted
# email address or phone number belonging to one of the user's contacts.
class ContactMergingService
  attr_reader :new_contact, :user

  # user              - the importing user whose contacts are searched
  # new_contact       - unsaved Contact built from the CSV row
  # _existing_records - unused; kept for call-site compatibility
  def initialize(user, new_contact, _existing_records)
    @user = user
    @new_contact = new_contact
    @found_records = matching_emails_and_phone_numbers
  end

  # Returns the contact the caller should save: the matched existing
  # contact (with the new row's data merged in), or the untouched new
  # contact when nothing matches.
  def perform
    Rails.logger.info "[CSV.merging] Checking if new contact matches existing contact..."
    if (existing_contact = existing_contact())
      Rails.logger.info "[CSV.merging] Contact match found."
      merge(existing_contact, new_contact)
      existing_contact
    else
      Rails.logger.info "[CSV.merging] No contact match found."
      # FIX: removed a leftover `binding.pry` here. It suspended the worker
      # at a pry prompt inside the job's open transaction on every
      # non-matching row, holding row/table locks and blocking all other
      # writers until the job terminated -- the likely cause of the
      # "entire tables locked during import" symptom.
      new_contact
    end
  end

  private

  # The owner (scoped to this user's contacts) of the first matched
  # email/phone record, or nil when there were no matches.
  def existing_contact
    Rails.logger.info "[CSV.merging] Found records: #{@found_records.inspect}"
    if @found_records.present?
      @user.contacts.find @found_records.first.owner_id # Fetch first owner
    end
  end

  def merge(existing_contact, new_contact)
    Rails.logger.info "[CSV.merging] Merging with existing contact (ID: #{existing_contact.id})..."
    merge_records(existing_contact, new_contact)
  end

  # Copies blank scalar fields from the new record onto the existing one
  # (in memory; the caller persists it with save!), then attaches any
  # email addresses / phone numbers the existing contact lacks.
  def merge_records(existing_relation, new_relation)
    # FIX: the original called `existing_relation.attributes do ... end`;
    # #attributes takes no block, so the block was silently ignored and
    # scalar-field merging never ran. Iterate the attributes hash instead.
    existing_relation.attributes.each do |field, value|
      if value.blank? && new_relation[field].present?
        existing_relation[field] = new_relation[field]
      end
    end
    new_relation.email_addresses.each do |email_address|
      Rails.logger.info "[CSV.merging.emails] Email: #{email_address.inspect}"
      if existing_relation.email_addresses.find_by(email: email_address.email)
        Rails.logger.info "[CSV.merging.emails] Email address exists."
      else
        Rails.logger.info "[CSV.merging.emails] Email does not already exist. Saving..."
        email_address.owner = existing_relation
        email_address.save!
      end
    end
    new_relation.phone_numbers.each do |phone_number|
      Rails.logger.info "[CSV.merging.phone] Phone Number: #{phone_number.inspect}"
      if existing_relation.phone_numbers.find_by(number: phone_number.number)
        Rails.logger.info "[CSV.merging.phone] Phone number exists."
      else
        Rails.logger.info "[CSV.merging.phone] Phone Number does not already exist. Saving..."
        phone_number.owner = existing_relation
        phone_number.save!
      end
    end
  end

  # All persisted email/phone records of this user that collide with the
  # new contact's values. Empty when there is no user.
  def matching_emails_and_phone_numbers
    records = []
    if @user
      records << matching_emails
      records << matching_phone_numbers
      Rails.logger.info "[CSV.merging] merged records: #{records.inspect}"
      records.flatten!
      Rails.logger.info "[CSV.merging] flattened records: #{records.inspect}"
      records.compact!
      Rails.logger.info "[CSV.merging] compacted records: #{records.inspect}"
    end
    records
  end

  # Persisted email records matching the new contact's emails (both the
  # address and the primary flag must match).
  def matching_emails
    existing_emails = []
    new_contact_emails = @new_contact.email_addresses
    Rails.logger.info "[CSV.merging] new_contact_emails: #{new_contact_emails.inspect}"
    new_contact_emails.each do |email|
      Rails.logger.info "[CSV.merging] Checking for a match on email: #{email.inspect}..."
      if existing_email = @user.contact_email_addresses.find_by(email: email.email, primary: email.primary)
        Rails.logger.info "[CSV.merging] Found a matching email"
        existing_emails << existing_email
      else
        Rails.logger.info "[CSV.merging] No match found"
      end
    end
    existing_emails
  end

  # Persisted phone records matching the new contact's numbers.
  def matching_phone_numbers
    existing_phone_numbers = []
    @new_contact.phone_numbers.each do |phone_number|
      Rails.logger.info "[CSV.merging] Checking for a match on phone_number: #{phone_number.inspect}..."
      if existing_phone_number = @user.contact_phone_numbers.find_by(number: phone_number.number)
        Rails.logger.info "[CSV.merging] Found a matching phone number"
        existing_phone_numbers << existing_phone_number
      else
        Rails.logger.info "[CSV.merging] No match found"
      end
    end
    existing_phone_numbers
  end

  # Strips whitespace, dashes and parentheses from a phone number.
  # NOTE(review): not called anywhere in this class -- confirm whether
  # matching_phone_numbers should normalize numbers with it first.
  def clean_phone_number(number)
    number.gsub(/[\s\-\(\)]+/, "")
  end
end
Smart CSV Parser.rb
require "fuzzy_match"
require "import_data/base_parser"
module ImportData
# CSV parser whose rows fuzzy-match their headers (see SmartCsvRow).
class SmartCsvParser < BaseParser
  # Row class BaseParser instantiates for each CSV line.
  def row_class
    ::ImportData::SmartCsvRow
  end

  # Yields only the Contact built from each row, discarding the
  # existing-records list that #to_contact also returns.
  def each_contact
    each do |row|
      contact, _existing = row.to_contact
      yield contact
    end
  end
end
# One CSV row whose headers have been fuzzy-matched against the column
# names of known address-book exports (Outlook/Google style). Builds an
# unsaved Contact plus the list of already-persisted email/phone records
# that collide with the row's values.
class SmartCsvRow < BaseRow
  # Header -> Contact attribute. An empty-string value marks a column
  # that is recognized but deliberately discarded.
  # FIX: lookup-table constants are frozen so they cannot be mutated.
  CONTACT_MAPPING = {
    "Name" => :name,
    "First Name" => :first_name,
    "Last Name" => :last_name,
    "Middle Name" => "",
    "Spouse" => :spouse,
    "Mobile Phone" => :mobile_phone,
    "Notes" => :notes,
    "Account" => "",
    "Internet Free Busy" => "",
  }.freeze
  # Header -> Address attribute. Several export variants map onto the
  # same target field.
  ADDRESS_MAPPING = {
    "Address" => :address,
    "Address1" => :street,
    "Street" => :street,
    "City" => :city,
    "State" => :state,
    "Postal Code" => :zip,
    "Zip" => :zip,
    "Zip Code" => :zip,
    "Country" => :country,
    "Home Address" => :address,
    "Home Street" => :street,
    "Home Street 2" => :street,
    "Home Street 3" => :street,
    "Home Address PO Box" => :street,
    "Home City" => :city,
    "Home State" => :state,
    "Home Postal Code" => :zip,
    "Home Country" => :country,
    "Business Address" => :address,
    "Business Street" => :street,
    "Business Street 2" => :street,
    "Business Street 3" => :street,
    "Business Address PO Box" => :street,
    "Business City" => :city,
    "Business State" => :state,
    "Business Postal Code" => :zip,
    "Business Country" => :country,
    "Other Address" => :address,
    "Other Street" => :street,
    "Other Street 2" => :street,
    "Other Street 3" => :street,
    "Other Address PO Box" => :street,
    "Other City" => :city,
    "Other State" => :state,
    "Other Postal Code" => :zip,
    "Other Country" => :country,
  }.freeze
  # Headers that may carry an email address.
  EMAIL_ADDRESS_FIELDS = [
    "Email",
    "E-mail Address",
    "Email 2 Address",
    "Email 3 Address",
    "E-mail",
    "E-mail 2 Address",
    "E-mail 3 Address"
  ].freeze
  # Header -> phone-number type label.
  PHONE_TYPE_MAPPINGS = {
    "Phone" => "Home",
    "Primary Phone" => "Home",
    "Home Phone" => "Home",
    "Home Phone 2" => "Home",
    "Mobile Phone" => "Mobile",
    "Home Fax" => "Fax",
    "Business Phone" => "Work",
    "Business Phone 2" => "Work",
    "Business Fax" => "Fax",
    "Other Phone" => "Other",
    "Other Fax" => "Fax",
    "Company Main Phone" => "Work",
  }.freeze

  # Fuzzy-corrects each header before handing the row to BaseRow.
  def initialize(headers, row)
    headers = headers.map { |h| best_match_or_self(h) }
    super(headers, row)
  end

  # Returns [contact, existing_records]:
  #   contact          - new, unsaved Contact populated from this row
  #   existing_records - persisted email/phone records that match the
  #                      row's values (used upstream to detect duplicates)
  def to_contact
    existing_emails = existing_phone_numbers = nil
    contact = Contact.new.tap do |contact|
      initiate_instance(contact, CONTACT_MAPPING)
      address = initiate_instance(Address.new, ADDRESS_MAPPING)
      contact.addresses << address if address
      email_addresses, existing_emails = initialize_emails(EMAIL_ADDRESS_FIELDS)
      contact.email_addresses << email_addresses
      phone_numbers, existing_phone_numbers = initialize_phone_numbers(PHONE_TYPE_MAPPINGS)
      contact.phone_numbers << phone_numbers
    end
    existing_records = [existing_emails, existing_phone_numbers].flatten.compact
    [contact, existing_records]
  end

  private

  # Type label ("Home", "Work", ...) for a phone header, nil if unknown.
  def fetch_phone_type(field)
    PHONE_TYPE_MAPPINGS[field]
  end

  # Matcher over every known header name, built once at load time.
  FM = FuzzyMatch.new(CONTACT_MAPPING.keys + ADDRESS_MAPPING.keys + EMAIL_ADDRESS_FIELDS + PHONE_TYPE_MAPPINGS.keys)

  # Best known header for a raw CSV header: candidates are filtered by
  # Dice's coefficient (> 0.5), then the closest by Levenshtein score
  # wins. Falls back to the raw header when nothing is close enough.
  def best_match_or_self(header)
    candidate = FM.find(header, find_all_with_score: true).
      select { |(_text, dice, _lev)| dice > 0.5 }.
      max_by { |(_text, _dice, lev)| lev }
    candidate ? candidate[0] : header
  end
end
end
DATABASE YML
# Rails database configuration. Database names are derived from the git
# repository name so each checkout gets its own databases.
# NOTE(review): `branch_name` is computed but never used below -- confirm
# whether per-branch database names were intended.
<% branch_name = `git symbolic-ref HEAD 2>/dev/null`.chomp.sub('refs/heads/', '') %>
<% repository_name = `git rev-parse --show-toplevel`.split('/').last.strip %>
development:
adapter: postgresql
database: <%= repository_name %>_development
host: localhost
test:
adapter: postgresql
database: <%= repository_name %>_test
host: localhost
production:
adapter: postgresql
database: <%= repository_name %>_production
host: localhost
WORKER LOGS - initial few lines running
Delayed::Backend::ActiveRecord::Job Load (1.5ms) UPDATE "delayed_jobs" SET locked_at = '2016-04-07 17:59:37.569861', locked_by = 'host:tests-MBP-3.att.net pid:9659' WHERE id IN (SELECT "delayed_jobs"."id" FROM "delayed_jobs" WHERE ((run_at <= '2016-04-07 17:59:37.568990' AND (locked_at IS NULL OR locked_at < '2016-04-07 16:29:37.569013') OR locked_by = 'host:tests-MBP-3.att.net pid:9659') AND failed_at IS NULL) ORDER BY priority ASC, run_at ASC LIMIT 1 FOR UPDATE) RETURNING * /*application:AGENTBRIGHT*/
2016-04-07T13:59:37-0400: [Worker(host:tests-MBP-3.att.net pid:9659)] Job ImportCsvFileWorker.perform (id=19) RUNNING
CsvFile Load (0.6ms) SELECT "csv_files".* FROM "csv_files" WHERE "csv_files"."id" = $1 LIMIT 1 /*application:AGENTBRIGHT*/ [["id", 1]]
(0.2ms) BEGIN /*application:AGENTBRIGHT*/
SQL (0.4ms) UPDATE "csv_files" SET "state" = $1, "updated_at" = $2 WHERE "csv_files"."id" = $3 /*application:AGENTBRIGHT*/ [["state", "processing"], ["updated_at", "2016-04-07 17:59:37.632648"], ["id", 1]]
[CSV.parsing] Processing new csv file...
SQL (0.4ms) UPDATE "csv_files" SET "total_parsed_records" = $1, "updated_at" = $2 WHERE "csv_files"."id" = $3 /*application:AGENTBRIGHT*/ [["total_parsed_records", 1], ["updated_at", "2016-04-07 17:59:38.571053"], ["id", 1]]
[CSV.base_row] Initializing emails...
[CSV.base_row]
[CSV.base_row] Email addresses: 0
[CSV.base_row] Existing emails : 0
[CSV.base_row] Initializing phone numbers...
[CSV.base_row] Phone numbers: 0
[CSV.base_row] Existing phone numbers : []
User Load (0.4ms) SELECT "users".* FROM "users" WHERE "users"."id" = $1 LIMIT 1 /*application:AGENTBRIGHT*/ [["id", 1]]
[CSV.merging] new_contact_emails: #<ActiveRecord::Associations::CollectionProxy []>
[CSV.merging] merged records: [[], []]
[CSV.merging] flattened records: []
[CSV.merging] compacted records: []
[CSV.merging] Checking if new contact matches existing contact...
[CSV.merging] Found records: []
[CSV.merging] No contact match found.
SQL (0.7ms) INSERT INTO "contacts" ("data", "first_name", "user_id", "created_by_user_id", "import_source_id", "import_source_type", "name", "envelope_salutation", "letter_salutation", "created_at", "updated_at", "avatar_color") VALUES ($1, $2, $3, $4, $5, $6, $7, $8, $9, $10, $11, $12) RETURNING "id" /*application:AGENTBRIGHT*/ [["data", "\"notes\"=>\"Family Name: and XXXX ail\r\nAddress 1: Any Street Point Road\r\nCity: Louisville, \r\nState: TN \r\nZip: 06437\r\n# invited: 2\r\nAdults: 2\r\n\""], ["first_name", "Mary Joe"], ["user_id", 1], ["created_by_user_id", 1], ["import_source_id", 1], ["import_source_type", "CsvFile"], ["name", "Mary Joe Smith"], ["envelope_salutation", ""], ["letter_salutation", "Dear Mary Joe,"], ["created_at", "2016-04-07 17:59:38.763119"], ["updated_at", "2016-04-07 17:59:38.763119"], ["avatar_color", 10]]
SQL (1.0ms) UPDATE "users" SET "contacts_count" = COALESCE("contacts_count", 0) + 1 WHERE "users"."id" = $1 /*application:AGENTBRIGHT*/ [["id", 1]]
SQL (0.4ms) UPDATE "csv_files" SET "total_failed_records" = $1, "updated_at" = $2 WHERE "csv_files"."id" = $3 /*application:AGENTBRIGHT*/ [["total_failed_records", 1], ["updated_at", "2016-04-07 17:59:38.775598"], ["id", 1]]
SQL (1.1ms) INSERT INTO "csv_file_invalid_records" ("original_row", "csv_file_id", "created_at", "updated_at") VALUES ($1, $2, $3, $4) RETURNING "id" /*application:AGENTBRIGHT*/ [["original_row", "Mary Joe Smith,,,,,,,,,,,,,\"Family Name: XXXX \r\nAddress 1: Any Street \r\nCity: Louisville, \r\nState: TN \r\nZip: 37777\r\n# invited: 2\r\nAdults: 2\r\n\",,,,,,,,,,,,,,,,,,,,,,,,,,,,,,,,,,,,,,,,,,,,,,,,,,,,,,,,,,,,,,,,,,,,,,,,Normal,,My Contacts;Imported 8/13/15 1;Imported 8/13/15,\n"], ["csv_file_id", 1], ["created_at", "2016-04-07 17:59:38.796886"], ["updated_at", "2016-04-07 17:59:38.796886"]]
SQL (0.4ms) UPDATE "csv_files" SET "total_parsed_records" = $1, "updated_at" = $2 WHERE "csv_files"."id" = $3 /*application:AGENTBRIGHT*/ [["total_parsed_records", 2], ["updated_at", "2016-04-07 17:59:39.744526"], ["id", 1]]
[CSV.base_row] Initializing emails...
[CSV.base_row]
[CSV.base_row] Email addresses: 0
[CSV.base_row] Existing emails : 0
[CSV.base_row] Initializing phone numbers...
[CSV.base_row] Phone numbers: 0
[CSV.base_row] Existing phone numbers : []
[CSV.merging] new_contact_emails: #<ActiveRecord::Associations::CollectionProxy []>
[CSV.merging] merged records: [[], []]
[CSV.merging] flattened records: []
[CSV.merging] compacted records: []
[CSV.merging] Checking if new contact matches existing contact...
[CSV.merging] Found records: []
[CSV.merging] No contact match found.
SQL (0.5ms) INSERT INTO "contacts" ("data", "first_name", "user_id", "created_by_user_id", "import_source_id", "import_source_type", "name", "envelope_salutation", "letter_salutation", "created_at", "updated_at", "avatar_color") VALUES ($1, $2, $3, $4, $5, $6, $7, $8, $9, $10, $11, $12) RETURNING "id" /*application:AGENTBRIGHT*/ [["data", "\"notes\"=>\"Family Name: Xyzlastname\r\nAddress 1: 100 Village Drive #C101\r\nCity: Somecity, \r\nState: MA \r\nZip: 07827\r\n# invited: 1\r\nAdults: 1\r\n\""], ["first_name", "ABC"], ["user_id", 1], ["created_by_user_id", 1], ["import_source_id", 1], ["import_source_type", "CsvFile"], ["name", "ABC"], ["envelope_salutation", ""], ["letter_salutation", "Dear ABC,"], ["created_at", "2016-04-07 17:59:39.753055"], ["updated_at", "2016-04-07 17:59:39.753055"], ["avatar_color", 4]]
SQL (0.7ms) UPDATE "users" SET "contacts_count" = COALESCE("contacts_count", 0) + 1 WHERE "users"."id" = $1 /*application:AGENTBRIGHT*/ [["id", 1]]
SQL (0.4ms) UPDATE "csv_files" SET "total_failed_records" = $1, "updated_at" = $2 WHERE "csv_files"."id" = $3 /*application:AGENTBRIGHT*/ [["total_failed_records", 2], ["updated_at", "2016-04-07 17:59:39.763091"], ["id", 1]]
SQL (0.4ms) INSERT INTO "csv_file_invalid_records" ("original_row", "csv_file_id", "created_at", "updated_at") VALUES ($1, $2, $3, $4) RETURNING "id" /*application:AGENTBRIGHT*/ [["original_row", "Avi,,,,,,,,,,,,,\"Family Name: Xyzlastname\r\nAddress 1: 100 Village Drive #C101\r\nCity: SomeCity, \r\nState: MA \r\nZip: 06777\r\n# invited: 1\r\nAdults: 1\r\n\",,,,,,,,,,,,,,,,,,,,,,,,,,,,,,,,,,,,,,,,,,,,,,,,,,,,,,,,,,,,,,,,,,,,,,,,Normal,,My Contacts;Imported 8/13/15 1;Imported 8/13/15,\n"], ["csv_file_id", 1], ["created_at", "2016-04-07 17:59:39.767706"], ["updated_at", "2016-04-07 17:59:39.767706"]]
SQL (0.4ms) UPDATE "csv_files" SET "total_parsed_records" = $1, "updated_at" = $2 WHERE "csv_files"."id" = $3 /*application:AGENTBRIGHT*/ [["total_parsed_records", 3], ["updated_at", "2016-04-07 17:59:40.761966"], ["id", 1]]
[CSV.base_row] Initializing emails...
[CSV.base_row]
[CSV.base_row] Email addresses: 0
[CSV.base_ro
The full table lock probably has more to do with Postgres enforcing consistency to preserve referential integrity when applying updates across multiple related tables. Also note the stray `binding.pry` in `ContactMergingService#perform`: whenever no match is found, the worker halts at the pry prompt while still inside the job's open transaction, so any locks it holds are kept until the job finishes — remove it.
That said, there are many ways to improve your queries from within Ruby/Rails. The create
method can create multiple records at once if you pass it an array of Hash objects, like so:
User.create([{email: "joe@example.com", name: "Joe"}, {email: "steve@example.com", name: "Steve"}])
This won't completely dodge the issue of table locks, but it should be more efficient.
Also, use find_each
instead of each
when iterating over ActiveRecord
collections. find_each
loads records in batches rather than loading them all at once, which can be much more memory-efficient.