parquet_variant_compute/
unshred_variant.rs

1// Licensed to the Apache Software Foundation (ASF) under one
2// or more contributor license agreements.  See the NOTICE file
3// distributed with this work for additional information
4// regarding copyright ownership.  The ASF licenses this file
5// to you under the Apache License, Version 2.0 (the
6// "License"); you may not use this file except in compliance
7// with the License.  You may obtain a copy of the License at
8//
9//   http://www.apache.org/licenses/LICENSE-2.0
10//
11// Unless required by applicable law or agreed to in writing,
12// software distributed under the License is distributed on an
13// "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
14// KIND, either express or implied.  See the License for the
15// specific language governing permissions and limitations
16// under the License.
17
18//! Module for unshredding VariantArray by folding typed_value columns back into the value column.
19
20use crate::arrow_to_variant::ListLikeArray;
21use crate::{BorrowedShreddingState, VariantArray, VariantValueArrayBuilder};
22use arrow::array::{
23    Array, AsArray as _, BinaryViewArray, BooleanArray, FixedSizeBinaryArray, FixedSizeListArray,
24    GenericListArray, GenericListViewArray, PrimitiveArray, StringArray, StructArray,
25};
26use arrow::buffer::NullBuffer;
27use arrow::datatypes::{
28    ArrowPrimitiveType, DataType, Date32Type, Decimal32Type, Decimal64Type, Decimal128Type,
29    DecimalType, Float32Type, Float64Type, Int8Type, Int16Type, Int32Type, Int64Type,
30    Time64MicrosecondType, TimeUnit, TimestampMicrosecondType, TimestampNanosecondType,
31};
32use arrow::error::{ArrowError, Result};
33use arrow::temporal_conversions::time64us_to_time;
34use chrono::{DateTime, Utc};
35use indexmap::IndexMap;
36use parquet_variant::{
37    ObjectFieldBuilder, Variant, VariantBuilderExt, VariantDecimal4, VariantDecimal8,
38    VariantDecimal16, VariantMetadata,
39};
40use uuid::Uuid;
41
42/// Removes all (nested) typed_value columns from a VariantArray by converting them back to binary
43/// variant and merging the resulting values back into the value column.
44///
45/// This function efficiently converts a shredded VariantArray back to an unshredded form where all
46/// data resides in the value column.
47///
48/// # Arguments
49/// * `array` - The VariantArray to unshred
50///
51/// # Returns
52/// A new VariantArray with all data in the value column and no typed_value column
53///
54/// # Errors
55/// - If the shredded data contains spec violations (e.g., field name conflicts)
56/// - If unsupported data types are encountered in typed_value columns
57pub fn unshred_variant(array: &VariantArray) -> Result<VariantArray> {
58    // Check if already unshredded (optimization for common case)
59    if array.typed_value_field().is_none() && array.value_field().is_some() {
60        return Ok(array.clone());
61    }
62
63    // NOTE: None/None at top-level is technically invalid, but the shredding spec requires us to
64    // emit `Variant::Null` when a required value is missing.
65    let nulls = array.nulls();
66    let mut row_builder = UnshredVariantRowBuilder::try_new_opt(array.shredding_state().borrow())?
67        .unwrap_or_else(|| UnshredVariantRowBuilder::null(nulls));
68
69    let metadata = array.metadata_field();
70    let mut value_builder = VariantValueArrayBuilder::new(array.len());
71    for i in 0..array.len() {
72        if array.is_null(i) {
73            value_builder.append_null();
74        } else {
75            let metadata = VariantMetadata::new(metadata.value(i));
76            let mut value_builder = value_builder.builder_ext(&metadata);
77            row_builder.append_row(&mut value_builder, &metadata, i)?;
78        }
79    }
80
81    let value = value_builder.build()?;
82    Ok(VariantArray::from_parts(
83        metadata.clone(),
84        Some(value),
85        None,
86        nulls.cloned(),
87    ))
88}
89
/// Row builder for converting shredded VariantArray rows back to unshredded form
///
/// Each variant wraps the concrete builder for one kind of `typed_value` column (or for the
/// absence of one). Dispatch happens through a `match` in `append_row`.
enum UnshredVariantRowBuilder<'a> {
    // Integer and floating point typed_value columns
    PrimitiveInt8(UnshredPrimitiveRowBuilder<'a, PrimitiveArray<Int8Type>>),
    PrimitiveInt16(UnshredPrimitiveRowBuilder<'a, PrimitiveArray<Int16Type>>),
    PrimitiveInt32(UnshredPrimitiveRowBuilder<'a, PrimitiveArray<Int32Type>>),
    PrimitiveInt64(UnshredPrimitiveRowBuilder<'a, PrimitiveArray<Int64Type>>),
    PrimitiveFloat32(UnshredPrimitiveRowBuilder<'a, PrimitiveArray<Float32Type>>),
    PrimitiveFloat64(UnshredPrimitiveRowBuilder<'a, PrimitiveArray<Float64Type>>),
    // Decimal typed_value columns; the builder also carries the column's scale
    Decimal32(DecimalUnshredRowBuilder<'a, Decimal32Spec>),
    Decimal64(DecimalUnshredRowBuilder<'a, Decimal64Spec>),
    Decimal128(DecimalUnshredRowBuilder<'a, Decimal128Spec>),
    // Date, time and timestamp typed_value columns
    PrimitiveDate32(UnshredPrimitiveRowBuilder<'a, PrimitiveArray<Date32Type>>),
    PrimitiveTime64(UnshredPrimitiveRowBuilder<'a, PrimitiveArray<Time64MicrosecondType>>),
    TimestampMicrosecond(TimestampUnshredRowBuilder<'a, TimestampMicrosecondType>),
    TimestampNanosecond(TimestampUnshredRowBuilder<'a, TimestampNanosecondType>),
    // Boolean, string, binary and UUID (FixedSizeBinary) typed_value columns
    PrimitiveBoolean(UnshredPrimitiveRowBuilder<'a, BooleanArray>),
    PrimitiveString(UnshredPrimitiveRowBuilder<'a, StringArray>),
    PrimitiveBinaryView(UnshredPrimitiveRowBuilder<'a, BinaryViewArray>),
    PrimitiveUuid(UnshredPrimitiveRowBuilder<'a, FixedSizeBinaryArray>),
    // List-like typed_value columns, one variant per Arrow list layout
    List(ListUnshredVariantBuilder<'a, GenericListArray<i32>>),
    LargeList(ListUnshredVariantBuilder<'a, GenericListArray<i64>>),
    ListView(ListUnshredVariantBuilder<'a, GenericListViewArray<i32>>),
    LargeListView(ListUnshredVariantBuilder<'a, GenericListViewArray<i64>>),
    FixedSizeList(ListUnshredVariantBuilder<'a, FixedSizeListArray>),
    // Struct typed_value columns (shredded objects)
    Struct(StructUnshredVariantBuilder<'a>),
    // No typed_value column, only a value column
    ValueOnly(ValueOnlyUnshredVariantBuilder<'a>),
    // Neither a value nor a typed_value column
    Null(NullUnshredVariantBuilder<'a>),
}
118
119impl<'a> UnshredVariantRowBuilder<'a> {
120    /// Creates an all-null row builder.
121    fn null(nulls: Option<&'a NullBuffer>) -> Self {
122        Self::Null(NullUnshredVariantBuilder::new(nulls))
123    }
124
125    /// Appends a single row at the given value index to the supplied builder.
126    fn append_row(
127        &mut self,
128        builder: &mut impl VariantBuilderExt,
129        metadata: &VariantMetadata,
130        index: usize,
131    ) -> Result<()> {
132        match self {
133            Self::PrimitiveInt8(b) => b.append_row(builder, metadata, index),
134            Self::PrimitiveInt16(b) => b.append_row(builder, metadata, index),
135            Self::PrimitiveInt32(b) => b.append_row(builder, metadata, index),
136            Self::PrimitiveInt64(b) => b.append_row(builder, metadata, index),
137            Self::PrimitiveFloat32(b) => b.append_row(builder, metadata, index),
138            Self::PrimitiveFloat64(b) => b.append_row(builder, metadata, index),
139            Self::Decimal32(b) => b.append_row(builder, metadata, index),
140            Self::Decimal64(b) => b.append_row(builder, metadata, index),
141            Self::Decimal128(b) => b.append_row(builder, metadata, index),
142            Self::PrimitiveDate32(b) => b.append_row(builder, metadata, index),
143            Self::PrimitiveTime64(b) => b.append_row(builder, metadata, index),
144            Self::TimestampMicrosecond(b) => b.append_row(builder, metadata, index),
145            Self::TimestampNanosecond(b) => b.append_row(builder, metadata, index),
146            Self::PrimitiveBoolean(b) => b.append_row(builder, metadata, index),
147            Self::PrimitiveString(b) => b.append_row(builder, metadata, index),
148            Self::PrimitiveBinaryView(b) => b.append_row(builder, metadata, index),
149            Self::PrimitiveUuid(b) => b.append_row(builder, metadata, index),
150            Self::List(b) => b.append_row(builder, metadata, index),
151            Self::LargeList(b) => b.append_row(builder, metadata, index),
152            Self::ListView(b) => b.append_row(builder, metadata, index),
153            Self::LargeListView(b) => b.append_row(builder, metadata, index),
154            Self::FixedSizeList(b) => b.append_row(builder, metadata, index),
155            Self::Struct(b) => b.append_row(builder, metadata, index),
156            Self::ValueOnly(b) => b.append_row(builder, metadata, index),
157            Self::Null(b) => b.append_row(builder, metadata, index),
158        }
159    }
160
161    /// Creates a new UnshredVariantRowBuilder from shredding state
162    /// Returns None for None/None case - caller decides how to handle based on context
163    fn try_new_opt(shredding_state: BorrowedShreddingState<'a>) -> Result<Option<Self>> {
164        let value = shredding_state.value_field();
165        let typed_value = shredding_state.typed_value_field();
166        let Some(typed_value) = typed_value else {
167            // Copy the value across directly, if present. Else caller decides what to do.
168            return Ok(value.map(|v| Self::ValueOnly(ValueOnlyUnshredVariantBuilder::new(v))));
169        };
170
171        // Has typed_value -> determine type and create appropriate builder
172        macro_rules! primitive_builder {
173            ($enum_variant:ident, $cast_fn:ident) => {
174                Self::$enum_variant(UnshredPrimitiveRowBuilder::new(
175                    value,
176                    typed_value.$cast_fn(),
177                ))
178            };
179        }
180
181        let builder = match typed_value.data_type() {
182            DataType::Int8 => primitive_builder!(PrimitiveInt8, as_primitive),
183            DataType::Int16 => primitive_builder!(PrimitiveInt16, as_primitive),
184            DataType::Int32 => primitive_builder!(PrimitiveInt32, as_primitive),
185            DataType::Int64 => primitive_builder!(PrimitiveInt64, as_primitive),
186            DataType::Float32 => primitive_builder!(PrimitiveFloat32, as_primitive),
187            DataType::Float64 => primitive_builder!(PrimitiveFloat64, as_primitive),
188            DataType::Decimal32(_, scale) => Self::Decimal32(DecimalUnshredRowBuilder::new(
189                value,
190                typed_value.as_primitive(),
191                *scale,
192            )),
193            DataType::Decimal64(_, scale) => Self::Decimal64(DecimalUnshredRowBuilder::new(
194                value,
195                typed_value.as_primitive(),
196                *scale,
197            )),
198            DataType::Decimal128(_, scale) => Self::Decimal128(DecimalUnshredRowBuilder::new(
199                value,
200                typed_value.as_primitive(),
201                *scale,
202            )),
203            DataType::Decimal256(_, _) => {
204                return Err(ArrowError::InvalidArgumentError(
205                    "Decimal256 is not a valid variant shredding type".to_string(),
206                ));
207            }
208            DataType::Date32 => primitive_builder!(PrimitiveDate32, as_primitive),
209            DataType::Time64(TimeUnit::Microsecond) => {
210                primitive_builder!(PrimitiveTime64, as_primitive)
211            }
212            DataType::Time64(time_unit) => {
213                return Err(ArrowError::InvalidArgumentError(format!(
214                    "Time64({time_unit}) is not a valid variant shredding type",
215                )));
216            }
217            DataType::Timestamp(TimeUnit::Microsecond, timezone) => {
218                Self::TimestampMicrosecond(TimestampUnshredRowBuilder::new(
219                    value,
220                    typed_value.as_primitive(),
221                    timezone.is_some(),
222                ))
223            }
224            DataType::Timestamp(TimeUnit::Nanosecond, timezone) => {
225                Self::TimestampNanosecond(TimestampUnshredRowBuilder::new(
226                    value,
227                    typed_value.as_primitive(),
228                    timezone.is_some(),
229                ))
230            }
231            DataType::Timestamp(time_unit, _) => {
232                return Err(ArrowError::InvalidArgumentError(format!(
233                    "Timestamp({time_unit}) is not a valid variant shredding type",
234                )));
235            }
236            DataType::Boolean => primitive_builder!(PrimitiveBoolean, as_boolean),
237            DataType::Utf8 => primitive_builder!(PrimitiveString, as_string),
238            DataType::BinaryView => primitive_builder!(PrimitiveBinaryView, as_binary_view),
239            DataType::FixedSizeBinary(16) => {
240                primitive_builder!(PrimitiveUuid, as_fixed_size_binary)
241            }
242            DataType::FixedSizeBinary(size) => {
243                return Err(ArrowError::InvalidArgumentError(format!(
244                    "FixedSizeBinary({size}) is not a valid variant shredding type",
245                )));
246            }
247            DataType::Struct(_) => Self::Struct(StructUnshredVariantBuilder::try_new(
248                value,
249                typed_value.as_struct(),
250            )?),
251            DataType::List(_) => Self::List(ListUnshredVariantBuilder::try_new(
252                value,
253                typed_value.as_list(),
254            )?),
255            DataType::LargeList(_) => Self::LargeList(ListUnshredVariantBuilder::try_new(
256                value,
257                typed_value.as_list(),
258            )?),
259            DataType::ListView(_) => Self::ListView(ListUnshredVariantBuilder::try_new(
260                value,
261                typed_value.as_list_view(),
262            )?),
263            DataType::LargeListView(_) => Self::LargeListView(ListUnshredVariantBuilder::try_new(
264                value,
265                typed_value.as_list_view(),
266            )?),
267            DataType::FixedSizeList(_, _) => Self::FixedSizeList(
268                ListUnshredVariantBuilder::try_new(value, typed_value.as_fixed_size_list())?,
269            ),
270            _ => {
271                return Err(ArrowError::NotYetImplemented(format!(
272                    "Unshredding not yet supported for type: {}",
273                    typed_value.data_type()
274                )));
275            }
276        };
277        Ok(Some(builder))
278    }
279}
280
281/// Builder for arrays with neither typed_value nor value (all NULL/Variant::Null)
282struct NullUnshredVariantBuilder<'a> {
283    nulls: Option<&'a NullBuffer>,
284}
285
286impl<'a> NullUnshredVariantBuilder<'a> {
287    fn new(nulls: Option<&'a NullBuffer>) -> Self {
288        Self { nulls }
289    }
290
291    fn append_row(
292        &mut self,
293        builder: &mut impl VariantBuilderExt,
294        _metadata: &VariantMetadata,
295        index: usize,
296    ) -> Result<()> {
297        if self.nulls.is_some_and(|nulls| nulls.is_null(index)) {
298            builder.append_null();
299        } else {
300            builder.append_value(Variant::Null);
301        }
302        Ok(())
303    }
304}
305
306/// Builder for arrays that only have value column (already unshredded)
307struct ValueOnlyUnshredVariantBuilder<'a> {
308    value: &'a arrow::array::BinaryViewArray,
309}
310
311impl<'a> ValueOnlyUnshredVariantBuilder<'a> {
312    fn new(value: &'a BinaryViewArray) -> Self {
313        Self { value }
314    }
315
316    fn append_row(
317        &mut self,
318        builder: &mut impl VariantBuilderExt,
319        metadata: &VariantMetadata,
320        index: usize,
321    ) -> Result<()> {
322        if self.value.is_null(index) {
323            builder.append_null();
324        } else {
325            let variant = Variant::new_with_metadata(metadata.clone(), self.value.value(index));
326            builder.append_value(variant);
327        }
328        Ok(())
329    }
330}
331
/// Extension trait that directly adds row builder support for arrays that correspond to primitive
/// variant types.
trait AppendToVariantBuilder: Array {
    /// Appends the element stored at `index` to `builder` as a variant value.
    ///
    /// The only caller in this file, `UnshredPrimitiveRowBuilder::append_row`, invokes this after
    /// it has already handled a null slot at `index`.
    ///
    /// Returns an error if the element cannot be converted to a variant value.
    fn append_to_variant_builder(
        &self,
        builder: &mut impl VariantBuilderExt,
        index: usize,
    ) -> Result<()>;
}
341
/// Shared prologue for the `append_row` methods of builders that have a typed_value column.
///
/// `$self` must expose `value` (an optional binary column) and `typed_value` fields. The macro
/// expands inside a function returning `Result<()>` and may `return` from it:
///
/// - typed_value is NULL at `$index`: appends the row's `value` (or NULL if that is missing too)
///   and returns `Ok(())` from the enclosing function.
/// - typed_value is valid and `value` is also valid: returns an error from the enclosing function
///   unless `$partial_shredding` is true.
/// - otherwise: evaluates to the row's `value` as `Option<Variant>`, for the caller to merge.
macro_rules! handle_unshredded_case {
    ($self:expr, $builder:expr, $metadata:expr, $index:expr, $partial_shredding:expr) => {{
        // Decode the residual `value` for this row, if the column exists and the slot is valid.
        let residual = match $self.value.as_ref() {
            Some(column) if column.is_valid($index) => Some(Variant::new_with_metadata(
                $metadata.clone(),
                column.value($index),
            )),
            _ => None,
        };

        // Unshredded row: typed_value contributes nothing, so emit `value` (or NULL) and stop.
        if $self.typed_value.is_null($index) {
            if let Some(variant) = residual {
                $builder.append_value(variant);
            } else {
                $builder.append_null();
            }
            return Ok(());
        }

        // Only partial shredding allows value and typed_value to both be non-NULL
        if residual.is_some() && !$partial_shredding {
            return Err(ArrowError::InvalidArgumentError(
                "Invalid shredded variant: both value and typed_value are non-null".to_string(),
            ));
        }

        // Hand the residual value (if any) back for the partially shredded case
        residual
    }};
}
369
370/// Generic unshred builder that works with any Array implementing AppendToVariantBuilder
371struct UnshredPrimitiveRowBuilder<'a, T> {
372    value: Option<&'a BinaryViewArray>,
373    typed_value: &'a T,
374}
375
376impl<'a, T: AppendToVariantBuilder> UnshredPrimitiveRowBuilder<'a, T> {
377    fn new(value: Option<&'a BinaryViewArray>, typed_value: &'a T) -> Self {
378        Self { value, typed_value }
379    }
380
381    fn append_row(
382        &mut self,
383        builder: &mut impl VariantBuilderExt,
384        metadata: &VariantMetadata,
385        index: usize,
386    ) -> Result<()> {
387        handle_unshredded_case!(self, builder, metadata, index, false);
388
389        // If we get here, typed_value is valid and value is NULL
390        self.typed_value.append_to_variant_builder(builder, index)
391    }
392}
393
// Macro to generate AppendToVariantBuilder implementations with optional value transformation
//
// Usage:
//   impl_append_to_variant_builder!(ArrayType);
//   impl_append_to_variant_builder!(ArrayType, |v| expression_using_v);
//
// Without a transform, the result of `self.value(index)` is appended as-is. With a transform,
// that result is bound to the closure-style parameter name and the expression's result is
// appended instead. The expression is expanded inside a function returning `Result<()>`, so it
// may use `?` to report conversion failures.
macro_rules! impl_append_to_variant_builder {
    ($array_type:ty $(, |$v:ident| $transform:expr)? ) => {
        impl AppendToVariantBuilder for $array_type {
            fn append_to_variant_builder(
                &self,
                builder: &mut impl VariantBuilderExt,
                index: usize,
            ) -> Result<()> {
                let value = self.value(index);
                // Optional transform: rebind the raw value under the caller's name, then
                // shadow `value` with the transformed result.
                $(
                    let $v = value;
                    let value = $transform;
                )?
                builder.append_value(value);
                Ok(())
            }
        }
    };
}
414
// Array types whose `value(index)` result is appended to the builder without any transformation.
impl_append_to_variant_builder!(BooleanArray);
impl_append_to_variant_builder!(StringArray);
impl_append_to_variant_builder!(BinaryViewArray);
impl_append_to_variant_builder!(PrimitiveArray<Int8Type>);
impl_append_to_variant_builder!(PrimitiveArray<Int16Type>);
impl_append_to_variant_builder!(PrimitiveArray<Int32Type>);
impl_append_to_variant_builder!(PrimitiveArray<Int64Type>);
impl_append_to_variant_builder!(PrimitiveArray<Float32Type>);
impl_append_to_variant_builder!(PrimitiveArray<Float64Type>);

// Date32 stores a day count; it is converted via `Date32Type::to_naive_date` before appending.
impl_append_to_variant_builder!(PrimitiveArray<Date32Type>, |days_since_epoch| {
    Date32Type::to_naive_date(days_since_epoch)
});

// Time64(Microsecond) is converted via `time64us_to_time`; a value that function rejects is
// reported as an InvalidArgumentError instead of being appended.
impl_append_to_variant_builder!(
    PrimitiveArray<Time64MicrosecondType>,
    |micros_since_midnight| {
        time64us_to_time(micros_since_midnight).ok_or_else(|| {
            ArrowError::InvalidArgumentError(format!(
                "Invalid Time64 microsecond value: {micros_since_midnight}"
            ))
        })?
    }
);
439
440// UUID from FixedSizeBinary(16)
441// NOTE: FixedSizeBinaryArray guarantees the byte length, so we can safely unwrap
442impl_append_to_variant_builder!(FixedSizeBinaryArray, |bytes| {
443    Uuid::from_slice(bytes).unwrap()
444});
445
/// Trait for timestamp types to handle conversion to `DateTime<Utc>`
///
/// Implemented in this file for the microsecond and nanosecond Arrow timestamp types.
trait TimestampType: ArrowPrimitiveType<Native = i64> {
    /// Converts a raw `i64` timestamp, expressed in the implementing type's unit, into a
    /// `DateTime<Utc>`, or returns an error if the value cannot be represented.
    fn to_datetime_utc(value: i64) -> Result<DateTime<Utc>>;
}
450
451impl TimestampType for TimestampMicrosecondType {
452    fn to_datetime_utc(micros: i64) -> Result<DateTime<Utc>> {
453        DateTime::from_timestamp_micros(micros).ok_or_else(|| {
454            ArrowError::InvalidArgumentError(format!(
455                "Invalid timestamp microsecond value: {micros}"
456            ))
457        })
458    }
459}
460
461impl TimestampType for TimestampNanosecondType {
462    fn to_datetime_utc(nanos: i64) -> Result<DateTime<Utc>> {
463        Ok(DateTime::from_timestamp_nanos(nanos))
464    }
465}
466
467/// Generic builder for timestamp types that handles timezone-aware conversion
468struct TimestampUnshredRowBuilder<'a, T: TimestampType> {
469    value: Option<&'a BinaryViewArray>,
470    typed_value: &'a PrimitiveArray<T>,
471    has_timezone: bool,
472}
473
474impl<'a, T: TimestampType> TimestampUnshredRowBuilder<'a, T> {
475    fn new(
476        value: Option<&'a BinaryViewArray>,
477        typed_value: &'a PrimitiveArray<T>,
478        has_timezone: bool,
479    ) -> Self {
480        Self {
481            value,
482            typed_value,
483            has_timezone,
484        }
485    }
486
487    fn append_row(
488        &mut self,
489        builder: &mut impl VariantBuilderExt,
490        metadata: &VariantMetadata,
491        index: usize,
492    ) -> Result<()> {
493        handle_unshredded_case!(self, builder, metadata, index, false);
494
495        // If we get here, typed_value is valid and value is NULL
496        let timestamp_value = self.typed_value.value(index);
497        let dt = T::to_datetime_utc(timestamp_value)?;
498        if self.has_timezone {
499            builder.append_value(dt);
500        } else {
501            builder.append_value(dt.naive_utc());
502        }
503        Ok(())
504    }
505}
506
/// Trait to unify decimal unshredding across Decimal32/64/128 types
trait DecimalSpec {
    /// The Arrow decimal type whose native values this spec converts.
    type Arrow: ArrowPrimitiveType + DecimalType;

    /// Converts a raw unscaled decimal value plus its Arrow scale into a variant.
    ///
    /// The implementations in this file return an error when `scale` does not fit in a `u8`
    /// (for example, a negative scale) or when the variant decimal constructor rejects the
    /// value/scale pair.
    fn into_variant(
        raw: <Self::Arrow as ArrowPrimitiveType>::Native,
        scale: i8,
    ) -> Result<Variant<'static, 'static>>;
}
516
517/// Spec for Decimal32 -> VariantDecimal4
518struct Decimal32Spec;
519
520impl DecimalSpec for Decimal32Spec {
521    type Arrow = Decimal32Type;
522
523    fn into_variant(raw: i32, scale: i8) -> Result<Variant<'static, 'static>> {
524        let scale =
525            u8::try_from(scale).map_err(|e| ArrowError::InvalidArgumentError(e.to_string()))?;
526        let value = VariantDecimal4::try_new(raw, scale)
527            .map_err(|e| ArrowError::InvalidArgumentError(e.to_string()))?;
528        Ok(value.into())
529    }
530}
531
532/// Spec for Decimal64 -> VariantDecimal8
533struct Decimal64Spec;
534
535impl DecimalSpec for Decimal64Spec {
536    type Arrow = Decimal64Type;
537
538    fn into_variant(raw: i64, scale: i8) -> Result<Variant<'static, 'static>> {
539        let scale =
540            u8::try_from(scale).map_err(|e| ArrowError::InvalidArgumentError(e.to_string()))?;
541        let value = VariantDecimal8::try_new(raw, scale)
542            .map_err(|e| ArrowError::InvalidArgumentError(e.to_string()))?;
543        Ok(value.into())
544    }
545}
546
547/// Spec for Decimal128 -> VariantDecimal16
548struct Decimal128Spec;
549
550impl DecimalSpec for Decimal128Spec {
551    type Arrow = Decimal128Type;
552
553    fn into_variant(raw: i128, scale: i8) -> Result<Variant<'static, 'static>> {
554        let scale =
555            u8::try_from(scale).map_err(|e| ArrowError::InvalidArgumentError(e.to_string()))?;
556        let value = VariantDecimal16::try_new(raw, scale)
557            .map_err(|e| ArrowError::InvalidArgumentError(e.to_string()))?;
558        Ok(value.into())
559    }
560}
561
562/// Generic builder for decimal unshredding that caches scale
563struct DecimalUnshredRowBuilder<'a, S: DecimalSpec> {
564    value: Option<&'a BinaryViewArray>,
565    typed_value: &'a PrimitiveArray<S::Arrow>,
566    scale: i8,
567}
568
569impl<'a, S: DecimalSpec> DecimalUnshredRowBuilder<'a, S> {
570    fn new(
571        value: Option<&'a BinaryViewArray>,
572        typed_value: &'a PrimitiveArray<S::Arrow>,
573        scale: i8,
574    ) -> Self {
575        Self {
576            value,
577            typed_value,
578            scale,
579        }
580    }
581
582    fn append_row(
583        &mut self,
584        builder: &mut impl VariantBuilderExt,
585        metadata: &VariantMetadata,
586        index: usize,
587    ) -> Result<()> {
588        handle_unshredded_case!(self, builder, metadata, index, false);
589
590        let raw = self.typed_value.value(index);
591        let variant = S::into_variant(raw, self.scale)?;
592        builder.append_value(variant);
593        Ok(())
594    }
595}
596
597/// Builder for unshredding struct/object types with nested fields
598struct StructUnshredVariantBuilder<'a> {
599    value: Option<&'a arrow::array::BinaryViewArray>,
600    typed_value: &'a arrow::array::StructArray,
601    field_unshredders: IndexMap<&'a str, Option<UnshredVariantRowBuilder<'a>>>,
602}
603
604impl<'a> StructUnshredVariantBuilder<'a> {
605    fn try_new(value: Option<&'a BinaryViewArray>, typed_value: &'a StructArray) -> Result<Self> {
606        // Create unshredders for each field in constructor
607        let mut field_unshredders = IndexMap::new();
608        for (field, field_array) in typed_value.fields().iter().zip(typed_value.columns()) {
609            // Factory returns None for None/None case -- these are missing fields we should skip
610            let Some(field_array) = field_array.as_struct_opt() else {
611                return Err(ArrowError::InvalidArgumentError(format!(
612                    "Invalid shredded variant object field: expected Struct, got {}",
613                    field_array.data_type()
614                )));
615            };
616            let field_unshredder = UnshredVariantRowBuilder::try_new_opt(field_array.try_into()?)?;
617            field_unshredders.insert(field.name().as_ref(), field_unshredder);
618        }
619
620        Ok(Self {
621            value,
622            typed_value,
623            field_unshredders,
624        })
625    }
626
627    fn append_row(
628        &mut self,
629        builder: &mut impl VariantBuilderExt,
630        metadata: &VariantMetadata,
631        index: usize,
632    ) -> Result<()> {
633        let value = handle_unshredded_case!(self, builder, metadata, index, true);
634
635        // If we get here, typed_value is valid and value may or may not be valid
636        let mut object_builder = builder.try_new_object()?;
637
638        // Process typed fields (skip empty builders that indicate missing fields)
639        for (field_name, field_unshredder_opt) in &mut self.field_unshredders {
640            if let Some(field_unshredder) = field_unshredder_opt {
641                let mut field_builder = ObjectFieldBuilder::new(field_name, &mut object_builder);
642                field_unshredder.append_row(&mut field_builder, metadata, index)?;
643            }
644        }
645
646        // Process any unshredded fields (partial shredding)
647        if let Some(value) = value {
648            let Variant::Object(object) = value else {
649                return Err(ArrowError::InvalidArgumentError(
650                    "Expected object in value field for partially shredded struct".to_string(),
651                ));
652            };
653
654            for (field_name, field_value) in object.iter() {
655                if self.field_unshredders.contains_key(field_name) {
656                    return Err(ArrowError::InvalidArgumentError(format!(
657                        "Field '{field_name}' appears in both typed_value and value",
658                    )));
659                }
660                object_builder.insert_bytes(field_name, field_value);
661            }
662        }
663
664        object_builder.finish();
665        Ok(())
666    }
667}
668
669/// Builder for unshredding list/array types with recursive element processing
670struct ListUnshredVariantBuilder<'a, L: ListLikeArray> {
671    value: Option<&'a BinaryViewArray>,
672    typed_value: &'a L,
673    element_unshredder: Box<UnshredVariantRowBuilder<'a>>,
674}
675
676impl<'a, L: ListLikeArray> ListUnshredVariantBuilder<'a, L> {
677    fn try_new(value: Option<&'a BinaryViewArray>, typed_value: &'a L) -> Result<Self> {
678        // Create a recursive unshredder for the list elements
679        // The element type comes from the values array of the list
680        let element_values = typed_value.values();
681
682        // For shredded lists, each element would be a ShreddedVariantFieldArray (struct)
683        // Extract value/typed_value from the element struct
684        let Some(element_values) = element_values.as_struct_opt() else {
685            return Err(ArrowError::InvalidArgumentError(format!(
686                "Invalid shredded variant array element: expected Struct, got {}",
687                element_values.data_type()
688            )));
689        };
690
691        // Create recursive unshredder for elements
692        //
693        // NOTE: A None/None array element is technically invalid, but the shredding spec
694        // requires us to emit `Variant::Null` when a required value is missing.
695        let element_unshredder = UnshredVariantRowBuilder::try_new_opt(element_values.try_into()?)?
696            .unwrap_or_else(|| UnshredVariantRowBuilder::null(None));
697
698        Ok(Self {
699            value,
700            typed_value,
701            element_unshredder: Box::new(element_unshredder),
702        })
703    }
704
705    fn append_row(
706        &mut self,
707        builder: &mut impl VariantBuilderExt,
708        metadata: &VariantMetadata,
709        index: usize,
710    ) -> Result<()> {
711        handle_unshredded_case!(self, builder, metadata, index, false);
712
713        // If we get here, typed_value is valid and value is NULL -- process the list elements
714        let mut list_builder = builder.try_new_list()?;
715        for element_index in self.typed_value.element_range(index) {
716            self.element_unshredder
717                .append_row(&mut list_builder, metadata, element_index)?;
718        }
719
720        list_builder.finish();
721        Ok(())
722    }
723}
724
725// TODO: This code is covered by tests in `parquet/tests/variant_integration.rs`. Does that suffice?
726// Or do we also need targeted stand-alone unit tests for full coverage?