This repository has been archived by the owner on Feb 8, 2024. It is now read-only.

Commit

collect kafka metrics
xvello committed Oct 27, 2023
1 parent c9e835e commit 3e0477e
Showing 2 changed files with 65 additions and 5 deletions.
1 change: 1 addition & 0 deletions capture-server/src/main.rs
@@ -13,6 +13,7 @@ struct Config {
     #[envconfig(default = "127.0.0.1:3000")]
     address: SocketAddr,
     redis_url: String,
+
     kafka_hosts: String,
     kafka_topic: String,
 }
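Note on configuration: the `#[envconfig(...)]` attribute above implies the struct derives Envconfig, so each field is read from an environment variable at startup. A minimal sketch of that wiring, assuming a recent envconfig crate; the variable values and helper name are illustrative only:

use envconfig::Envconfig;

fn load_config() -> anyhow::Result<Config> {
    // Without a `from` attribute, envconfig reads the upper-cased field
    // name, e.g.:
    //   KAFKA_HOSTS=localhost:9092 KAFKA_TOPIC=events ./capture-server
    Ok(Config::init_from_env()?)
}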
69 changes: 64 additions & 5 deletions capture/src/sink.rs
@@ -1,11 +1,12 @@
 use async_trait::async_trait;
-use metrics::{counter, histogram};
+use metrics::{absolute_counter, counter, gauge, histogram};
 use tokio::task::JoinSet;
 
 use crate::api::CaptureError;
 use rdkafka::config::ClientConfig;
 use rdkafka::error::RDKafkaErrorCode;
 use rdkafka::producer::future_producer::{FutureProducer, FutureRecord};
+use tracing::debug;
 
 use crate::event::ProcessedEvent;
 
@@ -39,25 +40,83 @@ impl EventSink for PrintSink {
     }
 }
 
+struct KafkaContext;
+
+impl rdkafka::ClientContext for KafkaContext {
+    fn stats(&self, stats: rdkafka::Statistics) {
+        debug!("Client stats: {:?}", &stats);
+
+        gauge!("capture_kafka_callback_queue_depth", stats.replyq as f64);
+        gauge!("capture_kafka_producer_queue_depth", stats.msg_cnt as f64);
+        gauge!(
+            "capture_kafka_producer_queue_depth_limit",
+            stats.msg_max as f64
+        );
+        gauge!("capture_kafka_producer_queue_bytes", stats.msg_size as f64);
+        gauge!(
+            "capture_kafka_producer_queue_bytes_limit",
+            stats.msg_size_max as f64
+        );
+
+        for (topic, stats) in stats.topics {
+            gauge!(
+                "capture_kafka_produce_avg_batch_size_bytes",
+                stats.batchsize.avg as f64,
+                "topic" => topic.clone()
+            );
+            gauge!(
+                "capture_kafka_produce_avg_batch_size_events",
+                stats.batchcnt.avg as f64,
+                "topic" => topic
+            );
+        }
+
+        for (_, stats) in stats.brokers {
+            let id_string = format!("{}", stats.nodeid);
+            gauge!(
+                "capture_kafka_broker_requests_pending",
+                stats.outbuf_cnt as f64,
+                "broker" => id_string.clone()
+            );
+            gauge!(
+                "capture_kafka_broker_responses_awaiting",
+                stats.waitresp_cnt as f64,
+                "broker" => id_string.clone()
+            );
+            absolute_counter!(
+                "capture_kafka_broker_tx_errors_total",
+                stats.txerrs,
+                "broker" => id_string.clone()
+            );
+            absolute_counter!(
+                "capture_kafka_broker_rx_errors_total",
+                stats.rxerrs,
+                "broker" => id_string
+            );
+        }
+    }
+}
+
 #[derive(Clone)]
 pub struct KafkaSink {
-    producer: FutureProducer,
+    producer: FutureProducer<KafkaContext>,
     topic: String,
 }
 
 impl KafkaSink {
     pub fn new(topic: String, brokers: String) -> anyhow::Result<KafkaSink> {
-        let producer: FutureProducer = ClientConfig::new()
+        let producer: FutureProducer<KafkaContext> = ClientConfig::new()
             .set("bootstrap.servers", &brokers)
-            .create()?;
+            .set("statistics.interval.ms", "10000") // emit a stats callback every 10s
+            .create_with_context(KafkaContext)?;
 
         Ok(KafkaSink { producer, topic })
     }
 }
 
 impl KafkaSink {
     async fn kafka_send(
-        producer: FutureProducer,
+        producer: FutureProducer<KafkaContext>,
         topic: String,
         event: ProcessedEvent,
     ) -> Result<(), CaptureError> {
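How the callback works: with statistics.interval.ms set, librdkafka serializes its internal counters to JSON on that interval, rdkafka deserializes the payload into rdkafka::Statistics, and invokes ClientContext::stats on one of the client's internal threads. The txerrs/rxerrs fields are cumulative totals, which is why they are reported via absolute_counter! rather than incremented with counter!. These metrics only surface if the process installs a metrics recorder, which this commit does not show; a minimal sketch assuming the metrics-exporter-prometheus crate is a dependency:

use metrics_exporter_prometheus::PrometheusBuilder;

fn install_metrics_exporter() {
    // Installs a global recorder plus an HTTP scrape endpoint, so the
    // gauge!/absolute_counter! calls above become visible to Prometheus.
    PrometheusBuilder::new()
        .install()
        .expect("failed to install Prometheus metrics exporter");
}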

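Putting the two files together, startup would build one KafkaSink from the new config fields. Because rdkafka producers are internally reference-counted, the derived Clone on KafkaSink shares a single client, and therefore a single stats callback, across all clones. Hypothetical wiring under the assumptions above, not part of this diff:

fn build_sink(config: &Config) -> anyhow::Result<KafkaSink> {
    // One producer per process; clones of the returned sink are cheap.
    KafkaSink::new(config.kafka_topic.clone(), config.kafka_hosts.clone())
}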