1use crate::variant_array::{ShreddedVariantFieldArray, StructArrayBuilder};
21use crate::variant_to_arrow::{
22 PrimitiveVariantToArrowRowBuilder, make_primitive_variant_to_arrow_row_builder,
23};
24use crate::{VariantArray, VariantValueArrayBuilder};
25use arrow::array::{ArrayRef, BinaryViewArray, NullBufferBuilder};
26use arrow::buffer::NullBuffer;
27use arrow::compute::CastOptions;
28use arrow::datatypes::{DataType, Field, FieldRef, Fields, TimeUnit};
29use arrow::error::{ArrowError, Result};
30use parquet_variant::{Variant, VariantBuilderExt, VariantPath, VariantPathElement};
31
32use indexmap::IndexMap;
33use std::collections::BTreeMap;
34use std::sync::Arc;
35
36pub fn shred_variant(array: &VariantArray, as_type: &DataType) -> Result<VariantArray> {
71 if array.typed_value_field().is_some() {
72 return Err(ArrowError::InvalidArgumentError(
73 "Input is already shredded".to_string(),
74 ));
75 }
76
77 if array.value_field().is_none() {
78 return Ok(array.clone());
80 };
81
82 let cast_options = CastOptions::default();
83 let mut builder = make_variant_to_shredded_variant_arrow_row_builder(
84 as_type,
85 &cast_options,
86 array.len(),
87 true,
88 )?;
89 for i in 0..array.len() {
90 if array.is_null(i) {
91 builder.append_null()?;
92 } else {
93 builder.append_value(array.value(i))?;
94 }
95 }
96 let (value, typed_value, nulls) = builder.finish()?;
97 Ok(VariantArray::from_parts(
98 array.metadata_field().clone(),
99 Some(value),
100 Some(typed_value),
101 nulls,
102 ))
103}
104
105pub(crate) fn make_variant_to_shredded_variant_arrow_row_builder<'a>(
106 data_type: &'a DataType,
107 cast_options: &'a CastOptions,
108 capacity: usize,
109 top_level: bool,
110) -> Result<VariantToShreddedVariantRowBuilder<'a>> {
111 let builder = match data_type {
112 DataType::Struct(fields) => {
113 let typed_value_builder = VariantToShreddedObjectVariantRowBuilder::try_new(
114 fields,
115 cast_options,
116 capacity,
117 top_level,
118 )?;
119 VariantToShreddedVariantRowBuilder::Object(typed_value_builder)
120 }
121 DataType::List(_)
122 | DataType::LargeList(_)
123 | DataType::ListView(_)
124 | DataType::LargeListView(_)
125 | DataType::FixedSizeList(..) => {
126 return Err(ArrowError::NotYetImplemented(
127 "Shredding variant array values as arrow lists".to_string(),
128 ));
129 }
130 DataType::Boolean
133 | DataType::Int8
134 | DataType::Int16
135 | DataType::Int32
136 | DataType::Int64
137 | DataType::Float32
138 | DataType::Float64
139 | DataType::Decimal32(..)
140 | DataType::Decimal64(..)
141 | DataType::Decimal128(..)
142 | DataType::Date32
143 | DataType::Time64(TimeUnit::Microsecond)
144 | DataType::Timestamp(TimeUnit::Microsecond | TimeUnit::Nanosecond, _)
145 | DataType::Binary
146 | DataType::BinaryView
147 | DataType::Utf8
148 | DataType::Utf8View
149 | DataType::FixedSizeBinary(16) => {
151 let builder =
152 make_primitive_variant_to_arrow_row_builder(data_type, cast_options, capacity)?;
153 let typed_value_builder =
154 VariantToShreddedPrimitiveVariantRowBuilder::new(builder, capacity, top_level);
155 VariantToShreddedVariantRowBuilder::Primitive(typed_value_builder)
156 }
157 DataType::FixedSizeBinary(_) => {
158 return Err(ArrowError::InvalidArgumentError(format!("{data_type} is not a valid variant shredding type. Only FixedSizeBinary(16) for UUID is supported.")))
159 }
160 _ => {
161 return Err(ArrowError::InvalidArgumentError(format!("{data_type} is not a valid variant shredding type")))
162 }
163 };
164 Ok(builder)
165}
166
/// Row builder that shreds variant values into one target type.
///
/// Each variant wraps the concrete builder for that kind of target; the methods on this enum
/// simply forward to the wrapped builder.
pub(crate) enum VariantToShreddedVariantRowBuilder<'a> {
    /// Shreds into a primitive `typed_value` column.
    Primitive(VariantToShreddedPrimitiveVariantRowBuilder<'a>),
    /// Shreds objects into a struct `typed_value` column with one nested builder per field.
    Object(VariantToShreddedObjectVariantRowBuilder<'a>),
}
171impl<'a> VariantToShreddedVariantRowBuilder<'a> {
172 pub fn append_null(&mut self) -> Result<()> {
173 use VariantToShreddedVariantRowBuilder::*;
174 match self {
175 Primitive(b) => b.append_null(),
176 Object(b) => b.append_null(),
177 }
178 }
179
180 pub fn append_value(&mut self, value: Variant<'_, '_>) -> Result<bool> {
181 use VariantToShreddedVariantRowBuilder::*;
182 match self {
183 Primitive(b) => b.append_value(value),
184 Object(b) => b.append_value(value),
185 }
186 }
187
188 pub fn finish(self) -> Result<(BinaryViewArray, ArrayRef, Option<NullBuffer>)> {
189 use VariantToShreddedVariantRowBuilder::*;
190 match self {
191 Primitive(b) => b.finish(),
192 Object(b) => b.finish(),
193 }
194 }
195}
196
/// Row builder that shreds variant values into a primitive `typed_value` column.
pub(crate) struct VariantToShreddedPrimitiveVariantRowBuilder<'a> {
    /// Residual `value` column: holds rows the typed builder did not accept, null otherwise.
    value_builder: VariantValueArrayBuilder,
    /// Builder for the primitive `typed_value` column.
    typed_value_builder: PrimitiveVariantToArrowRowBuilder<'a>,
    /// Validity of each appended row.
    nulls: NullBufferBuilder,
    /// When set, a null row is recorded as null in `nulls`; otherwise it is recorded as valid.
    top_level: bool,
}
204
205impl<'a> VariantToShreddedPrimitiveVariantRowBuilder<'a> {
206 pub(crate) fn new(
207 typed_value_builder: PrimitiveVariantToArrowRowBuilder<'a>,
208 capacity: usize,
209 top_level: bool,
210 ) -> Self {
211 Self {
212 value_builder: VariantValueArrayBuilder::new(capacity),
213 typed_value_builder,
214 nulls: NullBufferBuilder::new(capacity),
215 top_level,
216 }
217 }
218 fn append_null(&mut self) -> Result<()> {
219 self.nulls.append(!self.top_level);
222 self.value_builder.append_null();
223 self.typed_value_builder.append_null()
224 }
225 fn append_value(&mut self, value: Variant<'_, '_>) -> Result<bool> {
226 self.nulls.append_non_null();
227 if self.typed_value_builder.append_value(&value)? {
228 self.value_builder.append_null();
229 } else {
230 self.value_builder.append_value(value);
231 }
232 Ok(true)
233 }
234 fn finish(mut self) -> Result<(BinaryViewArray, ArrayRef, Option<NullBuffer>)> {
235 Ok((
236 self.value_builder.build()?,
237 self.typed_value_builder.finish()?,
238 self.nulls.finish(),
239 ))
240 }
241}
242
/// Row builder that shreds variant objects into a struct `typed_value` column.
pub(crate) struct VariantToShreddedObjectVariantRowBuilder<'a> {
    /// Residual `value` column: non-object values, or the object fields that have no
    /// matching nested builder.
    value_builder: VariantValueArrayBuilder,
    /// One nested shredding builder per target struct field, keyed by field name and kept
    /// in insertion order.
    typed_value_builders: IndexMap<&'a str, VariantToShreddedVariantRowBuilder<'a>>,
    /// Validity of the struct `typed_value` column (null for null rows and non-object values).
    typed_value_nulls: NullBufferBuilder,
    /// Validity of each appended row.
    nulls: NullBufferBuilder,
    /// When set, a null row is recorded as null in `nulls`; otherwise it is recorded as valid.
    top_level: bool,
}
250
251impl<'a> VariantToShreddedObjectVariantRowBuilder<'a> {
252 fn try_new(
253 fields: &'a Fields,
254 cast_options: &'a CastOptions,
255 capacity: usize,
256 top_level: bool,
257 ) -> Result<Self> {
258 let typed_value_builders = fields.iter().map(|field| {
259 let builder = make_variant_to_shredded_variant_arrow_row_builder(
260 field.data_type(),
261 cast_options,
262 capacity,
263 false,
264 )?;
265 Ok((field.name().as_str(), builder))
266 });
267 Ok(Self {
268 value_builder: VariantValueArrayBuilder::new(capacity),
269 typed_value_builders: typed_value_builders.collect::<Result<_>>()?,
270 typed_value_nulls: NullBufferBuilder::new(capacity),
271 nulls: NullBufferBuilder::new(capacity),
272 top_level,
273 })
274 }
275
276 fn append_null(&mut self) -> Result<()> {
277 self.nulls.append(!self.top_level);
280 self.value_builder.append_null();
281 self.typed_value_nulls.append_null();
282 for (_, typed_value_builder) in &mut self.typed_value_builders {
283 typed_value_builder.append_null()?;
284 }
285 Ok(())
286 }
287 fn append_value(&mut self, value: Variant<'_, '_>) -> Result<bool> {
288 let Variant::Object(ref obj) = value else {
289 self.nulls.append_non_null();
291 self.value_builder.append_value(value);
292 self.typed_value_nulls.append_null();
293 for (_, typed_value_builder) in &mut self.typed_value_builders {
294 typed_value_builder.append_null()?;
295 }
296 return Ok(false);
297 };
298
299 let mut builder = self.value_builder.builder_ext(value.metadata());
301 let mut object_builder = builder.try_new_object()?;
302 let mut seen = std::collections::HashSet::new();
303 let mut partially_shredded = false;
304 for (field_name, value) in obj.iter() {
305 match self.typed_value_builders.get_mut(field_name) {
306 Some(typed_value_builder) => {
307 typed_value_builder.append_value(value)?;
308 seen.insert(field_name);
309 }
310 None => {
311 object_builder.insert_bytes(field_name, value);
312 partially_shredded = true;
313 }
314 }
315 }
316
317 for (field_name, typed_value_builder) in &mut self.typed_value_builders {
319 if !seen.contains(field_name) {
320 typed_value_builder.append_null()?;
321 }
322 }
323
324 if partially_shredded {
326 object_builder.finish();
327 } else {
328 drop(object_builder);
329 self.value_builder.append_null();
330 }
331
332 self.typed_value_nulls.append_non_null();
333 self.nulls.append_non_null();
334 Ok(true)
335 }
336 fn finish(mut self) -> Result<(BinaryViewArray, ArrayRef, Option<NullBuffer>)> {
337 let mut builder = StructArrayBuilder::new();
338 for (field_name, typed_value_builder) in self.typed_value_builders {
339 let (value, typed_value, nulls) = typed_value_builder.finish()?;
340 let array =
341 ShreddedVariantFieldArray::from_parts(Some(value), Some(typed_value), nulls);
342 builder = builder.with_field(field_name, ArrayRef::from(array), false);
343 }
344 if let Some(nulls) = self.typed_value_nulls.finish() {
345 builder = builder.with_nulls(nulls);
346 }
347 Ok((
348 self.value_builder.build()?,
349 Arc::new(builder.build()),
350 self.nulls.finish(),
351 ))
352 }
353}
354
/// Data type and nullability recorded for one path of a shredding schema.
#[derive(Clone)]
pub struct ShreddingField {
    /// Arrow type the path is shredded as.
    data_type: DataType,
    /// Nullability used when the path is emitted as a struct field.
    nullable: bool,
}
361
362impl ShreddingField {
363 fn new(data_type: DataType, nullable: bool) -> Self {
364 Self {
365 data_type,
366 nullable,
367 }
368 }
369
370 fn null() -> Self {
371 Self::new(DataType::Null, true)
372 }
373}
374
/// Conversion into a [`ShreddingField`], accepted by [`ShreddedSchemaBuilder::with_path`].
pub trait IntoShreddingField {
    /// Consumes `self` and produces the corresponding [`ShreddingField`].
    fn into_shredding_field(self) -> ShreddingField;
}
379
380impl IntoShreddingField for FieldRef {
381 fn into_shredding_field(self) -> ShreddingField {
382 ShreddingField::new(self.data_type().clone(), self.is_nullable())
383 }
384}
385
386impl IntoShreddingField for &DataType {
387 fn into_shredding_field(self) -> ShreddingField {
388 ShreddingField::new(self.clone(), true)
389 }
390}
391
392impl IntoShreddingField for DataType {
393 fn into_shredding_field(self) -> ShreddingField {
394 ShreddingField::new(self, true)
395 }
396}
397
398impl IntoShreddingField for (&DataType, bool) {
399 fn into_shredding_field(self) -> ShreddingField {
400 ShreddingField::new(self.0.clone(), self.1)
401 }
402}
403
404impl IntoShreddingField for (DataType, bool) {
405 fn into_shredding_field(self) -> ShreddingField {
406 ShreddingField::new(self.0, self.1)
407 }
408}
409
/// Builder that assembles a shredding [`DataType`] from individual variant paths.
///
/// Paths are registered with [`ShreddedSchemaBuilder::with_path`] and turned into a data
/// type by [`ShreddedSchemaBuilder::build`].
#[derive(Default, Clone)]
pub struct ShreddedSchemaBuilder {
    /// Root of the schema tree built up from the registered paths.
    root: VariantSchemaNode,
}
452
453impl ShreddedSchemaBuilder {
454 pub fn new() -> Self {
456 Self::default()
457 }
458
459 pub fn with_path<'a, P, F>(mut self, path: P, field: F) -> Self
471 where
472 P: Into<VariantPath<'a>>,
473 F: IntoShreddingField,
474 {
475 let path: VariantPath<'a> = path.into();
476 self.root.insert_path(&path, field.into_shredding_field());
477 self
478 }
479
480 pub fn build(self) -> DataType {
482 let shredding_type = self.root.to_shredding_type();
483 match shredding_type {
484 Some(shredding_type) => shredding_type,
485 None => DataType::Null,
486 }
487 }
488}
489
/// Node of the schema tree maintained by `ShreddedSchemaBuilder`.
#[derive(Clone)]
enum VariantSchemaNode {
    /// A path that ends here, shredded as the given field.
    Leaf(ShreddingField),
    /// An intermediate level whose children are keyed (and ordered) by field name.
    Struct(BTreeMap<String, VariantSchemaNode>),
}
498
499impl Default for VariantSchemaNode {
500 fn default() -> Self {
501 Self::Leaf(ShreddingField::null())
502 }
503}
504
505impl VariantSchemaNode {
506 fn insert_path(&mut self, path: &VariantPath<'_>, field: ShreddingField) {
508 self.insert_path_elements(path, field);
509 }
510
511 fn insert_path_elements(&mut self, segments: &[VariantPathElement<'_>], field: ShreddingField) {
512 let Some((head, tail)) = segments.split_first() else {
513 *self = Self::Leaf(field);
514 return;
515 };
516
517 match head {
518 VariantPathElement::Field { name } => {
519 let children = match self {
521 Self::Struct(children) => children,
522 _ => {
523 *self = Self::Struct(BTreeMap::new());
524 match self {
525 Self::Struct(children) => children,
526 _ => unreachable!(),
527 }
528 }
529 };
530
531 children
532 .entry(name.to_string())
533 .or_default()
534 .insert_path_elements(tail, field);
535 }
536 VariantPathElement::Index { .. } => {
537 unreachable!("List paths are not supported yet");
539 }
540 }
541 }
542
543 fn to_shredding_type(&self) -> Option<DataType> {
547 match self {
548 Self::Leaf(field) => Some(field.data_type.clone()),
549 Self::Struct(children) => {
550 let child_fields: Vec<_> = children
551 .iter()
552 .filter_map(|(name, child)| child.to_shredding_field(name))
553 .collect();
554 if child_fields.is_empty() {
555 None
556 } else {
557 Some(DataType::Struct(Fields::from(child_fields)))
558 }
559 }
560 }
561 }
562
563 fn to_shredding_field(&self, name: &str) -> Option<FieldRef> {
564 match self {
565 Self::Leaf(field) => Some(Arc::new(Field::new(
566 name,
567 field.data_type.clone(),
568 field.nullable,
569 ))),
570 Self::Struct(_) => self
571 .to_shredding_type()
572 .map(|data_type| Arc::new(Field::new(name, data_type, true))),
573 }
574 }
575}
576
577#[cfg(test)]
578mod tests {
579 use super::*;
580 use crate::VariantArrayBuilder;
581 use arrow::array::{Array, FixedSizeBinaryArray, Float64Array, Int64Array};
582 use arrow::datatypes::{DataType, Field, Fields, TimeUnit, UnionFields, UnionMode};
583 use parquet_variant::{
584 ObjectBuilder, ReadOnlyMetadataBuilder, Variant, VariantBuilder, VariantPath,
585 VariantPathElement,
586 };
587 use std::sync::Arc;
588 use uuid::Uuid;
589
590 #[test]
591 fn test_already_shredded_input_error() {
592 let temp_array = VariantArray::from_iter(vec![Some(Variant::from("test"))]);
595 let metadata = temp_array.metadata_field().clone();
596 let value = temp_array.value_field().unwrap().clone();
597 let typed_value = Arc::new(Int64Array::from(vec![42])) as ArrayRef;
598
599 let shredded_array =
600 VariantArray::from_parts(metadata, Some(value), Some(typed_value), None);
601
602 let result = shred_variant(&shredded_array, &DataType::Int64);
603 assert!(matches!(
604 result.unwrap_err(),
605 ArrowError::InvalidArgumentError(_)
606 ));
607 }
608
609 #[test]
610 fn test_all_null_input() {
611 let metadata = BinaryViewArray::from_iter_values([&[1u8, 0u8]]); let all_null_array = VariantArray::from_parts(metadata, None, None, None);
614 let result = shred_variant(&all_null_array, &DataType::Int64).unwrap();
615
616 assert!(result.value_field().is_none());
618 assert!(result.typed_value_field().is_none());
619 }
620
621 #[test]
622 fn test_unsupported_list_schema() {
623 let input = VariantArray::from_iter([Variant::from(42)]);
624 let list_schema = DataType::List(Arc::new(Field::new("item", DataType::Int64, true)));
625 shred_variant(&input, &list_schema).expect_err("unsupported");
626 }
627
628 #[test]
629 fn test_invalid_fixed_size_binary_shredding() {
630 let mock_uuid_1 = Uuid::new_v4();
631
632 let input = VariantArray::from_iter([Some(Variant::from(mock_uuid_1)), None]);
633
634 let err = shred_variant(&input, &DataType::FixedSizeBinary(17)).unwrap_err();
636
637 assert_eq!(
638 err.to_string(),
639 "Invalid argument error: FixedSizeBinary(17) is not a valid variant shredding type. Only FixedSizeBinary(16) for UUID is supported."
640 );
641 }
642
643 #[test]
644 fn test_uuid_shredding() {
645 let mock_uuid_1 = Uuid::new_v4();
646 let mock_uuid_2 = Uuid::new_v4();
647
648 let input = VariantArray::from_iter([
649 Some(Variant::from(mock_uuid_1)),
650 None,
651 Some(Variant::from(false)),
652 Some(Variant::from(mock_uuid_2)),
653 ]);
654
655 let variant_array = shred_variant(&input, &DataType::FixedSizeBinary(16)).unwrap();
656
657 let uuids = variant_array
673 .typed_value_field()
674 .unwrap()
675 .as_any()
676 .downcast_ref::<FixedSizeBinaryArray>()
677 .unwrap();
678
679 assert_eq!(uuids.len(), 4);
680
681 assert!(!uuids.is_null(0));
682
683 let got_uuid_1: &[u8] = uuids.value(0);
684 assert_eq!(got_uuid_1, mock_uuid_1.as_bytes());
685
686 assert!(uuids.is_null(1));
687 assert!(uuids.is_null(2));
688
689 assert!(!uuids.is_null(3));
690
691 let got_uuid_2: &[u8] = uuids.value(3);
692 assert_eq!(got_uuid_2, mock_uuid_2.as_bytes());
693 }
694
695 #[test]
696 fn test_primitive_shredding_comprehensive() {
697 let input = VariantArray::from_iter(vec![
699 Some(Variant::from(42i64)), Some(Variant::from("hello")), Some(Variant::from(100i64)), None, Some(Variant::Null), Some(Variant::from(3i8)), ]);
706
707 let result = shred_variant(&input, &DataType::Int64).unwrap();
708
709 let metadata_field = result.metadata_field();
711 let value_field = result.value_field().unwrap();
712 let typed_value_field = result
713 .typed_value_field()
714 .unwrap()
715 .as_any()
716 .downcast_ref::<Int64Array>()
717 .unwrap();
718
719 assert_eq!(result.len(), 6);
721
722 assert!(!result.is_null(0));
724 assert!(value_field.is_null(0)); assert!(!typed_value_field.is_null(0));
726 assert_eq!(typed_value_field.value(0), 42);
727
728 assert!(!result.is_null(1));
730 assert!(!value_field.is_null(1)); assert!(typed_value_field.is_null(1)); assert_eq!(
733 Variant::new(metadata_field.value(1), value_field.value(1)),
734 Variant::from("hello")
735 );
736
737 assert!(!result.is_null(2));
739 assert!(value_field.is_null(2));
740 assert_eq!(typed_value_field.value(2), 100);
741
742 assert!(result.is_null(3));
744
745 assert!(!result.is_null(4));
747 assert!(!value_field.is_null(4)); assert_eq!(
749 Variant::new(metadata_field.value(4), value_field.value(4)),
750 Variant::Null
751 );
752 assert!(typed_value_field.is_null(4));
753
754 assert!(!result.is_null(5));
756 assert!(value_field.is_null(5)); assert!(!typed_value_field.is_null(5));
758 assert_eq!(typed_value_field.value(5), 3);
759 }
760
761 #[test]
762 fn test_primitive_different_target_types() {
763 let input = VariantArray::from_iter(vec![
764 Variant::from(42i32),
765 Variant::from(3.15f64),
766 Variant::from("not_a_number"),
767 ]);
768
769 let result_int32 = shred_variant(&input, &DataType::Int32).unwrap();
771 let typed_value_int32 = result_int32
772 .typed_value_field()
773 .unwrap()
774 .as_any()
775 .downcast_ref::<arrow::array::Int32Array>()
776 .unwrap();
777 assert_eq!(typed_value_int32.value(0), 42);
778 assert!(typed_value_int32.is_null(1)); assert!(typed_value_int32.is_null(2)); let result_float64 = shred_variant(&input, &DataType::Float64).unwrap();
783 let typed_value_float64 = result_float64
784 .typed_value_field()
785 .unwrap()
786 .as_any()
787 .downcast_ref::<Float64Array>()
788 .unwrap();
789 assert_eq!(typed_value_float64.value(0), 42.0); assert_eq!(typed_value_float64.value(1), 3.15);
791 assert!(typed_value_float64.is_null(2)); }
793
794 #[test]
795 fn test_invalid_shredded_types_rejected() {
796 let input = VariantArray::from_iter([Variant::from(42)]);
797
798 let invalid_types = vec![
799 DataType::UInt8,
800 DataType::Float16,
801 DataType::Decimal256(38, 10),
802 DataType::Date64,
803 DataType::Time32(TimeUnit::Second),
804 DataType::Time64(TimeUnit::Nanosecond),
805 DataType::Timestamp(TimeUnit::Millisecond, None),
806 DataType::LargeBinary,
807 DataType::LargeUtf8,
808 DataType::FixedSizeBinary(17),
809 DataType::Union(
810 UnionFields::new(
811 vec![0_i8, 1_i8],
812 vec![
813 Field::new("int_field", DataType::Int32, false),
814 Field::new("str_field", DataType::Utf8, true),
815 ],
816 ),
817 UnionMode::Dense,
818 ),
819 DataType::Map(
820 Arc::new(Field::new(
821 "entries",
822 DataType::Struct(Fields::from(vec![
823 Field::new("key", DataType::Utf8, false),
824 Field::new("value", DataType::Int32, true),
825 ])),
826 false,
827 )),
828 false,
829 ),
830 DataType::Dictionary(Box::new(DataType::Int32), Box::new(DataType::Utf8)),
831 DataType::RunEndEncoded(
832 Arc::new(Field::new("run_ends", DataType::Int32, false)),
833 Arc::new(Field::new("values", DataType::Utf8, true)),
834 ),
835 ];
836
837 for data_type in invalid_types {
838 let err = shred_variant(&input, &data_type).unwrap_err();
839 assert!(
840 matches!(err, ArrowError::InvalidArgumentError(_)),
841 "expected InvalidArgumentError for {:?}, got {:?}",
842 data_type,
843 err
844 );
845 }
846 }
847
    /// Shreds nine rows against a `{score: Float64, age: Int64}` schema and checks, row by
    /// row, the top-level `value`/`typed_value` columns and the per-field columns.
    #[test]
    fn test_object_shredding_comprehensive() {
        let mut builder = VariantArrayBuilder::new(7);

        // Row 0: both shredded fields present with matching types.
        builder
            .new_object()
            .with_field("score", 95.5f64)
            .with_field("age", 30i64)
            .finish();

        // Row 1: both shredded fields plus an extra field that is not in the schema.
        builder
            .new_object()
            .with_field("score", 87.2f64)
            .with_field("age", 25i64)
            .with_field("email", "bob@example.com")
            .finish();

        // Row 2: `score` is missing.
        builder.new_object().with_field("age", 35i64).finish();

        // Row 3: both fields present, but with string values instead of numbers.
        builder
            .new_object()
            .with_field("score", "ninety-five")
            .with_field("age", "thirty")
            .finish();

        // Row 4: not an object at all.
        builder.append_variant(Variant::from("not an object"));

        // Row 5: empty object.
        builder.new_object().finish();

        // Row 6: null row.
        builder.append_null();

        // Row 7: object with only a field that is not in the schema.
        builder.new_object().with_field("foo", 10).finish();

        // Row 8: one shredded field plus one field that is not in the schema.
        builder
            .new_object()
            .with_field("score", 66.67f64)
            .with_field("foo", 10)
            .finish();

        let input = builder.build();

        // Target schema: struct with `score: Float64` and `age: Int64`.
        let target_schema = ShreddedSchemaBuilder::default()
            .with_path("score", &DataType::Float64)
            .with_path("age", &DataType::Int64)
            .build();

        let result = shred_variant(&input, &target_schema).unwrap();

        assert!(result.value_field().is_some());
        assert!(result.typed_value_field().is_some());
        assert_eq!(result.len(), 9);

        let metadata = result.metadata_field();

        let value = result.value_field().unwrap();
        let typed_value = result
            .typed_value_field()
            .unwrap()
            .as_any()
            .downcast_ref::<arrow::array::StructArray>()
            .unwrap();

        // Each struct column is itself a shredded (value, typed_value) pair.
        let score_field =
            ShreddedVariantFieldArray::try_new(typed_value.column_by_name("score").unwrap())
                .unwrap();
        let age_field =
            ShreddedVariantFieldArray::try_new(typed_value.column_by_name("age").unwrap()).unwrap();

        let score_value = score_field
            .value_field()
            .unwrap()
            .as_any()
            .downcast_ref::<BinaryViewArray>()
            .unwrap();
        let score_typed_value = score_field
            .typed_value_field()
            .unwrap()
            .as_any()
            .downcast_ref::<Float64Array>()
            .unwrap();
        let age_value = age_field
            .value_field()
            .unwrap()
            .as_any()
            .downcast_ref::<BinaryViewArray>()
            .unwrap();
        let age_typed_value = age_field
            .typed_value_field()
            .unwrap()
            .as_any()
            .downcast_ref::<Int64Array>()
            .unwrap();

        // Expected contents of one (value, typed_value) pair; `None` means "expected null".
        struct ShreddedValue<'m, 'v, T> {
            value: Option<Variant<'m, 'v>>,
            typed_value: Option<T>,
        }
        // Expected contents of the two shredded struct fields.
        struct ShreddedStruct<'m, 'v> {
            score: ShreddedValue<'m, 'v, f64>,
            age: ShreddedValue<'m, 'v, i64>,
        }
        // Decodes row `i` of a value column against row `i` of the metadata column.
        fn get_value<'m, 'v>(
            i: usize,
            metadata: &'m BinaryViewArray,
            value: &'v BinaryViewArray,
        ) -> Variant<'m, 'v> {
            Variant::new(metadata.value(i), value.value(i))
        }
        // Checks row `i` at the top level and, when a typed struct is expected, inside both
        // shredded fields. `None` as the expected result means the whole row is null.
        let expect = |i, expected_result: Option<ShreddedValue<ShreddedStruct>>| {
            match expected_result {
                Some(ShreddedValue {
                    value: expected_value,
                    typed_value: expected_typed_value,
                }) => {
                    assert!(result.is_valid(i));
                    match expected_value {
                        Some(expected_value) => {
                            assert!(value.is_valid(i));
                            assert_eq!(expected_value, get_value(i, metadata, value));
                        }
                        None => {
                            assert!(value.is_null(i));
                        }
                    }
                    match expected_typed_value {
                        Some(ShreddedStruct {
                            score: expected_score,
                            age: expected_age,
                        }) => {
                            assert!(typed_value.is_valid(i));
                            // The field structs themselves are expected to be valid here.
                            assert!(score_field.is_valid(i));
                            assert!(age_field.is_valid(i));
                            match expected_score.value {
                                Some(expected_score_value) => {
                                    assert!(score_value.is_valid(i));
                                    assert_eq!(
                                        expected_score_value,
                                        get_value(i, metadata, score_value)
                                    );
                                }
                                None => {
                                    assert!(score_value.is_null(i));
                                }
                            }
                            match expected_score.typed_value {
                                Some(expected_score) => {
                                    assert!(score_typed_value.is_valid(i));
                                    assert_eq!(expected_score, score_typed_value.value(i));
                                }
                                None => {
                                    assert!(score_typed_value.is_null(i));
                                }
                            }
                            match expected_age.value {
                                Some(expected_age_value) => {
                                    assert!(age_value.is_valid(i));
                                    assert_eq!(
                                        expected_age_value,
                                        get_value(i, metadata, age_value)
                                    );
                                }
                                None => {
                                    assert!(age_value.is_null(i));
                                }
                            }
                            match expected_age.typed_value {
                                Some(expected_age) => {
                                    assert!(age_typed_value.is_valid(i));
                                    assert_eq!(expected_age, age_typed_value.value(i));
                                }
                                None => {
                                    assert!(age_typed_value.is_null(i));
                                }
                            }
                        }
                        None => {
                            assert!(typed_value.is_null(i));
                        }
                    }
                }
                None => {
                    assert!(result.is_null(i));
                }
            };
        };

        // Row 0: fully shredded, no residual value.
        expect(
            0,
            Some(ShreddedValue {
                value: None,
                typed_value: Some(ShreddedStruct {
                    score: ShreddedValue {
                        value: None,
                        typed_value: Some(95.5),
                    },
                    age: ShreddedValue {
                        value: None,
                        typed_value: Some(30),
                    },
                }),
            }),
        );

        // Row 1: the residual value holds an object with just the `email` field.
        let mut builder = VariantBuilder::new();
        builder
            .new_object()
            .with_field("email", "bob@example.com")
            .finish();
        let (m, v) = builder.finish();
        let expected_value = Variant::new(&m, &v);

        expect(
            1,
            Some(ShreddedValue {
                value: Some(expected_value),
                typed_value: Some(ShreddedStruct {
                    score: ShreddedValue {
                        value: None,
                        typed_value: Some(87.2),
                    },
                    age: ShreddedValue {
                        value: None,
                        typed_value: Some(25),
                    },
                }),
            }),
        );

        // Row 2: the missing `score` is null in both of its columns.
        expect(
            2,
            Some(ShreddedValue {
                value: None,
                typed_value: Some(ShreddedStruct {
                    score: ShreddedValue {
                        value: None,
                        typed_value: None,
                    },
                    age: ShreddedValue {
                        value: None,
                        typed_value: Some(35),
                    },
                }),
            }),
        );

        // Row 3: mismatched field types fall back to each field's own value column.
        expect(
            3,
            Some(ShreddedValue {
                value: None,
                typed_value: Some(ShreddedStruct {
                    score: ShreddedValue {
                        value: Some(Variant::from("ninety-five")),
                        typed_value: None,
                    },
                    age: ShreddedValue {
                        value: Some(Variant::from("thirty")),
                        typed_value: None,
                    },
                }),
            }),
        );

        // Row 4: a non-object is kept whole in value; typed_value is null.
        expect(
            4,
            Some(ShreddedValue {
                value: Some(Variant::from("not an object")),
                typed_value: None,
            }),
        );

        // Row 5: an empty object has a valid typed_value struct with all fields null.
        expect(
            5,
            Some(ShreddedValue {
                value: None,
                typed_value: Some(ShreddedStruct {
                    score: ShreddedValue {
                        value: None,
                        typed_value: None,
                    },
                    age: ShreddedValue {
                        value: None,
                        typed_value: None,
                    },
                }),
            }),
        );

        // Row 6: null row.
        expect(6, None);

        // Builds the object `{"foo": 10}` against the metadata stored for row `i`, so the
        // expected residual value can be compared with the one produced by shredding.
        let object_with_foo_field = |i| {
            use parquet_variant::{ParentState, ValueBuilder, VariantMetadata};
            let metadata = VariantMetadata::new(metadata.value(i));
            let mut metadata_builder = ReadOnlyMetadataBuilder::new(&metadata);
            let mut value_builder = ValueBuilder::new();
            let state = ParentState::variant(&mut value_builder, &mut metadata_builder);
            ObjectBuilder::new(state, false)
                .with_field("foo", 10)
                .finish();
            (metadata, value_builder.into_inner())
        };

        // Row 7: only the unshredded `foo` field, so it all ends up in the residual value.
        let (m, v) = object_with_foo_field(7);
        expect(
            7,
            Some(ShreddedValue {
                value: Some(Variant::new_with_metadata(m, &v)),
                typed_value: Some(ShreddedStruct {
                    score: ShreddedValue {
                        value: None,
                        typed_value: None,
                    },
                    age: ShreddedValue {
                        value: None,
                        typed_value: None,
                    },
                }),
            }),
        );

        // Row 8: `score` is shredded, `foo` goes to the residual value, `age` is missing.
        let (m, v) = object_with_foo_field(8);
        expect(
            8,
            Some(ShreddedValue {
                value: Some(Variant::new_with_metadata(m, &v)),
                typed_value: Some(ShreddedStruct {
                    score: ShreddedValue {
                        value: None,
                        typed_value: Some(66.67),
                    },
                    age: ShreddedValue {
                        value: None,
                        typed_value: None,
                    },
                }),
            }),
        );
    }
1209
1210 #[test]
1211 fn test_object_different_schemas() {
1212 let mut builder = VariantArrayBuilder::new(1);
1214 builder
1215 .new_object()
1216 .with_field("id", 123i32)
1217 .with_field("age", 25i64)
1218 .with_field("score", 95.5f64)
1219 .finish();
1220 let input = builder.build();
1221
1222 let schema1 = ShreddedSchemaBuilder::default()
1224 .with_path("id", &DataType::Int32)
1225 .build();
1226 let result1 = shred_variant(&input, &schema1).unwrap();
1227 let value_field1 = result1.value_field().unwrap();
1228 assert!(!value_field1.is_null(0)); let schema2 = ShreddedSchemaBuilder::default()
1232 .with_path("id", &DataType::Int32)
1233 .with_path("age", &DataType::Int64)
1234 .build();
1235 let result2 = shred_variant(&input, &schema2).unwrap();
1236 let value_field2 = result2.value_field().unwrap();
1237 assert!(!value_field2.is_null(0)); let schema3 = ShreddedSchemaBuilder::default()
1241 .with_path("id", &DataType::Int32)
1242 .with_path("age", &DataType::Int64)
1243 .with_path("score", &DataType::Float64)
1244 .build();
1245 let result3 = shred_variant(&input, &schema3).unwrap();
1246 let value_field3 = result3.value_field().unwrap();
1247 assert!(value_field3.is_null(0)); }
1249
    /// Shreds objects against `{id: FixedSizeBinary(16), session_id: FixedSizeBinary(16)}`
    /// and checks how UUID, non-UUID, missing and extra fields are distributed.
    #[test]
    fn test_uuid_shredding_in_objects() {
        let mock_uuid_1 = Uuid::new_v4();
        let mock_uuid_2 = Uuid::new_v4();
        let mock_uuid_3 = Uuid::new_v4();

        let mut builder = VariantArrayBuilder::new(6);

        // Row 0: both fields are UUIDs.
        builder
            .new_object()
            .with_field("id", mock_uuid_1)
            .with_field("session_id", mock_uuid_2)
            .finish();

        // Row 1: both UUID fields plus an extra `name` field that is not in the schema.
        builder
            .new_object()
            .with_field("id", mock_uuid_2)
            .with_field("session_id", mock_uuid_3)
            .with_field("name", "test_user")
            .finish();

        // Row 2: `session_id` is missing.
        builder.new_object().with_field("id", mock_uuid_1).finish();

        // Row 3: `session_id` is a string rather than a UUID.
        builder
            .new_object()
            .with_field("id", mock_uuid_3)
            .with_field("session_id", "not-a-uuid")
            .finish();

        // Row 4: `id` is an integer rather than a UUID.
        builder
            .new_object()
            .with_field("id", 12345i64)
            .with_field("session_id", mock_uuid_1)
            .finish();

        // Row 5: null row.
        builder.append_null();

        let input = builder.build();

        let target_schema = ShreddedSchemaBuilder::default()
            .with_path("id", DataType::FixedSizeBinary(16))
            .with_path("session_id", DataType::FixedSizeBinary(16))
            .build();

        let result = shred_variant(&input, &target_schema).unwrap();

        assert!(result.value_field().is_some());
        assert!(result.typed_value_field().is_some());
        assert_eq!(result.len(), 6);

        let metadata = result.metadata_field();
        let value = result.value_field().unwrap();
        let typed_value = result
            .typed_value_field()
            .unwrap()
            .as_any()
            .downcast_ref::<arrow::array::StructArray>()
            .unwrap();

        // Each struct column is itself a shredded (value, typed_value) pair.
        let id_field =
            ShreddedVariantFieldArray::try_new(typed_value.column_by_name("id").unwrap()).unwrap();
        let session_id_field =
            ShreddedVariantFieldArray::try_new(typed_value.column_by_name("session_id").unwrap())
                .unwrap();

        let id_value = id_field
            .value_field()
            .unwrap()
            .as_any()
            .downcast_ref::<BinaryViewArray>()
            .unwrap();
        let id_typed_value = id_field
            .typed_value_field()
            .unwrap()
            .as_any()
            .downcast_ref::<FixedSizeBinaryArray>()
            .unwrap();
        let session_id_value = session_id_field
            .value_field()
            .unwrap()
            .as_any()
            .downcast_ref::<BinaryViewArray>()
            .unwrap();
        let session_id_typed_value = session_id_field
            .typed_value_field()
            .unwrap()
            .as_any()
            .downcast_ref::<FixedSizeBinaryArray>()
            .unwrap();

        // Row 0: fully shredded, so every value column is null.
        assert!(result.is_valid(0));

        assert!(value.is_null(0));
        assert!(id_value.is_null(0));
        assert!(session_id_value.is_null(0));

        assert!(typed_value.is_valid(0));
        assert!(id_typed_value.is_valid(0));
        assert!(session_id_typed_value.is_valid(0));

        assert_eq!(id_typed_value.value(0), mock_uuid_1.as_bytes());
        assert_eq!(session_id_typed_value.value(0), mock_uuid_2.as_bytes());

        // Row 1: both UUIDs shredded; the top-level value holds the leftover `name` field.
        assert!(result.is_valid(1));

        assert!(value.is_valid(1));
        assert!(typed_value.is_valid(1));

        assert!(id_value.is_null(1));
        assert!(id_typed_value.is_valid(1));
        assert_eq!(id_typed_value.value(1), mock_uuid_2.as_bytes());

        assert!(session_id_value.is_null(1));
        assert!(session_id_typed_value.is_valid(1));
        assert_eq!(session_id_typed_value.value(1), mock_uuid_3.as_bytes());

        let row_1_variant = Variant::new(metadata.value(1), value.value(1));
        let Variant::Object(obj) = row_1_variant else {
            panic!("Expected object");
        };

        assert_eq!(obj.get("name"), Some(Variant::from("test_user")));

        // Row 2: `id` shredded; the missing `session_id` is null in both of its columns.
        assert!(result.is_valid(2));

        assert!(value.is_null(2));
        assert!(typed_value.is_valid(2));

        assert!(id_value.is_null(2));
        assert!(id_typed_value.is_valid(2));
        assert_eq!(id_typed_value.value(2), mock_uuid_1.as_bytes());

        assert!(session_id_value.is_null(2));
        assert!(session_id_typed_value.is_null(2));

        // Row 3: `id` shredded; the string `session_id` falls back to the field's value column.
        assert!(result.is_valid(3));

        assert!(value.is_null(3));
        assert!(typed_value.is_valid(3));

        assert!(id_value.is_null(3));
        assert!(id_typed_value.is_valid(3));
        assert_eq!(id_typed_value.value(3), mock_uuid_3.as_bytes());

        assert!(session_id_value.is_valid(3));
        assert!(session_id_typed_value.is_null(3));
        let session_id_variant = Variant::new(metadata.value(3), session_id_value.value(3));
        assert_eq!(session_id_variant, Variant::from("not-a-uuid"));

        // Row 4: the integer `id` falls back to the field's value column; `session_id` shredded.
        assert!(result.is_valid(4));

        assert!(value.is_null(4));
        assert!(typed_value.is_valid(4));

        assert!(id_value.is_valid(4));
        assert!(id_typed_value.is_null(4));
        let id_variant = Variant::new(metadata.value(4), id_value.value(4));
        assert_eq!(id_variant, Variant::from(12345i64));

        assert!(session_id_value.is_null(4));
        assert!(session_id_typed_value.is_valid(4));
        assert_eq!(session_id_typed_value.value(4), mock_uuid_1.as_bytes());

        // Row 5: the null input row stays null.
        assert!(result.is_null(5));
    }
1429
/// Shreds a two-row primitive input as `Int64` and checks the structural
/// guarantees of the result: the three named columns exist, the metadata column
/// is carried over unchanged, and every non-null row stores its data in exactly
/// one of `value` / `typed_value`.
#[test]
fn test_spec_compliance() {
    let input = VariantArray::from_iter(vec![Variant::from(42i64), Variant::from("hello")]);

    let result = shred_variant(&input, &DataType::Int64).unwrap();

    // Columns are looked up by name, not by position.
    let inner_struct = result.inner();
    for name in ["metadata", "value", "typed_value"] {
        assert!(
            inner_struct.column_by_name(name).is_some(),
            "missing `{name}` column in shredded output"
        );
    }

    // Metadata preservation: same length *and* same contents as the input.
    // (Comparing lengths alone would not notice corrupted metadata.)
    assert_eq!(result.metadata_field().len(), input.metadata_field().len());
    assert_eq!(result.metadata_field(), input.metadata_field());

    // Output structure.
    assert_eq!(result.len(), input.len());
    assert!(result.value_field().is_some());
    assert!(result.typed_value_field().is_some());

    let value_field = result.value_field().unwrap();
    let typed_value_field = result
        .typed_value_field()
        .unwrap()
        .as_any()
        .downcast_ref::<Int64Array>()
        .unwrap();

    // For primitive shredding a non-null row lives in exactly one of the two
    // columns. (Objects are different: partial shredding may populate both.)
    for i in 0..result.len() {
        if result.is_null(i) {
            continue;
        }
        let value_is_null = value_field.is_null(i);
        let typed_value_is_null = typed_value_field.is_null(i);
        assert!(
            value_is_null || typed_value_is_null,
            "Row {}: both value and typed_value are non-null for primitive shredding",
            i
        );
        assert!(
            !(value_is_null && typed_value_is_null),
            "Row {i}: both value and typed_value are null for a non-null primitive row"
        );
    }
}
1476
/// Two single-segment paths yield a flat struct with one nullable field per path.
#[test]
fn test_variant_schema_builder_simple() {
    let expected = DataType::Struct(Fields::from(vec![
        Field::new("a", DataType::Int64, true),
        Field::new("b", DataType::Float64, true),
    ]));

    let actual = ShreddedSchemaBuilder::default()
        .with_path("a", &DataType::Int64)
        .with_path("b", &DataType::Float64)
        .build();

    assert_eq!(actual, expected);
}
1492
/// Dotted string paths sharing the prefix `b` are grouped under a single nested
/// nullable struct field named `b`.
#[test]
fn test_variant_schema_builder_nested() {
    // Expected layout of the nested `b` struct.
    let b_children = Fields::from(vec![
        Field::new("c", DataType::Utf8, true),
        Field::new("d", DataType::Float64, true),
    ]);
    let expected = DataType::Struct(Fields::from(vec![
        Field::new("a", DataType::Int64, true),
        Field::new("b", DataType::Struct(b_children), true),
    ]));

    let actual = ShreddedSchemaBuilder::default()
        .with_path("a", &DataType::Int64)
        .with_path("b.c", &DataType::Utf8)
        .with_path("b.d", &DataType::Float64)
        .build();

    assert_eq!(actual, expected);
}
1516
/// A pre-built `VariantPath` with the single element `"a.b"` is used as-is:
/// the dot is part of the field name and does not create nesting.
#[test]
fn test_variant_schema_builder_with_path_variant_path_arg() {
    let single_element_path = VariantPath::from_iter([VariantPathElement::from("a.b")]);
    let built = ShreddedSchemaBuilder::default()
        .with_path(single_element_path, &DataType::Int64)
        .build();

    let DataType::Struct(fields) = built else {
        panic!("expected struct data type");
    };

    assert_eq!(fields.len(), 1);
    assert_eq!(fields[0].name(), "a.b");
    assert_eq!(fields[0].data_type(), &DataType::Int64);
}
1533
/// Nullability can be supplied either through a full `Field` (whose own name is
/// replaced by the path name) or through a `(&DataType, bool)` pair.
#[test]
fn test_variant_schema_builder_custom_nullability() {
    let renamed_source = Arc::new(Field::new("should_be_renamed", DataType::Utf8, false));
    let built = ShreddedSchemaBuilder::default()
        .with_path("foo", renamed_source)
        .with_path("bar", (&DataType::Int64, false))
        .build();

    let DataType::Struct(fields) = built else {
        panic!("expected struct data type");
    };

    // Each path must exist under its path name, keep its type, and be non-nullable.
    for (name, expected_type) in [("foo", DataType::Utf8), ("bar", DataType::Int64)] {
        let field = fields.iter().find(|f| f.name() == name).unwrap();
        assert_eq!(field.data_type(), &expected_type);
        assert!(!field.is_nullable());
    }
}
1556
/// End-to-end check: a schema produced by `ShreddedSchemaBuilder` is fed to
/// `shred_variant`, and both the resulting Arrow data type and the shredded
/// `time` / `hostname` values are verified.
#[test]
fn test_variant_schema_builder_with_shred_variant() {
    // Row 0 carries an extra unshredded field, row 1 only shredded fields,
    // row 2 is a null row.
    let mut rows = VariantArrayBuilder::new(3);
    rows.new_object()
        .with_field("time", 1234567890i64)
        .with_field("hostname", "server1")
        .with_field("extra", 42)
        .finish();
    rows.new_object()
        .with_field("time", 9876543210i64)
        .with_field("hostname", "server2")
        .finish();
    rows.append_null();
    let input = rows.build();

    let shredding_type = ShreddedSchemaBuilder::default()
        .with_path("time", &DataType::Int64)
        .with_path("hostname", &DataType::Utf8)
        .build();

    let result = shred_variant(&input, &shredding_type).unwrap();

    // Every shredded object field is a non-nullable `{value, typed_value}` struct.
    let shredded_field = |name: &str, typed: DataType| {
        Field::new(
            name,
            DataType::Struct(Fields::from(vec![
                Field::new("value", DataType::BinaryView, true),
                Field::new("typed_value", typed, true),
            ])),
            false,
        )
    };
    // Note the expected field order is `hostname`, `time`, not the order in
    // which the paths were registered.
    let expected_type = DataType::Struct(Fields::from(vec![
        Field::new("metadata", DataType::BinaryView, false),
        Field::new("value", DataType::BinaryView, true),
        Field::new(
            "typed_value",
            DataType::Struct(Fields::from(vec![
                shredded_field("hostname", DataType::Utf8),
                shredded_field("time", DataType::Int64),
            ])),
            true,
        ),
    ]));
    assert_eq!(result.data_type(), &expected_type);

    assert_eq!(result.len(), 3);
    assert!(result.typed_value_field().is_some());

    let shredded_struct = result
        .typed_value_field()
        .unwrap()
        .as_any()
        .downcast_ref::<arrow::array::StructArray>()
        .unwrap();

    let time_column = shredded_struct.column_by_name("time").unwrap();
    let time_field = ShreddedVariantFieldArray::try_new(time_column).unwrap();
    let hostname_column = shredded_struct.column_by_name("hostname").unwrap();
    let hostname_field = ShreddedVariantFieldArray::try_new(hostname_column).unwrap();

    let time_typed = time_field
        .typed_value_field()
        .unwrap()
        .as_any()
        .downcast_ref::<Int64Array>()
        .unwrap();
    let hostname_typed = hostname_field
        .typed_value_field()
        .unwrap()
        .as_any()
        .downcast_ref::<arrow::array::StringArray>()
        .unwrap();

    // The two object rows are non-null and fully shredded.
    let expected_rows = [(1234567890i64, "server1"), (9876543210i64, "server2")];
    for (row, (time, hostname)) in expected_rows.into_iter().enumerate() {
        assert!(!result.is_null(row));
        assert_eq!(time_typed.value(row), time);
        assert_eq!(hostname_typed.value(row), hostname);
    }

    // The appended null stays a top-level null.
    assert!(result.is_null(2));
}
1655
/// Registering the same path twice keeps only the later type (`Float64` here).
#[test]
fn test_variant_schema_builder_conflicting_path() {
    let expected = DataType::Struct(Fields::from(vec![Field::new(
        "a",
        DataType::Float64,
        true,
    )]));

    let actual = ShreddedSchemaBuilder::default()
        .with_path("a", &DataType::Int64)
        .with_path("a", &DataType::Float64)
        .build();

    assert_eq!(actual, expected);
}
1670
/// An empty `VariantPath` targets the root, so the built type is the given
/// type itself rather than a struct.
#[test]
fn test_variant_schema_builder_root_path() {
    let root = VariantPath::new(vec![]);

    let built = ShreddedSchemaBuilder::default()
        .with_path(root, &DataType::Int64)
        .build();

    assert_eq!(built, DataType::Int64);
}
1680
/// An empty string path builds to the given type itself, not a struct.
#[test]
fn test_variant_schema_builder_empty_path() {
    let built = ShreddedSchemaBuilder::default()
        .with_path("", &DataType::Int64)
        .build();

    assert_eq!(built, DataType::Int64);
}
1689
/// A builder with no registered paths builds `DataType::Null`.
#[test]
fn test_variant_schema_builder_default() {
    let built = ShreddedSchemaBuilder::default().build();

    assert_eq!(built, DataType::Null);
}
1695}