parquet/arrow/arrow_reader/
metrics.rs

// Licensed to the Apache Software Foundation (ASF) under one
// or more contributor license agreements.  See the NOTICE file
// distributed with this work for additional information
// regarding copyright ownership.  The ASF licenses this file
// to you under the Apache License, Version 2.0 (the
// "License"); you may not use this file except in compliance
// with the License.  You may obtain a copy of the License at
//
//   http://www.apache.org/licenses/LICENSE-2.0
//
// Unless required by applicable law or agreed to in writing,
// software distributed under the License is distributed on an
// "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
// KIND, either express or implied.  See the License for the
// specific language governing permissions and limitations
// under the License.
17
//! [`ArrowReaderMetrics`] for collecting metrics about the Arrow reader
19
20use std::sync::atomic::AtomicUsize;
21use std::sync::Arc;
22
23/// This enum represents the state of Arrow reader metrics collection.
24///
25/// The inner metrics are stored in an `Arc<ArrowReaderMetricsInner>`
26/// so cloning the `ArrowReaderMetrics` enum will not clone the inner metrics.
27///
28/// To access metrics, create an `ArrowReaderMetrics` via [`ArrowReaderMetrics::enabled()`]
29/// and configure the `ArrowReaderBuilder` with a clone.
30#[derive(Debug, Clone)]
31pub enum ArrowReaderMetrics {
32    /// Metrics are not collected (default)
33    Disabled,
34    /// Metrics are collected and stored in an `Arc`.
35    ///
36    /// Create this via [`ArrowReaderMetrics::enabled()`].
37    Enabled(Arc<ArrowReaderMetricsInner>),
38}
39
40impl ArrowReaderMetrics {
41    /// Creates a new instance of [`ArrowReaderMetrics::Disabled`]
42    pub fn disabled() -> Self {
43        Self::Disabled
44    }
45
46    /// Creates a new instance of [`ArrowReaderMetrics::Enabled`]
47    pub fn enabled() -> Self {
48        Self::Enabled(Arc::new(ArrowReaderMetricsInner::new()))
49    }
50
51    /// Predicate Cache: number of records read directly from the inner reader
52    ///
53    /// This is the total number of records read from the inner reader (that is
54    /// actually decoding). It measures the amount of work that could not be
55    /// avoided with caching.
56    ///
57    /// It returns the number of records read across all columns, so if you read
58    /// 2 columns each with 100 records, this will return 200.
59    ///
60    ///
61    /// Returns None if metrics are disabled.
62    pub fn records_read_from_inner(&self) -> Option<usize> {
63        match self {
64            Self::Disabled => None,
65            Self::Enabled(inner) => Some(
66                inner
67                    .records_read_from_inner
68                    .load(std::sync::atomic::Ordering::Relaxed),
69            ),
70        }
71    }
72
73    /// Predicate Cache: number of records read from the cache
74    ///
75    /// This is the total number of records read from the cache actually
76    /// decoding). It measures the amount of work that was avoided with caching.
77    ///
78    /// It returns the number of records read across all columns, so if you read
79    /// 2 columns each with 100 records from the cache, this will return 200.
80    ///
81    /// Returns None if metrics are disabled.
82    pub fn records_read_from_cache(&self) -> Option<usize> {
83        match self {
84            Self::Disabled => None,
85            Self::Enabled(inner) => Some(
86                inner
87                    .records_read_from_cache
88                    .load(std::sync::atomic::Ordering::Relaxed),
89            ),
90        }
91    }
92
93    /// Increments the count of records read from the inner reader
94    pub(crate) fn increment_inner_reads(&self, count: usize) {
95        let Self::Enabled(inner) = self else {
96            return;
97        };
98        inner
99            .records_read_from_inner
100            .fetch_add(count, std::sync::atomic::Ordering::Relaxed);
101    }
102
103    /// Increments the count of records read from the cache
104    pub(crate) fn increment_cache_reads(&self, count: usize) {
105        let Self::Enabled(inner) = self else {
106            return;
107        };
108
109        inner
110            .records_read_from_cache
111            .fetch_add(count, std::sync::atomic::Ordering::Relaxed);
112    }
113}
114
/// Storage for the counters behind an enabled [`ArrowReaderMetrics`].
///
/// The fields are private; read the values through the accessor methods on
/// [`ArrowReaderMetrics`], which is the public interface.
#[derive(Debug)]
pub struct ArrowReaderMetricsInner {
    // --- Predicate Cache metrics ---
    /// Running total of records read from the inner (uncached) reader
    records_read_from_inner: std::sync::atomic::AtomicUsize,
    /// Running total of records read from previously cached pages
    records_read_from_cache: std::sync::atomic::AtomicUsize,
}
126
127impl ArrowReaderMetricsInner {
128    /// Creates a new instance of `ArrowReaderMetricsInner`
129    pub(crate) fn new() -> Self {
130        Self {
131            records_read_from_inner: AtomicUsize::new(0),
132            records_read_from_cache: AtomicUsize::new(0),
133        }
134    }
135}