// Licensed to the Apache Software Foundation (ASF) under one
// or more contributor license agreements.  See the NOTICE file
// distributed with this work for additional information
// regarding copyright ownership.  The ASF licenses this file
// to you under the Apache License, Version 2.0 (the
// "License"); you may not use this file except in compliance
// with the License.  You may obtain a copy of the License at
//
//   http://www.apache.org/licenses/LICENSE-2.0
//
// Unless required by applicable law or agreed to in writing,
// software distributed under the License is distributed on an
// "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
// KIND, either express or implied.  See the License for the
// specific language governing permissions and limitations
// under the License.

//! [`BatchCoalescer`] concatenates multiple [`RecordBatch`]es after
//! operations such as [`filter`] and [`take`].
//!
//! [`filter`]: crate::filter::filter
//! [`take`]: crate::take::take

use crate::filter::filter_record_batch;
use arrow_array::types::{BinaryViewType, StringViewType};
use arrow_array::{Array, ArrayRef, BooleanArray, RecordBatch, downcast_primitive};
use arrow_schema::{ArrowError, DataType, SchemaRef};
use std::collections::VecDeque;
use std::sync::Arc;
// Originally from DataFusion's coalesce module:
// https://github.com/apache/datafusion/blob/9d2f04996604e709ee440b65f41e7b882f50b788/datafusion/physical-plan/src/coalesce/mod.rs#L26-L25

mod byte_view;
mod generic;
mod primitive;

use byte_view::InProgressByteViewArray;
use generic::GenericInProgressArray;
use primitive::InProgressPrimitiveArray;

/// Concatenate multiple [`RecordBatch`]es
///
/// Implements the common pattern of incrementally creating output
/// [`RecordBatch`]es of a specific size from an input stream of
/// [`RecordBatch`]es.
///
/// This is useful after operations such as [`filter`] and [`take`] that produce
/// smaller batches, when we want to coalesce them into larger batches for
/// further processing.
///
/// # Motivation
///
/// If we use [`concat_batches`] to implement the same functionality, there are 2 potential issues:
/// 1. At least 2x peak memory (holding the input and output of concat)
/// 2. 2 copies of the data (to create the output of filter and then create the output of concat)
///
/// See <https://github.com/apache/arrow-rs/issues/6692> for more discussion
/// about the motivation.
///
/// [`filter`]: crate::filter::filter
/// [`take`]: crate::take::take
/// [`concat_batches`]: crate::concat::concat_batches
///
/// # Example
/// ```
/// use arrow_array::record_batch;
/// use arrow_select::coalesce::BatchCoalescer;
/// let batch1 = record_batch!(("a", Int32, [1, 2, 3])).unwrap();
/// let batch2 = record_batch!(("a", Int32, [4, 5])).unwrap();
///
/// // Create a `BatchCoalescer` that will produce batches of 4 rows
/// let target_batch_size = 4;
/// let mut coalescer = BatchCoalescer::new(batch1.schema(), target_batch_size);
///
/// // push the batches
/// coalescer.push_batch(batch1).unwrap();
/// // only 3 rows pushed so far (fewer than the target of 4), so no batch is ready
/// assert!(coalescer.next_completed_batch().is_none());
/// coalescer.push_batch(batch2).unwrap();
/// // now we have 5 rows, so we can produce a batch
/// let finished = coalescer.next_completed_batch().unwrap();
/// // 4 rows came out (target batch size is 4)
/// let expected = record_batch!(("a", Int32, [1, 2, 3, 4])).unwrap();
/// assert_eq!(finished, expected);
///
/// // There is no more input, but there is still an in-progress batch
/// assert!(coalescer.next_completed_batch().is_none());
/// // Finishing the buffered batch produces the remaining rows
/// coalescer.finish_buffered_batch().unwrap();
/// let expected = record_batch!(("a", Int32, [5])).unwrap();
/// assert_eq!(coalescer.next_completed_batch().unwrap(), expected);
///
/// // The coalescer is now empty
/// assert!(coalescer.next_completed_batch().is_none());
/// ```
///
/// # Background
///
/// Generally speaking, larger [`RecordBatch`]es are more efficient to process
/// than smaller [`RecordBatch`]es (until the CPU cache is exceeded) because
/// there is fixed processing overhead per batch. This coalescer builds up these
/// larger batches incrementally.
///
/// ```text
/// ┌────────────────────┐
/// │    RecordBatch     │
/// │   num_rows = 100   │
/// └────────────────────┘                 ┌────────────────────┐
///                                        │                    │
/// ┌────────────────────┐     Coalesce    │                    │
/// │                    │      Batches    │                    │
/// │    RecordBatch     │                 │                    │
/// │   num_rows = 200   │  ─ ─ ─ ─ ─ ─ ▶  │                    │
/// │                    │                 │    RecordBatch     │
/// │                    │                 │   num_rows = 400   │
/// └────────────────────┘                 │                    │
///                                        │                    │
/// ┌────────────────────┐                 │                    │
/// │                    │                 │                    │
/// │    RecordBatch     │                 │                    │
/// │   num_rows = 100   │                 └────────────────────┘
/// │                    │
/// └────────────────────┘
/// ```
///
/// # Notes:
///
/// 1. Output rows are produced in the same order as the input rows
///
/// 2. The output is a sequence of batches, with all but the last being exactly
///    `target_batch_size` rows.
#[derive(Debug)]
pub struct BatchCoalescer {
    /// The input schema
    schema: SchemaRef,
    /// The target batch size (and thus size for views allocation). This is a
    /// hard limit: the output batch will be exactly `target_batch_size`,
    /// rather than possibly being slightly above.
    target_batch_size: usize,
    /// In-progress arrays
    in_progress_arrays: Vec<Box<dyn InProgressArray>>,
    /// Buffered row count. Always less than `target_batch_size`
    buffered_rows: usize,
    /// Completed batches
    completed: VecDeque<RecordBatch>,
    /// Biggest coalesce batch size. See [`Self::with_biggest_coalesce_batch_size`]
    biggest_coalesce_batch_size: Option<usize>,
}

impl BatchCoalescer {
    /// Create a new `BatchCoalescer`
    ///
    /// # Arguments
    /// - `schema` - the schema of the output batches
    /// - `target_batch_size` - the number of rows in each output batch.
    ///   Typical values are `4096` or `8192` rows.
    ///
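    /// # Example
    ///
    /// A minimal sketch of constructing a coalescer (the 4096 row target is
    /// just an illustrative value):
    ///
    /// ```
    /// # use arrow_array::record_batch;
    /// # use arrow_select::coalesce::BatchCoalescer;
    /// let batch = record_batch!(("a", Int32, [1, 2, 3])).unwrap();
    /// // create a coalescer that targets 4096 row output batches
    /// let coalescer = BatchCoalescer::new(batch.schema(), 4096);
    /// // nothing has been pushed yet, so nothing is buffered
    /// assert_eq!(coalescer.get_buffered_rows(), 0);
    /// ```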
    pub fn new(schema: SchemaRef, target_batch_size: usize) -> Self {
        let in_progress_arrays = schema
            .fields()
            .iter()
            .map(|field| create_in_progress_array(field.data_type(), target_batch_size))
            .collect::<Vec<_>>();

        Self {
            schema,
            target_batch_size,
            in_progress_arrays,
            // we will always store at least one completed batch
            completed: VecDeque::with_capacity(1),
            buffered_rows: 0,
            biggest_coalesce_batch_size: None,
        }
    }

    /// Set the coalesce batch size limit (default `None`)
    ///
    /// This limit determines when batches should bypass coalescing. Intuitively,
    /// batches that are already large are costly to coalesce and are efficient
    /// enough to process directly without coalescing.
    ///
    /// If `Some(limit)`, batches larger than this limit will bypass coalescing
    /// when there is no buffered data, or when the previously buffered data
    /// already exceeds this limit.
    ///
    /// If `None`, all batches will be coalesced according to the
    /// `target_batch_size`.
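    ///
    /// # Example
    ///
    /// A minimal sketch of the bypass behavior (nothing buffered, incoming
    /// batch larger than the limit):
    ///
    /// ```
    /// # use std::sync::Arc;
    /// # use arrow_array::{ArrayRef, Int32Array, RecordBatch};
    /// # use arrow_select::coalesce::BatchCoalescer;
    /// // a 600 row batch, larger than the bypass limit of 500 below
    /// let array: ArrayRef = Arc::new(Int32Array::from_iter_values(0..600));
    /// let batch = RecordBatch::try_from_iter([("a", array)]).unwrap();
    /// let mut coalescer = BatchCoalescer::new(batch.schema(), 1000)
    ///     .with_biggest_coalesce_batch_size(Some(500));
    /// coalescer.push_batch(batch).unwrap();
    /// // nothing was buffered, so the large batch is emitted directly, intact
    /// assert_eq!(coalescer.next_completed_batch().unwrap().num_rows(), 600);
    /// ```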
    pub fn with_biggest_coalesce_batch_size(mut self, limit: Option<usize>) -> Self {
        self.biggest_coalesce_batch_size = limit;
        self
    }

    /// Get the current biggest coalesce batch size limit
    ///
    /// See [`Self::with_biggest_coalesce_batch_size`] for details
    pub fn biggest_coalesce_batch_size(&self) -> Option<usize> {
        self.biggest_coalesce_batch_size
    }

    /// Set the biggest coalesce batch size limit
    ///
    /// See [`Self::with_biggest_coalesce_batch_size`] for details
    pub fn set_biggest_coalesce_batch_size(&mut self, limit: Option<usize>) {
        self.biggest_coalesce_batch_size = limit;
    }

    /// Return the schema of the output batches
    pub fn schema(&self) -> SchemaRef {
        Arc::clone(&self.schema)
    }

    /// Push a batch into the Coalescer after applying a filter
    ///
    /// This is semantically equivalent to calling [`Self::push_batch`]
    /// with the results from [`filter_record_batch`]
    ///
    /// # Example
    /// ```
    /// # use arrow_array::{record_batch, BooleanArray};
    /// # use arrow_select::coalesce::BatchCoalescer;
    /// let batch1 = record_batch!(("a", Int32, [1, 2, 3])).unwrap();
    /// let batch2 = record_batch!(("a", Int32, [4, 5, 6])).unwrap();
    /// // Apply a filter to each batch to pick the first and last row
    /// let filter = BooleanArray::from(vec![true, false, true]);
    /// // create a new Coalescer that targets creating 1000 row batches
    /// let mut coalescer = BatchCoalescer::new(batch1.schema(), 1000);
    /// coalescer.push_batch_with_filter(batch1, &filter).unwrap();
    /// coalescer.push_batch_with_filter(batch2, &filter).unwrap();
    /// // finish and retrieve the created batch
    /// coalescer.finish_buffered_batch().unwrap();
    /// let completed_batch = coalescer.next_completed_batch().unwrap();
    /// // filtered out 2 and 5:
    /// let expected_batch = record_batch!(("a", Int32, [1, 3, 4, 6])).unwrap();
    /// assert_eq!(completed_batch, expected_batch);
    /// ```
    pub fn push_batch_with_filter(
        &mut self,
        batch: RecordBatch,
        filter: &BooleanArray,
    ) -> Result<(), ArrowError> {
        // TODO: optimize this to avoid materializing (copying the results
        // of filter to a new batch)
        let filtered_batch = filter_record_batch(&batch, filter)?;
        self.push_batch(filtered_batch)
    }

    /// Push all the rows from `batch` into the Coalescer
    ///
    /// When buffered data plus incoming rows reach `target_batch_size`,
    /// completed batches are generated eagerly and can be retrieved via
    /// [`Self::next_completed_batch()`].
    /// Output batches contain exactly `target_batch_size` rows, so the tail of
    /// the input batch may remain buffered.
    /// Remaining partial data either waits for future input batches or can be
    /// materialized immediately by calling [`Self::finish_buffered_batch()`].
    ///
    /// # Example
    /// ```
    /// # use arrow_array::record_batch;
    /// # use arrow_select::coalesce::BatchCoalescer;
    /// let batch1 = record_batch!(("a", Int32, [1, 2, 3])).unwrap();
    /// let batch2 = record_batch!(("a", Int32, [4, 5, 6])).unwrap();
    /// // create a new Coalescer that targets creating 1000 row batches
    /// let mut coalescer = BatchCoalescer::new(batch1.schema(), 1000);
    /// coalescer.push_batch(batch1).unwrap();
    /// coalescer.push_batch(batch2).unwrap();
    /// // finish and retrieve the created batch
    /// coalescer.finish_buffered_batch().unwrap();
    /// let completed_batch = coalescer.next_completed_batch().unwrap();
    /// let expected_batch = record_batch!(("a", Int32, [1, 2, 3, 4, 5, 6])).unwrap();
    /// assert_eq!(completed_batch, expected_batch);
    /// ```
    pub fn push_batch(&mut self, batch: RecordBatch) -> Result<(), ArrowError> {
        // Large batch bypass optimization:
        // When biggest_coalesce_batch_size is configured and a batch exceeds this limit,
        // we can avoid expensive split-and-merge operations by passing it through directly.
        //
        // IMPORTANT: This optimization is OPTIONAL and only active when biggest_coalesce_batch_size
        // is explicitly set via with_biggest_coalesce_batch_size(Some(limit)).
        // If not set (None), ALL batches follow normal coalescing behavior regardless of size.

        // =============================================================================
        // CASE 1: No buffer + large batch → Direct bypass
        // =============================================================================
        // Example scenario (target_batch_size=1000, biggest_coalesce_batch_size=Some(500)):
        // Input sequence: [600, 1200, 300]
        //
        // With biggest_coalesce_batch_size=Some(500) (optimization enabled):
        //   600 → large batch detected! buffered_rows=0 → Case 1: direct bypass
        //        → output: [600] (bypass, preserves large batch)
        //   1200 → large batch detected! buffered_rows=0 → Case 1: direct bypass
        //         → output: [1200] (bypass, preserves large batch)
        //   300 → normal batch, buffer: [300]
        //   Result: [600], [1200], [300] - large batches preserved, mixed sizes

        // =============================================================================
        // CASE 2: Buffer too large + large batch → Flush first, then bypass
        // =============================================================================
        // This case prevents creating extremely large merged batches that would
        // significantly exceed both target_batch_size and biggest_coalesce_batch_size.
        //
        // Example 1: Buffer exceeds limit before large batch arrives
        // target_batch_size=1000, biggest_coalesce_batch_size=Some(400)
        // Input: [350, 200, 800]
        //
        // Step 1: push_batch([350])
        //   → batch_size=350 <= 400, normal path
        //   → buffer: [350], buffered_rows=350
        //
        // Step 2: push_batch([200])
        //   → batch_size=200 <= 400, normal path
        //   → buffer: [350, 200], buffered_rows=550
        //
        // Step 3: push_batch([800])
        //   → batch_size=800 > 400, large batch path
        //   → buffered_rows=550 > 400 → Case 2: flush first
        //   → flush: output [550] (combined [350, 200])
        //   → then bypass: output [800]
        //   Result: [550], [800] - buffer flushed to prevent oversized merge
        //
        // Example 2: Multiple small batches accumulate before large batch
        // target_batch_size=1000, biggest_coalesce_batch_size=Some(300)
        // Input: [150, 100, 80, 900]
        //
        // Step 1-3: Accumulate small batches
        //   150 → buffer: [150], buffered_rows=150
        //   100 → buffer: [150, 100], buffered_rows=250
        //   80  → buffer: [150, 100, 80], buffered_rows=330
        //
        // Step 4: push_batch([900])
        //   → batch_size=900 > 300, large batch path
        //   → buffered_rows=330 > 300 → Case 2: flush first
        //   → flush: output [330] (combined [150, 100, 80])
        //   → then bypass: output [900]
        //   Result: [330], [900] - prevents merge into [1230] which would be too large

        // =============================================================================
        // CASE 3: Small buffer + large batch → Normal coalescing (no bypass)
        // =============================================================================
        // When buffer is small enough, we still merge to maintain efficiency
        // Example: target_batch_size=1000, biggest_coalesce_batch_size=Some(500)
        // Input: [300, 1200]
        //
        // Step 1: push_batch([300])
        //   → batch_size=300 <= 500, normal path
        //   → buffer: [300], buffered_rows=300
        //
        // Step 2: push_batch([1200])
        //   → batch_size=1200 > 500, large batch path
        //   → buffered_rows=300 <= 500 → Case 3: normal merge
        //   → buffer: [300, 1200] (1500 total)
        //   → 1500 > target_batch_size → split: output [1000], buffer [500]
        //   Result: [1000], [500] - normal split/merge behavior maintained

        // =============================================================================
        // Comparison: Default vs Optimized Behavior
        // =============================================================================
        // target_batch_size=1000, biggest_coalesce_batch_size=Some(500)
        // Input: [600, 1200, 300]
        //
        // DEFAULT BEHAVIOR (biggest_coalesce_batch_size=None):
        //   600 → buffer: [600]
        //   1200 → buffer: [600, 1200] (1800 rows total)
        //         → split: output [1000 rows], buffer [800 rows remaining]
        //   300 → buffer: [800, 300] (1100 rows total)
        //        → split: output [1000 rows], buffer [100 rows remaining]
        //   Result: [1000], [1000], [100] - all outputs respect target_batch_size
        //
        // OPTIMIZED BEHAVIOR (biggest_coalesce_batch_size=Some(500)):
        //   600 → Case 1: direct bypass → output: [600]
        //   1200 → Case 1: direct bypass → output: [1200]
        //   300 → normal path → buffer: [300]
        //   Result: [600], [1200], [300] - large batches preserved

        // =============================================================================
        // Benefits and Trade-offs
        // =============================================================================
        // Benefits of the optimization:
        // - Large batches stay intact (better for downstream vectorized processing)
        // - Fewer split/merge operations (better CPU performance)
        // - More predictable memory usage patterns
        // - Maintains streaming efficiency while preserving batch boundaries
        //
        // Trade-offs:
        // - Output batch sizes become variable (not always target_batch_size)
        // - May produce smaller partial batches when flushing before large batches
        // - Requires tuning biggest_coalesce_batch_size parameter for optimal performance

        // TODO: for unsorted batches, we could filter all large batches, and coalesce all
        // small batches together?

        let batch_size = batch.num_rows();

        // Fast path: skip empty batches
        if batch_size == 0 {
            return Ok(());
        }

        // Large batch optimization: bypass coalescing for oversized batches
        if let Some(limit) = self.biggest_coalesce_batch_size {
            if batch_size > limit {
                // Case 1: No buffered data - emit large batch directly
                // Example: [] + [1200] → output [1200], buffer []
                if self.buffered_rows == 0 {
                    self.completed.push_back(batch);
                    return Ok(());
                }

                // Case 2: Buffer too large - flush then emit to avoid oversized merge
                // Example: [850] + [1200] → output [850], then output [1200]
                // This prevents creating batches much larger than both target_batch_size
                // and biggest_coalesce_batch_size, which could cause memory issues
                if self.buffered_rows > limit {
                    self.finish_buffered_batch()?;
                    self.completed.push_back(batch);
                    return Ok(());
                }

                // Case 3: Small buffer - proceed with normal coalescing
                // Example: [300] + [1200] → split and merge normally
                // This ensures small batches still get properly coalesced
                // while allowing some controlled growth beyond the limit
            }
        }

        let (_schema, arrays, mut num_rows) = batch.into_parts();

        // setup input rows
        assert_eq!(arrays.len(), self.in_progress_arrays.len());
        self.in_progress_arrays
            .iter_mut()
            .zip(arrays)
            .for_each(|(in_progress, array)| {
                in_progress.set_source(Some(array));
            });

        // If pushing this batch would exceed the target batch size,
        // finish the current batch and start a new one
        let mut offset = 0;
        while num_rows > (self.target_batch_size - self.buffered_rows) {
            let remaining_rows = self.target_batch_size - self.buffered_rows;
            debug_assert!(remaining_rows > 0);

            // Copy remaining_rows from each array
            for in_progress in self.in_progress_arrays.iter_mut() {
                in_progress.copy_rows(offset, remaining_rows)?;
            }

            self.buffered_rows += remaining_rows;
            offset += remaining_rows;
            num_rows -= remaining_rows;

            self.finish_buffered_batch()?;
        }

        // Add any remaining rows to the buffer
        self.buffered_rows += num_rows;
        if num_rows > 0 {
            for in_progress in self.in_progress_arrays.iter_mut() {
                in_progress.copy_rows(offset, num_rows)?;
            }
        }

        // If we have reached the target batch size, finalize the buffered batch
        if self.buffered_rows >= self.target_batch_size {
            self.finish_buffered_batch()?;
        }

        // clear in progress sources (to allow the memory to be freed)
        for in_progress in self.in_progress_arrays.iter_mut() {
            in_progress.set_source(None);
        }

        Ok(())
    }

    /// Returns the number of buffered rows
    pub fn get_buffered_rows(&self) -> usize {
        self.buffered_rows
    }

    /// Concatenates any buffered batches into a single `RecordBatch` and
    /// clears any output buffers
    ///
    /// Normally this is called when the input stream is exhausted, and
    /// we want to finalize the last batch of rows.
    ///
    /// See [`Self::next_completed_batch()`] for the completed batches.
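    ///
    /// # Example
    ///
    /// A minimal sketch of flushing a partially filled buffer:
    ///
    /// ```
    /// # use arrow_array::record_batch;
    /// # use arrow_select::coalesce::BatchCoalescer;
    /// let batch = record_batch!(("a", Int32, [1, 2, 3])).unwrap();
    /// let mut coalescer = BatchCoalescer::new(batch.schema(), 1000);
    /// coalescer.push_batch(batch).unwrap();
    /// // fewer than 1000 rows are buffered, so no output batch is ready yet
    /// assert!(coalescer.next_completed_batch().is_none());
    /// // finishing flushes the remaining buffered rows as a (smaller) batch
    /// coalescer.finish_buffered_batch().unwrap();
    /// assert_eq!(coalescer.next_completed_batch().unwrap().num_rows(), 3);
    /// ```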
    pub fn finish_buffered_batch(&mut self) -> Result<(), ArrowError> {
        if self.buffered_rows == 0 {
            return Ok(());
        }
        let new_arrays = self
            .in_progress_arrays
            .iter_mut()
            .map(|array| array.finish())
            .collect::<Result<Vec<_>, ArrowError>>()?;

        for (array, field) in new_arrays.iter().zip(self.schema.fields().iter()) {
            debug_assert_eq!(array.data_type(), field.data_type());
            debug_assert_eq!(array.len(), self.buffered_rows);
        }

        // SAFETY: each array was created of the correct type and length.
        let batch = unsafe {
            RecordBatch::new_unchecked(Arc::clone(&self.schema), new_arrays, self.buffered_rows)
        };

        self.buffered_rows = 0;
        self.completed.push_back(batch);
        Ok(())
    }

    /// Returns true if there is no buffered data and no completed batches
    pub fn is_empty(&self) -> bool {
        self.buffered_rows == 0 && self.completed.is_empty()
    }

    /// Returns true if there are any completed batches
    pub fn has_completed_batch(&self) -> bool {
        !self.completed.is_empty()
    }

    /// Removes and returns the next completed batch, if any.
    pub fn next_completed_batch(&mut self) -> Option<RecordBatch> {
        self.completed.pop_front()
    }
}

/// Return a new `InProgressArray` for the given data type
fn create_in_progress_array(data_type: &DataType, batch_size: usize) -> Box<dyn InProgressArray> {
    macro_rules! instantiate_primitive {
        ($t:ty) => {
            Box::new(InProgressPrimitiveArray::<$t>::new(
                batch_size,
                data_type.clone(),
            ))
        };
    }

    downcast_primitive! {
        // Instantiate InProgressPrimitiveArray for each primitive type
        data_type => (instantiate_primitive),
        DataType::Utf8View => Box::new(InProgressByteViewArray::<StringViewType>::new(batch_size)),
        DataType::BinaryView => {
            Box::new(InProgressByteViewArray::<BinaryViewType>::new(batch_size))
        }
        _ => Box::new(GenericInProgressArray::new()),
    }
}

/// Incrementally builds up arrays
///
/// [`GenericInProgressArray`] is the default implementation that buffers
/// arrays and uses other kernels to concatenate them when finished.
///
/// Some array types have specialized implementations (e.g.,
/// [`StringViewArray`]).
///
/// [`StringViewArray`]: arrow_array::StringViewArray
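///
/// # Lifecycle sketch
///
/// Callers drive an `InProgressArray` roughly as follows (illustrative
/// pseudo-usage; not a runnable doctest since this trait is crate-private):
///
/// ```ignore
/// in_progress.set_source(Some(array));          // attach an input array
/// in_progress.copy_rows(0, 100)?;               // stage rows [0, 100)
/// let output: ArrayRef = in_progress.finish()?; // materialize the output
/// in_progress.set_source(None);                 // release the input
/// ```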
trait InProgressArray: std::fmt::Debug + Send + Sync {
    /// Set the source array.
    ///
    /// Calls to [`Self::copy_rows`] will copy rows from this array into the
    /// current in-progress array
    fn set_source(&mut self, source: Option<ArrayRef>);

    /// Copy rows from the current source array into the in-progress array
    ///
    /// The source array is set by [`Self::set_source`].
    ///
    /// Return an error if the source array is not set
    fn copy_rows(&mut self, offset: usize, len: usize) -> Result<(), ArrowError>;

    /// Finish the currently in-progress array and return it as an `ArrayRef`
    fn finish(&mut self) -> Result<ArrayRef, ArrowError>;
}

#[cfg(test)]
mod tests {
    use super::*;
    use crate::concat::concat_batches;
    use arrow_array::builder::StringViewBuilder;
    use arrow_array::cast::AsArray;
    use arrow_array::{
        BinaryViewArray, Int32Array, Int64Array, RecordBatchOptions, StringArray, StringViewArray,
        TimestampNanosecondArray, UInt32Array,
    };
    use arrow_schema::{DataType, Field, Schema};
    use rand::{Rng, SeedableRng};
    use std::ops::Range;

    #[test]
    fn test_coalesce() {
        let batch = uint32_batch(0..8);
        Test::new()
            .with_batches(std::iter::repeat_n(batch, 10))
            // expected output is exactly 21 rows (except for the final batch)
            .with_batch_size(21)
            .with_expected_output_sizes(vec![21, 21, 21, 17])
            .run();
    }

    #[test]
    fn test_coalesce_one_by_one() {
        let batch = uint32_batch(0..1); // single row input
        Test::new()
            .with_batches(std::iter::repeat_n(batch, 97))
            // expected output is exactly 20 rows (except for the final batch)
            .with_batch_size(20)
            .with_expected_output_sizes(vec![20, 20, 20, 20, 17])
            .run();
    }

    #[test]
    fn test_coalesce_empty() {
        let schema = Arc::new(Schema::new(vec![Field::new("c0", DataType::UInt32, false)]));

        Test::new()
            .with_batches(vec![])
            .with_schema(schema)
            .with_batch_size(21)
            .with_expected_output_sizes(vec![])
            .run();
    }

    #[test]
    fn test_single_large_batch_greater_than_target() {
        // test a single large batch
        let batch = uint32_batch(0..4096);
        Test::new()
            .with_batch(batch)
            .with_batch_size(1000)
            .with_expected_output_sizes(vec![1000, 1000, 1000, 1000, 96])
            .run();
    }

    #[test]
    fn test_single_large_batch_smaller_than_target() {
        // test a single large batch
        let batch = uint32_batch(0..4096);
        Test::new()
            .with_batch(batch)
            .with_batch_size(8192)
            .with_expected_output_sizes(vec![4096])
            .run();
    }

    #[test]
    fn test_single_large_batch_equal_to_target() {
        // test a single large batch
        let batch = uint32_batch(0..4096);
        Test::new()
            .with_batch(batch)
            .with_batch_size(4096)
            .with_expected_output_sizes(vec![4096])
            .run();
    }

    #[test]
    fn test_single_large_batch_equally_divisible_in_target() {
        // test a single large batch
        let batch = uint32_batch(0..4096);
        Test::new()
            .with_batch(batch)
            .with_batch_size(1024)
            .with_expected_output_sizes(vec![1024, 1024, 1024, 1024])
            .run();
    }

    #[test]
    fn test_empty_schema() {
        let schema = Schema::empty();
        let batch = RecordBatch::new_empty(schema.into());
        Test::new()
            .with_batch(batch)
            .with_expected_output_sizes(vec![])
            .run();
    }

    /// Coalesce multiple batches, 80k rows, with a 0.1% selectivity filter
    #[test]
    fn test_coalesce_filtered_001() {
        let mut filter_builder = RandomFilterBuilder {
            num_rows: 8000,
            selectivity: 0.001,
            seed: 0,
        };

        // add 10 batches of 8000 rows each
        // 80k rows, selecting 0.1% means 80 rows
        // not exactly 80 as the rows are random;
        let mut test = Test::new();
        for _ in 0..10 {
            test = test
                .with_batch(multi_column_batch(0..8000))
                .with_filter(filter_builder.next_filter())
        }
        test.with_batch_size(15)
            .with_expected_output_sizes(vec![15, 15, 15, 13])
            .run();
    }

    /// Coalesce multiple batches, 80k rows, with a 1% selectivity filter
    #[test]
    fn test_coalesce_filtered_01() {
        let mut filter_builder = RandomFilterBuilder {
            num_rows: 8000,
            selectivity: 0.01,
            seed: 0,
        };

        // add 10 batches of 8000 rows each
        // 80k rows, selecting 1% means 800 rows
        // not exactly 800 as the rows are random;
        let mut test = Test::new();
        for _ in 0..10 {
            test = test
                .with_batch(multi_column_batch(0..8000))
                .with_filter(filter_builder.next_filter())
        }
        test.with_batch_size(128)
            .with_expected_output_sizes(vec![128, 128, 128, 128, 128, 128, 15])
            .run();
    }

    /// Coalesce multiple batches, 80k rows, with a 10% selectivity filter
    #[test]
    fn test_coalesce_filtered_1() {
        let mut filter_builder = RandomFilterBuilder {
            num_rows: 8000,
            selectivity: 0.1,
            seed: 0,
        };

        // add 10 batches of 8000 rows each
        // 80k rows, selecting 10% means 8000 rows
        // not exactly 8000 as the rows are random;
        let mut test = Test::new();
        for _ in 0..10 {
            test = test
                .with_batch(multi_column_batch(0..8000))
                .with_filter(filter_builder.next_filter())
        }
        test.with_batch_size(1024)
            .with_expected_output_sizes(vec![1024, 1024, 1024, 1024, 1024, 1024, 1024, 840])
            .run();
    }

    /// Coalesce multiple batches, 8k rows, with a 90% selectivity filter
    #[test]
    fn test_coalesce_filtered_90() {
        let mut filter_builder = RandomFilterBuilder {
            num_rows: 800,
            selectivity: 0.90,
            seed: 0,
        };

        // add 10 batches of 800 rows each
        // 8k rows, selecting 90% means 7200 rows
        // not exactly 7200 as the rows are random;
        let mut test = Test::new();
        for _ in 0..10 {
            test = test
                .with_batch(multi_column_batch(0..800))
                .with_filter(filter_builder.next_filter())
        }
        test.with_batch_size(1024)
            .with_expected_output_sizes(vec![1024, 1024, 1024, 1024, 1024, 1024, 1024, 13])
            .run();
    }

    #[test]
    fn test_coalesce_non_null() {
        Test::new()
            // 4040 rows of uint32
            .with_batch(uint32_batch_non_null(0..3000))
            .with_batch(uint32_batch_non_null(0..1040))
            .with_batch_size(1024)
            .with_expected_output_sizes(vec![1024, 1024, 1024, 968])
            .run();
    }
    #[test]
    fn test_utf8_split() {
        Test::new()
            // 4040 rows of utf8 strings in total, split into batches of 1024
            .with_batch(utf8_batch(0..3000))
            .with_batch(utf8_batch(0..1040))
            .with_batch_size(1024)
            .with_expected_output_sizes(vec![1024, 1024, 1024, 968])
            .run();
    }

    #[test]
    fn test_string_view_no_views() {
        let output_batches = Test::new()
            // both input batches have no views, so no need to compact
            .with_batch(stringview_batch([Some("foo"), Some("bar")]))
            .with_batch(stringview_batch([Some("baz"), Some("qux")]))
            .with_expected_output_sizes(vec![4])
            .run();

        expect_buffer_layout(
            col_as_string_view("c0", output_batches.first().unwrap()),
            vec![],
        );
    }

    #[test]
    fn test_string_view_batch_small_no_compact() {
        // view with only short strings (no buffers) --> no need to compact
        let batch = stringview_batch_repeated(1000, [Some("a"), Some("b"), Some("c")]);
        let output_batches = Test::new()
            .with_batch(batch.clone())
            .with_expected_output_sizes(vec![1000])
            .run();

        let array = col_as_string_view("c0", &batch);
        let gc_array = col_as_string_view("c0", output_batches.first().unwrap());
        assert_eq!(array.data_buffers().len(), 0);
        assert_eq!(array.data_buffers().len(), gc_array.data_buffers().len()); // no compaction

        expect_buffer_layout(gc_array, vec![]);
    }

    #[test]
    fn test_string_view_batch_large_no_compact() {
        // view with large strings (has buffers) but full --> no need to compact
        let batch = stringview_batch_repeated(1000, [Some("This string is longer than 12 bytes")]);
        let output_batches = Test::new()
            .with_batch(batch.clone())
            .with_batch_size(1000)
            .with_expected_output_sizes(vec![1000])
            .run();

        let array = col_as_string_view("c0", &batch);
        let gc_array = col_as_string_view("c0", output_batches.first().unwrap());
        assert_eq!(array.data_buffers().len(), 5);
        assert_eq!(array.data_buffers().len(), gc_array.data_buffers().len()); // no compaction

        expect_buffer_layout(
            gc_array,
            vec![
                ExpectedLayout {
                    len: 8190,
                    capacity: 8192,
                },
                ExpectedLayout {
                    len: 8190,
                    capacity: 8192,
                },
                ExpectedLayout {
                    len: 8190,
                    capacity: 8192,
                },
                ExpectedLayout {
                    len: 8190,
                    capacity: 8192,
                },
                ExpectedLayout {
                    len: 2240,
                    capacity: 8192,
                },
            ],
        );
    }

    #[test]
    fn test_string_view_batch_small_with_buffers_no_compact() {
        // view with buffers but only short views
        let short_strings = std::iter::repeat(Some("SmallString"));
        let long_strings = std::iter::once(Some("This string is longer than 12 bytes"));
        // 20 short strings, then a long one
        let values = short_strings.take(20).chain(long_strings);
        let batch = stringview_batch_repeated(1000, values)
            // take only 10 short strings (no long ones)
            .slice(5, 10);
        let output_batches = Test::new()
            .with_batch(batch.clone())
            .with_batch_size(1000)
            .with_expected_output_sizes(vec![10])
            .run();

        let array = col_as_string_view("c0", &batch);
        let gc_array = col_as_string_view("c0", output_batches.first().unwrap());
        assert_eq!(array.data_buffers().len(), 1); // input has one buffer
        assert_eq!(gc_array.data_buffers().len(), 0); // output has no buffers as only short strings
    }

    #[test]
    fn test_string_view_batch_large_slice_compact() {
        // view with large strings (has buffers) and only partially used --> needs compaction
        let batch = stringview_batch_repeated(1000, [Some("This string is longer than 12 bytes")])
            // slice only 22 rows, so most of the buffer is not used
            .slice(11, 22);

        let output_batches = Test::new()
            .with_batch(batch.clone())
            .with_batch_size(1000)
            .with_expected_output_sizes(vec![22])
            .run();

        let array = col_as_string_view("c0", &batch);
        let gc_array = col_as_string_view("c0", output_batches.first().unwrap());
        assert_eq!(array.data_buffers().len(), 5);

        expect_buffer_layout(
            gc_array,
            vec![ExpectedLayout {
                len: 770,
                capacity: 8192,
            }],
        );
    }

    #[test]
    fn test_string_view_mixed() {
        let large_view_batch =
            stringview_batch_repeated(1000, [Some("This string is longer than 12 bytes")]);
        let small_view_batch = stringview_batch_repeated(1000, [Some("SmallString")]);
        let mixed_batch = stringview_batch_repeated(
            1000,
            [Some("This string is longer than 12 bytes"), Some("Small")],
        );
        let mixed_batch_nulls = stringview_batch_repeated(
            1000,
            [
                Some("This string is longer than 12 bytes"),
                Some("Small"),
                None,
            ],
        );

        // Several batches with mixed inline / non-inline views,
        // ~4k rows in total
        let output_batches = Test::new()
            .with_batch(large_view_batch.clone())
            .with_batch(small_view_batch)
            // this batch needs to be compacted (less than 1/2 full)
            .with_batch(large_view_batch.slice(10, 20))
            .with_batch(mixed_batch_nulls)
            // this batch needs to be compacted (less than 1/2 full)
            .with_batch(large_view_batch.slice(10, 20))
            .with_batch(mixed_batch)
            .with_expected_output_sizes(vec![1024, 1024, 1024, 968])
            .run();

        expect_buffer_layout(
            col_as_string_view("c0", output_batches.first().unwrap()),
            vec![
                ExpectedLayout {
                    len: 8190,
                    capacity: 8192,
                },
                ExpectedLayout {
                    len: 8190,
                    capacity: 8192,
                },
                ExpectedLayout {
                    len: 8190,
                    capacity: 8192,
                },
                ExpectedLayout {
                    len: 8190,
                    capacity: 8192,
                },
                ExpectedLayout {
                    len: 2240,
                    capacity: 8192,
                },
            ],
        );
    }

    #[test]
    fn test_string_view_many_small_compact() {
        // 200 rows alternating long (28) and short (≤12) strings.
        // Only the 100 long strings go into data buffers: 100 × 28 = 2800.
        let batch = stringview_batch_repeated(
            200,
            [Some("This string is 28 bytes long"), Some("small string")],
        );
        let output_batches = Test::new()
            // The first allocated buffer is 8KB.
            // Appending 10 batches of 2800 bytes each uses 2800 * 10 = 28KB
            // (filling an 8KB and a 16KB buffer, and part of a 32KB buffer)
            .with_batch(batch.clone())
            .with_batch(batch.clone())
            .with_batch(batch.clone())
            .with_batch(batch.clone())
            .with_batch(batch.clone())
            .with_batch(batch.clone())
            .with_batch(batch.clone())
            .with_batch(batch.clone())
            .with_batch(batch.clone())
            .with_batch(batch.clone())
            .with_batch_size(8000)
            .with_expected_output_sizes(vec![2000]) // 10 batches * 200 rows = 2000 rows total
            .run();

        // expect a nice even distribution of buffers
        expect_buffer_layout(
            col_as_string_view("c0", output_batches.first().unwrap()),
            vec![
                ExpectedLayout {
                    len: 8176,
                    capacity: 8192,
                },
                ExpectedLayout {
                    len: 16380,
                    capacity: 16384,
                },
                ExpectedLayout {
                    len: 3444,
                    capacity: 32768,
                },
            ],
        );
    }

    #[test]
    fn test_string_view_many_small_boundary() {
        // The strings are designed to exactly fit into buffers that are powers of 2 long
        let batch = stringview_batch_repeated(100, [Some("This string is a power of two=32")]);
        let output_batches = Test::new()
            .with_batches(std::iter::repeat_n(batch, 20))
            .with_batch_size(900)
            .with_expected_output_sizes(vec![900, 900, 200])
            .run();

        // expect each buffer to be entirely full except the last one
        expect_buffer_layout(
            col_as_string_view("c0", output_batches.first().unwrap()),
            vec![
                ExpectedLayout {
                    len: 8192,
                    capacity: 8192,
                },
                ExpectedLayout {
                    len: 16384,
                    capacity: 16384,
                },
                ExpectedLayout {
                    len: 4224,
                    capacity: 32768,
                },
            ],
        );
    }

    #[test]
    fn test_string_view_large_small() {
        // The long strings are 28 bytes each, so each batch has 100 * 28 = 2800 bytes
        let mixed_batch = stringview_batch_repeated(
            200,
            [Some("This string is 28 bytes long"), Some("small string")],
        );
        // These strings aren't copied, this array has an 8k buffer
        let all_large = stringview_batch_repeated(
            50,
            [Some(
                "This buffer has only large strings in it so there are no buffer copies",
            )],
        );

        let output_batches = Test::new()
            // The first allocated buffer is 8KB.
            // Appending the six mixed batches of 2800 bytes each, plus the large
            // strings, fills an 8KB and a 16KB buffer and part of a 32KB buffer
            .with_batch(mixed_batch.clone())
            .with_batch(mixed_batch.clone())
            .with_batch(all_large.clone())
            .with_batch(mixed_batch.clone())
            .with_batch(all_large.clone())
            .with_batch(mixed_batch.clone())
            .with_batch(mixed_batch.clone())
            .with_batch(all_large.clone())
            .with_batch(mixed_batch.clone())
            .with_batch(all_large.clone())
            .with_batch_size(8000)
            .with_expected_output_sizes(vec![1400])
            .run();

        expect_buffer_layout(
            col_as_string_view("c0", output_batches.first().unwrap()),
            vec![
                ExpectedLayout {
                    len: 8190,
                    capacity: 8192,
                },
                ExpectedLayout {
                    len: 16366,
                    capacity: 16384,
                },
                ExpectedLayout {
                    len: 6244,
                    capacity: 32768,
                },
            ],
        );
    }

    #[test]
    fn test_binary_view() {
        let values: Vec<Option<&[u8]>> = vec![
            Some(b"foo"),
            None,
            Some(b"A longer string that is more than 12 bytes"),
        ];

        let binary_view =
            BinaryViewArray::from_iter(std::iter::repeat(values.iter()).flatten().take(1000));
        let batch =
            RecordBatch::try_from_iter(vec![("c0", Arc::new(binary_view) as ArrayRef)]).unwrap();

        Test::new()
            .with_batch(batch.clone())
            .with_batch(batch.clone())
            .with_batch_size(512)
            .with_expected_output_sizes(vec![512, 512, 512, 464])
            .run();
    }

    #[derive(Debug, Clone, PartialEq)]
    struct ExpectedLayout {
        len: usize,
        capacity: usize,
    }

    /// Asserts that the buffer layout of the specified StringViewArray matches the expected layout
    fn expect_buffer_layout(array: &StringViewArray, expected: Vec<ExpectedLayout>) {
        let actual = array
            .data_buffers()
            .iter()
            .map(|b| ExpectedLayout {
                len: b.len(),
                capacity: b.capacity(),
            })
            .collect::<Vec<_>>();

        assert_eq!(
            actual, expected,
            "Expected buffer layout {expected:#?} but got {actual:#?}"
        );
    }

    /// Test for [`BatchCoalescer`]
    ///
    /// Pushes the input batches to the coalescer and verifies that the resulting
    /// batches have the expected number of rows and contents.
    #[derive(Debug, Clone)]
    struct Test {
        /// Batches to feed to the coalescer.
        input_batches: Vec<RecordBatch>,
        /// Filters to apply to the corresponding input batches.
        ///
        /// If there is no filter for an input batch, the batch is
        /// pushed as is.
        filters: Vec<BooleanArray>,
        /// The schema. If not provided, the first batch's schema is used.
        schema: Option<SchemaRef>,
        /// Expected output sizes of the resulting batches
        expected_output_sizes: Vec<usize>,
        /// target batch size (default to 1024)
        target_batch_size: usize,
    }

    impl Default for Test {
        fn default() -> Self {
            Self {
                input_batches: vec![],
                filters: vec![],
                schema: None,
                expected_output_sizes: vec![],
                target_batch_size: 1024,
            }
        }
    }

    impl Test {
        fn new() -> Self {
            Self::default()
        }

        /// Set the target batch size
        fn with_batch_size(mut self, target_batch_size: usize) -> Self {
            self.target_batch_size = target_batch_size;
            self
        }

        /// Extend the input batches with `batch`
        fn with_batch(mut self, batch: RecordBatch) -> Self {
            self.input_batches.push(batch);
            self
        }

        /// Extend the filters with `filter`
        fn with_filter(mut self, filter: BooleanArray) -> Self {
            self.filters.push(filter);
            self
        }

        /// Extends the input batches with `batches`
        fn with_batches(mut self, batches: impl IntoIterator<Item = RecordBatch>) -> Self {
            self.input_batches.extend(batches);
            self
        }

        /// Specifies the schema for the test
        fn with_schema(mut self, schema: SchemaRef) -> Self {
            self.schema = Some(schema);
            self
        }

        /// Extend the expected output sizes with `sizes`
        fn with_expected_output_sizes(mut self, sizes: impl IntoIterator<Item = usize>) -> Self {
            self.expected_output_sizes.extend(sizes);
            self
        }

        /// Runs the test -- see documentation on [`Test`] for details
        ///
        /// Returns the resulting output batches
        fn run(self) -> Vec<RecordBatch> {
            let expected_output = self.expected_output();
            let schema = self.schema();

            let Self {
                input_batches,
                filters,
                schema: _,
                target_batch_size,
                expected_output_sizes,
            } = self;

            let had_input = input_batches.iter().any(|b| b.num_rows() > 0);

            let mut coalescer = BatchCoalescer::new(Arc::clone(&schema), target_batch_size);

            // feed input batches and filters to the coalescer
            let mut filters = filters.into_iter();
            for batch in input_batches {
                if let Some(filter) = filters.next() {
                    coalescer.push_batch_with_filter(batch, &filter).unwrap();
                } else {
                    coalescer.push_batch(batch).unwrap();
                }
            }
            assert_eq!(schema, coalescer.schema());

            if had_input {
                assert!(!coalescer.is_empty(), "Coalescer should not be empty");
            } else {
                assert!(coalescer.is_empty(), "Coalescer should be empty");
            }

            coalescer.finish_buffered_batch().unwrap();
            if had_input {
                assert!(
                    coalescer.has_completed_batch(),
                    "Coalescer should have completed batches"
                );
            }

            let mut output_batches = vec![];
            while let Some(batch) = coalescer.next_completed_batch() {
                output_batches.push(batch);
            }

            // make sure we got the expected number of output batches and content
            let mut starting_idx = 0;
            let actual_output_sizes: Vec<usize> =
                output_batches.iter().map(|b| b.num_rows()).collect();
            assert_eq!(
                expected_output_sizes, actual_output_sizes,
                "Unexpected number of rows in output batches\n\
                Expected\n{expected_output_sizes:#?}\nActual:{actual_output_sizes:#?}"
            );
            let iter = expected_output_sizes
                .iter()
                .zip(output_batches.iter())
                .enumerate();

            for (i, (expected_size, batch)) in iter {
                // compare the contents of the batch after normalization (using
                // `==` compares the underlying memory layout too)
                let expected_batch = expected_output.slice(starting_idx, *expected_size);
                let expected_batch = normalize_batch(expected_batch);
                let batch = normalize_batch(batch.clone());
                assert_eq!(
                    expected_batch, batch,
                    "Unexpected content in batch {i}:\
                    \n\nExpected:\n{expected_batch:#?}\n\nActual:\n{batch:#?}"
                );
                starting_idx += *expected_size;
            }
            output_batches
        }

        /// Return the expected output schema. If not overridden by `with_schema`, it
        /// returns the schema of the first input batch.
        fn schema(&self) -> SchemaRef {
            self.schema
                .clone()
                .unwrap_or_else(|| Arc::clone(&self.input_batches[0].schema()))
        }

        /// Returns the expected output as a single `RecordBatch`
        fn expected_output(&self) -> RecordBatch {
            let schema = self.schema();
            if self.filters.is_empty() {
                return concat_batches(&schema, &self.input_batches).unwrap();
            }

            let mut filters = self.filters.iter();
            let filtered_batches = self
                .input_batches
                .iter()
                .map(|batch| {
                    if let Some(filter) = filters.next() {
                        filter_record_batch(batch, filter).unwrap()
                    } else {
                        batch.clone()
                    }
                })
                .collect::<Vec<_>>();
            concat_batches(&schema, &filtered_batches).unwrap()
        }
    }

    /// Return a RecordBatch with a UInt32Array with the specified range and
    /// every third value is null.
    fn uint32_batch(range: Range<u32>) -> RecordBatch {
        let schema = Arc::new(Schema::new(vec![Field::new("c0", DataType::UInt32, true)]));

        let array = UInt32Array::from_iter(range.map(|i| if i % 3 == 0 { None } else { Some(i) }));
        RecordBatch::try_new(Arc::clone(&schema), vec![Arc::new(array)]).unwrap()
    }

    /// Return a RecordBatch with a non-null UInt32Array over the specified range
    fn uint32_batch_non_null(range: Range<u32>) -> RecordBatch {
        let schema = Arc::new(Schema::new(vec![Field::new("c0", DataType::UInt32, false)]));

        let array = UInt32Array::from_iter_values(range);
        RecordBatch::try_new(Arc::clone(&schema), vec![Arc::new(array)]).unwrap()
    }

    /// Return a RecordBatch with a StringArray with values `value0`, `value1`, ...
    /// and every third value is `None`.
    fn utf8_batch(range: Range<u32>) -> RecordBatch {
        let schema = Arc::new(Schema::new(vec![Field::new("c0", DataType::Utf8, true)]));

        let array = StringArray::from_iter(range.map(|i| {
            if i % 3 == 0 {
                None
            } else {
                Some(format!("value{i}"))
            }
        }));

        RecordBatch::try_new(Arc::clone(&schema), vec![Arc::new(array)]).unwrap()
    }

    /// Return a RecordBatch with a StringViewArray with (only) the specified values
    fn stringview_batch<'a>(values: impl IntoIterator<Item = Option<&'a str>>) -> RecordBatch {
        let schema = Arc::new(Schema::new(vec![Field::new(
            "c0",
            DataType::Utf8View,
            false,
        )]));

        let array = StringViewArray::from_iter(values);
        RecordBatch::try_new(Arc::clone(&schema), vec![Arc::new(array)]).unwrap()
    }

1373    /// Return a RecordBatch with a StringViewArray of `num_rows` rows, built by
1374    /// repeating `values` over and over.
1375    fn stringview_batch_repeated<'a>(
1376        num_rows: usize,
1377        values: impl IntoIterator<Item = Option<&'a str>>,
1378    ) -> RecordBatch {
1379        let schema = Arc::new(Schema::new(vec![Field::new(
1380            "c0",
1381            DataType::Utf8View,
1382            true,
1383        )]));
1384
1385        // Repeat the values to a total of num_rows
1386        let values: Vec<_> = values.into_iter().collect();
1387        let values_iter = std::iter::repeat(values.iter())
1388            .flatten()
1389            .cloned()
1390            .take(num_rows);
1391
1392        let mut builder = StringViewBuilder::with_capacity(100).with_fixed_block_size(8192);
1393        for val in values_iter {
1394            builder.append_option(val);
1395        }
1396
1397        let array = builder.finish();
1398        RecordBatch::try_new(Arc::clone(&schema), vec![Arc::new(array)]).unwrap()
1399    }
1400
1401    /// Return a multi-column RecordBatch with one row per value in `range`
1402    fn multi_column_batch(range: Range<i32>) -> RecordBatch {
1403        let int64_array = Int64Array::from_iter(
1404            range
1405                .clone()
1406                .map(|v| if v % 5 == 0 { None } else { Some(v as i64) }),
1407        );
1408        let string_view_array = StringViewArray::from_iter(range.clone().map(|v| {
1409            if v % 5 == 0 {
1410                None
1411            } else if v % 7 == 0 {
1412                Some(format!("This is a string longer than 12 bytes{v}"))
1413            } else {
1414                Some(format!("Short {v}"))
1415            }
1416        }));
1417        let string_array = StringArray::from_iter(range.clone().map(|v| {
1418            if v % 11 == 0 {
1419                None
1420            } else {
1421                Some(format!("Value {v}"))
1422            }
1423        }));
1424        let timestamp_array = TimestampNanosecondArray::from_iter(range.map(|v| {
1425            if v % 3 == 0 {
1426                None
1427            } else {
1428                Some(v as i64 * 1000) // arbitrary nanosecond timestamps
1429            }
1430        }))
1431        .with_timezone("America/New_York");
1432
1433        RecordBatch::try_from_iter(vec![
1434            ("int64", Arc::new(int64_array) as ArrayRef),
1435            ("stringview", Arc::new(string_view_array) as ArrayRef),
1436            ("string", Arc::new(string_array) as ArrayRef),
1437            ("timestamp", Arc::new(timestamp_array) as ArrayRef),
1438        ])
1439        .unwrap()
1440    }
1441
1442    /// Builds boolean filter arrays that randomly keep rows with probability
1443    /// `selectivity`.
1444    ///
1445    /// For example, a `selectivity` of 0.1 keeps roughly 10% of the rows
1446    /// (filtering out the other 90%).
1447    #[derive(Debug)]
1448    struct RandomFilterBuilder {
1449        num_rows: usize,
1450        selectivity: f64,
1451        /// seed for random number generator, increases by one each time
1452        /// `next_filter` is called
1453        seed: u64,
1454    }
1455    impl RandomFilterBuilder {
1456        /// Build the next filter with the current seed and increment the seed
1457        /// by one.
1458        fn next_filter(&mut self) -> BooleanArray {
1459            assert!((0.0..=1.0).contains(&self.selectivity));
1460            let mut rng = rand::rngs::StdRng::seed_from_u64(self.seed);
1461            self.seed += 1;
1462            BooleanArray::from_iter(
1463                (0..self.num_rows)
1464                    .map(|_| rng.random_bool(self.selectivity))
1465                    .map(Some),
1466            )
1467        }
1468    }
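
    // Illustrative sketch (not itself exercised by a test): a typical use of
    // `RandomFilterBuilder` is to derive one deterministic filter per input
    // batch and apply it with `filter_record_batch` before coalescing, where
    // `batch` stands for any `RecordBatch` with `num_rows` rows:
    //
    //   let mut filters = RandomFilterBuilder { num_rows: 100, selectivity: 0.5, seed: 0 };
    //   let mask = filters.next_filter();
    //   let filtered = filter_record_batch(&batch, &mask).unwrap();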
1469
1470    /// Returns the named column as a StringViewArray
1471    fn col_as_string_view<'b>(name: &str, batch: &'b RecordBatch) -> &'b StringViewArray {
1472        batch
1473            .column_by_name(name)
1474            .expect("column not found")
1475            .as_string_view_opt()
1476            .expect("column is not a string view")
1477    }
1478
1479    /// Normalize the `RecordBatch` so that the memory layout is consistent
1480    /// (e.g. each StringViewArray is rebuilt into a compact form).
1481    fn normalize_batch(batch: RecordBatch) -> RecordBatch {
1482        // Only StringViewArrays need normalization, since `==` on them also compares memory layout
1483        let (schema, mut columns, row_count) = batch.into_parts();
1484
1485        for column in columns.iter_mut() {
1486            let Some(string_view) = column.as_string_view_opt() else {
1487                continue;
1488            };
1489
1490            // Re-create the StringViewArray to ensure memory layout is
1491            // consistent
1492            let mut builder = StringViewBuilder::new();
1493            for s in string_view.iter() {
1494                builder.append_option(s);
1495            }
1496            // Update the column with the new StringViewArray
1497            *column = Arc::new(builder.finish());
1498        }
1499
1500        let options = RecordBatchOptions::new().with_row_count(Some(row_count));
1501        RecordBatch::try_new_with_options(schema, columns, &options).unwrap()
1502    }
1503
1504    /// Helper function to create a single-column Int32 test batch with the specified number of rows
1505    fn create_test_batch(num_rows: usize) -> RecordBatch {
1506        let schema = Arc::new(Schema::new(vec![Field::new("c0", DataType::Int32, false)]));
1507        let array = Int32Array::from_iter_values(0..num_rows as i32);
1508        RecordBatch::try_new(schema, vec![Arc::new(array)]).unwrap()
1509    }
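
    // The tests below exercise `set_biggest_coalesce_batch_size`. Summarizing
    // the behavior they verify: a batch strictly larger than the limit bypasses
    // coalescing (it is emitted unmodified) when the buffer is empty or when the
    // previous batch was also large (any buffered rows are flushed first); a
    // batch equal to the limit, or a large batch arriving while small batches
    // are buffered, goes through normal coalescing.
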
1510    #[test]
1511    fn test_biggest_coalesce_batch_size_none_default() {
1512        // Test that default behavior (None) coalesces all batches
1513        let mut coalescer = BatchCoalescer::new(
1514            Arc::new(Schema::new(vec![Field::new("c0", DataType::Int32, false)])),
1515            100,
1516        );
1517
1518        // Push a large batch (1000 rows) - should be coalesced normally
1519        let large_batch = create_test_batch(1000);
1520        coalescer.push_batch(large_batch).unwrap();
1521
1522        // Should produce multiple batches of target size (100)
1523        let mut output_batches = vec![];
1524        while let Some(batch) = coalescer.next_completed_batch() {
1525            output_batches.push(batch);
1526        }
1527
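        // 1000 rows divide evenly into ten 100-row batches, so no partial
        // batch remains and this flush is expected to be a no-op.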
1528        coalescer.finish_buffered_batch().unwrap();
1529        while let Some(batch) = coalescer.next_completed_batch() {
1530            output_batches.push(batch);
1531        }
1532
1533        // Should have 10 batches of 100 rows each
1534        assert_eq!(output_batches.len(), 10);
1535        for batch in output_batches {
1536            assert_eq!(batch.num_rows(), 100);
1537        }
1538    }
1539
1540    #[test]
1541    fn test_biggest_coalesce_batch_size_bypass_large_batch() {
1542        // Test that batches larger than biggest_coalesce_batch_size bypass coalescing
1543        let mut coalescer = BatchCoalescer::new(
1544            Arc::new(Schema::new(vec![Field::new("c0", DataType::Int32, false)])),
1545            100,
1546        );
1547        coalescer.set_biggest_coalesce_batch_size(Some(500));
1548
1549        // Push a large batch (1000 rows) - should bypass coalescing
1550        let large_batch = create_test_batch(1000);
1551        coalescer.push_batch(large_batch.clone()).unwrap();
1552
1553        // Should have one completed batch immediately (the original large batch)
1554        assert!(coalescer.has_completed_batch());
1555        let output_batch = coalescer.next_completed_batch().unwrap();
1556        assert_eq!(output_batch.num_rows(), 1000);
1557
1558        // Should be no more completed batches
1559        assert!(!coalescer.has_completed_batch());
1560        assert_eq!(coalescer.get_buffered_rows(), 0);
1561    }
1562
1563    #[test]
1564    fn test_biggest_coalesce_batch_size_coalesce_small_batch() {
1565        // Test that batches smaller than biggest_coalesce_batch_size are coalesced normally
1566        let mut coalescer = BatchCoalescer::new(
1567            Arc::new(Schema::new(vec![Field::new("c0", DataType::Int32, false)])),
1568            100,
1569        );
1570        coalescer.set_biggest_coalesce_batch_size(Some(500));
1571
1572        // Push small batches that should be coalesced
1573        let small_batch = create_test_batch(50);
1574        coalescer.push_batch(small_batch.clone()).unwrap();
1575
1576        // Should not have completed batch yet (only 50 rows, target is 100)
1577        assert!(!coalescer.has_completed_batch());
1578        assert_eq!(coalescer.get_buffered_rows(), 50);
1579
1580        // Push another small batch
1581        coalescer.push_batch(small_batch).unwrap();
1582
1583        // Now should have a completed batch (100 rows total)
1584        assert!(coalescer.has_completed_batch());
1585        let output_batch = coalescer.next_completed_batch().unwrap();
1586        assert_eq!(output_batch.num_rows(), 100);
1587
1588        assert_eq!(coalescer.get_buffered_rows(), 0);
1589    }
1590
1591    #[test]
1592    fn test_biggest_coalesce_batch_size_equal_boundary() {
1593        // Test behavior when batch size equals biggest_coalesce_batch_size
1594        let mut coalescer = BatchCoalescer::new(
1595            Arc::new(Schema::new(vec![Field::new("c0", DataType::Int32, false)])),
1596            100,
1597        );
1598        coalescer.set_biggest_coalesce_batch_size(Some(500));
1599
1600        // Push a batch exactly equal to the limit
1601        let boundary_batch = create_test_batch(500);
1602        coalescer.push_batch(boundary_batch).unwrap();
1603
1604        // Should be coalesced (not bypassed) since it is equal to, not greater than, the limit
1605        let mut output_count = 0;
1606        while coalescer.next_completed_batch().is_some() {
1607            output_count += 1;
1608        }
1609
1610        coalescer.finish_buffered_batch().unwrap();
1611        while coalescer.next_completed_batch().is_some() {
1612            output_count += 1;
1613        }
1614
1615        // Should have 5 batches of 100 rows each
1616        assert_eq!(output_count, 5);
1617    }
1618
1619    #[test]
1620    fn test_biggest_coalesce_batch_size_first_large_then_consecutive_bypass() {
1621        // Test the consecutive large batch bypass behavior
1622        // Pattern: small batches -> first large batch (coalesced) -> consecutive large batches (bypass)
1623        let mut coalescer = BatchCoalescer::new(
1624            Arc::new(Schema::new(vec![Field::new("c0", DataType::Int32, false)])),
1625            100,
1626        );
1627        coalescer.set_biggest_coalesce_batch_size(Some(200));
1628
1629        let small_batch = create_test_batch(50);
1630
1631        // Push small batch first to create buffered data
1632        coalescer.push_batch(small_batch).unwrap();
1633        assert_eq!(coalescer.get_buffered_rows(), 50);
1634        assert!(!coalescer.has_completed_batch());
1635
1636        // Push first large batch - should go through normal coalescing due to buffered data
1637        let large_batch1 = create_test_batch(250);
1638        coalescer.push_batch(large_batch1).unwrap();
1639
1640        // 50 + 250 = 300 -> 3 complete batches of 100, 0 rows buffered
1641        let mut completed_batches = vec![];
1642        while let Some(batch) = coalescer.next_completed_batch() {
1643            completed_batches.push(batch);
1644        }
1645        assert_eq!(completed_batches.len(), 3);
1646        assert_eq!(coalescer.get_buffered_rows(), 0);
1647
1648        // Now push consecutive large batches - they should bypass
1649        let large_batch2 = create_test_batch(300);
1650        let large_batch3 = create_test_batch(400);
1651
1652        // Push second large batch - should bypass since it's consecutive and buffer is empty
1653        coalescer.push_batch(large_batch2).unwrap();
1654        assert!(coalescer.has_completed_batch());
1655        let output = coalescer.next_completed_batch().unwrap();
1656        assert_eq!(output.num_rows(), 300); // bypassed with original size
1657        assert_eq!(coalescer.get_buffered_rows(), 0);
1658
1659        // Push third large batch - should also bypass
1660        coalescer.push_batch(large_batch3).unwrap();
1661        assert!(coalescer.has_completed_batch());
1662        let output = coalescer.next_completed_batch().unwrap();
1663        assert_eq!(output.num_rows(), 400); // bypassed with original size
1664        assert_eq!(coalescer.get_buffered_rows(), 0);
1665    }
1666
1667    #[test]
1668    fn test_biggest_coalesce_batch_size_empty_batch() {
1669        // Test that empty batches don't trigger the bypass logic
1670        let mut coalescer = BatchCoalescer::new(
1671            Arc::new(Schema::new(vec![Field::new("c0", DataType::Int32, false)])),
1672            100,
1673        );
1674        coalescer.set_biggest_coalesce_batch_size(Some(50));
1675
1676        let empty_batch = create_test_batch(0);
1677        coalescer.push_batch(empty_batch).unwrap();
1678
1679        // Empty batch should be handled normally (no effect)
1680        assert!(!coalescer.has_completed_batch());
1681        assert_eq!(coalescer.get_buffered_rows(), 0);
1682    }
1683
1684    #[test]
1685    fn test_biggest_coalesce_batch_size_with_buffered_data_no_bypass() {
1686        // Test that when there is buffered data, large batches do NOT bypass (unless consecutive)
1687        let mut coalescer = BatchCoalescer::new(
1688            Arc::new(Schema::new(vec![Field::new("c0", DataType::Int32, false)])),
1689            100,
1690        );
1691        coalescer.set_biggest_coalesce_batch_size(Some(200));
1692
1693        // Add some buffered data first
1694        let small_batch = create_test_batch(30);
1695        coalescer.push_batch(small_batch.clone()).unwrap();
1696        coalescer.push_batch(small_batch).unwrap();
1697        assert_eq!(coalescer.get_buffered_rows(), 60);
1698
1699        // Push large batch that would normally bypass, but shouldn't because buffered_rows > 0
1700        let large_batch = create_test_batch(250);
1701        coalescer.push_batch(large_batch).unwrap();
1702
1703        // The large batch should be processed through normal coalescing logic
1704        // Total: 60 (buffered) + 250 (new) = 310 rows
1705        // Output: 3 complete batches of 100 rows each, 10 rows remain buffered
1706
1707        let mut completed_batches = vec![];
1708        while let Some(batch) = coalescer.next_completed_batch() {
1709            completed_batches.push(batch);
1710        }
1711
1712        assert_eq!(completed_batches.len(), 3);
1713        for batch in &completed_batches {
1714            assert_eq!(batch.num_rows(), 100);
1715        }
1716        assert_eq!(coalescer.get_buffered_rows(), 10);
1717    }
1718
1719    #[test]
1720    fn test_biggest_coalesce_batch_size_zero_limit() {
1721        // Test edge case where limit is 0 (all batches bypass when no buffered data)
1722        let mut coalescer = BatchCoalescer::new(
1723            Arc::new(Schema::new(vec![Field::new("c0", DataType::Int32, false)])),
1724            100,
1725        );
1726        coalescer.set_biggest_coalesce_batch_size(Some(0));
1727
1728        // Even a 1-row batch should bypass when there's no buffered data
1729        let tiny_batch = create_test_batch(1);
1730        coalescer.push_batch(tiny_batch).unwrap();
1731
1732        assert!(coalescer.has_completed_batch());
1733        let output = coalescer.next_completed_batch().unwrap();
1734        assert_eq!(output.num_rows(), 1);
1735    }
1736
1737    #[test]
1738    fn test_biggest_coalesce_batch_size_bypass_only_when_no_buffer() {
1739        // Test that bypass only occurs when buffered_rows == 0
1740        let mut coalescer = BatchCoalescer::new(
1741            Arc::new(Schema::new(vec![Field::new("c0", DataType::Int32, false)])),
1742            100,
1743        );
1744        coalescer.set_biggest_coalesce_batch_size(Some(200));
1745
1746        // First, push a large batch with no buffered data - should bypass
1747        let large_batch = create_test_batch(300);
1748        coalescer.push_batch(large_batch.clone()).unwrap();
1749
1750        assert!(coalescer.has_completed_batch());
1751        let output = coalescer.next_completed_batch().unwrap();
1752        assert_eq!(output.num_rows(), 300); // bypassed
1753        assert_eq!(coalescer.get_buffered_rows(), 0);
1754
1755        // Now add some buffered data
1756        let small_batch = create_test_batch(50);
1757        coalescer.push_batch(small_batch).unwrap();
1758        assert_eq!(coalescer.get_buffered_rows(), 50);
1759
1760        // Push the same large batch again - should NOT bypass this time (not consecutive)
1761        coalescer.push_batch(large_batch).unwrap();
1762
1763        // Should process through normal coalescing: 50 + 300 = 350 rows
1764        // Output: 3 complete batches of 100 rows, 50 rows buffered
1765        let mut completed_batches = vec![];
1766        while let Some(batch) = coalescer.next_completed_batch() {
1767            completed_batches.push(batch);
1768        }
1769
1770        assert_eq!(completed_batches.len(), 3);
1771        for batch in &completed_batches {
1772            assert_eq!(batch.num_rows(), 100);
1773        }
1774        assert_eq!(coalescer.get_buffered_rows(), 50);
1775    }
1776
1777    #[test]
1778    fn test_biggest_coalesce_batch_size_consecutive_large_batches_scenario() {
1779        // Test a representative mixed scenario: batch sizes 20, 20, 30, 700, 600, 700, 900, 700, 600
1780        let mut coalescer = BatchCoalescer::new(
1781            Arc::new(Schema::new(vec![Field::new("c0", DataType::Int32, false)])),
1782            1000,
1783        );
1784        coalescer.set_biggest_coalesce_batch_size(Some(500));
1785
1786        // Push small batches first
1787        coalescer.push_batch(create_test_batch(20)).unwrap();
1788        coalescer.push_batch(create_test_batch(20)).unwrap();
1789        coalescer.push_batch(create_test_batch(30)).unwrap();
1790
1791        assert_eq!(coalescer.get_buffered_rows(), 70);
1792        assert!(!coalescer.has_completed_batch());
1793
1794        // Push first large batch (700) - should coalesce due to buffered data
1795        coalescer.push_batch(create_test_batch(700)).unwrap();
1796
1797        // 70 + 700 = 770 rows, not enough for 1000, so all stay buffered
1798        assert_eq!(coalescer.get_buffered_rows(), 770);
1799        assert!(!coalescer.has_completed_batch());
1800
1801        // Push second large batch (600) - should bypass since previous was large
1802        coalescer.push_batch(create_test_batch(600)).unwrap();
1803
1804        // Should flush buffer (770 rows) and bypass the 600
1805        let mut outputs = vec![];
1806        while let Some(batch) = coalescer.next_completed_batch() {
1807            outputs.push(batch);
1808        }
1809        assert_eq!(outputs.len(), 2); // one flushed buffer batch (770) + one bypassed (600)
1810        assert_eq!(outputs[0].num_rows(), 770);
1811        assert_eq!(outputs[1].num_rows(), 600);
1812        assert_eq!(coalescer.get_buffered_rows(), 0);
1813
1814        // Push remaining large batches - should all bypass
1815        let remaining_batches = [700, 900, 700, 600];
1816        for &size in &remaining_batches {
1817            coalescer.push_batch(create_test_batch(size)).unwrap();
1818
1819            assert!(coalescer.has_completed_batch());
1820            let output = coalescer.next_completed_batch().unwrap();
1821            assert_eq!(output.num_rows(), size);
1822            assert_eq!(coalescer.get_buffered_rows(), 0);
1823        }
1824    }
1825
1826    #[test]
1827    fn test_biggest_coalesce_batch_size_truly_consecutive_large_bypass() {
1828        // Test truly consecutive large batches that should all bypass
1829        // This test ensures buffer is completely empty between large batches
1830        let mut coalescer = BatchCoalescer::new(
1831            Arc::new(Schema::new(vec![Field::new("c0", DataType::Int32, false)])),
1832            100,
1833        );
1834        coalescer.set_biggest_coalesce_batch_size(Some(200));
1835
1836        // Push consecutive large batches with no prior buffered data
1837        let large_batches = vec![
1838            create_test_batch(300),
1839            create_test_batch(400),
1840            create_test_batch(350),
1841            create_test_batch(500),
1842        ];
1843
1844        let mut all_outputs = vec![];
1845
1846        for (i, large_batch) in large_batches.into_iter().enumerate() {
1847            let expected_size = large_batch.num_rows();
1848
1849            // Buffer should be empty before each large batch
1850            assert_eq!(
1851                coalescer.get_buffered_rows(),
1852                0,
1853                "Buffer should be empty before batch {}",
1854                i
1855            );
1856
1857            coalescer.push_batch(large_batch).unwrap();
1858
1859            // Each large batch should bypass and produce exactly one output batch
1860            assert!(
1861                coalescer.has_completed_batch(),
1862                "Should have completed batch after pushing batch {}",
1863                i
1864            );
1865
1866            let output = coalescer.next_completed_batch().unwrap();
1867            assert_eq!(
1868                output.num_rows(),
1869                expected_size,
1870                "Batch {} should have bypassed with original size",
1871                i
1872            );
1873
1874            // Should be no more batches and buffer should be empty
1875            assert!(
1876                !coalescer.has_completed_batch(),
1877                "Should have no more completed batches after batch {}",
1878                i
1879            );
1880            assert_eq!(
1881                coalescer.get_buffered_rows(),
1882                0,
1883                "Buffer should be empty after batch {}",
1884                i
1885            );
1886
1887            all_outputs.push(output);
1888        }
1889
1890        // Verify we got exactly 4 output batches with original sizes
1891        assert_eq!(all_outputs.len(), 4);
1892        assert_eq!(all_outputs[0].num_rows(), 300);
1893        assert_eq!(all_outputs[1].num_rows(), 400);
1894        assert_eq!(all_outputs[2].num_rows(), 350);
1895        assert_eq!(all_outputs[3].num_rows(), 500);
1896    }
1897
1898    #[test]
1899    fn test_biggest_coalesce_batch_size_reset_consecutive_on_small_batch() {
1900        // Test that small batches reset the consecutive large batch tracking
1901        let mut coalescer = BatchCoalescer::new(
1902            Arc::new(Schema::new(vec![Field::new("c0", DataType::Int32, false)])),
1903            100,
1904        );
1905        coalescer.set_biggest_coalesce_batch_size(Some(200));
1906
1907        // Push first large batch - should bypass (no buffered data)
1908        coalescer.push_batch(create_test_batch(300)).unwrap();
1909        let output = coalescer.next_completed_batch().unwrap();
1910        assert_eq!(output.num_rows(), 300);
1911
1912        // Push second large batch - should bypass (consecutive)
1913        coalescer.push_batch(create_test_batch(400)).unwrap();
1914        let output = coalescer.next_completed_batch().unwrap();
1915        assert_eq!(output.num_rows(), 400);
1916
1917        // Push small batch - resets consecutive tracking
1918        coalescer.push_batch(create_test_batch(50)).unwrap();
1919        assert_eq!(coalescer.get_buffered_rows(), 50);
1920
1921        // Push large batch again - should NOT bypass due to buffered data
1922        coalescer.push_batch(create_test_batch(350)).unwrap();
1923
1924        // Should coalesce: 50 + 350 = 400 -> 4 complete batches of 100
1925        let mut outputs = vec![];
1926        while let Some(batch) = coalescer.next_completed_batch() {
1927            outputs.push(batch);
1928        }
1929        assert_eq!(outputs.len(), 4);
1930        for batch in outputs {
1931            assert_eq!(batch.num_rows(), 100);
1932        }
1933        assert_eq!(coalescer.get_buffered_rows(), 0);
1934    }
1935}