1use crate::errors::{ParquetError, Result};
44use arrow_array::cast::AsArray;
45use arrow_array::{Array, ArrayRef, OffsetSizeTrait};
46use arrow_buffer::bit_iterator::BitIndexIterator;
47use arrow_buffer::{NullBuffer, OffsetBuffer, ScalarBuffer};
48use arrow_schema::{DataType, Field};
49use std::ops::Range;
50use std::sync::Arc;
51
52pub(crate) fn calculate_array_levels(array: &ArrayRef, field: &Field) -> Result<Vec<ArrayLevels>> {
55 let mut builder = LevelInfoBuilder::try_new(field, Default::default(), array)?;
56 builder.write(0..array.len());
57 Ok(builder.finish())
58}
59
60fn is_leaf(data_type: &DataType) -> bool {
63 matches!(
64 data_type,
65 DataType::Null
66 | DataType::Boolean
67 | DataType::Int8
68 | DataType::Int16
69 | DataType::Int32
70 | DataType::Int64
71 | DataType::UInt8
72 | DataType::UInt16
73 | DataType::UInt32
74 | DataType::UInt64
75 | DataType::Float16
76 | DataType::Float32
77 | DataType::Float64
78 | DataType::Utf8
79 | DataType::Utf8View
80 | DataType::LargeUtf8
81 | DataType::Timestamp(_, _)
82 | DataType::Date32
83 | DataType::Date64
84 | DataType::Time32(_)
85 | DataType::Time64(_)
86 | DataType::Duration(_)
87 | DataType::Interval(_)
88 | DataType::Binary
89 | DataType::LargeBinary
90 | DataType::BinaryView
91 | DataType::Decimal32(_, _)
92 | DataType::Decimal64(_, _)
93 | DataType::Decimal128(_, _)
94 | DataType::Decimal256(_, _)
95 | DataType::FixedSizeBinary(_)
96 )
97}
98
/// The repetition and definition level in effect at one position of a (possibly nested) field
/// hierarchy. The default value has both levels set to zero.
#[derive(Clone, Copy, Debug, Default)]
struct LevelContext {
    /// The repetition level at this position
    rep_level: i16,
    /// The definition level at this position
    def_level: i16,
}
107
108#[derive(Debug)]
110enum LevelInfoBuilder {
111 Primitive(ArrayLevels),
113 List(
115 Box<LevelInfoBuilder>, LevelContext, OffsetBuffer<i32>, Option<NullBuffer>, ),
120 LargeList(
122 Box<LevelInfoBuilder>, LevelContext, OffsetBuffer<i64>, Option<NullBuffer>, ),
127 FixedSizeList(
129 Box<LevelInfoBuilder>, LevelContext, usize, Option<NullBuffer>, ),
134 ListView(
136 Box<LevelInfoBuilder>, LevelContext, ScalarBuffer<i32>, ScalarBuffer<i32>, Option<NullBuffer>, ),
142 LargeListView(
144 Box<LevelInfoBuilder>, LevelContext, ScalarBuffer<i64>, ScalarBuffer<i64>, Option<NullBuffer>, ),
150 Struct(Vec<LevelInfoBuilder>, LevelContext, Option<NullBuffer>),
152}
153
154impl LevelInfoBuilder {
155 fn try_new(field: &Field, parent_ctx: LevelContext, array: &ArrayRef) -> Result<Self> {
157 if !Self::types_compatible(field.data_type(), array.data_type()) {
158 return Err(arrow_err!(format!(
159 "Incompatible type. Field '{}' has type {}, array has type {}",
160 field.name(),
161 field.data_type(),
162 array.data_type(),
163 )));
164 }
165
166 let is_nullable = field.is_nullable();
167
168 match array.data_type() {
169 d if is_leaf(d) => {
170 let levels = ArrayLevels::new(parent_ctx, is_nullable, array.clone());
171 Ok(Self::Primitive(levels))
172 }
173 DataType::Dictionary(_, v) if is_leaf(v.as_ref()) => {
174 let levels = ArrayLevels::new(parent_ctx, is_nullable, array.clone());
175 Ok(Self::Primitive(levels))
176 }
177 DataType::Struct(children) => {
178 let array = array.as_struct();
179 let def_level = match is_nullable {
180 true => parent_ctx.def_level + 1,
181 false => parent_ctx.def_level,
182 };
183
184 let ctx = LevelContext {
185 rep_level: parent_ctx.rep_level,
186 def_level,
187 };
188
189 let children = children
190 .iter()
191 .zip(array.columns())
192 .map(|(f, a)| Self::try_new(f, ctx, a))
193 .collect::<Result<_>>()?;
194
195 Ok(Self::Struct(children, ctx, array.nulls().cloned()))
196 }
197 DataType::List(child)
198 | DataType::LargeList(child)
199 | DataType::Map(child, _)
200 | DataType::FixedSizeList(child, _)
201 | DataType::ListView(child)
202 | DataType::LargeListView(child) => {
203 let def_level = match is_nullable {
204 true => parent_ctx.def_level + 2,
205 false => parent_ctx.def_level + 1,
206 };
207
208 let ctx = LevelContext {
209 rep_level: parent_ctx.rep_level + 1,
210 def_level,
211 };
212
213 Ok(match field.data_type() {
214 DataType::List(_) => {
215 let list = array.as_list();
216 let child = Self::try_new(child.as_ref(), ctx, list.values())?;
217 let offsets = list.offsets().clone();
218 Self::List(Box::new(child), ctx, offsets, list.nulls().cloned())
219 }
220 DataType::LargeList(_) => {
221 let list = array.as_list();
222 let child = Self::try_new(child.as_ref(), ctx, list.values())?;
223 let offsets = list.offsets().clone();
224 let nulls = list.nulls().cloned();
225 Self::LargeList(Box::new(child), ctx, offsets, nulls)
226 }
227 DataType::Map(_, _) => {
228 let map = array.as_map();
229 let entries = Arc::new(map.entries().clone()) as ArrayRef;
230 let child = Self::try_new(child.as_ref(), ctx, &entries)?;
231 let offsets = map.offsets().clone();
232 Self::List(Box::new(child), ctx, offsets, map.nulls().cloned())
233 }
234 DataType::FixedSizeList(_, size) => {
235 let list = array.as_fixed_size_list();
236 let child = Self::try_new(child.as_ref(), ctx, list.values())?;
237 let nulls = list.nulls().cloned();
238 Self::FixedSizeList(Box::new(child), ctx, *size as _, nulls)
239 }
240 DataType::ListView(_) => {
241 let list = array.as_list_view();
242 let child = Self::try_new(child.as_ref(), ctx, list.values())?;
243 let offsets = list.offsets().clone();
244 let sizes = list.sizes().clone();
245 let nulls = list.nulls().cloned();
246 Self::ListView(Box::new(child), ctx, offsets, sizes, nulls)
247 }
248 DataType::LargeListView(_) => {
249 let list = array.as_list_view();
250 let child = Self::try_new(child.as_ref(), ctx, list.values())?;
251 let offsets = list.offsets().clone();
252 let sizes = list.sizes().clone();
253 let nulls = list.nulls().cloned();
254 Self::LargeListView(Box::new(child), ctx, offsets, sizes, nulls)
255 }
256 _ => unreachable!(),
257 })
258 }
259 d => Err(nyi_err!("Datatype {} is not yet supported", d)),
260 }
261 }
262
263 fn finish(self) -> Vec<ArrayLevels> {
266 match self {
267 LevelInfoBuilder::Primitive(v) => vec![v],
268 LevelInfoBuilder::List(v, _, _, _)
269 | LevelInfoBuilder::LargeList(v, _, _, _)
270 | LevelInfoBuilder::FixedSizeList(v, _, _, _)
271 | LevelInfoBuilder::ListView(v, _, _, _, _)
272 | LevelInfoBuilder::LargeListView(v, _, _, _, _) => v.finish(),
273 LevelInfoBuilder::Struct(v, _, _) => v.into_iter().flat_map(|l| l.finish()).collect(),
274 }
275 }
276
277 fn write(&mut self, range: Range<usize>) {
279 match self {
280 LevelInfoBuilder::Primitive(info) => Self::write_leaf(info, range),
281 LevelInfoBuilder::List(child, ctx, offsets, nulls) => {
282 Self::write_list(child, ctx, offsets, nulls.as_ref(), range)
283 }
284 LevelInfoBuilder::LargeList(child, ctx, offsets, nulls) => {
285 Self::write_list(child, ctx, offsets, nulls.as_ref(), range)
286 }
287 LevelInfoBuilder::FixedSizeList(child, ctx, size, nulls) => {
288 Self::write_fixed_size_list(child, ctx, *size, nulls.as_ref(), range)
289 }
290 LevelInfoBuilder::ListView(child, ctx, offsets, sizes, nulls) => {
291 Self::write_list_view(child, ctx, offsets, sizes, nulls.as_ref(), range)
292 }
293 LevelInfoBuilder::LargeListView(child, ctx, offsets, sizes, nulls) => {
294 Self::write_list_view(child, ctx, offsets, sizes, nulls.as_ref(), range)
295 }
296 LevelInfoBuilder::Struct(children, ctx, nulls) => {
297 Self::write_struct(children, ctx, nulls.as_ref(), range)
298 }
299 }
300 }
301
302 fn write_list<O: OffsetSizeTrait>(
306 child: &mut LevelInfoBuilder,
307 ctx: &LevelContext,
308 offsets: &[O],
309 nulls: Option<&NullBuffer>,
310 range: Range<usize>,
311 ) {
312 let offsets = &offsets[range.start..range.end + 1];
313
314 let write_non_null_slice =
315 |child: &mut LevelInfoBuilder, start_idx: usize, end_idx: usize| {
316 child.write(start_idx..end_idx);
317 child.visit_leaves(|leaf| {
318 let rep_levels = leaf.rep_levels.as_mut().unwrap();
319 let mut rev = rep_levels.iter_mut().rev();
320 let mut remaining = end_idx - start_idx;
321
322 loop {
323 let next = rev.next().unwrap();
324 if *next > ctx.rep_level {
325 continue;
327 }
328
329 remaining -= 1;
330 if remaining == 0 {
331 *next = ctx.rep_level - 1;
332 break;
333 }
334 }
335 })
336 };
337
338 let write_empty_slice = |child: &mut LevelInfoBuilder| {
339 child.visit_leaves(|leaf| {
340 let rep_levels = leaf.rep_levels.as_mut().unwrap();
341 rep_levels.push(ctx.rep_level - 1);
342 let def_levels = leaf.def_levels.as_mut().unwrap();
343 def_levels.push(ctx.def_level - 1);
344 })
345 };
346
347 let write_null_slice = |child: &mut LevelInfoBuilder| {
348 child.visit_leaves(|leaf| {
349 let rep_levels = leaf.rep_levels.as_mut().unwrap();
350 rep_levels.push(ctx.rep_level - 1);
351 let def_levels = leaf.def_levels.as_mut().unwrap();
352 def_levels.push(ctx.def_level - 2);
353 })
354 };
355
356 match nulls {
357 Some(nulls) => {
358 let null_offset = range.start;
359 for (idx, w) in offsets.windows(2).enumerate() {
361 let is_valid = nulls.is_valid(idx + null_offset);
362 let start_idx = w[0].as_usize();
363 let end_idx = w[1].as_usize();
364 if !is_valid {
365 write_null_slice(child)
366 } else if start_idx == end_idx {
367 write_empty_slice(child)
368 } else {
369 write_non_null_slice(child, start_idx, end_idx)
370 }
371 }
372 }
373 None => {
374 for w in offsets.windows(2) {
375 let start_idx = w[0].as_usize();
376 let end_idx = w[1].as_usize();
377 if start_idx == end_idx {
378 write_empty_slice(child)
379 } else {
380 write_non_null_slice(child, start_idx, end_idx)
381 }
382 }
383 }
384 }
385 }
386
387 fn write_list_view<O: OffsetSizeTrait>(
389 child: &mut LevelInfoBuilder,
390 ctx: &LevelContext,
391 offsets: &[O],
392 sizes: &[O],
393 nulls: Option<&NullBuffer>,
394 range: Range<usize>,
395 ) {
396 let offsets = &offsets[range.start..range.end];
397 let sizes = &sizes[range.start..range.end];
398
399 let write_non_null_slice =
400 |child: &mut LevelInfoBuilder, start_idx: usize, end_idx: usize| {
401 child.write(start_idx..end_idx);
402 child.visit_leaves(|leaf| {
403 let rep_levels = leaf.rep_levels.as_mut().unwrap();
404 let mut rev = rep_levels.iter_mut().rev();
405 let mut remaining = end_idx - start_idx;
406
407 loop {
408 let next = rev.next().unwrap();
409 if *next > ctx.rep_level {
410 continue;
412 }
413
414 remaining -= 1;
415 if remaining == 0 {
416 *next = ctx.rep_level - 1;
417 break;
418 }
419 }
420 })
421 };
422
423 let write_empty_slice = |child: &mut LevelInfoBuilder| {
424 child.visit_leaves(|leaf| {
425 let rep_levels = leaf.rep_levels.as_mut().unwrap();
426 rep_levels.push(ctx.rep_level - 1);
427 let def_levels = leaf.def_levels.as_mut().unwrap();
428 def_levels.push(ctx.def_level - 1);
429 })
430 };
431
432 let write_null_slice = |child: &mut LevelInfoBuilder| {
433 child.visit_leaves(|leaf| {
434 let rep_levels = leaf.rep_levels.as_mut().unwrap();
435 rep_levels.push(ctx.rep_level - 1);
436 let def_levels = leaf.def_levels.as_mut().unwrap();
437 def_levels.push(ctx.def_level - 2);
438 })
439 };
440
441 match nulls {
442 Some(nulls) => {
443 let null_offset = range.start;
444 for (idx, (offset, size)) in offsets.iter().zip(sizes.iter()).enumerate() {
446 let is_valid = nulls.is_valid(idx + null_offset);
447 let start_idx = offset.as_usize();
448 let size = size.as_usize();
449 let end_idx = start_idx + size;
450 if !is_valid {
451 write_null_slice(child)
452 } else if size == 0 {
453 write_empty_slice(child)
454 } else {
455 write_non_null_slice(child, start_idx, end_idx)
456 }
457 }
458 }
459 None => {
460 for (offset, size) in offsets.iter().zip(sizes.iter()) {
461 let start_idx = offset.as_usize();
462 let size = size.as_usize();
463 let end_idx = start_idx + size;
464 if size == 0 {
465 write_empty_slice(child)
466 } else {
467 write_non_null_slice(child, start_idx, end_idx)
468 }
469 }
470 }
471 }
472 }
473
474 fn write_struct(
476 children: &mut [LevelInfoBuilder],
477 ctx: &LevelContext,
478 nulls: Option<&NullBuffer>,
479 range: Range<usize>,
480 ) {
481 let write_null = |children: &mut [LevelInfoBuilder], range: Range<usize>| {
482 for child in children {
483 child.visit_leaves(|info| {
484 let len = range.end - range.start;
485
486 let def_levels = info.def_levels.as_mut().unwrap();
487 def_levels.extend(std::iter::repeat_n(ctx.def_level - 1, len));
488
489 if let Some(rep_levels) = info.rep_levels.as_mut() {
490 rep_levels.extend(std::iter::repeat_n(ctx.rep_level, len));
491 }
492 })
493 }
494 };
495
496 let write_non_null = |children: &mut [LevelInfoBuilder], range: Range<usize>| {
497 for child in children {
498 child.write(range.clone())
499 }
500 };
501
502 match nulls {
503 Some(validity) => {
504 let mut last_non_null_idx = None;
505 let mut last_null_idx = None;
506
507 for i in range.clone() {
509 match validity.is_valid(i) {
510 true => {
511 if let Some(last_idx) = last_null_idx.take() {
512 write_null(children, last_idx..i)
513 }
514 last_non_null_idx.get_or_insert(i);
515 }
516 false => {
517 if let Some(last_idx) = last_non_null_idx.take() {
518 write_non_null(children, last_idx..i)
519 }
520 last_null_idx.get_or_insert(i);
521 }
522 }
523 }
524
525 if let Some(last_idx) = last_null_idx.take() {
526 write_null(children, last_idx..range.end)
527 }
528
529 if let Some(last_idx) = last_non_null_idx.take() {
530 write_non_null(children, last_idx..range.end)
531 }
532 }
533 None => write_non_null(children, range),
534 }
535 }
536
537 fn write_fixed_size_list(
539 child: &mut LevelInfoBuilder,
540 ctx: &LevelContext,
541 fixed_size: usize,
542 nulls: Option<&NullBuffer>,
543 range: Range<usize>,
544 ) {
545 let write_non_null = |child: &mut LevelInfoBuilder, start_idx: usize, end_idx: usize| {
546 let values_start = start_idx * fixed_size;
547 let values_end = end_idx * fixed_size;
548 child.write(values_start..values_end);
549
550 child.visit_leaves(|leaf| {
551 let rep_levels = leaf.rep_levels.as_mut().unwrap();
552
553 let row_indices = (0..fixed_size)
554 .rev()
555 .cycle()
556 .take(values_end - values_start);
557
558 rep_levels
560 .iter_mut()
561 .rev()
562 .filter(|&&mut r| r == ctx.rep_level)
564 .zip(row_indices)
565 .for_each(|(r, idx)| {
566 if idx == 0 {
567 *r = ctx.rep_level - 1;
568 }
569 });
570 })
571 };
572
573 let write_empty = |child: &mut LevelInfoBuilder, start_idx: usize, end_idx: usize| {
575 let len = end_idx - start_idx;
576 child.visit_leaves(|leaf| {
577 let rep_levels = leaf.rep_levels.as_mut().unwrap();
578 rep_levels.extend(std::iter::repeat_n(ctx.rep_level - 1, len));
579 let def_levels = leaf.def_levels.as_mut().unwrap();
580 def_levels.extend(std::iter::repeat_n(ctx.def_level - 1, len));
581 })
582 };
583
584 let write_rows = |child: &mut LevelInfoBuilder, start_idx: usize, end_idx: usize| {
585 if fixed_size > 0 {
586 write_non_null(child, start_idx, end_idx)
587 } else {
588 write_empty(child, start_idx, end_idx)
589 }
590 };
591
592 match nulls {
593 Some(nulls) => {
594 let mut start_idx = None;
595 for idx in range.clone() {
596 if nulls.is_valid(idx) {
597 start_idx.get_or_insert(idx);
599 } else {
600 if let Some(start) = start_idx.take() {
602 write_rows(child, start, idx);
603 }
604 child.visit_leaves(|leaf| {
606 let rep_levels = leaf.rep_levels.as_mut().unwrap();
607 rep_levels.push(ctx.rep_level - 1);
608 let def_levels = leaf.def_levels.as_mut().unwrap();
609 def_levels.push(ctx.def_level - 2);
610 })
611 }
612 }
613 if let Some(start) = start_idx.take() {
615 write_rows(child, start, range.end);
616 }
617 }
618 None => write_rows(child, range.start, range.end),
620 }
621 }
622
623 fn write_leaf(info: &mut ArrayLevels, range: Range<usize>) {
625 let len = range.end - range.start;
626
627 match &mut info.def_levels {
628 Some(def_levels) => {
629 def_levels.reserve(len);
630 info.non_null_indices.reserve(len);
631
632 match &info.logical_nulls {
633 Some(nulls) => {
634 assert!(range.end <= nulls.len());
635 let nulls = nulls.inner();
636 def_levels.extend(range.clone().map(|i| {
637 let valid = unsafe { nulls.value_unchecked(i) };
639 info.max_def_level - (!valid as i16)
640 }));
641 info.non_null_indices.extend(
642 BitIndexIterator::new(nulls.inner(), nulls.offset() + range.start, len)
643 .map(|i| i + range.start),
644 );
645 }
646 None => {
647 let iter = std::iter::repeat_n(info.max_def_level, len);
648 def_levels.extend(iter);
649 info.non_null_indices.extend(range);
650 }
651 }
652 }
653 None => info.non_null_indices.extend(range),
654 }
655
656 if let Some(rep_levels) = &mut info.rep_levels {
657 rep_levels.extend(std::iter::repeat_n(info.max_rep_level, len))
658 }
659 }
660
661 fn visit_leaves(&mut self, visit: impl Fn(&mut ArrayLevels) + Copy) {
663 match self {
664 LevelInfoBuilder::Primitive(info) => visit(info),
665 LevelInfoBuilder::List(c, _, _, _)
666 | LevelInfoBuilder::LargeList(c, _, _, _)
667 | LevelInfoBuilder::FixedSizeList(c, _, _, _)
668 | LevelInfoBuilder::ListView(c, _, _, _, _)
669 | LevelInfoBuilder::LargeListView(c, _, _, _, _) => c.visit_leaves(visit),
670 LevelInfoBuilder::Struct(children, _, _) => {
671 for c in children {
672 c.visit_leaves(visit)
673 }
674 }
675 }
676 }
677
678 fn types_compatible(a: &DataType, b: &DataType) -> bool {
684 if a.equals_datatype(b) {
686 return true;
687 }
688
689 let (a, b) = match (a, b) {
691 (DataType::Dictionary(_, va), DataType::Dictionary(_, vb)) => {
692 (va.as_ref(), vb.as_ref())
693 }
694 (DataType::Dictionary(_, v), b) => (v.as_ref(), b),
695 (a, DataType::Dictionary(_, v)) => (a, v.as_ref()),
696 _ => (a, b),
697 };
698
699 if a == b {
702 return true;
703 }
704
705 match a {
708 DataType::Utf8 => matches!(b, DataType::LargeUtf8 | DataType::Utf8View),
710 DataType::Utf8View => matches!(b, DataType::LargeUtf8 | DataType::Utf8),
711 DataType::LargeUtf8 => matches!(b, DataType::Utf8 | DataType::Utf8View),
712
713 DataType::Binary => matches!(b, DataType::LargeBinary | DataType::BinaryView),
715 DataType::BinaryView => matches!(b, DataType::LargeBinary | DataType::Binary),
716 DataType::LargeBinary => matches!(b, DataType::Binary | DataType::BinaryView),
717
718 _ => false,
720 }
721 }
722}
723
724#[derive(Debug, Clone)]
727pub(crate) struct ArrayLevels {
728 def_levels: Option<Vec<i16>>,
732
733 rep_levels: Option<Vec<i16>>,
737
738 non_null_indices: Vec<usize>,
741
742 max_def_level: i16,
744
745 max_rep_level: i16,
747
748 array: ArrayRef,
750
751 logical_nulls: Option<NullBuffer>,
753}
754
755impl PartialEq for ArrayLevels {
756 fn eq(&self, other: &Self) -> bool {
757 self.def_levels == other.def_levels
758 && self.rep_levels == other.rep_levels
759 && self.non_null_indices == other.non_null_indices
760 && self.max_def_level == other.max_def_level
761 && self.max_rep_level == other.max_rep_level
762 && self.array.as_ref() == other.array.as_ref()
763 && self.logical_nulls.as_ref() == other.logical_nulls.as_ref()
764 }
765}
766impl Eq for ArrayLevels {}
767
768impl ArrayLevels {
769 fn new(ctx: LevelContext, is_nullable: bool, array: ArrayRef) -> Self {
770 let max_rep_level = ctx.rep_level;
771 let max_def_level = match is_nullable {
772 true => ctx.def_level + 1,
773 false => ctx.def_level,
774 };
775
776 let logical_nulls = array.logical_nulls();
777
778 Self {
779 def_levels: (max_def_level != 0).then(Vec::new),
780 rep_levels: (max_rep_level != 0).then(Vec::new),
781 non_null_indices: vec![],
782 max_def_level,
783 max_rep_level,
784 array,
785 logical_nulls,
786 }
787 }
788
789 pub fn array(&self) -> &ArrayRef {
790 &self.array
791 }
792
793 pub fn def_levels(&self) -> Option<&[i16]> {
794 self.def_levels.as_deref()
795 }
796
797 pub fn rep_levels(&self) -> Option<&[i16]> {
798 self.rep_levels.as_deref()
799 }
800
801 pub fn non_null_indices(&self) -> &[usize] {
802 &self.non_null_indices
803 }
804}
805
806#[cfg(test)]
807mod tests {
808 use super::*;
809
810 use arrow_array::builder::*;
811 use arrow_array::types::Int32Type;
812 use arrow_array::*;
813 use arrow_buffer::{Buffer, ToByteSlice};
814 use arrow_cast::display::array_value_to_string;
815 use arrow_data::{ArrayData, ArrayDataBuilder};
816 use arrow_schema::{Fields, Schema};
817
818 #[test]
819 fn test_calculate_array_levels_twitter_example() {
820 let leaf_type = Field::new_list_field(DataType::Int32, false);
824 let inner_type = DataType::List(Arc::new(leaf_type));
825 let inner_field = Field::new("l2", inner_type.clone(), false);
826 let outer_type = DataType::List(Arc::new(inner_field));
827 let outer_field = Field::new("l1", outer_type.clone(), false);
828
829 let primitives = Int32Array::from_iter(0..10);
830
831 let offsets = Buffer::from_iter([0_i32, 3, 7, 8, 10]);
833 let inner_list = ArrayDataBuilder::new(inner_type)
834 .len(4)
835 .add_buffer(offsets)
836 .add_child_data(primitives.to_data())
837 .build()
838 .unwrap();
839
840 let offsets = Buffer::from_iter([0_i32, 2, 4]);
841 let outer_list = ArrayDataBuilder::new(outer_type)
842 .len(2)
843 .add_buffer(offsets)
844 .add_child_data(inner_list)
845 .build()
846 .unwrap();
847 let outer_list = make_array(outer_list);
848
849 let levels = calculate_array_levels(&outer_list, &outer_field).unwrap();
850 assert_eq!(levels.len(), 1);
851
852 let expected = ArrayLevels {
853 def_levels: Some(vec![2; 10]),
854 rep_levels: Some(vec![0, 2, 2, 1, 2, 2, 2, 0, 1, 2]),
855 non_null_indices: vec![0, 1, 2, 3, 4, 5, 6, 7, 8, 9],
856 max_def_level: 2,
857 max_rep_level: 2,
858 array: Arc::new(primitives),
859 logical_nulls: None,
860 };
861 assert_eq!(&levels[0], &expected);
862 }
863
864 #[test]
865 fn test_calculate_one_level_1() {
866 let array = Arc::new(Int32Array::from_iter(0..10)) as ArrayRef;
868 let field = Field::new_list_field(DataType::Int32, false);
869
870 let levels = calculate_array_levels(&array, &field).unwrap();
871 assert_eq!(levels.len(), 1);
872
873 let expected_levels = ArrayLevels {
874 def_levels: None,
875 rep_levels: None,
876 non_null_indices: (0..10).collect(),
877 max_def_level: 0,
878 max_rep_level: 0,
879 array,
880 logical_nulls: None,
881 };
882 assert_eq!(&levels[0], &expected_levels);
883 }
884
885 #[test]
886 fn test_calculate_one_level_2() {
887 let array = Arc::new(Int32Array::from_iter([
889 Some(0),
890 None,
891 Some(0),
892 Some(0),
893 None,
894 ])) as ArrayRef;
895 let field = Field::new_list_field(DataType::Int32, true);
896
897 let levels = calculate_array_levels(&array, &field).unwrap();
898 assert_eq!(levels.len(), 1);
899
900 let logical_nulls = array.logical_nulls();
901 let expected_levels = ArrayLevels {
902 def_levels: Some(vec![1, 0, 1, 1, 0]),
903 rep_levels: None,
904 non_null_indices: vec![0, 2, 3],
905 max_def_level: 1,
906 max_rep_level: 0,
907 array,
908 logical_nulls,
909 };
910 assert_eq!(&levels[0], &expected_levels);
911 }
912
913 #[test]
914 fn test_calculate_array_levels_1() {
915 let leaf_field = Field::new_list_field(DataType::Int32, false);
916 let list_type = DataType::List(Arc::new(leaf_field));
917
918 let leaf_array = Int32Array::from_iter(0..5);
922 let offsets = Buffer::from_iter(0_i32..6);
924 let list = ArrayDataBuilder::new(list_type.clone())
925 .len(5)
926 .add_buffer(offsets)
927 .add_child_data(leaf_array.to_data())
928 .build()
929 .unwrap();
930 let list = make_array(list);
931
932 let list_field = Field::new("list", list_type.clone(), false);
933 let levels = calculate_array_levels(&list, &list_field).unwrap();
934 assert_eq!(levels.len(), 1);
935
936 let expected_levels = ArrayLevels {
937 def_levels: Some(vec![1; 5]),
938 rep_levels: Some(vec![0; 5]),
939 non_null_indices: (0..5).collect(),
940 max_def_level: 1,
941 max_rep_level: 1,
942 array: Arc::new(leaf_array),
943 logical_nulls: None,
944 };
945 assert_eq!(&levels[0], &expected_levels);
946
947 let leaf_array = Int32Array::from_iter([0, 0, 2, 2, 3, 3, 3, 3, 4, 4, 4]);
956 let offsets = Buffer::from_iter([0_i32, 2, 2, 4, 8, 11]);
957 let list = ArrayDataBuilder::new(list_type.clone())
958 .len(5)
959 .add_buffer(offsets)
960 .add_child_data(leaf_array.to_data())
961 .null_bit_buffer(Some(Buffer::from([0b00011101])))
962 .build()
963 .unwrap();
964 let list = make_array(list);
965
966 let list_field = Field::new("list", list_type, true);
967 let levels = calculate_array_levels(&list, &list_field).unwrap();
968 assert_eq!(levels.len(), 1);
969
970 let expected_levels = ArrayLevels {
971 def_levels: Some(vec![2, 2, 0, 2, 2, 2, 2, 2, 2, 2, 2, 2]),
972 rep_levels: Some(vec![0, 1, 0, 0, 1, 0, 1, 1, 1, 0, 1, 1]),
973 non_null_indices: (0..11).collect(),
974 max_def_level: 2,
975 max_rep_level: 1,
976 array: Arc::new(leaf_array),
977 logical_nulls: None,
978 };
979 assert_eq!(&levels[0], &expected_levels);
980 }
981
982 #[test]
983 fn test_calculate_array_levels_2() {
984 let leaf = Int32Array::from_iter(0..11);
998 let leaf_field = Field::new("leaf", DataType::Int32, false);
999
1000 let list_type = DataType::List(Arc::new(leaf_field));
1001 let list = ArrayData::builder(list_type.clone())
1002 .len(5)
1003 .add_child_data(leaf.to_data())
1004 .add_buffer(Buffer::from_iter([0_i32, 2, 2, 4, 8, 11]))
1005 .build()
1006 .unwrap();
1007
1008 let list = make_array(list);
1009 let list_field = Arc::new(Field::new("list", list_type, true));
1010
1011 let struct_array =
1012 StructArray::from((vec![(list_field, list)], Buffer::from([0b00011010])));
1013 let array = Arc::new(struct_array) as ArrayRef;
1014
1015 let struct_field = Field::new("struct", array.data_type().clone(), true);
1016
1017 let levels = calculate_array_levels(&array, &struct_field).unwrap();
1018 assert_eq!(levels.len(), 1);
1019
1020 let expected_levels = ArrayLevels {
1021 def_levels: Some(vec![0, 2, 0, 3, 3, 3, 3, 3, 3, 3]),
1022 rep_levels: Some(vec![0, 0, 0, 0, 1, 1, 1, 0, 1, 1]),
1023 non_null_indices: (4..11).collect(),
1024 max_def_level: 3,
1025 max_rep_level: 1,
1026 array: Arc::new(leaf),
1027 logical_nulls: None,
1028 };
1029
1030 assert_eq!(&levels[0], &expected_levels);
1031
1032 let leaf = Int32Array::from_iter(100..122);
1041 let leaf_field = Field::new("leaf", DataType::Int32, true);
1042
1043 let l1_type = DataType::List(Arc::new(leaf_field));
1044 let offsets = Buffer::from_iter([0_i32, 2, 4, 6, 8, 10, 12, 14, 16, 18, 20, 22]);
1045 let l1 = ArrayData::builder(l1_type.clone())
1046 .len(11)
1047 .add_child_data(leaf.to_data())
1048 .add_buffer(offsets)
1049 .build()
1050 .unwrap();
1051
1052 let l1_field = Field::new("l1", l1_type, true);
1053 let l2_type = DataType::List(Arc::new(l1_field));
1054 let l2 = ArrayData::builder(l2_type)
1055 .len(5)
1056 .add_child_data(l1)
1057 .add_buffer(Buffer::from_iter([0, 2, 2, 4, 8, 11]))
1058 .build()
1059 .unwrap();
1060
1061 let l2 = make_array(l2);
1062 let l2_field = Field::new("l2", l2.data_type().clone(), true);
1063
1064 let levels = calculate_array_levels(&l2, &l2_field).unwrap();
1065 assert_eq!(levels.len(), 1);
1066
1067 let expected_levels = ArrayLevels {
1068 def_levels: Some(vec![
1069 5, 5, 5, 5, 1, 5, 5, 5, 5, 5, 5, 5, 5, 5, 5, 5, 5, 5, 5, 5, 5, 5, 5,
1070 ]),
1071 rep_levels: Some(vec![
1072 0, 2, 1, 2, 0, 0, 2, 1, 2, 0, 2, 1, 2, 1, 2, 1, 2, 0, 2, 1, 2, 1, 2,
1073 ]),
1074 non_null_indices: (0..22).collect(),
1075 max_def_level: 5,
1076 max_rep_level: 2,
1077 array: Arc::new(leaf),
1078 logical_nulls: None,
1079 };
1080
1081 assert_eq!(&levels[0], &expected_levels);
1082 }
1083
1084 #[test]
1085 fn test_calculate_array_levels_nested_list() {
1086 let leaf_field = Field::new("leaf", DataType::Int32, false);
1087 let list_type = DataType::List(Arc::new(leaf_field));
1088
1089 let leaf = Int32Array::from_iter([0; 4]);
1097 let list = ArrayData::builder(list_type.clone())
1098 .len(4)
1099 .add_buffer(Buffer::from_iter(0_i32..5))
1100 .add_child_data(leaf.to_data())
1101 .build()
1102 .unwrap();
1103 let list = make_array(list);
1104
1105 let list_field = Field::new("list", list_type.clone(), false);
1106 let levels = calculate_array_levels(&list, &list_field).unwrap();
1107 assert_eq!(levels.len(), 1);
1108
1109 let expected_levels = ArrayLevels {
1110 def_levels: Some(vec![1; 4]),
1111 rep_levels: Some(vec![0; 4]),
1112 non_null_indices: (0..4).collect(),
1113 max_def_level: 1,
1114 max_rep_level: 1,
1115 array: Arc::new(leaf),
1116 logical_nulls: None,
1117 };
1118 assert_eq!(&levels[0], &expected_levels);
1119
1120 let leaf = Int32Array::from_iter(0..8);
1125 let list = ArrayData::builder(list_type.clone())
1126 .len(4)
1127 .add_buffer(Buffer::from_iter([0_i32, 0, 3, 5, 7]))
1128 .null_bit_buffer(Some(Buffer::from([0b00001110])))
1129 .add_child_data(leaf.to_data())
1130 .build()
1131 .unwrap();
1132 let list = make_array(list);
1133 let list_field = Arc::new(Field::new("list", list_type, true));
1134
1135 let struct_array = StructArray::from(vec![(list_field, list)]);
1136 let array = Arc::new(struct_array) as ArrayRef;
1137
1138 let struct_field = Field::new("struct", array.data_type().clone(), true);
1139 let levels = calculate_array_levels(&array, &struct_field).unwrap();
1140 assert_eq!(levels.len(), 1);
1141
1142 let expected_levels = ArrayLevels {
1143 def_levels: Some(vec![1, 3, 3, 3, 3, 3, 3, 3]),
1144 rep_levels: Some(vec![0, 0, 1, 1, 0, 1, 0, 1]),
1145 non_null_indices: (0..7).collect(),
1146 max_def_level: 3,
1147 max_rep_level: 1,
1148 array: Arc::new(leaf),
1149 logical_nulls: None,
1150 };
1151 assert_eq!(&levels[0], &expected_levels);
1152
1153 let leaf = Int32Array::from_iter(201..216);
1161 let leaf_field = Field::new("leaf", DataType::Int32, false);
1162 let list_1_type = DataType::List(Arc::new(leaf_field));
1163 let list_1 = ArrayData::builder(list_1_type.clone())
1164 .len(7)
1165 .add_buffer(Buffer::from_iter([0_i32, 1, 3, 3, 6, 10, 10, 15]))
1166 .add_child_data(leaf.to_data())
1167 .build()
1168 .unwrap();
1169
1170 let list_1_field = Field::new("l1", list_1_type, true);
1171 let list_2_type = DataType::List(Arc::new(list_1_field));
1172 let list_2 = ArrayData::builder(list_2_type.clone())
1173 .len(4)
1174 .add_buffer(Buffer::from_iter([0_i32, 0, 3, 5, 7]))
1175 .null_bit_buffer(Some(Buffer::from([0b00001110])))
1176 .add_child_data(list_1)
1177 .build()
1178 .unwrap();
1179
1180 let list_2 = make_array(list_2);
1181 let list_2_field = Arc::new(Field::new("list_2", list_2_type, true));
1182
1183 let struct_array =
1184 StructArray::from((vec![(list_2_field, list_2)], Buffer::from([0b00001111])));
1185 let struct_field = Field::new("struct", struct_array.data_type().clone(), true);
1186
1187 let array = Arc::new(struct_array) as ArrayRef;
1188 let levels = calculate_array_levels(&array, &struct_field).unwrap();
1189 assert_eq!(levels.len(), 1);
1190
1191 let expected_levels = ArrayLevels {
1192 def_levels: Some(vec![1, 5, 5, 5, 4, 5, 5, 5, 5, 5, 5, 5, 4, 5, 5, 5, 5, 5]),
1193 rep_levels: Some(vec![0, 0, 1, 2, 1, 0, 2, 2, 1, 2, 2, 2, 0, 1, 2, 2, 2, 2]),
1194 non_null_indices: (0..15).collect(),
1195 max_def_level: 5,
1196 max_rep_level: 2,
1197 array: Arc::new(leaf),
1198 logical_nulls: None,
1199 };
1200 assert_eq!(&levels[0], &expected_levels);
1201 }
1202
1203 #[test]
1204 fn test_calculate_nested_struct_levels() {
1205 let c = Int32Array::from_iter([Some(1), None, Some(3), None, Some(5), Some(6)]);
1215 let leaf = Arc::new(c) as ArrayRef;
1216 let c_field = Arc::new(Field::new("c", DataType::Int32, true));
1217 let b = StructArray::from(((vec![(c_field, leaf.clone())]), Buffer::from([0b00110111])));
1218
1219 let b_field = Arc::new(Field::new("b", b.data_type().clone(), true));
1220 let a = StructArray::from((
1221 (vec![(b_field, Arc::new(b) as ArrayRef)]),
1222 Buffer::from([0b00101111]),
1223 ));
1224
1225 let a_field = Field::new("a", a.data_type().clone(), true);
1226 let a_array = Arc::new(a) as ArrayRef;
1227
1228 let levels = calculate_array_levels(&a_array, &a_field).unwrap();
1229 assert_eq!(levels.len(), 1);
1230
1231 let logical_nulls = leaf.logical_nulls();
1232 let expected_levels = ArrayLevels {
1233 def_levels: Some(vec![3, 2, 3, 1, 0, 3]),
1234 rep_levels: None,
1235 non_null_indices: vec![0, 2, 5],
1236 max_def_level: 3,
1237 max_rep_level: 0,
1238 array: leaf,
1239 logical_nulls,
1240 };
1241 assert_eq!(&levels[0], &expected_levels);
1242 }
1243
1244 #[test]
1245 fn list_single_column() {
1246 let a_values = Int32Array::from(vec![1, 2, 3, 4, 5, 6, 7, 8, 9, 10]);
1249 let a_value_offsets = arrow::buffer::Buffer::from_iter([0_i32, 1, 3, 3, 6, 10]);
1250 let a_list_type = DataType::List(Arc::new(Field::new_list_field(DataType::Int32, true)));
1251 let a_list_data = ArrayData::builder(a_list_type.clone())
1252 .len(5)
1253 .add_buffer(a_value_offsets)
1254 .null_bit_buffer(Some(Buffer::from([0b00011011])))
1255 .add_child_data(a_values.to_data())
1256 .build()
1257 .unwrap();
1258
1259 assert_eq!(a_list_data.null_count(), 1);
1260
1261 let a = ListArray::from(a_list_data);
1262
1263 let item_field = Field::new_list_field(a_list_type, true);
1264 let mut builder = levels(&item_field, a);
1265 builder.write(2..4);
1266 let levels = builder.finish();
1267
1268 assert_eq!(levels.len(), 1);
1269
1270 let list_level = &levels[0];
1271
1272 let expected_level = ArrayLevels {
1273 def_levels: Some(vec![0, 3, 3, 3]),
1274 rep_levels: Some(vec![0, 0, 1, 1]),
1275 non_null_indices: vec![3, 4, 5],
1276 max_def_level: 3,
1277 max_rep_level: 1,
1278 array: Arc::new(a_values),
1279 logical_nulls: None,
1280 };
1281 assert_eq!(list_level, &expected_level);
1282 }
1283
1284 #[test]
1285 fn mixed_struct_list() {
1286 let struct_field_d = Arc::new(Field::new("d", DataType::Float64, true));
1290 let struct_field_f = Arc::new(Field::new("f", DataType::Float32, true));
1291 let struct_field_g = Arc::new(Field::new(
1292 "g",
1293 DataType::List(Arc::new(Field::new("items", DataType::Int16, false))),
1294 false,
1295 ));
1296 let struct_field_e = Arc::new(Field::new(
1297 "e",
1298 DataType::Struct(vec![struct_field_f.clone(), struct_field_g.clone()].into()),
1299 true,
1300 ));
1301 let schema = Schema::new(vec![
1302 Field::new("a", DataType::Int32, false),
1303 Field::new("b", DataType::Int32, true),
1304 Field::new(
1305 "c",
1306 DataType::Struct(vec![struct_field_d.clone(), struct_field_e.clone()].into()),
1307 true, ),
1309 ]);
1310
1311 let a = Int32Array::from(vec![1, 2, 3, 4, 5]);
1313 let b = Int32Array::from(vec![Some(1), None, None, Some(4), Some(5)]);
1314 let d = Float64Array::from(vec![None, None, None, Some(1.0), None]);
1315 let f = Float32Array::from(vec![Some(0.0), None, Some(333.3), None, Some(5.25)]);
1316
1317 let g_value = Int16Array::from(vec![1, 2, 3, 4, 5, 6, 7, 8, 9, 10]);
1318
1319 let g_value_offsets = arrow::buffer::Buffer::from([0, 1, 3, 3, 6, 10].to_byte_slice());
1322
1323 let g_list_data = ArrayData::builder(struct_field_g.data_type().clone())
1325 .len(5)
1326 .add_buffer(g_value_offsets)
1327 .add_child_data(g_value.into_data())
1328 .build()
1329 .unwrap();
1330 let g = ListArray::from(g_list_data);
1331
1332 let e = StructArray::from(vec![
1333 (struct_field_f, Arc::new(f.clone()) as ArrayRef),
1334 (struct_field_g, Arc::new(g) as ArrayRef),
1335 ]);
1336
1337 let c = StructArray::from(vec![
1338 (struct_field_d, Arc::new(d.clone()) as ArrayRef),
1339 (struct_field_e, Arc::new(e) as ArrayRef),
1340 ]);
1341
1342 let batch = RecordBatch::try_new(
1344 Arc::new(schema),
1345 vec![Arc::new(a.clone()), Arc::new(b.clone()), Arc::new(c)],
1346 )
1347 .unwrap();
1348
1349 let mut levels = vec![];
1352 batch
1353 .columns()
1354 .iter()
1355 .zip(batch.schema().fields())
1356 .for_each(|(array, field)| {
1357 let mut array_levels = calculate_array_levels(array, field).unwrap();
1358 levels.append(&mut array_levels);
1359 });
1360 assert_eq!(levels.len(), 5);
1361
1362 let list_level = &levels[0];
1364
1365 let expected_level = ArrayLevels {
1366 def_levels: None,
1367 rep_levels: None,
1368 non_null_indices: vec![0, 1, 2, 3, 4],
1369 max_def_level: 0,
1370 max_rep_level: 0,
1371 array: Arc::new(a),
1372 logical_nulls: None,
1373 };
1374 assert_eq!(list_level, &expected_level);
1375
1376 let list_level = levels.get(1).unwrap();
1378
1379 let b_logical_nulls = b.logical_nulls();
1380 let expected_level = ArrayLevels {
1381 def_levels: Some(vec![1, 0, 0, 1, 1]),
1382 rep_levels: None,
1383 non_null_indices: vec![0, 3, 4],
1384 max_def_level: 1,
1385 max_rep_level: 0,
1386 array: Arc::new(b),
1387 logical_nulls: b_logical_nulls,
1388 };
1389 assert_eq!(list_level, &expected_level);
1390
1391 let list_level = levels.get(2).unwrap();
1393
1394 let d_logical_nulls = d.logical_nulls();
1395 let expected_level = ArrayLevels {
1396 def_levels: Some(vec![1, 1, 1, 2, 1]),
1397 rep_levels: None,
1398 non_null_indices: vec![3],
1399 max_def_level: 2,
1400 max_rep_level: 0,
1401 array: Arc::new(d),
1402 logical_nulls: d_logical_nulls,
1403 };
1404 assert_eq!(list_level, &expected_level);
1405
1406 let list_level = levels.get(3).unwrap();
1408
1409 let f_logical_nulls = f.logical_nulls();
1410 let expected_level = ArrayLevels {
1411 def_levels: Some(vec![3, 2, 3, 2, 3]),
1412 rep_levels: None,
1413 non_null_indices: vec![0, 2, 4],
1414 max_def_level: 3,
1415 max_rep_level: 0,
1416 array: Arc::new(f),
1417 logical_nulls: f_logical_nulls,
1418 };
1419 assert_eq!(list_level, &expected_level);
1420 }
1421
1422 #[test]
1423 fn test_null_vs_nonnull_struct() {
1424 let offset_field = Arc::new(Field::new("offset", DataType::Int32, true));
1426 let schema = Schema::new(vec![Field::new(
1427 "some_nested_object",
1428 DataType::Struct(vec![offset_field.clone()].into()),
1429 false,
1430 )]);
1431
1432 let offset = Int32Array::from(vec![1, 2, 3, 4, 5]);
1434
1435 let some_nested_object =
1436 StructArray::from(vec![(offset_field, Arc::new(offset) as ArrayRef)]);
1437
1438 let batch =
1440 RecordBatch::try_new(Arc::new(schema), vec![Arc::new(some_nested_object)]).unwrap();
1441
1442 let struct_null_level =
1443 calculate_array_levels(batch.column(0), batch.schema().field(0)).unwrap();
1444
1445 let offset_field = Arc::new(Field::new("offset", DataType::Int32, true));
1448 let schema = Schema::new(vec![Field::new(
1449 "some_nested_object",
1450 DataType::Struct(vec![offset_field.clone()].into()),
1451 true,
1452 )]);
1453
1454 let offset = Int32Array::from(vec![1, 2, 3, 4, 5]);
1456
1457 let some_nested_object =
1458 StructArray::from(vec![(offset_field, Arc::new(offset) as ArrayRef)]);
1459
1460 let batch =
1462 RecordBatch::try_new(Arc::new(schema), vec![Arc::new(some_nested_object)]).unwrap();
1463
1464 let struct_non_null_level =
1465 calculate_array_levels(batch.column(0), batch.schema().field(0)).unwrap();
1466
1467 if struct_non_null_level == struct_null_level {
1469 panic!("Levels should not be equal, to reflect the difference in struct nullness");
1470 }
1471 }
1472
1473 #[test]
1474 fn test_map_array() {
1475 let json_content = r#"
1477 {"stocks":{"long": "$AAA", "short": "$BBB"}}
1478 {"stocks":{"long": "$CCC", "short": null}}
1479 {"stocks":{"hedged": "$YYY", "long": null, "short": "$D"}}
1480 "#;
1481 let entries_struct_type = DataType::Struct(Fields::from(vec![
1482 Field::new("key", DataType::Utf8, false),
1483 Field::new("value", DataType::Utf8, true),
1484 ]));
1485 let stocks_field = Field::new(
1486 "stocks",
1487 DataType::Map(
1488 Arc::new(Field::new("entries", entries_struct_type, false)),
1489 false,
1490 ),
1491 false,
1493 );
1494 let schema = Arc::new(Schema::new(vec![stocks_field]));
1495 let builder = arrow::json::ReaderBuilder::new(schema).with_batch_size(64);
1496 let mut reader = builder.build(std::io::Cursor::new(json_content)).unwrap();
1497
1498 let batch = reader.next().unwrap().unwrap();
1499
1500 let mut levels = vec![];
1502 batch
1503 .columns()
1504 .iter()
1505 .zip(batch.schema().fields())
1506 .for_each(|(array, field)| {
1507 let mut array_levels = calculate_array_levels(array, field).unwrap();
1508 levels.append(&mut array_levels);
1509 });
1510 assert_eq!(levels.len(), 2);
1511
1512 let map = batch.column(0).as_map();
1513 let map_keys_logical_nulls = map.keys().logical_nulls();
1514
1515 let list_level = &levels[0];
1517
1518 let expected_level = ArrayLevels {
1519 def_levels: Some(vec![1; 7]),
1520 rep_levels: Some(vec![0, 1, 0, 1, 0, 1, 1]),
1521 non_null_indices: vec![0, 1, 2, 3, 4, 5, 6],
1522 max_def_level: 1,
1523 max_rep_level: 1,
1524 array: map.keys().clone(),
1525 logical_nulls: map_keys_logical_nulls,
1526 };
1527 assert_eq!(list_level, &expected_level);
1528
1529 let list_level = levels.get(1).unwrap();
1531 let map_values_logical_nulls = map.values().logical_nulls();
1532
1533 let expected_level = ArrayLevels {
1534 def_levels: Some(vec![2, 2, 2, 1, 2, 1, 2]),
1535 rep_levels: Some(vec![0, 1, 0, 1, 0, 1, 1]),
1536 non_null_indices: vec![0, 1, 2, 4, 6],
1537 max_def_level: 2,
1538 max_rep_level: 1,
1539 array: map.values().clone(),
1540 logical_nulls: map_values_logical_nulls,
1541 };
1542 assert_eq!(list_level, &expected_level);
1543 }
1544
1545 #[test]
1546 fn test_list_of_struct() {
1547 let int_field = Field::new("a", DataType::Int32, true);
1549 let fields = Fields::from([Arc::new(int_field)]);
1550 let item_field = Field::new_list_field(DataType::Struct(fields.clone()), true);
1551 let list_field = Field::new("list", DataType::List(Arc::new(item_field)), true);
1552
1553 let int_builder = Int32Builder::with_capacity(10);
1554 let struct_builder = StructBuilder::new(fields, vec![Box::new(int_builder)]);
1555 let mut list_builder = ListBuilder::new(struct_builder);
1556
1557 let values = list_builder.values();
1561 values
1562 .field_builder::<Int32Builder>(0)
1563 .unwrap()
1564 .append_value(1);
1565 values.append(true);
1566 list_builder.append(true);
1567
1568 list_builder.append(true);
1570
1571 list_builder.append(false);
1573
1574 let values = list_builder.values();
1576 values
1577 .field_builder::<Int32Builder>(0)
1578 .unwrap()
1579 .append_null();
1580 values.append(false);
1581 values
1582 .field_builder::<Int32Builder>(0)
1583 .unwrap()
1584 .append_null();
1585 values.append(false);
1586 list_builder.append(true);
1587
1588 let values = list_builder.values();
1590 values
1591 .field_builder::<Int32Builder>(0)
1592 .unwrap()
1593 .append_null();
1594 values.append(true);
1595 list_builder.append(true);
1596
1597 let values = list_builder.values();
1599 values
1600 .field_builder::<Int32Builder>(0)
1601 .unwrap()
1602 .append_value(2);
1603 values.append(true);
1604 list_builder.append(true);
1605
1606 let array = Arc::new(list_builder.finish());
1607
1608 let values = array.values().as_struct().column(0).clone();
1609 let values_len = values.len();
1610 assert_eq!(values_len, 5);
1611
1612 let schema = Arc::new(Schema::new(vec![list_field]));
1613
1614 let rb = RecordBatch::try_new(schema, vec![array]).unwrap();
1615
1616 let levels = calculate_array_levels(rb.column(0), rb.schema().field(0)).unwrap();
1617 let list_level = &levels[0];
1618
1619 let logical_nulls = values.logical_nulls();
1620 let expected_level = ArrayLevels {
1621 def_levels: Some(vec![4, 1, 0, 2, 2, 3, 4]),
1622 rep_levels: Some(vec![0, 0, 0, 0, 1, 0, 0]),
1623 non_null_indices: vec![0, 4],
1624 max_def_level: 4,
1625 max_rep_level: 1,
1626 array: values,
1627 logical_nulls,
1628 };
1629
1630 assert_eq!(list_level, &expected_level);
1631 }
1632
1633 #[test]
1634 fn test_struct_mask_list() {
1635 let inner = ListArray::from_iter_primitive::<Int32Type, _, _>(vec![
1637 Some(vec![Some(1), Some(2)]),
1638 Some(vec![None]),
1639 Some(vec![]),
1640 Some(vec![Some(3), None]), Some(vec![Some(4), Some(5)]),
1642 None, None,
1644 ]);
1645 let values = inner.values().clone();
1646
1647 assert_eq!(inner.values().len(), 7);
1649
1650 let field = Arc::new(Field::new("list", inner.data_type().clone(), true));
1651 let array = Arc::new(inner) as ArrayRef;
1652 let nulls = Buffer::from([0b01010111]);
1653 let struct_a = StructArray::from((vec![(field, array)], nulls));
1654
1655 let field = Field::new("struct", struct_a.data_type().clone(), true);
1656 let array = Arc::new(struct_a) as ArrayRef;
1657 let levels = calculate_array_levels(&array, &field).unwrap();
1658
1659 assert_eq!(levels.len(), 1);
1660
1661 let logical_nulls = values.logical_nulls();
1662 let expected_level = ArrayLevels {
1663 def_levels: Some(vec![4, 4, 3, 2, 0, 4, 4, 0, 1]),
1664 rep_levels: Some(vec![0, 1, 0, 0, 0, 0, 1, 0, 0]),
1665 non_null_indices: vec![0, 1, 5, 6],
1666 max_def_level: 4,
1667 max_rep_level: 1,
1668 array: values,
1669 logical_nulls,
1670 };
1671
1672 assert_eq!(&levels[0], &expected_level);
1673 }
1674
1675 #[test]
1676 fn test_list_mask_struct() {
1677 let a1 = ListArray::from_iter_primitive::<Int32Type, _, _>(vec![
1681 Some(vec![None]), Some(vec![]), Some(vec![Some(3), None]),
1684 Some(vec![Some(4), Some(5), None, Some(6)]), None,
1686 None,
1687 ]);
1688 let a1_values = a1.values().clone();
1689 let a1 = Arc::new(a1) as ArrayRef;
1690
1691 let a2 = Arc::new(Int32Array::from_iter(vec![
1692 Some(1), Some(2), None,
1695 Some(4), Some(5),
1697 None,
1698 ])) as ArrayRef;
1699 let a2_values = a2.clone();
1700
1701 let field_a1 = Arc::new(Field::new("list", a1.data_type().clone(), true));
1702 let field_a2 = Arc::new(Field::new("integers", a2.data_type().clone(), true));
1703
1704 let nulls = Buffer::from([0b00110111]);
1705 let struct_a = Arc::new(StructArray::from((
1706 vec![(field_a1, a1), (field_a2, a2)],
1707 nulls,
1708 ))) as ArrayRef;
1709
1710 let offsets = Buffer::from_iter([0_i32, 0, 2, 2, 3, 5, 5]);
1711 let nulls = Buffer::from([0b00111100]);
1712
1713 let list_type = DataType::List(Arc::new(Field::new(
1714 "struct",
1715 struct_a.data_type().clone(),
1716 true,
1717 )));
1718
1719 let data = ArrayDataBuilder::new(list_type.clone())
1720 .len(6)
1721 .null_bit_buffer(Some(nulls))
1722 .add_buffer(offsets)
1723 .add_child_data(struct_a.into_data())
1724 .build()
1725 .unwrap();
1726
1727 let list = make_array(data);
1728 let list_field = Field::new("col", list_type, true);
1729
1730 let expected = vec![
1731 r#""#.to_string(),
1732 r#""#.to_string(),
1733 r#"[]"#.to_string(),
1734 r#"[{list: [3, ], integers: }]"#.to_string(),
1735 r#"[, {list: , integers: 5}]"#.to_string(),
1736 r#"[]"#.to_string(),
1737 ];
1738
1739 let actual: Vec<_> = (0..6)
1740 .map(|x| array_value_to_string(&list, x).unwrap())
1741 .collect();
1742 assert_eq!(actual, expected);
1743
1744 let levels = calculate_array_levels(&list, &list_field).unwrap();
1745
1746 assert_eq!(levels.len(), 2);
1747
1748 let a1_logical_nulls = a1_values.logical_nulls();
1749 let expected_level = ArrayLevels {
1750 def_levels: Some(vec![0, 0, 1, 6, 5, 2, 3, 1]),
1751 rep_levels: Some(vec![0, 0, 0, 0, 2, 0, 1, 0]),
1752 non_null_indices: vec![1],
1753 max_def_level: 6,
1754 max_rep_level: 2,
1755 array: a1_values,
1756 logical_nulls: a1_logical_nulls,
1757 };
1758
1759 assert_eq!(&levels[0], &expected_level);
1760
1761 let a2_logical_nulls = a2_values.logical_nulls();
1762 let expected_level = ArrayLevels {
1763 def_levels: Some(vec![0, 0, 1, 3, 2, 4, 1]),
1764 rep_levels: Some(vec![0, 0, 0, 0, 0, 1, 0]),
1765 non_null_indices: vec![4],
1766 max_def_level: 4,
1767 max_rep_level: 1,
1768 array: a2_values,
1769 logical_nulls: a2_logical_nulls,
1770 };
1771
1772 assert_eq!(&levels[1], &expected_level);
1773 }
1774
1775 #[test]
1776 fn test_fixed_size_list() {
1777 let mut builder = FixedSizeListBuilder::new(Int32Builder::new(), 2);
1779 builder.values().append_slice(&[1, 2]);
1780 builder.append(true);
1781 builder.values().append_slice(&[3, 4]);
1782 builder.append(false);
1783 builder.values().append_slice(&[5, 6]);
1784 builder.append(false);
1785 builder.values().append_slice(&[7, 8]);
1786 builder.append(true);
1787 builder.values().append_slice(&[9, 10]);
1788 builder.append(false);
1789 let a = builder.finish();
1790 let values = a.values().clone();
1791
1792 let item_field = Field::new_list_field(a.data_type().clone(), true);
1793 let mut builder = levels(&item_field, a);
1794 builder.write(1..4);
1795 let levels = builder.finish();
1796
1797 assert_eq!(levels.len(), 1);
1798
1799 let list_level = &levels[0];
1800
1801 let logical_nulls = values.logical_nulls();
1802 let expected_level = ArrayLevels {
1803 def_levels: Some(vec![0, 0, 3, 3]),
1804 rep_levels: Some(vec![0, 0, 0, 1]),
1805 non_null_indices: vec![6, 7],
1806 max_def_level: 3,
1807 max_rep_level: 1,
1808 array: values,
1809 logical_nulls,
1810 };
1811 assert_eq!(list_level, &expected_level);
1812 }
1813
1814 #[test]
1815 fn test_fixed_size_list_of_struct() {
1816 let field_a = Field::new("a", DataType::Int32, true);
1818 let field_b = Field::new("b", DataType::Int64, false);
1819 let fields = Fields::from([Arc::new(field_a), Arc::new(field_b)]);
1820 let item_field = Field::new_list_field(DataType::Struct(fields.clone()), true);
1821 let list_field = Field::new(
1822 "list",
1823 DataType::FixedSizeList(Arc::new(item_field), 2),
1824 true,
1825 );
1826
1827 let builder_a = Int32Builder::with_capacity(10);
1828 let builder_b = Int64Builder::with_capacity(10);
1829 let struct_builder =
1830 StructBuilder::new(fields, vec![Box::new(builder_a), Box::new(builder_b)]);
1831 let mut list_builder = FixedSizeListBuilder::new(struct_builder, 2);
1832
1833 let values = list_builder.values();
1842 values
1844 .field_builder::<Int32Builder>(0)
1845 .unwrap()
1846 .append_value(1);
1847 values
1848 .field_builder::<Int64Builder>(1)
1849 .unwrap()
1850 .append_value(2);
1851 values.append(true);
1852 values
1854 .field_builder::<Int32Builder>(0)
1855 .unwrap()
1856 .append_null();
1857 values
1858 .field_builder::<Int64Builder>(1)
1859 .unwrap()
1860 .append_value(0);
1861 values.append(false);
1862 list_builder.append(true);
1863
1864 let values = list_builder.values();
1866 values
1868 .field_builder::<Int32Builder>(0)
1869 .unwrap()
1870 .append_null();
1871 values
1872 .field_builder::<Int64Builder>(1)
1873 .unwrap()
1874 .append_value(0);
1875 values.append(false);
1876 values
1878 .field_builder::<Int32Builder>(0)
1879 .unwrap()
1880 .append_null();
1881 values
1882 .field_builder::<Int64Builder>(1)
1883 .unwrap()
1884 .append_value(0);
1885 values.append(false);
1886 list_builder.append(false);
1887
1888 let values = list_builder.values();
1890 values
1892 .field_builder::<Int32Builder>(0)
1893 .unwrap()
1894 .append_null();
1895 values
1896 .field_builder::<Int64Builder>(1)
1897 .unwrap()
1898 .append_value(0);
1899 values.append(false);
1900 values
1902 .field_builder::<Int32Builder>(0)
1903 .unwrap()
1904 .append_null();
1905 values
1906 .field_builder::<Int64Builder>(1)
1907 .unwrap()
1908 .append_value(0);
1909 values.append(false);
1910 list_builder.append(true);
1911
1912 let values = list_builder.values();
1914 values
1916 .field_builder::<Int32Builder>(0)
1917 .unwrap()
1918 .append_null();
1919 values
1920 .field_builder::<Int64Builder>(1)
1921 .unwrap()
1922 .append_value(3);
1923 values.append(true);
1924 values
1926 .field_builder::<Int32Builder>(0)
1927 .unwrap()
1928 .append_value(2);
1929 values
1930 .field_builder::<Int64Builder>(1)
1931 .unwrap()
1932 .append_value(4);
1933 values.append(true);
1934 list_builder.append(true);
1935
1936 let array = Arc::new(list_builder.finish());
1937
1938 assert_eq!(array.values().len(), 8);
1939 assert_eq!(array.len(), 4);
1940
1941 let struct_values = array.values().as_struct();
1942 let values_a = struct_values.column(0).clone();
1943 let values_b = struct_values.column(1).clone();
1944
1945 let schema = Arc::new(Schema::new(vec![list_field]));
1946 let rb = RecordBatch::try_new(schema, vec![array]).unwrap();
1947
1948 let levels = calculate_array_levels(rb.column(0), rb.schema().field(0)).unwrap();
1949 let a_levels = &levels[0];
1950 let b_levels = &levels[1];
1951
1952 let values_a_logical_nulls = values_a.logical_nulls();
1954 let expected_a = ArrayLevels {
1955 def_levels: Some(vec![4, 2, 0, 2, 2, 3, 4]),
1956 rep_levels: Some(vec![0, 1, 0, 0, 1, 0, 1]),
1957 non_null_indices: vec![0, 7],
1958 max_def_level: 4,
1959 max_rep_level: 1,
1960 array: values_a,
1961 logical_nulls: values_a_logical_nulls,
1962 };
1963 let values_b_logical_nulls = values_b.logical_nulls();
1965 let expected_b = ArrayLevels {
1966 def_levels: Some(vec![3, 2, 0, 2, 2, 3, 3]),
1967 rep_levels: Some(vec![0, 1, 0, 0, 1, 0, 1]),
1968 non_null_indices: vec![0, 6, 7],
1969 max_def_level: 3,
1970 max_rep_level: 1,
1971 array: values_b,
1972 logical_nulls: values_b_logical_nulls,
1973 };
1974
1975 assert_eq!(a_levels, &expected_a);
1976 assert_eq!(b_levels, &expected_b);
1977 }
1978
1979 #[test]
1980 fn test_fixed_size_list_empty() {
1981 let mut builder = FixedSizeListBuilder::new(Int32Builder::new(), 0);
1982 builder.append(true);
1983 builder.append(false);
1984 builder.append(true);
1985 let array = builder.finish();
1986 let values = array.values().clone();
1987
1988 let item_field = Field::new_list_field(array.data_type().clone(), true);
1989 let mut builder = levels(&item_field, array);
1990 builder.write(0..3);
1991 let levels = builder.finish();
1992
1993 assert_eq!(levels.len(), 1);
1994
1995 let list_level = &levels[0];
1996
1997 let logical_nulls = values.logical_nulls();
1998 let expected_level = ArrayLevels {
1999 def_levels: Some(vec![1, 0, 1]),
2000 rep_levels: Some(vec![0, 0, 0]),
2001 non_null_indices: vec![],
2002 max_def_level: 3,
2003 max_rep_level: 1,
2004 array: values,
2005 logical_nulls,
2006 };
2007 assert_eq!(list_level, &expected_level);
2008 }
2009
2010 #[test]
2011 fn test_fixed_size_list_of_var_lists() {
2012 let mut builder = FixedSizeListBuilder::new(ListBuilder::new(Int32Builder::new()), 2);
2014 builder.values().append_value([Some(1), None, Some(3)]);
2015 builder.values().append_null();
2016 builder.append(true);
2017 builder.values().append_value([Some(4)]);
2018 builder.values().append_value([]);
2019 builder.append(true);
2020 builder.values().append_value([Some(5), Some(6)]);
2021 builder.values().append_value([None, None]);
2022 builder.append(true);
2023 builder.values().append_null();
2024 builder.values().append_null();
2025 builder.append(false);
2026 let a = builder.finish();
2027 let values = a.values().as_list::<i32>().values().clone();
2028
2029 let item_field = Field::new_list_field(a.data_type().clone(), true);
2030 let mut builder = levels(&item_field, a);
2031 builder.write(0..4);
2032 let levels = builder.finish();
2033
2034 let logical_nulls = values.logical_nulls();
2035 let expected_level = ArrayLevels {
2036 def_levels: Some(vec![5, 4, 5, 2, 5, 3, 5, 5, 4, 4, 0]),
2037 rep_levels: Some(vec![0, 2, 2, 1, 0, 1, 0, 2, 1, 2, 0]),
2038 non_null_indices: vec![0, 2, 3, 4, 5],
2039 max_def_level: 5,
2040 max_rep_level: 2,
2041 array: values,
2042 logical_nulls,
2043 };
2044
2045 assert_eq!(levels[0], expected_level);
2046 }
2047
2048 #[test]
2049 fn test_null_dictionary_values() {
2050 let values = Int32Array::new(
2051 vec![1, 2, 3, 4].into(),
2052 Some(NullBuffer::from(vec![true, false, true, true])),
2053 );
2054 let keys = Int32Array::new(
2055 vec![1, 54, 2, 0].into(),
2056 Some(NullBuffer::from(vec![true, false, true, true])),
2057 );
2058 let dict = DictionaryArray::new(keys, Arc::new(values));
2060
2061 let item_field = Field::new_list_field(dict.data_type().clone(), true);
2062
2063 let mut builder = levels(&item_field, dict.clone());
2064 builder.write(0..4);
2065 let levels = builder.finish();
2066
2067 let logical_nulls = dict.logical_nulls();
2068 let expected_level = ArrayLevels {
2069 def_levels: Some(vec![0, 0, 1, 1]),
2070 rep_levels: None,
2071 non_null_indices: vec![2, 3],
2072 max_def_level: 1,
2073 max_rep_level: 0,
2074 array: Arc::new(dict),
2075 logical_nulls,
2076 };
2077 assert_eq!(levels[0], expected_level);
2078 }
2079
2080 #[test]
2081 fn mismatched_types() {
2082 let array = Arc::new(Int32Array::from_iter(0..10)) as ArrayRef;
2083 let field = Field::new_list_field(DataType::Float64, false);
2084
2085 let err = LevelInfoBuilder::try_new(&field, Default::default(), &array)
2086 .unwrap_err()
2087 .to_string();
2088
2089 assert_eq!(
2090 err,
2091 "Arrow: Incompatible type. Field 'item' has type Float64, array has type Int32",
2092 );
2093 }
2094
2095 fn levels<T: Array + 'static>(field: &Field, array: T) -> LevelInfoBuilder {
2096 let v = Arc::new(array) as ArrayRef;
2097 LevelInfoBuilder::try_new(field, Default::default(), &v).unwrap()
2098 }
2099}