parquet/arrow/arrow_reader/
statistics.rs

1// Licensed to the Apache Software Foundation (ASF) under one
2// or more contributor license agreements.  See the NOTICE file
3// distributed with this work for additional information
4// regarding copyright ownership.  The ASF licenses this file
5// to you under the Apache License, Version 2.0 (the
6// "License"); you may not use this file except in compliance
7// with the License.  You may obtain a copy of the License at
8//
9//   http://www.apache.org/licenses/LICENSE-2.0
10//
11// Unless required by applicable law or agreed to in writing,
12// software distributed under the License is distributed on an
13// "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
14// KIND, either express or implied.  See the License for the
15// specific language governing permissions and limitations
16// under the License.
17
18//! [`StatisticsConverter`] to convert statistics in parquet format to arrow [`ArrayRef`].
19
// Note: all the corresponding tests live in
// `arrow-rs/parquet/tests/arrow_reader/statistics.rs`.
22use crate::arrow::buffer::bit_util::sign_extend_be;
23use crate::arrow::parquet_column;
24use crate::basic::Type as PhysicalType;
25use crate::data_type::{ByteArray, FixedLenByteArray};
26use crate::errors::{ParquetError, Result};
27use crate::file::metadata::{ParquetColumnIndex, ParquetOffsetIndex, RowGroupMetaData};
28use crate::file::page_index::column_index::{ColumnIndexIterators, ColumnIndexMetaData};
29use crate::file::statistics::Statistics as ParquetStatistics;
30use crate::schema::types::SchemaDescriptor;
31use arrow_array::builder::{
32    BinaryViewBuilder, BooleanBuilder, FixedSizeBinaryBuilder, LargeStringBuilder, StringBuilder,
33    StringViewBuilder,
34};
35use arrow_array::{
36    ArrayRef, BinaryArray, BooleanArray, Date32Array, Date64Array, Decimal32Array, Decimal64Array,
37    Decimal128Array, Decimal256Array, Float16Array, Float32Array, Float64Array, Int8Array,
38    Int16Array, Int32Array, Int64Array, LargeBinaryArray, Time32MillisecondArray,
39    Time32SecondArray, Time64MicrosecondArray, Time64NanosecondArray, TimestampMicrosecondArray,
40    TimestampMillisecondArray, TimestampNanosecondArray, TimestampSecondArray, UInt8Array,
41    UInt16Array, UInt32Array, UInt64Array, new_empty_array, new_null_array,
42};
43use arrow_buffer::i256;
44use arrow_schema::{DataType, Field, Schema, TimeUnit};
45use half::f16;
46use paste::paste;
47use std::sync::Arc;
48
49// Convert the bytes array to i32.
50// The endian of the input bytes array must be big-endian.
51pub(crate) fn from_bytes_to_i32(b: &[u8]) -> i32 {
52    // The bytes array are from parquet file and must be the big-endian.
53    // The endian is defined by parquet format, and the reference document
54    // https://github.com/apache/parquet-format/blob/54e53e5d7794d383529dd30746378f19a12afd58/src/main/thrift/parquet.thrift#L66
55    i32::from_be_bytes(sign_extend_be::<4>(b))
56}
57
58// Convert the bytes array to i64.
59// The endian of the input bytes array must be big-endian.
60pub(crate) fn from_bytes_to_i64(b: &[u8]) -> i64 {
61    i64::from_be_bytes(sign_extend_be::<8>(b))
62}
63
64// Convert the bytes array to i128.
65// The endian of the input bytes array must be big-endian.
66pub(crate) fn from_bytes_to_i128(b: &[u8]) -> i128 {
67    i128::from_be_bytes(sign_extend_be::<16>(b))
68}
69
70// Convert the bytes array to i256.
71// The endian of the input bytes array must be big-endian.
72pub(crate) fn from_bytes_to_i256(b: &[u8]) -> i256 {
73    i256::from_be_bytes(sign_extend_be::<32>(b))
74}
75
76// Convert the bytes array to f16
77pub(crate) fn from_bytes_to_f16(b: &[u8]) -> Option<f16> {
78    match b {
79        [low, high] => Some(f16::from_be_bytes([*high, *low])),
80        _ => None,
81    }
82}
83
/// Define an adapter iterator for extracting statistics from an iterator of
/// `ParquetStatistics`.
///
/// Handles checking if the statistics are present and of the expected variant.
///
/// Parameters:
/// * `$iterator_type` is the name of the iterator type (e.g. `MinBooleanStatsIterator`)
/// * `$func` is the accessor called on the typed statistics (e.g. `min_opt` or `max_opt`)
/// * `$parquet_statistics_type` is the `ParquetStatistics` variant to match
///   (e.g. `ParquetStatistics::Boolean`)
/// * `$stat_value_type` is the type of the statistics value (e.g. `bool`)
macro_rules! make_stats_iterator {
    ($iterator_type:ident, $func:ident, $parquet_statistics_type:path, $stat_value_type:ty) => {
        /// Maps an iterator of `Option<&ParquetStatistics>` into an iterator of
        /// optional references to the statistics value.
        ///
        /// Yielded elements:
        /// * `Some(value)` if the statistics are present, of the expected variant,
        ///   and the accessor returns a value
        /// * `None` if the statistics are absent, of another variant, or the
        ///   accessor returns `None`
        struct $iterator_type<'a, I>
        where
            I: Iterator<Item = Option<&'a ParquetStatistics>>,
        {
            iter: I,
        }

        impl<'a, I> $iterator_type<'a, I>
        where
            I: Iterator<Item = Option<&'a ParquetStatistics>>,
        {
            /// Create a new iterator to extract the statistics
            fn new(iter: I) -> Self {
                Self { iter }
            }
        }

        /// Implement the Iterator trait for the iterator
        impl<'a, I> Iterator for $iterator_type<'a, I>
        where
            I: Iterator<Item = Option<&'a ParquetStatistics>>,
        {
            type Item = Option<&'a $stat_value_type>;

            /// Return the next statistics value.
            ///
            /// Iteration ends when the inner iterator ends; a missing statistics
            /// entry is passed through as `Some(None)` so the output keeps one
            /// element per input element.
            fn next(&mut self) -> Option<Self::Item> {
                self.iter.next().map(|maybe_stats| {
                    maybe_stats.and_then(|stats| match stats {
                        $parquet_statistics_type(s) => s.$func(),
                        _ => None,
                    })
                })
            }

            fn size_hint(&self) -> (usize, Option<usize>) {
                self.iter.size_hint()
            }
        }
    };
}
144
145make_stats_iterator!(
146    MinBooleanStatsIterator,
147    min_opt,
148    ParquetStatistics::Boolean,
149    bool
150);
151make_stats_iterator!(
152    MaxBooleanStatsIterator,
153    max_opt,
154    ParquetStatistics::Boolean,
155    bool
156);
157make_stats_iterator!(
158    MinInt32StatsIterator,
159    min_opt,
160    ParquetStatistics::Int32,
161    i32
162);
163make_stats_iterator!(
164    MaxInt32StatsIterator,
165    max_opt,
166    ParquetStatistics::Int32,
167    i32
168);
169make_stats_iterator!(
170    MinInt64StatsIterator,
171    min_opt,
172    ParquetStatistics::Int64,
173    i64
174);
175make_stats_iterator!(
176    MaxInt64StatsIterator,
177    max_opt,
178    ParquetStatistics::Int64,
179    i64
180);
181make_stats_iterator!(
182    MinFloatStatsIterator,
183    min_opt,
184    ParquetStatistics::Float,
185    f32
186);
187make_stats_iterator!(
188    MaxFloatStatsIterator,
189    max_opt,
190    ParquetStatistics::Float,
191    f32
192);
193make_stats_iterator!(
194    MinDoubleStatsIterator,
195    min_opt,
196    ParquetStatistics::Double,
197    f64
198);
199make_stats_iterator!(
200    MaxDoubleStatsIterator,
201    max_opt,
202    ParquetStatistics::Double,
203    f64
204);
205make_stats_iterator!(
206    MinByteArrayStatsIterator,
207    min_bytes_opt,
208    ParquetStatistics::ByteArray,
209    [u8]
210);
211make_stats_iterator!(
212    MaxByteArrayStatsIterator,
213    max_bytes_opt,
214    ParquetStatistics::ByteArray,
215    [u8]
216);
217make_stats_iterator!(
218    MinFixedLenByteArrayStatsIterator,
219    min_bytes_opt,
220    ParquetStatistics::FixedLenByteArray,
221    [u8]
222);
223make_stats_iterator!(
224    MaxFixedLenByteArrayStatsIterator,
225    max_bytes_opt,
226    ParquetStatistics::FixedLenByteArray,
227    [u8]
228);
229
/// Special iterator adapter for extracting decimal values from an iterator of
/// `ParquetStatistics`.
///
/// Handles checking if the statistics are present and of a supported variant.
///
/// Depending on the parquet file, decimal statistics can be stored as
/// `Int32`, `Int64`, `ByteArray` or `FixedLenByteArray`. This iterator handles
/// all cases, converting the value to `$stat_value_type`:
/// * `Int32` values are converted with `From`
/// * `Int64` values are converted with `TryFrom`; values that do not fit become `None`
/// * `ByteArray` / `FixedLenByteArray` values are decoded with `$convert_func`
/// * any other variant yields `None`
///
/// Parameters:
/// * `$iterator_type` is the name of the iterator type (e.g. `MinDecimal128StatsIterator`)
/// * `$func` is the accessor for numeric statistics (e.g. `min_opt` or `max_opt`)
/// * `$bytes_func` is the accessor returning the value as bytes
///   (e.g. `min_bytes_opt` or `max_bytes_opt`)
/// * `$stat_value_type` is the type of the statistics value (e.g. `i128`)
/// * `$convert_func` is the function converting the bytes to the statistics value
///   (e.g. `from_bytes_to_i128`)
macro_rules! make_decimal_stats_iterator {
    ($iterator_type:ident, $func:ident, $bytes_func:ident, $stat_value_type:ident, $convert_func: ident) => {
        /// Maps an iterator of `Option<&ParquetStatistics>` into an iterator of
        /// optional decimal values (see the macro documentation for the
        /// per-variant conversion rules).
        struct $iterator_type<'a, I>
        where
            I: Iterator<Item = Option<&'a ParquetStatistics>>,
        {
            iter: I,
        }

        impl<'a, I> $iterator_type<'a, I>
        where
            I: Iterator<Item = Option<&'a ParquetStatistics>>,
        {
            /// Create a new iterator to extract the statistics
            fn new(iter: I) -> Self {
                Self { iter }
            }
        }

        impl<'a, I> Iterator for $iterator_type<'a, I>
        where
            I: Iterator<Item = Option<&'a ParquetStatistics>>,
        {
            type Item = Option<$stat_value_type>;

            fn next(&mut self) -> Option<Self::Item> {
                self.iter.next().map(|maybe_stats| {
                    maybe_stats.and_then(|stats| match stats {
                        ParquetStatistics::Int32(s) => {
                            s.$func().map(|x| $stat_value_type::from(*x))
                        }
                        // Narrowing may fail (e.g. INT64 -> i32); treat that as
                        // "no statistics" rather than an error.
                        ParquetStatistics::Int64(s) => s
                            .$func()
                            .and_then(|x| $stat_value_type::try_from(*x).ok()),
                        ParquetStatistics::ByteArray(s) => s.$bytes_func().map($convert_func),
                        ParquetStatistics::FixedLenByteArray(s) => {
                            s.$bytes_func().map($convert_func)
                        }
                        _ => None,
                    })
                })
            }

            fn size_hint(&self) -> (usize, Option<usize>) {
                self.iter.size_hint()
            }
        }
    };
}
297
298make_decimal_stats_iterator!(
299    MinDecimal32StatsIterator,
300    min_opt,
301    min_bytes_opt,
302    i32,
303    from_bytes_to_i32
304);
305make_decimal_stats_iterator!(
306    MaxDecimal32StatsIterator,
307    max_opt,
308    max_bytes_opt,
309    i32,
310    from_bytes_to_i32
311);
312make_decimal_stats_iterator!(
313    MinDecimal64StatsIterator,
314    min_opt,
315    min_bytes_opt,
316    i64,
317    from_bytes_to_i64
318);
319make_decimal_stats_iterator!(
320    MaxDecimal64StatsIterator,
321    max_opt,
322    max_bytes_opt,
323    i64,
324    from_bytes_to_i64
325);
326make_decimal_stats_iterator!(
327    MinDecimal128StatsIterator,
328    min_opt,
329    min_bytes_opt,
330    i128,
331    from_bytes_to_i128
332);
333make_decimal_stats_iterator!(
334    MaxDecimal128StatsIterator,
335    max_opt,
336    max_bytes_opt,
337    i128,
338    from_bytes_to_i128
339);
340make_decimal_stats_iterator!(
341    MinDecimal256StatsIterator,
342    min_opt,
343    min_bytes_opt,
344    i256,
345    from_bytes_to_i256
346);
347make_decimal_stats_iterator!(
348    MaxDecimal256StatsIterator,
349    max_opt,
350    max_bytes_opt,
351    i256,
352    from_bytes_to_i256
353);
354
/// Special macro to combine the statistics iterators for min and max using the [`mod@paste`] macro.
/// This is used to avoid repeating the same code for min and max statistics extractions.
///
/// Parameters:
/// * `$stat_type_prefix`: The prefix of the statistics iterator type (e.g. `Min` or `Max`).
///   It is pasted in front of iterator names such as `Int32StatsIterator`, and its lowercase
///   form selects `min_statistics` / `max_statistics` for the `Dictionary` recursion.
/// * `$data_type`: The arrow data type of the statistics (e.g. `DataType::Int32`), held by
///   reference (it is passed to `new_null_array` and its fields are dereferenced).
/// * `$iterator`: The iterator of [`ParquetStatistics`] to extract the statistics from.
/// * `$physical_type`: The parquet physical type of the column, compared against
///   `Some(PhysicalType::INT32)` / `Some(PhysicalType::INT64)` to decide how `Date64`
///   statistics are decoded.
///
/// Evaluates to `Ok` with an `Arc`-wrapped arrow array, or to an error propagated (via `?`)
/// from the decimal `with_precision_and_scale` calls. Types without a known extraction
/// produce an all-null array with one entry per statistics item.
macro_rules! get_statistics {
    ($stat_type_prefix: ident, $data_type: ident, $iterator: ident, $physical_type: ident) => {
        paste! {
        match $data_type {
            DataType::Boolean => Ok(Arc::new(BooleanArray::from_iter(
                [<$stat_type_prefix BooleanStatsIterator>]::new($iterator).map(|x| x.copied()),
            ))),
            // Narrow integer types are read from `Int32` statistics; values that do
            // not fit the target type become null.
            DataType::Int8 => Ok(Arc::new(Int8Array::from_iter(
                [<$stat_type_prefix Int32StatsIterator>]::new($iterator).map(|x| {
                    x.and_then(|x| i8::try_from(*x).ok())
                }),
            ))),
            DataType::Int16 => Ok(Arc::new(Int16Array::from_iter(
                [<$stat_type_prefix Int32StatsIterator>]::new($iterator).map(|x| {
                    x.and_then(|x| i16::try_from(*x).ok())
                }),
            ))),
            DataType::Int32 => Ok(Arc::new(Int32Array::from_iter(
                [<$stat_type_prefix Int32StatsIterator>]::new($iterator).map(|x| x.copied()),
            ))),
            DataType::Int64 => Ok(Arc::new(Int64Array::from_iter(
                [<$stat_type_prefix Int64StatsIterator>]::new($iterator).map(|x| x.copied()),
            ))),
            DataType::UInt8 => Ok(Arc::new(UInt8Array::from_iter(
                [<$stat_type_prefix Int32StatsIterator>]::new($iterator).map(|x| {
                    x.and_then(|x| u8::try_from(*x).ok())
                }),
            ))),
            DataType::UInt16 => Ok(Arc::new(UInt16Array::from_iter(
                [<$stat_type_prefix Int32StatsIterator>]::new($iterator).map(|x| {
                    x.and_then(|x| u16::try_from(*x).ok())
                }),
            ))),
            // `as` keeps the bit pattern of the signed statistic, so negative
            // signed values map to large unsigned values instead of being rejected.
            DataType::UInt32 => Ok(Arc::new(UInt32Array::from_iter(
                [<$stat_type_prefix Int32StatsIterator>]::new($iterator).map(|x| x.map(|x| *x as u32)),
            ))),
            DataType::UInt64 => Ok(Arc::new(UInt64Array::from_iter(
                [<$stat_type_prefix Int64StatsIterator>]::new($iterator).map(|x| x.map(|x| *x as u64)),
            ))),
            // Float16 statistics come from fixed-length byte arrays; values that are
            // not exactly two bytes long become null (see `from_bytes_to_f16`).
            DataType::Float16 => Ok(Arc::new(Float16Array::from_iter(
                [<$stat_type_prefix FixedLenByteArrayStatsIterator>]::new($iterator).map(|x| x.and_then(|x| {
                    from_bytes_to_f16(x)
                })),
            ))),
            DataType::Float32 => Ok(Arc::new(Float32Array::from_iter(
                [<$stat_type_prefix FloatStatsIterator>]::new($iterator).map(|x| x.copied()),
            ))),
            DataType::Float64 => Ok(Arc::new(Float64Array::from_iter(
                [<$stat_type_prefix DoubleStatsIterator>]::new($iterator).map(|x| x.copied()),
            ))),
            DataType::Date32 => Ok(Arc::new(Date32Array::from_iter(
                [<$stat_type_prefix Int32StatsIterator>]::new($iterator).map(|x| x.copied()),
            ))),
            // INT32-backed Date64: the stored value is scaled by the number of
            // milliseconds in a day (24 * 60 * 60 * 1000).
            DataType::Date64 if $physical_type == Some(PhysicalType::INT32) => Ok(Arc::new(Date64Array::from_iter(
                [<$stat_type_prefix Int32StatsIterator>]::new($iterator)
                    .map(|x| x.map(|x| i64::from(*x) * 24 * 60 * 60 * 1000))))),
            // INT64-backed Date64: the stored value is used as-is.
            DataType::Date64 if $physical_type == Some(PhysicalType::INT64) => Ok(Arc::new(Date64Array::from_iter(
                [<$stat_type_prefix Int64StatsIterator>]::new($iterator).map(|x| x.copied()),))),
            // Timestamps are read from `Int64` statistics; only the arrow array type
            // (and attached timezone) depends on the unit.
            DataType::Timestamp(unit, timezone) =>{
                let iter = [<$stat_type_prefix Int64StatsIterator>]::new($iterator).map(|x| x.copied());
                Ok(match unit {
                    TimeUnit::Second => Arc::new(TimestampSecondArray::from_iter(iter).with_timezone_opt(timezone.clone())),
                    TimeUnit::Millisecond => Arc::new(TimestampMillisecondArray::from_iter(iter).with_timezone_opt(timezone.clone())),
                    TimeUnit::Microsecond => Arc::new(TimestampMicrosecondArray::from_iter(iter).with_timezone_opt(timezone.clone())),
                    TimeUnit::Nanosecond => Arc::new(TimestampNanosecondArray::from_iter(iter).with_timezone_opt(timezone.clone())),
                })
            },
            DataType::Time32(unit) => {
                Ok(match unit {
                    TimeUnit::Second =>  Arc::new(Time32SecondArray::from_iter(
                        [<$stat_type_prefix Int32StatsIterator>]::new($iterator).map(|x| x.copied()),
                    )),
                    TimeUnit::Millisecond => Arc::new(Time32MillisecondArray::from_iter(
                        [<$stat_type_prefix Int32StatsIterator>]::new($iterator).map(|x| x.copied()),
                    )),
                    _ => {
                        let len = $iterator.count();
                        // don't know how to extract statistics, so return a null array
                        new_null_array($data_type, len)
                    }
                })
            },
            DataType::Time64(unit) => {
                Ok(match unit {
                    TimeUnit::Microsecond =>  Arc::new(Time64MicrosecondArray::from_iter(
                        [<$stat_type_prefix Int64StatsIterator>]::new($iterator).map(|x| x.copied()),
                    )),
                    TimeUnit::Nanosecond => Arc::new(Time64NanosecondArray::from_iter(
                        [<$stat_type_prefix Int64StatsIterator>]::new($iterator).map(|x| x.copied()),
                    )),
                    _ => {
                        let len = $iterator.count();
                        // don't know how to extract statistics, so return a null array
                        new_null_array($data_type, len)
                    }
                })
            },
            DataType::Binary => Ok(Arc::new(BinaryArray::from_iter(
                [<$stat_type_prefix ByteArrayStatsIterator>]::new($iterator)
            ))),
            DataType::LargeBinary => Ok(Arc::new(LargeBinaryArray::from_iter(
                [<$stat_type_prefix ByteArrayStatsIterator>]::new($iterator)
            ))),
            // String statistics that are not valid UTF-8 become null rather than an error.
            DataType::Utf8 => {
                let iterator = [<$stat_type_prefix ByteArrayStatsIterator>]::new($iterator);
                let mut builder = StringBuilder::new();
                for x in iterator {
                    let Some(x) = x else {
                        builder.append_null(); // no statistics value
                        continue;
                    };

                    let Ok(x) = std::str::from_utf8(x) else {
                        builder.append_null();
                        continue;
                    };

                    builder.append_value(x);
                }
                Ok(Arc::new(builder.finish()))
            },
            DataType::LargeUtf8 => {
                let iterator = [<$stat_type_prefix ByteArrayStatsIterator>]::new($iterator);
                let mut builder = LargeStringBuilder::new();
                for x in iterator {
                    let Some(x) = x else {
                        builder.append_null(); // no statistics value
                        continue;
                    };

                    let Ok(x) = std::str::from_utf8(x) else {
                        builder.append_null();
                        continue;
                    };

                    builder.append_value(x);
                }
                Ok(Arc::new(builder.finish()))
            },
            DataType::FixedSizeBinary(size) => {
                let iterator = [<$stat_type_prefix FixedLenByteArrayStatsIterator>]::new($iterator);
                let mut builder = FixedSizeBinaryBuilder::new(*size);
                for x in iterator {
                    let Some(x) = x else {
                        builder.append_null(); // no statistics value
                        continue;
                    };

                    // ignore invalid values
                    // (a length mismatch would make `append_value` fail, so the
                    // `expect` below only runs on correctly sized values)
                    if x.len().try_into() != Ok(*size){
                        builder.append_null();
                        continue;
                    }

                    builder.append_value(x).expect("ensure to append successfully here, because size have been checked before");
                }
                Ok(Arc::new(builder.finish()))
            },
            // Decimal arrays are built from the raw values and then tagged with the
            // requested precision/scale; any error from that call is propagated.
            DataType::Decimal32(precision, scale) => {
                let arr = Decimal32Array::from_iter(
                    [<$stat_type_prefix Decimal32StatsIterator>]::new($iterator)
                ).with_precision_and_scale(*precision, *scale)?;
                Ok(Arc::new(arr))
            },
            DataType::Decimal64(precision, scale) => {
                let arr = Decimal64Array::from_iter(
                    [<$stat_type_prefix Decimal64StatsIterator>]::new($iterator)
                ).with_precision_and_scale(*precision, *scale)?;
                Ok(Arc::new(arr))
            },
            DataType::Decimal128(precision, scale) => {
                let arr = Decimal128Array::from_iter(
                    [<$stat_type_prefix Decimal128StatsIterator>]::new($iterator)
                ).with_precision_and_scale(*precision, *scale)?;
                Ok(Arc::new(arr))
            },
            DataType::Decimal256(precision, scale) => {
                let arr = Decimal256Array::from_iter(
                    [<$stat_type_prefix Decimal256StatsIterator>]::new($iterator)
                ).with_precision_and_scale(*precision, *scale)?;
                Ok(Arc::new(arr))
            },
            // Dictionaries: extract statistics for the value type instead, by calling
            // `min_statistics` / `max_statistics` (prefix lowercased + `_statistics`).
            DataType::Dictionary(_, value_type) => {
                [<$stat_type_prefix:lower _ statistics>](value_type, $iterator, $physical_type)
            },
            DataType::Utf8View => {
                let iterator = [<$stat_type_prefix ByteArrayStatsIterator>]::new($iterator);
                let mut builder = StringViewBuilder::new();
                for x in iterator {
                    let Some(x) = x else {
                        builder.append_null(); // no statistics value
                        continue;
                    };

                    let Ok(x) = std::str::from_utf8(x) else {
                        builder.append_null();
                        continue;
                    };

                    builder.append_value(x);
                }
                Ok(Arc::new(builder.finish()))
            },
            DataType::BinaryView => {
                let iterator = [<$stat_type_prefix ByteArrayStatsIterator>]::new($iterator);
                let mut builder = BinaryViewBuilder::new();
                for x in iterator {
                    let Some(x) = x else {
                        builder.append_null(); // no statistics value
                        continue;
                    };

                    builder.append_value(x);
                }
                Ok(Arc::new(builder.finish()))
            }

            // Listed explicitly (no `_` arm) so that adding a `DataType` variant forces
            // a decision here at compile time.
            DataType::Map(_,_) |
            DataType::Duration(_) |
            DataType::Interval(_) |
            DataType::Date64 |  // required to cover $physical_type match guard
            DataType::Null |
            DataType::List(_) |
            DataType::ListView(_) |
            DataType::FixedSizeList(_, _) |
            DataType::LargeList(_) |
            DataType::LargeListView(_) |
            DataType::Struct(_) |
            DataType::Union(_, _) |
            DataType::RunEndEncoded(_, _) => {
                let len = $iterator.count();
                // don't know how to extract statistics, so return a null array
                Ok(new_null_array($data_type, len))
            }
        }}}
}
598
/// Defines an iterator adapter over `(len, &ColumnIndexMetaData)` pairs that
/// yields, for each column index, a `Vec` of optional per-page statistics values.
///
/// Parameters:
/// * `$iterator_type`: name of the generated struct
/// * `$func`: associated function on `$stat_value_type` that extracts the per-page
///   values (e.g. `min_values_iter`); presumably provided by the imported
///   `ColumnIndexIterators` trait
/// * `$stat_value_type`: the value type (e.g. `i32`, `ByteArray`)
macro_rules! make_data_page_stats_iterator {
    ($iterator_type: ident, $func: ident, $stat_value_type: ty) => {
        struct $iterator_type<'a, I>
        where
            I: Iterator<Item = (usize, &'a ColumnIndexMetaData)>,
        {
            iter: I,
        }

        impl<'a, I> $iterator_type<'a, I>
        where
            I: Iterator<Item = (usize, &'a ColumnIndexMetaData)>,
        {
            fn new(iter: I) -> Self {
                Self { iter }
            }
        }

        impl<'a, I> Iterator for $iterator_type<'a, I>
        where
            I: Iterator<Item = (usize, &'a ColumnIndexMetaData)>,
        {
            type Item = Vec<Option<$stat_value_type>>;

            fn next(&mut self) -> Option<Self::Item> {
                let (page_count, column_index) = self.iter.next()?;
                let values = if matches!(column_index, ColumnIndexMetaData::NONE) {
                    // No column index is available, so nothing can be extracted.
                    // Emit `page_count` nulls so the resulting arrow array still has
                    // one entry per page listed in the `ParquetOffsetIndex` for this
                    // row group and column.
                    vec![None; page_count]
                } else {
                    <$stat_value_type>::$func(column_index).collect::<Vec<_>>()
                };
                Some(values)
            }

            fn size_hint(&self) -> (usize, Option<usize>) {
                self.iter.size_hint()
            }
        }
    };
}
646
647make_data_page_stats_iterator!(MinBooleanDataPageStatsIterator, min_values_iter, bool);
648make_data_page_stats_iterator!(MaxBooleanDataPageStatsIterator, max_values_iter, bool);
649make_data_page_stats_iterator!(MinInt32DataPageStatsIterator, min_values_iter, i32);
650make_data_page_stats_iterator!(MaxInt32DataPageStatsIterator, max_values_iter, i32);
651make_data_page_stats_iterator!(MinInt64DataPageStatsIterator, min_values_iter, i64);
652make_data_page_stats_iterator!(MaxInt64DataPageStatsIterator, max_values_iter, i64);
653make_data_page_stats_iterator!(
654    MinFloat16DataPageStatsIterator,
655    min_values_iter,
656    FixedLenByteArray
657);
658make_data_page_stats_iterator!(
659    MaxFloat16DataPageStatsIterator,
660    max_values_iter,
661    FixedLenByteArray
662);
663make_data_page_stats_iterator!(MinFloat32DataPageStatsIterator, min_values_iter, f32);
664make_data_page_stats_iterator!(MaxFloat32DataPageStatsIterator, max_values_iter, f32);
665make_data_page_stats_iterator!(MinFloat64DataPageStatsIterator, min_values_iter, f64);
666make_data_page_stats_iterator!(MaxFloat64DataPageStatsIterator, max_values_iter, f64);
667make_data_page_stats_iterator!(
668    MinByteArrayDataPageStatsIterator,
669    min_values_iter,
670    ByteArray
671);
672make_data_page_stats_iterator!(
673    MaxByteArrayDataPageStatsIterator,
674    max_values_iter,
675    ByteArray
676);
677make_data_page_stats_iterator!(
678    MaxFixedLenByteArrayDataPageStatsIterator,
679    max_values_iter,
680    FixedLenByteArray
681);
682
683make_data_page_stats_iterator!(
684    MinFixedLenByteArrayDataPageStatsIterator,
685    min_values_iter,
686    FixedLenByteArray
687);
688
/// Defines an iterator adapter over `(len, &ColumnIndexMetaData)` pairs that yields
/// the per-page decimal statistics of each column index as a `Vec<Option<_>>`.
///
/// Decimal page statistics may be stored with several physical types:
/// * `INT32` values are converted with `From`
/// * `INT64` values are converted with `TryFrom`; a value that does not fit in the
///   target type becomes `None` (matching the row-group decimal iterators) instead
///   of panicking
/// * `BYTE_ARRAY` / `FIXED_LEN_BYTE_ARRAY` values are decoded with `$convert_func`
/// * any other index variant (e.g. `NONE`) yields `len` nulls
///
/// Parameters:
/// * `$iterator_type`: name of the generated struct (e.g. `MinDecimal128DataPageStatsIterator`)
/// * `$func`: per-page accessor on the native index (e.g. `min_values_iter`)
/// * `$stat_value_type`: the decimal's native value type (e.g. `i128`)
/// * `$convert_func`: big-endian byte decoder (e.g. `from_bytes_to_i128`)
macro_rules! get_decimal_page_stats_iterator {
    ($iterator_type: ident, $func: ident, $stat_value_type: ident, $convert_func: ident) => {
        struct $iterator_type<'a, I>
        where
            I: Iterator<Item = (usize, &'a ColumnIndexMetaData)>,
        {
            iter: I,
        }

        impl<'a, I> $iterator_type<'a, I>
        where
            I: Iterator<Item = (usize, &'a ColumnIndexMetaData)>,
        {
            fn new(iter: I) -> Self {
                Self { iter }
            }
        }

        impl<'a, I> Iterator for $iterator_type<'a, I>
        where
            I: Iterator<Item = (usize, &'a ColumnIndexMetaData)>,
        {
            type Item = Vec<Option<$stat_value_type>>;

            fn next(&mut self) -> Option<Self::Item> {
                let (len, index) = self.iter.next()?;
                let values = match index {
                    ColumnIndexMetaData::INT32(native_index) => native_index
                        .$func()
                        .map(|x| x.map(|x| $stat_value_type::from(*x)))
                        .collect::<Vec<_>>(),
                    // Narrowing conversion (e.g. INT64 -> i32 for Decimal32): an
                    // out-of-range value in an unexpected or malformed file is
                    // reported as a missing statistic rather than panicking.
                    ColumnIndexMetaData::INT64(native_index) => native_index
                        .$func()
                        .map(|x| x.and_then(|x| $stat_value_type::try_from(*x).ok()))
                        .collect::<Vec<_>>(),
                    ColumnIndexMetaData::BYTE_ARRAY(native_index) => native_index
                        .$func()
                        .map(|x| x.map(|x| $convert_func(x)))
                        .collect::<Vec<_>>(),
                    ColumnIndexMetaData::FIXED_LEN_BYTE_ARRAY(native_index) => native_index
                        .$func()
                        .map(|x| x.map(|x| $convert_func(x)))
                        .collect::<Vec<_>>(),
                    // No decimal statistics can be extracted from other index types.
                    _ => vec![None; len],
                };
                Some(values)
            }

            fn size_hint(&self) -> (usize, Option<usize>) {
                self.iter.size_hint()
            }
        }
    };
}
754
// Concrete page-statistics iterators for every Arrow decimal width.
//
// Each invocation defines one iterator type that reads either the min
// (`min_values_iter`) or the max (`max_values_iter`) values out of a
// `ColumnIndexMetaData` and produces values of the decimal's native integer
// type (`i32`, `i64`, `i128` or `i256`). Byte-array encoded values
// (BYTE_ARRAY / FIXED_LEN_BYTE_ARRAY) are converted with the given
// `from_bytes_to_*` helper.
get_decimal_page_stats_iterator!(
    MinDecimal32DataPageStatsIterator,
    min_values_iter,
    i32,
    from_bytes_to_i32
);

get_decimal_page_stats_iterator!(
    MaxDecimal32DataPageStatsIterator,
    max_values_iter,
    i32,
    from_bytes_to_i32
);

get_decimal_page_stats_iterator!(
    MinDecimal64DataPageStatsIterator,
    min_values_iter,
    i64,
    from_bytes_to_i64
);

get_decimal_page_stats_iterator!(
    MaxDecimal64DataPageStatsIterator,
    max_values_iter,
    i64,
    from_bytes_to_i64
);

get_decimal_page_stats_iterator!(
    MinDecimal128DataPageStatsIterator,
    min_values_iter,
    i128,
    from_bytes_to_i128
);

get_decimal_page_stats_iterator!(
    MaxDecimal128DataPageStatsIterator,
    max_values_iter,
    i128,
    from_bytes_to_i128
);

get_decimal_page_stats_iterator!(
    MinDecimal256DataPageStatsIterator,
    min_values_iter,
    i256,
    from_bytes_to_i256
);

get_decimal_page_stats_iterator!(
    MaxDecimal256DataPageStatsIterator,
    max_values_iter,
    i256,
    from_bytes_to_i256
);
810
/// Converts page-level (min or max) statistics from a sequence of
/// [`ColumnIndexMetaData`] into a single Arrow [`ArrayRef`] of `$data_type`.
///
/// * `$stat_type_prefix` — `Min` or `Max`; selects which family of
///   `*DataPageStatsIterator` types to use (and, for dictionaries, which of
///   `min_page_statistics` / `max_page_statistics` to recurse into).
/// * `$iterator` — yields `(len, &ColumnIndexMetaData)` pairs, where `len` is
///   the number of values expected for that column index (it is the number
///   of nulls emitted when the index carries no statistics).
/// * `$physical_type` — the parquet physical type, used to pick the correct
///   `Date64` conversion.
///
/// Every branch emits `len` values per input item, using nulls where a
/// statistic is missing or can not be converted. The only exception is the
/// `Time32` / `Time64` fallback for units those types can not hold, which
/// returns an empty array.
macro_rules! get_data_page_statistics {
    ($stat_type_prefix: ident, $data_type: ident, $iterator: ident, $physical_type: ident) => {
        paste! {
            match $data_type {
                DataType::Boolean => Ok(Arc::new(BooleanArray::from_iter(
                    [<$stat_type_prefix BooleanDataPageStatsIterator>]::new($iterator).flatten(),
                ))),
                // Narrow integer types are stored as INT32; values outside the
                // target range become null rather than being truncated.
                DataType::UInt8 => Ok(Arc::new(UInt8Array::from_iter(
                    [<$stat_type_prefix Int32DataPageStatsIterator>]::new($iterator)
                        .flat_map(|x| x.into_iter().map(|x| x.and_then(|x| u8::try_from(x).ok()))),
                ))),
                DataType::UInt16 => Ok(Arc::new(UInt16Array::from_iter(
                    [<$stat_type_prefix Int32DataPageStatsIterator>]::new($iterator)
                        .flat_map(|x| x.into_iter().map(|x| x.and_then(|x| u16::try_from(x).ok()))),
                ))),
                // `as` reinterprets the bits of the signed physical value, so the
                // full unsigned range round-trips.
                DataType::UInt32 => Ok(Arc::new(UInt32Array::from_iter(
                    [<$stat_type_prefix Int32DataPageStatsIterator>]::new($iterator)
                        .flat_map(|x| x.into_iter().map(|x| x.map(|x| x as u32))),
                ))),
                DataType::UInt64 => Ok(Arc::new(UInt64Array::from_iter(
                    [<$stat_type_prefix Int64DataPageStatsIterator>]::new($iterator)
                        .flat_map(|x| x.into_iter().map(|x| x.map(|x| x as u64))),
                ))),
                DataType::Int8 => Ok(Arc::new(Int8Array::from_iter(
                    [<$stat_type_prefix Int32DataPageStatsIterator>]::new($iterator)
                        .flat_map(|x| x.into_iter().map(|x| x.and_then(|x| i8::try_from(x).ok()))),
                ))),
                DataType::Int16 => Ok(Arc::new(Int16Array::from_iter(
                    [<$stat_type_prefix Int32DataPageStatsIterator>]::new($iterator)
                        .flat_map(|x| x.into_iter().map(|x| x.and_then(|x| i16::try_from(x).ok()))),
                ))),
                DataType::Int32 => Ok(Arc::new(Int32Array::from_iter([<$stat_type_prefix Int32DataPageStatsIterator>]::new($iterator).flatten()))),
                DataType::Int64 => Ok(Arc::new(Int64Array::from_iter([<$stat_type_prefix Int64DataPageStatsIterator>]::new($iterator).flatten()))),
                DataType::Float16 => Ok(Arc::new(Float16Array::from_iter(
                    [<$stat_type_prefix Float16DataPageStatsIterator>]::new($iterator)
                        .flat_map(|x| x.into_iter().map(|x| x.and_then(|x| from_bytes_to_f16(x.data())))),
                ))),
                DataType::Float32 => Ok(Arc::new(Float32Array::from_iter([<$stat_type_prefix Float32DataPageStatsIterator>]::new($iterator).flatten()))),
                DataType::Float64 => Ok(Arc::new(Float64Array::from_iter([<$stat_type_prefix Float64DataPageStatsIterator>]::new($iterator).flatten()))),
                DataType::Binary => Ok(Arc::new(BinaryArray::from_iter([<$stat_type_prefix ByteArrayDataPageStatsIterator>]::new($iterator).flatten()))),
                DataType::LargeBinary => Ok(Arc::new(LargeBinaryArray::from_iter([<$stat_type_prefix ByteArrayDataPageStatsIterator>]::new($iterator).flatten()))),
                DataType::Utf8 => {
                    let mut builder = StringBuilder::new();
                    let iterator = [<$stat_type_prefix ByteArrayDataPageStatsIterator>]::new($iterator);
                    for x in iterator {
                        for x in x.into_iter() {
                            let Some(x) = x else {
                                builder.append_null(); // no statistics value
                                continue;
                            };

                            // statistics that are not valid UTF-8 are reported as unknown
                            let Ok(x) = std::str::from_utf8(x.data()) else {
                                builder.append_null();
                                continue;
                            };

                            builder.append_value(x);
                        }
                    }
                    Ok(Arc::new(builder.finish()))
                },
                DataType::LargeUtf8 => {
                    let mut builder = LargeStringBuilder::new();
                    let iterator = [<$stat_type_prefix ByteArrayDataPageStatsIterator>]::new($iterator);
                    for x in iterator {
                        for x in x.into_iter() {
                            let Some(x) = x else {
                                builder.append_null(); // no statistics value
                                continue;
                            };

                            // statistics that are not valid UTF-8 are reported as unknown
                            let Ok(x) = std::str::from_utf8(x.data()) else {
                                builder.append_null();
                                continue;
                            };

                            builder.append_value(x);
                        }
                    }
                    Ok(Arc::new(builder.finish()))
                },
                // dictionary statistics are the statistics of the value type
                DataType::Dictionary(_, value_type) => {
                    [<$stat_type_prefix:lower _ page_statistics>](value_type, $iterator, $physical_type)
                },
                DataType::Timestamp(unit, timezone) => {
                    let iter = [<$stat_type_prefix Int64DataPageStatsIterator>]::new($iterator).flatten();
                    Ok(match unit {
                        TimeUnit::Second => Arc::new(TimestampSecondArray::from_iter(iter).with_timezone_opt(timezone.clone())),
                        TimeUnit::Millisecond => Arc::new(TimestampMillisecondArray::from_iter(iter).with_timezone_opt(timezone.clone())),
                        TimeUnit::Microsecond => Arc::new(TimestampMicrosecondArray::from_iter(iter).with_timezone_opt(timezone.clone())),
                        TimeUnit::Nanosecond => Arc::new(TimestampNanosecondArray::from_iter(iter).with_timezone_opt(timezone.clone())),
                    })
                },
                DataType::Date32 => Ok(Arc::new(Date32Array::from_iter([<$stat_type_prefix Int32DataPageStatsIterator>]::new($iterator).flatten()))),
                // INT32 dates count days; Date64 counts milliseconds
                DataType::Date64 if $physical_type == Some(PhysicalType::INT32) => Ok(Arc::new(
                    Date64Array::from_iter(
                        [<$stat_type_prefix Int32DataPageStatsIterator>]::new($iterator)
                            .flat_map(|x| {
                                x.into_iter()
                                    .map(|x| x.map(|days| i64::from(days) * 24 * 60 * 60 * 1000))
                            }),
                    ),
                )),
                DataType::Date64 if $physical_type == Some(PhysicalType::INT64) => Ok(Arc::new(Date64Array::from_iter([<$stat_type_prefix Int64DataPageStatsIterator>]::new($iterator).flatten()))),
                DataType::Decimal32(precision, scale) => Ok(Arc::new(
                    Decimal32Array::from_iter([<$stat_type_prefix Decimal32DataPageStatsIterator>]::new($iterator).flatten()).with_precision_and_scale(*precision, *scale)?)),
                DataType::Decimal64(precision, scale) => Ok(Arc::new(
                    Decimal64Array::from_iter([<$stat_type_prefix Decimal64DataPageStatsIterator>]::new($iterator).flatten()).with_precision_and_scale(*precision, *scale)?)),
                DataType::Decimal128(precision, scale) => Ok(Arc::new(
                    Decimal128Array::from_iter([<$stat_type_prefix Decimal128DataPageStatsIterator>]::new($iterator).flatten()).with_precision_and_scale(*precision, *scale)?)),
                DataType::Decimal256(precision, scale) => Ok(Arc::new(
                    Decimal256Array::from_iter([<$stat_type_prefix Decimal256DataPageStatsIterator>]::new($iterator).flatten()).with_precision_and_scale(*precision, *scale)?)),
                DataType::Time32(unit) => {
                    Ok(match unit {
                        TimeUnit::Second =>  Arc::new(Time32SecondArray::from_iter(
                            [<$stat_type_prefix Int32DataPageStatsIterator>]::new($iterator).flatten(),
                        )),
                        TimeUnit::Millisecond => Arc::new(Time32MillisecondArray::from_iter(
                            [<$stat_type_prefix Int32DataPageStatsIterator>]::new($iterator).flatten(),
                        )),
                        _ => {
                            // don't know how to extract statistics, so return an empty array
                            new_empty_array(&DataType::Time32(unit.clone()))
                        }
                    })
                }
                DataType::Time64(unit) => {
                    Ok(match unit {
                        TimeUnit::Microsecond =>  Arc::new(Time64MicrosecondArray::from_iter(
                            [<$stat_type_prefix Int64DataPageStatsIterator>]::new($iterator).flatten(),
                        )),
                        TimeUnit::Nanosecond => Arc::new(Time64NanosecondArray::from_iter(
                            [<$stat_type_prefix Int64DataPageStatsIterator>]::new($iterator).flatten(),
                        )),
                        _ => {
                            // don't know how to extract statistics, so return an empty array
                            new_empty_array(&DataType::Time64(unit.clone()))
                        }
                    })
                },
                DataType::FixedSizeBinary(size) => {
                    let mut builder = FixedSizeBinaryBuilder::new(*size);
                    let iterator = [<$stat_type_prefix FixedLenByteArrayDataPageStatsIterator>]::new($iterator);
                    for x in iterator {
                        for x in x.into_iter() {
                            let Some(x) = x else {
                                builder.append_null(); // no statistics value
                                continue;
                            };

                            // a statistic of the wrong width can not be represented
                            if x.len() == *size as usize {
                                let _ = builder.append_value(x.data());
                            } else {
                                builder.append_null();
                            }
                        }
                    }
                    Ok(Arc::new(builder.finish()))
                },
                DataType::Utf8View => {
                    let mut builder = StringViewBuilder::new();
                    let iterator = [<$stat_type_prefix ByteArrayDataPageStatsIterator>]::new($iterator);
                    for x in iterator {
                        for x in x.into_iter() {
                            let Some(x) = x else {
                                builder.append_null(); // no statistics value
                                continue;
                            };

                            // statistics that are not valid UTF-8 are reported as unknown
                            let Ok(x) = std::str::from_utf8(x.data()) else {
                                builder.append_null();
                                continue;
                            };

                            builder.append_value(x);
                        }
                    }
                    Ok(Arc::new(builder.finish()))
                },
                DataType::BinaryView => {
                    let mut builder = BinaryViewBuilder::new();
                    let iterator = [<$stat_type_prefix ByteArrayDataPageStatsIterator>]::new($iterator);
                    for x in iterator {
                        for x in x.into_iter() {
                            let Some(x) = x else {
                                builder.append_null(); // no statistics value
                                continue;
                            };

                            builder.append_value(x);
                        }
                    }
                    Ok(Arc::new(builder.finish()))
                },
                DataType::Date64 |  // required to cover $physical_type match guard
                DataType::Null |
                DataType::Duration(_) |
                DataType::Interval(_) |
                DataType::List(_) |
                DataType::ListView(_) |
                DataType::FixedSizeList(_, _) |
                DataType::LargeList(_) |
                DataType::LargeListView(_) |
                DataType::Struct(_) |
                DataType::Union(_, _) |
                DataType::Map(_, _) |
                DataType::RunEndEncoded(_, _) => {
                    // Don't know how to extract statistics, so return a null array.
                    // Its length must match the other branches: `len` values per
                    // item, not one value per item.
                    let len: usize = $iterator.map(|(len, _)| len).sum();
                    Ok(new_null_array($data_type, len))
                },
            }
        }
    }
}
1088/// Extracts the min statistics from an iterator of [`ParquetStatistics`] to an
1089/// [`ArrayRef`]
1090///
1091/// This is an internal helper -- see [`StatisticsConverter`] for public API
1092fn min_statistics<'a, I: Iterator<Item = Option<&'a ParquetStatistics>>>(
1093    data_type: &DataType,
1094    iterator: I,
1095    physical_type: Option<PhysicalType>,
1096) -> Result<ArrayRef> {
1097    get_statistics!(Min, data_type, iterator, physical_type)
1098}
1099
1100/// Extracts the max statistics from an iterator of [`ParquetStatistics`] to an [`ArrayRef`]
1101///
1102/// This is an internal helper -- see [`StatisticsConverter`] for public API
1103fn max_statistics<'a, I: Iterator<Item = Option<&'a ParquetStatistics>>>(
1104    data_type: &DataType,
1105    iterator: I,
1106    physical_type: Option<PhysicalType>,
1107) -> Result<ArrayRef> {
1108    get_statistics!(Max, data_type, iterator, physical_type)
1109}
1110
1111/// Extracts the min statistics from an iterator
1112/// of parquet page [`ColumnIndexMetaData`]'s to an [`ArrayRef`]
1113pub(crate) fn min_page_statistics<'a, I>(
1114    data_type: &DataType,
1115    iterator: I,
1116    physical_type: Option<PhysicalType>,
1117) -> Result<ArrayRef>
1118where
1119    I: Iterator<Item = (usize, &'a ColumnIndexMetaData)>,
1120{
1121    get_data_page_statistics!(Min, data_type, iterator, physical_type)
1122}
1123
1124/// Extracts the max statistics from an iterator
1125/// of parquet page [`ColumnIndexMetaData`]'s to an [`ArrayRef`]
1126pub(crate) fn max_page_statistics<'a, I>(
1127    data_type: &DataType,
1128    iterator: I,
1129    physical_type: Option<PhysicalType>,
1130) -> Result<ArrayRef>
1131where
1132    I: Iterator<Item = (usize, &'a ColumnIndexMetaData)>,
1133{
1134    get_data_page_statistics!(Max, data_type, iterator, physical_type)
1135}
1136
1137/// Extracts the null count statistics from an iterator
1138/// of parquet page [`ColumnIndexMetaData`]'s to an [`ArrayRef`]
1139///
1140/// The returned Array is an [`UInt64Array`]
1141pub(crate) fn null_counts_page_statistics<'a, I>(iterator: I) -> Result<UInt64Array>
1142where
1143    I: Iterator<Item = (usize, &'a ColumnIndexMetaData)>,
1144{
1145    let iter = iterator.flat_map(|(len, index)| match index {
1146        ColumnIndexMetaData::NONE => vec![None; len],
1147        column_index => column_index.null_counts().map_or(vec![None; len], |v| {
1148            v.iter().map(|i| Some(*i as u64)).collect::<Vec<_>>()
1149        }),
1150    });
1151
1152    Ok(UInt64Array::from_iter(iter))
1153}
1154
/// Extracts Parquet statistics as Arrow arrays
///
/// This is used to convert Parquet statistics to Arrow [`ArrayRef`], with
/// proper type conversions. This information can be used for pruning Parquet
/// files, row groups, and data pages based on the statistics embedded in
/// Parquet metadata.
///
/// # Schemas
///
/// The converter uses the schema of the Parquet file and the Arrow schema to
/// convert the underlying statistics value (stored as a parquet value) into the
/// corresponding Arrow value. For example, Decimals may be stored as (fixed or
/// variable length) binary in parquet files, and this structure handles mapping
/// them to the native integer representation (such as `i128`) used in Arrow.
///
/// Note: The Parquet schema and Arrow schema do not have to be identical (for
/// example, the columns may be in different orders and one or the other schemas
/// may have additional columns). The function [`parquet_column`] is used to
/// match the column in the Parquet schema to the column in the Arrow schema.
#[derive(Debug)]
pub struct StatisticsConverter<'a> {
    /// the index of the matched column in the Parquet schema
    /// (`None` when the Arrow column does not exist in the Parquet file)
    parquet_column_index: Option<usize>,
    /// The field (with data type) of the column in the Arrow schema
    arrow_field: &'a Field,
    /// treat missing null_counts as 0 nulls
    missing_null_counts_as_zero: bool,
    /// The physical type of the matched column in the Parquet schema
    /// (`None` exactly when `parquet_column_index` is `None`)
    physical_type: Option<PhysicalType>,
}
1185
1186impl<'a> StatisticsConverter<'a> {
1187    /// Return the index of the column in the Parquet schema, if any
1188    ///
1189    /// Returns `None` if the column is was present in the Arrow schema, but not
1190    /// present in the parquet file
1191    pub fn parquet_column_index(&self) -> Option<usize> {
1192        self.parquet_column_index
1193    }
1194
1195    /// Return the arrow schema's [`Field]` of the column in the Arrow schema
1196    pub fn arrow_field(&self) -> &'a Field {
1197        self.arrow_field
1198    }
1199
1200    /// Set the statistics converter to treat missing null counts as missing
1201    ///
1202    /// By default, the converter will treat missing null counts as though
1203    /// the null count is known to be `0`.
1204    ///
1205    /// Note that parquet files written by parquet-rs currently do not store
1206    /// null counts even when it is known there are zero nulls, and the reader
1207    /// will return 0 for the null counts in that instance. This behavior may
1208    /// change in a future release.
1209    ///
1210    /// Both parquet-java and parquet-cpp store null counts as 0 when there are
1211    /// no nulls, and don't write unknown values to the null count field.
1212    pub fn with_missing_null_counts_as_zero(mut self, missing_null_counts_as_zero: bool) -> Self {
1213        self.missing_null_counts_as_zero = missing_null_counts_as_zero;
1214        self
1215    }
1216
1217    /// Returns a [`UInt64Array`] with row counts for each row group
1218    ///
1219    /// # Return Value
1220    ///
1221    /// The returned array has no nulls, and has one value for each row group.
1222    /// Each value is the number of rows in the row group.
1223    ///
1224    /// # Example
1225    /// ```no_run
1226    /// # use arrow::datatypes::Schema;
1227    /// # use arrow_array::{ArrayRef, UInt64Array};
1228    /// # use parquet::arrow::arrow_reader::statistics::StatisticsConverter;
1229    /// # use parquet::file::metadata::ParquetMetaData;
1230    /// # fn get_parquet_metadata() -> ParquetMetaData { unimplemented!() }
1231    /// # fn get_arrow_schema() -> Schema { unimplemented!() }
1232    /// // Given the metadata for a parquet file and the arrow schema
1233    /// let metadata: ParquetMetaData = get_parquet_metadata();
1234    /// let arrow_schema: Schema = get_arrow_schema();
1235    /// let parquet_schema = metadata.file_metadata().schema_descr();
1236    /// // create a converter
1237    /// let converter = StatisticsConverter::try_new("foo", &arrow_schema, parquet_schema)
1238    ///   .unwrap();
1239    /// // get the row counts for each row group
1240    /// let row_counts = converter.row_group_row_counts(metadata
1241    ///   .row_groups()
1242    ///   .iter()
1243    /// ).unwrap();
1244    /// // file had 2 row groups, with 1024 and 23 rows respectively
1245    /// assert_eq!(row_counts, Some(UInt64Array::from(vec![1024, 23])));
1246    /// ```
1247    pub fn row_group_row_counts<I>(&self, metadatas: I) -> Result<Option<UInt64Array>>
1248    where
1249        I: IntoIterator<Item = &'a RowGroupMetaData>,
1250    {
1251        let Some(_) = self.parquet_column_index else {
1252            return Ok(None);
1253        };
1254
1255        let mut builder = UInt64Array::builder(10);
1256        for metadata in metadatas.into_iter() {
1257            let row_count = metadata.num_rows();
1258            let row_count: u64 = row_count.try_into().map_err(|e| {
1259                arrow_err!(format!(
1260                    "Parquet row count {row_count} too large to convert to u64: {e}"
1261                ))
1262            })?;
1263            builder.append_value(row_count);
1264        }
1265        Ok(Some(builder.finish()))
1266    }
1267
1268    /// Create a new `StatisticsConverter` to extract statistics for a column
1269    ///
1270    /// Note if there is no corresponding column in the parquet file, the returned
1271    /// arrays will be null. This can happen if the column is in the arrow
1272    /// schema but not in the parquet schema due to schema evolution.
1273    ///
1274    /// See example on [`Self::row_group_mins`] for usage
1275    ///
1276    /// # Errors
1277    ///
1278    /// * If the column is not found in the arrow schema
1279    pub fn try_new<'b>(
1280        column_name: &'b str,
1281        arrow_schema: &'a Schema,
1282        parquet_schema: &'a SchemaDescriptor,
1283    ) -> Result<Self> {
1284        // ensure the requested column is in the arrow schema
1285        let Some((_idx, arrow_field)) = arrow_schema.column_with_name(column_name) else {
1286            return Err(arrow_err!(format!(
1287                "Column '{}' not found in schema for statistics conversion",
1288                column_name
1289            )));
1290        };
1291
1292        // find the column in the parquet schema, if not, return a null array
1293        let parquet_index = match parquet_column(parquet_schema, arrow_schema, column_name) {
1294            Some((parquet_idx, matched_field)) => {
1295                // sanity check that matching field matches the arrow field
1296                if matched_field.as_ref() != arrow_field {
1297                    return Err(arrow_err!(format!(
1298                        "Matched column '{:?}' does not match original matched column '{:?}'",
1299                        matched_field, arrow_field
1300                    )));
1301                }
1302                Some(parquet_idx)
1303            }
1304            None => None,
1305        };
1306
1307        Ok(Self {
1308            parquet_column_index: parquet_index,
1309            arrow_field,
1310            missing_null_counts_as_zero: true,
1311            physical_type: parquet_index.map(|idx| parquet_schema.column(idx).physical_type()),
1312        })
1313    }
1314
1315    /// Extract the minimum values from row group statistics in [`RowGroupMetaData`]
1316    ///
1317    /// # Return Value
1318    ///
1319    /// The returned array contains 1 value for each row group, in the same order as `metadatas`
1320    ///
1321    /// Each value is either
1322    /// * the minimum value for the column
1323    /// * a null value, if the statistics can not be extracted
1324    ///
1325    /// Note that a null value does NOT mean the min value was actually
1326    /// `null` it means it the requested statistic is unknown
1327    ///
1328    /// # Errors
1329    ///
1330    /// Reasons for not being able to extract the statistics include:
1331    /// * the column is not present in the parquet file
1332    /// * statistics for the column are not present in the row group
1333    /// * the stored statistic value can not be converted to the requested type
1334    ///
1335    /// # Example
1336    /// ```no_run
1337    /// # use std::sync::Arc;
1338    /// # use arrow::datatypes::Schema;
1339    /// # use arrow_array::{ArrayRef, Float64Array};
1340    /// # use parquet::arrow::arrow_reader::statistics::StatisticsConverter;
1341    /// # use parquet::file::metadata::ParquetMetaData;
1342    /// # fn get_parquet_metadata() -> ParquetMetaData { unimplemented!() }
1343    /// # fn get_arrow_schema() -> Schema { unimplemented!() }
1344    /// // Given the metadata for a parquet file and the arrow schema
1345    /// let metadata: ParquetMetaData = get_parquet_metadata();
1346    /// let arrow_schema: Schema = get_arrow_schema();
1347    /// let parquet_schema = metadata.file_metadata().schema_descr();
1348    /// // create a converter
1349    /// let converter = StatisticsConverter::try_new("foo", &arrow_schema, parquet_schema)
1350    ///   .unwrap();
1351    /// // get the minimum value for the column "foo" in the parquet file
1352    /// let min_values: ArrayRef = converter
1353    ///   .row_group_mins(metadata.row_groups().iter())
1354    ///   .unwrap();
1355    /// // if "foo" is a Float64 value, the returned array will contain Float64 values
1356    /// assert_eq!(min_values, Arc::new(Float64Array::from(vec![Some(1.0), Some(2.0)])) as _);
1357    /// ```
1358    pub fn row_group_mins<I>(&self, metadatas: I) -> Result<ArrayRef>
1359    where
1360        I: IntoIterator<Item = &'a RowGroupMetaData>,
1361    {
1362        let data_type = self.arrow_field.data_type();
1363
1364        let Some(parquet_index) = self.parquet_column_index else {
1365            return Ok(self.make_null_array(data_type, metadatas));
1366        };
1367
1368        let iter = metadatas
1369            .into_iter()
1370            .map(|x| x.column(parquet_index).statistics());
1371        min_statistics(data_type, iter, self.physical_type)
1372    }
1373
1374    /// Extract the maximum values from row group statistics in [`RowGroupMetaData`]
1375    ///
1376    /// See docs on [`Self::row_group_mins`] for details
1377    pub fn row_group_maxes<I>(&self, metadatas: I) -> Result<ArrayRef>
1378    where
1379        I: IntoIterator<Item = &'a RowGroupMetaData>,
1380    {
1381        let data_type = self.arrow_field.data_type();
1382
1383        let Some(parquet_index) = self.parquet_column_index else {
1384            return Ok(self.make_null_array(data_type, metadatas));
1385        };
1386
1387        let iter = metadatas
1388            .into_iter()
1389            .map(|x| x.column(parquet_index).statistics());
1390        max_statistics(data_type, iter, self.physical_type)
1391    }
1392
1393    /// Extract the `is_max_value_exact` flags from row group statistics in [`RowGroupMetaData`]
1394    ///
1395    /// See docs on [`Self::row_group_maxes`] for details
1396    pub fn row_group_is_max_value_exact<I>(&self, metadatas: I) -> Result<BooleanArray>
1397    where
1398        I: IntoIterator<Item = &'a RowGroupMetaData>,
1399    {
1400        let Some(parquet_index) = self.parquet_column_index else {
1401            let num_row_groups = metadatas.into_iter().count();
1402            return Ok(BooleanArray::from_iter(std::iter::repeat_n(
1403                None,
1404                num_row_groups,
1405            )));
1406        };
1407
1408        let is_max_value_exact = metadatas
1409            .into_iter()
1410            .map(|x| x.column(parquet_index).statistics())
1411            .map(|s| s.map(|s| s.max_is_exact()));
1412        Ok(BooleanArray::from_iter(is_max_value_exact))
1413    }
1414
1415    /// Extract the `is_min_value_exact` flags from row group statistics in [`RowGroupMetaData`]
1416    ///
1417    /// See docs on [`Self::row_group_mins`] for details
1418    pub fn row_group_is_min_value_exact<I>(&self, metadatas: I) -> Result<BooleanArray>
1419    where
1420        I: IntoIterator<Item = &'a RowGroupMetaData>,
1421    {
1422        let Some(parquet_index) = self.parquet_column_index else {
1423            let num_row_groups = metadatas.into_iter().count();
1424            return Ok(BooleanArray::from_iter(std::iter::repeat_n(
1425                None,
1426                num_row_groups,
1427            )));
1428        };
1429
1430        let is_min_value_exact = metadatas
1431            .into_iter()
1432            .map(|x| x.column(parquet_index).statistics())
1433            .map(|s| s.map(|s| s.min_is_exact()));
1434        Ok(BooleanArray::from_iter(is_min_value_exact))
1435    }
1436
1437    /// Extract the null counts from row group statistics in [`RowGroupMetaData`]
1438    ///
1439    /// See docs on [`Self::row_group_mins`] for details
1440    pub fn row_group_null_counts<I>(&self, metadatas: I) -> Result<UInt64Array>
1441    where
1442        I: IntoIterator<Item = &'a RowGroupMetaData>,
1443    {
1444        let Some(parquet_index) = self.parquet_column_index else {
1445            let num_row_groups = metadatas.into_iter().count();
1446            return Ok(UInt64Array::from_iter(std::iter::repeat_n(
1447                None,
1448                num_row_groups,
1449            )));
1450        };
1451
1452        let null_counts = metadatas
1453            .into_iter()
1454            .map(|x| x.column(parquet_index).statistics())
1455            .map(|s| {
1456                s.and_then(|s| {
1457                    if self.missing_null_counts_as_zero {
1458                        Some(s.null_count_opt().unwrap_or(0))
1459                    } else {
1460                        s.null_count_opt()
1461                    }
1462                })
1463            });
1464        Ok(UInt64Array::from_iter(null_counts))
1465    }
1466
1467    /// Extract the minimum values from Data Page statistics.
1468    ///
1469    /// In Parquet files, in addition to the Column Chunk level statistics
1470    /// (stored for each column for each row group) there are also
1471    /// optional statistics stored for each data page, as part of
1472    /// the [`ParquetColumnIndex`].
1473    ///
1474    /// Since a single Column Chunk is stored as one or more pages,
1475    /// page level statistics can prune at a finer granularity.
1476    ///
1477    /// However since they are stored in a separate metadata
1478    /// structure ([`ColumnIndexMetaData`]) there is different code to extract them as
1479    /// compared to arrow statistics.
1480    ///
1481    /// # Parameters:
1482    ///
1483    /// * `column_page_index`: The parquet column page indices, read from
1484    ///   `ParquetMetaData` column_index
1485    ///
1486    /// * `column_offset_index`: The parquet column offset indices, read from
1487    ///   `ParquetMetaData` offset_index
1488    ///
1489    /// * `row_group_indices`: The indices of the row groups, that are used to
1490    ///   extract the column page index and offset index on a per row group
1491    ///   per column basis.
1492    ///
1493    /// # Return Value
1494    ///
1495    /// The returned array contains 1 value for each `NativeIndex`
1496    /// in the underlying `Index`es, in the same order as they appear
1497    /// in `metadatas`.
1498    ///
1499    /// For example, if there are two `Index`es in `metadatas`:
1500    /// 1. the first having `3` `PageIndex` entries
1501    /// 2. the second having `2` `PageIndex` entries
1502    ///
1503    /// The returned array would have 5 rows.
1504    ///
1505    /// Each value is either:
1506    /// * the minimum value for the page
1507    /// * a null value, if the statistics can not be extracted
1508    ///
1509    /// Note that a null value does NOT mean the min value was actually
1510    /// `null` it means it the requested statistic is unknown
1511    ///
1512    /// # Errors
1513    ///
1514    /// Reasons for not being able to extract the statistics include:
1515    /// * the column is not present in the parquet file
1516    /// * statistics for the pages are not present in the row group
1517    /// * the stored statistic value can not be converted to the requested type
1518    pub fn data_page_mins<I>(
1519        &self,
1520        column_page_index: &ParquetColumnIndex,
1521        column_offset_index: &ParquetOffsetIndex,
1522        row_group_indices: I,
1523    ) -> Result<ArrayRef>
1524    where
1525        I: IntoIterator<Item = &'a usize>,
1526    {
1527        let data_type = self.arrow_field.data_type();
1528
1529        let Some(parquet_index) = self.parquet_column_index else {
1530            return Ok(self.make_null_array(data_type, row_group_indices));
1531        };
1532
1533        let iter = row_group_indices.into_iter().map(|rg_index| {
1534            let column_page_index_per_row_group_per_column =
1535                &column_page_index[*rg_index][parquet_index];
1536            let num_data_pages = &column_offset_index[*rg_index][parquet_index]
1537                .page_locations()
1538                .len();
1539
1540            (*num_data_pages, column_page_index_per_row_group_per_column)
1541        });
1542
1543        min_page_statistics(data_type, iter, self.physical_type)
1544    }
1545
1546    /// Extract the maximum values from Data Page statistics.
1547    ///
1548    /// See docs on [`Self::data_page_mins`] for details.
1549    pub fn data_page_maxes<I>(
1550        &self,
1551        column_page_index: &ParquetColumnIndex,
1552        column_offset_index: &ParquetOffsetIndex,
1553        row_group_indices: I,
1554    ) -> Result<ArrayRef>
1555    where
1556        I: IntoIterator<Item = &'a usize>,
1557    {
1558        let data_type = self.arrow_field.data_type();
1559
1560        let Some(parquet_index) = self.parquet_column_index else {
1561            return Ok(self.make_null_array(data_type, row_group_indices));
1562        };
1563
1564        let iter = row_group_indices.into_iter().map(|rg_index| {
1565            let column_page_index_per_row_group_per_column =
1566                &column_page_index[*rg_index][parquet_index];
1567            let num_data_pages = &column_offset_index[*rg_index][parquet_index]
1568                .page_locations()
1569                .len();
1570
1571            (*num_data_pages, column_page_index_per_row_group_per_column)
1572        });
1573
1574        max_page_statistics(data_type, iter, self.physical_type)
1575    }
1576
1577    /// Returns a [`UInt64Array`] with null counts for each data page.
1578    ///
1579    /// See docs on [`Self::data_page_mins`] for details.
1580    pub fn data_page_null_counts<I>(
1581        &self,
1582        column_page_index: &ParquetColumnIndex,
1583        column_offset_index: &ParquetOffsetIndex,
1584        row_group_indices: I,
1585    ) -> Result<UInt64Array>
1586    where
1587        I: IntoIterator<Item = &'a usize>,
1588    {
1589        let Some(parquet_index) = self.parquet_column_index else {
1590            let num_row_groups = row_group_indices.into_iter().count();
1591            return Ok(UInt64Array::from_iter(std::iter::repeat_n(
1592                None,
1593                num_row_groups,
1594            )));
1595        };
1596
1597        let iter = row_group_indices.into_iter().map(|rg_index| {
1598            let column_page_index_per_row_group_per_column =
1599                &column_page_index[*rg_index][parquet_index];
1600            let num_data_pages = &column_offset_index[*rg_index][parquet_index]
1601                .page_locations()
1602                .len();
1603
1604            (*num_data_pages, column_page_index_per_row_group_per_column)
1605        });
1606        null_counts_page_statistics(iter)
1607    }
1608
1609    /// Returns a [`UInt64Array`] with row counts for each data page.
1610    ///
1611    /// This function iterates over the given row group indexes and computes
1612    /// the row count for each page in the specified column.
1613    ///
1614    /// # Parameters:
1615    ///
1616    /// * `column_offset_index`: The parquet column offset indices, read from
1617    ///   `ParquetMetaData` offset_index
1618    ///
1619    /// * `row_group_metadatas`: The metadata slice of the row groups, read
1620    ///   from `ParquetMetaData` row_groups
1621    ///
1622    /// * `row_group_indices`: The indices of the row groups, that are used to
1623    ///   extract the column offset index on a per row group per column basis.
1624    ///
1625    /// See docs on [`Self::data_page_mins`] for details.
1626    pub fn data_page_row_counts<I>(
1627        &self,
1628        column_offset_index: &ParquetOffsetIndex,
1629        row_group_metadatas: &'a [RowGroupMetaData],
1630        row_group_indices: I,
1631    ) -> Result<Option<UInt64Array>>
1632    where
1633        I: IntoIterator<Item = &'a usize>,
1634    {
1635        let Some(parquet_index) = self.parquet_column_index else {
1636            // no matching column found in parquet_index;
1637            // thus we cannot extract page_locations in order to determine
1638            // the row count on a per DataPage basis.
1639            return Ok(None);
1640        };
1641
1642        let mut row_count_total = Vec::new();
1643        for rg_idx in row_group_indices {
1644            let page_locations = &column_offset_index[*rg_idx][parquet_index].page_locations();
1645
1646            let row_count_per_page = page_locations
1647                .windows(2)
1648                .map(|loc| Some(loc[1].first_row_index as u64 - loc[0].first_row_index as u64));
1649
1650            // append the last page row count
1651            let num_rows_in_row_group = &row_group_metadatas[*rg_idx].num_rows();
1652            let row_count_per_page = row_count_per_page
1653                .chain(std::iter::once(Some(
1654                    *num_rows_in_row_group as u64
1655                        - page_locations.last().unwrap().first_row_index as u64,
1656                )))
1657                .collect::<Vec<_>>();
1658
1659            row_count_total.extend(row_count_per_page);
1660        }
1661
1662        Ok(Some(UInt64Array::from_iter(row_count_total)))
1663    }
1664
1665    /// Returns a null array of data_type with one element per row group
1666    fn make_null_array<I, A>(&self, data_type: &DataType, metadatas: I) -> ArrayRef
1667    where
1668        I: IntoIterator<Item = A>,
1669    {
1670        // column was in the arrow schema but not in the parquet schema, so return a null array
1671        let num_row_groups = metadatas.into_iter().count();
1672        new_null_array(data_type, num_row_groups)
1673    }
1674}
1675
1676// See tests in parquet/tests/arrow_reader/statistics.rs