Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Add an OTLP receiver to Weaver to prepare for the weaver registry live-check command #548

Open
wants to merge 22 commits into
base: main
Choose a base branch
from
Open
Show file tree
Hide file tree
Changes from all commits
Commits
Show all changes
22 commits
Select commit Hold shift + click to select a range
0f96776
feat(otlp-receiver): skeleton of infer-registry and check-registry
lquerel Jan 10, 2025
8a3b4bb
feat(otlp-receiver): vendor OTLP gRPC services
lquerel Jan 11, 2025
3ebf464
feat(otlp-receiver): add logs, metrics, trace gRPC services.
lquerel Jan 11, 2025
dfa58d1
feat(otlp-receiver): provide an iterator to listen to OTLP request.
lquerel Jan 11, 2025
fabf279
feat(otlp-receiver): clean up the code
lquerel Jan 11, 2025
98c058a
feat(otlp-receiver): rename the directory containing the generated co…
lquerel Jan 11, 2025
6a266df
feat(otlp-receiver): rename the directory containing the generated co…
lquerel Jan 11, 2025
32e03a1
feat(otlp-receiver): Stop the OTLP receiver after x seconds of inacti…
lquerel Jan 13, 2025
8ef296f
feat(otlp-receiver): Test stop conditions
lquerel Jan 16, 2025
f1c4328
feat(otlp-receiver): Improve command documentation
lquerel Jan 16, 2025
7763933
feat(otlp-receiver): Clean up code
lquerel Jan 16, 2025
2f9cae9
feat(otlp-receiver): Fix all Clippy issues.
lquerel Jan 16, 2025
9d608ac
feat(otlp-receiver): Comment build.rs.
lquerel Jan 16, 2025
61c31ea
chore(build): Merge with upstream branch
lquerel Jan 16, 2025
5718bc4
chore(build): Fix rustdoc lint issue
lquerel Jan 16, 2025
28aee66
feat(oltp-receiver): Remove SIGHUP handling on non-unix system.
lquerel Jan 16, 2025
7ac8a45
feat(otlp-receiver): Block the `listen_otlp_requests` function until …
lquerel Jan 17, 2025
82f39b9
feat(oltp-receiver): Add more logs in the test to understand the issu…
lquerel Jan 17, 2025
636a279
feat(oltp-receiver): New attempt to fix the Windows issue.
lquerel Jan 17, 2025
e82750f
feat(oltp-receiver): Fix clippy issue
lquerel Jan 17, 2025
3d14bd5
feat(oltp-receiver): Expose otlp address in the parameters of the lis…
lquerel Jan 17, 2025
f8ce975
Merge branch 'main' into shift-left-round-1
lquerel Jan 17, 2025
File filter

Filter by extension

Filter by extension


Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
448 changes: 430 additions & 18 deletions Cargo.lock

Large diffs are not rendered by default.

13 changes: 13 additions & 0 deletions Cargo.toml
Original file line number Diff line number Diff line change
Expand Up @@ -85,6 +85,9 @@ rayon = "1.10.0"
ratatui = { version = "0.29.0", features=["serde"] }
crossterm = { version = "0.28.1", features = ["serde"] }
tui-textarea = "0.7.0"
tokio = { version = "1.43.0", features = ["rt", "net", "signal"]}
prost = "0.13.4"
tonic = { version = "0.12.3", default-features = false, features = ["server", "codegen", "prost", "tls", "tls-roots"] }

# workspace dependencies
serde.workspace = true
Expand All @@ -96,11 +99,21 @@ thiserror.workspace = true
miette.workspace = true
schemars.workspace = true
itertools.workspace = true
tracing = "0.1.41"
opentelemetry = "0.27.1"
log = "0.4.22"

[dev-dependencies]
weaver_diff = { path = "crates/weaver_diff" }
tempdir.workspace = true
ureq.workspace = true
assert_cmd = "2.0.16"
portpicker = "0.1.1"
#opentelemetry_sdk = { version = "0.27.1", features = [] }
#opentelemetry-otlp = { version ="0.27.0", features = ["metrics", "logs", "trace"] }

[build-dependencies]
tonic-build = { version = "0.12.3", features = ["cleanup-markdown"] }

[profile.release]
lto = true
Expand Down
28 changes: 28 additions & 0 deletions build.rs
Original file line number Diff line number Diff line change
@@ -0,0 +1,28 @@
// SPDX-License-Identifier: Apache-2.0

//! A build script to generate the gRPC OTLP receiver API (client and server stubs.

fn main() -> Result<(), Box<dyn std::error::Error>> {
// The gRPC OTLP Receiver is vendored in `src/otlp_receiver/receiver` to avoid
// depending on protoc in GitHub Actions.
//
// To regenerate the gRPC API from the proto file:
// - Uncomment the following lines.
// - Run `cargo build` to regenerate the API.
// - Comment the following lines.
// - Commit the changes.

// tonic_build::configure()
// // .build_client(false)
// .out_dir("src/registry/otlp/grpc_stubs")
// .compile_protos(
// &[
// "src/registry/otlp/proto/opentelemetry/proto/collector/logs/v1/logs_service.proto",
// "src/registry/otlp/proto/opentelemetry/proto/collector/metrics/v1/metrics_service.proto",
// "src/registry/otlp/proto/opentelemetry/proto/collector/trace/v1/trace_service.proto",
// ],
// &["src/registry/otlp/proto"],
// )?;

Ok(())
}
112 changes: 112 additions & 0 deletions src/registry/live_check.rs
Original file line number Diff line number Diff line change
@@ -0,0 +1,112 @@
// SPDX-License-Identifier: Apache-2.0

//! Check the gap between a semantic convention registry and an OTLP traffic.

use crate::registry::otlp::{listen_otlp_requests, OtlpRequest};
use crate::registry::{PolicyArgs, RegistryArgs};
use crate::util::prepare_main_registry;
use crate::{DiagnosticArgs, ExitDirectives};
use clap::Args;
use std::time::Duration;
use weaver_common::diagnostic::DiagnosticMessages;
use weaver_common::Logger;

/// Parameters for the `registry live-check` sub-command
#[derive(Debug, Args)]
pub struct CheckRegistryArgs {
/// Parameters to specify the semantic convention registry
#[command(flatten)]
registry: RegistryArgs,

/// Address used by the gRPC OTLP listener.
#[clap(long, default_value = "0.0.0.0")]
pub otlp_grpc_address: String,

/// Port used by the gRPC OTLP listener.
#[clap(long, default_value = "4317", short = 'p')]
pub otlp_grpc_port: u16,

/// Port used by the HTTP admin port (endpoints: /stop).
#[clap(long, default_value = "4320", short = 'a')]
pub admin_port: u16,

/// Max inactivity time in seconds before stopping the listener.
#[clap(long, default_value = "10", short = 't')]
pub inactivity_timeout: u64,

/// Parameters to specify the diagnostic format.
#[command(flatten)]
pub diagnostic: DiagnosticArgs,
}

/// Check the conformance level of an OTLP stream against a semantic convention registry.
///
/// This command starts an OTLP listener and compares each received OTLP message with the
/// registry provided as a parameter. When the command is stopped (see stop conditions),
/// a conformance/coverage report is generated. The purpose of this command is to be used
/// in a CI/CD pipeline to validate the telemetry stream from an application or service
/// against a registry.
///
/// The currently supported stop conditions are: CTRL+C (SIGINT), SIGHUP, the HTTP /stop
/// endpoint, and a maximum duration of no OTLP message reception.
pub(crate) fn command(
logger: impl Logger + Sync + Clone,
args: &CheckRegistryArgs,
) -> Result<ExitDirectives, DiagnosticMessages> {
let mut diag_msgs = DiagnosticMessages::empty();
let policy = PolicyArgs::skip();
let otlp_requests = listen_otlp_requests(
args.otlp_grpc_address.as_str(),
args.otlp_grpc_port,
args.admin_port,
Duration::from_secs(args.inactivity_timeout),
logger.clone(),
)?;

// @ToDo Use the following resolved registry to check the level of compliance of the incoming OTLP messages
let (_resolved_registry, _) =
prepare_main_registry(&args.registry, &policy, logger.clone(), &mut diag_msgs)?;

logger.loading(&format!(
"Checking OTLP traffic on port {}.",
args.otlp_grpc_port
));

// @ToDo Implement the checking logic
for otlp_request in otlp_requests {
match otlp_request {
OtlpRequest::Logs(_logs) => {
// ToDo Implement the checking logic for logs
println!("Logs Request received");
}
OtlpRequest::Metrics(_metrics) => {
// ToDo Implement the checking logic for metrics
println!("Metrics Request received");
}
OtlpRequest::Traces(_traces) => {
// ToDo Implement the checking logic for traces
println!("Trace Request received");
}
OtlpRequest::Stop(reason) => {
logger.warn(&format!("Stopping the listener, reason: {}", reason));
// ToDo Generate the report here
break;
}
OtlpRequest::Error(error) => {
diag_msgs.extend(DiagnosticMessages::from_error(error));
break;
}
}
}

if diag_msgs.has_error() {
return Err(diag_msgs);
}

logger.success("OTLP requests received and checked.");

Ok(ExitDirectives {
exit_code: 0,
quiet_mode: false,
})
}
31 changes: 31 additions & 0 deletions src/registry/mod.rs
Original file line number Diff line number Diff line change
Expand Up @@ -10,6 +10,7 @@ use serde::Serialize;

use crate::registry::generate::RegistryGenerateArgs;
use crate::registry::json_schema::RegistryJsonSchemaArgs;
use crate::registry::live_check::CheckRegistryArgs;
use crate::registry::resolve::RegistryResolveArgs;
use crate::registry::search::RegistrySearchArgs;
use crate::registry::stats::RegistryStatsArgs;
Expand All @@ -23,6 +24,8 @@ use weaver_common::Logger;
mod check;
mod generate;
mod json_schema;
mod live_check;
mod otlp;
mod resolve;
mod search;
mod stats;
Expand Down Expand Up @@ -101,6 +104,19 @@ pub enum RegistrySubCommand {
/// The produced JSON Schema can be used to generate documentation of the resolved registry format or to generate code in your language of choice if you need to interact with the resolved registry format for any reason.
#[clap(verbatim_doc_comment)]
JsonSchema(RegistryJsonSchemaArgs),

/// Check the conformance level of an OTLP stream against a semantic convention registry.
///
/// This command starts an OTLP listener and compares each received OTLP message with the
/// registry provided as a parameter. When the command is stopped (see stop conditions),
/// a conformance/coverage report is generated. The purpose of this command is to be used
/// in a CI/CD pipeline to validate the telemetry stream from an application or service
/// against a registry.
///
/// The currently supported stop conditions are: CTRL+C (SIGINT), SIGHUP, the HTTP /stop
/// endpoint, and a maximum duration of no OTLP message reception.
#[clap(verbatim_doc_comment)]
LiveCheck(CheckRegistryArgs),
}

/// Set of parameters used to specify a semantic convention registry.
Expand Down Expand Up @@ -140,6 +156,17 @@ pub struct PolicyArgs {
pub display_policy_coverage: bool,
}

impl PolicyArgs {
/// Create a new empty `PolicyArgs` with the skip flag set to true.
pub fn skip() -> Self {
Self {
policies: Vec::new(),
skip_policies: true,
display_policy_coverage: false,
}
}
}

/// Manage a semantic convention registry and return the exit code.
pub fn semconv_registry(log: impl Logger + Sync + Clone, command: &RegistryCommand) -> CmdResult {
match &command.command {
Expand Down Expand Up @@ -171,5 +198,9 @@ pub fn semconv_registry(log: impl Logger + Sync + Clone, command: &RegistryComma
json_schema::command(log.clone(), args),
Some(args.diagnostic.clone()),
),
RegistrySubCommand::LiveCheck(args) => CmdResult::new(
live_check::command(log.clone(), args),
Some(args.diagnostic.clone()),
),
}
}
Loading
Loading