Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Rum initialization async #679

Closed
Closed
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
Original file line number Diff line number Diff line change
Expand Up @@ -11,6 +11,8 @@
import android.util.Log;
import io.opentelemetry.android.common.RumConstants;
import io.opentelemetry.android.config.OtelRumConfig;
import io.opentelemetry.android.export.BufferDelegatingLogExporter;
import io.opentelemetry.android.export.BufferDelegatingSpanExporter;
import io.opentelemetry.android.features.diskbuffering.DiskBufferingConfiguration;
import io.opentelemetry.android.features.diskbuffering.SignalFromDiskExporter;
import io.opentelemetry.android.features.diskbuffering.scheduler.ExportScheduleHandler;
Expand Down Expand Up @@ -53,6 +55,8 @@
import java.io.IOException;
import java.util.ArrayList;
import java.util.List;
import java.util.concurrent.ExecutorService;
import java.util.concurrent.Executors;
import java.util.function.BiFunction;
import java.util.function.Consumer;
import java.util.function.Function;
Expand Down Expand Up @@ -86,6 +90,8 @@ public final class OpenTelemetryRumBuilder {

private Resource resource;

private final ExecutorService executorService = Executors.newSingleThreadExecutor();

private static TextMapPropagator buildDefaultPropagator() {
return TextMapPropagator.composite(
W3CTraceContextPropagator.getInstance(), W3CBaggagePropagator.getInstance());
Expand Down Expand Up @@ -273,11 +279,69 @@ OpenTelemetryRum build(ServiceManager serviceManager) {
InitializationEvents initializationEvents = InitializationEvents.get();
applyConfiguration(serviceManager, initializationEvents);

BufferDelegatingLogExporter bufferDelegatingLogExporter =
new BufferDelegatingLogExporter();

BufferDelegatingSpanExporter bufferDelegatingSpanExporter =
new BufferDelegatingSpanExporter();

SessionManager sessionManager =
SessionManager.create(timeoutHandler, config.getSessionTimeout().toNanos());

OpenTelemetrySdk sdk =
OpenTelemetrySdk.builder()
.setTracerProvider(
buildTracerProvider(
sessionManager,
application,
bufferDelegatingSpanExporter))
.setMeterProvider(buildMeterProvider(application))
.setLoggerProvider(
buildLoggerProvider(
application, bufferDelegatingLogExporter))
.setPropagators(buildFinalPropagators())
.build();

otelSdkReadyListeners.forEach(listener -> listener.accept(sdk));

SdkPreconfiguredRumBuilder delegate =
new SdkPreconfiguredRumBuilder(
application,
sdk,
timeoutHandler,
sessionManager,
config.shouldDiscoverInstrumentations(),
serviceManager);

executorService.execute(
() -> {
initializeExporters(
serviceManager,
initializationEvents,
bufferDelegatingSpanExporter,
bufferDelegatingLogExporter);
});

instrumentations.forEach(delegate::addInstrumentation);

return delegate.build();
}

private void initializeExporters(
ServiceManager serviceManager,
InitializationEvents initializationEvents,
BufferDelegatingSpanExporter bufferDelegatingSpanExporter,
BufferDelegatingLogExporter bufferedDelegatingLogExporter) {

DiskBufferingConfiguration diskBufferingConfiguration =
config.getDiskBufferingConfiguration();

SpanExporter spanExporter = buildSpanExporter();

LogRecordExporter logsExporter = buildLogsExporter();

SignalFromDiskExporter signalFromDiskExporter = null;

if (diskBufferingConfiguration.isEnabled()) {
try {
StorageConfiguration storageConfiguration =
Expand All @@ -299,34 +363,14 @@ OpenTelemetryRum build(ServiceManager serviceManager) {
Log.e(RumConstants.OTEL_RUM_LOG_TAG, "Could not initialize disk exporters.", e);
}
}
initializationEvents.spanExporterInitialized(spanExporter);

SessionManager sessionManager =
SessionManager.create(timeoutHandler, config.getSessionTimeout().toNanos());
initializationEvents.spanExporterInitialized(spanExporter);

OpenTelemetrySdk sdk =
OpenTelemetrySdk.builder()
.setTracerProvider(
buildTracerProvider(sessionManager, application, spanExporter))
.setMeterProvider(buildMeterProvider(application))
.setLoggerProvider(buildLoggerProvider(application, logsExporter))
.setPropagators(buildFinalPropagators())
.build();
bufferedDelegatingLogExporter.setDelegate(logsExporter);

otelSdkReadyListeners.forEach(listener -> listener.accept(sdk));
bufferDelegatingSpanExporter.setDelegate(spanExporter);

scheduleDiskTelemetryReader(signalFromDiskExporter, diskBufferingConfiguration);

SdkPreconfiguredRumBuilder delegate =
new SdkPreconfiguredRumBuilder(
application,
sdk,
timeoutHandler,
sessionManager,
config.shouldDiscoverInstrumentations(),
serviceManager);
instrumentations.forEach(delegate::addInstrumentation);
return delegate.build();
}

private StorageConfiguration createStorageConfiguration(ServiceManager serviceManager)
Expand Down
Original file line number Diff line number Diff line change
@@ -0,0 +1,47 @@
/*
* Copyright The OpenTelemetry Authors
* SPDX-License-Identifier: Apache-2.0
*/

package io.opentelemetry.android.export

import io.opentelemetry.sdk.common.CompletableResultCode
import io.opentelemetry.sdk.logs.data.LogRecordData
import io.opentelemetry.sdk.logs.export.LogRecordExporter

/**
* An in-memory buffer delegating log exporter that buffers log records in memory until a delegate is set.
* Once a delegate is set, the buffered log records are exported to the delegate.
*
* The buffer size is set to 5,000 by default. If the buffer is full, the exporter will drop new log records.
*/
internal class BufferDelegatingLogExporter(
bufferSize: Int = 5_000
): BufferedDelegatingExporter<LogRecordData, LogRecordExporter>(bufferSize = bufferSize), LogRecordExporter {

override fun exportToDelegate(delegate: LogRecordExporter, data: Collection<LogRecordData>): CompletableResultCode {
return delegate.export(data)
}

override fun shutdownDelegate(delegate: LogRecordExporter): CompletableResultCode {
return delegate.shutdown()
}

override fun export(logs: Collection<LogRecordData>): CompletableResultCode {
return bufferOrDelegate(logs)
}

override fun flush(): CompletableResultCode {
return withDelegateOrNull { delegate ->
delegate?.flush() ?: CompletableResultCode.ofSuccess()
}
}

override fun shutdown(): CompletableResultCode {
return bufferedShutDown()
}

override fun close() {
shutdown()
}
}
Original file line number Diff line number Diff line change
@@ -0,0 +1,48 @@
/*
* Copyright The OpenTelemetry Authors
* SPDX-License-Identifier: Apache-2.0
*/

package io.opentelemetry.android.export

import io.opentelemetry.sdk.common.CompletableResultCode
import io.opentelemetry.sdk.trace.data.SpanData
import io.opentelemetry.sdk.trace.export.SpanExporter

/**
* An in-memory buffer delegating span exporter that buffers span data in memory until a delegate is set.
* Once a delegate is set, the buffered span data is exported to the delegate.
*
* The buffer size is set to 5,000 by default. If the buffer is full, the exporter will drop new span data.
*/
internal class BufferDelegatingSpanExporter(
bufferSize: Int = 5_000
): BufferedDelegatingExporter<SpanData, SpanExporter>(bufferSize = bufferSize), SpanExporter {

override fun exportToDelegate(delegate: SpanExporter, data: Collection<SpanData>): CompletableResultCode {
return delegate.export(data)
}

override fun shutdownDelegate(delegate: SpanExporter): CompletableResultCode {
return delegate.shutdown()
}

override fun export(spans: Collection<SpanData>): CompletableResultCode {
return bufferOrDelegate(spans)
}

override fun flush(): CompletableResultCode {
return withDelegateOrNull { delegate ->
delegate?.flush() ?: CompletableResultCode.ofSuccess()
}
}

override fun shutdown(): CompletableResultCode {
return bufferedShutDown()
}

override fun close() {
shutdown()
}
}

Original file line number Diff line number Diff line change
@@ -0,0 +1,84 @@
package io.opentelemetry.android.export

import io.opentelemetry.sdk.common.CompletableResultCode
import java.util.concurrent.atomic.AtomicBoolean

/**
* An in-memory buffer delegating span exporter that buffers span data in memory until a delegate is set.
* Once a delegate is set, the buffered span data is exported to the delegate.
*
* The buffer size is set to 5,000 by default. If the buffer is full, the exporter will drop new span data.
*/
internal abstract class BufferedDelegatingExporter<T, D>(private val bufferSize: Int = 5_000) {
private var delegate: D? = null
private val buffer = mutableListOf<T>()
private val lock = Any()
private var isShutDown = AtomicBoolean(false)

/**
* Sets the delegate for this exporter and flushes the buffer to the delegate.
*
* If the delegate has already been set, an [IllegalStateException] will be thrown.
* If this exporter has been shut down, the delegate will be shut down immediately.
*
* @param delegate the delegate to set
*/
fun setDelegate(delegate: D) {
synchronized(lock) {
if (this.delegate != null) {
throw IllegalStateException("Exporter delegate has already been set.")
}

this.delegate = delegate

flushToDelegate(delegate)

if (isShutDown.get()) shutdownDelegate(delegate)
}
}

/**
* Buffers the given data if the delegate has not been set, otherwise exports the data to the delegate.
*
* @param data the data to buffer or export
*/
fun bufferOrDelegate(data: Collection<T>): CompletableResultCode =
withDelegateOrNull {
if (it != null) {
exportToDelegate(it, data)
} else {
val amountToTake = bufferSize - buffer.size
buffer.addAll(data.take(amountToTake))
CompletableResultCode.ofSuccess()
}
}

/**
* Executes the given block with the delegate if it has been set, otherwise executes the block with a null delegate.
*
* @param block the block to execute
*/
fun <T> withDelegateOrNull(block: (D?) -> T): T {
return delegate?.let { block(it) } ?: synchronized(lock) { block(delegate) }
}

fun bufferedShutDown(): CompletableResultCode {
return withDelegateOrNull {
if (it != null) {
flushToDelegate(it)
shutdownDelegate(it)
} else {
CompletableResultCode.ofSuccess()
}
}
}

protected abstract fun exportToDelegate(delegate: D, data: Collection<T>): CompletableResultCode

protected abstract fun shutdownDelegate(delegate: D): CompletableResultCode

private fun flushToDelegate(delegate: D) {
exportToDelegate(delegate, buffer)
buffer.clear()
}
}
Loading
Loading