1use crate::dictionary::{merge_dictionary_values, should_merge_dictionary_values};
34use arrow_array::builder::{BooleanBuilder, GenericByteBuilder, PrimitiveBuilder};
35use arrow_array::cast::AsArray;
36use arrow_array::types::*;
37use arrow_array::*;
38use arrow_buffer::{ArrowNativeType, BooleanBufferBuilder, NullBuffer, OffsetBuffer};
39use arrow_data::transform::{Capacities, MutableArrayData};
40use arrow_schema::{ArrowError, DataType, FieldRef, SchemaRef};
41use std::{collections::HashSet, sync::Arc};
42
43fn binary_capacity<T: ByteArrayType>(arrays: &[&dyn Array]) -> Capacities {
44 let mut item_capacity = 0;
45 let mut bytes_capacity = 0;
46 for array in arrays {
47 let a = array.as_bytes::<T>();
48
49 let offsets = a.value_offsets();
51 bytes_capacity += offsets[offsets.len() - 1].as_usize() - offsets[0].as_usize();
52 item_capacity += a.len()
53 }
54
55 Capacities::Binary(item_capacity, Some(bytes_capacity))
56}
57
58fn fixed_size_list_capacity(arrays: &[&dyn Array], data_type: &DataType) -> Capacities {
59 if let DataType::FixedSizeList(f, _) = data_type {
60 let item_capacity = arrays.iter().map(|a| a.len()).sum();
61 let child_data_type = f.data_type();
62 match child_data_type {
63 DataType::Utf8
66 | DataType::LargeUtf8
67 | DataType::Binary
68 | DataType::LargeBinary
69 | DataType::FixedSizeList(_, _) => {
70 let values: Vec<&dyn arrow_array::Array> = arrays
71 .iter()
72 .map(|a| a.as_fixed_size_list().values().as_ref())
73 .collect();
74 Capacities::List(
75 item_capacity,
76 Some(Box::new(get_capacity(&values, child_data_type))),
77 )
78 }
79 _ => Capacities::Array(item_capacity),
80 }
81 } else {
82 unreachable!("illegal data type for fixed size list")
83 }
84}
85
86fn concat_dictionaries<K: ArrowDictionaryKeyType>(
87 arrays: &[&dyn Array],
88) -> Result<ArrayRef, ArrowError> {
89 let mut output_len = 0;
90 let dictionaries: Vec<_> = arrays
91 .iter()
92 .map(|x| x.as_dictionary::<K>())
93 .inspect(|d| output_len += d.len())
94 .collect();
95
96 if !should_merge_dictionary_values::<K>(&dictionaries, output_len) {
97 return concat_fallback(arrays, Capacities::Array(output_len));
98 }
99
100 let merged = merge_dictionary_values(&dictionaries, None)?;
101
102 let mut key_values = Vec::with_capacity(output_len);
104
105 let mut has_nulls = false;
106 for (d, mapping) in dictionaries.iter().zip(merged.key_mappings) {
107 has_nulls |= d.null_count() != 0;
108 for key in d.keys().values() {
109 key_values.push(mapping.get(key.as_usize()).copied().unwrap_or_default())
111 }
112 }
113
114 let nulls = has_nulls.then(|| {
115 let mut nulls = BooleanBufferBuilder::new(output_len);
116 for d in &dictionaries {
117 match d.nulls() {
118 Some(n) => nulls.append_buffer(n.inner()),
119 None => nulls.append_n(d.len(), true),
120 }
121 }
122 NullBuffer::new(nulls.finish())
123 });
124
125 let keys = PrimitiveArray::<K>::new(key_values.into(), nulls);
126 assert_eq!(keys.len(), output_len);
128
129 let array = unsafe { DictionaryArray::new_unchecked(keys, merged.values) };
130 Ok(Arc::new(array))
131}
132
133fn concat_lists<OffsetSize: OffsetSizeTrait>(
134 arrays: &[&dyn Array],
135 field: &FieldRef,
136) -> Result<ArrayRef, ArrowError> {
137 let mut output_len = 0;
138 let mut list_has_nulls = false;
139 let mut list_has_slices = false;
140
141 let lists = arrays
142 .iter()
143 .map(|x| x.as_list::<OffsetSize>())
144 .inspect(|l| {
145 output_len += l.len();
146 list_has_nulls |= l.null_count() != 0;
147 list_has_slices |= l.offsets()[0] > OffsetSize::zero()
148 || l.offsets().last().unwrap().as_usize() < l.values().len();
149 })
150 .collect::<Vec<_>>();
151
152 let lists_nulls = list_has_nulls.then(|| {
153 let mut nulls = BooleanBufferBuilder::new(output_len);
154 for l in &lists {
155 match l.nulls() {
156 Some(n) => nulls.append_buffer(n.inner()),
157 None => nulls.append_n(l.len(), true),
158 }
159 }
160 NullBuffer::new(nulls.finish())
161 });
162
163 let mut sliced_values;
166 let values: Vec<&dyn Array> = if list_has_slices {
167 sliced_values = Vec::with_capacity(lists.len());
168 for l in &lists {
169 let offsets = l.offsets();
172 let start_offset = offsets[0].as_usize();
173 let end_offset = offsets.last().unwrap().as_usize();
174 sliced_values.push(l.values().slice(start_offset, end_offset - start_offset));
175 }
176 sliced_values.iter().map(|a| a.as_ref()).collect()
177 } else {
178 lists.iter().map(|x| x.values().as_ref()).collect()
179 };
180
181 let concatenated_values = concat(values.as_slice())?;
182
183 let value_offset_buffer =
185 OffsetBuffer::<OffsetSize>::from_lengths(lists.iter().flat_map(|x| x.offsets().lengths()));
186
187 let array = GenericListArray::<OffsetSize>::try_new(
188 Arc::clone(field),
189 value_offset_buffer,
190 concatenated_values,
191 lists_nulls,
192 )?;
193
194 Ok(Arc::new(array))
195}
196
197fn concat_primitives<T: ArrowPrimitiveType>(arrays: &[&dyn Array]) -> Result<ArrayRef, ArrowError> {
198 let mut builder = PrimitiveBuilder::<T>::with_capacity(arrays.iter().map(|a| a.len()).sum())
199 .with_data_type(arrays[0].data_type().clone());
200
201 for array in arrays {
202 builder.append_array(array.as_primitive());
203 }
204
205 Ok(Arc::new(builder.finish()))
206}
207
208fn concat_boolean(arrays: &[&dyn Array]) -> Result<ArrayRef, ArrowError> {
209 let mut builder = BooleanBuilder::with_capacity(arrays.iter().map(|a| a.len()).sum());
210
211 for array in arrays {
212 builder.append_array(array.as_boolean());
213 }
214
215 Ok(Arc::new(builder.finish()))
216}
217
218fn concat_bytes<T: ByteArrayType>(arrays: &[&dyn Array]) -> Result<ArrayRef, ArrowError> {
219 let (item_capacity, bytes_capacity) = match binary_capacity::<T>(arrays) {
220 Capacities::Binary(item_capacity, Some(bytes_capacity)) => (item_capacity, bytes_capacity),
221 _ => unreachable!(),
222 };
223
224 let mut builder = GenericByteBuilder::<T>::with_capacity(item_capacity, bytes_capacity);
225
226 for array in arrays {
227 builder.append_array(array.as_bytes::<T>());
228 }
229
230 Ok(Arc::new(builder.finish()))
231}
232
/// Expands to an early `return` of [`concat_dictionaries`] for dictionary key type `$t`.
///
/// `concat_dictionaries` already returns a `Result<ArrayRef, _>`, so its result is returned
/// as-is. The previous `Arc::new(..) as _` form wrapped the `ArrayRef` in a second `Arc`,
/// adding an allocation and an extra level of indirection.
macro_rules! dict_helper {
    ($t:ty, $arrays:expr) => {
        return concat_dictionaries::<$t>($arrays)
    };
}
238
/// Expands to an early `return` of [`concat_primitives`] for primitive type `$t`.
///
/// `concat_primitives` already returns a `Result<ArrayRef, _>`, so its result is returned
/// as-is. The previous `Arc::new(..) as _` form wrapped the `ArrayRef` in a second `Arc`,
/// adding an allocation and an extra level of indirection.
macro_rules! primitive_concat {
    ($t:ty, $arrays:expr) => {
        return concat_primitives::<$t>($arrays)
    };
}
244
245fn get_capacity(arrays: &[&dyn Array], data_type: &DataType) -> Capacities {
246 match data_type {
247 DataType::Utf8 => binary_capacity::<Utf8Type>(arrays),
248 DataType::LargeUtf8 => binary_capacity::<LargeUtf8Type>(arrays),
249 DataType::Binary => binary_capacity::<BinaryType>(arrays),
250 DataType::LargeBinary => binary_capacity::<LargeBinaryType>(arrays),
251 DataType::FixedSizeList(_, _) => fixed_size_list_capacity(arrays, data_type),
252 _ => Capacities::Array(arrays.iter().map(|a| a.len()).sum()),
253 }
254}
255
256pub fn concat(arrays: &[&dyn Array]) -> Result<ArrayRef, ArrowError> {
258 if arrays.is_empty() {
259 return Err(ArrowError::ComputeError(
260 "concat requires input of at least one array".to_string(),
261 ));
262 } else if arrays.len() == 1 {
263 let array = arrays[0];
264 return Ok(array.slice(0, array.len()));
265 }
266
267 let d = arrays[0].data_type();
268 if arrays.iter().skip(1).any(|array| array.data_type() != d) {
269 let error_message = {
271 let mut unique_data_types = HashSet::with_capacity(11);
273
274 let mut error_message =
275 format!("It is not possible to concatenate arrays of different data types ({d}");
276 unique_data_types.insert(d);
277
278 for array in arrays {
279 let is_unique = unique_data_types.insert(array.data_type());
280
281 if unique_data_types.len() == 11 {
282 error_message.push_str(", ...");
283 break;
284 }
285
286 if is_unique {
287 error_message.push_str(", ");
288 error_message.push_str(&array.data_type().to_string());
289 }
290 }
291
292 error_message.push_str(").");
293
294 error_message
295 };
296
297 return Err(ArrowError::InvalidArgumentError(error_message));
298 }
299
300 downcast_primitive! {
301 d => (primitive_concat, arrays),
302 DataType::Boolean => concat_boolean(arrays),
303 DataType::Dictionary(k, _) => {
304 downcast_integer! {
305 k.as_ref() => (dict_helper, arrays),
306 _ => unreachable!("illegal dictionary key type {k}")
307 }
308 }
309 DataType::List(field) => concat_lists::<i32>(arrays, field),
310 DataType::LargeList(field) => concat_lists::<i64>(arrays, field),
311 DataType::Utf8 => concat_bytes::<Utf8Type>(arrays),
312 DataType::LargeUtf8 => concat_bytes::<LargeUtf8Type>(arrays),
313 DataType::Binary => concat_bytes::<BinaryType>(arrays),
314 DataType::LargeBinary => concat_bytes::<LargeBinaryType>(arrays),
315 _ => {
316 let capacity = get_capacity(arrays, d);
317 concat_fallback(arrays, capacity)
318 }
319 }
320}
321
322fn concat_fallback(arrays: &[&dyn Array], capacity: Capacities) -> Result<ArrayRef, ArrowError> {
326 let array_data: Vec<_> = arrays.iter().map(|a| a.to_data()).collect::<Vec<_>>();
327 let array_data = array_data.iter().collect();
328 let mut mutable = MutableArrayData::with_capacities(array_data, false, capacity);
329
330 for (i, a) in arrays.iter().enumerate() {
331 mutable.extend(i, 0, a.len())
332 }
333
334 Ok(make_array(mutable.freeze()))
335}
336
337pub fn concat_batches<'a>(
344 schema: &SchemaRef,
345 input_batches: impl IntoIterator<Item = &'a RecordBatch>,
346) -> Result<RecordBatch, ArrowError> {
347 if schema.fields().is_empty() {
349 let num_rows: usize = input_batches.into_iter().map(RecordBatch::num_rows).sum();
350 let mut options = RecordBatchOptions::default();
351 options.row_count = Some(num_rows);
352 return RecordBatch::try_new_with_options(schema.clone(), vec![], &options);
353 }
354
355 let batches: Vec<&RecordBatch> = input_batches.into_iter().collect();
356 if batches.is_empty() {
357 return Ok(RecordBatch::new_empty(schema.clone()));
358 }
359 let field_num = schema.fields().len();
360 let mut arrays = Vec::with_capacity(field_num);
361 for i in 0..field_num {
362 let array = concat(
363 &batches
364 .iter()
365 .map(|batch| batch.column(i).as_ref())
366 .collect::<Vec<_>>(),
367 )?;
368 arrays.push(array);
369 }
370 RecordBatch::try_new(schema.clone(), arrays)
371}
372
373#[cfg(test)]
374mod tests {
375 use super::*;
376 use arrow_array::builder::{GenericListBuilder, StringDictionaryBuilder};
377 use arrow_schema::{Field, Schema};
378 use std::fmt::Debug;
379
380 #[test]
381 fn test_concat_empty_vec() {
382 let re = concat(&[]);
383 assert!(re.is_err());
384 }
385
386 #[test]
387 fn test_concat_batches_no_columns() {
388 let schema = Arc::new(Schema::empty());
390
391 let mut options = RecordBatchOptions::default();
392 options.row_count = Some(100);
393 let batch = RecordBatch::try_new_with_options(schema.clone(), vec![], &options).unwrap();
394 let re = concat_batches(&schema, &[batch.clone(), batch]).unwrap();
396
397 assert_eq!(re.num_rows(), 200);
398 }
399
400 #[test]
401 fn test_concat_one_element_vec() {
402 let arr = Arc::new(PrimitiveArray::<Int64Type>::from(vec![
403 Some(-1),
404 Some(2),
405 None,
406 ])) as ArrayRef;
407 let result = concat(&[arr.as_ref()]).unwrap();
408 assert_eq!(
409 &arr, &result,
410 "concatenating single element array gives back the same result"
411 );
412 }
413
414 #[test]
415 fn test_concat_incompatible_datatypes() {
416 let re = concat(&[
417 &PrimitiveArray::<Int64Type>::from(vec![Some(-1), Some(2), None]),
418 &StringArray::from(vec![Some("hello"), Some("bar"), Some("world")]),
420 &StringArray::from(vec![Some("hey"), Some(""), Some("you")]),
421 &PrimitiveArray::<Int32Type>::from(vec![Some(-1), Some(2), None]),
423 ]);
424
425 assert_eq!(re.unwrap_err().to_string(), "Invalid argument error: It is not possible to concatenate arrays of different data types (Int64, Utf8, Int32).");
426 }
427
428 #[test]
429 fn test_concat_10_incompatible_datatypes_should_include_all_of_them() {
430 let re = concat(&[
431 &PrimitiveArray::<Int64Type>::from(vec![Some(-1), Some(2), None]),
432 &StringArray::from(vec![Some("hello"), Some("bar"), Some("world")]),
434 &StringArray::from(vec![Some("hey"), Some(""), Some("you")]),
435 &PrimitiveArray::<Int32Type>::from(vec![Some(-1), Some(2), None]),
437 &PrimitiveArray::<Int8Type>::from(vec![Some(-1), Some(2), None]),
438 &PrimitiveArray::<Int16Type>::from(vec![Some(-1), Some(2), None]),
439 &PrimitiveArray::<UInt8Type>::from(vec![Some(1), Some(2), None]),
440 &PrimitiveArray::<UInt16Type>::from(vec![Some(1), Some(2), None]),
441 &PrimitiveArray::<UInt32Type>::from(vec![Some(1), Some(2), None]),
442 &PrimitiveArray::<UInt16Type>::from(vec![Some(1), Some(2), None]),
444 &PrimitiveArray::<UInt64Type>::from(vec![Some(1), Some(2), None]),
445 &PrimitiveArray::<Float32Type>::from(vec![Some(1.0), Some(2.0), None]),
446 ]);
447
448 assert_eq!(re.unwrap_err().to_string(), "Invalid argument error: It is not possible to concatenate arrays of different data types (Int64, Utf8, Int32, Int8, Int16, UInt8, UInt16, UInt32, UInt64, Float32).");
449 }
450
451 #[test]
452 fn test_concat_11_incompatible_datatypes_should_only_include_10() {
453 let re = concat(&[
454 &PrimitiveArray::<Int64Type>::from(vec![Some(-1), Some(2), None]),
455 &StringArray::from(vec![Some("hello"), Some("bar"), Some("world")]),
457 &StringArray::from(vec![Some("hey"), Some(""), Some("you")]),
458 &PrimitiveArray::<Int32Type>::from(vec![Some(-1), Some(2), None]),
460 &PrimitiveArray::<Int8Type>::from(vec![Some(-1), Some(2), None]),
461 &PrimitiveArray::<Int16Type>::from(vec![Some(-1), Some(2), None]),
462 &PrimitiveArray::<UInt8Type>::from(vec![Some(1), Some(2), None]),
463 &PrimitiveArray::<UInt16Type>::from(vec![Some(1), Some(2), None]),
464 &PrimitiveArray::<UInt32Type>::from(vec![Some(1), Some(2), None]),
465 &PrimitiveArray::<UInt16Type>::from(vec![Some(1), Some(2), None]),
467 &PrimitiveArray::<UInt64Type>::from(vec![Some(1), Some(2), None]),
468 &PrimitiveArray::<Float32Type>::from(vec![Some(1.0), Some(2.0), None]),
469 &PrimitiveArray::<Float64Type>::from(vec![Some(1.0), Some(2.0), None]),
470 ]);
471
472 assert_eq!(re.unwrap_err().to_string(), "Invalid argument error: It is not possible to concatenate arrays of different data types (Int64, Utf8, Int32, Int8, Int16, UInt8, UInt16, UInt32, UInt64, Float32, ...).");
473 }
474
475 #[test]
476 fn test_concat_13_incompatible_datatypes_should_not_include_all_of_them() {
477 let re = concat(&[
478 &PrimitiveArray::<Int64Type>::from(vec![Some(-1), Some(2), None]),
479 &StringArray::from(vec![Some("hello"), Some("bar"), Some("world")]),
481 &StringArray::from(vec![Some("hey"), Some(""), Some("you")]),
482 &PrimitiveArray::<Int32Type>::from(vec![Some(-1), Some(2), None]),
484 &PrimitiveArray::<Int8Type>::from(vec![Some(-1), Some(2), None]),
485 &PrimitiveArray::<Int16Type>::from(vec![Some(-1), Some(2), None]),
486 &PrimitiveArray::<UInt8Type>::from(vec![Some(1), Some(2), None]),
487 &PrimitiveArray::<UInt16Type>::from(vec![Some(1), Some(2), None]),
488 &PrimitiveArray::<UInt32Type>::from(vec![Some(1), Some(2), None]),
489 &PrimitiveArray::<UInt16Type>::from(vec![Some(1), Some(2), None]),
491 &PrimitiveArray::<UInt64Type>::from(vec![Some(1), Some(2), None]),
492 &PrimitiveArray::<Float32Type>::from(vec![Some(1.0), Some(2.0), None]),
493 &PrimitiveArray::<Float64Type>::from(vec![Some(1.0), Some(2.0), None]),
494 &PrimitiveArray::<Float16Type>::new_null(3),
495 &BooleanArray::from(vec![Some(true), Some(false), None]),
496 ]);
497
498 assert_eq!(re.unwrap_err().to_string(), "Invalid argument error: It is not possible to concatenate arrays of different data types (Int64, Utf8, Int32, Int8, Int16, UInt8, UInt16, UInt32, UInt64, Float32, ...).");
499 }
500
501 #[test]
502 fn test_concat_string_arrays() {
503 let arr = concat(&[
504 &StringArray::from(vec!["hello", "world"]),
505 &StringArray::from(vec!["2", "3", "4"]),
506 &StringArray::from(vec![Some("foo"), Some("bar"), None, Some("baz")]),
507 ])
508 .unwrap();
509
510 let expected_output = Arc::new(StringArray::from(vec![
511 Some("hello"),
512 Some("world"),
513 Some("2"),
514 Some("3"),
515 Some("4"),
516 Some("foo"),
517 Some("bar"),
518 None,
519 Some("baz"),
520 ])) as ArrayRef;
521
522 assert_eq!(&arr, &expected_output);
523 }
524
525 #[test]
526 fn test_concat_primitive_arrays() {
527 let arr = concat(&[
528 &PrimitiveArray::<Int64Type>::from(vec![Some(-1), Some(-1), Some(2), None, None]),
529 &PrimitiveArray::<Int64Type>::from(vec![Some(101), Some(102), Some(103), None]),
530 &PrimitiveArray::<Int64Type>::from(vec![Some(256), Some(512), Some(1024)]),
531 ])
532 .unwrap();
533
534 let expected_output = Arc::new(PrimitiveArray::<Int64Type>::from(vec![
535 Some(-1),
536 Some(-1),
537 Some(2),
538 None,
539 None,
540 Some(101),
541 Some(102),
542 Some(103),
543 None,
544 Some(256),
545 Some(512),
546 Some(1024),
547 ])) as ArrayRef;
548
549 assert_eq!(&arr, &expected_output);
550 }
551
552 #[test]
553 fn test_concat_primitive_array_slices() {
554 let input_1 =
555 PrimitiveArray::<Int64Type>::from(vec![Some(-1), Some(-1), Some(2), None, None])
556 .slice(1, 3);
557
558 let input_2 =
559 PrimitiveArray::<Int64Type>::from(vec![Some(101), Some(102), Some(103), None])
560 .slice(1, 3);
561 let arr = concat(&[&input_1, &input_2]).unwrap();
562
563 let expected_output = Arc::new(PrimitiveArray::<Int64Type>::from(vec![
564 Some(-1),
565 Some(2),
566 None,
567 Some(102),
568 Some(103),
569 None,
570 ])) as ArrayRef;
571
572 assert_eq!(&arr, &expected_output);
573 }
574
575 #[test]
576 fn test_concat_boolean_primitive_arrays() {
577 let arr = concat(&[
578 &BooleanArray::from(vec![
579 Some(true),
580 Some(true),
581 Some(false),
582 None,
583 None,
584 Some(false),
585 ]),
586 &BooleanArray::from(vec![None, Some(false), Some(true), Some(false)]),
587 ])
588 .unwrap();
589
590 let expected_output = Arc::new(BooleanArray::from(vec![
591 Some(true),
592 Some(true),
593 Some(false),
594 None,
595 None,
596 Some(false),
597 None,
598 Some(false),
599 Some(true),
600 Some(false),
601 ])) as ArrayRef;
602
603 assert_eq!(&arr, &expected_output);
604 }
605
606 #[test]
607 fn test_concat_primitive_list_arrays() {
608 let list1 = vec![
609 Some(vec![Some(-1), Some(-1), Some(2), None, None]),
610 Some(vec![]),
611 None,
612 Some(vec![Some(10)]),
613 ];
614 let list1_array = ListArray::from_iter_primitive::<Int64Type, _, _>(list1.clone());
615
616 let list2 = vec![
617 None,
618 Some(vec![Some(100), None, Some(101)]),
619 Some(vec![Some(102)]),
620 ];
621 let list2_array = ListArray::from_iter_primitive::<Int64Type, _, _>(list2.clone());
622
623 let list3 = vec![Some(vec![Some(1000), Some(1001)])];
624 let list3_array = ListArray::from_iter_primitive::<Int64Type, _, _>(list3.clone());
625
626 let array_result = concat(&[&list1_array, &list2_array, &list3_array]).unwrap();
627
628 let expected = list1.into_iter().chain(list2).chain(list3);
629 let array_expected = ListArray::from_iter_primitive::<Int64Type, _, _>(expected);
630
631 assert_eq!(array_result.as_ref(), &array_expected as &dyn Array);
632 }
633
634 #[test]
635 fn test_concat_primitive_list_arrays_slices() {
636 let list1 = vec![
637 Some(vec![Some(-1), Some(-1), Some(2), None, None]),
638 Some(vec![]), None, Some(vec![Some(10)]),
641 ];
642 let list1_array = ListArray::from_iter_primitive::<Int64Type, _, _>(list1.clone());
643 let list1_array = list1_array.slice(1, 2);
644 let list1_values = list1.into_iter().skip(1).take(2);
645
646 let list2 = vec![
647 None,
648 Some(vec![Some(100), None, Some(101)]),
649 Some(vec![Some(102)]),
650 ];
651 let list2_array = ListArray::from_iter_primitive::<Int64Type, _, _>(list2.clone());
652
653 assert!(list1_array.offsets()[0].as_usize() > 0);
655 let array_result = concat(&[&list1_array, &list2_array]).unwrap();
656
657 let expected = list1_values.chain(list2);
658 let array_expected = ListArray::from_iter_primitive::<Int64Type, _, _>(expected);
659
660 assert_eq!(array_result.as_ref(), &array_expected as &dyn Array);
661 }
662
663 #[test]
664 fn test_concat_primitive_list_arrays_sliced_lengths() {
665 let list1 = vec![
666 Some(vec![Some(-1), Some(-1), Some(2), None, None]), Some(vec![]), None, Some(vec![Some(10)]),
670 ];
671 let list1_array = ListArray::from_iter_primitive::<Int64Type, _, _>(list1.clone());
672 let list1_array = list1_array.slice(0, 3); let list1_values = list1.into_iter().take(3);
674
675 let list2 = vec![
676 None,
677 Some(vec![Some(100), None, Some(101)]),
678 Some(vec![Some(102)]),
679 ];
680 let list2_array = ListArray::from_iter_primitive::<Int64Type, _, _>(list2.clone());
681
682 assert_eq!(list1_array.offsets()[0].as_usize(), 0);
685 assert!(list1_array.offsets().last().unwrap().as_usize() < list1_array.values().len());
686 let array_result = concat(&[&list1_array, &list2_array]).unwrap();
687
688 let expected = list1_values.chain(list2);
689 let array_expected = ListArray::from_iter_primitive::<Int64Type, _, _>(expected);
690
691 assert_eq!(array_result.as_ref(), &array_expected as &dyn Array);
692 }
693
694 #[test]
695 fn test_concat_primitive_fixed_size_list_arrays() {
696 let list1 = vec![
697 Some(vec![Some(-1), None]),
698 None,
699 Some(vec![Some(10), Some(20)]),
700 ];
701 let list1_array =
702 FixedSizeListArray::from_iter_primitive::<Int64Type, _, _>(list1.clone(), 2);
703
704 let list2 = vec![
705 None,
706 Some(vec![Some(100), None]),
707 Some(vec![Some(102), Some(103)]),
708 ];
709 let list2_array =
710 FixedSizeListArray::from_iter_primitive::<Int64Type, _, _>(list2.clone(), 2);
711
712 let list3 = vec![Some(vec![Some(1000), Some(1001)])];
713 let list3_array =
714 FixedSizeListArray::from_iter_primitive::<Int64Type, _, _>(list3.clone(), 2);
715
716 let array_result = concat(&[&list1_array, &list2_array, &list3_array]).unwrap();
717
718 let expected = list1.into_iter().chain(list2).chain(list3);
719 let array_expected =
720 FixedSizeListArray::from_iter_primitive::<Int64Type, _, _>(expected, 2);
721
722 assert_eq!(array_result.as_ref(), &array_expected as &dyn Array);
723 }
724
725 #[test]
726 fn test_concat_struct_arrays() {
727 let field = Arc::new(Field::new("field", DataType::Int64, true));
728 let input_primitive_1: ArrayRef = Arc::new(PrimitiveArray::<Int64Type>::from(vec![
729 Some(-1),
730 Some(-1),
731 Some(2),
732 None,
733 None,
734 ]));
735 let input_struct_1 = StructArray::from(vec![(field.clone(), input_primitive_1)]);
736
737 let input_primitive_2: ArrayRef = Arc::new(PrimitiveArray::<Int64Type>::from(vec![
738 Some(101),
739 Some(102),
740 Some(103),
741 None,
742 ]));
743 let input_struct_2 = StructArray::from(vec![(field.clone(), input_primitive_2)]);
744
745 let input_primitive_3: ArrayRef = Arc::new(PrimitiveArray::<Int64Type>::from(vec![
746 Some(256),
747 Some(512),
748 Some(1024),
749 ]));
750 let input_struct_3 = StructArray::from(vec![(field, input_primitive_3)]);
751
752 let arr = concat(&[&input_struct_1, &input_struct_2, &input_struct_3]).unwrap();
753
754 let expected_primitive_output = Arc::new(PrimitiveArray::<Int64Type>::from(vec![
755 Some(-1),
756 Some(-1),
757 Some(2),
758 None,
759 None,
760 Some(101),
761 Some(102),
762 Some(103),
763 None,
764 Some(256),
765 Some(512),
766 Some(1024),
767 ])) as ArrayRef;
768
769 let actual_primitive = arr
770 .as_any()
771 .downcast_ref::<StructArray>()
772 .unwrap()
773 .column(0);
774 assert_eq!(actual_primitive, &expected_primitive_output);
775 }
776
777 #[test]
778 fn test_concat_struct_array_slices() {
779 let field = Arc::new(Field::new("field", DataType::Int64, true));
780 let input_primitive_1: ArrayRef = Arc::new(PrimitiveArray::<Int64Type>::from(vec![
781 Some(-1),
782 Some(-1),
783 Some(2),
784 None,
785 None,
786 ]));
787 let input_struct_1 = StructArray::from(vec![(field.clone(), input_primitive_1)]);
788
789 let input_primitive_2: ArrayRef = Arc::new(PrimitiveArray::<Int64Type>::from(vec![
790 Some(101),
791 Some(102),
792 Some(103),
793 None,
794 ]));
795 let input_struct_2 = StructArray::from(vec![(field, input_primitive_2)]);
796
797 let arr = concat(&[&input_struct_1.slice(1, 3), &input_struct_2.slice(1, 2)]).unwrap();
798
799 let expected_primitive_output = Arc::new(PrimitiveArray::<Int64Type>::from(vec![
800 Some(-1),
801 Some(2),
802 None,
803 Some(102),
804 Some(103),
805 ])) as ArrayRef;
806
807 let actual_primitive = arr
808 .as_any()
809 .downcast_ref::<StructArray>()
810 .unwrap()
811 .column(0);
812 assert_eq!(actual_primitive, &expected_primitive_output);
813 }
814
815 #[test]
816 fn test_string_array_slices() {
817 let input_1 = StringArray::from(vec!["hello", "A", "B", "C"]);
818 let input_2 = StringArray::from(vec!["world", "D", "E", "Z"]);
819
820 let arr = concat(&[&input_1.slice(1, 3), &input_2.slice(1, 2)]).unwrap();
821
822 let expected_output = StringArray::from(vec!["A", "B", "C", "D", "E"]);
823
824 let actual_output = arr.as_any().downcast_ref::<StringArray>().unwrap();
825 assert_eq!(actual_output, &expected_output);
826 }
827
828 #[test]
829 fn test_string_array_with_null_slices() {
830 let input_1 = StringArray::from(vec![Some("hello"), None, Some("A"), Some("C")]);
831 let input_2 = StringArray::from(vec![None, Some("world"), Some("D"), None]);
832
833 let arr = concat(&[&input_1.slice(1, 3), &input_2.slice(1, 2)]).unwrap();
834
835 let expected_output =
836 StringArray::from(vec![None, Some("A"), Some("C"), Some("world"), Some("D")]);
837
838 let actual_output = arr.as_any().downcast_ref::<StringArray>().unwrap();
839 assert_eq!(actual_output, &expected_output);
840 }
841
842 fn collect_string_dictionary(array: &DictionaryArray<Int32Type>) -> Vec<Option<&str>> {
843 let concrete = array.downcast_dict::<StringArray>().unwrap();
844 concrete.into_iter().collect()
845 }
846
847 #[test]
848 fn test_string_dictionary_array() {
849 let input_1: DictionaryArray<Int32Type> = vec!["hello", "A", "B", "hello", "hello", "C"]
850 .into_iter()
851 .collect();
852 let input_2: DictionaryArray<Int32Type> = vec!["hello", "E", "E", "hello", "F", "E"]
853 .into_iter()
854 .collect();
855
856 let expected: Vec<_> = vec![
857 "hello", "A", "B", "hello", "hello", "C", "hello", "E", "E", "hello", "F", "E",
858 ]
859 .into_iter()
860 .map(Some)
861 .collect();
862
863 let concat = concat(&[&input_1 as _, &input_2 as _]).unwrap();
864 let dictionary = concat.as_dictionary::<Int32Type>();
865 let actual = collect_string_dictionary(dictionary);
866 assert_eq!(actual, expected);
867
868 assert_eq!(
870 dictionary.values().len(),
871 input_1.values().len() + input_2.values().len(),
872 )
873 }
874
875 #[test]
876 fn test_string_dictionary_array_nulls() {
877 let input_1: DictionaryArray<Int32Type> = vec![Some("foo"), Some("bar"), None, Some("fiz")]
878 .into_iter()
879 .collect();
880 let input_2: DictionaryArray<Int32Type> = vec![None].into_iter().collect();
881 let expected = vec![Some("foo"), Some("bar"), None, Some("fiz"), None];
882
883 let concat = concat(&[&input_1 as _, &input_2 as _]).unwrap();
884 let dictionary = concat.as_dictionary::<Int32Type>();
885 let actual = collect_string_dictionary(dictionary);
886 assert_eq!(actual, expected);
887
888 assert_eq!(
890 dictionary.values().len(),
891 input_1.values().len() + input_2.values().len(),
892 )
893 }
894
895 #[test]
896 fn test_string_dictionary_array_nulls_in_values() {
897 let input_1_keys = Int32Array::from_iter_values([0, 2, 1, 3]);
898 let input_1_values = StringArray::from(vec![Some("foo"), None, Some("bar"), Some("fiz")]);
899 let input_1 = DictionaryArray::new(input_1_keys, Arc::new(input_1_values));
900
901 let input_2_keys = Int32Array::from_iter_values([0]);
902 let input_2_values = StringArray::from(vec![None, Some("hello")]);
903 let input_2 = DictionaryArray::new(input_2_keys, Arc::new(input_2_values));
904
905 let expected = vec![Some("foo"), Some("bar"), None, Some("fiz"), None];
906
907 let concat = concat(&[&input_1 as _, &input_2 as _]).unwrap();
908 let dictionary = concat.as_dictionary::<Int32Type>();
909 let actual = collect_string_dictionary(dictionary);
910 assert_eq!(actual, expected);
911 }
912
913 #[test]
914 fn test_string_dictionary_merge() {
915 let mut builder = StringDictionaryBuilder::<Int32Type>::new();
916 for i in 0..20 {
917 builder.append(i.to_string()).unwrap();
918 }
919 let input_1 = builder.finish();
920
921 let mut builder = StringDictionaryBuilder::<Int32Type>::new();
922 for i in 0..30 {
923 builder.append(i.to_string()).unwrap();
924 }
925 let input_2 = builder.finish();
926
927 let expected: Vec<_> = (0..20).chain(0..30).map(|x| x.to_string()).collect();
928 let expected: Vec<_> = expected.iter().map(|x| Some(x.as_str())).collect();
929
930 let concat = concat(&[&input_1 as _, &input_2 as _]).unwrap();
931 let dictionary = concat.as_dictionary::<Int32Type>();
932 let actual = collect_string_dictionary(dictionary);
933 assert_eq!(actual, expected);
934
935 let values_len = dictionary.values().len();
938 assert!((30..40).contains(&values_len), "{values_len}")
939 }
940
941 #[test]
942 fn test_concat_string_sizes() {
943 let a: LargeStringArray = ((0..150).map(|_| Some("foo"))).collect();
944 let b: LargeStringArray = ((0..150).map(|_| Some("foo"))).collect();
945 let c = LargeStringArray::from(vec![Some("foo"), Some("bar"), None, Some("baz")]);
946 let arr = concat(&[&a, &b, &c]).unwrap();
954 assert_eq!(arr.to_data().buffers()[1].capacity(), 960);
956 }
957
#[test]
fn test_dictionary_concat_reuse() {
    // `&copy` had been mis-decoded as the HTML entity `©`, which does not compile.
    let array: DictionaryArray<Int8Type> = vec!["a", "a", "b", "c"].into_iter().collect();
    let copy: DictionaryArray<Int8Type> = array.clone();

    // The dictionary holds the distinct values "a", "b", "c".
    assert_eq!(
        array.values(),
        &(Arc::new(StringArray::from(vec!["a", "b", "c"])) as ArrayRef)
    );
    assert_eq!(array.keys(), &Int8Array::from(vec![0, 0, 1, 2]));

    // Concatenating two arrays that share their values must not add any new values.
    let combined = concat(&[&copy as _, &array as _]).unwrap();
    let combined = combined.as_dictionary::<Int8Type>();

    assert_eq!(
        combined.values(),
        &(Arc::new(StringArray::from(vec!["a", "b", "c"])) as ArrayRef),
        "Actual: {combined:#?}"
    );

    assert_eq!(
        combined.keys(),
        &Int8Array::from(vec![0, 0, 1, 2, 0, 0, 1, 2])
    );

    // The shared values buffer should be reused rather than copied.
    assert!(array
        .values()
        .to_data()
        .ptr_eq(&combined.values().to_data()));
    assert!(copy.values().to_data().ptr_eq(&combined.values().to_data()));

    // Adding an input with different values forces a new, merged dictionary.
    let new: DictionaryArray<Int8Type> = vec!["d"].into_iter().collect();
    let combined = concat(&[&copy as _, &array as _, &new as _]).unwrap();
    let com = combined.as_dictionary::<Int8Type>();

    assert!(!array.values().to_data().ptr_eq(&com.values().to_data()));
    assert!(!copy.values().to_data().ptr_eq(&com.values().to_data()));
    assert!(!new.values().to_data().ptr_eq(&com.values().to_data()));
}
1001
1002 #[test]
1003 fn concat_record_batches() {
1004 let schema = Arc::new(Schema::new(vec![
1005 Field::new("a", DataType::Int32, false),
1006 Field::new("b", DataType::Utf8, false),
1007 ]));
1008 let batch1 = RecordBatch::try_new(
1009 schema.clone(),
1010 vec![
1011 Arc::new(Int32Array::from(vec![1, 2])),
1012 Arc::new(StringArray::from(vec!["a", "b"])),
1013 ],
1014 )
1015 .unwrap();
1016 let batch2 = RecordBatch::try_new(
1017 schema.clone(),
1018 vec![
1019 Arc::new(Int32Array::from(vec![3, 4])),
1020 Arc::new(StringArray::from(vec!["c", "d"])),
1021 ],
1022 )
1023 .unwrap();
1024 let new_batch = concat_batches(&schema, [&batch1, &batch2]).unwrap();
1025 assert_eq!(new_batch.schema().as_ref(), schema.as_ref());
1026 assert_eq!(2, new_batch.num_columns());
1027 assert_eq!(4, new_batch.num_rows());
1028 let new_batch_owned = concat_batches(&schema, &[batch1, batch2]).unwrap();
1029 assert_eq!(new_batch_owned.schema().as_ref(), schema.as_ref());
1030 assert_eq!(2, new_batch_owned.num_columns());
1031 assert_eq!(4, new_batch_owned.num_rows());
1032 }
1033
#[test]
fn concat_empty_record_batch() {
    let fields = vec![
        Field::new("a", DataType::Int32, false),
        Field::new("b", DataType::Utf8, false),
    ];
    let schema = Arc::new(Schema::new(fields));

    // With no input batches at all, the result is an empty batch carrying `schema`.
    let empty = concat_batches(&schema, []).unwrap();
    assert_eq!(empty.schema().as_ref(), schema.as_ref());
    assert_eq!(0, empty.num_rows());
}
1044
#[test]
fn concat_record_batches_of_different_schemas_but_compatible_data() {
    // The two schemas give their single Int32 column different names ("a" vs "c").
    let schema1 = Arc::new(Schema::new(vec![Field::new("a", DataType::Int32, false)]));
    let schema2 = Arc::new(Schema::new(vec![Field::new("c", DataType::Int32, false)]));

    // Builds a single-column Int32 batch under the given schema.
    let int_batch = |schema: Arc<Schema>, values: Vec<i32>| {
        RecordBatch::try_new(schema, vec![Arc::new(Int32Array::from(values))]).unwrap()
    };
    let batch1 = int_batch(Arc::clone(&schema1), vec![1, 2]);
    let batch2 = int_batch(schema2, vec![3, 4]);

    // Differing field names do not prevent concatenation; the output takes `schema1`.
    let batch = concat_batches(&schema1, [&batch1, &batch2]).unwrap();
    assert_eq!(batch.schema().as_ref(), schema1.as_ref());
    assert_eq!(4, batch.num_rows());
}
1062
#[test]
fn concat_record_batches_of_different_schemas_incompatible_data() {
    // Same column name in both schemas, but Int32 in one and Utf8 in the other.
    let int_schema = Arc::new(Schema::new(vec![Field::new("a", DataType::Int32, false)]));
    let utf8_schema = Arc::new(Schema::new(vec![Field::new("a", DataType::Utf8, false)]));

    let int_batch = RecordBatch::try_new(
        Arc::clone(&int_schema),
        vec![Arc::new(Int32Array::from(vec![1, 2]))],
    )
    .unwrap();
    let utf8_batch = RecordBatch::try_new(
        utf8_schema,
        vec![Arc::new(StringArray::from(vec!["foo", "bar"]))],
    )
    .unwrap();

    // Mismatched column types must be rejected with a descriptive error.
    let message = concat_batches(&int_schema, [&int_batch, &utf8_batch])
        .unwrap_err()
        .to_string();
    assert_eq!(message, "Invalid argument error: It is not possible to concatenate arrays of different data types (Int32, Utf8).");
}
1082
#[test]
fn concat_capacity() {
    // Each expected capacity below is the expected length rounded up to the
    // next multiple of 64 bytes.

    // Int32 values: (100 + 10) * size_of::<i32>()
    let a = Int32Array::from_iter_values(0..100);
    let b = Int32Array::from_iter_values(10..20);
    let a = concat(&[&a, &b]).unwrap();
    let data = a.to_data();
    assert_eq!(data.buffers()[0].len(), 440);
    assert_eq!(data.buffers()[0].capacity(), 448);

    // Only the 20 sliced values are copied: (20 + 10) * size_of::<i32>()
    let a = concat(&[&a.slice(10, 20), &b]).unwrap();
    let data = a.to_data();
    assert_eq!(data.buffers()[0].len(), 120);
    assert_eq!(data.buffers()[0].capacity(), 128);

    let a = StringArray::from_iter_values(std::iter::repeat("foo").take(100));
    let b = StringArray::from(vec!["bingo", "bongo", "lorem", ""]);

    let a = concat(&[&a, &b]).unwrap();
    let data = a.to_data();
    // Offsets: (100 + 4 + 1) * size_of::<i32>()
    assert_eq!(data.buffers()[0].len(), 420);
    assert_eq!(data.buffers()[0].capacity(), 448);
    // Values: 100 * len("foo") + len("bingo") + len("bongo") + len("lorem") + len("")
    assert_eq!(data.buffers()[1].len(), 315);
    assert_eq!(data.buffers()[1].capacity(), 320);

    // A sliced input contributes only the bytes its offsets reference.
    let a = concat(&[&a.slice(10, 40), &b]).unwrap();
    let data = a.to_data();
    // Offsets: (40 + 4 + 1) * size_of::<i32>()
    assert_eq!(data.buffers()[0].len(), 180);
    assert_eq!(data.buffers()[0].capacity(), 192);
    // Values: 40 * len("foo") + 15 bytes from `b`
    assert_eq!(data.buffers()[1].len(), 135);
    assert_eq!(data.buffers()[1].capacity(), 192);

    let a = LargeBinaryArray::from_iter_values(std::iter::repeat(b"foo").take(100));
    let b = LargeBinaryArray::from_iter_values(std::iter::repeat(b"cupcakes").take(10));

    let a = concat(&[&a, &b]).unwrap();
    let data = a.to_data();
    // Offsets: (100 + 10 + 1) * size_of::<i64>()
    assert_eq!(data.buffers()[0].len(), 888);
    assert_eq!(data.buffers()[0].capacity(), 896);
    // Values: 100 * len("foo") + 10 * len("cupcakes")
    assert_eq!(data.buffers()[1].len(), 380);
    assert_eq!(data.buffers()[1].capacity(), 384);

    let a = concat(&[&a.slice(10, 40), &b]).unwrap();
    let data = a.to_data();
    // Offsets: (40 + 10 + 1) * size_of::<i64>()
    assert_eq!(data.buffers()[0].len(), 408);
    assert_eq!(data.buffers()[0].capacity(), 448);
    // Values: 40 * len("foo") + 10 * len("cupcakes")
    assert_eq!(data.buffers()[1].len(), 200);
    assert_eq!(data.buffers()[1].capacity(), 256);
}
1143
#[test]
fn concat_sparse_nulls() {
    // First dictionary: 100 distinct values, ten non-null keys all pointing at index 1.
    let dict_a = DictionaryArray::new(
        Int32Array::from(vec![1; 10]),
        Arc::new(StringArray::from_iter_values((0..100).map(|x| x.to_string()))),
    );
    // Second dictionary: an empty values array and ten null keys.
    let dict_b = DictionaryArray::new(
        Int32Array::new_null(10),
        Arc::new(StringArray::new_null(0)),
    );

    // Exactly the ten null keys of `dict_b` must surface as nulls, physically and logically.
    let array = concat(&[&dict_a, &dict_b]).unwrap();
    assert_eq!(array.null_count(), 10);
    assert_eq!(array.logical_null_count(), 10);
}
1156
#[test]
fn concat_dictionary_list_array_simple() {
    // Three single-row list arrays; the value "a" appears in two of them.
    let scalars = vec![
        create_single_row_list_of_dict(vec![Some("a")]),
        create_single_row_list_of_dict(vec![Some("a")]),
        create_single_row_list_of_dict(vec![Some("b")]),
    ];

    let arrays = scalars
        .iter()
        .map(|a| a as &(dyn Array))
        .collect::<Vec<_>>();
    let concat_res = concat(arrays.as_slice()).unwrap();

    let expected_list = create_list_of_dict(vec![
        Some(vec![Some("a")]),
        Some(vec![Some("a")]),
        Some(vec![Some("b")]),
    ]);

    let list = concat_res.as_list::<i32>();

    // `zip` stops at the shorter iterator, so compare the row counts first;
    // otherwise missing or extra rows would go unnoticed.
    assert_eq!(list.len(), expected_list.len());
    for (actual, expected) in list.iter().zip(expected_list.iter()) {
        assert_eq!(actual, expected);
    }

    // "a" came from two inputs; the concatenated dictionary must not store it twice.
    assert_dictionary_has_unique_values::<_, StringArray>(
        list.values().as_dictionary::<Int32Type>(),
    );
}
1189
#[test]
fn concat_many_dictionary_list_arrays() {
    let number_of_unique_values: i32 = 8;
    let number_of_rows: i32 = 80000;

    // Row `i` holds a single-element list whose value cycles through
    // `number_of_unique_values` distinct strings. This closure is shared by the
    // inputs and the expected output so that the two cannot drift apart.
    let value_for_row = |i: i32| (i % number_of_unique_values).to_string();

    let scalars = (0..number_of_rows)
        .map(|i| create_single_row_list_of_dict(vec![Some(value_for_row(i))]))
        .collect::<Vec<_>>();

    let arrays = scalars
        .iter()
        .map(|a| a as &(dyn Array))
        .collect::<Vec<_>>();
    let concat_res = concat(arrays.as_slice()).unwrap();

    let expected_list = create_list_of_dict(
        (0..number_of_rows)
            .map(|i| Some(vec![Some(value_for_row(i))]))
            .collect::<Vec<_>>(),
    );

    let list = concat_res.as_list::<i32>();

    // `zip` stops at the shorter iterator, so compare the row counts first;
    // otherwise missing or extra rows would go unnoticed.
    assert_eq!(list.len(), expected_list.len());
    for (actual, expected) in list.iter().zip(expected_list.iter()) {
        assert_eq!(actual, expected);
    }

    // Every input repeats one of only a few strings; the merged dictionary
    // must not contain duplicates.
    assert_dictionary_has_unique_values::<_, StringArray>(
        list.values().as_dictionary::<Int32Type>(),
    );
}
1224
1225 fn create_single_row_list_of_dict(
1226 list_items: Vec<Option<impl AsRef<str>>>,
1227 ) -> GenericListArray<i32> {
1228 let rows = list_items.into_iter().map(Some).collect();
1229
1230 create_list_of_dict(vec![rows])
1231 }
1232
1233 fn create_list_of_dict(
1234 rows: Vec<Option<Vec<Option<impl AsRef<str>>>>>,
1235 ) -> GenericListArray<i32> {
1236 let mut builder =
1237 GenericListBuilder::<i32, _>::new(StringDictionaryBuilder::<Int32Type>::new());
1238
1239 for row in rows {
1240 builder.append_option(row);
1241 }
1242
1243 builder.finish()
1244 }
1245
1246 fn assert_dictionary_has_unique_values<'a, K, V>(array: &'a DictionaryArray<K>)
1247 where
1248 K: ArrowDictionaryKeyType,
1249 V: Sync + Send + 'static,
1250 &'a V: ArrayAccessor + IntoIterator,
1251
1252 <&'a V as ArrayAccessor>::Item: Default + Clone + PartialEq + Debug + Ord,
1253 <&'a V as IntoIterator>::Item: Clone + PartialEq + Debug + Ord,
1254 {
1255 let dict = array.downcast_dict::<V>().unwrap();
1256 let mut values = dict.values().into_iter().collect::<Vec<_>>();
1257
1258 values.sort();
1260
1261 let mut unique_values = values.clone();
1262
1263 unique_values.dedup();
1264
1265 assert_eq!(
1266 values, unique_values,
1267 "There are duplicates in the value list (the value list here is sorted which is only for the assertion)"
1268 );
1269 }
1270}