1use crate::errors::{ParquetError, Result};
44use arrow_array::cast::AsArray;
45use arrow_array::{Array, ArrayRef, OffsetSizeTrait};
46use arrow_buffer::bit_iterator::BitIndexIterator;
47use arrow_buffer::{NullBuffer, OffsetBuffer};
48use arrow_schema::{DataType, Field};
49use std::ops::Range;
50use std::sync::Arc;
51
52pub(crate) fn calculate_array_levels(array: &ArrayRef, field: &Field) -> Result<Vec<ArrayLevels>> {
55 let mut builder = LevelInfoBuilder::try_new(field, Default::default(), array)?;
56 builder.write(0..array.len());
57 Ok(builder.finish())
58}
59
60fn is_leaf(data_type: &DataType) -> bool {
63 matches!(
64 data_type,
65 DataType::Null
66 | DataType::Boolean
67 | DataType::Int8
68 | DataType::Int16
69 | DataType::Int32
70 | DataType::Int64
71 | DataType::UInt8
72 | DataType::UInt16
73 | DataType::UInt32
74 | DataType::UInt64
75 | DataType::Float16
76 | DataType::Float32
77 | DataType::Float64
78 | DataType::Utf8
79 | DataType::Utf8View
80 | DataType::LargeUtf8
81 | DataType::Timestamp(_, _)
82 | DataType::Date32
83 | DataType::Date64
84 | DataType::Time32(_)
85 | DataType::Time64(_)
86 | DataType::Duration(_)
87 | DataType::Interval(_)
88 | DataType::Binary
89 | DataType::LargeBinary
90 | DataType::BinaryView
91 | DataType::Decimal32(_, _)
92 | DataType::Decimal64(_, _)
93 | DataType::Decimal128(_, _)
94 | DataType::Decimal256(_, _)
95 | DataType::FixedSizeBinary(_)
96 )
97}
98
99#[derive(Debug, Default, Clone, Copy)]
101struct LevelContext {
102 rep_level: i16,
104 def_level: i16,
106}
107
108#[derive(Debug)]
110enum LevelInfoBuilder {
111 Primitive(ArrayLevels),
113 List(
115 Box<LevelInfoBuilder>, LevelContext, OffsetBuffer<i32>, Option<NullBuffer>, ),
120 LargeList(
122 Box<LevelInfoBuilder>, LevelContext, OffsetBuffer<i64>, Option<NullBuffer>, ),
127 FixedSizeList(
129 Box<LevelInfoBuilder>, LevelContext, usize, Option<NullBuffer>, ),
134 Struct(Vec<LevelInfoBuilder>, LevelContext, Option<NullBuffer>),
136}
137
138impl LevelInfoBuilder {
139 fn try_new(field: &Field, parent_ctx: LevelContext, array: &ArrayRef) -> Result<Self> {
141 if !Self::types_compatible(field.data_type(), array.data_type()) {
142 return Err(arrow_err!(format!(
143 "Incompatible type. Field '{}' has type {}, array has type {}",
144 field.name(),
145 field.data_type(),
146 array.data_type(),
147 )));
148 }
149
150 let is_nullable = field.is_nullable();
151
152 match array.data_type() {
153 d if is_leaf(d) => {
154 let levels = ArrayLevels::new(parent_ctx, is_nullable, array.clone());
155 Ok(Self::Primitive(levels))
156 }
157 DataType::Dictionary(_, v) if is_leaf(v.as_ref()) => {
158 let levels = ArrayLevels::new(parent_ctx, is_nullable, array.clone());
159 Ok(Self::Primitive(levels))
160 }
161 DataType::Struct(children) => {
162 let array = array.as_struct();
163 let def_level = match is_nullable {
164 true => parent_ctx.def_level + 1,
165 false => parent_ctx.def_level,
166 };
167
168 let ctx = LevelContext {
169 rep_level: parent_ctx.rep_level,
170 def_level,
171 };
172
173 let children = children
174 .iter()
175 .zip(array.columns())
176 .map(|(f, a)| Self::try_new(f, ctx, a))
177 .collect::<Result<_>>()?;
178
179 Ok(Self::Struct(children, ctx, array.nulls().cloned()))
180 }
181 DataType::List(child)
182 | DataType::LargeList(child)
183 | DataType::Map(child, _)
184 | DataType::FixedSizeList(child, _) => {
185 let def_level = match is_nullable {
186 true => parent_ctx.def_level + 2,
187 false => parent_ctx.def_level + 1,
188 };
189
190 let ctx = LevelContext {
191 rep_level: parent_ctx.rep_level + 1,
192 def_level,
193 };
194
195 Ok(match field.data_type() {
196 DataType::List(_) => {
197 let list = array.as_list();
198 let child = Self::try_new(child.as_ref(), ctx, list.values())?;
199 let offsets = list.offsets().clone();
200 Self::List(Box::new(child), ctx, offsets, list.nulls().cloned())
201 }
202 DataType::LargeList(_) => {
203 let list = array.as_list();
204 let child = Self::try_new(child.as_ref(), ctx, list.values())?;
205 let offsets = list.offsets().clone();
206 let nulls = list.nulls().cloned();
207 Self::LargeList(Box::new(child), ctx, offsets, nulls)
208 }
209 DataType::Map(_, _) => {
210 let map = array.as_map();
211 let entries = Arc::new(map.entries().clone()) as ArrayRef;
212 let child = Self::try_new(child.as_ref(), ctx, &entries)?;
213 let offsets = map.offsets().clone();
214 Self::List(Box::new(child), ctx, offsets, map.nulls().cloned())
215 }
216 DataType::FixedSizeList(_, size) => {
217 let list = array.as_fixed_size_list();
218 let child = Self::try_new(child.as_ref(), ctx, list.values())?;
219 let nulls = list.nulls().cloned();
220 Self::FixedSizeList(Box::new(child), ctx, *size as _, nulls)
221 }
222 _ => unreachable!(),
223 })
224 }
225 d => Err(nyi_err!("Datatype {} is not yet supported", d)),
226 }
227 }
228
229 fn finish(self) -> Vec<ArrayLevels> {
232 match self {
233 LevelInfoBuilder::Primitive(v) => vec![v],
234 LevelInfoBuilder::List(v, _, _, _)
235 | LevelInfoBuilder::LargeList(v, _, _, _)
236 | LevelInfoBuilder::FixedSizeList(v, _, _, _) => v.finish(),
237 LevelInfoBuilder::Struct(v, _, _) => v.into_iter().flat_map(|l| l.finish()).collect(),
238 }
239 }
240
241 fn write(&mut self, range: Range<usize>) {
243 match self {
244 LevelInfoBuilder::Primitive(info) => Self::write_leaf(info, range),
245 LevelInfoBuilder::List(child, ctx, offsets, nulls) => {
246 Self::write_list(child, ctx, offsets, nulls.as_ref(), range)
247 }
248 LevelInfoBuilder::LargeList(child, ctx, offsets, nulls) => {
249 Self::write_list(child, ctx, offsets, nulls.as_ref(), range)
250 }
251 LevelInfoBuilder::FixedSizeList(child, ctx, size, nulls) => {
252 Self::write_fixed_size_list(child, ctx, *size, nulls.as_ref(), range)
253 }
254 LevelInfoBuilder::Struct(children, ctx, nulls) => {
255 Self::write_struct(children, ctx, nulls.as_ref(), range)
256 }
257 }
258 }
259
260 fn write_list<O: OffsetSizeTrait>(
264 child: &mut LevelInfoBuilder,
265 ctx: &LevelContext,
266 offsets: &[O],
267 nulls: Option<&NullBuffer>,
268 range: Range<usize>,
269 ) {
270 let offsets = &offsets[range.start..range.end + 1];
271
272 let write_non_null_slice =
273 |child: &mut LevelInfoBuilder, start_idx: usize, end_idx: usize| {
274 child.write(start_idx..end_idx);
275 child.visit_leaves(|leaf| {
276 let rep_levels = leaf.rep_levels.as_mut().unwrap();
277 let mut rev = rep_levels.iter_mut().rev();
278 let mut remaining = end_idx - start_idx;
279
280 loop {
281 let next = rev.next().unwrap();
282 if *next > ctx.rep_level {
283 continue;
285 }
286
287 remaining -= 1;
288 if remaining == 0 {
289 *next = ctx.rep_level - 1;
290 break;
291 }
292 }
293 })
294 };
295
296 let write_empty_slice = |child: &mut LevelInfoBuilder| {
297 child.visit_leaves(|leaf| {
298 let rep_levels = leaf.rep_levels.as_mut().unwrap();
299 rep_levels.push(ctx.rep_level - 1);
300 let def_levels = leaf.def_levels.as_mut().unwrap();
301 def_levels.push(ctx.def_level - 1);
302 })
303 };
304
305 let write_null_slice = |child: &mut LevelInfoBuilder| {
306 child.visit_leaves(|leaf| {
307 let rep_levels = leaf.rep_levels.as_mut().unwrap();
308 rep_levels.push(ctx.rep_level - 1);
309 let def_levels = leaf.def_levels.as_mut().unwrap();
310 def_levels.push(ctx.def_level - 2);
311 })
312 };
313
314 match nulls {
315 Some(nulls) => {
316 let null_offset = range.start;
317 for (idx, w) in offsets.windows(2).enumerate() {
319 let is_valid = nulls.is_valid(idx + null_offset);
320 let start_idx = w[0].as_usize();
321 let end_idx = w[1].as_usize();
322 if !is_valid {
323 write_null_slice(child)
324 } else if start_idx == end_idx {
325 write_empty_slice(child)
326 } else {
327 write_non_null_slice(child, start_idx, end_idx)
328 }
329 }
330 }
331 None => {
332 for w in offsets.windows(2) {
333 let start_idx = w[0].as_usize();
334 let end_idx = w[1].as_usize();
335 if start_idx == end_idx {
336 write_empty_slice(child)
337 } else {
338 write_non_null_slice(child, start_idx, end_idx)
339 }
340 }
341 }
342 }
343 }
344
345 fn write_struct(
347 children: &mut [LevelInfoBuilder],
348 ctx: &LevelContext,
349 nulls: Option<&NullBuffer>,
350 range: Range<usize>,
351 ) {
352 let write_null = |children: &mut [LevelInfoBuilder], range: Range<usize>| {
353 for child in children {
354 child.visit_leaves(|info| {
355 let len = range.end - range.start;
356
357 let def_levels = info.def_levels.as_mut().unwrap();
358 def_levels.extend(std::iter::repeat_n(ctx.def_level - 1, len));
359
360 if let Some(rep_levels) = info.rep_levels.as_mut() {
361 rep_levels.extend(std::iter::repeat_n(ctx.rep_level, len));
362 }
363 })
364 }
365 };
366
367 let write_non_null = |children: &mut [LevelInfoBuilder], range: Range<usize>| {
368 for child in children {
369 child.write(range.clone())
370 }
371 };
372
373 match nulls {
374 Some(validity) => {
375 let mut last_non_null_idx = None;
376 let mut last_null_idx = None;
377
378 for i in range.clone() {
380 match validity.is_valid(i) {
381 true => {
382 if let Some(last_idx) = last_null_idx.take() {
383 write_null(children, last_idx..i)
384 }
385 last_non_null_idx.get_or_insert(i);
386 }
387 false => {
388 if let Some(last_idx) = last_non_null_idx.take() {
389 write_non_null(children, last_idx..i)
390 }
391 last_null_idx.get_or_insert(i);
392 }
393 }
394 }
395
396 if let Some(last_idx) = last_null_idx.take() {
397 write_null(children, last_idx..range.end)
398 }
399
400 if let Some(last_idx) = last_non_null_idx.take() {
401 write_non_null(children, last_idx..range.end)
402 }
403 }
404 None => write_non_null(children, range),
405 }
406 }
407
408 fn write_fixed_size_list(
410 child: &mut LevelInfoBuilder,
411 ctx: &LevelContext,
412 fixed_size: usize,
413 nulls: Option<&NullBuffer>,
414 range: Range<usize>,
415 ) {
416 let write_non_null = |child: &mut LevelInfoBuilder, start_idx: usize, end_idx: usize| {
417 let values_start = start_idx * fixed_size;
418 let values_end = end_idx * fixed_size;
419 child.write(values_start..values_end);
420
421 child.visit_leaves(|leaf| {
422 let rep_levels = leaf.rep_levels.as_mut().unwrap();
423
424 let row_indices = (0..fixed_size)
425 .rev()
426 .cycle()
427 .take(values_end - values_start);
428
429 rep_levels
431 .iter_mut()
432 .rev()
433 .filter(|&&mut r| r == ctx.rep_level)
435 .zip(row_indices)
436 .for_each(|(r, idx)| {
437 if idx == 0 {
438 *r = ctx.rep_level - 1;
439 }
440 });
441 })
442 };
443
444 let write_empty = |child: &mut LevelInfoBuilder, start_idx: usize, end_idx: usize| {
446 let len = end_idx - start_idx;
447 child.visit_leaves(|leaf| {
448 let rep_levels = leaf.rep_levels.as_mut().unwrap();
449 rep_levels.extend(std::iter::repeat_n(ctx.rep_level - 1, len));
450 let def_levels = leaf.def_levels.as_mut().unwrap();
451 def_levels.extend(std::iter::repeat_n(ctx.def_level - 1, len));
452 })
453 };
454
455 let write_rows = |child: &mut LevelInfoBuilder, start_idx: usize, end_idx: usize| {
456 if fixed_size > 0 {
457 write_non_null(child, start_idx, end_idx)
458 } else {
459 write_empty(child, start_idx, end_idx)
460 }
461 };
462
463 match nulls {
464 Some(nulls) => {
465 let mut start_idx = None;
466 for idx in range.clone() {
467 if nulls.is_valid(idx) {
468 start_idx.get_or_insert(idx);
470 } else {
471 if let Some(start) = start_idx.take() {
473 write_rows(child, start, idx);
474 }
475 child.visit_leaves(|leaf| {
477 let rep_levels = leaf.rep_levels.as_mut().unwrap();
478 rep_levels.push(ctx.rep_level - 1);
479 let def_levels = leaf.def_levels.as_mut().unwrap();
480 def_levels.push(ctx.def_level - 2);
481 })
482 }
483 }
484 if let Some(start) = start_idx.take() {
486 write_rows(child, start, range.end);
487 }
488 }
489 None => write_rows(child, range.start, range.end),
491 }
492 }
493
494 fn write_leaf(info: &mut ArrayLevels, range: Range<usize>) {
496 let len = range.end - range.start;
497
498 match &mut info.def_levels {
499 Some(def_levels) => {
500 def_levels.reserve(len);
501 info.non_null_indices.reserve(len);
502
503 match &info.logical_nulls {
504 Some(nulls) => {
505 assert!(range.end <= nulls.len());
506 let nulls = nulls.inner();
507 def_levels.extend(range.clone().map(|i| {
508 let valid = unsafe { nulls.value_unchecked(i) };
510 info.max_def_level - (!valid as i16)
511 }));
512 info.non_null_indices.extend(
513 BitIndexIterator::new(nulls.inner(), nulls.offset() + range.start, len)
514 .map(|i| i + range.start),
515 );
516 }
517 None => {
518 let iter = std::iter::repeat_n(info.max_def_level, len);
519 def_levels.extend(iter);
520 info.non_null_indices.extend(range);
521 }
522 }
523 }
524 None => info.non_null_indices.extend(range),
525 }
526
527 if let Some(rep_levels) = &mut info.rep_levels {
528 rep_levels.extend(std::iter::repeat_n(info.max_rep_level, len))
529 }
530 }
531
532 fn visit_leaves(&mut self, visit: impl Fn(&mut ArrayLevels) + Copy) {
534 match self {
535 LevelInfoBuilder::Primitive(info) => visit(info),
536 LevelInfoBuilder::List(c, _, _, _)
537 | LevelInfoBuilder::LargeList(c, _, _, _)
538 | LevelInfoBuilder::FixedSizeList(c, _, _, _) => c.visit_leaves(visit),
539 LevelInfoBuilder::Struct(children, _, _) => {
540 for c in children {
541 c.visit_leaves(visit)
542 }
543 }
544 }
545 }
546
547 fn types_compatible(a: &DataType, b: &DataType) -> bool {
553 if a == b {
555 return true;
556 }
557
558 let (a, b) = match (a, b) {
560 (DataType::Dictionary(_, va), DataType::Dictionary(_, vb)) => {
561 (va.as_ref(), vb.as_ref())
562 }
563 (DataType::Dictionary(_, v), b) => (v.as_ref(), b),
564 (a, DataType::Dictionary(_, v)) => (a, v.as_ref()),
565 _ => (a, b),
566 };
567
568 if a == b {
571 return true;
572 }
573
574 match a {
577 DataType::Utf8 => matches!(b, DataType::LargeUtf8 | DataType::Utf8View),
579 DataType::Utf8View => matches!(b, DataType::LargeUtf8 | DataType::Utf8),
580 DataType::LargeUtf8 => matches!(b, DataType::Utf8 | DataType::Utf8View),
581
582 DataType::Binary => matches!(b, DataType::LargeBinary | DataType::BinaryView),
584 DataType::BinaryView => matches!(b, DataType::LargeBinary | DataType::Binary),
585 DataType::LargeBinary => matches!(b, DataType::Binary | DataType::BinaryView),
586
587 _ => false,
589 }
590 }
591}
592
593#[derive(Debug, Clone)]
596pub(crate) struct ArrayLevels {
597 def_levels: Option<Vec<i16>>,
601
602 rep_levels: Option<Vec<i16>>,
606
607 non_null_indices: Vec<usize>,
610
611 max_def_level: i16,
613
614 max_rep_level: i16,
616
617 array: ArrayRef,
619
620 logical_nulls: Option<NullBuffer>,
622}
623
624impl PartialEq for ArrayLevels {
625 fn eq(&self, other: &Self) -> bool {
626 self.def_levels == other.def_levels
627 && self.rep_levels == other.rep_levels
628 && self.non_null_indices == other.non_null_indices
629 && self.max_def_level == other.max_def_level
630 && self.max_rep_level == other.max_rep_level
631 && self.array.as_ref() == other.array.as_ref()
632 && self.logical_nulls.as_ref() == other.logical_nulls.as_ref()
633 }
634}
635impl Eq for ArrayLevels {}
636
637impl ArrayLevels {
638 fn new(ctx: LevelContext, is_nullable: bool, array: ArrayRef) -> Self {
639 let max_rep_level = ctx.rep_level;
640 let max_def_level = match is_nullable {
641 true => ctx.def_level + 1,
642 false => ctx.def_level,
643 };
644
645 let logical_nulls = array.logical_nulls();
646
647 Self {
648 def_levels: (max_def_level != 0).then(Vec::new),
649 rep_levels: (max_rep_level != 0).then(Vec::new),
650 non_null_indices: vec![],
651 max_def_level,
652 max_rep_level,
653 array,
654 logical_nulls,
655 }
656 }
657
658 pub fn array(&self) -> &ArrayRef {
659 &self.array
660 }
661
662 pub fn def_levels(&self) -> Option<&[i16]> {
663 self.def_levels.as_deref()
664 }
665
666 pub fn rep_levels(&self) -> Option<&[i16]> {
667 self.rep_levels.as_deref()
668 }
669
670 pub fn non_null_indices(&self) -> &[usize] {
671 &self.non_null_indices
672 }
673}
674
675#[cfg(test)]
676mod tests {
677 use super::*;
678
679 use arrow_array::builder::*;
680 use arrow_array::types::Int32Type;
681 use arrow_array::*;
682 use arrow_buffer::{Buffer, ToByteSlice};
683 use arrow_cast::display::array_value_to_string;
684 use arrow_data::{ArrayData, ArrayDataBuilder};
685 use arrow_schema::{Fields, Schema};
686
687 #[test]
688 fn test_calculate_array_levels_twitter_example() {
689 let leaf_type = Field::new_list_field(DataType::Int32, false);
693 let inner_type = DataType::List(Arc::new(leaf_type));
694 let inner_field = Field::new("l2", inner_type.clone(), false);
695 let outer_type = DataType::List(Arc::new(inner_field));
696 let outer_field = Field::new("l1", outer_type.clone(), false);
697
698 let primitives = Int32Array::from_iter(0..10);
699
700 let offsets = Buffer::from_iter([0_i32, 3, 7, 8, 10]);
702 let inner_list = ArrayDataBuilder::new(inner_type)
703 .len(4)
704 .add_buffer(offsets)
705 .add_child_data(primitives.to_data())
706 .build()
707 .unwrap();
708
709 let offsets = Buffer::from_iter([0_i32, 2, 4]);
710 let outer_list = ArrayDataBuilder::new(outer_type)
711 .len(2)
712 .add_buffer(offsets)
713 .add_child_data(inner_list)
714 .build()
715 .unwrap();
716 let outer_list = make_array(outer_list);
717
718 let levels = calculate_array_levels(&outer_list, &outer_field).unwrap();
719 assert_eq!(levels.len(), 1);
720
721 let expected = ArrayLevels {
722 def_levels: Some(vec![2; 10]),
723 rep_levels: Some(vec![0, 2, 2, 1, 2, 2, 2, 0, 1, 2]),
724 non_null_indices: vec![0, 1, 2, 3, 4, 5, 6, 7, 8, 9],
725 max_def_level: 2,
726 max_rep_level: 2,
727 array: Arc::new(primitives),
728 logical_nulls: None,
729 };
730 assert_eq!(&levels[0], &expected);
731 }
732
733 #[test]
734 fn test_calculate_one_level_1() {
735 let array = Arc::new(Int32Array::from_iter(0..10)) as ArrayRef;
737 let field = Field::new_list_field(DataType::Int32, false);
738
739 let levels = calculate_array_levels(&array, &field).unwrap();
740 assert_eq!(levels.len(), 1);
741
742 let expected_levels = ArrayLevels {
743 def_levels: None,
744 rep_levels: None,
745 non_null_indices: (0..10).collect(),
746 max_def_level: 0,
747 max_rep_level: 0,
748 array,
749 logical_nulls: None,
750 };
751 assert_eq!(&levels[0], &expected_levels);
752 }
753
754 #[test]
755 fn test_calculate_one_level_2() {
756 let array = Arc::new(Int32Array::from_iter([
758 Some(0),
759 None,
760 Some(0),
761 Some(0),
762 None,
763 ])) as ArrayRef;
764 let field = Field::new_list_field(DataType::Int32, true);
765
766 let levels = calculate_array_levels(&array, &field).unwrap();
767 assert_eq!(levels.len(), 1);
768
769 let logical_nulls = array.logical_nulls();
770 let expected_levels = ArrayLevels {
771 def_levels: Some(vec![1, 0, 1, 1, 0]),
772 rep_levels: None,
773 non_null_indices: vec![0, 2, 3],
774 max_def_level: 1,
775 max_rep_level: 0,
776 array,
777 logical_nulls,
778 };
779 assert_eq!(&levels[0], &expected_levels);
780 }
781
782 #[test]
783 fn test_calculate_array_levels_1() {
784 let leaf_field = Field::new_list_field(DataType::Int32, false);
785 let list_type = DataType::List(Arc::new(leaf_field));
786
787 let leaf_array = Int32Array::from_iter(0..5);
791 let offsets = Buffer::from_iter(0_i32..6);
793 let list = ArrayDataBuilder::new(list_type.clone())
794 .len(5)
795 .add_buffer(offsets)
796 .add_child_data(leaf_array.to_data())
797 .build()
798 .unwrap();
799 let list = make_array(list);
800
801 let list_field = Field::new("list", list_type.clone(), false);
802 let levels = calculate_array_levels(&list, &list_field).unwrap();
803 assert_eq!(levels.len(), 1);
804
805 let expected_levels = ArrayLevels {
806 def_levels: Some(vec![1; 5]),
807 rep_levels: Some(vec![0; 5]),
808 non_null_indices: (0..5).collect(),
809 max_def_level: 1,
810 max_rep_level: 1,
811 array: Arc::new(leaf_array),
812 logical_nulls: None,
813 };
814 assert_eq!(&levels[0], &expected_levels);
815
816 let leaf_array = Int32Array::from_iter([0, 0, 2, 2, 3, 3, 3, 3, 4, 4, 4]);
825 let offsets = Buffer::from_iter([0_i32, 2, 2, 4, 8, 11]);
826 let list = ArrayDataBuilder::new(list_type.clone())
827 .len(5)
828 .add_buffer(offsets)
829 .add_child_data(leaf_array.to_data())
830 .null_bit_buffer(Some(Buffer::from([0b00011101])))
831 .build()
832 .unwrap();
833 let list = make_array(list);
834
835 let list_field = Field::new("list", list_type, true);
836 let levels = calculate_array_levels(&list, &list_field).unwrap();
837 assert_eq!(levels.len(), 1);
838
839 let expected_levels = ArrayLevels {
840 def_levels: Some(vec![2, 2, 0, 2, 2, 2, 2, 2, 2, 2, 2, 2]),
841 rep_levels: Some(vec![0, 1, 0, 0, 1, 0, 1, 1, 1, 0, 1, 1]),
842 non_null_indices: (0..11).collect(),
843 max_def_level: 2,
844 max_rep_level: 1,
845 array: Arc::new(leaf_array),
846 logical_nulls: None,
847 };
848 assert_eq!(&levels[0], &expected_levels);
849 }
850
851 #[test]
852 fn test_calculate_array_levels_2() {
853 let leaf = Int32Array::from_iter(0..11);
867 let leaf_field = Field::new("leaf", DataType::Int32, false);
868
869 let list_type = DataType::List(Arc::new(leaf_field));
870 let list = ArrayData::builder(list_type.clone())
871 .len(5)
872 .add_child_data(leaf.to_data())
873 .add_buffer(Buffer::from_iter([0_i32, 2, 2, 4, 8, 11]))
874 .build()
875 .unwrap();
876
877 let list = make_array(list);
878 let list_field = Arc::new(Field::new("list", list_type, true));
879
880 let struct_array =
881 StructArray::from((vec![(list_field, list)], Buffer::from([0b00011010])));
882 let array = Arc::new(struct_array) as ArrayRef;
883
884 let struct_field = Field::new("struct", array.data_type().clone(), true);
885
886 let levels = calculate_array_levels(&array, &struct_field).unwrap();
887 assert_eq!(levels.len(), 1);
888
889 let expected_levels = ArrayLevels {
890 def_levels: Some(vec![0, 2, 0, 3, 3, 3, 3, 3, 3, 3]),
891 rep_levels: Some(vec![0, 0, 0, 0, 1, 1, 1, 0, 1, 1]),
892 non_null_indices: (4..11).collect(),
893 max_def_level: 3,
894 max_rep_level: 1,
895 array: Arc::new(leaf),
896 logical_nulls: None,
897 };
898
899 assert_eq!(&levels[0], &expected_levels);
900
901 let leaf = Int32Array::from_iter(100..122);
910 let leaf_field = Field::new("leaf", DataType::Int32, true);
911
912 let l1_type = DataType::List(Arc::new(leaf_field));
913 let offsets = Buffer::from_iter([0_i32, 2, 4, 6, 8, 10, 12, 14, 16, 18, 20, 22]);
914 let l1 = ArrayData::builder(l1_type.clone())
915 .len(11)
916 .add_child_data(leaf.to_data())
917 .add_buffer(offsets)
918 .build()
919 .unwrap();
920
921 let l1_field = Field::new("l1", l1_type, true);
922 let l2_type = DataType::List(Arc::new(l1_field));
923 let l2 = ArrayData::builder(l2_type)
924 .len(5)
925 .add_child_data(l1)
926 .add_buffer(Buffer::from_iter([0, 2, 2, 4, 8, 11]))
927 .build()
928 .unwrap();
929
930 let l2 = make_array(l2);
931 let l2_field = Field::new("l2", l2.data_type().clone(), true);
932
933 let levels = calculate_array_levels(&l2, &l2_field).unwrap();
934 assert_eq!(levels.len(), 1);
935
936 let expected_levels = ArrayLevels {
937 def_levels: Some(vec![
938 5, 5, 5, 5, 1, 5, 5, 5, 5, 5, 5, 5, 5, 5, 5, 5, 5, 5, 5, 5, 5, 5, 5,
939 ]),
940 rep_levels: Some(vec![
941 0, 2, 1, 2, 0, 0, 2, 1, 2, 0, 2, 1, 2, 1, 2, 1, 2, 0, 2, 1, 2, 1, 2,
942 ]),
943 non_null_indices: (0..22).collect(),
944 max_def_level: 5,
945 max_rep_level: 2,
946 array: Arc::new(leaf),
947 logical_nulls: None,
948 };
949
950 assert_eq!(&levels[0], &expected_levels);
951 }
952
953 #[test]
954 fn test_calculate_array_levels_nested_list() {
955 let leaf_field = Field::new("leaf", DataType::Int32, false);
956 let list_type = DataType::List(Arc::new(leaf_field));
957
958 let leaf = Int32Array::from_iter([0; 4]);
966 let list = ArrayData::builder(list_type.clone())
967 .len(4)
968 .add_buffer(Buffer::from_iter(0_i32..5))
969 .add_child_data(leaf.to_data())
970 .build()
971 .unwrap();
972 let list = make_array(list);
973
974 let list_field = Field::new("list", list_type.clone(), false);
975 let levels = calculate_array_levels(&list, &list_field).unwrap();
976 assert_eq!(levels.len(), 1);
977
978 let expected_levels = ArrayLevels {
979 def_levels: Some(vec![1; 4]),
980 rep_levels: Some(vec![0; 4]),
981 non_null_indices: (0..4).collect(),
982 max_def_level: 1,
983 max_rep_level: 1,
984 array: Arc::new(leaf),
985 logical_nulls: None,
986 };
987 assert_eq!(&levels[0], &expected_levels);
988
989 let leaf = Int32Array::from_iter(0..8);
994 let list = ArrayData::builder(list_type.clone())
995 .len(4)
996 .add_buffer(Buffer::from_iter([0_i32, 0, 3, 5, 7]))
997 .null_bit_buffer(Some(Buffer::from([0b00001110])))
998 .add_child_data(leaf.to_data())
999 .build()
1000 .unwrap();
1001 let list = make_array(list);
1002 let list_field = Arc::new(Field::new("list", list_type, true));
1003
1004 let struct_array = StructArray::from(vec![(list_field, list)]);
1005 let array = Arc::new(struct_array) as ArrayRef;
1006
1007 let struct_field = Field::new("struct", array.data_type().clone(), true);
1008 let levels = calculate_array_levels(&array, &struct_field).unwrap();
1009 assert_eq!(levels.len(), 1);
1010
1011 let expected_levels = ArrayLevels {
1012 def_levels: Some(vec![1, 3, 3, 3, 3, 3, 3, 3]),
1013 rep_levels: Some(vec![0, 0, 1, 1, 0, 1, 0, 1]),
1014 non_null_indices: (0..7).collect(),
1015 max_def_level: 3,
1016 max_rep_level: 1,
1017 array: Arc::new(leaf),
1018 logical_nulls: None,
1019 };
1020 assert_eq!(&levels[0], &expected_levels);
1021
1022 let leaf = Int32Array::from_iter(201..216);
1030 let leaf_field = Field::new("leaf", DataType::Int32, false);
1031 let list_1_type = DataType::List(Arc::new(leaf_field));
1032 let list_1 = ArrayData::builder(list_1_type.clone())
1033 .len(7)
1034 .add_buffer(Buffer::from_iter([0_i32, 1, 3, 3, 6, 10, 10, 15]))
1035 .add_child_data(leaf.to_data())
1036 .build()
1037 .unwrap();
1038
1039 let list_1_field = Field::new("l1", list_1_type, true);
1040 let list_2_type = DataType::List(Arc::new(list_1_field));
1041 let list_2 = ArrayData::builder(list_2_type.clone())
1042 .len(4)
1043 .add_buffer(Buffer::from_iter([0_i32, 0, 3, 5, 7]))
1044 .null_bit_buffer(Some(Buffer::from([0b00001110])))
1045 .add_child_data(list_1)
1046 .build()
1047 .unwrap();
1048
1049 let list_2 = make_array(list_2);
1050 let list_2_field = Arc::new(Field::new("list_2", list_2_type, true));
1051
1052 let struct_array =
1053 StructArray::from((vec![(list_2_field, list_2)], Buffer::from([0b00001111])));
1054 let struct_field = Field::new("struct", struct_array.data_type().clone(), true);
1055
1056 let array = Arc::new(struct_array) as ArrayRef;
1057 let levels = calculate_array_levels(&array, &struct_field).unwrap();
1058 assert_eq!(levels.len(), 1);
1059
1060 let expected_levels = ArrayLevels {
1061 def_levels: Some(vec![1, 5, 5, 5, 4, 5, 5, 5, 5, 5, 5, 5, 4, 5, 5, 5, 5, 5]),
1062 rep_levels: Some(vec![0, 0, 1, 2, 1, 0, 2, 2, 1, 2, 2, 2, 0, 1, 2, 2, 2, 2]),
1063 non_null_indices: (0..15).collect(),
1064 max_def_level: 5,
1065 max_rep_level: 2,
1066 array: Arc::new(leaf),
1067 logical_nulls: None,
1068 };
1069 assert_eq!(&levels[0], &expected_levels);
1070 }
1071
1072 #[test]
1073 fn test_calculate_nested_struct_levels() {
1074 let c = Int32Array::from_iter([Some(1), None, Some(3), None, Some(5), Some(6)]);
1084 let leaf = Arc::new(c) as ArrayRef;
1085 let c_field = Arc::new(Field::new("c", DataType::Int32, true));
1086 let b = StructArray::from(((vec![(c_field, leaf.clone())]), Buffer::from([0b00110111])));
1087
1088 let b_field = Arc::new(Field::new("b", b.data_type().clone(), true));
1089 let a = StructArray::from((
1090 (vec![(b_field, Arc::new(b) as ArrayRef)]),
1091 Buffer::from([0b00101111]),
1092 ));
1093
1094 let a_field = Field::new("a", a.data_type().clone(), true);
1095 let a_array = Arc::new(a) as ArrayRef;
1096
1097 let levels = calculate_array_levels(&a_array, &a_field).unwrap();
1098 assert_eq!(levels.len(), 1);
1099
1100 let logical_nulls = leaf.logical_nulls();
1101 let expected_levels = ArrayLevels {
1102 def_levels: Some(vec![3, 2, 3, 1, 0, 3]),
1103 rep_levels: None,
1104 non_null_indices: vec![0, 2, 5],
1105 max_def_level: 3,
1106 max_rep_level: 0,
1107 array: leaf,
1108 logical_nulls,
1109 };
1110 assert_eq!(&levels[0], &expected_levels);
1111 }
1112
1113 #[test]
1114 fn list_single_column() {
1115 let a_values = Int32Array::from(vec![1, 2, 3, 4, 5, 6, 7, 8, 9, 10]);
1118 let a_value_offsets = arrow::buffer::Buffer::from_iter([0_i32, 1, 3, 3, 6, 10]);
1119 let a_list_type = DataType::List(Arc::new(Field::new_list_field(DataType::Int32, true)));
1120 let a_list_data = ArrayData::builder(a_list_type.clone())
1121 .len(5)
1122 .add_buffer(a_value_offsets)
1123 .null_bit_buffer(Some(Buffer::from([0b00011011])))
1124 .add_child_data(a_values.to_data())
1125 .build()
1126 .unwrap();
1127
1128 assert_eq!(a_list_data.null_count(), 1);
1129
1130 let a = ListArray::from(a_list_data);
1131
1132 let item_field = Field::new_list_field(a_list_type, true);
1133 let mut builder = levels(&item_field, a);
1134 builder.write(2..4);
1135 let levels = builder.finish();
1136
1137 assert_eq!(levels.len(), 1);
1138
1139 let list_level = &levels[0];
1140
1141 let expected_level = ArrayLevels {
1142 def_levels: Some(vec![0, 3, 3, 3]),
1143 rep_levels: Some(vec![0, 0, 1, 1]),
1144 non_null_indices: vec![3, 4, 5],
1145 max_def_level: 3,
1146 max_rep_level: 1,
1147 array: Arc::new(a_values),
1148 logical_nulls: None,
1149 };
1150 assert_eq!(list_level, &expected_level);
1151 }
1152
1153 #[test]
1154 fn mixed_struct_list() {
1155 let struct_field_d = Arc::new(Field::new("d", DataType::Float64, true));
1159 let struct_field_f = Arc::new(Field::new("f", DataType::Float32, true));
1160 let struct_field_g = Arc::new(Field::new(
1161 "g",
1162 DataType::List(Arc::new(Field::new("items", DataType::Int16, false))),
1163 false,
1164 ));
1165 let struct_field_e = Arc::new(Field::new(
1166 "e",
1167 DataType::Struct(vec![struct_field_f.clone(), struct_field_g.clone()].into()),
1168 true,
1169 ));
1170 let schema = Schema::new(vec![
1171 Field::new("a", DataType::Int32, false),
1172 Field::new("b", DataType::Int32, true),
1173 Field::new(
1174 "c",
1175 DataType::Struct(vec![struct_field_d.clone(), struct_field_e.clone()].into()),
1176 true, ),
1178 ]);
1179
1180 let a = Int32Array::from(vec![1, 2, 3, 4, 5]);
1182 let b = Int32Array::from(vec![Some(1), None, None, Some(4), Some(5)]);
1183 let d = Float64Array::from(vec![None, None, None, Some(1.0), None]);
1184 let f = Float32Array::from(vec![Some(0.0), None, Some(333.3), None, Some(5.25)]);
1185
1186 let g_value = Int16Array::from(vec![1, 2, 3, 4, 5, 6, 7, 8, 9, 10]);
1187
1188 let g_value_offsets = arrow::buffer::Buffer::from([0, 1, 3, 3, 6, 10].to_byte_slice());
1191
1192 let g_list_data = ArrayData::builder(struct_field_g.data_type().clone())
1194 .len(5)
1195 .add_buffer(g_value_offsets)
1196 .add_child_data(g_value.into_data())
1197 .build()
1198 .unwrap();
1199 let g = ListArray::from(g_list_data);
1200
1201 let e = StructArray::from(vec![
1202 (struct_field_f, Arc::new(f.clone()) as ArrayRef),
1203 (struct_field_g, Arc::new(g) as ArrayRef),
1204 ]);
1205
1206 let c = StructArray::from(vec![
1207 (struct_field_d, Arc::new(d.clone()) as ArrayRef),
1208 (struct_field_e, Arc::new(e) as ArrayRef),
1209 ]);
1210
1211 let batch = RecordBatch::try_new(
1213 Arc::new(schema),
1214 vec![Arc::new(a.clone()), Arc::new(b.clone()), Arc::new(c)],
1215 )
1216 .unwrap();
1217
1218 let mut levels = vec![];
1221 batch
1222 .columns()
1223 .iter()
1224 .zip(batch.schema().fields())
1225 .for_each(|(array, field)| {
1226 let mut array_levels = calculate_array_levels(array, field).unwrap();
1227 levels.append(&mut array_levels);
1228 });
1229 assert_eq!(levels.len(), 5);
1230
1231 let list_level = &levels[0];
1233
1234 let expected_level = ArrayLevels {
1235 def_levels: None,
1236 rep_levels: None,
1237 non_null_indices: vec![0, 1, 2, 3, 4],
1238 max_def_level: 0,
1239 max_rep_level: 0,
1240 array: Arc::new(a),
1241 logical_nulls: None,
1242 };
1243 assert_eq!(list_level, &expected_level);
1244
1245 let list_level = levels.get(1).unwrap();
1247
1248 let b_logical_nulls = b.logical_nulls();
1249 let expected_level = ArrayLevels {
1250 def_levels: Some(vec![1, 0, 0, 1, 1]),
1251 rep_levels: None,
1252 non_null_indices: vec![0, 3, 4],
1253 max_def_level: 1,
1254 max_rep_level: 0,
1255 array: Arc::new(b),
1256 logical_nulls: b_logical_nulls,
1257 };
1258 assert_eq!(list_level, &expected_level);
1259
1260 let list_level = levels.get(2).unwrap();
1262
1263 let d_logical_nulls = d.logical_nulls();
1264 let expected_level = ArrayLevels {
1265 def_levels: Some(vec![1, 1, 1, 2, 1]),
1266 rep_levels: None,
1267 non_null_indices: vec![3],
1268 max_def_level: 2,
1269 max_rep_level: 0,
1270 array: Arc::new(d),
1271 logical_nulls: d_logical_nulls,
1272 };
1273 assert_eq!(list_level, &expected_level);
1274
1275 let list_level = levels.get(3).unwrap();
1277
1278 let f_logical_nulls = f.logical_nulls();
1279 let expected_level = ArrayLevels {
1280 def_levels: Some(vec![3, 2, 3, 2, 3]),
1281 rep_levels: None,
1282 non_null_indices: vec![0, 2, 4],
1283 max_def_level: 3,
1284 max_rep_level: 0,
1285 array: Arc::new(f),
1286 logical_nulls: f_logical_nulls,
1287 };
1288 assert_eq!(list_level, &expected_level);
1289 }
1290
1291 #[test]
1292 fn test_null_vs_nonnull_struct() {
1293 let offset_field = Arc::new(Field::new("offset", DataType::Int32, true));
1295 let schema = Schema::new(vec![Field::new(
1296 "some_nested_object",
1297 DataType::Struct(vec![offset_field.clone()].into()),
1298 false,
1299 )]);
1300
1301 let offset = Int32Array::from(vec![1, 2, 3, 4, 5]);
1303
1304 let some_nested_object =
1305 StructArray::from(vec![(offset_field, Arc::new(offset) as ArrayRef)]);
1306
1307 let batch =
1309 RecordBatch::try_new(Arc::new(schema), vec![Arc::new(some_nested_object)]).unwrap();
1310
1311 let struct_null_level =
1312 calculate_array_levels(batch.column(0), batch.schema().field(0)).unwrap();
1313
1314 let offset_field = Arc::new(Field::new("offset", DataType::Int32, true));
1317 let schema = Schema::new(vec![Field::new(
1318 "some_nested_object",
1319 DataType::Struct(vec![offset_field.clone()].into()),
1320 true,
1321 )]);
1322
1323 let offset = Int32Array::from(vec![1, 2, 3, 4, 5]);
1325
1326 let some_nested_object =
1327 StructArray::from(vec![(offset_field, Arc::new(offset) as ArrayRef)]);
1328
1329 let batch =
1331 RecordBatch::try_new(Arc::new(schema), vec![Arc::new(some_nested_object)]).unwrap();
1332
1333 let struct_non_null_level =
1334 calculate_array_levels(batch.column(0), batch.schema().field(0)).unwrap();
1335
1336 if struct_non_null_level == struct_null_level {
1338 panic!("Levels should not be equal, to reflect the difference in struct nullness");
1339 }
1340 }
1341
1342 #[test]
1343 fn test_map_array() {
1344 let json_content = r#"
1346 {"stocks":{"long": "$AAA", "short": "$BBB"}}
1347 {"stocks":{"long": "$CCC", "short": null}}
1348 {"stocks":{"hedged": "$YYY", "long": null, "short": "$D"}}
1349 "#;
1350 let entries_struct_type = DataType::Struct(Fields::from(vec![
1351 Field::new("key", DataType::Utf8, false),
1352 Field::new("value", DataType::Utf8, true),
1353 ]));
1354 let stocks_field = Field::new(
1355 "stocks",
1356 DataType::Map(
1357 Arc::new(Field::new("entries", entries_struct_type, false)),
1358 false,
1359 ),
1360 false,
1362 );
1363 let schema = Arc::new(Schema::new(vec![stocks_field]));
1364 let builder = arrow::json::ReaderBuilder::new(schema).with_batch_size(64);
1365 let mut reader = builder.build(std::io::Cursor::new(json_content)).unwrap();
1366
1367 let batch = reader.next().unwrap().unwrap();
1368
1369 let mut levels = vec![];
1371 batch
1372 .columns()
1373 .iter()
1374 .zip(batch.schema().fields())
1375 .for_each(|(array, field)| {
1376 let mut array_levels = calculate_array_levels(array, field).unwrap();
1377 levels.append(&mut array_levels);
1378 });
1379 assert_eq!(levels.len(), 2);
1380
1381 let map = batch.column(0).as_map();
1382 let map_keys_logical_nulls = map.keys().logical_nulls();
1383
1384 let list_level = &levels[0];
1386
1387 let expected_level = ArrayLevels {
1388 def_levels: Some(vec![1; 7]),
1389 rep_levels: Some(vec![0, 1, 0, 1, 0, 1, 1]),
1390 non_null_indices: vec![0, 1, 2, 3, 4, 5, 6],
1391 max_def_level: 1,
1392 max_rep_level: 1,
1393 array: map.keys().clone(),
1394 logical_nulls: map_keys_logical_nulls,
1395 };
1396 assert_eq!(list_level, &expected_level);
1397
1398 let list_level = levels.get(1).unwrap();
1400 let map_values_logical_nulls = map.values().logical_nulls();
1401
1402 let expected_level = ArrayLevels {
1403 def_levels: Some(vec![2, 2, 2, 1, 2, 1, 2]),
1404 rep_levels: Some(vec![0, 1, 0, 1, 0, 1, 1]),
1405 non_null_indices: vec![0, 1, 2, 4, 6],
1406 max_def_level: 2,
1407 max_rep_level: 1,
1408 array: map.values().clone(),
1409 logical_nulls: map_values_logical_nulls,
1410 };
1411 assert_eq!(list_level, &expected_level);
1412 }
1413
1414 #[test]
1415 fn test_list_of_struct() {
1416 let int_field = Field::new("a", DataType::Int32, true);
1418 let fields = Fields::from([Arc::new(int_field)]);
1419 let item_field = Field::new_list_field(DataType::Struct(fields.clone()), true);
1420 let list_field = Field::new("list", DataType::List(Arc::new(item_field)), true);
1421
1422 let int_builder = Int32Builder::with_capacity(10);
1423 let struct_builder = StructBuilder::new(fields, vec![Box::new(int_builder)]);
1424 let mut list_builder = ListBuilder::new(struct_builder);
1425
1426 let values = list_builder.values();
1430 values
1431 .field_builder::<Int32Builder>(0)
1432 .unwrap()
1433 .append_value(1);
1434 values.append(true);
1435 list_builder.append(true);
1436
1437 list_builder.append(true);
1439
1440 list_builder.append(false);
1442
1443 let values = list_builder.values();
1445 values
1446 .field_builder::<Int32Builder>(0)
1447 .unwrap()
1448 .append_null();
1449 values.append(false);
1450 values
1451 .field_builder::<Int32Builder>(0)
1452 .unwrap()
1453 .append_null();
1454 values.append(false);
1455 list_builder.append(true);
1456
1457 let values = list_builder.values();
1459 values
1460 .field_builder::<Int32Builder>(0)
1461 .unwrap()
1462 .append_null();
1463 values.append(true);
1464 list_builder.append(true);
1465
1466 let values = list_builder.values();
1468 values
1469 .field_builder::<Int32Builder>(0)
1470 .unwrap()
1471 .append_value(2);
1472 values.append(true);
1473 list_builder.append(true);
1474
1475 let array = Arc::new(list_builder.finish());
1476
1477 let values = array.values().as_struct().column(0).clone();
1478 let values_len = values.len();
1479 assert_eq!(values_len, 5);
1480
1481 let schema = Arc::new(Schema::new(vec![list_field]));
1482
1483 let rb = RecordBatch::try_new(schema, vec![array]).unwrap();
1484
1485 let levels = calculate_array_levels(rb.column(0), rb.schema().field(0)).unwrap();
1486 let list_level = &levels[0];
1487
1488 let logical_nulls = values.logical_nulls();
1489 let expected_level = ArrayLevels {
1490 def_levels: Some(vec![4, 1, 0, 2, 2, 3, 4]),
1491 rep_levels: Some(vec![0, 0, 0, 0, 1, 0, 0]),
1492 non_null_indices: vec![0, 4],
1493 max_def_level: 4,
1494 max_rep_level: 1,
1495 array: values,
1496 logical_nulls,
1497 };
1498
1499 assert_eq!(list_level, &expected_level);
1500 }
1501
1502 #[test]
1503 fn test_struct_mask_list() {
1504 let inner = ListArray::from_iter_primitive::<Int32Type, _, _>(vec![
1506 Some(vec![Some(1), Some(2)]),
1507 Some(vec![None]),
1508 Some(vec![]),
1509 Some(vec![Some(3), None]), Some(vec![Some(4), Some(5)]),
1511 None, None,
1513 ]);
1514 let values = inner.values().clone();
1515
1516 assert_eq!(inner.values().len(), 7);
1518
1519 let field = Arc::new(Field::new("list", inner.data_type().clone(), true));
1520 let array = Arc::new(inner) as ArrayRef;
1521 let nulls = Buffer::from([0b01010111]);
1522 let struct_a = StructArray::from((vec![(field, array)], nulls));
1523
1524 let field = Field::new("struct", struct_a.data_type().clone(), true);
1525 let array = Arc::new(struct_a) as ArrayRef;
1526 let levels = calculate_array_levels(&array, &field).unwrap();
1527
1528 assert_eq!(levels.len(), 1);
1529
1530 let logical_nulls = values.logical_nulls();
1531 let expected_level = ArrayLevels {
1532 def_levels: Some(vec![4, 4, 3, 2, 0, 4, 4, 0, 1]),
1533 rep_levels: Some(vec![0, 1, 0, 0, 0, 0, 1, 0, 0]),
1534 non_null_indices: vec![0, 1, 5, 6],
1535 max_def_level: 4,
1536 max_rep_level: 1,
1537 array: values,
1538 logical_nulls,
1539 };
1540
1541 assert_eq!(&levels[0], &expected_level);
1542 }
1543
1544 #[test]
1545 fn test_list_mask_struct() {
1546 let a1 = ListArray::from_iter_primitive::<Int32Type, _, _>(vec![
1550 Some(vec![None]), Some(vec![]), Some(vec![Some(3), None]),
1553 Some(vec![Some(4), Some(5), None, Some(6)]), None,
1555 None,
1556 ]);
1557 let a1_values = a1.values().clone();
1558 let a1 = Arc::new(a1) as ArrayRef;
1559
1560 let a2 = Arc::new(Int32Array::from_iter(vec![
1561 Some(1), Some(2), None,
1564 Some(4), Some(5),
1566 None,
1567 ])) as ArrayRef;
1568 let a2_values = a2.clone();
1569
1570 let field_a1 = Arc::new(Field::new("list", a1.data_type().clone(), true));
1571 let field_a2 = Arc::new(Field::new("integers", a2.data_type().clone(), true));
1572
1573 let nulls = Buffer::from([0b00110111]);
1574 let struct_a = Arc::new(StructArray::from((
1575 vec![(field_a1, a1), (field_a2, a2)],
1576 nulls,
1577 ))) as ArrayRef;
1578
1579 let offsets = Buffer::from_iter([0_i32, 0, 2, 2, 3, 5, 5]);
1580 let nulls = Buffer::from([0b00111100]);
1581
1582 let list_type = DataType::List(Arc::new(Field::new(
1583 "struct",
1584 struct_a.data_type().clone(),
1585 true,
1586 )));
1587
1588 let data = ArrayDataBuilder::new(list_type.clone())
1589 .len(6)
1590 .null_bit_buffer(Some(nulls))
1591 .add_buffer(offsets)
1592 .add_child_data(struct_a.into_data())
1593 .build()
1594 .unwrap();
1595
1596 let list = make_array(data);
1597 let list_field = Field::new("col", list_type, true);
1598
1599 let expected = vec![
1600 r#""#.to_string(),
1601 r#""#.to_string(),
1602 r#"[]"#.to_string(),
1603 r#"[{list: [3, ], integers: }]"#.to_string(),
1604 r#"[, {list: , integers: 5}]"#.to_string(),
1605 r#"[]"#.to_string(),
1606 ];
1607
1608 let actual: Vec<_> = (0..6)
1609 .map(|x| array_value_to_string(&list, x).unwrap())
1610 .collect();
1611 assert_eq!(actual, expected);
1612
1613 let levels = calculate_array_levels(&list, &list_field).unwrap();
1614
1615 assert_eq!(levels.len(), 2);
1616
1617 let a1_logical_nulls = a1_values.logical_nulls();
1618 let expected_level = ArrayLevels {
1619 def_levels: Some(vec![0, 0, 1, 6, 5, 2, 3, 1]),
1620 rep_levels: Some(vec![0, 0, 0, 0, 2, 0, 1, 0]),
1621 non_null_indices: vec![1],
1622 max_def_level: 6,
1623 max_rep_level: 2,
1624 array: a1_values,
1625 logical_nulls: a1_logical_nulls,
1626 };
1627
1628 assert_eq!(&levels[0], &expected_level);
1629
1630 let a2_logical_nulls = a2_values.logical_nulls();
1631 let expected_level = ArrayLevels {
1632 def_levels: Some(vec![0, 0, 1, 3, 2, 4, 1]),
1633 rep_levels: Some(vec![0, 0, 0, 0, 0, 1, 0]),
1634 non_null_indices: vec![4],
1635 max_def_level: 4,
1636 max_rep_level: 1,
1637 array: a2_values,
1638 logical_nulls: a2_logical_nulls,
1639 };
1640
1641 assert_eq!(&levels[1], &expected_level);
1642 }
1643
1644 #[test]
1645 fn test_fixed_size_list() {
1646 let mut builder = FixedSizeListBuilder::new(Int32Builder::new(), 2);
1648 builder.values().append_slice(&[1, 2]);
1649 builder.append(true);
1650 builder.values().append_slice(&[3, 4]);
1651 builder.append(false);
1652 builder.values().append_slice(&[5, 6]);
1653 builder.append(false);
1654 builder.values().append_slice(&[7, 8]);
1655 builder.append(true);
1656 builder.values().append_slice(&[9, 10]);
1657 builder.append(false);
1658 let a = builder.finish();
1659 let values = a.values().clone();
1660
1661 let item_field = Field::new_list_field(a.data_type().clone(), true);
1662 let mut builder = levels(&item_field, a);
1663 builder.write(1..4);
1664 let levels = builder.finish();
1665
1666 assert_eq!(levels.len(), 1);
1667
1668 let list_level = &levels[0];
1669
1670 let logical_nulls = values.logical_nulls();
1671 let expected_level = ArrayLevels {
1672 def_levels: Some(vec![0, 0, 3, 3]),
1673 rep_levels: Some(vec![0, 0, 0, 1]),
1674 non_null_indices: vec![6, 7],
1675 max_def_level: 3,
1676 max_rep_level: 1,
1677 array: values,
1678 logical_nulls,
1679 };
1680 assert_eq!(list_level, &expected_level);
1681 }
1682
1683 #[test]
1684 fn test_fixed_size_list_of_struct() {
1685 let field_a = Field::new("a", DataType::Int32, true);
1687 let field_b = Field::new("b", DataType::Int64, false);
1688 let fields = Fields::from([Arc::new(field_a), Arc::new(field_b)]);
1689 let item_field = Field::new_list_field(DataType::Struct(fields.clone()), true);
1690 let list_field = Field::new(
1691 "list",
1692 DataType::FixedSizeList(Arc::new(item_field), 2),
1693 true,
1694 );
1695
1696 let builder_a = Int32Builder::with_capacity(10);
1697 let builder_b = Int64Builder::with_capacity(10);
1698 let struct_builder =
1699 StructBuilder::new(fields, vec![Box::new(builder_a), Box::new(builder_b)]);
1700 let mut list_builder = FixedSizeListBuilder::new(struct_builder, 2);
1701
1702 let values = list_builder.values();
1711 values
1713 .field_builder::<Int32Builder>(0)
1714 .unwrap()
1715 .append_value(1);
1716 values
1717 .field_builder::<Int64Builder>(1)
1718 .unwrap()
1719 .append_value(2);
1720 values.append(true);
1721 values
1723 .field_builder::<Int32Builder>(0)
1724 .unwrap()
1725 .append_null();
1726 values
1727 .field_builder::<Int64Builder>(1)
1728 .unwrap()
1729 .append_value(0);
1730 values.append(false);
1731 list_builder.append(true);
1732
1733 let values = list_builder.values();
1735 values
1737 .field_builder::<Int32Builder>(0)
1738 .unwrap()
1739 .append_null();
1740 values
1741 .field_builder::<Int64Builder>(1)
1742 .unwrap()
1743 .append_value(0);
1744 values.append(false);
1745 values
1747 .field_builder::<Int32Builder>(0)
1748 .unwrap()
1749 .append_null();
1750 values
1751 .field_builder::<Int64Builder>(1)
1752 .unwrap()
1753 .append_value(0);
1754 values.append(false);
1755 list_builder.append(false);
1756
1757 let values = list_builder.values();
1759 values
1761 .field_builder::<Int32Builder>(0)
1762 .unwrap()
1763 .append_null();
1764 values
1765 .field_builder::<Int64Builder>(1)
1766 .unwrap()
1767 .append_value(0);
1768 values.append(false);
1769 values
1771 .field_builder::<Int32Builder>(0)
1772 .unwrap()
1773 .append_null();
1774 values
1775 .field_builder::<Int64Builder>(1)
1776 .unwrap()
1777 .append_value(0);
1778 values.append(false);
1779 list_builder.append(true);
1780
1781 let values = list_builder.values();
1783 values
1785 .field_builder::<Int32Builder>(0)
1786 .unwrap()
1787 .append_null();
1788 values
1789 .field_builder::<Int64Builder>(1)
1790 .unwrap()
1791 .append_value(3);
1792 values.append(true);
1793 values
1795 .field_builder::<Int32Builder>(0)
1796 .unwrap()
1797 .append_value(2);
1798 values
1799 .field_builder::<Int64Builder>(1)
1800 .unwrap()
1801 .append_value(4);
1802 values.append(true);
1803 list_builder.append(true);
1804
1805 let array = Arc::new(list_builder.finish());
1806
1807 assert_eq!(array.values().len(), 8);
1808 assert_eq!(array.len(), 4);
1809
1810 let struct_values = array.values().as_struct();
1811 let values_a = struct_values.column(0).clone();
1812 let values_b = struct_values.column(1).clone();
1813
1814 let schema = Arc::new(Schema::new(vec![list_field]));
1815 let rb = RecordBatch::try_new(schema, vec![array]).unwrap();
1816
1817 let levels = calculate_array_levels(rb.column(0), rb.schema().field(0)).unwrap();
1818 let a_levels = &levels[0];
1819 let b_levels = &levels[1];
1820
1821 let values_a_logical_nulls = values_a.logical_nulls();
1823 let expected_a = ArrayLevels {
1824 def_levels: Some(vec![4, 2, 0, 2, 2, 3, 4]),
1825 rep_levels: Some(vec![0, 1, 0, 0, 1, 0, 1]),
1826 non_null_indices: vec![0, 7],
1827 max_def_level: 4,
1828 max_rep_level: 1,
1829 array: values_a,
1830 logical_nulls: values_a_logical_nulls,
1831 };
1832 let values_b_logical_nulls = values_b.logical_nulls();
1834 let expected_b = ArrayLevels {
1835 def_levels: Some(vec![3, 2, 0, 2, 2, 3, 3]),
1836 rep_levels: Some(vec![0, 1, 0, 0, 1, 0, 1]),
1837 non_null_indices: vec![0, 6, 7],
1838 max_def_level: 3,
1839 max_rep_level: 1,
1840 array: values_b,
1841 logical_nulls: values_b_logical_nulls,
1842 };
1843
1844 assert_eq!(a_levels, &expected_a);
1845 assert_eq!(b_levels, &expected_b);
1846 }
1847
1848 #[test]
1849 fn test_fixed_size_list_empty() {
1850 let mut builder = FixedSizeListBuilder::new(Int32Builder::new(), 0);
1851 builder.append(true);
1852 builder.append(false);
1853 builder.append(true);
1854 let array = builder.finish();
1855 let values = array.values().clone();
1856
1857 let item_field = Field::new_list_field(array.data_type().clone(), true);
1858 let mut builder = levels(&item_field, array);
1859 builder.write(0..3);
1860 let levels = builder.finish();
1861
1862 assert_eq!(levels.len(), 1);
1863
1864 let list_level = &levels[0];
1865
1866 let logical_nulls = values.logical_nulls();
1867 let expected_level = ArrayLevels {
1868 def_levels: Some(vec![1, 0, 1]),
1869 rep_levels: Some(vec![0, 0, 0]),
1870 non_null_indices: vec![],
1871 max_def_level: 3,
1872 max_rep_level: 1,
1873 array: values,
1874 logical_nulls,
1875 };
1876 assert_eq!(list_level, &expected_level);
1877 }
1878
1879 #[test]
1880 fn test_fixed_size_list_of_var_lists() {
1881 let mut builder = FixedSizeListBuilder::new(ListBuilder::new(Int32Builder::new()), 2);
1883 builder.values().append_value([Some(1), None, Some(3)]);
1884 builder.values().append_null();
1885 builder.append(true);
1886 builder.values().append_value([Some(4)]);
1887 builder.values().append_value([]);
1888 builder.append(true);
1889 builder.values().append_value([Some(5), Some(6)]);
1890 builder.values().append_value([None, None]);
1891 builder.append(true);
1892 builder.values().append_null();
1893 builder.values().append_null();
1894 builder.append(false);
1895 let a = builder.finish();
1896 let values = a.values().as_list::<i32>().values().clone();
1897
1898 let item_field = Field::new_list_field(a.data_type().clone(), true);
1899 let mut builder = levels(&item_field, a);
1900 builder.write(0..4);
1901 let levels = builder.finish();
1902
1903 let logical_nulls = values.logical_nulls();
1904 let expected_level = ArrayLevels {
1905 def_levels: Some(vec![5, 4, 5, 2, 5, 3, 5, 5, 4, 4, 0]),
1906 rep_levels: Some(vec![0, 2, 2, 1, 0, 1, 0, 2, 1, 2, 0]),
1907 non_null_indices: vec![0, 2, 3, 4, 5],
1908 max_def_level: 5,
1909 max_rep_level: 2,
1910 array: values,
1911 logical_nulls,
1912 };
1913
1914 assert_eq!(levels[0], expected_level);
1915 }
1916
1917 #[test]
1918 fn test_null_dictionary_values() {
1919 let values = Int32Array::new(
1920 vec![1, 2, 3, 4].into(),
1921 Some(NullBuffer::from(vec![true, false, true, true])),
1922 );
1923 let keys = Int32Array::new(
1924 vec![1, 54, 2, 0].into(),
1925 Some(NullBuffer::from(vec![true, false, true, true])),
1926 );
1927 let dict = DictionaryArray::new(keys, Arc::new(values));
1929
1930 let item_field = Field::new_list_field(dict.data_type().clone(), true);
1931
1932 let mut builder = levels(&item_field, dict.clone());
1933 builder.write(0..4);
1934 let levels = builder.finish();
1935
1936 let logical_nulls = dict.logical_nulls();
1937 let expected_level = ArrayLevels {
1938 def_levels: Some(vec![0, 0, 1, 1]),
1939 rep_levels: None,
1940 non_null_indices: vec![2, 3],
1941 max_def_level: 1,
1942 max_rep_level: 0,
1943 array: Arc::new(dict),
1944 logical_nulls,
1945 };
1946 assert_eq!(levels[0], expected_level);
1947 }
1948
1949 #[test]
1950 fn mismatched_types() {
1951 let array = Arc::new(Int32Array::from_iter(0..10)) as ArrayRef;
1952 let field = Field::new_list_field(DataType::Float64, false);
1953
1954 let err = LevelInfoBuilder::try_new(&field, Default::default(), &array)
1955 .unwrap_err()
1956 .to_string();
1957
1958 assert_eq!(
1959 err,
1960 "Arrow: Incompatible type. Field 'item' has type Float64, array has type Int32",
1961 );
1962 }
1963
1964 fn levels<T: Array + 'static>(field: &Field, array: T) -> LevelInfoBuilder {
1965 let v = Arc::new(array) as ArrayRef;
1966 LevelInfoBuilder::try_new(field, Default::default(), &v).unwrap()
1967 }
1968}