1use crate::{
24 data_type::{ByteArray, FixedLenByteArray},
25 errors::{ParquetError, Result},
26 parquet_thrift::{
27 ElementType, FieldType, ThriftCompactOutputProtocol, WriteThrift, WriteThriftField,
28 },
29};
30use std::ops::Deref;
31
32use crate::{
33 basic::BoundaryOrder,
34 data_type::{Int96, private::ParquetValueType},
35 file::page_index::index_reader::ThriftColumnIndex,
36};
37
/// Page-index data that does not depend on the column's physical type: which
/// pages are null pages, the boundary ordering of the per-page min/max values,
/// and the optional per-page null counts and level histograms.
#[derive(Debug, Clone, PartialEq)]
pub struct ColumnIndex {
    /// One entry per page; `true` marks a null page, for which the typed
    /// indexes report no min/max value.
    pub(crate) null_pages: Vec<bool>,
    /// Ordering of the per-page min/max values across pages.
    pub(crate) boundary_order: BoundaryOrder,
    /// Optional null counts, indexed by page.
    pub(crate) null_counts: Option<Vec<i64>>,
    /// Optional repetition-level histograms of all pages concatenated into one
    /// flat vector; each page is assumed to contribute the same number of
    /// entries (`len / num_pages`).
    pub(crate) repetition_level_histograms: Option<Vec<i64>>,
    /// Optional definition-level histograms, flattened the same way as
    /// `repetition_level_histograms`.
    pub(crate) definition_level_histograms: Option<Vec<i64>>,
}
47
48impl ColumnIndex {
49 pub fn num_pages(&self) -> u64 {
51 self.null_pages.len() as u64
52 }
53
54 pub fn null_count(&self, idx: usize) -> Option<i64> {
58 self.null_counts.as_ref().map(|nc| nc[idx])
59 }
60
61 pub fn repetition_level_histogram(&self, idx: usize) -> Option<&[i64]> {
63 if let Some(rep_hists) = self.repetition_level_histograms.as_ref() {
64 let num_lvls = rep_hists.len() / self.num_pages() as usize;
65 let start = num_lvls * idx;
66 Some(&rep_hists[start..start + num_lvls])
67 } else {
68 None
69 }
70 }
71
72 pub fn definition_level_histogram(&self, idx: usize) -> Option<&[i64]> {
74 if let Some(def_hists) = self.definition_level_histograms.as_ref() {
75 let num_lvls = def_hists.len() / self.num_pages() as usize;
76 let start = num_lvls * idx;
77 Some(&def_hists[start..start + num_lvls])
78 } else {
79 None
80 }
81 }
82
83 pub fn is_null_page(&self, idx: usize) -> bool {
85 self.null_pages[idx]
86 }
87}
88
/// Column index for the fixed-width physical types, with each page's min/max
/// statistics decoded into `T`.
#[derive(Debug, Clone, PartialEq)]
pub struct PrimitiveColumnIndex<T> {
    /// Type-independent page-index data (null pages, counts, histograms).
    pub(crate) column_index: ColumnIndex,
    /// Per-page minimum values; null pages hold a placeholder (a default value
    /// when built by `try_new`).
    pub(crate) min_values: Vec<T>,
    /// Per-page maximum values; null pages hold a placeholder (a default value
    /// when built by `try_new`).
    pub(crate) max_values: Vec<T>,
}
96
97impl<T: ParquetValueType> PrimitiveColumnIndex<T> {
98 pub(crate) fn try_new(
99 null_pages: Vec<bool>,
100 boundary_order: BoundaryOrder,
101 null_counts: Option<Vec<i64>>,
102 repetition_level_histograms: Option<Vec<i64>>,
103 definition_level_histograms: Option<Vec<i64>>,
104 min_bytes: Vec<&[u8]>,
105 max_bytes: Vec<&[u8]>,
106 ) -> Result<Self> {
107 let len = null_pages.len();
108
109 let mut min_values = Vec::with_capacity(len);
110 let mut max_values = Vec::with_capacity(len);
111
112 for (i, is_null) in null_pages.iter().enumerate().take(len) {
113 if !is_null {
114 let min = min_bytes[i];
115 min_values.push(T::try_from_le_slice(min)?);
116
117 let max = max_bytes[i];
118 max_values.push(T::try_from_le_slice(max)?);
119 } else {
120 min_values.push(Default::default());
122 max_values.push(Default::default());
123 }
124 }
125
126 Ok(Self {
127 column_index: ColumnIndex {
128 null_pages,
129 boundary_order,
130 null_counts,
131 repetition_level_histograms,
132 definition_level_histograms,
133 },
134 min_values,
135 max_values,
136 })
137 }
138
139 pub(super) fn try_from_thrift(index: ThriftColumnIndex) -> Result<Self> {
140 Self::try_new(
141 index.null_pages,
142 index.boundary_order,
143 index.null_counts,
144 index.repetition_level_histograms,
145 index.definition_level_histograms,
146 index.min_values,
147 index.max_values,
148 )
149 }
150}
151
152impl<T> PrimitiveColumnIndex<T> {
153 pub fn min_values(&self) -> &[T] {
158 &self.min_values
159 }
160
161 pub fn max_values(&self) -> &[T] {
166 &self.max_values
167 }
168
169 pub fn min_values_iter(&self) -> impl Iterator<Item = Option<&T>> {
173 self.min_values.iter().enumerate().map(|(i, min)| {
174 if self.is_null_page(i) {
175 None
176 } else {
177 Some(min)
178 }
179 })
180 }
181
182 pub fn max_values_iter(&self) -> impl Iterator<Item = Option<&T>> {
186 self.max_values.iter().enumerate().map(|(i, min)| {
187 if self.is_null_page(i) {
188 None
189 } else {
190 Some(min)
191 }
192 })
193 }
194
195 #[inline]
199 pub fn min_value(&self, idx: usize) -> Option<&T> {
200 if self.null_pages[idx] {
201 None
202 } else {
203 Some(&self.min_values[idx])
204 }
205 }
206
207 #[inline]
211 pub fn max_value(&self, idx: usize) -> Option<&T> {
212 if self.null_pages[idx] {
213 None
214 } else {
215 Some(&self.max_values[idx])
216 }
217 }
218}
219
220impl<T> Deref for PrimitiveColumnIndex<T> {
221 type Target = ColumnIndex;
222
223 fn deref(&self) -> &Self::Target {
224 &self.column_index
225 }
226}
227
228impl<T: ParquetValueType> WriteThrift for PrimitiveColumnIndex<T> {
229 const ELEMENT_TYPE: ElementType = ElementType::Struct;
230 fn write_thrift<W: std::io::Write>(
231 &self,
232 writer: &mut ThriftCompactOutputProtocol<W>,
233 ) -> Result<()> {
234 self.null_pages.write_thrift_field(writer, 1, 0)?;
235
236 let len = self.null_pages.len();
238 writer.write_field_begin(FieldType::List, 2, 1)?;
239 writer.write_list_begin(ElementType::Binary, len)?;
240 for i in 0..len {
241 let min = self.min_value(i).map(|m| m.as_bytes()).unwrap_or(&[]);
242 min.write_thrift(writer)?;
243 }
244 writer.write_field_begin(FieldType::List, 3, 2)?;
245 writer.write_list_begin(ElementType::Binary, len)?;
246 for i in 0..len {
247 let max = self.max_value(i).map(|m| m.as_bytes()).unwrap_or(&[]);
248 max.write_thrift(writer)?;
249 }
250 let mut last_field_id = self.boundary_order.write_thrift_field(writer, 4, 3)?;
251 if self.null_counts.is_some() {
252 last_field_id =
253 self.null_counts
254 .as_ref()
255 .unwrap()
256 .write_thrift_field(writer, 5, last_field_id)?;
257 }
258 if self.repetition_level_histograms.is_some() {
259 last_field_id = self
260 .repetition_level_histograms
261 .as_ref()
262 .unwrap()
263 .write_thrift_field(writer, 6, last_field_id)?;
264 }
265 if self.definition_level_histograms.is_some() {
266 self.definition_level_histograms
267 .as_ref()
268 .unwrap()
269 .write_thrift_field(writer, 7, last_field_id)?;
270 }
271 writer.write_struct_end()
272 }
273}
274
/// Column index for byte-array physical types. Each page's min/max value is
/// stored in one of two contiguous byte buffers and located through an
/// offsets vector.
#[derive(Debug, Clone, PartialEq)]
pub struct ByteArrayColumnIndex {
    /// Type-independent page-index data (null pages, counts, histograms).
    pub(crate) column_index: ColumnIndex,
    /// Concatenated minimum values of the pages.
    pub(crate) min_bytes: Vec<u8>,
    /// Offsets into `min_bytes`; page `i` spans
    /// `min_offsets[i]..min_offsets[i + 1]`, so there is one more entry than
    /// there are pages.
    pub(crate) min_offsets: Vec<usize>,
    /// Concatenated maximum values of the pages.
    pub(crate) max_bytes: Vec<u8>,
    /// Offsets into `max_bytes`, laid out like `min_offsets`.
    pub(crate) max_offsets: Vec<usize>,
}
285
286impl ByteArrayColumnIndex {
287 pub(crate) fn try_new(
288 null_pages: Vec<bool>,
289 boundary_order: BoundaryOrder,
290 null_counts: Option<Vec<i64>>,
291 repetition_level_histograms: Option<Vec<i64>>,
292 definition_level_histograms: Option<Vec<i64>>,
293 min_values: Vec<&[u8]>,
294 max_values: Vec<&[u8]>,
295 ) -> Result<Self> {
296 let len = null_pages.len();
297
298 let min_len = min_values.iter().map(|&v| v.len()).sum();
299 let max_len = max_values.iter().map(|&v| v.len()).sum();
300 let mut min_bytes = vec![0u8; min_len];
301 let mut max_bytes = vec![0u8; max_len];
302
303 let mut min_offsets = vec![0usize; len + 1];
304 let mut max_offsets = vec![0usize; len + 1];
305
306 let mut min_pos = 0;
307 let mut max_pos = 0;
308
309 for (i, is_null) in null_pages.iter().enumerate().take(len) {
310 if !is_null {
311 let min = min_values[i];
312 let dst = &mut min_bytes[min_pos..min_pos + min.len()];
313 dst.copy_from_slice(min);
314 min_offsets[i] = min_pos;
315 min_pos += min.len();
316
317 let max = max_values[i];
318 let dst = &mut max_bytes[max_pos..max_pos + max.len()];
319 dst.copy_from_slice(max);
320 max_offsets[i] = max_pos;
321 max_pos += max.len();
322 } else {
323 min_offsets[i] = min_pos;
324 max_offsets[i] = max_pos;
325 }
326 }
327
328 min_offsets[len] = min_pos;
329 max_offsets[len] = max_pos;
330
331 Ok(Self {
332 column_index: ColumnIndex {
333 null_pages,
334 boundary_order,
335 null_counts,
336 repetition_level_histograms,
337 definition_level_histograms,
338 },
339 min_bytes,
340 min_offsets,
341 max_bytes,
342 max_offsets,
343 })
344 }
345
346 pub(super) fn try_from_thrift(index: ThriftColumnIndex) -> Result<Self> {
347 Self::try_new(
348 index.null_pages,
349 index.boundary_order,
350 index.null_counts,
351 index.repetition_level_histograms,
352 index.definition_level_histograms,
353 index.min_values,
354 index.max_values,
355 )
356 }
357
358 pub fn min_value(&self, idx: usize) -> Option<&[u8]> {
362 if self.null_pages[idx] {
363 None
364 } else {
365 let start = self.min_offsets[idx];
366 let end = self.min_offsets[idx + 1];
367 Some(&self.min_bytes[start..end])
368 }
369 }
370
371 pub fn max_value(&self, idx: usize) -> Option<&[u8]> {
375 if self.null_pages[idx] {
376 None
377 } else {
378 let start = self.max_offsets[idx];
379 let end = self.max_offsets[idx + 1];
380 Some(&self.max_bytes[start..end])
381 }
382 }
383
384 pub fn min_values_iter(&self) -> impl Iterator<Item = Option<&[u8]>> {
388 (0..self.num_pages() as usize).map(|i| self.min_value(i))
389 }
390
391 pub fn max_values_iter(&self) -> impl Iterator<Item = Option<&[u8]>> {
395 (0..self.num_pages() as usize).map(|i| self.max_value(i))
396 }
397}
398
399impl Deref for ByteArrayColumnIndex {
400 type Target = ColumnIndex;
401
402 fn deref(&self) -> &Self::Target {
403 &self.column_index
404 }
405}
406
407impl WriteThrift for ByteArrayColumnIndex {
408 const ELEMENT_TYPE: ElementType = ElementType::Struct;
409 fn write_thrift<W: std::io::Write>(
410 &self,
411 writer: &mut ThriftCompactOutputProtocol<W>,
412 ) -> Result<()> {
413 self.null_pages.write_thrift_field(writer, 1, 0)?;
414
415 let len = self.null_pages.len();
417 writer.write_field_begin(FieldType::List, 2, 1)?;
418 writer.write_list_begin(ElementType::Binary, len)?;
419 for i in 0..len {
420 let min = self.min_value(i).unwrap_or(&[]);
421 min.write_thrift(writer)?;
422 }
423 writer.write_field_begin(FieldType::List, 3, 2)?;
424 writer.write_list_begin(ElementType::Binary, len)?;
425 for i in 0..len {
426 let max = self.max_value(i).unwrap_or(&[]);
427 max.write_thrift(writer)?;
428 }
429 let mut last_field_id = self.boundary_order.write_thrift_field(writer, 4, 3)?;
430 if self.null_counts.is_some() {
431 last_field_id =
432 self.null_counts
433 .as_ref()
434 .unwrap()
435 .write_thrift_field(writer, 5, last_field_id)?;
436 }
437 if self.repetition_level_histograms.is_some() {
438 last_field_id = self
439 .repetition_level_histograms
440 .as_ref()
441 .unwrap()
442 .write_thrift_field(writer, 6, last_field_id)?;
443 }
444 if self.definition_level_histograms.is_some() {
445 self.definition_level_histograms
446 .as_ref()
447 .unwrap()
448 .write_thrift_field(writer, 7, last_field_id)?;
449 }
450 writer.write_struct_end()
451 }
452}
453
/// Forwards a method call to the index held by whichever variant `$self` is.
///
/// Two forms are supported:
/// - `colidx_enum_func!(self, method, arg)` for one-argument methods.
/// - `colidx_enum_func!(self, method)` for zero-argument methods.
///
/// Both forms panic when `$self` is the `NONE` variant.
macro_rules! colidx_enum_func {
    ($self:ident, $func:ident, $arg:ident) => {{
        match $self {
            Self::NONE => panic!(concat!(
                "Cannot call ",
                stringify!($func),
                " on ColumnIndexMetaData::NONE"
            )),
            Self::BOOLEAN(typed) => typed.$func($arg),
            Self::INT32(typed) => typed.$func($arg),
            Self::INT64(typed) => typed.$func($arg),
            Self::INT96(typed) => typed.$func($arg),
            Self::FLOAT(typed) => typed.$func($arg),
            Self::DOUBLE(typed) => typed.$func($arg),
            Self::BYTE_ARRAY(typed) => typed.$func($arg),
            Self::FIXED_LEN_BYTE_ARRAY(typed) => typed.$func($arg),
        }
    }};
    ($self:ident, $func:ident) => {{
        match $self {
            Self::NONE => panic!(concat!(
                "Cannot call ",
                stringify!($func),
                " on ColumnIndexMetaData::NONE"
            )),
            Self::BOOLEAN(typed) => typed.$func(),
            Self::INT32(typed) => typed.$func(),
            Self::INT64(typed) => typed.$func(),
            Self::INT96(typed) => typed.$func(),
            Self::FLOAT(typed) => typed.$func(),
            Self::DOUBLE(typed) => typed.$func(),
            Self::BYTE_ARRAY(typed) => typed.$func(),
            Self::FIXED_LEN_BYTE_ARRAY(typed) => typed.$func(),
        }
    }};
}
491
/// A column index tagged by the physical type of its column.
///
/// Fixed-width types hold a [`PrimitiveColumnIndex`] with decoded values.
/// `BYTE_ARRAY` and `FIXED_LEN_BYTE_ARRAY` hold a [`ByteArrayColumnIndex`].
/// Variant names mirror Parquet's physical type names, hence the
/// `non_camel_case_types` allowance.
#[derive(Debug, Clone, PartialEq)]
#[allow(non_camel_case_types)]
pub enum ColumnIndexMetaData {
    /// No typed index is held; the dispatching accessors panic on this variant.
    NONE,
    /// Index of a `BOOLEAN` column.
    BOOLEAN(PrimitiveColumnIndex<bool>),
    /// Index of an `INT32` column.
    INT32(PrimitiveColumnIndex<i32>),
    /// Index of an `INT64` column.
    INT64(PrimitiveColumnIndex<i64>),
    /// Index of an `INT96` column.
    INT96(PrimitiveColumnIndex<Int96>),
    /// Index of a `FLOAT` column.
    FLOAT(PrimitiveColumnIndex<f32>),
    /// Index of a `DOUBLE` column.
    DOUBLE(PrimitiveColumnIndex<f64>),
    /// Index of a `BYTE_ARRAY` column.
    BYTE_ARRAY(ByteArrayColumnIndex),
    /// Index of a `FIXED_LEN_BYTE_ARRAY` column.
    FIXED_LEN_BYTE_ARRAY(ByteArrayColumnIndex),
}
522
523impl ColumnIndexMetaData {
524 pub fn is_sorted(&self) -> bool {
526 if let Some(order) = self.get_boundary_order() {
528 order != BoundaryOrder::UNORDERED
529 } else {
530 false
531 }
532 }
533
534 pub fn get_boundary_order(&self) -> Option<BoundaryOrder> {
536 match self {
537 Self::NONE => None,
538 Self::BOOLEAN(index) => Some(index.boundary_order),
539 Self::INT32(index) => Some(index.boundary_order),
540 Self::INT64(index) => Some(index.boundary_order),
541 Self::INT96(index) => Some(index.boundary_order),
542 Self::FLOAT(index) => Some(index.boundary_order),
543 Self::DOUBLE(index) => Some(index.boundary_order),
544 Self::BYTE_ARRAY(index) => Some(index.boundary_order),
545 Self::FIXED_LEN_BYTE_ARRAY(index) => Some(index.boundary_order),
546 }
547 }
548
549 pub fn null_counts(&self) -> Option<&Vec<i64>> {
553 match self {
554 Self::NONE => None,
555 Self::BOOLEAN(index) => index.null_counts.as_ref(),
556 Self::INT32(index) => index.null_counts.as_ref(),
557 Self::INT64(index) => index.null_counts.as_ref(),
558 Self::INT96(index) => index.null_counts.as_ref(),
559 Self::FLOAT(index) => index.null_counts.as_ref(),
560 Self::DOUBLE(index) => index.null_counts.as_ref(),
561 Self::BYTE_ARRAY(index) => index.null_counts.as_ref(),
562 Self::FIXED_LEN_BYTE_ARRAY(index) => index.null_counts.as_ref(),
563 }
564 }
565
566 pub fn num_pages(&self) -> u64 {
568 colidx_enum_func!(self, num_pages)
569 }
570
571 pub fn null_count(&self, idx: usize) -> Option<i64> {
575 colidx_enum_func!(self, null_count, idx)
576 }
577
578 pub fn repetition_level_histogram(&self, idx: usize) -> Option<&[i64]> {
580 colidx_enum_func!(self, repetition_level_histogram, idx)
581 }
582
583 pub fn definition_level_histogram(&self, idx: usize) -> Option<&[i64]> {
585 colidx_enum_func!(self, definition_level_histogram, idx)
586 }
587
588 #[inline]
590 pub fn is_null_page(&self, idx: usize) -> bool {
591 colidx_enum_func!(self, is_null_page, idx)
592 }
593}
594
595pub trait ColumnIndexIterators {
597 type Item;
600
601 fn min_values_iter(colidx: &ColumnIndexMetaData) -> impl Iterator<Item = Option<Self::Item>>;
603
604 fn max_values_iter(colidx: &ColumnIndexMetaData) -> impl Iterator<Item = Option<Self::Item>>;
606}
607
/// Implements [`ColumnIndexIterators`] for `$item`.
///
/// The generated methods read from the `ColumnIndexMetaData::$variant`
/// variant and map each `Option<&_>` yielded by the typed index through
/// `$conv`. They panic when given any other variant.
macro_rules! column_index_iters {
    ($item: ident, $variant: ident, $conv:expr) => {
        impl ColumnIndexIterators for $item {
            type Item = $item;

            fn min_values_iter(
                colidx: &ColumnIndexMetaData,
            ) -> impl Iterator<Item = Option<Self::Item>> {
                let ColumnIndexMetaData::$variant(index) = colidx else {
                    panic!(concat!("Wrong type for ", stringify!($item), " iterator"))
                };
                index.min_values_iter().map($conv)
            }

            fn max_values_iter(
                colidx: &ColumnIndexMetaData,
            ) -> impl Iterator<Item = Option<Self::Item>> {
                let ColumnIndexMetaData::$variant(index) = colidx else {
                    panic!(concat!("Wrong type for ", stringify!($item), " iterator"))
                };
                index.max_values_iter().map($conv)
            }
        }
    };
}
635
636column_index_iters!(bool, BOOLEAN, |v| v.copied());
637column_index_iters!(i32, INT32, |v| v.copied());
638column_index_iters!(i64, INT64, |v| v.copied());
639column_index_iters!(Int96, INT96, |v| v.copied());
640column_index_iters!(f32, FLOAT, |v| v.copied());
641column_index_iters!(f64, DOUBLE, |v| v.copied());
642column_index_iters!(ByteArray, BYTE_ARRAY, |v| v
643 .map(|v| ByteArray::from(v.to_owned())));
644column_index_iters!(FixedLenByteArray, FIXED_LEN_BYTE_ARRAY, |v| v
645 .map(|v| FixedLenByteArray::from(v.to_owned())));
646
647impl WriteThrift for ColumnIndexMetaData {
648 const ELEMENT_TYPE: ElementType = ElementType::Struct;
649
650 fn write_thrift<W: std::io::Write>(
651 &self,
652 writer: &mut ThriftCompactOutputProtocol<W>,
653 ) -> Result<()> {
654 match self {
655 ColumnIndexMetaData::BOOLEAN(index) => index.write_thrift(writer),
656 ColumnIndexMetaData::INT32(index) => index.write_thrift(writer),
657 ColumnIndexMetaData::INT64(index) => index.write_thrift(writer),
658 ColumnIndexMetaData::INT96(index) => index.write_thrift(writer),
659 ColumnIndexMetaData::FLOAT(index) => index.write_thrift(writer),
660 ColumnIndexMetaData::DOUBLE(index) => index.write_thrift(writer),
661 ColumnIndexMetaData::BYTE_ARRAY(index) => index.write_thrift(writer),
662 ColumnIndexMetaData::FIXED_LEN_BYTE_ARRAY(index) => index.write_thrift(writer),
663 _ => Err(general_err!("Cannot serialize NONE index")),
664 }
665 }
666}
667
#[cfg(test)]
mod tests {
    use super::*;

    /// A single non-null page exposes its min/max, null count and histograms.
    #[test]
    fn test_page_index_min_max_null() {
        let index = PrimitiveColumnIndex {
            column_index: ColumnIndex {
                null_pages: vec![false],
                boundary_order: BoundaryOrder::ASCENDING,
                null_counts: Some(vec![0]),
                repetition_level_histograms: Some(vec![1, 2]),
                definition_level_histograms: Some(vec![1, 2, 3]),
            },
            min_values: vec![-123],
            max_values: vec![234],
        };

        assert_eq!(index.min_value(0), Some(&-123));
        assert_eq!(index.max_value(0), Some(&234));
        assert_eq!(index.null_count(0), Some(0));
        assert_eq!(index.repetition_level_histogram(0).unwrap(), &[1, 2]);
        assert_eq!(index.definition_level_histogram(0).unwrap(), &[1, 2, 3]);
    }

    /// A null page hides its placeholder min/max; absent histograms read as `None`.
    #[test]
    fn test_page_index_min_max_null_none() {
        let index = PrimitiveColumnIndex::<i32> {
            column_index: ColumnIndex {
                null_pages: vec![true],
                boundary_order: BoundaryOrder::ASCENDING,
                null_counts: Some(vec![1]),
                repetition_level_histograms: None,
                definition_level_histograms: Some(vec![1, 0]),
            },
            min_values: vec![Default::default()],
            max_values: vec![Default::default()],
        };

        assert_eq!(index.min_value(0), None);
        assert_eq!(index.max_value(0), None);
        assert_eq!(index.null_count(0), Some(1));
        assert_eq!(index.repetition_level_histogram(0), None);
        assert_eq!(index.definition_level_histogram(0).unwrap(), &[1, 0]);
    }

    /// An empty min value on a non-null `INT32` page is rejected during decoding.
    #[test]
    fn test_invalid_column_index() {
        let thrift_index = ThriftColumnIndex {
            null_pages: vec![true, false],
            min_values: vec![&[], &[]],
            max_values: vec![&[], &[]],
            null_counts: None,
            repetition_level_histograms: None,
            definition_level_histograms: None,
            boundary_order: BoundaryOrder::UNORDERED,
        };

        let error = PrimitiveColumnIndex::<i32>::try_from_thrift(thrift_index).unwrap_err();
        assert_eq!(
            error.to_string(),
            "Parquet error: error converting value, expected 4 bytes got 0"
        );
    }
}