1use crate::dictionary::{merge_dictionary_values, should_merge_dictionary_values};
34use arrow_array::cast::AsArray;
35use arrow_array::types::*;
36use arrow_array::*;
37use arrow_buffer::{ArrowNativeType, BooleanBufferBuilder, NullBuffer, OffsetBuffer};
38use arrow_data::transform::{Capacities, MutableArrayData};
39use arrow_schema::{ArrowError, DataType, FieldRef, SchemaRef};
40use std::{collections::HashSet, sync::Arc};
41
42fn binary_capacity<T: ByteArrayType>(arrays: &[&dyn Array]) -> Capacities {
43 let mut item_capacity = 0;
44 let mut bytes_capacity = 0;
45 for array in arrays {
46 let a = array.as_bytes::<T>();
47
48 let offsets = a.value_offsets();
50 bytes_capacity += offsets[offsets.len() - 1].as_usize() - offsets[0].as_usize();
51 item_capacity += a.len()
52 }
53
54 Capacities::Binary(item_capacity, Some(bytes_capacity))
55}
56
57fn fixed_size_list_capacity(arrays: &[&dyn Array], data_type: &DataType) -> Capacities {
58 if let DataType::FixedSizeList(f, _) = data_type {
59 let item_capacity = arrays.iter().map(|a| a.len()).sum();
60 let child_data_type = f.data_type();
61 match child_data_type {
62 DataType::Utf8
65 | DataType::LargeUtf8
66 | DataType::Binary
67 | DataType::LargeBinary
68 | DataType::FixedSizeList(_, _) => {
69 let values: Vec<&dyn arrow_array::Array> = arrays
70 .iter()
71 .map(|a| a.as_fixed_size_list().values().as_ref())
72 .collect();
73 Capacities::List(
74 item_capacity,
75 Some(Box::new(get_capacity(&values, child_data_type))),
76 )
77 }
78 _ => Capacities::Array(item_capacity),
79 }
80 } else {
81 unreachable!("illegal data type for fixed size list")
82 }
83}
84
85fn concat_dictionaries<K: ArrowDictionaryKeyType>(
86 arrays: &[&dyn Array],
87) -> Result<ArrayRef, ArrowError> {
88 let mut output_len = 0;
89 let dictionaries: Vec<_> = arrays
90 .iter()
91 .map(|x| x.as_dictionary::<K>())
92 .inspect(|d| output_len += d.len())
93 .collect();
94
95 if !should_merge_dictionary_values::<K>(&dictionaries, output_len) {
96 return concat_fallback(arrays, Capacities::Array(output_len));
97 }
98
99 let merged = merge_dictionary_values(&dictionaries, None)?;
100
101 let mut key_values = Vec::with_capacity(output_len);
103
104 let mut has_nulls = false;
105 for (d, mapping) in dictionaries.iter().zip(merged.key_mappings) {
106 has_nulls |= d.null_count() != 0;
107 for key in d.keys().values() {
108 key_values.push(mapping.get(key.as_usize()).copied().unwrap_or_default())
110 }
111 }
112
113 let nulls = has_nulls.then(|| {
114 let mut nulls = BooleanBufferBuilder::new(output_len);
115 for d in &dictionaries {
116 match d.nulls() {
117 Some(n) => nulls.append_buffer(n.inner()),
118 None => nulls.append_n(d.len(), true),
119 }
120 }
121 NullBuffer::new(nulls.finish())
122 });
123
124 let keys = PrimitiveArray::<K>::new(key_values.into(), nulls);
125 assert_eq!(keys.len(), output_len);
127
128 let array = unsafe { DictionaryArray::new_unchecked(keys, merged.values) };
129 Ok(Arc::new(array))
130}
131
132fn concat_lists<OffsetSize: OffsetSizeTrait>(
133 arrays: &[&dyn Array],
134 field: &FieldRef,
135) -> Result<ArrayRef, ArrowError> {
136 let mut output_len = 0;
137 let mut list_has_nulls = false;
138 let mut list_has_slices = false;
139
140 let lists = arrays
141 .iter()
142 .map(|x| x.as_list::<OffsetSize>())
143 .inspect(|l| {
144 output_len += l.len();
145 list_has_nulls |= l.null_count() != 0;
146 list_has_slices |= l.offsets()[0] > OffsetSize::zero()
147 || l.offsets().last().unwrap().as_usize() < l.values().len();
148 })
149 .collect::<Vec<_>>();
150
151 let lists_nulls = list_has_nulls.then(|| {
152 let mut nulls = BooleanBufferBuilder::new(output_len);
153 for l in &lists {
154 match l.nulls() {
155 Some(n) => nulls.append_buffer(n.inner()),
156 None => nulls.append_n(l.len(), true),
157 }
158 }
159 NullBuffer::new(nulls.finish())
160 });
161
162 let mut sliced_values;
165 let values: Vec<&dyn Array> = if list_has_slices {
166 sliced_values = Vec::with_capacity(lists.len());
167 for l in &lists {
168 let offsets = l.offsets();
171 let start_offset = offsets[0].as_usize();
172 let end_offset = offsets.last().unwrap().as_usize();
173 sliced_values.push(l.values().slice(start_offset, end_offset - start_offset));
174 }
175 sliced_values.iter().map(|a| a.as_ref()).collect()
176 } else {
177 lists.iter().map(|x| x.values().as_ref()).collect()
178 };
179
180 let concatenated_values = concat(values.as_slice())?;
181
182 let value_offset_buffer =
184 OffsetBuffer::<OffsetSize>::from_lengths(lists.iter().flat_map(|x| x.offsets().lengths()));
185
186 let array = GenericListArray::<OffsetSize>::try_new(
187 Arc::clone(field),
188 value_offset_buffer,
189 concatenated_values,
190 lists_nulls,
191 )?;
192
193 Ok(Arc::new(array))
194}
195
/// Dispatch target for `downcast_integer!`: concatenates `$arrays` as dictionaries keyed by
/// `$t` and returns the outcome from the enclosing function.
macro_rules! dict_helper {
    ($t:ty, $arrays:expr) => {
        // `concat_dictionaries` already yields a `Result<ArrayRef, _>`; returning it directly
        // avoids wrapping the resulting `Arc` inside a second `Arc`.
        return concat_dictionaries::<$t>($arrays)
    };
}
201
202fn get_capacity(arrays: &[&dyn Array], data_type: &DataType) -> Capacities {
203 match data_type {
204 DataType::Utf8 => binary_capacity::<Utf8Type>(arrays),
205 DataType::LargeUtf8 => binary_capacity::<LargeUtf8Type>(arrays),
206 DataType::Binary => binary_capacity::<BinaryType>(arrays),
207 DataType::LargeBinary => binary_capacity::<LargeBinaryType>(arrays),
208 DataType::FixedSizeList(_, _) => fixed_size_list_capacity(arrays, data_type),
209 _ => Capacities::Array(arrays.iter().map(|a| a.len()).sum()),
210 }
211}
212
213pub fn concat(arrays: &[&dyn Array]) -> Result<ArrayRef, ArrowError> {
215 if arrays.is_empty() {
216 return Err(ArrowError::ComputeError(
217 "concat requires input of at least one array".to_string(),
218 ));
219 } else if arrays.len() == 1 {
220 let array = arrays[0];
221 return Ok(array.slice(0, array.len()));
222 }
223
224 let d = arrays[0].data_type();
225 if arrays.iter().skip(1).any(|array| array.data_type() != d) {
226 let error_message = {
228 let mut unique_data_types = HashSet::with_capacity(11);
230
231 let mut error_message =
232 format!("It is not possible to concatenate arrays of different data types ({d}");
233 unique_data_types.insert(d);
234
235 for array in arrays {
236 let is_unique = unique_data_types.insert(array.data_type());
237
238 if unique_data_types.len() == 11 {
239 error_message.push_str(", ...");
240 break;
241 }
242
243 if is_unique {
244 error_message.push_str(", ");
245 error_message.push_str(&array.data_type().to_string());
246 }
247 }
248
249 error_message.push_str(").");
250
251 error_message
252 };
253
254 return Err(ArrowError::InvalidArgumentError(error_message));
255 }
256
257 match d {
258 DataType::Dictionary(k, _) => {
259 downcast_integer! {
260 k.as_ref() => (dict_helper, arrays),
261 _ => unreachable!("illegal dictionary key type {k}")
262 }
263 }
264 DataType::List(field) => concat_lists::<i32>(arrays, field),
265 DataType::LargeList(field) => concat_lists::<i64>(arrays, field),
266 _ => {
267 let capacity = get_capacity(arrays, d);
268 concat_fallback(arrays, capacity)
269 }
270 }
271}
272
273fn concat_fallback(arrays: &[&dyn Array], capacity: Capacities) -> Result<ArrayRef, ArrowError> {
277 let array_data: Vec<_> = arrays.iter().map(|a| a.to_data()).collect::<Vec<_>>();
278 let array_data = array_data.iter().collect();
279 let mut mutable = MutableArrayData::with_capacities(array_data, false, capacity);
280
281 for (i, a) in arrays.iter().enumerate() {
282 mutable.extend(i, 0, a.len())
283 }
284
285 Ok(make_array(mutable.freeze()))
286}
287
288pub fn concat_batches<'a>(
295 schema: &SchemaRef,
296 input_batches: impl IntoIterator<Item = &'a RecordBatch>,
297) -> Result<RecordBatch, ArrowError> {
298 if schema.fields().is_empty() {
300 let num_rows: usize = input_batches.into_iter().map(RecordBatch::num_rows).sum();
301 let mut options = RecordBatchOptions::default();
302 options.row_count = Some(num_rows);
303 return RecordBatch::try_new_with_options(schema.clone(), vec![], &options);
304 }
305
306 let batches: Vec<&RecordBatch> = input_batches.into_iter().collect();
307 if batches.is_empty() {
308 return Ok(RecordBatch::new_empty(schema.clone()));
309 }
310 let field_num = schema.fields().len();
311 let mut arrays = Vec::with_capacity(field_num);
312 for i in 0..field_num {
313 let array = concat(
314 &batches
315 .iter()
316 .map(|batch| batch.column(i).as_ref())
317 .collect::<Vec<_>>(),
318 )?;
319 arrays.push(array);
320 }
321 RecordBatch::try_new(schema.clone(), arrays)
322}
323
324#[cfg(test)]
325mod tests {
326 use super::*;
327 use arrow_array::builder::{GenericListBuilder, StringDictionaryBuilder};
328 use arrow_schema::{Field, Schema};
329 use std::fmt::Debug;
330
331 #[test]
332 fn test_concat_empty_vec() {
333 let re = concat(&[]);
334 assert!(re.is_err());
335 }
336
337 #[test]
338 fn test_concat_batches_no_columns() {
339 let schema = Arc::new(Schema::empty());
341
342 let mut options = RecordBatchOptions::default();
343 options.row_count = Some(100);
344 let batch = RecordBatch::try_new_with_options(schema.clone(), vec![], &options).unwrap();
345 let re = concat_batches(&schema, &[batch.clone(), batch]).unwrap();
347
348 assert_eq!(re.num_rows(), 200);
349 }
350
351 #[test]
352 fn test_concat_one_element_vec() {
353 let arr = Arc::new(PrimitiveArray::<Int64Type>::from(vec![
354 Some(-1),
355 Some(2),
356 None,
357 ])) as ArrayRef;
358 let result = concat(&[arr.as_ref()]).unwrap();
359 assert_eq!(
360 &arr, &result,
361 "concatenating single element array gives back the same result"
362 );
363 }
364
365 #[test]
366 fn test_concat_incompatible_datatypes() {
367 let re = concat(&[
368 &PrimitiveArray::<Int64Type>::from(vec![Some(-1), Some(2), None]),
369 &StringArray::from(vec![Some("hello"), Some("bar"), Some("world")]),
371 &StringArray::from(vec![Some("hey"), Some(""), Some("you")]),
372 &PrimitiveArray::<Int32Type>::from(vec![Some(-1), Some(2), None]),
374 ]);
375
376 assert_eq!(re.unwrap_err().to_string(), "Invalid argument error: It is not possible to concatenate arrays of different data types (Int64, Utf8, Int32).");
377 }
378
379 #[test]
380 fn test_concat_10_incompatible_datatypes_should_include_all_of_them() {
381 let re = concat(&[
382 &PrimitiveArray::<Int64Type>::from(vec![Some(-1), Some(2), None]),
383 &StringArray::from(vec![Some("hello"), Some("bar"), Some("world")]),
385 &StringArray::from(vec![Some("hey"), Some(""), Some("you")]),
386 &PrimitiveArray::<Int32Type>::from(vec![Some(-1), Some(2), None]),
388 &PrimitiveArray::<Int8Type>::from(vec![Some(-1), Some(2), None]),
389 &PrimitiveArray::<Int16Type>::from(vec![Some(-1), Some(2), None]),
390 &PrimitiveArray::<UInt8Type>::from(vec![Some(1), Some(2), None]),
391 &PrimitiveArray::<UInt16Type>::from(vec![Some(1), Some(2), None]),
392 &PrimitiveArray::<UInt32Type>::from(vec![Some(1), Some(2), None]),
393 &PrimitiveArray::<UInt16Type>::from(vec![Some(1), Some(2), None]),
395 &PrimitiveArray::<UInt64Type>::from(vec![Some(1), Some(2), None]),
396 &PrimitiveArray::<Float32Type>::from(vec![Some(1.0), Some(2.0), None]),
397 ]);
398
399 assert_eq!(re.unwrap_err().to_string(), "Invalid argument error: It is not possible to concatenate arrays of different data types (Int64, Utf8, Int32, Int8, Int16, UInt8, UInt16, UInt32, UInt64, Float32).");
400 }
401
402 #[test]
403 fn test_concat_11_incompatible_datatypes_should_only_include_10() {
404 let re = concat(&[
405 &PrimitiveArray::<Int64Type>::from(vec![Some(-1), Some(2), None]),
406 &StringArray::from(vec![Some("hello"), Some("bar"), Some("world")]),
408 &StringArray::from(vec![Some("hey"), Some(""), Some("you")]),
409 &PrimitiveArray::<Int32Type>::from(vec![Some(-1), Some(2), None]),
411 &PrimitiveArray::<Int8Type>::from(vec![Some(-1), Some(2), None]),
412 &PrimitiveArray::<Int16Type>::from(vec![Some(-1), Some(2), None]),
413 &PrimitiveArray::<UInt8Type>::from(vec![Some(1), Some(2), None]),
414 &PrimitiveArray::<UInt16Type>::from(vec![Some(1), Some(2), None]),
415 &PrimitiveArray::<UInt32Type>::from(vec![Some(1), Some(2), None]),
416 &PrimitiveArray::<UInt16Type>::from(vec![Some(1), Some(2), None]),
418 &PrimitiveArray::<UInt64Type>::from(vec![Some(1), Some(2), None]),
419 &PrimitiveArray::<Float32Type>::from(vec![Some(1.0), Some(2.0), None]),
420 &PrimitiveArray::<Float64Type>::from(vec![Some(1.0), Some(2.0), None]),
421 ]);
422
423 assert_eq!(re.unwrap_err().to_string(), "Invalid argument error: It is not possible to concatenate arrays of different data types (Int64, Utf8, Int32, Int8, Int16, UInt8, UInt16, UInt32, UInt64, Float32, ...).");
424 }
425
426 #[test]
427 fn test_concat_13_incompatible_datatypes_should_not_include_all_of_them() {
428 let re = concat(&[
429 &PrimitiveArray::<Int64Type>::from(vec![Some(-1), Some(2), None]),
430 &StringArray::from(vec![Some("hello"), Some("bar"), Some("world")]),
432 &StringArray::from(vec![Some("hey"), Some(""), Some("you")]),
433 &PrimitiveArray::<Int32Type>::from(vec![Some(-1), Some(2), None]),
435 &PrimitiveArray::<Int8Type>::from(vec![Some(-1), Some(2), None]),
436 &PrimitiveArray::<Int16Type>::from(vec![Some(-1), Some(2), None]),
437 &PrimitiveArray::<UInt8Type>::from(vec![Some(1), Some(2), None]),
438 &PrimitiveArray::<UInt16Type>::from(vec![Some(1), Some(2), None]),
439 &PrimitiveArray::<UInt32Type>::from(vec![Some(1), Some(2), None]),
440 &PrimitiveArray::<UInt16Type>::from(vec![Some(1), Some(2), None]),
442 &PrimitiveArray::<UInt64Type>::from(vec![Some(1), Some(2), None]),
443 &PrimitiveArray::<Float32Type>::from(vec![Some(1.0), Some(2.0), None]),
444 &PrimitiveArray::<Float64Type>::from(vec![Some(1.0), Some(2.0), None]),
445 &PrimitiveArray::<Float16Type>::new_null(3),
446 &BooleanArray::from(vec![Some(true), Some(false), None]),
447 ]);
448
449 assert_eq!(re.unwrap_err().to_string(), "Invalid argument error: It is not possible to concatenate arrays of different data types (Int64, Utf8, Int32, Int8, Int16, UInt8, UInt16, UInt32, UInt64, Float32, ...).");
450 }
451
452 #[test]
453 fn test_concat_string_arrays() {
454 let arr = concat(&[
455 &StringArray::from(vec!["hello", "world"]),
456 &StringArray::from(vec!["2", "3", "4"]),
457 &StringArray::from(vec![Some("foo"), Some("bar"), None, Some("baz")]),
458 ])
459 .unwrap();
460
461 let expected_output = Arc::new(StringArray::from(vec![
462 Some("hello"),
463 Some("world"),
464 Some("2"),
465 Some("3"),
466 Some("4"),
467 Some("foo"),
468 Some("bar"),
469 None,
470 Some("baz"),
471 ])) as ArrayRef;
472
473 assert_eq!(&arr, &expected_output);
474 }
475
476 #[test]
477 fn test_concat_primitive_arrays() {
478 let arr = concat(&[
479 &PrimitiveArray::<Int64Type>::from(vec![Some(-1), Some(-1), Some(2), None, None]),
480 &PrimitiveArray::<Int64Type>::from(vec![Some(101), Some(102), Some(103), None]),
481 &PrimitiveArray::<Int64Type>::from(vec![Some(256), Some(512), Some(1024)]),
482 ])
483 .unwrap();
484
485 let expected_output = Arc::new(PrimitiveArray::<Int64Type>::from(vec![
486 Some(-1),
487 Some(-1),
488 Some(2),
489 None,
490 None,
491 Some(101),
492 Some(102),
493 Some(103),
494 None,
495 Some(256),
496 Some(512),
497 Some(1024),
498 ])) as ArrayRef;
499
500 assert_eq!(&arr, &expected_output);
501 }
502
503 #[test]
504 fn test_concat_primitive_array_slices() {
505 let input_1 =
506 PrimitiveArray::<Int64Type>::from(vec![Some(-1), Some(-1), Some(2), None, None])
507 .slice(1, 3);
508
509 let input_2 =
510 PrimitiveArray::<Int64Type>::from(vec![Some(101), Some(102), Some(103), None])
511 .slice(1, 3);
512 let arr = concat(&[&input_1, &input_2]).unwrap();
513
514 let expected_output = Arc::new(PrimitiveArray::<Int64Type>::from(vec![
515 Some(-1),
516 Some(2),
517 None,
518 Some(102),
519 Some(103),
520 None,
521 ])) as ArrayRef;
522
523 assert_eq!(&arr, &expected_output);
524 }
525
526 #[test]
527 fn test_concat_boolean_primitive_arrays() {
528 let arr = concat(&[
529 &BooleanArray::from(vec![
530 Some(true),
531 Some(true),
532 Some(false),
533 None,
534 None,
535 Some(false),
536 ]),
537 &BooleanArray::from(vec![None, Some(false), Some(true), Some(false)]),
538 ])
539 .unwrap();
540
541 let expected_output = Arc::new(BooleanArray::from(vec![
542 Some(true),
543 Some(true),
544 Some(false),
545 None,
546 None,
547 Some(false),
548 None,
549 Some(false),
550 Some(true),
551 Some(false),
552 ])) as ArrayRef;
553
554 assert_eq!(&arr, &expected_output);
555 }
556
557 #[test]
558 fn test_concat_primitive_list_arrays() {
559 let list1 = vec![
560 Some(vec![Some(-1), Some(-1), Some(2), None, None]),
561 Some(vec![]),
562 None,
563 Some(vec![Some(10)]),
564 ];
565 let list1_array = ListArray::from_iter_primitive::<Int64Type, _, _>(list1.clone());
566
567 let list2 = vec![
568 None,
569 Some(vec![Some(100), None, Some(101)]),
570 Some(vec![Some(102)]),
571 ];
572 let list2_array = ListArray::from_iter_primitive::<Int64Type, _, _>(list2.clone());
573
574 let list3 = vec![Some(vec![Some(1000), Some(1001)])];
575 let list3_array = ListArray::from_iter_primitive::<Int64Type, _, _>(list3.clone());
576
577 let array_result = concat(&[&list1_array, &list2_array, &list3_array]).unwrap();
578
579 let expected = list1.into_iter().chain(list2).chain(list3);
580 let array_expected = ListArray::from_iter_primitive::<Int64Type, _, _>(expected);
581
582 assert_eq!(array_result.as_ref(), &array_expected as &dyn Array);
583 }
584
585 #[test]
586 fn test_concat_primitive_list_arrays_slices() {
587 let list1 = vec![
588 Some(vec![Some(-1), Some(-1), Some(2), None, None]),
589 Some(vec![]), None, Some(vec![Some(10)]),
592 ];
593 let list1_array = ListArray::from_iter_primitive::<Int64Type, _, _>(list1.clone());
594 let list1_array = list1_array.slice(1, 2);
595 let list1_values = list1.into_iter().skip(1).take(2);
596
597 let list2 = vec![
598 None,
599 Some(vec![Some(100), None, Some(101)]),
600 Some(vec![Some(102)]),
601 ];
602 let list2_array = ListArray::from_iter_primitive::<Int64Type, _, _>(list2.clone());
603
604 assert!(list1_array.offsets()[0].as_usize() > 0);
606 let array_result = concat(&[&list1_array, &list2_array]).unwrap();
607
608 let expected = list1_values.chain(list2);
609 let array_expected = ListArray::from_iter_primitive::<Int64Type, _, _>(expected);
610
611 assert_eq!(array_result.as_ref(), &array_expected as &dyn Array);
612 }
613
614 #[test]
615 fn test_concat_primitive_list_arrays_sliced_lengths() {
616 let list1 = vec![
617 Some(vec![Some(-1), Some(-1), Some(2), None, None]), Some(vec![]), None, Some(vec![Some(10)]),
621 ];
622 let list1_array = ListArray::from_iter_primitive::<Int64Type, _, _>(list1.clone());
623 let list1_array = list1_array.slice(0, 3); let list1_values = list1.into_iter().take(3);
625
626 let list2 = vec![
627 None,
628 Some(vec![Some(100), None, Some(101)]),
629 Some(vec![Some(102)]),
630 ];
631 let list2_array = ListArray::from_iter_primitive::<Int64Type, _, _>(list2.clone());
632
633 assert_eq!(list1_array.offsets()[0].as_usize(), 0);
636 assert!(list1_array.offsets().last().unwrap().as_usize() < list1_array.values().len());
637 let array_result = concat(&[&list1_array, &list2_array]).unwrap();
638
639 let expected = list1_values.chain(list2);
640 let array_expected = ListArray::from_iter_primitive::<Int64Type, _, _>(expected);
641
642 assert_eq!(array_result.as_ref(), &array_expected as &dyn Array);
643 }
644
645 #[test]
646 fn test_concat_primitive_fixed_size_list_arrays() {
647 let list1 = vec![
648 Some(vec![Some(-1), None]),
649 None,
650 Some(vec![Some(10), Some(20)]),
651 ];
652 let list1_array =
653 FixedSizeListArray::from_iter_primitive::<Int64Type, _, _>(list1.clone(), 2);
654
655 let list2 = vec![
656 None,
657 Some(vec![Some(100), None]),
658 Some(vec![Some(102), Some(103)]),
659 ];
660 let list2_array =
661 FixedSizeListArray::from_iter_primitive::<Int64Type, _, _>(list2.clone(), 2);
662
663 let list3 = vec![Some(vec![Some(1000), Some(1001)])];
664 let list3_array =
665 FixedSizeListArray::from_iter_primitive::<Int64Type, _, _>(list3.clone(), 2);
666
667 let array_result = concat(&[&list1_array, &list2_array, &list3_array]).unwrap();
668
669 let expected = list1.into_iter().chain(list2).chain(list3);
670 let array_expected =
671 FixedSizeListArray::from_iter_primitive::<Int64Type, _, _>(expected, 2);
672
673 assert_eq!(array_result.as_ref(), &array_expected as &dyn Array);
674 }
675
676 #[test]
677 fn test_concat_struct_arrays() {
678 let field = Arc::new(Field::new("field", DataType::Int64, true));
679 let input_primitive_1: ArrayRef = Arc::new(PrimitiveArray::<Int64Type>::from(vec![
680 Some(-1),
681 Some(-1),
682 Some(2),
683 None,
684 None,
685 ]));
686 let input_struct_1 = StructArray::from(vec![(field.clone(), input_primitive_1)]);
687
688 let input_primitive_2: ArrayRef = Arc::new(PrimitiveArray::<Int64Type>::from(vec![
689 Some(101),
690 Some(102),
691 Some(103),
692 None,
693 ]));
694 let input_struct_2 = StructArray::from(vec![(field.clone(), input_primitive_2)]);
695
696 let input_primitive_3: ArrayRef = Arc::new(PrimitiveArray::<Int64Type>::from(vec![
697 Some(256),
698 Some(512),
699 Some(1024),
700 ]));
701 let input_struct_3 = StructArray::from(vec![(field, input_primitive_3)]);
702
703 let arr = concat(&[&input_struct_1, &input_struct_2, &input_struct_3]).unwrap();
704
705 let expected_primitive_output = Arc::new(PrimitiveArray::<Int64Type>::from(vec![
706 Some(-1),
707 Some(-1),
708 Some(2),
709 None,
710 None,
711 Some(101),
712 Some(102),
713 Some(103),
714 None,
715 Some(256),
716 Some(512),
717 Some(1024),
718 ])) as ArrayRef;
719
720 let actual_primitive = arr
721 .as_any()
722 .downcast_ref::<StructArray>()
723 .unwrap()
724 .column(0);
725 assert_eq!(actual_primitive, &expected_primitive_output);
726 }
727
728 #[test]
729 fn test_concat_struct_array_slices() {
730 let field = Arc::new(Field::new("field", DataType::Int64, true));
731 let input_primitive_1: ArrayRef = Arc::new(PrimitiveArray::<Int64Type>::from(vec![
732 Some(-1),
733 Some(-1),
734 Some(2),
735 None,
736 None,
737 ]));
738 let input_struct_1 = StructArray::from(vec![(field.clone(), input_primitive_1)]);
739
740 let input_primitive_2: ArrayRef = Arc::new(PrimitiveArray::<Int64Type>::from(vec![
741 Some(101),
742 Some(102),
743 Some(103),
744 None,
745 ]));
746 let input_struct_2 = StructArray::from(vec![(field, input_primitive_2)]);
747
748 let arr = concat(&[&input_struct_1.slice(1, 3), &input_struct_2.slice(1, 2)]).unwrap();
749
750 let expected_primitive_output = Arc::new(PrimitiveArray::<Int64Type>::from(vec![
751 Some(-1),
752 Some(2),
753 None,
754 Some(102),
755 Some(103),
756 ])) as ArrayRef;
757
758 let actual_primitive = arr
759 .as_any()
760 .downcast_ref::<StructArray>()
761 .unwrap()
762 .column(0);
763 assert_eq!(actual_primitive, &expected_primitive_output);
764 }
765
766 #[test]
767 fn test_string_array_slices() {
768 let input_1 = StringArray::from(vec!["hello", "A", "B", "C"]);
769 let input_2 = StringArray::from(vec!["world", "D", "E", "Z"]);
770
771 let arr = concat(&[&input_1.slice(1, 3), &input_2.slice(1, 2)]).unwrap();
772
773 let expected_output = StringArray::from(vec!["A", "B", "C", "D", "E"]);
774
775 let actual_output = arr.as_any().downcast_ref::<StringArray>().unwrap();
776 assert_eq!(actual_output, &expected_output);
777 }
778
779 #[test]
780 fn test_string_array_with_null_slices() {
781 let input_1 = StringArray::from(vec![Some("hello"), None, Some("A"), Some("C")]);
782 let input_2 = StringArray::from(vec![None, Some("world"), Some("D"), None]);
783
784 let arr = concat(&[&input_1.slice(1, 3), &input_2.slice(1, 2)]).unwrap();
785
786 let expected_output =
787 StringArray::from(vec![None, Some("A"), Some("C"), Some("world"), Some("D")]);
788
789 let actual_output = arr.as_any().downcast_ref::<StringArray>().unwrap();
790 assert_eq!(actual_output, &expected_output);
791 }
792
793 fn collect_string_dictionary(array: &DictionaryArray<Int32Type>) -> Vec<Option<&str>> {
794 let concrete = array.downcast_dict::<StringArray>().unwrap();
795 concrete.into_iter().collect()
796 }
797
798 #[test]
799 fn test_string_dictionary_array() {
800 let input_1: DictionaryArray<Int32Type> = vec!["hello", "A", "B", "hello", "hello", "C"]
801 .into_iter()
802 .collect();
803 let input_2: DictionaryArray<Int32Type> = vec!["hello", "E", "E", "hello", "F", "E"]
804 .into_iter()
805 .collect();
806
807 let expected: Vec<_> = vec![
808 "hello", "A", "B", "hello", "hello", "C", "hello", "E", "E", "hello", "F", "E",
809 ]
810 .into_iter()
811 .map(Some)
812 .collect();
813
814 let concat = concat(&[&input_1 as _, &input_2 as _]).unwrap();
815 let dictionary = concat.as_dictionary::<Int32Type>();
816 let actual = collect_string_dictionary(dictionary);
817 assert_eq!(actual, expected);
818
819 assert_eq!(
821 dictionary.values().len(),
822 input_1.values().len() + input_2.values().len(),
823 )
824 }
825
826 #[test]
827 fn test_string_dictionary_array_nulls() {
828 let input_1: DictionaryArray<Int32Type> = vec![Some("foo"), Some("bar"), None, Some("fiz")]
829 .into_iter()
830 .collect();
831 let input_2: DictionaryArray<Int32Type> = vec![None].into_iter().collect();
832 let expected = vec![Some("foo"), Some("bar"), None, Some("fiz"), None];
833
834 let concat = concat(&[&input_1 as _, &input_2 as _]).unwrap();
835 let dictionary = concat.as_dictionary::<Int32Type>();
836 let actual = collect_string_dictionary(dictionary);
837 assert_eq!(actual, expected);
838
839 assert_eq!(
841 dictionary.values().len(),
842 input_1.values().len() + input_2.values().len(),
843 )
844 }
845
846 #[test]
847 fn test_string_dictionary_array_nulls_in_values() {
848 let input_1_keys = Int32Array::from_iter_values([0, 2, 1, 3]);
849 let input_1_values = StringArray::from(vec![Some("foo"), None, Some("bar"), Some("fiz")]);
850 let input_1 = DictionaryArray::new(input_1_keys, Arc::new(input_1_values));
851
852 let input_2_keys = Int32Array::from_iter_values([0]);
853 let input_2_values = StringArray::from(vec![None, Some("hello")]);
854 let input_2 = DictionaryArray::new(input_2_keys, Arc::new(input_2_values));
855
856 let expected = vec![Some("foo"), Some("bar"), None, Some("fiz"), None];
857
858 let concat = concat(&[&input_1 as _, &input_2 as _]).unwrap();
859 let dictionary = concat.as_dictionary::<Int32Type>();
860 let actual = collect_string_dictionary(dictionary);
861 assert_eq!(actual, expected);
862 }
863
864 #[test]
865 fn test_string_dictionary_merge() {
866 let mut builder = StringDictionaryBuilder::<Int32Type>::new();
867 for i in 0..20 {
868 builder.append(i.to_string()).unwrap();
869 }
870 let input_1 = builder.finish();
871
872 let mut builder = StringDictionaryBuilder::<Int32Type>::new();
873 for i in 0..30 {
874 builder.append(i.to_string()).unwrap();
875 }
876 let input_2 = builder.finish();
877
878 let expected: Vec<_> = (0..20).chain(0..30).map(|x| x.to_string()).collect();
879 let expected: Vec<_> = expected.iter().map(|x| Some(x.as_str())).collect();
880
881 let concat = concat(&[&input_1 as _, &input_2 as _]).unwrap();
882 let dictionary = concat.as_dictionary::<Int32Type>();
883 let actual = collect_string_dictionary(dictionary);
884 assert_eq!(actual, expected);
885
886 let values_len = dictionary.values().len();
889 assert!((30..40).contains(&values_len), "{values_len}")
890 }
891
    /// Checks the capacity of the value-bytes buffer produced when concatenating large string
    /// arrays.
    #[test]
    fn test_concat_string_sizes() {
        let a: LargeStringArray = ((0..150).map(|_| Some("foo"))).collect();
        let b: LargeStringArray = ((0..150).map(|_| Some("foo"))).collect();
        let c = LargeStringArray::from(vec![Some("foo"), Some("bar"), None, Some("baz")]);
        let arr = concat(&[&a, &b, &c]).unwrap();
        // The inputs hold 150 * 3 + 150 * 3 + 9 = 909 value bytes; 960 is 909 rounded up to the
        // next multiple of 64.
        assert_eq!(arr.to_data().buffers()[1].capacity(), 960);
    }
908
909 #[test]
910 fn test_dictionary_concat_reuse() {
911 let array: DictionaryArray<Int8Type> = vec!["a", "a", "b", "c"].into_iter().collect();
912 let copy: DictionaryArray<Int8Type> = array.clone();
913
914 assert_eq!(
916 array.values(),
917 &(Arc::new(StringArray::from(vec!["a", "b", "c"])) as ArrayRef)
918 );
919 assert_eq!(array.keys(), &Int8Array::from(vec![0, 0, 1, 2]));
920
921 let combined = concat(&[© as _, &array as _]).unwrap();
923 let combined = combined.as_dictionary::<Int8Type>();
924
925 assert_eq!(
926 combined.values(),
927 &(Arc::new(StringArray::from(vec!["a", "b", "c"])) as ArrayRef),
928 "Actual: {combined:#?}"
929 );
930
931 assert_eq!(
932 combined.keys(),
933 &Int8Array::from(vec![0, 0, 1, 2, 0, 0, 1, 2])
934 );
935
936 assert!(array
938 .values()
939 .to_data()
940 .ptr_eq(&combined.values().to_data()));
941 assert!(copy.values().to_data().ptr_eq(&combined.values().to_data()));
942
943 let new: DictionaryArray<Int8Type> = vec!["d"].into_iter().collect();
944 let combined = concat(&[© as _, &array as _, &new as _]).unwrap();
945 let com = combined.as_dictionary::<Int8Type>();
946
947 assert!(!array.values().to_data().ptr_eq(&com.values().to_data()));
949 assert!(!copy.values().to_data().ptr_eq(&com.values().to_data()));
950 assert!(!new.values().to_data().ptr_eq(&com.values().to_data()));
951 }
952
953 #[test]
954 fn concat_record_batches() {
955 let schema = Arc::new(Schema::new(vec![
956 Field::new("a", DataType::Int32, false),
957 Field::new("b", DataType::Utf8, false),
958 ]));
959 let batch1 = RecordBatch::try_new(
960 schema.clone(),
961 vec![
962 Arc::new(Int32Array::from(vec![1, 2])),
963 Arc::new(StringArray::from(vec!["a", "b"])),
964 ],
965 )
966 .unwrap();
967 let batch2 = RecordBatch::try_new(
968 schema.clone(),
969 vec![
970 Arc::new(Int32Array::from(vec![3, 4])),
971 Arc::new(StringArray::from(vec!["c", "d"])),
972 ],
973 )
974 .unwrap();
975 let new_batch = concat_batches(&schema, [&batch1, &batch2]).unwrap();
976 assert_eq!(new_batch.schema().as_ref(), schema.as_ref());
977 assert_eq!(2, new_batch.num_columns());
978 assert_eq!(4, new_batch.num_rows());
979 let new_batch_owned = concat_batches(&schema, &[batch1, batch2]).unwrap();
980 assert_eq!(new_batch_owned.schema().as_ref(), schema.as_ref());
981 assert_eq!(2, new_batch_owned.num_columns());
982 assert_eq!(4, new_batch_owned.num_rows());
983 }
984
985 #[test]
986 fn concat_empty_record_batch() {
987 let schema = Arc::new(Schema::new(vec![
988 Field::new("a", DataType::Int32, false),
989 Field::new("b", DataType::Utf8, false),
990 ]));
991 let batch = concat_batches(&schema, []).unwrap();
992 assert_eq!(batch.schema().as_ref(), schema.as_ref());
993 assert_eq!(0, batch.num_rows());
994 }
995
996 #[test]
997 fn concat_record_batches_of_different_schemas_but_compatible_data() {
998 let schema1 = Arc::new(Schema::new(vec![Field::new("a", DataType::Int32, false)]));
999 let schema2 = Arc::new(Schema::new(vec![Field::new("c", DataType::Int32, false)]));
1001 let batch1 = RecordBatch::try_new(
1002 schema1.clone(),
1003 vec![Arc::new(Int32Array::from(vec![1, 2]))],
1004 )
1005 .unwrap();
1006 let batch2 =
1007 RecordBatch::try_new(schema2, vec![Arc::new(Int32Array::from(vec![3, 4]))]).unwrap();
1008 let batch = concat_batches(&schema1, [&batch1, &batch2]).unwrap();
1010 assert_eq!(batch.schema().as_ref(), schema1.as_ref());
1011 assert_eq!(4, batch.num_rows());
1012 }
1013
1014 #[test]
1015 fn concat_record_batches_of_different_schemas_incompatible_data() {
1016 let schema1 = Arc::new(Schema::new(vec![Field::new("a", DataType::Int32, false)]));
1017 let schema2 = Arc::new(Schema::new(vec![Field::new("a", DataType::Utf8, false)]));
1019 let batch1 = RecordBatch::try_new(
1020 schema1.clone(),
1021 vec![Arc::new(Int32Array::from(vec![1, 2]))],
1022 )
1023 .unwrap();
1024 let batch2 = RecordBatch::try_new(
1025 schema2,
1026 vec![Arc::new(StringArray::from(vec!["foo", "bar"]))],
1027 )
1028 .unwrap();
1029
1030 let error = concat_batches(&schema1, [&batch1, &batch2]).unwrap_err();
1031 assert_eq!(error.to_string(), "Invalid argument error: It is not possible to concatenate arrays of different data types (Int32, Utf8).");
1032 }
1033
    /// Checks the lengths and capacities of the buffers produced by `concat` for primitive,
    /// string and large-binary inputs, both unsliced and sliced.
    ///
    /// Every expected capacity below equals the expected length rounded up to the next
    /// multiple of 64.
    #[test]
    fn concat_capacity() {
        // Int32: 100 + 10 values * 4 bytes = 440 bytes.
        let a = Int32Array::from_iter_values(0..100);
        let b = Int32Array::from_iter_values(10..20);
        let a = concat(&[&a, &b]).unwrap();
        let data = a.to_data();
        assert_eq!(data.buffers()[0].len(), 440);
        assert_eq!(data.buffers()[0].capacity(), 448);

        // Sliced Int32: 20 + 10 values * 4 bytes = 120 bytes.
        let a = concat(&[&a.slice(10, 20), &b]).unwrap();
        let data = a.to_data();
        assert_eq!(data.buffers()[0].len(), 120);
        assert_eq!(data.buffers()[0].capacity(), 128);

        let a = StringArray::from_iter_values(std::iter::repeat("foo").take(100));
        let b = StringArray::from(vec!["bingo", "bongo", "lorem", ""]);

        let a = concat(&[&a, &b]).unwrap();
        let data = a.to_data();
        // Offsets: (100 + 4 + 1) entries * 4 bytes = 420 bytes.
        assert_eq!(data.buffers()[0].len(), 420);
        assert_eq!(data.buffers()[0].capacity(), 448);
        // Values: 100 * 3 + (5 + 5 + 5 + 0) = 315 bytes.
        assert_eq!(data.buffers()[1].len(), 315);
        assert_eq!(data.buffers()[1].capacity(), 320);

        let a = concat(&[&a.slice(10, 40), &b]).unwrap();
        let data = a.to_data();
        // Offsets: (40 + 4 + 1) entries * 4 bytes = 180 bytes.
        assert_eq!(data.buffers()[0].len(), 180);
        assert_eq!(data.buffers()[0].capacity(), 192);
        // Values: 40 * 3 + 15 = 135 bytes.
        assert_eq!(data.buffers()[1].len(), 135);
        assert_eq!(data.buffers()[1].capacity(), 192);

        let a = LargeBinaryArray::from_iter_values(std::iter::repeat(b"foo").take(100));
        let b = LargeBinaryArray::from_iter_values(std::iter::repeat(b"cupcakes").take(10));

        let a = concat(&[&a, &b]).unwrap();
        let data = a.to_data();
        // Offsets: (100 + 10 + 1) entries * 8 bytes = 888 bytes.
        assert_eq!(data.buffers()[0].len(), 888);
        assert_eq!(data.buffers()[0].capacity(), 896);
        // Values: 100 * 3 + 10 * 8 = 380 bytes.
        assert_eq!(data.buffers()[1].len(), 380);
        assert_eq!(data.buffers()[1].capacity(), 384);

        let a = concat(&[&a.slice(10, 40), &b]).unwrap();
        let data = a.to_data();
        // Offsets: (40 + 10 + 1) entries * 8 bytes = 408 bytes.
        assert_eq!(data.buffers()[0].len(), 408);
        assert_eq!(data.buffers()[0].capacity(), 448);
        // Values: 40 * 3 + 10 * 8 = 200 bytes.
        assert_eq!(data.buffers()[1].len(), 200);
        assert_eq!(data.buffers()[1].capacity(), 256);
    }
1094
/// Concatenates a dictionary with 10 non-null keys and 100 values with a
/// dictionary that has 10 null keys and no values, and checks that both the
/// physical and the logical null counts of the result are 10.
#[test]
fn concat_sparse_nulls() {
    let populated = {
        let dict_values = StringArray::from_iter_values((0..100).map(|x| x.to_string()));
        let dict_keys = Int32Array::from(vec![1; 10]);
        DictionaryArray::new(dict_keys, Arc::new(dict_values))
    };

    let all_null = {
        let dict_values = StringArray::new_null(0);
        let dict_keys = Int32Array::new_null(10);
        DictionaryArray::new(dict_keys, Arc::new(dict_values))
    };

    let combined = concat(&[&populated, &all_null]).unwrap();
    assert_eq!(combined.null_count(), 10);
    assert_eq!(combined.logical_null_count(), 10);
}
1107
/// Concatenates three single-row lists of dictionary-encoded strings and
/// checks that the rows match the expected list and that the resulting
/// dictionary holds each distinct value only once.
#[test]
fn concat_dictionary_list_array_simple() {
    let scalars = vec![
        create_single_row_list_of_dict(vec![Some("a")]),
        create_single_row_list_of_dict(vec![Some("a")]),
        create_single_row_list_of_dict(vec![Some("b")]),
    ];

    let arrays = scalars
        .iter()
        .map(|a| a as &dyn Array)
        .collect::<Vec<_>>();
    let concat_res = concat(arrays.as_slice()).unwrap();

    let expected_list = create_list_of_dict(vec![
        // Row 1
        Some(vec![Some("a")]),
        // Row 2
        Some(vec![Some("a")]),
        // Row 3
        Some(vec![Some("b")]),
    ]);

    let list = concat_res.as_list::<i32>();

    // `zip` stops at the shorter input, so a result with missing (or extra)
    // rows would otherwise slip through the element-wise comparison below.
    assert_eq!(list.len(), expected_list.len());

    // Assert that the list is equal to the expected list, row by row.
    list.iter().zip(expected_list.iter()).for_each(|(a, b)| {
        assert_eq!(a, b);
    });

    assert_dictionary_has_unique_values::<_, StringArray>(
        list.values().as_dictionary::<Int32Type>(),
    );
}
1140
/// Concatenates many single-row lists of dictionary-encoded strings drawn
/// from a small set of distinct values, then checks that the rows match the
/// expected list and that the resulting dictionary contains no duplicates.
#[test]
fn concat_many_dictionary_list_arrays() {
    let number_of_unique_values = 8;
    // Single source of truth for the row count used by both the inputs and
    // the expected output.
    let number_of_rows = 80000;

    let scalars = (0..number_of_rows)
        .map(|i| {
            create_single_row_list_of_dict(vec![Some(
                (i % number_of_unique_values).to_string(),
            )])
        })
        .collect::<Vec<_>>();

    let arrays = scalars
        .iter()
        .map(|a| a as &dyn Array)
        .collect::<Vec<_>>();
    let concat_res = concat(arrays.as_slice()).unwrap();

    let expected_list = create_list_of_dict(
        (0..number_of_rows)
            .map(|i| Some(vec![Some((i % number_of_unique_values).to_string())]))
            .collect::<Vec<_>>(),
    );

    let list = concat_res.as_list::<i32>();

    // `zip` stops at the shorter input, so a result with missing (or extra)
    // rows would otherwise slip through the element-wise comparison below.
    assert_eq!(list.len(), expected_list.len());

    // Assert that the list is equal to the expected list, row by row.
    list.iter().zip(expected_list.iter()).for_each(|(a, b)| {
        assert_eq!(a, b);
    });

    assert_dictionary_has_unique_values::<_, StringArray>(
        list.values().as_dictionary::<Int32Type>(),
    );
}
1175
1176 fn create_single_row_list_of_dict(
1177 list_items: Vec<Option<impl AsRef<str>>>,
1178 ) -> GenericListArray<i32> {
1179 let rows = list_items.into_iter().map(Some).collect();
1180
1181 create_list_of_dict(vec![rows])
1182 }
1183
1184 fn create_list_of_dict(
1185 rows: Vec<Option<Vec<Option<impl AsRef<str>>>>>,
1186 ) -> GenericListArray<i32> {
1187 let mut builder =
1188 GenericListBuilder::<i32, _>::new(StringDictionaryBuilder::<Int32Type>::new());
1189
1190 for row in rows {
1191 builder.append_option(row);
1192 }
1193
1194 builder.finish()
1195 }
1196
1197 fn assert_dictionary_has_unique_values<'a, K, V>(array: &'a DictionaryArray<K>)
1198 where
1199 K: ArrowDictionaryKeyType,
1200 V: Sync + Send + 'static,
1201 &'a V: ArrayAccessor + IntoIterator,
1202
1203 <&'a V as ArrayAccessor>::Item: Default + Clone + PartialEq + Debug + Ord,
1204 <&'a V as IntoIterator>::Item: Clone + PartialEq + Debug + Ord,
1205 {
1206 let dict = array.downcast_dict::<V>().unwrap();
1207 let mut values = dict.values().into_iter().collect::<Vec<_>>();
1208
1209 values.sort();
1211
1212 let mut unique_values = values.clone();
1213
1214 unique_values.dedup();
1215
1216 assert_eq!(
1217 values, unique_values,
1218 "There are duplicates in the value list (the value list here is sorted which is only for the assertion)"
1219 );
1220 }
1221}