Skip to main content

parquet_variant_compute/
shred_variant.rs

1// Licensed to the Apache Software Foundation (ASF) under one
2// or more contributor license agreements.  See the NOTICE file
3// distributed with this work for additional information
4// regarding copyright ownership.  The ASF licenses this file
5// to you under the Apache License, Version 2.0 (the
6// "License"); you may not use this file except in compliance
7// with the License.  You may obtain a copy of the License at
8//
9//   http://www.apache.org/licenses/LICENSE-2.0
10//
11// Unless required by applicable law or agreed to in writing,
12// software distributed under the License is distributed on an
13// "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
14// KIND, either express or implied.  See the License for the
15// specific language governing permissions and limitations
16// under the License.
17
18//! Module for shredding VariantArray with a given schema.
19
20use crate::variant_array::{ShreddedVariantFieldArray, StructArrayBuilder};
21use crate::variant_to_arrow::{
22    ArrayVariantToArrowRowBuilder, PrimitiveVariantToArrowRowBuilder,
23    make_primitive_variant_to_arrow_row_builder,
24};
25use crate::{VariantArray, VariantValueArrayBuilder};
26use arrow::array::{ArrayRef, BinaryViewArray, NullBufferBuilder};
27use arrow::buffer::NullBuffer;
28use arrow::compute::CastOptions;
29use arrow::datatypes::{DataType, Field, FieldRef, Fields, TimeUnit};
30use arrow::error::{ArrowError, Result};
31use indexmap::IndexMap;
32use parquet_variant::{Variant, VariantBuilderExt, VariantPath, VariantPathElement};
33use std::collections::BTreeMap;
34use std::sync::Arc;
35
36/// Shreds the input binary variant using a target shredding schema derived from the requested data type.
37///
38/// For example, requesting `DataType::Int64` would produce an output variant array with the schema:
39///
40/// ```text
41/// {
42///    metadata: BINARY,
43///    value: BINARY,
44///    typed_value: LONG,
45/// }
46/// ```
47///
48/// Similarly, requesting `DataType::Struct` with two integer fields `a` and `b` would produce an
49/// output variant array with the schema:
50///
51/// ```text
52/// {
53///   metadata: BINARY,
54///   value: BINARY,
55///   typed_value: {
56///     a: {
57///       value: BINARY,
58///       typed_value: INT,
59///     },
60///     b: {
61///       value: BINARY,
62///       typed_value: INT,
63///     },
64///   }
65/// }
66/// ```
67///
68/// See [`ShreddedSchemaBuilder`] for a convenient way to build the `as_type`
69/// value passed to this function.
70pub fn shred_variant(array: &VariantArray, as_type: &DataType) -> Result<VariantArray> {
71    if array.typed_value_field().is_some() {
72        return Err(ArrowError::InvalidArgumentError(
73            "Input is already shredded".to_string(),
74        ));
75    }
76
77    if array.value_field().is_none() {
78        // all-null case -- nothing to do.
79        return Ok(array.clone());
80    };
81
82    let cast_options = CastOptions::default();
83    let mut builder = make_variant_to_shredded_variant_arrow_row_builder(
84        as_type,
85        &cast_options,
86        array.len(),
87        true,
88    )?;
89    for i in 0..array.len() {
90        if array.is_null(i) {
91            builder.append_null()?;
92        } else {
93            builder.append_value(array.value(i))?;
94        }
95    }
96    let (value, typed_value, nulls) = builder.finish()?;
97    Ok(VariantArray::from_parts(
98        array.metadata_field().clone(),
99        Some(value),
100        Some(typed_value),
101        nulls,
102    ))
103}
104
105pub(crate) fn make_variant_to_shredded_variant_arrow_row_builder<'a>(
106    data_type: &'a DataType,
107    cast_options: &'a CastOptions,
108    capacity: usize,
109    top_level: bool,
110) -> Result<VariantToShreddedVariantRowBuilder<'a>> {
111    let builder = match data_type {
112        DataType::Struct(fields) => {
113            let typed_value_builder = VariantToShreddedObjectVariantRowBuilder::try_new(
114                fields,
115                cast_options,
116                capacity,
117                top_level,
118            )?;
119            VariantToShreddedVariantRowBuilder::Object(typed_value_builder)
120        }
121        DataType::List(_)
122        | DataType::LargeList(_)
123        | DataType::ListView(_)
124        | DataType::LargeListView(_)
125        | DataType::FixedSizeList(..) => {
126            let typed_value_builder = VariantToShreddedArrayVariantRowBuilder::try_new(
127                data_type,
128                cast_options,
129                capacity,
130            )?;
131            VariantToShreddedVariantRowBuilder::Array(typed_value_builder)
132        }
133        // Supported shredded primitive types, see Variant shredding spec:
134        // https://github.com/apache/parquet-format/blob/master/VariantShredding.md#shredded-value-types
135        DataType::Boolean
136        | DataType::Int8
137        | DataType::Int16
138        | DataType::Int32
139        | DataType::Int64
140        | DataType::Float32
141        | DataType::Float64
142        | DataType::Decimal32(..)
143        | DataType::Decimal64(..)
144        | DataType::Decimal128(..)
145        | DataType::Date32
146        | DataType::Time64(TimeUnit::Microsecond)
147        | DataType::Timestamp(TimeUnit::Microsecond | TimeUnit::Nanosecond, _)
148        | DataType::Binary
149        | DataType::BinaryView
150        | DataType::LargeBinary
151        | DataType::Utf8
152        | DataType::Utf8View
153        | DataType::LargeUtf8
154        | DataType::FixedSizeBinary(16) // UUID
155        => {
156            let builder =
157                make_primitive_variant_to_arrow_row_builder(data_type, cast_options, capacity)?;
158            let typed_value_builder =
159                VariantToShreddedPrimitiveVariantRowBuilder::new(builder, capacity, top_level);
160            VariantToShreddedVariantRowBuilder::Primitive(typed_value_builder)
161        }
162        DataType::FixedSizeBinary(_) => {
163            return Err(ArrowError::InvalidArgumentError(format!("{data_type} is not a valid variant shredding type. Only FixedSizeBinary(16) for UUID is supported.")))
164        }
165        _ => {
166            return Err(ArrowError::InvalidArgumentError(format!("{data_type} is not a valid variant shredding type")))
167        }
168    };
169    Ok(builder)
170}
171
172pub(crate) enum VariantToShreddedVariantRowBuilder<'a> {
173    Primitive(VariantToShreddedPrimitiveVariantRowBuilder<'a>),
174    Array(VariantToShreddedArrayVariantRowBuilder<'a>),
175    Object(VariantToShreddedObjectVariantRowBuilder<'a>),
176}
177
178impl<'a> VariantToShreddedVariantRowBuilder<'a> {
179    pub fn append_null(&mut self) -> Result<()> {
180        use VariantToShreddedVariantRowBuilder::*;
181        match self {
182            Primitive(b) => b.append_null(),
183            Array(b) => b.append_null(),
184            Object(b) => b.append_null(),
185        }
186    }
187
188    pub fn append_value(&mut self, value: Variant<'_, '_>) -> Result<bool> {
189        use VariantToShreddedVariantRowBuilder::*;
190        match self {
191            Primitive(b) => b.append_value(value),
192            Array(b) => b.append_value(value),
193            Object(b) => b.append_value(value),
194        }
195    }
196
197    pub fn finish(self) -> Result<(BinaryViewArray, ArrayRef, Option<NullBuffer>)> {
198        use VariantToShreddedVariantRowBuilder::*;
199        match self {
200            Primitive(b) => b.finish(),
201            Array(b) => b.finish(),
202            Object(b) => b.finish(),
203        }
204    }
205}
206
207/// A top-level variant shredder -- appending NULL produces typed_value=NULL and value=Variant::Null
208pub(crate) struct VariantToShreddedPrimitiveVariantRowBuilder<'a> {
209    value_builder: VariantValueArrayBuilder,
210    typed_value_builder: PrimitiveVariantToArrowRowBuilder<'a>,
211    nulls: NullBufferBuilder,
212    top_level: bool,
213}
214
215impl<'a> VariantToShreddedPrimitiveVariantRowBuilder<'a> {
216    pub(crate) fn new(
217        typed_value_builder: PrimitiveVariantToArrowRowBuilder<'a>,
218        capacity: usize,
219        top_level: bool,
220    ) -> Self {
221        Self {
222            value_builder: VariantValueArrayBuilder::new(capacity),
223            typed_value_builder,
224            nulls: NullBufferBuilder::new(capacity),
225            top_level,
226        }
227    }
228
229    fn append_null(&mut self) -> Result<()> {
230        // Only the top-level struct that represents the variant can be nullable; object fields and
231        // array elements are non-nullable.
232        self.nulls.append(!self.top_level);
233        self.value_builder.append_null();
234        self.typed_value_builder.append_null()
235    }
236
237    fn append_value(&mut self, value: Variant<'_, '_>) -> Result<bool> {
238        self.nulls.append_non_null();
239        if self.typed_value_builder.append_value(&value)? {
240            self.value_builder.append_null();
241        } else {
242            self.value_builder.append_value(value);
243        }
244        Ok(true)
245    }
246
247    fn finish(mut self) -> Result<(BinaryViewArray, ArrayRef, Option<NullBuffer>)> {
248        Ok((
249            self.value_builder.build()?,
250            self.typed_value_builder.finish()?,
251            self.nulls.finish(),
252        ))
253    }
254}
255
256pub(crate) struct VariantToShreddedArrayVariantRowBuilder<'a> {
257    value_builder: VariantValueArrayBuilder,
258    typed_value_builder: ArrayVariantToArrowRowBuilder<'a>,
259}
260
261impl<'a> VariantToShreddedArrayVariantRowBuilder<'a> {
262    fn try_new(
263        data_type: &'a DataType,
264        cast_options: &'a CastOptions,
265        capacity: usize,
266    ) -> Result<Self> {
267        Ok(Self {
268            value_builder: VariantValueArrayBuilder::new(capacity),
269            typed_value_builder: ArrayVariantToArrowRowBuilder::try_new(
270                data_type,
271                cast_options,
272                capacity,
273            )?,
274        })
275    }
276
277    fn append_null(&mut self) -> Result<()> {
278        self.value_builder.append_value(Variant::Null);
279        self.typed_value_builder.append_null()?;
280        Ok(())
281    }
282
283    fn append_value(&mut self, variant: Variant<'_, '_>) -> Result<bool> {
284        // If the variant is not an array, typed_value must be null.
285        // If the variant is an array, value must be null.
286        match variant {
287            Variant::List(list) => {
288                self.value_builder.append_null();
289                self.typed_value_builder
290                    .append_value(&Variant::List(list))?;
291                Ok(true)
292            }
293            other => {
294                self.value_builder.append_value(other);
295                self.typed_value_builder.append_null()?;
296                Ok(false)
297            }
298        }
299    }
300
301    fn finish(self) -> Result<(BinaryViewArray, ArrayRef, Option<NullBuffer>)> {
302        Ok((
303            self.value_builder.build()?,
304            self.typed_value_builder.finish()?,
305            // All elements of an array must be present (not missing) because
306            // the array Variant encoding does not allow missing elements
307            None,
308        ))
309    }
310}
311
312pub(crate) struct VariantToShreddedObjectVariantRowBuilder<'a> {
313    value_builder: VariantValueArrayBuilder,
314    typed_value_builders: IndexMap<&'a str, VariantToShreddedVariantRowBuilder<'a>>,
315    typed_value_nulls: NullBufferBuilder,
316    nulls: NullBufferBuilder,
317    top_level: bool,
318}
319
320impl<'a> VariantToShreddedObjectVariantRowBuilder<'a> {
321    fn try_new(
322        fields: &'a Fields,
323        cast_options: &'a CastOptions,
324        capacity: usize,
325        top_level: bool,
326    ) -> Result<Self> {
327        let typed_value_builders = fields.iter().map(|field| {
328            let builder = make_variant_to_shredded_variant_arrow_row_builder(
329                field.data_type(),
330                cast_options,
331                capacity,
332                false,
333            )?;
334            Ok((field.name().as_str(), builder))
335        });
336        Ok(Self {
337            value_builder: VariantValueArrayBuilder::new(capacity),
338            typed_value_builders: typed_value_builders.collect::<Result<_>>()?,
339            typed_value_nulls: NullBufferBuilder::new(capacity),
340            nulls: NullBufferBuilder::new(capacity),
341            top_level,
342        })
343    }
344
345    fn append_null(&mut self) -> Result<()> {
346        // Only the top-level struct that represents the variant can be nullable; object fields and
347        // array elements are non-nullable.
348        self.nulls.append(!self.top_level);
349        self.value_builder.append_null();
350        self.typed_value_nulls.append_null();
351        for (_, typed_value_builder) in &mut self.typed_value_builders {
352            typed_value_builder.append_null()?;
353        }
354        Ok(())
355    }
356
357    fn append_value(&mut self, value: Variant<'_, '_>) -> Result<bool> {
358        let Variant::Object(ref obj) = value else {
359            // Not an object => fall back
360            self.nulls.append_non_null();
361            self.value_builder.append_value(value);
362            self.typed_value_nulls.append_null();
363            for (_, typed_value_builder) in &mut self.typed_value_builders {
364                typed_value_builder.append_null()?;
365            }
366            return Ok(false);
367        };
368
369        // Route the object's fields by name as either shredded or unshredded
370        let mut builder = self.value_builder.builder_ext(value.metadata());
371        let mut object_builder = builder.try_new_object()?;
372        let mut seen = std::collections::HashSet::new();
373        let mut partially_shredded = false;
374        for (field_name, value) in obj.iter() {
375            match self.typed_value_builders.get_mut(field_name) {
376                Some(typed_value_builder) => {
377                    typed_value_builder.append_value(value)?;
378                    seen.insert(field_name);
379                }
380                None => {
381                    object_builder.insert_bytes(field_name, value);
382                    partially_shredded = true;
383                }
384            }
385        }
386
387        // Handle missing fields
388        for (field_name, typed_value_builder) in &mut self.typed_value_builders {
389            if !seen.contains(field_name) {
390                typed_value_builder.append_null()?;
391            }
392        }
393
394        // Only emit the value if it captured any unshredded object fields
395        if partially_shredded {
396            object_builder.finish();
397        } else {
398            drop(object_builder);
399            self.value_builder.append_null();
400        }
401
402        self.typed_value_nulls.append_non_null();
403        self.nulls.append_non_null();
404        Ok(true)
405    }
406
407    fn finish(mut self) -> Result<(BinaryViewArray, ArrayRef, Option<NullBuffer>)> {
408        let mut builder = StructArrayBuilder::new();
409        for (field_name, typed_value_builder) in self.typed_value_builders {
410            let (value, typed_value, nulls) = typed_value_builder.finish()?;
411            let array =
412                ShreddedVariantFieldArray::from_parts(Some(value), Some(typed_value), nulls);
413            builder = builder.with_field(field_name, ArrayRef::from(array), false);
414        }
415        if let Some(nulls) = self.typed_value_nulls.finish() {
416            builder = builder.with_nulls(nulls);
417        }
418        Ok((
419            self.value_builder.build()?,
420            Arc::new(builder.build()),
421            self.nulls.finish(),
422        ))
423    }
424}
425
426/// Field configuration captured by the builder (data type + nullability).
427#[derive(Clone)]
428pub struct ShreddingField {
429    data_type: DataType,
430    nullable: bool,
431}
432
433impl ShreddingField {
434    fn new(data_type: DataType, nullable: bool) -> Self {
435        Self {
436            data_type,
437            nullable,
438        }
439    }
440
441    fn null() -> Self {
442        Self::new(DataType::Null, true)
443    }
444}
445
446/// Convenience conversion to allow passing either `FieldRef`, `DataType`, or `(DataType, bool)`.
447pub trait IntoShreddingField {
448    fn into_shredding_field(self) -> ShreddingField;
449}
450
451impl IntoShreddingField for FieldRef {
452    fn into_shredding_field(self) -> ShreddingField {
453        ShreddingField::new(self.data_type().clone(), self.is_nullable())
454    }
455}
456
457impl IntoShreddingField for &DataType {
458    fn into_shredding_field(self) -> ShreddingField {
459        ShreddingField::new(self.clone(), true)
460    }
461}
462
463impl IntoShreddingField for DataType {
464    fn into_shredding_field(self) -> ShreddingField {
465        ShreddingField::new(self, true)
466    }
467}
468
469impl IntoShreddingField for (&DataType, bool) {
470    fn into_shredding_field(self) -> ShreddingField {
471        ShreddingField::new(self.0.clone(), self.1)
472    }
473}
474
475impl IntoShreddingField for (DataType, bool) {
476    fn into_shredding_field(self) -> ShreddingField {
477        ShreddingField::new(self.0, self.1)
478    }
479}
480
481/// Builder for constructing a variant shredding schema.
482///
483/// The builder pattern makes it easy to incrementally define which fields
484/// should be shredded and with what types. Fields are nullable by default; pass
485/// a `(data_type, nullable)` pair or a `FieldRef` to control nullability.
486///
487/// Note: this builder currently only supports struct fields. List support
488/// will be added in the future.
489///
490/// # Example
491///
492/// ```
493/// use std::sync::Arc;
494/// use arrow::datatypes::{DataType, Field, TimeUnit};
495/// use parquet_variant::{VariantPath, VariantPathElement};
496/// use parquet_variant_compute::ShreddedSchemaBuilder;
497///
498/// fn main() -> Result<(), arrow::error::ArrowError> {
499///     // Define the shredding schema using the builder
500///     let shredding_type = ShreddedSchemaBuilder::default()
501///     // store the "time" field as a separate UTC timestamp
502///     .with_path("time", (&DataType::Timestamp(TimeUnit::Nanosecond, Some("UTC".into())), true))?
503///     // store hostname as non-nullable Utf8
504///     .with_path("hostname", (&DataType::Utf8, false))?
505///     // pass a FieldRef directly
506///     .with_path(
507///         "metadata.trace_id",
508///         Arc::new(Field::new("trace_id", DataType::FixedSizeBinary(16), false)),
509///     )?
510///     // field name with a dot: use VariantPath to avoid splitting
511///     .with_path(
512///         VariantPath::from_iter([VariantPathElement::from("metrics.cpu")]),
513///         &DataType::Float64,
514///     )?
515///     .build();
516///    Ok(())
517/// }
518/// // The shredding_type can now be passed to shred_variant:
519/// // let shredded = shred_variant(&input, &shredding_type)?;
520/// ```
521#[derive(Default, Clone)]
522pub struct ShreddedSchemaBuilder {
523    root: VariantSchemaNode,
524}
525
526impl ShreddedSchemaBuilder {
527    /// Create a new empty schema builder.
528    pub fn new() -> Self {
529        Self::default()
530    }
531
532    /// Insert a typed path into the schema using dot notation (or any
533    /// [`VariantPath`] convertible).
534    ///
535    /// The path uses dot notation to specify nested fields.
536    /// For example, "a.b.c" will create a nested structure.
537    ///
538    /// # Arguments
539    ///
540    /// * `path` - Anything convertible to [`VariantPath`] (e.g., a `&str`)
541    /// * `field` - Anything convertible via [`IntoShreddingField`] (e.g. `FieldRef`,
542    ///   `&DataType`, or `(&DataType, bool)` to control nullability)
543    pub fn with_path<'a, P, F>(mut self, path: P, field: F) -> Result<Self>
544    where
545        P: TryInto<VariantPath<'a>>,
546        P::Error: std::fmt::Debug,
547        F: IntoShreddingField,
548    {
549        let path: VariantPath<'a> = path
550            .try_into()
551            .map_err(|e| ArrowError::InvalidArgumentError(format!("{:?}", e)))?;
552        self.root.insert_path(&path, field.into_shredding_field());
553        Ok(self)
554    }
555
556    /// Build the final [`DataType`].
557    pub fn build(self) -> DataType {
558        let shredding_type = self.root.to_shredding_type();
559        match shredding_type {
560            Some(shredding_type) => shredding_type,
561            None => DataType::Null,
562        }
563    }
564}
565
566/// Internal tree node structure for building variant schemas.
567#[derive(Clone)]
568enum VariantSchemaNode {
569    /// A leaf node with a primitive/scalar type (and nullability)
570    Leaf(ShreddingField),
571    /// An inner struct node with nested fields
572    Struct(BTreeMap<String, VariantSchemaNode>),
573}
574
575impl Default for VariantSchemaNode {
576    fn default() -> Self {
577        Self::Leaf(ShreddingField::null())
578    }
579}
580
581impl VariantSchemaNode {
582    /// Insert a path into this node with the given data type.
583    fn insert_path(&mut self, path: &VariantPath<'_>, field: ShreddingField) {
584        self.insert_path_elements(path, field);
585    }
586
587    fn insert_path_elements(&mut self, segments: &[VariantPathElement<'_>], field: ShreddingField) {
588        let Some((head, tail)) = segments.split_first() else {
589            *self = Self::Leaf(field);
590            return;
591        };
592
593        match head {
594            VariantPathElement::Field { name } => {
595                // Ensure this node is a Struct node
596                let children = match self {
597                    Self::Struct(children) => children,
598                    _ => {
599                        *self = Self::Struct(BTreeMap::new());
600                        match self {
601                            Self::Struct(children) => children,
602                            _ => unreachable!(),
603                        }
604                    }
605                };
606
607                children
608                    .entry(name.to_string())
609                    .or_default()
610                    .insert_path_elements(tail, field);
611            }
612            VariantPathElement::Index { .. } => {
613                // List support to be added later; reject for now
614                unreachable!("List paths are not supported yet");
615            }
616        }
617    }
618
619    /// Convert this node to a shredding type.
620    ///
621    /// Returns the [`DataType`] for passing to [`shred_variant`].
622    fn to_shredding_type(&self) -> Option<DataType> {
623        match self {
624            Self::Leaf(field) => Some(field.data_type.clone()),
625            Self::Struct(children) => {
626                let child_fields: Vec<_> = children
627                    .iter()
628                    .filter_map(|(name, child)| child.to_shredding_field(name))
629                    .collect();
630                if child_fields.is_empty() {
631                    None
632                } else {
633                    Some(DataType::Struct(Fields::from(child_fields)))
634                }
635            }
636        }
637    }
638
639    fn to_shredding_field(&self, name: &str) -> Option<FieldRef> {
640        match self {
641            Self::Leaf(field) => Some(Arc::new(Field::new(
642                name,
643                field.data_type.clone(),
644                field.nullable,
645            ))),
646            Self::Struct(_) => self
647                .to_shredding_type()
648                .map(|data_type| Arc::new(Field::new(name, data_type, true))),
649        }
650    }
651}
652
653#[cfg(test)]
654mod tests {
655    use super::*;
656    use crate::VariantArrayBuilder;
657    use arrow::array::{
658        Array, BinaryViewArray, FixedSizeBinaryArray, Float64Array, GenericListArray,
659        GenericListViewArray, Int64Array, LargeBinaryArray, LargeStringArray, ListArray,
660        ListLikeArray, OffsetSizeTrait, PrimitiveArray, StringArray,
661    };
662    use arrow::datatypes::{
663        ArrowPrimitiveType, DataType, Field, Fields, Int64Type, TimeUnit, UnionFields, UnionMode,
664    };
665    use parquet_variant::{
666        BuilderSpecificState, EMPTY_VARIANT_METADATA_BYTES, ObjectBuilder, ReadOnlyMetadataBuilder,
667        Variant, VariantBuilder, VariantPath, VariantPathElement,
668    };
669    use std::sync::Arc;
670    use uuid::Uuid;
671
672    #[derive(Clone)]
673    enum VariantValue<'a> {
674        Value(Variant<'a, 'a>),
675        List(Vec<VariantValue<'a>>),
676        Object(Vec<(&'a str, VariantValue<'a>)>),
677        Null,
678    }
679
680    impl<'a, T> From<T> for VariantValue<'a>
681    where
682        T: Into<Variant<'a, 'a>>,
683    {
684        fn from(value: T) -> Self {
685            Self::Value(value.into())
686        }
687    }
688
689    #[derive(Clone)]
690    enum VariantRow<'a> {
691        Value(VariantValue<'a>),
692        List(Vec<VariantValue<'a>>),
693        Object(Vec<(&'a str, VariantValue<'a>)>),
694        Null,
695    }
696
697    fn build_variant_array(rows: Vec<VariantRow<'static>>) -> VariantArray {
698        let mut builder = VariantArrayBuilder::new(rows.len());
699
700        fn append_variant_value<B: VariantBuilderExt>(builder: &mut B, value: VariantValue) {
701            match value {
702                VariantValue::Value(v) => builder.append_value(v),
703                VariantValue::List(values) => {
704                    let mut list = builder.new_list();
705                    for v in values {
706                        append_variant_value(&mut list, v);
707                    }
708                    list.finish();
709                }
710                VariantValue::Object(fields) => {
711                    let mut object = builder.new_object();
712                    for (name, value) in fields {
713                        append_variant_field(&mut object, name, value);
714                    }
715                    object.finish();
716                }
717                VariantValue::Null => builder.append_null(),
718            }
719        }
720
721        fn append_variant_field<'a, S: BuilderSpecificState>(
722            object: &mut ObjectBuilder<'_, S>,
723            name: &'a str,
724            value: VariantValue<'a>,
725        ) {
726            match value {
727                VariantValue::Value(v) => {
728                    object.insert(name, v);
729                }
730                VariantValue::List(values) => {
731                    let mut list = object.new_list(name);
732                    for v in values {
733                        append_variant_value(&mut list, v);
734                    }
735                    list.finish();
736                }
737                VariantValue::Object(fields) => {
738                    let mut nested = object.new_object(name);
739                    for (field_name, v) in fields {
740                        append_variant_field(&mut nested, field_name, v);
741                    }
742                    nested.finish();
743                }
744                VariantValue::Null => {
745                    object.insert(name, Variant::Null);
746                }
747            }
748        }
749
750        rows.into_iter().for_each(|row| match row {
751            VariantRow::Value(value) => append_variant_value(&mut builder, value),
752            VariantRow::List(values) => {
753                let mut list = builder.new_list();
754                for value in values {
755                    append_variant_value(&mut list, value);
756                }
757                list.finish();
758            }
759            VariantRow::Object(fields) => {
760                let mut object = builder.new_object();
761                for (name, value) in fields {
762                    append_variant_field(&mut object, name, value);
763                }
764                object.finish();
765            }
766            VariantRow::Null => builder.append_null(),
767        });
768        builder.build()
769    }
770
771    trait TestListLikeArray: ListLikeArray {
772        type OffsetSize: OffsetSizeTrait;
773        fn value_offsets(&self) -> Option<&[Self::OffsetSize]>;
774        fn value_size(&self, index: usize) -> Self::OffsetSize;
775    }
776
777    impl<O: OffsetSizeTrait> TestListLikeArray for GenericListArray<O> {
778        type OffsetSize = O;
779
780        fn value_offsets(&self) -> Option<&[Self::OffsetSize]> {
781            Some(GenericListArray::value_offsets(self))
782        }
783
784        fn value_size(&self, index: usize) -> Self::OffsetSize {
785            GenericListArray::value_length(self, index)
786        }
787    }
788
789    impl<O: OffsetSizeTrait> TestListLikeArray for GenericListViewArray<O> {
790        type OffsetSize = O;
791
792        fn value_offsets(&self) -> Option<&[Self::OffsetSize]> {
793            Some(GenericListViewArray::value_offsets(self))
794        }
795
796        fn value_size(&self, index: usize) -> Self::OffsetSize {
797            GenericListViewArray::value_size(self, index)
798        }
799    }
800
801    fn downcast_list_like_array<O: OffsetSizeTrait>(
802        array: &VariantArray,
803    ) -> &dyn TestListLikeArray<OffsetSize = O> {
804        let typed_value = array.typed_value_field().unwrap();
805        if let Some(list) = typed_value.as_any().downcast_ref::<GenericListArray<O>>() {
806            list
807        } else if let Some(list_view) = typed_value
808            .as_any()
809            .downcast_ref::<GenericListViewArray<O>>()
810        {
811            list_view
812        } else {
813            panic!(
814                "Expected list-like typed_value with matching offset type, got {}",
815                typed_value.data_type()
816            );
817        }
818    }
819
820    fn assert_list_structure<O: OffsetSizeTrait>(
821        array: &VariantArray,
822        expected_len: usize,
823        expected_offsets: &[O],
824        expected_sizes: &[Option<O>],
825        expected_fallbacks: &[Option<Variant<'static, 'static>>],
826    ) {
827        assert_eq!(array.len(), expected_len);
828
829        let fallbacks = (array.value_field().unwrap(), Some(array.metadata_field()));
830        let array = downcast_list_like_array::<O>(array);
831
832        assert_eq!(
833            array.value_offsets().unwrap(),
834            expected_offsets,
835            "list offsets mismatch"
836        );
837        assert_eq!(
838            array.len(),
839            expected_sizes.len(),
840            "expected_sizes should match array length"
841        );
842        assert_eq!(
843            array.len(),
844            expected_fallbacks.len(),
845            "expected_fallbacks should match array length"
846        );
847        assert_eq!(
848            array.len(),
849            fallbacks.0.len(),
850            "fallbacks value field should match array length"
851        );
852
853        // Validate per-row shredding outcomes for the list array
854        for (idx, (expected_size, expected_fallback)) in expected_sizes
855            .iter()
856            .zip(expected_fallbacks.iter())
857            .enumerate()
858        {
859            match expected_size {
860                Some(len) => {
861                    // Successfully shredded: typed list value present, no fallback value
862                    assert!(array.is_valid(idx));
863                    assert_eq!(array.value_size(idx), *len);
864                    assert!(fallbacks.0.is_null(idx));
865                }
866                None => {
867                    // Unable to shred: typed list value absent, fallback should carry the variant
868                    assert!(array.is_null(idx));
869                    assert_eq!(array.value_size(idx), O::zero());
870                    match expected_fallback {
871                        Some(expected_variant) => {
872                            assert!(fallbacks.0.is_valid(idx));
873                            let metadata_bytes = fallbacks
874                                .1
875                                .filter(|m| m.is_valid(idx))
876                                .map(|m| m.value(idx))
877                                .filter(|bytes| !bytes.is_empty())
878                                .unwrap_or(EMPTY_VARIANT_METADATA_BYTES);
879                            assert_eq!(
880                                Variant::new(metadata_bytes, fallbacks.0.value(idx)),
881                                expected_variant.clone()
882                            );
883                        }
884                        None => unreachable!(),
885                    }
886                }
887            }
888        }
889    }
890
891    fn assert_list_structure_and_elements<T: ArrowPrimitiveType, O: OffsetSizeTrait>(
892        array: &VariantArray,
893        expected_len: usize,
894        expected_offsets: &[O],
895        expected_sizes: &[Option<O>],
896        expected_fallbacks: &[Option<Variant<'static, 'static>>],
897        expected_shredded_elements: (&[Option<T::Native>], &[Option<Variant<'static, 'static>>]),
898    ) {
899        assert_list_structure(
900            array,
901            expected_len,
902            expected_offsets,
903            expected_sizes,
904            expected_fallbacks,
905        );
906        let array = downcast_list_like_array::<O>(array);
907
908        // Validate the shredded state of list elements (typed values and fallbacks)
909        let (expected_values, expected_fallbacks) = expected_shredded_elements;
910        assert_eq!(
911            expected_values.len(),
912            expected_fallbacks.len(),
913            "expected_values and expected_fallbacks should be aligned"
914        );
915
916        // Validate the shredded primitive values for list elements
917        let element_array = ShreddedVariantFieldArray::try_new(array.values().as_ref()).unwrap();
918        let element_values = element_array
919            .typed_value_field()
920            .unwrap()
921            .as_any()
922            .downcast_ref::<PrimitiveArray<T>>()
923            .unwrap();
924        assert_eq!(element_values.len(), expected_values.len());
925        for (idx, expected_value) in expected_values.iter().enumerate() {
926            match expected_value {
927                Some(value) => {
928                    assert!(element_values.is_valid(idx));
929                    assert_eq!(element_values.value(idx), *value);
930                }
931                None => assert!(element_values.is_null(idx)),
932            }
933        }
934
935        // Validate fallback variants for list elements that could not be shredded
936        let element_fallbacks = element_array.value_field().unwrap();
937        assert_eq!(element_fallbacks.len(), expected_fallbacks.len());
938        for (idx, expected_fallback) in expected_fallbacks.iter().enumerate() {
939            match expected_fallback {
940                Some(expected_variant) => {
941                    assert!(element_fallbacks.is_valid(idx));
942                    assert_eq!(
943                        Variant::new(EMPTY_VARIANT_METADATA_BYTES, element_fallbacks.value(idx)),
944                        expected_variant.clone()
945                    );
946                }
947                None => assert!(element_fallbacks.is_null(idx)),
948            }
949        }
950    }
951
952    #[test]
953    fn test_already_shredded_input_error() {
954        // Create a VariantArray that already has typed_value_field
955        // First create a valid VariantArray, then extract its parts to construct a shredded one
956        let temp_array = VariantArray::from_iter(vec![Some(Variant::from("test"))]);
957        let metadata = temp_array.metadata_field().clone();
958        let value = temp_array.value_field().unwrap().clone();
959        let typed_value = Arc::new(Int64Array::from(vec![42])) as ArrayRef;
960
961        let shredded_array =
962            VariantArray::from_parts(metadata, Some(value), Some(typed_value), None);
963
964        let result = shred_variant(&shredded_array, &DataType::Int64);
965        assert!(matches!(
966            result.unwrap_err(),
967            ArrowError::InvalidArgumentError(_)
968        ));
969    }
970
971    #[test]
972    fn test_all_null_input() {
973        // Create VariantArray with no value field (all null case)
974        let metadata = BinaryViewArray::from_iter_values([&[1u8, 0u8]]); // minimal valid metadata
975        let all_null_array = VariantArray::from_parts(metadata, None, None, None);
976        let result = shred_variant(&all_null_array, &DataType::Int64).unwrap();
977
978        // Should return array with no value/typed_value fields
979        assert!(result.value_field().is_none());
980        assert!(result.typed_value_field().is_none());
981    }
982
983    #[test]
984    fn test_invalid_fixed_size_binary_shredding() {
985        let mock_uuid_1 = Uuid::new_v4();
986
987        let input = VariantArray::from_iter([Some(Variant::from(mock_uuid_1)), None]);
988
989        // shred_variant only supports FixedSizeBinary(16). Any other length will err.
990        let err = shred_variant(&input, &DataType::FixedSizeBinary(17)).unwrap_err();
991
992        assert_eq!(
993            err.to_string(),
994            "Invalid argument error: FixedSizeBinary(17) is not a valid variant shredding type. Only FixedSizeBinary(16) for UUID is supported."
995        );
996    }
997
998    #[test]
999    fn test_uuid_shredding() {
1000        let mock_uuid_1 = Uuid::new_v4();
1001        let mock_uuid_2 = Uuid::new_v4();
1002
1003        let input = VariantArray::from_iter([
1004            Some(Variant::from(mock_uuid_1)),
1005            None,
1006            Some(Variant::from(false)),
1007            Some(Variant::from(mock_uuid_2)),
1008        ]);
1009
1010        let variant_array = shred_variant(&input, &DataType::FixedSizeBinary(16)).unwrap();
1011
1012        // // inspect the typed_value Field and make sure it contains the canonical Uuid extension type
1013        // let typed_value_field = variant_array
1014        //     .inner()
1015        //     .fields()
1016        //     .into_iter()
1017        //     .find(|f| f.name() == "typed_value")
1018        //     .unwrap();
1019
1020        // assert!(
1021        //     typed_value_field
1022        //         .try_extension_type::<extension::Uuid>()
1023        //         .is_ok()
1024        // );
1025
1026        // probe the downcasted typed_value array to make sure uuids are shredded correctly
1027        let uuids = variant_array
1028            .typed_value_field()
1029            .unwrap()
1030            .as_any()
1031            .downcast_ref::<FixedSizeBinaryArray>()
1032            .unwrap();
1033
1034        assert_eq!(uuids.len(), 4);
1035
1036        assert!(!uuids.is_null(0));
1037
1038        let got_uuid_1: &[u8] = uuids.value(0);
1039        assert_eq!(got_uuid_1, mock_uuid_1.as_bytes());
1040
1041        assert!(uuids.is_null(1));
1042        assert!(uuids.is_null(2));
1043
1044        assert!(!uuids.is_null(3));
1045
1046        let got_uuid_2: &[u8] = uuids.value(3);
1047        assert_eq!(got_uuid_2, mock_uuid_2.as_bytes());
1048    }
1049
1050    #[test]
1051    fn test_primitive_shredding_comprehensive() {
1052        // Test mixed scenarios in a single array
1053        let input = VariantArray::from_iter(vec![
1054            Some(Variant::from(42i64)),   // successful shred
1055            Some(Variant::from("hello")), // failed shred (string)
1056            Some(Variant::from(100i64)),  // successful shred
1057            None,                         // array-level null
1058            Some(Variant::Null),          // variant null
1059            Some(Variant::from(3i8)),     // successful shred (int8->int64 conversion)
1060        ]);
1061
1062        let result = shred_variant(&input, &DataType::Int64).unwrap();
1063
1064        // Verify structure
1065        let metadata_field = result.metadata_field();
1066        let value_field = result.value_field().unwrap();
1067        let typed_value_field = result
1068            .typed_value_field()
1069            .unwrap()
1070            .as_any()
1071            .downcast_ref::<Int64Array>()
1072            .unwrap();
1073
1074        // Check specific outcomes for each row
1075        assert_eq!(result.len(), 6);
1076
1077        // Row 0: 42 -> should shred successfully
1078        assert!(!result.is_null(0));
1079        assert!(value_field.is_null(0)); // value should be null when shredded
1080        assert!(!typed_value_field.is_null(0));
1081        assert_eq!(typed_value_field.value(0), 42);
1082
1083        // Row 1: "hello" -> should fail to shred
1084        assert!(!result.is_null(1));
1085        assert!(!value_field.is_null(1)); // value should contain original
1086        assert!(typed_value_field.is_null(1)); // typed_value should be null
1087        assert_eq!(
1088            Variant::new(metadata_field.value(1), value_field.value(1)),
1089            Variant::from("hello")
1090        );
1091
1092        // Row 2: 100 -> should shred successfully
1093        assert!(!result.is_null(2));
1094        assert!(value_field.is_null(2));
1095        assert_eq!(typed_value_field.value(2), 100);
1096
1097        // Row 3: array null -> should be null in result
1098        assert!(result.is_null(3));
1099
1100        // Row 4: Variant::Null -> should not shred (it's a null variant, not an integer)
1101        assert!(!result.is_null(4));
1102        assert!(!value_field.is_null(4)); // should contain Variant::Null
1103        assert_eq!(
1104            Variant::new(metadata_field.value(4), value_field.value(4)),
1105            Variant::Null
1106        );
1107        assert!(typed_value_field.is_null(4));
1108
1109        // Row 5: 3i8 -> should shred successfully (int8->int64 conversion)
1110        assert!(!result.is_null(5));
1111        assert!(value_field.is_null(5)); // value should be null when shredded
1112        assert!(!typed_value_field.is_null(5));
1113        assert_eq!(typed_value_field.value(5), 3);
1114    }
1115
1116    #[test]
1117    fn test_primitive_different_target_types() {
1118        let input = VariantArray::from_iter(vec![
1119            Variant::from(42i32),
1120            Variant::from(3.15f64),
1121            Variant::from("not_a_number"),
1122        ]);
1123
1124        // Test Int32 target
1125        let result_int32 = shred_variant(&input, &DataType::Int32).unwrap();
1126        let typed_value_int32 = result_int32
1127            .typed_value_field()
1128            .unwrap()
1129            .as_any()
1130            .downcast_ref::<arrow::array::Int32Array>()
1131            .unwrap();
1132        assert_eq!(typed_value_int32.value(0), 42);
1133        assert!(typed_value_int32.is_null(1)); // float doesn't convert to int32
1134        assert!(typed_value_int32.is_null(2)); // string doesn't convert to int32
1135
1136        // Test Float64 target
1137        let result_float64 = shred_variant(&input, &DataType::Float64).unwrap();
1138        let typed_value_float64 = result_float64
1139            .typed_value_field()
1140            .unwrap()
1141            .as_any()
1142            .downcast_ref::<Float64Array>()
1143            .unwrap();
1144        assert_eq!(typed_value_float64.value(0), 42.0); // int converts to float
1145        assert_eq!(typed_value_float64.value(1), 3.15);
1146        assert!(typed_value_float64.is_null(2)); // string doesn't convert
1147    }
1148
1149    #[test]
1150    fn test_largeutf8_shredding() {
1151        let input = VariantArray::from_iter(vec![
1152            Some(Variant::from("hello")),
1153            Some(Variant::from(42i64)),
1154            None,
1155            Some(Variant::Null),
1156            Some(Variant::from("world")),
1157        ]);
1158
1159        let result = shred_variant(&input, &DataType::LargeUtf8).unwrap();
1160        let metadata = result.metadata_field();
1161        let value = result.value_field().unwrap();
1162        let typed_value = result
1163            .typed_value_field()
1164            .unwrap()
1165            .as_any()
1166            .downcast_ref::<LargeStringArray>()
1167            .unwrap();
1168
1169        assert_eq!(result.len(), 5);
1170
1171        // Row 0: string shreds to typed_value
1172        assert!(result.is_valid(0));
1173        assert!(value.is_null(0));
1174        assert_eq!(typed_value.value(0), "hello");
1175
1176        // Row 1: integer falls back to value
1177        assert!(result.is_valid(1));
1178        assert!(value.is_valid(1));
1179        assert!(typed_value.is_null(1));
1180        assert_eq!(
1181            Variant::new(metadata.value(1), value.value(1)),
1182            Variant::from(42i64)
1183        );
1184
1185        // Row 2: top-level null
1186        assert!(result.is_null(2));
1187        assert!(value.is_null(2));
1188        assert!(typed_value.is_null(2));
1189
1190        // Row 3: variant null falls back to value
1191        assert!(result.is_valid(3));
1192        assert!(value.is_valid(3));
1193        assert!(typed_value.is_null(3));
1194        assert_eq!(
1195            Variant::new(metadata.value(3), value.value(3)),
1196            Variant::Null
1197        );
1198
1199        // Row 4: string shreds to typed_value
1200        assert!(result.is_valid(4));
1201        assert!(value.is_null(4));
1202        assert_eq!(typed_value.value(4), "world");
1203    }
1204
1205    #[test]
1206    fn test_largebinary_shredding() {
1207        let input = VariantArray::from_iter(vec![
1208            Some(Variant::from(&b"\x00\x01\x02"[..])),
1209            Some(Variant::from("not_binary")),
1210            None,
1211            Some(Variant::Null),
1212            Some(Variant::from(&b"\xff\xaa"[..])),
1213        ]);
1214
1215        let result = shred_variant(&input, &DataType::LargeBinary).unwrap();
1216        let metadata = result.metadata_field();
1217        let value = result.value_field().unwrap();
1218        let typed_value = result
1219            .typed_value_field()
1220            .unwrap()
1221            .as_any()
1222            .downcast_ref::<LargeBinaryArray>()
1223            .unwrap();
1224
1225        assert_eq!(result.len(), 5);
1226
1227        // Row 0: binary shreds to typed_value
1228        assert!(result.is_valid(0));
1229        assert!(value.is_null(0));
1230        assert_eq!(typed_value.value(0), &[0x00, 0x01, 0x02]);
1231
1232        // Row 1: string falls back to value
1233        assert!(result.is_valid(1));
1234        assert!(value.is_valid(1));
1235        assert!(typed_value.is_null(1));
1236        assert_eq!(
1237            Variant::new(metadata.value(1), value.value(1)),
1238            Variant::from("not_binary")
1239        );
1240
1241        // Row 2: top-level null
1242        assert!(result.is_null(2));
1243        assert!(value.is_null(2));
1244        assert!(typed_value.is_null(2));
1245
1246        // Row 3: variant null falls back to value
1247        assert!(result.is_valid(3));
1248        assert!(value.is_valid(3));
1249        assert!(typed_value.is_null(3));
1250        assert_eq!(
1251            Variant::new(metadata.value(3), value.value(3)),
1252            Variant::Null
1253        );
1254
1255        // Row 4: binary shreds to typed_value
1256        assert!(result.is_valid(4));
1257        assert!(value.is_null(4));
1258        assert_eq!(typed_value.value(4), &[0xff, 0xaa]);
1259    }
1260
1261    #[test]
1262    fn test_invalid_shredded_types_rejected() {
1263        let input = VariantArray::from_iter([Variant::from(42)]);
1264
1265        let invalid_types = vec![
1266            DataType::UInt8,
1267            DataType::Float16,
1268            DataType::Decimal256(38, 10),
1269            DataType::Date64,
1270            DataType::Time32(TimeUnit::Second),
1271            DataType::Time64(TimeUnit::Nanosecond),
1272            DataType::Timestamp(TimeUnit::Millisecond, None),
1273            DataType::FixedSizeBinary(17),
1274            DataType::Union(
1275                UnionFields::from_fields(vec![
1276                    Field::new("int_field", DataType::Int32, false),
1277                    Field::new("str_field", DataType::Utf8, true),
1278                ]),
1279                UnionMode::Dense,
1280            ),
1281            DataType::Map(
1282                Arc::new(Field::new(
1283                    "entries",
1284                    DataType::Struct(Fields::from(vec![
1285                        Field::new("key", DataType::Utf8, false),
1286                        Field::new("value", DataType::Int32, true),
1287                    ])),
1288                    false,
1289                )),
1290                false,
1291            ),
1292            DataType::Dictionary(Box::new(DataType::Int32), Box::new(DataType::Utf8)),
1293            DataType::RunEndEncoded(
1294                Arc::new(Field::new("run_ends", DataType::Int32, false)),
1295                Arc::new(Field::new("values", DataType::Utf8, true)),
1296            ),
1297        ];
1298
1299        for data_type in invalid_types {
1300            let err = shred_variant(&input, &data_type).unwrap_err();
1301            assert!(
1302                matches!(err, ArrowError::InvalidArgumentError(_)),
1303                "expected InvalidArgumentError for {:?}, got {:?}",
1304                data_type,
1305                err
1306            );
1307        }
1308    }
1309
1310    #[test]
1311    fn test_array_shredding_as_list() {
1312        let input = build_variant_array(vec![
1313            // Row 0: List of ints should shred entirely into typed_value
1314            VariantRow::List(vec![
1315                VariantValue::from(1i64),
1316                VariantValue::from(2i64),
1317                VariantValue::from(3i64),
1318            ]),
1319            // Row 1: Contains incompatible types so values fall back
1320            VariantRow::List(vec![
1321                VariantValue::from(1i64),
1322                VariantValue::from("two"),
1323                VariantValue::from(Variant::Null),
1324            ]),
1325            // Row 2: Not a list -> entire row falls back
1326            VariantRow::Value(VariantValue::from("not a list")),
1327            // Row 3: Array-level null propagates
1328            VariantRow::Null,
1329            // Row 4: Empty list exercises zero-length offsets
1330            VariantRow::List(vec![]),
1331        ]);
1332        let list_schema = DataType::List(Arc::new(Field::new("item", DataType::Int64, true)));
1333        let result = shred_variant(&input, &list_schema).unwrap();
1334        assert_eq!(result.len(), 5);
1335
1336        assert_list_structure_and_elements::<Int64Type, i32>(
1337            &result,
1338            5,
1339            &[0, 3, 6, 6, 6, 6],
1340            &[Some(3), Some(3), None, None, Some(0)],
1341            &[
1342                None,
1343                None,
1344                Some(Variant::from("not a list")),
1345                Some(Variant::Null),
1346                None,
1347            ],
1348            (
1349                &[Some(1), Some(2), Some(3), Some(1), None, None],
1350                &[
1351                    None,
1352                    None,
1353                    None,
1354                    None,
1355                    Some(Variant::from("two")),
1356                    Some(Variant::Null),
1357                ],
1358            ),
1359        );
1360    }
1361
1362    #[test]
1363    fn test_array_shredding_as_large_list() {
1364        let input = build_variant_array(vec![
1365            // Row 0: List of ints shreds to typed_value
1366            VariantRow::List(vec![VariantValue::from(1i64), VariantValue::from(2i64)]),
1367            // Row 1: Not a list -> entire row falls back
1368            VariantRow::Value(VariantValue::from("not a list")),
1369            // Row 2: Empty list
1370            VariantRow::List(vec![]),
1371        ]);
1372        let list_schema = DataType::LargeList(Arc::new(Field::new("item", DataType::Int64, true)));
1373        let result = shred_variant(&input, &list_schema).unwrap();
1374        assert_eq!(result.len(), 3);
1375
1376        assert_list_structure_and_elements::<Int64Type, i64>(
1377            &result,
1378            3,
1379            &[0, 2, 2, 2],
1380            &[Some(2), None, Some(0)],
1381            &[None, Some(Variant::from("not a list")), None],
1382            (&[Some(1), Some(2)], &[None, None]),
1383        );
1384    }
1385
1386    #[test]
1387    fn test_array_shredding_as_list_view() {
1388        let input = build_variant_array(vec![
1389            // Row 0: Standard list
1390            VariantRow::List(vec![
1391                VariantValue::from(1i64),
1392                VariantValue::from(2i64),
1393                VariantValue::from(3i64),
1394            ]),
1395            // Row 1: List with incompatible types -> element fallback
1396            VariantRow::List(vec![
1397                VariantValue::from(1i64),
1398                VariantValue::from("two"),
1399                VariantValue::from(Variant::Null),
1400            ]),
1401            // Row 2: Not a list -> top-level fallback
1402            VariantRow::Value(VariantValue::from("not a list")),
1403            // Row 3: Top-level Null
1404            VariantRow::Null,
1405            // Row 4: Empty list
1406            VariantRow::List(vec![]),
1407        ]);
1408        let list_schema = DataType::ListView(Arc::new(Field::new("item", DataType::Int64, true)));
1409        let result = shred_variant(&input, &list_schema).unwrap();
1410        assert_eq!(result.len(), 5);
1411
1412        assert_list_structure_and_elements::<Int64Type, i32>(
1413            &result,
1414            5,
1415            &[0, 3, 6, 6, 6],
1416            &[Some(3), Some(3), None, None, Some(0)],
1417            &[
1418                None,
1419                None,
1420                Some(Variant::from("not a list")),
1421                Some(Variant::Null),
1422                None,
1423            ],
1424            (
1425                &[Some(1), Some(2), Some(3), Some(1), None, None],
1426                &[
1427                    None,
1428                    None,
1429                    None,
1430                    None,
1431                    Some(Variant::from("two")),
1432                    Some(Variant::Null),
1433                ],
1434            ),
1435        );
1436    }
1437
1438    #[test]
1439    fn test_array_shredding_as_large_list_view() {
1440        let input = build_variant_array(vec![
1441            // Row 0: List of ints shreds to typed_value
1442            VariantRow::List(vec![VariantValue::from(1i64), VariantValue::from(2i64)]),
1443            // Row 1: Not a list -> entire row falls back
1444            VariantRow::Value(VariantValue::from("fallback")),
1445            // Row 2: Empty list
1446            VariantRow::List(vec![]),
1447        ]);
1448        let list_schema =
1449            DataType::LargeListView(Arc::new(Field::new("item", DataType::Int64, true)));
1450        let result = shred_variant(&input, &list_schema).unwrap();
1451        assert_eq!(result.len(), 3);
1452
1453        assert_list_structure_and_elements::<Int64Type, i64>(
1454            &result,
1455            3,
1456            &[0, 2, 2],
1457            &[Some(2), None, Some(0)],
1458            &[None, Some(Variant::from("fallback")), None],
1459            (&[Some(1), Some(2)], &[None, None]),
1460        );
1461    }
1462
1463    #[test]
1464    fn test_array_shredding_as_fixed_size_list() {
1465        let input = build_variant_array(vec![VariantRow::List(vec![
1466            VariantValue::from(1i64),
1467            VariantValue::from(2i64),
1468            VariantValue::from(3i64),
1469        ])]);
1470        let list_schema =
1471            DataType::FixedSizeList(Arc::new(Field::new("item", DataType::Int64, true)), 2);
1472        let err = shred_variant(&input, &list_schema).unwrap_err();
1473        assert_eq!(
1474            err.to_string(),
1475            "Not yet implemented: Converting unshredded variant arrays to arrow fixed-size lists"
1476        );
1477    }
1478
1479    #[test]
1480    fn test_array_shredding_with_array_elements() {
1481        let input = build_variant_array(vec![
1482            // Row 0: [[1, 2], [3, 4], []] - clean nested lists
1483            VariantRow::List(vec![
1484                VariantValue::List(vec![VariantValue::from(1i64), VariantValue::from(2i64)]),
1485                VariantValue::List(vec![VariantValue::from(3i64), VariantValue::from(4i64)]),
1486                VariantValue::List(vec![]),
1487            ]),
1488            // Row 1: [[5, "bad", null], "not a list inner", null] - inner fallbacks
1489            VariantRow::List(vec![
1490                VariantValue::List(vec![
1491                    VariantValue::from(5i64),
1492                    VariantValue::from("bad"),
1493                    VariantValue::from(Variant::Null),
1494                ]),
1495                VariantValue::from("not a list inner"),
1496                VariantValue::Null,
1497            ]),
1498            // Row 2: "not a list" - top-level fallback
1499            VariantRow::Value(VariantValue::from("not a list")),
1500            // Row 3: null row
1501            VariantRow::Null,
1502        ]);
1503        let inner_field = Arc::new(Field::new("item", DataType::Int64, true));
1504        let inner_list_schema = DataType::List(inner_field);
1505        let list_schema = DataType::List(Arc::new(Field::new(
1506            "item",
1507            inner_list_schema.clone(),
1508            true,
1509        )));
1510        let result = shred_variant(&input, &list_schema).unwrap();
1511        assert_eq!(result.len(), 4);
1512
1513        let typed_value = result
1514            .typed_value_field()
1515            .unwrap()
1516            .as_any()
1517            .downcast_ref::<ListArray>()
1518            .unwrap();
1519
1520        assert_list_structure::<i32>(
1521            &result,
1522            4,
1523            &[0, 3, 6, 6, 6],
1524            &[Some(3), Some(3), None, None],
1525            &[
1526                None,
1527                None,
1528                Some(Variant::from("not a list")),
1529                Some(Variant::Null),
1530            ],
1531        );
1532
1533        let outer_elements =
1534            ShreddedVariantFieldArray::try_new(typed_value.values().as_ref()).unwrap();
1535        assert_eq!(outer_elements.len(), 6);
1536        let outer_values = outer_elements
1537            .typed_value_field()
1538            .unwrap()
1539            .as_any()
1540            .downcast_ref::<ListArray>()
1541            .unwrap();
1542        let outer_fallbacks = outer_elements.value_field().unwrap();
1543
1544        let outer_metadata = BinaryViewArray::from_iter_values(std::iter::repeat_n(
1545            EMPTY_VARIANT_METADATA_BYTES,
1546            outer_elements.len(),
1547        ));
1548        let outer_variant = VariantArray::from_parts(
1549            outer_metadata,
1550            Some(outer_fallbacks.clone()),
1551            Some(Arc::new(outer_values.clone())),
1552            None,
1553        );
1554
1555        assert_list_structure_and_elements::<Int64Type, i32>(
1556            &outer_variant,
1557            outer_elements.len(),
1558            &[0, 2, 4, 4, 7, 7, 7],
1559            &[Some(2), Some(2), Some(0), Some(3), None, None],
1560            &[
1561                None,
1562                None,
1563                None,
1564                None,
1565                Some(Variant::from("not a list inner")),
1566                Some(Variant::Null),
1567            ],
1568            (
1569                &[Some(1), Some(2), Some(3), Some(4), Some(5), None, None],
1570                &[
1571                    None,
1572                    None,
1573                    None,
1574                    None,
1575                    None,
1576                    Some(Variant::from("bad")),
1577                    Some(Variant::Null),
1578                ],
1579            ),
1580        );
1581    }
1582
1583    #[test]
1584    fn test_array_shredding_with_object_elements() {
1585        let input = build_variant_array(vec![
1586            // Row 0: [{"id": 1, "name": "Alice"}, {"id": null}] fully shards
1587            VariantRow::List(vec![
1588                VariantValue::Object(vec![
1589                    ("id", VariantValue::from(1i64)),
1590                    ("name", VariantValue::from("Alice")),
1591                ]),
1592                VariantValue::Object(vec![("id", VariantValue::from(Variant::Null))]),
1593            ]),
1594            // Row 1: "not a list" -> fallback
1595            VariantRow::Value(VariantValue::from("not a list")),
1596            // Row 2: Null row
1597            VariantRow::Null,
1598        ]);
1599
1600        // Target schema is List<Struct<id:int64,name:utf8>>
1601        let object_fields = Fields::from(vec![
1602            Field::new("id", DataType::Int64, true),
1603            Field::new("name", DataType::Utf8, true),
1604        ]);
1605        let list_schema = DataType::List(Arc::new(Field::new(
1606            "item",
1607            DataType::Struct(object_fields),
1608            true,
1609        )));
1610        let result = shred_variant(&input, &list_schema).unwrap();
1611        assert_eq!(result.len(), 3);
1612
1613        assert_list_structure::<i32>(
1614            &result,
1615            3,
1616            &[0, 2, 2, 2],
1617            &[Some(2), None, None],
1618            &[None, Some(Variant::from("not a list")), Some(Variant::Null)],
1619        );
1620
1621        // Validate nested struct fields for each element
1622        let typed_value = result
1623            .typed_value_field()
1624            .unwrap()
1625            .as_any()
1626            .downcast_ref::<ListArray>()
1627            .unwrap();
1628        let element_array =
1629            ShreddedVariantFieldArray::try_new(typed_value.values().as_ref()).unwrap();
1630        assert_eq!(element_array.len(), 2);
1631        let element_objects = element_array
1632            .typed_value_field()
1633            .unwrap()
1634            .as_any()
1635            .downcast_ref::<arrow::array::StructArray>()
1636            .unwrap();
1637
1638        // Id field [1, Variant::Null]
1639        let id_field =
1640            ShreddedVariantFieldArray::try_new(element_objects.column_by_name("id").unwrap())
1641                .unwrap();
1642        let id_values = id_field.value_field().unwrap();
1643        let id_typed_values = id_field
1644            .typed_value_field()
1645            .unwrap()
1646            .as_any()
1647            .downcast_ref::<Int64Array>()
1648            .unwrap();
1649        assert!(id_values.is_null(0));
1650        assert_eq!(id_typed_values.value(0), 1);
1651        // null is stored as Variant::Null in values
1652        assert!(id_values.is_valid(1));
1653        assert_eq!(
1654            Variant::new(EMPTY_VARIANT_METADATA_BYTES, id_values.value(1)),
1655            Variant::Null
1656        );
1657        assert!(id_typed_values.is_null(1));
1658
1659        // Name field ["Alice", null]
1660        let name_field =
1661            ShreddedVariantFieldArray::try_new(element_objects.column_by_name("name").unwrap())
1662                .unwrap();
1663        let name_values = name_field.value_field().unwrap();
1664        let name_typed_values = name_field
1665            .typed_value_field()
1666            .unwrap()
1667            .as_any()
1668            .downcast_ref::<StringArray>()
1669            .unwrap();
1670        assert!(name_values.is_null(0));
1671        assert_eq!(name_typed_values.value(0), "Alice");
1672        // No value provided, both value and typed_value are null
1673        assert!(name_values.is_null(1));
1674        assert!(name_typed_values.is_null(1));
1675    }
1676
1677    #[test]
1678    fn test_object_shredding_comprehensive() -> Result<()> {
1679        let input = build_variant_array(vec![
1680            // Row 0: Fully shredded object
1681            VariantRow::Object(vec![
1682                ("score", VariantValue::from(95.5f64)),
1683                ("age", VariantValue::from(30i64)),
1684            ]),
1685            // Row 1: Partially shredded object (extra email field)
1686            VariantRow::Object(vec![
1687                ("score", VariantValue::from(87.2f64)),
1688                ("age", VariantValue::from(25i64)),
1689                ("email", VariantValue::from("bob@example.com")),
1690            ]),
1691            // Row 2: Missing field (no score)
1692            VariantRow::Object(vec![("age", VariantValue::from(35i64))]),
1693            // Row 3: Type mismatch (score is string, age is string)
1694            VariantRow::Object(vec![
1695                ("score", VariantValue::from("ninety-five")),
1696                ("age", VariantValue::from("thirty")),
1697            ]),
1698            // Row 4: Non-object
1699            VariantRow::Value(VariantValue::from("not an object")),
1700            // Row 5: Empty object
1701            VariantRow::Object(vec![]),
1702            // Row 6: Null
1703            VariantRow::Null,
1704            // Row 7: Object with only "wrong" fields
1705            VariantRow::Object(vec![("foo", VariantValue::from(10))]),
1706            // Row 8: Object with one "right" and one "wrong" field
1707            VariantRow::Object(vec![
1708                ("score", VariantValue::from(66.67f64)),
1709                ("foo", VariantValue::from(10)),
1710            ]),
1711        ]);
1712
1713        // Create target schema: struct<score: float64, age: int64>
1714        // Both types are supported for shredding
1715        let target_schema = ShreddedSchemaBuilder::default()
1716            .with_path("score", &DataType::Float64)?
1717            .with_path("age", &DataType::Int64)?
1718            .build();
1719
1720        let result = shred_variant(&input, &target_schema).unwrap();
1721
1722        // Verify structure
1723        assert!(result.value_field().is_some());
1724        assert!(result.typed_value_field().is_some());
1725        assert_eq!(result.len(), 9);
1726
1727        let metadata = result.metadata_field();
1728
1729        let value = result.value_field().unwrap();
1730        let typed_value = result
1731            .typed_value_field()
1732            .unwrap()
1733            .as_any()
1734            .downcast_ref::<arrow::array::StructArray>()
1735            .unwrap();
1736
1737        // Extract score and age fields from typed_value struct
1738        let score_field =
1739            ShreddedVariantFieldArray::try_new(typed_value.column_by_name("score").unwrap())
1740                .unwrap();
1741        let age_field =
1742            ShreddedVariantFieldArray::try_new(typed_value.column_by_name("age").unwrap()).unwrap();
1743
1744        let score_value = score_field
1745            .value_field()
1746            .unwrap()
1747            .as_any()
1748            .downcast_ref::<BinaryViewArray>()
1749            .unwrap();
1750        let score_typed_value = score_field
1751            .typed_value_field()
1752            .unwrap()
1753            .as_any()
1754            .downcast_ref::<Float64Array>()
1755            .unwrap();
1756        let age_value = age_field
1757            .value_field()
1758            .unwrap()
1759            .as_any()
1760            .downcast_ref::<BinaryViewArray>()
1761            .unwrap();
1762        let age_typed_value = age_field
1763            .typed_value_field()
1764            .unwrap()
1765            .as_any()
1766            .downcast_ref::<Int64Array>()
1767            .unwrap();
1768
1769        // Set up exhaustive checking of all shredded columns and their nulls/values
1770        struct ShreddedValue<'m, 'v, T> {
1771            value: Option<Variant<'m, 'v>>,
1772            typed_value: Option<T>,
1773        }
1774        struct ShreddedStruct<'m, 'v> {
1775            score: ShreddedValue<'m, 'v, f64>,
1776            age: ShreddedValue<'m, 'v, i64>,
1777        }
1778        fn get_value<'m, 'v>(
1779            i: usize,
1780            metadata: &'m BinaryViewArray,
1781            value: &'v BinaryViewArray,
1782        ) -> Variant<'m, 'v> {
1783            Variant::new(metadata.value(i), value.value(i))
1784        }
1785        let expect = |i, expected_result: Option<ShreddedValue<ShreddedStruct>>| {
1786            match expected_result {
1787                Some(ShreddedValue {
1788                    value: expected_value,
1789                    typed_value: expected_typed_value,
1790                }) => {
1791                    assert!(result.is_valid(i));
1792                    match expected_value {
1793                        Some(expected_value) => {
1794                            assert!(value.is_valid(i));
1795                            assert_eq!(expected_value, get_value(i, metadata, value));
1796                        }
1797                        None => {
1798                            assert!(value.is_null(i));
1799                        }
1800                    }
1801                    match expected_typed_value {
1802                        Some(ShreddedStruct {
1803                            score: expected_score,
1804                            age: expected_age,
1805                        }) => {
1806                            assert!(typed_value.is_valid(i));
1807                            assert!(score_field.is_valid(i)); // non-nullable
1808                            assert!(age_field.is_valid(i)); // non-nullable
1809                            match expected_score.value {
1810                                Some(expected_score_value) => {
1811                                    assert!(score_value.is_valid(i));
1812                                    assert_eq!(
1813                                        expected_score_value,
1814                                        get_value(i, metadata, score_value)
1815                                    );
1816                                }
1817                                None => {
1818                                    assert!(score_value.is_null(i));
1819                                }
1820                            }
1821                            match expected_score.typed_value {
1822                                Some(expected_score) => {
1823                                    assert!(score_typed_value.is_valid(i));
1824                                    assert_eq!(expected_score, score_typed_value.value(i));
1825                                }
1826                                None => {
1827                                    assert!(score_typed_value.is_null(i));
1828                                }
1829                            }
1830                            match expected_age.value {
1831                                Some(expected_age_value) => {
1832                                    assert!(age_value.is_valid(i));
1833                                    assert_eq!(
1834                                        expected_age_value,
1835                                        get_value(i, metadata, age_value)
1836                                    );
1837                                }
1838                                None => {
1839                                    assert!(age_value.is_null(i));
1840                                }
1841                            }
1842                            match expected_age.typed_value {
1843                                Some(expected_age) => {
1844                                    assert!(age_typed_value.is_valid(i));
1845                                    assert_eq!(expected_age, age_typed_value.value(i));
1846                                }
1847                                None => {
1848                                    assert!(age_typed_value.is_null(i));
1849                                }
1850                            }
1851                        }
1852                        None => {
1853                            assert!(typed_value.is_null(i));
1854                        }
1855                    }
1856                }
1857                None => {
1858                    assert!(result.is_null(i));
1859                }
1860            };
1861        };
1862
1863        // Row 0: Fully shredded - both fields shred successfully
1864        expect(
1865            0,
1866            Some(ShreddedValue {
1867                value: None,
1868                typed_value: Some(ShreddedStruct {
1869                    score: ShreddedValue {
1870                        value: None,
1871                        typed_value: Some(95.5),
1872                    },
1873                    age: ShreddedValue {
1874                        value: None,
1875                        typed_value: Some(30),
1876                    },
1877                }),
1878            }),
1879        );
1880
1881        // Row 1: Partially shredded - value contains extra email field
1882        let mut builder = VariantBuilder::new();
1883        builder
1884            .new_object()
1885            .with_field("email", "bob@example.com")
1886            .finish();
1887        let (m, v) = builder.finish();
1888        let expected_value = Variant::new(&m, &v);
1889
1890        expect(
1891            1,
1892            Some(ShreddedValue {
1893                value: Some(expected_value),
1894                typed_value: Some(ShreddedStruct {
1895                    score: ShreddedValue {
1896                        value: None,
1897                        typed_value: Some(87.2),
1898                    },
1899                    age: ShreddedValue {
1900                        value: None,
1901                        typed_value: Some(25),
1902                    },
1903                }),
1904            }),
1905        );
1906
1907        // Row 2: Fully shredded -- missing score field
1908        expect(
1909            2,
1910            Some(ShreddedValue {
1911                value: None,
1912                typed_value: Some(ShreddedStruct {
1913                    score: ShreddedValue {
1914                        value: None,
1915                        typed_value: None,
1916                    },
1917                    age: ShreddedValue {
1918                        value: None,
1919                        typed_value: Some(35),
1920                    },
1921                }),
1922            }),
1923        );
1924
1925        // Row 3: Type mismatches - both score and age are strings
1926        expect(
1927            3,
1928            Some(ShreddedValue {
1929                value: None,
1930                typed_value: Some(ShreddedStruct {
1931                    score: ShreddedValue {
1932                        value: Some(Variant::from("ninety-five")),
1933                        typed_value: None,
1934                    },
1935                    age: ShreddedValue {
1936                        value: Some(Variant::from("thirty")),
1937                        typed_value: None,
1938                    },
1939                }),
1940            }),
1941        );
1942
1943        // Row 4: Non-object - falls back to value field
1944        expect(
1945            4,
1946            Some(ShreddedValue {
1947                value: Some(Variant::from("not an object")),
1948                typed_value: None,
1949            }),
1950        );
1951
1952        // Row 5: Empty object
1953        expect(
1954            5,
1955            Some(ShreddedValue {
1956                value: None,
1957                typed_value: Some(ShreddedStruct {
1958                    score: ShreddedValue {
1959                        value: None,
1960                        typed_value: None,
1961                    },
1962                    age: ShreddedValue {
1963                        value: None,
1964                        typed_value: None,
1965                    },
1966                }),
1967            }),
1968        );
1969
1970        // Row 6: Null
1971        expect(6, None);
1972
1973        // Helper to correctly create a variant object using a row's existing metadata
1974        let object_with_foo_field = |i| {
1975            use parquet_variant::{ParentState, ValueBuilder, VariantMetadata};
1976            let metadata = VariantMetadata::new(metadata.value(i));
1977            let mut metadata_builder = ReadOnlyMetadataBuilder::new(&metadata);
1978            let mut value_builder = ValueBuilder::new();
1979            let state = ParentState::variant(&mut value_builder, &mut metadata_builder);
1980            ObjectBuilder::new(state, false)
1981                .with_field("foo", 10)
1982                .finish();
1983            (metadata, value_builder.into_inner())
1984        };
1985
1986        // Row 7: Object with only a "wrong" field
1987        let (m, v) = object_with_foo_field(7);
1988        expect(
1989            7,
1990            Some(ShreddedValue {
1991                value: Some(Variant::new_with_metadata(m, &v)),
1992                typed_value: Some(ShreddedStruct {
1993                    score: ShreddedValue {
1994                        value: None,
1995                        typed_value: None,
1996                    },
1997                    age: ShreddedValue {
1998                        value: None,
1999                        typed_value: None,
2000                    },
2001                }),
2002            }),
2003        );
2004
2005        // Row 8: Object with one "wrong" and one "right" field
2006        let (m, v) = object_with_foo_field(8);
2007        expect(
2008            8,
2009            Some(ShreddedValue {
2010                value: Some(Variant::new_with_metadata(m, &v)),
2011                typed_value: Some(ShreddedStruct {
2012                    score: ShreddedValue {
2013                        value: None,
2014                        typed_value: Some(66.67),
2015                    },
2016                    age: ShreddedValue {
2017                        value: None,
2018                        typed_value: None,
2019                    },
2020                }),
2021            }),
2022        );
2023        Ok(())
2024    }
2025
2026    #[test]
2027    fn test_object_shredding_with_array_field() {
2028        let input = build_variant_array(vec![
2029            // Row 0: Object with well-typed scores list
2030            VariantRow::Object(vec![(
2031                "scores",
2032                VariantValue::List(vec![VariantValue::from(10i64), VariantValue::from(20i64)]),
2033            )]),
2034            // Row 1: Object whose scores list contains incompatible type
2035            VariantRow::Object(vec![(
2036                "scores",
2037                VariantValue::List(vec![
2038                    VariantValue::from("oops"),
2039                    VariantValue::from(Variant::Null),
2040                ]),
2041            )]),
2042            // Row 2: Object missing the scores field entirely
2043            VariantRow::Object(vec![]),
2044            // Row 3: Non-object fallback
2045            VariantRow::Value(VariantValue::from("not an object")),
2046            // Row 4: Top-level Null
2047            VariantRow::Null,
2048        ]);
2049        let list_field = Arc::new(Field::new("item", DataType::Int64, true));
2050        let inner_list_schema = DataType::List(list_field);
2051        let schema = DataType::Struct(Fields::from(vec![Field::new(
2052            "scores",
2053            inner_list_schema.clone(),
2054            true,
2055        )]));
2056
2057        let result = shred_variant(&input, &schema).unwrap();
2058        assert_eq!(result.len(), 5);
2059
2060        // Access base value/typed_value columns
2061        let value_field = result.value_field().unwrap();
2062        let typed_struct = result
2063            .typed_value_field()
2064            .unwrap()
2065            .as_any()
2066            .downcast_ref::<arrow::array::StructArray>()
2067            .unwrap();
2068
2069        // Validate base value fallbacks for non-object rows
2070        assert!(value_field.is_null(0));
2071        assert!(value_field.is_null(1));
2072        assert!(value_field.is_null(2));
2073        assert!(value_field.is_valid(3));
2074        assert_eq!(
2075            Variant::new(result.metadata_field().value(3), value_field.value(3)),
2076            Variant::from("not an object")
2077        );
2078        assert!(value_field.is_null(4));
2079
2080        // Typed struct should only be null for the fallback row
2081        assert!(typed_struct.is_valid(0));
2082        assert!(typed_struct.is_valid(1));
2083        assert!(typed_struct.is_valid(2));
2084        assert!(typed_struct.is_null(3));
2085        assert!(typed_struct.is_null(4));
2086
2087        // Drill into the scores field on the typed struct
2088        let scores_field =
2089            ShreddedVariantFieldArray::try_new(typed_struct.column_by_name("scores").unwrap())
2090                .unwrap();
2091        assert_list_structure_and_elements::<Int64Type, i32>(
2092            &VariantArray::from_parts(
2093                BinaryViewArray::from_iter_values(std::iter::repeat_n(
2094                    EMPTY_VARIANT_METADATA_BYTES,
2095                    scores_field.len(),
2096                )),
2097                Some(scores_field.value_field().unwrap().clone()),
2098                Some(scores_field.typed_value_field().unwrap().clone()),
2099                None,
2100            ),
2101            scores_field.len(),
2102            &[0i32, 2, 4, 4, 4, 4],
2103            &[Some(2), Some(2), None, None, None],
2104            &[
2105                None,
2106                None,
2107                Some(Variant::Null),
2108                Some(Variant::Null),
2109                Some(Variant::Null),
2110            ],
2111            (
2112                &[Some(10), Some(20), None, None],
2113                &[None, None, Some(Variant::from("oops")), Some(Variant::Null)],
2114            ),
2115        );
2116    }
2117
2118    #[test]
2119    fn test_object_different_schemas() -> Result<()> {
2120        // Create object with multiple fields
2121        let input = build_variant_array(vec![VariantRow::Object(vec![
2122            ("id", VariantValue::from(123i32)),
2123            ("age", VariantValue::from(25i64)),
2124            ("score", VariantValue::from(95.5f64)),
2125        ])]);
2126
2127        // Test with schema containing only id field
2128        let schema1 = ShreddedSchemaBuilder::default()
2129            .with_path("id", &DataType::Int32)?
2130            .build();
2131        let result1 = shred_variant(&input, &schema1).unwrap();
2132        let value_field1 = result1.value_field().unwrap();
2133        assert!(!value_field1.is_null(0)); // should contain {"age": 25, "score": 95.5}
2134
2135        // Test with schema containing id and age fields
2136        let schema2 = ShreddedSchemaBuilder::default()
2137            .with_path("id", &DataType::Int32)?
2138            .with_path("age", &DataType::Int64)?
2139            .build();
2140        let result2 = shred_variant(&input, &schema2).unwrap();
2141        let value_field2 = result2.value_field().unwrap();
2142        assert!(!value_field2.is_null(0)); // should contain {"score": 95.5}
2143
2144        // Test with schema containing all fields
2145        let schema3 = ShreddedSchemaBuilder::default()
2146            .with_path("id", &DataType::Int32)?
2147            .with_path("age", &DataType::Int64)?
2148            .with_path("score", &DataType::Float64)?
2149            .build();
2150        let result3 = shred_variant(&input, &schema3).unwrap();
2151        let value_field3 = result3.value_field().unwrap();
2152        assert!(value_field3.is_null(0)); // fully shredded, no remaining fields
2153
2154        Ok(())
2155    }
2156
2157    #[test]
2158    fn test_uuid_shredding_in_objects() -> Result<()> {
2159        let mock_uuid_1 = Uuid::new_v4();
2160        let mock_uuid_2 = Uuid::new_v4();
2161        let mock_uuid_3 = Uuid::new_v4();
2162
2163        let input = build_variant_array(vec![
2164            // Row 0: Fully shredded object with both UUID fields
2165            VariantRow::Object(vec![
2166                ("id", VariantValue::from(mock_uuid_1)),
2167                ("session_id", VariantValue::from(mock_uuid_2)),
2168            ]),
2169            // Row 1: Partially shredded object - UUID fields plus extra field
2170            VariantRow::Object(vec![
2171                ("id", VariantValue::from(mock_uuid_2)),
2172                ("session_id", VariantValue::from(mock_uuid_3)),
2173                ("name", VariantValue::from("test_user")),
2174            ]),
2175            // Row 2: Missing UUID field (no session_id)
2176            VariantRow::Object(vec![("id", VariantValue::from(mock_uuid_1))]),
2177            // Row 3: Type mismatch - id is UUID but session_id is a string
2178            VariantRow::Object(vec![
2179                ("id", VariantValue::from(mock_uuid_3)),
2180                ("session_id", VariantValue::from("not-a-uuid")),
2181            ]),
2182            // Row 4: Object with non-UUID value in id field
2183            VariantRow::Object(vec![
2184                ("id", VariantValue::from(12345i64)),
2185                ("session_id", VariantValue::from(mock_uuid_1)),
2186            ]),
2187            // Row 5: Null
2188            VariantRow::Null,
2189        ]);
2190
2191        let target_schema = ShreddedSchemaBuilder::default()
2192            .with_path("id", DataType::FixedSizeBinary(16))?
2193            .with_path("session_id", DataType::FixedSizeBinary(16))?
2194            .build();
2195
2196        let result = shred_variant(&input, &target_schema).unwrap();
2197
2198        assert!(result.value_field().is_some());
2199        assert!(result.typed_value_field().is_some());
2200        assert_eq!(result.len(), 6);
2201
2202        let metadata = result.metadata_field();
2203        let value = result.value_field().unwrap();
2204        let typed_value = result
2205            .typed_value_field()
2206            .unwrap()
2207            .as_any()
2208            .downcast_ref::<arrow::array::StructArray>()
2209            .unwrap();
2210
2211        // Extract id and session_id fields from typed_value struct
2212        let id_field =
2213            ShreddedVariantFieldArray::try_new(typed_value.column_by_name("id").unwrap()).unwrap();
2214        let session_id_field =
2215            ShreddedVariantFieldArray::try_new(typed_value.column_by_name("session_id").unwrap())
2216                .unwrap();
2217
2218        let id_value = id_field
2219            .value_field()
2220            .unwrap()
2221            .as_any()
2222            .downcast_ref::<BinaryViewArray>()
2223            .unwrap();
2224        let id_typed_value = id_field
2225            .typed_value_field()
2226            .unwrap()
2227            .as_any()
2228            .downcast_ref::<FixedSizeBinaryArray>()
2229            .unwrap();
2230        let session_id_value = session_id_field
2231            .value_field()
2232            .unwrap()
2233            .as_any()
2234            .downcast_ref::<BinaryViewArray>()
2235            .unwrap();
2236        let session_id_typed_value = session_id_field
2237            .typed_value_field()
2238            .unwrap()
2239            .as_any()
2240            .downcast_ref::<FixedSizeBinaryArray>()
2241            .unwrap();
2242
2243        // Row 0: Fully shredded - both UUID fields shred successfully
2244        assert!(result.is_valid(0));
2245
2246        assert!(value.is_null(0)); // fully shredded, no remaining fields
2247        assert!(id_value.is_null(0));
2248        assert!(session_id_value.is_null(0));
2249
2250        assert!(typed_value.is_valid(0));
2251        assert!(id_typed_value.is_valid(0));
2252        assert!(session_id_typed_value.is_valid(0));
2253
2254        assert_eq!(id_typed_value.value(0), mock_uuid_1.as_bytes());
2255        assert_eq!(session_id_typed_value.value(0), mock_uuid_2.as_bytes());
2256
2257        // Row 1: Partially shredded - value contains extra name field
2258        assert!(result.is_valid(1));
2259
2260        assert!(value.is_valid(1)); // contains unshredded "name" field
2261        assert!(typed_value.is_valid(1));
2262
2263        assert!(id_value.is_null(1));
2264        assert!(id_typed_value.is_valid(1));
2265        assert_eq!(id_typed_value.value(1), mock_uuid_2.as_bytes());
2266
2267        assert!(session_id_value.is_null(1));
2268        assert!(session_id_typed_value.is_valid(1));
2269        assert_eq!(session_id_typed_value.value(1), mock_uuid_3.as_bytes());
2270
2271        // Verify the value field contains the name field
2272        let row_1_variant = Variant::new(metadata.value(1), value.value(1));
2273        let Variant::Object(obj) = row_1_variant else {
2274            panic!("Expected object");
2275        };
2276
2277        assert_eq!(obj.get("name"), Some(Variant::from("test_user")));
2278
2279        // Row 2: Missing session_id field
2280        assert!(result.is_valid(2));
2281
2282        assert!(value.is_null(2)); // fully shredded, no extra fields
2283        assert!(typed_value.is_valid(2));
2284
2285        assert!(id_value.is_null(2));
2286        assert!(id_typed_value.is_valid(2));
2287        assert_eq!(id_typed_value.value(2), mock_uuid_1.as_bytes());
2288
2289        assert!(session_id_value.is_null(2));
2290        assert!(session_id_typed_value.is_null(2)); // missing field
2291
2292        // Row 3: Type mismatch - session_id is a string, not UUID
2293        assert!(result.is_valid(3));
2294
2295        assert!(value.is_null(3)); // no extra fields
2296        assert!(typed_value.is_valid(3));
2297
2298        assert!(id_value.is_null(3));
2299        assert!(id_typed_value.is_valid(3));
2300        assert_eq!(id_typed_value.value(3), mock_uuid_3.as_bytes());
2301
2302        assert!(session_id_value.is_valid(3)); // type mismatch, stored in value
2303        assert!(session_id_typed_value.is_null(3));
2304        let session_id_variant = Variant::new(metadata.value(3), session_id_value.value(3));
2305        assert_eq!(session_id_variant, Variant::from("not-a-uuid"));
2306
2307        // Row 4: Type mismatch - id is int64, not UUID
2308        assert!(result.is_valid(4));
2309
2310        assert!(value.is_null(4)); // no extra fields
2311        assert!(typed_value.is_valid(4));
2312
2313        assert!(id_value.is_valid(4)); // type mismatch, stored in value
2314        assert!(id_typed_value.is_null(4));
2315        let id_variant = Variant::new(metadata.value(4), id_value.value(4));
2316        assert_eq!(id_variant, Variant::from(12345i64));
2317
2318        assert!(session_id_value.is_null(4));
2319        assert!(session_id_typed_value.is_valid(4));
2320        assert_eq!(session_id_typed_value.value(4), mock_uuid_1.as_bytes());
2321
2322        // Row 5: Null
2323        assert!(result.is_null(5));
2324
2325        Ok(())
2326    }
2327
2328    #[test]
2329    fn test_spec_compliance() {
2330        let input = VariantArray::from_iter(vec![Variant::from(42i64), Variant::from("hello")]);
2331
2332        let result = shred_variant(&input, &DataType::Int64).unwrap();
2333
2334        // Test field access by name (not position)
2335        let inner_struct = result.inner();
2336        assert!(inner_struct.column_by_name("metadata").is_some());
2337        assert!(inner_struct.column_by_name("value").is_some());
2338        assert!(inner_struct.column_by_name("typed_value").is_some());
2339
2340        // Test metadata preservation
2341        assert_eq!(result.metadata_field().len(), input.metadata_field().len());
2342        // The metadata should be the same reference (cheap clone)
2343        // Note: BinaryViewArray doesn't have a .values() method, so we compare the arrays directly
2344        assert_eq!(result.metadata_field().len(), input.metadata_field().len());
2345
2346        // Test output structure correctness
2347        assert_eq!(result.len(), input.len());
2348        assert!(result.value_field().is_some());
2349        assert!(result.typed_value_field().is_some());
2350
2351        // For primitive shredding, verify that value and typed_value are never both non-null
2352        // (This rule applies to primitives; for objects, both can be non-null for partial shredding)
2353        let value_field = result.value_field().unwrap();
2354        let typed_value_field = result
2355            .typed_value_field()
2356            .unwrap()
2357            .as_any()
2358            .downcast_ref::<Int64Array>()
2359            .unwrap();
2360
2361        for i in 0..result.len() {
2362            if !result.is_null(i) {
2363                let value_is_null = value_field.is_null(i);
2364                let typed_value_is_null = typed_value_field.is_null(i);
2365                // For primitive shredding, at least one should be null
2366                assert!(
2367                    value_is_null || typed_value_is_null,
2368                    "Row {}: both value and typed_value are non-null for primitive shredding",
2369                    i
2370                );
2371            }
2372        }
2373    }
2374
2375    #[test]
2376    fn test_variant_schema_builder_simple() -> Result<()> {
2377        let shredding_type = ShreddedSchemaBuilder::default()
2378            .with_path("a", &DataType::Int64)?
2379            .with_path("b", &DataType::Float64)?
2380            .build();
2381
2382        assert_eq!(
2383            shredding_type,
2384            DataType::Struct(Fields::from(vec![
2385                Field::new("a", DataType::Int64, true),
2386                Field::new("b", DataType::Float64, true),
2387            ]))
2388        );
2389
2390        Ok(())
2391    }
2392
2393    #[test]
2394    fn test_variant_schema_builder_nested() -> Result<()> {
2395        let shredding_type = ShreddedSchemaBuilder::default()
2396            .with_path("a", &DataType::Int64)?
2397            .with_path("b.c", &DataType::Utf8)?
2398            .with_path("b.d", &DataType::Float64)?
2399            .build();
2400
2401        assert_eq!(
2402            shredding_type,
2403            DataType::Struct(Fields::from(vec![
2404                Field::new("a", DataType::Int64, true),
2405                Field::new(
2406                    "b",
2407                    DataType::Struct(Fields::from(vec![
2408                        Field::new("c", DataType::Utf8, true),
2409                        Field::new("d", DataType::Float64, true),
2410                    ])),
2411                    true
2412                ),
2413            ]))
2414        );
2415
2416        Ok(())
2417    }
2418
2419    #[test]
2420    fn test_variant_schema_builder_with_path_variant_path_arg() -> Result<()> {
2421        let path = VariantPath::from_iter([VariantPathElement::from("a.b")]);
2422        let shredding_type = ShreddedSchemaBuilder::default()
2423            .with_path(path, &DataType::Int64)?
2424            .build();
2425
2426        match shredding_type {
2427            DataType::Struct(fields) => {
2428                assert_eq!(fields.len(), 1);
2429                assert_eq!(fields[0].name(), "a.b");
2430                assert_eq!(fields[0].data_type(), &DataType::Int64);
2431            }
2432            _ => panic!("expected struct data type"),
2433        }
2434
2435        Ok(())
2436    }
2437
2438    #[test]
2439    fn test_variant_schema_builder_custom_nullability() -> Result<()> {
2440        let shredding_type = ShreddedSchemaBuilder::default()
2441            .with_path(
2442                "foo",
2443                Arc::new(Field::new("should_be_renamed", DataType::Utf8, false)),
2444            )?
2445            .with_path("bar", (&DataType::Int64, false))?
2446            .build();
2447
2448        let DataType::Struct(fields) = shredding_type else {
2449            panic!("expected struct data type");
2450        };
2451
2452        let foo = fields.iter().find(|f| f.name() == "foo").unwrap();
2453        assert_eq!(foo.data_type(), &DataType::Utf8);
2454        assert!(!foo.is_nullable());
2455
2456        let bar = fields.iter().find(|f| f.name() == "bar").unwrap();
2457        assert_eq!(bar.data_type(), &DataType::Int64);
2458        assert!(!bar.is_nullable());
2459
2460        Ok(())
2461    }
2462
2463    #[test]
2464    fn test_variant_schema_builder_with_shred_variant() -> Result<()> {
2465        let input = build_variant_array(vec![
2466            VariantRow::Object(vec![
2467                ("time", VariantValue::from(1234567890i64)),
2468                ("hostname", VariantValue::from("server1")),
2469                ("extra", VariantValue::from(42)),
2470            ]),
2471            VariantRow::Object(vec![
2472                ("time", VariantValue::from(9876543210i64)),
2473                ("hostname", VariantValue::from("server2")),
2474            ]),
2475            VariantRow::Null,
2476        ]);
2477
2478        let shredding_type = ShreddedSchemaBuilder::default()
2479            .with_path("time", &DataType::Int64)?
2480            .with_path("hostname", &DataType::Utf8)?
2481            .build();
2482
2483        let result = shred_variant(&input, &shredding_type).unwrap();
2484
2485        assert_eq!(
2486            result.data_type(),
2487            &DataType::Struct(Fields::from(vec![
2488                Field::new("metadata", DataType::BinaryView, false),
2489                Field::new("value", DataType::BinaryView, true),
2490                Field::new(
2491                    "typed_value",
2492                    DataType::Struct(Fields::from(vec![
2493                        Field::new(
2494                            "hostname",
2495                            DataType::Struct(Fields::from(vec![
2496                                Field::new("value", DataType::BinaryView, true),
2497                                Field::new("typed_value", DataType::Utf8, true),
2498                            ])),
2499                            false,
2500                        ),
2501                        Field::new(
2502                            "time",
2503                            DataType::Struct(Fields::from(vec![
2504                                Field::new("value", DataType::BinaryView, true),
2505                                Field::new("typed_value", DataType::Int64, true),
2506                            ])),
2507                            false,
2508                        ),
2509                    ])),
2510                    true,
2511                ),
2512            ]))
2513        );
2514
2515        assert_eq!(result.len(), 3);
2516        assert!(result.typed_value_field().is_some());
2517
2518        let typed_value = result
2519            .typed_value_field()
2520            .unwrap()
2521            .as_any()
2522            .downcast_ref::<arrow::array::StructArray>()
2523            .unwrap();
2524
2525        let time_field =
2526            ShreddedVariantFieldArray::try_new(typed_value.column_by_name("time").unwrap())
2527                .unwrap();
2528        let hostname_field =
2529            ShreddedVariantFieldArray::try_new(typed_value.column_by_name("hostname").unwrap())
2530                .unwrap();
2531
2532        let time_typed = time_field
2533            .typed_value_field()
2534            .unwrap()
2535            .as_any()
2536            .downcast_ref::<Int64Array>()
2537            .unwrap();
2538        let hostname_typed = hostname_field
2539            .typed_value_field()
2540            .unwrap()
2541            .as_any()
2542            .downcast_ref::<arrow::array::StringArray>()
2543            .unwrap();
2544
2545        // Row 0
2546        assert!(!result.is_null(0));
2547        assert_eq!(time_typed.value(0), 1234567890);
2548        assert_eq!(hostname_typed.value(0), "server1");
2549
2550        // Row 1
2551        assert!(!result.is_null(1));
2552        assert_eq!(time_typed.value(1), 9876543210);
2553        assert_eq!(hostname_typed.value(1), "server2");
2554
2555        // Row 2
2556        assert!(result.is_null(2));
2557
2558        Ok(())
2559    }
2560
2561    #[test]
2562    fn test_variant_schema_builder_conflicting_path() -> Result<()> {
2563        let shredding_type = ShreddedSchemaBuilder::default()
2564            .with_path("a", &DataType::Int64)?
2565            .with_path("a", &DataType::Float64)?
2566            .build();
2567
2568        assert_eq!(
2569            shredding_type,
2570            DataType::Struct(Fields::from(
2571                vec![Field::new("a", DataType::Float64, true),]
2572            ))
2573        );
2574
2575        Ok(())
2576    }
2577
2578    #[test]
2579    fn test_variant_schema_builder_root_path() -> Result<()> {
2580        let path = VariantPath::new(vec![]);
2581        let shredding_type = ShreddedSchemaBuilder::default()
2582            .with_path(path, &DataType::Int64)?
2583            .build();
2584
2585        assert_eq!(shredding_type, DataType::Int64);
2586
2587        Ok(())
2588    }
2589
2590    #[test]
2591    fn test_variant_schema_builder_empty_path() -> Result<()> {
2592        let shredding_type = ShreddedSchemaBuilder::default()
2593            .with_path("", &DataType::Int64)?
2594            .build();
2595
2596        assert_eq!(shredding_type, DataType::Int64);
2597        Ok(())
2598    }
2599
2600    #[test]
2601    fn test_variant_schema_builder_default() {
2602        let shredding_type = ShreddedSchemaBuilder::default().build();
2603        assert_eq!(shredding_type, DataType::Null);
2604    }
2605}