Skip to main content

parquet_variant_compute/
shred_variant.rs

// Licensed to the Apache Software Foundation (ASF) under one
// or more contributor license agreements.  See the NOTICE file
// distributed with this work for additional information
// regarding copyright ownership.  The ASF licenses this file
// to you under the Apache License, Version 2.0 (the
// "License"); you may not use this file except in compliance
// with the License.  You may obtain a copy of the License at
//
//   http://www.apache.org/licenses/LICENSE-2.0
//
// Unless required by applicable law or agreed to in writing,
// software distributed under the License is distributed on an
// "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
// KIND, either express or implied.  See the License for the
// specific language governing permissions and limitations
// under the License.

//! Module for shredding VariantArray with a given schema.

20use crate::variant_array::{ShreddedVariantFieldArray, StructArrayBuilder};
21use crate::variant_to_arrow::{
22    ArrayVariantToArrowRowBuilder, PrimitiveVariantToArrowRowBuilder,
23    make_primitive_variant_to_arrow_row_builder,
24};
25use crate::{VariantArray, VariantValueArrayBuilder};
26use arrow::array::{ArrayRef, BinaryViewArray, NullBufferBuilder};
27use arrow::buffer::NullBuffer;
28use arrow::compute::CastOptions;
29use arrow::datatypes::{DataType, Field, FieldRef, Fields, TimeUnit};
30use arrow::error::{ArrowError, Result};
31use indexmap::IndexMap;
32use parquet_variant::{Variant, VariantBuilderExt, VariantPath, VariantPathElement};
33use std::collections::BTreeMap;
34use std::sync::Arc;
35
36/// Shreds the input binary variant using a target shredding schema derived from the requested data type.
37///
38/// For example, requesting `DataType::Int64` would produce an output variant array with the schema:
39///
40/// ```text
41/// {
42///    metadata: BINARY,
43///    value: BINARY,
44///    typed_value: LONG,
45/// }
46/// ```
47///
48/// Similarly, requesting `DataType::Struct` with two integer fields `a` and `b` would produce an
49/// output variant array with the schema:
50///
51/// ```text
52/// {
53///   metadata: BINARY,
54///   value: BINARY,
55///   typed_value: {
56///     a: {
57///       value: BINARY,
58///       typed_value: INT,
59///     },
60///     b: {
61///       value: BINARY,
62///       typed_value: INT,
63///     },
64///   }
65/// }
66/// ```
67///
68/// See [`ShreddedSchemaBuilder`] for a convenient way to build the `as_type`
69/// value passed to this function.
70pub fn shred_variant(array: &VariantArray, as_type: &DataType) -> Result<VariantArray> {
71    if array.typed_value_field().is_some() {
72        return Err(ArrowError::InvalidArgumentError(
73            "Input is already shredded".to_string(),
74        ));
75    }
76
77    if array.value_field().is_none() {
78        // all-null case -- nothing to do.
79        return Ok(array.clone());
80    };
81
82    let cast_options = CastOptions::default();
83    let mut builder = make_variant_to_shredded_variant_arrow_row_builder(
84        as_type,
85        &cast_options,
86        array.len(),
87        true,
88    )?;
89    for i in 0..array.len() {
90        if array.is_null(i) {
91            builder.append_null()?;
92        } else {
93            builder.append_value(array.value(i))?;
94        }
95    }
96    let (value, typed_value, nulls) = builder.finish()?;
97    Ok(VariantArray::from_parts(
98        array.metadata_field().clone(),
99        Some(value),
100        Some(typed_value),
101        nulls,
102    ))
103}
104
105pub(crate) fn make_variant_to_shredded_variant_arrow_row_builder<'a>(
106    data_type: &'a DataType,
107    cast_options: &'a CastOptions,
108    capacity: usize,
109    top_level: bool,
110) -> Result<VariantToShreddedVariantRowBuilder<'a>> {
111    let builder = match data_type {
112        DataType::Struct(fields) => {
113            let typed_value_builder = VariantToShreddedObjectVariantRowBuilder::try_new(
114                fields,
115                cast_options,
116                capacity,
117                top_level,
118            )?;
119            VariantToShreddedVariantRowBuilder::Object(typed_value_builder)
120        }
121        DataType::List(_)
122        | DataType::LargeList(_)
123        | DataType::ListView(_)
124        | DataType::LargeListView(_)
125        | DataType::FixedSizeList(..) => {
126            let typed_value_builder = VariantToShreddedArrayVariantRowBuilder::try_new(
127                data_type,
128                cast_options,
129                capacity,
130            )?;
131            VariantToShreddedVariantRowBuilder::Array(typed_value_builder)
132        }
133        // Supported shredded primitive types, see Variant shredding spec:
134        // https://github.com/apache/parquet-format/blob/master/VariantShredding.md#shredded-value-types
135        DataType::Boolean
136        | DataType::Int8
137        | DataType::Int16
138        | DataType::Int32
139        | DataType::Int64
140        | DataType::Float32
141        | DataType::Float64
142        | DataType::Decimal32(..)
143        | DataType::Decimal64(..)
144        | DataType::Decimal128(..)
145        | DataType::Date32
146        | DataType::Time64(TimeUnit::Microsecond)
147        | DataType::Timestamp(TimeUnit::Microsecond | TimeUnit::Nanosecond, _)
148        | DataType::Binary
149        | DataType::BinaryView
150        | DataType::Utf8
151        | DataType::Utf8View
152        | DataType::FixedSizeBinary(16) // UUID
153        => {
154            let builder =
155                make_primitive_variant_to_arrow_row_builder(data_type, cast_options, capacity)?;
156            let typed_value_builder =
157                VariantToShreddedPrimitiveVariantRowBuilder::new(builder, capacity, top_level);
158            VariantToShreddedVariantRowBuilder::Primitive(typed_value_builder)
159        }
160        DataType::FixedSizeBinary(_) => {
161            return Err(ArrowError::InvalidArgumentError(format!("{data_type} is not a valid variant shredding type. Only FixedSizeBinary(16) for UUID is supported.")))
162        }
163        _ => {
164            return Err(ArrowError::InvalidArgumentError(format!("{data_type} is not a valid variant shredding type")))
165        }
166    };
167    Ok(builder)
168}
169
170pub(crate) enum VariantToShreddedVariantRowBuilder<'a> {
171    Primitive(VariantToShreddedPrimitiveVariantRowBuilder<'a>),
172    Array(VariantToShreddedArrayVariantRowBuilder<'a>),
173    Object(VariantToShreddedObjectVariantRowBuilder<'a>),
174}
175
176impl<'a> VariantToShreddedVariantRowBuilder<'a> {
177    pub fn append_null(&mut self) -> Result<()> {
178        use VariantToShreddedVariantRowBuilder::*;
179        match self {
180            Primitive(b) => b.append_null(),
181            Array(b) => b.append_null(),
182            Object(b) => b.append_null(),
183        }
184    }
185
186    pub fn append_value(&mut self, value: Variant<'_, '_>) -> Result<bool> {
187        use VariantToShreddedVariantRowBuilder::*;
188        match self {
189            Primitive(b) => b.append_value(value),
190            Array(b) => b.append_value(value),
191            Object(b) => b.append_value(value),
192        }
193    }
194
195    pub fn finish(self) -> Result<(BinaryViewArray, ArrayRef, Option<NullBuffer>)> {
196        use VariantToShreddedVariantRowBuilder::*;
197        match self {
198            Primitive(b) => b.finish(),
199            Array(b) => b.finish(),
200            Object(b) => b.finish(),
201        }
202    }
203}
204
205/// A top-level variant shredder -- appending NULL produces typed_value=NULL and value=Variant::Null
206pub(crate) struct VariantToShreddedPrimitiveVariantRowBuilder<'a> {
207    value_builder: VariantValueArrayBuilder,
208    typed_value_builder: PrimitiveVariantToArrowRowBuilder<'a>,
209    nulls: NullBufferBuilder,
210    top_level: bool,
211}
212
213impl<'a> VariantToShreddedPrimitiveVariantRowBuilder<'a> {
214    pub(crate) fn new(
215        typed_value_builder: PrimitiveVariantToArrowRowBuilder<'a>,
216        capacity: usize,
217        top_level: bool,
218    ) -> Self {
219        Self {
220            value_builder: VariantValueArrayBuilder::new(capacity),
221            typed_value_builder,
222            nulls: NullBufferBuilder::new(capacity),
223            top_level,
224        }
225    }
226
227    fn append_null(&mut self) -> Result<()> {
228        // Only the top-level struct that represents the variant can be nullable; object fields and
229        // array elements are non-nullable.
230        self.nulls.append(!self.top_level);
231        self.value_builder.append_null();
232        self.typed_value_builder.append_null()
233    }
234
235    fn append_value(&mut self, value: Variant<'_, '_>) -> Result<bool> {
236        self.nulls.append_non_null();
237        if self.typed_value_builder.append_value(&value)? {
238            self.value_builder.append_null();
239        } else {
240            self.value_builder.append_value(value);
241        }
242        Ok(true)
243    }
244
245    fn finish(mut self) -> Result<(BinaryViewArray, ArrayRef, Option<NullBuffer>)> {
246        Ok((
247            self.value_builder.build()?,
248            self.typed_value_builder.finish()?,
249            self.nulls.finish(),
250        ))
251    }
252}
253
254pub(crate) struct VariantToShreddedArrayVariantRowBuilder<'a> {
255    value_builder: VariantValueArrayBuilder,
256    typed_value_builder: ArrayVariantToArrowRowBuilder<'a>,
257}
258
259impl<'a> VariantToShreddedArrayVariantRowBuilder<'a> {
260    fn try_new(
261        data_type: &'a DataType,
262        cast_options: &'a CastOptions,
263        capacity: usize,
264    ) -> Result<Self> {
265        Ok(Self {
266            value_builder: VariantValueArrayBuilder::new(capacity),
267            typed_value_builder: ArrayVariantToArrowRowBuilder::try_new(
268                data_type,
269                cast_options,
270                capacity,
271            )?,
272        })
273    }
274
275    fn append_null(&mut self) -> Result<()> {
276        self.value_builder.append_value(Variant::Null);
277        self.typed_value_builder.append_null()?;
278        Ok(())
279    }
280
281    fn append_value(&mut self, variant: Variant<'_, '_>) -> Result<bool> {
282        // If the variant is not an array, typed_value must be null.
283        // If the variant is an array, value must be null.
284        match variant {
285            Variant::List(list) => {
286                self.value_builder.append_null();
287                self.typed_value_builder
288                    .append_value(&Variant::List(list))?;
289                Ok(true)
290            }
291            other => {
292                self.value_builder.append_value(other);
293                self.typed_value_builder.append_null()?;
294                Ok(false)
295            }
296        }
297    }
298
299    fn finish(self) -> Result<(BinaryViewArray, ArrayRef, Option<NullBuffer>)> {
300        Ok((
301            self.value_builder.build()?,
302            self.typed_value_builder.finish()?,
303            // All elements of an array must be present (not missing) because
304            // the array Variant encoding does not allow missing elements
305            None,
306        ))
307    }
308}
309
310pub(crate) struct VariantToShreddedObjectVariantRowBuilder<'a> {
311    value_builder: VariantValueArrayBuilder,
312    typed_value_builders: IndexMap<&'a str, VariantToShreddedVariantRowBuilder<'a>>,
313    typed_value_nulls: NullBufferBuilder,
314    nulls: NullBufferBuilder,
315    top_level: bool,
316}
317
318impl<'a> VariantToShreddedObjectVariantRowBuilder<'a> {
319    fn try_new(
320        fields: &'a Fields,
321        cast_options: &'a CastOptions,
322        capacity: usize,
323        top_level: bool,
324    ) -> Result<Self> {
325        let typed_value_builders = fields.iter().map(|field| {
326            let builder = make_variant_to_shredded_variant_arrow_row_builder(
327                field.data_type(),
328                cast_options,
329                capacity,
330                false,
331            )?;
332            Ok((field.name().as_str(), builder))
333        });
334        Ok(Self {
335            value_builder: VariantValueArrayBuilder::new(capacity),
336            typed_value_builders: typed_value_builders.collect::<Result<_>>()?,
337            typed_value_nulls: NullBufferBuilder::new(capacity),
338            nulls: NullBufferBuilder::new(capacity),
339            top_level,
340        })
341    }
342
343    fn append_null(&mut self) -> Result<()> {
344        // Only the top-level struct that represents the variant can be nullable; object fields and
345        // array elements are non-nullable.
346        self.nulls.append(!self.top_level);
347        self.value_builder.append_null();
348        self.typed_value_nulls.append_null();
349        for (_, typed_value_builder) in &mut self.typed_value_builders {
350            typed_value_builder.append_null()?;
351        }
352        Ok(())
353    }
354
355    fn append_value(&mut self, value: Variant<'_, '_>) -> Result<bool> {
356        let Variant::Object(ref obj) = value else {
357            // Not an object => fall back
358            self.nulls.append_non_null();
359            self.value_builder.append_value(value);
360            self.typed_value_nulls.append_null();
361            for (_, typed_value_builder) in &mut self.typed_value_builders {
362                typed_value_builder.append_null()?;
363            }
364            return Ok(false);
365        };
366
367        // Route the object's fields by name as either shredded or unshredded
368        let mut builder = self.value_builder.builder_ext(value.metadata());
369        let mut object_builder = builder.try_new_object()?;
370        let mut seen = std::collections::HashSet::new();
371        let mut partially_shredded = false;
372        for (field_name, value) in obj.iter() {
373            match self.typed_value_builders.get_mut(field_name) {
374                Some(typed_value_builder) => {
375                    typed_value_builder.append_value(value)?;
376                    seen.insert(field_name);
377                }
378                None => {
379                    object_builder.insert_bytes(field_name, value);
380                    partially_shredded = true;
381                }
382            }
383        }
384
385        // Handle missing fields
386        for (field_name, typed_value_builder) in &mut self.typed_value_builders {
387            if !seen.contains(field_name) {
388                typed_value_builder.append_null()?;
389            }
390        }
391
392        // Only emit the value if it captured any unshredded object fields
393        if partially_shredded {
394            object_builder.finish();
395        } else {
396            drop(object_builder);
397            self.value_builder.append_null();
398        }
399
400        self.typed_value_nulls.append_non_null();
401        self.nulls.append_non_null();
402        Ok(true)
403    }
404
405    fn finish(mut self) -> Result<(BinaryViewArray, ArrayRef, Option<NullBuffer>)> {
406        let mut builder = StructArrayBuilder::new();
407        for (field_name, typed_value_builder) in self.typed_value_builders {
408            let (value, typed_value, nulls) = typed_value_builder.finish()?;
409            let array =
410                ShreddedVariantFieldArray::from_parts(Some(value), Some(typed_value), nulls);
411            builder = builder.with_field(field_name, ArrayRef::from(array), false);
412        }
413        if let Some(nulls) = self.typed_value_nulls.finish() {
414            builder = builder.with_nulls(nulls);
415        }
416        Ok((
417            self.value_builder.build()?,
418            Arc::new(builder.build()),
419            self.nulls.finish(),
420        ))
421    }
422}
423
424/// Field configuration captured by the builder (data type + nullability).
425#[derive(Clone)]
426pub struct ShreddingField {
427    data_type: DataType,
428    nullable: bool,
429}
430
431impl ShreddingField {
432    fn new(data_type: DataType, nullable: bool) -> Self {
433        Self {
434            data_type,
435            nullable,
436        }
437    }
438
439    fn null() -> Self {
440        Self::new(DataType::Null, true)
441    }
442}
443
444/// Convenience conversion to allow passing either `FieldRef`, `DataType`, or `(DataType, bool)`.
445pub trait IntoShreddingField {
446    fn into_shredding_field(self) -> ShreddingField;
447}
448
449impl IntoShreddingField for FieldRef {
450    fn into_shredding_field(self) -> ShreddingField {
451        ShreddingField::new(self.data_type().clone(), self.is_nullable())
452    }
453}
454
455impl IntoShreddingField for &DataType {
456    fn into_shredding_field(self) -> ShreddingField {
457        ShreddingField::new(self.clone(), true)
458    }
459}
460
461impl IntoShreddingField for DataType {
462    fn into_shredding_field(self) -> ShreddingField {
463        ShreddingField::new(self, true)
464    }
465}
466
467impl IntoShreddingField for (&DataType, bool) {
468    fn into_shredding_field(self) -> ShreddingField {
469        ShreddingField::new(self.0.clone(), self.1)
470    }
471}
472
473impl IntoShreddingField for (DataType, bool) {
474    fn into_shredding_field(self) -> ShreddingField {
475        ShreddingField::new(self.0, self.1)
476    }
477}
478
479/// Builder for constructing a variant shredding schema.
480///
481/// The builder pattern makes it easy to incrementally define which fields
482/// should be shredded and with what types. Fields are nullable by default; pass
483/// a `(data_type, nullable)` pair or a `FieldRef` to control nullability.
484///
485/// Note: this builder currently only supports struct fields. List support
486/// will be added in the future.
487///
488/// # Example
489///
490/// ```
491/// use std::sync::Arc;
492/// use arrow::datatypes::{DataType, Field, TimeUnit};
493/// use parquet_variant::{VariantPath, VariantPathElement};
494/// use parquet_variant_compute::ShreddedSchemaBuilder;
495///
496/// fn main() -> Result<(), arrow::error::ArrowError> {
497///     // Define the shredding schema using the builder
498///     let shredding_type = ShreddedSchemaBuilder::default()
499///     // store the "time" field as a separate UTC timestamp
500///     .with_path("time", (&DataType::Timestamp(TimeUnit::Nanosecond, Some("UTC".into())), true))?
501///     // store hostname as non-nullable Utf8
502///     .with_path("hostname", (&DataType::Utf8, false))?
503///     // pass a FieldRef directly
504///     .with_path(
505///         "metadata.trace_id",
506///         Arc::new(Field::new("trace_id", DataType::FixedSizeBinary(16), false)),
507///     )?
508///     // field name with a dot: use VariantPath to avoid splitting
509///     .with_path(
510///         VariantPath::from_iter([VariantPathElement::from("metrics.cpu")]),
511///         &DataType::Float64,
512///     )?
513///     .build();
514///    Ok(())
515/// }
516/// // The shredding_type can now be passed to shred_variant:
517/// // let shredded = shred_variant(&input, &shredding_type)?;
518/// ```
519#[derive(Default, Clone)]
520pub struct ShreddedSchemaBuilder {
521    root: VariantSchemaNode,
522}
523
524impl ShreddedSchemaBuilder {
525    /// Create a new empty schema builder.
526    pub fn new() -> Self {
527        Self::default()
528    }
529
530    /// Insert a typed path into the schema using dot notation (or any
531    /// [`VariantPath`] convertible).
532    ///
533    /// The path uses dot notation to specify nested fields.
534    /// For example, "a.b.c" will create a nested structure.
535    ///
536    /// # Arguments
537    ///
538    /// * `path` - Anything convertible to [`VariantPath`] (e.g., a `&str`)
539    /// * `field` - Anything convertible via [`IntoShreddingField`] (e.g. `FieldRef`,
540    ///   `&DataType`, or `(&DataType, bool)` to control nullability)
541    pub fn with_path<'a, P, F>(mut self, path: P, field: F) -> Result<Self>
542    where
543        P: TryInto<VariantPath<'a>>,
544        P::Error: std::fmt::Debug,
545        F: IntoShreddingField,
546    {
547        let path: VariantPath<'a> = path
548            .try_into()
549            .map_err(|e| ArrowError::InvalidArgumentError(format!("{:?}", e)))?;
550        self.root.insert_path(&path, field.into_shredding_field());
551        Ok(self)
552    }
553
554    /// Build the final [`DataType`].
555    pub fn build(self) -> DataType {
556        let shredding_type = self.root.to_shredding_type();
557        match shredding_type {
558            Some(shredding_type) => shredding_type,
559            None => DataType::Null,
560        }
561    }
562}
563
564/// Internal tree node structure for building variant schemas.
565#[derive(Clone)]
566enum VariantSchemaNode {
567    /// A leaf node with a primitive/scalar type (and nullability)
568    Leaf(ShreddingField),
569    /// An inner struct node with nested fields
570    Struct(BTreeMap<String, VariantSchemaNode>),
571}
572
573impl Default for VariantSchemaNode {
574    fn default() -> Self {
575        Self::Leaf(ShreddingField::null())
576    }
577}
578
579impl VariantSchemaNode {
580    /// Insert a path into this node with the given data type.
581    fn insert_path(&mut self, path: &VariantPath<'_>, field: ShreddingField) {
582        self.insert_path_elements(path, field);
583    }
584
585    fn insert_path_elements(&mut self, segments: &[VariantPathElement<'_>], field: ShreddingField) {
586        let Some((head, tail)) = segments.split_first() else {
587            *self = Self::Leaf(field);
588            return;
589        };
590
591        match head {
592            VariantPathElement::Field { name } => {
593                // Ensure this node is a Struct node
594                let children = match self {
595                    Self::Struct(children) => children,
596                    _ => {
597                        *self = Self::Struct(BTreeMap::new());
598                        match self {
599                            Self::Struct(children) => children,
600                            _ => unreachable!(),
601                        }
602                    }
603                };
604
605                children
606                    .entry(name.to_string())
607                    .or_default()
608                    .insert_path_elements(tail, field);
609            }
610            VariantPathElement::Index { .. } => {
611                // List support to be added later; reject for now
612                unreachable!("List paths are not supported yet");
613            }
614        }
615    }
616
617    /// Convert this node to a shredding type.
618    ///
619    /// Returns the [`DataType`] for passing to [`shred_variant`].
620    fn to_shredding_type(&self) -> Option<DataType> {
621        match self {
622            Self::Leaf(field) => Some(field.data_type.clone()),
623            Self::Struct(children) => {
624                let child_fields: Vec<_> = children
625                    .iter()
626                    .filter_map(|(name, child)| child.to_shredding_field(name))
627                    .collect();
628                if child_fields.is_empty() {
629                    None
630                } else {
631                    Some(DataType::Struct(Fields::from(child_fields)))
632                }
633            }
634        }
635    }
636
637    fn to_shredding_field(&self, name: &str) -> Option<FieldRef> {
638        match self {
639            Self::Leaf(field) => Some(Arc::new(Field::new(
640                name,
641                field.data_type.clone(),
642                field.nullable,
643            ))),
644            Self::Struct(_) => self
645                .to_shredding_type()
646                .map(|data_type| Arc::new(Field::new(name, data_type, true))),
647        }
648    }
649}
650
651#[cfg(test)]
652mod tests {
653    use super::*;
654    use crate::VariantArrayBuilder;
655    use crate::arrow_to_variant::ListLikeArray;
656    use arrow::array::{
657        Array, BinaryViewArray, FixedSizeBinaryArray, Float64Array, GenericListArray,
658        GenericListViewArray, Int64Array, ListArray, OffsetSizeTrait, PrimitiveArray, StringArray,
659    };
660    use arrow::datatypes::{
661        ArrowPrimitiveType, DataType, Field, Fields, Int64Type, TimeUnit, UnionFields, UnionMode,
662    };
663    use parquet_variant::{
664        BuilderSpecificState, EMPTY_VARIANT_METADATA_BYTES, ObjectBuilder, ReadOnlyMetadataBuilder,
665        Variant, VariantBuilder, VariantPath, VariantPathElement,
666    };
667    use std::sync::Arc;
668    use uuid::Uuid;
669
670    #[derive(Clone)]
671    enum VariantValue<'a> {
672        Value(Variant<'a, 'a>),
673        List(Vec<VariantValue<'a>>),
674        Object(Vec<(&'a str, VariantValue<'a>)>),
675        Null,
676    }
677
678    impl<'a, T> From<T> for VariantValue<'a>
679    where
680        T: Into<Variant<'a, 'a>>,
681    {
682        fn from(value: T) -> Self {
683            Self::Value(value.into())
684        }
685    }
686
687    #[derive(Clone)]
688    enum VariantRow<'a> {
689        Value(VariantValue<'a>),
690        List(Vec<VariantValue<'a>>),
691        Object(Vec<(&'a str, VariantValue<'a>)>),
692        Null,
693    }
694
695    fn build_variant_array(rows: Vec<VariantRow<'static>>) -> VariantArray {
696        let mut builder = VariantArrayBuilder::new(rows.len());
697
698        fn append_variant_value<B: VariantBuilderExt>(builder: &mut B, value: VariantValue) {
699            match value {
700                VariantValue::Value(v) => builder.append_value(v),
701                VariantValue::List(values) => {
702                    let mut list = builder.new_list();
703                    for v in values {
704                        append_variant_value(&mut list, v);
705                    }
706                    list.finish();
707                }
708                VariantValue::Object(fields) => {
709                    let mut object = builder.new_object();
710                    for (name, value) in fields {
711                        append_variant_field(&mut object, name, value);
712                    }
713                    object.finish();
714                }
715                VariantValue::Null => builder.append_null(),
716            }
717        }
718
719        fn append_variant_field<'a, S: BuilderSpecificState>(
720            object: &mut ObjectBuilder<'_, S>,
721            name: &'a str,
722            value: VariantValue<'a>,
723        ) {
724            match value {
725                VariantValue::Value(v) => {
726                    object.insert(name, v);
727                }
728                VariantValue::List(values) => {
729                    let mut list = object.new_list(name);
730                    for v in values {
731                        append_variant_value(&mut list, v);
732                    }
733                    list.finish();
734                }
735                VariantValue::Object(fields) => {
736                    let mut nested = object.new_object(name);
737                    for (field_name, v) in fields {
738                        append_variant_field(&mut nested, field_name, v);
739                    }
740                    nested.finish();
741                }
742                VariantValue::Null => {
743                    object.insert(name, Variant::Null);
744                }
745            }
746        }
747
748        rows.into_iter().for_each(|row| match row {
749            VariantRow::Value(value) => append_variant_value(&mut builder, value),
750            VariantRow::List(values) => {
751                let mut list = builder.new_list();
752                for value in values {
753                    append_variant_value(&mut list, value);
754                }
755                list.finish();
756            }
757            VariantRow::Object(fields) => {
758                let mut object = builder.new_object();
759                for (name, value) in fields {
760                    append_variant_field(&mut object, name, value);
761                }
762                object.finish();
763            }
764            VariantRow::Null => builder.append_null(),
765        });
766        builder.build()
767    }
768
769    trait TestListLikeArray: ListLikeArray {
770        type OffsetSize: OffsetSizeTrait;
771        fn value_offsets(&self) -> Option<&[Self::OffsetSize]>;
772        fn value_size(&self, index: usize) -> Self::OffsetSize;
773    }
774
775    impl<O: OffsetSizeTrait> TestListLikeArray for GenericListArray<O> {
776        type OffsetSize = O;
777
778        fn value_offsets(&self) -> Option<&[Self::OffsetSize]> {
779            Some(GenericListArray::value_offsets(self))
780        }
781
782        fn value_size(&self, index: usize) -> Self::OffsetSize {
783            GenericListArray::value_length(self, index)
784        }
785    }
786
787    impl<O: OffsetSizeTrait> TestListLikeArray for GenericListViewArray<O> {
788        type OffsetSize = O;
789
790        fn value_offsets(&self) -> Option<&[Self::OffsetSize]> {
791            Some(GenericListViewArray::value_offsets(self))
792        }
793
794        fn value_size(&self, index: usize) -> Self::OffsetSize {
795            GenericListViewArray::value_size(self, index)
796        }
797    }
798
799    fn downcast_list_like_array<O: OffsetSizeTrait>(
800        array: &VariantArray,
801    ) -> &dyn TestListLikeArray<OffsetSize = O> {
802        let typed_value = array.typed_value_field().unwrap();
803        if let Some(list) = typed_value.as_any().downcast_ref::<GenericListArray<O>>() {
804            list
805        } else if let Some(list_view) = typed_value
806            .as_any()
807            .downcast_ref::<GenericListViewArray<O>>()
808        {
809            list_view
810        } else {
811            panic!(
812                "Expected list-like typed_value with matching offset type, got {}",
813                typed_value.data_type()
814            );
815        }
816    }
817
818    fn assert_list_structure<O: OffsetSizeTrait>(
819        array: &VariantArray,
820        expected_len: usize,
821        expected_offsets: &[O],
822        expected_sizes: &[Option<O>],
823        expected_fallbacks: &[Option<Variant<'static, 'static>>],
824    ) {
825        assert_eq!(array.len(), expected_len);
826
827        let fallbacks = (array.value_field().unwrap(), Some(array.metadata_field()));
828        let array = downcast_list_like_array::<O>(array);
829
830        assert_eq!(
831            array.value_offsets().unwrap(),
832            expected_offsets,
833            "list offsets mismatch"
834        );
835        assert_eq!(
836            array.len(),
837            expected_sizes.len(),
838            "expected_sizes should match array length"
839        );
840        assert_eq!(
841            array.len(),
842            expected_fallbacks.len(),
843            "expected_fallbacks should match array length"
844        );
845        assert_eq!(
846            array.len(),
847            fallbacks.0.len(),
848            "fallbacks value field should match array length"
849        );
850
851        // Validate per-row shredding outcomes for the list array
852        for (idx, (expected_size, expected_fallback)) in expected_sizes
853            .iter()
854            .zip(expected_fallbacks.iter())
855            .enumerate()
856        {
857            match expected_size {
858                Some(len) => {
859                    // Successfully shredded: typed list value present, no fallback value
860                    assert!(array.is_valid(idx));
861                    assert_eq!(array.value_size(idx), *len);
862                    assert!(fallbacks.0.is_null(idx));
863                }
864                None => {
865                    // Unable to shred: typed list value absent, fallback should carry the variant
866                    assert!(array.is_null(idx));
867                    assert_eq!(array.value_size(idx), O::zero());
868                    match expected_fallback {
869                        Some(expected_variant) => {
870                            assert!(fallbacks.0.is_valid(idx));
871                            let metadata_bytes = fallbacks
872                                .1
873                                .filter(|m| m.is_valid(idx))
874                                .map(|m| m.value(idx))
875                                .filter(|bytes| !bytes.is_empty())
876                                .unwrap_or(EMPTY_VARIANT_METADATA_BYTES);
877                            assert_eq!(
878                                Variant::new(metadata_bytes, fallbacks.0.value(idx)),
879                                expected_variant.clone()
880                            );
881                        }
882                        None => unreachable!(),
883                    }
884                }
885            }
886        }
887    }
888
889    fn assert_list_structure_and_elements<T: ArrowPrimitiveType, O: OffsetSizeTrait>(
890        array: &VariantArray,
891        expected_len: usize,
892        expected_offsets: &[O],
893        expected_sizes: &[Option<O>],
894        expected_fallbacks: &[Option<Variant<'static, 'static>>],
895        expected_shredded_elements: (&[Option<T::Native>], &[Option<Variant<'static, 'static>>]),
896    ) {
897        assert_list_structure(
898            array,
899            expected_len,
900            expected_offsets,
901            expected_sizes,
902            expected_fallbacks,
903        );
904        let array = downcast_list_like_array::<O>(array);
905
906        // Validate the shredded state of list elements (typed values and fallbacks)
907        let (expected_values, expected_fallbacks) = expected_shredded_elements;
908        assert_eq!(
909            expected_values.len(),
910            expected_fallbacks.len(),
911            "expected_values and expected_fallbacks should be aligned"
912        );
913
914        // Validate the shredded primitive values for list elements
915        let element_array = ShreddedVariantFieldArray::try_new(array.values().as_ref()).unwrap();
916        let element_values = element_array
917            .typed_value_field()
918            .unwrap()
919            .as_any()
920            .downcast_ref::<PrimitiveArray<T>>()
921            .unwrap();
922        assert_eq!(element_values.len(), expected_values.len());
923        for (idx, expected_value) in expected_values.iter().enumerate() {
924            match expected_value {
925                Some(value) => {
926                    assert!(element_values.is_valid(idx));
927                    assert_eq!(element_values.value(idx), *value);
928                }
929                None => assert!(element_values.is_null(idx)),
930            }
931        }
932
933        // Validate fallback variants for list elements that could not be shredded
934        let element_fallbacks = element_array.value_field().unwrap();
935        assert_eq!(element_fallbacks.len(), expected_fallbacks.len());
936        for (idx, expected_fallback) in expected_fallbacks.iter().enumerate() {
937            match expected_fallback {
938                Some(expected_variant) => {
939                    assert!(element_fallbacks.is_valid(idx));
940                    assert_eq!(
941                        Variant::new(EMPTY_VARIANT_METADATA_BYTES, element_fallbacks.value(idx)),
942                        expected_variant.clone()
943                    );
944                }
945                None => assert!(element_fallbacks.is_null(idx)),
946            }
947        }
948    }
949
/// Shredding an input that already has a `typed_value` column must fail with
/// `ArrowError::InvalidArgumentError`.
#[test]
fn test_already_shredded_input_error() {
    // Borrow the metadata and value columns of a valid unshredded array, then attach
    // a typed_value column to obtain an already-shredded input.
    let unshredded = VariantArray::from_iter(vec![Some(Variant::from("test"))]);
    let typed_value: ArrayRef = Arc::new(Int64Array::from(vec![42]));
    let already_shredded = VariantArray::from_parts(
        unshredded.metadata_field().clone(),
        Some(unshredded.value_field().unwrap().clone()),
        Some(typed_value),
        None,
    );

    let err = shred_variant(&already_shredded, &DataType::Int64).unwrap_err();
    assert!(matches!(err, ArrowError::InvalidArgumentError(_)));
}
968
/// Shredding an input without a `value` column yields an output that has neither a
/// `value` nor a `typed_value` column.
#[test]
fn test_all_null_input() {
    // Single metadata entry, no value column and no typed_value column
    let metadata_entry = [1u8, 0u8];
    let input = VariantArray::from_parts(
        BinaryViewArray::from_iter_values([&metadata_entry]),
        None,
        None,
        None,
    );

    let shredded = shred_variant(&input, &DataType::Int64).unwrap();

    assert!(shredded.value_field().is_none());
    assert!(shredded.typed_value_field().is_none());
}
980
/// Requesting `FixedSizeBinary(17)` as the shredding type is rejected with an
/// invalid-argument error that names `FixedSizeBinary(16)` as the supported width.
#[test]
fn test_invalid_fixed_size_binary_shredding() {
    let uuid = Uuid::new_v4();
    let input = VariantArray::from_iter([Some(Variant::from(uuid)), None]);

    // Any width other than 16 is expected to be refused
    let unsupported_type = DataType::FixedSizeBinary(17);
    let message = shred_variant(&input, &unsupported_type)
        .unwrap_err()
        .to_string();

    assert_eq!(
        message,
        "Invalid argument error: FixedSizeBinary(17) is not a valid variant shredding type. Only FixedSizeBinary(16) for UUID is supported."
    );
}
995
/// Shreds a mix of UUIDs, a null row and a boolean to `FixedSizeBinary(16)` and
/// checks that only the UUID rows populate the typed column.
#[test]
fn test_uuid_shredding() {
    let first_uuid = Uuid::new_v4();
    let second_uuid = Uuid::new_v4();

    let input = VariantArray::from_iter([
        Some(Variant::from(first_uuid)),
        None,
        Some(Variant::from(false)),
        Some(Variant::from(second_uuid)),
    ]);

    let shredded = shred_variant(&input, &DataType::FixedSizeBinary(16)).unwrap();

    // NOTE(review): a check that the `typed_value` field carries the canonical Uuid
    // extension type (`try_extension_type::<extension::Uuid>()`) was sketched here but
    // left disabled; the reason is not visible from this test, so confirm before
    // enabling it.

    // Probe the downcasted typed_value array to make sure uuids are shredded correctly
    let typed_uuids = shredded
        .typed_value_field()
        .unwrap()
        .as_any()
        .downcast_ref::<FixedSizeBinaryArray>()
        .unwrap();

    // Per row: the UUID expected in typed_value, or None when typed_value is null
    let expected_rows = [Some(first_uuid), None, None, Some(second_uuid)];
    assert_eq!(typed_uuids.len(), 4);

    for (row, expected) in expected_rows.iter().enumerate() {
        match expected {
            Some(uuid) => {
                assert!(!typed_uuids.is_null(row));
                let shredded_bytes: &[u8] = typed_uuids.value(row);
                assert_eq!(shredded_bytes, uuid.as_bytes());
            }
            None => assert!(typed_uuids.is_null(row)),
        }
    }
}
1047
/// Shreds a mixed input to `Int64` and checks, row by row, which of `value` and
/// `typed_value` is populated.
#[test]
fn test_primitive_shredding_comprehensive() {
    // One array covering every scenario
    let rows = vec![
        Some(Variant::from(42i64)),   // row 0: successful shred
        Some(Variant::from("hello")), // row 1: failed shred (string)
        Some(Variant::from(100i64)),  // row 2: successful shred
        None,                         // row 3: array-level null
        Some(Variant::Null),          // row 4: variant null
        Some(Variant::from(3i8)),     // row 5: successful shred (int8->int64 conversion)
    ];
    let input = VariantArray::from_iter(rows);

    let shredded = shred_variant(&input, &DataType::Int64).unwrap();

    // Pull out the three columns of the shredded output
    let metadata = shredded.metadata_field();
    let fallback = shredded.value_field().unwrap();
    let typed = shredded
        .typed_value_field()
        .unwrap()
        .as_any()
        .downcast_ref::<Int64Array>()
        .unwrap();

    assert_eq!(shredded.len(), 6);

    // Row 0: 42 shreds, so value is null and typed_value holds the integer
    assert!(!shredded.is_null(0));
    assert!(fallback.is_null(0));
    assert!(!typed.is_null(0));
    assert_eq!(typed.value(0), 42);

    // Row 1: "hello" cannot shred, so value keeps the original and typed_value is null
    assert!(!shredded.is_null(1));
    assert!(!fallback.is_null(1));
    assert!(typed.is_null(1));
    assert_eq!(
        Variant::new(metadata.value(1), fallback.value(1)),
        Variant::from("hello")
    );

    // Row 2: 100 shreds
    assert!(!shredded.is_null(2));
    assert!(fallback.is_null(2));
    assert_eq!(typed.value(2), 100);

    // Row 3: array-level null stays null in the result
    assert!(shredded.is_null(3));

    // Row 4: Variant::Null is not an integer, so it is kept in value
    assert!(!shredded.is_null(4));
    assert!(!fallback.is_null(4));
    assert_eq!(
        Variant::new(metadata.value(4), fallback.value(4)),
        Variant::Null
    );
    assert!(typed.is_null(4));

    // Row 5: 3i8 shreds through the int8->int64 conversion
    assert!(!shredded.is_null(5));
    assert!(fallback.is_null(5));
    assert!(!typed.is_null(5));
    assert_eq!(typed.value(5), 3);
}
1113
/// Shreds the same input (int32, float64, string) to `Int32` and to `Float64` and
/// checks which rows land in `typed_value` for each target.
#[test]
fn test_primitive_different_target_types() {
    let input = VariantArray::from_iter(vec![
        Variant::from(42i32),
        Variant::from(3.15f64),
        Variant::from("not_a_number"),
    ]);

    // Int32 target: only the integer row is shredded
    {
        let shredded = shred_variant(&input, &DataType::Int32).unwrap();
        let ints = shredded
            .typed_value_field()
            .unwrap()
            .as_any()
            .downcast_ref::<arrow::array::Int32Array>()
            .unwrap();
        assert_eq!(ints.value(0), 42);
        assert!(ints.is_null(1)); // float doesn't convert to int32
        assert!(ints.is_null(2)); // string doesn't convert to int32
    }

    // Float64 target: the integer and the float rows are shredded
    {
        let shredded = shred_variant(&input, &DataType::Float64).unwrap();
        let floats = shredded
            .typed_value_field()
            .unwrap()
            .as_any()
            .downcast_ref::<Float64Array>()
            .unwrap();
        assert_eq!(floats.value(0), 42.0); // int converts to float
        assert_eq!(floats.value(1), 3.15);
        assert!(floats.is_null(2)); // string doesn't convert
    }
}
1146
/// Every data type listed below must be rejected by `shred_variant` with
/// `ArrowError::InvalidArgumentError`.
#[test]
fn test_invalid_shredded_types_rejected() {
    let input = VariantArray::from_iter([Variant::from(42)]);

    // Nested types are assembled first so the list of rejected types stays flat
    let union_type = DataType::Union(
        UnionFields::from_fields(vec![
            Field::new("int_field", DataType::Int32, false),
            Field::new("str_field", DataType::Utf8, true),
        ]),
        UnionMode::Dense,
    );
    let map_entries = Field::new(
        "entries",
        DataType::Struct(Fields::from(vec![
            Field::new("key", DataType::Utf8, false),
            Field::new("value", DataType::Int32, true),
        ])),
        false,
    );
    let map_type = DataType::Map(Arc::new(map_entries), false);
    let dictionary_type =
        DataType::Dictionary(Box::new(DataType::Int32), Box::new(DataType::Utf8));
    let run_end_encoded_type = DataType::RunEndEncoded(
        Arc::new(Field::new("run_ends", DataType::Int32, false)),
        Arc::new(Field::new("values", DataType::Utf8, true)),
    );

    let invalid_types = vec![
        DataType::UInt8,
        DataType::Float16,
        DataType::Decimal256(38, 10),
        DataType::Date64,
        DataType::Time32(TimeUnit::Second),
        DataType::Time64(TimeUnit::Nanosecond),
        DataType::Timestamp(TimeUnit::Millisecond, None),
        DataType::LargeBinary,
        DataType::LargeUtf8,
        DataType::FixedSizeBinary(17),
        union_type,
        map_type,
        dictionary_type,
        run_end_encoded_type,
    ];

    for data_type in &invalid_types {
        let err = shred_variant(&input, data_type).unwrap_err();
        assert!(
            matches!(err, ArrowError::InvalidArgumentError(_)),
            "expected InvalidArgumentError for {:?}, got {:?}",
            data_type,
            err
        );
    }
}
1197
/// Shreds variant rows against a `List<Int64>` schema and verifies list offsets,
/// row-level fallbacks and element-level shredding.
#[test]
fn test_array_shredding_as_list() {
    let rows = vec![
        // Row 0: List of ints should shred entirely into typed_value
        VariantRow::List(vec![
            VariantValue::from(1i64),
            VariantValue::from(2i64),
            VariantValue::from(3i64),
        ]),
        // Row 1: Contains incompatible types so values fall back
        VariantRow::List(vec![
            VariantValue::from(1i64),
            VariantValue::from("two"),
            VariantValue::from(Variant::Null),
        ]),
        // Row 2: Not a list -> entire row falls back
        VariantRow::Value(VariantValue::from("not a list")),
        // Row 3: Array-level null propagates
        VariantRow::Null,
        // Row 4: Empty list exercises zero-length offsets
        VariantRow::List(vec![]),
    ];
    let input = build_variant_array(rows);

    let item_field = Arc::new(Field::new("item", DataType::Int64, true));
    let result = shred_variant(&input, &DataType::List(item_field)).unwrap();
    assert_eq!(result.len(), 5);

    // Row-level expectations
    let offsets: &[i32] = &[0, 3, 6, 6, 6, 6];
    let sizes: &[Option<i32>] = &[Some(3), Some(3), None, None, Some(0)];
    let row_fallbacks: &[Option<Variant<'static, 'static>>] = &[
        None,
        None,
        Some(Variant::from("not a list")),
        Some(Variant::Null),
        None,
    ];

    // Element-level expectations, in flattened element order
    let element_values: &[Option<i64>] = &[Some(1), Some(2), Some(3), Some(1), None, None];
    let element_fallbacks: &[Option<Variant<'static, 'static>>] = &[
        None,
        None,
        None,
        None,
        Some(Variant::from("two")),
        Some(Variant::Null),
    ];

    assert_list_structure_and_elements::<Int64Type, i32>(
        &result,
        5,
        offsets,
        sizes,
        row_fallbacks,
        (element_values, element_fallbacks),
    );
}
1249
/// Shreds variant rows against a `LargeList<Int64>` schema (64-bit offsets).
#[test]
fn test_array_shredding_as_large_list() {
    let rows = vec![
        // Row 0: List of ints shreds to typed_value
        VariantRow::List(vec![VariantValue::from(1i64), VariantValue::from(2i64)]),
        // Row 1: Not a list -> entire row falls back
        VariantRow::Value(VariantValue::from("not a list")),
        // Row 2: Empty list
        VariantRow::List(vec![]),
    ];
    let input = build_variant_array(rows);

    let item_field = Arc::new(Field::new("item", DataType::Int64, true));
    let result = shred_variant(&input, &DataType::LargeList(item_field)).unwrap();
    assert_eq!(result.len(), 3);

    // Row-level expectations
    let offsets: &[i64] = &[0, 2, 2, 2];
    let sizes: &[Option<i64>] = &[Some(2), None, Some(0)];
    let row_fallbacks: &[Option<Variant<'static, 'static>>] =
        &[None, Some(Variant::from("not a list")), None];

    // Element-level expectations, in flattened element order
    let element_values: &[Option<i64>] = &[Some(1), Some(2)];
    let element_fallbacks: &[Option<Variant<'static, 'static>>] = &[None, None];

    assert_list_structure_and_elements::<Int64Type, i64>(
        &result,
        3,
        offsets,
        sizes,
        row_fallbacks,
        (element_values, element_fallbacks),
    );
}
1273
/// Shreds variant rows against a `ListView<Int64>` schema and verifies offsets,
/// sizes, row-level fallbacks and element-level shredding.
#[test]
fn test_array_shredding_as_list_view() {
    let rows = vec![
        // Row 0: Standard list
        VariantRow::List(vec![
            VariantValue::from(1i64),
            VariantValue::from(2i64),
            VariantValue::from(3i64),
        ]),
        // Row 1: List with incompatible types -> element fallback
        VariantRow::List(vec![
            VariantValue::from(1i64),
            VariantValue::from("two"),
            VariantValue::from(Variant::Null),
        ]),
        // Row 2: Not a list -> top-level fallback
        VariantRow::Value(VariantValue::from("not a list")),
        // Row 3: Top-level Null
        VariantRow::Null,
        // Row 4: Empty list
        VariantRow::List(vec![]),
    ];
    let input = build_variant_array(rows);

    let item_field = Arc::new(Field::new("item", DataType::Int64, true));
    let result = shred_variant(&input, &DataType::ListView(item_field)).unwrap();
    assert_eq!(result.len(), 5);

    // Row-level expectations
    let offsets: &[i32] = &[0, 3, 6, 6, 6];
    let sizes: &[Option<i32>] = &[Some(3), Some(3), None, None, Some(0)];
    let row_fallbacks: &[Option<Variant<'static, 'static>>] = &[
        None,
        None,
        Some(Variant::from("not a list")),
        Some(Variant::Null),
        None,
    ];

    // Element-level expectations, in flattened element order
    let element_values: &[Option<i64>] = &[Some(1), Some(2), Some(3), Some(1), None, None];
    let element_fallbacks: &[Option<Variant<'static, 'static>>] = &[
        None,
        None,
        None,
        None,
        Some(Variant::from("two")),
        Some(Variant::Null),
    ];

    assert_list_structure_and_elements::<Int64Type, i32>(
        &result,
        5,
        offsets,
        sizes,
        row_fallbacks,
        (element_values, element_fallbacks),
    );
}
1325
/// Shreds variant rows against a `LargeListView<Int64>` schema (64-bit offsets).
#[test]
fn test_array_shredding_as_large_list_view() {
    let rows = vec![
        // Row 0: List of ints shreds to typed_value
        VariantRow::List(vec![VariantValue::from(1i64), VariantValue::from(2i64)]),
        // Row 1: Not a list -> entire row falls back
        VariantRow::Value(VariantValue::from("fallback")),
        // Row 2: Empty list
        VariantRow::List(vec![]),
    ];
    let input = build_variant_array(rows);

    let item_field = Arc::new(Field::new("item", DataType::Int64, true));
    let result = shred_variant(&input, &DataType::LargeListView(item_field)).unwrap();
    assert_eq!(result.len(), 3);

    // Row-level expectations
    let offsets: &[i64] = &[0, 2, 2];
    let sizes: &[Option<i64>] = &[Some(2), None, Some(0)];
    let row_fallbacks: &[Option<Variant<'static, 'static>>] =
        &[None, Some(Variant::from("fallback")), None];

    // Element-level expectations, in flattened element order
    let element_values: &[Option<i64>] = &[Some(1), Some(2)];
    let element_fallbacks: &[Option<Variant<'static, 'static>>] = &[None, None];

    assert_list_structure_and_elements::<Int64Type, i64>(
        &result,
        3,
        offsets,
        sizes,
        row_fallbacks,
        (element_values, element_fallbacks),
    );
}
1350
/// Shredding to a `FixedSizeList` target is expected to fail with a
/// not-yet-implemented error.
#[test]
fn test_array_shredding_as_fixed_size_list() {
    let three_ints = VariantRow::List(vec![
        VariantValue::from(1i64),
        VariantValue::from(2i64),
        VariantValue::from(3i64),
    ]);
    let input = build_variant_array(vec![three_ints]);

    let item_field = Arc::new(Field::new("item", DataType::Int64, true));
    let list_schema = DataType::FixedSizeList(item_field, 2);

    let message = shred_variant(&input, &list_schema)
        .unwrap_err()
        .to_string();
    assert_eq!(
        message,
        "Not yet implemented: Converting unshredded variant arrays to arrow fixed-size lists"
    );
}
1366
/// Shreds variant rows against a `List<List<Int64>>` schema and verifies both the
/// outer list layout and the shredding of the nested lists.
#[test]
fn test_array_shredding_with_array_elements() {
    let rows = vec![
        // Row 0: [[1, 2], [3, 4], []] - clean nested lists
        VariantRow::List(vec![
            VariantValue::List(vec![VariantValue::from(1i64), VariantValue::from(2i64)]),
            VariantValue::List(vec![VariantValue::from(3i64), VariantValue::from(4i64)]),
            VariantValue::List(vec![]),
        ]),
        // Row 1: [[5, "bad", null], "not a list inner", null] - inner fallbacks
        VariantRow::List(vec![
            VariantValue::List(vec![
                VariantValue::from(5i64),
                VariantValue::from("bad"),
                VariantValue::from(Variant::Null),
            ]),
            VariantValue::from("not a list inner"),
            VariantValue::Null,
        ]),
        // Row 2: "not a list" - top-level fallback
        VariantRow::Value(VariantValue::from("not a list")),
        // Row 3: null row
        VariantRow::Null,
    ];
    let input = build_variant_array(rows);

    // Target schema is List<List<Int64>>
    let inner_list_schema = DataType::List(Arc::new(Field::new("item", DataType::Int64, true)));
    let list_schema = DataType::List(Arc::new(Field::new("item", inner_list_schema, true)));
    let result = shred_variant(&input, &list_schema).unwrap();
    assert_eq!(result.len(), 4);

    let outer_list = result
        .typed_value_field()
        .unwrap()
        .as_any()
        .downcast_ref::<ListArray>()
        .unwrap();

    // Outer level: two shredded rows of three elements each, then two fallback rows
    let outer_offsets: &[i32] = &[0, 3, 6, 6, 6];
    let outer_sizes: &[Option<i32>] = &[Some(3), Some(3), None, None];
    let outer_row_fallbacks: &[Option<Variant<'static, 'static>>] = &[
        None,
        None,
        Some(Variant::from("not a list")),
        Some(Variant::Null),
    ];
    assert_list_structure::<i32>(&result, 4, outer_offsets, outer_sizes, outer_row_fallbacks);

    let outer_elements =
        ShreddedVariantFieldArray::try_new(outer_list.values().as_ref()).unwrap();
    let element_count = outer_elements.len();
    assert_eq!(element_count, 6);
    let inner_lists = outer_elements
        .typed_value_field()
        .unwrap()
        .as_any()
        .downcast_ref::<ListArray>()
        .unwrap();
    let inner_row_values = outer_elements.value_field().unwrap();

    // Re-wrap the outer elements as a VariantArray (one empty-metadata entry per
    // element) so the list helper can be reused for the inner level.
    let element_metadata = BinaryViewArray::from_iter_values(std::iter::repeat_n(
        EMPTY_VARIANT_METADATA_BYTES,
        element_count,
    ));
    let elements_as_variants = VariantArray::from_parts(
        element_metadata,
        Some(inner_row_values.clone()),
        Some(Arc::new(inner_lists.clone())),
        None,
    );

    // Inner level: one row per outer element
    let inner_offsets: &[i32] = &[0, 2, 4, 4, 7, 7, 7];
    let inner_sizes: &[Option<i32>] = &[Some(2), Some(2), Some(0), Some(3), None, None];
    let inner_row_fallbacks: &[Option<Variant<'static, 'static>>] = &[
        None,
        None,
        None,
        None,
        Some(Variant::from("not a list inner")),
        Some(Variant::Null),
    ];
    let leaf_values: &[Option<i64>] =
        &[Some(1), Some(2), Some(3), Some(4), Some(5), None, None];
    let leaf_fallbacks: &[Option<Variant<'static, 'static>>] = &[
        None,
        None,
        None,
        None,
        None,
        Some(Variant::from("bad")),
        Some(Variant::Null),
    ];

    assert_list_structure_and_elements::<Int64Type, i32>(
        &elements_as_variants,
        element_count,
        inner_offsets,
        inner_sizes,
        inner_row_fallbacks,
        (leaf_values, leaf_fallbacks),
    );
}
1470
/// Shreds variant rows against a `List<Struct<id: Int64, name: Utf8>>` schema and
/// verifies the list layout plus the per-field shredding of the object elements.
#[test]
fn test_array_shredding_with_object_elements() {
    let rows = vec![
        // Row 0: [{"id": 1, "name": "Alice"}, {"id": null}] fully shreds
        VariantRow::List(vec![
            VariantValue::Object(vec![
                ("id", VariantValue::from(1i64)),
                ("name", VariantValue::from("Alice")),
            ]),
            VariantValue::Object(vec![("id", VariantValue::from(Variant::Null))]),
        ]),
        // Row 1: "not a list" -> fallback
        VariantRow::Value(VariantValue::from("not a list")),
        // Row 2: Null row
        VariantRow::Null,
    ];
    let input = build_variant_array(rows);

    // Target schema is List<Struct<id:int64,name:utf8>>
    let struct_type = DataType::Struct(Fields::from(vec![
        Field::new("id", DataType::Int64, true),
        Field::new("name", DataType::Utf8, true),
    ]));
    let list_schema = DataType::List(Arc::new(Field::new("item", struct_type, true)));
    let result = shred_variant(&input, &list_schema).unwrap();
    assert_eq!(result.len(), 3);

    // Row-level expectations
    let offsets: &[i32] = &[0, 2, 2, 2];
    let sizes: &[Option<i32>] = &[Some(2), None, None];
    let row_fallbacks: &[Option<Variant<'static, 'static>>] =
        &[None, Some(Variant::from("not a list")), Some(Variant::Null)];
    assert_list_structure::<i32>(&result, 3, offsets, sizes, row_fallbacks);

    // Drill down to the struct array that holds the shredded object elements
    let list = result
        .typed_value_field()
        .unwrap()
        .as_any()
        .downcast_ref::<ListArray>()
        .unwrap();
    let elements = ShreddedVariantFieldArray::try_new(list.values().as_ref()).unwrap();
    assert_eq!(elements.len(), 2);
    let objects = elements
        .typed_value_field()
        .unwrap()
        .as_any()
        .downcast_ref::<arrow::array::StructArray>()
        .unwrap();

    // Id field [1, Variant::Null]
    let id_column =
        ShreddedVariantFieldArray::try_new(objects.column_by_name("id").unwrap()).unwrap();
    let id_fallback = id_column.value_field().unwrap();
    let id_typed = id_column
        .typed_value_field()
        .unwrap()
        .as_any()
        .downcast_ref::<Int64Array>()
        .unwrap();
    assert!(id_fallback.is_null(0));
    assert_eq!(id_typed.value(0), 1);
    // null is stored as Variant::Null in values
    assert!(id_fallback.is_valid(1));
    assert_eq!(
        Variant::new(EMPTY_VARIANT_METADATA_BYTES, id_fallback.value(1)),
        Variant::Null
    );
    assert!(id_typed.is_null(1));

    // Name field ["Alice", null]
    let name_column =
        ShreddedVariantFieldArray::try_new(objects.column_by_name("name").unwrap()).unwrap();
    let name_fallback = name_column.value_field().unwrap();
    let name_typed = name_column
        .typed_value_field()
        .unwrap()
        .as_any()
        .downcast_ref::<StringArray>()
        .unwrap();
    assert!(name_fallback.is_null(0));
    assert_eq!(name_typed.value(0), "Alice");
    // No value provided, both value and typed_value are null
    assert!(name_fallback.is_null(1));
    assert!(name_typed.is_null(1));
}
1564
1565    #[test]
1566    fn test_object_shredding_comprehensive() -> Result<()> {
1567        let input = build_variant_array(vec![
1568            // Row 0: Fully shredded object
1569            VariantRow::Object(vec![
1570                ("score", VariantValue::from(95.5f64)),
1571                ("age", VariantValue::from(30i64)),
1572            ]),
1573            // Row 1: Partially shredded object (extra email field)
1574            VariantRow::Object(vec![
1575                ("score", VariantValue::from(87.2f64)),
1576                ("age", VariantValue::from(25i64)),
1577                ("email", VariantValue::from("bob@example.com")),
1578            ]),
1579            // Row 2: Missing field (no score)
1580            VariantRow::Object(vec![("age", VariantValue::from(35i64))]),
1581            // Row 3: Type mismatch (score is string, age is string)
1582            VariantRow::Object(vec![
1583                ("score", VariantValue::from("ninety-five")),
1584                ("age", VariantValue::from("thirty")),
1585            ]),
1586            // Row 4: Non-object
1587            VariantRow::Value(VariantValue::from("not an object")),
1588            // Row 5: Empty object
1589            VariantRow::Object(vec![]),
1590            // Row 6: Null
1591            VariantRow::Null,
1592            // Row 7: Object with only "wrong" fields
1593            VariantRow::Object(vec![("foo", VariantValue::from(10))]),
1594            // Row 8: Object with one "right" and one "wrong" field
1595            VariantRow::Object(vec![
1596                ("score", VariantValue::from(66.67f64)),
1597                ("foo", VariantValue::from(10)),
1598            ]),
1599        ]);
1600
1601        // Create target schema: struct<score: float64, age: int64>
1602        // Both types are supported for shredding
1603        let target_schema = ShreddedSchemaBuilder::default()
1604            .with_path("score", &DataType::Float64)?
1605            .with_path("age", &DataType::Int64)?
1606            .build();
1607
1608        let result = shred_variant(&input, &target_schema).unwrap();
1609
1610        // Verify structure
1611        assert!(result.value_field().is_some());
1612        assert!(result.typed_value_field().is_some());
1613        assert_eq!(result.len(), 9);
1614
1615        let metadata = result.metadata_field();
1616
1617        let value = result.value_field().unwrap();
1618        let typed_value = result
1619            .typed_value_field()
1620            .unwrap()
1621            .as_any()
1622            .downcast_ref::<arrow::array::StructArray>()
1623            .unwrap();
1624
1625        // Extract score and age fields from typed_value struct
1626        let score_field =
1627            ShreddedVariantFieldArray::try_new(typed_value.column_by_name("score").unwrap())
1628                .unwrap();
1629        let age_field =
1630            ShreddedVariantFieldArray::try_new(typed_value.column_by_name("age").unwrap()).unwrap();
1631
1632        let score_value = score_field
1633            .value_field()
1634            .unwrap()
1635            .as_any()
1636            .downcast_ref::<BinaryViewArray>()
1637            .unwrap();
1638        let score_typed_value = score_field
1639            .typed_value_field()
1640            .unwrap()
1641            .as_any()
1642            .downcast_ref::<Float64Array>()
1643            .unwrap();
1644        let age_value = age_field
1645            .value_field()
1646            .unwrap()
1647            .as_any()
1648            .downcast_ref::<BinaryViewArray>()
1649            .unwrap();
1650        let age_typed_value = age_field
1651            .typed_value_field()
1652            .unwrap()
1653            .as_any()
1654            .downcast_ref::<Int64Array>()
1655            .unwrap();
1656
1657        // Set up exhaustive checking of all shredded columns and their nulls/values
1658        struct ShreddedValue<'m, 'v, T> {
1659            value: Option<Variant<'m, 'v>>,
1660            typed_value: Option<T>,
1661        }
1662        struct ShreddedStruct<'m, 'v> {
1663            score: ShreddedValue<'m, 'v, f64>,
1664            age: ShreddedValue<'m, 'v, i64>,
1665        }
1666        fn get_value<'m, 'v>(
1667            i: usize,
1668            metadata: &'m BinaryViewArray,
1669            value: &'v BinaryViewArray,
1670        ) -> Variant<'m, 'v> {
1671            Variant::new(metadata.value(i), value.value(i))
1672        }
1673        let expect = |i, expected_result: Option<ShreddedValue<ShreddedStruct>>| {
1674            match expected_result {
1675                Some(ShreddedValue {
1676                    value: expected_value,
1677                    typed_value: expected_typed_value,
1678                }) => {
1679                    assert!(result.is_valid(i));
1680                    match expected_value {
1681                        Some(expected_value) => {
1682                            assert!(value.is_valid(i));
1683                            assert_eq!(expected_value, get_value(i, metadata, value));
1684                        }
1685                        None => {
1686                            assert!(value.is_null(i));
1687                        }
1688                    }
1689                    match expected_typed_value {
1690                        Some(ShreddedStruct {
1691                            score: expected_score,
1692                            age: expected_age,
1693                        }) => {
1694                            assert!(typed_value.is_valid(i));
1695                            assert!(score_field.is_valid(i)); // non-nullable
1696                            assert!(age_field.is_valid(i)); // non-nullable
1697                            match expected_score.value {
1698                                Some(expected_score_value) => {
1699                                    assert!(score_value.is_valid(i));
1700                                    assert_eq!(
1701                                        expected_score_value,
1702                                        get_value(i, metadata, score_value)
1703                                    );
1704                                }
1705                                None => {
1706                                    assert!(score_value.is_null(i));
1707                                }
1708                            }
1709                            match expected_score.typed_value {
1710                                Some(expected_score) => {
1711                                    assert!(score_typed_value.is_valid(i));
1712                                    assert_eq!(expected_score, score_typed_value.value(i));
1713                                }
1714                                None => {
1715                                    assert!(score_typed_value.is_null(i));
1716                                }
1717                            }
1718                            match expected_age.value {
1719                                Some(expected_age_value) => {
1720                                    assert!(age_value.is_valid(i));
1721                                    assert_eq!(
1722                                        expected_age_value,
1723                                        get_value(i, metadata, age_value)
1724                                    );
1725                                }
1726                                None => {
1727                                    assert!(age_value.is_null(i));
1728                                }
1729                            }
1730                            match expected_age.typed_value {
1731                                Some(expected_age) => {
1732                                    assert!(age_typed_value.is_valid(i));
1733                                    assert_eq!(expected_age, age_typed_value.value(i));
1734                                }
1735                                None => {
1736                                    assert!(age_typed_value.is_null(i));
1737                                }
1738                            }
1739                        }
1740                        None => {
1741                            assert!(typed_value.is_null(i));
1742                        }
1743                    }
1744                }
1745                None => {
1746                    assert!(result.is_null(i));
1747                }
1748            };
1749        };
1750
1751        // Row 0: Fully shredded - both fields shred successfully
1752        expect(
1753            0,
1754            Some(ShreddedValue {
1755                value: None,
1756                typed_value: Some(ShreddedStruct {
1757                    score: ShreddedValue {
1758                        value: None,
1759                        typed_value: Some(95.5),
1760                    },
1761                    age: ShreddedValue {
1762                        value: None,
1763                        typed_value: Some(30),
1764                    },
1765                }),
1766            }),
1767        );
1768
1769        // Row 1: Partially shredded - value contains extra email field
1770        let mut builder = VariantBuilder::new();
1771        builder
1772            .new_object()
1773            .with_field("email", "bob@example.com")
1774            .finish();
1775        let (m, v) = builder.finish();
1776        let expected_value = Variant::new(&m, &v);
1777
1778        expect(
1779            1,
1780            Some(ShreddedValue {
1781                value: Some(expected_value),
1782                typed_value: Some(ShreddedStruct {
1783                    score: ShreddedValue {
1784                        value: None,
1785                        typed_value: Some(87.2),
1786                    },
1787                    age: ShreddedValue {
1788                        value: None,
1789                        typed_value: Some(25),
1790                    },
1791                }),
1792            }),
1793        );
1794
1795        // Row 2: Fully shredded -- missing score field
1796        expect(
1797            2,
1798            Some(ShreddedValue {
1799                value: None,
1800                typed_value: Some(ShreddedStruct {
1801                    score: ShreddedValue {
1802                        value: None,
1803                        typed_value: None,
1804                    },
1805                    age: ShreddedValue {
1806                        value: None,
1807                        typed_value: Some(35),
1808                    },
1809                }),
1810            }),
1811        );
1812
1813        // Row 3: Type mismatches - both score and age are strings
1814        expect(
1815            3,
1816            Some(ShreddedValue {
1817                value: None,
1818                typed_value: Some(ShreddedStruct {
1819                    score: ShreddedValue {
1820                        value: Some(Variant::from("ninety-five")),
1821                        typed_value: None,
1822                    },
1823                    age: ShreddedValue {
1824                        value: Some(Variant::from("thirty")),
1825                        typed_value: None,
1826                    },
1827                }),
1828            }),
1829        );
1830
1831        // Row 4: Non-object - falls back to value field
1832        expect(
1833            4,
1834            Some(ShreddedValue {
1835                value: Some(Variant::from("not an object")),
1836                typed_value: None,
1837            }),
1838        );
1839
1840        // Row 5: Empty object
1841        expect(
1842            5,
1843            Some(ShreddedValue {
1844                value: None,
1845                typed_value: Some(ShreddedStruct {
1846                    score: ShreddedValue {
1847                        value: None,
1848                        typed_value: None,
1849                    },
1850                    age: ShreddedValue {
1851                        value: None,
1852                        typed_value: None,
1853                    },
1854                }),
1855            }),
1856        );
1857
1858        // Row 6: Null
1859        expect(6, None);
1860
1861        // Helper to correctly create a variant object using a row's existing metadata
1862        let object_with_foo_field = |i| {
1863            use parquet_variant::{ParentState, ValueBuilder, VariantMetadata};
1864            let metadata = VariantMetadata::new(metadata.value(i));
1865            let mut metadata_builder = ReadOnlyMetadataBuilder::new(&metadata);
1866            let mut value_builder = ValueBuilder::new();
1867            let state = ParentState::variant(&mut value_builder, &mut metadata_builder);
1868            ObjectBuilder::new(state, false)
1869                .with_field("foo", 10)
1870                .finish();
1871            (metadata, value_builder.into_inner())
1872        };
1873
1874        // Row 7: Object with only a "wrong" field
1875        let (m, v) = object_with_foo_field(7);
1876        expect(
1877            7,
1878            Some(ShreddedValue {
1879                value: Some(Variant::new_with_metadata(m, &v)),
1880                typed_value: Some(ShreddedStruct {
1881                    score: ShreddedValue {
1882                        value: None,
1883                        typed_value: None,
1884                    },
1885                    age: ShreddedValue {
1886                        value: None,
1887                        typed_value: None,
1888                    },
1889                }),
1890            }),
1891        );
1892
1893        // Row 8: Object with one "wrong" and one "right" field
1894        let (m, v) = object_with_foo_field(8);
1895        expect(
1896            8,
1897            Some(ShreddedValue {
1898                value: Some(Variant::new_with_metadata(m, &v)),
1899                typed_value: Some(ShreddedStruct {
1900                    score: ShreddedValue {
1901                        value: None,
1902                        typed_value: Some(66.67),
1903                    },
1904                    age: ShreddedValue {
1905                        value: None,
1906                        typed_value: None,
1907                    },
1908                }),
1909            }),
1910        );
1911        Ok(())
1912    }
1913
1914    #[test]
1915    fn test_object_shredding_with_array_field() {
1916        let input = build_variant_array(vec![
1917            // Row 0: Object with well-typed scores list
1918            VariantRow::Object(vec![(
1919                "scores",
1920                VariantValue::List(vec![VariantValue::from(10i64), VariantValue::from(20i64)]),
1921            )]),
1922            // Row 1: Object whose scores list contains incompatible type
1923            VariantRow::Object(vec![(
1924                "scores",
1925                VariantValue::List(vec![
1926                    VariantValue::from("oops"),
1927                    VariantValue::from(Variant::Null),
1928                ]),
1929            )]),
1930            // Row 2: Object missing the scores field entirely
1931            VariantRow::Object(vec![]),
1932            // Row 3: Non-object fallback
1933            VariantRow::Value(VariantValue::from("not an object")),
1934            // Row 4: Top-level Null
1935            VariantRow::Null,
1936        ]);
1937        let list_field = Arc::new(Field::new("item", DataType::Int64, true));
1938        let inner_list_schema = DataType::List(list_field);
1939        let schema = DataType::Struct(Fields::from(vec![Field::new(
1940            "scores",
1941            inner_list_schema.clone(),
1942            true,
1943        )]));
1944
1945        let result = shred_variant(&input, &schema).unwrap();
1946        assert_eq!(result.len(), 5);
1947
1948        // Access base value/typed_value columns
1949        let value_field = result.value_field().unwrap();
1950        let typed_struct = result
1951            .typed_value_field()
1952            .unwrap()
1953            .as_any()
1954            .downcast_ref::<arrow::array::StructArray>()
1955            .unwrap();
1956
1957        // Validate base value fallbacks for non-object rows
1958        assert!(value_field.is_null(0));
1959        assert!(value_field.is_null(1));
1960        assert!(value_field.is_null(2));
1961        assert!(value_field.is_valid(3));
1962        assert_eq!(
1963            Variant::new(result.metadata_field().value(3), value_field.value(3)),
1964            Variant::from("not an object")
1965        );
1966        assert!(value_field.is_null(4));
1967
1968        // Typed struct should only be null for the fallback row
1969        assert!(typed_struct.is_valid(0));
1970        assert!(typed_struct.is_valid(1));
1971        assert!(typed_struct.is_valid(2));
1972        assert!(typed_struct.is_null(3));
1973        assert!(typed_struct.is_null(4));
1974
1975        // Drill into the scores field on the typed struct
1976        let scores_field =
1977            ShreddedVariantFieldArray::try_new(typed_struct.column_by_name("scores").unwrap())
1978                .unwrap();
1979        assert_list_structure_and_elements::<Int64Type, i32>(
1980            &VariantArray::from_parts(
1981                BinaryViewArray::from_iter_values(std::iter::repeat_n(
1982                    EMPTY_VARIANT_METADATA_BYTES,
1983                    scores_field.len(),
1984                )),
1985                Some(scores_field.value_field().unwrap().clone()),
1986                Some(scores_field.typed_value_field().unwrap().clone()),
1987                None,
1988            ),
1989            scores_field.len(),
1990            &[0i32, 2, 4, 4, 4, 4],
1991            &[Some(2), Some(2), None, None, None],
1992            &[
1993                None,
1994                None,
1995                Some(Variant::Null),
1996                Some(Variant::Null),
1997                Some(Variant::Null),
1998            ],
1999            (
2000                &[Some(10), Some(20), None, None],
2001                &[None, None, Some(Variant::from("oops")), Some(Variant::Null)],
2002            ),
2003        );
2004    }
2005
2006    #[test]
2007    fn test_object_different_schemas() -> Result<()> {
2008        // Create object with multiple fields
2009        let input = build_variant_array(vec![VariantRow::Object(vec![
2010            ("id", VariantValue::from(123i32)),
2011            ("age", VariantValue::from(25i64)),
2012            ("score", VariantValue::from(95.5f64)),
2013        ])]);
2014
2015        // Test with schema containing only id field
2016        let schema1 = ShreddedSchemaBuilder::default()
2017            .with_path("id", &DataType::Int32)?
2018            .build();
2019        let result1 = shred_variant(&input, &schema1).unwrap();
2020        let value_field1 = result1.value_field().unwrap();
2021        assert!(!value_field1.is_null(0)); // should contain {"age": 25, "score": 95.5}
2022
2023        // Test with schema containing id and age fields
2024        let schema2 = ShreddedSchemaBuilder::default()
2025            .with_path("id", &DataType::Int32)?
2026            .with_path("age", &DataType::Int64)?
2027            .build();
2028        let result2 = shred_variant(&input, &schema2).unwrap();
2029        let value_field2 = result2.value_field().unwrap();
2030        assert!(!value_field2.is_null(0)); // should contain {"score": 95.5}
2031
2032        // Test with schema containing all fields
2033        let schema3 = ShreddedSchemaBuilder::default()
2034            .with_path("id", &DataType::Int32)?
2035            .with_path("age", &DataType::Int64)?
2036            .with_path("score", &DataType::Float64)?
2037            .build();
2038        let result3 = shred_variant(&input, &schema3).unwrap();
2039        let value_field3 = result3.value_field().unwrap();
2040        assert!(value_field3.is_null(0)); // fully shredded, no remaining fields
2041
2042        Ok(())
2043    }
2044
2045    #[test]
2046    fn test_uuid_shredding_in_objects() -> Result<()> {
2047        let mock_uuid_1 = Uuid::new_v4();
2048        let mock_uuid_2 = Uuid::new_v4();
2049        let mock_uuid_3 = Uuid::new_v4();
2050
2051        let input = build_variant_array(vec![
2052            // Row 0: Fully shredded object with both UUID fields
2053            VariantRow::Object(vec![
2054                ("id", VariantValue::from(mock_uuid_1)),
2055                ("session_id", VariantValue::from(mock_uuid_2)),
2056            ]),
2057            // Row 1: Partially shredded object - UUID fields plus extra field
2058            VariantRow::Object(vec![
2059                ("id", VariantValue::from(mock_uuid_2)),
2060                ("session_id", VariantValue::from(mock_uuid_3)),
2061                ("name", VariantValue::from("test_user")),
2062            ]),
2063            // Row 2: Missing UUID field (no session_id)
2064            VariantRow::Object(vec![("id", VariantValue::from(mock_uuid_1))]),
2065            // Row 3: Type mismatch - id is UUID but session_id is a string
2066            VariantRow::Object(vec![
2067                ("id", VariantValue::from(mock_uuid_3)),
2068                ("session_id", VariantValue::from("not-a-uuid")),
2069            ]),
2070            // Row 4: Object with non-UUID value in id field
2071            VariantRow::Object(vec![
2072                ("id", VariantValue::from(12345i64)),
2073                ("session_id", VariantValue::from(mock_uuid_1)),
2074            ]),
2075            // Row 5: Null
2076            VariantRow::Null,
2077        ]);
2078
2079        let target_schema = ShreddedSchemaBuilder::default()
2080            .with_path("id", DataType::FixedSizeBinary(16))?
2081            .with_path("session_id", DataType::FixedSizeBinary(16))?
2082            .build();
2083
2084        let result = shred_variant(&input, &target_schema).unwrap();
2085
2086        assert!(result.value_field().is_some());
2087        assert!(result.typed_value_field().is_some());
2088        assert_eq!(result.len(), 6);
2089
2090        let metadata = result.metadata_field();
2091        let value = result.value_field().unwrap();
2092        let typed_value = result
2093            .typed_value_field()
2094            .unwrap()
2095            .as_any()
2096            .downcast_ref::<arrow::array::StructArray>()
2097            .unwrap();
2098
2099        // Extract id and session_id fields from typed_value struct
2100        let id_field =
2101            ShreddedVariantFieldArray::try_new(typed_value.column_by_name("id").unwrap()).unwrap();
2102        let session_id_field =
2103            ShreddedVariantFieldArray::try_new(typed_value.column_by_name("session_id").unwrap())
2104                .unwrap();
2105
2106        let id_value = id_field
2107            .value_field()
2108            .unwrap()
2109            .as_any()
2110            .downcast_ref::<BinaryViewArray>()
2111            .unwrap();
2112        let id_typed_value = id_field
2113            .typed_value_field()
2114            .unwrap()
2115            .as_any()
2116            .downcast_ref::<FixedSizeBinaryArray>()
2117            .unwrap();
2118        let session_id_value = session_id_field
2119            .value_field()
2120            .unwrap()
2121            .as_any()
2122            .downcast_ref::<BinaryViewArray>()
2123            .unwrap();
2124        let session_id_typed_value = session_id_field
2125            .typed_value_field()
2126            .unwrap()
2127            .as_any()
2128            .downcast_ref::<FixedSizeBinaryArray>()
2129            .unwrap();
2130
2131        // Row 0: Fully shredded - both UUID fields shred successfully
2132        assert!(result.is_valid(0));
2133
2134        assert!(value.is_null(0)); // fully shredded, no remaining fields
2135        assert!(id_value.is_null(0));
2136        assert!(session_id_value.is_null(0));
2137
2138        assert!(typed_value.is_valid(0));
2139        assert!(id_typed_value.is_valid(0));
2140        assert!(session_id_typed_value.is_valid(0));
2141
2142        assert_eq!(id_typed_value.value(0), mock_uuid_1.as_bytes());
2143        assert_eq!(session_id_typed_value.value(0), mock_uuid_2.as_bytes());
2144
2145        // Row 1: Partially shredded - value contains extra name field
2146        assert!(result.is_valid(1));
2147
2148        assert!(value.is_valid(1)); // contains unshredded "name" field
2149        assert!(typed_value.is_valid(1));
2150
2151        assert!(id_value.is_null(1));
2152        assert!(id_typed_value.is_valid(1));
2153        assert_eq!(id_typed_value.value(1), mock_uuid_2.as_bytes());
2154
2155        assert!(session_id_value.is_null(1));
2156        assert!(session_id_typed_value.is_valid(1));
2157        assert_eq!(session_id_typed_value.value(1), mock_uuid_3.as_bytes());
2158
2159        // Verify the value field contains the name field
2160        let row_1_variant = Variant::new(metadata.value(1), value.value(1));
2161        let Variant::Object(obj) = row_1_variant else {
2162            panic!("Expected object");
2163        };
2164
2165        assert_eq!(obj.get("name"), Some(Variant::from("test_user")));
2166
2167        // Row 2: Missing session_id field
2168        assert!(result.is_valid(2));
2169
2170        assert!(value.is_null(2)); // fully shredded, no extra fields
2171        assert!(typed_value.is_valid(2));
2172
2173        assert!(id_value.is_null(2));
2174        assert!(id_typed_value.is_valid(2));
2175        assert_eq!(id_typed_value.value(2), mock_uuid_1.as_bytes());
2176
2177        assert!(session_id_value.is_null(2));
2178        assert!(session_id_typed_value.is_null(2)); // missing field
2179
2180        // Row 3: Type mismatch - session_id is a string, not UUID
2181        assert!(result.is_valid(3));
2182
2183        assert!(value.is_null(3)); // no extra fields
2184        assert!(typed_value.is_valid(3));
2185
2186        assert!(id_value.is_null(3));
2187        assert!(id_typed_value.is_valid(3));
2188        assert_eq!(id_typed_value.value(3), mock_uuid_3.as_bytes());
2189
2190        assert!(session_id_value.is_valid(3)); // type mismatch, stored in value
2191        assert!(session_id_typed_value.is_null(3));
2192        let session_id_variant = Variant::new(metadata.value(3), session_id_value.value(3));
2193        assert_eq!(session_id_variant, Variant::from("not-a-uuid"));
2194
2195        // Row 4: Type mismatch - id is int64, not UUID
2196        assert!(result.is_valid(4));
2197
2198        assert!(value.is_null(4)); // no extra fields
2199        assert!(typed_value.is_valid(4));
2200
2201        assert!(id_value.is_valid(4)); // type mismatch, stored in value
2202        assert!(id_typed_value.is_null(4));
2203        let id_variant = Variant::new(metadata.value(4), id_value.value(4));
2204        assert_eq!(id_variant, Variant::from(12345i64));
2205
2206        assert!(session_id_value.is_null(4));
2207        assert!(session_id_typed_value.is_valid(4));
2208        assert_eq!(session_id_typed_value.value(4), mock_uuid_1.as_bytes());
2209
2210        // Row 5: Null
2211        assert!(result.is_null(5));
2212
2213        Ok(())
2214    }
2215
2216    #[test]
2217    fn test_spec_compliance() {
2218        let input = VariantArray::from_iter(vec![Variant::from(42i64), Variant::from("hello")]);
2219
2220        let result = shred_variant(&input, &DataType::Int64).unwrap();
2221
2222        // Test field access by name (not position)
2223        let inner_struct = result.inner();
2224        assert!(inner_struct.column_by_name("metadata").is_some());
2225        assert!(inner_struct.column_by_name("value").is_some());
2226        assert!(inner_struct.column_by_name("typed_value").is_some());
2227
2228        // Test metadata preservation
2229        assert_eq!(result.metadata_field().len(), input.metadata_field().len());
2230        // The metadata should be the same reference (cheap clone)
2231        // Note: BinaryViewArray doesn't have a .values() method, so we compare the arrays directly
2232        assert_eq!(result.metadata_field().len(), input.metadata_field().len());
2233
2234        // Test output structure correctness
2235        assert_eq!(result.len(), input.len());
2236        assert!(result.value_field().is_some());
2237        assert!(result.typed_value_field().is_some());
2238
2239        // For primitive shredding, verify that value and typed_value are never both non-null
2240        // (This rule applies to primitives; for objects, both can be non-null for partial shredding)
2241        let value_field = result.value_field().unwrap();
2242        let typed_value_field = result
2243            .typed_value_field()
2244            .unwrap()
2245            .as_any()
2246            .downcast_ref::<Int64Array>()
2247            .unwrap();
2248
2249        for i in 0..result.len() {
2250            if !result.is_null(i) {
2251                let value_is_null = value_field.is_null(i);
2252                let typed_value_is_null = typed_value_field.is_null(i);
2253                // For primitive shredding, at least one should be null
2254                assert!(
2255                    value_is_null || typed_value_is_null,
2256                    "Row {}: both value and typed_value are non-null for primitive shredding",
2257                    i
2258                );
2259            }
2260        }
2261    }
2262
2263    #[test]
2264    fn test_variant_schema_builder_simple() -> Result<()> {
2265        let shredding_type = ShreddedSchemaBuilder::default()
2266            .with_path("a", &DataType::Int64)?
2267            .with_path("b", &DataType::Float64)?
2268            .build();
2269
2270        assert_eq!(
2271            shredding_type,
2272            DataType::Struct(Fields::from(vec![
2273                Field::new("a", DataType::Int64, true),
2274                Field::new("b", DataType::Float64, true),
2275            ]))
2276        );
2277
2278        Ok(())
2279    }
2280
2281    #[test]
2282    fn test_variant_schema_builder_nested() -> Result<()> {
2283        let shredding_type = ShreddedSchemaBuilder::default()
2284            .with_path("a", &DataType::Int64)?
2285            .with_path("b.c", &DataType::Utf8)?
2286            .with_path("b.d", &DataType::Float64)?
2287            .build();
2288
2289        assert_eq!(
2290            shredding_type,
2291            DataType::Struct(Fields::from(vec![
2292                Field::new("a", DataType::Int64, true),
2293                Field::new(
2294                    "b",
2295                    DataType::Struct(Fields::from(vec![
2296                        Field::new("c", DataType::Utf8, true),
2297                        Field::new("d", DataType::Float64, true),
2298                    ])),
2299                    true
2300                ),
2301            ]))
2302        );
2303
2304        Ok(())
2305    }
2306
2307    #[test]
2308    fn test_variant_schema_builder_with_path_variant_path_arg() -> Result<()> {
2309        let path = VariantPath::from_iter([VariantPathElement::from("a.b")]);
2310        let shredding_type = ShreddedSchemaBuilder::default()
2311            .with_path(path, &DataType::Int64)?
2312            .build();
2313
2314        match shredding_type {
2315            DataType::Struct(fields) => {
2316                assert_eq!(fields.len(), 1);
2317                assert_eq!(fields[0].name(), "a.b");
2318                assert_eq!(fields[0].data_type(), &DataType::Int64);
2319            }
2320            _ => panic!("expected struct data type"),
2321        }
2322
2323        Ok(())
2324    }
2325
2326    #[test]
2327    fn test_variant_schema_builder_custom_nullability() -> Result<()> {
2328        let shredding_type = ShreddedSchemaBuilder::default()
2329            .with_path(
2330                "foo",
2331                Arc::new(Field::new("should_be_renamed", DataType::Utf8, false)),
2332            )?
2333            .with_path("bar", (&DataType::Int64, false))?
2334            .build();
2335
2336        let DataType::Struct(fields) = shredding_type else {
2337            panic!("expected struct data type");
2338        };
2339
2340        let foo = fields.iter().find(|f| f.name() == "foo").unwrap();
2341        assert_eq!(foo.data_type(), &DataType::Utf8);
2342        assert!(!foo.is_nullable());
2343
2344        let bar = fields.iter().find(|f| f.name() == "bar").unwrap();
2345        assert_eq!(bar.data_type(), &DataType::Int64);
2346        assert!(!bar.is_nullable());
2347
2348        Ok(())
2349    }
2350
2351    #[test]
2352    fn test_variant_schema_builder_with_shred_variant() -> Result<()> {
2353        let input = build_variant_array(vec![
2354            VariantRow::Object(vec![
2355                ("time", VariantValue::from(1234567890i64)),
2356                ("hostname", VariantValue::from("server1")),
2357                ("extra", VariantValue::from(42)),
2358            ]),
2359            VariantRow::Object(vec![
2360                ("time", VariantValue::from(9876543210i64)),
2361                ("hostname", VariantValue::from("server2")),
2362            ]),
2363            VariantRow::Null,
2364        ]);
2365
2366        let shredding_type = ShreddedSchemaBuilder::default()
2367            .with_path("time", &DataType::Int64)?
2368            .with_path("hostname", &DataType::Utf8)?
2369            .build();
2370
2371        let result = shred_variant(&input, &shredding_type).unwrap();
2372
2373        assert_eq!(
2374            result.data_type(),
2375            &DataType::Struct(Fields::from(vec![
2376                Field::new("metadata", DataType::BinaryView, false),
2377                Field::new("value", DataType::BinaryView, true),
2378                Field::new(
2379                    "typed_value",
2380                    DataType::Struct(Fields::from(vec![
2381                        Field::new(
2382                            "hostname",
2383                            DataType::Struct(Fields::from(vec![
2384                                Field::new("value", DataType::BinaryView, true),
2385                                Field::new("typed_value", DataType::Utf8, true),
2386                            ])),
2387                            false,
2388                        ),
2389                        Field::new(
2390                            "time",
2391                            DataType::Struct(Fields::from(vec![
2392                                Field::new("value", DataType::BinaryView, true),
2393                                Field::new("typed_value", DataType::Int64, true),
2394                            ])),
2395                            false,
2396                        ),
2397                    ])),
2398                    true,
2399                ),
2400            ]))
2401        );
2402
2403        assert_eq!(result.len(), 3);
2404        assert!(result.typed_value_field().is_some());
2405
2406        let typed_value = result
2407            .typed_value_field()
2408            .unwrap()
2409            .as_any()
2410            .downcast_ref::<arrow::array::StructArray>()
2411            .unwrap();
2412
2413        let time_field =
2414            ShreddedVariantFieldArray::try_new(typed_value.column_by_name("time").unwrap())
2415                .unwrap();
2416        let hostname_field =
2417            ShreddedVariantFieldArray::try_new(typed_value.column_by_name("hostname").unwrap())
2418                .unwrap();
2419
2420        let time_typed = time_field
2421            .typed_value_field()
2422            .unwrap()
2423            .as_any()
2424            .downcast_ref::<Int64Array>()
2425            .unwrap();
2426        let hostname_typed = hostname_field
2427            .typed_value_field()
2428            .unwrap()
2429            .as_any()
2430            .downcast_ref::<arrow::array::StringArray>()
2431            .unwrap();
2432
2433        // Row 0
2434        assert!(!result.is_null(0));
2435        assert_eq!(time_typed.value(0), 1234567890);
2436        assert_eq!(hostname_typed.value(0), "server1");
2437
2438        // Row 1
2439        assert!(!result.is_null(1));
2440        assert_eq!(time_typed.value(1), 9876543210);
2441        assert_eq!(hostname_typed.value(1), "server2");
2442
2443        // Row 2
2444        assert!(result.is_null(2));
2445
2446        Ok(())
2447    }
2448
2449    #[test]
2450    fn test_variant_schema_builder_conflicting_path() -> Result<()> {
2451        let shredding_type = ShreddedSchemaBuilder::default()
2452            .with_path("a", &DataType::Int64)?
2453            .with_path("a", &DataType::Float64)?
2454            .build();
2455
2456        assert_eq!(
2457            shredding_type,
2458            DataType::Struct(Fields::from(
2459                vec![Field::new("a", DataType::Float64, true),]
2460            ))
2461        );
2462
2463        Ok(())
2464    }
2465
2466    #[test]
2467    fn test_variant_schema_builder_root_path() -> Result<()> {
2468        let path = VariantPath::new(vec![]);
2469        let shredding_type = ShreddedSchemaBuilder::default()
2470            .with_path(path, &DataType::Int64)?
2471            .build();
2472
2473        assert_eq!(shredding_type, DataType::Int64);
2474
2475        Ok(())
2476    }
2477
2478    #[test]
2479    fn test_variant_schema_builder_empty_path() -> Result<()> {
2480        let shredding_type = ShreddedSchemaBuilder::default()
2481            .with_path("", &DataType::Int64)?
2482            .build();
2483
2484        assert_eq!(shredding_type, DataType::Int64);
2485        Ok(())
2486    }
2487
2488    #[test]
2489    fn test_variant_schema_builder_default() {
2490        let shredding_type = ShreddedSchemaBuilder::default().build();
2491        assert_eq!(shredding_type, DataType::Null);
2492    }
2493}