1use std::cmp::Ordering;
19use std::sync::Arc;
20
21use crate::arrow::array_reader::ArrayReader;
22use crate::errors::ParquetError;
23use crate::errors::Result;
24use arrow_array::FixedSizeListArray;
25use arrow_array::{builder::BooleanBufferBuilder, new_empty_array, Array, ArrayRef};
26use arrow_data::{transform::MutableArrayData, ArrayData};
27use arrow_schema::DataType as ArrowType;
28
29pub struct FixedSizeListArrayReader {
31 item_reader: Box<dyn ArrayReader>,
32 fixed_size: usize,
34 data_type: ArrowType,
35 def_level: i16,
37 rep_level: i16,
39 nullable: bool,
41}
42
43impl FixedSizeListArrayReader {
44 pub fn new(
46 item_reader: Box<dyn ArrayReader>,
47 fixed_size: usize,
48 data_type: ArrowType,
49 def_level: i16,
50 rep_level: i16,
51 nullable: bool,
52 ) -> Self {
53 Self {
54 item_reader,
55 fixed_size,
56 data_type,
57 def_level,
58 rep_level,
59 nullable,
60 }
61 }
62}
63
64impl ArrayReader for FixedSizeListArrayReader {
65 fn as_any(&self) -> &dyn std::any::Any {
66 self
67 }
68
69 fn get_data_type(&self) -> &ArrowType {
70 &self.data_type
71 }
72
73 fn read_records(&mut self, batch_size: usize) -> Result<usize> {
74 let size = self.item_reader.read_records(batch_size)?;
75 Ok(size)
76 }
77
78 fn consume_batch(&mut self) -> Result<ArrayRef> {
79 let next_batch_array = self.item_reader.consume_batch()?;
80 if next_batch_array.is_empty() {
81 return Ok(new_empty_array(&self.data_type));
82 }
83
84 let def_levels = self
85 .get_def_levels()
86 .ok_or_else(|| general_err!("item_reader def levels are None"))?;
87 let rep_levels = self
88 .get_rep_levels()
89 .ok_or_else(|| general_err!("item_reader rep levels are None"))?;
90
91 if !rep_levels.is_empty() && rep_levels[0] != 0 {
92 return Err(general_err!("first repetition level of batch must be 0"));
95 }
96
97 let mut validity = self
98 .nullable
99 .then(|| BooleanBufferBuilder::new(next_batch_array.len()));
100
101 let data = next_batch_array.to_data();
102 let mut child_data_builder =
103 MutableArrayData::new(vec![&data], true, next_batch_array.len());
104
105 let mut child_idx = 0;
107 let mut list_len = 0;
109 let mut start_idx = None;
111 let mut row_len = 0;
112
113 def_levels.iter().zip(rep_levels).try_for_each(|(d, r)| {
114 match r.cmp(&self.rep_level) {
115 Ordering::Greater => {
116 if *d < self.def_level {
118 return Err(general_err!(
119 "Encountered repetition level too large for definition level"
120 ));
121 }
122 }
123 Ordering::Equal => {
124 child_idx += 1;
126 row_len += 1;
127 }
128 Ordering::Less => {
129 list_len += 1;
131
132 if start_idx.is_some() && row_len != self.fixed_size {
137 return Err(general_err!(
138 "Encountered misaligned row with length {} (expected length {})",
139 row_len,
140 self.fixed_size
141 ));
142 }
143 row_len = 0;
144
145 if *d >= self.def_level {
146 row_len += 1;
147
148 if let Some(validity) = validity.as_mut() {
150 validity.append(true);
151 }
152 start_idx.get_or_insert(child_idx);
154 } else {
155 if let Some(start) = start_idx.take() {
158 child_data_builder.extend(0, start, child_idx);
160 }
161 child_data_builder.extend_nulls(self.fixed_size);
163
164 if let Some(validity) = validity.as_mut() {
165 validity.append(*d + 1 == self.def_level);
167 }
168 }
169 child_idx += 1;
170 }
171 }
172 Ok(())
173 })?;
174
175 let child_data = match start_idx {
176 Some(0) => {
177 next_batch_array.to_data()
179 }
180 Some(start) => {
181 child_data_builder.extend(0, start, child_idx);
183 child_data_builder.freeze()
184 }
185 None => child_data_builder.freeze(),
186 };
187
188 if list_len * self.fixed_size != child_data.len() {
190 return Err(general_err!(
191 "fixed-size list length must be a multiple of {} but array contains {} elements",
192 self.fixed_size,
193 child_data.len()
194 ));
195 }
196
197 let mut list_builder = ArrayData::builder(self.get_data_type().clone())
198 .len(list_len)
199 .add_child_data(child_data);
200
201 if let Some(builder) = validity {
202 list_builder = list_builder.null_bit_buffer(Some(builder.into()));
203 }
204
205 let list_data = unsafe { list_builder.build_unchecked() };
206
207 let result_array = FixedSizeListArray::from(list_data);
208 Ok(Arc::new(result_array))
209 }
210
211 fn skip_records(&mut self, num_records: usize) -> Result<usize> {
212 self.item_reader.skip_records(num_records)
213 }
214
215 fn get_def_levels(&self) -> Option<&[i16]> {
216 self.item_reader.get_def_levels()
217 }
218
219 fn get_rep_levels(&self) -> Option<&[i16]> {
220 self.item_reader.get_rep_levels()
221 }
222}
223
#[cfg(test)]
mod tests {
    use super::*;
    use crate::arrow::{
        array_reader::{test_util::InMemoryArrayReader, ListArrayReader},
        arrow_reader::{ArrowReaderBuilder, ArrowReaderOptions, ParquetRecordBatchReader},
        ArrowWriter,
    };
    use arrow::datatypes::{Field, Int32Type};
    use arrow_array::{
        builder::{FixedSizeListBuilder, Int32Builder, ListBuilder},
        cast::AsArray,
        FixedSizeListArray, ListArray, PrimitiveArray, RecordBatch,
    };
    use arrow_buffer::Buffer;
    use arrow_data::ArrayDataBuilder;
    use arrow_schema::Schema;
    use bytes::Bytes;

    /// Reads the given `Int32` items and levels through a non-nullable
    /// `FixedSizeListArrayReader` (def level 1, rep level 1) and returns the error message.
    ///
    /// Panics if reading succeeds.
    fn read_error(
        values: Vec<Option<i32>>,
        def_levels: Vec<i16>,
        rep_levels: Vec<i16>,
        fixed_size: usize,
    ) -> String {
        let array = Arc::new(PrimitiveArray::<Int32Type>::from(values));
        let item_array_reader = InMemoryArrayReader::new(
            ArrowType::Int32,
            array,
            Some(def_levels),
            Some(rep_levels),
        );
        let list_type = ArrowType::FixedSizeList(
            Arc::new(Field::new_list_field(ArrowType::Int32, true)),
            i32::try_from(fixed_size).unwrap(),
        );
        let mut reader = FixedSizeListArrayReader::new(
            Box::new(item_array_reader),
            fixed_size,
            list_type,
            1,
            1,
            false,
        );
        reader.next_batch(1024).unwrap_err().to_string()
    }

    #[test]
    fn test_nullable_list() {
        // [null, [1, null, 2], null, [3, 4, 5], [null, null, null]]
        let expected = FixedSizeListArray::from_iter_primitive::<Int32Type, _, _>(
            vec![
                None,
                Some([Some(1), None, Some(2)]),
                None,
                Some([Some(3), Some(4), Some(5)]),
                Some([None, None, None]),
            ],
            3,
        );

        // Each null list row contributes a single placeholder entry to the item array.
        let array = Arc::new(PrimitiveArray::<Int32Type>::from(vec![
            None,
            Some(1),
            None,
            Some(2),
            None,
            Some(3),
            Some(4),
            Some(5),
            None,
            None,
            None,
        ]));
        let item_array_reader = InMemoryArrayReader::new(
            ArrowType::Int32,
            array,
            Some(vec![0, 3, 2, 3, 0, 3, 3, 3, 2, 2, 2]),
            Some(vec![0, 0, 1, 1, 0, 0, 1, 1, 0, 1, 1]),
        );

        let mut list_array_reader = FixedSizeListArrayReader::new(
            Box::new(item_array_reader),
            3,
            ArrowType::FixedSizeList(Arc::new(Field::new_list_field(ArrowType::Int32, true)), 3),
            2,
            1,
            true,
        );
        let actual = list_array_reader.next_batch(1024).unwrap();
        let actual = actual
            .as_any()
            .downcast_ref::<FixedSizeListArray>()
            .unwrap();
        assert_eq!(&expected, actual)
    }

    #[test]
    fn test_required_list() {
        // [[1, null], [2, 3], [null, null], [4, 5]]
        let expected = FixedSizeListArray::from_iter_primitive::<Int32Type, _, _>(
            vec![
                Some([Some(1), None]),
                Some([Some(2), Some(3)]),
                Some([None, None]),
                Some([Some(4), Some(5)]),
            ],
            2,
        );

        let array = Arc::new(PrimitiveArray::<Int32Type>::from(vec![
            Some(1),
            None,
            Some(2),
            Some(3),
            None,
            None,
            Some(4),
            Some(5),
        ]));
        let item_array_reader = InMemoryArrayReader::new(
            ArrowType::Int32,
            array,
            Some(vec![2, 1, 2, 2, 1, 1, 2, 2]),
            Some(vec![0, 1, 0, 1, 0, 1, 0, 1]),
        );

        let mut list_array_reader = FixedSizeListArrayReader::new(
            Box::new(item_array_reader),
            2,
            ArrowType::FixedSizeList(Arc::new(Field::new_list_field(ArrowType::Int32, true)), 2),
            1,
            1,
            false,
        );
        let actual = list_array_reader.next_batch(1024).unwrap();
        let actual = actual
            .as_any()
            .downcast_ref::<FixedSizeListArray>()
            .unwrap();
        assert_eq!(&expected, actual)
    }

    #[test]
    fn test_nested_list() {
        // A nullable FixedSizeList(1) of non-null FixedSizeList(2) of nullable Int32.
        let l2_type =
            ArrowType::FixedSizeList(Arc::new(Field::new_list_field(ArrowType::Int32, true)), 2);
        let l1_type =
            ArrowType::FixedSizeList(Arc::new(Field::new_list_field(l2_type.clone(), false)), 1);

        let array = PrimitiveArray::<Int32Type>::from(vec![
            None,
            None,
            Some(1),
            Some(2),
            None,
            Some(3),
            None,
            None,
            Some(4),
            Some(5),
            None,
            None,
        ]);

        let l2 = ArrayDataBuilder::new(l2_type.clone())
            .len(6)
            .add_child_data(array.into_data())
            .build()
            .unwrap();

        let l1 = ArrayDataBuilder::new(l1_type.clone())
            .len(6)
            .add_child_data(l2)
            .null_bit_buffer(Some(Buffer::from([0b110110])))
            .build()
            .unwrap();

        let expected = FixedSizeListArray::from(l1);

        let values = Arc::new(PrimitiveArray::<Int32Type>::from(vec![
            None,
            Some(1),
            Some(2),
            None,
            Some(3),
            None,
            Some(4),
            Some(5),
            None,
            None,
        ]));

        let item_array_reader = InMemoryArrayReader::new(
            ArrowType::Int32,
            values,
            Some(vec![0, 5, 5, 4, 5, 0, 5, 5, 4, 4]),
            Some(vec![0, 0, 2, 0, 2, 0, 0, 2, 0, 2]),
        );

        let l2 =
            FixedSizeListArrayReader::new(Box::new(item_array_reader), 2, l2_type, 4, 2, false);
        let mut l1 = FixedSizeListArrayReader::new(Box::new(l2), 1, l1_type, 3, 1, true);

        let expected_1 = expected.slice(0, 2);
        let expected_2 = expected.slice(2, 4);

        // Reading in two batches exercises record delimiting across batch boundaries.
        let actual = l1.next_batch(2).unwrap();
        assert_eq!(actual.as_ref(), &expected_1);

        let actual = l1.next_batch(1024).unwrap();
        assert_eq!(actual.as_ref(), &expected_2);
    }

    #[test]
    fn test_empty_list() {
        // [null, [], null, []] with a list size of zero.
        let expected = FixedSizeListArray::from_iter_primitive::<Int32Type, _, _>(
            vec![None, Some([]), None, Some([])],
            0,
        );

        let array = Arc::new(PrimitiveArray::<Int32Type>::from(vec![
            None, None, None, None,
        ]));
        let item_array_reader = InMemoryArrayReader::new(
            ArrowType::Int32,
            array,
            Some(vec![0, 1, 0, 1]),
            Some(vec![0, 0, 0, 0]),
        );

        let mut list_array_reader = FixedSizeListArrayReader::new(
            Box::new(item_array_reader),
            0,
            ArrowType::FixedSizeList(Arc::new(Field::new_list_field(ArrowType::Int32, true)), 0),
            2,
            1,
            true,
        );
        let actual = list_array_reader.next_batch(1024).unwrap();
        let actual = actual
            .as_any()
            .downcast_ref::<FixedSizeListArray>()
            .unwrap();
        assert_eq!(&expected, actual)
    }

    #[test]
    fn test_nested_var_list() {
        // [[[1, null, 3], null], [[4], []], [[5, 6], [null, null]], null]
        let mut builder = FixedSizeListBuilder::new(ListBuilder::new(Int32Builder::new()), 2);
        builder.values().append_value([Some(1), None, Some(3)]);
        builder.values().append_null();
        builder.append(true);
        builder.values().append_value([Some(4)]);
        builder.values().append_value([]);
        builder.append(true);
        builder.values().append_value([Some(5), Some(6)]);
        builder.values().append_value([None, None]);
        builder.append(true);
        builder.values().append_null();
        builder.values().append_null();
        builder.append(false);
        let expected = builder.finish();

        let array = Arc::new(PrimitiveArray::<Int32Type>::from(vec![
            Some(1),
            None,
            Some(3),
            None,
            Some(4),
            None,
            Some(5),
            Some(6),
            None,
            None,
            None,
        ]));

        let inner_type = ArrowType::List(Arc::new(Field::new_list_field(ArrowType::Int32, true)));
        let list_type =
            ArrowType::FixedSizeList(Arc::new(Field::new_list_field(inner_type.clone(), true)), 2);

        let item_array_reader = InMemoryArrayReader::new(
            ArrowType::Int32,
            array,
            Some(vec![5, 4, 5, 2, 5, 3, 5, 5, 4, 4, 0]),
            Some(vec![0, 2, 2, 1, 0, 1, 0, 2, 1, 2, 0]),
        );

        let inner_array_reader =
            ListArrayReader::<i32>::new(Box::new(item_array_reader), inner_type, 4, 2, true);

        let mut list_array_reader =
            FixedSizeListArrayReader::new(Box::new(inner_array_reader), 2, list_type, 2, 1, true);
        let actual = list_array_reader.next_batch(1024).unwrap();
        let actual = actual
            .as_any()
            .downcast_ref::<FixedSizeListArray>()
            .unwrap();
        assert_eq!(&expected, actual)
    }

    #[test]
    fn test_read_list_column() {
        // Round-trip a nullable FixedSizeList(4) column alongside a primitive column.
        let list = FixedSizeListArray::from_iter_primitive::<Int32Type, _, _>(
            vec![
                Some(vec![Some(1), Some(2), Some(3), None]),
                Some(vec![Some(5), Some(6), Some(7), Some(8)]),
                None,
                Some(vec![Some(9), None, Some(11), Some(12)]),
                Some(vec![None, None, None, None]),
            ],
            4,
        );

        let primitive =
            PrimitiveArray::<Int32Type>::from_iter(vec![None, Some(2), Some(3), None, Some(5)]);

        let schema = Arc::new(Schema::new(vec![
            Field::new(
                "list",
                ArrowType::FixedSizeList(
                    Arc::new(Field::new_list_field(ArrowType::Int32, true)),
                    4,
                ),
                true,
            ),
            Field::new("primitive", ArrowType::Int32, true),
        ]));

        let batch = RecordBatch::try_new(
            schema.clone(),
            vec![Arc::new(list.clone()), Arc::new(primitive.clone())],
        )
        .expect("unable to create record batch");

        let mut buffer = Vec::with_capacity(1024);
        let mut writer = ArrowWriter::try_new(&mut buffer, schema.clone(), None)
            .expect("unable to create parquet writer");
        writer.write(&batch).expect("unable to write record batch");
        writer.close().expect("unable to close parquet writer");

        let reader = Bytes::from(buffer);
        let mut batch_reader = ParquetRecordBatchReader::try_new(reader, 1024)
            .expect("unable to create parquet reader");
        let actual = batch_reader
            .next()
            .expect("missing record batch")
            .expect("unable to read record batch");

        assert_eq!(schema, actual.schema());
        let actual_list = actual
            .column(0)
            .as_any()
            .downcast_ref::<FixedSizeListArray>()
            .expect("unable to cast array to FixedSizeListArray");
        let actual_primitive = actual.column(1).as_primitive::<Int32Type>();
        assert_eq!(actual_list, &list);
        assert_eq!(actual_primitive, &primitive);
    }

    #[test]
    fn test_read_as_dyn_list() {
        // Without the embedded Arrow schema the column is read back as a variable-size list.
        let list = FixedSizeListArray::from_iter_primitive::<Int32Type, _, _>(
            vec![
                Some(vec![Some(1), Some(2), Some(3), None]),
                Some(vec![Some(5), Some(6), Some(7), Some(8)]),
                None,
                Some(vec![Some(9), None, Some(11), Some(12)]),
                Some(vec![None, None, None, None]),
            ],
            4,
        );

        let schema = Arc::new(Schema::new(vec![Field::new(
            "list",
            ArrowType::FixedSizeList(Arc::new(Field::new_list_field(ArrowType::Int32, true)), 4),
            true,
        )]));

        let batch = RecordBatch::try_new(schema.clone(), vec![Arc::new(list)]).unwrap();

        let mut buffer = Vec::with_capacity(1024);
        let mut writer = ArrowWriter::try_new(&mut buffer, schema, None)
            .expect("unable to create parquet writer");
        writer.write(&batch).expect("unable to write record batch");
        writer.close().expect("unable to close parquet writer");

        let reader = Bytes::from(buffer);
        let mut batch_reader = ArrowReaderBuilder::try_new_with_options(
            reader,
            ArrowReaderOptions::new().with_skip_arrow_metadata(true),
        )
        .expect("unable to create reader builder")
        .build()
        .expect("unable to create parquet reader");
        let actual = batch_reader
            .next()
            .expect("missing record batch")
            .expect("unable to read record batch");

        let col = actual.column(0).as_list::<i32>();
        let expected = ListArray::from_iter_primitive::<Int32Type, _, _>(vec![
            Some(vec![Some(1), Some(2), Some(3), None]),
            Some(vec![Some(5), Some(6), Some(7), Some(8)]),
            None,
            Some(vec![Some(9), None, Some(11), Some(12)]),
            Some(vec![None, None, None, None]),
        ]);
        assert_eq!(col, &expected);
    }

    #[test]
    fn test_misaligned_row_is_rejected() {
        // The first row holds three items but the list size is two.
        let err = read_error(
            vec![Some(1), Some(2), Some(3), Some(4), Some(5)],
            vec![1, 1, 1, 1, 1],
            vec![0, 1, 1, 0, 1],
            2,
        );
        assert!(
            err.contains("misaligned row with length 3 (expected length 2)"),
            "{err}"
        );
    }

    #[test]
    fn test_short_trailing_row_is_rejected() {
        // The last row holds a single item but the list size is two; `read_error` panics
        // if this is accepted.
        let err = read_error(vec![Some(1), Some(2), Some(3)], vec![1, 1, 1], vec![0, 1, 0], 2);
        assert!(!err.is_empty());
    }

    #[test]
    fn test_batch_not_starting_at_record_boundary_is_rejected() {
        let err = read_error(vec![Some(1), Some(2)], vec![1, 1], vec![1, 1], 2);
        assert!(
            err.contains("first repetition level of batch must be 0"),
            "{err}"
        );
    }

    #[test]
    fn test_undefined_nested_level_is_rejected() {
        // A level deeper than this list's repetition level must be defined at least to the
        // list's definition level.
        let err = read_error(vec![Some(1), None], vec![1, 0], vec![0, 2], 2);
        assert!(
            err.contains("repetition level too large for definition level"),
            "{err}"
        );
    }
}