1use crate::arrow::buffer::bit_util::sign_extend_be;
23use crate::arrow::parquet_column;
24use crate::basic::Type as PhysicalType;
25use crate::errors::{ParquetError, Result};
26use crate::file::metadata::{ParquetColumnIndex, ParquetOffsetIndex, RowGroupMetaData};
27use crate::file::page_index::column_index::ColumnIndexMetaData;
28use crate::file::statistics::Statistics as ParquetStatistics;
29use crate::schema::types::SchemaDescriptor;
30use arrow_array::builder::{
31 BinaryBuilder, BinaryViewBuilder, BooleanBuilder, Date32Builder, Date64Builder,
32 Decimal32Builder, Decimal64Builder, FixedSizeBinaryBuilder, Float16Builder, Float32Builder,
33 Float64Builder, Int8Builder, Int16Builder, Int32Builder, Int64Builder, LargeBinaryBuilder,
34 LargeStringBuilder, StringBuilder, StringViewBuilder, Time32MillisecondBuilder,
35 Time32SecondBuilder, Time64MicrosecondBuilder, Time64NanosecondBuilder,
36 TimestampMicrosecondBuilder, TimestampMillisecondBuilder, TimestampNanosecondBuilder,
37 TimestampSecondBuilder, UInt8Builder, UInt16Builder, UInt32Builder, UInt64Builder,
38};
39use arrow_array::{
40 ArrayRef, BinaryArray, BooleanArray, Date32Array, Date64Array, Decimal32Array, Decimal64Array,
41 Decimal128Array, Decimal256Array, Float16Array, Float32Array, Float64Array, Int8Array,
42 Int16Array, Int32Array, Int64Array, LargeBinaryArray, Time32MillisecondArray,
43 Time32SecondArray, Time64MicrosecondArray, Time64NanosecondArray, TimestampMicrosecondArray,
44 TimestampMillisecondArray, TimestampNanosecondArray, TimestampSecondArray, UInt8Array,
45 UInt16Array, UInt32Array, UInt64Array, new_null_array,
46};
47use arrow_buffer::{NullBufferBuilder, i256};
48use arrow_schema::{DataType, Field, Schema, TimeUnit};
49use half::f16;
50use paste::paste;
51use std::sync::Arc;
52
53pub(crate) fn from_bytes_to_i32(b: &[u8]) -> i32 {
56 i32::from_be_bytes(sign_extend_be::<4>(b))
60}
61
62pub(crate) fn from_bytes_to_i64(b: &[u8]) -> i64 {
65 i64::from_be_bytes(sign_extend_be::<8>(b))
66}
67
68pub(crate) fn from_bytes_to_i128(b: &[u8]) -> i128 {
71 i128::from_be_bytes(sign_extend_be::<16>(b))
72}
73
74pub(crate) fn from_bytes_to_i256(b: &[u8]) -> i256 {
77 i256::from_be_bytes(sign_extend_be::<32>(b))
78}
79
80pub(crate) fn from_bytes_to_f16(b: &[u8]) -> Option<f16> {
82 match b {
83 [low, high] => Some(f16::from_be_bytes([*high, *low])),
84 _ => None,
85 }
86}
87
/// Generates an iterator adapter that pulls one typed statistic out of each
/// `Option<&ParquetStatistics>` it is given.
///
/// Parameters:
/// * `$iterator_type` - name of the struct to generate.
/// * `$func` - accessor invoked on the variant payload (the invocations in
///   this file pass `min_opt`, `max_opt`, `min_bytes_opt` or `max_bytes_opt`).
/// * `$parquet_statistics_type` - the `ParquetStatistics` variant to accept.
/// * `$stat_value_type` - the value type; items are `Option<&$stat_value_type>`.
///
/// For every input element exactly one output element is produced: the value
/// returned by `$func` when the element is `Some` and of the requested
/// variant, and `None` otherwise (missing statistics or a different variant).
macro_rules! make_stats_iterator {
    ($iterator_type:ident, $func:ident, $parquet_statistics_type:path, $stat_value_type:ty) => {
        // Thin wrapper around the underlying statistics iterator.
        struct $iterator_type<'a, I>
        where
            I: Iterator<Item = Option<&'a ParquetStatistics>>,
        {
            iter: I,
        }

        impl<'a, I> $iterator_type<'a, I>
        where
            I: Iterator<Item = Option<&'a ParquetStatistics>>,
        {
            // Wraps `iter` without consuming any element.
            fn new(iter: I) -> Self {
                Self { iter }
            }
        }

        impl<'a, I> Iterator for $iterator_type<'a, I>
        where
            I: Iterator<Item = Option<&'a ParquetStatistics>>,
        {
            type Item = Option<&'a $stat_value_type>;

            fn next(&mut self) -> Option<Self::Item> {
                // Exhaustion of the inner iterator ends this iterator too.
                let entry = self.iter.next()?;
                let value = match entry {
                    Some($parquet_statistics_type(typed)) => typed.$func(),
                    // No statistics at all, or statistics of another variant.
                    _ => None,
                };
                Some(value)
            }

            fn size_hint(&self) -> (usize, Option<usize>) {
                // One output per input, so the inner hint is exact for us too.
                self.iter.size_hint()
            }
        }
    };
}
148
// Instantiate a Min*/Max* iterator pair for each `ParquetStatistics` variant
// handled here. Each pair differs only in the accessor (`min_*` vs `max_*`).

// Boolean statistics: yields `Option<&bool>`.
make_stats_iterator!(
    MinBooleanStatsIterator,
    min_opt,
    ParquetStatistics::Boolean,
    bool
);
make_stats_iterator!(
    MaxBooleanStatsIterator,
    max_opt,
    ParquetStatistics::Boolean,
    bool
);
// Int32 statistics: yields `Option<&i32>`.
make_stats_iterator!(
    MinInt32StatsIterator,
    min_opt,
    ParquetStatistics::Int32,
    i32
);
make_stats_iterator!(
    MaxInt32StatsIterator,
    max_opt,
    ParquetStatistics::Int32,
    i32
);
// Int64 statistics: yields `Option<&i64>`.
make_stats_iterator!(
    MinInt64StatsIterator,
    min_opt,
    ParquetStatistics::Int64,
    i64
);
make_stats_iterator!(
    MaxInt64StatsIterator,
    max_opt,
    ParquetStatistics::Int64,
    i64
);
// Float statistics: yields `Option<&f32>`.
make_stats_iterator!(
    MinFloatStatsIterator,
    min_opt,
    ParquetStatistics::Float,
    f32
);
make_stats_iterator!(
    MaxFloatStatsIterator,
    max_opt,
    ParquetStatistics::Float,
    f32
);
// Double statistics: yields `Option<&f64>`.
make_stats_iterator!(
    MinDoubleStatsIterator,
    min_opt,
    ParquetStatistics::Double,
    f64
);
make_stats_iterator!(
    MaxDoubleStatsIterator,
    max_opt,
    ParquetStatistics::Double,
    f64
);
// ByteArray statistics: uses the `*_bytes_opt` accessors and yields `Option<&[u8]>`.
make_stats_iterator!(
    MinByteArrayStatsIterator,
    min_bytes_opt,
    ParquetStatistics::ByteArray,
    [u8]
);
make_stats_iterator!(
    MaxByteArrayStatsIterator,
    max_bytes_opt,
    ParquetStatistics::ByteArray,
    [u8]
);
// FixedLenByteArray statistics: uses the `*_bytes_opt` accessors and yields `Option<&[u8]>`.
make_stats_iterator!(
    MinFixedLenByteArrayStatsIterator,
    min_bytes_opt,
    ParquetStatistics::FixedLenByteArray,
    [u8]
);
make_stats_iterator!(
    MaxFixedLenByteArrayStatsIterator,
    max_bytes_opt,
    ParquetStatistics::FixedLenByteArray,
    [u8]
);
233
/// Generates an iterator adapter that converts each
/// `Option<&ParquetStatistics>` into an owned decimal backing value.
///
/// Parameters:
/// * `$iterator_type` - name of the struct to generate.
/// * `$func` - accessor used for the `Int32` / `Int64` variants.
/// * `$bytes_func` - accessor used for the `ByteArray` /
///   `FixedLenByteArray` variants; its result is fed to `$convert_func`.
/// * `$stat_value_type` - target integer type; items are
///   `Option<$stat_value_type>`.
/// * `$convert_func` - function turning the statistic bytes into
///   `$stat_value_type`.
///
/// Per input element:
/// * `Int32` values are widened with `From`.
/// * `Int64` values go through `TryFrom`; values that do not fit become `None`.
/// * byte-backed values are passed to `$convert_func`.
/// * missing statistics or any other variant yield `None`.
///
/// NOTE(review): whether `$convert_func` can panic on unexpected byte lengths
/// depends on the helper supplied by the caller — confirm.
macro_rules! make_decimal_stats_iterator {
    ($iterator_type:ident, $func:ident, $bytes_func:ident, $stat_value_type:ident, $convert_func: ident) => {
        // Thin wrapper around the underlying statistics iterator.
        struct $iterator_type<'a, I>
        where
            I: Iterator<Item = Option<&'a ParquetStatistics>>,
        {
            iter: I,
        }

        impl<'a, I> $iterator_type<'a, I>
        where
            I: Iterator<Item = Option<&'a ParquetStatistics>>,
        {
            // Wraps `iter` without consuming any element.
            fn new(iter: I) -> Self {
                Self { iter }
            }
        }

        impl<'a, I> Iterator for $iterator_type<'a, I>
        where
            I: Iterator<Item = Option<&'a ParquetStatistics>>,
        {
            type Item = Option<$stat_value_type>;

            fn next(&mut self) -> Option<Self::Item> {
                // Exhaustion of the inner iterator ends this iterator too.
                let entry = self.iter.next()?;
                let converted = match entry {
                    Some(ParquetStatistics::Int32(typed)) => {
                        typed.$func().map(|raw| $stat_value_type::from(*raw))
                    }
                    // Narrowing may fail; an out-of-range value is reported as absent.
                    Some(ParquetStatistics::Int64(typed)) => typed
                        .$func()
                        .and_then(|raw| $stat_value_type::try_from(*raw).ok()),
                    Some(ParquetStatistics::ByteArray(typed)) => {
                        typed.$bytes_func().map($convert_func)
                    }
                    Some(ParquetStatistics::FixedLenByteArray(typed)) => {
                        typed.$bytes_func().map($convert_func)
                    }
                    // No statistics at all, or a variant that cannot hold a decimal.
                    _ => None,
                };
                Some(converted)
            }

            fn size_hint(&self) -> (usize, Option<usize>) {
                // One output per input, so the inner hint carries over unchanged.
                self.iter.size_hint()
            }
        }
    };
}
301
// Instantiate a Min*/Max* decimal iterator pair for each backing integer
// width. Each invocation pairs the target integer type with the
// `from_bytes_to_*` helper of the same width for byte-backed statistics.

// Decimal32: backed by `i32`.
make_decimal_stats_iterator!(
    MinDecimal32StatsIterator,
    min_opt,
    min_bytes_opt,
    i32,
    from_bytes_to_i32
);
make_decimal_stats_iterator!(
    MaxDecimal32StatsIterator,
    max_opt,
    max_bytes_opt,
    i32,
    from_bytes_to_i32
);
// Decimal64: backed by `i64`.
make_decimal_stats_iterator!(
    MinDecimal64StatsIterator,
    min_opt,
    min_bytes_opt,
    i64,
    from_bytes_to_i64
);
make_decimal_stats_iterator!(
    MaxDecimal64StatsIterator,
    max_opt,
    max_bytes_opt,
    i64,
    from_bytes_to_i64
);
// Decimal128: backed by `i128`.
make_decimal_stats_iterator!(
    MinDecimal128StatsIterator,
    min_opt,
    min_bytes_opt,
    i128,
    from_bytes_to_i128
);
make_decimal_stats_iterator!(
    MaxDecimal128StatsIterator,
    max_opt,
    max_bytes_opt,
    i128,
    from_bytes_to_i128
);
// Decimal256: backed by `i256`.
make_decimal_stats_iterator!(
    MinDecimal256StatsIterator,
    min_opt,
    min_bytes_opt,
    i256,
    from_bytes_to_i256
);
make_decimal_stats_iterator!(
    MaxDecimal256StatsIterator,
    max_opt,
    max_bytes_opt,
    i256,
    from_bytes_to_i256
);
358
/// Expands to a `match` on `$data_type` that builds an Arrow array from an
/// iterator of `Option<&ParquetStatistics>`, wrapped in `Ok(..)`.
///
/// Parameters:
/// * `$stat_type_prefix` - identifier pasted in front of `...StatsIterator`
///   to select the iterator type (the iterators defined in this file use the
///   prefixes `Min` and `Max`). Its lower-cased form is also pasted into
///   `<prefix>_statistics` for the dictionary recursion.
/// * `$data_type` - identifier bound to the target Arrow `DataType`
///   (presumably a `&DataType`, since bound fields are dereferenced — confirm
///   against callers).
/// * `$iterator` - identifier bound to the statistics iterator; it is consumed
///   by whichever arm matches.
/// * `$physical_type` - identifier compared against `Some(PhysicalType::..)`
///   to disambiguate `Date64`.
///
/// The decimal arms use `?`, so the expansion site must be inside a function
/// whose return type can absorb that error.
macro_rules! get_statistics {
    ($stat_type_prefix: ident, $data_type: ident, $iterator: ident, $physical_type: ident) => {
        paste! {
        match $data_type {
            DataType::Boolean => Ok(Arc::new(BooleanArray::from_iter(
                [<$stat_type_prefix BooleanStatsIterator>]::new($iterator).map(|x| x.copied()),
            ))),
            // Narrow integer types are read from Int32 statistics; values that
            // do not fit the target type become null.
            DataType::Int8 => Ok(Arc::new(Int8Array::from_iter(
                [<$stat_type_prefix Int32StatsIterator>]::new($iterator).map(|x| {
                    x.and_then(|x| i8::try_from(*x).ok())
                }),
            ))),
            DataType::Int16 => Ok(Arc::new(Int16Array::from_iter(
                [<$stat_type_prefix Int32StatsIterator>]::new($iterator).map(|x| {
                    x.and_then(|x| i16::try_from(*x).ok())
                }),
            ))),
            DataType::Int32 => Ok(Arc::new(Int32Array::from_iter(
                [<$stat_type_prefix Int32StatsIterator>]::new($iterator).map(|x| x.copied()),
            ))),
            DataType::Int64 => Ok(Arc::new(Int64Array::from_iter(
                [<$stat_type_prefix Int64StatsIterator>]::new($iterator).map(|x| x.copied()),
            ))),
            DataType::UInt8 => Ok(Arc::new(UInt8Array::from_iter(
                [<$stat_type_prefix Int32StatsIterator>]::new($iterator).map(|x| {
                    x.and_then(|x| u8::try_from(*x).ok())
                }),
            ))),
            DataType::UInt16 => Ok(Arc::new(UInt16Array::from_iter(
                [<$stat_type_prefix Int32StatsIterator>]::new($iterator).map(|x| {
                    x.and_then(|x| u16::try_from(*x).ok())
                }),
            ))),
            // Same-width unsigned types use an `as` cast, which reinterprets
            // the signed value's bits rather than range-checking it.
            DataType::UInt32 => Ok(Arc::new(UInt32Array::from_iter(
                [<$stat_type_prefix Int32StatsIterator>]::new($iterator).map(|x| x.map(|x| *x as u32)),
            ))),
            DataType::UInt64 => Ok(Arc::new(UInt64Array::from_iter(
                [<$stat_type_prefix Int64StatsIterator>]::new($iterator).map(|x| x.map(|x| *x as u64)),
            ))),
            // Float16 is read from fixed-length byte statistics; anything that
            // `from_bytes_to_f16` rejects becomes null.
            DataType::Float16 => Ok(Arc::new(Float16Array::from_iter(
                [<$stat_type_prefix FixedLenByteArrayStatsIterator>]::new($iterator).map(|x| x.and_then(|x| {
                    from_bytes_to_f16(x)
                })),
            ))),
            DataType::Float32 => Ok(Arc::new(Float32Array::from_iter(
                [<$stat_type_prefix FloatStatsIterator>]::new($iterator).map(|x| x.copied()),
            ))),
            DataType::Float64 => Ok(Arc::new(Float64Array::from_iter(
                [<$stat_type_prefix DoubleStatsIterator>]::new($iterator).map(|x| x.copied()),
            ))),
            DataType::Date32 => Ok(Arc::new(Date32Array::from_iter(
                [<$stat_type_prefix Int32StatsIterator>]::new($iterator).map(|x| x.copied()),
            ))),
            // Date64 over INT32: the value is multiplied by 24 * 60 * 60 * 1000
            // (milliseconds per day) — presumably the INT32 holds a day count.
            DataType::Date64 if $physical_type == Some(PhysicalType::INT32) => Ok(Arc::new(Date64Array::from_iter(
                [<$stat_type_prefix Int32StatsIterator>]::new($iterator)
                    .map(|x| x.map(|x| i64::from(*x) * 24 * 60 * 60 * 1000))))),
            // Date64 over INT64: the value is copied through unchanged.
            DataType::Date64 if $physical_type == Some(PhysicalType::INT64) => Ok(Arc::new(Date64Array::from_iter(
                [<$stat_type_prefix Int64StatsIterator>]::new($iterator).map(|x| x.copied()),))),
            // Timestamps are read from Int64 statistics for every unit; the
            // timezone of the requested type is attached to the result.
            DataType::Timestamp(unit, timezone) =>{
                let iter = [<$stat_type_prefix Int64StatsIterator>]::new($iterator).map(|x| x.copied());
                Ok(match unit {
                    TimeUnit::Second => Arc::new(TimestampSecondArray::from_iter(iter).with_timezone_opt(timezone.clone())),
                    TimeUnit::Millisecond => Arc::new(TimestampMillisecondArray::from_iter(iter).with_timezone_opt(timezone.clone())),
                    TimeUnit::Microsecond => Arc::new(TimestampMicrosecondArray::from_iter(iter).with_timezone_opt(timezone.clone())),
                    TimeUnit::Nanosecond => Arc::new(TimestampNanosecondArray::from_iter(iter).with_timezone_opt(timezone.clone())),
                })
            },
            // Time32 is read from Int32 statistics for second/millisecond units.
            DataType::Time32(unit) => {
                Ok(match unit {
                    TimeUnit::Second => Arc::new(Time32SecondArray::from_iter(
                        [<$stat_type_prefix Int32StatsIterator>]::new($iterator).map(|x| x.copied()),
                    )),
                    TimeUnit::Millisecond => Arc::new(Time32MillisecondArray::from_iter(
                        [<$stat_type_prefix Int32StatsIterator>]::new($iterator).map(|x| x.copied()),
                    )),
                    // Any other unit: an all-null array with one slot per input.
                    _ => {
                        let len = $iterator.count();
                        new_null_array($data_type, len)
                    }
                })
            },
            // Time64 is read from Int64 statistics for micro/nanosecond units.
            DataType::Time64(unit) => {
                Ok(match unit {
                    TimeUnit::Microsecond => Arc::new(Time64MicrosecondArray::from_iter(
                        [<$stat_type_prefix Int64StatsIterator>]::new($iterator).map(|x| x.copied()),
                    )),
                    TimeUnit::Nanosecond => Arc::new(Time64NanosecondArray::from_iter(
                        [<$stat_type_prefix Int64StatsIterator>]::new($iterator).map(|x| x.copied()),
                    )),
                    // Any other unit: an all-null array with one slot per input.
                    _ => {
                        let len = $iterator.count();
                        new_null_array($data_type, len)
                    }
                })
            },
            DataType::Binary => Ok(Arc::new(BinaryArray::from_iter(
                [<$stat_type_prefix ByteArrayStatsIterator>]::new($iterator)
            ))),
            DataType::LargeBinary => Ok(Arc::new(LargeBinaryArray::from_iter(
                [<$stat_type_prefix ByteArrayStatsIterator>]::new($iterator)
            ))),
            // String types: missing statistics and bytes that are not valid
            // UTF-8 both become null.
            DataType::Utf8 => {
                let iterator = [<$stat_type_prefix ByteArrayStatsIterator>]::new($iterator);
                let mut builder = StringBuilder::new();
                for x in iterator {
                    let Some(x) = x else {
                        builder.append_null(); continue;
                    };

                    let Ok(x) = std::str::from_utf8(x) else {
                        builder.append_null();
                        continue;
                    };

                    builder.append_value(x);
                }
                Ok(Arc::new(builder.finish()))
            },
            DataType::LargeUtf8 => {
                let iterator = [<$stat_type_prefix ByteArrayStatsIterator>]::new($iterator);
                let mut builder = LargeStringBuilder::new();
                for x in iterator {
                    let Some(x) = x else {
                        builder.append_null(); continue;
                    };

                    let Ok(x) = std::str::from_utf8(x) else {
                        builder.append_null();
                        continue;
                    };

                    builder.append_value(x);
                }
                Ok(Arc::new(builder.finish()))
            },
            DataType::FixedSizeBinary(size) => {
                let iterator = [<$stat_type_prefix FixedLenByteArrayStatsIterator>]::new($iterator);
                let mut builder = FixedSizeBinaryBuilder::new(*size);
                for x in iterator {
                    let Some(x) = x else {
                        builder.append_null(); continue;
                    };

                    // A value whose length differs from the declared width is
                    // recorded as null instead of being appended.
                    if x.len().try_into() != Ok(*size){
                        builder.append_null();
                        continue;
                    }

                    builder.append_value(x).expect("ensure to append successfully here, because size have been checked before");
                }
                Ok(Arc::new(builder.finish()))
            },
            // Decimal types: values come from the decimal iterators, then the
            // requested precision and scale are applied (this step is fallible).
            DataType::Decimal32(precision, scale) => {
                let arr = Decimal32Array::from_iter(
                    [<$stat_type_prefix Decimal32StatsIterator>]::new($iterator)
                ).with_precision_and_scale(*precision, *scale)?;
                Ok(Arc::new(arr))
            },
            DataType::Decimal64(precision, scale) => {
                let arr = Decimal64Array::from_iter(
                    [<$stat_type_prefix Decimal64StatsIterator>]::new($iterator)
                ).with_precision_and_scale(*precision, *scale)?;
                Ok(Arc::new(arr))
            },
            DataType::Decimal128(precision, scale) => {
                let arr = Decimal128Array::from_iter(
                    [<$stat_type_prefix Decimal128StatsIterator>]::new($iterator)
                ).with_precision_and_scale(*precision, *scale)?;
                Ok(Arc::new(arr))
            },
            DataType::Decimal256(precision, scale) => {
                let arr = Decimal256Array::from_iter(
                    [<$stat_type_prefix Decimal256StatsIterator>]::new($iterator)
                ).with_precision_and_scale(*precision, *scale)?;
                Ok(Arc::new(arr))
            },
            // Dictionary: delegate to the `<prefix>_statistics` function
            // (defined outside this macro) using the dictionary's value type.
            DataType::Dictionary(_, value_type) => {
                [<$stat_type_prefix:lower _ statistics>](value_type, $iterator, $physical_type)
            },
            DataType::Utf8View => {
                let iterator = [<$stat_type_prefix ByteArrayStatsIterator>]::new($iterator);
                let mut builder = StringViewBuilder::new();
                for x in iterator {
                    let Some(x) = x else {
                        builder.append_null(); continue;
                    };

                    let Ok(x) = std::str::from_utf8(x) else {
                        builder.append_null();
                        continue;
                    };

                    builder.append_value(x);
                }
                Ok(Arc::new(builder.finish()))
            },
            DataType::BinaryView => {
                let iterator = [<$stat_type_prefix ByteArrayStatsIterator>]::new($iterator);
                let mut builder = BinaryViewBuilder::new();
                for x in iterator {
                    let Some(x) = x else {
                        builder.append_null(); continue;
                    };

                    builder.append_value(x);
                }
                Ok(Arc::new(builder.finish()))
            }

            // Remaining types produce an all-null array with one slot per
            // input. `Date64` lands here only when neither physical-type
            // guard above matched.
            DataType::Map(_,_) |
            DataType::Duration(_) |
            DataType::Interval(_) |
            DataType::Date64 | DataType::Null |
            DataType::List(_) |
            DataType::ListView(_) |
            DataType::FixedSizeList(_, _) |
            DataType::LargeList(_) |
            DataType::LargeListView(_) |
            DataType::Struct(_) |
            DataType::Union(_, _) |
            DataType::RunEndEncoded(_, _) => {
                let len = $iterator.count();
                Ok(new_null_array($data_type, len))
            }
        }}}
}
602
603macro_rules! get_data_page_statistics {
604 ($stat_type_prefix: ident, $data_type: ident, $iterator: ident, $physical_type: ident) => {
605 {
606 let chunks: Vec<(usize, &ColumnIndexMetaData)> = $iterator.collect();
607 let capacity: usize = chunks.iter().map(|c| c.0).sum();
608 paste! {
609 match $data_type {
610 DataType::Boolean => {
611 let mut b = BooleanBuilder::with_capacity(capacity);
612 for (len, index) in chunks {
613 match index {
614 ColumnIndexMetaData::BOOLEAN(index) => {
615 for val in index.[<$stat_type_prefix:lower _values_iter>]() {
616 b.append_option(val.copied());
617 }
618 }
619 _ => b.append_nulls(len),
620 }
621 }
622 Ok(Arc::new(b.finish()))
623 },
624 DataType::UInt8 => {
625 let mut b = UInt8Builder::with_capacity(capacity);
626 for (len, index) in chunks {
627 match index {
628 ColumnIndexMetaData::INT32(index) => {
629 b.extend_from_iter_option(
630 index.[<$stat_type_prefix:lower _values_iter>]()
631 .map(|val| val.and_then(|&x| u8::try_from(x).ok())),
632 );
633 }
634 _ => b.append_nulls(len),
635 }
636 }
637 Ok(Arc::new(b.finish()))
638 },
639 DataType::UInt16 => {
640 let mut b = UInt16Builder::with_capacity(capacity);
641 for (len, index) in chunks {
642 match index {
643 ColumnIndexMetaData::INT32(index) => {
644 b.extend_from_iter_option(
645 index.[<$stat_type_prefix:lower _values_iter>]()
646 .map(|val| val.and_then(|&x| u16::try_from(x).ok())),
647 );
648 }
649 _ => b.append_nulls(len),
650 }
651 }
652 Ok(Arc::new(b.finish()))
653 },
654 DataType::UInt32 => {
655 let mut b = UInt32Builder::with_capacity(capacity);
656 for (len, index) in chunks {
657 match index {
658 ColumnIndexMetaData::INT32(index) => {
659 b.extend_from_iter_option(
660 index.[<$stat_type_prefix:lower _values_iter>]()
661 .map(|val| val.map(|&x| x as u32)),
662 );
663 }
664 _ => b.append_nulls(len),
665 }
666 }
667 Ok(Arc::new(b.finish()))
668 },
669 DataType::UInt64 => {
670 let mut b = UInt64Builder::with_capacity(capacity);
671 for (len, index) in chunks {
672 match index {
673 ColumnIndexMetaData::INT64(index) => {
674 b.extend_from_iter_option(
675 index.[<$stat_type_prefix:lower _values_iter>]()
676 .map(|val| val.map(|&x| x as u64)),
677 );
678 }
679 _ => b.append_nulls(len),
680 }
681 }
682 Ok(Arc::new(b.finish()))
683 },
684 DataType::Int8 => {
685 let mut b = Int8Builder::with_capacity(capacity);
686 for (len, index) in chunks {
687 match index {
688 ColumnIndexMetaData::INT32(index) => {
689 b.extend_from_iter_option(
690 index.[<$stat_type_prefix:lower _values_iter>]()
691 .map(|val| val.and_then(|&x| i8::try_from(x).ok())),
692 );
693 }
694 _ => b.append_nulls(len),
695 }
696 }
697 Ok(Arc::new(b.finish()))
698 },
699 DataType::Int16 => {
700 let mut b = Int16Builder::with_capacity(capacity);
701 for (len, index) in chunks {
702 match index {
703 ColumnIndexMetaData::INT32(index) => {
704 b.extend_from_iter_option(
705 index.[<$stat_type_prefix:lower _values_iter>]()
706 .map(|val| val.and_then(|&x| i16::try_from(x).ok())),
707 );
708 }
709 _ => b.append_nulls(len),
710 }
711 }
712 Ok(Arc::new(b.finish()))
713 },
714 DataType::Int32 => {
715 let mut b = Int32Builder::with_capacity(capacity);
716 for (len, index) in chunks {
717 match index {
718 ColumnIndexMetaData::INT32(index) => {
719 b.extend_from_iter_option(
720 index.[<$stat_type_prefix:lower _values_iter>]()
721 .map(|val| val.copied()),
722 );
723 }
724 _ => b.append_nulls(len),
725 }
726 }
727 Ok(Arc::new(b.finish()))
728 },
729 DataType::Int64 => {
730 let mut b = Int64Builder::with_capacity(capacity);
731 for (len, index) in chunks {
732 match index {
733 ColumnIndexMetaData::INT64(index) => {
734 b.extend_from_iter_option(
735 index.[<$stat_type_prefix:lower _values_iter>]()
736 .map(|val| val.copied()),
737 );
738 }
739 _ => b.append_nulls(len),
740 }
741 }
742 Ok(Arc::new(b.finish()))
743 },
744 DataType::Float16 => {
745 let mut b = Float16Builder::with_capacity(capacity);
746 for (len, index) in chunks {
747 match index {
748 ColumnIndexMetaData::FIXED_LEN_BYTE_ARRAY(index) => {
749 b.extend_from_iter_option(
750 index.[<$stat_type_prefix:lower _values_iter>]()
751 .map(|val| val.and_then(|x| from_bytes_to_f16(x))),
752 );
753 }
754 _ => b.append_nulls(len),
755 }
756 }
757 Ok(Arc::new(b.finish()))
758 },
759 DataType::Float32 => {
760 let mut b = Float32Builder::with_capacity(capacity);
761 for (len, index) in chunks {
762 match index {
763 ColumnIndexMetaData::FLOAT(index) => {
764 b.extend_from_iter_option(
765 index.[<$stat_type_prefix:lower _values_iter>]()
766 .map(|val| val.copied()),
767 );
768 }
769 _ => b.append_nulls(len),
770 }
771 }
772 Ok(Arc::new(b.finish()))
773 },
774 DataType::Float64 => {
775 let mut b = Float64Builder::with_capacity(capacity);
776 for (len, index) in chunks {
777 match index {
778 ColumnIndexMetaData::DOUBLE(index) => {
779 b.extend_from_iter_option(
780 index.[<$stat_type_prefix:lower _values_iter>]()
781 .map(|val| val.copied()),
782 );
783 }
784 _ => b.append_nulls(len),
785 }
786 }
787 Ok(Arc::new(b.finish()))
788 },
789 DataType::Binary => {
790 let mut b = BinaryBuilder::with_capacity(capacity, capacity * 10);
791 for (len, index) in chunks {
792 match index {
793 ColumnIndexMetaData::BYTE_ARRAY(index) => {
794 for val in index.[<$stat_type_prefix:lower _values_iter>]() {
795 b.append_option(val.map(|x| x.as_ref()));
796 }
797 }
798 _ => b.append_nulls(len),
799 }
800 }
801 Ok(Arc::new(b.finish()))
802 },
803 DataType::LargeBinary => {
804 let mut b = LargeBinaryBuilder::with_capacity(capacity, capacity * 10);
805 for (len, index) in chunks {
806 match index {
807 ColumnIndexMetaData::BYTE_ARRAY(index) => {
808 for val in index.[<$stat_type_prefix:lower _values_iter>]() {
809 b.append_option(val.map(|x| x.as_ref()));
810 }
811 }
812 _ => b.append_nulls(len),
813 }
814 }
815 Ok(Arc::new(b.finish()))
816 },
817 DataType::Utf8 => {
818 let mut b = StringBuilder::with_capacity(capacity, capacity * 10);
819 for (len, index) in chunks {
820 match index {
821 ColumnIndexMetaData::BYTE_ARRAY(index) => {
822 for val in index.[<$stat_type_prefix:lower _values_iter>]() {
823 match val {
824 Some(x) => match std::str::from_utf8(x.as_ref()) {
825 Ok(s) => b.append_value(s),
826 _ => b.append_null(),
827 }
828 None => b.append_null(),
829 }
830 }
831 }
832 _ => b.append_nulls(len),
833 }
834 }
835 Ok(Arc::new(b.finish()))
836 },
837 DataType::LargeUtf8 => {
838 let mut b = LargeStringBuilder::with_capacity(capacity, capacity * 10);
839 for (len, index) in chunks {
840 match index {
841 ColumnIndexMetaData::BYTE_ARRAY(index) => {
842 for val in index.[<$stat_type_prefix:lower _values_iter>]() {
843 match val {
844 Some(x) => match std::str::from_utf8(x.as_ref()) {
845 Ok(s) => b.append_value(s),
846 _ => b.append_null(),
847 }
848 None => b.append_null(),
849 }
850 }
851 }
852 _ => b.append_nulls(len),
853 }
854 }
855 Ok(Arc::new(b.finish()))
856 },
857 DataType::Dictionary(_, value_type) => {
858 [<$stat_type_prefix:lower _ page_statistics>](value_type, chunks.into_iter(), $physical_type)
859 },
860 DataType::Timestamp(unit, timezone) => {
861 match unit {
862 TimeUnit::Second => {
863 let mut b = TimestampSecondBuilder::with_capacity(capacity);
864 for (len, index) in chunks {
865 match index {
866 ColumnIndexMetaData::INT64(index) => {
867 b.extend_from_iter_option(
868 index.[<$stat_type_prefix:lower _values_iter>]()
869 .map(|val| val.copied()),
870 );
871 }
872 _ => b.append_nulls(len),
873 }
874 }
875 Ok(Arc::new(b.finish().with_timezone_opt(timezone.clone())))
876 }
877 TimeUnit::Millisecond => {
878 let mut b = TimestampMillisecondBuilder::with_capacity(capacity);
879 for (len, index) in chunks {
880 match index {
881 ColumnIndexMetaData::INT64(index) => {
882 b.extend_from_iter_option(
883 index.[<$stat_type_prefix:lower _values_iter>]()
884 .map(|val| val.copied()),
885 );
886 }
887 _ => b.append_nulls(len),
888 }
889 }
890 Ok(Arc::new(b.finish().with_timezone_opt(timezone.clone())))
891 }
892 TimeUnit::Microsecond => {
893 let mut b = TimestampMicrosecondBuilder::with_capacity(capacity);
894 for (len, index) in chunks {
895 match index {
896 ColumnIndexMetaData::INT64(index) => {
897 b.extend_from_iter_option(
898 index.[<$stat_type_prefix:lower _values_iter>]()
899 .map(|val| val.copied()),
900 );
901 }
902 _ => b.append_nulls(len),
903 }
904 }
905 Ok(Arc::new(b.finish().with_timezone_opt(timezone.clone())))
906 }
907 TimeUnit::Nanosecond => {
908 let mut b = TimestampNanosecondBuilder::with_capacity(capacity);
909 for (len, index) in chunks {
910 match index {
911 ColumnIndexMetaData::INT64(index) => {
912 b.extend_from_iter_option(
913 index.[<$stat_type_prefix:lower _values_iter>]()
914 .map(|val| val.copied()),
915 );
916 }
917 _ => b.append_nulls(len),
918 }
919 }
920 Ok(Arc::new(b.finish().with_timezone_opt(timezone.clone())))
921 }
922 }
923 },
924 DataType::Date32 => {
925 let mut b = Date32Builder::with_capacity(capacity);
926 for (len, index) in chunks {
927 match index {
928 ColumnIndexMetaData::INT32(index) => {
929 b.extend_from_iter_option(
930 index.[<$stat_type_prefix:lower _values_iter>]()
931 .map(|val| val.copied()),
932 );
933 }
934 _ => b.append_nulls(len),
935 }
936 }
937 Ok(Arc::new(b.finish()))
938 },
939 DataType::Date64 if $physical_type == Some(PhysicalType::INT32)=> {
940 let mut b = Date64Builder::with_capacity(capacity);
941 for (len, index) in chunks {
942 match index {
943 ColumnIndexMetaData::INT32(index) => {
944 b.extend_from_iter_option(
945 index.[<$stat_type_prefix:lower _values_iter>]()
946 .map(|val| val.map(|&x| (x as i64) * 24 * 60 * 60 * 1000)),
947 );
948 }
949 _ => b.append_nulls(len),
950 }
951 }
952 Ok(Arc::new(b.finish()))
953 },
954 DataType::Date64 if $physical_type == Some(PhysicalType::INT64) => {
955 let mut b = Date64Builder::with_capacity(capacity);
956 for (len, index) in chunks {
957 match index {
958 ColumnIndexMetaData::INT64(index) => {
959 b.extend_from_iter_option(
960 index.[<$stat_type_prefix:lower _values_iter>]()
961 .map(|val| val.copied()),
962 );
963 }
964 _ => b.append_nulls(len),
965 }
966 }
967 Ok(Arc::new(b.finish()))
968 },
969 DataType::Decimal32(precision, scale) => {
970 let mut b = Decimal32Builder::with_capacity(capacity);
971 for (len, index) in chunks {
972 match index {
973 ColumnIndexMetaData::INT32(index) => {
974 b.extend_from_iter_option(
975 index.[<$stat_type_prefix:lower _values_iter>]()
976 .map(|val| val.copied()),
977 );
978 }
979 ColumnIndexMetaData::INT64(index) => {
980 b.extend_from_iter_option(
981 index.[<$stat_type_prefix:lower _values_iter>]()
982 .map(|val| val.and_then(|&x| i32::try_from(x).ok())),
983 );
984 }
985 ColumnIndexMetaData::BYTE_ARRAY(index) => {
986 b.extend_from_iter_option(
987 index.[<$stat_type_prefix:lower _values_iter>]()
988 .map(|val| val.map(|x| from_bytes_to_i32(x.as_ref()))),
989 );
990 }
991 ColumnIndexMetaData::FIXED_LEN_BYTE_ARRAY(index) => {
992 b.extend_from_iter_option(
993 index.[<$stat_type_prefix:lower _values_iter>]()
994 .map(|val| val.map(|x| from_bytes_to_i32(x.as_ref()))),
995 );
996 }
997 _ => b.append_nulls(len),
998 }
999 }
1000 Ok(Arc::new(b.with_precision_and_scale(*precision, *scale)?.finish()))
1001 },
1002 DataType::Decimal64(precision, scale) => {
1003 let mut b = Decimal64Builder::with_capacity(capacity);
1004 for (len, index) in chunks {
1005 match index {
1006 ColumnIndexMetaData::INT32(index) => {
1007 b.extend_from_iter_option(
1008 index.[<$stat_type_prefix:lower _values_iter>]()
1009 .map(|val| val.map(|x| *x as i64)),
1010 );
1011 }
1012 ColumnIndexMetaData::INT64(index) => {
1013 b.extend_from_iter_option(
1014 index.[<$stat_type_prefix:lower _values_iter>]()
1015 .map(|val| val.copied()),
1016 );
1017 }
1018 ColumnIndexMetaData::BYTE_ARRAY(index) => {
1019 b.extend_from_iter_option(
1020 index.[<$stat_type_prefix:lower _values_iter>]()
1021 .map(|val| val.map(|x| from_bytes_to_i64(x.as_ref()))),
1022 );
1023 }
1024 ColumnIndexMetaData::FIXED_LEN_BYTE_ARRAY(index) => {
1025 b.extend_from_iter_option(
1026 index.[<$stat_type_prefix:lower _values_iter>]()
1027 .map(|val| val.map(|x| from_bytes_to_i64(x.as_ref()))),
1028 );
1029 }
1030 _ => b.append_nulls(len),
1031 }
1032 }
1033 Ok(Arc::new(b.with_precision_and_scale(*precision, *scale)?.finish()))
1034 },
1035 DataType::Decimal128(precision, scale) => {
1036 let mut b = Decimal128Array::builder(capacity);
1037 for (len, index) in chunks {
1038 match index {
1039 ColumnIndexMetaData::INT32(index) => {
1040 b.extend_from_iter_option(
1041 index.[<$stat_type_prefix:lower _values_iter>]()
1042 .map(|val| val.map(|x| *x as i128)),
1043 );
1044 }
1045 ColumnIndexMetaData::INT64(index) => {
1046 b.extend_from_iter_option(
1047 index.[<$stat_type_prefix:lower _values_iter>]()
1048 .map(|val| val.map(|x| *x as i128)),
1049 );
1050 }
1051 ColumnIndexMetaData::BYTE_ARRAY(index) => {
1052 b.extend_from_iter_option(
1053 index.[<$stat_type_prefix:lower _values_iter>]()
1054 .map(|val| val.map(|x| from_bytes_to_i128(x.as_ref()))),
1055 );
1056 }
1057 ColumnIndexMetaData::FIXED_LEN_BYTE_ARRAY(index) => {
1058 b.extend_from_iter_option(
1059 index.[<$stat_type_prefix:lower _values_iter>]()
1060 .map(|val| val.map(|x| from_bytes_to_i128(x.as_ref()))),
1061 );
1062 }
1063 _ => b.append_nulls(len),
1064 }
1065 }
1066 Ok(Arc::new(b.with_precision_and_scale(*precision, *scale)?.finish()))
1067 },
1068 DataType::Decimal256(precision, scale) => {
1069 let mut b = Decimal256Array::builder(capacity);
1070 for (len, index) in chunks {
1071 match index {
1072 ColumnIndexMetaData::INT32(index) => {
1073 b.extend_from_iter_option(
1074 index.[<$stat_type_prefix:lower _values_iter>]()
1075 .map(|val| val.map(|x| i256::from_i128(*x as i128))),
1076 );
1077 }
1078 ColumnIndexMetaData::INT64(index) => {
1079 b.extend_from_iter_option(
1080 index.[<$stat_type_prefix:lower _values_iter>]()
1081 .map(|val| val.map(|x| i256::from_i128(*x as i128))),
1082 );
1083 }
1084 ColumnIndexMetaData::BYTE_ARRAY(index) => {
1085 b.extend_from_iter_option(
1086 index.[<$stat_type_prefix:lower _values_iter>]()
1087 .map(|val| val.map(|x| from_bytes_to_i256(x.as_ref()))),
1088 );
1089 }
1090 ColumnIndexMetaData::FIXED_LEN_BYTE_ARRAY(index) => {
1091 b.extend_from_iter_option(
1092 index.[<$stat_type_prefix:lower _values_iter>]()
1093 .map(|val| val.map(|x| from_bytes_to_i256(x.as_ref()))),
1094 );
1095 }
1096 _ => b.append_nulls(len),
1097 }
1098 }
1099 Ok(Arc::new(b.with_precision_and_scale(*precision, *scale)?.finish()))
1100 },
1101 DataType::Time32(unit) => {
1102 match unit {
1103 TimeUnit::Second => {
1104 let mut b = Time32SecondBuilder::with_capacity(capacity);
1105 for (len, index) in chunks {
1106 match index {
1107 ColumnIndexMetaData::INT32(index) => {
1108 b.extend_from_iter_option(
1109 index.[<$stat_type_prefix:lower _values_iter>]()
1110 .map(|val| val.copied()),
1111 );
1112 }
1113 _ => b.append_nulls(len),
1114 }
1115 }
1116 Ok(Arc::new(b.finish()))
1117 }
1118 TimeUnit::Millisecond => {
1119 let mut b = Time32MillisecondBuilder::with_capacity(capacity);
1120 for (len, index) in chunks {
1121 match index {
1122 ColumnIndexMetaData::INT32(index) => {
1123 b.extend_from_iter_option(
1124 index.[<$stat_type_prefix:lower _values_iter>]()
1125 .map(|val| val.copied()),
1126 );
1127 }
1128 _ => b.append_nulls(len),
1129 }
1130 }
1131 Ok(Arc::new(b.finish()))
1132 }
1133 _ => {
1134 Ok(new_null_array($data_type, capacity))
1135 }
1136 }
1137 }
1138 DataType::Time64(unit) => {
1139 match unit {
1140 TimeUnit::Microsecond => {
1141 let mut b = Time64MicrosecondBuilder::with_capacity(capacity);
1142 for (len, index) in chunks {
1143 match index {
1144 ColumnIndexMetaData::INT64(index) => {
1145 b.extend_from_iter_option(
1146 index.[<$stat_type_prefix:lower _values_iter>]()
1147 .map(|val| val.copied()),
1148 );
1149 }
1150 _ => b.append_nulls(len),
1151 }
1152 }
1153 Ok(Arc::new(b.finish()))
1154 }
1155 TimeUnit::Nanosecond => {
1156 let mut b = Time64NanosecondBuilder::with_capacity(capacity);
1157 for (len, index) in chunks {
1158 match index {
1159 ColumnIndexMetaData::INT64(index) => {
1160 b.extend_from_iter_option(
1161 index.[<$stat_type_prefix:lower _values_iter>]()
1162 .map(|val| val.copied()),
1163 );
1164 }
1165 _ => b.append_nulls(len),
1166 }
1167 }
1168 Ok(Arc::new(b.finish()))
1169 }
1170 _ => {
1171 Ok(new_null_array($data_type, capacity))
1172 }
1173 }
1174 },
1175 DataType::FixedSizeBinary(size) => {
1176 let mut b = FixedSizeBinaryBuilder::with_capacity(capacity, *size);
1177 for (len, index) in chunks {
1178 match index {
1179 ColumnIndexMetaData::FIXED_LEN_BYTE_ARRAY(index) => {
1180 for val in index.[<$stat_type_prefix:lower _values_iter>]() {
1181 match val {
1182 Some(v) => {
1183 if v.len() == *size as usize {
1184 let _ = b.append_value(v.as_ref())?;
1185 } else {
1186 b.append_null();
1187 }
1188 }
1189 None => b.append_null(),
1190 }
1191 }
1192 }
1193 _ => b.append_nulls(len),
1194 }
1195 }
1196 Ok(Arc::new(b.finish()))
1197 },
1198 DataType::Utf8View => {
1199 let mut b = StringViewBuilder::with_capacity(capacity);
1200 for (len, index) in chunks {
1201 match index {
1202 ColumnIndexMetaData::BYTE_ARRAY(index) => {
1203 for val in index.[<$stat_type_prefix:lower _values_iter>]() {
1204 match val {
1205 Some(x) => match std::str::from_utf8(x.as_ref()) {
1206 Ok(s) => b.append_value(s),
1207 _ => b.append_null(),
1208 }
1209 None => b.append_null(),
1210 }
1211 }
1212 }
1213 _ => {
1214 for _ in 0..len { b.append_null(); }
1215 }
1216 }
1217 }
1218 Ok(Arc::new(b.finish()))
1219 },
1220 DataType::BinaryView => {
1221 let mut b = BinaryViewBuilder::with_capacity(capacity);
1222 for (len, index) in chunks {
1223 match index {
1224 ColumnIndexMetaData::BYTE_ARRAY(index) => {
1225 for val in index.[<$stat_type_prefix:lower _values_iter>]() {
1226 match val {
1227 Some(v) => b.append_value(v.as_ref()),
1228 None => b.append_null(),
1229 }
1230 }
1231 }
1232 _ => {
1233 for _ in 0..len { b.append_null(); }
1234 }
1235 }
1236 }
1237 Ok(Arc::new(b.finish()))
1238 },
1239 DataType::Date64 | DataType::Null |
1241 DataType::Duration(_) |
1242 DataType::Interval(_) |
1243 DataType::List(_) |
1244 DataType::ListView(_) |
1245 DataType::FixedSizeList(_, _) |
1246 DataType::LargeList(_) |
1247 DataType::LargeListView(_) |
1248 DataType::Struct(_) |
1249 DataType::Union(_, _) |
1250 DataType::Map(_, _) |
1251 DataType::RunEndEncoded(_, _) => {
1252 Ok(new_null_array($data_type, capacity))
1254 },
1255 }
1256 }
1257 }
1258 }
1259}
1260fn min_statistics<'a, I: Iterator<Item = Option<&'a ParquetStatistics>>>(
1265 data_type: &DataType,
1266 iterator: I,
1267 physical_type: Option<PhysicalType>,
1268) -> Result<ArrayRef> {
1269 get_statistics!(Min, data_type, iterator, physical_type)
1270}
1271
1272fn max_statistics<'a, I: Iterator<Item = Option<&'a ParquetStatistics>>>(
1276 data_type: &DataType,
1277 iterator: I,
1278 physical_type: Option<PhysicalType>,
1279) -> Result<ArrayRef> {
1280 get_statistics!(Max, data_type, iterator, physical_type)
1281}
1282
1283pub(crate) fn min_page_statistics<'a, I>(
1286 data_type: &DataType,
1287 iterator: I,
1288 physical_type: Option<PhysicalType>,
1289) -> Result<ArrayRef>
1290where
1291 I: Iterator<Item = (usize, &'a ColumnIndexMetaData)>,
1292{
1293 get_data_page_statistics!(Min, data_type, iterator, physical_type)
1294}
1295
1296pub(crate) fn max_page_statistics<'a, I>(
1299 data_type: &DataType,
1300 iterator: I,
1301 physical_type: Option<PhysicalType>,
1302) -> Result<ArrayRef>
1303where
1304 I: Iterator<Item = (usize, &'a ColumnIndexMetaData)>,
1305{
1306 get_data_page_statistics!(Max, data_type, iterator, physical_type)
1307}
1308
1309pub(crate) fn null_counts_page_statistics<'a, I>(iterator: I) -> Result<UInt64Array>
1314where
1315 I: Iterator<Item = (usize, &'a ColumnIndexMetaData)>,
1316{
1317 let chunks: Vec<_> = iterator.collect();
1318 let total_capacity: usize = chunks.iter().map(|(len, _)| *len).sum();
1319 let mut values = Vec::with_capacity(total_capacity);
1320 let mut nulls = NullBufferBuilder::new(total_capacity);
1321 for (len, index) in chunks {
1322 match index.null_counts() {
1323 Some(counts) => {
1324 values.extend(counts.iter().map(|&x| x as u64));
1325 nulls.append_n_non_nulls(len);
1326 }
1327 None => {
1328 values.resize(values.len() + len, 0);
1329 nulls.append_n_nulls(len);
1330 }
1331 }
1332 }
1333 let null_buffer = nulls.build();
1334 let array = UInt64Array::new(values.into(), null_buffer);
1335 Ok(array)
1336}
1337
/// Converts parquet statistics (row group level and data page level) for a
/// single column into arrow arrays.
///
/// Created with [`StatisticsConverter::try_new`].
#[derive(Debug)]
pub struct StatisticsConverter<'a> {
    /// Index of the matched column in the parquet schema, or `None` when
    /// `try_new` found no corresponding parquet column.
    parquet_column_index: Option<usize>,
    /// The arrow field (from the arrow schema) whose statistics are converted.
    arrow_field: &'a Field,
    /// When `true`, `row_group_null_counts` reports a missing null count in
    /// otherwise present statistics as `0` instead of null. Set to `true` by
    /// `try_new`.
    missing_null_counts_as_zero: bool,
    /// Physical type of the matched parquet column, if one was matched.
    physical_type: Option<PhysicalType>,
}
1368
1369impl<'a> StatisticsConverter<'a> {
    /// Returns the index of the matched column in the parquet schema, or
    /// `None` if no parquet column was matched when the converter was built.
    pub fn parquet_column_index(&self) -> Option<usize> {
        self.parquet_column_index
    }
1377
    /// Returns the arrow [`Field`] this converter produces statistics for.
    pub fn arrow_field(&self) -> &'a Field {
        self.arrow_field
    }
1382
1383 pub fn with_missing_null_counts_as_zero(mut self, missing_null_counts_as_zero: bool) -> Self {
1396 self.missing_null_counts_as_zero = missing_null_counts_as_zero;
1397 self
1398 }
1399
1400 pub fn row_group_row_counts<I>(&self, metadatas: I) -> Result<Option<UInt64Array>>
1431 where
1432 I: IntoIterator<Item = &'a RowGroupMetaData>,
1433 {
1434 let Some(_) = self.parquet_column_index else {
1435 return Ok(None);
1436 };
1437
1438 let mut builder = UInt64Array::builder(10);
1439 for metadata in metadatas.into_iter() {
1440 let row_count = metadata.num_rows();
1441 let row_count: u64 = row_count.try_into().map_err(|e| {
1442 arrow_err!(format!(
1443 "Parquet row count {row_count} too large to convert to u64: {e}"
1444 ))
1445 })?;
1446 builder.append_value(row_count);
1447 }
1448 Ok(Some(builder.finish()))
1449 }
1450
1451 pub fn try_new<'b>(
1463 column_name: &'b str,
1464 arrow_schema: &'a Schema,
1465 parquet_schema: &'a SchemaDescriptor,
1466 ) -> Result<Self> {
1467 let Some((_idx, arrow_field)) = arrow_schema.column_with_name(column_name) else {
1469 return Err(arrow_err!(format!(
1470 "Column '{}' not found in schema for statistics conversion",
1471 column_name
1472 )));
1473 };
1474
1475 let parquet_index = match parquet_column(parquet_schema, arrow_schema, column_name) {
1477 Some((parquet_idx, matched_field)) => {
1478 if matched_field.as_ref() != arrow_field {
1480 return Err(arrow_err!(format!(
1481 "Matched column '{:?}' does not match original matched column '{:?}'",
1482 matched_field, arrow_field
1483 )));
1484 }
1485 Some(parquet_idx)
1486 }
1487 None => None,
1488 };
1489
1490 Ok(Self {
1491 parquet_column_index: parquet_index,
1492 arrow_field,
1493 missing_null_counts_as_zero: true,
1494 physical_type: parquet_index.map(|idx| parquet_schema.column(idx).physical_type()),
1495 })
1496 }
1497
1498 pub fn row_group_mins<I>(&self, metadatas: I) -> Result<ArrayRef>
1542 where
1543 I: IntoIterator<Item = &'a RowGroupMetaData>,
1544 {
1545 let data_type = self.arrow_field.data_type();
1546
1547 let Some(parquet_index) = self.parquet_column_index else {
1548 return Ok(self.make_null_array(data_type, metadatas));
1549 };
1550
1551 let iter = metadatas
1552 .into_iter()
1553 .map(|x| x.column(parquet_index).statistics());
1554 min_statistics(data_type, iter, self.physical_type)
1555 }
1556
1557 pub fn row_group_maxes<I>(&self, metadatas: I) -> Result<ArrayRef>
1561 where
1562 I: IntoIterator<Item = &'a RowGroupMetaData>,
1563 {
1564 let data_type = self.arrow_field.data_type();
1565
1566 let Some(parquet_index) = self.parquet_column_index else {
1567 return Ok(self.make_null_array(data_type, metadatas));
1568 };
1569
1570 let iter = metadatas
1571 .into_iter()
1572 .map(|x| x.column(parquet_index).statistics());
1573 max_statistics(data_type, iter, self.physical_type)
1574 }
1575
1576 pub fn row_group_is_max_value_exact<I>(&self, metadatas: I) -> Result<BooleanArray>
1580 where
1581 I: IntoIterator<Item = &'a RowGroupMetaData>,
1582 {
1583 let Some(parquet_index) = self.parquet_column_index else {
1584 let num_row_groups = metadatas.into_iter().count();
1585 return Ok(BooleanArray::from_iter(std::iter::repeat_n(
1586 None,
1587 num_row_groups,
1588 )));
1589 };
1590
1591 let is_max_value_exact = metadatas
1592 .into_iter()
1593 .map(|x| x.column(parquet_index).statistics())
1594 .map(|s| s.map(|s| s.max_is_exact()));
1595 Ok(BooleanArray::from_iter(is_max_value_exact))
1596 }
1597
1598 pub fn row_group_is_min_value_exact<I>(&self, metadatas: I) -> Result<BooleanArray>
1602 where
1603 I: IntoIterator<Item = &'a RowGroupMetaData>,
1604 {
1605 let Some(parquet_index) = self.parquet_column_index else {
1606 let num_row_groups = metadatas.into_iter().count();
1607 return Ok(BooleanArray::from_iter(std::iter::repeat_n(
1608 None,
1609 num_row_groups,
1610 )));
1611 };
1612
1613 let is_min_value_exact = metadatas
1614 .into_iter()
1615 .map(|x| x.column(parquet_index).statistics())
1616 .map(|s| s.map(|s| s.min_is_exact()));
1617 Ok(BooleanArray::from_iter(is_min_value_exact))
1618 }
1619
1620 pub fn row_group_null_counts<I>(&self, metadatas: I) -> Result<UInt64Array>
1624 where
1625 I: IntoIterator<Item = &'a RowGroupMetaData>,
1626 {
1627 let Some(parquet_index) = self.parquet_column_index else {
1628 let num_row_groups = metadatas.into_iter().count();
1629 return Ok(UInt64Array::from_iter(std::iter::repeat_n(
1630 None,
1631 num_row_groups,
1632 )));
1633 };
1634
1635 let null_counts = metadatas
1636 .into_iter()
1637 .map(|x| x.column(parquet_index).statistics())
1638 .map(|s| {
1639 s.and_then(|s| {
1640 if self.missing_null_counts_as_zero {
1641 Some(s.null_count_opt().unwrap_or(0))
1642 } else {
1643 s.null_count_opt()
1644 }
1645 })
1646 });
1647 Ok(UInt64Array::from_iter(null_counts))
1648 }
1649
1650 pub fn data_page_mins<I>(
1702 &self,
1703 column_page_index: &ParquetColumnIndex,
1704 column_offset_index: &ParquetOffsetIndex,
1705 row_group_indices: I,
1706 ) -> Result<ArrayRef>
1707 where
1708 I: IntoIterator<Item = &'a usize>,
1709 {
1710 let data_type = self.arrow_field.data_type();
1711
1712 let Some(parquet_index) = self.parquet_column_index else {
1713 return Ok(self.make_null_array(data_type, row_group_indices));
1714 };
1715
1716 let iter = row_group_indices.into_iter().map(|rg_index| {
1717 let column_page_index_per_row_group_per_column =
1718 &column_page_index[*rg_index][parquet_index];
1719 let num_data_pages = &column_offset_index[*rg_index][parquet_index]
1720 .page_locations()
1721 .len();
1722
1723 (*num_data_pages, column_page_index_per_row_group_per_column)
1724 });
1725
1726 min_page_statistics(data_type, iter, self.physical_type)
1727 }
1728
1729 pub fn data_page_maxes<I>(
1733 &self,
1734 column_page_index: &ParquetColumnIndex,
1735 column_offset_index: &ParquetOffsetIndex,
1736 row_group_indices: I,
1737 ) -> Result<ArrayRef>
1738 where
1739 I: IntoIterator<Item = &'a usize>,
1740 {
1741 let data_type = self.arrow_field.data_type();
1742
1743 let Some(parquet_index) = self.parquet_column_index else {
1744 return Ok(self.make_null_array(data_type, row_group_indices));
1745 };
1746
1747 let iter = row_group_indices.into_iter().map(|rg_index| {
1748 let column_page_index_per_row_group_per_column =
1749 &column_page_index[*rg_index][parquet_index];
1750 let num_data_pages = &column_offset_index[*rg_index][parquet_index]
1751 .page_locations()
1752 .len();
1753
1754 (*num_data_pages, column_page_index_per_row_group_per_column)
1755 });
1756
1757 max_page_statistics(data_type, iter, self.physical_type)
1758 }
1759
1760 pub fn data_page_null_counts<I>(
1764 &self,
1765 column_page_index: &ParquetColumnIndex,
1766 column_offset_index: &ParquetOffsetIndex,
1767 row_group_indices: I,
1768 ) -> Result<UInt64Array>
1769 where
1770 I: IntoIterator<Item = &'a usize>,
1771 {
1772 let Some(parquet_index) = self.parquet_column_index else {
1773 let num_row_groups = row_group_indices.into_iter().count();
1774 return Ok(UInt64Array::new_null(num_row_groups));
1775 };
1776
1777 let iter = row_group_indices.into_iter().map(|rg_index| {
1778 let column_page_index_per_row_group_per_column =
1779 &column_page_index[*rg_index][parquet_index];
1780 let num_data_pages = &column_offset_index[*rg_index][parquet_index]
1781 .page_locations()
1782 .len();
1783
1784 (*num_data_pages, column_page_index_per_row_group_per_column)
1785 });
1786 null_counts_page_statistics(iter)
1787 }
1788
1789 pub fn data_page_row_counts<I>(
1807 &self,
1808 column_offset_index: &ParquetOffsetIndex,
1809 row_group_metadatas: &'a [RowGroupMetaData],
1810 row_group_indices: I,
1811 ) -> Result<Option<UInt64Array>>
1812 where
1813 I: IntoIterator<Item = &'a usize>,
1814 {
1815 let Some(parquet_index) = self.parquet_column_index else {
1816 return Ok(None);
1820 };
1821
1822 let mut row_counts = Vec::new();
1823 let mut nulls = NullBufferBuilder::new(0);
1824 for rg_idx in row_group_indices {
1825 let page_locations = &column_offset_index[*rg_idx][parquet_index].page_locations();
1826
1827 let row_count_per_page = page_locations
1828 .windows(2)
1829 .map(|loc| Some(loc[1].first_row_index as u64 - loc[0].first_row_index as u64));
1830
1831 let num_rows_in_row_group = &row_group_metadatas[*rg_idx].num_rows();
1833 let row_count_per_page = row_count_per_page.chain(std::iter::once(Some(
1834 *num_rows_in_row_group as u64
1835 - page_locations.last().unwrap().first_row_index as u64,
1836 )));
1837
1838 row_counts.extend(row_count_per_page.clone().map(|x| x.unwrap_or(0)));
1839 for val in row_count_per_page {
1840 if val.is_some() {
1841 nulls.append_non_null();
1842 } else {
1843 nulls.append_null();
1844 }
1845 }
1846 }
1847
1848 Ok(Some(UInt64Array::new(row_counts.into(), nulls.build())))
1849 }
1850
1851 fn make_null_array<I, A>(&self, data_type: &DataType, metadatas: I) -> ArrayRef
1853 where
1854 I: IntoIterator<Item = A>,
1855 {
1856 let num_row_groups = metadatas.into_iter().count();
1858 new_null_array(data_type, num_row_groups)
1859 }
1860}
1861
1862