arrow_data/transform/
mod.rs

1// Licensed to the Apache Software Foundation (ASF) under one
2// or more contributor license agreements.  See the NOTICE file
3// distributed with this work for additional information
4// regarding copyright ownership.  The ASF licenses this file
5// to you under the Apache License, Version 2.0 (the
6// "License"); you may not use this file except in compliance
7// with the License.  You may obtain a copy of the License at
8//
9//   http://www.apache.org/licenses/LICENSE-2.0
10//
11// Unless required by applicable law or agreed to in writing,
12// software distributed under the License is distributed on an
13// "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
14// KIND, either express or implied.  See the License for the
15// specific language governing permissions and limitations
16// under the License.
17
18//! Low-level array data abstractions.
19//!
20//! Provides utilities for creating, manipulating, and converting Arrow arrays
21//! made of primitive types, strings, and nested types.
22
23use super::{data::new_buffers, ArrayData, ArrayDataBuilder, ByteView};
24use crate::bit_mask::set_bits;
25use arrow_buffer::buffer::{BooleanBuffer, NullBuffer};
26use arrow_buffer::{bit_util, i256, ArrowNativeType, Buffer, MutableBuffer};
27use arrow_schema::{ArrowError, DataType, IntervalUnit, UnionMode};
28use half::f16;
29use num::Integer;
30use std::mem;
31
32mod boolean;
33mod fixed_binary;
34mod fixed_size_list;
35mod list;
36mod null;
37mod primitive;
38mod run;
39mod structure;
40mod union;
41mod utils;
42mod variable_size;
43
// Function that appends the validity bits of one source array to the mutable array.
// It is invoked as `(mutable, start, len)`: `start` is the first source slot and `len` the
// number of slots being appended.
type ExtendNullBits<'a> = Box<dyn Fn(&mut _MutableArrayData, usize, usize) + 'a>;
// Function that extends the mutable array with the slots `[start..start+len]` of a source array.
// This is dynamic because different data types influence how buffers and children are extended.
// It is invoked as `(mutable, index, start, len)`, where `index` is the position of the source
// array in the list of input arrays.
type Extend<'a> = Box<dyn Fn(&mut _MutableArrayData, usize, usize, usize) + 'a>;

// Function that appends `len` null slots to the mutable array; invoked as `(mutable, len)`.
// It does not read from any source array, hence it carries no lifetime.
type ExtendNulls = Box<dyn Fn(&mut _MutableArrayData, usize)>;
50
/// A mutable [ArrayData] that knows how to freeze itself into an [ArrayData].
/// This is just a data container: all the logic that fills it lives in the
/// `Extend*` closures stored in [MutableArrayData].
#[derive(Debug)]
struct _MutableArrayData<'a> {
    /// Data type of the array being built.
    pub data_type: DataType,
    /// Running count of null slots appended so far.
    pub null_count: usize,

    /// Number of slots appended so far.
    pub len: usize,
    /// Validity bitmap of the output; `None` when the output was created without null support.
    pub null_buffer: Option<MutableBuffer>,

    // The arrow specification only allows up to 3 buffers (2 ignoring the nulls above).
    // Thus, we store them as plain fields rather than in a `Vec`, which avoids bound checks
    // and gives better data locality.
    pub buffer1: MutableBuffer,
    pub buffer2: MutableBuffer,
    /// In-progress children, for nested data types.
    pub child_data: Vec<MutableArrayData<'a>>,
}
67
68impl _MutableArrayData<'_> {
69    fn null_buffer(&mut self) -> &mut MutableBuffer {
70        self.null_buffer
71            .as_mut()
72            .expect("MutableArrayData not nullable")
73    }
74}
75
76fn build_extend_null_bits(array: &ArrayData, use_nulls: bool) -> ExtendNullBits {
77    if let Some(nulls) = array.nulls() {
78        let bytes = nulls.validity();
79        Box::new(move |mutable, start, len| {
80            let mutable_len = mutable.len;
81            let out = mutable.null_buffer();
82            utils::resize_for_bits(out, mutable_len + len);
83            mutable.null_count += set_bits(
84                out.as_slice_mut(),
85                bytes,
86                mutable_len,
87                nulls.offset() + start,
88                len,
89            );
90        })
91    } else if use_nulls {
92        Box::new(|mutable, _, len| {
93            let mutable_len = mutable.len;
94            let out = mutable.null_buffer();
95            utils::resize_for_bits(out, mutable_len + len);
96            let write_data = out.as_slice_mut();
97            (0..len).for_each(|i| {
98                bit_util::set_bit(write_data, mutable_len + i);
99            });
100        })
101    } else {
102        Box::new(|_, _, _| {})
103    }
104}
105
/// Efficiently create an [ArrayData] from one or more existing [ArrayData]s by
/// copying chunks.
///
/// The main use case of this struct is to perform unary operations to arrays of
/// arbitrary types, such as `filter` and `take`.
///
/// # Example
/// ```
/// use arrow_buffer::Buffer;
/// use arrow_data::ArrayData;
/// use arrow_data::transform::MutableArrayData;
/// use arrow_schema::DataType;
/// fn i32_array(values: &[i32]) -> ArrayData {
///   ArrayData::try_new(DataType::Int32, values.len(), None, 0, vec![Buffer::from_slice_ref(values)], vec![]).unwrap()
/// }
/// let arr1  = i32_array(&[1, 2, 3, 4, 5]);
/// let arr2  = i32_array(&[6, 7, 8, 9, 10]);
/// // Create a mutable array for copying values from arr1 and arr2, with a capacity for 6 elements
/// let capacity = 6;
/// let mut mutable = MutableArrayData::new(vec![&arr1, &arr2], false, capacity);
/// // Copy the first 3 elements from arr1
/// mutable.extend(0, 0, 3);
/// // Copy the last 3 elements from arr2 (the end index is exclusive)
/// mutable.extend(1, 2, 5);
/// // Complete the MutableArrayData into a new ArrayData
/// let frozen = mutable.freeze();
/// assert_eq!(frozen, i32_array(&[1, 2, 3, 8, 9, 10]));
/// ```
pub struct MutableArrayData<'a> {
    /// Input arrays: the data being read FROM.
    ///
    /// Note this is "dead code" because all actual references to the arrays are
    /// stored in closures for extending values and nulls.
    #[allow(dead_code)]
    arrays: Vec<&'a ArrayData>,

    /// In progress output array: The data being written TO
    ///
    /// Note these fields are in a separate struct, [_MutableArrayData], as they
    /// cannot be in [MutableArrayData] itself due to mutability invariants (interior
    /// mutability): [MutableArrayData] contains a function that can only mutate
    /// [_MutableArrayData], not [MutableArrayData] itself
    data: _MutableArrayData<'a>,

    /// The child data of the `Array` in Dictionary arrays.
    ///
    /// This is not stored in `_MutableArrayData` because these values are
    /// constant and only needed at the end, when freezing [_MutableArrayData].
    dictionary: Option<ArrayData>,

    /// Variadic data buffers referenced by views.
    ///
    /// Note this is not stored in `_MutableArrayData` because these values
    /// are constant and only needed at the end, when freezing
    /// [_MutableArrayData]
    variadic_data_buffers: Vec<Buffer>,

    /// Functions used to extend the output array with values from the input
    /// arrays, one per input array.
    ///
    /// Their lifetime is bound to the input arrays because they read
    /// values from them.
    extend_values: Vec<Extend<'a>>,

    /// Functions used to extend the output array with nulls from the input
    /// arrays, one per input array.
    ///
    /// Their lifetime is bound to the input arrays because they read
    /// nulls from them.
    extend_null_bits: Vec<ExtendNullBits<'a>>,

    /// Function used to extend the output array with null elements.
    ///
    /// This function is independent of the arrays and therefore has no lifetime.
    extend_nulls: ExtendNulls,
}
180
181impl std::fmt::Debug for MutableArrayData<'_> {
182    fn fmt(&self, f: &mut std::fmt::Formatter) -> std::fmt::Result {
183        // ignores the closures.
184        f.debug_struct("MutableArrayData")
185            .field("data", &self.data)
186            .finish()
187    }
188}
189
190/// Builds an extend that adds `offset` to the source primitive
191/// Additionally validates that `max` fits into the
192/// the underlying primitive returning None if not
193fn build_extend_dictionary(array: &ArrayData, offset: usize, max: usize) -> Option<Extend> {
194    macro_rules! validate_and_build {
195        ($dt: ty) => {{
196            let _: $dt = max.try_into().ok()?;
197            let offset: $dt = offset.try_into().ok()?;
198            Some(primitive::build_extend_with_offset(array, offset))
199        }};
200    }
201    match array.data_type() {
202        DataType::Dictionary(child_data_type, _) => match child_data_type.as_ref() {
203            DataType::UInt8 => validate_and_build!(u8),
204            DataType::UInt16 => validate_and_build!(u16),
205            DataType::UInt32 => validate_and_build!(u32),
206            DataType::UInt64 => validate_and_build!(u64),
207            DataType::Int8 => validate_and_build!(i8),
208            DataType::Int16 => validate_and_build!(i16),
209            DataType::Int32 => validate_and_build!(i32),
210            DataType::Int64 => validate_and_build!(i64),
211            _ => unreachable!(),
212        },
213        _ => None,
214    }
215}
216
217/// Builds an extend that adds `buffer_offset` to any buffer indices encountered
218fn build_extend_view(array: &ArrayData, buffer_offset: u32) -> Extend {
219    let views = array.buffer::<u128>(0);
220    Box::new(
221        move |mutable: &mut _MutableArrayData, _, start: usize, len: usize| {
222            mutable
223                .buffer1
224                .extend(views[start..start + len].iter().map(|v| {
225                    let len = *v as u32;
226                    if len <= 12 {
227                        return *v; // Stored inline
228                    }
229                    let mut view = ByteView::from(*v);
230                    view.buffer_index += buffer_offset;
231                    view.into()
232                }))
233        },
234    )
235}
236
237fn build_extend(array: &ArrayData) -> Extend {
238    match array.data_type() {
239        DataType::Null => null::build_extend(array),
240        DataType::Boolean => boolean::build_extend(array),
241        DataType::UInt8 => primitive::build_extend::<u8>(array),
242        DataType::UInt16 => primitive::build_extend::<u16>(array),
243        DataType::UInt32 => primitive::build_extend::<u32>(array),
244        DataType::UInt64 => primitive::build_extend::<u64>(array),
245        DataType::Int8 => primitive::build_extend::<i8>(array),
246        DataType::Int16 => primitive::build_extend::<i16>(array),
247        DataType::Int32 => primitive::build_extend::<i32>(array),
248        DataType::Int64 => primitive::build_extend::<i64>(array),
249        DataType::Float32 => primitive::build_extend::<f32>(array),
250        DataType::Float64 => primitive::build_extend::<f64>(array),
251        DataType::Date32 | DataType::Time32(_) | DataType::Interval(IntervalUnit::YearMonth) => {
252            primitive::build_extend::<i32>(array)
253        }
254        DataType::Date64
255        | DataType::Time64(_)
256        | DataType::Timestamp(_, _)
257        | DataType::Duration(_)
258        | DataType::Interval(IntervalUnit::DayTime) => primitive::build_extend::<i64>(array),
259        DataType::Interval(IntervalUnit::MonthDayNano) => primitive::build_extend::<i128>(array),
260        DataType::Decimal32(_, _) => primitive::build_extend::<i32>(array),
261        DataType::Decimal64(_, _) => primitive::build_extend::<i64>(array),
262        DataType::Decimal128(_, _) => primitive::build_extend::<i128>(array),
263        DataType::Decimal256(_, _) => primitive::build_extend::<i256>(array),
264        DataType::Utf8 | DataType::Binary => variable_size::build_extend::<i32>(array),
265        DataType::LargeUtf8 | DataType::LargeBinary => variable_size::build_extend::<i64>(array),
266        DataType::BinaryView | DataType::Utf8View => unreachable!("should use build_extend_view"),
267        DataType::Map(_, _) | DataType::List(_) => list::build_extend::<i32>(array),
268        DataType::ListView(_) | DataType::LargeListView(_) => {
269            unimplemented!("ListView/LargeListView not implemented")
270        }
271        DataType::LargeList(_) => list::build_extend::<i64>(array),
272        DataType::Dictionary(_, _) => unreachable!("should use build_extend_dictionary"),
273        DataType::Struct(_) => structure::build_extend(array),
274        DataType::FixedSizeBinary(_) => fixed_binary::build_extend(array),
275        DataType::Float16 => primitive::build_extend::<f16>(array),
276        DataType::FixedSizeList(_, _) => fixed_size_list::build_extend(array),
277        DataType::Union(_, mode) => match mode {
278            UnionMode::Sparse => union::build_extend_sparse(array),
279            UnionMode::Dense => union::build_extend_dense(array),
280        },
281        DataType::RunEndEncoded(_, _) => run::build_extend(array),
282    }
283}
284
285fn build_extend_nulls(data_type: &DataType) -> ExtendNulls {
286    Box::new(match data_type {
287        DataType::Null => null::extend_nulls,
288        DataType::Boolean => boolean::extend_nulls,
289        DataType::UInt8 => primitive::extend_nulls::<u8>,
290        DataType::UInt16 => primitive::extend_nulls::<u16>,
291        DataType::UInt32 => primitive::extend_nulls::<u32>,
292        DataType::UInt64 => primitive::extend_nulls::<u64>,
293        DataType::Int8 => primitive::extend_nulls::<i8>,
294        DataType::Int16 => primitive::extend_nulls::<i16>,
295        DataType::Int32 => primitive::extend_nulls::<i32>,
296        DataType::Int64 => primitive::extend_nulls::<i64>,
297        DataType::Float32 => primitive::extend_nulls::<f32>,
298        DataType::Float64 => primitive::extend_nulls::<f64>,
299        DataType::Date32 | DataType::Time32(_) | DataType::Interval(IntervalUnit::YearMonth) => {
300            primitive::extend_nulls::<i32>
301        }
302        DataType::Date64
303        | DataType::Time64(_)
304        | DataType::Timestamp(_, _)
305        | DataType::Duration(_)
306        | DataType::Interval(IntervalUnit::DayTime) => primitive::extend_nulls::<i64>,
307        DataType::Interval(IntervalUnit::MonthDayNano) => primitive::extend_nulls::<i128>,
308        DataType::Decimal32(_, _) => primitive::extend_nulls::<i32>,
309        DataType::Decimal64(_, _) => primitive::extend_nulls::<i64>,
310        DataType::Decimal128(_, _) => primitive::extend_nulls::<i128>,
311        DataType::Decimal256(_, _) => primitive::extend_nulls::<i256>,
312        DataType::Utf8 | DataType::Binary => variable_size::extend_nulls::<i32>,
313        DataType::LargeUtf8 | DataType::LargeBinary => variable_size::extend_nulls::<i64>,
314        DataType::BinaryView | DataType::Utf8View => primitive::extend_nulls::<u128>,
315        DataType::Map(_, _) | DataType::List(_) => list::extend_nulls::<i32>,
316        DataType::ListView(_) | DataType::LargeListView(_) => {
317            unimplemented!("ListView/LargeListView not implemented")
318        }
319        DataType::LargeList(_) => list::extend_nulls::<i64>,
320        DataType::Dictionary(child_data_type, _) => match child_data_type.as_ref() {
321            DataType::UInt8 => primitive::extend_nulls::<u8>,
322            DataType::UInt16 => primitive::extend_nulls::<u16>,
323            DataType::UInt32 => primitive::extend_nulls::<u32>,
324            DataType::UInt64 => primitive::extend_nulls::<u64>,
325            DataType::Int8 => primitive::extend_nulls::<i8>,
326            DataType::Int16 => primitive::extend_nulls::<i16>,
327            DataType::Int32 => primitive::extend_nulls::<i32>,
328            DataType::Int64 => primitive::extend_nulls::<i64>,
329            _ => unreachable!(),
330        },
331        DataType::Struct(_) => structure::extend_nulls,
332        DataType::FixedSizeBinary(_) => fixed_binary::extend_nulls,
333        DataType::Float16 => primitive::extend_nulls::<f16>,
334        DataType::FixedSizeList(_, _) => fixed_size_list::extend_nulls,
335        DataType::Union(_, mode) => match mode {
336            UnionMode::Sparse => union::extend_nulls_sparse,
337            UnionMode::Dense => union::extend_nulls_dense,
338        },
339        DataType::RunEndEncoded(_, _) => run::extend_nulls,
340    })
341}
342
343fn preallocate_offset_and_binary_buffer<Offset: ArrowNativeType + Integer>(
344    capacity: usize,
345    binary_size: usize,
346) -> [MutableBuffer; 2] {
347    // offsets
348    let mut buffer = MutableBuffer::new((1 + capacity) * mem::size_of::<Offset>());
349    // safety: `unsafe` code assumes that this buffer is initialized with one element
350    buffer.push(Offset::zero());
351
352    [
353        buffer,
354        MutableBuffer::new(binary_size * mem::size_of::<u8>()),
355    ]
356}
357
/// Define capacities to pre-allocate for child data or data buffers.
///
/// See [MutableArrayData::with_capacities] for how each variant is used and
/// which combinations of variant and data type are accepted.
#[derive(Debug, Clone)]
pub enum Capacities {
    /// Binary, LargeBinary, Utf8 and LargeUtf8 data types
    ///
    /// Defines
    /// * the capacity of the array offsets
    /// * the capacity of the binary/ str buffer
    ///
    /// Note: [MutableArrayData::with_capacities] currently only accepts this
    /// variant when the second capacity is `Some`.
    Binary(usize, Option<usize>),
    /// List, LargeList and FixedSizeList data types
    ///
    /// Defines
    /// * the capacity of the array offsets
    /// * the capacity of the child data
    List(usize, Option<Box<Capacities>>),
    /// Struct type
    ///
    /// Defines
    /// * the capacity of the array
    /// * the capacities of the fields
    ///
    /// Note: [MutableArrayData::with_capacities] currently rejects this
    /// variant as not yet supported.
    Struct(usize, Option<Vec<Capacities>>),
    /// Dictionary type
    ///
    /// Defines
    /// * the capacity of the array/keys
    /// * the capacity of the values
    ///
    /// Note: [MutableArrayData::with_capacities] currently rejects this
    /// variant as not yet supported.
    Dictionary(usize, Option<Box<Capacities>>),
    /// Don't preallocate inner buffers and rely on array growth strategy
    Array(usize),
}
388
389impl<'a> MutableArrayData<'a> {
390    /// Returns a new [MutableArrayData] with capacity to `capacity` slots and
391    /// specialized to create an [ArrayData] from multiple `arrays`.
392    ///
393    /// # Arguments
394    /// * `arrays` - the source arrays to copy from
395    /// * `use_nulls` - a flag used to optimize insertions
396    ///   - `false` if the only source of nulls are the arrays themselves
397    ///   - `true` if the user plans to call [MutableArrayData::extend_nulls].
398    /// * capacity - the preallocated capacity of the output array, in bytes
399    ///
400    /// Thus, if `use_nulls` is `false`, calling
401    /// [MutableArrayData::extend_nulls] should not be used.
402    pub fn new(arrays: Vec<&'a ArrayData>, use_nulls: bool, capacity: usize) -> Self {
403        Self::with_capacities(arrays, use_nulls, Capacities::Array(capacity))
404    }
405
406    /// Similar to [MutableArrayData::new], but lets users define the
407    /// preallocated capacities of the array with more granularity.
408    ///
409    /// See [MutableArrayData::new] for more information on the arguments.
410    ///
411    /// # Panics
412    ///
413    /// This function panics if the given `capacities` don't match the data type
414    /// of `arrays`. Or when a [Capacities] variant is not yet supported.
415    pub fn with_capacities(
416        arrays: Vec<&'a ArrayData>,
417        use_nulls: bool,
418        capacities: Capacities,
419    ) -> Self {
420        let data_type = arrays[0].data_type();
421
422        for a in arrays.iter().skip(1) {
423            assert_eq!(
424                data_type,
425                a.data_type(),
426                "Arrays with inconsistent types passed to MutableArrayData"
427            )
428        }
429
430        // if any of the arrays has nulls, insertions from any array requires setting bits
431        // as there is at least one array with nulls.
432        let use_nulls = use_nulls | arrays.iter().any(|array| array.null_count() > 0);
433
434        let mut array_capacity;
435
436        let [buffer1, buffer2] = match (data_type, &capacities) {
437            (
438                DataType::LargeUtf8 | DataType::LargeBinary,
439                Capacities::Binary(capacity, Some(value_cap)),
440            ) => {
441                array_capacity = *capacity;
442                preallocate_offset_and_binary_buffer::<i64>(*capacity, *value_cap)
443            }
444            (DataType::Utf8 | DataType::Binary, Capacities::Binary(capacity, Some(value_cap))) => {
445                array_capacity = *capacity;
446                preallocate_offset_and_binary_buffer::<i32>(*capacity, *value_cap)
447            }
448            (_, Capacities::Array(capacity)) => {
449                array_capacity = *capacity;
450                new_buffers(data_type, *capacity)
451            }
452            (
453                DataType::List(_) | DataType::LargeList(_) | DataType::FixedSizeList(_, _),
454                Capacities::List(capacity, _),
455            ) => {
456                array_capacity = *capacity;
457                new_buffers(data_type, *capacity)
458            }
459            _ => panic!("Capacities: {capacities:?} not yet supported"),
460        };
461
462        let child_data = match &data_type {
463            DataType::Decimal32(_, _)
464            | DataType::Decimal64(_, _)
465            | DataType::Decimal128(_, _)
466            | DataType::Decimal256(_, _)
467            | DataType::Null
468            | DataType::Boolean
469            | DataType::UInt8
470            | DataType::UInt16
471            | DataType::UInt32
472            | DataType::UInt64
473            | DataType::Int8
474            | DataType::Int16
475            | DataType::Int32
476            | DataType::Int64
477            | DataType::Float16
478            | DataType::Float32
479            | DataType::Float64
480            | DataType::Date32
481            | DataType::Date64
482            | DataType::Time32(_)
483            | DataType::Time64(_)
484            | DataType::Duration(_)
485            | DataType::Timestamp(_, _)
486            | DataType::Utf8
487            | DataType::Binary
488            | DataType::LargeUtf8
489            | DataType::LargeBinary
490            | DataType::BinaryView
491            | DataType::Utf8View
492            | DataType::Interval(_)
493            | DataType::FixedSizeBinary(_) => vec![],
494            DataType::ListView(_) | DataType::LargeListView(_) => {
495                unimplemented!("ListView/LargeListView not implemented")
496            }
497            DataType::Map(_, _) | DataType::List(_) | DataType::LargeList(_) => {
498                let children = arrays
499                    .iter()
500                    .map(|array| &array.child_data()[0])
501                    .collect::<Vec<_>>();
502
503                let capacities =
504                    if let Capacities::List(capacity, ref child_capacities) = capacities {
505                        child_capacities
506                            .clone()
507                            .map(|c| *c)
508                            .unwrap_or(Capacities::Array(capacity))
509                    } else {
510                        Capacities::Array(array_capacity)
511                    };
512
513                vec![MutableArrayData::with_capacities(
514                    children, use_nulls, capacities,
515                )]
516            }
517            // the dictionary type just appends keys and clones the values.
518            DataType::Dictionary(_, _) => vec![],
519            DataType::Struct(fields) => match capacities {
520                Capacities::Struct(capacity, Some(ref child_capacities)) => {
521                    array_capacity = capacity;
522                    (0..fields.len())
523                        .zip(child_capacities)
524                        .map(|(i, child_cap)| {
525                            let child_arrays = arrays
526                                .iter()
527                                .map(|array| &array.child_data()[i])
528                                .collect::<Vec<_>>();
529                            MutableArrayData::with_capacities(
530                                child_arrays,
531                                use_nulls,
532                                child_cap.clone(),
533                            )
534                        })
535                        .collect::<Vec<_>>()
536                }
537                Capacities::Struct(capacity, None) => {
538                    array_capacity = capacity;
539                    (0..fields.len())
540                        .map(|i| {
541                            let child_arrays = arrays
542                                .iter()
543                                .map(|array| &array.child_data()[i])
544                                .collect::<Vec<_>>();
545                            MutableArrayData::new(child_arrays, use_nulls, capacity)
546                        })
547                        .collect::<Vec<_>>()
548                }
549                _ => (0..fields.len())
550                    .map(|i| {
551                        let child_arrays = arrays
552                            .iter()
553                            .map(|array| &array.child_data()[i])
554                            .collect::<Vec<_>>();
555                        MutableArrayData::new(child_arrays, use_nulls, array_capacity)
556                    })
557                    .collect::<Vec<_>>(),
558            },
559            DataType::RunEndEncoded(_, _) => {
560                let run_ends_child = arrays
561                    .iter()
562                    .map(|array| &array.child_data()[0])
563                    .collect::<Vec<_>>();
564                let value_child = arrays
565                    .iter()
566                    .map(|array| &array.child_data()[1])
567                    .collect::<Vec<_>>();
568                vec![
569                    MutableArrayData::new(run_ends_child, false, array_capacity),
570                    MutableArrayData::new(value_child, use_nulls, array_capacity),
571                ]
572            }
573            DataType::FixedSizeList(_, size) => {
574                let children = arrays
575                    .iter()
576                    .map(|array| &array.child_data()[0])
577                    .collect::<Vec<_>>();
578                let capacities =
579                    if let Capacities::List(capacity, ref child_capacities) = capacities {
580                        child_capacities
581                            .clone()
582                            .map(|c| *c)
583                            .unwrap_or(Capacities::Array(capacity * *size as usize))
584                    } else {
585                        Capacities::Array(array_capacity * *size as usize)
586                    };
587                vec![MutableArrayData::with_capacities(
588                    children, use_nulls, capacities,
589                )]
590            }
591            DataType::Union(fields, _) => (0..fields.len())
592                .map(|i| {
593                    let child_arrays = arrays
594                        .iter()
595                        .map(|array| &array.child_data()[i])
596                        .collect::<Vec<_>>();
597                    MutableArrayData::new(child_arrays, use_nulls, array_capacity)
598                })
599                .collect::<Vec<_>>(),
600        };
601
602        // Get the dictionary if any, and if it is a concatenation of multiple
603        let (dictionary, dict_concat) = match &data_type {
604            DataType::Dictionary(_, _) => {
605                // If more than one dictionary, concatenate dictionaries together
606                let dict_concat = !arrays
607                    .windows(2)
608                    .all(|a| a[0].child_data()[0].ptr_eq(&a[1].child_data()[0]));
609
610                match dict_concat {
611                    false => (Some(arrays[0].child_data()[0].clone()), false),
612                    true => {
613                        if let Capacities::Dictionary(_, _) = capacities {
614                            panic!("dictionary capacity not yet supported")
615                        }
616                        let dictionaries: Vec<_> =
617                            arrays.iter().map(|array| &array.child_data()[0]).collect();
618                        let lengths: Vec<_> = dictionaries
619                            .iter()
620                            .map(|dictionary| dictionary.len())
621                            .collect();
622                        let capacity = lengths.iter().sum();
623
624                        let mut mutable = MutableArrayData::new(dictionaries, false, capacity);
625
626                        for (i, len) in lengths.iter().enumerate() {
627                            mutable.extend(i, 0, *len)
628                        }
629
630                        (Some(mutable.freeze()), true)
631                    }
632                }
633            }
634            _ => (None, false),
635        };
636
637        let variadic_data_buffers = match &data_type {
638            DataType::BinaryView | DataType::Utf8View => arrays
639                .iter()
640                .flat_map(|x| x.buffers().iter().skip(1))
641                .map(Buffer::clone)
642                .collect(),
643            _ => vec![],
644        };
645
646        let extend_nulls = build_extend_nulls(data_type);
647
648        let extend_null_bits = arrays
649            .iter()
650            .map(|array| build_extend_null_bits(array, use_nulls))
651            .collect();
652
653        let null_buffer = use_nulls.then(|| {
654            let null_bytes = bit_util::ceil(array_capacity, 8);
655            MutableBuffer::from_len_zeroed(null_bytes)
656        });
657
658        let extend_values = match &data_type {
659            DataType::Dictionary(_, _) => {
660                let mut next_offset = 0;
661                let extend_values: Result<Vec<_>, _> = arrays
662                    .iter()
663                    .map(|array| {
664                        let offset = next_offset;
665                        let dict_len = array.child_data()[0].len();
666
667                        if dict_concat {
668                            next_offset += dict_len;
669                        }
670
671                        build_extend_dictionary(array, offset, offset + dict_len)
672                            .ok_or(ArrowError::DictionaryKeyOverflowError)
673                    })
674                    .collect();
675
676                extend_values.expect("MutableArrayData::new is infallible")
677            }
678            DataType::BinaryView | DataType::Utf8View => {
679                let mut next_offset = 0u32;
680                arrays
681                    .iter()
682                    .map(|arr| {
683                        let num_data_buffers = (arr.buffers().len() - 1) as u32;
684                        let offset = next_offset;
685                        next_offset = next_offset
686                            .checked_add(num_data_buffers)
687                            .expect("view buffer index overflow");
688                        build_extend_view(arr, offset)
689                    })
690                    .collect()
691            }
692            _ => arrays.iter().map(|array| build_extend(array)).collect(),
693        };
694
695        let data = _MutableArrayData {
696            data_type: data_type.clone(),
697            len: 0,
698            null_count: 0,
699            null_buffer,
700            buffer1,
701            buffer2,
702            child_data,
703        };
704        Self {
705            arrays,
706            data,
707            dictionary,
708            variadic_data_buffers,
709            extend_values,
710            extend_null_bits,
711            extend_nulls,
712        }
713    }
714
715    /// Extends the in progress array with a region of the input arrays
716    ///
717    /// # Arguments
718    /// * `index` - the index of array that you what to copy values from
719    /// * `start` - the start index of the chunk (inclusive)
720    /// * `end` - the end index of the chunk (exclusive)
721    ///
722    /// # Panic
723    /// This function panics if there is an invalid index,
724    /// i.e. `index` >= the number of source arrays
725    /// or `end` > the length of the `index`th array
726    pub fn extend(&mut self, index: usize, start: usize, end: usize) {
727        let len = end - start;
728        (self.extend_null_bits[index])(&mut self.data, start, len);
729        (self.extend_values[index])(&mut self.data, index, start, len);
730        self.data.len += len;
731    }
732
733    /// Extends the in progress array with null elements, ignoring the input arrays.
734    ///
735    /// # Panics
736    ///
737    /// Panics if [`MutableArrayData`] not created with `use_nulls` or nullable source arrays
738    pub fn extend_nulls(&mut self, len: usize) {
739        self.data.len += len;
740        let bit_len = bit_util::ceil(self.data.len, 8);
741        let nulls = self.data.null_buffer();
742        nulls.resize(bit_len, 0);
743        self.data.null_count += len;
744        (self.extend_nulls)(&mut self.data, len);
745    }
746
747    /// Returns the current length
748    #[inline]
749    pub fn len(&self) -> usize {
750        self.data.len
751    }
752
753    /// Returns true if len is 0
754    #[inline]
755    pub fn is_empty(&self) -> bool {
756        self.data.len == 0
757    }
758
759    /// Returns the current null count
760    #[inline]
761    pub fn null_count(&self) -> usize {
762        self.data.null_count
763    }
764
765    /// Creates a [ArrayData] from the in progress array, consuming `self`.
766    pub fn freeze(self) -> ArrayData {
767        unsafe { self.into_builder().build_unchecked() }
768    }
769
770    /// Consume self and returns the in progress array as [`ArrayDataBuilder`].
771    ///
772    /// This is useful for extending the default behavior of MutableArrayData.
773    pub fn into_builder(self) -> ArrayDataBuilder {
774        let data = self.data;
775
776        let buffers = match data.data_type {
777            DataType::Null
778            | DataType::Struct(_)
779            | DataType::FixedSizeList(_, _)
780            | DataType::RunEndEncoded(_, _) => {
781                vec![]
782            }
783            DataType::BinaryView | DataType::Utf8View => {
784                let mut b = self.variadic_data_buffers;
785                b.insert(0, data.buffer1.into());
786                b
787            }
788            DataType::Utf8 | DataType::Binary | DataType::LargeUtf8 | DataType::LargeBinary => {
789                vec![data.buffer1.into(), data.buffer2.into()]
790            }
791            DataType::Union(_, mode) => {
792                match mode {
793                    // Based on Union's DataTypeLayout
794                    UnionMode::Sparse => vec![data.buffer1.into()],
795                    UnionMode::Dense => vec![data.buffer1.into(), data.buffer2.into()],
796                }
797            }
798            _ => vec![data.buffer1.into()],
799        };
800
801        let child_data = match data.data_type {
802            DataType::Dictionary(_, _) => vec![self.dictionary.unwrap()],
803            _ => data.child_data.into_iter().map(|x| x.freeze()).collect(),
804        };
805
806        let nulls = match data.data_type {
807            // RunEndEncoded and Null arrays cannot have top-level null bitmasks
808            DataType::RunEndEncoded(_, _) | DataType::Null => None,
809            _ => data
810                .null_buffer
811                .map(|nulls| {
812                    let bools = BooleanBuffer::new(nulls.into(), 0, data.len);
813                    unsafe { NullBuffer::new_unchecked(bools, data.null_count) }
814                })
815                .filter(|n| n.null_count() > 0),
816        };
817
818        ArrayDataBuilder::new(data.data_type)
819            .offset(0)
820            .len(data.len)
821            .nulls(nulls)
822            .buffers(buffers)
823            .child_data(child_data)
824    }
825}
826
827// See arrow/tests/array_transform.rs for tests of transform functionality
828
#[cfg(test)]
mod test {
    use super::*;
    use arrow_schema::Field;
    use std::sync::Arc;

    /// Checks the buffer capacities reserved by `with_capacities` for an
    /// empty list array and for its child.
    #[test]
    fn test_list_append_with_capacities() {
        let element_field = Arc::new(Field::new("element", DataType::Int64, false));
        let empty_list = ArrayData::new_empty(&DataType::List(element_field));

        let requested = Capacities::List(6, Some(Box::new(Capacities::Array(17))));
        let mutable = MutableArrayData::with_capacities(vec![&empty_list], false, requested);

        let list_buffer = &mutable.data.buffer1;
        let child_buffer = &mutable.data.child_data[0].data.buffer1;

        // capacities are rounded up to multiples of 64 by MutableBuffer
        assert_eq!(list_buffer.capacity(), 64);
        assert_eq!(child_buffer.capacity(), 192);
    }
}