Skip to main content

parquet_variant_compute/
shred_variant.rs

1// Licensed to the Apache Software Foundation (ASF) under one
2// or more contributor license agreements.  See the NOTICE file
3// distributed with this work for additional information
4// regarding copyright ownership.  The ASF licenses this file
5// to you under the Apache License, Version 2.0 (the
6// "License"); you may not use this file except in compliance
7// with the License.  You may obtain a copy of the License at
8//
9//   http://www.apache.org/licenses/LICENSE-2.0
10//
11// Unless required by applicable law or agreed to in writing,
12// software distributed under the License is distributed on an
13// "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
14// KIND, either express or implied.  See the License for the
15// specific language governing permissions and limitations
16// under the License.
17
18//! Module for shredding VariantArray with a given schema.
19
20use crate::variant_array::{ShreddedVariantFieldArray, StructArrayBuilder};
21use crate::variant_to_arrow::{
22    ArrayVariantToArrowRowBuilder, PrimitiveVariantToArrowRowBuilder,
23    make_primitive_variant_to_arrow_row_builder,
24};
25use crate::{VariantArray, VariantValueArrayBuilder};
26use arrow::array::{ArrayRef, BinaryViewArray, NullBufferBuilder};
27use arrow::buffer::NullBuffer;
28use arrow::compute::CastOptions;
29use arrow::datatypes::{DataType, Field, FieldRef, Fields, TimeUnit};
30use arrow::error::{ArrowError, Result};
31use indexmap::IndexMap;
32use parquet_variant::{Variant, VariantBuilderExt, VariantPath, VariantPathElement};
33use std::collections::BTreeMap;
34use std::sync::Arc;
35
36/// Shreds the input binary variant using a target shredding schema derived from the requested data type.
37///
38/// For example, requesting `DataType::Int64` would produce an output variant array with the schema:
39///
40/// ```text
41/// {
42///    metadata: BINARY,
43///    value: BINARY,
44///    typed_value: LONG,
45/// }
46/// ```
47///
48/// Similarly, requesting `DataType::Struct` with two integer fields `a` and `b` would produce an
49/// output variant array with the schema:
50///
51/// ```text
52/// {
53///   metadata: BINARY,
54///   value: BINARY,
55///   typed_value: {
56///     a: {
57///       value: BINARY,
58///       typed_value: INT,
59///     },
60///     b: {
61///       value: BINARY,
62///       typed_value: INT,
63///     },
64///   }
65/// }
66/// ```
67///
68/// See [`ShreddedSchemaBuilder`] for a convenient way to build the `as_type`
69/// value passed to this function.
70pub fn shred_variant(array: &VariantArray, as_type: &DataType) -> Result<VariantArray> {
71    if array.typed_value_field().is_some() {
72        return Err(ArrowError::InvalidArgumentError(
73            "Input is already shredded".to_string(),
74        ));
75    }
76
77    if array.value_field().is_none() {
78        // all-null case -- nothing to do.
79        return Ok(array.clone());
80    };
81
82    let cast_options = CastOptions::default();
83    let mut builder = make_variant_to_shredded_variant_arrow_row_builder(
84        as_type,
85        &cast_options,
86        array.len(),
87        NullValue::TopLevelVariant,
88    )?;
89    for i in 0..array.len() {
90        if array.is_null(i) {
91            builder.append_null()?;
92        } else {
93            builder.append_value(array.value(i))?;
94        }
95    }
96    let (value, typed_value, nulls) = builder.finish()?;
97    Ok(VariantArray::from_parts(
98        array.metadata_field().clone(),
99        Some(value),
100        Some(typed_value),
101        nulls,
102    ))
103}
104
105/// Controls how `append_null` is encoded for a shredded `(value, typed_value)` pair.
106///
107/// | Mode | Struct validity bit | `value` | `typed_value` | Meaning |
108/// | --- | --- | --- | --- | --- |
109/// | `TopLevelVariant` | null | NULL | NULL | SQL NULL at the top-level variant row |
110/// | `ObjectField` | non-null | NULL | NULL | Missing object field |
111/// | `ArrayElement` | non-null | `Variant::Null` | NULL | Explicit null array element |
112#[derive(Debug, Clone, Copy, PartialEq, Eq)]
113pub(crate) enum NullValue {
114    TopLevelVariant,
115    ObjectField,
116    ArrayElement,
117}
118
119impl NullValue {
120    fn append_to(
121        self,
122        nulls: &mut NullBufferBuilder,
123        value_builder: &mut VariantValueArrayBuilder,
124    ) {
125        match self {
126            Self::TopLevelVariant => nulls.append_null(),
127            Self::ObjectField | Self::ArrayElement => nulls.append_non_null(),
128        }
129        match self {
130            Self::TopLevelVariant | Self::ObjectField => value_builder.append_null(),
131            Self::ArrayElement => value_builder.append_value(Variant::Null),
132        }
133    }
134}
135
136pub(crate) fn make_variant_to_shredded_variant_arrow_row_builder<'a>(
137    data_type: &'a DataType,
138    cast_options: &'a CastOptions,
139    capacity: usize,
140    null_value: NullValue,
141) -> Result<VariantToShreddedVariantRowBuilder<'a>> {
142    let builder = match data_type {
143        DataType::Struct(fields) => {
144            let typed_value_builder = VariantToShreddedObjectVariantRowBuilder::try_new(
145                fields,
146                cast_options,
147                capacity,
148                null_value,
149            )?;
150            VariantToShreddedVariantRowBuilder::Object(typed_value_builder)
151        }
152        DataType::List(_)
153        | DataType::LargeList(_)
154        | DataType::ListView(_)
155        | DataType::LargeListView(_)
156        | DataType::FixedSizeList(..) => {
157            let typed_value_builder = VariantToShreddedArrayVariantRowBuilder::try_new(
158                data_type,
159                cast_options,
160                capacity,
161                null_value,
162            )?;
163            VariantToShreddedVariantRowBuilder::Array(typed_value_builder)
164        }
165        // Supported shredded primitive types, see Variant shredding spec:
166        // https://github.com/apache/parquet-format/blob/master/VariantShredding.md#shredded-value-types
167        DataType::Boolean
168        | DataType::Int8
169        | DataType::Int16
170        | DataType::Int32
171        | DataType::Int64
172        | DataType::Float32
173        | DataType::Float64
174        | DataType::Decimal32(..)
175        | DataType::Decimal64(..)
176        | DataType::Decimal128(..)
177        | DataType::Date32
178        | DataType::Time64(TimeUnit::Microsecond)
179        | DataType::Timestamp(TimeUnit::Microsecond | TimeUnit::Nanosecond, _)
180        | DataType::Binary
181        | DataType::BinaryView
182        | DataType::LargeBinary
183        | DataType::Utf8
184        | DataType::Utf8View
185        | DataType::LargeUtf8
186        | DataType::FixedSizeBinary(16) // UUID
187        => {
188            let builder =
189                make_primitive_variant_to_arrow_row_builder(data_type, cast_options, capacity)?;
190            let typed_value_builder =
191                VariantToShreddedPrimitiveVariantRowBuilder::new(builder, capacity, null_value);
192            VariantToShreddedVariantRowBuilder::Primitive(typed_value_builder)
193        }
194        DataType::FixedSizeBinary(_) => {
195            return Err(ArrowError::InvalidArgumentError(format!("{data_type} is not a valid variant shredding type. Only FixedSizeBinary(16) for UUID is supported.")))
196        }
197        _ => {
198            return Err(ArrowError::InvalidArgumentError(format!("{data_type} is not a valid variant shredding type")))
199        }
200    };
201    Ok(builder)
202}
203
204pub(crate) enum VariantToShreddedVariantRowBuilder<'a> {
205    Primitive(VariantToShreddedPrimitiveVariantRowBuilder<'a>),
206    Array(VariantToShreddedArrayVariantRowBuilder<'a>),
207    Object(VariantToShreddedObjectVariantRowBuilder<'a>),
208}
209
210impl<'a> VariantToShreddedVariantRowBuilder<'a> {
211    pub fn append_null(&mut self) -> Result<()> {
212        use VariantToShreddedVariantRowBuilder::*;
213        match self {
214            Primitive(b) => b.append_null(),
215            Array(b) => b.append_null(),
216            Object(b) => b.append_null(),
217        }
218    }
219
220    pub fn append_value(&mut self, value: Variant<'_, '_>) -> Result<bool> {
221        use VariantToShreddedVariantRowBuilder::*;
222        match self {
223            Primitive(b) => b.append_value(value),
224            Array(b) => b.append_value(value),
225            Object(b) => b.append_value(value),
226        }
227    }
228
229    pub fn finish(self) -> Result<(BinaryViewArray, ArrayRef, Option<NullBuffer>)> {
230        use VariantToShreddedVariantRowBuilder::*;
231        match self {
232            Primitive(b) => b.finish(),
233            Array(b) => b.finish(),
234            Object(b) => b.finish(),
235        }
236    }
237}
238
239/// A shredded primitive field builder.
240pub(crate) struct VariantToShreddedPrimitiveVariantRowBuilder<'a> {
241    value_builder: VariantValueArrayBuilder,
242    typed_value_builder: PrimitiveVariantToArrowRowBuilder<'a>,
243    nulls: NullBufferBuilder,
244    null_value: NullValue,
245}
246
247impl<'a> VariantToShreddedPrimitiveVariantRowBuilder<'a> {
248    pub(crate) fn new(
249        typed_value_builder: PrimitiveVariantToArrowRowBuilder<'a>,
250        capacity: usize,
251        null_value: NullValue,
252    ) -> Self {
253        Self {
254            value_builder: VariantValueArrayBuilder::new(capacity),
255            typed_value_builder,
256            nulls: NullBufferBuilder::new(capacity),
257            null_value,
258        }
259    }
260
261    fn append_null(&mut self) -> Result<()> {
262        self.null_value
263            .append_to(&mut self.nulls, &mut self.value_builder);
264        self.typed_value_builder.append_null()
265    }
266
267    fn append_value(&mut self, value: Variant<'_, '_>) -> Result<bool> {
268        self.nulls.append_non_null();
269        if self.typed_value_builder.append_value(&value)? {
270            self.value_builder.append_null();
271        } else {
272            self.value_builder.append_value(value);
273        }
274        Ok(true)
275    }
276
277    fn finish(mut self) -> Result<(BinaryViewArray, ArrayRef, Option<NullBuffer>)> {
278        Ok((
279            self.value_builder.build()?,
280            self.typed_value_builder.finish()?,
281            self.nulls.finish(),
282        ))
283    }
284}
285
286pub(crate) struct VariantToShreddedArrayVariantRowBuilder<'a> {
287    value_builder: VariantValueArrayBuilder,
288    typed_value_builder: ArrayVariantToArrowRowBuilder<'a>,
289    nulls: NullBufferBuilder,
290    null_value: NullValue,
291}
292
293impl<'a> VariantToShreddedArrayVariantRowBuilder<'a> {
294    fn try_new(
295        data_type: &'a DataType,
296        cast_options: &'a CastOptions,
297        capacity: usize,
298        null_value: NullValue,
299    ) -> Result<Self> {
300        Ok(Self {
301            value_builder: VariantValueArrayBuilder::new(capacity),
302            typed_value_builder: ArrayVariantToArrowRowBuilder::try_new(
303                data_type,
304                cast_options,
305                capacity,
306                true,
307            )?,
308            nulls: NullBufferBuilder::new(capacity),
309            null_value,
310        })
311    }
312
313    fn append_null(&mut self) -> Result<()> {
314        self.null_value
315            .append_to(&mut self.nulls, &mut self.value_builder);
316        self.typed_value_builder.append_null()?;
317        Ok(())
318    }
319
320    fn append_value(&mut self, variant: Variant<'_, '_>) -> Result<bool> {
321        // If the variant is not an array, typed_value must be null.
322        // If the variant is an array, value must be null.
323        match variant {
324            Variant::List(list) => {
325                self.nulls.append_non_null();
326                self.value_builder.append_null();
327                self.typed_value_builder
328                    .append_value(&Variant::List(list))?;
329                Ok(true)
330            }
331            other => {
332                self.nulls.append_non_null();
333                self.value_builder.append_value(other);
334                self.typed_value_builder.append_null()?;
335                Ok(false)
336            }
337        }
338    }
339
340    fn finish(mut self) -> Result<(BinaryViewArray, ArrayRef, Option<NullBuffer>)> {
341        Ok((
342            self.value_builder.build()?,
343            self.typed_value_builder.finish()?,
344            self.nulls.finish(),
345        ))
346    }
347}
348
349pub(crate) struct VariantToShreddedObjectVariantRowBuilder<'a> {
350    value_builder: VariantValueArrayBuilder,
351    typed_value_builders: IndexMap<&'a str, VariantToShreddedVariantRowBuilder<'a>>,
352    typed_value_nulls: NullBufferBuilder,
353    nulls: NullBufferBuilder,
354    null_value: NullValue,
355}
356
357impl<'a> VariantToShreddedObjectVariantRowBuilder<'a> {
358    fn try_new(
359        fields: &'a Fields,
360        cast_options: &'a CastOptions,
361        capacity: usize,
362        null_value: NullValue,
363    ) -> Result<Self> {
364        let typed_value_builders = fields.iter().map(|field| {
365            let builder = make_variant_to_shredded_variant_arrow_row_builder(
366                field.data_type(),
367                cast_options,
368                capacity,
369                NullValue::ObjectField,
370            )?;
371            Ok((field.name().as_str(), builder))
372        });
373        Ok(Self {
374            value_builder: VariantValueArrayBuilder::new(capacity),
375            typed_value_builders: typed_value_builders.collect::<Result<_>>()?,
376            typed_value_nulls: NullBufferBuilder::new(capacity),
377            nulls: NullBufferBuilder::new(capacity),
378            null_value,
379        })
380    }
381
382    fn append_null(&mut self) -> Result<()> {
383        self.null_value
384            .append_to(&mut self.nulls, &mut self.value_builder);
385        self.typed_value_nulls.append_null();
386        for (_, typed_value_builder) in &mut self.typed_value_builders {
387            typed_value_builder.append_null()?;
388        }
389        Ok(())
390    }
391
392    fn append_value(&mut self, value: Variant<'_, '_>) -> Result<bool> {
393        let Variant::Object(ref obj) = value else {
394            // Not an object => fall back
395            self.nulls.append_non_null();
396            self.value_builder.append_value(value);
397            self.typed_value_nulls.append_null();
398            for (_, typed_value_builder) in &mut self.typed_value_builders {
399                typed_value_builder.append_null()?;
400            }
401            return Ok(false);
402        };
403
404        // Route the object's fields by name as either shredded or unshredded
405        let mut builder = self.value_builder.builder_ext(value.metadata());
406        let mut object_builder = builder.try_new_object()?;
407        let mut seen = std::collections::HashSet::new();
408        let mut partially_shredded = false;
409        for (field_name, value) in obj.iter() {
410            match self.typed_value_builders.get_mut(field_name) {
411                Some(typed_value_builder) => {
412                    typed_value_builder.append_value(value)?;
413                    seen.insert(field_name);
414                }
415                None => {
416                    object_builder.insert_bytes(field_name, value);
417                    partially_shredded = true;
418                }
419            }
420        }
421
422        // Handle missing fields
423        for (field_name, typed_value_builder) in &mut self.typed_value_builders {
424            if !seen.contains(field_name) {
425                typed_value_builder.append_null()?;
426            }
427        }
428
429        // Only emit the value if it captured any unshredded object fields
430        if partially_shredded {
431            object_builder.finish();
432        } else {
433            drop(object_builder);
434            self.value_builder.append_null();
435        }
436
437        self.typed_value_nulls.append_non_null();
438        self.nulls.append_non_null();
439        Ok(true)
440    }
441
442    fn finish(mut self) -> Result<(BinaryViewArray, ArrayRef, Option<NullBuffer>)> {
443        let mut builder = StructArrayBuilder::new();
444        for (field_name, typed_value_builder) in self.typed_value_builders {
445            let (value, typed_value, nulls) = typed_value_builder.finish()?;
446            let array =
447                ShreddedVariantFieldArray::from_parts(Some(value), Some(typed_value), nulls);
448            builder = builder.with_field(field_name, ArrayRef::from(array), false);
449        }
450        if let Some(nulls) = self.typed_value_nulls.finish() {
451            builder = builder.with_nulls(nulls);
452        }
453        Ok((
454            self.value_builder.build()?,
455            Arc::new(builder.build()),
456            self.nulls.finish(),
457        ))
458    }
459}
460
461/// Field configuration captured by the builder (data type + nullability).
462#[derive(Clone)]
463pub struct ShreddingField {
464    data_type: DataType,
465    nullable: bool,
466}
467
468impl ShreddingField {
469    fn new(data_type: DataType, nullable: bool) -> Self {
470        Self {
471            data_type,
472            nullable,
473        }
474    }
475
476    fn null() -> Self {
477        Self::new(DataType::Null, true)
478    }
479}
480
481/// Convenience conversion to allow passing either `FieldRef`, `DataType`, or `(DataType, bool)`.
482pub trait IntoShreddingField {
483    fn into_shredding_field(self) -> ShreddingField;
484}
485
486impl IntoShreddingField for FieldRef {
487    fn into_shredding_field(self) -> ShreddingField {
488        ShreddingField::new(self.data_type().clone(), self.is_nullable())
489    }
490}
491
492impl IntoShreddingField for &DataType {
493    fn into_shredding_field(self) -> ShreddingField {
494        ShreddingField::new(self.clone(), true)
495    }
496}
497
498impl IntoShreddingField for DataType {
499    fn into_shredding_field(self) -> ShreddingField {
500        ShreddingField::new(self, true)
501    }
502}
503
504impl IntoShreddingField for (&DataType, bool) {
505    fn into_shredding_field(self) -> ShreddingField {
506        ShreddingField::new(self.0.clone(), self.1)
507    }
508}
509
510impl IntoShreddingField for (DataType, bool) {
511    fn into_shredding_field(self) -> ShreddingField {
512        ShreddingField::new(self.0, self.1)
513    }
514}
515
516/// Builder for constructing a variant shredding schema.
517///
518/// The builder pattern makes it easy to incrementally define which fields
519/// should be shredded and with what types. Fields are nullable by default; pass
520/// a `(data_type, nullable)` pair or a `FieldRef` to control nullability.
521///
522/// Note: this builder currently only supports struct fields. List support
523/// will be added in the future.
524///
525/// # Example
526///
527/// ```
528/// use std::sync::Arc;
529/// use arrow::datatypes::{DataType, Field, TimeUnit};
530/// use parquet_variant::{VariantPath, VariantPathElement};
531/// use parquet_variant_compute::ShreddedSchemaBuilder;
532///
533/// fn main() -> Result<(), arrow::error::ArrowError> {
534///     // Define the shredding schema using the builder
535///     let shredding_type = ShreddedSchemaBuilder::default()
536///     // store the "time" field as a separate UTC timestamp
537///     .with_path("time", (&DataType::Timestamp(TimeUnit::Nanosecond, Some("UTC".into())), true))?
538///     // store hostname as non-nullable Utf8
539///     .with_path("hostname", (&DataType::Utf8, false))?
540///     // pass a FieldRef directly
541///     .with_path(
542///         "metadata.trace_id",
543///         Arc::new(Field::new("trace_id", DataType::FixedSizeBinary(16), false)),
544///     )?
545///     // field name with a dot: use VariantPath to avoid splitting
546///     .with_path(
547///         VariantPath::from_iter([VariantPathElement::from("metrics.cpu")]),
548///         &DataType::Float64,
549///     )?
550///     .build();
551///    Ok(())
552/// }
553/// // The shredding_type can now be passed to shred_variant:
554/// // let shredded = shred_variant(&input, &shredding_type)?;
555/// ```
556#[derive(Default, Clone)]
557pub struct ShreddedSchemaBuilder {
558    root: VariantSchemaNode,
559}
560
561impl ShreddedSchemaBuilder {
562    /// Create a new empty schema builder.
563    pub fn new() -> Self {
564        Self::default()
565    }
566
567    /// Insert a typed path into the schema using dot notation (or any
568    /// [`VariantPath`] convertible).
569    ///
570    /// The path uses dot notation to specify nested fields.
571    /// For example, "a.b.c" will create a nested structure.
572    ///
573    /// # Arguments
574    ///
575    /// * `path` - Anything convertible to [`VariantPath`] (e.g., a `&str`)
576    /// * `field` - Anything convertible via [`IntoShreddingField`] (e.g. `FieldRef`,
577    ///   `&DataType`, or `(&DataType, bool)` to control nullability)
578    pub fn with_path<'a, P, F>(mut self, path: P, field: F) -> Result<Self>
579    where
580        P: TryInto<VariantPath<'a>>,
581        P::Error: std::fmt::Debug,
582        F: IntoShreddingField,
583    {
584        let path: VariantPath<'a> = path
585            .try_into()
586            .map_err(|e| ArrowError::InvalidArgumentError(format!("{:?}", e)))?;
587        self.root.insert_path(&path, field.into_shredding_field());
588        Ok(self)
589    }
590
591    /// Build the final [`DataType`].
592    pub fn build(self) -> DataType {
593        let shredding_type = self.root.to_shredding_type();
594        match shredding_type {
595            Some(shredding_type) => shredding_type,
596            None => DataType::Null,
597        }
598    }
599}
600
601/// Internal tree node structure for building variant schemas.
602#[derive(Clone)]
603enum VariantSchemaNode {
604    /// A leaf node with a primitive/scalar type (and nullability)
605    Leaf(ShreddingField),
606    /// An inner struct node with nested fields
607    Struct(BTreeMap<String, VariantSchemaNode>),
608}
609
610impl Default for VariantSchemaNode {
611    fn default() -> Self {
612        Self::Leaf(ShreddingField::null())
613    }
614}
615
616impl VariantSchemaNode {
617    /// Insert a path into this node with the given data type.
618    fn insert_path(&mut self, path: &VariantPath<'_>, field: ShreddingField) {
619        self.insert_path_elements(path, field);
620    }
621
622    fn insert_path_elements(&mut self, segments: &[VariantPathElement<'_>], field: ShreddingField) {
623        let Some((head, tail)) = segments.split_first() else {
624            *self = Self::Leaf(field);
625            return;
626        };
627
628        match head {
629            VariantPathElement::Field { name } => {
630                // Ensure this node is a Struct node
631                let children = match self {
632                    Self::Struct(children) => children,
633                    _ => {
634                        *self = Self::Struct(BTreeMap::new());
635                        match self {
636                            Self::Struct(children) => children,
637                            _ => unreachable!(),
638                        }
639                    }
640                };
641
642                children
643                    .entry(name.to_string())
644                    .or_default()
645                    .insert_path_elements(tail, field);
646            }
647            VariantPathElement::Index { .. } => {
648                // List support to be added later; reject for now
649                unreachable!("List paths are not supported yet");
650            }
651        }
652    }
653
654    /// Convert this node to a shredding type.
655    ///
656    /// Returns the [`DataType`] for passing to [`shred_variant`].
657    fn to_shredding_type(&self) -> Option<DataType> {
658        match self {
659            Self::Leaf(field) => Some(field.data_type.clone()),
660            Self::Struct(children) => {
661                let child_fields: Vec<_> = children
662                    .iter()
663                    .filter_map(|(name, child)| child.to_shredding_field(name))
664                    .collect();
665                if child_fields.is_empty() {
666                    None
667                } else {
668                    Some(DataType::Struct(Fields::from(child_fields)))
669                }
670            }
671        }
672    }
673
674    fn to_shredding_field(&self, name: &str) -> Option<FieldRef> {
675        match self {
676            Self::Leaf(field) => Some(Arc::new(Field::new(
677                name,
678                field.data_type.clone(),
679                field.nullable,
680            ))),
681            Self::Struct(_) => self
682                .to_shredding_type()
683                .map(|data_type| Arc::new(Field::new(name, data_type, true))),
684        }
685    }
686}
687
688#[cfg(test)]
689mod tests {
690    use super::*;
691    use crate::VariantArrayBuilder;
692    use arrow::array::{
693        Array, BinaryViewArray, FixedSizeBinaryArray, Float64Array, GenericListArray,
694        GenericListViewArray, Int64Array, LargeBinaryArray, LargeStringArray, ListArray,
695        ListLikeArray, OffsetSizeTrait, PrimitiveArray, StringArray,
696    };
697    use arrow::datatypes::{
698        ArrowPrimitiveType, DataType, Field, Fields, Int64Type, TimeUnit, UnionFields, UnionMode,
699    };
700    use parquet_variant::{
701        BuilderSpecificState, EMPTY_VARIANT_METADATA_BYTES, ObjectBuilder, ReadOnlyMetadataBuilder,
702        Variant, VariantBuilder, VariantPath, VariantPathElement,
703    };
704    use std::sync::Arc;
705    use uuid::Uuid;
706
    /// Every [`NullValue`] mode, in declaration order.
    const NULL_VALUES: [NullValue; 3] = [
        NullValue::TopLevelVariant,
        NullValue::ObjectField,
        NullValue::ArrayElement,
    ];
712
713    #[derive(Clone)]
714    enum VariantValue<'a> {
715        Value(Variant<'a, 'a>),
716        List(Vec<VariantValue<'a>>),
717        Object(Vec<(&'a str, VariantValue<'a>)>),
718        Null,
719    }
720
721    impl<'a, T> From<T> for VariantValue<'a>
722    where
723        T: Into<Variant<'a, 'a>>,
724    {
725        fn from(value: T) -> Self {
726            Self::Value(value.into())
727        }
728    }
729
    /// Test-only description of one top-level row passed to `build_variant_array`.
    #[derive(Clone)]
    enum VariantRow<'a> {
        /// A row holding the given value.
        Value(VariantValue<'a>),
        /// A row holding a list of the given elements.
        List(Vec<VariantValue<'a>>),
        /// A row holding an object with the given `(name, value)` fields.
        Object(Vec<(&'a str, VariantValue<'a>)>),
        /// A row appended through the array builder's `append_null` (presumably a null row).
        Null,
    }
737
738    fn build_variant_array(rows: Vec<VariantRow<'static>>) -> VariantArray {
739        let mut builder = VariantArrayBuilder::new(rows.len());
740
741        fn append_variant_value<B: VariantBuilderExt>(builder: &mut B, value: VariantValue) {
742            match value {
743                VariantValue::Value(v) => builder.append_value(v),
744                VariantValue::List(values) => {
745                    let mut list = builder.new_list();
746                    for v in values {
747                        append_variant_value(&mut list, v);
748                    }
749                    list.finish();
750                }
751                VariantValue::Object(fields) => {
752                    let mut object = builder.new_object();
753                    for (name, value) in fields {
754                        append_variant_field(&mut object, name, value);
755                    }
756                    object.finish();
757                }
758                VariantValue::Null => builder.append_null(),
759            }
760        }
761
762        fn append_variant_field<'a, S: BuilderSpecificState>(
763            object: &mut ObjectBuilder<'_, S>,
764            name: &'a str,
765            value: VariantValue<'a>,
766        ) {
767            match value {
768                VariantValue::Value(v) => {
769                    object.insert(name, v);
770                }
771                VariantValue::List(values) => {
772                    let mut list = object.new_list(name);
773                    for v in values {
774                        append_variant_value(&mut list, v);
775                    }
776                    list.finish();
777                }
778                VariantValue::Object(fields) => {
779                    let mut nested = object.new_object(name);
780                    for (field_name, v) in fields {
781                        append_variant_field(&mut nested, field_name, v);
782                    }
783                    nested.finish();
784                }
785                VariantValue::Null => {
786                    object.insert(name, Variant::Null);
787                }
788            }
789        }
790
791        rows.into_iter().for_each(|row| match row {
792            VariantRow::Value(value) => append_variant_value(&mut builder, value),
793            VariantRow::List(values) => {
794                let mut list = builder.new_list();
795                for value in values {
796                    append_variant_value(&mut list, value);
797                }
798                list.finish();
799            }
800            VariantRow::Object(fields) => {
801                let mut object = builder.new_object();
802                for (name, value) in fields {
803                    append_variant_field(&mut object, name, value);
804                }
805                object.finish();
806            }
807            VariantRow::Null => builder.append_null(),
808        });
809        builder.build()
810    }
811
812    trait TestListLikeArray: ListLikeArray {
813        type OffsetSize: OffsetSizeTrait;
814        fn value_offsets(&self) -> Option<&[Self::OffsetSize]>;
815        fn value_size(&self, index: usize) -> Self::OffsetSize;
816    }
817
818    impl<O: OffsetSizeTrait> TestListLikeArray for GenericListArray<O> {
819        type OffsetSize = O;
820
821        fn value_offsets(&self) -> Option<&[Self::OffsetSize]> {
822            Some(GenericListArray::value_offsets(self))
823        }
824
825        fn value_size(&self, index: usize) -> Self::OffsetSize {
826            GenericListArray::value_length(self, index)
827        }
828    }
829
830    impl<O: OffsetSizeTrait> TestListLikeArray for GenericListViewArray<O> {
831        type OffsetSize = O;
832
833        fn value_offsets(&self) -> Option<&[Self::OffsetSize]> {
834            Some(GenericListViewArray::value_offsets(self))
835        }
836
837        fn value_size(&self, index: usize) -> Self::OffsetSize {
838            GenericListViewArray::value_size(self, index)
839        }
840    }
841
842    fn downcast_list_like_array<O: OffsetSizeTrait>(
843        array: &VariantArray,
844    ) -> &dyn TestListLikeArray<OffsetSize = O> {
845        let typed_value = array.typed_value_field().unwrap();
846        if let Some(list) = typed_value.as_any().downcast_ref::<GenericListArray<O>>() {
847            list
848        } else if let Some(list_view) = typed_value
849            .as_any()
850            .downcast_ref::<GenericListViewArray<O>>()
851        {
852            list_view
853        } else {
854            panic!(
855                "Expected list-like typed_value with matching offset type, got {}",
856                typed_value.data_type()
857            );
858        }
859    }
860
861    fn assert_list_structure<O: OffsetSizeTrait>(
862        array: &VariantArray,
863        expected_len: usize,
864        expected_offsets: &[O],
865        expected_sizes: &[Option<O>],
866        expected_fallbacks: &[Option<Variant<'static, 'static>>],
867    ) {
868        assert_eq!(array.len(), expected_len);
869
870        let fallbacks = (array.value_field().unwrap(), Some(array.metadata_field()));
871        let array = downcast_list_like_array::<O>(array);
872
873        assert_eq!(
874            array.value_offsets().unwrap(),
875            expected_offsets,
876            "list offsets mismatch"
877        );
878        assert_eq!(
879            array.len(),
880            expected_sizes.len(),
881            "expected_sizes should match array length"
882        );
883        assert_eq!(
884            array.len(),
885            expected_fallbacks.len(),
886            "expected_fallbacks should match array length"
887        );
888        assert_eq!(
889            array.len(),
890            fallbacks.0.len(),
891            "fallbacks value field should match array length"
892        );
893
894        // Validate per-row shredding outcomes for the list array
895        for (idx, (expected_size, expected_fallback)) in expected_sizes
896            .iter()
897            .zip(expected_fallbacks.iter())
898            .enumerate()
899        {
900            match expected_size {
901                Some(len) => {
902                    // Successfully shredded: typed list value present, no fallback value
903                    assert!(array.is_valid(idx));
904                    assert_eq!(array.value_size(idx), *len);
905                    assert!(fallbacks.0.is_null(idx));
906                }
907                None => {
908                    // Unable to shred: typed list value absent, fallback should carry the variant
909                    assert!(array.is_null(idx));
910                    assert_eq!(array.value_size(idx), O::zero());
911                    match expected_fallback {
912                        Some(expected_variant) => {
913                            assert!(fallbacks.0.is_valid(idx));
914                            let metadata_bytes = fallbacks
915                                .1
916                                .filter(|m| m.is_valid(idx))
917                                .map(|m| m.value(idx))
918                                .filter(|bytes| !bytes.is_empty())
919                                .unwrap_or(EMPTY_VARIANT_METADATA_BYTES);
920                            assert_eq!(
921                                Variant::new(metadata_bytes, fallbacks.0.value(idx)),
922                                expected_variant.clone()
923                            );
924                        }
925                        None => {
926                            assert!(fallbacks.0.is_null(idx));
927                        }
928                    }
929                }
930            }
931        }
932    }
933
934    fn assert_list_structure_and_elements<T: ArrowPrimitiveType, O: OffsetSizeTrait>(
935        array: &VariantArray,
936        expected_len: usize,
937        expected_offsets: &[O],
938        expected_sizes: &[Option<O>],
939        expected_fallbacks: &[Option<Variant<'static, 'static>>],
940        expected_shredded_elements: (&[Option<T::Native>], &[Option<Variant<'static, 'static>>]),
941    ) {
942        assert_list_structure(
943            array,
944            expected_len,
945            expected_offsets,
946            expected_sizes,
947            expected_fallbacks,
948        );
949        let array = downcast_list_like_array::<O>(array);
950
951        // Validate the shredded state of list elements (typed values and fallbacks)
952        let (expected_values, expected_fallbacks) = expected_shredded_elements;
953        assert_eq!(
954            expected_values.len(),
955            expected_fallbacks.len(),
956            "expected_values and expected_fallbacks should be aligned"
957        );
958
959        // Validate the shredded primitive values for list elements
960        let element_array = ShreddedVariantFieldArray::try_new(array.values().as_ref()).unwrap();
961        let element_values = element_array
962            .typed_value_field()
963            .unwrap()
964            .as_any()
965            .downcast_ref::<PrimitiveArray<T>>()
966            .unwrap();
967        assert_eq!(element_values.len(), expected_values.len());
968        for (idx, expected_value) in expected_values.iter().enumerate() {
969            match expected_value {
970                Some(value) => {
971                    assert!(element_values.is_valid(idx));
972                    assert_eq!(element_values.value(idx), *value);
973                }
974                None => assert!(element_values.is_null(idx)),
975            }
976        }
977
978        // Validate fallback variants for list elements that could not be shredded
979        let element_fallbacks = element_array.value_field().unwrap();
980        assert_eq!(element_fallbacks.len(), expected_fallbacks.len());
981        for (idx, expected_fallback) in expected_fallbacks.iter().enumerate() {
982            match expected_fallback {
983                Some(expected_variant) => {
984                    assert!(element_fallbacks.is_valid(idx));
985                    assert_eq!(
986                        Variant::new(EMPTY_VARIANT_METADATA_BYTES, element_fallbacks.value(idx)),
987                        expected_variant.clone()
988                    );
989                }
990                None => assert!(element_fallbacks.is_null(idx)),
991            }
992        }
993    }
994
995    fn assert_append_null_mode_value_and_struct_nulls(
996        mode: NullValue,
997        value: &BinaryViewArray,
998        nulls: Option<&arrow::buffer::NullBuffer>,
999    ) {
1000        if mode == NullValue::TopLevelVariant {
1001            assert!(nulls.is_some_and(|n| n.is_null(0)));
1002        } else {
1003            assert!(nulls.is_none());
1004        }
1005
1006        if mode == NullValue::ArrayElement {
1007            assert!(value.is_valid(0));
1008            assert_eq!(
1009                Variant::new(EMPTY_VARIANT_METADATA_BYTES, value.value(0)),
1010                Variant::Null
1011            );
1012        } else {
1013            assert!(value.is_null(0));
1014        }
1015    }
1016
/// `append_null` on an `Int64` shredding builder: `typed_value` is null in every mode, while
/// `value` and the struct null buffer follow the mode-specific rules.
#[test]
fn test_append_null_mode_semantics_primitive_builder() {
    let cast_options = arrow::compute::CastOptions::default();
    let target_type = DataType::Int64;

    for mode in NULL_VALUES {
        let mut builder =
            make_variant_to_shredded_variant_arrow_row_builder(&target_type, &cast_options, 1, mode)
                .unwrap();
        builder.append_null().unwrap();

        let (value, typed_value, nulls) = builder.finish().unwrap();
        let typed_value = typed_value.as_any().downcast_ref::<Int64Array>().unwrap();

        assert!(typed_value.is_null(0));
        assert_append_null_mode_value_and_struct_nulls(mode, &value, nulls.as_ref());
    }
}
1045
/// `append_null` on a `List<Int64>` shredding builder: the typed list has one null row and no
/// child values, while `value` and the struct null buffer follow the mode-specific rules.
#[test]
fn test_append_null_mode_semantics_array_builder() {
    let cast_options = arrow::compute::CastOptions::default();
    let item_field = Arc::new(Field::new("item", DataType::Int64, true));
    let list_type = DataType::List(item_field);

    for mode in NULL_VALUES {
        let mut builder =
            make_variant_to_shredded_variant_arrow_row_builder(&list_type, &cast_options, 1, mode)
                .unwrap();
        builder.append_null().unwrap();

        let (value, typed_value, nulls) = builder.finish().unwrap();
        assert_append_null_mode_value_and_struct_nulls(mode, &value, nulls.as_ref());

        let list = typed_value.as_any().downcast_ref::<ListArray>().unwrap();
        assert_eq!(list.len(), 1);
        assert!(list.is_null(0));
        assert_eq!(list.values().len(), 0);
    }
}
1070
/// `append_null` on a struct shredding builder: the typed struct row is null and each shredded
/// child field has null `value` and `typed_value`, while the outer `value` and null buffer
/// follow the mode-specific rules.
#[test]
fn test_append_null_mode_semantics_object_builder() {
    let cast_options = arrow::compute::CastOptions::default();
    let shredded_fields = vec![
        Field::new("id", DataType::Int64, true),
        Field::new("name", DataType::Utf8, true),
    ];
    let object_type = DataType::Struct(Fields::from(shredded_fields));

    for mode in NULL_VALUES {
        let mut builder =
            make_variant_to_shredded_variant_arrow_row_builder(&object_type, &cast_options, 1, mode)
                .unwrap();
        builder.append_null().unwrap();

        let (value, typed_value, nulls) = builder.finish().unwrap();
        assert_append_null_mode_value_and_struct_nulls(mode, &value, nulls.as_ref());

        let typed_struct = typed_value
            .as_any()
            .downcast_ref::<arrow::array::StructArray>()
            .unwrap();
        assert_eq!(typed_struct.len(), 1);
        assert!(typed_struct.is_null(0));

        // Each child column is itself a (value, typed_value) pair; both halves must be null.
        for field_name in ["id", "name"] {
            let column = typed_struct.column_by_name(field_name).unwrap();
            let field = ShreddedVariantFieldArray::try_new(column).unwrap();
            assert!(field.value_field().unwrap().is_null(0));
            assert!(field.typed_value_field().unwrap().is_null(0));
        }
    }
}
1109
/// Shredding an input that already has a `typed_value` column fails with
/// `InvalidArgumentError`.
#[test]
fn test_already_shredded_input_error() {
    // Borrow metadata/value from an ordinary unshredded array, then attach a typed_value
    // column so the input looks already shredded.
    let unshredded = VariantArray::from_iter(vec![Some(Variant::from("test"))]);
    let shredded_array = VariantArray::from_parts(
        unshredded.metadata_field().clone(),
        Some(unshredded.value_field().unwrap().clone()),
        Some(Arc::new(Int64Array::from(vec![42])) as ArrayRef),
        None,
    );

    let err = shred_variant(&shredded_array, &DataType::Int64).unwrap_err();
    assert!(matches!(err, ArrowError::InvalidArgumentError(_)));
}
1128
/// An input with neither `value` nor `typed_value` (the all-null case) shreds to an array
/// that still has neither column and keeps its row count.
#[test]
fn test_all_null_input() {
    // Use the shared empty-metadata constant so the single row carries well-formed metadata.
    // The previous hand-written `[1, 0]` was labelled "minimal valid" but omitted the offset
    // entry that follows the dictionary size.
    let metadata = BinaryViewArray::from_iter_values([EMPTY_VARIANT_METADATA_BYTES]);
    let all_null_array = VariantArray::from_parts(metadata, None, None, None);
    let result = shred_variant(&all_null_array, &DataType::Int64).unwrap();

    // Shredding must not change the number of rows.
    assert_eq!(result.len(), all_null_array.len());

    // Should return array with no value/typed_value fields
    assert!(result.value_field().is_none());
    assert!(result.typed_value_field().is_none());
}
1140
/// Requesting `FixedSizeBinary(17)` as the shredding type is rejected with a message naming
/// `FixedSizeBinary(16)` as the only supported width.
#[test]
fn test_invalid_fixed_size_binary_shredding() {
    let uuid = Uuid::new_v4();
    let input = VariantArray::from_iter([Some(Variant::from(uuid)), None]);

    // Width 17 is not the UUID width, so the request must fail.
    let unsupported_type = DataType::FixedSizeBinary(17);
    let message = shred_variant(&input, &unsupported_type)
        .unwrap_err()
        .to_string();

    assert_eq!(
        message,
        "Invalid argument error: FixedSizeBinary(17) is not a valid variant shredding type. Only FixedSizeBinary(16) for UUID is supported."
    );
}
1155
/// Shredding to `FixedSizeBinary(16)` moves UUID rows into `typed_value`, keeps a non-UUID row
/// in the fallback `value` column, and propagates a top-level null.
#[test]
fn test_uuid_shredding() {
    let mock_uuid_1 = Uuid::new_v4();
    let mock_uuid_2 = Uuid::new_v4();

    let input = VariantArray::from_iter([
        Some(Variant::from(mock_uuid_1)),
        None,
        Some(Variant::from(false)),
        Some(Variant::from(mock_uuid_2)),
    ]);

    let variant_array = shred_variant(&input, &DataType::FixedSizeBinary(16)).unwrap();
    assert_eq!(variant_array.len(), 4);

    // NOTE(review): a check that the `typed_value` field carries the canonical Uuid extension
    // type used to sit here as commented-out code. It is left out rather than kept as dead
    // code; restore it as a real assertion if the field is expected to carry that extension.

    let metadata = variant_array.metadata_field();
    let value = variant_array.value_field().unwrap();

    // probe the downcasted typed_value array to make sure uuids are shredded correctly
    let uuids = variant_array
        .typed_value_field()
        .unwrap()
        .as_any()
        .downcast_ref::<FixedSizeBinaryArray>()
        .unwrap();
    assert_eq!(uuids.len(), 4);

    // Row 0: UUID shreds to typed_value, nothing is left in value
    assert!(variant_array.is_valid(0));
    assert!(value.is_null(0));
    assert!(!uuids.is_null(0));
    let got_uuid_1: &[u8] = uuids.value(0);
    assert_eq!(got_uuid_1, mock_uuid_1.as_bytes());

    // Row 1: top-level null
    assert!(variant_array.is_null(1));
    assert!(value.is_null(1));
    assert!(uuids.is_null(1));

    // Row 2: boolean is not a UUID, so it falls back to value
    assert!(variant_array.is_valid(2));
    assert!(uuids.is_null(2));
    assert!(value.is_valid(2));
    assert_eq!(
        Variant::new(metadata.value(2), value.value(2)),
        Variant::from(false)
    );

    // Row 3: UUID shreds to typed_value, nothing is left in value
    assert!(variant_array.is_valid(3));
    assert!(value.is_null(3));
    assert!(!uuids.is_null(3));
    let got_uuid_2: &[u8] = uuids.value(3);
    assert_eq!(got_uuid_2, mock_uuid_2.as_bytes());
}
1207
/// Shreds a mixed array to `Int64` and checks, row by row, which of `value` / `typed_value`
/// ends up populated.
#[test]
fn test_primitive_shredding_comprehensive() {
    let rows = vec![
        Some(Variant::from(42i64)),   // row 0: shreds
        Some(Variant::from("hello")), // row 1: string, falls back
        Some(Variant::from(100i64)),  // row 2: shreds
        None,                         // row 3: array-level null
        Some(Variant::Null),          // row 4: variant null, falls back
        Some(Variant::from(3i8)),     // row 5: int8 shreds as int64
    ];
    let input = VariantArray::from_iter(rows);

    let shredded = shred_variant(&input, &DataType::Int64).unwrap();

    let metadata = shredded.metadata_field();
    let fallback = shredded.value_field().unwrap();
    let typed_column = shredded.typed_value_field().unwrap();
    let typed = typed_column.as_any().downcast_ref::<Int64Array>().unwrap();

    assert_eq!(shredded.len(), 6);

    // Row 0: 42 lands in typed_value, so the fallback value is null.
    assert!(shredded.is_valid(0));
    assert!(fallback.is_null(0));
    assert!(typed.is_valid(0));
    assert_eq!(typed.value(0), 42);

    // Row 1: "hello" cannot become an Int64 and stays in the fallback value.
    assert!(shredded.is_valid(1));
    assert!(fallback.is_valid(1));
    assert!(typed.is_null(1));
    assert_eq!(
        Variant::new(metadata.value(1), fallback.value(1)),
        Variant::from("hello")
    );

    // Row 2: 100 lands in typed_value.
    assert!(shredded.is_valid(2));
    assert!(fallback.is_null(2));
    assert_eq!(typed.value(2), 100);

    // Row 3: the array-level null stays a null row.
    assert!(shredded.is_null(3));

    // Row 4: Variant::Null is a value, not an integer, so it stays in the fallback value.
    assert!(shredded.is_valid(4));
    assert!(fallback.is_valid(4));
    assert_eq!(
        Variant::new(metadata.value(4), fallback.value(4)),
        Variant::Null
    );
    assert!(typed.is_null(4));

    // Row 5: the int8 is widened and lands in typed_value.
    assert!(shredded.is_valid(5));
    assert!(fallback.is_null(5));
    assert!(typed.is_valid(5));
    assert_eq!(typed.value(5), 3);
}
1273
/// Shreds the same three-row input to `Int32` and to `Float64` and checks the typed column
/// produced for each target.
#[test]
fn test_primitive_different_target_types() {
    let rows = vec![
        Variant::from(42i32),
        Variant::from(3.15f64),
        Variant::from("not_a_number"),
    ];
    let input = VariantArray::from_iter(rows);

    // Int32 target: the int is kept, the float is expected as 3, the string does not convert.
    let as_int32 = shred_variant(&input, &DataType::Int32).unwrap();
    let int32_column = as_int32.typed_value_field().unwrap();
    let int32_values = int32_column
        .as_any()
        .downcast_ref::<arrow::array::Int32Array>()
        .unwrap();
    assert_eq!(int32_values.value(0), 42);
    assert_eq!(int32_values.value(1), 3);
    assert!(int32_values.is_null(2));

    // Float64 target: the int converts to 42.0, the float is kept, the string does not convert.
    let as_float64 = shred_variant(&input, &DataType::Float64).unwrap();
    let float64_column = as_float64.typed_value_field().unwrap();
    let float64_values = float64_column
        .as_any()
        .downcast_ref::<Float64Array>()
        .unwrap();
    assert_eq!(float64_values.value(0), 42.0);
    assert_eq!(float64_values.value(1), 3.15);
    assert!(float64_values.is_null(2));
}
1306
/// Shredding to `LargeUtf8`: strings move to `typed_value`, other values (including
/// `Variant::Null`) stay in the fallback `value`, and a top-level null stays null.
#[test]
fn test_largeutf8_shredding() {
    let input = VariantArray::from_iter(vec![
        Some(Variant::from("hello")),
        Some(Variant::from(42i64)),
        None,
        Some(Variant::Null),
        Some(Variant::from("world")),
    ]);

    let result = shred_variant(&input, &DataType::LargeUtf8).unwrap();
    let metadata = result.metadata_field();
    let value = result.value_field().unwrap();
    let typed_column = result.typed_value_field().unwrap();
    let typed_value = typed_column
        .as_any()
        .downcast_ref::<LargeStringArray>()
        .unwrap();

    assert_eq!(result.len(), 5);

    // Rows 0 and 4: strings shred to typed_value and leave value null.
    for (row, expected) in [(0, "hello"), (4, "world")] {
        assert!(result.is_valid(row));
        assert!(value.is_null(row));
        assert_eq!(typed_value.value(row), expected);
    }

    // Rows 1 and 3: the integer and the variant null fall back to value.
    for (row, expected) in [(1, Variant::from(42i64)), (3, Variant::Null)] {
        assert!(result.is_valid(row));
        assert!(value.is_valid(row));
        assert!(typed_value.is_null(row));
        assert_eq!(Variant::new(metadata.value(row), value.value(row)), expected);
    }

    // Row 2: top-level null leaves every column null.
    assert!(result.is_null(2));
    assert!(value.is_null(2));
    assert!(typed_value.is_null(2));
}
1362
/// Shredding to `LargeBinary`: binary values move to `typed_value`, other values (including
/// `Variant::Null`) stay in the fallback `value`, and a top-level null stays null.
#[test]
fn test_largebinary_shredding() {
    let input = VariantArray::from_iter(vec![
        Some(Variant::from(&b"\x00\x01\x02"[..])),
        Some(Variant::from("not_binary")),
        None,
        Some(Variant::Null),
        Some(Variant::from(&b"\xff\xaa"[..])),
    ]);

    let result = shred_variant(&input, &DataType::LargeBinary).unwrap();
    let metadata = result.metadata_field();
    let value = result.value_field().unwrap();
    let typed_column = result.typed_value_field().unwrap();
    let typed_value = typed_column
        .as_any()
        .downcast_ref::<LargeBinaryArray>()
        .unwrap();

    assert_eq!(result.len(), 5);

    // Rows 0 and 4: binary shreds to typed_value and leaves value null.
    for (row, expected) in [(0, &b"\x00\x01\x02"[..]), (4, &b"\xff\xaa"[..])] {
        assert!(result.is_valid(row));
        assert!(value.is_null(row));
        assert_eq!(typed_value.value(row), expected);
    }

    // Rows 1 and 3: the string and the variant null fall back to value.
    for (row, expected) in [(1, Variant::from("not_binary")), (3, Variant::Null)] {
        assert!(result.is_valid(row));
        assert!(value.is_valid(row));
        assert!(typed_value.is_null(row));
        assert_eq!(Variant::new(metadata.value(row), value.value(row)), expected);
    }

    // Row 2: top-level null leaves every column null.
    assert!(result.is_null(2));
    assert!(value.is_null(2));
    assert!(typed_value.is_null(2));
}
1418
/// Every data type in the list below must be rejected by `shred_variant` with
/// `InvalidArgumentError`.
#[test]
fn test_invalid_shredded_types_rejected() {
    let input = VariantArray::from_iter([Variant::from(42)]);

    // The nested types are built up front so the list itself stays readable.
    let union_type = DataType::Union(
        UnionFields::from_fields(vec![
            Field::new("int_field", DataType::Int32, false),
            Field::new("str_field", DataType::Utf8, true),
        ]),
        UnionMode::Dense,
    );
    let map_entries = DataType::Struct(Fields::from(vec![
        Field::new("key", DataType::Utf8, false),
        Field::new("value", DataType::Int32, true),
    ]));
    let map_type = DataType::Map(Arc::new(Field::new("entries", map_entries, false)), false);
    let dictionary_type =
        DataType::Dictionary(Box::new(DataType::Int32), Box::new(DataType::Utf8));
    let run_end_encoded_type = DataType::RunEndEncoded(
        Arc::new(Field::new("run_ends", DataType::Int32, false)),
        Arc::new(Field::new("values", DataType::Utf8, true)),
    );

    let invalid_types = [
        DataType::UInt8,
        DataType::Float16,
        DataType::Decimal256(38, 10),
        DataType::Date64,
        DataType::Time32(TimeUnit::Second),
        DataType::Time64(TimeUnit::Nanosecond),
        DataType::Timestamp(TimeUnit::Millisecond, None),
        DataType::FixedSizeBinary(17),
        union_type,
        map_type,
        dictionary_type,
        run_end_encoded_type,
    ];

    for data_type in invalid_types {
        let err = shred_variant(&input, &data_type).unwrap_err();
        let rejected_as_invalid_argument = matches!(err, ArrowError::InvalidArgumentError(_));
        assert!(
            rejected_as_invalid_argument,
            "expected InvalidArgumentError for {:?}, got {:?}",
            data_type,
            err
        );
    }
}
1467
/// Shredding to `List<Int64>`: covers a clean list, a list with elements that fall back, a
/// non-list row, a null row and an empty list.
#[test]
fn test_array_shredding_as_list() {
    let all_ints = VariantRow::List(vec![
        VariantValue::from(1i64),
        VariantValue::from(2i64),
        VariantValue::from(3i64),
    ]);
    let mixed_elements = VariantRow::List(vec![
        VariantValue::from(1i64),
        VariantValue::from("two"),
        VariantValue::from(Variant::Null),
    ]);
    let input = build_variant_array(vec![
        all_ints,                                            // row 0: shreds entirely
        mixed_elements,                                      // row 1: some elements fall back
        VariantRow::Value(VariantValue::from("not a list")), // row 2: whole row falls back
        VariantRow::Null,                                    // row 3: array-level null
        VariantRow::List(vec![]),                            // row 4: empty list
    ]);

    let item_field = Arc::new(Field::new("item", DataType::Int64, true));
    let list_schema = DataType::List(item_field);
    let result = shred_variant(&input, &list_schema).unwrap();
    assert_eq!(result.len(), 5);

    // Row-level expectations.
    let offsets: [i32; 6] = [0, 3, 6, 6, 6, 6];
    let sizes: [Option<i32>; 5] = [Some(3), Some(3), None, None, Some(0)];
    let row_fallbacks: [Option<Variant<'static, 'static>>; 5] =
        [None, None, Some(Variant::from("not a list")), None, None];

    // Element-level expectations over the six flattened child rows.
    let element_values: [Option<i64>; 6] = [Some(1), Some(2), Some(3), Some(1), None, None];
    let element_fallbacks: [Option<Variant<'static, 'static>>; 6] = [
        None,
        None,
        None,
        None,
        Some(Variant::from("two")),
        Some(Variant::Null),
    ];

    assert_list_structure_and_elements::<Int64Type, i32>(
        &result,
        5,
        &offsets,
        &sizes,
        &row_fallbacks,
        (&element_values[..], &element_fallbacks[..]),
    );
}
1513
/// Shredding to `LargeList<Int64>` (64-bit offsets): covers a clean list, a non-list row and
/// an empty list.
#[test]
fn test_array_shredding_as_large_list() {
    let input = build_variant_array(vec![
        // row 0: shreds entirely
        VariantRow::List(vec![VariantValue::from(1i64), VariantValue::from(2i64)]),
        // row 1: whole row falls back
        VariantRow::Value(VariantValue::from("not a list")),
        // row 2: empty list
        VariantRow::List(vec![]),
    ]);

    let item_field = Arc::new(Field::new("item", DataType::Int64, true));
    let list_schema = DataType::LargeList(item_field);
    let result = shred_variant(&input, &list_schema).unwrap();
    assert_eq!(result.len(), 3);

    // Row-level expectations.
    let offsets: [i64; 4] = [0, 2, 2, 2];
    let sizes: [Option<i64>; 3] = [Some(2), None, Some(0)];
    let row_fallbacks: [Option<Variant<'static, 'static>>; 3] =
        [None, Some(Variant::from("not a list")), None];

    // Element-level expectations over the two flattened child rows.
    let element_values: [Option<i64>; 2] = [Some(1), Some(2)];
    let element_fallbacks: [Option<Variant<'static, 'static>>; 2] = [None, None];

    assert_list_structure_and_elements::<Int64Type, i64>(
        &result,
        3,
        &offsets,
        &sizes,
        &row_fallbacks,
        (&element_values[..], &element_fallbacks[..]),
    );
}
1537
/// Shredding to `ListView<Int64>`: covers a clean list, a list with elements that fall back,
/// a non-list row, a null row and an empty list.
#[test]
fn test_array_shredding_as_list_view() {
    let all_ints = VariantRow::List(vec![
        VariantValue::from(1i64),
        VariantValue::from(2i64),
        VariantValue::from(3i64),
    ]);
    let mixed_elements = VariantRow::List(vec![
        VariantValue::from(1i64),
        VariantValue::from("two"),
        VariantValue::from(Variant::Null),
    ]);
    let input = build_variant_array(vec![
        all_ints,                                            // row 0: shreds entirely
        mixed_elements,                                      // row 1: some elements fall back
        VariantRow::Value(VariantValue::from("not a list")), // row 2: whole row falls back
        VariantRow::Null,                                    // row 3: top-level null
        VariantRow::List(vec![]),                            // row 4: empty list
    ]);

    let item_field = Arc::new(Field::new("item", DataType::Int64, true));
    let list_schema = DataType::ListView(item_field);
    let result = shred_variant(&input, &list_schema).unwrap();
    assert_eq!(result.len(), 5);

    // Row-level expectations; a list view has one offset per row.
    let offsets: [i32; 5] = [0, 3, 6, 6, 6];
    let sizes: [Option<i32>; 5] = [Some(3), Some(3), None, None, Some(0)];
    let row_fallbacks: [Option<Variant<'static, 'static>>; 5] =
        [None, None, Some(Variant::from("not a list")), None, None];

    // Element-level expectations over the six flattened child rows.
    let element_values: [Option<i64>; 6] = [Some(1), Some(2), Some(3), Some(1), None, None];
    let element_fallbacks: [Option<Variant<'static, 'static>>; 6] = [
        None,
        None,
        None,
        None,
        Some(Variant::from("two")),
        Some(Variant::Null),
    ];

    assert_list_structure_and_elements::<Int64Type, i32>(
        &result,
        5,
        &offsets,
        &sizes,
        &row_fallbacks,
        (&element_values[..], &element_fallbacks[..]),
    );
}
1583
/// Shredding to `LargeListView<Int64>` (64-bit offsets): covers a clean list, a non-list row
/// and an empty list.
#[test]
fn test_array_shredding_as_large_list_view() {
    let input = build_variant_array(vec![
        // row 0: shreds entirely
        VariantRow::List(vec![VariantValue::from(1i64), VariantValue::from(2i64)]),
        // row 1: whole row falls back
        VariantRow::Value(VariantValue::from("fallback")),
        // row 2: empty list
        VariantRow::List(vec![]),
    ]);

    let item_field = Arc::new(Field::new("item", DataType::Int64, true));
    let list_schema = DataType::LargeListView(item_field);
    let result = shred_variant(&input, &list_schema).unwrap();
    assert_eq!(result.len(), 3);

    // Row-level expectations; a list view has one offset per row.
    let offsets: [i64; 3] = [0, 2, 2];
    let sizes: [Option<i64>; 3] = [Some(2), None, Some(0)];
    let row_fallbacks: [Option<Variant<'static, 'static>>; 3] =
        [None, Some(Variant::from("fallback")), None];

    // Element-level expectations over the two flattened child rows.
    let element_values: [Option<i64>; 2] = [Some(1), Some(2)];
    let element_fallbacks: [Option<Variant<'static, 'static>>; 2] = [None, None];

    assert_list_structure_and_elements::<Int64Type, i64>(
        &result,
        3,
        &offsets,
        &sizes,
        &row_fallbacks,
        (&element_values[..], &element_fallbacks[..]),
    );
}
1608
/// Requesting a `FixedSizeList` shredding type fails with a "Not yet implemented" error.
#[test]
fn test_array_shredding_as_fixed_size_list() {
    let three_ints = VariantRow::List(vec![
        VariantValue::from(1i64),
        VariantValue::from(2i64),
        VariantValue::from(3i64),
    ]);
    let input = build_variant_array(vec![three_ints]);

    let item_field = Arc::new(Field::new("item", DataType::Int64, true));
    let list_schema = DataType::FixedSizeList(item_field, 2);

    let message = shred_variant(&input, &list_schema).unwrap_err().to_string();
    assert_eq!(
        message,
        "Not yet implemented: Converting unshredded variant arrays to arrow fixed-size lists"
    );
}
1624
/// Shreds list rows against a `List<List<Int64>>` schema and checks the expected structure of
/// both the outer list and the nested element lists via the list assertion helpers.
#[test]
fn test_array_shredding_with_array_elements() {
    // Row 0: [[1, 2], [3, 4], []] - every nested list is clean
    let clean_row = VariantRow::List(vec![
        VariantValue::List(vec![VariantValue::from(1i64), VariantValue::from(2i64)]),
        VariantValue::List(vec![VariantValue::from(3i64), VariantValue::from(4i64)]),
        VariantValue::List(vec![]),
    ]);
    // Row 1: [[5, "bad", null], "not a list inner", null] - fallbacks inside the list
    let mixed_row = VariantRow::List(vec![
        VariantValue::List(vec![
            VariantValue::from(5i64),
            VariantValue::from("bad"),
            VariantValue::from(Variant::Null),
        ]),
        VariantValue::from("not a list inner"),
        VariantValue::Null,
    ]);
    // Row 2: "not a list" - the whole row is a top-level fallback
    let scalar_row = VariantRow::Value(VariantValue::from("not a list"));
    // Row 3: null row
    let input = build_variant_array(vec![clean_row, mixed_row, scalar_row, VariantRow::Null]);

    // Target schema: List<List<Int64>>
    let int_item = Arc::new(Field::new("item", DataType::Int64, true));
    let nested_list = DataType::List(int_item);
    let list_schema = DataType::List(Arc::new(Field::new("item", nested_list, true)));

    let result = shred_variant(&input, &list_schema).unwrap();
    assert_eq!(result.len(), 4);

    let outer_list = result
        .typed_value_field()
        .unwrap()
        .as_any()
        .downcast_ref::<ListArray>()
        .unwrap();

    // Expectations for the top-level list column
    assert_list_structure::<i32>(
        &result,
        4,
        &[0, 3, 6, 6, 6],
        &[Some(3), Some(3), None, None],
        &[None, None, Some(Variant::from("not a list")), None],
    );

    // The outer list's values are themselves shredded fields wrapping the nested lists
    let outer_elements = ShreddedVariantFieldArray::try_new(outer_list.values().as_ref()).unwrap();
    let element_count = outer_elements.len();
    assert_eq!(element_count, 6);
    let nested_lists = outer_elements
        .typed_value_field()
        .unwrap()
        .as_any()
        .downcast_ref::<ListArray>()
        .unwrap();
    let nested_fallbacks = outer_elements.value_field().unwrap();

    // Re-wrap the element columns as a VariantArray (using empty metadata for every element)
    // so the same assertion helper can be applied one level down
    let element_metadata = BinaryViewArray::from_iter_values(std::iter::repeat_n(
        EMPTY_VARIANT_METADATA_BYTES,
        element_count,
    ));
    let element_variants = VariantArray::from_parts(
        element_metadata,
        Some(nested_fallbacks.clone()),
        Some(Arc::new(nested_lists.clone())),
        None,
    );

    assert_list_structure_and_elements::<Int64Type, i32>(
        &element_variants,
        element_count,
        &[0, 2, 4, 4, 7, 7, 7],
        &[Some(2), Some(2), Some(0), Some(3), None, None],
        &[
            None,
            None,
            None,
            None,
            Some(Variant::from("not a list inner")),
            Some(Variant::Null),
        ],
        (
            &[Some(1), Some(2), Some(3), Some(4), Some(5), None, None],
            &[
                None,
                None,
                None,
                None,
                None,
                Some(Variant::from("bad")),
                Some(Variant::Null),
            ],
        ),
    );
}
1723
/// Shreds a list of objects against `List<Struct<id: Int64, name: Utf8>>` and inspects the
/// `id` and `name` columns of the shredded list elements.
#[test]
fn test_array_shredding_with_object_elements() {
    // Row 0: [{"id": 1, "name": "Alice"}, {"id": null}] - fully shreds
    let objects_row = VariantRow::List(vec![
        VariantValue::Object(vec![
            ("id", VariantValue::from(1i64)),
            ("name", VariantValue::from("Alice")),
        ]),
        VariantValue::Object(vec![("id", VariantValue::from(Variant::Null))]),
    ]);
    let input = build_variant_array(vec![
        objects_row,
        // Row 1: "not a list" -> fallback
        VariantRow::Value(VariantValue::from("not a list")),
        // Row 2: null row
        VariantRow::Null,
    ]);

    // Target schema is List<Struct<id:int64,name:utf8>>
    let element_type = DataType::Struct(Fields::from(vec![
        Field::new("id", DataType::Int64, true),
        Field::new("name", DataType::Utf8, true),
    ]));
    let list_schema = DataType::List(Arc::new(Field::new("item", element_type, true)));

    let result = shred_variant(&input, &list_schema).unwrap();
    assert_eq!(result.len(), 3);

    assert_list_structure::<i32>(
        &result,
        3,
        &[0, 2, 2, 2],
        &[Some(2), None, None],
        &[None, Some(Variant::from("not a list")), None],
    );

    // Drill down to the struct column that backs the list elements
    let list_column = result
        .typed_value_field()
        .unwrap()
        .as_any()
        .downcast_ref::<ListArray>()
        .unwrap();
    let elements = ShreddedVariantFieldArray::try_new(list_column.values().as_ref()).unwrap();
    assert_eq!(elements.len(), 2);
    let element_structs = elements
        .typed_value_field()
        .unwrap()
        .as_any()
        .downcast_ref::<arrow::array::StructArray>()
        .unwrap();

    // `id` column: [1, Variant::Null]
    let id_column = element_structs.column_by_name("id").unwrap();
    let id_shredded = ShreddedVariantFieldArray::try_new(id_column).unwrap();
    let id_fallback = id_shredded.value_field().unwrap();
    let id_typed = id_shredded
        .typed_value_field()
        .unwrap()
        .as_any()
        .downcast_ref::<Int64Array>()
        .unwrap();
    assert!(id_fallback.is_null(0));
    assert_eq!(id_typed.value(0), 1);
    // The explicit null is kept as Variant::Null in the `value` column
    assert!(id_fallback.is_valid(1));
    let stored_null = Variant::new(EMPTY_VARIANT_METADATA_BYTES, id_fallback.value(1));
    assert_eq!(stored_null, Variant::Null);
    assert!(id_typed.is_null(1));

    // `name` column: ["Alice", <missing>]
    let name_column = element_structs.column_by_name("name").unwrap();
    let name_shredded = ShreddedVariantFieldArray::try_new(name_column).unwrap();
    let name_fallback = name_shredded.value_field().unwrap();
    let name_typed = name_shredded
        .typed_value_field()
        .unwrap()
        .as_any()
        .downcast_ref::<StringArray>()
        .unwrap();
    assert!(name_fallback.is_null(0));
    assert_eq!(name_typed.value(0), "Alice");
    // The second object has no `name`, so both `value` and `typed_value` are null
    assert!(name_fallback.is_null(1));
    assert!(name_typed.is_null(1));
}
1817
/// Shreds nine rows against `struct<score: float64, age: int64>` and checks, row by row, the
/// validity and contents of the top-level and per-field `value` / `typed_value` columns.
#[test]
fn test_object_shredding_comprehensive() -> Result<()> {
    let input = build_variant_array(vec![
        // Row 0: both fields present and well-typed
        VariantRow::Object(vec![
            ("score", VariantValue::from(95.5f64)),
            ("age", VariantValue::from(30i64)),
        ]),
        // Row 1: both fields plus an extra `email` field
        VariantRow::Object(vec![
            ("score", VariantValue::from(87.2f64)),
            ("age", VariantValue::from(25i64)),
            ("email", VariantValue::from("bob@example.com")),
        ]),
        // Row 2: `score` is missing
        VariantRow::Object(vec![("age", VariantValue::from(35i64))]),
        // Row 3: both fields are strings instead of numbers
        VariantRow::Object(vec![
            ("score", VariantValue::from("ninety-five")),
            ("age", VariantValue::from("thirty")),
        ]),
        // Row 4: not an object at all
        VariantRow::Value(VariantValue::from("not an object")),
        // Row 5: empty object
        VariantRow::Object(vec![]),
        // Row 6: null
        VariantRow::Null,
        // Row 7: object whose only field is not in the schema
        VariantRow::Object(vec![("foo", VariantValue::from(10))]),
        // Row 8: one field from the schema plus one that is not
        VariantRow::Object(vec![
            ("score", VariantValue::from(66.67f64)),
            ("foo", VariantValue::from(10)),
        ]),
    ]);

    // Target schema: struct<score: float64, age: int64>
    let target_schema = ShreddedSchemaBuilder::default()
        .with_path("score", &DataType::Float64)?
        .with_path("age", &DataType::Int64)?
        .build();

    let result = shred_variant(&input, &target_schema).unwrap();

    // Overall shape of the output
    assert!(result.value_field().is_some());
    assert!(result.typed_value_field().is_some());
    assert_eq!(result.len(), 9);

    // Top-level columns
    let metadata = result.metadata_field();
    let value = result.value_field().unwrap();
    let typed_value = result
        .typed_value_field()
        .unwrap()
        .as_any()
        .downcast_ref::<arrow::array::StructArray>()
        .unwrap();

    // Per-field shredded columns nested inside the typed_value struct
    let score_field =
        ShreddedVariantFieldArray::try_new(typed_value.column_by_name("score").unwrap()).unwrap();
    let age_field =
        ShreddedVariantFieldArray::try_new(typed_value.column_by_name("age").unwrap()).unwrap();

    let score_value = score_field
        .value_field()
        .unwrap()
        .as_any()
        .downcast_ref::<BinaryViewArray>()
        .unwrap();
    let score_typed_value = score_field
        .typed_value_field()
        .unwrap()
        .as_any()
        .downcast_ref::<Float64Array>()
        .unwrap();
    let age_value = age_field
        .value_field()
        .unwrap()
        .as_any()
        .downcast_ref::<BinaryViewArray>()
        .unwrap();
    let age_typed_value = age_field
        .typed_value_field()
        .unwrap()
        .as_any()
        .downcast_ref::<Int64Array>()
        .unwrap();

    // Expected contents of one (value, typed_value) column pair; `None` means "null here"
    struct ShreddedValue<'m, 'v, T> {
        value: Option<Variant<'m, 'v>>,
        typed_value: Option<T>,
    }
    // Expected contents of the typed_value struct for one row
    struct ShreddedStruct<'m, 'v> {
        score: ShreddedValue<'m, 'v, f64>,
        age: ShreddedValue<'m, 'v, i64>,
    }
    // Expectation: shredded into `typed_value`, nothing left in `value`
    fn shredded<'m, 'v, T>(typed_value: T) -> ShreddedValue<'m, 'v, T> {
        ShreddedValue {
            value: None,
            typed_value: Some(typed_value),
        }
    }
    // Expectation: stored in `value` only, `typed_value` is null
    fn unshredded<'m, 'v, T>(value: Variant<'m, 'v>) -> ShreddedValue<'m, 'v, T> {
        ShreddedValue {
            value: Some(value),
            typed_value: None,
        }
    }
    // Expectation: both `value` and `typed_value` are null
    fn absent<'m, 'v, T>() -> ShreddedValue<'m, 'v, T> {
        ShreddedValue {
            value: None,
            typed_value: None,
        }
    }
    // Decodes row `i` of a `value` column against row `i` of the metadata column
    fn get_value<'m, 'v>(
        i: usize,
        metadata: &'m BinaryViewArray,
        value: &'v BinaryViewArray,
    ) -> Variant<'m, 'v> {
        Variant::new(metadata.value(i), value.value(i))
    }
    // Checks row `i` of a `value` column: null when nothing is expected, otherwise valid and
    // equal to the expected variant
    fn assert_fallback(
        i: usize,
        expected: Option<Variant<'_, '_>>,
        metadata: &BinaryViewArray,
        column: &BinaryViewArray,
    ) {
        if let Some(expected) = expected {
            assert!(column.is_valid(i));
            assert_eq!(expected, get_value(i, metadata, column));
        } else {
            assert!(column.is_null(i));
        }
    }

    let expect = |i: usize, expected_result: Option<ShreddedValue<ShreddedStruct>>| {
        // No expectation at all means the whole row must be null
        let Some(expected_row) = expected_result else {
            assert!(result.is_null(i));
            return;
        };
        assert!(result.is_valid(i));
        assert_fallback(i, expected_row.value, metadata, value);

        // Without an expected struct the typed_value column must be null for this row
        let Some(expected_fields) = expected_row.typed_value else {
            assert!(typed_value.is_null(i));
            return;
        };
        assert!(typed_value.is_valid(i));
        assert!(score_field.is_valid(i)); // non-nullable
        assert!(age_field.is_valid(i)); // non-nullable

        let ShreddedStruct { score, age } = expected_fields;

        assert_fallback(i, score.value, metadata, score_value);
        if let Some(expected_score) = score.typed_value {
            assert!(score_typed_value.is_valid(i));
            assert_eq!(expected_score, score_typed_value.value(i));
        } else {
            assert!(score_typed_value.is_null(i));
        }

        assert_fallback(i, age.value, metadata, age_value);
        if let Some(expected_age) = age.typed_value {
            assert!(age_typed_value.is_valid(i));
            assert_eq!(expected_age, age_typed_value.value(i));
        } else {
            assert!(age_typed_value.is_null(i));
        }
    };

    // Row 0: fully shredded - both fields land in typed_value
    expect(
        0,
        Some(shredded(ShreddedStruct {
            score: shredded(95.5),
            age: shredded(30),
        })),
    );

    // Row 1: partially shredded - the extra email field stays in `value`
    let mut builder = VariantBuilder::new();
    builder
        .new_object()
        .with_field("email", "bob@example.com")
        .finish();
    let (m, v) = builder.finish();
    let email_only = Variant::new(&m, &v);

    expect(
        1,
        Some(ShreddedValue {
            value: Some(email_only),
            typed_value: Some(ShreddedStruct {
                score: shredded(87.2),
                age: shredded(25),
            }),
        }),
    );

    // Row 2: fully shredded, with the score field missing
    expect(
        2,
        Some(shredded(ShreddedStruct {
            score: absent(),
            age: shredded(35),
        })),
    );

    // Row 3: type mismatches - both fields fall back to their own `value` columns
    expect(
        3,
        Some(shredded(ShreddedStruct {
            score: unshredded(Variant::from("ninety-five")),
            age: unshredded(Variant::from("thirty")),
        })),
    );

    // Row 4: non-object - the whole row falls back to the top-level `value`
    expect(4, Some(unshredded(Variant::from("not an object"))));

    // Row 5: empty object
    expect(
        5,
        Some(shredded(ShreddedStruct {
            score: absent(),
            age: absent(),
        })),
    );

    // Row 6: null
    expect(6, None);

    // Builds the expected `{"foo": 10}` residual object against row `i`'s existing metadata
    let object_with_foo_field = |i| {
        use parquet_variant::{ParentState, ValueBuilder, VariantMetadata};
        let row_metadata = VariantMetadata::new(metadata.value(i));
        let mut metadata_builder = ReadOnlyMetadataBuilder::new(&row_metadata);
        let mut value_builder = ValueBuilder::new();
        let state = ParentState::variant(&mut value_builder, &mut metadata_builder);
        ObjectBuilder::new(state, false)
            .with_field("foo", 10)
            .finish();
        (row_metadata, value_builder.into_inner())
    };

    // Row 7: object with only a "wrong" field
    let (m, v) = object_with_foo_field(7);
    expect(
        7,
        Some(ShreddedValue {
            value: Some(Variant::new_with_metadata(m, &v)),
            typed_value: Some(ShreddedStruct {
                score: absent(),
                age: absent(),
            }),
        }),
    );

    // Row 8: object with one "wrong" and one "right" field
    let (m, v) = object_with_foo_field(8);
    expect(
        8,
        Some(ShreddedValue {
            value: Some(Variant::new_with_metadata(m, &v)),
            typed_value: Some(ShreddedStruct {
                score: shredded(66.67),
                age: absent(),
            }),
        }),
    );
    Ok(())
}
2166
/// Shreds objects against `Struct<scores: List<Int64>>` and checks the top-level columns as
/// well as the nested `scores` list column.
#[test]
fn test_object_shredding_with_array_field() {
    // Row 0: object with a well-typed scores list
    let good_scores = VariantRow::Object(vec![(
        "scores",
        VariantValue::List(vec![VariantValue::from(10i64), VariantValue::from(20i64)]),
    )]);
    // Row 1: object whose scores list holds incompatible entries
    let bad_scores = VariantRow::Object(vec![(
        "scores",
        VariantValue::List(vec![
            VariantValue::from("oops"),
            VariantValue::from(Variant::Null),
        ]),
    )]);
    let input = build_variant_array(vec![
        good_scores,
        bad_scores,
        // Row 2: object without a scores field
        VariantRow::Object(vec![]),
        // Row 3: non-object fallback
        VariantRow::Value(VariantValue::from("not an object")),
        // Row 4: top-level null
        VariantRow::Null,
    ]);

    // Target schema: Struct<scores: List<Int64>>
    let item_field = Arc::new(Field::new("item", DataType::Int64, true));
    let scores_column = Field::new("scores", DataType::List(item_field), true);
    let schema = DataType::Struct(Fields::from(vec![scores_column]));

    let result = shred_variant(&input, &schema).unwrap();
    assert_eq!(result.len(), 5);

    // Top-level value/typed_value columns
    let value_field = result.value_field().unwrap();
    let typed_struct = result
        .typed_value_field()
        .unwrap()
        .as_any()
        .downcast_ref::<arrow::array::StructArray>()
        .unwrap();

    // Only the non-object row (3) carries a top-level fallback value
    for row in 0..3 {
        assert!(value_field.is_null(row));
    }
    assert!(value_field.is_valid(3));
    let fallback = Variant::new(result.metadata_field().value(3), value_field.value(3));
    assert_eq!(fallback, Variant::from("not an object"));
    assert!(value_field.is_null(4));

    // The typed struct is valid for the object rows and null for rows 3 and 4
    for row in 0..3 {
        assert!(typed_struct.is_valid(row));
    }
    for row in 3..5 {
        assert!(typed_struct.is_null(row));
    }

    // Re-wrap the `scores` columns as a VariantArray (using empty metadata for every row)
    // so the list assertion helper can be applied to them
    let scores_field =
        ShreddedVariantFieldArray::try_new(typed_struct.column_by_name("scores").unwrap())
            .unwrap();
    let row_count = scores_field.len();
    let scores_variant = VariantArray::from_parts(
        BinaryViewArray::from_iter_values(std::iter::repeat_n(
            EMPTY_VARIANT_METADATA_BYTES,
            row_count,
        )),
        Some(scores_field.value_field().unwrap().clone()),
        Some(scores_field.typed_value_field().unwrap().clone()),
        None,
    );
    assert_list_structure_and_elements::<Int64Type, i32>(
        &scores_variant,
        row_count,
        &[0i32, 2, 4, 4, 4, 4],
        &[Some(2), Some(2), None, None, None],
        &[None, None, None, None, None],
        (
            &[Some(10), Some(20), None, None],
            &[None, None, Some(Variant::from("oops")), Some(Variant::Null)],
        ),
    );
}
2252
/// Shreds one three-field object with progressively wider schemas and checks whether the
/// top-level `value` column is still populated for the row.
#[test]
fn test_object_different_schemas() -> Result<()> {
    // A single object row carrying three fields
    let fields = vec![
        ("id", VariantValue::from(123i32)),
        ("age", VariantValue::from(25i64)),
        ("score", VariantValue::from(95.5f64)),
    ];
    let input = build_variant_array(vec![VariantRow::Object(fields)]);

    // Schema with `id` only: `value` is expected to hold {"age": 25, "score": 95.5}
    let id_only = ShreddedSchemaBuilder::default()
        .with_path("id", &DataType::Int32)?
        .build();
    let shredded = shred_variant(&input, &id_only).unwrap();
    assert!(shredded.value_field().unwrap().is_valid(0));

    // Schema with `id` and `age`: `value` is expected to hold {"score": 95.5}
    let id_and_age = ShreddedSchemaBuilder::default()
        .with_path("id", &DataType::Int32)?
        .with_path("age", &DataType::Int64)?
        .build();
    let shredded = shred_variant(&input, &id_and_age).unwrap();
    assert!(shredded.value_field().unwrap().is_valid(0));

    // Schema with every field: fully shredded, nothing remains in `value`
    let all_fields = ShreddedSchemaBuilder::default()
        .with_path("id", &DataType::Int32)?
        .with_path("age", &DataType::Int64)?
        .with_path("score", &DataType::Float64)?
        .build();
    let shredded = shred_variant(&input, &all_fields).unwrap();
    assert!(shredded.value_field().unwrap().is_null(0));

    Ok(())
}
2291
/// Shreds objects with UUID-valued `id` / `session_id` fields against `FixedSizeBinary(16)`
/// columns and checks each row's `value` and `typed_value` columns.
#[test]
fn test_uuid_shredding_in_objects() -> Result<()> {
    // Returns the `value` column of a shredded field as a BinaryViewArray
    fn fallback_column(field: &ShreddedVariantFieldArray) -> &BinaryViewArray {
        field
            .value_field()
            .unwrap()
            .as_any()
            .downcast_ref::<BinaryViewArray>()
            .unwrap()
    }
    // Returns the `typed_value` column of a shredded field as a FixedSizeBinaryArray
    fn uuid_column(field: &ShreddedVariantFieldArray) -> &FixedSizeBinaryArray {
        field
            .typed_value_field()
            .unwrap()
            .as_any()
            .downcast_ref::<FixedSizeBinaryArray>()
            .unwrap()
    }
    // Asserts that `row` has a null `value` and a `typed_value` holding `expected`'s bytes
    fn assert_shredded_uuid(
        row: usize,
        fallback: &BinaryViewArray,
        typed: &FixedSizeBinaryArray,
        expected: &Uuid,
    ) {
        assert!(fallback.is_null(row));
        assert!(typed.is_valid(row));
        assert_eq!(typed.value(row), expected.as_bytes());
    }

    let mock_uuid_1 = Uuid::new_v4();
    let mock_uuid_2 = Uuid::new_v4();
    let mock_uuid_3 = Uuid::new_v4();

    let input = build_variant_array(vec![
        // Row 0: both UUID fields present
        VariantRow::Object(vec![
            ("id", VariantValue::from(mock_uuid_1)),
            ("session_id", VariantValue::from(mock_uuid_2)),
        ]),
        // Row 1: both UUID fields plus an extra `name` field
        VariantRow::Object(vec![
            ("id", VariantValue::from(mock_uuid_2)),
            ("session_id", VariantValue::from(mock_uuid_3)),
            ("name", VariantValue::from("test_user")),
        ]),
        // Row 2: no session_id
        VariantRow::Object(vec![("id", VariantValue::from(mock_uuid_1))]),
        // Row 3: id is a UUID but session_id is a string
        VariantRow::Object(vec![
            ("id", VariantValue::from(mock_uuid_3)),
            ("session_id", VariantValue::from("not-a-uuid")),
        ]),
        // Row 4: id is an integer, session_id is a UUID
        VariantRow::Object(vec![
            ("id", VariantValue::from(12345i64)),
            ("session_id", VariantValue::from(mock_uuid_1)),
        ]),
        // Row 5: null
        VariantRow::Null,
    ]);

    let target_schema = ShreddedSchemaBuilder::default()
        .with_path("id", DataType::FixedSizeBinary(16))?
        .with_path("session_id", DataType::FixedSizeBinary(16))?
        .build();

    let result = shred_variant(&input, &target_schema).unwrap();

    assert!(result.value_field().is_some());
    assert!(result.typed_value_field().is_some());
    assert_eq!(result.len(), 6);

    // Top-level columns
    let metadata = result.metadata_field();
    let value = result.value_field().unwrap();
    let typed_value = result
        .typed_value_field()
        .unwrap()
        .as_any()
        .downcast_ref::<arrow::array::StructArray>()
        .unwrap();

    // Per-field columns nested inside the typed_value struct
    let id_field =
        ShreddedVariantFieldArray::try_new(typed_value.column_by_name("id").unwrap()).unwrap();
    let session_id_field =
        ShreddedVariantFieldArray::try_new(typed_value.column_by_name("session_id").unwrap())
            .unwrap();

    let id_value = fallback_column(&id_field);
    let id_typed_value = uuid_column(&id_field);
    let session_id_value = fallback_column(&session_id_field);
    let session_id_typed_value = uuid_column(&session_id_field);

    // Row 0: fully shredded - both UUID fields land in typed_value
    assert!(result.is_valid(0));

    assert!(value.is_null(0)); // nothing left over
    assert!(id_value.is_null(0));
    assert!(session_id_value.is_null(0));

    assert!(typed_value.is_valid(0));
    assert!(id_typed_value.is_valid(0));
    assert!(session_id_typed_value.is_valid(0));

    assert_eq!(id_typed_value.value(0), mock_uuid_1.as_bytes());
    assert_eq!(session_id_typed_value.value(0), mock_uuid_2.as_bytes());

    // Row 1: partially shredded - the extra name field stays in `value`
    assert!(result.is_valid(1));

    assert!(value.is_valid(1)); // holds the unshredded "name" field
    assert!(typed_value.is_valid(1));

    assert_shredded_uuid(1, id_value, id_typed_value, &mock_uuid_2);
    assert_shredded_uuid(1, session_id_value, session_id_typed_value, &mock_uuid_3);

    // The residual value must be an object containing the name field
    let residual = Variant::new(metadata.value(1), value.value(1));
    let Variant::Object(residual_object) = residual else {
        panic!("Expected object");
    };

    assert_eq!(residual_object.get("name"), Some(Variant::from("test_user")));

    // Row 2: session_id is missing
    assert!(result.is_valid(2));

    assert!(value.is_null(2)); // no extra fields
    assert!(typed_value.is_valid(2));

    assert_shredded_uuid(2, id_value, id_typed_value, &mock_uuid_1);

    assert!(session_id_value.is_null(2));
    assert!(session_id_typed_value.is_null(2)); // missing field

    // Row 3: session_id is a string, so it is kept in its own `value` column
    assert!(result.is_valid(3));

    assert!(value.is_null(3)); // no extra fields
    assert!(typed_value.is_valid(3));

    assert_shredded_uuid(3, id_value, id_typed_value, &mock_uuid_3);

    assert!(session_id_value.is_valid(3)); // type mismatch, stored in value
    assert!(session_id_typed_value.is_null(3));
    let session_fallback = Variant::new(metadata.value(3), session_id_value.value(3));
    assert_eq!(session_fallback, Variant::from("not-a-uuid"));

    // Row 4: id is an int64, so it is kept in its own `value` column
    assert!(result.is_valid(4));

    assert!(value.is_null(4)); // no extra fields
    assert!(typed_value.is_valid(4));

    assert!(id_value.is_valid(4)); // type mismatch, stored in value
    assert!(id_typed_value.is_null(4));
    let id_fallback = Variant::new(metadata.value(4), id_value.value(4));
    assert_eq!(id_fallback, Variant::from(12345i64));

    assert_shredded_uuid(4, session_id_value, session_id_typed_value, &mock_uuid_1);

    // Row 5: null
    assert!(result.is_null(5));

    Ok(())
}
2462
2463    #[test]
2464    fn test_spec_compliance() {
2465        let input = VariantArray::from_iter(vec![Variant::from(42i64), Variant::from("hello")]);
2466
2467        let result = shred_variant(&input, &DataType::Int64).unwrap();
2468
2469        // Test field access by name (not position)
2470        let inner_struct = result.inner();
2471        assert!(inner_struct.column_by_name("metadata").is_some());
2472        assert!(inner_struct.column_by_name("value").is_some());
2473        assert!(inner_struct.column_by_name("typed_value").is_some());
2474
2475        // Test metadata preservation
2476        assert_eq!(result.metadata_field().len(), input.metadata_field().len());
2477        // The metadata should be the same reference (cheap clone)
2478        // Note: BinaryViewArray doesn't have a .values() method, so we compare the arrays directly
2479        assert_eq!(result.metadata_field().len(), input.metadata_field().len());
2480
2481        // Test output structure correctness
2482        assert_eq!(result.len(), input.len());
2483        assert!(result.value_field().is_some());
2484        assert!(result.typed_value_field().is_some());
2485
2486        // For primitive shredding, verify that value and typed_value are never both non-null
2487        // (This rule applies to primitives; for objects, both can be non-null for partial shredding)
2488        let value_field = result.value_field().unwrap();
2489        let typed_value_field = result
2490            .typed_value_field()
2491            .unwrap()
2492            .as_any()
2493            .downcast_ref::<Int64Array>()
2494            .unwrap();
2495
2496        for i in 0..result.len() {
2497            if !result.is_null(i) {
2498                let value_is_null = value_field.is_null(i);
2499                let typed_value_is_null = typed_value_field.is_null(i);
2500                // For primitive shredding, at least one should be null
2501                assert!(
2502                    value_is_null || typed_value_is_null,
2503                    "Row {}: both value and typed_value are non-null for primitive shredding",
2504                    i
2505                );
2506            }
2507        }
2508    }
2509
2510    #[test]
2511    fn test_variant_schema_builder_simple() -> Result<()> {
2512        let shredding_type = ShreddedSchemaBuilder::default()
2513            .with_path("a", &DataType::Int64)?
2514            .with_path("b", &DataType::Float64)?
2515            .build();
2516
2517        assert_eq!(
2518            shredding_type,
2519            DataType::Struct(Fields::from(vec![
2520                Field::new("a", DataType::Int64, true),
2521                Field::new("b", DataType::Float64, true),
2522            ]))
2523        );
2524
2525        Ok(())
2526    }
2527
2528    #[test]
2529    fn test_variant_schema_builder_nested() -> Result<()> {
2530        let shredding_type = ShreddedSchemaBuilder::default()
2531            .with_path("a", &DataType::Int64)?
2532            .with_path("b.c", &DataType::Utf8)?
2533            .with_path("b.d", &DataType::Float64)?
2534            .build();
2535
2536        assert_eq!(
2537            shredding_type,
2538            DataType::Struct(Fields::from(vec![
2539                Field::new("a", DataType::Int64, true),
2540                Field::new(
2541                    "b",
2542                    DataType::Struct(Fields::from(vec![
2543                        Field::new("c", DataType::Utf8, true),
2544                        Field::new("d", DataType::Float64, true),
2545                    ])),
2546                    true
2547                ),
2548            ]))
2549        );
2550
2551        Ok(())
2552    }
2553
2554    #[test]
2555    fn test_variant_schema_builder_with_path_variant_path_arg() -> Result<()> {
2556        let path = VariantPath::from_iter([VariantPathElement::from("a.b")]);
2557        let shredding_type = ShreddedSchemaBuilder::default()
2558            .with_path(path, &DataType::Int64)?
2559            .build();
2560
2561        match shredding_type {
2562            DataType::Struct(fields) => {
2563                assert_eq!(fields.len(), 1);
2564                assert_eq!(fields[0].name(), "a.b");
2565                assert_eq!(fields[0].data_type(), &DataType::Int64);
2566            }
2567            _ => panic!("expected struct data type"),
2568        }
2569
2570        Ok(())
2571    }
2572
2573    #[test]
2574    fn test_variant_schema_builder_custom_nullability() -> Result<()> {
2575        let shredding_type = ShreddedSchemaBuilder::default()
2576            .with_path(
2577                "foo",
2578                Arc::new(Field::new("should_be_renamed", DataType::Utf8, false)),
2579            )?
2580            .with_path("bar", (&DataType::Int64, false))?
2581            .build();
2582
2583        let DataType::Struct(fields) = shredding_type else {
2584            panic!("expected struct data type");
2585        };
2586
2587        let foo = fields.iter().find(|f| f.name() == "foo").unwrap();
2588        assert_eq!(foo.data_type(), &DataType::Utf8);
2589        assert!(!foo.is_nullable());
2590
2591        let bar = fields.iter().find(|f| f.name() == "bar").unwrap();
2592        assert_eq!(bar.data_type(), &DataType::Int64);
2593        assert!(!bar.is_nullable());
2594
2595        Ok(())
2596    }
2597
2598    #[test]
2599    fn test_variant_schema_builder_with_shred_variant() -> Result<()> {
2600        let input = build_variant_array(vec![
2601            VariantRow::Object(vec![
2602                ("time", VariantValue::from(1234567890i64)),
2603                ("hostname", VariantValue::from("server1")),
2604                ("extra", VariantValue::from(42)),
2605            ]),
2606            VariantRow::Object(vec![
2607                ("time", VariantValue::from(9876543210i64)),
2608                ("hostname", VariantValue::from("server2")),
2609            ]),
2610            VariantRow::Null,
2611        ]);
2612
2613        let shredding_type = ShreddedSchemaBuilder::default()
2614            .with_path("time", &DataType::Int64)?
2615            .with_path("hostname", &DataType::Utf8)?
2616            .build();
2617
2618        let result = shred_variant(&input, &shredding_type).unwrap();
2619
2620        assert_eq!(
2621            result.data_type(),
2622            &DataType::Struct(Fields::from(vec![
2623                Field::new("metadata", DataType::BinaryView, false),
2624                Field::new("value", DataType::BinaryView, true),
2625                Field::new(
2626                    "typed_value",
2627                    DataType::Struct(Fields::from(vec![
2628                        Field::new(
2629                            "hostname",
2630                            DataType::Struct(Fields::from(vec![
2631                                Field::new("value", DataType::BinaryView, true),
2632                                Field::new("typed_value", DataType::Utf8, true),
2633                            ])),
2634                            false,
2635                        ),
2636                        Field::new(
2637                            "time",
2638                            DataType::Struct(Fields::from(vec![
2639                                Field::new("value", DataType::BinaryView, true),
2640                                Field::new("typed_value", DataType::Int64, true),
2641                            ])),
2642                            false,
2643                        ),
2644                    ])),
2645                    true,
2646                ),
2647            ]))
2648        );
2649
2650        assert_eq!(result.len(), 3);
2651        assert!(result.typed_value_field().is_some());
2652
2653        let typed_value = result
2654            .typed_value_field()
2655            .unwrap()
2656            .as_any()
2657            .downcast_ref::<arrow::array::StructArray>()
2658            .unwrap();
2659
2660        let time_field =
2661            ShreddedVariantFieldArray::try_new(typed_value.column_by_name("time").unwrap())
2662                .unwrap();
2663        let hostname_field =
2664            ShreddedVariantFieldArray::try_new(typed_value.column_by_name("hostname").unwrap())
2665                .unwrap();
2666
2667        let time_typed = time_field
2668            .typed_value_field()
2669            .unwrap()
2670            .as_any()
2671            .downcast_ref::<Int64Array>()
2672            .unwrap();
2673        let hostname_typed = hostname_field
2674            .typed_value_field()
2675            .unwrap()
2676            .as_any()
2677            .downcast_ref::<arrow::array::StringArray>()
2678            .unwrap();
2679
2680        // Row 0
2681        assert!(!result.is_null(0));
2682        assert_eq!(time_typed.value(0), 1234567890);
2683        assert_eq!(hostname_typed.value(0), "server1");
2684
2685        // Row 1
2686        assert!(!result.is_null(1));
2687        assert_eq!(time_typed.value(1), 9876543210);
2688        assert_eq!(hostname_typed.value(1), "server2");
2689
2690        // Row 2
2691        assert!(result.is_null(2));
2692
2693        Ok(())
2694    }
2695
2696    #[test]
2697    fn test_variant_schema_builder_conflicting_path() -> Result<()> {
2698        let shredding_type = ShreddedSchemaBuilder::default()
2699            .with_path("a", &DataType::Int64)?
2700            .with_path("a", &DataType::Float64)?
2701            .build();
2702
2703        assert_eq!(
2704            shredding_type,
2705            DataType::Struct(Fields::from(
2706                vec![Field::new("a", DataType::Float64, true),]
2707            ))
2708        );
2709
2710        Ok(())
2711    }
2712
2713    #[test]
2714    fn test_variant_schema_builder_root_path() -> Result<()> {
2715        let path = VariantPath::new(vec![]);
2716        let shredding_type = ShreddedSchemaBuilder::default()
2717            .with_path(path, &DataType::Int64)?
2718            .build();
2719
2720        assert_eq!(shredding_type, DataType::Int64);
2721
2722        Ok(())
2723    }
2724
2725    #[test]
2726    fn test_variant_schema_builder_empty_path() -> Result<()> {
2727        let shredding_type = ShreddedSchemaBuilder::default()
2728            .with_path("", &DataType::Int64)?
2729            .build();
2730
2731        assert_eq!(shredding_type, DataType::Int64);
2732        Ok(())
2733    }
2734
2735    #[test]
2736    fn test_variant_schema_builder_default() {
2737        let shredding_type = ShreddedSchemaBuilder::default().build();
2738        assert_eq!(shredding_type, DataType::Null);
2739    }
2740}