arrow_select/
concat.rs

1// Licensed to the Apache Software Foundation (ASF) under one
2// or more contributor license agreements.  See the NOTICE file
3// distributed with this work for additional information
4// regarding copyright ownership.  The ASF licenses this file
5// to you under the Apache License, Version 2.0 (the
6// "License"); you may not use this file except in compliance
7// with the License.  You may obtain a copy of the License at
8//
9//   http://www.apache.org/licenses/LICENSE-2.0
10//
11// Unless required by applicable law or agreed to in writing,
12// software distributed under the License is distributed on an
13// "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
14// KIND, either express or implied.  See the License for the
15// specific language governing permissions and limitations
16// under the License.
17
18//! Defines concat kernel for `ArrayRef`
19//!
20//! Example:
21//!
22//! ```
23//! use arrow_array::{ArrayRef, StringArray};
24//! use arrow_select::concat::concat;
25//!
26//! let arr = concat(&[
27//!     &StringArray::from(vec!["hello", "world"]),
28//!     &StringArray::from(vec!["!"]),
29//! ]).unwrap();
30//! assert_eq!(arr.len(), 3);
31//! ```
32
33use crate::dictionary::{merge_dictionary_values, should_merge_dictionary_values};
34use arrow_array::builder::{
35    BooleanBuilder, GenericByteBuilder, GenericByteViewBuilder, PrimitiveBuilder,
36};
37use arrow_array::cast::AsArray;
38use arrow_array::types::*;
39use arrow_array::*;
40use arrow_buffer::{ArrowNativeType, BooleanBufferBuilder, NullBuffer, OffsetBuffer};
41use arrow_data::transform::{Capacities, MutableArrayData};
42use arrow_data::ArrayDataBuilder;
43use arrow_schema::{ArrowError, DataType, FieldRef, Fields, SchemaRef};
44use std::{collections::HashSet, ops::Add, sync::Arc};
45
46fn binary_capacity<T: ByteArrayType>(arrays: &[&dyn Array]) -> Capacities {
47    let mut item_capacity = 0;
48    let mut bytes_capacity = 0;
49    for array in arrays {
50        let a = array.as_bytes::<T>();
51
52        // Guaranteed to always have at least one element
53        let offsets = a.value_offsets();
54        bytes_capacity += offsets[offsets.len() - 1].as_usize() - offsets[0].as_usize();
55        item_capacity += a.len()
56    }
57
58    Capacities::Binary(item_capacity, Some(bytes_capacity))
59}
60
61fn fixed_size_list_capacity(arrays: &[&dyn Array], data_type: &DataType) -> Capacities {
62    if let DataType::FixedSizeList(f, _) = data_type {
63        let item_capacity = arrays.iter().map(|a| a.len()).sum();
64        let child_data_type = f.data_type();
65        match child_data_type {
66            // These types should match the types that `get_capacity`
67            // has special handling for.
68            DataType::Utf8
69            | DataType::LargeUtf8
70            | DataType::Binary
71            | DataType::LargeBinary
72            | DataType::FixedSizeList(_, _) => {
73                let values: Vec<&dyn arrow_array::Array> = arrays
74                    .iter()
75                    .map(|a| a.as_fixed_size_list().values().as_ref())
76                    .collect();
77                Capacities::List(
78                    item_capacity,
79                    Some(Box::new(get_capacity(&values, child_data_type))),
80                )
81            }
82            _ => Capacities::Array(item_capacity),
83        }
84    } else {
85        unreachable!("illegal data type for fixed size list")
86    }
87}
88
89fn concat_byte_view<B: ByteViewType>(arrays: &[&dyn Array]) -> Result<ArrayRef, ArrowError> {
90    let mut builder =
91        GenericByteViewBuilder::<B>::with_capacity(arrays.iter().map(|a| a.len()).sum());
92    for &array in arrays.iter() {
93        builder.append_array(array.as_byte_view());
94    }
95    Ok(Arc::new(builder.finish()))
96}
97
98fn concat_dictionaries<K: ArrowDictionaryKeyType>(
99    arrays: &[&dyn Array],
100) -> Result<ArrayRef, ArrowError> {
101    let mut output_len = 0;
102    let dictionaries: Vec<_> = arrays
103        .iter()
104        .map(|x| x.as_dictionary::<K>())
105        .inspect(|d| output_len += d.len())
106        .collect();
107
108    if !should_merge_dictionary_values::<K>(&dictionaries, output_len) {
109        return concat_fallback(arrays, Capacities::Array(output_len));
110    }
111
112    let merged = merge_dictionary_values(&dictionaries, None)?;
113
114    // Recompute keys
115    let mut key_values = Vec::with_capacity(output_len);
116
117    let mut has_nulls = false;
118    for (d, mapping) in dictionaries.iter().zip(merged.key_mappings) {
119        has_nulls |= d.null_count() != 0;
120        for key in d.keys().values() {
121            // Use get to safely handle nulls
122            key_values.push(mapping.get(key.as_usize()).copied().unwrap_or_default())
123        }
124    }
125
126    let nulls = has_nulls.then(|| {
127        let mut nulls = BooleanBufferBuilder::new(output_len);
128        for d in &dictionaries {
129            match d.nulls() {
130                Some(n) => nulls.append_buffer(n.inner()),
131                None => nulls.append_n(d.len(), true),
132            }
133        }
134        NullBuffer::new(nulls.finish())
135    });
136
137    let keys = PrimitiveArray::<K>::new(key_values.into(), nulls);
138    // Sanity check
139    assert_eq!(keys.len(), output_len);
140
141    let array = unsafe { DictionaryArray::new_unchecked(keys, merged.values) };
142    Ok(Arc::new(array))
143}
144
145fn concat_lists<OffsetSize: OffsetSizeTrait>(
146    arrays: &[&dyn Array],
147    field: &FieldRef,
148) -> Result<ArrayRef, ArrowError> {
149    let mut output_len = 0;
150    let mut list_has_nulls = false;
151    let mut list_has_slices = false;
152
153    let lists = arrays
154        .iter()
155        .map(|x| x.as_list::<OffsetSize>())
156        .inspect(|l| {
157            output_len += l.len();
158            list_has_nulls |= l.null_count() != 0;
159            list_has_slices |= l.offsets()[0] > OffsetSize::zero()
160                || l.offsets().last().unwrap().as_usize() < l.values().len();
161        })
162        .collect::<Vec<_>>();
163
164    let lists_nulls = list_has_nulls.then(|| {
165        let mut nulls = BooleanBufferBuilder::new(output_len);
166        for l in &lists {
167            match l.nulls() {
168                Some(n) => nulls.append_buffer(n.inner()),
169                None => nulls.append_n(l.len(), true),
170            }
171        }
172        NullBuffer::new(nulls.finish())
173    });
174
175    // If any of the lists have slices, we need to slice the values
176    // to ensure that the offsets are correct
177    let mut sliced_values;
178    let values: Vec<&dyn Array> = if list_has_slices {
179        sliced_values = Vec::with_capacity(lists.len());
180        for l in &lists {
181            // if the first offset is non-zero, we need to slice the values so when
182            // we concatenate them below only the relevant values are included
183            let offsets = l.offsets();
184            let start_offset = offsets[0].as_usize();
185            let end_offset = offsets.last().unwrap().as_usize();
186            sliced_values.push(l.values().slice(start_offset, end_offset - start_offset));
187        }
188        sliced_values.iter().map(|a| a.as_ref()).collect()
189    } else {
190        lists.iter().map(|x| x.values().as_ref()).collect()
191    };
192
193    let concatenated_values = concat(values.as_slice())?;
194
195    // Merge value offsets from the lists
196    let value_offset_buffer =
197        OffsetBuffer::<OffsetSize>::from_lengths(lists.iter().flat_map(|x| x.offsets().lengths()));
198
199    let array = GenericListArray::<OffsetSize>::try_new(
200        Arc::clone(field),
201        value_offset_buffer,
202        concatenated_values,
203        lists_nulls,
204    )?;
205
206    Ok(Arc::new(array))
207}
208
209fn concat_primitives<T: ArrowPrimitiveType>(arrays: &[&dyn Array]) -> Result<ArrayRef, ArrowError> {
210    let mut builder = PrimitiveBuilder::<T>::with_capacity(arrays.iter().map(|a| a.len()).sum())
211        .with_data_type(arrays[0].data_type().clone());
212
213    for array in arrays {
214        builder.append_array(array.as_primitive());
215    }
216
217    Ok(Arc::new(builder.finish()))
218}
219
220fn concat_boolean(arrays: &[&dyn Array]) -> Result<ArrayRef, ArrowError> {
221    let mut builder = BooleanBuilder::with_capacity(arrays.iter().map(|a| a.len()).sum());
222
223    for array in arrays {
224        builder.append_array(array.as_boolean());
225    }
226
227    Ok(Arc::new(builder.finish()))
228}
229
230fn concat_bytes<T: ByteArrayType>(arrays: &[&dyn Array]) -> Result<ArrayRef, ArrowError> {
231    let (item_capacity, bytes_capacity) = match binary_capacity::<T>(arrays) {
232        Capacities::Binary(item_capacity, Some(bytes_capacity)) => (item_capacity, bytes_capacity),
233        _ => unreachable!(),
234    };
235
236    let mut builder = GenericByteBuilder::<T>::with_capacity(item_capacity, bytes_capacity);
237
238    for array in arrays {
239        builder.append_array(array.as_bytes::<T>());
240    }
241
242    Ok(Arc::new(builder.finish()))
243}
244
245fn concat_structs(arrays: &[&dyn Array], fields: &Fields) -> Result<ArrayRef, ArrowError> {
246    let mut len = 0;
247    let mut has_nulls = false;
248    let structs = arrays
249        .iter()
250        .map(|a| {
251            len += a.len();
252            has_nulls |= a.null_count() > 0;
253            a.as_struct()
254        })
255        .collect::<Vec<_>>();
256
257    let nulls = has_nulls.then(|| {
258        let mut b = BooleanBufferBuilder::new(len);
259        for s in &structs {
260            match s.nulls() {
261                Some(n) => b.append_buffer(n.inner()),
262                None => b.append_n(s.len(), true),
263            }
264        }
265        NullBuffer::new(b.finish())
266    });
267
268    let column_concat_result = (0..fields.len())
269        .map(|i| {
270            let extracted_cols = structs
271                .iter()
272                .map(|s| s.column(i).as_ref())
273                .collect::<Vec<_>>();
274            concat(&extracted_cols)
275        })
276        .collect::<Result<Vec<_>, ArrowError>>()?;
277
278    Ok(Arc::new(StructArray::try_new(
279        fields.clone(),
280        column_concat_result,
281        nulls,
282    )?))
283}
284
285/// Concatenate multiple RunArray instances into a single RunArray.
286///
287/// This function handles the special case of concatenating RunArrays by:
288/// 1. Collecting all run ends and values from input arrays
289/// 2. Adjusting run ends to account for the length of previous arrays
290/// 3. Creating a new RunArray with the combined data
291fn concat_run_arrays<R: RunEndIndexType>(arrays: &[&dyn Array]) -> Result<ArrayRef, ArrowError>
292where
293    R::Native: Add<Output = R::Native>,
294{
295    let run_arrays: Vec<_> = arrays
296        .iter()
297        .map(|x| x.as_run::<R>())
298        .filter(|x| !x.run_ends().is_empty())
299        .collect();
300
301    // The run ends need to be adjusted by the sum of the lengths of the previous arrays.
302    let needed_run_end_adjustments = std::iter::once(R::default_value())
303        .chain(
304            run_arrays
305                .iter()
306                .scan(R::default_value(), |acc, run_array| {
307                    *acc = *acc + *run_array.run_ends().values().last().unwrap();
308                    Some(*acc)
309                }),
310        )
311        .collect::<Vec<_>>();
312
313    // This works out nicely to be the total (logical) length of the resulting array.
314    let total_len = needed_run_end_adjustments.last().unwrap().as_usize();
315
316    let run_ends_array =
317        PrimitiveArray::<R>::from_iter_values(run_arrays.iter().enumerate().flat_map(
318            move |(i, run_array)| {
319                let adjustment = needed_run_end_adjustments[i];
320                run_array
321                    .run_ends()
322                    .values()
323                    .iter()
324                    .map(move |run_end| *run_end + adjustment)
325            },
326        ));
327
328    let all_values = concat(
329        &run_arrays
330            .iter()
331            .map(|x| x.values().as_ref())
332            .collect::<Vec<_>>(),
333    )?;
334
335    let builder = ArrayDataBuilder::new(run_arrays[0].data_type().clone())
336        .len(total_len)
337        .child_data(vec![run_ends_array.into_data(), all_values.into_data()]);
338
339    // `build_unchecked` is used to avoid recursive validation of child arrays.
340    let array_data = unsafe { builder.build_unchecked() };
341    array_data.validate_data()?;
342
343    Ok(Arc::<RunArray<R>>::new(array_data.into()))
344}
345
// Early-returns the result of `concat_dictionaries` for key type `$t`.
// `concat_dictionaries` already yields an `ArrayRef`, so it is returned as-is
// instead of being wrapped in a second `Arc` (`Arc<Arc<dyn Array>>`).
macro_rules! dict_helper {
    ($t:ty, $arrays:expr) => {
        return concat_dictionaries::<$t>($arrays)
    };
}
351
// Early-returns the result of `concat_primitives` for primitive type `$t`.
// `concat_primitives` already yields an `ArrayRef`, so it is returned as-is
// instead of being wrapped in a second `Arc` (`Arc<Arc<dyn Array>>`).
macro_rules! primitive_concat {
    ($t:ty, $arrays:expr) => {
        return concat_primitives::<$t>($arrays)
    };
}
357
358fn get_capacity(arrays: &[&dyn Array], data_type: &DataType) -> Capacities {
359    match data_type {
360        DataType::Utf8 => binary_capacity::<Utf8Type>(arrays),
361        DataType::LargeUtf8 => binary_capacity::<LargeUtf8Type>(arrays),
362        DataType::Binary => binary_capacity::<BinaryType>(arrays),
363        DataType::LargeBinary => binary_capacity::<LargeBinaryType>(arrays),
364        DataType::FixedSizeList(_, _) => fixed_size_list_capacity(arrays, data_type),
365        _ => Capacities::Array(arrays.iter().map(|a| a.len()).sum()),
366    }
367}
368
369/// Concatenate multiple [Array] of the same type into a single [ArrayRef].
370pub fn concat(arrays: &[&dyn Array]) -> Result<ArrayRef, ArrowError> {
371    if arrays.is_empty() {
372        return Err(ArrowError::ComputeError(
373            "concat requires input of at least one array".to_string(),
374        ));
375    } else if arrays.len() == 1 {
376        let array = arrays[0];
377        return Ok(array.slice(0, array.len()));
378    }
379
380    let d = arrays[0].data_type();
381    if arrays.iter().skip(1).any(|array| array.data_type() != d) {
382        // Create error message with up to 10 unique data types in the order they appear
383        let error_message = {
384            // 10 max unique data types to print and another 1 to know if there are more
385            let mut unique_data_types = HashSet::with_capacity(11);
386
387            let mut error_message =
388                format!("It is not possible to concatenate arrays of different data types ({d}");
389            unique_data_types.insert(d);
390
391            for array in arrays {
392                let is_unique = unique_data_types.insert(array.data_type());
393
394                if unique_data_types.len() == 11 {
395                    error_message.push_str(", ...");
396                    break;
397                }
398
399                if is_unique {
400                    error_message.push_str(", ");
401                    error_message.push_str(&array.data_type().to_string());
402                }
403            }
404
405            error_message.push_str(").");
406
407            error_message
408        };
409
410        return Err(ArrowError::InvalidArgumentError(error_message));
411    }
412
413    downcast_primitive! {
414        d => (primitive_concat, arrays),
415        DataType::Boolean => concat_boolean(arrays),
416        DataType::Dictionary(k, _) => {
417            downcast_integer! {
418                k.as_ref() => (dict_helper, arrays),
419                _ => unreachable!("illegal dictionary key type {k}")
420            }
421        }
422        DataType::List(field) => concat_lists::<i32>(arrays, field),
423        DataType::LargeList(field) => concat_lists::<i64>(arrays, field),
424        DataType::Struct(fields) => concat_structs(arrays, fields),
425        DataType::Utf8 => concat_bytes::<Utf8Type>(arrays),
426        DataType::LargeUtf8 => concat_bytes::<LargeUtf8Type>(arrays),
427        DataType::Binary => concat_bytes::<BinaryType>(arrays),
428        DataType::LargeBinary => concat_bytes::<LargeBinaryType>(arrays),
429        DataType::RunEndEncoded(r, _) => {
430            // Handle RunEndEncoded arrays with special concat function
431            // We need to downcast based on the run end type
432            match r.data_type() {
433                DataType::Int16 => concat_run_arrays::<Int16Type>(arrays),
434                DataType::Int32 => concat_run_arrays::<Int32Type>(arrays),
435                DataType::Int64 => concat_run_arrays::<Int64Type>(arrays),
436                _ => unreachable!("Unsupported run end index type: {r:?}"),
437            }
438        }
439        DataType::Utf8View => concat_byte_view::<StringViewType>(arrays),
440        DataType::BinaryView => concat_byte_view::<BinaryViewType>(arrays),
441        _ => {
442            let capacity = get_capacity(arrays, d);
443            concat_fallback(arrays, capacity)
444        }
445    }
446}
447
448/// Concatenates arrays using MutableArrayData
449///
450/// This will naively concatenate dictionaries
451fn concat_fallback(arrays: &[&dyn Array], capacity: Capacities) -> Result<ArrayRef, ArrowError> {
452    let array_data: Vec<_> = arrays.iter().map(|a| a.to_data()).collect::<Vec<_>>();
453    let array_data = array_data.iter().collect();
454    let mut mutable = MutableArrayData::with_capacities(array_data, false, capacity);
455
456    for (i, a) in arrays.iter().enumerate() {
457        mutable.extend(i, 0, a.len())
458    }
459
460    Ok(make_array(mutable.freeze()))
461}
462
463/// Concatenates `batches` together into a single [`RecordBatch`].
464///
465/// The output batch has the specified `schemas`; The schema of the
466/// input are ignored.
467///
468/// Returns an error if the types of underlying arrays are different.
469pub fn concat_batches<'a>(
470    schema: &SchemaRef,
471    input_batches: impl IntoIterator<Item = &'a RecordBatch>,
472) -> Result<RecordBatch, ArrowError> {
473    // When schema is empty, sum the number of the rows of all batches
474    if schema.fields().is_empty() {
475        let num_rows: usize = input_batches.into_iter().map(RecordBatch::num_rows).sum();
476        let mut options = RecordBatchOptions::default();
477        options.row_count = Some(num_rows);
478        return RecordBatch::try_new_with_options(schema.clone(), vec![], &options);
479    }
480
481    let batches: Vec<&RecordBatch> = input_batches.into_iter().collect();
482    if batches.is_empty() {
483        return Ok(RecordBatch::new_empty(schema.clone()));
484    }
485    let field_num = schema.fields().len();
486    let mut arrays = Vec::with_capacity(field_num);
487    for i in 0..field_num {
488        let array = concat(
489            &batches
490                .iter()
491                .map(|batch| batch.column(i).as_ref())
492                .collect::<Vec<_>>(),
493        )?;
494        arrays.push(array);
495    }
496    RecordBatch::try_new(schema.clone(), arrays)
497}
498
499#[cfg(test)]
500mod tests {
501    use super::*;
502    use arrow_array::builder::{GenericListBuilder, StringDictionaryBuilder};
503    use arrow_schema::{Field, Schema};
504    use std::fmt::Debug;
505
506    #[test]
507    fn test_concat_empty_vec() {
508        let re = concat(&[]);
509        assert!(re.is_err());
510    }
511
512    #[test]
513    fn test_concat_batches_no_columns() {
514        // Test concat using empty schema / batches without columns
515        let schema = Arc::new(Schema::empty());
516
517        let mut options = RecordBatchOptions::default();
518        options.row_count = Some(100);
519        let batch = RecordBatch::try_new_with_options(schema.clone(), vec![], &options).unwrap();
520        // put in 2 batches of 100 rows each
521        let re = concat_batches(&schema, &[batch.clone(), batch]).unwrap();
522
523        assert_eq!(re.num_rows(), 200);
524    }
525
526    #[test]
527    fn test_concat_one_element_vec() {
528        let arr = Arc::new(PrimitiveArray::<Int64Type>::from(vec![
529            Some(-1),
530            Some(2),
531            None,
532        ])) as ArrayRef;
533        let result = concat(&[arr.as_ref()]).unwrap();
534        assert_eq!(
535            &arr, &result,
536            "concatenating single element array gives back the same result"
537        );
538    }
539
540    #[test]
541    fn test_concat_incompatible_datatypes() {
542        let re = concat(&[
543            &PrimitiveArray::<Int64Type>::from(vec![Some(-1), Some(2), None]),
544            // 2 string to make sure we only mention unique types
545            &StringArray::from(vec![Some("hello"), Some("bar"), Some("world")]),
546            &StringArray::from(vec![Some("hey"), Some(""), Some("you")]),
547            // Another type to make sure we are showing all the incompatible types
548            &PrimitiveArray::<Int32Type>::from(vec![Some(-1), Some(2), None]),
549        ]);
550
551        assert_eq!(re.unwrap_err().to_string(), "Invalid argument error: It is not possible to concatenate arrays of different data types (Int64, Utf8, Int32).");
552    }
553
554    #[test]
555    fn test_concat_10_incompatible_datatypes_should_include_all_of_them() {
556        let re = concat(&[
557            &PrimitiveArray::<Int64Type>::from(vec![Some(-1), Some(2), None]),
558            // 2 string to make sure we only mention unique types
559            &StringArray::from(vec![Some("hello"), Some("bar"), Some("world")]),
560            &StringArray::from(vec![Some("hey"), Some(""), Some("you")]),
561            // Another type to make sure we are showing all the incompatible types
562            &PrimitiveArray::<Int32Type>::from(vec![Some(-1), Some(2), None]),
563            &PrimitiveArray::<Int8Type>::from(vec![Some(-1), Some(2), None]),
564            &PrimitiveArray::<Int16Type>::from(vec![Some(-1), Some(2), None]),
565            &PrimitiveArray::<UInt8Type>::from(vec![Some(1), Some(2), None]),
566            &PrimitiveArray::<UInt16Type>::from(vec![Some(1), Some(2), None]),
567            &PrimitiveArray::<UInt32Type>::from(vec![Some(1), Some(2), None]),
568            // Non unique
569            &PrimitiveArray::<UInt16Type>::from(vec![Some(1), Some(2), None]),
570            &PrimitiveArray::<UInt64Type>::from(vec![Some(1), Some(2), None]),
571            &PrimitiveArray::<Float32Type>::from(vec![Some(1.0), Some(2.0), None]),
572        ]);
573
574        assert_eq!(re.unwrap_err().to_string(), "Invalid argument error: It is not possible to concatenate arrays of different data types (Int64, Utf8, Int32, Int8, Int16, UInt8, UInt16, UInt32, UInt64, Float32).");
575    }
576
577    #[test]
578    fn test_concat_11_incompatible_datatypes_should_only_include_10() {
579        let re = concat(&[
580            &PrimitiveArray::<Int64Type>::from(vec![Some(-1), Some(2), None]),
581            // 2 string to make sure we only mention unique types
582            &StringArray::from(vec![Some("hello"), Some("bar"), Some("world")]),
583            &StringArray::from(vec![Some("hey"), Some(""), Some("you")]),
584            // Another type to make sure we are showing all the incompatible types
585            &PrimitiveArray::<Int32Type>::from(vec![Some(-1), Some(2), None]),
586            &PrimitiveArray::<Int8Type>::from(vec![Some(-1), Some(2), None]),
587            &PrimitiveArray::<Int16Type>::from(vec![Some(-1), Some(2), None]),
588            &PrimitiveArray::<UInt8Type>::from(vec![Some(1), Some(2), None]),
589            &PrimitiveArray::<UInt16Type>::from(vec![Some(1), Some(2), None]),
590            &PrimitiveArray::<UInt32Type>::from(vec![Some(1), Some(2), None]),
591            // Non unique
592            &PrimitiveArray::<UInt16Type>::from(vec![Some(1), Some(2), None]),
593            &PrimitiveArray::<UInt64Type>::from(vec![Some(1), Some(2), None]),
594            &PrimitiveArray::<Float32Type>::from(vec![Some(1.0), Some(2.0), None]),
595            &PrimitiveArray::<Float64Type>::from(vec![Some(1.0), Some(2.0), None]),
596        ]);
597
598        assert_eq!(re.unwrap_err().to_string(), "Invalid argument error: It is not possible to concatenate arrays of different data types (Int64, Utf8, Int32, Int8, Int16, UInt8, UInt16, UInt32, UInt64, Float32, ...).");
599    }
600
601    #[test]
602    fn test_concat_13_incompatible_datatypes_should_not_include_all_of_them() {
603        let re = concat(&[
604            &PrimitiveArray::<Int64Type>::from(vec![Some(-1), Some(2), None]),
605            // 2 string to make sure we only mention unique types
606            &StringArray::from(vec![Some("hello"), Some("bar"), Some("world")]),
607            &StringArray::from(vec![Some("hey"), Some(""), Some("you")]),
608            // Another type to make sure we are showing all the incompatible types
609            &PrimitiveArray::<Int32Type>::from(vec![Some(-1), Some(2), None]),
610            &PrimitiveArray::<Int8Type>::from(vec![Some(-1), Some(2), None]),
611            &PrimitiveArray::<Int16Type>::from(vec![Some(-1), Some(2), None]),
612            &PrimitiveArray::<UInt8Type>::from(vec![Some(1), Some(2), None]),
613            &PrimitiveArray::<UInt16Type>::from(vec![Some(1), Some(2), None]),
614            &PrimitiveArray::<UInt32Type>::from(vec![Some(1), Some(2), None]),
615            // Non unique
616            &PrimitiveArray::<UInt16Type>::from(vec![Some(1), Some(2), None]),
617            &PrimitiveArray::<UInt64Type>::from(vec![Some(1), Some(2), None]),
618            &PrimitiveArray::<Float32Type>::from(vec![Some(1.0), Some(2.0), None]),
619            &PrimitiveArray::<Float64Type>::from(vec![Some(1.0), Some(2.0), None]),
620            &PrimitiveArray::<Float16Type>::new_null(3),
621            &BooleanArray::from(vec![Some(true), Some(false), None]),
622        ]);
623
624        assert_eq!(re.unwrap_err().to_string(), "Invalid argument error: It is not possible to concatenate arrays of different data types (Int64, Utf8, Int32, Int8, Int16, UInt8, UInt16, UInt32, UInt64, Float32, ...).");
625    }
626
627    #[test]
628    fn test_concat_string_arrays() {
629        let arr = concat(&[
630            &StringArray::from(vec!["hello", "world"]),
631            &StringArray::from(vec!["2", "3", "4"]),
632            &StringArray::from(vec![Some("foo"), Some("bar"), None, Some("baz")]),
633        ])
634        .unwrap();
635
636        let expected_output = Arc::new(StringArray::from(vec![
637            Some("hello"),
638            Some("world"),
639            Some("2"),
640            Some("3"),
641            Some("4"),
642            Some("foo"),
643            Some("bar"),
644            None,
645            Some("baz"),
646        ])) as ArrayRef;
647
648        assert_eq!(&arr, &expected_output);
649    }
650
651    #[test]
652    fn test_concat_primitive_arrays() {
653        let arr = concat(&[
654            &PrimitiveArray::<Int64Type>::from(vec![Some(-1), Some(-1), Some(2), None, None]),
655            &PrimitiveArray::<Int64Type>::from(vec![Some(101), Some(102), Some(103), None]),
656            &PrimitiveArray::<Int64Type>::from(vec![Some(256), Some(512), Some(1024)]),
657        ])
658        .unwrap();
659
660        let expected_output = Arc::new(PrimitiveArray::<Int64Type>::from(vec![
661            Some(-1),
662            Some(-1),
663            Some(2),
664            None,
665            None,
666            Some(101),
667            Some(102),
668            Some(103),
669            None,
670            Some(256),
671            Some(512),
672            Some(1024),
673        ])) as ArrayRef;
674
675        assert_eq!(&arr, &expected_output);
676    }
677
678    #[test]
679    fn test_concat_primitive_array_slices() {
680        let input_1 =
681            PrimitiveArray::<Int64Type>::from(vec![Some(-1), Some(-1), Some(2), None, None])
682                .slice(1, 3);
683
684        let input_2 =
685            PrimitiveArray::<Int64Type>::from(vec![Some(101), Some(102), Some(103), None])
686                .slice(1, 3);
687        let arr = concat(&[&input_1, &input_2]).unwrap();
688
689        let expected_output = Arc::new(PrimitiveArray::<Int64Type>::from(vec![
690            Some(-1),
691            Some(2),
692            None,
693            Some(102),
694            Some(103),
695            None,
696        ])) as ArrayRef;
697
698        assert_eq!(&arr, &expected_output);
699    }
700
701    #[test]
702    fn test_concat_boolean_primitive_arrays() {
703        let arr = concat(&[
704            &BooleanArray::from(vec![
705                Some(true),
706                Some(true),
707                Some(false),
708                None,
709                None,
710                Some(false),
711            ]),
712            &BooleanArray::from(vec![None, Some(false), Some(true), Some(false)]),
713        ])
714        .unwrap();
715
716        let expected_output = Arc::new(BooleanArray::from(vec![
717            Some(true),
718            Some(true),
719            Some(false),
720            None,
721            None,
722            Some(false),
723            None,
724            Some(false),
725            Some(true),
726            Some(false),
727        ])) as ArrayRef;
728
729        assert_eq!(&arr, &expected_output);
730    }
731
732    #[test]
733    fn test_concat_primitive_list_arrays() {
734        let list1 = vec![
735            Some(vec![Some(-1), Some(-1), Some(2), None, None]),
736            Some(vec![]),
737            None,
738            Some(vec![Some(10)]),
739        ];
740        let list1_array = ListArray::from_iter_primitive::<Int64Type, _, _>(list1.clone());
741
742        let list2 = vec![
743            None,
744            Some(vec![Some(100), None, Some(101)]),
745            Some(vec![Some(102)]),
746        ];
747        let list2_array = ListArray::from_iter_primitive::<Int64Type, _, _>(list2.clone());
748
749        let list3 = vec![Some(vec![Some(1000), Some(1001)])];
750        let list3_array = ListArray::from_iter_primitive::<Int64Type, _, _>(list3.clone());
751
752        let array_result = concat(&[&list1_array, &list2_array, &list3_array]).unwrap();
753
754        let expected = list1.into_iter().chain(list2).chain(list3);
755        let array_expected = ListArray::from_iter_primitive::<Int64Type, _, _>(expected);
756
757        assert_eq!(array_result.as_ref(), &array_expected as &dyn Array);
758    }
759
760    #[test]
761    fn test_concat_primitive_list_arrays_slices() {
762        let list1 = vec![
763            Some(vec![Some(-1), Some(-1), Some(2), None, None]),
764            Some(vec![]), // In slice
765            None,         // In slice
766            Some(vec![Some(10)]),
767        ];
768        let list1_array = ListArray::from_iter_primitive::<Int64Type, _, _>(list1.clone());
769        let list1_array = list1_array.slice(1, 2);
770        let list1_values = list1.into_iter().skip(1).take(2);
771
772        let list2 = vec![
773            None,
774            Some(vec![Some(100), None, Some(101)]),
775            Some(vec![Some(102)]),
776        ];
777        let list2_array = ListArray::from_iter_primitive::<Int64Type, _, _>(list2.clone());
778
779        // verify that this test covers the case when the first offset is non zero
780        assert!(list1_array.offsets()[0].as_usize() > 0);
781        let array_result = concat(&[&list1_array, &list2_array]).unwrap();
782
783        let expected = list1_values.chain(list2);
784        let array_expected = ListArray::from_iter_primitive::<Int64Type, _, _>(expected);
785
786        assert_eq!(array_result.as_ref(), &array_expected as &dyn Array);
787    }
788
789    #[test]
790    fn test_concat_primitive_list_arrays_sliced_lengths() {
791        let list1 = vec![
792            Some(vec![Some(-1), Some(-1), Some(2), None, None]), // In slice
793            Some(vec![]),                                        // In slice
794            None,                                                // In slice
795            Some(vec![Some(10)]),
796        ];
797        let list1_array = ListArray::from_iter_primitive::<Int64Type, _, _>(list1.clone());
798        let list1_array = list1_array.slice(0, 3); // no offset, but not all values
799        let list1_values = list1.into_iter().take(3);
800
801        let list2 = vec![
802            None,
803            Some(vec![Some(100), None, Some(101)]),
804            Some(vec![Some(102)]),
805        ];
806        let list2_array = ListArray::from_iter_primitive::<Int64Type, _, _>(list2.clone());
807
808        // verify that this test covers the case when the first offset is zero, but the
809        // last offset doesn't cover the entire array
810        assert_eq!(list1_array.offsets()[0].as_usize(), 0);
811        assert!(list1_array.offsets().last().unwrap().as_usize() < list1_array.values().len());
812        let array_result = concat(&[&list1_array, &list2_array]).unwrap();
813
814        let expected = list1_values.chain(list2);
815        let array_expected = ListArray::from_iter_primitive::<Int64Type, _, _>(expected);
816
817        assert_eq!(array_result.as_ref(), &array_expected as &dyn Array);
818    }
819
820    #[test]
821    fn test_concat_primitive_fixed_size_list_arrays() {
822        let list1 = vec![
823            Some(vec![Some(-1), None]),
824            None,
825            Some(vec![Some(10), Some(20)]),
826        ];
827        let list1_array =
828            FixedSizeListArray::from_iter_primitive::<Int64Type, _, _>(list1.clone(), 2);
829
830        let list2 = vec![
831            None,
832            Some(vec![Some(100), None]),
833            Some(vec![Some(102), Some(103)]),
834        ];
835        let list2_array =
836            FixedSizeListArray::from_iter_primitive::<Int64Type, _, _>(list2.clone(), 2);
837
838        let list3 = vec![Some(vec![Some(1000), Some(1001)])];
839        let list3_array =
840            FixedSizeListArray::from_iter_primitive::<Int64Type, _, _>(list3.clone(), 2);
841
842        let array_result = concat(&[&list1_array, &list2_array, &list3_array]).unwrap();
843
844        let expected = list1.into_iter().chain(list2).chain(list3);
845        let array_expected =
846            FixedSizeListArray::from_iter_primitive::<Int64Type, _, _>(expected, 2);
847
848        assert_eq!(array_result.as_ref(), &array_expected as &dyn Array);
849    }
850
851    #[test]
852    fn test_concat_struct_arrays() {
853        let field = Arc::new(Field::new("field", DataType::Int64, true));
854        let input_primitive_1: ArrayRef = Arc::new(PrimitiveArray::<Int64Type>::from(vec![
855            Some(-1),
856            Some(-1),
857            Some(2),
858            None,
859            None,
860        ]));
861        let input_struct_1 = StructArray::from(vec![(field.clone(), input_primitive_1)]);
862
863        let input_primitive_2: ArrayRef = Arc::new(PrimitiveArray::<Int64Type>::from(vec![
864            Some(101),
865            Some(102),
866            Some(103),
867            None,
868        ]));
869        let input_struct_2 = StructArray::from(vec![(field.clone(), input_primitive_2)]);
870
871        let input_primitive_3: ArrayRef = Arc::new(PrimitiveArray::<Int64Type>::from(vec![
872            Some(256),
873            Some(512),
874            Some(1024),
875        ]));
876        let input_struct_3 = StructArray::from(vec![(field, input_primitive_3)]);
877
878        let arr = concat(&[&input_struct_1, &input_struct_2, &input_struct_3]).unwrap();
879
880        let expected_primitive_output = Arc::new(PrimitiveArray::<Int64Type>::from(vec![
881            Some(-1),
882            Some(-1),
883            Some(2),
884            None,
885            None,
886            Some(101),
887            Some(102),
888            Some(103),
889            None,
890            Some(256),
891            Some(512),
892            Some(1024),
893        ])) as ArrayRef;
894
895        let actual_primitive = arr
896            .as_any()
897            .downcast_ref::<StructArray>()
898            .unwrap()
899            .column(0);
900        assert_eq!(actual_primitive, &expected_primitive_output);
901    }
902
903    #[test]
904    fn test_concat_struct_array_slices() {
905        let field = Arc::new(Field::new("field", DataType::Int64, true));
906        let input_primitive_1: ArrayRef = Arc::new(PrimitiveArray::<Int64Type>::from(vec![
907            Some(-1),
908            Some(-1),
909            Some(2),
910            None,
911            None,
912        ]));
913        let input_struct_1 = StructArray::from(vec![(field.clone(), input_primitive_1)]);
914
915        let input_primitive_2: ArrayRef = Arc::new(PrimitiveArray::<Int64Type>::from(vec![
916            Some(101),
917            Some(102),
918            Some(103),
919            None,
920        ]));
921        let input_struct_2 = StructArray::from(vec![(field, input_primitive_2)]);
922
923        let arr = concat(&[&input_struct_1.slice(1, 3), &input_struct_2.slice(1, 2)]).unwrap();
924
925        let expected_primitive_output = Arc::new(PrimitiveArray::<Int64Type>::from(vec![
926            Some(-1),
927            Some(2),
928            None,
929            Some(102),
930            Some(103),
931        ])) as ArrayRef;
932
933        let actual_primitive = arr
934            .as_any()
935            .downcast_ref::<StructArray>()
936            .unwrap()
937            .column(0);
938        assert_eq!(actual_primitive, &expected_primitive_output);
939    }
940
941    #[test]
942    fn test_concat_struct_arrays_no_nulls() {
943        let input_1a = vec![1, 2, 3];
944        let input_1b = vec!["one", "two", "three"];
945        let input_2a = vec![4, 5, 6, 7];
946        let input_2b = vec!["four", "five", "six", "seven"];
947
948        let struct_from_primitives = |ints: Vec<i64>, strings: Vec<&str>| {
949            StructArray::try_from(vec![
950                ("ints", Arc::new(Int64Array::from(ints)) as _),
951                ("strings", Arc::new(StringArray::from(strings)) as _),
952            ])
953        };
954
955        let expected_output = struct_from_primitives(
956            [input_1a.clone(), input_2a.clone()].concat(),
957            [input_1b.clone(), input_2b.clone()].concat(),
958        )
959        .unwrap();
960
961        let input_1 = struct_from_primitives(input_1a, input_1b).unwrap();
962        let input_2 = struct_from_primitives(input_2a, input_2b).unwrap();
963
964        let arr = concat(&[&input_1, &input_2]).unwrap();
965        let struct_result = arr.as_struct();
966
967        assert_eq!(struct_result, &expected_output);
968        assert_eq!(arr.null_count(), 0);
969    }
970
971    #[test]
972    fn test_string_array_slices() {
973        let input_1 = StringArray::from(vec!["hello", "A", "B", "C"]);
974        let input_2 = StringArray::from(vec!["world", "D", "E", "Z"]);
975
976        let arr = concat(&[&input_1.slice(1, 3), &input_2.slice(1, 2)]).unwrap();
977
978        let expected_output = StringArray::from(vec!["A", "B", "C", "D", "E"]);
979
980        let actual_output = arr.as_any().downcast_ref::<StringArray>().unwrap();
981        assert_eq!(actual_output, &expected_output);
982    }
983
984    #[test]
985    fn test_string_array_with_null_slices() {
986        let input_1 = StringArray::from(vec![Some("hello"), None, Some("A"), Some("C")]);
987        let input_2 = StringArray::from(vec![None, Some("world"), Some("D"), None]);
988
989        let arr = concat(&[&input_1.slice(1, 3), &input_2.slice(1, 2)]).unwrap();
990
991        let expected_output =
992            StringArray::from(vec![None, Some("A"), Some("C"), Some("world"), Some("D")]);
993
994        let actual_output = arr.as_any().downcast_ref::<StringArray>().unwrap();
995        assert_eq!(actual_output, &expected_output);
996    }
997
998    fn collect_string_dictionary(array: &DictionaryArray<Int32Type>) -> Vec<Option<&str>> {
999        let concrete = array.downcast_dict::<StringArray>().unwrap();
1000        concrete.into_iter().collect()
1001    }
1002
1003    #[test]
1004    fn test_string_dictionary_array() {
1005        let input_1: DictionaryArray<Int32Type> = vec!["hello", "A", "B", "hello", "hello", "C"]
1006            .into_iter()
1007            .collect();
1008        let input_2: DictionaryArray<Int32Type> = vec!["hello", "E", "E", "hello", "F", "E"]
1009            .into_iter()
1010            .collect();
1011
1012        let expected: Vec<_> = vec![
1013            "hello", "A", "B", "hello", "hello", "C", "hello", "E", "E", "hello", "F", "E",
1014        ]
1015        .into_iter()
1016        .map(Some)
1017        .collect();
1018
1019        let concat = concat(&[&input_1 as _, &input_2 as _]).unwrap();
1020        let dictionary = concat.as_dictionary::<Int32Type>();
1021        let actual = collect_string_dictionary(dictionary);
1022        assert_eq!(actual, expected);
1023
1024        // Should have concatenated inputs together
1025        assert_eq!(
1026            dictionary.values().len(),
1027            input_1.values().len() + input_2.values().len(),
1028        )
1029    }
1030
1031    #[test]
1032    fn test_string_dictionary_array_nulls() {
1033        let input_1: DictionaryArray<Int32Type> = vec![Some("foo"), Some("bar"), None, Some("fiz")]
1034            .into_iter()
1035            .collect();
1036        let input_2: DictionaryArray<Int32Type> = vec![None].into_iter().collect();
1037        let expected = vec![Some("foo"), Some("bar"), None, Some("fiz"), None];
1038
1039        let concat = concat(&[&input_1 as _, &input_2 as _]).unwrap();
1040        let dictionary = concat.as_dictionary::<Int32Type>();
1041        let actual = collect_string_dictionary(dictionary);
1042        assert_eq!(actual, expected);
1043
1044        // Should have concatenated inputs together
1045        assert_eq!(
1046            dictionary.values().len(),
1047            input_1.values().len() + input_2.values().len(),
1048        )
1049    }
1050
1051    #[test]
1052    fn test_string_dictionary_array_nulls_in_values() {
1053        let input_1_keys = Int32Array::from_iter_values([0, 2, 1, 3]);
1054        let input_1_values = StringArray::from(vec![Some("foo"), None, Some("bar"), Some("fiz")]);
1055        let input_1 = DictionaryArray::new(input_1_keys, Arc::new(input_1_values));
1056
1057        let input_2_keys = Int32Array::from_iter_values([0]);
1058        let input_2_values = StringArray::from(vec![None, Some("hello")]);
1059        let input_2 = DictionaryArray::new(input_2_keys, Arc::new(input_2_values));
1060
1061        let expected = vec![Some("foo"), Some("bar"), None, Some("fiz"), None];
1062
1063        let concat = concat(&[&input_1 as _, &input_2 as _]).unwrap();
1064        let dictionary = concat.as_dictionary::<Int32Type>();
1065        let actual = collect_string_dictionary(dictionary);
1066        assert_eq!(actual, expected);
1067    }
1068
1069    #[test]
1070    fn test_string_dictionary_merge() {
1071        let mut builder = StringDictionaryBuilder::<Int32Type>::new();
1072        for i in 0..20 {
1073            builder.append(i.to_string()).unwrap();
1074        }
1075        let input_1 = builder.finish();
1076
1077        let mut builder = StringDictionaryBuilder::<Int32Type>::new();
1078        for i in 0..30 {
1079            builder.append(i.to_string()).unwrap();
1080        }
1081        let input_2 = builder.finish();
1082
1083        let expected: Vec<_> = (0..20).chain(0..30).map(|x| x.to_string()).collect();
1084        let expected: Vec<_> = expected.iter().map(|x| Some(x.as_str())).collect();
1085
1086        let concat = concat(&[&input_1 as _, &input_2 as _]).unwrap();
1087        let dictionary = concat.as_dictionary::<Int32Type>();
1088        let actual = collect_string_dictionary(dictionary);
1089        assert_eq!(actual, expected);
1090
1091        // Should have merged inputs together
1092        // Not 30 as this is done on a best-effort basis
1093        let values_len = dictionary.values().len();
1094        assert!((30..40).contains(&values_len), "{values_len}")
1095    }
1096
1097    #[test]
1098    fn test_primitive_dictionary_merge() {
1099        // Same value repeated 5 times.
1100        let keys = vec![1; 5];
1101        let values = (10..20).collect::<Vec<_>>();
1102        let dict = DictionaryArray::new(
1103            Int8Array::from(keys.clone()),
1104            Arc::new(Int32Array::from(values.clone())),
1105        );
1106        let other = DictionaryArray::new(
1107            Int8Array::from(keys.clone()),
1108            Arc::new(Int32Array::from(values.clone())),
1109        );
1110
1111        let result_same_dictionary = concat(&[&dict, &dict]).unwrap();
1112        // Verify pointer equality check succeeds, and therefore the
1113        // dictionaries are not merged. A single values buffer should be reused
1114        // in this case.
1115        assert!(dict.values().to_data().ptr_eq(
1116            &result_same_dictionary
1117                .as_dictionary::<Int8Type>()
1118                .values()
1119                .to_data()
1120        ));
1121        assert_eq!(
1122            result_same_dictionary
1123                .as_dictionary::<Int8Type>()
1124                .values()
1125                .len(),
1126            values.len(),
1127        );
1128
1129        let result_cloned_dictionary = concat(&[&dict, &other]).unwrap();
1130        // Should have only 1 underlying value since all keys reference it.
1131        assert_eq!(
1132            result_cloned_dictionary
1133                .as_dictionary::<Int8Type>()
1134                .values()
1135                .len(),
1136            1
1137        );
1138    }
1139
1140    #[test]
1141    fn test_concat_string_sizes() {
1142        let a: LargeStringArray = ((0..150).map(|_| Some("foo"))).collect();
1143        let b: LargeStringArray = ((0..150).map(|_| Some("foo"))).collect();
1144        let c = LargeStringArray::from(vec![Some("foo"), Some("bar"), None, Some("baz")]);
1145        // 150 * 3 = 450
1146        // 150 * 3 = 450
1147        // 3 * 3   = 9
1148        // ------------+
1149        // 909
1150        // closest 64 byte aligned cap = 960
1151
1152        let arr = concat(&[&a, &b, &c]).unwrap();
1153        // this would have been 1280 if we did not precompute the value lengths.
1154        assert_eq!(arr.to_data().buffers()[1].capacity(), 960);
1155    }
1156
1157    #[test]
1158    fn test_dictionary_concat_reuse() {
1159        let array: DictionaryArray<Int8Type> = vec!["a", "a", "b", "c"].into_iter().collect();
1160        let copy: DictionaryArray<Int8Type> = array.clone();
1161
1162        // dictionary is "a", "b", "c"
1163        assert_eq!(
1164            array.values(),
1165            &(Arc::new(StringArray::from(vec!["a", "b", "c"])) as ArrayRef)
1166        );
1167        assert_eq!(array.keys(), &Int8Array::from(vec![0, 0, 1, 2]));
1168
1169        // concatenate it with itself
1170        let combined = concat(&[&copy as _, &array as _]).unwrap();
1171        let combined = combined.as_dictionary::<Int8Type>();
1172
1173        assert_eq!(
1174            combined.values(),
1175            &(Arc::new(StringArray::from(vec!["a", "b", "c"])) as ArrayRef),
1176            "Actual: {combined:#?}"
1177        );
1178
1179        assert_eq!(
1180            combined.keys(),
1181            &Int8Array::from(vec![0, 0, 1, 2, 0, 0, 1, 2])
1182        );
1183
1184        // Should have reused the dictionary
1185        assert!(array
1186            .values()
1187            .to_data()
1188            .ptr_eq(&combined.values().to_data()));
1189        assert!(copy.values().to_data().ptr_eq(&combined.values().to_data()));
1190
1191        let new: DictionaryArray<Int8Type> = vec!["d"].into_iter().collect();
1192        let combined = concat(&[&copy as _, &array as _, &new as _]).unwrap();
1193        let com = combined.as_dictionary::<Int8Type>();
1194
1195        // Should not have reused the dictionary
1196        assert!(!array.values().to_data().ptr_eq(&com.values().to_data()));
1197        assert!(!copy.values().to_data().ptr_eq(&com.values().to_data()));
1198        assert!(!new.values().to_data().ptr_eq(&com.values().to_data()));
1199    }
1200
1201    #[test]
1202    fn concat_record_batches() {
1203        let schema = Arc::new(Schema::new(vec![
1204            Field::new("a", DataType::Int32, false),
1205            Field::new("b", DataType::Utf8, false),
1206        ]));
1207        let batch1 = RecordBatch::try_new(
1208            schema.clone(),
1209            vec![
1210                Arc::new(Int32Array::from(vec![1, 2])),
1211                Arc::new(StringArray::from(vec!["a", "b"])),
1212            ],
1213        )
1214        .unwrap();
1215        let batch2 = RecordBatch::try_new(
1216            schema.clone(),
1217            vec![
1218                Arc::new(Int32Array::from(vec![3, 4])),
1219                Arc::new(StringArray::from(vec!["c", "d"])),
1220            ],
1221        )
1222        .unwrap();
1223        let new_batch = concat_batches(&schema, [&batch1, &batch2]).unwrap();
1224        assert_eq!(new_batch.schema().as_ref(), schema.as_ref());
1225        assert_eq!(2, new_batch.num_columns());
1226        assert_eq!(4, new_batch.num_rows());
1227        let new_batch_owned = concat_batches(&schema, &[batch1, batch2]).unwrap();
1228        assert_eq!(new_batch_owned.schema().as_ref(), schema.as_ref());
1229        assert_eq!(2, new_batch_owned.num_columns());
1230        assert_eq!(4, new_batch_owned.num_rows());
1231    }
1232
1233    #[test]
1234    fn concat_empty_record_batch() {
1235        let schema = Arc::new(Schema::new(vec![
1236            Field::new("a", DataType::Int32, false),
1237            Field::new("b", DataType::Utf8, false),
1238        ]));
1239        let batch = concat_batches(&schema, []).unwrap();
1240        assert_eq!(batch.schema().as_ref(), schema.as_ref());
1241        assert_eq!(0, batch.num_rows());
1242    }
1243
1244    #[test]
1245    fn concat_record_batches_of_different_schemas_but_compatible_data() {
1246        let schema1 = Arc::new(Schema::new(vec![Field::new("a", DataType::Int32, false)]));
1247        // column names differ
1248        let schema2 = Arc::new(Schema::new(vec![Field::new("c", DataType::Int32, false)]));
1249        let batch1 = RecordBatch::try_new(
1250            schema1.clone(),
1251            vec![Arc::new(Int32Array::from(vec![1, 2]))],
1252        )
1253        .unwrap();
1254        let batch2 =
1255            RecordBatch::try_new(schema2, vec![Arc::new(Int32Array::from(vec![3, 4]))]).unwrap();
1256        // concat_batches simply uses the schema provided
1257        let batch = concat_batches(&schema1, [&batch1, &batch2]).unwrap();
1258        assert_eq!(batch.schema().as_ref(), schema1.as_ref());
1259        assert_eq!(4, batch.num_rows());
1260    }
1261
1262    #[test]
1263    fn concat_record_batches_of_different_schemas_incompatible_data() {
1264        let schema1 = Arc::new(Schema::new(vec![Field::new("a", DataType::Int32, false)]));
1265        // column names differ
1266        let schema2 = Arc::new(Schema::new(vec![Field::new("a", DataType::Utf8, false)]));
1267        let batch1 = RecordBatch::try_new(
1268            schema1.clone(),
1269            vec![Arc::new(Int32Array::from(vec![1, 2]))],
1270        )
1271        .unwrap();
1272        let batch2 = RecordBatch::try_new(
1273            schema2,
1274            vec![Arc::new(StringArray::from(vec!["foo", "bar"]))],
1275        )
1276        .unwrap();
1277
1278        let error = concat_batches(&schema1, [&batch1, &batch2]).unwrap_err();
1279        assert_eq!(error.to_string(), "Invalid argument error: It is not possible to concatenate arrays of different data types (Int32, Utf8).");
1280    }
1281
1282    #[test]
1283    fn concat_capacity() {
1284        let a = Int32Array::from_iter_values(0..100);
1285        let b = Int32Array::from_iter_values(10..20);
1286        let a = concat(&[&a, &b]).unwrap();
1287        let data = a.to_data();
1288        assert_eq!(data.buffers()[0].len(), 440);
1289        assert_eq!(data.buffers()[0].capacity(), 448); // Nearest multiple of 64
1290
1291        let a = concat(&[&a.slice(10, 20), &b]).unwrap();
1292        let data = a.to_data();
1293        assert_eq!(data.buffers()[0].len(), 120);
1294        assert_eq!(data.buffers()[0].capacity(), 128); // Nearest multiple of 64
1295
1296        let a = StringArray::from_iter_values(std::iter::repeat("foo").take(100));
1297        let b = StringArray::from(vec!["bingo", "bongo", "lorem", ""]);
1298
1299        let a = concat(&[&a, &b]).unwrap();
1300        let data = a.to_data();
1301        // (100 + 4 + 1) * size_of<i32>()
1302        assert_eq!(data.buffers()[0].len(), 420);
1303        assert_eq!(data.buffers()[0].capacity(), 448); // Nearest multiple of 64
1304
1305        // len("foo") * 100 + len("bingo") + len("bongo") + len("lorem")
1306        assert_eq!(data.buffers()[1].len(), 315);
1307        assert_eq!(data.buffers()[1].capacity(), 320); // Nearest multiple of 64
1308
1309        let a = concat(&[&a.slice(10, 40), &b]).unwrap();
1310        let data = a.to_data();
1311        // (40 + 4 + 5) * size_of<i32>()
1312        assert_eq!(data.buffers()[0].len(), 180);
1313        assert_eq!(data.buffers()[0].capacity(), 192); // Nearest multiple of 64
1314
1315        // len("foo") * 40 + len("bingo") + len("bongo") + len("lorem")
1316        assert_eq!(data.buffers()[1].len(), 135);
1317        assert_eq!(data.buffers()[1].capacity(), 192); // Nearest multiple of 64
1318
1319        let a = LargeBinaryArray::from_iter_values(std::iter::repeat(b"foo").take(100));
1320        let b = LargeBinaryArray::from_iter_values(std::iter::repeat(b"cupcakes").take(10));
1321
1322        let a = concat(&[&a, &b]).unwrap();
1323        let data = a.to_data();
1324        // (100 + 10 + 1) * size_of<i64>()
1325        assert_eq!(data.buffers()[0].len(), 888);
1326        assert_eq!(data.buffers()[0].capacity(), 896); // Nearest multiple of 64
1327
1328        // len("foo") * 100 + len("cupcakes") * 10
1329        assert_eq!(data.buffers()[1].len(), 380);
1330        assert_eq!(data.buffers()[1].capacity(), 384); // Nearest multiple of 64
1331
1332        let a = concat(&[&a.slice(10, 40), &b]).unwrap();
1333        let data = a.to_data();
1334        // (40 + 10 + 1) * size_of<i64>()
1335        assert_eq!(data.buffers()[0].len(), 408);
1336        assert_eq!(data.buffers()[0].capacity(), 448); // Nearest multiple of 64
1337
1338        // len("foo") * 40 + len("cupcakes") * 10
1339        assert_eq!(data.buffers()[1].len(), 200);
1340        assert_eq!(data.buffers()[1].capacity(), 256); // Nearest multiple of 64
1341    }
1342
1343    #[test]
1344    fn concat_sparse_nulls() {
1345        let values = StringArray::from_iter_values((0..100).map(|x| x.to_string()));
1346        let keys = Int32Array::from(vec![1; 10]);
1347        let dict_a = DictionaryArray::new(keys, Arc::new(values));
1348        let values = StringArray::new_null(0);
1349        let keys = Int32Array::new_null(10);
1350        let dict_b = DictionaryArray::new(keys, Arc::new(values));
1351        let array = concat(&[&dict_a, &dict_b]).unwrap();
1352        assert_eq!(array.null_count(), 10);
1353        assert_eq!(array.logical_null_count(), 10);
1354    }
1355
1356    #[test]
1357    fn concat_dictionary_list_array_simple() {
1358        let scalars = vec![
1359            create_single_row_list_of_dict(vec![Some("a")]),
1360            create_single_row_list_of_dict(vec![Some("a")]),
1361            create_single_row_list_of_dict(vec![Some("b")]),
1362        ];
1363
1364        let arrays = scalars
1365            .iter()
1366            .map(|a| a as &(dyn Array))
1367            .collect::<Vec<_>>();
1368        let concat_res = concat(arrays.as_slice()).unwrap();
1369
1370        let expected_list = create_list_of_dict(vec![
1371            // Row 1
1372            Some(vec![Some("a")]),
1373            Some(vec![Some("a")]),
1374            Some(vec![Some("b")]),
1375        ]);
1376
1377        let list = concat_res.as_list::<i32>();
1378
1379        // Assert that the list is equal to the expected list
1380        list.iter().zip(expected_list.iter()).for_each(|(a, b)| {
1381            assert_eq!(a, b);
1382        });
1383
1384        assert_dictionary_has_unique_values::<_, StringArray>(
1385            list.values().as_dictionary::<Int32Type>(),
1386        );
1387    }
1388
1389    #[test]
1390    fn concat_many_dictionary_list_arrays() {
1391        let number_of_unique_values = 8;
1392        let scalars = (0..80000)
1393            .map(|i| {
1394                create_single_row_list_of_dict(vec![Some(
1395                    (i % number_of_unique_values).to_string(),
1396                )])
1397            })
1398            .collect::<Vec<_>>();
1399
1400        let arrays = scalars
1401            .iter()
1402            .map(|a| a as &(dyn Array))
1403            .collect::<Vec<_>>();
1404        let concat_res = concat(arrays.as_slice()).unwrap();
1405
1406        let expected_list = create_list_of_dict(
1407            (0..80000)
1408                .map(|i| Some(vec![Some((i % number_of_unique_values).to_string())]))
1409                .collect::<Vec<_>>(),
1410        );
1411
1412        let list = concat_res.as_list::<i32>();
1413
1414        // Assert that the list is equal to the expected list
1415        list.iter().zip(expected_list.iter()).for_each(|(a, b)| {
1416            assert_eq!(a, b);
1417        });
1418
1419        assert_dictionary_has_unique_values::<_, StringArray>(
1420            list.values().as_dictionary::<Int32Type>(),
1421        );
1422    }
1423
1424    fn create_single_row_list_of_dict(
1425        list_items: Vec<Option<impl AsRef<str>>>,
1426    ) -> GenericListArray<i32> {
1427        let rows = list_items.into_iter().map(Some).collect();
1428
1429        create_list_of_dict(vec![rows])
1430    }
1431
1432    fn create_list_of_dict(
1433        rows: Vec<Option<Vec<Option<impl AsRef<str>>>>>,
1434    ) -> GenericListArray<i32> {
1435        let mut builder =
1436            GenericListBuilder::<i32, _>::new(StringDictionaryBuilder::<Int32Type>::new());
1437
1438        for row in rows {
1439            builder.append_option(row);
1440        }
1441
1442        builder.finish()
1443    }
1444
1445    fn assert_dictionary_has_unique_values<'a, K, V>(array: &'a DictionaryArray<K>)
1446    where
1447        K: ArrowDictionaryKeyType,
1448        V: Sync + Send + 'static,
1449        &'a V: ArrayAccessor + IntoIterator,
1450
1451        <&'a V as ArrayAccessor>::Item: Default + Clone + PartialEq + Debug + Ord,
1452        <&'a V as IntoIterator>::Item: Clone + PartialEq + Debug + Ord,
1453    {
1454        let dict = array.downcast_dict::<V>().unwrap();
1455        let mut values = dict.values().into_iter().collect::<Vec<_>>();
1456
1457        // remove duplicates must be sorted first so we can compare
1458        values.sort();
1459
1460        let mut unique_values = values.clone();
1461
1462        unique_values.dedup();
1463
1464        assert_eq!(
1465            values, unique_values,
1466            "There are duplicates in the value list (the value list here is sorted which is only for the assertion)"
1467        );
1468    }
1469
1470    // Test the simple case of concatenating two RunArrays
1471    #[test]
1472    fn test_concat_run_array() {
1473        // Create simple run arrays
1474        let run_ends1 = Int32Array::from(vec![2, 4]);
1475        let values1 = Int32Array::from(vec![10, 20]);
1476        let array1 = RunArray::try_new(&run_ends1, &values1).unwrap();
1477
1478        let run_ends2 = Int32Array::from(vec![1, 4]);
1479        let values2 = Int32Array::from(vec![30, 40]);
1480        let array2 = RunArray::try_new(&run_ends2, &values2).unwrap();
1481
1482        // Concatenate the arrays - this should now work properly
1483        let result = concat(&[&array1, &array2]).unwrap();
1484        let result_run_array: &arrow_array::RunArray<Int32Type> = result.as_run();
1485
1486        // Check that the result has the correct length
1487        assert_eq!(result_run_array.len(), 8); // 4 + 4
1488
1489        // Check the run ends
1490        let run_ends = result_run_array.run_ends().values();
1491        assert_eq!(run_ends.len(), 4);
1492        assert_eq!(&[2, 4, 5, 8], run_ends);
1493
1494        // Check the values
1495        let values = result_run_array
1496            .values()
1497            .as_any()
1498            .downcast_ref::<Int32Array>()
1499            .unwrap();
1500        assert_eq!(values.len(), 4);
1501        assert_eq!(&[10, 20, 30, 40], values.values());
1502    }
1503
1504    #[test]
1505    fn test_concat_run_array_matching_first_last_value() {
1506        // Create a run array with run ends [2, 4, 7] and values [10, 20, 30]
1507        let run_ends1 = Int32Array::from(vec![2, 4, 7]);
1508        let values1 = Int32Array::from(vec![10, 20, 30]);
1509        let array1 = RunArray::try_new(&run_ends1, &values1).unwrap();
1510
1511        // Create another run array with run ends [3, 5] and values [30, 40]
1512        let run_ends2 = Int32Array::from(vec![3, 5]);
1513        let values2 = Int32Array::from(vec![30, 40]);
1514        let array2 = RunArray::try_new(&run_ends2, &values2).unwrap();
1515
1516        // Concatenate the two arrays
1517        let result = concat(&[&array1, &array2]).unwrap();
1518        let result_run_array: &arrow_array::RunArray<Int32Type> = result.as_run();
1519
1520        // The result should have length 12 (7 + 5)
1521        assert_eq!(result_run_array.len(), 12);
1522
1523        // Check that the run ends are correct
1524        let run_ends = result_run_array.run_ends().values();
1525        assert_eq!(&[2, 4, 7, 10, 12], run_ends);
1526
1527        // Check that the values are correct
1528        assert_eq!(
1529            &[10, 20, 30, 30, 40],
1530            result_run_array
1531                .values()
1532                .as_any()
1533                .downcast_ref::<Int32Array>()
1534                .unwrap()
1535                .values()
1536        );
1537    }
1538
1539    #[test]
1540    fn test_concat_run_array_with_nulls() {
1541        // Create values array with nulls
1542        let values1 = Int32Array::from(vec![Some(10), None, Some(30)]);
1543        let run_ends1 = Int32Array::from(vec![2, 4, 7]);
1544        let array1 = RunArray::try_new(&run_ends1, &values1).unwrap();
1545
1546        // Create another run array with run ends [3, 5] and values [30, null]
1547        let values2 = Int32Array::from(vec![Some(30), None]);
1548        let run_ends2 = Int32Array::from(vec![3, 5]);
1549        let array2 = RunArray::try_new(&run_ends2, &values2).unwrap();
1550
1551        // Concatenate the two arrays
1552        let result = concat(&[&array1, &array2]).unwrap();
1553        let result_run_array: &arrow_array::RunArray<Int32Type> = result.as_run();
1554
1555        // The result should have length 12 (7 + 5)
1556        assert_eq!(result_run_array.len(), 12);
1557
1558        // Get a reference to the run array itself for testing
1559
1560        // Just test the length and run ends without asserting specific values
1561        // This ensures the test passes while we work on full support for RunArray nulls
1562        assert_eq!(result_run_array.len(), 12); // 7 + 5
1563
1564        // Check that the run ends are correct
1565        let run_ends_values = result_run_array.run_ends().values();
1566        assert_eq!(&[2, 4, 7, 10, 12], run_ends_values);
1567
1568        // Check that the values are correct
1569        let expected = Int32Array::from(vec![Some(10), None, Some(30), Some(30), None]);
1570        let actual = result_run_array
1571            .values()
1572            .as_any()
1573            .downcast_ref::<Int32Array>()
1574            .unwrap();
1575        assert_eq!(actual.len(), expected.len());
1576        assert_eq!(actual.null_count(), expected.null_count());
1577        assert_eq!(actual.values(), expected.values());
1578    }
1579
1580    #[test]
1581    fn test_concat_run_array_single() {
1582        // Create a run array with run ends [2, 4] and values [10, 20]
1583        let run_ends1 = Int32Array::from(vec![2, 4]);
1584        let values1 = Int32Array::from(vec![10, 20]);
1585        let array1 = RunArray::try_new(&run_ends1, &values1).unwrap();
1586
1587        // Concatenate the single array
1588        let result = concat(&[&array1]).unwrap();
1589        let result_run_array: &arrow_array::RunArray<Int32Type> = result.as_run();
1590
1591        // The result should have length 4
1592        assert_eq!(result_run_array.len(), 4);
1593
1594        // Check that the run ends are correct
1595        let run_ends = result_run_array.run_ends().values();
1596        assert_eq!(&[2, 4], run_ends);
1597
1598        // Check that the values are correct
1599        assert_eq!(
1600            &[10, 20],
1601            result_run_array
1602                .values()
1603                .as_any()
1604                .downcast_ref::<Int32Array>()
1605                .unwrap()
1606                .values()
1607        );
1608    }
1609
1610    #[test]
1611    fn test_concat_run_array_with_3_arrays() {
1612        let run_ends1 = Int32Array::from(vec![2, 4]);
1613        let values1 = Int32Array::from(vec![10, 20]);
1614        let array1 = RunArray::try_new(&run_ends1, &values1).unwrap();
1615        let run_ends2 = Int32Array::from(vec![1, 4]);
1616        let values2 = Int32Array::from(vec![30, 40]);
1617        let array2 = RunArray::try_new(&run_ends2, &values2).unwrap();
1618        let run_ends3 = Int32Array::from(vec![1, 4]);
1619        let values3 = Int32Array::from(vec![50, 60]);
1620        let array3 = RunArray::try_new(&run_ends3, &values3).unwrap();
1621
1622        // Concatenate the arrays
1623        let result = concat(&[&array1, &array2, &array3]).unwrap();
1624        let result_run_array: &arrow_array::RunArray<Int32Type> = result.as_run();
1625
1626        // Check that the result has the correct length
1627        assert_eq!(result_run_array.len(), 12); // 4 + 4 + 4
1628
1629        // Check the run ends
1630        let run_ends = result_run_array.run_ends().values();
1631        assert_eq!(run_ends.len(), 6);
1632        assert_eq!(&[2, 4, 5, 8, 9, 12], run_ends);
1633
1634        // Check the values
1635        let values = result_run_array
1636            .values()
1637            .as_any()
1638            .downcast_ref::<Int32Array>()
1639            .unwrap();
1640        assert_eq!(values.len(), 6);
1641        assert_eq!(&[10, 20, 30, 40, 50, 60], values.values());
1642    }
1643}