Skip to content

Commit

Permalink
feat: track ActiveRecord async queries
Browse files Browse the repository at this point in the history
  • Loading branch information
zvkemp committed Jan 17, 2025
1 parent 00d379c commit 43cc6dc
Show file tree
Hide file tree
Showing 6 changed files with 245 additions and 1 deletion.
Original file line number Diff line number Diff line change
Expand Up @@ -39,6 +39,7 @@ def require_dependencies
require_relative 'patches/transactions_class_methods'
require_relative 'patches/validations'
require_relative 'patches/relation_persistence'
require_relative 'patches/async_query_context_propagation'
end

def patch_activerecord
Expand All @@ -55,6 +56,9 @@ def patch_activerecord
::ActiveRecord::Base.prepend(Patches::Validations)

::ActiveRecord::Relation.prepend(Patches::RelationPersistence)

::ActiveRecord::ConnectionAdapters::ConnectionPool.prepend(Patches::AsyncQueryContextPropagation)
::ActiveRecord::FutureResult.prepend(Patches::FutureResultExtensions)
end
end
end
Expand Down
Original file line number Diff line number Diff line change
@@ -0,0 +1,41 @@
# frozen_string_literal: true

# Copyright The OpenTelemetry Authors
#
# SPDX-License-Identifier: Apache-2.0

module OpenTelemetry
module Instrumentation
module ActiveRecord
module Patches
# Module to prepend to ActiveRecord::ConnectionAdapters::ConnectionPool
module AsyncQueryContextPropagation
def schedule_query(future_result) # :nodoc:
context = OpenTelemetry::Context.current

@async_executor.post do
# This can happen in the request thread, in the case of a busy executor (fallback_action is executed.)
# FIXME: This override should be unecessary if the concurrent-ruby instrumentation is always installed.
OpenTelemetry::Context.with_current(context) do
future_result.execute_or_skip
end
end

Thread.pass
end
end

module FutureResultExtensions
private

def execute_query(connection, async: false)
_, name = @args
Instrumentation.instance.tracer.in_span(name, attributes: { 'async' => async }) do
super
end
end
end
end
end
end
end
Original file line number Diff line number Diff line change
Expand Up @@ -21,7 +21,8 @@ module ClassMethods
method_name = ::ActiveRecord.version >= Gem::Version.new('7.0.0') ? :_query_by_sql : :find_by_sql

define_method(method_name) do |*args, **kwargs, &block|
tracer.in_span("#{self} query") do
query_span_name = "#{self} query"
tracer.in_span(kwargs[:async] ? "schedule #{query_span_name}" : query_span_name) do
super(*args, **kwargs, &block)
end
end
Expand Down
Original file line number Diff line number Diff line change
@@ -0,0 +1,183 @@
# frozen_string_literal: true

# Copyright The OpenTelemetry Authors
#
# SPDX-License-Identifier: Apache-2.0

require 'test_helper'

require_relative '../../../../lib/opentelemetry/instrumentation/active_record'
require_relative '../../../../lib/opentelemetry/instrumentation/active_record/patches/async_query_context_propagation'

ASYNC_TEST_LOGGER = Logger.new($stdout).tap { |logger| logger.level = Logger::WARN }

describe OpenTelemetry::Instrumentation::ActiveRecord::Patches::AsyncQueryContextPropagation do
let(:exporter) { EXPORTER }
let(:unfiltered_spans) { exporter.finished_spans }
let(:instrumentation) { OpenTelemetry::Instrumentation::ActiveRecord::Instrumentation.instance }
let(:logger) { ASYNC_TEST_LOGGER }

before do
exporter.reset
setup_asynchronous_queries_session
User.create!
end

after do
teardown_asynchronous_queries_session

ActiveRecord::Base.subclasses.each do |model|
model.connection.truncate(model.table_name)
end
end

def setup_asynchronous_queries_session
@_async_queries_session = ActiveRecord::Base.asynchronous_queries_tracker.start_session
end

def teardown_asynchronous_queries_session
ActiveRecord::Base.asynchronous_queries_tracker.finalize_session(true) if @_async_queries_session
end

def run_async_load
logger.debug('>>> run async load')
in_new_trace do
instrumentation.tracer.in_span('test_wrapper') do
if block_given?
yield
else
users = User.all.load_async
sleep(0.5)
logger.debug('>>> async #to_a')
users.to_a
end
end
end
end

def in_new_trace(&block)
OpenTelemetry::Context.with_current(OpenTelemetry::Context::ROOT, &block)
end

def spans
test_wrapper_span = unfiltered_spans.find { |span| span.name == 'test_wrapper' }
unfiltered_spans.select { |span| span.trace_id == test_wrapper_span.trace_id }
end

def span_names
spans.map(&:name).sort
end

# call with block_queue: true to completely block the executor (no tasks can be enqueued),
# or with block_queue: false to block the workers only (tasks still accepted in the queue)
def with_busy_executor(block_queue: true)
_(ActiveRecord.async_query_executor).must_equal :global_thread_pool

mutex = Mutex.new
condvar = ConditionVariable.new
executor = ActiveRecord.instance_variable_get(:@global_thread_pool_async_query_executor)

task_count = executor.max_length
task_count += executor.max_queue if block_queue

awaiting_signals = (0...task_count).to_a

# Fill up the max thread count and queue with tasks that
# will never complete until they are signaled to do so.
task_count.times do |n|
executor.post do
mutex.synchronize do
ASYNC_TEST_LOGGER.debug("task #{n} waiting...")
condvar.wait(mutex)
ASYNC_TEST_LOGGER.debug("task #{n} got the signal")
awaiting_signals.delete(n)
end
end
end

logger.debug("yielding... block_queue=#{block_queue}")
yield
logger.debug('...done!')
ensure
logger.debug('cleaning up...')
# clean up the queue
mutex.synchronize { condvar.signal } until awaiting_signals.empty?
end

def current_thread_id
Thread.current.object_id
end

def execute_query_span
spans.find { |span| span.name == 'User Load' }
end

it 'async_query' do
run_async_load

_(span_names).must_equal(['test_wrapper', 'User Load', 'schedule User query'].sort)
_(execute_query_span.attributes['__test_only_thread_id']).wont_equal(current_thread_id)
_(execute_query_span.attributes['async']).must_equal(true)
end

describe 'no executor' do
before do
@async_query_executor_was = ActiveRecord.async_query_executor
ActiveRecord.async_query_executor = nil
end

after do
ActiveRecord.async_query_executor = @async_query_executor_was
end

it 'is not actually async' do
run_async_load # sic

_(spans.map(&:name)).wont_include('Schedule User query')
_(spans.map(&:name)).must_include('User query')

user_query = spans.find { |span| span.name == 'User query' }
_(user_query.attributes['async']).must_equal(false) if user_query.attributes.key?('async')
_(span_names).must_equal(['User query', 'test_wrapper'].sort)
end
end

it 'async_query_blocked_executor' do
with_busy_executor { run_async_load }

# In this case the wrapped task is executed as the 'fallback_action' by the thread pool executor,
# so we get the async span, even though it is not actually async.
_(execute_query_span.attributes['__test_only_thread_id']).must_equal(current_thread_id)

skip(<<~SKIP)
FIXME: async _should_ be false here, but it's executed as a fallback action and
is incorrectly set to `true`
SKIP

_(execute_query_span.attributes['async']).must_equal(false)
end

it 'async_query_slow_executor' do
# executor accepts task, but doesn't fulfill it before the waiter
with_busy_executor(block_queue: false) do
run_async_load
end

# When #to_a is called, the query is still pending and hasn't been picked up,
# so AR executes is synchronously. The executor task is cancelled (or should be?),
# so this span won't be here.
_(execute_query_span.attributes['async']).must_equal(false)
_(span_names).must_equal(['User Load', 'schedule User query', 'test_wrapper'])
end

it 'async_query_no_wait' do
run_async_load do
User.all.load_async.to_a
end

# here we called #to_a inline, so it executed before the async scheduler
# could assign the task to a worker. I'm not sure this test will always pass.
_(execute_query_span.attributes['async']).must_equal(false)
_(execute_query_span.attributes['__test_only_thread_id']).must_equal(current_thread_id)
end
end
Original file line number Diff line number Diff line change
Expand Up @@ -14,6 +14,7 @@
let(:spans) { exporter.finished_spans }

before { exporter.reset }

after do
ActiveRecord::Base.subclasses.each do |model|
model.connection.truncate(model.table_name)
Expand Down
14 changes: 14 additions & 0 deletions instrumentation/active_record/test/test_helper.rb
Original file line number Diff line number Diff line change
Expand Up @@ -29,6 +29,8 @@
ActiveRecord::Base.logger = logger
ActiveRecord::Migration.verbose = false

ActiveRecord.async_query_executor = :global_thread_pool

ActiveRecord::Base.establish_connection(
adapter: 'sqlite3',
database: 'db/development.sqlite3'
Expand Down Expand Up @@ -84,3 +86,15 @@ def change
end

Minitest.after_run { CreateUserTable.migrate(:down) }

# Used in async tests to determine what thread spawned which span
module SpanThreadIdTracking
def internal_start_span(name, kind, attributes, links, start_timestamp, parent_context, instrumentation_scope)
attributes ||= {}
attributes['__test_only_thread_id'] = Thread.current.object_id

super
end
end

OpenTelemetry::SDK::Trace::TracerProvider.prepend(SpanThreadIdTracking)

0 comments on commit 43cc6dc

Please sign in to comment.