diff --git a/instrumentation/active_record/lib/opentelemetry/instrumentation/active_record/instrumentation.rb b/instrumentation/active_record/lib/opentelemetry/instrumentation/active_record/instrumentation.rb index 6d76b91f4..58500bbc9 100644 --- a/instrumentation/active_record/lib/opentelemetry/instrumentation/active_record/instrumentation.rb +++ b/instrumentation/active_record/lib/opentelemetry/instrumentation/active_record/instrumentation.rb @@ -39,6 +39,7 @@ def require_dependencies require_relative 'patches/transactions_class_methods' require_relative 'patches/validations' require_relative 'patches/relation_persistence' + require_relative 'patches/async_query_context_propagation' end def patch_activerecord @@ -55,6 +56,9 @@ def patch_activerecord ::ActiveRecord::Base.prepend(Patches::Validations) ::ActiveRecord::Relation.prepend(Patches::RelationPersistence) + + ::ActiveRecord::ConnectionAdapters::ConnectionPool.prepend(Patches::AsyncQueryContextPropagation) + ::ActiveRecord::FutureResult.prepend(Patches::FutureResultExtensions) end end end diff --git a/instrumentation/active_record/lib/opentelemetry/instrumentation/active_record/patches/async_query_context_propagation.rb b/instrumentation/active_record/lib/opentelemetry/instrumentation/active_record/patches/async_query_context_propagation.rb new file mode 100644 index 000000000..9a8e073f7 --- /dev/null +++ b/instrumentation/active_record/lib/opentelemetry/instrumentation/active_record/patches/async_query_context_propagation.rb @@ -0,0 +1,41 @@ +# frozen_string_literal: true + +# Copyright The OpenTelemetry Authors +# +# SPDX-License-Identifier: Apache-2.0 + +module OpenTelemetry + module Instrumentation + module ActiveRecord + module Patches + # Module to prepend to ActiveRecord::ConnectionAdapters::ConnectionPool + module AsyncQueryContextPropagation + def schedule_query(future_result) # :nodoc: + context = OpenTelemetry::Context.current + + @async_executor.post do + # This can happen in the request thread, in the case of a busy executor (fallback_action is executed.) + # FIXME: This override should be unecessary if the concurrent-ruby instrumentation is always installed. + OpenTelemetry::Context.with_current(context) do + future_result.execute_or_skip + end + end + + Thread.pass + end + end + + module FutureResultExtensions + private + + def execute_query(connection, async: false) + _, name = @args + Instrumentation.instance.tracer.in_span(name, attributes: { 'async' => async }) do + super + end + end + end + end + end + end +end diff --git a/instrumentation/active_record/lib/opentelemetry/instrumentation/active_record/patches/querying.rb b/instrumentation/active_record/lib/opentelemetry/instrumentation/active_record/patches/querying.rb index 0b9f37904..1f177a95e 100644 --- a/instrumentation/active_record/lib/opentelemetry/instrumentation/active_record/patches/querying.rb +++ b/instrumentation/active_record/lib/opentelemetry/instrumentation/active_record/patches/querying.rb @@ -21,7 +21,8 @@ module ClassMethods method_name = ::ActiveRecord.version >= Gem::Version.new('7.0.0') ? :_query_by_sql : :find_by_sql define_method(method_name) do |*args, **kwargs, &block| - tracer.in_span("#{self} query") do + query_span_name = "#{self} query" + tracer.in_span(kwargs[:async] ? "schedule #{query_span_name}" : query_span_name) do super(*args, **kwargs, &block) end end diff --git a/instrumentation/active_record/test/instrumentation/active_record/patches/async_query_context_propagation_test.rb b/instrumentation/active_record/test/instrumentation/active_record/patches/async_query_context_propagation_test.rb new file mode 100644 index 000000000..b67d0ccb5 --- /dev/null +++ b/instrumentation/active_record/test/instrumentation/active_record/patches/async_query_context_propagation_test.rb @@ -0,0 +1,183 @@ +# frozen_string_literal: true + +# Copyright The OpenTelemetry Authors +# +# SPDX-License-Identifier: Apache-2.0 + +require 'test_helper' + +require_relative '../../../../lib/opentelemetry/instrumentation/active_record' +require_relative '../../../../lib/opentelemetry/instrumentation/active_record/patches/async_query_context_propagation' + +ASYNC_TEST_LOGGER = Logger.new($stdout).tap { |logger| logger.level = Logger::WARN } + +describe OpenTelemetry::Instrumentation::ActiveRecord::Patches::AsyncQueryContextPropagation do + let(:exporter) { EXPORTER } + let(:unfiltered_spans) { exporter.finished_spans } + let(:instrumentation) { OpenTelemetry::Instrumentation::ActiveRecord::Instrumentation.instance } + let(:logger) { ASYNC_TEST_LOGGER } + + before do + exporter.reset + setup_asynchronous_queries_session + User.create! + end + + after do + teardown_asynchronous_queries_session + + ActiveRecord::Base.subclasses.each do |model| + model.connection.truncate(model.table_name) + end + end + + def setup_asynchronous_queries_session + @_async_queries_session = ActiveRecord::Base.asynchronous_queries_tracker.start_session + end + + def teardown_asynchronous_queries_session + ActiveRecord::Base.asynchronous_queries_tracker.finalize_session(true) if @_async_queries_session + end + + def run_async_load + logger.debug('>>> run async load') + in_new_trace do + instrumentation.tracer.in_span('test_wrapper') do + if block_given? + yield + else + users = User.all.load_async + sleep(0.5) + logger.debug('>>> async #to_a') + users.to_a + end + end + end + end + + def in_new_trace(&block) + OpenTelemetry::Context.with_current(OpenTelemetry::Context::ROOT, &block) + end + + def spans + test_wrapper_span = unfiltered_spans.find { |span| span.name == 'test_wrapper' } + unfiltered_spans.select { |span| span.trace_id == test_wrapper_span.trace_id } + end + + def span_names + spans.map(&:name).sort + end + + # call with block_queue: true to completely block the executor (no tasks can be enqueued), + # or with block_queue: false to block the workers only (tasks still accepted in the queue) + def with_busy_executor(block_queue: true) + _(ActiveRecord.async_query_executor).must_equal :global_thread_pool + + mutex = Mutex.new + condvar = ConditionVariable.new + executor = ActiveRecord.instance_variable_get(:@global_thread_pool_async_query_executor) + + task_count = executor.max_length + task_count += executor.max_queue if block_queue + + awaiting_signals = (0...task_count).to_a + + # Fill up the max thread count and queue with tasks that + # will never complete until they are signaled to do so. + task_count.times do |n| + executor.post do + mutex.synchronize do + ASYNC_TEST_LOGGER.debug("task #{n} waiting...") + condvar.wait(mutex) + ASYNC_TEST_LOGGER.debug("task #{n} got the signal") + awaiting_signals.delete(n) + end + end + end + + logger.debug("yielding... block_queue=#{block_queue}") + yield + logger.debug('...done!') + ensure + logger.debug('cleaning up...') + # clean up the queue + mutex.synchronize { condvar.signal } until awaiting_signals.empty? + end + + def current_thread_id + Thread.current.object_id + end + + def execute_query_span + spans.find { |span| span.name == 'User Load' } + end + + it 'async_query' do + run_async_load + + _(span_names).must_equal(['test_wrapper', 'User Load', 'schedule User query'].sort) + _(execute_query_span.attributes['__test_only_thread_id']).wont_equal(current_thread_id) + _(execute_query_span.attributes['async']).must_equal(true) + end + + describe 'no executor' do + before do + @async_query_executor_was = ActiveRecord.async_query_executor + ActiveRecord.async_query_executor = nil + end + + after do + ActiveRecord.async_query_executor = @async_query_executor_was + end + + it 'is not actually async' do + run_async_load # sic + + _(spans.map(&:name)).wont_include('Schedule User query') + _(spans.map(&:name)).must_include('User query') + + user_query = spans.find { |span| span.name == 'User query' } + _(user_query.attributes['async']).must_equal(false) if user_query.attributes.key?('async') + _(span_names).must_equal(['User query', 'test_wrapper'].sort) + end + end + + it 'async_query_blocked_executor' do + with_busy_executor { run_async_load } + + # In this case the wrapped task is executed as the 'fallback_action' by the thread pool executor, + # so we get the async span, even though it is not actually async. + _(execute_query_span.attributes['__test_only_thread_id']).must_equal(current_thread_id) + + skip(<<~SKIP) + FIXME: async _should_ be false here, but it's executed as a fallback action and + is incorrectly set to `true` + SKIP + + _(execute_query_span.attributes['async']).must_equal(false) + end + + it 'async_query_slow_executor' do + # executor accepts task, but doesn't fulfill it before the waiter + with_busy_executor(block_queue: false) do + run_async_load + end + + # When #to_a is called, the query is still pending and hasn't been picked up, + # so AR executes is synchronously. The executor task is cancelled (or should be?), + # so this span won't be here. + _(execute_query_span.attributes['async']).must_equal(false) + _(span_names).must_equal(['User Load', 'schedule User query', 'test_wrapper']) + end + + it 'async_query_no_wait' do + run_async_load do + User.all.load_async.to_a + end + + # here we called #to_a inline, so it executed before the async scheduler + # could assign the task to a worker. I'm not sure this test will always pass. + _(execute_query_span.attributes['async']).must_equal(false) + _(execute_query_span.attributes['__test_only_thread_id']).must_equal(current_thread_id) + end +end diff --git a/instrumentation/active_record/test/instrumentation/active_record/patches/querying_test.rb b/instrumentation/active_record/test/instrumentation/active_record/patches/querying_test.rb index 5a398a3c2..2097fa0a5 100644 --- a/instrumentation/active_record/test/instrumentation/active_record/patches/querying_test.rb +++ b/instrumentation/active_record/test/instrumentation/active_record/patches/querying_test.rb @@ -14,6 +14,7 @@ let(:spans) { exporter.finished_spans } before { exporter.reset } + after do ActiveRecord::Base.subclasses.each do |model| model.connection.truncate(model.table_name) diff --git a/instrumentation/active_record/test/test_helper.rb b/instrumentation/active_record/test/test_helper.rb index 02751e7f6..d643cba54 100644 --- a/instrumentation/active_record/test/test_helper.rb +++ b/instrumentation/active_record/test/test_helper.rb @@ -29,6 +29,8 @@ ActiveRecord::Base.logger = logger ActiveRecord::Migration.verbose = false +ActiveRecord.async_query_executor = :global_thread_pool + ActiveRecord::Base.establish_connection( adapter: 'sqlite3', database: 'db/development.sqlite3' @@ -84,3 +86,15 @@ def change end Minitest.after_run { CreateUserTable.migrate(:down) } + +# Used in async tests to determine what thread spawned which span +module SpanThreadIdTracking + def internal_start_span(name, kind, attributes, links, start_timestamp, parent_context, instrumentation_scope) + attributes ||= {} + attributes['__test_only_thread_id'] = Thread.current.object_id + + super + end +end + +OpenTelemetry::SDK::Trace::TracerProvider.prepend(SpanThreadIdTracking)