feat: add basic exponential histogram #1736

Open · wants to merge 12 commits into main
2 changes: 2 additions & 0 deletions metrics_sdk/lib/opentelemetry/sdk/metrics/aggregation.rb
@@ -21,3 +21,5 @@ module Aggregation
require 'opentelemetry/sdk/metrics/aggregation/sum'
require 'opentelemetry/sdk/metrics/aggregation/last_value'
require 'opentelemetry/sdk/metrics/aggregation/drop'
require 'opentelemetry/sdk/metrics/aggregation/exponential_histogram_data_point'
require 'opentelemetry/sdk/metrics/aggregation/exponential_bucket_histogram'
@@ -0,0 +1,228 @@
# frozen_string_literal: true

# Copyright The OpenTelemetry Authors
#
# SPDX-License-Identifier: Apache-2.0

require_relative 'exponential_histogram/buckets'
require_relative 'exponential_histogram/log2e_scale_factor'
require_relative 'exponential_histogram/ieee_754'
require_relative 'exponential_histogram/logarithm_mapping'
require_relative 'exponential_histogram/exponent_mapping'

module OpenTelemetry
module SDK
module Metrics
module Aggregation
# Contains the implementation of the ExponentialBucketHistogram aggregation
# https://opentelemetry.io/docs/specs/otel/metrics/data-model/#exponentialhistogram
class ExponentialBucketHistogram # rubocop:disable Metrics/ClassLength
attr_reader :aggregation_temporality

# relate to min max scale: https://opentelemetry.io/docs/specs/otel/metrics/sdk/#support-a-minimum-and-maximum-scale
MAX_SCALE = 20
MIN_SCALE = -10
MAX_SIZE = 160

# The default boundaries are calculated from the default max_size and max_scale values
def initialize(
aggregation_temporality: ENV.fetch('OTEL_EXPORTER_OTLP_METRICS_TEMPORALITY_PREFERENCE', :delta), # TODO: aggregation_temporality should be renamed to collect_aggregation_temporality for clear definition
[Reviewer] I'm not sure I understand why this should be renamed. Can you tell me more? Is there a spec reference?

[Author] I think it's a mistake; removed the comment.

max_size: MAX_SIZE,
max_scale: MAX_SCALE,
record_min_max: true,
zero_threshold: 0
)
@data_points = {}
[Reviewer] I believe @data_points was refactored out of the other aggregations when views were merged in. Why save them as an instance variable in this aggregation?

[Author] I may have started this PR before the metrics view work was merged. Updated.

@aggregation_temporality = aggregation_temporality
@record_min_max = record_min_max
@min = Float::INFINITY
@max = -Float::INFINITY
@sum = 0
@count = 0
@zero_threshold = zero_threshold
@zero_count = 0
@size = validate_size(max_size)
@scale = validate_scale(max_scale)

@mapping = new_mapping(@scale)
end

def collect(start_time, end_time)
if @aggregation_temporality == :delta
# Set timestamps and 'move' data point values to result.
hdps = @data_points.values.map! do |hdp|
hdp.start_time_unix_nano = start_time
hdp.time_unix_nano = end_time
hdp
end
@data_points.clear
hdps
else
# Assume merging is done on the collector side at this point
@data_points.values.map! do |hdp|
hdp.start_time_unix_nano ||= start_time # Start time of a data point is from the first observation.
hdp.time_unix_nano = end_time
hdp = hdp.dup
hdp.positive = hdp.positive.dup
hdp.negative = hdp.negative.dup
hdp
end
end
end
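The temporality branch above can be illustrated with a standalone toy (the `Point` struct and `toy_collect` helper here are hypothetical stand-ins, not the SDK classes): a delta collect stamps timestamps and drains the stored points, while a cumulative collect keeps them and hands back copies.

```ruby
# Toy illustration of delta vs cumulative collect semantics.
Point = Struct.new(:start_time, :end_time, :value)

def toy_collect(points, temporality, start_time, end_time)
  if temporality == :delta
    out = points.values.each do |p|
      p.start_time = start_time
      p.end_time = end_time
    end
    points.clear # delta: data points are 'moved' out of the store
    out
  else
    points.values.map do |p|
      p.start_time ||= start_time # first observation fixes the start time
      p.end_time = end_time
      p.dup # cumulative: the store keeps accumulating
    end
  end
end

store = { a: Point.new(nil, nil, 1) }
toy_collect(store, :delta, 0, 10)
store.empty? # => true after a delta collect
```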

# rubocop:disable Metrics/MethodLength, Metrics/CyclomaticComplexity
def update(amount, attributes)
[Reviewer] A lot of this is almost identical to explicit bucket histograms. Do you think we should combine the duplicated code into a module or something for the two aggregations to share?

[Author] Do you mean collect or update is identical? Could you give me an example?

# fetch or initialize the ExponentialHistogramDataPoint
hdp = @data_points.fetch(attributes) do
if @record_min_max
min = Float::INFINITY
max = -Float::INFINITY
end

@data_points[attributes] = ExponentialHistogramDataPoint.new(
attributes,
nil, # :start_time_unix_nano
0, # :time_unix_nano
0, # :count
0, # :sum
@scale, # :scale
@zero_count, # :zero_count
ExponentialHistogram::Buckets.new, # :positive
ExponentialHistogram::Buckets.new, # :negative
0, # :flags
nil, # :exemplars
min, # :min
max, # :max
@zero_threshold # :zero_threshold
)
end

# Start to populate the data point (esp. the buckets)
if @record_min_max
hdp.max = amount if amount > hdp.max
hdp.min = amount if amount < hdp.min
end

hdp.sum += amount
hdp.count += 1

if amount.abs <= @zero_threshold
hdp.zero_count += 1
hdp.scale = 0 if hdp.count == hdp.zero_count # if only zeros have been recorded, there is no point in rescaling
return
end

# rescale, map to index, update the buckets here
buckets = amount.positive? ? hdp.positive : hdp.negative
amount = -amount if amount.negative?

bucket_index = @mapping.map_to_index(amount)

is_rescaling_needed = false
low = high = 0

if buckets.counts == [0] # special case of empty
buckets.index_start = bucket_index
buckets.index_end = bucket_index
buckets.index_base = bucket_index

elsif bucket_index < buckets.index_start && (buckets.index_end - bucket_index) >= @size
is_rescaling_needed = true
[Reviewer] Maybe rename to rescaling_needed?

[Author] Updated.

low = bucket_index
high = buckets.index_end

elsif bucket_index > buckets.index_end && (bucket_index - buckets.index_start) >= @size
is_rescaling_needed = true
low = buckets.index_start
high = bucket_index
end

if is_rescaling_needed
scale_change = get_scale_change(low, high)
downscale(scale_change, hdp.positive, hdp.negative)
new_scale = @mapping.scale - scale_change
hdp.scale = new_scale
@mapping = new_mapping(new_scale)
bucket_index = @mapping.map_to_index(amount)

OpenTelemetry.logger.debug "Rescaled with new scale #{new_scale} from #{low} and #{high}; bucket_index is updated to #{bucket_index}"
end

# adjust buckets based on the bucket_index
if bucket_index < buckets.index_start
span = buckets.index_end - bucket_index

if span >= buckets.counts.size
OpenTelemetry.logger.debug "buckets need to grow to #{span + 1} from #{buckets.counts.size} (max bucket size #{@size})"
buckets.grow(span + 1, @size)
end
[Reviewer] This is duplicated on lines 164-167. Should it go into its own method?

[Author] (Jan 10, 2025) Updated with its own method.


buckets.index_start = bucket_index
elsif bucket_index > buckets.index_end
span = bucket_index - buckets.index_start

if span >= buckets.counts.size
OpenTelemetry.logger.debug "buckets need to grow to #{span + 1} from #{buckets.counts.size} (max bucket size #{@size})"
buckets.grow(span + 1, @size)
end

buckets.index_end = bucket_index
end

bucket_index -= buckets.index_base
bucket_index += buckets.counts.size if bucket_index.negative?

buckets.increment_bucket(bucket_index)
nil
end
# rubocop:enable Metrics/MethodLength, Metrics/CyclomaticComplexity

private

def new_mapping(scale)
scale <= 0 ? ExponentialHistogram::ExponentMapping.new(scale) : ExponentialHistogram::LogarithmMapping.new(scale)
end

def empty_counts
@boundaries ? Array.new(@boundaries.size + 1, 0) : nil
end

def get_scale_change(low, high)
# Halve the index range until it fits within @size, counting the halvings.
change = 0
while high - low >= @size
high >>= 1
low >>= 1
change += 1
end
change
end
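The halving loop above can be checked standalone (the function name here is just for illustration): with a max size of 160, an index span of 0..1_048_575 (2**20 - 1) needs 13 halvings before it fits.

```ruby
# Halve the index range until it spans fewer than max_size buckets;
# the number of halvings is the required scale reduction.
def scale_change(low, high, max_size)
  change = 0
  while high - low >= max_size
    high >>= 1
    low >>= 1
    change += 1
  end
  change
end

scale_change(0, 1_048_575, 160) # => 13 (1_048_575 >> 13 == 127, which fits)
```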

def downscale(change, positive, negative)
return if change == 0
raise 'Invalid change of scale' if change.negative?
[Reviewer] Will this get caught by the error handler? I'm concerned if we raise, we might crash a user's app.

[Author] Updated to return if change <= 0.


positive.downscale(change)
negative.downscale(change)
end

def validate_scale(scale)
return scale unless scale > MAX_SCALE || scale < MIN_SCALE
[Reviewer] I'm not sure how performant it is, but another way to implement this could be:

(MIN_SCALE..MAX_SCALE).cover?(scale)

[Author] I think the direct operators will be faster, since they are plain comparisons with short-circuiting, but the range version reads nicely. Let me know if I should change it.

       user     system      total        real
Direct Comparison:  0.047998   0.000000   0.047998 (  0.047786)
Range Cover:  0.172396   0.000963   0.173359 (  0.173389)


OpenTelemetry.logger.warn "Scale #{scale} is invalid, using default max scale #{MAX_SCALE}"
MAX_SCALE
end
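The two validation styles discussed in the thread above are behaviorally equivalent; a quick standalone check (constants duplicated here purely for illustration):

```ruby
# Verify that the direct-comparison predicate and Range#cover? agree
# across in-range and out-of-range scales.
MIN_SCALE_CHECK = -10
MAX_SCALE_CHECK = 20

def invalid_direct?(scale)
  scale > MAX_SCALE_CHECK || scale < MIN_SCALE_CHECK
end

def invalid_cover?(scale)
  !(MIN_SCALE_CHECK..MAX_SCALE_CHECK).cover?(scale)
end

(-15..25).all? { |s| invalid_direct?(s) == invalid_cover?(s) } # => true
```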

def validate_size(size)
return size unless size > MAX_SIZE || size < 0

OpenTelemetry.logger.warn "Size #{size} is invalid, using default max size #{MAX_SIZE}"
MAX_SIZE
end
end
end
end
end
end
@@ -0,0 +1,112 @@
# frozen_string_literal: true

# Copyright The OpenTelemetry Authors
#
# SPDX-License-Identifier: Apache-2.0

module OpenTelemetry
module SDK
module Metrics
module Aggregation
module ExponentialHistogram
# Buckets is the fundamental building block of the exponential histogram; it stores the bucket counts and boundary indexes
class Buckets
[Reviewer] Does this class have test coverage?

[Author] Currently not directly; it is exercised intensively through the test cases for exponential_bucket_histogram. If Buckets were incorrect, those tests would fail. I will add test cases for the classes inside /exponential_histogram/.

attr_accessor :index_start, :index_end, :index_base

def initialize
@counts = [0]
@index_base = 0
@index_start = 0
@index_end = 0
end

def grow(needed, max_size)
size = @counts.size
bias = @index_base - @index_start
old_positive_limit = size - bias

new_size = [2**Math.log2(needed).ceil, max_size].min

new_positive_limit = new_size - bias

tmp = Array.new(new_size, 0)
tmp[new_positive_limit..-1] = @counts[old_positive_limit..]
tmp[0...old_positive_limit] = @counts[0...old_positive_limit]
@counts = tmp
end
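The capacity computation in grow rounds the needed size up to the next power of two and caps it at max_size; a minimal standalone sketch of that expression (helper name assumed):

```ruby
# Round the needed capacity up to a power of two, capped at max_size.
def next_capacity(needed, max_size)
  [2**Math.log2(needed).ceil, max_size].min
end

next_capacity(5, 160)   # => 8
next_capacity(200, 160) # => 160
```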

def offset
@index_start
end

def offset_counts
bias = @index_base - @index_start
@counts[-bias..] + @counts[0...-bias]
end
alias counts offset_counts

def length
return 0 if @counts.empty?
return 0 if @index_end == @index_start && get_bucket(0).zero? # Buckets does not define [], so use get_bucket

@index_end - @index_start + 1
end

def get_bucket(key)
bias = @index_base - @index_start

key += @counts.size if key < bias
key -= bias

@counts[key]
end

def downscale(amount)
bias = @index_base - @index_start

if bias != 0
@index_base = @index_start
@counts.reverse!
@counts = @counts[0...bias].reverse + @counts[bias..].reverse
end

size = 1 + @index_end - @index_start
each = 1 << amount
inpos = 0
outpos = 0
pos = @index_start

while pos <= @index_end
mod = pos % each
mod += each if mod < 0

inds = mod

while inds < each && inpos < size
if outpos != inpos
@counts[outpos] += @counts[inpos]
@counts[inpos] = 0
end

inpos += 1
pos += 1
inds += 1
end

outpos += 1
end

@index_start >>= amount
@index_end >>= amount
@index_base = @index_start
end
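What downscale does can be checked against a simpler reference implementation: lowering the scale by change merges every 2**change adjacent buckets, because every absolute boundary index is right-shifted by change. A hash-based sketch (names are illustrative, not the SDK's):

```ruby
# Reference downscale: the bucket at absolute index i moves to index
# i >> change, and counts landing on the same new index are summed.
def downscale_reference(counts, index_start, change)
  merged = Hash.new(0)
  counts.each_with_index { |c, i| merged[(index_start + i) >> change] += c }
  new_start = merged.keys.min
  new_counts = (new_start..merged.keys.max).map { |i| merged[i] }
  [new_start, new_counts]
end

downscale_reference([1, 1, 1, 1], 0, 1) # => [0, [2, 2]]
```

Ruby's right shift floors toward negative infinity, which is why the in-place version can simply apply `@index_start >>= amount` for negative indexes as well.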

def increment_bucket(bucket_index, increment = 1)
@counts[bucket_index] += increment
end
end
end
end
end
end
end
@@ -0,0 +1,46 @@
# frozen_string_literal: true

# Copyright The OpenTelemetry Authors
#
# SPDX-License-Identifier: Apache-2.0

module OpenTelemetry
module SDK
module Metrics
module Aggregation
module ExponentialHistogram
# ExponentMapping is used for mapping when scale <= 0
class ExponentMapping
attr_reader :scale

def initialize(scale)
@scale = scale
@min_normal_lower_boundary_index = calculate_min_normal_lower_boundary_index(scale)
@max_normal_lower_boundary_index = IEEE754::MAX_NORMAL_EXPONENT >> -@scale
end

def map_to_index(value)
return @min_normal_lower_boundary_index if value < IEEE754::MIN_NORMAL_VALUE

exponent = IEEE754.get_ieee_754_exponent(value)
correction = (IEEE754.get_ieee_754_mantissa(value) - 1) >> IEEE754::MANTISSA_WIDTH
(exponent + correction) >> -@scale
end
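For normal positive values, the index computation above can be mimicked with Math.frexp instead of the IEEE754 helpers (a sketch under that assumption; the function name is illustrative). Math.frexp returns value == frac * 2**exp with frac in [0.5, 1), so the IEEE-754 exponent is exp - 1; an exact power of two has frac == 0.5 and belongs to the bucket below, which is what the mantissa correction in map_to_index achieves.

```ruby
# Bucket index at scale <= 0: the (corrected) IEEE-754 exponent
# right-shifted by -scale. Assumes a normal, positive value.
def exponent_index(value, scale)
  frac, exp = Math.frexp(value)
  ieee_exp = exp - 1
  ieee_exp -= 1 if frac == 0.5 # exact power of two: lower bucket
  ieee_exp >> -scale
end

exponent_index(1.0, 0)  # => -1 (bucket (0.5, 1])
exponent_index(3.0, 0)  # => 1  (bucket (2, 4])
exponent_index(5.0, -1) # => 1  (base 4: bucket (4, 16])
```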

def get_lower_boundary(inds)
raise StandardError, 'mapping underflow' if inds < @min_normal_lower_boundary_index || inds > @max_normal_lower_boundary_index

Math.ldexp(1, inds << -@scale)
end

def calculate_min_normal_lower_boundary_index(scale)
inds = IEEE754::MIN_NORMAL_EXPONENT >> -scale
inds -= 1 if -scale < 2
inds
end
end
end
end
end
end
end