Skip to main content

parquet_variant_compute/
shred_variant.rs

1// Licensed to the Apache Software Foundation (ASF) under one
2// or more contributor license agreements.  See the NOTICE file
3// distributed with this work for additional information
4// regarding copyright ownership.  The ASF licenses this file
5// to you under the Apache License, Version 2.0 (the
6// "License"); you may not use this file except in compliance
7// with the License.  You may obtain a copy of the License at
8//
9//   http://www.apache.org/licenses/LICENSE-2.0
10//
11// Unless required by applicable law or agreed to in writing,
12// software distributed under the License is distributed on an
13// "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
14// KIND, either express or implied.  See the License for the
15// specific language governing permissions and limitations
16// under the License.
17
18//! Module for shredding VariantArray with a given schema.
19
20use crate::variant_array::{ShreddedVariantFieldArray, StructArrayBuilder};
21use crate::variant_to_arrow::{
22    ArrayVariantToArrowRowBuilder, PrimitiveVariantToArrowRowBuilder,
23    make_primitive_variant_to_arrow_row_builder,
24};
25use crate::{VariantArray, VariantValueArrayBuilder};
26use arrow::array::{ArrayRef, BinaryViewArray, NullBufferBuilder};
27use arrow::buffer::NullBuffer;
28use arrow::compute::CastOptions;
29use arrow::datatypes::{DataType, Field, FieldRef, Fields, TimeUnit};
30use arrow::error::{ArrowError, Result};
31use indexmap::IndexMap;
32use parquet_variant::{Variant, VariantBuilderExt, VariantPath, VariantPathElement};
33use std::collections::BTreeMap;
34use std::sync::Arc;
35
36/// Shreds the input binary variant using a target shredding schema derived from the requested data type.
37///
38/// For example, requesting `DataType::Int64` would produce an output variant array with the schema:
39///
40/// ```text
41/// {
42///    metadata: BINARY,
43///    value: BINARY,
44///    typed_value: LONG,
45/// }
46/// ```
47///
48/// Similarly, requesting `DataType::Struct` with two integer fields `a` and `b` would produce an
49/// output variant array with the schema:
50///
51/// ```text
52/// {
53///   metadata: BINARY,
54///   value: BINARY,
55///   typed_value: {
56///     a: {
57///       value: BINARY,
58///       typed_value: INT,
59///     },
60///     b: {
61///       value: BINARY,
62///       typed_value: INT,
63///     },
64///   }
65/// }
66/// ```
67///
68/// See [`ShreddedSchemaBuilder`] for a convenient way to build the `as_type`
69/// value passed to this function.
70pub fn shred_variant(array: &VariantArray, as_type: &DataType) -> Result<VariantArray> {
71    shred_variant_with_options(array, as_type, &CastOptions::default())
72}
73
74pub(crate) fn shred_variant_with_options(
75    array: &VariantArray,
76    as_type: &DataType,
77    cast_options: &CastOptions,
78) -> Result<VariantArray> {
79    if array.typed_value_field().is_some() {
80        return Err(ArrowError::InvalidArgumentError(
81            "Input is already shredded".to_string(),
82        ));
83    }
84
85    if array.value_field().is_none() {
86        // all-null case -- nothing to do.
87        return Ok(array.clone());
88    };
89
90    let mut builder = make_variant_to_shredded_variant_arrow_row_builder(
91        as_type,
92        cast_options,
93        array.len(),
94        NullValue::TopLevelVariant,
95    )?;
96    for i in 0..array.len() {
97        if array.is_null(i) {
98            builder.append_null()?;
99        } else {
100            builder.append_value(array.value(i))?;
101        }
102    }
103    let (value, typed_value, nulls) = builder.finish()?;
104    Ok(VariantArray::from_parts(
105        array.metadata_field().clone(),
106        Some(Arc::new(value)),
107        Some(typed_value),
108        nulls,
109    ))
110}
111
112/// Controls how `append_null` is encoded for a shredded `(value, typed_value)` pair.
113///
114/// | Mode | Struct validity bit | `value` | `typed_value` | Meaning |
115/// | --- | --- | --- | --- | --- |
116/// | `TopLevelVariant` | null | NULL | NULL | SQL NULL at the top-level variant row |
117/// | `ObjectField` | non-null | NULL | NULL | Missing object field |
118/// | `ArrayElement` | non-null | `Variant::Null` | NULL | Explicit null array element |
119#[derive(Debug, Clone, Copy, PartialEq, Eq)]
120pub(crate) enum NullValue {
121    TopLevelVariant,
122    ObjectField,
123    ArrayElement,
124}
125
126impl NullValue {
127    fn append_to(
128        self,
129        nulls: &mut NullBufferBuilder,
130        value_builder: &mut VariantValueArrayBuilder,
131    ) {
132        match self {
133            Self::TopLevelVariant => nulls.append_null(),
134            Self::ObjectField | Self::ArrayElement => nulls.append_non_null(),
135        }
136        match self {
137            Self::TopLevelVariant | Self::ObjectField => value_builder.append_null(),
138            Self::ArrayElement => value_builder.append_value(Variant::Null),
139        }
140    }
141}
142
143pub(crate) fn make_variant_to_shredded_variant_arrow_row_builder<'a>(
144    data_type: &'a DataType,
145    cast_options: &'a CastOptions,
146    capacity: usize,
147    null_value: NullValue,
148) -> Result<VariantToShreddedVariantRowBuilder<'a>> {
149    let builder = match data_type {
150        DataType::Struct(fields) => {
151            let typed_value_builder = VariantToShreddedObjectVariantRowBuilder::try_new(
152                fields,
153                cast_options,
154                capacity,
155                null_value,
156            )?;
157            VariantToShreddedVariantRowBuilder::Object(typed_value_builder)
158        }
159        DataType::List(_)
160        | DataType::LargeList(_)
161        | DataType::ListView(_)
162        | DataType::LargeListView(_)
163        | DataType::FixedSizeList(..) => {
164            let typed_value_builder = VariantToShreddedArrayVariantRowBuilder::try_new(
165                data_type,
166                cast_options,
167                capacity,
168                null_value,
169            )?;
170            VariantToShreddedVariantRowBuilder::Array(typed_value_builder)
171        }
172        // Supported shredded primitive types, see Variant shredding spec:
173        // https://github.com/apache/parquet-format/blob/master/VariantShredding.md#shredded-value-types
174        DataType::Boolean
175        | DataType::Int8
176        | DataType::Int16
177        | DataType::Int32
178        | DataType::Int64
179        | DataType::Float32
180        | DataType::Float64
181        | DataType::Decimal32(..)
182        | DataType::Decimal64(..)
183        | DataType::Decimal128(..)
184        | DataType::Date32
185        | DataType::Time64(TimeUnit::Microsecond)
186        | DataType::Timestamp(TimeUnit::Microsecond | TimeUnit::Nanosecond, _)
187        | DataType::Binary
188        | DataType::BinaryView
189        | DataType::LargeBinary
190        | DataType::Utf8
191        | DataType::Utf8View
192        | DataType::LargeUtf8
193        | DataType::FixedSizeBinary(16) // UUID
194        => {
195            let builder =
196                make_primitive_variant_to_arrow_row_builder(data_type, cast_options, capacity)?;
197            let typed_value_builder =
198                VariantToShreddedPrimitiveVariantRowBuilder::new(builder, capacity, null_value);
199            VariantToShreddedVariantRowBuilder::Primitive(typed_value_builder)
200        }
201        DataType::FixedSizeBinary(_) => {
202            return Err(ArrowError::InvalidArgumentError(format!("{data_type} is not a valid variant shredding type. Only FixedSizeBinary(16) for UUID is supported.")))
203        }
204        _ => {
205            return Err(ArrowError::InvalidArgumentError(format!("{data_type} is not a valid variant shredding type")))
206        }
207    };
208    Ok(builder)
209}
210
211pub(crate) enum VariantToShreddedVariantRowBuilder<'a> {
212    Primitive(VariantToShreddedPrimitiveVariantRowBuilder<'a>),
213    Array(VariantToShreddedArrayVariantRowBuilder<'a>),
214    Object(VariantToShreddedObjectVariantRowBuilder<'a>),
215}
216
217impl<'a> VariantToShreddedVariantRowBuilder<'a> {
218    pub fn append_null(&mut self) -> Result<()> {
219        use VariantToShreddedVariantRowBuilder::*;
220        match self {
221            Primitive(b) => b.append_null(),
222            Array(b) => b.append_null(),
223            Object(b) => b.append_null(),
224        }
225    }
226
227    pub fn append_value(&mut self, value: Variant<'_, '_>) -> Result<bool> {
228        use VariantToShreddedVariantRowBuilder::*;
229        match self {
230            Primitive(b) => b.append_value(value),
231            Array(b) => b.append_value(value),
232            Object(b) => b.append_value(value),
233        }
234    }
235
236    pub fn finish(self) -> Result<(BinaryViewArray, ArrayRef, Option<NullBuffer>)> {
237        use VariantToShreddedVariantRowBuilder::*;
238        match self {
239            Primitive(b) => b.finish(),
240            Array(b) => b.finish(),
241            Object(b) => b.finish(),
242        }
243    }
244}
245
246/// A shredded primitive field builder.
247pub(crate) struct VariantToShreddedPrimitiveVariantRowBuilder<'a> {
248    value_builder: VariantValueArrayBuilder,
249    typed_value_builder: PrimitiveVariantToArrowRowBuilder<'a>,
250    nulls: NullBufferBuilder,
251    null_value: NullValue,
252}
253
254impl<'a> VariantToShreddedPrimitiveVariantRowBuilder<'a> {
255    pub(crate) fn new(
256        typed_value_builder: PrimitiveVariantToArrowRowBuilder<'a>,
257        capacity: usize,
258        null_value: NullValue,
259    ) -> Self {
260        Self {
261            value_builder: VariantValueArrayBuilder::new(capacity),
262            typed_value_builder,
263            nulls: NullBufferBuilder::new(capacity),
264            null_value,
265        }
266    }
267
268    fn append_null(&mut self) -> Result<()> {
269        self.null_value
270            .append_to(&mut self.nulls, &mut self.value_builder);
271        self.typed_value_builder.append_null()
272    }
273
274    fn append_value(&mut self, value: Variant<'_, '_>) -> Result<bool> {
275        self.nulls.append_non_null();
276        if self.typed_value_builder.append_value(&value)? {
277            self.value_builder.append_null();
278        } else {
279            self.value_builder.append_value(value);
280        }
281        Ok(true)
282    }
283
284    fn finish(mut self) -> Result<(BinaryViewArray, ArrayRef, Option<NullBuffer>)> {
285        Ok((
286            self.value_builder.build()?,
287            self.typed_value_builder.finish()?,
288            self.nulls.finish(),
289        ))
290    }
291}
292
293pub(crate) struct VariantToShreddedArrayVariantRowBuilder<'a> {
294    value_builder: VariantValueArrayBuilder,
295    typed_value_builder: ArrayVariantToArrowRowBuilder<'a>,
296    nulls: NullBufferBuilder,
297    null_value: NullValue,
298}
299
300impl<'a> VariantToShreddedArrayVariantRowBuilder<'a> {
301    fn try_new(
302        data_type: &'a DataType,
303        cast_options: &'a CastOptions,
304        capacity: usize,
305        null_value: NullValue,
306    ) -> Result<Self> {
307        Ok(Self {
308            value_builder: VariantValueArrayBuilder::new(capacity),
309            typed_value_builder: ArrayVariantToArrowRowBuilder::try_new(
310                data_type,
311                cast_options,
312                capacity,
313                true,
314            )?,
315            nulls: NullBufferBuilder::new(capacity),
316            null_value,
317        })
318    }
319
320    fn append_null(&mut self) -> Result<()> {
321        self.null_value
322            .append_to(&mut self.nulls, &mut self.value_builder);
323        self.typed_value_builder.append_null()?;
324        Ok(())
325    }
326
327    fn append_value(&mut self, variant: Variant<'_, '_>) -> Result<bool> {
328        // If the variant is not an array, typed_value must be null.
329        // If the variant is an array, value must be null.
330        match variant {
331            Variant::List(list) => {
332                self.nulls.append_non_null();
333                self.value_builder.append_null();
334
335                // NOTE: A `FixedSizeList` with incorrect size will hard fail during shredding.
336                self.typed_value_builder
337                    .append_value(&Variant::List(list))?;
338                Ok(true)
339            }
340            other => {
341                self.nulls.append_non_null();
342                self.value_builder.append_value(other);
343                self.typed_value_builder.append_null()?;
344                Ok(false)
345            }
346        }
347    }
348
349    fn finish(mut self) -> Result<(BinaryViewArray, ArrayRef, Option<NullBuffer>)> {
350        Ok((
351            self.value_builder.build()?,
352            self.typed_value_builder.finish()?,
353            self.nulls.finish(),
354        ))
355    }
356}
357
358pub(crate) struct VariantToShreddedObjectVariantRowBuilder<'a> {
359    value_builder: VariantValueArrayBuilder,
360    typed_value_builders: IndexMap<&'a str, VariantToShreddedVariantRowBuilder<'a>>,
361    typed_value_nulls: NullBufferBuilder,
362    nulls: NullBufferBuilder,
363    null_value: NullValue,
364}
365
366impl<'a> VariantToShreddedObjectVariantRowBuilder<'a> {
367    fn try_new(
368        fields: &'a Fields,
369        cast_options: &'a CastOptions,
370        capacity: usize,
371        null_value: NullValue,
372    ) -> Result<Self> {
373        let typed_value_builders = fields.iter().map(|field| {
374            let builder = make_variant_to_shredded_variant_arrow_row_builder(
375                field.data_type(),
376                cast_options,
377                capacity,
378                NullValue::ObjectField,
379            )?;
380            Ok((field.name().as_str(), builder))
381        });
382        Ok(Self {
383            value_builder: VariantValueArrayBuilder::new(capacity),
384            typed_value_builders: typed_value_builders.collect::<Result<_>>()?,
385            typed_value_nulls: NullBufferBuilder::new(capacity),
386            nulls: NullBufferBuilder::new(capacity),
387            null_value,
388        })
389    }
390
391    fn append_null(&mut self) -> Result<()> {
392        self.null_value
393            .append_to(&mut self.nulls, &mut self.value_builder);
394        self.typed_value_nulls.append_null();
395        for (_, typed_value_builder) in &mut self.typed_value_builders {
396            typed_value_builder.append_null()?;
397        }
398        Ok(())
399    }
400
401    fn append_value(&mut self, value: Variant<'_, '_>) -> Result<bool> {
402        let Variant::Object(ref obj) = value else {
403            // Not an object => fall back
404            self.nulls.append_non_null();
405            self.value_builder.append_value(value);
406            self.typed_value_nulls.append_null();
407            for (_, typed_value_builder) in &mut self.typed_value_builders {
408                typed_value_builder.append_null()?;
409            }
410            return Ok(false);
411        };
412
413        // Route the object's fields by name as either shredded or unshredded
414        let mut builder = self.value_builder.builder_ext(value.metadata());
415        let mut object_builder = builder.try_new_object()?;
416        let mut seen = std::collections::HashSet::new();
417        let mut partially_shredded = false;
418        for (field_name, value) in obj.iter() {
419            match self.typed_value_builders.get_mut(field_name) {
420                Some(typed_value_builder) => {
421                    typed_value_builder.append_value(value)?;
422                    seen.insert(field_name);
423                }
424                None => {
425                    object_builder.insert_bytes(field_name, value);
426                    partially_shredded = true;
427                }
428            }
429        }
430
431        // Handle missing fields
432        for (field_name, typed_value_builder) in &mut self.typed_value_builders {
433            if !seen.contains(field_name) {
434                typed_value_builder.append_null()?;
435            }
436        }
437
438        // Only emit the value if it captured any unshredded object fields
439        if partially_shredded {
440            object_builder.finish();
441        } else {
442            drop(object_builder);
443            self.value_builder.append_null();
444        }
445
446        self.typed_value_nulls.append_non_null();
447        self.nulls.append_non_null();
448        Ok(true)
449    }
450
451    fn finish(mut self) -> Result<(BinaryViewArray, ArrayRef, Option<NullBuffer>)> {
452        let mut builder = StructArrayBuilder::new();
453        for (field_name, typed_value_builder) in self.typed_value_builders {
454            let (value, typed_value, nulls) = typed_value_builder.finish()?;
455            let array = ShreddedVariantFieldArray::from_parts(
456                Some(Arc::new(value)),
457                Some(typed_value),
458                nulls,
459            );
460            builder = builder.with_field(field_name, ArrayRef::from(array), false);
461        }
462        if let Some(nulls) = self.typed_value_nulls.finish() {
463            builder = builder.with_nulls(nulls);
464        }
465        Ok((
466            self.value_builder.build()?,
467            Arc::new(builder.build()),
468            self.nulls.finish(),
469        ))
470    }
471}
472
473/// Field configuration captured by the builder (data type + nullability).
474#[derive(Clone)]
475pub struct ShreddingField {
476    data_type: DataType,
477    nullable: bool,
478}
479
480impl ShreddingField {
481    fn new(data_type: DataType, nullable: bool) -> Self {
482        Self {
483            data_type,
484            nullable,
485        }
486    }
487
488    fn null() -> Self {
489        Self::new(DataType::Null, true)
490    }
491}
492
493/// Convenience conversion to allow passing either `FieldRef`, `DataType`, or `(DataType, bool)`.
494pub trait IntoShreddingField {
495    fn into_shredding_field(self) -> ShreddingField;
496}
497
498impl IntoShreddingField for FieldRef {
499    fn into_shredding_field(self) -> ShreddingField {
500        ShreddingField::new(self.data_type().clone(), self.is_nullable())
501    }
502}
503
504impl IntoShreddingField for &DataType {
505    fn into_shredding_field(self) -> ShreddingField {
506        ShreddingField::new(self.clone(), true)
507    }
508}
509
510impl IntoShreddingField for DataType {
511    fn into_shredding_field(self) -> ShreddingField {
512        ShreddingField::new(self, true)
513    }
514}
515
516impl IntoShreddingField for (&DataType, bool) {
517    fn into_shredding_field(self) -> ShreddingField {
518        ShreddingField::new(self.0.clone(), self.1)
519    }
520}
521
522impl IntoShreddingField for (DataType, bool) {
523    fn into_shredding_field(self) -> ShreddingField {
524        ShreddingField::new(self.0, self.1)
525    }
526}
527
528/// Builder for constructing a variant shredding schema.
529///
530/// The builder pattern makes it easy to incrementally define which fields
531/// should be shredded and with what types. Fields are nullable by default; pass
532/// a `(data_type, nullable)` pair or a `FieldRef` to control nullability.
533///
534/// Note: this builder currently only supports struct fields. List support
535/// will be added in the future.
536///
537/// # Example
538///
539/// ```
540/// use std::sync::Arc;
541/// use arrow::datatypes::{DataType, Field, TimeUnit};
542/// use parquet_variant::{VariantPath, VariantPathElement};
543/// use parquet_variant_compute::ShreddedSchemaBuilder;
544///
545/// fn main() -> Result<(), arrow::error::ArrowError> {
546///     // Define the shredding schema using the builder
547///     let shredding_type = ShreddedSchemaBuilder::default()
548///     // store the "time" field as a separate UTC timestamp
549///     .with_path("time", (&DataType::Timestamp(TimeUnit::Nanosecond, Some("UTC".into())), true))?
550///     // store hostname as non-nullable Utf8
551///     .with_path("hostname", (&DataType::Utf8, false))?
552///     // pass a FieldRef directly
553///     .with_path(
554///         "metadata.trace_id",
555///         Arc::new(Field::new("trace_id", DataType::FixedSizeBinary(16), false)),
556///     )?
557///     // field name with a dot: use VariantPath to avoid splitting
558///     .with_path(
559///         VariantPath::from_iter([VariantPathElement::from("metrics.cpu")]),
560///         &DataType::Float64,
561///     )?
562///     .build();
563///    Ok(())
564/// }
565/// // The shredding_type can now be passed to shred_variant:
566/// // let shredded = shred_variant(&input, &shredding_type)?;
567/// ```
568#[derive(Default, Clone)]
569pub struct ShreddedSchemaBuilder {
570    root: VariantSchemaNode,
571}
572
573impl ShreddedSchemaBuilder {
574    /// Create a new empty schema builder.
575    pub fn new() -> Self {
576        Self::default()
577    }
578
579    /// Insert a typed path into the schema using dot notation (or any
580    /// [`VariantPath`] convertible).
581    ///
582    /// The path uses dot notation to specify nested fields.
583    /// For example, "a.b.c" will create a nested structure.
584    ///
585    /// # Arguments
586    ///
587    /// * `path` - Anything convertible to [`VariantPath`] (e.g., a `&str`)
588    /// * `field` - Anything convertible via [`IntoShreddingField`] (e.g. `FieldRef`,
589    ///   `&DataType`, or `(&DataType, bool)` to control nullability)
590    pub fn with_path<'a, P, F>(mut self, path: P, field: F) -> Result<Self>
591    where
592        P: TryInto<VariantPath<'a>>,
593        P::Error: std::fmt::Debug,
594        F: IntoShreddingField,
595    {
596        let path: VariantPath<'a> = path
597            .try_into()
598            .map_err(|e| ArrowError::InvalidArgumentError(format!("{:?}", e)))?;
599        self.root.insert_path(&path, field.into_shredding_field());
600        Ok(self)
601    }
602
603    /// Build the final [`DataType`].
604    pub fn build(self) -> DataType {
605        let shredding_type = self.root.to_shredding_type();
606        match shredding_type {
607            Some(shredding_type) => shredding_type,
608            None => DataType::Null,
609        }
610    }
611}
612
613/// Internal tree node structure for building variant schemas.
614#[derive(Clone)]
615enum VariantSchemaNode {
616    /// A leaf node with a primitive/scalar type (and nullability)
617    Leaf(ShreddingField),
618    /// An inner struct node with nested fields
619    Struct(BTreeMap<String, VariantSchemaNode>),
620}
621
622impl Default for VariantSchemaNode {
623    fn default() -> Self {
624        Self::Leaf(ShreddingField::null())
625    }
626}
627
628impl VariantSchemaNode {
629    /// Insert a path into this node with the given data type.
630    fn insert_path(&mut self, path: &VariantPath<'_>, field: ShreddingField) {
631        self.insert_path_elements(path, field);
632    }
633
634    fn insert_path_elements(&mut self, segments: &[VariantPathElement<'_>], field: ShreddingField) {
635        let Some((head, tail)) = segments.split_first() else {
636            *self = Self::Leaf(field);
637            return;
638        };
639
640        match head {
641            VariantPathElement::Field { name } => {
642                // Ensure this node is a Struct node
643                let children = match self {
644                    Self::Struct(children) => children,
645                    _ => {
646                        *self = Self::Struct(BTreeMap::new());
647                        match self {
648                            Self::Struct(children) => children,
649                            _ => unreachable!(),
650                        }
651                    }
652                };
653
654                children
655                    .entry(name.to_string())
656                    .or_default()
657                    .insert_path_elements(tail, field);
658            }
659            VariantPathElement::Index { .. } => {
660                // List support to be added later; reject for now
661                unreachable!("List paths are not supported yet");
662            }
663        }
664    }
665
666    /// Convert this node to a shredding type.
667    ///
668    /// Returns the [`DataType`] for passing to [`shred_variant`].
669    fn to_shredding_type(&self) -> Option<DataType> {
670        match self {
671            Self::Leaf(field) => Some(field.data_type.clone()),
672            Self::Struct(children) => {
673                let child_fields: Vec<_> = children
674                    .iter()
675                    .filter_map(|(name, child)| child.to_shredding_field(name))
676                    .collect();
677                if child_fields.is_empty() {
678                    None
679                } else {
680                    Some(DataType::Struct(Fields::from(child_fields)))
681                }
682            }
683        }
684    }
685
686    fn to_shredding_field(&self, name: &str) -> Option<FieldRef> {
687        match self {
688            Self::Leaf(field) => Some(Arc::new(Field::new(
689                name,
690                field.data_type.clone(),
691                field.nullable,
692            ))),
693            Self::Struct(_) => self
694                .to_shredding_type()
695                .map(|data_type| Arc::new(Field::new(name, data_type, true))),
696        }
697    }
698}
699
700#[cfg(test)]
701mod tests {
702    use super::*;
703    use crate::VariantArrayBuilder;
704    use crate::variant_array::{binary_array_value, variant_from_arrays_at};
705    use arrow::array::{
706        Array, BinaryViewArray, FixedSizeBinaryArray, FixedSizeListArray, Float64Array,
707        GenericListArray, GenericListViewArray, Int64Array, LargeBinaryArray, LargeStringArray,
708        ListArray, ListLikeArray, OffsetSizeTrait, PrimitiveArray, StringArray, StructArray,
709    };
710    use arrow::datatypes::{
711        ArrowPrimitiveType, DataType, Field, Fields, Int64Type, TimeUnit, UnionFields, UnionMode,
712    };
713    use parquet_variant::{
714        BuilderSpecificState, EMPTY_VARIANT_METADATA_BYTES, ObjectBuilder, ReadOnlyMetadataBuilder,
715        Variant, VariantBuilder, VariantPath, VariantPathElement,
716    };
717    use std::sync::Arc;
718    use uuid::Uuid;
719
720    const NULL_VALUES: [NullValue; 3] = [
721        NullValue::TopLevelVariant,
722        NullValue::ObjectField,
723        NullValue::ArrayElement,
724    ];
725
726    #[derive(Clone)]
727    enum VariantValue<'a> {
728        Value(Variant<'a, 'a>),
729        List(Vec<VariantValue<'a>>),
730        Object(Vec<(&'a str, VariantValue<'a>)>),
731        Null,
732    }
733
734    impl<'a, T> From<T> for VariantValue<'a>
735    where
736        T: Into<Variant<'a, 'a>>,
737    {
738        fn from(value: T) -> Self {
739            Self::Value(value.into())
740        }
741    }
742
743    #[derive(Clone)]
744    enum VariantRow<'a> {
745        Value(VariantValue<'a>),
746        List(Vec<VariantValue<'a>>),
747        Object(Vec<(&'a str, VariantValue<'a>)>),
748        Null,
749    }
750
751    fn build_variant_array(rows: Vec<VariantRow<'static>>) -> VariantArray {
752        let mut builder = VariantArrayBuilder::new(rows.len());
753
754        fn append_variant_value<B: VariantBuilderExt>(builder: &mut B, value: VariantValue) {
755            match value {
756                VariantValue::Value(v) => builder.append_value(v),
757                VariantValue::List(values) => {
758                    let mut list = builder.new_list();
759                    for v in values {
760                        append_variant_value(&mut list, v);
761                    }
762                    list.finish();
763                }
764                VariantValue::Object(fields) => {
765                    let mut object = builder.new_object();
766                    for (name, value) in fields {
767                        append_variant_field(&mut object, name, value);
768                    }
769                    object.finish();
770                }
771                VariantValue::Null => builder.append_null(),
772            }
773        }
774
775        fn append_variant_field<'a, S: BuilderSpecificState>(
776            object: &mut ObjectBuilder<'_, S>,
777            name: &'a str,
778            value: VariantValue<'a>,
779        ) {
780            match value {
781                VariantValue::Value(v) => {
782                    object.insert(name, v);
783                }
784                VariantValue::List(values) => {
785                    let mut list = object.new_list(name);
786                    for v in values {
787                        append_variant_value(&mut list, v);
788                    }
789                    list.finish();
790                }
791                VariantValue::Object(fields) => {
792                    let mut nested = object.new_object(name);
793                    for (field_name, v) in fields {
794                        append_variant_field(&mut nested, field_name, v);
795                    }
796                    nested.finish();
797                }
798                VariantValue::Null => {
799                    object.insert(name, Variant::Null);
800                }
801            }
802        }
803
804        rows.into_iter().for_each(|row| match row {
805            VariantRow::Value(value) => append_variant_value(&mut builder, value),
806            VariantRow::List(values) => {
807                let mut list = builder.new_list();
808                for value in values {
809                    append_variant_value(&mut list, value);
810                }
811                list.finish();
812            }
813            VariantRow::Object(fields) => {
814                let mut object = builder.new_object();
815                for (name, value) in fields {
816                    append_variant_field(&mut object, name, value);
817                }
818                object.finish();
819            }
820            VariantRow::Null => builder.append_null(),
821        });
822        builder.build()
823    }
824
825    trait TestListLikeArray: ListLikeArray {
826        type OffsetSize: OffsetSizeTrait;
827        fn value_offsets(&self) -> Option<&[Self::OffsetSize]>;
828        fn value_size(&self, index: usize) -> Self::OffsetSize;
829    }
830
831    impl<O: OffsetSizeTrait> TestListLikeArray for GenericListArray<O> {
832        type OffsetSize = O;
833
834        fn value_offsets(&self) -> Option<&[Self::OffsetSize]> {
835            Some(GenericListArray::value_offsets(self))
836        }
837
838        fn value_size(&self, index: usize) -> Self::OffsetSize {
839            GenericListArray::value_length(self, index)
840        }
841    }
842
843    impl<O: OffsetSizeTrait> TestListLikeArray for GenericListViewArray<O> {
844        type OffsetSize = O;
845
846        fn value_offsets(&self) -> Option<&[Self::OffsetSize]> {
847            Some(GenericListViewArray::value_offsets(self))
848        }
849
850        fn value_size(&self, index: usize) -> Self::OffsetSize {
851            GenericListViewArray::value_size(self, index)
852        }
853    }
854
855    fn downcast_list_like_array<O: OffsetSizeTrait>(
856        array: &VariantArray,
857    ) -> &dyn TestListLikeArray<OffsetSize = O> {
858        let typed_value = array.typed_value_field().unwrap();
859        if let Some(list) = typed_value.as_any().downcast_ref::<GenericListArray<O>>() {
860            list
861        } else if let Some(list_view) = typed_value
862            .as_any()
863            .downcast_ref::<GenericListViewArray<O>>()
864        {
865            list_view
866        } else {
867            panic!(
868                "Expected list-like typed_value with matching offset type, got {}",
869                typed_value.data_type()
870            );
871        }
872    }
873
874    fn assert_list_structure<O: OffsetSizeTrait>(
875        array: &VariantArray,
876        expected_len: usize,
877        expected_offsets: &[O],
878        expected_sizes: &[Option<O>],
879        expected_fallbacks: &[Option<Variant<'static, 'static>>],
880    ) {
881        assert_eq!(array.len(), expected_len);
882
883        let fallback_value = array.value_field().unwrap();
884        let fallback_metadata = array.metadata_field();
885        let array = downcast_list_like_array::<O>(array);
886
887        assert_eq!(
888            array.value_offsets().unwrap(),
889            expected_offsets,
890            "list offsets mismatch"
891        );
892        assert_eq!(
893            array.len(),
894            expected_sizes.len(),
895            "expected_sizes should match array length"
896        );
897        assert_eq!(
898            array.len(),
899            expected_fallbacks.len(),
900            "expected_fallbacks should match array length"
901        );
902        assert_eq!(
903            array.len(),
904            fallback_value.len(),
905            "fallbacks value field should match array length"
906        );
907
908        // Validate per-row shredding outcomes for the list array
909        for (idx, (expected_size, expected_fallback)) in expected_sizes
910            .iter()
911            .zip(expected_fallbacks.iter())
912            .enumerate()
913        {
914            match expected_size {
915                Some(len) => {
916                    // Successfully shredded: typed list value present, no fallback value
917                    assert!(array.is_valid(idx));
918                    assert_eq!(array.value_size(idx), *len);
919                    assert!(fallback_value.is_null(idx));
920                }
921                None => {
922                    // Unable to shred: typed list value absent, fallback should carry the variant
923                    assert!(array.is_null(idx));
924                    assert_eq!(array.value_size(idx), O::zero());
925                    match expected_fallback {
926                        Some(expected_variant) => {
927                            assert!(fallback_value.is_valid(idx));
928                            let metadata_bytes =
929                                binary_array_value(fallback_metadata.as_ref(), idx).unwrap();
930                            let metadata_bytes =
931                                if fallback_metadata.is_valid(idx) && !metadata_bytes.is_empty() {
932                                    metadata_bytes
933                                } else {
934                                    EMPTY_VARIANT_METADATA_BYTES
935                                };
936                            assert_eq!(
937                                Variant::new(
938                                    metadata_bytes,
939                                    binary_array_value(fallback_value.as_ref(), idx).unwrap()
940                                ),
941                                expected_variant.clone()
942                            );
943                        }
944                        None => {
945                            assert!(fallback_value.is_null(idx));
946                        }
947                    }
948                }
949            }
950        }
951    }
952
953    fn assert_list_structure_and_elements<T: ArrowPrimitiveType, O: OffsetSizeTrait>(
954        array: &VariantArray,
955        expected_len: usize,
956        expected_offsets: &[O],
957        expected_sizes: &[Option<O>],
958        expected_fallbacks: &[Option<Variant<'static, 'static>>],
959        expected_shredded_elements: (&[Option<T::Native>], &[Option<Variant<'static, 'static>>]),
960    ) {
961        assert_list_structure(
962            array,
963            expected_len,
964            expected_offsets,
965            expected_sizes,
966            expected_fallbacks,
967        );
968        let array = downcast_list_like_array::<O>(array);
969
970        // Validate the shredded state of list elements (typed values and fallbacks)
971        let (expected_values, expected_fallbacks) = expected_shredded_elements;
972        assert_eq!(
973            expected_values.len(),
974            expected_fallbacks.len(),
975            "expected_values and expected_fallbacks should be aligned"
976        );
977
978        // Validate the shredded primitive values for list elements
979        let element_array = ShreddedVariantFieldArray::try_new(array.values().as_ref()).unwrap();
980        let element_values = element_array
981            .typed_value_field()
982            .unwrap()
983            .as_any()
984            .downcast_ref::<PrimitiveArray<T>>()
985            .unwrap();
986        assert_eq!(element_values.len(), expected_values.len());
987        for (idx, expected_value) in expected_values.iter().enumerate() {
988            match expected_value {
989                Some(value) => {
990                    assert!(element_values.is_valid(idx));
991                    assert_eq!(element_values.value(idx), *value);
992                }
993                None => assert!(element_values.is_null(idx)),
994            }
995        }
996
997        // Validate fallback variants for list elements that could not be shredded
998        let element_fallbacks = element_array.value_field().unwrap();
999        assert_eq!(element_fallbacks.len(), expected_fallbacks.len());
1000        for (idx, expected_fallback) in expected_fallbacks.iter().enumerate() {
1001            match expected_fallback {
1002                Some(expected_variant) => {
1003                    assert!(element_fallbacks.is_valid(idx));
1004                    assert_eq!(
1005                        Variant::new(
1006                            EMPTY_VARIANT_METADATA_BYTES,
1007                            binary_array_value(element_fallbacks.as_ref(), idx).unwrap()
1008                        ),
1009                        expected_variant.clone()
1010                    );
1011                }
1012                None => assert!(element_fallbacks.is_null(idx)),
1013            }
1014        }
1015    }
1016
1017    fn assert_append_null_mode_value_and_struct_nulls(
1018        mode: NullValue,
1019        value: &BinaryViewArray,
1020        nulls: Option<&arrow::buffer::NullBuffer>,
1021    ) {
1022        if mode == NullValue::TopLevelVariant {
1023            assert!(nulls.is_some_and(|n| n.is_null(0)));
1024        } else {
1025            assert!(nulls.is_none());
1026        }
1027
1028        if mode == NullValue::ArrayElement {
1029            assert!(value.is_valid(0));
1030            assert_eq!(
1031                Variant::new(EMPTY_VARIANT_METADATA_BYTES, value.value(0)),
1032                Variant::Null
1033            );
1034        } else {
1035            assert!(value.is_null(0));
1036        }
1037    }
1038
/// For every null mode, `append_null` on an `Int64` builder must leave `typed_value` null and
/// set `value` / struct nulls as that mode prescribes.
#[test]
fn test_append_null_mode_semantics_primitive_builder() {
    let options = arrow::compute::CastOptions::default();

    for mode in NULL_VALUES {
        let mut builder =
            make_variant_to_shredded_variant_arrow_row_builder(&DataType::Int64, &options, 1, mode)
                .unwrap();
        builder.append_null().unwrap();

        let (value, typed_value, nulls) = builder.finish().unwrap();
        let typed_value = typed_value.as_any().downcast_ref::<Int64Array>().unwrap();

        assert!(typed_value.is_null(0));
        assert_append_null_mode_value_and_struct_nulls(mode, &value, nulls.as_ref());
    }
}
1067
/// For every null mode, `append_null` on a `List<Int64>` builder must produce one null list
/// row with no child elements, plus the mode-specific `value` / struct nulls.
#[test]
fn test_append_null_mode_semantics_array_builder() {
    let options = arrow::compute::CastOptions::default();
    let item_field = Arc::new(Field::new("item", DataType::Int64, true));
    let list_type = DataType::List(item_field);

    for mode in NULL_VALUES {
        let mut builder =
            make_variant_to_shredded_variant_arrow_row_builder(&list_type, &options, 1, mode)
                .unwrap();
        builder.append_null().unwrap();

        let (value, typed_value, nulls) = builder.finish().unwrap();
        assert_append_null_mode_value_and_struct_nulls(mode, &value, nulls.as_ref());

        let list = typed_value.as_any().downcast_ref::<ListArray>().unwrap();
        assert_eq!(list.len(), 1);
        assert!(list.is_null(0));
        assert_eq!(list.values().len(), 0);
    }
}
1092
/// For every null mode, `append_null` on a struct builder must produce one null struct row
/// whose shredded child fields are null in both `value` and `typed_value`.
#[test]
fn test_append_null_mode_semantics_object_builder() {
    let options = arrow::compute::CastOptions::default();
    let object_fields = vec![
        Field::new("id", DataType::Int64, true),
        Field::new("name", DataType::Utf8, true),
    ];
    let object_type = DataType::Struct(Fields::from(object_fields));

    for mode in NULL_VALUES {
        let mut builder =
            make_variant_to_shredded_variant_arrow_row_builder(&object_type, &options, 1, mode)
                .unwrap();
        builder.append_null().unwrap();

        let (value, typed_value, nulls) = builder.finish().unwrap();
        assert_append_null_mode_value_and_struct_nulls(mode, &value, nulls.as_ref());

        let typed_struct = typed_value
            .as_any()
            .downcast_ref::<arrow::array::StructArray>()
            .unwrap();
        assert_eq!(typed_struct.len(), 1);
        assert!(typed_struct.is_null(0));

        // Each child column is itself a shredded field and must be null on both sides.
        for field_name in ["id", "name"] {
            let column = typed_struct.column_by_name(field_name).unwrap();
            let shredded = ShreddedVariantFieldArray::try_new(column).unwrap();
            assert!(shredded.value_field().unwrap().is_null(0));
            assert!(shredded.typed_value_field().unwrap().is_null(0));
        }
    }
}
1131
/// Shredding an input that already carries a `typed_value` column must be rejected with
/// `ArrowError::InvalidArgumentError`.
#[test]
fn test_already_shredded_input_error() {
    // Borrow metadata/value from a well-formed unshredded array, then attach a typed_value
    // column to fabricate an already-shredded input.
    let unshredded = VariantArray::from_iter(vec![Some(Variant::from("test"))]);
    let metadata = unshredded.metadata_field().clone();
    let value = unshredded.value_field().unwrap().clone();
    let typed_value: ArrayRef = Arc::new(Int64Array::from(vec![42]));

    let already_shredded =
        VariantArray::from_parts(metadata, Some(value), Some(typed_value), None);

    let err = shred_variant(&already_shredded, &DataType::Int64).unwrap_err();
    assert!(matches!(err, ArrowError::InvalidArgumentError(_)));
}
1150
/// An input with no `value` column (the all-null case) must shred to an output that has
/// neither a `value` nor a `typed_value` column.
#[test]
fn test_all_null_input() {
    // Use the canonical empty metadata (header, dictionary size 0, single offset 0) so the
    // fixture is well-formed variant metadata rather than a truncated header.
    let metadata = Arc::new(BinaryViewArray::from_iter_values([
        EMPTY_VARIANT_METADATA_BYTES,
    ]));
    let all_null_array = VariantArray::from_parts(metadata, None, None, None);
    let result = shred_variant(&all_null_array, &DataType::Int64).unwrap();

    // Should return array with no value/typed_value fields
    assert!(result.value_field().is_none());
    assert!(result.typed_value_field().is_none());
}
1162
/// `FixedSizeBinary` is only accepted with width 16 (UUID); any other width must fail with a
/// descriptive error.
#[test]
fn test_invalid_fixed_size_binary_shredding() {
    let uuid = Uuid::new_v4();
    let input = VariantArray::from_iter([Some(Variant::from(uuid)), None]);

    let expected_message = "Invalid argument error: FixedSizeBinary(17) is not a valid variant shredding type. Only FixedSizeBinary(16) for UUID is supported.";

    let error = shred_variant(&input, &DataType::FixedSizeBinary(17)).unwrap_err();
    assert_eq!(error.to_string(), expected_message);
}
1177
/// UUID variants shred into a `FixedSizeBinary(16)` column tagged with the UUID extension
/// type; null and non-UUID rows leave `typed_value` null.
#[test]
fn test_uuid_shredding() {
    let mock_uuid_1 = Uuid::new_v4();
    let mock_uuid_2 = Uuid::new_v4();

    let input = VariantArray::from_iter([
        Some(Variant::from(mock_uuid_1)),
        None,
        Some(Variant::from(false)),
        Some(Variant::from(mock_uuid_2)),
    ]);
    // Expected typed_value per row: `None` means the typed column must be null there.
    let expected_rows = [Some(mock_uuid_1), None, None, Some(mock_uuid_2)];

    let shredded = shred_variant(&input, &DataType::FixedSizeBinary(16)).unwrap();

    let typed_value_field = shredded.inner().field_by_name("typed_value").unwrap();
    assert!(typed_value_field.has_valid_extension_type::<arrow_schema::extension::Uuid>());

    // Inspect the concrete typed_value array to confirm the UUID bytes round-trip.
    let uuids = shredded
        .typed_value_field()
        .unwrap()
        .as_any()
        .downcast_ref::<FixedSizeBinaryArray>()
        .unwrap();
    assert_eq!(uuids.len(), 4);

    for (row, expected) in expected_rows.iter().enumerate() {
        match expected {
            Some(uuid) => {
                assert!(!uuids.is_null(row));
                let actual: &[u8] = uuids.value(row);
                assert_eq!(actual, uuid.as_bytes());
            }
            None => assert!(uuids.is_null(row)),
        }
    }
}
1219
/// A UUID nested inside an object field is shredded as `FixedSizeBinary(16)` and the leaf
/// field carries the UUID extension type.
#[test]
fn test_uuid_nested_shredding() {
    let uuid = Uuid::new_v4();
    let rows = vec![VariantRow::Object(vec![("id", VariantValue::from(uuid))])];
    let input = build_variant_array(rows);

    let schema = ShreddedSchemaBuilder::default()
        .with_path("id", DataType::FixedSizeBinary(16))
        .unwrap()
        .build();

    let shredded = shred_variant(&input, &schema).unwrap();

    let typed_value = shredded.typed_value_field().unwrap();
    let object = typed_value.as_any().downcast_ref::<StructArray>().unwrap();
    let id_column = object.column_by_name("id").unwrap();
    let id_field = ShreddedVariantFieldArray::try_new(id_column).unwrap();

    // Extension metadata is attached to the field rather than the array, so look it up on the
    // inner struct of the shredded `id` field.
    let leaf = id_field.inner().field_by_name("typed_value").unwrap();

    assert_eq!(leaf.data_type(), &DataType::FixedSizeBinary(16));
    assert!(leaf.has_valid_extension_type::<arrow_schema::extension::Uuid>());
}
1245
/// Exercises `Int64` shredding over a mixed input: matching ints, a widening int8, a string,
/// a variant null, and an array-level null.
#[test]
fn test_primitive_shredding_comprehensive() {
    let rows = vec![
        Some(Variant::from(42i64)),   // row 0: shreds
        Some(Variant::from("hello")), // row 1: string cannot shred
        Some(Variant::from(100i64)),  // row 2: shreds
        None,                         // row 3: array-level null
        Some(Variant::Null),          // row 4: variant null
        Some(Variant::from(3i8)),     // row 5: shreds after int8 -> int64 widening
    ];
    let input = VariantArray::from_iter(rows);

    let shredded = shred_variant(&input, &DataType::Int64).unwrap();

    let metadata = shredded.metadata_field();
    let value = shredded.value_field().unwrap();
    let typed_value = shredded
        .typed_value_field()
        .unwrap()
        .as_any()
        .downcast_ref::<Int64Array>()
        .unwrap();

    assert_eq!(shredded.len(), 6);

    // Row 0: integer lands in typed_value, fallback stays empty.
    assert!(shredded.is_valid(0));
    assert!(value.is_null(0));
    assert!(typed_value.is_valid(0));
    assert_eq!(typed_value.value(0), 42);

    // Row 1: string is kept verbatim in the fallback column.
    assert!(shredded.is_valid(1));
    assert!(value.is_valid(1));
    assert!(typed_value.is_null(1));
    let row_1 = variant_from_arrays_at(metadata, value, 1).unwrap();
    assert_eq!(row_1, Variant::from("hello"));

    // Row 2: integer lands in typed_value.
    assert!(shredded.is_valid(2));
    assert!(value.is_null(2));
    assert_eq!(typed_value.value(2), 100);

    // Row 3: array-level null stays null in the output.
    assert!(shredded.is_null(3));

    // Row 4: a variant null is a value, not an integer, so it is stored in the fallback.
    assert!(shredded.is_valid(4));
    assert!(value.is_valid(4));
    let row_4 = variant_from_arrays_at(metadata, value, 4).unwrap();
    assert_eq!(row_4, Variant::Null);
    assert!(typed_value.is_null(4));

    // Row 5: int8 widens to int64 and lands in typed_value.
    assert!(shredded.is_valid(5));
    assert!(value.is_null(5));
    assert!(typed_value.is_valid(5));
    assert_eq!(typed_value.value(5), 3);
}
1311
/// Shreds the same int / float / string input against `Int32` and `Float64` targets and checks
/// which rows convert.
#[test]
fn test_primitive_different_target_types() {
    let input = VariantArray::from_iter(vec![
        Variant::from(42i32),
        Variant::from(3.15f64),
        Variant::from("not_a_number"),
    ]);

    // Int32 target: the int and the float both land in typed_value; the string does not.
    {
        let shredded = shred_variant(&input, &DataType::Int32).unwrap();
        let ints = shredded
            .typed_value_field()
            .unwrap()
            .as_any()
            .downcast_ref::<arrow::array::Int32Array>()
            .unwrap();
        assert_eq!(ints.value(0), 42);
        assert_eq!(ints.value(1), 3);
        assert!(ints.is_null(2));
    }

    // Float64 target: the int converts to a float; the string does not.
    {
        let shredded = shred_variant(&input, &DataType::Float64).unwrap();
        let floats = shredded
            .typed_value_field()
            .unwrap()
            .as_any()
            .downcast_ref::<Float64Array>()
            .unwrap();
        assert_eq!(floats.value(0), 42.0);
        assert_eq!(floats.value(1), 3.15);
        assert!(floats.is_null(2));
    }
}
1344
/// Shredding to `LargeUtf8`: strings land in `typed_value`; everything else falls back to
/// `value`.
#[test]
fn test_largeutf8_shredding() {
    let rows = vec![
        Some(Variant::from("hello")),
        Some(Variant::from(42i64)),
        None,
        Some(Variant::Null),
        Some(Variant::from("world")),
    ];
    let input = VariantArray::from_iter(rows);

    let shredded = shred_variant(&input, &DataType::LargeUtf8).unwrap();
    let metadata = shredded.metadata_field();
    let fallback = shredded.value_field().unwrap();
    let strings = shredded
        .typed_value_field()
        .unwrap()
        .as_any()
        .downcast_ref::<LargeStringArray>()
        .unwrap();

    assert_eq!(shredded.len(), 5);

    // Row 0: string is shredded.
    assert!(!shredded.is_null(0));
    assert!(!fallback.is_valid(0));
    assert_eq!(strings.value(0), "hello");

    // Row 1: integer is preserved in the fallback column.
    assert!(!shredded.is_null(1));
    assert!(!fallback.is_null(1));
    assert!(!strings.is_valid(1));
    let row_1 = variant_from_arrays_at(metadata, fallback, 1).unwrap();
    assert_eq!(row_1, Variant::from(42i64));

    // Row 2: top-level null leaves every column null.
    assert!(!shredded.is_valid(2));
    assert!(!fallback.is_valid(2));
    assert!(!strings.is_valid(2));

    // Row 3: variant null is preserved in the fallback column.
    assert!(!shredded.is_null(3));
    assert!(!fallback.is_null(3));
    assert!(!strings.is_valid(3));
    let row_3 = variant_from_arrays_at(metadata, fallback, 3).unwrap();
    assert_eq!(row_3, Variant::Null);

    // Row 4: string is shredded.
    assert!(!shredded.is_null(4));
    assert!(!fallback.is_valid(4));
    assert_eq!(strings.value(4), "world");
}
1400
/// Shredding to `LargeBinary`: binary values land in `typed_value`; everything else falls back
/// to `value`.
#[test]
fn test_largebinary_shredding() {
    let rows = vec![
        Some(Variant::from(&b"\x00\x01\x02"[..])),
        Some(Variant::from("not_binary")),
        None,
        Some(Variant::Null),
        Some(Variant::from(&b"\xff\xaa"[..])),
    ];
    let input = VariantArray::from_iter(rows);

    let shredded = shred_variant(&input, &DataType::LargeBinary).unwrap();
    let metadata = shredded.metadata_field();
    let fallback = shredded.value_field().unwrap();
    let binaries = shredded
        .typed_value_field()
        .unwrap()
        .as_any()
        .downcast_ref::<LargeBinaryArray>()
        .unwrap();

    assert_eq!(shredded.len(), 5);

    // Row 0: binary is shredded.
    assert!(!shredded.is_null(0));
    assert!(!fallback.is_valid(0));
    assert_eq!(binaries.value(0), &[0x00, 0x01, 0x02]);

    // Row 1: string is preserved in the fallback column.
    assert!(!shredded.is_null(1));
    assert!(!fallback.is_null(1));
    assert!(!binaries.is_valid(1));
    let row_1 = variant_from_arrays_at(metadata, fallback, 1).unwrap();
    assert_eq!(row_1, Variant::from("not_binary"));

    // Row 2: top-level null leaves every column null.
    assert!(!shredded.is_valid(2));
    assert!(!fallback.is_valid(2));
    assert!(!binaries.is_valid(2));

    // Row 3: variant null is preserved in the fallback column.
    assert!(!shredded.is_null(3));
    assert!(!fallback.is_null(3));
    assert!(!binaries.is_valid(3));
    let row_3 = variant_from_arrays_at(metadata, fallback, 3).unwrap();
    assert_eq!(row_3, Variant::Null);

    // Row 4: binary is shredded.
    assert!(!shredded.is_null(4));
    assert!(!fallback.is_valid(4));
    assert_eq!(binaries.value(4), &[0xff, 0xaa]);
}
1456
/// Each of the listed Arrow types is not accepted as a shredding target and must be rejected
/// with `ArrowError::InvalidArgumentError`.
#[test]
fn test_invalid_shredded_types_rejected() {
    let input = VariantArray::from_iter([Variant::from(42)]);

    let union_type = DataType::Union(
        UnionFields::from_fields(vec![
            Field::new("int_field", DataType::Int32, false),
            Field::new("str_field", DataType::Utf8, true),
        ]),
        UnionMode::Dense,
    );
    let map_type = DataType::Map(
        Arc::new(Field::new(
            "entries",
            DataType::Struct(Fields::from(vec![
                Field::new("key", DataType::Utf8, false),
                Field::new("value", DataType::Int32, true),
            ])),
            false,
        )),
        false,
    );
    let dictionary_type =
        DataType::Dictionary(Box::new(DataType::Int32), Box::new(DataType::Utf8));
    let run_end_encoded_type = DataType::RunEndEncoded(
        Arc::new(Field::new("run_ends", DataType::Int32, false)),
        Arc::new(Field::new("values", DataType::Utf8, true)),
    );

    let invalid_types = vec![
        DataType::UInt8,
        DataType::Float16,
        DataType::Decimal256(38, 10),
        DataType::Date64,
        DataType::Time32(TimeUnit::Second),
        DataType::Time64(TimeUnit::Nanosecond),
        DataType::Timestamp(TimeUnit::Millisecond, None),
        DataType::FixedSizeBinary(17),
        union_type,
        map_type,
        dictionary_type,
        run_end_encoded_type,
    ];

    for data_type in &invalid_types {
        let err = shred_variant(&input, data_type).unwrap_err();
        let is_invalid_argument = matches!(err, ArrowError::InvalidArgumentError(_));
        assert!(
            is_invalid_argument,
            "expected InvalidArgumentError for {data_type:?}, got {err:?}"
        );
    }
}
1505
/// Shredding to `List<Int64>`: covers a fully shredded list, a list with element fallbacks,
/// a non-list row, an array-level null, and an empty list.
#[test]
fn test_array_shredding_as_list() {
    let rows = vec![
        // Row 0: every element is an int, so the whole list shreds.
        VariantRow::List(vec![
            VariantValue::from(1i64),
            VariantValue::from(2i64),
            VariantValue::from(3i64),
        ]),
        // Row 1: mixed element types, so some elements use the element-level fallback.
        VariantRow::List(vec![
            VariantValue::from(1i64),
            VariantValue::from("two"),
            VariantValue::from(Variant::Null),
        ]),
        // Row 2: not a list, so the whole row falls back.
        VariantRow::Value(VariantValue::from("not a list")),
        // Row 3: array-level null.
        VariantRow::Null,
        // Row 4: empty list (zero-length offsets).
        VariantRow::List(vec![]),
    ];
    let input = build_variant_array(rows);

    let item_field = Arc::new(Field::new("item", DataType::Int64, true));
    let shredded = shred_variant(&input, &DataType::List(item_field)).unwrap();
    assert_eq!(shredded.len(), 5);

    let offsets: &[i32] = &[0, 3, 6, 6, 6, 6];
    let sizes: &[Option<i32>] = &[Some(3), Some(3), None, None, Some(0)];
    let row_fallbacks: &[Option<Variant<'static, 'static>>] =
        &[None, None, Some(Variant::from("not a list")), None, None];
    let element_values: &[Option<i64>] = &[Some(1), Some(2), Some(3), Some(1), None, None];
    let element_fallbacks: &[Option<Variant<'static, 'static>>] = &[
        None,
        None,
        None,
        None,
        Some(Variant::from("two")),
        Some(Variant::Null),
    ];

    assert_list_structure_and_elements::<Int64Type, i32>(
        &shredded,
        5,
        offsets,
        sizes,
        row_fallbacks,
        (element_values, element_fallbacks),
    );
}
1551
/// Shredding to `LargeList<Int64>`: a shredded list, a non-list fallback row, and an empty
/// list, checked with 64-bit offsets.
#[test]
fn test_array_shredding_as_large_list() {
    let rows = vec![
        // Row 0: list of ints shreds.
        VariantRow::List(vec![VariantValue::from(1i64), VariantValue::from(2i64)]),
        // Row 1: not a list, so the whole row falls back.
        VariantRow::Value(VariantValue::from("not a list")),
        // Row 2: empty list.
        VariantRow::List(vec![]),
    ];
    let input = build_variant_array(rows);

    let item_field = Arc::new(Field::new("item", DataType::Int64, true));
    let shredded = shred_variant(&input, &DataType::LargeList(item_field)).unwrap();
    assert_eq!(shredded.len(), 3);

    let offsets: &[i64] = &[0, 2, 2, 2];
    let sizes: &[Option<i64>] = &[Some(2), None, Some(0)];
    let row_fallbacks: &[Option<Variant<'static, 'static>>] =
        &[None, Some(Variant::from("not a list")), None];
    let element_values: &[Option<i64>] = &[Some(1), Some(2)];
    let element_fallbacks: &[Option<Variant<'static, 'static>>] = &[None, None];

    assert_list_structure_and_elements::<Int64Type, i64>(
        &shredded,
        3,
        offsets,
        sizes,
        row_fallbacks,
        (element_values, element_fallbacks),
    );
}
1575
/// Shredding to `ListView<Int64>`: same scenarios as the `List` test, checked against the
/// list-view offsets (one offset per row).
#[test]
fn test_array_shredding_as_list_view() {
    let rows = vec![
        // Row 0: plain list of ints.
        VariantRow::List(vec![
            VariantValue::from(1i64),
            VariantValue::from(2i64),
            VariantValue::from(3i64),
        ]),
        // Row 1: mixed element types, so some elements use the element-level fallback.
        VariantRow::List(vec![
            VariantValue::from(1i64),
            VariantValue::from("two"),
            VariantValue::from(Variant::Null),
        ]),
        // Row 2: not a list, so the whole row falls back.
        VariantRow::Value(VariantValue::from("not a list")),
        // Row 3: top-level null.
        VariantRow::Null,
        // Row 4: empty list.
        VariantRow::List(vec![]),
    ];
    let input = build_variant_array(rows);

    let item_field = Arc::new(Field::new("item", DataType::Int64, true));
    let shredded = shred_variant(&input, &DataType::ListView(item_field)).unwrap();
    assert_eq!(shredded.len(), 5);

    let offsets: &[i32] = &[0, 3, 6, 6, 6];
    let sizes: &[Option<i32>] = &[Some(3), Some(3), None, None, Some(0)];
    let row_fallbacks: &[Option<Variant<'static, 'static>>] =
        &[None, None, Some(Variant::from("not a list")), None, None];
    let element_values: &[Option<i64>] = &[Some(1), Some(2), Some(3), Some(1), None, None];
    let element_fallbacks: &[Option<Variant<'static, 'static>>] = &[
        None,
        None,
        None,
        None,
        Some(Variant::from("two")),
        Some(Variant::Null),
    ];

    assert_list_structure_and_elements::<Int64Type, i32>(
        &shredded,
        5,
        offsets,
        sizes,
        row_fallbacks,
        (element_values, element_fallbacks),
    );
}
1621
/// Shredding to `LargeListView<Int64>`: a shredded list, a non-list fallback row, and an empty
/// list, checked with 64-bit list-view offsets.
#[test]
fn test_array_shredding_as_large_list_view() {
    let rows = vec![
        // Row 0: list of ints shreds.
        VariantRow::List(vec![VariantValue::from(1i64), VariantValue::from(2i64)]),
        // Row 1: not a list, so the whole row falls back.
        VariantRow::Value(VariantValue::from("fallback")),
        // Row 2: empty list.
        VariantRow::List(vec![]),
    ];
    let input = build_variant_array(rows);

    let item_field = Arc::new(Field::new("item", DataType::Int64, true));
    let shredded = shred_variant(&input, &DataType::LargeListView(item_field)).unwrap();
    assert_eq!(shredded.len(), 3);

    let offsets: &[i64] = &[0, 2, 2];
    let sizes: &[Option<i64>] = &[Some(2), None, Some(0)];
    let row_fallbacks: &[Option<Variant<'static, 'static>>] =
        &[None, Some(Variant::from("fallback")), None];
    let element_values: &[Option<i64>] = &[Some(1), Some(2)];
    let element_fallbacks: &[Option<Variant<'static, 'static>>] = &[None, None];

    assert_list_structure_and_elements::<Int64Type, i64>(
        &shredded,
        3,
        offsets,
        sizes,
        row_fallbacks,
        (element_values, element_fallbacks),
    );
}
1646
1647    #[test]
1648    fn test_array_shredding_as_fixed_size_list() {
1649        let input = build_variant_array(vec![
1650            VariantRow::List(vec![VariantValue::from(1i64), VariantValue::from(2i64)]),
1651            VariantRow::Value(VariantValue::from("This should not be shredded")),
1652            VariantRow::List(vec![VariantValue::from(3i64), VariantValue::from(4i64)]),
1653        ]);
1654
1655        let list_schema =
1656            DataType::FixedSizeList(Arc::new(Field::new("item", DataType::Int64, true)), 2);
1657        let result = shred_variant(&input, &list_schema).unwrap();
1658        assert_eq!(result.len(), 3);
1659
1660        // The first row should be shredded, so the `value` field should be null and the
1661        // `typed_value` field should contain the list
1662        assert!(result.is_valid(0));
1663        assert!(result.value_field().unwrap().is_null(0));
1664        assert!(result.typed_value_field().unwrap().is_valid(0));
1665
1666        // The second row should not be shredded because the provided schema for shredding did not
1667        // match. Hence, the `value` field should contain the raw value and the `typed_value` field
1668        // should be null.
1669        assert!(result.is_valid(1));
1670        assert!(result.value_field().unwrap().is_valid(1));
1671        assert!(result.typed_value_field().unwrap().is_null(1));
1672
1673        // The third row should be shredded, so the `value` field should be null and the
1674        // `typed_value` field should contain the list
1675        assert!(result.is_valid(2));
1676        assert!(result.value_field().unwrap().is_null(2));
1677        assert!(result.typed_value_field().unwrap().is_valid(2));
1678
1679        let typed_value = result.typed_value_field().unwrap();
1680        let fixed_size_list = typed_value
1681            .as_any()
1682            .downcast_ref::<FixedSizeListArray>()
1683            .expect("Expected FixedSizeListArray");
1684
1685        // Verify that typed value is `FixedSizeList`.
1686        assert_eq!(fixed_size_list.len(), 3);
1687        assert_eq!(fixed_size_list.value_length(), 2);
1688
1689        // Verify that the first entry in the `FixedSizeList` contains the expected value.
1690        let val0 = fixed_size_list.value(0);
1691        let val0_struct = val0.as_any().downcast_ref::<StructArray>().unwrap();
1692        let val0_typed = val0_struct.column_by_name("typed_value").unwrap();
1693        let val0_ints = val0_typed.as_any().downcast_ref::<Int64Array>().unwrap();
1694        assert_eq!(val0_ints.values(), &[1i64, 2i64]);
1695
1696        // Verify that second entry in the `FixedSizeList` cannot be shredded hence the value is
1697        // invalid.
1698        assert!(fixed_size_list.is_null(1));
1699
1700        // Verify that the third entry in the `FixedSizeList` contains the expected value.
1701        let val2 = fixed_size_list.value(2);
1702        let val2_struct = val2.as_any().downcast_ref::<StructArray>().unwrap();
1703        let val2_typed = val2_struct.column_by_name("typed_value").unwrap();
1704        let val2_ints = val2_typed.as_any().downcast_ref::<Int64Array>().unwrap();
1705        assert_eq!(val2_ints.values(), &[3i64, 4i64]);
1706    }
1707
1708    #[test]
1709    fn test_array_shredding_as_fixed_size_list_wrong_size() {
1710        let input = build_variant_array(vec![VariantRow::List(vec![
1711            VariantValue::from(1i64),
1712            VariantValue::from(2i64),
1713            VariantValue::from(3i64),
1714        ])]);
1715        let list_schema =
1716            DataType::FixedSizeList(Arc::new(Field::new("item", DataType::Int64, true)), 2);
1717
1718        let err = shred_variant(&input, &list_schema).unwrap_err();
1719        assert!(
1720            err.to_string()
1721                .contains("Expected fixed size list of size 2, got size 3"),
1722            "got: {err}",
1723        );
1724    }
1725
1726    #[test]
1727    fn test_array_shredding_with_array_elements() {
1728        let input = build_variant_array(vec![
1729            // Row 0: [[1, 2], [3, 4], []] - clean nested lists
1730            VariantRow::List(vec![
1731                VariantValue::List(vec![VariantValue::from(1i64), VariantValue::from(2i64)]),
1732                VariantValue::List(vec![VariantValue::from(3i64), VariantValue::from(4i64)]),
1733                VariantValue::List(vec![]),
1734            ]),
1735            // Row 1: [[5, "bad", null], "not a list inner", null] - inner fallbacks
1736            VariantRow::List(vec![
1737                VariantValue::List(vec![
1738                    VariantValue::from(5i64),
1739                    VariantValue::from("bad"),
1740                    VariantValue::from(Variant::Null),
1741                ]),
1742                VariantValue::from("not a list inner"),
1743                VariantValue::Null,
1744            ]),
1745            // Row 2: "not a list" - top-level fallback
1746            VariantRow::Value(VariantValue::from("not a list")),
1747            // Row 3: null row
1748            VariantRow::Null,
1749        ]);
1750        let inner_field = Arc::new(Field::new("item", DataType::Int64, true));
1751        let inner_list_schema = DataType::List(inner_field);
1752        let list_schema = DataType::List(Arc::new(Field::new(
1753            "item",
1754            inner_list_schema.clone(),
1755            true,
1756        )));
1757        let result = shred_variant(&input, &list_schema).unwrap();
1758        assert_eq!(result.len(), 4);
1759
1760        let typed_value = result
1761            .typed_value_field()
1762            .unwrap()
1763            .as_any()
1764            .downcast_ref::<ListArray>()
1765            .unwrap();
1766
1767        assert_list_structure::<i32>(
1768            &result,
1769            4,
1770            &[0, 3, 6, 6, 6],
1771            &[Some(3), Some(3), None, None],
1772            &[None, None, Some(Variant::from("not a list")), None],
1773        );
1774
1775        let outer_elements =
1776            ShreddedVariantFieldArray::try_new(typed_value.values().as_ref()).unwrap();
1777        assert_eq!(outer_elements.len(), 6);
1778        let outer_values = outer_elements
1779            .typed_value_field()
1780            .unwrap()
1781            .as_any()
1782            .downcast_ref::<ListArray>()
1783            .unwrap();
1784        let outer_fallbacks = outer_elements.value_field().unwrap();
1785
1786        let outer_metadata = Arc::new(BinaryViewArray::from_iter_values(std::iter::repeat_n(
1787            EMPTY_VARIANT_METADATA_BYTES,
1788            outer_elements.len(),
1789        )));
1790        let outer_variant = VariantArray::from_parts(
1791            outer_metadata,
1792            Some(outer_fallbacks.clone()),
1793            Some(Arc::new(outer_values.clone())),
1794            None,
1795        );
1796
1797        assert_list_structure_and_elements::<Int64Type, i32>(
1798            &outer_variant,
1799            outer_elements.len(),
1800            &[0, 2, 4, 4, 7, 7, 7],
1801            &[Some(2), Some(2), Some(0), Some(3), None, None],
1802            &[
1803                None,
1804                None,
1805                None,
1806                None,
1807                Some(Variant::from("not a list inner")),
1808                Some(Variant::Null),
1809            ],
1810            (
1811                &[Some(1), Some(2), Some(3), Some(4), Some(5), None, None],
1812                &[
1813                    None,
1814                    None,
1815                    None,
1816                    None,
1817                    None,
1818                    Some(Variant::from("bad")),
1819                    Some(Variant::Null),
1820                ],
1821            ),
1822        );
1823    }
1824
1825    #[test]
1826    fn test_array_shredding_with_object_elements() {
1827        let input = build_variant_array(vec![
1828            // Row 0: [{"id": 1, "name": "Alice"}, {"id": null}] fully shards
1829            VariantRow::List(vec![
1830                VariantValue::Object(vec![
1831                    ("id", VariantValue::from(1i64)),
1832                    ("name", VariantValue::from("Alice")),
1833                ]),
1834                VariantValue::Object(vec![("id", VariantValue::from(Variant::Null))]),
1835            ]),
1836            // Row 1: "not a list" -> fallback
1837            VariantRow::Value(VariantValue::from("not a list")),
1838            // Row 2: Null row
1839            VariantRow::Null,
1840        ]);
1841
1842        // Target schema is List<Struct<id:int64,name:utf8>>
1843        let object_fields = Fields::from(vec![
1844            Field::new("id", DataType::Int64, true),
1845            Field::new("name", DataType::Utf8, true),
1846        ]);
1847        let list_schema = DataType::List(Arc::new(Field::new(
1848            "item",
1849            DataType::Struct(object_fields),
1850            true,
1851        )));
1852        let result = shred_variant(&input, &list_schema).unwrap();
1853        assert_eq!(result.len(), 3);
1854
1855        assert_list_structure::<i32>(
1856            &result,
1857            3,
1858            &[0, 2, 2, 2],
1859            &[Some(2), None, None],
1860            &[None, Some(Variant::from("not a list")), None],
1861        );
1862
1863        // Validate nested struct fields for each element
1864        let typed_value = result
1865            .typed_value_field()
1866            .unwrap()
1867            .as_any()
1868            .downcast_ref::<ListArray>()
1869            .unwrap();
1870        let element_array =
1871            ShreddedVariantFieldArray::try_new(typed_value.values().as_ref()).unwrap();
1872        assert_eq!(element_array.len(), 2);
1873        let element_objects = element_array
1874            .typed_value_field()
1875            .unwrap()
1876            .as_any()
1877            .downcast_ref::<arrow::array::StructArray>()
1878            .unwrap();
1879
1880        // Id field [1, Variant::Null]
1881        let id_field =
1882            ShreddedVariantFieldArray::try_new(element_objects.column_by_name("id").unwrap())
1883                .unwrap();
1884        let id_values = id_field.value_field().unwrap();
1885        let id_typed_values = id_field
1886            .typed_value_field()
1887            .unwrap()
1888            .as_any()
1889            .downcast_ref::<Int64Array>()
1890            .unwrap();
1891        assert!(id_values.is_null(0));
1892        assert_eq!(id_typed_values.value(0), 1);
1893        // null is stored as Variant::Null in values
1894        assert!(id_values.is_valid(1));
1895        assert_eq!(
1896            Variant::new(
1897                EMPTY_VARIANT_METADATA_BYTES,
1898                binary_array_value(id_values.as_ref(), 1).unwrap()
1899            ),
1900            Variant::Null
1901        );
1902        assert!(id_typed_values.is_null(1));
1903
1904        // Name field ["Alice", null]
1905        let name_field =
1906            ShreddedVariantFieldArray::try_new(element_objects.column_by_name("name").unwrap())
1907                .unwrap();
1908        let name_values = name_field.value_field().unwrap();
1909        let name_typed_values = name_field
1910            .typed_value_field()
1911            .unwrap()
1912            .as_any()
1913            .downcast_ref::<StringArray>()
1914            .unwrap();
1915        assert!(name_values.is_null(0));
1916        assert_eq!(name_typed_values.value(0), "Alice");
1917        // No value provided, both value and typed_value are null
1918        assert!(name_values.is_null(1));
1919        assert!(name_typed_values.is_null(1));
1920    }
1921
1922    #[test]
1923    fn test_object_shredding_comprehensive() -> Result<()> {
1924        let input = build_variant_array(vec![
1925            // Row 0: Fully shredded object
1926            VariantRow::Object(vec![
1927                ("score", VariantValue::from(95.5f64)),
1928                ("age", VariantValue::from(30i64)),
1929            ]),
1930            // Row 1: Partially shredded object (extra email field)
1931            VariantRow::Object(vec![
1932                ("score", VariantValue::from(87.2f64)),
1933                ("age", VariantValue::from(25i64)),
1934                ("email", VariantValue::from("bob@example.com")),
1935            ]),
1936            // Row 2: Missing field (no score)
1937            VariantRow::Object(vec![("age", VariantValue::from(35i64))]),
1938            // Row 3: Type mismatch (score is string, age is string)
1939            VariantRow::Object(vec![
1940                ("score", VariantValue::from("ninety-five")),
1941                ("age", VariantValue::from("thirty")),
1942            ]),
1943            // Row 4: Non-object
1944            VariantRow::Value(VariantValue::from("not an object")),
1945            // Row 5: Empty object
1946            VariantRow::Object(vec![]),
1947            // Row 6: Null
1948            VariantRow::Null,
1949            // Row 7: Object with only "wrong" fields
1950            VariantRow::Object(vec![("foo", VariantValue::from(10))]),
1951            // Row 8: Object with one "right" and one "wrong" field
1952            VariantRow::Object(vec![
1953                ("score", VariantValue::from(66.67f64)),
1954                ("foo", VariantValue::from(10)),
1955            ]),
1956        ]);
1957
1958        // Create target schema: struct<score: float64, age: int64>
1959        // Both types are supported for shredding
1960        let target_schema = ShreddedSchemaBuilder::default()
1961            .with_path("score", &DataType::Float64)?
1962            .with_path("age", &DataType::Int64)?
1963            .build();
1964
1965        let result = shred_variant(&input, &target_schema).unwrap();
1966
1967        // Verify structure
1968        assert!(result.value_field().is_some());
1969        assert!(result.typed_value_field().is_some());
1970        assert_eq!(result.len(), 9);
1971
1972        let metadata = result.metadata_field();
1973        let value = result.value_field().unwrap();
1974        let typed_value = result
1975            .typed_value_field()
1976            .unwrap()
1977            .as_any()
1978            .downcast_ref::<arrow::array::StructArray>()
1979            .unwrap();
1980
1981        // Extract score and age fields from typed_value struct
1982        let score_field =
1983            ShreddedVariantFieldArray::try_new(typed_value.column_by_name("score").unwrap())
1984                .unwrap();
1985        let age_field =
1986            ShreddedVariantFieldArray::try_new(typed_value.column_by_name("age").unwrap()).unwrap();
1987
1988        let score_value = score_field.value_field().unwrap();
1989        let score_typed_value = score_field
1990            .typed_value_field()
1991            .unwrap()
1992            .as_any()
1993            .downcast_ref::<Float64Array>()
1994            .unwrap();
1995        let age_value = age_field.value_field().unwrap();
1996        let age_typed_value = age_field
1997            .typed_value_field()
1998            .unwrap()
1999            .as_any()
2000            .downcast_ref::<Int64Array>()
2001            .unwrap();
2002
2003        // Set up exhaustive checking of all shredded columns and their nulls/values
2004        struct ShreddedValue<'m, 'v, T> {
2005            value: Option<Variant<'m, 'v>>,
2006            typed_value: Option<T>,
2007        }
2008        struct ShreddedStruct<'m, 'v> {
2009            score: ShreddedValue<'m, 'v, f64>,
2010            age: ShreddedValue<'m, 'v, i64>,
2011        }
2012        fn get_value<'m, 'v>(
2013            i: usize,
2014            metadata: &'m dyn Array,
2015            value: &'v dyn Array,
2016        ) -> Variant<'m, 'v> {
2017            variant_from_arrays_at(metadata, value, i).unwrap()
2018        }
2019        let expect = |i, expected_result: Option<ShreddedValue<ShreddedStruct>>| {
2020            match expected_result {
2021                Some(ShreddedValue {
2022                    value: expected_value,
2023                    typed_value: expected_typed_value,
2024                }) => {
2025                    assert!(result.is_valid(i));
2026                    match expected_value {
2027                        Some(expected_value) => {
2028                            assert!(value.is_valid(i));
2029                            assert_eq!(
2030                                expected_value,
2031                                get_value(i, metadata.as_ref(), value.as_ref())
2032                            );
2033                        }
2034                        None => {
2035                            assert!(value.is_null(i));
2036                        }
2037                    }
2038                    match expected_typed_value {
2039                        Some(ShreddedStruct {
2040                            score: expected_score,
2041                            age: expected_age,
2042                        }) => {
2043                            assert!(typed_value.is_valid(i));
2044                            assert!(score_field.is_valid(i)); // non-nullable
2045                            assert!(age_field.is_valid(i)); // non-nullable
2046                            match expected_score.value {
2047                                Some(expected_score_value) => {
2048                                    assert!(score_value.is_valid(i));
2049                                    assert_eq!(
2050                                        expected_score_value,
2051                                        get_value(i, metadata.as_ref(), score_value.as_ref())
2052                                    );
2053                                }
2054                                None => {
2055                                    assert!(score_value.is_null(i));
2056                                }
2057                            }
2058                            match expected_score.typed_value {
2059                                Some(expected_score) => {
2060                                    assert!(score_typed_value.is_valid(i));
2061                                    assert_eq!(expected_score, score_typed_value.value(i));
2062                                }
2063                                None => {
2064                                    assert!(score_typed_value.is_null(i));
2065                                }
2066                            }
2067                            match expected_age.value {
2068                                Some(expected_age_value) => {
2069                                    assert!(age_value.is_valid(i));
2070                                    assert_eq!(
2071                                        expected_age_value,
2072                                        get_value(i, metadata.as_ref(), age_value.as_ref())
2073                                    );
2074                                }
2075                                None => {
2076                                    assert!(age_value.is_null(i));
2077                                }
2078                            }
2079                            match expected_age.typed_value {
2080                                Some(expected_age) => {
2081                                    assert!(age_typed_value.is_valid(i));
2082                                    assert_eq!(expected_age, age_typed_value.value(i));
2083                                }
2084                                None => {
2085                                    assert!(age_typed_value.is_null(i));
2086                                }
2087                            }
2088                        }
2089                        None => {
2090                            assert!(typed_value.is_null(i));
2091                        }
2092                    }
2093                }
2094                None => {
2095                    assert!(result.is_null(i));
2096                }
2097            };
2098        };
2099
2100        // Row 0: Fully shredded - both fields shred successfully
2101        expect(
2102            0,
2103            Some(ShreddedValue {
2104                value: None,
2105                typed_value: Some(ShreddedStruct {
2106                    score: ShreddedValue {
2107                        value: None,
2108                        typed_value: Some(95.5),
2109                    },
2110                    age: ShreddedValue {
2111                        value: None,
2112                        typed_value: Some(30),
2113                    },
2114                }),
2115            }),
2116        );
2117
2118        // Row 1: Partially shredded - value contains extra email field
2119        let mut builder = VariantBuilder::new();
2120        builder
2121            .new_object()
2122            .with_field("email", "bob@example.com")
2123            .finish();
2124        let (m, v) = builder.finish();
2125        let expected_value = Variant::new(&m, &v);
2126
2127        expect(
2128            1,
2129            Some(ShreddedValue {
2130                value: Some(expected_value),
2131                typed_value: Some(ShreddedStruct {
2132                    score: ShreddedValue {
2133                        value: None,
2134                        typed_value: Some(87.2),
2135                    },
2136                    age: ShreddedValue {
2137                        value: None,
2138                        typed_value: Some(25),
2139                    },
2140                }),
2141            }),
2142        );
2143
2144        // Row 2: Fully shredded -- missing score field
2145        expect(
2146            2,
2147            Some(ShreddedValue {
2148                value: None,
2149                typed_value: Some(ShreddedStruct {
2150                    score: ShreddedValue {
2151                        value: None,
2152                        typed_value: None,
2153                    },
2154                    age: ShreddedValue {
2155                        value: None,
2156                        typed_value: Some(35),
2157                    },
2158                }),
2159            }),
2160        );
2161
2162        // Row 3: Type mismatches - both score and age are strings
2163        expect(
2164            3,
2165            Some(ShreddedValue {
2166                value: None,
2167                typed_value: Some(ShreddedStruct {
2168                    score: ShreddedValue {
2169                        value: Some(Variant::from("ninety-five")),
2170                        typed_value: None,
2171                    },
2172                    age: ShreddedValue {
2173                        value: Some(Variant::from("thirty")),
2174                        typed_value: None,
2175                    },
2176                }),
2177            }),
2178        );
2179
2180        // Row 4: Non-object - falls back to value field
2181        expect(
2182            4,
2183            Some(ShreddedValue {
2184                value: Some(Variant::from("not an object")),
2185                typed_value: None,
2186            }),
2187        );
2188
2189        // Row 5: Empty object
2190        expect(
2191            5,
2192            Some(ShreddedValue {
2193                value: None,
2194                typed_value: Some(ShreddedStruct {
2195                    score: ShreddedValue {
2196                        value: None,
2197                        typed_value: None,
2198                    },
2199                    age: ShreddedValue {
2200                        value: None,
2201                        typed_value: None,
2202                    },
2203                }),
2204            }),
2205        );
2206
2207        // Row 6: Null
2208        expect(6, None);
2209
2210        // Helper to correctly create a variant object using a row's existing metadata
2211        let object_with_foo_field = |i| {
2212            use parquet_variant::{ParentState, ValueBuilder, VariantMetadata};
2213            let metadata = VariantMetadata::new(binary_array_value(metadata.as_ref(), i).unwrap());
2214            let mut metadata_builder = ReadOnlyMetadataBuilder::new(&metadata);
2215            let mut value_builder = ValueBuilder::new();
2216            let state = ParentState::variant(&mut value_builder, &mut metadata_builder);
2217            ObjectBuilder::new(state, false)
2218                .with_field("foo", 10)
2219                .finish();
2220            (metadata, value_builder.into_inner())
2221        };
2222
2223        // Row 7: Object with only a "wrong" field
2224        let (m, v) = object_with_foo_field(7);
2225        expect(
2226            7,
2227            Some(ShreddedValue {
2228                value: Some(Variant::new_with_metadata(m, &v)),
2229                typed_value: Some(ShreddedStruct {
2230                    score: ShreddedValue {
2231                        value: None,
2232                        typed_value: None,
2233                    },
2234                    age: ShreddedValue {
2235                        value: None,
2236                        typed_value: None,
2237                    },
2238                }),
2239            }),
2240        );
2241
2242        // Row 8: Object with one "wrong" and one "right" field
2243        let (m, v) = object_with_foo_field(8);
2244        expect(
2245            8,
2246            Some(ShreddedValue {
2247                value: Some(Variant::new_with_metadata(m, &v)),
2248                typed_value: Some(ShreddedStruct {
2249                    score: ShreddedValue {
2250                        value: None,
2251                        typed_value: Some(66.67),
2252                    },
2253                    age: ShreddedValue {
2254                        value: None,
2255                        typed_value: None,
2256                    },
2257                }),
2258            }),
2259        );
2260        Ok(())
2261    }
2262
2263    #[test]
2264    fn test_object_shredding_with_array_field() {
2265        let input = build_variant_array(vec![
2266            // Row 0: Object with well-typed scores list
2267            VariantRow::Object(vec![(
2268                "scores",
2269                VariantValue::List(vec![VariantValue::from(10i64), VariantValue::from(20i64)]),
2270            )]),
2271            // Row 1: Object whose scores list contains incompatible type
2272            VariantRow::Object(vec![(
2273                "scores",
2274                VariantValue::List(vec![
2275                    VariantValue::from("oops"),
2276                    VariantValue::from(Variant::Null),
2277                ]),
2278            )]),
2279            // Row 2: Object missing the scores field entirely
2280            VariantRow::Object(vec![]),
2281            // Row 3: Non-object fallback
2282            VariantRow::Value(VariantValue::from("not an object")),
2283            // Row 4: Top-level Null
2284            VariantRow::Null,
2285        ]);
2286        let list_field = Arc::new(Field::new("item", DataType::Int64, true));
2287        let inner_list_schema = DataType::List(list_field);
2288        let schema = DataType::Struct(Fields::from(vec![Field::new(
2289            "scores",
2290            inner_list_schema.clone(),
2291            true,
2292        )]));
2293
2294        let result = shred_variant(&input, &schema).unwrap();
2295        assert_eq!(result.len(), 5);
2296
2297        // Access base value/typed_value columns
2298        let value_field = result.value_field().unwrap();
2299        let typed_struct = result
2300            .typed_value_field()
2301            .unwrap()
2302            .as_any()
2303            .downcast_ref::<arrow::array::StructArray>()
2304            .unwrap();
2305
2306        // Validate base value fallbacks for non-object rows
2307        assert!(value_field.is_null(0));
2308        assert!(value_field.is_null(1));
2309        assert!(value_field.is_null(2));
2310        assert!(value_field.is_valid(3));
2311        assert_eq!(
2312            variant_from_arrays_at(result.metadata_field(), value_field, 3).unwrap(),
2313            Variant::from("not an object")
2314        );
2315        assert!(value_field.is_null(4));
2316
2317        // Typed struct should only be null for the fallback row
2318        assert!(typed_struct.is_valid(0));
2319        assert!(typed_struct.is_valid(1));
2320        assert!(typed_struct.is_valid(2));
2321        assert!(typed_struct.is_null(3));
2322        assert!(typed_struct.is_null(4));
2323
2324        // Drill into the scores field on the typed struct
2325        let scores_field =
2326            ShreddedVariantFieldArray::try_new(typed_struct.column_by_name("scores").unwrap())
2327                .unwrap();
2328        assert_list_structure_and_elements::<Int64Type, i32>(
2329            &VariantArray::from_parts(
2330                Arc::new(BinaryViewArray::from_iter_values(std::iter::repeat_n(
2331                    EMPTY_VARIANT_METADATA_BYTES,
2332                    scores_field.len(),
2333                ))),
2334                Some(scores_field.value_field().unwrap().clone()),
2335                Some(scores_field.typed_value_field().unwrap().clone()),
2336                None,
2337            ),
2338            scores_field.len(),
2339            &[0i32, 2, 4, 4, 4, 4],
2340            &[Some(2), Some(2), None, None, None],
2341            &[None, None, None, None, None],
2342            (
2343                &[Some(10), Some(20), None, None],
2344                &[None, None, Some(Variant::from("oops")), Some(Variant::Null)],
2345            ),
2346        );
2347    }
2348
2349    #[test]
2350    fn test_object_different_schemas() -> Result<()> {
2351        // Create object with multiple fields
2352        let input = build_variant_array(vec![VariantRow::Object(vec![
2353            ("id", VariantValue::from(123i32)),
2354            ("age", VariantValue::from(25i64)),
2355            ("score", VariantValue::from(95.5f64)),
2356        ])]);
2357
2358        // Test with schema containing only id field
2359        let schema1 = ShreddedSchemaBuilder::default()
2360            .with_path("id", &DataType::Int32)?
2361            .build();
2362        let result1 = shred_variant(&input, &schema1).unwrap();
2363        let value_field1 = result1.value_field().unwrap();
2364        assert!(!value_field1.is_null(0)); // should contain {"age": 25, "score": 95.5}
2365
2366        // Test with schema containing id and age fields
2367        let schema2 = ShreddedSchemaBuilder::default()
2368            .with_path("id", &DataType::Int32)?
2369            .with_path("age", &DataType::Int64)?
2370            .build();
2371        let result2 = shred_variant(&input, &schema2).unwrap();
2372        let value_field2 = result2.value_field().unwrap();
2373        assert!(!value_field2.is_null(0)); // should contain {"score": 95.5}
2374
2375        // Test with schema containing all fields
2376        let schema3 = ShreddedSchemaBuilder::default()
2377            .with_path("id", &DataType::Int32)?
2378            .with_path("age", &DataType::Int64)?
2379            .with_path("score", &DataType::Float64)?
2380            .build();
2381        let result3 = shred_variant(&input, &schema3).unwrap();
2382        let value_field3 = result3.value_field().unwrap();
2383        assert!(value_field3.is_null(0)); // fully shredded, no remaining fields
2384
2385        Ok(())
2386    }
2387
2388    #[test]
2389    fn test_uuid_shredding_in_objects() -> Result<()> {
2390        let mock_uuid_1 = Uuid::new_v4();
2391        let mock_uuid_2 = Uuid::new_v4();
2392        let mock_uuid_3 = Uuid::new_v4();
2393
2394        let input = build_variant_array(vec![
2395            // Row 0: Fully shredded object with both UUID fields
2396            VariantRow::Object(vec![
2397                ("id", VariantValue::from(mock_uuid_1)),
2398                ("session_id", VariantValue::from(mock_uuid_2)),
2399            ]),
2400            // Row 1: Partially shredded object - UUID fields plus extra field
2401            VariantRow::Object(vec![
2402                ("id", VariantValue::from(mock_uuid_2)),
2403                ("session_id", VariantValue::from(mock_uuid_3)),
2404                ("name", VariantValue::from("test_user")),
2405            ]),
2406            // Row 2: Missing UUID field (no session_id)
2407            VariantRow::Object(vec![("id", VariantValue::from(mock_uuid_1))]),
2408            // Row 3: Type mismatch - id is UUID but session_id is a string
2409            VariantRow::Object(vec![
2410                ("id", VariantValue::from(mock_uuid_3)),
2411                ("session_id", VariantValue::from("not-a-uuid")),
2412            ]),
2413            // Row 4: Object with non-UUID value in id field
2414            VariantRow::Object(vec![
2415                ("id", VariantValue::from(12345i64)),
2416                ("session_id", VariantValue::from(mock_uuid_1)),
2417            ]),
2418            // Row 5: Null
2419            VariantRow::Null,
2420        ]);
2421
2422        let target_schema = ShreddedSchemaBuilder::default()
2423            .with_path("id", DataType::FixedSizeBinary(16))?
2424            .with_path("session_id", DataType::FixedSizeBinary(16))?
2425            .build();
2426
2427        let result = shred_variant(&input, &target_schema).unwrap();
2428
2429        assert!(result.value_field().is_some());
2430        assert!(result.typed_value_field().is_some());
2431        assert_eq!(result.len(), 6);
2432
2433        let metadata = result.metadata_field();
2434        let value = result.value_field().unwrap();
2435        let typed_value = result
2436            .typed_value_field()
2437            .unwrap()
2438            .as_any()
2439            .downcast_ref::<arrow::array::StructArray>()
2440            .unwrap();
2441
2442        // Extract id and session_id fields from typed_value struct
2443        let id_field =
2444            ShreddedVariantFieldArray::try_new(typed_value.column_by_name("id").unwrap()).unwrap();
2445        let session_id_field =
2446            ShreddedVariantFieldArray::try_new(typed_value.column_by_name("session_id").unwrap())
2447                .unwrap();
2448
2449        let id_value = id_field.value_field().unwrap();
2450        let id_typed_value = id_field
2451            .typed_value_field()
2452            .unwrap()
2453            .as_any()
2454            .downcast_ref::<FixedSizeBinaryArray>()
2455            .unwrap();
2456        let session_id_value = session_id_field.value_field().unwrap();
2457        let session_id_typed_value = session_id_field
2458            .typed_value_field()
2459            .unwrap()
2460            .as_any()
2461            .downcast_ref::<FixedSizeBinaryArray>()
2462            .unwrap();
2463
2464        // Row 0: Fully shredded - both UUID fields shred successfully
2465        assert!(result.is_valid(0));
2466
2467        assert!(value.is_null(0)); // fully shredded, no remaining fields
2468        assert!(id_value.is_null(0));
2469        assert!(session_id_value.is_null(0));
2470
2471        assert!(typed_value.is_valid(0));
2472        assert!(id_typed_value.is_valid(0));
2473        assert!(session_id_typed_value.is_valid(0));
2474
2475        assert_eq!(id_typed_value.value(0), mock_uuid_1.as_bytes());
2476        assert_eq!(session_id_typed_value.value(0), mock_uuid_2.as_bytes());
2477
2478        // Row 1: Partially shredded - value contains extra name field
2479        assert!(result.is_valid(1));
2480
2481        assert!(value.is_valid(1)); // contains unshredded "name" field
2482        assert!(typed_value.is_valid(1));
2483
2484        assert!(id_value.is_null(1));
2485        assert!(id_typed_value.is_valid(1));
2486        assert_eq!(id_typed_value.value(1), mock_uuid_2.as_bytes());
2487
2488        assert!(session_id_value.is_null(1));
2489        assert!(session_id_typed_value.is_valid(1));
2490        assert_eq!(session_id_typed_value.value(1), mock_uuid_3.as_bytes());
2491
2492        // Verify the value field contains the name field
2493        let row_1_variant = variant_from_arrays_at(metadata, value, 1).unwrap();
2494        let Variant::Object(obj) = row_1_variant else {
2495            panic!("Expected object");
2496        };
2497
2498        assert_eq!(obj.get("name"), Some(Variant::from("test_user")));
2499
2500        // Row 2: Missing session_id field
2501        assert!(result.is_valid(2));
2502
2503        assert!(value.is_null(2)); // fully shredded, no extra fields
2504        assert!(typed_value.is_valid(2));
2505
2506        assert!(id_value.is_null(2));
2507        assert!(id_typed_value.is_valid(2));
2508        assert_eq!(id_typed_value.value(2), mock_uuid_1.as_bytes());
2509
2510        assert!(session_id_value.is_null(2));
2511        assert!(session_id_typed_value.is_null(2)); // missing field
2512
2513        // Row 3: Type mismatch - session_id is a string, not UUID
2514        assert!(result.is_valid(3));
2515
2516        assert!(value.is_null(3)); // no extra fields
2517        assert!(typed_value.is_valid(3));
2518
2519        assert!(id_value.is_null(3));
2520        assert!(id_typed_value.is_valid(3));
2521        assert_eq!(id_typed_value.value(3), mock_uuid_3.as_bytes());
2522
2523        assert!(session_id_value.is_valid(3)); // type mismatch, stored in value
2524        assert!(session_id_typed_value.is_null(3));
2525        let session_id_variant = variant_from_arrays_at(metadata, session_id_value, 3).unwrap();
2526        assert_eq!(session_id_variant, Variant::from("not-a-uuid"));
2527
2528        // Row 4: Type mismatch - id is int64, not UUID
2529        assert!(result.is_valid(4));
2530
2531        assert!(value.is_null(4)); // no extra fields
2532        assert!(typed_value.is_valid(4));
2533
2534        assert!(id_value.is_valid(4)); // type mismatch, stored in value
2535        assert!(id_typed_value.is_null(4));
2536        let id_variant = variant_from_arrays_at(metadata, id_value, 4).unwrap();
2537        assert_eq!(id_variant, Variant::from(12345i64));
2538
2539        assert!(session_id_value.is_null(4));
2540        assert!(session_id_typed_value.is_valid(4));
2541        assert_eq!(session_id_typed_value.value(4), mock_uuid_1.as_bytes());
2542
2543        // Row 5: Null
2544        assert!(result.is_null(5));
2545
2546        Ok(())
2547    }
2548
2549    #[test]
2550    fn test_spec_compliance() {
2551        let input = VariantArray::from_iter(vec![Variant::from(42i64), Variant::from("hello")]);
2552
2553        let result = shred_variant(&input, &DataType::Int64).unwrap();
2554
2555        // Test field access by name (not position)
2556        let inner_struct = result.inner();
2557        assert!(inner_struct.column_by_name("metadata").is_some());
2558        assert!(inner_struct.column_by_name("value").is_some());
2559        assert!(inner_struct.column_by_name("typed_value").is_some());
2560
2561        // Test metadata preservation
2562        assert_eq!(result.metadata_field().len(), input.metadata_field().len());
2563        // The metadata should be the same reference (cheap clone)
2564        // Note: BinaryViewArray doesn't have a .values() method, so we compare the arrays directly
2565        assert_eq!(result.metadata_field().len(), input.metadata_field().len());
2566
2567        // Test output structure correctness
2568        assert_eq!(result.len(), input.len());
2569        assert!(result.value_field().is_some());
2570        assert!(result.typed_value_field().is_some());
2571
2572        // For primitive shredding, verify that value and typed_value are never both non-null
2573        // (This rule applies to primitives; for objects, both can be non-null for partial shredding)
2574        let value_field = result.value_field().unwrap();
2575        let typed_value_field = result
2576            .typed_value_field()
2577            .unwrap()
2578            .as_any()
2579            .downcast_ref::<Int64Array>()
2580            .unwrap();
2581
2582        for i in 0..result.len() {
2583            if !result.is_null(i) {
2584                let value_is_null = value_field.is_null(i);
2585                let typed_value_is_null = typed_value_field.is_null(i);
2586                // For primitive shredding, at least one should be null
2587                assert!(
2588                    value_is_null || typed_value_is_null,
2589                    "Row {}: both value and typed_value are non-null for primitive shredding",
2590                    i
2591                );
2592            }
2593        }
2594    }
2595
2596    #[test]
2597    fn test_variant_schema_builder_simple() -> Result<()> {
2598        let shredding_type = ShreddedSchemaBuilder::default()
2599            .with_path("a", &DataType::Int64)?
2600            .with_path("b", &DataType::Float64)?
2601            .build();
2602
2603        assert_eq!(
2604            shredding_type,
2605            DataType::Struct(Fields::from(vec![
2606                Field::new("a", DataType::Int64, true),
2607                Field::new("b", DataType::Float64, true),
2608            ]))
2609        );
2610
2611        Ok(())
2612    }
2613
2614    #[test]
2615    fn test_variant_schema_builder_nested() -> Result<()> {
2616        let shredding_type = ShreddedSchemaBuilder::default()
2617            .with_path("a", &DataType::Int64)?
2618            .with_path("b.c", &DataType::Utf8)?
2619            .with_path("b.d", &DataType::Float64)?
2620            .build();
2621
2622        assert_eq!(
2623            shredding_type,
2624            DataType::Struct(Fields::from(vec![
2625                Field::new("a", DataType::Int64, true),
2626                Field::new(
2627                    "b",
2628                    DataType::Struct(Fields::from(vec![
2629                        Field::new("c", DataType::Utf8, true),
2630                        Field::new("d", DataType::Float64, true),
2631                    ])),
2632                    true
2633                ),
2634            ]))
2635        );
2636
2637        Ok(())
2638    }
2639
2640    #[test]
2641    fn test_variant_schema_builder_with_path_variant_path_arg() -> Result<()> {
2642        let path = VariantPath::from_iter([VariantPathElement::from("a.b")]);
2643        let shredding_type = ShreddedSchemaBuilder::default()
2644            .with_path(path, &DataType::Int64)?
2645            .build();
2646
2647        match shredding_type {
2648            DataType::Struct(fields) => {
2649                assert_eq!(fields.len(), 1);
2650                assert_eq!(fields[0].name(), "a.b");
2651                assert_eq!(fields[0].data_type(), &DataType::Int64);
2652            }
2653            _ => panic!("expected struct data type"),
2654        }
2655
2656        Ok(())
2657    }
2658
2659    #[test]
2660    fn test_variant_schema_builder_custom_nullability() -> Result<()> {
2661        let shredding_type = ShreddedSchemaBuilder::default()
2662            .with_path(
2663                "foo",
2664                Arc::new(Field::new("should_be_renamed", DataType::Utf8, false)),
2665            )?
2666            .with_path("bar", (&DataType::Int64, false))?
2667            .build();
2668
2669        let DataType::Struct(fields) = shredding_type else {
2670            panic!("expected struct data type");
2671        };
2672
2673        let foo = fields.iter().find(|f| f.name() == "foo").unwrap();
2674        assert_eq!(foo.data_type(), &DataType::Utf8);
2675        assert!(!foo.is_nullable());
2676
2677        let bar = fields.iter().find(|f| f.name() == "bar").unwrap();
2678        assert_eq!(bar.data_type(), &DataType::Int64);
2679        assert!(!bar.is_nullable());
2680
2681        Ok(())
2682    }
2683
2684    #[test]
2685    fn test_variant_schema_builder_with_shred_variant() -> Result<()> {
2686        let input = build_variant_array(vec![
2687            VariantRow::Object(vec![
2688                ("time", VariantValue::from(1234567890i64)),
2689                ("hostname", VariantValue::from("server1")),
2690                ("extra", VariantValue::from(42)),
2691            ]),
2692            VariantRow::Object(vec![
2693                ("time", VariantValue::from(9876543210i64)),
2694                ("hostname", VariantValue::from("server2")),
2695            ]),
2696            VariantRow::Null,
2697        ]);
2698
2699        let shredding_type = ShreddedSchemaBuilder::default()
2700            .with_path("time", &DataType::Int64)?
2701            .with_path("hostname", &DataType::Utf8)?
2702            .build();
2703
2704        let result = shred_variant(&input, &shredding_type).unwrap();
2705
2706        assert_eq!(
2707            result.data_type(),
2708            &DataType::Struct(Fields::from(vec![
2709                Field::new("metadata", DataType::BinaryView, false),
2710                Field::new("value", DataType::BinaryView, true),
2711                Field::new(
2712                    "typed_value",
2713                    DataType::Struct(Fields::from(vec![
2714                        Field::new(
2715                            "hostname",
2716                            DataType::Struct(Fields::from(vec![
2717                                Field::new("value", DataType::BinaryView, true),
2718                                Field::new("typed_value", DataType::Utf8, true),
2719                            ])),
2720                            false,
2721                        ),
2722                        Field::new(
2723                            "time",
2724                            DataType::Struct(Fields::from(vec![
2725                                Field::new("value", DataType::BinaryView, true),
2726                                Field::new("typed_value", DataType::Int64, true),
2727                            ])),
2728                            false,
2729                        ),
2730                    ])),
2731                    true,
2732                ),
2733            ]))
2734        );
2735
2736        assert_eq!(result.len(), 3);
2737        assert!(result.typed_value_field().is_some());
2738
2739        let typed_value = result
2740            .typed_value_field()
2741            .unwrap()
2742            .as_any()
2743            .downcast_ref::<arrow::array::StructArray>()
2744            .unwrap();
2745
2746        let time_field =
2747            ShreddedVariantFieldArray::try_new(typed_value.column_by_name("time").unwrap())
2748                .unwrap();
2749        let hostname_field =
2750            ShreddedVariantFieldArray::try_new(typed_value.column_by_name("hostname").unwrap())
2751                .unwrap();
2752
2753        let time_typed = time_field
2754            .typed_value_field()
2755            .unwrap()
2756            .as_any()
2757            .downcast_ref::<Int64Array>()
2758            .unwrap();
2759        let hostname_typed = hostname_field
2760            .typed_value_field()
2761            .unwrap()
2762            .as_any()
2763            .downcast_ref::<arrow::array::StringArray>()
2764            .unwrap();
2765
2766        // Row 0
2767        assert!(!result.is_null(0));
2768        assert_eq!(time_typed.value(0), 1234567890);
2769        assert_eq!(hostname_typed.value(0), "server1");
2770
2771        // Row 1
2772        assert!(!result.is_null(1));
2773        assert_eq!(time_typed.value(1), 9876543210);
2774        assert_eq!(hostname_typed.value(1), "server2");
2775
2776        // Row 2
2777        assert!(result.is_null(2));
2778
2779        Ok(())
2780    }
2781
2782    #[test]
2783    fn test_variant_schema_builder_conflicting_path() -> Result<()> {
2784        let shredding_type = ShreddedSchemaBuilder::default()
2785            .with_path("a", &DataType::Int64)?
2786            .with_path("a", &DataType::Float64)?
2787            .build();
2788
2789        assert_eq!(
2790            shredding_type,
2791            DataType::Struct(Fields::from(
2792                vec![Field::new("a", DataType::Float64, true),]
2793            ))
2794        );
2795
2796        Ok(())
2797    }
2798
2799    #[test]
2800    fn test_variant_schema_builder_root_path() -> Result<()> {
2801        let path = VariantPath::new(vec![]);
2802        let shredding_type = ShreddedSchemaBuilder::default()
2803            .with_path(path, &DataType::Int64)?
2804            .build();
2805
2806        assert_eq!(shredding_type, DataType::Int64);
2807
2808        Ok(())
2809    }
2810
2811    #[test]
2812    fn test_variant_schema_builder_empty_path() -> Result<()> {
2813        let shredding_type = ShreddedSchemaBuilder::default()
2814            .with_path("", &DataType::Int64)?
2815            .build();
2816
2817        assert_eq!(shredding_type, DataType::Int64);
2818        Ok(())
2819    }
2820
2821    #[test]
2822    fn test_variant_schema_builder_default() {
2823        let shredding_type = ShreddedSchemaBuilder::default().build();
2824        assert_eq!(shredding_type, DataType::Null);
2825    }
2826}