1use crate::basic::Type as PhysicalType;
19use crate::column::reader::{ColumnReader, ColumnReaderImpl, get_typed_column_reader};
20use crate::data_type::*;
21use crate::errors::{ParquetError, Result};
22use crate::record::api::Field;
23use crate::schema::types::ColumnDescPtr;
24
/// Forwards a no-argument method call to the [`TypedTripletIter`] held by whichever
/// [`TripletIter`] variant `$iter` currently is.
///
/// The trailing tokens are the binding-mode keywords spliced into every match pattern:
/// pass `ref` for methods that take `&self`, or `ref, mut` (which expands to `ref mut`)
/// for methods that take `&mut self`.
macro_rules! triplet_enum_func {
    ($iter:ident, $method:ident, $( $binding:tt ),*) => ({
        match *$iter {
            TripletIter::BoolTripletIter($($binding)* inner) => inner.$method(),
            TripletIter::Int32TripletIter($($binding)* inner) => inner.$method(),
            TripletIter::Int64TripletIter($($binding)* inner) => inner.$method(),
            TripletIter::Int96TripletIter($($binding)* inner) => inner.$method(),
            TripletIter::FloatTripletIter($($binding)* inner) => inner.$method(),
            TripletIter::DoubleTripletIter($($binding)* inner) => inner.$method(),
            TripletIter::ByteArrayTripletIter($($binding)* inner) => inner.$method(),
            TripletIter::FixedLenByteArrayTripletIter($($binding)* inner) => inner.$method(),
        }
    });
}
42
// Every variant name ends in `TripletIter`, which clippy's `enum_variant_names` lint
// would otherwise flag.
#[allow(clippy::enum_variant_names)]
/// Triplet iterator that hides the column's physical type.
///
/// There is one variant per Parquet physical type. Each variant wraps a
/// [`TypedTripletIter`] for the matching `DataType`, and `TripletIter::new` selects the
/// variant from the column descriptor's physical type.
pub enum TripletIter {
    /// Iterator over a `BOOLEAN` column.
    BoolTripletIter(TypedTripletIter<BoolType>),
    /// Iterator over an `INT32` column.
    Int32TripletIter(TypedTripletIter<Int32Type>),
    /// Iterator over an `INT64` column.
    Int64TripletIter(TypedTripletIter<Int64Type>),
    /// Iterator over an `INT96` column.
    Int96TripletIter(TypedTripletIter<Int96Type>),
    /// Iterator over a `FLOAT` column.
    FloatTripletIter(TypedTripletIter<FloatType>),
    /// Iterator over a `DOUBLE` column.
    DoubleTripletIter(TypedTripletIter<DoubleType>),
    /// Iterator over a `BYTE_ARRAY` column.
    ByteArrayTripletIter(TypedTripletIter<ByteArrayType>),
    /// Iterator over a `FIXED_LEN_BYTE_ARRAY` column.
    FixedLenByteArrayTripletIter(TypedTripletIter<FixedLenByteArrayType>),
}
56
57impl TripletIter {
58 pub fn new(descr: ColumnDescPtr, reader: ColumnReader, batch_size: usize) -> Self {
60 match descr.physical_type() {
61 PhysicalType::BOOLEAN => {
62 TripletIter::BoolTripletIter(TypedTripletIter::new(descr, batch_size, reader))
63 }
64 PhysicalType::INT32 => {
65 TripletIter::Int32TripletIter(TypedTripletIter::new(descr, batch_size, reader))
66 }
67 PhysicalType::INT64 => {
68 TripletIter::Int64TripletIter(TypedTripletIter::new(descr, batch_size, reader))
69 }
70 PhysicalType::INT96 => {
71 TripletIter::Int96TripletIter(TypedTripletIter::new(descr, batch_size, reader))
72 }
73 PhysicalType::FLOAT => {
74 TripletIter::FloatTripletIter(TypedTripletIter::new(descr, batch_size, reader))
75 }
76 PhysicalType::DOUBLE => {
77 TripletIter::DoubleTripletIter(TypedTripletIter::new(descr, batch_size, reader))
78 }
79 PhysicalType::BYTE_ARRAY => {
80 TripletIter::ByteArrayTripletIter(TypedTripletIter::new(descr, batch_size, reader))
81 }
82 PhysicalType::FIXED_LEN_BYTE_ARRAY => TripletIter::FixedLenByteArrayTripletIter(
83 TypedTripletIter::new(descr, batch_size, reader),
84 ),
85 }
86 }
87
88 #[inline]
91 pub fn read_next(&mut self) -> Result<bool> {
92 triplet_enum_func!(self, read_next, ref, mut)
93 }
94
95 #[inline]
100 pub fn has_next(&self) -> bool {
101 triplet_enum_func!(self, has_next, ref)
102 }
103
104 #[inline]
106 pub fn current_def_level(&self) -> i16 {
107 triplet_enum_func!(self, current_def_level, ref)
108 }
109
110 #[inline]
112 pub fn max_def_level(&self) -> i16 {
113 triplet_enum_func!(self, max_def_level, ref)
114 }
115
116 #[inline]
118 pub fn current_rep_level(&self) -> i16 {
119 triplet_enum_func!(self, current_rep_level, ref)
120 }
121
122 #[inline]
124 pub fn max_rep_level(&self) -> i16 {
125 triplet_enum_func!(self, max_rep_level, ref)
126 }
127
128 #[inline]
132 pub fn is_null(&self) -> bool {
133 self.current_def_level() < self.max_def_level()
134 }
135
136 pub fn current_value(&self) -> Result<Field> {
138 if self.is_null() {
139 return Ok(Field::Null);
140 }
141 let field = match *self {
142 TripletIter::BoolTripletIter(ref typed) => {
143 Field::convert_bool(typed.column_descr(), *typed.current_value())
144 }
145 TripletIter::Int32TripletIter(ref typed) => {
146 Field::convert_int32(typed.column_descr(), *typed.current_value())
147 }
148 TripletIter::Int64TripletIter(ref typed) => {
149 Field::convert_int64(typed.column_descr(), *typed.current_value())
150 }
151 TripletIter::Int96TripletIter(ref typed) => {
152 Field::convert_int96(typed.column_descr(), *typed.current_value())
153 }
154 TripletIter::FloatTripletIter(ref typed) => {
155 Field::convert_float(typed.column_descr(), *typed.current_value())
156 }
157 TripletIter::DoubleTripletIter(ref typed) => {
158 Field::convert_double(typed.column_descr(), *typed.current_value())
159 }
160 TripletIter::ByteArrayTripletIter(ref typed) => {
161 Field::convert_byte_array(typed.column_descr(), typed.current_value().clone())?
162 }
163 TripletIter::FixedLenByteArrayTripletIter(ref typed) => Field::convert_byte_array(
164 typed.column_descr(),
165 typed.current_value().clone().into(),
166 )?,
167 };
168 Ok(field)
169 }
170}
171
/// Typed triplet iterator over a single column.
///
/// It reads a batch of values, plus definition and repetition levels where the column has
/// them, from a typed column reader. The batch is then exposed one triplet at a time.
pub struct TypedTripletIter<T: DataType> {
    /// Typed column reader the batches are read from.
    reader: ColumnReaderImpl<T>,
    /// Descriptor of the column being read.
    column_descr: ColumnDescPtr,
    /// Value passed to `read_records` on every refill (checked to be > 0 at construction).
    batch_size: usize,
    /// Column's maximum definition level, cached from the descriptor.
    max_def_level: i16,
    /// Column's maximum repetition level, cached from the descriptor.
    max_rep_level: i16,
    /// Buffered values. When a batch needs spacing, values are moved so that `values[i]`
    /// lines up with `def_levels[i]`; positions below the max definition level hold a
    /// default placeholder.
    values: Vec<T::T>,
    /// Buffered definition levels; `None` when the column's max definition level is 0.
    def_levels: Option<Vec<i16>>,
    /// Buffered repetition levels; `None` when the column's max repetition level is 0.
    rep_levels: Option<Vec<i16>>,
    /// Index of the current triplet within the buffers.
    curr_triplet_index: usize,
    /// Number of triplets in the current buffered batch. Despite its name, this is not a
    /// count of remaining triplets; a refill happens once `curr_triplet_index` reaches it.
    triplets_left: usize,
    /// Whether the iterator is currently positioned on a valid triplet.
    has_next: bool,
}
192
193impl<T: DataType> TypedTripletIter<T> {
194 fn new(descr: ColumnDescPtr, batch_size: usize, column_reader: ColumnReader) -> Self {
197 assert!(
198 batch_size > 0,
199 "Expected positive batch size, found: {batch_size}"
200 );
201
202 let max_def_level = descr.max_def_level();
203 let max_rep_level = descr.max_rep_level();
204
205 let def_levels = if max_def_level == 0 {
206 None
207 } else {
208 Some(vec![0; batch_size])
209 };
210 let rep_levels = if max_rep_level == 0 {
211 None
212 } else {
213 Some(vec![0; batch_size])
214 };
215
216 Self {
217 reader: get_typed_column_reader(column_reader),
218 column_descr: descr,
219 batch_size,
220 max_def_level,
221 max_rep_level,
222 values: vec![T::T::default(); batch_size],
223 def_levels,
224 rep_levels,
225 curr_triplet_index: 0,
226 triplets_left: 0,
227 has_next: false,
228 }
229 }
230
231 #[inline]
233 pub fn column_descr(&self) -> &ColumnDescPtr {
234 &self.column_descr
235 }
236
237 #[inline]
239 fn max_def_level(&self) -> i16 {
240 self.max_def_level
241 }
242
243 #[inline]
245 fn max_rep_level(&self) -> i16 {
246 self.max_rep_level
247 }
248
249 #[inline]
252 fn current_value(&self) -> &T::T {
253 assert!(
254 self.current_def_level() == self.max_def_level(),
255 "Cannot extract value, max definition level: {}, current level: {}",
256 self.max_def_level(),
257 self.current_def_level()
258 );
259 &self.values[self.curr_triplet_index]
260 }
261
262 #[inline]
265 fn current_def_level(&self) -> i16 {
266 if !self.has_next {
267 return 0;
268 }
269 match self.def_levels {
270 Some(ref vec) => vec[self.curr_triplet_index],
271 None => self.max_def_level,
272 }
273 }
274
275 #[inline]
278 fn current_rep_level(&self) -> i16 {
279 if !self.has_next {
280 return 0;
281 }
282 match self.rep_levels {
283 Some(ref vec) => vec[self.curr_triplet_index],
284 None => self.max_rep_level,
285 }
286 }
287
288 #[inline]
291 fn has_next(&self) -> bool {
292 self.has_next
293 }
294
295 fn read_next(&mut self) -> Result<bool> {
298 self.curr_triplet_index += 1;
299
300 while self.curr_triplet_index >= self.triplets_left {
304 let (records_read, values_read, levels_read) = {
305 self.values.clear();
306 if let Some(x) = &mut self.def_levels {
307 x.clear()
308 }
309 if let Some(x) = &mut self.rep_levels {
310 x.clear()
311 }
312
313 self.reader.read_records(
315 self.batch_size,
316 self.def_levels.as_mut(),
317 self.rep_levels.as_mut(),
318 &mut self.values,
319 )?
320 };
321
322 if records_read == 0 && values_read == 0 && levels_read == 0 {
324 self.curr_triplet_index = 0;
325 self.has_next = false;
326 return Ok(false);
327 }
328
329 if levels_read == 0 || values_read == levels_read {
331 self.curr_triplet_index = 0;
334 self.triplets_left = values_read;
335 } else if values_read < levels_read {
336 let mut idx = values_read;
344 let def_levels = self.def_levels.as_ref().unwrap();
345 self.values.resize(levels_read, T::T::default());
346 for i in 0..levels_read {
347 if def_levels[levels_read - i - 1] == self.max_def_level {
348 idx -= 1; self.values.swap(levels_read - i - 1, idx);
350 }
351 }
352 self.curr_triplet_index = 0;
353 self.triplets_left = levels_read;
354 } else {
355 return Err(general_err!(
356 "Spacing of values/levels is wrong, values_read: {}, levels_read: {}",
357 values_read,
358 levels_read
359 ));
360 }
361 }
362
363 self.has_next = true;
364 Ok(true)
365 }
366}
367
#[cfg(test)]
mod tests {
    use super::*;

    use crate::file::reader::{FileReader, SerializedFileReader};
    use crate::schema::types::ColumnPath;
    use crate::util::test_common::file_util::get_test_file;

    #[test]
    #[should_panic(expected = "Expected positive batch size, found: 0")]
    fn test_triplet_zero_batch_size() {
        let column_path = ColumnPath::from(vec!["b_struct".to_string(), "b_c_int".to_string()]);
        test_column_in_file("nulls.snappy.parquet", 0, &column_path, &[], &[], &[]);
    }

    #[test]
    fn test_triplet_null_column() {
        let path = vec!["b_struct", "b_c_int"];
        let values = vec![];
        let def_levels = vec![1, 1, 1, 1, 1, 1, 1, 1];
        let rep_levels = vec![0, 0, 0, 0, 0, 0, 0, 0];
        test_triplet_iter(
            "nulls.snappy.parquet",
            path,
            &values,
            &def_levels,
            &rep_levels,
        );
    }

    #[test]
    fn test_triplet_required_column() {
        let path = vec!["ID"];
        let values = vec![Field::Long(8)];
        let def_levels = vec![0];
        let rep_levels = vec![0];
        test_triplet_iter(
            "nonnullable.impala.parquet",
            path,
            &values,
            &def_levels,
            &rep_levels,
        );
    }

    #[test]
    fn test_triplet_optional_column() {
        let path = vec!["nested_struct", "A"];
        let values = vec![Field::Int(1), Field::Int(7)];
        let def_levels = vec![2, 1, 1, 1, 1, 0, 2];
        let rep_levels = vec![0, 0, 0, 0, 0, 0, 0];
        test_triplet_iter(
            "nullable.impala.parquet",
            path,
            &values,
            &def_levels,
            &rep_levels,
        );
    }

    #[test]
    fn test_triplet_optional_list_column() {
        let path = vec!["a", "list", "element", "list", "element", "list", "element"];
        let values = vec![
            Field::Str("a".to_string()),
            Field::Str("b".to_string()),
            Field::Str("c".to_string()),
            Field::Str("d".to_string()),
            Field::Str("a".to_string()),
            Field::Str("b".to_string()),
            Field::Str("c".to_string()),
            Field::Str("d".to_string()),
            Field::Str("e".to_string()),
            Field::Str("a".to_string()),
            Field::Str("b".to_string()),
            Field::Str("c".to_string()),
            Field::Str("d".to_string()),
            Field::Str("e".to_string()),
            Field::Str("f".to_string()),
        ];
        let def_levels = vec![7, 7, 7, 4, 7, 7, 7, 7, 7, 4, 7, 7, 7, 7, 7, 7, 4, 7];
        let rep_levels = vec![0, 3, 2, 1, 2, 0, 3, 2, 3, 1, 2, 0, 3, 2, 3, 2, 1, 2];
        test_triplet_iter(
            "nested_lists.snappy.parquet",
            path,
            &values,
            &def_levels,
            &rep_levels,
        );
    }

    #[test]
    fn test_triplet_optional_map_column() {
        let path = vec!["a", "key_value", "value", "key_value", "key"];
        let values = vec![
            Field::Int(1),
            Field::Int(2),
            Field::Int(1),
            Field::Int(1),
            Field::Int(3),
            Field::Int(4),
            Field::Int(5),
        ];
        let def_levels = vec![4, 4, 4, 2, 3, 4, 4, 4, 4];
        let rep_levels = vec![0, 2, 0, 0, 0, 0, 0, 2, 2];
        test_triplet_iter(
            "nested_maps.snappy.parquet",
            path,
            &values,
            &def_levels,
            &rep_levels,
        );
    }

    /// Checks the column at `column_path` in `file_name` against the expected values and
    /// levels for a range of batch sizes, including ones smaller than the column.
    fn test_triplet_iter(
        file_name: &str,
        column_path: Vec<&str>,
        expected_values: &[Field],
        expected_def_levels: &[i16],
        expected_rep_levels: &[i16],
    ) {
        let path: Vec<String> = column_path.iter().map(|x| x.to_string()).collect();
        let column_path = ColumnPath::from(path);

        for batch_size in [1, 2, 3, 4, 5, 6, 7, 8, 9, 10, 128, 256] {
            test_column_in_file(
                file_name,
                batch_size,
                &column_path,
                expected_values,
                expected_def_levels,
                expected_rep_levels,
            );
        }
    }

    /// Opens `file_name`, finds the leaf column `column_path` in row group 0 and returns
    /// its descriptor together with a fresh column reader.
    ///
    /// Panics if the column does not exist, so a mistyped path fails the test instead of
    /// silently checking nothing.
    fn find_column(file_name: &str, column_path: &ColumnPath) -> (ColumnDescPtr, ColumnReader) {
        let file = get_test_file(file_name);
        let file_reader = SerializedFileReader::new(file).unwrap();
        let metadata = file_reader.metadata();
        let schema = metadata.file_metadata().schema_descr();
        let row_group_reader = file_reader.get_row_group(0).unwrap();

        let index = (0..schema.num_columns())
            .find(|&i| schema.column(i).path() == column_path)
            .unwrap_or_else(|| panic!("Column {column_path:?} not found in {file_name}"));

        let descr = schema.column(index);
        let reader = row_group_reader.get_column_reader(index).unwrap();
        (descr, reader)
    }

    /// Reads the column `column_path` of `file_name` with the given batch size and
    /// compares the collected values and levels with the expectations.
    fn test_column_in_file(
        file_name: &str,
        batch_size: usize,
        column_path: &ColumnPath,
        expected_values: &[Field],
        expected_def_levels: &[i16],
        expected_rep_levels: &[i16],
    ) {
        let (descr, reader) = find_column(file_name, column_path);
        test_triplet_column(
            descr,
            reader,
            batch_size,
            expected_values,
            expected_def_levels,
            expected_rep_levels,
        );
    }

    /// Drains a `TripletIter` built from `descr`/`reader` and asserts the non-null values,
    /// definition levels and repetition levels it produced.
    fn test_triplet_column(
        descr: ColumnDescPtr,
        reader: ColumnReader,
        batch_size: usize,
        expected_values: &[Field],
        expected_def_levels: &[i16],
        expected_rep_levels: &[i16],
    ) {
        let mut iter = TripletIter::new(descr.clone(), reader, batch_size);
        let mut values: Vec<Field> = Vec::new();
        let mut def_levels: Vec<i16> = Vec::new();
        let mut rep_levels: Vec<i16> = Vec::new();

        assert_eq!(iter.max_def_level(), descr.max_def_level());
        assert_eq!(iter.max_rep_level(), descr.max_rep_level());

        while let Ok(true) = iter.read_next() {
            assert!(iter.has_next());
            if !iter.is_null() {
                values.push(iter.current_value().unwrap());
            }
            def_levels.push(iter.current_def_level());
            rep_levels.push(iter.current_rep_level());
        }

        assert_eq!(values, expected_values);
        assert_eq!(def_levels, expected_def_levels);
        assert_eq!(rep_levels, expected_rep_levels);
    }

    /// Builds a `TripletIter` over the column at `path` in `file_name`. Panics if the column
    /// is missing.
    fn open_triplet_iter(file_name: &str, path: &[&str], batch_size: usize) -> TripletIter {
        let column_path = ColumnPath::from(path.iter().map(|x| x.to_string()).collect::<Vec<_>>());
        let (descr, reader) = find_column(file_name, &column_path);
        TripletIter::new(descr, reader, batch_size)
    }

    #[test]
    fn test_current_def_level_safe_after_exhaustion() {
        let mut iter = open_triplet_iter("nulls.snappy.parquet", &["b_struct", "b_c_int"], 256);
        while let Ok(true) = iter.read_next() {}
        assert!(!iter.has_next());
        assert_eq!(iter.current_def_level(), 0);
    }

    #[test]
    fn test_current_rep_level_safe_after_exhaustion() {
        let mut iter = open_triplet_iter(
            "nested_lists.snappy.parquet",
            &["a", "list", "element", "list", "element", "list", "element"],
            256,
        );
        while let Ok(true) = iter.read_next() {}
        assert!(!iter.has_next());
        assert_eq!(iter.current_rep_level(), 0);
    }
}