1use std::borrow::Cow;
4
5use opentelemetry::KeyValue;
6
7#[doc(hidden)]
9pub trait IsCollector: private::Sealed {
10 type Builder<'a>;
11
12 fn builder(meter: &opentelemetry::metrics::Meter, name: impl Into<Cow<'static, str>>) -> Self::Builder<'_>;
13}
14
15mod private {
16 pub trait Sealed {
17 type Value;
18 }
19}
20
21macro_rules! impl_collector {
22 ($t:ty, $value:ty, $func:ident, $builder:ty) => {
23 impl private::Sealed for $t {
24 type Value = $value;
25 }
26
27 impl IsCollector for $t {
28 type Builder<'a> = $builder;
29
30 fn builder(meter: &opentelemetry::metrics::Meter, name: impl Into<Cow<'static, str>>) -> Self::Builder<'_> {
31 meter.$func(name)
32 }
33 }
34 };
35}
36
37pub type Counter<T> = opentelemetry::metrics::Counter<T>;
41
42pub type CounterF64 = Counter<f64>;
46
47pub type CounterU64 = Counter<u64>;
51
52impl_collector!(
53 CounterF64,
54 f64,
55 f64_counter,
56 opentelemetry::metrics::InstrumentBuilder<'a, CounterF64>
57);
58impl_collector!(
59 CounterU64,
60 u64,
61 u64_counter,
62 opentelemetry::metrics::InstrumentBuilder<'a, CounterU64>
63);
64
65pub type Gauge<T> = opentelemetry::metrics::Gauge<T>;
70
71pub type GaugeF64 = Gauge<f64>;
77
78pub type GaugeI64 = Gauge<i64>;
84
85pub type GaugeU64 = Gauge<u64>;
91
92impl_collector!(
93 GaugeF64,
94 f64,
95 f64_gauge,
96 opentelemetry::metrics::InstrumentBuilder<'a, GaugeF64>
97);
98impl_collector!(
99 GaugeI64,
100 i64,
101 i64_gauge,
102 opentelemetry::metrics::InstrumentBuilder<'a, GaugeI64>
103);
104impl_collector!(
105 GaugeU64,
106 u64,
107 u64_gauge,
108 opentelemetry::metrics::InstrumentBuilder<'a, GaugeU64>
109);
110
111pub type Histogram<T> = opentelemetry::metrics::Histogram<T>;
115
116pub type HistogramF64 = Histogram<f64>;
120
121pub type HistogramU64 = Histogram<u64>;
125
126impl private::Sealed for HistogramF64 {
127 type Value = f64;
128}
129
130const DEFAULT_BOUNDARIES: [f64; 11] = [0.005, 0.01, 0.025, 0.05, 0.1, 0.25, 0.5, 1.0, 2.5, 5.0, 10.0];
132
133impl IsCollector for HistogramF64 {
134 type Builder<'a> = opentelemetry::metrics::HistogramBuilder<'a, HistogramF64>;
135
136 fn builder(meter: &opentelemetry::metrics::Meter, name: impl Into<Cow<'static, str>>) -> Self::Builder<'_> {
137 meter.f64_histogram(name).with_boundaries(DEFAULT_BOUNDARIES.into())
138 }
139}
140
141impl private::Sealed for HistogramU64 {
142 type Value = u64;
143}
144
145impl IsCollector for HistogramU64 {
146 type Builder<'a> = opentelemetry::metrics::HistogramBuilder<'a, HistogramU64>;
147
148 fn builder(meter: &opentelemetry::metrics::Meter, name: impl Into<Cow<'static, str>>) -> Self::Builder<'_> {
149 meter.u64_histogram(name).with_boundaries(DEFAULT_BOUNDARIES.into())
150 }
151}
152
153pub type UpDownCounter<T> = opentelemetry::metrics::UpDownCounter<T>;
158
159pub type UpDownCounterI64 = UpDownCounter<i64>;
163
164pub type UpDownCounterF64 = UpDownCounter<f64>;
168
169impl_collector!(
170 UpDownCounterI64,
171 i64,
172 i64_up_down_counter,
173 opentelemetry::metrics::InstrumentBuilder<'a, UpDownCounterI64>
174);
175impl_collector!(
176 UpDownCounterF64,
177 f64,
178 f64_up_down_counter,
179 opentelemetry::metrics::InstrumentBuilder<'a, UpDownCounterF64>
180);
181
182trait Number {
185 const ONE: Self;
186}
187
188impl Number for f64 {
189 const ONE: Self = 1.0;
190}
191
192impl Number for u64 {
193 const ONE: Self = 1;
194}
195
196impl Number for i64 {
197 const ONE: Self = 1;
198}
199
200#[must_use = "Collectors do nothing by themselves, you must call them"]
204pub struct Collector<'a, T: IsCollector> {
205 attributes: Vec<KeyValue>,
206 collector: &'a T,
207}
208
209impl<'a, T: IsCollector> Collector<'a, T> {
210 pub fn new(attributes: Vec<KeyValue>, collector: &'a T) -> Self {
215 Self { attributes, collector }
216 }
217
218 pub fn inner(&self) -> &'a T {
220 self.collector
221 }
222}
223
224macro_rules! impl_counter {
225 ($t:ty) => {
226 impl<'a> Collector<'a, opentelemetry::metrics::Counter<$t>> {
227 #[inline]
229 pub fn incr(&self) {
230 self.incr_by(<$t as Number>::ONE);
231 }
232
233 pub fn incr_by(&self, value: $t) {
235 self.collector.add(value, &self.attributes);
236 }
237 }
238 };
239}
240
241impl_counter!(u64);
242impl_counter!(f64);
243
244macro_rules! impl_gauge {
245 ($t:ty) => {
246 impl<'a> Collector<'a, opentelemetry::metrics::Gauge<$t>> {
247 pub fn record(&self, value: $t) {
249 self.collector.record(value, &self.attributes);
250 }
251 }
252 };
253}
254
255impl_gauge!(u64);
256impl_gauge!(f64);
257impl_gauge!(i64);
258
259macro_rules! impl_histogram {
260 ($t:ty) => {
261 impl<'a> Collector<'a, opentelemetry::metrics::Histogram<$t>> {
262 pub fn observe(&self, value: $t) {
264 self.collector.record(value, &self.attributes);
265 }
266 }
267 };
268}
269
270impl_histogram!(u64);
271impl_histogram!(f64);
272
273macro_rules! impl_updowncounter {
274 ($t:ty) => {
275 impl<'a> Collector<'a, opentelemetry::metrics::UpDownCounter<$t>> {
276 pub fn incr(&self) {
278 self.incr_by(<$t as Number>::ONE);
279 }
280
281 pub fn incr_by(&self, value: $t) {
283 self.collector.add(value, &self.attributes);
284 }
285
286 pub fn decr(&self) {
288 self.decr_by(<$t as Number>::ONE);
289 }
290
291 pub fn decr_by(&self, value: $t) {
293 self.collector.add(-value, &self.attributes);
294 }
295 }
296 };
297}
298
299impl_updowncounter!(i64);
300impl_updowncounter!(f64);
301
302#[cfg(test)]
303#[cfg_attr(all(test, coverage_nightly), coverage(off))]
304mod tests {
305 use std::sync::Arc;
306
307 use opentelemetry::{KeyValue, Value};
308 use opentelemetry_sdk::Resource;
309 use opentelemetry_sdk::metrics::data::{AggregatedMetrics, MetricData, ResourceMetrics};
310 use opentelemetry_sdk::metrics::reader::MetricReader;
311 use opentelemetry_sdk::metrics::{ManualReader, ManualReaderBuilder, SdkMeterProvider};
312
313 use crate::HistogramF64;
314 use crate::collector::{Collector, IsCollector};
315
316 #[derive(Debug, Clone)]
317 struct TestReader(Arc<ManualReader>);
318
319 impl TestReader {
320 fn new() -> Self {
321 Self(Arc::new(ManualReaderBuilder::new().build()))
322 }
323
324 fn read(&self) -> ResourceMetrics {
325 let mut metrics = ResourceMetrics::default();
326
327 self.0.collect(&mut metrics).expect("collect");
328
329 metrics
330 }
331 }
332
333 impl opentelemetry_sdk::metrics::reader::MetricReader for TestReader {
334 fn register_pipeline(&self, pipeline: std::sync::Weak<opentelemetry_sdk::metrics::Pipeline>) {
335 self.0.register_pipeline(pipeline)
336 }
337
338 fn collect(
339 &self,
340 rm: &mut opentelemetry_sdk::metrics::data::ResourceMetrics,
341 ) -> opentelemetry_sdk::error::OTelSdkResult {
342 self.0.collect(rm)
343 }
344
345 fn force_flush(&self) -> opentelemetry_sdk::error::OTelSdkResult {
346 self.0.force_flush()
347 }
348
349 fn shutdown_with_timeout(&self, timeout: std::time::Duration) -> opentelemetry_sdk::error::OTelSdkResult {
350 self.0.shutdown_with_timeout(timeout)
351 }
352
353 fn temporality(&self, kind: opentelemetry_sdk::metrics::InstrumentKind) -> opentelemetry_sdk::metrics::Temporality {
354 self.0.temporality(kind)
355 }
356 }
357
358 fn setup_reader() -> TestReader {
359 let reader = TestReader::new();
360 let provider = SdkMeterProvider::builder()
361 .with_resource(
362 Resource::builder()
363 .with_attribute(KeyValue::new("service.name", "test_service"))
364 .build(),
365 )
366 .with_reader(reader.clone())
367 .build();
368 opentelemetry::global::set_meter_provider(provider);
369 reader
370 }
371
372 fn find_metric<'a>(metrics: &'a ResourceMetrics, name: &str) -> Option<&'a opentelemetry_sdk::metrics::data::Metric> {
373 metrics
374 .scope_metrics()
375 .find(|sm| sm.scope().name() == "scuffle-metrics")
376 .and_then(|sm| sm.metrics().find(|m| m.name() == name))
377 }
378
379 fn get_data_point_value<'a, T: PartialEq + std::fmt::Debug + Copy + 'a>(
380 mut data_points: impl Iterator<Item = &'a opentelemetry_sdk::metrics::data::SumDataPoint<T>>,
381 attr_key: &str,
382 attr_value: &str,
383 ) -> T {
384 data_points
385 .find(|dp| {
386 dp.attributes()
387 .any(|kv| kv.key.as_str() == attr_key && kv.value.as_str() == attr_value)
388 })
389 .map(|dp| dp.value())
390 .expect("Data point not found")
391 }
392
393 fn get_histogram_sum<'a>(
394 mut data_points: impl Iterator<Item = &'a opentelemetry_sdk::metrics::data::HistogramDataPoint<u64>>,
395 attr_key: &str,
396 attr_value: &str,
397 ) -> u64 {
398 data_points
399 .find(|dp| {
400 dp.attributes()
401 .any(|kv| kv.key.as_str() == attr_key && kv.value.as_str() == attr_value)
402 })
403 .map(|dp| dp.sum())
404 .expect("Histogram data point not found")
405 }
406
407 fn get_data_point_value_with_two_attrs<'a, T: PartialEq + std::fmt::Debug + Copy + 'a>(
408 mut data_points: impl Iterator<Item = &'a opentelemetry_sdk::metrics::data::SumDataPoint<T>>,
409 key1: &str,
410 val1: &str,
411 key2: &str,
412 val2: impl Into<Value>,
413 ) -> T {
414 let val2 = val2.into();
415 data_points
416 .find(|dp| {
417 dp.attributes().any(|kv| kv.key.as_str() == key1 && kv.value.as_str() == val1)
418 && dp.attributes().any(|kv| kv.key.as_str() == key2 && kv.value == val2)
419 })
420 .map(|dp| dp.value())
421 .expect("Data point not found")
422 }
423
424 #[test]
425 fn test_counter_metric() {
426 #[crate::metrics(crate_path = "crate")]
427 mod example {
428 use crate::{CounterU64, MetricEnum};
429
430 #[derive(MetricEnum)]
431 #[metrics(crate_path = "crate")]
432 pub enum Kind {
433 Http,
434 Grpc,
435 }
436
437 #[metrics(unit = "requests")]
438 pub fn request(kind: Kind) -> CounterU64;
439 }
440
441 let reader = setup_reader();
442 example::request(example::Kind::Http).incr();
443 example::request(example::Kind::Http).incr();
444 example::request(example::Kind::Grpc).incr();
445
446 let metrics = reader.read();
447 let metric = find_metric(&metrics, "example_request").unwrap();
448 assert_eq!(metric.unit(), "requests");
449
450 let AggregatedMetrics::U64(MetricData::Sum(sum)) = metric.data() else {
451 unreachable!()
452 };
453 assert_eq!(sum.data_points().count(), 2);
454 assert_eq!(get_data_point_value(sum.data_points(), "kind", "Http"), 2);
455 assert_eq!(get_data_point_value(sum.data_points(), "kind", "Grpc"), 1);
456 }
457
458 #[test]
459 fn test_gauge_metric() {
460 #[crate::metrics(crate_path = "crate")]
461 mod example {
462 use crate::GaugeU64;
463
464 #[metrics(unit = "connections")]
465 pub fn current_connections() -> GaugeU64;
466 }
467
468 let reader = setup_reader();
469 example::current_connections().record(10);
470 example::current_connections().record(20);
471
472 let metrics = reader.read();
473 let metric = find_metric(&metrics, "example_current_connections").unwrap();
474 assert_eq!(metric.unit(), "connections");
475
476 let AggregatedMetrics::U64(MetricData::Gauge(gauge)) = metric.data() else {
477 unreachable!()
478 };
479 assert_eq!(gauge.data_points().count(), 1);
480 assert_eq!(gauge.data_points().next().unwrap().value(), 20);
481 assert_eq!(gauge.data_points().next().unwrap().attributes().count(), 0);
482 }
483
484 #[test]
485 fn test_histogram_metric() {
486 #[crate::metrics(crate_path = "crate")]
487 mod example {
488 use crate::{HistogramU64, MetricEnum};
489
490 #[derive(MetricEnum)]
491 #[metrics(crate_path = "crate")]
492 pub enum Kind {
493 Http,
494 Grpc,
495 }
496
497 #[metrics(unit = "bytes")]
498 pub fn data_transfer(kind: Kind) -> HistogramU64;
499 }
500
501 let reader = setup_reader();
502 example::data_transfer(example::Kind::Http).observe(100);
503 example::data_transfer(example::Kind::Http).observe(200);
504 example::data_transfer(example::Kind::Grpc).observe(150);
505
506 let metrics = reader.read();
507 let metric = find_metric(&metrics, "example_data_transfer").unwrap();
508 assert_eq!(metric.unit(), "bytes");
509
510 let AggregatedMetrics::U64(MetricData::Histogram(histogram)) = metric.data() else {
511 unreachable!()
512 };
513
514 assert_eq!(histogram.data_points().count(), 2);
515 assert_eq!(get_histogram_sum(histogram.data_points(), "kind", "Http"), 300);
516 assert_eq!(get_histogram_sum(histogram.data_points(), "kind", "Grpc"), 150);
517 }
518
519 #[test]
520 fn test_updowncounter_metric() {
521 #[crate::metrics(crate_path = "crate")]
522 mod example {
523 use crate::{MetricEnum, UpDownCounterI64};
524
525 #[derive(MetricEnum)]
526 #[metrics(crate_path = "crate")]
527 pub enum Kind {
528 Http,
529 Grpc,
530 }
531
532 #[metrics(unit = "requests")]
533 pub fn active_requests(kind: Kind) -> UpDownCounterI64;
534 }
535
536 let reader = setup_reader();
537 example::active_requests(example::Kind::Http).incr();
538 example::active_requests(example::Kind::Http).incr();
539 example::active_requests(example::Kind::Http).decr();
540 example::active_requests(example::Kind::Grpc).incr();
541
542 let metrics = reader.read();
543 let metric = find_metric(&metrics, "example_active_requests").unwrap();
544 assert_eq!(metric.unit(), "requests");
545
546 let AggregatedMetrics::I64(MetricData::Sum(sum)) = metric.data() else {
547 unreachable!()
548 };
549
550 assert_eq!(sum.data_points().count(), 2);
551 assert_eq!(get_data_point_value(sum.data_points(), "kind", "Http"), 1);
552 assert_eq!(get_data_point_value(sum.data_points(), "kind", "Grpc"), 1);
553 }
554
555 #[test]
556 fn test_metric_with_multiple_attributes() {
557 #[crate::metrics(crate_path = "crate")]
558 mod example {
559 use crate::{CounterU64, MetricEnum};
560
561 #[derive(MetricEnum)]
562 #[metrics(crate_path = "crate")]
563 pub enum Kind {
564 Http,
565 Grpc,
566 }
567
568 #[metrics(unit = "requests")]
569 pub fn request_with_status(kind: Kind, status: u32) -> CounterU64;
570 }
571
572 let reader = setup_reader();
573 example::request_with_status(example::Kind::Http, 200).incr();
574 example::request_with_status(example::Kind::Http, 404).incr();
575 example::request_with_status(example::Kind::Grpc, 200).incr();
576
577 let metrics = reader.read();
578 let metric = find_metric(&metrics, "example_request_with_status").unwrap();
579 assert_eq!(metric.unit(), "requests");
580
581 let AggregatedMetrics::U64(MetricData::Sum(sum)) = metric.data() else {
582 unreachable!()
583 };
584 assert_eq!(sum.data_points().count(), 3);
585 assert_eq!(
586 get_data_point_value_with_two_attrs(sum.data_points(), "kind", "Http", "status", 200),
587 1
588 );
589 assert_eq!(
590 get_data_point_value_with_two_attrs(sum.data_points(), "kind", "Http", "status", 404),
591 1
592 );
593 assert_eq!(
594 get_data_point_value_with_two_attrs(sum.data_points(), "kind", "Grpc", "status", 200),
595 1
596 );
597 }
598
599 #[test]
600 fn test_metric_with_string_attribute() {
601 #[crate::metrics(crate_path = "crate")]
602 mod example {
603 use crate::{CounterU64, MetricEnum};
604
605 #[derive(MetricEnum)]
606 #[metrics(crate_path = "crate")]
607 pub enum Kind {
608 Http,
609 Grpc,
610 }
611
612 #[metrics(unit = "requests")]
613 pub fn request_with_method(kind: Kind, method: &str) -> CounterU64;
614 }
615
616 let reader = setup_reader();
617 example::request_with_method(example::Kind::Http, "GET").incr();
618 example::request_with_method(example::Kind::Http, "POST").incr();
619 example::request_with_method(example::Kind::Grpc, "GET").incr();
620
621 let metrics = reader.read();
622 let metric = find_metric(&metrics, "example_request_with_method").unwrap();
623 assert_eq!(metric.unit(), "requests");
624
625 let AggregatedMetrics::U64(MetricData::Sum(sum)) = metric.data() else {
626 unreachable!()
627 };
628 assert_eq!(sum.data_points().count(), 3);
629 assert_eq!(
630 get_data_point_value_with_two_attrs(sum.data_points(), "kind", "Http", "method", "GET"),
631 1
632 );
633 assert_eq!(
634 get_data_point_value_with_two_attrs(sum.data_points(), "kind", "Http", "method", "POST"),
635 1
636 );
637 assert_eq!(
638 get_data_point_value_with_two_attrs(sum.data_points(), "kind", "Grpc", "method", "GET"),
639 1
640 );
641 }
642
643 #[test]
644 fn test_metric_with_no_attributes() {
645 #[crate::metrics(crate_path = "crate")]
646 mod example {
647 use crate::CounterU64;
648
649 #[metrics(unit = "events")]
650 pub fn total_events() -> CounterU64;
651 }
652
653 let reader = setup_reader();
654 example::total_events().incr();
655 example::total_events().incr();
656
657 let metrics = reader.read();
658 let metric = find_metric(&metrics, "example_total_events").unwrap();
659 assert_eq!(metric.unit(), "events");
660
661 let AggregatedMetrics::U64(MetricData::Sum(sum)) = metric.data() else {
662 unreachable!()
663 };
664 assert_eq!(sum.data_points().count(), 1);
665 assert_eq!(sum.data_points().next().unwrap().value(), 2);
666 assert_eq!(sum.data_points().next().unwrap().attributes().count(), 0);
667 }
668
669 #[test]
670 fn test_metric_with_zero_values() {
671 #[crate::metrics(crate_path = "crate")]
672 mod example {
673 use crate::GaugeU64;
674
675 #[metrics(unit = "connections")]
676 pub fn current_connections() -> GaugeU64;
677 }
678
679 let reader = setup_reader();
680 example::current_connections().record(0);
681
682 let metrics = reader.read();
683 let metric = find_metric(&metrics, "example_current_connections").unwrap();
684 assert_eq!(metric.unit(), "connections");
685
686 let AggregatedMetrics::U64(MetricData::Gauge(gauge)) = metric.data() else {
687 unreachable!()
688 };
689 assert_eq!(gauge.data_points().count(), 1);
690 assert_eq!(gauge.data_points().next().unwrap().value(), 0);
691 assert_eq!(gauge.data_points().next().unwrap().attributes().count(), 0);
692 }
693
694 #[test]
695 fn test_metric_with_negative_increments() {
696 #[crate::metrics(crate_path = "crate")]
697 mod example {
698 use crate::{MetricEnum, UpDownCounterI64};
699
700 #[derive(MetricEnum)]
701 #[metrics(crate_path = "crate")]
702 pub enum Kind {
703 Http,
704 Grpc,
705 }
706
707 #[metrics(unit = "requests")]
708 pub fn active_requests(kind: Kind) -> UpDownCounterI64;
709 }
710
711 let reader = setup_reader();
712 example::active_requests(example::Kind::Http).incr();
713 example::active_requests(example::Kind::Http).decr();
714 example::active_requests(example::Kind::Http).decr();
715
716 let metrics = reader.read();
717 let metric = find_metric(&metrics, "example_active_requests").unwrap();
718 assert_eq!(metric.unit(), "requests");
719
720 let AggregatedMetrics::I64(MetricData::Sum(sum)) = metric.data() else {
721 unreachable!()
722 };
723 assert_eq!(sum.data_points().count(), 1);
724 assert_eq!(get_data_point_value(sum.data_points(), "kind", "Http"), -1);
725 }
726
727 #[test]
728 fn test_histogram_f64_builder() {
729 let reader = setup_reader();
730 let meter = opentelemetry::global::meter("scuffle-metrics");
731 let name = "test_histogram_f64";
732
733 let builder = HistogramF64::builder(&meter, name);
734 let histogram = builder.build();
735
736 histogram.record(1.5, &[]);
737
738 let metrics = reader.read();
739 let metric = find_metric(&metrics, name).expect("histogram metric not found");
740
741 assert_eq!(metric.name(), name);
742 assert_eq!(metric.unit(), "");
743
744 let AggregatedMetrics::F64(MetricData::Histogram(histogram_data)) = metric.data() else {
745 unreachable!()
746 };
747
748 assert_eq!(histogram_data.data_points().count(), 1);
749 assert_eq!(histogram_data.data_points().next().unwrap().sum(), 1.5);
750 assert_eq!(histogram_data.data_points().next().unwrap().attributes().count(), 0);
751 }
752
753 #[test]
754 fn test_collector_inner() {
755 let meter = opentelemetry::global::meter("test_meter");
756 let histogram = HistogramF64::builder(&meter, "inner_test_histogram").build();
757
758 let attributes = vec![KeyValue::new("key", "value")];
759 let collector = Collector::new(attributes.clone(), &histogram);
760
761 assert_eq!(collector.inner() as *const HistogramF64, &histogram as *const HistogramF64);
762 }
763}