Skip to main content

parquet_variant_compute/
unshred_variant.rs

1// Licensed to the Apache Software Foundation (ASF) under one
2// or more contributor license agreements.  See the NOTICE file
3// distributed with this work for additional information
4// regarding copyright ownership.  The ASF licenses this file
5// to you under the Apache License, Version 2.0 (the
6// "License"); you may not use this file except in compliance
7// with the License.  You may obtain a copy of the License at
8//
9//   http://www.apache.org/licenses/LICENSE-2.0
10//
11// Unless required by applicable law or agreed to in writing,
12// software distributed under the License is distributed on an
13// "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
14// KIND, either express or implied.  See the License for the
15// specific language governing permissions and limitations
16// under the License.
17
18//! Module for unshredding VariantArray by folding typed_value columns back into the value column.
19
20use crate::variant_array::{binary_array_value, validate_binary_array};
21use crate::{VariantArray, VariantValueArrayBuilder};
22use arrow::array::{
23    Array, ArrayRef, AsArray as _, BinaryArray, BinaryViewArray, BooleanArray,
24    FixedSizeBinaryArray, FixedSizeListArray, GenericListArray, GenericListViewArray,
25    LargeBinaryArray, LargeStringArray, ListLikeArray, PrimitiveArray, StringArray,
26    StringViewArray, StructArray,
27};
28use arrow::datatypes::{
29    ArrowPrimitiveType, DataType, Date32Type, Decimal32Type, Decimal64Type, Decimal128Type,
30    DecimalType, Float32Type, Float64Type, Int8Type, Int16Type, Int32Type, Int64Type,
31    Time64MicrosecondType, TimeUnit, TimestampMicrosecondType, TimestampNanosecondType,
32};
33use arrow::error::{ArrowError, Result};
34use arrow::temporal_conversions::time64us_to_time;
35use chrono::{DateTime, Utc};
36use indexmap::IndexMap;
37use parquet_variant::{
38    ObjectFieldBuilder, Variant, VariantBuilderExt, VariantDecimal4, VariantDecimal8,
39    VariantDecimal16, VariantDecimalType, VariantMetadata,
40};
41use std::marker::PhantomData;
42use std::sync::Arc;
43use uuid::Uuid;
44
45/// Removes all (nested) typed_value columns from a VariantArray by converting them back to binary
46/// variant and merging the resulting values back into the value column.
47///
48/// This function efficiently converts a shredded VariantArray back to an unshredded form where all
49/// data resides in the value column.
50///
51/// # Arguments
52/// * `array` - The VariantArray to unshred
53///
54/// # Returns
55/// A new VariantArray with all data in the value column and no typed_value column
56///
57/// # Errors
58/// - If the shredded data contains spec violations (e.g., field name conflicts)
59/// - If unsupported data types are encountered in typed_value columns
60pub fn unshred_variant(array: &VariantArray) -> Result<VariantArray> {
61    // Check if already unshredded (optimization for common case)
62    if array.typed_value_field().is_none() && array.value_field().is_some() {
63        return Ok(array.clone());
64    }
65
66    // NOTE: None/None at top-level is technically invalid, but the shredding spec requires us to
67    // emit `Variant::Null` when a required value is missing.
68    let nulls = array.nulls();
69    let mut row_builder = UnshredVariantRowBuilder::try_new_opt(array.inner())?
70        .unwrap_or_else(UnshredVariantRowBuilder::null);
71
72    let metadata = array.metadata_field();
73    let mut value_builder = VariantValueArrayBuilder::new(array.len());
74    for i in 0..array.len() {
75        if array.is_null(i) {
76            value_builder.append_null();
77        } else {
78            let metadata_bytes = binary_array_value(metadata.as_ref(), i).ok_or_else(|| {
79                ArrowError::InvalidArgumentError(
80                    "metadata field must be a binary-like array".to_string(),
81                )
82            })?;
83            let metadata = VariantMetadata::try_new(metadata_bytes)?;
84            let mut value_builder = value_builder.builder_ext(&metadata);
85            row_builder.append_row(&mut value_builder, &metadata, i)?;
86        }
87    }
88
89    let value = value_builder.build()?;
90    Ok(VariantArray::from_parts(
91        metadata.clone(),
92        Some(Arc::new(value)),
93        None,
94        nulls.cloned(),
95    ))
96}
97
98/// Row builder for converting shredded VariantArray rows back to unshredded form
99enum UnshredVariantRowBuilder<'a> {
100    PrimitiveInt8(UnshredPrimitiveRowBuilder<'a, PrimitiveArray<Int8Type>>),
101    PrimitiveInt16(UnshredPrimitiveRowBuilder<'a, PrimitiveArray<Int16Type>>),
102    PrimitiveInt32(UnshredPrimitiveRowBuilder<'a, PrimitiveArray<Int32Type>>),
103    PrimitiveInt64(UnshredPrimitiveRowBuilder<'a, PrimitiveArray<Int64Type>>),
104    PrimitiveFloat32(UnshredPrimitiveRowBuilder<'a, PrimitiveArray<Float32Type>>),
105    PrimitiveFloat64(UnshredPrimitiveRowBuilder<'a, PrimitiveArray<Float64Type>>),
106    Decimal32(DecimalUnshredRowBuilder<'a, Decimal32Type, VariantDecimal4>),
107    Decimal64(DecimalUnshredRowBuilder<'a, Decimal64Type, VariantDecimal8>),
108    Decimal128(DecimalUnshredRowBuilder<'a, Decimal128Type, VariantDecimal16>),
109    PrimitiveDate32(UnshredPrimitiveRowBuilder<'a, PrimitiveArray<Date32Type>>),
110    PrimitiveTime64(UnshredPrimitiveRowBuilder<'a, PrimitiveArray<Time64MicrosecondType>>),
111    TimestampMicrosecond(TimestampUnshredRowBuilder<'a, TimestampMicrosecondType>),
112    TimestampNanosecond(TimestampUnshredRowBuilder<'a, TimestampNanosecondType>),
113    PrimitiveBoolean(UnshredPrimitiveRowBuilder<'a, BooleanArray>),
114    PrimitiveString(UnshredPrimitiveRowBuilder<'a, StringArray>),
115    PrimitiveStringView(UnshredPrimitiveRowBuilder<'a, StringViewArray>),
116    PrimitiveLargeString(UnshredPrimitiveRowBuilder<'a, LargeStringArray>),
117    PrimitiveBinary(UnshredPrimitiveRowBuilder<'a, BinaryArray>),
118    PrimitiveBinaryView(UnshredPrimitiveRowBuilder<'a, BinaryViewArray>),
119    PrimitiveLargeBinary(UnshredPrimitiveRowBuilder<'a, LargeBinaryArray>),
120    PrimitiveUuid(UnshredPrimitiveRowBuilder<'a, FixedSizeBinaryArray>),
121    List(ListUnshredVariantBuilder<'a, GenericListArray<i32>>),
122    LargeList(ListUnshredVariantBuilder<'a, GenericListArray<i64>>),
123    ListView(ListUnshredVariantBuilder<'a, GenericListViewArray<i32>>),
124    LargeListView(ListUnshredVariantBuilder<'a, GenericListViewArray<i64>>),
125    FixedSizeList(ListUnshredVariantBuilder<'a, FixedSizeListArray>),
126    Struct(StructUnshredVariantBuilder<'a>),
127    ValueOnly(ValueOnlyUnshredVariantBuilder<'a>),
128    Null(NullUnshredVariantBuilder),
129}
130
131impl<'a> UnshredVariantRowBuilder<'a> {
132    /// Creates an all-null row builder.
133    fn null() -> Self {
134        Self::Null(NullUnshredVariantBuilder)
135    }
136
137    /// Appends a single row at the given value index to the supplied builder.
138    fn append_row(
139        &mut self,
140        builder: &mut impl VariantBuilderExt,
141        metadata: &VariantMetadata,
142        index: usize,
143    ) -> Result<()> {
144        match self {
145            Self::PrimitiveInt8(b) => b.append_row(builder, metadata, index),
146            Self::PrimitiveInt16(b) => b.append_row(builder, metadata, index),
147            Self::PrimitiveInt32(b) => b.append_row(builder, metadata, index),
148            Self::PrimitiveInt64(b) => b.append_row(builder, metadata, index),
149            Self::PrimitiveFloat32(b) => b.append_row(builder, metadata, index),
150            Self::PrimitiveFloat64(b) => b.append_row(builder, metadata, index),
151            Self::Decimal32(b) => b.append_row(builder, metadata, index),
152            Self::Decimal64(b) => b.append_row(builder, metadata, index),
153            Self::Decimal128(b) => b.append_row(builder, metadata, index),
154            Self::PrimitiveDate32(b) => b.append_row(builder, metadata, index),
155            Self::PrimitiveTime64(b) => b.append_row(builder, metadata, index),
156            Self::TimestampMicrosecond(b) => b.append_row(builder, metadata, index),
157            Self::TimestampNanosecond(b) => b.append_row(builder, metadata, index),
158            Self::PrimitiveBoolean(b) => b.append_row(builder, metadata, index),
159            Self::PrimitiveString(b) => b.append_row(builder, metadata, index),
160            Self::PrimitiveStringView(b) => b.append_row(builder, metadata, index),
161            Self::PrimitiveLargeString(b) => b.append_row(builder, metadata, index),
162            Self::PrimitiveBinary(b) => b.append_row(builder, metadata, index),
163            Self::PrimitiveBinaryView(b) => b.append_row(builder, metadata, index),
164            Self::PrimitiveLargeBinary(b) => b.append_row(builder, metadata, index),
165            Self::PrimitiveUuid(b) => b.append_row(builder, metadata, index),
166            Self::List(b) => b.append_row(builder, metadata, index),
167            Self::LargeList(b) => b.append_row(builder, metadata, index),
168            Self::ListView(b) => b.append_row(builder, metadata, index),
169            Self::LargeListView(b) => b.append_row(builder, metadata, index),
170            Self::FixedSizeList(b) => b.append_row(builder, metadata, index),
171            Self::Struct(b) => b.append_row(builder, metadata, index),
172            Self::ValueOnly(b) => b.append_row(builder, metadata, index),
173            Self::Null(b) => b.append_row(builder, metadata, index),
174        }
175    }
176
177    /// Creates a new UnshredVariantRowBuilder from the `(value, typed_value)` pair of a shredded
178    /// variant struct. Returns None for the None/None case - caller decides how to handle based on
179    /// context.
180    fn try_new_opt(inner_struct: &'a StructArray) -> Result<Option<Self>> {
181        let value = if let Some(value_col) = inner_struct.column_by_name("value") {
182            validate_binary_array(value_col.as_ref(), "value")?;
183            Some(value_col)
184        } else {
185            None
186        };
187        let Some(typed_value) = inner_struct.column_by_name("typed_value") else {
188            // Copy the value across directly, if present. Else caller decides what to do.
189            return Ok(value.map(|v| Self::ValueOnly(ValueOnlyUnshredVariantBuilder::new(v))));
190        };
191
192        // Has typed_value -> determine type and create appropriate builder
193        macro_rules! primitive_builder {
194            ($enum_variant:ident, $cast_fn:ident) => {
195                Self::$enum_variant(UnshredPrimitiveRowBuilder::new(
196                    value,
197                    typed_value.$cast_fn(),
198                ))
199            };
200        }
201
202        let builder = match typed_value.data_type() {
203            DataType::Int8 => primitive_builder!(PrimitiveInt8, as_primitive),
204            DataType::Int16 => primitive_builder!(PrimitiveInt16, as_primitive),
205            DataType::Int32 => primitive_builder!(PrimitiveInt32, as_primitive),
206            DataType::Int64 => primitive_builder!(PrimitiveInt64, as_primitive),
207            DataType::Float32 => primitive_builder!(PrimitiveFloat32, as_primitive),
208            DataType::Float64 => primitive_builder!(PrimitiveFloat64, as_primitive),
209            DataType::Decimal32(p, s) if VariantDecimal4::is_valid_precision_and_scale(p, s) => {
210                Self::Decimal32(DecimalUnshredRowBuilder::new(value, typed_value, *s as _))
211            }
212            DataType::Decimal64(p, s) if VariantDecimal8::is_valid_precision_and_scale(p, s) => {
213                Self::Decimal64(DecimalUnshredRowBuilder::new(value, typed_value, *s as _))
214            }
215            DataType::Decimal128(p, s) if VariantDecimal16::is_valid_precision_and_scale(p, s) => {
216                Self::Decimal128(DecimalUnshredRowBuilder::new(value, typed_value, *s as _))
217            }
218            DataType::Decimal32(_, _)
219            | DataType::Decimal64(_, _)
220            | DataType::Decimal128(_, _)
221            | DataType::Decimal256(_, _) => {
222                return Err(ArrowError::InvalidArgumentError(format!(
223                    "{} is not a valid variant shredding type",
224                    typed_value.data_type()
225                )));
226            }
227            DataType::Date32 => primitive_builder!(PrimitiveDate32, as_primitive),
228            DataType::Time64(TimeUnit::Microsecond) => {
229                primitive_builder!(PrimitiveTime64, as_primitive)
230            }
231            DataType::Time64(time_unit) => {
232                return Err(ArrowError::InvalidArgumentError(format!(
233                    "Time64({time_unit}) is not a valid variant shredding type",
234                )));
235            }
236            DataType::Timestamp(TimeUnit::Microsecond, timezone) => Self::TimestampMicrosecond(
237                TimestampUnshredRowBuilder::new(value, typed_value, timezone.is_some()),
238            ),
239            DataType::Timestamp(TimeUnit::Nanosecond, timezone) => Self::TimestampNanosecond(
240                TimestampUnshredRowBuilder::new(value, typed_value, timezone.is_some()),
241            ),
242            DataType::Timestamp(time_unit, _) => {
243                return Err(ArrowError::InvalidArgumentError(format!(
244                    "Timestamp({time_unit}) is not a valid variant shredding type",
245                )));
246            }
247            DataType::Boolean => primitive_builder!(PrimitiveBoolean, as_boolean),
248            DataType::Utf8 => primitive_builder!(PrimitiveString, as_string),
249            DataType::Utf8View => primitive_builder!(PrimitiveStringView, as_string_view),
250            DataType::LargeUtf8 => primitive_builder!(PrimitiveLargeString, as_string),
251            DataType::Binary => primitive_builder!(PrimitiveBinary, as_binary),
252            DataType::BinaryView => primitive_builder!(PrimitiveBinaryView, as_binary_view),
253            DataType::LargeBinary => primitive_builder!(PrimitiveLargeBinary, as_binary),
254            DataType::FixedSizeBinary(16) => {
255                primitive_builder!(PrimitiveUuid, as_fixed_size_binary)
256            }
257            DataType::FixedSizeBinary(size) => {
258                return Err(ArrowError::InvalidArgumentError(format!(
259                    "FixedSizeBinary({size}) is not a valid variant shredding type",
260                )));
261            }
262            DataType::Struct(_) => Self::Struct(StructUnshredVariantBuilder::try_new(
263                value,
264                typed_value.as_struct(),
265            )?),
266            DataType::List(_) => Self::List(ListUnshredVariantBuilder::try_new(
267                value,
268                typed_value.as_list(),
269            )?),
270            DataType::LargeList(_) => Self::LargeList(ListUnshredVariantBuilder::try_new(
271                value,
272                typed_value.as_list(),
273            )?),
274            DataType::ListView(_) => Self::ListView(ListUnshredVariantBuilder::try_new(
275                value,
276                typed_value.as_list_view(),
277            )?),
278            DataType::LargeListView(_) => Self::LargeListView(ListUnshredVariantBuilder::try_new(
279                value,
280                typed_value.as_list_view(),
281            )?),
282            DataType::FixedSizeList(_, _) => Self::FixedSizeList(
283                ListUnshredVariantBuilder::try_new(value, typed_value.as_fixed_size_list())?,
284            ),
285            _ => {
286                return Err(ArrowError::NotYetImplemented(format!(
287                    "Unshredding not yet supported for type: {}",
288                    typed_value.data_type()
289                )));
290            }
291        };
292        Ok(Some(builder))
293    }
294}
295
296/// Builder for arrays with neither typed_value nor value (all Variant::Null)
297struct NullUnshredVariantBuilder;
298
299impl NullUnshredVariantBuilder {
300    fn append_row(
301        &mut self,
302        builder: &mut impl VariantBuilderExt,
303        _metadata: &VariantMetadata,
304        _index: usize,
305    ) -> Result<()> {
306        builder.append_value(Variant::Null);
307        Ok(())
308    }
309}
310
311/// Builder for arrays that only have value column (already unshredded)
312struct ValueOnlyUnshredVariantBuilder<'a> {
313    value: &'a ArrayRef,
314}
315
316impl<'a> ValueOnlyUnshredVariantBuilder<'a> {
317    fn new(value: &'a ArrayRef) -> Self {
318        Self { value }
319    }
320
321    fn append_row(
322        &mut self,
323        builder: &mut impl VariantBuilderExt,
324        metadata: &VariantMetadata,
325        index: usize,
326    ) -> Result<()> {
327        if self.value.is_null(index) {
328            builder.append_null();
329        } else {
330            let value_bytes = binary_array_value(self.value.as_ref(), index).ok_or_else(|| {
331                ArrowError::InvalidArgumentError(
332                    "value field must be a binary-like array".to_string(),
333                )
334            })?;
335            let variant = Variant::try_new_with_metadata(metadata.clone(), value_bytes)?;
336            builder.append_value(variant);
337        }
338        Ok(())
339    }
340}
341
342/// Extension trait that directly adds row builder support for arrays that correspond to primitive
343/// variant types.
344trait AppendToVariantBuilder: Array {
345    fn append_to_variant_builder(
346        &self,
347        builder: &mut impl VariantBuilderExt,
348        index: usize,
349    ) -> Result<()>;
350}
351
/// Shared prologue for row builders that own a `value: Option<&ArrayRef>` and a `typed_value`
/// column.
///
/// Expands inside a function returning `Result<()>` and may `return` from it:
/// - decodes the row's `value` bytes (if the column exists and the row is valid) as a variant;
/// - if `typed_value` is NULL at the row, appends that variant (or a null) and returns `Ok(())`;
/// - if both `value` and `typed_value` are non-NULL and `$partial_shredding` is false, returns
///   an `InvalidArgumentError`.
///
/// Otherwise the macro evaluates to the decoded `Option<Variant>` so that partially shredded
/// callers can merge it with the typed data.
macro_rules! handle_unshredded_case {
    ($self:expr, $builder:expr, $metadata:expr, $index:expr, $partial_shredding:expr) => {{
        // Decode the residual `value` for this row, if there is one.
        let residual = match $self.value.as_ref() {
            Some(column) if column.is_valid($index) => {
                let Some(bytes) = binary_array_value(column.as_ref(), $index) else {
                    return Err(ArrowError::InvalidArgumentError(format!(
                        "value field must be a binary-like array, instead got {}",
                        column.data_type(),
                    )));
                };
                Some(Variant::try_new_with_metadata($metadata.clone(), bytes)?)
            }
            _ => None,
        };

        // typed_value is NULL: the row is unshredded, so emit `value` (or null) and stop here.
        if $self.typed_value.is_null($index) {
            if let Some(variant) = residual {
                $builder.append_value(variant)
            } else {
                $builder.append_null()
            }
            return Ok(());
        }

        // Both columns being populated is only legal for partially shredded objects.
        if residual.is_some() && !$partial_shredding {
            return Err(ArrowError::InvalidArgumentError(
                "Invalid shredded variant: both value and typed_value are non-null".to_string(),
            ));
        }

        // Hand the residual back for the partially shredded case.
        residual
    }};
}
389
390/// Generic unshred builder that works with any Array implementing AppendToVariantBuilder
391struct UnshredPrimitiveRowBuilder<'a, T> {
392    value: Option<&'a ArrayRef>,
393    typed_value: &'a T,
394}
395
396impl<'a, T: AppendToVariantBuilder> UnshredPrimitiveRowBuilder<'a, T> {
397    fn new(value: Option<&'a ArrayRef>, typed_value: &'a T) -> Self {
398        Self { value, typed_value }
399    }
400
401    fn append_row(
402        &mut self,
403        builder: &mut impl VariantBuilderExt,
404        metadata: &VariantMetadata,
405        index: usize,
406    ) -> Result<()> {
407        handle_unshredded_case!(self, builder, metadata, index, false);
408
409        // If we get here, typed_value is valid and value is NULL
410        self.typed_value.append_to_variant_builder(builder, index)
411    }
412}
413
// Generates an `AppendToVariantBuilder` impl for an array type.
//
// Two forms are accepted:
//   impl_append_to_variant_builder!(ArrayType);
//       appends `self.value(index)` unchanged;
//   impl_append_to_variant_builder!(ArrayType, |v| expr);
//       binds `self.value(index)` to `v` and appends `expr`. The expression is expanded inside
//       a function returning `Result<()>`, so it may use `?` to report conversion failures.
macro_rules! impl_append_to_variant_builder {
    ($array_type:ty) => {
        impl_append_to_variant_builder!($array_type, |raw| raw);
    };
    ($array_type:ty, |$v:ident| $transform:expr) => {
        impl AppendToVariantBuilder for $array_type {
            fn append_to_variant_builder(
                &self,
                builder: &mut impl VariantBuilderExt,
                index: usize,
            ) -> Result<()> {
                let $v = self.value(index);
                builder.append_value($transform);
                Ok(())
            }
        }
    };
}
434
435impl_append_to_variant_builder!(BooleanArray);
436impl_append_to_variant_builder!(StringArray);
437impl_append_to_variant_builder!(StringViewArray);
438impl_append_to_variant_builder!(LargeStringArray);
439impl_append_to_variant_builder!(BinaryArray);
440impl_append_to_variant_builder!(BinaryViewArray);
441impl_append_to_variant_builder!(LargeBinaryArray);
442impl_append_to_variant_builder!(PrimitiveArray<Int8Type>);
443impl_append_to_variant_builder!(PrimitiveArray<Int16Type>);
444impl_append_to_variant_builder!(PrimitiveArray<Int32Type>);
445impl_append_to_variant_builder!(PrimitiveArray<Int64Type>);
446impl_append_to_variant_builder!(PrimitiveArray<Float32Type>);
447impl_append_to_variant_builder!(PrimitiveArray<Float64Type>);
448
449impl_append_to_variant_builder!(PrimitiveArray<Date32Type>, |days_since_epoch| {
450    Date32Type::to_naive_date_opt(days_since_epoch).ok_or_else(|| {
451        ArrowError::InvalidArgumentError(format!("Invalid Date32 value: {days_since_epoch}"))
452    })?
453});
454
455impl_append_to_variant_builder!(
456    PrimitiveArray<Time64MicrosecondType>,
457    |micros_since_midnight| {
458        time64us_to_time(micros_since_midnight).ok_or_else(|| {
459            ArrowError::InvalidArgumentError(format!(
460                "Invalid Time64 microsecond value: {micros_since_midnight}"
461            ))
462        })?
463    }
464);
465
466// UUID from FixedSizeBinary(16)
467// NOTE: FixedSizeBinaryArray guarantees the byte length, so we can safely unwrap
468impl_append_to_variant_builder!(FixedSizeBinaryArray, |bytes| {
469    Uuid::from_slice(bytes).unwrap()
470});
471
472/// Trait for timestamp types to handle conversion to `DateTime<Utc>`
473trait TimestampType: ArrowPrimitiveType<Native = i64> {
474    fn to_datetime_utc(value: i64) -> Result<DateTime<Utc>>;
475}
476
477impl TimestampType for TimestampMicrosecondType {
478    fn to_datetime_utc(micros: i64) -> Result<DateTime<Utc>> {
479        DateTime::from_timestamp_micros(micros).ok_or_else(|| {
480            ArrowError::InvalidArgumentError(format!(
481                "Invalid timestamp microsecond value: {micros}"
482            ))
483        })
484    }
485}
486
487impl TimestampType for TimestampNanosecondType {
488    fn to_datetime_utc(nanos: i64) -> Result<DateTime<Utc>> {
489        Ok(DateTime::from_timestamp_nanos(nanos))
490    }
491}
492
493/// Generic builder for timestamp types that handles timezone-aware conversion
494struct TimestampUnshredRowBuilder<'a, T: TimestampType> {
495    value: Option<&'a ArrayRef>,
496    typed_value: &'a PrimitiveArray<T>,
497    has_timezone: bool,
498}
499
500impl<'a, T: TimestampType> TimestampUnshredRowBuilder<'a, T> {
501    fn new(value: Option<&'a ArrayRef>, typed_value: &'a dyn Array, has_timezone: bool) -> Self {
502        Self {
503            value,
504            typed_value: typed_value.as_primitive(),
505            has_timezone,
506        }
507    }
508
509    fn append_row(
510        &mut self,
511        builder: &mut impl VariantBuilderExt,
512        metadata: &VariantMetadata,
513        index: usize,
514    ) -> Result<()> {
515        handle_unshredded_case!(self, builder, metadata, index, false);
516
517        // If we get here, typed_value is valid and value is NULL
518        let timestamp_value = self.typed_value.value(index);
519        let dt = T::to_datetime_utc(timestamp_value)?;
520        if self.has_timezone {
521            builder.append_value(dt);
522        } else {
523            builder.append_value(dt.naive_utc());
524        }
525        Ok(())
526    }
527}
528
529/// Generic builder for decimal unshredding
530struct DecimalUnshredRowBuilder<'a, A: DecimalType, V>
531where
532    V: VariantDecimalType<Native = A::Native>,
533{
534    value: Option<&'a ArrayRef>,
535    typed_value: &'a PrimitiveArray<A>,
536    scale: i8,
537    _phantom: PhantomData<V>,
538}
539
540impl<'a, A: DecimalType, V> DecimalUnshredRowBuilder<'a, A, V>
541where
542    V: VariantDecimalType<Native = A::Native>,
543{
544    fn new(value: Option<&'a ArrayRef>, typed_value: &'a dyn Array, scale: i8) -> Self {
545        Self {
546            value,
547            typed_value: typed_value.as_primitive(),
548            scale,
549            _phantom: PhantomData,
550        }
551    }
552
553    fn append_row(
554        &mut self,
555        builder: &mut impl VariantBuilderExt,
556        metadata: &VariantMetadata,
557        index: usize,
558    ) -> Result<()> {
559        handle_unshredded_case!(self, builder, metadata, index, false);
560
561        let raw = self.typed_value.value(index);
562        let variant = V::try_new_with_signed_scale(raw, self.scale)?;
563        builder.append_value(variant);
564        Ok(())
565    }
566}
567
568/// Builder for unshredding struct/object types with nested fields
569struct StructUnshredVariantBuilder<'a> {
570    value: Option<&'a ArrayRef>,
571    typed_value: &'a arrow::array::StructArray,
572    field_unshredders: IndexMap<&'a str, Option<UnshredVariantRowBuilder<'a>>>,
573}
574
575impl<'a> StructUnshredVariantBuilder<'a> {
576    fn try_new(value: Option<&'a ArrayRef>, typed_value: &'a StructArray) -> Result<Self> {
577        // Create unshredders for each field in constructor
578        let mut field_unshredders = IndexMap::new();
579        for (field, field_array) in typed_value.fields().iter().zip(typed_value.columns()) {
580            // Factory returns None for None/None case -- these are missing fields we should skip
581            let Some(field_array) = field_array.as_struct_opt() else {
582                return Err(ArrowError::InvalidArgumentError(format!(
583                    "Invalid shredded variant object field: expected Struct, got {}",
584                    field_array.data_type()
585                )));
586            };
587            let field_unshredder = UnshredVariantRowBuilder::try_new_opt(field_array)?;
588            field_unshredders.insert(field.name().as_ref(), field_unshredder);
589        }
590
591        Ok(Self {
592            value,
593            typed_value,
594            field_unshredders,
595        })
596    }
597
598    fn append_row(
599        &mut self,
600        builder: &mut impl VariantBuilderExt,
601        metadata: &VariantMetadata,
602        index: usize,
603    ) -> Result<()> {
604        let value = handle_unshredded_case!(self, builder, metadata, index, true);
605
606        // If we get here, typed_value is valid and value may or may not be valid
607        let mut object_builder = builder.try_new_object()?;
608
609        // Process typed fields (skip empty builders that indicate missing fields)
610        for (field_name, field_unshredder_opt) in &mut self.field_unshredders {
611            if let Some(field_unshredder) = field_unshredder_opt {
612                let mut field_builder = ObjectFieldBuilder::new(field_name, &mut object_builder);
613                field_unshredder.append_row(&mut field_builder, metadata, index)?;
614            }
615        }
616
617        // Process any unshredded fields (partial shredding)
618        if let Some(value) = value {
619            let Variant::Object(object) = value else {
620                return Err(ArrowError::InvalidArgumentError(
621                    "Expected object in value field for partially shredded struct".to_string(),
622                ));
623            };
624
625            for entry in object.iter_try() {
626                let (field_name, field_value) = entry?;
627                if self.field_unshredders.contains_key(field_name) {
628                    return Err(ArrowError::InvalidArgumentError(format!(
629                        "Field '{field_name}' appears in both typed_value and value",
630                    )));
631                }
632                object_builder.insert_bytes(field_name, field_value);
633            }
634        }
635
636        object_builder.finish();
637        Ok(())
638    }
639}
640
641/// Builder for unshredding list/array types with recursive element processing
642struct ListUnshredVariantBuilder<'a, L: ListLikeArray> {
643    value: Option<&'a ArrayRef>,
644    typed_value: &'a L,
645    element_unshredder: Box<UnshredVariantRowBuilder<'a>>,
646}
647
648impl<'a, L: ListLikeArray> ListUnshredVariantBuilder<'a, L> {
649    fn try_new(value: Option<&'a ArrayRef>, typed_value: &'a L) -> Result<Self> {
650        // Create a recursive unshredder for the list elements
651        // The element type comes from the values array of the list
652        let element_values = typed_value.values();
653
654        // For shredded lists, each element would be a ShreddedVariantFieldArray (struct)
655        // Extract value/typed_value from the element struct
656        let Some(element_values) = element_values.as_struct_opt() else {
657            return Err(ArrowError::InvalidArgumentError(format!(
658                "Invalid shredded variant array element: expected Struct, got {}",
659                element_values.data_type()
660            )));
661        };
662
663        // Create recursive unshredder for elements
664        //
665        // NOTE: A None/None array element is technically invalid, but the shredding spec
666        // requires us to emit `Variant::Null` when a required value is missing.
667        let element_unshredder = UnshredVariantRowBuilder::try_new_opt(element_values)?
668            .unwrap_or_else(UnshredVariantRowBuilder::null);
669
670        Ok(Self {
671            value,
672            typed_value,
673            element_unshredder: Box::new(element_unshredder),
674        })
675    }
676
677    fn append_row(
678        &mut self,
679        builder: &mut impl VariantBuilderExt,
680        metadata: &VariantMetadata,
681        index: usize,
682    ) -> Result<()> {
683        handle_unshredded_case!(self, builder, metadata, index, false);
684
685        // If we get here, typed_value is valid and value is NULL -- process the list elements
686        let mut list_builder = builder.try_new_list()?;
687        for element_index in self.typed_value.element_range(index) {
688            self.element_unshredder
689                .append_row(&mut list_builder, metadata, element_index)?;
690        }
691
692        list_builder.finish();
693        Ok(())
694    }
695}
696
#[cfg(test)]
mod tests {
    use crate::VariantArray;
    use arrow::array::{
        ArrayRef, BinaryArray, BinaryViewArray, LargeBinaryArray, LargeStringArray, StringViewArray,
    };
    use parquet_variant::Variant;
    use std::sync::Arc;

    /// Metadata bytes repeated for every row of the well-formed fixtures.
    const METADATA_BYTES: &[u8] = &[0x01, 0x00, 0x00];

    /// String payloads shared by the Utf8View and LargeUtf8 tests.
    const STRING_ROWS: [&str; 3] = ["hello", "middle", "world"];

    /// Binary payloads shared by the Binary and LargeBinary tests.
    const BINARY_ROWS: [&[u8]; 3] = [b"\x00\x01\x02", b"\xff\xaa", b"\xde\xad\xbe\xef"];

    /// Unshreds a variant array built from `typed_value` and compares it with `expected`.
    ///
    /// The array gets one `METADATA_BYTES` row per expected value, `typed_value` as its
    /// only data column, and `None` for the remaining `from_parts` arguments. The result
    /// must have exactly `expected.len()` rows, each equal to its expected variant.
    fn assert_unshreds_to(typed_value: ArrayRef, expected: &[Variant<'_, '_>]) {
        let metadata: ArrayRef = Arc::new(BinaryViewArray::from_iter_values(vec![
            METADATA_BYTES;
            expected.len()
        ]));
        let shredded = VariantArray::from_parts(metadata, None, Some(typed_value), None);

        let unshredded = crate::unshred_variant(&shredded).unwrap();

        assert_eq!(unshredded.len(), expected.len());
        for (row, want) in expected.iter().enumerate() {
            assert_eq!(unshredded.value(row), *want);
        }
    }

    #[test]
    fn test_unshred_utf8view_typed_value() {
        let typed_value = StringViewArray::from(STRING_ROWS.map(Some).to_vec());

        assert_unshreds_to(Arc::new(typed_value), &STRING_ROWS.map(Variant::from));
    }

    #[test]
    fn test_unshred_largeutf8_typed_value() {
        let typed_value = LargeStringArray::from(STRING_ROWS.map(Some).to_vec());

        assert_unshreds_to(Arc::new(typed_value), &STRING_ROWS.map(Variant::from));
    }

    #[test]
    fn test_unshred_binary_typed_value() {
        let typed_value = BinaryArray::from_iter_values(BINARY_ROWS);

        assert_unshreds_to(Arc::new(typed_value), &BINARY_ROWS.map(Variant::from));
    }

    #[test]
    fn test_unshred_largebinary_typed_value() {
        let typed_value = LargeBinaryArray::from_iter_values(BINARY_ROWS);

        assert_unshreds_to(Arc::new(typed_value), &BINARY_ROWS.map(Variant::from));
    }

    #[test]
    fn test_unshred_returns_err_on_malformed_metadata() {
        // Regression guard: empty metadata bytes cannot pass VariantMetadata's header
        // parse. unshred_variant previously went through the panicking
        // `VariantMetadata::new`, so the failure must now surface through the documented
        // `Result` return type instead of crashing the thread.
        let empty_metadata: &[u8] = b"";
        let metadata: ArrayRef = Arc::new(BinaryViewArray::from_iter_values([empty_metadata]));
        let typed_value: ArrayRef = Arc::new(StringViewArray::from(vec![Some("hello")]));
        let variant_array = VariantArray::from_parts(metadata, None, Some(typed_value), None);

        assert!(
            crate::unshred_variant(&variant_array).is_err(),
            "unshred_variant must return Err on malformed metadata, not panic",
        );
    }
}