parquet/arrow/arrow_reader/
statistics.rs

1// Licensed to the Apache Software Foundation (ASF) under one
2// or more contributor license agreements.  See the NOTICE file
3// distributed with this work for additional information
4// regarding copyright ownership.  The ASF licenses this file
5// to you under the Apache License, Version 2.0 (the
6// "License"); you may not use this file except in compliance
7// with the License.  You may obtain a copy of the License at
8//
9//   http://www.apache.org/licenses/LICENSE-2.0
10//
11// Unless required by applicable law or agreed to in writing,
12// software distributed under the License is distributed on an
13// "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
14// KIND, either express or implied.  See the License for the
15// specific language governing permissions and limitations
16// under the License.
17
18//! [`StatisticsConverter`] to convert statistics in parquet format to arrow [`ArrayRef`].
19
// Note: all the corresponding tests are in
// `arrow-rs/parquet/tests/arrow_reader/statistics.rs`.
22use crate::arrow::buffer::bit_util::sign_extend_be;
23use crate::arrow::parquet_column;
24use crate::basic::Type as PhysicalType;
25use crate::data_type::{ByteArray, FixedLenByteArray};
26use crate::errors::{ParquetError, Result};
27use crate::file::metadata::{ParquetColumnIndex, ParquetOffsetIndex, RowGroupMetaData};
28use crate::file::page_index::index::{Index, PageIndex};
29use crate::file::statistics::Statistics as ParquetStatistics;
30use crate::schema::types::SchemaDescriptor;
31use arrow_array::builder::{
32    BinaryViewBuilder, BooleanBuilder, FixedSizeBinaryBuilder, LargeStringBuilder, StringBuilder,
33    StringViewBuilder,
34};
35use arrow_array::{
36    new_empty_array, new_null_array, ArrayRef, BinaryArray, BooleanArray, Date32Array, Date64Array,
37    Decimal128Array, Decimal256Array, Float16Array, Float32Array, Float64Array, Int16Array,
38    Int32Array, Int64Array, Int8Array, LargeBinaryArray, Time32MillisecondArray, Time32SecondArray,
39    Time64MicrosecondArray, Time64NanosecondArray, TimestampMicrosecondArray,
40    TimestampMillisecondArray, TimestampNanosecondArray, TimestampSecondArray, UInt16Array,
41    UInt32Array, UInt64Array, UInt8Array,
42};
43use arrow_buffer::i256;
44use arrow_schema::{DataType, Field, Schema, TimeUnit};
45use half::f16;
46use paste::paste;
47use std::sync::Arc;
48
49// Convert the bytes array to i128.
50// The endian of the input bytes array must be big-endian.
51pub(crate) fn from_bytes_to_i128(b: &[u8]) -> i128 {
52    // The bytes array are from parquet file and must be the big-endian.
53    // The endian is defined by parquet format, and the reference document
54    // https://github.com/apache/parquet-format/blob/54e53e5d7794d383529dd30746378f19a12afd58/src/main/thrift/parquet.thrift#L66
55    i128::from_be_bytes(sign_extend_be::<16>(b))
56}
57
58// Convert the bytes array to i256.
59// The endian of the input bytes array must be big-endian.
60pub(crate) fn from_bytes_to_i256(b: &[u8]) -> i256 {
61    i256::from_be_bytes(sign_extend_be::<32>(b))
62}
63
64// Convert the bytes array to f16
65pub(crate) fn from_bytes_to_f16(b: &[u8]) -> Option<f16> {
66    match b {
67        [low, high] => Some(f16::from_be_bytes([*high, *low])),
68        _ => None,
69    }
70}
71
/// Generates an adapter iterator that pulls typed min/max values out of an
/// iterator of `Option<&ParquetStatistics>`.
///
/// The generated iterator yields exactly one element per input element:
/// * `Some(value)` when the statistics are present, are the expected variant,
///   and the accessor returns a value;
/// * `None` otherwise.
///
/// Parameters:
/// * `$iterator_type`: name of the generated iterator type (e.g. `MinBooleanStatsIterator`)
/// * `$func`: accessor called on the typed statistics (e.g. `min_opt` or `max_opt`)
/// * `$parquet_statistics_type`: the statistics variant to match (e.g. `ParquetStatistics::Boolean`)
/// * `$stat_value_type`: the type each yielded reference points to (e.g. `bool`)
macro_rules! make_stats_iterator {
    ($iterator_type:ident, $func:ident, $parquet_statistics_type:path, $stat_value_type:ty) => {
        /// Adapts an iterator of optional `ParquetStatistics` references into an
        /// iterator of optional references to the extracted statistics value.
        ///
        /// An element is `None` when the statistics are absent, carry no value,
        /// or belong to a different statistics variant.
        struct $iterator_type<'a, I>
        where
            I: Iterator<Item = Option<&'a ParquetStatistics>>,
        {
            iter: I,
        }

        impl<'a, I> $iterator_type<'a, I>
        where
            I: Iterator<Item = Option<&'a ParquetStatistics>>,
        {
            /// Wraps `iter` in the adapter.
            fn new(iter: I) -> Self {
                Self { iter }
            }
        }

        impl<'a, I> Iterator for $iterator_type<'a, I>
        where
            I: Iterator<Item = Option<&'a ParquetStatistics>>,
        {
            type Item = Option<&'a $stat_value_type>;

            /// Returns the next extracted value, or `None` once the input is exhausted.
            fn next(&mut self) -> Option<Self::Item> {
                // Stop when the wrapped iterator stops; otherwise every input
                // element produces exactly one output element.
                let maybe_stats = self.iter.next()?;
                let value = maybe_stats.and_then(|stats| {
                    if let $parquet_statistics_type(typed) = stats {
                        typed.$func()
                    } else {
                        None
                    }
                });
                Some(value)
            }

            fn size_hint(&self) -> (usize, Option<usize>) {
                self.iter.size_hint()
            }
        }
    };
}
132
133make_stats_iterator!(
134    MinBooleanStatsIterator,
135    min_opt,
136    ParquetStatistics::Boolean,
137    bool
138);
139make_stats_iterator!(
140    MaxBooleanStatsIterator,
141    max_opt,
142    ParquetStatistics::Boolean,
143    bool
144);
145make_stats_iterator!(
146    MinInt32StatsIterator,
147    min_opt,
148    ParquetStatistics::Int32,
149    i32
150);
151make_stats_iterator!(
152    MaxInt32StatsIterator,
153    max_opt,
154    ParquetStatistics::Int32,
155    i32
156);
157make_stats_iterator!(
158    MinInt64StatsIterator,
159    min_opt,
160    ParquetStatistics::Int64,
161    i64
162);
163make_stats_iterator!(
164    MaxInt64StatsIterator,
165    max_opt,
166    ParquetStatistics::Int64,
167    i64
168);
169make_stats_iterator!(
170    MinFloatStatsIterator,
171    min_opt,
172    ParquetStatistics::Float,
173    f32
174);
175make_stats_iterator!(
176    MaxFloatStatsIterator,
177    max_opt,
178    ParquetStatistics::Float,
179    f32
180);
181make_stats_iterator!(
182    MinDoubleStatsIterator,
183    min_opt,
184    ParquetStatistics::Double,
185    f64
186);
187make_stats_iterator!(
188    MaxDoubleStatsIterator,
189    max_opt,
190    ParquetStatistics::Double,
191    f64
192);
193make_stats_iterator!(
194    MinByteArrayStatsIterator,
195    min_bytes_opt,
196    ParquetStatistics::ByteArray,
197    [u8]
198);
199make_stats_iterator!(
200    MaxByteArrayStatsIterator,
201    max_bytes_opt,
202    ParquetStatistics::ByteArray,
203    [u8]
204);
205make_stats_iterator!(
206    MinFixedLenByteArrayStatsIterator,
207    min_bytes_opt,
208    ParquetStatistics::FixedLenByteArray,
209    [u8]
210);
211make_stats_iterator!(
212    MaxFixedLenByteArrayStatsIterator,
213    max_bytes_opt,
214    ParquetStatistics::FixedLenByteArray,
215    [u8]
216);
217
/// Generates an adapter iterator that extracts decimal statistics values from
/// an iterator of `Option<&ParquetStatistics>`.
///
/// Depending on how the parquet file was written, decimal statistics may be
/// stored as `Int32`, `Int64`, `ByteArray` or `FixedLenByteArray`. The
/// generated iterator handles all four variants:
/// * integer statistics are widened with `$stat_value_type::from`;
/// * byte statistics are decoded with `$convert_func`;
/// * any other variant, or statistics without a value, yields `None`.
///
/// Parameters:
/// * `$iterator_type`: name of the generated iterator type (e.g. `MinDecimal128StatsIterator`)
/// * `$func`: accessor for integer statistics (e.g. `min_opt` or `max_opt`)
/// * `$bytes_func`: accessor for byte statistics (e.g. `min_bytes_opt` or `max_bytes_opt`)
/// * `$stat_value_type`: the value type produced (e.g. `i128`)
/// * `$convert_func`: converts the raw bytes into `$stat_value_type` (e.g. `from_bytes_to_i128`)
macro_rules! make_decimal_stats_iterator {
    ($iterator_type:ident, $func:ident, $bytes_func:ident, $stat_value_type:ident, $convert_func: ident) => {
        struct $iterator_type<'a, I>
        where
            I: Iterator<Item = Option<&'a ParquetStatistics>>,
        {
            iter: I,
        }

        impl<'a, I> $iterator_type<'a, I>
        where
            I: Iterator<Item = Option<&'a ParquetStatistics>>,
        {
            /// Wraps `iter` in the adapter.
            fn new(iter: I) -> Self {
                Self { iter }
            }
        }

        impl<'a, I> Iterator for $iterator_type<'a, I>
        where
            I: Iterator<Item = Option<&'a ParquetStatistics>>,
        {
            type Item = Option<$stat_value_type>;

            /// Returns the next decoded value, or `None` once the input is exhausted.
            fn next(&mut self) -> Option<Self::Item> {
                let maybe_stats = self.iter.next()?;
                let value = maybe_stats.and_then(|stats| match stats {
                    ParquetStatistics::Int32(typed) => {
                        typed.$func().copied().map($stat_value_type::from)
                    }
                    ParquetStatistics::Int64(typed) => {
                        typed.$func().copied().map($stat_value_type::from)
                    }
                    ParquetStatistics::ByteArray(typed) => typed.$bytes_func().map($convert_func),
                    ParquetStatistics::FixedLenByteArray(typed) => {
                        typed.$bytes_func().map($convert_func)
                    }
                    _ => None,
                });
                Some(value)
            }

            fn size_hint(&self) -> (usize, Option<usize>) {
                self.iter.size_hint()
            }
        }
    };
}
284
285make_decimal_stats_iterator!(
286    MinDecimal128StatsIterator,
287    min_opt,
288    min_bytes_opt,
289    i128,
290    from_bytes_to_i128
291);
292make_decimal_stats_iterator!(
293    MaxDecimal128StatsIterator,
294    max_opt,
295    max_bytes_opt,
296    i128,
297    from_bytes_to_i128
298);
299make_decimal_stats_iterator!(
300    MinDecimal256StatsIterator,
301    min_opt,
302    min_bytes_opt,
303    i256,
304    from_bytes_to_i256
305);
306make_decimal_stats_iterator!(
307    MaxDecimal256StatsIterator,
308    max_opt,
309    max_bytes_opt,
310    i256,
311    from_bytes_to_i256
312);
313
/// Builds an arrow array of min or max row group statistics, dispatching on the
/// target arrow [`DataType`].
///
/// The [`mod@paste`] macro selects the matching `Min*`/`Max*` iterator type from
/// `$stat_type_prefix`, so the same code serves both min and max extraction.
///
/// Each arm evaluates to `Ok(array)`, with two exceptions:
/// * the decimal arms propagate a `with_precision_and_scale` error via `?`;
/// * the dictionary arm delegates to `min_statistics`/`max_statistics`, which
///   are presumably defined elsewhere in this file.
///
/// The macro must therefore be expanded inside a function returning a `Result`.
/// Data types whose statistics cannot be extracted produce an all-null array
/// with one entry per element of `$iterator`.
///
/// Parameters:
/// * `$stat_type_prefix`: prefix of the statistics iterator types (`Min` or `Max`)
/// * `$data_type`: `&DataType` of the arrow array to build (e.g. `DataType::Int32`)
/// * `$iterator`: iterator of `Option<&ParquetStatistics>` to extract the statistics from
/// * `$physical_type`: `Option` of the column's parquet physical type; only used
///   to tell whether `Date64` statistics are stored as `INT32` or `INT64`
macro_rules! get_statistics {
    ($stat_type_prefix: ident, $data_type: ident, $iterator: ident, $physical_type: ident) => {
        paste! {
        match $data_type {
            DataType::Boolean => Ok(Arc::new(BooleanArray::from_iter(
                [<$stat_type_prefix BooleanStatsIterator>]::new($iterator).map(|x| x.copied()),
            ))),
            // Narrow integer types are read from INT32 statistics; values that do
            // not fit the target type become null.
            DataType::Int8 => Ok(Arc::new(Int8Array::from_iter(
                [<$stat_type_prefix Int32StatsIterator>]::new($iterator).map(|x| {
                    x.and_then(|x| i8::try_from(*x).ok())
                }),
            ))),
            DataType::Int16 => Ok(Arc::new(Int16Array::from_iter(
                [<$stat_type_prefix Int32StatsIterator>]::new($iterator).map(|x| {
                    x.and_then(|x| i16::try_from(*x).ok())
                }),
            ))),
            DataType::Int32 => Ok(Arc::new(Int32Array::from_iter(
                [<$stat_type_prefix Int32StatsIterator>]::new($iterator).map(|x| x.copied()),
            ))),
            DataType::Int64 => Ok(Arc::new(Int64Array::from_iter(
                [<$stat_type_prefix Int64StatsIterator>]::new($iterator).map(|x| x.copied()),
            ))),
            DataType::UInt8 => Ok(Arc::new(UInt8Array::from_iter(
                [<$stat_type_prefix Int32StatsIterator>]::new($iterator).map(|x| {
                    x.and_then(|x| u8::try_from(*x).ok())
                }),
            ))),
            DataType::UInt16 => Ok(Arc::new(UInt16Array::from_iter(
                [<$stat_type_prefix Int32StatsIterator>]::new($iterator).map(|x| {
                    x.and_then(|x| u16::try_from(*x).ok())
                }),
            ))),
            // `as` reinterprets the stored signed bits as unsigned (no range check).
            DataType::UInt32 => Ok(Arc::new(UInt32Array::from_iter(
                [<$stat_type_prefix Int32StatsIterator>]::new($iterator).map(|x| x.map(|x| *x as u32)),
            ))),
            DataType::UInt64 => Ok(Arc::new(UInt64Array::from_iter(
                [<$stat_type_prefix Int64StatsIterator>]::new($iterator).map(|x| x.map(|x| *x as u64)),
            ))),
            // Float16 values come from fixed-length byte statistics; anything that
            // is not exactly two bytes becomes null (see `from_bytes_to_f16`).
            DataType::Float16 => Ok(Arc::new(Float16Array::from_iter(
                [<$stat_type_prefix FixedLenByteArrayStatsIterator>]::new($iterator).map(|x| x.and_then(|x| {
                    from_bytes_to_f16(x)
                })),
            ))),
            DataType::Float32 => Ok(Arc::new(Float32Array::from_iter(
                [<$stat_type_prefix FloatStatsIterator>]::new($iterator).map(|x| x.copied()),
            ))),
            DataType::Float64 => Ok(Arc::new(Float64Array::from_iter(
                [<$stat_type_prefix DoubleStatsIterator>]::new($iterator).map(|x| x.copied()),
            ))),
            DataType::Date32 => Ok(Arc::new(Date32Array::from_iter(
                [<$stat_type_prefix Int32StatsIterator>]::new($iterator).map(|x| x.copied()),
            ))),
            // INT32-backed Date64 statistics are scaled by milliseconds-per-day
            // (24 * 60 * 60 * 1000) to produce Date64 millisecond values.
            DataType::Date64 if $physical_type == Some(PhysicalType::INT32) => Ok(Arc::new(Date64Array::from_iter(
                [<$stat_type_prefix Int32StatsIterator>]::new($iterator)
                    .map(|x| x.map(|x| i64::from(*x) * 24 * 60 * 60 * 1000))))),
            DataType::Date64 if $physical_type == Some(PhysicalType::INT64) => Ok(Arc::new(Date64Array::from_iter(
                [<$stat_type_prefix Int64StatsIterator>]::new($iterator).map(|x| x.copied()),))),
            // Timestamp statistics are read as INT64 and tagged with the target
            // unit and timezone; no unit conversion is performed here.
            DataType::Timestamp(unit, timezone) =>{
                let iter = [<$stat_type_prefix Int64StatsIterator>]::new($iterator).map(|x| x.copied());
                Ok(match unit {
                    TimeUnit::Second => Arc::new(TimestampSecondArray::from_iter(iter).with_timezone_opt(timezone.clone())),
                    TimeUnit::Millisecond => Arc::new(TimestampMillisecondArray::from_iter(iter).with_timezone_opt(timezone.clone())),
                    TimeUnit::Microsecond => Arc::new(TimestampMicrosecondArray::from_iter(iter).with_timezone_opt(timezone.clone())),
                    TimeUnit::Nanosecond => Arc::new(TimestampNanosecondArray::from_iter(iter).with_timezone_opt(timezone.clone())),
                })
            },
            DataType::Time32(unit) => {
                Ok(match unit {
                    TimeUnit::Second =>  Arc::new(Time32SecondArray::from_iter(
                        [<$stat_type_prefix Int32StatsIterator>]::new($iterator).map(|x| x.copied()),
                    )),
                    TimeUnit::Millisecond => Arc::new(Time32MillisecondArray::from_iter(
                        [<$stat_type_prefix Int32StatsIterator>]::new($iterator).map(|x| x.copied()),
                    )),
                    _ => {
                        let len = $iterator.count();
                        // don't know how to extract statistics, so return a null array
                        new_null_array($data_type, len)
                    }
                })
            },
            DataType::Time64(unit) => {
                Ok(match unit {
                    TimeUnit::Microsecond =>  Arc::new(Time64MicrosecondArray::from_iter(
                        [<$stat_type_prefix Int64StatsIterator>]::new($iterator).map(|x| x.copied()),
                    )),
                    TimeUnit::Nanosecond => Arc::new(Time64NanosecondArray::from_iter(
                        [<$stat_type_prefix Int64StatsIterator>]::new($iterator).map(|x| x.copied()),
                    )),
                    _ => {
                        let len = $iterator.count();
                        // don't know how to extract statistics, so return a null array
                        new_null_array($data_type, len)
                    }
                })
            },
            DataType::Binary => Ok(Arc::new(BinaryArray::from_iter(
                [<$stat_type_prefix ByteArrayStatsIterator>]::new($iterator)
            ))),
            DataType::LargeBinary => Ok(Arc::new(LargeBinaryArray::from_iter(
                [<$stat_type_prefix ByteArrayStatsIterator>]::new($iterator)
            ))),
            // String arms: byte statistics that are not valid UTF-8 become null
            // rather than producing an error.
            DataType::Utf8 => {
                let iterator = [<$stat_type_prefix ByteArrayStatsIterator>]::new($iterator);
                let mut builder = StringBuilder::new();
                for x in iterator {
                    let Some(x) = x else {
                        builder.append_null(); // no statistics value
                        continue;
                    };

                    let Ok(x) = std::str::from_utf8(x) else {
                        builder.append_null();
                        continue;
                    };

                    builder.append_value(x);
                }
                Ok(Arc::new(builder.finish()))
            },
            DataType::LargeUtf8 => {
                let iterator = [<$stat_type_prefix ByteArrayStatsIterator>]::new($iterator);
                let mut builder = LargeStringBuilder::new();
                for x in iterator {
                    let Some(x) = x else {
                        builder.append_null(); // no statistics value
                        continue;
                    };

                    let Ok(x) = std::str::from_utf8(x) else {
                        builder.append_null();
                        continue;
                    };

                    builder.append_value(x);
                }
                Ok(Arc::new(builder.finish()))
            },
            DataType::FixedSizeBinary(size) => {
                let iterator = [<$stat_type_prefix FixedLenByteArrayStatsIterator>]::new($iterator);
                let mut builder = FixedSizeBinaryBuilder::new(*size);
                for x in iterator {
                    let Some(x) = x else {
                        builder.append_null(); // no statistics value
                        continue;
                    };

                    // ignore invalid values: a value whose length differs from
                    // `size` is appended as null instead
                    if x.len().try_into() != Ok(*size){
                        builder.append_null();
                        continue;
                    }

                    builder.append_value(x).expect("ensure to append successfully here, because size have been checked before");
                }
                Ok(Arc::new(builder.finish()))
            },
            DataType::Decimal128(precision, scale) => {
                let arr = Decimal128Array::from_iter(
                    [<$stat_type_prefix Decimal128StatsIterator>]::new($iterator)
                ).with_precision_and_scale(*precision, *scale)?;
                Ok(Arc::new(arr))
            },
            DataType::Decimal256(precision, scale) => {
                let arr = Decimal256Array::from_iter(
                    [<$stat_type_prefix Decimal256StatsIterator>]::new($iterator)
                ).with_precision_and_scale(*precision, *scale)?;
                Ok(Arc::new(arr))
            },
            // Dictionary statistics are extracted as the dictionary's value type by
            // calling `min_statistics` / `max_statistics` (name built by `paste`).
            DataType::Dictionary(_, value_type) => {
                [<$stat_type_prefix:lower _ statistics>](value_type, $iterator, $physical_type)
            },
            DataType::Utf8View => {
                let iterator = [<$stat_type_prefix ByteArrayStatsIterator>]::new($iterator);
                let mut builder = StringViewBuilder::new();
                for x in iterator {
                    let Some(x) = x else {
                        builder.append_null(); // no statistics value
                        continue;
                    };

                    let Ok(x) = std::str::from_utf8(x) else {
                        builder.append_null();
                        continue;
                    };

                    builder.append_value(x);
                }
                Ok(Arc::new(builder.finish()))
            },
            DataType::BinaryView => {
                let iterator = [<$stat_type_prefix ByteArrayStatsIterator>]::new($iterator);
                let mut builder = BinaryViewBuilder::new();
                for x in iterator {
                    let Some(x) = x else {
                        builder.append_null(); // no statistics value
                        continue;
                    };

                    builder.append_value(x);
                }
                Ok(Arc::new(builder.finish()))
            }

            // Unsupported types: emit an all-null array whose length matches the
            // number of input statistics, so callers still get one entry per input.
            DataType::Map(_,_) |
            DataType::Duration(_) |
            DataType::Interval(_) |
            DataType::Date64 |  // required to cover $physical_type match guard
            DataType::Null |
            DataType::List(_) |
            DataType::ListView(_) |
            DataType::FixedSizeList(_, _) |
            DataType::LargeList(_) |
            DataType::LargeListView(_) |
            DataType::Struct(_) |
            DataType::Union(_, _) |
            DataType::RunEndEncoded(_, _) => {
                let len = $iterator.count();
                // don't know how to extract statistics, so return a null array
                Ok(new_null_array($data_type, len))
            }
        }}}
}
545
/// Generates an adapter iterator over `(len, &Index)` pairs that extracts one
/// statistics value per data page.
///
/// Each input pair produces a `Vec` of per-page values:
/// * if the `Index` is the `$index_type` variant, `$func` is applied to every
///   `PageIndex` it contains;
/// * otherwise no statistics can be extracted and `vec![None; len]` is produced.
///
/// Parameters:
/// * `$iterator_type`: name of the generated iterator type (e.g. `MinInt32DataPageStatsIterator`)
/// * `$func`: closure mapping a `&PageIndex<_>` to `Option<$stat_value_type>`
/// * `$index_type`: the `Index` variant to match (e.g. `Index::INT32`)
/// * `$stat_value_type`: type of the extracted value (e.g. `i32`)
macro_rules! make_data_page_stats_iterator {
    ($iterator_type: ident, $func: expr, $index_type: path, $stat_value_type: ty) => {
        struct $iterator_type<'a, I>
        where
            I: Iterator<Item = (usize, &'a Index)>,
        {
            iter: I,
        }

        impl<'a, I> $iterator_type<'a, I>
        where
            I: Iterator<Item = (usize, &'a Index)>,
        {
            /// Wraps `iter` in the adapter.
            fn new(iter: I) -> Self {
                Self { iter }
            }
        }

        impl<'a, I> Iterator for $iterator_type<'a, I>
        where
            I: Iterator<Item = (usize, &'a Index)>,
        {
            type Item = Vec<Option<$stat_value_type>>;

            /// Returns the per-page values for the next column chunk, or `None`
            /// once the input is exhausted.
            fn next(&mut self) -> Option<Self::Item> {
                let (len, index) = self.iter.next()?;
                let page_values = if let $index_type(native_index) = index {
                    native_index.indexes.iter().map($func).collect::<Vec<_>>()
                } else {
                    // No matching `Index` variant, so nothing can be extracted.
                    // Emit `len` nulls (`len` being the number of entries in
                    // `ParquetOffsetIndex` for this row group and column) so the
                    // resulting arrow array is a null array of the right length.
                    vec![None; len]
                };
                Some(page_values)
            }

            fn size_hint(&self) -> (usize, Option<usize>) {
                self.iter.size_hint()
            }
        }
    };
}
595
596make_data_page_stats_iterator!(
597    MinBooleanDataPageStatsIterator,
598    |x: &PageIndex<bool>| { x.min },
599    Index::BOOLEAN,
600    bool
601);
602make_data_page_stats_iterator!(
603    MaxBooleanDataPageStatsIterator,
604    |x: &PageIndex<bool>| { x.max },
605    Index::BOOLEAN,
606    bool
607);
608make_data_page_stats_iterator!(
609    MinInt32DataPageStatsIterator,
610    |x: &PageIndex<i32>| { x.min },
611    Index::INT32,
612    i32
613);
614make_data_page_stats_iterator!(
615    MaxInt32DataPageStatsIterator,
616    |x: &PageIndex<i32>| { x.max },
617    Index::INT32,
618    i32
619);
620make_data_page_stats_iterator!(
621    MinInt64DataPageStatsIterator,
622    |x: &PageIndex<i64>| { x.min },
623    Index::INT64,
624    i64
625);
626make_data_page_stats_iterator!(
627    MaxInt64DataPageStatsIterator,
628    |x: &PageIndex<i64>| { x.max },
629    Index::INT64,
630    i64
631);
632make_data_page_stats_iterator!(
633    MinFloat16DataPageStatsIterator,
634    |x: &PageIndex<FixedLenByteArray>| { x.min.clone() },
635    Index::FIXED_LEN_BYTE_ARRAY,
636    FixedLenByteArray
637);
638make_data_page_stats_iterator!(
639    MaxFloat16DataPageStatsIterator,
640    |x: &PageIndex<FixedLenByteArray>| { x.max.clone() },
641    Index::FIXED_LEN_BYTE_ARRAY,
642    FixedLenByteArray
643);
644make_data_page_stats_iterator!(
645    MinFloat32DataPageStatsIterator,
646    |x: &PageIndex<f32>| { x.min },
647    Index::FLOAT,
648    f32
649);
650make_data_page_stats_iterator!(
651    MaxFloat32DataPageStatsIterator,
652    |x: &PageIndex<f32>| { x.max },
653    Index::FLOAT,
654    f32
655);
656make_data_page_stats_iterator!(
657    MinFloat64DataPageStatsIterator,
658    |x: &PageIndex<f64>| { x.min },
659    Index::DOUBLE,
660    f64
661);
662make_data_page_stats_iterator!(
663    MaxFloat64DataPageStatsIterator,
664    |x: &PageIndex<f64>| { x.max },
665    Index::DOUBLE,
666    f64
667);
668make_data_page_stats_iterator!(
669    MinByteArrayDataPageStatsIterator,
670    |x: &PageIndex<ByteArray>| { x.min.clone() },
671    Index::BYTE_ARRAY,
672    ByteArray
673);
674make_data_page_stats_iterator!(
675    MaxByteArrayDataPageStatsIterator,
676    |x: &PageIndex<ByteArray>| { x.max.clone() },
677    Index::BYTE_ARRAY,
678    ByteArray
679);
680make_data_page_stats_iterator!(
681    MaxFixedLenByteArrayDataPageStatsIterator,
682    |x: &PageIndex<FixedLenByteArray>| { x.max.clone() },
683    Index::FIXED_LEN_BYTE_ARRAY,
684    FixedLenByteArray
685);
686
687make_data_page_stats_iterator!(
688    MinFixedLenByteArrayDataPageStatsIterator,
689    |x: &PageIndex<FixedLenByteArray>| { x.min.clone() },
690    Index::FIXED_LEN_BYTE_ARRAY,
691    FixedLenByteArray
692);
693
/// Generates an adapter iterator over `(len, &Index)` pairs that extracts
/// per-page decimal statistics.
///
/// Depending on the parquet file, a decimal column's page index may be stored
/// as `INT32`, `INT64`, `BYTE_ARRAY` or `FIXED_LEN_BYTE_ARRAY`:
/// * integer values are widened with `$stat_value_type::from`;
/// * byte values are decoded with `$convert_func`;
/// * any other `Index` variant yields `vec![None; len]`, because no statistics
///   can be extracted from it.
///
/// Parameters:
/// * `$iterator_type`: name of the generated iterator type (e.g. `MinDecimal128DataPageStatsIterator`)
/// * `$func`: the `PageIndex` field to read (`min` or `max`)
/// * `$stat_value_type`: the value type produced (e.g. `i128`)
/// * `$convert_func`: converts raw bytes into `$stat_value_type` (e.g. `from_bytes_to_i128`)
macro_rules! get_decimal_page_stats_iterator {
    ($iterator_type: ident, $func: ident, $stat_value_type: ident, $convert_func: ident) => {
        struct $iterator_type<'a, I>
        where
            I: Iterator<Item = (usize, &'a Index)>,
        {
            iter: I,
        }

        impl<'a, I> $iterator_type<'a, I>
        where
            I: Iterator<Item = (usize, &'a Index)>,
        {
            /// Wraps `iter` in the adapter.
            fn new(iter: I) -> Self {
                Self { iter }
            }
        }

        impl<'a, I> Iterator for $iterator_type<'a, I>
        where
            I: Iterator<Item = (usize, &'a Index)>,
        {
            type Item = Vec<Option<$stat_value_type>>;

            /// Returns the per-page decimal values for the next column chunk, or
            /// `None` once the input is exhausted.
            fn next(&mut self) -> Option<Self::Item> {
                let (len, index) = self.iter.next()?;
                let values = match index {
                    Index::INT32(native_index) => native_index
                        .indexes
                        .iter()
                        .map(|page| page.$func.map($stat_value_type::from))
                        .collect::<Vec<_>>(),
                    Index::INT64(native_index) => native_index
                        .indexes
                        .iter()
                        .map(|page| page.$func.map($stat_value_type::from))
                        .collect::<Vec<_>>(),
                    // Borrow the stored bytes instead of cloning the whole
                    // `PageIndex` (which would copy both the `min` and `max`
                    // values) just to read one of them.
                    Index::BYTE_ARRAY(native_index) => native_index
                        .indexes
                        .iter()
                        .map(|page| page.$func.as_ref().map(|v| $convert_func(v.data())))
                        .collect::<Vec<_>>(),
                    Index::FIXED_LEN_BYTE_ARRAY(native_index) => native_index
                        .indexes
                        .iter()
                        .map(|page| page.$func.as_ref().map(|v| $convert_func(v.data())))
                        .collect::<Vec<_>>(),
                    // No decimal-compatible index: one null per expected entry.
                    _ => vec![None; len],
                };
                Some(values)
            }

            fn size_hint(&self) -> (usize, Option<usize>) {
                self.iter.size_hint()
            }
        }
    };
}
766
767get_decimal_page_stats_iterator!(
768    MinDecimal128DataPageStatsIterator,
769    min,
770    i128,
771    from_bytes_to_i128
772);
773
774get_decimal_page_stats_iterator!(
775    MaxDecimal128DataPageStatsIterator,
776    max,
777    i128,
778    from_bytes_to_i128
779);
780
781get_decimal_page_stats_iterator!(
782    MinDecimal256DataPageStatsIterator,
783    min,
784    i256,
785    from_bytes_to_i256
786);
787
788get_decimal_page_stats_iterator!(
789    MaxDecimal256DataPageStatsIterator,
790    max,
791    i256,
792    from_bytes_to_i256
793);
794
// Builds the arrow array of min or max data page statistics for `$data_type`.
//
// * `$stat_type_prefix` is `Min` or `Max` and selects the matching
//   `<prefix><Type>DataPageStatsIterator` (and, for dictionaries, the matching
//   `min_page_statistics` / `max_page_statistics` function).
// * `$iterator` yields one `(num_data_pages, &Index)` pair per row group.
// * `$physical_type` is the parquet physical type of the column; it decides
//   how `Date64` statistics are read.
//
// The resulting array is expected to contain one element per data page, i.e.
// the sum of all `num_data_pages` values yielded by `$iterator`.
macro_rules! get_data_page_statistics {
    ($stat_type_prefix: ident, $data_type: ident, $iterator: ident, $physical_type: ident) => {
        paste! {
            match $data_type {
                DataType::Boolean => {
                    let iterator = [<$stat_type_prefix BooleanDataPageStatsIterator>]::new($iterator);
                    let mut builder = BooleanBuilder::new();
                    for x in iterator {
                        for x in x.into_iter() {
                            let Some(x) = x else {
                                builder.append_null(); // no statistics value
                                continue;
                            };
                            builder.append_value(x);
                        }
                    }
                    Ok(Arc::new(builder.finish()))
                },
                DataType::UInt8 => Ok(Arc::new(
                    UInt8Array::from_iter(
                        [<$stat_type_prefix Int32DataPageStatsIterator>]::new($iterator)
                            .flat_map(|x| {
                                x.into_iter().map(|x| {
                                    x.and_then(|x| u8::try_from(x).ok())
                                })
                            })
                    )
                )),
                DataType::UInt16 => Ok(Arc::new(
                    UInt16Array::from_iter(
                        [<$stat_type_prefix Int32DataPageStatsIterator>]::new($iterator)
                            .flat_map(|x| {
                                x.into_iter().map(|x| {
                                    x.and_then(|x| u16::try_from(x).ok())
                                })
                            })
                    )
                )),
                DataType::UInt32 => Ok(Arc::new(
                    UInt32Array::from_iter(
                        [<$stat_type_prefix Int32DataPageStatsIterator>]::new($iterator)
                            .flat_map(|x| {
                                // values are read as INT32; `as` reinterprets the bits
                                x.into_iter().map(|x| x.map(|x| x as u32))
                            })
                    )
                )),
                DataType::UInt64 => Ok(Arc::new(
                    UInt64Array::from_iter(
                        [<$stat_type_prefix Int64DataPageStatsIterator>]::new($iterator)
                            .flat_map(|x| {
                                // values are read as INT64; `as` reinterprets the bits
                                x.into_iter().map(|x| x.map(|x| x as u64))
                            })
                    )
                )),
                DataType::Int8 => Ok(Arc::new(
                    Int8Array::from_iter(
                        [<$stat_type_prefix Int32DataPageStatsIterator>]::new($iterator)
                            .flat_map(|x| {
                                x.into_iter().map(|x| {
                                    x.and_then(|x| i8::try_from(x).ok())
                                })
                            })
                    )
                )),
                DataType::Int16 => Ok(Arc::new(
                    Int16Array::from_iter(
                        [<$stat_type_prefix Int32DataPageStatsIterator>]::new($iterator)
                            .flat_map(|x| {
                                x.into_iter().map(|x| {
                                    x.and_then(|x| i16::try_from(x).ok())
                                })
                            })
                    )
                )),
                DataType::Int32 => Ok(Arc::new(Int32Array::from_iter([<$stat_type_prefix Int32DataPageStatsIterator>]::new($iterator).flatten()))),
                DataType::Int64 => Ok(Arc::new(Int64Array::from_iter([<$stat_type_prefix Int64DataPageStatsIterator>]::new($iterator).flatten()))),
                DataType::Float16 => Ok(Arc::new(
                    Float16Array::from_iter(
                        [<$stat_type_prefix Float16DataPageStatsIterator>]::new($iterator)
                            .flat_map(|x| {
                                x.into_iter().map(|x| {
                                    x.and_then(|x| from_bytes_to_f16(x.data()))
                                })
                            })
                    )
                )),
                DataType::Float32 => Ok(Arc::new(Float32Array::from_iter([<$stat_type_prefix Float32DataPageStatsIterator>]::new($iterator).flatten()))),
                DataType::Float64 => Ok(Arc::new(Float64Array::from_iter([<$stat_type_prefix Float64DataPageStatsIterator>]::new($iterator).flatten()))),
                DataType::Binary => Ok(Arc::new(BinaryArray::from_iter([<$stat_type_prefix ByteArrayDataPageStatsIterator>]::new($iterator).flatten()))),
                DataType::LargeBinary => Ok(Arc::new(LargeBinaryArray::from_iter([<$stat_type_prefix ByteArrayDataPageStatsIterator>]::new($iterator).flatten()))),
                DataType::Utf8 => {
                    let mut builder = StringBuilder::new();
                    let iterator = [<$stat_type_prefix ByteArrayDataPageStatsIterator>]::new($iterator);
                    for x in iterator {
                        for x in x.into_iter() {
                            let Some(x) = x else {
                                builder.append_null(); // no statistics value
                                continue;
                            };

                            let Ok(x) = std::str::from_utf8(x.data()) else {
                                builder.append_null();
                                continue;
                            };

                            builder.append_value(x);
                        }
                    }
                    Ok(Arc::new(builder.finish()))
                },
                DataType::LargeUtf8 => {
                    let mut builder = LargeStringBuilder::new();
                    let iterator = [<$stat_type_prefix ByteArrayDataPageStatsIterator>]::new($iterator);
                    for x in iterator {
                        for x in x.into_iter() {
                            let Some(x) = x else {
                                builder.append_null(); // no statistics value
                                continue;
                            };

                            let Ok(x) = std::str::from_utf8(x.data()) else {
                                builder.append_null();
                                continue;
                            };

                            builder.append_value(x);
                        }
                    }
                    Ok(Arc::new(builder.finish()))
                },
                DataType::Dictionary(_, value_type) => {
                    [<$stat_type_prefix:lower _ page_statistics>](value_type, $iterator, $physical_type)
                },
                DataType::Timestamp(unit, timezone) => {
                    let iter = [<$stat_type_prefix Int64DataPageStatsIterator>]::new($iterator).flatten();
                    Ok(match unit {
                        TimeUnit::Second => Arc::new(TimestampSecondArray::from_iter(iter).with_timezone_opt(timezone.clone())),
                        TimeUnit::Millisecond => Arc::new(TimestampMillisecondArray::from_iter(iter).with_timezone_opt(timezone.clone())),
                        TimeUnit::Microsecond => Arc::new(TimestampMicrosecondArray::from_iter(iter).with_timezone_opt(timezone.clone())),
                        TimeUnit::Nanosecond => Arc::new(TimestampNanosecondArray::from_iter(iter).with_timezone_opt(timezone.clone())),
                    })
                },
                DataType::Date32 => Ok(Arc::new(Date32Array::from_iter([<$stat_type_prefix Int32DataPageStatsIterator>]::new($iterator).flatten()))),
                DataType::Date64 if $physical_type == Some(PhysicalType::INT32) => Ok(
                    Arc::new(
                        Date64Array::from_iter([<$stat_type_prefix Int32DataPageStatsIterator>]::new($iterator)
                            .flat_map(|x| {
                                // INT32 day counts scaled to milliseconds (86_400_000 ms per day);
                                // widening to i64 first so the multiplication cannot overflow
                                x.into_iter()
                                    .map(|x| x.map(|days| i64::from(days) * 24 * 60 * 60 * 1000))
                            })
                        )
                    )
                ),
                DataType::Date64 if $physical_type == Some(PhysicalType::INT64) => Ok(Arc::new(Date64Array::from_iter([<$stat_type_prefix Int64DataPageStatsIterator>]::new($iterator).flatten()))),
                DataType::Decimal128(precision, scale) => Ok(Arc::new(
                    Decimal128Array::from_iter([<$stat_type_prefix Decimal128DataPageStatsIterator>]::new($iterator).flatten()).with_precision_and_scale(*precision, *scale)?)),
                DataType::Decimal256(precision, scale) => Ok(Arc::new(
                    Decimal256Array::from_iter([<$stat_type_prefix Decimal256DataPageStatsIterator>]::new($iterator).flatten()).with_precision_and_scale(*precision, *scale)?)),
                DataType::Time32(unit) => {
                    Ok(match unit {
                        TimeUnit::Second =>  Arc::new(Time32SecondArray::from_iter(
                            [<$stat_type_prefix Int32DataPageStatsIterator>]::new($iterator).flatten(),
                        )),
                        TimeUnit::Millisecond => Arc::new(Time32MillisecondArray::from_iter(
                            [<$stat_type_prefix Int32DataPageStatsIterator>]::new($iterator).flatten(),
                        )),
                        _ => {
                            // don't know how to extract statistics, so return an empty array
                            new_empty_array(&DataType::Time32(unit.clone()))
                        }
                    })
                }
                DataType::Time64(unit) => {
                    Ok(match unit {
                        TimeUnit::Microsecond =>  Arc::new(Time64MicrosecondArray::from_iter(
                            [<$stat_type_prefix Int64DataPageStatsIterator>]::new($iterator).flatten(),
                        )),
                        TimeUnit::Nanosecond => Arc::new(Time64NanosecondArray::from_iter(
                            [<$stat_type_prefix Int64DataPageStatsIterator>]::new($iterator).flatten(),
                        )),
                        _ => {
                            // don't know how to extract statistics, so return an empty array
                            new_empty_array(&DataType::Time64(unit.clone()))
                        }
                    })
                },
                DataType::FixedSizeBinary(size) => {
                    let mut builder = FixedSizeBinaryBuilder::new(*size);
                    let iterator = [<$stat_type_prefix FixedLenByteArrayDataPageStatsIterator>]::new($iterator);
                    for x in iterator {
                        for x in x.into_iter() {
                            let Some(x) = x else {
                                builder.append_null(); // no statistics value
                                continue;
                            };

                            if x.len() == *size as usize {
                                let _ = builder.append_value(x.data());
                            } else {
                                builder.append_null();
                            }
                        }
                    }
                    Ok(Arc::new(builder.finish()))
                },
                DataType::Utf8View => {
                    let mut builder = StringViewBuilder::new();
                    let iterator = [<$stat_type_prefix ByteArrayDataPageStatsIterator>]::new($iterator);
                    for x in iterator {
                        for x in x.into_iter() {
                            let Some(x) = x else {
                                builder.append_null(); // no statistics value
                                continue;
                            };

                            let Ok(x) = std::str::from_utf8(x.data()) else {
                                builder.append_null();
                                continue;
                            };

                            builder.append_value(x);
                        }
                    }
                    Ok(Arc::new(builder.finish()))
                },
                DataType::BinaryView => {
                    let mut builder = BinaryViewBuilder::new();
                    let iterator = [<$stat_type_prefix ByteArrayDataPageStatsIterator>]::new($iterator);
                    for x in iterator {
                        for x in x.into_iter() {
                            let Some(x) = x else {
                                builder.append_null(); // no statistics value
                                continue;
                            };

                            builder.append_value(x);
                        }
                    }
                    Ok(Arc::new(builder.finish()))
                },
                DataType::Date64 |  // required to cover $physical_type match guard
                DataType::Null |
                DataType::Duration(_) |
                DataType::Interval(_) |
                DataType::List(_) |
                DataType::ListView(_) |
                DataType::FixedSizeList(_, _) |
                DataType::LargeList(_) |
                DataType::LargeListView(_) |
                DataType::Struct(_) |
                DataType::Union(_, _) |
                DataType::Map(_, _) |
                DataType::RunEndEncoded(_, _) => {
                    // don't know how to extract statistics, so return a null array.
                    // `$iterator` yields one `(num_data_pages, index)` pair per row
                    // group; summing the page counts gives one entry per data page,
                    // the same length the other arms produce (counting the pairs
                    // would give one entry per row group instead).
                    let len = $iterator.map(|(num_pages, _)| num_pages).sum::<usize>();
                    Ok(new_null_array($data_type, len))
                },
            }
        }
    }
}
1068/// Extracts the min statistics from an iterator of [`ParquetStatistics`] to an
1069/// [`ArrayRef`]
1070///
1071/// This is an internal helper -- see [`StatisticsConverter`] for public API
1072fn min_statistics<'a, I: Iterator<Item = Option<&'a ParquetStatistics>>>(
1073    data_type: &DataType,
1074    iterator: I,
1075    physical_type: Option<PhysicalType>,
1076) -> Result<ArrayRef> {
1077    get_statistics!(Min, data_type, iterator, physical_type)
1078}
1079
1080/// Extracts the max statistics from an iterator of [`ParquetStatistics`] to an [`ArrayRef`]
1081///
1082/// This is an internal helper -- see [`StatisticsConverter`] for public API
1083fn max_statistics<'a, I: Iterator<Item = Option<&'a ParquetStatistics>>>(
1084    data_type: &DataType,
1085    iterator: I,
1086    physical_type: Option<PhysicalType>,
1087) -> Result<ArrayRef> {
1088    get_statistics!(Max, data_type, iterator, physical_type)
1089}
1090
1091/// Extracts the min statistics from an iterator
1092/// of parquet page [`Index`]'es to an [`ArrayRef`]
1093pub(crate) fn min_page_statistics<'a, I>(
1094    data_type: &DataType,
1095    iterator: I,
1096    physical_type: Option<PhysicalType>,
1097) -> Result<ArrayRef>
1098where
1099    I: Iterator<Item = (usize, &'a Index)>,
1100{
1101    get_data_page_statistics!(Min, data_type, iterator, physical_type)
1102}
1103
1104/// Extracts the max statistics from an iterator
1105/// of parquet page [`Index`]'es to an [`ArrayRef`]
1106pub(crate) fn max_page_statistics<'a, I>(
1107    data_type: &DataType,
1108    iterator: I,
1109    physical_type: Option<PhysicalType>,
1110) -> Result<ArrayRef>
1111where
1112    I: Iterator<Item = (usize, &'a Index)>,
1113{
1114    get_data_page_statistics!(Max, data_type, iterator, physical_type)
1115}
1116
1117/// Extracts the null count statistics from an iterator
1118/// of parquet page [`Index`]'es to an [`ArrayRef`]
1119///
1120/// The returned Array is an [`UInt64Array`]
1121pub(crate) fn null_counts_page_statistics<'a, I>(iterator: I) -> Result<UInt64Array>
1122where
1123    I: Iterator<Item = (usize, &'a Index)>,
1124{
1125    let iter = iterator.flat_map(|(len, index)| match index {
1126        Index::NONE => vec![None; len],
1127        Index::BOOLEAN(native_index) => native_index
1128            .indexes
1129            .iter()
1130            .map(|x| x.null_count.map(|x| x as u64))
1131            .collect::<Vec<_>>(),
1132        Index::INT32(native_index) => native_index
1133            .indexes
1134            .iter()
1135            .map(|x| x.null_count.map(|x| x as u64))
1136            .collect::<Vec<_>>(),
1137        Index::INT64(native_index) => native_index
1138            .indexes
1139            .iter()
1140            .map(|x| x.null_count.map(|x| x as u64))
1141            .collect::<Vec<_>>(),
1142        Index::FLOAT(native_index) => native_index
1143            .indexes
1144            .iter()
1145            .map(|x| x.null_count.map(|x| x as u64))
1146            .collect::<Vec<_>>(),
1147        Index::DOUBLE(native_index) => native_index
1148            .indexes
1149            .iter()
1150            .map(|x| x.null_count.map(|x| x as u64))
1151            .collect::<Vec<_>>(),
1152        Index::FIXED_LEN_BYTE_ARRAY(native_index) => native_index
1153            .indexes
1154            .iter()
1155            .map(|x| x.null_count.map(|x| x as u64))
1156            .collect::<Vec<_>>(),
1157        Index::BYTE_ARRAY(native_index) => native_index
1158            .indexes
1159            .iter()
1160            .map(|x| x.null_count.map(|x| x as u64))
1161            .collect::<Vec<_>>(),
1162        _ => unimplemented!(),
1163    });
1164
1165    Ok(UInt64Array::from_iter(iter))
1166}
1167
/// Extracts Parquet statistics as Arrow arrays
///
/// This is used to convert Parquet statistics to Arrow [`ArrayRef`], with
/// proper type conversions. This information can be used for pruning Parquet
/// files, row groups, and data pages based on the statistics embedded in
/// Parquet metadata.
///
/// # Schemas
///
/// The converter uses the schema of the Parquet file and the Arrow schema to
/// convert the underlying statistics value (stored as a parquet value) into the
/// corresponding Arrow value. For example, Decimals are stored as binary in
/// parquet files and this structure handles mapping them to the `i128`
/// representation used in Arrow.
///
/// Note: The Parquet schema and Arrow schema do not have to be identical (for
/// example, the columns may be in different orders and either schema may have
/// additional columns). The function [`parquet_column`] is used to match the
/// column in the Parquet schema to the column in the Arrow schema.
#[derive(Debug)]
pub struct StatisticsConverter<'a> {
    /// the index of the matched column in the Parquet schema, or `None` if the
    /// Arrow column has no counterpart in the Parquet file
    parquet_column_index: Option<usize>,
    /// The field (with data type) of the column in the Arrow schema
    arrow_field: &'a Field,
    /// treat missing null_counts as 0 nulls (`try_new` sets this to `true`)
    missing_null_counts_as_zero: bool,
    /// The physical type of the matched column in the Parquet schema, or
    /// `None` when `parquet_column_index` is `None`
    physical_type: Option<PhysicalType>,
}
1198
1199impl<'a> StatisticsConverter<'a> {
1200    /// Return the index of the column in the Parquet schema, if any
1201    ///
1202    /// Returns `None` if the column is was present in the Arrow schema, but not
1203    /// present in the parquet file
1204    pub fn parquet_column_index(&self) -> Option<usize> {
1205        self.parquet_column_index
1206    }
1207
1208    /// Return the arrow schema's [`Field]` of the column in the Arrow schema
1209    pub fn arrow_field(&self) -> &'a Field {
1210        self.arrow_field
1211    }
1212
1213    /// Set the statistics converter to treat missing null counts as missing
1214    ///
1215    /// By default, the converter will treat missing null counts as though
1216    /// the null count is known to be `0`.
1217    ///
1218    /// Note that parquet files written by parquet-rs currently do not store
1219    /// null counts even when it is known there are zero nulls, and the reader
1220    /// will return 0 for the null counts in that instance. This behavior may
1221    /// change in a future release.
1222    ///
1223    /// Both parquet-java and parquet-cpp store null counts as 0 when there are
1224    /// no nulls, and don't write unknown values to the null count field.
1225    pub fn with_missing_null_counts_as_zero(mut self, missing_null_counts_as_zero: bool) -> Self {
1226        self.missing_null_counts_as_zero = missing_null_counts_as_zero;
1227        self
1228    }
1229
1230    /// Returns a [`UInt64Array`] with row counts for each row group
1231    ///
1232    /// # Return Value
1233    ///
1234    /// The returned array has no nulls, and has one value for each row group.
1235    /// Each value is the number of rows in the row group.
1236    ///
1237    /// # Example
1238    /// ```no_run
1239    /// # use arrow::datatypes::Schema;
1240    /// # use arrow_array::{ArrayRef, UInt64Array};
1241    /// # use parquet::arrow::arrow_reader::statistics::StatisticsConverter;
1242    /// # use parquet::file::metadata::ParquetMetaData;
1243    /// # fn get_parquet_metadata() -> ParquetMetaData { unimplemented!() }
1244    /// # fn get_arrow_schema() -> Schema { unimplemented!() }
1245    /// // Given the metadata for a parquet file and the arrow schema
1246    /// let metadata: ParquetMetaData = get_parquet_metadata();
1247    /// let arrow_schema: Schema = get_arrow_schema();
1248    /// let parquet_schema = metadata.file_metadata().schema_descr();
1249    /// // create a converter
1250    /// let converter = StatisticsConverter::try_new("foo", &arrow_schema, parquet_schema)
1251    ///   .unwrap();
1252    /// // get the row counts for each row group
1253    /// let row_counts = converter.row_group_row_counts(metadata
1254    ///   .row_groups()
1255    ///   .iter()
1256    /// ).unwrap();
1257    /// // file had 2 row groups, with 1024 and 23 rows respectively
1258    /// assert_eq!(row_counts, Some(UInt64Array::from(vec![1024, 23])));
1259    /// ```
1260    pub fn row_group_row_counts<I>(&self, metadatas: I) -> Result<Option<UInt64Array>>
1261    where
1262        I: IntoIterator<Item = &'a RowGroupMetaData>,
1263    {
1264        let Some(_) = self.parquet_column_index else {
1265            return Ok(None);
1266        };
1267
1268        let mut builder = UInt64Array::builder(10);
1269        for metadata in metadatas.into_iter() {
1270            let row_count = metadata.num_rows();
1271            let row_count: u64 = row_count.try_into().map_err(|e| {
1272                arrow_err!(format!(
1273                    "Parquet row count {row_count} too large to convert to u64: {e}"
1274                ))
1275            })?;
1276            builder.append_value(row_count);
1277        }
1278        Ok(Some(builder.finish()))
1279    }
1280
1281    /// Create a new `StatisticsConverter` to extract statistics for a column
1282    ///
1283    /// Note if there is no corresponding column in the parquet file, the returned
1284    /// arrays will be null. This can happen if the column is in the arrow
1285    /// schema but not in the parquet schema due to schema evolution.
1286    ///
1287    /// See example on [`Self::row_group_mins`] for usage
1288    ///
1289    /// # Errors
1290    ///
1291    /// * If the column is not found in the arrow schema
1292    pub fn try_new<'b>(
1293        column_name: &'b str,
1294        arrow_schema: &'a Schema,
1295        parquet_schema: &'a SchemaDescriptor,
1296    ) -> Result<Self> {
1297        // ensure the requested column is in the arrow schema
1298        let Some((_idx, arrow_field)) = arrow_schema.column_with_name(column_name) else {
1299            return Err(arrow_err!(format!(
1300                "Column '{}' not found in schema for statistics conversion",
1301                column_name
1302            )));
1303        };
1304
1305        // find the column in the parquet schema, if not, return a null array
1306        let parquet_index = match parquet_column(parquet_schema, arrow_schema, column_name) {
1307            Some((parquet_idx, matched_field)) => {
1308                // sanity check that matching field matches the arrow field
1309                if matched_field.as_ref() != arrow_field {
1310                    return Err(arrow_err!(format!(
1311                        "Matched column '{:?}' does not match original matched column '{:?}'",
1312                        matched_field, arrow_field
1313                    )));
1314                }
1315                Some(parquet_idx)
1316            }
1317            None => None,
1318        };
1319
1320        Ok(Self {
1321            parquet_column_index: parquet_index,
1322            arrow_field,
1323            missing_null_counts_as_zero: true,
1324            physical_type: parquet_index.map(|idx| parquet_schema.column(idx).physical_type()),
1325        })
1326    }
1327
1328    /// Extract the minimum values from row group statistics in [`RowGroupMetaData`]
1329    ///
1330    /// # Return Value
1331    ///
1332    /// The returned array contains 1 value for each row group, in the same order as `metadatas`
1333    ///
1334    /// Each value is either
1335    /// * the minimum value for the column
1336    /// * a null value, if the statistics can not be extracted
1337    ///
1338    /// Note that a null value does NOT mean the min value was actually
1339    /// `null` it means it the requested statistic is unknown
1340    ///
1341    /// # Errors
1342    ///
1343    /// Reasons for not being able to extract the statistics include:
1344    /// * the column is not present in the parquet file
1345    /// * statistics for the column are not present in the row group
1346    /// * the stored statistic value can not be converted to the requested type
1347    ///
1348    /// # Example
1349    /// ```no_run
1350    /// # use std::sync::Arc;
1351    /// # use arrow::datatypes::Schema;
1352    /// # use arrow_array::{ArrayRef, Float64Array};
1353    /// # use parquet::arrow::arrow_reader::statistics::StatisticsConverter;
1354    /// # use parquet::file::metadata::ParquetMetaData;
1355    /// # fn get_parquet_metadata() -> ParquetMetaData { unimplemented!() }
1356    /// # fn get_arrow_schema() -> Schema { unimplemented!() }
1357    /// // Given the metadata for a parquet file and the arrow schema
1358    /// let metadata: ParquetMetaData = get_parquet_metadata();
1359    /// let arrow_schema: Schema = get_arrow_schema();
1360    /// let parquet_schema = metadata.file_metadata().schema_descr();
1361    /// // create a converter
1362    /// let converter = StatisticsConverter::try_new("foo", &arrow_schema, parquet_schema)
1363    ///   .unwrap();
1364    /// // get the minimum value for the column "foo" in the parquet file
1365    /// let min_values: ArrayRef = converter
1366    ///   .row_group_mins(metadata.row_groups().iter())
1367    ///   .unwrap();
1368    /// // if "foo" is a Float64 value, the returned array will contain Float64 values
1369    /// assert_eq!(min_values, Arc::new(Float64Array::from(vec![Some(1.0), Some(2.0)])) as _);
1370    /// ```
1371    pub fn row_group_mins<I>(&self, metadatas: I) -> Result<ArrayRef>
1372    where
1373        I: IntoIterator<Item = &'a RowGroupMetaData>,
1374    {
1375        let data_type = self.arrow_field.data_type();
1376
1377        let Some(parquet_index) = self.parquet_column_index else {
1378            return Ok(self.make_null_array(data_type, metadatas));
1379        };
1380
1381        let iter = metadatas
1382            .into_iter()
1383            .map(|x| x.column(parquet_index).statistics());
1384        min_statistics(data_type, iter, self.physical_type)
1385    }
1386
1387    /// Extract the maximum values from row group statistics in [`RowGroupMetaData`]
1388    ///
1389    /// See docs on [`Self::row_group_mins`] for details
1390    pub fn row_group_maxes<I>(&self, metadatas: I) -> Result<ArrayRef>
1391    where
1392        I: IntoIterator<Item = &'a RowGroupMetaData>,
1393    {
1394        let data_type = self.arrow_field.data_type();
1395
1396        let Some(parquet_index) = self.parquet_column_index else {
1397            return Ok(self.make_null_array(data_type, metadatas));
1398        };
1399
1400        let iter = metadatas
1401            .into_iter()
1402            .map(|x| x.column(parquet_index).statistics());
1403        max_statistics(data_type, iter, self.physical_type)
1404    }
1405
1406    /// Extract the null counts from row group statistics in [`RowGroupMetaData`]
1407    ///
1408    /// See docs on [`Self::row_group_mins`] for details
1409    pub fn row_group_null_counts<I>(&self, metadatas: I) -> Result<UInt64Array>
1410    where
1411        I: IntoIterator<Item = &'a RowGroupMetaData>,
1412    {
1413        let Some(parquet_index) = self.parquet_column_index else {
1414            let num_row_groups = metadatas.into_iter().count();
1415            return Ok(UInt64Array::from_iter(
1416                std::iter::repeat(None).take(num_row_groups),
1417            ));
1418        };
1419
1420        let null_counts = metadatas
1421            .into_iter()
1422            .map(|x| x.column(parquet_index).statistics())
1423            .map(|s| {
1424                s.and_then(|s| {
1425                    if self.missing_null_counts_as_zero {
1426                        Some(s.null_count_opt().unwrap_or(0))
1427                    } else {
1428                        s.null_count_opt()
1429                    }
1430                })
1431            });
1432        Ok(UInt64Array::from_iter(null_counts))
1433    }
1434
1435    /// Extract the minimum values from Data Page statistics.
1436    ///
1437    /// In Parquet files, in addition to the Column Chunk level statistics
1438    /// (stored for each column for each row group) there are also
1439    /// optional statistics stored for each data page, as part of
1440    /// the [`ParquetColumnIndex`].
1441    ///
1442    /// Since a single Column Chunk is stored as one or more pages,
1443    /// page level statistics can prune at a finer granularity.
1444    ///
1445    /// However since they are stored in a separate metadata
1446    /// structure ([`Index`]) there is different code to extract them as
1447    /// compared to arrow statistics.
1448    ///
1449    /// # Parameters:
1450    ///
1451    /// * `column_page_index`: The parquet column page indices, read from
1452    ///   `ParquetMetaData` column_index
1453    ///
1454    /// * `column_offset_index`: The parquet column offset indices, read from
1455    ///   `ParquetMetaData` offset_index
1456    ///
1457    /// * `row_group_indices`: The indices of the row groups, that are used to
1458    ///   extract the column page index and offset index on a per row group
1459    ///   per column basis.
1460    ///
1461    /// # Return Value
1462    ///
1463    /// The returned array contains 1 value for each `NativeIndex`
1464    /// in the underlying `Index`es, in the same order as they appear
1465    /// in `metadatas`.
1466    ///
1467    /// For example, if there are two `Index`es in `metadatas`:
1468    /// 1. the first having `3` `PageIndex` entries
1469    /// 2. the second having `2` `PageIndex` entries
1470    ///
1471    /// The returned array would have 5 rows.
1472    ///
1473    /// Each value is either:
1474    /// * the minimum value for the page
1475    /// * a null value, if the statistics can not be extracted
1476    ///
1477    /// Note that a null value does NOT mean the min value was actually
1478    /// `null` it means it the requested statistic is unknown
1479    ///
1480    /// # Errors
1481    ///
1482    /// Reasons for not being able to extract the statistics include:
1483    /// * the column is not present in the parquet file
1484    /// * statistics for the pages are not present in the row group
1485    /// * the stored statistic value can not be converted to the requested type
1486    pub fn data_page_mins<I>(
1487        &self,
1488        column_page_index: &ParquetColumnIndex,
1489        column_offset_index: &ParquetOffsetIndex,
1490        row_group_indices: I,
1491    ) -> Result<ArrayRef>
1492    where
1493        I: IntoIterator<Item = &'a usize>,
1494    {
1495        let data_type = self.arrow_field.data_type();
1496
1497        let Some(parquet_index) = self.parquet_column_index else {
1498            return Ok(self.make_null_array(data_type, row_group_indices));
1499        };
1500
1501        let iter = row_group_indices.into_iter().map(|rg_index| {
1502            let column_page_index_per_row_group_per_column =
1503                &column_page_index[*rg_index][parquet_index];
1504            let num_data_pages = &column_offset_index[*rg_index][parquet_index]
1505                .page_locations()
1506                .len();
1507
1508            (*num_data_pages, column_page_index_per_row_group_per_column)
1509        });
1510
1511        min_page_statistics(data_type, iter, self.physical_type)
1512    }
1513
1514    /// Extract the maximum values from Data Page statistics.
1515    ///
1516    /// See docs on [`Self::data_page_mins`] for details.
1517    pub fn data_page_maxes<I>(
1518        &self,
1519        column_page_index: &ParquetColumnIndex,
1520        column_offset_index: &ParquetOffsetIndex,
1521        row_group_indices: I,
1522    ) -> Result<ArrayRef>
1523    where
1524        I: IntoIterator<Item = &'a usize>,
1525    {
1526        let data_type = self.arrow_field.data_type();
1527
1528        let Some(parquet_index) = self.parquet_column_index else {
1529            return Ok(self.make_null_array(data_type, row_group_indices));
1530        };
1531
1532        let iter = row_group_indices.into_iter().map(|rg_index| {
1533            let column_page_index_per_row_group_per_column =
1534                &column_page_index[*rg_index][parquet_index];
1535            let num_data_pages = &column_offset_index[*rg_index][parquet_index]
1536                .page_locations()
1537                .len();
1538
1539            (*num_data_pages, column_page_index_per_row_group_per_column)
1540        });
1541
1542        max_page_statistics(data_type, iter, self.physical_type)
1543    }
1544
1545    /// Returns a [`UInt64Array`] with null counts for each data page.
1546    ///
1547    /// See docs on [`Self::data_page_mins`] for details.
1548    pub fn data_page_null_counts<I>(
1549        &self,
1550        column_page_index: &ParquetColumnIndex,
1551        column_offset_index: &ParquetOffsetIndex,
1552        row_group_indices: I,
1553    ) -> Result<UInt64Array>
1554    where
1555        I: IntoIterator<Item = &'a usize>,
1556    {
1557        let Some(parquet_index) = self.parquet_column_index else {
1558            let num_row_groups = row_group_indices.into_iter().count();
1559            return Ok(UInt64Array::from_iter(
1560                std::iter::repeat(None).take(num_row_groups),
1561            ));
1562        };
1563
1564        let iter = row_group_indices.into_iter().map(|rg_index| {
1565            let column_page_index_per_row_group_per_column =
1566                &column_page_index[*rg_index][parquet_index];
1567            let num_data_pages = &column_offset_index[*rg_index][parquet_index]
1568                .page_locations()
1569                .len();
1570
1571            (*num_data_pages, column_page_index_per_row_group_per_column)
1572        });
1573        null_counts_page_statistics(iter)
1574    }
1575
1576    /// Returns a [`UInt64Array`] with row counts for each data page.
1577    ///
1578    /// This function iterates over the given row group indexes and computes
1579    /// the row count for each page in the specified column.
1580    ///
1581    /// # Parameters:
1582    ///
1583    /// * `column_offset_index`: The parquet column offset indices, read from
1584    ///   `ParquetMetaData` offset_index
1585    ///
1586    /// * `row_group_metadatas`: The metadata slice of the row groups, read
1587    ///   from `ParquetMetaData` row_groups
1588    ///
1589    /// * `row_group_indices`: The indices of the row groups, that are used to
1590    ///   extract the column offset index on a per row group per column basis.
1591    ///
1592    /// See docs on [`Self::data_page_mins`] for details.
1593    pub fn data_page_row_counts<I>(
1594        &self,
1595        column_offset_index: &ParquetOffsetIndex,
1596        row_group_metadatas: &'a [RowGroupMetaData],
1597        row_group_indices: I,
1598    ) -> Result<Option<UInt64Array>>
1599    where
1600        I: IntoIterator<Item = &'a usize>,
1601    {
1602        let Some(parquet_index) = self.parquet_column_index else {
1603            // no matching column found in parquet_index;
1604            // thus we cannot extract page_locations in order to determine
1605            // the row count on a per DataPage basis.
1606            return Ok(None);
1607        };
1608
1609        let mut row_count_total = Vec::new();
1610        for rg_idx in row_group_indices {
1611            let page_locations = &column_offset_index[*rg_idx][parquet_index].page_locations();
1612
1613            let row_count_per_page = page_locations
1614                .windows(2)
1615                .map(|loc| Some(loc[1].first_row_index as u64 - loc[0].first_row_index as u64));
1616
1617            // append the last page row count
1618            let num_rows_in_row_group = &row_group_metadatas[*rg_idx].num_rows();
1619            let row_count_per_page = row_count_per_page
1620                .chain(std::iter::once(Some(
1621                    *num_rows_in_row_group as u64
1622                        - page_locations.last().unwrap().first_row_index as u64,
1623                )))
1624                .collect::<Vec<_>>();
1625
1626            row_count_total.extend(row_count_per_page);
1627        }
1628
1629        Ok(Some(UInt64Array::from_iter(row_count_total)))
1630    }
1631
1632    /// Returns a null array of data_type with one element per row group
1633    fn make_null_array<I, A>(&self, data_type: &DataType, metadatas: I) -> ArrayRef
1634    where
1635        I: IntoIterator<Item = A>,
1636    {
1637        // column was in the arrow schema but not in the parquet schema, so return a null array
1638        let num_row_groups = metadatas.into_iter().count();
1639        new_null_array(data_type, num_row_groups)
1640    }
1641}
1642
1643// See tests in parquet/tests/arrow_reader/statistics.rs