1use crate::arrow::buffer::bit_util::sign_extend_be;
23use crate::arrow::parquet_column;
24use crate::basic::Type as PhysicalType;
25use crate::data_type::{ByteArray, FixedLenByteArray};
26use crate::errors::{ParquetError, Result};
27use crate::file::metadata::{ParquetColumnIndex, ParquetOffsetIndex, RowGroupMetaData};
28use crate::file::page_index::index::{Index, PageIndex};
29use crate::file::statistics::Statistics as ParquetStatistics;
30use crate::schema::types::SchemaDescriptor;
31use arrow_array::builder::{
32 BinaryViewBuilder, BooleanBuilder, FixedSizeBinaryBuilder, LargeStringBuilder, StringBuilder,
33 StringViewBuilder,
34};
35use arrow_array::{
36 new_empty_array, new_null_array, ArrayRef, BinaryArray, BooleanArray, Date32Array, Date64Array,
37 Decimal128Array, Decimal256Array, Float16Array, Float32Array, Float64Array, Int16Array,
38 Int32Array, Int64Array, Int8Array, LargeBinaryArray, Time32MillisecondArray, Time32SecondArray,
39 Time64MicrosecondArray, Time64NanosecondArray, TimestampMicrosecondArray,
40 TimestampMillisecondArray, TimestampNanosecondArray, TimestampSecondArray, UInt16Array,
41 UInt32Array, UInt64Array, UInt8Array,
42};
43use arrow_buffer::i256;
44use arrow_schema::{DataType, Field, Schema, TimeUnit};
45use half::f16;
46use paste::paste;
47use std::sync::Arc;
48
49pub(crate) fn from_bytes_to_i128(b: &[u8]) -> i128 {
52 i128::from_be_bytes(sign_extend_be::<16>(b))
56}
57
58pub(crate) fn from_bytes_to_i256(b: &[u8]) -> i256 {
61 i256::from_be_bytes(sign_extend_be::<32>(b))
62}
63
64pub(crate) fn from_bytes_to_f16(b: &[u8]) -> Option<f16> {
66 match b {
67 [low, high] => Some(f16::from_be_bytes([*high, *low])),
68 _ => None,
69 }
70}
71
/// Defines an iterator adapter that pulls one typed min or max value out of
/// each optional column-chunk [`ParquetStatistics`].
///
/// * `$iterator_type` - name of the generated struct
/// * `$func` - accessor invoked on the matched statistics (e.g. `min_opt`)
/// * `$parquet_statistics_type` - the `ParquetStatistics` variant to match
/// * `$stat_value_type` - type the accessor returns a reference to
///
/// Each yielded item is `None` when the statistics are absent, belong to a
/// different variant, or the accessor itself returns `None`.
macro_rules! make_stats_iterator {
    ($iterator_type:ident, $func:ident, $parquet_statistics_type:path, $stat_value_type:ty) => {
        struct $iterator_type<'a, I>
        where
            I: Iterator<Item = Option<&'a ParquetStatistics>>,
        {
            iter: I,
        }

        impl<'a, I> $iterator_type<'a, I>
        where
            I: Iterator<Item = Option<&'a ParquetStatistics>>,
        {
            /// Wraps `iter`, which yields one optional statistics entry per row group.
            fn new(iter: I) -> Self {
                Self { iter }
            }
        }

        impl<'a, I> Iterator for $iterator_type<'a, I>
        where
            I: Iterator<Item = Option<&'a ParquetStatistics>>,
        {
            type Item = Option<&'a $stat_value_type>;

            fn next(&mut self) -> Option<Self::Item> {
                // `?` ends iteration when the underlying iterator is exhausted.
                let maybe_stats = self.iter.next()?;
                let value = match maybe_stats {
                    Some($parquet_statistics_type(typed)) => typed.$func(),
                    _ => None,
                };
                Some(value)
            }

            fn size_hint(&self) -> (usize, Option<usize>) {
                self.iter.size_hint()
            }
        }
    };
}
132
// Row-group (column chunk) statistics iterators, one Min/Max pair per
// `ParquetStatistics` variant. The byte-array variants expose the raw bytes
// (`[u8]`) via `min_bytes_opt` / `max_bytes_opt`.
make_stats_iterator!(
    MinBooleanStatsIterator,
    min_opt,
    ParquetStatistics::Boolean,
    bool
);
make_stats_iterator!(
    MaxBooleanStatsIterator,
    max_opt,
    ParquetStatistics::Boolean,
    bool
);
make_stats_iterator!(
    MinInt32StatsIterator,
    min_opt,
    ParquetStatistics::Int32,
    i32
);
make_stats_iterator!(
    MaxInt32StatsIterator,
    max_opt,
    ParquetStatistics::Int32,
    i32
);
make_stats_iterator!(
    MinInt64StatsIterator,
    min_opt,
    ParquetStatistics::Int64,
    i64
);
make_stats_iterator!(
    MaxInt64StatsIterator,
    max_opt,
    ParquetStatistics::Int64,
    i64
);
make_stats_iterator!(
    MinFloatStatsIterator,
    min_opt,
    ParquetStatistics::Float,
    f32
);
make_stats_iterator!(
    MaxFloatStatsIterator,
    max_opt,
    ParquetStatistics::Float,
    f32
);
make_stats_iterator!(
    MinDoubleStatsIterator,
    min_opt,
    ParquetStatistics::Double,
    f64
);
make_stats_iterator!(
    MaxDoubleStatsIterator,
    max_opt,
    ParquetStatistics::Double,
    f64
);
make_stats_iterator!(
    MinByteArrayStatsIterator,
    min_bytes_opt,
    ParquetStatistics::ByteArray,
    [u8]
);
make_stats_iterator!(
    MaxByteArrayStatsIterator,
    max_bytes_opt,
    ParquetStatistics::ByteArray,
    [u8]
);
make_stats_iterator!(
    MinFixedLenByteArrayStatsIterator,
    min_bytes_opt,
    ParquetStatistics::FixedLenByteArray,
    [u8]
);
make_stats_iterator!(
    MaxFixedLenByteArrayStatsIterator,
    max_bytes_opt,
    ParquetStatistics::FixedLenByteArray,
    [u8]
);
217
/// Defines an iterator adapter that reads decimal min/max statistics into a
/// native integer type.
///
/// Decimals may be stored as INT32, INT64, BYTE_ARRAY or FIXED_LEN_BYTE_ARRAY:
///
/// * integer variants are read with `$func` and widened via `From`
/// * byte-array variants are read with `$bytes_func` and decoded by `$convert_func`
///
/// Any other variant, or absent statistics, yields `None`.
macro_rules! make_decimal_stats_iterator {
    ($iterator_type:ident, $func:ident, $bytes_func:ident, $stat_value_type:ident, $convert_func: ident) => {
        struct $iterator_type<'a, I>
        where
            I: Iterator<Item = Option<&'a ParquetStatistics>>,
        {
            iter: I,
        }

        impl<'a, I> $iterator_type<'a, I>
        where
            I: Iterator<Item = Option<&'a ParquetStatistics>>,
        {
            /// Wraps `iter`, which yields one optional statistics entry per row group.
            fn new(iter: I) -> Self {
                Self { iter }
            }
        }

        impl<'a, I> Iterator for $iterator_type<'a, I>
        where
            I: Iterator<Item = Option<&'a ParquetStatistics>>,
        {
            type Item = Option<$stat_value_type>;

            fn next(&mut self) -> Option<Self::Item> {
                // `?` ends iteration when the underlying iterator is exhausted.
                let maybe_stats = self.iter.next()?;
                let value = match maybe_stats {
                    Some(ParquetStatistics::Int32(typed)) => {
                        typed.$func().map(|v| $stat_value_type::from(*v))
                    }
                    Some(ParquetStatistics::Int64(typed)) => {
                        typed.$func().map(|v| $stat_value_type::from(*v))
                    }
                    Some(ParquetStatistics::ByteArray(typed)) => {
                        typed.$bytes_func().map($convert_func)
                    }
                    Some(ParquetStatistics::FixedLenByteArray(typed)) => {
                        typed.$bytes_func().map($convert_func)
                    }
                    _ => None,
                };
                Some(value)
            }

            fn size_hint(&self) -> (usize, Option<usize>) {
                self.iter.size_hint()
            }
        }
    };
}
284
// Row-group decimal statistics iterators: 128-bit values for `Decimal128`
// and 256-bit values for `Decimal256`.
make_decimal_stats_iterator!(
    MinDecimal128StatsIterator,
    min_opt,
    min_bytes_opt,
    i128,
    from_bytes_to_i128
);
make_decimal_stats_iterator!(
    MaxDecimal128StatsIterator,
    max_opt,
    max_bytes_opt,
    i128,
    from_bytes_to_i128
);
make_decimal_stats_iterator!(
    MinDecimal256StatsIterator,
    min_opt,
    min_bytes_opt,
    i256,
    from_bytes_to_i256
);
make_decimal_stats_iterator!(
    MaxDecimal256StatsIterator,
    max_opt,
    max_bytes_opt,
    i256,
    from_bytes_to_i256
);
313
/// Converts column-chunk (row group) statistics into an Arrow array of the
/// requested `$data_type`.
///
/// * `$stat_type_prefix` - `Min` or `Max`. It selects the `*StatsIterator` family
///   used, and the `min_statistics`/`max_statistics` function called for
///   dictionary value types.
/// * `$data_type` - the target Arrow [`DataType`] (a `&DataType`).
/// * `$iterator` - yields one `Option<&ParquetStatistics>` per row group.
/// * `$physical_type` - the Parquet physical type of the column, if known. It is
///   only consulted to decide how to read `Date64` values.
///
/// The produced array has one entry per item of `$iterator`. An entry becomes
/// null when:
///
/// * the statistics are absent,
/// * they are of an unexpected variant, or
/// * the value is not representable in the target type (out-of-range
///   narrowing, invalid UTF-8, wrong fixed width).
///
/// Unsupported target types yield an all-null array.
macro_rules! get_statistics {
    ($stat_type_prefix: ident, $data_type: ident, $iterator: ident, $physical_type: ident) => {
        paste! {
        match $data_type {
            DataType::Boolean => Ok(Arc::new(BooleanArray::from_iter(
                [<$stat_type_prefix BooleanStatsIterator>]::new($iterator).map(|x| x.copied()),
            ))),
            // Narrow integer types are read from INT32 statistics; values that do
            // not fit the target width become null rather than being truncated.
            DataType::Int8 => Ok(Arc::new(Int8Array::from_iter(
                [<$stat_type_prefix Int32StatsIterator>]::new($iterator).map(|x| {
                    x.and_then(|x| i8::try_from(*x).ok())
                }),
            ))),
            DataType::Int16 => Ok(Arc::new(Int16Array::from_iter(
                [<$stat_type_prefix Int32StatsIterator>]::new($iterator).map(|x| {
                    x.and_then(|x| i16::try_from(*x).ok())
                }),
            ))),
            DataType::Int32 => Ok(Arc::new(Int32Array::from_iter(
                [<$stat_type_prefix Int32StatsIterator>]::new($iterator).map(|x| x.copied()),
            ))),
            DataType::Int64 => Ok(Arc::new(Int64Array::from_iter(
                [<$stat_type_prefix Int64StatsIterator>]::new($iterator).map(|x| x.copied()),
            ))),
            DataType::UInt8 => Ok(Arc::new(UInt8Array::from_iter(
                [<$stat_type_prefix Int32StatsIterator>]::new($iterator).map(|x| {
                    x.and_then(|x| u8::try_from(*x).ok())
                }),
            ))),
            DataType::UInt16 => Ok(Arc::new(UInt16Array::from_iter(
                [<$stat_type_prefix Int32StatsIterator>]::new($iterator).map(|x| {
                    x.and_then(|x| u16::try_from(*x).ok())
                }),
            ))),
            // The signed statistics value is reinterpreted bit-for-bit with `as`
            // (no range check) for the full-width unsigned types.
            DataType::UInt32 => Ok(Arc::new(UInt32Array::from_iter(
                [<$stat_type_prefix Int32StatsIterator>]::new($iterator).map(|x| x.map(|x| *x as u32)),
            ))),
            DataType::UInt64 => Ok(Arc::new(UInt64Array::from_iter(
                [<$stat_type_prefix Int64StatsIterator>]::new($iterator).map(|x| x.map(|x| *x as u64)),
            ))),
            // Float16 statistics are read from FIXED_LEN_BYTE_ARRAY; anything that
            // is not exactly two bytes becomes null (see `from_bytes_to_f16`).
            DataType::Float16 => Ok(Arc::new(Float16Array::from_iter(
                [<$stat_type_prefix FixedLenByteArrayStatsIterator>]::new($iterator).map(|x| x.and_then(|x| {
                    from_bytes_to_f16(x)
                })),
            ))),
            DataType::Float32 => Ok(Arc::new(Float32Array::from_iter(
                [<$stat_type_prefix FloatStatsIterator>]::new($iterator).map(|x| x.copied()),
            ))),
            DataType::Float64 => Ok(Arc::new(Float64Array::from_iter(
                [<$stat_type_prefix DoubleStatsIterator>]::new($iterator).map(|x| x.copied()),
            ))),
            DataType::Date32 => Ok(Arc::new(Date32Array::from_iter(
                [<$stat_type_prefix Int32StatsIterator>]::new($iterator).map(|x| x.copied()),
            ))),
            // INT32-backed Date64: scale a day count to milliseconds.
            DataType::Date64 if $physical_type == Some(PhysicalType::INT32) => Ok(Arc::new(Date64Array::from_iter(
                [<$stat_type_prefix Int32StatsIterator>]::new($iterator)
                    .map(|x| x.map(|x| i64::from(*x) * 24 * 60 * 60 * 1000))))),
            // INT64-backed Date64: values are used as-is.
            DataType::Date64 if $physical_type == Some(PhysicalType::INT64) => Ok(Arc::new(Date64Array::from_iter(
                [<$stat_type_prefix Int64StatsIterator>]::new($iterator).map(|x| x.copied()),))),
            DataType::Timestamp(unit, timezone) =>{
                let iter = [<$stat_type_prefix Int64StatsIterator>]::new($iterator).map(|x| x.copied());
                Ok(match unit {
                    TimeUnit::Second => Arc::new(TimestampSecondArray::from_iter(iter).with_timezone_opt(timezone.clone())),
                    TimeUnit::Millisecond => Arc::new(TimestampMillisecondArray::from_iter(iter).with_timezone_opt(timezone.clone())),
                    TimeUnit::Microsecond => Arc::new(TimestampMicrosecondArray::from_iter(iter).with_timezone_opt(timezone.clone())),
                    TimeUnit::Nanosecond => Arc::new(TimestampNanosecondArray::from_iter(iter).with_timezone_opt(timezone.clone())),
                })
            },
            DataType::Time32(unit) => {
                Ok(match unit {
                    TimeUnit::Second => Arc::new(Time32SecondArray::from_iter(
                        [<$stat_type_prefix Int32StatsIterator>]::new($iterator).map(|x| x.copied()),
                    )),
                    TimeUnit::Millisecond => Arc::new(Time32MillisecondArray::from_iter(
                        [<$stat_type_prefix Int32StatsIterator>]::new($iterator).map(|x| x.copied()),
                    )),
                    // Other units are not handled: emit one null per row group.
                    _ => {
                        let len = $iterator.count();
                        new_null_array($data_type, len)
                    }
                })
            },
            DataType::Time64(unit) => {
                Ok(match unit {
                    TimeUnit::Microsecond => Arc::new(Time64MicrosecondArray::from_iter(
                        [<$stat_type_prefix Int64StatsIterator>]::new($iterator).map(|x| x.copied()),
                    )),
                    TimeUnit::Nanosecond => Arc::new(Time64NanosecondArray::from_iter(
                        [<$stat_type_prefix Int64StatsIterator>]::new($iterator).map(|x| x.copied()),
                    )),
                    // Other units are not handled: emit one null per row group.
                    _ => {
                        let len = $iterator.count();
                        new_null_array($data_type, len)
                    }
                })
            },
            DataType::Binary => Ok(Arc::new(BinaryArray::from_iter(
                [<$stat_type_prefix ByteArrayStatsIterator>]::new($iterator)
            ))),
            DataType::LargeBinary => Ok(Arc::new(LargeBinaryArray::from_iter(
                [<$stat_type_prefix ByteArrayStatsIterator>]::new($iterator)
            ))),
            // String targets: bytes that are not valid UTF-8 become null.
            DataType::Utf8 => {
                let iterator = [<$stat_type_prefix ByteArrayStatsIterator>]::new($iterator);
                let mut builder = StringBuilder::new();
                for x in iterator {
                    let Some(x) = x else {
                        builder.append_null(); continue;
                    };

                    let Ok(x) = std::str::from_utf8(x) else {
                        builder.append_null();
                        continue;
                    };

                    builder.append_value(x);
                }
                Ok(Arc::new(builder.finish()))
            },
            DataType::LargeUtf8 => {
                let iterator = [<$stat_type_prefix ByteArrayStatsIterator>]::new($iterator);
                let mut builder = LargeStringBuilder::new();
                for x in iterator {
                    let Some(x) = x else {
                        builder.append_null(); continue;
                    };

                    let Ok(x) = std::str::from_utf8(x) else {
                        builder.append_null();
                        continue;
                    };

                    builder.append_value(x);
                }
                Ok(Arc::new(builder.finish()))
            },
            DataType::FixedSizeBinary(size) => {
                let iterator = [<$stat_type_prefix FixedLenByteArrayStatsIterator>]::new($iterator);
                let mut builder = FixedSizeBinaryBuilder::new(*size);
                for x in iterator {
                    let Some(x) = x else {
                        builder.append_null(); continue;
                    };

                    // Values whose length differs from the declared width become null.
                    if x.len().try_into() != Ok(*size){
                        builder.append_null();
                        continue;
                    }

                    builder.append_value(x).expect("ensure to append successfully here, because size have been checked before");
                }
                Ok(Arc::new(builder.finish()))
            },
            // `with_precision_and_scale` can fail; the error is propagated with `?`.
            DataType::Decimal128(precision, scale) => {
                let arr = Decimal128Array::from_iter(
                    [<$stat_type_prefix Decimal128StatsIterator>]::new($iterator)
                ).with_precision_and_scale(*precision, *scale)?;
                Ok(Arc::new(arr))
            },
            DataType::Decimal256(precision, scale) => {
                let arr = Decimal256Array::from_iter(
                    [<$stat_type_prefix Decimal256StatsIterator>]::new($iterator)
                ).with_precision_and_scale(*precision, *scale)?;
                Ok(Arc::new(arr))
            },
            // Dictionaries: statistics are expressed in terms of the value type, so
            // recurse into `min_statistics` / `max_statistics` with it.
            DataType::Dictionary(_, value_type) => {
                [<$stat_type_prefix:lower _ statistics>](value_type, $iterator, $physical_type)
            },
            DataType::Utf8View => {
                let iterator = [<$stat_type_prefix ByteArrayStatsIterator>]::new($iterator);
                let mut builder = StringViewBuilder::new();
                for x in iterator {
                    let Some(x) = x else {
                        builder.append_null(); continue;
                    };

                    let Ok(x) = std::str::from_utf8(x) else {
                        builder.append_null();
                        continue;
                    };

                    builder.append_value(x);
                }
                Ok(Arc::new(builder.finish()))
            },
            DataType::BinaryView => {
                let iterator = [<$stat_type_prefix ByteArrayStatsIterator>]::new($iterator);
                let mut builder = BinaryViewBuilder::new();
                for x in iterator {
                    let Some(x) = x else {
                        builder.append_null(); continue;
                    };

                    builder.append_value(x);
                }
                Ok(Arc::new(builder.finish()))
            }

            // Types without supported statistics produce one null per row group.
            // `Date64` is listed so that the match is exhaustive when neither
            // `$physical_type` guard above matched.
            DataType::Map(_,_) |
            DataType::Duration(_) |
            DataType::Interval(_) |
            DataType::Date64 | DataType::Null |
            DataType::List(_) |
            DataType::ListView(_) |
            DataType::FixedSizeList(_, _) |
            DataType::LargeList(_) |
            DataType::LargeListView(_) |
            DataType::Struct(_) |
            DataType::Union(_, _) |
            DataType::RunEndEncoded(_, _) => {
                let len = $iterator.count();
                Ok(new_null_array($data_type, len))
            }
        }}}
}
545
/// Defines an iterator over page-index statistics that yields, for each column
/// chunk, a `Vec` with one optional min or max value per data page.
///
/// * `$iterator_type` - name of the generated struct
/// * `$func` - closure extracting the value from a `&PageIndex<T>`
/// * `$index_type` - the `Index` variant to match
/// * `$stat_value_type` - the element type of the produced values
///
/// The input yields `(len, &Index)` pairs. When the index is not of the expected
/// variant, `len` nulls are produced for that chunk instead.
macro_rules! make_data_page_stats_iterator {
    ($iterator_type: ident, $func: expr, $index_type: path, $stat_value_type: ty) => {
        struct $iterator_type<'a, I>
        where
            I: Iterator<Item = (usize, &'a Index)>,
        {
            iter: I,
        }

        impl<'a, I> $iterator_type<'a, I>
        where
            I: Iterator<Item = (usize, &'a Index)>,
        {
            /// Wraps `iter`, which yields one `(len, &Index)` pair per column chunk.
            fn new(iter: I) -> Self {
                Self { iter }
            }
        }

        impl<'a, I> Iterator for $iterator_type<'a, I>
        where
            I: Iterator<Item = (usize, &'a Index)>,
        {
            type Item = Vec<Option<$stat_value_type>>;

            fn next(&mut self) -> Option<Self::Item> {
                // `?` ends iteration when the underlying iterator is exhausted.
                let (page_count, column_index) = self.iter.next()?;
                let values: Self::Item = if let $index_type(native_index) = column_index {
                    native_index.indexes.iter().map($func).collect()
                } else {
                    vec![None; page_count]
                };
                Some(values)
            }

            fn size_hint(&self) -> (usize, Option<usize>) {
                self.iter.size_hint()
            }
        }
    };
}
595
// Page-index statistics iterators, one Min/Max pair per `Index` variant.
// Owned byte-array values are cloned out of the `PageIndex`. The Float16
// iterators read FIXED_LEN_BYTE_ARRAY indexes; decoding to `f16` happens in
// `get_data_page_statistics`.
make_data_page_stats_iterator!(
    MinBooleanDataPageStatsIterator,
    |x: &PageIndex<bool>| { x.min },
    Index::BOOLEAN,
    bool
);
make_data_page_stats_iterator!(
    MaxBooleanDataPageStatsIterator,
    |x: &PageIndex<bool>| { x.max },
    Index::BOOLEAN,
    bool
);
make_data_page_stats_iterator!(
    MinInt32DataPageStatsIterator,
    |x: &PageIndex<i32>| { x.min },
    Index::INT32,
    i32
);
make_data_page_stats_iterator!(
    MaxInt32DataPageStatsIterator,
    |x: &PageIndex<i32>| { x.max },
    Index::INT32,
    i32
);
make_data_page_stats_iterator!(
    MinInt64DataPageStatsIterator,
    |x: &PageIndex<i64>| { x.min },
    Index::INT64,
    i64
);
make_data_page_stats_iterator!(
    MaxInt64DataPageStatsIterator,
    |x: &PageIndex<i64>| { x.max },
    Index::INT64,
    i64
);
make_data_page_stats_iterator!(
    MinFloat16DataPageStatsIterator,
    |x: &PageIndex<FixedLenByteArray>| { x.min.clone() },
    Index::FIXED_LEN_BYTE_ARRAY,
    FixedLenByteArray
);
make_data_page_stats_iterator!(
    MaxFloat16DataPageStatsIterator,
    |x: &PageIndex<FixedLenByteArray>| { x.max.clone() },
    Index::FIXED_LEN_BYTE_ARRAY,
    FixedLenByteArray
);
make_data_page_stats_iterator!(
    MinFloat32DataPageStatsIterator,
    |x: &PageIndex<f32>| { x.min },
    Index::FLOAT,
    f32
);
make_data_page_stats_iterator!(
    MaxFloat32DataPageStatsIterator,
    |x: &PageIndex<f32>| { x.max },
    Index::FLOAT,
    f32
);
make_data_page_stats_iterator!(
    MinFloat64DataPageStatsIterator,
    |x: &PageIndex<f64>| { x.min },
    Index::DOUBLE,
    f64
);
make_data_page_stats_iterator!(
    MaxFloat64DataPageStatsIterator,
    |x: &PageIndex<f64>| { x.max },
    Index::DOUBLE,
    f64
);
make_data_page_stats_iterator!(
    MinByteArrayDataPageStatsIterator,
    |x: &PageIndex<ByteArray>| { x.min.clone() },
    Index::BYTE_ARRAY,
    ByteArray
);
make_data_page_stats_iterator!(
    MaxByteArrayDataPageStatsIterator,
    |x: &PageIndex<ByteArray>| { x.max.clone() },
    Index::BYTE_ARRAY,
    ByteArray
);
make_data_page_stats_iterator!(
    MaxFixedLenByteArrayDataPageStatsIterator,
    |x: &PageIndex<FixedLenByteArray>| { x.max.clone() },
    Index::FIXED_LEN_BYTE_ARRAY,
    FixedLenByteArray
);

make_data_page_stats_iterator!(
    MinFixedLenByteArrayDataPageStatsIterator,
    |x: &PageIndex<FixedLenByteArray>| { x.min.clone() },
    Index::FIXED_LEN_BYTE_ARRAY,
    FixedLenByteArray
);
693
/// Defines an iterator over page-index statistics that yields, for each column
/// chunk, one optional decimal min or max value per data page.
///
/// * `$func` - the `PageIndex` field to read (`min` or `max`)
/// * `$stat_value_type` - the target integer type (`i128` or `i256`)
/// * `$convert_func` - decodes byte-array encoded values into `$stat_value_type`
///
/// INT32/INT64 indexes are widened via `From`, and BYTE_ARRAY /
/// FIXED_LEN_BYTE_ARRAY indexes are decoded with `$convert_func`. Any other
/// index variant yields `len` nulls for that chunk.
///
/// Only the requested field is borrowed. The whole `PageIndex` (both bounds and
/// any other payload) is never cloned per page.
macro_rules! get_decimal_page_stats_iterator {
    ($iterator_type: ident, $func: ident, $stat_value_type: ident, $convert_func: ident) => {
        struct $iterator_type<'a, I>
        where
            I: Iterator<Item = (usize, &'a Index)>,
        {
            iter: I,
        }

        impl<'a, I> $iterator_type<'a, I>
        where
            I: Iterator<Item = (usize, &'a Index)>,
        {
            /// Wraps `iter`, which yields one `(len, &Index)` pair per column chunk.
            fn new(iter: I) -> Self {
                Self { iter }
            }
        }

        impl<'a, I> Iterator for $iterator_type<'a, I>
        where
            I: Iterator<Item = (usize, &'a Index)>,
        {
            type Item = Vec<Option<$stat_value_type>>;

            fn next(&mut self) -> Option<Self::Item> {
                let (len, index) = self.iter.next()?;
                let values: Self::Item = match index {
                    Index::INT32(native_index) => native_index
                        .indexes
                        .iter()
                        .map(|page| page.$func.map($stat_value_type::from))
                        .collect(),
                    Index::INT64(native_index) => native_index
                        .indexes
                        .iter()
                        .map(|page| page.$func.map($stat_value_type::from))
                        .collect(),
                    Index::BYTE_ARRAY(native_index) => native_index
                        .indexes
                        .iter()
                        .map(|page| page.$func.as_ref().map(|bytes| $convert_func(bytes.data())))
                        .collect(),
                    Index::FIXED_LEN_BYTE_ARRAY(native_index) => native_index
                        .indexes
                        .iter()
                        .map(|page| page.$func.as_ref().map(|bytes| $convert_func(bytes.data())))
                        .collect(),
                    _ => vec![None; len],
                };
                Some(values)
            }

            fn size_hint(&self) -> (usize, Option<usize>) {
                self.iter.size_hint()
            }
        }
    };
}
766
// Page-index decimal statistics iterators for `Decimal128` (`i128`) and
// `Decimal256` (`i256`) targets.
get_decimal_page_stats_iterator!(
    MinDecimal128DataPageStatsIterator,
    min,
    i128,
    from_bytes_to_i128
);

get_decimal_page_stats_iterator!(
    MaxDecimal128DataPageStatsIterator,
    max,
    i128,
    from_bytes_to_i128
);

get_decimal_page_stats_iterator!(
    MinDecimal256DataPageStatsIterator,
    min,
    i256,
    from_bytes_to_i256
);

get_decimal_page_stats_iterator!(
    MaxDecimal256DataPageStatsIterator,
    max,
    i256,
    from_bytes_to_i256
);
794
/// Converts page-level statistics from Parquet page indexes into an Arrow array
/// of the requested `$data_type`.
///
/// * `$stat_type_prefix` - `Min` or `Max`. It selects the `*DataPageStatsIterator`
///   family, and the `min_page_statistics`/`max_page_statistics` function used
///   for dictionary value types.
/// * `$data_type` - the target Arrow [`DataType`] (a `&DataType`).
/// * `$iterator` - yields `(len, &Index)` pairs, one per column chunk. `len` is
///   the number of data pages in that chunk, i.e. the number of entries the
///   chunk contributes.
/// * `$physical_type` - the Parquet physical type of the column, if known. It is
///   only consulted for `Date64`.
///
/// The result has one entry per data page, flattened across column chunks in
/// iteration order. An entry becomes null when the value is:
///
/// * missing,
/// * of an unexpected physical type, or
/// * not representable in the target type.
///
/// Unsupported target types produce one null per page, so the output length
/// always equals the total page count.
macro_rules! get_data_page_statistics {
    ($stat_type_prefix: ident, $data_type: ident, $iterator: ident, $physical_type: ident) => {
        paste! {
            match $data_type {
                DataType::Boolean => {
                    let iterator = [<$stat_type_prefix BooleanDataPageStatsIterator>]::new($iterator);
                    let mut builder = BooleanBuilder::new();
                    for value in iterator.flatten() {
                        builder.append_option(value);
                    }
                    Ok(Arc::new(builder.finish()))
                },
                // Narrow integer types are read from INT32; out-of-range values become null.
                DataType::UInt8 => Ok(Arc::new(UInt8Array::from_iter(
                    [<$stat_type_prefix Int32DataPageStatsIterator>]::new($iterator)
                        .flatten()
                        .map(|x| x.and_then(|x| u8::try_from(x).ok())),
                ))),
                DataType::UInt16 => Ok(Arc::new(UInt16Array::from_iter(
                    [<$stat_type_prefix Int32DataPageStatsIterator>]::new($iterator)
                        .flatten()
                        .map(|x| x.and_then(|x| u16::try_from(x).ok())),
                ))),
                // Full-width unsigned types reinterpret the signed value bit-for-bit.
                DataType::UInt32 => Ok(Arc::new(UInt32Array::from_iter(
                    [<$stat_type_prefix Int32DataPageStatsIterator>]::new($iterator)
                        .flatten()
                        .map(|x| x.map(|x| x as u32)),
                ))),
                DataType::UInt64 => Ok(Arc::new(UInt64Array::from_iter(
                    [<$stat_type_prefix Int64DataPageStatsIterator>]::new($iterator)
                        .flatten()
                        .map(|x| x.map(|x| x as u64)),
                ))),
                DataType::Int8 => Ok(Arc::new(Int8Array::from_iter(
                    [<$stat_type_prefix Int32DataPageStatsIterator>]::new($iterator)
                        .flatten()
                        .map(|x| x.and_then(|x| i8::try_from(x).ok())),
                ))),
                DataType::Int16 => Ok(Arc::new(Int16Array::from_iter(
                    [<$stat_type_prefix Int32DataPageStatsIterator>]::new($iterator)
                        .flatten()
                        .map(|x| x.and_then(|x| i16::try_from(x).ok())),
                ))),
                DataType::Int32 => Ok(Arc::new(Int32Array::from_iter([<$stat_type_prefix Int32DataPageStatsIterator>]::new($iterator).flatten()))),
                DataType::Int64 => Ok(Arc::new(Int64Array::from_iter([<$stat_type_prefix Int64DataPageStatsIterator>]::new($iterator).flatten()))),
                // Float16 values are stored as 2-byte FIXED_LEN_BYTE_ARRAY.
                DataType::Float16 => Ok(Arc::new(Float16Array::from_iter(
                    [<$stat_type_prefix Float16DataPageStatsIterator>]::new($iterator)
                        .flatten()
                        .map(|x| x.and_then(|x| from_bytes_to_f16(x.data()))),
                ))),
                DataType::Float32 => Ok(Arc::new(Float32Array::from_iter([<$stat_type_prefix Float32DataPageStatsIterator>]::new($iterator).flatten()))),
                DataType::Float64 => Ok(Arc::new(Float64Array::from_iter([<$stat_type_prefix Float64DataPageStatsIterator>]::new($iterator).flatten()))),
                DataType::Binary => Ok(Arc::new(BinaryArray::from_iter([<$stat_type_prefix ByteArrayDataPageStatsIterator>]::new($iterator).flatten()))),
                DataType::LargeBinary => Ok(Arc::new(LargeBinaryArray::from_iter([<$stat_type_prefix ByteArrayDataPageStatsIterator>]::new($iterator).flatten()))),
                // String targets: bytes that are not valid UTF-8 become null.
                DataType::Utf8 => {
                    let mut builder = StringBuilder::new();
                    let iterator = [<$stat_type_prefix ByteArrayDataPageStatsIterator>]::new($iterator);
                    for value in iterator.flatten() {
                        let Some(bytes) = value else {
                            builder.append_null();
                            continue;
                        };
                        let Ok(text) = std::str::from_utf8(bytes.data()) else {
                            builder.append_null();
                            continue;
                        };
                        builder.append_value(text);
                    }
                    Ok(Arc::new(builder.finish()))
                },
                DataType::LargeUtf8 => {
                    let mut builder = LargeStringBuilder::new();
                    let iterator = [<$stat_type_prefix ByteArrayDataPageStatsIterator>]::new($iterator);
                    for value in iterator.flatten() {
                        let Some(bytes) = value else {
                            builder.append_null();
                            continue;
                        };
                        let Ok(text) = std::str::from_utf8(bytes.data()) else {
                            builder.append_null();
                            continue;
                        };
                        builder.append_value(text);
                    }
                    Ok(Arc::new(builder.finish()))
                },
                // Dictionaries: recurse with the dictionary's value type.
                DataType::Dictionary(_, value_type) => {
                    [<$stat_type_prefix:lower _ page_statistics>](value_type, $iterator, $physical_type)
                },
                DataType::Timestamp(unit, timezone) => {
                    let iter = [<$stat_type_prefix Int64DataPageStatsIterator>]::new($iterator).flatten();
                    Ok(match unit {
                        TimeUnit::Second => Arc::new(TimestampSecondArray::from_iter(iter).with_timezone_opt(timezone.clone())),
                        TimeUnit::Millisecond => Arc::new(TimestampMillisecondArray::from_iter(iter).with_timezone_opt(timezone.clone())),
                        TimeUnit::Microsecond => Arc::new(TimestampMicrosecondArray::from_iter(iter).with_timezone_opt(timezone.clone())),
                        TimeUnit::Nanosecond => Arc::new(TimestampNanosecondArray::from_iter(iter).with_timezone_opt(timezone.clone())),
                    })
                },
                DataType::Date32 => Ok(Arc::new(Date32Array::from_iter([<$stat_type_prefix Int32DataPageStatsIterator>]::new($iterator).flatten()))),
                // INT32-backed Date64: scale a day count to milliseconds (the
                // widening from i32 is lossless, so no fallible conversion is needed).
                DataType::Date64 if $physical_type == Some(PhysicalType::INT32) => Ok(Arc::new(Date64Array::from_iter(
                    [<$stat_type_prefix Int32DataPageStatsIterator>]::new($iterator)
                        .flatten()
                        .map(|x| x.map(|x| i64::from(x) * 24 * 60 * 60 * 1000)),
                ))),
                DataType::Date64 if $physical_type == Some(PhysicalType::INT64) => Ok(Arc::new(Date64Array::from_iter([<$stat_type_prefix Int64DataPageStatsIterator>]::new($iterator).flatten()))),
                DataType::Decimal128(precision, scale) => Ok(Arc::new(
                    Decimal128Array::from_iter([<$stat_type_prefix Decimal128DataPageStatsIterator>]::new($iterator).flatten()).with_precision_and_scale(*precision, *scale)?)),
                DataType::Decimal256(precision, scale) => Ok(Arc::new(
                    Decimal256Array::from_iter([<$stat_type_prefix Decimal256DataPageStatsIterator>]::new($iterator).flatten()).with_precision_and_scale(*precision, *scale)?)),
                DataType::Time32(unit) => {
                    Ok(match unit {
                        TimeUnit::Second => Arc::new(Time32SecondArray::from_iter(
                            [<$stat_type_prefix Int32DataPageStatsIterator>]::new($iterator).flatten(),
                        )),
                        TimeUnit::Millisecond => Arc::new(Time32MillisecondArray::from_iter(
                            [<$stat_type_prefix Int32DataPageStatsIterator>]::new($iterator).flatten(),
                        )),
                        // Unsupported unit: emit one null per page so the output
                        // length still matches the page count.
                        _ => {
                            let num_pages: usize = $iterator.map(|(len, _)| len).sum();
                            new_null_array($data_type, num_pages)
                        }
                    })
                }
                DataType::Time64(unit) => {
                    Ok(match unit {
                        TimeUnit::Microsecond => Arc::new(Time64MicrosecondArray::from_iter(
                            [<$stat_type_prefix Int64DataPageStatsIterator>]::new($iterator).flatten(),
                        )),
                        TimeUnit::Nanosecond => Arc::new(Time64NanosecondArray::from_iter(
                            [<$stat_type_prefix Int64DataPageStatsIterator>]::new($iterator).flatten(),
                        )),
                        // Unsupported unit: emit one null per page so the output
                        // length still matches the page count.
                        _ => {
                            let num_pages: usize = $iterator.map(|(len, _)| len).sum();
                            new_null_array($data_type, num_pages)
                        }
                    })
                },
                DataType::FixedSizeBinary(size) => {
                    let mut builder = FixedSizeBinaryBuilder::new(*size);
                    let iterator = [<$stat_type_prefix FixedLenByteArrayDataPageStatsIterator>]::new($iterator);
                    for value in iterator.flatten() {
                        let Some(bytes) = value else {
                            builder.append_null();
                            continue;
                        };
                        // Values whose length differs from the declared width become null.
                        if bytes.len() == *size as usize {
                            builder
                                .append_value(bytes.data())
                                .expect("value length was checked against the fixed size above");
                        } else {
                            builder.append_null();
                        }
                    }
                    Ok(Arc::new(builder.finish()))
                },
                DataType::Utf8View => {
                    let mut builder = StringViewBuilder::new();
                    let iterator = [<$stat_type_prefix ByteArrayDataPageStatsIterator>]::new($iterator);
                    for value in iterator.flatten() {
                        let Some(bytes) = value else {
                            builder.append_null();
                            continue;
                        };
                        let Ok(text) = std::str::from_utf8(bytes.data()) else {
                            builder.append_null();
                            continue;
                        };
                        builder.append_value(text);
                    }
                    Ok(Arc::new(builder.finish()))
                },
                DataType::BinaryView => {
                    let mut builder = BinaryViewBuilder::new();
                    let iterator = [<$stat_type_prefix ByteArrayDataPageStatsIterator>]::new($iterator);
                    for value in iterator.flatten() {
                        let Some(bytes) = value else {
                            builder.append_null();
                            continue;
                        };
                        builder.append_value(bytes);
                    }
                    Ok(Arc::new(builder.finish()))
                },
                // `Date64` is listed to cover the case where neither `$physical_type`
                // guard above matched.
                DataType::Date64 | DataType::Null |
                DataType::Duration(_) |
                DataType::Interval(_) |
                DataType::List(_) |
                DataType::ListView(_) |
                DataType::FixedSizeList(_, _) |
                DataType::LargeList(_) |
                DataType::LargeListView(_) |
                DataType::Struct(_) |
                DataType::Union(_, _) |
                DataType::Map(_, _) |
                DataType::RunEndEncoded(_, _) => {
                    // No statistics can be extracted: emit one null per data page
                    // (summing page counts), not one per column chunk, so the
                    // length matches every other branch.
                    let num_pages: usize = $iterator.map(|(len, _)| len).sum();
                    Ok(new_null_array($data_type, num_pages))
                },
            }
        }
    }
}
/// Extracts the minimum value from each row group's column-chunk statistics as
/// an Arrow array of type `data_type`.
///
/// `iterator` yields one optional [`ParquetStatistics`] per row group.
/// `physical_type` is the column's Parquet physical type, if known.
///
/// # Errors
///
/// Returns an error if building the array fails, e.g. when applying the decimal
/// precision and scale.
fn min_statistics<'a, I: Iterator<Item = Option<&'a ParquetStatistics>>>(
    data_type: &DataType,
    iterator: I,
    physical_type: Option<PhysicalType>,
) -> Result<ArrayRef> {
    get_statistics!(Min, data_type, iterator, physical_type)
}
1079
/// Extracts the maximum value from each row group's column-chunk statistics as
/// an Arrow array of type `data_type`.
///
/// `iterator` yields one optional [`ParquetStatistics`] per row group.
/// `physical_type` is the column's Parquet physical type, if known.
///
/// # Errors
///
/// Returns an error if building the array fails, e.g. when applying the decimal
/// precision and scale.
fn max_statistics<'a, I: Iterator<Item = Option<&'a ParquetStatistics>>>(
    data_type: &DataType,
    iterator: I,
    physical_type: Option<PhysicalType>,
) -> Result<ArrayRef> {
    get_statistics!(Max, data_type, iterator, physical_type)
}
1090
/// Extracts the minimum value of every data page as an Arrow array of type
/// `data_type`, flattened across column chunks in iteration order.
///
/// `iterator` yields one `(usize, &Index)` pair per column chunk. The `usize` is
/// the number of entries emitted for that chunk when its index holds no usable
/// values (presumably the chunk's page count; confirm against callers).
///
/// # Errors
///
/// Returns an error if building the array fails, e.g. when applying the decimal
/// precision and scale.
pub(crate) fn min_page_statistics<'a, I>(
    data_type: &DataType,
    iterator: I,
    physical_type: Option<PhysicalType>,
) -> Result<ArrayRef>
where
    I: Iterator<Item = (usize, &'a Index)>,
{
    get_data_page_statistics!(Min, data_type, iterator, physical_type)
}
1103
/// Extracts the maximum value of every data page as an Arrow array of type
/// `data_type`, flattened across column chunks in iteration order.
///
/// `iterator` yields one `(usize, &Index)` pair per column chunk. The `usize` is
/// the number of entries emitted for that chunk when its index holds no usable
/// values (presumably the chunk's page count; confirm against callers).
///
/// # Errors
///
/// Returns an error if building the array fails, e.g. when applying the decimal
/// precision and scale.
pub(crate) fn max_page_statistics<'a, I>(
    data_type: &DataType,
    iterator: I,
    physical_type: Option<PhysicalType>,
) -> Result<ArrayRef>
where
    I: Iterator<Item = (usize, &'a Index)>,
{
    get_data_page_statistics!(Max, data_type, iterator, physical_type)
}
1116
1117pub(crate) fn null_counts_page_statistics<'a, I>(iterator: I) -> Result<UInt64Array>
1122where
1123 I: Iterator<Item = (usize, &'a Index)>,
1124{
1125 let iter = iterator.flat_map(|(len, index)| match index {
1126 Index::NONE => vec![None; len],
1127 Index::BOOLEAN(native_index) => native_index
1128 .indexes
1129 .iter()
1130 .map(|x| x.null_count.map(|x| x as u64))
1131 .collect::<Vec<_>>(),
1132 Index::INT32(native_index) => native_index
1133 .indexes
1134 .iter()
1135 .map(|x| x.null_count.map(|x| x as u64))
1136 .collect::<Vec<_>>(),
1137 Index::INT64(native_index) => native_index
1138 .indexes
1139 .iter()
1140 .map(|x| x.null_count.map(|x| x as u64))
1141 .collect::<Vec<_>>(),
1142 Index::FLOAT(native_index) => native_index
1143 .indexes
1144 .iter()
1145 .map(|x| x.null_count.map(|x| x as u64))
1146 .collect::<Vec<_>>(),
1147 Index::DOUBLE(native_index) => native_index
1148 .indexes
1149 .iter()
1150 .map(|x| x.null_count.map(|x| x as u64))
1151 .collect::<Vec<_>>(),
1152 Index::FIXED_LEN_BYTE_ARRAY(native_index) => native_index
1153 .indexes
1154 .iter()
1155 .map(|x| x.null_count.map(|x| x as u64))
1156 .collect::<Vec<_>>(),
1157 Index::BYTE_ARRAY(native_index) => native_index
1158 .indexes
1159 .iter()
1160 .map(|x| x.null_count.map(|x| x as u64))
1161 .collect::<Vec<_>>(),
1162 _ => unimplemented!(),
1163 });
1164
1165 Ok(UInt64Array::from_iter(iter))
1166}
1167
#[derive(Debug)]
/// Converts Parquet statistics for a single column into Arrow arrays whose type
/// matches that column's Arrow field.
///
/// Construct with [`StatisticsConverter::try_new`].
pub struct StatisticsConverter<'a> {
    /// Index of the matched column in the Parquet schema, or `None` if the
    /// Arrow column has no matching Parquet column
    parquet_column_index: Option<usize>,
    /// The Arrow field whose statistics are converted
    arrow_field: &'a Field,
    /// Set via `with_missing_null_counts_as_zero`; `try_new` defaults it to `true`.
    /// NOTE(review): presumably makes absent null counts read as 0 rather than
    /// null. The consuming code is outside this view, so confirm.
    missing_null_counts_as_zero: bool,
    /// Parquet physical type of the matched column, if any
    physical_type: Option<PhysicalType>,
}
1198
1199impl<'a> StatisticsConverter<'a> {
    /// Returns the index of the matched column in the Parquet schema, or `None`
    /// if no Parquet column matched the Arrow field.
    pub fn parquet_column_index(&self) -> Option<usize> {
        self.parquet_column_index
    }
1207
    /// Returns the Arrow field whose statistics this converter extracts.
    pub fn arrow_field(&self) -> &'a Field {
        self.arrow_field
    }
1212
    /// Sets the `missing_null_counts_as_zero` flag and returns the converter so
    /// calls can be chained. `try_new` defaults the flag to `true`.
    ///
    /// NOTE(review): judging by the name, the flag controls whether absent null
    /// counts are reported as `0` instead of null. The code that reads it is not
    /// visible here, so confirm.
    pub fn with_missing_null_counts_as_zero(mut self, missing_null_counts_as_zero: bool) -> Self {
        self.missing_null_counts_as_zero = missing_null_counts_as_zero;
        self
    }
1229
1230 pub fn row_group_row_counts<I>(&self, metadatas: I) -> Result<Option<UInt64Array>>
1261 where
1262 I: IntoIterator<Item = &'a RowGroupMetaData>,
1263 {
1264 let Some(_) = self.parquet_column_index else {
1265 return Ok(None);
1266 };
1267
1268 let mut builder = UInt64Array::builder(10);
1269 for metadata in metadatas.into_iter() {
1270 let row_count = metadata.num_rows();
1271 let row_count: u64 = row_count.try_into().map_err(|e| {
1272 arrow_err!(format!(
1273 "Parquet row count {row_count} too large to convert to u64: {e}"
1274 ))
1275 })?;
1276 builder.append_value(row_count);
1277 }
1278 Ok(Some(builder.finish()))
1279 }
1280
1281 pub fn try_new<'b>(
1293 column_name: &'b str,
1294 arrow_schema: &'a Schema,
1295 parquet_schema: &'a SchemaDescriptor,
1296 ) -> Result<Self> {
1297 let Some((_idx, arrow_field)) = arrow_schema.column_with_name(column_name) else {
1299 return Err(arrow_err!(format!(
1300 "Column '{}' not found in schema for statistics conversion",
1301 column_name
1302 )));
1303 };
1304
1305 let parquet_index = match parquet_column(parquet_schema, arrow_schema, column_name) {
1307 Some((parquet_idx, matched_field)) => {
1308 if matched_field.as_ref() != arrow_field {
1310 return Err(arrow_err!(format!(
1311 "Matched column '{:?}' does not match original matched column '{:?}'",
1312 matched_field, arrow_field
1313 )));
1314 }
1315 Some(parquet_idx)
1316 }
1317 None => None,
1318 };
1319
1320 Ok(Self {
1321 parquet_column_index: parquet_index,
1322 arrow_field,
1323 missing_null_counts_as_zero: true,
1324 physical_type: parquet_index.map(|idx| parquet_schema.column(idx).physical_type()),
1325 })
1326 }
1327
1328 pub fn row_group_mins<I>(&self, metadatas: I) -> Result<ArrayRef>
1372 where
1373 I: IntoIterator<Item = &'a RowGroupMetaData>,
1374 {
1375 let data_type = self.arrow_field.data_type();
1376
1377 let Some(parquet_index) = self.parquet_column_index else {
1378 return Ok(self.make_null_array(data_type, metadatas));
1379 };
1380
1381 let iter = metadatas
1382 .into_iter()
1383 .map(|x| x.column(parquet_index).statistics());
1384 min_statistics(data_type, iter, self.physical_type)
1385 }
1386
1387 pub fn row_group_maxes<I>(&self, metadatas: I) -> Result<ArrayRef>
1391 where
1392 I: IntoIterator<Item = &'a RowGroupMetaData>,
1393 {
1394 let data_type = self.arrow_field.data_type();
1395
1396 let Some(parquet_index) = self.parquet_column_index else {
1397 return Ok(self.make_null_array(data_type, metadatas));
1398 };
1399
1400 let iter = metadatas
1401 .into_iter()
1402 .map(|x| x.column(parquet_index).statistics());
1403 max_statistics(data_type, iter, self.physical_type)
1404 }
1405
1406 pub fn row_group_null_counts<I>(&self, metadatas: I) -> Result<UInt64Array>
1410 where
1411 I: IntoIterator<Item = &'a RowGroupMetaData>,
1412 {
1413 let Some(parquet_index) = self.parquet_column_index else {
1414 let num_row_groups = metadatas.into_iter().count();
1415 return Ok(UInt64Array::from_iter(
1416 std::iter::repeat(None).take(num_row_groups),
1417 ));
1418 };
1419
1420 let null_counts = metadatas
1421 .into_iter()
1422 .map(|x| x.column(parquet_index).statistics())
1423 .map(|s| {
1424 s.and_then(|s| {
1425 if self.missing_null_counts_as_zero {
1426 Some(s.null_count_opt().unwrap_or(0))
1427 } else {
1428 s.null_count_opt()
1429 }
1430 })
1431 });
1432 Ok(UInt64Array::from_iter(null_counts))
1433 }
1434
1435 pub fn data_page_mins<I>(
1487 &self,
1488 column_page_index: &ParquetColumnIndex,
1489 column_offset_index: &ParquetOffsetIndex,
1490 row_group_indices: I,
1491 ) -> Result<ArrayRef>
1492 where
1493 I: IntoIterator<Item = &'a usize>,
1494 {
1495 let data_type = self.arrow_field.data_type();
1496
1497 let Some(parquet_index) = self.parquet_column_index else {
1498 return Ok(self.make_null_array(data_type, row_group_indices));
1499 };
1500
1501 let iter = row_group_indices.into_iter().map(|rg_index| {
1502 let column_page_index_per_row_group_per_column =
1503 &column_page_index[*rg_index][parquet_index];
1504 let num_data_pages = &column_offset_index[*rg_index][parquet_index]
1505 .page_locations()
1506 .len();
1507
1508 (*num_data_pages, column_page_index_per_row_group_per_column)
1509 });
1510
1511 min_page_statistics(data_type, iter, self.physical_type)
1512 }
1513
1514 pub fn data_page_maxes<I>(
1518 &self,
1519 column_page_index: &ParquetColumnIndex,
1520 column_offset_index: &ParquetOffsetIndex,
1521 row_group_indices: I,
1522 ) -> Result<ArrayRef>
1523 where
1524 I: IntoIterator<Item = &'a usize>,
1525 {
1526 let data_type = self.arrow_field.data_type();
1527
1528 let Some(parquet_index) = self.parquet_column_index else {
1529 return Ok(self.make_null_array(data_type, row_group_indices));
1530 };
1531
1532 let iter = row_group_indices.into_iter().map(|rg_index| {
1533 let column_page_index_per_row_group_per_column =
1534 &column_page_index[*rg_index][parquet_index];
1535 let num_data_pages = &column_offset_index[*rg_index][parquet_index]
1536 .page_locations()
1537 .len();
1538
1539 (*num_data_pages, column_page_index_per_row_group_per_column)
1540 });
1541
1542 max_page_statistics(data_type, iter, self.physical_type)
1543 }
1544
1545 pub fn data_page_null_counts<I>(
1549 &self,
1550 column_page_index: &ParquetColumnIndex,
1551 column_offset_index: &ParquetOffsetIndex,
1552 row_group_indices: I,
1553 ) -> Result<UInt64Array>
1554 where
1555 I: IntoIterator<Item = &'a usize>,
1556 {
1557 let Some(parquet_index) = self.parquet_column_index else {
1558 let num_row_groups = row_group_indices.into_iter().count();
1559 return Ok(UInt64Array::from_iter(
1560 std::iter::repeat(None).take(num_row_groups),
1561 ));
1562 };
1563
1564 let iter = row_group_indices.into_iter().map(|rg_index| {
1565 let column_page_index_per_row_group_per_column =
1566 &column_page_index[*rg_index][parquet_index];
1567 let num_data_pages = &column_offset_index[*rg_index][parquet_index]
1568 .page_locations()
1569 .len();
1570
1571 (*num_data_pages, column_page_index_per_row_group_per_column)
1572 });
1573 null_counts_page_statistics(iter)
1574 }
1575
1576 pub fn data_page_row_counts<I>(
1594 &self,
1595 column_offset_index: &ParquetOffsetIndex,
1596 row_group_metadatas: &'a [RowGroupMetaData],
1597 row_group_indices: I,
1598 ) -> Result<Option<UInt64Array>>
1599 where
1600 I: IntoIterator<Item = &'a usize>,
1601 {
1602 let Some(parquet_index) = self.parquet_column_index else {
1603 return Ok(None);
1607 };
1608
1609 let mut row_count_total = Vec::new();
1610 for rg_idx in row_group_indices {
1611 let page_locations = &column_offset_index[*rg_idx][parquet_index].page_locations();
1612
1613 let row_count_per_page = page_locations
1614 .windows(2)
1615 .map(|loc| Some(loc[1].first_row_index as u64 - loc[0].first_row_index as u64));
1616
1617 let num_rows_in_row_group = &row_group_metadatas[*rg_idx].num_rows();
1619 let row_count_per_page = row_count_per_page
1620 .chain(std::iter::once(Some(
1621 *num_rows_in_row_group as u64
1622 - page_locations.last().unwrap().first_row_index as u64,
1623 )))
1624 .collect::<Vec<_>>();
1625
1626 row_count_total.extend(row_count_per_page);
1627 }
1628
1629 Ok(Some(UInt64Array::from_iter(row_count_total)))
1630 }
1631
1632 fn make_null_array<I, A>(&self, data_type: &DataType, metadatas: I) -> ArrayRef
1634 where
1635 I: IntoIterator<Item = A>,
1636 {
1637 let num_row_groups = metadatas.into_iter().count();
1639 new_null_array(data_type, num_row_groups)
1640 }
1641}
1642
1643