arrow_data/transform/
mod.rs

1// Licensed to the Apache Software Foundation (ASF) under one
2// or more contributor license agreements.  See the NOTICE file
3// distributed with this work for additional information
4// regarding copyright ownership.  The ASF licenses this file
5// to you under the Apache License, Version 2.0 (the
6// "License"); you may not use this file except in compliance
7// with the License.  You may obtain a copy of the License at
8//
9//   http://www.apache.org/licenses/LICENSE-2.0
10//
11// Unless required by applicable law or agreed to in writing,
12// software distributed under the License is distributed on an
13// "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
14// KIND, either express or implied.  See the License for the
15// specific language governing permissions and limitations
16// under the License.
17
18//! Low-level array data abstractions.
19//!
20//! Provides utilities for creating, manipulating, and converting Arrow arrays
21//! made of primitive types, strings, and nested types.
22
23use super::{ArrayData, ArrayDataBuilder, ByteView, data::new_buffers};
24use crate::bit_mask::set_bits;
25use arrow_buffer::buffer::{BooleanBuffer, NullBuffer};
26use arrow_buffer::{ArrowNativeType, Buffer, MutableBuffer, bit_util, i256};
27use arrow_schema::{ArrowError, DataType, IntervalUnit, UnionMode};
28use half::f16;
29use num_integer::Integer;
30use std::mem;
31
32mod boolean;
33mod fixed_binary;
34mod fixed_size_list;
35mod list;
36mod list_view;
37mod null;
38mod primitive;
39mod run;
40mod structure;
41mod union;
42mod utils;
43mod variable_size;
44
45type ExtendNullBits<'a> = Box<dyn Fn(&mut _MutableArrayData, usize, usize) + 'a>;
46// function that extends `[start..start+len]` to the mutable array.
47// this is dynamic because different data_types influence how buffers and children are extended.
48type Extend<'a> = Box<dyn Fn(&mut _MutableArrayData, usize, usize, usize) + 'a>;
49
50type ExtendNulls = Box<dyn Fn(&mut _MutableArrayData, usize)>;
51
52/// A mutable [ArrayData] that knows how to freeze itself into an [ArrayData].
53/// This is just a data container.
54#[derive(Debug)]
55struct _MutableArrayData<'a> {
56    pub data_type: DataType,
57    pub null_count: usize,
58
59    pub len: usize,
60    pub null_buffer: Option<MutableBuffer>,
61
62    // arrow specification only allows up to 3 buffers (2 ignoring the nulls above).
63    // Thus, we place them in the stack to avoid bound checks and greater data locality.
64    pub buffer1: MutableBuffer,
65    pub buffer2: MutableBuffer,
66    pub child_data: Vec<MutableArrayData<'a>>,
67}
68
69impl _MutableArrayData<'_> {
70    fn null_buffer(&mut self) -> &mut MutableBuffer {
71        self.null_buffer
72            .as_mut()
73            .expect("MutableArrayData not nullable")
74    }
75}
76
77fn build_extend_null_bits(array: &ArrayData, use_nulls: bool) -> ExtendNullBits<'_> {
78    if let Some(nulls) = array.nulls() {
79        let bytes = nulls.validity();
80        Box::new(move |mutable, start, len| {
81            let mutable_len = mutable.len;
82            let out = mutable.null_buffer();
83            utils::resize_for_bits(out, mutable_len + len);
84            mutable.null_count += set_bits(
85                out.as_slice_mut(),
86                bytes,
87                mutable_len,
88                nulls.offset() + start,
89                len,
90            );
91        })
92    } else if use_nulls {
93        Box::new(|mutable, _, len| {
94            let mutable_len = mutable.len;
95            let out = mutable.null_buffer();
96            utils::resize_for_bits(out, mutable_len + len);
97            let write_data = out.as_slice_mut();
98            (0..len).for_each(|i| {
99                bit_util::set_bit(write_data, mutable_len + i);
100            });
101        })
102    } else {
103        Box::new(|_, _, _| {})
104    }
105}
106
107/// Efficiently create an [ArrayData] from one or more existing [ArrayData]s by
108/// copying chunks.
109///
110/// The main use case of this struct is to perform unary operations to arrays of
111/// arbitrary types, such as `filter` and `take`.
112///
113/// # Example
114/// ```
115/// use arrow_buffer::Buffer;
116/// use arrow_data::ArrayData;
117/// use arrow_data::transform::MutableArrayData;
118/// use arrow_schema::DataType;
119/// fn i32_array(values: &[i32]) -> ArrayData {
120///   ArrayData::try_new(DataType::Int32, 5, None, 0, vec![Buffer::from_slice_ref(values)], vec![]).unwrap()
121/// }
122/// let arr1  = i32_array(&[1, 2, 3, 4, 5]);
123/// let arr2  = i32_array(&[6, 7, 8, 9, 10]);
124/// // Create a mutable array for copying values from arr1 and arr2, with a capacity for 6 elements
125/// let capacity = 3 * std::mem::size_of::<i32>();
126/// let mut mutable = MutableArrayData::new(vec![&arr1, &arr2], false, 10);
127/// // Copy the first 3 elements from arr1
128/// mutable.extend(0, 0, 3);
129/// // Copy the last 3 elements from arr2
130/// mutable.extend(1, 2, 4);
131/// // Complete the MutableArrayData into a new ArrayData
132/// let frozen = mutable.freeze();
133/// assert_eq!(frozen, i32_array(&[1, 2, 3, 8, 9, 10]));
134/// ```
135pub struct MutableArrayData<'a> {
136    /// Input arrays: the data being read FROM.
137    ///
138    /// Note this is "dead code" because all actual references to the arrays are
139    /// stored in closures for extending values and nulls.
140    #[allow(dead_code)]
141    arrays: Vec<&'a ArrayData>,
142
143    /// In progress output array: The data being written TO
144    ///
145    /// Note these fields are in a separate struct, [_MutableArrayData], as they
146    /// cannot be in [MutableArrayData] itself due to mutability invariants (interior
147    /// mutability): [MutableArrayData] contains a function that can only mutate
148    /// [_MutableArrayData], not [MutableArrayData] itself
149    data: _MutableArrayData<'a>,
150
151    /// The child data of the `Array` in Dictionary arrays.
152    ///
153    /// This is not stored in `_MutableArrayData` because these values are
154    /// constant and only needed at the end, when freezing [_MutableArrayData].
155    dictionary: Option<ArrayData>,
156
157    /// Variadic data buffers referenced by views.
158    ///
159    /// Note this this is not stored in `_MutableArrayData` because these values
160    /// are constant and only needed at the end, when freezing
161    /// [_MutableArrayData]
162    variadic_data_buffers: Vec<Buffer>,
163
164    /// function used to extend output array with values from input arrays.
165    ///
166    /// This function's lifetime is bound to the input arrays because it reads
167    /// values from them.
168    extend_values: Vec<Extend<'a>>,
169
170    /// function used to extend the output array with nulls from input arrays.
171    ///
172    /// This function's lifetime is bound to the input arrays because it reads
173    /// nulls from it.
174    extend_null_bits: Vec<ExtendNullBits<'a>>,
175
176    /// function used to extend the output array with null elements.
177    ///
178    /// This function is independent of the arrays and therefore has no lifetime.
179    extend_nulls: ExtendNulls,
180}
181
182impl std::fmt::Debug for MutableArrayData<'_> {
183    fn fmt(&self, f: &mut std::fmt::Formatter) -> std::fmt::Result {
184        // ignores the closures.
185        f.debug_struct("MutableArrayData")
186            .field("data", &self.data)
187            .finish()
188    }
189}
190
191/// Builds an extend that adds `offset` to the source primitive
192/// Additionally validates that `max` fits into the
193/// the underlying primitive returning None if not
194fn build_extend_dictionary(array: &ArrayData, offset: usize, max: usize) -> Option<Extend<'_>> {
195    macro_rules! validate_and_build {
196        ($dt: ty) => {{
197            let _: $dt = max.try_into().ok()?;
198            let offset: $dt = offset.try_into().ok()?;
199            Some(primitive::build_extend_with_offset(array, offset))
200        }};
201    }
202    match array.data_type() {
203        DataType::Dictionary(child_data_type, _) => match child_data_type.as_ref() {
204            DataType::UInt8 => validate_and_build!(u8),
205            DataType::UInt16 => validate_and_build!(u16),
206            DataType::UInt32 => validate_and_build!(u32),
207            DataType::UInt64 => validate_and_build!(u64),
208            DataType::Int8 => validate_and_build!(i8),
209            DataType::Int16 => validate_and_build!(i16),
210            DataType::Int32 => validate_and_build!(i32),
211            DataType::Int64 => validate_and_build!(i64),
212            _ => unreachable!(),
213        },
214        _ => None,
215    }
216}
217
218/// Builds an extend that adds `buffer_offset` to any buffer indices encountered
219fn build_extend_view(array: &ArrayData, buffer_offset: u32) -> Extend<'_> {
220    let views = array.buffer::<u128>(0);
221    Box::new(
222        move |mutable: &mut _MutableArrayData, _, start: usize, len: usize| {
223            mutable
224                .buffer1
225                .extend(views[start..start + len].iter().map(|v| {
226                    let len = *v as u32;
227                    if len <= 12 {
228                        return *v; // Stored inline
229                    }
230                    let mut view = ByteView::from(*v);
231                    view.buffer_index += buffer_offset;
232                    view.into()
233                }))
234        },
235    )
236}
237
238fn build_extend(array: &ArrayData) -> Extend<'_> {
239    match array.data_type() {
240        DataType::Null => null::build_extend(array),
241        DataType::Boolean => boolean::build_extend(array),
242        DataType::UInt8 => primitive::build_extend::<u8>(array),
243        DataType::UInt16 => primitive::build_extend::<u16>(array),
244        DataType::UInt32 => primitive::build_extend::<u32>(array),
245        DataType::UInt64 => primitive::build_extend::<u64>(array),
246        DataType::Int8 => primitive::build_extend::<i8>(array),
247        DataType::Int16 => primitive::build_extend::<i16>(array),
248        DataType::Int32 => primitive::build_extend::<i32>(array),
249        DataType::Int64 => primitive::build_extend::<i64>(array),
250        DataType::Float32 => primitive::build_extend::<f32>(array),
251        DataType::Float64 => primitive::build_extend::<f64>(array),
252        DataType::Date32 | DataType::Time32(_) | DataType::Interval(IntervalUnit::YearMonth) => {
253            primitive::build_extend::<i32>(array)
254        }
255        DataType::Date64
256        | DataType::Time64(_)
257        | DataType::Timestamp(_, _)
258        | DataType::Duration(_)
259        | DataType::Interval(IntervalUnit::DayTime) => primitive::build_extend::<i64>(array),
260        DataType::Interval(IntervalUnit::MonthDayNano) => primitive::build_extend::<i128>(array),
261        DataType::Decimal32(_, _) => primitive::build_extend::<i32>(array),
262        DataType::Decimal64(_, _) => primitive::build_extend::<i64>(array),
263        DataType::Decimal128(_, _) => primitive::build_extend::<i128>(array),
264        DataType::Decimal256(_, _) => primitive::build_extend::<i256>(array),
265        DataType::Utf8 | DataType::Binary => variable_size::build_extend::<i32>(array),
266        DataType::LargeUtf8 | DataType::LargeBinary => variable_size::build_extend::<i64>(array),
267        DataType::BinaryView | DataType::Utf8View => unreachable!("should use build_extend_view"),
268        DataType::Map(_, _) | DataType::List(_) => list::build_extend::<i32>(array),
269        DataType::LargeList(_) => list::build_extend::<i64>(array),
270        DataType::ListView(_) => list_view::build_extend::<i32>(array),
271        DataType::LargeListView(_) => list_view::build_extend::<i64>(array),
272        DataType::Dictionary(_, _) => unreachable!("should use build_extend_dictionary"),
273        DataType::Struct(_) => structure::build_extend(array),
274        DataType::FixedSizeBinary(_) => fixed_binary::build_extend(array),
275        DataType::Float16 => primitive::build_extend::<f16>(array),
276        DataType::FixedSizeList(_, _) => fixed_size_list::build_extend(array),
277        DataType::Union(_, mode) => match mode {
278            UnionMode::Sparse => union::build_extend_sparse(array),
279            UnionMode::Dense => union::build_extend_dense(array),
280        },
281        DataType::RunEndEncoded(_, _) => run::build_extend(array),
282    }
283}
284
285fn build_extend_nulls(data_type: &DataType) -> ExtendNulls {
286    Box::new(match data_type {
287        DataType::Null => null::extend_nulls,
288        DataType::Boolean => boolean::extend_nulls,
289        DataType::UInt8 => primitive::extend_nulls::<u8>,
290        DataType::UInt16 => primitive::extend_nulls::<u16>,
291        DataType::UInt32 => primitive::extend_nulls::<u32>,
292        DataType::UInt64 => primitive::extend_nulls::<u64>,
293        DataType::Int8 => primitive::extend_nulls::<i8>,
294        DataType::Int16 => primitive::extend_nulls::<i16>,
295        DataType::Int32 => primitive::extend_nulls::<i32>,
296        DataType::Int64 => primitive::extend_nulls::<i64>,
297        DataType::Float32 => primitive::extend_nulls::<f32>,
298        DataType::Float64 => primitive::extend_nulls::<f64>,
299        DataType::Date32 | DataType::Time32(_) | DataType::Interval(IntervalUnit::YearMonth) => {
300            primitive::extend_nulls::<i32>
301        }
302        DataType::Date64
303        | DataType::Time64(_)
304        | DataType::Timestamp(_, _)
305        | DataType::Duration(_)
306        | DataType::Interval(IntervalUnit::DayTime) => primitive::extend_nulls::<i64>,
307        DataType::Interval(IntervalUnit::MonthDayNano) => primitive::extend_nulls::<i128>,
308        DataType::Decimal32(_, _) => primitive::extend_nulls::<i32>,
309        DataType::Decimal64(_, _) => primitive::extend_nulls::<i64>,
310        DataType::Decimal128(_, _) => primitive::extend_nulls::<i128>,
311        DataType::Decimal256(_, _) => primitive::extend_nulls::<i256>,
312        DataType::Utf8 | DataType::Binary => variable_size::extend_nulls::<i32>,
313        DataType::LargeUtf8 | DataType::LargeBinary => variable_size::extend_nulls::<i64>,
314        DataType::BinaryView | DataType::Utf8View => primitive::extend_nulls::<u128>,
315        DataType::Map(_, _) | DataType::List(_) => list::extend_nulls::<i32>,
316        DataType::LargeList(_) => list::extend_nulls::<i64>,
317        DataType::ListView(_) => list_view::extend_nulls::<i32>,
318        DataType::LargeListView(_) => list_view::extend_nulls::<i64>,
319        DataType::Dictionary(child_data_type, _) => match child_data_type.as_ref() {
320            DataType::UInt8 => primitive::extend_nulls::<u8>,
321            DataType::UInt16 => primitive::extend_nulls::<u16>,
322            DataType::UInt32 => primitive::extend_nulls::<u32>,
323            DataType::UInt64 => primitive::extend_nulls::<u64>,
324            DataType::Int8 => primitive::extend_nulls::<i8>,
325            DataType::Int16 => primitive::extend_nulls::<i16>,
326            DataType::Int32 => primitive::extend_nulls::<i32>,
327            DataType::Int64 => primitive::extend_nulls::<i64>,
328            _ => unreachable!(),
329        },
330        DataType::Struct(_) => structure::extend_nulls,
331        DataType::FixedSizeBinary(_) => fixed_binary::extend_nulls,
332        DataType::Float16 => primitive::extend_nulls::<f16>,
333        DataType::FixedSizeList(_, _) => fixed_size_list::extend_nulls,
334        DataType::Union(_, mode) => match mode {
335            UnionMode::Sparse => union::extend_nulls_sparse,
336            UnionMode::Dense => union::extend_nulls_dense,
337        },
338        DataType::RunEndEncoded(_, _) => run::extend_nulls,
339    })
340}
341
342fn preallocate_offset_and_binary_buffer<Offset: ArrowNativeType + Integer>(
343    capacity: usize,
344    binary_size: usize,
345) -> [MutableBuffer; 2] {
346    // offsets
347    let mut buffer = MutableBuffer::new((1 + capacity) * mem::size_of::<Offset>());
348    // safety: `unsafe` code assumes that this buffer is initialized with one element
349    buffer.push(Offset::zero());
350
351    [
352        buffer,
353        MutableBuffer::new(binary_size * mem::size_of::<u8>()),
354    ]
355}
356
/// Capacities to pre-allocate for the buffers and children of the array built
/// by [MutableArrayData::with_capacities].
#[derive(Debug, Clone)]
pub enum Capacities {
    /// Binary, Utf8, LargeBinary and LargeUtf8 data types
    ///
    /// Defines
    /// * the capacity of the array offsets
    /// * the capacity of the binary/ str buffer
    ///
    /// [MutableArrayData::with_capacities] only accepts this variant when the
    /// second capacity is `Some`.
    Binary(usize, Option<usize>),
    /// List, LargeList, ListView, LargeListView and FixedSizeList data types
    ///
    /// Defines
    /// * the capacity of the array offsets
    /// * the capacity of the child data; when `None`, the child is allocated
    ///   from the first capacity instead
    List(usize, Option<Box<Capacities>>),
    /// Struct type
    ///
    /// Defines
    /// * the capacity of the array
    /// * the capacities of the fields
    ///
    /// Note: [MutableArrayData::with_capacities] currently panics with
    /// "not yet supported" when given this variant.
    Struct(usize, Option<Vec<Capacities>>),
    /// Dictionary type
    ///
    /// Defines
    /// * the capacity of the array/keys
    /// * the capacity of the values
    ///
    /// Note: [MutableArrayData::with_capacities] currently panics with
    /// "not yet supported" when given this variant.
    Dictionary(usize, Option<Box<Capacities>>),
    /// Don't preallocate inner buffers and rely on array growth strategy
    ///
    /// The single capacity applies to the array itself and is accepted for
    /// every data type.
    Array(usize),
}
387
388impl<'a> MutableArrayData<'a> {
389    /// Returns a new [MutableArrayData] with capacity to `capacity` slots and
390    /// specialized to create an [ArrayData] from multiple `arrays`.
391    ///
392    /// # Arguments
393    /// * `arrays` - the source arrays to copy from
394    /// * `use_nulls` - a flag used to optimize insertions
395    ///   - `false` if the only source of nulls are the arrays themselves
396    ///   - `true` if the user plans to call [MutableArrayData::extend_nulls].
397    /// * capacity - the preallocated capacity of the output array, in bytes
398    ///
399    /// Thus, if `use_nulls` is `false`, calling
400    /// [MutableArrayData::extend_nulls] should not be used.
401    pub fn new(arrays: Vec<&'a ArrayData>, use_nulls: bool, capacity: usize) -> Self {
402        Self::with_capacities(arrays, use_nulls, Capacities::Array(capacity))
403    }
404
405    /// Similar to [MutableArrayData::new], but lets users define the
406    /// preallocated capacities of the array with more granularity.
407    ///
408    /// See [MutableArrayData::new] for more information on the arguments.
409    ///
410    /// # Panics
411    ///
412    /// This function panics if the given `capacities` don't match the data type
413    /// of `arrays`. Or when a [Capacities] variant is not yet supported.
414    pub fn with_capacities(
415        arrays: Vec<&'a ArrayData>,
416        use_nulls: bool,
417        capacities: Capacities,
418    ) -> Self {
419        let data_type = arrays[0].data_type();
420
421        for a in arrays.iter().skip(1) {
422            assert_eq!(
423                data_type,
424                a.data_type(),
425                "Arrays with inconsistent types passed to MutableArrayData"
426            )
427        }
428
429        // if any of the arrays has nulls, insertions from any array requires setting bits
430        // as there is at least one array with nulls.
431        let use_nulls = use_nulls | arrays.iter().any(|array| array.null_count() > 0);
432
433        let mut array_capacity;
434
435        let [buffer1, buffer2] = match (data_type, &capacities) {
436            (
437                DataType::LargeUtf8 | DataType::LargeBinary,
438                Capacities::Binary(capacity, Some(value_cap)),
439            ) => {
440                array_capacity = *capacity;
441                preallocate_offset_and_binary_buffer::<i64>(*capacity, *value_cap)
442            }
443            (DataType::Utf8 | DataType::Binary, Capacities::Binary(capacity, Some(value_cap))) => {
444                array_capacity = *capacity;
445                preallocate_offset_and_binary_buffer::<i32>(*capacity, *value_cap)
446            }
447            (_, Capacities::Array(capacity)) => {
448                array_capacity = *capacity;
449                new_buffers(data_type, *capacity)
450            }
451            (
452                DataType::List(_)
453                | DataType::LargeList(_)
454                | DataType::ListView(_)
455                | DataType::LargeListView(_)
456                | DataType::FixedSizeList(_, _),
457                Capacities::List(capacity, _),
458            ) => {
459                array_capacity = *capacity;
460                new_buffers(data_type, *capacity)
461            }
462            _ => panic!("Capacities: {capacities:?} not yet supported"),
463        };
464
465        let child_data = match &data_type {
466            DataType::Decimal32(_, _)
467            | DataType::Decimal64(_, _)
468            | DataType::Decimal128(_, _)
469            | DataType::Decimal256(_, _)
470            | DataType::Null
471            | DataType::Boolean
472            | DataType::UInt8
473            | DataType::UInt16
474            | DataType::UInt32
475            | DataType::UInt64
476            | DataType::Int8
477            | DataType::Int16
478            | DataType::Int32
479            | DataType::Int64
480            | DataType::Float16
481            | DataType::Float32
482            | DataType::Float64
483            | DataType::Date32
484            | DataType::Date64
485            | DataType::Time32(_)
486            | DataType::Time64(_)
487            | DataType::Duration(_)
488            | DataType::Timestamp(_, _)
489            | DataType::Utf8
490            | DataType::Binary
491            | DataType::LargeUtf8
492            | DataType::LargeBinary
493            | DataType::BinaryView
494            | DataType::Utf8View
495            | DataType::Interval(_)
496            | DataType::FixedSizeBinary(_) => vec![],
497            DataType::Map(_, _)
498            | DataType::List(_)
499            | DataType::LargeList(_)
500            | DataType::ListView(_)
501            | DataType::LargeListView(_) => {
502                let children = arrays
503                    .iter()
504                    .map(|array| &array.child_data()[0])
505                    .collect::<Vec<_>>();
506
507                let capacities =
508                    if let Capacities::List(capacity, ref child_capacities) = capacities {
509                        child_capacities
510                            .clone()
511                            .map(|c| *c)
512                            .unwrap_or(Capacities::Array(capacity))
513                    } else {
514                        Capacities::Array(array_capacity)
515                    };
516
517                vec![MutableArrayData::with_capacities(
518                    children, use_nulls, capacities,
519                )]
520            }
521            // the dictionary type just appends keys and clones the values.
522            DataType::Dictionary(_, _) => vec![],
523            DataType::Struct(fields) => match capacities {
524                Capacities::Struct(capacity, Some(ref child_capacities)) => {
525                    array_capacity = capacity;
526                    (0..fields.len())
527                        .zip(child_capacities)
528                        .map(|(i, child_cap)| {
529                            let child_arrays = arrays
530                                .iter()
531                                .map(|array| &array.child_data()[i])
532                                .collect::<Vec<_>>();
533                            MutableArrayData::with_capacities(
534                                child_arrays,
535                                use_nulls,
536                                child_cap.clone(),
537                            )
538                        })
539                        .collect::<Vec<_>>()
540                }
541                Capacities::Struct(capacity, None) => {
542                    array_capacity = capacity;
543                    (0..fields.len())
544                        .map(|i| {
545                            let child_arrays = arrays
546                                .iter()
547                                .map(|array| &array.child_data()[i])
548                                .collect::<Vec<_>>();
549                            MutableArrayData::new(child_arrays, use_nulls, capacity)
550                        })
551                        .collect::<Vec<_>>()
552                }
553                _ => (0..fields.len())
554                    .map(|i| {
555                        let child_arrays = arrays
556                            .iter()
557                            .map(|array| &array.child_data()[i])
558                            .collect::<Vec<_>>();
559                        MutableArrayData::new(child_arrays, use_nulls, array_capacity)
560                    })
561                    .collect::<Vec<_>>(),
562            },
563            DataType::RunEndEncoded(_, _) => {
564                let run_ends_child = arrays
565                    .iter()
566                    .map(|array| &array.child_data()[0])
567                    .collect::<Vec<_>>();
568                let value_child = arrays
569                    .iter()
570                    .map(|array| &array.child_data()[1])
571                    .collect::<Vec<_>>();
572                vec![
573                    MutableArrayData::new(run_ends_child, false, array_capacity),
574                    MutableArrayData::new(value_child, use_nulls, array_capacity),
575                ]
576            }
577            DataType::FixedSizeList(_, size) => {
578                let children = arrays
579                    .iter()
580                    .map(|array| &array.child_data()[0])
581                    .collect::<Vec<_>>();
582                let capacities =
583                    if let Capacities::List(capacity, ref child_capacities) = capacities {
584                        child_capacities
585                            .clone()
586                            .map(|c| *c)
587                            .unwrap_or(Capacities::Array(capacity * *size as usize))
588                    } else {
589                        Capacities::Array(array_capacity * *size as usize)
590                    };
591                vec![MutableArrayData::with_capacities(
592                    children, use_nulls, capacities,
593                )]
594            }
595            DataType::Union(fields, _) => (0..fields.len())
596                .map(|i| {
597                    let child_arrays = arrays
598                        .iter()
599                        .map(|array| &array.child_data()[i])
600                        .collect::<Vec<_>>();
601                    MutableArrayData::new(child_arrays, use_nulls, array_capacity)
602                })
603                .collect::<Vec<_>>(),
604        };
605
606        // Get the dictionary if any, and if it is a concatenation of multiple
607        let (dictionary, dict_concat) = match &data_type {
608            DataType::Dictionary(_, _) => {
609                // If more than one dictionary, concatenate dictionaries together
610                let dict_concat = !arrays
611                    .windows(2)
612                    .all(|a| a[0].child_data()[0].ptr_eq(&a[1].child_data()[0]));
613
614                match dict_concat {
615                    false => (Some(arrays[0].child_data()[0].clone()), false),
616                    true => {
617                        if let Capacities::Dictionary(_, _) = capacities {
618                            panic!("dictionary capacity not yet supported")
619                        }
620                        let dictionaries: Vec<_> =
621                            arrays.iter().map(|array| &array.child_data()[0]).collect();
622                        let lengths: Vec<_> = dictionaries
623                            .iter()
624                            .map(|dictionary| dictionary.len())
625                            .collect();
626                        let capacity = lengths.iter().sum();
627
628                        let mut mutable = MutableArrayData::new(dictionaries, false, capacity);
629
630                        for (i, len) in lengths.iter().enumerate() {
631                            mutable.extend(i, 0, *len)
632                        }
633
634                        (Some(mutable.freeze()), true)
635                    }
636                }
637            }
638            _ => (None, false),
639        };
640
641        let variadic_data_buffers = match &data_type {
642            DataType::BinaryView | DataType::Utf8View => arrays
643                .iter()
644                .flat_map(|x| x.buffers().iter().skip(1))
645                .map(Buffer::clone)
646                .collect(),
647            _ => vec![],
648        };
649
650        let extend_nulls = build_extend_nulls(data_type);
651
652        let extend_null_bits = arrays
653            .iter()
654            .map(|array| build_extend_null_bits(array, use_nulls))
655            .collect();
656
657        let null_buffer = use_nulls.then(|| {
658            let null_bytes = bit_util::ceil(array_capacity, 8);
659            MutableBuffer::from_len_zeroed(null_bytes)
660        });
661
662        let extend_values = match &data_type {
663            DataType::Dictionary(_, _) => {
664                let mut next_offset = 0;
665                let extend_values: Result<Vec<_>, _> = arrays
666                    .iter()
667                    .map(|array| {
668                        let offset = next_offset;
669                        let dict_len = array.child_data()[0].len();
670
671                        if dict_concat {
672                            next_offset += dict_len;
673                        }
674
675                        build_extend_dictionary(array, offset, offset + dict_len)
676                            .ok_or(ArrowError::DictionaryKeyOverflowError)
677                    })
678                    .collect();
679
680                extend_values.expect("MutableArrayData::new is infallible")
681            }
682            DataType::BinaryView | DataType::Utf8View => {
683                let mut next_offset = 0u32;
684                arrays
685                    .iter()
686                    .map(|arr| {
687                        let num_data_buffers = (arr.buffers().len() - 1) as u32;
688                        let offset = next_offset;
689                        next_offset = next_offset
690                            .checked_add(num_data_buffers)
691                            .expect("view buffer index overflow");
692                        build_extend_view(arr, offset)
693                    })
694                    .collect()
695            }
696            _ => arrays.iter().map(|array| build_extend(array)).collect(),
697        };
698
699        let data = _MutableArrayData {
700            data_type: data_type.clone(),
701            len: 0,
702            null_count: 0,
703            null_buffer,
704            buffer1,
705            buffer2,
706            child_data,
707        };
708        Self {
709            arrays,
710            data,
711            dictionary,
712            variadic_data_buffers,
713            extend_values,
714            extend_null_bits,
715            extend_nulls,
716        }
717    }
718
719    /// Extends the in progress array with a region of the input arrays
720    ///
721    /// # Arguments
722    /// * `index` - the index of array that you what to copy values from
723    /// * `start` - the start index of the chunk (inclusive)
724    /// * `end` - the end index of the chunk (exclusive)
725    ///
726    /// # Panic
727    /// This function panics if there is an invalid index,
728    /// i.e. `index` >= the number of source arrays
729    /// or `end` > the length of the `index`th array
730    pub fn extend(&mut self, index: usize, start: usize, end: usize) {
731        let len = end - start;
732        (self.extend_null_bits[index])(&mut self.data, start, len);
733        (self.extend_values[index])(&mut self.data, index, start, len);
734        self.data.len += len;
735    }
736
737    /// Extends the in progress array with null elements, ignoring the input arrays.
738    ///
739    /// # Panics
740    ///
741    /// Panics if [`MutableArrayData`] not created with `use_nulls` or nullable source arrays
742    pub fn extend_nulls(&mut self, len: usize) {
743        self.data.len += len;
744        let bit_len = bit_util::ceil(self.data.len, 8);
745        let nulls = self.data.null_buffer();
746        nulls.resize(bit_len, 0);
747        self.data.null_count += len;
748        (self.extend_nulls)(&mut self.data, len);
749    }
750
751    /// Returns the current length
752    #[inline]
753    pub fn len(&self) -> usize {
754        self.data.len
755    }
756
757    /// Returns true if len is 0
758    #[inline]
759    pub fn is_empty(&self) -> bool {
760        self.data.len == 0
761    }
762
763    /// Returns the current null count
764    #[inline]
765    pub fn null_count(&self) -> usize {
766        self.data.null_count
767    }
768
769    /// Creates a [ArrayData] from the in progress array, consuming `self`.
770    pub fn freeze(self) -> ArrayData {
771        unsafe { self.into_builder().build_unchecked() }
772    }
773
774    /// Consume self and returns the in progress array as [`ArrayDataBuilder`].
775    ///
776    /// This is useful for extending the default behavior of MutableArrayData.
777    pub fn into_builder(self) -> ArrayDataBuilder {
778        let data = self.data;
779
780        let buffers = match data.data_type {
781            DataType::Null
782            | DataType::Struct(_)
783            | DataType::FixedSizeList(_, _)
784            | DataType::RunEndEncoded(_, _) => {
785                vec![]
786            }
787            DataType::BinaryView | DataType::Utf8View => {
788                let mut b = self.variadic_data_buffers;
789                b.insert(0, data.buffer1.into());
790                b
791            }
792            DataType::Utf8
793            | DataType::Binary
794            | DataType::LargeUtf8
795            | DataType::LargeBinary
796            | DataType::ListView(_)
797            | DataType::LargeListView(_) => {
798                vec![data.buffer1.into(), data.buffer2.into()]
799            }
800            DataType::Union(_, mode) => {
801                match mode {
802                    // Based on Union's DataTypeLayout
803                    UnionMode::Sparse => vec![data.buffer1.into()],
804                    UnionMode::Dense => vec![data.buffer1.into(), data.buffer2.into()],
805                }
806            }
807            _ => vec![data.buffer1.into()],
808        };
809
810        let child_data = match data.data_type {
811            DataType::Dictionary(_, _) => vec![self.dictionary.unwrap()],
812            _ => data.child_data.into_iter().map(|x| x.freeze()).collect(),
813        };
814
815        let nulls = match data.data_type {
816            // RunEndEncoded and Null arrays cannot have top-level null bitmasks
817            DataType::RunEndEncoded(_, _) | DataType::Null => None,
818            _ => data
819                .null_buffer
820                .map(|nulls| {
821                    let bools = BooleanBuffer::new(nulls.into(), 0, data.len);
822                    unsafe { NullBuffer::new_unchecked(bools, data.null_count) }
823                })
824                .filter(|n| n.null_count() > 0),
825        };
826
827        ArrayDataBuilder::new(data.data_type)
828            .offset(0)
829            .len(data.len)
830            .nulls(nulls)
831            .buffers(buffers)
832            .child_data(child_data)
833    }
834}
835
836// See arrow/tests/array_transform.rs for tests of transform functionality
837
#[cfg(test)]
mod test {
    use super::*;
    use arrow_schema::Field;
    use std::sync::Arc;

    /// Checks that the capacities requested through `Capacities::List` reach
    /// both the list's own first buffer and the first buffer of its child.
    #[test]
    fn test_list_append_with_capacities() {
        let element = Field::new("element", DataType::Int64, false);
        let list_type = DataType::List(Arc::new(element));
        let source = ArrayData::new_empty(&list_type);

        let capacities = Capacities::List(6, Some(Box::new(Capacities::Array(17))));
        let mutable = MutableArrayData::with_capacities(vec![&source], false, capacities);

        // capacities are rounded up to multiples of 64 by MutableBuffer
        // NOTE(review): presumably 7 i32 offsets (28 bytes) -> 64 and
        // 17 i64 values (136 bytes) -> 192; confirm against the allocation code.
        let list_capacity = mutable.data.buffer1.capacity();
        let child_capacity = mutable.data.child_data[0].data.buffer1.capacity();
        assert_eq!(list_capacity, 64);
        assert_eq!(child_capacity, 192);
    }
}