1use crate::column::chunker::CdcChunk;
44use crate::column::writer::LevelDataRef;
45use crate::errors::{ParquetError, Result};
46use arrow_array::cast::AsArray;
47use arrow_array::types::RunEndIndexType;
48use arrow_array::{Array, ArrayRef, Int32Array, OffsetSizeTrait, RunArray, downcast_run_array};
49use arrow_buffer::bit_iterator::BitIndexIterator;
50use arrow_buffer::{NullBuffer, OffsetBuffer, ScalarBuffer};
51use arrow_schema::{DataType, Field};
52use std::ops::Range;
53use std::sync::Arc;
54
55fn expand_ree_array(array: &ArrayRef) -> Result<ArrayRef> {
60 downcast_run_array!(
61 array => expand_typed_ree(array),
62 _ => unreachable!("expand_ree_array called on non-REE array"),
63 )
64}
65
66fn expand_typed_ree<R: RunEndIndexType>(run_array: &RunArray<R>) -> Result<ArrayRef> {
67 let run_ends = run_array.run_ends();
68 let values = run_array.values();
69 let len = run_array.len();
70 let indices: Int32Array = (0..len)
71 .map(|i| run_ends.get_physical_index(i) as i32)
72 .collect();
73 arrow_select::take::take(values.as_ref(), &indices, None)
74 .map_err(|e| arrow_err!("Failed to expand REE array: {}", e))
75}
76
77pub(crate) fn calculate_array_levels(array: &ArrayRef, field: &Field) -> Result<Vec<ArrayLevels>> {
80 let mut builder = LevelInfoBuilder::try_new(field, Default::default(), array)?;
81 builder.write(0..array.len());
82 Ok(builder.finish())
83}
84
85fn is_leaf(data_type: &DataType) -> bool {
88 matches!(
89 data_type,
90 DataType::Null
91 | DataType::Boolean
92 | DataType::Int8
93 | DataType::Int16
94 | DataType::Int32
95 | DataType::Int64
96 | DataType::UInt8
97 | DataType::UInt16
98 | DataType::UInt32
99 | DataType::UInt64
100 | DataType::Float16
101 | DataType::Float32
102 | DataType::Float64
103 | DataType::Utf8
104 | DataType::Utf8View
105 | DataType::LargeUtf8
106 | DataType::Timestamp(_, _)
107 | DataType::Date32
108 | DataType::Date64
109 | DataType::Time32(_)
110 | DataType::Time64(_)
111 | DataType::Duration(_)
112 | DataType::Interval(_)
113 | DataType::Binary
114 | DataType::LargeBinary
115 | DataType::BinaryView
116 | DataType::Decimal32(_, _)
117 | DataType::Decimal64(_, _)
118 | DataType::Decimal128(_, _)
119 | DataType::Decimal256(_, _)
120 | DataType::FixedSizeBinary(_)
121 )
122}
123
/// The repetition and definition levels a builder inherits from its parent.
///
/// The default context has both levels set to zero and is used for the root
/// field.
#[derive(Debug, Default, Clone, Copy)]
struct LevelContext {
    /// The repetition level accumulated so far.
    rep_level: i16,
    /// The definition level accumulated so far.
    def_level: i16,
}
132
133#[derive(Debug)]
135enum LevelInfoBuilder {
136 Primitive(ArrayLevels),
138 List(
140 Box<LevelInfoBuilder>, LevelContext, OffsetBuffer<i32>, Option<NullBuffer>, bool, ),
146 LargeList(
148 Box<LevelInfoBuilder>, LevelContext, OffsetBuffer<i64>, Option<NullBuffer>, bool, ),
154 FixedSizeList(
156 Box<LevelInfoBuilder>, LevelContext, usize, Option<NullBuffer>, ),
161 ListView(
163 Box<LevelInfoBuilder>, LevelContext, ScalarBuffer<i32>, ScalarBuffer<i32>, Option<NullBuffer>, ),
169 LargeListView(
171 Box<LevelInfoBuilder>, LevelContext, ScalarBuffer<i64>, ScalarBuffer<i64>, Option<NullBuffer>, ),
177 Struct(Vec<LevelInfoBuilder>, LevelContext, Option<NullBuffer>),
179}
180
/// Minimum number of rows in a range before `write_leaf` considers its
/// bulk-fill path for definition levels.
const BULK_FILL_MIN_LEN: usize = 64;
187
188impl LevelInfoBuilder {
189 fn try_new(field: &Field, parent_ctx: LevelContext, array: &ArrayRef) -> Result<Self> {
191 if !Self::types_compatible(field.data_type(), array.data_type()) {
192 return Err(arrow_err!(format!(
193 "Incompatible type. Field '{}' has type {}, array has type {}",
194 field.name(),
195 field.data_type(),
196 array.data_type(),
197 )));
198 }
199
200 let is_nullable = field.is_nullable();
201
202 match array.data_type() {
203 d if is_leaf(d) => {
204 let levels = ArrayLevels::new(parent_ctx, is_nullable, array.clone());
205 Ok(Self::Primitive(levels))
206 }
207 DataType::Dictionary(_, v) if is_leaf(v.as_ref()) => {
208 let levels = ArrayLevels::new(parent_ctx, is_nullable, array.clone());
209 Ok(Self::Primitive(levels))
210 }
211 DataType::RunEndEncoded(_, value_field) => {
212 let flat = expand_ree_array(array)?;
213 let flat_field = Field::new(
214 field.name(),
215 value_field.data_type().clone(),
216 field.is_nullable(),
217 );
218 Self::try_new(&flat_field, parent_ctx, &flat)
219 }
220 DataType::Struct(children) => {
221 let array = array.as_struct();
222 let def_level = match is_nullable {
223 true => parent_ctx.def_level + 1,
224 false => parent_ctx.def_level,
225 };
226
227 let ctx = LevelContext {
228 rep_level: parent_ctx.rep_level,
229 def_level,
230 };
231
232 let children = children
233 .iter()
234 .zip(array.columns())
235 .map(|(f, a)| Self::try_new(f, ctx, a))
236 .collect::<Result<_>>()?;
237
238 Ok(Self::Struct(children, ctx, array.nulls().cloned()))
239 }
240 DataType::List(child)
241 | DataType::LargeList(child)
242 | DataType::Map(child, _)
243 | DataType::FixedSizeList(child, _)
244 | DataType::ListView(child)
245 | DataType::LargeListView(child) => {
246 let def_level = match is_nullable {
247 true => parent_ctx.def_level + 2,
248 false => parent_ctx.def_level + 1,
249 };
250
251 let ctx = LevelContext {
252 rep_level: parent_ctx.rep_level + 1,
253 def_level,
254 };
255
256 Ok(match field.data_type() {
257 DataType::List(_) => {
258 let list = array.as_list();
259 let child = Self::try_new(child.as_ref(), ctx, list.values())?;
260 let is_last = child.child_has_no_nested_rep();
261 let offsets = list.offsets().clone();
262 Self::List(
263 Box::new(child),
264 ctx,
265 offsets,
266 list.nulls().cloned(),
267 is_last,
268 )
269 }
270 DataType::LargeList(_) => {
271 let list = array.as_list();
272 let child = Self::try_new(child.as_ref(), ctx, list.values())?;
273 let is_last = child.child_has_no_nested_rep();
274 let offsets = list.offsets().clone();
275 let nulls = list.nulls().cloned();
276 Self::LargeList(Box::new(child), ctx, offsets, nulls, is_last)
277 }
278 DataType::Map(_, _) => {
279 let map = array.as_map();
280 let entries = Arc::new(map.entries().clone()) as ArrayRef;
281 let child = Self::try_new(child.as_ref(), ctx, &entries)?;
282 let is_last = child.child_has_no_nested_rep();
283 let offsets = map.offsets().clone();
284 Self::List(Box::new(child), ctx, offsets, map.nulls().cloned(), is_last)
285 }
286 DataType::FixedSizeList(_, size) => {
287 let list = array.as_fixed_size_list();
288 let child = Self::try_new(child.as_ref(), ctx, list.values())?;
289 let nulls = list.nulls().cloned();
290 Self::FixedSizeList(Box::new(child), ctx, *size as _, nulls)
291 }
292 DataType::ListView(_) => {
293 let list = array.as_list_view();
294 let child = Self::try_new(child.as_ref(), ctx, list.values())?;
295 let offsets = list.offsets().clone();
296 let sizes = list.sizes().clone();
297 let nulls = list.nulls().cloned();
298 Self::ListView(Box::new(child), ctx, offsets, sizes, nulls)
299 }
300 DataType::LargeListView(_) => {
301 let list = array.as_list_view();
302 let child = Self::try_new(child.as_ref(), ctx, list.values())?;
303 let offsets = list.offsets().clone();
304 let sizes = list.sizes().clone();
305 let nulls = list.nulls().cloned();
306 Self::LargeListView(Box::new(child), ctx, offsets, sizes, nulls)
307 }
308 _ => unreachable!(),
309 })
310 }
311 d => Err(nyi_err!("Datatype {} is not yet supported", d)),
312 }
313 }
314
315 fn finish(self) -> Vec<ArrayLevels> {
318 match self {
319 LevelInfoBuilder::Primitive(v) => vec![v],
320 LevelInfoBuilder::List(v, _, _, _, _)
321 | LevelInfoBuilder::LargeList(v, _, _, _, _)
322 | LevelInfoBuilder::FixedSizeList(v, _, _, _)
323 | LevelInfoBuilder::ListView(v, _, _, _, _)
324 | LevelInfoBuilder::LargeListView(v, _, _, _, _) => v.finish(),
325 LevelInfoBuilder::Struct(v, _, _) => v.into_iter().flat_map(|l| l.finish()).collect(),
326 }
327 }
328
329 fn write(&mut self, range: Range<usize>) {
331 match self {
332 LevelInfoBuilder::Primitive(info) => Self::write_leaf(info, range),
333 LevelInfoBuilder::List(child, ctx, offsets, nulls, is_last) => {
334 Self::write_list(child, ctx, offsets, nulls.as_ref(), range, *is_last)
335 }
336 LevelInfoBuilder::LargeList(child, ctx, offsets, nulls, is_last) => {
337 Self::write_list(child, ctx, offsets, nulls.as_ref(), range, *is_last)
338 }
339 LevelInfoBuilder::FixedSizeList(child, ctx, size, nulls) => {
340 Self::write_fixed_size_list(child, ctx, *size, nulls.as_ref(), range)
341 }
342 LevelInfoBuilder::ListView(child, ctx, offsets, sizes, nulls) => {
343 Self::write_list_view(child, ctx, offsets, sizes, nulls.as_ref(), range)
344 }
345 LevelInfoBuilder::LargeListView(child, ctx, offsets, sizes, nulls) => {
346 Self::write_list_view(child, ctx, offsets, sizes, nulls.as_ref(), range)
347 }
348 LevelInfoBuilder::Struct(children, ctx, nulls) => {
349 Self::write_struct(children, ctx, nulls.as_ref(), range)
350 }
351 }
352 }
353
354 fn child_has_no_nested_rep(&self) -> bool {
358 match self {
359 LevelInfoBuilder::Primitive(_) => true,
360 LevelInfoBuilder::Struct(children, _, _) => {
361 children.iter().all(|c| c.child_has_no_nested_rep())
362 }
363 _ => false,
364 }
365 }
366
367 fn write_list<O: OffsetSizeTrait>(
371 child: &mut LevelInfoBuilder,
372 ctx: &LevelContext,
373 offsets: &[O],
374 nulls: Option<&NullBuffer>,
375 range: Range<usize>,
376 is_last_level: bool,
377 ) {
378 if nulls.is_some_and(|nulls| nulls.null_count() == nulls.len()) {
380 let count = range.end - range.start;
381 child.visit_leaves(|leaf| {
382 leaf.extend_uniform_levels(ctx.def_level - 2, ctx.rep_level - 1, count);
383 });
384 return;
385 }
386
387 if is_last_level {
395 Self::write_list_last_level(child, ctx, offsets, nulls, range);
396 return;
397 }
398
399 let offsets = &offsets[range.start..range.end + 1];
400
401 let write_non_null_slice =
402 |child: &mut LevelInfoBuilder, start_idx: usize, end_idx: usize| {
403 child.write(start_idx..end_idx);
404 child.visit_leaves(|leaf| {
405 let rep_levels = leaf.rep_levels.materialize_mut().unwrap();
406 let mut rev = rep_levels.iter_mut().rev();
407 let mut remaining = end_idx - start_idx;
408
409 loop {
410 let next = rev.next().unwrap();
411 if *next > ctx.rep_level {
412 continue;
414 }
415
416 remaining -= 1;
417 if remaining == 0 {
418 *next = ctx.rep_level - 1;
419 break;
420 }
421 }
422 })
423 };
424
425 let write_null_run = |child: &mut LevelInfoBuilder, count: usize| {
433 if count > 0 {
434 child.visit_leaves(|leaf| {
435 leaf.append_rep_level_run(ctx.rep_level - 1, count);
436 leaf.append_def_level_run(ctx.def_level - 2, count);
437 });
438 }
439 };
440
441 let write_empty_run = |child: &mut LevelInfoBuilder, count: usize| {
442 if count > 0 {
443 child.visit_leaves(|leaf| {
444 leaf.append_rep_level_run(ctx.rep_level - 1, count);
445 leaf.append_def_level_run(ctx.def_level - 1, count);
446 });
447 }
448 };
449
450 match nulls {
451 Some(nulls) => {
452 let null_offset = range.start;
453 let mut pending_nulls: usize = 0;
454 let mut pending_empties: usize = 0;
455
456 for (idx, w) in offsets.windows(2).enumerate() {
458 let is_valid = nulls.is_valid(idx + null_offset);
459 let start_idx = w[0].as_usize();
460 let end_idx = w[1].as_usize();
461
462 if !is_valid {
463 write_empty_run(child, pending_empties);
464 pending_empties = 0;
465 pending_nulls += 1;
466 } else if start_idx == end_idx {
467 write_null_run(child, pending_nulls);
468 pending_nulls = 0;
469 pending_empties += 1;
470 } else {
471 write_null_run(child, pending_nulls);
472 pending_nulls = 0;
473 write_empty_run(child, pending_empties);
474 pending_empties = 0;
475 write_non_null_slice(child, start_idx, end_idx);
476 }
477 }
478 write_null_run(child, pending_nulls);
479 write_empty_run(child, pending_empties);
480 }
481 None => {
482 let mut pending_empties: usize = 0;
483 for w in offsets.windows(2) {
484 let start_idx = w[0].as_usize();
485 let end_idx = w[1].as_usize();
486 if start_idx == end_idx {
487 pending_empties += 1;
488 } else {
489 write_empty_run(child, pending_empties);
490 pending_empties = 0;
491 write_non_null_slice(child, start_idx, end_idx);
492 }
493 }
494 write_empty_run(child, pending_empties);
495 }
496 }
497 }
498
499 fn write_list_last_level<O: OffsetSizeTrait>(
507 child: &mut LevelInfoBuilder,
508 ctx: &LevelContext,
509 offsets: &[O],
510 nulls: Option<&NullBuffer>,
511 range: Range<usize>,
512 ) {
513 let null_offset = range.start;
514 let offsets = &offsets[range.start..range.end + 1];
515 let list_start_rep = ctx.rep_level - 1;
516
517 let emit_nulls = |child: &mut LevelInfoBuilder, count: usize| {
518 child.visit_leaves(|leaf| {
519 leaf.append_rep_level_run(list_start_rep, count);
520 leaf.append_def_level_run(ctx.def_level - 2, count);
521 });
522 };
523
524 let emit_empties = |child: &mut LevelInfoBuilder, count: usize| {
525 child.visit_leaves(|leaf| {
526 leaf.append_rep_level_run(list_start_rep, count);
527 leaf.append_def_level_run(ctx.def_level - 1, count);
528 });
529 };
530
531 let emit_non_empty_run = |child: &mut LevelInfoBuilder, run_offsets: &[O]| {
532 debug_assert!(run_offsets.len() >= 2);
533 let values_start = run_offsets[0].as_usize();
534 let values_end = run_offsets[run_offsets.len() - 1].as_usize();
535 debug_assert!(values_end > values_start);
536
537 child.write(values_start..values_end);
541
542 child.visit_leaves(|leaf| {
547 let rep_levels = leaf.rep_levels.materialize_mut().unwrap();
548 let batch_len = values_end - values_start;
549 let batch_base = rep_levels.len() - batch_len;
550
551 for slot_offset in run_offsets.iter().take(run_offsets.len() - 1) {
552 let list_start_pos = batch_base + (slot_offset.as_usize() - values_start);
553 rep_levels[list_start_pos] = list_start_rep;
554 }
555 });
556 };
557
558 #[derive(Clone, Copy, PartialEq)]
560 enum SlotKind {
561 Null,
562 Empty,
563 NonEmpty,
564 }
565
566 let num_slots = offsets.len() - 1;
567 if num_slots == 0 {
568 return;
569 }
570
571 macro_rules! classify {
572 ($i:expr, $nulls:expr) => {
573 if !$nulls.is_valid($i + null_offset) {
574 SlotKind::Null
575 } else if offsets[$i] == offsets[$i + 1] {
576 SlotKind::Empty
577 } else {
578 SlotKind::NonEmpty
579 }
580 };
581 }
582
583 macro_rules! flush_run {
584 ($kind:expr, $start:expr, $end:expr) => {
585 match $kind {
586 SlotKind::Null => emit_nulls(child, $end - $start),
587 SlotKind::Empty => emit_empties(child, $end - $start),
588 SlotKind::NonEmpty => emit_non_empty_run(child, &offsets[$start..$end + 1]),
589 }
590 };
591 }
592
593 match nulls {
594 Some(nulls) => {
595 let mut run_kind = classify!(0, nulls);
596 let mut run_start: usize = 0;
597 for i in 1..num_slots {
598 let kind = classify!(i, nulls);
599 if kind != run_kind {
600 flush_run!(run_kind, run_start, i);
601 run_kind = kind;
602 run_start = i;
603 }
604 }
605 flush_run!(run_kind, run_start, num_slots);
606 }
607 None => {
608 let mut run_kind = if offsets[0] == offsets[1] {
609 SlotKind::Empty
610 } else {
611 SlotKind::NonEmpty
612 };
613 let mut run_start: usize = 0;
614 for i in 1..num_slots {
615 let kind = if offsets[i] == offsets[i + 1] {
616 SlotKind::Empty
617 } else {
618 SlotKind::NonEmpty
619 };
620 if kind != run_kind {
621 flush_run!(run_kind, run_start, i);
622 run_kind = kind;
623 run_start = i;
624 }
625 }
626 flush_run!(run_kind, run_start, num_slots);
627 }
628 }
629 }
630
631 fn write_list_view<O: OffsetSizeTrait>(
633 child: &mut LevelInfoBuilder,
634 ctx: &LevelContext,
635 offsets: &[O],
636 sizes: &[O],
637 nulls: Option<&NullBuffer>,
638 range: Range<usize>,
639 ) {
640 let offsets = &offsets[range.start..range.end];
641 let sizes = &sizes[range.start..range.end];
642
643 let write_non_null_slice =
644 |child: &mut LevelInfoBuilder, start_idx: usize, end_idx: usize| {
645 child.write(start_idx..end_idx);
646 child.visit_leaves(|leaf| {
647 let rep_levels = leaf.rep_levels.materialize_mut().unwrap();
648 let mut rev = rep_levels.iter_mut().rev();
649 let mut remaining = end_idx - start_idx;
650
651 loop {
652 let next = rev.next().unwrap();
653 if *next > ctx.rep_level {
654 continue;
656 }
657
658 remaining -= 1;
659 if remaining == 0 {
660 *next = ctx.rep_level - 1;
661 break;
662 }
663 }
664 })
665 };
666
667 let write_empty_slice = |child: &mut LevelInfoBuilder| {
668 child.visit_leaves(|leaf| {
669 leaf.append_rep_level_run(ctx.rep_level - 1, 1);
670 leaf.append_def_level_run(ctx.def_level - 1, 1);
671 })
672 };
673
674 let write_null_slice = |child: &mut LevelInfoBuilder| {
675 child.visit_leaves(|leaf| {
676 leaf.append_rep_level_run(ctx.rep_level - 1, 1);
677 leaf.append_def_level_run(ctx.def_level - 2, 1);
678 })
679 };
680
681 match nulls {
682 Some(nulls) => {
683 let null_offset = range.start;
684 for (idx, (offset, size)) in offsets.iter().zip(sizes.iter()).enumerate() {
686 let is_valid = nulls.is_valid(idx + null_offset);
687 let start_idx = offset.as_usize();
688 let size = size.as_usize();
689 let end_idx = start_idx + size;
690 if !is_valid {
691 write_null_slice(child)
692 } else if size == 0 {
693 write_empty_slice(child)
694 } else {
695 write_non_null_slice(child, start_idx, end_idx)
696 }
697 }
698 }
699 None => {
700 for (offset, size) in offsets.iter().zip(sizes.iter()) {
701 let start_idx = offset.as_usize();
702 let size = size.as_usize();
703 let end_idx = start_idx + size;
704 if size == 0 {
705 write_empty_slice(child)
706 } else {
707 write_non_null_slice(child, start_idx, end_idx)
708 }
709 }
710 }
711 }
712 }
713
714 fn write_struct(
716 children: &mut [LevelInfoBuilder],
717 ctx: &LevelContext,
718 nulls: Option<&NullBuffer>,
719 range: Range<usize>,
720 ) {
721 let write_null = |children: &mut [LevelInfoBuilder], range: Range<usize>| {
722 let len = range.end - range.start;
723 for child in children {
724 child.visit_leaves(|info| {
725 info.extend_uniform_levels(ctx.def_level - 1, ctx.rep_level, len);
726 })
727 }
728 };
729
730 if nulls.is_some_and(|nulls| nulls.null_count() == nulls.len()) {
732 write_null(children, range);
733 return;
734 }
735
736 let write_non_null = |children: &mut [LevelInfoBuilder], range: Range<usize>| {
737 for child in children {
738 child.write(range.clone())
739 }
740 };
741
742 match nulls {
743 Some(validity) => {
744 let mut last_non_null_idx = None;
745 let mut last_null_idx = None;
746
747 for i in range.clone() {
749 match validity.is_valid(i) {
750 true => {
751 if let Some(last_idx) = last_null_idx.take() {
752 write_null(children, last_idx..i)
753 }
754 last_non_null_idx.get_or_insert(i);
755 }
756 false => {
757 if let Some(last_idx) = last_non_null_idx.take() {
758 write_non_null(children, last_idx..i)
759 }
760 last_null_idx.get_or_insert(i);
761 }
762 }
763 }
764
765 if let Some(last_idx) = last_null_idx.take() {
766 write_null(children, last_idx..range.end)
767 }
768
769 if let Some(last_idx) = last_non_null_idx.take() {
770 write_non_null(children, last_idx..range.end)
771 }
772 }
773 None => write_non_null(children, range),
774 }
775 }
776
777 fn write_fixed_size_list(
779 child: &mut LevelInfoBuilder,
780 ctx: &LevelContext,
781 fixed_size: usize,
782 nulls: Option<&NullBuffer>,
783 range: Range<usize>,
784 ) {
785 if nulls.is_some_and(|nulls| nulls.null_count() == nulls.len()) {
787 let count = range.end - range.start;
788 child.visit_leaves(|leaf| {
789 leaf.extend_uniform_levels(ctx.def_level - 2, ctx.rep_level - 1, count);
790 });
791 return;
792 }
793
794 let write_non_null = |child: &mut LevelInfoBuilder, start_idx: usize, end_idx: usize| {
795 let values_start = start_idx * fixed_size;
796 let values_end = end_idx * fixed_size;
797 child.write(values_start..values_end);
798
799 child.visit_leaves(|leaf| {
800 let rep_levels = leaf.rep_levels.materialize_mut().unwrap();
801
802 let row_indices = (0..fixed_size)
803 .rev()
804 .cycle()
805 .take(values_end - values_start);
806
807 rep_levels
809 .iter_mut()
810 .rev()
811 .filter(|&&mut r| r == ctx.rep_level)
813 .zip(row_indices)
814 .for_each(|(r, idx)| {
815 if idx == 0 {
816 *r = ctx.rep_level - 1;
817 }
818 });
819 })
820 };
821
822 let write_empty = |child: &mut LevelInfoBuilder, start_idx: usize, end_idx: usize| {
824 let len = end_idx - start_idx;
825 child.visit_leaves(|leaf| {
826 leaf.append_rep_level_run(ctx.rep_level - 1, len);
827 leaf.append_def_level_run(ctx.def_level - 1, len);
828 })
829 };
830
831 let write_rows = |child: &mut LevelInfoBuilder, start_idx: usize, end_idx: usize| {
832 if fixed_size > 0 {
833 write_non_null(child, start_idx, end_idx)
834 } else {
835 write_empty(child, start_idx, end_idx)
836 }
837 };
838
839 match nulls {
840 Some(nulls) => {
841 let mut start_idx = None;
842 for idx in range.clone() {
843 if nulls.is_valid(idx) {
844 start_idx.get_or_insert(idx);
846 } else {
847 if let Some(start) = start_idx.take() {
849 write_rows(child, start, idx);
850 }
851 child.visit_leaves(|leaf| {
853 leaf.append_rep_level_run(ctx.rep_level - 1, 1);
854 leaf.append_def_level_run(ctx.def_level - 2, 1);
855 })
856 }
857 }
858 if let Some(start) = start_idx.take() {
860 write_rows(child, start, range.end);
861 }
862 }
863 None => write_rows(child, range.start, range.end),
865 }
866 }
867
868 fn write_leaf(info: &mut ArrayLevels, range: Range<usize>) {
870 let len = range.end - range.start;
871
872 if let Some(nulls) = &info.logical_nulls {
874 if !matches!(info.def_levels, LevelData::Absent) && nulls.null_count() == nulls.len() {
875 info.extend_uniform_levels(info.max_def_level - 1, info.max_rep_level, len);
876 return;
877 }
878 }
879
880 if matches!(info.def_levels, LevelData::Absent) {
881 info.non_null_indices.extend(range.clone());
882 } else {
883 let max_def_level = info.max_def_level;
884 match &info.logical_nulls {
885 Some(nulls) => {
886 assert!(range.end <= nulls.len());
887 if len >= BULK_FILL_MIN_LEN && nulls.null_count() * 2 >= nulls.len() {
892 let range_nulls = nulls.slice(range.start, len);
893 let valid_in_range = len - range_nulls.null_count();
894 let null_def_level = max_def_level - 1;
895 let buf = info
896 .def_levels
897 .materialize_mut()
898 .expect("definition levels present");
899 let base = buf.len();
900 buf.resize(base + len, null_def_level);
901 for i in range_nulls.valid_indices() {
902 buf[base + i] = max_def_level;
903 }
904 info.non_null_indices.reserve(valid_in_range);
905 info.non_null_indices
906 .extend(range_nulls.valid_indices().map(|i| i + range.start));
907 } else {
908 let bits = nulls.inner();
909 info.def_levels.extend_from_iter(range.clone().map(|i| {
910 let valid = unsafe { bits.value_unchecked(i) };
912 max_def_level - (!valid as i16)
913 }));
914 info.non_null_indices.reserve(len);
915 info.non_null_indices.extend(
916 BitIndexIterator::new(bits.inner(), bits.offset() + range.start, len)
917 .map(|i| i + range.start),
918 );
919 }
920 }
921 None => {
922 info.append_def_level_run(max_def_level, len);
923 info.non_null_indices.reserve(len);
924 info.non_null_indices.extend(range.clone());
925 }
926 }
927 }
928
929 if !matches!(info.rep_levels, LevelData::Absent) {
930 info.append_rep_level_run(info.max_rep_level, len);
931 }
932 }
933
934 fn visit_leaves(&mut self, visit: impl Fn(&mut ArrayLevels) + Copy) {
936 match self {
937 LevelInfoBuilder::Primitive(info) => visit(info),
938 LevelInfoBuilder::List(c, _, _, _, _)
939 | LevelInfoBuilder::LargeList(c, _, _, _, _)
940 | LevelInfoBuilder::FixedSizeList(c, _, _, _)
941 | LevelInfoBuilder::ListView(c, _, _, _, _)
942 | LevelInfoBuilder::LargeListView(c, _, _, _, _) => c.visit_leaves(visit),
943 LevelInfoBuilder::Struct(children, _, _) => {
944 for c in children {
945 c.visit_leaves(visit)
946 }
947 }
948 }
949 }
950
951 fn types_compatible(a: &DataType, b: &DataType) -> bool {
957 if a.equals_datatype(b) {
959 return true;
960 }
961
962 let (a, b) = match (a, b) {
964 (DataType::Dictionary(_, va), DataType::Dictionary(_, vb)) => {
965 (va.as_ref(), vb.as_ref())
966 }
967 (DataType::Dictionary(_, v), b) => (v.as_ref(), b),
968 (a, DataType::Dictionary(_, v)) => (a, v.as_ref()),
969 _ => (a, b),
970 };
971
972 if a == b {
975 return true;
976 }
977
978 match a {
981 DataType::Utf8 => matches!(b, DataType::LargeUtf8 | DataType::Utf8View),
983 DataType::Utf8View => matches!(b, DataType::LargeUtf8 | DataType::Utf8),
984 DataType::LargeUtf8 => matches!(b, DataType::Utf8 | DataType::Utf8View),
985
986 DataType::Binary => matches!(b, DataType::LargeBinary | DataType::BinaryView),
988 DataType::BinaryView => matches!(b, DataType::LargeBinary | DataType::Binary),
989 DataType::LargeBinary => matches!(b, DataType::Binary | DataType::BinaryView),
990
991 _ => false,
993 }
994 }
995}
996
/// The repetition or definition levels of a leaf column.
#[derive(Debug, Clone)]
pub(crate) enum LevelData {
    /// The column has no levels of this kind.
    Absent,
    /// One explicit level per entry.
    Materialized(Vec<i16>),
    /// `count` entries that all share the level `value`.
    Uniform { value: i16, count: usize },
}

/// Equality is semantic: two values are equal when they describe the same
/// sequence of levels, whether stored as [`LevelData::Materialized`] or as
/// [`LevelData::Uniform`]. [`LevelData::Absent`] only equals itself.
impl PartialEq for LevelData {
    fn eq(&self, other: &Self) -> bool {
        match (self, other) {
            (Self::Absent, Self::Absent) => true,
            (Self::Absent, _) | (_, Self::Absent) => false,
            (Self::Materialized(a), Self::Materialized(b)) => a == b,
            (Self::Uniform { value: v, count: n }, Self::Materialized(b))
            | (Self::Materialized(b), Self::Uniform { value: v, count: n }) => {
                b.len() == *n && b.iter().all(|x| x == v)
            }
            (
                Self::Uniform {
                    value: v1,
                    count: n1,
                },
                Self::Uniform {
                    value: v2,
                    count: n2,
                },
            ) => {
                // Two empty runs describe the same (empty) sequence whatever their
                // value. Both already equal `Materialized(vec![])`, so treating them
                // as different would break the transitivity `Eq` requires.
                n1 == n2 && (*n1 == 0 || v1 == v2)
            }
        }
    }
}

impl Eq for LevelData {}
1033
1034impl LevelData {
1035 fn new(present: bool) -> Self {
1036 match present {
1037 true => Self::Materialized(Vec::new()),
1038 false => Self::Absent,
1039 }
1040 }
1041
1042 pub(crate) fn as_ref(&self) -> LevelDataRef<'_> {
1043 match self {
1044 Self::Absent => LevelDataRef::Absent,
1045 Self::Materialized(values) => LevelDataRef::Materialized(values),
1046 Self::Uniform { value, count } => LevelDataRef::Uniform {
1047 value: *value,
1048 count: *count,
1049 },
1050 }
1051 }
1052
1053 pub(crate) fn slice(&self, offset: usize, len: usize) -> Self {
1054 match self {
1055 Self::Absent => Self::Absent,
1056 Self::Materialized(values) => Self::Materialized(values[offset..offset + len].to_vec()),
1057 Self::Uniform { value, .. } => Self::Uniform {
1058 value: *value,
1059 count: len,
1060 },
1061 }
1062 }
1063
1064 fn append_run(&mut self, value: i16, count: usize) {
1065 if count == 0 {
1066 return;
1067 }
1068
1069 match self {
1070 Self::Absent => {}
1073 Self::Materialized(values) if values.is_empty() => {
1076 *self = Self::Uniform { value, count };
1077 }
1078 Self::Materialized(values) => values.extend(std::iter::repeat_n(value, count)),
1080 Self::Uniform {
1083 value: uniform_value,
1084 count: uniform_count,
1085 } if *uniform_value == value => {
1086 *uniform_count += count;
1087 }
1088 Self::Uniform { .. } => {
1091 let values = self.materialize_mut().unwrap();
1092 values.extend(std::iter::repeat_n(value, count));
1093 }
1094 }
1095 }
1096
1097 fn extend_from_iter<I>(&mut self, iter: I)
1098 where
1099 I: IntoIterator<Item = i16>,
1100 {
1101 if let Some(values) = self.materialize_mut() {
1102 values.extend(iter);
1103 }
1104 }
1105
1106 fn materialize_mut(&mut self) -> Option<&mut Vec<i16>> {
1109 match self {
1110 Self::Absent => None,
1111 Self::Materialized(values) => Some(values),
1112 Self::Uniform { value, count } => {
1113 let values = vec![*value; *count];
1114 *self = Self::Materialized(values);
1115 match self {
1116 Self::Materialized(values) => Some(values),
1117 _ => unreachable!(),
1118 }
1119 }
1120 }
1121 }
1122}
1123
1124#[derive(Debug, Clone)]
1125pub(crate) struct ArrayLevels {
1126 def_levels: LevelData,
1130
1131 rep_levels: LevelData,
1135
1136 non_null_indices: Vec<usize>,
1139
1140 max_def_level: i16,
1142
1143 max_rep_level: i16,
1145
1146 array: ArrayRef,
1148
1149 logical_nulls: Option<NullBuffer>,
1151}
1152
1153impl PartialEq for ArrayLevels {
1154 fn eq(&self, other: &Self) -> bool {
1155 self.def_levels == other.def_levels
1156 && self.rep_levels == other.rep_levels
1157 && self.non_null_indices == other.non_null_indices
1158 && self.max_def_level == other.max_def_level
1159 && self.max_rep_level == other.max_rep_level
1160 && self.array.as_ref() == other.array.as_ref()
1161 && self.logical_nulls.as_ref() == other.logical_nulls.as_ref()
1162 }
1163}
1164impl Eq for ArrayLevels {}
1165
1166impl ArrayLevels {
1167 fn new(ctx: LevelContext, is_nullable: bool, array: ArrayRef) -> Self {
1168 let max_rep_level = ctx.rep_level;
1169 let max_def_level = match is_nullable {
1170 true => ctx.def_level + 1,
1171 false => ctx.def_level,
1172 };
1173
1174 let logical_nulls = array.logical_nulls();
1175
1176 Self {
1177 def_levels: LevelData::new(max_def_level != 0),
1178 rep_levels: LevelData::new(max_rep_level != 0),
1179 non_null_indices: vec![],
1180 max_def_level,
1181 max_rep_level,
1182 array,
1183 logical_nulls,
1184 }
1185 }
1186
1187 pub fn array(&self) -> &ArrayRef {
1188 &self.array
1189 }
1190
1191 pub(crate) fn def_level_data(&self) -> &LevelData {
1192 &self.def_levels
1193 }
1194
1195 pub(crate) fn rep_level_data(&self) -> &LevelData {
1196 &self.rep_levels
1197 }
1198
1199 pub fn non_null_indices(&self) -> &[usize] {
1200 &self.non_null_indices
1201 }
1202
1203 pub(crate) fn slice_for_chunk(&self, chunk: &CdcChunk) -> Self {
1209 let def_levels = self.def_levels.slice(chunk.level_offset, chunk.num_levels);
1210 let rep_levels = self.rep_levels.slice(chunk.level_offset, chunk.num_levels);
1211
1212 let nni = &self.non_null_indices[chunk.value_offset..chunk.value_offset + chunk.num_values];
1214 let start = nni.first().copied().unwrap_or(0);
1219 let end = nni.last().map_or(0, |&i| i + 1);
1220 let non_null_indices = nni.iter().map(|&idx| idx - start).collect();
1222 let array = self.array.slice(start, end - start);
1224 let logical_nulls = array.logical_nulls();
1225
1226 Self {
1227 def_levels,
1228 rep_levels,
1229 non_null_indices,
1230 max_def_level: self.max_def_level,
1231 max_rep_level: self.max_rep_level,
1232 array,
1233 logical_nulls,
1234 }
1235 }
1236
1237 fn extend_uniform_levels(&mut self, def_val: i16, rep_val: i16, count: usize) {
1239 self.def_levels.append_run(def_val, count);
1240 self.rep_levels.append_run(rep_val, count);
1241 }
1242
1243 fn append_def_level_run(&mut self, value: i16, count: usize) {
1244 self.def_levels.append_run(value, count);
1245 }
1246
1247 fn append_rep_level_run(&mut self, value: i16, count: usize) {
1248 self.rep_levels.append_run(value, count);
1249 }
1250}
1251
1252#[cfg(test)]
1253mod tests {
1254 use super::*;
1255 use crate::column::chunker::CdcChunk;
1256
1257 use arrow_array::builder::*;
1258 use arrow_array::types::Int32Type;
1259 use arrow_array::*;
1260 use arrow_buffer::{Buffer, ToByteSlice};
1261 use arrow_cast::display::array_value_to_string;
1262 use arrow_data::{ArrayData, ArrayDataBuilder};
1263 use arrow_schema::{Fields, Schema};
1264
#[test]
fn test_calculate_array_levels_twitter_example() {
    // Two rows of doubly nested, non-nullable lists:
    // [[0, 1, 2], [3, 4, 5, 6]] and [[7], [8, 9]]
    let leaf_field = Field::new_list_field(DataType::Int32, false);
    let inner_type = DataType::List(Arc::new(leaf_field));
    let outer_type = DataType::List(Arc::new(Field::new("l2", inner_type.clone(), false)));
    let outer_field = Field::new("l1", outer_type.clone(), false);

    let primitives = Int32Array::from_iter(0..10);

    let inner_offsets = Buffer::from_iter([0_i32, 3, 7, 8, 10]);
    let inner_list = ArrayDataBuilder::new(inner_type)
        .len(4)
        .add_buffer(inner_offsets)
        .add_child_data(primitives.to_data())
        .build()
        .unwrap();

    let outer_offsets = Buffer::from_iter([0_i32, 2, 4]);
    let outer_data = ArrayDataBuilder::new(outer_type)
        .len(2)
        .add_buffer(outer_offsets)
        .add_child_data(inner_list)
        .build()
        .unwrap();
    let outer_list = make_array(outer_data);

    let levels = calculate_array_levels(&outer_list, &outer_field).unwrap();
    assert_eq!(levels.len(), 1);

    let expected = ArrayLevels {
        def_levels: LevelData::Materialized(vec![2; 10]),
        rep_levels: LevelData::Materialized(vec![0, 2, 2, 1, 2, 2, 2, 0, 1, 2]),
        non_null_indices: (0..10).collect(),
        max_def_level: 2,
        max_rep_level: 2,
        array: Arc::new(primitives),
        logical_nulls: None,
    };
    assert_eq!(&levels[0], &expected);
}
1310
#[test]
fn test_calculate_one_level_1() {
    // A non-nullable primitive column: both level buffers are expected to be absent.
    let field = Field::new_list_field(DataType::Int32, false);
    let values: ArrayRef = Arc::new(Int32Array::from_iter(0..10));

    let computed = calculate_array_levels(&values, &field).unwrap();
    assert_eq!(computed.len(), 1);

    let want = ArrayLevels {
        def_levels: LevelData::Absent,
        rep_levels: LevelData::Absent,
        non_null_indices: (0..10).collect(),
        max_def_level: 0,
        max_rep_level: 0,
        array: values,
        logical_nulls: None,
    };
    assert_eq!(computed[0], want);
}
1331
#[test]
fn test_calculate_one_level_2() {
    // A nullable primitive column with nulls at positions 1 and 4.
    let field = Field::new_list_field(DataType::Int32, true);
    let values: ArrayRef = Arc::new(Int32Array::from_iter([
        Some(0),
        None,
        Some(0),
        Some(0),
        None,
    ]));

    let computed = calculate_array_levels(&values, &field).unwrap();
    assert_eq!(computed.len(), 1);

    let nulls = values.logical_nulls();
    let want = ArrayLevels {
        // Definition level 1 marks a present value, 0 a null.
        def_levels: LevelData::Materialized(vec![1, 0, 1, 1, 0]),
        rep_levels: LevelData::Absent,
        non_null_indices: vec![0, 2, 3],
        max_def_level: 1,
        max_rep_level: 0,
        array: values,
        logical_nulls: nulls,
    };
    assert_eq!(computed[0], want);
}
1359
#[test]
fn test_calculate_array_levels_1() {
    let item = Field::new_list_field(DataType::Int32, false);
    let list_type = DataType::List(Arc::new(item));

    // Scenario 1: a non-nullable list column in which every row holds exactly one value.
    let single_values = Int32Array::from_iter(0..5);
    let single_offsets = Buffer::from_iter(0_i32..6);
    let single_data = ArrayDataBuilder::new(list_type.clone())
        .len(5)
        .add_buffer(single_offsets)
        .add_child_data(single_values.to_data())
        .build()
        .unwrap();
    let single_list = make_array(single_data);

    let required_field = Field::new("list", list_type.clone(), false);
    let computed = calculate_array_levels(&single_list, &required_field).unwrap();
    assert_eq!(computed.len(), 1);

    let want = ArrayLevels {
        def_levels: LevelData::Materialized(vec![1; 5]),
        rep_levels: LevelData::Materialized(vec![0; 5]),
        non_null_indices: (0..5).collect(),
        max_def_level: 1,
        max_rep_level: 1,
        array: Arc::new(single_values),
        logical_nulls: None,
    };
    assert_eq!(computed[0], want);

    // Scenario 2: a nullable list column laid out as
    // [0, 0], null, [2, 2], [3, 3, 3, 3], [4, 4, 4]
    // (validity bitmap 0b00011101 clears bit 1, so row 1 is null).
    let ragged_values = Int32Array::from_iter([0, 0, 2, 2, 3, 3, 3, 3, 4, 4, 4]);
    let ragged_offsets = Buffer::from_iter([0_i32, 2, 2, 4, 8, 11]);
    let ragged_data = ArrayDataBuilder::new(list_type.clone())
        .len(5)
        .add_buffer(ragged_offsets)
        .add_child_data(ragged_values.to_data())
        .null_bit_buffer(Some(Buffer::from([0b00011101])))
        .build()
        .unwrap();
    let ragged_list = make_array(ragged_data);

    let optional_field = Field::new("list", list_type, true);
    let computed = calculate_array_levels(&ragged_list, &optional_field).unwrap();
    assert_eq!(computed.len(), 1);

    let want = ArrayLevels {
        def_levels: LevelData::Materialized(vec![2, 2, 0, 2, 2, 2, 2, 2, 2, 2, 2, 2]),
        rep_levels: LevelData::Materialized(vec![0, 1, 0, 0, 1, 0, 1, 1, 1, 0, 1, 1]),
        non_null_indices: (0..11).collect(),
        max_def_level: 2,
        max_rep_level: 1,
        array: Arc::new(ragged_values),
        logical_nulls: None,
    };
    assert_eq!(computed[0], want);
}
1428
#[test]
fn test_calculate_array_levels_2() {
    // Scenario 1: a nullable struct wrapping a nullable list of non-nullable Int32.
    // List offsets give rows of length 2, 0, 2, 4 and 3; the struct validity bitmap
    // 0b00011010 leaves only rows 1, 3 and 4 valid.
    let ints = Int32Array::from_iter(0..11);
    let item = Field::new("leaf", DataType::Int32, false);

    let list_type = DataType::List(Arc::new(item));
    let list_data = ArrayData::builder(list_type.clone())
        .len(5)
        .add_child_data(ints.to_data())
        .add_buffer(Buffer::from_iter([0_i32, 2, 2, 4, 8, 11]))
        .build()
        .unwrap();
    let list_array = make_array(list_data);
    let list_field = Arc::new(Field::new("list", list_type, true));

    let struct_validity = Buffer::from([0b00011010]);
    let wrapped: ArrayRef = Arc::new(StructArray::from((
        vec![(list_field, list_array)],
        struct_validity,
    )));
    let struct_field = Field::new("struct", wrapped.data_type().clone(), true);

    let computed = calculate_array_levels(&wrapped, &struct_field).unwrap();
    assert_eq!(computed.len(), 1);

    let want = ArrayLevels {
        def_levels: LevelData::Materialized(vec![0, 2, 0, 3, 3, 3, 3, 3, 3, 3]),
        rep_levels: LevelData::Materialized(vec![0, 0, 0, 0, 1, 1, 1, 0, 1, 1]),
        non_null_indices: (4..11).collect(),
        max_def_level: 3,
        max_rep_level: 1,
        array: Arc::new(ints),
        logical_nulls: None,
    };
    assert_eq!(computed[0], want);

    // Scenario 2: two nested nullable lists over a nullable leaf. Each of the 11
    // inner lists holds two values; the outer offsets make row 1 an empty list.
    let ints = Int32Array::from_iter(100..122);
    let item = Field::new("leaf", DataType::Int32, true);

    let inner_type = DataType::List(Arc::new(item));
    let inner_offsets = Buffer::from_iter([0_i32, 2, 4, 6, 8, 10, 12, 14, 16, 18, 20, 22]);
    let inner_data = ArrayData::builder(inner_type.clone())
        .len(11)
        .add_child_data(ints.to_data())
        .add_buffer(inner_offsets)
        .build()
        .unwrap();

    let inner_field = Field::new("l1", inner_type, true);
    let outer_type = DataType::List(Arc::new(inner_field));
    let outer_data = ArrayData::builder(outer_type)
        .len(5)
        .add_child_data(inner_data)
        .add_buffer(Buffer::from_iter([0_i32, 2, 2, 4, 8, 11]))
        .build()
        .unwrap();

    let outer_array = make_array(outer_data);
    let outer_field = Field::new("l2", outer_array.data_type().clone(), true);

    let computed = calculate_array_levels(&outer_array, &outer_field).unwrap();
    assert_eq!(computed.len(), 1);

    let want = ArrayLevels {
        def_levels: LevelData::Materialized(vec![
            5, 5, 5, 5, 1, 5, 5, 5, 5, 5, 5, 5, 5, 5, 5, 5, 5, 5, 5, 5, 5, 5, 5,
        ]),
        rep_levels: LevelData::Materialized(vec![
            0, 2, 1, 2, 0, 0, 2, 1, 2, 0, 2, 1, 2, 1, 2, 1, 2, 0, 2, 1, 2, 1, 2,
        ]),
        non_null_indices: (0..22).collect(),
        max_def_level: 5,
        max_rep_level: 2,
        array: Arc::new(ints),
        logical_nulls: None,
    };
    assert_eq!(computed[0], want);
}
1530
#[test]
fn test_calculate_array_levels_nested_list() {
    let item = Field::new("leaf", DataType::Int32, false);
    let list_type = DataType::List(Arc::new(item));

    // Scenario 1: a non-nullable list column with exactly one value per row.
    let ints = Int32Array::from_iter([0; 4]);
    let list_data = ArrayData::builder(list_type.clone())
        .len(4)
        .add_buffer(Buffer::from_iter(0_i32..5))
        .add_child_data(ints.to_data())
        .build()
        .unwrap();
    let list_array = make_array(list_data);

    let required_field = Field::new("list", list_type.clone(), false);
    let computed = calculate_array_levels(&list_array, &required_field).unwrap();
    assert_eq!(computed.len(), 1);

    let want = ArrayLevels {
        def_levels: LevelData::Materialized(vec![1; 4]),
        rep_levels: LevelData::Materialized(vec![0; 4]),
        non_null_indices: (0..4).collect(),
        max_def_level: 1,
        max_rep_level: 1,
        array: Arc::new(ints),
        logical_nulls: None,
    };
    assert_eq!(computed[0], want);

    // Scenario 2: a nullable list inside a nullable struct. The list validity bitmap
    // 0b00001110 marks row 0 as null; the struct itself carries no null buffer.
    let ints = Int32Array::from_iter(0..8);
    let list_data = ArrayData::builder(list_type.clone())
        .len(4)
        .add_buffer(Buffer::from_iter([0_i32, 0, 3, 5, 7]))
        .null_bit_buffer(Some(Buffer::from([0b00001110])))
        .add_child_data(ints.to_data())
        .build()
        .unwrap();
    let list_array = make_array(list_data);
    let optional_field = Arc::new(Field::new("list", list_type, true));

    let wrapped: ArrayRef = Arc::new(StructArray::from(vec![(optional_field, list_array)]));
    let struct_field = Field::new("struct", wrapped.data_type().clone(), true);
    let computed = calculate_array_levels(&wrapped, &struct_field).unwrap();
    assert_eq!(computed.len(), 1);

    let want = ArrayLevels {
        def_levels: LevelData::Materialized(vec![1, 3, 3, 3, 3, 3, 3, 3]),
        rep_levels: LevelData::Materialized(vec![0, 0, 1, 1, 0, 1, 0, 1]),
        non_null_indices: (0..7).collect(),
        max_def_level: 3,
        max_rep_level: 1,
        array: Arc::new(ints),
        logical_nulls: None,
    };
    assert_eq!(computed[0], want);

    // Scenario 3: struct -> nullable list -> nullable list -> non-nullable Int32.
    // The inner offsets produce seven lists, two of which (indices 2 and 5) are empty;
    // the outer list is null in row 0 and every struct row is valid.
    let ints = Int32Array::from_iter(201..216);
    let item = Field::new("leaf", DataType::Int32, false);
    let inner_type = DataType::List(Arc::new(item));
    let inner_data = ArrayData::builder(inner_type.clone())
        .len(7)
        .add_buffer(Buffer::from_iter([0_i32, 1, 3, 3, 6, 10, 10, 15]))
        .add_child_data(ints.to_data())
        .build()
        .unwrap();

    let inner_field = Field::new("l1", inner_type, true);
    let outer_type = DataType::List(Arc::new(inner_field));
    let outer_data = ArrayData::builder(outer_type.clone())
        .len(4)
        .add_buffer(Buffer::from_iter([0_i32, 0, 3, 5, 7]))
        .null_bit_buffer(Some(Buffer::from([0b00001110])))
        .add_child_data(inner_data)
        .build()
        .unwrap();

    let outer_array = make_array(outer_data);
    let outer_field = Arc::new(Field::new("list_2", outer_type, true));

    let struct_validity = Buffer::from([0b00001111]);
    let struct_array = StructArray::from((vec![(outer_field, outer_array)], struct_validity));
    let struct_field = Field::new("struct", struct_array.data_type().clone(), true);

    let wrapped: ArrayRef = Arc::new(struct_array);
    let computed = calculate_array_levels(&wrapped, &struct_field).unwrap();
    assert_eq!(computed.len(), 1);

    let want = ArrayLevels {
        def_levels: LevelData::Materialized(vec![
            1, 5, 5, 5, 4, 5, 5, 5, 5, 5, 5, 5, 4, 5, 5, 5, 5, 5,
        ]),
        rep_levels: LevelData::Materialized(vec![
            0, 0, 1, 2, 1, 0, 2, 2, 1, 2, 2, 2, 0, 1, 2, 2, 2, 2,
        ]),
        non_null_indices: (0..15).collect(),
        max_def_level: 5,
        max_rep_level: 2,
        array: Arc::new(ints),
        logical_nulls: None,
    };
    assert_eq!(computed[0], want);
}
1653
#[test]
fn test_calculate_nested_struct_levels() {
    // Schema: a: Struct<b: Struct<c: Int32>>, all three levels nullable.
    // Nulls: `c` at rows 1 and 3, `b` at row 3 (0b00110111), `a` at row 4 (0b00101111).
    let leaf: ArrayRef = Arc::new(Int32Array::from_iter([
        Some(1),
        None,
        Some(3),
        None,
        Some(5),
        Some(6),
    ]));
    let c_field = Arc::new(Field::new("c", DataType::Int32, true));
    let b_validity = Buffer::from([0b00110111]);
    let b = StructArray::from((vec![(c_field, leaf.clone())], b_validity));

    let b_field = Arc::new(Field::new("b", b.data_type().clone(), true));
    let a_validity = Buffer::from([0b00101111]);
    let a = StructArray::from((vec![(b_field, Arc::new(b) as ArrayRef)], a_validity));

    let a_field = Field::new("a", a.data_type().clone(), true);
    let a_array: ArrayRef = Arc::new(a);

    let computed = calculate_array_levels(&a_array, &a_field).unwrap();
    assert_eq!(computed.len(), 1);

    let leaf_nulls = leaf.logical_nulls();
    let want = ArrayLevels {
        // The definition level records how deep each row is defined: 0 = `a` null,
        // 1 = `b` null, 2 = `c` null, 3 = value present.
        def_levels: LevelData::Materialized(vec![3, 2, 3, 1, 0, 3]),
        rep_levels: LevelData::Absent,
        non_null_indices: vec![0, 2, 5],
        max_def_level: 3,
        max_rep_level: 0,
        array: leaf,
        logical_nulls: leaf_nulls,
    };
    assert_eq!(computed[0], want);
}
1694
#[test]
fn list_single_column() {
    // Rows: [1], [2, 3], null, [4, 5, 6], [7, 8, 9, 10]
    // (validity bitmap 0b00011011 clears bit 2).
    let ints = Int32Array::from(vec![1, 2, 3, 4, 5, 6, 7, 8, 9, 10]);
    let offsets = Buffer::from_iter([0_i32, 1, 3, 3, 6, 10]);
    let list_type = DataType::List(Arc::new(Field::new_list_field(DataType::Int32, true)));
    let list_data = ArrayData::builder(list_type.clone())
        .len(5)
        .add_buffer(offsets)
        .null_bit_buffer(Some(Buffer::from([0b00011011])))
        .add_child_data(ints.to_data())
        .build()
        .unwrap();

    assert_eq!(list_data.null_count(), 1);

    let list_array = ListArray::from(list_data);

    // Only rows 2..4 are written: the null row followed by [4, 5, 6].
    let field = Field::new_list_field(list_type, true);
    let mut level_builder = levels(&field, list_array);
    level_builder.write(2..4);
    let computed = level_builder.finish();

    assert_eq!(computed.len(), 1);

    let want = ArrayLevels {
        def_levels: LevelData::Materialized(vec![0, 3, 3, 3]),
        rep_levels: LevelData::Materialized(vec![0, 0, 1, 1]),
        non_null_indices: vec![3, 4, 5],
        max_def_level: 3,
        max_rep_level: 1,
        array: Arc::new(ints),
        logical_nulls: None,
    };
    assert_eq!(computed[0], want);
}
1734
#[test]
fn mixed_struct_list() {
    // Schema:
    //   a: Int32 (required)
    //   b: Int32 (nullable)
    //   c: Struct<d: Float64, e: Struct<f: Float32, g: List<Int16>>> (nullable)
    let d_field = Arc::new(Field::new("d", DataType::Float64, true));
    let f_field = Arc::new(Field::new("f", DataType::Float32, true));
    let g_field = Arc::new(Field::new(
        "g",
        DataType::List(Arc::new(Field::new("items", DataType::Int16, false))),
        false,
    ));
    let e_field = Arc::new(Field::new(
        "e",
        DataType::Struct(vec![f_field.clone(), g_field.clone()].into()),
        true,
    ));
    let schema = Schema::new(vec![
        Field::new("a", DataType::Int32, false),
        Field::new("b", DataType::Int32, true),
        Field::new(
            "c",
            DataType::Struct(vec![d_field.clone(), e_field.clone()].into()),
            true,
        ),
    ]);

    // Leaf data for each primitive column.
    let a = Int32Array::from(vec![1, 2, 3, 4, 5]);
    let b = Int32Array::from(vec![Some(1), None, None, Some(4), Some(5)]);
    let d = Float64Array::from(vec![None, None, None, Some(1.0), None]);
    let f = Float32Array::from(vec![Some(0.0), None, Some(333.3), None, Some(5.25)]);

    // `g` rows: [1], [2, 3], [], [4, 5, 6], [7, 8, 9, 10]
    let g_items = Int16Array::from(vec![1, 2, 3, 4, 5, 6, 7, 8, 9, 10]);
    let g_offsets = Buffer::from([0_i32, 1, 3, 3, 6, 10].to_byte_slice());
    let g_data = ArrayData::builder(g_field.data_type().clone())
        .len(5)
        .add_buffer(g_offsets)
        .add_child_data(g_items.into_data())
        .build()
        .unwrap();
    let g = ListArray::from(g_data);

    let e = StructArray::from(vec![
        (f_field, Arc::new(f.clone()) as ArrayRef),
        (g_field, Arc::new(g) as ArrayRef),
    ]);
    let c = StructArray::from(vec![
        (d_field, Arc::new(d.clone()) as ArrayRef),
        (e_field, Arc::new(e) as ArrayRef),
    ]);

    let batch = RecordBatch::try_new(
        Arc::new(schema),
        vec![Arc::new(a.clone()), Arc::new(b.clone()), Arc::new(c)],
    )
    .unwrap();

    // Gather the levels of every leaf column, in schema order.
    let batch_schema = batch.schema();
    let mut computed = Vec::new();
    for (column, field) in batch.columns().iter().zip(batch_schema.fields()) {
        computed.extend(calculate_array_levels(column, field).unwrap());
    }
    assert_eq!(computed.len(), 5);

    // Column `a`.
    let want_a = ArrayLevels {
        def_levels: LevelData::Absent,
        rep_levels: LevelData::Absent,
        non_null_indices: vec![0, 1, 2, 3, 4],
        max_def_level: 0,
        max_rep_level: 0,
        array: Arc::new(a),
        logical_nulls: None,
    };
    assert_eq!(computed[0], want_a);

    // Column `b`.
    let b_nulls = b.logical_nulls();
    let want_b = ArrayLevels {
        def_levels: LevelData::Materialized(vec![1, 0, 0, 1, 1]),
        rep_levels: LevelData::Absent,
        non_null_indices: vec![0, 3, 4],
        max_def_level: 1,
        max_rep_level: 0,
        array: Arc::new(b),
        logical_nulls: b_nulls,
    };
    assert_eq!(computed[1], want_b);

    // Column `c.d`.
    let d_nulls = d.logical_nulls();
    let want_d = ArrayLevels {
        def_levels: LevelData::Materialized(vec![1, 1, 1, 2, 1]),
        rep_levels: LevelData::Absent,
        non_null_indices: vec![3],
        max_def_level: 2,
        max_rep_level: 0,
        array: Arc::new(d),
        logical_nulls: d_nulls,
    };
    assert_eq!(computed[2], want_d);

    // Column `c.e.f`.
    let f_nulls = f.logical_nulls();
    let want_f = ArrayLevels {
        def_levels: LevelData::Materialized(vec![3, 2, 3, 2, 3]),
        rep_levels: LevelData::Absent,
        non_null_indices: vec![0, 2, 4],
        max_def_level: 3,
        max_rep_level: 0,
        array: Arc::new(f),
        logical_nulls: f_nulls,
    };
    assert_eq!(computed[3], want_f);
}
1872
#[test]
fn test_null_vs_nonnull_struct() {
    // Builds a one-column batch holding Struct<offset: Int32> and returns its levels.
    // The only thing that varies between calls is the nullability of the struct field.
    let levels_for = |struct_nullable: bool| {
        let offset_field = Arc::new(Field::new("offset", DataType::Int32, true));
        let schema = Schema::new(vec![Field::new(
            "some_nested_object",
            DataType::Struct(vec![offset_field.clone()].into()),
            struct_nullable,
        )]);

        let offset = Int32Array::from(vec![1, 2, 3, 4, 5]);
        let some_nested_object =
            StructArray::from(vec![(offset_field, Arc::new(offset) as ArrayRef)]);

        let batch =
            RecordBatch::try_new(Arc::new(schema), vec![Arc::new(some_nested_object)]).unwrap();

        let batch_schema = batch.schema();
        calculate_array_levels(batch.column(0), batch_schema.field(0)).unwrap()
    };

    let struct_null_level = levels_for(false);
    let struct_non_null_level = levels_for(true);

    assert!(
        struct_non_null_level != struct_null_level,
        "Levels should not be equal, to reflect the difference in struct nullness"
    );
}
1923
#[test]
fn test_map_array() {
    // Three map rows holding 2, 2 and 3 entries; two of the values are null.
    let json_content = r#"
        {"stocks":{"long": "$AAA", "short": "$BBB"}}
        {"stocks":{"long": "$CCC", "short": null}}
        {"stocks":{"hedged": "$YYY", "long": null, "short": "$D"}}
        "#;
    let entries_type = DataType::Struct(Fields::from(vec![
        Field::new("key", DataType::Utf8, false),
        Field::new("value", DataType::Utf8, true),
    ]));
    let entries_field = Arc::new(Field::new("entries", entries_type, false));
    // The map column itself is declared non-nullable.
    let stocks_field = Field::new("stocks", DataType::Map(entries_field, false), false);
    let schema = Arc::new(Schema::new(vec![stocks_field]));
    let mut reader = arrow::json::ReaderBuilder::new(schema)
        .with_batch_size(64)
        .build(std::io::Cursor::new(json_content))
        .unwrap();

    let batch = reader.next().unwrap().unwrap();

    // Gather the levels of both leaf columns (keys, then values).
    let batch_schema = batch.schema();
    let mut computed = Vec::new();
    for (column, field) in batch.columns().iter().zip(batch_schema.fields()) {
        computed.extend(calculate_array_levels(column, field).unwrap());
    }
    assert_eq!(computed.len(), 2);

    let map = batch.column(0).as_map();

    // Keys.
    let key_nulls = map.keys().logical_nulls();
    let want_keys = ArrayLevels {
        def_levels: LevelData::Materialized(vec![1; 7]),
        rep_levels: LevelData::Materialized(vec![0, 1, 0, 1, 0, 1, 1]),
        non_null_indices: vec![0, 1, 2, 3, 4, 5, 6],
        max_def_level: 1,
        max_rep_level: 1,
        array: map.keys().clone(),
        logical_nulls: key_nulls,
    };
    assert_eq!(computed[0], want_keys);

    // Values.
    let value_nulls = map.values().logical_nulls();
    let want_values = ArrayLevels {
        def_levels: LevelData::Materialized(vec![2, 2, 2, 1, 2, 1, 2]),
        rep_levels: LevelData::Materialized(vec![0, 1, 0, 1, 0, 1, 1]),
        non_null_indices: vec![0, 1, 2, 4, 6],
        max_def_level: 2,
        max_rep_level: 1,
        array: map.values().clone(),
        logical_nulls: value_nulls,
    };
    assert_eq!(computed[1], want_values);
}
1995
#[test]
fn test_list_of_struct() {
    /// Appends one struct entry: field `a` gets `a` (null when `None`), and the
    /// struct slot itself is marked valid or null according to `valid`.
    fn push_struct(values: &mut StructBuilder, a: Option<i32>, valid: bool) {
        values
            .field_builder::<Int32Builder>(0)
            .unwrap()
            .append_option(a);
        values.append(valid);
    }

    // Schema: list: List<Struct<a: Int32>>, nullable at every level.
    let a_field = Field::new("a", DataType::Int32, true);
    let fields = Fields::from([Arc::new(a_field)]);
    let item_field = Field::new_list_field(DataType::Struct(fields.clone()), true);
    let list_field = Field::new("list", DataType::List(Arc::new(item_field)), true);

    let a_builder = Int32Builder::with_capacity(10);
    let struct_builder = StructBuilder::new(fields, vec![Box::new(a_builder)]);
    let mut list_builder = ListBuilder::new(struct_builder);

    // Rows: [{a: 1}], [], null, [null, null], [{a: null}], [{a: 2}]

    // [{a: 1}]
    push_struct(list_builder.values(), Some(1), true);
    list_builder.append(true);

    // []
    list_builder.append(true);

    // null
    list_builder.append(false);

    // [null, null]
    push_struct(list_builder.values(), None, false);
    push_struct(list_builder.values(), None, false);
    list_builder.append(true);

    // [{a: null}]
    push_struct(list_builder.values(), None, true);
    list_builder.append(true);

    // [{a: 2}]
    push_struct(list_builder.values(), Some(2), true);
    list_builder.append(true);

    let array = Arc::new(list_builder.finish());

    let leaf = array.values().as_struct().column(0).clone();
    let leaf_len = leaf.len();
    assert_eq!(leaf_len, 5);

    let schema = Arc::new(Schema::new(vec![list_field]));
    let rb = RecordBatch::try_new(schema, vec![array]).unwrap();

    let computed = calculate_array_levels(rb.column(0), rb.schema().field(0)).unwrap();
    let actual = &computed[0];

    let leaf_nulls = leaf.logical_nulls();
    let want = ArrayLevels {
        def_levels: LevelData::Materialized(vec![4, 1, 0, 2, 2, 3, 4]),
        rep_levels: LevelData::Materialized(vec![0, 0, 0, 0, 1, 0, 0]),
        non_null_indices: vec![0, 4],
        max_def_level: 4,
        max_rep_level: 1,
        array: leaf,
        logical_nulls: leaf_nulls,
    };

    assert_eq!(*actual, want);
}
2083
#[test]
fn test_struct_mask_list() {
    // A list column wrapped in a struct whose validity bitmap (0b01010111) nulls out
    // rows 3 and 5, hiding whatever the list holds in those rows.
    let list = ListArray::from_iter_primitive::<Int32Type, _, _>(vec![
        Some(vec![Some(1), Some(2)]),
        Some(vec![None]),
        Some(vec![]),
        Some(vec![Some(3), None]), // hidden by the null struct row
        Some(vec![Some(4), Some(5)]),
        None, // hidden by the null struct row
        None,
    ]);
    let leaf = list.values().clone();

    // The expectations below rely on null list rows occupying no child slots.
    assert_eq!(list.values().len(), 7);

    let list_field = Arc::new(Field::new("list", list.data_type().clone(), true));
    let list_ref: ArrayRef = Arc::new(list);
    let struct_validity = Buffer::from([0b01010111]);
    let struct_array = StructArray::from((vec![(list_field, list_ref)], struct_validity));

    let struct_field = Field::new("struct", struct_array.data_type().clone(), true);
    let struct_ref: ArrayRef = Arc::new(struct_array);
    let computed = calculate_array_levels(&struct_ref, &struct_field).unwrap();

    assert_eq!(computed.len(), 1);

    let leaf_nulls = leaf.logical_nulls();
    let want = ArrayLevels {
        def_levels: LevelData::Materialized(vec![4, 4, 3, 2, 0, 4, 4, 0, 1]),
        rep_levels: LevelData::Materialized(vec![0, 1, 0, 0, 0, 0, 1, 0, 0]),
        non_null_indices: vec![0, 1, 5, 6],
        max_def_level: 4,
        max_rep_level: 1,
        array: leaf,
        logical_nulls: leaf_nulls,
    };

    assert_eq!(computed[0], want);
}
2125
#[test]
fn test_list_mask_struct() {
    // An outer list of structs, where each struct holds a list column and an integer
    // column. The outer list is null in rows 0 and 1 and the struct is null at child
    // index 3, so some child values are never visible.
    let inner_list = ListArray::from_iter_primitive::<Int32Type, _, _>(vec![
        Some(vec![None]), // hidden: outer list row is null
        Some(vec![]),     // hidden: outer list row is null
        Some(vec![Some(3), None]),
        Some(vec![Some(4), Some(5), None, Some(6)]), // hidden: struct entry is null
        None,
        None,
    ]);
    let inner_list_leaf = inner_list.values().clone();
    let inner_list: ArrayRef = Arc::new(inner_list);

    let integers: ArrayRef = Arc::new(Int32Array::from_iter(vec![
        Some(1), // hidden: outer list row is null
        Some(2), // hidden: outer list row is null
        None,
        Some(4), // hidden: struct entry is null
        Some(5),
        None,
    ]));
    let integers_leaf = integers.clone();

    let inner_list_field = Arc::new(Field::new("list", inner_list.data_type().clone(), true));
    let integers_field = Arc::new(Field::new("integers", integers.data_type().clone(), true));

    let struct_validity = Buffer::from([0b00110111]);
    let struct_array: ArrayRef = Arc::new(StructArray::from((
        vec![(inner_list_field, inner_list), (integers_field, integers)],
        struct_validity,
    )));

    let outer_offsets = Buffer::from_iter([0_i32, 0, 2, 2, 3, 5, 5]);
    let outer_validity = Buffer::from([0b00111100]);

    let outer_type = DataType::List(Arc::new(Field::new(
        "struct",
        struct_array.data_type().clone(),
        true,
    )));

    let outer_data = ArrayDataBuilder::new(outer_type.clone())
        .len(6)
        .null_bit_buffer(Some(outer_validity))
        .add_buffer(outer_offsets)
        .add_child_data(struct_array.into_data())
        .build()
        .unwrap();

    let outer = make_array(outer_data);
    let outer_field = Field::new("col", outer_type, true);

    // Sanity-check the logical content of each row before computing levels.
    let rendered_want = vec![
        String::from(r#""#),
        String::from(r#""#),
        String::from(r#"[]"#),
        String::from(r#"[{list: [3, ], integers: }]"#),
        String::from(r#"[, {list: , integers: 5}]"#),
        String::from(r#"[]"#),
    ];
    let rendered: Vec<_> = (0..6)
        .map(|row| array_value_to_string(&outer, row).unwrap())
        .collect();
    assert_eq!(rendered, rendered_want);

    let computed = calculate_array_levels(&outer, &outer_field).unwrap();

    assert_eq!(computed.len(), 2);

    // Levels for the nested list leaf.
    let inner_list_nulls = inner_list_leaf.logical_nulls();
    let want_list = ArrayLevels {
        def_levels: LevelData::Materialized(vec![0, 0, 1, 6, 5, 2, 3, 1]),
        rep_levels: LevelData::Materialized(vec![0, 0, 0, 0, 2, 0, 1, 0]),
        non_null_indices: vec![1],
        max_def_level: 6,
        max_rep_level: 2,
        array: inner_list_leaf,
        logical_nulls: inner_list_nulls,
    };
    assert_eq!(computed[0], want_list);

    // Levels for the integer leaf.
    let integers_nulls = integers_leaf.logical_nulls();
    let want_integers = ArrayLevels {
        def_levels: LevelData::Materialized(vec![0, 0, 1, 3, 2, 4, 1]),
        rep_levels: LevelData::Materialized(vec![0, 0, 0, 0, 0, 1, 0]),
        non_null_indices: vec![4],
        max_def_level: 4,
        max_rep_level: 1,
        array: integers_leaf,
        logical_nulls: integers_nulls,
    };
    assert_eq!(computed[1], want_integers);
}
2225
#[test]
fn test_fixed_size_list() {
    // Rows: [1, 2], null, null, [7, 8], null. Null rows still occupy two child slots.
    let rows = [
        ([1, 2], true),
        ([3, 4], false),
        ([5, 6], false),
        ([7, 8], true),
        ([9, 10], false),
    ];
    let mut list_builder = FixedSizeListBuilder::new(Int32Builder::new(), 2);
    for (pair, valid) in rows {
        list_builder.values().append_slice(&pair);
        list_builder.append(valid);
    }
    let list = list_builder.finish();
    let leaf = list.values().clone();

    // Only rows 1..4 are written: null, null, [7, 8].
    let field = Field::new_list_field(list.data_type().clone(), true);
    let mut level_builder = levels(&field, list);
    level_builder.write(1..4);
    let computed = level_builder.finish();

    assert_eq!(computed.len(), 1);

    let leaf_nulls = leaf.logical_nulls();
    let want = ArrayLevels {
        def_levels: LevelData::Materialized(vec![0, 0, 3, 3]),
        rep_levels: LevelData::Materialized(vec![0, 0, 0, 1]),
        non_null_indices: vec![6, 7],
        max_def_level: 3,
        max_rep_level: 1,
        array: leaf,
        logical_nulls: leaf_nulls,
    };
    assert_eq!(computed[0], want);
}
2264
#[test]
fn test_fixed_size_list_of_struct() {
    /// Appends one struct entry: `a` (null when `None`) and `b` go to their field
    /// builders, then the struct slot is marked valid or null according to `valid`.
    fn push_struct(values: &mut StructBuilder, a: Option<i32>, b: i64, valid: bool) {
        values
            .field_builder::<Int32Builder>(0)
            .unwrap()
            .append_option(a);
        values
            .field_builder::<Int64Builder>(1)
            .unwrap()
            .append_value(b);
        values.append(valid);
    }

    // Schema: list: FixedSizeList<Struct<a: Int32 (nullable), b: Int64 (required)>, 2>
    let field_a = Field::new("a", DataType::Int32, true);
    let field_b = Field::new("b", DataType::Int64, false);
    let fields = Fields::from([Arc::new(field_a), Arc::new(field_b)]);
    let item_field = Field::new_list_field(DataType::Struct(fields.clone()), true);
    let list_field = Field::new(
        "list",
        DataType::FixedSizeList(Arc::new(item_field), 2),
        true,
    );

    let builder_a = Int32Builder::with_capacity(10);
    let builder_b = Int64Builder::with_capacity(10);
    let struct_builder =
        StructBuilder::new(fields, vec![Box::new(builder_a), Box::new(builder_b)]);
    let mut list_builder = FixedSizeListBuilder::new(struct_builder, 2);

    // Rows:
    //   [{a: 1, b: 2}, null]
    //   null
    //   [null, null]
    //   [{a: null, b: 3}, {a: 2, b: 4}]

    // [{a: 1, b: 2}, null]
    push_struct(list_builder.values(), Some(1), 2, true);
    push_struct(list_builder.values(), None, 0, false);
    list_builder.append(true);

    // null (the two child slots are still filled, with null structs)
    push_struct(list_builder.values(), None, 0, false);
    push_struct(list_builder.values(), None, 0, false);
    list_builder.append(false);

    // [null, null]
    push_struct(list_builder.values(), None, 0, false);
    push_struct(list_builder.values(), None, 0, false);
    list_builder.append(true);

    // [{a: null, b: 3}, {a: 2, b: 4}]
    push_struct(list_builder.values(), None, 3, true);
    push_struct(list_builder.values(), Some(2), 4, true);
    list_builder.append(true);

    let array = Arc::new(list_builder.finish());

    assert_eq!(array.values().len(), 8);
    assert_eq!(array.len(), 4);

    let struct_values = array.values().as_struct();
    let leaf_a = struct_values.column(0).clone();
    let leaf_b = struct_values.column(1).clone();

    let schema = Arc::new(Schema::new(vec![list_field]));
    let rb = RecordBatch::try_new(schema, vec![array]).unwrap();

    let computed = calculate_array_levels(rb.column(0), rb.schema().field(0)).unwrap();
    let actual_a = &computed[0];
    let actual_b = &computed[1];

    // Column `a` is nullable, so it has one more definition level than `b`.
    let leaf_a_nulls = leaf_a.logical_nulls();
    let want_a = ArrayLevels {
        def_levels: LevelData::Materialized(vec![4, 2, 0, 2, 2, 3, 4]),
        rep_levels: LevelData::Materialized(vec![0, 1, 0, 0, 1, 0, 1]),
        non_null_indices: vec![0, 7],
        max_def_level: 4,
        max_rep_level: 1,
        array: leaf_a,
        logical_nulls: leaf_a_nulls,
    };
    let leaf_b_nulls = leaf_b.logical_nulls();
    let want_b = ArrayLevels {
        def_levels: LevelData::Materialized(vec![3, 2, 0, 2, 2, 3, 3]),
        rep_levels: LevelData::Materialized(vec![0, 1, 0, 0, 1, 0, 1]),
        non_null_indices: vec![0, 6, 7],
        max_def_level: 3,
        max_rep_level: 1,
        array: leaf_b,
        logical_nulls: leaf_b_nulls,
    };

    assert_eq!(*actual_a, want_a);
    assert_eq!(*actual_b, want_b);
}
2429
#[test]
fn test_fixed_size_list_empty() {
    // A fixed-size list of width 0 with rows: [], null, [].
    let mut list_builder = FixedSizeListBuilder::new(Int32Builder::new(), 0);
    for valid in [true, false, true] {
        list_builder.append(valid);
    }
    let list = list_builder.finish();
    let leaf = list.values().clone();

    let field = Field::new_list_field(list.data_type().clone(), true);
    let mut level_builder = levels(&field, list);
    level_builder.write(0..3);
    let computed = level_builder.finish();

    assert_eq!(computed.len(), 1);

    let leaf_nulls = leaf.logical_nulls();
    let want = ArrayLevels {
        // No leaf values exist, so no index is ever reported as non-null.
        def_levels: LevelData::Materialized(vec![1, 0, 1]),
        rep_levels: LevelData::Materialized(vec![0, 0, 0]),
        non_null_indices: vec![],
        max_def_level: 3,
        max_rep_level: 1,
        array: leaf,
        logical_nulls: leaf_nulls,
    };
    assert_eq!(computed[0], want);
}
2460
/// Levels for a fixed-size list (size 2) whose elements are variable-length
/// lists of nullable `Int32`.
#[test]
fn test_fixed_size_list_of_var_lists() {
    let mut fsl = FixedSizeListBuilder::new(ListBuilder::new(Int32Builder::new()), 2);

    // Row 0: [[1, null, 3], null]
    {
        let inner = fsl.values();
        inner.append_value([Some(1), None, Some(3)]);
        inner.append_null();
    }
    fsl.append(true);

    // Row 1: [[4], []]
    {
        let inner = fsl.values();
        inner.append_value([Some(4)]);
        inner.append_value([]);
    }
    fsl.append(true);

    // Row 2: [[5, 6], [null, null]]
    {
        let inner = fsl.values();
        inner.append_value([Some(5), Some(6)]);
        inner.append_value([None, None]);
    }
    fsl.append(true);

    // Row 3: null row; the two child slots are filled with null lists.
    {
        let inner = fsl.values();
        inner.append_null();
        inner.append_null();
    }
    fsl.append(false);

    let fixed = fsl.finish();
    // The leaf column is the Int32 values array of the inner list.
    let leaf = fixed.values().as_list::<i32>().values().clone();
    let field = Field::new_list_field(fixed.data_type().clone(), true);

    let mut level_builder = levels(&field, fixed);
    level_builder.write(0..4);
    let computed = level_builder.finish();

    let expected = ArrayLevels {
        def_levels: LevelData::Materialized(vec![5, 4, 5, 2, 5, 3, 5, 5, 4, 4, 0]),
        rep_levels: LevelData::Materialized(vec![0, 2, 2, 1, 0, 1, 0, 2, 1, 2, 0]),
        non_null_indices: vec![0, 2, 3, 4, 5],
        max_def_level: 5,
        max_rep_level: 2,
        // Evaluated before `leaf` is moved into `array` below.
        logical_nulls: leaf.logical_nulls(),
        array: leaf,
    };

    assert_eq!(computed[0], expected);
}
2498
/// A dictionary row counts as null when either its key is null or the key
/// points at a null dictionary value.
#[test]
fn test_null_dictionary_values() {
    // Dictionary values: [1, null, 3, 4]
    let value_validity = NullBuffer::from(vec![true, false, true, true]);
    let dict_values = Int32Array::new(vec![1, 2, 3, 4].into(), Some(value_validity));

    // Keys: [1, null, 2, 0]; the key 54 sits in a slot masked out as null.
    let key_validity = NullBuffer::from(vec![true, false, true, true]);
    let dict_keys = Int32Array::new(vec![1, 54, 2, 0].into(), Some(key_validity));

    let dict = DictionaryArray::new(dict_keys, Arc::new(dict_values));
    let field = Field::new_list_field(dict.data_type().clone(), true);

    let mut level_builder = levels(&field, dict.clone());
    level_builder.write(0..4);
    let computed = level_builder.finish();

    let expected = ArrayLevels {
        // Row 0 references the null dictionary value, row 1 has a null key.
        def_levels: LevelData::Materialized(vec![0, 0, 1, 1]),
        rep_levels: LevelData::Absent,
        non_null_indices: vec![2, 3],
        max_def_level: 1,
        max_rep_level: 0,
        // Evaluated before `dict` is moved into `array` below.
        logical_nulls: dict.logical_nulls(),
        array: Arc::new(dict),
    };
    assert_eq!(computed[0], expected);
}
2530
/// Building levels for an array whose type differs from the field's type
/// must fail with a descriptive error.
#[test]
fn mismatched_types() {
    let int_array: ArrayRef = Arc::new(Int32Array::from_iter(0..10));
    let float_field = Field::new_list_field(DataType::Float64, false);

    let outcome = LevelInfoBuilder::try_new(&float_field, Default::default(), &int_array);
    let message = outcome.unwrap_err().to_string();

    assert_eq!(
        message,
        "Arrow: Incompatible type. Field 'item' has type Float64, array has type Int32",
    );
}
2545
2546 fn levels<T: Array + 'static>(field: &Field, array: T) -> LevelInfoBuilder {
2547 let v = Arc::new(array) as ArrayRef;
2548 LevelInfoBuilder::try_new(field, Default::default(), &v).unwrap()
2549 }
2550
/// `slice_for_chunk` on flat (non-repeated) columns, with and without
/// definition levels.
#[test]
fn test_slice_for_chunk_flat() {
    // Case 1: no level data at all; six values, every one non-null.
    let required: ArrayRef = Arc::new(Int32Array::from(vec![1, 2, 3, 4, 5, 6]));
    let required_levels = ArrayLevels {
        def_levels: LevelData::Absent,
        rep_levels: LevelData::Absent,
        non_null_indices: (0..6).collect(),
        max_def_level: 0,
        max_rep_level: 0,
        // Evaluated before `required` is moved into `array` below.
        logical_nulls: required.logical_nulls(),
        array: required,
    };
    let required_chunk = CdcChunk {
        level_offset: 0,
        num_levels: 0,
        value_offset: 2,
        num_values: 3,
    };
    let required_slice = required_levels.slice_for_chunk(&required_chunk);
    assert!(matches!(required_slice.def_levels, LevelData::Absent));
    assert!(matches!(required_slice.rep_levels, LevelData::Absent));
    // The returned indices are relative to the sliced array.
    assert_eq!(required_slice.non_null_indices, vec![0, 1, 2]);
    assert_eq!(required_slice.array.len(), 3);

    // Case 2: definition levels present, no repetition levels.
    let optional: ArrayRef = Arc::new(Int32Array::from(vec![
        Some(1),
        None,
        Some(3),
        None,
        Some(5),
        Some(6),
    ]));
    let optional_levels = ArrayLevels {
        def_levels: LevelData::Materialized(vec![1, 0, 1, 0, 1, 1]),
        rep_levels: LevelData::Absent,
        non_null_indices: vec![0, 2, 4, 5],
        max_def_level: 1,
        max_rep_level: 0,
        // Evaluated before `optional` is moved into `array` below.
        logical_nulls: optional.logical_nulls(),
        array: optional,
    };
    let optional_chunk = CdcChunk {
        level_offset: 1,
        num_levels: 3,
        value_offset: 1,
        num_values: 1,
    };
    let optional_slice = optional_levels.slice_for_chunk(&optional_chunk);
    // Levels 1..4 of the original definition levels.
    assert_eq!(
        optional_slice.def_levels,
        LevelData::Materialized(vec![0, 1, 0])
    );
    assert!(matches!(optional_slice.rep_levels, LevelData::Absent));
    assert_eq!(optional_slice.non_null_indices, vec![0]);
    assert_eq!(optional_slice.array.len(), 1);
}
2613
/// `slice_for_chunk` on hand-built levels with one repetition level, where
/// the leaf array has more slots (10) than non-null values (4).
#[test]
fn test_slice_for_chunk_nested_with_nulls() {
    let leaf: ArrayRef = Arc::new(Int32Array::from(vec![
        Some(1),
        None,
        None,
        Some(2),
        None,
        None,
        None,
        None,
        Some(4),
        Some(5),
    ]));
    let all_levels = ArrayLevels {
        def_levels: LevelData::Materialized(vec![3, 0, 3, 2, 0, 3, 3]),
        rep_levels: LevelData::Materialized(vec![0, 0, 0, 1, 0, 0, 1]),
        non_null_indices: vec![0, 3, 8, 9],
        max_def_level: 3,
        max_rep_level: 1,
        // Evaluated before `leaf` is moved into `array` below.
        logical_nulls: leaf.logical_nulls(),
        array: leaf,
    };

    // (chunk, expected non-null indices, expected sliced array length)
    let cases: [(CdcChunk, Vec<usize>, usize); 3] = [
        (
            CdcChunk {
                level_offset: 0,
                num_levels: 2,
                value_offset: 0,
                num_values: 1,
            },
            vec![0],
            1,
        ),
        (
            CdcChunk {
                level_offset: 2,
                num_levels: 3,
                value_offset: 1,
                num_values: 1,
            },
            vec![0],
            1,
        ),
        (
            CdcChunk {
                level_offset: 5,
                num_levels: 2,
                value_offset: 2,
                num_values: 2,
            },
            vec![0, 1],
            2,
        ),
    ];

    for (chunk, want_indices, want_len) in cases {
        let sliced = all_levels.slice_for_chunk(&chunk);
        assert_eq!(sliced.non_null_indices, want_indices);
        assert_eq!(sliced.array.len(), want_len);
    }
}
2687
/// A chunk that covers only null levels yields an empty value slice.
#[test]
fn test_slice_for_chunk_all_null() {
    let source: ArrayRef = Arc::new(Int32Array::from(vec![Some(1), None, None, Some(4)]));
    let source_levels = ArrayLevels {
        def_levels: LevelData::Materialized(vec![1, 0, 0, 1]),
        rep_levels: LevelData::Absent,
        non_null_indices: vec![0, 3],
        max_def_level: 1,
        max_rep_level: 0,
        // Evaluated before `source` is moved into `array` below.
        logical_nulls: source.logical_nulls(),
        array: source,
    };

    // Levels 1..3 are both null, and the chunk requests zero values.
    let null_only_chunk = CdcChunk {
        level_offset: 1,
        num_levels: 2,
        value_offset: 1,
        num_values: 0,
    };
    let sliced = source_levels.slice_for_chunk(&null_only_chunk);

    assert_eq!(sliced.def_levels, LevelData::Materialized(vec![0, 0]));
    assert!(sliced.non_null_indices.is_empty());
    assert_eq!(sliced.array.len(), 0);
}
2713
/// An entirely null list column is expected to produce `Uniform` level data
/// (one entry per row) instead of materialized vectors.
#[test]
fn test_all_null_list() {
    let element = Arc::new(Field::new_list_field(DataType::Int32, true));
    // Four rows, all null.
    let null_list = ListArray::new_null(element, 4);
    let child = null_list.values().clone();
    let list_field = Field::new("list", null_list.data_type().clone(), true);
    let list_ref: ArrayRef = Arc::new(null_list);

    let computed = calculate_array_levels(&list_ref, &list_field).unwrap();
    assert_eq!(computed.len(), 1);

    let expected = ArrayLevels {
        def_levels: LevelData::Uniform { value: 0, count: 4 },
        rep_levels: LevelData::Uniform { value: 0, count: 4 },
        non_null_indices: vec![],
        max_def_level: 3,
        max_rep_level: 1,
        // Evaluated before `child` is moved into `array` below.
        logical_nulls: child.logical_nulls(),
        array: child,
    };
    assert_eq!(&computed[0], &expected);
}
2742
/// An entirely null fixed-size list column is expected to produce `Uniform`
/// level data with one entry per row.
#[test]
fn test_all_null_fixed_size_list() {
    let element = Arc::new(Field::new_list_field(DataType::Int32, true));
    // Three rows of size 2, all null.
    let null_list = FixedSizeListArray::new_null(element, 2, 3);
    let child = null_list.values().clone();
    let list_field = Field::new("list", null_list.data_type().clone(), true);
    let list_ref: ArrayRef = Arc::new(null_list);

    let computed = calculate_array_levels(&list_ref, &list_field).unwrap();
    assert_eq!(computed.len(), 1);

    let expected = ArrayLevels {
        def_levels: LevelData::Uniform { value: 0, count: 3 },
        rep_levels: LevelData::Uniform { value: 0, count: 3 },
        non_null_indices: vec![],
        max_def_level: 3,
        max_rep_level: 1,
        // Evaluated before `child` is moved into `array` below.
        logical_nulls: child.logical_nulls(),
        array: child,
    };
    assert_eq!(&computed[0], &expected);
}
2771
/// A struct whose validity bitmap is all zeros, wrapping an all-null Int32
/// child, is expected to produce `Uniform` definition levels of 0.
#[test]
fn test_all_null_struct() {
    let child: ArrayRef = Arc::new(Int32Array::from(vec![None::<i32>; 4]));
    let child_field = Arc::new(Field::new("c", DataType::Int32, true));

    // Validity byte 0b00000000 marks every struct row as null.
    let validity = Buffer::from([0b00000000]);
    let parent = StructArray::from((vec![(child_field, Arc::clone(&child))], validity));
    let parent_field = Field::new("a", parent.data_type().clone(), true);
    let parent_ref: ArrayRef = Arc::new(parent);

    let computed = calculate_array_levels(&parent_ref, &parent_field).unwrap();
    assert_eq!(computed.len(), 1);

    let expected = ArrayLevels {
        def_levels: LevelData::Uniform { value: 0, count: 4 },
        rep_levels: LevelData::Absent,
        non_null_indices: vec![],
        max_def_level: 2,
        max_rep_level: 0,
        array: child,
        logical_nulls: Some(NullBuffer::new_null(4)),
    };
    assert_eq!(&computed[0], &expected);
}
2801
/// Two levels of struct nesting, both with all-zero validity bitmaps, over an
/// all-null Int32 leaf: expects `Uniform` definition levels of 0.
#[test]
fn test_all_null_nested_struct() {
    let leaf: ArrayRef = Arc::new(Int32Array::from(vec![None::<i32>; 3]));
    let leaf_field = Arc::new(Field::new("c", DataType::Int32, true));

    // Inner struct `b`: every row null.
    let inner = StructArray::from((
        vec![(leaf_field, Arc::clone(&leaf))],
        Buffer::from([0b00000000]),
    ));
    let inner_field = Arc::new(Field::new("b", inner.data_type().clone(), true));

    // Outer struct `a`: every row null.
    let inner_ref: ArrayRef = Arc::new(inner);
    let outer = StructArray::from((
        vec![(inner_field, inner_ref)],
        Buffer::from([0b00000000]),
    ));
    let outer_field = Field::new("a", outer.data_type().clone(), true);
    let outer_ref: ArrayRef = Arc::new(outer);

    let computed = calculate_array_levels(&outer_ref, &outer_field).unwrap();
    assert_eq!(computed.len(), 1);

    let expected = ArrayLevels {
        def_levels: LevelData::Uniform { value: 0, count: 3 },
        rep_levels: LevelData::Absent,
        non_null_indices: vec![],
        max_def_level: 3,
        max_rep_level: 0,
        array: leaf,
        logical_nulls: Some(NullBuffer::new_null(3)),
    };
    assert_eq!(&computed[0], &expected);
}
2835
/// An all-null struct with two Int32 children: each child leaf is expected to
/// get its own `Uniform` definition levels of 0.
#[test]
fn test_all_null_struct_multiple_children() {
    let first: ArrayRef = Arc::new(Int32Array::from(vec![None::<i32>; 2]));
    let second: ArrayRef = Arc::new(Int32Array::from(vec![None::<i32>; 2]));
    let first_field = Arc::new(Field::new("c1", DataType::Int32, true));
    let second_field = Arc::new(Field::new("c2", DataType::Int32, true));

    // Validity byte 0b00000000 marks every struct row as null.
    let children = vec![
        (first_field, Arc::clone(&first)),
        (second_field, Arc::clone(&second)),
    ];
    let parent = StructArray::from((children, Buffer::from([0b00000000])));
    let parent_field = Field::new("a", parent.data_type().clone(), true);
    let parent_ref: ArrayRef = Arc::new(parent);

    let computed = calculate_array_levels(&parent_ref, &parent_field).unwrap();
    // One entry per leaf; this also guarantees the zip below visits both.
    assert_eq!(computed.len(), 2);

    for (i, (actual, leaf)) in computed.iter().zip([first, second]).enumerate() {
        let expected = ArrayLevels {
            def_levels: LevelData::Uniform { value: 0, count: 2 },
            rep_levels: LevelData::Absent,
            non_null_indices: vec![],
            max_def_level: 2,
            max_rep_level: 0,
            array: leaf,
            logical_nulls: Some(NullBuffer::new_null(2)),
        };
        assert_eq!(actual, &expected, "leaf {i} mismatch");
    }
}
2870}