diff --git a/streamd/http.go b/streamd/http.go index 75e04f7..8da13d2 100644 --- a/streamd/http.go +++ b/streamd/http.go @@ -11,155 +11,177 @@ type httpServer struct { daemonController } -// Minimalist prometheus exporter -func (h *httpServer) metrics(w http.ResponseWriter, r *http.Request) { - m := h.metricsSnapshot() +func writeSRTStatsMeta(w http.ResponseWriter) { + fmt.Fprintln(w, "# HELP srt_callers Current number of subscribers to the SRT stream") + fmt.Fprintln(w, "# TYPE srt_callers gauge") - /* CPU */ + fmt.Fprintln(w, "# HELP srt_send_bytes_total Total bytes sent across all callers") + fmt.Fprintln(w, "# TYPE srt_send_bytes_total counter") - cpuTime := m.cpu.Time.UnixMilli() - fmt.Fprintf(w, "# HELP linux_proc_user Time spent in user mode, in ticks\n") - fmt.Fprintf(w, "# TYPE linux_proc_user gauge\n") - fmt.Fprintf(w, "linux_proc_user %d %d\n", m.cpu.User, cpuTime) + fmt.Fprintln(w, "# HELP srt_send_rate Send rate in Mbps") + fmt.Fprintln(w, "# TYPE srt_send_rate gauge") - fmt.Fprintf(w, "# HELP linux_proc_system Time spent in system mode, in ticks\n") - fmt.Fprintf(w, "# TYPE linux_proc_system gauge\n") - fmt.Fprintf(w, "linux_proc_system %d %d\n", m.cpu.System, cpuTime) + fmt.Fprintln(w, "# HELP srt_bandwidth Bandwidth in Mbps") + fmt.Fprintln(w, "# TYPE srt_bandwidth gauge") - fmt.Fprintf(w, "# HELP linux_proc_iowait Time spent waiting for I/O to complete, in ticks\n") - fmt.Fprintf(w, "# TYPE linux_proc_iowait gauge\n") - fmt.Fprintf(w, "linux_proc_iowait %d %d\n", m.cpu.Iowait, cpuTime) + fmt.Fprintln(w, "# HELP srt_rtt_seconds RTT in s") + fmt.Fprintln(w, "# TYPE srt_rtt_seconds gauge") - fmt.Fprintf(w, "# HELP linux_proc_irq Time spent servicing interrupts, in ticks\n") - fmt.Fprintf(w, "# TYPE linux_proc_irq gauge\n") - fmt.Fprintf(w, "linux_proc_irq %d %d\n", m.cpu.Irq, cpuTime) + fmt.Fprintln(w, "# HELP srt_negotiated_latency_seconds Negotiated latency in s") + fmt.Fprintln(w, "# TYPE srt_negotiated_latency_seconds gauge") - fmt.Fprintf(w, "# HELP linux_proc_softirq Time spent servicing soft interrupts, in ticks\n") - fmt.Fprintf(w, "# TYPE linux_proc_softirq gauge\n") - fmt.Fprintf(w, "linux_proc_softirq %d %d\n", m.cpu.SoftIrq, cpuTime) + fmt.Fprintln(w, "# HELP srt_sent_bytes_total Total bytes sent") + fmt.Fprintln(w, "# TYPE srt_sent_bytes_total counter") - /* Memory */ + fmt.Fprintln(w, "# HELP srt_retransmitted_bytes_total Total bytes retransmitted") + fmt.Fprintln(w, "# TYPE srt_retransmitted_bytes_total counter") - memTime := m.mem.Time.UnixMilli() - fmt.Fprintf(w, "# HELP linux_mem_used Amount of memory used, in kB\n") - fmt.Fprintf(w, "# TYPE linux_mem_used gauge\n") - fmt.Fprintf(w, "linux_mem_used %d %d\n", m.mem.MemUsed, memTime) + fmt.Fprintln(w, "# HELP srt_sent_dropped_bytes_total Total bytes retransmitted") + fmt.Fprintln(w, "# TYPE srt_sent_dropped_bytes_total counter") - fmt.Fprintf(w, "# HELP linux_mem_free Amount of free memory, in kB\n") - fmt.Fprintf(w, "# TYPE linux_mem_free gauge\n") - fmt.Fprintf(w, "linux_mem_free %d %d\n", m.mem.MemFree, memTime) + fmt.Fprintln(w, "# HELP srt_packets_sent_total Total packets sent") + fmt.Fprintln(w, "# TYPE srt_packets_sent_total counter") - /* TODO(hugo) I/O Status metrics via `iostat` when recording is implemented */ + fmt.Fprintln(w, "# HELP srt_packets_sent_lost_total Total packets lost") + fmt.Fprintln(w, "# TYPE srt_packets_sent_lost_total counter") - /* Load Average */ - - // The timestamp is an int64 (milliseconds since epoch, i.e. 1970-01-01 - // 00:00:00 UTC, excluding leap seconds), represented as required by Go's - // ParseInt() function. - loadAvgTime := m.loadAvg.Time.UnixMilli() - fmt.Fprintf(w, "# HELP load_avg_one Load average over one minute\n") - fmt.Fprintf(w, "# TYPE load_avg_one gauge\n") - fmt.Fprintf(w, "load_avg_one %f %d\n", m.loadAvg.One, loadAvgTime) + fmt.Fprintln(w, "# HELP srt_packets_sent_dropped_total Total packets dropped") + fmt.Fprintln(w, "# TYPE srt_packets_sent_dropped_total counter") - fmt.Fprintf(w, "# HELP load_avg_five Load average over five minutes\n") - fmt.Fprintf(w, "# TYPE load_avg_five gauge\n") - fmt.Fprintf(w, "load_avg_five %f %d\n", m.loadAvg.Five, loadAvgTime) + fmt.Fprintln(w, "# HELP srt_packets_retransmitted_total Total packets retransmitted") + fmt.Fprintln(w, "# TYPE srt_packets_retransmitted_total counter") - fmt.Fprintf(w, "# HELP load_avg_fifteen Load average over fifteen minutes\n") - fmt.Fprintf(w, "# TYPE load_avg_fifteen gauge\n") - fmt.Fprintf(w, "load_avg_fifteen %f %d\n", m.loadAvg.Fifteen, loadAvgTime) + fmt.Fprintln(w, "# HELP srt_packets_ack_received_total Number of acks received") + fmt.Fprintln(w, "# TYPE srt_packets_ack_received_total counter") - /* SRT Statistics */ + fmt.Fprintln(w, "# HELP srt_packets_nack_received_total Number of nacks received") + fmt.Fprintln(w, "# TYPE srt_packets_nack_received_total counter") +} - srtTime := m.compSinkStats.time.UnixMilli() - fmt.Fprintf(w, "# HELP srt_callers Current number of subscribers to the SRT stream\n") - fmt.Fprintf(w, "# TYPE srt_callers gauge\n") - fmt.Fprintf(w, "srt_callers %d %d\n", len(m.compSinkStats.callers), srtTime) +func writeSRTStats(w http.ResponseWriter, s *srtStats, sink string) { + srtTime := s.time.UnixMilli() + fmt.Fprintf(w, "srt_callers{sink=\"%s\"} %d %d\n", sink, len(s.callers), srtTime) // Total bytes sent - fmt.Fprintf(w, "# HELP srt_bytes_send_total Total bytes sent across all callers\n") - fmt.Fprintf(w, "# TYPE srt_bytes_send_total counter\n") - fmt.Fprintf(w, "srt_bytes_send_total %d %d\n", m.compSinkStats.bytesSendTotal, srtTime) + fmt.Fprintf(w, "srt_send_bytes_total{sink=\"%s\"} %d %d\n", sink, s.bytesSendTotal, srtTime) // Send rate per caller - // TODO: caller should be identified by IP, but that field is NULL in the srt stats structure - for i, caller := range m.compSinkStats.callers { - common := fmt.Sprintf("caller=\"%d\"", i) + for _, caller := range s.callers { + common := fmt.Sprintf("address=\"%s\", port=\"%d\", sink=\"%s\"", caller.callerAddress.String(), caller.callerPort, sink) // Send Rate - fmt.Fprintf(w, "# HELP srt_send_rate Send rate in Mbps\n") - fmt.Fprintf(w, "# TYPE srt_send_rate gauge\n") fmt.Fprintf(w, "srt_send_rate{%s} %f %d\n", common, caller.sendRateMbps, srtTime) // Bandwidth - fmt.Fprintf(w, "# HELP srt_bandwidth Bandwidth in Mbps\n") - fmt.Fprintf(w, "# TYPE srt_bandwidth gauge\n") fmt.Fprintf(w, "srt_bandwidth{%s} %f %d\n", common, caller.bandwidthMbps, srtTime) // Round-trip time (RTT) - fmt.Fprintf(w, "# HELP srt_rtt RTT in ms\n") - fmt.Fprintf(w, "# TYPE srt_rtt gauge\n") - fmt.Fprintf(w, "srt_rtt{%s} %f %d\n", common, caller.rttMS, srtTime) + fmt.Fprintf(w, "srt_rtt_seconds{%s} %f %d\n", common, caller.rttMS/1000, srtTime) // Negotiated Latency - fmt.Fprintf(w, "# HELP srt_negotiated_latency Negotiated latency in ms\n") - fmt.Fprintf(w, "# TYPE srt_negotiated_latency gauge\n") - fmt.Fprintf(w, "srt_negotiated_latency{%s} %d %d\n", common, caller.negotiatedLatencyMS, srtTime) + fmt.Fprintf(w, "srt_negotiated_latency_seconds{%s} %d %d\n", common, caller.negotiatedLatencyMS/1000, srtTime) // Bytes sent - fmt.Fprintf(w, "# HELP srt_bytes_sent Total bytes sent\n") - fmt.Fprintf(w, "# TYPE srt_bytes_sent gauge\n") - fmt.Fprintf(w, "srt_bytes_sent{%s} %d %d\n", common, caller.bytesSent, srtTime) + fmt.Fprintf(w, "srt_sent_bytes_total{%s} %d %d\n", common, caller.bytesSent, srtTime) // Bytes Retransmitted - fmt.Fprintf(w, "# HELP srt_bytes_retransmitted Total bytes retransmitted\n") - fmt.Fprintf(w, "# TYPE srt_bytes_retransmitted gauge\n") - fmt.Fprintf(w, "srt_bytes_retransmitted{%s} %d %d\n", common, caller.bytesRetransmitted, srtTime) + fmt.Fprintf(w, "srt_retransmitted_bytes_total{%s} %d %d\n", common, caller.bytesRetransmitted, srtTime) // Bytes Sent Dropped - fmt.Fprintf(w, "# HELP srt_bytes_send_dropped Total bytes retransmitted\n") - fmt.Fprintf(w, "# TYPE srt_bytes_send_dropped gauge\n") - fmt.Fprintf(w, "srt_bytes_send_dropped{%s} %d %d\n", common, caller.bytesSentDropped, srtTime) + fmt.Fprintf(w, "srt_sent_dropped_bytes_total{%s} %d %d\n", common, caller.bytesSentDropped, srtTime) // Packets sent - fmt.Fprintf(w, "# HELP srt_packets_sent Total packets sent\n") - fmt.Fprintf(w, "# TYPE srt_packets_sent gauge\n") - fmt.Fprintf(w, "srt_packets_sent{%s} %d %d\n", common, caller.packetsSent, srtTime) + fmt.Fprintf(w, "srt_packets_sent_total{%s} %d %d\n", common, caller.packetsSent, srtTime) // Packets Sent Lost - fmt.Fprintf(w, "# HELP srt_packets_sent_lost Total packets lost\n") - fmt.Fprintf(w, "# TYPE srt_packets_sent_lost gauge\n") - fmt.Fprintf(w, "srt_packets_sent_lost{%s} %d %d\n", common, caller.packetsSentLost, srtTime) + fmt.Fprintf(w, "srt_packets_sent_lost_total{%s} %d %d\n", common, caller.packetsSentLost, srtTime) // Packets Sent Dropped - fmt.Fprintf(w, "# HELP srt_packets_sent_dropped Total packets dropped\n") - fmt.Fprintf(w, "# TYPE srt_packets_sent_dropped gauge\n") - fmt.Fprintf(w, "srt_packets_sent_dropped{%s} %d %d\n", common, caller.packetsSentDropped, srtTime) + fmt.Fprintf(w, "srt_packets_sent_dropped_total{%s} %d %d\n", common, caller.packetsSentDropped, srtTime) // Packets Retransmitted - fmt.Fprintf(w, "# HELP srt_packets_retransmitted Total packets retransmitted\n") - fmt.Fprintf(w, "# TYPE srt_packets_retransmitted gauge\n") - fmt.Fprintf(w, "srt_packets_retransmitted{%s} %d %d\n", common, caller.packetsRetransmitted, srtTime) + fmt.Fprintf(w, "srt_packets_retransmitted_total{%s} %d %d\n", common, caller.packetsRetransmitted, srtTime) // Packets Ack Received - fmt.Fprintf(w, "# HELP srt_packets_ack_received Number of acks received\n") - fmt.Fprintf(w, "# TYPE srt_packets_ack_received gauge\n") - fmt.Fprintf(w, "srt_packets_ack_received{%s} %d %d\n", common, caller.packetAckReceived, srtTime) + fmt.Fprintf(w, "srt_packets_ack_received_total{%s} %d %d\n", common, caller.packetAckReceived, srtTime) // Packets Nack Received - fmt.Fprintf(w, "# HELP srt_packets_nack_received Number of nacks received\n") - fmt.Fprintf(w, "# TYPE srt_packets_nack_received gauge\n") - fmt.Fprintf(w, "srt_packets_nack_received{%s} %d %d\n", common, caller.packetNackReceived, srtTime) - - // TODO(hugo): Add receive metrics from 'srtCallerStats'? + fmt.Fprintf(w, "srt_packets_nack_received_total{%s} %d %d\n", common, caller.packetNackReceived, srtTime) } +} + +// Minimalist prometheus exporter +func (h *httpServer) metrics(w http.ResponseWriter, r *http.Request) { + m := h.metricsSnapshot() + + /* CPU */ + + cpuTime := m.cpu.Time.UnixMilli() + fmt.Fprintf(w, "# HELP linux_proc_user_total Time spent in user mode, in ticks\n") + fmt.Fprintf(w, "# TYPE linux_proc_user_total counter\n") + fmt.Fprintf(w, "linux_proc_user_total %d %d\n", m.cpu.User, cpuTime) + + fmt.Fprintf(w, "# HELP linux_proc_system_total Time spent in system mode, in ticks\n") + fmt.Fprintf(w, "# TYPE linux_proc_system_total counter\n") + fmt.Fprintf(w, "linux_proc_system_total %d %d\n", m.cpu.System, cpuTime) + + fmt.Fprintf(w, "# HELP linux_proc_iowait_total Time spent waiting for I/O to complete, in ticks\n") + fmt.Fprintf(w, "# TYPE linux_proc_iowait_total counter\n") + fmt.Fprintf(w, "linux_proc_iowait_total %d %d\n", m.cpu.Iowait, cpuTime) + + fmt.Fprintf(w, "# HELP linux_proc_irq_total Time spent servicing interrupts, in ticks\n") + fmt.Fprintf(w, "# TYPE linux_proc_irq_total counter\n") + fmt.Fprintf(w, "linux_proc_irq_total %d %d\n", m.cpu.Irq, cpuTime) + + fmt.Fprintf(w, "# HELP linux_proc_softirq_total Time spent servicing soft interrupts, in ticks\n") + fmt.Fprintf(w, "# TYPE linux_proc_softirq_total counter\n") + fmt.Fprintf(w, "linux_proc_softirq_total %d %d\n", m.cpu.SoftIrq, cpuTime) + + /* Memory */ + + memTime := m.mem.Time.UnixMilli() + fmt.Fprintf(w, "# HELP linux_mem_used_bytes Amount of memory used, in bytes\n") + fmt.Fprintf(w, "# TYPE linux_mem_used_bytes gauge\n") + fmt.Fprintf(w, "linux_mem_used_bytes %d %d\n", m.mem.MemUsed*1024, memTime) + + fmt.Fprintf(w, "# HELP linux_mem_free_bytes Amount of free memory, in bytes\n") + fmt.Fprintf(w, "# TYPE linux_mem_free_bytes gauge\n") + fmt.Fprintf(w, "linux_mem_free_bytes %d %d\n", m.mem.MemFree*1024, memTime) + + /* TODO(hugo) I/O Status metrics via `iostat` when recording is implemented */ + + /* Load Average */ + + // The timestamp is an int64 (milliseconds since epoch, i.e. 1970-01-01 + // 00:00:00 UTC, excluding leap seconds), represented as required by Go's + // ParseInt() function. + loadAvgTime := m.loadAvg.Time.UnixMilli() + fmt.Fprintf(w, "# HELP load_avg_one Load average over one minute\n") + fmt.Fprintf(w, "# TYPE load_avg_one gauge\n") + fmt.Fprintf(w, "load_avg_one %f %d\n", m.loadAvg.One, loadAvgTime) + + fmt.Fprintf(w, "# HELP load_avg_five Load average over five minutes\n") + fmt.Fprintf(w, "# TYPE load_avg_five gauge\n") + fmt.Fprintf(w, "load_avg_five %f %d\n", m.loadAvg.Five, loadAvgTime) + + fmt.Fprintf(w, "# HELP load_avg_fifteen Load average over fifteen minutes\n") + fmt.Fprintf(w, "# TYPE load_avg_fifteen gauge\n") + fmt.Fprintf(w, "load_avg_fifteen %f %d\n", m.loadAvg.Fifteen, loadAvgTime) + + /* SRT Statistics */ + + writeSRTStatsMeta(w) + writeSRTStats(w, &m.compSinkStats, "combined") + writeSRTStats(w, &m.presentSinkStats, "present") + writeSRTStats(w, &m.camSinkStats, "camera") + /* GStreamer Statistics */ for k, v := range m.pipelineStats.qosEvents { - fmt.Fprintf(w, "# HELP gst_qos_events Number of qos events\n") - fmt.Fprintf(w, "# TYPE gst_qos_events gauge\n") - fmt.Fprintf(w, "gst_qos_events{source=\"%s\"} %d\n", k, v) + fmt.Fprintf(w, "# HELP gst_qos_events_total Number of qos events\n") + fmt.Fprintf(w, "# TYPE gst_qos_events_total gauge\n") + fmt.Fprintf(w, "gst_qos_events_total{source=\"%s\"} %d\n", k, v) } } diff --git a/streamd/metrics.go b/streamd/metrics.go index 27c0ad6..666703c 100644 --- a/streamd/metrics.go +++ b/streamd/metrics.go @@ -9,11 +9,13 @@ import ( ) type metrics struct { - compSinkStats srtStats - pipelineStats pipelineStats // Updated by bus watch on main thread - cpu systemstat.CPUSample - mem systemstat.MemSample - loadAvg systemstat.LoadAvgSample + compSinkStats srtStats + presentSinkStats srtStats + camSinkStats srtStats + pipelineStats pipelineStats // Updated by bus watch on main thread + cpu systemstat.CPUSample + mem systemstat.MemSample + loadAvg systemstat.LoadAvgSample } func (d *daemon) metricsProcess(ctx context.Context) { @@ -26,6 +28,7 @@ func (d *daemon) metricsProcess(ctx context.Context) { mem := systemstat.GetMemSample() loadAvg := systemstat.GetLoadAvgSample() + // []*srtStats{combStats, presentStats, camStats} srtStats, err := d.srtStatistics() if err != nil { klog.Warningf("failed to retrieve statistics from srtsinks: %v", err) @@ -33,12 +36,16 @@ func (d *daemon) metricsProcess(ctx context.Context) { } srtCompStats := srtStats[0] + srtPresentStats := srtStats[1] + srtCamStats := srtStats[2] d.mu.Lock() d.metrics.cpu = cpu d.metrics.mem = mem d.metrics.loadAvg = loadAvg d.metrics.compSinkStats = *srtCompStats + d.metrics.presentSinkStats = *srtPresentStats + d.metrics.camSinkStats = *srtCamStats d.mu.Unlock() time.Sleep(time.Second * 1)