1use crate::arrow::buffer::bit_util::sign_extend_be;
23use crate::arrow::parquet_column;
24use crate::basic::Type as PhysicalType;
25use crate::data_type::{ByteArray, FixedLenByteArray};
26use crate::errors::{ParquetError, Result};
27use crate::file::metadata::{ParquetColumnIndex, ParquetOffsetIndex, RowGroupMetaData};
28use crate::file::page_index::index::{Index, PageIndex};
29use crate::file::statistics::Statistics as ParquetStatistics;
30use crate::schema::types::SchemaDescriptor;
31use arrow_array::builder::{
32 BinaryViewBuilder, BooleanBuilder, FixedSizeBinaryBuilder, LargeStringBuilder, StringBuilder,
33 StringViewBuilder,
34};
35use arrow_array::{
36 new_empty_array, new_null_array, ArrayRef, BinaryArray, BooleanArray, Date32Array, Date64Array,
37 Decimal128Array, Decimal256Array, Decimal32Array, Decimal64Array, Float16Array, Float32Array,
38 Float64Array, Int16Array, Int32Array, Int64Array, Int8Array, LargeBinaryArray,
39 Time32MillisecondArray, Time32SecondArray, Time64MicrosecondArray, Time64NanosecondArray,
40 TimestampMicrosecondArray, TimestampMillisecondArray, TimestampNanosecondArray,
41 TimestampSecondArray, UInt16Array, UInt32Array, UInt64Array, UInt8Array,
42};
43use arrow_buffer::i256;
44use arrow_schema::{DataType, Field, Schema, TimeUnit};
45use half::f16;
46use paste::paste;
47use std::sync::Arc;
48
49pub(crate) fn from_bytes_to_i32(b: &[u8]) -> i32 {
52 i32::from_be_bytes(sign_extend_be::<4>(b))
56}
57
58pub(crate) fn from_bytes_to_i64(b: &[u8]) -> i64 {
61 i64::from_be_bytes(sign_extend_be::<8>(b))
62}
63
64pub(crate) fn from_bytes_to_i128(b: &[u8]) -> i128 {
67 i128::from_be_bytes(sign_extend_be::<16>(b))
68}
69
70pub(crate) fn from_bytes_to_i256(b: &[u8]) -> i256 {
73 i256::from_be_bytes(sign_extend_be::<32>(b))
74}
75
76pub(crate) fn from_bytes_to_f16(b: &[u8]) -> Option<f16> {
78 match b {
79 [low, high] => Some(f16::from_be_bytes([*high, *low])),
80 _ => None,
81 }
82}
83
/// Generates an iterator adapter named `$iterator_type` that pulls one typed
/// value out of each `Option<&ParquetStatistics>` it is fed.
///
/// Parameters:
/// * `$iterator_type` - name of the struct to generate
/// * `$func` - accessor called on the matched statistics (e.g. `min_opt`)
/// * `$parquet_statistics_type` - the `ParquetStatistics` variant to accept
/// * `$stat_value_type` - the value type; the iterator yields `Option<&$stat_value_type>`
///
/// The generated iterator yields exactly one item per input item: `None` when
/// the input is `None`, when the statistics are of a different variant, or when
/// `$func` itself returns `None`.
macro_rules! make_stats_iterator {
    ($iterator_type:ident, $func:ident, $parquet_statistics_type:path, $stat_value_type:ty) => {
        /// Adapter over an iterator of optional Parquet statistics.
        struct $iterator_type<'a, I>
        where
            I: Iterator<Item = Option<&'a ParquetStatistics>>,
        {
            iter: I,
        }

        impl<'a, I> $iterator_type<'a, I>
        where
            I: Iterator<Item = Option<&'a ParquetStatistics>>,
        {
            /// Wraps `iter` without consuming anything from it.
            fn new(iter: I) -> Self {
                Self { iter }
            }
        }

        impl<'a, I> Iterator for $iterator_type<'a, I>
        where
            I: Iterator<Item = Option<&'a ParquetStatistics>>,
        {
            type Item = Option<&'a $stat_value_type>;

            fn next(&mut self) -> Option<Self::Item> {
                // `?` ends iteration only when the wrapped iterator is exhausted.
                let maybe_stats = self.iter.next()?;
                let value = match maybe_stats {
                    Some($parquet_statistics_type(typed)) => typed.$func(),
                    // Missing statistics or a different physical type.
                    _ => None,
                };
                Some(value)
            }

            fn size_hint(&self) -> (usize, Option<usize>) {
                // One output per input, so the inner hint is exact for us too.
                self.iter.size_hint()
            }
        }
    };
}
144
// Min/max iterators over `ParquetStatistics::Boolean`, yielding `Option<&bool>`.
make_stats_iterator!(
    MinBooleanStatsIterator,
    min_opt,
    ParquetStatistics::Boolean,
    bool
);
make_stats_iterator!(
    MaxBooleanStatsIterator,
    max_opt,
    ParquetStatistics::Boolean,
    bool
);
// Min/max iterators over `ParquetStatistics::Int32`, yielding `Option<&i32>`.
make_stats_iterator!(
    MinInt32StatsIterator,
    min_opt,
    ParquetStatistics::Int32,
    i32
);
make_stats_iterator!(
    MaxInt32StatsIterator,
    max_opt,
    ParquetStatistics::Int32,
    i32
);
// Min/max iterators over `ParquetStatistics::Int64`, yielding `Option<&i64>`.
make_stats_iterator!(
    MinInt64StatsIterator,
    min_opt,
    ParquetStatistics::Int64,
    i64
);
make_stats_iterator!(
    MaxInt64StatsIterator,
    max_opt,
    ParquetStatistics::Int64,
    i64
);
// Min/max iterators over `ParquetStatistics::Float`, yielding `Option<&f32>`.
make_stats_iterator!(
    MinFloatStatsIterator,
    min_opt,
    ParquetStatistics::Float,
    f32
);
make_stats_iterator!(
    MaxFloatStatsIterator,
    max_opt,
    ParquetStatistics::Float,
    f32
);
// Min/max iterators over `ParquetStatistics::Double`, yielding `Option<&f64>`.
make_stats_iterator!(
    MinDoubleStatsIterator,
    min_opt,
    ParquetStatistics::Double,
    f64
);
make_stats_iterator!(
    MaxDoubleStatsIterator,
    max_opt,
    ParquetStatistics::Double,
    f64
);
// Min/max iterators over `ParquetStatistics::ByteArray`; these use the
// `*_bytes_opt` accessors and yield `Option<&[u8]>`.
make_stats_iterator!(
    MinByteArrayStatsIterator,
    min_bytes_opt,
    ParquetStatistics::ByteArray,
    [u8]
);
make_stats_iterator!(
    MaxByteArrayStatsIterator,
    max_bytes_opt,
    ParquetStatistics::ByteArray,
    [u8]
);
// Min/max iterators over `ParquetStatistics::FixedLenByteArray`, also yielding
// `Option<&[u8]>` via the `*_bytes_opt` accessors.
make_stats_iterator!(
    MinFixedLenByteArrayStatsIterator,
    min_bytes_opt,
    ParquetStatistics::FixedLenByteArray,
    [u8]
);
make_stats_iterator!(
    MaxFixedLenByteArrayStatsIterator,
    max_bytes_opt,
    ParquetStatistics::FixedLenByteArray,
    [u8]
);
229
/// Generates an iterator adapter named `$iterator_type` that extracts a decimal
/// value of type `$stat_value_type` from each `Option<&ParquetStatistics>`.
///
/// Parameters:
/// * `$iterator_type` - name of the struct to generate
/// * `$func` - accessor used for `Int32` / `Int64` statistics (e.g. `min_opt`)
/// * `$bytes_func` - accessor used for byte-array statistics (e.g. `min_bytes_opt`)
/// * `$stat_value_type` - native integer type backing the decimal
/// * `$convert_func` - function converting a `&[u8]` into `$stat_value_type`
///
/// Statistics are accepted from four physical encodings: `Int32` (converted
/// with `From`), `Int64` (converted with `TryFrom`; values that do not fit
/// become `None`), `ByteArray` and `FixedLenByteArray` (both decoded with
/// `$convert_func`). Any other variant, or absent statistics, yields `None`.
macro_rules! make_decimal_stats_iterator {
    ($iterator_type:ident, $func:ident, $bytes_func:ident, $stat_value_type:ident, $convert_func: ident) => {
        /// Adapter over an iterator of optional Parquet statistics.
        struct $iterator_type<'a, I>
        where
            I: Iterator<Item = Option<&'a ParquetStatistics>>,
        {
            iter: I,
        }

        impl<'a, I> $iterator_type<'a, I>
        where
            I: Iterator<Item = Option<&'a ParquetStatistics>>,
        {
            /// Wraps `iter` without consuming anything from it.
            fn new(iter: I) -> Self {
                Self { iter }
            }
        }

        impl<'a, I> Iterator for $iterator_type<'a, I>
        where
            I: Iterator<Item = Option<&'a ParquetStatistics>>,
        {
            type Item = Option<$stat_value_type>;

            fn next(&mut self) -> Option<Self::Item> {
                // `?` ends iteration only when the wrapped iterator is exhausted.
                let maybe_stats = self.iter.next()?;
                let value = match maybe_stats {
                    Some(ParquetStatistics::Int32(typed)) => {
                        typed.$func().map(|raw| $stat_value_type::from(*raw))
                    }
                    Some(ParquetStatistics::Int64(typed)) => typed
                        .$func()
                        .and_then(|raw| $stat_value_type::try_from(*raw).ok()),
                    Some(ParquetStatistics::ByteArray(typed)) => {
                        typed.$bytes_func().map($convert_func)
                    }
                    Some(ParquetStatistics::FixedLenByteArray(typed)) => {
                        typed.$bytes_func().map($convert_func)
                    }
                    // Missing statistics or a physical type decimals are not read from.
                    _ => None,
                };
                Some(value)
            }

            fn size_hint(&self) -> (usize, Option<usize>) {
                // One output per input, so the inner hint carries over unchanged.
                self.iter.size_hint()
            }
        }
    };
}
297
// Min/max decimal iterators yielding `Option<i32>`; byte-array statistics are
// decoded with `from_bytes_to_i32`.
make_decimal_stats_iterator!(
    MinDecimal32StatsIterator,
    min_opt,
    min_bytes_opt,
    i32,
    from_bytes_to_i32
);
make_decimal_stats_iterator!(
    MaxDecimal32StatsIterator,
    max_opt,
    max_bytes_opt,
    i32,
    from_bytes_to_i32
);
// Min/max decimal iterators yielding `Option<i64>`; byte-array statistics are
// decoded with `from_bytes_to_i64`.
make_decimal_stats_iterator!(
    MinDecimal64StatsIterator,
    min_opt,
    min_bytes_opt,
    i64,
    from_bytes_to_i64
);
make_decimal_stats_iterator!(
    MaxDecimal64StatsIterator,
    max_opt,
    max_bytes_opt,
    i64,
    from_bytes_to_i64
);
// Min/max decimal iterators yielding `Option<i128>`; byte-array statistics are
// decoded with `from_bytes_to_i128`.
make_decimal_stats_iterator!(
    MinDecimal128StatsIterator,
    min_opt,
    min_bytes_opt,
    i128,
    from_bytes_to_i128
);
make_decimal_stats_iterator!(
    MaxDecimal128StatsIterator,
    max_opt,
    max_bytes_opt,
    i128,
    from_bytes_to_i128
);
// Min/max decimal iterators yielding `Option<i256>`; byte-array statistics are
// decoded with `from_bytes_to_i256`.
make_decimal_stats_iterator!(
    MinDecimal256StatsIterator,
    min_opt,
    min_bytes_opt,
    i256,
    from_bytes_to_i256
);
make_decimal_stats_iterator!(
    MaxDecimal256StatsIterator,
    max_opt,
    max_bytes_opt,
    i256,
    from_bytes_to_i256
);
354
/// Expands to a `match` on `$data_type` that converts an iterator of
/// `Option<&ParquetStatistics>` into an Arrow array of that type.
///
/// Parameters:
/// * `$stat_type_prefix` - `Min` or `Max`; pasted in front of the iterator type
///   names (e.g. `MinInt32StatsIterator`) and, lower-cased, in front of
///   `_statistics` for the dictionary recursion
/// * `$data_type` - identifier bound to the target `&DataType`
/// * `$iterator` - identifier bound to the statistics iterator; it is consumed
/// * `$physical_type` - identifier bound to an `Option<PhysicalType>`; only
///   consulted to pick the source encoding for `Date64`
///
/// The expansion evaluates to a `Result<ArrayRef>` holding one element per
/// iterator item. Values that cannot be represented in the target type (integer
/// narrowing failure, invalid UTF-8, wrong fixed-size length) become nulls, and
/// types with no extraction rule produce an all-null array. Because the decimal
/// arms use `?`, the macro must be expanded inside a function returning `Result`.
macro_rules! get_statistics {
    ($stat_type_prefix: ident, $data_type: ident, $iterator: ident, $physical_type: ident) => {
        paste! {
        match $data_type {
            DataType::Boolean => Ok(Arc::new(BooleanArray::from_iter(
                [<$stat_type_prefix BooleanStatsIterator>]::new($iterator).map(|v| v.copied()),
            ))),
            // Narrow integer types are read from Int32 statistics; out-of-range
            // values become null.
            DataType::Int8 => Ok(Arc::new(Int8Array::from_iter(
                [<$stat_type_prefix Int32StatsIterator>]::new($iterator)
                    .map(|v| v.and_then(|raw| i8::try_from(*raw).ok())),
            ))),
            DataType::Int16 => Ok(Arc::new(Int16Array::from_iter(
                [<$stat_type_prefix Int32StatsIterator>]::new($iterator)
                    .map(|v| v.and_then(|raw| i16::try_from(*raw).ok())),
            ))),
            DataType::Int32 => Ok(Arc::new(Int32Array::from_iter(
                [<$stat_type_prefix Int32StatsIterator>]::new($iterator).map(|v| v.copied()),
            ))),
            DataType::Int64 => Ok(Arc::new(Int64Array::from_iter(
                [<$stat_type_prefix Int64StatsIterator>]::new($iterator).map(|v| v.copied()),
            ))),
            DataType::UInt8 => Ok(Arc::new(UInt8Array::from_iter(
                [<$stat_type_prefix Int32StatsIterator>]::new($iterator)
                    .map(|v| v.and_then(|raw| u8::try_from(*raw).ok())),
            ))),
            DataType::UInt16 => Ok(Arc::new(UInt16Array::from_iter(
                [<$stat_type_prefix Int32StatsIterator>]::new($iterator)
                    .map(|v| v.and_then(|raw| u16::try_from(*raw).ok())),
            ))),
            // Same-width unsigned types reinterpret the signed bits with `as`.
            DataType::UInt32 => Ok(Arc::new(UInt32Array::from_iter(
                [<$stat_type_prefix Int32StatsIterator>]::new($iterator)
                    .map(|v| v.map(|raw| *raw as u32)),
            ))),
            DataType::UInt64 => Ok(Arc::new(UInt64Array::from_iter(
                [<$stat_type_prefix Int64StatsIterator>]::new($iterator)
                    .map(|v| v.map(|raw| *raw as u64)),
            ))),
            // Float16 is read from fixed-length byte arrays; wrong lengths become null.
            DataType::Float16 => Ok(Arc::new(Float16Array::from_iter(
                [<$stat_type_prefix FixedLenByteArrayStatsIterator>]::new($iterator)
                    .map(|v| v.and_then(from_bytes_to_f16)),
            ))),
            DataType::Float32 => Ok(Arc::new(Float32Array::from_iter(
                [<$stat_type_prefix FloatStatsIterator>]::new($iterator).map(|v| v.copied()),
            ))),
            DataType::Float64 => Ok(Arc::new(Float64Array::from_iter(
                [<$stat_type_prefix DoubleStatsIterator>]::new($iterator).map(|v| v.copied()),
            ))),
            DataType::Date32 => Ok(Arc::new(Date32Array::from_iter(
                [<$stat_type_prefix Int32StatsIterator>]::new($iterator).map(|v| v.copied()),
            ))),
            // Date64 backed by INT32: the stored value is multiplied by the
            // number of milliseconds in a day.
            DataType::Date64 if $physical_type == Some(PhysicalType::INT32) => {
                Ok(Arc::new(Date64Array::from_iter(
                    [<$stat_type_prefix Int32StatsIterator>]::new($iterator)
                        .map(|v| v.map(|days| i64::from(*days) * 24 * 60 * 60 * 1000)),
                )))
            }
            DataType::Date64 if $physical_type == Some(PhysicalType::INT64) => {
                Ok(Arc::new(Date64Array::from_iter(
                    [<$stat_type_prefix Int64StatsIterator>]::new($iterator).map(|v| v.copied()),
                )))
            }
            DataType::Timestamp(unit, timezone) => {
                let values =
                    [<$stat_type_prefix Int64StatsIterator>]::new($iterator).map(|v| v.copied());
                Ok(match unit {
                    TimeUnit::Second => Arc::new(
                        TimestampSecondArray::from_iter(values).with_timezone_opt(timezone.clone()),
                    ),
                    TimeUnit::Millisecond => Arc::new(
                        TimestampMillisecondArray::from_iter(values)
                            .with_timezone_opt(timezone.clone()),
                    ),
                    TimeUnit::Microsecond => Arc::new(
                        TimestampMicrosecondArray::from_iter(values)
                            .with_timezone_opt(timezone.clone()),
                    ),
                    TimeUnit::Nanosecond => Arc::new(
                        TimestampNanosecondArray::from_iter(values)
                            .with_timezone_opt(timezone.clone()),
                    ),
                })
            }
            DataType::Time32(unit) => {
                Ok(match unit {
                    TimeUnit::Second => Arc::new(Time32SecondArray::from_iter(
                        [<$stat_type_prefix Int32StatsIterator>]::new($iterator)
                            .map(|v| v.copied()),
                    )),
                    TimeUnit::Millisecond => Arc::new(Time32MillisecondArray::from_iter(
                        [<$stat_type_prefix Int32StatsIterator>]::new($iterator)
                            .map(|v| v.copied()),
                    )),
                    // No extraction rule for other units: all-null array of matching length.
                    _ => new_null_array($data_type, $iterator.count()),
                })
            }
            DataType::Time64(unit) => {
                Ok(match unit {
                    TimeUnit::Microsecond => Arc::new(Time64MicrosecondArray::from_iter(
                        [<$stat_type_prefix Int64StatsIterator>]::new($iterator)
                            .map(|v| v.copied()),
                    )),
                    TimeUnit::Nanosecond => Arc::new(Time64NanosecondArray::from_iter(
                        [<$stat_type_prefix Int64StatsIterator>]::new($iterator)
                            .map(|v| v.copied()),
                    )),
                    // No extraction rule for other units: all-null array of matching length.
                    _ => new_null_array($data_type, $iterator.count()),
                })
            }
            DataType::Binary => Ok(Arc::new(BinaryArray::from_iter(
                [<$stat_type_prefix ByteArrayStatsIterator>]::new($iterator),
            ))),
            DataType::LargeBinary => Ok(Arc::new(LargeBinaryArray::from_iter(
                [<$stat_type_prefix ByteArrayStatsIterator>]::new($iterator),
            ))),
            // String types: missing statistics and invalid UTF-8 both become null.
            DataType::Utf8 => {
                let mut builder = StringBuilder::new();
                for bytes in [<$stat_type_prefix ByteArrayStatsIterator>]::new($iterator) {
                    builder.append_option(bytes.and_then(|b| std::str::from_utf8(b).ok()));
                }
                Ok(Arc::new(builder.finish()))
            }
            DataType::LargeUtf8 => {
                let mut builder = LargeStringBuilder::new();
                for bytes in [<$stat_type_prefix ByteArrayStatsIterator>]::new($iterator) {
                    builder.append_option(bytes.and_then(|b| std::str::from_utf8(b).ok()));
                }
                Ok(Arc::new(builder.finish()))
            }
            DataType::FixedSizeBinary(size) => {
                let mut builder = FixedSizeBinaryBuilder::new(*size);
                for bytes in [<$stat_type_prefix FixedLenByteArrayStatsIterator>]::new($iterator) {
                    match bytes {
                        // Only values of exactly the declared width are kept.
                        Some(bytes) if i32::try_from(bytes.len()) == Ok(*size) => {
                            builder.append_value(bytes).expect("ensure to append successfully here, because size have been checked before");
                        }
                        _ => builder.append_null(),
                    }
                }
                Ok(Arc::new(builder.finish()))
            }
            DataType::Decimal32(precision, scale) => {
                let array = Decimal32Array::from_iter(
                    [<$stat_type_prefix Decimal32StatsIterator>]::new($iterator),
                )
                .with_precision_and_scale(*precision, *scale)?;
                Ok(Arc::new(array))
            }
            DataType::Decimal64(precision, scale) => {
                let array = Decimal64Array::from_iter(
                    [<$stat_type_prefix Decimal64StatsIterator>]::new($iterator),
                )
                .with_precision_and_scale(*precision, *scale)?;
                Ok(Arc::new(array))
            }
            DataType::Decimal128(precision, scale) => {
                let array = Decimal128Array::from_iter(
                    [<$stat_type_prefix Decimal128StatsIterator>]::new($iterator),
                )
                .with_precision_and_scale(*precision, *scale)?;
                Ok(Arc::new(array))
            }
            DataType::Decimal256(precision, scale) => {
                let array = Decimal256Array::from_iter(
                    [<$stat_type_prefix Decimal256StatsIterator>]::new($iterator),
                )
                .with_precision_and_scale(*precision, *scale)?;
                Ok(Arc::new(array))
            }
            // Dictionary statistics are extracted as the dictionary's value type.
            DataType::Dictionary(_, value_type) => {
                [<$stat_type_prefix:lower _ statistics>](value_type, $iterator, $physical_type)
            }
            DataType::Utf8View => {
                let mut builder = StringViewBuilder::new();
                for bytes in [<$stat_type_prefix ByteArrayStatsIterator>]::new($iterator) {
                    builder.append_option(bytes.and_then(|b| std::str::from_utf8(b).ok()));
                }
                Ok(Arc::new(builder.finish()))
            }
            DataType::BinaryView => {
                let mut builder = BinaryViewBuilder::new();
                for bytes in [<$stat_type_prefix ByteArrayStatsIterator>]::new($iterator) {
                    builder.append_option(bytes);
                }
                Ok(Arc::new(builder.finish()))
            }

            // `Date64` appears again here to cover the cases the guarded arms
            // above reject; together with the remaining types it has no
            // extraction rule and yields an all-null array.
            DataType::Map(_, _)
            | DataType::Duration(_)
            | DataType::Interval(_)
            | DataType::Date64
            | DataType::Null
            | DataType::List(_)
            | DataType::ListView(_)
            | DataType::FixedSizeList(_, _)
            | DataType::LargeList(_)
            | DataType::LargeListView(_)
            | DataType::Struct(_)
            | DataType::Union(_, _)
            | DataType::RunEndEncoded(_, _) => {
                Ok(new_null_array($data_type, $iterator.count()))
            }
        }}}
}
598
/// Generates an iterator adapter named `$iterator_type` over `(usize, &Index)`
/// pairs that yields, for each pair, a `Vec` with one optional value per page.
///
/// Parameters:
/// * `$iterator_type` - name of the struct to generate
/// * `$func` - closure mapping one `&PageIndex<_>` to `Option<$stat_value_type>`
/// * `$index_type` - the `Index` variant this iterator understands
/// * `$stat_value_type` - value type of the produced statistics
///
/// When the index is of variant `$index_type`, `$func` is applied to every
/// entry in its `indexes`. For any other variant the result is a vector of
/// `len` `None`s, where `len` is the `usize` half of the input pair.
macro_rules! make_data_page_stats_iterator {
    ($iterator_type: ident, $func: expr, $index_type: path, $stat_value_type: ty) => {
        /// Adapter over an iterator of `(page count, page index)` pairs.
        struct $iterator_type<'a, I>
        where
            I: Iterator<Item = (usize, &'a Index)>,
        {
            iter: I,
        }

        impl<'a, I> $iterator_type<'a, I>
        where
            I: Iterator<Item = (usize, &'a Index)>,
        {
            /// Wraps `iter` without consuming anything from it.
            fn new(iter: I) -> Self {
                Self { iter }
            }
        }

        impl<'a, I> Iterator for $iterator_type<'a, I>
        where
            I: Iterator<Item = (usize, &'a Index)>,
        {
            type Item = Vec<Option<$stat_value_type>>;

            fn next(&mut self) -> Option<Self::Item> {
                let (len, index) = self.iter.next()?;
                let per_page = match index {
                    $index_type(native_index) => {
                        native_index.indexes.iter().map($func).collect::<Vec<_>>()
                    }
                    // No usable index for this type: emit `len` nulls so the
                    // output still lines up with the other per-page arrays.
                    _ => vec![None; len],
                };
                Some(per_page)
            }

            fn size_hint(&self) -> (usize, Option<usize>) {
                self.iter.size_hint()
            }
        }
    };
}
648
// Per-page min/max iterators for `Index::BOOLEAN`.
make_data_page_stats_iterator!(
    MinBooleanDataPageStatsIterator,
    |x: &PageIndex<bool>| { x.min },
    Index::BOOLEAN,
    bool
);
make_data_page_stats_iterator!(
    MaxBooleanDataPageStatsIterator,
    |x: &PageIndex<bool>| { x.max },
    Index::BOOLEAN,
    bool
);
// Per-page min/max iterators for `Index::INT32`.
make_data_page_stats_iterator!(
    MinInt32DataPageStatsIterator,
    |x: &PageIndex<i32>| { x.min },
    Index::INT32,
    i32
);
make_data_page_stats_iterator!(
    MaxInt32DataPageStatsIterator,
    |x: &PageIndex<i32>| { x.max },
    Index::INT32,
    i32
);
// Per-page min/max iterators for `Index::INT64`.
make_data_page_stats_iterator!(
    MinInt64DataPageStatsIterator,
    |x: &PageIndex<i64>| { x.min },
    Index::INT64,
    i64
);
make_data_page_stats_iterator!(
    MaxInt64DataPageStatsIterator,
    |x: &PageIndex<i64>| { x.max },
    Index::INT64,
    i64
);
// Float16 values are read from `Index::FIXED_LEN_BYTE_ARRAY`; the raw bytes are
// cloned here and decoded by the consumer of the iterator.
make_data_page_stats_iterator!(
    MinFloat16DataPageStatsIterator,
    |x: &PageIndex<FixedLenByteArray>| { x.min.clone() },
    Index::FIXED_LEN_BYTE_ARRAY,
    FixedLenByteArray
);
make_data_page_stats_iterator!(
    MaxFloat16DataPageStatsIterator,
    |x: &PageIndex<FixedLenByteArray>| { x.max.clone() },
    Index::FIXED_LEN_BYTE_ARRAY,
    FixedLenByteArray
);
// Per-page min/max iterators for `Index::FLOAT`.
make_data_page_stats_iterator!(
    MinFloat32DataPageStatsIterator,
    |x: &PageIndex<f32>| { x.min },
    Index::FLOAT,
    f32
);
make_data_page_stats_iterator!(
    MaxFloat32DataPageStatsIterator,
    |x: &PageIndex<f32>| { x.max },
    Index::FLOAT,
    f32
);
// Per-page min/max iterators for `Index::DOUBLE`.
make_data_page_stats_iterator!(
    MinFloat64DataPageStatsIterator,
    |x: &PageIndex<f64>| { x.min },
    Index::DOUBLE,
    f64
);
make_data_page_stats_iterator!(
    MaxFloat64DataPageStatsIterator,
    |x: &PageIndex<f64>| { x.max },
    Index::DOUBLE,
    f64
);
// Per-page min/max iterators for `Index::BYTE_ARRAY`; values are cloned out of
// the index because `ByteArray` is not `Copy`.
make_data_page_stats_iterator!(
    MinByteArrayDataPageStatsIterator,
    |x: &PageIndex<ByteArray>| { x.min.clone() },
    Index::BYTE_ARRAY,
    ByteArray
);
make_data_page_stats_iterator!(
    MaxByteArrayDataPageStatsIterator,
    |x: &PageIndex<ByteArray>| { x.max.clone() },
    Index::BYTE_ARRAY,
    ByteArray
);
// Per-page max/min iterators for `Index::FIXED_LEN_BYTE_ARRAY`. These expand to
// the same logic as the Float16 iterators above under a different name.
make_data_page_stats_iterator!(
    MaxFixedLenByteArrayDataPageStatsIterator,
    |x: &PageIndex<FixedLenByteArray>| { x.max.clone() },
    Index::FIXED_LEN_BYTE_ARRAY,
    FixedLenByteArray
);

make_data_page_stats_iterator!(
    MinFixedLenByteArrayDataPageStatsIterator,
    |x: &PageIndex<FixedLenByteArray>| { x.min.clone() },
    Index::FIXED_LEN_BYTE_ARRAY,
    FixedLenByteArray
);
746
/// Generates an iterator adapter named `$iterator_type` over `(usize, &Index)`
/// pairs that yields, for each pair, a `Vec` with one optional decimal value
/// per page.
///
/// Parameters:
/// * `$iterator_type` - name of the struct to generate
/// * `$func` - the `PageIndex` field to read (`min` or `max`)
/// * `$stat_value_type` - native integer type backing the decimal
/// * `$convert_func` - function converting a `&[u8]` into `$stat_value_type`
///
/// Decimals are read from four index variants: `INT32` (converted with `From`),
/// `INT64` (converted with `TryFrom`; values that do not fit become `None`),
/// `BYTE_ARRAY` and `FIXED_LEN_BYTE_ARRAY` (both decoded with `$convert_func`).
/// Any other variant produces `len` `None`s, where `len` is the `usize` half of
/// the input pair.
macro_rules! get_decimal_page_stats_iterator {
    ($iterator_type: ident, $func: ident, $stat_value_type: ident, $convert_func: ident) => {
        /// Adapter over an iterator of `(page count, page index)` pairs.
        struct $iterator_type<'a, I>
        where
            I: Iterator<Item = (usize, &'a Index)>,
        {
            iter: I,
        }

        impl<'a, I> $iterator_type<'a, I>
        where
            I: Iterator<Item = (usize, &'a Index)>,
        {
            /// Wraps `iter` without consuming anything from it.
            fn new(iter: I) -> Self {
                Self { iter }
            }
        }

        impl<'a, I> Iterator for $iterator_type<'a, I>
        where
            I: Iterator<Item = (usize, &'a Index)>,
        {
            type Item = Vec<Option<$stat_value_type>>;

            fn next(&mut self) -> Option<Self::Item> {
                let (len, index) = self.iter.next()?;
                let per_page = match index {
                    Index::INT32(native_index) => native_index
                        .indexes
                        .iter()
                        .map(|page| page.$func.map(|raw| $stat_value_type::from(raw)))
                        .collect::<Vec<_>>(),
                    Index::INT64(native_index) => native_index
                        .indexes
                        .iter()
                        .map(|page| {
                            page.$func
                                .and_then(|raw| $stat_value_type::try_from(raw).ok())
                        })
                        .collect::<Vec<_>>(),
                    // The byte-array variants borrow the stored value instead of
                    // cloning the whole page entry just to read one field.
                    Index::BYTE_ARRAY(native_index) => native_index
                        .indexes
                        .iter()
                        .map(|page| {
                            page.$func
                                .as_ref()
                                .map(|bytes| $convert_func(bytes.data()))
                        })
                        .collect::<Vec<_>>(),
                    Index::FIXED_LEN_BYTE_ARRAY(native_index) => native_index
                        .indexes
                        .iter()
                        .map(|page| {
                            page.$func
                                .as_ref()
                                .map(|bytes| $convert_func(bytes.data()))
                        })
                        .collect::<Vec<_>>(),
                    // No usable index: emit `len` nulls so the output still
                    // lines up with the other per-page arrays.
                    _ => vec![None; len],
                };
                Some(per_page)
            }

            fn size_hint(&self) -> (usize, Option<usize>) {
                self.iter.size_hint()
            }
        }
    };
}
819
// Per-page decimal min/max iterators producing `i32` values.
get_decimal_page_stats_iterator!(
    MinDecimal32DataPageStatsIterator,
    min,
    i32,
    from_bytes_to_i32
);

get_decimal_page_stats_iterator!(
    MaxDecimal32DataPageStatsIterator,
    max,
    i32,
    from_bytes_to_i32
);

// Per-page decimal min/max iterators producing `i64` values.
get_decimal_page_stats_iterator!(
    MinDecimal64DataPageStatsIterator,
    min,
    i64,
    from_bytes_to_i64
);

get_decimal_page_stats_iterator!(
    MaxDecimal64DataPageStatsIterator,
    max,
    i64,
    from_bytes_to_i64
);

// Per-page decimal min/max iterators producing `i128` values.
get_decimal_page_stats_iterator!(
    MinDecimal128DataPageStatsIterator,
    min,
    i128,
    from_bytes_to_i128
);

get_decimal_page_stats_iterator!(
    MaxDecimal128DataPageStatsIterator,
    max,
    i128,
    from_bytes_to_i128
);

// Per-page decimal min/max iterators producing `i256` values.
get_decimal_page_stats_iterator!(
    MinDecimal256DataPageStatsIterator,
    min,
    i256,
    from_bytes_to_i256
);

get_decimal_page_stats_iterator!(
    MaxDecimal256DataPageStatsIterator,
    max,
    i256,
    from_bytes_to_i256
);
875
/// Expands to a `match` on `$data_type` that converts an iterator of
/// `(usize, &Index)` pairs into an Arrow array of that type holding per-page
/// statistics.
///
/// Parameters:
/// * `$stat_type_prefix` - `Min` or `Max`; pasted in front of the iterator type
///   names (e.g. `MinInt32DataPageStatsIterator`) and, lower-cased, in front of
///   `_page_statistics` for the dictionary recursion
/// * `$data_type` - identifier bound to the target `&DataType`
/// * `$iterator` - identifier bound to the `(page count, index)` iterator; it
///   is consumed
/// * `$physical_type` - identifier bound to an `Option<PhysicalType>`; only
///   consulted to pick the source encoding for `Date64`
///
/// The expansion evaluates to a `Result<ArrayRef>`. Each input pair contributes
/// one array element per page. Values that cannot be represented in the target
/// type (integer narrowing failure, invalid UTF-8, wrong fixed-size length)
/// become nulls. Types with no extraction rule produce an all-null array whose
/// length is the sum of the page counts, so it lines up with the arrays
/// produced for supported types. Because the decimal arms use `?`, the macro
/// must be expanded inside a function returning `Result`.
macro_rules! get_data_page_statistics {
    ($stat_type_prefix: ident, $data_type: ident, $iterator: ident, $physical_type: ident) => {
        paste! {
            match $data_type {
                DataType::Boolean => {
                    let mut builder = BooleanBuilder::new();
                    let pages =
                        [<$stat_type_prefix BooleanDataPageStatsIterator>]::new($iterator).flatten();
                    for value in pages {
                        builder.append_option(value);
                    }
                    Ok(Arc::new(builder.finish()))
                }
                // Narrow integer types are read from the INT32 index;
                // out-of-range values become null.
                DataType::UInt8 => Ok(Arc::new(UInt8Array::from_iter(
                    [<$stat_type_prefix Int32DataPageStatsIterator>]::new($iterator).flat_map(
                        |pages| pages.into_iter().map(|v| v.and_then(|raw| u8::try_from(raw).ok())),
                    ),
                ))),
                DataType::UInt16 => Ok(Arc::new(UInt16Array::from_iter(
                    [<$stat_type_prefix Int32DataPageStatsIterator>]::new($iterator).flat_map(
                        |pages| pages.into_iter().map(|v| v.and_then(|raw| u16::try_from(raw).ok())),
                    ),
                ))),
                // Same-width unsigned types reinterpret the signed bits with `as`.
                DataType::UInt32 => Ok(Arc::new(UInt32Array::from_iter(
                    [<$stat_type_prefix Int32DataPageStatsIterator>]::new($iterator)
                        .flat_map(|pages| pages.into_iter().map(|v| v.map(|raw| raw as u32))),
                ))),
                DataType::UInt64 => Ok(Arc::new(UInt64Array::from_iter(
                    [<$stat_type_prefix Int64DataPageStatsIterator>]::new($iterator)
                        .flat_map(|pages| pages.into_iter().map(|v| v.map(|raw| raw as u64))),
                ))),
                DataType::Int8 => Ok(Arc::new(Int8Array::from_iter(
                    [<$stat_type_prefix Int32DataPageStatsIterator>]::new($iterator).flat_map(
                        |pages| pages.into_iter().map(|v| v.and_then(|raw| i8::try_from(raw).ok())),
                    ),
                ))),
                DataType::Int16 => Ok(Arc::new(Int16Array::from_iter(
                    [<$stat_type_prefix Int32DataPageStatsIterator>]::new($iterator).flat_map(
                        |pages| pages.into_iter().map(|v| v.and_then(|raw| i16::try_from(raw).ok())),
                    ),
                ))),
                DataType::Int32 => Ok(Arc::new(Int32Array::from_iter(
                    [<$stat_type_prefix Int32DataPageStatsIterator>]::new($iterator).flatten(),
                ))),
                DataType::Int64 => Ok(Arc::new(Int64Array::from_iter(
                    [<$stat_type_prefix Int64DataPageStatsIterator>]::new($iterator).flatten(),
                ))),
                // Float16 is decoded from fixed-length byte arrays; wrong
                // lengths become null.
                DataType::Float16 => Ok(Arc::new(Float16Array::from_iter(
                    [<$stat_type_prefix Float16DataPageStatsIterator>]::new($iterator).flat_map(
                        |pages| {
                            pages
                                .into_iter()
                                .map(|v| v.and_then(|bytes| from_bytes_to_f16(bytes.data())))
                        },
                    ),
                ))),
                DataType::Float32 => Ok(Arc::new(Float32Array::from_iter(
                    [<$stat_type_prefix Float32DataPageStatsIterator>]::new($iterator).flatten(),
                ))),
                DataType::Float64 => Ok(Arc::new(Float64Array::from_iter(
                    [<$stat_type_prefix Float64DataPageStatsIterator>]::new($iterator).flatten(),
                ))),
                DataType::Binary => Ok(Arc::new(BinaryArray::from_iter(
                    [<$stat_type_prefix ByteArrayDataPageStatsIterator>]::new($iterator).flatten(),
                ))),
                DataType::LargeBinary => Ok(Arc::new(LargeBinaryArray::from_iter(
                    [<$stat_type_prefix ByteArrayDataPageStatsIterator>]::new($iterator).flatten(),
                ))),
                // String types: missing statistics and invalid UTF-8 both become null.
                DataType::Utf8 => {
                    let mut builder = StringBuilder::new();
                    let pages =
                        [<$stat_type_prefix ByteArrayDataPageStatsIterator>]::new($iterator).flatten();
                    for value in pages {
                        builder.append_option(
                            value
                                .as_ref()
                                .and_then(|bytes| std::str::from_utf8(bytes.data()).ok()),
                        );
                    }
                    Ok(Arc::new(builder.finish()))
                }
                DataType::LargeUtf8 => {
                    let mut builder = LargeStringBuilder::new();
                    let pages =
                        [<$stat_type_prefix ByteArrayDataPageStatsIterator>]::new($iterator).flatten();
                    for value in pages {
                        builder.append_option(
                            value
                                .as_ref()
                                .and_then(|bytes| std::str::from_utf8(bytes.data()).ok()),
                        );
                    }
                    Ok(Arc::new(builder.finish()))
                }
                // Dictionary statistics are extracted as the dictionary's value type.
                DataType::Dictionary(_, value_type) => {
                    [<$stat_type_prefix:lower _ page_statistics>](value_type, $iterator, $physical_type)
                }
                DataType::Timestamp(unit, timezone) => {
                    let iter =
                        [<$stat_type_prefix Int64DataPageStatsIterator>]::new($iterator).flatten();
                    Ok(match unit {
                        TimeUnit::Second => Arc::new(
                            TimestampSecondArray::from_iter(iter).with_timezone_opt(timezone.clone()),
                        ),
                        TimeUnit::Millisecond => Arc::new(
                            TimestampMillisecondArray::from_iter(iter)
                                .with_timezone_opt(timezone.clone()),
                        ),
                        TimeUnit::Microsecond => Arc::new(
                            TimestampMicrosecondArray::from_iter(iter)
                                .with_timezone_opt(timezone.clone()),
                        ),
                        TimeUnit::Nanosecond => Arc::new(
                            TimestampNanosecondArray::from_iter(iter)
                                .with_timezone_opt(timezone.clone()),
                        ),
                    })
                }
                DataType::Date32 => Ok(Arc::new(Date32Array::from_iter(
                    [<$stat_type_prefix Int32DataPageStatsIterator>]::new($iterator).flatten(),
                ))),
                // Date64 backed by INT32: the stored value is multiplied by the
                // number of milliseconds in a day. Widening i32 -> i64 cannot fail.
                DataType::Date64 if $physical_type == Some(PhysicalType::INT32) => {
                    Ok(Arc::new(Date64Array::from_iter(
                        [<$stat_type_prefix Int32DataPageStatsIterator>]::new($iterator).flat_map(
                            |pages| {
                                pages
                                    .into_iter()
                                    .map(|v| v.map(|days| i64::from(days) * 24 * 60 * 60 * 1000))
                            },
                        ),
                    )))
                }
                DataType::Date64 if $physical_type == Some(PhysicalType::INT64) => {
                    Ok(Arc::new(Date64Array::from_iter(
                        [<$stat_type_prefix Int64DataPageStatsIterator>]::new($iterator).flatten(),
                    )))
                }
                DataType::Decimal32(precision, scale) => Ok(Arc::new(
                    Decimal32Array::from_iter(
                        [<$stat_type_prefix Decimal32DataPageStatsIterator>]::new($iterator).flatten(),
                    )
                    .with_precision_and_scale(*precision, *scale)?,
                )),
                DataType::Decimal64(precision, scale) => Ok(Arc::new(
                    Decimal64Array::from_iter(
                        [<$stat_type_prefix Decimal64DataPageStatsIterator>]::new($iterator).flatten(),
                    )
                    .with_precision_and_scale(*precision, *scale)?,
                )),
                DataType::Decimal128(precision, scale) => Ok(Arc::new(
                    Decimal128Array::from_iter(
                        [<$stat_type_prefix Decimal128DataPageStatsIterator>]::new($iterator).flatten(),
                    )
                    .with_precision_and_scale(*precision, *scale)?,
                )),
                DataType::Decimal256(precision, scale) => Ok(Arc::new(
                    Decimal256Array::from_iter(
                        [<$stat_type_prefix Decimal256DataPageStatsIterator>]::new($iterator).flatten(),
                    )
                    .with_precision_and_scale(*precision, *scale)?,
                )),
                DataType::Time32(unit) => {
                    Ok(match unit {
                        TimeUnit::Second => Arc::new(Time32SecondArray::from_iter(
                            [<$stat_type_prefix Int32DataPageStatsIterator>]::new($iterator).flatten(),
                        )),
                        TimeUnit::Millisecond => Arc::new(Time32MillisecondArray::from_iter(
                            [<$stat_type_prefix Int32DataPageStatsIterator>]::new($iterator).flatten(),
                        )),
                        // NOTE(review): other units yield an *empty* array rather
                        // than one null per page. Presumably unreachable because
                        // Time32 is only defined for second/millisecond units —
                        // confirm before relying on the length here.
                        _ => new_empty_array(&DataType::Time32(unit.clone())),
                    })
                }
                DataType::Time64(unit) => {
                    Ok(match unit {
                        TimeUnit::Microsecond => Arc::new(Time64MicrosecondArray::from_iter(
                            [<$stat_type_prefix Int64DataPageStatsIterator>]::new($iterator).flatten(),
                        )),
                        TimeUnit::Nanosecond => Arc::new(Time64NanosecondArray::from_iter(
                            [<$stat_type_prefix Int64DataPageStatsIterator>]::new($iterator).flatten(),
                        )),
                        // NOTE(review): same caveat as the Time32 arm — an empty
                        // array, presumably unreachable for valid Time64 units.
                        _ => new_empty_array(&DataType::Time64(unit.clone())),
                    })
                }
                DataType::FixedSizeBinary(size) => {
                    let mut builder = FixedSizeBinaryBuilder::new(*size);
                    let pages = [<$stat_type_prefix FixedLenByteArrayDataPageStatsIterator>]::new($iterator)
                        .flatten();
                    for value in pages {
                        match value {
                            // Only values of exactly the declared width are kept.
                            Some(bytes) if i32::try_from(bytes.len()) == Ok(*size) => {
                                // The length was checked above, so an error is not
                                // expected; if one occurs, record a null instead of
                                // silently producing an array that is one row short.
                                if builder.append_value(bytes.data()).is_err() {
                                    builder.append_null();
                                }
                            }
                            _ => builder.append_null(),
                        }
                    }
                    Ok(Arc::new(builder.finish()))
                }
                DataType::Utf8View => {
                    let mut builder = StringViewBuilder::new();
                    let pages =
                        [<$stat_type_prefix ByteArrayDataPageStatsIterator>]::new($iterator).flatten();
                    for value in pages {
                        builder.append_option(
                            value
                                .as_ref()
                                .and_then(|bytes| std::str::from_utf8(bytes.data()).ok()),
                        );
                    }
                    Ok(Arc::new(builder.finish()))
                }
                DataType::BinaryView => {
                    let mut builder = BinaryViewBuilder::new();
                    let pages =
                        [<$stat_type_prefix ByteArrayDataPageStatsIterator>]::new($iterator).flatten();
                    for value in pages {
                        builder.append_option(value);
                    }
                    Ok(Arc::new(builder.finish()))
                }
                // `Date64` appears again here to cover the cases the guarded
                // arms above reject; together with the remaining types it has no
                // extraction rule.
                DataType::Date64
                | DataType::Null
                | DataType::Duration(_)
                | DataType::Interval(_)
                | DataType::List(_)
                | DataType::ListView(_)
                | DataType::FixedSizeList(_, _)
                | DataType::LargeList(_)
                | DataType::LargeListView(_)
                | DataType::Struct(_)
                | DataType::Union(_, _)
                | DataType::Map(_, _)
                | DataType::RunEndEncoded(_, _) => {
                    // One null per *page*: sum the page counts carried by the
                    // iterator rather than counting its items, which would give
                    // one entry per row group and fall out of step with every
                    // other arm.
                    let len: usize = $iterator.map(|(num_pages, _)| num_pages).sum();
                    Ok(new_null_array($data_type, len))
                }
            }
        }
    };
}
/// Builds an Arrow array of minimum values from an iterator of optional
/// Parquet statistics.
///
/// * `data_type` - Arrow type of the array to produce
/// * `iterator` - statistics to read minimums from; `None` items mean the
///   statistics are absent
/// * `physical_type` - Parquet physical type, forwarded to the
///   `get_statistics!` expansion (which consults it for `Date64`)
///
/// The whole body is the `get_statistics!` expansion with the `Min` prefix;
/// see that macro for the per-type conversion rules and null handling.
fn min_statistics<'a, I: Iterator<Item = Option<&'a ParquetStatistics>>>(
    data_type: &DataType,
    iterator: I,
    physical_type: Option<PhysicalType>,
) -> Result<ArrayRef> {
    get_statistics!(Min, data_type, iterator, physical_type)
}
1164
/// Builds an Arrow array of maximum values from an iterator of optional
/// Parquet statistics.
///
/// * `data_type` - Arrow type of the array to produce
/// * `iterator` - statistics to read maximums from; `None` items mean the
///   statistics are absent
/// * `physical_type` - Parquet physical type, forwarded to the
///   `get_statistics!` expansion (which consults it for `Date64`)
///
/// The whole body is the `get_statistics!` expansion with the `Max` prefix;
/// see that macro for the per-type conversion rules and null handling.
fn max_statistics<'a, I: Iterator<Item = Option<&'a ParquetStatistics>>>(
    data_type: &DataType,
    iterator: I,
    physical_type: Option<PhysicalType>,
) -> Result<ArrayRef> {
    get_statistics!(Max, data_type, iterator, physical_type)
}
1175
/// Builds an Arrow array of per-page minimum values from an iterator of
/// `(usize, &Index)` pairs.
///
/// * `data_type` - Arrow type of the array to produce
/// * `iterator` - pairs of a length and a page index; the length is the number
///   of nulls the page-statistics iterators emit when the index variant does
///   not match the requested type
/// * `physical_type` - Parquet physical type, forwarded to the
///   `get_data_page_statistics!` expansion (which consults it for `Date64`)
///
/// The whole body is the `get_data_page_statistics!` expansion with the `Min`
/// prefix; see that macro for the per-type conversion rules.
pub(crate) fn min_page_statistics<'a, I>(
    data_type: &DataType,
    iterator: I,
    physical_type: Option<PhysicalType>,
) -> Result<ArrayRef>
where
    I: Iterator<Item = (usize, &'a Index)>,
{
    get_data_page_statistics!(Min, data_type, iterator, physical_type)
}
1188
/// Builds an Arrow array of per-page maximum values from an iterator of
/// `(usize, &Index)` pairs.
///
/// * `data_type` - Arrow type of the array to produce
/// * `iterator` - pairs of a length and a page index; the length is the number
///   of nulls the page-statistics iterators emit when the index variant does
///   not match the requested type
/// * `physical_type` - Parquet physical type, forwarded to the
///   `get_data_page_statistics!` expansion (which consults it for `Date64`)
///
/// The whole body is the `get_data_page_statistics!` expansion with the `Max`
/// prefix; see that macro for the per-type conversion rules.
pub(crate) fn max_page_statistics<'a, I>(
    data_type: &DataType,
    iterator: I,
    physical_type: Option<PhysicalType>,
) -> Result<ArrayRef>
where
    I: Iterator<Item = (usize, &'a Index)>,
{
    get_data_page_statistics!(Max, data_type, iterator, physical_type)
}
1201
1202pub(crate) fn null_counts_page_statistics<'a, I>(iterator: I) -> Result<UInt64Array>
1207where
1208 I: Iterator<Item = (usize, &'a Index)>,
1209{
1210 let iter = iterator.flat_map(|(len, index)| match index {
1211 Index::NONE => vec![None; len],
1212 Index::BOOLEAN(native_index) => native_index
1213 .indexes
1214 .iter()
1215 .map(|x| x.null_count.map(|x| x as u64))
1216 .collect::<Vec<_>>(),
1217 Index::INT32(native_index) => native_index
1218 .indexes
1219 .iter()
1220 .map(|x| x.null_count.map(|x| x as u64))
1221 .collect::<Vec<_>>(),
1222 Index::INT64(native_index) => native_index
1223 .indexes
1224 .iter()
1225 .map(|x| x.null_count.map(|x| x as u64))
1226 .collect::<Vec<_>>(),
1227 Index::FLOAT(native_index) => native_index
1228 .indexes
1229 .iter()
1230 .map(|x| x.null_count.map(|x| x as u64))
1231 .collect::<Vec<_>>(),
1232 Index::DOUBLE(native_index) => native_index
1233 .indexes
1234 .iter()
1235 .map(|x| x.null_count.map(|x| x as u64))
1236 .collect::<Vec<_>>(),
1237 Index::FIXED_LEN_BYTE_ARRAY(native_index) => native_index
1238 .indexes
1239 .iter()
1240 .map(|x| x.null_count.map(|x| x as u64))
1241 .collect::<Vec<_>>(),
1242 Index::BYTE_ARRAY(native_index) => native_index
1243 .indexes
1244 .iter()
1245 .map(|x| x.null_count.map(|x| x as u64))
1246 .collect::<Vec<_>>(),
1247 _ => unimplemented!(),
1248 });
1249
1250 Ok(UInt64Array::from_iter(iter))
1251}
1252
/// Converts the Parquet statistics of a single column into Arrow arrays.
///
/// A converter is created for one named column with
/// [`StatisticsConverter::try_new`] and can then be used to extract row-group
/// level and data-page level statistics (minimums, maximums, null counts and
/// row counts) for that column.
#[derive(Debug)]
pub struct StatisticsConverter<'a> {
    /// Index of the matched column in the Parquet schema, or `None` when the
    /// column exists in the Arrow schema but has no match in the Parquet
    /// schema.
    parquet_column_index: Option<usize>,
    /// The Arrow field of the column, borrowed from the Arrow schema.
    arrow_field: &'a Field,
    /// When `true`, row-group statistics without a null count are reported as
    /// having zero nulls rather than an unknown (null) count.
    missing_null_counts_as_zero: bool,
    /// Parquet physical type of the matched column, or `None` when no Parquet
    /// column was matched.
    physical_type: Option<PhysicalType>,
}
1283
1284impl<'a> StatisticsConverter<'a> {
1285 pub fn parquet_column_index(&self) -> Option<usize> {
1290 self.parquet_column_index
1291 }
1292
1293 pub fn arrow_field(&self) -> &'a Field {
1295 self.arrow_field
1296 }
1297
1298 pub fn with_missing_null_counts_as_zero(mut self, missing_null_counts_as_zero: bool) -> Self {
1311 self.missing_null_counts_as_zero = missing_null_counts_as_zero;
1312 self
1313 }
1314
1315 pub fn row_group_row_counts<I>(&self, metadatas: I) -> Result<Option<UInt64Array>>
1346 where
1347 I: IntoIterator<Item = &'a RowGroupMetaData>,
1348 {
1349 let Some(_) = self.parquet_column_index else {
1350 return Ok(None);
1351 };
1352
1353 let mut builder = UInt64Array::builder(10);
1354 for metadata in metadatas.into_iter() {
1355 let row_count = metadata.num_rows();
1356 let row_count: u64 = row_count.try_into().map_err(|e| {
1357 arrow_err!(format!(
1358 "Parquet row count {row_count} too large to convert to u64: {e}"
1359 ))
1360 })?;
1361 builder.append_value(row_count);
1362 }
1363 Ok(Some(builder.finish()))
1364 }
1365
1366 pub fn try_new<'b>(
1378 column_name: &'b str,
1379 arrow_schema: &'a Schema,
1380 parquet_schema: &'a SchemaDescriptor,
1381 ) -> Result<Self> {
1382 let Some((_idx, arrow_field)) = arrow_schema.column_with_name(column_name) else {
1384 return Err(arrow_err!(format!(
1385 "Column '{}' not found in schema for statistics conversion",
1386 column_name
1387 )));
1388 };
1389
1390 let parquet_index = match parquet_column(parquet_schema, arrow_schema, column_name) {
1392 Some((parquet_idx, matched_field)) => {
1393 if matched_field.as_ref() != arrow_field {
1395 return Err(arrow_err!(format!(
1396 "Matched column '{:?}' does not match original matched column '{:?}'",
1397 matched_field, arrow_field
1398 )));
1399 }
1400 Some(parquet_idx)
1401 }
1402 None => None,
1403 };
1404
1405 Ok(Self {
1406 parquet_column_index: parquet_index,
1407 arrow_field,
1408 missing_null_counts_as_zero: true,
1409 physical_type: parquet_index.map(|idx| parquet_schema.column(idx).physical_type()),
1410 })
1411 }
1412
1413 pub fn row_group_mins<I>(&self, metadatas: I) -> Result<ArrayRef>
1457 where
1458 I: IntoIterator<Item = &'a RowGroupMetaData>,
1459 {
1460 let data_type = self.arrow_field.data_type();
1461
1462 let Some(parquet_index) = self.parquet_column_index else {
1463 return Ok(self.make_null_array(data_type, metadatas));
1464 };
1465
1466 let iter = metadatas
1467 .into_iter()
1468 .map(|x| x.column(parquet_index).statistics());
1469 min_statistics(data_type, iter, self.physical_type)
1470 }
1471
1472 pub fn row_group_maxes<I>(&self, metadatas: I) -> Result<ArrayRef>
1476 where
1477 I: IntoIterator<Item = &'a RowGroupMetaData>,
1478 {
1479 let data_type = self.arrow_field.data_type();
1480
1481 let Some(parquet_index) = self.parquet_column_index else {
1482 return Ok(self.make_null_array(data_type, metadatas));
1483 };
1484
1485 let iter = metadatas
1486 .into_iter()
1487 .map(|x| x.column(parquet_index).statistics());
1488 max_statistics(data_type, iter, self.physical_type)
1489 }
1490
1491 pub fn row_group_is_max_value_exact<I>(&self, metadatas: I) -> Result<BooleanArray>
1495 where
1496 I: IntoIterator<Item = &'a RowGroupMetaData>,
1497 {
1498 let Some(parquet_index) = self.parquet_column_index else {
1499 let num_row_groups = metadatas.into_iter().count();
1500 return Ok(BooleanArray::from_iter(
1501 std::iter::repeat(None).take(num_row_groups),
1502 ));
1503 };
1504
1505 let is_max_value_exact = metadatas
1506 .into_iter()
1507 .map(|x| x.column(parquet_index).statistics())
1508 .map(|s| s.map(|s| s.max_is_exact()));
1509 Ok(BooleanArray::from_iter(is_max_value_exact))
1510 }
1511
1512 pub fn row_group_is_min_value_exact<I>(&self, metadatas: I) -> Result<BooleanArray>
1516 where
1517 I: IntoIterator<Item = &'a RowGroupMetaData>,
1518 {
1519 let Some(parquet_index) = self.parquet_column_index else {
1520 let num_row_groups = metadatas.into_iter().count();
1521 return Ok(BooleanArray::from_iter(
1522 std::iter::repeat(None).take(num_row_groups),
1523 ));
1524 };
1525
1526 let is_min_value_exact = metadatas
1527 .into_iter()
1528 .map(|x| x.column(parquet_index).statistics())
1529 .map(|s| s.map(|s| s.min_is_exact()));
1530 Ok(BooleanArray::from_iter(is_min_value_exact))
1531 }
1532
1533 pub fn row_group_null_counts<I>(&self, metadatas: I) -> Result<UInt64Array>
1537 where
1538 I: IntoIterator<Item = &'a RowGroupMetaData>,
1539 {
1540 let Some(parquet_index) = self.parquet_column_index else {
1541 let num_row_groups = metadatas.into_iter().count();
1542 return Ok(UInt64Array::from_iter(
1543 std::iter::repeat(None).take(num_row_groups),
1544 ));
1545 };
1546
1547 let null_counts = metadatas
1548 .into_iter()
1549 .map(|x| x.column(parquet_index).statistics())
1550 .map(|s| {
1551 s.and_then(|s| {
1552 if self.missing_null_counts_as_zero {
1553 Some(s.null_count_opt().unwrap_or(0))
1554 } else {
1555 s.null_count_opt()
1556 }
1557 })
1558 });
1559 Ok(UInt64Array::from_iter(null_counts))
1560 }
1561
1562 pub fn data_page_mins<I>(
1614 &self,
1615 column_page_index: &ParquetColumnIndex,
1616 column_offset_index: &ParquetOffsetIndex,
1617 row_group_indices: I,
1618 ) -> Result<ArrayRef>
1619 where
1620 I: IntoIterator<Item = &'a usize>,
1621 {
1622 let data_type = self.arrow_field.data_type();
1623
1624 let Some(parquet_index) = self.parquet_column_index else {
1625 return Ok(self.make_null_array(data_type, row_group_indices));
1626 };
1627
1628 let iter = row_group_indices.into_iter().map(|rg_index| {
1629 let column_page_index_per_row_group_per_column =
1630 &column_page_index[*rg_index][parquet_index];
1631 let num_data_pages = &column_offset_index[*rg_index][parquet_index]
1632 .page_locations()
1633 .len();
1634
1635 (*num_data_pages, column_page_index_per_row_group_per_column)
1636 });
1637
1638 min_page_statistics(data_type, iter, self.physical_type)
1639 }
1640
1641 pub fn data_page_maxes<I>(
1645 &self,
1646 column_page_index: &ParquetColumnIndex,
1647 column_offset_index: &ParquetOffsetIndex,
1648 row_group_indices: I,
1649 ) -> Result<ArrayRef>
1650 where
1651 I: IntoIterator<Item = &'a usize>,
1652 {
1653 let data_type = self.arrow_field.data_type();
1654
1655 let Some(parquet_index) = self.parquet_column_index else {
1656 return Ok(self.make_null_array(data_type, row_group_indices));
1657 };
1658
1659 let iter = row_group_indices.into_iter().map(|rg_index| {
1660 let column_page_index_per_row_group_per_column =
1661 &column_page_index[*rg_index][parquet_index];
1662 let num_data_pages = &column_offset_index[*rg_index][parquet_index]
1663 .page_locations()
1664 .len();
1665
1666 (*num_data_pages, column_page_index_per_row_group_per_column)
1667 });
1668
1669 max_page_statistics(data_type, iter, self.physical_type)
1670 }
1671
1672 pub fn data_page_null_counts<I>(
1676 &self,
1677 column_page_index: &ParquetColumnIndex,
1678 column_offset_index: &ParquetOffsetIndex,
1679 row_group_indices: I,
1680 ) -> Result<UInt64Array>
1681 where
1682 I: IntoIterator<Item = &'a usize>,
1683 {
1684 let Some(parquet_index) = self.parquet_column_index else {
1685 let num_row_groups = row_group_indices.into_iter().count();
1686 return Ok(UInt64Array::from_iter(
1687 std::iter::repeat(None).take(num_row_groups),
1688 ));
1689 };
1690
1691 let iter = row_group_indices.into_iter().map(|rg_index| {
1692 let column_page_index_per_row_group_per_column =
1693 &column_page_index[*rg_index][parquet_index];
1694 let num_data_pages = &column_offset_index[*rg_index][parquet_index]
1695 .page_locations()
1696 .len();
1697
1698 (*num_data_pages, column_page_index_per_row_group_per_column)
1699 });
1700 null_counts_page_statistics(iter)
1701 }
1702
1703 pub fn data_page_row_counts<I>(
1721 &self,
1722 column_offset_index: &ParquetOffsetIndex,
1723 row_group_metadatas: &'a [RowGroupMetaData],
1724 row_group_indices: I,
1725 ) -> Result<Option<UInt64Array>>
1726 where
1727 I: IntoIterator<Item = &'a usize>,
1728 {
1729 let Some(parquet_index) = self.parquet_column_index else {
1730 return Ok(None);
1734 };
1735
1736 let mut row_count_total = Vec::new();
1737 for rg_idx in row_group_indices {
1738 let page_locations = &column_offset_index[*rg_idx][parquet_index].page_locations();
1739
1740 let row_count_per_page = page_locations
1741 .windows(2)
1742 .map(|loc| Some(loc[1].first_row_index as u64 - loc[0].first_row_index as u64));
1743
1744 let num_rows_in_row_group = &row_group_metadatas[*rg_idx].num_rows();
1746 let row_count_per_page = row_count_per_page
1747 .chain(std::iter::once(Some(
1748 *num_rows_in_row_group as u64
1749 - page_locations.last().unwrap().first_row_index as u64,
1750 )))
1751 .collect::<Vec<_>>();
1752
1753 row_count_total.extend(row_count_per_page);
1754 }
1755
1756 Ok(Some(UInt64Array::from_iter(row_count_total)))
1757 }
1758
1759 fn make_null_array<I, A>(&self, data_type: &DataType, metadatas: I) -> ArrayRef
1761 where
1762 I: IntoIterator<Item = A>,
1763 {
1764 let num_row_groups = metadatas.into_iter().count();
1766 new_null_array(data_type, num_row_groups)
1767 }
1768}
1769
1770