Skip to main content

parquet_variant_compute/
shred_variant.rs

1// Licensed to the Apache Software Foundation (ASF) under one
2// or more contributor license agreements.  See the NOTICE file
3// distributed with this work for additional information
4// regarding copyright ownership.  The ASF licenses this file
5// to you under the Apache License, Version 2.0 (the
6// "License"); you may not use this file except in compliance
7// with the License.  You may obtain a copy of the License at
8//
9//   http://www.apache.org/licenses/LICENSE-2.0
10//
11// Unless required by applicable law or agreed to in writing,
12// software distributed under the License is distributed on an
13// "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
14// KIND, either express or implied.  See the License for the
15// specific language governing permissions and limitations
16// under the License.
17
18//! Module for shredding VariantArray with a given schema.
19
20use crate::variant_array::{ShreddedVariantFieldArray, StructArrayBuilder};
21use crate::variant_to_arrow::{
22    ArrayVariantToArrowRowBuilder, PrimitiveVariantToArrowRowBuilder,
23    make_primitive_variant_to_arrow_row_builder,
24};
25use crate::{VariantArray, VariantValueArrayBuilder};
26use arrow::array::{ArrayRef, BinaryViewArray, NullBufferBuilder};
27use arrow::buffer::NullBuffer;
28use arrow::compute::CastOptions;
29use arrow::datatypes::{DataType, Field, FieldRef, Fields, TimeUnit};
30use arrow::error::{ArrowError, Result};
31use indexmap::IndexMap;
32use parquet_variant::{Variant, VariantBuilderExt, VariantPath, VariantPathElement};
33use std::collections::BTreeMap;
34use std::sync::Arc;
35
36/// Shreds the input binary variant using a target shredding schema derived from the requested data type.
37///
38/// For example, requesting `DataType::Int64` would produce an output variant array with the schema:
39///
40/// ```text
41/// {
42///    metadata: BINARY,
43///    value: BINARY,
44///    typed_value: LONG,
45/// }
46/// ```
47///
48/// Similarly, requesting `DataType::Struct` with two integer fields `a` and `b` would produce an
49/// output variant array with the schema:
50///
51/// ```text
52/// {
53///   metadata: BINARY,
54///   value: BINARY,
55///   typed_value: {
56///     a: {
57///       value: BINARY,
58///       typed_value: INT,
59///     },
60///     b: {
61///       value: BINARY,
62///       typed_value: INT,
63///     },
64///   }
65/// }
66/// ```
67///
68/// See [`ShreddedSchemaBuilder`] for a convenient way to build the `as_type`
69/// value passed to this function.
70pub fn shred_variant(array: &VariantArray, as_type: &DataType) -> Result<VariantArray> {
71    if array.typed_value_field().is_some() {
72        return Err(ArrowError::InvalidArgumentError(
73            "Input is already shredded".to_string(),
74        ));
75    }
76
77    if array.value_field().is_none() {
78        // all-null case -- nothing to do.
79        return Ok(array.clone());
80    };
81
82    let cast_options = CastOptions::default();
83    let mut builder = make_variant_to_shredded_variant_arrow_row_builder(
84        as_type,
85        &cast_options,
86        array.len(),
87        NullValue::TopLevelVariant,
88    )?;
89    for i in 0..array.len() {
90        if array.is_null(i) {
91            builder.append_null()?;
92        } else {
93            builder.append_value(array.value(i))?;
94        }
95    }
96    let (value, typed_value, nulls) = builder.finish()?;
97    Ok(VariantArray::from_parts(
98        array.metadata_field().clone(),
99        Some(Arc::new(value)),
100        Some(typed_value),
101        nulls,
102    ))
103}
104
105/// Controls how `append_null` is encoded for a shredded `(value, typed_value)` pair.
106///
107/// | Mode | Struct validity bit | `value` | `typed_value` | Meaning |
108/// | --- | --- | --- | --- | --- |
109/// | `TopLevelVariant` | null | NULL | NULL | SQL NULL at the top-level variant row |
110/// | `ObjectField` | non-null | NULL | NULL | Missing object field |
111/// | `ArrayElement` | non-null | `Variant::Null` | NULL | Explicit null array element |
112#[derive(Debug, Clone, Copy, PartialEq, Eq)]
113pub(crate) enum NullValue {
114    TopLevelVariant,
115    ObjectField,
116    ArrayElement,
117}
118
119impl NullValue {
120    fn append_to(
121        self,
122        nulls: &mut NullBufferBuilder,
123        value_builder: &mut VariantValueArrayBuilder,
124    ) {
125        match self {
126            Self::TopLevelVariant => nulls.append_null(),
127            Self::ObjectField | Self::ArrayElement => nulls.append_non_null(),
128        }
129        match self {
130            Self::TopLevelVariant | Self::ObjectField => value_builder.append_null(),
131            Self::ArrayElement => value_builder.append_value(Variant::Null),
132        }
133    }
134}
135
136pub(crate) fn make_variant_to_shredded_variant_arrow_row_builder<'a>(
137    data_type: &'a DataType,
138    cast_options: &'a CastOptions,
139    capacity: usize,
140    null_value: NullValue,
141) -> Result<VariantToShreddedVariantRowBuilder<'a>> {
142    let builder = match data_type {
143        DataType::Struct(fields) => {
144            let typed_value_builder = VariantToShreddedObjectVariantRowBuilder::try_new(
145                fields,
146                cast_options,
147                capacity,
148                null_value,
149            )?;
150            VariantToShreddedVariantRowBuilder::Object(typed_value_builder)
151        }
152        DataType::List(_)
153        | DataType::LargeList(_)
154        | DataType::ListView(_)
155        | DataType::LargeListView(_)
156        | DataType::FixedSizeList(..) => {
157            let typed_value_builder = VariantToShreddedArrayVariantRowBuilder::try_new(
158                data_type,
159                cast_options,
160                capacity,
161                null_value,
162            )?;
163            VariantToShreddedVariantRowBuilder::Array(typed_value_builder)
164        }
165        // Supported shredded primitive types, see Variant shredding spec:
166        // https://github.com/apache/parquet-format/blob/master/VariantShredding.md#shredded-value-types
167        DataType::Boolean
168        | DataType::Int8
169        | DataType::Int16
170        | DataType::Int32
171        | DataType::Int64
172        | DataType::Float32
173        | DataType::Float64
174        | DataType::Decimal32(..)
175        | DataType::Decimal64(..)
176        | DataType::Decimal128(..)
177        | DataType::Date32
178        | DataType::Time64(TimeUnit::Microsecond)
179        | DataType::Timestamp(TimeUnit::Microsecond | TimeUnit::Nanosecond, _)
180        | DataType::Binary
181        | DataType::BinaryView
182        | DataType::LargeBinary
183        | DataType::Utf8
184        | DataType::Utf8View
185        | DataType::LargeUtf8
186        | DataType::FixedSizeBinary(16) // UUID
187        => {
188            let builder =
189                make_primitive_variant_to_arrow_row_builder(data_type, cast_options, capacity)?;
190            let typed_value_builder =
191                VariantToShreddedPrimitiveVariantRowBuilder::new(builder, capacity, null_value);
192            VariantToShreddedVariantRowBuilder::Primitive(typed_value_builder)
193        }
194        DataType::FixedSizeBinary(_) => {
195            return Err(ArrowError::InvalidArgumentError(format!("{data_type} is not a valid variant shredding type. Only FixedSizeBinary(16) for UUID is supported.")))
196        }
197        _ => {
198            return Err(ArrowError::InvalidArgumentError(format!("{data_type} is not a valid variant shredding type")))
199        }
200    };
201    Ok(builder)
202}
203
204pub(crate) enum VariantToShreddedVariantRowBuilder<'a> {
205    Primitive(VariantToShreddedPrimitiveVariantRowBuilder<'a>),
206    Array(VariantToShreddedArrayVariantRowBuilder<'a>),
207    Object(VariantToShreddedObjectVariantRowBuilder<'a>),
208}
209
210impl<'a> VariantToShreddedVariantRowBuilder<'a> {
211    pub fn append_null(&mut self) -> Result<()> {
212        use VariantToShreddedVariantRowBuilder::*;
213        match self {
214            Primitive(b) => b.append_null(),
215            Array(b) => b.append_null(),
216            Object(b) => b.append_null(),
217        }
218    }
219
220    pub fn append_value(&mut self, value: Variant<'_, '_>) -> Result<bool> {
221        use VariantToShreddedVariantRowBuilder::*;
222        match self {
223            Primitive(b) => b.append_value(value),
224            Array(b) => b.append_value(value),
225            Object(b) => b.append_value(value),
226        }
227    }
228
229    pub fn finish(self) -> Result<(BinaryViewArray, ArrayRef, Option<NullBuffer>)> {
230        use VariantToShreddedVariantRowBuilder::*;
231        match self {
232            Primitive(b) => b.finish(),
233            Array(b) => b.finish(),
234            Object(b) => b.finish(),
235        }
236    }
237}
238
239/// A shredded primitive field builder.
240pub(crate) struct VariantToShreddedPrimitiveVariantRowBuilder<'a> {
241    value_builder: VariantValueArrayBuilder,
242    typed_value_builder: PrimitiveVariantToArrowRowBuilder<'a>,
243    nulls: NullBufferBuilder,
244    null_value: NullValue,
245}
246
247impl<'a> VariantToShreddedPrimitiveVariantRowBuilder<'a> {
248    pub(crate) fn new(
249        typed_value_builder: PrimitiveVariantToArrowRowBuilder<'a>,
250        capacity: usize,
251        null_value: NullValue,
252    ) -> Self {
253        Self {
254            value_builder: VariantValueArrayBuilder::new(capacity),
255            typed_value_builder,
256            nulls: NullBufferBuilder::new(capacity),
257            null_value,
258        }
259    }
260
261    fn append_null(&mut self) -> Result<()> {
262        self.null_value
263            .append_to(&mut self.nulls, &mut self.value_builder);
264        self.typed_value_builder.append_null()
265    }
266
267    fn append_value(&mut self, value: Variant<'_, '_>) -> Result<bool> {
268        self.nulls.append_non_null();
269        if self.typed_value_builder.append_value(&value)? {
270            self.value_builder.append_null();
271        } else {
272            self.value_builder.append_value(value);
273        }
274        Ok(true)
275    }
276
277    fn finish(mut self) -> Result<(BinaryViewArray, ArrayRef, Option<NullBuffer>)> {
278        Ok((
279            self.value_builder.build()?,
280            self.typed_value_builder.finish()?,
281            self.nulls.finish(),
282        ))
283    }
284}
285
286pub(crate) struct VariantToShreddedArrayVariantRowBuilder<'a> {
287    value_builder: VariantValueArrayBuilder,
288    typed_value_builder: ArrayVariantToArrowRowBuilder<'a>,
289    nulls: NullBufferBuilder,
290    null_value: NullValue,
291}
292
293impl<'a> VariantToShreddedArrayVariantRowBuilder<'a> {
294    fn try_new(
295        data_type: &'a DataType,
296        cast_options: &'a CastOptions,
297        capacity: usize,
298        null_value: NullValue,
299    ) -> Result<Self> {
300        Ok(Self {
301            value_builder: VariantValueArrayBuilder::new(capacity),
302            typed_value_builder: ArrayVariantToArrowRowBuilder::try_new(
303                data_type,
304                cast_options,
305                capacity,
306                true,
307            )?,
308            nulls: NullBufferBuilder::new(capacity),
309            null_value,
310        })
311    }
312
313    fn append_null(&mut self) -> Result<()> {
314        self.null_value
315            .append_to(&mut self.nulls, &mut self.value_builder);
316        self.typed_value_builder.append_null()?;
317        Ok(())
318    }
319
320    fn append_value(&mut self, variant: Variant<'_, '_>) -> Result<bool> {
321        // If the variant is not an array, typed_value must be null.
322        // If the variant is an array, value must be null.
323        match variant {
324            Variant::List(list) => {
325                self.nulls.append_non_null();
326                self.value_builder.append_null();
327                self.typed_value_builder
328                    .append_value(&Variant::List(list))?;
329                Ok(true)
330            }
331            other => {
332                self.nulls.append_non_null();
333                self.value_builder.append_value(other);
334                self.typed_value_builder.append_null()?;
335                Ok(false)
336            }
337        }
338    }
339
340    fn finish(mut self) -> Result<(BinaryViewArray, ArrayRef, Option<NullBuffer>)> {
341        Ok((
342            self.value_builder.build()?,
343            self.typed_value_builder.finish()?,
344            self.nulls.finish(),
345        ))
346    }
347}
348
349pub(crate) struct VariantToShreddedObjectVariantRowBuilder<'a> {
350    value_builder: VariantValueArrayBuilder,
351    typed_value_builders: IndexMap<&'a str, VariantToShreddedVariantRowBuilder<'a>>,
352    typed_value_nulls: NullBufferBuilder,
353    nulls: NullBufferBuilder,
354    null_value: NullValue,
355}
356
357impl<'a> VariantToShreddedObjectVariantRowBuilder<'a> {
358    fn try_new(
359        fields: &'a Fields,
360        cast_options: &'a CastOptions,
361        capacity: usize,
362        null_value: NullValue,
363    ) -> Result<Self> {
364        let typed_value_builders = fields.iter().map(|field| {
365            let builder = make_variant_to_shredded_variant_arrow_row_builder(
366                field.data_type(),
367                cast_options,
368                capacity,
369                NullValue::ObjectField,
370            )?;
371            Ok((field.name().as_str(), builder))
372        });
373        Ok(Self {
374            value_builder: VariantValueArrayBuilder::new(capacity),
375            typed_value_builders: typed_value_builders.collect::<Result<_>>()?,
376            typed_value_nulls: NullBufferBuilder::new(capacity),
377            nulls: NullBufferBuilder::new(capacity),
378            null_value,
379        })
380    }
381
382    fn append_null(&mut self) -> Result<()> {
383        self.null_value
384            .append_to(&mut self.nulls, &mut self.value_builder);
385        self.typed_value_nulls.append_null();
386        for (_, typed_value_builder) in &mut self.typed_value_builders {
387            typed_value_builder.append_null()?;
388        }
389        Ok(())
390    }
391
392    fn append_value(&mut self, value: Variant<'_, '_>) -> Result<bool> {
393        let Variant::Object(ref obj) = value else {
394            // Not an object => fall back
395            self.nulls.append_non_null();
396            self.value_builder.append_value(value);
397            self.typed_value_nulls.append_null();
398            for (_, typed_value_builder) in &mut self.typed_value_builders {
399                typed_value_builder.append_null()?;
400            }
401            return Ok(false);
402        };
403
404        // Route the object's fields by name as either shredded or unshredded
405        let mut builder = self.value_builder.builder_ext(value.metadata());
406        let mut object_builder = builder.try_new_object()?;
407        let mut seen = std::collections::HashSet::new();
408        let mut partially_shredded = false;
409        for (field_name, value) in obj.iter() {
410            match self.typed_value_builders.get_mut(field_name) {
411                Some(typed_value_builder) => {
412                    typed_value_builder.append_value(value)?;
413                    seen.insert(field_name);
414                }
415                None => {
416                    object_builder.insert_bytes(field_name, value);
417                    partially_shredded = true;
418                }
419            }
420        }
421
422        // Handle missing fields
423        for (field_name, typed_value_builder) in &mut self.typed_value_builders {
424            if !seen.contains(field_name) {
425                typed_value_builder.append_null()?;
426            }
427        }
428
429        // Only emit the value if it captured any unshredded object fields
430        if partially_shredded {
431            object_builder.finish();
432        } else {
433            drop(object_builder);
434            self.value_builder.append_null();
435        }
436
437        self.typed_value_nulls.append_non_null();
438        self.nulls.append_non_null();
439        Ok(true)
440    }
441
442    fn finish(mut self) -> Result<(BinaryViewArray, ArrayRef, Option<NullBuffer>)> {
443        let mut builder = StructArrayBuilder::new();
444        for (field_name, typed_value_builder) in self.typed_value_builders {
445            let (value, typed_value, nulls) = typed_value_builder.finish()?;
446            let array = ShreddedVariantFieldArray::from_parts(
447                Some(Arc::new(value)),
448                Some(typed_value),
449                nulls,
450            );
451            builder = builder.with_field(field_name, ArrayRef::from(array), false);
452        }
453        if let Some(nulls) = self.typed_value_nulls.finish() {
454            builder = builder.with_nulls(nulls);
455        }
456        Ok((
457            self.value_builder.build()?,
458            Arc::new(builder.build()),
459            self.nulls.finish(),
460        ))
461    }
462}
463
464/// Field configuration captured by the builder (data type + nullability).
465#[derive(Clone)]
466pub struct ShreddingField {
467    data_type: DataType,
468    nullable: bool,
469}
470
471impl ShreddingField {
472    fn new(data_type: DataType, nullable: bool) -> Self {
473        Self {
474            data_type,
475            nullable,
476        }
477    }
478
479    fn null() -> Self {
480        Self::new(DataType::Null, true)
481    }
482}
483
484/// Convenience conversion to allow passing either `FieldRef`, `DataType`, or `(DataType, bool)`.
485pub trait IntoShreddingField {
486    fn into_shredding_field(self) -> ShreddingField;
487}
488
489impl IntoShreddingField for FieldRef {
490    fn into_shredding_field(self) -> ShreddingField {
491        ShreddingField::new(self.data_type().clone(), self.is_nullable())
492    }
493}
494
495impl IntoShreddingField for &DataType {
496    fn into_shredding_field(self) -> ShreddingField {
497        ShreddingField::new(self.clone(), true)
498    }
499}
500
501impl IntoShreddingField for DataType {
502    fn into_shredding_field(self) -> ShreddingField {
503        ShreddingField::new(self, true)
504    }
505}
506
507impl IntoShreddingField for (&DataType, bool) {
508    fn into_shredding_field(self) -> ShreddingField {
509        ShreddingField::new(self.0.clone(), self.1)
510    }
511}
512
513impl IntoShreddingField for (DataType, bool) {
514    fn into_shredding_field(self) -> ShreddingField {
515        ShreddingField::new(self.0, self.1)
516    }
517}
518
519/// Builder for constructing a variant shredding schema.
520///
521/// The builder pattern makes it easy to incrementally define which fields
522/// should be shredded and with what types. Fields are nullable by default; pass
523/// a `(data_type, nullable)` pair or a `FieldRef` to control nullability.
524///
525/// Note: this builder currently only supports struct fields. List support
526/// will be added in the future.
527///
528/// # Example
529///
530/// ```
531/// use std::sync::Arc;
532/// use arrow::datatypes::{DataType, Field, TimeUnit};
533/// use parquet_variant::{VariantPath, VariantPathElement};
534/// use parquet_variant_compute::ShreddedSchemaBuilder;
535///
536/// fn main() -> Result<(), arrow::error::ArrowError> {
537///     // Define the shredding schema using the builder
538///     let shredding_type = ShreddedSchemaBuilder::default()
539///     // store the "time" field as a separate UTC timestamp
540///     .with_path("time", (&DataType::Timestamp(TimeUnit::Nanosecond, Some("UTC".into())), true))?
541///     // store hostname as non-nullable Utf8
542///     .with_path("hostname", (&DataType::Utf8, false))?
543///     // pass a FieldRef directly
544///     .with_path(
545///         "metadata.trace_id",
546///         Arc::new(Field::new("trace_id", DataType::FixedSizeBinary(16), false)),
547///     )?
548///     // field name with a dot: use VariantPath to avoid splitting
549///     .with_path(
550///         VariantPath::from_iter([VariantPathElement::from("metrics.cpu")]),
551///         &DataType::Float64,
552///     )?
553///     .build();
554///    Ok(())
555/// }
556/// // The shredding_type can now be passed to shred_variant:
557/// // let shredded = shred_variant(&input, &shredding_type)?;
558/// ```
559#[derive(Default, Clone)]
560pub struct ShreddedSchemaBuilder {
561    root: VariantSchemaNode,
562}
563
564impl ShreddedSchemaBuilder {
565    /// Create a new empty schema builder.
566    pub fn new() -> Self {
567        Self::default()
568    }
569
570    /// Insert a typed path into the schema using dot notation (or any
571    /// [`VariantPath`] convertible).
572    ///
573    /// The path uses dot notation to specify nested fields.
574    /// For example, "a.b.c" will create a nested structure.
575    ///
576    /// # Arguments
577    ///
578    /// * `path` - Anything convertible to [`VariantPath`] (e.g., a `&str`)
579    /// * `field` - Anything convertible via [`IntoShreddingField`] (e.g. `FieldRef`,
580    ///   `&DataType`, or `(&DataType, bool)` to control nullability)
581    pub fn with_path<'a, P, F>(mut self, path: P, field: F) -> Result<Self>
582    where
583        P: TryInto<VariantPath<'a>>,
584        P::Error: std::fmt::Debug,
585        F: IntoShreddingField,
586    {
587        let path: VariantPath<'a> = path
588            .try_into()
589            .map_err(|e| ArrowError::InvalidArgumentError(format!("{:?}", e)))?;
590        self.root.insert_path(&path, field.into_shredding_field());
591        Ok(self)
592    }
593
594    /// Build the final [`DataType`].
595    pub fn build(self) -> DataType {
596        let shredding_type = self.root.to_shredding_type();
597        match shredding_type {
598            Some(shredding_type) => shredding_type,
599            None => DataType::Null,
600        }
601    }
602}
603
604/// Internal tree node structure for building variant schemas.
605#[derive(Clone)]
606enum VariantSchemaNode {
607    /// A leaf node with a primitive/scalar type (and nullability)
608    Leaf(ShreddingField),
609    /// An inner struct node with nested fields
610    Struct(BTreeMap<String, VariantSchemaNode>),
611}
612
613impl Default for VariantSchemaNode {
614    fn default() -> Self {
615        Self::Leaf(ShreddingField::null())
616    }
617}
618
619impl VariantSchemaNode {
620    /// Insert a path into this node with the given data type.
621    fn insert_path(&mut self, path: &VariantPath<'_>, field: ShreddingField) {
622        self.insert_path_elements(path, field);
623    }
624
625    fn insert_path_elements(&mut self, segments: &[VariantPathElement<'_>], field: ShreddingField) {
626        let Some((head, tail)) = segments.split_first() else {
627            *self = Self::Leaf(field);
628            return;
629        };
630
631        match head {
632            VariantPathElement::Field { name } => {
633                // Ensure this node is a Struct node
634                let children = match self {
635                    Self::Struct(children) => children,
636                    _ => {
637                        *self = Self::Struct(BTreeMap::new());
638                        match self {
639                            Self::Struct(children) => children,
640                            _ => unreachable!(),
641                        }
642                    }
643                };
644
645                children
646                    .entry(name.to_string())
647                    .or_default()
648                    .insert_path_elements(tail, field);
649            }
650            VariantPathElement::Index { .. } => {
651                // List support to be added later; reject for now
652                unreachable!("List paths are not supported yet");
653            }
654        }
655    }
656
657    /// Convert this node to a shredding type.
658    ///
659    /// Returns the [`DataType`] for passing to [`shred_variant`].
660    fn to_shredding_type(&self) -> Option<DataType> {
661        match self {
662            Self::Leaf(field) => Some(field.data_type.clone()),
663            Self::Struct(children) => {
664                let child_fields: Vec<_> = children
665                    .iter()
666                    .filter_map(|(name, child)| child.to_shredding_field(name))
667                    .collect();
668                if child_fields.is_empty() {
669                    None
670                } else {
671                    Some(DataType::Struct(Fields::from(child_fields)))
672                }
673            }
674        }
675    }
676
677    fn to_shredding_field(&self, name: &str) -> Option<FieldRef> {
678        match self {
679            Self::Leaf(field) => Some(Arc::new(Field::new(
680                name,
681                field.data_type.clone(),
682                field.nullable,
683            ))),
684            Self::Struct(_) => self
685                .to_shredding_type()
686                .map(|data_type| Arc::new(Field::new(name, data_type, true))),
687        }
688    }
689}
690
691#[cfg(test)]
692mod tests {
693    use super::*;
694    use crate::VariantArrayBuilder;
695    use crate::variant_array::{binary_array_value, variant_from_arrays_at};
696    use arrow::array::{
697        Array, BinaryViewArray, FixedSizeBinaryArray, Float64Array, GenericListArray,
698        GenericListViewArray, Int64Array, LargeBinaryArray, LargeStringArray, ListArray,
699        ListLikeArray, OffsetSizeTrait, PrimitiveArray, StringArray,
700    };
701    use arrow::datatypes::{
702        ArrowPrimitiveType, DataType, Field, Fields, Int64Type, TimeUnit, UnionFields, UnionMode,
703    };
704    use parquet_variant::{
705        BuilderSpecificState, EMPTY_VARIANT_METADATA_BYTES, ObjectBuilder, ReadOnlyMetadataBuilder,
706        Variant, VariantBuilder, VariantPath, VariantPathElement,
707    };
708    use std::sync::Arc;
709    use uuid::Uuid;
710
    /// Every [`NullValue`] mode, listed in declaration order.
    const NULL_VALUES: [NullValue; 3] = [
        NullValue::TopLevelVariant,
        NullValue::ObjectField,
        NullValue::ArrayElement,
    ];
716
    /// Description of a (possibly nested) variant value, consumed by `build_variant_array`.
    #[derive(Clone)]
    enum VariantValue<'a> {
        /// An already constructed variant value.
        Value(Variant<'a, 'a>),
        /// A list of nested values.
        List(Vec<VariantValue<'a>>),
        /// An object given as `(field name, value)` pairs.
        Object(Vec<(&'a str, VariantValue<'a>)>),
        /// A null entry.
        Null,
    }

    /// Anything convertible into a [`Variant`] can be used directly as a `VariantValue::Value`.
    impl<'a, T> From<T> for VariantValue<'a>
    where
        T: Into<Variant<'a, 'a>>,
    {
        fn from(value: T) -> Self {
            Self::Value(value.into())
        }
    }
733
    /// Description of one top-level row passed to `build_variant_array`.
    #[derive(Clone)]
    enum VariantRow<'a> {
        /// A row holding a single described value.
        Value(VariantValue<'a>),
        /// A row holding a list of values.
        List(Vec<VariantValue<'a>>),
        /// A row holding an object given as `(field name, value)` pairs.
        Object(Vec<(&'a str, VariantValue<'a>)>),
        /// A null row.
        Null,
    }
741
742    fn build_variant_array(rows: Vec<VariantRow<'static>>) -> VariantArray {
743        let mut builder = VariantArrayBuilder::new(rows.len());
744
745        fn append_variant_value<B: VariantBuilderExt>(builder: &mut B, value: VariantValue) {
746            match value {
747                VariantValue::Value(v) => builder.append_value(v),
748                VariantValue::List(values) => {
749                    let mut list = builder.new_list();
750                    for v in values {
751                        append_variant_value(&mut list, v);
752                    }
753                    list.finish();
754                }
755                VariantValue::Object(fields) => {
756                    let mut object = builder.new_object();
757                    for (name, value) in fields {
758                        append_variant_field(&mut object, name, value);
759                    }
760                    object.finish();
761                }
762                VariantValue::Null => builder.append_null(),
763            }
764        }
765
766        fn append_variant_field<'a, S: BuilderSpecificState>(
767            object: &mut ObjectBuilder<'_, S>,
768            name: &'a str,
769            value: VariantValue<'a>,
770        ) {
771            match value {
772                VariantValue::Value(v) => {
773                    object.insert(name, v);
774                }
775                VariantValue::List(values) => {
776                    let mut list = object.new_list(name);
777                    for v in values {
778                        append_variant_value(&mut list, v);
779                    }
780                    list.finish();
781                }
782                VariantValue::Object(fields) => {
783                    let mut nested = object.new_object(name);
784                    for (field_name, v) in fields {
785                        append_variant_field(&mut nested, field_name, v);
786                    }
787                    nested.finish();
788                }
789                VariantValue::Null => {
790                    object.insert(name, Variant::Null);
791                }
792            }
793        }
794
795        rows.into_iter().for_each(|row| match row {
796            VariantRow::Value(value) => append_variant_value(&mut builder, value),
797            VariantRow::List(values) => {
798                let mut list = builder.new_list();
799                for value in values {
800                    append_variant_value(&mut list, value);
801                }
802                list.finish();
803            }
804            VariantRow::Object(fields) => {
805                let mut object = builder.new_object();
806                for (name, value) in fields {
807                    append_variant_field(&mut object, name, value);
808                }
809                object.finish();
810            }
811            VariantRow::Null => builder.append_null(),
812        });
813        builder.build()
814    }
815
    /// Test-only view over list and list-view arrays that exposes offsets and per-row sizes
    /// through one interface.
    trait TestListLikeArray: ListLikeArray {
        /// Offset integer type of the underlying array.
        type OffsetSize: OffsetSizeTrait;
        /// Offsets of the underlying array; both implementations here return `Some`.
        fn value_offsets(&self) -> Option<&[Self::OffsetSize]>;
        /// Size of the list stored at `index`.
        fn value_size(&self, index: usize) -> Self::OffsetSize;
    }

    impl<O: OffsetSizeTrait> TestListLikeArray for GenericListArray<O> {
        type OffsetSize = O;

        fn value_offsets(&self) -> Option<&[Self::OffsetSize]> {
            // Path-qualified call so the array's own `value_offsets` is used, not this method.
            Some(GenericListArray::value_offsets(self))
        }

        fn value_size(&self, index: usize) -> Self::OffsetSize {
            GenericListArray::value_length(self, index)
        }
    }

    impl<O: OffsetSizeTrait> TestListLikeArray for GenericListViewArray<O> {
        type OffsetSize = O;

        fn value_offsets(&self) -> Option<&[Self::OffsetSize]> {
            // Path-qualified call so the array's own `value_offsets` is used, not this method.
            Some(GenericListViewArray::value_offsets(self))
        }

        fn value_size(&self, index: usize) -> Self::OffsetSize {
            // Path-qualified call so the array's own `value_size` is used, not this method.
            GenericListViewArray::value_size(self, index)
        }
    }
845
846    fn downcast_list_like_array<O: OffsetSizeTrait>(
847        array: &VariantArray,
848    ) -> &dyn TestListLikeArray<OffsetSize = O> {
849        let typed_value = array.typed_value_field().unwrap();
850        if let Some(list) = typed_value.as_any().downcast_ref::<GenericListArray<O>>() {
851            list
852        } else if let Some(list_view) = typed_value
853            .as_any()
854            .downcast_ref::<GenericListViewArray<O>>()
855        {
856            list_view
857        } else {
858            panic!(
859                "Expected list-like typed_value with matching offset type, got {}",
860                typed_value.data_type()
861            );
862        }
863    }
864
865    fn assert_list_structure<O: OffsetSizeTrait>(
866        array: &VariantArray,
867        expected_len: usize,
868        expected_offsets: &[O],
869        expected_sizes: &[Option<O>],
870        expected_fallbacks: &[Option<Variant<'static, 'static>>],
871    ) {
872        assert_eq!(array.len(), expected_len);
873
874        let fallback_value = array.value_field().unwrap();
875        let fallback_metadata = array.metadata_field();
876        let array = downcast_list_like_array::<O>(array);
877
878        assert_eq!(
879            array.value_offsets().unwrap(),
880            expected_offsets,
881            "list offsets mismatch"
882        );
883        assert_eq!(
884            array.len(),
885            expected_sizes.len(),
886            "expected_sizes should match array length"
887        );
888        assert_eq!(
889            array.len(),
890            expected_fallbacks.len(),
891            "expected_fallbacks should match array length"
892        );
893        assert_eq!(
894            array.len(),
895            fallback_value.len(),
896            "fallbacks value field should match array length"
897        );
898
899        // Validate per-row shredding outcomes for the list array
900        for (idx, (expected_size, expected_fallback)) in expected_sizes
901            .iter()
902            .zip(expected_fallbacks.iter())
903            .enumerate()
904        {
905            match expected_size {
906                Some(len) => {
907                    // Successfully shredded: typed list value present, no fallback value
908                    assert!(array.is_valid(idx));
909                    assert_eq!(array.value_size(idx), *len);
910                    assert!(fallback_value.is_null(idx));
911                }
912                None => {
913                    // Unable to shred: typed list value absent, fallback should carry the variant
914                    assert!(array.is_null(idx));
915                    assert_eq!(array.value_size(idx), O::zero());
916                    match expected_fallback {
917                        Some(expected_variant) => {
918                            assert!(fallback_value.is_valid(idx));
919                            let metadata_bytes =
920                                binary_array_value(fallback_metadata.as_ref(), idx).unwrap();
921                            let metadata_bytes =
922                                if fallback_metadata.is_valid(idx) && !metadata_bytes.is_empty() {
923                                    metadata_bytes
924                                } else {
925                                    EMPTY_VARIANT_METADATA_BYTES
926                                };
927                            assert_eq!(
928                                Variant::new(
929                                    metadata_bytes,
930                                    binary_array_value(fallback_value.as_ref(), idx).unwrap()
931                                ),
932                                expected_variant.clone()
933                            );
934                        }
935                        None => {
936                            assert!(fallback_value.is_null(idx));
937                        }
938                    }
939                }
940            }
941        }
942    }
943
944    fn assert_list_structure_and_elements<T: ArrowPrimitiveType, O: OffsetSizeTrait>(
945        array: &VariantArray,
946        expected_len: usize,
947        expected_offsets: &[O],
948        expected_sizes: &[Option<O>],
949        expected_fallbacks: &[Option<Variant<'static, 'static>>],
950        expected_shredded_elements: (&[Option<T::Native>], &[Option<Variant<'static, 'static>>]),
951    ) {
952        assert_list_structure(
953            array,
954            expected_len,
955            expected_offsets,
956            expected_sizes,
957            expected_fallbacks,
958        );
959        let array = downcast_list_like_array::<O>(array);
960
961        // Validate the shredded state of list elements (typed values and fallbacks)
962        let (expected_values, expected_fallbacks) = expected_shredded_elements;
963        assert_eq!(
964            expected_values.len(),
965            expected_fallbacks.len(),
966            "expected_values and expected_fallbacks should be aligned"
967        );
968
969        // Validate the shredded primitive values for list elements
970        let element_array = ShreddedVariantFieldArray::try_new(array.values().as_ref()).unwrap();
971        let element_values = element_array
972            .typed_value_field()
973            .unwrap()
974            .as_any()
975            .downcast_ref::<PrimitiveArray<T>>()
976            .unwrap();
977        assert_eq!(element_values.len(), expected_values.len());
978        for (idx, expected_value) in expected_values.iter().enumerate() {
979            match expected_value {
980                Some(value) => {
981                    assert!(element_values.is_valid(idx));
982                    assert_eq!(element_values.value(idx), *value);
983                }
984                None => assert!(element_values.is_null(idx)),
985            }
986        }
987
988        // Validate fallback variants for list elements that could not be shredded
989        let element_fallbacks = element_array.value_field().unwrap();
990        assert_eq!(element_fallbacks.len(), expected_fallbacks.len());
991        for (idx, expected_fallback) in expected_fallbacks.iter().enumerate() {
992            match expected_fallback {
993                Some(expected_variant) => {
994                    assert!(element_fallbacks.is_valid(idx));
995                    assert_eq!(
996                        Variant::new(
997                            EMPTY_VARIANT_METADATA_BYTES,
998                            binary_array_value(element_fallbacks.as_ref(), idx).unwrap()
999                        ),
1000                        expected_variant.clone()
1001                    );
1002                }
1003                None => assert!(element_fallbacks.is_null(idx)),
1004            }
1005        }
1006    }
1007
1008    fn assert_append_null_mode_value_and_struct_nulls(
1009        mode: NullValue,
1010        value: &BinaryViewArray,
1011        nulls: Option<&arrow::buffer::NullBuffer>,
1012    ) {
1013        if mode == NullValue::TopLevelVariant {
1014            assert!(nulls.is_some_and(|n| n.is_null(0)));
1015        } else {
1016            assert!(nulls.is_none());
1017        }
1018
1019        if mode == NullValue::ArrayElement {
1020            assert!(value.is_valid(0));
1021            assert_eq!(
1022                Variant::new(EMPTY_VARIANT_METADATA_BYTES, value.value(0)),
1023                Variant::Null
1024            );
1025        } else {
1026            assert!(value.is_null(0));
1027        }
1028    }
1029
1030    #[test]
1031    fn test_append_null_mode_semantics_primitive_builder() {
1032        let cast_options = arrow::compute::CastOptions::default();
1033
1034        for mode in NULL_VALUES {
1035            let mut primitive_builder = make_variant_to_shredded_variant_arrow_row_builder(
1036                &DataType::Int64,
1037                &cast_options,
1038                1,
1039                mode,
1040            )
1041            .unwrap();
1042            primitive_builder.append_null().unwrap();
1043            let (primitive_value, primitive_typed_value, primitive_nulls) =
1044                primitive_builder.finish().unwrap();
1045            let primitive_typed_value = primitive_typed_value
1046                .as_any()
1047                .downcast_ref::<Int64Array>()
1048                .unwrap();
1049
1050            assert!(primitive_typed_value.is_null(0));
1051            assert_append_null_mode_value_and_struct_nulls(
1052                mode,
1053                &primitive_value,
1054                primitive_nulls.as_ref(),
1055            );
1056        }
1057    }
1058
1059    #[test]
1060    fn test_append_null_mode_semantics_array_builder() {
1061        let cast_options = arrow::compute::CastOptions::default();
1062        let list_type = DataType::List(Arc::new(Field::new("item", DataType::Int64, true)));
1063
1064        for mode in NULL_VALUES {
1065            let mut array_builder = make_variant_to_shredded_variant_arrow_row_builder(
1066                &list_type,
1067                &cast_options,
1068                1,
1069                mode,
1070            )
1071            .unwrap();
1072            array_builder.append_null().unwrap();
1073            let (value, typed_value, nulls) = array_builder.finish().unwrap();
1074
1075            assert_append_null_mode_value_and_struct_nulls(mode, &value, nulls.as_ref());
1076
1077            let typed_value = typed_value.as_any().downcast_ref::<ListArray>().unwrap();
1078            assert_eq!(typed_value.len(), 1);
1079            assert!(typed_value.is_null(0));
1080            assert_eq!(typed_value.values().len(), 0);
1081        }
1082    }
1083
1084    #[test]
1085    fn test_append_null_mode_semantics_object_builder() {
1086        let cast_options = arrow::compute::CastOptions::default();
1087        let object_type = DataType::Struct(Fields::from(vec![
1088            Field::new("id", DataType::Int64, true),
1089            Field::new("name", DataType::Utf8, true),
1090        ]));
1091
1092        for mode in NULL_VALUES {
1093            let mut object_builder = make_variant_to_shredded_variant_arrow_row_builder(
1094                &object_type,
1095                &cast_options,
1096                1,
1097                mode,
1098            )
1099            .unwrap();
1100            object_builder.append_null().unwrap();
1101            let (value, typed_value, nulls) = object_builder.finish().unwrap();
1102
1103            assert_append_null_mode_value_and_struct_nulls(mode, &value, nulls.as_ref());
1104
1105            let typed_struct = typed_value
1106                .as_any()
1107                .downcast_ref::<arrow::array::StructArray>()
1108                .unwrap();
1109            assert_eq!(typed_struct.len(), 1);
1110            assert!(typed_struct.is_null(0));
1111
1112            for field_name in ["id", "name"] {
1113                let field = ShreddedVariantFieldArray::try_new(
1114                    typed_struct.column_by_name(field_name).unwrap(),
1115                )
1116                .unwrap();
1117                assert!(field.value_field().unwrap().is_null(0));
1118                assert!(field.typed_value_field().unwrap().is_null(0));
1119            }
1120        }
1121    }
1122
1123    #[test]
1124    fn test_already_shredded_input_error() {
1125        // Create a VariantArray that already has typed_value_field
1126        // First create a valid VariantArray, then extract its parts to construct a shredded one
1127        let temp_array = VariantArray::from_iter(vec![Some(Variant::from("test"))]);
1128        let metadata = temp_array.metadata_field().clone();
1129        let value = temp_array.value_field().unwrap().clone();
1130        let typed_value = Arc::new(Int64Array::from(vec![42])) as ArrayRef;
1131
1132        let shredded_array =
1133            VariantArray::from_parts(metadata, Some(value), Some(typed_value), None);
1134
1135        let result = shred_variant(&shredded_array, &DataType::Int64);
1136        assert!(matches!(
1137            result.unwrap_err(),
1138            ArrowError::InvalidArgumentError(_)
1139        ));
1140    }
1141
1142    #[test]
1143    fn test_all_null_input() {
1144        // Create VariantArray with no value field (all null case)
1145        let metadata = Arc::new(BinaryViewArray::from_iter_values([&[1u8, 0u8]])); // minimal valid metadata
1146        let all_null_array = VariantArray::from_parts(metadata, None, None, None);
1147        let result = shred_variant(&all_null_array, &DataType::Int64).unwrap();
1148
1149        // Should return array with no value/typed_value fields
1150        assert!(result.value_field().is_none());
1151        assert!(result.typed_value_field().is_none());
1152    }
1153
1154    #[test]
1155    fn test_invalid_fixed_size_binary_shredding() {
1156        let mock_uuid_1 = Uuid::new_v4();
1157
1158        let input = VariantArray::from_iter([Some(Variant::from(mock_uuid_1)), None]);
1159
1160        // shred_variant only supports FixedSizeBinary(16). Any other length will err.
1161        let err = shred_variant(&input, &DataType::FixedSizeBinary(17)).unwrap_err();
1162
1163        assert_eq!(
1164            err.to_string(),
1165            "Invalid argument error: FixedSizeBinary(17) is not a valid variant shredding type. Only FixedSizeBinary(16) for UUID is supported."
1166        );
1167    }
1168
1169    #[test]
1170    fn test_uuid_shredding() {
1171        let mock_uuid_1 = Uuid::new_v4();
1172        let mock_uuid_2 = Uuid::new_v4();
1173
1174        let input = VariantArray::from_iter([
1175            Some(Variant::from(mock_uuid_1)),
1176            None,
1177            Some(Variant::from(false)),
1178            Some(Variant::from(mock_uuid_2)),
1179        ]);
1180
1181        let variant_array = shred_variant(&input, &DataType::FixedSizeBinary(16)).unwrap();
1182
1183        // // inspect the typed_value Field and make sure it contains the canonical Uuid extension type
1184        // let typed_value_field = variant_array
1185        //     .inner()
1186        //     .fields()
1187        //     .into_iter()
1188        //     .find(|f| f.name() == "typed_value")
1189        //     .unwrap();
1190
1191        // assert!(
1192        //     typed_value_field
1193        //         .try_extension_type::<extension::Uuid>()
1194        //         .is_ok()
1195        // );
1196
1197        // probe the downcasted typed_value array to make sure uuids are shredded correctly
1198        let uuids = variant_array
1199            .typed_value_field()
1200            .unwrap()
1201            .as_any()
1202            .downcast_ref::<FixedSizeBinaryArray>()
1203            .unwrap();
1204
1205        assert_eq!(uuids.len(), 4);
1206
1207        assert!(!uuids.is_null(0));
1208
1209        let got_uuid_1: &[u8] = uuids.value(0);
1210        assert_eq!(got_uuid_1, mock_uuid_1.as_bytes());
1211
1212        assert!(uuids.is_null(1));
1213        assert!(uuids.is_null(2));
1214
1215        assert!(!uuids.is_null(3));
1216
1217        let got_uuid_2: &[u8] = uuids.value(3);
1218        assert_eq!(got_uuid_2, mock_uuid_2.as_bytes());
1219    }
1220
1221    #[test]
1222    fn test_primitive_shredding_comprehensive() {
1223        // Test mixed scenarios in a single array
1224        let input = VariantArray::from_iter(vec![
1225            Some(Variant::from(42i64)),   // successful shred
1226            Some(Variant::from("hello")), // failed shred (string)
1227            Some(Variant::from(100i64)),  // successful shred
1228            None,                         // array-level null
1229            Some(Variant::Null),          // variant null
1230            Some(Variant::from(3i8)),     // successful shred (int8->int64 conversion)
1231        ]);
1232
1233        let result = shred_variant(&input, &DataType::Int64).unwrap();
1234
1235        // Verify structure
1236        let metadata_field = result.metadata_field();
1237        let value_field = result.value_field().unwrap();
1238        let typed_value_field = result
1239            .typed_value_field()
1240            .unwrap()
1241            .as_any()
1242            .downcast_ref::<Int64Array>()
1243            .unwrap();
1244
1245        // Check specific outcomes for each row
1246        assert_eq!(result.len(), 6);
1247
1248        // Row 0: 42 -> should shred successfully
1249        assert!(!result.is_null(0));
1250        assert!(value_field.is_null(0)); // value should be null when shredded
1251        assert!(!typed_value_field.is_null(0));
1252        assert_eq!(typed_value_field.value(0), 42);
1253
1254        // Row 1: "hello" -> should fail to shred
1255        assert!(!result.is_null(1));
1256        assert!(!value_field.is_null(1)); // value should contain original
1257        assert!(typed_value_field.is_null(1)); // typed_value should be null
1258        assert_eq!(
1259            variant_from_arrays_at(metadata_field, value_field, 1).unwrap(),
1260            Variant::from("hello")
1261        );
1262
1263        // Row 2: 100 -> should shred successfully
1264        assert!(!result.is_null(2));
1265        assert!(value_field.is_null(2));
1266        assert_eq!(typed_value_field.value(2), 100);
1267
1268        // Row 3: array null -> should be null in result
1269        assert!(result.is_null(3));
1270
1271        // Row 4: Variant::Null -> should not shred (it's a null variant, not an integer)
1272        assert!(!result.is_null(4));
1273        assert!(!value_field.is_null(4)); // should contain Variant::Null
1274        assert_eq!(
1275            variant_from_arrays_at(metadata_field, value_field, 4).unwrap(),
1276            Variant::Null
1277        );
1278        assert!(typed_value_field.is_null(4));
1279
1280        // Row 5: 3i8 -> should shred successfully (int8->int64 conversion)
1281        assert!(!result.is_null(5));
1282        assert!(value_field.is_null(5)); // value should be null when shredded
1283        assert!(!typed_value_field.is_null(5));
1284        assert_eq!(typed_value_field.value(5), 3);
1285    }
1286
1287    #[test]
1288    fn test_primitive_different_target_types() {
1289        let input = VariantArray::from_iter(vec![
1290            Variant::from(42i32),
1291            Variant::from(3.15f64),
1292            Variant::from("not_a_number"),
1293        ]);
1294
1295        // Test Int32 target
1296        let result_int32 = shred_variant(&input, &DataType::Int32).unwrap();
1297        let typed_value_int32 = result_int32
1298            .typed_value_field()
1299            .unwrap()
1300            .as_any()
1301            .downcast_ref::<arrow::array::Int32Array>()
1302            .unwrap();
1303        assert_eq!(typed_value_int32.value(0), 42);
1304        assert_eq!(typed_value_int32.value(1), 3);
1305        assert!(typed_value_int32.is_null(2)); // string doesn't convert to int32
1306
1307        // Test Float64 target
1308        let result_float64 = shred_variant(&input, &DataType::Float64).unwrap();
1309        let typed_value_float64 = result_float64
1310            .typed_value_field()
1311            .unwrap()
1312            .as_any()
1313            .downcast_ref::<Float64Array>()
1314            .unwrap();
1315        assert_eq!(typed_value_float64.value(0), 42.0); // int converts to float
1316        assert_eq!(typed_value_float64.value(1), 3.15);
1317        assert!(typed_value_float64.is_null(2)); // string doesn't convert
1318    }
1319
1320    #[test]
1321    fn test_largeutf8_shredding() {
1322        let input = VariantArray::from_iter(vec![
1323            Some(Variant::from("hello")),
1324            Some(Variant::from(42i64)),
1325            None,
1326            Some(Variant::Null),
1327            Some(Variant::from("world")),
1328        ]);
1329
1330        let result = shred_variant(&input, &DataType::LargeUtf8).unwrap();
1331        let metadata = result.metadata_field();
1332        let value = result.value_field().unwrap();
1333        let typed_value = result
1334            .typed_value_field()
1335            .unwrap()
1336            .as_any()
1337            .downcast_ref::<LargeStringArray>()
1338            .unwrap();
1339
1340        assert_eq!(result.len(), 5);
1341
1342        // Row 0: string shreds to typed_value
1343        assert!(result.is_valid(0));
1344        assert!(value.is_null(0));
1345        assert_eq!(typed_value.value(0), "hello");
1346
1347        // Row 1: integer falls back to value
1348        assert!(result.is_valid(1));
1349        assert!(value.is_valid(1));
1350        assert!(typed_value.is_null(1));
1351        assert_eq!(
1352            variant_from_arrays_at(metadata, value, 1).unwrap(),
1353            Variant::from(42i64)
1354        );
1355
1356        // Row 2: top-level null
1357        assert!(result.is_null(2));
1358        assert!(value.is_null(2));
1359        assert!(typed_value.is_null(2));
1360
1361        // Row 3: variant null falls back to value
1362        assert!(result.is_valid(3));
1363        assert!(value.is_valid(3));
1364        assert!(typed_value.is_null(3));
1365        assert_eq!(
1366            variant_from_arrays_at(metadata, value, 3).unwrap(),
1367            Variant::Null
1368        );
1369
1370        // Row 4: string shreds to typed_value
1371        assert!(result.is_valid(4));
1372        assert!(value.is_null(4));
1373        assert_eq!(typed_value.value(4), "world");
1374    }
1375
1376    #[test]
1377    fn test_largebinary_shredding() {
1378        let input = VariantArray::from_iter(vec![
1379            Some(Variant::from(&b"\x00\x01\x02"[..])),
1380            Some(Variant::from("not_binary")),
1381            None,
1382            Some(Variant::Null),
1383            Some(Variant::from(&b"\xff\xaa"[..])),
1384        ]);
1385
1386        let result = shred_variant(&input, &DataType::LargeBinary).unwrap();
1387        let metadata = result.metadata_field();
1388        let value = result.value_field().unwrap();
1389        let typed_value = result
1390            .typed_value_field()
1391            .unwrap()
1392            .as_any()
1393            .downcast_ref::<LargeBinaryArray>()
1394            .unwrap();
1395
1396        assert_eq!(result.len(), 5);
1397
1398        // Row 0: binary shreds to typed_value
1399        assert!(result.is_valid(0));
1400        assert!(value.is_null(0));
1401        assert_eq!(typed_value.value(0), &[0x00, 0x01, 0x02]);
1402
1403        // Row 1: string falls back to value
1404        assert!(result.is_valid(1));
1405        assert!(value.is_valid(1));
1406        assert!(typed_value.is_null(1));
1407        assert_eq!(
1408            variant_from_arrays_at(metadata, value, 1).unwrap(),
1409            Variant::from("not_binary")
1410        );
1411
1412        // Row 2: top-level null
1413        assert!(result.is_null(2));
1414        assert!(value.is_null(2));
1415        assert!(typed_value.is_null(2));
1416
1417        // Row 3: variant null falls back to value
1418        assert!(result.is_valid(3));
1419        assert!(value.is_valid(3));
1420        assert!(typed_value.is_null(3));
1421        assert_eq!(
1422            variant_from_arrays_at(metadata, value, 3).unwrap(),
1423            Variant::Null
1424        );
1425
1426        // Row 4: binary shreds to typed_value
1427        assert!(result.is_valid(4));
1428        assert!(value.is_null(4));
1429        assert_eq!(typed_value.value(4), &[0xff, 0xaa]);
1430    }
1431
1432    #[test]
1433    fn test_invalid_shredded_types_rejected() {
1434        let input = VariantArray::from_iter([Variant::from(42)]);
1435
1436        let invalid_types = vec![
1437            DataType::UInt8,
1438            DataType::Float16,
1439            DataType::Decimal256(38, 10),
1440            DataType::Date64,
1441            DataType::Time32(TimeUnit::Second),
1442            DataType::Time64(TimeUnit::Nanosecond),
1443            DataType::Timestamp(TimeUnit::Millisecond, None),
1444            DataType::FixedSizeBinary(17),
1445            DataType::Union(
1446                UnionFields::from_fields(vec![
1447                    Field::new("int_field", DataType::Int32, false),
1448                    Field::new("str_field", DataType::Utf8, true),
1449                ]),
1450                UnionMode::Dense,
1451            ),
1452            DataType::Map(
1453                Arc::new(Field::new(
1454                    "entries",
1455                    DataType::Struct(Fields::from(vec![
1456                        Field::new("key", DataType::Utf8, false),
1457                        Field::new("value", DataType::Int32, true),
1458                    ])),
1459                    false,
1460                )),
1461                false,
1462            ),
1463            DataType::Dictionary(Box::new(DataType::Int32), Box::new(DataType::Utf8)),
1464            DataType::RunEndEncoded(
1465                Arc::new(Field::new("run_ends", DataType::Int32, false)),
1466                Arc::new(Field::new("values", DataType::Utf8, true)),
1467            ),
1468        ];
1469
1470        for data_type in invalid_types {
1471            let err = shred_variant(&input, &data_type).unwrap_err();
1472            assert!(
1473                matches!(err, ArrowError::InvalidArgumentError(_)),
1474                "expected InvalidArgumentError for {:?}, got {:?}",
1475                data_type,
1476                err
1477            );
1478        }
1479    }
1480
1481    #[test]
1482    fn test_array_shredding_as_list() {
1483        let input = build_variant_array(vec![
1484            // Row 0: List of ints should shred entirely into typed_value
1485            VariantRow::List(vec![
1486                VariantValue::from(1i64),
1487                VariantValue::from(2i64),
1488                VariantValue::from(3i64),
1489            ]),
1490            // Row 1: Contains incompatible types so values fall back
1491            VariantRow::List(vec![
1492                VariantValue::from(1i64),
1493                VariantValue::from("two"),
1494                VariantValue::from(Variant::Null),
1495            ]),
1496            // Row 2: Not a list -> entire row falls back
1497            VariantRow::Value(VariantValue::from("not a list")),
1498            // Row 3: Array-level null propagates
1499            VariantRow::Null,
1500            // Row 4: Empty list exercises zero-length offsets
1501            VariantRow::List(vec![]),
1502        ]);
1503        let list_schema = DataType::List(Arc::new(Field::new("item", DataType::Int64, true)));
1504        let result = shred_variant(&input, &list_schema).unwrap();
1505        assert_eq!(result.len(), 5);
1506
1507        assert_list_structure_and_elements::<Int64Type, i32>(
1508            &result,
1509            5,
1510            &[0, 3, 6, 6, 6, 6],
1511            &[Some(3), Some(3), None, None, Some(0)],
1512            &[None, None, Some(Variant::from("not a list")), None, None],
1513            (
1514                &[Some(1), Some(2), Some(3), Some(1), None, None],
1515                &[
1516                    None,
1517                    None,
1518                    None,
1519                    None,
1520                    Some(Variant::from("two")),
1521                    Some(Variant::Null),
1522                ],
1523            ),
1524        );
1525    }
1526
1527    #[test]
1528    fn test_array_shredding_as_large_list() {
1529        let input = build_variant_array(vec![
1530            // Row 0: List of ints shreds to typed_value
1531            VariantRow::List(vec![VariantValue::from(1i64), VariantValue::from(2i64)]),
1532            // Row 1: Not a list -> entire row falls back
1533            VariantRow::Value(VariantValue::from("not a list")),
1534            // Row 2: Empty list
1535            VariantRow::List(vec![]),
1536        ]);
1537        let list_schema = DataType::LargeList(Arc::new(Field::new("item", DataType::Int64, true)));
1538        let result = shred_variant(&input, &list_schema).unwrap();
1539        assert_eq!(result.len(), 3);
1540
1541        assert_list_structure_and_elements::<Int64Type, i64>(
1542            &result,
1543            3,
1544            &[0, 2, 2, 2],
1545            &[Some(2), None, Some(0)],
1546            &[None, Some(Variant::from("not a list")), None],
1547            (&[Some(1), Some(2)], &[None, None]),
1548        );
1549    }
1550
1551    #[test]
1552    fn test_array_shredding_as_list_view() {
1553        let input = build_variant_array(vec![
1554            // Row 0: Standard list
1555            VariantRow::List(vec![
1556                VariantValue::from(1i64),
1557                VariantValue::from(2i64),
1558                VariantValue::from(3i64),
1559            ]),
1560            // Row 1: List with incompatible types -> element fallback
1561            VariantRow::List(vec![
1562                VariantValue::from(1i64),
1563                VariantValue::from("two"),
1564                VariantValue::from(Variant::Null),
1565            ]),
1566            // Row 2: Not a list -> top-level fallback
1567            VariantRow::Value(VariantValue::from("not a list")),
1568            // Row 3: Top-level Null
1569            VariantRow::Null,
1570            // Row 4: Empty list
1571            VariantRow::List(vec![]),
1572        ]);
1573        let list_schema = DataType::ListView(Arc::new(Field::new("item", DataType::Int64, true)));
1574        let result = shred_variant(&input, &list_schema).unwrap();
1575        assert_eq!(result.len(), 5);
1576
1577        assert_list_structure_and_elements::<Int64Type, i32>(
1578            &result,
1579            5,
1580            &[0, 3, 6, 6, 6],
1581            &[Some(3), Some(3), None, None, Some(0)],
1582            &[None, None, Some(Variant::from("not a list")), None, None],
1583            (
1584                &[Some(1), Some(2), Some(3), Some(1), None, None],
1585                &[
1586                    None,
1587                    None,
1588                    None,
1589                    None,
1590                    Some(Variant::from("two")),
1591                    Some(Variant::Null),
1592                ],
1593            ),
1594        );
1595    }
1596
1597    #[test]
1598    fn test_array_shredding_as_large_list_view() {
1599        let input = build_variant_array(vec![
1600            // Row 0: List of ints shreds to typed_value
1601            VariantRow::List(vec![VariantValue::from(1i64), VariantValue::from(2i64)]),
1602            // Row 1: Not a list -> entire row falls back
1603            VariantRow::Value(VariantValue::from("fallback")),
1604            // Row 2: Empty list
1605            VariantRow::List(vec![]),
1606        ]);
1607        let list_schema =
1608            DataType::LargeListView(Arc::new(Field::new("item", DataType::Int64, true)));
1609        let result = shred_variant(&input, &list_schema).unwrap();
1610        assert_eq!(result.len(), 3);
1611
1612        assert_list_structure_and_elements::<Int64Type, i64>(
1613            &result,
1614            3,
1615            &[0, 2, 2],
1616            &[Some(2), None, Some(0)],
1617            &[None, Some(Variant::from("fallback")), None],
1618            (&[Some(1), Some(2)], &[None, None]),
1619        );
1620    }
1621
1622    #[test]
1623    fn test_array_shredding_as_fixed_size_list() {
1624        let input = build_variant_array(vec![VariantRow::List(vec![
1625            VariantValue::from(1i64),
1626            VariantValue::from(2i64),
1627            VariantValue::from(3i64),
1628        ])]);
1629        let list_schema =
1630            DataType::FixedSizeList(Arc::new(Field::new("item", DataType::Int64, true)), 2);
1631        let err = shred_variant(&input, &list_schema).unwrap_err();
1632        assert_eq!(
1633            err.to_string(),
1634            "Not yet implemented: Converting unshredded variant arrays to arrow fixed-size lists"
1635        );
1636    }
1637
1638    #[test]
1639    fn test_array_shredding_with_array_elements() {
1640        let input = build_variant_array(vec![
1641            // Row 0: [[1, 2], [3, 4], []] - clean nested lists
1642            VariantRow::List(vec![
1643                VariantValue::List(vec![VariantValue::from(1i64), VariantValue::from(2i64)]),
1644                VariantValue::List(vec![VariantValue::from(3i64), VariantValue::from(4i64)]),
1645                VariantValue::List(vec![]),
1646            ]),
1647            // Row 1: [[5, "bad", null], "not a list inner", null] - inner fallbacks
1648            VariantRow::List(vec![
1649                VariantValue::List(vec![
1650                    VariantValue::from(5i64),
1651                    VariantValue::from("bad"),
1652                    VariantValue::from(Variant::Null),
1653                ]),
1654                VariantValue::from("not a list inner"),
1655                VariantValue::Null,
1656            ]),
1657            // Row 2: "not a list" - top-level fallback
1658            VariantRow::Value(VariantValue::from("not a list")),
1659            // Row 3: null row
1660            VariantRow::Null,
1661        ]);
1662        let inner_field = Arc::new(Field::new("item", DataType::Int64, true));
1663        let inner_list_schema = DataType::List(inner_field);
1664        let list_schema = DataType::List(Arc::new(Field::new(
1665            "item",
1666            inner_list_schema.clone(),
1667            true,
1668        )));
1669        let result = shred_variant(&input, &list_schema).unwrap();
1670        assert_eq!(result.len(), 4);
1671
1672        let typed_value = result
1673            .typed_value_field()
1674            .unwrap()
1675            .as_any()
1676            .downcast_ref::<ListArray>()
1677            .unwrap();
1678
1679        assert_list_structure::<i32>(
1680            &result,
1681            4,
1682            &[0, 3, 6, 6, 6],
1683            &[Some(3), Some(3), None, None],
1684            &[None, None, Some(Variant::from("not a list")), None],
1685        );
1686
1687        let outer_elements =
1688            ShreddedVariantFieldArray::try_new(typed_value.values().as_ref()).unwrap();
1689        assert_eq!(outer_elements.len(), 6);
1690        let outer_values = outer_elements
1691            .typed_value_field()
1692            .unwrap()
1693            .as_any()
1694            .downcast_ref::<ListArray>()
1695            .unwrap();
1696        let outer_fallbacks = outer_elements.value_field().unwrap();
1697
1698        let outer_metadata = Arc::new(BinaryViewArray::from_iter_values(std::iter::repeat_n(
1699            EMPTY_VARIANT_METADATA_BYTES,
1700            outer_elements.len(),
1701        )));
1702        let outer_variant = VariantArray::from_parts(
1703            outer_metadata,
1704            Some(outer_fallbacks.clone()),
1705            Some(Arc::new(outer_values.clone())),
1706            None,
1707        );
1708
1709        assert_list_structure_and_elements::<Int64Type, i32>(
1710            &outer_variant,
1711            outer_elements.len(),
1712            &[0, 2, 4, 4, 7, 7, 7],
1713            &[Some(2), Some(2), Some(0), Some(3), None, None],
1714            &[
1715                None,
1716                None,
1717                None,
1718                None,
1719                Some(Variant::from("not a list inner")),
1720                Some(Variant::Null),
1721            ],
1722            (
1723                &[Some(1), Some(2), Some(3), Some(4), Some(5), None, None],
1724                &[
1725                    None,
1726                    None,
1727                    None,
1728                    None,
1729                    None,
1730                    Some(Variant::from("bad")),
1731                    Some(Variant::Null),
1732                ],
1733            ),
1734        );
1735    }
1736
1737    #[test]
1738    fn test_array_shredding_with_object_elements() {
1739        let input = build_variant_array(vec![
1740            // Row 0: [{"id": 1, "name": "Alice"}, {"id": null}] fully shards
1741            VariantRow::List(vec![
1742                VariantValue::Object(vec![
1743                    ("id", VariantValue::from(1i64)),
1744                    ("name", VariantValue::from("Alice")),
1745                ]),
1746                VariantValue::Object(vec![("id", VariantValue::from(Variant::Null))]),
1747            ]),
1748            // Row 1: "not a list" -> fallback
1749            VariantRow::Value(VariantValue::from("not a list")),
1750            // Row 2: Null row
1751            VariantRow::Null,
1752        ]);
1753
1754        // Target schema is List<Struct<id:int64,name:utf8>>
1755        let object_fields = Fields::from(vec![
1756            Field::new("id", DataType::Int64, true),
1757            Field::new("name", DataType::Utf8, true),
1758        ]);
1759        let list_schema = DataType::List(Arc::new(Field::new(
1760            "item",
1761            DataType::Struct(object_fields),
1762            true,
1763        )));
1764        let result = shred_variant(&input, &list_schema).unwrap();
1765        assert_eq!(result.len(), 3);
1766
1767        assert_list_structure::<i32>(
1768            &result,
1769            3,
1770            &[0, 2, 2, 2],
1771            &[Some(2), None, None],
1772            &[None, Some(Variant::from("not a list")), None],
1773        );
1774
1775        // Validate nested struct fields for each element
1776        let typed_value = result
1777            .typed_value_field()
1778            .unwrap()
1779            .as_any()
1780            .downcast_ref::<ListArray>()
1781            .unwrap();
1782        let element_array =
1783            ShreddedVariantFieldArray::try_new(typed_value.values().as_ref()).unwrap();
1784        assert_eq!(element_array.len(), 2);
1785        let element_objects = element_array
1786            .typed_value_field()
1787            .unwrap()
1788            .as_any()
1789            .downcast_ref::<arrow::array::StructArray>()
1790            .unwrap();
1791
1792        // Id field [1, Variant::Null]
1793        let id_field =
1794            ShreddedVariantFieldArray::try_new(element_objects.column_by_name("id").unwrap())
1795                .unwrap();
1796        let id_values = id_field.value_field().unwrap();
1797        let id_typed_values = id_field
1798            .typed_value_field()
1799            .unwrap()
1800            .as_any()
1801            .downcast_ref::<Int64Array>()
1802            .unwrap();
1803        assert!(id_values.is_null(0));
1804        assert_eq!(id_typed_values.value(0), 1);
1805        // null is stored as Variant::Null in values
1806        assert!(id_values.is_valid(1));
1807        assert_eq!(
1808            Variant::new(
1809                EMPTY_VARIANT_METADATA_BYTES,
1810                binary_array_value(id_values.as_ref(), 1).unwrap()
1811            ),
1812            Variant::Null
1813        );
1814        assert!(id_typed_values.is_null(1));
1815
1816        // Name field ["Alice", null]
1817        let name_field =
1818            ShreddedVariantFieldArray::try_new(element_objects.column_by_name("name").unwrap())
1819                .unwrap();
1820        let name_values = name_field.value_field().unwrap();
1821        let name_typed_values = name_field
1822            .typed_value_field()
1823            .unwrap()
1824            .as_any()
1825            .downcast_ref::<StringArray>()
1826            .unwrap();
1827        assert!(name_values.is_null(0));
1828        assert_eq!(name_typed_values.value(0), "Alice");
1829        // No value provided, both value and typed_value are null
1830        assert!(name_values.is_null(1));
1831        assert!(name_typed_values.is_null(1));
1832    }
1833
1834    #[test]
1835    fn test_object_shredding_comprehensive() -> Result<()> {
1836        let input = build_variant_array(vec![
1837            // Row 0: Fully shredded object
1838            VariantRow::Object(vec![
1839                ("score", VariantValue::from(95.5f64)),
1840                ("age", VariantValue::from(30i64)),
1841            ]),
1842            // Row 1: Partially shredded object (extra email field)
1843            VariantRow::Object(vec![
1844                ("score", VariantValue::from(87.2f64)),
1845                ("age", VariantValue::from(25i64)),
1846                ("email", VariantValue::from("bob@example.com")),
1847            ]),
1848            // Row 2: Missing field (no score)
1849            VariantRow::Object(vec![("age", VariantValue::from(35i64))]),
1850            // Row 3: Type mismatch (score is string, age is string)
1851            VariantRow::Object(vec![
1852                ("score", VariantValue::from("ninety-five")),
1853                ("age", VariantValue::from("thirty")),
1854            ]),
1855            // Row 4: Non-object
1856            VariantRow::Value(VariantValue::from("not an object")),
1857            // Row 5: Empty object
1858            VariantRow::Object(vec![]),
1859            // Row 6: Null
1860            VariantRow::Null,
1861            // Row 7: Object with only "wrong" fields
1862            VariantRow::Object(vec![("foo", VariantValue::from(10))]),
1863            // Row 8: Object with one "right" and one "wrong" field
1864            VariantRow::Object(vec![
1865                ("score", VariantValue::from(66.67f64)),
1866                ("foo", VariantValue::from(10)),
1867            ]),
1868        ]);
1869
1870        // Create target schema: struct<score: float64, age: int64>
1871        // Both types are supported for shredding
1872        let target_schema = ShreddedSchemaBuilder::default()
1873            .with_path("score", &DataType::Float64)?
1874            .with_path("age", &DataType::Int64)?
1875            .build();
1876
1877        let result = shred_variant(&input, &target_schema).unwrap();
1878
1879        // Verify structure
1880        assert!(result.value_field().is_some());
1881        assert!(result.typed_value_field().is_some());
1882        assert_eq!(result.len(), 9);
1883
1884        let metadata = result.metadata_field();
1885        let value = result.value_field().unwrap();
1886        let typed_value = result
1887            .typed_value_field()
1888            .unwrap()
1889            .as_any()
1890            .downcast_ref::<arrow::array::StructArray>()
1891            .unwrap();
1892
1893        // Extract score and age fields from typed_value struct
1894        let score_field =
1895            ShreddedVariantFieldArray::try_new(typed_value.column_by_name("score").unwrap())
1896                .unwrap();
1897        let age_field =
1898            ShreddedVariantFieldArray::try_new(typed_value.column_by_name("age").unwrap()).unwrap();
1899
1900        let score_value = score_field.value_field().unwrap();
1901        let score_typed_value = score_field
1902            .typed_value_field()
1903            .unwrap()
1904            .as_any()
1905            .downcast_ref::<Float64Array>()
1906            .unwrap();
1907        let age_value = age_field.value_field().unwrap();
1908        let age_typed_value = age_field
1909            .typed_value_field()
1910            .unwrap()
1911            .as_any()
1912            .downcast_ref::<Int64Array>()
1913            .unwrap();
1914
1915        // Set up exhaustive checking of all shredded columns and their nulls/values
1916        struct ShreddedValue<'m, 'v, T> {
1917            value: Option<Variant<'m, 'v>>,
1918            typed_value: Option<T>,
1919        }
1920        struct ShreddedStruct<'m, 'v> {
1921            score: ShreddedValue<'m, 'v, f64>,
1922            age: ShreddedValue<'m, 'v, i64>,
1923        }
1924        fn get_value<'m, 'v>(
1925            i: usize,
1926            metadata: &'m dyn Array,
1927            value: &'v dyn Array,
1928        ) -> Variant<'m, 'v> {
1929            variant_from_arrays_at(metadata, value, i).unwrap()
1930        }
1931        let expect = |i, expected_result: Option<ShreddedValue<ShreddedStruct>>| {
1932            match expected_result {
1933                Some(ShreddedValue {
1934                    value: expected_value,
1935                    typed_value: expected_typed_value,
1936                }) => {
1937                    assert!(result.is_valid(i));
1938                    match expected_value {
1939                        Some(expected_value) => {
1940                            assert!(value.is_valid(i));
1941                            assert_eq!(
1942                                expected_value,
1943                                get_value(i, metadata.as_ref(), value.as_ref())
1944                            );
1945                        }
1946                        None => {
1947                            assert!(value.is_null(i));
1948                        }
1949                    }
1950                    match expected_typed_value {
1951                        Some(ShreddedStruct {
1952                            score: expected_score,
1953                            age: expected_age,
1954                        }) => {
1955                            assert!(typed_value.is_valid(i));
1956                            assert!(score_field.is_valid(i)); // non-nullable
1957                            assert!(age_field.is_valid(i)); // non-nullable
1958                            match expected_score.value {
1959                                Some(expected_score_value) => {
1960                                    assert!(score_value.is_valid(i));
1961                                    assert_eq!(
1962                                        expected_score_value,
1963                                        get_value(i, metadata.as_ref(), score_value.as_ref())
1964                                    );
1965                                }
1966                                None => {
1967                                    assert!(score_value.is_null(i));
1968                                }
1969                            }
1970                            match expected_score.typed_value {
1971                                Some(expected_score) => {
1972                                    assert!(score_typed_value.is_valid(i));
1973                                    assert_eq!(expected_score, score_typed_value.value(i));
1974                                }
1975                                None => {
1976                                    assert!(score_typed_value.is_null(i));
1977                                }
1978                            }
1979                            match expected_age.value {
1980                                Some(expected_age_value) => {
1981                                    assert!(age_value.is_valid(i));
1982                                    assert_eq!(
1983                                        expected_age_value,
1984                                        get_value(i, metadata.as_ref(), age_value.as_ref())
1985                                    );
1986                                }
1987                                None => {
1988                                    assert!(age_value.is_null(i));
1989                                }
1990                            }
1991                            match expected_age.typed_value {
1992                                Some(expected_age) => {
1993                                    assert!(age_typed_value.is_valid(i));
1994                                    assert_eq!(expected_age, age_typed_value.value(i));
1995                                }
1996                                None => {
1997                                    assert!(age_typed_value.is_null(i));
1998                                }
1999                            }
2000                        }
2001                        None => {
2002                            assert!(typed_value.is_null(i));
2003                        }
2004                    }
2005                }
2006                None => {
2007                    assert!(result.is_null(i));
2008                }
2009            };
2010        };
2011
2012        // Row 0: Fully shredded - both fields shred successfully
2013        expect(
2014            0,
2015            Some(ShreddedValue {
2016                value: None,
2017                typed_value: Some(ShreddedStruct {
2018                    score: ShreddedValue {
2019                        value: None,
2020                        typed_value: Some(95.5),
2021                    },
2022                    age: ShreddedValue {
2023                        value: None,
2024                        typed_value: Some(30),
2025                    },
2026                }),
2027            }),
2028        );
2029
2030        // Row 1: Partially shredded - value contains extra email field
2031        let mut builder = VariantBuilder::new();
2032        builder
2033            .new_object()
2034            .with_field("email", "bob@example.com")
2035            .finish();
2036        let (m, v) = builder.finish();
2037        let expected_value = Variant::new(&m, &v);
2038
2039        expect(
2040            1,
2041            Some(ShreddedValue {
2042                value: Some(expected_value),
2043                typed_value: Some(ShreddedStruct {
2044                    score: ShreddedValue {
2045                        value: None,
2046                        typed_value: Some(87.2),
2047                    },
2048                    age: ShreddedValue {
2049                        value: None,
2050                        typed_value: Some(25),
2051                    },
2052                }),
2053            }),
2054        );
2055
2056        // Row 2: Fully shredded -- missing score field
2057        expect(
2058            2,
2059            Some(ShreddedValue {
2060                value: None,
2061                typed_value: Some(ShreddedStruct {
2062                    score: ShreddedValue {
2063                        value: None,
2064                        typed_value: None,
2065                    },
2066                    age: ShreddedValue {
2067                        value: None,
2068                        typed_value: Some(35),
2069                    },
2070                }),
2071            }),
2072        );
2073
2074        // Row 3: Type mismatches - both score and age are strings
2075        expect(
2076            3,
2077            Some(ShreddedValue {
2078                value: None,
2079                typed_value: Some(ShreddedStruct {
2080                    score: ShreddedValue {
2081                        value: Some(Variant::from("ninety-five")),
2082                        typed_value: None,
2083                    },
2084                    age: ShreddedValue {
2085                        value: Some(Variant::from("thirty")),
2086                        typed_value: None,
2087                    },
2088                }),
2089            }),
2090        );
2091
2092        // Row 4: Non-object - falls back to value field
2093        expect(
2094            4,
2095            Some(ShreddedValue {
2096                value: Some(Variant::from("not an object")),
2097                typed_value: None,
2098            }),
2099        );
2100
2101        // Row 5: Empty object
2102        expect(
2103            5,
2104            Some(ShreddedValue {
2105                value: None,
2106                typed_value: Some(ShreddedStruct {
2107                    score: ShreddedValue {
2108                        value: None,
2109                        typed_value: None,
2110                    },
2111                    age: ShreddedValue {
2112                        value: None,
2113                        typed_value: None,
2114                    },
2115                }),
2116            }),
2117        );
2118
2119        // Row 6: Null
2120        expect(6, None);
2121
2122        // Helper to correctly create a variant object using a row's existing metadata
2123        let object_with_foo_field = |i| {
2124            use parquet_variant::{ParentState, ValueBuilder, VariantMetadata};
2125            let metadata = VariantMetadata::new(binary_array_value(metadata.as_ref(), i).unwrap());
2126            let mut metadata_builder = ReadOnlyMetadataBuilder::new(&metadata);
2127            let mut value_builder = ValueBuilder::new();
2128            let state = ParentState::variant(&mut value_builder, &mut metadata_builder);
2129            ObjectBuilder::new(state, false)
2130                .with_field("foo", 10)
2131                .finish();
2132            (metadata, value_builder.into_inner())
2133        };
2134
2135        // Row 7: Object with only a "wrong" field
2136        let (m, v) = object_with_foo_field(7);
2137        expect(
2138            7,
2139            Some(ShreddedValue {
2140                value: Some(Variant::new_with_metadata(m, &v)),
2141                typed_value: Some(ShreddedStruct {
2142                    score: ShreddedValue {
2143                        value: None,
2144                        typed_value: None,
2145                    },
2146                    age: ShreddedValue {
2147                        value: None,
2148                        typed_value: None,
2149                    },
2150                }),
2151            }),
2152        );
2153
2154        // Row 8: Object with one "wrong" and one "right" field
2155        let (m, v) = object_with_foo_field(8);
2156        expect(
2157            8,
2158            Some(ShreddedValue {
2159                value: Some(Variant::new_with_metadata(m, &v)),
2160                typed_value: Some(ShreddedStruct {
2161                    score: ShreddedValue {
2162                        value: None,
2163                        typed_value: Some(66.67),
2164                    },
2165                    age: ShreddedValue {
2166                        value: None,
2167                        typed_value: None,
2168                    },
2169                }),
2170            }),
2171        );
2172        Ok(())
2173    }
2174
2175    #[test]
2176    fn test_object_shredding_with_array_field() {
2177        let input = build_variant_array(vec![
2178            // Row 0: Object with well-typed scores list
2179            VariantRow::Object(vec![(
2180                "scores",
2181                VariantValue::List(vec![VariantValue::from(10i64), VariantValue::from(20i64)]),
2182            )]),
2183            // Row 1: Object whose scores list contains incompatible type
2184            VariantRow::Object(vec![(
2185                "scores",
2186                VariantValue::List(vec![
2187                    VariantValue::from("oops"),
2188                    VariantValue::from(Variant::Null),
2189                ]),
2190            )]),
2191            // Row 2: Object missing the scores field entirely
2192            VariantRow::Object(vec![]),
2193            // Row 3: Non-object fallback
2194            VariantRow::Value(VariantValue::from("not an object")),
2195            // Row 4: Top-level Null
2196            VariantRow::Null,
2197        ]);
2198        let list_field = Arc::new(Field::new("item", DataType::Int64, true));
2199        let inner_list_schema = DataType::List(list_field);
2200        let schema = DataType::Struct(Fields::from(vec![Field::new(
2201            "scores",
2202            inner_list_schema.clone(),
2203            true,
2204        )]));
2205
2206        let result = shred_variant(&input, &schema).unwrap();
2207        assert_eq!(result.len(), 5);
2208
2209        // Access base value/typed_value columns
2210        let value_field = result.value_field().unwrap();
2211        let typed_struct = result
2212            .typed_value_field()
2213            .unwrap()
2214            .as_any()
2215            .downcast_ref::<arrow::array::StructArray>()
2216            .unwrap();
2217
2218        // Validate base value fallbacks for non-object rows
2219        assert!(value_field.is_null(0));
2220        assert!(value_field.is_null(1));
2221        assert!(value_field.is_null(2));
2222        assert!(value_field.is_valid(3));
2223        assert_eq!(
2224            variant_from_arrays_at(result.metadata_field(), value_field, 3).unwrap(),
2225            Variant::from("not an object")
2226        );
2227        assert!(value_field.is_null(4));
2228
2229        // Typed struct should only be null for the fallback row
2230        assert!(typed_struct.is_valid(0));
2231        assert!(typed_struct.is_valid(1));
2232        assert!(typed_struct.is_valid(2));
2233        assert!(typed_struct.is_null(3));
2234        assert!(typed_struct.is_null(4));
2235
2236        // Drill into the scores field on the typed struct
2237        let scores_field =
2238            ShreddedVariantFieldArray::try_new(typed_struct.column_by_name("scores").unwrap())
2239                .unwrap();
2240        assert_list_structure_and_elements::<Int64Type, i32>(
2241            &VariantArray::from_parts(
2242                Arc::new(BinaryViewArray::from_iter_values(std::iter::repeat_n(
2243                    EMPTY_VARIANT_METADATA_BYTES,
2244                    scores_field.len(),
2245                ))),
2246                Some(scores_field.value_field().unwrap().clone()),
2247                Some(scores_field.typed_value_field().unwrap().clone()),
2248                None,
2249            ),
2250            scores_field.len(),
2251            &[0i32, 2, 4, 4, 4, 4],
2252            &[Some(2), Some(2), None, None, None],
2253            &[None, None, None, None, None],
2254            (
2255                &[Some(10), Some(20), None, None],
2256                &[None, None, Some(Variant::from("oops")), Some(Variant::Null)],
2257            ),
2258        );
2259    }
2260
2261    #[test]
2262    fn test_object_different_schemas() -> Result<()> {
2263        // Create object with multiple fields
2264        let input = build_variant_array(vec![VariantRow::Object(vec![
2265            ("id", VariantValue::from(123i32)),
2266            ("age", VariantValue::from(25i64)),
2267            ("score", VariantValue::from(95.5f64)),
2268        ])]);
2269
2270        // Test with schema containing only id field
2271        let schema1 = ShreddedSchemaBuilder::default()
2272            .with_path("id", &DataType::Int32)?
2273            .build();
2274        let result1 = shred_variant(&input, &schema1).unwrap();
2275        let value_field1 = result1.value_field().unwrap();
2276        assert!(!value_field1.is_null(0)); // should contain {"age": 25, "score": 95.5}
2277
2278        // Test with schema containing id and age fields
2279        let schema2 = ShreddedSchemaBuilder::default()
2280            .with_path("id", &DataType::Int32)?
2281            .with_path("age", &DataType::Int64)?
2282            .build();
2283        let result2 = shred_variant(&input, &schema2).unwrap();
2284        let value_field2 = result2.value_field().unwrap();
2285        assert!(!value_field2.is_null(0)); // should contain {"score": 95.5}
2286
2287        // Test with schema containing all fields
2288        let schema3 = ShreddedSchemaBuilder::default()
2289            .with_path("id", &DataType::Int32)?
2290            .with_path("age", &DataType::Int64)?
2291            .with_path("score", &DataType::Float64)?
2292            .build();
2293        let result3 = shred_variant(&input, &schema3).unwrap();
2294        let value_field3 = result3.value_field().unwrap();
2295        assert!(value_field3.is_null(0)); // fully shredded, no remaining fields
2296
2297        Ok(())
2298    }
2299
2300    #[test]
2301    fn test_uuid_shredding_in_objects() -> Result<()> {
2302        let mock_uuid_1 = Uuid::new_v4();
2303        let mock_uuid_2 = Uuid::new_v4();
2304        let mock_uuid_3 = Uuid::new_v4();
2305
2306        let input = build_variant_array(vec![
2307            // Row 0: Fully shredded object with both UUID fields
2308            VariantRow::Object(vec![
2309                ("id", VariantValue::from(mock_uuid_1)),
2310                ("session_id", VariantValue::from(mock_uuid_2)),
2311            ]),
2312            // Row 1: Partially shredded object - UUID fields plus extra field
2313            VariantRow::Object(vec![
2314                ("id", VariantValue::from(mock_uuid_2)),
2315                ("session_id", VariantValue::from(mock_uuid_3)),
2316                ("name", VariantValue::from("test_user")),
2317            ]),
2318            // Row 2: Missing UUID field (no session_id)
2319            VariantRow::Object(vec![("id", VariantValue::from(mock_uuid_1))]),
2320            // Row 3: Type mismatch - id is UUID but session_id is a string
2321            VariantRow::Object(vec![
2322                ("id", VariantValue::from(mock_uuid_3)),
2323                ("session_id", VariantValue::from("not-a-uuid")),
2324            ]),
2325            // Row 4: Object with non-UUID value in id field
2326            VariantRow::Object(vec![
2327                ("id", VariantValue::from(12345i64)),
2328                ("session_id", VariantValue::from(mock_uuid_1)),
2329            ]),
2330            // Row 5: Null
2331            VariantRow::Null,
2332        ]);
2333
2334        let target_schema = ShreddedSchemaBuilder::default()
2335            .with_path("id", DataType::FixedSizeBinary(16))?
2336            .with_path("session_id", DataType::FixedSizeBinary(16))?
2337            .build();
2338
2339        let result = shred_variant(&input, &target_schema).unwrap();
2340
2341        assert!(result.value_field().is_some());
2342        assert!(result.typed_value_field().is_some());
2343        assert_eq!(result.len(), 6);
2344
2345        let metadata = result.metadata_field();
2346        let value = result.value_field().unwrap();
2347        let typed_value = result
2348            .typed_value_field()
2349            .unwrap()
2350            .as_any()
2351            .downcast_ref::<arrow::array::StructArray>()
2352            .unwrap();
2353
2354        // Extract id and session_id fields from typed_value struct
2355        let id_field =
2356            ShreddedVariantFieldArray::try_new(typed_value.column_by_name("id").unwrap()).unwrap();
2357        let session_id_field =
2358            ShreddedVariantFieldArray::try_new(typed_value.column_by_name("session_id").unwrap())
2359                .unwrap();
2360
2361        let id_value = id_field.value_field().unwrap();
2362        let id_typed_value = id_field
2363            .typed_value_field()
2364            .unwrap()
2365            .as_any()
2366            .downcast_ref::<FixedSizeBinaryArray>()
2367            .unwrap();
2368        let session_id_value = session_id_field.value_field().unwrap();
2369        let session_id_typed_value = session_id_field
2370            .typed_value_field()
2371            .unwrap()
2372            .as_any()
2373            .downcast_ref::<FixedSizeBinaryArray>()
2374            .unwrap();
2375
2376        // Row 0: Fully shredded - both UUID fields shred successfully
2377        assert!(result.is_valid(0));
2378
2379        assert!(value.is_null(0)); // fully shredded, no remaining fields
2380        assert!(id_value.is_null(0));
2381        assert!(session_id_value.is_null(0));
2382
2383        assert!(typed_value.is_valid(0));
2384        assert!(id_typed_value.is_valid(0));
2385        assert!(session_id_typed_value.is_valid(0));
2386
2387        assert_eq!(id_typed_value.value(0), mock_uuid_1.as_bytes());
2388        assert_eq!(session_id_typed_value.value(0), mock_uuid_2.as_bytes());
2389
2390        // Row 1: Partially shredded - value contains extra name field
2391        assert!(result.is_valid(1));
2392
2393        assert!(value.is_valid(1)); // contains unshredded "name" field
2394        assert!(typed_value.is_valid(1));
2395
2396        assert!(id_value.is_null(1));
2397        assert!(id_typed_value.is_valid(1));
2398        assert_eq!(id_typed_value.value(1), mock_uuid_2.as_bytes());
2399
2400        assert!(session_id_value.is_null(1));
2401        assert!(session_id_typed_value.is_valid(1));
2402        assert_eq!(session_id_typed_value.value(1), mock_uuid_3.as_bytes());
2403
2404        // Verify the value field contains the name field
2405        let row_1_variant = variant_from_arrays_at(metadata, value, 1).unwrap();
2406        let Variant::Object(obj) = row_1_variant else {
2407            panic!("Expected object");
2408        };
2409
2410        assert_eq!(obj.get("name"), Some(Variant::from("test_user")));
2411
2412        // Row 2: Missing session_id field
2413        assert!(result.is_valid(2));
2414
2415        assert!(value.is_null(2)); // fully shredded, no extra fields
2416        assert!(typed_value.is_valid(2));
2417
2418        assert!(id_value.is_null(2));
2419        assert!(id_typed_value.is_valid(2));
2420        assert_eq!(id_typed_value.value(2), mock_uuid_1.as_bytes());
2421
2422        assert!(session_id_value.is_null(2));
2423        assert!(session_id_typed_value.is_null(2)); // missing field
2424
2425        // Row 3: Type mismatch - session_id is a string, not UUID
2426        assert!(result.is_valid(3));
2427
2428        assert!(value.is_null(3)); // no extra fields
2429        assert!(typed_value.is_valid(3));
2430
2431        assert!(id_value.is_null(3));
2432        assert!(id_typed_value.is_valid(3));
2433        assert_eq!(id_typed_value.value(3), mock_uuid_3.as_bytes());
2434
2435        assert!(session_id_value.is_valid(3)); // type mismatch, stored in value
2436        assert!(session_id_typed_value.is_null(3));
2437        let session_id_variant = variant_from_arrays_at(metadata, session_id_value, 3).unwrap();
2438        assert_eq!(session_id_variant, Variant::from("not-a-uuid"));
2439
2440        // Row 4: Type mismatch - id is int64, not UUID
2441        assert!(result.is_valid(4));
2442
2443        assert!(value.is_null(4)); // no extra fields
2444        assert!(typed_value.is_valid(4));
2445
2446        assert!(id_value.is_valid(4)); // type mismatch, stored in value
2447        assert!(id_typed_value.is_null(4));
2448        let id_variant = variant_from_arrays_at(metadata, id_value, 4).unwrap();
2449        assert_eq!(id_variant, Variant::from(12345i64));
2450
2451        assert!(session_id_value.is_null(4));
2452        assert!(session_id_typed_value.is_valid(4));
2453        assert_eq!(session_id_typed_value.value(4), mock_uuid_1.as_bytes());
2454
2455        // Row 5: Null
2456        assert!(result.is_null(5));
2457
2458        Ok(())
2459    }
2460
2461    #[test]
2462    fn test_spec_compliance() {
2463        let input = VariantArray::from_iter(vec![Variant::from(42i64), Variant::from("hello")]);
2464
2465        let result = shred_variant(&input, &DataType::Int64).unwrap();
2466
2467        // Test field access by name (not position)
2468        let inner_struct = result.inner();
2469        assert!(inner_struct.column_by_name("metadata").is_some());
2470        assert!(inner_struct.column_by_name("value").is_some());
2471        assert!(inner_struct.column_by_name("typed_value").is_some());
2472
2473        // Test metadata preservation
2474        assert_eq!(result.metadata_field().len(), input.metadata_field().len());
2475        // The metadata should be the same reference (cheap clone)
2476        // Note: BinaryViewArray doesn't have a .values() method, so we compare the arrays directly
2477        assert_eq!(result.metadata_field().len(), input.metadata_field().len());
2478
2479        // Test output structure correctness
2480        assert_eq!(result.len(), input.len());
2481        assert!(result.value_field().is_some());
2482        assert!(result.typed_value_field().is_some());
2483
2484        // For primitive shredding, verify that value and typed_value are never both non-null
2485        // (This rule applies to primitives; for objects, both can be non-null for partial shredding)
2486        let value_field = result.value_field().unwrap();
2487        let typed_value_field = result
2488            .typed_value_field()
2489            .unwrap()
2490            .as_any()
2491            .downcast_ref::<Int64Array>()
2492            .unwrap();
2493
2494        for i in 0..result.len() {
2495            if !result.is_null(i) {
2496                let value_is_null = value_field.is_null(i);
2497                let typed_value_is_null = typed_value_field.is_null(i);
2498                // For primitive shredding, at least one should be null
2499                assert!(
2500                    value_is_null || typed_value_is_null,
2501                    "Row {}: both value and typed_value are non-null for primitive shredding",
2502                    i
2503                );
2504            }
2505        }
2506    }
2507
2508    #[test]
2509    fn test_variant_schema_builder_simple() -> Result<()> {
2510        let shredding_type = ShreddedSchemaBuilder::default()
2511            .with_path("a", &DataType::Int64)?
2512            .with_path("b", &DataType::Float64)?
2513            .build();
2514
2515        assert_eq!(
2516            shredding_type,
2517            DataType::Struct(Fields::from(vec![
2518                Field::new("a", DataType::Int64, true),
2519                Field::new("b", DataType::Float64, true),
2520            ]))
2521        );
2522
2523        Ok(())
2524    }
2525
2526    #[test]
2527    fn test_variant_schema_builder_nested() -> Result<()> {
2528        let shredding_type = ShreddedSchemaBuilder::default()
2529            .with_path("a", &DataType::Int64)?
2530            .with_path("b.c", &DataType::Utf8)?
2531            .with_path("b.d", &DataType::Float64)?
2532            .build();
2533
2534        assert_eq!(
2535            shredding_type,
2536            DataType::Struct(Fields::from(vec![
2537                Field::new("a", DataType::Int64, true),
2538                Field::new(
2539                    "b",
2540                    DataType::Struct(Fields::from(vec![
2541                        Field::new("c", DataType::Utf8, true),
2542                        Field::new("d", DataType::Float64, true),
2543                    ])),
2544                    true
2545                ),
2546            ]))
2547        );
2548
2549        Ok(())
2550    }
2551
2552    #[test]
2553    fn test_variant_schema_builder_with_path_variant_path_arg() -> Result<()> {
2554        let path = VariantPath::from_iter([VariantPathElement::from("a.b")]);
2555        let shredding_type = ShreddedSchemaBuilder::default()
2556            .with_path(path, &DataType::Int64)?
2557            .build();
2558
2559        match shredding_type {
2560            DataType::Struct(fields) => {
2561                assert_eq!(fields.len(), 1);
2562                assert_eq!(fields[0].name(), "a.b");
2563                assert_eq!(fields[0].data_type(), &DataType::Int64);
2564            }
2565            _ => panic!("expected struct data type"),
2566        }
2567
2568        Ok(())
2569    }
2570
2571    #[test]
2572    fn test_variant_schema_builder_custom_nullability() -> Result<()> {
2573        let shredding_type = ShreddedSchemaBuilder::default()
2574            .with_path(
2575                "foo",
2576                Arc::new(Field::new("should_be_renamed", DataType::Utf8, false)),
2577            )?
2578            .with_path("bar", (&DataType::Int64, false))?
2579            .build();
2580
2581        let DataType::Struct(fields) = shredding_type else {
2582            panic!("expected struct data type");
2583        };
2584
2585        let foo = fields.iter().find(|f| f.name() == "foo").unwrap();
2586        assert_eq!(foo.data_type(), &DataType::Utf8);
2587        assert!(!foo.is_nullable());
2588
2589        let bar = fields.iter().find(|f| f.name() == "bar").unwrap();
2590        assert_eq!(bar.data_type(), &DataType::Int64);
2591        assert!(!bar.is_nullable());
2592
2593        Ok(())
2594    }
2595
2596    #[test]
2597    fn test_variant_schema_builder_with_shred_variant() -> Result<()> {
2598        let input = build_variant_array(vec![
2599            VariantRow::Object(vec![
2600                ("time", VariantValue::from(1234567890i64)),
2601                ("hostname", VariantValue::from("server1")),
2602                ("extra", VariantValue::from(42)),
2603            ]),
2604            VariantRow::Object(vec![
2605                ("time", VariantValue::from(9876543210i64)),
2606                ("hostname", VariantValue::from("server2")),
2607            ]),
2608            VariantRow::Null,
2609        ]);
2610
2611        let shredding_type = ShreddedSchemaBuilder::default()
2612            .with_path("time", &DataType::Int64)?
2613            .with_path("hostname", &DataType::Utf8)?
2614            .build();
2615
2616        let result = shred_variant(&input, &shredding_type).unwrap();
2617
2618        assert_eq!(
2619            result.data_type(),
2620            &DataType::Struct(Fields::from(vec![
2621                Field::new("metadata", DataType::BinaryView, false),
2622                Field::new("value", DataType::BinaryView, true),
2623                Field::new(
2624                    "typed_value",
2625                    DataType::Struct(Fields::from(vec![
2626                        Field::new(
2627                            "hostname",
2628                            DataType::Struct(Fields::from(vec![
2629                                Field::new("value", DataType::BinaryView, true),
2630                                Field::new("typed_value", DataType::Utf8, true),
2631                            ])),
2632                            false,
2633                        ),
2634                        Field::new(
2635                            "time",
2636                            DataType::Struct(Fields::from(vec![
2637                                Field::new("value", DataType::BinaryView, true),
2638                                Field::new("typed_value", DataType::Int64, true),
2639                            ])),
2640                            false,
2641                        ),
2642                    ])),
2643                    true,
2644                ),
2645            ]))
2646        );
2647
2648        assert_eq!(result.len(), 3);
2649        assert!(result.typed_value_field().is_some());
2650
2651        let typed_value = result
2652            .typed_value_field()
2653            .unwrap()
2654            .as_any()
2655            .downcast_ref::<arrow::array::StructArray>()
2656            .unwrap();
2657
2658        let time_field =
2659            ShreddedVariantFieldArray::try_new(typed_value.column_by_name("time").unwrap())
2660                .unwrap();
2661        let hostname_field =
2662            ShreddedVariantFieldArray::try_new(typed_value.column_by_name("hostname").unwrap())
2663                .unwrap();
2664
2665        let time_typed = time_field
2666            .typed_value_field()
2667            .unwrap()
2668            .as_any()
2669            .downcast_ref::<Int64Array>()
2670            .unwrap();
2671        let hostname_typed = hostname_field
2672            .typed_value_field()
2673            .unwrap()
2674            .as_any()
2675            .downcast_ref::<arrow::array::StringArray>()
2676            .unwrap();
2677
2678        // Row 0
2679        assert!(!result.is_null(0));
2680        assert_eq!(time_typed.value(0), 1234567890);
2681        assert_eq!(hostname_typed.value(0), "server1");
2682
2683        // Row 1
2684        assert!(!result.is_null(1));
2685        assert_eq!(time_typed.value(1), 9876543210);
2686        assert_eq!(hostname_typed.value(1), "server2");
2687
2688        // Row 2
2689        assert!(result.is_null(2));
2690
2691        Ok(())
2692    }
2693
2694    #[test]
2695    fn test_variant_schema_builder_conflicting_path() -> Result<()> {
2696        let shredding_type = ShreddedSchemaBuilder::default()
2697            .with_path("a", &DataType::Int64)?
2698            .with_path("a", &DataType::Float64)?
2699            .build();
2700
2701        assert_eq!(
2702            shredding_type,
2703            DataType::Struct(Fields::from(
2704                vec![Field::new("a", DataType::Float64, true),]
2705            ))
2706        );
2707
2708        Ok(())
2709    }
2710
2711    #[test]
2712    fn test_variant_schema_builder_root_path() -> Result<()> {
2713        let path = VariantPath::new(vec![]);
2714        let shredding_type = ShreddedSchemaBuilder::default()
2715            .with_path(path, &DataType::Int64)?
2716            .build();
2717
2718        assert_eq!(shredding_type, DataType::Int64);
2719
2720        Ok(())
2721    }
2722
2723    #[test]
2724    fn test_variant_schema_builder_empty_path() -> Result<()> {
2725        let shredding_type = ShreddedSchemaBuilder::default()
2726            .with_path("", &DataType::Int64)?
2727            .build();
2728
2729        assert_eq!(shredding_type, DataType::Int64);
2730        Ok(())
2731    }
2732
2733    #[test]
2734    fn test_variant_schema_builder_default() {
2735        let shredding_type = ShreddedSchemaBuilder::default().build();
2736        assert_eq!(shredding_type, DataType::Null);
2737    }
2738}