use crate::basic::Type as PhysicalType;
use crate::column::reader::{get_typed_column_reader, ColumnReader, ColumnReaderImpl};
use crate::data_type::*;
use crate::errors::{ParquetError, Result};
use crate::record::api::Field;
use crate::schema::types::ColumnDescPtr;

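/// Reduces boilerplate for `TripletIter`: expands into a `match` over every variant
/// and invokes the requested method on the wrapped `TypedTripletIter`.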
macro_rules! triplet_enum_func {
    ($self:ident, $func:ident, $( $token:tt ),*) => ({
        match *$self {
            TripletIter::BoolTripletIter($($token)* typed) => typed.$func(),
            TripletIter::Int32TripletIter($($token)* typed) => typed.$func(),
            TripletIter::Int64TripletIter($($token)* typed) => typed.$func(),
            TripletIter::Int96TripletIter($($token)* typed) => typed.$func(),
            TripletIter::FloatTripletIter($($token)* typed) => typed.$func(),
            TripletIter::DoubleTripletIter($($token)* typed) => typed.$func(),
            TripletIter::ByteArrayTripletIter($($token)* typed) => typed.$func(),
            TripletIter::FixedLenByteArrayTripletIter($($token)* typed) => typed.$func()
        }
    });
}

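/// Dynamically typed wrapper around `TypedTripletIter`, with one variant per Parquet
/// physical type. Provides (value, definition level, repetition level) triplets for a
/// single leaf column.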
#[allow(clippy::enum_variant_names)]
pub enum TripletIter {
    BoolTripletIter(TypedTripletIter<BoolType>),
    Int32TripletIter(TypedTripletIter<Int32Type>),
    Int64TripletIter(TypedTripletIter<Int64Type>),
    Int96TripletIter(TypedTripletIter<Int96Type>),
    FloatTripletIter(TypedTripletIter<FloatType>),
    DoubleTripletIter(TypedTripletIter<DoubleType>),
    ByteArrayTripletIter(TypedTripletIter<ByteArrayType>),
    FixedLenByteArrayTripletIter(TypedTripletIter<FixedLenByteArrayType>),
}

impl TripletIter {
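    /// Creates a new triplet iterator for the column, dispatching on its physical type.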
    pub fn new(descr: ColumnDescPtr, reader: ColumnReader, batch_size: usize) -> Self {
        match descr.physical_type() {
            PhysicalType::BOOLEAN => {
                TripletIter::BoolTripletIter(TypedTripletIter::new(descr, batch_size, reader))
            }
            PhysicalType::INT32 => {
                TripletIter::Int32TripletIter(TypedTripletIter::new(descr, batch_size, reader))
            }
            PhysicalType::INT64 => {
                TripletIter::Int64TripletIter(TypedTripletIter::new(descr, batch_size, reader))
            }
            PhysicalType::INT96 => {
                TripletIter::Int96TripletIter(TypedTripletIter::new(descr, batch_size, reader))
            }
            PhysicalType::FLOAT => {
                TripletIter::FloatTripletIter(TypedTripletIter::new(descr, batch_size, reader))
            }
            PhysicalType::DOUBLE => {
                TripletIter::DoubleTripletIter(TypedTripletIter::new(descr, batch_size, reader))
            }
            PhysicalType::BYTE_ARRAY => {
                TripletIter::ByteArrayTripletIter(TypedTripletIter::new(descr, batch_size, reader))
            }
            PhysicalType::FIXED_LEN_BYTE_ARRAY => TripletIter::FixedLenByteArrayTripletIter(
                TypedTripletIter::new(descr, batch_size, reader),
            ),
        }
    }

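    /// Advances the underlying typed iterator by one triplet, buffering a new batch from
    /// the column reader when needed. Returns `Ok(false)` once the column is exhausted.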
    #[inline]
    pub fn read_next(&mut self) -> Result<bool> {
        triplet_enum_func!(self, read_next, ref, mut)
    }

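    /// Returns `true` if more triplets are available, without touching the underlying
    /// column reader.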
    #[inline]
    pub fn has_next(&self) -> bool {
        triplet_enum_func!(self, has_next, ref)
    }

    #[inline]
    pub fn current_def_level(&self) -> i16 {
        triplet_enum_func!(self, current_def_level, ref)
    }

    #[inline]
    pub fn max_def_level(&self) -> i16 {
        triplet_enum_func!(self, max_def_level, ref)
    }

    #[inline]
    pub fn current_rep_level(&self) -> i16 {
        triplet_enum_func!(self, current_rep_level, ref)
    }

    #[inline]
    pub fn max_rep_level(&self) -> i16 {
        triplet_enum_func!(self, max_rep_level, ref)
    }

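    /// Returns `true` if the current triplet is null: a value is present only when the
    /// current definition level equals the maximum definition level.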
    #[inline]
    pub fn is_null(&self) -> bool {
        self.current_def_level() < self.max_def_level()
    }

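    /// Converts the current value into a `Field`, applying the column's logical type.
    /// Returns `Field::Null` if the current triplet is null.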
    pub fn current_value(&self) -> Result<Field> {
        if self.is_null() {
            return Ok(Field::Null);
        }
        let field = match *self {
            TripletIter::BoolTripletIter(ref typed) => {
                Field::convert_bool(typed.column_descr(), *typed.current_value())
            }
            TripletIter::Int32TripletIter(ref typed) => {
                Field::convert_int32(typed.column_descr(), *typed.current_value())
            }
            TripletIter::Int64TripletIter(ref typed) => {
                Field::convert_int64(typed.column_descr(), *typed.current_value())
            }
            TripletIter::Int96TripletIter(ref typed) => {
                Field::convert_int96(typed.column_descr(), *typed.current_value())
            }
            TripletIter::FloatTripletIter(ref typed) => {
                Field::convert_float(typed.column_descr(), *typed.current_value())
            }
            TripletIter::DoubleTripletIter(ref typed) => {
                Field::convert_double(typed.column_descr(), *typed.current_value())
            }
            TripletIter::ByteArrayTripletIter(ref typed) => {
                Field::convert_byte_array(typed.column_descr(), typed.current_value().clone())?
            }
            TripletIter::FixedLenByteArrayTripletIter(ref typed) => Field::convert_byte_array(
                typed.column_descr(),
                typed.current_value().clone().into(),
            )?,
        };
        Ok(field)
    }
}

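/// Internal typed triplet iterator: wraps a typed column reader for a single leaf column
/// and exposes buffered per-element access to values and levels.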
pub struct TypedTripletIter<T: DataType> {
    reader: ColumnReaderImpl<T>,
    column_descr: ColumnDescPtr,
    batch_size: usize,
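    // Maximum definition and repetition levels for this column.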
    max_def_level: i16,
    max_rep_level: i16,
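    // Buffered values and levels for the current batch; level buffers are only
    // allocated when the column actually has definition/repetition levels.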
    values: Vec<T::T>,
    def_levels: Option<Vec<i16>>,
    rep_levels: Option<Vec<i16>>,
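    // Index of the current triplet within the buffered batch.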
    curr_triplet_index: usize,
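    // Number of triplets left in the buffer before another batch must be read.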
    triplets_left: usize,
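    // Quick flag indicating whether more values/levels are available.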
    has_next: bool,
}

impl<T: DataType> TypedTripletIter<T> {
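    /// Creates a new typed triplet iterator; panics if `batch_size` is zero.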
    fn new(descr: ColumnDescPtr, batch_size: usize, column_reader: ColumnReader) -> Self {
        assert!(
            batch_size > 0,
            "Expected positive batch size, found: {batch_size}"
        );

        let max_def_level = descr.max_def_level();
        let max_rep_level = descr.max_rep_level();

        let def_levels = if max_def_level == 0 {
            None
        } else {
            Some(vec![0; batch_size])
        };
        let rep_levels = if max_rep_level == 0 {
            None
        } else {
            Some(vec![0; batch_size])
        };

        Self {
            reader: get_typed_column_reader(column_reader),
            column_descr: descr,
            batch_size,
            max_def_level,
            max_rep_level,
            values: vec![T::T::default(); batch_size],
            def_levels,
            rep_levels,
            curr_triplet_index: 0,
            triplets_left: 0,
            has_next: false,
        }
    }

    #[inline]
    pub fn column_descr(&self) -> &ColumnDescPtr {
        &self.column_descr
    }

    #[inline]
    fn max_def_level(&self) -> i16 {
        self.max_def_level
    }

    #[inline]
    fn max_rep_level(&self) -> i16 {
        self.max_rep_level
    }

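    /// Returns a reference to the current value; panics if the current triplet is null,
    /// i.e. the current definition level is below the maximum definition level.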
    #[inline]
    fn current_value(&self) -> &T::T {
        assert!(
            self.current_def_level() == self.max_def_level(),
            "Cannot extract value, max definition level: {}, current level: {}",
            self.max_def_level(),
            self.current_def_level()
        );
        &self.values[self.curr_triplet_index]
    }

    #[inline]
    fn current_def_level(&self) -> i16 {
        match self.def_levels {
            Some(ref vec) => vec[self.curr_triplet_index],
            None => self.max_def_level,
        }
    }

    #[inline]
    fn current_rep_level(&self) -> i16 {
        match self.rep_levels {
            Some(ref vec) => vec[self.curr_triplet_index],
            None => self.max_rep_level,
        }
    }

    #[inline]
    fn has_next(&self) -> bool {
        self.has_next
    }

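    /// Advances to the next triplet, reading a new batch from the column reader whenever
    /// the buffered batch is exhausted. Returns `Ok(false)` when no more data is left.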
    fn read_next(&mut self) -> Result<bool> {
        self.curr_triplet_index += 1;

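        // Keep reading batches from the column reader until at least one triplet is
        // buffered or the column is exhausted.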
        while self.curr_triplet_index >= self.triplets_left {
            let (records_read, values_read, levels_read) = {
                self.values.clear();
                if let Some(x) = &mut self.def_levels {
                    x.clear()
                }
                if let Some(x) = &mut self.rep_levels {
                    x.clear()
                }

                self.reader.read_records(
                    self.batch_size,
                    self.def_levels.as_mut(),
                    self.rep_levels.as_mut(),
                    &mut self.values,
                )?
            };

            if records_read == 0 && values_read == 0 && levels_read == 0 {
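                // Nothing was read: the column is exhausted.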
                self.has_next = false;
                return Ok(false);
            }

            if levels_read == 0 || values_read == levels_read {
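                // Fast path: every buffered value has a matching level (or the column
                // has no levels at all), so values and levels already line up.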
                self.curr_triplet_index = 0;
                self.triplets_left = values_read;
            } else if values_read < levels_read {
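                // Fewer values than levels were read, so some slots are null. Space the
                // values out from the back so that each value lands at the index of its
                // definition level; positions whose level is below the maximum keep the
                // default value and are treated as nulls.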
                let mut idx = values_read;
                let def_levels = self.def_levels.as_ref().unwrap();
                self.values.resize(levels_read, T::T::default());
                for i in 0..levels_read {
                    if def_levels[levels_read - i - 1] == self.max_def_level {
                        idx -= 1;
                        self.values.swap(levels_read - i - 1, idx);
                    }
                }
                self.curr_triplet_index = 0;
                self.triplets_left = levels_read;
            } else {
                return Err(general_err!(
                    "Spacing of values/levels is wrong, values_read: {}, levels_read: {}",
                    values_read,
                    levels_read
                ));
            }
        }

        self.has_next = true;
        Ok(true)
    }
}

#[cfg(test)]
mod tests {
    use super::*;

    use crate::file::reader::{FileReader, SerializedFileReader};
    use crate::schema::types::ColumnPath;
    use crate::util::test_common::file_util::get_test_file;

    #[test]
    #[should_panic(expected = "Expected positive batch size, found: 0")]
    fn test_triplet_zero_batch_size() {
        let column_path = ColumnPath::from(vec!["b_struct".to_string(), "b_c_int".to_string()]);
        test_column_in_file("nulls.snappy.parquet", 0, &column_path, &[], &[], &[]);
    }

    #[test]
    fn test_triplet_null_column() {
        let path = vec!["b_struct", "b_c_int"];
        let values = vec![];
        let def_levels = vec![1, 1, 1, 1, 1, 1, 1, 1];
        let rep_levels = vec![0, 0, 0, 0, 0, 0, 0, 0];
        test_triplet_iter(
            "nulls.snappy.parquet",
            path,
            &values,
            &def_levels,
            &rep_levels,
        );
    }

    #[test]
    fn test_triplet_required_column() {
        let path = vec!["ID"];
        let values = vec![Field::Long(8)];
        let def_levels = vec![0];
        let rep_levels = vec![0];
        test_triplet_iter(
            "nonnullable.impala.parquet",
            path,
            &values,
            &def_levels,
            &rep_levels,
        );
    }

    #[test]
    fn test_triplet_optional_column() {
        let path = vec!["nested_struct", "A"];
        let values = vec![Field::Int(1), Field::Int(7)];
        let def_levels = vec![2, 1, 1, 1, 1, 0, 2];
        let rep_levels = vec![0, 0, 0, 0, 0, 0, 0];
        test_triplet_iter(
            "nullable.impala.parquet",
            path,
            &values,
            &def_levels,
            &rep_levels,
        );
    }

    #[test]
    fn test_triplet_optional_list_column() {
        let path = vec!["a", "list", "element", "list", "element", "list", "element"];
        let values = vec![
            Field::Str("a".to_string()),
            Field::Str("b".to_string()),
            Field::Str("c".to_string()),
            Field::Str("d".to_string()),
            Field::Str("a".to_string()),
            Field::Str("b".to_string()),
            Field::Str("c".to_string()),
            Field::Str("d".to_string()),
            Field::Str("e".to_string()),
            Field::Str("a".to_string()),
            Field::Str("b".to_string()),
            Field::Str("c".to_string()),
            Field::Str("d".to_string()),
            Field::Str("e".to_string()),
            Field::Str("f".to_string()),
        ];
        let def_levels = vec![7, 7, 7, 4, 7, 7, 7, 7, 7, 4, 7, 7, 7, 7, 7, 7, 4, 7];
        let rep_levels = vec![0, 3, 2, 1, 2, 0, 3, 2, 3, 1, 2, 0, 3, 2, 3, 2, 1, 2];
        test_triplet_iter(
            "nested_lists.snappy.parquet",
            path,
            &values,
            &def_levels,
            &rep_levels,
        );
    }

    #[test]
    fn test_triplet_optional_map_column() {
        let path = vec!["a", "key_value", "value", "key_value", "key"];
        let values = vec![
            Field::Int(1),
            Field::Int(2),
            Field::Int(1),
            Field::Int(1),
            Field::Int(3),
            Field::Int(4),
            Field::Int(5),
        ];
        let def_levels = vec![4, 4, 4, 2, 3, 4, 4, 4, 4];
        let rep_levels = vec![0, 2, 0, 0, 0, 0, 0, 2, 2];
        test_triplet_iter(
            "nested_maps.snappy.parquet",
            path,
            &values,
            &def_levels,
            &rep_levels,
        );
    }

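    // Runs the triplet iterator check for the given file and column over a range of batch sizes.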
    fn test_triplet_iter(
        file_name: &str,
        column_path: Vec<&str>,
        expected_values: &[Field],
        expected_def_levels: &[i16],
        expected_rep_levels: &[i16],
    ) {
        let path: Vec<String> = column_path.iter().map(|x| x.to_string()).collect();
        let column_path = ColumnPath::from(path);

        let batch_sizes = vec![1, 2, 3, 4, 5, 6, 7, 8, 9, 10, 128, 256];
        for batch_size in batch_sizes {
            test_column_in_file(
                file_name,
                batch_size,
                &column_path,
                expected_values,
                expected_def_levels,
                expected_rep_levels,
            );
        }
    }

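    // Reads the requested column from the test file and checks it against the expected triplets.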
    fn test_column_in_file(
        file_name: &str,
        batch_size: usize,
        column_path: &ColumnPath,
        expected_values: &[Field],
        expected_def_levels: &[i16],
        expected_rep_levels: &[i16],
    ) {
        let file = get_test_file(file_name);
        let file_reader = SerializedFileReader::new(file).unwrap();
        let metadata = file_reader.metadata();
        let file_metadata = metadata.file_metadata();
        let schema = file_metadata.schema_descr();
        let row_group_reader = file_reader.get_row_group(0).unwrap();

        for i in 0..schema.num_columns() {
            let descr = schema.column(i);
            if descr.path() == column_path {
                let reader = row_group_reader.get_column_reader(i).unwrap();
                test_triplet_column(
                    descr,
                    reader,
                    batch_size,
                    expected_values,
                    expected_def_levels,
                    expected_rep_levels,
                );
            }
        }
    }

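    // Drives a TripletIter over the whole column and compares the collected values,
    // definition levels, and repetition levels with the expected ones.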
    fn test_triplet_column(
        descr: ColumnDescPtr,
        reader: ColumnReader,
        batch_size: usize,
        expected_values: &[Field],
        expected_def_levels: &[i16],
        expected_rep_levels: &[i16],
    ) {
        let mut iter = TripletIter::new(descr.clone(), reader, batch_size);
        let mut values: Vec<Field> = Vec::new();
        let mut def_levels: Vec<i16> = Vec::new();
        let mut rep_levels: Vec<i16> = Vec::new();

        assert_eq!(iter.max_def_level(), descr.max_def_level());
        assert_eq!(iter.max_rep_level(), descr.max_rep_level());

        while let Ok(true) = iter.read_next() {
            assert!(iter.has_next());
            if !iter.is_null() {
                values.push(iter.current_value().unwrap());
            }
            def_levels.push(iter.current_def_level());
            rep_levels.push(iter.current_rep_level());
        }

        assert_eq!(values, expected_values);
        assert_eq!(def_levels, expected_def_levels);
        assert_eq!(rep_levels, expected_rep_levels);
    }
}