1use crate::column::chunker::CdcChunk;
44use crate::errors::{ParquetError, Result};
45use arrow_array::cast::AsArray;
46use arrow_array::{Array, ArrayRef, OffsetSizeTrait};
47use arrow_buffer::bit_iterator::BitIndexIterator;
48use arrow_buffer::{NullBuffer, OffsetBuffer, ScalarBuffer};
49use arrow_schema::{DataType, Field};
50use std::ops::Range;
51use std::sync::Arc;
52
53pub(crate) fn calculate_array_levels(array: &ArrayRef, field: &Field) -> Result<Vec<ArrayLevels>> {
56 let mut builder = LevelInfoBuilder::try_new(field, Default::default(), array)?;
57 builder.write(0..array.len());
58 Ok(builder.finish())
59}
60
61fn is_leaf(data_type: &DataType) -> bool {
64 matches!(
65 data_type,
66 DataType::Null
67 | DataType::Boolean
68 | DataType::Int8
69 | DataType::Int16
70 | DataType::Int32
71 | DataType::Int64
72 | DataType::UInt8
73 | DataType::UInt16
74 | DataType::UInt32
75 | DataType::UInt64
76 | DataType::Float16
77 | DataType::Float32
78 | DataType::Float64
79 | DataType::Utf8
80 | DataType::Utf8View
81 | DataType::LargeUtf8
82 | DataType::Timestamp(_, _)
83 | DataType::Date32
84 | DataType::Date64
85 | DataType::Time32(_)
86 | DataType::Time64(_)
87 | DataType::Duration(_)
88 | DataType::Interval(_)
89 | DataType::Binary
90 | DataType::LargeBinary
91 | DataType::BinaryView
92 | DataType::Decimal32(_, _)
93 | DataType::Decimal64(_, _)
94 | DataType::Decimal128(_, _)
95 | DataType::Decimal256(_, _)
96 | DataType::FixedSizeBinary(_)
97 )
98}
99
100#[derive(Debug, Default, Clone, Copy)]
102struct LevelContext {
103 rep_level: i16,
105 def_level: i16,
107}
108
109#[derive(Debug)]
111enum LevelInfoBuilder {
112 Primitive(ArrayLevels),
114 List(
116 Box<LevelInfoBuilder>, LevelContext, OffsetBuffer<i32>, Option<NullBuffer>, ),
121 LargeList(
123 Box<LevelInfoBuilder>, LevelContext, OffsetBuffer<i64>, Option<NullBuffer>, ),
128 FixedSizeList(
130 Box<LevelInfoBuilder>, LevelContext, usize, Option<NullBuffer>, ),
135 ListView(
137 Box<LevelInfoBuilder>, LevelContext, ScalarBuffer<i32>, ScalarBuffer<i32>, Option<NullBuffer>, ),
143 LargeListView(
145 Box<LevelInfoBuilder>, LevelContext, ScalarBuffer<i64>, ScalarBuffer<i64>, Option<NullBuffer>, ),
151 Struct(Vec<LevelInfoBuilder>, LevelContext, Option<NullBuffer>),
153}
154
155impl LevelInfoBuilder {
156 fn try_new(field: &Field, parent_ctx: LevelContext, array: &ArrayRef) -> Result<Self> {
158 if !Self::types_compatible(field.data_type(), array.data_type()) {
159 return Err(arrow_err!(format!(
160 "Incompatible type. Field '{}' has type {}, array has type {}",
161 field.name(),
162 field.data_type(),
163 array.data_type(),
164 )));
165 }
166
167 let is_nullable = field.is_nullable();
168
169 match array.data_type() {
170 d if is_leaf(d) => {
171 let levels = ArrayLevels::new(parent_ctx, is_nullable, array.clone());
172 Ok(Self::Primitive(levels))
173 }
174 DataType::Dictionary(_, v) if is_leaf(v.as_ref()) => {
175 let levels = ArrayLevels::new(parent_ctx, is_nullable, array.clone());
176 Ok(Self::Primitive(levels))
177 }
178 DataType::Struct(children) => {
179 let array = array.as_struct();
180 let def_level = match is_nullable {
181 true => parent_ctx.def_level + 1,
182 false => parent_ctx.def_level,
183 };
184
185 let ctx = LevelContext {
186 rep_level: parent_ctx.rep_level,
187 def_level,
188 };
189
190 let children = children
191 .iter()
192 .zip(array.columns())
193 .map(|(f, a)| Self::try_new(f, ctx, a))
194 .collect::<Result<_>>()?;
195
196 Ok(Self::Struct(children, ctx, array.nulls().cloned()))
197 }
198 DataType::List(child)
199 | DataType::LargeList(child)
200 | DataType::Map(child, _)
201 | DataType::FixedSizeList(child, _)
202 | DataType::ListView(child)
203 | DataType::LargeListView(child) => {
204 let def_level = match is_nullable {
205 true => parent_ctx.def_level + 2,
206 false => parent_ctx.def_level + 1,
207 };
208
209 let ctx = LevelContext {
210 rep_level: parent_ctx.rep_level + 1,
211 def_level,
212 };
213
214 Ok(match field.data_type() {
215 DataType::List(_) => {
216 let list = array.as_list();
217 let child = Self::try_new(child.as_ref(), ctx, list.values())?;
218 let offsets = list.offsets().clone();
219 Self::List(Box::new(child), ctx, offsets, list.nulls().cloned())
220 }
221 DataType::LargeList(_) => {
222 let list = array.as_list();
223 let child = Self::try_new(child.as_ref(), ctx, list.values())?;
224 let offsets = list.offsets().clone();
225 let nulls = list.nulls().cloned();
226 Self::LargeList(Box::new(child), ctx, offsets, nulls)
227 }
228 DataType::Map(_, _) => {
229 let map = array.as_map();
230 let entries = Arc::new(map.entries().clone()) as ArrayRef;
231 let child = Self::try_new(child.as_ref(), ctx, &entries)?;
232 let offsets = map.offsets().clone();
233 Self::List(Box::new(child), ctx, offsets, map.nulls().cloned())
234 }
235 DataType::FixedSizeList(_, size) => {
236 let list = array.as_fixed_size_list();
237 let child = Self::try_new(child.as_ref(), ctx, list.values())?;
238 let nulls = list.nulls().cloned();
239 Self::FixedSizeList(Box::new(child), ctx, *size as _, nulls)
240 }
241 DataType::ListView(_) => {
242 let list = array.as_list_view();
243 let child = Self::try_new(child.as_ref(), ctx, list.values())?;
244 let offsets = list.offsets().clone();
245 let sizes = list.sizes().clone();
246 let nulls = list.nulls().cloned();
247 Self::ListView(Box::new(child), ctx, offsets, sizes, nulls)
248 }
249 DataType::LargeListView(_) => {
250 let list = array.as_list_view();
251 let child = Self::try_new(child.as_ref(), ctx, list.values())?;
252 let offsets = list.offsets().clone();
253 let sizes = list.sizes().clone();
254 let nulls = list.nulls().cloned();
255 Self::LargeListView(Box::new(child), ctx, offsets, sizes, nulls)
256 }
257 _ => unreachable!(),
258 })
259 }
260 d => Err(nyi_err!("Datatype {} is not yet supported", d)),
261 }
262 }
263
264 fn finish(self) -> Vec<ArrayLevels> {
267 match self {
268 LevelInfoBuilder::Primitive(v) => vec![v],
269 LevelInfoBuilder::List(v, _, _, _)
270 | LevelInfoBuilder::LargeList(v, _, _, _)
271 | LevelInfoBuilder::FixedSizeList(v, _, _, _)
272 | LevelInfoBuilder::ListView(v, _, _, _, _)
273 | LevelInfoBuilder::LargeListView(v, _, _, _, _) => v.finish(),
274 LevelInfoBuilder::Struct(v, _, _) => v.into_iter().flat_map(|l| l.finish()).collect(),
275 }
276 }
277
278 fn write(&mut self, range: Range<usize>) {
280 match self {
281 LevelInfoBuilder::Primitive(info) => Self::write_leaf(info, range),
282 LevelInfoBuilder::List(child, ctx, offsets, nulls) => {
283 Self::write_list(child, ctx, offsets, nulls.as_ref(), range)
284 }
285 LevelInfoBuilder::LargeList(child, ctx, offsets, nulls) => {
286 Self::write_list(child, ctx, offsets, nulls.as_ref(), range)
287 }
288 LevelInfoBuilder::FixedSizeList(child, ctx, size, nulls) => {
289 Self::write_fixed_size_list(child, ctx, *size, nulls.as_ref(), range)
290 }
291 LevelInfoBuilder::ListView(child, ctx, offsets, sizes, nulls) => {
292 Self::write_list_view(child, ctx, offsets, sizes, nulls.as_ref(), range)
293 }
294 LevelInfoBuilder::LargeListView(child, ctx, offsets, sizes, nulls) => {
295 Self::write_list_view(child, ctx, offsets, sizes, nulls.as_ref(), range)
296 }
297 LevelInfoBuilder::Struct(children, ctx, nulls) => {
298 Self::write_struct(children, ctx, nulls.as_ref(), range)
299 }
300 }
301 }
302
303 fn write_list<O: OffsetSizeTrait>(
307 child: &mut LevelInfoBuilder,
308 ctx: &LevelContext,
309 offsets: &[O],
310 nulls: Option<&NullBuffer>,
311 range: Range<usize>,
312 ) {
313 let offsets = &offsets[range.start..range.end + 1];
314
315 let write_non_null_slice =
316 |child: &mut LevelInfoBuilder, start_idx: usize, end_idx: usize| {
317 child.write(start_idx..end_idx);
318 child.visit_leaves(|leaf| {
319 let rep_levels = leaf.rep_levels.as_mut().unwrap();
320 let mut rev = rep_levels.iter_mut().rev();
321 let mut remaining = end_idx - start_idx;
322
323 loop {
324 let next = rev.next().unwrap();
325 if *next > ctx.rep_level {
326 continue;
328 }
329
330 remaining -= 1;
331 if remaining == 0 {
332 *next = ctx.rep_level - 1;
333 break;
334 }
335 }
336 })
337 };
338
339 let write_null_run = |child: &mut LevelInfoBuilder, count: usize| {
340 if count > 0 {
341 child.visit_leaves(|leaf| {
342 leaf.rep_levels
343 .as_mut()
344 .unwrap()
345 .extend(std::iter::repeat_n(ctx.rep_level - 1, count));
346 leaf.def_levels
347 .as_mut()
348 .unwrap()
349 .extend(std::iter::repeat_n(ctx.def_level - 2, count));
350 });
351 }
352 };
353
354 let write_empty_run = |child: &mut LevelInfoBuilder, count: usize| {
355 if count > 0 {
356 child.visit_leaves(|leaf| {
357 leaf.rep_levels
358 .as_mut()
359 .unwrap()
360 .extend(std::iter::repeat_n(ctx.rep_level - 1, count));
361 leaf.def_levels
362 .as_mut()
363 .unwrap()
364 .extend(std::iter::repeat_n(ctx.def_level - 1, count));
365 });
366 }
367 };
368
369 match nulls {
370 Some(nulls) => {
371 let null_offset = range.start;
372 let mut pending_nulls: usize = 0;
373 let mut pending_empties: usize = 0;
374
375 for (idx, w) in offsets.windows(2).enumerate() {
377 let is_valid = nulls.is_valid(idx + null_offset);
378 let start_idx = w[0].as_usize();
379 let end_idx = w[1].as_usize();
380
381 if !is_valid {
382 write_empty_run(child, pending_empties);
383 pending_empties = 0;
384 pending_nulls += 1;
385 } else if start_idx == end_idx {
386 write_null_run(child, pending_nulls);
387 pending_nulls = 0;
388 pending_empties += 1;
389 } else {
390 write_null_run(child, pending_nulls);
391 pending_nulls = 0;
392 write_empty_run(child, pending_empties);
393 pending_empties = 0;
394 write_non_null_slice(child, start_idx, end_idx);
395 }
396 }
397 write_null_run(child, pending_nulls);
398 write_empty_run(child, pending_empties);
399 }
400 None => {
401 let mut pending_empties: usize = 0;
402 for w in offsets.windows(2) {
403 let start_idx = w[0].as_usize();
404 let end_idx = w[1].as_usize();
405 if start_idx == end_idx {
406 pending_empties += 1;
407 } else {
408 write_empty_run(child, pending_empties);
409 pending_empties = 0;
410 write_non_null_slice(child, start_idx, end_idx);
411 }
412 }
413 write_empty_run(child, pending_empties);
414 }
415 }
416 }
417
418 fn write_list_view<O: OffsetSizeTrait>(
420 child: &mut LevelInfoBuilder,
421 ctx: &LevelContext,
422 offsets: &[O],
423 sizes: &[O],
424 nulls: Option<&NullBuffer>,
425 range: Range<usize>,
426 ) {
427 let offsets = &offsets[range.start..range.end];
428 let sizes = &sizes[range.start..range.end];
429
430 let write_non_null_slice =
431 |child: &mut LevelInfoBuilder, start_idx: usize, end_idx: usize| {
432 child.write(start_idx..end_idx);
433 child.visit_leaves(|leaf| {
434 let rep_levels = leaf.rep_levels.as_mut().unwrap();
435 let mut rev = rep_levels.iter_mut().rev();
436 let mut remaining = end_idx - start_idx;
437
438 loop {
439 let next = rev.next().unwrap();
440 if *next > ctx.rep_level {
441 continue;
443 }
444
445 remaining -= 1;
446 if remaining == 0 {
447 *next = ctx.rep_level - 1;
448 break;
449 }
450 }
451 })
452 };
453
454 let write_empty_slice = |child: &mut LevelInfoBuilder| {
455 child.visit_leaves(|leaf| {
456 let rep_levels = leaf.rep_levels.as_mut().unwrap();
457 rep_levels.push(ctx.rep_level - 1);
458 let def_levels = leaf.def_levels.as_mut().unwrap();
459 def_levels.push(ctx.def_level - 1);
460 })
461 };
462
463 let write_null_slice = |child: &mut LevelInfoBuilder| {
464 child.visit_leaves(|leaf| {
465 let rep_levels = leaf.rep_levels.as_mut().unwrap();
466 rep_levels.push(ctx.rep_level - 1);
467 let def_levels = leaf.def_levels.as_mut().unwrap();
468 def_levels.push(ctx.def_level - 2);
469 })
470 };
471
472 match nulls {
473 Some(nulls) => {
474 let null_offset = range.start;
475 for (idx, (offset, size)) in offsets.iter().zip(sizes.iter()).enumerate() {
477 let is_valid = nulls.is_valid(idx + null_offset);
478 let start_idx = offset.as_usize();
479 let size = size.as_usize();
480 let end_idx = start_idx + size;
481 if !is_valid {
482 write_null_slice(child)
483 } else if size == 0 {
484 write_empty_slice(child)
485 } else {
486 write_non_null_slice(child, start_idx, end_idx)
487 }
488 }
489 }
490 None => {
491 for (offset, size) in offsets.iter().zip(sizes.iter()) {
492 let start_idx = offset.as_usize();
493 let size = size.as_usize();
494 let end_idx = start_idx + size;
495 if size == 0 {
496 write_empty_slice(child)
497 } else {
498 write_non_null_slice(child, start_idx, end_idx)
499 }
500 }
501 }
502 }
503 }
504
505 fn write_struct(
507 children: &mut [LevelInfoBuilder],
508 ctx: &LevelContext,
509 nulls: Option<&NullBuffer>,
510 range: Range<usize>,
511 ) {
512 let write_null = |children: &mut [LevelInfoBuilder], range: Range<usize>| {
513 for child in children {
514 child.visit_leaves(|info| {
515 let len = range.end - range.start;
516
517 let def_levels = info.def_levels.as_mut().unwrap();
518 def_levels.extend(std::iter::repeat_n(ctx.def_level - 1, len));
519
520 if let Some(rep_levels) = info.rep_levels.as_mut() {
521 rep_levels.extend(std::iter::repeat_n(ctx.rep_level, len));
522 }
523 })
524 }
525 };
526
527 let write_non_null = |children: &mut [LevelInfoBuilder], range: Range<usize>| {
528 for child in children {
529 child.write(range.clone())
530 }
531 };
532
533 match nulls {
534 Some(validity) => {
535 let mut last_non_null_idx = None;
536 let mut last_null_idx = None;
537
538 for i in range.clone() {
540 match validity.is_valid(i) {
541 true => {
542 if let Some(last_idx) = last_null_idx.take() {
543 write_null(children, last_idx..i)
544 }
545 last_non_null_idx.get_or_insert(i);
546 }
547 false => {
548 if let Some(last_idx) = last_non_null_idx.take() {
549 write_non_null(children, last_idx..i)
550 }
551 last_null_idx.get_or_insert(i);
552 }
553 }
554 }
555
556 if let Some(last_idx) = last_null_idx.take() {
557 write_null(children, last_idx..range.end)
558 }
559
560 if let Some(last_idx) = last_non_null_idx.take() {
561 write_non_null(children, last_idx..range.end)
562 }
563 }
564 None => write_non_null(children, range),
565 }
566 }
567
568 fn write_fixed_size_list(
570 child: &mut LevelInfoBuilder,
571 ctx: &LevelContext,
572 fixed_size: usize,
573 nulls: Option<&NullBuffer>,
574 range: Range<usize>,
575 ) {
576 let write_non_null = |child: &mut LevelInfoBuilder, start_idx: usize, end_idx: usize| {
577 let values_start = start_idx * fixed_size;
578 let values_end = end_idx * fixed_size;
579 child.write(values_start..values_end);
580
581 child.visit_leaves(|leaf| {
582 let rep_levels = leaf.rep_levels.as_mut().unwrap();
583
584 let row_indices = (0..fixed_size)
585 .rev()
586 .cycle()
587 .take(values_end - values_start);
588
589 rep_levels
591 .iter_mut()
592 .rev()
593 .filter(|&&mut r| r == ctx.rep_level)
595 .zip(row_indices)
596 .for_each(|(r, idx)| {
597 if idx == 0 {
598 *r = ctx.rep_level - 1;
599 }
600 });
601 })
602 };
603
604 let write_empty = |child: &mut LevelInfoBuilder, start_idx: usize, end_idx: usize| {
606 let len = end_idx - start_idx;
607 child.visit_leaves(|leaf| {
608 let rep_levels = leaf.rep_levels.as_mut().unwrap();
609 rep_levels.extend(std::iter::repeat_n(ctx.rep_level - 1, len));
610 let def_levels = leaf.def_levels.as_mut().unwrap();
611 def_levels.extend(std::iter::repeat_n(ctx.def_level - 1, len));
612 })
613 };
614
615 let write_rows = |child: &mut LevelInfoBuilder, start_idx: usize, end_idx: usize| {
616 if fixed_size > 0 {
617 write_non_null(child, start_idx, end_idx)
618 } else {
619 write_empty(child, start_idx, end_idx)
620 }
621 };
622
623 match nulls {
624 Some(nulls) => {
625 let mut start_idx = None;
626 for idx in range.clone() {
627 if nulls.is_valid(idx) {
628 start_idx.get_or_insert(idx);
630 } else {
631 if let Some(start) = start_idx.take() {
633 write_rows(child, start, idx);
634 }
635 child.visit_leaves(|leaf| {
637 let rep_levels = leaf.rep_levels.as_mut().unwrap();
638 rep_levels.push(ctx.rep_level - 1);
639 let def_levels = leaf.def_levels.as_mut().unwrap();
640 def_levels.push(ctx.def_level - 2);
641 })
642 }
643 }
644 if let Some(start) = start_idx.take() {
646 write_rows(child, start, range.end);
647 }
648 }
649 None => write_rows(child, range.start, range.end),
651 }
652 }
653
654 fn write_leaf(info: &mut ArrayLevels, range: Range<usize>) {
656 let len = range.end - range.start;
657
658 match &mut info.def_levels {
659 Some(def_levels) => {
660 def_levels.reserve(len);
661 info.non_null_indices.reserve(len);
662
663 match &info.logical_nulls {
664 Some(nulls) => {
665 assert!(range.end <= nulls.len());
666 let nulls = nulls.inner();
667 def_levels.extend(range.clone().map(|i| {
668 let valid = unsafe { nulls.value_unchecked(i) };
670 info.max_def_level - (!valid as i16)
671 }));
672 info.non_null_indices.extend(
673 BitIndexIterator::new(nulls.inner(), nulls.offset() + range.start, len)
674 .map(|i| i + range.start),
675 );
676 }
677 None => {
678 let iter = std::iter::repeat_n(info.max_def_level, len);
679 def_levels.extend(iter);
680 info.non_null_indices.extend(range);
681 }
682 }
683 }
684 None => info.non_null_indices.extend(range),
685 }
686
687 if let Some(rep_levels) = &mut info.rep_levels {
688 rep_levels.extend(std::iter::repeat_n(info.max_rep_level, len))
689 }
690 }
691
692 fn visit_leaves(&mut self, visit: impl Fn(&mut ArrayLevels) + Copy) {
694 match self {
695 LevelInfoBuilder::Primitive(info) => visit(info),
696 LevelInfoBuilder::List(c, _, _, _)
697 | LevelInfoBuilder::LargeList(c, _, _, _)
698 | LevelInfoBuilder::FixedSizeList(c, _, _, _)
699 | LevelInfoBuilder::ListView(c, _, _, _, _)
700 | LevelInfoBuilder::LargeListView(c, _, _, _, _) => c.visit_leaves(visit),
701 LevelInfoBuilder::Struct(children, _, _) => {
702 for c in children {
703 c.visit_leaves(visit)
704 }
705 }
706 }
707 }
708
709 fn types_compatible(a: &DataType, b: &DataType) -> bool {
715 if a.equals_datatype(b) {
717 return true;
718 }
719
720 let (a, b) = match (a, b) {
722 (DataType::Dictionary(_, va), DataType::Dictionary(_, vb)) => {
723 (va.as_ref(), vb.as_ref())
724 }
725 (DataType::Dictionary(_, v), b) => (v.as_ref(), b),
726 (a, DataType::Dictionary(_, v)) => (a, v.as_ref()),
727 _ => (a, b),
728 };
729
730 if a == b {
733 return true;
734 }
735
736 match a {
739 DataType::Utf8 => matches!(b, DataType::LargeUtf8 | DataType::Utf8View),
741 DataType::Utf8View => matches!(b, DataType::LargeUtf8 | DataType::Utf8),
742 DataType::LargeUtf8 => matches!(b, DataType::Utf8 | DataType::Utf8View),
743
744 DataType::Binary => matches!(b, DataType::LargeBinary | DataType::BinaryView),
746 DataType::BinaryView => matches!(b, DataType::LargeBinary | DataType::Binary),
747 DataType::LargeBinary => matches!(b, DataType::Binary | DataType::BinaryView),
748
749 _ => false,
751 }
752 }
753}
754
755#[derive(Debug, Clone)]
758pub(crate) struct ArrayLevels {
759 def_levels: Option<Vec<i16>>,
763
764 rep_levels: Option<Vec<i16>>,
768
769 non_null_indices: Vec<usize>,
772
773 max_def_level: i16,
775
776 max_rep_level: i16,
778
779 array: ArrayRef,
781
782 logical_nulls: Option<NullBuffer>,
784}
785
786impl PartialEq for ArrayLevels {
787 fn eq(&self, other: &Self) -> bool {
788 self.def_levels == other.def_levels
789 && self.rep_levels == other.rep_levels
790 && self.non_null_indices == other.non_null_indices
791 && self.max_def_level == other.max_def_level
792 && self.max_rep_level == other.max_rep_level
793 && self.array.as_ref() == other.array.as_ref()
794 && self.logical_nulls.as_ref() == other.logical_nulls.as_ref()
795 }
796}
797impl Eq for ArrayLevels {}
798
799impl ArrayLevels {
800 fn new(ctx: LevelContext, is_nullable: bool, array: ArrayRef) -> Self {
801 let max_rep_level = ctx.rep_level;
802 let max_def_level = match is_nullable {
803 true => ctx.def_level + 1,
804 false => ctx.def_level,
805 };
806
807 let logical_nulls = array.logical_nulls();
808
809 Self {
810 def_levels: (max_def_level != 0).then(Vec::new),
811 rep_levels: (max_rep_level != 0).then(Vec::new),
812 non_null_indices: vec![],
813 max_def_level,
814 max_rep_level,
815 array,
816 logical_nulls,
817 }
818 }
819
820 pub fn array(&self) -> &ArrayRef {
821 &self.array
822 }
823
824 pub fn def_levels(&self) -> Option<&[i16]> {
825 self.def_levels.as_deref()
826 }
827
828 pub fn rep_levels(&self) -> Option<&[i16]> {
829 self.rep_levels.as_deref()
830 }
831
832 pub fn non_null_indices(&self) -> &[usize] {
833 &self.non_null_indices
834 }
835
836 pub(crate) fn slice_for_chunk(&self, chunk: &CdcChunk) -> Self {
842 let def_levels = self.def_levels.as_ref().map(|levels| {
843 levels[chunk.level_offset..chunk.level_offset + chunk.num_levels].to_vec()
844 });
845 let rep_levels = self.rep_levels.as_ref().map(|levels| {
846 levels[chunk.level_offset..chunk.level_offset + chunk.num_levels].to_vec()
847 });
848
849 let nni = &self.non_null_indices[chunk.value_offset..chunk.value_offset + chunk.num_values];
851 let start = nni.first().copied().unwrap_or(0);
856 let end = nni.last().map_or(0, |&i| i + 1);
857 let non_null_indices = nni.iter().map(|&idx| idx - start).collect();
859 let array = self.array.slice(start, end - start);
861 let logical_nulls = array.logical_nulls();
862
863 Self {
864 def_levels,
865 rep_levels,
866 non_null_indices,
867 max_def_level: self.max_def_level,
868 max_rep_level: self.max_rep_level,
869 array,
870 logical_nulls,
871 }
872 }
873}
874
875#[cfg(test)]
876mod tests {
877 use super::*;
878 use crate::column::chunker::CdcChunk;
879
880 use arrow_array::builder::*;
881 use arrow_array::types::Int32Type;
882 use arrow_array::*;
883 use arrow_buffer::{Buffer, ToByteSlice};
884 use arrow_cast::display::array_value_to_string;
885 use arrow_data::{ArrayData, ArrayDataBuilder};
886 use arrow_schema::{Fields, Schema};
887
888 #[test]
889 fn test_calculate_array_levels_twitter_example() {
890 let leaf_type = Field::new_list_field(DataType::Int32, false);
894 let inner_type = DataType::List(Arc::new(leaf_type));
895 let inner_field = Field::new("l2", inner_type.clone(), false);
896 let outer_type = DataType::List(Arc::new(inner_field));
897 let outer_field = Field::new("l1", outer_type.clone(), false);
898
899 let primitives = Int32Array::from_iter(0..10);
900
901 let offsets = Buffer::from_iter([0_i32, 3, 7, 8, 10]);
903 let inner_list = ArrayDataBuilder::new(inner_type)
904 .len(4)
905 .add_buffer(offsets)
906 .add_child_data(primitives.to_data())
907 .build()
908 .unwrap();
909
910 let offsets = Buffer::from_iter([0_i32, 2, 4]);
911 let outer_list = ArrayDataBuilder::new(outer_type)
912 .len(2)
913 .add_buffer(offsets)
914 .add_child_data(inner_list)
915 .build()
916 .unwrap();
917 let outer_list = make_array(outer_list);
918
919 let levels = calculate_array_levels(&outer_list, &outer_field).unwrap();
920 assert_eq!(levels.len(), 1);
921
922 let expected = ArrayLevels {
923 def_levels: Some(vec![2; 10]),
924 rep_levels: Some(vec![0, 2, 2, 1, 2, 2, 2, 0, 1, 2]),
925 non_null_indices: vec![0, 1, 2, 3, 4, 5, 6, 7, 8, 9],
926 max_def_level: 2,
927 max_rep_level: 2,
928 array: Arc::new(primitives),
929 logical_nulls: None,
930 };
931 assert_eq!(&levels[0], &expected);
932 }
933
934 #[test]
935 fn test_calculate_one_level_1() {
936 let array = Arc::new(Int32Array::from_iter(0..10)) as ArrayRef;
938 let field = Field::new_list_field(DataType::Int32, false);
939
940 let levels = calculate_array_levels(&array, &field).unwrap();
941 assert_eq!(levels.len(), 1);
942
943 let expected_levels = ArrayLevels {
944 def_levels: None,
945 rep_levels: None,
946 non_null_indices: (0..10).collect(),
947 max_def_level: 0,
948 max_rep_level: 0,
949 array,
950 logical_nulls: None,
951 };
952 assert_eq!(&levels[0], &expected_levels);
953 }
954
955 #[test]
956 fn test_calculate_one_level_2() {
957 let array = Arc::new(Int32Array::from_iter([
959 Some(0),
960 None,
961 Some(0),
962 Some(0),
963 None,
964 ])) as ArrayRef;
965 let field = Field::new_list_field(DataType::Int32, true);
966
967 let levels = calculate_array_levels(&array, &field).unwrap();
968 assert_eq!(levels.len(), 1);
969
970 let logical_nulls = array.logical_nulls();
971 let expected_levels = ArrayLevels {
972 def_levels: Some(vec![1, 0, 1, 1, 0]),
973 rep_levels: None,
974 non_null_indices: vec![0, 2, 3],
975 max_def_level: 1,
976 max_rep_level: 0,
977 array,
978 logical_nulls,
979 };
980 assert_eq!(&levels[0], &expected_levels);
981 }
982
983 #[test]
984 fn test_calculate_array_levels_1() {
985 let leaf_field = Field::new_list_field(DataType::Int32, false);
986 let list_type = DataType::List(Arc::new(leaf_field));
987
988 let leaf_array = Int32Array::from_iter(0..5);
992 let offsets = Buffer::from_iter(0_i32..6);
994 let list = ArrayDataBuilder::new(list_type.clone())
995 .len(5)
996 .add_buffer(offsets)
997 .add_child_data(leaf_array.to_data())
998 .build()
999 .unwrap();
1000 let list = make_array(list);
1001
1002 let list_field = Field::new("list", list_type.clone(), false);
1003 let levels = calculate_array_levels(&list, &list_field).unwrap();
1004 assert_eq!(levels.len(), 1);
1005
1006 let expected_levels = ArrayLevels {
1007 def_levels: Some(vec![1; 5]),
1008 rep_levels: Some(vec![0; 5]),
1009 non_null_indices: (0..5).collect(),
1010 max_def_level: 1,
1011 max_rep_level: 1,
1012 array: Arc::new(leaf_array),
1013 logical_nulls: None,
1014 };
1015 assert_eq!(&levels[0], &expected_levels);
1016
1017 let leaf_array = Int32Array::from_iter([0, 0, 2, 2, 3, 3, 3, 3, 4, 4, 4]);
1026 let offsets = Buffer::from_iter([0_i32, 2, 2, 4, 8, 11]);
1027 let list = ArrayDataBuilder::new(list_type.clone())
1028 .len(5)
1029 .add_buffer(offsets)
1030 .add_child_data(leaf_array.to_data())
1031 .null_bit_buffer(Some(Buffer::from([0b00011101])))
1032 .build()
1033 .unwrap();
1034 let list = make_array(list);
1035
1036 let list_field = Field::new("list", list_type, true);
1037 let levels = calculate_array_levels(&list, &list_field).unwrap();
1038 assert_eq!(levels.len(), 1);
1039
1040 let expected_levels = ArrayLevels {
1041 def_levels: Some(vec![2, 2, 0, 2, 2, 2, 2, 2, 2, 2, 2, 2]),
1042 rep_levels: Some(vec![0, 1, 0, 0, 1, 0, 1, 1, 1, 0, 1, 1]),
1043 non_null_indices: (0..11).collect(),
1044 max_def_level: 2,
1045 max_rep_level: 1,
1046 array: Arc::new(leaf_array),
1047 logical_nulls: None,
1048 };
1049 assert_eq!(&levels[0], &expected_levels);
1050 }
1051
1052 #[test]
1053 fn test_calculate_array_levels_2() {
1054 let leaf = Int32Array::from_iter(0..11);
1068 let leaf_field = Field::new("leaf", DataType::Int32, false);
1069
1070 let list_type = DataType::List(Arc::new(leaf_field));
1071 let list = ArrayData::builder(list_type.clone())
1072 .len(5)
1073 .add_child_data(leaf.to_data())
1074 .add_buffer(Buffer::from_iter([0_i32, 2, 2, 4, 8, 11]))
1075 .build()
1076 .unwrap();
1077
1078 let list = make_array(list);
1079 let list_field = Arc::new(Field::new("list", list_type, true));
1080
1081 let struct_array =
1082 StructArray::from((vec![(list_field, list)], Buffer::from([0b00011010])));
1083 let array = Arc::new(struct_array) as ArrayRef;
1084
1085 let struct_field = Field::new("struct", array.data_type().clone(), true);
1086
1087 let levels = calculate_array_levels(&array, &struct_field).unwrap();
1088 assert_eq!(levels.len(), 1);
1089
1090 let expected_levels = ArrayLevels {
1091 def_levels: Some(vec![0, 2, 0, 3, 3, 3, 3, 3, 3, 3]),
1092 rep_levels: Some(vec![0, 0, 0, 0, 1, 1, 1, 0, 1, 1]),
1093 non_null_indices: (4..11).collect(),
1094 max_def_level: 3,
1095 max_rep_level: 1,
1096 array: Arc::new(leaf),
1097 logical_nulls: None,
1098 };
1099
1100 assert_eq!(&levels[0], &expected_levels);
1101
1102 let leaf = Int32Array::from_iter(100..122);
1111 let leaf_field = Field::new("leaf", DataType::Int32, true);
1112
1113 let l1_type = DataType::List(Arc::new(leaf_field));
1114 let offsets = Buffer::from_iter([0_i32, 2, 4, 6, 8, 10, 12, 14, 16, 18, 20, 22]);
1115 let l1 = ArrayData::builder(l1_type.clone())
1116 .len(11)
1117 .add_child_data(leaf.to_data())
1118 .add_buffer(offsets)
1119 .build()
1120 .unwrap();
1121
1122 let l1_field = Field::new("l1", l1_type, true);
1123 let l2_type = DataType::List(Arc::new(l1_field));
1124 let l2 = ArrayData::builder(l2_type)
1125 .len(5)
1126 .add_child_data(l1)
1127 .add_buffer(Buffer::from_iter([0, 2, 2, 4, 8, 11]))
1128 .build()
1129 .unwrap();
1130
1131 let l2 = make_array(l2);
1132 let l2_field = Field::new("l2", l2.data_type().clone(), true);
1133
1134 let levels = calculate_array_levels(&l2, &l2_field).unwrap();
1135 assert_eq!(levels.len(), 1);
1136
1137 let expected_levels = ArrayLevels {
1138 def_levels: Some(vec![
1139 5, 5, 5, 5, 1, 5, 5, 5, 5, 5, 5, 5, 5, 5, 5, 5, 5, 5, 5, 5, 5, 5, 5,
1140 ]),
1141 rep_levels: Some(vec![
1142 0, 2, 1, 2, 0, 0, 2, 1, 2, 0, 2, 1, 2, 1, 2, 1, 2, 0, 2, 1, 2, 1, 2,
1143 ]),
1144 non_null_indices: (0..22).collect(),
1145 max_def_level: 5,
1146 max_rep_level: 2,
1147 array: Arc::new(leaf),
1148 logical_nulls: None,
1149 };
1150
1151 assert_eq!(&levels[0], &expected_levels);
1152 }
1153
1154 #[test]
1155 fn test_calculate_array_levels_nested_list() {
1156 let leaf_field = Field::new("leaf", DataType::Int32, false);
1157 let list_type = DataType::List(Arc::new(leaf_field));
1158
1159 let leaf = Int32Array::from_iter([0; 4]);
1167 let list = ArrayData::builder(list_type.clone())
1168 .len(4)
1169 .add_buffer(Buffer::from_iter(0_i32..5))
1170 .add_child_data(leaf.to_data())
1171 .build()
1172 .unwrap();
1173 let list = make_array(list);
1174
1175 let list_field = Field::new("list", list_type.clone(), false);
1176 let levels = calculate_array_levels(&list, &list_field).unwrap();
1177 assert_eq!(levels.len(), 1);
1178
1179 let expected_levels = ArrayLevels {
1180 def_levels: Some(vec![1; 4]),
1181 rep_levels: Some(vec![0; 4]),
1182 non_null_indices: (0..4).collect(),
1183 max_def_level: 1,
1184 max_rep_level: 1,
1185 array: Arc::new(leaf),
1186 logical_nulls: None,
1187 };
1188 assert_eq!(&levels[0], &expected_levels);
1189
1190 let leaf = Int32Array::from_iter(0..8);
1195 let list = ArrayData::builder(list_type.clone())
1196 .len(4)
1197 .add_buffer(Buffer::from_iter([0_i32, 0, 3, 5, 7]))
1198 .null_bit_buffer(Some(Buffer::from([0b00001110])))
1199 .add_child_data(leaf.to_data())
1200 .build()
1201 .unwrap();
1202 let list = make_array(list);
1203 let list_field = Arc::new(Field::new("list", list_type, true));
1204
1205 let struct_array = StructArray::from(vec![(list_field, list)]);
1206 let array = Arc::new(struct_array) as ArrayRef;
1207
1208 let struct_field = Field::new("struct", array.data_type().clone(), true);
1209 let levels = calculate_array_levels(&array, &struct_field).unwrap();
1210 assert_eq!(levels.len(), 1);
1211
1212 let expected_levels = ArrayLevels {
1213 def_levels: Some(vec![1, 3, 3, 3, 3, 3, 3, 3]),
1214 rep_levels: Some(vec![0, 0, 1, 1, 0, 1, 0, 1]),
1215 non_null_indices: (0..7).collect(),
1216 max_def_level: 3,
1217 max_rep_level: 1,
1218 array: Arc::new(leaf),
1219 logical_nulls: None,
1220 };
1221 assert_eq!(&levels[0], &expected_levels);
1222
1223 let leaf = Int32Array::from_iter(201..216);
1231 let leaf_field = Field::new("leaf", DataType::Int32, false);
1232 let list_1_type = DataType::List(Arc::new(leaf_field));
1233 let list_1 = ArrayData::builder(list_1_type.clone())
1234 .len(7)
1235 .add_buffer(Buffer::from_iter([0_i32, 1, 3, 3, 6, 10, 10, 15]))
1236 .add_child_data(leaf.to_data())
1237 .build()
1238 .unwrap();
1239
1240 let list_1_field = Field::new("l1", list_1_type, true);
1241 let list_2_type = DataType::List(Arc::new(list_1_field));
1242 let list_2 = ArrayData::builder(list_2_type.clone())
1243 .len(4)
1244 .add_buffer(Buffer::from_iter([0_i32, 0, 3, 5, 7]))
1245 .null_bit_buffer(Some(Buffer::from([0b00001110])))
1246 .add_child_data(list_1)
1247 .build()
1248 .unwrap();
1249
1250 let list_2 = make_array(list_2);
1251 let list_2_field = Arc::new(Field::new("list_2", list_2_type, true));
1252
1253 let struct_array =
1254 StructArray::from((vec![(list_2_field, list_2)], Buffer::from([0b00001111])));
1255 let struct_field = Field::new("struct", struct_array.data_type().clone(), true);
1256
1257 let array = Arc::new(struct_array) as ArrayRef;
1258 let levels = calculate_array_levels(&array, &struct_field).unwrap();
1259 assert_eq!(levels.len(), 1);
1260
1261 let expected_levels = ArrayLevels {
1262 def_levels: Some(vec![1, 5, 5, 5, 4, 5, 5, 5, 5, 5, 5, 5, 4, 5, 5, 5, 5, 5]),
1263 rep_levels: Some(vec![0, 0, 1, 2, 1, 0, 2, 2, 1, 2, 2, 2, 0, 1, 2, 2, 2, 2]),
1264 non_null_indices: (0..15).collect(),
1265 max_def_level: 5,
1266 max_rep_level: 2,
1267 array: Arc::new(leaf),
1268 logical_nulls: None,
1269 };
1270 assert_eq!(&levels[0], &expected_levels);
1271 }
1272
1273 #[test]
1274 fn test_calculate_nested_struct_levels() {
1275 let c = Int32Array::from_iter([Some(1), None, Some(3), None, Some(5), Some(6)]);
1285 let leaf = Arc::new(c) as ArrayRef;
1286 let c_field = Arc::new(Field::new("c", DataType::Int32, true));
1287 let b = StructArray::from(((vec![(c_field, leaf.clone())]), Buffer::from([0b00110111])));
1288
1289 let b_field = Arc::new(Field::new("b", b.data_type().clone(), true));
1290 let a = StructArray::from((
1291 (vec![(b_field, Arc::new(b) as ArrayRef)]),
1292 Buffer::from([0b00101111]),
1293 ));
1294
1295 let a_field = Field::new("a", a.data_type().clone(), true);
1296 let a_array = Arc::new(a) as ArrayRef;
1297
1298 let levels = calculate_array_levels(&a_array, &a_field).unwrap();
1299 assert_eq!(levels.len(), 1);
1300
1301 let logical_nulls = leaf.logical_nulls();
1302 let expected_levels = ArrayLevels {
1303 def_levels: Some(vec![3, 2, 3, 1, 0, 3]),
1304 rep_levels: None,
1305 non_null_indices: vec![0, 2, 5],
1306 max_def_level: 3,
1307 max_rep_level: 0,
1308 array: leaf,
1309 logical_nulls,
1310 };
1311 assert_eq!(&levels[0], &expected_levels);
1312 }
1313
1314 #[test]
1315 fn list_single_column() {
1316 let a_values = Int32Array::from(vec![1, 2, 3, 4, 5, 6, 7, 8, 9, 10]);
1319 let a_value_offsets = arrow::buffer::Buffer::from_iter([0_i32, 1, 3, 3, 6, 10]);
1320 let a_list_type = DataType::List(Arc::new(Field::new_list_field(DataType::Int32, true)));
1321 let a_list_data = ArrayData::builder(a_list_type.clone())
1322 .len(5)
1323 .add_buffer(a_value_offsets)
1324 .null_bit_buffer(Some(Buffer::from([0b00011011])))
1325 .add_child_data(a_values.to_data())
1326 .build()
1327 .unwrap();
1328
1329 assert_eq!(a_list_data.null_count(), 1);
1330
1331 let a = ListArray::from(a_list_data);
1332
1333 let item_field = Field::new_list_field(a_list_type, true);
1334 let mut builder = levels(&item_field, a);
1335 builder.write(2..4);
1336 let levels = builder.finish();
1337
1338 assert_eq!(levels.len(), 1);
1339
1340 let list_level = &levels[0];
1341
1342 let expected_level = ArrayLevels {
1343 def_levels: Some(vec![0, 3, 3, 3]),
1344 rep_levels: Some(vec![0, 0, 1, 1]),
1345 non_null_indices: vec![3, 4, 5],
1346 max_def_level: 3,
1347 max_rep_level: 1,
1348 array: Arc::new(a_values),
1349 logical_nulls: None,
1350 };
1351 assert_eq!(list_level, &expected_level);
1352 }
1353
1354 #[test]
1355 fn mixed_struct_list() {
1356 let struct_field_d = Arc::new(Field::new("d", DataType::Float64, true));
1360 let struct_field_f = Arc::new(Field::new("f", DataType::Float32, true));
1361 let struct_field_g = Arc::new(Field::new(
1362 "g",
1363 DataType::List(Arc::new(Field::new("items", DataType::Int16, false))),
1364 false,
1365 ));
1366 let struct_field_e = Arc::new(Field::new(
1367 "e",
1368 DataType::Struct(vec![struct_field_f.clone(), struct_field_g.clone()].into()),
1369 true,
1370 ));
1371 let schema = Schema::new(vec![
1372 Field::new("a", DataType::Int32, false),
1373 Field::new("b", DataType::Int32, true),
1374 Field::new(
1375 "c",
1376 DataType::Struct(vec![struct_field_d.clone(), struct_field_e.clone()].into()),
1377 true, ),
1379 ]);
1380
1381 let a = Int32Array::from(vec![1, 2, 3, 4, 5]);
1383 let b = Int32Array::from(vec![Some(1), None, None, Some(4), Some(5)]);
1384 let d = Float64Array::from(vec![None, None, None, Some(1.0), None]);
1385 let f = Float32Array::from(vec![Some(0.0), None, Some(333.3), None, Some(5.25)]);
1386
1387 let g_value = Int16Array::from(vec![1, 2, 3, 4, 5, 6, 7, 8, 9, 10]);
1388
1389 let g_value_offsets = arrow::buffer::Buffer::from([0, 1, 3, 3, 6, 10].to_byte_slice());
1392
1393 let g_list_data = ArrayData::builder(struct_field_g.data_type().clone())
1395 .len(5)
1396 .add_buffer(g_value_offsets)
1397 .add_child_data(g_value.into_data())
1398 .build()
1399 .unwrap();
1400 let g = ListArray::from(g_list_data);
1401
1402 let e = StructArray::from(vec![
1403 (struct_field_f, Arc::new(f.clone()) as ArrayRef),
1404 (struct_field_g, Arc::new(g) as ArrayRef),
1405 ]);
1406
1407 let c = StructArray::from(vec![
1408 (struct_field_d, Arc::new(d.clone()) as ArrayRef),
1409 (struct_field_e, Arc::new(e) as ArrayRef),
1410 ]);
1411
1412 let batch = RecordBatch::try_new(
1414 Arc::new(schema),
1415 vec![Arc::new(a.clone()), Arc::new(b.clone()), Arc::new(c)],
1416 )
1417 .unwrap();
1418
1419 let mut levels = vec![];
1422 batch
1423 .columns()
1424 .iter()
1425 .zip(batch.schema().fields())
1426 .for_each(|(array, field)| {
1427 let mut array_levels = calculate_array_levels(array, field).unwrap();
1428 levels.append(&mut array_levels);
1429 });
1430 assert_eq!(levels.len(), 5);
1431
1432 let list_level = &levels[0];
1434
1435 let expected_level = ArrayLevels {
1436 def_levels: None,
1437 rep_levels: None,
1438 non_null_indices: vec![0, 1, 2, 3, 4],
1439 max_def_level: 0,
1440 max_rep_level: 0,
1441 array: Arc::new(a),
1442 logical_nulls: None,
1443 };
1444 assert_eq!(list_level, &expected_level);
1445
1446 let list_level = levels.get(1).unwrap();
1448
1449 let b_logical_nulls = b.logical_nulls();
1450 let expected_level = ArrayLevels {
1451 def_levels: Some(vec![1, 0, 0, 1, 1]),
1452 rep_levels: None,
1453 non_null_indices: vec![0, 3, 4],
1454 max_def_level: 1,
1455 max_rep_level: 0,
1456 array: Arc::new(b),
1457 logical_nulls: b_logical_nulls,
1458 };
1459 assert_eq!(list_level, &expected_level);
1460
1461 let list_level = levels.get(2).unwrap();
1463
1464 let d_logical_nulls = d.logical_nulls();
1465 let expected_level = ArrayLevels {
1466 def_levels: Some(vec![1, 1, 1, 2, 1]),
1467 rep_levels: None,
1468 non_null_indices: vec![3],
1469 max_def_level: 2,
1470 max_rep_level: 0,
1471 array: Arc::new(d),
1472 logical_nulls: d_logical_nulls,
1473 };
1474 assert_eq!(list_level, &expected_level);
1475
1476 let list_level = levels.get(3).unwrap();
1478
1479 let f_logical_nulls = f.logical_nulls();
1480 let expected_level = ArrayLevels {
1481 def_levels: Some(vec![3, 2, 3, 2, 3]),
1482 rep_levels: None,
1483 non_null_indices: vec![0, 2, 4],
1484 max_def_level: 3,
1485 max_rep_level: 0,
1486 array: Arc::new(f),
1487 logical_nulls: f_logical_nulls,
1488 };
1489 assert_eq!(list_level, &expected_level);
1490 }
1491
1492 #[test]
1493 fn test_null_vs_nonnull_struct() {
1494 let offset_field = Arc::new(Field::new("offset", DataType::Int32, true));
1496 let schema = Schema::new(vec![Field::new(
1497 "some_nested_object",
1498 DataType::Struct(vec![offset_field.clone()].into()),
1499 false,
1500 )]);
1501
1502 let offset = Int32Array::from(vec![1, 2, 3, 4, 5]);
1504
1505 let some_nested_object =
1506 StructArray::from(vec![(offset_field, Arc::new(offset) as ArrayRef)]);
1507
1508 let batch =
1510 RecordBatch::try_new(Arc::new(schema), vec![Arc::new(some_nested_object)]).unwrap();
1511
1512 let struct_null_level =
1513 calculate_array_levels(batch.column(0), batch.schema().field(0)).unwrap();
1514
1515 let offset_field = Arc::new(Field::new("offset", DataType::Int32, true));
1518 let schema = Schema::new(vec![Field::new(
1519 "some_nested_object",
1520 DataType::Struct(vec![offset_field.clone()].into()),
1521 true,
1522 )]);
1523
1524 let offset = Int32Array::from(vec![1, 2, 3, 4, 5]);
1526
1527 let some_nested_object =
1528 StructArray::from(vec![(offset_field, Arc::new(offset) as ArrayRef)]);
1529
1530 let batch =
1532 RecordBatch::try_new(Arc::new(schema), vec![Arc::new(some_nested_object)]).unwrap();
1533
1534 let struct_non_null_level =
1535 calculate_array_levels(batch.column(0), batch.schema().field(0)).unwrap();
1536
1537 if struct_non_null_level == struct_null_level {
1539 panic!("Levels should not be equal, to reflect the difference in struct nullness");
1540 }
1541 }
1542
1543 #[test]
1544 fn test_map_array() {
1545 let json_content = r#"
1547 {"stocks":{"long": "$AAA", "short": "$BBB"}}
1548 {"stocks":{"long": "$CCC", "short": null}}
1549 {"stocks":{"hedged": "$YYY", "long": null, "short": "$D"}}
1550 "#;
1551 let entries_struct_type = DataType::Struct(Fields::from(vec![
1552 Field::new("key", DataType::Utf8, false),
1553 Field::new("value", DataType::Utf8, true),
1554 ]));
1555 let stocks_field = Field::new(
1556 "stocks",
1557 DataType::Map(
1558 Arc::new(Field::new("entries", entries_struct_type, false)),
1559 false,
1560 ),
1561 false,
1563 );
1564 let schema = Arc::new(Schema::new(vec![stocks_field]));
1565 let builder = arrow::json::ReaderBuilder::new(schema).with_batch_size(64);
1566 let mut reader = builder.build(std::io::Cursor::new(json_content)).unwrap();
1567
1568 let batch = reader.next().unwrap().unwrap();
1569
1570 let mut levels = vec![];
1572 batch
1573 .columns()
1574 .iter()
1575 .zip(batch.schema().fields())
1576 .for_each(|(array, field)| {
1577 let mut array_levels = calculate_array_levels(array, field).unwrap();
1578 levels.append(&mut array_levels);
1579 });
1580 assert_eq!(levels.len(), 2);
1581
1582 let map = batch.column(0).as_map();
1583 let map_keys_logical_nulls = map.keys().logical_nulls();
1584
1585 let list_level = &levels[0];
1587
1588 let expected_level = ArrayLevels {
1589 def_levels: Some(vec![1; 7]),
1590 rep_levels: Some(vec![0, 1, 0, 1, 0, 1, 1]),
1591 non_null_indices: vec![0, 1, 2, 3, 4, 5, 6],
1592 max_def_level: 1,
1593 max_rep_level: 1,
1594 array: map.keys().clone(),
1595 logical_nulls: map_keys_logical_nulls,
1596 };
1597 assert_eq!(list_level, &expected_level);
1598
1599 let list_level = levels.get(1).unwrap();
1601 let map_values_logical_nulls = map.values().logical_nulls();
1602
1603 let expected_level = ArrayLevels {
1604 def_levels: Some(vec![2, 2, 2, 1, 2, 1, 2]),
1605 rep_levels: Some(vec![0, 1, 0, 1, 0, 1, 1]),
1606 non_null_indices: vec![0, 1, 2, 4, 6],
1607 max_def_level: 2,
1608 max_rep_level: 1,
1609 array: map.values().clone(),
1610 logical_nulls: map_values_logical_nulls,
1611 };
1612 assert_eq!(list_level, &expected_level);
1613 }
1614
1615 #[test]
1616 fn test_list_of_struct() {
1617 let int_field = Field::new("a", DataType::Int32, true);
1619 let fields = Fields::from([Arc::new(int_field)]);
1620 let item_field = Field::new_list_field(DataType::Struct(fields.clone()), true);
1621 let list_field = Field::new("list", DataType::List(Arc::new(item_field)), true);
1622
1623 let int_builder = Int32Builder::with_capacity(10);
1624 let struct_builder = StructBuilder::new(fields, vec![Box::new(int_builder)]);
1625 let mut list_builder = ListBuilder::new(struct_builder);
1626
1627 let values = list_builder.values();
1631 values
1632 .field_builder::<Int32Builder>(0)
1633 .unwrap()
1634 .append_value(1);
1635 values.append(true);
1636 list_builder.append(true);
1637
1638 list_builder.append(true);
1640
1641 list_builder.append(false);
1643
1644 let values = list_builder.values();
1646 values
1647 .field_builder::<Int32Builder>(0)
1648 .unwrap()
1649 .append_null();
1650 values.append(false);
1651 values
1652 .field_builder::<Int32Builder>(0)
1653 .unwrap()
1654 .append_null();
1655 values.append(false);
1656 list_builder.append(true);
1657
1658 let values = list_builder.values();
1660 values
1661 .field_builder::<Int32Builder>(0)
1662 .unwrap()
1663 .append_null();
1664 values.append(true);
1665 list_builder.append(true);
1666
1667 let values = list_builder.values();
1669 values
1670 .field_builder::<Int32Builder>(0)
1671 .unwrap()
1672 .append_value(2);
1673 values.append(true);
1674 list_builder.append(true);
1675
1676 let array = Arc::new(list_builder.finish());
1677
1678 let values = array.values().as_struct().column(0).clone();
1679 let values_len = values.len();
1680 assert_eq!(values_len, 5);
1681
1682 let schema = Arc::new(Schema::new(vec![list_field]));
1683
1684 let rb = RecordBatch::try_new(schema, vec![array]).unwrap();
1685
1686 let levels = calculate_array_levels(rb.column(0), rb.schema().field(0)).unwrap();
1687 let list_level = &levels[0];
1688
1689 let logical_nulls = values.logical_nulls();
1690 let expected_level = ArrayLevels {
1691 def_levels: Some(vec![4, 1, 0, 2, 2, 3, 4]),
1692 rep_levels: Some(vec![0, 0, 0, 0, 1, 0, 0]),
1693 non_null_indices: vec![0, 4],
1694 max_def_level: 4,
1695 max_rep_level: 1,
1696 array: values,
1697 logical_nulls,
1698 };
1699
1700 assert_eq!(list_level, &expected_level);
1701 }
1702
1703 #[test]
1704 fn test_struct_mask_list() {
1705 let inner = ListArray::from_iter_primitive::<Int32Type, _, _>(vec![
1707 Some(vec![Some(1), Some(2)]),
1708 Some(vec![None]),
1709 Some(vec![]),
1710 Some(vec![Some(3), None]), Some(vec![Some(4), Some(5)]),
1712 None, None,
1714 ]);
1715 let values = inner.values().clone();
1716
1717 assert_eq!(inner.values().len(), 7);
1719
1720 let field = Arc::new(Field::new("list", inner.data_type().clone(), true));
1721 let array = Arc::new(inner) as ArrayRef;
1722 let nulls = Buffer::from([0b01010111]);
1723 let struct_a = StructArray::from((vec![(field, array)], nulls));
1724
1725 let field = Field::new("struct", struct_a.data_type().clone(), true);
1726 let array = Arc::new(struct_a) as ArrayRef;
1727 let levels = calculate_array_levels(&array, &field).unwrap();
1728
1729 assert_eq!(levels.len(), 1);
1730
1731 let logical_nulls = values.logical_nulls();
1732 let expected_level = ArrayLevels {
1733 def_levels: Some(vec![4, 4, 3, 2, 0, 4, 4, 0, 1]),
1734 rep_levels: Some(vec![0, 1, 0, 0, 0, 0, 1, 0, 0]),
1735 non_null_indices: vec![0, 1, 5, 6],
1736 max_def_level: 4,
1737 max_rep_level: 1,
1738 array: values,
1739 logical_nulls,
1740 };
1741
1742 assert_eq!(&levels[0], &expected_level);
1743 }
1744
1745 #[test]
1746 fn test_list_mask_struct() {
1747 let a1 = ListArray::from_iter_primitive::<Int32Type, _, _>(vec![
1751 Some(vec![None]), Some(vec![]), Some(vec![Some(3), None]),
1754 Some(vec![Some(4), Some(5), None, Some(6)]), None,
1756 None,
1757 ]);
1758 let a1_values = a1.values().clone();
1759 let a1 = Arc::new(a1) as ArrayRef;
1760
1761 let a2 = Arc::new(Int32Array::from_iter(vec![
1762 Some(1), Some(2), None,
1765 Some(4), Some(5),
1767 None,
1768 ])) as ArrayRef;
1769 let a2_values = a2.clone();
1770
1771 let field_a1 = Arc::new(Field::new("list", a1.data_type().clone(), true));
1772 let field_a2 = Arc::new(Field::new("integers", a2.data_type().clone(), true));
1773
1774 let nulls = Buffer::from([0b00110111]);
1775 let struct_a = Arc::new(StructArray::from((
1776 vec![(field_a1, a1), (field_a2, a2)],
1777 nulls,
1778 ))) as ArrayRef;
1779
1780 let offsets = Buffer::from_iter([0_i32, 0, 2, 2, 3, 5, 5]);
1781 let nulls = Buffer::from([0b00111100]);
1782
1783 let list_type = DataType::List(Arc::new(Field::new(
1784 "struct",
1785 struct_a.data_type().clone(),
1786 true,
1787 )));
1788
1789 let data = ArrayDataBuilder::new(list_type.clone())
1790 .len(6)
1791 .null_bit_buffer(Some(nulls))
1792 .add_buffer(offsets)
1793 .add_child_data(struct_a.into_data())
1794 .build()
1795 .unwrap();
1796
1797 let list = make_array(data);
1798 let list_field = Field::new("col", list_type, true);
1799
1800 let expected = vec![
1801 r#""#.to_string(),
1802 r#""#.to_string(),
1803 r#"[]"#.to_string(),
1804 r#"[{list: [3, ], integers: }]"#.to_string(),
1805 r#"[, {list: , integers: 5}]"#.to_string(),
1806 r#"[]"#.to_string(),
1807 ];
1808
1809 let actual: Vec<_> = (0..6)
1810 .map(|x| array_value_to_string(&list, x).unwrap())
1811 .collect();
1812 assert_eq!(actual, expected);
1813
1814 let levels = calculate_array_levels(&list, &list_field).unwrap();
1815
1816 assert_eq!(levels.len(), 2);
1817
1818 let a1_logical_nulls = a1_values.logical_nulls();
1819 let expected_level = ArrayLevels {
1820 def_levels: Some(vec![0, 0, 1, 6, 5, 2, 3, 1]),
1821 rep_levels: Some(vec![0, 0, 0, 0, 2, 0, 1, 0]),
1822 non_null_indices: vec![1],
1823 max_def_level: 6,
1824 max_rep_level: 2,
1825 array: a1_values,
1826 logical_nulls: a1_logical_nulls,
1827 };
1828
1829 assert_eq!(&levels[0], &expected_level);
1830
1831 let a2_logical_nulls = a2_values.logical_nulls();
1832 let expected_level = ArrayLevels {
1833 def_levels: Some(vec![0, 0, 1, 3, 2, 4, 1]),
1834 rep_levels: Some(vec![0, 0, 0, 0, 0, 1, 0]),
1835 non_null_indices: vec![4],
1836 max_def_level: 4,
1837 max_rep_level: 1,
1838 array: a2_values,
1839 logical_nulls: a2_logical_nulls,
1840 };
1841
1842 assert_eq!(&levels[1], &expected_level);
1843 }
1844
1845 #[test]
1846 fn test_fixed_size_list() {
1847 let mut builder = FixedSizeListBuilder::new(Int32Builder::new(), 2);
1849 builder.values().append_slice(&[1, 2]);
1850 builder.append(true);
1851 builder.values().append_slice(&[3, 4]);
1852 builder.append(false);
1853 builder.values().append_slice(&[5, 6]);
1854 builder.append(false);
1855 builder.values().append_slice(&[7, 8]);
1856 builder.append(true);
1857 builder.values().append_slice(&[9, 10]);
1858 builder.append(false);
1859 let a = builder.finish();
1860 let values = a.values().clone();
1861
1862 let item_field = Field::new_list_field(a.data_type().clone(), true);
1863 let mut builder = levels(&item_field, a);
1864 builder.write(1..4);
1865 let levels = builder.finish();
1866
1867 assert_eq!(levels.len(), 1);
1868
1869 let list_level = &levels[0];
1870
1871 let logical_nulls = values.logical_nulls();
1872 let expected_level = ArrayLevels {
1873 def_levels: Some(vec![0, 0, 3, 3]),
1874 rep_levels: Some(vec![0, 0, 0, 1]),
1875 non_null_indices: vec![6, 7],
1876 max_def_level: 3,
1877 max_rep_level: 1,
1878 array: values,
1879 logical_nulls,
1880 };
1881 assert_eq!(list_level, &expected_level);
1882 }
1883
1884 #[test]
1885 fn test_fixed_size_list_of_struct() {
1886 let field_a = Field::new("a", DataType::Int32, true);
1888 let field_b = Field::new("b", DataType::Int64, false);
1889 let fields = Fields::from([Arc::new(field_a), Arc::new(field_b)]);
1890 let item_field = Field::new_list_field(DataType::Struct(fields.clone()), true);
1891 let list_field = Field::new(
1892 "list",
1893 DataType::FixedSizeList(Arc::new(item_field), 2),
1894 true,
1895 );
1896
1897 let builder_a = Int32Builder::with_capacity(10);
1898 let builder_b = Int64Builder::with_capacity(10);
1899 let struct_builder =
1900 StructBuilder::new(fields, vec![Box::new(builder_a), Box::new(builder_b)]);
1901 let mut list_builder = FixedSizeListBuilder::new(struct_builder, 2);
1902
1903 let values = list_builder.values();
1912 values
1914 .field_builder::<Int32Builder>(0)
1915 .unwrap()
1916 .append_value(1);
1917 values
1918 .field_builder::<Int64Builder>(1)
1919 .unwrap()
1920 .append_value(2);
1921 values.append(true);
1922 values
1924 .field_builder::<Int32Builder>(0)
1925 .unwrap()
1926 .append_null();
1927 values
1928 .field_builder::<Int64Builder>(1)
1929 .unwrap()
1930 .append_value(0);
1931 values.append(false);
1932 list_builder.append(true);
1933
1934 let values = list_builder.values();
1936 values
1938 .field_builder::<Int32Builder>(0)
1939 .unwrap()
1940 .append_null();
1941 values
1942 .field_builder::<Int64Builder>(1)
1943 .unwrap()
1944 .append_value(0);
1945 values.append(false);
1946 values
1948 .field_builder::<Int32Builder>(0)
1949 .unwrap()
1950 .append_null();
1951 values
1952 .field_builder::<Int64Builder>(1)
1953 .unwrap()
1954 .append_value(0);
1955 values.append(false);
1956 list_builder.append(false);
1957
1958 let values = list_builder.values();
1960 values
1962 .field_builder::<Int32Builder>(0)
1963 .unwrap()
1964 .append_null();
1965 values
1966 .field_builder::<Int64Builder>(1)
1967 .unwrap()
1968 .append_value(0);
1969 values.append(false);
1970 values
1972 .field_builder::<Int32Builder>(0)
1973 .unwrap()
1974 .append_null();
1975 values
1976 .field_builder::<Int64Builder>(1)
1977 .unwrap()
1978 .append_value(0);
1979 values.append(false);
1980 list_builder.append(true);
1981
1982 let values = list_builder.values();
1984 values
1986 .field_builder::<Int32Builder>(0)
1987 .unwrap()
1988 .append_null();
1989 values
1990 .field_builder::<Int64Builder>(1)
1991 .unwrap()
1992 .append_value(3);
1993 values.append(true);
1994 values
1996 .field_builder::<Int32Builder>(0)
1997 .unwrap()
1998 .append_value(2);
1999 values
2000 .field_builder::<Int64Builder>(1)
2001 .unwrap()
2002 .append_value(4);
2003 values.append(true);
2004 list_builder.append(true);
2005
2006 let array = Arc::new(list_builder.finish());
2007
2008 assert_eq!(array.values().len(), 8);
2009 assert_eq!(array.len(), 4);
2010
2011 let struct_values = array.values().as_struct();
2012 let values_a = struct_values.column(0).clone();
2013 let values_b = struct_values.column(1).clone();
2014
2015 let schema = Arc::new(Schema::new(vec![list_field]));
2016 let rb = RecordBatch::try_new(schema, vec![array]).unwrap();
2017
2018 let levels = calculate_array_levels(rb.column(0), rb.schema().field(0)).unwrap();
2019 let a_levels = &levels[0];
2020 let b_levels = &levels[1];
2021
2022 let values_a_logical_nulls = values_a.logical_nulls();
2024 let expected_a = ArrayLevels {
2025 def_levels: Some(vec![4, 2, 0, 2, 2, 3, 4]),
2026 rep_levels: Some(vec![0, 1, 0, 0, 1, 0, 1]),
2027 non_null_indices: vec![0, 7],
2028 max_def_level: 4,
2029 max_rep_level: 1,
2030 array: values_a,
2031 logical_nulls: values_a_logical_nulls,
2032 };
2033 let values_b_logical_nulls = values_b.logical_nulls();
2035 let expected_b = ArrayLevels {
2036 def_levels: Some(vec![3, 2, 0, 2, 2, 3, 3]),
2037 rep_levels: Some(vec![0, 1, 0, 0, 1, 0, 1]),
2038 non_null_indices: vec![0, 6, 7],
2039 max_def_level: 3,
2040 max_rep_level: 1,
2041 array: values_b,
2042 logical_nulls: values_b_logical_nulls,
2043 };
2044
2045 assert_eq!(a_levels, &expected_a);
2046 assert_eq!(b_levels, &expected_b);
2047 }
2048
2049 #[test]
2050 fn test_fixed_size_list_empty() {
2051 let mut builder = FixedSizeListBuilder::new(Int32Builder::new(), 0);
2052 builder.append(true);
2053 builder.append(false);
2054 builder.append(true);
2055 let array = builder.finish();
2056 let values = array.values().clone();
2057
2058 let item_field = Field::new_list_field(array.data_type().clone(), true);
2059 let mut builder = levels(&item_field, array);
2060 builder.write(0..3);
2061 let levels = builder.finish();
2062
2063 assert_eq!(levels.len(), 1);
2064
2065 let list_level = &levels[0];
2066
2067 let logical_nulls = values.logical_nulls();
2068 let expected_level = ArrayLevels {
2069 def_levels: Some(vec![1, 0, 1]),
2070 rep_levels: Some(vec![0, 0, 0]),
2071 non_null_indices: vec![],
2072 max_def_level: 3,
2073 max_rep_level: 1,
2074 array: values,
2075 logical_nulls,
2076 };
2077 assert_eq!(list_level, &expected_level);
2078 }
2079
2080 #[test]
2081 fn test_fixed_size_list_of_var_lists() {
2082 let mut builder = FixedSizeListBuilder::new(ListBuilder::new(Int32Builder::new()), 2);
2084 builder.values().append_value([Some(1), None, Some(3)]);
2085 builder.values().append_null();
2086 builder.append(true);
2087 builder.values().append_value([Some(4)]);
2088 builder.values().append_value([]);
2089 builder.append(true);
2090 builder.values().append_value([Some(5), Some(6)]);
2091 builder.values().append_value([None, None]);
2092 builder.append(true);
2093 builder.values().append_null();
2094 builder.values().append_null();
2095 builder.append(false);
2096 let a = builder.finish();
2097 let values = a.values().as_list::<i32>().values().clone();
2098
2099 let item_field = Field::new_list_field(a.data_type().clone(), true);
2100 let mut builder = levels(&item_field, a);
2101 builder.write(0..4);
2102 let levels = builder.finish();
2103
2104 let logical_nulls = values.logical_nulls();
2105 let expected_level = ArrayLevels {
2106 def_levels: Some(vec![5, 4, 5, 2, 5, 3, 5, 5, 4, 4, 0]),
2107 rep_levels: Some(vec![0, 2, 2, 1, 0, 1, 0, 2, 1, 2, 0]),
2108 non_null_indices: vec![0, 2, 3, 4, 5],
2109 max_def_level: 5,
2110 max_rep_level: 2,
2111 array: values,
2112 logical_nulls,
2113 };
2114
2115 assert_eq!(levels[0], expected_level);
2116 }
2117
2118 #[test]
2119 fn test_null_dictionary_values() {
2120 let values = Int32Array::new(
2121 vec![1, 2, 3, 4].into(),
2122 Some(NullBuffer::from(vec![true, false, true, true])),
2123 );
2124 let keys = Int32Array::new(
2125 vec![1, 54, 2, 0].into(),
2126 Some(NullBuffer::from(vec![true, false, true, true])),
2127 );
2128 let dict = DictionaryArray::new(keys, Arc::new(values));
2130
2131 let item_field = Field::new_list_field(dict.data_type().clone(), true);
2132
2133 let mut builder = levels(&item_field, dict.clone());
2134 builder.write(0..4);
2135 let levels = builder.finish();
2136
2137 let logical_nulls = dict.logical_nulls();
2138 let expected_level = ArrayLevels {
2139 def_levels: Some(vec![0, 0, 1, 1]),
2140 rep_levels: None,
2141 non_null_indices: vec![2, 3],
2142 max_def_level: 1,
2143 max_rep_level: 0,
2144 array: Arc::new(dict),
2145 logical_nulls,
2146 };
2147 assert_eq!(levels[0], expected_level);
2148 }
2149
2150 #[test]
2151 fn mismatched_types() {
2152 let array = Arc::new(Int32Array::from_iter(0..10)) as ArrayRef;
2153 let field = Field::new_list_field(DataType::Float64, false);
2154
2155 let err = LevelInfoBuilder::try_new(&field, Default::default(), &array)
2156 .unwrap_err()
2157 .to_string();
2158
2159 assert_eq!(
2160 err,
2161 "Arrow: Incompatible type. Field 'item' has type Float64, array has type Int32",
2162 );
2163 }
2164
2165 fn levels<T: Array + 'static>(field: &Field, array: T) -> LevelInfoBuilder {
2166 let v = Arc::new(array) as ArrayRef;
2167 LevelInfoBuilder::try_new(field, Default::default(), &v).unwrap()
2168 }
2169
2170 #[test]
2171 fn test_slice_for_chunk_flat() {
2172 let array: ArrayRef = Arc::new(Int32Array::from(vec![1, 2, 3, 4, 5, 6]));
2177 let logical_nulls = array.logical_nulls();
2178 let levels = ArrayLevels {
2179 def_levels: None,
2180 rep_levels: None,
2181 non_null_indices: vec![0, 1, 2, 3, 4, 5],
2182 max_def_level: 0,
2183 max_rep_level: 0,
2184 array,
2185 logical_nulls,
2186 };
2187 let sliced = levels.slice_for_chunk(&CdcChunk {
2188 level_offset: 0,
2189 num_levels: 0,
2190 value_offset: 2,
2191 num_values: 3,
2192 });
2193 assert!(sliced.def_levels.is_none());
2194 assert!(sliced.rep_levels.is_none());
2195 assert_eq!(sliced.non_null_indices, vec![0, 1, 2]);
2196 assert_eq!(sliced.array.len(), 3);
2197
2198 let array: ArrayRef = Arc::new(Int32Array::from(vec![
2204 Some(1),
2205 None,
2206 Some(3),
2207 None,
2208 Some(5),
2209 Some(6),
2210 ]));
2211 let logical_nulls = array.logical_nulls();
2212 let levels = ArrayLevels {
2213 def_levels: Some(vec![1, 0, 1, 0, 1, 1]),
2214 rep_levels: None,
2215 non_null_indices: vec![0, 2, 4, 5],
2216 max_def_level: 1,
2217 max_rep_level: 0,
2218 array,
2219 logical_nulls,
2220 };
2221 let sliced = levels.slice_for_chunk(&CdcChunk {
2222 level_offset: 1,
2223 num_levels: 3,
2224 value_offset: 1,
2225 num_values: 1,
2226 });
2227 assert_eq!(sliced.def_levels, Some(vec![0, 1, 0]));
2228 assert!(sliced.rep_levels.is_none());
2229 assert_eq!(sliced.non_null_indices, vec![0]); assert_eq!(sliced.array.len(), 1);
2231 }
2232
2233 #[test]
2234 fn test_slice_for_chunk_nested_with_nulls() {
2235 let array: ArrayRef = Arc::new(Int32Array::from(vec![
2254 Some(1), None, None, Some(2), None, None, None, None, Some(4), Some(5), ]));
2265 let logical_nulls = array.logical_nulls();
2266 let levels = ArrayLevels {
2267 def_levels: Some(vec![3, 0, 3, 2, 0, 3, 3]),
2268 rep_levels: Some(vec![0, 0, 0, 1, 0, 0, 1]),
2269 non_null_indices: vec![0, 3, 8, 9],
2270 max_def_level: 3,
2271 max_rep_level: 1,
2272 array,
2273 logical_nulls,
2274 };
2275
2276 let chunk0 = levels.slice_for_chunk(&CdcChunk {
2278 level_offset: 0,
2279 num_levels: 2,
2280 value_offset: 0,
2281 num_values: 1,
2282 });
2283 assert_eq!(chunk0.non_null_indices, vec![0]);
2284 assert_eq!(chunk0.array.len(), 1);
2285
2286 let chunk1 = levels.slice_for_chunk(&CdcChunk {
2288 level_offset: 2,
2289 num_levels: 3,
2290 value_offset: 1,
2291 num_values: 1,
2292 });
2293 assert_eq!(chunk1.non_null_indices, vec![0]);
2294 assert_eq!(chunk1.array.len(), 1);
2295
2296 let chunk2 = levels.slice_for_chunk(&CdcChunk {
2298 level_offset: 5,
2299 num_levels: 2,
2300 value_offset: 2,
2301 num_values: 2,
2302 });
2303 assert_eq!(chunk2.non_null_indices, vec![0, 1]);
2304 assert_eq!(chunk2.array.len(), 2);
2305 }
2306
2307 #[test]
2308 fn test_slice_for_chunk_all_null() {
2309 let array: ArrayRef = Arc::new(Int32Array::from(vec![Some(1), None, None, Some(4)]));
2311 let logical_nulls = array.logical_nulls();
2312 let levels = ArrayLevels {
2313 def_levels: Some(vec![1, 0, 0, 1]),
2314 rep_levels: None,
2315 non_null_indices: vec![0, 3],
2316 max_def_level: 1,
2317 max_rep_level: 0,
2318 array,
2319 logical_nulls,
2320 };
2321 let sliced = levels.slice_for_chunk(&CdcChunk {
2323 level_offset: 1,
2324 num_levels: 2,
2325 value_offset: 1,
2326 num_values: 0,
2327 });
2328 assert_eq!(sliced.def_levels, Some(vec![0, 0]));
2329 assert_eq!(sliced.non_null_indices, Vec::<usize>::new());
2330 assert_eq!(sliced.array.len(), 0);
2331 }
2332}