1#![allow(clippy::enum_clike_unportable_variant)]
18
19use crate::{Array, ArrayRef, make_array};
20use arrow_buffer::bit_chunk_iterator::{BitChunkIterator, BitChunks};
21use arrow_buffer::buffer::NullBuffer;
22use arrow_buffer::{BooleanBuffer, Buffer, MutableBuffer, ScalarBuffer};
23use arrow_data::{ArrayData, ArrayDataBuilder};
24use arrow_schema::{ArrowError, DataType, UnionFields, UnionMode};
25use std::any::Any;
28use std::collections::HashSet;
29use std::sync::Arc;
30
/// An array whose slots can each hold a value from one of several child arrays.
///
/// Every slot carries a type id selecting the child that stores its value. When
/// `offsets` is present the value is read from the child at the stored offset,
/// otherwise it is read from the child at the slot's own index.
#[derive(Clone)]
pub struct UnionArray {
    /// The `DataType::Union` describing the fields and the union mode
    data_type: DataType,
    /// One type id per slot; its length is the length of the array
    type_ids: ScalarBuffer<i8>,
    /// One child offset per slot; `Some` only for dense unions
    offsets: Option<ScalarBuffer<i32>>,
    /// Child arrays indexed by type id; positions without a field are `None`
    fields: Vec<Option<ArrayRef>>,
}
129
130impl UnionArray {
131 pub unsafe fn new_unchecked(
150 fields: UnionFields,
151 type_ids: ScalarBuffer<i8>,
152 offsets: Option<ScalarBuffer<i32>>,
153 children: Vec<ArrayRef>,
154 ) -> Self {
155 let mode = if offsets.is_some() {
156 UnionMode::Dense
157 } else {
158 UnionMode::Sparse
159 };
160
161 let len = type_ids.len();
162 let builder = ArrayData::builder(DataType::Union(fields, mode))
163 .add_buffer(type_ids.into_inner())
164 .child_data(children.into_iter().map(Array::into_data).collect())
165 .len(len);
166
167 let data = match offsets {
168 Some(offsets) => unsafe { builder.add_buffer(offsets.into_inner()).build_unchecked() },
169 None => unsafe { builder.build_unchecked() },
170 };
171 Self::from(data)
172 }
173
174 pub fn try_new(
178 fields: UnionFields,
179 type_ids: ScalarBuffer<i8>,
180 offsets: Option<ScalarBuffer<i32>>,
181 children: Vec<ArrayRef>,
182 ) -> Result<Self, ArrowError> {
183 if fields.len() != children.len() {
185 return Err(ArrowError::InvalidArgumentError(
186 "Union fields length must match child arrays length".to_string(),
187 ));
188 }
189
190 if let Some(offsets) = &offsets {
191 if offsets.len() != type_ids.len() {
193 return Err(ArrowError::InvalidArgumentError(
194 "Type Ids and Offsets lengths must match".to_string(),
195 ));
196 }
197 } else {
198 for child in &children {
200 if child.len() != type_ids.len() {
201 return Err(ArrowError::InvalidArgumentError(
202 "Sparse union child arrays must be equal in length to the length of the union".to_string(),
203 ));
204 }
205 }
206 }
207
208 let max_id = fields.iter().map(|(i, _)| i).max().unwrap_or_default() as usize;
210 let mut array_lens = vec![i32::MIN; max_id + 1];
211 for (cd, (field_id, _)) in children.iter().zip(fields.iter()) {
212 array_lens[field_id as usize] = cd.len() as i32;
213 }
214
215 for id in &type_ids {
217 match array_lens.get(*id as usize) {
218 Some(x) if *x != i32::MIN => {}
219 _ => {
220 return Err(ArrowError::InvalidArgumentError(
221 "Type Ids values must match one of the field type ids".to_owned(),
222 ));
223 }
224 }
225 }
226
227 if let Some(offsets) = &offsets {
229 let mut iter = type_ids.iter().zip(offsets.iter());
230 if iter.any(|(type_id, &offset)| offset < 0 || offset >= array_lens[*type_id as usize])
231 {
232 return Err(ArrowError::InvalidArgumentError(
233 "Offsets must be non-negative and within the length of the Array".to_owned(),
234 ));
235 }
236 }
237
238 let union_array = unsafe { Self::new_unchecked(fields, type_ids, offsets, children) };
241 Ok(union_array)
242 }
243
244 pub fn child(&self, type_id: i8) -> &ArrayRef {
251 assert!((type_id as usize) < self.fields.len());
252 let boxed = &self.fields[type_id as usize];
253 boxed.as_ref().expect("invalid type id")
254 }
255
    /// Returns the type id of the slot at `index`.
    ///
    /// # Panics
    ///
    /// Panics if `index` is greater than or equal to the number of type ids.
    pub fn type_id(&self, index: usize) -> i8 {
        assert!(index < self.type_ids.len());
        self.type_ids[index]
    }
265
    /// Returns the buffer holding the type id of every slot.
    pub fn type_ids(&self) -> &ScalarBuffer<i8> {
        &self.type_ids
    }
270
    /// Returns the per-slot child offsets, or `None` when the array stores none.
    pub fn offsets(&self) -> Option<&ScalarBuffer<i32>> {
        self.offsets.as_ref()
    }
275
276 pub fn value_offset(&self, index: usize) -> usize {
282 assert!(index < self.len());
283 match &self.offsets {
284 Some(offsets) => offsets[index] as usize,
285 None => self.offset() + index,
286 }
287 }
288
289 pub fn value(&self, i: usize) -> ArrayRef {
297 let type_id = self.type_id(i);
298 let value_offset = self.value_offset(i);
299 let child = self.child(type_id);
300 child.slice(value_offset, 1)
301 }
302
303 pub fn type_names(&self) -> Vec<&str> {
305 match self.data_type() {
306 DataType::Union(fields, _) => fields
307 .iter()
308 .map(|(_, f)| f.name().as_str())
309 .collect::<Vec<&str>>(),
310 _ => unreachable!("Union array's data type is not a union!"),
311 }
312 }
313
314 pub fn fields(&self) -> &UnionFields {
316 match self.data_type() {
317 DataType::Union(fields, _) => fields,
318 _ => unreachable!("Union array's data type is not a union!"),
319 }
320 }
321
322 pub fn is_dense(&self) -> bool {
324 match self.data_type() {
325 DataType::Union(_, mode) => mode == &UnionMode::Dense,
326 _ => unreachable!("Union array's data type is not a union!"),
327 }
328 }
329
330 pub fn slice(&self, offset: usize, length: usize) -> Self {
332 let (offsets, fields) = match self.offsets.as_ref() {
333 Some(offsets) => (Some(offsets.slice(offset, length)), self.fields.clone()),
335 None => {
337 let fields = self
338 .fields
339 .iter()
340 .map(|x| x.as_ref().map(|x| x.slice(offset, length)))
341 .collect();
342 (None, fields)
343 }
344 };
345
346 Self {
347 data_type: self.data_type.clone(),
348 type_ids: self.type_ids.slice(offset, length),
349 offsets,
350 fields,
351 }
352 }
353
354 #[allow(clippy::type_complexity)]
382 pub fn into_parts(
383 self,
384 ) -> (
385 UnionFields,
386 ScalarBuffer<i8>,
387 Option<ScalarBuffer<i32>>,
388 Vec<ArrayRef>,
389 ) {
390 let Self {
391 data_type,
392 type_ids,
393 offsets,
394 mut fields,
395 } = self;
396 match data_type {
397 DataType::Union(union_fields, _) => {
398 let children = union_fields
399 .iter()
400 .map(|(type_id, _)| fields[type_id as usize].take().unwrap())
401 .collect();
402 (union_fields, type_ids, offsets, children)
403 }
404 _ => unreachable!(),
405 }
406 }
407
    /// Computes the validity mask by visiting only the children listed in `nulls`.
    ///
    /// Each `(type_id, buffer)` entry pairs a child's type id with its null buffer.
    /// A slot whose type id matches none of the entries is reported as valid
    /// (set bit); otherwise the slot takes the bit of the matching child.
    fn mask_sparse_skip_without_nulls(&self, nulls: Vec<(i8, NullBuffer)>) -> BooleanBuffer {
        // Accumulates, over all listed children, which slots select one of them
        // and the child bits of the slots that do
        let fold = |(with_nulls_selected, union_nulls), (is_field, field_nulls)| {
            (
                with_nulls_selected | is_field,
                union_nulls | (is_field & field_nulls),
            )
        };

        self.mask_sparse_helper(
            nulls,
            // Full 64-slot chunk: pull the next 64 bits from each child's iterator
            |type_ids_chunk_array, nulls_masks_iters| {
                let (with_nulls_selected, union_nulls) = nulls_masks_iters
                    .iter_mut()
                    .map(|(field_type_id, field_nulls)| {
                        let field_nulls = field_nulls.next().unwrap();
                        let is_field = selection_mask(type_ids_chunk_array, *field_type_id);

                        (is_field, field_nulls)
                    })
                    .fold((0, 0), fold);

                // Slots that selected none of the listed children
                let without_nulls_selected = !with_nulls_selected;

                without_nulls_selected | union_nulls
            },
            // Trailing partial chunk: same computation on the remainder bits
            |type_ids_remainder, bit_chunks| {
                let (with_nulls_selected, union_nulls) = bit_chunks
                    .iter()
                    .map(|(field_type_id, field_bit_chunks)| {
                        let field_nulls = field_bit_chunks.remainder_bits();
                        let is_field = selection_mask(type_ids_remainder, *field_type_id);

                        (is_field, field_nulls)
                    })
                    .fold((0, 0), fold);

                let without_nulls_selected = !with_nulls_selected;

                without_nulls_selected | union_nulls
            },
        )
    }
459
    /// Computes the validity mask without reading the buffers of children whose
    /// values are all null.
    ///
    /// Slots selecting a field that is absent from `nulls` are reported as valid,
    /// slots selecting a partially null child take that child's bit, and slots
    /// selecting a fully null child are left unset because nothing sets their bit.
    fn mask_sparse_skip_fully_null(&self, mut nulls: Vec<(i8, NullBuffer)>) -> BooleanBuffer {
        let fields = match self.data_type() {
            DataType::Union(fields, _) => fields,
            _ => unreachable!("Union array's data type is not a union!"),
        };

        let type_ids = fields.iter().map(|(id, _)| id).collect::<HashSet<_>>();
        let with_nulls = nulls.iter().map(|(id, _)| *id).collect::<HashSet<_>>();

        // Computed before the `retain` below so that fully null children are
        // not mistaken for children without nulls
        let without_nulls_ids = type_ids
            .difference(&with_nulls)
            .copied()
            .collect::<Vec<_>>();

        // Drop the fully null children; they never contribute a set bit
        nulls.retain(|(_, nulls)| nulls.null_count() < nulls.len());

        self.mask_sparse_helper(
            nulls,
            // Full 64-slot chunk
            |type_ids_chunk_array, nulls_masks_iters| {
                let union_nulls = nulls_masks_iters.iter_mut().fold(
                    0,
                    |union_nulls, (field_type_id, nulls_iter)| {
                        let field_nulls = nulls_iter.next().unwrap();

                        // With no set bit in this chunk the child cannot
                        // contribute, so the selection mask is not computed
                        if field_nulls == 0 {
                            union_nulls
                        } else {
                            let is_field = selection_mask(type_ids_chunk_array, *field_type_id);

                            union_nulls | (is_field & field_nulls)
                        }
                    },
                );

                let without_nulls_selected =
                    without_nulls_selected(type_ids_chunk_array, &without_nulls_ids);

                union_nulls | without_nulls_selected
            },
            // Trailing partial chunk
            |type_ids_remainder, bit_chunks| {
                let union_nulls =
                    bit_chunks
                        .iter()
                        .fold(0, |union_nulls, (field_type_id, field_bit_chunks)| {
                            let is_field = selection_mask(type_ids_remainder, *field_type_id);
                            let field_nulls = field_bit_chunks.remainder_bits();

                            union_nulls | is_field & field_nulls
                        });

                union_nulls | without_nulls_selected(type_ids_remainder, &without_nulls_ids)
            },
        )
    }
522
    /// Computes the validity mask when every field has an entry in `nulls`.
    ///
    /// For full chunks the selection mask of the first entry is not computed
    /// directly; it is taken as the complement of the slots selecting any other
    /// entry. The caller in `logical_nulls` only picks this strategy when the
    /// number of entries equals the number of fields.
    fn mask_sparse_all_with_nulls_skip_one(&self, nulls: Vec<(i8, NullBuffer)>) -> BooleanBuffer {
        self.mask_sparse_helper(
            nulls,
            // Full 64-slot chunk
            |type_ids_chunk_array, nulls_masks_iters| {
                let (is_not_first, union_nulls) = nulls_masks_iters[1..] .iter_mut()
                    .fold(
                        (0, 0),
                        |(is_not_first, union_nulls), (field_type_id, nulls_iter)| {
                            let field_nulls = nulls_iter.next().unwrap();
                            let is_field = selection_mask(type_ids_chunk_array, *field_type_id);

                            (
                                is_not_first | is_field,
                                union_nulls | (is_field & field_nulls),
                            )
                        },
                    );

                // Whatever did not select another entry selected the first one
                let is_first = !is_not_first;
                let first_nulls = nulls_masks_iters[0].1.next().unwrap();

                (is_first & first_nulls) | union_nulls
            },
            // Trailing partial chunk: every entry, including the first, gets
            // its own selection mask here
            |type_ids_remainder, bit_chunks| {
                bit_chunks
                    .iter()
                    .fold(0, |union_nulls, (field_type_id, field_bit_chunks)| {
                        let field_nulls = field_bit_chunks.remainder_bits();
                        let is_field = selection_mask(type_ids_remainder, *field_type_id);

                        union_nulls | (is_field & field_nulls)
                    })
            },
        )
    }
568
    /// Drives the mask-based strategies over the type ids, 64 slots at a time.
    ///
    /// `mask_chunk` is called once per full chunk of 64 type ids together with
    /// one bit-chunk iterator per entry of `nulls`, and returns the 64 result
    /// bits for that chunk. `mask_remainder` is called at most once with the
    /// trailing type ids, if there are any, and the bit chunks of every entry.
    /// The returned buffer has one bit per type id.
    fn mask_sparse_helper(
        &self,
        nulls: Vec<(i8, NullBuffer)>,
        mut mask_chunk: impl FnMut(&[i8; 64], &mut [(i8, BitChunkIterator)]) -> u64,
        mask_remainder: impl FnOnce(&[i8], &[(i8, BitChunks)]) -> u64,
    ) -> BooleanBuffer {
        let bit_chunks = nulls
            .iter()
            .map(|(type_id, nulls)| (*type_id, nulls.inner().bit_chunks()))
            .collect::<Vec<_>>();

        // One iterator per entry, advanced in lock step by `mask_chunk`
        let mut nulls_masks_iter = bit_chunks
            .iter()
            .map(|(type_id, bit_chunks)| (*type_id, bit_chunks.iter()))
            .collect::<Vec<_>>();

        let chunks_exact = self.type_ids.chunks_exact(64);
        let remainder = chunks_exact.remainder();

        let chunks = chunks_exact.map(|type_ids_chunk| {
            // `chunks_exact(64)` only yields slices of length 64
            let type_ids_chunk_array = <&[i8; 64]>::try_from(type_ids_chunk).unwrap();

            mask_chunk(type_ids_chunk_array, &mut nulls_masks_iter)
        });

        // SAFETY: NOTE(review) relies on the mapped `chunks_exact` iterator
        // reporting an exact length — confirm against `from_trusted_len_iter`
        let mut buffer = unsafe { MutableBuffer::from_trusted_len_iter(chunks) };

        if !remainder.is_empty() {
            buffer.push(mask_remainder(remainder, &bit_chunks));
        }

        BooleanBuffer::new(buffer.into(), 0, self.type_ids.len())
    }
606
    /// Computes the validity bits slot by slot through a lookup table indexed
    /// by type id.
    ///
    /// Each `(type_id, buffer)` entry of `nulls` pairs a child's type id with its
    /// null buffer. Type ids without an entry resolve to a one-element all-valid
    /// buffer and fully null children to a one-element all-null buffer; both are
    /// paired with [`Mask::Zero`] so that bit 0 is read regardless of the slot.
    /// Any other child is paired with [`Mask::Max`], which leaves the index as is.
    fn gather_nulls(&self, nulls: Vec<(i8, NullBuffer)>) -> BooleanBuffer {
        let one_null = NullBuffer::new_null(1);
        let one_valid = NullBuffer::new_valid(1);

        // 256 entries so that any `i8` reinterpreted as `u8` is a valid index
        let mut logical_nulls_array = [(&one_valid, Mask::Zero); 256];

        for (type_id, nulls) in &nulls {
            if nulls.null_count() == nulls.len() {
                logical_nulls_array[*type_id as u8 as usize] = (&one_null, Mask::Zero);
            } else {
                logical_nulls_array[*type_id as u8 as usize] = (nulls, Mask::Max);
            }
        }

        match &self.offsets {
            // Offsets present: the bit is read at the slot's offset into the child
            Some(offsets) => {
                assert_eq!(self.type_ids.len(), offsets.len());

                // SAFETY: NOTE(review) assumes `collect_bool` only passes indices
                // below the given length, and that each offset is in bounds of
                // its child's null buffer — confirm
                BooleanBuffer::collect_bool(self.type_ids.len(), |i| unsafe {
                    let type_id = *self.type_ids.get_unchecked(i);
                    let offset = *offsets.get_unchecked(i);

                    let (nulls, offset_mask) = &logical_nulls_array[type_id as u8 as usize];

                    nulls
                        .inner()
                        .value_unchecked(offset as usize & *offset_mask as usize)
                })
            }
            // No offsets: the bit is read at the slot's own index
            None => {
                // SAFETY: NOTE(review) assumes `collect_bool` only passes indices
                // below the given length, and that every child buffer covers
                // that index — confirm
                BooleanBuffer::collect_bool(self.type_ids.len(), |index| unsafe {
                    let type_id = *self.type_ids.get_unchecked(index);

                    let (nulls, index_mask) = &logical_nulls_array[type_id as u8 as usize];

                    nulls.inner().value_unchecked(index & *index_mask as usize)
                })
            }
        }
    }
668
669 fn fields_logical_nulls(&self) -> Vec<(i8, NullBuffer)> {
672 self.fields
673 .iter()
674 .enumerate()
675 .filter_map(|(type_id, field)| Some((type_id as i8, field.as_ref()?.logical_nulls()?)))
676 .filter(|(_, nulls)| nulls.null_count() > 0)
677 .collect()
678 }
679}
680
681impl From<ArrayData> for UnionArray {
682 fn from(data: ArrayData) -> Self {
683 let (data_type, len, _nulls, offset, buffers, child_data) = data.into_parts();
684
685 let (fields, mode) = match &data_type {
686 DataType::Union(fields, mode) => (fields, mode),
687 d => panic!("UnionArray expected ArrayData with type Union got {d}"),
688 };
689
690 let (type_ids, offsets) = match mode {
691 UnionMode::Sparse => {
692 let [buffer]: [Buffer; 1] = buffers.try_into().expect("1 buffer for type_ids");
693 (ScalarBuffer::new(buffer, offset, len), None)
694 }
695 UnionMode::Dense => {
696 let [type_ids_buffer, offsets_buffer]: [Buffer; 2] = buffers
697 .try_into()
698 .expect("2 buffers for type_ids and offsets");
699 (
700 ScalarBuffer::new(type_ids_buffer, offset, len),
701 Some(ScalarBuffer::new(offsets_buffer, offset, len)),
702 )
703 }
704 };
705
706 let max_id = fields.iter().map(|(i, _)| i).max().unwrap_or_default() as usize;
707 let mut boxed_fields = vec![None; max_id + 1];
708 for (cd, (field_id, _)) in child_data.into_iter().zip(fields.iter()) {
709 boxed_fields[field_id as usize] = Some(make_array(cd));
710 }
711 Self {
712 data_type,
713 type_ids,
714 offsets,
715 fields: boxed_fields,
716 }
717 }
718}
719
720impl From<UnionArray> for ArrayData {
721 fn from(array: UnionArray) -> Self {
722 let len = array.len();
723 let f = match &array.data_type {
724 DataType::Union(f, _) => f,
725 _ => unreachable!(),
726 };
727 let buffers = match array.offsets {
728 Some(o) => vec![array.type_ids.into_inner(), o.into_inner()],
729 None => vec![array.type_ids.into_inner()],
730 };
731
732 let child = f
733 .iter()
734 .map(|(i, _)| array.fields[i as usize].as_ref().unwrap().to_data())
735 .collect();
736
737 let builder = ArrayDataBuilder::new(array.data_type)
738 .len(len)
739 .buffers(buffers)
740 .child_data(child);
741 unsafe { builder.build_unchecked() }
742 }
743}
744
745unsafe impl Array for UnionArray {
747 fn as_any(&self) -> &dyn Any {
748 self
749 }
750
751 fn to_data(&self) -> ArrayData {
752 self.clone().into()
753 }
754
755 fn into_data(self) -> ArrayData {
756 self.into()
757 }
758
759 fn data_type(&self) -> &DataType {
760 &self.data_type
761 }
762
763 fn slice(&self, offset: usize, length: usize) -> ArrayRef {
764 Arc::new(self.slice(offset, length))
765 }
766
767 fn len(&self) -> usize {
768 self.type_ids.len()
769 }
770
771 fn is_empty(&self) -> bool {
772 self.type_ids.is_empty()
773 }
774
775 fn shrink_to_fit(&mut self) {
776 self.type_ids.shrink_to_fit();
777 if let Some(offsets) = &mut self.offsets {
778 offsets.shrink_to_fit();
779 }
780 for array in self.fields.iter_mut().flatten() {
781 array.shrink_to_fit();
782 }
783 self.fields.shrink_to_fit();
784 }
785
786 fn offset(&self) -> usize {
787 0
788 }
789
790 fn nulls(&self) -> Option<&NullBuffer> {
791 None
792 }
793
794 fn logical_nulls(&self) -> Option<NullBuffer> {
795 let fields = match self.data_type() {
796 DataType::Union(fields, _) => fields,
797 _ => unreachable!(),
798 };
799
800 if fields.len() <= 1 {
801 return self.fields.iter().find_map(|field_opt| {
802 field_opt
803 .as_ref()
804 .and_then(|field| field.logical_nulls())
805 .map(|logical_nulls| {
806 if self.is_dense() {
807 self.gather_nulls(vec![(0, logical_nulls)]).into()
808 } else {
809 logical_nulls
810 }
811 })
812 });
813 }
814
815 let logical_nulls = self.fields_logical_nulls();
816
817 if logical_nulls.is_empty() {
818 return None;
819 }
820
821 let fully_null_count = logical_nulls
822 .iter()
823 .filter(|(_, nulls)| nulls.null_count() == nulls.len())
824 .count();
825
826 if fully_null_count == fields.len() {
827 if let Some((_, exactly_sized)) = logical_nulls
828 .iter()
829 .find(|(_, nulls)| nulls.len() == self.len())
830 {
831 return Some(exactly_sized.clone());
832 }
833
834 if let Some((_, bigger)) = logical_nulls
835 .iter()
836 .find(|(_, nulls)| nulls.len() > self.len())
837 {
838 return Some(bigger.slice(0, self.len()));
839 }
840
841 return Some(NullBuffer::new_null(self.len()));
842 }
843
844 let boolean_buffer = match &self.offsets {
845 Some(_) => self.gather_nulls(logical_nulls),
846 None => {
847 let gather_relative_cost = if cfg!(target_feature = "avx2") {
855 10
856 } else if cfg!(target_feature = "sse4.1") {
857 3
858 } else if cfg!(target_arch = "x86") || cfg!(target_arch = "x86_64") {
859 2
861 } else {
862 0
866 };
867
868 let strategies = [
869 (SparseStrategy::Gather, gather_relative_cost, true),
870 (
871 SparseStrategy::MaskAllFieldsWithNullsSkipOne,
872 fields.len() - 1,
873 fields.len() == logical_nulls.len(),
874 ),
875 (
876 SparseStrategy::MaskSkipWithoutNulls,
877 logical_nulls.len(),
878 true,
879 ),
880 (
881 SparseStrategy::MaskSkipFullyNull,
882 fields.len() - fully_null_count,
883 true,
884 ),
885 ];
886
887 let (strategy, _, _) = strategies
888 .iter()
889 .filter(|(_, _, applicable)| *applicable)
890 .min_by_key(|(_, cost, _)| cost)
891 .unwrap();
892
893 match strategy {
894 SparseStrategy::Gather => self.gather_nulls(logical_nulls),
895 SparseStrategy::MaskAllFieldsWithNullsSkipOne => {
896 self.mask_sparse_all_with_nulls_skip_one(logical_nulls)
897 }
898 SparseStrategy::MaskSkipWithoutNulls => {
899 self.mask_sparse_skip_without_nulls(logical_nulls)
900 }
901 SparseStrategy::MaskSkipFullyNull => {
902 self.mask_sparse_skip_fully_null(logical_nulls)
903 }
904 }
905 }
906 };
907
908 let null_buffer = NullBuffer::from(boolean_buffer);
909
910 if null_buffer.null_count() > 0 {
911 Some(null_buffer)
912 } else {
913 None
914 }
915 }
916
917 fn is_nullable(&self) -> bool {
918 self.fields
919 .iter()
920 .flatten()
921 .any(|field| field.is_nullable())
922 }
923
924 fn get_buffer_memory_size(&self) -> usize {
925 let mut sum = self.type_ids.inner().capacity();
926 if let Some(o) = self.offsets.as_ref() {
927 sum += o.inner().capacity()
928 }
929 self.fields
930 .iter()
931 .flat_map(|x| x.as_ref().map(|x| x.get_buffer_memory_size()))
932 .sum::<usize>()
933 + sum
934 }
935
936 fn get_array_memory_size(&self) -> usize {
937 let mut sum = self.type_ids.inner().capacity();
938 if let Some(o) = self.offsets.as_ref() {
939 sum += o.inner().capacity()
940 }
941 std::mem::size_of::<Self>()
942 + self
943 .fields
944 .iter()
945 .flat_map(|x| x.as_ref().map(|x| x.get_array_memory_size()))
946 .sum::<usize>()
947 + sum
948 }
949}
950
951impl std::fmt::Debug for UnionArray {
952 fn fmt(&self, f: &mut std::fmt::Formatter) -> std::fmt::Result {
953 let header = if self.is_dense() {
954 "UnionArray(Dense)\n["
955 } else {
956 "UnionArray(Sparse)\n["
957 };
958 writeln!(f, "{header}")?;
959
960 writeln!(f, "-- type id buffer:")?;
961 writeln!(f, "{:?}", self.type_ids)?;
962
963 if let Some(offsets) = &self.offsets {
964 writeln!(f, "-- offsets buffer:")?;
965 writeln!(f, "{offsets:?}")?;
966 }
967
968 let fields = match self.data_type() {
969 DataType::Union(fields, _) => fields,
970 _ => unreachable!(),
971 };
972
973 for (type_id, field) in fields.iter() {
974 let child = self.child(type_id);
975 writeln!(
976 f,
977 "-- child {}: \"{}\" ({:?})",
978 type_id,
979 field.name(),
980 field.data_type()
981 )?;
982 std::fmt::Debug::fmt(child, f)?;
983 writeln!(f)?;
984 }
985 writeln!(f, "]")
986 }
987}
988
/// How `logical_nulls` computes the nulls of a union without offsets.
enum SparseStrategy {
    /// Look up every slot individually (`gather_nulls`)
    Gather,
    /// Mask-based; usable only when every field has nulls
    /// (`mask_sparse_all_with_nulls_skip_one`)
    MaskAllFieldsWithNullsSkipOne,
    /// Mask-based; visits only the fields that have nulls
    /// (`mask_sparse_skip_without_nulls`)
    MaskSkipWithoutNulls,
    /// Mask-based; does not read the buffers of fully null fields
    /// (`mask_sparse_skip_fully_null`)
    MaskSkipFullyNull,
}
1003
/// Bit mask that `gather_nulls` ANDs with an index before reading a null buffer.
#[derive(Copy, Clone)]
#[repr(usize)]
enum Mask {
    /// Forces the index to 0
    Zero = 0,
    /// Leaves the index unchanged
    #[allow(clippy::enum_clike_unportable_variant)]
    Max = usize::MAX,
}
1012
/// Packs into a `u64` which entries of `type_ids_chunk` equal `type_id`:
/// bit `i` is set when `type_ids_chunk[i] == type_id`.
///
/// Callers in this file pass at most 64 type ids.
fn selection_mask(type_ids_chunk: &[i8], type_id: i8) -> u64 {
    let mut packed = 0_u64;
    for (bit_idx, &candidate) in type_ids_chunk.iter().enumerate() {
        packed |= u64::from(candidate == type_id) << bit_idx;
    }
    packed
}
1022
1023fn without_nulls_selected(type_ids_chunk: &[i8], without_nulls_ids: &[i8]) -> u64 {
1025 without_nulls_ids
1026 .iter()
1027 .fold(0, |fully_valid_selected, field_type_id| {
1028 fully_valid_selected | selection_mask(type_ids_chunk, *field_type_id)
1029 })
1030}
1031
1032#[cfg(test)]
1033mod tests {
1034 use super::*;
1035 use std::collections::HashSet;
1036
1037 use crate::array::Int8Type;
1038 use crate::builder::UnionBuilder;
1039 use crate::cast::AsArray;
1040 use crate::types::{Float32Type, Float64Type, Int32Type, Int64Type};
1041 use crate::{Float64Array, Int32Array, Int64Array, StringArray};
1042 use crate::{Int8Array, RecordBatch};
1043 use arrow_buffer::Buffer;
1044 use arrow_schema::{Field, Schema};
1045
1046 #[test]
1047 fn test_dense_i32() {
1048 let mut builder = UnionBuilder::new_dense();
1049 builder.append::<Int32Type>("a", 1).unwrap();
1050 builder.append::<Int32Type>("b", 2).unwrap();
1051 builder.append::<Int32Type>("c", 3).unwrap();
1052 builder.append::<Int32Type>("a", 4).unwrap();
1053 builder.append::<Int32Type>("c", 5).unwrap();
1054 builder.append::<Int32Type>("a", 6).unwrap();
1055 builder.append::<Int32Type>("b", 7).unwrap();
1056 let union = builder.build().unwrap();
1057
1058 let expected_type_ids = vec![0_i8, 1, 2, 0, 2, 0, 1];
1059 let expected_offsets = vec![0_i32, 0, 0, 1, 1, 2, 1];
1060 let expected_array_values = [1_i32, 2, 3, 4, 5, 6, 7];
1061
1062 assert_eq!(*union.type_ids(), expected_type_ids);
1064 for (i, id) in expected_type_ids.iter().enumerate() {
1065 assert_eq!(id, &union.type_id(i));
1066 }
1067
1068 assert_eq!(*union.offsets().unwrap(), expected_offsets);
1070 for (i, id) in expected_offsets.iter().enumerate() {
1071 assert_eq!(union.value_offset(i), *id as usize);
1072 }
1073
1074 assert_eq!(
1076 *union.child(0).as_primitive::<Int32Type>().values(),
1077 [1_i32, 4, 6]
1078 );
1079 assert_eq!(
1080 *union.child(1).as_primitive::<Int32Type>().values(),
1081 [2_i32, 7]
1082 );
1083 assert_eq!(
1084 *union.child(2).as_primitive::<Int32Type>().values(),
1085 [3_i32, 5]
1086 );
1087
1088 assert_eq!(expected_array_values.len(), union.len());
1089 for (i, expected_value) in expected_array_values.iter().enumerate() {
1090 assert!(!union.is_null(i));
1091 let slot = union.value(i);
1092 let slot = slot.as_any().downcast_ref::<Int32Array>().unwrap();
1093 assert_eq!(slot.len(), 1);
1094 let value = slot.value(0);
1095 assert_eq!(expected_value, &value);
1096 }
1097 }
1098
1099 #[test]
1100 fn slice_union_array_single_field() {
1101 let union_array = {
1104 let mut builder = UnionBuilder::new_dense();
1105 builder.append::<Int32Type>("a", 1).unwrap();
1106 builder.append_null::<Int32Type>("a").unwrap();
1107 builder.append::<Int32Type>("a", 3).unwrap();
1108 builder.append_null::<Int32Type>("a").unwrap();
1109 builder.append::<Int32Type>("a", 4).unwrap();
1110 builder.build().unwrap()
1111 };
1112
1113 let union_slice = union_array.slice(1, 3);
1115 let logical_nulls = union_slice.logical_nulls().unwrap();
1116
1117 assert_eq!(logical_nulls.len(), 3);
1118 assert!(logical_nulls.is_null(0));
1119 assert!(logical_nulls.is_valid(1));
1120 assert!(logical_nulls.is_null(2));
1121 }
1122
1123 #[test]
1124 #[cfg_attr(miri, ignore)]
1125 fn test_dense_i32_large() {
1126 let mut builder = UnionBuilder::new_dense();
1127
1128 let expected_type_ids = vec![0_i8; 1024];
1129 let expected_offsets: Vec<_> = (0..1024).collect();
1130 let expected_array_values: Vec<_> = (1..=1024).collect();
1131
1132 expected_array_values
1133 .iter()
1134 .for_each(|v| builder.append::<Int32Type>("a", *v).unwrap());
1135
1136 let union = builder.build().unwrap();
1137
1138 assert_eq!(*union.type_ids(), expected_type_ids);
1140 for (i, id) in expected_type_ids.iter().enumerate() {
1141 assert_eq!(id, &union.type_id(i));
1142 }
1143
1144 assert_eq!(*union.offsets().unwrap(), expected_offsets);
1146 for (i, id) in expected_offsets.iter().enumerate() {
1147 assert_eq!(union.value_offset(i), *id as usize);
1148 }
1149
1150 for (i, expected_value) in expected_array_values.iter().enumerate() {
1151 assert!(!union.is_null(i));
1152 let slot = union.value(i);
1153 let slot = slot.as_primitive::<Int32Type>();
1154 assert_eq!(slot.len(), 1);
1155 let value = slot.value(0);
1156 assert_eq!(expected_value, &value);
1157 }
1158 }
1159
1160 #[test]
1161 fn test_dense_mixed() {
1162 let mut builder = UnionBuilder::new_dense();
1163 builder.append::<Int32Type>("a", 1).unwrap();
1164 builder.append::<Int64Type>("c", 3).unwrap();
1165 builder.append::<Int32Type>("a", 4).unwrap();
1166 builder.append::<Int64Type>("c", 5).unwrap();
1167 builder.append::<Int32Type>("a", 6).unwrap();
1168 let union = builder.build().unwrap();
1169
1170 assert_eq!(5, union.len());
1171 for i in 0..union.len() {
1172 let slot = union.value(i);
1173 assert!(!union.is_null(i));
1174 match i {
1175 0 => {
1176 let slot = slot.as_any().downcast_ref::<Int32Array>().unwrap();
1177 assert_eq!(slot.len(), 1);
1178 let value = slot.value(0);
1179 assert_eq!(1_i32, value);
1180 }
1181 1 => {
1182 let slot = slot.as_any().downcast_ref::<Int64Array>().unwrap();
1183 assert_eq!(slot.len(), 1);
1184 let value = slot.value(0);
1185 assert_eq!(3_i64, value);
1186 }
1187 2 => {
1188 let slot = slot.as_any().downcast_ref::<Int32Array>().unwrap();
1189 assert_eq!(slot.len(), 1);
1190 let value = slot.value(0);
1191 assert_eq!(4_i32, value);
1192 }
1193 3 => {
1194 let slot = slot.as_any().downcast_ref::<Int64Array>().unwrap();
1195 assert_eq!(slot.len(), 1);
1196 let value = slot.value(0);
1197 assert_eq!(5_i64, value);
1198 }
1199 4 => {
1200 let slot = slot.as_any().downcast_ref::<Int32Array>().unwrap();
1201 assert_eq!(slot.len(), 1);
1202 let value = slot.value(0);
1203 assert_eq!(6_i32, value);
1204 }
1205 _ => unreachable!(),
1206 }
1207 }
1208 }
1209
1210 #[test]
1211 fn test_dense_mixed_with_nulls() {
1212 let mut builder = UnionBuilder::new_dense();
1213 builder.append::<Int32Type>("a", 1).unwrap();
1214 builder.append::<Int64Type>("c", 3).unwrap();
1215 builder.append::<Int32Type>("a", 10).unwrap();
1216 builder.append_null::<Int32Type>("a").unwrap();
1217 builder.append::<Int32Type>("a", 6).unwrap();
1218 let union = builder.build().unwrap();
1219
1220 assert_eq!(5, union.len());
1221 for i in 0..union.len() {
1222 let slot = union.value(i);
1223 match i {
1224 0 => {
1225 let slot = slot.as_any().downcast_ref::<Int32Array>().unwrap();
1226 assert!(!slot.is_null(0));
1227 assert_eq!(slot.len(), 1);
1228 let value = slot.value(0);
1229 assert_eq!(1_i32, value);
1230 }
1231 1 => {
1232 let slot = slot.as_any().downcast_ref::<Int64Array>().unwrap();
1233 assert!(!slot.is_null(0));
1234 assert_eq!(slot.len(), 1);
1235 let value = slot.value(0);
1236 assert_eq!(3_i64, value);
1237 }
1238 2 => {
1239 let slot = slot.as_any().downcast_ref::<Int32Array>().unwrap();
1240 assert!(!slot.is_null(0));
1241 assert_eq!(slot.len(), 1);
1242 let value = slot.value(0);
1243 assert_eq!(10_i32, value);
1244 }
1245 3 => assert!(slot.is_null(0)),
1246 4 => {
1247 let slot = slot.as_any().downcast_ref::<Int32Array>().unwrap();
1248 assert!(!slot.is_null(0));
1249 assert_eq!(slot.len(), 1);
1250 let value = slot.value(0);
1251 assert_eq!(6_i32, value);
1252 }
1253 _ => unreachable!(),
1254 }
1255 }
1256 }
1257
1258 #[test]
1259 fn test_dense_mixed_with_nulls_and_offset() {
1260 let mut builder = UnionBuilder::new_dense();
1261 builder.append::<Int32Type>("a", 1).unwrap();
1262 builder.append::<Int64Type>("c", 3).unwrap();
1263 builder.append::<Int32Type>("a", 10).unwrap();
1264 builder.append_null::<Int32Type>("a").unwrap();
1265 builder.append::<Int32Type>("a", 6).unwrap();
1266 let union = builder.build().unwrap();
1267
1268 let slice = union.slice(2, 3);
1269 let new_union = slice.as_any().downcast_ref::<UnionArray>().unwrap();
1270
1271 assert_eq!(3, new_union.len());
1272 for i in 0..new_union.len() {
1273 let slot = new_union.value(i);
1274 match i {
1275 0 => {
1276 let slot = slot.as_any().downcast_ref::<Int32Array>().unwrap();
1277 assert!(!slot.is_null(0));
1278 assert_eq!(slot.len(), 1);
1279 let value = slot.value(0);
1280 assert_eq!(10_i32, value);
1281 }
1282 1 => assert!(slot.is_null(0)),
1283 2 => {
1284 let slot = slot.as_any().downcast_ref::<Int32Array>().unwrap();
1285 assert!(!slot.is_null(0));
1286 assert_eq!(slot.len(), 1);
1287 let value = slot.value(0);
1288 assert_eq!(6_i32, value);
1289 }
1290 _ => unreachable!(),
1291 }
1292 }
1293 }
1294
1295 #[test]
1296 fn test_dense_mixed_with_str() {
1297 let string_array = StringArray::from(vec!["foo", "bar", "baz"]);
1298 let int_array = Int32Array::from(vec![5, 6]);
1299 let float_array = Float64Array::from(vec![10.0]);
1300
1301 let type_ids = [1, 0, 0, 2, 0, 1].into_iter().collect::<ScalarBuffer<i8>>();
1302 let offsets = [0, 0, 1, 0, 2, 1]
1303 .into_iter()
1304 .collect::<ScalarBuffer<i32>>();
1305
1306 let fields = [
1307 (0, Arc::new(Field::new("A", DataType::Utf8, false))),
1308 (1, Arc::new(Field::new("B", DataType::Int32, false))),
1309 (2, Arc::new(Field::new("C", DataType::Float64, false))),
1310 ]
1311 .into_iter()
1312 .collect::<UnionFields>();
1313 let children = [
1314 Arc::new(string_array) as Arc<dyn Array>,
1315 Arc::new(int_array),
1316 Arc::new(float_array),
1317 ]
1318 .into_iter()
1319 .collect();
1320 let array =
1321 UnionArray::try_new(fields, type_ids.clone(), Some(offsets.clone()), children).unwrap();
1322
1323 assert_eq!(*array.type_ids(), type_ids);
1325 for (i, id) in type_ids.iter().enumerate() {
1326 assert_eq!(id, &array.type_id(i));
1327 }
1328
1329 assert_eq!(*array.offsets().unwrap(), offsets);
1331 for (i, id) in offsets.iter().enumerate() {
1332 assert_eq!(*id as usize, array.value_offset(i));
1333 }
1334
1335 assert_eq!(6, array.len());
1337
1338 let slot = array.value(0);
1339 let value = slot.as_any().downcast_ref::<Int32Array>().unwrap().value(0);
1340 assert_eq!(5, value);
1341
1342 let slot = array.value(1);
1343 let value = slot
1344 .as_any()
1345 .downcast_ref::<StringArray>()
1346 .unwrap()
1347 .value(0);
1348 assert_eq!("foo", value);
1349
1350 let slot = array.value(2);
1351 let value = slot
1352 .as_any()
1353 .downcast_ref::<StringArray>()
1354 .unwrap()
1355 .value(0);
1356 assert_eq!("bar", value);
1357
1358 let slot = array.value(3);
1359 let value = slot
1360 .as_any()
1361 .downcast_ref::<Float64Array>()
1362 .unwrap()
1363 .value(0);
1364 assert_eq!(10.0, value);
1365
1366 let slot = array.value(4);
1367 let value = slot
1368 .as_any()
1369 .downcast_ref::<StringArray>()
1370 .unwrap()
1371 .value(0);
1372 assert_eq!("baz", value);
1373
1374 let slot = array.value(5);
1375 let value = slot.as_any().downcast_ref::<Int32Array>().unwrap().value(0);
1376 assert_eq!(6, value);
1377 }
1378
/// Builds a sparse union whose three fields are all `Int32` and verifies the
/// type ids, the absence of an offsets buffer, the raw child buffers and the
/// value observed at every slot.
#[test]
fn test_sparse_i32() {
    // (field name, value) pairs in append order; fields are assigned type ids
    // in order of first appearance, as the expected type ids below show.
    let appended = [
        ("a", 1_i32),
        ("b", 2),
        ("c", 3),
        ("a", 4),
        ("c", 5),
        ("a", 6),
        ("b", 7),
    ];

    let mut builder = UnionBuilder::new_sparse();
    for &(name, value) in &appended {
        builder.append::<Int32Type>(name, value).unwrap();
    }
    let union = builder.build().unwrap();

    // Type ids: compared as a whole buffer and slot by slot.
    let expected_type_ids = vec![0_i8, 1, 2, 0, 2, 0, 1];
    assert_eq!(*union.type_ids(), expected_type_ids);
    for (idx, want) in expected_type_ids.iter().enumerate() {
        assert_eq!(want, &union.type_id(idx));
    }

    // A sparse union carries no offsets buffer.
    assert!(union.offsets().is_none());

    // Every child spans the full union length; slots that belong to another
    // field are expected to hold 0.
    let expected_children = [
        (0_i8, [1_i32, 0, 0, 4, 0, 6, 0]),
        (1, [0, 2, 0, 0, 0, 0, 7]),
        (2, [0, 0, 3, 0, 5, 0, 0]),
    ];
    for (type_id, values) in expected_children {
        assert_eq!(
            *union.child(type_id).as_primitive::<Int32Type>().values(),
            values
        );
    }

    // Each slot yields a one-element, non-null Int32 array with the appended value.
    assert_eq!(appended.len(), union.len());
    for (idx, &(_, want)) in appended.iter().enumerate() {
        assert!(!union.is_null(idx));
        let slot = union.value(idx);
        let ints = slot.as_primitive::<Int32Type>();
        assert_eq!(ints.len(), 1);
        assert_eq!(want, ints.value(0));
    }
}
1427
/// Sparse union alternating between an `Int32` field ("a") and a `Float64`
/// field ("c"): checks type ids, the missing offsets buffer and each slot.
#[test]
fn test_sparse_mixed() {
    /// Value appended to, and later expected from, one union slot.
    enum Slot {
        Int(i32),
        Float(f64),
    }

    let slots = [
        Slot::Int(1),
        Slot::Float(3.0),
        Slot::Int(4),
        Slot::Float(5.0),
        Slot::Int(6),
    ];

    let mut builder = UnionBuilder::new_sparse();
    for slot in &slots {
        match slot {
            Slot::Int(v) => builder.append::<Int32Type>("a", *v).unwrap(),
            Slot::Float(v) => builder.append::<Float64Type>("c", *v).unwrap(),
        }
    }
    let union = builder.build().unwrap();

    // Type ids: whole buffer first, then slot by slot.
    let expected_type_ids = vec![0_i8, 1, 0, 1, 0];
    assert_eq!(*union.type_ids(), expected_type_ids);
    for (idx, want) in expected_type_ids.iter().enumerate() {
        assert_eq!(want, &union.type_id(idx));
    }

    // A sparse union carries no offsets buffer.
    assert!(union.offsets().is_none());

    for (idx, want) in slots.iter().enumerate() {
        let slot = union.value(idx);
        assert!(!union.is_null(idx));
        match want {
            Slot::Int(v) => {
                let ints = slot.as_primitive::<Int32Type>();
                assert_eq!(ints.len(), 1);
                assert_eq!(*v, ints.value(0));
            }
            Slot::Float(v) => {
                let floats = slot.as_primitive::<Float64Type>();
                assert_eq!(floats.len(), 1);
                assert_eq!(*v, floats.value(0));
            }
        }
    }
}
1487
/// Sparse union with a null appended to the `Int32` field: the null slot must
/// come back as a null one-element array, the others as valid values.
#[test]
fn test_sparse_mixed_with_nulls() {
    /// One union slot; `None` stands for a null appended to that field.
    enum Slot {
        Int(Option<i32>),
        Float(Option<f64>),
    }

    let slots = [
        Slot::Int(Some(1)),
        Slot::Int(None),
        Slot::Float(Some(3.0)),
        Slot::Int(Some(4)),
    ];

    let mut builder = UnionBuilder::new_sparse();
    for slot in &slots {
        match slot {
            Slot::Int(Some(v)) => builder.append::<Int32Type>("a", *v).unwrap(),
            Slot::Int(None) => builder.append_null::<Int32Type>("a").unwrap(),
            Slot::Float(Some(v)) => builder.append::<Float64Type>("c", *v).unwrap(),
            Slot::Float(None) => builder.append_null::<Float64Type>("c").unwrap(),
        }
    }
    let union = builder.build().unwrap();

    // Type ids: whole buffer first, then slot by slot.
    let expected_type_ids = vec![0_i8, 0, 1, 0];
    assert_eq!(*union.type_ids(), expected_type_ids);
    for (idx, want) in expected_type_ids.iter().enumerate() {
        assert_eq!(want, &union.type_id(idx));
    }

    // A sparse union carries no offsets buffer.
    assert!(union.offsets().is_none());

    for (idx, want) in slots.iter().enumerate() {
        let slot = union.value(idx);
        match want {
            Slot::Int(Some(v)) => {
                let ints = slot.as_primitive::<Int32Type>();
                assert!(!ints.is_null(0));
                assert_eq!(ints.len(), 1);
                assert_eq!(*v, ints.value(0));
            }
            Slot::Float(Some(v)) => {
                let floats = slot.as_primitive::<Float64Type>();
                assert!(!floats.is_null(0));
                assert_eq!(floats.len(), 1);
                assert_eq!(*v, floats.value(0));
            }
            // Null slots are only checked for nullness, not for their type.
            Slot::Int(None) | Slot::Float(None) => assert!(slot.is_null(0)),
        }
    }
}
1537
/// Slices a sparse union containing nulls and checks that the slice exposes
/// the slots starting at the slice offset.
#[test]
fn test_sparse_mixed_with_nulls_and_offset() {
    /// One union slot; `None` stands for a null appended to that field.
    enum Slot {
        Int(Option<i32>),
        Float(Option<f64>),
    }

    let slots = [
        Slot::Int(Some(1)),
        Slot::Int(None),
        Slot::Float(Some(3.0)),
        Slot::Float(None),
        Slot::Int(Some(4)),
    ];

    let mut builder = UnionBuilder::new_sparse();
    for slot in &slots {
        match slot {
            Slot::Int(Some(v)) => builder.append::<Int32Type>("a", *v).unwrap(),
            Slot::Int(None) => builder.append_null::<Int32Type>("a").unwrap(),
            Slot::Float(Some(v)) => builder.append::<Float64Type>("c", *v).unwrap(),
            Slot::Float(None) => builder.append_null::<Float64Type>("c").unwrap(),
        }
    }
    let union = builder.build().unwrap();

    // Drop the first slot and keep the remaining four.
    let slice = union.slice(1, 4);
    let new_union = slice.as_any().downcast_ref::<UnionArray>().unwrap();
    assert_eq!(4, new_union.len());

    for (idx, want) in slots[1..].iter().enumerate() {
        let slot = new_union.value(idx);
        match want {
            Slot::Int(Some(v)) => {
                let ints = slot.as_primitive::<Int32Type>();
                assert!(!ints.is_null(0));
                assert_eq!(ints.len(), 1);
                assert_eq!(*v, ints.value(0));
            }
            Slot::Float(Some(v)) => {
                let floats = slot.as_primitive::<Float64Type>();
                assert!(!floats.is_null(0));
                assert_eq!(floats.len(), 1);
                assert_eq!(*v, floats.value(0));
            }
            Slot::Int(None) | Slot::Float(None) => assert!(slot.is_null(0)),
        }
    }
}
1575
1576 fn test_union_validity(union_array: &UnionArray) {
1577 assert_eq!(union_array.null_count(), 0);
1578
1579 for i in 0..union_array.len() {
1580 assert!(!union_array.is_null(i));
1581 assert!(union_array.is_valid(i));
1582 }
1583 }
1584
/// Runs `test_union_validity` against a sparse and a dense union that were
/// built from the same sequence of appends, including null appends.
#[test]
fn test_union_array_validity() {
    // Sparse first, then dense; both receive identical appends.
    for mut builder in [UnionBuilder::new_sparse(), UnionBuilder::new_dense()] {
        builder.append::<Int32Type>("a", 1).unwrap();
        builder.append_null::<Int32Type>("a").unwrap();
        builder.append::<Float64Type>("c", 3.0).unwrap();
        builder.append_null::<Float64Type>("c").unwrap();
        builder.append::<Int32Type>("a", 4).unwrap();

        let union = builder.build().unwrap();
        test_union_validity(&union);
    }
}
1607
/// Appending to an existing field name with a different value type must fail
/// with an error message naming both the attempted and the existing type.
#[test]
fn test_type_check() {
    let mut builder = UnionBuilder::new_sparse();
    builder.append::<Float32Type>("a", 1.0).unwrap();

    let expected =
        "Attempt to write col \"a\" with type Int32 doesn't match existing type Float32";
    let message = builder.append::<Int32Type>("a", 1).unwrap_err().to_string();

    // On failure, print the actual message to ease debugging.
    assert!(message.contains(expected), "{}", message);
}
1621
/// Wraps a union in a `RecordBatch`, slices the batch, and checks that the
/// sliced union column exposes the expected type ids and values, for both the
/// sparse and the dense layout.
#[test]
fn slice_union_array() {
    /// Fills `builder` with: 1, null (Int32), 3.0, null (Float64), 4.
    fn create_union(mut builder: UnionBuilder) -> UnionArray {
        builder.append::<Int32Type>("a", 1).unwrap();
        builder.append_null::<Int32Type>("a").unwrap();
        builder.append::<Float64Type>("c", 3.0).unwrap();
        builder.append_null::<Float64Type>("c").unwrap();
        builder.append::<Int32Type>("a", 4).unwrap();
        builder.build().unwrap()
    }

    /// Builds a single-column batch holding `union`.
    fn create_batch(union: UnionArray) -> RecordBatch {
        let field = Field::new("struct_array", union.data_type().clone(), true);
        let schema = Arc::new(Schema::new(vec![field]));
        RecordBatch::try_new(schema, vec![Arc::new(union)]).unwrap()
    }

    /// Checks the three slots of a batch sliced with `slice(1, 3)`.
    fn test_slice_union(record_batch_slice: RecordBatch) {
        let column = record_batch_slice.column(0);
        let union_slice = column.as_any().downcast_ref::<UnionArray>().unwrap();

        for (idx, want) in [0_i8, 1, 1].into_iter().enumerate() {
            assert_eq!(union_slice.type_id(idx), want);
        }

        // Slot 0: the null Int32.
        let first = union_slice.value(0);
        let first = first.as_primitive::<Int32Type>();
        assert_eq!(first.len(), 1);
        assert!(first.is_null(0));

        // Slot 1: the Float64 value 3.0.
        let second = union_slice.value(1);
        let second = second.as_primitive::<Float64Type>();
        assert_eq!(second.len(), 1);
        assert!(second.is_valid(0));
        assert_eq!(second.value(0), 3.0);

        // Slot 2: the null Float64.
        let third = union_slice.value(2);
        let third = third.as_primitive::<Float64Type>();
        assert_eq!(third.len(), 1);
        assert!(third.is_null(0));
    }

    // Sparse first, then dense.
    for builder in [UnionBuilder::new_sparse(), UnionBuilder::new_dense()] {
        let record_batch = create_batch(create_union(builder));
        test_slice_union(record_batch.slice(1, 3));
    }
}
1686
/// Dense union whose fields use the non-contiguous type ids 8, 4 and 9:
/// every slot must resolve to the child selected by its type id and offset.
#[test]
fn test_custom_type_ids() {
    /// Expected content of one union slot.
    enum Slot {
        Int(i32),
        Float(f64),
        Str(&'static str),
    }

    let union_fields = UnionFields::try_new(
        vec![8, 4, 9],
        vec![
            Field::new("strings", DataType::Utf8, false),
            Field::new("integers", DataType::Int32, false),
            Field::new("floats", DataType::Float64, false),
        ],
    )
    .unwrap();
    let data_type = DataType::Union(union_fields, UnionMode::Dense);

    // Children, in field order.
    let strings = StringArray::from(vec!["foo", "bar", "baz"]);
    let integers = Int32Array::from(vec![5, 6, 4]);
    let floats = Float64Array::from(vec![10.0]);

    // Per-slot type id and index into the selected child.
    let type_ids = Buffer::from_vec(vec![4_i8, 8, 4, 8, 9, 4, 8]);
    let value_offsets = Buffer::from_vec(vec![0_i32, 0, 1, 1, 0, 2, 2]);

    let data = ArrayData::builder(data_type)
        .len(7)
        .buffers(vec![type_ids, value_offsets])
        .child_data(vec![
            strings.into_data(),
            integers.into_data(),
            floats.into_data(),
        ])
        .build()
        .unwrap();
    let array = UnionArray::from(data);

    let expected = [
        Slot::Int(5),
        Slot::Str("foo"),
        Slot::Int(6),
        Slot::Str("bar"),
        Slot::Float(10.0),
        Slot::Int(4),
        Slot::Str("baz"),
    ];

    for (idx, want) in expected.iter().enumerate() {
        let v = array.value(idx);
        match want {
            Slot::Int(x) => {
                assert_eq!(v.data_type(), &DataType::Int32);
                assert_eq!(v.len(), 1);
                assert_eq!(v.as_primitive::<Int32Type>().value(0), *x);
            }
            Slot::Float(x) => {
                assert_eq!(v.data_type(), &DataType::Float64);
                assert_eq!(v.len(), 1);
                assert_eq!(v.as_primitive::<Float64Type>().value(0), *x);
            }
            Slot::Str(s) => {
                assert_eq!(v.data_type(), &DataType::Utf8);
                assert_eq!(v.len(), 1);
                assert_eq!(v.as_string::<i32>().value(0), *s);
            }
        }
    }
}
1757
/// `into_parts` must hand back components from which an equivalent union can
/// be rebuilt with `try_new`, for both the dense and the sparse layout.
#[test]
fn into_parts() {
    // Dense layout.
    let mut dense_builder = UnionBuilder::new_dense();
    dense_builder.append::<Int32Type>("a", 1).unwrap();
    dense_builder.append::<Int8Type>("b", 2).unwrap();
    dense_builder.append::<Int32Type>("a", 3).unwrap();
    let dense_union = dense_builder.build().unwrap();

    let field_a = Arc::new(Field::new("a", DataType::Int32, false));
    let field_b = Arc::new(Field::new("b", DataType::Int8, false));
    let expected_fields = [&field_a, &field_b];

    let (union_fields, type_ids, offsets, children) = dense_union.into_parts();
    let actual_fields = union_fields
        .iter()
        .map(|(_, field)| field)
        .collect::<Vec<_>>();
    assert_eq!(actual_fields, expected_fields);
    assert_eq!(type_ids, [0, 1, 0]);
    // A dense union must return its offsets buffer.
    assert!(offsets.is_some());
    assert_eq!(offsets.as_ref().unwrap(), &[0, 0, 1]);

    let rebuilt = UnionArray::try_new(union_fields, type_ids, offsets, children);
    assert!(rebuilt.is_ok());
    assert_eq!(rebuilt.unwrap().len(), 3);

    // Sparse layout.
    let mut sparse_builder = UnionBuilder::new_sparse();
    sparse_builder.append::<Int32Type>("a", 1).unwrap();
    sparse_builder.append::<Int8Type>("b", 2).unwrap();
    sparse_builder.append::<Int32Type>("a", 3).unwrap();
    let sparse_union = sparse_builder.build().unwrap();

    let (union_fields, type_ids, offsets, children) = sparse_union.into_parts();
    assert_eq!(type_ids, [0, 1, 0]);
    // A sparse union has no offsets buffer to return.
    assert!(offsets.is_none());

    let rebuilt = UnionArray::try_new(union_fields, type_ids, offsets, children);
    assert!(rebuilt.is_ok());
    assert_eq!(rebuilt.unwrap().len(), 3);
}
1800
/// `into_parts` followed by `try_new` must round-trip a dense union whose
/// fields use the custom type ids 8, 4 and 9.
#[test]
fn into_parts_custom_type_ids() {
    let set_field_type_ids: [i8; 3] = [8, 4, 9];
    let declared_fields = UnionFields::try_new(
        set_field_type_ids,
        [
            Field::new("strings", DataType::Utf8, false),
            Field::new("integers", DataType::Int32, false),
            Field::new("floats", DataType::Float64, false),
        ],
    )
    .unwrap();
    let data_type = DataType::Union(declared_fields, UnionMode::Dense);

    // Children, in field order.
    let strings = StringArray::from(vec!["foo", "bar", "baz"]);
    let integers = Int32Array::from(vec![5, 6, 4]);
    let floats = Float64Array::from(vec![10.0]);

    let type_id_buffer = Buffer::from_vec(vec![4_i8, 8, 4, 8, 9, 4, 8]);
    let offset_buffer = Buffer::from_vec(vec![0_i32, 0, 1, 1, 0, 2, 2]);

    let data = ArrayData::builder(data_type)
        .len(7)
        .buffers(vec![type_id_buffer, offset_buffer])
        .child_data(vec![
            strings.into_data(),
            integers.into_data(),
            floats.into_data(),
        ])
        .build()
        .unwrap();
    let array = UnionArray::from(data);

    let (union_fields, type_ids, offsets, children) = array.into_parts();

    // The distinct type ids in use must be exactly the declared ones.
    let used_ids = type_ids.iter().collect::<HashSet<_>>();
    let declared_ids = set_field_type_ids.iter().collect::<HashSet<_>>();
    assert_eq!(used_ids, declared_ids);

    let rebuilt = UnionArray::try_new(union_fields, type_ids, offsets, children);
    assert!(rebuilt.is_ok());
    let array = rebuilt.unwrap();
    assert_eq!(array.len(), 7);
}
1843
/// Exercises the validation performed by `UnionArray::try_new`: each invalid
/// combination must be rejected with its specific error message.
#[test]
fn test_invalid() {
    /// Forwards plain vectors/slices to `UnionArray::try_new`.
    fn build(
        fields: &UnionFields,
        type_ids: Vec<i8>,
        offsets: Option<Vec<i32>>,
        children: &[ArrayRef],
    ) -> Result<UnionArray, ArrowError> {
        UnionArray::try_new(
            fields.clone(),
            ScalarBuffer::from(type_ids),
            offsets.map(ScalarBuffer::from),
            children.to_vec(),
        )
    }

    let fields = UnionFields::try_new(
        [3, 2],
        [
            Field::new("a", DataType::Utf8, false),
            Field::new("b", DataType::Utf8, false),
        ],
    )
    .unwrap();

    // Two children of length two each.
    let even_children = vec![
        Arc::new(StringArray::from_iter_values(["a", "b"])) as ArrayRef,
        Arc::new(StringArray::from_iter_values(["c", "d"])),
    ];

    // Sparse: three type ids but children of length two.
    let err = build(&fields, vec![3, 3, 2], None, &even_children).unwrap_err();
    assert_eq!(
        err.to_string(),
        "Invalid argument error: Sparse union child arrays must be equal in length to the length of the union"
    );

    // Sparse: type id 1 is not declared by the fields.
    let err = build(&fields, vec![1, 2], None, &even_children).unwrap_err();
    assert_eq!(
        err.to_string(),
        "Invalid argument error: Type Ids values must match one of the field type ids"
    );

    // Sparse: type id 7 is not declared by the fields.
    let err = build(&fields, vec![7, 2], None, &even_children).unwrap_err();
    assert_eq!(
        err.to_string(),
        "Invalid argument error: Type Ids values must match one of the field type ids"
    );

    // Children of length two and one, used for the dense cases.
    let uneven_children = vec![
        Arc::new(StringArray::from_iter_values(["a", "b"])) as ArrayRef,
        Arc::new(StringArray::from_iter_values(["c"])),
    ];
    let type_ids = vec![3_i8, 3, 2];

    // Dense: every offset points inside its child, so construction succeeds.
    build(&fields, type_ids.clone(), Some(vec![0, 1, 0]), &uneven_children).unwrap();

    // Dense: the last offset (1) is past the end of the one-element child.
    let err = build(&fields, type_ids.clone(), Some(vec![0, 1, 1]), &uneven_children)
        .unwrap_err();
    assert_eq!(
        err.to_string(),
        "Invalid argument error: Offsets must be non-negative and within the length of the Array"
    );

    // Dense: two offsets for three type ids.
    let err = build(&fields, type_ids.clone(), Some(vec![0, 1]), &uneven_children).unwrap_err();
    assert_eq!(
        err.to_string(),
        "Invalid argument error: Type Ids and Offsets lengths must match"
    );

    // Two fields declared but no children supplied.
    let err = build(&fields, type_ids, None, &[]).unwrap_err();
    assert_eq!(
        err.to_string(),
        "Invalid argument error: Union fields length must match child arrays length"
    );
}
1915
/// Covers inputs for which `logical_nulls` has a trivially known answer:
/// an empty union, children without any null, and children that are entirely
/// null (sparse and dense).
#[test]
fn test_logical_nulls_fast_paths() {
    /// Two `Int8` fields with type ids 1 and 3 and the given nullability flag.
    fn int8_fields(nullable: bool) -> UnionFields {
        UnionFields::try_new(
            [1, 3],
            [
                Field::new("a", DataType::Int8, nullable),
                Field::new("b", DataType::Int8, nullable),
            ],
        )
        .unwrap()
    }

    /// Two separately constructed children produced by `make`.
    fn two_children(make: impl Fn() -> Int8Array) -> Vec<ArrayRef> {
        vec![Arc::new(make()) as ArrayRef, Arc::new(make())]
    }

    // Union without fields or slots.
    let empty = UnionArray::try_new(UnionFields::empty(), vec![].into(), None, vec![]).unwrap();
    assert_eq!(empty.logical_nulls(), None);

    // Non-nullable fields, children contain no nulls.
    let non_nullable = UnionArray::try_new(
        int8_fields(false),
        vec![1].into(),
        None,
        two_children(|| Int8Array::from_value(5, 1)),
    )
    .unwrap();
    assert_eq!(non_nullable.logical_nulls(), None);

    // Nullable fields, but the children still contain no nulls.
    let nullable_without_nulls = UnionArray::try_new(
        int8_fields(true),
        vec![1, 1].into(),
        None,
        two_children(|| Int8Array::from_value(-5, 2)),
    )
    .unwrap();
    assert_eq!(nullable_without_nulls.logical_nulls(), None);

    // Sparse union whose children are entirely null.
    let sparse_all_null = UnionArray::try_new(
        int8_fields(true),
        vec![1, 1].into(),
        None,
        two_children(|| Int8Array::new_null(2)),
    )
    .unwrap();
    assert_eq!(
        sparse_all_null.logical_nulls(),
        Some(NullBuffer::new_null(2))
    );

    // Dense union whose children are entirely null; the children are longer
    // than the union itself.
    let dense_all_null = UnionArray::try_new(
        int8_fields(true),
        vec![1, 1].into(),
        Some(vec![0, 1].into()),
        two_children(|| Int8Array::new_null(3)),
    )
    .unwrap();
    assert_eq!(
        dense_all_null.logical_nulls(),
        Some(NullBuffer::new_null(2))
    );
}
1993
/// Dense union mixing a child without nulls, a partially null child and a
/// fully null child: `logical_nulls` and `gather_nulls` must agree on the
/// per-slot validity.
#[test]
fn test_dense_union_logical_nulls_gather() {
    let type_ids = ScalarBuffer::<i8>::from(vec![1, 1, 3, 3, 4, 4]);
    let offsets = ScalarBuffer::<i32>::from(vec![0, 1, 0, 1, 0, 0]);

    let children: Vec<ArrayRef> = vec![
        // No nulls.
        Arc::new(Int32Array::from(vec![1, 2])) as ArrayRef,
        // Second element is null.
        Arc::new(Float64Array::from(vec![Some(3.2), None])),
        // Single null element, referenced by both string slots.
        Arc::new(StringArray::new_null(1)),
    ];

    let array = UnionArray::try_new(union_fields(), type_ids, Some(offsets), children).unwrap();

    // `true` marks a valid slot.
    let expected = BooleanBuffer::from(vec![true, true, true, false, false, false]);

    assert_eq!(expected, array.logical_nulls().unwrap().into_inner());
    assert_eq!(expected, array.gather_nulls(array.fields_logical_nulls()));
}
2018
/// Sparse union where every child contains nulls and one child is entirely
/// null: checks `logical_nulls` and `mask_sparse_all_with_nulls_skip_one` on
/// a short array and on one longer than two 64-bit chunks.
#[test]
fn test_sparse_union_logical_nulls_mask_all_nulls_skip_one() {
    /// Both the public result and the specialised mask must equal `expected`.
    fn assert_nulls(array: &UnionArray, expected: &BooleanBuffer) {
        assert_eq!(*expected, array.logical_nulls().unwrap().into_inner());
        assert_eq!(
            *expected,
            array.mask_sparse_all_with_nulls_skip_one(array.fields_logical_nulls())
        );
    }

    let fields = UnionFields::from_iter([
        (1, Arc::new(Field::new("A", DataType::Int32, true))),
        (3, Arc::new(Field::new("B", DataType::Float64, true))),
    ]);

    // Short input: Int32 child fully null, Float64 child valid only at index 2.
    let short_children: Vec<ArrayRef> = vec![
        Arc::new(Int32Array::new_null(4)) as ArrayRef,
        Arc::new(Float64Array::from(vec![None, None, Some(3.2), None])),
    ];
    let short_type_ids = ScalarBuffer::<i8>::from(vec![1, 1, 3, 3]);
    let short =
        UnionArray::try_new(fields.clone(), short_type_ids, None, short_children).unwrap();

    let short_expected = BooleanBuffer::from(vec![false, false, true, false]);
    assert_nulls(&short, &short_expected);

    // Long input: 160 slots, built by repeating short patterns.
    let len = 2 * 64 + 32;

    let long_children: Vec<ArrayRef> = vec![
        Arc::new(Int32Array::new_null(len)) as ArrayRef,
        Arc::new(Float64Array::from_iter(
            [Some(3.2), None].into_iter().cycle().take(len),
        )),
    ];
    let long_type_ids = [1, 1, 3, 3]
        .into_iter()
        .cycle()
        .take(len)
        .collect::<ScalarBuffer<i8>>();
    let long = UnionArray::try_new(fields, long_type_ids, None, long_children).unwrap();

    let long_expected =
        BooleanBuffer::from_iter([false, false, true, false].into_iter().cycle().take(len));

    assert_eq!(long.len(), len);
    assert_nulls(&long, &long_expected);
}
2070
/// Sparse union where two children have no nulls and one does: checks
/// `logical_nulls` and `mask_sparse_skip_without_nulls` on a short array and
/// on one longer than two 64-bit chunks.
#[test]
fn test_sparse_union_logical_mask_mixed_nulls_skip_fully_valid() {
    /// Sparse union over the shared `union_fields()`.
    fn sparse_union(type_ids: ScalarBuffer<i8>, children: Vec<ArrayRef>) -> UnionArray {
        UnionArray::try_new(union_fields(), type_ids, None, children).unwrap()
    }

    /// Both the public result and the specialised mask must equal `expected`.
    fn assert_nulls(array: &UnionArray, expected: &BooleanBuffer) {
        assert_eq!(*expected, array.logical_nulls().unwrap().into_inner());
        assert_eq!(
            *expected,
            array.mask_sparse_skip_without_nulls(array.fields_logical_nulls())
        );
    }

    // Short input: only the string child has nulls, and it is entirely null.
    let short = sparse_union(
        ScalarBuffer::<i8>::from(vec![1, 1, 3, 3, 4, 4]),
        vec![
            Arc::new(Int32Array::from_value(2, 6)) as ArrayRef,
            Arc::new(Float64Array::from_value(4.2, 6)),
            Arc::new(StringArray::new_null(6)),
        ],
    );
    let short_expected = BooleanBuffer::from(vec![true, true, true, true, false, false]);
    assert_nulls(&short, &short_expected);

    // Long input: 160 slots; the string child alternates null / "a".
    let len = 2 * 64 + 32;

    let long = sparse_union(
        [1, 1, 3, 3, 4, 4]
            .into_iter()
            .cycle()
            .take(len)
            .collect::<ScalarBuffer<i8>>(),
        vec![
            Arc::new(Int32Array::from_value(2, len)) as ArrayRef,
            Arc::new(Float64Array::from_value(4.2, len)),
            Arc::new(StringArray::from_iter(
                [None, Some("a")].into_iter().cycle().take(len),
            )),
        ],
    );
    let long_expected = BooleanBuffer::from_iter(
        [true, true, true, true, false, true]
            .into_iter()
            .cycle()
            .take(len),
    );

    assert_eq!(long.len(), len);
    assert_nulls(&long, &long_expected);
}
2125
/// Sparse union where two children are entirely null and one has no nulls:
/// checks `logical_nulls` and `mask_sparse_skip_fully_null` on a short array
/// and on one longer than two 64-bit chunks.
#[test]
fn test_sparse_union_logical_mask_mixed_nulls_skip_fully_null() {
    /// Sparse union over the shared `union_fields()` with a fully null Int32
    /// child, a Float64 child without nulls and a fully null Utf8 child.
    fn sparse_union(type_ids: ScalarBuffer<i8>, len: usize) -> UnionArray {
        let children: Vec<ArrayRef> = vec![
            Arc::new(Int32Array::new_null(len)) as ArrayRef,
            Arc::new(Float64Array::from_value(4.2, len)),
            Arc::new(StringArray::new_null(len)),
        ];
        UnionArray::try_new(union_fields(), type_ids, None, children).unwrap()
    }

    /// Both the public result and the specialised mask must equal `expected`.
    fn assert_nulls(array: &UnionArray, expected: &BooleanBuffer) {
        assert_eq!(*expected, array.logical_nulls().unwrap().into_inner());
        assert_eq!(
            *expected,
            array.mask_sparse_skip_fully_null(array.fields_logical_nulls())
        );
    }

    // Short input: six slots, two per field.
    let short = sparse_union(ScalarBuffer::<i8>::from(vec![1, 1, 3, 3, 4, 4]), 6);
    let short_expected = BooleanBuffer::from(vec![false, false, true, true, false, false]);
    assert_nulls(&short, &short_expected);

    // Long input: 160 slots repeating the same type id pattern.
    let len = 2 * 64 + 32;

    let long_type_ids = [1, 1, 3, 3, 4, 4]
        .into_iter()
        .cycle()
        .take(len)
        .collect::<ScalarBuffer<i8>>();
    let long = sparse_union(long_type_ids, len);
    let long_expected = BooleanBuffer::from_iter(
        [false, false, true, true, false, false]
            .into_iter()
            .cycle()
            .take(len),
    );

    assert_eq!(long.len(), len);
    assert_nulls(&long, &long_expected);
}
2180
/// Sparse union with 50 fields whose children cycle through "no nulls",
/// "partially null" and "fully null": `logical_nulls` and `gather_nulls`
/// must agree on the per-slot validity.
#[test]
fn test_sparse_union_logical_nulls_gather() {
    let n_fields = 50;

    // Odd type ids 1, 3, 5, ... paired with nullable Int32 fields f1, f3, ...
    let fields: UnionFields = (1..)
        .step_by(2)
        .map(|id| {
            let field = Field::new(format!("f{id}"), DataType::Int32, true);
            (id, Arc::new(field))
        })
        .take(n_fields)
        .collect();

    // Child kinds repeated in this order until every field has a child.
    let child_pattern = [
        Arc::new(Int32Array::from_value(2, 4)) as ArrayRef,
        Arc::new(Int32Array::from(vec![None, None, Some(1), None])),
        Arc::new(Int32Array::new_null(4)),
    ];
    let children: Vec<ArrayRef> = child_pattern.into_iter().cycle().take(n_fields).collect();

    let array = UnionArray::try_new(fields, vec![1, 3, 3, 5].into(), None, children).unwrap();

    // `true` marks a valid slot.
    let expected = BooleanBuffer::from(vec![true, false, true, false]);

    assert_eq!(expected, array.logical_nulls().unwrap().into_inner());
    assert_eq!(expected, array.gather_nulls(array.fields_logical_nulls()));
}
2219
2220 fn union_fields() -> UnionFields {
2221 [
2222 (1, Arc::new(Field::new("A", DataType::Int32, true))),
2223 (3, Arc::new(Field::new("B", DataType::Float64, true))),
2224 (4, Arc::new(Field::new("C", DataType::Utf8, true))),
2225 ]
2226 .into_iter()
2227 .collect()
2228 }
2229
/// A union built by `create_union_array` reports itself as nullable exactly
/// when at least one of its children was built with a null.
#[test]
fn test_is_nullable() {
    // (int child has a null, float child has a null, expected `is_nullable`)
    let cases = [
        (false, false, false),
        (true, false, true),
        (false, true, true),
        (true, true, true),
    ];

    for (int_nullable, float_nullable, expected) in cases {
        let union = create_union_array(int_nullable, float_nullable);
        assert!(union.is_nullable() == expected);
    }
}
2237
2238 fn create_union_array(int_nullable: bool, float_nullable: bool) -> UnionArray {
2245 let int_array = if int_nullable {
2246 Int32Array::from(vec![Some(1), None, Some(3)])
2247 } else {
2248 Int32Array::from(vec![1, 2, 3])
2249 };
2250 let float_array = if float_nullable {
2251 Float64Array::from(vec![Some(3.2), None, Some(4.2)])
2252 } else {
2253 Float64Array::from(vec![3.2, 4.2, 5.2])
2254 };
2255 let type_ids = [0, 1, 0].into_iter().collect::<ScalarBuffer<i8>>();
2256 let offsets = [0, 0, 0].into_iter().collect::<ScalarBuffer<i32>>();
2257 let union_fields = [
2258 (0, Arc::new(Field::new("A", DataType::Int32, true))),
2259 (1, Arc::new(Field::new("B", DataType::Float64, true))),
2260 ]
2261 .into_iter()
2262 .collect::<UnionFields>();
2263
2264 let children = vec![Arc::new(int_array) as Arc<dyn Array>, Arc::new(float_array)];
2265
2266 UnionArray::try_new(union_fields, type_ids, Some(offsets), children).unwrap()
2267 }
2268}