parquet_variant_compute/
shred_variant.rs

1// Licensed to the Apache Software Foundation (ASF) under one
2// or more contributor license agreements.  See the NOTICE file
3// distributed with this work for additional information
4// regarding copyright ownership.  The ASF licenses this file
5// to you under the Apache License, Version 2.0 (the
6// "License"); you may not use this file except in compliance
7// with the License.  You may obtain a copy of the License at
8//
9//   http://www.apache.org/licenses/LICENSE-2.0
10//
11// Unless required by applicable law or agreed to in writing,
12// software distributed under the License is distributed on an
13// "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
14// KIND, either express or implied.  See the License for the
15// specific language governing permissions and limitations
16// under the License.
17
18//! Module for shredding VariantArray with a given schema.
19
20use crate::variant_array::{ShreddedVariantFieldArray, StructArrayBuilder};
21use crate::variant_to_arrow::{
22    ArrayVariantToArrowRowBuilder, PrimitiveVariantToArrowRowBuilder,
23    make_primitive_variant_to_arrow_row_builder,
24};
25use crate::{VariantArray, VariantValueArrayBuilder};
26use arrow::array::{ArrayRef, BinaryViewArray, NullBufferBuilder};
27use arrow::buffer::NullBuffer;
28use arrow::compute::CastOptions;
29use arrow::datatypes::{DataType, Field, FieldRef, Fields, TimeUnit};
30use arrow::error::{ArrowError, Result};
31use indexmap::IndexMap;
32use parquet_variant::{Variant, VariantBuilderExt, VariantPath, VariantPathElement};
33use std::collections::BTreeMap;
34use std::sync::Arc;
35
36/// Shreds the input binary variant using a target shredding schema derived from the requested data type.
37///
38/// For example, requesting `DataType::Int64` would produce an output variant array with the schema:
39///
40/// ```text
41/// {
42///    metadata: BINARY,
43///    value: BINARY,
44///    typed_value: LONG,
45/// }
46/// ```
47///
48/// Similarly, requesting `DataType::Struct` with two integer fields `a` and `b` would produce an
49/// output variant array with the schema:
50///
51/// ```text
52/// {
53///   metadata: BINARY,
54///   value: BINARY,
55///   typed_value: {
56///     a: {
57///       value: BINARY,
58///       typed_value: INT,
59///     },
60///     b: {
61///       value: BINARY,
62///       typed_value: INT,
63///     },
64///   }
65/// }
66/// ```
67///
68/// See [`ShreddedSchemaBuilder`] for a convenient way to build the `as_type`
69/// value passed to this function.
70pub fn shred_variant(array: &VariantArray, as_type: &DataType) -> Result<VariantArray> {
71    if array.typed_value_field().is_some() {
72        return Err(ArrowError::InvalidArgumentError(
73            "Input is already shredded".to_string(),
74        ));
75    }
76
77    if array.value_field().is_none() {
78        // all-null case -- nothing to do.
79        return Ok(array.clone());
80    };
81
82    let cast_options = CastOptions::default();
83    let mut builder = make_variant_to_shredded_variant_arrow_row_builder(
84        as_type,
85        &cast_options,
86        array.len(),
87        true,
88    )?;
89    for i in 0..array.len() {
90        if array.is_null(i) {
91            builder.append_null()?;
92        } else {
93            builder.append_value(array.value(i))?;
94        }
95    }
96    let (value, typed_value, nulls) = builder.finish()?;
97    Ok(VariantArray::from_parts(
98        array.metadata_field().clone(),
99        Some(value),
100        Some(typed_value),
101        nulls,
102    ))
103}
104
105pub(crate) fn make_variant_to_shredded_variant_arrow_row_builder<'a>(
106    data_type: &'a DataType,
107    cast_options: &'a CastOptions,
108    capacity: usize,
109    top_level: bool,
110) -> Result<VariantToShreddedVariantRowBuilder<'a>> {
111    let builder = match data_type {
112        DataType::Struct(fields) => {
113            let typed_value_builder = VariantToShreddedObjectVariantRowBuilder::try_new(
114                fields,
115                cast_options,
116                capacity,
117                top_level,
118            )?;
119            VariantToShreddedVariantRowBuilder::Object(typed_value_builder)
120        }
121        DataType::List(_)
122        | DataType::LargeList(_)
123        | DataType::ListView(_)
124        | DataType::LargeListView(_)
125        | DataType::FixedSizeList(..) => {
126            let typed_value_builder = VariantToShreddedArrayVariantRowBuilder::try_new(
127                data_type,
128                cast_options,
129                capacity,
130            )?;
131            VariantToShreddedVariantRowBuilder::Array(typed_value_builder)
132        }
133        // Supported shredded primitive types, see Variant shredding spec:
134        // https://github.com/apache/parquet-format/blob/master/VariantShredding.md#shredded-value-types
135        DataType::Boolean
136        | DataType::Int8
137        | DataType::Int16
138        | DataType::Int32
139        | DataType::Int64
140        | DataType::Float32
141        | DataType::Float64
142        | DataType::Decimal32(..)
143        | DataType::Decimal64(..)
144        | DataType::Decimal128(..)
145        | DataType::Date32
146        | DataType::Time64(TimeUnit::Microsecond)
147        | DataType::Timestamp(TimeUnit::Microsecond | TimeUnit::Nanosecond, _)
148        | DataType::Binary
149        | DataType::BinaryView
150        | DataType::Utf8
151        | DataType::Utf8View
152        | DataType::FixedSizeBinary(16) // UUID
153        => {
154            let builder =
155                make_primitive_variant_to_arrow_row_builder(data_type, cast_options, capacity)?;
156            let typed_value_builder =
157                VariantToShreddedPrimitiveVariantRowBuilder::new(builder, capacity, top_level);
158            VariantToShreddedVariantRowBuilder::Primitive(typed_value_builder)
159        }
160        DataType::FixedSizeBinary(_) => {
161            return Err(ArrowError::InvalidArgumentError(format!("{data_type} is not a valid variant shredding type. Only FixedSizeBinary(16) for UUID is supported.")))
162        }
163        _ => {
164            return Err(ArrowError::InvalidArgumentError(format!("{data_type} is not a valid variant shredding type")))
165        }
166    };
167    Ok(builder)
168}
169
/// Row builder that shreds variant values, one row at a time, for a given target type.
///
/// Each variant wraps the concrete shredder chosen by
/// `make_variant_to_shredded_variant_arrow_row_builder` for the requested data type.
pub(crate) enum VariantToShreddedVariantRowBuilder<'a> {
    /// Shredder used for the supported primitive target types.
    Primitive(VariantToShreddedPrimitiveVariantRowBuilder<'a>),
    /// Shredder used for list-like target types.
    Array(VariantToShreddedArrayVariantRowBuilder<'a>),
    /// Shredder used for struct target types.
    Object(VariantToShreddedObjectVariantRowBuilder<'a>),
}
175
176impl<'a> VariantToShreddedVariantRowBuilder<'a> {
177    pub fn append_null(&mut self) -> Result<()> {
178        use VariantToShreddedVariantRowBuilder::*;
179        match self {
180            Primitive(b) => b.append_null(),
181            Array(b) => b.append_null(),
182            Object(b) => b.append_null(),
183        }
184    }
185
186    pub fn append_value(&mut self, value: Variant<'_, '_>) -> Result<bool> {
187        use VariantToShreddedVariantRowBuilder::*;
188        match self {
189            Primitive(b) => b.append_value(value),
190            Array(b) => b.append_value(value),
191            Object(b) => b.append_value(value),
192        }
193    }
194
195    pub fn finish(self) -> Result<(BinaryViewArray, ArrayRef, Option<NullBuffer>)> {
196        use VariantToShreddedVariantRowBuilder::*;
197        match self {
198            Primitive(b) => b.finish(),
199            Array(b) => b.finish(),
200            Object(b) => b.finish(),
201        }
202    }
203}
204
/// Row builder that shreds variant values into a primitive `typed_value` column.
///
/// Appending a null row appends a null to both the `value` and the `typed_value` builder; the
/// row's own validity bit is cleared only when `top_level` is set.
pub(crate) struct VariantToShreddedPrimitiveVariantRowBuilder<'a> {
    /// Receives the variant whenever the typed builder reports it did not take the value.
    value_builder: VariantValueArrayBuilder,
    /// Builder for the strongly typed column.
    typed_value_builder: PrimitiveVariantToArrowRowBuilder<'a>,
    /// Row validity, returned as the third element by `finish`.
    nulls: NullBufferBuilder,
    /// Whether a null row is recorded as null (`true`) or as valid (`false`).
    top_level: bool,
}
212
213impl<'a> VariantToShreddedPrimitiveVariantRowBuilder<'a> {
214    pub(crate) fn new(
215        typed_value_builder: PrimitiveVariantToArrowRowBuilder<'a>,
216        capacity: usize,
217        top_level: bool,
218    ) -> Self {
219        Self {
220            value_builder: VariantValueArrayBuilder::new(capacity),
221            typed_value_builder,
222            nulls: NullBufferBuilder::new(capacity),
223            top_level,
224        }
225    }
226
227    fn append_null(&mut self) -> Result<()> {
228        // Only the top-level struct that represents the variant can be nullable; object fields and
229        // array elements are non-nullable.
230        self.nulls.append(!self.top_level);
231        self.value_builder.append_null();
232        self.typed_value_builder.append_null()
233    }
234
235    fn append_value(&mut self, value: Variant<'_, '_>) -> Result<bool> {
236        self.nulls.append_non_null();
237        if self.typed_value_builder.append_value(&value)? {
238            self.value_builder.append_null();
239        } else {
240            self.value_builder.append_value(value);
241        }
242        Ok(true)
243    }
244
245    fn finish(mut self) -> Result<(BinaryViewArray, ArrayRef, Option<NullBuffer>)> {
246        Ok((
247            self.value_builder.build()?,
248            self.typed_value_builder.finish()?,
249            self.nulls.finish(),
250        ))
251    }
252}
253
/// Row builder that shreds variant values into a list-like `typed_value` column.
///
/// Unlike the primitive and object shredders it keeps no row-level null buffer; `finish`
/// returns `None` for the nulls.
pub(crate) struct VariantToShreddedArrayVariantRowBuilder<'a> {
    /// Receives every variant that is not a list (and `Variant::Null` for null rows).
    value_builder: VariantValueArrayBuilder,
    /// Builder for the list-like typed column.
    typed_value_builder: ArrayVariantToArrowRowBuilder<'a>,
}
258
259impl<'a> VariantToShreddedArrayVariantRowBuilder<'a> {
260    fn try_new(
261        data_type: &'a DataType,
262        cast_options: &'a CastOptions,
263        capacity: usize,
264    ) -> Result<Self> {
265        Ok(Self {
266            value_builder: VariantValueArrayBuilder::new(capacity),
267            typed_value_builder: ArrayVariantToArrowRowBuilder::try_new(
268                data_type,
269                cast_options,
270                capacity,
271            )?,
272        })
273    }
274
275    fn append_null(&mut self) -> Result<()> {
276        self.value_builder.append_value(Variant::Null);
277        self.typed_value_builder.append_null();
278        Ok(())
279    }
280
281    fn append_value(&mut self, variant: Variant<'_, '_>) -> Result<bool> {
282        // If the variant is not an array, typed_value must be null.
283        // If the variant is an array, value must be null.
284        match variant {
285            Variant::List(list) => {
286                self.value_builder.append_null();
287                self.typed_value_builder.append_value(list)?;
288                Ok(true)
289            }
290            other => {
291                self.value_builder.append_value(other);
292                self.typed_value_builder.append_null();
293                Ok(false)
294            }
295        }
296    }
297
298    fn finish(self) -> Result<(BinaryViewArray, ArrayRef, Option<NullBuffer>)> {
299        Ok((
300            self.value_builder.build()?,
301            self.typed_value_builder.finish()?,
302            // All elements of an array must be present (not missing) because
303            // the array Variant encoding does not allow missing elements
304            None,
305        ))
306    }
307}
308
/// Row builder that shreds variant objects into a struct `typed_value` column.
///
/// Object fields that have a matching entry in `typed_value_builders` are routed to that
/// nested shredder; all remaining fields are kept in the `value` column.
pub(crate) struct VariantToShreddedObjectVariantRowBuilder<'a> {
    /// Receives non-object variants and the unshredded remainder of partially shredded objects.
    value_builder: VariantValueArrayBuilder,
    /// One nested shredder per struct field, keyed by field name in schema order.
    typed_value_builders: IndexMap<&'a str, VariantToShreddedVariantRowBuilder<'a>>,
    /// Validity of the `typed_value` struct itself (valid only for rows that were objects).
    typed_value_nulls: NullBufferBuilder,
    /// Row validity, returned as the third element by `finish`.
    nulls: NullBufferBuilder,
    /// Whether a null row is recorded as null (`true`) or as valid (`false`).
    top_level: bool,
}
316
317impl<'a> VariantToShreddedObjectVariantRowBuilder<'a> {
318    fn try_new(
319        fields: &'a Fields,
320        cast_options: &'a CastOptions,
321        capacity: usize,
322        top_level: bool,
323    ) -> Result<Self> {
324        let typed_value_builders = fields.iter().map(|field| {
325            let builder = make_variant_to_shredded_variant_arrow_row_builder(
326                field.data_type(),
327                cast_options,
328                capacity,
329                false,
330            )?;
331            Ok((field.name().as_str(), builder))
332        });
333        Ok(Self {
334            value_builder: VariantValueArrayBuilder::new(capacity),
335            typed_value_builders: typed_value_builders.collect::<Result<_>>()?,
336            typed_value_nulls: NullBufferBuilder::new(capacity),
337            nulls: NullBufferBuilder::new(capacity),
338            top_level,
339        })
340    }
341
342    fn append_null(&mut self) -> Result<()> {
343        // Only the top-level struct that represents the variant can be nullable; object fields and
344        // array elements are non-nullable.
345        self.nulls.append(!self.top_level);
346        self.value_builder.append_null();
347        self.typed_value_nulls.append_null();
348        for (_, typed_value_builder) in &mut self.typed_value_builders {
349            typed_value_builder.append_null()?;
350        }
351        Ok(())
352    }
353
354    fn append_value(&mut self, value: Variant<'_, '_>) -> Result<bool> {
355        let Variant::Object(ref obj) = value else {
356            // Not an object => fall back
357            self.nulls.append_non_null();
358            self.value_builder.append_value(value);
359            self.typed_value_nulls.append_null();
360            for (_, typed_value_builder) in &mut self.typed_value_builders {
361                typed_value_builder.append_null()?;
362            }
363            return Ok(false);
364        };
365
366        // Route the object's fields by name as either shredded or unshredded
367        let mut builder = self.value_builder.builder_ext(value.metadata());
368        let mut object_builder = builder.try_new_object()?;
369        let mut seen = std::collections::HashSet::new();
370        let mut partially_shredded = false;
371        for (field_name, value) in obj.iter() {
372            match self.typed_value_builders.get_mut(field_name) {
373                Some(typed_value_builder) => {
374                    typed_value_builder.append_value(value)?;
375                    seen.insert(field_name);
376                }
377                None => {
378                    object_builder.insert_bytes(field_name, value);
379                    partially_shredded = true;
380                }
381            }
382        }
383
384        // Handle missing fields
385        for (field_name, typed_value_builder) in &mut self.typed_value_builders {
386            if !seen.contains(field_name) {
387                typed_value_builder.append_null()?;
388            }
389        }
390
391        // Only emit the value if it captured any unshredded object fields
392        if partially_shredded {
393            object_builder.finish();
394        } else {
395            drop(object_builder);
396            self.value_builder.append_null();
397        }
398
399        self.typed_value_nulls.append_non_null();
400        self.nulls.append_non_null();
401        Ok(true)
402    }
403
404    fn finish(mut self) -> Result<(BinaryViewArray, ArrayRef, Option<NullBuffer>)> {
405        let mut builder = StructArrayBuilder::new();
406        for (field_name, typed_value_builder) in self.typed_value_builders {
407            let (value, typed_value, nulls) = typed_value_builder.finish()?;
408            let array =
409                ShreddedVariantFieldArray::from_parts(Some(value), Some(typed_value), nulls);
410            builder = builder.with_field(field_name, ArrayRef::from(array), false);
411        }
412        if let Some(nulls) = self.typed_value_nulls.finish() {
413            builder = builder.with_nulls(nulls);
414        }
415        Ok((
416            self.value_builder.build()?,
417            Arc::new(builder.build()),
418            self.nulls.finish(),
419        ))
420    }
421}
422
/// Field configuration captured by the builder (data type + nullability).
///
/// Values are normally created through [`IntoShreddingField`] rather than directly.
#[derive(Clone)]
pub struct ShreddingField {
    /// Arrow data type requested for this path.
    data_type: DataType,
    /// Nullability recorded on the Arrow field generated for this path.
    nullable: bool,
}
429
430impl ShreddingField {
431    fn new(data_type: DataType, nullable: bool) -> Self {
432        Self {
433            data_type,
434            nullable,
435        }
436    }
437
438    fn null() -> Self {
439        Self::new(DataType::Null, true)
440    }
441}
442
/// Convenience conversion to allow passing either `FieldRef`, `DataType`, or `(DataType, bool)`.
///
/// Borrowed forms (`&DataType` and `(&DataType, bool)`) are accepted as well; conversions
/// that carry no explicit flag produce a nullable field.
pub trait IntoShreddingField {
    /// Converts `self` into the [`ShreddingField`] stored by the schema builder.
    fn into_shredding_field(self) -> ShreddingField;
}
447
448impl IntoShreddingField for FieldRef {
449    fn into_shredding_field(self) -> ShreddingField {
450        ShreddingField::new(self.data_type().clone(), self.is_nullable())
451    }
452}
453
454impl IntoShreddingField for &DataType {
455    fn into_shredding_field(self) -> ShreddingField {
456        ShreddingField::new(self.clone(), true)
457    }
458}
459
460impl IntoShreddingField for DataType {
461    fn into_shredding_field(self) -> ShreddingField {
462        ShreddingField::new(self, true)
463    }
464}
465
466impl IntoShreddingField for (&DataType, bool) {
467    fn into_shredding_field(self) -> ShreddingField {
468        ShreddingField::new(self.0.clone(), self.1)
469    }
470}
471
472impl IntoShreddingField for (DataType, bool) {
473    fn into_shredding_field(self) -> ShreddingField {
474        ShreddingField::new(self.0, self.1)
475    }
476}
477
/// Builder for constructing a variant shredding schema.
///
/// The builder pattern makes it easy to incrementally define which fields
/// should be shredded and with what types. Fields are nullable by default; pass
/// a `(data_type, nullable)` pair or a `FieldRef` to control nullability.
///
/// A builder to which no path was added builds `DataType::Null`.
///
/// Note: this builder currently only supports struct fields. List support
/// will be added in the future.
///
/// # Example
///
/// ```
/// use std::sync::Arc;
/// use arrow::datatypes::{DataType, Field, TimeUnit};
/// use parquet_variant::{VariantPath, VariantPathElement};
/// use parquet_variant_compute::ShreddedSchemaBuilder;
///
/// // Define the shredding schema using the builder
/// let shredding_type = ShreddedSchemaBuilder::default()
///     // store the "time" field as a separate UTC timestamp
///     .with_path("time", (&DataType::Timestamp(TimeUnit::Nanosecond, Some("UTC".into())), true))
///     // store hostname as non-nullable Utf8
///     .with_path("hostname", (&DataType::Utf8, false))
///     // pass a FieldRef directly
///     .with_path(
///         "metadata.trace_id",
///         Arc::new(Field::new("trace_id", DataType::FixedSizeBinary(16), false)),
///     )
///     // field name with a dot: use VariantPath to avoid splitting
///     .with_path(
///         VariantPath::from_iter([VariantPathElement::from("metrics.cpu")]),
///         &DataType::Float64,
///     )
///     .build();
///
/// // The shredding_type can now be passed to shred_variant:
/// // let shredded = shred_variant(&input, &shredding_type)?;
/// ```
#[derive(Default, Clone)]
pub struct ShreddedSchemaBuilder {
    /// Root of the schema tree that inserted paths are recorded in.
    root: VariantSchemaNode,
}
520
521impl ShreddedSchemaBuilder {
522    /// Create a new empty schema builder.
523    pub fn new() -> Self {
524        Self::default()
525    }
526
527    /// Insert a typed path into the schema using dot notation (or any
528    /// [`VariantPath`] convertible).
529    ///
530    /// The path uses dot notation to specify nested fields.
531    /// For example, "a.b.c" will create a nested structure.
532    ///
533    /// # Arguments
534    ///
535    /// * `path` - Anything convertible to [`VariantPath`] (e.g., a `&str`)
536    /// * `field` - Anything convertible via [`IntoShreddingField`] (e.g. `FieldRef`,
537    ///   `&DataType`, or `(&DataType, bool)` to control nullability)
538    pub fn with_path<'a, P, F>(mut self, path: P, field: F) -> Self
539    where
540        P: Into<VariantPath<'a>>,
541        F: IntoShreddingField,
542    {
543        let path: VariantPath<'a> = path.into();
544        self.root.insert_path(&path, field.into_shredding_field());
545        self
546    }
547
548    /// Build the final [`DataType`].
549    pub fn build(self) -> DataType {
550        let shredding_type = self.root.to_shredding_type();
551        match shredding_type {
552            Some(shredding_type) => shredding_type,
553            None => DataType::Null,
554        }
555    }
556}
557
/// Internal tree node structure for building variant schemas.
///
/// The tree is grown by inserting paths: every field segment of a path selects (or creates) a
/// child of a `Struct` node, and the node at the end of the path becomes a `Leaf`.
#[derive(Clone)]
enum VariantSchemaNode {
    /// A leaf node with a primitive/scalar type (and nullability)
    Leaf(ShreddingField),
    /// An inner struct node with nested fields, keyed (and therefore ordered) by field name
    Struct(BTreeMap<String, VariantSchemaNode>),
}
566
567impl Default for VariantSchemaNode {
568    fn default() -> Self {
569        Self::Leaf(ShreddingField::null())
570    }
571}
572
573impl VariantSchemaNode {
574    /// Insert a path into this node with the given data type.
575    fn insert_path(&mut self, path: &VariantPath<'_>, field: ShreddingField) {
576        self.insert_path_elements(path, field);
577    }
578
579    fn insert_path_elements(&mut self, segments: &[VariantPathElement<'_>], field: ShreddingField) {
580        let Some((head, tail)) = segments.split_first() else {
581            *self = Self::Leaf(field);
582            return;
583        };
584
585        match head {
586            VariantPathElement::Field { name } => {
587                // Ensure this node is a Struct node
588                let children = match self {
589                    Self::Struct(children) => children,
590                    _ => {
591                        *self = Self::Struct(BTreeMap::new());
592                        match self {
593                            Self::Struct(children) => children,
594                            _ => unreachable!(),
595                        }
596                    }
597                };
598
599                children
600                    .entry(name.to_string())
601                    .or_default()
602                    .insert_path_elements(tail, field);
603            }
604            VariantPathElement::Index { .. } => {
605                // List support to be added later; reject for now
606                unreachable!("List paths are not supported yet");
607            }
608        }
609    }
610
611    /// Convert this node to a shredding type.
612    ///
613    /// Returns the [`DataType`] for passing to [`shred_variant`].
614    fn to_shredding_type(&self) -> Option<DataType> {
615        match self {
616            Self::Leaf(field) => Some(field.data_type.clone()),
617            Self::Struct(children) => {
618                let child_fields: Vec<_> = children
619                    .iter()
620                    .filter_map(|(name, child)| child.to_shredding_field(name))
621                    .collect();
622                if child_fields.is_empty() {
623                    None
624                } else {
625                    Some(DataType::Struct(Fields::from(child_fields)))
626                }
627            }
628        }
629    }
630
631    fn to_shredding_field(&self, name: &str) -> Option<FieldRef> {
632        match self {
633            Self::Leaf(field) => Some(Arc::new(Field::new(
634                name,
635                field.data_type.clone(),
636                field.nullable,
637            ))),
638            Self::Struct(_) => self
639                .to_shredding_type()
640                .map(|data_type| Arc::new(Field::new(name, data_type, true))),
641        }
642    }
643}
644
645#[cfg(test)]
646mod tests {
647    use super::*;
648    use crate::VariantArrayBuilder;
649    use crate::arrow_to_variant::ListLikeArray;
650    use arrow::array::{
651        Array, BinaryViewArray, FixedSizeBinaryArray, Float64Array, GenericListArray,
652        GenericListViewArray, Int64Array, ListArray, OffsetSizeTrait, PrimitiveArray, StringArray,
653    };
654    use arrow::datatypes::{
655        ArrowPrimitiveType, DataType, Field, Fields, Int64Type, TimeUnit, UnionFields, UnionMode,
656    };
657    use parquet_variant::{
658        BuilderSpecificState, EMPTY_VARIANT_METADATA_BYTES, ObjectBuilder, ReadOnlyMetadataBuilder,
659        Variant, VariantBuilder, VariantPath, VariantPathElement,
660    };
661    use std::sync::Arc;
662    use uuid::Uuid;
663
664    #[derive(Clone)]
665    enum VariantValue<'a> {
666        Value(Variant<'a, 'a>),
667        List(Vec<VariantValue<'a>>),
668        Object(Vec<(&'a str, VariantValue<'a>)>),
669        Null,
670    }
671
672    impl<'a, T> From<T> for VariantValue<'a>
673    where
674        T: Into<Variant<'a, 'a>>,
675    {
676        fn from(value: T) -> Self {
677            Self::Value(value.into())
678        }
679    }
680
681    #[derive(Clone)]
682    enum VariantRow<'a> {
683        Value(VariantValue<'a>),
684        List(Vec<VariantValue<'a>>),
685        Object(Vec<(&'a str, VariantValue<'a>)>),
686        Null,
687    }
688
689    fn build_variant_array(rows: Vec<VariantRow<'static>>) -> VariantArray {
690        let mut builder = VariantArrayBuilder::new(rows.len());
691
692        fn append_variant_value<B: VariantBuilderExt>(builder: &mut B, value: VariantValue) {
693            match value {
694                VariantValue::Value(v) => builder.append_value(v),
695                VariantValue::List(values) => {
696                    let mut list = builder.new_list();
697                    for v in values {
698                        append_variant_value(&mut list, v);
699                    }
700                    list.finish();
701                }
702                VariantValue::Object(fields) => {
703                    let mut object = builder.new_object();
704                    for (name, value) in fields {
705                        append_variant_field(&mut object, name, value);
706                    }
707                    object.finish();
708                }
709                VariantValue::Null => builder.append_null(),
710            }
711        }
712
713        fn append_variant_field<'a, S: BuilderSpecificState>(
714            object: &mut ObjectBuilder<'_, S>,
715            name: &'a str,
716            value: VariantValue<'a>,
717        ) {
718            match value {
719                VariantValue::Value(v) => {
720                    object.insert(name, v);
721                }
722                VariantValue::List(values) => {
723                    let mut list = object.new_list(name);
724                    for v in values {
725                        append_variant_value(&mut list, v);
726                    }
727                    list.finish();
728                }
729                VariantValue::Object(fields) => {
730                    let mut nested = object.new_object(name);
731                    for (field_name, v) in fields {
732                        append_variant_field(&mut nested, field_name, v);
733                    }
734                    nested.finish();
735                }
736                VariantValue::Null => {
737                    object.insert(name, Variant::Null);
738                }
739            }
740        }
741
742        rows.into_iter().for_each(|row| match row {
743            VariantRow::Value(value) => append_variant_value(&mut builder, value),
744            VariantRow::List(values) => {
745                let mut list = builder.new_list();
746                for value in values {
747                    append_variant_value(&mut list, value);
748                }
749                list.finish();
750            }
751            VariantRow::Object(fields) => {
752                let mut object = builder.new_object();
753                for (name, value) in fields {
754                    append_variant_field(&mut object, name, value);
755                }
756                object.finish();
757            }
758            VariantRow::Null => builder.append_null(),
759        });
760        builder.build()
761    }
762
763    trait TestListLikeArray: ListLikeArray {
764        type OffsetSize: OffsetSizeTrait;
765        fn value_offsets(&self) -> Option<&[Self::OffsetSize]>;
766        fn value_size(&self, index: usize) -> Self::OffsetSize;
767    }
768
769    impl<O: OffsetSizeTrait> TestListLikeArray for GenericListArray<O> {
770        type OffsetSize = O;
771
772        fn value_offsets(&self) -> Option<&[Self::OffsetSize]> {
773            Some(GenericListArray::value_offsets(self))
774        }
775
776        fn value_size(&self, index: usize) -> Self::OffsetSize {
777            GenericListArray::value_length(self, index)
778        }
779    }
780
781    impl<O: OffsetSizeTrait> TestListLikeArray for GenericListViewArray<O> {
782        type OffsetSize = O;
783
784        fn value_offsets(&self) -> Option<&[Self::OffsetSize]> {
785            Some(GenericListViewArray::value_offsets(self))
786        }
787
788        fn value_size(&self, index: usize) -> Self::OffsetSize {
789            GenericListViewArray::value_size(self, index)
790        }
791    }
792
793    fn downcast_list_like_array<O: OffsetSizeTrait>(
794        array: &VariantArray,
795    ) -> &dyn TestListLikeArray<OffsetSize = O> {
796        let typed_value = array.typed_value_field().unwrap();
797        if let Some(list) = typed_value.as_any().downcast_ref::<GenericListArray<O>>() {
798            list
799        } else if let Some(list_view) = typed_value
800            .as_any()
801            .downcast_ref::<GenericListViewArray<O>>()
802        {
803            list_view
804        } else {
805            panic!(
806                "Expected list-like typed_value with matching offset type, got {}",
807                typed_value.data_type()
808            );
809        }
810    }
811
812    fn assert_list_structure<O: OffsetSizeTrait>(
813        array: &VariantArray,
814        expected_len: usize,
815        expected_offsets: &[O],
816        expected_sizes: &[Option<O>],
817        expected_fallbacks: &[Option<Variant<'static, 'static>>],
818    ) {
819        assert_eq!(array.len(), expected_len);
820
821        let fallbacks = (array.value_field().unwrap(), Some(array.metadata_field()));
822        let array = downcast_list_like_array::<O>(array);
823
824        assert_eq!(
825            array.value_offsets().unwrap(),
826            expected_offsets,
827            "list offsets mismatch"
828        );
829        assert_eq!(
830            array.len(),
831            expected_sizes.len(),
832            "expected_sizes should match array length"
833        );
834        assert_eq!(
835            array.len(),
836            expected_fallbacks.len(),
837            "expected_fallbacks should match array length"
838        );
839        assert_eq!(
840            array.len(),
841            fallbacks.0.len(),
842            "fallbacks value field should match array length"
843        );
844
845        // Validate per-row shredding outcomes for the list array
846        for (idx, (expected_size, expected_fallback)) in expected_sizes
847            .iter()
848            .zip(expected_fallbacks.iter())
849            .enumerate()
850        {
851            match expected_size {
852                Some(len) => {
853                    // Successfully shredded: typed list value present, no fallback value
854                    assert!(array.is_valid(idx));
855                    assert_eq!(array.value_size(idx), *len);
856                    assert!(fallbacks.0.is_null(idx));
857                }
858                None => {
859                    // Unable to shred: typed list value absent, fallback should carry the variant
860                    assert!(array.is_null(idx));
861                    assert_eq!(array.value_size(idx), O::zero());
862                    match expected_fallback {
863                        Some(expected_variant) => {
864                            assert!(fallbacks.0.is_valid(idx));
865                            let metadata_bytes = fallbacks
866                                .1
867                                .filter(|m| m.is_valid(idx))
868                                .map(|m| m.value(idx))
869                                .filter(|bytes| !bytes.is_empty())
870                                .unwrap_or(EMPTY_VARIANT_METADATA_BYTES);
871                            assert_eq!(
872                                Variant::new(metadata_bytes, fallbacks.0.value(idx)),
873                                expected_variant.clone()
874                            );
875                        }
876                        None => unreachable!(),
877                    }
878                }
879            }
880        }
881    }
882
883    fn assert_list_structure_and_elements<T: ArrowPrimitiveType, O: OffsetSizeTrait>(
884        array: &VariantArray,
885        expected_len: usize,
886        expected_offsets: &[O],
887        expected_sizes: &[Option<O>],
888        expected_fallbacks: &[Option<Variant<'static, 'static>>],
889        expected_shredded_elements: (&[Option<T::Native>], &[Option<Variant<'static, 'static>>]),
890    ) {
891        assert_list_structure(
892            array,
893            expected_len,
894            expected_offsets,
895            expected_sizes,
896            expected_fallbacks,
897        );
898        let array = downcast_list_like_array::<O>(array);
899
900        // Validate the shredded state of list elements (typed values and fallbacks)
901        let (expected_values, expected_fallbacks) = expected_shredded_elements;
902        assert_eq!(
903            expected_values.len(),
904            expected_fallbacks.len(),
905            "expected_values and expected_fallbacks should be aligned"
906        );
907
908        // Validate the shredded primitive values for list elements
909        let element_array = ShreddedVariantFieldArray::try_new(array.values().as_ref()).unwrap();
910        let element_values = element_array
911            .typed_value_field()
912            .unwrap()
913            .as_any()
914            .downcast_ref::<PrimitiveArray<T>>()
915            .unwrap();
916        assert_eq!(element_values.len(), expected_values.len());
917        for (idx, expected_value) in expected_values.iter().enumerate() {
918            match expected_value {
919                Some(value) => {
920                    assert!(element_values.is_valid(idx));
921                    assert_eq!(element_values.value(idx), *value);
922                }
923                None => assert!(element_values.is_null(idx)),
924            }
925        }
926
927        // Validate fallback variants for list elements that could not be shredded
928        let element_fallbacks = element_array.value_field().unwrap();
929        assert_eq!(element_fallbacks.len(), expected_fallbacks.len());
930        for (idx, expected_fallback) in expected_fallbacks.iter().enumerate() {
931            match expected_fallback {
932                Some(expected_variant) => {
933                    assert!(element_fallbacks.is_valid(idx));
934                    assert_eq!(
935                        Variant::new(EMPTY_VARIANT_METADATA_BYTES, element_fallbacks.value(idx)),
936                        expected_variant.clone()
937                    );
938                }
939                None => assert!(element_fallbacks.is_null(idx)),
940            }
941        }
942    }
943
944    #[test]
945    fn test_already_shredded_input_error() {
946        // Create a VariantArray that already has typed_value_field
947        // First create a valid VariantArray, then extract its parts to construct a shredded one
948        let temp_array = VariantArray::from_iter(vec![Some(Variant::from("test"))]);
949        let metadata = temp_array.metadata_field().clone();
950        let value = temp_array.value_field().unwrap().clone();
951        let typed_value = Arc::new(Int64Array::from(vec![42])) as ArrayRef;
952
953        let shredded_array =
954            VariantArray::from_parts(metadata, Some(value), Some(typed_value), None);
955
956        let result = shred_variant(&shredded_array, &DataType::Int64);
957        assert!(matches!(
958            result.unwrap_err(),
959            ArrowError::InvalidArgumentError(_)
960        ));
961    }
962
963    #[test]
964    fn test_all_null_input() {
965        // Create VariantArray with no value field (all null case)
966        let metadata = BinaryViewArray::from_iter_values([&[1u8, 0u8]]); // minimal valid metadata
967        let all_null_array = VariantArray::from_parts(metadata, None, None, None);
968        let result = shred_variant(&all_null_array, &DataType::Int64).unwrap();
969
970        // Should return array with no value/typed_value fields
971        assert!(result.value_field().is_none());
972        assert!(result.typed_value_field().is_none());
973    }
974
975    #[test]
976    fn test_invalid_fixed_size_binary_shredding() {
977        let mock_uuid_1 = Uuid::new_v4();
978
979        let input = VariantArray::from_iter([Some(Variant::from(mock_uuid_1)), None]);
980
981        // shred_variant only supports FixedSizeBinary(16). Any other length will err.
982        let err = shred_variant(&input, &DataType::FixedSizeBinary(17)).unwrap_err();
983
984        assert_eq!(
985            err.to_string(),
986            "Invalid argument error: FixedSizeBinary(17) is not a valid variant shredding type. Only FixedSizeBinary(16) for UUID is supported."
987        );
988    }
989
990    #[test]
991    fn test_uuid_shredding() {
992        let mock_uuid_1 = Uuid::new_v4();
993        let mock_uuid_2 = Uuid::new_v4();
994
995        let input = VariantArray::from_iter([
996            Some(Variant::from(mock_uuid_1)),
997            None,
998            Some(Variant::from(false)),
999            Some(Variant::from(mock_uuid_2)),
1000        ]);
1001
1002        let variant_array = shred_variant(&input, &DataType::FixedSizeBinary(16)).unwrap();
1003
1004        // // inspect the typed_value Field and make sure it contains the canonical Uuid extension type
1005        // let typed_value_field = variant_array
1006        //     .inner()
1007        //     .fields()
1008        //     .into_iter()
1009        //     .find(|f| f.name() == "typed_value")
1010        //     .unwrap();
1011
1012        // assert!(
1013        //     typed_value_field
1014        //         .try_extension_type::<extension::Uuid>()
1015        //         .is_ok()
1016        // );
1017
1018        // probe the downcasted typed_value array to make sure uuids are shredded correctly
1019        let uuids = variant_array
1020            .typed_value_field()
1021            .unwrap()
1022            .as_any()
1023            .downcast_ref::<FixedSizeBinaryArray>()
1024            .unwrap();
1025
1026        assert_eq!(uuids.len(), 4);
1027
1028        assert!(!uuids.is_null(0));
1029
1030        let got_uuid_1: &[u8] = uuids.value(0);
1031        assert_eq!(got_uuid_1, mock_uuid_1.as_bytes());
1032
1033        assert!(uuids.is_null(1));
1034        assert!(uuids.is_null(2));
1035
1036        assert!(!uuids.is_null(3));
1037
1038        let got_uuid_2: &[u8] = uuids.value(3);
1039        assert_eq!(got_uuid_2, mock_uuid_2.as_bytes());
1040    }
1041
1042    #[test]
1043    fn test_primitive_shredding_comprehensive() {
1044        // Test mixed scenarios in a single array
1045        let input = VariantArray::from_iter(vec![
1046            Some(Variant::from(42i64)),   // successful shred
1047            Some(Variant::from("hello")), // failed shred (string)
1048            Some(Variant::from(100i64)),  // successful shred
1049            None,                         // array-level null
1050            Some(Variant::Null),          // variant null
1051            Some(Variant::from(3i8)),     // successful shred (int8->int64 conversion)
1052        ]);
1053
1054        let result = shred_variant(&input, &DataType::Int64).unwrap();
1055
1056        // Verify structure
1057        let metadata_field = result.metadata_field();
1058        let value_field = result.value_field().unwrap();
1059        let typed_value_field = result
1060            .typed_value_field()
1061            .unwrap()
1062            .as_any()
1063            .downcast_ref::<Int64Array>()
1064            .unwrap();
1065
1066        // Check specific outcomes for each row
1067        assert_eq!(result.len(), 6);
1068
1069        // Row 0: 42 -> should shred successfully
1070        assert!(!result.is_null(0));
1071        assert!(value_field.is_null(0)); // value should be null when shredded
1072        assert!(!typed_value_field.is_null(0));
1073        assert_eq!(typed_value_field.value(0), 42);
1074
1075        // Row 1: "hello" -> should fail to shred
1076        assert!(!result.is_null(1));
1077        assert!(!value_field.is_null(1)); // value should contain original
1078        assert!(typed_value_field.is_null(1)); // typed_value should be null
1079        assert_eq!(
1080            Variant::new(metadata_field.value(1), value_field.value(1)),
1081            Variant::from("hello")
1082        );
1083
1084        // Row 2: 100 -> should shred successfully
1085        assert!(!result.is_null(2));
1086        assert!(value_field.is_null(2));
1087        assert_eq!(typed_value_field.value(2), 100);
1088
1089        // Row 3: array null -> should be null in result
1090        assert!(result.is_null(3));
1091
1092        // Row 4: Variant::Null -> should not shred (it's a null variant, not an integer)
1093        assert!(!result.is_null(4));
1094        assert!(!value_field.is_null(4)); // should contain Variant::Null
1095        assert_eq!(
1096            Variant::new(metadata_field.value(4), value_field.value(4)),
1097            Variant::Null
1098        );
1099        assert!(typed_value_field.is_null(4));
1100
1101        // Row 5: 3i8 -> should shred successfully (int8->int64 conversion)
1102        assert!(!result.is_null(5));
1103        assert!(value_field.is_null(5)); // value should be null when shredded
1104        assert!(!typed_value_field.is_null(5));
1105        assert_eq!(typed_value_field.value(5), 3);
1106    }
1107
1108    #[test]
1109    fn test_primitive_different_target_types() {
1110        let input = VariantArray::from_iter(vec![
1111            Variant::from(42i32),
1112            Variant::from(3.15f64),
1113            Variant::from("not_a_number"),
1114        ]);
1115
1116        // Test Int32 target
1117        let result_int32 = shred_variant(&input, &DataType::Int32).unwrap();
1118        let typed_value_int32 = result_int32
1119            .typed_value_field()
1120            .unwrap()
1121            .as_any()
1122            .downcast_ref::<arrow::array::Int32Array>()
1123            .unwrap();
1124        assert_eq!(typed_value_int32.value(0), 42);
1125        assert!(typed_value_int32.is_null(1)); // float doesn't convert to int32
1126        assert!(typed_value_int32.is_null(2)); // string doesn't convert to int32
1127
1128        // Test Float64 target
1129        let result_float64 = shred_variant(&input, &DataType::Float64).unwrap();
1130        let typed_value_float64 = result_float64
1131            .typed_value_field()
1132            .unwrap()
1133            .as_any()
1134            .downcast_ref::<Float64Array>()
1135            .unwrap();
1136        assert_eq!(typed_value_float64.value(0), 42.0); // int converts to float
1137        assert_eq!(typed_value_float64.value(1), 3.15);
1138        assert!(typed_value_float64.is_null(2)); // string doesn't convert
1139    }
1140
1141    #[test]
1142    fn test_invalid_shredded_types_rejected() {
1143        let input = VariantArray::from_iter([Variant::from(42)]);
1144
1145        let invalid_types = vec![
1146            DataType::UInt8,
1147            DataType::Float16,
1148            DataType::Decimal256(38, 10),
1149            DataType::Date64,
1150            DataType::Time32(TimeUnit::Second),
1151            DataType::Time64(TimeUnit::Nanosecond),
1152            DataType::Timestamp(TimeUnit::Millisecond, None),
1153            DataType::LargeBinary,
1154            DataType::LargeUtf8,
1155            DataType::FixedSizeBinary(17),
1156            DataType::Union(
1157                UnionFields::from_fields(vec![
1158                    Field::new("int_field", DataType::Int32, false),
1159                    Field::new("str_field", DataType::Utf8, true),
1160                ]),
1161                UnionMode::Dense,
1162            ),
1163            DataType::Map(
1164                Arc::new(Field::new(
1165                    "entries",
1166                    DataType::Struct(Fields::from(vec![
1167                        Field::new("key", DataType::Utf8, false),
1168                        Field::new("value", DataType::Int32, true),
1169                    ])),
1170                    false,
1171                )),
1172                false,
1173            ),
1174            DataType::Dictionary(Box::new(DataType::Int32), Box::new(DataType::Utf8)),
1175            DataType::RunEndEncoded(
1176                Arc::new(Field::new("run_ends", DataType::Int32, false)),
1177                Arc::new(Field::new("values", DataType::Utf8, true)),
1178            ),
1179        ];
1180
1181        for data_type in invalid_types {
1182            let err = shred_variant(&input, &data_type).unwrap_err();
1183            assert!(
1184                matches!(err, ArrowError::InvalidArgumentError(_)),
1185                "expected InvalidArgumentError for {:?}, got {:?}",
1186                data_type,
1187                err
1188            );
1189        }
1190    }
1191
1192    #[test]
1193    fn test_array_shredding_as_list() {
1194        let input = build_variant_array(vec![
1195            // Row 0: List of ints should shred entirely into typed_value
1196            VariantRow::List(vec![
1197                VariantValue::from(1i64),
1198                VariantValue::from(2i64),
1199                VariantValue::from(3i64),
1200            ]),
1201            // Row 1: Contains incompatible types so values fall back
1202            VariantRow::List(vec![
1203                VariantValue::from(1i64),
1204                VariantValue::from("two"),
1205                VariantValue::from(Variant::Null),
1206            ]),
1207            // Row 2: Not a list -> entire row falls back
1208            VariantRow::Value(VariantValue::from("not a list")),
1209            // Row 3: Array-level null propagates
1210            VariantRow::Null,
1211            // Row 4: Empty list exercises zero-length offsets
1212            VariantRow::List(vec![]),
1213        ]);
1214        let list_schema = DataType::List(Arc::new(Field::new("item", DataType::Int64, true)));
1215        let result = shred_variant(&input, &list_schema).unwrap();
1216        assert_eq!(result.len(), 5);
1217
1218        assert_list_structure_and_elements::<Int64Type, i32>(
1219            &result,
1220            5,
1221            &[0, 3, 6, 6, 6, 6],
1222            &[Some(3), Some(3), None, None, Some(0)],
1223            &[
1224                None,
1225                None,
1226                Some(Variant::from("not a list")),
1227                Some(Variant::Null),
1228                None,
1229            ],
1230            (
1231                &[Some(1), Some(2), Some(3), Some(1), None, None],
1232                &[
1233                    None,
1234                    None,
1235                    None,
1236                    None,
1237                    Some(Variant::from("two")),
1238                    Some(Variant::Null),
1239                ],
1240            ),
1241        );
1242    }
1243
1244    #[test]
1245    fn test_array_shredding_as_large_list() {
1246        let input = build_variant_array(vec![
1247            // Row 0: List of ints shreds to typed_value
1248            VariantRow::List(vec![VariantValue::from(1i64), VariantValue::from(2i64)]),
1249            // Row 1: Not a list -> entire row falls back
1250            VariantRow::Value(VariantValue::from("not a list")),
1251            // Row 2: Empty list
1252            VariantRow::List(vec![]),
1253        ]);
1254        let list_schema = DataType::LargeList(Arc::new(Field::new("item", DataType::Int64, true)));
1255        let result = shred_variant(&input, &list_schema).unwrap();
1256        assert_eq!(result.len(), 3);
1257
1258        assert_list_structure_and_elements::<Int64Type, i64>(
1259            &result,
1260            3,
1261            &[0, 2, 2, 2],
1262            &[Some(2), None, Some(0)],
1263            &[None, Some(Variant::from("not a list")), None],
1264            (&[Some(1), Some(2)], &[None, None]),
1265        );
1266    }
1267
1268    #[test]
1269    fn test_array_shredding_as_list_view() {
1270        let input = build_variant_array(vec![
1271            // Row 0: Standard list
1272            VariantRow::List(vec![
1273                VariantValue::from(1i64),
1274                VariantValue::from(2i64),
1275                VariantValue::from(3i64),
1276            ]),
1277            // Row 1: List with incompatible types -> element fallback
1278            VariantRow::List(vec![
1279                VariantValue::from(1i64),
1280                VariantValue::from("two"),
1281                VariantValue::from(Variant::Null),
1282            ]),
1283            // Row 2: Not a list -> top-level fallback
1284            VariantRow::Value(VariantValue::from("not a list")),
1285            // Row 3: Top-level Null
1286            VariantRow::Null,
1287            // Row 4: Empty list
1288            VariantRow::List(vec![]),
1289        ]);
1290        let list_schema = DataType::ListView(Arc::new(Field::new("item", DataType::Int64, true)));
1291        let result = shred_variant(&input, &list_schema).unwrap();
1292        assert_eq!(result.len(), 5);
1293
1294        assert_list_structure_and_elements::<Int64Type, i32>(
1295            &result,
1296            5,
1297            &[0, 3, 6, 6, 6],
1298            &[Some(3), Some(3), None, None, Some(0)],
1299            &[
1300                None,
1301                None,
1302                Some(Variant::from("not a list")),
1303                Some(Variant::Null),
1304                None,
1305            ],
1306            (
1307                &[Some(1), Some(2), Some(3), Some(1), None, None],
1308                &[
1309                    None,
1310                    None,
1311                    None,
1312                    None,
1313                    Some(Variant::from("two")),
1314                    Some(Variant::Null),
1315                ],
1316            ),
1317        );
1318    }
1319
1320    #[test]
1321    fn test_array_shredding_as_large_list_view() {
1322        let input = build_variant_array(vec![
1323            // Row 0: List of ints shreds to typed_value
1324            VariantRow::List(vec![VariantValue::from(1i64), VariantValue::from(2i64)]),
1325            // Row 1: Not a list -> entire row falls back
1326            VariantRow::Value(VariantValue::from("fallback")),
1327            // Row 2: Empty list
1328            VariantRow::List(vec![]),
1329        ]);
1330        let list_schema =
1331            DataType::LargeListView(Arc::new(Field::new("item", DataType::Int64, true)));
1332        let result = shred_variant(&input, &list_schema).unwrap();
1333        assert_eq!(result.len(), 3);
1334
1335        assert_list_structure_and_elements::<Int64Type, i64>(
1336            &result,
1337            3,
1338            &[0, 2, 2],
1339            &[Some(2), None, Some(0)],
1340            &[None, Some(Variant::from("fallback")), None],
1341            (&[Some(1), Some(2)], &[None, None]),
1342        );
1343    }
1344
1345    #[test]
1346    fn test_array_shredding_as_fixed_size_list() {
1347        let input = build_variant_array(vec![VariantRow::List(vec![
1348            VariantValue::from(1i64),
1349            VariantValue::from(2i64),
1350            VariantValue::from(3i64),
1351        ])]);
1352        let list_schema =
1353            DataType::FixedSizeList(Arc::new(Field::new("item", DataType::Int64, true)), 2);
1354        let err = shred_variant(&input, &list_schema).unwrap_err();
1355        assert_eq!(
1356            err.to_string(),
1357            "Not yet implemented: Converting unshredded variant arrays to arrow fixed-size lists"
1358        );
1359    }
1360
1361    #[test]
1362    fn test_array_shredding_with_array_elements() {
1363        let input = build_variant_array(vec![
1364            // Row 0: [[1, 2], [3, 4], []] - clean nested lists
1365            VariantRow::List(vec![
1366                VariantValue::List(vec![VariantValue::from(1i64), VariantValue::from(2i64)]),
1367                VariantValue::List(vec![VariantValue::from(3i64), VariantValue::from(4i64)]),
1368                VariantValue::List(vec![]),
1369            ]),
1370            // Row 1: [[5, "bad", null], "not a list inner", null] - inner fallbacks
1371            VariantRow::List(vec![
1372                VariantValue::List(vec![
1373                    VariantValue::from(5i64),
1374                    VariantValue::from("bad"),
1375                    VariantValue::from(Variant::Null),
1376                ]),
1377                VariantValue::from("not a list inner"),
1378                VariantValue::Null,
1379            ]),
1380            // Row 2: "not a list" - top-level fallback
1381            VariantRow::Value(VariantValue::from("not a list")),
1382            // Row 3: null row
1383            VariantRow::Null,
1384        ]);
1385        let inner_field = Arc::new(Field::new("item", DataType::Int64, true));
1386        let inner_list_schema = DataType::List(inner_field);
1387        let list_schema = DataType::List(Arc::new(Field::new(
1388            "item",
1389            inner_list_schema.clone(),
1390            true,
1391        )));
1392        let result = shred_variant(&input, &list_schema).unwrap();
1393        assert_eq!(result.len(), 4);
1394
1395        let typed_value = result
1396            .typed_value_field()
1397            .unwrap()
1398            .as_any()
1399            .downcast_ref::<ListArray>()
1400            .unwrap();
1401
1402        assert_list_structure::<i32>(
1403            &result,
1404            4,
1405            &[0, 3, 6, 6, 6],
1406            &[Some(3), Some(3), None, None],
1407            &[
1408                None,
1409                None,
1410                Some(Variant::from("not a list")),
1411                Some(Variant::Null),
1412            ],
1413        );
1414
1415        let outer_elements =
1416            ShreddedVariantFieldArray::try_new(typed_value.values().as_ref()).unwrap();
1417        assert_eq!(outer_elements.len(), 6);
1418        let outer_values = outer_elements
1419            .typed_value_field()
1420            .unwrap()
1421            .as_any()
1422            .downcast_ref::<ListArray>()
1423            .unwrap();
1424        let outer_fallbacks = outer_elements.value_field().unwrap();
1425
1426        let outer_metadata = BinaryViewArray::from_iter_values(std::iter::repeat_n(
1427            EMPTY_VARIANT_METADATA_BYTES,
1428            outer_elements.len(),
1429        ));
1430        let outer_variant = VariantArray::from_parts(
1431            outer_metadata,
1432            Some(outer_fallbacks.clone()),
1433            Some(Arc::new(outer_values.clone())),
1434            None,
1435        );
1436
1437        assert_list_structure_and_elements::<Int64Type, i32>(
1438            &outer_variant,
1439            outer_elements.len(),
1440            &[0, 2, 4, 4, 7, 7, 7],
1441            &[Some(2), Some(2), Some(0), Some(3), None, None],
1442            &[
1443                None,
1444                None,
1445                None,
1446                None,
1447                Some(Variant::from("not a list inner")),
1448                Some(Variant::Null),
1449            ],
1450            (
1451                &[Some(1), Some(2), Some(3), Some(4), Some(5), None, None],
1452                &[
1453                    None,
1454                    None,
1455                    None,
1456                    None,
1457                    None,
1458                    Some(Variant::from("bad")),
1459                    Some(Variant::Null),
1460                ],
1461            ),
1462        );
1463    }
1464
1465    #[test]
1466    fn test_array_shredding_with_object_elements() {
1467        let input = build_variant_array(vec![
1468            // Row 0: [{"id": 1, "name": "Alice"}, {"id": null}] fully shards
1469            VariantRow::List(vec![
1470                VariantValue::Object(vec![
1471                    ("id", VariantValue::from(1i64)),
1472                    ("name", VariantValue::from("Alice")),
1473                ]),
1474                VariantValue::Object(vec![("id", VariantValue::from(Variant::Null))]),
1475            ]),
1476            // Row 1: "not a list" -> fallback
1477            VariantRow::Value(VariantValue::from("not a list")),
1478            // Row 2: Null row
1479            VariantRow::Null,
1480        ]);
1481
1482        // Target schema is List<Struct<id:int64,name:utf8>>
1483        let object_fields = Fields::from(vec![
1484            Field::new("id", DataType::Int64, true),
1485            Field::new("name", DataType::Utf8, true),
1486        ]);
1487        let list_schema = DataType::List(Arc::new(Field::new(
1488            "item",
1489            DataType::Struct(object_fields),
1490            true,
1491        )));
1492        let result = shred_variant(&input, &list_schema).unwrap();
1493        assert_eq!(result.len(), 3);
1494
1495        assert_list_structure::<i32>(
1496            &result,
1497            3,
1498            &[0, 2, 2, 2],
1499            &[Some(2), None, None],
1500            &[None, Some(Variant::from("not a list")), Some(Variant::Null)],
1501        );
1502
1503        // Validate nested struct fields for each element
1504        let typed_value = result
1505            .typed_value_field()
1506            .unwrap()
1507            .as_any()
1508            .downcast_ref::<ListArray>()
1509            .unwrap();
1510        let element_array =
1511            ShreddedVariantFieldArray::try_new(typed_value.values().as_ref()).unwrap();
1512        assert_eq!(element_array.len(), 2);
1513        let element_objects = element_array
1514            .typed_value_field()
1515            .unwrap()
1516            .as_any()
1517            .downcast_ref::<arrow::array::StructArray>()
1518            .unwrap();
1519
1520        // Id field [1, Variant::Null]
1521        let id_field =
1522            ShreddedVariantFieldArray::try_new(element_objects.column_by_name("id").unwrap())
1523                .unwrap();
1524        let id_values = id_field.value_field().unwrap();
1525        let id_typed_values = id_field
1526            .typed_value_field()
1527            .unwrap()
1528            .as_any()
1529            .downcast_ref::<Int64Array>()
1530            .unwrap();
1531        assert!(id_values.is_null(0));
1532        assert_eq!(id_typed_values.value(0), 1);
1533        // null is stored as Variant::Null in values
1534        assert!(id_values.is_valid(1));
1535        assert_eq!(
1536            Variant::new(EMPTY_VARIANT_METADATA_BYTES, id_values.value(1)),
1537            Variant::Null
1538        );
1539        assert!(id_typed_values.is_null(1));
1540
1541        // Name field ["Alice", null]
1542        let name_field =
1543            ShreddedVariantFieldArray::try_new(element_objects.column_by_name("name").unwrap())
1544                .unwrap();
1545        let name_values = name_field.value_field().unwrap();
1546        let name_typed_values = name_field
1547            .typed_value_field()
1548            .unwrap()
1549            .as_any()
1550            .downcast_ref::<StringArray>()
1551            .unwrap();
1552        assert!(name_values.is_null(0));
1553        assert_eq!(name_typed_values.value(0), "Alice");
1554        // No value provided, both value and typed_value are null
1555        assert!(name_values.is_null(1));
1556        assert!(name_typed_values.is_null(1));
1557    }
1558
1559    #[test]
1560    fn test_object_shredding_comprehensive() {
1561        let input = build_variant_array(vec![
1562            // Row 0: Fully shredded object
1563            VariantRow::Object(vec![
1564                ("score", VariantValue::from(95.5f64)),
1565                ("age", VariantValue::from(30i64)),
1566            ]),
1567            // Row 1: Partially shredded object (extra email field)
1568            VariantRow::Object(vec![
1569                ("score", VariantValue::from(87.2f64)),
1570                ("age", VariantValue::from(25i64)),
1571                ("email", VariantValue::from("bob@example.com")),
1572            ]),
1573            // Row 2: Missing field (no score)
1574            VariantRow::Object(vec![("age", VariantValue::from(35i64))]),
1575            // Row 3: Type mismatch (score is string, age is string)
1576            VariantRow::Object(vec![
1577                ("score", VariantValue::from("ninety-five")),
1578                ("age", VariantValue::from("thirty")),
1579            ]),
1580            // Row 4: Non-object
1581            VariantRow::Value(VariantValue::from("not an object")),
1582            // Row 5: Empty object
1583            VariantRow::Object(vec![]),
1584            // Row 6: Null
1585            VariantRow::Null,
1586            // Row 7: Object with only "wrong" fields
1587            VariantRow::Object(vec![("foo", VariantValue::from(10))]),
1588            // Row 8: Object with one "right" and one "wrong" field
1589            VariantRow::Object(vec![
1590                ("score", VariantValue::from(66.67f64)),
1591                ("foo", VariantValue::from(10)),
1592            ]),
1593        ]);
1594
1595        // Create target schema: struct<score: float64, age: int64>
1596        // Both types are supported for shredding
1597        let target_schema = ShreddedSchemaBuilder::default()
1598            .with_path("score", &DataType::Float64)
1599            .with_path("age", &DataType::Int64)
1600            .build();
1601
1602        let result = shred_variant(&input, &target_schema).unwrap();
1603
1604        // Verify structure
1605        assert!(result.value_field().is_some());
1606        assert!(result.typed_value_field().is_some());
1607        assert_eq!(result.len(), 9);
1608
1609        let metadata = result.metadata_field();
1610
1611        let value = result.value_field().unwrap();
1612        let typed_value = result
1613            .typed_value_field()
1614            .unwrap()
1615            .as_any()
1616            .downcast_ref::<arrow::array::StructArray>()
1617            .unwrap();
1618
1619        // Extract score and age fields from typed_value struct
1620        let score_field =
1621            ShreddedVariantFieldArray::try_new(typed_value.column_by_name("score").unwrap())
1622                .unwrap();
1623        let age_field =
1624            ShreddedVariantFieldArray::try_new(typed_value.column_by_name("age").unwrap()).unwrap();
1625
1626        let score_value = score_field
1627            .value_field()
1628            .unwrap()
1629            .as_any()
1630            .downcast_ref::<BinaryViewArray>()
1631            .unwrap();
1632        let score_typed_value = score_field
1633            .typed_value_field()
1634            .unwrap()
1635            .as_any()
1636            .downcast_ref::<Float64Array>()
1637            .unwrap();
1638        let age_value = age_field
1639            .value_field()
1640            .unwrap()
1641            .as_any()
1642            .downcast_ref::<BinaryViewArray>()
1643            .unwrap();
1644        let age_typed_value = age_field
1645            .typed_value_field()
1646            .unwrap()
1647            .as_any()
1648            .downcast_ref::<Int64Array>()
1649            .unwrap();
1650
1651        // Set up exhaustive checking of all shredded columns and their nulls/values
1652        struct ShreddedValue<'m, 'v, T> {
1653            value: Option<Variant<'m, 'v>>,
1654            typed_value: Option<T>,
1655        }
1656        struct ShreddedStruct<'m, 'v> {
1657            score: ShreddedValue<'m, 'v, f64>,
1658            age: ShreddedValue<'m, 'v, i64>,
1659        }
1660        fn get_value<'m, 'v>(
1661            i: usize,
1662            metadata: &'m BinaryViewArray,
1663            value: &'v BinaryViewArray,
1664        ) -> Variant<'m, 'v> {
1665            Variant::new(metadata.value(i), value.value(i))
1666        }
1667        let expect = |i, expected_result: Option<ShreddedValue<ShreddedStruct>>| {
1668            match expected_result {
1669                Some(ShreddedValue {
1670                    value: expected_value,
1671                    typed_value: expected_typed_value,
1672                }) => {
1673                    assert!(result.is_valid(i));
1674                    match expected_value {
1675                        Some(expected_value) => {
1676                            assert!(value.is_valid(i));
1677                            assert_eq!(expected_value, get_value(i, metadata, value));
1678                        }
1679                        None => {
1680                            assert!(value.is_null(i));
1681                        }
1682                    }
1683                    match expected_typed_value {
1684                        Some(ShreddedStruct {
1685                            score: expected_score,
1686                            age: expected_age,
1687                        }) => {
1688                            assert!(typed_value.is_valid(i));
1689                            assert!(score_field.is_valid(i)); // non-nullable
1690                            assert!(age_field.is_valid(i)); // non-nullable
1691                            match expected_score.value {
1692                                Some(expected_score_value) => {
1693                                    assert!(score_value.is_valid(i));
1694                                    assert_eq!(
1695                                        expected_score_value,
1696                                        get_value(i, metadata, score_value)
1697                                    );
1698                                }
1699                                None => {
1700                                    assert!(score_value.is_null(i));
1701                                }
1702                            }
1703                            match expected_score.typed_value {
1704                                Some(expected_score) => {
1705                                    assert!(score_typed_value.is_valid(i));
1706                                    assert_eq!(expected_score, score_typed_value.value(i));
1707                                }
1708                                None => {
1709                                    assert!(score_typed_value.is_null(i));
1710                                }
1711                            }
1712                            match expected_age.value {
1713                                Some(expected_age_value) => {
1714                                    assert!(age_value.is_valid(i));
1715                                    assert_eq!(
1716                                        expected_age_value,
1717                                        get_value(i, metadata, age_value)
1718                                    );
1719                                }
1720                                None => {
1721                                    assert!(age_value.is_null(i));
1722                                }
1723                            }
1724                            match expected_age.typed_value {
1725                                Some(expected_age) => {
1726                                    assert!(age_typed_value.is_valid(i));
1727                                    assert_eq!(expected_age, age_typed_value.value(i));
1728                                }
1729                                None => {
1730                                    assert!(age_typed_value.is_null(i));
1731                                }
1732                            }
1733                        }
1734                        None => {
1735                            assert!(typed_value.is_null(i));
1736                        }
1737                    }
1738                }
1739                None => {
1740                    assert!(result.is_null(i));
1741                }
1742            };
1743        };
1744
1745        // Row 0: Fully shredded - both fields shred successfully
1746        expect(
1747            0,
1748            Some(ShreddedValue {
1749                value: None,
1750                typed_value: Some(ShreddedStruct {
1751                    score: ShreddedValue {
1752                        value: None,
1753                        typed_value: Some(95.5),
1754                    },
1755                    age: ShreddedValue {
1756                        value: None,
1757                        typed_value: Some(30),
1758                    },
1759                }),
1760            }),
1761        );
1762
1763        // Row 1: Partially shredded - value contains extra email field
1764        let mut builder = VariantBuilder::new();
1765        builder
1766            .new_object()
1767            .with_field("email", "bob@example.com")
1768            .finish();
1769        let (m, v) = builder.finish();
1770        let expected_value = Variant::new(&m, &v);
1771
1772        expect(
1773            1,
1774            Some(ShreddedValue {
1775                value: Some(expected_value),
1776                typed_value: Some(ShreddedStruct {
1777                    score: ShreddedValue {
1778                        value: None,
1779                        typed_value: Some(87.2),
1780                    },
1781                    age: ShreddedValue {
1782                        value: None,
1783                        typed_value: Some(25),
1784                    },
1785                }),
1786            }),
1787        );
1788
1789        // Row 2: Fully shredded -- missing score field
1790        expect(
1791            2,
1792            Some(ShreddedValue {
1793                value: None,
1794                typed_value: Some(ShreddedStruct {
1795                    score: ShreddedValue {
1796                        value: None,
1797                        typed_value: None,
1798                    },
1799                    age: ShreddedValue {
1800                        value: None,
1801                        typed_value: Some(35),
1802                    },
1803                }),
1804            }),
1805        );
1806
1807        // Row 3: Type mismatches - both score and age are strings
1808        expect(
1809            3,
1810            Some(ShreddedValue {
1811                value: None,
1812                typed_value: Some(ShreddedStruct {
1813                    score: ShreddedValue {
1814                        value: Some(Variant::from("ninety-five")),
1815                        typed_value: None,
1816                    },
1817                    age: ShreddedValue {
1818                        value: Some(Variant::from("thirty")),
1819                        typed_value: None,
1820                    },
1821                }),
1822            }),
1823        );
1824
1825        // Row 4: Non-object - falls back to value field
1826        expect(
1827            4,
1828            Some(ShreddedValue {
1829                value: Some(Variant::from("not an object")),
1830                typed_value: None,
1831            }),
1832        );
1833
1834        // Row 5: Empty object
1835        expect(
1836            5,
1837            Some(ShreddedValue {
1838                value: None,
1839                typed_value: Some(ShreddedStruct {
1840                    score: ShreddedValue {
1841                        value: None,
1842                        typed_value: None,
1843                    },
1844                    age: ShreddedValue {
1845                        value: None,
1846                        typed_value: None,
1847                    },
1848                }),
1849            }),
1850        );
1851
1852        // Row 6: Null
1853        expect(6, None);
1854
1855        // Helper to correctly create a variant object using a row's existing metadata
1856        let object_with_foo_field = |i| {
1857            use parquet_variant::{ParentState, ValueBuilder, VariantMetadata};
1858            let metadata = VariantMetadata::new(metadata.value(i));
1859            let mut metadata_builder = ReadOnlyMetadataBuilder::new(&metadata);
1860            let mut value_builder = ValueBuilder::new();
1861            let state = ParentState::variant(&mut value_builder, &mut metadata_builder);
1862            ObjectBuilder::new(state, false)
1863                .with_field("foo", 10)
1864                .finish();
1865            (metadata, value_builder.into_inner())
1866        };
1867
1868        // Row 7: Object with only a "wrong" field
1869        let (m, v) = object_with_foo_field(7);
1870        expect(
1871            7,
1872            Some(ShreddedValue {
1873                value: Some(Variant::new_with_metadata(m, &v)),
1874                typed_value: Some(ShreddedStruct {
1875                    score: ShreddedValue {
1876                        value: None,
1877                        typed_value: None,
1878                    },
1879                    age: ShreddedValue {
1880                        value: None,
1881                        typed_value: None,
1882                    },
1883                }),
1884            }),
1885        );
1886
1887        // Row 8: Object with one "wrong" and one "right" field
1888        let (m, v) = object_with_foo_field(8);
1889        expect(
1890            8,
1891            Some(ShreddedValue {
1892                value: Some(Variant::new_with_metadata(m, &v)),
1893                typed_value: Some(ShreddedStruct {
1894                    score: ShreddedValue {
1895                        value: None,
1896                        typed_value: Some(66.67),
1897                    },
1898                    age: ShreddedValue {
1899                        value: None,
1900                        typed_value: None,
1901                    },
1902                }),
1903            }),
1904        );
1905    }
1906
1907    #[test]
1908    fn test_object_shredding_with_array_field() {
1909        let input = build_variant_array(vec![
1910            // Row 0: Object with well-typed scores list
1911            VariantRow::Object(vec![(
1912                "scores",
1913                VariantValue::List(vec![VariantValue::from(10i64), VariantValue::from(20i64)]),
1914            )]),
1915            // Row 1: Object whose scores list contains incompatible type
1916            VariantRow::Object(vec![(
1917                "scores",
1918                VariantValue::List(vec![
1919                    VariantValue::from("oops"),
1920                    VariantValue::from(Variant::Null),
1921                ]),
1922            )]),
1923            // Row 2: Object missing the scores field entirely
1924            VariantRow::Object(vec![]),
1925            // Row 3: Non-object fallback
1926            VariantRow::Value(VariantValue::from("not an object")),
1927            // Row 4: Top-level Null
1928            VariantRow::Null,
1929        ]);
1930        let list_field = Arc::new(Field::new("item", DataType::Int64, true));
1931        let inner_list_schema = DataType::List(list_field);
1932        let schema = DataType::Struct(Fields::from(vec![Field::new(
1933            "scores",
1934            inner_list_schema.clone(),
1935            true,
1936        )]));
1937
1938        let result = shred_variant(&input, &schema).unwrap();
1939        assert_eq!(result.len(), 5);
1940
1941        // Access base value/typed_value columns
1942        let value_field = result.value_field().unwrap();
1943        let typed_struct = result
1944            .typed_value_field()
1945            .unwrap()
1946            .as_any()
1947            .downcast_ref::<arrow::array::StructArray>()
1948            .unwrap();
1949
1950        // Validate base value fallbacks for non-object rows
1951        assert!(value_field.is_null(0));
1952        assert!(value_field.is_null(1));
1953        assert!(value_field.is_null(2));
1954        assert!(value_field.is_valid(3));
1955        assert_eq!(
1956            Variant::new(result.metadata_field().value(3), value_field.value(3)),
1957            Variant::from("not an object")
1958        );
1959        assert!(value_field.is_null(4));
1960
1961        // Typed struct should only be null for the fallback row
1962        assert!(typed_struct.is_valid(0));
1963        assert!(typed_struct.is_valid(1));
1964        assert!(typed_struct.is_valid(2));
1965        assert!(typed_struct.is_null(3));
1966        assert!(typed_struct.is_null(4));
1967
1968        // Drill into the scores field on the typed struct
1969        let scores_field =
1970            ShreddedVariantFieldArray::try_new(typed_struct.column_by_name("scores").unwrap())
1971                .unwrap();
1972        assert_list_structure_and_elements::<Int64Type, i32>(
1973            &VariantArray::from_parts(
1974                BinaryViewArray::from_iter_values(std::iter::repeat_n(
1975                    EMPTY_VARIANT_METADATA_BYTES,
1976                    scores_field.len(),
1977                )),
1978                Some(scores_field.value_field().unwrap().clone()),
1979                Some(scores_field.typed_value_field().unwrap().clone()),
1980                None,
1981            ),
1982            scores_field.len(),
1983            &[0i32, 2, 4, 4, 4, 4],
1984            &[Some(2), Some(2), None, None, None],
1985            &[
1986                None,
1987                None,
1988                Some(Variant::Null),
1989                Some(Variant::Null),
1990                Some(Variant::Null),
1991            ],
1992            (
1993                &[Some(10), Some(20), None, None],
1994                &[None, None, Some(Variant::from("oops")), Some(Variant::Null)],
1995            ),
1996        );
1997    }
1998
1999    #[test]
2000    fn test_object_different_schemas() {
2001        // Create object with multiple fields
2002        let input = build_variant_array(vec![VariantRow::Object(vec![
2003            ("id", VariantValue::from(123i32)),
2004            ("age", VariantValue::from(25i64)),
2005            ("score", VariantValue::from(95.5f64)),
2006        ])]);
2007
2008        // Test with schema containing only id field
2009        let schema1 = ShreddedSchemaBuilder::default()
2010            .with_path("id", &DataType::Int32)
2011            .build();
2012        let result1 = shred_variant(&input, &schema1).unwrap();
2013        let value_field1 = result1.value_field().unwrap();
2014        assert!(!value_field1.is_null(0)); // should contain {"age": 25, "score": 95.5}
2015
2016        // Test with schema containing id and age fields
2017        let schema2 = ShreddedSchemaBuilder::default()
2018            .with_path("id", &DataType::Int32)
2019            .with_path("age", &DataType::Int64)
2020            .build();
2021        let result2 = shred_variant(&input, &schema2).unwrap();
2022        let value_field2 = result2.value_field().unwrap();
2023        assert!(!value_field2.is_null(0)); // should contain {"score": 95.5}
2024
2025        // Test with schema containing all fields
2026        let schema3 = ShreddedSchemaBuilder::default()
2027            .with_path("id", &DataType::Int32)
2028            .with_path("age", &DataType::Int64)
2029            .with_path("score", &DataType::Float64)
2030            .build();
2031        let result3 = shred_variant(&input, &schema3).unwrap();
2032        let value_field3 = result3.value_field().unwrap();
2033        assert!(value_field3.is_null(0)); // fully shredded, no remaining fields
2034    }
2035
2036    #[test]
2037    fn test_uuid_shredding_in_objects() {
2038        let mock_uuid_1 = Uuid::new_v4();
2039        let mock_uuid_2 = Uuid::new_v4();
2040        let mock_uuid_3 = Uuid::new_v4();
2041
2042        let input = build_variant_array(vec![
2043            // Row 0: Fully shredded object with both UUID fields
2044            VariantRow::Object(vec![
2045                ("id", VariantValue::from(mock_uuid_1)),
2046                ("session_id", VariantValue::from(mock_uuid_2)),
2047            ]),
2048            // Row 1: Partially shredded object - UUID fields plus extra field
2049            VariantRow::Object(vec![
2050                ("id", VariantValue::from(mock_uuid_2)),
2051                ("session_id", VariantValue::from(mock_uuid_3)),
2052                ("name", VariantValue::from("test_user")),
2053            ]),
2054            // Row 2: Missing UUID field (no session_id)
2055            VariantRow::Object(vec![("id", VariantValue::from(mock_uuid_1))]),
2056            // Row 3: Type mismatch - id is UUID but session_id is a string
2057            VariantRow::Object(vec![
2058                ("id", VariantValue::from(mock_uuid_3)),
2059                ("session_id", VariantValue::from("not-a-uuid")),
2060            ]),
2061            // Row 4: Object with non-UUID value in id field
2062            VariantRow::Object(vec![
2063                ("id", VariantValue::from(12345i64)),
2064                ("session_id", VariantValue::from(mock_uuid_1)),
2065            ]),
2066            // Row 5: Null
2067            VariantRow::Null,
2068        ]);
2069
2070        let target_schema = ShreddedSchemaBuilder::default()
2071            .with_path("id", DataType::FixedSizeBinary(16))
2072            .with_path("session_id", DataType::FixedSizeBinary(16))
2073            .build();
2074
2075        let result = shred_variant(&input, &target_schema).unwrap();
2076
2077        assert!(result.value_field().is_some());
2078        assert!(result.typed_value_field().is_some());
2079        assert_eq!(result.len(), 6);
2080
2081        let metadata = result.metadata_field();
2082        let value = result.value_field().unwrap();
2083        let typed_value = result
2084            .typed_value_field()
2085            .unwrap()
2086            .as_any()
2087            .downcast_ref::<arrow::array::StructArray>()
2088            .unwrap();
2089
2090        // Extract id and session_id fields from typed_value struct
2091        let id_field =
2092            ShreddedVariantFieldArray::try_new(typed_value.column_by_name("id").unwrap()).unwrap();
2093        let session_id_field =
2094            ShreddedVariantFieldArray::try_new(typed_value.column_by_name("session_id").unwrap())
2095                .unwrap();
2096
2097        let id_value = id_field
2098            .value_field()
2099            .unwrap()
2100            .as_any()
2101            .downcast_ref::<BinaryViewArray>()
2102            .unwrap();
2103        let id_typed_value = id_field
2104            .typed_value_field()
2105            .unwrap()
2106            .as_any()
2107            .downcast_ref::<FixedSizeBinaryArray>()
2108            .unwrap();
2109        let session_id_value = session_id_field
2110            .value_field()
2111            .unwrap()
2112            .as_any()
2113            .downcast_ref::<BinaryViewArray>()
2114            .unwrap();
2115        let session_id_typed_value = session_id_field
2116            .typed_value_field()
2117            .unwrap()
2118            .as_any()
2119            .downcast_ref::<FixedSizeBinaryArray>()
2120            .unwrap();
2121
2122        // Row 0: Fully shredded - both UUID fields shred successfully
2123        assert!(result.is_valid(0));
2124
2125        assert!(value.is_null(0)); // fully shredded, no remaining fields
2126        assert!(id_value.is_null(0));
2127        assert!(session_id_value.is_null(0));
2128
2129        assert!(typed_value.is_valid(0));
2130        assert!(id_typed_value.is_valid(0));
2131        assert!(session_id_typed_value.is_valid(0));
2132
2133        assert_eq!(id_typed_value.value(0), mock_uuid_1.as_bytes());
2134        assert_eq!(session_id_typed_value.value(0), mock_uuid_2.as_bytes());
2135
2136        // Row 1: Partially shredded - value contains extra name field
2137        assert!(result.is_valid(1));
2138
2139        assert!(value.is_valid(1)); // contains unshredded "name" field
2140        assert!(typed_value.is_valid(1));
2141
2142        assert!(id_value.is_null(1));
2143        assert!(id_typed_value.is_valid(1));
2144        assert_eq!(id_typed_value.value(1), mock_uuid_2.as_bytes());
2145
2146        assert!(session_id_value.is_null(1));
2147        assert!(session_id_typed_value.is_valid(1));
2148        assert_eq!(session_id_typed_value.value(1), mock_uuid_3.as_bytes());
2149
2150        // Verify the value field contains the name field
2151        let row_1_variant = Variant::new(metadata.value(1), value.value(1));
2152        let Variant::Object(obj) = row_1_variant else {
2153            panic!("Expected object");
2154        };
2155
2156        assert_eq!(obj.get("name"), Some(Variant::from("test_user")));
2157
2158        // Row 2: Missing session_id field
2159        assert!(result.is_valid(2));
2160
2161        assert!(value.is_null(2)); // fully shredded, no extra fields
2162        assert!(typed_value.is_valid(2));
2163
2164        assert!(id_value.is_null(2));
2165        assert!(id_typed_value.is_valid(2));
2166        assert_eq!(id_typed_value.value(2), mock_uuid_1.as_bytes());
2167
2168        assert!(session_id_value.is_null(2));
2169        assert!(session_id_typed_value.is_null(2)); // missing field
2170
2171        // Row 3: Type mismatch - session_id is a string, not UUID
2172        assert!(result.is_valid(3));
2173
2174        assert!(value.is_null(3)); // no extra fields
2175        assert!(typed_value.is_valid(3));
2176
2177        assert!(id_value.is_null(3));
2178        assert!(id_typed_value.is_valid(3));
2179        assert_eq!(id_typed_value.value(3), mock_uuid_3.as_bytes());
2180
2181        assert!(session_id_value.is_valid(3)); // type mismatch, stored in value
2182        assert!(session_id_typed_value.is_null(3));
2183        let session_id_variant = Variant::new(metadata.value(3), session_id_value.value(3));
2184        assert_eq!(session_id_variant, Variant::from("not-a-uuid"));
2185
2186        // Row 4: Type mismatch - id is int64, not UUID
2187        assert!(result.is_valid(4));
2188
2189        assert!(value.is_null(4)); // no extra fields
2190        assert!(typed_value.is_valid(4));
2191
2192        assert!(id_value.is_valid(4)); // type mismatch, stored in value
2193        assert!(id_typed_value.is_null(4));
2194        let id_variant = Variant::new(metadata.value(4), id_value.value(4));
2195        assert_eq!(id_variant, Variant::from(12345i64));
2196
2197        assert!(session_id_value.is_null(4));
2198        assert!(session_id_typed_value.is_valid(4));
2199        assert_eq!(session_id_typed_value.value(4), mock_uuid_1.as_bytes());
2200
2201        // Row 5: Null
2202        assert!(result.is_null(5));
2203    }
2204
2205    #[test]
2206    fn test_spec_compliance() {
2207        let input = VariantArray::from_iter(vec![Variant::from(42i64), Variant::from("hello")]);
2208
2209        let result = shred_variant(&input, &DataType::Int64).unwrap();
2210
2211        // Test field access by name (not position)
2212        let inner_struct = result.inner();
2213        assert!(inner_struct.column_by_name("metadata").is_some());
2214        assert!(inner_struct.column_by_name("value").is_some());
2215        assert!(inner_struct.column_by_name("typed_value").is_some());
2216
2217        // Test metadata preservation
2218        assert_eq!(result.metadata_field().len(), input.metadata_field().len());
2219        // The metadata should be the same reference (cheap clone)
2220        // Note: BinaryViewArray doesn't have a .values() method, so we compare the arrays directly
2221        assert_eq!(result.metadata_field().len(), input.metadata_field().len());
2222
2223        // Test output structure correctness
2224        assert_eq!(result.len(), input.len());
2225        assert!(result.value_field().is_some());
2226        assert!(result.typed_value_field().is_some());
2227
2228        // For primitive shredding, verify that value and typed_value are never both non-null
2229        // (This rule applies to primitives; for objects, both can be non-null for partial shredding)
2230        let value_field = result.value_field().unwrap();
2231        let typed_value_field = result
2232            .typed_value_field()
2233            .unwrap()
2234            .as_any()
2235            .downcast_ref::<Int64Array>()
2236            .unwrap();
2237
2238        for i in 0..result.len() {
2239            if !result.is_null(i) {
2240                let value_is_null = value_field.is_null(i);
2241                let typed_value_is_null = typed_value_field.is_null(i);
2242                // For primitive shredding, at least one should be null
2243                assert!(
2244                    value_is_null || typed_value_is_null,
2245                    "Row {}: both value and typed_value are non-null for primitive shredding",
2246                    i
2247                );
2248            }
2249        }
2250    }
2251
2252    #[test]
2253    fn test_variant_schema_builder_simple() {
2254        let shredding_type = ShreddedSchemaBuilder::default()
2255            .with_path("a", &DataType::Int64)
2256            .with_path("b", &DataType::Float64)
2257            .build();
2258
2259        assert_eq!(
2260            shredding_type,
2261            DataType::Struct(Fields::from(vec![
2262                Field::new("a", DataType::Int64, true),
2263                Field::new("b", DataType::Float64, true),
2264            ]))
2265        );
2266    }
2267
2268    #[test]
2269    fn test_variant_schema_builder_nested() {
2270        let shredding_type = ShreddedSchemaBuilder::default()
2271            .with_path("a", &DataType::Int64)
2272            .with_path("b.c", &DataType::Utf8)
2273            .with_path("b.d", &DataType::Float64)
2274            .build();
2275
2276        assert_eq!(
2277            shredding_type,
2278            DataType::Struct(Fields::from(vec![
2279                Field::new("a", DataType::Int64, true),
2280                Field::new(
2281                    "b",
2282                    DataType::Struct(Fields::from(vec![
2283                        Field::new("c", DataType::Utf8, true),
2284                        Field::new("d", DataType::Float64, true),
2285                    ])),
2286                    true
2287                ),
2288            ]))
2289        );
2290    }
2291
2292    #[test]
2293    fn test_variant_schema_builder_with_path_variant_path_arg() {
2294        let path = VariantPath::from_iter([VariantPathElement::from("a.b")]);
2295        let shredding_type = ShreddedSchemaBuilder::default()
2296            .with_path(path, &DataType::Int64)
2297            .build();
2298
2299        match shredding_type {
2300            DataType::Struct(fields) => {
2301                assert_eq!(fields.len(), 1);
2302                assert_eq!(fields[0].name(), "a.b");
2303                assert_eq!(fields[0].data_type(), &DataType::Int64);
2304            }
2305            _ => panic!("expected struct data type"),
2306        }
2307    }
2308
2309    #[test]
2310    fn test_variant_schema_builder_custom_nullability() {
2311        let shredding_type = ShreddedSchemaBuilder::default()
2312            .with_path(
2313                "foo",
2314                Arc::new(Field::new("should_be_renamed", DataType::Utf8, false)),
2315            )
2316            .with_path("bar", (&DataType::Int64, false))
2317            .build();
2318
2319        let DataType::Struct(fields) = shredding_type else {
2320            panic!("expected struct data type");
2321        };
2322
2323        let foo = fields.iter().find(|f| f.name() == "foo").unwrap();
2324        assert_eq!(foo.data_type(), &DataType::Utf8);
2325        assert!(!foo.is_nullable());
2326
2327        let bar = fields.iter().find(|f| f.name() == "bar").unwrap();
2328        assert_eq!(bar.data_type(), &DataType::Int64);
2329        assert!(!bar.is_nullable());
2330    }
2331
2332    #[test]
2333    fn test_variant_schema_builder_with_shred_variant() {
2334        let input = build_variant_array(vec![
2335            VariantRow::Object(vec![
2336                ("time", VariantValue::from(1234567890i64)),
2337                ("hostname", VariantValue::from("server1")),
2338                ("extra", VariantValue::from(42)),
2339            ]),
2340            VariantRow::Object(vec![
2341                ("time", VariantValue::from(9876543210i64)),
2342                ("hostname", VariantValue::from("server2")),
2343            ]),
2344            VariantRow::Null,
2345        ]);
2346
2347        let shredding_type = ShreddedSchemaBuilder::default()
2348            .with_path("time", &DataType::Int64)
2349            .with_path("hostname", &DataType::Utf8)
2350            .build();
2351
2352        let result = shred_variant(&input, &shredding_type).unwrap();
2353
2354        assert_eq!(
2355            result.data_type(),
2356            &DataType::Struct(Fields::from(vec![
2357                Field::new("metadata", DataType::BinaryView, false),
2358                Field::new("value", DataType::BinaryView, true),
2359                Field::new(
2360                    "typed_value",
2361                    DataType::Struct(Fields::from(vec![
2362                        Field::new(
2363                            "hostname",
2364                            DataType::Struct(Fields::from(vec![
2365                                Field::new("value", DataType::BinaryView, true),
2366                                Field::new("typed_value", DataType::Utf8, true),
2367                            ])),
2368                            false,
2369                        ),
2370                        Field::new(
2371                            "time",
2372                            DataType::Struct(Fields::from(vec![
2373                                Field::new("value", DataType::BinaryView, true),
2374                                Field::new("typed_value", DataType::Int64, true),
2375                            ])),
2376                            false,
2377                        ),
2378                    ])),
2379                    true,
2380                ),
2381            ]))
2382        );
2383
2384        assert_eq!(result.len(), 3);
2385        assert!(result.typed_value_field().is_some());
2386
2387        let typed_value = result
2388            .typed_value_field()
2389            .unwrap()
2390            .as_any()
2391            .downcast_ref::<arrow::array::StructArray>()
2392            .unwrap();
2393
2394        let time_field =
2395            ShreddedVariantFieldArray::try_new(typed_value.column_by_name("time").unwrap())
2396                .unwrap();
2397        let hostname_field =
2398            ShreddedVariantFieldArray::try_new(typed_value.column_by_name("hostname").unwrap())
2399                .unwrap();
2400
2401        let time_typed = time_field
2402            .typed_value_field()
2403            .unwrap()
2404            .as_any()
2405            .downcast_ref::<Int64Array>()
2406            .unwrap();
2407        let hostname_typed = hostname_field
2408            .typed_value_field()
2409            .unwrap()
2410            .as_any()
2411            .downcast_ref::<arrow::array::StringArray>()
2412            .unwrap();
2413
2414        // Row 0
2415        assert!(!result.is_null(0));
2416        assert_eq!(time_typed.value(0), 1234567890);
2417        assert_eq!(hostname_typed.value(0), "server1");
2418
2419        // Row 1
2420        assert!(!result.is_null(1));
2421        assert_eq!(time_typed.value(1), 9876543210);
2422        assert_eq!(hostname_typed.value(1), "server2");
2423
2424        // Row 2
2425        assert!(result.is_null(2));
2426    }
2427
2428    #[test]
2429    fn test_variant_schema_builder_conflicting_path() {
2430        let shredding_type = ShreddedSchemaBuilder::default()
2431            .with_path("a", &DataType::Int64)
2432            .with_path("a", &DataType::Float64)
2433            .build();
2434
2435        assert_eq!(
2436            shredding_type,
2437            DataType::Struct(Fields::from(
2438                vec![Field::new("a", DataType::Float64, true),]
2439            ))
2440        );
2441    }
2442
2443    #[test]
2444    fn test_variant_schema_builder_root_path() {
2445        let path = VariantPath::new(vec![]);
2446        let shredding_type = ShreddedSchemaBuilder::default()
2447            .with_path(path, &DataType::Int64)
2448            .build();
2449
2450        assert_eq!(shredding_type, DataType::Int64);
2451    }
2452
2453    #[test]
2454    fn test_variant_schema_builder_empty_path() {
2455        let shredding_type = ShreddedSchemaBuilder::default()
2456            .with_path("", &DataType::Int64)
2457            .build();
2458
2459        assert_eq!(shredding_type, DataType::Int64);
2460    }
2461
2462    #[test]
2463    fn test_variant_schema_builder_default() {
2464        let shredding_type = ShreddedSchemaBuilder::default().build();
2465        assert_eq!(shredding_type, DataType::Null);
2466    }
2467}