Skip to main content

parquet_variant_compute/
unshred_variant.rs

1// Licensed to the Apache Software Foundation (ASF) under one
2// or more contributor license agreements.  See the NOTICE file
3// distributed with this work for additional information
4// regarding copyright ownership.  The ASF licenses this file
5// to you under the Apache License, Version 2.0 (the
6// "License"); you may not use this file except in compliance
7// with the License.  You may obtain a copy of the License at
8//
9//   http://www.apache.org/licenses/LICENSE-2.0
10//
11// Unless required by applicable law or agreed to in writing,
12// software distributed under the License is distributed on an
13// "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
14// KIND, either express or implied.  See the License for the
15// specific language governing permissions and limitations
16// under the License.
17
//! Unshreds a [`VariantArray`] by folding its (possibly nested) `typed_value` columns back into
//! the binary `value` column.
19
20use crate::variant_array::binary_array_value;
21use crate::{BorrowedShreddingState, VariantArray, VariantValueArrayBuilder};
22use arrow::array::{
23    Array, ArrayRef, AsArray as _, BinaryArray, BinaryViewArray, BooleanArray,
24    FixedSizeBinaryArray, FixedSizeListArray, GenericListArray, GenericListViewArray,
25    LargeBinaryArray, LargeStringArray, ListLikeArray, PrimitiveArray, StringArray,
26    StringViewArray, StructArray,
27};
28use arrow::buffer::NullBuffer;
29use arrow::datatypes::{
30    ArrowPrimitiveType, DataType, Date32Type, Decimal32Type, Decimal64Type, Decimal128Type,
31    DecimalType, Float32Type, Float64Type, Int8Type, Int16Type, Int32Type, Int64Type,
32    Time64MicrosecondType, TimeUnit, TimestampMicrosecondType, TimestampNanosecondType,
33};
34use arrow::error::{ArrowError, Result};
35use arrow::temporal_conversions::time64us_to_time;
36use chrono::{DateTime, Utc};
37use indexmap::IndexMap;
38use parquet_variant::{
39    ObjectFieldBuilder, Variant, VariantBuilderExt, VariantDecimal4, VariantDecimal8,
40    VariantDecimal16, VariantDecimalType, VariantMetadata,
41};
42use std::marker::PhantomData;
43use std::sync::Arc;
44use uuid::Uuid;
45
46/// Removes all (nested) typed_value columns from a VariantArray by converting them back to binary
47/// variant and merging the resulting values back into the value column.
48///
49/// This function efficiently converts a shredded VariantArray back to an unshredded form where all
50/// data resides in the value column.
51///
52/// # Arguments
53/// * `array` - The VariantArray to unshred
54///
55/// # Returns
56/// A new VariantArray with all data in the value column and no typed_value column
57///
58/// # Errors
59/// - If the shredded data contains spec violations (e.g., field name conflicts)
60/// - If unsupported data types are encountered in typed_value columns
61pub fn unshred_variant(array: &VariantArray) -> Result<VariantArray> {
62    // Check if already unshredded (optimization for common case)
63    if array.typed_value_field().is_none() && array.value_field().is_some() {
64        return Ok(array.clone());
65    }
66
67    // NOTE: None/None at top-level is technically invalid, but the shredding spec requires us to
68    // emit `Variant::Null` when a required value is missing.
69    let nulls = array.nulls();
70    let mut row_builder = UnshredVariantRowBuilder::try_new_opt(array.shredding_state().borrow())?
71        .unwrap_or_else(|| UnshredVariantRowBuilder::null(nulls));
72
73    let metadata = array.metadata_field();
74    let mut value_builder = VariantValueArrayBuilder::new(array.len());
75    for i in 0..array.len() {
76        if array.is_null(i) {
77            value_builder.append_null();
78        } else {
79            let metadata_bytes = binary_array_value(metadata.as_ref(), i).ok_or_else(|| {
80                ArrowError::InvalidArgumentError(
81                    "metadata field must be a binary-like array".to_string(),
82                )
83            })?;
84            let metadata = VariantMetadata::try_new(metadata_bytes)?;
85            let mut value_builder = value_builder.builder_ext(&metadata);
86            row_builder.append_row(&mut value_builder, &metadata, i)?;
87        }
88    }
89
90    let value = value_builder.build()?;
91    Ok(VariantArray::from_parts(
92        metadata.clone(),
93        Some(Arc::new(value)),
94        None,
95        nulls.cloned(),
96    ))
97}
98
99/// Row builder for converting shredded VariantArray rows back to unshredded form
100enum UnshredVariantRowBuilder<'a> {
101    PrimitiveInt8(UnshredPrimitiveRowBuilder<'a, PrimitiveArray<Int8Type>>),
102    PrimitiveInt16(UnshredPrimitiveRowBuilder<'a, PrimitiveArray<Int16Type>>),
103    PrimitiveInt32(UnshredPrimitiveRowBuilder<'a, PrimitiveArray<Int32Type>>),
104    PrimitiveInt64(UnshredPrimitiveRowBuilder<'a, PrimitiveArray<Int64Type>>),
105    PrimitiveFloat32(UnshredPrimitiveRowBuilder<'a, PrimitiveArray<Float32Type>>),
106    PrimitiveFloat64(UnshredPrimitiveRowBuilder<'a, PrimitiveArray<Float64Type>>),
107    Decimal32(DecimalUnshredRowBuilder<'a, Decimal32Type, VariantDecimal4>),
108    Decimal64(DecimalUnshredRowBuilder<'a, Decimal64Type, VariantDecimal8>),
109    Decimal128(DecimalUnshredRowBuilder<'a, Decimal128Type, VariantDecimal16>),
110    PrimitiveDate32(UnshredPrimitiveRowBuilder<'a, PrimitiveArray<Date32Type>>),
111    PrimitiveTime64(UnshredPrimitiveRowBuilder<'a, PrimitiveArray<Time64MicrosecondType>>),
112    TimestampMicrosecond(TimestampUnshredRowBuilder<'a, TimestampMicrosecondType>),
113    TimestampNanosecond(TimestampUnshredRowBuilder<'a, TimestampNanosecondType>),
114    PrimitiveBoolean(UnshredPrimitiveRowBuilder<'a, BooleanArray>),
115    PrimitiveString(UnshredPrimitiveRowBuilder<'a, StringArray>),
116    PrimitiveStringView(UnshredPrimitiveRowBuilder<'a, StringViewArray>),
117    PrimitiveLargeString(UnshredPrimitiveRowBuilder<'a, LargeStringArray>),
118    PrimitiveBinary(UnshredPrimitiveRowBuilder<'a, BinaryArray>),
119    PrimitiveBinaryView(UnshredPrimitiveRowBuilder<'a, BinaryViewArray>),
120    PrimitiveLargeBinary(UnshredPrimitiveRowBuilder<'a, LargeBinaryArray>),
121    PrimitiveUuid(UnshredPrimitiveRowBuilder<'a, FixedSizeBinaryArray>),
122    List(ListUnshredVariantBuilder<'a, GenericListArray<i32>>),
123    LargeList(ListUnshredVariantBuilder<'a, GenericListArray<i64>>),
124    ListView(ListUnshredVariantBuilder<'a, GenericListViewArray<i32>>),
125    LargeListView(ListUnshredVariantBuilder<'a, GenericListViewArray<i64>>),
126    FixedSizeList(ListUnshredVariantBuilder<'a, FixedSizeListArray>),
127    Struct(StructUnshredVariantBuilder<'a>),
128    ValueOnly(ValueOnlyUnshredVariantBuilder<'a>),
129    Null(NullUnshredVariantBuilder<'a>),
130}
131
132impl<'a> UnshredVariantRowBuilder<'a> {
133    /// Creates an all-null row builder.
134    fn null(nulls: Option<&'a NullBuffer>) -> Self {
135        Self::Null(NullUnshredVariantBuilder::new(nulls))
136    }
137
138    /// Appends a single row at the given value index to the supplied builder.
139    fn append_row(
140        &mut self,
141        builder: &mut impl VariantBuilderExt,
142        metadata: &VariantMetadata,
143        index: usize,
144    ) -> Result<()> {
145        match self {
146            Self::PrimitiveInt8(b) => b.append_row(builder, metadata, index),
147            Self::PrimitiveInt16(b) => b.append_row(builder, metadata, index),
148            Self::PrimitiveInt32(b) => b.append_row(builder, metadata, index),
149            Self::PrimitiveInt64(b) => b.append_row(builder, metadata, index),
150            Self::PrimitiveFloat32(b) => b.append_row(builder, metadata, index),
151            Self::PrimitiveFloat64(b) => b.append_row(builder, metadata, index),
152            Self::Decimal32(b) => b.append_row(builder, metadata, index),
153            Self::Decimal64(b) => b.append_row(builder, metadata, index),
154            Self::Decimal128(b) => b.append_row(builder, metadata, index),
155            Self::PrimitiveDate32(b) => b.append_row(builder, metadata, index),
156            Self::PrimitiveTime64(b) => b.append_row(builder, metadata, index),
157            Self::TimestampMicrosecond(b) => b.append_row(builder, metadata, index),
158            Self::TimestampNanosecond(b) => b.append_row(builder, metadata, index),
159            Self::PrimitiveBoolean(b) => b.append_row(builder, metadata, index),
160            Self::PrimitiveString(b) => b.append_row(builder, metadata, index),
161            Self::PrimitiveStringView(b) => b.append_row(builder, metadata, index),
162            Self::PrimitiveLargeString(b) => b.append_row(builder, metadata, index),
163            Self::PrimitiveBinary(b) => b.append_row(builder, metadata, index),
164            Self::PrimitiveBinaryView(b) => b.append_row(builder, metadata, index),
165            Self::PrimitiveLargeBinary(b) => b.append_row(builder, metadata, index),
166            Self::PrimitiveUuid(b) => b.append_row(builder, metadata, index),
167            Self::List(b) => b.append_row(builder, metadata, index),
168            Self::LargeList(b) => b.append_row(builder, metadata, index),
169            Self::ListView(b) => b.append_row(builder, metadata, index),
170            Self::LargeListView(b) => b.append_row(builder, metadata, index),
171            Self::FixedSizeList(b) => b.append_row(builder, metadata, index),
172            Self::Struct(b) => b.append_row(builder, metadata, index),
173            Self::ValueOnly(b) => b.append_row(builder, metadata, index),
174            Self::Null(b) => b.append_row(builder, metadata, index),
175        }
176    }
177
178    /// Creates a new UnshredVariantRowBuilder from shredding state
179    /// Returns None for None/None case - caller decides how to handle based on context
180    fn try_new_opt(shredding_state: BorrowedShreddingState<'a>) -> Result<Option<Self>> {
181        let value = shredding_state.value_field();
182        let typed_value = shredding_state.typed_value_field();
183        let Some(typed_value) = typed_value else {
184            // Copy the value across directly, if present. Else caller decides what to do.
185            return Ok(value.map(|v| Self::ValueOnly(ValueOnlyUnshredVariantBuilder::new(v))));
186        };
187
188        // Has typed_value -> determine type and create appropriate builder
189        macro_rules! primitive_builder {
190            ($enum_variant:ident, $cast_fn:ident) => {
191                Self::$enum_variant(UnshredPrimitiveRowBuilder::new(
192                    value,
193                    typed_value.$cast_fn(),
194                ))
195            };
196        }
197
198        let builder = match typed_value.data_type() {
199            DataType::Int8 => primitive_builder!(PrimitiveInt8, as_primitive),
200            DataType::Int16 => primitive_builder!(PrimitiveInt16, as_primitive),
201            DataType::Int32 => primitive_builder!(PrimitiveInt32, as_primitive),
202            DataType::Int64 => primitive_builder!(PrimitiveInt64, as_primitive),
203            DataType::Float32 => primitive_builder!(PrimitiveFloat32, as_primitive),
204            DataType::Float64 => primitive_builder!(PrimitiveFloat64, as_primitive),
205            DataType::Decimal32(p, s) if VariantDecimal4::is_valid_precision_and_scale(p, s) => {
206                Self::Decimal32(DecimalUnshredRowBuilder::new(value, typed_value, *s as _))
207            }
208            DataType::Decimal64(p, s) if VariantDecimal8::is_valid_precision_and_scale(p, s) => {
209                Self::Decimal64(DecimalUnshredRowBuilder::new(value, typed_value, *s as _))
210            }
211            DataType::Decimal128(p, s) if VariantDecimal16::is_valid_precision_and_scale(p, s) => {
212                Self::Decimal128(DecimalUnshredRowBuilder::new(value, typed_value, *s as _))
213            }
214            DataType::Decimal32(_, _)
215            | DataType::Decimal64(_, _)
216            | DataType::Decimal128(_, _)
217            | DataType::Decimal256(_, _) => {
218                return Err(ArrowError::InvalidArgumentError(format!(
219                    "{} is not a valid variant shredding type",
220                    typed_value.data_type()
221                )));
222            }
223            DataType::Date32 => primitive_builder!(PrimitiveDate32, as_primitive),
224            DataType::Time64(TimeUnit::Microsecond) => {
225                primitive_builder!(PrimitiveTime64, as_primitive)
226            }
227            DataType::Time64(time_unit) => {
228                return Err(ArrowError::InvalidArgumentError(format!(
229                    "Time64({time_unit}) is not a valid variant shredding type",
230                )));
231            }
232            DataType::Timestamp(TimeUnit::Microsecond, timezone) => Self::TimestampMicrosecond(
233                TimestampUnshredRowBuilder::new(value, typed_value, timezone.is_some()),
234            ),
235            DataType::Timestamp(TimeUnit::Nanosecond, timezone) => Self::TimestampNanosecond(
236                TimestampUnshredRowBuilder::new(value, typed_value, timezone.is_some()),
237            ),
238            DataType::Timestamp(time_unit, _) => {
239                return Err(ArrowError::InvalidArgumentError(format!(
240                    "Timestamp({time_unit}) is not a valid variant shredding type",
241                )));
242            }
243            DataType::Boolean => primitive_builder!(PrimitiveBoolean, as_boolean),
244            DataType::Utf8 => primitive_builder!(PrimitiveString, as_string),
245            DataType::Utf8View => primitive_builder!(PrimitiveStringView, as_string_view),
246            DataType::LargeUtf8 => primitive_builder!(PrimitiveLargeString, as_string),
247            DataType::Binary => primitive_builder!(PrimitiveBinary, as_binary),
248            DataType::BinaryView => primitive_builder!(PrimitiveBinaryView, as_binary_view),
249            DataType::LargeBinary => primitive_builder!(PrimitiveLargeBinary, as_binary),
250            DataType::FixedSizeBinary(16) => {
251                primitive_builder!(PrimitiveUuid, as_fixed_size_binary)
252            }
253            DataType::FixedSizeBinary(size) => {
254                return Err(ArrowError::InvalidArgumentError(format!(
255                    "FixedSizeBinary({size}) is not a valid variant shredding type",
256                )));
257            }
258            DataType::Struct(_) => Self::Struct(StructUnshredVariantBuilder::try_new(
259                value,
260                typed_value.as_struct(),
261            )?),
262            DataType::List(_) => Self::List(ListUnshredVariantBuilder::try_new(
263                value,
264                typed_value.as_list(),
265            )?),
266            DataType::LargeList(_) => Self::LargeList(ListUnshredVariantBuilder::try_new(
267                value,
268                typed_value.as_list(),
269            )?),
270            DataType::ListView(_) => Self::ListView(ListUnshredVariantBuilder::try_new(
271                value,
272                typed_value.as_list_view(),
273            )?),
274            DataType::LargeListView(_) => Self::LargeListView(ListUnshredVariantBuilder::try_new(
275                value,
276                typed_value.as_list_view(),
277            )?),
278            DataType::FixedSizeList(_, _) => Self::FixedSizeList(
279                ListUnshredVariantBuilder::try_new(value, typed_value.as_fixed_size_list())?,
280            ),
281            _ => {
282                return Err(ArrowError::NotYetImplemented(format!(
283                    "Unshredding not yet supported for type: {}",
284                    typed_value.data_type()
285                )));
286            }
287        };
288        Ok(Some(builder))
289    }
290}
291
292/// Builder for arrays with neither typed_value nor value (all NULL/Variant::Null)
293struct NullUnshredVariantBuilder<'a> {
294    nulls: Option<&'a NullBuffer>,
295}
296
297impl<'a> NullUnshredVariantBuilder<'a> {
298    fn new(nulls: Option<&'a NullBuffer>) -> Self {
299        Self { nulls }
300    }
301
302    fn append_row(
303        &mut self,
304        builder: &mut impl VariantBuilderExt,
305        _metadata: &VariantMetadata,
306        index: usize,
307    ) -> Result<()> {
308        if self.nulls.is_some_and(|nulls| nulls.is_null(index)) {
309            builder.append_null();
310        } else {
311            builder.append_value(Variant::Null);
312        }
313        Ok(())
314    }
315}
316
317/// Builder for arrays that only have value column (already unshredded)
318struct ValueOnlyUnshredVariantBuilder<'a> {
319    value: &'a ArrayRef,
320}
321
322impl<'a> ValueOnlyUnshredVariantBuilder<'a> {
323    fn new(value: &'a ArrayRef) -> Self {
324        Self { value }
325    }
326
327    fn append_row(
328        &mut self,
329        builder: &mut impl VariantBuilderExt,
330        metadata: &VariantMetadata,
331        index: usize,
332    ) -> Result<()> {
333        if self.value.is_null(index) {
334            builder.append_null();
335        } else {
336            let value_bytes = binary_array_value(self.value.as_ref(), index).ok_or_else(|| {
337                ArrowError::InvalidArgumentError(
338                    "value field must be a binary-like array".to_string(),
339                )
340            })?;
341            let variant = Variant::try_new_with_metadata(metadata.clone(), value_bytes)?;
342            builder.append_value(variant);
343        }
344        Ok(())
345    }
346}
347
348/// Extension trait that directly adds row builder support for arrays that correspond to primitive
349/// variant types.
350trait AppendToVariantBuilder: Array {
351    fn append_to_variant_builder(
352        &self,
353        builder: &mut impl VariantBuilderExt,
354        index: usize,
355    ) -> Result<()>;
356}
357
/// Shared prologue for the shredded row builders.
///
/// It first decodes the row's binary `value`, when the builder has a `value` column and the
/// row is valid there. Then:
/// - If `typed_value` is null, it appends that value (or a null when it is absent) and makes
///   the *enclosing function* return `Ok(())`.
/// - If both columns are populated and `$partial_shredding` is false, it makes the enclosing
///   function return an `InvalidArgumentError`.
/// - Otherwise it evaluates to the decoded `Option<Variant>`, so partially shredded objects can
///   merge in their leftover fields.
macro_rules! handle_unshredded_case {
    ($self:expr, $builder:expr, $metadata:expr, $index:expr, $partial_shredding:expr) => {{
        let value = match $self.value {
            Some(array) if array.is_valid($index) => {
                let Some(bytes) = binary_array_value(array.as_ref(), $index) else {
                    return Err(ArrowError::InvalidArgumentError(format!(
                        "value field must be a binary-like array, instead got {}",
                        array.data_type(),
                    )));
                };
                Some(Variant::try_new_with_metadata($metadata.clone(), bytes)?)
            }
            _ => None,
        };

        if $self.typed_value.is_null($index) {
            // Nothing was shredded for this row, so the binary value (if any) is the whole row.
            match value {
                Some(variant) => $builder.append_value(variant),
                None => $builder.append_null(),
            }
            return Ok(());
        }

        // Populating both columns is legal only for partially shredded objects.
        if value.is_some() && !$partial_shredding {
            return Err(ArrowError::InvalidArgumentError(
                "Invalid shredded variant: both value and typed_value are non-null".to_string(),
            ));
        }

        value
    }};
}
395
396/// Generic unshred builder that works with any Array implementing AppendToVariantBuilder
397struct UnshredPrimitiveRowBuilder<'a, T> {
398    value: Option<&'a ArrayRef>,
399    typed_value: &'a T,
400}
401
402impl<'a, T: AppendToVariantBuilder> UnshredPrimitiveRowBuilder<'a, T> {
403    fn new(value: Option<&'a ArrayRef>, typed_value: &'a T) -> Self {
404        Self { value, typed_value }
405    }
406
407    fn append_row(
408        &mut self,
409        builder: &mut impl VariantBuilderExt,
410        metadata: &VariantMetadata,
411        index: usize,
412    ) -> Result<()> {
413        handle_unshredded_case!(self, builder, metadata, index, false);
414
415        // If we get here, typed_value is valid and value is NULL
416        self.typed_value.append_to_variant_builder(builder, index)
417    }
418}
419
/// Generates an `AppendToVariantBuilder` impl for `$array_type`.
///
/// By default the impl appends `self.value(index)` unchanged. When a `|binding| expr` transform
/// is supplied, it appends the result of `expr` instead, with the raw element bound to
/// `binding`. The transform may use `?` to report conversion failures.
macro_rules! impl_append_to_variant_builder {
    ($array_type:ty $(, |$v:ident| $transform:expr)? ) => {
        impl AppendToVariantBuilder for $array_type {
            fn append_to_variant_builder(
                &self,
                builder: &mut impl VariantBuilderExt,
                index: usize,
            ) -> Result<()> {
                let element = self.value(index);
                $(
                    // Apply the caller-supplied conversion to the raw element.
                    let element = {
                        let $v = element;
                        $transform
                    };
                )?
                builder.append_value(element);
                Ok(())
            }
        }
    };
}
440
441impl_append_to_variant_builder!(BooleanArray);
442impl_append_to_variant_builder!(StringArray);
443impl_append_to_variant_builder!(StringViewArray);
444impl_append_to_variant_builder!(LargeStringArray);
445impl_append_to_variant_builder!(BinaryArray);
446impl_append_to_variant_builder!(BinaryViewArray);
447impl_append_to_variant_builder!(LargeBinaryArray);
448impl_append_to_variant_builder!(PrimitiveArray<Int8Type>);
449impl_append_to_variant_builder!(PrimitiveArray<Int16Type>);
450impl_append_to_variant_builder!(PrimitiveArray<Int32Type>);
451impl_append_to_variant_builder!(PrimitiveArray<Int64Type>);
452impl_append_to_variant_builder!(PrimitiveArray<Float32Type>);
453impl_append_to_variant_builder!(PrimitiveArray<Float64Type>);
454
455impl_append_to_variant_builder!(PrimitiveArray<Date32Type>, |days_since_epoch| {
456    Date32Type::to_naive_date_opt(days_since_epoch).ok_or_else(|| {
457        ArrowError::InvalidArgumentError(format!("Invalid Date32 value: {days_since_epoch}"))
458    })?
459});
460
461impl_append_to_variant_builder!(
462    PrimitiveArray<Time64MicrosecondType>,
463    |micros_since_midnight| {
464        time64us_to_time(micros_since_midnight).ok_or_else(|| {
465            ArrowError::InvalidArgumentError(format!(
466                "Invalid Time64 microsecond value: {micros_since_midnight}"
467            ))
468        })?
469    }
470);
471
472// UUID from FixedSizeBinary(16)
473// NOTE: FixedSizeBinaryArray guarantees the byte length, so we can safely unwrap
474impl_append_to_variant_builder!(FixedSizeBinaryArray, |bytes| {
475    Uuid::from_slice(bytes).unwrap()
476});
477
478/// Trait for timestamp types to handle conversion to `DateTime<Utc>`
479trait TimestampType: ArrowPrimitiveType<Native = i64> {
480    fn to_datetime_utc(value: i64) -> Result<DateTime<Utc>>;
481}
482
483impl TimestampType for TimestampMicrosecondType {
484    fn to_datetime_utc(micros: i64) -> Result<DateTime<Utc>> {
485        DateTime::from_timestamp_micros(micros).ok_or_else(|| {
486            ArrowError::InvalidArgumentError(format!(
487                "Invalid timestamp microsecond value: {micros}"
488            ))
489        })
490    }
491}
492
493impl TimestampType for TimestampNanosecondType {
494    fn to_datetime_utc(nanos: i64) -> Result<DateTime<Utc>> {
495        Ok(DateTime::from_timestamp_nanos(nanos))
496    }
497}
498
499/// Generic builder for timestamp types that handles timezone-aware conversion
500struct TimestampUnshredRowBuilder<'a, T: TimestampType> {
501    value: Option<&'a ArrayRef>,
502    typed_value: &'a PrimitiveArray<T>,
503    has_timezone: bool,
504}
505
506impl<'a, T: TimestampType> TimestampUnshredRowBuilder<'a, T> {
507    fn new(value: Option<&'a ArrayRef>, typed_value: &'a dyn Array, has_timezone: bool) -> Self {
508        Self {
509            value,
510            typed_value: typed_value.as_primitive(),
511            has_timezone,
512        }
513    }
514
515    fn append_row(
516        &mut self,
517        builder: &mut impl VariantBuilderExt,
518        metadata: &VariantMetadata,
519        index: usize,
520    ) -> Result<()> {
521        handle_unshredded_case!(self, builder, metadata, index, false);
522
523        // If we get here, typed_value is valid and value is NULL
524        let timestamp_value = self.typed_value.value(index);
525        let dt = T::to_datetime_utc(timestamp_value)?;
526        if self.has_timezone {
527            builder.append_value(dt);
528        } else {
529            builder.append_value(dt.naive_utc());
530        }
531        Ok(())
532    }
533}
534
535/// Generic builder for decimal unshredding
536struct DecimalUnshredRowBuilder<'a, A: DecimalType, V>
537where
538    V: VariantDecimalType<Native = A::Native>,
539{
540    value: Option<&'a ArrayRef>,
541    typed_value: &'a PrimitiveArray<A>,
542    scale: i8,
543    _phantom: PhantomData<V>,
544}
545
546impl<'a, A: DecimalType, V> DecimalUnshredRowBuilder<'a, A, V>
547where
548    V: VariantDecimalType<Native = A::Native>,
549{
550    fn new(value: Option<&'a ArrayRef>, typed_value: &'a dyn Array, scale: i8) -> Self {
551        Self {
552            value,
553            typed_value: typed_value.as_primitive(),
554            scale,
555            _phantom: PhantomData,
556        }
557    }
558
559    fn append_row(
560        &mut self,
561        builder: &mut impl VariantBuilderExt,
562        metadata: &VariantMetadata,
563        index: usize,
564    ) -> Result<()> {
565        handle_unshredded_case!(self, builder, metadata, index, false);
566
567        let raw = self.typed_value.value(index);
568        let variant = V::try_new_with_signed_scale(raw, self.scale)?;
569        builder.append_value(variant);
570        Ok(())
571    }
572}
573
574/// Builder for unshredding struct/object types with nested fields
575struct StructUnshredVariantBuilder<'a> {
576    value: Option<&'a ArrayRef>,
577    typed_value: &'a arrow::array::StructArray,
578    field_unshredders: IndexMap<&'a str, Option<UnshredVariantRowBuilder<'a>>>,
579}
580
581impl<'a> StructUnshredVariantBuilder<'a> {
582    fn try_new(value: Option<&'a ArrayRef>, typed_value: &'a StructArray) -> Result<Self> {
583        // Create unshredders for each field in constructor
584        let mut field_unshredders = IndexMap::new();
585        for (field, field_array) in typed_value.fields().iter().zip(typed_value.columns()) {
586            // Factory returns None for None/None case -- these are missing fields we should skip
587            let Some(field_array) = field_array.as_struct_opt() else {
588                return Err(ArrowError::InvalidArgumentError(format!(
589                    "Invalid shredded variant object field: expected Struct, got {}",
590                    field_array.data_type()
591                )));
592            };
593            let field_unshredder = UnshredVariantRowBuilder::try_new_opt(field_array.try_into()?)?;
594            field_unshredders.insert(field.name().as_ref(), field_unshredder);
595        }
596
597        Ok(Self {
598            value,
599            typed_value,
600            field_unshredders,
601        })
602    }
603
604    fn append_row(
605        &mut self,
606        builder: &mut impl VariantBuilderExt,
607        metadata: &VariantMetadata,
608        index: usize,
609    ) -> Result<()> {
610        let value = handle_unshredded_case!(self, builder, metadata, index, true);
611
612        // If we get here, typed_value is valid and value may or may not be valid
613        let mut object_builder = builder.try_new_object()?;
614
615        // Process typed fields (skip empty builders that indicate missing fields)
616        for (field_name, field_unshredder_opt) in &mut self.field_unshredders {
617            if let Some(field_unshredder) = field_unshredder_opt {
618                let mut field_builder = ObjectFieldBuilder::new(field_name, &mut object_builder);
619                field_unshredder.append_row(&mut field_builder, metadata, index)?;
620            }
621        }
622
623        // Process any unshredded fields (partial shredding)
624        if let Some(value) = value {
625            let Variant::Object(object) = value else {
626                return Err(ArrowError::InvalidArgumentError(
627                    "Expected object in value field for partially shredded struct".to_string(),
628                ));
629            };
630
631            for entry in object.iter_try() {
632                let (field_name, field_value) = entry?;
633                if self.field_unshredders.contains_key(field_name) {
634                    return Err(ArrowError::InvalidArgumentError(format!(
635                        "Field '{field_name}' appears in both typed_value and value",
636                    )));
637                }
638                object_builder.insert_bytes(field_name, field_value);
639            }
640        }
641
642        object_builder.finish();
643        Ok(())
644    }
645}
646
647/// Builder for unshredding list/array types with recursive element processing
648struct ListUnshredVariantBuilder<'a, L: ListLikeArray> {
649    value: Option<&'a ArrayRef>,
650    typed_value: &'a L,
651    element_unshredder: Box<UnshredVariantRowBuilder<'a>>,
652}
653
654impl<'a, L: ListLikeArray> ListUnshredVariantBuilder<'a, L> {
655    fn try_new(value: Option<&'a ArrayRef>, typed_value: &'a L) -> Result<Self> {
656        // Create a recursive unshredder for the list elements
657        // The element type comes from the values array of the list
658        let element_values = typed_value.values();
659
660        // For shredded lists, each element would be a ShreddedVariantFieldArray (struct)
661        // Extract value/typed_value from the element struct
662        let Some(element_values) = element_values.as_struct_opt() else {
663            return Err(ArrowError::InvalidArgumentError(format!(
664                "Invalid shredded variant array element: expected Struct, got {}",
665                element_values.data_type()
666            )));
667        };
668
669        // Create recursive unshredder for elements
670        //
671        // NOTE: A None/None array element is technically invalid, but the shredding spec
672        // requires us to emit `Variant::Null` when a required value is missing.
673        let element_unshredder = UnshredVariantRowBuilder::try_new_opt(element_values.try_into()?)?
674            .unwrap_or_else(|| UnshredVariantRowBuilder::null(None));
675
676        Ok(Self {
677            value,
678            typed_value,
679            element_unshredder: Box::new(element_unshredder),
680        })
681    }
682
683    fn append_row(
684        &mut self,
685        builder: &mut impl VariantBuilderExt,
686        metadata: &VariantMetadata,
687        index: usize,
688    ) -> Result<()> {
689        handle_unshredded_case!(self, builder, metadata, index, false);
690
691        // If we get here, typed_value is valid and value is NULL -- process the list elements
692        let mut list_builder = builder.try_new_list()?;
693        for element_index in self.typed_value.element_range(index) {
694            self.element_unshredder
695                .append_row(&mut list_builder, metadata, element_index)?;
696        }
697
698        list_builder.finish();
699        Ok(())
700    }
701}
702
#[cfg(test)]
mod tests {
    use crate::VariantArray;
    use arrow::array::{
        ArrayRef, BinaryArray, BinaryViewArray, LargeBinaryArray, LargeStringArray, StringViewArray,
    };
    use parquet_variant::Variant;
    use std::sync::Arc;

    /// Variant metadata with an empty field-name dictionary.
    ///
    /// The bytes are header `0x01` (version 1, unsorted, 1-byte offsets), then a dictionary
    /// size of 0, then the single terminating offset 0. That is enough for the non-object
    /// values these tests unshred.
    const EMPTY_METADATA: &[u8] = &[0x01, 0x00, 0x00];

    /// Unshreds a variant array that has no `value` column and asserts the result.
    ///
    /// The input array is fully described by `typed_value`, and every row shares
    /// [`EMPTY_METADATA`]. The assertion is that row `i` of the unshredded result equals
    /// `expected[i]`. `expected` must have one entry per row of `typed_value`, since its length
    /// determines how many metadata rows are built.
    fn assert_unshreds_to(typed_value: ArrayRef, expected: &[Variant<'_, '_>]) {
        let metadata: ArrayRef = Arc::new(BinaryViewArray::from_iter_values(vec![
            EMPTY_METADATA;
            expected.len()
        ]));
        let variant_array = VariantArray::from_parts(metadata, None, Some(typed_value), None);

        let result = crate::unshred_variant(&variant_array).unwrap();

        assert_eq!(result.len(), expected.len());
        for (row, want) in expected.iter().enumerate() {
            assert_eq!(result.value(row), *want, "mismatch at row {row}");
        }
    }

    #[test]
    fn test_unshred_utf8view_typed_value() {
        let typed_value: ArrayRef = Arc::new(StringViewArray::from(vec![
            Some("hello"),
            Some("middle"),
            Some("world"),
        ]));

        assert_unshreds_to(
            typed_value,
            &[
                Variant::from("hello"),
                Variant::from("middle"),
                Variant::from("world"),
            ],
        );
    }

    #[test]
    fn test_unshred_largeutf8_typed_value() {
        let typed_value: ArrayRef = Arc::new(LargeStringArray::from(vec![
            Some("hello"),
            Some("middle"),
            Some("world"),
        ]));

        assert_unshreds_to(
            typed_value,
            &[
                Variant::from("hello"),
                Variant::from("middle"),
                Variant::from("world"),
            ],
        );
    }

    #[test]
    fn test_unshred_binary_typed_value() {
        let typed_value: ArrayRef = Arc::new(BinaryArray::from_iter_values(vec![
            &b"\x00\x01\x02"[..],
            &b"\xff\xaa"[..],
            &b"\xde\xad\xbe\xef"[..],
        ]));

        assert_unshreds_to(
            typed_value,
            &[
                Variant::from(&b"\x00\x01\x02"[..]),
                Variant::from(&b"\xff\xaa"[..]),
                Variant::from(&b"\xde\xad\xbe\xef"[..]),
            ],
        );
    }

    #[test]
    fn test_unshred_largebinary_typed_value() {
        let typed_value: ArrayRef = Arc::new(LargeBinaryArray::from_iter_values(vec![
            &b"\x00\x01\x02"[..],
            &b"\xff\xaa"[..],
            &b"\xde\xad\xbe\xef"[..],
        ]));

        assert_unshreds_to(
            typed_value,
            &[
                Variant::from(&b"\x00\x01\x02"[..]),
                Variant::from(&b"\xff\xaa"[..]),
                Variant::from(&b"\xde\xad\xbe\xef"[..]),
            ],
        );
    }

    #[test]
    fn test_unshred_returns_err_on_malformed_metadata() {
        // An empty metadata buffer fails VariantMetadata's header parse. `unshred_variant`
        // returns `Result`, so that failure must surface as `Err` instead of panicking
        // (i.e. the fallible metadata constructor must be used, not the panicking one).
        let metadata: ArrayRef = Arc::new(BinaryViewArray::from_iter_values(vec![&b""[..]]));

        let typed_value: ArrayRef = Arc::new(StringViewArray::from(vec![Some("hello")]));

        let variant_array = VariantArray::from_parts(metadata, None, Some(typed_value), None);

        let result = crate::unshred_variant(&variant_array);

        assert!(
            result.is_err(),
            "unshred_variant must return Err on malformed metadata, not panic",
        );
    }
}