parquet_variant_compute/
variant_array_builder.rs

1// Licensed to the Apache Software Foundation (ASF) under one
2// or more contributor license agreements.  See the NOTICE file
3// distributed with this work for additional information
4// regarding copyright ownership.  The ASF licenses this file
5// to you under the Apache License, Version 2.0 (the
6// "License"); you may not use this file except in compliance
7// with the License.  You may obtain a copy of the License at
8//
9//   http://www.apache.org/licenses/LICENSE-2.0
10//
11// Unless required by applicable law or agreed to in writing,
12// software distributed under the License is distributed on an
13// "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
14// KIND, either express or implied.  See the License for the
15// specific language governing permissions and limitations
16// under the License.
17
18//! [`VariantArrayBuilder`] implementation
19
20use crate::VariantArray;
21use arrow::array::{ArrayRef, BinaryViewArray, BinaryViewBuilder, NullBufferBuilder, StructArray};
22use arrow_schema::{DataType, Field, Fields};
23use parquet_variant::{ListBuilder, ObjectBuilder, Variant, VariantBuilder, VariantBuilderExt};
24use std::sync::Arc;
25
26/// A builder for [`VariantArray`]
27///
28/// This builder is used to construct a `VariantArray` and allows APIs for
29/// adding metadata
30///
31/// This builder always creates a `VariantArray` using [`BinaryViewArray`] for both
32/// the metadata and value fields.
33///
34/// # TODO
35/// 1. Support shredding: <https://github.com/apache/arrow-rs/issues/7895>
36///
37/// ## Example:
38/// ```
39/// # use arrow::array::Array;
40/// # use parquet_variant::{Variant, VariantBuilder, VariantBuilderExt};
41/// # use parquet_variant_compute::VariantArrayBuilder;
42/// // Create a new VariantArrayBuilder with a capacity of 100 rows
43/// let mut builder = VariantArrayBuilder::new(100);
44/// // append variant values
45/// builder.append_variant(Variant::from(42));
46/// // append a null row (note not a Variant::Null)
47/// builder.append_null();
48/// // append an object to the builder
49/// let mut vb = builder.variant_builder();
50/// vb.new_object()
51///   .with_field("foo", "bar")
52///   .finish()
53///   .unwrap();
54///  vb.finish(); // must call finish to write the variant to the buffers
55///
56/// // create the final VariantArray
57/// let variant_array = builder.build();
58/// assert_eq!(variant_array.len(), 3);
59/// // // Access the values
60/// // row 1 is not null and is an integer
61/// assert!(!variant_array.is_null(0));
62/// assert_eq!(variant_array.value(0), Variant::from(42i32));
63/// // row 1 is null
64/// assert!(variant_array.is_null(1));
65/// // row 2 is not null and is an object
66/// assert!(!variant_array.is_null(2));
67/// let value = variant_array.value(2);
68/// let obj = value.as_object().expect("expected object");
69/// assert_eq!(obj.get("foo"), Some(Variant::from("bar")));
70/// ```
71#[derive(Debug)]
72pub struct VariantArrayBuilder {
73    /// Nulls
74    nulls: NullBufferBuilder,
75    /// buffer for all the metadata
76    metadata_buffer: Vec<u8>,
77    /// (offset, len) pairs for locations of metadata in the buffer
78    metadata_locations: Vec<(usize, usize)>,
79    /// buffer for values
80    value_buffer: Vec<u8>,
81    /// (offset, len) pairs for locations of values in the buffer
82    value_locations: Vec<(usize, usize)>,
83    /// The fields of the final `StructArray`
84    ///
85    /// TODO: 1) Add extension type metadata
86    /// TODO: 2) Add support for shredding
87    fields: Fields,
88}
89
90impl VariantArrayBuilder {
91    pub fn new(row_capacity: usize) -> Self {
92        // The subfields are expected to be non-nullable according to the parquet variant spec.
93        let metadata_field = Field::new("metadata", DataType::BinaryView, false);
94        let value_field = Field::new("value", DataType::BinaryView, false);
95
96        Self {
97            nulls: NullBufferBuilder::new(row_capacity),
98            metadata_buffer: Vec::new(), // todo allocation capacity
99            metadata_locations: Vec::with_capacity(row_capacity),
100            value_buffer: Vec::new(),
101            value_locations: Vec::with_capacity(row_capacity),
102            fields: Fields::from(vec![metadata_field, value_field]),
103        }
104    }
105
106    /// Build the final builder
107    pub fn build(self) -> VariantArray {
108        let Self {
109            mut nulls,
110            metadata_buffer,
111            metadata_locations,
112            value_buffer,
113            value_locations,
114            fields,
115        } = self;
116
117        let metadata_array = binary_view_array_from_buffers(metadata_buffer, metadata_locations);
118
119        let value_array = binary_view_array_from_buffers(value_buffer, value_locations);
120
121        // The build the final struct array
122        let inner = StructArray::new(
123            fields,
124            vec![
125                Arc::new(metadata_array) as ArrayRef,
126                Arc::new(value_array) as ArrayRef,
127            ],
128            nulls.finish(),
129        );
130        // TODO add arrow extension type metadata
131
132        VariantArray::try_new(Arc::new(inner)).expect("valid VariantArray by construction")
133    }
134
135    /// Appends a null row to the builder.
136    pub fn append_null(&mut self) {
137        self.nulls.append_null();
138        // The subfields are expected to be non-nullable according to the parquet variant spec.
139        let metadata_offset = self.metadata_buffer.len();
140        let metadata_length = 0;
141        self.metadata_locations
142            .push((metadata_offset, metadata_length));
143        let value_offset = self.value_buffer.len();
144        let value_length = 0;
145        self.value_locations.push((value_offset, value_length));
146    }
147
148    /// Append the [`Variant`] to the builder as the next row
149    pub fn append_variant(&mut self, variant: Variant) {
150        let mut direct_builder = self.variant_builder();
151        direct_builder.variant_builder.append_value(variant);
152        direct_builder.finish()
153    }
154
155    /// Return a `VariantArrayVariantBuilder` that writes directly to the
156    /// buffers of this builder.
157    ///
158    /// You must call [`VariantArrayVariantBuilder::finish`] to complete the builder
159    ///
160    /// # Example
161    /// ```
162    /// # use parquet_variant::{Variant, VariantBuilder, VariantBuilderExt};
163    /// # use parquet_variant_compute::{VariantArray, VariantArrayBuilder};
164    /// let mut array_builder = VariantArrayBuilder::new(10);
165    ///
166    /// // First row has a string
167    /// let mut variant_builder = array_builder.variant_builder();
168    /// variant_builder.append_value("Hello, World!");
169    /// // must call finish to write the variant to the buffers
170    /// variant_builder.finish();
171    ///
172    /// // Second row is an object
173    /// let mut variant_builder = array_builder.variant_builder();
174    /// variant_builder
175    ///     .new_object()
176    ///     .with_field("my_field", 42i64)
177    ///     .finish()
178    ///     .unwrap();
179    /// variant_builder.finish();
180    ///
181    /// // finalize the array
182    /// let variant_array: VariantArray = array_builder.build();
183    ///
184    /// // verify what we wrote is still there
185    /// assert_eq!(variant_array.value(0), Variant::from("Hello, World!"));
186    /// assert!(variant_array.value(1).as_object().is_some());
187    ///  ```
188    pub fn variant_builder(&mut self) -> VariantArrayVariantBuilder<'_> {
189        // append directly into the metadata and value buffers
190        let metadata_buffer = std::mem::take(&mut self.metadata_buffer);
191        let value_buffer = std::mem::take(&mut self.value_buffer);
192        VariantArrayVariantBuilder::new(self, metadata_buffer, value_buffer)
193    }
194}
195
196/// A `VariantBuilderExt` that writes directly to the buffers of a `VariantArrayBuilder`.
197///
198// This struct implements [`VariantBuilderExt`], so in most cases it can be used as a
199// [`VariantBuilder`] to perform variant-related operations for [`VariantArrayBuilder`].
200///
201/// If [`Self::finish`] is not called, any changes will be rolled back
202///
203/// See [`VariantArrayBuilder::variant_builder`] for an example
204pub struct VariantArrayVariantBuilder<'a> {
205    /// was finish called?
206    finished: bool,
207    /// starting offset in the variant_builder's `metadata` buffer
208    metadata_offset: usize,
209    /// starting offset in the variant_builder's `value` buffer
210    value_offset: usize,
211    /// Parent array builder that this variant builder writes to. Buffers
212    /// have been moved into the variant builder, and must be returned on
213    /// drop
214    array_builder: &'a mut VariantArrayBuilder,
215    /// Builder for the in progress variant value, temporarily owns the buffers
216    /// from `array_builder`
217    variant_builder: VariantBuilder,
218}
219
220impl<'a> VariantBuilderExt for VariantArrayVariantBuilder<'a> {
221    fn append_value<'m, 'v>(&mut self, value: impl Into<Variant<'m, 'v>>) {
222        self.variant_builder.append_value(value);
223    }
224
225    fn new_list(&mut self) -> ListBuilder<'_> {
226        self.variant_builder.new_list()
227    }
228
229    fn new_object(&mut self) -> ObjectBuilder<'_> {
230        self.variant_builder.new_object()
231    }
232}
233
234impl<'a> VariantArrayVariantBuilder<'a> {
235    /// Constructs a new VariantArrayVariantBuilder
236    ///
237    /// Note this is not public as this is a structure that is logically
238    /// part of the [`VariantArrayBuilder`] and relies on its internal structure
239    fn new(
240        array_builder: &'a mut VariantArrayBuilder,
241        metadata_buffer: Vec<u8>,
242        value_buffer: Vec<u8>,
243    ) -> Self {
244        let metadata_offset = metadata_buffer.len();
245        let value_offset = value_buffer.len();
246        VariantArrayVariantBuilder {
247            finished: false,
248            metadata_offset,
249            value_offset,
250            variant_builder: VariantBuilder::new_with_buffers(metadata_buffer, value_buffer),
251            array_builder,
252        }
253    }
254
255    /// Return a reference to the underlying `VariantBuilder`
256    pub fn inner(&self) -> &VariantBuilder {
257        &self.variant_builder
258    }
259
260    /// Return a mutable reference to the underlying `VariantBuilder`
261    pub fn inner_mut(&mut self) -> &mut VariantBuilder {
262        &mut self.variant_builder
263    }
264
265    /// Called to finish the in progress variant and write it to the underlying
266    /// buffers
267    ///
268    /// Note if you do not call finish, on drop any changes made to the
269    /// underlying buffers will be rolled back.
270    pub fn finish(mut self) {
271        self.finished = true;
272
273        let metadata_offset = self.metadata_offset;
274        let value_offset = self.value_offset;
275        // get the buffers back from the variant builder
276        let (metadata_buffer, value_buffer) = std::mem::take(&mut self.variant_builder).finish();
277
278        // Sanity Check: if the buffers got smaller, something went wrong (previous data was lost)
279        let metadata_len = metadata_buffer
280            .len()
281            .checked_sub(metadata_offset)
282            .expect("metadata length decreased unexpectedly");
283        let value_len = value_buffer
284            .len()
285            .checked_sub(value_offset)
286            .expect("value length decreased unexpectedly");
287
288        // commit the changes by putting the
289        // offsets and lengths into the parent array builder.
290        self.array_builder
291            .metadata_locations
292            .push((metadata_offset, metadata_len));
293        self.array_builder
294            .value_locations
295            .push((value_offset, value_len));
296        self.array_builder.nulls.append_non_null();
297        // put the buffers back into the array builder
298        self.array_builder.metadata_buffer = metadata_buffer;
299        self.array_builder.value_buffer = value_buffer;
300    }
301}
302
303impl<'a> Drop for VariantArrayVariantBuilder<'a> {
304    /// If the builder was not finished, roll back any changes made to the
305    /// underlying buffers (by truncating them)
306    fn drop(&mut self) {
307        if self.finished {
308            return;
309        }
310
311        // if the object was not finished, need to rollback any changes by
312        // truncating the buffers to the original offsets
313        let metadata_offset = self.metadata_offset;
314        let value_offset = self.value_offset;
315
316        // get the buffers back from the variant builder
317        let (mut metadata_buffer, mut value_buffer) =
318            std::mem::take(&mut self.variant_builder).into_buffers();
319
320        // Sanity Check: if the buffers got smaller, something went wrong (previous data was lost) so panic immediately
321        metadata_buffer
322            .len()
323            .checked_sub(metadata_offset)
324            .expect("metadata length decreased unexpectedly");
325        value_buffer
326            .len()
327            .checked_sub(value_offset)
328            .expect("value length decreased unexpectedly");
329
330        // Note this truncate is fast because truncate doesn't free any memory:
331        // it just has to drop elements (and u8 doesn't have a destructor)
332        metadata_buffer.truncate(metadata_offset);
333        value_buffer.truncate(value_offset);
334
335        // put the buffers back into the array builder
336        self.array_builder.metadata_buffer = metadata_buffer;
337        self.array_builder.value_buffer = value_buffer;
338    }
339}
340
341fn binary_view_array_from_buffers(
342    buffer: Vec<u8>,
343    locations: Vec<(usize, usize)>,
344) -> BinaryViewArray {
345    let mut builder = BinaryViewBuilder::with_capacity(locations.len());
346    let block = builder.append_block(buffer.into());
347    // TODO this can be much faster if it creates the views directly during append
348    for (offset, length) in locations {
349        let offset = offset.try_into().expect("offset should fit in u32");
350        let length = length.try_into().expect("length should fit in u32");
351        builder
352            .try_append_view(block, offset, length)
353            .expect("Failed to append view");
354    }
355    builder.finish()
356}
357
#[cfg(test)]
mod test {
    use super::*;
    use arrow::array::Array;

    /// Neither child array (metadata, value) may carry nulls, even when the
    /// variant row itself is null
    #[test]
    fn test_variant_array_builder_non_nullable() {
        let mut array_builder = VariantArrayBuilder::new(10);
        array_builder.append_null(); // should not panic
        array_builder.append_variant(Variant::from(42i32));
        let array = array_builder.build();

        assert_eq!(array.len(), 2);
        assert!(array.is_null(0));
        assert!(!array.is_null(1));
        assert_eq!(array.value(1), Variant::from(42i32));

        // the metadata and value fields of non shredded variants should not be null
        assert!(array.metadata_field().nulls().is_none());
        assert!(array.value_field().unwrap().nulls().is_none());
        let fields = match array.data_type() {
            DataType::Struct(fields) => fields,
            _ => panic!("Expected VariantArray to have Struct data type"),
        };
        for child in fields {
            assert!(
                !child.is_nullable(),
                "Field {} should be non-nullable",
                child.name()
            );
        }
    }

    /// Rows written through per-row sub builders show up in the final array
    #[test]
    fn test_variant_array_builder_variant_builder() {
        let mut array_builder = VariantArrayBuilder::new(10);
        array_builder.append_null(); // should not panic
        array_builder.append_variant(Variant::from(42i32));

        // row 2: an object written through a sub builder
        let mut row = array_builder.variant_builder();
        row.new_object().with_field("foo", "bar").finish().unwrap();
        row.finish(); // must call finish to write the variant to the buffers

        // row 3: a list with two elements
        let mut row = array_builder.variant_builder();
        row.new_list()
            .with_value(Variant::from(1i32))
            .with_value(Variant::from(2i32))
            .finish();
        row.finish();
        let array = array_builder.build();

        assert_eq!(array.len(), 4);
        assert!(array.is_null(0));
        assert!(!array.is_null(1));
        assert_eq!(array.value(1), Variant::from(42i32));

        assert!(!array.is_null(2));
        let object_row = array.value(2);
        let object = object_row.as_object().expect("variant to be an object");
        assert_eq!(object.get("foo").unwrap(), Variant::from("bar"));

        assert!(!array.is_null(3));
        let list_row = array.value(3);
        let list = list_row.as_list().expect("variant to be a list");
        assert_eq!(list.len(), 2);
    }

    /// A sub builder dropped without `finish` must leave no trace in the array
    #[test]
    fn test_variant_array_builder_variant_builder_reset() {
        let mut array_builder = VariantArrayBuilder::new(10);

        // first row: a finished object
        let mut row = array_builder.variant_builder();
        row.new_object().with_field("foo", 1i32).finish().unwrap();
        row.finish(); // must call finish to write the variant to the buffers

        // write another object, but abandon the sub builder instead of finishing it
        let mut abandoned = array_builder.variant_builder();
        abandoned
            .new_object()
            .with_field("bar", 2i32)
            .finish()
            .unwrap();
        drop(abandoned); // drop the sub builder without finishing it

        // a third object, finished (the abandoned one above should have been rolled back)
        let mut row = array_builder.variant_builder();
        row.new_object().with_field("baz", 3i32).finish().unwrap();
        row.finish(); // must call finish to write the variant to the buffers

        let array = array_builder.build();

        // only the two finished objects should be present
        assert_eq!(array.len(), 2);

        assert!(!array.is_null(0));
        let first_row = array.value(0);
        let first = first_row.as_object().expect("variant to be an object");
        assert_eq!(first.get("foo").unwrap(), Variant::from(1i32));

        assert!(!array.is_null(1));
        let second_row = array.value(1);
        let second = second_row.as_object().expect("variant to be an object");
        assert_eq!(second.get("baz").unwrap(), Variant::from(3i32));
    }
}