1use crate::{
24 data_type::{ByteArray, FixedLenByteArray},
25 errors::{ParquetError, Result},
26 parquet_thrift::{
27 ElementType, FieldType, ThriftCompactOutputProtocol, WriteThrift, WriteThriftField,
28 },
29};
30use std::ops::Deref;
31
32use crate::{
33 basic::BoundaryOrder,
34 data_type::{private::ParquetValueType, Int96},
35 file::page_index::index_reader::ThriftColumnIndex,
36};
37
38#[derive(Debug, Clone, PartialEq)]
40pub struct ColumnIndex {
41 pub(crate) null_pages: Vec<bool>,
42 pub(crate) boundary_order: BoundaryOrder,
43 pub(crate) null_counts: Option<Vec<i64>>,
44 pub(crate) repetition_level_histograms: Option<Vec<i64>>,
45 pub(crate) definition_level_histograms: Option<Vec<i64>>,
46}
47
48impl ColumnIndex {
49 pub fn num_pages(&self) -> u64 {
51 self.null_pages.len() as u64
52 }
53
54 pub fn null_count(&self, idx: usize) -> Option<i64> {
58 self.null_counts.as_ref().map(|nc| nc[idx])
59 }
60
61 pub fn repetition_level_histogram(&self, idx: usize) -> Option<&[i64]> {
63 if let Some(rep_hists) = self.repetition_level_histograms.as_ref() {
64 let num_lvls = rep_hists.len() / self.num_pages() as usize;
65 let start = num_lvls * idx;
66 Some(&rep_hists[start..start + num_lvls])
67 } else {
68 None
69 }
70 }
71
72 pub fn definition_level_histogram(&self, idx: usize) -> Option<&[i64]> {
74 if let Some(def_hists) = self.definition_level_histograms.as_ref() {
75 let num_lvls = def_hists.len() / self.num_pages() as usize;
76 let start = num_lvls * idx;
77 Some(&def_hists[start..start + num_lvls])
78 } else {
79 None
80 }
81 }
82
83 pub fn is_null_page(&self, idx: usize) -> bool {
85 self.null_pages[idx]
86 }
87}
88
89#[derive(Debug, Clone, PartialEq)]
91pub struct PrimitiveColumnIndex<T> {
92 pub(crate) column_index: ColumnIndex,
93 pub(crate) min_values: Vec<T>,
94 pub(crate) max_values: Vec<T>,
95}
96
97impl<T: ParquetValueType> PrimitiveColumnIndex<T> {
98 pub(crate) fn try_new(
99 null_pages: Vec<bool>,
100 boundary_order: BoundaryOrder,
101 null_counts: Option<Vec<i64>>,
102 repetition_level_histograms: Option<Vec<i64>>,
103 definition_level_histograms: Option<Vec<i64>>,
104 min_bytes: Vec<&[u8]>,
105 max_bytes: Vec<&[u8]>,
106 ) -> Result<Self> {
107 let len = null_pages.len();
108
109 let mut min_values = Vec::with_capacity(len);
110 let mut max_values = Vec::with_capacity(len);
111
112 for (i, is_null) in null_pages.iter().enumerate().take(len) {
113 if !is_null {
114 let min = min_bytes[i];
115 min_values.push(T::try_from_le_slice(min)?);
116
117 let max = max_bytes[i];
118 max_values.push(T::try_from_le_slice(max)?);
119 } else {
120 min_values.push(Default::default());
122 max_values.push(Default::default());
123 }
124 }
125
126 Ok(Self {
127 column_index: ColumnIndex {
128 null_pages,
129 boundary_order,
130 null_counts,
131 repetition_level_histograms,
132 definition_level_histograms,
133 },
134 min_values,
135 max_values,
136 })
137 }
138
139 pub(super) fn try_from_thrift(index: ThriftColumnIndex) -> Result<Self> {
140 Self::try_new(
141 index.null_pages,
142 index.boundary_order,
143 index.null_counts,
144 index.repetition_level_histograms,
145 index.definition_level_histograms,
146 index.min_values,
147 index.max_values,
148 )
149 }
150}
151
152impl<T> PrimitiveColumnIndex<T> {
153 pub fn min_values(&self) -> &[T] {
158 &self.min_values
159 }
160
161 pub fn max_values(&self) -> &[T] {
166 &self.max_values
167 }
168
169 pub fn min_values_iter(&self) -> impl Iterator<Item = Option<&T>> {
173 self.min_values.iter().enumerate().map(|(i, min)| {
174 if self.is_null_page(i) {
175 None
176 } else {
177 Some(min)
178 }
179 })
180 }
181
182 pub fn max_values_iter(&self) -> impl Iterator<Item = Option<&T>> {
186 self.max_values.iter().enumerate().map(|(i, min)| {
187 if self.is_null_page(i) {
188 None
189 } else {
190 Some(min)
191 }
192 })
193 }
194
195 pub fn min_value(&self, idx: usize) -> Option<&T> {
199 if self.null_pages[idx] {
200 None
201 } else {
202 Some(&self.min_values[idx])
203 }
204 }
205
206 pub fn max_value(&self, idx: usize) -> Option<&T> {
210 if self.null_pages[idx] {
211 None
212 } else {
213 Some(&self.max_values[idx])
214 }
215 }
216}
217
218impl<T> Deref for PrimitiveColumnIndex<T> {
219 type Target = ColumnIndex;
220
221 fn deref(&self) -> &Self::Target {
222 &self.column_index
223 }
224}
225
226impl<T: ParquetValueType> WriteThrift for PrimitiveColumnIndex<T> {
227 const ELEMENT_TYPE: ElementType = ElementType::Struct;
228 fn write_thrift<W: std::io::Write>(
229 &self,
230 writer: &mut ThriftCompactOutputProtocol<W>,
231 ) -> Result<()> {
232 self.null_pages.write_thrift_field(writer, 1, 0)?;
233
234 let len = self.null_pages.len();
236 writer.write_field_begin(FieldType::List, 2, 1)?;
237 writer.write_list_begin(ElementType::Binary, len)?;
238 for i in 0..len {
239 let min = self.min_value(i).map(|m| m.as_bytes()).unwrap_or(&[]);
240 min.write_thrift(writer)?;
241 }
242 writer.write_field_begin(FieldType::List, 3, 2)?;
243 writer.write_list_begin(ElementType::Binary, len)?;
244 for i in 0..len {
245 let max = self.max_value(i).map(|m| m.as_bytes()).unwrap_or(&[]);
246 max.write_thrift(writer)?;
247 }
248 let mut last_field_id = self.boundary_order.write_thrift_field(writer, 4, 3)?;
249 if self.null_counts.is_some() {
250 last_field_id =
251 self.null_counts
252 .as_ref()
253 .unwrap()
254 .write_thrift_field(writer, 5, last_field_id)?;
255 }
256 if self.repetition_level_histograms.is_some() {
257 last_field_id = self
258 .repetition_level_histograms
259 .as_ref()
260 .unwrap()
261 .write_thrift_field(writer, 6, last_field_id)?;
262 }
263 if self.definition_level_histograms.is_some() {
264 self.definition_level_histograms
265 .as_ref()
266 .unwrap()
267 .write_thrift_field(writer, 7, last_field_id)?;
268 }
269 writer.write_struct_end()
270 }
271}
272
273#[derive(Debug, Clone, PartialEq)]
275pub struct ByteArrayColumnIndex {
276 pub(crate) column_index: ColumnIndex,
277 pub(crate) min_bytes: Vec<u8>,
279 pub(crate) min_offsets: Vec<usize>,
280 pub(crate) max_bytes: Vec<u8>,
281 pub(crate) max_offsets: Vec<usize>,
282}
283
284impl ByteArrayColumnIndex {
285 pub(crate) fn try_new(
286 null_pages: Vec<bool>,
287 boundary_order: BoundaryOrder,
288 null_counts: Option<Vec<i64>>,
289 repetition_level_histograms: Option<Vec<i64>>,
290 definition_level_histograms: Option<Vec<i64>>,
291 min_values: Vec<&[u8]>,
292 max_values: Vec<&[u8]>,
293 ) -> Result<Self> {
294 let len = null_pages.len();
295
296 let min_len = min_values.iter().map(|&v| v.len()).sum();
297 let max_len = max_values.iter().map(|&v| v.len()).sum();
298 let mut min_bytes = vec![0u8; min_len];
299 let mut max_bytes = vec![0u8; max_len];
300
301 let mut min_offsets = vec![0usize; len + 1];
302 let mut max_offsets = vec![0usize; len + 1];
303
304 let mut min_pos = 0;
305 let mut max_pos = 0;
306
307 for (i, is_null) in null_pages.iter().enumerate().take(len) {
308 if !is_null {
309 let min = min_values[i];
310 let dst = &mut min_bytes[min_pos..min_pos + min.len()];
311 dst.copy_from_slice(min);
312 min_offsets[i] = min_pos;
313 min_pos += min.len();
314
315 let max = max_values[i];
316 let dst = &mut max_bytes[max_pos..max_pos + max.len()];
317 dst.copy_from_slice(max);
318 max_offsets[i] = max_pos;
319 max_pos += max.len();
320 } else {
321 min_offsets[i] = min_pos;
322 max_offsets[i] = max_pos;
323 }
324 }
325
326 min_offsets[len] = min_pos;
327 max_offsets[len] = max_pos;
328
329 Ok(Self {
330 column_index: ColumnIndex {
331 null_pages,
332 boundary_order,
333 null_counts,
334 repetition_level_histograms,
335 definition_level_histograms,
336 },
337 min_bytes,
338 min_offsets,
339 max_bytes,
340 max_offsets,
341 })
342 }
343
344 pub(super) fn try_from_thrift(index: ThriftColumnIndex) -> Result<Self> {
345 Self::try_new(
346 index.null_pages,
347 index.boundary_order,
348 index.null_counts,
349 index.repetition_level_histograms,
350 index.definition_level_histograms,
351 index.min_values,
352 index.max_values,
353 )
354 }
355
356 pub fn min_value(&self, idx: usize) -> Option<&[u8]> {
360 if self.null_pages[idx] {
361 None
362 } else {
363 let start = self.min_offsets[idx];
364 let end = self.min_offsets[idx + 1];
365 Some(&self.min_bytes[start..end])
366 }
367 }
368
369 pub fn max_value(&self, idx: usize) -> Option<&[u8]> {
373 if self.null_pages[idx] {
374 None
375 } else {
376 let start = self.max_offsets[idx];
377 let end = self.max_offsets[idx + 1];
378 Some(&self.max_bytes[start..end])
379 }
380 }
381
382 pub fn min_values_iter(&self) -> impl Iterator<Item = Option<&[u8]>> {
386 (0..self.num_pages() as usize).map(|i| {
387 if self.is_null_page(i) {
388 None
389 } else {
390 self.min_value(i)
391 }
392 })
393 }
394
395 pub fn max_values_iter(&self) -> impl Iterator<Item = Option<&[u8]>> {
399 (0..self.num_pages() as usize).map(|i| {
400 if self.is_null_page(i) {
401 None
402 } else {
403 self.max_value(i)
404 }
405 })
406 }
407}
408
409impl Deref for ByteArrayColumnIndex {
410 type Target = ColumnIndex;
411
412 fn deref(&self) -> &Self::Target {
413 &self.column_index
414 }
415}
416
417impl WriteThrift for ByteArrayColumnIndex {
418 const ELEMENT_TYPE: ElementType = ElementType::Struct;
419 fn write_thrift<W: std::io::Write>(
420 &self,
421 writer: &mut ThriftCompactOutputProtocol<W>,
422 ) -> Result<()> {
423 self.null_pages.write_thrift_field(writer, 1, 0)?;
424
425 let len = self.null_pages.len();
427 writer.write_field_begin(FieldType::List, 2, 1)?;
428 writer.write_list_begin(ElementType::Binary, len)?;
429 for i in 0..len {
430 let min = self.min_value(i).unwrap_or(&[]);
431 min.write_thrift(writer)?;
432 }
433 writer.write_field_begin(FieldType::List, 3, 2)?;
434 writer.write_list_begin(ElementType::Binary, len)?;
435 for i in 0..len {
436 let max = self.max_value(i).unwrap_or(&[]);
437 max.write_thrift(writer)?;
438 }
439 let mut last_field_id = self.boundary_order.write_thrift_field(writer, 4, 3)?;
440 if self.null_counts.is_some() {
441 last_field_id =
442 self.null_counts
443 .as_ref()
444 .unwrap()
445 .write_thrift_field(writer, 5, last_field_id)?;
446 }
447 if self.repetition_level_histograms.is_some() {
448 last_field_id = self
449 .repetition_level_histograms
450 .as_ref()
451 .unwrap()
452 .write_thrift_field(writer, 6, last_field_id)?;
453 }
454 if self.definition_level_histograms.is_some() {
455 self.definition_level_histograms
456 .as_ref()
457 .unwrap()
458 .write_thrift_field(writer, 7, last_field_id)?;
459 }
460 writer.write_struct_end()
461 }
462}
463
// Forwards a method call (with zero or one argument) to the typed index wrapped
// by a `ColumnIndexMetaData` variant. For `NONE` it panics with
// "Cannot call <method> on ColumnIndexMetaData::NONE".
//
// The `NONE` arm is spelled out instead of using `_`, so that adding a variant
// to the enum triggers a compile error here rather than a silent panic.
macro_rules! colidx_enum_func {
    ($self:ident, $func:ident $(, $arg:ident)?) => {{
        match *$self {
            Self::BOOLEAN(ref typed) => typed.$func($($arg)?),
            Self::INT32(ref typed) => typed.$func($($arg)?),
            Self::INT64(ref typed) => typed.$func($($arg)?),
            Self::INT96(ref typed) => typed.$func($($arg)?),
            Self::FLOAT(ref typed) => typed.$func($($arg)?),
            Self::DOUBLE(ref typed) => typed.$func($($arg)?),
            Self::BYTE_ARRAY(ref typed) => typed.$func($($arg)?),
            Self::FIXED_LEN_BYTE_ARRAY(ref typed) => typed.$func($($arg)?),
            Self::NONE => panic!(concat!(
                "Cannot call ",
                stringify!($func),
                " on ColumnIndexMetaData::NONE"
            )),
        }
    }};
}
501
502#[derive(Debug, Clone, PartialEq)]
509#[allow(non_camel_case_types)]
510pub enum ColumnIndexMetaData {
511 NONE,
515 BOOLEAN(PrimitiveColumnIndex<bool>),
517 INT32(PrimitiveColumnIndex<i32>),
519 INT64(PrimitiveColumnIndex<i64>),
521 INT96(PrimitiveColumnIndex<Int96>),
523 FLOAT(PrimitiveColumnIndex<f32>),
525 DOUBLE(PrimitiveColumnIndex<f64>),
527 BYTE_ARRAY(ByteArrayColumnIndex),
529 FIXED_LEN_BYTE_ARRAY(ByteArrayColumnIndex),
531}
532
533impl ColumnIndexMetaData {
534 pub fn is_sorted(&self) -> bool {
536 if let Some(order) = self.get_boundary_order() {
538 order != BoundaryOrder::UNORDERED
539 } else {
540 false
541 }
542 }
543
544 pub fn get_boundary_order(&self) -> Option<BoundaryOrder> {
546 match self {
547 Self::NONE => None,
548 Self::BOOLEAN(index) => Some(index.boundary_order),
549 Self::INT32(index) => Some(index.boundary_order),
550 Self::INT64(index) => Some(index.boundary_order),
551 Self::INT96(index) => Some(index.boundary_order),
552 Self::FLOAT(index) => Some(index.boundary_order),
553 Self::DOUBLE(index) => Some(index.boundary_order),
554 Self::BYTE_ARRAY(index) => Some(index.boundary_order),
555 Self::FIXED_LEN_BYTE_ARRAY(index) => Some(index.boundary_order),
556 }
557 }
558
559 pub fn null_counts(&self) -> Option<&Vec<i64>> {
563 match self {
564 Self::NONE => None,
565 Self::BOOLEAN(index) => index.null_counts.as_ref(),
566 Self::INT32(index) => index.null_counts.as_ref(),
567 Self::INT64(index) => index.null_counts.as_ref(),
568 Self::INT96(index) => index.null_counts.as_ref(),
569 Self::FLOAT(index) => index.null_counts.as_ref(),
570 Self::DOUBLE(index) => index.null_counts.as_ref(),
571 Self::BYTE_ARRAY(index) => index.null_counts.as_ref(),
572 Self::FIXED_LEN_BYTE_ARRAY(index) => index.null_counts.as_ref(),
573 }
574 }
575
576 pub fn num_pages(&self) -> u64 {
578 colidx_enum_func!(self, num_pages)
579 }
580
581 pub fn null_count(&self, idx: usize) -> Option<i64> {
585 colidx_enum_func!(self, null_count, idx)
586 }
587
588 pub fn repetition_level_histogram(&self, idx: usize) -> Option<&[i64]> {
590 colidx_enum_func!(self, repetition_level_histogram, idx)
591 }
592
593 pub fn definition_level_histogram(&self, idx: usize) -> Option<&[i64]> {
595 colidx_enum_func!(self, definition_level_histogram, idx)
596 }
597
598 pub fn is_null_page(&self, idx: usize) -> bool {
600 colidx_enum_func!(self, is_null_page, idx)
601 }
602}
603
604pub trait ColumnIndexIterators {
606 type Item;
609
610 fn min_values_iter(colidx: &ColumnIndexMetaData) -> impl Iterator<Item = Option<Self::Item>>;
612
613 fn max_values_iter(colidx: &ColumnIndexMetaData) -> impl Iterator<Item = Option<Self::Item>>;
615}
616
617macro_rules! column_index_iters {
618 ($item: ident, $variant: ident, $conv:expr) => {
619 impl ColumnIndexIterators for $item {
620 type Item = $item;
621
622 fn min_values_iter(
623 colidx: &ColumnIndexMetaData,
624 ) -> impl Iterator<Item = Option<Self::Item>> {
625 if let ColumnIndexMetaData::$variant(index) = colidx {
626 index.min_values_iter().map($conv)
627 } else {
628 panic!(concat!("Wrong type for ", stringify!($item), " iterator"))
629 }
630 }
631
632 fn max_values_iter(
633 colidx: &ColumnIndexMetaData,
634 ) -> impl Iterator<Item = Option<Self::Item>> {
635 if let ColumnIndexMetaData::$variant(index) = colidx {
636 index.max_values_iter().map($conv)
637 } else {
638 panic!(concat!("Wrong type for ", stringify!($item), " iterator"))
639 }
640 }
641 }
642 };
643}
644
645column_index_iters!(bool, BOOLEAN, |v| v.copied());
646column_index_iters!(i32, INT32, |v| v.copied());
647column_index_iters!(i64, INT64, |v| v.copied());
648column_index_iters!(Int96, INT96, |v| v.copied());
649column_index_iters!(f32, FLOAT, |v| v.copied());
650column_index_iters!(f64, DOUBLE, |v| v.copied());
651column_index_iters!(ByteArray, BYTE_ARRAY, |v| v
652 .map(|v| ByteArray::from(v.to_owned())));
653column_index_iters!(FixedLenByteArray, FIXED_LEN_BYTE_ARRAY, |v| v
654 .map(|v| FixedLenByteArray::from(v.to_owned())));
655
656impl WriteThrift for ColumnIndexMetaData {
657 const ELEMENT_TYPE: ElementType = ElementType::Struct;
658
659 fn write_thrift<W: std::io::Write>(
660 &self,
661 writer: &mut ThriftCompactOutputProtocol<W>,
662 ) -> Result<()> {
663 match self {
664 ColumnIndexMetaData::BOOLEAN(index) => index.write_thrift(writer),
665 ColumnIndexMetaData::INT32(index) => index.write_thrift(writer),
666 ColumnIndexMetaData::INT64(index) => index.write_thrift(writer),
667 ColumnIndexMetaData::INT96(index) => index.write_thrift(writer),
668 ColumnIndexMetaData::FLOAT(index) => index.write_thrift(writer),
669 ColumnIndexMetaData::DOUBLE(index) => index.write_thrift(writer),
670 ColumnIndexMetaData::BYTE_ARRAY(index) => index.write_thrift(writer),
671 ColumnIndexMetaData::FIXED_LEN_BYTE_ARRAY(index) => index.write_thrift(writer),
672 _ => Err(general_err!("Cannot serialize NONE index")),
673 }
674 }
675}
676
#[cfg(test)]
mod tests {
    use super::*;

    #[test]
    fn test_page_index_min_max_null() {
        let column_index = PrimitiveColumnIndex {
            column_index: ColumnIndex {
                null_pages: vec![false],
                boundary_order: BoundaryOrder::ASCENDING,
                null_counts: Some(vec![0]),
                repetition_level_histograms: Some(vec![1, 2]),
                definition_level_histograms: Some(vec![1, 2, 3]),
            },
            min_values: vec![-123],
            max_values: vec![234],
        };

        assert_eq!(column_index.min_value(0), Some(&-123));
        assert_eq!(column_index.max_value(0), Some(&234));
        assert_eq!(column_index.null_count(0), Some(0));
        assert_eq!(column_index.repetition_level_histogram(0).unwrap(), &[1, 2]);
        assert_eq!(
            column_index.definition_level_histogram(0).unwrap(),
            &[1, 2, 3]
        );
    }

    #[test]
    fn test_page_index_min_max_null_none() {
        let column_index = PrimitiveColumnIndex::<i32> {
            column_index: ColumnIndex {
                null_pages: vec![true],
                boundary_order: BoundaryOrder::ASCENDING,
                null_counts: Some(vec![1]),
                repetition_level_histograms: None,
                definition_level_histograms: Some(vec![1, 0]),
            },
            min_values: vec![Default::default()],
            max_values: vec![Default::default()],
        };

        assert_eq!(column_index.min_value(0), None);
        assert_eq!(column_index.max_value(0), None);
        assert_eq!(column_index.null_count(0), Some(1));
        assert_eq!(column_index.repetition_level_histogram(0), None);
        assert_eq!(column_index.definition_level_histogram(0).unwrap(), &[1, 0]);
    }

    #[test]
    fn test_invalid_column_index() {
        let column_index = ThriftColumnIndex {
            null_pages: vec![true, false],
            min_values: vec![
                &[],
                &[], // invalid: page 1 is not null, so this needs 4 bytes for an i32
            ],
            max_values: vec![
                &[],
                &[], // invalid for the same reason
            ],
            null_counts: None,
            repetition_level_histograms: None,
            definition_level_histograms: None,
            boundary_order: BoundaryOrder::UNORDERED,
        };

        let err = PrimitiveColumnIndex::<i32>::try_from_thrift(column_index).unwrap_err();
        assert_eq!(
            err.to_string(),
            "Parquet error: error converting value, expected 4 bytes got 0"
        );
    }

    #[test]
    fn test_primitive_index_try_new() {
        let min = 5i32.to_le_bytes();
        let max = 9i32.to_le_bytes();
        let empty: &[u8] = &[];
        let column_index = PrimitiveColumnIndex::<i32>::try_new(
            vec![false, true],
            BoundaryOrder::UNORDERED,
            Some(vec![0, 4]),
            None,
            None,
            vec![min.as_slice(), empty],
            vec![max.as_slice(), empty],
        )
        .unwrap();

        assert_eq!(column_index.num_pages(), 2);
        assert_eq!(column_index.min_value(0), Some(&5));
        assert_eq!(column_index.max_value(0), Some(&9));
        assert_eq!(column_index.min_value(1), None);
        assert_eq!(column_index.max_value(1), None);
        assert_eq!(column_index.null_count(1), Some(4));
        // Null pages keep a default placeholder in the raw value vectors.
        assert_eq!(column_index.min_values(), &[5, 0]);
        assert_eq!(
            column_index.max_values_iter().collect::<Vec<_>>(),
            vec![Some(&9), None]
        );
    }

    #[test]
    fn test_byte_array_index_min_max() {
        let empty: &[u8] = &[];
        let column_index = ByteArrayColumnIndex::try_new(
            vec![false, true, false],
            BoundaryOrder::UNORDERED,
            Some(vec![0, 3, 1]),
            None,
            None,
            vec![b"a".as_slice(), empty, b"cd".as_slice()],
            vec![b"b".as_slice(), empty, b"ef".as_slice()],
        )
        .unwrap();

        assert_eq!(column_index.num_pages(), 3);
        assert_eq!(column_index.min_value(0), Some(b"a".as_slice()));
        assert_eq!(column_index.max_value(0), Some(b"b".as_slice()));
        assert_eq!(column_index.min_value(1), None);
        assert_eq!(column_index.max_value(1), None);
        assert_eq!(column_index.min_value(2), Some(b"cd".as_slice()));
        assert_eq!(column_index.max_value(2), Some(b"ef".as_slice()));
        assert_eq!(column_index.null_count(1), Some(3));
        assert_eq!(
            column_index.min_values_iter().collect::<Vec<_>>(),
            vec![Some(b"a".as_slice()), None, Some(b"cd".as_slice())]
        );
        assert_eq!(
            column_index.max_values_iter().collect::<Vec<_>>(),
            vec![Some(b"b".as_slice()), None, Some(b"ef".as_slice())]
        );
    }
}