Skip to content

Commit

Permalink
Improve Error handling on CSV import (#879)
Browse files Browse the repository at this point in the history
* Add rollbar logging for ingest errors

* When CSV Import restarts, import all rows that aren't 'complete' or 'pending finalization'

Previously, only 'queued' rows were imported. A previous PR added this behavior for 'in progress', but the new 'deleting child works' status was not restarted.

The new approach guarantees all such cases would be restarted, as well as that rows with errors will be retried (useful if manually restarting after fixing a bug).

* Handle MySQL errors in ingest

* fix bug trying to delete children of `nil` record
  • Loading branch information
sourcefilter authored Jun 1, 2021
1 parent 4db4fd0 commit b435e81
Show file tree
Hide file tree
Showing 6 changed files with 39 additions and 17 deletions.
11 changes: 7 additions & 4 deletions app/importers/actor_record_importer.rb
Original file line number Diff line number Diff line change
Expand Up @@ -97,20 +97,23 @@ def create_for(record:)
raise "Validation failed: #{error_messages.join(', ')}"
end
rescue Ldp::BadRequest => e
retries += 1
# get the id from the ark and the uri from the id then delete the tombstone
tombstone_uri = "#{ActiveFedora::Base.id_to_uri(Californica::IdGenerator.id_from_ark(created.ark))}/fcr:tombstone"
ActiveFedora.fedora.connection.delete(tombstone_uri)
result = ActiveFedora.fedora.connection.delete(tombstone_uri)
Rollbar.warning(e, "Attempted to delete FCRepo tombstone.", retries: retries, ark: created.ark, tombstone_uri: tombstone_uri, result: result)
if (retries += 1) < 3
retry
else
raise e
end
end
rescue ActiveFedora::IllegalOperation => e
raise e unless e.message.start_with?('Attempting to recreate existing ldp_source')
retries ||= 0
raise e if e.class == ActiveFedora::IllegalOperation && !e.message.start_with?('Attempting to recreate existing ldp_source') # error class doesn't look specific enough
retries = (retries || 0) + 1
Rollbar.warning(e, "Attempting to delete bad FCRepo record.", retries: retries, ark: record.ark)
fcrepo_id = Californica::IdGenerator.id_from_ark(record.ark)
Californica::Deleter.new(id: fcrepo_id).delete
Californica::Deleter.new(id: fcrepo_id, logger: info_stream).delete
if (retries += 1) < 3
retry
else
Expand Down
5 changes: 4 additions & 1 deletion app/importers/californica_importer.rb
Original file line number Diff line number Diff line change
Expand Up @@ -32,7 +32,10 @@ def import
# Running darlingtonia w/ our RecordImporter just creates CsvRow objects
Darlingtonia::Importer.new(parser: parser, record_importer: record_importer, info_stream: @info_stream, error_stream: @error_stream).import

@csv_import.csv_rows.where(status: ['queued', 'in progress']).each do |csv_row|
# Reset status of unfinished jobs
@csv_import.csv_rows.where.not(status: 'complete').update_all(status: 'queued')

@csv_import.csv_rows.where(status: 'queued').each do |csv_row|
CsvRowImportJob.perform_now(row_id: csv_row.id)
end

Expand Down
10 changes: 8 additions & 2 deletions app/jobs/csv_row_import_job.rb
Original file line number Diff line number Diff line change
@@ -1,5 +1,10 @@
# frozen_string_literal: true
class CsvRowImportJob < ActiveJob::Base
rescue_from Mysql2::Error::ConnectionError do
Rollbar.error(e, csv_import: csv_import_id)
retry_job wait: 600 # wait 10 minutes for MySQL to come back
end

def perform(row_id:)
start_time = Process.clock_gettime(Process::CLOCK_MONOTONIC)
ENV["TZ"]
Expand Down Expand Up @@ -36,7 +41,7 @@ def perform(row_id:)
case record.mapper.object_type
when 'Work', 'Manuscript'
@row.update(status: 'deleting child works')
Californica::Deleter.new(id: Californica::IdGenerator.id_from_ark(record.mapper.ark)).delete_with_children(of_type: ChildWork)
Californica::Deleter.new(id: Californica::IdGenerator.id_from_ark(record.mapper.ark), logger: @row.error_messages).delete_with_children(of_type: ChildWork)
@row.update(status: 'in progress')
selected_importer = actor_record_importer
new_status = 'complete'
Expand All @@ -58,12 +63,13 @@ def perform(row_id:)
ingest_duration: end_time - start_time,
job_ids_completed: @row.job_ids_completed << job_id)
rescue => e
Rollbar.error(e, csv_import_id: @row.csv_import_id, row_id: @row_id, ark: record.mapper.ark)
end_time = Process.clock_gettime(Process::CLOCK_MONOTONIC)
@row.update(status: 'error',
ingest_record_end_time: Time.current,
ingest_duration: end_time - start_time,
job_ids_errored: @row.job_ids_completed << job_id,
error_messages: @row.error_messages << "#{e.class}: #{e.message}")
error_messages: @row.error_messages << "#{e.class}: #{e.message.split('\n').first}")
end

def collection_record_importer
Expand Down
10 changes: 9 additions & 1 deletion app/jobs/start_csv_import_job.rb
Original file line number Diff line number Diff line change
Expand Up @@ -2,6 +2,10 @@

class StartCsvImportJob < ApplicationJob
queue_as Hyrax.config.ingest_queue_name
rescue_from Mysql2::Error::ConnectionError do
Rollbar.error(e, csv_import: csv_import_id)
retry_job wait: 600 # wait 10 minutes for MySQL to come back
end

def perform(csv_import_id)
@csv_import = CsvImport.find csv_import_id
Expand All @@ -11,7 +15,11 @@ def perform(csv_import_id)
importer.import

rescue => e
@error_stream << "StartCsvImportJob failed: #{e.message}\n#{e.backtrace.inspect}"
Rollbar.error(e, csv_import: csv_import_id)
@error_stream << "#{e.class}: #{e.message}\n#{e.backtrace.inspect}"
@row.update(status: 'error',
end_time: Time.current,
ingest_duration: @row.start_time - Time.current)
end

def ingest_log_filename
Expand Down
17 changes: 8 additions & 9 deletions app/lib/californica/deleter.rb
Original file line number Diff line number Diff line change
Expand Up @@ -21,19 +21,16 @@ def delete
def delete_with_children(of_type: nil)
# Delete the record _first_, or sever its connection to children
# so that each child deletion doesnt trigger a save / reindex
record.member_ids.each do |child_id|
Californica::Deleter.new(id: child_id)
record&.member_ids&.each do |child_id|
Californica::Deleter.new(id: child_id, logger: logger)
.delete_with_children(of_type: of_type)
end
delete if record.is_a?(of_type)

rescue ActiveFedora::ObjectNotFoundError
delete_from_fcrepo
end

def delete_children(of_type: nil)
record.members.each do |child|
Californica::Deleter.new(record: child)
Californica::Deleter.new(record: child, logger: logger)
.delete_with_children(of_type: of_type)
end
end
Expand All @@ -45,19 +42,21 @@ def destroy_and_eradicate
record&.destroy&.eradicate
Hyrax.config.callback.run(:after_destroy, record.id, User.batch_user)
logger.info("Deleted #{record_name || id}}")
rescue ActiveFedora::ObjectNotFoundError
delete_from_fcrepo
end

def delete_from_fcrepo
ActiveFedora.fedora.connection.delete(ActiveFedora::Base.id_to_uri(id))
logger.info("Forced delete of #{record_name || id} from Fedora")
Rollbar.info("Forced delete of #{id} from Fedora")
logger.info("Forced delete of #{id} from Fedora")
rescue Ldp::NotFound
nil # Everything's good, we just wanted to make sure there wasn't a record in fedora not indexed to solr
end

def record
@record ||= ActiveFedora::Base.find(id)

rescue ActiveFedora::ObjectNotFoundError
delete_from_fcrepo
end
end
end
3 changes: 3 additions & 0 deletions config/initializers/rollbar.rb
Original file line number Diff line number Diff line change
Expand Up @@ -9,6 +9,9 @@
# Here we'll disable in 'test':
config.enabled = false if Rails.env.test?

config.branch = DEPLOYED_VERSION
config.code_version = GIT_SHA

# By default, Rollbar will try to call the `current_user` controller method
# to fetch the logged-in user object, and then call that object's `id`
# method to fetch this property. To customize:
Expand Down

0 comments on commit b435e81

Please sign in to comment.