arrow_select/
coalesce.rs

1// Licensed to the Apache Software Foundation (ASF) under one
2// or more contributor license agreements.  See the NOTICE file
3// distributed with this work for additional information
4// regarding copyright ownership.  The ASF licenses this file
5// to you under the Apache License, Version 2.0 (the
6// "License"); you may not use this file except in compliance
7// with the License.  You may obtain a copy of the License at
8//
9//   http://www.apache.org/licenses/LICENSE-2.0
10//
11// Unless required by applicable law or agreed to in writing,
12// software distributed under the License is distributed on an
13// "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
14// KIND, either express or implied.  See the License for the
15// specific language governing permissions and limitations
16// under the License.
17
//! [`BatchCoalescer`] concatenates multiple [`RecordBatch`]es after
//! operations such as [`filter`] and [`take`].
20//!
21//! [`filter`]: crate::filter::filter
22//! [`take`]: crate::take::take
23use crate::filter::filter_record_batch;
24use arrow_array::types::{BinaryViewType, StringViewType};
25use arrow_array::{downcast_primitive, Array, ArrayRef, BooleanArray, RecordBatch};
26use arrow_schema::{ArrowError, DataType, SchemaRef};
27use std::collections::VecDeque;
28use std::sync::Arc;
// Originally from DataFusion's coalesce module:
30// https://github.com/apache/datafusion/blob/9d2f04996604e709ee440b65f41e7b882f50b788/datafusion/physical-plan/src/coalesce/mod.rs#L26-L25
31
32mod byte_view;
33mod generic;
34mod primitive;
35
36use byte_view::InProgressByteViewArray;
37use generic::GenericInProgressArray;
38use primitive::InProgressPrimitiveArray;
39
40/// Concatenate multiple [`RecordBatch`]es
41///
42/// Implements the common pattern of incrementally creating output
43/// [`RecordBatch`]es of a specific size from an input stream of
44/// [`RecordBatch`]es.
45///
46/// This is useful after operations such as [`filter`] and [`take`] that produce
47/// smaller batches, and we want to coalesce them into larger batches for
48/// further processing.
49///
50/// [`filter`]: crate::filter::filter
51/// [`take`]: crate::take::take
52///
53/// See: <https://github.com/apache/arrow-rs/issues/6692>
54///
55/// # Example
56/// ```
57/// use arrow_array::record_batch;
58/// use arrow_select::coalesce::{BatchCoalescer};
59/// let batch1 = record_batch!(("a", Int32, [1, 2, 3])).unwrap();
60/// let batch2 = record_batch!(("a", Int32, [4, 5])).unwrap();
61///
62/// // Create a `BatchCoalescer` that will produce batches with at least 4 rows
63/// let target_batch_size = 4;
64/// let mut coalescer = BatchCoalescer::new(batch1.schema(), 4);
65///
66/// // push the batches
67/// coalescer.push_batch(batch1).unwrap();
68/// // only pushed 3 rows (not yet 4, enough to produce a batch)
69/// assert!(coalescer.next_completed_batch().is_none());
70/// coalescer.push_batch(batch2).unwrap();
71/// // now we have 5 rows, so we can produce a batch
72/// let finished = coalescer.next_completed_batch().unwrap();
73/// // 4 rows came out (target batch size is 4)
74/// let expected = record_batch!(("a", Int32, [1, 2, 3, 4])).unwrap();
75/// assert_eq!(finished, expected);
76///
77/// // Have no more input, but still have an in-progress batch
78/// assert!(coalescer.next_completed_batch().is_none());
79/// // We can finish the batch, which will produce the remaining rows
80/// coalescer.finish_buffered_batch().unwrap();
81/// let expected = record_batch!(("a", Int32, [5])).unwrap();
82/// assert_eq!(coalescer.next_completed_batch().unwrap(), expected);
83///
84/// // The coalescer is now empty
85/// assert!(coalescer.next_completed_batch().is_none());
86/// ```
87///
88/// # Background
89///
90/// Generally speaking, larger [`RecordBatch`]es are more efficient to process
91/// than smaller [`RecordBatch`]es (until the CPU cache is exceeded) because
92/// there is fixed processing overhead per batch. This coalescer builds up these
93/// larger batches incrementally.
94///
95/// ```text
96/// ┌────────────────────┐
97/// │    RecordBatch     │
98/// │   num_rows = 100   │
99/// └────────────────────┘                 ┌────────────────────┐
100///                                        │                    │
101/// ┌────────────────────┐     Coalesce    │                    │
102/// │                    │      Batches    │                    │
103/// │    RecordBatch     │                 │                    │
104/// │   num_rows = 200   │  ─ ─ ─ ─ ─ ─ ▶  │                    │
105/// │                    │                 │    RecordBatch     │
106/// │                    │                 │   num_rows = 400   │
107/// └────────────────────┘                 │                    │
108///                                        │                    │
109/// ┌────────────────────┐                 │                    │
110/// │                    │                 │                    │
111/// │    RecordBatch     │                 │                    │
112/// │   num_rows = 100   │                 └────────────────────┘
113/// │                    │
114/// └────────────────────┘
115/// ```
116///
117/// # Notes:
118///
119/// 1. Output rows are produced in the same order as the input rows
120///
121/// 2. The output is a sequence of batches, with all but the last being at exactly
122///    `target_batch_size` rows.
123#[derive(Debug)]
124pub struct BatchCoalescer {
125    /// The input schema
126    schema: SchemaRef,
127    /// output batch size
128    batch_size: usize,
129    /// In-progress arrays
130    in_progress_arrays: Vec<Box<dyn InProgressArray>>,
131    /// Buffered row count. Always less than `batch_size`
132    buffered_rows: usize,
133    /// Completed batches
134    completed: VecDeque<RecordBatch>,
135}
136
137impl BatchCoalescer {
138    /// Create a new `BatchCoalescer`
139    ///
140    /// # Arguments
141    /// - `schema` - the schema of the output batches
142    /// - `batch_size` - the number of rows in each output batch.
143    ///   Typical values are `4096` or `8192` rows.
144    ///
145    pub fn new(schema: SchemaRef, batch_size: usize) -> Self {
146        let in_progress_arrays = schema
147            .fields()
148            .iter()
149            .map(|field| create_in_progress_array(field.data_type(), batch_size))
150            .collect::<Vec<_>>();
151
152        Self {
153            schema,
154            batch_size,
155            in_progress_arrays,
156            // We will for sure store at least one completed batch
157            completed: VecDeque::with_capacity(1),
158            buffered_rows: 0,
159        }
160    }
161
162    /// Return the schema of the output batches
163    pub fn schema(&self) -> SchemaRef {
164        Arc::clone(&self.schema)
165    }
166
167    /// Push a batch into the Coalescer after applying a filter
168    ///
169    /// This is semantically equivalent of calling [`Self::push_batch`]
170    /// with the results from  [`filter_record_batch`]
171    ///
172    /// # Example
173    /// ```
174    /// # use arrow_array::{record_batch, BooleanArray};
175    /// # use arrow_select::coalesce::BatchCoalescer;
176    /// let batch1 = record_batch!(("a", Int32, [1, 2, 3])).unwrap();
177    /// let batch2 = record_batch!(("a", Int32, [4, 5, 6])).unwrap();
178    /// // Apply a filter to each batch to pick the first and last row
179    /// let filter = BooleanArray::from(vec![true, false, true]);
180    /// // create a new Coalescer that targets creating 1000 row batches
181    /// let mut coalescer = BatchCoalescer::new(batch1.schema(), 1000);
182    /// coalescer.push_batch_with_filter(batch1, &filter);
183    /// coalescer.push_batch_with_filter(batch2, &filter);
184    /// // finsh and retrieve the created batch
185    /// coalescer.finish_buffered_batch().unwrap();
186    /// let completed_batch = coalescer.next_completed_batch().unwrap();
187    /// // filtered out 2 and 5:
188    /// let expected_batch = record_batch!(("a", Int32, [1, 3, 4, 6])).unwrap();
189    /// assert_eq!(completed_batch, expected_batch);
190    /// ```
191    pub fn push_batch_with_filter(
192        &mut self,
193        batch: RecordBatch,
194        filter: &BooleanArray,
195    ) -> Result<(), ArrowError> {
196        // TODO: optimize this to avoid materializing (copying the results
197        // of filter to a new batch)
198        let filtered_batch = filter_record_batch(&batch, filter)?;
199        self.push_batch(filtered_batch)
200    }
201
202    /// Push all the rows from `batch` into the Coalescer
203    ///
204    /// See [`Self::next_completed_batch()`] to retrieve any completed batches.
205    ///
206    /// # Example
207    /// ```
208    /// # use arrow_array::record_batch;
209    /// # use arrow_select::coalesce::BatchCoalescer;
210    /// let batch1 = record_batch!(("a", Int32, [1, 2, 3])).unwrap();
211    /// let batch2 = record_batch!(("a", Int32, [4, 5, 6])).unwrap();
212    /// // create a new Coalescer that targets creating 1000 row batches
213    /// let mut coalescer = BatchCoalescer::new(batch1.schema(), 1000);
214    /// coalescer.push_batch(batch1);
215    /// coalescer.push_batch(batch2);
216    /// // finsh and retrieve the created batch
217    /// coalescer.finish_buffered_batch().unwrap();
218    /// let completed_batch = coalescer.next_completed_batch().unwrap();
219    /// let expected_batch = record_batch!(("a", Int32, [1, 2, 3, 4, 5, 6])).unwrap();
220    /// assert_eq!(completed_batch, expected_batch);
221    /// ```
222    pub fn push_batch(&mut self, batch: RecordBatch) -> Result<(), ArrowError> {
223        let (_schema, arrays, mut num_rows) = batch.into_parts();
224        if num_rows == 0 {
225            return Ok(());
226        }
227
228        // setup input rows
229        assert_eq!(arrays.len(), self.in_progress_arrays.len());
230        self.in_progress_arrays
231            .iter_mut()
232            .zip(arrays)
233            .for_each(|(in_progress, array)| {
234                in_progress.set_source(Some(array));
235            });
236
237        // If pushing this batch would exceed the target batch size,
238        // finish the current batch and start a new one
239        let mut offset = 0;
240        while num_rows > (self.batch_size - self.buffered_rows) {
241            let remaining_rows = self.batch_size - self.buffered_rows;
242            debug_assert!(remaining_rows > 0);
243
244            // Copy remaining_rows from each array
245            for in_progress in self.in_progress_arrays.iter_mut() {
246                in_progress.copy_rows(offset, remaining_rows)?;
247            }
248
249            self.buffered_rows += remaining_rows;
250            offset += remaining_rows;
251            num_rows -= remaining_rows;
252
253            self.finish_buffered_batch()?;
254        }
255
256        // Add any the remaining rows to the buffer
257        self.buffered_rows += num_rows;
258        if num_rows > 0 {
259            for in_progress in self.in_progress_arrays.iter_mut() {
260                in_progress.copy_rows(offset, num_rows)?;
261            }
262        }
263
264        // If we have reached the target batch size, finalize the buffered batch
265        if self.buffered_rows >= self.batch_size {
266            self.finish_buffered_batch()?;
267        }
268
269        // clear in progress sources (to allow the memory to be freed)
270        for in_progress in self.in_progress_arrays.iter_mut() {
271            in_progress.set_source(None);
272        }
273
274        Ok(())
275    }
276
277    /// Concatenates any buffered batches into a single `RecordBatch` and
278    /// clears any output buffers
279    ///
280    /// Normally this is called when the input stream is exhausted, and
281    /// we want to finalize the last batch of rows.
282    ///
283    /// See [`Self::next_completed_batch()`] for the completed batches.
284    pub fn finish_buffered_batch(&mut self) -> Result<(), ArrowError> {
285        if self.buffered_rows == 0 {
286            return Ok(());
287        }
288        let new_arrays = self
289            .in_progress_arrays
290            .iter_mut()
291            .map(|array| array.finish())
292            .collect::<Result<Vec<_>, ArrowError>>()?;
293
294        for (array, field) in new_arrays.iter().zip(self.schema.fields().iter()) {
295            debug_assert_eq!(array.data_type(), field.data_type());
296            debug_assert_eq!(array.len(), self.buffered_rows);
297        }
298
299        // SAFETY: each array was created of the correct type and length.
300        let batch = unsafe {
301            RecordBatch::new_unchecked(Arc::clone(&self.schema), new_arrays, self.buffered_rows)
302        };
303
304        self.buffered_rows = 0;
305        self.completed.push_back(batch);
306        Ok(())
307    }
308
309    /// Returns true if there is any buffered data
310    pub fn is_empty(&self) -> bool {
311        self.buffered_rows == 0 && self.completed.is_empty()
312    }
313
314    /// Returns true if there are any completed batches
315    pub fn has_completed_batch(&self) -> bool {
316        !self.completed.is_empty()
317    }
318
319    /// Returns the next completed batch, if any
320    pub fn next_completed_batch(&mut self) -> Option<RecordBatch> {
321        self.completed.pop_front()
322    }
323}
324
325/// Return a new `InProgressArray` for the given data type
326fn create_in_progress_array(data_type: &DataType, batch_size: usize) -> Box<dyn InProgressArray> {
327    macro_rules! instantiate_primitive {
328        ($t:ty) => {
329            Box::new(InProgressPrimitiveArray::<$t>::new(batch_size))
330        };
331    }
332
333    downcast_primitive! {
334        // Instantiate InProgressPrimitiveArray for each primitive type
335        data_type => (instantiate_primitive),
336        DataType::Utf8View => Box::new(InProgressByteViewArray::<StringViewType>::new(batch_size)),
337        DataType::BinaryView => {
338            Box::new(InProgressByteViewArray::<BinaryViewType>::new(batch_size))
339        }
340        _ => Box::new(GenericInProgressArray::new()),
341    }
342}
343
/// Incrementally builds up arrays
///
/// [`GenericInProgressArray`] is the default implementation. It buffers
/// arrays and uses other kernels to concatenate them when finished.
///
/// Some types have specialized implementations (e.g., [`StringViewArray`],
/// primitive arrays).
///
/// [`StringViewArray`]: arrow_array::StringViewArray
trait InProgressArray: std::fmt::Debug + Send + Sync {
    /// Set the source array.
    ///
    /// Calls to [`Self::copy_rows`] will copy rows from this array into the
    /// current in-progress array. [`BatchCoalescer`] passes `None` once it has
    /// copied the rows it needs, so that the source's memory can be freed.
    fn set_source(&mut self, source: Option<ArrayRef>);

    /// Copy `len` rows, starting at row `offset` of the current source array,
    /// into the in-progress array
    ///
    /// The source array is set by [`Self::set_source`].
    ///
    /// Return an error if the source array is not set
    fn copy_rows(&mut self, offset: usize, len: usize) -> Result<(), ArrowError>;

    /// Finish the currently in-progress array and return it as an `ArrayRef`
    ///
    /// [`BatchCoalescer`] keeps copying rows into the same instance after
    /// calling this. It expects each returned array to contain only the rows
    /// copied since the previous call to `finish`.
    fn finish(&mut self) -> Result<ArrayRef, ArrowError>;
}
370
371#[cfg(test)]
372mod tests {
373    use super::*;
374    use crate::concat::concat_batches;
375    use arrow_array::builder::StringViewBuilder;
376    use arrow_array::cast::AsArray;
377    use arrow_array::{
378        BinaryViewArray, RecordBatchOptions, StringArray, StringViewArray, UInt32Array,
379    };
380    use arrow_schema::{DataType, Field, Schema};
381    use std::ops::Range;
382
383    #[test]
384    fn test_coalesce() {
385        let batch = uint32_batch(0..8);
386        Test::new()
387            .with_batches(std::iter::repeat_n(batch, 10))
388            // expected output is exactly 21 rows (except for the final batch)
389            .with_batch_size(21)
390            .with_expected_output_sizes(vec![21, 21, 21, 17])
391            .run();
392    }
393
394    #[test]
395    fn test_coalesce_one_by_one() {
396        let batch = uint32_batch(0..1); // single row input
397        Test::new()
398            .with_batches(std::iter::repeat_n(batch, 97))
399            // expected output is exactly 20 rows (except for the final batch)
400            .with_batch_size(20)
401            .with_expected_output_sizes(vec![20, 20, 20, 20, 17])
402            .run();
403    }
404
405    #[test]
406    fn test_coalesce_empty() {
407        let schema = Arc::new(Schema::new(vec![Field::new("c0", DataType::UInt32, false)]));
408
409        Test::new()
410            .with_batches(vec![])
411            .with_schema(schema)
412            .with_batch_size(21)
413            .with_expected_output_sizes(vec![])
414            .run();
415    }
416
417    #[test]
418    fn test_single_large_batch_greater_than_target() {
419        // test a single large batch
420        let batch = uint32_batch(0..4096);
421        Test::new()
422            .with_batch(batch)
423            .with_batch_size(1000)
424            .with_expected_output_sizes(vec![1000, 1000, 1000, 1000, 96])
425            .run();
426    }
427
428    #[test]
429    fn test_single_large_batch_smaller_than_target() {
430        // test a single large batch
431        let batch = uint32_batch(0..4096);
432        Test::new()
433            .with_batch(batch)
434            .with_batch_size(8192)
435            .with_expected_output_sizes(vec![4096])
436            .run();
437    }
438
439    #[test]
440    fn test_single_large_batch_equal_to_target() {
441        // test a single large batch
442        let batch = uint32_batch(0..4096);
443        Test::new()
444            .with_batch(batch)
445            .with_batch_size(4096)
446            .with_expected_output_sizes(vec![4096])
447            .run();
448    }
449
450    #[test]
451    fn test_single_large_batch_equally_divisible_in_target() {
452        // test a single large batch
453        let batch = uint32_batch(0..4096);
454        Test::new()
455            .with_batch(batch)
456            .with_batch_size(1024)
457            .with_expected_output_sizes(vec![1024, 1024, 1024, 1024])
458            .run();
459    }
460
461    #[test]
462    fn test_empty_schema() {
463        let schema = Schema::empty();
464        let batch = RecordBatch::new_empty(schema.into());
465        Test::new()
466            .with_batch(batch)
467            .with_expected_output_sizes(vec![])
468            .run();
469    }
470
471    #[test]
472    fn test_coalesce_non_null() {
473        Test::new()
474            // 4040 rows of unit32
475            .with_batch(uint32_batch_non_null(0..3000))
476            .with_batch(uint32_batch_non_null(0..1040))
477            .with_batch_size(1024)
478            .with_expected_output_sizes(vec![1024, 1024, 1024, 968])
479            .run();
480    }
481    #[test]
482    fn test_utf8_split() {
483        Test::new()
484            // 4040 rows of utf8 strings in total, split into batches of 1024
485            .with_batch(utf8_batch(0..3000))
486            .with_batch(utf8_batch(0..1040))
487            .with_batch_size(1024)
488            .with_expected_output_sizes(vec![1024, 1024, 1024, 968])
489            .run();
490    }
491
492    #[test]
493    fn test_string_view_no_views() {
494        let output_batches = Test::new()
495            // both input batches have no views, so no need to compact
496            .with_batch(stringview_batch([Some("foo"), Some("bar")]))
497            .with_batch(stringview_batch([Some("baz"), Some("qux")]))
498            .with_expected_output_sizes(vec![4])
499            .run();
500
501        expect_buffer_layout(
502            col_as_string_view("c0", output_batches.first().unwrap()),
503            vec![],
504        );
505    }
506
507    #[test]
508    fn test_string_view_batch_small_no_compact() {
509        // view with only short strings (no buffers) --> no need to compact
510        let batch = stringview_batch_repeated(1000, [Some("a"), Some("b"), Some("c")]);
511        let output_batches = Test::new()
512            .with_batch(batch.clone())
513            .with_expected_output_sizes(vec![1000])
514            .run();
515
516        let array = col_as_string_view("c0", &batch);
517        let gc_array = col_as_string_view("c0", output_batches.first().unwrap());
518        assert_eq!(array.data_buffers().len(), 0);
519        assert_eq!(array.data_buffers().len(), gc_array.data_buffers().len()); // no compaction
520
521        expect_buffer_layout(gc_array, vec![]);
522    }
523
524    #[test]
525    fn test_string_view_batch_large_no_compact() {
526        // view with large strings (has buffers) but full --> no need to compact
527        let batch = stringview_batch_repeated(1000, [Some("This string is longer than 12 bytes")]);
528        let output_batches = Test::new()
529            .with_batch(batch.clone())
530            .with_batch_size(1000)
531            .with_expected_output_sizes(vec![1000])
532            .run();
533
534        let array = col_as_string_view("c0", &batch);
535        let gc_array = col_as_string_view("c0", output_batches.first().unwrap());
536        assert_eq!(array.data_buffers().len(), 5);
537        assert_eq!(array.data_buffers().len(), gc_array.data_buffers().len()); // no compaction
538
539        expect_buffer_layout(
540            gc_array,
541            vec![
542                ExpectedLayout {
543                    len: 8190,
544                    capacity: 8192,
545                },
546                ExpectedLayout {
547                    len: 8190,
548                    capacity: 8192,
549                },
550                ExpectedLayout {
551                    len: 8190,
552                    capacity: 8192,
553                },
554                ExpectedLayout {
555                    len: 8190,
556                    capacity: 8192,
557                },
558                ExpectedLayout {
559                    len: 2240,
560                    capacity: 8192,
561                },
562            ],
563        );
564    }
565
566    #[test]
567    fn test_string_view_batch_small_with_buffers_no_compact() {
568        // view with buffers but only short views
569        let short_strings = std::iter::repeat(Some("SmallString"));
570        let long_strings = std::iter::once(Some("This string is longer than 12 bytes"));
571        // 20 short strings, then a long ones
572        let values = short_strings.take(20).chain(long_strings);
573        let batch = stringview_batch_repeated(1000, values)
574            // take only 10 short strings (no long ones)
575            .slice(5, 10);
576        let output_batches = Test::new()
577            .with_batch(batch.clone())
578            .with_batch_size(1000)
579            .with_expected_output_sizes(vec![10])
580            .run();
581
582        let array = col_as_string_view("c0", &batch);
583        let gc_array = col_as_string_view("c0", output_batches.first().unwrap());
584        assert_eq!(array.data_buffers().len(), 1); // input has one buffer
585        assert_eq!(gc_array.data_buffers().len(), 0); // output has no buffers as only short strings
586    }
587
588    #[test]
589    fn test_string_view_batch_large_slice_compact() {
590        // view with large strings (has buffers) and only partially used  --> no need to compact
591        let batch = stringview_batch_repeated(1000, [Some("This string is longer than 12 bytes")])
592            // slice only 22 rows, so most of the buffer is not used
593            .slice(11, 22);
594
595        let output_batches = Test::new()
596            .with_batch(batch.clone())
597            .with_batch_size(1000)
598            .with_expected_output_sizes(vec![22])
599            .run();
600
601        let array = col_as_string_view("c0", &batch);
602        let gc_array = col_as_string_view("c0", output_batches.first().unwrap());
603        assert_eq!(array.data_buffers().len(), 5);
604
605        expect_buffer_layout(
606            gc_array,
607            vec![ExpectedLayout {
608                len: 770,
609                capacity: 8192,
610            }],
611        );
612    }
613
614    #[test]
615    fn test_string_view_mixed() {
616        let large_view_batch =
617            stringview_batch_repeated(1000, [Some("This string is longer than 12 bytes")]);
618        let small_view_batch = stringview_batch_repeated(1000, [Some("SmallString")]);
619        let mixed_batch = stringview_batch_repeated(
620            1000,
621            [Some("This string is longer than 12 bytes"), Some("Small")],
622        );
623        let mixed_batch_nulls = stringview_batch_repeated(
624            1000,
625            [
626                Some("This string is longer than 12 bytes"),
627                Some("Small"),
628                None,
629            ],
630        );
631
632        // Several batches with mixed inline / non inline
633        // 4k rows in
634        let output_batches = Test::new()
635            .with_batch(large_view_batch.clone())
636            .with_batch(small_view_batch)
637            // this batch needs to be compacted (less than 1/2 full)
638            .with_batch(large_view_batch.slice(10, 20))
639            .with_batch(mixed_batch_nulls)
640            // this batch needs to be compacted (less than 1/2 full)
641            .with_batch(large_view_batch.slice(10, 20))
642            .with_batch(mixed_batch)
643            .with_expected_output_sizes(vec![1024, 1024, 1024, 968])
644            .run();
645
646        expect_buffer_layout(
647            col_as_string_view("c0", output_batches.first().unwrap()),
648            vec![
649                ExpectedLayout {
650                    len: 8190,
651                    capacity: 8192,
652                },
653                ExpectedLayout {
654                    len: 8190,
655                    capacity: 8192,
656                },
657                ExpectedLayout {
658                    len: 8190,
659                    capacity: 8192,
660                },
661                ExpectedLayout {
662                    len: 8190,
663                    capacity: 8192,
664                },
665                ExpectedLayout {
666                    len: 2240,
667                    capacity: 8192,
668                },
669            ],
670        );
671    }
672
673    #[test]
674    fn test_string_view_many_small_compact() {
675        // The strings are 28 long, so each batch has 400 * 28 = 5600 bytes
676        let batch = stringview_batch_repeated(
677            400,
678            [Some("This string is 28 bytes long"), Some("small string")],
679        );
680        let output_batches = Test::new()
681            // First allocated buffer is 8kb.
682            // Appending five batches of 5600 bytes will use 5600 * 5 = 28kb (8kb, an 16kb and 32kbkb)
683            .with_batch(batch.clone())
684            .with_batch(batch.clone())
685            .with_batch(batch.clone())
686            .with_batch(batch.clone())
687            .with_batch(batch.clone())
688            .with_batch_size(8000)
689            .with_expected_output_sizes(vec![2000]) // only 2000 rows total
690            .run();
691
692        // expect a nice even distribution of buffers
693        expect_buffer_layout(
694            col_as_string_view("c0", output_batches.first().unwrap()),
695            vec![
696                ExpectedLayout {
697                    len: 8176,
698                    capacity: 8192,
699                },
700                ExpectedLayout {
701                    len: 16380,
702                    capacity: 16384,
703                },
704                ExpectedLayout {
705                    len: 3444,
706                    capacity: 32768,
707                },
708            ],
709        );
710    }
711
712    #[test]
713    fn test_string_view_many_small_boundary() {
714        // The strings are designed to exactly fit into buffers that are powers of 2 long
715        let batch = stringview_batch_repeated(100, [Some("This string is a power of two=32")]);
716        let output_batches = Test::new()
717            .with_batches(std::iter::repeat(batch).take(20))
718            .with_batch_size(900)
719            .with_expected_output_sizes(vec![900, 900, 200])
720            .run();
721
722        // expect each buffer to be entirely full except the last one
723        expect_buffer_layout(
724            col_as_string_view("c0", output_batches.first().unwrap()),
725            vec![
726                ExpectedLayout {
727                    len: 8192,
728                    capacity: 8192,
729                },
730                ExpectedLayout {
731                    len: 16384,
732                    capacity: 16384,
733                },
734                ExpectedLayout {
735                    len: 4224,
736                    capacity: 32768,
737                },
738            ],
739        );
740    }
741
742    #[test]
743    fn test_string_view_large_small() {
744        // The strings are 37 bytes long, so each batch has 200 * 28 = 5600 bytes
745        let mixed_batch = stringview_batch_repeated(
746            400,
747            [Some("This string is 28 bytes long"), Some("small string")],
748        );
749        // These strings aren't copied, this array has an 8k buffer
750        let all_large = stringview_batch_repeated(
751            100,
752            [Some(
753                "This buffer has only large strings in it so there are no buffer copies",
754            )],
755        );
756
757        let output_batches = Test::new()
758            // First allocated buffer is 8kb.
759            // Appending five batches of 5600 bytes will use 5600 * 5 = 28kb (8kb, an 16kb and 32kbkb)
760            .with_batch(mixed_batch.clone())
761            .with_batch(mixed_batch.clone())
762            .with_batch(all_large.clone())
763            .with_batch(mixed_batch.clone())
764            .with_batch(all_large.clone())
765            .with_batch_size(8000)
766            .with_expected_output_sizes(vec![1400])
767            .run();
768
769        expect_buffer_layout(
770            col_as_string_view("c0", output_batches.first().unwrap()),
771            vec![
772                ExpectedLayout {
773                    len: 8176,
774                    capacity: 8192,
775                },
776                // this buffer was allocated but not used when the all_large batch was pushed
777                ExpectedLayout {
778                    len: 3024,
779                    capacity: 16384,
780                },
781                ExpectedLayout {
782                    len: 7000,
783                    capacity: 8192,
784                },
785                ExpectedLayout {
786                    len: 5600,
787                    capacity: 32768,
788                },
789                ExpectedLayout {
790                    len: 7000,
791                    capacity: 8192,
792                },
793            ],
794        );
795    }
796
797    #[test]
798    fn test_binary_view() {
799        let values: Vec<Option<&[u8]>> = vec![
800            Some(b"foo"),
801            None,
802            Some(b"A longer string that is more than 12 bytes"),
803        ];
804
805        let binary_view =
806            BinaryViewArray::from_iter(std::iter::repeat(values.iter()).flatten().take(1000));
807        let batch =
808            RecordBatch::try_from_iter(vec![("c0", Arc::new(binary_view) as ArrayRef)]).unwrap();
809
810        Test::new()
811            .with_batch(batch.clone())
812            .with_batch(batch.clone())
813            .with_batch_size(512)
814            .with_expected_output_sizes(vec![512, 512, 512, 464])
815            .run();
816    }
817
818    #[derive(Debug, Clone, PartialEq)]
819    struct ExpectedLayout {
820        len: usize,
821        capacity: usize,
822    }
823
824    /// Asserts that the buffer layout of the specified StringViewArray matches the expected layout
825    fn expect_buffer_layout(array: &StringViewArray, expected: Vec<ExpectedLayout>) {
826        let actual = array
827            .data_buffers()
828            .iter()
829            .map(|b| ExpectedLayout {
830                len: b.len(),
831                capacity: b.capacity(),
832            })
833            .collect::<Vec<_>>();
834
835        assert_eq!(
836            actual, expected,
837            "Expected buffer layout {expected:#?} but got {actual:#?}"
838        );
839    }
840
841    /// Test for [`BatchCoalescer`]
842    ///
843    /// Pushes the input batches to the coalescer and verifies that the resulting
844    /// batches have the expected number of rows and contents.
845    #[derive(Debug, Clone)]
846    struct Test {
847        /// Batches to feed to the coalescer.
848        input_batches: Vec<RecordBatch>,
849        /// The schema. If not provided, the first batch's schema is used.
850        schema: Option<SchemaRef>,
851        /// Expected output sizes of the resulting batches
852        expected_output_sizes: Vec<usize>,
853        /// target batch size (default to 1024)
854        target_batch_size: usize,
855    }
856
857    impl Default for Test {
858        fn default() -> Self {
859            Self {
860                input_batches: vec![],
861                schema: None,
862                expected_output_sizes: vec![],
863                target_batch_size: 1024,
864            }
865        }
866    }
867
868    impl Test {
869        fn new() -> Self {
870            Self::default()
871        }
872
873        /// Set the target batch size
874        fn with_batch_size(mut self, target_batch_size: usize) -> Self {
875            self.target_batch_size = target_batch_size;
876            self
877        }
878
879        /// Extend the input batches with `batch`
880        fn with_batch(mut self, batch: RecordBatch) -> Self {
881            self.input_batches.push(batch);
882            self
883        }
884
885        /// Extends the input batches with `batches`
886        fn with_batches(mut self, batches: impl IntoIterator<Item = RecordBatch>) -> Self {
887            self.input_batches.extend(batches);
888            self
889        }
890
891        /// Specifies the schema for the test
892        fn with_schema(mut self, schema: SchemaRef) -> Self {
893            self.schema = Some(schema);
894            self
895        }
896
897        /// Extends `sizes` to expected output sizes
898        fn with_expected_output_sizes(mut self, sizes: impl IntoIterator<Item = usize>) -> Self {
899            self.expected_output_sizes.extend(sizes);
900            self
901        }
902
903        /// Runs the test -- see documentation on [`Test`] for details
904        ///
905        /// Returns the resulting output batches
906        fn run(self) -> Vec<RecordBatch> {
907            let Self {
908                input_batches,
909                schema,
910                target_batch_size,
911                expected_output_sizes,
912            } = self;
913
914            let schema = schema.unwrap_or_else(|| input_batches[0].schema());
915
916            // create a single large input batch for output comparison
917            let single_input_batch = concat_batches(&schema, &input_batches).unwrap();
918
919            let mut coalescer = BatchCoalescer::new(Arc::clone(&schema), target_batch_size);
920
921            let had_input = input_batches.iter().any(|b| b.num_rows() > 0);
922            for batch in input_batches {
923                coalescer.push_batch(batch).unwrap();
924            }
925            assert_eq!(schema, coalescer.schema());
926
927            if had_input {
928                assert!(!coalescer.is_empty(), "Coalescer should not be empty");
929            } else {
930                assert!(coalescer.is_empty(), "Coalescer should be empty");
931            }
932
933            coalescer.finish_buffered_batch().unwrap();
934            if had_input {
935                assert!(
936                    coalescer.has_completed_batch(),
937                    "Coalescer should have completed batches"
938                );
939            }
940
941            let mut output_batches = vec![];
942            while let Some(batch) = coalescer.next_completed_batch() {
943                output_batches.push(batch);
944            }
945
946            // make sure we got the expected number of output batches and content
947            let mut starting_idx = 0;
948            let actual_output_sizes: Vec<usize> =
949                output_batches.iter().map(|b| b.num_rows()).collect();
950            assert_eq!(
951                expected_output_sizes, actual_output_sizes,
952                "Unexpected number of rows in output batches\n\
953                Expected\n{expected_output_sizes:#?}\nActual:{actual_output_sizes:#?}"
954            );
955            let iter = expected_output_sizes
956                .iter()
957                .zip(output_batches.iter())
958                .enumerate();
959
960            for (i, (expected_size, batch)) in iter {
961                // compare the contents of the batch after normalization (using
962                // `==` compares the underlying memory layout too)
963                let expected_batch = single_input_batch.slice(starting_idx, *expected_size);
964                let expected_batch = normalize_batch(expected_batch);
965                let batch = normalize_batch(batch.clone());
966                assert_eq!(
967                    expected_batch, batch,
968                    "Unexpected content in batch {i}:\
969                    \n\nExpected:\n{expected_batch:#?}\n\nActual:\n{batch:#?}"
970                );
971                starting_idx += *expected_size;
972            }
973            output_batches
974        }
975    }
976
977    /// Return a RecordBatch with a UInt32Array with the specified range and
978    /// every third value is null.
979    fn uint32_batch(range: Range<u32>) -> RecordBatch {
980        let schema = Arc::new(Schema::new(vec![Field::new("c0", DataType::UInt32, true)]));
981
982        let array = UInt32Array::from_iter(range.map(|i| if i % 3 == 0 { None } else { Some(i) }));
983        RecordBatch::try_new(Arc::clone(&schema), vec![Arc::new(array)]).unwrap()
984    }
985
986    /// Return a RecordBatch with a UInt32Array with no nulls specified range
987    fn uint32_batch_non_null(range: Range<u32>) -> RecordBatch {
988        let schema = Arc::new(Schema::new(vec![Field::new("c0", DataType::UInt32, false)]));
989
990        let array = UInt32Array::from_iter_values(range);
991        RecordBatch::try_new(Arc::clone(&schema), vec![Arc::new(array)]).unwrap()
992    }
993
994    /// Return a RecordBatch with a StringArrary with values `value0`, `value1`, ...
995    /// and every third value is `None`.
996    fn utf8_batch(range: Range<u32>) -> RecordBatch {
997        let schema = Arc::new(Schema::new(vec![Field::new("c0", DataType::Utf8, true)]));
998
999        let array = StringArray::from_iter(range.map(|i| {
1000            if i % 3 == 0 {
1001                None
1002            } else {
1003                Some(format!("value{i}"))
1004            }
1005        }));
1006
1007        RecordBatch::try_new(Arc::clone(&schema), vec![Arc::new(array)]).unwrap()
1008    }
1009
1010    /// Return a RecordBatch with a StringViewArray with (only) the specified values
1011    fn stringview_batch<'a>(values: impl IntoIterator<Item = Option<&'a str>>) -> RecordBatch {
1012        let schema = Arc::new(Schema::new(vec![Field::new(
1013            "c0",
1014            DataType::Utf8View,
1015            false,
1016        )]));
1017
1018        let array = StringViewArray::from_iter(values);
1019        RecordBatch::try_new(Arc::clone(&schema), vec![Arc::new(array)]).unwrap()
1020    }
1021
1022    /// Return a RecordBatch with a StringViewArray with num_rows by repeating
1023    /// values over and over.
1024    fn stringview_batch_repeated<'a>(
1025        num_rows: usize,
1026        values: impl IntoIterator<Item = Option<&'a str>>,
1027    ) -> RecordBatch {
1028        let schema = Arc::new(Schema::new(vec![Field::new(
1029            "c0",
1030            DataType::Utf8View,
1031            true,
1032        )]));
1033
1034        // Repeat the values to a total of num_rows
1035        let values: Vec<_> = values.into_iter().collect();
1036        let values_iter = std::iter::repeat(values.iter())
1037            .flatten()
1038            .cloned()
1039            .take(num_rows);
1040
1041        let mut builder = StringViewBuilder::with_capacity(100).with_fixed_block_size(8192);
1042        for val in values_iter {
1043            builder.append_option(val);
1044        }
1045
1046        let array = builder.finish();
1047        RecordBatch::try_new(Arc::clone(&schema), vec![Arc::new(array)]).unwrap()
1048    }
1049
1050    /// Returns the named column as a StringViewArray
1051    fn col_as_string_view<'b>(name: &str, batch: &'b RecordBatch) -> &'b StringViewArray {
1052        batch
1053            .column_by_name(name)
1054            .expect("column not found")
1055            .as_string_view_opt()
1056            .expect("column is not a string view")
1057    }
1058
1059    /// Normalize the `RecordBatch` so that the memory layout is consistent
1060    /// (e.g. StringArray is compacted).
1061    fn normalize_batch(batch: RecordBatch) -> RecordBatch {
1062        // Only need to normalize StringViews (as == also tests for memory layout)
1063        let (schema, mut columns, row_count) = batch.into_parts();
1064
1065        for column in columns.iter_mut() {
1066            let Some(string_view) = column.as_string_view_opt() else {
1067                continue;
1068            };
1069
1070            // Re-create the StringViewArray to ensure memory layout is
1071            // consistent
1072            let mut builder = StringViewBuilder::new();
1073            for s in string_view.iter() {
1074                builder.append_option(s);
1075            }
1076            // Update the column with the new StringViewArray
1077            *column = Arc::new(builder.finish());
1078        }
1079
1080        let options = RecordBatchOptions::new().with_row_count(Some(row_count));
1081        RecordBatch::try_new_with_options(schema, columns, &options).unwrap()
1082    }
1083}