Skip to main content

parquet/arrow/arrow_reader/
statistics.rs

1// Licensed to the Apache Software Foundation (ASF) under one
2// or more contributor license agreements.  See the NOTICE file
3// distributed with this work for additional information
4// regarding copyright ownership.  The ASF licenses this file
5// to you under the Apache License, Version 2.0 (the
6// "License"); you may not use this file except in compliance
7// with the License.  You may obtain a copy of the License at
8//
9//   http://www.apache.org/licenses/LICENSE-2.0
10//
11// Unless required by applicable law or agreed to in writing,
12// software distributed under the License is distributed on an
13// "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
14// KIND, either express or implied.  See the License for the
15// specific language governing permissions and limitations
16// under the License.
17
18//! [`StatisticsConverter`] to convert statistics in parquet format to arrow [`ArrayRef`].
19
20/// Notice that all the corresponding tests are in
21/// `arrow-rs/parquet/tests/arrow_reader/statistics.rs`.
22use crate::arrow::buffer::bit_util::sign_extend_be;
23use crate::arrow::parquet_column;
24use crate::basic::Type as PhysicalType;
25use crate::errors::{ParquetError, Result};
26use crate::file::metadata::{ParquetColumnIndex, ParquetOffsetIndex, RowGroupMetaData};
27use crate::file::page_index::column_index::ColumnIndexMetaData;
28use crate::file::statistics::Statistics as ParquetStatistics;
29use crate::schema::types::SchemaDescriptor;
30use arrow_array::builder::{
31    BinaryBuilder, BinaryViewBuilder, BooleanBuilder, Date32Builder, Date64Builder,
32    Decimal32Builder, Decimal64Builder, FixedSizeBinaryBuilder, Float16Builder, Float32Builder,
33    Float64Builder, Int8Builder, Int16Builder, Int32Builder, Int64Builder, LargeBinaryBuilder,
34    LargeStringBuilder, StringBuilder, StringViewBuilder, Time32MillisecondBuilder,
35    Time32SecondBuilder, Time64MicrosecondBuilder, Time64NanosecondBuilder,
36    TimestampMicrosecondBuilder, TimestampMillisecondBuilder, TimestampNanosecondBuilder,
37    TimestampSecondBuilder, UInt8Builder, UInt16Builder, UInt32Builder, UInt64Builder,
38};
39use arrow_array::{
40    ArrayRef, BinaryArray, BooleanArray, Date32Array, Date64Array, Decimal32Array, Decimal64Array,
41    Decimal128Array, Decimal256Array, Float16Array, Float32Array, Float64Array, Int8Array,
42    Int16Array, Int32Array, Int64Array, LargeBinaryArray, Time32MillisecondArray,
43    Time32SecondArray, Time64MicrosecondArray, Time64NanosecondArray, TimestampMicrosecondArray,
44    TimestampMillisecondArray, TimestampNanosecondArray, TimestampSecondArray, UInt8Array,
45    UInt16Array, UInt32Array, UInt64Array, new_null_array,
46};
47use arrow_buffer::{NullBufferBuilder, i256};
48use arrow_schema::{DataType, Field, Schema, TimeUnit};
49use half::f16;
50use paste::paste;
51use std::sync::Arc;
52
53// Convert the bytes array to i32.
54// The endian of the input bytes array must be big-endian.
55pub(crate) fn from_bytes_to_i32(b: &[u8]) -> i32 {
56    // The bytes array are from parquet file and must be the big-endian.
57    // The endian is defined by parquet format, and the reference document
58    // https://github.com/apache/parquet-format/blob/54e53e5d7794d383529dd30746378f19a12afd58/src/main/thrift/parquet.thrift#L66
59    i32::from_be_bytes(sign_extend_be::<4>(b))
60}
61
62// Convert the bytes array to i64.
63// The endian of the input bytes array must be big-endian.
64pub(crate) fn from_bytes_to_i64(b: &[u8]) -> i64 {
65    i64::from_be_bytes(sign_extend_be::<8>(b))
66}
67
68// Convert the bytes array to i128.
69// The endian of the input bytes array must be big-endian.
70pub(crate) fn from_bytes_to_i128(b: &[u8]) -> i128 {
71    i128::from_be_bytes(sign_extend_be::<16>(b))
72}
73
74// Convert the bytes array to i256.
75// The endian of the input bytes array must be big-endian.
76pub(crate) fn from_bytes_to_i256(b: &[u8]) -> i256 {
77    i256::from_be_bytes(sign_extend_be::<32>(b))
78}
79
80// Convert the bytes array to f16
81pub(crate) fn from_bytes_to_f16(b: &[u8]) -> Option<f16> {
82    match b {
83        [low, high] => Some(f16::from_be_bytes([*high, *low])),
84        _ => None,
85    }
86}
87
88/// Define an adapter iterator for extracting statistics from an iterator of
89/// `ParquetStatistics`
90///
91///
92/// Handles checking if the statistics are present and valid with the correct type.
93///
94/// Parameters:
95/// * `$iterator_type` is the name of the iterator type (e.g. `MinBooleanStatsIterator`)
96/// * `$func` is the function to call to get the value (e.g. `min` or `max`)
97/// * `$parquet_statistics_type` is the type of the statistics (e.g. `ParquetStatistics::Boolean`)
98/// * `$stat_value_type` is the type of the statistics value (e.g. `bool`)
99macro_rules! make_stats_iterator {
100    ($iterator_type:ident, $func:ident, $parquet_statistics_type:path, $stat_value_type:ty) => {
101        /// Maps an iterator of `ParquetStatistics` into an iterator of
102        /// `&$stat_value_type``
103        ///
104        /// Yielded elements:
105        /// * Some(stats) if valid
106        /// * None if the statistics are not present, not valid, or not $stat_value_type
107        struct $iterator_type<'a, I>
108        where
109            I: Iterator<Item = Option<&'a ParquetStatistics>>,
110        {
111            iter: I,
112        }
113
114        impl<'a, I> $iterator_type<'a, I>
115        where
116            I: Iterator<Item = Option<&'a ParquetStatistics>>,
117        {
118            /// Create a new iterator to extract the statistics
119            fn new(iter: I) -> Self {
120                Self { iter }
121            }
122        }
123
124        /// Implement the Iterator trait for the iterator
125        impl<'a, I> Iterator for $iterator_type<'a, I>
126        where
127            I: Iterator<Item = Option<&'a ParquetStatistics>>,
128        {
129            type Item = Option<&'a $stat_value_type>;
130
131            /// return the next statistics value
132            fn next(&mut self) -> Option<Self::Item> {
133                let next = self.iter.next();
134                next.map(|x| {
135                    x.and_then(|stats| match stats {
136                        $parquet_statistics_type(s) => s.$func(),
137                        _ => None,
138                    })
139                })
140            }
141
142            fn size_hint(&self) -> (usize, Option<usize>) {
143                self.iter.size_hint()
144            }
145        }
146    };
147}
148
149make_stats_iterator!(
150    MinBooleanStatsIterator,
151    min_opt,
152    ParquetStatistics::Boolean,
153    bool
154);
155make_stats_iterator!(
156    MaxBooleanStatsIterator,
157    max_opt,
158    ParquetStatistics::Boolean,
159    bool
160);
161make_stats_iterator!(
162    MinInt32StatsIterator,
163    min_opt,
164    ParquetStatistics::Int32,
165    i32
166);
167make_stats_iterator!(
168    MaxInt32StatsIterator,
169    max_opt,
170    ParquetStatistics::Int32,
171    i32
172);
173make_stats_iterator!(
174    MinInt64StatsIterator,
175    min_opt,
176    ParquetStatistics::Int64,
177    i64
178);
179make_stats_iterator!(
180    MaxInt64StatsIterator,
181    max_opt,
182    ParquetStatistics::Int64,
183    i64
184);
185make_stats_iterator!(
186    MinFloatStatsIterator,
187    min_opt,
188    ParquetStatistics::Float,
189    f32
190);
191make_stats_iterator!(
192    MaxFloatStatsIterator,
193    max_opt,
194    ParquetStatistics::Float,
195    f32
196);
197make_stats_iterator!(
198    MinDoubleStatsIterator,
199    min_opt,
200    ParquetStatistics::Double,
201    f64
202);
203make_stats_iterator!(
204    MaxDoubleStatsIterator,
205    max_opt,
206    ParquetStatistics::Double,
207    f64
208);
209make_stats_iterator!(
210    MinByteArrayStatsIterator,
211    min_bytes_opt,
212    ParquetStatistics::ByteArray,
213    [u8]
214);
215make_stats_iterator!(
216    MaxByteArrayStatsIterator,
217    max_bytes_opt,
218    ParquetStatistics::ByteArray,
219    [u8]
220);
221make_stats_iterator!(
222    MinFixedLenByteArrayStatsIterator,
223    min_bytes_opt,
224    ParquetStatistics::FixedLenByteArray,
225    [u8]
226);
227make_stats_iterator!(
228    MaxFixedLenByteArrayStatsIterator,
229    max_bytes_opt,
230    ParquetStatistics::FixedLenByteArray,
231    [u8]
232);
233
234/// Special iterator adapter for extracting i128 values from from an iterator of
235/// `ParquetStatistics`
236///
237/// Handles checking if the statistics are present and valid with the correct type.
238///
239/// Depending on the parquet file, the statistics for `Decimal128` can be stored as
240/// `Int32`, `Int64` or `ByteArray` or `FixedSizeByteArray` :mindblown:
241///
242/// This iterator handles all cases, extracting the values
243/// and converting it to `stat_value_type`.
244///
245/// Parameters:
246/// * `$iterator_type` is the name of the iterator type (e.g. `MinBooleanStatsIterator`)
247/// * `$func` is the function to call to get the value (e.g. `min` or `max`)
248/// * `$bytes_func` is the function to call to get the value as bytes (e.g. `min_bytes` or `max_bytes`)
249/// * `$stat_value_type` is the type of the statistics value (e.g. `i128`)
250/// * `convert_func` is the function to convert the bytes to stats value (e.g. `from_bytes_to_i128`)
251macro_rules! make_decimal_stats_iterator {
252    ($iterator_type:ident, $func:ident, $bytes_func:ident, $stat_value_type:ident, $convert_func: ident) => {
253        struct $iterator_type<'a, I>
254        where
255            I: Iterator<Item = Option<&'a ParquetStatistics>>,
256        {
257            iter: I,
258        }
259
260        impl<'a, I> $iterator_type<'a, I>
261        where
262            I: Iterator<Item = Option<&'a ParquetStatistics>>,
263        {
264            fn new(iter: I) -> Self {
265                Self { iter }
266            }
267        }
268
269        impl<'a, I> Iterator for $iterator_type<'a, I>
270        where
271            I: Iterator<Item = Option<&'a ParquetStatistics>>,
272        {
273            type Item = Option<$stat_value_type>;
274
275            fn next(&mut self) -> Option<Self::Item> {
276                let next = self.iter.next();
277                next.map(|x| {
278                    x.and_then(|stats| match stats {
279                        ParquetStatistics::Int32(s) => {
280                            s.$func().map(|x| $stat_value_type::from(*x))
281                        }
282                        ParquetStatistics::Int64(s) => s
283                            .$func()
284                            .map(|x| $stat_value_type::try_from(*x).ok())
285                            .flatten(),
286                        ParquetStatistics::ByteArray(s) => s.$bytes_func().map($convert_func),
287                        ParquetStatistics::FixedLenByteArray(s) => {
288                            s.$bytes_func().map($convert_func)
289                        }
290                        _ => None,
291                    })
292                })
293            }
294
295            fn size_hint(&self) -> (usize, Option<usize>) {
296                self.iter.size_hint()
297            }
298        }
299    };
300}
301
302make_decimal_stats_iterator!(
303    MinDecimal32StatsIterator,
304    min_opt,
305    min_bytes_opt,
306    i32,
307    from_bytes_to_i32
308);
309make_decimal_stats_iterator!(
310    MaxDecimal32StatsIterator,
311    max_opt,
312    max_bytes_opt,
313    i32,
314    from_bytes_to_i32
315);
316make_decimal_stats_iterator!(
317    MinDecimal64StatsIterator,
318    min_opt,
319    min_bytes_opt,
320    i64,
321    from_bytes_to_i64
322);
323make_decimal_stats_iterator!(
324    MaxDecimal64StatsIterator,
325    max_opt,
326    max_bytes_opt,
327    i64,
328    from_bytes_to_i64
329);
330make_decimal_stats_iterator!(
331    MinDecimal128StatsIterator,
332    min_opt,
333    min_bytes_opt,
334    i128,
335    from_bytes_to_i128
336);
337make_decimal_stats_iterator!(
338    MaxDecimal128StatsIterator,
339    max_opt,
340    max_bytes_opt,
341    i128,
342    from_bytes_to_i128
343);
344make_decimal_stats_iterator!(
345    MinDecimal256StatsIterator,
346    min_opt,
347    min_bytes_opt,
348    i256,
349    from_bytes_to_i256
350);
351make_decimal_stats_iterator!(
352    MaxDecimal256StatsIterator,
353    max_opt,
354    max_bytes_opt,
355    i256,
356    from_bytes_to_i256
357);
358
/// Builds an arrow array of minimum or maximum values from an iterator of
/// [`ParquetStatistics`]. The [`mod@paste`] macro is used to select the `Min*`
/// or `Max*` iterator adapters, so the extraction logic is written only once.
///
/// Parameters:
/// * `$stat_type_prefix`: prefix of the statistics iterator types (`Min` or `Max`)
/// * `$data_type`: reference to the arrow [`DataType`] of the array to produce
/// * `$iterator`: iterator of `Option<&ParquetStatistics>` to read from
/// * `$physical_type`: optional parquet [`PhysicalType`] of the column; only
///   consulted to decide how `Date64` values are read
///
/// Statistics that are missing, of an unexpected variant, or out of range for
/// the requested arrow type become nulls. Data types without an extraction rule
/// produce an all-null array with one entry per input element.
macro_rules! get_statistics {
    ($stat_type_prefix: ident, $data_type: ident, $iterator: ident, $physical_type: ident) => {
        paste! {
            match $data_type {
                DataType::Boolean => {
                    let values = [<$stat_type_prefix BooleanStatsIterator>]::new($iterator)
                        .map(|stat| stat.copied());
                    Ok(Arc::new(BooleanArray::from_iter(values)))
                }

                // Signed integers. The narrow types are read from Int32
                // statistics; values outside the target range become null.
                DataType::Int8 => {
                    let values = [<$stat_type_prefix Int32StatsIterator>]::new($iterator)
                        .map(|stat| stat.and_then(|&v| i8::try_from(v).ok()));
                    Ok(Arc::new(Int8Array::from_iter(values)))
                }
                DataType::Int16 => {
                    let values = [<$stat_type_prefix Int32StatsIterator>]::new($iterator)
                        .map(|stat| stat.and_then(|&v| i16::try_from(v).ok()));
                    Ok(Arc::new(Int16Array::from_iter(values)))
                }
                DataType::Int32 => {
                    let values = [<$stat_type_prefix Int32StatsIterator>]::new($iterator)
                        .map(|stat| stat.copied());
                    Ok(Arc::new(Int32Array::from_iter(values)))
                }
                DataType::Int64 => {
                    let values = [<$stat_type_prefix Int64StatsIterator>]::new($iterator)
                        .map(|stat| stat.copied());
                    Ok(Arc::new(Int64Array::from_iter(values)))
                }

                // Unsigned integers. The narrow types are range checked, while
                // UInt32 / UInt64 are cast with `as`, which keeps the bit pattern.
                DataType::UInt8 => {
                    let values = [<$stat_type_prefix Int32StatsIterator>]::new($iterator)
                        .map(|stat| stat.and_then(|&v| u8::try_from(v).ok()));
                    Ok(Arc::new(UInt8Array::from_iter(values)))
                }
                DataType::UInt16 => {
                    let values = [<$stat_type_prefix Int32StatsIterator>]::new($iterator)
                        .map(|stat| stat.and_then(|&v| u16::try_from(v).ok()));
                    Ok(Arc::new(UInt16Array::from_iter(values)))
                }
                DataType::UInt32 => {
                    let values = [<$stat_type_prefix Int32StatsIterator>]::new($iterator)
                        .map(|stat| stat.map(|&v| v as u32));
                    Ok(Arc::new(UInt32Array::from_iter(values)))
                }
                DataType::UInt64 => {
                    let values = [<$stat_type_prefix Int64StatsIterator>]::new($iterator)
                        .map(|stat| stat.map(|&v| v as u64));
                    Ok(Arc::new(UInt64Array::from_iter(values)))
                }

                // Floating point. Float16 is decoded from fixed length bytes.
                DataType::Float16 => {
                    let values = [<$stat_type_prefix FixedLenByteArrayStatsIterator>]::new($iterator)
                        .map(|stat| stat.and_then(from_bytes_to_f16));
                    Ok(Arc::new(Float16Array::from_iter(values)))
                }
                DataType::Float32 => {
                    let values = [<$stat_type_prefix FloatStatsIterator>]::new($iterator)
                        .map(|stat| stat.copied());
                    Ok(Arc::new(Float32Array::from_iter(values)))
                }
                DataType::Float64 => {
                    let values = [<$stat_type_prefix DoubleStatsIterator>]::new($iterator)
                        .map(|stat| stat.copied());
                    Ok(Arc::new(Float64Array::from_iter(values)))
                }

                // Dates. Date64 is only handled when the physical type is
                // INT32 or INT64; any other case reaches the fallback arm.
                DataType::Date32 => {
                    let values = [<$stat_type_prefix Int32StatsIterator>]::new($iterator)
                        .map(|stat| stat.copied());
                    Ok(Arc::new(Date32Array::from_iter(values)))
                }
                DataType::Date64 if $physical_type == Some(PhysicalType::INT32) => {
                    // each INT32 value is scaled by the number of milliseconds in a day
                    const MILLIS_PER_DAY: i64 = 24 * 60 * 60 * 1000;
                    let values = [<$stat_type_prefix Int32StatsIterator>]::new($iterator)
                        .map(|stat| stat.map(|&v| i64::from(v) * MILLIS_PER_DAY));
                    Ok(Arc::new(Date64Array::from_iter(values)))
                }
                DataType::Date64 if $physical_type == Some(PhysicalType::INT64) => {
                    let values = [<$stat_type_prefix Int64StatsIterator>]::new($iterator)
                        .map(|stat| stat.copied());
                    Ok(Arc::new(Date64Array::from_iter(values)))
                }

                // Timestamps are read from Int64 statistics and keep the
                // timezone of the requested data type.
                DataType::Timestamp(unit, timezone) => {
                    let values = [<$stat_type_prefix Int64StatsIterator>]::new($iterator)
                        .map(|stat| stat.copied());
                    let tz = timezone.clone();
                    Ok(match unit {
                        TimeUnit::Second => {
                            Arc::new(TimestampSecondArray::from_iter(values).with_timezone_opt(tz))
                        }
                        TimeUnit::Millisecond => {
                            Arc::new(TimestampMillisecondArray::from_iter(values).with_timezone_opt(tz))
                        }
                        TimeUnit::Microsecond => {
                            Arc::new(TimestampMicrosecondArray::from_iter(values).with_timezone_opt(tz))
                        }
                        TimeUnit::Nanosecond => {
                            Arc::new(TimestampNanosecondArray::from_iter(values).with_timezone_opt(tz))
                        }
                    })
                }

                // Time of day: Time32 reads Int32 statistics, Time64 reads Int64.
                DataType::Time32(unit) => Ok(match unit {
                    TimeUnit::Second => Arc::new(Time32SecondArray::from_iter(
                        [<$stat_type_prefix Int32StatsIterator>]::new($iterator)
                            .map(|stat| stat.copied()),
                    )),
                    TimeUnit::Millisecond => Arc::new(Time32MillisecondArray::from_iter(
                        [<$stat_type_prefix Int32StatsIterator>]::new($iterator)
                            .map(|stat| stat.copied()),
                    )),
                    // don't know how to extract statistics, so return a null array
                    _ => new_null_array($data_type, $iterator.count()),
                }),
                DataType::Time64(unit) => Ok(match unit {
                    TimeUnit::Microsecond => Arc::new(Time64MicrosecondArray::from_iter(
                        [<$stat_type_prefix Int64StatsIterator>]::new($iterator)
                            .map(|stat| stat.copied()),
                    )),
                    TimeUnit::Nanosecond => Arc::new(Time64NanosecondArray::from_iter(
                        [<$stat_type_prefix Int64StatsIterator>]::new($iterator)
                            .map(|stat| stat.copied()),
                    )),
                    // don't know how to extract statistics, so return a null array
                    _ => new_null_array($data_type, $iterator.count()),
                }),

                // Binary data is copied as is.
                DataType::Binary => {
                    let values = [<$stat_type_prefix ByteArrayStatsIterator>]::new($iterator);
                    Ok(Arc::new(BinaryArray::from_iter(values)))
                }
                DataType::LargeBinary => {
                    let values = [<$stat_type_prefix ByteArrayStatsIterator>]::new($iterator);
                    Ok(Arc::new(LargeBinaryArray::from_iter(values)))
                }
                DataType::BinaryView => {
                    let mut builder = BinaryViewBuilder::new();
                    for bytes in [<$stat_type_prefix ByteArrayStatsIterator>]::new($iterator) {
                        match bytes {
                            Some(b) => builder.append_value(b),
                            // no statistics value
                            None => builder.append_null(),
                        }
                    }
                    Ok(Arc::new(builder.finish()))
                }
                DataType::FixedSizeBinary(size) => {
                    let mut builder = FixedSizeBinaryBuilder::new(*size);
                    for bytes in [<$stat_type_prefix FixedLenByteArrayStatsIterator>]::new($iterator) {
                        match bytes {
                            // only values of exactly the declared width are kept
                            Some(b) if i32::try_from(b.len()) == Ok(*size) => {
                                builder.append_value(b).expect("ensure to append successfully here, because size have been checked before");
                            }
                            // no statistics value, or a value of the wrong width
                            _ => builder.append_null(),
                        }
                    }
                    Ok(Arc::new(builder.finish()))
                }

                // Strings: a missing value and bytes that are not valid UTF-8
                // both become null.
                DataType::Utf8 => {
                    let mut builder = StringBuilder::new();
                    for bytes in [<$stat_type_prefix ByteArrayStatsIterator>]::new($iterator) {
                        match bytes.and_then(|b| std::str::from_utf8(b).ok()) {
                            Some(text) => builder.append_value(text),
                            None => builder.append_null(),
                        }
                    }
                    Ok(Arc::new(builder.finish()))
                }
                DataType::LargeUtf8 => {
                    let mut builder = LargeStringBuilder::new();
                    for bytes in [<$stat_type_prefix ByteArrayStatsIterator>]::new($iterator) {
                        match bytes.and_then(|b| std::str::from_utf8(b).ok()) {
                            Some(text) => builder.append_value(text),
                            None => builder.append_null(),
                        }
                    }
                    Ok(Arc::new(builder.finish()))
                }
                DataType::Utf8View => {
                    let mut builder = StringViewBuilder::new();
                    for bytes in [<$stat_type_prefix ByteArrayStatsIterator>]::new($iterator) {
                        match bytes.and_then(|b| std::str::from_utf8(b).ok()) {
                            Some(text) => builder.append_value(text),
                            None => builder.append_null(),
                        }
                    }
                    Ok(Arc::new(builder.finish()))
                }

                // Decimals: an invalid precision / scale pair is reported as an error.
                DataType::Decimal32(precision, scale) => {
                    let values = [<$stat_type_prefix Decimal32StatsIterator>]::new($iterator);
                    let array = Decimal32Array::from_iter(values)
                        .with_precision_and_scale(*precision, *scale)?;
                    Ok(Arc::new(array))
                }
                DataType::Decimal64(precision, scale) => {
                    let values = [<$stat_type_prefix Decimal64StatsIterator>]::new($iterator);
                    let array = Decimal64Array::from_iter(values)
                        .with_precision_and_scale(*precision, *scale)?;
                    Ok(Arc::new(array))
                }
                DataType::Decimal128(precision, scale) => {
                    let values = [<$stat_type_prefix Decimal128StatsIterator>]::new($iterator);
                    let array = Decimal128Array::from_iter(values)
                        .with_precision_and_scale(*precision, *scale)?;
                    Ok(Arc::new(array))
                }
                DataType::Decimal256(precision, scale) => {
                    let values = [<$stat_type_prefix Decimal256StatsIterator>]::new($iterator);
                    let array = Decimal256Array::from_iter(values)
                        .with_precision_and_scale(*precision, *scale)?;
                    Ok(Arc::new(array))
                }

                // A dictionary yields statistics of its value type.
                DataType::Dictionary(_, value_type) => {
                    [<$stat_type_prefix:lower _ statistics>](value_type, $iterator, $physical_type)
                }

                DataType::Null
                | DataType::Date64 // required to cover $physical_type match guard
                | DataType::Duration(_)
                | DataType::Interval(_)
                | DataType::List(_)
                | DataType::ListView(_)
                | DataType::FixedSizeList(_, _)
                | DataType::LargeList(_)
                | DataType::LargeListView(_)
                | DataType::Struct(_)
                | DataType::Union(_, _)
                | DataType::Map(_, _)
                | DataType::RunEndEncoded(_, _) => {
                    // don't know how to extract statistics, so return a null array
                    Ok(new_null_array($data_type, $iterator.count()))
                }
            }
        }
    };
}
602
603macro_rules! get_data_page_statistics {
604    ($stat_type_prefix: ident, $data_type: ident, $iterator: ident, $physical_type: ident) => {
605        {
606            let chunks: Vec<(usize, &ColumnIndexMetaData)> = $iterator.collect();
607            let capacity: usize = chunks.iter().map(|c| c.0).sum();
608            paste! {
609                match $data_type {
610                DataType::Boolean => {
611                    let mut b = BooleanBuilder::with_capacity(capacity);
612                    for (len, index) in chunks {
613                        match index {
614                            ColumnIndexMetaData::BOOLEAN(index) => {
615                                for val in index.[<$stat_type_prefix:lower _values_iter>]() {
616                                    b.append_option(val.copied());
617                                }
618                            }
619                            _ => b.append_nulls(len),
620                        }
621                    }
622                    Ok(Arc::new(b.finish()))
623                },
624                DataType::UInt8 => {
625                    let mut b = UInt8Builder::with_capacity(capacity);
626                    for (len, index) in chunks {
627                        match index {
628                            ColumnIndexMetaData::INT32(index) => {
629                                b.extend_from_iter_option(
630                                    index.[<$stat_type_prefix:lower _values_iter>]()
631                                        .map(|val| val.and_then(|&x| u8::try_from(x).ok())),
632                                );
633                            }
634                            _ => b.append_nulls(len),
635                        }
636                    }
637                    Ok(Arc::new(b.finish()))
638                },
639                DataType::UInt16 => {
640                    let mut b = UInt16Builder::with_capacity(capacity);
641                    for (len, index) in chunks {
642                        match index {
643                            ColumnIndexMetaData::INT32(index) => {
644                                b.extend_from_iter_option(
645                                     index.[<$stat_type_prefix:lower _values_iter>]()
646                                        .map(|val| val.and_then(|&x| u16::try_from(x).ok())),
647                                );
648                            }
649                            _ => b.append_nulls(len),
650                        }
651                    }
652                    Ok(Arc::new(b.finish()))
653                },
654                DataType::UInt32 => {
655                    let mut b = UInt32Builder::with_capacity(capacity);
656                    for (len, index) in chunks {
657                        match index {
658                            ColumnIndexMetaData::INT32(index) => {
659                                b.extend_from_iter_option(
660                                    index.[<$stat_type_prefix:lower _values_iter>]()
661                                        .map(|val| val.map(|&x| x as u32)),
662                                );
663                            }
664                            _ => b.append_nulls(len),
665                        }
666                    }
667                    Ok(Arc::new(b.finish()))
668                },
669                DataType::UInt64 => {
670                    let mut b = UInt64Builder::with_capacity(capacity);
671                    for (len, index) in chunks {
672                        match index {
673                            ColumnIndexMetaData::INT64(index) => {
674                                b.extend_from_iter_option(
675                                    index.[<$stat_type_prefix:lower _values_iter>]()
676                                        .map(|val| val.map(|&x| x as u64)),
677                                );
678                            }
679                            _ => b.append_nulls(len),
680                        }
681                    }
682                    Ok(Arc::new(b.finish()))
683                },
684                DataType::Int8 => {
685                    let mut b = Int8Builder::with_capacity(capacity);
686                    for (len, index) in chunks {
687                        match index {
688                            ColumnIndexMetaData::INT32(index) => {
689                                b.extend_from_iter_option(
690                                    index.[<$stat_type_prefix:lower _values_iter>]()
691                                        .map(|val| val.and_then(|&x| i8::try_from(x).ok())),
692                                );
693                            }
694                            _ => b.append_nulls(len),
695                        }
696                    }
697                    Ok(Arc::new(b.finish()))
698                },
699                DataType::Int16 => {
700                    let mut b = Int16Builder::with_capacity(capacity);
701                    for (len, index) in chunks {
702                        match index {
703                            ColumnIndexMetaData::INT32(index) => {
704                                b.extend_from_iter_option(
705                                    index.[<$stat_type_prefix:lower _values_iter>]()
706                                        .map(|val| val.and_then(|&x| i16::try_from(x).ok())),
707                                );
708                            }
709                            _ => b.append_nulls(len),
710                        }
711                    }
712                    Ok(Arc::new(b.finish()))
713                },
714                DataType::Int32 => {
715                    let mut b = Int32Builder::with_capacity(capacity);
716                    for (len, index) in chunks {
717                        match index {
718                            ColumnIndexMetaData::INT32(index) => {
719                                b.extend_from_iter_option(
720                                    index.[<$stat_type_prefix:lower _values_iter>]()
721                                        .map(|val| val.copied()),
722                                );
723                            }
724                            _ => b.append_nulls(len),
725                        }
726                    }
727                    Ok(Arc::new(b.finish()))
728                },
729                DataType::Int64 => {
730                    let mut b = Int64Builder::with_capacity(capacity);
731                    for (len, index) in chunks {
732                        match index {
733                            ColumnIndexMetaData::INT64(index) => {
734                                b.extend_from_iter_option(
735                                    index.[<$stat_type_prefix:lower _values_iter>]()
736                                        .map(|val| val.copied()),
737                                );
738                            }
739                            _ => b.append_nulls(len),
740                        }
741                    }
742                    Ok(Arc::new(b.finish()))
743                },
744                DataType::Float16 => {
745                    let mut b = Float16Builder::with_capacity(capacity);
746                    for (len, index) in chunks {
747                        match index {
748                            ColumnIndexMetaData::FIXED_LEN_BYTE_ARRAY(index) => {
749                                b.extend_from_iter_option(
750                                    index.[<$stat_type_prefix:lower _values_iter>]()
751                                        .map(|val| val.and_then(|x| from_bytes_to_f16(x))),
752                                );
753                            }
754                            _ => b.append_nulls(len),
755                        }
756                    }
757                    Ok(Arc::new(b.finish()))
758                },
759                DataType::Float32 => {
760                    let mut b = Float32Builder::with_capacity(capacity);
761                    for (len, index) in chunks {
762                        match index {
763                            ColumnIndexMetaData::FLOAT(index) => {
764                                b.extend_from_iter_option(
765                                    index.[<$stat_type_prefix:lower _values_iter>]()
766                                        .map(|val| val.copied()),
767                                );
768                            }
769                            _ => b.append_nulls(len),
770                        }
771                    }
772                    Ok(Arc::new(b.finish()))
773                },
774                DataType::Float64 => {
775                    let mut b = Float64Builder::with_capacity(capacity);
776                    for (len, index) in chunks {
777                        match index {
778                            ColumnIndexMetaData::DOUBLE(index) => {
779                                b.extend_from_iter_option(
780                                    index.[<$stat_type_prefix:lower _values_iter>]()
781                                        .map(|val| val.copied()),
782                                );
783                            }
784                            _ => b.append_nulls(len),
785                        }
786                    }
787                    Ok(Arc::new(b.finish()))
788                },
789                DataType::Binary => {
790                    let mut b = BinaryBuilder::with_capacity(capacity, capacity * 10);
791                    for (len, index) in chunks {
792                        match index {
793                            ColumnIndexMetaData::BYTE_ARRAY(index) => {
794                                for val in index.[<$stat_type_prefix:lower _values_iter>]() {
795                                    b.append_option(val.map(|x| x.as_ref()));
796                                }
797                            }
798                            _ => b.append_nulls(len),
799                        }
800                    }
801                    Ok(Arc::new(b.finish()))
802                },
803                DataType::LargeBinary => {
804                    let mut b = LargeBinaryBuilder::with_capacity(capacity, capacity * 10);
805                    for (len, index) in chunks {
806                        match index {
807                            ColumnIndexMetaData::BYTE_ARRAY(index) => {
808                                for val in index.[<$stat_type_prefix:lower _values_iter>]() {
809                                    b.append_option(val.map(|x| x.as_ref()));
810                                }
811                            }
812                            _ => b.append_nulls(len),
813                        }
814                    }
815                    Ok(Arc::new(b.finish()))
816                },
817                DataType::Utf8 => {
818                    let mut b = StringBuilder::with_capacity(capacity, capacity * 10);
819                    for (len, index) in chunks {
820                        match index {
821                            ColumnIndexMetaData::BYTE_ARRAY(index) => {
822                                for val in index.[<$stat_type_prefix:lower _values_iter>]() {
823                                    match val {
824                                        Some(x) => match std::str::from_utf8(x.as_ref()) {
825                                            Ok(s) => b.append_value(s),
826                                            _ => b.append_null(),
827                                        }
828                                        None => b.append_null(),
829                                    }
830                                }
831                            }
832                            _ => b.append_nulls(len),
833                        }
834                    }
835                    Ok(Arc::new(b.finish()))
836                },
837                DataType::LargeUtf8 => {
838                    let mut b = LargeStringBuilder::with_capacity(capacity, capacity * 10);
839                    for (len, index) in chunks {
840                        match index {
841                            ColumnIndexMetaData::BYTE_ARRAY(index) => {
842                                for val in index.[<$stat_type_prefix:lower _values_iter>]() {
843                                    match val {
844                                        Some(x) => match std::str::from_utf8(x.as_ref()) {
845                                            Ok(s) => b.append_value(s),
846                                            _ => b.append_null(),
847                                        }
848                                        None => b.append_null(),
849                                    }
850                                }
851                            }
852                            _ => b.append_nulls(len),
853                        }
854                    }
855                    Ok(Arc::new(b.finish()))
856                },
857                DataType::Dictionary(_, value_type) => {
858                    [<$stat_type_prefix:lower _ page_statistics>](value_type, chunks.into_iter(), $physical_type)
859                },
860                DataType::Timestamp(unit, timezone) => {
861                    match unit {
862                        TimeUnit::Second => {
863                            let mut b = TimestampSecondBuilder::with_capacity(capacity);
864                            for (len, index) in chunks {
865                                match index {
866                                    ColumnIndexMetaData::INT64(index) => {
867                                        b.extend_from_iter_option(
868                                            index.[<$stat_type_prefix:lower _values_iter>]()
869                                                .map(|val| val.copied()),
870                                        );
871                                    }
872                                    _ => b.append_nulls(len),
873                                }
874                            }
875                            Ok(Arc::new(b.finish().with_timezone_opt(timezone.clone())))
876                        }
877                        TimeUnit::Millisecond => {
878                            let mut b = TimestampMillisecondBuilder::with_capacity(capacity);
879                            for (len, index) in chunks {
880                                match index {
881                                    ColumnIndexMetaData::INT64(index) => {
882                                        b.extend_from_iter_option(
883                                            index.[<$stat_type_prefix:lower _values_iter>]()
884                                                .map(|val| val.copied()),
885                                        );
886                                    }
887                                    _ => b.append_nulls(len),
888                                }
889                            }
890                            Ok(Arc::new(b.finish().with_timezone_opt(timezone.clone())))
891                        }
892                        TimeUnit::Microsecond => {
893                            let mut b = TimestampMicrosecondBuilder::with_capacity(capacity);
894                            for (len, index) in chunks {
895                                match index {
896                                    ColumnIndexMetaData::INT64(index) => {
897                                        b.extend_from_iter_option(
898                                            index.[<$stat_type_prefix:lower _values_iter>]()
899                                                .map(|val| val.copied()),
900                                        );
901                                    }
902                                    _ => b.append_nulls(len),
903                                }
904                            }
905                            Ok(Arc::new(b.finish().with_timezone_opt(timezone.clone())))
906                        }
907                        TimeUnit::Nanosecond => {
908                            let mut b = TimestampNanosecondBuilder::with_capacity(capacity);
909                            for (len, index) in chunks {
910                                match index {
911                                    ColumnIndexMetaData::INT64(index) => {
912                                        b.extend_from_iter_option(
913                                            index.[<$stat_type_prefix:lower _values_iter>]()
914                                                .map(|val| val.copied()),
915                                        );
916                                    }
917                                    _ => b.append_nulls(len),
918                                }
919                            }
920                            Ok(Arc::new(b.finish().with_timezone_opt(timezone.clone())))
921                        }
922                    }
923                },
924                DataType::Date32 => {
925                    let mut b = Date32Builder::with_capacity(capacity);
926                    for (len, index) in chunks {
927                        match index {
928                            ColumnIndexMetaData::INT32(index) => {
929                                b.extend_from_iter_option(
930                                    index.[<$stat_type_prefix:lower _values_iter>]()
931                                        .map(|val| val.copied()),
932                                );
933                            }
934                            _ => b.append_nulls(len),
935                        }
936                    }
937                    Ok(Arc::new(b.finish()))
938                },
939                DataType::Date64 if $physical_type == Some(PhysicalType::INT32)=> {
940                    let mut b = Date64Builder::with_capacity(capacity);
941                    for (len, index) in chunks {
942                        match index {
943                            ColumnIndexMetaData::INT32(index) => {
944                                b.extend_from_iter_option(
945                                    index.[<$stat_type_prefix:lower _values_iter>]()
946                                        .map(|val| val.map(|&x| (x as i64) * 24 * 60 * 60 * 1000)),
947                                );
948                            }
949                            _ => b.append_nulls(len),
950                        }
951                    }
952                    Ok(Arc::new(b.finish()))
953                },
954                DataType::Date64 if $physical_type == Some(PhysicalType::INT64) => {
955                    let mut b = Date64Builder::with_capacity(capacity);
956                    for (len, index) in chunks {
957                        match index {
958                            ColumnIndexMetaData::INT64(index) => {
959                                b.extend_from_iter_option(
960                                    index.[<$stat_type_prefix:lower _values_iter>]()
961                                        .map(|val| val.copied()),
962                                );
963                            }
964                            _ => b.append_nulls(len),
965                        }
966                    }
967                    Ok(Arc::new(b.finish()))
968                },
969                DataType::Decimal32(precision, scale) => {
970                    let mut b = Decimal32Builder::with_capacity(capacity);
971                    for (len, index) in chunks {
972                        match index {
973                            ColumnIndexMetaData::INT32(index) => {
974                                b.extend_from_iter_option(
975                                    index.[<$stat_type_prefix:lower _values_iter>]()
976                                        .map(|val| val.copied()),
977                                );
978                            }
979                            ColumnIndexMetaData::INT64(index) => {
980                                b.extend_from_iter_option(
981                                    index.[<$stat_type_prefix:lower _values_iter>]()
982                                        .map(|val| val.and_then(|&x| i32::try_from(x).ok())),
983                                );
984                            }
985                            ColumnIndexMetaData::BYTE_ARRAY(index) => {
986                                b.extend_from_iter_option(
987                                    index.[<$stat_type_prefix:lower _values_iter>]()
988                                        .map(|val| val.map(|x| from_bytes_to_i32(x.as_ref()))),
989                                );
990                            }
991                            ColumnIndexMetaData::FIXED_LEN_BYTE_ARRAY(index) => {
992                                b.extend_from_iter_option(
993                                    index.[<$stat_type_prefix:lower _values_iter>]()
994                                        .map(|val| val.map(|x| from_bytes_to_i32(x.as_ref()))),
995                                );
996                            }
997                            _ => b.append_nulls(len),
998                        }
999                    }
1000                    Ok(Arc::new(b.with_precision_and_scale(*precision, *scale)?.finish()))
1001                },
1002                DataType::Decimal64(precision, scale) => {
1003                    let mut b = Decimal64Builder::with_capacity(capacity);
1004                    for (len, index) in chunks {
1005                        match index {
1006                            ColumnIndexMetaData::INT32(index) => {
1007                                b.extend_from_iter_option(
1008                                    index.[<$stat_type_prefix:lower _values_iter>]()
1009                                        .map(|val| val.map(|x| *x as i64)),
1010                                );
1011                            }
1012                            ColumnIndexMetaData::INT64(index) => {
1013                                b.extend_from_iter_option(
1014                                    index.[<$stat_type_prefix:lower _values_iter>]()
1015                                        .map(|val| val.copied()),
1016                                );
1017                            }
1018                            ColumnIndexMetaData::BYTE_ARRAY(index) => {
1019                                b.extend_from_iter_option(
1020                                    index.[<$stat_type_prefix:lower _values_iter>]()
1021                                        .map(|val| val.map(|x| from_bytes_to_i64(x.as_ref()))),
1022                                );
1023                            }
1024                            ColumnIndexMetaData::FIXED_LEN_BYTE_ARRAY(index) => {
1025                                b.extend_from_iter_option(
1026                                    index.[<$stat_type_prefix:lower _values_iter>]()
1027                                        .map(|val| val.map(|x| from_bytes_to_i64(x.as_ref()))),
1028                                );
1029                            }
1030                            _ => b.append_nulls(len),
1031                        }
1032                    }
1033                    Ok(Arc::new(b.with_precision_and_scale(*precision, *scale)?.finish()))
1034                },
1035                DataType::Decimal128(precision, scale) => {
1036                    let mut b = Decimal128Array::builder(capacity);
1037                    for (len, index) in chunks {
1038                        match index {
1039                            ColumnIndexMetaData::INT32(index) => {
1040                                b.extend_from_iter_option(
1041                                    index.[<$stat_type_prefix:lower _values_iter>]()
1042                                        .map(|val| val.map(|x| *x as i128)),
1043                                );
1044                            }
1045                            ColumnIndexMetaData::INT64(index) => {
1046                                b.extend_from_iter_option(
1047                                    index.[<$stat_type_prefix:lower _values_iter>]()
1048                                        .map(|val| val.map(|x| *x as i128)),
1049                                );
1050                            }
1051                            ColumnIndexMetaData::BYTE_ARRAY(index) => {
1052                                b.extend_from_iter_option(
1053                                    index.[<$stat_type_prefix:lower _values_iter>]()
1054                                        .map(|val| val.map(|x| from_bytes_to_i128(x.as_ref()))),
1055                                );
1056                            }
1057                            ColumnIndexMetaData::FIXED_LEN_BYTE_ARRAY(index) => {
1058                                b.extend_from_iter_option(
1059                                    index.[<$stat_type_prefix:lower _values_iter>]()
1060                                        .map(|val| val.map(|x| from_bytes_to_i128(x.as_ref()))),
1061                                );
1062                            }
1063                            _ => b.append_nulls(len),
1064                        }
1065                    }
1066                    Ok(Arc::new(b.with_precision_and_scale(*precision, *scale)?.finish()))
1067                },
1068                DataType::Decimal256(precision, scale) => {
1069                    let mut b = Decimal256Array::builder(capacity);
1070                    for (len, index) in chunks {
1071                        match index {
1072                            ColumnIndexMetaData::INT32(index) => {
1073                                b.extend_from_iter_option(
1074                                    index.[<$stat_type_prefix:lower _values_iter>]()
1075                                        .map(|val| val.map(|x| i256::from_i128(*x as i128))),
1076                                );
1077                            }
1078                            ColumnIndexMetaData::INT64(index) => {
1079                                b.extend_from_iter_option(
1080                                    index.[<$stat_type_prefix:lower _values_iter>]()
1081                                        .map(|val| val.map(|x| i256::from_i128(*x as i128))),
1082                                );
1083                            }
1084                            ColumnIndexMetaData::BYTE_ARRAY(index) => {
1085                                b.extend_from_iter_option(
1086                                    index.[<$stat_type_prefix:lower _values_iter>]()
1087                                        .map(|val| val.map(|x| from_bytes_to_i256(x.as_ref()))),
1088                                );
1089                            }
1090                            ColumnIndexMetaData::FIXED_LEN_BYTE_ARRAY(index) => {
1091                                b.extend_from_iter_option(
1092                                    index.[<$stat_type_prefix:lower _values_iter>]()
1093                                        .map(|val| val.map(|x| from_bytes_to_i256(x.as_ref()))),
1094                                );
1095                            }
1096                            _ => b.append_nulls(len),
1097                        }
1098                    }
1099                    Ok(Arc::new(b.with_precision_and_scale(*precision, *scale)?.finish()))
1100                },
1101                DataType::Time32(unit) => {
1102                    match unit {
1103                        TimeUnit::Second => {
1104                            let mut b = Time32SecondBuilder::with_capacity(capacity);
1105                            for (len, index) in chunks {
1106                                match index {
1107                                    ColumnIndexMetaData::INT32(index) => {
1108                                        b.extend_from_iter_option(
1109                                            index.[<$stat_type_prefix:lower _values_iter>]()
1110                                                .map(|val| val.copied()),
1111                                        );
1112                                    }
1113                                    _ => b.append_nulls(len),
1114                                }
1115                            }
1116                            Ok(Arc::new(b.finish()))
1117                        }
1118                        TimeUnit::Millisecond => {
1119                            let mut b = Time32MillisecondBuilder::with_capacity(capacity);
1120                            for (len, index) in chunks {
1121                                match index {
1122                                    ColumnIndexMetaData::INT32(index) => {
1123                                        b.extend_from_iter_option(
1124                                            index.[<$stat_type_prefix:lower _values_iter>]()
1125                                                .map(|val| val.copied()),
1126                                        );
1127                                    }
1128                                    _ => b.append_nulls(len),
1129                                }
1130                            }
1131                            Ok(Arc::new(b.finish()))
1132                        }
1133                        _ => {
1134                            Ok(new_null_array($data_type, capacity))
1135                        }
1136                    }
1137                }
1138                DataType::Time64(unit) => {
1139                    match unit {
1140                        TimeUnit::Microsecond => {
1141                            let mut b = Time64MicrosecondBuilder::with_capacity(capacity);
1142                            for (len, index) in chunks {
1143                                match index {
1144                                    ColumnIndexMetaData::INT64(index) => {
1145                                        b.extend_from_iter_option(
1146                                            index.[<$stat_type_prefix:lower _values_iter>]()
1147                                                .map(|val| val.copied()),
1148                                        );
1149                                    }
1150                                    _ => b.append_nulls(len),
1151                                }
1152                            }
1153                            Ok(Arc::new(b.finish()))
1154                        }
1155                        TimeUnit::Nanosecond => {
1156                            let mut b = Time64NanosecondBuilder::with_capacity(capacity);
1157                            for (len, index) in chunks {
1158                                match index {
1159                                    ColumnIndexMetaData::INT64(index) => {
1160                                        b.extend_from_iter_option(
1161                                            index.[<$stat_type_prefix:lower _values_iter>]()
1162                                                .map(|val| val.copied()),
1163                                        );
1164                                    }
1165                                    _ => b.append_nulls(len),
1166                                }
1167                            }
1168                            Ok(Arc::new(b.finish()))
1169                        }
1170                        _ => {
1171                            Ok(new_null_array($data_type, capacity))
1172                        }
1173                    }
1174                },
1175                DataType::FixedSizeBinary(size) => {
1176                    let mut b = FixedSizeBinaryBuilder::with_capacity(capacity, *size);
1177                    for (len, index) in chunks {
1178                        match index {
1179                            ColumnIndexMetaData::FIXED_LEN_BYTE_ARRAY(index) => {
1180                                for val in index.[<$stat_type_prefix:lower _values_iter>]() {
1181                                    match val {
1182                                        Some(v) => {
1183                                           if v.len() == *size as usize {
1184                                               let _ = b.append_value(v.as_ref())?;
1185                                           } else {
1186                                               b.append_null();
1187                                           }
1188                                       }
1189                                        None => b.append_null(),
1190                                    }
1191                                }
1192                            }
1193                            _ => b.append_nulls(len),
1194                        }
1195                    }
1196                    Ok(Arc::new(b.finish()))
1197                },
1198                DataType::Utf8View => {
1199                    let mut b = StringViewBuilder::with_capacity(capacity);
1200                    for (len, index) in chunks {
1201                        match index {
1202                            ColumnIndexMetaData::BYTE_ARRAY(index) => {
1203                                for val in index.[<$stat_type_prefix:lower _values_iter>]() {
1204                                    match val {
1205                                        Some(x) => match std::str::from_utf8(x.as_ref()) {
1206                                            Ok(s) => b.append_value(s),
1207                                            _ => b.append_null(),
1208                                        }
1209                                        None => b.append_null(),
1210                                    }
1211                                }
1212                            }
1213                            _ => {
1214                                for _ in 0..len { b.append_null(); }
1215                            }
1216                        }
1217                    }
1218                    Ok(Arc::new(b.finish()))
1219                },
1220                DataType::BinaryView => {
1221                    let mut b = BinaryViewBuilder::with_capacity(capacity);
1222                    for (len, index) in chunks {
1223                        match index {
1224                            ColumnIndexMetaData::BYTE_ARRAY(index) => {
1225                                for val in index.[<$stat_type_prefix:lower _values_iter>]() {
1226                                    match val {
1227                                        Some(v) => b.append_value(v.as_ref()),
1228                                        None => b.append_null(),
1229                                    }
1230                                }
1231                            }
1232                            _ => {
1233                                for _ in 0..len { b.append_null(); }
1234                            }
1235                        }
1236                    }
1237                    Ok(Arc::new(b.finish()))
1238                },
1239                DataType::Date64 |  // required to cover $physical_type match guard
1240                DataType::Null |
1241                DataType::Duration(_) |
1242                DataType::Interval(_) |
1243                DataType::List(_) |
1244                DataType::ListView(_) |
1245                DataType::FixedSizeList(_, _) |
1246                DataType::LargeList(_) |
1247                DataType::LargeListView(_) |
1248                DataType::Struct(_) |
1249                DataType::Union(_, _) |
1250                DataType::Map(_, _) |
1251                DataType::RunEndEncoded(_, _) => {
1252                    // don't know how to extract statistics, so return a null array
1253                    Ok(new_null_array($data_type, capacity))
1254                },
1255            }
1256        }
1257        }
1258    }
1259}
1260/// Extracts the min statistics from an iterator of [`ParquetStatistics`] to an
1261/// [`ArrayRef`]
1262///
1263/// This is an internal helper -- see [`StatisticsConverter`] for public API
1264fn min_statistics<'a, I: Iterator<Item = Option<&'a ParquetStatistics>>>(
1265    data_type: &DataType,
1266    iterator: I,
1267    physical_type: Option<PhysicalType>,
1268) -> Result<ArrayRef> {
1269    get_statistics!(Min, data_type, iterator, physical_type)
1270}
1271
1272/// Extracts the max statistics from an iterator of [`ParquetStatistics`] to an [`ArrayRef`]
1273///
1274/// This is an internal helper -- see [`StatisticsConverter`] for public API
1275fn max_statistics<'a, I: Iterator<Item = Option<&'a ParquetStatistics>>>(
1276    data_type: &DataType,
1277    iterator: I,
1278    physical_type: Option<PhysicalType>,
1279) -> Result<ArrayRef> {
1280    get_statistics!(Max, data_type, iterator, physical_type)
1281}
1282
1283/// Extracts the min statistics from an iterator
1284/// of parquet page [`ColumnIndexMetaData`]'s to an [`ArrayRef`]
1285pub(crate) fn min_page_statistics<'a, I>(
1286    data_type: &DataType,
1287    iterator: I,
1288    physical_type: Option<PhysicalType>,
1289) -> Result<ArrayRef>
1290where
1291    I: Iterator<Item = (usize, &'a ColumnIndexMetaData)>,
1292{
1293    get_data_page_statistics!(Min, data_type, iterator, physical_type)
1294}
1295
1296/// Extracts the max statistics from an iterator
1297/// of parquet page [`ColumnIndexMetaData`]'s to an [`ArrayRef`]
1298pub(crate) fn max_page_statistics<'a, I>(
1299    data_type: &DataType,
1300    iterator: I,
1301    physical_type: Option<PhysicalType>,
1302) -> Result<ArrayRef>
1303where
1304    I: Iterator<Item = (usize, &'a ColumnIndexMetaData)>,
1305{
1306    get_data_page_statistics!(Max, data_type, iterator, physical_type)
1307}
1308
1309/// Extracts the null count statistics from an iterator
1310/// of parquet page [`ColumnIndexMetaData`]'s to an [`ArrayRef`]
1311///
1312/// The returned Array is an [`UInt64Array`]
1313pub(crate) fn null_counts_page_statistics<'a, I>(iterator: I) -> Result<UInt64Array>
1314where
1315    I: Iterator<Item = (usize, &'a ColumnIndexMetaData)>,
1316{
1317    let chunks: Vec<_> = iterator.collect();
1318    let total_capacity: usize = chunks.iter().map(|(len, _)| *len).sum();
1319    let mut values = Vec::with_capacity(total_capacity);
1320    let mut nulls = NullBufferBuilder::new(total_capacity);
1321    for (len, index) in chunks {
1322        match index.null_counts() {
1323            Some(counts) => {
1324                values.extend(counts.iter().map(|&x| x as u64));
1325                nulls.append_n_non_nulls(len);
1326            }
1327            None => {
1328                values.resize(values.len() + len, 0);
1329                nulls.append_n_nulls(len);
1330            }
1331        }
1332    }
1333    let null_buffer = nulls.build();
1334    let array = UInt64Array::new(values.into(), null_buffer);
1335    Ok(array)
1336}
1337
/// Extracts Parquet statistics as Arrow arrays
///
/// This is used to convert Parquet statistics to Arrow [`ArrayRef`], with
/// proper type conversions. This information can be used for pruning Parquet
/// files, row groups, and data pages based on the statistics embedded in
/// Parquet metadata.
///
/// # Schemas
///
/// The converter uses the schema of the Parquet file and the Arrow schema to
/// convert the underlying statistics value (stored as a parquet value) into the
/// corresponding Arrow value. For example, Decimals are stored as binary in
/// parquet files and this structure handles mapping them to the `i128`
/// representation used in Arrow.
///
/// Note: The Parquet schema and Arrow schema do not have to be identical (for
/// example, the columns may be in a different order, and either schema may
/// have additional columns). The function [`parquet_column`] is used to
/// match the column in the Parquet schema to the column in the Arrow schema
/// when using [`Self::try_new`]. For nested fields (e.g., struct fields),
/// where `parquet_column` does not support schema resolution, use
/// [`Self::from_column_index`] instead with a pre-resolved leaf column index.
#[derive(Debug)]
pub struct StatisticsConverter<'a> {
    /// The index of the matched column in the Parquet schema, or `None` if
    /// the column is not present in the Parquet file
    parquet_column_index: Option<usize>,
    /// The field (with data type) of the column in the Arrow schema
    arrow_field: &'a Field,
    /// When `true`, missing null counts are treated as `0` nulls.
    ///
    /// Both constructors initialize this to `true`; it can be changed with
    /// [`Self::with_missing_null_counts_as_zero`].
    missing_null_counts_as_zero: bool,
    /// The physical type of the matched column in the Parquet schema, or
    /// `None` if no Parquet column was matched
    physical_type: Option<PhysicalType>,
}
1371
1372impl<'a> StatisticsConverter<'a> {
    /// Return the index of the column in the Parquet schema, if any
    ///
    /// Returns `None` if the column is present in the Arrow schema, but not
    /// present in the parquet file
    pub fn parquet_column_index(&self) -> Option<usize> {
        self.parquet_column_index
    }
1380
    /// Return the [`Field`] of the column in the Arrow schema
    pub fn arrow_field(&self) -> &'a Field {
        self.arrow_field
    }
1385
1386    /// Set the statistics converter to treat missing null counts as missing
1387    ///
1388    /// By default, the converter will treat missing null counts as though
1389    /// the null count is known to be `0`.
1390    ///
1391    /// Note that parquet files written by parquet-rs currently do not store
1392    /// null counts even when it is known there are zero nulls, and the reader
1393    /// will return 0 for the null counts in that instance. This behavior may
1394    /// change in a future release.
1395    ///
1396    /// Both parquet-java and parquet-cpp store null counts as 0 when there are
1397    /// no nulls, and don't write unknown values to the null count field.
1398    pub fn with_missing_null_counts_as_zero(mut self, missing_null_counts_as_zero: bool) -> Self {
1399        self.missing_null_counts_as_zero = missing_null_counts_as_zero;
1400        self
1401    }
1402
1403    /// Returns a [`UInt64Array`] with row counts for each row group
1404    ///
1405    /// # Return Value
1406    ///
1407    /// The returned array has no nulls, and has one value for each row group.
1408    /// Each value is the number of rows in the row group.
1409    ///
1410    /// # Example
1411    /// ```no_run
1412    /// # use arrow::datatypes::Schema;
1413    /// # use arrow_array::{ArrayRef, UInt64Array};
1414    /// # use parquet::arrow::arrow_reader::statistics::StatisticsConverter;
1415    /// # use parquet::file::metadata::ParquetMetaData;
1416    /// # fn get_parquet_metadata() -> ParquetMetaData { unimplemented!() }
1417    /// # fn get_arrow_schema() -> Schema { unimplemented!() }
1418    /// // Given the metadata for a parquet file and the arrow schema
1419    /// let metadata: ParquetMetaData = get_parquet_metadata();
1420    /// let arrow_schema: Schema = get_arrow_schema();
1421    /// let parquet_schema = metadata.file_metadata().schema_descr();
1422    /// // create a converter
1423    /// let converter = StatisticsConverter::try_new("foo", &arrow_schema, parquet_schema)
1424    ///   .unwrap();
1425    /// // get the row counts for each row group
1426    /// let row_counts = converter.row_group_row_counts(metadata
1427    ///   .row_groups()
1428    ///   .iter()
1429    /// ).unwrap();
1430    /// // file had 2 row groups, with 1024 and 23 rows respectively
1431    /// assert_eq!(row_counts, Some(UInt64Array::from(vec![1024, 23])));
1432    /// ```
1433    pub fn row_group_row_counts<I>(&self, metadatas: I) -> Result<Option<UInt64Array>>
1434    where
1435        I: IntoIterator<Item = &'a RowGroupMetaData>,
1436    {
1437        let Some(_) = self.parquet_column_index else {
1438            return Ok(None);
1439        };
1440
1441        let mut builder = UInt64Array::builder(10);
1442        for metadata in metadatas.into_iter() {
1443            let row_count = metadata.num_rows();
1444            let row_count: u64 = row_count.try_into().map_err(|e| {
1445                arrow_err!(format!(
1446                    "Parquet row count {row_count} too large to convert to u64: {e}"
1447                ))
1448            })?;
1449            builder.append_value(row_count);
1450        }
1451        Ok(Some(builder.finish()))
1452    }
1453
1454    /// Create a new `StatisticsConverter` to extract statistics for a column
1455    ///
1456    /// Note if there is no corresponding column in the parquet file, the returned
1457    /// arrays will be null. This can happen if the column is in the arrow
1458    /// schema but not in the parquet schema due to schema evolution.
1459    ///
1460    /// This constructor only supports top-level, non-nested columns. For nested
1461    /// fields (e.g., fields within a struct), use [`Self::from_column_index`].
1462    ///
1463    /// See example on [`Self::row_group_mins`] for usage
1464    ///
1465    /// # Errors
1466    ///
1467    /// * If the column is not found in the arrow schema
1468    pub fn try_new<'b>(
1469        column_name: &'b str,
1470        arrow_schema: &'a Schema,
1471        parquet_schema: &'a SchemaDescriptor,
1472    ) -> Result<Self> {
1473        // ensure the requested column is in the arrow schema
1474        let Some((_idx, arrow_field)) = arrow_schema.column_with_name(column_name) else {
1475            return Err(arrow_err!(format!(
1476                "Column '{}' not found in schema for statistics conversion",
1477                column_name
1478            )));
1479        };
1480
1481        // find the column in the parquet schema, if not, return a null array
1482        let parquet_index = match parquet_column(parquet_schema, arrow_schema, column_name) {
1483            Some((parquet_idx, matched_field)) => {
1484                // sanity check that matching field matches the arrow field
1485                if matched_field.as_ref() != arrow_field {
1486                    return Err(arrow_err!(format!(
1487                        "Matched column '{:?}' does not match original matched column '{:?}'",
1488                        matched_field, arrow_field
1489                    )));
1490                }
1491                Some(parquet_idx)
1492            }
1493            None => None,
1494        };
1495
1496        Ok(Self {
1497            parquet_column_index: parquet_index,
1498            arrow_field,
1499            missing_null_counts_as_zero: true,
1500            physical_type: parquet_index.map(|idx| parquet_schema.column(idx).physical_type()),
1501        })
1502    }
1503
1504    /// Create a new `StatisticsConverter` from a Parquet leaf column index directly.
1505    ///
1506    /// Unlike [`Self::try_new`], this constructor bypasses schema resolution and
1507    /// accepts a Parquet column index directly. This is useful for nested fields
1508    /// (e.g., struct fields) where the caller has already resolved the mapping
1509    /// from the Arrow field to the Parquet leaf column.
1510    ///
1511    /// # Arguments
1512    ///
1513    /// * `parquet_column_index` - The index of the leaf column in the Parquet schema
1514    /// * `arrow_field` - The Arrow field describing the column's data type
1515    /// * `parquet_schema` - The Parquet schema descriptor (used to look up the physical type)
1516    ///
1517    /// The caller must ensure that `arrow_field` describes the same leaf column as
1518    /// `parquet_column_index`. This mapping is not validated by the converter; if
1519    /// the Arrow type does not match the Parquet column statistics, extraction
1520    /// returns null statistics values rather than an error.
1521    ///
1522    /// # Errors
1523    ///
1524    /// * If the `parquet_column_index` is out of bounds
1525    pub fn from_column_index(
1526        parquet_column_index: usize,
1527        arrow_field: &'a Field,
1528        parquet_schema: &'a SchemaDescriptor,
1529    ) -> Result<Self> {
1530        if parquet_column_index >= parquet_schema.columns().len() {
1531            return Err(arrow_err!(format!(
1532                "Parquet column index {} out of bounds, column count {}",
1533                parquet_column_index,
1534                parquet_schema.columns().len()
1535            )));
1536        }
1537
1538        let physical_type = parquet_schema.column(parquet_column_index).physical_type();
1539
1540        Ok(Self {
1541            parquet_column_index: Some(parquet_column_index),
1542            arrow_field,
1543            missing_null_counts_as_zero: true,
1544            physical_type: Some(physical_type),
1545        })
1546    }
1547
1548    /// Extract the minimum values from row group statistics in [`RowGroupMetaData`]
1549    ///
1550    /// # Return Value
1551    ///
1552    /// The returned array contains 1 value for each row group, in the same order as `metadatas`
1553    ///
1554    /// Each value is either
1555    /// * the minimum value for the column
1556    /// * a null value, if the statistics can not be extracted
1557    ///
1558    /// Note that a null value does NOT mean the min value was actually
1559    /// `null` it means it the requested statistic is unknown
1560    ///
1561    /// # Errors
1562    ///
1563    /// Reasons for not being able to extract the statistics include:
1564    /// * the column is not present in the parquet file
1565    /// * statistics for the column are not present in the row group
1566    /// * the stored statistic value can not be converted to the requested type
1567    ///
1568    /// # Example
1569    /// ```no_run
1570    /// # use std::sync::Arc;
1571    /// # use arrow::datatypes::Schema;
1572    /// # use arrow_array::{ArrayRef, Float64Array};
1573    /// # use parquet::arrow::arrow_reader::statistics::StatisticsConverter;
1574    /// # use parquet::file::metadata::ParquetMetaData;
1575    /// # fn get_parquet_metadata() -> ParquetMetaData { unimplemented!() }
1576    /// # fn get_arrow_schema() -> Schema { unimplemented!() }
1577    /// // Given the metadata for a parquet file and the arrow schema
1578    /// let metadata: ParquetMetaData = get_parquet_metadata();
1579    /// let arrow_schema: Schema = get_arrow_schema();
1580    /// let parquet_schema = metadata.file_metadata().schema_descr();
1581    /// // create a converter
1582    /// let converter = StatisticsConverter::try_new("foo", &arrow_schema, parquet_schema)
1583    ///   .unwrap();
1584    /// // get the minimum value for the column "foo" in the parquet file
1585    /// let min_values: ArrayRef = converter
1586    ///   .row_group_mins(metadata.row_groups().iter())
1587    ///   .unwrap();
1588    /// // if "foo" is a Float64 value, the returned array will contain Float64 values
1589    /// assert_eq!(min_values, Arc::new(Float64Array::from(vec![Some(1.0), Some(2.0)])) as _);
1590    /// ```
1591    pub fn row_group_mins<I>(&self, metadatas: I) -> Result<ArrayRef>
1592    where
1593        I: IntoIterator<Item = &'a RowGroupMetaData>,
1594    {
1595        let data_type = self.arrow_field.data_type();
1596
1597        let Some(parquet_index) = self.parquet_column_index else {
1598            return Ok(self.make_null_array(data_type, metadatas));
1599        };
1600
1601        let iter = metadatas
1602            .into_iter()
1603            .map(|x| x.column(parquet_index).statistics());
1604        min_statistics(data_type, iter, self.physical_type)
1605    }
1606
1607    /// Extract the maximum values from row group statistics in [`RowGroupMetaData`]
1608    ///
1609    /// See docs on [`Self::row_group_mins`] for details
1610    pub fn row_group_maxes<I>(&self, metadatas: I) -> Result<ArrayRef>
1611    where
1612        I: IntoIterator<Item = &'a RowGroupMetaData>,
1613    {
1614        let data_type = self.arrow_field.data_type();
1615
1616        let Some(parquet_index) = self.parquet_column_index else {
1617            return Ok(self.make_null_array(data_type, metadatas));
1618        };
1619
1620        let iter = metadatas
1621            .into_iter()
1622            .map(|x| x.column(parquet_index).statistics());
1623        max_statistics(data_type, iter, self.physical_type)
1624    }
1625
1626    /// Extract the `is_max_value_exact` flags from row group statistics in [`RowGroupMetaData`]
1627    ///
1628    /// See docs on [`Self::row_group_maxes`] for details
1629    pub fn row_group_is_max_value_exact<I>(&self, metadatas: I) -> Result<BooleanArray>
1630    where
1631        I: IntoIterator<Item = &'a RowGroupMetaData>,
1632    {
1633        let Some(parquet_index) = self.parquet_column_index else {
1634            let num_row_groups = metadatas.into_iter().count();
1635            return Ok(BooleanArray::from_iter(std::iter::repeat_n(
1636                None,
1637                num_row_groups,
1638            )));
1639        };
1640
1641        let is_max_value_exact = metadatas
1642            .into_iter()
1643            .map(|x| x.column(parquet_index).statistics())
1644            .map(|s| s.map(|s| s.max_is_exact()));
1645        Ok(BooleanArray::from_iter(is_max_value_exact))
1646    }
1647
1648    /// Extract the `is_min_value_exact` flags from row group statistics in [`RowGroupMetaData`]
1649    ///
1650    /// See docs on [`Self::row_group_mins`] for details
1651    pub fn row_group_is_min_value_exact<I>(&self, metadatas: I) -> Result<BooleanArray>
1652    where
1653        I: IntoIterator<Item = &'a RowGroupMetaData>,
1654    {
1655        let Some(parquet_index) = self.parquet_column_index else {
1656            let num_row_groups = metadatas.into_iter().count();
1657            return Ok(BooleanArray::from_iter(std::iter::repeat_n(
1658                None,
1659                num_row_groups,
1660            )));
1661        };
1662
1663        let is_min_value_exact = metadatas
1664            .into_iter()
1665            .map(|x| x.column(parquet_index).statistics())
1666            .map(|s| s.map(|s| s.min_is_exact()));
1667        Ok(BooleanArray::from_iter(is_min_value_exact))
1668    }
1669
1670    /// Extract the null counts from row group statistics in [`RowGroupMetaData`]
1671    ///
1672    /// See docs on [`Self::row_group_mins`] for details
1673    pub fn row_group_null_counts<I>(&self, metadatas: I) -> Result<UInt64Array>
1674    where
1675        I: IntoIterator<Item = &'a RowGroupMetaData>,
1676    {
1677        let Some(parquet_index) = self.parquet_column_index else {
1678            let num_row_groups = metadatas.into_iter().count();
1679            return Ok(UInt64Array::from_iter(std::iter::repeat_n(
1680                None,
1681                num_row_groups,
1682            )));
1683        };
1684
1685        let null_counts = metadatas
1686            .into_iter()
1687            .map(|x| x.column(parquet_index).statistics())
1688            .map(|s| {
1689                s.and_then(|s| {
1690                    if self.missing_null_counts_as_zero {
1691                        Some(s.null_count_opt().unwrap_or(0))
1692                    } else {
1693                        s.null_count_opt()
1694                    }
1695                })
1696            });
1697        Ok(UInt64Array::from_iter(null_counts))
1698    }
1699
1700    /// Extract the minimum values from Data Page statistics.
1701    ///
1702    /// In Parquet files, in addition to the Column Chunk level statistics
1703    /// (stored for each column for each row group) there are also
1704    /// optional statistics stored for each data page, as part of
1705    /// the [`ParquetColumnIndex`].
1706    ///
1707    /// Since a single Column Chunk is stored as one or more pages,
1708    /// page level statistics can prune at a finer granularity.
1709    ///
1710    /// However since they are stored in a separate metadata
1711    /// structure ([`ColumnIndexMetaData`]) there is different code to extract them as
1712    /// compared to arrow statistics.
1713    ///
1714    /// # Parameters:
1715    ///
1716    /// * `column_page_index`: The parquet column page indices, read from
1717    ///   `ParquetMetaData` column_index
1718    ///
1719    /// * `column_offset_index`: The parquet column offset indices, read from
1720    ///   `ParquetMetaData` offset_index
1721    ///
1722    /// * `row_group_indices`: The indices of the row groups, that are used to
1723    ///   extract the column page index and offset index on a per row group
1724    ///   per column basis.
1725    ///
1726    /// # Return Value
1727    ///
1728    /// The returned array contains 1 value for each `NativeIndex`
1729    /// in the underlying `Index`es, in the same order as they appear
1730    /// in `metadatas`.
1731    ///
1732    /// For example, if there are two `Index`es in `metadatas`:
1733    /// 1. the first having `3` `PageIndex` entries
1734    /// 2. the second having `2` `PageIndex` entries
1735    ///
1736    /// The returned array would have 5 rows.
1737    ///
1738    /// Each value is either:
1739    /// * the minimum value for the page
1740    /// * a null value, if the statistics can not be extracted
1741    ///
1742    /// Note that a null value does NOT mean the min value was actually
1743    /// `null` it means it the requested statistic is unknown
1744    ///
1745    /// # Errors
1746    ///
1747    /// Reasons for not being able to extract the statistics include:
1748    /// * the column is not present in the parquet file
1749    /// * statistics for the pages are not present in the row group
1750    /// * the stored statistic value can not be converted to the requested type
1751    pub fn data_page_mins<I>(
1752        &self,
1753        column_page_index: &ParquetColumnIndex,
1754        column_offset_index: &ParquetOffsetIndex,
1755        row_group_indices: I,
1756    ) -> Result<ArrayRef>
1757    where
1758        I: IntoIterator<Item = &'a usize>,
1759    {
1760        let data_type = self.arrow_field.data_type();
1761
1762        let Some(parquet_index) = self.parquet_column_index else {
1763            return Ok(self.make_null_array(data_type, row_group_indices));
1764        };
1765
1766        let iter = row_group_indices.into_iter().map(|rg_index| {
1767            let column_page_index_per_row_group_per_column =
1768                &column_page_index[*rg_index][parquet_index];
1769            let num_data_pages = &column_offset_index[*rg_index][parquet_index]
1770                .page_locations()
1771                .len();
1772
1773            (*num_data_pages, column_page_index_per_row_group_per_column)
1774        });
1775
1776        min_page_statistics(data_type, iter, self.physical_type)
1777    }
1778
1779    /// Extract the maximum values from Data Page statistics.
1780    ///
1781    /// See docs on [`Self::data_page_mins`] for details.
1782    pub fn data_page_maxes<I>(
1783        &self,
1784        column_page_index: &ParquetColumnIndex,
1785        column_offset_index: &ParquetOffsetIndex,
1786        row_group_indices: I,
1787    ) -> Result<ArrayRef>
1788    where
1789        I: IntoIterator<Item = &'a usize>,
1790    {
1791        let data_type = self.arrow_field.data_type();
1792
1793        let Some(parquet_index) = self.parquet_column_index else {
1794            return Ok(self.make_null_array(data_type, row_group_indices));
1795        };
1796
1797        let iter = row_group_indices.into_iter().map(|rg_index| {
1798            let column_page_index_per_row_group_per_column =
1799                &column_page_index[*rg_index][parquet_index];
1800            let num_data_pages = &column_offset_index[*rg_index][parquet_index]
1801                .page_locations()
1802                .len();
1803
1804            (*num_data_pages, column_page_index_per_row_group_per_column)
1805        });
1806
1807        max_page_statistics(data_type, iter, self.physical_type)
1808    }
1809
1810    /// Returns a [`UInt64Array`] with null counts for each data page.
1811    ///
1812    /// See docs on [`Self::data_page_mins`] for details.
1813    pub fn data_page_null_counts<I>(
1814        &self,
1815        column_page_index: &ParquetColumnIndex,
1816        column_offset_index: &ParquetOffsetIndex,
1817        row_group_indices: I,
1818    ) -> Result<UInt64Array>
1819    where
1820        I: IntoIterator<Item = &'a usize>,
1821    {
1822        let Some(parquet_index) = self.parquet_column_index else {
1823            let num_row_groups = row_group_indices.into_iter().count();
1824            return Ok(UInt64Array::new_null(num_row_groups));
1825        };
1826
1827        let iter = row_group_indices.into_iter().map(|rg_index| {
1828            let column_page_index_per_row_group_per_column =
1829                &column_page_index[*rg_index][parquet_index];
1830            let num_data_pages = &column_offset_index[*rg_index][parquet_index]
1831                .page_locations()
1832                .len();
1833
1834            (*num_data_pages, column_page_index_per_row_group_per_column)
1835        });
1836        null_counts_page_statistics(iter)
1837    }
1838
1839    /// Returns a [`UInt64Array`] with row counts for each data page.
1840    ///
1841    /// This function iterates over the given row group indexes and computes
1842    /// the row count for each page in the specified column.
1843    ///
1844    /// # Parameters:
1845    ///
1846    /// * `column_offset_index`: The parquet column offset indices, read from
1847    ///   `ParquetMetaData` offset_index
1848    ///
1849    /// * `row_group_metadatas`: The metadata slice of the row groups, read
1850    ///   from `ParquetMetaData` row_groups
1851    ///
1852    /// * `row_group_indices`: The indices of the row groups, that are used to
1853    ///   extract the column offset index on a per row group per column basis.
1854    ///
1855    /// See docs on [`Self::data_page_mins`] for details.
1856    pub fn data_page_row_counts<I>(
1857        &self,
1858        column_offset_index: &ParquetOffsetIndex,
1859        row_group_metadatas: &'a [RowGroupMetaData],
1860        row_group_indices: I,
1861    ) -> Result<Option<UInt64Array>>
1862    where
1863        I: IntoIterator<Item = &'a usize>,
1864    {
1865        let Some(parquet_index) = self.parquet_column_index else {
1866            // no matching column found in parquet_index;
1867            // thus we cannot extract page_locations in order to determine
1868            // the row count on a per DataPage basis.
1869            return Ok(None);
1870        };
1871
1872        let mut row_counts = Vec::new();
1873        let mut nulls = NullBufferBuilder::new(0);
1874        for rg_idx in row_group_indices {
1875            let page_locations = &column_offset_index[*rg_idx][parquet_index].page_locations();
1876
1877            let row_count_per_page = page_locations
1878                .windows(2)
1879                .map(|loc| Some(loc[1].first_row_index as u64 - loc[0].first_row_index as u64));
1880
1881            // append the last page row count
1882            let num_rows_in_row_group = &row_group_metadatas[*rg_idx].num_rows();
1883            let row_count_per_page = row_count_per_page.chain(std::iter::once(Some(
1884                *num_rows_in_row_group as u64
1885                    - page_locations.last().unwrap().first_row_index as u64,
1886            )));
1887
1888            row_counts.extend(row_count_per_page.clone().map(|x| x.unwrap_or(0)));
1889            for val in row_count_per_page {
1890                if val.is_some() {
1891                    nulls.append_non_null();
1892                } else {
1893                    nulls.append_null();
1894                }
1895            }
1896        }
1897
1898        Ok(Some(UInt64Array::new(row_counts.into(), nulls.build())))
1899    }
1900
1901    /// Returns a null array of data_type with one element per row group
1902    fn make_null_array<I, A>(&self, data_type: &DataType, metadatas: I) -> ArrayRef
1903    where
1904        I: IntoIterator<Item = A>,
1905    {
1906        // column was in the arrow schema but not in the parquet schema, so return a null array
1907        let num_row_groups = metadatas.into_iter().count();
1908        new_null_array(data_type, num_row_groups)
1909    }
1910}
1911
1912// See tests in parquet/tests/arrow_reader/statistics.rs