
CSV worker is locking the database


Bit of a newbie and am completely baffled.

I have a worker that imports contacts from a CSV file and saves them to the Contacts table. Prior to saving, it checks whether a contact already exists based on phone or email; if one does, the contact is not saved and the new details are appended to the existing record instead.

While the code below logically works (the CSV file processes correctly), the entire database gets locked while it runs, so no user of the app can make updates (e.g. save a new contact via the front end) until the job finishes.

So my question is: what is locking the entire Contacts, Phone and Email tables, and why? Am I violating some Ruby on Rails principle? What do I need to do to fix this?

key points:

  • When we first wrote this code, email and phone were part of the Contact model and it worked fine. We then split them out into child models (since we had multiple emails and phone numbers per Contact). Is it possible the nested arrays cause it to lock?

  • When we first wrote the code, it saved after every smart row was checked (and we could see the counters incrementing). Now it seems to save all the contacts only after the full CSV file has been processed.

  • Or is it something else altogether?

WORKER:

class ImportCsvFileWorker

  def self.perform(csv_file_id)
    csv_file = CsvFile.find(csv_file_id)

    csv_file.import!
    csv_file.send_report!
  end

end

class CsvParsingService

  attr_accessor :csv_file, :contact

  def initialize(csv_file)
    @csv_file = csv_file
    @contact = nil
  end

  def perform
    Rails.logger.info "[CSV.parsing] Processing new csv file..."
    process_csv
    csv_file.finish_import!
  end

  def process_csv
    parser = ::ImportData::SmartCsvParser.new(csv_file.file_url)

    parser.each do |smart_row|
      csv_file.increment!(:total_parsed_records)
      begin
        self.contact = process_row(smart_row)
      rescue => e
        row_parse_error(smart_row, e)
      end
    end
  rescue => e # parser error or unexpected error
    csv_file.save_import_error(e)
  end

  private

  def process_row(smart_row)
    new_contact, existing_records = smart_row.to_contact
    self.contact = ContactMergingService.new(csv_file.user, new_contact, existing_records).perform
    init_contact_info self.contact

    if contact_valid?
      save_imported_contact(new_contact)
    else
      reject_imported_contact(new_contact, smart_row)
    end
  end

  def contact_valid?
    self.contact.first_name || self.contact.last_name ||
      self.contact.email_addresses.first || self.contact.phone_numbers.first
  end

  def save_imported_contact(new_contact)
    self.contact.save!
    csv_file.increment!(:total_imported_records)
    log_processed_contacts new_contact
  end

  def reject_imported_contact(new_contact, smart_row)
    Rails.logger.info "[CSV.parsing] Contact rejected. Missing name, email or phone number."
    csv_file.increment!(:total_failed_records)
    csv_file.invalid_records.create!(
      original_row: smart_row.row.to_csv,
      contact_errors: ["Contact rejected. Missing name, email or phone number"]
    )
    log_processed_contacts new_contact
    false
  end

  def row_parse_error(smart_row, e)
    csv_file.increment!(:total_failed_records)
    csv_file.invalid_records.create!(
      original_row: smart_row.row.to_csv,
      contact_errors: contact.try(:errors).try(:full_messages)
    )
  end

  def init_contact_info(contact)
    unless contact.persisted?
      contact.user = csv_file.user
      contact.created_by_user = csv_file.user
      contact.import_source = csv_file
    end
    contact.required_salutations_to_set = true # will be used for envelope/letter salutation
  end

  def log_processed_contacts(new_contact)
    Rails.logger.info(
      "[CSV.parsing] Records parsed:: parsed: #{csv_file.total_parsed_records}"\
        " : imported: #{csv_file.total_imported_records} : failed: "\
        "#{csv_file.total_failed_records}"
    )
    Rails.logger.info(
      "[CSV.parsing] Contact- New : #{new_contact.email_addresses.map(&:email)}"\
        " : #{new_contact.first_name} : #{new_contact.last_name} "\
        "#{new_contact.phone_numbers.map(&:number)} :: Old : "\
        "#{self.contact.email_addresses.map(&:email)} :"\
        "#{self.contact.phone_numbers.map(&:number)}\n"
    )
  end
end

The following is the Merge Service

class ContactMergingService

  attr_reader :new_contact, :user

  def initialize(user, new_contact, _existing_records)
    @user = user
    @new_contact = new_contact
    @found_records = matching_emails_and_phone_numbers
  end

  def perform
    Rails.logger.info "[CSV.merging] Checking if new contact matches existing contact..."
    if (existing_contact = existing_contact())
      Rails.logger.info "[CSV.merging] Contact match found."
      merge(existing_contact, new_contact)
      existing_contact
    else
      Rails.logger.info "[CSV.merging] No contact match found."
      binding.pry
      new_contact
    end
  end

  private

  def existing_contact
    Rails.logger.info "[CSV.merging] Found records: #{@found_records.inspect}"
    if @found_records.present?
      @user.contacts.find @found_records.first.owner_id # Fetch first owner
    end
  end

  def merge(existing_contact, new_contact)
    Rails.logger.info "[CSV.merging] Merging with existing contact (ID: #{existing_contact.id})..."
    merge_records(existing_contact, new_contact)
  end

  def merge_records(existing_relation, new_relation)
    existing_relation.attributes.each do |field, value|
      if value.blank? && new_relation[field].present?
        existing_relation[field] = new_relation[field]
      end
    end
    new_relation.email_addresses.each do |email_address|
      Rails.logger.info "[CSV.merging.emails] Email: #{email_address.inspect}"
      if existing_relation.email_addresses.find_by(email: email_address.email)
        Rails.logger.info "[CSV.merging.emails] Email address exists."
      else
        Rails.logger.info "[CSV.merging.emails] Email does not already exist. Saving..."
        email_address.owner = existing_relation
        email_address.save!
      end
    end
    new_relation.phone_numbers.each do |phone_number|
      Rails.logger.info "[CSV.merging.phone] Phone Number: #{phone_number.inspect}"
      if existing_relation.phone_numbers.find_by(number: phone_number.number)
        Rails.logger.info "[CSV.merging.phone] Phone number exists."
      else
        Rails.logger.info "[CSV.merging.phone] Phone Number does not already exist. Saving..."
        phone_number.owner = existing_relation
        phone_number.save!
      end
    end
  end

  def matching_emails_and_phone_numbers
    records = []
    if @user
      records << matching_emails
      records << matching_phone_numbers
      Rails.logger.info "[CSV.merging] merged records: #{records.inspect}"
      records.flatten!
      Rails.logger.info "[CSV.merging] flattened records: #{records.inspect}"
      records.compact!
      Rails.logger.info "[CSV.merging] compacted records: #{records.inspect}"
    end
    records
  end

  def matching_emails
    existing_emails = []
    new_contact_emails = @new_contact.email_addresses
    Rails.logger.info "[CSV.merging] new_contact_emails: #{new_contact_emails.inspect}"
    new_contact_emails.each do |email|
      Rails.logger.info "[CSV.merging] Checking for a match on email: #{email.inspect}..."
      if existing_email = @user.contact_email_addresses.find_by(email: email.email, primary: email.primary)
        Rails.logger.info "[CSV.merging] Found a matching email"
        existing_emails << existing_email
      else
        Rails.logger.info "[CSV.merging] No match found"
        false
      end
    end
    existing_emails
  end

  def matching_phone_numbers
    existing_phone_numbers = []
    @new_contact.phone_numbers.each do |phone_number|
      Rails.logger.info "[CSV.merging] Checking for a match on phone_number: #{phone_number.inspect}..."
      if existing_phone_number = @user.contact_phone_numbers.find_by(number: phone_number.number)
        Rails.logger.info "[CSV.merging] Found a matching phone number"
        existing_phone_numbers << existing_phone_number
      else
        Rails.logger.info "[CSV.merging] No match found"
        false
      end
    end
    existing_phone_numbers
  end

  def clean_phone_number(number)
    number.gsub(/[\s\-\(\)]+/, "")
  end

end

SMART CSV PARSER

require "fuzzy_match"
require "import_data/base_parser"

module ImportData
  class SmartCsvParser < BaseParser

    def row_class
      ::ImportData::SmartCsvRow
    end

    def each_contact
      each { |row| yield row.to_contact[0] }
    end
  end

  class SmartCsvRow < BaseRow

    CONTACT_MAPPING = {
      "Name" => :name,
      "First Name" => :first_name,
      "Last Name" => :last_name,
      "Middle Name" => "",
      "Spouse" => :spouse,
      "Mobile Phone" => :mobile_phone,
      "Notes" => :notes,
      "Account" => "",
      "Internet Free Busy" => "",
    }

    ADDRESS_MAPPING = {
      "Address" => :address,
      "Address1" => :street,
      "Street" => :street,
      "City" => :city,
      "State" => :state,
      "Postal Code" => :zip,
      "Zip" => :zip,
      "Zip Code" => :zip,
      "Country" => :country,
      "Home Address" => :address,
      "Home Street" => :street,
      "Home Street 2" => :street,
      "Home Street 3" => :street,
      "Home Address PO Box" => :street,
      "Home City" => :city,
      "Home State" => :state,
      "Home Postal Code" => :zip,
      "Home Country" => :country,
      "Business Address" => :address,
      "Business Street" => :street,
      "Business Street 2" => :street,
      "Business Street 3" => :street,
      "Business Address PO Box" => :street,
      "Business City" => :city,
      "Business State" => :state,
      "Business Postal Code" => :zip,
      "Business Country" => :country,
      "Other Address" => :address,
      "Other Street" => :street,
      "Other Street 2" => :street,
      "Other Street 3" => :street,
      "Other Address PO Box" => :street,
      "Other City" => :city,
      "Other State" => :state,
      "Other Postal Code" => :zip,
      "Other Country" => :country,
    }

    EMAIL_ADDRESS_FIELDS = [
      "Email",
      "E-mail Address",
      "Email 2 Address",
      "Email 3 Address",
      "E-mail",
      "E-mail 2 Address",
      "E-mail 3 Address"
    ]
    PHONE_TYPE_MAPPINGS  = {
      "Phone" => "Home",
      "Primary Phone" => "Home",
      "Home Phone" => "Home",
      "Home Phone 2" => "Home",
      "Mobile Phone" => "Mobile",
      "Home Fax" => "Fax",
      "Business Phone" => "Work",
      "Business Phone 2" => "Work",
      "Business Fax" => "Fax",
      "Other Phone" => "Other",
      "Other Fax" => "Fax",
      "Company Main Phone" => "Work",
    }

    def initialize(headers, row)
      headers = headers.map { |h| best_match_or_self(h) }
      super(headers, row)
    end

    def to_contact
      existing_emails = existing_phone_numbers = nil
      contact = Contact.new.tap do |contact|
        initiate_instance(contact, CONTACT_MAPPING)
        address = initiate_instance(Address.new, ADDRESS_MAPPING)
        contact.addresses << address if address
        email_addresses, existing_emails = initialize_emails(EMAIL_ADDRESS_FIELDS)
        contact.email_addresses << email_addresses
        phone_numbers, existing_phone_numbers = initialize_phone_numbers(PHONE_TYPE_MAPPINGS)
        contact.phone_numbers << phone_numbers
        contact
      end
      existing_records = []
      existing_records << existing_emails
      existing_records << existing_phone_numbers
      existing_records.flatten!
      existing_records.compact!
      [contact, existing_records]
    end

    private

    def fetch_phone_type field
      PHONE_TYPE_MAPPINGS[field]
    end

    FM = FuzzyMatch.new(CONTACT_MAPPING.keys + ADDRESS_MAPPING.keys + EMAIL_ADDRESS_FIELDS + PHONE_TYPE_MAPPINGS.keys)

    def best_match_or_self(header)
      # keep candidates whose Dice's coefficient is above 0.5,
      # then choose the closest match by Levenshtein score
      candidate = FM.find(header, find_all_with_score: true).
                  select { |(_text, dice, _lev)| dice > 0.5 }.
                  max_by { |(_text, _dice, lev)| lev }

      # if no candidate is found, return the header unchanged
      candidate ? candidate[0] : header
    end

  end

end

DATABASE YML

<% branch_name = `git symbolic-ref HEAD 2>/dev/null`.chomp.sub('refs/heads/', '') %>
<% repository_name = `git rev-parse --show-toplevel`.split('/').last.strip %>

development:
  adapter: postgresql
  database: <%= repository_name %>_development
  host: localhost

test:
  adapter: postgresql
  database: <%= repository_name %>_test
  host: localhost

production:
  adapter: postgresql
  database: <%= repository_name %>_production
  host: localhost

WORKER LOGS (first few lines of a run)

Delayed::Backend::ActiveRecord::Job Load (1.5ms)  UPDATE "delayed_jobs" SET locked_at = '2016-04-07 17:59:37.569861', locked_by = 'host:tests-MBP-3.att.net pid:9659' WHERE id IN (SELECT  "delayed_jobs"."id" FROM "delayed_jobs" WHERE ((run_at <= '2016-04-07 17:59:37.568990' AND (locked_at IS NULL OR locked_at < '2016-04-07 16:29:37.569013') OR locked_by = 'host:tests-MBP-3.att.net pid:9659') AND failed_at IS NULL)  ORDER BY priority ASC, run_at ASC LIMIT 1 FOR UPDATE) RETURNING * /*application:AGENTBRIGHT*/
2016-04-07T13:59:37-0400: [Worker(host:tests-MBP-3.att.net pid:9659)] Job ImportCsvFileWorker.perform (id=19) RUNNING
  CsvFile Load (0.6ms)  SELECT  "csv_files".* FROM "csv_files" WHERE "csv_files"."id" = $1 LIMIT 1 /*application:AGENTBRIGHT*/  [["id", 1]]
   (0.2ms)  BEGIN /*application:AGENTBRIGHT*/
  SQL (0.4ms)  UPDATE "csv_files" SET "state" = $1, "updated_at" = $2 WHERE "csv_files"."id" = $3 /*application:AGENTBRIGHT*/  [["state", "processing"], ["updated_at", "2016-04-07 17:59:37.632648"], ["id", 1]]
[CSV.parsing] Processing new csv file...
  SQL (0.4ms)  UPDATE "csv_files" SET "total_parsed_records" = $1, "updated_at" = $2 WHERE "csv_files"."id" = $3 /*application:AGENTBRIGHT*/  [["total_parsed_records", 1], ["updated_at", "2016-04-07 17:59:38.571053"], ["id", 1]]
[CSV.base_row] Initializing emails...
[CSV.base_row] 
[CSV.base_row] Email addresses: 0
[CSV.base_row] Existing emails : 0
[CSV.base_row] Initializing phone numbers...
[CSV.base_row] Phone numbers: 0
[CSV.base_row] Existing phone numbers : []
  User Load (0.4ms)  SELECT  "users".* FROM "users" WHERE "users"."id" = $1 LIMIT 1 /*application:AGENTBRIGHT*/  [["id", 1]]
[CSV.merging] new_contact_emails: #<ActiveRecord::Associations::CollectionProxy []>
[CSV.merging] merged records: [[], []]
[CSV.merging] flattened records: []
[CSV.merging] compacted records: []
[CSV.merging] Checking if new contact matches existing contact...
[CSV.merging] Found records: []
[CSV.merging] No contact match found.
  SQL (0.7ms)  INSERT INTO "contacts" ("data", "first_name", "user_id", "created_by_user_id", "import_source_id", "import_source_type", "name", "envelope_salutation", "letter_salutation", "created_at", "updated_at", "avatar_color") VALUES ($1, $2, $3, $4, $5, $6, $7, $8, $9, $10, $11, $12) RETURNING "id" /*application:AGENTBRIGHT*/  [["data", "\"notes\"=>\"Family Name: and XXXX ail\r\nAddress 1: Any Street Point Road\r\nCity: Louisville, \r\nState: TN \r\nZip: 06437\r\n# invited: 2\r\nAdults: 2\r\n\""], ["first_name", "Mary Joe"], ["user_id", 1], ["created_by_user_id", 1], ["import_source_id", 1], ["import_source_type", "CsvFile"], ["name", "Mary Joe Smith"], ["envelope_salutation", ""], ["letter_salutation", "Dear Mary Joe,"], ["created_at", "2016-04-07 17:59:38.763119"], ["updated_at", "2016-04-07 17:59:38.763119"], ["avatar_color", 10]]
  SQL (1.0ms)  UPDATE "users" SET "contacts_count" = COALESCE("contacts_count", 0) + 1 WHERE "users"."id" = $1 /*application:AGENTBRIGHT*/  [["id", 1]]
  SQL (0.4ms)  UPDATE "csv_files" SET "total_failed_records" = $1, "updated_at" = $2 WHERE "csv_files"."id" = $3 /*application:AGENTBRIGHT*/  [["total_failed_records", 1], ["updated_at", "2016-04-07 17:59:38.775598"], ["id", 1]]
  SQL (1.1ms)  INSERT INTO "csv_file_invalid_records" ("original_row", "csv_file_id", "created_at", "updated_at") VALUES ($1, $2, $3, $4) RETURNING "id" /*application:AGENTBRIGHT*/  [["original_row", "Mary Joe Smith,,,,,,,,,,,,,\"Family Name: XXXX \r\nAddress 1: Any Street \r\nCity: Louisville, \r\nState: TN \r\nZip: 37777\r\n# invited: 2\r\nAdults: 2\r\n\",,,,,,,,,,,,,,,,,,,,,,,,,,,,,,,,,,,,,,,,,,,,,,,,,,,,,,,,,,,,,,,,,,,,,,,,Normal,,My Contacts;Imported 8/13/15 1;Imported 8/13/15,\n"], ["csv_file_id", 1], ["created_at", "2016-04-07 17:59:38.796886"], ["updated_at", "2016-04-07 17:59:38.796886"]]
  SQL (0.4ms)  UPDATE "csv_files" SET "total_parsed_records" = $1, "updated_at" = $2 WHERE "csv_files"."id" = $3 /*application:AGENTBRIGHT*/  [["total_parsed_records", 2], ["updated_at", "2016-04-07 17:59:39.744526"], ["id", 1]]
[CSV.base_row] Initializing emails...
[CSV.base_row] 
[CSV.base_row] Email addresses: 0
[CSV.base_row] Existing emails : 0
[CSV.base_row] Initializing phone numbers...
[CSV.base_row] Phone numbers: 0
[CSV.base_row] Existing phone numbers : []
[CSV.merging] new_contact_emails: #<ActiveRecord::Associations::CollectionProxy []>
[CSV.merging] merged records: [[], []]
[CSV.merging] flattened records: []
[CSV.merging] compacted records: []
[CSV.merging] Checking if new contact matches existing contact...
[CSV.merging] Found records: []
[CSV.merging] No contact match found.
  SQL (0.5ms)  INSERT INTO "contacts" ("data", "first_name", "user_id", "created_by_user_id", "import_source_id", "import_source_type", "name", "envelope_salutation", "letter_salutation", "created_at", "updated_at", "avatar_color") VALUES ($1, $2, $3, $4, $5, $6, $7, $8, $9, $10, $11, $12) RETURNING "id" /*application:AGENTBRIGHT*/  [["data", "\"notes\"=>\"Family Name: Xyzlastname\r\nAddress 1: 100 Village Drive #C101\r\nCity: Somecity, \r\nState: MA \r\nZip: 07827\r\n# invited: 1\r\nAdults: 1\r\n\""], ["first_name", "ABC"], ["user_id", 1], ["created_by_user_id", 1], ["import_source_id", 1], ["import_source_type", "CsvFile"], ["name", "ABC"], ["envelope_salutation", ""], ["letter_salutation", "Dear ABC,"], ["created_at", "2016-04-07 17:59:39.753055"], ["updated_at", "2016-04-07 17:59:39.753055"], ["avatar_color", 4]]
  SQL (0.7ms)  UPDATE "users" SET "contacts_count" = COALESCE("contacts_count", 0) + 1 WHERE "users"."id" = $1 /*application:AGENTBRIGHT*/  [["id", 1]]
  SQL (0.4ms)  UPDATE "csv_files" SET "total_failed_records" = $1, "updated_at" = $2 WHERE "csv_files"."id" = $3 /*application:AGENTBRIGHT*/  [["total_failed_records", 2], ["updated_at", "2016-04-07 17:59:39.763091"], ["id", 1]]
  SQL (0.4ms)  INSERT INTO "csv_file_invalid_records" ("original_row", "csv_file_id", "created_at", "updated_at") VALUES ($1, $2, $3, $4) RETURNING "id" /*application:AGENTBRIGHT*/  [["original_row", "Avi,,,,,,,,,,,,,\"Family Name: Xyzlastname\r\nAddress 1: 100 Village Drive #C101\r\nCity: SomeCity, \r\nState: MA \r\nZip: 06777\r\n# invited: 1\r\nAdults: 1\r\n\",,,,,,,,,,,,,,,,,,,,,,,,,,,,,,,,,,,,,,,,,,,,,,,,,,,,,,,,,,,,,,,,,,,,,,,,Normal,,My Contacts;Imported 8/13/15 1;Imported 8/13/15,\n"], ["csv_file_id", 1], ["created_at", "2016-04-07 17:59:39.767706"], ["updated_at", "2016-04-07 17:59:39.767706"]]
  SQL (0.4ms)  UPDATE "csv_files" SET "total_parsed_records" = $1, "updated_at" = $2 WHERE "csv_files"."id" = $3 /*application:AGENTBRIGHT*/  [["total_parsed_records", 3], ["updated_at", "2016-04-07 17:59:40.761966"], ["id", 1]]
[CSV.base_row] Initializing emails...
[CSV.base_row] 
[CSV.base_row] Email addresses: 0
[CSV.base_ro

Solution

  • The full table lock probably has more to do with Postgres enforcing consistency to preserve referential integrity when applying an update across multiple tables.

    That said, there are many ways to improve your query from within Ruby/Rails. The create method can create multiple records at once if you pass it an array of Hash objects like so:

    User.create([{ email: "[email protected]", name: "Joe" }, { email: "[email protected]", name: "Steve" }])
    

    This won't completely dodge the issue of table locks, but should be more efficient.
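To keep lock time short, the rows can also be committed in small batches rather than one at a time (or all in one long-running job). A minimal plain-Ruby sketch of the idea, using a hypothetical `import_in_batches` helper (not part of the original code) — in the app, the block would be something like `Contact.transaction { Contact.create(batch) }`:

```ruby
# Hypothetical helper: split parsed rows into fixed-size slices and hand
# each slice to the caller's block, so each database transaction stays
# short and row locks are released between batches.
def import_in_batches(rows, batch_size: 100)
  rows.each_slice(batch_size).map do |batch|
    yield batch  # e.g. Contact.transaction { Contact.create(batch) }
    batch.size   # collect batch sizes for logging/visibility
  end
end

# Example with a block that just records what it was given:
batches = []
sizes = import_in_batches((1..250).to_a, batch_size: 100) { |b| batches << b }
sizes # => [100, 100, 50] — three short transactions instead of one long one
```

Each slice commits independently, so a failure in one batch does not roll back earlier batches — which matches the per-row error handling the worker already does.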

    Also, use find_each instead of each when iterating over ActiveRecord collections. find_each loads records in batches (1,000 by default) rather than all at once, which is far more memory-efficient for large tables.
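find_each itself needs ActiveRecord, but its batching behaviour can be illustrated in plain Ruby: `each_slice` below stands in for the batched primary-key queries find_each issues under the hood, so only one batch of records is held in memory at a time. (`process_in_batches` and `ids` are hypothetical names for illustration.)

```ruby
# Plain-Ruby analog of find_each (illustrative only): walk a large
# collection in fixed-size slices instead of realizing it all at once.
# `ids` stands in for primary keys; in Rails each slice would be fetched
# by its own batched query instead of one query for the whole table.
def process_in_batches(ids, batch_size: 1000)
  peak = 0
  ids.each_slice(batch_size) do |batch|
    peak = [peak, batch.size].max  # memory high-water mark: one batch, not all rows
    batch.each { |id| id }         # per-record work would go here
  end
  peak
end

process_in_batches((1..2500).to_a) # => 1000 — never more than one batch in memory
```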