1use crate::filter::filter_record_batch;
24use arrow_array::types::{BinaryViewType, StringViewType};
25use arrow_array::{downcast_primitive, Array, ArrayRef, BooleanArray, RecordBatch};
26use arrow_schema::{ArrowError, DataType, SchemaRef};
27use std::collections::VecDeque;
28use std::sync::Arc;
29mod byte_view;
33mod generic;
34mod primitive;
35
36use byte_view::InProgressByteViewArray;
37use generic::GenericInProgressArray;
38use primitive::InProgressPrimitiveArray;
39
40#[derive(Debug)]
124pub struct BatchCoalescer {
125 schema: SchemaRef,
127 batch_size: usize,
129 in_progress_arrays: Vec<Box<dyn InProgressArray>>,
131 buffered_rows: usize,
133 completed: VecDeque<RecordBatch>,
135}
136
137impl BatchCoalescer {
138 pub fn new(schema: SchemaRef, batch_size: usize) -> Self {
146 let in_progress_arrays = schema
147 .fields()
148 .iter()
149 .map(|field| create_in_progress_array(field.data_type(), batch_size))
150 .collect::<Vec<_>>();
151
152 Self {
153 schema,
154 batch_size,
155 in_progress_arrays,
156 completed: VecDeque::with_capacity(1),
158 buffered_rows: 0,
159 }
160 }
161
162 pub fn schema(&self) -> SchemaRef {
164 Arc::clone(&self.schema)
165 }
166
167 pub fn push_batch_with_filter(
192 &mut self,
193 batch: RecordBatch,
194 filter: &BooleanArray,
195 ) -> Result<(), ArrowError> {
196 let filtered_batch = filter_record_batch(&batch, filter)?;
199 self.push_batch(filtered_batch)
200 }
201
202 pub fn push_batch(&mut self, batch: RecordBatch) -> Result<(), ArrowError> {
223 let (_schema, arrays, mut num_rows) = batch.into_parts();
224 if num_rows == 0 {
225 return Ok(());
226 }
227
228 assert_eq!(arrays.len(), self.in_progress_arrays.len());
230 self.in_progress_arrays
231 .iter_mut()
232 .zip(arrays)
233 .for_each(|(in_progress, array)| {
234 in_progress.set_source(Some(array));
235 });
236
237 let mut offset = 0;
240 while num_rows > (self.batch_size - self.buffered_rows) {
241 let remaining_rows = self.batch_size - self.buffered_rows;
242 debug_assert!(remaining_rows > 0);
243
244 for in_progress in self.in_progress_arrays.iter_mut() {
246 in_progress.copy_rows(offset, remaining_rows)?;
247 }
248
249 self.buffered_rows += remaining_rows;
250 offset += remaining_rows;
251 num_rows -= remaining_rows;
252
253 self.finish_buffered_batch()?;
254 }
255
256 self.buffered_rows += num_rows;
258 if num_rows > 0 {
259 for in_progress in self.in_progress_arrays.iter_mut() {
260 in_progress.copy_rows(offset, num_rows)?;
261 }
262 }
263
264 if self.buffered_rows >= self.batch_size {
266 self.finish_buffered_batch()?;
267 }
268
269 for in_progress in self.in_progress_arrays.iter_mut() {
271 in_progress.set_source(None);
272 }
273
274 Ok(())
275 }
276
277 pub fn finish_buffered_batch(&mut self) -> Result<(), ArrowError> {
285 if self.buffered_rows == 0 {
286 return Ok(());
287 }
288 let new_arrays = self
289 .in_progress_arrays
290 .iter_mut()
291 .map(|array| array.finish())
292 .collect::<Result<Vec<_>, ArrowError>>()?;
293
294 for (array, field) in new_arrays.iter().zip(self.schema.fields().iter()) {
295 debug_assert_eq!(array.data_type(), field.data_type());
296 debug_assert_eq!(array.len(), self.buffered_rows);
297 }
298
299 let batch = unsafe {
301 RecordBatch::new_unchecked(Arc::clone(&self.schema), new_arrays, self.buffered_rows)
302 };
303
304 self.buffered_rows = 0;
305 self.completed.push_back(batch);
306 Ok(())
307 }
308
309 pub fn is_empty(&self) -> bool {
311 self.buffered_rows == 0 && self.completed.is_empty()
312 }
313
314 pub fn has_completed_batch(&self) -> bool {
316 !self.completed.is_empty()
317 }
318
319 pub fn next_completed_batch(&mut self) -> Option<RecordBatch> {
321 self.completed.pop_front()
322 }
323}
324
325fn create_in_progress_array(data_type: &DataType, batch_size: usize) -> Box<dyn InProgressArray> {
327 macro_rules! instantiate_primitive {
328 ($t:ty) => {
329 Box::new(InProgressPrimitiveArray::<$t>::new(batch_size))
330 };
331 }
332
333 downcast_primitive! {
334 data_type => (instantiate_primitive),
336 DataType::Utf8View => Box::new(InProgressByteViewArray::<StringViewType>::new(batch_size)),
337 DataType::BinaryView => {
338 Box::new(InProgressByteViewArray::<BinaryViewType>::new(batch_size))
339 }
340 _ => Box::new(GenericInProgressArray::new()),
341 }
342}
343
344trait InProgressArray: std::fmt::Debug + Send + Sync {
354 fn set_source(&mut self, source: Option<ArrayRef>);
359
360 fn copy_rows(&mut self, offset: usize, len: usize) -> Result<(), ArrowError>;
366
367 fn finish(&mut self) -> Result<ArrayRef, ArrowError>;
369}
370
#[cfg(test)]
mod tests {
    use super::*;
    use crate::concat::concat_batches;
    use arrow_array::builder::StringViewBuilder;
    use arrow_array::cast::AsArray;
    use arrow_array::{
        BinaryViewArray, RecordBatchOptions, StringArray, StringViewArray, UInt32Array,
    };
    use arrow_schema::{DataType, Field, Schema};
    use std::ops::Range;

    /// 10 batches of 8 rows (80 rows) are regrouped into batches of 21 rows.
    #[test]
    fn test_coalesce() {
        let batch = uint32_batch(0..8);
        Test::new()
            .with_batches(std::iter::repeat_n(batch, 10))
            .with_batch_size(21)
            .with_expected_output_sizes(vec![21, 21, 21, 17])
            .run();
    }

    /// 97 single-row batches are regrouped into batches of 20 rows.
    #[test]
    fn test_coalesce_one_by_one() {
        let batch = uint32_batch(0..1);
        Test::new()
            .with_batches(std::iter::repeat_n(batch, 97))
            .with_batch_size(20)
            .with_expected_output_sizes(vec![20, 20, 20, 20, 17])
            .run();
    }

    /// No input batches produce no output batches.
    #[test]
    fn test_coalesce_empty() {
        let schema = Arc::new(Schema::new(vec![Field::new("c0", DataType::UInt32, false)]));

        Test::new()
            .with_batches(vec![])
            .with_schema(schema)
            .with_batch_size(21)
            .with_expected_output_sizes(vec![])
            .run();
    }

    /// One input batch larger than the target is split into several outputs.
    #[test]
    fn test_single_large_batch_greater_than_target() {
        let batch = uint32_batch(0..4096);
        Test::new()
            .with_batch(batch)
            .with_batch_size(1000)
            .with_expected_output_sizes(vec![1000, 1000, 1000, 1000, 96])
            .run();
    }

    /// One input batch smaller than the target comes out as a single batch.
    #[test]
    fn test_single_large_batch_smaller_than_target() {
        let batch = uint32_batch(0..4096);
        Test::new()
            .with_batch(batch)
            .with_batch_size(8192)
            .with_expected_output_sizes(vec![4096])
            .run();
    }

    /// One input batch of exactly the target size comes out as a single batch.
    #[test]
    fn test_single_large_batch_equal_to_target() {
        let batch = uint32_batch(0..4096);
        Test::new()
            .with_batch(batch)
            .with_batch_size(4096)
            .with_expected_output_sizes(vec![4096])
            .run();
    }

    /// An input that is an exact multiple of the target leaves no remainder.
    #[test]
    fn test_single_large_batch_equally_divisible_in_target() {
        let batch = uint32_batch(0..4096);
        Test::new()
            .with_batch(batch)
            .with_batch_size(1024)
            .with_expected_output_sizes(vec![1024, 1024, 1024, 1024])
            .run();
    }

    /// An empty batch with an empty schema produces no output.
    #[test]
    fn test_empty_schema() {
        let schema = Schema::empty();
        let batch = RecordBatch::new_empty(schema.into());
        Test::new()
            .with_batch(batch)
            .with_expected_output_sizes(vec![])
            .run();
    }

    /// Non-nullable primitive column: 3000 + 1040 rows in batches of 1024.
    #[test]
    fn test_coalesce_non_null() {
        Test::new()
            .with_batch(uint32_batch_non_null(0..3000))
            .with_batch(uint32_batch_non_null(0..1040))
            .with_batch_size(1024)
            .with_expected_output_sizes(vec![1024, 1024, 1024, 968])
            .run();
    }

    /// `Utf8` column: 3000 + 1040 rows in batches of 1024.
    #[test]
    fn test_utf8_split() {
        Test::new()
            .with_batch(utf8_batch(0..3000))
            .with_batch(utf8_batch(0..1040))
            .with_batch_size(1024)
            .with_expected_output_sizes(vec![1024, 1024, 1024, 968])
            .run();
    }

    /// Short string-view values: the output has no data buffers.
    #[test]
    fn test_string_view_no_views() {
        let output_batches = Test::new()
            .with_batch(stringview_batch([Some("foo"), Some("bar")]))
            .with_batch(stringview_batch([Some("baz"), Some("qux")]))
            .with_expected_output_sizes(vec![4])
            .run();

        expect_buffer_layout(
            col_as_string_view("c0", output_batches.first().unwrap()),
            vec![],
        );
    }

    /// 1000 short values: neither the input nor the output has data buffers.
    #[test]
    fn test_string_view_batch_small_no_compact() {
        let batch = stringview_batch_repeated(1000, [Some("a"), Some("b"), Some("c")]);
        let output_batches = Test::new()
            .with_batch(batch.clone())
            .with_expected_output_sizes(vec![1000])
            .run();

        let array = col_as_string_view("c0", &batch);
        let gc_array = col_as_string_view("c0", output_batches.first().unwrap());
        assert_eq!(array.data_buffers().len(), 0);
        assert_eq!(array.data_buffers().len(), gc_array.data_buffers().len());
        expect_buffer_layout(gc_array, vec![]);
    }

    /// 1000 long values pushed whole: the output keeps the same number of
    /// data buffers as the input.
    #[test]
    fn test_string_view_batch_large_no_compact() {
        let batch = stringview_batch_repeated(1000, [Some("This string is longer than 12 bytes")]);
        let output_batches = Test::new()
            .with_batch(batch.clone())
            .with_batch_size(1000)
            .with_expected_output_sizes(vec![1000])
            .run();

        let array = col_as_string_view("c0", &batch);
        let gc_array = col_as_string_view("c0", output_batches.first().unwrap());
        assert_eq!(array.data_buffers().len(), 5);
        assert_eq!(array.data_buffers().len(), gc_array.data_buffers().len());
        expect_buffer_layout(
            gc_array,
            vec![
                ExpectedLayout {
                    len: 8190,
                    capacity: 8192,
                },
                ExpectedLayout {
                    len: 8190,
                    capacity: 8192,
                },
                ExpectedLayout {
                    len: 8190,
                    capacity: 8192,
                },
                ExpectedLayout {
                    len: 8190,
                    capacity: 8192,
                },
                ExpectedLayout {
                    len: 2240,
                    capacity: 8192,
                },
            ],
        );
    }

    /// A slice containing only short values of an array that also holds long
    /// values: the input still has a data buffer, the output has none.
    #[test]
    fn test_string_view_batch_small_with_buffers_no_compact() {
        // Repeating pattern of 20 short strings followed by one long string.
        let short_strings = std::iter::repeat_n(Some("SmallString"), 20);
        let long_strings = std::iter::once(Some("This string is longer than 12 bytes"));
        let values = short_strings.chain(long_strings);
        // Rows 5..15 fall inside the first run of short strings.
        let batch = stringview_batch_repeated(1000, values).slice(5, 10);
        let output_batches = Test::new()
            .with_batch(batch.clone())
            .with_batch_size(1000)
            .with_expected_output_sizes(vec![10])
            .run();

        let array = col_as_string_view("c0", &batch);
        let gc_array = col_as_string_view("c0", output_batches.first().unwrap());
        assert_eq!(array.data_buffers().len(), 1);
        assert_eq!(gc_array.data_buffers().len(), 0);
    }

    /// A 22-row slice of long values: the input slice references 5 data
    /// buffers, the output holds a single buffer.
    #[test]
    fn test_string_view_batch_large_slice_compact() {
        let batch = stringview_batch_repeated(1000, [Some("This string is longer than 12 bytes")])
            .slice(11, 22);

        let output_batches = Test::new()
            .with_batch(batch.clone())
            .with_batch_size(1000)
            .with_expected_output_sizes(vec![22])
            .run();

        let array = col_as_string_view("c0", &batch);
        let gc_array = col_as_string_view("c0", output_batches.first().unwrap());
        assert_eq!(array.data_buffers().len(), 5);

        expect_buffer_layout(
            gc_array,
            vec![ExpectedLayout {
                len: 770,
                capacity: 8192,
            }],
        );
    }

    /// A mix of long, short, sliced and nullable string-view batches.
    #[test]
    fn test_string_view_mixed() {
        let large_view_batch =
            stringview_batch_repeated(1000, [Some("This string is longer than 12 bytes")]);
        let small_view_batch = stringview_batch_repeated(1000, [Some("SmallString")]);
        let mixed_batch = stringview_batch_repeated(
            1000,
            [Some("This string is longer than 12 bytes"), Some("Small")],
        );
        let mixed_batch_nulls = stringview_batch_repeated(
            1000,
            [
                Some("This string is longer than 12 bytes"),
                Some("Small"),
                None,
            ],
        );

        let output_batches = Test::new()
            .with_batch(large_view_batch.clone())
            .with_batch(small_view_batch)
            .with_batch(large_view_batch.slice(10, 20))
            .with_batch(mixed_batch_nulls)
            .with_batch(large_view_batch.slice(10, 20))
            .with_batch(mixed_batch)
            .with_expected_output_sizes(vec![1024, 1024, 1024, 968])
            .run();

        expect_buffer_layout(
            col_as_string_view("c0", output_batches.first().unwrap()),
            vec![
                ExpectedLayout {
                    len: 8190,
                    capacity: 8192,
                },
                ExpectedLayout {
                    len: 8190,
                    capacity: 8192,
                },
                ExpectedLayout {
                    len: 8190,
                    capacity: 8192,
                },
                ExpectedLayout {
                    len: 8190,
                    capacity: 8192,
                },
                ExpectedLayout {
                    len: 2240,
                    capacity: 8192,
                },
            ],
        );
    }

    /// Five 400-row batches of alternating 28-byte and short strings end up
    /// in one 2000-row output batch.
    #[test]
    fn test_string_view_many_small_compact() {
        let batch = stringview_batch_repeated(
            400,
            [Some("This string is 28 bytes long"), Some("small string")],
        );
        let output_batches = Test::new()
            .with_batches(std::iter::repeat_n(batch, 5))
            .with_batch_size(8000)
            .with_expected_output_sizes(vec![2000])
            .run();

        expect_buffer_layout(
            col_as_string_view("c0", output_batches.first().unwrap()),
            vec![
                ExpectedLayout {
                    len: 8176,
                    capacity: 8192,
                },
                ExpectedLayout {
                    len: 16380,
                    capacity: 16384,
                },
                ExpectedLayout {
                    len: 3444,
                    capacity: 32768,
                },
            ],
        );
    }

    /// Twenty 100-row batches of 32-byte strings; the first output batch
    /// fills its first two buffers exactly.
    #[test]
    fn test_string_view_many_small_boundary() {
        let batch = stringview_batch_repeated(100, [Some("This string is a power of two=32")]);
        let output_batches = Test::new()
            .with_batches(std::iter::repeat_n(batch, 20))
            .with_batch_size(900)
            .with_expected_output_sizes(vec![900, 900, 200])
            .run();

        expect_buffer_layout(
            col_as_string_view("c0", output_batches.first().unwrap()),
            vec![
                ExpectedLayout {
                    len: 8192,
                    capacity: 8192,
                },
                ExpectedLayout {
                    len: 16384,
                    capacity: 16384,
                },
                ExpectedLayout {
                    len: 4224,
                    capacity: 32768,
                },
            ],
        );
    }

    /// Interleaves batches mixing long and short strings with batches made
    /// only of long strings.
    #[test]
    fn test_string_view_large_small() {
        let mixed_batch = stringview_batch_repeated(
            400,
            [Some("This string is 28 bytes long"), Some("small string")],
        );
        let all_large = stringview_batch_repeated(
            100,
            [Some(
                "This buffer has only large strings in it so there are no buffer copies",
            )],
        );

        let output_batches = Test::new()
            .with_batch(mixed_batch.clone())
            .with_batch(mixed_batch.clone())
            .with_batch(all_large.clone())
            .with_batch(mixed_batch)
            .with_batch(all_large)
            .with_batch_size(8000)
            .with_expected_output_sizes(vec![1400])
            .run();

        expect_buffer_layout(
            col_as_string_view("c0", output_batches.first().unwrap()),
            vec![
                ExpectedLayout {
                    len: 8176,
                    capacity: 8192,
                },
                ExpectedLayout {
                    len: 3024,
                    capacity: 16384,
                },
                ExpectedLayout {
                    len: 7000,
                    capacity: 8192,
                },
                ExpectedLayout {
                    len: 5600,
                    capacity: 32768,
                },
                ExpectedLayout {
                    len: 7000,
                    capacity: 8192,
                },
            ],
        );
    }

    /// `BinaryView` column with short, null and long values.
    #[test]
    fn test_binary_view() {
        let values: Vec<Option<&[u8]>> = vec![
            Some(b"foo"),
            None,
            Some(b"A longer string that is more than 12 bytes"),
        ];

        let binary_view = BinaryViewArray::from_iter(values.iter().cycle().take(1000));
        let batch =
            RecordBatch::try_from_iter(vec![("c0", Arc::new(binary_view) as ArrayRef)]).unwrap();

        Test::new()
            .with_batch(batch.clone())
            .with_batch(batch)
            .with_batch_size(512)
            .with_expected_output_sizes(vec![512, 512, 512, 464])
            .run();
    }

    /// Length and capacity of one data buffer of a view array.
    #[derive(Debug, Clone, PartialEq)]
    struct ExpectedLayout {
        len: usize,
        capacity: usize,
    }

    /// Asserts that the data buffers of `array` have exactly the `expected`
    /// lengths and capacities, in order.
    fn expect_buffer_layout(array: &StringViewArray, expected: Vec<ExpectedLayout>) {
        let actual = array
            .data_buffers()
            .iter()
            .map(|b| ExpectedLayout {
                len: b.len(),
                capacity: b.capacity(),
            })
            .collect::<Vec<_>>();

        assert_eq!(
            actual, expected,
            "Expected buffer layout {expected:#?} but got {actual:#?}"
        );
    }

    /// Builder-style description of one coalescer test case.
    #[derive(Debug, Clone)]
    struct Test {
        /// Batches pushed into the coalescer, in order.
        input_batches: Vec<RecordBatch>,
        /// Schema of the coalescer; defaults to the first input batch's schema.
        schema: Option<SchemaRef>,
        /// Expected row count of each output batch, in order.
        expected_output_sizes: Vec<usize>,
        /// Target batch size handed to the coalescer.
        target_batch_size: usize,
    }

    impl Default for Test {
        fn default() -> Self {
            Self {
                input_batches: vec![],
                schema: None,
                expected_output_sizes: vec![],
                target_batch_size: 1024,
            }
        }
    }

    impl Test {
        /// Creates a test case with no input and a target batch size of 1024.
        fn new() -> Self {
            Self::default()
        }

        /// Sets the target batch size.
        fn with_batch_size(mut self, target_batch_size: usize) -> Self {
            self.target_batch_size = target_batch_size;
            self
        }

        /// Appends one input batch.
        fn with_batch(mut self, batch: RecordBatch) -> Self {
            self.input_batches.push(batch);
            self
        }

        /// Appends several input batches.
        fn with_batches(mut self, batches: impl IntoIterator<Item = RecordBatch>) -> Self {
            self.input_batches.extend(batches);
            self
        }

        /// Sets the coalescer schema explicitly (required when there is no input).
        fn with_schema(mut self, schema: SchemaRef) -> Self {
            self.schema = Some(schema);
            self
        }

        /// Appends expected output batch sizes.
        fn with_expected_output_sizes(mut self, sizes: impl IntoIterator<Item = usize>) -> Self {
            self.expected_output_sizes.extend(sizes);
            self
        }

        /// Pushes all input batches through a `BatchCoalescer`, checks sizes
        /// and contents of the output, and returns the output batches.
        fn run(self) -> Vec<RecordBatch> {
            let Self {
                input_batches,
                schema,
                target_batch_size,
                expected_output_sizes,
            } = self;

            let schema = schema.unwrap_or_else(|| {
                input_batches
                    .first()
                    .expect("a test without input batches must set a schema with `with_schema`")
                    .schema()
            });

            // All input rows in one batch: the reference for the output contents.
            let single_input_batch = concat_batches(&schema, &input_batches).unwrap();

            let mut coalescer = BatchCoalescer::new(Arc::clone(&schema), target_batch_size);

            let had_input = input_batches.iter().any(|b| b.num_rows() > 0);
            for batch in input_batches {
                coalescer.push_batch(batch).unwrap();
            }
            assert_eq!(schema, coalescer.schema());

            if had_input {
                assert!(!coalescer.is_empty(), "Coalescer should not be empty");
            } else {
                assert!(coalescer.is_empty(), "Coalescer should be empty");
            }

            coalescer.finish_buffered_batch().unwrap();
            if had_input {
                assert!(
                    coalescer.has_completed_batch(),
                    "Coalescer should have completed batches"
                );
            }

            let mut output_batches = vec![];
            while let Some(batch) = coalescer.next_completed_batch() {
                output_batches.push(batch);
            }

            let mut starting_idx = 0;
            let actual_output_sizes: Vec<usize> =
                output_batches.iter().map(|b| b.num_rows()).collect();
            assert_eq!(
                expected_output_sizes, actual_output_sizes,
                "Unexpected number of rows in output batches\n\
                Expected\n{expected_output_sizes:#?}\nActual:{actual_output_sizes:#?}"
            );
            let iter = expected_output_sizes
                .iter()
                .zip(output_batches.iter())
                .enumerate();

            // Each output batch must equal the matching slice of the input rows.
            for (i, (expected_size, batch)) in iter {
                let expected_batch = single_input_batch.slice(starting_idx, *expected_size);
                let expected_batch = normalize_batch(expected_batch);
                let batch = normalize_batch(batch.clone());
                assert_eq!(
                    expected_batch, batch,
                    "Unexpected content in batch {i}:\
                    \n\nExpected:\n{expected_batch:#?}\n\nActual:\n{batch:#?}"
                );
                starting_idx += *expected_size;
            }
            output_batches
        }
    }

    /// Single nullable `UInt32` column `c0` holding `range`, with every value
    /// divisible by 3 replaced by null.
    fn uint32_batch(range: Range<u32>) -> RecordBatch {
        let schema = Arc::new(Schema::new(vec![Field::new("c0", DataType::UInt32, true)]));

        let array = UInt32Array::from_iter(range.map(|i| if i % 3 == 0 { None } else { Some(i) }));
        RecordBatch::try_new(schema, vec![Arc::new(array)]).unwrap()
    }

    /// Single non-nullable `UInt32` column `c0` holding `range`.
    fn uint32_batch_non_null(range: Range<u32>) -> RecordBatch {
        let schema = Arc::new(Schema::new(vec![Field::new("c0", DataType::UInt32, false)]));

        let array = UInt32Array::from_iter_values(range);
        RecordBatch::try_new(schema, vec![Arc::new(array)]).unwrap()
    }

    /// Single nullable `Utf8` column `c0` holding `"value{i}"` for each `i` in
    /// `range`, with every `i` divisible by 3 replaced by null.
    fn utf8_batch(range: Range<u32>) -> RecordBatch {
        let schema = Arc::new(Schema::new(vec![Field::new("c0", DataType::Utf8, true)]));

        let array = StringArray::from_iter(range.map(|i| {
            if i % 3 == 0 {
                None
            } else {
                Some(format!("value{i}"))
            }
        }));

        RecordBatch::try_new(schema, vec![Arc::new(array)]).unwrap()
    }

    /// Single `Utf8View` column `c0` (declared non-nullable) holding `values`.
    fn stringview_batch<'a>(values: impl IntoIterator<Item = Option<&'a str>>) -> RecordBatch {
        let schema = Arc::new(Schema::new(vec![Field::new(
            "c0",
            DataType::Utf8View,
            false,
        )]));

        let array = StringViewArray::from_iter(values);
        RecordBatch::try_new(schema, vec![Arc::new(array)]).unwrap()
    }

    /// Single nullable `Utf8View` column `c0` with `num_rows` rows, obtained by
    /// repeating `values` cyclically. The array is built with a fixed block
    /// size of 8192.
    ///
    /// An empty `values` yields an empty column.
    fn stringview_batch_repeated<'a>(
        num_rows: usize,
        values: impl IntoIterator<Item = Option<&'a str>>,
    ) -> RecordBatch {
        let schema = Arc::new(Schema::new(vec![Field::new(
            "c0",
            DataType::Utf8View,
            true,
        )]));

        let values: Vec<_> = values.into_iter().collect();
        // `cycle` stops immediately on an empty pattern instead of spinning.
        let values_iter = values.iter().cycle().copied().take(num_rows);

        let mut builder = StringViewBuilder::with_capacity(100).with_fixed_block_size(8192);
        for val in values_iter {
            builder.append_option(val);
        }

        let array = builder.finish();
        RecordBatch::try_new(schema, vec![Arc::new(array)]).unwrap()
    }

    /// Returns column `name` of `batch` as a `StringViewArray`.
    ///
    /// Panics if the column is missing or has a different type.
    fn col_as_string_view<'b>(name: &str, batch: &'b RecordBatch) -> &'b StringViewArray {
        batch
            .column_by_name(name)
            .expect("column not found")
            .as_string_view_opt()
            .expect("column is not a string view")
    }

    /// Rebuilds every string-view column of `batch` from its values with a
    /// fresh builder, so two batches with the same values are rebuilt the same
    /// way before being compared. Other columns are left untouched.
    fn normalize_batch(batch: RecordBatch) -> RecordBatch {
        let (schema, mut columns, row_count) = batch.into_parts();

        for column in columns.iter_mut() {
            let Some(string_view) = column.as_string_view_opt() else {
                continue;
            };

            let mut builder = StringViewBuilder::new();
            for s in string_view.iter() {
                builder.append_option(s);
            }
            *column = Arc::new(builder.finish());
        }

        // Pass the row count explicitly so batches without columns keep it.
        let options = RecordBatchOptions::new().with_row_count(Some(row_count));
        RecordBatch::try_new_with_options(schema, columns, &options).unwrap()
    }
}