1use crate::filter::{FilterBuilder, FilterPredicate, FilterSelection};
24use crate::take::take_record_batch;
25use arrow_array::types::{BinaryViewType, StringViewType};
26use arrow_array::{Array, ArrayRef, BooleanArray, RecordBatch, downcast_primitive};
27use arrow_schema::{ArrowError, DataType, SchemaRef};
28use std::collections::VecDeque;
29use std::sync::Arc;
30mod byte_view;
34mod generic;
35mod primitive;
36
37use byte_view::InProgressByteViewArray;
38use generic::GenericInProgressArray;
39use primitive::InProgressPrimitiveArray;
40
41fn has_sparse_filter_copy(data_type: &DataType) -> bool {
42 data_type.is_primitive() || matches!(data_type, DataType::Utf8View | DataType::BinaryView)
43}
44
/// A filter counts as "sparse" when at most `1 / N` of its rows are selected,
/// where `N` is this denominator.
const SPARSE_FILTER_COPY_MAX_SELECTIVITY_DENOMINATOR: usize = 16;

/// Returns `true` if selecting `selected_count` of `filter_len` rows is sparse
/// enough to copy the selected rows directly instead of materializing a
/// filtered batch first.
///
/// The cut-off uses integer division, so a filter shorter than the denominator
/// only qualifies when nothing is selected.
fn should_use_sparse_filter_copy(filter_len: usize, selected_count: usize) -> bool {
    let max_sparse_rows = filter_len / SPARSE_FILTER_COPY_MAX_SELECTIVITY_DENOMINATOR;
    max_sparse_rows >= selected_count
}
55
/// Buffers rows from pushed [`RecordBatch`]es and emits them as completed
/// batches of `target_batch_size` rows.
///
/// A batch flushed explicitly with `finish_buffered_batch` may be smaller, and
/// when `biggest_coalesce_batch_size` is set, oversized input batches may be
/// passed through unchanged.
#[derive(Debug)]
pub struct BatchCoalescer {
    /// Schema of the input batches and of every output batch
    schema: SchemaRef,
    /// Number of rows in each completed output batch
    target_batch_size: usize,
    /// One in-progress builder per schema column (in schema order) holding the buffered rows
    in_progress_arrays: Vec<Box<dyn InProgressArray>>,
    /// `true` if any column lacks a specialized sparse-filter copy; filtered
    /// pushes then always materialize the filtered batch first
    has_non_specialized_filter_columns: bool,
    /// Number of rows currently buffered in `in_progress_arrays`
    buffered_rows: usize,
    /// Completed batches waiting to be returned by `next_completed_batch`
    completed: VecDeque<RecordBatch>,
    /// Input batches with more rows than this may bypass coalescing; `None` disables the bypass
    biggest_coalesce_batch_size: Option<usize>,
}
166
167impl BatchCoalescer {
168 pub fn new(schema: SchemaRef, target_batch_size: usize) -> Self {
176 let has_non_specialized_filter_columns = schema
177 .fields()
178 .iter()
179 .any(|field| !has_sparse_filter_copy(field.data_type()));
180 let in_progress_arrays = schema
181 .fields()
182 .iter()
183 .map(|field| create_in_progress_array(field.data_type(), target_batch_size))
184 .collect::<Vec<_>>();
185
186 Self {
187 schema,
188 target_batch_size,
189 in_progress_arrays,
190 has_non_specialized_filter_columns,
191 completed: VecDeque::with_capacity(1),
193 buffered_rows: 0,
194 biggest_coalesce_batch_size: None,
195 }
196 }
197
198 pub fn with_biggest_coalesce_batch_size(mut self, limit: Option<usize>) -> Self {
211 self.biggest_coalesce_batch_size = limit;
212 self
213 }
214
215 pub fn biggest_coalesce_batch_size(&self) -> Option<usize> {
219 self.biggest_coalesce_batch_size
220 }
221
222 pub fn set_biggest_coalesce_batch_size(&mut self, limit: Option<usize>) {
226 self.biggest_coalesce_batch_size = limit;
227 }
228
229 pub fn schema(&self) -> SchemaRef {
231 Arc::clone(&self.schema)
232 }
233
234 pub fn push_batch_with_filter(
259 &mut self,
260 batch: RecordBatch,
261 filter: &BooleanArray,
262 ) -> Result<(), ArrowError> {
263 self.push_batch_with_filtered_columns(batch, filter)
264 }
265
266 pub fn push_batch_with_indices(
290 &mut self,
291 batch: RecordBatch,
292 indices: &dyn Array,
293 ) -> Result<(), ArrowError> {
294 let taken_batch = take_record_batch(&batch, indices)?;
296 self.push_batch(taken_batch)
297 }
298
299 pub fn push_batch(&mut self, batch: RecordBatch) -> Result<(), ArrowError> {
326 let batch_size = batch.num_rows();
445
446 if batch_size == 0 {
448 return Ok(());
449 }
450
451 if let Some(limit) = self.biggest_coalesce_batch_size {
453 if batch_size > limit {
454 if self.buffered_rows == 0 {
457 self.completed.push_back(batch);
458 return Ok(());
459 }
460
461 if self.buffered_rows > limit {
466 self.finish_buffered_batch()?;
467 self.completed.push_back(batch);
468 return Ok(());
469 }
470
471 }
476 }
477
478 let (_schema, arrays, mut num_rows) = batch.into_parts();
479
480 if arrays.len() != self.in_progress_arrays.len() {
482 return Err(ArrowError::InvalidArgumentError(format!(
483 "Batch has {} columns but BatchCoalescer expects {}",
484 arrays.len(),
485 self.in_progress_arrays.len()
486 )));
487 }
488 self.in_progress_arrays
489 .iter_mut()
490 .zip(arrays)
491 .for_each(|(in_progress, array)| {
492 in_progress.set_source(Some(array));
493 });
494
495 let mut offset = 0;
498 while num_rows > (self.target_batch_size - self.buffered_rows) {
499 let remaining_rows = self.target_batch_size - self.buffered_rows;
500 debug_assert!(remaining_rows > 0);
501
502 for in_progress in self.in_progress_arrays.iter_mut() {
504 in_progress.copy_rows(offset, remaining_rows)?;
505 }
506
507 self.buffered_rows += remaining_rows;
508 offset += remaining_rows;
509 num_rows -= remaining_rows;
510
511 self.finish_buffered_batch()?;
512 }
513
514 self.buffered_rows += num_rows;
516 if num_rows > 0 {
517 for in_progress in self.in_progress_arrays.iter_mut() {
518 in_progress.copy_rows(offset, num_rows)?;
519 }
520 }
521
522 if self.buffered_rows >= self.target_batch_size {
524 self.finish_buffered_batch()?;
525 }
526
527 for in_progress in self.in_progress_arrays.iter_mut() {
529 in_progress.set_source(None);
530 }
531
532 Ok(())
533 }
534
535 pub fn get_buffered_rows(&self) -> usize {
537 self.buffered_rows
538 }
539
540 pub fn finish_buffered_batch(&mut self) -> Result<(), ArrowError> {
548 if self.buffered_rows == 0 {
549 return Ok(());
550 }
551 let new_arrays = self
552 .in_progress_arrays
553 .iter_mut()
554 .map(|array| array.finish())
555 .collect::<Result<Vec<_>, ArrowError>>()?;
556
557 for (array, field) in new_arrays.iter().zip(self.schema.fields().iter()) {
558 debug_assert_eq!(array.data_type(), field.data_type());
559 debug_assert_eq!(array.len(), self.buffered_rows);
560 }
561
562 let batch = unsafe {
564 RecordBatch::new_unchecked(Arc::clone(&self.schema), new_arrays, self.buffered_rows)
565 };
566
567 self.buffered_rows = 0;
568 self.completed.push_back(batch);
569 Ok(())
570 }
571
572 pub fn is_empty(&self) -> bool {
574 self.buffered_rows == 0 && self.completed.is_empty()
575 }
576
577 pub fn has_completed_batch(&self) -> bool {
579 !self.completed.is_empty()
580 }
581
582 pub fn next_completed_batch(&mut self) -> Option<RecordBatch> {
584 self.completed.pop_front()
585 }
586}
587
588impl BatchCoalescer {
589 fn filter_predicate_for_batch(
590 batch: &RecordBatch,
591 filter: &BooleanArray,
592 selected_count: usize,
593 ) -> FilterPredicate {
594 let mut filter_builder = FilterBuilder::new_with_count(filter, selected_count);
595 if batch.num_columns() > 1
596 || (batch.num_columns() > 0
597 && FilterBuilder::is_optimize_beneficial(batch.schema_ref().field(0).data_type()))
598 {
599 filter_builder = filter_builder.optimize();
600 }
601 filter_builder.build()
602 }
603
604 fn push_batch_with_filtered_columns(
605 &mut self,
606 batch: RecordBatch,
607 filter: &BooleanArray,
608 ) -> Result<(), ArrowError> {
609 let filter_len = filter.len();
610 let batch_num_rows = batch.num_rows();
611 let batch_num_columns = batch.num_columns();
612
613 if filter_len > batch_num_rows {
614 return Err(ArrowError::InvalidArgumentError(format!(
615 "Filter predicate of length {} is larger than target array of length {}",
616 filter_len, batch_num_rows
617 )));
618 }
619
620 let selected_count = filter.true_count();
621 if selected_count == 0 {
622 return Ok(());
623 }
624
625 if selected_count == batch_num_rows && filter_len == batch_num_rows {
626 return self.push_batch(batch);
627 }
628
629 if batch_num_columns != self.in_progress_arrays.len() {
630 return Err(ArrowError::InvalidArgumentError(format!(
631 "Batch has {} columns but BatchCoalescer expects {}",
632 batch_num_columns,
633 self.in_progress_arrays.len()
634 )));
635 }
636
637 let exceeds_coalesce_limit = self
638 .biggest_coalesce_batch_size
639 .is_some_and(|limit| selected_count > limit);
640 let does_not_fit_buffer = selected_count > self.target_batch_size - self.buffered_rows;
641 let should_materialize_filter = exceeds_coalesce_limit
642 || self.has_non_specialized_filter_columns
643 || does_not_fit_buffer
644 || !should_use_sparse_filter_copy(filter_len, selected_count);
645
646 if should_materialize_filter {
647 let predicate = Self::filter_predicate_for_batch(&batch, filter, selected_count);
649 let filtered_batch = predicate.filter_record_batch(&batch)?;
650 return self.push_batch(filtered_batch);
651 }
652
653 let predicate = Self::filter_predicate_for_batch(&batch, filter, selected_count);
654 let (_schema, arrays, _num_rows) = batch.into_parts();
655
656 for (in_progress, array) in self.in_progress_arrays.iter_mut().zip(arrays) {
657 in_progress.copy_rows_by_filter_from(array, &predicate)?;
658 }
659
660 self.buffered_rows += selected_count;
661 if self.buffered_rows >= self.target_batch_size {
662 self.finish_buffered_batch()?;
663 }
664
665 Ok(())
666 }
667}
668
/// Returns an in-progress column builder suited to `data_type`.
///
/// Dispatch:
/// - types matched by `downcast_primitive!` get an `InProgressPrimitiveArray`
/// - `Utf8View` and `BinaryView` get an `InProgressByteViewArray`
/// - every other type falls back to `GenericInProgressArray`
///
/// `batch_size` is passed to the specialized builders (presumably as a sizing
/// hint — confirm in their modules); the generic fallback does not receive it.
fn create_in_progress_array(data_type: &DataType, batch_size: usize) -> Box<dyn InProgressArray> {
    // Expands to a boxed primitive builder for the concrete primitive type `$t`
    macro_rules! instantiate_primitive {
        ($t:ty) => {
            Box::new(InProgressPrimitiveArray::<$t>::new(
                batch_size,
                data_type.clone(),
            ))
        };
    }

    downcast_primitive! {
        data_type => (instantiate_primitive),
        // View types have a dedicated builder that works directly on views
        DataType::Utf8View => Box::new(InProgressByteViewArray::<StringViewType>::new(batch_size)),
        DataType::BinaryView => {
            Box::new(InProgressByteViewArray::<BinaryViewType>::new(batch_size))
        }
        // Everything else uses the generic (non-specialized) implementation
        _ => Box::new(GenericInProgressArray::new()),
    }
}
690
/// Incrementally builds one output column from rows copied out of source arrays.
///
/// Usage pattern in this module:
/// 1. `set_source(Some(array))`
/// 2. one or more `copy_rows*` calls
/// 3. `set_source(None)`
/// 4. eventually `finish` to produce the column for a completed batch
trait InProgressArray: std::fmt::Debug + Send + Sync {
    /// Sets the array that subsequent `copy_rows` calls read from. Passing `None`
    /// clears it; callers do this after copying so the source is no longer
    /// referenced.
    fn set_source(&mut self, source: Option<ArrayRef>);

    /// Appends `len` rows of the current source, starting at row `offset`, to
    /// the rows buffered so far.
    ///
    /// NOTE(review): behavior when no source is set is up to each
    /// implementation — confirm there.
    fn copy_rows(&mut self, offset: usize, len: usize) -> Result<(), ArrowError>;

    /// Appends the rows of the current source that are selected by `filter`.
    ///
    /// The default walks `filter.selection()` via `copy_rows_by_selection`;
    /// implementors may override it with a specialized copy.
    fn copy_rows_by_filter(&mut self, filter: &FilterPredicate) -> Result<(), ArrowError> {
        self.copy_rows_by_selection(filter.selection())
    }

    /// Makes `source` the current source, appends the rows selected by `filter`,
    /// then clears the source again — also when copying fails — and returns
    /// the copy result.
    fn copy_rows_by_filter_from(
        &mut self,
        source: ArrayRef,
        filter: &FilterPredicate,
    ) -> Result<(), ArrowError> {
        self.set_source(Some(source));
        let result = self.copy_rows_by_filter(filter);
        // Clear unconditionally so the source is not retained after an error
        self.set_source(None);
        result
    }

    /// Appends the rows described by `selection` using `copy_rows`.
    ///
    /// How each variant is copied:
    /// - `None`: copies nothing
    /// - `All { len }`: copies rows `0..len` as one run
    /// - `Slices`: each half-open `(start, end)` range is copied as one run
    /// - `Indices`: each index is copied as a single row
    ///
    /// Stops at the first error.
    fn copy_rows_by_selection(&mut self, selection: FilterSelection<'_>) -> Result<(), ArrowError> {
        match selection {
            FilterSelection::None => Ok(()),
            FilterSelection::All { len } => self.copy_rows(0, len),
            FilterSelection::Slices(slices) => {
                slices.try_for_each(|(start, end)| self.copy_rows(start, end - start))
            }
            FilterSelection::Indices(indices) => indices.try_for_each(|idx| self.copy_rows(idx, 1)),
        }
    }

    /// Returns all buffered rows as a single array.
    ///
    /// `BatchCoalescer::finish_buffered_batch` resets its row count to zero
    /// after calling this. Implementations are therefore presumably expected to
    /// start empty again afterwards — confirm in the concrete implementations.
    fn finish(&mut self) -> Result<ArrayRef, ArrowError>;
}
746
747#[cfg(test)]
748mod tests {
749 use super::*;
750 use crate::concat::concat_batches;
751 use crate::filter::filter_record_batch;
752 use arrow_array::builder::StringViewBuilder;
753 use arrow_array::cast::AsArray;
754 use arrow_array::types::Int32Type;
755 use arrow_array::{
756 BinaryViewArray, Int32Array, Int64Array, RecordBatchOptions, StringArray, StringViewArray,
757 TimestampNanosecondArray, UInt32Array, UInt64Array, make_array,
758 };
759 use arrow_buffer::BooleanBufferBuilder;
760 use arrow_schema::{DataType, Field, Schema};
761 use rand::{Rng, SeedableRng};
762 use std::ops::Range;
763
    /// 10 batches of 8 rows (80 rows) coalesced into batches of 21: three full
    /// batches plus a 17-row remainder.
    #[test]
    fn test_coalesce() {
        let batch = uint32_batch(0..8);
        Test::new("coalesce")
            .with_batches(std::iter::repeat_n(batch, 10))
            .with_batch_size(21)
            .with_expected_output_sizes(vec![21, 21, 21, 17])
            .run();
    }
774
    /// 97 single-row batches coalesced into batches of 20: 4 x 20 + 17.
    #[test]
    fn test_coalesce_one_by_one() {
        let batch = uint32_batch(0..1);
        Test::new("coalesce_one_by_one")
            .with_batches(std::iter::repeat_n(batch, 97))
            .with_batch_size(20)
            .with_expected_output_sizes(vec![20, 20, 20, 20, 17])
            .run();
    }
785
    /// No input batches produce no output batches.
    #[test]
    fn test_coalesce_empty() {
        let schema = Arc::new(Schema::new(vec![Field::new("c0", DataType::UInt32, false)]));

        Test::new("coalesce_empty")
            .with_batches(vec![])
            .with_schema(schema)
            .with_batch_size(21)
            .with_expected_output_sizes(vec![])
            .run();
    }
797
798 #[test]
799 fn test_sparse_filter_copy_threshold() {
800 assert!(should_use_sparse_filter_copy(8192, 8));
801 assert!(should_use_sparse_filter_copy(8192, 81));
802 assert!(!should_use_sparse_filter_copy(8192, 819));
803 assert!(!should_use_sparse_filter_copy(8192, 6553));
804 }
805
    /// One 4096-row batch split into batches of 1000: 4 x 1000 + 96.
    #[test]
    fn test_single_large_batch_greater_than_target() {
        let batch = uint32_batch(0..4096);
        Test::new("coalesce_single_large_batch_greater_than_target")
            .with_batch(batch)
            .with_batch_size(1000)
            .with_expected_output_sizes(vec![1000, 1000, 1000, 1000, 96])
            .run();
    }
816
    /// A batch smaller than the target is emitted as a single (partial) batch.
    #[test]
    fn test_single_large_batch_smaller_than_target() {
        let batch = uint32_batch(0..4096);
        Test::new("coalesce_single_large_batch_smaller_than_target")
            .with_batch(batch)
            .with_batch_size(8192)
            .with_expected_output_sizes(vec![4096])
            .run();
    }
827
    /// A batch exactly the target size is emitted as one full batch.
    #[test]
    fn test_single_large_batch_equal_to_target() {
        let batch = uint32_batch(0..4096);
        Test::new("coalesce_single_large_batch_equal_to_target")
            .with_batch(batch)
            .with_batch_size(4096)
            .with_expected_output_sizes(vec![4096])
            .run();
    }
838
    /// A 4096-row batch divides evenly into four 1024-row batches with no remainder.
    #[test]
    fn test_single_large_batch_equally_divisible_in_target() {
        let batch = uint32_batch(0..4096);
        Test::new("coalesce_single_large_batch_equally_divisible_in_target")
            .with_batch(batch)
            .with_batch_size(1024)
            .with_expected_output_sizes(vec![1024, 1024, 1024, 1024])
            .run();
    }
849
    /// An empty batch with an empty schema produces no output.
    #[test]
    fn test_empty_schema() {
        let schema = Schema::empty();
        let batch = RecordBatch::new_empty(schema.into());
        Test::new("coalesce_empty_schema")
            .with_batch(batch)
            .with_expected_output_sizes(vec![])
            .run();
    }
859
    /// Ten 8000-row batches filtered at ~0.1% selectivity. The expected sizes
    /// depend on the seeded `RandomFilterBuilder`.
    #[test]
    fn test_coalesce_filtered_001() {
        let mut filter_builder = RandomFilterBuilder {
            num_rows: 8000,
            selectivity: 0.001,
            seed: 0,
        };

        let mut test = Test::new("coalesce_filtered_001");
        for _ in 0..10 {
            test = test
                .with_batch(multi_column_batch(0..8000))
                .with_filter(filter_builder.next_filter())
        }
        test.with_batch_size(15)
            .with_expected_output_sizes(vec![15, 15, 15, 13])
            .run();
    }
882
    /// Ten 8000-row batches filtered at ~1% selectivity. The expected sizes
    /// depend on the seeded `RandomFilterBuilder`.
    #[test]
    fn test_coalesce_filtered_01() {
        let mut filter_builder = RandomFilterBuilder {
            num_rows: 8000,
            selectivity: 0.01,
            seed: 0,
        };

        let mut test = Test::new("coalesce_filtered_01");
        for _ in 0..10 {
            test = test
                .with_batch(multi_column_batch(0..8000))
                .with_filter(filter_builder.next_filter())
        }
        test.with_batch_size(128)
            .with_expected_output_sizes(vec![128, 128, 128, 128, 128, 128, 15])
            .run();
    }
905
    /// Ten 8000-row batches filtered at ~10% selectivity. The expected sizes
    /// depend on the seeded `RandomFilterBuilder`.
    #[test]
    fn test_coalesce_filtered_10() {
        let mut filter_builder = RandomFilterBuilder {
            num_rows: 8000,
            selectivity: 0.1,
            seed: 0,
        };

        let mut test = Test::new("coalesce_filtered_10");
        for _ in 0..10 {
            test = test
                .with_batch(multi_column_batch(0..8000))
                .with_filter(filter_builder.next_filter())
        }
        test.with_batch_size(1024)
            .with_expected_output_sizes(vec![1024, 1024, 1024, 1024, 1024, 1024, 1024, 840])
            .run();
    }
928
    /// Ten 800-row batches filtered at ~90% selectivity. The expected sizes
    /// depend on the seeded `RandomFilterBuilder`.
    #[test]
    fn test_coalesce_filtered_90() {
        let mut filter_builder = RandomFilterBuilder {
            num_rows: 800,
            selectivity: 0.90,
            seed: 0,
        };

        let mut test = Test::new("coalesce_filtered_90");
        for _ in 0..10 {
            test = test
                .with_batch(multi_column_batch(0..800))
                .with_filter(filter_builder.next_filter())
        }
        test.with_batch_size(1024)
            .with_expected_output_sizes(vec![1024, 1024, 1024, 1024, 1024, 1024, 1024, 13])
            .run();
    }
951
    /// Alternates two kinds of input three times:
    /// - a 1000-row batch whose filter keeps exactly the first 500 rows
    /// - an 800-row batch with a random filter whose selectivity shrinks by 0.6x each round
    #[test]
    fn test_coalesce_filtered_mixed() {
        let mut filter_builder = RandomFilterBuilder {
            num_rows: 800,
            selectivity: 0.90,
            seed: 0,
        };

        let mut test = Test::new("coalesce_filtered_mixed");
        for _ in 0..3 {
            // 500 true followed by 1 + 499 = 500 false
            let mut all_filter_builder = BooleanBufferBuilder::new(1000);
            all_filter_builder.append_n(500, true);
            all_filter_builder.append_n(1, false);
            all_filter_builder.append_n(499, false);
            let all_filter = all_filter_builder.build();

            test = test
                .with_batch(multi_column_batch(0..1000))
                .with_filter(BooleanArray::from(all_filter))
                .with_batch(multi_column_batch(0..800))
                .with_filter(filter_builder.next_filter());
            // Make the next random filter more selective
            filter_builder.selectivity *= 0.6;
        }

        test.with_batch_size(250)
            .with_expected_output_sizes(vec![
                250, 250, 250, 250, 250, 250, 250, 250, 250, 250, 250, 179,
            ])
            .run();
    }
988
    /// Non-nullable input: 3000 + 1040 = 4040 rows into batches of 1024 (3 x 1024 + 968).
    #[test]
    fn test_coalesce_non_null() {
        Test::new("coalesce_non_null")
            .with_batch(uint32_batch_non_null(0..3000))
            .with_batch(uint32_batch_non_null(0..1040))
            .with_batch_size(1024)
            .with_expected_output_sizes(vec![1024, 1024, 1024, 968])
            .run();
    }
    /// Utf8 input: 3000 + 1040 = 4040 rows into batches of 1024 (3 x 1024 + 968).
    #[test]
    fn test_utf8_split() {
        Test::new("coalesce_utf8")
            .with_batch(utf8_batch(0..3000))
            .with_batch(utf8_batch(0..1040))
            .with_batch_size(1024)
            .with_expected_output_sizes(vec![1024, 1024, 1024, 968])
            .run();
    }
1009
    /// Only short (inline) strings: the output has no data buffers.
    #[test]
    fn test_string_view_no_views() {
        let output_batches = Test::new("coalesce_string_view_no_views")
            .with_batch(stringview_batch([Some("foo"), Some("bar")]))
            .with_batch(stringview_batch([Some("baz"), Some("qux")]))
            .with_expected_output_sizes(vec![4])
            .run();

        expect_buffer_layout(
            col_as_string_view("c0", output_batches.first().unwrap()),
            vec![],
        );
    }
1024
    /// Single-character strings have no data buffers in the input or the output.
    #[test]
    fn test_string_view_batch_small_no_compact() {
        let batch = stringview_batch_repeated(1000, [Some("a"), Some("b"), Some("c")]);
        let output_batches = Test::new("coalesce_string_view_batch_small_no_compact")
            .with_batch(batch.clone())
            .with_expected_output_sizes(vec![1000])
            .run();

        let array = col_as_string_view("c0", &batch);
        let gc_array = col_as_string_view("c0", output_batches.first().unwrap());
        assert_eq!(array.data_buffers().len(), 0);
        assert_eq!(array.data_buffers().len(), gc_array.data_buffers().len());
        expect_buffer_layout(gc_array, vec![]);
    }
1041
    /// 1000 x 35-byte strings (35000 bytes) keep the same 5-buffer layout in the
    /// output: 4 buffers of 8190 bytes (234 strings each) plus 64 x 35 = 2240 bytes.
    #[test]
    fn test_string_view_batch_large_no_compact() {
        let batch = stringview_batch_repeated(1000, [Some("This string is longer than 12 bytes")]);
        let output_batches = Test::new("coalesce_string_view_batch_large_no_compact")
            .with_batch(batch.clone())
            .with_batch_size(1000)
            .with_expected_output_sizes(vec![1000])
            .run();

        let array = col_as_string_view("c0", &batch);
        let gc_array = col_as_string_view("c0", output_batches.first().unwrap());
        assert_eq!(array.data_buffers().len(), 5);
        assert_eq!(array.data_buffers().len(), gc_array.data_buffers().len());
        expect_buffer_layout(
            gc_array,
            vec![
                ExpectedLayout {
                    len: 8190,
                    capacity: 8192,
                },
                ExpectedLayout {
                    len: 8190,
                    capacity: 8192,
                },
                ExpectedLayout {
                    len: 8190,
                    capacity: 8192,
                },
                ExpectedLayout {
                    len: 8190,
                    capacity: 8192,
                },
                ExpectedLayout {
                    len: 2240,
                    capacity: 8192,
                },
            ],
        );
    }
1083
    /// The input has a data buffer (from its long strings). The slice covers
    /// rows 5..15, which are all short strings, so the output needs no data
    /// buffers.
    #[test]
    fn test_string_view_batch_small_with_buffers_no_compact() {
        let short_strings = std::iter::repeat(Some("SmallString"));
        let long_strings = std::iter::once(Some("This string is longer than 12 bytes"));
        // 20 short strings followed by one long string
        let values = short_strings.take(20).chain(long_strings);
        let batch = stringview_batch_repeated(1000, values)
            .slice(5, 10);
        let output_batches = Test::new("coalesce_string_view_batch_small_with_buffers_no_compact")
            .with_batch(batch.clone())
            .with_batch_size(1000)
            .with_expected_output_sizes(vec![10])
            .run();

        let array = col_as_string_view("c0", &batch);
        let gc_array = col_as_string_view("c0", output_batches.first().unwrap());
        assert_eq!(array.data_buffers().len(), 1);
        assert_eq!(gc_array.data_buffers().len(), 0);
    }
1105
    /// A 22-row slice of a 5-buffer array is compacted into one buffer holding
    /// 22 x 35 = 770 bytes.
    #[test]
    fn test_string_view_batch_large_slice_compact() {
        let batch = stringview_batch_repeated(1000, [Some("This string is longer than 12 bytes")])
            .slice(11, 22);

        let output_batches = Test::new("coalesce_string_view_batch_large_slice_compact")
            .with_batch(batch.clone())
            .with_batch_size(1000)
            .with_expected_output_sizes(vec![22])
            .run();

        let array = col_as_string_view("c0", &batch);
        let gc_array = col_as_string_view("c0", output_batches.first().unwrap());
        assert_eq!(array.data_buffers().len(), 5);

        expect_buffer_layout(
            gc_array,
            vec![ExpectedLayout {
                len: 770,
                capacity: 8192,
            }],
        );
    }
1131
    /// Mixes large, small, sliced, mixed, and null-containing string-view
    /// batches. 1000 + 1000 + 20 + 1000 + 20 + 1000 = 4040 rows become
    /// 3 x 1024 + 968, presumably because the `Test` default batch size is
    /// 1024 (confirm in the harness). The first output batch's long strings all
    /// come from `large_view_batch`, hence the same layout as in
    /// `test_string_view_batch_large_no_compact`.
    #[test]
    fn test_string_view_mixed() {
        let large_view_batch =
            stringview_batch_repeated(1000, [Some("This string is longer than 12 bytes")]);
        let small_view_batch = stringview_batch_repeated(1000, [Some("SmallString")]);
        let mixed_batch = stringview_batch_repeated(
            1000,
            [Some("This string is longer than 12 bytes"), Some("Small")],
        );
        let mixed_batch_nulls = stringview_batch_repeated(
            1000,
            [
                Some("This string is longer than 12 bytes"),
                Some("Small"),
                None,
            ],
        );

        let output_batches = Test::new("coalesce_string_view_mixed")
            .with_batch(large_view_batch.clone())
            .with_batch(small_view_batch)
            .with_batch(large_view_batch.slice(10, 20))
            .with_batch(mixed_batch_nulls)
            .with_batch(large_view_batch.slice(10, 20))
            .with_batch(mixed_batch)
            .with_expected_output_sizes(vec![1024, 1024, 1024, 968])
            .run();

        expect_buffer_layout(
            col_as_string_view("c0", output_batches.first().unwrap()),
            vec![
                ExpectedLayout {
                    len: 8190,
                    capacity: 8192,
                },
                ExpectedLayout {
                    len: 8190,
                    capacity: 8192,
                },
                ExpectedLayout {
                    len: 8190,
                    capacity: 8192,
                },
                ExpectedLayout {
                    len: 8190,
                    capacity: 8192,
                },
                ExpectedLayout {
                    len: 2240,
                    capacity: 8192,
                },
            ],
        );
    }
1190
    /// Ten 200-row batches (half 28-byte strings, half inline) are coalesced
    /// into one 2000-row batch. The 1000 long strings (28000 bytes) land in
    /// buffers of capacity 8192, 16384 and 32768.
    #[test]
    fn test_string_view_many_small_compact() {
        let batch = stringview_batch_repeated(
            200,
            [Some("This string is 28 bytes long"), Some("small string")],
        );
        let output_batches = Test::new("coalesce_string_view_many_small_compact")
            .with_batch(batch.clone())
            .with_batch(batch.clone())
            .with_batch(batch.clone())
            .with_batch(batch.clone())
            .with_batch(batch.clone())
            .with_batch(batch.clone())
            .with_batch(batch.clone())
            .with_batch(batch.clone())
            .with_batch(batch.clone())
            .with_batch(batch.clone())
            .with_batch_size(8000)
            .with_expected_output_sizes(vec![2000])
            .run();

        expect_buffer_layout(
            col_as_string_view("c0", output_batches.first().unwrap()),
            vec![
                ExpectedLayout {
                    len: 8176,
                    capacity: 8192,
                },
                ExpectedLayout {
                    len: 16380,
                    capacity: 16384,
                },
                ExpectedLayout {
                    len: 3444,
                    capacity: 32768,
                },
            ],
        );
    }
1235
    /// 32-byte strings fill buffers exactly to capacity. The first 900-row output
    /// holds 900 x 32 = 28800 bytes = 8192 + 16384 + 4224.
    #[test]
    fn test_string_view_many_small_boundary() {
        let batch = stringview_batch_repeated(100, [Some("This string is a power of two=32")]);
        let output_batches = Test::new("coalesce_string_view_many_small_boundary")
            .with_batches(std::iter::repeat_n(batch, 20))
            .with_batch_size(900)
            .with_expected_output_sizes(vec![900, 900, 200])
            .run();

        expect_buffer_layout(
            col_as_string_view("c0", output_batches.first().unwrap()),
            vec![
                ExpectedLayout {
                    len: 8192,
                    capacity: 8192,
                },
                ExpectedLayout {
                    len: 16384,
                    capacity: 16384,
                },
                ExpectedLayout {
                    len: 4224,
                    capacity: 32768,
                },
            ],
        );
    }
1265
    /// Interleaves mixed batches with all-large-string batches.
    ///
    /// Row counts: 6 x 200 + 4 x 50 = 1400. Long-string bytes:
    /// 600 x 28 + 200 x 70 = 30800 = 8190 + 16366 + 6244.
    #[test]
    fn test_string_view_large_small() {
        let mixed_batch = stringview_batch_repeated(
            200,
            [Some("This string is 28 bytes long"), Some("small string")],
        );
        let all_large = stringview_batch_repeated(
            50,
            [Some(
                "This buffer has only large strings in it so there are no buffer copies",
            )],
        );

        let output_batches = Test::new("coalesce_string_view_large_small")
            .with_batch(mixed_batch.clone())
            .with_batch(mixed_batch.clone())
            .with_batch(all_large.clone())
            .with_batch(mixed_batch.clone())
            .with_batch(all_large.clone())
            .with_batch(mixed_batch.clone())
            .with_batch(mixed_batch.clone())
            .with_batch(all_large.clone())
            .with_batch(mixed_batch.clone())
            .with_batch(all_large.clone())
            .with_batch_size(8000)
            .with_expected_output_sizes(vec![1400])
            .run();

        expect_buffer_layout(
            col_as_string_view("c0", output_batches.first().unwrap()),
            vec![
                ExpectedLayout {
                    len: 8190,
                    capacity: 8192,
                },
                ExpectedLayout {
                    len: 16366,
                    capacity: 16384,
                },
                ExpectedLayout {
                    len: 6244,
                    capacity: 32768,
                },
            ],
        );
    }
1316
    /// Two 1000-row BinaryView batches (inline, null and long values) into
    /// batches of 512: 3 x 512 + 464 = 2000.
    #[test]
    fn test_binary_view() {
        let values: Vec<Option<&[u8]>> = vec![
            Some(b"foo"),
            None,
            Some(b"A longer string that is more than 12 bytes"),
        ];

        let binary_view =
            BinaryViewArray::from_iter(std::iter::repeat(values.iter()).flatten().take(1000));
        let batch =
            RecordBatch::try_from_iter(vec![("c0", Arc::new(binary_view) as ArrayRef)]).unwrap();

        Test::new("coalesce_binary_view")
            .with_batch(batch.clone())
            .with_batch(batch.clone())
            .with_batch_size(512)
            .with_expected_output_sizes(vec![512, 512, 512, 464])
            .run();
    }
1337
    /// Filtered BinaryView batches that include long (non-inline) values.
    /// The expected total of 250 implies `sparse_filter(1000)` selects 125 rows
    /// per batch.
    #[test]
    fn test_binary_view_filtered() {
        let values: Vec<Option<&[u8]>> = vec![
            Some(b"foo"),
            None,
            Some(b"A longer string that is more than 12 bytes"),
        ];

        let binary_view =
            BinaryViewArray::from_iter(std::iter::repeat(values.iter()).flatten().take(1000));
        let batch =
            RecordBatch::try_from_iter(vec![("c0", Arc::new(binary_view) as ArrayRef)]).unwrap();
        let filter = sparse_filter(1000);

        Test::new("coalesce_binary_view_filtered")
            .with_batch(batch.clone())
            .with_filter(filter.clone())
            .with_batch(batch)
            .with_filter(filter)
            .with_batch_size(256)
            .with_expected_output_sizes(vec![250])
            .run();
    }
1361
    /// Filtered BinaryView batches whose values are all inline (≤ 12 bytes) or null.
    #[test]
    fn test_binary_view_filtered_inline() {
        let values: Vec<Option<&[u8]>> = vec![Some(b"foo"), None, Some(b"barbaz")];

        let binary_view =
            BinaryViewArray::from_iter(std::iter::repeat(values.iter()).flatten().take(1000));
        let batch =
            RecordBatch::try_from_iter(vec![("c0", Arc::new(binary_view) as ArrayRef)]).unwrap();
        let filter = sparse_filter(1000);

        Test::new("coalesce_binary_view_filtered_inline")
            .with_batch(batch.clone())
            .with_filter(filter.clone())
            .with_batch(batch)
            .with_filter(filter)
            .with_batch_size(300)
            .with_expected_output_sizes(vec![250])
            .run();
    }
1381
    /// Filtered StringView batches whose values are all inline (≤ 12 bytes) or null.
    #[test]
    fn test_string_view_filtered_inline() {
        let values: Vec<Option<&str>> = vec![Some("foo"), None, Some("barbaz")];

        let string_view =
            StringViewArray::from_iter(std::iter::repeat(values.iter()).flatten().take(1000));
        let batch =
            RecordBatch::try_from_iter(vec![("c0", Arc::new(string_view) as ArrayRef)]).unwrap();
        let filter = sparse_filter(1000);

        Test::new("coalesce_string_view_filtered_inline")
            .with_batch(batch.clone())
            .with_filter(filter.clone())
            .with_batch(batch)
            .with_filter(filter)
            .with_batch_size(300)
            .with_expected_output_sizes(vec![250])
            .run();
    }
1401
    /// Filtered batch mixing a nullable Int32 column (every 5th value null),
    /// a Float64 column and an inline BinaryView column.
    #[test]
    fn test_mixed_inline_binary_view_filtered() {
        let int_values =
            Int32Array::from_iter((0..1000).map(|v| if v % 5 == 0 { None } else { Some(v) }));
        let float_values = arrow_array::Float64Array::from_iter((0..1000).map(|v| Some(v as f64)));
        let binary_values: Vec<Option<&[u8]>> = vec![Some(b"foo"), None, Some(b"barbaz")];
        let binary_view = BinaryViewArray::from_iter(
            std::iter::repeat(binary_values.iter()).flatten().take(1000),
        );

        let batch = RecordBatch::try_from_iter(vec![
            ("i", Arc::new(int_values) as ArrayRef),
            ("f", Arc::new(float_values) as ArrayRef),
            ("b", Arc::new(binary_view) as ArrayRef),
        ])
        .unwrap();

        let filter = sparse_filter(1000);

        Test::new("coalesce_mixed_inline_binary_view_filtered")
            .with_batch(batch.clone())
            .with_filter(filter.clone())
            .with_batch(batch)
            .with_filter(filter)
            .with_batch_size(300)
            .with_expected_output_sizes(vec![250])
            .run();
    }
1430
    /// Filtered batch mixing a nullable Int32 column (every 5th value null),
    /// a Float64 column and an inline StringView column.
    #[test]
    fn test_mixed_inline_string_view_filtered() {
        let int_values =
            Int32Array::from_iter((0..1000).map(|v| if v % 5 == 0 { None } else { Some(v) }));
        let float_values = arrow_array::Float64Array::from_iter((0..1000).map(|v| Some(v as f64)));
        let string_values: Vec<Option<&str>> = vec![Some("foo"), None, Some("barbaz")];
        let string_view = StringViewArray::from_iter(
            std::iter::repeat(string_values.iter()).flatten().take(1000),
        );

        let batch = RecordBatch::try_from_iter(vec![
            ("i", Arc::new(int_values) as ArrayRef),
            ("f", Arc::new(float_values) as ArrayRef),
            ("s", Arc::new(string_view) as ArrayRef),
        ])
        .unwrap();

        let filter = sparse_filter(1000);

        Test::new("coalesce_mixed_inline_string_view_filtered")
            .with_batch(batch.clone())
            .with_filter(filter.clone())
            .with_batch(batch)
            .with_filter(filter)
            .with_batch_size(300)
            .with_expected_output_sizes(vec![250])
            .run();
    }
1459
    /// Very sparse filter over inline BinaryView values. The expected total of
    /// 100 implies `very_sparse_filter(1000)` selects 50 rows per batch.
    #[test]
    fn test_inline_binary_view_sparse() {
        let values: Vec<Option<&[u8]>> = vec![Some(b"foo"), None, Some(b"barbaz")];
        let binary_view =
            BinaryViewArray::from_iter(std::iter::repeat(values.iter()).flatten().take(1000));
        let batch =
            RecordBatch::try_from_iter(vec![("c0", Arc::new(binary_view) as ArrayRef)]).unwrap();
        let filter = very_sparse_filter(1000);

        Test::new("inline_binary_view_sparse")
            .with_batch(batch.clone())
            .with_filter(filter.clone())
            .with_batch(batch)
            .with_filter(filter)
            .with_batch_size(1024)
            .with_expected_output_sizes(vec![100])
            .run();
    }
1479
    /// Very sparse filter over inline StringView values (two batches, 100 rows total).
    #[test]
    fn test_inline_string_view_sparse() {
        let values: Vec<Option<&str>> = vec![Some("foo"), None, Some("barbaz")];
        let string_view =
            StringViewArray::from_iter(std::iter::repeat(values.iter()).flatten().take(1000));
        let batch =
            RecordBatch::try_from_iter(vec![("c0", Arc::new(string_view) as ArrayRef)]).unwrap();
        let filter = very_sparse_filter(1000);

        Test::new("inline_string_view_sparse")
            .with_batch(batch.clone())
            .with_filter(filter.clone())
            .with_batch(batch)
            .with_filter(filter)
            .with_batch_size(1024)
            .with_expected_output_sizes(vec![100])
            .run();
    }
1498
    /// Very sparse filter over a batch whose columns all have specialized copies:
    /// Int32, Float64, StringView and BinaryView.
    #[test]
    fn test_inline_mixed_sparse() {
        let int_values =
            Int32Array::from_iter((0..1000).map(|v| if v % 5 == 0 { None } else { Some(v) }));
        let float_values = arrow_array::Float64Array::from_iter((0..1000).map(|v| Some(v as f64)));
        let string_values: Vec<Option<&str>> = vec![Some("foo"), None, Some("barbaz")];
        let string_view = StringViewArray::from_iter(
            std::iter::repeat(string_values.iter()).flatten().take(1000),
        );
        let binary_values: Vec<Option<&[u8]>> = vec![Some(b"x"), None, Some(b"abcdef")];
        let binary_view = BinaryViewArray::from_iter(
            std::iter::repeat(binary_values.iter()).flatten().take(1000),
        );

        let batch = RecordBatch::try_from_iter(vec![
            ("i", Arc::new(int_values) as ArrayRef),
            ("f", Arc::new(float_values) as ArrayRef),
            ("s", Arc::new(string_view) as ArrayRef),
            ("b", Arc::new(binary_view) as ArrayRef),
        ])
        .unwrap();
        let filter = very_sparse_filter(1000);

        Test::new("inline_mixed_sparse")
            .with_batch(batch.clone())
            .with_filter(filter.clone())
            .with_batch(batch)
            .with_filter(filter)
            .with_batch_size(1024)
            .with_expected_output_sizes(vec![100])
            .run();
    }
1533
    /// Three very sparse pushes with a target of 100. The expected [100, 50]
    /// implies 50 selected rows per push: the second push fills the buffer
    /// exactly and flushes it, and the third is left as a partial batch.
    #[test]
    fn test_inline_crosses_target_batch_size() {
        let values: Vec<Option<&[u8]>> = vec![Some(b"foo"), None, Some(b"barbaz")];
        let make_batch = || {
            let binary_view =
                BinaryViewArray::from_iter(std::iter::repeat(values.iter()).flatten().take(1000));
            RecordBatch::try_from_iter(vec![("c0", Arc::new(binary_view) as ArrayRef)]).unwrap()
        };
        let filter = very_sparse_filter(1000);

        Test::new("inline_crosses_target_batch_size")
            .with_batch(make_batch())
            .with_filter(filter.clone())
            .with_batch(make_batch())
            .with_filter(filter.clone())
            .with_batch(make_batch())
            .with_filter(filter)
            .with_batch_size(100)
            .with_expected_output_sizes(vec![100, 50])
            .run();
    }
1558
1559 #[test]
1560 fn test_inline_filter_rejects_filter_longer_than_batch() {
1561 let values: Vec<Option<&[u8]>> = vec![Some(b"foo"), Some(b"bar")];
1562 let binary_view = BinaryViewArray::from_iter(values);
1563 let batch =
1564 RecordBatch::try_from_iter(vec![("c0", Arc::new(binary_view) as ArrayRef)]).unwrap();
1565 let filter = BooleanArray::from(vec![true, false, true]);
1566
1567 let mut coalescer = BatchCoalescer::new(batch.schema(), 100);
1568 let result = coalescer.push_batch_with_filter(batch, &filter);
1569 assert!(result.is_err());
1570 let err = result.unwrap_err().to_string();
1571 assert!(
1572 err.contains("Filter predicate of length 3 is larger than target array of length 2"),
1573 "unexpected error: {err}"
1574 );
1575 }
1576
1577 #[test]
1578 fn test_mixed_boolean_inline_string_view_filtered() {
1579 let bool_values = BooleanArray::from_iter((0..1000).map(|v| Some(v % 3 == 0)));
1580 let string_values: Vec<Option<&str>> = vec![Some("foo"), None, Some("barbaz")];
1581 let string_view = StringViewArray::from_iter(
1582 std::iter::repeat(string_values.iter()).flatten().take(1000),
1583 );
1584
1585 let batch = RecordBatch::try_from_iter(vec![
1586 ("b", Arc::new(bool_values) as ArrayRef),
1587 ("s", Arc::new(string_view) as ArrayRef),
1588 ])
1589 .unwrap();
1590
1591 let filter = sparse_filter(1000);
1592
1593 Test::new("coalesce_mixed_boolean_inline_string_view_filtered")
1594 .with_batch(batch.clone())
1595 .with_filter(filter.clone())
1596 .with_batch(batch)
1597 .with_filter(filter)
1598 .with_batch_size(300)
1599 .with_expected_output_sizes(vec![250])
1600 .run();
1601 }
1602
1603 #[test]
1604 fn test_filter_fast_path_schema_capability() {
1605 let supported = Arc::new(Schema::new(vec![
1606 Field::new("primitive", DataType::UInt32, false),
1607 Field::new("utf8_view", DataType::Utf8View, true),
1608 Field::new("binary_view", DataType::BinaryView, true),
1609 ]));
1610 let coalescer = BatchCoalescer::new(supported, 100);
1611 assert!(!coalescer.has_non_specialized_filter_columns);
1612
1613 let utf8 = Arc::new(Schema::new(vec![Field::new("utf8", DataType::Utf8, true)]));
1614 let coalescer = BatchCoalescer::new(utf8, 100);
1615 assert!(coalescer.has_non_specialized_filter_columns);
1616
1617 let boolean = Arc::new(Schema::new(vec![Field::new(
1618 "boolean",
1619 DataType::Boolean,
1620 true,
1621 )]));
1622 let coalescer = BatchCoalescer::new(boolean, 100);
1623 assert!(coalescer.has_non_specialized_filter_columns);
1624 }
1625
    /// Expected length and capacity of one data buffer of a `StringViewArray`.
    /// `expect_buffer_layout` compares a list of these with the array's actual
    /// `data_buffers()`.
    #[derive(Debug, Clone, PartialEq)]
    struct ExpectedLayout {
        /// Expected `Buffer::len()` of the data buffer.
        len: usize,
        /// Expected `Buffer::capacity()` of the data buffer.
        capacity: usize,
    }
1631
1632 fn expect_buffer_layout(array: &StringViewArray, expected: Vec<ExpectedLayout>) {
1634 let actual = array
1635 .data_buffers()
1636 .iter()
1637 .map(|b| ExpectedLayout {
1638 len: b.len(),
1639 capacity: b.capacity(),
1640 })
1641 .collect::<Vec<_>>();
1642
1643 assert_eq!(
1644 actual, expected,
1645 "Expected buffer layout {expected:#?} but got {actual:#?}"
1646 );
1647 }
1648
    /// Declarative description of one `BatchCoalescer` test case. It is built
    /// with the `with_*` methods and executed by `run`.
    #[derive(Debug, Clone)]
    struct Test {
        /// Human-readable name, printed by `run_inner`; derived variants append
        /// a description to it.
        name: String,
        /// Batches pushed into the coalescer, in order.
        input_batches: Vec<RecordBatch>,
        /// Filters matched to `input_batches` by position. Batches beyond the
        /// last filter are pushed without one.
        filters: Vec<BooleanArray>,
        /// Explicit schema. When `None`, the schema of the first input batch is
        /// used.
        schema: Option<SchemaRef>,
        /// Expected row count of each output batch, in order.
        expected_output_sizes: Vec<usize>,
        /// Target batch size handed to `BatchCoalescer::new`.
        target_batch_size: usize,
    }
1673
1674 impl Default for Test {
1675 fn default() -> Self {
1676 Self {
1677 name: "".to_string(),
1678 input_batches: vec![],
1679 filters: vec![],
1680 schema: None,
1681 expected_output_sizes: vec![],
1682 target_batch_size: 1024,
1683 }
1684 }
1685 }
1686
1687 impl Test {
1688 fn new(name: impl Into<String>) -> Self {
1689 Self {
1690 name: name.into(),
1691 ..Self::default()
1692 }
1693 }
1694
1695 fn with_description(mut self, description: &str) -> Self {
1697 self.name.push_str(": ");
1698 self.name.push_str(description);
1699 self
1700 }
1701
1702 fn with_batch_size(mut self, target_batch_size: usize) -> Self {
1704 self.target_batch_size = target_batch_size;
1705 self
1706 }
1707
1708 fn with_batch(mut self, batch: RecordBatch) -> Self {
1710 self.input_batches.push(batch);
1711 self
1712 }
1713
1714 fn with_filter(mut self, filter: BooleanArray) -> Self {
1716 self.filters.push(filter);
1717 self
1718 }
1719
1720 fn with_batches(mut self, batches: impl IntoIterator<Item = RecordBatch>) -> Self {
1722 self.input_batches = batches.into_iter().collect();
1723 self
1724 }
1725
1726 fn with_schema(mut self, schema: SchemaRef) -> Self {
1728 self.schema = Some(schema);
1729 self
1730 }
1731
1732 fn with_expected_output_sizes(mut self, sizes: impl IntoIterator<Item = usize>) -> Self {
1734 self.expected_output_sizes.extend(sizes);
1735 self
1736 }
1737
1738 fn run(self) -> Vec<RecordBatch> {
1742 let mut extra_tests = vec![];
1747 extra_tests.push(self.clone().make_half_non_nullable());
1748 extra_tests.push(self.clone().insert_empty_batches());
1749 let single_column_tests = self.make_single_column_tests();
1750 for test in single_column_tests {
1751 extra_tests.push(test.clone().make_half_non_nullable());
1752 extra_tests.push(test);
1753 }
1754
1755 let results = self.run_inner();
1758 for extra in extra_tests {
1760 extra.run_inner();
1761 }
1762
1763 results
1764 }
1765
1766 fn run_inner(self) -> Vec<RecordBatch> {
1768 let expected_output = self.expected_output();
1769 let schema = self.schema();
1770
1771 let Self {
1772 name,
1773 input_batches,
1774 filters,
1775 schema: _,
1776 target_batch_size,
1777 expected_output_sizes,
1778 } = self;
1779
1780 println!("Running test '{name}'");
1781
1782 let had_input = input_batches.iter().any(|b| b.num_rows() > 0);
1783
1784 let mut coalescer = BatchCoalescer::new(Arc::clone(&schema), target_batch_size);
1785
1786 let mut filters = filters.into_iter();
1788 for batch in input_batches {
1789 if let Some(filter) = filters.next() {
1790 coalescer.push_batch_with_filter(batch, &filter).unwrap();
1791 } else {
1792 coalescer.push_batch(batch).unwrap();
1793 }
1794 }
1795 assert_eq!(schema, coalescer.schema());
1796
1797 if had_input {
1798 assert!(!coalescer.is_empty(), "Coalescer should not be empty");
1799 } else {
1800 assert!(coalescer.is_empty(), "Coalescer should be empty");
1801 }
1802
1803 coalescer.finish_buffered_batch().unwrap();
1804 if had_input {
1805 assert!(
1806 coalescer.has_completed_batch(),
1807 "Coalescer should have completed batches"
1808 );
1809 }
1810
1811 let mut output_batches = vec![];
1812 while let Some(batch) = coalescer.next_completed_batch() {
1813 output_batches.push(batch);
1814 }
1815
1816 let mut starting_idx = 0;
1818 let actual_output_sizes: Vec<usize> =
1819 output_batches.iter().map(|b| b.num_rows()).collect();
1820 assert_eq!(
1821 expected_output_sizes, actual_output_sizes,
1822 "Unexpected number of rows in output batches\n\
1823 Expected\n{expected_output_sizes:#?}\nActual:{actual_output_sizes:#?}"
1824 );
1825 let iter = expected_output_sizes
1826 .iter()
1827 .zip(output_batches.iter())
1828 .enumerate();
1829
1830 for (i, (expected_size, batch)) in iter {
1832 let expected_batch = expected_output.slice(starting_idx, *expected_size);
1835 let expected_batch = normalize_batch(expected_batch);
1836 let batch = normalize_batch(batch.clone());
1837 assert_eq!(
1838 expected_batch, batch,
1839 "Unexpected content in batch {i}:\
1840 \n\nExpected:\n{expected_batch:#?}\n\nActual:\n{batch:#?}"
1841 );
1842 starting_idx += *expected_size;
1843 }
1844 output_batches
1845 }
1846
1847 fn schema(&self) -> SchemaRef {
1850 self.schema
1851 .clone()
1852 .unwrap_or_else(|| Arc::clone(&self.input_batches[0].schema()))
1853 }
1854
1855 fn expected_output(&self) -> RecordBatch {
1857 let schema = self.schema();
1858 if self.filters.is_empty() {
1859 return concat_batches(&schema, &self.input_batches).unwrap();
1860 }
1861
1862 let mut filters = self.filters.iter();
1863 let filtered_batches = self
1864 .input_batches
1865 .iter()
1866 .map(|batch| {
1867 if let Some(filter) = filters.next() {
1868 filter_record_batch(batch, filter).unwrap()
1869 } else {
1870 batch.clone()
1871 }
1872 })
1873 .collect::<Vec<_>>();
1874 concat_batches(&schema, &filtered_batches).unwrap()
1875 }
1876
1877 fn make_half_non_nullable(mut self) -> Self {
1880 self.input_batches = self
1882 .input_batches
1883 .iter()
1884 .enumerate()
1885 .map(|(i, batch)| {
1886 if i % 2 == 1 {
1887 batch.clone()
1888 } else {
1889 Self::remove_nulls_from_batch(batch)
1890 }
1891 })
1892 .collect();
1893 self.with_description("non-nullable")
1894 }
1895
1896 fn insert_empty_batches(mut self) -> Self {
1898 let empty_batch = RecordBatch::new_empty(self.schema());
1899 self.input_batches = self
1900 .input_batches
1901 .into_iter()
1902 .flat_map(|batch| [empty_batch.clone(), batch])
1903 .collect();
1904 let empty_filters = BooleanArray::builder(0).finish();
1905 self.filters = self
1906 .filters
1907 .into_iter()
1908 .flat_map(|filter| [empty_filters.clone(), filter])
1909 .collect();
1910 self.with_description("empty batches inserted")
1911 }
1912
1913 fn remove_nulls_from_batch(batch: &RecordBatch) -> RecordBatch {
1915 let new_columns = batch
1916 .columns()
1917 .iter()
1918 .map(Self::remove_nulls_from_array)
1919 .collect::<Vec<_>>();
1920 let options = RecordBatchOptions::new().with_row_count(Some(batch.num_rows()));
1921 RecordBatch::try_new_with_options(batch.schema(), new_columns, &options).unwrap()
1922 }
1923
1924 fn remove_nulls_from_array(array: &ArrayRef) -> ArrayRef {
1925 make_array(array.to_data().into_builder().nulls(None).build().unwrap())
1926 }
1927
1928 fn make_single_column_tests(&self) -> Vec<Self> {
1934 let original_schema = self.schema();
1935 let mut new_tests = vec![];
1936 for column in original_schema.fields() {
1937 let single_column_schema = Arc::new(Schema::new(vec![column.clone()]));
1938
1939 let single_column_batches = self.input_batches.iter().map(|batch| {
1940 let single_column = batch.column_by_name(column.name()).unwrap();
1941 RecordBatch::try_new(
1942 Arc::clone(&single_column_schema),
1943 vec![single_column.clone()],
1944 )
1945 .unwrap()
1946 });
1947
1948 let single_column_test = self
1949 .clone()
1950 .with_schema(Arc::clone(&single_column_schema))
1951 .with_batches(single_column_batches)
1952 .with_description("single column")
1953 .with_description(column.name());
1954
1955 new_tests.push(single_column_test);
1956 }
1957 new_tests
1958 }
1959 }
1960
1961 fn uint32_batch<T: std::iter::Iterator<Item = u32>>(range: T) -> RecordBatch {
1964 let schema = Arc::new(Schema::new(vec![Field::new("c0", DataType::UInt32, true)]));
1965
1966 let array = UInt32Array::from_iter(range.map(|i| if i % 3 == 0 { None } else { Some(i) }));
1967 RecordBatch::try_new(Arc::clone(&schema), vec![Arc::new(array)]).unwrap()
1968 }
1969
1970 fn uint32_batch_non_null<T: std::iter::Iterator<Item = u32>>(range: T) -> RecordBatch {
1972 let schema = Arc::new(Schema::new(vec![Field::new("c0", DataType::UInt32, false)]));
1973
1974 let array = UInt32Array::from_iter_values(range);
1975 RecordBatch::try_new(Arc::clone(&schema), vec![Arc::new(array)]).unwrap()
1976 }
1977
1978 fn uint64_batch_non_null<T: std::iter::Iterator<Item = u64>>(range: T) -> RecordBatch {
1980 let schema = Arc::new(Schema::new(vec![Field::new("c0", DataType::UInt64, false)]));
1981
1982 let array = UInt64Array::from_iter_values(range);
1983 RecordBatch::try_new(Arc::clone(&schema), vec![Arc::new(array)]).unwrap()
1984 }
1985
1986 fn utf8_batch(range: Range<u32>) -> RecordBatch {
1989 let schema = Arc::new(Schema::new(vec![Field::new("c0", DataType::Utf8, true)]));
1990
1991 let array = StringArray::from_iter(range.map(|i| {
1992 if i % 3 == 0 {
1993 None
1994 } else {
1995 Some(format!("value{i}"))
1996 }
1997 }));
1998
1999 RecordBatch::try_new(Arc::clone(&schema), vec![Arc::new(array)]).unwrap()
2000 }
2001
2002 fn stringview_batch<'a>(values: impl IntoIterator<Item = Option<&'a str>>) -> RecordBatch {
2004 let schema = Arc::new(Schema::new(vec![Field::new(
2005 "c0",
2006 DataType::Utf8View,
2007 false,
2008 )]));
2009
2010 let array = StringViewArray::from_iter(values);
2011 RecordBatch::try_new(Arc::clone(&schema), vec![Arc::new(array)]).unwrap()
2012 }
2013
2014 fn stringview_batch_repeated<'a>(
2017 num_rows: usize,
2018 values: impl IntoIterator<Item = Option<&'a str>>,
2019 ) -> RecordBatch {
2020 let schema = Arc::new(Schema::new(vec![Field::new(
2021 "c0",
2022 DataType::Utf8View,
2023 true,
2024 )]));
2025
2026 let values: Vec<_> = values.into_iter().collect();
2028 let values_iter = std::iter::repeat(values.iter())
2029 .flatten()
2030 .cloned()
2031 .take(num_rows);
2032
2033 let mut builder = StringViewBuilder::with_capacity(100).with_fixed_block_size(8192);
2034 for val in values_iter {
2035 builder.append_option(val);
2036 }
2037
2038 let array = builder.finish();
2039 RecordBatch::try_new(Arc::clone(&schema), vec![Arc::new(array)]).unwrap()
2040 }
2041
2042 fn multi_column_batch(range: Range<i32>) -> RecordBatch {
2044 let int64_array = Int64Array::from_iter(
2045 range
2046 .clone()
2047 .map(|v| if v % 5 == 0 { None } else { Some(v as i64) }),
2048 );
2049 let string_view_array = StringViewArray::from_iter(range.clone().map(|v| {
2050 if v % 5 == 0 {
2051 None
2052 } else if v % 7 == 0 {
2053 Some(format!("This is a string longer than 12 bytes{v}"))
2054 } else {
2055 Some(format!("Short {v}"))
2056 }
2057 }));
2058 let string_array = StringArray::from_iter(range.clone().map(|v| {
2059 if v % 11 == 0 {
2060 None
2061 } else {
2062 Some(format!("Value {v}"))
2063 }
2064 }));
2065 let timestamp_array = TimestampNanosecondArray::from_iter(range.map(|v| {
2066 if v % 3 == 0 {
2067 None
2068 } else {
2069 Some(v as i64 * 1000) }
2071 }))
2072 .with_timezone("America/New_York");
2073
2074 RecordBatch::try_from_iter(vec![
2075 ("int64", Arc::new(int64_array) as ArrayRef),
2076 ("stringview", Arc::new(string_view_array) as ArrayRef),
2077 ("string", Arc::new(string_array) as ArrayRef),
2078 ("timestamp", Arc::new(timestamp_array) as ArrayRef),
2079 ])
2080 .unwrap()
2081 }
2082
    /// Produces pseudo-random, reproducible boolean filters (see `next_filter`).
    #[derive(Debug)]
    struct RandomFilterBuilder {
        /// Length of every generated filter.
        num_rows: usize,
        /// Probability that a given entry is `true`; must lie in `[0.0, 1.0]`
        /// (asserted by `next_filter`).
        selectivity: f64,
        /// Seed of the next filter's RNG; incremented after every call so
        /// successive filters differ.
        seed: u64,
    }
2099 impl RandomFilterBuilder {
2100 fn next_filter(&mut self) -> BooleanArray {
2103 assert!(self.selectivity >= 0.0 && self.selectivity <= 1.0);
2104 let mut rng = rand::rngs::StdRng::seed_from_u64(self.seed);
2105 self.seed += 1;
2106 BooleanArray::from_iter(
2107 (0..self.num_rows)
2108 .map(|_| rng.random_bool(self.selectivity))
2109 .map(Some),
2110 )
2111 }
2112 }
2113
2114 fn col_as_string_view<'b>(name: &str, batch: &'b RecordBatch) -> &'b StringViewArray {
2116 batch
2117 .column_by_name(name)
2118 .expect("column not found")
2119 .as_string_view_opt()
2120 .expect("column is not a string view")
2121 }
2122
2123 fn sparse_filter(len: usize) -> BooleanArray {
2125 BooleanArray::from_iter((0..len).map(|idx| Some(idx % 8 == 0)))
2126 }
2127
2128 fn very_sparse_filter(len: usize) -> BooleanArray {
2132 BooleanArray::from_iter((0..len).map(|idx| Some(idx % 20 == 0)))
2133 }
2134
2135 fn normalize_batch(batch: RecordBatch) -> RecordBatch {
2138 let (schema, mut columns, row_count) = batch.into_parts();
2140
2141 for column in columns.iter_mut() {
2142 if let Some(string_view) = column.as_string_view_opt() {
2143 let mut builder = StringViewBuilder::new();
2146 for s in string_view.iter() {
2147 builder.append_option(s);
2148 }
2149 *column = Arc::new(builder.finish());
2150 continue;
2151 }
2152
2153 if let Some(binary_view) = column.as_binary_view_opt() {
2154 *column = Arc::new(BinaryViewArray::from_iter(binary_view.iter()));
2155 }
2156 }
2157
2158 let options = RecordBatchOptions::new().with_row_count(Some(row_count));
2159 RecordBatch::try_new_with_options(schema, columns, &options).unwrap()
2160 }
2161
2162 fn create_test_batch(num_rows: usize) -> RecordBatch {
2164 let schema = Arc::new(Schema::new(vec![Field::new("c0", DataType::Int32, false)]));
2165 let array = Int32Array::from_iter_values(0..num_rows as i32);
2166 RecordBatch::try_new(schema, vec![Arc::new(array)]).unwrap()
2167 }
2168 #[test]
2169 fn test_biggest_coalesce_batch_size_none_default() {
2170 let mut coalescer = BatchCoalescer::new(
2172 Arc::new(Schema::new(vec![Field::new("c0", DataType::Int32, false)])),
2173 100,
2174 );
2175
2176 let large_batch = create_test_batch(1000);
2178 coalescer.push_batch(large_batch).unwrap();
2179
2180 let mut output_batches = vec![];
2182 while let Some(batch) = coalescer.next_completed_batch() {
2183 output_batches.push(batch);
2184 }
2185
2186 coalescer.finish_buffered_batch().unwrap();
2187 while let Some(batch) = coalescer.next_completed_batch() {
2188 output_batches.push(batch);
2189 }
2190
2191 assert_eq!(output_batches.len(), 10);
2193 for batch in output_batches {
2194 assert_eq!(batch.num_rows(), 100);
2195 }
2196 }
2197
2198 #[test]
2199 fn test_biggest_coalesce_batch_size_bypass_large_batch() {
2200 let mut coalescer = BatchCoalescer::new(
2202 Arc::new(Schema::new(vec![Field::new("c0", DataType::Int32, false)])),
2203 100,
2204 );
2205 coalescer.set_biggest_coalesce_batch_size(Some(500));
2206
2207 let large_batch = create_test_batch(1000);
2209 coalescer.push_batch(large_batch.clone()).unwrap();
2210
2211 assert!(coalescer.has_completed_batch());
2213 let output_batch = coalescer.next_completed_batch().unwrap();
2214 assert_eq!(output_batch.num_rows(), 1000);
2215
2216 assert!(!coalescer.has_completed_batch());
2218 assert_eq!(coalescer.get_buffered_rows(), 0);
2219 }
2220
2221 #[test]
2222 fn test_biggest_coalesce_batch_size_coalesce_small_batch() {
2223 let mut coalescer = BatchCoalescer::new(
2225 Arc::new(Schema::new(vec![Field::new("c0", DataType::Int32, false)])),
2226 100,
2227 )
2228 .with_biggest_coalesce_batch_size(Some(500));
2229
2230 let small_batch = create_test_batch(50);
2232 coalescer.push_batch(small_batch.clone()).unwrap();
2233
2234 assert!(!coalescer.has_completed_batch());
2236 assert_eq!(coalescer.get_buffered_rows(), 50);
2237
2238 coalescer.push_batch(small_batch).unwrap();
2240
2241 assert!(coalescer.has_completed_batch());
2243 let output_batch = coalescer.next_completed_batch().unwrap();
2244 let size = output_batch
2245 .column(0)
2246 .as_primitive::<Int32Type>()
2247 .get_buffer_memory_size();
2248 assert_eq!(size, 400); assert_eq!(output_batch.num_rows(), 100);
2250
2251 assert_eq!(coalescer.get_buffered_rows(), 0);
2252 }
2253
2254 #[test]
2255 fn test_biggest_coalesce_batch_size_equal_boundary() {
2256 let mut coalescer = BatchCoalescer::new(
2258 Arc::new(Schema::new(vec![Field::new("c0", DataType::Int32, false)])),
2259 100,
2260 );
2261 coalescer.set_biggest_coalesce_batch_size(Some(500));
2262
2263 let boundary_batch = create_test_batch(500);
2265 coalescer.push_batch(boundary_batch).unwrap();
2266
2267 let mut output_count = 0;
2269 while coalescer.next_completed_batch().is_some() {
2270 output_count += 1;
2271 }
2272
2273 coalescer.finish_buffered_batch().unwrap();
2274 while coalescer.next_completed_batch().is_some() {
2275 output_count += 1;
2276 }
2277
2278 assert_eq!(output_count, 5);
2280 }
2281
2282 #[test]
2283 fn test_biggest_coalesce_batch_size_first_large_then_consecutive_bypass() {
2284 let mut coalescer = BatchCoalescer::new(
2287 Arc::new(Schema::new(vec![Field::new("c0", DataType::Int32, false)])),
2288 100,
2289 );
2290 coalescer.set_biggest_coalesce_batch_size(Some(200));
2291
2292 let small_batch = create_test_batch(50);
2293
2294 coalescer.push_batch(small_batch).unwrap();
2296 assert_eq!(coalescer.get_buffered_rows(), 50);
2297 assert!(!coalescer.has_completed_batch());
2298
2299 let large_batch1 = create_test_batch(250);
2301 coalescer.push_batch(large_batch1).unwrap();
2302
2303 let mut completed_batches = vec![];
2305 while let Some(batch) = coalescer.next_completed_batch() {
2306 completed_batches.push(batch);
2307 }
2308 assert_eq!(completed_batches.len(), 3);
2309 assert_eq!(coalescer.get_buffered_rows(), 0);
2310
2311 let large_batch2 = create_test_batch(300);
2313 let large_batch3 = create_test_batch(400);
2314
2315 coalescer.push_batch(large_batch2).unwrap();
2317 assert!(coalescer.has_completed_batch());
2318 let output = coalescer.next_completed_batch().unwrap();
2319 assert_eq!(output.num_rows(), 300); assert_eq!(coalescer.get_buffered_rows(), 0);
2321
2322 coalescer.push_batch(large_batch3).unwrap();
2324 assert!(coalescer.has_completed_batch());
2325 let output = coalescer.next_completed_batch().unwrap();
2326 assert_eq!(output.num_rows(), 400); assert_eq!(coalescer.get_buffered_rows(), 0);
2328 }
2329
2330 #[test]
2331 fn test_biggest_coalesce_batch_size_empty_batch() {
2332 let mut coalescer = BatchCoalescer::new(
2334 Arc::new(Schema::new(vec![Field::new("c0", DataType::Int32, false)])),
2335 100,
2336 );
2337 coalescer.set_biggest_coalesce_batch_size(Some(50));
2338
2339 let empty_batch = create_test_batch(0);
2340 coalescer.push_batch(empty_batch).unwrap();
2341
2342 assert!(!coalescer.has_completed_batch());
2344 assert_eq!(coalescer.get_buffered_rows(), 0);
2345 }
2346
2347 #[test]
2348 fn test_biggest_coalesce_batch_size_with_buffered_data_no_bypass() {
2349 let mut coalescer = BatchCoalescer::new(
2351 Arc::new(Schema::new(vec![Field::new("c0", DataType::Int32, false)])),
2352 100,
2353 );
2354 coalescer.set_biggest_coalesce_batch_size(Some(200));
2355
2356 let small_batch = create_test_batch(30);
2358 coalescer.push_batch(small_batch.clone()).unwrap();
2359 coalescer.push_batch(small_batch).unwrap();
2360 assert_eq!(coalescer.get_buffered_rows(), 60);
2361
2362 let large_batch = create_test_batch(250);
2364 coalescer.push_batch(large_batch).unwrap();
2365
2366 let mut completed_batches = vec![];
2371 while let Some(batch) = coalescer.next_completed_batch() {
2372 completed_batches.push(batch);
2373 }
2374
2375 assert_eq!(completed_batches.len(), 3);
2376 for batch in &completed_batches {
2377 assert_eq!(batch.num_rows(), 100);
2378 }
2379 assert_eq!(coalescer.get_buffered_rows(), 10);
2380 }
2381
2382 #[test]
2383 fn test_biggest_coalesce_batch_size_zero_limit() {
2384 let mut coalescer = BatchCoalescer::new(
2386 Arc::new(Schema::new(vec![Field::new("c0", DataType::Int32, false)])),
2387 100,
2388 );
2389 coalescer.set_biggest_coalesce_batch_size(Some(0));
2390
2391 let tiny_batch = create_test_batch(1);
2393 coalescer.push_batch(tiny_batch).unwrap();
2394
2395 assert!(coalescer.has_completed_batch());
2396 let output = coalescer.next_completed_batch().unwrap();
2397 assert_eq!(output.num_rows(), 1);
2398 }
2399
2400 #[test]
2401 fn test_biggest_coalesce_batch_size_bypass_only_when_no_buffer() {
2402 let mut coalescer = BatchCoalescer::new(
2404 Arc::new(Schema::new(vec![Field::new("c0", DataType::Int32, false)])),
2405 100,
2406 );
2407 coalescer.set_biggest_coalesce_batch_size(Some(200));
2408
2409 let large_batch = create_test_batch(300);
2411 coalescer.push_batch(large_batch.clone()).unwrap();
2412
2413 assert!(coalescer.has_completed_batch());
2414 let output = coalescer.next_completed_batch().unwrap();
2415 assert_eq!(output.num_rows(), 300); assert_eq!(coalescer.get_buffered_rows(), 0);
2417
2418 let small_batch = create_test_batch(50);
2420 coalescer.push_batch(small_batch).unwrap();
2421 assert_eq!(coalescer.get_buffered_rows(), 50);
2422
2423 coalescer.push_batch(large_batch).unwrap();
2425
2426 let mut completed_batches = vec![];
2429 while let Some(batch) = coalescer.next_completed_batch() {
2430 completed_batches.push(batch);
2431 }
2432
2433 assert_eq!(completed_batches.len(), 3);
2434 for batch in &completed_batches {
2435 assert_eq!(batch.num_rows(), 100);
2436 }
2437 assert_eq!(coalescer.get_buffered_rows(), 50);
2438 }
2439
2440 #[test]
2441 fn test_biggest_coalesce_batch_size_consecutive_large_batches_scenario() {
2442 let mut coalescer = BatchCoalescer::new(
2444 Arc::new(Schema::new(vec![Field::new("c0", DataType::Int32, false)])),
2445 1000,
2446 );
2447 coalescer.set_biggest_coalesce_batch_size(Some(500));
2448
2449 coalescer.push_batch(create_test_batch(20)).unwrap();
2451 coalescer.push_batch(create_test_batch(20)).unwrap();
2452 coalescer.push_batch(create_test_batch(30)).unwrap();
2453
2454 assert_eq!(coalescer.get_buffered_rows(), 70);
2455 assert!(!coalescer.has_completed_batch());
2456
2457 coalescer.push_batch(create_test_batch(700)).unwrap();
2459
2460 assert_eq!(coalescer.get_buffered_rows(), 770);
2462 assert!(!coalescer.has_completed_batch());
2463
2464 coalescer.push_batch(create_test_batch(600)).unwrap();
2466
2467 let mut outputs = vec![];
2469 while let Some(batch) = coalescer.next_completed_batch() {
2470 outputs.push(batch);
2471 }
2472 assert_eq!(outputs.len(), 2); assert_eq!(outputs[0].num_rows(), 770);
2474 assert_eq!(outputs[1].num_rows(), 600);
2475 assert_eq!(coalescer.get_buffered_rows(), 0);
2476
2477 let remaining_batches = [700, 900, 700, 600];
2479 for &size in &remaining_batches {
2480 coalescer.push_batch(create_test_batch(size)).unwrap();
2481
2482 assert!(coalescer.has_completed_batch());
2483 let output = coalescer.next_completed_batch().unwrap();
2484 assert_eq!(output.num_rows(), size);
2485 assert_eq!(coalescer.get_buffered_rows(), 0);
2486 }
2487 }
2488
2489 #[test]
2490 fn test_biggest_coalesce_batch_size_truly_consecutive_large_bypass() {
2491 let mut coalescer = BatchCoalescer::new(
2494 Arc::new(Schema::new(vec![Field::new("c0", DataType::Int32, false)])),
2495 100,
2496 );
2497 coalescer.set_biggest_coalesce_batch_size(Some(200));
2498
2499 let large_batches = vec![
2501 create_test_batch(300),
2502 create_test_batch(400),
2503 create_test_batch(350),
2504 create_test_batch(500),
2505 ];
2506
2507 let mut all_outputs = vec![];
2508
2509 for (i, large_batch) in large_batches.into_iter().enumerate() {
2510 let expected_size = large_batch.num_rows();
2511
2512 assert_eq!(
2514 coalescer.get_buffered_rows(),
2515 0,
2516 "Buffer should be empty before batch {}",
2517 i
2518 );
2519
2520 coalescer.push_batch(large_batch).unwrap();
2521
2522 assert!(
2524 coalescer.has_completed_batch(),
2525 "Should have completed batch after pushing batch {}",
2526 i
2527 );
2528
2529 let output = coalescer.next_completed_batch().unwrap();
2530 assert_eq!(
2531 output.num_rows(),
2532 expected_size,
2533 "Batch {} should have bypassed with original size",
2534 i
2535 );
2536
2537 assert!(
2539 !coalescer.has_completed_batch(),
2540 "Should have no more completed batches after batch {}",
2541 i
2542 );
2543 assert_eq!(
2544 coalescer.get_buffered_rows(),
2545 0,
2546 "Buffer should be empty after batch {}",
2547 i
2548 );
2549
2550 all_outputs.push(output);
2551 }
2552
2553 assert_eq!(all_outputs.len(), 4);
2555 assert_eq!(all_outputs[0].num_rows(), 300);
2556 assert_eq!(all_outputs[1].num_rows(), 400);
2557 assert_eq!(all_outputs[2].num_rows(), 350);
2558 assert_eq!(all_outputs[3].num_rows(), 500);
2559 }
2560
2561 #[test]
2562 fn test_biggest_coalesce_batch_size_reset_consecutive_on_small_batch() {
2563 let mut coalescer = BatchCoalescer::new(
2565 Arc::new(Schema::new(vec![Field::new("c0", DataType::Int32, false)])),
2566 100,
2567 );
2568 coalescer.set_biggest_coalesce_batch_size(Some(200));
2569
2570 coalescer.push_batch(create_test_batch(300)).unwrap();
2572 let output = coalescer.next_completed_batch().unwrap();
2573 assert_eq!(output.num_rows(), 300);
2574
2575 coalescer.push_batch(create_test_batch(400)).unwrap();
2577 let output = coalescer.next_completed_batch().unwrap();
2578 assert_eq!(output.num_rows(), 400);
2579
2580 coalescer.push_batch(create_test_batch(50)).unwrap();
2582 assert_eq!(coalescer.get_buffered_rows(), 50);
2583
2584 coalescer.push_batch(create_test_batch(350)).unwrap();
2586
2587 let mut outputs = vec![];
2589 while let Some(batch) = coalescer.next_completed_batch() {
2590 outputs.push(batch);
2591 }
2592 assert_eq!(outputs.len(), 4);
2593 for batch in outputs {
2594 assert_eq!(batch.num_rows(), 100);
2595 }
2596 assert_eq!(coalescer.get_buffered_rows(), 0);
2597 }
2598
2599 #[test]
2600 fn test_coalasce_push_batch_with_indices() {
2601 const MID_POINT: u32 = 2333;
2602 const TOTAL_ROWS: u32 = 23333;
2603 let batch1 = uint32_batch_non_null(0..MID_POINT);
2604 let batch2 = uint32_batch_non_null((MID_POINT..TOTAL_ROWS).rev());
2605
2606 let mut coalescer = BatchCoalescer::new(
2607 Arc::new(Schema::new(vec![Field::new("c0", DataType::UInt32, false)])),
2608 TOTAL_ROWS as usize,
2609 );
2610 coalescer.push_batch(batch1).unwrap();
2611
2612 let rev_indices = (0..((TOTAL_ROWS - MID_POINT) as u64)).rev();
2613 let reversed_indices_batch = uint64_batch_non_null(rev_indices);
2614
2615 let reverse_indices = UInt64Array::from(reversed_indices_batch.column(0).to_data());
2616 coalescer
2617 .push_batch_with_indices(batch2, &reverse_indices)
2618 .unwrap();
2619
2620 coalescer.finish_buffered_batch().unwrap();
2621 let actual = coalescer.next_completed_batch().unwrap();
2622
2623 let expected = uint32_batch_non_null(0..TOTAL_ROWS);
2624
2625 assert_eq!(expected, actual);
2626 }
2627
2628 #[test]
2629 fn test_push_batch_schema_mismatch_fewer_columns() {
2630 let empty_schema = Arc::new(Schema::empty());
2632 let mut coalescer = BatchCoalescer::new(empty_schema, 100);
2633 let batch = uint32_batch(0..5);
2634 let result = coalescer.push_batch(batch);
2635 assert!(result.is_err());
2636 let err = result.unwrap_err().to_string();
2637 assert!(
2638 err.contains("Batch has 1 columns but BatchCoalescer expects 0"),
2639 "unexpected error: {err}"
2640 );
2641 }
2642
2643 #[test]
2644 fn test_push_batch_schema_mismatch_more_columns() {
2645 let schema = Arc::new(Schema::new(vec![
2647 Field::new("c0", DataType::UInt32, false),
2648 Field::new("c1", DataType::UInt32, false),
2649 ]));
2650 let mut coalescer = BatchCoalescer::new(schema, 100);
2651 let batch = uint32_batch(0..5);
2652 let result = coalescer.push_batch(batch);
2653 assert!(result.is_err());
2654 let err = result.unwrap_err().to_string();
2655 assert!(
2656 err.contains("Batch has 1 columns but BatchCoalescer expects 2"),
2657 "unexpected error: {err}"
2658 );
2659 }
2660
2661 #[test]
2662 fn test_push_batch_schema_mismatch_two_vs_zero() {
2663 let empty_schema = Arc::new(Schema::empty());
2665 let mut coalescer = BatchCoalescer::new(empty_schema, 100);
2666 let schema = Arc::new(Schema::new(vec![
2667 Field::new("c0", DataType::UInt32, false),
2668 Field::new("c1", DataType::UInt32, false),
2669 ]));
2670 let batch = RecordBatch::try_new(
2671 schema,
2672 vec![
2673 Arc::new(UInt32Array::from(vec![1, 2, 3])),
2674 Arc::new(UInt32Array::from(vec![4, 5, 6])),
2675 ],
2676 )
2677 .unwrap();
2678 let result = coalescer.push_batch(batch);
2679 assert!(result.is_err());
2680 let err = result.unwrap_err().to_string();
2681 assert!(
2682 err.contains("Batch has 2 columns but BatchCoalescer expects 0"),
2683 "unexpected error: {err}"
2684 );
2685 }
2686}