1use std::borrow::Cow;
2use std::sync::Arc;
3
4use opentelemetry::{InstrumentationScope, KeyValue};
5use opentelemetry_sdk::Resource;
6use opentelemetry_sdk::metrics::data::{AggregatedMetrics, Metric, MetricData, ResourceMetrics};
7use opentelemetry_sdk::metrics::reader::MetricReader;
8use opentelemetry_sdk::metrics::{ManualReader, ManualReaderBuilder};
9use prometheus_client::encoding::{EncodeCounterValue, EncodeGaugeValue, NoLabelSet};
10use prometheus_client::metrics::MetricType;
11use prometheus_client::registry::Unit;
12
13#[derive(Debug, Clone)]
27pub struct PrometheusExporter {
28 reader: Arc<ManualReader>,
29 prometheus_full_utf8: bool,
30}
31
32impl PrometheusExporter {
33 pub fn builder() -> PrometheusExporterBuilder {
35 PrometheusExporterBuilder::default()
36 }
37
38 pub fn collector(&self) -> Box<dyn prometheus_client::collector::Collector> {
41 Box::new(self.clone())
42 }
43}
44
45impl MetricReader for PrometheusExporter {
46 fn register_pipeline(&self, pipeline: std::sync::Weak<opentelemetry_sdk::metrics::Pipeline>) {
47 self.reader.register_pipeline(pipeline)
48 }
49
50 fn collect(
51 &self,
52 rm: &mut opentelemetry_sdk::metrics::data::ResourceMetrics,
53 ) -> opentelemetry_sdk::error::OTelSdkResult {
54 self.reader.collect(rm)
55 }
56
57 fn force_flush(&self) -> opentelemetry_sdk::error::OTelSdkResult {
58 self.reader.force_flush()
59 }
60
61 fn shutdown_with_timeout(&self, timeout: std::time::Duration) -> opentelemetry_sdk::error::OTelSdkResult {
62 self.reader.shutdown_with_timeout(timeout)
63 }
64
65 fn temporality(&self, kind: opentelemetry_sdk::metrics::InstrumentKind) -> opentelemetry_sdk::metrics::Temporality {
66 self.reader.temporality(kind)
67 }
68}
69
70#[derive(Default)]
72pub struct PrometheusExporterBuilder {
73 reader: ManualReaderBuilder,
74 prometheus_full_utf8: bool,
75}
76
77impl PrometheusExporterBuilder {
78 pub fn with_temporality(mut self, temporality: opentelemetry_sdk::metrics::Temporality) -> Self {
80 self.reader = self.reader.with_temporality(temporality);
81 self
82 }
83
84 pub fn with_prometheus_full_utf8(mut self, prometheus_full_utf8: bool) -> Self {
89 self.prometheus_full_utf8 = prometheus_full_utf8;
90 self
91 }
92
93 pub fn build(self) -> PrometheusExporter {
95 PrometheusExporter {
96 reader: Arc::new(self.reader.build()),
97 prometheus_full_utf8: self.prometheus_full_utf8,
98 }
99 }
100}
101
102pub fn exporter() -> PrometheusExporterBuilder {
104 PrometheusExporter::builder()
105}
106
107trait EncodeNumber {
108 fn into_gauge(self) -> impl EncodeGaugeValue;
109 fn into_counter(self) -> impl EncodeCounterValue;
110 fn to_f64(self) -> f64;
111}
112
113impl EncodeNumber for f64 {
114 fn into_gauge(self) -> impl EncodeGaugeValue {
115 self
116 }
117
118 fn into_counter(self) -> impl EncodeCounterValue {
119 self
120 }
121
122 fn to_f64(self) -> f64 {
123 self
124 }
125}
126
127impl EncodeNumber for i64 {
128 fn into_gauge(self) -> impl EncodeGaugeValue {
129 self
130 }
131
132 fn into_counter(self) -> impl EncodeCounterValue {
133 self.max(0) as u64
134 }
135
136 fn to_f64(self) -> f64 {
137 self as f64
138 }
139}
140
141impl EncodeNumber for u64 {
142 fn into_gauge(self) -> impl EncodeGaugeValue {
143 self
144 }
145
146 fn into_counter(self) -> impl EncodeCounterValue {
147 self
148 }
149
150 fn to_f64(self) -> f64 {
151 self as f64
152 }
153}
154
155fn encode_aggregated_metrics(
156 encoder: &mut prometheus_client::encoding::DescriptorEncoder,
157 metric: &Metric,
158 labels: KeyValueEncoder,
159) -> Result<(), std::fmt::Error> {
160 fn encode_metric_data<T: EncodeNumber + Copy>(
161 encoder: &mut prometheus_client::encoding::DescriptorEncoder,
162 metric: &Metric,
163 metric_data: &MetricData<T>,
164 labels: KeyValueEncoder,
165 ) -> Result<(), std::fmt::Error> {
166 let unit = if metric.unit().is_empty() {
167 None
168 } else {
169 Some(Unit::Other(metric.unit().to_string()))
170 };
171
172 match metric_data {
173 MetricData::ExponentialHistogram(_) => {
174 #[cfg(feature = "tracing")]
177 tracing::warn!(
178 name = "prometheus_collector_unknown_metric_type",
179 target = env!("CARGO_PKG_NAME"),
180 metric_name = metric.name(),
181 "exponential histograms are not supported"
182 );
183 return Ok(());
184 }
185 MetricData::Gauge(gauge) => {
186 let mut encoder =
187 encoder.encode_descriptor(metric.name(), metric.description(), unit.as_ref(), MetricType::Gauge)?;
188 for data_point in gauge.data_points() {
189 encoder
190 .encode_family(&labels.with_attrs(|| data_point.attributes()))?
191 .encode_gauge(&data_point.value().into_gauge())?;
192 }
193 }
194 MetricData::Histogram(histogram) => {
195 let mut encoder =
196 encoder.encode_descriptor(metric.name(), metric.description(), unit.as_ref(), MetricType::Histogram)?;
197 for data_point in histogram.data_points() {
198 let buckets = data_point.bounds().zip(data_point.bucket_counts()).collect::<Vec<_>>();
199
200 encoder
201 .encode_family(&labels.with_attrs(|| data_point.attributes()))?
202 .encode_histogram::<NoLabelSet>(data_point.sum().to_f64(), data_point.count(), &buckets, None)?;
203 }
204 }
205 MetricData::Sum(sum) => {
206 let mut encoder = encoder.encode_descriptor(
207 metric.name(),
208 metric.description(),
209 unit.as_ref(),
210 if sum.is_monotonic() {
211 MetricType::Counter
212 } else {
213 MetricType::Gauge
214 },
215 )?;
216 for data_point in sum.data_points() {
217 let attrs = labels.with_attrs(|| data_point.attributes());
218 let mut encoder = encoder.encode_family(&attrs)?;
219
220 if sum.is_monotonic() {
221 encoder.encode_counter::<NoLabelSet, _, f64>(&data_point.value().into_counter(), None)?;
223 } else {
224 encoder.encode_gauge(&data_point.value().into_gauge())?;
225 }
226 }
227 }
228 }
229 Ok(())
230 }
231
232 match metric.data() {
233 AggregatedMetrics::F64(metric_data) => encode_metric_data(encoder, metric, metric_data, labels),
234 AggregatedMetrics::I64(metric_data) => encode_metric_data(encoder, metric, metric_data, labels),
235 AggregatedMetrics::U64(metric_data) => encode_metric_data(encoder, metric, metric_data, labels),
236 }
237}
238
239impl prometheus_client::collector::Collector for PrometheusExporter {
240 fn encode(&self, mut encoder: prometheus_client::encoding::DescriptorEncoder) -> Result<(), std::fmt::Error> {
241 let mut metrics = ResourceMetrics::default();
242
243 if let Err(err) = self.reader.collect(&mut metrics) {
244 #[cfg(feature = "tracing")]
245 tracing::error!(
246 name = "prometheus_collector_collect_error",
247 target = env!("CARGO_PKG_NAME"),
248 error = err.to_string(),
249 ""
250 );
251 let _ = err;
252 return Err(std::fmt::Error);
253 }
254
255 let labels = KeyValueEncoder::new(self.prometheus_full_utf8);
256
257 encoder
258 .encode_descriptor("target", "Information about the target", None, MetricType::Info)?
259 .encode_info(&labels.with_resource(Some(metrics.resource())))?;
260
261 for scope_metrics in metrics.scope_metrics() {
262 for metric in scope_metrics.metrics() {
263 encode_aggregated_metrics(&mut encoder, metric, labels.with_scope(Some(scope_metrics.scope())))?;
264 }
265 }
266
267 Ok(())
268 }
269}
270
271fn scope_to_iter(scope: &InstrumentationScope) -> impl Iterator<Item = (&str, Cow<'_, str>)> {
272 [
273 ("otel.scope.name", Some(Cow::Borrowed(scope.name()))),
274 ("otel.scope.version", scope.version().map(Cow::Borrowed)),
275 ("otel.scope.schema_url", scope.schema_url().map(Cow::Borrowed)),
276 ]
277 .into_iter()
278 .chain(scope.attributes().map(|kv| (kv.key.as_str(), Some(kv.value.as_str()))))
279 .filter_map(|(key, value)| value.map(|v| (key, v)))
280}
281
282#[derive(Debug, Clone, Copy)]
283struct KeyValueEncoder<'a, F = fn() -> std::iter::Empty<&'a KeyValue>> {
284 resource: Option<&'a Resource>,
285 scope: Option<&'a InstrumentationScope>,
286 get_attrs: F,
287 prometheus_full_utf8: bool,
288}
289
290impl<'a> KeyValueEncoder<'a> {
291 fn new(prometheus_full_utf8: bool) -> Self {
292 Self {
293 resource: None,
294 scope: None,
295 get_attrs: || std::iter::empty(),
296 prometheus_full_utf8,
297 }
298 }
299
300 fn with_resource(self, resource: Option<&'a Resource>) -> Self {
301 Self { resource, ..self }
302 }
303
304 fn with_scope(self, scope: Option<&'a InstrumentationScope>) -> Self {
305 Self { scope, ..self }
306 }
307
308 fn with_attrs<F>(self, get_attrs: F) -> KeyValueEncoder<'a, F> {
309 KeyValueEncoder {
310 get_attrs,
311 prometheus_full_utf8: self.prometheus_full_utf8,
312 resource: self.resource,
313 scope: self.scope,
314 }
315 }
316}
317
318fn escape_key(s: &str) -> Cow<'_, str> {
319 let mut prefix = "";
321
322 if let Some((replace_idx, _)) = s.char_indices().find(|(i, c)| {
324 if *i == 0 && c.is_ascii_digit() {
325 prefix = "_";
327 true
328 } else {
329 !c.is_alphanumeric() && *c != '_' && *c != ':'
331 }
332 }) {
333 let (valid, rest) = s.split_at(replace_idx);
335 Cow::Owned(
336 prefix
337 .chars()
338 .chain(valid.chars())
339 .chain(rest.chars().map(|c| {
340 if c.is_ascii_alphanumeric() || c == '_' || c == ':' {
341 c
342 } else {
343 '_'
344 }
345 }))
346 .collect(),
347 )
348 } else {
349 Cow::Borrowed(s) }
351}
352
353impl<'a, F, I> prometheus_client::encoding::EncodeLabelSet for KeyValueEncoder<'a, F>
354where
355 F: Fn() -> I,
356 I: IntoIterator<Item = &'a KeyValue>,
357{
358 fn encode(&self, mut encoder: prometheus_client::encoding::LabelSetEncoder) -> Result<(), std::fmt::Error> {
359 use std::fmt::Write;
360
361 fn write_kv(
362 encoder: &mut prometheus_client::encoding::LabelSetEncoder,
363 key: &str,
364 value: &str,
365 prometheus_full_utf8: bool,
366 ) -> Result<(), std::fmt::Error> {
367 let mut label = encoder.encode_label();
368 let mut key_encoder = label.encode_label_key()?;
369 if prometheus_full_utf8 {
370 write!(&mut key_encoder, "{key}")?;
373 } else {
374 write!(&mut key_encoder, "{}", escape_key(key))?;
375 }
376
377 let mut value_encoder = key_encoder.encode_label_value()?;
378 write!(&mut value_encoder, "{value}")?;
379
380 value_encoder.finish()
381 }
382
383 if let Some(resource) = self.resource {
384 for (key, value) in resource.iter() {
385 write_kv(&mut encoder, key.as_str(), value.as_str().as_ref(), self.prometheus_full_utf8)?;
386 }
387 }
388
389 if let Some(scope) = self.scope {
390 for (key, value) in scope_to_iter(scope) {
391 write_kv(&mut encoder, key, value.as_ref(), self.prometheus_full_utf8)?;
392 }
393 }
394
395 for kv in (self.get_attrs)() {
396 write_kv(
397 &mut encoder,
398 kv.key.as_str(),
399 kv.value.as_str().as_ref(),
400 self.prometheus_full_utf8,
401 )?;
402 }
403
404 Ok(())
405 }
406}
407
408#[cfg(test)]
409#[cfg_attr(all(test, coverage_nightly), coverage(off))]
410mod tests {
411 use opentelemetry::KeyValue;
412 use opentelemetry::metrics::MeterProvider;
413 use opentelemetry_sdk::Resource;
414 use opentelemetry_sdk::metrics::SdkMeterProvider;
415 use opentelemetry_sdk::metrics::data::{AggregatedMetrics, MetricData};
416 use prometheus_client::registry::Registry;
417
418 use super::*;
419
420 fn setup_prometheus_exporter(
421 temporality: opentelemetry_sdk::metrics::Temporality,
422 full_utf8: bool,
423 ) -> (PrometheusExporter, Registry) {
424 let exporter = PrometheusExporter::builder()
425 .with_temporality(temporality)
426 .with_prometheus_full_utf8(full_utf8)
427 .build();
428 let mut registry = Registry::default();
429 registry.register_collector(exporter.collector());
430 (exporter, registry)
431 }
432
433 fn collect_and_encode(registry: &Registry) -> String {
434 let mut buffer = String::new();
435 prometheus_client::encoding::text::encode(&mut buffer, registry).unwrap();
436 buffer
437 }
438
439 #[test]
440 fn test_prometheus_collect() {
441 let (exporter, registry) = setup_prometheus_exporter(opentelemetry_sdk::metrics::Temporality::Cumulative, false);
442 let provider = SdkMeterProvider::builder()
443 .with_reader(exporter.clone())
444 .with_resource(
445 Resource::builder()
446 .with_attributes(vec![KeyValue::new("service.name", "test_service")])
447 .build(),
448 )
449 .build();
450 opentelemetry::global::set_meter_provider(provider.clone());
451
452 let meter = provider.meter("test_meter");
453 let counter = meter.u64_counter("test_counter").build();
454 counter.add(1, &[KeyValue::new("key", "value")]);
455
456 let encoded = collect_and_encode(®istry);
457
458 assert!(encoded.contains("test_counter"));
459 assert!(encoded.contains(r#"key="value""#));
460 assert!(encoded.contains(r#"test_counter_total{otel_scope_name="test_meter",key="value"} 1"#));
461 }
462
463 #[test]
464 fn test_prometheus_temporality() {
465 let exporter = PrometheusExporter::builder()
466 .with_temporality(opentelemetry_sdk::metrics::Temporality::Delta)
467 .build();
468
469 let temporality = exporter.temporality(opentelemetry_sdk::metrics::InstrumentKind::Counter);
470
471 assert_eq!(temporality, opentelemetry_sdk::metrics::Temporality::Delta);
472 }
473
474 #[test]
475 fn test_prometheus_full_utf8() {
476 let (exporter, registry) = setup_prometheus_exporter(opentelemetry_sdk::metrics::Temporality::Cumulative, true);
477 let provider = SdkMeterProvider::builder()
478 .with_reader(exporter.clone())
479 .with_resource(
480 Resource::builder()
481 .with_attributes(vec![KeyValue::new("service.name", "test_service")])
482 .build(),
483 )
484 .build();
485 opentelemetry::global::set_meter_provider(provider.clone());
486
487 let meter = provider.meter("test_meter");
488 let counter = meter.u64_counter("test_counter").build();
489 counter.add(1, &[KeyValue::new("key_😊", "value_😊")]);
490
491 let encoded = collect_and_encode(®istry);
492
493 assert!(encoded.contains(r#"key_😊="value_😊""#));
494 }
495
496 #[test]
497 fn test_known_metric_t_encode() {
498 let (exporter, registry) = setup_prometheus_exporter(opentelemetry_sdk::metrics::Temporality::Cumulative, false);
499 let provider = SdkMeterProvider::builder().with_reader(exporter.clone()).build();
500 let meter = provider.meter("test_meter");
501
502 let gauge_u64 = meter.u64_gauge("test_u64_gauge").build();
503 gauge_u64.record(42, &[KeyValue::new("key", "value")]);
504
505 let encoded = collect_and_encode(®istry);
506 assert!(encoded.contains(r#"test_u64_gauge{otel_scope_name="test_meter",key="value"} 42"#));
507
508 let counter_i64_sum = meter.i64_up_down_counter("test_i64_counter").build();
509 counter_i64_sum.add(-42, &[KeyValue::new("key", "value")]);
510
511 let encoded = collect_and_encode(®istry);
512 assert!(encoded.contains(r#"test_i64_counter{otel_scope_name="test_meter",key="value"} -42"#));
513 }
514
515 #[test]
516 fn test_known_metric_encode() {
517 let (exporter, registry) = setup_prometheus_exporter(opentelemetry_sdk::metrics::Temporality::Cumulative, false);
518 let provider = SdkMeterProvider::builder().with_reader(exporter.clone()).build();
519 let meter = provider.meter("test_meter");
520
521 meter
522 .f64_counter("test_f64_counter")
523 .build()
524 .add(1.0, &[KeyValue::new("key", "value")]);
525 assert!(
526 collect_and_encode(®istry).contains(r#"test_f64_counter_total{otel_scope_name="test_meter",key="value"} 1"#)
527 );
528 meter
529 .u64_counter("test_u64_counter")
530 .build()
531 .add(1, &[KeyValue::new("key", "value")]);
532 assert!(
533 collect_and_encode(®istry).contains(r#"test_u64_counter_total{otel_scope_name="test_meter",key="value"} 1"#)
534 );
535 meter
536 .f64_up_down_counter("test_f64_up_down_counter")
537 .build()
538 .add(1.0, &[KeyValue::new("key", "value")]);
539 assert!(
540 collect_and_encode(®istry)
541 .contains(r#"test_f64_up_down_counter{otel_scope_name="test_meter",key="value"} 1"#)
542 );
543 meter
544 .i64_up_down_counter("test_i64_up_down_counter")
545 .build()
546 .add(-1, &[KeyValue::new("key", "value")]);
547 assert!(
548 collect_and_encode(®istry)
549 .contains(r#"test_i64_up_down_counter{otel_scope_name="test_meter",key="value"} -1"#)
550 );
551
552 meter
553 .f64_gauge("test_f64_gauge")
554 .build()
555 .record(1.0, &[KeyValue::new("key", "value")]);
556 assert!(collect_and_encode(®istry).contains(r#"test_f64_gauge{otel_scope_name="test_meter",key="value"} 1"#));
557 meter
558 .i64_gauge("test_i64_gauge")
559 .build()
560 .record(-1, &[KeyValue::new("key", "value")]);
561 assert!(collect_and_encode(®istry).contains(r#"test_i64_gauge{otel_scope_name="test_meter",key="value"} -1"#));
562 meter
563 .u64_gauge("test_u64_gauge")
564 .build()
565 .record(1, &[KeyValue::new("key", "value")]);
566 assert!(collect_and_encode(®istry).contains(r#"test_u64_gauge{otel_scope_name="test_meter",key="value"} 1"#));
567
568 meter
569 .f64_histogram("test_f64_histogram")
570 .build()
571 .record(1.0, &[KeyValue::new("key", "value")]);
572 assert!(
573 collect_and_encode(®istry).contains(r#"test_f64_histogram_sum{otel_scope_name="test_meter",key="value"} 1"#)
574 );
575 meter
576 .u64_histogram("test_u64_histogram")
577 .build()
578 .record(1, &[KeyValue::new("key", "value")]);
579 assert!(
580 collect_and_encode(®istry).contains(r#"test_u64_histogram_sum{otel_scope_name="test_meter",key="value"} 1"#)
581 );
582 }
583
584 #[test]
585 fn test_prometheus_collect_histogram() {
586 let (exporter, registry) = setup_prometheus_exporter(opentelemetry_sdk::metrics::Temporality::Cumulative, false);
587 let provider = SdkMeterProvider::builder().with_reader(exporter.clone()).build();
588 let meter = provider.meter("test_meter");
589 let histogram = meter
590 .u64_histogram("test_histogram")
591 .with_boundaries(vec![5.0, 10.0, 20.0])
592 .build();
593 histogram.record(3, &[KeyValue::new("key", "value")]);
594 histogram.record(7, &[KeyValue::new("key", "value")]);
595 histogram.record(12, &[KeyValue::new("key", "value")]);
596 histogram.record(25, &[KeyValue::new("key", "value")]);
597
598 let mut metrics = ResourceMetrics::default();
599 exporter.collect(&mut metrics).unwrap();
600
601 let scope_metrics = metrics.scope_metrics().next().expect("scope metrics should be present");
602 let metric = scope_metrics
603 .metrics()
604 .find(|m| m.name() == "test_histogram")
605 .expect("histogram metric should be present");
606 let AggregatedMetrics::U64(MetricData::Histogram(histogram_data)) = metric.data() else {
607 unreachable!();
608 };
609
610 let data_point = histogram_data.data_points().next().expect("data point should be present");
611 assert_eq!(data_point.sum(), 47, "sum should be 3 + 7 + 12 + 25 = 47");
612 assert_eq!(data_point.count(), 4, "count should be 4");
613 assert_eq!(
614 data_point.bucket_counts().collect::<Vec<_>>(),
615 vec![1, 1, 1, 1],
616 "each value should fall into a separate bucket"
617 );
618 assert_eq!(
619 data_point.bounds().collect::<Vec<_>>(),
620 vec![5.0, 10.0, 20.0],
621 "boundaries should match the defined ones"
622 );
623
624 let encoded = collect_and_encode(®istry);
625 assert!(encoded.contains(r#"test_histogram_sum{otel_scope_name="test_meter",key="value"} 47"#));
626 }
627
628 #[test]
629 fn test_non_monotonic_sum_as_gauge() {
630 let (exporter, registry) = setup_prometheus_exporter(opentelemetry_sdk::metrics::Temporality::Cumulative, false);
631 let provider = SdkMeterProvider::builder()
632 .with_reader(exporter.clone())
633 .with_resource(
634 Resource::builder()
635 .with_attributes(vec![KeyValue::new("service.name", "test_service")])
636 .build(),
637 )
638 .build();
639 opentelemetry::global::set_meter_provider(provider.clone());
640
641 let meter = provider.meter("test_meter");
642 let sum_metric = meter.i64_up_down_counter("test_non_monotonic_sum").build();
643 sum_metric.add(10, &[KeyValue::new("key", "value")]);
644 sum_metric.add(-5, &[KeyValue::new("key", "value")]);
645
646 let encoded = collect_and_encode(®istry);
647
648 assert!(encoded.contains(r#"test_non_monotonic_sum{otel_scope_name="test_meter",key="value"} 5"#));
649 assert!(
650 !encoded.contains("test_non_monotonic_sum_total"),
651 "Non-monotonic sum should not have '_total' suffix"
652 );
653 }
654
655 #[test]
656 fn test_escape_key() {
657 assert_eq!(escape_key("valid_key"), "valid_key");
658 assert_eq!(escape_key("123start"), "_123start");
659 assert_eq!(escape_key("key with spaces"), "key_with_spaces");
660 assert_eq!(escape_key("key_with:dots"), "key_with:dots");
661 assert_eq!(escape_key("!@#$%"), "_____");
662 }
663}