Skip to main content

parquet_variant_compute/
shred_variant.rs

1// Licensed to the Apache Software Foundation (ASF) under one
2// or more contributor license agreements.  See the NOTICE file
3// distributed with this work for additional information
4// regarding copyright ownership.  The ASF licenses this file
5// to you under the Apache License, Version 2.0 (the
6// "License"); you may not use this file except in compliance
7// with the License.  You may obtain a copy of the License at
8//
9//   http://www.apache.org/licenses/LICENSE-2.0
10//
11// Unless required by applicable law or agreed to in writing,
12// software distributed under the License is distributed on an
13// "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
14// KIND, either express or implied.  See the License for the
15// specific language governing permissions and limitations
16// under the License.
17
18//! Module for shredding VariantArray with a given schema.
19
20use crate::variant_array::{ShreddedVariantFieldArray, StructArrayBuilder};
21use crate::variant_to_arrow::{
22    ArrayVariantToArrowRowBuilder, PrimitiveVariantToArrowRowBuilder,
23    make_primitive_variant_to_arrow_row_builder,
24};
25use crate::{VariantArray, VariantValueArrayBuilder};
26use arrow::array::{ArrayRef, BinaryViewArray, NullBufferBuilder};
27use arrow::buffer::NullBuffer;
28use arrow::compute::CastOptions;
29use arrow::datatypes::{DataType, Field, FieldRef, Fields, TimeUnit};
30use arrow::error::{ArrowError, Result};
31use indexmap::IndexMap;
32use parquet_variant::{Variant, VariantBuilderExt, VariantPath, VariantPathElement};
33use std::collections::BTreeMap;
34use std::sync::Arc;
35
36/// Shreds the input binary variant using a target shredding schema derived from the requested data type.
37///
38/// For example, requesting `DataType::Int64` would produce an output variant array with the schema:
39///
40/// ```text
41/// {
42///    metadata: BINARY,
43///    value: BINARY,
44///    typed_value: LONG,
45/// }
46/// ```
47///
48/// Similarly, requesting `DataType::Struct` with two integer fields `a` and `b` would produce an
49/// output variant array with the schema:
50///
51/// ```text
52/// {
53///   metadata: BINARY,
54///   value: BINARY,
55///   typed_value: {
56///     a: {
57///       value: BINARY,
58///       typed_value: INT,
59///     },
60///     b: {
61///       value: BINARY,
62///       typed_value: INT,
63///     },
64///   }
65/// }
66/// ```
67///
68/// See [`ShreddedSchemaBuilder`] for a convenient way to build the `as_type`
69/// value passed to this function.
70pub fn shred_variant(array: &VariantArray, as_type: &DataType) -> Result<VariantArray> {
71    if array.typed_value_field().is_some() {
72        return Err(ArrowError::InvalidArgumentError(
73            "Input is already shredded".to_string(),
74        ));
75    }
76
77    if array.value_field().is_none() {
78        // all-null case -- nothing to do.
79        return Ok(array.clone());
80    };
81
82    let cast_options = CastOptions::default();
83    let mut builder = make_variant_to_shredded_variant_arrow_row_builder(
84        as_type,
85        &cast_options,
86        array.len(),
87        true,
88    )?;
89    for i in 0..array.len() {
90        if array.is_null(i) {
91            builder.append_null()?;
92        } else {
93            builder.append_value(array.value(i))?;
94        }
95    }
96    let (value, typed_value, nulls) = builder.finish()?;
97    Ok(VariantArray::from_parts(
98        array.metadata_field().clone(),
99        Some(value),
100        Some(typed_value),
101        nulls,
102    ))
103}
104
105pub(crate) fn make_variant_to_shredded_variant_arrow_row_builder<'a>(
106    data_type: &'a DataType,
107    cast_options: &'a CastOptions,
108    capacity: usize,
109    top_level: bool,
110) -> Result<VariantToShreddedVariantRowBuilder<'a>> {
111    let builder = match data_type {
112        DataType::Struct(fields) => {
113            let typed_value_builder = VariantToShreddedObjectVariantRowBuilder::try_new(
114                fields,
115                cast_options,
116                capacity,
117                top_level,
118            )?;
119            VariantToShreddedVariantRowBuilder::Object(typed_value_builder)
120        }
121        DataType::List(_)
122        | DataType::LargeList(_)
123        | DataType::ListView(_)
124        | DataType::LargeListView(_)
125        | DataType::FixedSizeList(..) => {
126            let typed_value_builder = VariantToShreddedArrayVariantRowBuilder::try_new(
127                data_type,
128                cast_options,
129                capacity,
130            )?;
131            VariantToShreddedVariantRowBuilder::Array(typed_value_builder)
132        }
133        // Supported shredded primitive types, see Variant shredding spec:
134        // https://github.com/apache/parquet-format/blob/master/VariantShredding.md#shredded-value-types
135        DataType::Boolean
136        | DataType::Int8
137        | DataType::Int16
138        | DataType::Int32
139        | DataType::Int64
140        | DataType::Float32
141        | DataType::Float64
142        | DataType::Decimal32(..)
143        | DataType::Decimal64(..)
144        | DataType::Decimal128(..)
145        | DataType::Date32
146        | DataType::Time64(TimeUnit::Microsecond)
147        | DataType::Timestamp(TimeUnit::Microsecond | TimeUnit::Nanosecond, _)
148        | DataType::Binary
149        | DataType::BinaryView
150        | DataType::Utf8
151        | DataType::Utf8View
152        | DataType::FixedSizeBinary(16) // UUID
153        => {
154            let builder =
155                make_primitive_variant_to_arrow_row_builder(data_type, cast_options, capacity)?;
156            let typed_value_builder =
157                VariantToShreddedPrimitiveVariantRowBuilder::new(builder, capacity, top_level);
158            VariantToShreddedVariantRowBuilder::Primitive(typed_value_builder)
159        }
160        DataType::FixedSizeBinary(_) => {
161            return Err(ArrowError::InvalidArgumentError(format!("{data_type} is not a valid variant shredding type. Only FixedSizeBinary(16) for UUID is supported.")))
162        }
163        _ => {
164            return Err(ArrowError::InvalidArgumentError(format!("{data_type} is not a valid variant shredding type")))
165        }
166    };
167    Ok(builder)
168}
169
170pub(crate) enum VariantToShreddedVariantRowBuilder<'a> {
171    Primitive(VariantToShreddedPrimitiveVariantRowBuilder<'a>),
172    Array(VariantToShreddedArrayVariantRowBuilder<'a>),
173    Object(VariantToShreddedObjectVariantRowBuilder<'a>),
174}
175
176impl<'a> VariantToShreddedVariantRowBuilder<'a> {
177    pub fn append_null(&mut self) -> Result<()> {
178        use VariantToShreddedVariantRowBuilder::*;
179        match self {
180            Primitive(b) => b.append_null(),
181            Array(b) => b.append_null(),
182            Object(b) => b.append_null(),
183        }
184    }
185
186    pub fn append_value(&mut self, value: Variant<'_, '_>) -> Result<bool> {
187        use VariantToShreddedVariantRowBuilder::*;
188        match self {
189            Primitive(b) => b.append_value(value),
190            Array(b) => b.append_value(value),
191            Object(b) => b.append_value(value),
192        }
193    }
194
195    pub fn finish(self) -> Result<(BinaryViewArray, ArrayRef, Option<NullBuffer>)> {
196        use VariantToShreddedVariantRowBuilder::*;
197        match self {
198            Primitive(b) => b.finish(),
199            Array(b) => b.finish(),
200            Object(b) => b.finish(),
201        }
202    }
203}
204
205/// A top-level variant shredder -- appending NULL produces typed_value=NULL and value=Variant::Null
206pub(crate) struct VariantToShreddedPrimitiveVariantRowBuilder<'a> {
207    value_builder: VariantValueArrayBuilder,
208    typed_value_builder: PrimitiveVariantToArrowRowBuilder<'a>,
209    nulls: NullBufferBuilder,
210    top_level: bool,
211}
212
213impl<'a> VariantToShreddedPrimitiveVariantRowBuilder<'a> {
214    pub(crate) fn new(
215        typed_value_builder: PrimitiveVariantToArrowRowBuilder<'a>,
216        capacity: usize,
217        top_level: bool,
218    ) -> Self {
219        Self {
220            value_builder: VariantValueArrayBuilder::new(capacity),
221            typed_value_builder,
222            nulls: NullBufferBuilder::new(capacity),
223            top_level,
224        }
225    }
226
227    fn append_null(&mut self) -> Result<()> {
228        // Only the top-level struct that represents the variant can be nullable; object fields and
229        // array elements are non-nullable.
230        self.nulls.append(!self.top_level);
231        self.value_builder.append_null();
232        self.typed_value_builder.append_null()
233    }
234
235    fn append_value(&mut self, value: Variant<'_, '_>) -> Result<bool> {
236        self.nulls.append_non_null();
237        if self.typed_value_builder.append_value(&value)? {
238            self.value_builder.append_null();
239        } else {
240            self.value_builder.append_value(value);
241        }
242        Ok(true)
243    }
244
245    fn finish(mut self) -> Result<(BinaryViewArray, ArrayRef, Option<NullBuffer>)> {
246        Ok((
247            self.value_builder.build()?,
248            self.typed_value_builder.finish()?,
249            self.nulls.finish(),
250        ))
251    }
252}
253
254pub(crate) struct VariantToShreddedArrayVariantRowBuilder<'a> {
255    value_builder: VariantValueArrayBuilder,
256    typed_value_builder: ArrayVariantToArrowRowBuilder<'a>,
257}
258
259impl<'a> VariantToShreddedArrayVariantRowBuilder<'a> {
260    fn try_new(
261        data_type: &'a DataType,
262        cast_options: &'a CastOptions,
263        capacity: usize,
264    ) -> Result<Self> {
265        Ok(Self {
266            value_builder: VariantValueArrayBuilder::new(capacity),
267            typed_value_builder: ArrayVariantToArrowRowBuilder::try_new(
268                data_type,
269                cast_options,
270                capacity,
271            )?,
272        })
273    }
274
275    fn append_null(&mut self) -> Result<()> {
276        self.value_builder.append_value(Variant::Null);
277        self.typed_value_builder.append_null()?;
278        Ok(())
279    }
280
281    fn append_value(&mut self, variant: Variant<'_, '_>) -> Result<bool> {
282        // If the variant is not an array, typed_value must be null.
283        // If the variant is an array, value must be null.
284        match variant {
285            Variant::List(list) => {
286                self.value_builder.append_null();
287                self.typed_value_builder
288                    .append_value(&Variant::List(list))?;
289                Ok(true)
290            }
291            other => {
292                self.value_builder.append_value(other);
293                self.typed_value_builder.append_null()?;
294                Ok(false)
295            }
296        }
297    }
298
299    fn finish(self) -> Result<(BinaryViewArray, ArrayRef, Option<NullBuffer>)> {
300        Ok((
301            self.value_builder.build()?,
302            self.typed_value_builder.finish()?,
303            // All elements of an array must be present (not missing) because
304            // the array Variant encoding does not allow missing elements
305            None,
306        ))
307    }
308}
309
310pub(crate) struct VariantToShreddedObjectVariantRowBuilder<'a> {
311    value_builder: VariantValueArrayBuilder,
312    typed_value_builders: IndexMap<&'a str, VariantToShreddedVariantRowBuilder<'a>>,
313    typed_value_nulls: NullBufferBuilder,
314    nulls: NullBufferBuilder,
315    top_level: bool,
316}
317
318impl<'a> VariantToShreddedObjectVariantRowBuilder<'a> {
319    fn try_new(
320        fields: &'a Fields,
321        cast_options: &'a CastOptions,
322        capacity: usize,
323        top_level: bool,
324    ) -> Result<Self> {
325        let typed_value_builders = fields.iter().map(|field| {
326            let builder = make_variant_to_shredded_variant_arrow_row_builder(
327                field.data_type(),
328                cast_options,
329                capacity,
330                false,
331            )?;
332            Ok((field.name().as_str(), builder))
333        });
334        Ok(Self {
335            value_builder: VariantValueArrayBuilder::new(capacity),
336            typed_value_builders: typed_value_builders.collect::<Result<_>>()?,
337            typed_value_nulls: NullBufferBuilder::new(capacity),
338            nulls: NullBufferBuilder::new(capacity),
339            top_level,
340        })
341    }
342
343    fn append_null(&mut self) -> Result<()> {
344        // Only the top-level struct that represents the variant can be nullable; object fields and
345        // array elements are non-nullable.
346        self.nulls.append(!self.top_level);
347        self.value_builder.append_null();
348        self.typed_value_nulls.append_null();
349        for (_, typed_value_builder) in &mut self.typed_value_builders {
350            typed_value_builder.append_null()?;
351        }
352        Ok(())
353    }
354
355    fn append_value(&mut self, value: Variant<'_, '_>) -> Result<bool> {
356        let Variant::Object(ref obj) = value else {
357            // Not an object => fall back
358            self.nulls.append_non_null();
359            self.value_builder.append_value(value);
360            self.typed_value_nulls.append_null();
361            for (_, typed_value_builder) in &mut self.typed_value_builders {
362                typed_value_builder.append_null()?;
363            }
364            return Ok(false);
365        };
366
367        // Route the object's fields by name as either shredded or unshredded
368        let mut builder = self.value_builder.builder_ext(value.metadata());
369        let mut object_builder = builder.try_new_object()?;
370        let mut seen = std::collections::HashSet::new();
371        let mut partially_shredded = false;
372        for (field_name, value) in obj.iter() {
373            match self.typed_value_builders.get_mut(field_name) {
374                Some(typed_value_builder) => {
375                    typed_value_builder.append_value(value)?;
376                    seen.insert(field_name);
377                }
378                None => {
379                    object_builder.insert_bytes(field_name, value);
380                    partially_shredded = true;
381                }
382            }
383        }
384
385        // Handle missing fields
386        for (field_name, typed_value_builder) in &mut self.typed_value_builders {
387            if !seen.contains(field_name) {
388                typed_value_builder.append_null()?;
389            }
390        }
391
392        // Only emit the value if it captured any unshredded object fields
393        if partially_shredded {
394            object_builder.finish();
395        } else {
396            drop(object_builder);
397            self.value_builder.append_null();
398        }
399
400        self.typed_value_nulls.append_non_null();
401        self.nulls.append_non_null();
402        Ok(true)
403    }
404
405    fn finish(mut self) -> Result<(BinaryViewArray, ArrayRef, Option<NullBuffer>)> {
406        let mut builder = StructArrayBuilder::new();
407        for (field_name, typed_value_builder) in self.typed_value_builders {
408            let (value, typed_value, nulls) = typed_value_builder.finish()?;
409            let array =
410                ShreddedVariantFieldArray::from_parts(Some(value), Some(typed_value), nulls);
411            builder = builder.with_field(field_name, ArrayRef::from(array), false);
412        }
413        if let Some(nulls) = self.typed_value_nulls.finish() {
414            builder = builder.with_nulls(nulls);
415        }
416        Ok((
417            self.value_builder.build()?,
418            Arc::new(builder.build()),
419            self.nulls.finish(),
420        ))
421    }
422}
423
424/// Field configuration captured by the builder (data type + nullability).
425#[derive(Clone)]
426pub struct ShreddingField {
427    data_type: DataType,
428    nullable: bool,
429}
430
431impl ShreddingField {
432    fn new(data_type: DataType, nullable: bool) -> Self {
433        Self {
434            data_type,
435            nullable,
436        }
437    }
438
439    fn null() -> Self {
440        Self::new(DataType::Null, true)
441    }
442}
443
444/// Convenience conversion to allow passing either `FieldRef`, `DataType`, or `(DataType, bool)`.
445pub trait IntoShreddingField {
446    fn into_shredding_field(self) -> ShreddingField;
447}
448
449impl IntoShreddingField for FieldRef {
450    fn into_shredding_field(self) -> ShreddingField {
451        ShreddingField::new(self.data_type().clone(), self.is_nullable())
452    }
453}
454
455impl IntoShreddingField for &DataType {
456    fn into_shredding_field(self) -> ShreddingField {
457        ShreddingField::new(self.clone(), true)
458    }
459}
460
461impl IntoShreddingField for DataType {
462    fn into_shredding_field(self) -> ShreddingField {
463        ShreddingField::new(self, true)
464    }
465}
466
467impl IntoShreddingField for (&DataType, bool) {
468    fn into_shredding_field(self) -> ShreddingField {
469        ShreddingField::new(self.0.clone(), self.1)
470    }
471}
472
473impl IntoShreddingField for (DataType, bool) {
474    fn into_shredding_field(self) -> ShreddingField {
475        ShreddingField::new(self.0, self.1)
476    }
477}
478
479/// Builder for constructing a variant shredding schema.
480///
481/// The builder pattern makes it easy to incrementally define which fields
482/// should be shredded and with what types. Fields are nullable by default; pass
483/// a `(data_type, nullable)` pair or a `FieldRef` to control nullability.
484///
485/// Note: this builder currently only supports struct fields. List support
486/// will be added in the future.
487///
488/// # Example
489///
490/// ```
491/// use std::sync::Arc;
492/// use arrow::datatypes::{DataType, Field, TimeUnit};
493/// use parquet_variant::{VariantPath, VariantPathElement};
494/// use parquet_variant_compute::ShreddedSchemaBuilder;
495///
496/// // Define the shredding schema using the builder
497/// let shredding_type = ShreddedSchemaBuilder::default()
498///     // store the "time" field as a separate UTC timestamp
499///     .with_path("time", (&DataType::Timestamp(TimeUnit::Nanosecond, Some("UTC".into())), true))
500///     // store hostname as non-nullable Utf8
501///     .with_path("hostname", (&DataType::Utf8, false))
502///     // pass a FieldRef directly
503///     .with_path(
504///         "metadata.trace_id",
505///         Arc::new(Field::new("trace_id", DataType::FixedSizeBinary(16), false)),
506///     )
507///     // field name with a dot: use VariantPath to avoid splitting
508///     .with_path(
509///         VariantPath::from_iter([VariantPathElement::from("metrics.cpu")]),
510///         &DataType::Float64,
511///     )
512///     .build();
513///
514/// // The shredding_type can now be passed to shred_variant:
515/// // let shredded = shred_variant(&input, &shredding_type)?;
516/// ```
517#[derive(Default, Clone)]
518pub struct ShreddedSchemaBuilder {
519    root: VariantSchemaNode,
520}
521
522impl ShreddedSchemaBuilder {
523    /// Create a new empty schema builder.
524    pub fn new() -> Self {
525        Self::default()
526    }
527
528    /// Insert a typed path into the schema using dot notation (or any
529    /// [`VariantPath`] convertible).
530    ///
531    /// The path uses dot notation to specify nested fields.
532    /// For example, "a.b.c" will create a nested structure.
533    ///
534    /// # Arguments
535    ///
536    /// * `path` - Anything convertible to [`VariantPath`] (e.g., a `&str`)
537    /// * `field` - Anything convertible via [`IntoShreddingField`] (e.g. `FieldRef`,
538    ///   `&DataType`, or `(&DataType, bool)` to control nullability)
539    pub fn with_path<'a, P, F>(mut self, path: P, field: F) -> Self
540    where
541        P: Into<VariantPath<'a>>,
542        F: IntoShreddingField,
543    {
544        let path: VariantPath<'a> = path.into();
545        self.root.insert_path(&path, field.into_shredding_field());
546        self
547    }
548
549    /// Build the final [`DataType`].
550    pub fn build(self) -> DataType {
551        let shredding_type = self.root.to_shredding_type();
552        match shredding_type {
553            Some(shredding_type) => shredding_type,
554            None => DataType::Null,
555        }
556    }
557}
558
559/// Internal tree node structure for building variant schemas.
560#[derive(Clone)]
561enum VariantSchemaNode {
562    /// A leaf node with a primitive/scalar type (and nullability)
563    Leaf(ShreddingField),
564    /// An inner struct node with nested fields
565    Struct(BTreeMap<String, VariantSchemaNode>),
566}
567
568impl Default for VariantSchemaNode {
569    fn default() -> Self {
570        Self::Leaf(ShreddingField::null())
571    }
572}
573
574impl VariantSchemaNode {
575    /// Insert a path into this node with the given data type.
576    fn insert_path(&mut self, path: &VariantPath<'_>, field: ShreddingField) {
577        self.insert_path_elements(path, field);
578    }
579
580    fn insert_path_elements(&mut self, segments: &[VariantPathElement<'_>], field: ShreddingField) {
581        let Some((head, tail)) = segments.split_first() else {
582            *self = Self::Leaf(field);
583            return;
584        };
585
586        match head {
587            VariantPathElement::Field { name } => {
588                // Ensure this node is a Struct node
589                let children = match self {
590                    Self::Struct(children) => children,
591                    _ => {
592                        *self = Self::Struct(BTreeMap::new());
593                        match self {
594                            Self::Struct(children) => children,
595                            _ => unreachable!(),
596                        }
597                    }
598                };
599
600                children
601                    .entry(name.to_string())
602                    .or_default()
603                    .insert_path_elements(tail, field);
604            }
605            VariantPathElement::Index { .. } => {
606                // List support to be added later; reject for now
607                unreachable!("List paths are not supported yet");
608            }
609        }
610    }
611
612    /// Convert this node to a shredding type.
613    ///
614    /// Returns the [`DataType`] for passing to [`shred_variant`].
615    fn to_shredding_type(&self) -> Option<DataType> {
616        match self {
617            Self::Leaf(field) => Some(field.data_type.clone()),
618            Self::Struct(children) => {
619                let child_fields: Vec<_> = children
620                    .iter()
621                    .filter_map(|(name, child)| child.to_shredding_field(name))
622                    .collect();
623                if child_fields.is_empty() {
624                    None
625                } else {
626                    Some(DataType::Struct(Fields::from(child_fields)))
627                }
628            }
629        }
630    }
631
632    fn to_shredding_field(&self, name: &str) -> Option<FieldRef> {
633        match self {
634            Self::Leaf(field) => Some(Arc::new(Field::new(
635                name,
636                field.data_type.clone(),
637                field.nullable,
638            ))),
639            Self::Struct(_) => self
640                .to_shredding_type()
641                .map(|data_type| Arc::new(Field::new(name, data_type, true))),
642        }
643    }
644}
645
646#[cfg(test)]
647mod tests {
648    use super::*;
649    use crate::VariantArrayBuilder;
650    use crate::arrow_to_variant::ListLikeArray;
651    use arrow::array::{
652        Array, BinaryViewArray, FixedSizeBinaryArray, Float64Array, GenericListArray,
653        GenericListViewArray, Int64Array, ListArray, OffsetSizeTrait, PrimitiveArray, StringArray,
654    };
655    use arrow::datatypes::{
656        ArrowPrimitiveType, DataType, Field, Fields, Int64Type, TimeUnit, UnionFields, UnionMode,
657    };
658    use parquet_variant::{
659        BuilderSpecificState, EMPTY_VARIANT_METADATA_BYTES, ObjectBuilder, ReadOnlyMetadataBuilder,
660        Variant, VariantBuilder, VariantPath, VariantPathElement,
661    };
662    use std::sync::Arc;
663    use uuid::Uuid;
664
665    #[derive(Clone)]
666    enum VariantValue<'a> {
667        Value(Variant<'a, 'a>),
668        List(Vec<VariantValue<'a>>),
669        Object(Vec<(&'a str, VariantValue<'a>)>),
670        Null,
671    }
672
673    impl<'a, T> From<T> for VariantValue<'a>
674    where
675        T: Into<Variant<'a, 'a>>,
676    {
677        fn from(value: T) -> Self {
678            Self::Value(value.into())
679        }
680    }
681
682    #[derive(Clone)]
683    enum VariantRow<'a> {
684        Value(VariantValue<'a>),
685        List(Vec<VariantValue<'a>>),
686        Object(Vec<(&'a str, VariantValue<'a>)>),
687        Null,
688    }
689
690    fn build_variant_array(rows: Vec<VariantRow<'static>>) -> VariantArray {
691        let mut builder = VariantArrayBuilder::new(rows.len());
692
693        fn append_variant_value<B: VariantBuilderExt>(builder: &mut B, value: VariantValue) {
694            match value {
695                VariantValue::Value(v) => builder.append_value(v),
696                VariantValue::List(values) => {
697                    let mut list = builder.new_list();
698                    for v in values {
699                        append_variant_value(&mut list, v);
700                    }
701                    list.finish();
702                }
703                VariantValue::Object(fields) => {
704                    let mut object = builder.new_object();
705                    for (name, value) in fields {
706                        append_variant_field(&mut object, name, value);
707                    }
708                    object.finish();
709                }
710                VariantValue::Null => builder.append_null(),
711            }
712        }
713
714        fn append_variant_field<'a, S: BuilderSpecificState>(
715            object: &mut ObjectBuilder<'_, S>,
716            name: &'a str,
717            value: VariantValue<'a>,
718        ) {
719            match value {
720                VariantValue::Value(v) => {
721                    object.insert(name, v);
722                }
723                VariantValue::List(values) => {
724                    let mut list = object.new_list(name);
725                    for v in values {
726                        append_variant_value(&mut list, v);
727                    }
728                    list.finish();
729                }
730                VariantValue::Object(fields) => {
731                    let mut nested = object.new_object(name);
732                    for (field_name, v) in fields {
733                        append_variant_field(&mut nested, field_name, v);
734                    }
735                    nested.finish();
736                }
737                VariantValue::Null => {
738                    object.insert(name, Variant::Null);
739                }
740            }
741        }
742
743        rows.into_iter().for_each(|row| match row {
744            VariantRow::Value(value) => append_variant_value(&mut builder, value),
745            VariantRow::List(values) => {
746                let mut list = builder.new_list();
747                for value in values {
748                    append_variant_value(&mut list, value);
749                }
750                list.finish();
751            }
752            VariantRow::Object(fields) => {
753                let mut object = builder.new_object();
754                for (name, value) in fields {
755                    append_variant_field(&mut object, name, value);
756                }
757                object.finish();
758            }
759            VariantRow::Null => builder.append_null(),
760        });
761        builder.build()
762    }
763
764    trait TestListLikeArray: ListLikeArray {
765        type OffsetSize: OffsetSizeTrait;
766        fn value_offsets(&self) -> Option<&[Self::OffsetSize]>;
767        fn value_size(&self, index: usize) -> Self::OffsetSize;
768    }
769
770    impl<O: OffsetSizeTrait> TestListLikeArray for GenericListArray<O> {
771        type OffsetSize = O;
772
773        fn value_offsets(&self) -> Option<&[Self::OffsetSize]> {
774            Some(GenericListArray::value_offsets(self))
775        }
776
777        fn value_size(&self, index: usize) -> Self::OffsetSize {
778            GenericListArray::value_length(self, index)
779        }
780    }
781
782    impl<O: OffsetSizeTrait> TestListLikeArray for GenericListViewArray<O> {
783        type OffsetSize = O;
784
785        fn value_offsets(&self) -> Option<&[Self::OffsetSize]> {
786            Some(GenericListViewArray::value_offsets(self))
787        }
788
789        fn value_size(&self, index: usize) -> Self::OffsetSize {
790            GenericListViewArray::value_size(self, index)
791        }
792    }
793
794    fn downcast_list_like_array<O: OffsetSizeTrait>(
795        array: &VariantArray,
796    ) -> &dyn TestListLikeArray<OffsetSize = O> {
797        let typed_value = array.typed_value_field().unwrap();
798        if let Some(list) = typed_value.as_any().downcast_ref::<GenericListArray<O>>() {
799            list
800        } else if let Some(list_view) = typed_value
801            .as_any()
802            .downcast_ref::<GenericListViewArray<O>>()
803        {
804            list_view
805        } else {
806            panic!(
807                "Expected list-like typed_value with matching offset type, got {}",
808                typed_value.data_type()
809            );
810        }
811    }
812
813    fn assert_list_structure<O: OffsetSizeTrait>(
814        array: &VariantArray,
815        expected_len: usize,
816        expected_offsets: &[O],
817        expected_sizes: &[Option<O>],
818        expected_fallbacks: &[Option<Variant<'static, 'static>>],
819    ) {
820        assert_eq!(array.len(), expected_len);
821
822        let fallbacks = (array.value_field().unwrap(), Some(array.metadata_field()));
823        let array = downcast_list_like_array::<O>(array);
824
825        assert_eq!(
826            array.value_offsets().unwrap(),
827            expected_offsets,
828            "list offsets mismatch"
829        );
830        assert_eq!(
831            array.len(),
832            expected_sizes.len(),
833            "expected_sizes should match array length"
834        );
835        assert_eq!(
836            array.len(),
837            expected_fallbacks.len(),
838            "expected_fallbacks should match array length"
839        );
840        assert_eq!(
841            array.len(),
842            fallbacks.0.len(),
843            "fallbacks value field should match array length"
844        );
845
846        // Validate per-row shredding outcomes for the list array
847        for (idx, (expected_size, expected_fallback)) in expected_sizes
848            .iter()
849            .zip(expected_fallbacks.iter())
850            .enumerate()
851        {
852            match expected_size {
853                Some(len) => {
854                    // Successfully shredded: typed list value present, no fallback value
855                    assert!(array.is_valid(idx));
856                    assert_eq!(array.value_size(idx), *len);
857                    assert!(fallbacks.0.is_null(idx));
858                }
859                None => {
860                    // Unable to shred: typed list value absent, fallback should carry the variant
861                    assert!(array.is_null(idx));
862                    assert_eq!(array.value_size(idx), O::zero());
863                    match expected_fallback {
864                        Some(expected_variant) => {
865                            assert!(fallbacks.0.is_valid(idx));
866                            let metadata_bytes = fallbacks
867                                .1
868                                .filter(|m| m.is_valid(idx))
869                                .map(|m| m.value(idx))
870                                .filter(|bytes| !bytes.is_empty())
871                                .unwrap_or(EMPTY_VARIANT_METADATA_BYTES);
872                            assert_eq!(
873                                Variant::new(metadata_bytes, fallbacks.0.value(idx)),
874                                expected_variant.clone()
875                            );
876                        }
877                        None => unreachable!(),
878                    }
879                }
880            }
881        }
882    }
883
884    fn assert_list_structure_and_elements<T: ArrowPrimitiveType, O: OffsetSizeTrait>(
885        array: &VariantArray,
886        expected_len: usize,
887        expected_offsets: &[O],
888        expected_sizes: &[Option<O>],
889        expected_fallbacks: &[Option<Variant<'static, 'static>>],
890        expected_shredded_elements: (&[Option<T::Native>], &[Option<Variant<'static, 'static>>]),
891    ) {
892        assert_list_structure(
893            array,
894            expected_len,
895            expected_offsets,
896            expected_sizes,
897            expected_fallbacks,
898        );
899        let array = downcast_list_like_array::<O>(array);
900
901        // Validate the shredded state of list elements (typed values and fallbacks)
902        let (expected_values, expected_fallbacks) = expected_shredded_elements;
903        assert_eq!(
904            expected_values.len(),
905            expected_fallbacks.len(),
906            "expected_values and expected_fallbacks should be aligned"
907        );
908
909        // Validate the shredded primitive values for list elements
910        let element_array = ShreddedVariantFieldArray::try_new(array.values().as_ref()).unwrap();
911        let element_values = element_array
912            .typed_value_field()
913            .unwrap()
914            .as_any()
915            .downcast_ref::<PrimitiveArray<T>>()
916            .unwrap();
917        assert_eq!(element_values.len(), expected_values.len());
918        for (idx, expected_value) in expected_values.iter().enumerate() {
919            match expected_value {
920                Some(value) => {
921                    assert!(element_values.is_valid(idx));
922                    assert_eq!(element_values.value(idx), *value);
923                }
924                None => assert!(element_values.is_null(idx)),
925            }
926        }
927
928        // Validate fallback variants for list elements that could not be shredded
929        let element_fallbacks = element_array.value_field().unwrap();
930        assert_eq!(element_fallbacks.len(), expected_fallbacks.len());
931        for (idx, expected_fallback) in expected_fallbacks.iter().enumerate() {
932            match expected_fallback {
933                Some(expected_variant) => {
934                    assert!(element_fallbacks.is_valid(idx));
935                    assert_eq!(
936                        Variant::new(EMPTY_VARIANT_METADATA_BYTES, element_fallbacks.value(idx)),
937                        expected_variant.clone()
938                    );
939                }
940                None => assert!(element_fallbacks.is_null(idx)),
941            }
942        }
943    }
944
945    #[test]
946    fn test_already_shredded_input_error() {
947        // Create a VariantArray that already has typed_value_field
948        // First create a valid VariantArray, then extract its parts to construct a shredded one
949        let temp_array = VariantArray::from_iter(vec![Some(Variant::from("test"))]);
950        let metadata = temp_array.metadata_field().clone();
951        let value = temp_array.value_field().unwrap().clone();
952        let typed_value = Arc::new(Int64Array::from(vec![42])) as ArrayRef;
953
954        let shredded_array =
955            VariantArray::from_parts(metadata, Some(value), Some(typed_value), None);
956
957        let result = shred_variant(&shredded_array, &DataType::Int64);
958        assert!(matches!(
959            result.unwrap_err(),
960            ArrowError::InvalidArgumentError(_)
961        ));
962    }
963
964    #[test]
965    fn test_all_null_input() {
966        // Create VariantArray with no value field (all null case)
967        let metadata = BinaryViewArray::from_iter_values([&[1u8, 0u8]]); // minimal valid metadata
968        let all_null_array = VariantArray::from_parts(metadata, None, None, None);
969        let result = shred_variant(&all_null_array, &DataType::Int64).unwrap();
970
971        // Should return array with no value/typed_value fields
972        assert!(result.value_field().is_none());
973        assert!(result.typed_value_field().is_none());
974    }
975
976    #[test]
977    fn test_invalid_fixed_size_binary_shredding() {
978        let mock_uuid_1 = Uuid::new_v4();
979
980        let input = VariantArray::from_iter([Some(Variant::from(mock_uuid_1)), None]);
981
982        // shred_variant only supports FixedSizeBinary(16). Any other length will err.
983        let err = shred_variant(&input, &DataType::FixedSizeBinary(17)).unwrap_err();
984
985        assert_eq!(
986            err.to_string(),
987            "Invalid argument error: FixedSizeBinary(17) is not a valid variant shredding type. Only FixedSizeBinary(16) for UUID is supported."
988        );
989    }
990
991    #[test]
992    fn test_uuid_shredding() {
993        let mock_uuid_1 = Uuid::new_v4();
994        let mock_uuid_2 = Uuid::new_v4();
995
996        let input = VariantArray::from_iter([
997            Some(Variant::from(mock_uuid_1)),
998            None,
999            Some(Variant::from(false)),
1000            Some(Variant::from(mock_uuid_2)),
1001        ]);
1002
1003        let variant_array = shred_variant(&input, &DataType::FixedSizeBinary(16)).unwrap();
1004
1005        // // inspect the typed_value Field and make sure it contains the canonical Uuid extension type
1006        // let typed_value_field = variant_array
1007        //     .inner()
1008        //     .fields()
1009        //     .into_iter()
1010        //     .find(|f| f.name() == "typed_value")
1011        //     .unwrap();
1012
1013        // assert!(
1014        //     typed_value_field
1015        //         .try_extension_type::<extension::Uuid>()
1016        //         .is_ok()
1017        // );
1018
1019        // probe the downcasted typed_value array to make sure uuids are shredded correctly
1020        let uuids = variant_array
1021            .typed_value_field()
1022            .unwrap()
1023            .as_any()
1024            .downcast_ref::<FixedSizeBinaryArray>()
1025            .unwrap();
1026
1027        assert_eq!(uuids.len(), 4);
1028
1029        assert!(!uuids.is_null(0));
1030
1031        let got_uuid_1: &[u8] = uuids.value(0);
1032        assert_eq!(got_uuid_1, mock_uuid_1.as_bytes());
1033
1034        assert!(uuids.is_null(1));
1035        assert!(uuids.is_null(2));
1036
1037        assert!(!uuids.is_null(3));
1038
1039        let got_uuid_2: &[u8] = uuids.value(3);
1040        assert_eq!(got_uuid_2, mock_uuid_2.as_bytes());
1041    }
1042
1043    #[test]
1044    fn test_primitive_shredding_comprehensive() {
1045        // Test mixed scenarios in a single array
1046        let input = VariantArray::from_iter(vec![
1047            Some(Variant::from(42i64)),   // successful shred
1048            Some(Variant::from("hello")), // failed shred (string)
1049            Some(Variant::from(100i64)),  // successful shred
1050            None,                         // array-level null
1051            Some(Variant::Null),          // variant null
1052            Some(Variant::from(3i8)),     // successful shred (int8->int64 conversion)
1053        ]);
1054
1055        let result = shred_variant(&input, &DataType::Int64).unwrap();
1056
1057        // Verify structure
1058        let metadata_field = result.metadata_field();
1059        let value_field = result.value_field().unwrap();
1060        let typed_value_field = result
1061            .typed_value_field()
1062            .unwrap()
1063            .as_any()
1064            .downcast_ref::<Int64Array>()
1065            .unwrap();
1066
1067        // Check specific outcomes for each row
1068        assert_eq!(result.len(), 6);
1069
1070        // Row 0: 42 -> should shred successfully
1071        assert!(!result.is_null(0));
1072        assert!(value_field.is_null(0)); // value should be null when shredded
1073        assert!(!typed_value_field.is_null(0));
1074        assert_eq!(typed_value_field.value(0), 42);
1075
1076        // Row 1: "hello" -> should fail to shred
1077        assert!(!result.is_null(1));
1078        assert!(!value_field.is_null(1)); // value should contain original
1079        assert!(typed_value_field.is_null(1)); // typed_value should be null
1080        assert_eq!(
1081            Variant::new(metadata_field.value(1), value_field.value(1)),
1082            Variant::from("hello")
1083        );
1084
1085        // Row 2: 100 -> should shred successfully
1086        assert!(!result.is_null(2));
1087        assert!(value_field.is_null(2));
1088        assert_eq!(typed_value_field.value(2), 100);
1089
1090        // Row 3: array null -> should be null in result
1091        assert!(result.is_null(3));
1092
1093        // Row 4: Variant::Null -> should not shred (it's a null variant, not an integer)
1094        assert!(!result.is_null(4));
1095        assert!(!value_field.is_null(4)); // should contain Variant::Null
1096        assert_eq!(
1097            Variant::new(metadata_field.value(4), value_field.value(4)),
1098            Variant::Null
1099        );
1100        assert!(typed_value_field.is_null(4));
1101
1102        // Row 5: 3i8 -> should shred successfully (int8->int64 conversion)
1103        assert!(!result.is_null(5));
1104        assert!(value_field.is_null(5)); // value should be null when shredded
1105        assert!(!typed_value_field.is_null(5));
1106        assert_eq!(typed_value_field.value(5), 3);
1107    }
1108
1109    #[test]
1110    fn test_primitive_different_target_types() {
1111        let input = VariantArray::from_iter(vec![
1112            Variant::from(42i32),
1113            Variant::from(3.15f64),
1114            Variant::from("not_a_number"),
1115        ]);
1116
1117        // Test Int32 target
1118        let result_int32 = shred_variant(&input, &DataType::Int32).unwrap();
1119        let typed_value_int32 = result_int32
1120            .typed_value_field()
1121            .unwrap()
1122            .as_any()
1123            .downcast_ref::<arrow::array::Int32Array>()
1124            .unwrap();
1125        assert_eq!(typed_value_int32.value(0), 42);
1126        assert!(typed_value_int32.is_null(1)); // float doesn't convert to int32
1127        assert!(typed_value_int32.is_null(2)); // string doesn't convert to int32
1128
1129        // Test Float64 target
1130        let result_float64 = shred_variant(&input, &DataType::Float64).unwrap();
1131        let typed_value_float64 = result_float64
1132            .typed_value_field()
1133            .unwrap()
1134            .as_any()
1135            .downcast_ref::<Float64Array>()
1136            .unwrap();
1137        assert_eq!(typed_value_float64.value(0), 42.0); // int converts to float
1138        assert_eq!(typed_value_float64.value(1), 3.15);
1139        assert!(typed_value_float64.is_null(2)); // string doesn't convert
1140    }
1141
1142    #[test]
1143    fn test_invalid_shredded_types_rejected() {
1144        let input = VariantArray::from_iter([Variant::from(42)]);
1145
1146        let invalid_types = vec![
1147            DataType::UInt8,
1148            DataType::Float16,
1149            DataType::Decimal256(38, 10),
1150            DataType::Date64,
1151            DataType::Time32(TimeUnit::Second),
1152            DataType::Time64(TimeUnit::Nanosecond),
1153            DataType::Timestamp(TimeUnit::Millisecond, None),
1154            DataType::LargeBinary,
1155            DataType::LargeUtf8,
1156            DataType::FixedSizeBinary(17),
1157            DataType::Union(
1158                UnionFields::from_fields(vec![
1159                    Field::new("int_field", DataType::Int32, false),
1160                    Field::new("str_field", DataType::Utf8, true),
1161                ]),
1162                UnionMode::Dense,
1163            ),
1164            DataType::Map(
1165                Arc::new(Field::new(
1166                    "entries",
1167                    DataType::Struct(Fields::from(vec![
1168                        Field::new("key", DataType::Utf8, false),
1169                        Field::new("value", DataType::Int32, true),
1170                    ])),
1171                    false,
1172                )),
1173                false,
1174            ),
1175            DataType::Dictionary(Box::new(DataType::Int32), Box::new(DataType::Utf8)),
1176            DataType::RunEndEncoded(
1177                Arc::new(Field::new("run_ends", DataType::Int32, false)),
1178                Arc::new(Field::new("values", DataType::Utf8, true)),
1179            ),
1180        ];
1181
1182        for data_type in invalid_types {
1183            let err = shred_variant(&input, &data_type).unwrap_err();
1184            assert!(
1185                matches!(err, ArrowError::InvalidArgumentError(_)),
1186                "expected InvalidArgumentError for {:?}, got {:?}",
1187                data_type,
1188                err
1189            );
1190        }
1191    }
1192
1193    #[test]
1194    fn test_array_shredding_as_list() {
1195        let input = build_variant_array(vec![
1196            // Row 0: List of ints should shred entirely into typed_value
1197            VariantRow::List(vec![
1198                VariantValue::from(1i64),
1199                VariantValue::from(2i64),
1200                VariantValue::from(3i64),
1201            ]),
1202            // Row 1: Contains incompatible types so values fall back
1203            VariantRow::List(vec![
1204                VariantValue::from(1i64),
1205                VariantValue::from("two"),
1206                VariantValue::from(Variant::Null),
1207            ]),
1208            // Row 2: Not a list -> entire row falls back
1209            VariantRow::Value(VariantValue::from("not a list")),
1210            // Row 3: Array-level null propagates
1211            VariantRow::Null,
1212            // Row 4: Empty list exercises zero-length offsets
1213            VariantRow::List(vec![]),
1214        ]);
1215        let list_schema = DataType::List(Arc::new(Field::new("item", DataType::Int64, true)));
1216        let result = shred_variant(&input, &list_schema).unwrap();
1217        assert_eq!(result.len(), 5);
1218
1219        assert_list_structure_and_elements::<Int64Type, i32>(
1220            &result,
1221            5,
1222            &[0, 3, 6, 6, 6, 6],
1223            &[Some(3), Some(3), None, None, Some(0)],
1224            &[
1225                None,
1226                None,
1227                Some(Variant::from("not a list")),
1228                Some(Variant::Null),
1229                None,
1230            ],
1231            (
1232                &[Some(1), Some(2), Some(3), Some(1), None, None],
1233                &[
1234                    None,
1235                    None,
1236                    None,
1237                    None,
1238                    Some(Variant::from("two")),
1239                    Some(Variant::Null),
1240                ],
1241            ),
1242        );
1243    }
1244
1245    #[test]
1246    fn test_array_shredding_as_large_list() {
1247        let input = build_variant_array(vec![
1248            // Row 0: List of ints shreds to typed_value
1249            VariantRow::List(vec![VariantValue::from(1i64), VariantValue::from(2i64)]),
1250            // Row 1: Not a list -> entire row falls back
1251            VariantRow::Value(VariantValue::from("not a list")),
1252            // Row 2: Empty list
1253            VariantRow::List(vec![]),
1254        ]);
1255        let list_schema = DataType::LargeList(Arc::new(Field::new("item", DataType::Int64, true)));
1256        let result = shred_variant(&input, &list_schema).unwrap();
1257        assert_eq!(result.len(), 3);
1258
1259        assert_list_structure_and_elements::<Int64Type, i64>(
1260            &result,
1261            3,
1262            &[0, 2, 2, 2],
1263            &[Some(2), None, Some(0)],
1264            &[None, Some(Variant::from("not a list")), None],
1265            (&[Some(1), Some(2)], &[None, None]),
1266        );
1267    }
1268
1269    #[test]
1270    fn test_array_shredding_as_list_view() {
1271        let input = build_variant_array(vec![
1272            // Row 0: Standard list
1273            VariantRow::List(vec![
1274                VariantValue::from(1i64),
1275                VariantValue::from(2i64),
1276                VariantValue::from(3i64),
1277            ]),
1278            // Row 1: List with incompatible types -> element fallback
1279            VariantRow::List(vec![
1280                VariantValue::from(1i64),
1281                VariantValue::from("two"),
1282                VariantValue::from(Variant::Null),
1283            ]),
1284            // Row 2: Not a list -> top-level fallback
1285            VariantRow::Value(VariantValue::from("not a list")),
1286            // Row 3: Top-level Null
1287            VariantRow::Null,
1288            // Row 4: Empty list
1289            VariantRow::List(vec![]),
1290        ]);
1291        let list_schema = DataType::ListView(Arc::new(Field::new("item", DataType::Int64, true)));
1292        let result = shred_variant(&input, &list_schema).unwrap();
1293        assert_eq!(result.len(), 5);
1294
1295        assert_list_structure_and_elements::<Int64Type, i32>(
1296            &result,
1297            5,
1298            &[0, 3, 6, 6, 6],
1299            &[Some(3), Some(3), None, None, Some(0)],
1300            &[
1301                None,
1302                None,
1303                Some(Variant::from("not a list")),
1304                Some(Variant::Null),
1305                None,
1306            ],
1307            (
1308                &[Some(1), Some(2), Some(3), Some(1), None, None],
1309                &[
1310                    None,
1311                    None,
1312                    None,
1313                    None,
1314                    Some(Variant::from("two")),
1315                    Some(Variant::Null),
1316                ],
1317            ),
1318        );
1319    }
1320
1321    #[test]
1322    fn test_array_shredding_as_large_list_view() {
1323        let input = build_variant_array(vec![
1324            // Row 0: List of ints shreds to typed_value
1325            VariantRow::List(vec![VariantValue::from(1i64), VariantValue::from(2i64)]),
1326            // Row 1: Not a list -> entire row falls back
1327            VariantRow::Value(VariantValue::from("fallback")),
1328            // Row 2: Empty list
1329            VariantRow::List(vec![]),
1330        ]);
1331        let list_schema =
1332            DataType::LargeListView(Arc::new(Field::new("item", DataType::Int64, true)));
1333        let result = shred_variant(&input, &list_schema).unwrap();
1334        assert_eq!(result.len(), 3);
1335
1336        assert_list_structure_and_elements::<Int64Type, i64>(
1337            &result,
1338            3,
1339            &[0, 2, 2],
1340            &[Some(2), None, Some(0)],
1341            &[None, Some(Variant::from("fallback")), None],
1342            (&[Some(1), Some(2)], &[None, None]),
1343        );
1344    }
1345
1346    #[test]
1347    fn test_array_shredding_as_fixed_size_list() {
1348        let input = build_variant_array(vec![VariantRow::List(vec![
1349            VariantValue::from(1i64),
1350            VariantValue::from(2i64),
1351            VariantValue::from(3i64),
1352        ])]);
1353        let list_schema =
1354            DataType::FixedSizeList(Arc::new(Field::new("item", DataType::Int64, true)), 2);
1355        let err = shred_variant(&input, &list_schema).unwrap_err();
1356        assert_eq!(
1357            err.to_string(),
1358            "Not yet implemented: Converting unshredded variant arrays to arrow fixed-size lists"
1359        );
1360    }
1361
1362    #[test]
1363    fn test_array_shredding_with_array_elements() {
1364        let input = build_variant_array(vec![
1365            // Row 0: [[1, 2], [3, 4], []] - clean nested lists
1366            VariantRow::List(vec![
1367                VariantValue::List(vec![VariantValue::from(1i64), VariantValue::from(2i64)]),
1368                VariantValue::List(vec![VariantValue::from(3i64), VariantValue::from(4i64)]),
1369                VariantValue::List(vec![]),
1370            ]),
1371            // Row 1: [[5, "bad", null], "not a list inner", null] - inner fallbacks
1372            VariantRow::List(vec![
1373                VariantValue::List(vec![
1374                    VariantValue::from(5i64),
1375                    VariantValue::from("bad"),
1376                    VariantValue::from(Variant::Null),
1377                ]),
1378                VariantValue::from("not a list inner"),
1379                VariantValue::Null,
1380            ]),
1381            // Row 2: "not a list" - top-level fallback
1382            VariantRow::Value(VariantValue::from("not a list")),
1383            // Row 3: null row
1384            VariantRow::Null,
1385        ]);
1386        let inner_field = Arc::new(Field::new("item", DataType::Int64, true));
1387        let inner_list_schema = DataType::List(inner_field);
1388        let list_schema = DataType::List(Arc::new(Field::new(
1389            "item",
1390            inner_list_schema.clone(),
1391            true,
1392        )));
1393        let result = shred_variant(&input, &list_schema).unwrap();
1394        assert_eq!(result.len(), 4);
1395
1396        let typed_value = result
1397            .typed_value_field()
1398            .unwrap()
1399            .as_any()
1400            .downcast_ref::<ListArray>()
1401            .unwrap();
1402
1403        assert_list_structure::<i32>(
1404            &result,
1405            4,
1406            &[0, 3, 6, 6, 6],
1407            &[Some(3), Some(3), None, None],
1408            &[
1409                None,
1410                None,
1411                Some(Variant::from("not a list")),
1412                Some(Variant::Null),
1413            ],
1414        );
1415
1416        let outer_elements =
1417            ShreddedVariantFieldArray::try_new(typed_value.values().as_ref()).unwrap();
1418        assert_eq!(outer_elements.len(), 6);
1419        let outer_values = outer_elements
1420            .typed_value_field()
1421            .unwrap()
1422            .as_any()
1423            .downcast_ref::<ListArray>()
1424            .unwrap();
1425        let outer_fallbacks = outer_elements.value_field().unwrap();
1426
1427        let outer_metadata = BinaryViewArray::from_iter_values(std::iter::repeat_n(
1428            EMPTY_VARIANT_METADATA_BYTES,
1429            outer_elements.len(),
1430        ));
1431        let outer_variant = VariantArray::from_parts(
1432            outer_metadata,
1433            Some(outer_fallbacks.clone()),
1434            Some(Arc::new(outer_values.clone())),
1435            None,
1436        );
1437
1438        assert_list_structure_and_elements::<Int64Type, i32>(
1439            &outer_variant,
1440            outer_elements.len(),
1441            &[0, 2, 4, 4, 7, 7, 7],
1442            &[Some(2), Some(2), Some(0), Some(3), None, None],
1443            &[
1444                None,
1445                None,
1446                None,
1447                None,
1448                Some(Variant::from("not a list inner")),
1449                Some(Variant::Null),
1450            ],
1451            (
1452                &[Some(1), Some(2), Some(3), Some(4), Some(5), None, None],
1453                &[
1454                    None,
1455                    None,
1456                    None,
1457                    None,
1458                    None,
1459                    Some(Variant::from("bad")),
1460                    Some(Variant::Null),
1461                ],
1462            ),
1463        );
1464    }
1465
1466    #[test]
1467    fn test_array_shredding_with_object_elements() {
1468        let input = build_variant_array(vec![
1469            // Row 0: [{"id": 1, "name": "Alice"}, {"id": null}] fully shards
1470            VariantRow::List(vec![
1471                VariantValue::Object(vec![
1472                    ("id", VariantValue::from(1i64)),
1473                    ("name", VariantValue::from("Alice")),
1474                ]),
1475                VariantValue::Object(vec![("id", VariantValue::from(Variant::Null))]),
1476            ]),
1477            // Row 1: "not a list" -> fallback
1478            VariantRow::Value(VariantValue::from("not a list")),
1479            // Row 2: Null row
1480            VariantRow::Null,
1481        ]);
1482
1483        // Target schema is List<Struct<id:int64,name:utf8>>
1484        let object_fields = Fields::from(vec![
1485            Field::new("id", DataType::Int64, true),
1486            Field::new("name", DataType::Utf8, true),
1487        ]);
1488        let list_schema = DataType::List(Arc::new(Field::new(
1489            "item",
1490            DataType::Struct(object_fields),
1491            true,
1492        )));
1493        let result = shred_variant(&input, &list_schema).unwrap();
1494        assert_eq!(result.len(), 3);
1495
1496        assert_list_structure::<i32>(
1497            &result,
1498            3,
1499            &[0, 2, 2, 2],
1500            &[Some(2), None, None],
1501            &[None, Some(Variant::from("not a list")), Some(Variant::Null)],
1502        );
1503
1504        // Validate nested struct fields for each element
1505        let typed_value = result
1506            .typed_value_field()
1507            .unwrap()
1508            .as_any()
1509            .downcast_ref::<ListArray>()
1510            .unwrap();
1511        let element_array =
1512            ShreddedVariantFieldArray::try_new(typed_value.values().as_ref()).unwrap();
1513        assert_eq!(element_array.len(), 2);
1514        let element_objects = element_array
1515            .typed_value_field()
1516            .unwrap()
1517            .as_any()
1518            .downcast_ref::<arrow::array::StructArray>()
1519            .unwrap();
1520
1521        // Id field [1, Variant::Null]
1522        let id_field =
1523            ShreddedVariantFieldArray::try_new(element_objects.column_by_name("id").unwrap())
1524                .unwrap();
1525        let id_values = id_field.value_field().unwrap();
1526        let id_typed_values = id_field
1527            .typed_value_field()
1528            .unwrap()
1529            .as_any()
1530            .downcast_ref::<Int64Array>()
1531            .unwrap();
1532        assert!(id_values.is_null(0));
1533        assert_eq!(id_typed_values.value(0), 1);
1534        // null is stored as Variant::Null in values
1535        assert!(id_values.is_valid(1));
1536        assert_eq!(
1537            Variant::new(EMPTY_VARIANT_METADATA_BYTES, id_values.value(1)),
1538            Variant::Null
1539        );
1540        assert!(id_typed_values.is_null(1));
1541
1542        // Name field ["Alice", null]
1543        let name_field =
1544            ShreddedVariantFieldArray::try_new(element_objects.column_by_name("name").unwrap())
1545                .unwrap();
1546        let name_values = name_field.value_field().unwrap();
1547        let name_typed_values = name_field
1548            .typed_value_field()
1549            .unwrap()
1550            .as_any()
1551            .downcast_ref::<StringArray>()
1552            .unwrap();
1553        assert!(name_values.is_null(0));
1554        assert_eq!(name_typed_values.value(0), "Alice");
1555        // No value provided, both value and typed_value are null
1556        assert!(name_values.is_null(1));
1557        assert!(name_typed_values.is_null(1));
1558    }
1559
1560    #[test]
1561    fn test_object_shredding_comprehensive() {
1562        let input = build_variant_array(vec![
1563            // Row 0: Fully shredded object
1564            VariantRow::Object(vec![
1565                ("score", VariantValue::from(95.5f64)),
1566                ("age", VariantValue::from(30i64)),
1567            ]),
1568            // Row 1: Partially shredded object (extra email field)
1569            VariantRow::Object(vec![
1570                ("score", VariantValue::from(87.2f64)),
1571                ("age", VariantValue::from(25i64)),
1572                ("email", VariantValue::from("bob@example.com")),
1573            ]),
1574            // Row 2: Missing field (no score)
1575            VariantRow::Object(vec![("age", VariantValue::from(35i64))]),
1576            // Row 3: Type mismatch (score is string, age is string)
1577            VariantRow::Object(vec![
1578                ("score", VariantValue::from("ninety-five")),
1579                ("age", VariantValue::from("thirty")),
1580            ]),
1581            // Row 4: Non-object
1582            VariantRow::Value(VariantValue::from("not an object")),
1583            // Row 5: Empty object
1584            VariantRow::Object(vec![]),
1585            // Row 6: Null
1586            VariantRow::Null,
1587            // Row 7: Object with only "wrong" fields
1588            VariantRow::Object(vec![("foo", VariantValue::from(10))]),
1589            // Row 8: Object with one "right" and one "wrong" field
1590            VariantRow::Object(vec![
1591                ("score", VariantValue::from(66.67f64)),
1592                ("foo", VariantValue::from(10)),
1593            ]),
1594        ]);
1595
1596        // Create target schema: struct<score: float64, age: int64>
1597        // Both types are supported for shredding
1598        let target_schema = ShreddedSchemaBuilder::default()
1599            .with_path("score", &DataType::Float64)
1600            .with_path("age", &DataType::Int64)
1601            .build();
1602
1603        let result = shred_variant(&input, &target_schema).unwrap();
1604
1605        // Verify structure
1606        assert!(result.value_field().is_some());
1607        assert!(result.typed_value_field().is_some());
1608        assert_eq!(result.len(), 9);
1609
1610        let metadata = result.metadata_field();
1611
1612        let value = result.value_field().unwrap();
1613        let typed_value = result
1614            .typed_value_field()
1615            .unwrap()
1616            .as_any()
1617            .downcast_ref::<arrow::array::StructArray>()
1618            .unwrap();
1619
1620        // Extract score and age fields from typed_value struct
1621        let score_field =
1622            ShreddedVariantFieldArray::try_new(typed_value.column_by_name("score").unwrap())
1623                .unwrap();
1624        let age_field =
1625            ShreddedVariantFieldArray::try_new(typed_value.column_by_name("age").unwrap()).unwrap();
1626
1627        let score_value = score_field
1628            .value_field()
1629            .unwrap()
1630            .as_any()
1631            .downcast_ref::<BinaryViewArray>()
1632            .unwrap();
1633        let score_typed_value = score_field
1634            .typed_value_field()
1635            .unwrap()
1636            .as_any()
1637            .downcast_ref::<Float64Array>()
1638            .unwrap();
1639        let age_value = age_field
1640            .value_field()
1641            .unwrap()
1642            .as_any()
1643            .downcast_ref::<BinaryViewArray>()
1644            .unwrap();
1645        let age_typed_value = age_field
1646            .typed_value_field()
1647            .unwrap()
1648            .as_any()
1649            .downcast_ref::<Int64Array>()
1650            .unwrap();
1651
1652        // Set up exhaustive checking of all shredded columns and their nulls/values
1653        struct ShreddedValue<'m, 'v, T> {
1654            value: Option<Variant<'m, 'v>>,
1655            typed_value: Option<T>,
1656        }
1657        struct ShreddedStruct<'m, 'v> {
1658            score: ShreddedValue<'m, 'v, f64>,
1659            age: ShreddedValue<'m, 'v, i64>,
1660        }
1661        fn get_value<'m, 'v>(
1662            i: usize,
1663            metadata: &'m BinaryViewArray,
1664            value: &'v BinaryViewArray,
1665        ) -> Variant<'m, 'v> {
1666            Variant::new(metadata.value(i), value.value(i))
1667        }
1668        let expect = |i, expected_result: Option<ShreddedValue<ShreddedStruct>>| {
1669            match expected_result {
1670                Some(ShreddedValue {
1671                    value: expected_value,
1672                    typed_value: expected_typed_value,
1673                }) => {
1674                    assert!(result.is_valid(i));
1675                    match expected_value {
1676                        Some(expected_value) => {
1677                            assert!(value.is_valid(i));
1678                            assert_eq!(expected_value, get_value(i, metadata, value));
1679                        }
1680                        None => {
1681                            assert!(value.is_null(i));
1682                        }
1683                    }
1684                    match expected_typed_value {
1685                        Some(ShreddedStruct {
1686                            score: expected_score,
1687                            age: expected_age,
1688                        }) => {
1689                            assert!(typed_value.is_valid(i));
1690                            assert!(score_field.is_valid(i)); // non-nullable
1691                            assert!(age_field.is_valid(i)); // non-nullable
1692                            match expected_score.value {
1693                                Some(expected_score_value) => {
1694                                    assert!(score_value.is_valid(i));
1695                                    assert_eq!(
1696                                        expected_score_value,
1697                                        get_value(i, metadata, score_value)
1698                                    );
1699                                }
1700                                None => {
1701                                    assert!(score_value.is_null(i));
1702                                }
1703                            }
1704                            match expected_score.typed_value {
1705                                Some(expected_score) => {
1706                                    assert!(score_typed_value.is_valid(i));
1707                                    assert_eq!(expected_score, score_typed_value.value(i));
1708                                }
1709                                None => {
1710                                    assert!(score_typed_value.is_null(i));
1711                                }
1712                            }
1713                            match expected_age.value {
1714                                Some(expected_age_value) => {
1715                                    assert!(age_value.is_valid(i));
1716                                    assert_eq!(
1717                                        expected_age_value,
1718                                        get_value(i, metadata, age_value)
1719                                    );
1720                                }
1721                                None => {
1722                                    assert!(age_value.is_null(i));
1723                                }
1724                            }
1725                            match expected_age.typed_value {
1726                                Some(expected_age) => {
1727                                    assert!(age_typed_value.is_valid(i));
1728                                    assert_eq!(expected_age, age_typed_value.value(i));
1729                                }
1730                                None => {
1731                                    assert!(age_typed_value.is_null(i));
1732                                }
1733                            }
1734                        }
1735                        None => {
1736                            assert!(typed_value.is_null(i));
1737                        }
1738                    }
1739                }
1740                None => {
1741                    assert!(result.is_null(i));
1742                }
1743            };
1744        };
1745
1746        // Row 0: Fully shredded - both fields shred successfully
1747        expect(
1748            0,
1749            Some(ShreddedValue {
1750                value: None,
1751                typed_value: Some(ShreddedStruct {
1752                    score: ShreddedValue {
1753                        value: None,
1754                        typed_value: Some(95.5),
1755                    },
1756                    age: ShreddedValue {
1757                        value: None,
1758                        typed_value: Some(30),
1759                    },
1760                }),
1761            }),
1762        );
1763
1764        // Row 1: Partially shredded - value contains extra email field
1765        let mut builder = VariantBuilder::new();
1766        builder
1767            .new_object()
1768            .with_field("email", "bob@example.com")
1769            .finish();
1770        let (m, v) = builder.finish();
1771        let expected_value = Variant::new(&m, &v);
1772
1773        expect(
1774            1,
1775            Some(ShreddedValue {
1776                value: Some(expected_value),
1777                typed_value: Some(ShreddedStruct {
1778                    score: ShreddedValue {
1779                        value: None,
1780                        typed_value: Some(87.2),
1781                    },
1782                    age: ShreddedValue {
1783                        value: None,
1784                        typed_value: Some(25),
1785                    },
1786                }),
1787            }),
1788        );
1789
1790        // Row 2: Fully shredded -- missing score field
1791        expect(
1792            2,
1793            Some(ShreddedValue {
1794                value: None,
1795                typed_value: Some(ShreddedStruct {
1796                    score: ShreddedValue {
1797                        value: None,
1798                        typed_value: None,
1799                    },
1800                    age: ShreddedValue {
1801                        value: None,
1802                        typed_value: Some(35),
1803                    },
1804                }),
1805            }),
1806        );
1807
1808        // Row 3: Type mismatches - both score and age are strings
1809        expect(
1810            3,
1811            Some(ShreddedValue {
1812                value: None,
1813                typed_value: Some(ShreddedStruct {
1814                    score: ShreddedValue {
1815                        value: Some(Variant::from("ninety-five")),
1816                        typed_value: None,
1817                    },
1818                    age: ShreddedValue {
1819                        value: Some(Variant::from("thirty")),
1820                        typed_value: None,
1821                    },
1822                }),
1823            }),
1824        );
1825
1826        // Row 4: Non-object - falls back to value field
1827        expect(
1828            4,
1829            Some(ShreddedValue {
1830                value: Some(Variant::from("not an object")),
1831                typed_value: None,
1832            }),
1833        );
1834
1835        // Row 5: Empty object
1836        expect(
1837            5,
1838            Some(ShreddedValue {
1839                value: None,
1840                typed_value: Some(ShreddedStruct {
1841                    score: ShreddedValue {
1842                        value: None,
1843                        typed_value: None,
1844                    },
1845                    age: ShreddedValue {
1846                        value: None,
1847                        typed_value: None,
1848                    },
1849                }),
1850            }),
1851        );
1852
1853        // Row 6: Null
1854        expect(6, None);
1855
1856        // Helper to correctly create a variant object using a row's existing metadata
1857        let object_with_foo_field = |i| {
1858            use parquet_variant::{ParentState, ValueBuilder, VariantMetadata};
1859            let metadata = VariantMetadata::new(metadata.value(i));
1860            let mut metadata_builder = ReadOnlyMetadataBuilder::new(&metadata);
1861            let mut value_builder = ValueBuilder::new();
1862            let state = ParentState::variant(&mut value_builder, &mut metadata_builder);
1863            ObjectBuilder::new(state, false)
1864                .with_field("foo", 10)
1865                .finish();
1866            (metadata, value_builder.into_inner())
1867        };
1868
1869        // Row 7: Object with only a "wrong" field
1870        let (m, v) = object_with_foo_field(7);
1871        expect(
1872            7,
1873            Some(ShreddedValue {
1874                value: Some(Variant::new_with_metadata(m, &v)),
1875                typed_value: Some(ShreddedStruct {
1876                    score: ShreddedValue {
1877                        value: None,
1878                        typed_value: None,
1879                    },
1880                    age: ShreddedValue {
1881                        value: None,
1882                        typed_value: None,
1883                    },
1884                }),
1885            }),
1886        );
1887
1888        // Row 8: Object with one "wrong" and one "right" field
1889        let (m, v) = object_with_foo_field(8);
1890        expect(
1891            8,
1892            Some(ShreddedValue {
1893                value: Some(Variant::new_with_metadata(m, &v)),
1894                typed_value: Some(ShreddedStruct {
1895                    score: ShreddedValue {
1896                        value: None,
1897                        typed_value: Some(66.67),
1898                    },
1899                    age: ShreddedValue {
1900                        value: None,
1901                        typed_value: None,
1902                    },
1903                }),
1904            }),
1905        );
1906    }
1907
/// Shreds objects against `struct<scores: list<int64>>` and checks the top-level
/// `value` / `typed_value` columns as well as the shredded `scores` list column.
#[test]
fn test_object_shredding_with_array_field() {
    let input = build_variant_array(vec![
        // Row 0: Object with well-typed scores list
        VariantRow::Object(vec![(
            "scores",
            VariantValue::List(vec![VariantValue::from(10i64), VariantValue::from(20i64)]),
        )]),
        // Row 1: Object whose scores list contains incompatible type
        VariantRow::Object(vec![(
            "scores",
            VariantValue::List(vec![
                VariantValue::from("oops"),
                VariantValue::from(Variant::Null),
            ]),
        )]),
        // Row 2: Object missing the scores field entirely
        VariantRow::Object(vec![]),
        // Row 3: Non-object fallback
        VariantRow::Value(VariantValue::from("not an object")),
        // Row 4: Top-level Null
        VariantRow::Null,
    ]);
    let list_field = Arc::new(Field::new("item", DataType::Int64, true));
    let inner_list_schema = DataType::List(list_field);
    // The list type is not needed again, so it is moved into the field instead of cloned.
    let schema = DataType::Struct(Fields::from(vec![Field::new(
        "scores",
        inner_list_schema,
        true,
    )]));

    let result = shred_variant(&input, &schema).unwrap();
    assert_eq!(result.len(), 5);

    // Only the top-level Null row (row 4) is null in the shredded output
    for row in 0..4 {
        assert!(result.is_valid(row));
    }
    assert!(result.is_null(4));

    // Access base value/typed_value columns
    let value_field = result.value_field().unwrap();
    let typed_struct = result
        .typed_value_field()
        .unwrap()
        .as_any()
        .downcast_ref::<arrow::array::StructArray>()
        .unwrap();

    // The base value column is populated only for the non-object row (row 3)
    assert!(value_field.is_null(0));
    assert!(value_field.is_null(1));
    assert!(value_field.is_null(2));
    assert!(value_field.is_valid(3));
    assert_eq!(
        Variant::new(result.metadata_field().value(3), value_field.value(3)),
        Variant::from("not an object")
    );
    assert!(value_field.is_null(4));

    // Typed struct is valid for every object row and null for both the
    // non-object fallback row (3) and the top-level Null row (4)
    assert!(typed_struct.is_valid(0));
    assert!(typed_struct.is_valid(1));
    assert!(typed_struct.is_valid(2));
    assert!(typed_struct.is_null(3));
    assert!(typed_struct.is_null(4));

    // Drill into the scores field on the typed struct
    let scores_field =
        ShreddedVariantFieldArray::try_new(typed_struct.column_by_name("scores").unwrap())
            .unwrap();
    assert_list_structure_and_elements::<Int64Type, i32>(
        &VariantArray::from_parts(
            BinaryViewArray::from_iter_values(std::iter::repeat_n(
                EMPTY_VARIANT_METADATA_BYTES,
                scores_field.len(),
            )),
            Some(scores_field.value_field().unwrap().clone()),
            Some(scores_field.typed_value_field().unwrap().clone()),
            None,
        ),
        scores_field.len(),
        &[0i32, 2, 4, 4, 4, 4],
        &[Some(2), Some(2), None, None, None],
        &[
            None,
            None,
            Some(Variant::Null),
            Some(Variant::Null),
            Some(Variant::Null),
        ],
        (
            &[Some(10), Some(20), None, None],
            &[None, None, Some(Variant::from("oops")), Some(Variant::Null)],
        ),
    );
}
1999
/// Shreds the same three-field object against progressively wider schemas and
/// checks that the residual `value` column holds exactly the fields that were
/// not shredded.
#[test]
fn test_object_different_schemas() {
    // Create object with multiple fields
    let input = build_variant_array(vec![VariantRow::Object(vec![
        ("id", VariantValue::from(123i32)),
        ("age", VariantValue::from(25i64)),
        ("score", VariantValue::from(95.5f64)),
    ])]);

    // Test with schema containing only id field
    let schema1 = ShreddedSchemaBuilder::default()
        .with_path("id", &DataType::Int32)
        .build();
    let result1 = shred_variant(&input, &schema1).unwrap();
    let value_field1 = result1.value_field().unwrap();
    assert!(value_field1.is_valid(0));
    // Residual object should be {"age": 25, "score": 95.5}
    let Variant::Object(residual1) =
        Variant::new(result1.metadata_field().value(0), value_field1.value(0))
    else {
        panic!("Expected object");
    };
    assert!(residual1.get("id").is_none());
    assert_eq!(residual1.get("age"), Some(Variant::from(25i64)));
    assert_eq!(residual1.get("score"), Some(Variant::from(95.5f64)));

    // Test with schema containing id and age fields
    let schema2 = ShreddedSchemaBuilder::default()
        .with_path("id", &DataType::Int32)
        .with_path("age", &DataType::Int64)
        .build();
    let result2 = shred_variant(&input, &schema2).unwrap();
    let value_field2 = result2.value_field().unwrap();
    assert!(value_field2.is_valid(0));
    // Residual object should be {"score": 95.5}
    let Variant::Object(residual2) =
        Variant::new(result2.metadata_field().value(0), value_field2.value(0))
    else {
        panic!("Expected object");
    };
    assert!(residual2.get("id").is_none());
    assert!(residual2.get("age").is_none());
    assert_eq!(residual2.get("score"), Some(Variant::from(95.5f64)));

    // Test with schema containing all fields
    let schema3 = ShreddedSchemaBuilder::default()
        .with_path("id", &DataType::Int32)
        .with_path("age", &DataType::Int64)
        .with_path("score", &DataType::Float64)
        .build();
    let result3 = shred_variant(&input, &schema3).unwrap();
    let value_field3 = result3.value_field().unwrap();
    assert!(value_field3.is_null(0)); // fully shredded, no remaining fields
}
2036
/// Shreds objects whose `id` / `session_id` fields are targeted at
/// `FixedSizeBinary(16)` and checks, row by row, which column each field lands in.
#[test]
fn test_uuid_shredding_in_objects() {
    /// Asserts that `row` holds `expected` in the typed column and nothing in the
    /// fallback `value` column.
    fn assert_shredded_uuid(
        fallback: &BinaryViewArray,
        typed: &FixedSizeBinaryArray,
        row: usize,
        expected: &Uuid,
    ) {
        assert!(fallback.is_null(row));
        assert!(typed.is_valid(row));
        assert_eq!(typed.value(row), expected.as_bytes());
    }

    /// Asserts that `row` holds `expected` in the fallback `value` column and
    /// nothing in the typed column.
    fn assert_unshredded(
        metadata: &BinaryViewArray,
        fallback: &BinaryViewArray,
        typed: &FixedSizeBinaryArray,
        row: usize,
        expected: Variant<'_, '_>,
    ) {
        assert!(fallback.is_valid(row));
        assert!(typed.is_null(row));
        assert_eq!(
            Variant::new(metadata.value(row), fallback.value(row)),
            expected
        );
    }

    let uuid_a = Uuid::new_v4();
    let uuid_b = Uuid::new_v4();
    let uuid_c = Uuid::new_v4();

    let input = build_variant_array(vec![
        // Row 0: both UUID fields present, nothing else
        VariantRow::Object(vec![
            ("id", VariantValue::from(uuid_a)),
            ("session_id", VariantValue::from(uuid_b)),
        ]),
        // Row 1: both UUID fields plus an extra "name" field
        VariantRow::Object(vec![
            ("id", VariantValue::from(uuid_b)),
            ("session_id", VariantValue::from(uuid_c)),
            ("name", VariantValue::from("test_user")),
        ]),
        // Row 2: session_id is absent
        VariantRow::Object(vec![("id", VariantValue::from(uuid_a))]),
        // Row 3: session_id is a string rather than a UUID
        VariantRow::Object(vec![
            ("id", VariantValue::from(uuid_c)),
            ("session_id", VariantValue::from("not-a-uuid")),
        ]),
        // Row 4: id is an int64 rather than a UUID
        VariantRow::Object(vec![
            ("id", VariantValue::from(12345i64)),
            ("session_id", VariantValue::from(uuid_a)),
        ]),
        // Row 5: top-level null
        VariantRow::Null,
    ]);

    let target_schema = ShreddedSchemaBuilder::default()
        .with_path("id", DataType::FixedSizeBinary(16))
        .with_path("session_id", DataType::FixedSizeBinary(16))
        .build();

    let result = shred_variant(&input, &target_schema).unwrap();

    assert!(result.value_field().is_some());
    assert!(result.typed_value_field().is_some());
    assert_eq!(result.len(), 6);

    let metadata = result.metadata_field();
    let value = result.value_field().unwrap();
    let typed_value = result
        .typed_value_field()
        .unwrap()
        .as_any()
        .downcast_ref::<arrow::array::StructArray>()
        .unwrap();

    // Pull the per-field shredded columns out of the typed_value struct
    let id_field =
        ShreddedVariantFieldArray::try_new(typed_value.column_by_name("id").unwrap()).unwrap();
    let session_id_field =
        ShreddedVariantFieldArray::try_new(typed_value.column_by_name("session_id").unwrap())
            .unwrap();

    let id_value = id_field
        .value_field()
        .unwrap()
        .as_any()
        .downcast_ref::<BinaryViewArray>()
        .unwrap();
    let id_typed_value = id_field
        .typed_value_field()
        .unwrap()
        .as_any()
        .downcast_ref::<FixedSizeBinaryArray>()
        .unwrap();
    let session_id_value = session_id_field
        .value_field()
        .unwrap()
        .as_any()
        .downcast_ref::<BinaryViewArray>()
        .unwrap();
    let session_id_typed_value = session_id_field
        .typed_value_field()
        .unwrap()
        .as_any()
        .downcast_ref::<FixedSizeBinaryArray>()
        .unwrap();

    // Rows 0-4 are objects: the row itself and its typed_value struct are valid
    for row in 0..5 {
        assert!(result.is_valid(row));
        assert!(typed_value.is_valid(row));
    }

    // Only row 1 carries an unshredded field, so only its residual value is set
    for row in [0, 2, 3, 4] {
        assert!(value.is_null(row));
    }
    assert!(value.is_valid(1));

    // Row 0: both UUID fields shred successfully
    assert_shredded_uuid(id_value, id_typed_value, 0, &uuid_a);
    assert_shredded_uuid(session_id_value, session_id_typed_value, 0, &uuid_b);

    // Row 1: both UUID fields shred; the residual value keeps the "name" field
    assert_shredded_uuid(id_value, id_typed_value, 1, &uuid_b);
    assert_shredded_uuid(session_id_value, session_id_typed_value, 1, &uuid_c);
    let Variant::Object(residual) = Variant::new(metadata.value(1), value.value(1)) else {
        panic!("Expected object");
    };
    assert_eq!(residual.get("name"), Some(Variant::from("test_user")));

    // Row 2: id shreds; session_id is missing, so both of its columns are null
    assert_shredded_uuid(id_value, id_typed_value, 2, &uuid_a);
    assert!(session_id_value.is_null(2));
    assert!(session_id_typed_value.is_null(2));

    // Row 3: id shreds; the string session_id falls back to its value column
    assert_shredded_uuid(id_value, id_typed_value, 3, &uuid_c);
    assert_unshredded(
        metadata,
        session_id_value,
        session_id_typed_value,
        3,
        Variant::from("not-a-uuid"),
    );

    // Row 4: the int64 id falls back to its value column; session_id shreds
    assert_unshredded(
        metadata,
        id_value,
        id_typed_value,
        4,
        Variant::from(12345i64),
    );
    assert_shredded_uuid(session_id_value, session_id_typed_value, 4, &uuid_a);

    // Row 5: top-level null
    assert!(result.is_null(5));
}
2205
/// Checks structural properties of a primitive (`Int64`) shredding result:
/// field names, metadata preservation, output length, and that `value` and
/// `typed_value` are never both populated for the same row.
#[test]
fn test_spec_compliance() {
    let input = VariantArray::from_iter(vec![Variant::from(42i64), Variant::from("hello")]);

    let result = shred_variant(&input, &DataType::Int64).unwrap();

    // Test field access by name (not position)
    let inner_struct = result.inner();
    assert!(inner_struct.column_by_name("metadata").is_some());
    assert!(inner_struct.column_by_name("value").is_some());
    assert!(inner_struct.column_by_name("typed_value").is_some());

    // Test metadata preservation: same number of rows and identical contents.
    // Comparing the arrays directly checks the bytes of every row, not just the length.
    assert_eq!(result.metadata_field().len(), input.metadata_field().len());
    assert_eq!(result.metadata_field(), input.metadata_field());

    // Test output structure correctness
    assert_eq!(result.len(), input.len());
    assert!(result.value_field().is_some());
    assert!(result.typed_value_field().is_some());

    // For primitive shredding, verify that value and typed_value are never both non-null
    // (This rule applies to primitives; for objects, both can be non-null for partial shredding)
    let value_field = result.value_field().unwrap();
    let typed_value_field = result
        .typed_value_field()
        .unwrap()
        .as_any()
        .downcast_ref::<Int64Array>()
        .unwrap();

    for i in 0..result.len() {
        if result.is_null(i) {
            continue;
        }
        // For primitive shredding, at least one should be null
        assert!(
            value_field.is_null(i) || typed_value_field.is_null(i),
            "Row {i}: both value and typed_value are non-null for primitive shredding"
        );
    }
}
2252
/// Two top-level paths produce a flat struct with one nullable field per path.
#[test]
fn test_variant_schema_builder_simple() {
    let expected = DataType::Struct(Fields::from(vec![
        Field::new("a", DataType::Int64, true),
        Field::new("b", DataType::Float64, true),
    ]));

    let actual = ShreddedSchemaBuilder::default()
        .with_path("a", &DataType::Int64)
        .with_path("b", &DataType::Float64)
        .build();

    assert_eq!(actual, expected);
}
2268
/// Dotted string paths (`"b.c"`, `"b.d"`) are grouped under a nested struct field `b`.
#[test]
fn test_variant_schema_builder_nested() {
    // Expected layout: struct<a: int64, b: struct<c: utf8, d: float64>>
    let nested_b = DataType::Struct(Fields::from(vec![
        Field::new("c", DataType::Utf8, true),
        Field::new("d", DataType::Float64, true),
    ]));
    let expected = DataType::Struct(Fields::from(vec![
        Field::new("a", DataType::Int64, true),
        Field::new("b", nested_b, true),
    ]));

    let actual = ShreddedSchemaBuilder::default()
        .with_path("a", &DataType::Int64)
        .with_path("b.c", &DataType::Utf8)
        .with_path("b.d", &DataType::Float64)
        .build();

    assert_eq!(actual, expected);
}
2292
/// A `VariantPath` with a single element named `"a.b"` yields one field literally
/// named `"a.b"`.
#[test]
fn test_variant_schema_builder_with_path_variant_path_arg() {
    let single_element_path = VariantPath::from_iter([VariantPathElement::from("a.b")]);
    let shredding_type = ShreddedSchemaBuilder::default()
        .with_path(single_element_path, &DataType::Int64)
        .build();

    let DataType::Struct(fields) = shredding_type else {
        panic!("expected struct data type");
    };

    assert_eq!(fields.len(), 1);
    let only_field = &fields[0];
    assert_eq!(only_field.name(), "a.b");
    assert_eq!(only_field.data_type(), &DataType::Int64);
}
2309
/// `with_path` accepts a full `Field` or a `(type, nullable)` pair; in both cases the
/// resulting field takes its name from the path and keeps the requested nullability.
#[test]
fn test_variant_schema_builder_custom_nullability() {
    let source_field = Arc::new(Field::new("should_be_renamed", DataType::Utf8, false));
    let shredding_type = ShreddedSchemaBuilder::default()
        .with_path("foo", source_field)
        .with_path("bar", (&DataType::Int64, false))
        .build();

    let DataType::Struct(fields) = shredding_type else {
        panic!("expected struct data type");
    };

    // Each path must appear under its own name, with the requested type, non-nullable
    for (name, expected_type) in [("foo", DataType::Utf8), ("bar", DataType::Int64)] {
        let field = fields.iter().find(|f| f.name() == name).unwrap();
        assert_eq!(field.data_type(), &expected_type);
        assert!(!field.is_nullable());
    }
}
2332
/// Builds a shredding schema with `ShreddedSchemaBuilder`, feeds it to `shred_variant`,
/// and checks both the resulting Arrow data type and the shredded column values.
#[test]
fn test_variant_schema_builder_with_shred_variant() {
    let input = build_variant_array(vec![
        VariantRow::Object(vec![
            ("time", VariantValue::from(1234567890i64)),
            ("hostname", VariantValue::from("server1")),
            ("extra", VariantValue::from(42)),
        ]),
        VariantRow::Object(vec![
            ("time", VariantValue::from(9876543210i64)),
            ("hostname", VariantValue::from("server2")),
        ]),
        VariantRow::Null,
    ]);

    let shredding_type = ShreddedSchemaBuilder::default()
        .with_path("time", &DataType::Int64)
        .with_path("hostname", &DataType::Utf8)
        .build();

    let result = shred_variant(&input, &shredding_type).unwrap();

    // Expected layout of one shredded object field: a non-nullable struct holding a
    // nullable binary `value` and a nullable `typed_value` of the requested type.
    let shredded_field = |name: &str, typed: DataType| {
        Field::new(
            name,
            DataType::Struct(Fields::from(vec![
                Field::new("value", DataType::BinaryView, true),
                Field::new("typed_value", typed, true),
            ])),
            false,
        )
    };
    let expected_type = DataType::Struct(Fields::from(vec![
        Field::new("metadata", DataType::BinaryView, false),
        Field::new("value", DataType::BinaryView, true),
        Field::new(
            "typed_value",
            DataType::Struct(Fields::from(vec![
                shredded_field("hostname", DataType::Utf8),
                shredded_field("time", DataType::Int64),
            ])),
            true,
        ),
    ]));
    assert_eq!(result.data_type(), &expected_type);

    assert_eq!(result.len(), 3);
    assert!(result.typed_value_field().is_some());

    let typed_value = result
        .typed_value_field()
        .unwrap()
        .as_any()
        .downcast_ref::<arrow::array::StructArray>()
        .unwrap();

    let time_field =
        ShreddedVariantFieldArray::try_new(typed_value.column_by_name("time").unwrap())
            .unwrap();
    let hostname_field =
        ShreddedVariantFieldArray::try_new(typed_value.column_by_name("hostname").unwrap())
            .unwrap();

    let time_typed = time_field
        .typed_value_field()
        .unwrap()
        .as_any()
        .downcast_ref::<Int64Array>()
        .unwrap();
    let hostname_typed = hostname_field
        .typed_value_field()
        .unwrap()
        .as_any()
        .downcast_ref::<arrow::array::StringArray>()
        .unwrap();

    // Rows 0 and 1: objects whose time/hostname land in the typed columns
    let expected_rows = [(1234567890i64, "server1"), (9876543210i64, "server2")];
    for (row, (expected_time, expected_hostname)) in expected_rows.into_iter().enumerate() {
        assert!(!result.is_null(row));
        assert_eq!(time_typed.value(row), expected_time);
        assert_eq!(hostname_typed.value(row), expected_hostname);
    }

    // Row 2: top-level null
    assert!(result.is_null(2));
}
2428
2429    #[test]
2430    fn test_variant_schema_builder_conflicting_path() {
2431        let shredding_type = ShreddedSchemaBuilder::default()
2432            .with_path("a", &DataType::Int64)
2433            .with_path("a", &DataType::Float64)
2434            .build();
2435
2436        assert_eq!(
2437            shredding_type,
2438            DataType::Struct(Fields::from(
2439                vec![Field::new("a", DataType::Float64, true),]
2440            ))
2441        );
2442    }
2443
2444    #[test]
2445    fn test_variant_schema_builder_root_path() {
2446        let path = VariantPath::new(vec![]);
2447        let shredding_type = ShreddedSchemaBuilder::default()
2448            .with_path(path, &DataType::Int64)
2449            .build();
2450
2451        assert_eq!(shredding_type, DataType::Int64);
2452    }
2453
2454    #[test]
2455    fn test_variant_schema_builder_empty_path() {
2456        let shredding_type = ShreddedSchemaBuilder::default()
2457            .with_path("", &DataType::Int64)
2458            .build();
2459
2460        assert_eq!(shredding_type, DataType::Int64);
2461    }
2462
2463    #[test]
2464    fn test_variant_schema_builder_default() {
2465        let shredding_type = ShreddedSchemaBuilder::default().build();
2466        assert_eq!(shredding_type, DataType::Null);
2467    }
2468}