parquet/arrow/arrow_reader/
statistics.rs

1// Licensed to the Apache Software Foundation (ASF) under one
2// or more contributor license agreements.  See the NOTICE file
3// distributed with this work for additional information
4// regarding copyright ownership.  The ASF licenses this file
5// to you under the Apache License, Version 2.0 (the
6// "License"); you may not use this file except in compliance
7// with the License.  You may obtain a copy of the License at
8//
9//   http://www.apache.org/licenses/LICENSE-2.0
10//
11// Unless required by applicable law or agreed to in writing,
12// software distributed under the License is distributed on an
13// "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
14// KIND, either express or implied.  See the License for the
15// specific language governing permissions and limitations
16// under the License.
17
18//! [`StatisticsConverter`] to convert statistics in parquet format to arrow [`ArrayRef`].
19
20/// Notice that all the corresponding tests are in
21/// `arrow-rs/parquet/tests/arrow_reader/statistics.rs`.
22use crate::arrow::buffer::bit_util::sign_extend_be;
23use crate::arrow::parquet_column;
24use crate::basic::Type as PhysicalType;
25use crate::data_type::{ByteArray, FixedLenByteArray};
26use crate::errors::{ParquetError, Result};
27use crate::file::metadata::{ParquetColumnIndex, ParquetOffsetIndex, RowGroupMetaData};
28use crate::file::page_index::index::{Index, PageIndex};
29use crate::file::statistics::Statistics as ParquetStatistics;
30use crate::schema::types::SchemaDescriptor;
31use arrow_array::builder::{
32    BinaryViewBuilder, BooleanBuilder, FixedSizeBinaryBuilder, LargeStringBuilder, StringBuilder,
33    StringViewBuilder,
34};
35use arrow_array::{
36    new_empty_array, new_null_array, ArrayRef, BinaryArray, BooleanArray, Date32Array, Date64Array,
37    Decimal128Array, Decimal256Array, Decimal32Array, Decimal64Array, Float16Array, Float32Array,
38    Float64Array, Int16Array, Int32Array, Int64Array, Int8Array, LargeBinaryArray,
39    Time32MillisecondArray, Time32SecondArray, Time64MicrosecondArray, Time64NanosecondArray,
40    TimestampMicrosecondArray, TimestampMillisecondArray, TimestampNanosecondArray,
41    TimestampSecondArray, UInt16Array, UInt32Array, UInt64Array, UInt8Array,
42};
43use arrow_buffer::i256;
44use arrow_schema::{DataType, Field, Schema, TimeUnit};
45use half::f16;
46use paste::paste;
47use std::sync::Arc;
48
49// Convert the bytes array to i32.
50// The endian of the input bytes array must be big-endian.
51pub(crate) fn from_bytes_to_i32(b: &[u8]) -> i32 {
52    // The bytes array are from parquet file and must be the big-endian.
53    // The endian is defined by parquet format, and the reference document
54    // https://github.com/apache/parquet-format/blob/54e53e5d7794d383529dd30746378f19a12afd58/src/main/thrift/parquet.thrift#L66
55    i32::from_be_bytes(sign_extend_be::<4>(b))
56}
57
58// Convert the bytes array to i64.
59// The endian of the input bytes array must be big-endian.
60pub(crate) fn from_bytes_to_i64(b: &[u8]) -> i64 {
61    i64::from_be_bytes(sign_extend_be::<8>(b))
62}
63
64// Convert the bytes array to i128.
65// The endian of the input bytes array must be big-endian.
66pub(crate) fn from_bytes_to_i128(b: &[u8]) -> i128 {
67    i128::from_be_bytes(sign_extend_be::<16>(b))
68}
69
70// Convert the bytes array to i256.
71// The endian of the input bytes array must be big-endian.
72pub(crate) fn from_bytes_to_i256(b: &[u8]) -> i256 {
73    i256::from_be_bytes(sign_extend_be::<32>(b))
74}
75
76// Convert the bytes array to f16
77pub(crate) fn from_bytes_to_f16(b: &[u8]) -> Option<f16> {
78    match b {
79        [low, high] => Some(f16::from_be_bytes([*high, *low])),
80        _ => None,
81    }
82}
83
/// Generates an iterator adapter that pulls one typed statistic out of a
/// stream of optional `ParquetStatistics`.
///
/// For every input element the generated iterator yields `Some(&value)` only
/// when the statistics are present, are of the expected variant, and the
/// accessor returns a value; otherwise it yields `None`.
///
/// Parameters:
/// * `$iterator_type` is the name of the generated iterator type (e.g. `MinBooleanStatsIterator`)
/// * `$func` is the accessor called on the typed statistics (e.g. `min_opt` or `max_opt`)
/// * `$parquet_statistics_type` is the accepted statistics variant (e.g. `ParquetStatistics::Boolean`)
/// * `$stat_value_type` is the type of the extracted value (e.g. `bool`)
macro_rules! make_stats_iterator {
    ($iterator_type:ident, $func:ident, $parquet_statistics_type:path, $stat_value_type:ty) => {
        /// Adapter over an iterator of optional `ParquetStatistics` references.
        ///
        /// Yielded elements:
        /// * `Some(value)` when the statistics are present, of the expected
        ///   variant, and hold a value
        /// * `None` in every other case
        struct $iterator_type<'a, I>
        where
            I: Iterator<Item = Option<&'a ParquetStatistics>>,
        {
            iter: I,
        }

        impl<'a, I> $iterator_type<'a, I>
        where
            I: Iterator<Item = Option<&'a ParquetStatistics>>,
        {
            /// Wraps `iter` so that its statistics can be extracted lazily
            fn new(iter: I) -> Self {
                Self { iter }
            }
        }

        impl<'a, I> Iterator for $iterator_type<'a, I>
        where
            I: Iterator<Item = Option<&'a ParquetStatistics>>,
        {
            type Item = Option<&'a $stat_value_type>;

            /// Advances the wrapped iterator and extracts the typed value, if any
            fn next(&mut self) -> Option<Self::Item> {
                self.iter.next().map(|maybe_stats| match maybe_stats {
                    Some($parquet_statistics_type(typed)) => typed.$func(),
                    // absent statistics or a different physical type
                    _ => None,
                })
            }

            /// One output element is produced per input element
            fn size_hint(&self) -> (usize, Option<usize>) {
                self.iter.size_hint()
            }
        }
    };
}
144
// Min/max extractor types generated by `make_stats_iterator!`, one pair per
// `ParquetStatistics` variant used below.
//
// The primitive variants read their value through `min_opt` / `max_opt`.
make_stats_iterator!(
    MinBooleanStatsIterator,
    min_opt,
    ParquetStatistics::Boolean,
    bool
);
make_stats_iterator!(
    MaxBooleanStatsIterator,
    max_opt,
    ParquetStatistics::Boolean,
    bool
);
make_stats_iterator!(
    MinInt32StatsIterator,
    min_opt,
    ParquetStatistics::Int32,
    i32
);
make_stats_iterator!(
    MaxInt32StatsIterator,
    max_opt,
    ParquetStatistics::Int32,
    i32
);
make_stats_iterator!(
    MinInt64StatsIterator,
    min_opt,
    ParquetStatistics::Int64,
    i64
);
make_stats_iterator!(
    MaxInt64StatsIterator,
    max_opt,
    ParquetStatistics::Int64,
    i64
);
make_stats_iterator!(
    MinFloatStatsIterator,
    min_opt,
    ParquetStatistics::Float,
    f32
);
make_stats_iterator!(
    MaxFloatStatsIterator,
    max_opt,
    ParquetStatistics::Float,
    f32
);
make_stats_iterator!(
    MinDoubleStatsIterator,
    min_opt,
    ParquetStatistics::Double,
    f64
);
make_stats_iterator!(
    MaxDoubleStatsIterator,
    max_opt,
    ParquetStatistics::Double,
    f64
);
// The byte-array variants use `min_bytes_opt` / `max_bytes_opt` and yield the
// value as a `[u8]` slice.
make_stats_iterator!(
    MinByteArrayStatsIterator,
    min_bytes_opt,
    ParquetStatistics::ByteArray,
    [u8]
);
make_stats_iterator!(
    MaxByteArrayStatsIterator,
    max_bytes_opt,
    ParquetStatistics::ByteArray,
    [u8]
);
make_stats_iterator!(
    MinFixedLenByteArrayStatsIterator,
    min_bytes_opt,
    ParquetStatistics::FixedLenByteArray,
    [u8]
);
make_stats_iterator!(
    MaxFixedLenByteArrayStatsIterator,
    max_bytes_opt,
    ParquetStatistics::FixedLenByteArray,
    [u8]
);
229
/// Generates an iterator adapter that extracts decimal values from an
/// iterator of optional `ParquetStatistics`.
///
/// Depending on the parquet file, decimal statistics can be stored as `Int32`,
/// `Int64`, `ByteArray` or `FixedLenByteArray`. The generated iterator accepts
/// all four and converts the stored value to `$stat_value_type`; any other
/// variant, absent statistics, or a value that does not convert yields `None`.
///
/// Parameters:
/// * `$iterator_type` is the name of the generated iterator type (e.g. `MinDecimal128StatsIterator`)
/// * `$func` is the accessor for integer-backed statistics (e.g. `min_opt` or `max_opt`)
/// * `$bytes_func` is the accessor returning the value as bytes (e.g. `min_bytes_opt` or `max_bytes_opt`)
/// * `$stat_value_type` is the native type of the decimal value (e.g. `i128`)
/// * `$convert_func` is the function converting the bytes to that type (e.g. `from_bytes_to_i128`)
macro_rules! make_decimal_stats_iterator {
    ($iterator_type:ident, $func:ident, $bytes_func:ident, $stat_value_type:ident, $convert_func: ident) => {
        struct $iterator_type<'a, I>
        where
            I: Iterator<Item = Option<&'a ParquetStatistics>>,
        {
            iter: I,
        }

        impl<'a, I> $iterator_type<'a, I>
        where
            I: Iterator<Item = Option<&'a ParquetStatistics>>,
        {
            fn new(iter: I) -> Self {
                Self { iter }
            }
        }

        impl<'a, I> Iterator for $iterator_type<'a, I>
        where
            I: Iterator<Item = Option<&'a ParquetStatistics>>,
        {
            type Item = Option<$stat_value_type>;

            fn next(&mut self) -> Option<Self::Item> {
                let maybe_stats = self.iter.next()?;
                let value = match maybe_stats {
                    Some(ParquetStatistics::Int32(typed)) => {
                        typed.$func().map(|v| $stat_value_type::from(*v))
                    }
                    // an `Int64` value that does not fit the target type is dropped
                    Some(ParquetStatistics::Int64(typed)) => typed
                        .$func()
                        .and_then(|v| $stat_value_type::try_from(*v).ok()),
                    Some(ParquetStatistics::ByteArray(typed)) => {
                        typed.$bytes_func().map($convert_func)
                    }
                    Some(ParquetStatistics::FixedLenByteArray(typed)) => {
                        typed.$bytes_func().map($convert_func)
                    }
                    // absent statistics or an unsupported physical type
                    _ => None,
                };
                Some(value)
            }

            fn size_hint(&self) -> (usize, Option<usize>) {
                self.iter.size_hint()
            }
        }
    };
}
297
// Min/max decimal extractor types generated by `make_decimal_stats_iterator!`,
// one pair per native decimal width (`i32`, `i64`, `i128`, `i256`). Each pair
// is wired to the `from_bytes_to_*` converter of the same width.
make_decimal_stats_iterator!(
    MinDecimal32StatsIterator,
    min_opt,
    min_bytes_opt,
    i32,
    from_bytes_to_i32
);
make_decimal_stats_iterator!(
    MaxDecimal32StatsIterator,
    max_opt,
    max_bytes_opt,
    i32,
    from_bytes_to_i32
);
make_decimal_stats_iterator!(
    MinDecimal64StatsIterator,
    min_opt,
    min_bytes_opt,
    i64,
    from_bytes_to_i64
);
make_decimal_stats_iterator!(
    MaxDecimal64StatsIterator,
    max_opt,
    max_bytes_opt,
    i64,
    from_bytes_to_i64
);
make_decimal_stats_iterator!(
    MinDecimal128StatsIterator,
    min_opt,
    min_bytes_opt,
    i128,
    from_bytes_to_i128
);
make_decimal_stats_iterator!(
    MaxDecimal128StatsIterator,
    max_opt,
    max_bytes_opt,
    i128,
    from_bytes_to_i128
);
make_decimal_stats_iterator!(
    MinDecimal256StatsIterator,
    min_opt,
    min_bytes_opt,
    i256,
    from_bytes_to_i256
);
make_decimal_stats_iterator!(
    MaxDecimal256StatsIterator,
    max_opt,
    max_bytes_opt,
    i256,
    from_bytes_to_i256
);
354
/// Special macro to combine the statistics iterators for min and max using the [`mod@paste`] macro.
/// This is used to avoid repeating the same code for min and max statistics extractions
///
/// Expands to a `match` on `$data_type` whose arms evaluate to `Ok(..)` wrapping an
/// `Arc`'d arrow array built from the statistics iterator. Data types with no known
/// conversion evaluate to an all-null array whose length is the number of elements in
/// the iterator. The decimal arms use `?`, so the macro has to be expanded inside a
/// function that returns a compatible `Result`.
///
/// Parameters:
/// stat_type_prefix: The prefix of the statistics iterator type (e.g. `Min` or `Max`)
/// data_type: The data type of the statistics (e.g. `DataType::Int32`)
/// iterator: The iterator of [`ParquetStatistics`] to extract the statistics from.
/// physical_type: The optional parquet physical type of the column; it selects how
/// `Date64` is decoded and is passed along when recursing for `Dictionary`.
macro_rules! get_statistics {
    ($stat_type_prefix: ident, $data_type: ident, $iterator: ident, $physical_type: ident) => {
        paste! {
        match $data_type {
            DataType::Boolean => Ok(Arc::new(BooleanArray::from_iter(
                [<$stat_type_prefix BooleanStatsIterator>]::new($iterator).map(|x| x.copied()),
            ))),
            // Narrow integer types are read from `Int32` statistics; a value that
            // does not fit the narrower type becomes null.
            DataType::Int8 => Ok(Arc::new(Int8Array::from_iter(
                [<$stat_type_prefix Int32StatsIterator>]::new($iterator).map(|x| {
                    x.and_then(|x| i8::try_from(*x).ok())
                }),
            ))),
            DataType::Int16 => Ok(Arc::new(Int16Array::from_iter(
                [<$stat_type_prefix Int32StatsIterator>]::new($iterator).map(|x| {
                    x.and_then(|x| i16::try_from(*x).ok())
                }),
            ))),
            DataType::Int32 => Ok(Arc::new(Int32Array::from_iter(
                [<$stat_type_prefix Int32StatsIterator>]::new($iterator).map(|x| x.copied()),
            ))),
            DataType::Int64 => Ok(Arc::new(Int64Array::from_iter(
                [<$stat_type_prefix Int64StatsIterator>]::new($iterator).map(|x| x.copied()),
            ))),
            DataType::UInt8 => Ok(Arc::new(UInt8Array::from_iter(
                [<$stat_type_prefix Int32StatsIterator>]::new($iterator).map(|x| {
                    x.and_then(|x| u8::try_from(*x).ok())
                }),
            ))),
            DataType::UInt16 => Ok(Arc::new(UInt16Array::from_iter(
                [<$stat_type_prefix Int32StatsIterator>]::new($iterator).map(|x| {
                    x.and_then(|x| u16::try_from(*x).ok())
                }),
            ))),
            // Same-width unsigned types: the `as` cast reinterprets the signed
            // value's bits as unsigned instead of range-checking it.
            DataType::UInt32 => Ok(Arc::new(UInt32Array::from_iter(
                [<$stat_type_prefix Int32StatsIterator>]::new($iterator).map(|x| x.map(|x| *x as u32)),
            ))),
            DataType::UInt64 => Ok(Arc::new(UInt64Array::from_iter(
                [<$stat_type_prefix Int64StatsIterator>]::new($iterator).map(|x| x.map(|x| *x as u64)),
            ))),
            // Float16 is read from fixed-length byte statistics; anything that is
            // not exactly two bytes becomes null.
            DataType::Float16 => Ok(Arc::new(Float16Array::from_iter(
                [<$stat_type_prefix FixedLenByteArrayStatsIterator>]::new($iterator).map(|x| x.and_then(|x| {
                    from_bytes_to_f16(x)
                })),
            ))),
            DataType::Float32 => Ok(Arc::new(Float32Array::from_iter(
                [<$stat_type_prefix FloatStatsIterator>]::new($iterator).map(|x| x.copied()),
            ))),
            DataType::Float64 => Ok(Arc::new(Float64Array::from_iter(
                [<$stat_type_prefix DoubleStatsIterator>]::new($iterator).map(|x| x.copied()),
            ))),
            DataType::Date32 => Ok(Arc::new(Date32Array::from_iter(
                [<$stat_type_prefix Int32StatsIterator>]::new($iterator).map(|x| x.copied()),
            ))),
            // Date64 backed by INT32: the value is scaled by 24 * 60 * 60 * 1000
            // (milliseconds per day) — presumably the INT32 holds a day count.
            DataType::Date64 if $physical_type == Some(PhysicalType::INT32) => Ok(Arc::new(Date64Array::from_iter(
                [<$stat_type_prefix Int32StatsIterator>]::new($iterator)
                    .map(|x| x.map(|x| i64::from(*x) * 24 * 60 * 60 * 1000))))),
            // Date64 backed by INT64: the value is used unchanged.
            DataType::Date64 if $physical_type == Some(PhysicalType::INT64) => Ok(Arc::new(Date64Array::from_iter(
                [<$stat_type_prefix Int64StatsIterator>]::new($iterator).map(|x| x.copied()),))),
            DataType::Timestamp(unit, timezone) =>{
                let iter = [<$stat_type_prefix Int64StatsIterator>]::new($iterator).map(|x| x.copied());
                Ok(match unit {
                    TimeUnit::Second => Arc::new(TimestampSecondArray::from_iter(iter).with_timezone_opt(timezone.clone())),
                    TimeUnit::Millisecond => Arc::new(TimestampMillisecondArray::from_iter(iter).with_timezone_opt(timezone.clone())),
                    TimeUnit::Microsecond => Arc::new(TimestampMicrosecondArray::from_iter(iter).with_timezone_opt(timezone.clone())),
                    TimeUnit::Nanosecond => Arc::new(TimestampNanosecondArray::from_iter(iter).with_timezone_opt(timezone.clone())),
                })
            },
            // Time32 is only handled for second and millisecond units.
            DataType::Time32(unit) => {
                Ok(match unit {
                    TimeUnit::Second =>  Arc::new(Time32SecondArray::from_iter(
                        [<$stat_type_prefix Int32StatsIterator>]::new($iterator).map(|x| x.copied()),
                    )),
                    TimeUnit::Millisecond => Arc::new(Time32MillisecondArray::from_iter(
                        [<$stat_type_prefix Int32StatsIterator>]::new($iterator).map(|x| x.copied()),
                    )),
                    _ => {
                        let len = $iterator.count();
                        // don't know how to extract statistics, so return a null array
                        new_null_array($data_type, len)
                    }
                })
            },
            // Time64 is only handled for microsecond and nanosecond units.
            DataType::Time64(unit) => {
                Ok(match unit {
                    TimeUnit::Microsecond =>  Arc::new(Time64MicrosecondArray::from_iter(
                        [<$stat_type_prefix Int64StatsIterator>]::new($iterator).map(|x| x.copied()),
                    )),
                    TimeUnit::Nanosecond => Arc::new(Time64NanosecondArray::from_iter(
                        [<$stat_type_prefix Int64StatsIterator>]::new($iterator).map(|x| x.copied()),
                    )),
                    _ => {
                        let len = $iterator.count();
                        // don't know how to extract statistics, so return a null array
                        new_null_array($data_type, len)
                    }
                })
            },
            DataType::Binary => Ok(Arc::new(BinaryArray::from_iter(
                [<$stat_type_prefix ByteArrayStatsIterator>]::new($iterator)
            ))),
            DataType::LargeBinary => Ok(Arc::new(LargeBinaryArray::from_iter(
                [<$stat_type_prefix ByteArrayStatsIterator>]::new($iterator)
            ))),
            // String types: bytes that are not valid UTF-8 are stored as null
            // rather than reported as an error.
            DataType::Utf8 => {
                let iterator = [<$stat_type_prefix ByteArrayStatsIterator>]::new($iterator);
                let mut builder = StringBuilder::new();
                for x in iterator {
                    let Some(x) = x else {
                        builder.append_null(); // no statistics value
                        continue;
                    };

                    let Ok(x) = std::str::from_utf8(x) else {
                        builder.append_null();
                        continue;
                    };

                    builder.append_value(x);
                }
                Ok(Arc::new(builder.finish()))
            },
            DataType::LargeUtf8 => {
                let iterator = [<$stat_type_prefix ByteArrayStatsIterator>]::new($iterator);
                let mut builder = LargeStringBuilder::new();
                for x in iterator {
                    let Some(x) = x else {
                        builder.append_null(); // no statistics value
                        continue;
                    };

                    let Ok(x) = std::str::from_utf8(x) else {
                        builder.append_null();
                        continue;
                    };

                    builder.append_value(x);
                }
                Ok(Arc::new(builder.finish()))
            },
            DataType::FixedSizeBinary(size) => {
                let iterator = [<$stat_type_prefix FixedLenByteArrayStatsIterator>]::new($iterator);
                let mut builder = FixedSizeBinaryBuilder::new(*size);
                for x in iterator {
                    let Some(x) = x else {
                        builder.append_null(); // no statistics value
                        continue;
                    };

                    // ignore invalid values
                    if x.len().try_into() != Ok(*size){
                        builder.append_null();
                        continue;
                    }

                    builder.append_value(x).expect("ensure to append successfully here, because size have been checked before");
                }
                Ok(Arc::new(builder.finish()))
            },
            // Decimal types: `with_precision_and_scale` can fail, and that error
            // is propagated to the caller with `?`.
            DataType::Decimal32(precision, scale) => {
                let arr = Decimal32Array::from_iter(
                    [<$stat_type_prefix Decimal32StatsIterator>]::new($iterator)
                ).with_precision_and_scale(*precision, *scale)?;
                Ok(Arc::new(arr))
            },
            DataType::Decimal64(precision, scale) => {
                let arr = Decimal64Array::from_iter(
                    [<$stat_type_prefix Decimal64StatsIterator>]::new($iterator)
                ).with_precision_and_scale(*precision, *scale)?;
                Ok(Arc::new(arr))
            },
            DataType::Decimal128(precision, scale) => {
                let arr = Decimal128Array::from_iter(
                    [<$stat_type_prefix Decimal128StatsIterator>]::new($iterator)
                ).with_precision_and_scale(*precision, *scale)?;
                Ok(Arc::new(arr))
            },
            DataType::Decimal256(precision, scale) => {
                let arr = Decimal256Array::from_iter(
                    [<$stat_type_prefix Decimal256StatsIterator>]::new($iterator)
                ).with_precision_and_scale(*precision, *scale)?;
                Ok(Arc::new(arr))
            },
            // Dictionary: delegate to `min_statistics` / `max_statistics` (the
            // lower-cased prefix followed by `_statistics`) with the value type.
            DataType::Dictionary(_, value_type) => {
                [<$stat_type_prefix:lower _ statistics>](value_type, $iterator, $physical_type)
            },
            DataType::Utf8View => {
                let iterator = [<$stat_type_prefix ByteArrayStatsIterator>]::new($iterator);
                let mut builder = StringViewBuilder::new();
                for x in iterator {
                    let Some(x) = x else {
                        builder.append_null(); // no statistics value
                        continue;
                    };

                    let Ok(x) = std::str::from_utf8(x) else {
                        builder.append_null();
                        continue;
                    };

                    builder.append_value(x);
                }
                Ok(Arc::new(builder.finish()))
            },
            DataType::BinaryView => {
                let iterator = [<$stat_type_prefix ByteArrayStatsIterator>]::new($iterator);
                let mut builder = BinaryViewBuilder::new();
                for x in iterator {
                    let Some(x) = x else {
                        builder.append_null(); // no statistics value
                        continue;
                    };

                    builder.append_value(x);
                }
                Ok(Arc::new(builder.finish()))
            }

            // Everything below has no conversion: count the inputs and return an
            // all-null array of that length.
            DataType::Map(_,_) |
            DataType::Duration(_) |
            DataType::Interval(_) |
            DataType::Date64 |  // required to cover $physical_type match guard
            DataType::Null |
            DataType::List(_) |
            DataType::ListView(_) |
            DataType::FixedSizeList(_, _) |
            DataType::LargeList(_) |
            DataType::LargeListView(_) |
            DataType::Struct(_) |
            DataType::Union(_, _) |
            DataType::RunEndEncoded(_, _) => {
                let len = $iterator.count();
                // don't know how to extract statistics, so return a null array
                Ok(new_null_array($data_type, len))
            }
        }}}
}
598
/// Generates an iterator adapter that extracts per-page statistics from a
/// stream of `(len, &Index)` pairs.
///
/// Each input pair yields one `Vec` with one optional value per entry of the
/// index. When the index is not of the expected kind, the `Vec` holds `len`
/// `None`s instead.
///
/// Parameters:
/// * `$iterator_type` is the name of the generated iterator type
/// * `$func` is the expression (typically a closure) mapping one page entry to its optional value
/// * `$index_type` is the `Index` variant to accept (e.g. `Index::INT32`)
/// * `$stat_value_type` is the type of the extracted value (e.g. `i32`)
macro_rules! make_data_page_stats_iterator {
    ($iterator_type: ident, $func: expr, $index_type: path, $stat_value_type: ty) => {
        struct $iterator_type<'a, I>
        where
            I: Iterator<Item = (usize, &'a Index)>,
        {
            iter: I,
        }

        impl<'a, I> $iterator_type<'a, I>
        where
            I: Iterator<Item = (usize, &'a Index)>,
        {
            fn new(iter: I) -> Self {
                Self { iter }
            }
        }

        impl<'a, I> Iterator for $iterator_type<'a, I>
        where
            I: Iterator<Item = (usize, &'a Index)>,
        {
            type Item = Vec<Option<$stat_value_type>>;

            fn next(&mut self) -> Option<Self::Item> {
                let (len, index) = self.iter.next()?;
                let page_values: Self::Item = match index {
                    $index_type(native_index) => {
                        native_index.indexes.iter().map($func).collect()
                    }
                    // The index is of a different kind, so nothing can be
                    // extracted. Emitting `len` nulls keeps the output aligned
                    // with the number of entries in `ParquetOffsetIndex` per
                    // row group per column.
                    _ => vec![None; len],
                };
                Some(page_values)
            }

            fn size_hint(&self) -> (usize, Option<usize>) {
                self.iter.size_hint()
            }
        }
    };
}
648
// Min/max data-page extractor types generated by
// `make_data_page_stats_iterator!`, one pair per `Index` variant used below.
//
// Primitive values are copied out of the page entry directly.
make_data_page_stats_iterator!(
    MinBooleanDataPageStatsIterator,
    |x: &PageIndex<bool>| { x.min },
    Index::BOOLEAN,
    bool
);
make_data_page_stats_iterator!(
    MaxBooleanDataPageStatsIterator,
    |x: &PageIndex<bool>| { x.max },
    Index::BOOLEAN,
    bool
);
make_data_page_stats_iterator!(
    MinInt32DataPageStatsIterator,
    |x: &PageIndex<i32>| { x.min },
    Index::INT32,
    i32
);
make_data_page_stats_iterator!(
    MaxInt32DataPageStatsIterator,
    |x: &PageIndex<i32>| { x.max },
    Index::INT32,
    i32
);
make_data_page_stats_iterator!(
    MinInt64DataPageStatsIterator,
    |x: &PageIndex<i64>| { x.min },
    Index::INT64,
    i64
);
make_data_page_stats_iterator!(
    MaxInt64DataPageStatsIterator,
    |x: &PageIndex<i64>| { x.max },
    Index::INT64,
    i64
);
// Float16 values are read from the fixed-length byte-array index, so these
// two yield the raw `FixedLenByteArray` (cloned out of the page entry).
make_data_page_stats_iterator!(
    MinFloat16DataPageStatsIterator,
    |x: &PageIndex<FixedLenByteArray>| { x.min.clone() },
    Index::FIXED_LEN_BYTE_ARRAY,
    FixedLenByteArray
);
make_data_page_stats_iterator!(
    MaxFloat16DataPageStatsIterator,
    |x: &PageIndex<FixedLenByteArray>| { x.max.clone() },
    Index::FIXED_LEN_BYTE_ARRAY,
    FixedLenByteArray
);
make_data_page_stats_iterator!(
    MinFloat32DataPageStatsIterator,
    |x: &PageIndex<f32>| { x.min },
    Index::FLOAT,
    f32
);
make_data_page_stats_iterator!(
    MaxFloat32DataPageStatsIterator,
    |x: &PageIndex<f32>| { x.max },
    Index::FLOAT,
    f32
);
make_data_page_stats_iterator!(
    MinFloat64DataPageStatsIterator,
    |x: &PageIndex<f64>| { x.min },
    Index::DOUBLE,
    f64
);
make_data_page_stats_iterator!(
    MaxFloat64DataPageStatsIterator,
    |x: &PageIndex<f64>| { x.max },
    Index::DOUBLE,
    f64
);
// Byte-array values are cloned out of the page entry.
make_data_page_stats_iterator!(
    MinByteArrayDataPageStatsIterator,
    |x: &PageIndex<ByteArray>| { x.min.clone() },
    Index::BYTE_ARRAY,
    ByteArray
);
make_data_page_stats_iterator!(
    MaxByteArrayDataPageStatsIterator,
    |x: &PageIndex<ByteArray>| { x.max.clone() },
    Index::BYTE_ARRAY,
    ByteArray
);
// NOTE: unlike the pairs above, `Max` is declared before `Min` here.
make_data_page_stats_iterator!(
    MaxFixedLenByteArrayDataPageStatsIterator,
    |x: &PageIndex<FixedLenByteArray>| { x.max.clone() },
    Index::FIXED_LEN_BYTE_ARRAY,
    FixedLenByteArray
);

make_data_page_stats_iterator!(
    MinFixedLenByteArrayDataPageStatsIterator,
    |x: &PageIndex<FixedLenByteArray>| { x.min.clone() },
    Index::FIXED_LEN_BYTE_ARRAY,
    FixedLenByteArray
);
746
/// Generates an iterator adapter that extracts per-page decimal statistics
/// from a stream of `(len, &Index)` pairs.
///
/// Each input pair yields one `Vec` with one optional value per entry of the
/// index. Values are read from `INT32`, `INT64`, `BYTE_ARRAY` and
/// `FIXED_LEN_BYTE_ARRAY` indexes; any other index kind yields `len` `None`s.
///
/// Parameters:
/// * `$iterator_type` is the name of the generated iterator type
/// * `$func` is the page-entry field to read (`min` or `max`)
/// * `$stat_value_type` is the native type of the decimal value (e.g. `i128`)
/// * `$convert_func` is the function converting a byte-array value's bytes to
///   that type (e.g. `from_bytes_to_i128`)
macro_rules! get_decimal_page_stats_iterator {
    ($iterator_type: ident, $func: ident, $stat_value_type: ident, $convert_func: ident) => {
        struct $iterator_type<'a, I>
        where
            I: Iterator<Item = (usize, &'a Index)>,
        {
            iter: I,
        }

        impl<'a, I> $iterator_type<'a, I>
        where
            I: Iterator<Item = (usize, &'a Index)>,
        {
            fn new(iter: I) -> Self {
                Self { iter }
            }
        }

        impl<'a, I> Iterator for $iterator_type<'a, I>
        where
            I: Iterator<Item = (usize, &'a Index)>,
        {
            type Item = Vec<Option<$stat_value_type>>;

            fn next(&mut self) -> Option<Self::Item> {
                let (len, index) = self.iter.next()?;
                let page_values: Self::Item = match index {
                    Index::INT32(native_index) => native_index
                        .indexes
                        .iter()
                        .map(|page| page.$func.map(|v| $stat_value_type::from(v)))
                        .collect(),
                    // an `INT64` value that does not fit the target type is dropped
                    Index::INT64(native_index) => native_index
                        .indexes
                        .iter()
                        .map(|page| page.$func.and_then(|v| $stat_value_type::try_from(v).ok()))
                        .collect(),
                    // Borrow the stored bytes in place; cloning the whole page
                    // entry just to read one field is unnecessary.
                    Index::BYTE_ARRAY(native_index) => native_index
                        .indexes
                        .iter()
                        .map(|page| page.$func.as_ref().map(|bytes| $convert_func(bytes.data())))
                        .collect(),
                    Index::FIXED_LEN_BYTE_ARRAY(native_index) => native_index
                        .indexes
                        .iter()
                        .map(|page| page.$func.as_ref().map(|bytes| $convert_func(bytes.data())))
                        .collect(),
                    // unsupported index kind: emit `len` nulls to keep the output aligned
                    _ => vec![None; len],
                };
                Some(page_values)
            }

            fn size_hint(&self) -> (usize, Option<usize>) {
                self.iter.size_hint()
            }
        }
    };
}
819
// Page-level min/max statistics iterators for the decimal Arrow types.
//
// Each invocation passes, in order: the name of the iterator type referenced by
// `get_data_page_statistics!`, the page index field to read (`min` or `max`),
// the value type yielded for each page, and the function applied to the raw
// bytes of `BYTE_ARRAY` / `FIXED_LEN_BYTE_ARRAY` statistics to produce that type.
get_decimal_page_stats_iterator!(
    MinDecimal32DataPageStatsIterator,
    min,
    i32,
    from_bytes_to_i32
);

get_decimal_page_stats_iterator!(
    MaxDecimal32DataPageStatsIterator,
    max,
    i32,
    from_bytes_to_i32
);

get_decimal_page_stats_iterator!(
    MinDecimal64DataPageStatsIterator,
    min,
    i64,
    from_bytes_to_i64
);

get_decimal_page_stats_iterator!(
    MaxDecimal64DataPageStatsIterator,
    max,
    i64,
    from_bytes_to_i64
);

get_decimal_page_stats_iterator!(
    MinDecimal128DataPageStatsIterator,
    min,
    i128,
    from_bytes_to_i128
);

get_decimal_page_stats_iterator!(
    MaxDecimal128DataPageStatsIterator,
    max,
    i128,
    from_bytes_to_i128
);

get_decimal_page_stats_iterator!(
    MinDecimal256DataPageStatsIterator,
    min,
    i256,
    from_bytes_to_i256
);

get_decimal_page_stats_iterator!(
    MaxDecimal256DataPageStatsIterator,
    max,
    i256,
    from_bytes_to_i256
);
875
/// Expands to a `match` on `$data_type` that converts page-level statistics
/// into an Arrow array of that type, evaluating to a `Result<ArrayRef>`.
///
/// * `$stat_type_prefix`: `Min` or `Max`; selects which family of
///   `*DataPageStatsIterator` types is used
/// * `$data_type`: the requested Arrow `&DataType`
/// * `$iterator`: iterator of `(usize, &Index)` items, where the `usize` is the
///   number of values that item contributes to the output
/// * `$physical_type`: `Option<PhysicalType>` of the parquet column; selects the
///   decoding for `Date64` and is forwarded when recursing into dictionary
///   value types
///
/// A null in the output means the statistic is unknown or could not be
/// converted to the requested type.
macro_rules! get_data_page_statistics {
    ($stat_type_prefix: ident, $data_type: ident, $iterator: ident, $physical_type: ident) => {
        paste! {
            match $data_type {
                DataType::Boolean => {
                    let page_stats = [<$stat_type_prefix BooleanDataPageStatsIterator>]::new($iterator);
                    let mut builder = BooleanBuilder::new();
                    for x in page_stats.flatten() {
                        // `None` (no statistics value) is appended as null
                        builder.append_option(x);
                    }
                    Ok(Arc::new(builder.finish()))
                },
                // Narrow integer types are stored as INT32; values that do not
                // fit the requested type become null
                DataType::UInt8 => Ok(Arc::new(
                    UInt8Array::from_iter(
                        [<$stat_type_prefix Int32DataPageStatsIterator>]::new($iterator)
                            .flatten()
                            .map(|x| x.and_then(|x| u8::try_from(x).ok()))
                    )
                )),
                DataType::UInt16 => Ok(Arc::new(
                    UInt16Array::from_iter(
                        [<$stat_type_prefix Int32DataPageStatsIterator>]::new($iterator)
                            .flatten()
                            .map(|x| x.and_then(|x| u16::try_from(x).ok()))
                    )
                )),
                // Same-width unsigned types reinterpret the signed bits
                DataType::UInt32 => Ok(Arc::new(
                    UInt32Array::from_iter(
                        [<$stat_type_prefix Int32DataPageStatsIterator>]::new($iterator)
                            .flatten()
                            .map(|x| x.map(|x| x as u32))
                    )
                )),
                DataType::UInt64 => Ok(Arc::new(
                    UInt64Array::from_iter(
                        [<$stat_type_prefix Int64DataPageStatsIterator>]::new($iterator)
                            .flatten()
                            .map(|x| x.map(|x| x as u64))
                    )
                )),
                DataType::Int8 => Ok(Arc::new(
                    Int8Array::from_iter(
                        [<$stat_type_prefix Int32DataPageStatsIterator>]::new($iterator)
                            .flatten()
                            .map(|x| x.and_then(|x| i8::try_from(x).ok()))
                    )
                )),
                DataType::Int16 => Ok(Arc::new(
                    Int16Array::from_iter(
                        [<$stat_type_prefix Int32DataPageStatsIterator>]::new($iterator)
                            .flatten()
                            .map(|x| x.and_then(|x| i16::try_from(x).ok()))
                    )
                )),
                DataType::Int32 => Ok(Arc::new(Int32Array::from_iter([<$stat_type_prefix Int32DataPageStatsIterator>]::new($iterator).flatten()))),
                DataType::Int64 => Ok(Arc::new(Int64Array::from_iter([<$stat_type_prefix Int64DataPageStatsIterator>]::new($iterator).flatten()))),
                DataType::Float16 => Ok(Arc::new(
                    Float16Array::from_iter(
                        [<$stat_type_prefix Float16DataPageStatsIterator>]::new($iterator)
                            .flatten()
                            .map(|x| x.and_then(|x| from_bytes_to_f16(x.data())))
                    )
                )),
                DataType::Float32 => Ok(Arc::new(Float32Array::from_iter([<$stat_type_prefix Float32DataPageStatsIterator>]::new($iterator).flatten()))),
                DataType::Float64 => Ok(Arc::new(Float64Array::from_iter([<$stat_type_prefix Float64DataPageStatsIterator>]::new($iterator).flatten()))),
                DataType::Binary => Ok(Arc::new(BinaryArray::from_iter([<$stat_type_prefix ByteArrayDataPageStatsIterator>]::new($iterator).flatten()))),
                DataType::LargeBinary => Ok(Arc::new(LargeBinaryArray::from_iter([<$stat_type_prefix ByteArrayDataPageStatsIterator>]::new($iterator).flatten()))),
                DataType::Utf8 => {
                    let mut builder = StringBuilder::new();
                    let page_stats = [<$stat_type_prefix ByteArrayDataPageStatsIterator>]::new($iterator);
                    for x in page_stats.flatten() {
                        let Some(x) = x else {
                            builder.append_null(); // no statistics value
                            continue;
                        };

                        // statistics that are not valid UTF-8 are reported as null
                        let Ok(x) = std::str::from_utf8(x.data()) else {
                            builder.append_null();
                            continue;
                        };

                        builder.append_value(x);
                    }
                    Ok(Arc::new(builder.finish()))
                },
                DataType::LargeUtf8 => {
                    let mut builder = LargeStringBuilder::new();
                    let page_stats = [<$stat_type_prefix ByteArrayDataPageStatsIterator>]::new($iterator);
                    for x in page_stats.flatten() {
                        let Some(x) = x else {
                            builder.append_null(); // no statistics value
                            continue;
                        };

                        // statistics that are not valid UTF-8 are reported as null
                        let Ok(x) = std::str::from_utf8(x.data()) else {
                            builder.append_null();
                            continue;
                        };

                        builder.append_value(x);
                    }
                    Ok(Arc::new(builder.finish()))
                },
                // Dictionary statistics are the statistics of the value type
                DataType::Dictionary(_, value_type) => {
                    [<$stat_type_prefix:lower _ page_statistics>](value_type, $iterator, $physical_type)
                },
                DataType::Timestamp(unit, timezone) => {
                    let iter = [<$stat_type_prefix Int64DataPageStatsIterator>]::new($iterator).flatten();
                    Ok(match unit {
                        TimeUnit::Second => Arc::new(TimestampSecondArray::from_iter(iter).with_timezone_opt(timezone.clone())),
                        TimeUnit::Millisecond => Arc::new(TimestampMillisecondArray::from_iter(iter).with_timezone_opt(timezone.clone())),
                        TimeUnit::Microsecond => Arc::new(TimestampMicrosecondArray::from_iter(iter).with_timezone_opt(timezone.clone())),
                        TimeUnit::Nanosecond => Arc::new(TimestampNanosecondArray::from_iter(iter).with_timezone_opt(timezone.clone())),
                    })
                },
                DataType::Date32 => Ok(Arc::new(Date32Array::from_iter([<$stat_type_prefix Int32DataPageStatsIterator>]::new($iterator).flatten()))),
                DataType::Date64 if $physical_type == Some(PhysicalType::INT32) => Ok(Arc::new(
                    Date64Array::from_iter(
                        [<$stat_type_prefix Int32DataPageStatsIterator>]::new($iterator)
                            .flatten()
                            // widen to i64, then scale by the number of milliseconds in a day
                            .map(|x| x.map(|x| i64::from(x) * 24 * 60 * 60 * 1000))
                    )
                )),
                DataType::Date64 if $physical_type == Some(PhysicalType::INT64) => Ok(Arc::new(Date64Array::from_iter([<$stat_type_prefix Int64DataPageStatsIterator>]::new($iterator).flatten()))),
                DataType::Decimal32(precision, scale) => Ok(Arc::new(
                    Decimal32Array::from_iter([<$stat_type_prefix Decimal32DataPageStatsIterator>]::new($iterator).flatten()).with_precision_and_scale(*precision, *scale)?)),
                DataType::Decimal64(precision, scale) => Ok(Arc::new(
                    Decimal64Array::from_iter([<$stat_type_prefix Decimal64DataPageStatsIterator>]::new($iterator).flatten()).with_precision_and_scale(*precision, *scale)?)),
                DataType::Decimal128(precision, scale) => Ok(Arc::new(
                    Decimal128Array::from_iter([<$stat_type_prefix Decimal128DataPageStatsIterator>]::new($iterator).flatten()).with_precision_and_scale(*precision, *scale)?)),
                DataType::Decimal256(precision, scale) => Ok(Arc::new(
                    Decimal256Array::from_iter([<$stat_type_prefix Decimal256DataPageStatsIterator>]::new($iterator).flatten()).with_precision_and_scale(*precision, *scale)?)),
                DataType::Time32(unit) => {
                    Ok(match unit {
                        TimeUnit::Second =>  Arc::new(Time32SecondArray::from_iter(
                            [<$stat_type_prefix Int32DataPageStatsIterator>]::new($iterator).flatten(),
                        )),
                        TimeUnit::Millisecond => Arc::new(Time32MillisecondArray::from_iter(
                            [<$stat_type_prefix Int32DataPageStatsIterator>]::new($iterator).flatten(),
                        )),
                        _ => {
                            // don't know how to extract statistics, so return an empty array
                            new_empty_array(&DataType::Time32(unit.clone()))
                        }
                    })
                }
                DataType::Time64(unit) => {
                    Ok(match unit {
                        TimeUnit::Microsecond =>  Arc::new(Time64MicrosecondArray::from_iter(
                            [<$stat_type_prefix Int64DataPageStatsIterator>]::new($iterator).flatten(),
                        )),
                        TimeUnit::Nanosecond => Arc::new(Time64NanosecondArray::from_iter(
                            [<$stat_type_prefix Int64DataPageStatsIterator>]::new($iterator).flatten(),
                        )),
                        _ => {
                            // don't know how to extract statistics, so return an empty array
                            new_empty_array(&DataType::Time64(unit.clone()))
                        }
                    })
                },
                DataType::FixedSizeBinary(size) => {
                    let mut builder = FixedSizeBinaryBuilder::new(*size);
                    let page_stats = [<$stat_type_prefix FixedLenByteArrayDataPageStatsIterator>]::new($iterator);
                    for x in page_stats.flatten() {
                        let Some(x) = x else {
                            builder.append_null(); // no statistics value
                            continue;
                        };

                        // a statistic whose width differs from the declared size
                        // cannot be stored, so it is reported as null
                        if x.len() == *size as usize {
                            let _ = builder.append_value(x.data());
                        } else {
                            builder.append_null();
                        }
                    }
                    Ok(Arc::new(builder.finish()))
                },
                DataType::Utf8View => {
                    let mut builder = StringViewBuilder::new();
                    let page_stats = [<$stat_type_prefix ByteArrayDataPageStatsIterator>]::new($iterator);
                    for x in page_stats.flatten() {
                        let Some(x) = x else {
                            builder.append_null(); // no statistics value
                            continue;
                        };

                        // statistics that are not valid UTF-8 are reported as null
                        let Ok(x) = std::str::from_utf8(x.data()) else {
                            builder.append_null();
                            continue;
                        };

                        builder.append_value(x);
                    }
                    Ok(Arc::new(builder.finish()))
                },
                DataType::BinaryView => {
                    let mut builder = BinaryViewBuilder::new();
                    let page_stats = [<$stat_type_prefix ByteArrayDataPageStatsIterator>]::new($iterator);
                    for x in page_stats.flatten() {
                        let Some(x) = x else {
                            builder.append_null(); // no statistics value
                            continue;
                        };

                        builder.append_value(x);
                    }
                    Ok(Arc::new(builder.finish()))
                },
                DataType::Date64 |  // required to cover $physical_type match guard
                DataType::Null |
                DataType::Duration(_) |
                DataType::Interval(_) |
                DataType::List(_) |
                DataType::ListView(_) |
                DataType::FixedSizeList(_, _) |
                DataType::LargeList(_) |
                DataType::LargeListView(_) |
                DataType::Struct(_) |
                DataType::Union(_, _) |
                DataType::Map(_, _) |
                DataType::RunEndEncoded(_, _) => {
                    // Each item carries the number of values it contributes (the
                    // typed iterators emit `len` nulls for an index they cannot
                    // read), so sum those lengths instead of counting the items;
                    // this keeps the output the same length as for supported types.
                    let len = $iterator.map(|(len, _)| len).sum::<usize>();
                    // don't know how to extract statistics, so return a null array
                    Ok(new_null_array($data_type, len))
                },
            }
        }
    }
}
1153/// Extracts the min statistics from an iterator of [`ParquetStatistics`] to an
1154/// [`ArrayRef`]
1155///
1156/// This is an internal helper -- see [`StatisticsConverter`] for public API
1157fn min_statistics<'a, I: Iterator<Item = Option<&'a ParquetStatistics>>>(
1158    data_type: &DataType,
1159    iterator: I,
1160    physical_type: Option<PhysicalType>,
1161) -> Result<ArrayRef> {
1162    get_statistics!(Min, data_type, iterator, physical_type)
1163}
1164
1165/// Extracts the max statistics from an iterator of [`ParquetStatistics`] to an [`ArrayRef`]
1166///
1167/// This is an internal helper -- see [`StatisticsConverter`] for public API
1168fn max_statistics<'a, I: Iterator<Item = Option<&'a ParquetStatistics>>>(
1169    data_type: &DataType,
1170    iterator: I,
1171    physical_type: Option<PhysicalType>,
1172) -> Result<ArrayRef> {
1173    get_statistics!(Max, data_type, iterator, physical_type)
1174}
1175
1176/// Extracts the min statistics from an iterator
1177/// of parquet page [`Index`]'es to an [`ArrayRef`]
1178pub(crate) fn min_page_statistics<'a, I>(
1179    data_type: &DataType,
1180    iterator: I,
1181    physical_type: Option<PhysicalType>,
1182) -> Result<ArrayRef>
1183where
1184    I: Iterator<Item = (usize, &'a Index)>,
1185{
1186    get_data_page_statistics!(Min, data_type, iterator, physical_type)
1187}
1188
1189/// Extracts the max statistics from an iterator
1190/// of parquet page [`Index`]'es to an [`ArrayRef`]
1191pub(crate) fn max_page_statistics<'a, I>(
1192    data_type: &DataType,
1193    iterator: I,
1194    physical_type: Option<PhysicalType>,
1195) -> Result<ArrayRef>
1196where
1197    I: Iterator<Item = (usize, &'a Index)>,
1198{
1199    get_data_page_statistics!(Max, data_type, iterator, physical_type)
1200}
1201
1202/// Extracts the null count statistics from an iterator
1203/// of parquet page [`Index`]'es to an [`ArrayRef`]
1204///
1205/// The returned Array is an [`UInt64Array`]
1206pub(crate) fn null_counts_page_statistics<'a, I>(iterator: I) -> Result<UInt64Array>
1207where
1208    I: Iterator<Item = (usize, &'a Index)>,
1209{
1210    let iter = iterator.flat_map(|(len, index)| match index {
1211        Index::NONE => vec![None; len],
1212        Index::BOOLEAN(native_index) => native_index
1213            .indexes
1214            .iter()
1215            .map(|x| x.null_count.map(|x| x as u64))
1216            .collect::<Vec<_>>(),
1217        Index::INT32(native_index) => native_index
1218            .indexes
1219            .iter()
1220            .map(|x| x.null_count.map(|x| x as u64))
1221            .collect::<Vec<_>>(),
1222        Index::INT64(native_index) => native_index
1223            .indexes
1224            .iter()
1225            .map(|x| x.null_count.map(|x| x as u64))
1226            .collect::<Vec<_>>(),
1227        Index::FLOAT(native_index) => native_index
1228            .indexes
1229            .iter()
1230            .map(|x| x.null_count.map(|x| x as u64))
1231            .collect::<Vec<_>>(),
1232        Index::DOUBLE(native_index) => native_index
1233            .indexes
1234            .iter()
1235            .map(|x| x.null_count.map(|x| x as u64))
1236            .collect::<Vec<_>>(),
1237        Index::FIXED_LEN_BYTE_ARRAY(native_index) => native_index
1238            .indexes
1239            .iter()
1240            .map(|x| x.null_count.map(|x| x as u64))
1241            .collect::<Vec<_>>(),
1242        Index::BYTE_ARRAY(native_index) => native_index
1243            .indexes
1244            .iter()
1245            .map(|x| x.null_count.map(|x| x as u64))
1246            .collect::<Vec<_>>(),
1247        _ => unimplemented!(),
1248    });
1249
1250    Ok(UInt64Array::from_iter(iter))
1251}
1252
/// Extracts Parquet statistics as Arrow arrays
///
/// This is used to convert Parquet statistics to Arrow [`ArrayRef`], with
/// proper type conversions. This information can be used for pruning Parquet
/// files, row groups, and data pages based on the statistics embedded in
/// Parquet metadata.
///
/// # Schemas
///
/// The converter uses the schema of the Parquet file and the Arrow schema to
/// convert the underlying statistics value (stored as a parquet value) into the
/// corresponding Arrow value. For example, Decimals are stored as binary in
/// parquet files and this structure handles mapping them to the `i128`
/// representation used in Arrow.
///
/// Note: The Parquet schema and Arrow schema do not have to be identical (for
/// example, the columns may be in different orders and one or the other schemas
/// may have additional columns). The function [`parquet_column`] is used to
/// match the column in the Parquet schema to the column in the Arrow schema.
#[derive(Debug)]
pub struct StatisticsConverter<'a> {
    /// The index of the matched column in the Parquet schema, or `None` if the
    /// column has no match in the Parquet schema
    parquet_column_index: Option<usize>,
    /// The field (with data type) of the column in the Arrow schema
    arrow_field: &'a Field,
    /// Treat missing null_counts as 0 nulls (`true` when created by `try_new`)
    missing_null_counts_as_zero: bool,
    /// The physical type of the matched column in the Parquet schema, or `None`
    /// if the column has no match in the Parquet schema
    physical_type: Option<PhysicalType>,
}
1283
1284impl<'a> StatisticsConverter<'a> {
    /// Return the index of the column in the Parquet schema, if any
    ///
    /// Returns `None` if the column was present in the Arrow schema, but not
    /// present in the parquet file
    pub fn parquet_column_index(&self) -> Option<usize> {
        self.parquet_column_index
    }
1292
    /// Return the [`Field`] of the column in the Arrow schema
    pub fn arrow_field(&self) -> &'a Field {
        self.arrow_field
    }
1297
1298    /// Set the statistics converter to treat missing null counts as missing
1299    ///
1300    /// By default, the converter will treat missing null counts as though
1301    /// the null count is known to be `0`.
1302    ///
1303    /// Note that parquet files written by parquet-rs currently do not store
1304    /// null counts even when it is known there are zero nulls, and the reader
1305    /// will return 0 for the null counts in that instance. This behavior may
1306    /// change in a future release.
1307    ///
1308    /// Both parquet-java and parquet-cpp store null counts as 0 when there are
1309    /// no nulls, and don't write unknown values to the null count field.
1310    pub fn with_missing_null_counts_as_zero(mut self, missing_null_counts_as_zero: bool) -> Self {
1311        self.missing_null_counts_as_zero = missing_null_counts_as_zero;
1312        self
1313    }
1314
1315    /// Returns a [`UInt64Array`] with row counts for each row group
1316    ///
1317    /// # Return Value
1318    ///
1319    /// The returned array has no nulls, and has one value for each row group.
1320    /// Each value is the number of rows in the row group.
1321    ///
1322    /// # Example
1323    /// ```no_run
1324    /// # use arrow::datatypes::Schema;
1325    /// # use arrow_array::{ArrayRef, UInt64Array};
1326    /// # use parquet::arrow::arrow_reader::statistics::StatisticsConverter;
1327    /// # use parquet::file::metadata::ParquetMetaData;
1328    /// # fn get_parquet_metadata() -> ParquetMetaData { unimplemented!() }
1329    /// # fn get_arrow_schema() -> Schema { unimplemented!() }
1330    /// // Given the metadata for a parquet file and the arrow schema
1331    /// let metadata: ParquetMetaData = get_parquet_metadata();
1332    /// let arrow_schema: Schema = get_arrow_schema();
1333    /// let parquet_schema = metadata.file_metadata().schema_descr();
1334    /// // create a converter
1335    /// let converter = StatisticsConverter::try_new("foo", &arrow_schema, parquet_schema)
1336    ///   .unwrap();
1337    /// // get the row counts for each row group
1338    /// let row_counts = converter.row_group_row_counts(metadata
1339    ///   .row_groups()
1340    ///   .iter()
1341    /// ).unwrap();
1342    /// // file had 2 row groups, with 1024 and 23 rows respectively
1343    /// assert_eq!(row_counts, Some(UInt64Array::from(vec![1024, 23])));
1344    /// ```
1345    pub fn row_group_row_counts<I>(&self, metadatas: I) -> Result<Option<UInt64Array>>
1346    where
1347        I: IntoIterator<Item = &'a RowGroupMetaData>,
1348    {
1349        let Some(_) = self.parquet_column_index else {
1350            return Ok(None);
1351        };
1352
1353        let mut builder = UInt64Array::builder(10);
1354        for metadata in metadatas.into_iter() {
1355            let row_count = metadata.num_rows();
1356            let row_count: u64 = row_count.try_into().map_err(|e| {
1357                arrow_err!(format!(
1358                    "Parquet row count {row_count} too large to convert to u64: {e}"
1359                ))
1360            })?;
1361            builder.append_value(row_count);
1362        }
1363        Ok(Some(builder.finish()))
1364    }
1365
1366    /// Create a new `StatisticsConverter` to extract statistics for a column
1367    ///
1368    /// Note if there is no corresponding column in the parquet file, the returned
1369    /// arrays will be null. This can happen if the column is in the arrow
1370    /// schema but not in the parquet schema due to schema evolution.
1371    ///
1372    /// See example on [`Self::row_group_mins`] for usage
1373    ///
1374    /// # Errors
1375    ///
1376    /// * If the column is not found in the arrow schema
1377    pub fn try_new<'b>(
1378        column_name: &'b str,
1379        arrow_schema: &'a Schema,
1380        parquet_schema: &'a SchemaDescriptor,
1381    ) -> Result<Self> {
1382        // ensure the requested column is in the arrow schema
1383        let Some((_idx, arrow_field)) = arrow_schema.column_with_name(column_name) else {
1384            return Err(arrow_err!(format!(
1385                "Column '{}' not found in schema for statistics conversion",
1386                column_name
1387            )));
1388        };
1389
1390        // find the column in the parquet schema, if not, return a null array
1391        let parquet_index = match parquet_column(parquet_schema, arrow_schema, column_name) {
1392            Some((parquet_idx, matched_field)) => {
1393                // sanity check that matching field matches the arrow field
1394                if matched_field.as_ref() != arrow_field {
1395                    return Err(arrow_err!(format!(
1396                        "Matched column '{:?}' does not match original matched column '{:?}'",
1397                        matched_field, arrow_field
1398                    )));
1399                }
1400                Some(parquet_idx)
1401            }
1402            None => None,
1403        };
1404
1405        Ok(Self {
1406            parquet_column_index: parquet_index,
1407            arrow_field,
1408            missing_null_counts_as_zero: true,
1409            physical_type: parquet_index.map(|idx| parquet_schema.column(idx).physical_type()),
1410        })
1411    }
1412
1413    /// Extract the minimum values from row group statistics in [`RowGroupMetaData`]
1414    ///
1415    /// # Return Value
1416    ///
1417    /// The returned array contains 1 value for each row group, in the same order as `metadatas`
1418    ///
1419    /// Each value is either
1420    /// * the minimum value for the column
1421    /// * a null value, if the statistics can not be extracted
1422    ///
1423    /// Note that a null value does NOT mean the min value was actually
1424    /// `null` it means it the requested statistic is unknown
1425    ///
1426    /// # Errors
1427    ///
1428    /// Reasons for not being able to extract the statistics include:
1429    /// * the column is not present in the parquet file
1430    /// * statistics for the column are not present in the row group
1431    /// * the stored statistic value can not be converted to the requested type
1432    ///
1433    /// # Example
1434    /// ```no_run
1435    /// # use std::sync::Arc;
1436    /// # use arrow::datatypes::Schema;
1437    /// # use arrow_array::{ArrayRef, Float64Array};
1438    /// # use parquet::arrow::arrow_reader::statistics::StatisticsConverter;
1439    /// # use parquet::file::metadata::ParquetMetaData;
1440    /// # fn get_parquet_metadata() -> ParquetMetaData { unimplemented!() }
1441    /// # fn get_arrow_schema() -> Schema { unimplemented!() }
1442    /// // Given the metadata for a parquet file and the arrow schema
1443    /// let metadata: ParquetMetaData = get_parquet_metadata();
1444    /// let arrow_schema: Schema = get_arrow_schema();
1445    /// let parquet_schema = metadata.file_metadata().schema_descr();
1446    /// // create a converter
1447    /// let converter = StatisticsConverter::try_new("foo", &arrow_schema, parquet_schema)
1448    ///   .unwrap();
1449    /// // get the minimum value for the column "foo" in the parquet file
1450    /// let min_values: ArrayRef = converter
1451    ///   .row_group_mins(metadata.row_groups().iter())
1452    ///   .unwrap();
1453    /// // if "foo" is a Float64 value, the returned array will contain Float64 values
1454    /// assert_eq!(min_values, Arc::new(Float64Array::from(vec![Some(1.0), Some(2.0)])) as _);
1455    /// ```
1456    pub fn row_group_mins<I>(&self, metadatas: I) -> Result<ArrayRef>
1457    where
1458        I: IntoIterator<Item = &'a RowGroupMetaData>,
1459    {
1460        let data_type = self.arrow_field.data_type();
1461
1462        let Some(parquet_index) = self.parquet_column_index else {
1463            return Ok(self.make_null_array(data_type, metadatas));
1464        };
1465
1466        let iter = metadatas
1467            .into_iter()
1468            .map(|x| x.column(parquet_index).statistics());
1469        min_statistics(data_type, iter, self.physical_type)
1470    }
1471
1472    /// Extract the maximum values from row group statistics in [`RowGroupMetaData`]
1473    ///
1474    /// See docs on [`Self::row_group_mins`] for details
1475    pub fn row_group_maxes<I>(&self, metadatas: I) -> Result<ArrayRef>
1476    where
1477        I: IntoIterator<Item = &'a RowGroupMetaData>,
1478    {
1479        let data_type = self.arrow_field.data_type();
1480
1481        let Some(parquet_index) = self.parquet_column_index else {
1482            return Ok(self.make_null_array(data_type, metadatas));
1483        };
1484
1485        let iter = metadatas
1486            .into_iter()
1487            .map(|x| x.column(parquet_index).statistics());
1488        max_statistics(data_type, iter, self.physical_type)
1489    }
1490
1491    /// Extract the `is_max_value_exact` flags from row group statistics in [`RowGroupMetaData`]
1492    ///
1493    /// See docs on [`Self::row_group_maxes`] for details
1494    pub fn row_group_is_max_value_exact<I>(&self, metadatas: I) -> Result<BooleanArray>
1495    where
1496        I: IntoIterator<Item = &'a RowGroupMetaData>,
1497    {
1498        let Some(parquet_index) = self.parquet_column_index else {
1499            let num_row_groups = metadatas.into_iter().count();
1500            return Ok(BooleanArray::from_iter(
1501                std::iter::repeat(None).take(num_row_groups),
1502            ));
1503        };
1504
1505        let is_max_value_exact = metadatas
1506            .into_iter()
1507            .map(|x| x.column(parquet_index).statistics())
1508            .map(|s| s.map(|s| s.max_is_exact()));
1509        Ok(BooleanArray::from_iter(is_max_value_exact))
1510    }
1511
1512    /// Extract the `is_min_value_exact` flags from row group statistics in [`RowGroupMetaData`]
1513    ///
1514    /// See docs on [`Self::row_group_mins`] for details
1515    pub fn row_group_is_min_value_exact<I>(&self, metadatas: I) -> Result<BooleanArray>
1516    where
1517        I: IntoIterator<Item = &'a RowGroupMetaData>,
1518    {
1519        let Some(parquet_index) = self.parquet_column_index else {
1520            let num_row_groups = metadatas.into_iter().count();
1521            return Ok(BooleanArray::from_iter(
1522                std::iter::repeat(None).take(num_row_groups),
1523            ));
1524        };
1525
1526        let is_min_value_exact = metadatas
1527            .into_iter()
1528            .map(|x| x.column(parquet_index).statistics())
1529            .map(|s| s.map(|s| s.min_is_exact()));
1530        Ok(BooleanArray::from_iter(is_min_value_exact))
1531    }
1532
1533    /// Extract the null counts from row group statistics in [`RowGroupMetaData`]
1534    ///
1535    /// See docs on [`Self::row_group_mins`] for details
1536    pub fn row_group_null_counts<I>(&self, metadatas: I) -> Result<UInt64Array>
1537    where
1538        I: IntoIterator<Item = &'a RowGroupMetaData>,
1539    {
1540        let Some(parquet_index) = self.parquet_column_index else {
1541            let num_row_groups = metadatas.into_iter().count();
1542            return Ok(UInt64Array::from_iter(
1543                std::iter::repeat(None).take(num_row_groups),
1544            ));
1545        };
1546
1547        let null_counts = metadatas
1548            .into_iter()
1549            .map(|x| x.column(parquet_index).statistics())
1550            .map(|s| {
1551                s.and_then(|s| {
1552                    if self.missing_null_counts_as_zero {
1553                        Some(s.null_count_opt().unwrap_or(0))
1554                    } else {
1555                        s.null_count_opt()
1556                    }
1557                })
1558            });
1559        Ok(UInt64Array::from_iter(null_counts))
1560    }
1561
1562    /// Extract the minimum values from Data Page statistics.
1563    ///
1564    /// In Parquet files, in addition to the Column Chunk level statistics
1565    /// (stored for each column for each row group) there are also
1566    /// optional statistics stored for each data page, as part of
1567    /// the [`ParquetColumnIndex`].
1568    ///
1569    /// Since a single Column Chunk is stored as one or more pages,
1570    /// page level statistics can prune at a finer granularity.
1571    ///
1572    /// However since they are stored in a separate metadata
1573    /// structure ([`Index`]) there is different code to extract them as
1574    /// compared to arrow statistics.
1575    ///
1576    /// # Parameters:
1577    ///
1578    /// * `column_page_index`: The parquet column page indices, read from
1579    ///   `ParquetMetaData` column_index
1580    ///
1581    /// * `column_offset_index`: The parquet column offset indices, read from
1582    ///   `ParquetMetaData` offset_index
1583    ///
1584    /// * `row_group_indices`: The indices of the row groups, that are used to
1585    ///   extract the column page index and offset index on a per row group
1586    ///   per column basis.
1587    ///
1588    /// # Return Value
1589    ///
1590    /// The returned array contains 1 value for each `NativeIndex`
1591    /// in the underlying `Index`es, in the same order as they appear
1592    /// in `metadatas`.
1593    ///
1594    /// For example, if there are two `Index`es in `metadatas`:
1595    /// 1. the first having `3` `PageIndex` entries
1596    /// 2. the second having `2` `PageIndex` entries
1597    ///
1598    /// The returned array would have 5 rows.
1599    ///
1600    /// Each value is either:
1601    /// * the minimum value for the page
1602    /// * a null value, if the statistics can not be extracted
1603    ///
1604    /// Note that a null value does NOT mean the min value was actually
1605    /// `null` it means it the requested statistic is unknown
1606    ///
1607    /// # Errors
1608    ///
1609    /// Reasons for not being able to extract the statistics include:
1610    /// * the column is not present in the parquet file
1611    /// * statistics for the pages are not present in the row group
1612    /// * the stored statistic value can not be converted to the requested type
1613    pub fn data_page_mins<I>(
1614        &self,
1615        column_page_index: &ParquetColumnIndex,
1616        column_offset_index: &ParquetOffsetIndex,
1617        row_group_indices: I,
1618    ) -> Result<ArrayRef>
1619    where
1620        I: IntoIterator<Item = &'a usize>,
1621    {
1622        let data_type = self.arrow_field.data_type();
1623
1624        let Some(parquet_index) = self.parquet_column_index else {
1625            return Ok(self.make_null_array(data_type, row_group_indices));
1626        };
1627
1628        let iter = row_group_indices.into_iter().map(|rg_index| {
1629            let column_page_index_per_row_group_per_column =
1630                &column_page_index[*rg_index][parquet_index];
1631            let num_data_pages = &column_offset_index[*rg_index][parquet_index]
1632                .page_locations()
1633                .len();
1634
1635            (*num_data_pages, column_page_index_per_row_group_per_column)
1636        });
1637
1638        min_page_statistics(data_type, iter, self.physical_type)
1639    }
1640
1641    /// Extract the maximum values from Data Page statistics.
1642    ///
1643    /// See docs on [`Self::data_page_mins`] for details.
1644    pub fn data_page_maxes<I>(
1645        &self,
1646        column_page_index: &ParquetColumnIndex,
1647        column_offset_index: &ParquetOffsetIndex,
1648        row_group_indices: I,
1649    ) -> Result<ArrayRef>
1650    where
1651        I: IntoIterator<Item = &'a usize>,
1652    {
1653        let data_type = self.arrow_field.data_type();
1654
1655        let Some(parquet_index) = self.parquet_column_index else {
1656            return Ok(self.make_null_array(data_type, row_group_indices));
1657        };
1658
1659        let iter = row_group_indices.into_iter().map(|rg_index| {
1660            let column_page_index_per_row_group_per_column =
1661                &column_page_index[*rg_index][parquet_index];
1662            let num_data_pages = &column_offset_index[*rg_index][parquet_index]
1663                .page_locations()
1664                .len();
1665
1666            (*num_data_pages, column_page_index_per_row_group_per_column)
1667        });
1668
1669        max_page_statistics(data_type, iter, self.physical_type)
1670    }
1671
1672    /// Returns a [`UInt64Array`] with null counts for each data page.
1673    ///
1674    /// See docs on [`Self::data_page_mins`] for details.
1675    pub fn data_page_null_counts<I>(
1676        &self,
1677        column_page_index: &ParquetColumnIndex,
1678        column_offset_index: &ParquetOffsetIndex,
1679        row_group_indices: I,
1680    ) -> Result<UInt64Array>
1681    where
1682        I: IntoIterator<Item = &'a usize>,
1683    {
1684        let Some(parquet_index) = self.parquet_column_index else {
1685            let num_row_groups = row_group_indices.into_iter().count();
1686            return Ok(UInt64Array::from_iter(
1687                std::iter::repeat(None).take(num_row_groups),
1688            ));
1689        };
1690
1691        let iter = row_group_indices.into_iter().map(|rg_index| {
1692            let column_page_index_per_row_group_per_column =
1693                &column_page_index[*rg_index][parquet_index];
1694            let num_data_pages = &column_offset_index[*rg_index][parquet_index]
1695                .page_locations()
1696                .len();
1697
1698            (*num_data_pages, column_page_index_per_row_group_per_column)
1699        });
1700        null_counts_page_statistics(iter)
1701    }
1702
1703    /// Returns a [`UInt64Array`] with row counts for each data page.
1704    ///
1705    /// This function iterates over the given row group indexes and computes
1706    /// the row count for each page in the specified column.
1707    ///
1708    /// # Parameters:
1709    ///
1710    /// * `column_offset_index`: The parquet column offset indices, read from
1711    ///   `ParquetMetaData` offset_index
1712    ///
1713    /// * `row_group_metadatas`: The metadata slice of the row groups, read
1714    ///   from `ParquetMetaData` row_groups
1715    ///
1716    /// * `row_group_indices`: The indices of the row groups, that are used to
1717    ///   extract the column offset index on a per row group per column basis.
1718    ///
1719    /// See docs on [`Self::data_page_mins`] for details.
1720    pub fn data_page_row_counts<I>(
1721        &self,
1722        column_offset_index: &ParquetOffsetIndex,
1723        row_group_metadatas: &'a [RowGroupMetaData],
1724        row_group_indices: I,
1725    ) -> Result<Option<UInt64Array>>
1726    where
1727        I: IntoIterator<Item = &'a usize>,
1728    {
1729        let Some(parquet_index) = self.parquet_column_index else {
1730            // no matching column found in parquet_index;
1731            // thus we cannot extract page_locations in order to determine
1732            // the row count on a per DataPage basis.
1733            return Ok(None);
1734        };
1735
1736        let mut row_count_total = Vec::new();
1737        for rg_idx in row_group_indices {
1738            let page_locations = &column_offset_index[*rg_idx][parquet_index].page_locations();
1739
1740            let row_count_per_page = page_locations
1741                .windows(2)
1742                .map(|loc| Some(loc[1].first_row_index as u64 - loc[0].first_row_index as u64));
1743
1744            // append the last page row count
1745            let num_rows_in_row_group = &row_group_metadatas[*rg_idx].num_rows();
1746            let row_count_per_page = row_count_per_page
1747                .chain(std::iter::once(Some(
1748                    *num_rows_in_row_group as u64
1749                        - page_locations.last().unwrap().first_row_index as u64,
1750                )))
1751                .collect::<Vec<_>>();
1752
1753            row_count_total.extend(row_count_per_page);
1754        }
1755
1756        Ok(Some(UInt64Array::from_iter(row_count_total)))
1757    }
1758
1759    /// Returns a null array of data_type with one element per row group
1760    fn make_null_array<I, A>(&self, data_type: &DataType, metadatas: I) -> ArrayRef
1761    where
1762        I: IntoIterator<Item = A>,
1763    {
1764        // column was in the arrow schema but not in the parquet schema, so return a null array
1765        let num_row_groups = metadatas.into_iter().count();
1766        new_null_array(data_type, num_row_groups)
1767    }
1768}
1769
1770// See tests in parquet/tests/arrow_reader/statistics.rs