arrow_select/
concat.rs

1// Licensed to the Apache Software Foundation (ASF) under one
2// or more contributor license agreements.  See the NOTICE file
3// distributed with this work for additional information
4// regarding copyright ownership.  The ASF licenses this file
5// to you under the Apache License, Version 2.0 (the
6// "License"); you may not use this file except in compliance
7// with the License.  You may obtain a copy of the License at
8//
9//   http://www.apache.org/licenses/LICENSE-2.0
10//
11// Unless required by applicable law or agreed to in writing,
12// software distributed under the License is distributed on an
13// "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
14// KIND, either express or implied.  See the License for the
15// specific language governing permissions and limitations
16// under the License.
17
18//! Defines concat kernel for `ArrayRef`
19//!
20//! Example:
21//!
22//! ```
23//! use arrow_array::{ArrayRef, StringArray};
24//! use arrow_select::concat::concat;
25//!
26//! let arr = concat(&[
27//!     &StringArray::from(vec!["hello", "world"]),
28//!     &StringArray::from(vec!["!"]),
29//! ]).unwrap();
30//! assert_eq!(arr.len(), 3);
31//! ```
32
33use crate::dictionary::{merge_dictionary_values, should_merge_dictionary_values};
34use arrow_array::builder::{
35    BooleanBuilder, GenericByteBuilder, GenericByteViewBuilder, PrimitiveBuilder,
36};
37use arrow_array::cast::AsArray;
38use arrow_array::types::*;
39use arrow_array::*;
40use arrow_buffer::{ArrowNativeType, BooleanBufferBuilder, NullBuffer, OffsetBuffer};
41use arrow_data::ArrayDataBuilder;
42use arrow_data::transform::{Capacities, MutableArrayData};
43use arrow_schema::{ArrowError, DataType, FieldRef, Fields, SchemaRef};
44use std::{collections::HashSet, ops::Add, sync::Arc};
45
46fn binary_capacity<T: ByteArrayType>(arrays: &[&dyn Array]) -> Capacities {
47    let mut item_capacity = 0;
48    let mut bytes_capacity = 0;
49    for array in arrays {
50        let a = array.as_bytes::<T>();
51
52        // Guaranteed to always have at least one element
53        let offsets = a.value_offsets();
54        bytes_capacity += offsets[offsets.len() - 1].as_usize() - offsets[0].as_usize();
55        item_capacity += a.len()
56    }
57
58    Capacities::Binary(item_capacity, Some(bytes_capacity))
59}
60
61fn fixed_size_list_capacity(arrays: &[&dyn Array], data_type: &DataType) -> Capacities {
62    if let DataType::FixedSizeList(f, _) = data_type {
63        let item_capacity = arrays.iter().map(|a| a.len()).sum();
64        let child_data_type = f.data_type();
65        match child_data_type {
66            // These types should match the types that `get_capacity`
67            // has special handling for.
68            DataType::Utf8
69            | DataType::LargeUtf8
70            | DataType::Binary
71            | DataType::LargeBinary
72            | DataType::FixedSizeList(_, _) => {
73                let values: Vec<&dyn arrow_array::Array> = arrays
74                    .iter()
75                    .map(|a| a.as_fixed_size_list().values().as_ref())
76                    .collect();
77                Capacities::List(
78                    item_capacity,
79                    Some(Box::new(get_capacity(&values, child_data_type))),
80                )
81            }
82            _ => Capacities::Array(item_capacity),
83        }
84    } else {
85        unreachable!("illegal data type for fixed size list")
86    }
87}
88
89fn concat_byte_view<B: ByteViewType>(arrays: &[&dyn Array]) -> Result<ArrayRef, ArrowError> {
90    let mut builder =
91        GenericByteViewBuilder::<B>::with_capacity(arrays.iter().map(|a| a.len()).sum());
92    for &array in arrays.iter() {
93        builder.append_array(array.as_byte_view());
94    }
95    Ok(Arc::new(builder.finish()))
96}
97
98fn concat_dictionaries<K: ArrowDictionaryKeyType>(
99    arrays: &[&dyn Array],
100) -> Result<ArrayRef, ArrowError> {
101    let mut output_len = 0;
102    let dictionaries: Vec<_> = arrays
103        .iter()
104        .map(|x| x.as_dictionary::<K>())
105        .inspect(|d| output_len += d.len())
106        .collect();
107
108    if !should_merge_dictionary_values::<K>(&dictionaries, output_len) {
109        return concat_fallback(arrays, Capacities::Array(output_len));
110    }
111
112    let merged = merge_dictionary_values(&dictionaries, None)?;
113
114    // Recompute keys
115    let mut key_values = Vec::with_capacity(output_len);
116
117    let mut has_nulls = false;
118    for (d, mapping) in dictionaries.iter().zip(merged.key_mappings) {
119        has_nulls |= d.null_count() != 0;
120        for key in d.keys().values() {
121            // Use get to safely handle nulls
122            key_values.push(mapping.get(key.as_usize()).copied().unwrap_or_default())
123        }
124    }
125
126    let nulls = has_nulls.then(|| {
127        let mut nulls = BooleanBufferBuilder::new(output_len);
128        for d in &dictionaries {
129            match d.nulls() {
130                Some(n) => nulls.append_buffer(n.inner()),
131                None => nulls.append_n(d.len(), true),
132            }
133        }
134        NullBuffer::new(nulls.finish())
135    });
136
137    let keys = PrimitiveArray::<K>::try_new(key_values.into(), nulls)?;
138    // Sanity check
139    assert_eq!(keys.len(), output_len);
140
141    let array = unsafe { DictionaryArray::new_unchecked(keys, merged.values) };
142    Ok(Arc::new(array))
143}
144
145fn concat_lists<OffsetSize: OffsetSizeTrait>(
146    arrays: &[&dyn Array],
147    field: &FieldRef,
148) -> Result<ArrayRef, ArrowError> {
149    let mut output_len = 0;
150    let mut list_has_nulls = false;
151    let mut list_has_slices = false;
152
153    let lists = arrays
154        .iter()
155        .map(|x| x.as_list::<OffsetSize>())
156        .inspect(|l| {
157            output_len += l.len();
158            list_has_nulls |= l.null_count() != 0;
159            list_has_slices |= l.offsets()[0] > OffsetSize::zero()
160                || l.offsets().last().unwrap().as_usize() < l.values().len();
161        })
162        .collect::<Vec<_>>();
163
164    let lists_nulls = list_has_nulls.then(|| {
165        let mut nulls = BooleanBufferBuilder::new(output_len);
166        for l in &lists {
167            match l.nulls() {
168                Some(n) => nulls.append_buffer(n.inner()),
169                None => nulls.append_n(l.len(), true),
170            }
171        }
172        NullBuffer::new(nulls.finish())
173    });
174
175    // If any of the lists have slices, we need to slice the values
176    // to ensure that the offsets are correct
177    let mut sliced_values;
178    let values: Vec<&dyn Array> = if list_has_slices {
179        sliced_values = Vec::with_capacity(lists.len());
180        for l in &lists {
181            // if the first offset is non-zero, we need to slice the values so when
182            // we concatenate them below only the relevant values are included
183            let offsets = l.offsets();
184            let start_offset = offsets[0].as_usize();
185            let end_offset = offsets.last().unwrap().as_usize();
186            sliced_values.push(l.values().slice(start_offset, end_offset - start_offset));
187        }
188        sliced_values.iter().map(|a| a.as_ref()).collect()
189    } else {
190        lists.iter().map(|x| x.values().as_ref()).collect()
191    };
192
193    let concatenated_values = concat(values.as_slice())?;
194
195    // Merge value offsets from the lists
196    let value_offset_buffer =
197        OffsetBuffer::<OffsetSize>::from_lengths(lists.iter().flat_map(|x| x.offsets().lengths()));
198
199    let array = GenericListArray::<OffsetSize>::try_new(
200        Arc::clone(field),
201        value_offset_buffer,
202        concatenated_values,
203        lists_nulls,
204    )?;
205
206    Ok(Arc::new(array))
207}
208
209fn concat_primitives<T: ArrowPrimitiveType>(arrays: &[&dyn Array]) -> Result<ArrayRef, ArrowError> {
210    let mut builder = PrimitiveBuilder::<T>::with_capacity(arrays.iter().map(|a| a.len()).sum())
211        .with_data_type(arrays[0].data_type().clone());
212
213    for array in arrays {
214        builder.append_array(array.as_primitive());
215    }
216
217    Ok(Arc::new(builder.finish()))
218}
219
220fn concat_boolean(arrays: &[&dyn Array]) -> Result<ArrayRef, ArrowError> {
221    let mut builder = BooleanBuilder::with_capacity(arrays.iter().map(|a| a.len()).sum());
222
223    for array in arrays {
224        builder.append_array(array.as_boolean());
225    }
226
227    Ok(Arc::new(builder.finish()))
228}
229
230fn concat_bytes<T: ByteArrayType>(arrays: &[&dyn Array]) -> Result<ArrayRef, ArrowError> {
231    let (item_capacity, bytes_capacity) = match binary_capacity::<T>(arrays) {
232        Capacities::Binary(item_capacity, Some(bytes_capacity)) => (item_capacity, bytes_capacity),
233        _ => unreachable!(),
234    };
235
236    let mut builder = GenericByteBuilder::<T>::with_capacity(item_capacity, bytes_capacity);
237
238    for array in arrays {
239        builder.append_array(array.as_bytes::<T>())?;
240    }
241
242    Ok(Arc::new(builder.finish()))
243}
244
245fn concat_structs(arrays: &[&dyn Array], fields: &Fields) -> Result<ArrayRef, ArrowError> {
246    let mut len = 0;
247    let mut has_nulls = false;
248    let structs = arrays
249        .iter()
250        .map(|a| {
251            len += a.len();
252            has_nulls |= a.null_count() > 0;
253            a.as_struct()
254        })
255        .collect::<Vec<_>>();
256
257    let nulls = has_nulls.then(|| {
258        let mut b = BooleanBufferBuilder::new(len);
259        for s in &structs {
260            match s.nulls() {
261                Some(n) => b.append_buffer(n.inner()),
262                None => b.append_n(s.len(), true),
263            }
264        }
265        NullBuffer::new(b.finish())
266    });
267
268    let column_concat_result = (0..fields.len())
269        .map(|i| {
270            let extracted_cols = structs
271                .iter()
272                .map(|s| s.column(i).as_ref())
273                .collect::<Vec<_>>();
274            concat(&extracted_cols)
275        })
276        .collect::<Result<Vec<_>, ArrowError>>()?;
277
278    Ok(Arc::new(StructArray::try_new_with_length(
279        fields.clone(),
280        column_concat_result,
281        nulls,
282        len,
283    )?))
284}
285
286/// Concatenate multiple RunArray instances into a single RunArray.
287///
288/// This function handles the special case of concatenating RunArrays by:
289/// 1. Collecting all run ends and values from input arrays
290/// 2. Adjusting run ends to account for the length of previous arrays
291/// 3. Creating a new RunArray with the combined data
292fn concat_run_arrays<R: RunEndIndexType>(arrays: &[&dyn Array]) -> Result<ArrayRef, ArrowError>
293where
294    R::Native: Add<Output = R::Native>,
295{
296    let run_arrays: Vec<_> = arrays
297        .iter()
298        .map(|x| x.as_run::<R>())
299        .filter(|x| !x.run_ends().is_empty())
300        .collect();
301
302    // The run ends need to be adjusted by the sum of the lengths of the previous arrays.
303    let needed_run_end_adjustments = std::iter::once(R::default_value())
304        .chain(
305            run_arrays
306                .iter()
307                .scan(R::default_value(), |acc, run_array| {
308                    *acc = *acc + *run_array.run_ends().values().last().unwrap();
309                    Some(*acc)
310                }),
311        )
312        .collect::<Vec<_>>();
313
314    // This works out nicely to be the total (logical) length of the resulting array.
315    let total_len = needed_run_end_adjustments.last().unwrap().as_usize();
316
317    let run_ends_array =
318        PrimitiveArray::<R>::from_iter_values(run_arrays.iter().enumerate().flat_map(
319            move |(i, run_array)| {
320                let adjustment = needed_run_end_adjustments[i];
321                run_array
322                    .run_ends()
323                    .values()
324                    .iter()
325                    .map(move |run_end| *run_end + adjustment)
326            },
327        ));
328
329    let all_values = concat(
330        &run_arrays
331            .iter()
332            .map(|x| x.values().as_ref())
333            .collect::<Vec<_>>(),
334    )?;
335
336    let builder = ArrayDataBuilder::new(run_arrays[0].data_type().clone())
337        .len(total_len)
338        .child_data(vec![run_ends_array.into_data(), all_values.into_data()]);
339
340    // `build_unchecked` is used to avoid recursive validation of child arrays.
341    let array_data = unsafe { builder.build_unchecked() };
342    array_data.validate_data()?;
343
344    Ok(Arc::<RunArray<R>>::new(array_data.into()))
345}
346
// Dispatch helper for `downcast_integer!`: returns from the enclosing function with
// the result of `concat_dictionaries` for key type `$t`.
//
// `concat_dictionaries` already yields an `ArrayRef`, so its result is returned
// directly rather than being wrapped in a second `Arc`.
macro_rules! dict_helper {
    ($t:ty, $arrays:expr) => {
        return concat_dictionaries::<$t>($arrays)
    };
}
352
// Dispatch helper for `downcast_primitive!`: returns from the enclosing function with
// the result of `concat_primitives` for primitive type `$t`.
//
// `concat_primitives` already yields an `ArrayRef`, so its result is returned
// directly rather than being wrapped in a second `Arc`.
macro_rules! primitive_concat {
    ($t:ty, $arrays:expr) => {
        return concat_primitives::<$t>($arrays)
    };
}
358
359fn get_capacity(arrays: &[&dyn Array], data_type: &DataType) -> Capacities {
360    match data_type {
361        DataType::Utf8 => binary_capacity::<Utf8Type>(arrays),
362        DataType::LargeUtf8 => binary_capacity::<LargeUtf8Type>(arrays),
363        DataType::Binary => binary_capacity::<BinaryType>(arrays),
364        DataType::LargeBinary => binary_capacity::<LargeBinaryType>(arrays),
365        DataType::FixedSizeList(_, _) => fixed_size_list_capacity(arrays, data_type),
366        _ => Capacities::Array(arrays.iter().map(|a| a.len()).sum()),
367    }
368}
369
370/// Concatenate multiple [Array] of the same type into a single [ArrayRef].
371pub fn concat(arrays: &[&dyn Array]) -> Result<ArrayRef, ArrowError> {
372    if arrays.is_empty() {
373        return Err(ArrowError::ComputeError(
374            "concat requires input of at least one array".to_string(),
375        ));
376    } else if arrays.len() == 1 {
377        let array = arrays[0];
378        return Ok(array.slice(0, array.len()));
379    }
380
381    let d = arrays[0].data_type();
382    if arrays.iter().skip(1).any(|array| array.data_type() != d) {
383        // Create error message with up to 10 unique data types in the order they appear
384        let error_message = {
385            // 10 max unique data types to print and another 1 to know if there are more
386            let mut unique_data_types = HashSet::with_capacity(11);
387
388            let mut error_message =
389                format!("It is not possible to concatenate arrays of different data types ({d}");
390            unique_data_types.insert(d);
391
392            for array in arrays {
393                let is_unique = unique_data_types.insert(array.data_type());
394
395                if unique_data_types.len() == 11 {
396                    error_message.push_str(", ...");
397                    break;
398                }
399
400                if is_unique {
401                    error_message.push_str(", ");
402                    error_message.push_str(&array.data_type().to_string());
403                }
404            }
405
406            error_message.push_str(").");
407
408            error_message
409        };
410
411        return Err(ArrowError::InvalidArgumentError(error_message));
412    }
413
414    downcast_primitive! {
415        d => (primitive_concat, arrays),
416        DataType::Boolean => concat_boolean(arrays),
417        DataType::Dictionary(k, _) => {
418            downcast_integer! {
419                k.as_ref() => (dict_helper, arrays),
420                _ => unreachable!("illegal dictionary key type {k}")
421            }
422        }
423        DataType::List(field) => concat_lists::<i32>(arrays, field),
424        DataType::LargeList(field) => concat_lists::<i64>(arrays, field),
425        DataType::Struct(fields) => concat_structs(arrays, fields),
426        DataType::Utf8 => concat_bytes::<Utf8Type>(arrays),
427        DataType::LargeUtf8 => concat_bytes::<LargeUtf8Type>(arrays),
428        DataType::Binary => concat_bytes::<BinaryType>(arrays),
429        DataType::LargeBinary => concat_bytes::<LargeBinaryType>(arrays),
430        DataType::RunEndEncoded(r, _) => {
431            // Handle RunEndEncoded arrays with special concat function
432            // We need to downcast based on the run end type
433            match r.data_type() {
434                DataType::Int16 => concat_run_arrays::<Int16Type>(arrays),
435                DataType::Int32 => concat_run_arrays::<Int32Type>(arrays),
436                DataType::Int64 => concat_run_arrays::<Int64Type>(arrays),
437                _ => unreachable!("Unsupported run end index type: {r:?}"),
438            }
439        }
440        DataType::Utf8View => concat_byte_view::<StringViewType>(arrays),
441        DataType::BinaryView => concat_byte_view::<BinaryViewType>(arrays),
442        _ => {
443            let capacity = get_capacity(arrays, d);
444            concat_fallback(arrays, capacity)
445        }
446    }
447}
448
449/// Concatenates arrays using MutableArrayData
450///
451/// This will naively concatenate dictionaries
452fn concat_fallback(arrays: &[&dyn Array], capacity: Capacities) -> Result<ArrayRef, ArrowError> {
453    let array_data: Vec<_> = arrays.iter().map(|a| a.to_data()).collect::<Vec<_>>();
454    let array_data = array_data.iter().collect();
455    let mut mutable = MutableArrayData::with_capacities(array_data, false, capacity);
456
457    for (i, a) in arrays.iter().enumerate() {
458        mutable.extend(i, 0, a.len())
459    }
460
461    Ok(make_array(mutable.freeze()))
462}
463
464/// Concatenates `batches` together into a single [`RecordBatch`].
465///
466/// The output batch has the specified `schemas`; The schema of the
467/// input are ignored.
468///
469/// Returns an error if the types of underlying arrays are different.
470pub fn concat_batches<'a>(
471    schema: &SchemaRef,
472    input_batches: impl IntoIterator<Item = &'a RecordBatch>,
473) -> Result<RecordBatch, ArrowError> {
474    // When schema is empty, sum the number of the rows of all batches
475    if schema.fields().is_empty() {
476        let num_rows: usize = input_batches.into_iter().map(RecordBatch::num_rows).sum();
477        let mut options = RecordBatchOptions::default();
478        options.row_count = Some(num_rows);
479        return RecordBatch::try_new_with_options(schema.clone(), vec![], &options);
480    }
481
482    let batches: Vec<&RecordBatch> = input_batches.into_iter().collect();
483    if batches.is_empty() {
484        return Ok(RecordBatch::new_empty(schema.clone()));
485    }
486    let field_num = schema.fields().len();
487    let mut arrays = Vec::with_capacity(field_num);
488    for i in 0..field_num {
489        let array = concat(
490            &batches
491                .iter()
492                .map(|batch| batch.column(i).as_ref())
493                .collect::<Vec<_>>(),
494        )?;
495        arrays.push(array);
496    }
497    RecordBatch::try_new(schema.clone(), arrays)
498}
499
500#[cfg(test)]
501mod tests {
502    use super::*;
503    use arrow_array::builder::{GenericListBuilder, StringDictionaryBuilder};
504    use arrow_schema::{Field, Schema};
505    use std::fmt::Debug;
506
507    #[test]
508    fn test_concat_empty_vec() {
509        let re = concat(&[]);
510        assert!(re.is_err());
511    }
512
513    #[test]
514    fn test_concat_batches_no_columns() {
515        // Test concat using empty schema / batches without columns
516        let schema = Arc::new(Schema::empty());
517
518        let mut options = RecordBatchOptions::default();
519        options.row_count = Some(100);
520        let batch = RecordBatch::try_new_with_options(schema.clone(), vec![], &options).unwrap();
521        // put in 2 batches of 100 rows each
522        let re = concat_batches(&schema, &[batch.clone(), batch]).unwrap();
523
524        assert_eq!(re.num_rows(), 200);
525    }
526
527    #[test]
528    fn test_concat_one_element_vec() {
529        let arr = Arc::new(PrimitiveArray::<Int64Type>::from(vec![
530            Some(-1),
531            Some(2),
532            None,
533        ])) as ArrayRef;
534        let result = concat(&[arr.as_ref()]).unwrap();
535        assert_eq!(
536            &arr, &result,
537            "concatenating single element array gives back the same result"
538        );
539    }
540
541    #[test]
542    fn test_concat_incompatible_datatypes() {
543        let re = concat(&[
544            &PrimitiveArray::<Int64Type>::from(vec![Some(-1), Some(2), None]),
545            // 2 string to make sure we only mention unique types
546            &StringArray::from(vec![Some("hello"), Some("bar"), Some("world")]),
547            &StringArray::from(vec![Some("hey"), Some(""), Some("you")]),
548            // Another type to make sure we are showing all the incompatible types
549            &PrimitiveArray::<Int32Type>::from(vec![Some(-1), Some(2), None]),
550        ]);
551
552        assert_eq!(
553            re.unwrap_err().to_string(),
554            "Invalid argument error: It is not possible to concatenate arrays of different data types (Int64, Utf8, Int32)."
555        );
556    }
557
558    #[test]
559    fn test_concat_10_incompatible_datatypes_should_include_all_of_them() {
560        let re = concat(&[
561            &PrimitiveArray::<Int64Type>::from(vec![Some(-1), Some(2), None]),
562            // 2 string to make sure we only mention unique types
563            &StringArray::from(vec![Some("hello"), Some("bar"), Some("world")]),
564            &StringArray::from(vec![Some("hey"), Some(""), Some("you")]),
565            // Another type to make sure we are showing all the incompatible types
566            &PrimitiveArray::<Int32Type>::from(vec![Some(-1), Some(2), None]),
567            &PrimitiveArray::<Int8Type>::from(vec![Some(-1), Some(2), None]),
568            &PrimitiveArray::<Int16Type>::from(vec![Some(-1), Some(2), None]),
569            &PrimitiveArray::<UInt8Type>::from(vec![Some(1), Some(2), None]),
570            &PrimitiveArray::<UInt16Type>::from(vec![Some(1), Some(2), None]),
571            &PrimitiveArray::<UInt32Type>::from(vec![Some(1), Some(2), None]),
572            // Non unique
573            &PrimitiveArray::<UInt16Type>::from(vec![Some(1), Some(2), None]),
574            &PrimitiveArray::<UInt64Type>::from(vec![Some(1), Some(2), None]),
575            &PrimitiveArray::<Float32Type>::from(vec![Some(1.0), Some(2.0), None]),
576        ]);
577
578        assert_eq!(
579            re.unwrap_err().to_string(),
580            "Invalid argument error: It is not possible to concatenate arrays of different data types (Int64, Utf8, Int32, Int8, Int16, UInt8, UInt16, UInt32, UInt64, Float32)."
581        );
582    }
583
584    #[test]
585    fn test_concat_11_incompatible_datatypes_should_only_include_10() {
586        let re = concat(&[
587            &PrimitiveArray::<Int64Type>::from(vec![Some(-1), Some(2), None]),
588            // 2 string to make sure we only mention unique types
589            &StringArray::from(vec![Some("hello"), Some("bar"), Some("world")]),
590            &StringArray::from(vec![Some("hey"), Some(""), Some("you")]),
591            // Another type to make sure we are showing all the incompatible types
592            &PrimitiveArray::<Int32Type>::from(vec![Some(-1), Some(2), None]),
593            &PrimitiveArray::<Int8Type>::from(vec![Some(-1), Some(2), None]),
594            &PrimitiveArray::<Int16Type>::from(vec![Some(-1), Some(2), None]),
595            &PrimitiveArray::<UInt8Type>::from(vec![Some(1), Some(2), None]),
596            &PrimitiveArray::<UInt16Type>::from(vec![Some(1), Some(2), None]),
597            &PrimitiveArray::<UInt32Type>::from(vec![Some(1), Some(2), None]),
598            // Non unique
599            &PrimitiveArray::<UInt16Type>::from(vec![Some(1), Some(2), None]),
600            &PrimitiveArray::<UInt64Type>::from(vec![Some(1), Some(2), None]),
601            &PrimitiveArray::<Float32Type>::from(vec![Some(1.0), Some(2.0), None]),
602            &PrimitiveArray::<Float64Type>::from(vec![Some(1.0), Some(2.0), None]),
603        ]);
604
605        assert_eq!(
606            re.unwrap_err().to_string(),
607            "Invalid argument error: It is not possible to concatenate arrays of different data types (Int64, Utf8, Int32, Int8, Int16, UInt8, UInt16, UInt32, UInt64, Float32, ...)."
608        );
609    }
610
611    #[test]
612    fn test_concat_13_incompatible_datatypes_should_not_include_all_of_them() {
613        let re = concat(&[
614            &PrimitiveArray::<Int64Type>::from(vec![Some(-1), Some(2), None]),
615            // 2 string to make sure we only mention unique types
616            &StringArray::from(vec![Some("hello"), Some("bar"), Some("world")]),
617            &StringArray::from(vec![Some("hey"), Some(""), Some("you")]),
618            // Another type to make sure we are showing all the incompatible types
619            &PrimitiveArray::<Int32Type>::from(vec![Some(-1), Some(2), None]),
620            &PrimitiveArray::<Int8Type>::from(vec![Some(-1), Some(2), None]),
621            &PrimitiveArray::<Int16Type>::from(vec![Some(-1), Some(2), None]),
622            &PrimitiveArray::<UInt8Type>::from(vec![Some(1), Some(2), None]),
623            &PrimitiveArray::<UInt16Type>::from(vec![Some(1), Some(2), None]),
624            &PrimitiveArray::<UInt32Type>::from(vec![Some(1), Some(2), None]),
625            // Non unique
626            &PrimitiveArray::<UInt16Type>::from(vec![Some(1), Some(2), None]),
627            &PrimitiveArray::<UInt64Type>::from(vec![Some(1), Some(2), None]),
628            &PrimitiveArray::<Float32Type>::from(vec![Some(1.0), Some(2.0), None]),
629            &PrimitiveArray::<Float64Type>::from(vec![Some(1.0), Some(2.0), None]),
630            &PrimitiveArray::<Float16Type>::new_null(3),
631            &BooleanArray::from(vec![Some(true), Some(false), None]),
632        ]);
633
634        assert_eq!(
635            re.unwrap_err().to_string(),
636            "Invalid argument error: It is not possible to concatenate arrays of different data types (Int64, Utf8, Int32, Int8, Int16, UInt8, UInt16, UInt32, UInt64, Float32, ...)."
637        );
638    }
639
640    #[test]
641    fn test_concat_string_arrays() {
642        let arr = concat(&[
643            &StringArray::from(vec!["hello", "world"]),
644            &StringArray::from(vec!["2", "3", "4"]),
645            &StringArray::from(vec![Some("foo"), Some("bar"), None, Some("baz")]),
646        ])
647        .unwrap();
648
649        let expected_output = Arc::new(StringArray::from(vec![
650            Some("hello"),
651            Some("world"),
652            Some("2"),
653            Some("3"),
654            Some("4"),
655            Some("foo"),
656            Some("bar"),
657            None,
658            Some("baz"),
659        ])) as ArrayRef;
660
661        assert_eq!(&arr, &expected_output);
662    }
663
664    #[test]
665    fn test_concat_string_view_arrays() {
666        let arr = concat(&[
667            &StringViewArray::from(vec!["helloxxxxxxxxxxa", "world____________"]),
668            &StringViewArray::from(vec!["helloxxxxxxxxxxy", "3", "4"]),
669            &StringViewArray::from(vec![Some("foo"), Some("bar"), None, Some("baz")]),
670        ])
671        .unwrap();
672
673        let expected_output = Arc::new(StringViewArray::from(vec![
674            Some("helloxxxxxxxxxxa"),
675            Some("world____________"),
676            Some("helloxxxxxxxxxxy"),
677            Some("3"),
678            Some("4"),
679            Some("foo"),
680            Some("bar"),
681            None,
682            Some("baz"),
683        ])) as ArrayRef;
684
685        assert_eq!(&arr, &expected_output);
686    }
687
688    #[test]
689    fn test_concat_primitive_arrays() {
690        let arr = concat(&[
691            &PrimitiveArray::<Int64Type>::from(vec![Some(-1), Some(-1), Some(2), None, None]),
692            &PrimitiveArray::<Int64Type>::from(vec![Some(101), Some(102), Some(103), None]),
693            &PrimitiveArray::<Int64Type>::from(vec![Some(256), Some(512), Some(1024)]),
694        ])
695        .unwrap();
696
697        let expected_output = Arc::new(PrimitiveArray::<Int64Type>::from(vec![
698            Some(-1),
699            Some(-1),
700            Some(2),
701            None,
702            None,
703            Some(101),
704            Some(102),
705            Some(103),
706            None,
707            Some(256),
708            Some(512),
709            Some(1024),
710        ])) as ArrayRef;
711
712        assert_eq!(&arr, &expected_output);
713    }
714
715    #[test]
716    fn test_concat_primitive_array_slices() {
717        let input_1 =
718            PrimitiveArray::<Int64Type>::from(vec![Some(-1), Some(-1), Some(2), None, None])
719                .slice(1, 3);
720
721        let input_2 =
722            PrimitiveArray::<Int64Type>::from(vec![Some(101), Some(102), Some(103), None])
723                .slice(1, 3);
724        let arr = concat(&[&input_1, &input_2]).unwrap();
725
726        let expected_output = Arc::new(PrimitiveArray::<Int64Type>::from(vec![
727            Some(-1),
728            Some(2),
729            None,
730            Some(102),
731            Some(103),
732            None,
733        ])) as ArrayRef;
734
735        assert_eq!(&arr, &expected_output);
736    }
737
738    #[test]
739    fn test_concat_boolean_primitive_arrays() {
740        let arr = concat(&[
741            &BooleanArray::from(vec![
742                Some(true),
743                Some(true),
744                Some(false),
745                None,
746                None,
747                Some(false),
748            ]),
749            &BooleanArray::from(vec![None, Some(false), Some(true), Some(false)]),
750        ])
751        .unwrap();
752
753        let expected_output = Arc::new(BooleanArray::from(vec![
754            Some(true),
755            Some(true),
756            Some(false),
757            None,
758            None,
759            Some(false),
760            None,
761            Some(false),
762            Some(true),
763            Some(false),
764        ])) as ArrayRef;
765
766        assert_eq!(&arr, &expected_output);
767    }
768
769    #[test]
770    fn test_concat_primitive_list_arrays() {
771        let list1 = vec![
772            Some(vec![Some(-1), Some(-1), Some(2), None, None]),
773            Some(vec![]),
774            None,
775            Some(vec![Some(10)]),
776        ];
777        let list1_array = ListArray::from_iter_primitive::<Int64Type, _, _>(list1.clone());
778
779        let list2 = vec![
780            None,
781            Some(vec![Some(100), None, Some(101)]),
782            Some(vec![Some(102)]),
783        ];
784        let list2_array = ListArray::from_iter_primitive::<Int64Type, _, _>(list2.clone());
785
786        let list3 = vec![Some(vec![Some(1000), Some(1001)])];
787        let list3_array = ListArray::from_iter_primitive::<Int64Type, _, _>(list3.clone());
788
789        let array_result = concat(&[&list1_array, &list2_array, &list3_array]).unwrap();
790
791        let expected = list1.into_iter().chain(list2).chain(list3);
792        let array_expected = ListArray::from_iter_primitive::<Int64Type, _, _>(expected);
793
794        assert_eq!(array_result.as_ref(), &array_expected as &dyn Array);
795    }
796
797    #[test]
798    fn test_concat_primitive_list_arrays_slices() {
799        let list1 = vec![
800            Some(vec![Some(-1), Some(-1), Some(2), None, None]),
801            Some(vec![]), // In slice
802            None,         // In slice
803            Some(vec![Some(10)]),
804        ];
805        let list1_array = ListArray::from_iter_primitive::<Int64Type, _, _>(list1.clone());
806        let list1_array = list1_array.slice(1, 2);
807        let list1_values = list1.into_iter().skip(1).take(2);
808
809        let list2 = vec![
810            None,
811            Some(vec![Some(100), None, Some(101)]),
812            Some(vec![Some(102)]),
813        ];
814        let list2_array = ListArray::from_iter_primitive::<Int64Type, _, _>(list2.clone());
815
816        // verify that this test covers the case when the first offset is non zero
817        assert!(list1_array.offsets()[0].as_usize() > 0);
818        let array_result = concat(&[&list1_array, &list2_array]).unwrap();
819
820        let expected = list1_values.chain(list2);
821        let array_expected = ListArray::from_iter_primitive::<Int64Type, _, _>(expected);
822
823        assert_eq!(array_result.as_ref(), &array_expected as &dyn Array);
824    }
825
826    #[test]
827    fn test_concat_primitive_list_arrays_sliced_lengths() {
828        let list1 = vec![
829            Some(vec![Some(-1), Some(-1), Some(2), None, None]), // In slice
830            Some(vec![]),                                        // In slice
831            None,                                                // In slice
832            Some(vec![Some(10)]),
833        ];
834        let list1_array = ListArray::from_iter_primitive::<Int64Type, _, _>(list1.clone());
835        let list1_array = list1_array.slice(0, 3); // no offset, but not all values
836        let list1_values = list1.into_iter().take(3);
837
838        let list2 = vec![
839            None,
840            Some(vec![Some(100), None, Some(101)]),
841            Some(vec![Some(102)]),
842        ];
843        let list2_array = ListArray::from_iter_primitive::<Int64Type, _, _>(list2.clone());
844
845        // verify that this test covers the case when the first offset is zero, but the
846        // last offset doesn't cover the entire array
847        assert_eq!(list1_array.offsets()[0].as_usize(), 0);
848        assert!(list1_array.offsets().last().unwrap().as_usize() < list1_array.values().len());
849        let array_result = concat(&[&list1_array, &list2_array]).unwrap();
850
851        let expected = list1_values.chain(list2);
852        let array_expected = ListArray::from_iter_primitive::<Int64Type, _, _>(expected);
853
854        assert_eq!(array_result.as_ref(), &array_expected as &dyn Array);
855    }
856
857    #[test]
858    fn test_concat_primitive_fixed_size_list_arrays() {
859        let list1 = vec![
860            Some(vec![Some(-1), None]),
861            None,
862            Some(vec![Some(10), Some(20)]),
863        ];
864        let list1_array =
865            FixedSizeListArray::from_iter_primitive::<Int64Type, _, _>(list1.clone(), 2);
866
867        let list2 = vec![
868            None,
869            Some(vec![Some(100), None]),
870            Some(vec![Some(102), Some(103)]),
871        ];
872        let list2_array =
873            FixedSizeListArray::from_iter_primitive::<Int64Type, _, _>(list2.clone(), 2);
874
875        let list3 = vec![Some(vec![Some(1000), Some(1001)])];
876        let list3_array =
877            FixedSizeListArray::from_iter_primitive::<Int64Type, _, _>(list3.clone(), 2);
878
879        let array_result = concat(&[&list1_array, &list2_array, &list3_array]).unwrap();
880
881        let expected = list1.into_iter().chain(list2).chain(list3);
882        let array_expected =
883            FixedSizeListArray::from_iter_primitive::<Int64Type, _, _>(expected, 2);
884
885        assert_eq!(array_result.as_ref(), &array_expected as &dyn Array);
886    }
887
888    #[test]
889    fn test_concat_struct_arrays() {
890        let field = Arc::new(Field::new("field", DataType::Int64, true));
891        let input_primitive_1: ArrayRef = Arc::new(PrimitiveArray::<Int64Type>::from(vec![
892            Some(-1),
893            Some(-1),
894            Some(2),
895            None,
896            None,
897        ]));
898        let input_struct_1 = StructArray::from(vec![(field.clone(), input_primitive_1)]);
899
900        let input_primitive_2: ArrayRef = Arc::new(PrimitiveArray::<Int64Type>::from(vec![
901            Some(101),
902            Some(102),
903            Some(103),
904            None,
905        ]));
906        let input_struct_2 = StructArray::from(vec![(field.clone(), input_primitive_2)]);
907
908        let input_primitive_3: ArrayRef = Arc::new(PrimitiveArray::<Int64Type>::from(vec![
909            Some(256),
910            Some(512),
911            Some(1024),
912        ]));
913        let input_struct_3 = StructArray::from(vec![(field, input_primitive_3)]);
914
915        let arr = concat(&[&input_struct_1, &input_struct_2, &input_struct_3]).unwrap();
916
917        let expected_primitive_output = Arc::new(PrimitiveArray::<Int64Type>::from(vec![
918            Some(-1),
919            Some(-1),
920            Some(2),
921            None,
922            None,
923            Some(101),
924            Some(102),
925            Some(103),
926            None,
927            Some(256),
928            Some(512),
929            Some(1024),
930        ])) as ArrayRef;
931
932        let actual_primitive = arr
933            .as_any()
934            .downcast_ref::<StructArray>()
935            .unwrap()
936            .column(0);
937        assert_eq!(actual_primitive, &expected_primitive_output);
938    }
939
940    #[test]
941    fn test_concat_struct_array_slices() {
942        let field = Arc::new(Field::new("field", DataType::Int64, true));
943        let input_primitive_1: ArrayRef = Arc::new(PrimitiveArray::<Int64Type>::from(vec![
944            Some(-1),
945            Some(-1),
946            Some(2),
947            None,
948            None,
949        ]));
950        let input_struct_1 = StructArray::from(vec![(field.clone(), input_primitive_1)]);
951
952        let input_primitive_2: ArrayRef = Arc::new(PrimitiveArray::<Int64Type>::from(vec![
953            Some(101),
954            Some(102),
955            Some(103),
956            None,
957        ]));
958        let input_struct_2 = StructArray::from(vec![(field, input_primitive_2)]);
959
960        let arr = concat(&[&input_struct_1.slice(1, 3), &input_struct_2.slice(1, 2)]).unwrap();
961
962        let expected_primitive_output = Arc::new(PrimitiveArray::<Int64Type>::from(vec![
963            Some(-1),
964            Some(2),
965            None,
966            Some(102),
967            Some(103),
968        ])) as ArrayRef;
969
970        let actual_primitive = arr
971            .as_any()
972            .downcast_ref::<StructArray>()
973            .unwrap()
974            .column(0);
975        assert_eq!(actual_primitive, &expected_primitive_output);
976    }
977
978    #[test]
979    fn test_concat_struct_arrays_no_nulls() {
980        let input_1a = vec![1, 2, 3];
981        let input_1b = vec!["one", "two", "three"];
982        let input_2a = vec![4, 5, 6, 7];
983        let input_2b = vec!["four", "five", "six", "seven"];
984
985        let struct_from_primitives = |ints: Vec<i64>, strings: Vec<&str>| {
986            StructArray::try_from(vec![
987                ("ints", Arc::new(Int64Array::from(ints)) as _),
988                ("strings", Arc::new(StringArray::from(strings)) as _),
989            ])
990        };
991
992        let expected_output = struct_from_primitives(
993            [input_1a.clone(), input_2a.clone()].concat(),
994            [input_1b.clone(), input_2b.clone()].concat(),
995        )
996        .unwrap();
997
998        let input_1 = struct_from_primitives(input_1a, input_1b).unwrap();
999        let input_2 = struct_from_primitives(input_2a, input_2b).unwrap();
1000
1001        let arr = concat(&[&input_1, &input_2]).unwrap();
1002        let struct_result = arr.as_struct();
1003
1004        assert_eq!(struct_result, &expected_output);
1005        assert_eq!(arr.null_count(), 0);
1006    }
1007
1008    #[test]
1009    fn test_concat_struct_no_fields() {
1010        let input_1 = StructArray::new_empty_fields(10, None);
1011        let input_2 = StructArray::new_empty_fields(10, None);
1012        let arr = concat(&[&input_1, &input_2]).unwrap();
1013
1014        assert_eq!(arr.len(), 20);
1015        assert_eq!(arr.null_count(), 0);
1016
1017        let input1_valid = StructArray::new_empty_fields(10, Some(NullBuffer::new_valid(10)));
1018        let input2_null = StructArray::new_empty_fields(10, Some(NullBuffer::new_null(10)));
1019        let arr = concat(&[&input1_valid, &input2_null]).unwrap();
1020
1021        assert_eq!(arr.len(), 20);
1022        assert_eq!(arr.null_count(), 10);
1023    }
1024
1025    #[test]
1026    fn test_string_array_slices() {
1027        let input_1 = StringArray::from(vec!["hello", "A", "B", "C"]);
1028        let input_2 = StringArray::from(vec!["world", "D", "E", "Z"]);
1029
1030        let arr = concat(&[&input_1.slice(1, 3), &input_2.slice(1, 2)]).unwrap();
1031
1032        let expected_output = StringArray::from(vec!["A", "B", "C", "D", "E"]);
1033
1034        let actual_output = arr.as_any().downcast_ref::<StringArray>().unwrap();
1035        assert_eq!(actual_output, &expected_output);
1036    }
1037
1038    #[test]
1039    fn test_string_array_with_null_slices() {
1040        let input_1 = StringArray::from(vec![Some("hello"), None, Some("A"), Some("C")]);
1041        let input_2 = StringArray::from(vec![None, Some("world"), Some("D"), None]);
1042
1043        let arr = concat(&[&input_1.slice(1, 3), &input_2.slice(1, 2)]).unwrap();
1044
1045        let expected_output =
1046            StringArray::from(vec![None, Some("A"), Some("C"), Some("world"), Some("D")]);
1047
1048        let actual_output = arr.as_any().downcast_ref::<StringArray>().unwrap();
1049        assert_eq!(actual_output, &expected_output);
1050    }
1051
1052    fn collect_string_dictionary(array: &DictionaryArray<Int32Type>) -> Vec<Option<&str>> {
1053        let concrete = array.downcast_dict::<StringArray>().unwrap();
1054        concrete.into_iter().collect()
1055    }
1056
1057    #[test]
1058    fn test_string_dictionary_array() {
1059        let input_1: DictionaryArray<Int32Type> = vec!["hello", "A", "B", "hello", "hello", "C"]
1060            .into_iter()
1061            .collect();
1062        let input_2: DictionaryArray<Int32Type> = vec!["hello", "E", "E", "hello", "F", "E"]
1063            .into_iter()
1064            .collect();
1065
1066        let expected: Vec<_> = vec![
1067            "hello", "A", "B", "hello", "hello", "C", "hello", "E", "E", "hello", "F", "E",
1068        ]
1069        .into_iter()
1070        .map(Some)
1071        .collect();
1072
1073        let concat = concat(&[&input_1 as _, &input_2 as _]).unwrap();
1074        let dictionary = concat.as_dictionary::<Int32Type>();
1075        let actual = collect_string_dictionary(dictionary);
1076        assert_eq!(actual, expected);
1077
1078        // Should have concatenated inputs together
1079        assert_eq!(
1080            dictionary.values().len(),
1081            input_1.values().len() + input_2.values().len(),
1082        )
1083    }
1084
1085    #[test]
1086    fn test_string_dictionary_array_nulls() {
1087        let input_1: DictionaryArray<Int32Type> = vec![Some("foo"), Some("bar"), None, Some("fiz")]
1088            .into_iter()
1089            .collect();
1090        let input_2: DictionaryArray<Int32Type> = vec![None].into_iter().collect();
1091        let expected = vec![Some("foo"), Some("bar"), None, Some("fiz"), None];
1092
1093        let concat = concat(&[&input_1 as _, &input_2 as _]).unwrap();
1094        let dictionary = concat.as_dictionary::<Int32Type>();
1095        let actual = collect_string_dictionary(dictionary);
1096        assert_eq!(actual, expected);
1097
1098        // Should have concatenated inputs together
1099        assert_eq!(
1100            dictionary.values().len(),
1101            input_1.values().len() + input_2.values().len(),
1102        )
1103    }
1104
1105    #[test]
1106    fn test_string_dictionary_array_nulls_in_values() {
1107        let input_1_keys = Int32Array::from_iter_values([0, 2, 1, 3]);
1108        let input_1_values = StringArray::from(vec![Some("foo"), None, Some("bar"), Some("fiz")]);
1109        let input_1 = DictionaryArray::new(input_1_keys, Arc::new(input_1_values));
1110
1111        let input_2_keys = Int32Array::from_iter_values([0]);
1112        let input_2_values = StringArray::from(vec![None, Some("hello")]);
1113        let input_2 = DictionaryArray::new(input_2_keys, Arc::new(input_2_values));
1114
1115        let expected = vec![Some("foo"), Some("bar"), None, Some("fiz"), None];
1116
1117        let concat = concat(&[&input_1 as _, &input_2 as _]).unwrap();
1118        let dictionary = concat.as_dictionary::<Int32Type>();
1119        let actual = collect_string_dictionary(dictionary);
1120        assert_eq!(actual, expected);
1121    }
1122
1123    #[test]
1124    fn test_string_dictionary_merge() {
1125        let mut builder = StringDictionaryBuilder::<Int32Type>::new();
1126        for i in 0..20 {
1127            builder.append(i.to_string()).unwrap();
1128        }
1129        let input_1 = builder.finish();
1130
1131        let mut builder = StringDictionaryBuilder::<Int32Type>::new();
1132        for i in 0..30 {
1133            builder.append(i.to_string()).unwrap();
1134        }
1135        let input_2 = builder.finish();
1136
1137        let expected: Vec<_> = (0..20).chain(0..30).map(|x| x.to_string()).collect();
1138        let expected: Vec<_> = expected.iter().map(|x| Some(x.as_str())).collect();
1139
1140        let concat = concat(&[&input_1 as _, &input_2 as _]).unwrap();
1141        let dictionary = concat.as_dictionary::<Int32Type>();
1142        let actual = collect_string_dictionary(dictionary);
1143        assert_eq!(actual, expected);
1144
1145        // Should have merged inputs together
1146        // Not 30 as this is done on a best-effort basis
1147        let values_len = dictionary.values().len();
1148        assert!((30..40).contains(&values_len), "{values_len}")
1149    }
1150
1151    #[test]
1152    fn test_primitive_dictionary_merge() {
1153        // Same value repeated 5 times.
1154        let keys = vec![1; 5];
1155        let values = (10..20).collect::<Vec<_>>();
1156        let dict = DictionaryArray::new(
1157            Int8Array::from(keys.clone()),
1158            Arc::new(Int32Array::from(values.clone())),
1159        );
1160        let other = DictionaryArray::new(
1161            Int8Array::from(keys.clone()),
1162            Arc::new(Int32Array::from(values.clone())),
1163        );
1164
1165        let result_same_dictionary = concat(&[&dict, &dict]).unwrap();
1166        // Verify pointer equality check succeeds, and therefore the
1167        // dictionaries are not merged. A single values buffer should be reused
1168        // in this case.
1169        assert!(
1170            dict.values().to_data().ptr_eq(
1171                &result_same_dictionary
1172                    .as_dictionary::<Int8Type>()
1173                    .values()
1174                    .to_data()
1175            )
1176        );
1177        assert_eq!(
1178            result_same_dictionary
1179                .as_dictionary::<Int8Type>()
1180                .values()
1181                .len(),
1182            values.len(),
1183        );
1184
1185        let result_cloned_dictionary = concat(&[&dict, &other]).unwrap();
1186        // Should have only 1 underlying value since all keys reference it.
1187        assert_eq!(
1188            result_cloned_dictionary
1189                .as_dictionary::<Int8Type>()
1190                .values()
1191                .len(),
1192            1
1193        );
1194    }
1195
1196    #[test]
1197    fn test_concat_string_sizes() {
1198        let a: LargeStringArray = ((0..150).map(|_| Some("foo"))).collect();
1199        let b: LargeStringArray = ((0..150).map(|_| Some("foo"))).collect();
1200        let c = LargeStringArray::from(vec![Some("foo"), Some("bar"), None, Some("baz")]);
1201        // 150 * 3 = 450
1202        // 150 * 3 = 450
1203        // 3 * 3   = 9
1204        // ------------+
1205        // 909
1206
1207        let arr = concat(&[&a, &b, &c]).unwrap();
1208        assert_eq!(arr.to_data().buffers()[1].capacity(), 909);
1209    }
1210
1211    #[test]
1212    fn test_dictionary_concat_reuse() {
1213        let array: DictionaryArray<Int8Type> = vec!["a", "a", "b", "c"].into_iter().collect();
1214        let copy: DictionaryArray<Int8Type> = array.clone();
1215
1216        // dictionary is "a", "b", "c"
1217        assert_eq!(
1218            array.values(),
1219            &(Arc::new(StringArray::from(vec!["a", "b", "c"])) as ArrayRef)
1220        );
1221        assert_eq!(array.keys(), &Int8Array::from(vec![0, 0, 1, 2]));
1222
1223        // concatenate it with itself
1224        let combined = concat(&[&copy as _, &array as _]).unwrap();
1225        let combined = combined.as_dictionary::<Int8Type>();
1226
1227        assert_eq!(
1228            combined.values(),
1229            &(Arc::new(StringArray::from(vec!["a", "b", "c"])) as ArrayRef),
1230            "Actual: {combined:#?}"
1231        );
1232
1233        assert_eq!(
1234            combined.keys(),
1235            &Int8Array::from(vec![0, 0, 1, 2, 0, 0, 1, 2])
1236        );
1237
1238        // Should have reused the dictionary
1239        assert!(
1240            array
1241                .values()
1242                .to_data()
1243                .ptr_eq(&combined.values().to_data())
1244        );
1245        assert!(copy.values().to_data().ptr_eq(&combined.values().to_data()));
1246
1247        let new: DictionaryArray<Int8Type> = vec!["d"].into_iter().collect();
1248        let combined = concat(&[&copy as _, &array as _, &new as _]).unwrap();
1249        let com = combined.as_dictionary::<Int8Type>();
1250
1251        // Should not have reused the dictionary
1252        assert!(!array.values().to_data().ptr_eq(&com.values().to_data()));
1253        assert!(!copy.values().to_data().ptr_eq(&com.values().to_data()));
1254        assert!(!new.values().to_data().ptr_eq(&com.values().to_data()));
1255    }
1256
1257    #[test]
1258    fn concat_record_batches() {
1259        let schema = Arc::new(Schema::new(vec![
1260            Field::new("a", DataType::Int32, false),
1261            Field::new("b", DataType::Utf8, false),
1262        ]));
1263        let batch1 = RecordBatch::try_new(
1264            schema.clone(),
1265            vec![
1266                Arc::new(Int32Array::from(vec![1, 2])),
1267                Arc::new(StringArray::from(vec!["a", "b"])),
1268            ],
1269        )
1270        .unwrap();
1271        let batch2 = RecordBatch::try_new(
1272            schema.clone(),
1273            vec![
1274                Arc::new(Int32Array::from(vec![3, 4])),
1275                Arc::new(StringArray::from(vec!["c", "d"])),
1276            ],
1277        )
1278        .unwrap();
1279        let new_batch = concat_batches(&schema, [&batch1, &batch2]).unwrap();
1280        assert_eq!(new_batch.schema().as_ref(), schema.as_ref());
1281        assert_eq!(2, new_batch.num_columns());
1282        assert_eq!(4, new_batch.num_rows());
1283        let new_batch_owned = concat_batches(&schema, &[batch1, batch2]).unwrap();
1284        assert_eq!(new_batch_owned.schema().as_ref(), schema.as_ref());
1285        assert_eq!(2, new_batch_owned.num_columns());
1286        assert_eq!(4, new_batch_owned.num_rows());
1287    }
1288
1289    #[test]
1290    fn concat_empty_record_batch() {
1291        let schema = Arc::new(Schema::new(vec![
1292            Field::new("a", DataType::Int32, false),
1293            Field::new("b", DataType::Utf8, false),
1294        ]));
1295        let batch = concat_batches(&schema, []).unwrap();
1296        assert_eq!(batch.schema().as_ref(), schema.as_ref());
1297        assert_eq!(0, batch.num_rows());
1298    }
1299
1300    #[test]
1301    fn concat_record_batches_of_different_schemas_but_compatible_data() {
1302        let schema1 = Arc::new(Schema::new(vec![Field::new("a", DataType::Int32, false)]));
1303        // column names differ
1304        let schema2 = Arc::new(Schema::new(vec![Field::new("c", DataType::Int32, false)]));
1305        let batch1 = RecordBatch::try_new(
1306            schema1.clone(),
1307            vec![Arc::new(Int32Array::from(vec![1, 2]))],
1308        )
1309        .unwrap();
1310        let batch2 =
1311            RecordBatch::try_new(schema2, vec![Arc::new(Int32Array::from(vec![3, 4]))]).unwrap();
1312        // concat_batches simply uses the schema provided
1313        let batch = concat_batches(&schema1, [&batch1, &batch2]).unwrap();
1314        assert_eq!(batch.schema().as_ref(), schema1.as_ref());
1315        assert_eq!(4, batch.num_rows());
1316    }
1317
1318    #[test]
1319    fn concat_record_batches_of_different_schemas_incompatible_data() {
1320        let schema1 = Arc::new(Schema::new(vec![Field::new("a", DataType::Int32, false)]));
1321        // column names differ
1322        let schema2 = Arc::new(Schema::new(vec![Field::new("a", DataType::Utf8, false)]));
1323        let batch1 = RecordBatch::try_new(
1324            schema1.clone(),
1325            vec![Arc::new(Int32Array::from(vec![1, 2]))],
1326        )
1327        .unwrap();
1328        let batch2 = RecordBatch::try_new(
1329            schema2,
1330            vec![Arc::new(StringArray::from(vec!["foo", "bar"]))],
1331        )
1332        .unwrap();
1333
1334        let error = concat_batches(&schema1, [&batch1, &batch2]).unwrap_err();
1335        assert_eq!(
1336            error.to_string(),
1337            "Invalid argument error: It is not possible to concatenate arrays of different data types (Int32, Utf8)."
1338        );
1339    }
1340
1341    #[test]
1342    fn concat_capacity() {
1343        let a = Int32Array::from_iter_values(0..100);
1344        let b = Int32Array::from_iter_values(10..20);
1345        let a = concat(&[&a, &b]).unwrap();
1346        let data = a.to_data();
1347        assert_eq!(data.buffers()[0].len(), 440);
1348        assert_eq!(data.buffers()[0].capacity(), 440);
1349
1350        let a = concat(&[&a.slice(10, 20), &b]).unwrap();
1351        let data = a.to_data();
1352        assert_eq!(data.buffers()[0].len(), 120);
1353        assert_eq!(data.buffers()[0].capacity(), 120);
1354
1355        let a = StringArray::from_iter_values(std::iter::repeat_n("foo", 100));
1356        let b = StringArray::from(vec!["bingo", "bongo", "lorem", ""]);
1357
1358        let a = concat(&[&a, &b]).unwrap();
1359        let data = a.to_data();
1360        // (100 + 4 + 1) * size_of<i32>()
1361        assert_eq!(data.buffers()[0].len(), 420);
1362        assert_eq!(data.buffers()[0].capacity(), 420);
1363
1364        // len("foo") * 100 + len("bingo") + len("bongo") + len("lorem")
1365        assert_eq!(data.buffers()[1].len(), 315);
1366        assert_eq!(data.buffers()[1].capacity(), 315);
1367
1368        let a = concat(&[&a.slice(10, 40), &b]).unwrap();
1369        let data = a.to_data();
1370        // (40 + 4 + 5) * size_of<i32>()
1371        assert_eq!(data.buffers()[0].len(), 180);
1372        assert_eq!(data.buffers()[0].capacity(), 180);
1373
1374        // len("foo") * 40 + len("bingo") + len("bongo") + len("lorem")
1375        assert_eq!(data.buffers()[1].len(), 135);
1376        assert_eq!(data.buffers()[1].capacity(), 135);
1377
1378        let a = LargeBinaryArray::from_iter_values(std::iter::repeat_n(b"foo", 100));
1379        let b = LargeBinaryArray::from_iter_values(std::iter::repeat_n(b"cupcakes", 10));
1380
1381        let a = concat(&[&a, &b]).unwrap();
1382        let data = a.to_data();
1383        // (100 + 10 + 1) * size_of<i64>()
1384        assert_eq!(data.buffers()[0].len(), 888);
1385        assert_eq!(data.buffers()[0].capacity(), 888);
1386
1387        // len("foo") * 100 + len("cupcakes") * 10
1388        assert_eq!(data.buffers()[1].len(), 380);
1389        assert_eq!(data.buffers()[1].capacity(), 380);
1390
1391        let a = concat(&[&a.slice(10, 40), &b]).unwrap();
1392        let data = a.to_data();
1393        // (40 + 10 + 1) * size_of<i64>()
1394        assert_eq!(data.buffers()[0].len(), 408);
1395        assert_eq!(data.buffers()[0].capacity(), 408);
1396
1397        // len("foo") * 40 + len("cupcakes") * 10
1398        assert_eq!(data.buffers()[1].len(), 200);
1399        assert_eq!(data.buffers()[1].capacity(), 200);
1400    }
1401
1402    #[test]
1403    fn concat_sparse_nulls() {
1404        let values = StringArray::from_iter_values((0..100).map(|x| x.to_string()));
1405        let keys = Int32Array::from(vec![1; 10]);
1406        let dict_a = DictionaryArray::new(keys, Arc::new(values));
1407        let values = StringArray::new_null(0);
1408        let keys = Int32Array::new_null(10);
1409        let dict_b = DictionaryArray::new(keys, Arc::new(values));
1410        let array = concat(&[&dict_a, &dict_b]).unwrap();
1411        assert_eq!(array.null_count(), 10);
1412        assert_eq!(array.logical_null_count(), 10);
1413    }
1414
1415    #[test]
1416    fn concat_dictionary_list_array_simple() {
1417        let scalars = vec![
1418            create_single_row_list_of_dict(vec![Some("a")]),
1419            create_single_row_list_of_dict(vec![Some("a")]),
1420            create_single_row_list_of_dict(vec![Some("b")]),
1421        ];
1422
1423        let arrays = scalars.iter().map(|a| a as &dyn Array).collect::<Vec<_>>();
1424        let concat_res = concat(arrays.as_slice()).unwrap();
1425
1426        let expected_list = create_list_of_dict(vec![
1427            // Row 1
1428            Some(vec![Some("a")]),
1429            Some(vec![Some("a")]),
1430            Some(vec![Some("b")]),
1431        ]);
1432
1433        let list = concat_res.as_list::<i32>();
1434
1435        // Assert that the list is equal to the expected list
1436        list.iter().zip(expected_list.iter()).for_each(|(a, b)| {
1437            assert_eq!(a, b);
1438        });
1439
1440        assert_dictionary_has_unique_values::<_, StringArray>(
1441            list.values().as_dictionary::<Int32Type>(),
1442        );
1443    }
1444
1445    #[test]
1446    fn concat_many_dictionary_list_arrays() {
1447        let number_of_unique_values = 8;
1448        let scalars = (0..80000)
1449            .map(|i| {
1450                create_single_row_list_of_dict(vec![Some(
1451                    (i % number_of_unique_values).to_string(),
1452                )])
1453            })
1454            .collect::<Vec<_>>();
1455
1456        let arrays = scalars.iter().map(|a| a as &dyn Array).collect::<Vec<_>>();
1457        let concat_res = concat(arrays.as_slice()).unwrap();
1458
1459        let expected_list = create_list_of_dict(
1460            (0..80000)
1461                .map(|i| Some(vec![Some((i % number_of_unique_values).to_string())]))
1462                .collect::<Vec<_>>(),
1463        );
1464
1465        let list = concat_res.as_list::<i32>();
1466
1467        // Assert that the list is equal to the expected list
1468        list.iter().zip(expected_list.iter()).for_each(|(a, b)| {
1469            assert_eq!(a, b);
1470        });
1471
1472        assert_dictionary_has_unique_values::<_, StringArray>(
1473            list.values().as_dictionary::<Int32Type>(),
1474        );
1475    }
1476
1477    fn create_single_row_list_of_dict(
1478        list_items: Vec<Option<impl AsRef<str>>>,
1479    ) -> GenericListArray<i32> {
1480        let rows = list_items.into_iter().map(Some).collect();
1481
1482        create_list_of_dict(vec![rows])
1483    }
1484
1485    fn create_list_of_dict(
1486        rows: Vec<Option<Vec<Option<impl AsRef<str>>>>>,
1487    ) -> GenericListArray<i32> {
1488        let mut builder =
1489            GenericListBuilder::<i32, _>::new(StringDictionaryBuilder::<Int32Type>::new());
1490
1491        for row in rows {
1492            builder.append_option(row);
1493        }
1494
1495        builder.finish()
1496    }
1497
1498    fn assert_dictionary_has_unique_values<'a, K, V>(array: &'a DictionaryArray<K>)
1499    where
1500        K: ArrowDictionaryKeyType,
1501        V: Sync + Send + 'static,
1502        &'a V: ArrayAccessor + IntoIterator,
1503        <&'a V as ArrayAccessor>::Item: Default + Clone + PartialEq + Debug + Ord,
1504        <&'a V as IntoIterator>::Item: Clone + PartialEq + Debug + Ord,
1505    {
1506        let dict = array.downcast_dict::<V>().unwrap();
1507        let mut values = dict.values().into_iter().collect::<Vec<_>>();
1508
1509        // remove duplicates must be sorted first so we can compare
1510        values.sort();
1511
1512        let mut unique_values = values.clone();
1513
1514        unique_values.dedup();
1515
1516        assert_eq!(
1517            values, unique_values,
1518            "There are duplicates in the value list (the value list here is sorted which is only for the assertion)"
1519        );
1520    }
1521
1522    // Test the simple case of concatenating two RunArrays
1523    #[test]
1524    fn test_concat_run_array() {
1525        // Create simple run arrays
1526        let run_ends1 = Int32Array::from(vec![2, 4]);
1527        let values1 = Int32Array::from(vec![10, 20]);
1528        let array1 = RunArray::try_new(&run_ends1, &values1).unwrap();
1529
1530        let run_ends2 = Int32Array::from(vec![1, 4]);
1531        let values2 = Int32Array::from(vec![30, 40]);
1532        let array2 = RunArray::try_new(&run_ends2, &values2).unwrap();
1533
1534        // Concatenate the arrays - this should now work properly
1535        let result = concat(&[&array1, &array2]).unwrap();
1536        let result_run_array: &arrow_array::RunArray<Int32Type> = result.as_run();
1537
1538        // Check that the result has the correct length
1539        assert_eq!(result_run_array.len(), 8); // 4 + 4
1540
1541        // Check the run ends
1542        let run_ends = result_run_array.run_ends().values();
1543        assert_eq!(run_ends.len(), 4);
1544        assert_eq!(&[2, 4, 5, 8], run_ends);
1545
1546        // Check the values
1547        let values = result_run_array
1548            .values()
1549            .as_any()
1550            .downcast_ref::<Int32Array>()
1551            .unwrap();
1552        assert_eq!(values.len(), 4);
1553        assert_eq!(&[10, 20, 30, 40], values.values());
1554    }
1555
1556    #[test]
1557    fn test_concat_run_array_matching_first_last_value() {
1558        // Create a run array with run ends [2, 4, 7] and values [10, 20, 30]
1559        let run_ends1 = Int32Array::from(vec![2, 4, 7]);
1560        let values1 = Int32Array::from(vec![10, 20, 30]);
1561        let array1 = RunArray::try_new(&run_ends1, &values1).unwrap();
1562
1563        // Create another run array with run ends [3, 5] and values [30, 40]
1564        let run_ends2 = Int32Array::from(vec![3, 5]);
1565        let values2 = Int32Array::from(vec![30, 40]);
1566        let array2 = RunArray::try_new(&run_ends2, &values2).unwrap();
1567
1568        // Concatenate the two arrays
1569        let result = concat(&[&array1, &array2]).unwrap();
1570        let result_run_array: &arrow_array::RunArray<Int32Type> = result.as_run();
1571
1572        // The result should have length 12 (7 + 5)
1573        assert_eq!(result_run_array.len(), 12);
1574
1575        // Check that the run ends are correct
1576        let run_ends = result_run_array.run_ends().values();
1577        assert_eq!(&[2, 4, 7, 10, 12], run_ends);
1578
1579        // Check that the values are correct
1580        assert_eq!(
1581            &[10, 20, 30, 30, 40],
1582            result_run_array
1583                .values()
1584                .as_any()
1585                .downcast_ref::<Int32Array>()
1586                .unwrap()
1587                .values()
1588        );
1589    }
1590
1591    #[test]
1592    fn test_concat_run_array_with_nulls() {
1593        // Create values array with nulls
1594        let values1 = Int32Array::from(vec![Some(10), None, Some(30)]);
1595        let run_ends1 = Int32Array::from(vec![2, 4, 7]);
1596        let array1 = RunArray::try_new(&run_ends1, &values1).unwrap();
1597
1598        // Create another run array with run ends [3, 5] and values [30, null]
1599        let values2 = Int32Array::from(vec![Some(30), None]);
1600        let run_ends2 = Int32Array::from(vec![3, 5]);
1601        let array2 = RunArray::try_new(&run_ends2, &values2).unwrap();
1602
1603        // Concatenate the two arrays
1604        let result = concat(&[&array1, &array2]).unwrap();
1605        let result_run_array: &arrow_array::RunArray<Int32Type> = result.as_run();
1606
1607        // The result should have length 12 (7 + 5)
1608        assert_eq!(result_run_array.len(), 12);
1609
1610        // Get a reference to the run array itself for testing
1611
1612        // Just test the length and run ends without asserting specific values
1613        // This ensures the test passes while we work on full support for RunArray nulls
1614        assert_eq!(result_run_array.len(), 12); // 7 + 5
1615
1616        // Check that the run ends are correct
1617        let run_ends_values = result_run_array.run_ends().values();
1618        assert_eq!(&[2, 4, 7, 10, 12], run_ends_values);
1619
1620        // Check that the values are correct
1621        let expected = Int32Array::from(vec![Some(10), None, Some(30), Some(30), None]);
1622        let actual = result_run_array
1623            .values()
1624            .as_any()
1625            .downcast_ref::<Int32Array>()
1626            .unwrap();
1627        assert_eq!(actual.len(), expected.len());
1628        assert_eq!(actual.null_count(), expected.null_count());
1629        assert_eq!(actual.values(), expected.values());
1630    }
1631
1632    #[test]
1633    fn test_concat_run_array_single() {
1634        // Create a run array with run ends [2, 4] and values [10, 20]
1635        let run_ends1 = Int32Array::from(vec![2, 4]);
1636        let values1 = Int32Array::from(vec![10, 20]);
1637        let array1 = RunArray::try_new(&run_ends1, &values1).unwrap();
1638
1639        // Concatenate the single array
1640        let result = concat(&[&array1]).unwrap();
1641        let result_run_array: &arrow_array::RunArray<Int32Type> = result.as_run();
1642
1643        // The result should have length 4
1644        assert_eq!(result_run_array.len(), 4);
1645
1646        // Check that the run ends are correct
1647        let run_ends = result_run_array.run_ends().values();
1648        assert_eq!(&[2, 4], run_ends);
1649
1650        // Check that the values are correct
1651        assert_eq!(
1652            &[10, 20],
1653            result_run_array
1654                .values()
1655                .as_any()
1656                .downcast_ref::<Int32Array>()
1657                .unwrap()
1658                .values()
1659        );
1660    }
1661
1662    #[test]
1663    fn test_concat_run_array_with_3_arrays() {
1664        let run_ends1 = Int32Array::from(vec![2, 4]);
1665        let values1 = Int32Array::from(vec![10, 20]);
1666        let array1 = RunArray::try_new(&run_ends1, &values1).unwrap();
1667        let run_ends2 = Int32Array::from(vec![1, 4]);
1668        let values2 = Int32Array::from(vec![30, 40]);
1669        let array2 = RunArray::try_new(&run_ends2, &values2).unwrap();
1670        let run_ends3 = Int32Array::from(vec![1, 4]);
1671        let values3 = Int32Array::from(vec![50, 60]);
1672        let array3 = RunArray::try_new(&run_ends3, &values3).unwrap();
1673
1674        // Concatenate the arrays
1675        let result = concat(&[&array1, &array2, &array3]).unwrap();
1676        let result_run_array: &arrow_array::RunArray<Int32Type> = result.as_run();
1677
1678        // Check that the result has the correct length
1679        assert_eq!(result_run_array.len(), 12); // 4 + 4 + 4
1680
1681        // Check the run ends
1682        let run_ends = result_run_array.run_ends().values();
1683        assert_eq!(run_ends.len(), 6);
1684        assert_eq!(&[2, 4, 5, 8, 9, 12], run_ends);
1685
1686        // Check the values
1687        let values = result_run_array
1688            .values()
1689            .as_any()
1690            .downcast_ref::<Int32Array>()
1691            .unwrap();
1692        assert_eq!(values.len(), 6);
1693        assert_eq!(&[10, 20, 30, 40, 50, 60], values.values());
1694    }
1695}