1use crate::errors::{ParquetError, Result};
44use arrow_array::cast::AsArray;
45use arrow_array::{Array, ArrayRef, OffsetSizeTrait};
46use arrow_buffer::{NullBuffer, OffsetBuffer};
47use arrow_schema::{DataType, Field};
48use std::ops::Range;
49use std::sync::Arc;
50
51pub(crate) fn calculate_array_levels(array: &ArrayRef, field: &Field) -> Result<Vec<ArrayLevels>> {
54 let mut builder = LevelInfoBuilder::try_new(field, Default::default(), array)?;
55 builder.write(0..array.len());
56 Ok(builder.finish())
57}
58
59fn is_leaf(data_type: &DataType) -> bool {
62 matches!(
63 data_type,
64 DataType::Null
65 | DataType::Boolean
66 | DataType::Int8
67 | DataType::Int16
68 | DataType::Int32
69 | DataType::Int64
70 | DataType::UInt8
71 | DataType::UInt16
72 | DataType::UInt32
73 | DataType::UInt64
74 | DataType::Float16
75 | DataType::Float32
76 | DataType::Float64
77 | DataType::Utf8
78 | DataType::Utf8View
79 | DataType::LargeUtf8
80 | DataType::Timestamp(_, _)
81 | DataType::Date32
82 | DataType::Date64
83 | DataType::Time32(_)
84 | DataType::Time64(_)
85 | DataType::Duration(_)
86 | DataType::Interval(_)
87 | DataType::Binary
88 | DataType::LargeBinary
89 | DataType::BinaryView
90 | DataType::Decimal128(_, _)
91 | DataType::Decimal256(_, _)
92 | DataType::FixedSizeBinary(_)
93 )
94}
95
96#[derive(Debug, Default, Clone, Copy)]
98struct LevelContext {
99 rep_level: i16,
101 def_level: i16,
103}
104
105#[derive(Debug)]
107enum LevelInfoBuilder {
108 Primitive(ArrayLevels),
110 List(
112 Box<LevelInfoBuilder>, LevelContext, OffsetBuffer<i32>, Option<NullBuffer>, ),
117 LargeList(
119 Box<LevelInfoBuilder>, LevelContext, OffsetBuffer<i64>, Option<NullBuffer>, ),
124 FixedSizeList(
126 Box<LevelInfoBuilder>, LevelContext, usize, Option<NullBuffer>, ),
131 Struct(Vec<LevelInfoBuilder>, LevelContext, Option<NullBuffer>),
133}
134
135impl LevelInfoBuilder {
136 fn try_new(field: &Field, parent_ctx: LevelContext, array: &ArrayRef) -> Result<Self> {
138 if field.data_type() != array.data_type() {
139 return Err(arrow_err!(format!(
140 "Incompatible type. Field '{}' has type {}, array has type {}",
141 field.name(),
142 field.data_type(),
143 array.data_type(),
144 )));
145 }
146
147 let is_nullable = field.is_nullable();
148
149 match array.data_type() {
150 d if is_leaf(d) => {
151 let levels = ArrayLevels::new(parent_ctx, is_nullable, array.clone());
152 Ok(Self::Primitive(levels))
153 }
154 DataType::Dictionary(_, v) if is_leaf(v.as_ref()) => {
155 let levels = ArrayLevels::new(parent_ctx, is_nullable, array.clone());
156 Ok(Self::Primitive(levels))
157 }
158 DataType::Struct(children) => {
159 let array = array.as_struct();
160 let def_level = match is_nullable {
161 true => parent_ctx.def_level + 1,
162 false => parent_ctx.def_level,
163 };
164
165 let ctx = LevelContext {
166 rep_level: parent_ctx.rep_level,
167 def_level,
168 };
169
170 let children = children
171 .iter()
172 .zip(array.columns())
173 .map(|(f, a)| Self::try_new(f, ctx, a))
174 .collect::<Result<_>>()?;
175
176 Ok(Self::Struct(children, ctx, array.nulls().cloned()))
177 }
178 DataType::List(child)
179 | DataType::LargeList(child)
180 | DataType::Map(child, _)
181 | DataType::FixedSizeList(child, _) => {
182 let def_level = match is_nullable {
183 true => parent_ctx.def_level + 2,
184 false => parent_ctx.def_level + 1,
185 };
186
187 let ctx = LevelContext {
188 rep_level: parent_ctx.rep_level + 1,
189 def_level,
190 };
191
192 Ok(match field.data_type() {
193 DataType::List(_) => {
194 let list = array.as_list();
195 let child = Self::try_new(child.as_ref(), ctx, list.values())?;
196 let offsets = list.offsets().clone();
197 Self::List(Box::new(child), ctx, offsets, list.nulls().cloned())
198 }
199 DataType::LargeList(_) => {
200 let list = array.as_list();
201 let child = Self::try_new(child.as_ref(), ctx, list.values())?;
202 let offsets = list.offsets().clone();
203 let nulls = list.nulls().cloned();
204 Self::LargeList(Box::new(child), ctx, offsets, nulls)
205 }
206 DataType::Map(_, _) => {
207 let map = array.as_map();
208 let entries = Arc::new(map.entries().clone()) as ArrayRef;
209 let child = Self::try_new(child.as_ref(), ctx, &entries)?;
210 let offsets = map.offsets().clone();
211 Self::List(Box::new(child), ctx, offsets, map.nulls().cloned())
212 }
213 DataType::FixedSizeList(_, size) => {
214 let list = array.as_fixed_size_list();
215 let child = Self::try_new(child.as_ref(), ctx, list.values())?;
216 let nulls = list.nulls().cloned();
217 Self::FixedSizeList(Box::new(child), ctx, *size as _, nulls)
218 }
219 _ => unreachable!(),
220 })
221 }
222 d => Err(nyi_err!("Datatype {} is not yet supported", d)),
223 }
224 }
225
226 fn finish(self) -> Vec<ArrayLevels> {
229 match self {
230 LevelInfoBuilder::Primitive(v) => vec![v],
231 LevelInfoBuilder::List(v, _, _, _)
232 | LevelInfoBuilder::LargeList(v, _, _, _)
233 | LevelInfoBuilder::FixedSizeList(v, _, _, _) => v.finish(),
234 LevelInfoBuilder::Struct(v, _, _) => v.into_iter().flat_map(|l| l.finish()).collect(),
235 }
236 }
237
238 fn write(&mut self, range: Range<usize>) {
240 match self {
241 LevelInfoBuilder::Primitive(info) => Self::write_leaf(info, range),
242 LevelInfoBuilder::List(child, ctx, offsets, nulls) => {
243 Self::write_list(child, ctx, offsets, nulls.as_ref(), range)
244 }
245 LevelInfoBuilder::LargeList(child, ctx, offsets, nulls) => {
246 Self::write_list(child, ctx, offsets, nulls.as_ref(), range)
247 }
248 LevelInfoBuilder::FixedSizeList(child, ctx, size, nulls) => {
249 Self::write_fixed_size_list(child, ctx, *size, nulls.as_ref(), range)
250 }
251 LevelInfoBuilder::Struct(children, ctx, nulls) => {
252 Self::write_struct(children, ctx, nulls.as_ref(), range)
253 }
254 }
255 }
256
257 fn write_list<O: OffsetSizeTrait>(
261 child: &mut LevelInfoBuilder,
262 ctx: &LevelContext,
263 offsets: &[O],
264 nulls: Option<&NullBuffer>,
265 range: Range<usize>,
266 ) {
267 let offsets = &offsets[range.start..range.end + 1];
268
269 let write_non_null_slice =
270 |child: &mut LevelInfoBuilder, start_idx: usize, end_idx: usize| {
271 child.write(start_idx..end_idx);
272 child.visit_leaves(|leaf| {
273 let rep_levels = leaf.rep_levels.as_mut().unwrap();
274 let mut rev = rep_levels.iter_mut().rev();
275 let mut remaining = end_idx - start_idx;
276
277 loop {
278 let next = rev.next().unwrap();
279 if *next > ctx.rep_level {
280 continue;
282 }
283
284 remaining -= 1;
285 if remaining == 0 {
286 *next = ctx.rep_level - 1;
287 break;
288 }
289 }
290 })
291 };
292
293 let write_empty_slice = |child: &mut LevelInfoBuilder| {
294 child.visit_leaves(|leaf| {
295 let rep_levels = leaf.rep_levels.as_mut().unwrap();
296 rep_levels.push(ctx.rep_level - 1);
297 let def_levels = leaf.def_levels.as_mut().unwrap();
298 def_levels.push(ctx.def_level - 1);
299 })
300 };
301
302 let write_null_slice = |child: &mut LevelInfoBuilder| {
303 child.visit_leaves(|leaf| {
304 let rep_levels = leaf.rep_levels.as_mut().unwrap();
305 rep_levels.push(ctx.rep_level - 1);
306 let def_levels = leaf.def_levels.as_mut().unwrap();
307 def_levels.push(ctx.def_level - 2);
308 })
309 };
310
311 match nulls {
312 Some(nulls) => {
313 let null_offset = range.start;
314 for (idx, w) in offsets.windows(2).enumerate() {
316 let is_valid = nulls.is_valid(idx + null_offset);
317 let start_idx = w[0].as_usize();
318 let end_idx = w[1].as_usize();
319 if !is_valid {
320 write_null_slice(child)
321 } else if start_idx == end_idx {
322 write_empty_slice(child)
323 } else {
324 write_non_null_slice(child, start_idx, end_idx)
325 }
326 }
327 }
328 None => {
329 for w in offsets.windows(2) {
330 let start_idx = w[0].as_usize();
331 let end_idx = w[1].as_usize();
332 if start_idx == end_idx {
333 write_empty_slice(child)
334 } else {
335 write_non_null_slice(child, start_idx, end_idx)
336 }
337 }
338 }
339 }
340 }
341
342 fn write_struct(
344 children: &mut [LevelInfoBuilder],
345 ctx: &LevelContext,
346 nulls: Option<&NullBuffer>,
347 range: Range<usize>,
348 ) {
349 let write_null = |children: &mut [LevelInfoBuilder], range: Range<usize>| {
350 for child in children {
351 child.visit_leaves(|info| {
352 let len = range.end - range.start;
353
354 let def_levels = info.def_levels.as_mut().unwrap();
355 def_levels.extend(std::iter::repeat(ctx.def_level - 1).take(len));
356
357 if let Some(rep_levels) = info.rep_levels.as_mut() {
358 rep_levels.extend(std::iter::repeat(ctx.rep_level).take(len));
359 }
360 })
361 }
362 };
363
364 let write_non_null = |children: &mut [LevelInfoBuilder], range: Range<usize>| {
365 for child in children {
366 child.write(range.clone())
367 }
368 };
369
370 match nulls {
371 Some(validity) => {
372 let mut last_non_null_idx = None;
373 let mut last_null_idx = None;
374
375 for i in range.clone() {
377 match validity.is_valid(i) {
378 true => {
379 if let Some(last_idx) = last_null_idx.take() {
380 write_null(children, last_idx..i)
381 }
382 last_non_null_idx.get_or_insert(i);
383 }
384 false => {
385 if let Some(last_idx) = last_non_null_idx.take() {
386 write_non_null(children, last_idx..i)
387 }
388 last_null_idx.get_or_insert(i);
389 }
390 }
391 }
392
393 if let Some(last_idx) = last_null_idx.take() {
394 write_null(children, last_idx..range.end)
395 }
396
397 if let Some(last_idx) = last_non_null_idx.take() {
398 write_non_null(children, last_idx..range.end)
399 }
400 }
401 None => write_non_null(children, range),
402 }
403 }
404
405 fn write_fixed_size_list(
407 child: &mut LevelInfoBuilder,
408 ctx: &LevelContext,
409 fixed_size: usize,
410 nulls: Option<&NullBuffer>,
411 range: Range<usize>,
412 ) {
413 let write_non_null = |child: &mut LevelInfoBuilder, start_idx: usize, end_idx: usize| {
414 let values_start = start_idx * fixed_size;
415 let values_end = end_idx * fixed_size;
416 child.write(values_start..values_end);
417
418 child.visit_leaves(|leaf| {
419 let rep_levels = leaf.rep_levels.as_mut().unwrap();
420
421 let row_indices = (0..fixed_size)
422 .rev()
423 .cycle()
424 .take(values_end - values_start);
425
426 rep_levels
428 .iter_mut()
429 .rev()
430 .filter(|&&mut r| r == ctx.rep_level)
432 .zip(row_indices)
433 .for_each(|(r, idx)| {
434 if idx == 0 {
435 *r = ctx.rep_level - 1;
436 }
437 });
438 })
439 };
440
441 let write_empty = |child: &mut LevelInfoBuilder, start_idx: usize, end_idx: usize| {
443 let len = end_idx - start_idx;
444 child.visit_leaves(|leaf| {
445 let rep_levels = leaf.rep_levels.as_mut().unwrap();
446 rep_levels.extend(std::iter::repeat(ctx.rep_level - 1).take(len));
447 let def_levels = leaf.def_levels.as_mut().unwrap();
448 def_levels.extend(std::iter::repeat(ctx.def_level - 1).take(len));
449 })
450 };
451
452 let write_rows = |child: &mut LevelInfoBuilder, start_idx: usize, end_idx: usize| {
453 if fixed_size > 0 {
454 write_non_null(child, start_idx, end_idx)
455 } else {
456 write_empty(child, start_idx, end_idx)
457 }
458 };
459
460 match nulls {
461 Some(nulls) => {
462 let mut start_idx = None;
463 for idx in range.clone() {
464 if nulls.is_valid(idx) {
465 start_idx.get_or_insert(idx);
467 } else {
468 if let Some(start) = start_idx.take() {
470 write_rows(child, start, idx);
471 }
472 child.visit_leaves(|leaf| {
474 let rep_levels = leaf.rep_levels.as_mut().unwrap();
475 rep_levels.push(ctx.rep_level - 1);
476 let def_levels = leaf.def_levels.as_mut().unwrap();
477 def_levels.push(ctx.def_level - 2);
478 })
479 }
480 }
481 if let Some(start) = start_idx.take() {
483 write_rows(child, start, range.end);
484 }
485 }
486 None => write_rows(child, range.start, range.end),
488 }
489 }
490
491 fn write_leaf(info: &mut ArrayLevels, range: Range<usize>) {
493 let len = range.end - range.start;
494
495 match &mut info.def_levels {
496 Some(def_levels) => {
497 def_levels.reserve(len);
498 info.non_null_indices.reserve(len);
499
500 match info.array.logical_nulls() {
501 Some(nulls) => {
502 for i in range {
504 match nulls.is_valid(i) {
505 true => {
506 def_levels.push(info.max_def_level);
507 info.non_null_indices.push(i)
508 }
509 false => def_levels.push(info.max_def_level - 1),
510 }
511 }
512 }
513 None => {
514 let iter = std::iter::repeat(info.max_def_level).take(len);
515 def_levels.extend(iter);
516 info.non_null_indices.extend(range);
517 }
518 }
519 }
520 None => info.non_null_indices.extend(range),
521 }
522
523 if let Some(rep_levels) = &mut info.rep_levels {
524 rep_levels.extend(std::iter::repeat(info.max_rep_level).take(len))
525 }
526 }
527
528 fn visit_leaves(&mut self, visit: impl Fn(&mut ArrayLevels) + Copy) {
530 match self {
531 LevelInfoBuilder::Primitive(info) => visit(info),
532 LevelInfoBuilder::List(c, _, _, _)
533 | LevelInfoBuilder::LargeList(c, _, _, _)
534 | LevelInfoBuilder::FixedSizeList(c, _, _, _) => c.visit_leaves(visit),
535 LevelInfoBuilder::Struct(children, _, _) => {
536 for c in children {
537 c.visit_leaves(visit)
538 }
539 }
540 }
541 }
542}
543#[derive(Debug, Clone)]
546pub(crate) struct ArrayLevels {
547 def_levels: Option<Vec<i16>>,
551
552 rep_levels: Option<Vec<i16>>,
556
557 non_null_indices: Vec<usize>,
560
561 max_def_level: i16,
563
564 max_rep_level: i16,
566
567 array: ArrayRef,
569}
570
571impl PartialEq for ArrayLevels {
572 fn eq(&self, other: &Self) -> bool {
573 self.def_levels == other.def_levels
574 && self.rep_levels == other.rep_levels
575 && self.non_null_indices == other.non_null_indices
576 && self.max_def_level == other.max_def_level
577 && self.max_rep_level == other.max_rep_level
578 && self.array.as_ref() == other.array.as_ref()
579 }
580}
581impl Eq for ArrayLevels {}
582
583impl ArrayLevels {
584 fn new(ctx: LevelContext, is_nullable: bool, array: ArrayRef) -> Self {
585 let max_rep_level = ctx.rep_level;
586 let max_def_level = match is_nullable {
587 true => ctx.def_level + 1,
588 false => ctx.def_level,
589 };
590
591 Self {
592 def_levels: (max_def_level != 0).then(Vec::new),
593 rep_levels: (max_rep_level != 0).then(Vec::new),
594 non_null_indices: vec![],
595 max_def_level,
596 max_rep_level,
597 array,
598 }
599 }
600
601 pub fn array(&self) -> &ArrayRef {
602 &self.array
603 }
604
605 pub fn def_levels(&self) -> Option<&[i16]> {
606 self.def_levels.as_deref()
607 }
608
609 pub fn rep_levels(&self) -> Option<&[i16]> {
610 self.rep_levels.as_deref()
611 }
612
613 pub fn non_null_indices(&self) -> &[usize] {
614 &self.non_null_indices
615 }
616}
617
618#[cfg(test)]
619mod tests {
620 use super::*;
621
622 use arrow_array::builder::*;
623 use arrow_array::types::Int32Type;
624 use arrow_array::*;
625 use arrow_buffer::{Buffer, ToByteSlice};
626 use arrow_cast::display::array_value_to_string;
627 use arrow_data::{ArrayData, ArrayDataBuilder};
628 use arrow_schema::{Fields, Schema};
629
630 #[test]
631 fn test_calculate_array_levels_twitter_example() {
632 let leaf_type = Field::new_list_field(DataType::Int32, false);
636 let inner_type = DataType::List(Arc::new(leaf_type));
637 let inner_field = Field::new("l2", inner_type.clone(), false);
638 let outer_type = DataType::List(Arc::new(inner_field));
639 let outer_field = Field::new("l1", outer_type.clone(), false);
640
641 let primitives = Int32Array::from_iter(0..10);
642
643 let offsets = Buffer::from_iter([0_i32, 3, 7, 8, 10]);
645 let inner_list = ArrayDataBuilder::new(inner_type)
646 .len(4)
647 .add_buffer(offsets)
648 .add_child_data(primitives.to_data())
649 .build()
650 .unwrap();
651
652 let offsets = Buffer::from_iter([0_i32, 2, 4]);
653 let outer_list = ArrayDataBuilder::new(outer_type)
654 .len(2)
655 .add_buffer(offsets)
656 .add_child_data(inner_list)
657 .build()
658 .unwrap();
659 let outer_list = make_array(outer_list);
660
661 let levels = calculate_array_levels(&outer_list, &outer_field).unwrap();
662 assert_eq!(levels.len(), 1);
663
664 let expected = ArrayLevels {
665 def_levels: Some(vec![2; 10]),
666 rep_levels: Some(vec![0, 2, 2, 1, 2, 2, 2, 0, 1, 2]),
667 non_null_indices: vec![0, 1, 2, 3, 4, 5, 6, 7, 8, 9],
668 max_def_level: 2,
669 max_rep_level: 2,
670 array: Arc::new(primitives),
671 };
672 assert_eq!(&levels[0], &expected);
673 }
674
675 #[test]
676 fn test_calculate_one_level_1() {
677 let array = Arc::new(Int32Array::from_iter(0..10)) as ArrayRef;
679 let field = Field::new_list_field(DataType::Int32, false);
680
681 let levels = calculate_array_levels(&array, &field).unwrap();
682 assert_eq!(levels.len(), 1);
683
684 let expected_levels = ArrayLevels {
685 def_levels: None,
686 rep_levels: None,
687 non_null_indices: (0..10).collect(),
688 max_def_level: 0,
689 max_rep_level: 0,
690 array,
691 };
692 assert_eq!(&levels[0], &expected_levels);
693 }
694
695 #[test]
696 fn test_calculate_one_level_2() {
697 let array = Arc::new(Int32Array::from_iter([
699 Some(0),
700 None,
701 Some(0),
702 Some(0),
703 None,
704 ])) as ArrayRef;
705 let field = Field::new_list_field(DataType::Int32, true);
706
707 let levels = calculate_array_levels(&array, &field).unwrap();
708 assert_eq!(levels.len(), 1);
709
710 let expected_levels = ArrayLevels {
711 def_levels: Some(vec![1, 0, 1, 1, 0]),
712 rep_levels: None,
713 non_null_indices: vec![0, 2, 3],
714 max_def_level: 1,
715 max_rep_level: 0,
716 array,
717 };
718 assert_eq!(&levels[0], &expected_levels);
719 }
720
721 #[test]
722 fn test_calculate_array_levels_1() {
723 let leaf_field = Field::new_list_field(DataType::Int32, false);
724 let list_type = DataType::List(Arc::new(leaf_field));
725
726 let leaf_array = Int32Array::from_iter(0..5);
730 let offsets = Buffer::from_iter(0_i32..6);
732 let list = ArrayDataBuilder::new(list_type.clone())
733 .len(5)
734 .add_buffer(offsets)
735 .add_child_data(leaf_array.to_data())
736 .build()
737 .unwrap();
738 let list = make_array(list);
739
740 let list_field = Field::new("list", list_type.clone(), false);
741 let levels = calculate_array_levels(&list, &list_field).unwrap();
742 assert_eq!(levels.len(), 1);
743
744 let expected_levels = ArrayLevels {
745 def_levels: Some(vec![1; 5]),
746 rep_levels: Some(vec![0; 5]),
747 non_null_indices: (0..5).collect(),
748 max_def_level: 1,
749 max_rep_level: 1,
750 array: Arc::new(leaf_array),
751 };
752 assert_eq!(&levels[0], &expected_levels);
753
754 let leaf_array = Int32Array::from_iter([0, 0, 2, 2, 3, 3, 3, 3, 4, 4, 4]);
763 let offsets = Buffer::from_iter([0_i32, 2, 2, 4, 8, 11]);
764 let list = ArrayDataBuilder::new(list_type.clone())
765 .len(5)
766 .add_buffer(offsets)
767 .add_child_data(leaf_array.to_data())
768 .null_bit_buffer(Some(Buffer::from([0b00011101])))
769 .build()
770 .unwrap();
771 let list = make_array(list);
772
773 let list_field = Field::new("list", list_type, true);
774 let levels = calculate_array_levels(&list, &list_field).unwrap();
775 assert_eq!(levels.len(), 1);
776
777 let expected_levels = ArrayLevels {
778 def_levels: Some(vec![2, 2, 0, 2, 2, 2, 2, 2, 2, 2, 2, 2]),
779 rep_levels: Some(vec![0, 1, 0, 0, 1, 0, 1, 1, 1, 0, 1, 1]),
780 non_null_indices: (0..11).collect(),
781 max_def_level: 2,
782 max_rep_level: 1,
783 array: Arc::new(leaf_array),
784 };
785 assert_eq!(&levels[0], &expected_levels);
786 }
787
788 #[test]
789 fn test_calculate_array_levels_2() {
790 let leaf = Int32Array::from_iter(0..11);
804 let leaf_field = Field::new("leaf", DataType::Int32, false);
805
806 let list_type = DataType::List(Arc::new(leaf_field));
807 let list = ArrayData::builder(list_type.clone())
808 .len(5)
809 .add_child_data(leaf.to_data())
810 .add_buffer(Buffer::from_iter([0_i32, 2, 2, 4, 8, 11]))
811 .build()
812 .unwrap();
813
814 let list = make_array(list);
815 let list_field = Arc::new(Field::new("list", list_type, true));
816
817 let struct_array =
818 StructArray::from((vec![(list_field, list)], Buffer::from([0b00011010])));
819 let array = Arc::new(struct_array) as ArrayRef;
820
821 let struct_field = Field::new("struct", array.data_type().clone(), true);
822
823 let levels = calculate_array_levels(&array, &struct_field).unwrap();
824 assert_eq!(levels.len(), 1);
825
826 let expected_levels = ArrayLevels {
827 def_levels: Some(vec![0, 2, 0, 3, 3, 3, 3, 3, 3, 3]),
828 rep_levels: Some(vec![0, 0, 0, 0, 1, 1, 1, 0, 1, 1]),
829 non_null_indices: (4..11).collect(),
830 max_def_level: 3,
831 max_rep_level: 1,
832 array: Arc::new(leaf),
833 };
834
835 assert_eq!(&levels[0], &expected_levels);
836
837 let leaf = Int32Array::from_iter(100..122);
846 let leaf_field = Field::new("leaf", DataType::Int32, true);
847
848 let l1_type = DataType::List(Arc::new(leaf_field));
849 let offsets = Buffer::from_iter([0_i32, 2, 4, 6, 8, 10, 12, 14, 16, 18, 20, 22]);
850 let l1 = ArrayData::builder(l1_type.clone())
851 .len(11)
852 .add_child_data(leaf.to_data())
853 .add_buffer(offsets)
854 .build()
855 .unwrap();
856
857 let l1_field = Field::new("l1", l1_type, true);
858 let l2_type = DataType::List(Arc::new(l1_field));
859 let l2 = ArrayData::builder(l2_type)
860 .len(5)
861 .add_child_data(l1)
862 .add_buffer(Buffer::from_iter([0, 2, 2, 4, 8, 11]))
863 .build()
864 .unwrap();
865
866 let l2 = make_array(l2);
867 let l2_field = Field::new("l2", l2.data_type().clone(), true);
868
869 let levels = calculate_array_levels(&l2, &l2_field).unwrap();
870 assert_eq!(levels.len(), 1);
871
872 let expected_levels = ArrayLevels {
873 def_levels: Some(vec![
874 5, 5, 5, 5, 1, 5, 5, 5, 5, 5, 5, 5, 5, 5, 5, 5, 5, 5, 5, 5, 5, 5, 5,
875 ]),
876 rep_levels: Some(vec![
877 0, 2, 1, 2, 0, 0, 2, 1, 2, 0, 2, 1, 2, 1, 2, 1, 2, 0, 2, 1, 2, 1, 2,
878 ]),
879 non_null_indices: (0..22).collect(),
880 max_def_level: 5,
881 max_rep_level: 2,
882 array: Arc::new(leaf),
883 };
884
885 assert_eq!(&levels[0], &expected_levels);
886 }
887
888 #[test]
889 fn test_calculate_array_levels_nested_list() {
890 let leaf_field = Field::new("leaf", DataType::Int32, false);
891 let list_type = DataType::List(Arc::new(leaf_field));
892
893 let leaf = Int32Array::from_iter([0; 4]);
901 let list = ArrayData::builder(list_type.clone())
902 .len(4)
903 .add_buffer(Buffer::from_iter(0_i32..5))
904 .add_child_data(leaf.to_data())
905 .build()
906 .unwrap();
907 let list = make_array(list);
908
909 let list_field = Field::new("list", list_type.clone(), false);
910 let levels = calculate_array_levels(&list, &list_field).unwrap();
911 assert_eq!(levels.len(), 1);
912
913 let expected_levels = ArrayLevels {
914 def_levels: Some(vec![1; 4]),
915 rep_levels: Some(vec![0; 4]),
916 non_null_indices: (0..4).collect(),
917 max_def_level: 1,
918 max_rep_level: 1,
919 array: Arc::new(leaf),
920 };
921 assert_eq!(&levels[0], &expected_levels);
922
923 let leaf = Int32Array::from_iter(0..8);
928 let list = ArrayData::builder(list_type.clone())
929 .len(4)
930 .add_buffer(Buffer::from_iter([0_i32, 0, 3, 5, 7]))
931 .null_bit_buffer(Some(Buffer::from([0b00001110])))
932 .add_child_data(leaf.to_data())
933 .build()
934 .unwrap();
935 let list = make_array(list);
936 let list_field = Arc::new(Field::new("list", list_type, true));
937
938 let struct_array = StructArray::from(vec![(list_field, list)]);
939 let array = Arc::new(struct_array) as ArrayRef;
940
941 let struct_field = Field::new("struct", array.data_type().clone(), true);
942 let levels = calculate_array_levels(&array, &struct_field).unwrap();
943 assert_eq!(levels.len(), 1);
944
945 let expected_levels = ArrayLevels {
946 def_levels: Some(vec![1, 3, 3, 3, 3, 3, 3, 3]),
947 rep_levels: Some(vec![0, 0, 1, 1, 0, 1, 0, 1]),
948 non_null_indices: (0..7).collect(),
949 max_def_level: 3,
950 max_rep_level: 1,
951 array: Arc::new(leaf),
952 };
953 assert_eq!(&levels[0], &expected_levels);
954
955 let leaf = Int32Array::from_iter(201..216);
963 let leaf_field = Field::new("leaf", DataType::Int32, false);
964 let list_1_type = DataType::List(Arc::new(leaf_field));
965 let list_1 = ArrayData::builder(list_1_type.clone())
966 .len(7)
967 .add_buffer(Buffer::from_iter([0_i32, 1, 3, 3, 6, 10, 10, 15]))
968 .add_child_data(leaf.to_data())
969 .build()
970 .unwrap();
971
972 let list_1_field = Field::new("l1", list_1_type, true);
973 let list_2_type = DataType::List(Arc::new(list_1_field));
974 let list_2 = ArrayData::builder(list_2_type.clone())
975 .len(4)
976 .add_buffer(Buffer::from_iter([0_i32, 0, 3, 5, 7]))
977 .null_bit_buffer(Some(Buffer::from([0b00001110])))
978 .add_child_data(list_1)
979 .build()
980 .unwrap();
981
982 let list_2 = make_array(list_2);
983 let list_2_field = Arc::new(Field::new("list_2", list_2_type, true));
984
985 let struct_array =
986 StructArray::from((vec![(list_2_field, list_2)], Buffer::from([0b00001111])));
987 let struct_field = Field::new("struct", struct_array.data_type().clone(), true);
988
989 let array = Arc::new(struct_array) as ArrayRef;
990 let levels = calculate_array_levels(&array, &struct_field).unwrap();
991 assert_eq!(levels.len(), 1);
992
993 let expected_levels = ArrayLevels {
994 def_levels: Some(vec![1, 5, 5, 5, 4, 5, 5, 5, 5, 5, 5, 5, 4, 5, 5, 5, 5, 5]),
995 rep_levels: Some(vec![0, 0, 1, 2, 1, 0, 2, 2, 1, 2, 2, 2, 0, 1, 2, 2, 2, 2]),
996 non_null_indices: (0..15).collect(),
997 max_def_level: 5,
998 max_rep_level: 2,
999 array: Arc::new(leaf),
1000 };
1001 assert_eq!(&levels[0], &expected_levels);
1002 }
1003
1004 #[test]
1005 fn test_calculate_nested_struct_levels() {
1006 let c = Int32Array::from_iter([Some(1), None, Some(3), None, Some(5), Some(6)]);
1016 let leaf = Arc::new(c) as ArrayRef;
1017 let c_field = Arc::new(Field::new("c", DataType::Int32, true));
1018 let b = StructArray::from(((vec![(c_field, leaf.clone())]), Buffer::from([0b00110111])));
1019
1020 let b_field = Arc::new(Field::new("b", b.data_type().clone(), true));
1021 let a = StructArray::from((
1022 (vec![(b_field, Arc::new(b) as ArrayRef)]),
1023 Buffer::from([0b00101111]),
1024 ));
1025
1026 let a_field = Field::new("a", a.data_type().clone(), true);
1027 let a_array = Arc::new(a) as ArrayRef;
1028
1029 let levels = calculate_array_levels(&a_array, &a_field).unwrap();
1030 assert_eq!(levels.len(), 1);
1031
1032 let expected_levels = ArrayLevels {
1033 def_levels: Some(vec![3, 2, 3, 1, 0, 3]),
1034 rep_levels: None,
1035 non_null_indices: vec![0, 2, 5],
1036 max_def_level: 3,
1037 max_rep_level: 0,
1038 array: leaf,
1039 };
1040 assert_eq!(&levels[0], &expected_levels);
1041 }
1042
1043 #[test]
1044 fn list_single_column() {
1045 let a_values = Int32Array::from(vec![1, 2, 3, 4, 5, 6, 7, 8, 9, 10]);
1048 let a_value_offsets = arrow::buffer::Buffer::from_iter([0_i32, 1, 3, 3, 6, 10]);
1049 let a_list_type = DataType::List(Arc::new(Field::new_list_field(DataType::Int32, true)));
1050 let a_list_data = ArrayData::builder(a_list_type.clone())
1051 .len(5)
1052 .add_buffer(a_value_offsets)
1053 .null_bit_buffer(Some(Buffer::from([0b00011011])))
1054 .add_child_data(a_values.to_data())
1055 .build()
1056 .unwrap();
1057
1058 assert_eq!(a_list_data.null_count(), 1);
1059
1060 let a = ListArray::from(a_list_data);
1061
1062 let item_field = Field::new_list_field(a_list_type, true);
1063 let mut builder = levels(&item_field, a);
1064 builder.write(2..4);
1065 let levels = builder.finish();
1066
1067 assert_eq!(levels.len(), 1);
1068
1069 let list_level = &levels[0];
1070
1071 let expected_level = ArrayLevels {
1072 def_levels: Some(vec![0, 3, 3, 3]),
1073 rep_levels: Some(vec![0, 0, 1, 1]),
1074 non_null_indices: vec![3, 4, 5],
1075 max_def_level: 3,
1076 max_rep_level: 1,
1077 array: Arc::new(a_values),
1078 };
1079 assert_eq!(list_level, &expected_level);
1080 }
1081
1082 #[test]
1083 fn mixed_struct_list() {
1084 let struct_field_d = Arc::new(Field::new("d", DataType::Float64, true));
1088 let struct_field_f = Arc::new(Field::new("f", DataType::Float32, true));
1089 let struct_field_g = Arc::new(Field::new(
1090 "g",
1091 DataType::List(Arc::new(Field::new("items", DataType::Int16, false))),
1092 false,
1093 ));
1094 let struct_field_e = Arc::new(Field::new(
1095 "e",
1096 DataType::Struct(vec![struct_field_f.clone(), struct_field_g.clone()].into()),
1097 true,
1098 ));
1099 let schema = Schema::new(vec![
1100 Field::new("a", DataType::Int32, false),
1101 Field::new("b", DataType::Int32, true),
1102 Field::new(
1103 "c",
1104 DataType::Struct(vec![struct_field_d.clone(), struct_field_e.clone()].into()),
1105 true, ),
1107 ]);
1108
1109 let a = Int32Array::from(vec![1, 2, 3, 4, 5]);
1111 let b = Int32Array::from(vec![Some(1), None, None, Some(4), Some(5)]);
1112 let d = Float64Array::from(vec![None, None, None, Some(1.0), None]);
1113 let f = Float32Array::from(vec![Some(0.0), None, Some(333.3), None, Some(5.25)]);
1114
1115 let g_value = Int16Array::from(vec![1, 2, 3, 4, 5, 6, 7, 8, 9, 10]);
1116
1117 let g_value_offsets = arrow::buffer::Buffer::from([0, 1, 3, 3, 6, 10].to_byte_slice());
1120
1121 let g_list_data = ArrayData::builder(struct_field_g.data_type().clone())
1123 .len(5)
1124 .add_buffer(g_value_offsets)
1125 .add_child_data(g_value.into_data())
1126 .build()
1127 .unwrap();
1128 let g = ListArray::from(g_list_data);
1129
1130 let e = StructArray::from(vec![
1131 (struct_field_f, Arc::new(f.clone()) as ArrayRef),
1132 (struct_field_g, Arc::new(g) as ArrayRef),
1133 ]);
1134
1135 let c = StructArray::from(vec![
1136 (struct_field_d, Arc::new(d.clone()) as ArrayRef),
1137 (struct_field_e, Arc::new(e) as ArrayRef),
1138 ]);
1139
1140 let batch = RecordBatch::try_new(
1142 Arc::new(schema),
1143 vec![Arc::new(a.clone()), Arc::new(b.clone()), Arc::new(c)],
1144 )
1145 .unwrap();
1146
1147 let mut levels = vec![];
1150 batch
1151 .columns()
1152 .iter()
1153 .zip(batch.schema().fields())
1154 .for_each(|(array, field)| {
1155 let mut array_levels = calculate_array_levels(array, field).unwrap();
1156 levels.append(&mut array_levels);
1157 });
1158 assert_eq!(levels.len(), 5);
1159
1160 let list_level = &levels[0];
1162
1163 let expected_level = ArrayLevels {
1164 def_levels: None,
1165 rep_levels: None,
1166 non_null_indices: vec![0, 1, 2, 3, 4],
1167 max_def_level: 0,
1168 max_rep_level: 0,
1169 array: Arc::new(a),
1170 };
1171 assert_eq!(list_level, &expected_level);
1172
1173 let list_level = levels.get(1).unwrap();
1175
1176 let expected_level = ArrayLevels {
1177 def_levels: Some(vec![1, 0, 0, 1, 1]),
1178 rep_levels: None,
1179 non_null_indices: vec![0, 3, 4],
1180 max_def_level: 1,
1181 max_rep_level: 0,
1182 array: Arc::new(b),
1183 };
1184 assert_eq!(list_level, &expected_level);
1185
1186 let list_level = levels.get(2).unwrap();
1188
1189 let expected_level = ArrayLevels {
1190 def_levels: Some(vec![1, 1, 1, 2, 1]),
1191 rep_levels: None,
1192 non_null_indices: vec![3],
1193 max_def_level: 2,
1194 max_rep_level: 0,
1195 array: Arc::new(d),
1196 };
1197 assert_eq!(list_level, &expected_level);
1198
1199 let list_level = levels.get(3).unwrap();
1201
1202 let expected_level = ArrayLevels {
1203 def_levels: Some(vec![3, 2, 3, 2, 3]),
1204 rep_levels: None,
1205 non_null_indices: vec![0, 2, 4],
1206 max_def_level: 3,
1207 max_rep_level: 0,
1208 array: Arc::new(f),
1209 };
1210 assert_eq!(list_level, &expected_level);
1211 }
1212
1213 #[test]
1214 fn test_null_vs_nonnull_struct() {
1215 let offset_field = Arc::new(Field::new("offset", DataType::Int32, true));
1217 let schema = Schema::new(vec![Field::new(
1218 "some_nested_object",
1219 DataType::Struct(vec![offset_field.clone()].into()),
1220 false,
1221 )]);
1222
1223 let offset = Int32Array::from(vec![1, 2, 3, 4, 5]);
1225
1226 let some_nested_object =
1227 StructArray::from(vec![(offset_field, Arc::new(offset) as ArrayRef)]);
1228
1229 let batch =
1231 RecordBatch::try_new(Arc::new(schema), vec![Arc::new(some_nested_object)]).unwrap();
1232
1233 let struct_null_level =
1234 calculate_array_levels(batch.column(0), batch.schema().field(0)).unwrap();
1235
1236 let offset_field = Arc::new(Field::new("offset", DataType::Int32, true));
1239 let schema = Schema::new(vec![Field::new(
1240 "some_nested_object",
1241 DataType::Struct(vec![offset_field.clone()].into()),
1242 true,
1243 )]);
1244
1245 let offset = Int32Array::from(vec![1, 2, 3, 4, 5]);
1247
1248 let some_nested_object =
1249 StructArray::from(vec![(offset_field, Arc::new(offset) as ArrayRef)]);
1250
1251 let batch =
1253 RecordBatch::try_new(Arc::new(schema), vec![Arc::new(some_nested_object)]).unwrap();
1254
1255 let struct_non_null_level =
1256 calculate_array_levels(batch.column(0), batch.schema().field(0)).unwrap();
1257
1258 if struct_non_null_level == struct_null_level {
1260 panic!("Levels should not be equal, to reflect the difference in struct nullness");
1261 }
1262 }
1263
1264 #[test]
1265 fn test_map_array() {
1266 let json_content = r#"
1268 {"stocks":{"long": "$AAA", "short": "$BBB"}}
1269 {"stocks":{"long": "$CCC", "short": null}}
1270 {"stocks":{"hedged": "$YYY", "long": null, "short": "$D"}}
1271 "#;
1272 let entries_struct_type = DataType::Struct(Fields::from(vec![
1273 Field::new("key", DataType::Utf8, false),
1274 Field::new("value", DataType::Utf8, true),
1275 ]));
1276 let stocks_field = Field::new(
1277 "stocks",
1278 DataType::Map(
1279 Arc::new(Field::new("entries", entries_struct_type, false)),
1280 false,
1281 ),
1282 false,
1284 );
1285 let schema = Arc::new(Schema::new(vec![stocks_field]));
1286 let builder = arrow::json::ReaderBuilder::new(schema).with_batch_size(64);
1287 let mut reader = builder.build(std::io::Cursor::new(json_content)).unwrap();
1288
1289 let batch = reader.next().unwrap().unwrap();
1290
1291 let mut levels = vec![];
1293 batch
1294 .columns()
1295 .iter()
1296 .zip(batch.schema().fields())
1297 .for_each(|(array, field)| {
1298 let mut array_levels = calculate_array_levels(array, field).unwrap();
1299 levels.append(&mut array_levels);
1300 });
1301 assert_eq!(levels.len(), 2);
1302
1303 let map = batch.column(0).as_map();
1304
1305 let list_level = &levels[0];
1307
1308 let expected_level = ArrayLevels {
1309 def_levels: Some(vec![1; 7]),
1310 rep_levels: Some(vec![0, 1, 0, 1, 0, 1, 1]),
1311 non_null_indices: vec![0, 1, 2, 3, 4, 5, 6],
1312 max_def_level: 1,
1313 max_rep_level: 1,
1314 array: map.keys().clone(),
1315 };
1316 assert_eq!(list_level, &expected_level);
1317
1318 let list_level = levels.get(1).unwrap();
1320
1321 let expected_level = ArrayLevels {
1322 def_levels: Some(vec![2, 2, 2, 1, 2, 1, 2]),
1323 rep_levels: Some(vec![0, 1, 0, 1, 0, 1, 1]),
1324 non_null_indices: vec![0, 1, 2, 4, 6],
1325 max_def_level: 2,
1326 max_rep_level: 1,
1327 array: map.values().clone(),
1328 };
1329 assert_eq!(list_level, &expected_level);
1330 }
1331
1332 #[test]
1333 fn test_list_of_struct() {
1334 let int_field = Field::new("a", DataType::Int32, true);
1336 let fields = Fields::from([Arc::new(int_field)]);
1337 let item_field = Field::new_list_field(DataType::Struct(fields.clone()), true);
1338 let list_field = Field::new("list", DataType::List(Arc::new(item_field)), true);
1339
1340 let int_builder = Int32Builder::with_capacity(10);
1341 let struct_builder = StructBuilder::new(fields, vec![Box::new(int_builder)]);
1342 let mut list_builder = ListBuilder::new(struct_builder);
1343
1344 let values = list_builder.values();
1348 values
1349 .field_builder::<Int32Builder>(0)
1350 .unwrap()
1351 .append_value(1);
1352 values.append(true);
1353 list_builder.append(true);
1354
1355 list_builder.append(true);
1357
1358 list_builder.append(false);
1360
1361 let values = list_builder.values();
1363 values
1364 .field_builder::<Int32Builder>(0)
1365 .unwrap()
1366 .append_null();
1367 values.append(false);
1368 values
1369 .field_builder::<Int32Builder>(0)
1370 .unwrap()
1371 .append_null();
1372 values.append(false);
1373 list_builder.append(true);
1374
1375 let values = list_builder.values();
1377 values
1378 .field_builder::<Int32Builder>(0)
1379 .unwrap()
1380 .append_null();
1381 values.append(true);
1382 list_builder.append(true);
1383
1384 let values = list_builder.values();
1386 values
1387 .field_builder::<Int32Builder>(0)
1388 .unwrap()
1389 .append_value(2);
1390 values.append(true);
1391 list_builder.append(true);
1392
1393 let array = Arc::new(list_builder.finish());
1394
1395 let values = array.values().as_struct().column(0).clone();
1396 let values_len = values.len();
1397 assert_eq!(values_len, 5);
1398
1399 let schema = Arc::new(Schema::new(vec![list_field]));
1400
1401 let rb = RecordBatch::try_new(schema, vec![array]).unwrap();
1402
1403 let levels = calculate_array_levels(rb.column(0), rb.schema().field(0)).unwrap();
1404 let list_level = &levels[0];
1405
1406 let expected_level = ArrayLevels {
1407 def_levels: Some(vec![4, 1, 0, 2, 2, 3, 4]),
1408 rep_levels: Some(vec![0, 0, 0, 0, 1, 0, 0]),
1409 non_null_indices: vec![0, 4],
1410 max_def_level: 4,
1411 max_rep_level: 1,
1412 array: values,
1413 };
1414
1415 assert_eq!(list_level, &expected_level);
1416 }
1417
1418 #[test]
1419 fn test_struct_mask_list() {
1420 let inner = ListArray::from_iter_primitive::<Int32Type, _, _>(vec![
1422 Some(vec![Some(1), Some(2)]),
1423 Some(vec![None]),
1424 Some(vec![]),
1425 Some(vec![Some(3), None]), Some(vec![Some(4), Some(5)]),
1427 None, None,
1429 ]);
1430 let values = inner.values().clone();
1431
1432 assert_eq!(inner.values().len(), 7);
1434
1435 let field = Arc::new(Field::new("list", inner.data_type().clone(), true));
1436 let array = Arc::new(inner) as ArrayRef;
1437 let nulls = Buffer::from([0b01010111]);
1438 let struct_a = StructArray::from((vec![(field, array)], nulls));
1439
1440 let field = Field::new("struct", struct_a.data_type().clone(), true);
1441 let array = Arc::new(struct_a) as ArrayRef;
1442 let levels = calculate_array_levels(&array, &field).unwrap();
1443
1444 assert_eq!(levels.len(), 1);
1445
1446 let expected_level = ArrayLevels {
1447 def_levels: Some(vec![4, 4, 3, 2, 0, 4, 4, 0, 1]),
1448 rep_levels: Some(vec![0, 1, 0, 0, 0, 0, 1, 0, 0]),
1449 non_null_indices: vec![0, 1, 5, 6],
1450 max_def_level: 4,
1451 max_rep_level: 1,
1452 array: values,
1453 };
1454
1455 assert_eq!(&levels[0], &expected_level);
1456 }
1457
1458 #[test]
1459 fn test_list_mask_struct() {
1460 let a1 = ListArray::from_iter_primitive::<Int32Type, _, _>(vec![
1464 Some(vec![None]), Some(vec![]), Some(vec![Some(3), None]),
1467 Some(vec![Some(4), Some(5), None, Some(6)]), None,
1469 None,
1470 ]);
1471 let a1_values = a1.values().clone();
1472 let a1 = Arc::new(a1) as ArrayRef;
1473
1474 let a2 = Arc::new(Int32Array::from_iter(vec![
1475 Some(1), Some(2), None,
1478 Some(4), Some(5),
1480 None,
1481 ])) as ArrayRef;
1482 let a2_values = a2.clone();
1483
1484 let field_a1 = Arc::new(Field::new("list", a1.data_type().clone(), true));
1485 let field_a2 = Arc::new(Field::new("integers", a2.data_type().clone(), true));
1486
1487 let nulls = Buffer::from([0b00110111]);
1488 let struct_a = Arc::new(StructArray::from((
1489 vec![(field_a1, a1), (field_a2, a2)],
1490 nulls,
1491 ))) as ArrayRef;
1492
1493 let offsets = Buffer::from_iter([0_i32, 0, 2, 2, 3, 5, 5]);
1494 let nulls = Buffer::from([0b00111100]);
1495
1496 let list_type = DataType::List(Arc::new(Field::new(
1497 "struct",
1498 struct_a.data_type().clone(),
1499 true,
1500 )));
1501
1502 let data = ArrayDataBuilder::new(list_type.clone())
1503 .len(6)
1504 .null_bit_buffer(Some(nulls))
1505 .add_buffer(offsets)
1506 .add_child_data(struct_a.into_data())
1507 .build()
1508 .unwrap();
1509
1510 let list = make_array(data);
1511 let list_field = Field::new("col", list_type, true);
1512
1513 let expected = vec![
1514 r#""#.to_string(),
1515 r#""#.to_string(),
1516 r#"[]"#.to_string(),
1517 r#"[{list: [3, ], integers: }]"#.to_string(),
1518 r#"[, {list: , integers: 5}]"#.to_string(),
1519 r#"[]"#.to_string(),
1520 ];
1521
1522 let actual: Vec<_> = (0..6)
1523 .map(|x| array_value_to_string(&list, x).unwrap())
1524 .collect();
1525 assert_eq!(actual, expected);
1526
1527 let levels = calculate_array_levels(&list, &list_field).unwrap();
1528
1529 assert_eq!(levels.len(), 2);
1530
1531 let expected_level = ArrayLevels {
1532 def_levels: Some(vec![0, 0, 1, 6, 5, 2, 3, 1]),
1533 rep_levels: Some(vec![0, 0, 0, 0, 2, 0, 1, 0]),
1534 non_null_indices: vec![1],
1535 max_def_level: 6,
1536 max_rep_level: 2,
1537 array: a1_values,
1538 };
1539
1540 assert_eq!(&levels[0], &expected_level);
1541
1542 let expected_level = ArrayLevels {
1543 def_levels: Some(vec![0, 0, 1, 3, 2, 4, 1]),
1544 rep_levels: Some(vec![0, 0, 0, 0, 0, 1, 0]),
1545 non_null_indices: vec![4],
1546 max_def_level: 4,
1547 max_rep_level: 1,
1548 array: a2_values,
1549 };
1550
1551 assert_eq!(&levels[1], &expected_level);
1552 }
1553
1554 #[test]
1555 fn test_fixed_size_list() {
1556 let mut builder = FixedSizeListBuilder::new(Int32Builder::new(), 2);
1558 builder.values().append_slice(&[1, 2]);
1559 builder.append(true);
1560 builder.values().append_slice(&[3, 4]);
1561 builder.append(false);
1562 builder.values().append_slice(&[5, 6]);
1563 builder.append(false);
1564 builder.values().append_slice(&[7, 8]);
1565 builder.append(true);
1566 builder.values().append_slice(&[9, 10]);
1567 builder.append(false);
1568 let a = builder.finish();
1569 let values = a.values().clone();
1570
1571 let item_field = Field::new_list_field(a.data_type().clone(), true);
1572 let mut builder = levels(&item_field, a);
1573 builder.write(1..4);
1574 let levels = builder.finish();
1575
1576 assert_eq!(levels.len(), 1);
1577
1578 let list_level = &levels[0];
1579
1580 let expected_level = ArrayLevels {
1581 def_levels: Some(vec![0, 0, 3, 3]),
1582 rep_levels: Some(vec![0, 0, 0, 1]),
1583 non_null_indices: vec![6, 7],
1584 max_def_level: 3,
1585 max_rep_level: 1,
1586 array: values,
1587 };
1588 assert_eq!(list_level, &expected_level);
1589 }
1590
1591 #[test]
1592 fn test_fixed_size_list_of_struct() {
1593 let field_a = Field::new("a", DataType::Int32, true);
1595 let field_b = Field::new("b", DataType::Int64, false);
1596 let fields = Fields::from([Arc::new(field_a), Arc::new(field_b)]);
1597 let item_field = Field::new_list_field(DataType::Struct(fields.clone()), true);
1598 let list_field = Field::new(
1599 "list",
1600 DataType::FixedSizeList(Arc::new(item_field), 2),
1601 true,
1602 );
1603
1604 let builder_a = Int32Builder::with_capacity(10);
1605 let builder_b = Int64Builder::with_capacity(10);
1606 let struct_builder =
1607 StructBuilder::new(fields, vec![Box::new(builder_a), Box::new(builder_b)]);
1608 let mut list_builder = FixedSizeListBuilder::new(struct_builder, 2);
1609
1610 let values = list_builder.values();
1619 values
1621 .field_builder::<Int32Builder>(0)
1622 .unwrap()
1623 .append_value(1);
1624 values
1625 .field_builder::<Int64Builder>(1)
1626 .unwrap()
1627 .append_value(2);
1628 values.append(true);
1629 values
1631 .field_builder::<Int32Builder>(0)
1632 .unwrap()
1633 .append_null();
1634 values
1635 .field_builder::<Int64Builder>(1)
1636 .unwrap()
1637 .append_value(0);
1638 values.append(false);
1639 list_builder.append(true);
1640
1641 let values = list_builder.values();
1643 values
1645 .field_builder::<Int32Builder>(0)
1646 .unwrap()
1647 .append_null();
1648 values
1649 .field_builder::<Int64Builder>(1)
1650 .unwrap()
1651 .append_value(0);
1652 values.append(false);
1653 values
1655 .field_builder::<Int32Builder>(0)
1656 .unwrap()
1657 .append_null();
1658 values
1659 .field_builder::<Int64Builder>(1)
1660 .unwrap()
1661 .append_value(0);
1662 values.append(false);
1663 list_builder.append(false);
1664
1665 let values = list_builder.values();
1667 values
1669 .field_builder::<Int32Builder>(0)
1670 .unwrap()
1671 .append_null();
1672 values
1673 .field_builder::<Int64Builder>(1)
1674 .unwrap()
1675 .append_value(0);
1676 values.append(false);
1677 values
1679 .field_builder::<Int32Builder>(0)
1680 .unwrap()
1681 .append_null();
1682 values
1683 .field_builder::<Int64Builder>(1)
1684 .unwrap()
1685 .append_value(0);
1686 values.append(false);
1687 list_builder.append(true);
1688
1689 let values = list_builder.values();
1691 values
1693 .field_builder::<Int32Builder>(0)
1694 .unwrap()
1695 .append_null();
1696 values
1697 .field_builder::<Int64Builder>(1)
1698 .unwrap()
1699 .append_value(3);
1700 values.append(true);
1701 values
1703 .field_builder::<Int32Builder>(0)
1704 .unwrap()
1705 .append_value(2);
1706 values
1707 .field_builder::<Int64Builder>(1)
1708 .unwrap()
1709 .append_value(4);
1710 values.append(true);
1711 list_builder.append(true);
1712
1713 let array = Arc::new(list_builder.finish());
1714
1715 assert_eq!(array.values().len(), 8);
1716 assert_eq!(array.len(), 4);
1717
1718 let struct_values = array.values().as_struct();
1719 let values_a = struct_values.column(0).clone();
1720 let values_b = struct_values.column(1).clone();
1721
1722 let schema = Arc::new(Schema::new(vec![list_field]));
1723 let rb = RecordBatch::try_new(schema, vec![array]).unwrap();
1724
1725 let levels = calculate_array_levels(rb.column(0), rb.schema().field(0)).unwrap();
1726 let a_levels = &levels[0];
1727 let b_levels = &levels[1];
1728
1729 let expected_a = ArrayLevels {
1731 def_levels: Some(vec![4, 2, 0, 2, 2, 3, 4]),
1732 rep_levels: Some(vec![0, 1, 0, 0, 1, 0, 1]),
1733 non_null_indices: vec![0, 7],
1734 max_def_level: 4,
1735 max_rep_level: 1,
1736 array: values_a,
1737 };
1738 let expected_b = ArrayLevels {
1740 def_levels: Some(vec![3, 2, 0, 2, 2, 3, 3]),
1741 rep_levels: Some(vec![0, 1, 0, 0, 1, 0, 1]),
1742 non_null_indices: vec![0, 6, 7],
1743 max_def_level: 3,
1744 max_rep_level: 1,
1745 array: values_b,
1746 };
1747
1748 assert_eq!(a_levels, &expected_a);
1749 assert_eq!(b_levels, &expected_b);
1750 }
1751
1752 #[test]
1753 fn test_fixed_size_list_empty() {
1754 let mut builder = FixedSizeListBuilder::new(Int32Builder::new(), 0);
1755 builder.append(true);
1756 builder.append(false);
1757 builder.append(true);
1758 let array = builder.finish();
1759 let values = array.values().clone();
1760
1761 let item_field = Field::new_list_field(array.data_type().clone(), true);
1762 let mut builder = levels(&item_field, array);
1763 builder.write(0..3);
1764 let levels = builder.finish();
1765
1766 assert_eq!(levels.len(), 1);
1767
1768 let list_level = &levels[0];
1769
1770 let expected_level = ArrayLevels {
1771 def_levels: Some(vec![1, 0, 1]),
1772 rep_levels: Some(vec![0, 0, 0]),
1773 non_null_indices: vec![],
1774 max_def_level: 3,
1775 max_rep_level: 1,
1776 array: values,
1777 };
1778 assert_eq!(list_level, &expected_level);
1779 }
1780
1781 #[test]
1782 fn test_fixed_size_list_of_var_lists() {
1783 let mut builder = FixedSizeListBuilder::new(ListBuilder::new(Int32Builder::new()), 2);
1785 builder.values().append_value([Some(1), None, Some(3)]);
1786 builder.values().append_null();
1787 builder.append(true);
1788 builder.values().append_value([Some(4)]);
1789 builder.values().append_value([]);
1790 builder.append(true);
1791 builder.values().append_value([Some(5), Some(6)]);
1792 builder.values().append_value([None, None]);
1793 builder.append(true);
1794 builder.values().append_null();
1795 builder.values().append_null();
1796 builder.append(false);
1797 let a = builder.finish();
1798 let values = a.values().as_list::<i32>().values().clone();
1799
1800 let item_field = Field::new_list_field(a.data_type().clone(), true);
1801 let mut builder = levels(&item_field, a);
1802 builder.write(0..4);
1803 let levels = builder.finish();
1804
1805 let expected_level = ArrayLevels {
1806 def_levels: Some(vec![5, 4, 5, 2, 5, 3, 5, 5, 4, 4, 0]),
1807 rep_levels: Some(vec![0, 2, 2, 1, 0, 1, 0, 2, 1, 2, 0]),
1808 non_null_indices: vec![0, 2, 3, 4, 5],
1809 max_def_level: 5,
1810 max_rep_level: 2,
1811 array: values,
1812 };
1813
1814 assert_eq!(levels[0], expected_level);
1815 }
1816
1817 #[test]
1818 fn test_null_dictionary_values() {
1819 let values = Int32Array::new(
1820 vec![1, 2, 3, 4].into(),
1821 Some(NullBuffer::from(vec![true, false, true, true])),
1822 );
1823 let keys = Int32Array::new(
1824 vec![1, 54, 2, 0].into(),
1825 Some(NullBuffer::from(vec![true, false, true, true])),
1826 );
1827 let dict = DictionaryArray::new(keys, Arc::new(values));
1829
1830 let item_field = Field::new_list_field(dict.data_type().clone(), true);
1831
1832 let mut builder = levels(&item_field, dict.clone());
1833 builder.write(0..4);
1834 let levels = builder.finish();
1835 let expected_level = ArrayLevels {
1836 def_levels: Some(vec![0, 0, 1, 1]),
1837 rep_levels: None,
1838 non_null_indices: vec![2, 3],
1839 max_def_level: 1,
1840 max_rep_level: 0,
1841 array: Arc::new(dict),
1842 };
1843 assert_eq!(levels[0], expected_level);
1844 }
1845
1846 #[test]
1847 fn mismatched_types() {
1848 let array = Arc::new(Int32Array::from_iter(0..10)) as ArrayRef;
1849 let field = Field::new_list_field(DataType::Float64, false);
1850
1851 let err = LevelInfoBuilder::try_new(&field, Default::default(), &array)
1852 .unwrap_err()
1853 .to_string();
1854
1855 assert_eq!(
1856 err,
1857 "Arrow: Incompatible type. Field 'item' has type Float64, array has type Int32",
1858 );
1859 }
1860
1861 fn levels<T: Array + 'static>(field: &Field, array: T) -> LevelInfoBuilder {
1862 let v = Arc::new(array) as ArrayRef;
1863 LevelInfoBuilder::try_new(field, Default::default(), &v).unwrap()
1864 }
1865}