1use crate::filter::filter_record_batch;
24use crate::take::take_record_batch;
25use arrow_array::types::{BinaryViewType, StringViewType};
26use arrow_array::{Array, ArrayRef, BooleanArray, RecordBatch, downcast_primitive};
27use arrow_schema::{ArrowError, DataType, SchemaRef};
28use std::collections::VecDeque;
29use std::sync::Arc;
30mod byte_view;
34mod generic;
35mod primitive;
36
37use byte_view::InProgressByteViewArray;
38use generic::GenericInProgressArray;
39use primitive::InProgressPrimitiveArray;
40
/// Concatenates input [`RecordBatch`]es into output batches of (up to)
/// `target_batch_size` rows.
///
/// Rows are pushed with [`BatchCoalescer::push_batch`] (or the `_with_filter` /
/// `_with_indices` variants). Completed batches are retrieved with
/// [`BatchCoalescer::next_completed_batch`]. Any remaining buffered rows are flushed
/// with [`BatchCoalescer::finish_buffered_batch`].
#[derive(Debug)]
pub struct BatchCoalescer {
    /// Schema of the output batches
    schema: SchemaRef,
    /// Number of rows in each coalesced output batch (a flushed remainder may be smaller)
    target_batch_size: usize,
    /// One in-progress builder per column of `schema`
    in_progress_arrays: Vec<Box<dyn InProgressArray>>,
    /// Number of rows currently buffered in `in_progress_arrays`
    buffered_rows: usize,
    /// Completed batches waiting to be returned by `next_completed_batch`
    completed: VecDeque<RecordBatch>,
    /// Optional limit above which an incoming batch may bypass coalescing (see `push_batch`)
    biggest_coalesce_batch_size: Option<usize>,
}
149
150impl BatchCoalescer {
151 pub fn new(schema: SchemaRef, target_batch_size: usize) -> Self {
159 let in_progress_arrays = schema
160 .fields()
161 .iter()
162 .map(|field| create_in_progress_array(field.data_type(), target_batch_size))
163 .collect::<Vec<_>>();
164
165 Self {
166 schema,
167 target_batch_size,
168 in_progress_arrays,
169 completed: VecDeque::with_capacity(1),
171 buffered_rows: 0,
172 biggest_coalesce_batch_size: None,
173 }
174 }
175
176 pub fn with_biggest_coalesce_batch_size(mut self, limit: Option<usize>) -> Self {
189 self.biggest_coalesce_batch_size = limit;
190 self
191 }
192
193 pub fn biggest_coalesce_batch_size(&self) -> Option<usize> {
197 self.biggest_coalesce_batch_size
198 }
199
200 pub fn set_biggest_coalesce_batch_size(&mut self, limit: Option<usize>) {
204 self.biggest_coalesce_batch_size = limit;
205 }
206
207 pub fn schema(&self) -> SchemaRef {
209 Arc::clone(&self.schema)
210 }
211
212 pub fn push_batch_with_filter(
237 &mut self,
238 batch: RecordBatch,
239 filter: &BooleanArray,
240 ) -> Result<(), ArrowError> {
241 let filtered_batch = filter_record_batch(&batch, filter)?;
244 self.push_batch(filtered_batch)
245 }
246
247 pub fn push_batch_with_indices(
271 &mut self,
272 batch: RecordBatch,
273 indices: &dyn Array,
274 ) -> Result<(), ArrowError> {
275 let taken_batch = take_record_batch(&batch, indices)?;
277 self.push_batch(taken_batch)
278 }
279
280 pub fn push_batch(&mut self, batch: RecordBatch) -> Result<(), ArrowError> {
307 let batch_size = batch.num_rows();
426
427 if batch_size == 0 {
429 return Ok(());
430 }
431
432 if let Some(limit) = self.biggest_coalesce_batch_size {
434 if batch_size > limit {
435 if self.buffered_rows == 0 {
438 self.completed.push_back(batch);
439 return Ok(());
440 }
441
442 if self.buffered_rows > limit {
447 self.finish_buffered_batch()?;
448 self.completed.push_back(batch);
449 return Ok(());
450 }
451
452 }
457 }
458
459 let (_schema, arrays, mut num_rows) = batch.into_parts();
460
461 if arrays.len() != self.in_progress_arrays.len() {
463 return Err(ArrowError::InvalidArgumentError(format!(
464 "Batch has {} columns but BatchCoalescer expects {}",
465 arrays.len(),
466 self.in_progress_arrays.len()
467 )));
468 }
469 self.in_progress_arrays
470 .iter_mut()
471 .zip(arrays)
472 .for_each(|(in_progress, array)| {
473 in_progress.set_source(Some(array));
474 });
475
476 let mut offset = 0;
479 while num_rows > (self.target_batch_size - self.buffered_rows) {
480 let remaining_rows = self.target_batch_size - self.buffered_rows;
481 debug_assert!(remaining_rows > 0);
482
483 for in_progress in self.in_progress_arrays.iter_mut() {
485 in_progress.copy_rows(offset, remaining_rows)?;
486 }
487
488 self.buffered_rows += remaining_rows;
489 offset += remaining_rows;
490 num_rows -= remaining_rows;
491
492 self.finish_buffered_batch()?;
493 }
494
495 self.buffered_rows += num_rows;
497 if num_rows > 0 {
498 for in_progress in self.in_progress_arrays.iter_mut() {
499 in_progress.copy_rows(offset, num_rows)?;
500 }
501 }
502
503 if self.buffered_rows >= self.target_batch_size {
505 self.finish_buffered_batch()?;
506 }
507
508 for in_progress in self.in_progress_arrays.iter_mut() {
510 in_progress.set_source(None);
511 }
512
513 Ok(())
514 }
515
516 pub fn get_buffered_rows(&self) -> usize {
518 self.buffered_rows
519 }
520
521 pub fn finish_buffered_batch(&mut self) -> Result<(), ArrowError> {
529 if self.buffered_rows == 0 {
530 return Ok(());
531 }
532 let new_arrays = self
533 .in_progress_arrays
534 .iter_mut()
535 .map(|array| array.finish())
536 .collect::<Result<Vec<_>, ArrowError>>()?;
537
538 for (array, field) in new_arrays.iter().zip(self.schema.fields().iter()) {
539 debug_assert_eq!(array.data_type(), field.data_type());
540 debug_assert_eq!(array.len(), self.buffered_rows);
541 }
542
543 let batch = unsafe {
545 RecordBatch::new_unchecked(Arc::clone(&self.schema), new_arrays, self.buffered_rows)
546 };
547
548 self.buffered_rows = 0;
549 self.completed.push_back(batch);
550 Ok(())
551 }
552
553 pub fn is_empty(&self) -> bool {
555 self.buffered_rows == 0 && self.completed.is_empty()
556 }
557
558 pub fn has_completed_batch(&self) -> bool {
560 !self.completed.is_empty()
561 }
562
563 pub fn next_completed_batch(&mut self) -> Option<RecordBatch> {
565 self.completed.pop_front()
566 }
567}
568
/// Create the in-progress builder used for one output column of type `data_type`.
///
/// - Primitive types (those matched by `downcast_primitive!`) get an
///   `InProgressPrimitiveArray` for the matching arrow primitive type.
/// - `Utf8View` and `BinaryView` get an `InProgressByteViewArray`.
/// - Every other type falls back to `GenericInProgressArray`.
///
/// `batch_size` is passed to the primitive and byte-view builders (presumably as a
/// capacity hint — confirm in those modules). It is not passed to the generic fallback.
fn create_in_progress_array(data_type: &DataType, batch_size: usize) -> Box<dyn InProgressArray> {
    // Expands to the boxed primitive builder for the arrow primitive type `$t`
    // selected by `downcast_primitive!` below.
    macro_rules! instantiate_primitive {
        ($t:ty) => {
            Box::new(InProgressPrimitiveArray::<$t>::new(
                batch_size,
                data_type.clone(),
            ))
        };
    }

    downcast_primitive! {
        // Every primitive type dispatches to `instantiate_primitive`
        data_type => (instantiate_primitive),
        DataType::Utf8View => Box::new(InProgressByteViewArray::<StringViewType>::new(batch_size)),
        DataType::BinaryView => {
            Box::new(InProgressByteViewArray::<BinaryViewType>::new(batch_size))
        }
        _ => Box::new(GenericInProgressArray::new()),
    }
}
590
/// Incrementally builds one output column by copying row ranges out of a source array.
///
/// `BatchCoalescer` sets a source, copies rows from it, clears the source, and calls
/// `finish` whenever a full (or final) batch is ready.
trait InProgressArray: std::fmt::Debug + Send + Sync {
    /// Set the array that `copy_rows` reads from, or clear it with `None`.
    fn set_source(&mut self, source: Option<ArrayRef>);

    /// Copy `len` rows starting at `offset` from the current source into this builder.
    ///
    /// NOTE(review): behavior when no source is set is implementation-defined — confirm.
    fn copy_rows(&mut self, offset: usize, len: usize) -> Result<(), ArrowError>;

    /// Return the rows copied so far as a finished array.
    ///
    /// The coalescer keeps reusing the builder after this call, so implementations are
    /// expected to start a fresh batch — confirm in implementations.
    fn finish(&mut self) -> Result<ArrayRef, ArrowError>;
}
617
618#[cfg(test)]
619mod tests {
620 use super::*;
621 use crate::concat::concat_batches;
622 use arrow_array::builder::StringViewBuilder;
623 use arrow_array::cast::AsArray;
624 use arrow_array::types::Int32Type;
625 use arrow_array::{
626 BinaryViewArray, Int32Array, Int64Array, RecordBatchOptions, StringArray, StringViewArray,
627 TimestampNanosecondArray, UInt32Array, UInt64Array, make_array,
628 };
629 use arrow_buffer::BooleanBufferBuilder;
630 use arrow_schema::{DataType, Field, Schema};
631 use rand::{Rng, SeedableRng};
632 use std::ops::Range;
633
#[test]
fn test_coalesce() {
    // Ten 8-row batches (80 rows) re-chunked into 21-row batches
    let input = uint32_batch(0..8);
    let expected_sizes = vec![21, 21, 21, 17];
    Test::new("coalesce")
        .with_batch_size(21)
        .with_batches(std::iter::repeat_n(input, 10))
        .with_expected_output_sizes(expected_sizes)
        .run();
}
644
#[test]
fn test_coalesce_one_by_one() {
    // 97 single-row batches: four full 20-row batches plus a 17-row remainder
    let single_row = uint32_batch(0..1);
    let test = Test::new("coalesce_one_by_one")
        .with_batches(std::iter::repeat_n(single_row, 97))
        .with_batch_size(20);
    test.with_expected_output_sizes(vec![20, 20, 20, 20, 17])
        .run();
}
655
#[test]
fn test_coalesce_empty() {
    // No input at all, so the schema has to be supplied explicitly
    let field = Field::new("c0", DataType::UInt32, false);
    let schema: SchemaRef = Arc::new(Schema::new(vec![field]));
    Test::new("coalesce_empty")
        .with_schema(schema)
        .with_batches(vec![])
        .with_batch_size(21)
        .with_expected_output_sizes(vec![])
        .run();
}
667
#[test]
fn test_single_large_batch_greater_than_target() {
    // 4096 rows split into 1000-row pieces with a 96-row remainder
    let name = "coalesce_single_large_batch_greater_than_target";
    let sizes = vec![1000, 1000, 1000, 1000, 96];
    Test::new(name)
        .with_batch_size(1000)
        .with_batch(uint32_batch(0..4096))
        .with_expected_output_sizes(sizes)
        .run();
}
678
#[test]
fn test_single_large_batch_smaller_than_target() {
    // The whole 4096-row input fits in one 8192-row target batch
    let name = "coalesce_single_large_batch_smaller_than_target";
    Test::new(name)
        .with_batch_size(8192)
        .with_batch(uint32_batch(0..4096))
        .with_expected_output_sizes(vec![4096])
        .run();
}
689
#[test]
fn test_single_large_batch_equal_to_target() {
    // Input size exactly equals the target size
    let rows = 4096;
    Test::new("coalesce_single_large_batch_equal_to_target")
        .with_batch_size(rows)
        .with_batch(uint32_batch(0..rows as u32))
        .with_expected_output_sizes(vec![rows])
        .run();
}
700
#[test]
fn test_single_large_batch_equally_divisible_in_target() {
    // 4096 rows divide evenly into four 1024-row batches
    let name = "coalesce_single_large_batch_equally_divisible_in_target";
    Test::new(name)
        .with_batch_size(1024)
        .with_batch(uint32_batch(0..4096))
        .with_expected_output_sizes(vec![1024; 4])
        .run();
}
711
#[test]
fn test_empty_schema() {
    // A zero-column, zero-row batch produces no output
    let empty_schema: SchemaRef = Arc::new(Schema::empty());
    let empty_batch = RecordBatch::new_empty(empty_schema);
    Test::new("coalesce_empty_schema")
        .with_batch(empty_batch)
        .with_expected_output_sizes(vec![])
        .run();
}
721
#[test]
fn test_coalesce_filtered_001() {
    // Ten 8000-row batches, each keeping roughly 0.1% of its rows
    let mut filters = RandomFilterBuilder {
        num_rows: 8000,
        selectivity: 0.001,
        seed: 0,
    };
    let test = (0..10).fold(Test::new("coalesce_filtered_001"), |acc, _| {
        acc.with_batch(multi_column_batch(0..8000))
            .with_filter(filters.next_filter())
    });
    test.with_batch_size(15)
        .with_expected_output_sizes(vec![15, 15, 15, 13])
        .run();
}
744
#[test]
fn test_coalesce_filtered_01() {
    // Ten 8000-row batches, each keeping roughly 1% of its rows
    let mut filters = RandomFilterBuilder {
        num_rows: 8000,
        selectivity: 0.01,
        seed: 0,
    };
    let test = (0..10).fold(Test::new("coalesce_filtered_01"), |acc, _| {
        acc.with_batch(multi_column_batch(0..8000))
            .with_filter(filters.next_filter())
    });
    test.with_batch_size(128)
        .with_expected_output_sizes(vec![128; 6].into_iter().chain([15]))
        .run();
}
767
#[test]
fn test_coalesce_filtered_10() {
    // Ten 8000-row batches, each keeping roughly 10% of its rows
    let mut filters = RandomFilterBuilder {
        num_rows: 8000,
        selectivity: 0.1,
        seed: 0,
    };
    let test = (0..10).fold(Test::new("coalesce_filtered_10"), |acc, _| {
        acc.with_batch(multi_column_batch(0..8000))
            .with_filter(filters.next_filter())
    });
    test.with_batch_size(1024)
        .with_expected_output_sizes(vec![1024; 7].into_iter().chain([840]))
        .run();
}
790
#[test]
fn test_coalesce_filtered_90() {
    // Ten 800-row batches, each keeping roughly 90% of its rows
    let mut filters = RandomFilterBuilder {
        num_rows: 800,
        selectivity: 0.90,
        seed: 0,
    };
    let test = (0..10).fold(Test::new("coalesce_filtered_90"), |acc, _| {
        acc.with_batch(multi_column_batch(0..800))
            .with_filter(filters.next_filter())
    });
    test.with_batch_size(1024)
        .with_expected_output_sizes(vec![1024; 7].into_iter().chain([13]))
        .run();
}
813
#[test]
fn test_coalesce_filtered_mixed() {
    let mut random_filters = RandomFilterBuilder {
        num_rows: 800,
        selectivity: 0.90,
        seed: 0,
    };

    let mut test = Test::new("coalesce_filtered_mixed");
    for _ in 0..3 {
        // Fixed mask over 1000 rows: the first 500 are selected, the rest are not
        let mut mask = BooleanBufferBuilder::new(1000);
        mask.append_n(500, true);
        mask.append_n(1, false);
        mask.append_n(499, false);
        let first_half = BooleanArray::from(mask.build());

        test = test
            .with_batch(multi_column_batch(0..1000))
            .with_filter(first_half)
            .with_batch(multi_column_batch(0..800))
            .with_filter(random_filters.next_filter());

        // Each round the random filter keeps fewer rows
        random_filters.selectivity *= 0.6;
    }

    let expected_sizes = std::iter::repeat_n(250, 11).chain([179]);
    test.with_batch_size(250)
        .with_expected_output_sizes(expected_sizes)
        .run();
}
850
#[test]
fn test_coalesce_non_null() {
    // 3000 + 1040 = 4040 non-null rows: three full 1024-row batches plus 968
    let inputs = [uint32_batch_non_null(0..3000), uint32_batch_non_null(0..1040)];
    Test::new("coalesce_non_null")
        .with_batches(inputs)
        .with_batch_size(1024)
        .with_expected_output_sizes(vec![1024, 1024, 1024, 968])
        .run();
}
#[test]
fn test_utf8_split() {
    // Same row counts as the non-null test, but with nullable Utf8 values
    let inputs = [utf8_batch(0..3000), utf8_batch(0..1040)];
    Test::new("coalesce_utf8")
        .with_batches(inputs)
        .with_batch_size(1024)
        .with_expected_output_sizes(vec![1024, 1024, 1024, 968])
        .run();
}
871
#[test]
fn test_string_view_no_views() {
    // All strings are short (stored inline), so the output needs no data buffers
    let first = stringview_batch([Some("foo"), Some("bar")]);
    let second = stringview_batch([Some("baz"), Some("qux")]);
    let outputs = Test::new("coalesce_string_view_no_views")
        .with_batches([first, second])
        .with_expected_output_sizes(vec![4])
        .run();

    let column = col_as_string_view("c0", &outputs[0]);
    expect_buffer_layout(column, vec![]);
}
886
#[test]
fn test_string_view_batch_small_no_compact() {
    let input = stringview_batch_repeated(1000, [Some("a"), Some("b"), Some("c")]);
    let outputs = Test::new("coalesce_string_view_batch_small_no_compact")
        .with_batch(input.clone())
        .with_expected_output_sizes(vec![1000])
        .run();

    let input_column = col_as_string_view("c0", &input);
    let output_column = col_as_string_view("c0", &outputs[0]);

    // Only short strings: neither side has data buffers
    assert_eq!(input_column.data_buffers().len(), 0);
    assert_eq!(
        input_column.data_buffers().len(),
        output_column.data_buffers().len()
    );
    expect_buffer_layout(output_column, vec![]);
}
903
#[test]
fn test_string_view_batch_large_no_compact() {
    let input = stringview_batch_repeated(1000, [Some("This string is longer than 12 bytes")]);
    let outputs = Test::new("coalesce_string_view_batch_large_no_compact")
        .with_batch(input.clone())
        .with_batch_size(1000)
        .with_expected_output_sizes(vec![1000])
        .run();

    let input_column = col_as_string_view("c0", &input);
    let output_column = col_as_string_view("c0", &outputs[0]);
    assert_eq!(input_column.data_buffers().len(), 5);
    assert_eq!(
        input_column.data_buffers().len(),
        output_column.data_buffers().len()
    );

    // Four nearly-full 8 KiB blocks followed by a partially used one
    let mut expected = vec![
        ExpectedLayout {
            len: 8190,
            capacity: 8192,
        };
        4
    ];
    expected.push(ExpectedLayout {
        len: 2240,
        capacity: 8192,
    });
    expect_buffer_layout(output_column, expected);
}
945
#[test]
fn test_string_view_batch_small_with_buffers_no_compact() {
    // Pattern of 20 short strings followed by one long string. The 10-row slice
    // starting at 5 contains only short strings.
    let short = std::iter::repeat(Some("SmallString")).take(20);
    let long = std::iter::once(Some("This string is longer than 12 bytes"));
    let input = stringview_batch_repeated(1000, short.chain(long)).slice(5, 10);

    let outputs = Test::new("coalesce_string_view_batch_small_with_buffers_no_compact")
        .with_batch(input.clone())
        .with_batch_size(1000)
        .with_expected_output_sizes(vec![10])
        .run();

    let input_column = col_as_string_view("c0", &input);
    let output_column = col_as_string_view("c0", &outputs[0]);
    assert_eq!(input_column.data_buffers().len(), 1);
    assert_eq!(output_column.data_buffers().len(), 0);
}
967
#[test]
fn test_string_view_batch_large_slice_compact() {
    // A 22-row slice of a batch whose 35-byte strings span 5 data buffers
    let full = stringview_batch_repeated(1000, [Some("This string is longer than 12 bytes")]);
    let sliced = full.slice(11, 22);

    let outputs = Test::new("coalesce_string_view_batch_large_slice_compact")
        .with_batch(sliced.clone())
        .with_batch_size(1000)
        .with_expected_output_sizes(vec![22])
        .run();

    let input_column = col_as_string_view("c0", &sliced);
    assert_eq!(input_column.data_buffers().len(), 5);

    // Only the 22 * 35 = 770 referenced bytes end up in a single output buffer
    let output_column = col_as_string_view("c0", &outputs[0]);
    let expected = vec![ExpectedLayout {
        len: 770,
        capacity: 8192,
    }];
    expect_buffer_layout(output_column, expected);
}
993
#[test]
fn test_string_view_mixed() {
    let long = "This string is longer than 12 bytes";
    let large_view_batch = stringview_batch_repeated(1000, [Some(long)]);
    let small_view_batch = stringview_batch_repeated(1000, [Some("SmallString")]);
    let mixed_batch = stringview_batch_repeated(1000, [Some(long), Some("Small")]);
    let mixed_batch_nulls = stringview_batch_repeated(1000, [Some(long), Some("Small"), None]);

    // 1000 + 1000 + 20 + 1000 + 20 + 1000 = 4040 rows
    let inputs = vec![
        large_view_batch.clone(),
        small_view_batch,
        large_view_batch.slice(10, 20),
        mixed_batch_nulls,
        large_view_batch.slice(10, 20),
        mixed_batch,
    ];
    let outputs = Test::new("coalesce_string_view_mixed")
        .with_batches(inputs)
        .with_expected_output_sizes(vec![1024, 1024, 1024, 968])
        .run();

    let mut expected = vec![
        ExpectedLayout {
            len: 8190,
            capacity: 8192,
        };
        4
    ];
    expected.push(ExpectedLayout {
        len: 2240,
        capacity: 8192,
    });
    expect_buffer_layout(col_as_string_view("c0", &outputs[0]), expected);
}
1052
#[test]
fn test_string_view_many_small_compact() {
    // 200-row batches alternating a 28-byte string with a short one, pushed 10 times
    let batch = stringview_batch_repeated(
        200,
        [Some("This string is 28 bytes long"), Some("small string")],
    );
    let outputs = Test::new("coalesce_string_view_many_small_compact")
        .with_batches(std::iter::repeat_n(batch, 10))
        .with_batch_size(8000)
        .with_expected_output_sizes(vec![2000])
        .run();

    // The capacity of each successive output buffer doubles
    let expected: Vec<ExpectedLayout> = [(8176, 8192), (16380, 16384), (3444, 32768)]
        .into_iter()
        .map(|(len, capacity)| ExpectedLayout { len, capacity })
        .collect();
    expect_buffer_layout(col_as_string_view("c0", &outputs[0]), expected);
}
1097
#[test]
fn test_string_view_many_small_boundary() {
    // 32-byte strings: 20 batches of 100 rows split into 900, 900 and 200
    let batch = stringview_batch_repeated(100, [Some("This string is a power of two=32")]);
    let outputs = Test::new("coalesce_string_view_many_small_boundary")
        .with_batches(std::iter::repeat_n(batch, 20))
        .with_batch_size(900)
        .with_expected_output_sizes(vec![900, 900, 200])
        .run();

    let expected = [(8192, 8192), (16384, 16384), (4224, 32768)]
        .map(|(len, capacity)| ExpectedLayout { len, capacity })
        .to_vec();
    expect_buffer_layout(col_as_string_view("c0", &outputs[0]), expected);
}
1127
#[test]
fn test_string_view_large_small() {
    let mixed_batch = stringview_batch_repeated(
        200,
        [Some("This string is 28 bytes long"), Some("small string")],
    );
    let all_large = stringview_batch_repeated(
        50,
        [Some(
            "This buffer has only large strings in it so there are no buffer copies",
        )],
    );

    // `true` marks an all-large batch, `false` a mixed one (6 * 200 + 4 * 50 = 1400 rows)
    let pattern = [false, false, true, false, true, false, false, true, false, true];
    let inputs = pattern.iter().map(|&is_large| {
        if is_large {
            all_large.clone()
        } else {
            mixed_batch.clone()
        }
    });

    let outputs = Test::new("coalesce_string_view_large_small")
        .with_batches(inputs)
        .with_batch_size(8000)
        .with_expected_output_sizes(vec![1400])
        .run();

    let expected: Vec<ExpectedLayout> = [(8190, 8192), (16366, 16384), (6244, 32768)]
        .into_iter()
        .map(|(len, capacity)| ExpectedLayout { len, capacity })
        .collect();
    expect_buffer_layout(col_as_string_view("c0", &outputs[0]), expected);
}
1178
#[test]
fn test_binary_view() {
    // Cycle a short value, a null and a long value to fill 1000 rows
    let pattern: Vec<Option<&[u8]>> = vec![
        Some(b"foo"),
        None,
        Some(b"A longer string that is more than 12 bytes"),
    ];
    let binary_view: BinaryViewArray = pattern.iter().cycle().take(1000).collect();

    let column = Arc::new(binary_view) as ArrayRef;
    let batch = RecordBatch::try_from_iter(vec![("c0", column)]).unwrap();

    Test::new("coalesce_binary_view")
        .with_batches([batch.clone(), batch])
        .with_batch_size(512)
        .with_expected_output_sizes(vec![512, 512, 512, 464])
        .run();
}
1199
/// Expected length and capacity of one data buffer of a `StringViewArray`.
#[derive(Debug, Clone, PartialEq)]
struct ExpectedLayout {
    /// Buffer length in bytes (`Buffer::len`)
    len: usize,
    /// Buffer capacity in bytes (`Buffer::capacity`)
    capacity: usize,
}
1205
1206 fn expect_buffer_layout(array: &StringViewArray, expected: Vec<ExpectedLayout>) {
1208 let actual = array
1209 .data_buffers()
1210 .iter()
1211 .map(|b| ExpectedLayout {
1212 len: b.len(),
1213 capacity: b.capacity(),
1214 })
1215 .collect::<Vec<_>>();
1216
1217 assert_eq!(
1218 actual, expected,
1219 "Expected buffer layout {expected:#?} but got {actual:#?}"
1220 );
1221 }
1222
/// Declarative description of one coalescer test case; run it with `Test::run`.
#[derive(Debug, Clone)]
struct Test {
    /// Test name printed when the case runs (variants append a description)
    name: String,
    /// Batches pushed into the coalescer, in order
    input_batches: Vec<RecordBatch>,
    /// Optional filters; the i-th filter is applied to the i-th input batch
    filters: Vec<BooleanArray>,
    /// Explicit schema; when `None`, the first input batch's schema is used
    schema: Option<SchemaRef>,
    /// Expected row counts of the output batches, in order
    expected_output_sizes: Vec<usize>,
    /// Target batch size passed to `BatchCoalescer::new`
    target_batch_size: usize,
}
1247
1248 impl Default for Test {
1249 fn default() -> Self {
1250 Self {
1251 name: "".to_string(),
1252 input_batches: vec![],
1253 filters: vec![],
1254 schema: None,
1255 expected_output_sizes: vec![],
1256 target_batch_size: 1024,
1257 }
1258 }
1259 }
1260
1261 impl Test {
1262 fn new(name: impl Into<String>) -> Self {
1263 Self {
1264 name: name.into(),
1265 ..Self::default()
1266 }
1267 }
1268
1269 fn with_description(mut self, description: &str) -> Self {
1271 self.name.push_str(": ");
1272 self.name.push_str(description);
1273 self
1274 }
1275
1276 fn with_batch_size(mut self, target_batch_size: usize) -> Self {
1278 self.target_batch_size = target_batch_size;
1279 self
1280 }
1281
1282 fn with_batch(mut self, batch: RecordBatch) -> Self {
1284 self.input_batches.push(batch);
1285 self
1286 }
1287
1288 fn with_filter(mut self, filter: BooleanArray) -> Self {
1290 self.filters.push(filter);
1291 self
1292 }
1293
1294 fn with_batches(mut self, batches: impl IntoIterator<Item = RecordBatch>) -> Self {
1296 self.input_batches = batches.into_iter().collect();
1297 self
1298 }
1299
1300 fn with_schema(mut self, schema: SchemaRef) -> Self {
1302 self.schema = Some(schema);
1303 self
1304 }
1305
1306 fn with_expected_output_sizes(mut self, sizes: impl IntoIterator<Item = usize>) -> Self {
1308 self.expected_output_sizes.extend(sizes);
1309 self
1310 }
1311
1312 fn run(self) -> Vec<RecordBatch> {
1316 let mut extra_tests = vec![];
1321 extra_tests.push(self.clone().make_half_non_nullable());
1322 extra_tests.push(self.clone().insert_empty_batches());
1323 let single_column_tests = self.make_single_column_tests();
1324 for test in single_column_tests {
1325 extra_tests.push(test.clone().make_half_non_nullable());
1326 extra_tests.push(test);
1327 }
1328
1329 let results = self.run_inner();
1332 for extra in extra_tests {
1334 extra.run_inner();
1335 }
1336
1337 results
1338 }
1339
1340 fn run_inner(self) -> Vec<RecordBatch> {
1342 let expected_output = self.expected_output();
1343 let schema = self.schema();
1344
1345 let Self {
1346 name,
1347 input_batches,
1348 filters,
1349 schema: _,
1350 target_batch_size,
1351 expected_output_sizes,
1352 } = self;
1353
1354 println!("Running test '{name}'");
1355
1356 let had_input = input_batches.iter().any(|b| b.num_rows() > 0);
1357
1358 let mut coalescer = BatchCoalescer::new(Arc::clone(&schema), target_batch_size);
1359
1360 let mut filters = filters.into_iter();
1362 for batch in input_batches {
1363 if let Some(filter) = filters.next() {
1364 coalescer.push_batch_with_filter(batch, &filter).unwrap();
1365 } else {
1366 coalescer.push_batch(batch).unwrap();
1367 }
1368 }
1369 assert_eq!(schema, coalescer.schema());
1370
1371 if had_input {
1372 assert!(!coalescer.is_empty(), "Coalescer should not be empty");
1373 } else {
1374 assert!(coalescer.is_empty(), "Coalescer should be empty");
1375 }
1376
1377 coalescer.finish_buffered_batch().unwrap();
1378 if had_input {
1379 assert!(
1380 coalescer.has_completed_batch(),
1381 "Coalescer should have completed batches"
1382 );
1383 }
1384
1385 let mut output_batches = vec![];
1386 while let Some(batch) = coalescer.next_completed_batch() {
1387 output_batches.push(batch);
1388 }
1389
1390 let mut starting_idx = 0;
1392 let actual_output_sizes: Vec<usize> =
1393 output_batches.iter().map(|b| b.num_rows()).collect();
1394 assert_eq!(
1395 expected_output_sizes, actual_output_sizes,
1396 "Unexpected number of rows in output batches\n\
1397 Expected\n{expected_output_sizes:#?}\nActual:{actual_output_sizes:#?}"
1398 );
1399 let iter = expected_output_sizes
1400 .iter()
1401 .zip(output_batches.iter())
1402 .enumerate();
1403
1404 for (i, (expected_size, batch)) in iter {
1406 let expected_batch = expected_output.slice(starting_idx, *expected_size);
1409 let expected_batch = normalize_batch(expected_batch);
1410 let batch = normalize_batch(batch.clone());
1411 assert_eq!(
1412 expected_batch, batch,
1413 "Unexpected content in batch {i}:\
1414 \n\nExpected:\n{expected_batch:#?}\n\nActual:\n{batch:#?}"
1415 );
1416 starting_idx += *expected_size;
1417 }
1418 output_batches
1419 }
1420
1421 fn schema(&self) -> SchemaRef {
1424 self.schema
1425 .clone()
1426 .unwrap_or_else(|| Arc::clone(&self.input_batches[0].schema()))
1427 }
1428
1429 fn expected_output(&self) -> RecordBatch {
1431 let schema = self.schema();
1432 if self.filters.is_empty() {
1433 return concat_batches(&schema, &self.input_batches).unwrap();
1434 }
1435
1436 let mut filters = self.filters.iter();
1437 let filtered_batches = self
1438 .input_batches
1439 .iter()
1440 .map(|batch| {
1441 if let Some(filter) = filters.next() {
1442 filter_record_batch(batch, filter).unwrap()
1443 } else {
1444 batch.clone()
1445 }
1446 })
1447 .collect::<Vec<_>>();
1448 concat_batches(&schema, &filtered_batches).unwrap()
1449 }
1450
1451 fn make_half_non_nullable(mut self) -> Self {
1454 self.input_batches = self
1456 .input_batches
1457 .iter()
1458 .enumerate()
1459 .map(|(i, batch)| {
1460 if i % 2 == 1 {
1461 batch.clone()
1462 } else {
1463 Self::remove_nulls_from_batch(batch)
1464 }
1465 })
1466 .collect();
1467 self.with_description("non-nullable")
1468 }
1469
1470 fn insert_empty_batches(mut self) -> Self {
1472 let empty_batch = RecordBatch::new_empty(self.schema());
1473 self.input_batches = self
1474 .input_batches
1475 .into_iter()
1476 .flat_map(|batch| [empty_batch.clone(), batch])
1477 .collect();
1478 let empty_filters = BooleanArray::builder(0).finish();
1479 self.filters = self
1480 .filters
1481 .into_iter()
1482 .flat_map(|filter| [empty_filters.clone(), filter])
1483 .collect();
1484 self.with_description("empty batches inserted")
1485 }
1486
1487 fn remove_nulls_from_batch(batch: &RecordBatch) -> RecordBatch {
1489 let new_columns = batch
1490 .columns()
1491 .iter()
1492 .map(Self::remove_nulls_from_array)
1493 .collect::<Vec<_>>();
1494 let options = RecordBatchOptions::new().with_row_count(Some(batch.num_rows()));
1495 RecordBatch::try_new_with_options(batch.schema(), new_columns, &options).unwrap()
1496 }
1497
1498 fn remove_nulls_from_array(array: &ArrayRef) -> ArrayRef {
1499 make_array(array.to_data().into_builder().nulls(None).build().unwrap())
1500 }
1501
1502 fn make_single_column_tests(&self) -> Vec<Self> {
1508 let original_schema = self.schema();
1509 let mut new_tests = vec![];
1510 for column in original_schema.fields() {
1511 let single_column_schema = Arc::new(Schema::new(vec![column.clone()]));
1512
1513 let single_column_batches = self.input_batches.iter().map(|batch| {
1514 let single_column = batch.column_by_name(column.name()).unwrap();
1515 RecordBatch::try_new(
1516 Arc::clone(&single_column_schema),
1517 vec![single_column.clone()],
1518 )
1519 .unwrap()
1520 });
1521
1522 let single_column_test = self
1523 .clone()
1524 .with_schema(Arc::clone(&single_column_schema))
1525 .with_batches(single_column_batches)
1526 .with_description("single column")
1527 .with_description(column.name());
1528
1529 new_tests.push(single_column_test);
1530 }
1531 new_tests
1532 }
1533 }
1534
1535 fn uint32_batch<T: std::iter::Iterator<Item = u32>>(range: T) -> RecordBatch {
1538 let schema = Arc::new(Schema::new(vec![Field::new("c0", DataType::UInt32, true)]));
1539
1540 let array = UInt32Array::from_iter(range.map(|i| if i % 3 == 0 { None } else { Some(i) }));
1541 RecordBatch::try_new(Arc::clone(&schema), vec![Arc::new(array)]).unwrap()
1542 }
1543
1544 fn uint32_batch_non_null<T: std::iter::Iterator<Item = u32>>(range: T) -> RecordBatch {
1546 let schema = Arc::new(Schema::new(vec![Field::new("c0", DataType::UInt32, false)]));
1547
1548 let array = UInt32Array::from_iter_values(range);
1549 RecordBatch::try_new(Arc::clone(&schema), vec![Arc::new(array)]).unwrap()
1550 }
1551
1552 fn uint64_batch_non_null<T: std::iter::Iterator<Item = u64>>(range: T) -> RecordBatch {
1554 let schema = Arc::new(Schema::new(vec![Field::new("c0", DataType::UInt64, false)]));
1555
1556 let array = UInt64Array::from_iter_values(range);
1557 RecordBatch::try_new(Arc::clone(&schema), vec![Arc::new(array)]).unwrap()
1558 }
1559
1560 fn utf8_batch(range: Range<u32>) -> RecordBatch {
1563 let schema = Arc::new(Schema::new(vec![Field::new("c0", DataType::Utf8, true)]));
1564
1565 let array = StringArray::from_iter(range.map(|i| {
1566 if i % 3 == 0 {
1567 None
1568 } else {
1569 Some(format!("value{i}"))
1570 }
1571 }));
1572
1573 RecordBatch::try_new(Arc::clone(&schema), vec![Arc::new(array)]).unwrap()
1574 }
1575
1576 fn stringview_batch<'a>(values: impl IntoIterator<Item = Option<&'a str>>) -> RecordBatch {
1578 let schema = Arc::new(Schema::new(vec![Field::new(
1579 "c0",
1580 DataType::Utf8View,
1581 false,
1582 )]));
1583
1584 let array = StringViewArray::from_iter(values);
1585 RecordBatch::try_new(Arc::clone(&schema), vec![Arc::new(array)]).unwrap()
1586 }
1587
1588 fn stringview_batch_repeated<'a>(
1591 num_rows: usize,
1592 values: impl IntoIterator<Item = Option<&'a str>>,
1593 ) -> RecordBatch {
1594 let schema = Arc::new(Schema::new(vec![Field::new(
1595 "c0",
1596 DataType::Utf8View,
1597 true,
1598 )]));
1599
1600 let values: Vec<_> = values.into_iter().collect();
1602 let values_iter = std::iter::repeat(values.iter())
1603 .flatten()
1604 .cloned()
1605 .take(num_rows);
1606
1607 let mut builder = StringViewBuilder::with_capacity(100).with_fixed_block_size(8192);
1608 for val in values_iter {
1609 builder.append_option(val);
1610 }
1611
1612 let array = builder.finish();
1613 RecordBatch::try_new(Arc::clone(&schema), vec![Arc::new(array)]).unwrap()
1614 }
1615
1616 fn multi_column_batch(range: Range<i32>) -> RecordBatch {
1618 let int64_array = Int64Array::from_iter(
1619 range
1620 .clone()
1621 .map(|v| if v % 5 == 0 { None } else { Some(v as i64) }),
1622 );
1623 let string_view_array = StringViewArray::from_iter(range.clone().map(|v| {
1624 if v % 5 == 0 {
1625 None
1626 } else if v % 7 == 0 {
1627 Some(format!("This is a string longer than 12 bytes{v}"))
1628 } else {
1629 Some(format!("Short {v}"))
1630 }
1631 }));
1632 let string_array = StringArray::from_iter(range.clone().map(|v| {
1633 if v % 11 == 0 {
1634 None
1635 } else {
1636 Some(format!("Value {v}"))
1637 }
1638 }));
1639 let timestamp_array = TimestampNanosecondArray::from_iter(range.map(|v| {
1640 if v % 3 == 0 {
1641 None
1642 } else {
1643 Some(v as i64 * 1000) }
1645 }))
1646 .with_timezone("America/New_York");
1647
1648 RecordBatch::try_from_iter(vec![
1649 ("int64", Arc::new(int64_array) as ArrayRef),
1650 ("stringview", Arc::new(string_view_array) as ArrayRef),
1651 ("string", Arc::new(string_array) as ArrayRef),
1652 ("timestamp", Arc::new(timestamp_array) as ArrayRef),
1653 ])
1654 .unwrap()
1655 }
1656
1657 #[derive(Debug)]
1663 struct RandomFilterBuilder {
1664 num_rows: usize,
1666 selectivity: f64,
1669 seed: u64,
1672 }
1673 impl RandomFilterBuilder {
1674 fn next_filter(&mut self) -> BooleanArray {
1677 assert!(self.selectivity >= 0.0 && self.selectivity <= 1.0);
1678 let mut rng = rand::rngs::StdRng::seed_from_u64(self.seed);
1679 self.seed += 1;
1680 BooleanArray::from_iter(
1681 (0..self.num_rows)
1682 .map(|_| rng.random_bool(self.selectivity))
1683 .map(Some),
1684 )
1685 }
1686 }
1687
1688 fn col_as_string_view<'b>(name: &str, batch: &'b RecordBatch) -> &'b StringViewArray {
1690 batch
1691 .column_by_name(name)
1692 .expect("column not found")
1693 .as_string_view_opt()
1694 .expect("column is not a string view")
1695 }
1696
1697 fn normalize_batch(batch: RecordBatch) -> RecordBatch {
1700 let (schema, mut columns, row_count) = batch.into_parts();
1702
1703 for column in columns.iter_mut() {
1704 let Some(string_view) = column.as_string_view_opt() else {
1705 continue;
1706 };
1707
1708 let mut builder = StringViewBuilder::new();
1711 for s in string_view.iter() {
1712 builder.append_option(s);
1713 }
1714 *column = Arc::new(builder.finish());
1716 }
1717
1718 let options = RecordBatchOptions::new().with_row_count(Some(row_count));
1719 RecordBatch::try_new_with_options(schema, columns, &options).unwrap()
1720 }
1721
1722 fn create_test_batch(num_rows: usize) -> RecordBatch {
1724 let schema = Arc::new(Schema::new(vec![Field::new("c0", DataType::Int32, false)]));
1725 let array = Int32Array::from_iter_values(0..num_rows as i32);
1726 RecordBatch::try_new(schema, vec![Arc::new(array)]).unwrap()
1727 }
/// With no `biggest_coalesce_batch_size` configured, a 1000-row input with a
/// target of 100 comes out as ten batches of exactly 100 rows.
#[test]
fn test_biggest_coalesce_batch_size_none_default() {
    let schema = Arc::new(Schema::new(vec![Field::new("c0", DataType::Int32, false)]));
    let mut coalescer = BatchCoalescer::new(schema, 100);

    coalescer.push_batch(create_test_batch(1000)).unwrap();
    let mut output_batches: Vec<RecordBatch> =
        std::iter::from_fn(|| coalescer.next_completed_batch()).collect();

    // Flush whatever is still buffered and collect that as well.
    coalescer.finish_buffered_batch().unwrap();
    output_batches.extend(std::iter::from_fn(|| coalescer.next_completed_batch()));

    assert_eq!(output_batches.len(), 10);
    for batch in &output_batches {
        assert_eq!(batch.num_rows(), 100);
    }
}
1757
/// With a limit of 500 and nothing buffered, a 1000-row batch is emitted
/// immediately as one completed batch with the same contents, and nothing
/// is left buffered.
#[test]
fn test_biggest_coalesce_batch_size_bypass_large_batch() {
    let mut coalescer = BatchCoalescer::new(
        Arc::new(Schema::new(vec![Field::new("c0", DataType::Int32, false)])),
        100,
    );
    coalescer.set_biggest_coalesce_batch_size(Some(500));

    let large_batch = create_test_batch(1000);
    coalescer.push_batch(large_batch.clone()).unwrap();

    assert!(coalescer.has_completed_batch());
    let output_batch = coalescer.next_completed_batch().unwrap();
    assert_eq!(output_batch.num_rows(), 1000);
    // The bypassed batch must carry the input values through unchanged; this
    // is why the input was cloned before pushing.
    assert_eq!(output_batch.columns(), large_batch.columns());

    assert!(!coalescer.has_completed_batch());
    assert_eq!(coalescer.get_buffered_rows(), 0);
}
1780
/// Batches below the limit are still coalesced normally. Two 50-row pushes
/// complete exactly one 100-row batch, whose `Int32` column reports a buffer
/// memory size of 400 bytes.
#[test]
fn test_biggest_coalesce_batch_size_coalesce_small_batch() {
    let schema = Arc::new(Schema::new(vec![Field::new("c0", DataType::Int32, false)]));
    let mut coalescer =
        BatchCoalescer::new(schema, 100).with_biggest_coalesce_batch_size(Some(500));
    let small_batch = create_test_batch(50);

    // First half: buffered, nothing emitted yet.
    coalescer.push_batch(small_batch.clone()).unwrap();
    assert!(!coalescer.has_completed_batch());
    assert_eq!(coalescer.get_buffered_rows(), 50);

    // Second half: reaches the 100-row target.
    coalescer.push_batch(small_batch).unwrap();
    assert!(coalescer.has_completed_batch());

    let output_batch = coalescer.next_completed_batch().unwrap();
    let values = output_batch.column(0).as_primitive::<Int32Type>();
    assert_eq!(values.get_buffer_memory_size(), 400);
    assert_eq!(output_batch.num_rows(), 100);

    assert_eq!(coalescer.get_buffered_rows(), 0);
}
1813
/// A batch whose size equals the limit (500) is not bypassed. Pushing it and
/// then flushing yields five completed batches in total.
#[test]
fn test_biggest_coalesce_batch_size_equal_boundary() {
    let schema = Arc::new(Schema::new(vec![Field::new("c0", DataType::Int32, false)]));
    let mut coalescer = BatchCoalescer::new(schema, 100);
    coalescer.set_biggest_coalesce_batch_size(Some(500));

    coalescer.push_batch(create_test_batch(500)).unwrap();
    let emitted_on_push = std::iter::from_fn(|| coalescer.next_completed_batch()).count();

    coalescer.finish_buffered_batch().unwrap();
    let emitted_on_finish = std::iter::from_fn(|| coalescer.next_completed_batch()).count();

    assert_eq!(emitted_on_push + emitted_on_finish, 5);
}
1841
/// A large batch that arrives while rows are buffered is coalesced with
/// them: 50 + 250 rows produce three completed batches and empty the buffer.
/// After that, each following large batch (300, then 400 rows) comes out
/// whole.
#[test]
fn test_biggest_coalesce_batch_size_first_large_then_consecutive_bypass() {
    let schema = Arc::new(Schema::new(vec![Field::new("c0", DataType::Int32, false)]));
    let mut coalescer = BatchCoalescer::new(schema, 100);
    coalescer.set_biggest_coalesce_batch_size(Some(200));

    // 50 rows: below the target, so nothing is emitted.
    coalescer.push_batch(create_test_batch(50)).unwrap();
    assert_eq!(coalescer.get_buffered_rows(), 50);
    assert!(!coalescer.has_completed_batch());

    // A large batch on top of buffered rows is merged, not bypassed.
    coalescer.push_batch(create_test_batch(250)).unwrap();
    let completed_batches: Vec<RecordBatch> =
        std::iter::from_fn(|| coalescer.next_completed_batch()).collect();
    assert_eq!(completed_batches.len(), 3);
    assert_eq!(coalescer.get_buffered_rows(), 0);

    // With the buffer empty, each large batch passes through intact.
    for rows in [300, 400] {
        coalescer.push_batch(create_test_batch(rows)).unwrap();
        assert!(coalescer.has_completed_batch());
        let output = coalescer.next_completed_batch().unwrap();
        assert_eq!(output.num_rows(), rows);
        assert_eq!(coalescer.get_buffered_rows(), 0);
    }
}
1889
/// Pushing a zero-row batch, even with a limit of 50 configured, emits
/// nothing and buffers nothing.
#[test]
fn test_biggest_coalesce_batch_size_empty_batch() {
    let schema = Arc::new(Schema::new(vec![Field::new("c0", DataType::Int32, false)]));
    let mut coalescer = BatchCoalescer::new(schema, 100);
    coalescer.set_biggest_coalesce_batch_size(Some(50));

    coalescer.push_batch(create_test_batch(0)).unwrap();

    assert!(!coalescer.has_completed_batch());
    assert_eq!(coalescer.get_buffered_rows(), 0);
}
1906
/// With 60 rows already buffered, a 250-row batch (above the 200-row limit)
/// is not bypassed. Three 100-row batches come out and 10 rows stay buffered.
#[test]
fn test_biggest_coalesce_batch_size_with_buffered_data_no_bypass() {
    let schema = Arc::new(Schema::new(vec![Field::new("c0", DataType::Int32, false)]));
    let mut coalescer = BatchCoalescer::new(schema, 100);
    coalescer.set_biggest_coalesce_batch_size(Some(200));

    let small_batch = create_test_batch(30);
    for _ in 0..2 {
        coalescer.push_batch(small_batch.clone()).unwrap();
    }
    assert_eq!(coalescer.get_buffered_rows(), 60);

    coalescer.push_batch(create_test_batch(250)).unwrap();

    let completed_batches: Vec<RecordBatch> =
        std::iter::from_fn(|| coalescer.next_completed_batch()).collect();
    assert_eq!(completed_batches.len(), 3);
    completed_batches
        .iter()
        .for_each(|batch| assert_eq!(batch.num_rows(), 100));
    assert_eq!(coalescer.get_buffered_rows(), 10);
}
1941
/// With a limit of zero, even a single-row batch is emitted immediately
/// instead of being buffered.
#[test]
fn test_biggest_coalesce_batch_size_zero_limit() {
    let schema = Arc::new(Schema::new(vec![Field::new("c0", DataType::Int32, false)]));
    let mut coalescer = BatchCoalescer::new(schema, 100);
    coalescer.set_biggest_coalesce_batch_size(Some(0));

    coalescer.push_batch(create_test_batch(1)).unwrap();

    assert!(coalescer.has_completed_batch());
    let emitted_rows = coalescer.next_completed_batch().unwrap().num_rows();
    assert_eq!(emitted_rows, 1);
}
1959
/// Bypass happens only when nothing is buffered. The first 300-row batch is
/// emitted whole. After 50 rows are buffered, the same 300-row batch instead
/// produces three 100-row batches and leaves 50 rows buffered.
#[test]
fn test_biggest_coalesce_batch_size_bypass_only_when_no_buffer() {
    let schema = Arc::new(Schema::new(vec![Field::new("c0", DataType::Int32, false)]));
    let mut coalescer = BatchCoalescer::new(schema, 100);
    coalescer.set_biggest_coalesce_batch_size(Some(200));

    let large_batch = create_test_batch(300);

    // Empty buffer: the large batch passes straight through.
    coalescer.push_batch(large_batch.clone()).unwrap();
    assert!(coalescer.has_completed_batch());
    let bypassed = coalescer.next_completed_batch().unwrap();
    assert_eq!(bypassed.num_rows(), 300);
    assert_eq!(coalescer.get_buffered_rows(), 0);

    // Buffer a few rows first.
    coalescer.push_batch(create_test_batch(50)).unwrap();
    assert_eq!(coalescer.get_buffered_rows(), 50);

    // The same large batch is now merged with the buffered rows.
    coalescer.push_batch(large_batch).unwrap();
    let completed_batches: Vec<RecordBatch> =
        std::iter::from_fn(|| coalescer.next_completed_batch()).collect();

    assert_eq!(completed_batches.len(), 3);
    for batch in &completed_batches {
        assert_eq!(batch.num_rows(), 100);
    }
    assert_eq!(coalescer.get_buffered_rows(), 50);
}
1999
/// Walks a mixed sequence with a target of 1000 and a limit of 500:
///
/// * 20 + 20 + 30 rows are buffered (70 rows, nothing emitted).
/// * A 700-row batch joins the buffer (770 rows, still nothing emitted).
/// * A 600-row batch produces two outputs: the 770 buffered rows, then the
///   600-row batch on its own.
/// * With the buffer empty, batches of 700, 900, 700 and 600 rows each come
///   out whole.
#[test]
fn test_biggest_coalesce_batch_size_consecutive_large_batches_scenario() {
    let schema = Arc::new(Schema::new(vec![Field::new("c0", DataType::Int32, false)]));
    let mut coalescer = BatchCoalescer::new(schema, 1000);
    coalescer.set_biggest_coalesce_batch_size(Some(500));

    for rows in [20, 20, 30] {
        coalescer.push_batch(create_test_batch(rows)).unwrap();
    }
    assert_eq!(coalescer.get_buffered_rows(), 70);
    assert!(!coalescer.has_completed_batch());

    coalescer.push_batch(create_test_batch(700)).unwrap();
    assert_eq!(coalescer.get_buffered_rows(), 770);
    assert!(!coalescer.has_completed_batch());

    coalescer.push_batch(create_test_batch(600)).unwrap();
    let output_sizes: Vec<usize> = std::iter::from_fn(|| coalescer.next_completed_batch())
        .map(|batch| batch.num_rows())
        .collect();
    assert_eq!(output_sizes.len(), 2);
    assert_eq!(output_sizes[0], 770);
    assert_eq!(output_sizes[1], 600);
    assert_eq!(coalescer.get_buffered_rows(), 0);

    for size in [700, 900, 700, 600] {
        coalescer.push_batch(create_test_batch(size)).unwrap();

        assert!(coalescer.has_completed_batch());
        let output = coalescer.next_completed_batch().unwrap();
        assert_eq!(output.num_rows(), size);
        assert_eq!(coalescer.get_buffered_rows(), 0);
    }
}
2048
/// With a limit of 200, each of several large batches pushed back-to-back
/// (300, 400, 350, 500 rows) comes out as exactly one completed batch of its
/// original size. The buffer stays empty before and after every push.
#[test]
fn test_biggest_coalesce_batch_size_truly_consecutive_large_bypass() {
    let schema = Arc::new(Schema::new(vec![Field::new("c0", DataType::Int32, false)]));
    let mut coalescer = BatchCoalescer::new(schema, 100);
    coalescer.set_biggest_coalesce_batch_size(Some(200));

    let large_batches = [300, 400, 350, 500].map(create_test_batch);
    let mut all_outputs = Vec::with_capacity(large_batches.len());

    for (i, large_batch) in large_batches.into_iter().enumerate() {
        let expected_size = large_batch.num_rows();

        assert_eq!(
            coalescer.get_buffered_rows(),
            0,
            "Buffer should be empty before batch {}",
            i
        );

        coalescer.push_batch(large_batch).unwrap();
        assert!(
            coalescer.has_completed_batch(),
            "Should have completed batch after pushing batch {}",
            i
        );

        let output = coalescer.next_completed_batch().unwrap();
        assert_eq!(
            output.num_rows(),
            expected_size,
            "Batch {} should have bypassed with original size",
            i
        );

        // Exactly one output per input, and nothing left behind.
        assert!(
            !coalescer.has_completed_batch(),
            "Should have no more completed batches after batch {}",
            i
        );
        assert_eq!(
            coalescer.get_buffered_rows(),
            0,
            "Buffer should be empty after batch {}",
            i
        );

        all_outputs.push(output);
    }

    let output_sizes: Vec<usize> = all_outputs.iter().map(RecordBatch::num_rows).collect();
    assert_eq!(all_outputs.len(), 4);
    assert_eq!(output_sizes, [300, 400, 350, 500]);
}
2120
/// Consecutive large batches (300, then 400 rows) pass through whole. Once a
/// 50-row batch is buffered, the next 350-row batch is coalesced normally,
/// yielding four 100-row batches and an empty buffer.
#[test]
fn test_biggest_coalesce_batch_size_reset_consecutive_on_small_batch() {
    let schema = Arc::new(Schema::new(vec![Field::new("c0", DataType::Int32, false)]));
    let mut coalescer = BatchCoalescer::new(schema, 100);
    coalescer.set_biggest_coalesce_batch_size(Some(200));

    for rows in [300, 400] {
        coalescer.push_batch(create_test_batch(rows)).unwrap();
        let output = coalescer.next_completed_batch().unwrap();
        assert_eq!(output.num_rows(), rows);
    }

    coalescer.push_batch(create_test_batch(50)).unwrap();
    assert_eq!(coalescer.get_buffered_rows(), 50);

    coalescer.push_batch(create_test_batch(350)).unwrap();
    let outputs: Vec<RecordBatch> =
        std::iter::from_fn(|| coalescer.next_completed_batch()).collect();

    assert_eq!(outputs.len(), 4);
    for batch in outputs {
        assert_eq!(batch.num_rows(), 100);
    }
    assert_eq!(coalescer.get_buffered_rows(), 0);
}
2158
/// The first part of the input is pushed as-is, in ascending order. The
/// second part is stored descending and pushed through
/// `push_batch_with_indices` with reversed indices. The single flushed output
/// must equal `0..TOTAL_ROWS` in ascending order.
#[test]
fn test_coalasce_push_batch_with_indices() {
    const MID_POINT: u32 = 2333;
    const TOTAL_ROWS: u32 = 23333;

    let mut coalescer = BatchCoalescer::new(
        Arc::new(Schema::new(vec![Field::new("c0", DataType::UInt32, false)])),
        TOTAL_ROWS as usize,
    );

    // Ascending first part, pushed unchanged.
    coalescer
        .push_batch(uint32_batch_non_null(0..MID_POINT))
        .unwrap();

    // Descending second part; reversed indices select it back into order.
    let descending = uint32_batch_non_null((MID_POINT..TOTAL_ROWS).rev());
    let reverse_indices =
        UInt64Array::from_iter_values((0..u64::from(TOTAL_ROWS - MID_POINT)).rev());
    coalescer
        .push_batch_with_indices(descending, &reverse_indices)
        .unwrap();

    coalescer.finish_buffered_batch().unwrap();
    let actual = coalescer.next_completed_batch().unwrap();

    assert_eq!(uint32_batch_non_null(0..TOTAL_ROWS), actual);
}
2187
/// A coalescer built for a zero-column schema rejects a one-column batch with
/// a column-count error.
#[test]
fn test_push_batch_schema_mismatch_fewer_columns() {
    let mut coalescer = BatchCoalescer::new(Arc::new(Schema::empty()), 100);

    let err = coalescer
        .push_batch(uint32_batch(0..5))
        .expect_err("pushing a batch with a mismatched column count should fail")
        .to_string();

    assert!(
        err.contains("Batch has 1 columns but BatchCoalescer expects 0"),
        "unexpected error: {err}"
    );
}
2202
/// A coalescer built for a two-column schema rejects a one-column batch with
/// a column-count error.
#[test]
fn test_push_batch_schema_mismatch_more_columns() {
    let two_columns = vec![
        Field::new("c0", DataType::UInt32, false),
        Field::new("c1", DataType::UInt32, false),
    ];
    let mut coalescer = BatchCoalescer::new(Arc::new(Schema::new(two_columns)), 100);

    let err = coalescer
        .push_batch(uint32_batch(0..5))
        .expect_err("pushing a batch with a mismatched column count should fail")
        .to_string();

    assert!(
        err.contains("Batch has 1 columns but BatchCoalescer expects 2"),
        "unexpected error: {err}"
    );
}
2220
/// A coalescer built for a zero-column schema rejects a two-column batch with
/// a column-count error.
#[test]
fn test_push_batch_schema_mismatch_two_vs_zero() {
    let mut coalescer = BatchCoalescer::new(Arc::new(Schema::empty()), 100);

    let batch_schema = Arc::new(Schema::new(vec![
        Field::new("c0", DataType::UInt32, false),
        Field::new("c1", DataType::UInt32, false),
    ]));
    let first: ArrayRef = Arc::new(UInt32Array::from(vec![1, 2, 3]));
    let second: ArrayRef = Arc::new(UInt32Array::from(vec![4, 5, 6]));
    let batch = RecordBatch::try_new(batch_schema, vec![first, second]).unwrap();

    let err = coalescer
        .push_batch(batch)
        .expect_err("pushing a batch with a mismatched column count should fail")
        .to_string();

    assert!(
        err.contains("Batch has 2 columns but BatchCoalescer expects 0"),
        "unexpected error: {err}"
    );
}
2246}