parquet_variant_compute/variant_array_builder.rs
1// Licensed to the Apache Software Foundation (ASF) under one
2// or more contributor license agreements. See the NOTICE file
3// distributed with this work for additional information
4// regarding copyright ownership. The ASF licenses this file
5// to you under the Apache License, Version 2.0 (the
6// "License"); you may not use this file except in compliance
7// with the License. You may obtain a copy of the License at
8//
9// http://www.apache.org/licenses/LICENSE-2.0
10//
11// Unless required by applicable law or agreed to in writing,
12// software distributed under the License is distributed on an
13// "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
14// KIND, either express or implied. See the License for the
15// specific language governing permissions and limitations
16// under the License.
17
18//! [`VariantArrayBuilder`] implementation
19
20use crate::VariantArray;
21use arrow::array::{ArrayRef, BinaryViewArray, BinaryViewBuilder, NullBufferBuilder, StructArray};
22use arrow_schema::{DataType, Field, Fields};
23use parquet_variant::{ListBuilder, ObjectBuilder, Variant, VariantBuilder, VariantBuilderExt};
24use std::sync::Arc;
25
/// A builder for [`VariantArray`]
///
/// Rows are appended one at a time: a complete [`Variant`] via
/// [`Self::append_variant`], a null row via [`Self::append_null`], or an
/// incrementally constructed value via [`Self::variant_builder`]. Call
/// [`Self::build`] to produce the final `VariantArray`.
///
/// This builder always creates a `VariantArray` using [`BinaryViewArray`] for both
/// the metadata and value fields.
///
/// # TODO
/// 1. Support shredding: <https://github.com/apache/arrow-rs/issues/7895>
///
/// ## Example:
/// ```
/// # use arrow::array::Array;
/// # use parquet_variant::{Variant, VariantBuilder, VariantBuilderExt};
/// # use parquet_variant_compute::VariantArrayBuilder;
/// // Create a new VariantArrayBuilder with a capacity of 100 rows
/// let mut builder = VariantArrayBuilder::new(100);
/// // append variant values
/// builder.append_variant(Variant::from(42));
/// // append a null row (note not a Variant::Null)
/// builder.append_null();
/// // append an object to the builder
/// let mut vb = builder.variant_builder();
/// vb.new_object()
///   .with_field("foo", "bar")
///   .finish()
///   .unwrap();
/// vb.finish(); // must call finish to write the variant to the buffers
///
/// // create the final VariantArray
/// let variant_array = builder.build();
/// assert_eq!(variant_array.len(), 3);
/// // Access the values
/// // row 0 is not null and is an integer
/// assert!(!variant_array.is_null(0));
/// assert_eq!(variant_array.value(0), Variant::from(42i32));
/// // row 1 is null
/// assert!(variant_array.is_null(1));
/// // row 2 is not null and is an object
/// assert!(!variant_array.is_null(2));
/// let value = variant_array.value(2);
/// let obj = value.as_object().expect("expected object");
/// assert_eq!(obj.get("foo"), Some(Variant::from("bar")));
/// ```
#[derive(Debug)]
pub struct VariantArrayBuilder {
    /// Validity (null / non-null) of every row appended so far
    nulls: NullBufferBuilder,
    /// Single contiguous buffer holding the metadata bytes of all rows
    metadata_buffer: Vec<u8>,
    /// `(offset, len)` pairs locating each row's metadata in
    /// `metadata_buffer`; one entry is pushed per appended row (null rows
    /// get a zero-length entry)
    metadata_locations: Vec<(usize, usize)>,
    /// Single contiguous buffer holding the value bytes of all rows
    value_buffer: Vec<u8>,
    /// `(offset, len)` pairs locating each row's value in `value_buffer`;
    /// one entry is pushed per appended row (null rows get a zero-length
    /// entry)
    value_locations: Vec<(usize, usize)>,
    /// The fields of the final `StructArray`
    ///
    /// TODO: 1) Add extension type metadata
    /// TODO: 2) Add support for shredding
    fields: Fields,
}
89
90impl VariantArrayBuilder {
91 pub fn new(row_capacity: usize) -> Self {
92 // The subfields are expected to be non-nullable according to the parquet variant spec.
93 let metadata_field = Field::new("metadata", DataType::BinaryView, false);
94 let value_field = Field::new("value", DataType::BinaryView, false);
95
96 Self {
97 nulls: NullBufferBuilder::new(row_capacity),
98 metadata_buffer: Vec::new(), // todo allocation capacity
99 metadata_locations: Vec::with_capacity(row_capacity),
100 value_buffer: Vec::new(),
101 value_locations: Vec::with_capacity(row_capacity),
102 fields: Fields::from(vec![metadata_field, value_field]),
103 }
104 }
105
106 /// Build the final builder
107 pub fn build(self) -> VariantArray {
108 let Self {
109 mut nulls,
110 metadata_buffer,
111 metadata_locations,
112 value_buffer,
113 value_locations,
114 fields,
115 } = self;
116
117 let metadata_array = binary_view_array_from_buffers(metadata_buffer, metadata_locations);
118
119 let value_array = binary_view_array_from_buffers(value_buffer, value_locations);
120
121 // The build the final struct array
122 let inner = StructArray::new(
123 fields,
124 vec![
125 Arc::new(metadata_array) as ArrayRef,
126 Arc::new(value_array) as ArrayRef,
127 ],
128 nulls.finish(),
129 );
130 // TODO add arrow extension type metadata
131
132 VariantArray::try_new(Arc::new(inner)).expect("valid VariantArray by construction")
133 }
134
135 /// Appends a null row to the builder.
136 pub fn append_null(&mut self) {
137 self.nulls.append_null();
138 // The subfields are expected to be non-nullable according to the parquet variant spec.
139 let metadata_offset = self.metadata_buffer.len();
140 let metadata_length = 0;
141 self.metadata_locations
142 .push((metadata_offset, metadata_length));
143 let value_offset = self.value_buffer.len();
144 let value_length = 0;
145 self.value_locations.push((value_offset, value_length));
146 }
147
148 /// Append the [`Variant`] to the builder as the next row
149 pub fn append_variant(&mut self, variant: Variant) {
150 let mut direct_builder = self.variant_builder();
151 direct_builder.variant_builder.append_value(variant);
152 direct_builder.finish()
153 }
154
155 /// Return a `VariantArrayVariantBuilder` that writes directly to the
156 /// buffers of this builder.
157 ///
158 /// You must call [`VariantArrayVariantBuilder::finish`] to complete the builder
159 ///
160 /// # Example
161 /// ```
162 /// # use parquet_variant::{Variant, VariantBuilder, VariantBuilderExt};
163 /// # use parquet_variant_compute::{VariantArray, VariantArrayBuilder};
164 /// let mut array_builder = VariantArrayBuilder::new(10);
165 ///
166 /// // First row has a string
167 /// let mut variant_builder = array_builder.variant_builder();
168 /// variant_builder.append_value("Hello, World!");
169 /// // must call finish to write the variant to the buffers
170 /// variant_builder.finish();
171 ///
172 /// // Second row is an object
173 /// let mut variant_builder = array_builder.variant_builder();
174 /// variant_builder
175 /// .new_object()
176 /// .with_field("my_field", 42i64)
177 /// .finish()
178 /// .unwrap();
179 /// variant_builder.finish();
180 ///
181 /// // finalize the array
182 /// let variant_array: VariantArray = array_builder.build();
183 ///
184 /// // verify what we wrote is still there
185 /// assert_eq!(variant_array.value(0), Variant::from("Hello, World!"));
186 /// assert!(variant_array.value(1).as_object().is_some());
187 /// ```
188 pub fn variant_builder(&mut self) -> VariantArrayVariantBuilder<'_> {
189 // append directly into the metadata and value buffers
190 let metadata_buffer = std::mem::take(&mut self.metadata_buffer);
191 let value_buffer = std::mem::take(&mut self.value_buffer);
192 VariantArrayVariantBuilder::new(self, metadata_buffer, value_buffer)
193 }
194}
195
/// A `VariantBuilderExt` that writes directly to the buffers of a `VariantArrayBuilder`.
///
/// This struct implements [`VariantBuilderExt`], so in most cases it can be used as a
/// [`VariantBuilder`] to perform variant-related operations for [`VariantArrayBuilder`].
///
/// If [`Self::finish`] is not called, any changes will be rolled back
///
/// See [`VariantArrayBuilder::variant_builder`] for an example
pub struct VariantArrayVariantBuilder<'a> {
    /// Was [`Self::finish`] called? Checked on drop to decide whether to
    /// roll back.
    finished: bool,
    /// Length of the `metadata` buffer when this builder was created, i.e.
    /// the offset at which this row's metadata starts
    metadata_offset: usize,
    /// Length of the `value` buffer when this builder was created, i.e. the
    /// offset at which this row's value starts
    value_offset: usize,
    /// Parent array builder that this variant builder writes to. Buffers
    /// have been moved into the variant builder, and must be returned on
    /// drop
    array_builder: &'a mut VariantArrayBuilder,
    /// Builder for the in progress variant value, temporarily owns the buffers
    /// from `array_builder`
    variant_builder: VariantBuilder,
}
219
220impl<'a> VariantBuilderExt for VariantArrayVariantBuilder<'a> {
221 fn append_value<'m, 'v>(&mut self, value: impl Into<Variant<'m, 'v>>) {
222 self.variant_builder.append_value(value);
223 }
224
225 fn new_list(&mut self) -> ListBuilder<'_> {
226 self.variant_builder.new_list()
227 }
228
229 fn new_object(&mut self) -> ObjectBuilder<'_> {
230 self.variant_builder.new_object()
231 }
232}
233
234impl<'a> VariantArrayVariantBuilder<'a> {
235 /// Constructs a new VariantArrayVariantBuilder
236 ///
237 /// Note this is not public as this is a structure that is logically
238 /// part of the [`VariantArrayBuilder`] and relies on its internal structure
239 fn new(
240 array_builder: &'a mut VariantArrayBuilder,
241 metadata_buffer: Vec<u8>,
242 value_buffer: Vec<u8>,
243 ) -> Self {
244 let metadata_offset = metadata_buffer.len();
245 let value_offset = value_buffer.len();
246 VariantArrayVariantBuilder {
247 finished: false,
248 metadata_offset,
249 value_offset,
250 variant_builder: VariantBuilder::new_with_buffers(metadata_buffer, value_buffer),
251 array_builder,
252 }
253 }
254
255 /// Return a reference to the underlying `VariantBuilder`
256 pub fn inner(&self) -> &VariantBuilder {
257 &self.variant_builder
258 }
259
260 /// Return a mutable reference to the underlying `VariantBuilder`
261 pub fn inner_mut(&mut self) -> &mut VariantBuilder {
262 &mut self.variant_builder
263 }
264
265 /// Called to finish the in progress variant and write it to the underlying
266 /// buffers
267 ///
268 /// Note if you do not call finish, on drop any changes made to the
269 /// underlying buffers will be rolled back.
270 pub fn finish(mut self) {
271 self.finished = true;
272
273 let metadata_offset = self.metadata_offset;
274 let value_offset = self.value_offset;
275 // get the buffers back from the variant builder
276 let (metadata_buffer, value_buffer) = std::mem::take(&mut self.variant_builder).finish();
277
278 // Sanity Check: if the buffers got smaller, something went wrong (previous data was lost)
279 let metadata_len = metadata_buffer
280 .len()
281 .checked_sub(metadata_offset)
282 .expect("metadata length decreased unexpectedly");
283 let value_len = value_buffer
284 .len()
285 .checked_sub(value_offset)
286 .expect("value length decreased unexpectedly");
287
288 // commit the changes by putting the
289 // offsets and lengths into the parent array builder.
290 self.array_builder
291 .metadata_locations
292 .push((metadata_offset, metadata_len));
293 self.array_builder
294 .value_locations
295 .push((value_offset, value_len));
296 self.array_builder.nulls.append_non_null();
297 // put the buffers back into the array builder
298 self.array_builder.metadata_buffer = metadata_buffer;
299 self.array_builder.value_buffer = value_buffer;
300 }
301}
302
303impl<'a> Drop for VariantArrayVariantBuilder<'a> {
304 /// If the builder was not finished, roll back any changes made to the
305 /// underlying buffers (by truncating them)
306 fn drop(&mut self) {
307 if self.finished {
308 return;
309 }
310
311 // if the object was not finished, need to rollback any changes by
312 // truncating the buffers to the original offsets
313 let metadata_offset = self.metadata_offset;
314 let value_offset = self.value_offset;
315
316 // get the buffers back from the variant builder
317 let (mut metadata_buffer, mut value_buffer) =
318 std::mem::take(&mut self.variant_builder).into_buffers();
319
320 // Sanity Check: if the buffers got smaller, something went wrong (previous data was lost) so panic immediately
321 metadata_buffer
322 .len()
323 .checked_sub(metadata_offset)
324 .expect("metadata length decreased unexpectedly");
325 value_buffer
326 .len()
327 .checked_sub(value_offset)
328 .expect("value length decreased unexpectedly");
329
330 // Note this truncate is fast because truncate doesn't free any memory:
331 // it just has to drop elements (and u8 doesn't have a destructor)
332 metadata_buffer.truncate(metadata_offset);
333 value_buffer.truncate(value_offset);
334
335 // put the buffers back into the array builder
336 self.array_builder.metadata_buffer = metadata_buffer;
337 self.array_builder.value_buffer = value_buffer;
338 }
339}
340
341fn binary_view_array_from_buffers(
342 buffer: Vec<u8>,
343 locations: Vec<(usize, usize)>,
344) -> BinaryViewArray {
345 let mut builder = BinaryViewBuilder::with_capacity(locations.len());
346 let block = builder.append_block(buffer.into());
347 // TODO this can be much faster if it creates the views directly during append
348 for (offset, length) in locations {
349 let offset = offset.try_into().expect("offset should fit in u32");
350 let length = length.try_into().expect("length should fit in u32");
351 builder
352 .try_append_view(block, offset, length)
353 .expect("Failed to append view");
354 }
355 builder.finish()
356}
357
#[cfg(test)]
mod test {
    use super::*;
    use arrow::array::Array;

    /// The metadata and value children must never carry nulls, even when the
    /// array contains null rows
    #[test]
    fn test_variant_array_builder_non_nullable() {
        let mut array_builder = VariantArrayBuilder::new(10);
        array_builder.append_null(); // should not panic
        array_builder.append_variant(Variant::from(42i32));
        let array = array_builder.build();

        assert_eq!(array.len(), 2);
        assert!(array.is_null(0));
        assert!(!array.is_null(1));
        assert_eq!(array.value(1), Variant::from(42i32));

        // the metadata and value fields of non shredded variants should not be null
        assert!(array.metadata_field().nulls().is_none());
        assert!(array.value_field().unwrap().nulls().is_none());
        match array.data_type() {
            DataType::Struct(fields) => {
                for field in fields {
                    assert!(
                        !field.is_nullable(),
                        "Field {} should be non-nullable",
                        field.name()
                    );
                }
            }
            _ => panic!("Expected VariantArray to have Struct data type"),
        }
    }

    /// Rows written through sub builders land in the array in append order
    #[test]
    fn test_variant_array_builder_variant_builder() {
        let mut array_builder = VariantArrayBuilder::new(10);
        array_builder.append_null(); // should not panic
        array_builder.append_variant(Variant::from(42i32));

        // third row: an object written through a sub builder
        let mut object_row = array_builder.variant_builder();
        object_row
            .new_object()
            .with_field("foo", "bar")
            .finish()
            .unwrap();
        object_row.finish(); // must call finish to write the variant to the buffers

        // fourth row: a list written through a sub builder
        let mut list_row = array_builder.variant_builder();
        list_row
            .new_list()
            .with_value(Variant::from(1i32))
            .with_value(Variant::from(2i32))
            .finish();
        list_row.finish();
        let array = array_builder.build();

        assert_eq!(array.len(), 4);
        assert!(array.is_null(0));

        assert!(!array.is_null(1));
        assert_eq!(array.value(1), Variant::from(42i32));

        assert!(!array.is_null(2));
        let third = array.value(2);
        let object = third.as_object().expect("variant to be an object");
        assert_eq!(object.get("foo").unwrap(), Variant::from("bar"));

        assert!(!array.is_null(3));
        let fourth = array.value(3);
        let list = fourth.as_list().expect("variant to be a list");
        assert_eq!(list.len(), 2);
    }

    /// A sub builder dropped without `finish` must leave no trace in the array
    #[test]
    fn test_variant_array_builder_variant_builder_reset() {
        let mut array_builder = VariantArrayBuilder::new(10);

        // first row: a finished object
        let mut first_row = array_builder.variant_builder();
        first_row
            .new_object()
            .with_field("foo", 1i32)
            .finish()
            .unwrap();
        first_row.finish(); // must call finish to write the variant to the buffers

        // start a second object, but let the sub builder go out of scope
        // without finishing it
        {
            let mut abandoned_row = array_builder.variant_builder();
            abandoned_row
                .new_object()
                .with_field("bar", 2i32)
                .finish()
                .unwrap();
        }

        // a third object; the abandoned one above must have been rolled back
        let mut second_row = array_builder.variant_builder();
        second_row
            .new_object()
            .with_field("baz", 3i32)
            .finish()
            .unwrap();
        second_row.finish(); // must call finish to write the variant to the buffers

        let array = array_builder.build();

        // only the two finished objects should be present
        assert_eq!(array.len(), 2);

        assert!(!array.is_null(0));
        let first = array.value(0);
        let first = first.as_object().expect("variant to be an object");
        assert_eq!(first.get("foo").unwrap(), Variant::from(1i32));

        assert!(!array.is_null(1));
        let second = array.value(1);
        let second = second.as_object().expect("variant to be an object");
        assert_eq!(second.get("baz").unwrap(), Variant::from(3i32));
    }
}