Skip to main content

arrow_select/
concat.rs

1// Licensed to the Apache Software Foundation (ASF) under one
2// or more contributor license agreements.  See the NOTICE file
3// distributed with this work for additional information
4// regarding copyright ownership.  The ASF licenses this file
5// to you under the Apache License, Version 2.0 (the
6// "License"); you may not use this file except in compliance
7// with the License.  You may obtain a copy of the License at
8//
9//   http://www.apache.org/licenses/LICENSE-2.0
10//
11// Unless required by applicable law or agreed to in writing,
12// software distributed under the License is distributed on an
13// "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
14// KIND, either express or implied.  See the License for the
15// specific language governing permissions and limitations
16// under the License.
17
18//! Defines concat kernel for `ArrayRef`
19//!
20//! Example:
21//!
22//! ```
23//! use arrow_array::{ArrayRef, StringArray};
24//! use arrow_select::concat::concat;
25//!
26//! let arr = concat(&[
27//!     &StringArray::from(vec!["hello", "world"]),
28//!     &StringArray::from(vec!["!"]),
29//! ]).unwrap();
30//! assert_eq!(arr.len(), 3);
31//! ```
32
33use crate::dictionary::{merge_dictionary_values, should_merge_dictionary_values};
34use arrow_array::builder::{
35    BooleanBuilder, GenericByteBuilder, GenericByteViewBuilder, PrimitiveBuilder,
36};
37use arrow_array::cast::AsArray;
38use arrow_array::types::*;
39use arrow_array::*;
40use arrow_buffer::{
41    ArrowNativeType, BooleanBufferBuilder, MutableBuffer, NullBuffer, OffsetBuffer, ScalarBuffer,
42};
43use arrow_data::ArrayDataBuilder;
44use arrow_data::transform::{Capacities, MutableArrayData};
45use arrow_schema::{ArrowError, DataType, FieldRef, Fields, SchemaRef};
46use std::{collections::HashSet, ops::Add, sync::Arc};
47
48fn binary_capacity<T: ByteArrayType>(arrays: &[&dyn Array]) -> Capacities {
49    let mut item_capacity = 0;
50    let mut bytes_capacity = 0;
51    for array in arrays {
52        let a = array.as_bytes::<T>();
53
54        // Guaranteed to always have at least one element
55        let offsets = a.value_offsets();
56        bytes_capacity += offsets[offsets.len() - 1].as_usize() - offsets[0].as_usize();
57        item_capacity += a.len()
58    }
59
60    Capacities::Binary(item_capacity, Some(bytes_capacity))
61}
62
63fn fixed_size_list_capacity(arrays: &[&dyn Array], data_type: &DataType) -> Capacities {
64    if let DataType::FixedSizeList(f, _) = data_type {
65        let item_capacity = arrays.iter().map(|a| a.len()).sum();
66        let child_data_type = f.data_type();
67        match child_data_type {
68            // These types should match the types that `get_capacity`
69            // has special handling for.
70            DataType::Utf8
71            | DataType::LargeUtf8
72            | DataType::Binary
73            | DataType::LargeBinary
74            | DataType::FixedSizeList(_, _) => {
75                let values: Vec<&dyn arrow_array::Array> = arrays
76                    .iter()
77                    .map(|a| a.as_fixed_size_list().values().as_ref())
78                    .collect();
79                Capacities::List(
80                    item_capacity,
81                    Some(Box::new(get_capacity(&values, child_data_type))),
82                )
83            }
84            _ => Capacities::Array(item_capacity),
85        }
86    } else {
87        unreachable!("illegal data type for fixed size list")
88    }
89}
90
91fn concat_byte_view<B: ByteViewType>(arrays: &[&dyn Array]) -> Result<ArrayRef, ArrowError> {
92    let mut builder =
93        GenericByteViewBuilder::<B>::with_capacity(arrays.iter().map(|a| a.len()).sum());
94    for &array in arrays.iter() {
95        builder.append_array(array.as_byte_view());
96    }
97    Ok(Arc::new(builder.finish()))
98}
99
100fn concat_dictionaries<K: ArrowDictionaryKeyType>(
101    arrays: &[&dyn Array],
102) -> Result<ArrayRef, ArrowError> {
103    let mut output_len = 0;
104    let dictionaries: Vec<_> = arrays
105        .iter()
106        .map(|x| x.as_dictionary::<K>())
107        .inspect(|d| output_len += d.len())
108        .collect();
109
110    if !should_merge_dictionary_values::<K>(&dictionaries, output_len).0 {
111        return concat_fallback(arrays, Capacities::Array(output_len));
112    }
113
114    let merged = merge_dictionary_values(&dictionaries, None)?;
115
116    // Recompute keys
117    let mut key_values = Vec::with_capacity(output_len);
118
119    let mut has_nulls = false;
120    for (d, mapping) in dictionaries.iter().zip(merged.key_mappings) {
121        has_nulls |= d.null_count() != 0;
122        for key in d.keys().values() {
123            // Use get to safely handle nulls
124            key_values.push(mapping.get(key.as_usize()).copied().unwrap_or_default())
125        }
126    }
127
128    let nulls = has_nulls.then(|| {
129        let mut nulls = BooleanBufferBuilder::new(output_len);
130        for d in &dictionaries {
131            match d.nulls() {
132                Some(n) => nulls.append_buffer(n.inner()),
133                None => nulls.append_n(d.len(), true),
134            }
135        }
136        NullBuffer::new(nulls.finish())
137    });
138
139    let keys = PrimitiveArray::<K>::try_new(key_values.into(), nulls)?;
140    // Sanity check
141    assert_eq!(keys.len(), output_len);
142
143    let array = unsafe { DictionaryArray::new_unchecked(keys, merged.values) };
144    Ok(Arc::new(array))
145}
146
147fn concat_lists<OffsetSize: OffsetSizeTrait>(
148    arrays: &[&dyn Array],
149    field: &FieldRef,
150) -> Result<ArrayRef, ArrowError> {
151    let mut output_len = 0;
152    let mut list_has_nulls = false;
153    let mut list_has_slices = false;
154
155    let lists = arrays
156        .iter()
157        .map(|x| x.as_list::<OffsetSize>())
158        .inspect(|l| {
159            output_len += l.len();
160            list_has_nulls |= l.null_count() != 0;
161            list_has_slices |= l.offsets()[0] > OffsetSize::zero()
162                || l.offsets().last().unwrap().as_usize() < l.values().len();
163        })
164        .collect::<Vec<_>>();
165
166    let lists_nulls = list_has_nulls.then(|| {
167        let mut nulls = BooleanBufferBuilder::new(output_len);
168        for l in &lists {
169            match l.nulls() {
170                Some(n) => nulls.append_buffer(n.inner()),
171                None => nulls.append_n(l.len(), true),
172            }
173        }
174        NullBuffer::new(nulls.finish())
175    });
176
177    // If any of the lists have slices, we need to slice the values
178    // to ensure that the offsets are correct
179    let mut sliced_values;
180    let values: Vec<&dyn Array> = if list_has_slices {
181        sliced_values = Vec::with_capacity(lists.len());
182        for l in &lists {
183            // if the first offset is non-zero, we need to slice the values so when
184            // we concatenate them below only the relevant values are included
185            let offsets = l.offsets();
186            let start_offset = offsets[0].as_usize();
187            let end_offset = offsets.last().unwrap().as_usize();
188            sliced_values.push(l.values().slice(start_offset, end_offset - start_offset));
189        }
190        sliced_values.iter().map(|a| a.as_ref()).collect()
191    } else {
192        lists.iter().map(|x| x.values().as_ref()).collect()
193    };
194
195    let concatenated_values = concat(values.as_slice())?;
196
197    // Merge value offsets from the lists
198    let value_offset_buffer =
199        OffsetBuffer::<OffsetSize>::from_lengths(lists.iter().flat_map(|x| x.offsets().lengths()));
200
201    let array = GenericListArray::<OffsetSize>::try_new(
202        Arc::clone(field),
203        value_offset_buffer,
204        concatenated_values,
205        lists_nulls,
206    )?;
207
208    Ok(Arc::new(array))
209}
210
211fn concat_maps(
212    arrays: &[&dyn Array],
213    field: &FieldRef,
214    ordered: bool,
215) -> Result<ArrayRef, ArrowError> {
216    let mut output_len = 0;
217    let mut map_has_nulls = false;
218    let mut map_has_slices = false;
219
220    let maps = arrays
221        .iter()
222        .map(|x| x.as_map())
223        .inspect(|m| {
224            output_len += m.len();
225            map_has_nulls |= m.null_count() != 0;
226            map_has_slices |=
227                m.offsets()[0] > 0 || m.offsets().last().unwrap().as_usize() < m.entries().len();
228        })
229        .collect::<Vec<_>>();
230
231    let map_nulls = map_has_nulls.then(|| {
232        let mut nulls = BooleanBufferBuilder::new(output_len);
233        for m in &maps {
234            match m.nulls() {
235                Some(n) => nulls.append_buffer(n.inner()),
236                None => nulls.append_n(m.len(), true),
237            }
238        }
239        NullBuffer::new(nulls.finish())
240    });
241
242    // If any of the maps have slices, we need to slice the entries
243    // to ensure that the offsets are correct
244    let mut sliced_entries: Vec<ArrayRef>;
245    let entries: Vec<&dyn Array> = if map_has_slices {
246        sliced_entries = Vec::with_capacity(maps.len());
247        for m in &maps {
248            let offsets = m.offsets();
249            let start_offset = offsets[0].as_usize();
250            let end_offset = offsets.last().unwrap().as_usize();
251            let entries_arr: &dyn Array = m.entries();
252            sliced_entries.push(entries_arr.slice(start_offset, end_offset - start_offset));
253        }
254        sliced_entries.iter().map(|a| a.as_ref()).collect()
255    } else {
256        maps.iter().map(|m| m.entries() as &dyn Array).collect()
257    };
258
259    let concatenated_entries = concat(entries.as_slice())?;
260
261    // Merge value offsets from the maps
262    let value_offset_buffer =
263        OffsetBuffer::<i32>::from_lengths(maps.iter().flat_map(|m| m.offsets().lengths()));
264
265    let array = MapArray::try_new(
266        Arc::clone(field),
267        value_offset_buffer,
268        // Safety: Map entries are always StructArrays, so this downcast is guaranteed to succeed
269        concatenated_entries.as_struct().clone(),
270        map_nulls,
271        ordered,
272    )?;
273
274    Ok(Arc::new(array))
275}
276
277fn concat_list_view<OffsetSize: OffsetSizeTrait>(
278    arrays: &[&dyn Array],
279    field: &FieldRef,
280) -> Result<ArrayRef, ArrowError> {
281    let mut output_len = 0;
282    let mut list_has_nulls = false;
283
284    let lists = arrays
285        .iter()
286        .map(|x| x.as_list_view::<OffsetSize>())
287        .inspect(|l| {
288            output_len += l.len();
289            list_has_nulls |= l.null_count() != 0;
290        })
291        .collect::<Vec<_>>();
292
293    let lists_nulls = list_has_nulls.then(|| {
294        let mut nulls = BooleanBufferBuilder::new(output_len);
295        for l in &lists {
296            match l.nulls() {
297                Some(n) => nulls.append_buffer(n.inner()),
298                None => nulls.append_n(l.len(), true),
299            }
300        }
301        NullBuffer::new(nulls.finish())
302    });
303
304    let values: Vec<&dyn Array> = lists.iter().map(|l| l.values().as_ref()).collect();
305
306    let concatenated_values = concat(values.as_slice())?;
307
308    let sizes: ScalarBuffer<OffsetSize> = lists.iter().flat_map(|x| x.sizes()).copied().collect();
309
310    let mut offsets = MutableBuffer::with_capacity(lists.iter().map(|l| l.offsets().len()).sum());
311    let mut global_offset = OffsetSize::zero();
312    for l in lists.iter() {
313        for &offset in l.offsets() {
314            offsets.push(offset + global_offset);
315        }
316
317        // advance the offsets
318        global_offset += OffsetSize::from_usize(l.values().len()).unwrap();
319    }
320
321    let offsets = ScalarBuffer::from(offsets);
322
323    let array = GenericListViewArray::try_new(
324        field.clone(),
325        offsets,
326        sizes,
327        concatenated_values,
328        lists_nulls,
329    )?;
330
331    Ok(Arc::new(array))
332}
333
334fn concat_primitives<T: ArrowPrimitiveType>(arrays: &[&dyn Array]) -> Result<ArrayRef, ArrowError> {
335    let mut builder = PrimitiveBuilder::<T>::with_capacity(arrays.iter().map(|a| a.len()).sum())
336        .with_data_type(arrays[0].data_type().clone());
337
338    for array in arrays {
339        builder.append_array(array.as_primitive());
340    }
341
342    Ok(Arc::new(builder.finish()))
343}
344
345fn concat_boolean(arrays: &[&dyn Array]) -> Result<ArrayRef, ArrowError> {
346    let mut builder = BooleanBuilder::with_capacity(arrays.iter().map(|a| a.len()).sum());
347
348    for array in arrays {
349        builder.append_array(array.as_boolean());
350    }
351
352    Ok(Arc::new(builder.finish()))
353}
354
355fn concat_bytes<T: ByteArrayType>(arrays: &[&dyn Array]) -> Result<ArrayRef, ArrowError> {
356    let (item_capacity, bytes_capacity) = match binary_capacity::<T>(arrays) {
357        Capacities::Binary(item_capacity, Some(bytes_capacity)) => (item_capacity, bytes_capacity),
358        _ => unreachable!(),
359    };
360
361    let mut builder = GenericByteBuilder::<T>::with_capacity(item_capacity, bytes_capacity);
362
363    for array in arrays {
364        builder.append_array(array.as_bytes::<T>())?;
365    }
366
367    Ok(Arc::new(builder.finish()))
368}
369
370fn concat_structs(arrays: &[&dyn Array], fields: &Fields) -> Result<ArrayRef, ArrowError> {
371    let mut len = 0;
372    let mut has_nulls = false;
373    let structs = arrays
374        .iter()
375        .map(|a| {
376            len += a.len();
377            has_nulls |= a.null_count() > 0;
378            a.as_struct()
379        })
380        .collect::<Vec<_>>();
381
382    let nulls = has_nulls.then(|| {
383        let mut b = BooleanBufferBuilder::new(len);
384        for s in &structs {
385            match s.nulls() {
386                Some(n) => b.append_buffer(n.inner()),
387                None => b.append_n(s.len(), true),
388            }
389        }
390        NullBuffer::new(b.finish())
391    });
392
393    let column_concat_result = (0..fields.len())
394        .map(|i| {
395            let extracted_cols = structs
396                .iter()
397                .map(|s| s.column(i).as_ref())
398                .collect::<Vec<_>>();
399            concat(&extracted_cols)
400        })
401        .collect::<Result<Vec<_>, ArrowError>>()?;
402
403    Ok(Arc::new(StructArray::try_new_with_length(
404        fields.clone(),
405        column_concat_result,
406        nulls,
407        len,
408    )?))
409}
410
411/// Concatenate multiple RunArray instances into a single RunArray.
412///
413/// This function handles the special case of concatenating RunArrays by:
414/// 1. Collecting all run ends and values from input arrays
415/// 2. Adjusting run ends to account for the length of previous arrays
416/// 3. Creating a new RunArray with the combined data
417fn concat_run_arrays<R: RunEndIndexType>(arrays: &[&dyn Array]) -> Result<ArrayRef, ArrowError>
418where
419    R::Native: Add<Output = R::Native>,
420{
421    let run_arrays: Vec<_> = arrays
422        .iter()
423        .map(|x| x.as_run::<R>())
424        .filter(|x| !x.run_ends().is_empty())
425        .collect();
426
427    // The run ends need to be adjusted by the sum of the lengths of the previous arrays.
428    let needed_run_end_adjustments = std::iter::once(R::default_value())
429        .chain(
430            run_arrays
431                .iter()
432                .scan(R::default_value(), |acc, run_array| {
433                    *acc = *acc + R::Native::from_usize(run_array.len()).unwrap();
434                    Some(*acc)
435                }),
436        )
437        .collect::<Vec<_>>();
438
439    // This works out nicely to be the total (logical) length of the resulting array.
440    let total_len = needed_run_end_adjustments.last().unwrap().as_usize();
441
442    let run_ends_array =
443        PrimitiveArray::<R>::from_iter_values(run_arrays.iter().enumerate().flat_map(
444            move |(i, run_array)| {
445                let adjustment = needed_run_end_adjustments[i];
446                run_array
447                    .run_ends()
448                    .sliced_values()
449                    .map(move |run_end| run_end + adjustment)
450            },
451        ));
452
453    let values_slices: Vec<ArrayRef> = run_arrays
454        .iter()
455        .map(|run_array| run_array.values_slice())
456        .collect();
457
458    let all_values = concat(&values_slices.iter().map(|x| x.as_ref()).collect::<Vec<_>>())?;
459
460    let builder = ArrayDataBuilder::new(run_arrays[0].data_type().clone())
461        .len(total_len)
462        .child_data(vec![run_ends_array.into_data(), all_values.into_data()]);
463
464    // `build_unchecked` is used to avoid recursive validation of child arrays.
465    let array_data = unsafe { builder.build_unchecked() };
466    array_data.validate_data()?;
467
468    Ok(Arc::<RunArray<R>>::new(array_data.into()))
469}
470
// Expands to an early `return` of `concat_dictionaries` instantiated for the
// given dictionary key type; used as the callback of `downcast_integer!`.
macro_rules! dict_helper {
    ($key_type:ty, $inputs:expr) => {
        return concat_dictionaries::<$key_type>($inputs)
    };
}
476
// Expands to an early `return` of `concat_primitives` instantiated for the
// given primitive type; used as the callback of `downcast_primitive!`.
macro_rules! primitive_concat {
    ($primitive_type:ty, $inputs:expr) => {
        return concat_primitives::<$primitive_type>($inputs)
    };
}
482
483fn get_capacity(arrays: &[&dyn Array], data_type: &DataType) -> Capacities {
484    match data_type {
485        DataType::Utf8 => binary_capacity::<Utf8Type>(arrays),
486        DataType::LargeUtf8 => binary_capacity::<LargeUtf8Type>(arrays),
487        DataType::Binary => binary_capacity::<BinaryType>(arrays),
488        DataType::LargeBinary => binary_capacity::<LargeBinaryType>(arrays),
489        DataType::FixedSizeList(_, _) => fixed_size_list_capacity(arrays, data_type),
490        _ => Capacities::Array(arrays.iter().map(|a| a.len()).sum()),
491    }
492}
493
494/// Concatenate multiple [Array] of the same type into a single [ArrayRef].
495pub fn concat(arrays: &[&dyn Array]) -> Result<ArrayRef, ArrowError> {
496    if arrays.is_empty() {
497        return Err(ArrowError::ComputeError(
498            "concat requires input of at least one array".to_string(),
499        ));
500    } else if arrays.len() == 1 {
501        let array = arrays[0];
502        return Ok(array.slice(0, array.len()));
503    }
504
505    let d = arrays[0].data_type();
506    if arrays.iter().skip(1).any(|array| array.data_type() != d) {
507        // Create error message with up to 10 unique data types in the order they appear
508        let error_message = {
509            // 10 max unique data types to print and another 1 to know if there are more
510            let mut unique_data_types = HashSet::with_capacity(11);
511
512            let mut error_message =
513                format!("It is not possible to concatenate arrays of different data types ({d}");
514            unique_data_types.insert(d);
515
516            for array in arrays {
517                let is_unique = unique_data_types.insert(array.data_type());
518
519                if unique_data_types.len() == 11 {
520                    error_message.push_str(", ...");
521                    break;
522                }
523
524                if is_unique {
525                    error_message.push_str(", ");
526                    error_message.push_str(&array.data_type().to_string());
527                }
528            }
529
530            error_message.push_str(").");
531
532            error_message
533        };
534
535        return Err(ArrowError::InvalidArgumentError(error_message));
536    }
537
538    downcast_primitive! {
539        d => (primitive_concat, arrays),
540        DataType::Boolean => concat_boolean(arrays),
541        DataType::Dictionary(k, _) => {
542            downcast_integer! {
543                k.as_ref() => (dict_helper, arrays),
544                _ => unreachable!("illegal dictionary key type {k}")
545            }
546        }
547        DataType::List(field) => concat_lists::<i32>(arrays, field),
548        DataType::LargeList(field) => concat_lists::<i64>(arrays, field),
549        DataType::ListView(field) => concat_list_view::<i32>(arrays, field),
550        DataType::LargeListView(field) => concat_list_view::<i64>(arrays, field),
551        DataType::Map(field, ordered) => concat_maps(arrays, field, *ordered),
552        DataType::Struct(fields) => concat_structs(arrays, fields),
553        DataType::Utf8 => concat_bytes::<Utf8Type>(arrays),
554        DataType::LargeUtf8 => concat_bytes::<LargeUtf8Type>(arrays),
555        DataType::Binary => concat_bytes::<BinaryType>(arrays),
556        DataType::LargeBinary => concat_bytes::<LargeBinaryType>(arrays),
557        DataType::RunEndEncoded(r, _) => {
558            // Handle RunEndEncoded arrays with special concat function
559            // We need to downcast based on the run end type
560            match r.data_type() {
561                DataType::Int16 => concat_run_arrays::<Int16Type>(arrays),
562                DataType::Int32 => concat_run_arrays::<Int32Type>(arrays),
563                DataType::Int64 => concat_run_arrays::<Int64Type>(arrays),
564                _ => unreachable!("Unsupported run end index type: {r:?}"),
565            }
566        }
567        DataType::Utf8View => concat_byte_view::<StringViewType>(arrays),
568        DataType::BinaryView => concat_byte_view::<BinaryViewType>(arrays),
569        _ => {
570            let capacity = get_capacity(arrays, d);
571            concat_fallback(arrays, capacity)
572        }
573    }
574}
575
576/// Concatenates arrays using MutableArrayData
577///
578/// This will naively concatenate dictionaries
579fn concat_fallback(arrays: &[&dyn Array], capacity: Capacities) -> Result<ArrayRef, ArrowError> {
580    let array_data: Vec<_> = arrays.iter().map(|a| a.to_data()).collect::<Vec<_>>();
581    let array_data = array_data.iter().collect();
582    let mut mutable = MutableArrayData::with_capacities(array_data, false, capacity);
583
584    for (i, a) in arrays.iter().enumerate() {
585        mutable.try_extend(i, 0, a.len())?
586    }
587
588    Ok(make_array(mutable.freeze()))
589}
590
591/// Concatenates `batches` together into a single [`RecordBatch`].
592///
593/// The output batch has the specified `schemas`; The schema of the
594/// input are ignored.
595///
596/// # Notes
597///
598/// - Callers should budget for peak memory use to approach 2x the input
599///   size, as the input batches and output arrays co-exist during construction.
600/// - Arrays with `i32` offsets, such as `StringArray` and `BinaryArray`, only
601///   support up to ~2GiB of payloads. Concatenating large arrays of these types
602///   can cause offset overflows.
603///
604/// # Errors
605///
606/// Returns an error if the types of underlying arrays are different.
607pub fn concat_batches<'a>(
608    schema: &SchemaRef,
609    input_batches: impl IntoIterator<Item = &'a RecordBatch>,
610) -> Result<RecordBatch, ArrowError> {
611    // When schema is empty, sum the number of the rows of all batches
612    if schema.fields().is_empty() {
613        let num_rows: usize = input_batches.into_iter().map(RecordBatch::num_rows).sum();
614        let mut options = RecordBatchOptions::default();
615        options.row_count = Some(num_rows);
616        return RecordBatch::try_new_with_options(schema.clone(), vec![], &options);
617    }
618
619    let batches: Vec<&RecordBatch> = input_batches.into_iter().collect();
620    if batches.is_empty() {
621        return Ok(RecordBatch::new_empty(schema.clone()));
622    }
623    let field_num = schema.fields().len();
624    let mut arrays = Vec::with_capacity(field_num);
625    for i in 0..field_num {
626        let array = concat(
627            &batches
628                .iter()
629                .map(|batch| batch.column(i).as_ref())
630                .collect::<Vec<_>>(),
631        )?;
632        arrays.push(array);
633    }
634    RecordBatch::try_new(schema.clone(), arrays)
635}
636
637#[cfg(test)]
638mod tests {
639    use super::*;
640    use arrow_array::builder::{
641        GenericListBuilder, Int32Builder as Int32ArrayBuilder, Int64Builder, ListViewBuilder,
642        MapBuilder, StringBuilder, StringDictionaryBuilder,
643    };
644    use arrow_schema::{Field, Schema};
645    use std::fmt::Debug;
646
647    #[test]
648    fn test_concat_empty_vec() {
649        let re = concat(&[]);
650        assert!(re.is_err());
651    }
652
653    #[test]
654    fn test_concat_batches_no_columns() {
655        // Test concat using empty schema / batches without columns
656        let schema = Arc::new(Schema::empty());
657
658        let mut options = RecordBatchOptions::default();
659        options.row_count = Some(100);
660        let batch = RecordBatch::try_new_with_options(schema.clone(), vec![], &options).unwrap();
661        // put in 2 batches of 100 rows each
662        let re = concat_batches(&schema, &[batch.clone(), batch]).unwrap();
663
664        assert_eq!(re.num_rows(), 200);
665    }
666
667    #[test]
668    fn test_concat_one_element_vec() {
669        let arr = Arc::new(PrimitiveArray::<Int64Type>::from(vec![
670            Some(-1),
671            Some(2),
672            None,
673        ])) as ArrayRef;
674        let result = concat(&[arr.as_ref()]).unwrap();
675        assert_eq!(
676            &arr, &result,
677            "concatenating single element array gives back the same result"
678        );
679    }
680
681    #[test]
682    fn test_concat_incompatible_datatypes() {
683        let re = concat(&[
684            &PrimitiveArray::<Int64Type>::from(vec![Some(-1), Some(2), None]),
685            // 2 string to make sure we only mention unique types
686            &StringArray::from(vec![Some("hello"), Some("bar"), Some("world")]),
687            &StringArray::from(vec![Some("hey"), Some(""), Some("you")]),
688            // Another type to make sure we are showing all the incompatible types
689            &PrimitiveArray::<Int32Type>::from(vec![Some(-1), Some(2), None]),
690        ]);
691
692        assert_eq!(
693            re.unwrap_err().to_string(),
694            "Invalid argument error: It is not possible to concatenate arrays of different data types (Int64, Utf8, Int32)."
695        );
696    }
697
698    #[test]
699    fn test_concat_10_incompatible_datatypes_should_include_all_of_them() {
700        let re = concat(&[
701            &PrimitiveArray::<Int64Type>::from(vec![Some(-1), Some(2), None]),
702            // 2 string to make sure we only mention unique types
703            &StringArray::from(vec![Some("hello"), Some("bar"), Some("world")]),
704            &StringArray::from(vec![Some("hey"), Some(""), Some("you")]),
705            // Another type to make sure we are showing all the incompatible types
706            &PrimitiveArray::<Int32Type>::from(vec![Some(-1), Some(2), None]),
707            &PrimitiveArray::<Int8Type>::from(vec![Some(-1), Some(2), None]),
708            &PrimitiveArray::<Int16Type>::from(vec![Some(-1), Some(2), None]),
709            &PrimitiveArray::<UInt8Type>::from(vec![Some(1), Some(2), None]),
710            &PrimitiveArray::<UInt16Type>::from(vec![Some(1), Some(2), None]),
711            &PrimitiveArray::<UInt32Type>::from(vec![Some(1), Some(2), None]),
712            // Non unique
713            &PrimitiveArray::<UInt16Type>::from(vec![Some(1), Some(2), None]),
714            &PrimitiveArray::<UInt64Type>::from(vec![Some(1), Some(2), None]),
715            &PrimitiveArray::<Float32Type>::from(vec![Some(1.0), Some(2.0), None]),
716        ]);
717
718        assert_eq!(
719            re.unwrap_err().to_string(),
720            "Invalid argument error: It is not possible to concatenate arrays of different data types (Int64, Utf8, Int32, Int8, Int16, UInt8, UInt16, UInt32, UInt64, Float32)."
721        );
722    }
723
724    #[test]
725    fn test_concat_11_incompatible_datatypes_should_only_include_10() {
726        let re = concat(&[
727            &PrimitiveArray::<Int64Type>::from(vec![Some(-1), Some(2), None]),
728            // 2 string to make sure we only mention unique types
729            &StringArray::from(vec![Some("hello"), Some("bar"), Some("world")]),
730            &StringArray::from(vec![Some("hey"), Some(""), Some("you")]),
731            // Another type to make sure we are showing all the incompatible types
732            &PrimitiveArray::<Int32Type>::from(vec![Some(-1), Some(2), None]),
733            &PrimitiveArray::<Int8Type>::from(vec![Some(-1), Some(2), None]),
734            &PrimitiveArray::<Int16Type>::from(vec![Some(-1), Some(2), None]),
735            &PrimitiveArray::<UInt8Type>::from(vec![Some(1), Some(2), None]),
736            &PrimitiveArray::<UInt16Type>::from(vec![Some(1), Some(2), None]),
737            &PrimitiveArray::<UInt32Type>::from(vec![Some(1), Some(2), None]),
738            // Non unique
739            &PrimitiveArray::<UInt16Type>::from(vec![Some(1), Some(2), None]),
740            &PrimitiveArray::<UInt64Type>::from(vec![Some(1), Some(2), None]),
741            &PrimitiveArray::<Float32Type>::from(vec![Some(1.0), Some(2.0), None]),
742            &PrimitiveArray::<Float64Type>::from(vec![Some(1.0), Some(2.0), None]),
743        ]);
744
745        assert_eq!(
746            re.unwrap_err().to_string(),
747            "Invalid argument error: It is not possible to concatenate arrays of different data types (Int64, Utf8, Int32, Int8, Int16, UInt8, UInt16, UInt32, UInt64, Float32, ...)."
748        );
749    }
750
751    #[test]
752    fn test_concat_13_incompatible_datatypes_should_not_include_all_of_them() {
753        let re = concat(&[
754            &PrimitiveArray::<Int64Type>::from(vec![Some(-1), Some(2), None]),
755            // 2 string to make sure we only mention unique types
756            &StringArray::from(vec![Some("hello"), Some("bar"), Some("world")]),
757            &StringArray::from(vec![Some("hey"), Some(""), Some("you")]),
758            // Another type to make sure we are showing all the incompatible types
759            &PrimitiveArray::<Int32Type>::from(vec![Some(-1), Some(2), None]),
760            &PrimitiveArray::<Int8Type>::from(vec![Some(-1), Some(2), None]),
761            &PrimitiveArray::<Int16Type>::from(vec![Some(-1), Some(2), None]),
762            &PrimitiveArray::<UInt8Type>::from(vec![Some(1), Some(2), None]),
763            &PrimitiveArray::<UInt16Type>::from(vec![Some(1), Some(2), None]),
764            &PrimitiveArray::<UInt32Type>::from(vec![Some(1), Some(2), None]),
765            // Non unique
766            &PrimitiveArray::<UInt16Type>::from(vec![Some(1), Some(2), None]),
767            &PrimitiveArray::<UInt64Type>::from(vec![Some(1), Some(2), None]),
768            &PrimitiveArray::<Float32Type>::from(vec![Some(1.0), Some(2.0), None]),
769            &PrimitiveArray::<Float64Type>::from(vec![Some(1.0), Some(2.0), None]),
770            &PrimitiveArray::<Float16Type>::new_null(3),
771            &BooleanArray::from(vec![Some(true), Some(false), None]),
772        ]);
773
774        assert_eq!(
775            re.unwrap_err().to_string(),
776            "Invalid argument error: It is not possible to concatenate arrays of different data types (Int64, Utf8, Int32, Int8, Int16, UInt8, UInt16, UInt32, UInt64, Float32, ...)."
777        );
778    }
779
780    #[test]
781    fn test_concat_string_arrays() {
782        let arr = concat(&[
783            &StringArray::from(vec!["hello", "world"]),
784            &StringArray::from(vec!["2", "3", "4"]),
785            &StringArray::from(vec![Some("foo"), Some("bar"), None, Some("baz")]),
786        ])
787        .unwrap();
788
789        let expected_output = Arc::new(StringArray::from(vec![
790            Some("hello"),
791            Some("world"),
792            Some("2"),
793            Some("3"),
794            Some("4"),
795            Some("foo"),
796            Some("bar"),
797            None,
798            Some("baz"),
799        ])) as ArrayRef;
800
801        assert_eq!(&arr, &expected_output);
802    }
803
804    #[test]
805    fn test_concat_string_view_arrays() {
806        let arr = concat(&[
807            &StringViewArray::from(vec!["helloxxxxxxxxxxa", "world____________"]),
808            &StringViewArray::from(vec!["helloxxxxxxxxxxy", "3", "4"]),
809            &StringViewArray::from(vec![Some("foo"), Some("bar"), None, Some("baz")]),
810        ])
811        .unwrap();
812
813        let expected_output = Arc::new(StringViewArray::from(vec![
814            Some("helloxxxxxxxxxxa"),
815            Some("world____________"),
816            Some("helloxxxxxxxxxxy"),
817            Some("3"),
818            Some("4"),
819            Some("foo"),
820            Some("bar"),
821            None,
822            Some("baz"),
823        ])) as ArrayRef;
824
825        assert_eq!(&arr, &expected_output);
826    }
827
828    #[test]
829    fn test_concat_primitive_arrays() {
830        let arr = concat(&[
831            &PrimitiveArray::<Int64Type>::from(vec![Some(-1), Some(-1), Some(2), None, None]),
832            &PrimitiveArray::<Int64Type>::from(vec![Some(101), Some(102), Some(103), None]),
833            &PrimitiveArray::<Int64Type>::from(vec![Some(256), Some(512), Some(1024)]),
834        ])
835        .unwrap();
836
837        let expected_output = Arc::new(PrimitiveArray::<Int64Type>::from(vec![
838            Some(-1),
839            Some(-1),
840            Some(2),
841            None,
842            None,
843            Some(101),
844            Some(102),
845            Some(103),
846            None,
847            Some(256),
848            Some(512),
849            Some(1024),
850        ])) as ArrayRef;
851
852        assert_eq!(&arr, &expected_output);
853    }
854
855    #[test]
856    fn test_concat_primitive_array_slices() {
857        let input_1 =
858            PrimitiveArray::<Int64Type>::from(vec![Some(-1), Some(-1), Some(2), None, None])
859                .slice(1, 3);
860
861        let input_2 =
862            PrimitiveArray::<Int64Type>::from(vec![Some(101), Some(102), Some(103), None])
863                .slice(1, 3);
864        let arr = concat(&[&input_1, &input_2]).unwrap();
865
866        let expected_output = Arc::new(PrimitiveArray::<Int64Type>::from(vec![
867            Some(-1),
868            Some(2),
869            None,
870            Some(102),
871            Some(103),
872            None,
873        ])) as ArrayRef;
874
875        assert_eq!(&arr, &expected_output);
876    }
877
878    #[test]
879    fn test_concat_boolean_primitive_arrays() {
880        let arr = concat(&[
881            &BooleanArray::from(vec![
882                Some(true),
883                Some(true),
884                Some(false),
885                None,
886                None,
887                Some(false),
888            ]),
889            &BooleanArray::from(vec![None, Some(false), Some(true), Some(false)]),
890        ])
891        .unwrap();
892
893        let expected_output = Arc::new(BooleanArray::from(vec![
894            Some(true),
895            Some(true),
896            Some(false),
897            None,
898            None,
899            Some(false),
900            None,
901            Some(false),
902            Some(true),
903            Some(false),
904        ])) as ArrayRef;
905
906        assert_eq!(&arr, &expected_output);
907    }
908
909    #[test]
910    fn test_concat_primitive_list_arrays() {
911        let list1 = [
912            Some(vec![Some(-1), Some(-1), Some(2), None, None]),
913            Some(vec![]),
914            None,
915            Some(vec![Some(10)]),
916        ];
917        let list1_array = ListArray::from_iter_primitive::<Int64Type, _, _>(list1.clone());
918
919        let list2 = [
920            None,
921            Some(vec![Some(100), None, Some(101)]),
922            Some(vec![Some(102)]),
923        ];
924        let list2_array = ListArray::from_iter_primitive::<Int64Type, _, _>(list2.clone());
925
926        let list3 = [Some(vec![Some(1000), Some(1001)])];
927        let list3_array = ListArray::from_iter_primitive::<Int64Type, _, _>(list3.clone());
928
929        let array_result = concat(&[&list1_array, &list2_array, &list3_array]).unwrap();
930
931        let expected = list1.into_iter().chain(list2).chain(list3);
932        let array_expected = ListArray::from_iter_primitive::<Int64Type, _, _>(expected);
933
934        assert_eq!(array_result.as_ref(), &array_expected as &dyn Array);
935    }
936
937    #[test]
938    fn test_concat_primitive_list_arrays_slices() {
939        let list1 = [
940            Some(vec![Some(-1), Some(-1), Some(2), None, None]),
941            Some(vec![]), // In slice
942            None,         // In slice
943            Some(vec![Some(10)]),
944        ];
945        let list1_array = ListArray::from_iter_primitive::<Int64Type, _, _>(list1.clone());
946        let list1_array = list1_array.slice(1, 2);
947        let list1_values = list1.into_iter().skip(1).take(2);
948
949        let list2 = [
950            None,
951            Some(vec![Some(100), None, Some(101)]),
952            Some(vec![Some(102)]),
953        ];
954        let list2_array = ListArray::from_iter_primitive::<Int64Type, _, _>(list2.clone());
955
956        // verify that this test covers the case when the first offset is non zero
957        assert!(list1_array.offsets()[0].as_usize() > 0);
958        let array_result = concat(&[&list1_array, &list2_array]).unwrap();
959
960        let expected = list1_values.chain(list2);
961        let array_expected = ListArray::from_iter_primitive::<Int64Type, _, _>(expected);
962
963        assert_eq!(array_result.as_ref(), &array_expected as &dyn Array);
964    }
965
966    #[test]
967    fn test_concat_primitive_list_arrays_sliced_lengths() {
968        let list1 = [
969            Some(vec![Some(-1), Some(-1), Some(2), None, None]), // In slice
970            Some(vec![]),                                        // In slice
971            None,                                                // In slice
972            Some(vec![Some(10)]),
973        ];
974        let list1_array = ListArray::from_iter_primitive::<Int64Type, _, _>(list1.clone());
975        let list1_array = list1_array.slice(0, 3); // no offset, but not all values
976        let list1_values = list1.into_iter().take(3);
977
978        let list2 = [
979            None,
980            Some(vec![Some(100), None, Some(101)]),
981            Some(vec![Some(102)]),
982        ];
983        let list2_array = ListArray::from_iter_primitive::<Int64Type, _, _>(list2.clone());
984
985        // verify that this test covers the case when the first offset is zero, but the
986        // last offset doesn't cover the entire array
987        assert_eq!(list1_array.offsets()[0].as_usize(), 0);
988        assert!(list1_array.offsets().last().unwrap().as_usize() < list1_array.values().len());
989        let array_result = concat(&[&list1_array, &list2_array]).unwrap();
990
991        let expected = list1_values.chain(list2);
992        let array_expected = ListArray::from_iter_primitive::<Int64Type, _, _>(expected);
993
994        assert_eq!(array_result.as_ref(), &array_expected as &dyn Array);
995    }
996
997    #[test]
998    fn test_concat_primitive_fixed_size_list_arrays() {
999        let list1 = [
1000            Some(vec![Some(-1), None]),
1001            None,
1002            Some(vec![Some(10), Some(20)]),
1003        ];
1004        let list1_array =
1005            FixedSizeListArray::from_iter_primitive::<Int64Type, _, _>(list1.clone(), 2);
1006
1007        let list2 = [
1008            None,
1009            Some(vec![Some(100), None]),
1010            Some(vec![Some(102), Some(103)]),
1011        ];
1012        let list2_array =
1013            FixedSizeListArray::from_iter_primitive::<Int64Type, _, _>(list2.clone(), 2);
1014
1015        let list3 = [Some(vec![Some(1000), Some(1001)])];
1016        let list3_array =
1017            FixedSizeListArray::from_iter_primitive::<Int64Type, _, _>(list3.clone(), 2);
1018
1019        let array_result = concat(&[&list1_array, &list2_array, &list3_array]).unwrap();
1020
1021        let expected = list1.into_iter().chain(list2).chain(list3);
1022        let array_expected =
1023            FixedSizeListArray::from_iter_primitive::<Int64Type, _, _>(expected, 2);
1024
1025        assert_eq!(array_result.as_ref(), &array_expected as &dyn Array);
1026    }
1027
1028    #[test]
1029    fn test_concat_list_view_arrays() {
1030        let list1 = [
1031            Some(vec![Some(-1), None]),
1032            None,
1033            Some(vec![Some(10), Some(20)]),
1034        ];
1035        let mut list1_array = ListViewBuilder::new(Int64Builder::new());
1036        for v in list1.iter() {
1037            list1_array.append_option(v.clone());
1038        }
1039        let list1_array = list1_array.finish();
1040
1041        let list2 = [
1042            None,
1043            Some(vec![Some(100), None]),
1044            Some(vec![Some(102), Some(103)]),
1045        ];
1046        let mut list2_array = ListViewBuilder::new(Int64Builder::new());
1047        for v in list2.iter() {
1048            list2_array.append_option(v.clone());
1049        }
1050        let list2_array = list2_array.finish();
1051
1052        let list3 = [Some(vec![Some(1000), Some(1001)])];
1053        let mut list3_array = ListViewBuilder::new(Int64Builder::new());
1054        for v in list3.iter() {
1055            list3_array.append_option(v.clone());
1056        }
1057        let list3_array = list3_array.finish();
1058
1059        let array_result = concat(&[&list1_array, &list2_array, &list3_array]).unwrap();
1060
1061        let expected: Vec<_> = list1.into_iter().chain(list2).chain(list3).collect();
1062        let mut array_expected = ListViewBuilder::new(Int64Builder::new());
1063        for v in expected.iter() {
1064            array_expected.append_option(v.clone());
1065        }
1066        let array_expected = array_expected.finish();
1067
1068        assert_eq!(array_result.as_ref(), &array_expected as &dyn Array);
1069    }
1070
1071    #[test]
1072    fn test_concat_sliced_list_view_arrays() {
1073        let list1 = [
1074            Some(vec![Some(-1), None]),
1075            None,
1076            Some(vec![Some(10), Some(20)]),
1077        ];
1078        let mut list1_array = ListViewBuilder::new(Int64Builder::new());
1079        for v in list1.iter() {
1080            list1_array.append_option(v.clone());
1081        }
1082        let list1_array = list1_array.finish();
1083
1084        let list2 = [
1085            None,
1086            Some(vec![Some(100), None]),
1087            Some(vec![Some(102), Some(103)]),
1088        ];
1089        let mut list2_array = ListViewBuilder::new(Int64Builder::new());
1090        for v in list2.iter() {
1091            list2_array.append_option(v.clone());
1092        }
1093        let list2_array = list2_array.finish();
1094
1095        let list3 = [Some(vec![Some(1000), Some(1001)])];
1096        let mut list3_array = ListViewBuilder::new(Int64Builder::new());
1097        for v in list3.iter() {
1098            list3_array.append_option(v.clone());
1099        }
1100        let list3_array = list3_array.finish();
1101
1102        // Concat sliced arrays.
1103        // ListView slicing will slice the offset/sizes but preserve the original values child.
1104        let array_result = concat(&[
1105            &list1_array.slice(1, 2),
1106            &list2_array.slice(1, 2),
1107            &list3_array.slice(0, 1),
1108        ])
1109        .unwrap();
1110
1111        let expected: Vec<_> = vec![
1112            None,
1113            Some(vec![Some(10), Some(20)]),
1114            Some(vec![Some(100), None]),
1115            Some(vec![Some(102), Some(103)]),
1116            Some(vec![Some(1000), Some(1001)]),
1117        ];
1118        let mut array_expected = ListViewBuilder::new(Int64Builder::new());
1119        for v in expected.iter() {
1120            array_expected.append_option(v.clone());
1121        }
1122        let array_expected = array_expected.finish();
1123
1124        assert_eq!(array_result.as_ref(), &array_expected as &dyn Array);
1125    }
1126
1127    #[test]
1128    fn test_concat_struct_arrays() {
1129        let field = Arc::new(Field::new("field", DataType::Int64, true));
1130        let input_primitive_1: ArrayRef = Arc::new(PrimitiveArray::<Int64Type>::from(vec![
1131            Some(-1),
1132            Some(-1),
1133            Some(2),
1134            None,
1135            None,
1136        ]));
1137        let input_struct_1 = StructArray::from(vec![(field.clone(), input_primitive_1)]);
1138
1139        let input_primitive_2: ArrayRef = Arc::new(PrimitiveArray::<Int64Type>::from(vec![
1140            Some(101),
1141            Some(102),
1142            Some(103),
1143            None,
1144        ]));
1145        let input_struct_2 = StructArray::from(vec![(field.clone(), input_primitive_2)]);
1146
1147        let input_primitive_3: ArrayRef = Arc::new(PrimitiveArray::<Int64Type>::from(vec![
1148            Some(256),
1149            Some(512),
1150            Some(1024),
1151        ]));
1152        let input_struct_3 = StructArray::from(vec![(field, input_primitive_3)]);
1153
1154        let arr = concat(&[&input_struct_1, &input_struct_2, &input_struct_3]).unwrap();
1155
1156        let expected_primitive_output = Arc::new(PrimitiveArray::<Int64Type>::from(vec![
1157            Some(-1),
1158            Some(-1),
1159            Some(2),
1160            None,
1161            None,
1162            Some(101),
1163            Some(102),
1164            Some(103),
1165            None,
1166            Some(256),
1167            Some(512),
1168            Some(1024),
1169        ])) as ArrayRef;
1170
1171        let actual_primitive = arr
1172            .as_any()
1173            .downcast_ref::<StructArray>()
1174            .unwrap()
1175            .column(0);
1176        assert_eq!(actual_primitive, &expected_primitive_output);
1177    }
1178
1179    #[test]
1180    fn test_concat_struct_array_slices() {
1181        let field = Arc::new(Field::new("field", DataType::Int64, true));
1182        let input_primitive_1: ArrayRef = Arc::new(PrimitiveArray::<Int64Type>::from(vec![
1183            Some(-1),
1184            Some(-1),
1185            Some(2),
1186            None,
1187            None,
1188        ]));
1189        let input_struct_1 = StructArray::from(vec![(field.clone(), input_primitive_1)]);
1190
1191        let input_primitive_2: ArrayRef = Arc::new(PrimitiveArray::<Int64Type>::from(vec![
1192            Some(101),
1193            Some(102),
1194            Some(103),
1195            None,
1196        ]));
1197        let input_struct_2 = StructArray::from(vec![(field, input_primitive_2)]);
1198
1199        let arr = concat(&[&input_struct_1.slice(1, 3), &input_struct_2.slice(1, 2)]).unwrap();
1200
1201        let expected_primitive_output = Arc::new(PrimitiveArray::<Int64Type>::from(vec![
1202            Some(-1),
1203            Some(2),
1204            None,
1205            Some(102),
1206            Some(103),
1207        ])) as ArrayRef;
1208
1209        let actual_primitive = arr
1210            .as_any()
1211            .downcast_ref::<StructArray>()
1212            .unwrap()
1213            .column(0);
1214        assert_eq!(actual_primitive, &expected_primitive_output);
1215    }
1216
1217    #[test]
1218    fn test_concat_struct_arrays_no_nulls() {
1219        let input_1a = vec![1, 2, 3];
1220        let input_1b = vec!["one", "two", "three"];
1221        let input_2a = vec![4, 5, 6, 7];
1222        let input_2b = vec!["four", "five", "six", "seven"];
1223
1224        let struct_from_primitives = |ints: Vec<i64>, strings: Vec<&str>| {
1225            StructArray::try_from(vec![
1226                ("ints", Arc::new(Int64Array::from(ints)) as _),
1227                ("strings", Arc::new(StringArray::from(strings)) as _),
1228            ])
1229        };
1230
1231        let expected_output = struct_from_primitives(
1232            [input_1a.clone(), input_2a.clone()].concat(),
1233            [input_1b.clone(), input_2b.clone()].concat(),
1234        )
1235        .unwrap();
1236
1237        let input_1 = struct_from_primitives(input_1a, input_1b).unwrap();
1238        let input_2 = struct_from_primitives(input_2a, input_2b).unwrap();
1239
1240        let arr = concat(&[&input_1, &input_2]).unwrap();
1241        let struct_result = arr.as_struct();
1242
1243        assert_eq!(struct_result, &expected_output);
1244        assert_eq!(arr.null_count(), 0);
1245    }
1246
1247    #[test]
1248    fn test_concat_struct_no_fields() {
1249        let input_1 = StructArray::new_empty_fields(10, None);
1250        let input_2 = StructArray::new_empty_fields(10, None);
1251        let arr = concat(&[&input_1, &input_2]).unwrap();
1252
1253        assert_eq!(arr.len(), 20);
1254        assert_eq!(arr.null_count(), 0);
1255
1256        let input1_valid = StructArray::new_empty_fields(10, Some(NullBuffer::new_valid(10)));
1257        let input2_null = StructArray::new_empty_fields(10, Some(NullBuffer::new_null(10)));
1258        let arr = concat(&[&input1_valid, &input2_null]).unwrap();
1259
1260        assert_eq!(arr.len(), 20);
1261        assert_eq!(arr.null_count(), 10);
1262    }
1263
1264    #[test]
1265    fn test_string_array_slices() {
1266        let input_1 = StringArray::from(vec!["hello", "A", "B", "C"]);
1267        let input_2 = StringArray::from(vec!["world", "D", "E", "Z"]);
1268
1269        let arr = concat(&[&input_1.slice(1, 3), &input_2.slice(1, 2)]).unwrap();
1270
1271        let expected_output = StringArray::from(vec!["A", "B", "C", "D", "E"]);
1272
1273        let actual_output = arr.as_any().downcast_ref::<StringArray>().unwrap();
1274        assert_eq!(actual_output, &expected_output);
1275    }
1276
1277    #[test]
1278    fn test_string_array_with_null_slices() {
1279        let input_1 = StringArray::from(vec![Some("hello"), None, Some("A"), Some("C")]);
1280        let input_2 = StringArray::from(vec![None, Some("world"), Some("D"), None]);
1281
1282        let arr = concat(&[&input_1.slice(1, 3), &input_2.slice(1, 2)]).unwrap();
1283
1284        let expected_output =
1285            StringArray::from(vec![None, Some("A"), Some("C"), Some("world"), Some("D")]);
1286
1287        let actual_output = arr.as_any().downcast_ref::<StringArray>().unwrap();
1288        assert_eq!(actual_output, &expected_output);
1289    }
1290
1291    fn collect_string_dictionary(array: &DictionaryArray<Int32Type>) -> Vec<Option<&str>> {
1292        let concrete = array.downcast_dict::<StringArray>().unwrap();
1293        concrete.into_iter().collect()
1294    }
1295
1296    #[test]
1297    fn test_string_dictionary_array() {
1298        let input_1: DictionaryArray<Int32Type> = vec!["hello", "A", "B", "hello", "hello", "C"]
1299            .into_iter()
1300            .collect();
1301        let input_2: DictionaryArray<Int32Type> = vec!["hello", "E", "E", "hello", "F", "E"]
1302            .into_iter()
1303            .collect();
1304
1305        let expected: Vec<_> = vec![
1306            "hello", "A", "B", "hello", "hello", "C", "hello", "E", "E", "hello", "F", "E",
1307        ]
1308        .into_iter()
1309        .map(Some)
1310        .collect();
1311
1312        let concat = concat(&[&input_1 as _, &input_2 as _]).unwrap();
1313        let dictionary = concat.as_dictionary::<Int32Type>();
1314        let actual = collect_string_dictionary(dictionary);
1315        assert_eq!(actual, expected);
1316
1317        // Should have concatenated inputs together
1318        assert_eq!(
1319            dictionary.values().len(),
1320            input_1.values().len() + input_2.values().len(),
1321        )
1322    }
1323
1324    #[test]
1325    fn test_string_dictionary_array_nulls() {
1326        let input_1: DictionaryArray<Int32Type> = vec![Some("foo"), Some("bar"), None, Some("fiz")]
1327            .into_iter()
1328            .collect();
1329        let input_2: DictionaryArray<Int32Type> = vec![None].into_iter().collect();
1330        let expected = vec![Some("foo"), Some("bar"), None, Some("fiz"), None];
1331
1332        let concat = concat(&[&input_1 as _, &input_2 as _]).unwrap();
1333        let dictionary = concat.as_dictionary::<Int32Type>();
1334        let actual = collect_string_dictionary(dictionary);
1335        assert_eq!(actual, expected);
1336
1337        // Should have concatenated inputs together
1338        assert_eq!(
1339            dictionary.values().len(),
1340            input_1.values().len() + input_2.values().len(),
1341        )
1342    }
1343
1344    #[test]
1345    fn test_string_dictionary_array_nulls_in_values() {
1346        let input_1_keys = Int32Array::from_iter_values([0, 2, 1, 3]);
1347        let input_1_values = StringArray::from(vec![Some("foo"), None, Some("bar"), Some("fiz")]);
1348        let input_1 = DictionaryArray::new(input_1_keys, Arc::new(input_1_values));
1349
1350        let input_2_keys = Int32Array::from_iter_values([0]);
1351        let input_2_values = StringArray::from(vec![None, Some("hello")]);
1352        let input_2 = DictionaryArray::new(input_2_keys, Arc::new(input_2_values));
1353
1354        let expected = vec![Some("foo"), Some("bar"), None, Some("fiz"), None];
1355
1356        let concat = concat(&[&input_1 as _, &input_2 as _]).unwrap();
1357        let dictionary = concat.as_dictionary::<Int32Type>();
1358        let actual = collect_string_dictionary(dictionary);
1359        assert_eq!(actual, expected);
1360    }
1361
1362    #[test]
1363    fn test_string_dictionary_merge() {
1364        let mut builder = StringDictionaryBuilder::<Int32Type>::new();
1365        for i in 0..20 {
1366            builder.append(i.to_string()).unwrap();
1367        }
1368        let input_1 = builder.finish();
1369
1370        let mut builder = StringDictionaryBuilder::<Int32Type>::new();
1371        for i in 0..30 {
1372            builder.append(i.to_string()).unwrap();
1373        }
1374        let input_2 = builder.finish();
1375
1376        let expected: Vec<_> = (0..20).chain(0..30).map(|x| x.to_string()).collect();
1377        let expected: Vec<_> = expected.iter().map(|x| Some(x.as_str())).collect();
1378
1379        let concat = concat(&[&input_1 as _, &input_2 as _]).unwrap();
1380        let dictionary = concat.as_dictionary::<Int32Type>();
1381        let actual = collect_string_dictionary(dictionary);
1382        assert_eq!(actual, expected);
1383
1384        // Should have merged inputs together
1385        // Not 30 as this is done on a best-effort basis
1386        let values_len = dictionary.values().len();
1387        assert!((30..40).contains(&values_len), "{values_len}")
1388    }
1389
1390    #[test]
1391    fn test_primitive_dictionary_merge() {
1392        // Same value repeated 5 times.
1393        let keys = vec![1; 5];
1394        let values = (10..20).collect::<Vec<_>>();
1395        let dict = DictionaryArray::new(
1396            Int8Array::from(keys.clone()),
1397            Arc::new(Int32Array::from(values.clone())),
1398        );
1399        let other = DictionaryArray::new(
1400            Int8Array::from(keys.clone()),
1401            Arc::new(Int32Array::from(values.clone())),
1402        );
1403
1404        let result_same_dictionary = concat(&[&dict, &dict]).unwrap();
1405        // Verify pointer equality check succeeds, and therefore the
1406        // dictionaries are not merged. A single values buffer should be reused
1407        // in this case.
1408        assert!(
1409            dict.values().to_data().ptr_eq(
1410                &result_same_dictionary
1411                    .as_dictionary::<Int8Type>()
1412                    .values()
1413                    .to_data()
1414            )
1415        );
1416        assert_eq!(
1417            result_same_dictionary
1418                .as_dictionary::<Int8Type>()
1419                .values()
1420                .len(),
1421            values.len(),
1422        );
1423
1424        let result_cloned_dictionary = concat(&[&dict, &other]).unwrap();
1425        // Should have only 1 underlying value since all keys reference it.
1426        assert_eq!(
1427            result_cloned_dictionary
1428                .as_dictionary::<Int8Type>()
1429                .values()
1430                .len(),
1431            1
1432        );
1433    }
1434
1435    #[test]
1436    fn test_concat_string_sizes() {
1437        let a: LargeStringArray = ((0..150).map(|_| Some("foo"))).collect();
1438        let b: LargeStringArray = ((0..150).map(|_| Some("foo"))).collect();
1439        let c = LargeStringArray::from(vec![Some("foo"), Some("bar"), None, Some("baz")]);
1440        // 150 * 3 = 450
1441        // 150 * 3 = 450
1442        // 3 * 3   = 9
1443        // ------------+
1444        // 909
1445
1446        let arr = concat(&[&a, &b, &c]).unwrap();
1447        assert_eq!(arr.to_data().buffers()[1].capacity(), 909);
1448    }
1449
1450    #[test]
1451    fn test_dictionary_concat_reuse() {
1452        let array: DictionaryArray<Int8Type> = vec!["a", "a", "b", "c"].into_iter().collect();
1453        let copy: DictionaryArray<Int8Type> = array.clone();
1454
1455        // dictionary is "a", "b", "c"
1456        assert_eq!(
1457            array.values(),
1458            &(Arc::new(StringArray::from(vec!["a", "b", "c"])) as ArrayRef)
1459        );
1460        assert_eq!(array.keys(), &Int8Array::from(vec![0, 0, 1, 2]));
1461
1462        // concatenate it with itself
1463        let combined = concat(&[&copy as _, &array as _]).unwrap();
1464        let combined = combined.as_dictionary::<Int8Type>();
1465
1466        assert_eq!(
1467            combined.values(),
1468            &(Arc::new(StringArray::from(vec!["a", "b", "c"])) as ArrayRef),
1469            "Actual: {combined:#?}"
1470        );
1471
1472        assert_eq!(
1473            combined.keys(),
1474            &Int8Array::from(vec![0, 0, 1, 2, 0, 0, 1, 2])
1475        );
1476
1477        // Should have reused the dictionary
1478        assert!(
1479            array
1480                .values()
1481                .to_data()
1482                .ptr_eq(&combined.values().to_data())
1483        );
1484        assert!(copy.values().to_data().ptr_eq(&combined.values().to_data()));
1485
1486        let new: DictionaryArray<Int8Type> = vec!["d"].into_iter().collect();
1487        let combined = concat(&[&copy as _, &array as _, &new as _]).unwrap();
1488        let com = combined.as_dictionary::<Int8Type>();
1489
1490        // Should not have reused the dictionary
1491        assert!(!array.values().to_data().ptr_eq(&com.values().to_data()));
1492        assert!(!copy.values().to_data().ptr_eq(&com.values().to_data()));
1493        assert!(!new.values().to_data().ptr_eq(&com.values().to_data()));
1494    }
1495
1496    #[test]
1497    fn concat_record_batches() {
1498        let schema = Arc::new(Schema::new(vec![
1499            Field::new("a", DataType::Int32, false),
1500            Field::new("b", DataType::Utf8, false),
1501        ]));
1502        let batch1 = RecordBatch::try_new(
1503            schema.clone(),
1504            vec![
1505                Arc::new(Int32Array::from(vec![1, 2])),
1506                Arc::new(StringArray::from(vec!["a", "b"])),
1507            ],
1508        )
1509        .unwrap();
1510        let batch2 = RecordBatch::try_new(
1511            schema.clone(),
1512            vec![
1513                Arc::new(Int32Array::from(vec![3, 4])),
1514                Arc::new(StringArray::from(vec!["c", "d"])),
1515            ],
1516        )
1517        .unwrap();
1518        let new_batch = concat_batches(&schema, [&batch1, &batch2]).unwrap();
1519        assert_eq!(new_batch.schema().as_ref(), schema.as_ref());
1520        assert_eq!(2, new_batch.num_columns());
1521        assert_eq!(4, new_batch.num_rows());
1522        let new_batch_owned = concat_batches(&schema, &[batch1, batch2]).unwrap();
1523        assert_eq!(new_batch_owned.schema().as_ref(), schema.as_ref());
1524        assert_eq!(2, new_batch_owned.num_columns());
1525        assert_eq!(4, new_batch_owned.num_rows());
1526    }
1527
1528    #[test]
1529    fn concat_empty_record_batch() {
1530        let schema = Arc::new(Schema::new(vec![
1531            Field::new("a", DataType::Int32, false),
1532            Field::new("b", DataType::Utf8, false),
1533        ]));
1534        let batch = concat_batches(&schema, []).unwrap();
1535        assert_eq!(batch.schema().as_ref(), schema.as_ref());
1536        assert_eq!(0, batch.num_rows());
1537    }
1538
1539    #[test]
1540    fn concat_record_batches_of_different_schemas_but_compatible_data() {
1541        let schema1 = Arc::new(Schema::new(vec![Field::new("a", DataType::Int32, false)]));
1542        // column names differ
1543        let schema2 = Arc::new(Schema::new(vec![Field::new("c", DataType::Int32, false)]));
1544        let batch1 = RecordBatch::try_new(
1545            schema1.clone(),
1546            vec![Arc::new(Int32Array::from(vec![1, 2]))],
1547        )
1548        .unwrap();
1549        let batch2 =
1550            RecordBatch::try_new(schema2, vec![Arc::new(Int32Array::from(vec![3, 4]))]).unwrap();
1551        // concat_batches simply uses the schema provided
1552        let batch = concat_batches(&schema1, [&batch1, &batch2]).unwrap();
1553        assert_eq!(batch.schema().as_ref(), schema1.as_ref());
1554        assert_eq!(4, batch.num_rows());
1555    }
1556
1557    #[test]
1558    fn concat_record_batches_of_different_schemas_incompatible_data() {
1559        let schema1 = Arc::new(Schema::new(vec![Field::new("a", DataType::Int32, false)]));
1560        // column names differ
1561        let schema2 = Arc::new(Schema::new(vec![Field::new("a", DataType::Utf8, false)]));
1562        let batch1 = RecordBatch::try_new(
1563            schema1.clone(),
1564            vec![Arc::new(Int32Array::from(vec![1, 2]))],
1565        )
1566        .unwrap();
1567        let batch2 = RecordBatch::try_new(
1568            schema2,
1569            vec![Arc::new(StringArray::from(vec!["foo", "bar"]))],
1570        )
1571        .unwrap();
1572
1573        let error = concat_batches(&schema1, [&batch1, &batch2]).unwrap_err();
1574        assert_eq!(
1575            error.to_string(),
1576            "Invalid argument error: It is not possible to concatenate arrays of different data types (Int32, Utf8)."
1577        );
1578    }
1579
// Checks that every buffer of a concatenated array has a capacity equal to its
// length, for primitive, string and large-binary inputs (whole and sliced).
#[test]
fn concat_capacity() {
    let ints = Int32Array::from_iter_values(0..100);
    let more_ints = Int32Array::from_iter_values(10..20);

    let joined_ints = concat(&[&ints, &more_ints]).unwrap();
    let data = joined_ints.to_data();
    // (100 + 10) * size_of<i32>()
    let values = &data.buffers()[0];
    assert_eq!(values.len(), 440);
    assert_eq!(values.capacity(), 440);

    let joined_ints = concat(&[&joined_ints.slice(10, 20), &more_ints]).unwrap();
    let data = joined_ints.to_data();
    // (20 + 10) * size_of<i32>()
    let values = &data.buffers()[0];
    assert_eq!(values.len(), 120);
    assert_eq!(values.capacity(), 120);

    let strings = StringArray::from_iter_values(std::iter::repeat_n("foo", 100));
    let more_strings = StringArray::from(vec!["bingo", "bongo", "lorem", ""]);

    let joined_strings = concat(&[&strings, &more_strings]).unwrap();
    let data = joined_strings.to_data();
    // buffer 0: (100 + 4 + 1) * size_of<i32>()
    // buffer 1: len("foo") * 100 + len("bingo") + len("bongo") + len("lorem")
    for (index, expected_bytes) in [(0, 420), (1, 315)] {
        assert_eq!(data.buffers()[index].len(), expected_bytes);
        assert_eq!(data.buffers()[index].capacity(), expected_bytes);
    }

    let joined_strings = concat(&[&joined_strings.slice(10, 40), &more_strings]).unwrap();
    let data = joined_strings.to_data();
    // buffer 0: (40 + 4 + 1) * size_of<i32>()
    // buffer 1: len("foo") * 40 + len("bingo") + len("bongo") + len("lorem")
    for (index, expected_bytes) in [(0, 180), (1, 135)] {
        assert_eq!(data.buffers()[index].len(), expected_bytes);
        assert_eq!(data.buffers()[index].capacity(), expected_bytes);
    }

    let binaries = LargeBinaryArray::from_iter_values(std::iter::repeat_n(b"foo", 100));
    let more_binaries = LargeBinaryArray::from_iter_values(std::iter::repeat_n(b"cupcakes", 10));

    let joined_binaries = concat(&[&binaries, &more_binaries]).unwrap();
    let data = joined_binaries.to_data();
    // buffer 0: (100 + 10 + 1) * size_of<i64>()
    // buffer 1: len("foo") * 100 + len("cupcakes") * 10
    for (index, expected_bytes) in [(0, 888), (1, 380)] {
        assert_eq!(data.buffers()[index].len(), expected_bytes);
        assert_eq!(data.buffers()[index].capacity(), expected_bytes);
    }

    let joined_binaries = concat(&[&joined_binaries.slice(10, 40), &more_binaries]).unwrap();
    let data = joined_binaries.to_data();
    // buffer 0: (40 + 10 + 1) * size_of<i64>()
    // buffer 1: len("foo") * 40 + len("cupcakes") * 10
    for (index, expected_bytes) in [(0, 408), (1, 200)] {
        assert_eq!(data.buffers()[index].len(), expected_bytes);
        assert_eq!(data.buffers()[index].capacity(), expected_bytes);
    }
}
1640
// Concatenates a fully valid dictionary with one whose keys are all null and
// checks that both null counts of the result report the ten null rows.
#[test]
fn concat_sparse_nulls() {
    // Ten valid keys, all pointing at index 1 of a 100-entry value list.
    let populated = DictionaryArray::new(
        Int32Array::from(vec![1; 10]),
        Arc::new(StringArray::from_iter_values(
            (0..100).map(|x| x.to_string()),
        )),
    );

    // Ten null keys over an empty value list.
    let all_null = DictionaryArray::new(
        Int32Array::new_null(10),
        Arc::new(StringArray::new_null(0)),
    );

    let combined = concat(&[&populated, &all_null]).unwrap();
    assert_eq!(combined.null_count(), 10);
    assert_eq!(combined.logical_null_count(), 10);
}
1653
// Concatenates three single-row lists of dictionary-encoded strings and checks
// both the resulting rows and that the merged dictionary holds no duplicates.
#[test]
fn concat_dictionary_list_array_simple() {
    let single_rows = ["a", "a", "b"].map(|item| create_single_row_list_of_dict(vec![Some(item)]));

    let inputs: Vec<&dyn Array> = single_rows.iter().map(|row| row as &dyn Array).collect();
    let concatenated = concat(&inputs).unwrap();

    // One expected row per input list, in input order.
    let expected_list = create_list_of_dict(vec![
        Some(vec![Some("a")]),
        Some(vec![Some("a")]),
        Some(vec![Some("b")]),
    ]);

    let actual_list = concatenated.as_list::<i32>();

    // Compare the lists row by row.
    for (actual_row, expected_row) in actual_list.iter().zip(expected_list.iter()) {
        assert_eq!(actual_row, expected_row);
    }

    assert_dictionary_has_unique_values::<_, StringArray>(
        actual_list.values().as_dictionary::<Int32Type>(),
    );
}
1683
// Concatenates 80000 single-row lists that cycle through eight distinct
// dictionary values, then checks the rows and the uniqueness of the merged
// dictionary values.
#[test]
fn concat_many_dictionary_list_arrays() {
    let number_of_unique_values = 8;
    let row_value = |i| (i % number_of_unique_values).to_string();

    let single_rows: Vec<_> = (0..80000)
        .map(|i| create_single_row_list_of_dict(vec![Some(row_value(i))]))
        .collect();

    let inputs: Vec<&dyn Array> = single_rows.iter().map(|row| row as &dyn Array).collect();
    let concatenated = concat(&inputs).unwrap();

    let expected_rows: Vec<_> = (0..80000)
        .map(|i| Some(vec![Some(row_value(i))]))
        .collect();
    let expected_list = create_list_of_dict(expected_rows);

    let actual_list = concatenated.as_list::<i32>();

    // Compare the lists row by row.
    for (actual_row, expected_row) in actual_list.iter().zip(expected_list.iter()) {
        assert_eq!(actual_row, expected_row);
    }

    assert_dictionary_has_unique_values::<_, StringArray>(
        actual_list.values().as_dictionary::<Int32Type>(),
    );
}
1715
1716    fn create_single_row_list_of_dict(
1717        list_items: Vec<Option<impl AsRef<str>>>,
1718    ) -> GenericListArray<i32> {
1719        let rows = list_items.into_iter().map(Some).collect();
1720
1721        create_list_of_dict(vec![rows])
1722    }
1723
1724    fn create_list_of_dict(
1725        rows: Vec<Option<Vec<Option<impl AsRef<str>>>>>,
1726    ) -> GenericListArray<i32> {
1727        let mut builder =
1728            GenericListBuilder::<i32, _>::new(StringDictionaryBuilder::<Int32Type>::new());
1729
1730        for row in rows {
1731            builder.append_option(row);
1732        }
1733
1734        builder.finish()
1735    }
1736
1737    fn assert_dictionary_has_unique_values<'a, K, V>(array: &'a DictionaryArray<K>)
1738    where
1739        K: ArrowDictionaryKeyType,
1740        V: Sync + Send + 'static,
1741        &'a V: ArrayAccessor + IntoIterator,
1742        <&'a V as ArrayAccessor>::Item: Default + Clone + PartialEq + Debug + Ord,
1743        <&'a V as IntoIterator>::Item: Clone + PartialEq + Debug + Ord,
1744    {
1745        let dict = array.downcast_dict::<V>().unwrap();
1746        let mut values = dict.values().into_iter().collect::<Vec<_>>();
1747
1748        // remove duplicates must be sorted first so we can compare
1749        values.sort();
1750
1751        let mut unique_values = values.clone();
1752
1753        unique_values.dedup();
1754
1755        assert_eq!(
1756            values, unique_values,
1757            "There are duplicates in the value list (the value list here is sorted which is only for the assertion)"
1758        );
1759    }
1760
// Simplest case: two unsliced run arrays are concatenated and the run ends of
// the second are shifted by the length of the first.
#[test]
fn test_concat_run_array() {
    // Logical contents: [10, 10, 20, 20]
    let first = RunArray::try_new(
        &Int32Array::from(vec![2, 4]),
        &Int32Array::from(vec![10, 20]),
    )
    .unwrap();

    // Logical contents: [30, 40, 40, 40]
    let second = RunArray::try_new(
        &Int32Array::from(vec![1, 4]),
        &Int32Array::from(vec![30, 40]),
    )
    .unwrap();

    let concatenated = concat(&[&first, &second]).unwrap();
    let run_array = concatenated.as_run::<Int32Type>();

    // 4 + 4 logical elements
    assert_eq!(run_array.len(), 8);

    // The second array's run ends are offset by 4.
    let run_ends = run_array.run_ends().values();
    assert_eq!(run_ends.len(), 4);
    assert_eq!(&[2, 4, 5, 8], run_ends);

    // One value per run, in input order.
    let values = run_array.values().as_primitive::<Int32Type>();
    assert_eq!(values.len(), 4);
    assert_eq!(&[10, 20, 30, 40], values.values());
}
1794
// Both inputs are sliced so that their first run is cut away; the result must
// contain only the logical elements that survive the slicing.
#[test]
fn test_concat_sliced_run_array() {
    // [10, 10, 20, 20] sliced to [20, 20]
    let first = RunArray::try_new(
        &Int32Array::from(vec![2, 4]),
        &Int32Array::from(vec![10, 20]),
    )
    .unwrap()
    .slice(2, 2);

    // [30, 40, 40, 40] sliced to [40, 40, 40]
    let second = RunArray::try_new(
        &Int32Array::from(vec![1, 4]),
        &Int32Array::from(vec![30, 40]),
    )
    .unwrap()
    .slice(1, 3);

    let concatenated = concat(&[&first, &second]).unwrap();
    let typed_runs = concatenated
        .as_run::<Int32Type>()
        .downcast::<Int32Array>()
        .unwrap();

    // Expand the runs back into their logical elements for comparison.
    let logical_values: Vec<_> = typed_runs.into_iter().flatten().collect();
    assert_eq!(vec![20, 20, 40, 40, 40], logical_values);
}
1816
// The last run of the first input and the first run of the second input hold
// the same value (30); the expected output keeps them as two separate runs.
#[test]
fn test_concat_run_array_matching_first_last_value() {
    // Run ends [2, 4, 7] with values [10, 20, 30]
    let first = RunArray::try_new(
        &Int32Array::from(vec![2, 4, 7]),
        &Int32Array::from(vec![10, 20, 30]),
    )
    .unwrap();

    // Run ends [3, 5] with values [30, 40]
    let second = RunArray::try_new(
        &Int32Array::from(vec![3, 5]),
        &Int32Array::from(vec![30, 40]),
    )
    .unwrap();

    let concatenated = concat(&[&first, &second]).unwrap();
    let run_array = concatenated.as_run::<Int32Type>();

    // 7 + 5 logical elements
    assert_eq!(run_array.len(), 12);

    // Run ends of the second input are shifted by 7.
    let run_ends = run_array.run_ends().values();
    assert_eq!(&[2, 4, 7, 10, 12], run_ends);

    // The two adjacent runs of 30 both appear in the values.
    let values = run_array.values().as_primitive::<Int32Type>();
    assert_eq!(&[10, 20, 30, 30, 40], values.values());
}
1851
// Concatenates two run arrays whose values contain nulls and checks the run
// ends, the value buffer and the exact positions of the null runs.
#[test]
fn test_concat_run_array_with_nulls() {
    // Run ends [2, 4, 7] with values [10, null, 30]
    let values1 = Int32Array::from(vec![Some(10), None, Some(30)]);
    let run_ends1 = Int32Array::from(vec![2, 4, 7]);
    let array1 = RunArray::try_new(&run_ends1, &values1).unwrap();

    // Run ends [3, 5] with values [30, null]
    let values2 = Int32Array::from(vec![Some(30), None]);
    let run_ends2 = Int32Array::from(vec![3, 5]);
    let array2 = RunArray::try_new(&run_ends2, &values2).unwrap();

    // Concatenate the two arrays
    let result = concat(&[&array1, &array2]).unwrap();
    let result_run_array: &arrow_array::RunArray<Int32Type> = result.as_run();

    // The result should have length 12 (7 + 5)
    assert_eq!(result_run_array.len(), 12);

    // Run ends of the second input are shifted by the length of the first.
    let run_ends_values = result_run_array.run_ends().values();
    assert_eq!(&[2, 4, 7, 10, 12], run_ends_values);

    // Check that the values are correct
    let expected = Int32Array::from(vec![Some(10), None, Some(30), Some(30), None]);
    let actual = result_run_array
        .values()
        .as_any()
        .downcast_ref::<Int32Array>()
        .unwrap();
    assert_eq!(actual.len(), expected.len());
    assert_eq!(actual.null_count(), expected.null_count());
    assert_eq!(actual.values(), expected.values());

    // The checks above only pin the number of nulls and the raw value buffer;
    // the null-aware array comparison also pins which runs are null.
    assert_eq!(actual, &expected);
}
1892
// Concatenating a single run array must leave its length, run ends and values
// unchanged.
#[test]
fn test_concat_run_array_single() {
    // Run ends [2, 4] with values [10, 20]
    let only = RunArray::try_new(
        &Int32Array::from(vec![2, 4]),
        &Int32Array::from(vec![10, 20]),
    )
    .unwrap();

    let concatenated = concat(&[&only]).unwrap();
    let run_array = concatenated.as_run::<Int32Type>();

    assert_eq!(run_array.len(), 4);

    let run_ends = run_array.run_ends().values();
    assert_eq!(&[2, 4], run_ends);

    let values = run_array.values().as_primitive::<Int32Type>();
    assert_eq!(&[10, 20], values.values());
}
1922
// Concatenates three run arrays of four logical elements each; run ends of the
// later inputs are shifted by the combined length of the earlier ones.
#[test]
fn test_concat_run_array_with_3_arrays() {
    // (run ends, values) for each input
    let [array1, array2, array3] = [
        (vec![2, 4], vec![10, 20]),
        (vec![1, 4], vec![30, 40]),
        (vec![1, 4], vec![50, 60]),
    ]
    .map(|(run_ends, values)| {
        RunArray::try_new(&Int32Array::from(run_ends), &Int32Array::from(values)).unwrap()
    });

    let concatenated = concat(&[&array1, &array2, &array3]).unwrap();
    let run_array = concatenated.as_run::<Int32Type>();

    // 4 + 4 + 4 logical elements
    assert_eq!(run_array.len(), 12);

    let run_ends = run_array.run_ends().values();
    assert_eq!(run_ends.len(), 6);
    assert_eq!(&[2, 4, 5, 8, 9, 12], run_ends);

    let values = run_array.values().as_primitive::<Int32Type>();
    assert_eq!(values.len(), 6);
    assert_eq!(&[10, 20, 30, 40, 50, 60], values.values());
}
1956
// The first input is sliced in the middle of its last run, so that run must be
// shortened in the output before the second input's runs are appended.
#[test]
fn test_concat_run_array_with_truncated_run() {
    // Run ends [2, 5] with values [10, 20] => [10, 10, 20, 20, 20];
    // keeping only the first three elements leaves [10, 10, 20].
    let truncated = RunArray::try_new(
        &Int32Array::from(vec![2, 5]),
        &Int32Array::from(vec![10, 20]),
    )
    .unwrap()
    .slice(0, 3);

    // Logical contents: [30, 30]
    let tail = RunArray::try_new(&Int32Array::from(vec![2]), &Int32Array::from(vec![30])).unwrap();

    let concatenated = concat(&[&truncated, &tail]).unwrap();
    let run_array = concatenated.as_run::<Int32Type>();

    // Expected logical contents: [10, 10, 20, 30, 30]
    assert_eq!(run_array.len(), 5);

    let values = run_array.values().as_primitive::<Int32Type>();
    assert_eq!(values.values(), &[10, 20, 30]);

    // The truncated run now ends at 3 instead of 5.
    let run_ends = run_array.run_ends().values();
    assert_eq!(&[2, 3, 5], run_ends);
}
1981
1982    /// A single row of a {String -> Int32} map: `None` for a null row, otherwise
1983    /// the list of (key, optional value) entries.
1984    type StringIntMapRow<'a> = Option<Vec<(&'a str, Option<i32>)>>;
1985
1986    /// Helper to build a MapArray of {String -> Int32} from a list of entries per row.
1987    fn build_string_int_map(rows: Vec<StringIntMapRow>) -> MapArray {
1988        let mut builder = MapBuilder::new(None, StringBuilder::new(), Int32ArrayBuilder::new());
1989        for row in rows {
1990            match row {
1991                Some(entries) => {
1992                    for (k, v) in entries {
1993                        builder.keys().append_value(k);
1994                        builder.values().append_option(v);
1995                    }
1996                    builder.append(true).unwrap();
1997                }
1998                None => {
1999                    builder.append(false).unwrap();
2000                }
2001            }
2002        }
2003        builder.finish()
2004    }
2005
// Concatenates two map arrays (the second containing a null row) and checks
// the length, null count, offsets, keys and values of the result.
#[test]
fn test_concat_map_arrays() {
    let first = build_string_int_map(vec![
        Some(vec![("a", Some(1)), ("b", Some(2))]),
        Some(vec![("c", Some(3))]),
    ]);
    let second = build_string_int_map(vec![
        Some(vec![("d", Some(4)), ("e", Some(5))]),
        None,
        Some(vec![("f", Some(6))]),
    ]);

    let concatenated = concat(&[&first, &second]).unwrap();
    let merged = concatenated.as_map();

    // 2 + 3 rows, of which one is null.
    assert_eq!(merged.len(), 5);
    assert_eq!(merged.null_count(), 1);

    // The null row holds no entries, hence the repeated offset 5.
    assert_eq!(merged.value_offsets(), &[0, 2, 3, 5, 5, 6]);

    // Keys appear in input order.
    let actual_keys: Vec<&str> = merged
        .keys()
        .as_string::<i32>()
        .iter()
        .map(Option::unwrap)
        .collect();
    let expected_keys: Vec<&str> = vec!["a", "b", "c", "d", "e", "f"];
    assert_eq!(actual_keys, expected_keys);

    // Values appear in input order.
    let merged_values = merged.values().as_primitive::<Int32Type>();
    assert_eq!(merged_values.values(), &[1, 2, 3, 4, 5, 6]);
}
2037
// Concatenates a sliced map array with an unsliced one; offsets and keys of
// the result must only reflect the rows inside the slice.
#[test]
fn test_concat_map_arrays_sliced() {
    let four_rows = build_string_int_map(vec![
        Some(vec![("a", Some(1))]),
        Some(vec![("b", Some(2)), ("c", Some(3))]),
        Some(vec![("d", Some(4))]),
        Some(vec![("e", Some(5))]),
    ]);

    // Keep the middle two rows: [("b", 2), ("c", 3)] and [("d", 4)]
    let middle_rows = four_rows.slice(1, 2);

    let extra_row = build_string_int_map(vec![Some(vec![("f", Some(6))])]);

    let concatenated = concat(&[&middle_rows, &extra_row]).unwrap();
    let merged = concatenated.as_map();

    assert_eq!(merged.len(), 3);
    assert_eq!(merged.value_offsets(), &[0, 2, 3, 4]);

    let actual_keys: Vec<&str> = merged
        .keys()
        .as_string::<i32>()
        .iter()
        .map(Option::unwrap)
        .collect();
    assert_eq!(actual_keys, vec!["b", "c", "d", "f"]);
}
2062
// Concatenates two map arrays that each contain a null row and checks that the
// validity of every row is carried over to the result.
#[test]
fn test_concat_map_arrays_with_nulls() {
    let valid_then_null = build_string_int_map(vec![Some(vec![("a", Some(1))]), None]);
    let null_then_valid = build_string_int_map(vec![None, Some(vec![("b", Some(2))])]);

    let concatenated = concat(&[&valid_then_null, &null_then_valid]).unwrap();
    let merged = concatenated.as_map();

    assert_eq!(merged.len(), 4);
    assert_eq!(merged.null_count(), 2);

    // Rows 0 and 3 are valid; rows 1 and 2 (the two inner rows) are null.
    assert!(merged.is_valid(0));
    for null_row in 1..=2 {
        assert!(merged.is_null(null_row));
    }
    assert!(merged.is_valid(3));
}
2078
// Concatenates map arrays containing empty (but non-null) rows; empty rows
// must stay valid and show up as repeated offsets.
#[test]
fn test_concat_map_arrays_empty_maps() {
    let first = build_string_int_map(vec![Some(vec![]), Some(vec![("a", Some(1))])]);
    let second = build_string_int_map(vec![
        Some(vec![]),
        Some(vec![("b", Some(2)), ("c", Some(3))]),
    ]);

    let concatenated = concat(&[&first, &second]).unwrap();
    let merged = concatenated.as_map();

    // All four rows are valid, two of them without entries.
    assert_eq!(merged.len(), 4);
    assert_eq!(merged.null_count(), 0);
    assert_eq!(merged.value_offsets(), &[0, 0, 1, 1, 3]);
}
2094}