Skip to main content

arrow_data/transform/
mod.rs

1// Licensed to the Apache Software Foundation (ASF) under one
2// or more contributor license agreements.  See the NOTICE file
3// distributed with this work for additional information
4// regarding copyright ownership.  The ASF licenses this file
5// to you under the Apache License, Version 2.0 (the
6// "License"); you may not use this file except in compliance
7// with the License.  You may obtain a copy of the License at
8//
9//   http://www.apache.org/licenses/LICENSE-2.0
10//
11// Unless required by applicable law or agreed to in writing,
12// software distributed under the License is distributed on an
13// "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
14// KIND, either express or implied.  See the License for the
15// specific language governing permissions and limitations
16// under the License.
17
18//! Low-level array data abstractions.
19//!
20//! Provides utilities for creating, manipulating, and converting Arrow arrays
21//! made of primitive types, strings, and nested types.
22
23use super::{ArrayData, ArrayDataBuilder, ByteView, data::new_buffers};
24use crate::bit_mask::set_bits;
25use arrow_buffer::buffer::{BooleanBuffer, NullBuffer};
26use arrow_buffer::{ArrowNativeType, Buffer, IntervalMonthDayNano, MutableBuffer, bit_util, i256};
27use arrow_schema::{ArrowError, DataType, IntervalUnit, UnionMode};
28use half::f16;
29use num_integer::Integer;
30use std::mem;
31
32mod boolean;
33mod fixed_binary;
34mod fixed_size_list;
35mod list;
36mod list_view;
37mod null;
38mod primitive;
39mod run;
40mod structure;
41mod union;
42mod utils;
43mod variable_size;
44
45type ExtendNullBits<'a> = Box<dyn Fn(&mut _MutableArrayData, usize, usize) + 'a>;
46// function that extends `[start..start+len]` to the mutable array.
47// this is dynamic because different data_types influence how buffers and children are extended.
48type Extend<'a> =
49    Box<dyn Fn(&mut _MutableArrayData, usize, usize, usize) -> Result<(), ArrowError> + 'a>;
50
51type ExtendNulls = Box<dyn Fn(&mut _MutableArrayData, usize) -> Result<(), ArrowError>>;
52
53/// A mutable [ArrayData] that knows how to freeze itself into an [ArrayData].
54/// This is just a data container.
55#[derive(Debug)]
56struct _MutableArrayData<'a> {
57    pub data_type: DataType,
58    pub null_count: usize,
59
60    pub len: usize,
61    pub null_buffer: Option<MutableBuffer>,
62
63    // arrow specification only allows up to 3 buffers (2 ignoring the nulls above).
64    // Thus, we place them in the stack to avoid bound checks and greater data locality.
65    pub buffer1: MutableBuffer,
66    pub buffer2: MutableBuffer,
67    pub child_data: Vec<MutableArrayData<'a>>,
68}
69
70impl _MutableArrayData<'_> {
71    fn null_buffer(&mut self) -> &mut MutableBuffer {
72        self.null_buffer
73            .as_mut()
74            .expect("MutableArrayData not nullable")
75    }
76}
77
78fn build_extend_null_bits(array: &ArrayData, use_nulls: bool) -> ExtendNullBits<'_> {
79    if let Some(nulls) = array.nulls() {
80        let bytes = nulls.validity();
81        Box::new(move |mutable, start, len| {
82            let mutable_len = mutable.len;
83            let out = mutable.null_buffer();
84            utils::resize_for_bits(out, mutable_len + len);
85            mutable.null_count += set_bits(
86                out.as_slice_mut(),
87                bytes,
88                mutable_len,
89                nulls.offset() + start,
90                len,
91            );
92        })
93    } else if use_nulls {
94        Box::new(|mutable, _, len| {
95            let mutable_len = mutable.len;
96            let out = mutable.null_buffer();
97            utils::resize_for_bits(out, mutable_len + len);
98            let write_data = out.as_slice_mut();
99            (0..len).for_each(|i| {
100                bit_util::set_bit(write_data, mutable_len + i);
101            });
102        })
103    } else {
104        Box::new(|_, _, _| {})
105    }
106}
107
108/// Efficiently create an [ArrayData] from one or more existing [ArrayData]s by
109/// copying chunks.
110///
111/// The main use case of this struct is to perform unary operations to arrays of
112/// arbitrary types, such as `filter` and `take`.
113///
114/// # Example
115/// ```
116/// use arrow_buffer::Buffer;
117/// use arrow_data::ArrayData;
118/// use arrow_data::transform::MutableArrayData;
119/// use arrow_schema::DataType;
120/// fn i32_array(values: &[i32]) -> ArrayData {
121///   ArrayData::try_new(DataType::Int32, 5, None, 0, vec![Buffer::from_slice_ref(values)], vec![]).unwrap()
122/// }
123/// let arr1  = i32_array(&[1, 2, 3, 4, 5]);
124/// let arr2  = i32_array(&[6, 7, 8, 9, 10]);
125/// // Create a mutable array for copying values from arr1 and arr2, with a capacity for 6 elements
126/// let capacity = 3 * std::mem::size_of::<i32>();
127/// let mut mutable = MutableArrayData::new(vec![&arr1, &arr2], false, 10);
128/// // Copy the first 3 elements from arr1
129/// mutable.extend(0, 0, 3);
130/// // Copy the last 3 elements from arr2
131/// mutable.extend(1, 2, 4);
132/// // Complete the MutableArrayData into a new ArrayData
133/// let frozen = mutable.freeze();
134/// assert_eq!(frozen, i32_array(&[1, 2, 3, 8, 9, 10]));
135/// ```
136pub struct MutableArrayData<'a> {
137    /// Input arrays: the data being read FROM.
138    ///
139    /// Note this is "dead code" because all actual references to the arrays are
140    /// stored in closures for extending values and nulls.
141    #[allow(dead_code)]
142    arrays: Vec<&'a ArrayData>,
143
144    /// In progress output array: The data being written TO
145    ///
146    /// Note these fields are in a separate struct, [_MutableArrayData], as they
147    /// cannot be in [MutableArrayData] itself due to mutability invariants (interior
148    /// mutability): [MutableArrayData] contains a function that can only mutate
149    /// [_MutableArrayData], not [MutableArrayData] itself
150    data: _MutableArrayData<'a>,
151
152    /// The child data of the `Array` in Dictionary arrays.
153    ///
154    /// This is not stored in `_MutableArrayData` because these values are
155    /// constant and only needed at the end, when freezing [_MutableArrayData].
156    dictionary: Option<ArrayData>,
157
158    /// Variadic data buffers referenced by views.
159    ///
160    /// Note this this is not stored in `_MutableArrayData` because these values
161    /// are constant and only needed at the end, when freezing
162    /// [_MutableArrayData]
163    variadic_data_buffers: Vec<Buffer>,
164
165    /// function used to extend output array with values from input arrays.
166    ///
167    /// This function's lifetime is bound to the input arrays because it reads
168    /// values from them.
169    extend_values: Vec<Extend<'a>>,
170
171    /// function used to extend the output array with nulls from input arrays.
172    ///
173    /// This function's lifetime is bound to the input arrays because it reads
174    /// nulls from it.
175    extend_null_bits: Vec<ExtendNullBits<'a>>,
176
177    /// function used to extend the output array with null elements.
178    ///
179    /// This function is independent of the arrays and therefore has no lifetime.
180    extend_nulls: ExtendNulls,
181}
182
183impl std::fmt::Debug for MutableArrayData<'_> {
184    fn fmt(&self, f: &mut std::fmt::Formatter) -> std::fmt::Result {
185        // ignores the closures.
186        f.debug_struct("MutableArrayData")
187            .field("data", &self.data)
188            .finish()
189    }
190}
191
192/// Builds an extend that adds `offset` to the source primitive
193/// Additionally validates that `max` fits into the
194/// the underlying primitive returning None if not
195fn build_extend_dictionary(array: &ArrayData, offset: usize, max: usize) -> Option<Extend<'_>> {
196    macro_rules! validate_and_build {
197        ($dt: ty) => {{
198            let _: $dt = max.try_into().ok()?;
199            let offset: $dt = offset.try_into().ok()?;
200            Some(primitive::build_extend_with_offset(array, offset))
201        }};
202    }
203    match array.data_type() {
204        DataType::Dictionary(child_data_type, _) => match child_data_type.as_ref() {
205            DataType::UInt8 => validate_and_build!(u8),
206            DataType::UInt16 => validate_and_build!(u16),
207            DataType::UInt32 => validate_and_build!(u32),
208            DataType::UInt64 => validate_and_build!(u64),
209            DataType::Int8 => validate_and_build!(i8),
210            DataType::Int16 => validate_and_build!(i16),
211            DataType::Int32 => validate_and_build!(i32),
212            DataType::Int64 => validate_and_build!(i64),
213            _ => unreachable!(),
214        },
215        _ => None,
216    }
217}
218
219/// Builds an extend that adds `buffer_offset` to any buffer indices encountered
220fn build_extend_view(array: &ArrayData, buffer_offset: u32) -> Extend<'_> {
221    let views = array.buffer::<u128>(0);
222    Box::new(
223        move |mutable: &mut _MutableArrayData, _, start: usize, len: usize| {
224            mutable
225                .buffer1
226                .extend(views[start..start + len].iter().map(|v| {
227                    let len = *v as u32;
228                    if len <= 12 {
229                        return *v; // Stored inline
230                    }
231                    let mut view = ByteView::from(*v);
232                    view.buffer_index += buffer_offset;
233                    view.into()
234                }));
235            Ok(())
236        },
237    )
238}
239
240fn build_extend(array: &ArrayData) -> Extend<'_> {
241    match array.data_type() {
242        DataType::Null => null::build_extend(array),
243        DataType::Boolean => boolean::build_extend(array),
244        DataType::UInt8 => primitive::build_extend::<u8>(array),
245        DataType::UInt16 => primitive::build_extend::<u16>(array),
246        DataType::UInt32 => primitive::build_extend::<u32>(array),
247        DataType::UInt64 => primitive::build_extend::<u64>(array),
248        DataType::Int8 => primitive::build_extend::<i8>(array),
249        DataType::Int16 => primitive::build_extend::<i16>(array),
250        DataType::Int32 => primitive::build_extend::<i32>(array),
251        DataType::Int64 => primitive::build_extend::<i64>(array),
252        DataType::Float32 => primitive::build_extend::<f32>(array),
253        DataType::Float64 => primitive::build_extend::<f64>(array),
254        DataType::Date32 | DataType::Time32(_) | DataType::Interval(IntervalUnit::YearMonth) => {
255            primitive::build_extend::<i32>(array)
256        }
257        DataType::Date64
258        | DataType::Time64(_)
259        | DataType::Timestamp(_, _)
260        | DataType::Duration(_)
261        | DataType::Interval(IntervalUnit::DayTime) => primitive::build_extend::<i64>(array),
262        DataType::Interval(IntervalUnit::MonthDayNano) => {
263            primitive::build_extend::<IntervalMonthDayNano>(array)
264        }
265        DataType::Decimal32(_, _) => primitive::build_extend::<i32>(array),
266        DataType::Decimal64(_, _) => primitive::build_extend::<i64>(array),
267        DataType::Decimal128(_, _) => primitive::build_extend::<i128>(array),
268        DataType::Decimal256(_, _) => primitive::build_extend::<i256>(array),
269        DataType::Utf8 | DataType::Binary => variable_size::build_extend::<i32>(array),
270        DataType::LargeUtf8 | DataType::LargeBinary => variable_size::build_extend::<i64>(array),
271        DataType::BinaryView | DataType::Utf8View => unreachable!("should use build_extend_view"),
272        DataType::Map(_, _) | DataType::List(_) => list::build_extend::<i32>(array),
273        DataType::LargeList(_) => list::build_extend::<i64>(array),
274        DataType::ListView(_) => list_view::build_extend::<i32>(array),
275        DataType::LargeListView(_) => list_view::build_extend::<i64>(array),
276        DataType::Dictionary(_, _) => unreachable!("should use build_extend_dictionary"),
277        DataType::Struct(_) => structure::build_extend(array),
278        DataType::FixedSizeBinary(_) => fixed_binary::build_extend(array),
279        DataType::Float16 => primitive::build_extend::<f16>(array),
280        DataType::FixedSizeList(_, _) => fixed_size_list::build_extend(array),
281        DataType::Union(_, mode) => match mode {
282            UnionMode::Sparse => union::build_extend_sparse(array),
283            UnionMode::Dense => union::build_extend_dense(array),
284        },
285        DataType::RunEndEncoded(_, _) => run::build_extend(array),
286    }
287}
288
289fn build_extend_nulls(data_type: &DataType) -> ExtendNulls {
290    Box::new(match data_type {
291        DataType::Null => null::extend_nulls,
292        DataType::Boolean => boolean::extend_nulls,
293        DataType::UInt8 => primitive::extend_nulls::<u8>,
294        DataType::UInt16 => primitive::extend_nulls::<u16>,
295        DataType::UInt32 => primitive::extend_nulls::<u32>,
296        DataType::UInt64 => primitive::extend_nulls::<u64>,
297        DataType::Int8 => primitive::extend_nulls::<i8>,
298        DataType::Int16 => primitive::extend_nulls::<i16>,
299        DataType::Int32 => primitive::extend_nulls::<i32>,
300        DataType::Int64 => primitive::extend_nulls::<i64>,
301        DataType::Float32 => primitive::extend_nulls::<f32>,
302        DataType::Float64 => primitive::extend_nulls::<f64>,
303        DataType::Date32 | DataType::Time32(_) | DataType::Interval(IntervalUnit::YearMonth) => {
304            primitive::extend_nulls::<i32>
305        }
306        DataType::Date64
307        | DataType::Time64(_)
308        | DataType::Timestamp(_, _)
309        | DataType::Duration(_)
310        | DataType::Interval(IntervalUnit::DayTime) => primitive::extend_nulls::<i64>,
311        DataType::Interval(IntervalUnit::MonthDayNano) => {
312            primitive::extend_nulls::<IntervalMonthDayNano>
313        }
314        DataType::Decimal32(_, _) => primitive::extend_nulls::<i32>,
315        DataType::Decimal64(_, _) => primitive::extend_nulls::<i64>,
316        DataType::Decimal128(_, _) => primitive::extend_nulls::<i128>,
317        DataType::Decimal256(_, _) => primitive::extend_nulls::<i256>,
318        DataType::Utf8 | DataType::Binary => variable_size::extend_nulls::<i32>,
319        DataType::LargeUtf8 | DataType::LargeBinary => variable_size::extend_nulls::<i64>,
320        DataType::BinaryView | DataType::Utf8View => primitive::extend_nulls::<u128>,
321        DataType::Map(_, _) | DataType::List(_) => list::extend_nulls::<i32>,
322        DataType::LargeList(_) => list::extend_nulls::<i64>,
323        DataType::ListView(_) => list_view::extend_nulls::<i32>,
324        DataType::LargeListView(_) => list_view::extend_nulls::<i64>,
325        DataType::Dictionary(child_data_type, _) => match child_data_type.as_ref() {
326            DataType::UInt8 => primitive::extend_nulls::<u8>,
327            DataType::UInt16 => primitive::extend_nulls::<u16>,
328            DataType::UInt32 => primitive::extend_nulls::<u32>,
329            DataType::UInt64 => primitive::extend_nulls::<u64>,
330            DataType::Int8 => primitive::extend_nulls::<i8>,
331            DataType::Int16 => primitive::extend_nulls::<i16>,
332            DataType::Int32 => primitive::extend_nulls::<i32>,
333            DataType::Int64 => primitive::extend_nulls::<i64>,
334            _ => unreachable!(),
335        },
336        DataType::Struct(_) => structure::extend_nulls,
337        DataType::FixedSizeBinary(_) => fixed_binary::extend_nulls,
338        DataType::Float16 => primitive::extend_nulls::<f16>,
339        DataType::FixedSizeList(_, _) => fixed_size_list::extend_nulls,
340        DataType::Union(_, mode) => match mode {
341            UnionMode::Sparse => union::extend_nulls_sparse,
342            UnionMode::Dense => union::extend_nulls_dense,
343        },
344        DataType::RunEndEncoded(_, _) => run::extend_nulls,
345    })
346}
347
348fn preallocate_offset_and_binary_buffer<Offset: ArrowNativeType + Integer>(
349    capacity: usize,
350    binary_size: usize,
351) -> [MutableBuffer; 2] {
352    // offsets
353    let mut buffer = MutableBuffer::new((1 + capacity) * mem::size_of::<Offset>());
354    // safety: `unsafe` code assumes that this buffer is initialized with one element
355    buffer.push(Offset::zero());
356
357    [
358        buffer,
359        MutableBuffer::new(binary_size * mem::size_of::<u8>()),
360    ]
361}
362
/// Define capacities to pre-allocate for child data or data buffers.
///
/// Passed to [MutableArrayData::with_capacities].
#[derive(Debug, Clone)]
pub enum Capacities {
    /// Binary, LargeBinary, Utf8 and LargeUtf8 data types
    ///
    /// Defines
    /// * the capacity of the array offsets (number of slots)
    /// * the capacity of the binary/str value buffer, in bytes
    Binary(usize, Option<usize>),
    /// List-like data types: List, LargeList, ListView, LargeListView and
    /// FixedSizeList
    ///
    /// Defines
    /// * the capacity of the array offsets (number of slots)
    /// * the capacity of the child data; when `None`, the child capacity is
    ///   derived from the number of slots
    List(usize, Option<Box<Capacities>>),
    /// Struct type
    ///
    /// Defines
    /// * the capacity of the array (number of slots)
    /// * the capacities of the fields, one per field in field order; when
    ///   `None`, every field uses the capacity of the array
    Struct(usize, Option<Vec<Capacities>>),
    /// Dictionary type
    ///
    /// Defines
    /// * the capacity of the array/keys
    /// * the capacity of the values
    Dictionary(usize, Option<Box<Capacities>>),
    /// Only the number of slots of the array. Child arrays, where present, get
    /// a capacity derived from it instead of a separately specified one.
    Array(usize),
}
393
394impl<'a> MutableArrayData<'a> {
395    /// Returns a new [MutableArrayData] with capacity to `capacity` slots and
396    /// specialized to create an [ArrayData] from multiple `arrays`.
397    ///
398    /// # Arguments
399    /// * `arrays` - the source arrays to copy from
400    /// * `use_nulls` - a flag used to optimize insertions
401    ///   - `false` if the only source of nulls are the arrays themselves
402    ///   - `true` if the user plans to call [MutableArrayData::extend_nulls].
403    /// * capacity - the preallocated capacity of the output array, in bytes
404    ///
405    /// Thus, if `use_nulls` is `false`, calling
406    /// [MutableArrayData::extend_nulls] should not be used.
407    pub fn new(arrays: Vec<&'a ArrayData>, use_nulls: bool, capacity: usize) -> Self {
408        Self::with_capacities(arrays, use_nulls, Capacities::Array(capacity))
409    }
410
411    /// Similar to [MutableArrayData::new], but lets users define the
412    /// preallocated capacities of the array with more granularity.
413    ///
414    /// See [MutableArrayData::new] for more information on the arguments.
415    ///
416    /// # Panics
417    ///
418    /// This function panics if the given `capacities` don't match the data type
419    /// of `arrays`. Or when a [Capacities] variant is not yet supported.
420    pub fn with_capacities(
421        arrays: Vec<&'a ArrayData>,
422        use_nulls: bool,
423        capacities: Capacities,
424    ) -> Self {
425        let data_type = arrays[0].data_type();
426
427        for a in arrays.iter().skip(1) {
428            assert_eq!(
429                data_type,
430                a.data_type(),
431                "Arrays with inconsistent types passed to MutableArrayData"
432            )
433        }
434
435        // if any of the arrays has nulls, insertions from any array requires setting bits
436        // as there is at least one array with nulls.
437        let use_nulls = use_nulls | arrays.iter().any(|array| array.null_count() > 0);
438
439        let mut array_capacity;
440
441        let [buffer1, buffer2] = match (data_type, &capacities) {
442            (
443                DataType::LargeUtf8 | DataType::LargeBinary,
444                Capacities::Binary(capacity, Some(value_cap)),
445            ) => {
446                array_capacity = *capacity;
447                preallocate_offset_and_binary_buffer::<i64>(*capacity, *value_cap)
448            }
449            (DataType::Utf8 | DataType::Binary, Capacities::Binary(capacity, Some(value_cap))) => {
450                array_capacity = *capacity;
451                preallocate_offset_and_binary_buffer::<i32>(*capacity, *value_cap)
452            }
453            (_, Capacities::Array(capacity)) => {
454                array_capacity = *capacity;
455                new_buffers(data_type, *capacity)
456            }
457            (
458                DataType::List(_)
459                | DataType::LargeList(_)
460                | DataType::ListView(_)
461                | DataType::LargeListView(_)
462                | DataType::FixedSizeList(_, _),
463                Capacities::List(capacity, _),
464            ) => {
465                array_capacity = *capacity;
466                new_buffers(data_type, *capacity)
467            }
468            _ => panic!("Capacities: {capacities:?} not yet supported"),
469        };
470
471        let child_data = match &data_type {
472            DataType::Decimal32(_, _)
473            | DataType::Decimal64(_, _)
474            | DataType::Decimal128(_, _)
475            | DataType::Decimal256(_, _)
476            | DataType::Null
477            | DataType::Boolean
478            | DataType::UInt8
479            | DataType::UInt16
480            | DataType::UInt32
481            | DataType::UInt64
482            | DataType::Int8
483            | DataType::Int16
484            | DataType::Int32
485            | DataType::Int64
486            | DataType::Float16
487            | DataType::Float32
488            | DataType::Float64
489            | DataType::Date32
490            | DataType::Date64
491            | DataType::Time32(_)
492            | DataType::Time64(_)
493            | DataType::Duration(_)
494            | DataType::Timestamp(_, _)
495            | DataType::Utf8
496            | DataType::Binary
497            | DataType::LargeUtf8
498            | DataType::LargeBinary
499            | DataType::BinaryView
500            | DataType::Utf8View
501            | DataType::Interval(_)
502            | DataType::FixedSizeBinary(_) => vec![],
503            DataType::Map(_, _)
504            | DataType::List(_)
505            | DataType::LargeList(_)
506            | DataType::ListView(_)
507            | DataType::LargeListView(_) => {
508                let children = arrays
509                    .iter()
510                    .map(|array| &array.child_data()[0])
511                    .collect::<Vec<_>>();
512
513                let capacities =
514                    if let Capacities::List(capacity, ref child_capacities) = capacities {
515                        child_capacities
516                            .clone()
517                            .map(|c| *c)
518                            .unwrap_or(Capacities::Array(capacity))
519                    } else {
520                        Capacities::Array(array_capacity)
521                    };
522
523                vec![MutableArrayData::with_capacities(
524                    children, use_nulls, capacities,
525                )]
526            }
527            // the dictionary type just appends keys and clones the values.
528            DataType::Dictionary(_, _) => vec![],
529            DataType::Struct(fields) => match capacities {
530                Capacities::Struct(capacity, Some(ref child_capacities)) => {
531                    array_capacity = capacity;
532                    (0..fields.len())
533                        .zip(child_capacities)
534                        .map(|(i, child_cap)| {
535                            let child_arrays = arrays
536                                .iter()
537                                .map(|array| &array.child_data()[i])
538                                .collect::<Vec<_>>();
539                            MutableArrayData::with_capacities(
540                                child_arrays,
541                                use_nulls,
542                                child_cap.clone(),
543                            )
544                        })
545                        .collect::<Vec<_>>()
546                }
547                Capacities::Struct(capacity, None) => {
548                    array_capacity = capacity;
549                    (0..fields.len())
550                        .map(|i| {
551                            let child_arrays = arrays
552                                .iter()
553                                .map(|array| &array.child_data()[i])
554                                .collect::<Vec<_>>();
555                            MutableArrayData::new(child_arrays, use_nulls, capacity)
556                        })
557                        .collect::<Vec<_>>()
558                }
559                _ => (0..fields.len())
560                    .map(|i| {
561                        let child_arrays = arrays
562                            .iter()
563                            .map(|array| &array.child_data()[i])
564                            .collect::<Vec<_>>();
565                        MutableArrayData::new(child_arrays, use_nulls, array_capacity)
566                    })
567                    .collect::<Vec<_>>(),
568            },
569            DataType::RunEndEncoded(_, _) => {
570                let run_ends_child = arrays
571                    .iter()
572                    .map(|array| &array.child_data()[0])
573                    .collect::<Vec<_>>();
574                let value_child = arrays
575                    .iter()
576                    .map(|array| &array.child_data()[1])
577                    .collect::<Vec<_>>();
578                vec![
579                    MutableArrayData::new(run_ends_child, false, array_capacity),
580                    MutableArrayData::new(value_child, use_nulls, array_capacity),
581                ]
582            }
583            DataType::FixedSizeList(_, size) => {
584                let children = arrays
585                    .iter()
586                    .map(|array| &array.child_data()[0])
587                    .collect::<Vec<_>>();
588                let capacities =
589                    if let Capacities::List(capacity, ref child_capacities) = capacities {
590                        child_capacities
591                            .clone()
592                            .map(|c| *c)
593                            .unwrap_or(Capacities::Array(capacity * *size as usize))
594                    } else {
595                        Capacities::Array(array_capacity * *size as usize)
596                    };
597                vec![MutableArrayData::with_capacities(
598                    children, use_nulls, capacities,
599                )]
600            }
601            DataType::Union(fields, _) => (0..fields.len())
602                .map(|i| {
603                    let child_arrays = arrays
604                        .iter()
605                        .map(|array| &array.child_data()[i])
606                        .collect::<Vec<_>>();
607                    MutableArrayData::new(child_arrays, use_nulls, array_capacity)
608                })
609                .collect::<Vec<_>>(),
610        };
611
612        // Get the dictionary if any, and if it is a concatenation of multiple
613        let (dictionary, dict_concat) = match &data_type {
614            DataType::Dictionary(_, _) => {
615                // If more than one dictionary, concatenate dictionaries together
616                let dict_concat = !arrays
617                    .windows(2)
618                    .all(|a| a[0].child_data()[0].ptr_eq(&a[1].child_data()[0]));
619
620                match dict_concat {
621                    false => (Some(arrays[0].child_data()[0].clone()), false),
622                    true => {
623                        if let Capacities::Dictionary(_, _) = capacities {
624                            panic!("dictionary capacity not yet supported")
625                        }
626                        let dictionaries: Vec<_> =
627                            arrays.iter().map(|array| &array.child_data()[0]).collect();
628                        let lengths: Vec<_> = dictionaries
629                            .iter()
630                            .map(|dictionary| dictionary.len())
631                            .collect();
632                        let capacity = lengths.iter().sum();
633
634                        let mut mutable = MutableArrayData::new(dictionaries, false, capacity);
635
636                        for (i, len) in lengths.iter().enumerate() {
637                            mutable.try_extend(i, 0, *len).expect(
638                                "extend failed while building dictionary; \
639                                 this is a bug in MutableArrayData",
640                            )
641                        }
642
643                        (Some(mutable.freeze()), true)
644                    }
645                }
646            }
647            _ => (None, false),
648        };
649
650        let variadic_data_buffers = match &data_type {
651            DataType::BinaryView | DataType::Utf8View => arrays
652                .iter()
653                .flat_map(|x| x.buffers().iter().skip(1))
654                .map(Buffer::clone)
655                .collect(),
656            _ => vec![],
657        };
658
659        let extend_nulls = build_extend_nulls(data_type);
660
661        let extend_null_bits = arrays
662            .iter()
663            .map(|array| build_extend_null_bits(array, use_nulls))
664            .collect();
665
666        let null_buffer = use_nulls.then(|| {
667            let null_bytes = bit_util::ceil(array_capacity, 8);
668            MutableBuffer::from_len_zeroed(null_bytes)
669        });
670
671        let extend_values = match &data_type {
672            DataType::Dictionary(_, _) => {
673                let mut next_offset = 0;
674                let extend_values: Result<Vec<_>, _> = arrays
675                    .iter()
676                    .map(|array| {
677                        let offset = next_offset;
678                        let dict_len = array.child_data()[0].len();
679
680                        if dict_concat {
681                            next_offset += dict_len;
682                        }
683
684                        build_extend_dictionary(array, offset, offset + dict_len)
685                            .ok_or(ArrowError::DictionaryKeyOverflowError)
686                    })
687                    .collect();
688
689                extend_values.expect("MutableArrayData::new is infallible")
690            }
691            DataType::BinaryView | DataType::Utf8View => {
692                let mut next_offset = 0u32;
693                arrays
694                    .iter()
695                    .map(|arr| {
696                        let num_data_buffers = (arr.buffers().len() - 1) as u32;
697                        let offset = next_offset;
698                        next_offset = next_offset
699                            .checked_add(num_data_buffers)
700                            .expect("view buffer index overflow");
701                        build_extend_view(arr, offset)
702                    })
703                    .collect()
704            }
705            _ => arrays.iter().map(|array| build_extend(array)).collect(),
706        };
707
708        let data = _MutableArrayData {
709            data_type: data_type.clone(),
710            len: 0,
711            null_count: 0,
712            null_buffer,
713            buffer1,
714            buffer2,
715            child_data,
716        };
717        Self {
718            arrays,
719            data,
720            dictionary,
721            variadic_data_buffers,
722            extend_values,
723            extend_null_bits,
724            extend_nulls,
725        }
726    }
727
728    /// Extends the in progress array with a region of the input arrays, returning an error on
729    /// overflow.
730    ///
731    /// # Arguments
732    /// * `index` - the index of array that you want to copy values from
733    /// * `start` - the start index of the chunk (inclusive)
734    /// * `end` - the end index of the chunk (exclusive)
735    ///
736    /// # Errors
737    /// Returns an error if offset arithmetic overflows the underlying integer type.
738    ///
739    /// # Panic
740    /// This function panics if there is an invalid index,
741    /// i.e. `index` >= the number of source arrays
742    /// or `end` > the length of the `index`th array
743    pub fn try_extend(&mut self, index: usize, start: usize, end: usize) -> Result<(), ArrowError> {
744        let len = end - start;
745        (self.extend_null_bits[index])(&mut self.data, start, len);
746        // Snapshot buffer lengths before attempting the extend so we can roll
747        // back to a consistent state if it fails.
748        let buf1_len = self.data.buffer1.len();
749        let buf2_len = self.data.buffer2.len();
750        if let Err(e) = (self.extend_values[index])(&mut self.data, index, start, len) {
751            // Restore buffers to their pre-call lengths so the array remains
752            // in a valid state for the caller to inspect or retry.
753            self.data.buffer1.truncate(buf1_len);
754            self.data.buffer2.truncate(buf2_len);
755            return Err(e);
756        }
757        self.data.len += len;
758        Ok(())
759    }
760
761    /// Extends the in progress array with a region of the input arrays.
762    ///
763    /// # Panic
764    /// This function panics if there is an invalid index,
765    /// i.e. `index` >= the number of source arrays,
766    /// `end` > the length of the `index`th array,
767    /// or the offset type overflows (e.g. more than 2 GiB in a `StringArray`).
768    #[deprecated(
769        since = "59.0.0",
770        note = "Use `try_extend` which returns an error on overflow instead of panicking"
771    )]
772    pub fn extend(&mut self, index: usize, start: usize, end: usize) {
773        self.try_extend(index, start, end)
774            .expect("extend failed due to offset overflow")
775    }
776
777    /// Extends the in progress array with null elements, ignoring the input arrays, returning an
778    /// error on overflow.
779    ///
780    /// Prefer this over [`extend_nulls`](Self::extend_nulls) to handle cases where the run-end
781    /// counter overflows (relevant for `RunEndEncoded` arrays).
782    ///
783    /// # Panics
784    ///
785    /// Panics if [`MutableArrayData`] not created with `use_nulls` or nullable source arrays
786    pub fn try_extend_nulls(&mut self, len: usize) -> Result<(), ArrowError> {
787        self.data.len += len;
788        let bit_len = bit_util::ceil(self.data.len, 8);
789        let nulls = self.data.null_buffer();
790        nulls.resize(bit_len, 0);
791        self.data.null_count += len;
792        (self.extend_nulls)(&mut self.data, len)?;
793        Ok(())
794    }
795
796    /// Extends the in progress array with null elements, ignoring the input arrays.
797    ///
798    /// # Panics
799    ///
800    /// Panics if [`MutableArrayData`] not created with `use_nulls` or nullable source arrays,
801    /// or if the run-end counter overflows for `RunEndEncoded` arrays.
802    #[deprecated(
803        since = "59.0.0",
804        note = "Use `try_extend_nulls` which returns an error on overflow instead of panicking"
805    )]
806    pub fn extend_nulls(&mut self, len: usize) {
807        self.try_extend_nulls(len)
808            .expect("extend_nulls failed due to overflow")
809    }
810
811    /// Returns the current length
812    #[inline]
813    pub fn len(&self) -> usize {
814        self.data.len
815    }
816
817    /// Returns true if len is 0
818    #[inline]
819    pub fn is_empty(&self) -> bool {
820        self.data.len == 0
821    }
822
823    /// Returns the current null count
824    #[inline]
825    pub fn null_count(&self) -> usize {
826        self.data.null_count
827    }
828
829    /// Creates a [ArrayData] from the in progress array, consuming `self`.
830    pub fn freeze(self) -> ArrayData {
831        unsafe { self.into_builder().build_unchecked() }
832    }
833
834    /// Consume self and returns the in progress array as [`ArrayDataBuilder`].
835    ///
836    /// This is useful for extending the default behavior of MutableArrayData.
837    pub fn into_builder(self) -> ArrayDataBuilder {
838        let data = self.data;
839
840        let buffers = match data.data_type {
841            DataType::Null
842            | DataType::Struct(_)
843            | DataType::FixedSizeList(_, _)
844            | DataType::RunEndEncoded(_, _) => {
845                vec![]
846            }
847            DataType::BinaryView | DataType::Utf8View => {
848                let mut b = self.variadic_data_buffers;
849                b.insert(0, data.buffer1.into());
850                b
851            }
852            DataType::Utf8
853            | DataType::Binary
854            | DataType::LargeUtf8
855            | DataType::LargeBinary
856            | DataType::ListView(_)
857            | DataType::LargeListView(_) => {
858                vec![data.buffer1.into(), data.buffer2.into()]
859            }
860            DataType::Union(_, mode) => {
861                match mode {
862                    // Based on Union's DataTypeLayout
863                    UnionMode::Sparse => vec![data.buffer1.into()],
864                    UnionMode::Dense => vec![data.buffer1.into(), data.buffer2.into()],
865                }
866            }
867            _ => vec![data.buffer1.into()],
868        };
869
870        let child_data = match data.data_type {
871            DataType::Dictionary(_, _) => vec![self.dictionary.unwrap()],
872            _ => data.child_data.into_iter().map(|x| x.freeze()).collect(),
873        };
874
875        let nulls = match data.data_type {
876            // RunEndEncoded, Null, and Union arrays cannot have top-level null bitmasks
877            DataType::RunEndEncoded(_, _) | DataType::Null | DataType::Union(_, _) => None,
878            _ => data
879                .null_buffer
880                .map(|nulls| {
881                    let bools = BooleanBuffer::new(nulls.into(), 0, data.len);
882                    unsafe { NullBuffer::new_unchecked(bools, data.null_count) }
883                })
884                .filter(|n| n.null_count() > 0),
885        };
886
887        ArrayDataBuilder::new(data.data_type)
888            .offset(0)
889            .len(data.len)
890            .nulls(nulls)
891            .buffers(buffers)
892            .child_data(child_data)
893    }
894}
895
896// See arrow/tests/array_transform.rs for tests of transform functionality
897
#[cfg(test)]
mod test {
    use super::*;
    use arrow_schema::Field;
    use std::sync::Arc;

    /// `with_capacities` should pre-size both the list offsets and the child values buffer.
    #[test]
    fn test_list_append_with_capacities() {
        let item = Arc::new(Field::new("element", DataType::Int64, false));
        let empty_list = ArrayData::new_empty(&DataType::List(item));

        let child_capacity = Capacities::Array(17);
        let mutable = MutableArrayData::with_capacities(
            vec![&empty_list],
            false,
            Capacities::List(6, Some(Box::new(child_capacity))),
        );

        // capacities are rounded up to multiples of 64 by MutableBuffer
        let offsets_capacity = mutable.data.buffer1.capacity();
        let values_capacity = mutable.data.child_data[0].data.buffer1.capacity();
        assert_eq!(offsets_capacity, 64);
        assert_eq!(values_capacity, 192);
    }
}