1use crate::VariantArrayBuilder;
21use crate::type_conversion::{
22    generic_conversion_single_value, generic_conversion_single_value_with_result,
23    primitive_conversion_single_value,
24};
25use arrow::array::{Array, ArrayRef, AsArray, BinaryViewArray, StructArray};
26use arrow::buffer::NullBuffer;
27use arrow::compute::cast;
28use arrow::datatypes::{
29    Date32Type, Decimal32Type, Decimal64Type, Decimal128Type, Float16Type, Float32Type,
30    Float64Type, Int8Type, Int16Type, Int32Type, Int64Type, Time64MicrosecondType,
31    TimestampMicrosecondType, TimestampNanosecondType,
32};
33use arrow::error::Result;
34use arrow_schema::extension::ExtensionType;
35use arrow_schema::{ArrowError, DataType, Field, FieldRef, Fields, TimeUnit};
36use chrono::{DateTime, NaiveTime};
37use parquet_variant::{
38    Uuid, Variant, VariantDecimal4, VariantDecimal8, VariantDecimal16, VariantDecimalType as _,
39};
40
41use std::borrow::Cow;
42use std::sync::Arc;
43
44pub struct VariantType;
49
50impl ExtensionType for VariantType {
51    const NAME: &'static str = "arrow.parquet.variant";
52
53    type Metadata = &'static str;
56
57    fn metadata(&self) -> &Self::Metadata {
58        &""
59    }
60
61    fn serialize_metadata(&self) -> Option<String> {
62        Some(String::new())
63    }
64
65    fn deserialize_metadata(_metadata: Option<&str>) -> Result<Self::Metadata> {
66        Ok("")
67    }
68
69    fn supports_data_type(&self, data_type: &DataType) -> Result<()> {
70        if matches!(data_type, DataType::Struct(_)) {
71            Ok(())
72        } else {
73            Err(ArrowError::InvalidArgumentError(format!(
74                "VariantType only supports StructArray, got {data_type}"
75            )))
76        }
77    }
78
79    fn try_new(data_type: &DataType, _metadata: Self::Metadata) -> Result<Self> {
80        Self.supports_data_type(data_type)?;
81        Ok(Self)
82    }
83}
84
85#[derive(Debug, Clone, PartialEq)]
217pub struct VariantArray {
218    inner: StructArray,
220
221    metadata: BinaryViewArray,
223
224    shredding_state: ShreddingState,
226}
227
228impl VariantArray {
229    pub fn try_new(inner: &dyn Array) -> Result<Self> {
257        let inner = cast_to_binary_view_arrays(inner)?;
260
261        let Some(inner) = inner.as_struct_opt() else {
262            return Err(ArrowError::InvalidArgumentError(
263                "Invalid VariantArray: requires StructArray as input".to_string(),
264            ));
265        };
266
267        let Some(metadata_field) = inner.column_by_name("metadata") else {
271            return Err(ArrowError::InvalidArgumentError(
272                "Invalid VariantArray: StructArray must contain a 'metadata' field".to_string(),
273            ));
274        };
275        let Some(metadata) = metadata_field.as_binary_view_opt() else {
276            return Err(ArrowError::NotYetImplemented(format!(
277                "VariantArray 'metadata' field must be BinaryView, got {}",
278                metadata_field.data_type()
279            )));
280        };
281
282        Ok(Self {
284            inner: inner.clone(),
285            metadata: metadata.clone(),
286            shredding_state: ShreddingState::try_from(inner)?,
287        })
288    }
289
290    pub(crate) fn from_parts(
291        metadata: BinaryViewArray,
292        value: Option<BinaryViewArray>,
293        typed_value: Option<ArrayRef>,
294        nulls: Option<NullBuffer>,
295    ) -> Self {
296        let mut builder =
297            StructArrayBuilder::new().with_field("metadata", Arc::new(metadata.clone()), false);
298        if let Some(value) = value.clone() {
299            builder = builder.with_field("value", Arc::new(value), true);
300        }
301        if let Some(typed_value) = typed_value.clone() {
302            builder = builder.with_field("typed_value", typed_value, true);
303        }
304        if let Some(nulls) = nulls {
305            builder = builder.with_nulls(nulls);
306        }
307
308        Self {
309            inner: builder.build(),
310            metadata,
311            shredding_state: ShreddingState::new(value, typed_value),
312        }
313    }
314
315    pub fn inner(&self) -> &StructArray {
317        &self.inner
318    }
319
320    pub fn into_inner(self) -> StructArray {
322        self.inner
323    }
324
325    pub fn shredding_state(&self) -> &ShreddingState {
327        &self.shredding_state
328    }
329
330    pub fn value(&self, index: usize) -> Variant<'_, '_> {
340        self.try_value(index).unwrap()
341    }
342
343    pub fn try_value(&self, index: usize) -> Result<Variant<'_, '_>> {
371        match (self.typed_value_field(), self.value_field()) {
372            (Some(typed_value), value) if typed_value.is_valid(index) => {
374                typed_value_to_variant(typed_value, value, index)
375            }
376            (_, Some(value)) if value.is_valid(index) => {
378                Ok(Variant::new(self.metadata.value(index), value.value(index)))
379            }
380            _ => Ok(Variant::Null),
383        }
384    }
385
386    pub fn metadata_field(&self) -> &BinaryViewArray {
388        &self.metadata
389    }
390
391    pub fn value_field(&self) -> Option<&BinaryViewArray> {
393        self.shredding_state.value_field()
394    }
395
396    pub fn typed_value_field(&self) -> Option<&ArrayRef> {
398        self.shredding_state.typed_value_field()
399    }
400
401    pub fn field(&self, name: impl Into<String>) -> Field {
404        Field::new(
405            name.into(),
406            self.data_type().clone(),
407            self.inner.is_nullable(),
408        )
409        .with_extension_type(VariantType)
410    }
411
412    pub fn data_type(&self) -> &DataType {
414        self.inner.data_type()
415    }
416
417    pub fn slice(&self, offset: usize, length: usize) -> Self {
418        let inner = self.inner.slice(offset, length);
419        let metadata = self.metadata.slice(offset, length);
420        let shredding_state = self.shredding_state.slice(offset, length);
421        Self {
422            inner,
423            metadata,
424            shredding_state,
425        }
426    }
427
428    pub fn len(&self) -> usize {
429        self.inner.len()
430    }
431
432    pub fn is_empty(&self) -> bool {
433        self.inner.is_empty()
434    }
435
436    pub fn nulls(&self) -> Option<&NullBuffer> {
437        self.inner.nulls()
438    }
439
440    pub fn is_null(&self, index: usize) -> bool {
442        self.nulls().is_some_and(|n| n.is_null(index))
443    }
444
445    pub fn is_valid(&self, index: usize) -> bool {
447        !self.is_null(index)
448    }
449
450    pub fn iter(&self) -> VariantArrayIter<'_> {
452        VariantArrayIter::new(self)
453    }
454}
455
456impl From<VariantArray> for StructArray {
457    fn from(variant_array: VariantArray) -> Self {
458        variant_array.into_inner()
459    }
460}
461
462impl From<VariantArray> for ArrayRef {
463    fn from(variant_array: VariantArray) -> Self {
464        Arc::new(variant_array.into_inner())
465    }
466}
467
468impl<'m, 'v> FromIterator<Option<Variant<'m, 'v>>> for VariantArray {
469    fn from_iter<T: IntoIterator<Item = Option<Variant<'m, 'v>>>>(iter: T) -> Self {
470        let iter = iter.into_iter();
471
472        let mut b = VariantArrayBuilder::new(iter.size_hint().0);
473        b.extend(iter);
474        b.build()
475    }
476}
477
478impl<'m, 'v> FromIterator<Variant<'m, 'v>> for VariantArray {
479    fn from_iter<T: IntoIterator<Item = Variant<'m, 'v>>>(iter: T) -> Self {
480        Self::from_iter(iter.into_iter().map(Some))
481    }
482}
483
484#[derive(Debug)]
509pub struct VariantArrayIter<'a> {
510    array: &'a VariantArray,
511    head_i: usize,
512    tail_i: usize,
513}
514
515impl<'a> VariantArrayIter<'a> {
516    pub fn new(array: &'a VariantArray) -> Self {
518        Self {
519            array,
520            head_i: 0,
521            tail_i: array.len(),
522        }
523    }
524
525    fn value_opt(&self, i: usize) -> Option<Variant<'a, 'a>> {
526        self.array.is_valid(i).then(|| self.array.value(i))
527    }
528}
529
530impl<'a> Iterator for VariantArrayIter<'a> {
531    type Item = Option<Variant<'a, 'a>>;
532
533    #[inline]
534    fn next(&mut self) -> Option<Self::Item> {
535        if self.head_i == self.tail_i {
536            return None;
537        }
538
539        let out = self.value_opt(self.head_i);
540
541        self.head_i += 1;
542
543        Some(out)
544    }
545
546    fn size_hint(&self) -> (usize, Option<usize>) {
547        let remainder = self.tail_i - self.head_i;
548
549        (remainder, Some(remainder))
550    }
551}
552
553impl<'a> DoubleEndedIterator for VariantArrayIter<'a> {
554    fn next_back(&mut self) -> Option<Self::Item> {
555        if self.head_i == self.tail_i {
556            return None;
557        }
558
559        self.tail_i -= 1;
560
561        Some(self.value_opt(self.tail_i))
562    }
563}
564
565impl<'a> ExactSizeIterator for VariantArrayIter<'a> {}
566
567#[derive(Debug)]
602pub struct ShreddedVariantFieldArray {
603    inner: StructArray,
605    shredding_state: ShreddingState,
606}
607
608#[allow(unused)]
609impl ShreddedVariantFieldArray {
610    pub fn try_new(inner: &dyn Array) -> Result<Self> {
631        let Some(inner_struct) = inner.as_struct_opt() else {
632            return Err(ArrowError::InvalidArgumentError(
633                "Invalid ShreddedVariantFieldArray: requires StructArray as input".to_string(),
634            ));
635        };
636
637        Ok(Self {
639            inner: inner_struct.clone(),
640            shredding_state: ShreddingState::try_from(inner_struct)?,
641        })
642    }
643
644    pub fn shredding_state(&self) -> &ShreddingState {
646        &self.shredding_state
647    }
648
649    pub fn value_field(&self) -> Option<&BinaryViewArray> {
651        self.shredding_state.value_field()
652    }
653
654    pub fn typed_value_field(&self) -> Option<&ArrayRef> {
656        self.shredding_state.typed_value_field()
657    }
658
659    pub fn inner(&self) -> &StructArray {
661        &self.inner
662    }
663
664    pub(crate) fn from_parts(
665        value: Option<BinaryViewArray>,
666        typed_value: Option<ArrayRef>,
667        nulls: Option<NullBuffer>,
668    ) -> Self {
669        let mut builder = StructArrayBuilder::new();
670        if let Some(value) = value.clone() {
671            builder = builder.with_field("value", Arc::new(value), true);
672        }
673        if let Some(typed_value) = typed_value.clone() {
674            builder = builder.with_field("typed_value", typed_value, true);
675        }
676        if let Some(nulls) = nulls {
677            builder = builder.with_nulls(nulls);
678        }
679
680        Self {
681            inner: builder.build(),
682            shredding_state: ShreddingState::new(value, typed_value),
683        }
684    }
685
686    pub fn into_inner(self) -> StructArray {
688        self.inner
689    }
690
691    pub fn data_type(&self) -> &DataType {
692        self.inner.data_type()
693    }
694
695    pub fn len(&self) -> usize {
696        self.inner.len()
697    }
698
699    pub fn is_empty(&self) -> bool {
700        self.inner.is_empty()
701    }
702
703    pub fn offset(&self) -> usize {
704        self.inner.offset()
705    }
706
707    pub fn nulls(&self) -> Option<&NullBuffer> {
708        None
712    }
713    pub fn is_null(&self, index: usize) -> bool {
715        self.nulls().is_some_and(|n| n.is_null(index))
716    }
717
718    pub fn is_valid(&self, index: usize) -> bool {
720        !self.is_null(index)
721    }
722}
723
724impl From<ShreddedVariantFieldArray> for ArrayRef {
725    fn from(array: ShreddedVariantFieldArray) -> Self {
726        Arc::new(array.into_inner())
727    }
728}
729
730impl From<ShreddedVariantFieldArray> for StructArray {
731    fn from(array: ShreddedVariantFieldArray) -> Self {
732        array.into_inner()
733    }
734}
735
736#[derive(Debug, Clone, PartialEq)]
770pub struct ShreddingState {
771    value: Option<BinaryViewArray>,
772    typed_value: Option<ArrayRef>,
773}
774
775impl ShreddingState {
776    pub fn new(value: Option<BinaryViewArray>, typed_value: Option<ArrayRef>) -> Self {
791        Self { value, typed_value }
792    }
793
794    pub fn value_field(&self) -> Option<&BinaryViewArray> {
796        self.value.as_ref()
797    }
798
799    pub fn typed_value_field(&self) -> Option<&ArrayRef> {
801        self.typed_value.as_ref()
802    }
803
804    pub fn borrow(&self) -> BorrowedShreddingState<'_> {
806        BorrowedShreddingState {
807            value: self.value_field(),
808            typed_value: self.typed_value_field(),
809        }
810    }
811
812    pub fn slice(&self, offset: usize, length: usize) -> Self {
814        Self {
815            value: self.value.as_ref().map(|v| v.slice(offset, length)),
816            typed_value: self.typed_value.as_ref().map(|tv| tv.slice(offset, length)),
817        }
818    }
819}
820
821#[derive(Clone, Debug)]
824pub struct BorrowedShreddingState<'a> {
825    value: Option<&'a BinaryViewArray>,
826    typed_value: Option<&'a ArrayRef>,
827}
828
829impl<'a> BorrowedShreddingState<'a> {
830    pub fn new(value: Option<&'a BinaryViewArray>, typed_value: Option<&'a ArrayRef>) -> Self {
845        Self { value, typed_value }
846    }
847
848    pub fn value_field(&self) -> Option<&'a BinaryViewArray> {
850        self.value
851    }
852
853    pub fn typed_value_field(&self) -> Option<&'a ArrayRef> {
855        self.typed_value
856    }
857}
858
859impl<'a> TryFrom<&'a StructArray> for BorrowedShreddingState<'a> {
860    type Error = ArrowError;
861
862    fn try_from(inner_struct: &'a StructArray) -> Result<Self> {
863        let value = if let Some(value_col) = inner_struct.column_by_name("value") {
865            let Some(binary_view) = value_col.as_binary_view_opt() else {
866                return Err(ArrowError::NotYetImplemented(format!(
867                    "VariantArray 'value' field must be BinaryView, got {}",
868                    value_col.data_type()
869                )));
870            };
871            Some(binary_view)
872        } else {
873            None
874        };
875        let typed_value = inner_struct.column_by_name("typed_value");
876        Ok(BorrowedShreddingState::new(value, typed_value))
877    }
878}
879
880impl TryFrom<&StructArray> for ShreddingState {
881    type Error = ArrowError;
882
883    fn try_from(inner_struct: &StructArray) -> Result<Self> {
884        Ok(BorrowedShreddingState::try_from(inner_struct)?.into())
885    }
886}
887
888impl From<BorrowedShreddingState<'_>> for ShreddingState {
889    fn from(state: BorrowedShreddingState<'_>) -> Self {
890        ShreddingState {
891            value: state.value_field().cloned(),
892            typed_value: state.typed_value_field().cloned(),
893        }
894    }
895}
896
897#[derive(Debug, Default, Clone)]
901pub(crate) struct StructArrayBuilder {
902    fields: Vec<FieldRef>,
903    arrays: Vec<ArrayRef>,
904    nulls: Option<NullBuffer>,
905}
906
907impl StructArrayBuilder {
908    pub fn new() -> Self {
909        Default::default()
910    }
911
912    pub fn with_field(mut self, field_name: &str, array: ArrayRef, nullable: bool) -> Self {
914        let field = Field::new(field_name, array.data_type().clone(), nullable);
915        self.fields.push(Arc::new(field));
916        self.arrays.push(array);
917        self
918    }
919
920    pub fn with_nulls(mut self, nulls: NullBuffer) -> Self {
922        self.nulls = Some(nulls);
923        self
924    }
925
926    pub fn build(self) -> StructArray {
927        let Self {
928            fields,
929            arrays,
930            nulls,
931        } = self;
932        StructArray::new(Fields::from(fields), arrays, nulls)
933    }
934}
935
936fn typed_value_to_variant<'a>(
938    typed_value: &'a ArrayRef,
939    value: Option<&BinaryViewArray>,
940    index: usize,
941) -> Result<Variant<'a, 'a>> {
942    let data_type = typed_value.data_type();
943    if value.is_some_and(|v| !matches!(data_type, DataType::Struct(_)) && v.is_valid(index)) {
944        panic!("Invalid variant, conflicting value and typed_value");
946    }
947    match data_type {
948        DataType::Null => Ok(Variant::Null),
949        DataType::Boolean => {
950            let boolean_array = typed_value.as_boolean();
951            let value = boolean_array.value(index);
952            Ok(Variant::from(value))
953        }
954        DataType::FixedSizeBinary(16) => {
956            let array = typed_value.as_fixed_size_binary();
957            let value = array.value(index);
958            Ok(Uuid::from_slice(value).unwrap().into()) }
960        DataType::BinaryView => {
961            let array = typed_value.as_binary_view();
962            let value = array.value(index);
963            Ok(Variant::from(value))
964        }
965        DataType::Utf8 => {
966            let array = typed_value.as_string::<i32>();
967            let value = array.value(index);
968            Ok(Variant::from(value))
969        }
970        DataType::Int8 => {
971            primitive_conversion_single_value!(Int8Type, typed_value, index)
972        }
973        DataType::Int16 => {
974            primitive_conversion_single_value!(Int16Type, typed_value, index)
975        }
976        DataType::Int32 => {
977            primitive_conversion_single_value!(Int32Type, typed_value, index)
978        }
979        DataType::Int64 => {
980            primitive_conversion_single_value!(Int64Type, typed_value, index)
981        }
982        DataType::Float16 => {
983            primitive_conversion_single_value!(Float16Type, typed_value, index)
984        }
985        DataType::Float32 => {
986            primitive_conversion_single_value!(Float32Type, typed_value, index)
987        }
988        DataType::Float64 => {
989            primitive_conversion_single_value!(Float64Type, typed_value, index)
990        }
991        DataType::Decimal32(_, s) => {
992            generic_conversion_single_value_with_result!(
993                Decimal32Type,
994                as_primitive,
995                |v| VariantDecimal4::try_new(v, *s as u8),
996                typed_value,
997                index
998            )
999        }
1000        DataType::Decimal64(_, s) => {
1001            generic_conversion_single_value_with_result!(
1002                Decimal64Type,
1003                as_primitive,
1004                |v| VariantDecimal8::try_new(v, *s as u8),
1005                typed_value,
1006                index
1007            )
1008        }
1009        DataType::Decimal128(_, s) => {
1010            generic_conversion_single_value_with_result!(
1011                Decimal128Type,
1012                as_primitive,
1013                |v| VariantDecimal16::try_new(v, *s as u8),
1014                typed_value,
1015                index
1016            )
1017        }
1018        DataType::Date32 => {
1019            generic_conversion_single_value!(
1020                Date32Type,
1021                as_primitive,
1022                Date32Type::to_naive_date,
1023                typed_value,
1024                index
1025            )
1026        }
1027        DataType::Time64(TimeUnit::Microsecond) => {
1028            generic_conversion_single_value_with_result!(
1029                Time64MicrosecondType,
1030                as_primitive,
1031                |v| NaiveTime::from_num_seconds_from_midnight_opt(
1032                    (v / 1_000_000) as u32,
1033                    (v % 1_000_000) as u32 * 1000
1034                )
1035                .ok_or_else(|| format!("Invalid microsecond from midnight: {}", v)),
1036                typed_value,
1037                index
1038            )
1039        }
1040        DataType::Timestamp(TimeUnit::Microsecond, Some(_)) => {
1041            generic_conversion_single_value!(
1042                TimestampMicrosecondType,
1043                as_primitive,
1044                |v| DateTime::from_timestamp_micros(v).unwrap(),
1045                typed_value,
1046                index
1047            )
1048        }
1049        DataType::Timestamp(TimeUnit::Microsecond, None) => {
1050            generic_conversion_single_value!(
1051                TimestampMicrosecondType,
1052                as_primitive,
1053                |v| DateTime::from_timestamp_micros(v).unwrap().naive_utc(),
1054                typed_value,
1055                index
1056            )
1057        }
1058        DataType::Timestamp(TimeUnit::Nanosecond, Some(_)) => {
1059            generic_conversion_single_value!(
1060                TimestampNanosecondType,
1061                as_primitive,
1062                DateTime::from_timestamp_nanos,
1063                typed_value,
1064                index
1065            )
1066        }
1067        DataType::Timestamp(TimeUnit::Nanosecond, None) => {
1068            generic_conversion_single_value!(
1069                TimestampNanosecondType,
1070                as_primitive,
1071                |v| DateTime::from_timestamp_nanos(v).naive_utc(),
1072                typed_value,
1073                index
1074            )
1075        }
1076        _ => {
1079            debug_assert!(
1083                false,
1084                "Unsupported typed_value type: {}",
1085                typed_value.data_type()
1086            );
1087            Ok(Variant::Null)
1088        }
1089    }
1090}
1091
1092fn cast_to_binary_view_arrays(array: &dyn Array) -> Result<ArrayRef> {
1103    let new_type = canonicalize_and_verify_data_type(array.data_type())?;
1104    if let Cow::Borrowed(_) = new_type {
1105        if let Some(array) = array.as_struct_opt() {
1106            return Ok(Arc::new(array.clone())); }
1108    }
1109    cast(array, new_type.as_ref())
1110}
1111
1112fn canonicalize_and_verify_data_type(data_type: &DataType) -> Result<Cow<'_, DataType>> {
1116    use DataType::*;
1117
1118    macro_rules! fail {
1120        () => {
1121            return Err(ArrowError::InvalidArgumentError(format!(
1122                "Illegal shredded value type: {data_type}"
1123            )))
1124        };
1125    }
1126    macro_rules! borrow {
1127        () => {
1128            Cow::Borrowed(data_type)
1129        };
1130    }
1131
1132    let new_data_type = match data_type {
1133        Null | Boolean => borrow!(),
1135        Int8 | Int16 | Int32 | Int64 | Float32 | Float64 => borrow!(),
1136
1137        UInt8 | UInt16 | UInt32 | UInt64 | Float16 => fail!(),
1139
1140        Decimal64(p, s) | Decimal128(p, s)
1145            if VariantDecimal4::is_valid_precision_and_scale(p, s) =>
1146        {
1147            Cow::Owned(Decimal32(*p, *s))
1148        }
1149        Decimal128(p, s) if VariantDecimal8::is_valid_precision_and_scale(p, s) => {
1150            Cow::Owned(Decimal64(*p, *s))
1151        }
1152        Decimal32(p, s) if VariantDecimal4::is_valid_precision_and_scale(p, s) => borrow!(),
1153        Decimal64(p, s) if VariantDecimal8::is_valid_precision_and_scale(p, s) => borrow!(),
1154        Decimal128(p, s) if VariantDecimal16::is_valid_precision_and_scale(p, s) => borrow!(),
1155        Decimal32(..) | Decimal64(..) | Decimal128(..) | Decimal256(..) => fail!(),
1156
1157        Timestamp(TimeUnit::Microsecond | TimeUnit::Nanosecond, _) => borrow!(),
1159        Timestamp(TimeUnit::Millisecond | TimeUnit::Second, _) => fail!(),
1160
1161        Date32 | Time64(TimeUnit::Microsecond) => borrow!(),
1163        Date64 | Time32(_) | Time64(_) | Duration(_) | Interval(_) => fail!(),
1164
1165        Binary => Cow::Owned(DataType::BinaryView),
1168        BinaryView | Utf8 => borrow!(),
1169
1170        FixedSizeBinary(16) => borrow!(),
1172        FixedSizeBinary(_) | FixedSizeList(..) => fail!(),
1173
1174        LargeBinary | LargeUtf8 | Utf8View | ListView(_) | LargeList(_) | LargeListView(_) => {
1176            fail!()
1177        }
1178
1179        List(field) => match canonicalize_and_verify_field(field)? {
1181            Cow::Borrowed(_) => borrow!(),
1182            Cow::Owned(new_field) => Cow::Owned(DataType::List(new_field)),
1183        },
1184        Struct(fields) => {
1186            let mut new_fields = std::collections::HashMap::new();
1189            for (i, field) in fields.iter().enumerate() {
1190                if let Cow::Owned(new_field) = canonicalize_and_verify_field(field)? {
1191                    new_fields.insert(i, new_field);
1192                }
1193            }
1194
1195            if new_fields.is_empty() {
1196                borrow!()
1197            } else {
1198                let new_fields = fields
1199                    .iter()
1200                    .enumerate()
1201                    .map(|(i, field)| new_fields.remove(&i).unwrap_or_else(|| field.clone()));
1202                Cow::Owned(DataType::Struct(new_fields.collect()))
1203            }
1204        }
1205        Map(..) | Union(..) => fail!(),
1206
1207        Dictionary(..) | RunEndEncoded(..) => fail!(),
1209    };
1210    Ok(new_data_type)
1211}
1212
1213fn canonicalize_and_verify_field(field: &Arc<Field>) -> Result<Cow<'_, Arc<Field>>> {
1214    let Cow::Owned(new_data_type) = canonicalize_and_verify_data_type(field.data_type())? else {
1215        return Ok(Cow::Borrowed(field));
1216    };
1217    let new_field = field.as_ref().clone().with_data_type(new_data_type);
1218    Ok(Cow::Owned(Arc::new(new_field)))
1219}
1220
1221#[cfg(test)]
1222mod test {
1223    use crate::VariantArrayBuilder;
1224    use std::str::FromStr;
1225
1226    use super::*;
1227    use arrow::array::{
1228        BinaryViewArray, Decimal32Array, Decimal64Array, Decimal128Array, Int32Array,
1229        Time64MicrosecondArray,
1230    };
1231    use arrow_schema::{Field, Fields};
1232    use parquet_variant::{EMPTY_VARIANT_METADATA_BYTES, ShortString};
1233
1234    #[test]
1235    fn invalid_not_a_struct_array() {
1236        let array = make_binary_view_array();
1237        let err = VariantArray::try_new(&array);
1239        assert_eq!(
1240            err.unwrap_err().to_string(),
1241            "Invalid argument error: Invalid VariantArray: requires StructArray as input"
1242        );
1243    }
1244
1245    #[test]
1246    fn invalid_missing_metadata() {
1247        let fields = Fields::from(vec![Field::new("value", DataType::BinaryView, true)]);
1248        let array = StructArray::new(fields, vec![make_binary_view_array()], None);
1249        let err = VariantArray::try_new(&array);
1251        assert_eq!(
1252            err.unwrap_err().to_string(),
1253            "Invalid argument error: Invalid VariantArray: StructArray must contain a 'metadata' field"
1254        );
1255    }
1256
1257    #[test]
1258    fn all_null_missing_value_and_typed_value() {
1259        let fields = Fields::from(vec![Field::new("metadata", DataType::BinaryView, false)]);
1260        let array = StructArray::new(fields, vec![make_binary_view_array()], None);
1261
1262        let variant_array = VariantArray::try_new(&array).unwrap();
1266
1267        assert!(matches!(
1269            variant_array.shredding_state(),
1270            ShreddingState {
1271                value: None,
1272                typed_value: None
1273            }
1274        ));
1275
1276        for i in 0..variant_array.len() {
1278            if variant_array.is_valid(i) {
1279                assert_eq!(variant_array.value(i), parquet_variant::Variant::Null);
1280            }
1281        }
1282    }
1283
1284    #[test]
1285    fn invalid_metadata_field_type() {
1286        let fields = Fields::from(vec![
1287            Field::new("metadata", DataType::Int32, true), Field::new("value", DataType::BinaryView, true),
1289        ]);
1290        let array = StructArray::new(
1291            fields,
1292            vec![make_int32_array(), make_binary_view_array()],
1293            None,
1294        );
1295        let err = VariantArray::try_new(&array);
1296        assert_eq!(
1297            err.unwrap_err().to_string(),
1298            "Not yet implemented: VariantArray 'metadata' field must be BinaryView, got Int32"
1299        );
1300    }
1301
1302    #[test]
1303    fn invalid_value_field_type() {
1304        let fields = Fields::from(vec![
1305            Field::new("metadata", DataType::BinaryView, true),
1306            Field::new("value", DataType::Int32, true), ]);
1308        let array = StructArray::new(
1309            fields,
1310            vec![make_binary_view_array(), make_int32_array()],
1311            None,
1312        );
1313        let err = VariantArray::try_new(&array);
1314        assert_eq!(
1315            err.unwrap_err().to_string(),
1316            "Not yet implemented: VariantArray 'value' field must be BinaryView, got Int32"
1317        );
1318    }
1319
1320    fn make_binary_view_array() -> ArrayRef {
1321        Arc::new(BinaryViewArray::from(vec![b"test" as &[u8]]))
1322    }
1323
1324    fn make_int32_array() -> ArrayRef {
1325        Arc::new(Int32Array::from(vec![1]))
1326    }
1327
1328    #[test]
1329    fn all_null_shredding_state() {
1330        assert!(matches!(
1332            ShreddingState::new(None, None),
1333            ShreddingState {
1334                value: None,
1335                typed_value: None
1336            }
1337        ));
1338    }
1339
1340    #[test]
1341    fn all_null_variant_array_construction() {
1342        let metadata = BinaryViewArray::from(vec![b"test" as &[u8]; 3]);
1343        let nulls = NullBuffer::from(vec![false, false, false]); let fields = Fields::from(vec![Field::new("metadata", DataType::BinaryView, false)]);
1346        let struct_array = StructArray::new(fields, vec![Arc::new(metadata)], Some(nulls));
1347
1348        let variant_array = VariantArray::try_new(&struct_array).unwrap();
1349
1350        assert!(matches!(
1352            variant_array.shredding_state(),
1353            ShreddingState {
1354                value: None,
1355                typed_value: None
1356            }
1357        ));
1358
1359        assert_eq!(variant_array.len(), 3);
1361        assert!(!variant_array.is_valid(0));
1362        assert!(!variant_array.is_valid(1));
1363        assert!(!variant_array.is_valid(2));
1364
1365        for i in 0..variant_array.len() {
1367            assert!(
1368                !variant_array.is_valid(i),
1369                "Expected value at index {i} to be null"
1370            );
1371        }
1372    }
1373
1374    #[test]
1375    fn value_field_present_but_all_null_should_be_unshredded() {
1376        let metadata = BinaryViewArray::from(vec![b"test" as &[u8]; 3]);
1379
1380        let value_nulls = NullBuffer::from(vec![false, false, false]); let value_array = BinaryViewArray::from_iter_values(vec![""; 3]);
1383        let value_data = value_array
1384            .to_data()
1385            .into_builder()
1386            .nulls(Some(value_nulls))
1387            .build()
1388            .unwrap();
1389        let value = BinaryViewArray::from(value_data);
1390
1391        let fields = Fields::from(vec![
1392            Field::new("metadata", DataType::BinaryView, false),
1393            Field::new("value", DataType::BinaryView, true), ]);
1395        let struct_array = StructArray::new(
1396            fields,
1397            vec![Arc::new(metadata), Arc::new(value)],
1398            None, );
1400
1401        let variant_array = VariantArray::try_new(&struct_array).unwrap();
1402
1403        assert!(matches!(
1405            variant_array.shredding_state(),
1406            ShreddingState {
1407                value: Some(_),
1408                typed_value: None
1409            }
1410        ));
1411    }
1412
1413    #[test]
1414    fn test_variant_array_iterable() {
1415        let mut b = VariantArrayBuilder::new(6);
1416
1417        b.append_null();
1418        b.append_variant(Variant::from(1_i8));
1419        b.append_variant(Variant::Null);
1420        b.append_variant(Variant::from(2_i32));
1421        b.append_variant(Variant::from(3_i64));
1422        b.append_null();
1423
1424        let v = b.build();
1425
1426        let variants = v.iter().collect::<Vec<_>>();
1427
1428        assert_eq!(
1429            variants,
1430            vec![
1431                None,
1432                Some(Variant::Int8(1)),
1433                Some(Variant::Null),
1434                Some(Variant::Int32(2)),
1435                Some(Variant::Int64(3)),
1436                None,
1437            ]
1438        );
1439    }
1440
1441    #[test]
1442    fn test_variant_array_iter_double_ended() {
1443        let mut b = VariantArrayBuilder::new(5);
1444
1445        b.append_variant(Variant::from(0_i32));
1446        b.append_null();
1447        b.append_variant(Variant::from(2_i32));
1448        b.append_null();
1449        b.append_variant(Variant::from(4_i32));
1450
1451        let array = b.build();
1452        let mut iter = array.iter();
1453
1454        assert_eq!(iter.next(), Some(Some(Variant::from(0_i32))));
1455        assert_eq!(iter.next(), Some(None));
1456
1457        assert_eq!(iter.next_back(), Some(Some(Variant::from(4_i32))));
1458        assert_eq!(iter.next_back(), Some(None));
1459        assert_eq!(iter.next_back(), Some(Some(Variant::from(2_i32))));
1460
1461        assert_eq!(iter.next_back(), None);
1462        assert_eq!(iter.next(), None);
1463    }
1464
1465    #[test]
1466    fn test_variant_array_iter_reverse() {
1467        let mut b = VariantArrayBuilder::new(5);
1468
1469        b.append_variant(Variant::from("a"));
1470        b.append_null();
1471        b.append_variant(Variant::from("aaa"));
1472        b.append_null();
1473        b.append_variant(Variant::from("aaaaa"));
1474
1475        let array = b.build();
1476
1477        let result: Vec<_> = array.iter().rev().collect();
1478        assert_eq!(
1479            result,
1480            vec![
1481                Some(Variant::from("aaaaa")),
1482                None,
1483                Some(Variant::from("aaa")),
1484                None,
1485                Some(Variant::from("a")),
1486            ]
1487        );
1488    }
1489
1490    #[test]
1491    fn test_variant_array_iter_empty() {
1492        let v = VariantArrayBuilder::new(0).build();
1493        let mut i = v.iter();
1494        assert!(i.next().is_none());
1495        assert!(i.next_back().is_none());
1496    }
1497
1498    #[test]
1499    fn test_from_variant_opts_into_variant_array() {
1500        let v = vec![None, Some(Variant::Null), Some(Variant::BooleanFalse), None];
1501
1502        let variant_array = VariantArray::from_iter(v);
1503
1504        assert_eq!(variant_array.len(), 4);
1505
1506        assert!(variant_array.is_null(0));
1507
1508        assert!(!variant_array.is_null(1));
1509        assert_eq!(variant_array.value(1), Variant::Null);
1510
1511        assert!(!variant_array.is_null(2));
1512        assert_eq!(variant_array.value(2), Variant::BooleanFalse);
1513
1514        assert!(variant_array.is_null(3));
1515    }
1516
1517    #[test]
1518    fn test_from_variants_into_variant_array() {
1519        let v = vec![
1520            Variant::Null,
1521            Variant::BooleanFalse,
1522            Variant::ShortString(ShortString::try_new("norm").unwrap()),
1523        ];
1524
1525        let variant_array = VariantArray::from_iter(v);
1526
1527        assert_eq!(variant_array.len(), 3);
1528
1529        assert!(!variant_array.is_null(0));
1530        assert_eq!(variant_array.value(0), Variant::Null);
1531
1532        assert!(!variant_array.is_null(1));
1533        assert_eq!(variant_array.value(1), Variant::BooleanFalse);
1534
1535        assert!(!variant_array.is_null(2));
1536        assert_eq!(
1537            variant_array.value(2),
1538            Variant::ShortString(ShortString::try_new("norm").unwrap())
1539        );
1540    }
1541
1542    #[test]
1543    fn test_variant_equality() {
1544        let v_iter = [None, Some(Variant::BooleanFalse), Some(Variant::Null), None];
1545        let v = VariantArray::from_iter(v_iter.clone());
1546
1547        {
1548            let v_copy = v.clone();
1549            assert_eq!(v, v_copy);
1550        }
1551
1552        {
1553            let v_iter_reversed = v_iter.iter().cloned().rev();
1554            let v_reversed = VariantArray::from_iter(v_iter_reversed);
1555
1556            assert_ne!(v, v_reversed);
1557        }
1558
1559        {
1560            let v_sliced = v.slice(0, 1);
1561            assert_ne!(v, v_sliced);
1562        }
1563    }
1564
1565    macro_rules! invalid_variant_array_test {
1566        ($fn_name: ident, $invalid_typed_value: expr, $error_msg: literal) => {
1567            #[test]
1568            fn $fn_name() {
1569                let metadata = BinaryViewArray::from_iter_values(std::iter::repeat_n(
1570                    EMPTY_VARIANT_METADATA_BYTES,
1571                    1,
1572                ));
1573                let invalid_typed_value = $invalid_typed_value;
1574
1575                let struct_array = StructArrayBuilder::new()
1576                    .with_field("metadata", Arc::new(metadata), false)
1577                    .with_field("typed_value", Arc::new(invalid_typed_value), true)
1578                    .build();
1579
1580                let array: VariantArray = VariantArray::try_new(&struct_array)
1581                    .expect("should create variant array")
1582                    .into();
1583
1584                let result = array.try_value(0);
1585                assert!(result.is_err());
1586                let error = result.unwrap_err();
1587                assert!(matches!(error, ArrowError::CastError(_)));
1588
1589                let expected: &str = $error_msg;
1590                assert!(
1591                    error.to_string().contains($error_msg),
1592                    "error `{}` did not contain `{}`",
1593                    error,
1594                    expected
1595                )
1596            }
1597        };
1598    }
1599
    // 86_401_000_000 µs is 86_401 s, one second more than the 86_400 s in a day, so the
    // value is rejected as an invalid microsecond-from-midnight.
    // NOTE(review): `invalide` in the test name looks like a typo for `invalid`.
    invalid_variant_array_test!(
        test_variant_array_invalide_time,
        Time64MicrosecondArray::from(vec![Some(86401000000)]),
        "Cast error: Cast failed at index 0 (array type: Time64(µs)): Invalid microsecond from midnight: 86401000000"
    );
1605
    // 1234567890 has 10 digits, one more than the max precision of 9 named in the
    // expected error for the `Decimal32(9, 2)` array.
    invalid_variant_array_test!(
        test_variant_array_invalid_decimal32,
        Decimal32Array::from(vec![Some(1234567890)]),
        "Cast error: Cast failed at index 0 (array type: Decimal32(9, 2)): Invalid argument error: 1234567890 is wider than max precision 9"
    );
1611
    // 1234567890123456789 has 19 digits, one more than the max precision of 18 named in
    // the expected error for the `Decimal64(18, 6)` array.
    invalid_variant_array_test!(
        test_variant_array_invalid_decimal64,
        Decimal64Array::from(vec![Some(1234567890123456789)]),
        "Cast error: Cast failed at index 0 (array type: Decimal64(18, 6)): Invalid argument error: 1234567890123456789 is wider than max precision 18"
    );
1617
    // The parsed literal has 39 digits, one more than the max precision of 38 named in
    // the expected error for the `Decimal128(38, 10)` array.
    invalid_variant_array_test!(
        test_variant_array_invalid_decimal128,
        Decimal128Array::from(vec![Some(
            i128::from_str("123456789012345678901234567890123456789").unwrap()
        ),]),
        "Cast error: Cast failed at index 0 (array type: Decimal128(38, 10)): Invalid argument error: 123456789012345678901234567890123456789 is wider than max precision 38"
    );
1625}