arrow_select/coalesce.rs

// Licensed to the Apache Software Foundation (ASF) under one
// or more contributor license agreements.  See the NOTICE file
// distributed with this work for additional information
// regarding copyright ownership.  The ASF licenses this file
// to you under the Apache License, Version 2.0 (the
// "License"); you may not use this file except in compliance
// with the License.  You may obtain a copy of the License at
//
//   http://www.apache.org/licenses/LICENSE-2.0
//
// Unless required by applicable law or agreed to in writing,
// software distributed under the License is distributed on an
// "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
// KIND, either express or implied.  See the License for the
// specific language governing permissions and limitations
// under the License.

//! [`BatchCoalescer`] concatenates multiple [`RecordBatch`]es after
//! operations such as [`filter`] and [`take`].
//!
//! [`filter`]: crate::filter::filter
//! [`take`]: crate::take::take
use crate::filter::filter_record_batch;
use crate::take::take_record_batch;
use arrow_array::types::{BinaryViewType, StringViewType};
use arrow_array::{Array, ArrayRef, BooleanArray, RecordBatch, downcast_primitive};
use arrow_schema::{ArrowError, DataType, SchemaRef};
use std::collections::VecDeque;
use std::sync::Arc;
// Originally from DataFusion's coalesce module:
// https://github.com/apache/datafusion/blob/9d2f04996604e709ee440b65f41e7b882f50b788/datafusion/physical-plan/src/coalesce/mod.rs#L26-L25

mod byte_view;
mod generic;
mod primitive;

use byte_view::InProgressByteViewArray;
use generic::GenericInProgressArray;
use primitive::InProgressPrimitiveArray;

/// Concatenate multiple [`RecordBatch`]es
///
/// Implements the common pattern of incrementally creating output
/// [`RecordBatch`]es of a specific size from an input stream of
/// [`RecordBatch`]es.
///
/// This is useful after operations such as [`filter`] and [`take`] that produce
/// smaller batches, when we want to coalesce them into larger batches for
/// further processing.
///
/// # Motivation
///
/// If we use [`concat_batches`] to implement the same functionality, there are 2 potential issues:
/// 1. At least 2x peak memory (holding both the input and output of concat)
/// 2. 2 copies of the data (once to create the output of filter, and again to create the output of concat)
///
/// See <https://github.com/apache/arrow-rs/issues/6692> for more discussion
/// of the motivation.
///
/// [`filter`]: crate::filter::filter
/// [`take`]: crate::take::take
/// [`concat_batches`]: crate::concat::concat_batches
///
/// # Example
/// ```
/// use arrow_array::record_batch;
/// use arrow_select::coalesce::BatchCoalescer;
/// let batch1 = record_batch!(("a", Int32, [1, 2, 3])).unwrap();
/// let batch2 = record_batch!(("a", Int32, [4, 5])).unwrap();
///
/// // Create a `BatchCoalescer` that will produce batches of exactly 4 rows
/// // (except possibly the final one)
/// let target_batch_size = 4;
/// let mut coalescer = BatchCoalescer::new(batch1.schema(), target_batch_size);
///
/// // push the batches
/// coalescer.push_batch(batch1).unwrap();
/// // only 3 rows pushed so far: not yet enough to produce a completed batch
/// assert!(coalescer.next_completed_batch().is_none());
/// coalescer.push_batch(batch2).unwrap();
/// // now we have 5 rows, so we can produce a batch
/// let finished = coalescer.next_completed_batch().unwrap();
/// // 4 rows came out (the target batch size is 4)
/// let expected = record_batch!(("a", Int32, [1, 2, 3, 4])).unwrap();
/// assert_eq!(finished, expected);
///
/// // There is no more input, but still an in-progress batch
/// assert!(coalescer.next_completed_batch().is_none());
/// // We can finish the batch, which will produce the remaining rows
/// coalescer.finish_buffered_batch().unwrap();
/// let expected = record_batch!(("a", Int32, [5])).unwrap();
/// assert_eq!(coalescer.next_completed_batch().unwrap(), expected);
///
/// // The coalescer is now empty
/// assert!(coalescer.next_completed_batch().is_none());
/// ```
///
/// # Background
///
/// Generally speaking, larger [`RecordBatch`]es are more efficient to process
/// than smaller [`RecordBatch`]es (until the CPU cache is exceeded) because
/// there is fixed processing overhead per batch. This coalescer builds up these
/// larger batches incrementally.
///
/// ```text
/// ┌────────────────────┐
/// │    RecordBatch     │
/// │   num_rows = 100   │
/// └────────────────────┘                 ┌────────────────────┐
///                                        │                    │
/// ┌────────────────────┐     Coalesce    │                    │
/// │                    │      Batches    │                    │
/// │    RecordBatch     │                 │                    │
/// │   num_rows = 200   │  ─ ─ ─ ─ ─ ─ ▶  │                    │
/// │                    │                 │    RecordBatch     │
/// │                    │                 │   num_rows = 400   │
/// └────────────────────┘                 │                    │
///                                        │                    │
/// ┌────────────────────┐                 │                    │
/// │                    │                 │                    │
/// │    RecordBatch     │                 │                    │
/// │   num_rows = 100   │                 └────────────────────┘
/// │                    │
/// └────────────────────┘
/// ```
///
/// # Notes:
///
/// 1. Output rows are produced in the same order as the input rows
///
/// 2. The output is a sequence of batches, with all but the last containing
///    exactly `target_batch_size` rows.
#[derive(Debug)]
pub struct BatchCoalescer {
    /// The input schema
    schema: SchemaRef,
    /// The target batch size (and thus the size of views allocations). This is
    /// a hard limit: output batches contain exactly `target_batch_size` rows,
    /// never slightly more.
    target_batch_size: usize,
    /// In-progress arrays
    in_progress_arrays: Vec<Box<dyn InProgressArray>>,
    /// Buffered row count. Always less than `target_batch_size`
    buffered_rows: usize,
    /// Completed batches
    completed: VecDeque<RecordBatch>,
    /// Biggest coalesce batch size. See [`Self::with_biggest_coalesce_batch_size`]
    biggest_coalesce_batch_size: Option<usize>,
}

impl BatchCoalescer {
    /// Create a new `BatchCoalescer`
    ///
    /// # Arguments
    /// - `schema` - the schema of the output batches
    /// - `target_batch_size` - the number of rows in each output batch.
    ///   Typical values are `4096` or `8192` rows.
    ///
    pub fn new(schema: SchemaRef, target_batch_size: usize) -> Self {
        let in_progress_arrays = schema
            .fields()
            .iter()
            .map(|field| create_in_progress_array(field.data_type(), target_batch_size))
            .collect::<Vec<_>>();

        Self {
            schema,
            target_batch_size,
            in_progress_arrays,
            // We will for sure store at least one completed batch
            completed: VecDeque::with_capacity(1),
            buffered_rows: 0,
            biggest_coalesce_batch_size: None,
        }
    }

    /// Set the coalesce batch size limit (default `None`)
    ///
    /// This limit determines when batches should bypass coalescing. Intuitively,
    /// batches that are already large are costly to coalesce and are efficient
    /// enough to process directly without coalescing.
    ///
    /// If `Some(limit)`, batches larger than this limit bypass coalescing
    /// when there is no buffered data, or when the previously buffered data
    /// already exceeds this limit.
    ///
    /// If `None`, all batches are coalesced according to
    /// `target_batch_size`.
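    ///
    /// # Example
    ///
    /// A minimal sketch of the bypass behavior (see [`Self::push_batch`] for
    /// the full set of cases):
    /// ```
    /// # use std::sync::Arc;
    /// # use arrow_array::{ArrayRef, Int32Array, RecordBatch};
    /// # use arrow_select::coalesce::BatchCoalescer;
    /// // An 80 row batch, with a target size of 100 and a bypass limit of 50
    /// let array: ArrayRef = Arc::new(Int32Array::from_iter_values(0..80));
    /// let batch = RecordBatch::try_from_iter([("a", array)]).unwrap();
    /// let mut coalescer = BatchCoalescer::new(batch.schema(), 100)
    ///     .with_biggest_coalesce_batch_size(Some(50));
    /// // 80 > 50 and nothing is buffered, so the batch passes through intact
    /// coalescer.push_batch(batch.clone()).unwrap();
    /// assert_eq!(coalescer.next_completed_batch().unwrap(), batch);
    /// ```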
    pub fn with_biggest_coalesce_batch_size(mut self, limit: Option<usize>) -> Self {
        self.biggest_coalesce_batch_size = limit;
        self
    }

    /// Get the current biggest coalesce batch size limit
    ///
    /// See [`Self::with_biggest_coalesce_batch_size`] for details
    pub fn biggest_coalesce_batch_size(&self) -> Option<usize> {
        self.biggest_coalesce_batch_size
    }

    /// Set the biggest coalesce batch size limit
    ///
    /// See [`Self::with_biggest_coalesce_batch_size`] for details
    pub fn set_biggest_coalesce_batch_size(&mut self, limit: Option<usize>) {
        self.biggest_coalesce_batch_size = limit;
    }

    /// Return the schema of the output batches
    pub fn schema(&self) -> SchemaRef {
        Arc::clone(&self.schema)
    }

    /// Push a batch into the Coalescer after applying a filter
    ///
    /// This is semantically equivalent to calling [`Self::push_batch`]
    /// with the results of [`filter_record_batch`]
    ///
    /// # Example
    /// ```
    /// # use arrow_array::{record_batch, BooleanArray};
    /// # use arrow_select::coalesce::BatchCoalescer;
    /// let batch1 = record_batch!(("a", Int32, [1, 2, 3])).unwrap();
    /// let batch2 = record_batch!(("a", Int32, [4, 5, 6])).unwrap();
    /// // Apply a filter to each batch to pick the first and last row
    /// let filter = BooleanArray::from(vec![true, false, true]);
    /// // create a new Coalescer that targets creating 1000 row batches
    /// let mut coalescer = BatchCoalescer::new(batch1.schema(), 1000);
    /// coalescer.push_batch_with_filter(batch1, &filter).unwrap();
    /// coalescer.push_batch_with_filter(batch2, &filter).unwrap();
    /// // finish and retrieve the created batch
    /// coalescer.finish_buffered_batch().unwrap();
    /// let completed_batch = coalescer.next_completed_batch().unwrap();
    /// // filtered out 2 and 5:
    /// let expected_batch = record_batch!(("a", Int32, [1, 3, 4, 6])).unwrap();
    /// assert_eq!(completed_batch, expected_batch);
    /// ```
    pub fn push_batch_with_filter(
        &mut self,
        batch: RecordBatch,
        filter: &BooleanArray,
    ) -> Result<(), ArrowError> {
        // TODO: optimize this to avoid materializing (copying the results
        // of filter to a new batch)
        let filtered_batch = filter_record_batch(&batch, filter)?;
        self.push_batch(filtered_batch)
    }

    /// Push a batch into the Coalescer after applying a set of indices
    ///
    /// This is semantically equivalent to calling [`Self::push_batch`]
    /// with the results of [`take_record_batch`]
    ///
    /// # Example
    /// ```
    /// # use arrow_array::{record_batch, UInt64Array};
    /// # use arrow_select::coalesce::BatchCoalescer;
    /// let batch1 = record_batch!(("a", Int32, [0, 0, 0])).unwrap();
    /// let batch2 = record_batch!(("a", Int32, [1, 1, 4, 5, 1, 4])).unwrap();
    /// // Sorted indices to create a sorted output; these can be obtained with
    /// // `arrow-ord`'s sort_to_indices operation
    /// let indices = UInt64Array::from(vec![0, 1, 4, 2, 5, 3]);
    /// // create a new Coalescer that targets creating 1000 row batches
    /// let mut coalescer = BatchCoalescer::new(batch1.schema(), 1000);
    /// coalescer.push_batch(batch1).unwrap();
    /// coalescer.push_batch_with_indices(batch2, &indices).unwrap();
    /// // finish and retrieve the created batch
    /// coalescer.finish_buffered_batch().unwrap();
    /// let completed_batch = coalescer.next_completed_batch().unwrap();
    /// let expected_batch = record_batch!(("a", Int32, [0, 0, 0, 1, 1, 1, 4, 4, 5])).unwrap();
    /// assert_eq!(completed_batch, expected_batch);
    /// ```
    pub fn push_batch_with_indices(
        &mut self,
        batch: RecordBatch,
        indices: &dyn Array,
    ) -> Result<(), ArrowError> {
        // TODO: optimize this to avoid materializing (copying the results
        // of take to a new batch)
        let taken_batch = take_record_batch(&batch, indices)?;
        self.push_batch(taken_batch)
    }

    /// Push all the rows from `batch` into the Coalescer
    ///
    /// When the buffered data plus the incoming rows reach `target_batch_size`,
    /// completed batches are generated eagerly and can be retrieved via
    /// [`Self::next_completed_batch()`].
    /// Output batches contain exactly `target_batch_size` rows, so the tail of
    /// the input batch may remain buffered.
    /// Remaining partial data either waits for future input batches or can be
    /// materialized immediately by calling [`Self::finish_buffered_batch()`].
    ///
    /// # Example
    /// ```
    /// # use arrow_array::record_batch;
    /// # use arrow_select::coalesce::BatchCoalescer;
    /// let batch1 = record_batch!(("a", Int32, [1, 2, 3])).unwrap();
    /// let batch2 = record_batch!(("a", Int32, [4, 5, 6])).unwrap();
    /// // create a new Coalescer that targets creating 1000 row batches
    /// let mut coalescer = BatchCoalescer::new(batch1.schema(), 1000);
    /// coalescer.push_batch(batch1).unwrap();
    /// coalescer.push_batch(batch2).unwrap();
    /// // finish and retrieve the created batch
    /// coalescer.finish_buffered_batch().unwrap();
    /// let completed_batch = coalescer.next_completed_batch().unwrap();
    /// let expected_batch = record_batch!(("a", Int32, [1, 2, 3, 4, 5, 6])).unwrap();
    /// assert_eq!(completed_batch, expected_batch);
    /// ```
    pub fn push_batch(&mut self, batch: RecordBatch) -> Result<(), ArrowError> {
        // Large batch bypass optimization:
        // When biggest_coalesce_batch_size is configured and a batch exceeds this limit,
        // we can avoid expensive split-and-merge operations by passing it through directly.
        //
        // IMPORTANT: This optimization is OPTIONAL and only active when biggest_coalesce_batch_size
        // is explicitly set via with_biggest_coalesce_batch_size(Some(limit)).
        // If not set (None), ALL batches follow normal coalescing behavior regardless of size.

        // =============================================================================
        // CASE 1: No buffer + large batch → Direct bypass
        // =============================================================================
        // Example scenario (target_batch_size=1000, biggest_coalesce_batch_size=Some(500)):
        // Input sequence: [600, 1200, 300]
        //
        // With biggest_coalesce_batch_size=Some(500) (optimization enabled):
        //   600 → large batch detected! buffered_rows=0 → Case 1: direct bypass
        //        → output: [600] (bypass, preserves large batch)
        //   1200 → large batch detected! buffered_rows=0 → Case 1: direct bypass
        //         → output: [1200] (bypass, preserves large batch)
        //   300 → normal batch, buffer: [300]
        //   Result: [600], [1200], [300] - large batches preserved, mixed sizes

        // =============================================================================
        // CASE 2: Buffer too large + large batch → Flush first, then bypass
        // =============================================================================
        // This case prevents creating extremely large merged batches that would
        // significantly exceed both target_batch_size and biggest_coalesce_batch_size.
        //
        // Example 1: Buffer exceeds limit before large batch arrives
        // target_batch_size=1000, biggest_coalesce_batch_size=Some(400)
        // Input: [350, 200, 800]
        //
        // Step 1: push_batch([350])
        //   → batch_size=350 <= 400, normal path
        //   → buffer: [350], buffered_rows=350
        //
        // Step 2: push_batch([200])
        //   → batch_size=200 <= 400, normal path
        //   → buffer: [350, 200], buffered_rows=550
        //
        // Step 3: push_batch([800])
        //   → batch_size=800 > 400, large batch path
        //   → buffered_rows=550 > 400 → Case 2: flush first
        //   → flush: output [550] (combined [350, 200])
        //   → then bypass: output [800]
        //   Result: [550], [800] - buffer flushed to prevent oversized merge
        //
        // Example 2: Multiple small batches accumulate before large batch
        // target_batch_size=1000, biggest_coalesce_batch_size=Some(300)
        // Input: [150, 100, 80, 900]
        //
        // Step 1-3: Accumulate small batches
        //   150 → buffer: [150], buffered_rows=150
        //   100 → buffer: [150, 100], buffered_rows=250
        //   80  → buffer: [150, 100, 80], buffered_rows=330
        //
        // Step 4: push_batch([900])
        //   → batch_size=900 > 300, large batch path
        //   → buffered_rows=330 > 300 → Case 2: flush first
        //   → flush: output [330] (combined [150, 100, 80])
        //   → then bypass: output [900]
        //   Result: [330], [900] - prevents merge into [1230] which would be too large

        // =============================================================================
        // CASE 3: Small buffer + large batch → Normal coalescing (no bypass)
        // =============================================================================
        // When the buffer is small enough, we still merge to maintain efficiency
        // Example: target_batch_size=1000, biggest_coalesce_batch_size=Some(500)
        // Input: [300, 1200]
        //
        // Step 1: push_batch([300])
        //   → batch_size=300 <= 500, normal path
        //   → buffer: [300], buffered_rows=300
        //
        // Step 2: push_batch([1200])
        //   → batch_size=1200 > 500, large batch path
        //   → buffered_rows=300 <= 500 → Case 3: normal merge
        //   → buffer: [300, 1200] (1500 total)
        //   → 1500 > target_batch_size → split: output [1000], buffer [500]
        //   Result: [1000], [500] - normal split/merge behavior maintained

        // =============================================================================
        // Comparison: Default vs Optimized Behavior
        // =============================================================================
        // target_batch_size=1000, biggest_coalesce_batch_size=Some(500)
        // Input: [600, 1200, 300]
        //
        // DEFAULT BEHAVIOR (biggest_coalesce_batch_size=None):
        //   600 → buffer: [600]
        //   1200 → buffer: [600, 1200] (1800 rows total)
        //         → split: output [1000 rows], buffer [800 rows remaining]
        //   300 → buffer: [800, 300] (1100 rows total)
        //        → split: output [1000 rows], buffer [100 rows remaining]
        //   Result: [1000], [1000], [100] - all outputs respect target_batch_size
        //
        // OPTIMIZED BEHAVIOR (biggest_coalesce_batch_size=Some(500)):
        //   600 → Case 1: direct bypass → output: [600]
        //   1200 → Case 1: direct bypass → output: [1200]
        //   300 → normal path → buffer: [300]
        //   Result: [600], [1200], [300] - large batches preserved

        // =============================================================================
        // Benefits and Trade-offs
        // =============================================================================
        // Benefits of the optimization:
        // - Large batches stay intact (better for downstream vectorized processing)
        // - Fewer split/merge operations (better CPU performance)
        // - More predictable memory usage patterns
        // - Maintains streaming efficiency while preserving batch boundaries
        //
        // Trade-offs:
        // - Output batch sizes become variable (not always target_batch_size)
        // - May produce smaller partial batches when flushing before large batches
        // - Requires tuning the biggest_coalesce_batch_size parameter for optimal performance

        // TODO: for unsorted input, we may be able to pass all large batches
        // through and coalesce all the small batches together?

        let batch_size = batch.num_rows();

        // Fast path: skip empty batches
        if batch_size == 0 {
            return Ok(());
        }

        // Large batch optimization: bypass coalescing for oversized batches
        if let Some(limit) = self.biggest_coalesce_batch_size {
            if batch_size > limit {
                // Case 1: No buffered data - emit large batch directly
                // Example: [] + [1200] → output [1200], buffer []
                if self.buffered_rows == 0 {
                    self.completed.push_back(batch);
                    return Ok(());
                }

                // Case 2: Buffer too large - flush then emit to avoid oversized merge
                // Example: [850] + [1200] → output [850], then output [1200]
                // This prevents creating batches much larger than both target_batch_size
                // and biggest_coalesce_batch_size, which could cause memory issues
                if self.buffered_rows > limit {
                    self.finish_buffered_batch()?;
                    self.completed.push_back(batch);
                    return Ok(());
                }

                // Case 3: Small buffer - proceed with normal coalescing
                // Example: [300] + [1200] → split and merge normally
                // This ensures small batches still get properly coalesced
                // while allowing some controlled growth beyond the limit
            }
        }

        let (_schema, arrays, mut num_rows) = batch.into_parts();

        // setup input rows
        assert_eq!(arrays.len(), self.in_progress_arrays.len());
        self.in_progress_arrays
            .iter_mut()
            .zip(arrays)
            .for_each(|(in_progress, array)| {
                in_progress.set_source(Some(array));
            });

        // If pushing this batch would exceed the target batch size,
        // finish the current batch and start a new one
        let mut offset = 0;
        while num_rows > (self.target_batch_size - self.buffered_rows) {
            let remaining_rows = self.target_batch_size - self.buffered_rows;
            debug_assert!(remaining_rows > 0);

            // Copy remaining_rows from each array
            for in_progress in self.in_progress_arrays.iter_mut() {
                in_progress.copy_rows(offset, remaining_rows)?;
            }

            self.buffered_rows += remaining_rows;
            offset += remaining_rows;
            num_rows -= remaining_rows;

            self.finish_buffered_batch()?;
        }

        // Add any remaining rows to the buffer
        self.buffered_rows += num_rows;
        if num_rows > 0 {
            for in_progress in self.in_progress_arrays.iter_mut() {
                in_progress.copy_rows(offset, num_rows)?;
            }
        }

        // If we have reached the target batch size, finalize the buffered batch
        if self.buffered_rows >= self.target_batch_size {
            self.finish_buffered_batch()?;
        }

        // clear in-progress sources (to allow the memory to be freed)
        for in_progress in self.in_progress_arrays.iter_mut() {
            in_progress.set_source(None);
        }

        Ok(())
    }

    /// Returns the number of buffered rows
    pub fn get_buffered_rows(&self) -> usize {
        self.buffered_rows
    }

    /// Concatenates any buffered batches into a single `RecordBatch` and
    /// clears any output buffers
    ///
    /// Normally this is called when the input stream is exhausted, and
    /// we want to finalize the last batch of rows.
    ///
    /// See [`Self::next_completed_batch()`] for the completed batches.
    pub fn finish_buffered_batch(&mut self) -> Result<(), ArrowError> {
        if self.buffered_rows == 0 {
            return Ok(());
        }
        let new_arrays = self
            .in_progress_arrays
            .iter_mut()
            .map(|array| array.finish())
            .collect::<Result<Vec<_>, ArrowError>>()?;

        for (array, field) in new_arrays.iter().zip(self.schema.fields().iter()) {
            debug_assert_eq!(array.data_type(), field.data_type());
            debug_assert_eq!(array.len(), self.buffered_rows);
        }

        // SAFETY: each array was created with the correct type and length.
        let batch = unsafe {
            RecordBatch::new_unchecked(Arc::clone(&self.schema), new_arrays, self.buffered_rows)
        };

        self.buffered_rows = 0;
        self.completed.push_back(batch);
        Ok(())
    }

    /// Returns true if there is no buffered data and no completed batches
    pub fn is_empty(&self) -> bool {
        self.buffered_rows == 0 && self.completed.is_empty()
    }

    /// Returns true if there are any completed batches
    pub fn has_completed_batch(&self) -> bool {
        !self.completed.is_empty()
    }

    /// Removes and returns the next completed batch, if any.
    pub fn next_completed_batch(&mut self) -> Option<RecordBatch> {
        self.completed.pop_front()
    }
}

/// Return a new `InProgressArray` for the given data type
fn create_in_progress_array(data_type: &DataType, batch_size: usize) -> Box<dyn InProgressArray> {
    macro_rules! instantiate_primitive {
        ($t:ty) => {
            Box::new(InProgressPrimitiveArray::<$t>::new(
                batch_size,
                data_type.clone(),
            ))
        };
    }

    downcast_primitive! {
        // Instantiate InProgressPrimitiveArray for each primitive type
        data_type => (instantiate_primitive),
        DataType::Utf8View => Box::new(InProgressByteViewArray::<StringViewType>::new(batch_size)),
        DataType::BinaryView => {
            Box::new(InProgressByteViewArray::<BinaryViewType>::new(batch_size))
        }
        _ => Box::new(GenericInProgressArray::new()),
    }
}

/// Incrementally builds up arrays
///
/// [`GenericInProgressArray`] is the default implementation, which buffers
/// arrays and uses other kernels to concatenate them when finished.
///
/// Some types have specialized implementations of this trait (e.g.,
/// [`StringViewArray`]).
///
/// [`StringViewArray`]: arrow_array::StringViewArray
trait InProgressArray: std::fmt::Debug + Send + Sync {
    /// Set the source array.
    ///
    /// Calls to [`Self::copy_rows`] will copy rows from this array into the
    /// current in-progress array
    fn set_source(&mut self, source: Option<ArrayRef>);

    /// Copy rows from the current source array into the in-progress array
    ///
    /// The source array is set by [`Self::set_source`].
    ///
    /// Returns an error if the source array is not set
    fn copy_rows(&mut self, offset: usize, len: usize) -> Result<(), ArrowError>;

    /// Finish the currently in-progress array and return it as an `ArrayRef`
    fn finish(&mut self) -> Result<ArrayRef, ArrowError>;
}
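
// A minimal sketch (not part of the upstream code) showing how `push_batch`
// drives the `InProgressArray` contract above: set a source, copy a row
// range, then `finish` to materialize the copied rows as a new array.
#[cfg(test)]
mod in_progress_array_sketch {
    use super::*;
    use arrow_array::Int32Array;
    use arrow_array::cast::AsArray;
    use arrow_array::types::Int32Type;

    #[test]
    fn drive_in_progress_array_directly() {
        let source: ArrayRef = Arc::new(Int32Array::from_iter_values(0..10));
        // build int32 output batches of (up to) 4 rows
        let mut in_progress = create_in_progress_array(&DataType::Int32, 4);
        in_progress.set_source(Some(source));
        // copy rows 2..6 of the source into the in-progress array
        in_progress.copy_rows(2, 4).unwrap();
        let array = in_progress.finish().unwrap();
        assert_eq!(
            array.as_primitive::<Int32Type>().values().to_vec(),
            vec![2, 3, 4, 5]
        );
    }
}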

#[cfg(test)]
mod tests {
    use super::*;
    use crate::concat::concat_batches;
    use arrow_array::builder::StringViewBuilder;
    use arrow_array::cast::AsArray;
    use arrow_array::{
        BinaryViewArray, Int32Array, Int64Array, RecordBatchOptions, StringArray, StringViewArray,
        TimestampNanosecondArray, UInt32Array, UInt64Array,
    };
    use arrow_schema::{DataType, Field, Schema};
    use rand::{Rng, SeedableRng};
    use std::ops::Range;

    #[test]
    fn test_coalesce() {
        let batch = uint32_batch(0..8);
        Test::new()
            .with_batches(std::iter::repeat_n(batch, 10))
            // expected output is exactly 21 rows (except for the final batch)
            .with_batch_size(21)
            .with_expected_output_sizes(vec![21, 21, 21, 17])
            .run();
    }

    #[test]
    fn test_coalesce_one_by_one() {
        let batch = uint32_batch(0..1); // single row input
        Test::new()
            .with_batches(std::iter::repeat_n(batch, 97))
            // expected output is exactly 20 rows (except for the final batch)
            .with_batch_size(20)
            .with_expected_output_sizes(vec![20, 20, 20, 20, 17])
            .run();
    }

    #[test]
    fn test_coalesce_empty() {
        let schema = Arc::new(Schema::new(vec![Field::new("c0", DataType::UInt32, false)]));

        Test::new()
            .with_batches(vec![])
            .with_schema(schema)
            .with_batch_size(21)
            .with_expected_output_sizes(vec![])
            .run();
    }

    #[test]
    fn test_single_large_batch_greater_than_target() {
        // test a single large batch
        let batch = uint32_batch(0..4096);
        Test::new()
            .with_batch(batch)
            .with_batch_size(1000)
            .with_expected_output_sizes(vec![1000, 1000, 1000, 1000, 96])
            .run();
    }

    #[test]
    fn test_single_large_batch_smaller_than_target() {
        // test a single large batch
        let batch = uint32_batch(0..4096);
        Test::new()
            .with_batch(batch)
            .with_batch_size(8192)
            .with_expected_output_sizes(vec![4096])
            .run();
    }

    #[test]
    fn test_single_large_batch_equal_to_target() {
        // test a single large batch
        let batch = uint32_batch(0..4096);
        Test::new()
            .with_batch(batch)
            .with_batch_size(4096)
            .with_expected_output_sizes(vec![4096])
            .run();
    }

    #[test]
    fn test_single_large_batch_equally_divisible_in_target() {
        // test a single large batch
        let batch = uint32_batch(0..4096);
        Test::new()
            .with_batch(batch)
            .with_batch_size(1024)
            .with_expected_output_sizes(vec![1024, 1024, 1024, 1024])
            .run();
    }

    #[test]
    fn test_empty_schema() {
        let schema = Schema::empty();
        let batch = RecordBatch::new_empty(schema.into());
        Test::new()
            .with_batch(batch)
            .with_expected_output_sizes(vec![])
            .run();
    }
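
    /// A sketch (added here for illustration) of the three
    /// `biggest_coalesce_batch_size` cases walked through in the comments of
    /// [`BatchCoalescer::push_batch`]; the row counts are assumptions chosen
    /// to trigger each case.
    #[test]
    fn test_biggest_coalesce_batch_size_cases() {
        // target_batch_size=1000, biggest_coalesce_batch_size=Some(500)
        let mut coalescer = BatchCoalescer::new(uint32_batch(0..1).schema(), 1000);
        coalescer.set_biggest_coalesce_batch_size(Some(500));

        // Case 1: empty buffer + large batch (600 > 500) → direct bypass
        coalescer.push_batch(uint32_batch(0..600)).unwrap();
        assert_eq!(coalescer.next_completed_batch().unwrap().num_rows(), 600);

        // Case 3: small buffer (300 <= 500) + large batch → normal coalescing:
        // 1500 buffered rows split into one 1000 row batch, 500 rows remain
        coalescer.push_batch(uint32_batch(0..300)).unwrap();
        coalescer.push_batch(uint32_batch(0..1200)).unwrap();
        assert_eq!(coalescer.next_completed_batch().unwrap().num_rows(), 1000);
        assert_eq!(coalescer.get_buffered_rows(), 500);

        // Case 2: buffered rows (600 > 500) + large batch → flush, then bypass
        coalescer.push_batch(uint32_batch(0..100)).unwrap();
        coalescer.push_batch(uint32_batch(0..800)).unwrap();
        assert_eq!(coalescer.next_completed_batch().unwrap().num_rows(), 600);
        assert_eq!(coalescer.next_completed_batch().unwrap().num_rows(), 800);
        assert!(coalescer.next_completed_batch().is_none());
    }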

    /// Coalesce multiple batches, 80k rows, with a 0.1% selectivity filter
    #[test]
    fn test_coalesce_filtered_001() {
        let mut filter_builder = RandomFilterBuilder {
            num_rows: 8000,
            selectivity: 0.001,
            seed: 0,
        };

        // add 10 batches of 8000 rows each
        // 80k rows, selecting 0.1% means ~80 rows
        // (not exactly 80, as the filters are random)
        let mut test = Test::new();
        for _ in 0..10 {
            test = test
                .with_batch(multi_column_batch(0..8000))
                .with_filter(filter_builder.next_filter())
        }
        test.with_batch_size(15)
            .with_expected_output_sizes(vec![15, 15, 15, 13])
            .run();
    }

    /// Coalesce multiple batches, 80k rows, with a 1% selectivity filter
    #[test]
    fn test_coalesce_filtered_01() {
        let mut filter_builder = RandomFilterBuilder {
            num_rows: 8000,
            selectivity: 0.01,
            seed: 0,
        };

        // add 10 batches of 8000 rows each
        // 80k rows, selecting 1% means ~800 rows
        // (not exactly 800, as the filters are random)
        let mut test = Test::new();
        for _ in 0..10 {
            test = test
                .with_batch(multi_column_batch(0..8000))
                .with_filter(filter_builder.next_filter())
        }
        test.with_batch_size(128)
            .with_expected_output_sizes(vec![128, 128, 128, 128, 128, 128, 15])
            .run();
    }

    /// Coalesce multiple batches, 80k rows, with a 10% selectivity filter
    #[test]
    fn test_coalesce_filtered_1() {
        let mut filter_builder = RandomFilterBuilder {
            num_rows: 8000,
            selectivity: 0.1,
            seed: 0,
        };

        // add 10 batches of 8000 rows each
        // 80k rows, selecting 10% means ~8000 rows
        // (not exactly 8000, as the filters are random)
        let mut test = Test::new();
        for _ in 0..10 {
            test = test
                .with_batch(multi_column_batch(0..8000))
                .with_filter(filter_builder.next_filter())
        }
        test.with_batch_size(1024)
            .with_expected_output_sizes(vec![1024, 1024, 1024, 1024, 1024, 1024, 1024, 840])
            .run();
    }

    /// Coalesce multiple batches, 8k rows, with a 90% selectivity filter
    #[test]
    fn test_coalesce_filtered_90() {
        let mut filter_builder = RandomFilterBuilder {
            num_rows: 800,
            selectivity: 0.90,
            seed: 0,
        };

        // add 10 batches of 800 rows each
        // 8k rows, selecting 90% means ~7200 rows
        // (not exactly 7200, as the filters are random)
        let mut test = Test::new();
        for _ in 0..10 {
            test = test
                .with_batch(multi_column_batch(0..800))
                .with_filter(filter_builder.next_filter())
        }
        test.with_batch_size(1024)
            .with_expected_output_sizes(vec![1024, 1024, 1024, 1024, 1024, 1024, 1024, 13])
            .run();
    }

    #[test]
    fn test_coalesce_non_null() {
        Test::new()
            // 4040 rows of uint32 in total
            .with_batch(uint32_batch_non_null(0..3000))
            .with_batch(uint32_batch_non_null(0..1040))
            .with_batch_size(1024)
            .with_expected_output_sizes(vec![1024, 1024, 1024, 968])
            .run();
    }

    #[test]
    fn test_utf8_split() {
        Test::new()
            // 4040 rows of utf8 strings in total, split into batches of 1024
            .with_batch(utf8_batch(0..3000))
            .with_batch(utf8_batch(0..1040))
            .with_batch_size(1024)
            .with_expected_output_sizes(vec![1024, 1024, 1024, 968])
            .run();
    }

    #[test]
    fn test_string_view_no_views() {
        let output_batches = Test::new()
            // both input batches have no views, so no need to compact
            .with_batch(stringview_batch([Some("foo"), Some("bar")]))
            .with_batch(stringview_batch([Some("baz"), Some("qux")]))
            .with_expected_output_sizes(vec![4])
            .run();

        expect_buffer_layout(
            col_as_string_view("c0", output_batches.first().unwrap()),
            vec![],
        );
    }

    #[test]
    fn test_string_view_batch_small_no_compact() {
        // view with only short strings (no buffers) --> no need to compact
        let batch = stringview_batch_repeated(1000, [Some("a"), Some("b"), Some("c")]);
        let output_batches = Test::new()
            .with_batch(batch.clone())
            .with_expected_output_sizes(vec![1000])
            .run();

        let array = col_as_string_view("c0", &batch);
        let gc_array = col_as_string_view("c0", output_batches.first().unwrap());
        assert_eq!(array.data_buffers().len(), 0);
        assert_eq!(array.data_buffers().len(), gc_array.data_buffers().len()); // no compaction

        expect_buffer_layout(gc_array, vec![]);
    }

    #[test]
    fn test_string_view_batch_large_no_compact() {
        // view with large strings (has buffers) but full --> no need to compact
        let batch = stringview_batch_repeated(1000, [Some("This string is longer than 12 bytes")]);
        let output_batches = Test::new()
            .with_batch(batch.clone())
            .with_batch_size(1000)
            .with_expected_output_sizes(vec![1000])
            .run();

        let array = col_as_string_view("c0", &batch);
        let gc_array = col_as_string_view("c0", output_batches.first().unwrap());
        assert_eq!(array.data_buffers().len(), 5);
        assert_eq!(array.data_buffers().len(), gc_array.data_buffers().len()); // no compaction

        expect_buffer_layout(
            gc_array,
            vec![
                ExpectedLayout {
                    len: 8190,
                    capacity: 8192,
                },
                ExpectedLayout {
                    len: 8190,
                    capacity: 8192,
                },
                ExpectedLayout {
                    len: 8190,
                    capacity: 8192,
                },
                ExpectedLayout {
                    len: 8190,
                    capacity: 8192,
                },
                ExpectedLayout {
                    len: 2240,
                    capacity: 8192,
                },
            ],
        );
    }

    #[test]
    fn test_string_view_batch_small_with_buffers_no_compact() {
        // view with buffers but only short views
        let short_strings = std::iter::repeat(Some("SmallString"));
        let long_strings = std::iter::once(Some("This string is longer than 12 bytes"));
        // 20 short strings, then a long one
        let values = short_strings.take(20).chain(long_strings);
        let batch = stringview_batch_repeated(1000, values)
            // take only 10 short strings (no long ones)
            .slice(5, 10);
        let output_batches = Test::new()
            .with_batch(batch.clone())
            .with_batch_size(1000)
            .with_expected_output_sizes(vec![10])
            .run();

        let array = col_as_string_view("c0", &batch);
        let gc_array = col_as_string_view("c0", output_batches.first().unwrap());
        assert_eq!(array.data_buffers().len(), 1); // input has one buffer
        assert_eq!(gc_array.data_buffers().len(), 0); // output has no buffers as only short strings
    }

    #[test]
    fn test_string_view_batch_large_slice_compact() {
        // view with large strings (has buffers) that is only partially used --> needs compaction
        let batch = stringview_batch_repeated(1000, [Some("This string is longer than 12 bytes")])
            // slice only 22 rows, so most of the buffer is not used
            .slice(11, 22);

        let output_batches = Test::new()
            .with_batch(batch.clone())
            .with_batch_size(1000)
            .with_expected_output_sizes(vec![22])
            .run();

        let array = col_as_string_view("c0", &batch);
        let gc_array = col_as_string_view("c0", output_batches.first().unwrap());
        assert_eq!(array.data_buffers().len(), 5);

        expect_buffer_layout(
            gc_array,
            vec![ExpectedLayout {
                len: 770,
                capacity: 8192,
            }],
        );
    }

    #[test]
    fn test_string_view_mixed() {
        let large_view_batch =
            stringview_batch_repeated(1000, [Some("This string is longer than 12 bytes")]);
        let small_view_batch = stringview_batch_repeated(1000, [Some("SmallString")]);
        let mixed_batch = stringview_batch_repeated(
            1000,
            [Some("This string is longer than 12 bytes"), Some("Small")],
        );
        let mixed_batch_nulls = stringview_batch_repeated(
            1000,
            [
                Some("This string is longer than 12 bytes"),
                Some("Small"),
                None,
            ],
        );

        // Several batches with mixed inline / non-inline views,
        // ~4k rows in total
        let output_batches = Test::new()
            .with_batch(large_view_batch.clone())
            .with_batch(small_view_batch)
            // this batch needs to be compacted (less than 1/2 full)
            .with_batch(large_view_batch.slice(10, 20))
            .with_batch(mixed_batch_nulls)
            // this batch needs to be compacted (less than 1/2 full)
            .with_batch(large_view_batch.slice(10, 20))
            .with_batch(mixed_batch)
            .with_expected_output_sizes(vec![1024, 1024, 1024, 968])
            .run();

        expect_buffer_layout(
            col_as_string_view("c0", output_batches.first().unwrap()),
            vec![
                ExpectedLayout {
                    len: 8190,
                    capacity: 8192,
                },
                ExpectedLayout {
                    len: 8190,
                    capacity: 8192,
                },
                ExpectedLayout {
                    len: 8190,
                    capacity: 8192,
                },
                ExpectedLayout {
                    len: 8190,
                    capacity: 8192,
                },
                ExpectedLayout {
                    len: 2240,
                    capacity: 8192,
                },
            ],
        );
    }

    #[test]
    fn test_string_view_many_small_compact() {
        // 200 rows alternating long (28 bytes) and short (≤12 bytes) strings.
        // Only the 100 long strings go into data buffers: 100 × 28 = 2800 bytes.
        let batch = stringview_batch_repeated(
            200,
            [Some("This string is 28 bytes long"), Some("small string")],
        );
        let output_batches = Test::new()
            // The first allocated buffer is 8kb.
            // Appending 10 batches of 2800 bytes each uses 2800 * 10 = 28,000
            // bytes: an 8kb buffer, a 16kb buffer, and part of a 32kb buffer.
            .with_batch(batch.clone())
            .with_batch(batch.clone())
            .with_batch(batch.clone())
            .with_batch(batch.clone())
            .with_batch(batch.clone())
            .with_batch(batch.clone())
            .with_batch(batch.clone())
            .with_batch(batch.clone())
            .with_batch(batch.clone())
            .with_batch(batch.clone())
            .with_batch_size(8000)
            .with_expected_output_sizes(vec![2000]) // only 2000 rows total
            .run();

        // expect a nice even distribution of buffers
        expect_buffer_layout(
            col_as_string_view("c0", output_batches.first().unwrap()),
            vec![
                ExpectedLayout {
                    len: 8176,
                    capacity: 8192,
                },
                ExpectedLayout {
                    len: 16380,
                    capacity: 16384,
                },
                ExpectedLayout {
                    len: 3444,
                    capacity: 32768,
                },
            ],
        );
    }

    #[test]
    fn test_string_view_many_small_boundary() {
        // The strings are designed to exactly fit into buffers that are powers of 2 long
        let batch = stringview_batch_repeated(100, [Some("This string is a power of two=32")]);
        let output_batches = Test::new()
            .with_batches(std::iter::repeat_n(batch, 20))
            .with_batch_size(900)
            .with_expected_output_sizes(vec![900, 900, 200])
            .run();

        // expect each buffer to be entirely full except the last one
        expect_buffer_layout(
            col_as_string_view("c0", output_batches.first().unwrap()),
            vec![
                ExpectedLayout {
                    len: 8192,
                    capacity: 8192,
                },
                ExpectedLayout {
                    len: 16384,
                    capacity: 16384,
                },
                ExpectedLayout {
                    len: 4224,
                    capacity: 32768,
                },
            ],
        );
    }

    #[test]
    fn test_string_view_large_small() {
        // The long strings are 28 bytes, so each batch buffers 100 * 28 = 2800 bytes
        let mixed_batch = stringview_batch_repeated(
            200,
            [Some("This string is 28 bytes long"), Some("small string")],
        );
        // These strings aren't copied; this array has an 8k buffer
        let all_large = stringview_batch_repeated(
            50,
            [Some(
                "This buffer has only large strings in it so there are no buffer copies",
            )],
        );

        let output_batches = Test::new()
            // The first allocated buffer is 8kb; appending the mixed batches
            // (2800 bytes each) then spills into 16kb and 32kb buffers.
            .with_batch(mixed_batch.clone())
            .with_batch(mixed_batch.clone())
            .with_batch(all_large.clone())
            .with_batch(mixed_batch.clone())
            .with_batch(all_large.clone())
            .with_batch(mixed_batch.clone())
            .with_batch(mixed_batch.clone())
            .with_batch(all_large.clone())
            .with_batch(mixed_batch.clone())
            .with_batch(all_large.clone())
            .with_batch_size(8000)
            .with_expected_output_sizes(vec![1400])
            .run();

        expect_buffer_layout(
            col_as_string_view("c0", output_batches.first().unwrap()),
            vec![
                ExpectedLayout {
                    len: 8190,
                    capacity: 8192,
                },
                ExpectedLayout {
                    len: 16366,
                    capacity: 16384,
                },
                ExpectedLayout {
                    len: 6244,
                    capacity: 32768,
                },
            ],
        );
    }

    #[test]
    fn test_binary_view() {
        let values: Vec<Option<&[u8]>> = vec![
            Some(b"foo"),
            None,
            Some(b"A longer string that is more than 12 bytes"),
        ];

        let binary_view =
            BinaryViewArray::from_iter(std::iter::repeat(values.iter()).flatten().take(1000));
        let batch =
            RecordBatch::try_from_iter(vec![("c0", Arc::new(binary_view) as ArrayRef)]).unwrap();

        Test::new()
            .with_batch(batch.clone())
            .with_batch(batch.clone())
            .with_batch_size(512)
            .with_expected_output_sizes(vec![512, 512, 512, 464])
            .run();
    }
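
    /// A sketch (added for illustration) of the `push_batch_with_indices`
    /// path, which is not otherwise exercised in this module; it uses the
    /// `uint32_batch_non_null` helper defined below.
    #[test]
    fn test_push_batch_with_indices() {
        // take rows 0, 2, and 4 of a 6 row batch while coalescing
        let batch = uint32_batch_non_null(0..6);
        let indices = UInt64Array::from(vec![0, 2, 4]);
        let mut coalescer = BatchCoalescer::new(batch.schema(), 1024);
        coalescer.push_batch_with_indices(batch, &indices).unwrap();
        coalescer.finish_buffered_batch().unwrap();
        let output = coalescer.next_completed_batch().unwrap();
        let expected = RecordBatch::try_new(
            output.schema(),
            vec![Arc::new(UInt32Array::from(vec![0, 2, 4]))],
        )
        .unwrap();
        assert_eq!(output, expected);
    }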

    #[derive(Debug, Clone, PartialEq)]
    struct ExpectedLayout {
        len: usize,
        capacity: usize,
    }

    /// Asserts that the buffer layout of the specified StringViewArray matches the expected layout
    fn expect_buffer_layout(array: &StringViewArray, expected: Vec<ExpectedLayout>) {
        let actual = array
            .data_buffers()
            .iter()
            .map(|b| ExpectedLayout {
                len: b.len(),
                capacity: b.capacity(),
            })
            .collect::<Vec<_>>();

        assert_eq!(
            actual, expected,
            "Expected buffer layout {expected:#?} but got {actual:#?}"
        );
    }

    /// Test for [`BatchCoalescer`]
    ///
    /// Pushes the input batches to the coalescer and verifies that the resulting
    /// batches have the expected number of rows and contents.
    #[derive(Debug, Clone)]
    struct Test {
        /// Batches to feed to the coalescer.
        input_batches: Vec<RecordBatch>,
        /// Filters to apply to the corresponding input batches.
        ///
        /// Any input batch without a corresponding filter is
        /// pushed as is.
        filters: Vec<BooleanArray>,
        /// The schema. If not provided, the first batch's schema is used.
        schema: Option<SchemaRef>,
        /// Expected output sizes of the resulting batches
        expected_output_sizes: Vec<usize>,
        /// target batch size (defaults to 1024)
        target_batch_size: usize,
    }

    impl Default for Test {
        fn default() -> Self {
            Self {
                input_batches: vec![],
                filters: vec![],
                schema: None,
                expected_output_sizes: vec![],
                target_batch_size: 1024,
            }
        }
    }

    impl Test {
        fn new() -> Self {
            Self::default()
        }

        /// Set the target batch size
        fn with_batch_size(mut self, target_batch_size: usize) -> Self {
            self.target_batch_size = target_batch_size;
            self
        }

        /// Extend the input batches with `batch`
        fn with_batch(mut self, batch: RecordBatch) -> Self {
            self.input_batches.push(batch);
            self
        }

        /// Extend the filters with `filter`
        fn with_filter(mut self, filter: BooleanArray) -> Self {
            self.filters.push(filter);
            self
        }

        /// Extends the input batches with `batches`
        fn with_batches(mut self, batches: impl IntoIterator<Item = RecordBatch>) -> Self {
            self.input_batches.extend(batches);
            self
        }

        /// Specifies the schema for the test
        fn with_schema(mut self, schema: SchemaRef) -> Self {
            self.schema = Some(schema);
            self
        }

        /// Extends the expected output sizes with `sizes`
        fn with_expected_output_sizes(mut self, sizes: impl IntoIterator<Item = usize>) -> Self {
            self.expected_output_sizes.extend(sizes);
            self
        }

        /// Runs the test -- see documentation on [`Test`] for details
        ///
        /// Returns the resulting output batches
        fn run(self) -> Vec<RecordBatch> {
            let expected_output = self.expected_output();
            let schema = self.schema();

            let Self {
                input_batches,
                filters,
                schema: _,
                target_batch_size,
                expected_output_sizes,
            } = self;

            let had_input = input_batches.iter().any(|b| b.num_rows() > 0);

            let mut coalescer = BatchCoalescer::new(Arc::clone(&schema), target_batch_size);

            // feed input batches and filters to the coalescer
            let mut filters = filters.into_iter();
            for batch in input_batches {
                if let Some(filter) = filters.next() {
                    coalescer.push_batch_with_filter(batch, &filter).unwrap();
                } else {
                    coalescer.push_batch(batch).unwrap();
                }
            }
            assert_eq!(schema, coalescer.schema());

            if had_input {
                assert!(!coalescer.is_empty(), "Coalescer should not be empty");
            } else {
                assert!(coalescer.is_empty(), "Coalescer should be empty");
            }

            coalescer.finish_buffered_batch().unwrap();
            if had_input {
                assert!(
                    coalescer.has_completed_batch(),
                    "Coalescer should have completed batches"
                );
            }

            let mut output_batches = vec![];
            while let Some(batch) = coalescer.next_completed_batch() {
                output_batches.push(batch);
            }

            // make sure we got the expected number of output batches and content
            let mut starting_idx = 0;
            let actual_output_sizes: Vec<usize> =
                output_batches.iter().map(|b| b.num_rows()).collect();
            assert_eq!(
                expected_output_sizes, actual_output_sizes,
                "Unexpected number of rows in output batches\n\
                Expected\n{expected_output_sizes:#?}\nActual:{actual_output_sizes:#?}"
            );
            let iter = expected_output_sizes
                .iter()
                .zip(output_batches.iter())
                .enumerate();

            for (i, (expected_size, batch)) in iter {
                // compare the contents of the batch after normalization (using
                // `==` compares the underlying memory layout too)
                let expected_batch = expected_output.slice(starting_idx, *expected_size);
                let expected_batch = normalize_batch(expected_batch);
                let batch = normalize_batch(batch.clone());
                assert_eq!(
                    expected_batch, batch,
                    "Unexpected content in batch {i}:\
                    \n\nExpected:\n{expected_batch:#?}\n\nActual:\n{batch:#?}"
                );
                starting_idx += *expected_size;
            }
            output_batches
        }

        /// Return the expected output schema. If not overridden by `with_schema`, it
        /// returns the schema of the first input batch.
        fn schema(&self) -> SchemaRef {
            self.schema
                .clone()
                .unwrap_or_else(|| Arc::clone(&self.input_batches[0].schema()))
        }

        /// Returns the expected output as a single `RecordBatch`
        fn expected_output(&self) -> RecordBatch {
            let schema = self.schema();
            if self.filters.is_empty() {
                return concat_batches(&schema, &self.input_batches).unwrap();
            }

            let mut filters = self.filters.iter();
            let filtered_batches = self
                .input_batches
                .iter()
                .map(|batch| {
                    if let Some(filter) = filters.next() {
                        filter_record_batch(batch, filter).unwrap()
                    } else {
                        batch.clone()
                    }
                })
                .collect::<Vec<_>>();
            concat_batches(&schema, &filtered_batches).unwrap()
        }
    }
1361
1362    /// Return a RecordBatch with a UInt32Array over the specified range,
1363    /// where every value divisible by 3 is null.
1364    fn uint32_batch<T: std::iter::Iterator<Item = u32>>(range: T) -> RecordBatch {
1365        let schema = Arc::new(Schema::new(vec![Field::new("c0", DataType::UInt32, true)]));
1366
1367        let array = UInt32Array::from_iter(range.map(|i| if i % 3 == 0 { None } else { Some(i) }));
1368        RecordBatch::try_new(Arc::clone(&schema), vec![Arc::new(array)]).unwrap()
1369    }
1370
1371    /// Return a RecordBatch with a non-null UInt32Array over the specified range
1372    fn uint32_batch_non_null<T: std::iter::Iterator<Item = u32>>(range: T) -> RecordBatch {
1373        let schema = Arc::new(Schema::new(vec![Field::new("c0", DataType::UInt32, false)]));
1374
1375        let array = UInt32Array::from_iter_values(range);
1376        RecordBatch::try_new(Arc::clone(&schema), vec![Arc::new(array)]).unwrap()
1377    }
1378
1379    /// Return a RecordBatch with a non-null UInt64Array over the specified range
1380    fn uint64_batch_non_null<T: std::iter::Iterator<Item = u64>>(range: T) -> RecordBatch {
1381        let schema = Arc::new(Schema::new(vec![Field::new("c0", DataType::UInt64, false)]));
1382
1383        let array = UInt64Array::from_iter_values(range);
1384        RecordBatch::try_new(Arc::clone(&schema), vec![Arc::new(array)]).unwrap()
1385    }
1386
1387    /// Return a RecordBatch with a StringArray with values `value0`, `value1`, ...
1388    /// where every value divisible by 3 is `None`.
1389    fn utf8_batch(range: Range<u32>) -> RecordBatch {
1390        let schema = Arc::new(Schema::new(vec![Field::new("c0", DataType::Utf8, true)]));
1391
1392        let array = StringArray::from_iter(range.map(|i| {
1393            if i % 3 == 0 {
1394                None
1395            } else {
1396                Some(format!("value{i}"))
1397            }
1398        }));
1399
1400        RecordBatch::try_new(Arc::clone(&schema), vec![Arc::new(array)]).unwrap()
1401    }
1402
1403    /// Return a RecordBatch with a StringViewArray with (only) the specified values
1404    fn stringview_batch<'a>(values: impl IntoIterator<Item = Option<&'a str>>) -> RecordBatch {
1405        let schema = Arc::new(Schema::new(vec![Field::new(
1406            "c0",
1407            DataType::Utf8View,
1408            false,
1409        )]));
1410
1411        let array = StringViewArray::from_iter(values);
1412        RecordBatch::try_new(Arc::clone(&schema), vec![Arc::new(array)]).unwrap()
1413    }
1414
1415    /// Return a RecordBatch with a StringViewArray of `num_rows` rows, built
1416    /// by repeating `values` over and over.
1417    fn stringview_batch_repeated<'a>(
1418        num_rows: usize,
1419        values: impl IntoIterator<Item = Option<&'a str>>,
1420    ) -> RecordBatch {
1421        let schema = Arc::new(Schema::new(vec![Field::new(
1422            "c0",
1423            DataType::Utf8View,
1424            true,
1425        )]));
1426
1427        // Cycle through the values until num_rows have been produced
1428        let values: Vec<_> = values.into_iter().collect();
1429        let values_iter = values.iter()
1430            .cycle()
1431            .cloned()
1432            .take(num_rows);
1433
1434        let mut builder = StringViewBuilder::with_capacity(100).with_fixed_block_size(8192);
1435        for val in values_iter {
1436            builder.append_option(val);
1437        }
1438
1439        let array = builder.finish();
1440        RecordBatch::try_new(Arc::clone(&schema), vec![Arc::new(array)]).unwrap()
1441    }
1442
1443    /// Return a multi-column RecordBatch covering the given range
1444    fn multi_column_batch(range: Range<i32>) -> RecordBatch {
1445        let int64_array = Int64Array::from_iter(
1446            range
1447                .clone()
1448                .map(|v| if v % 5 == 0 { None } else { Some(v as i64) }),
1449        );
1450        let string_view_array = StringViewArray::from_iter(range.clone().map(|v| {
1451            if v % 5 == 0 {
1452                None
1453            } else if v % 7 == 0 {
1454                Some(format!("This is a string longer than 12 bytes{v}"))
1455            } else {
1456                Some(format!("Short {v}"))
1457            }
1458        }));
1459        let string_array = StringArray::from_iter(range.clone().map(|v| {
1460            if v % 11 == 0 {
1461                None
1462            } else {
1463                Some(format!("Value {v}"))
1464            }
1465        }));
1466        let timestamp_array = TimestampNanosecondArray::from_iter(range.map(|v| {
1467            if v % 3 == 0 {
1468                None
1469            } else {
1470                Some(v as i64 * 1000) // arbitrary nanosecond timestamps
1471            }
1472        }))
1473        .with_timezone("America/New_York");
1474
1475        RecordBatch::try_from_iter(vec![
1476            ("int64", Arc::new(int64_array) as ArrayRef),
1477            ("stringview", Arc::new(string_view_array) as ArrayRef),
1478            ("string", Arc::new(string_array) as ArrayRef),
1479            ("timestamp", Arc::new(timestamp_array) as ArrayRef),
1480        ])
1481        .unwrap()
1482    }
1483
1484    /// Builds boolean filter arrays that randomly keep rows with the
1485    /// given `selectivity`.
1486    ///
1487    /// For example, a `selectivity` of 0.1 keeps roughly 10% of the rows
1488    /// (filtering out the other 90%).
1489    #[derive(Debug)]
1490    struct RandomFilterBuilder {
1491        num_rows: usize,
1492        selectivity: f64,
1493        /// seed for random number generator, increases by one each time
1494        /// `next_filter` is called
1495        seed: u64,
1496    }
1497    impl RandomFilterBuilder {
1498        /// Build the next filter with the current seed and increment the seed
1499        /// by one.
1500        fn next_filter(&mut self) -> BooleanArray {
1501            assert!((0.0..=1.0).contains(&self.selectivity));
1502            let mut rng = rand::rngs::StdRng::seed_from_u64(self.seed);
1503            self.seed += 1;
1504            BooleanArray::from_iter(
1505                (0..self.num_rows)
1506                    .map(|_| rng.random_bool(self.selectivity))
1507                    .map(Some),
1508            )
1509        }
1510    }
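
    /// Illustrative sketch (added example, not part of the original suite):
    /// because `next_filter` re-seeds explicitly and advances the seed by one
    /// per call, two builders starting from the same seed should produce
    /// identical filter sequences.
    #[test]
    fn random_filter_builder_is_deterministic() {
        let mut a = RandomFilterBuilder {
            num_rows: 8,
            selectivity: 0.5,
            seed: 42,
        };
        let mut b = RandomFilterBuilder {
            num_rows: 8,
            selectivity: 0.5,
            seed: 42,
        };
        // same seed sequence ==> identical filters, call after call
        assert_eq!(a.next_filter(), b.next_filter());
        assert_eq!(a.next_filter(), b.next_filter());
    }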
1511
1512    /// Returns the named column as a StringViewArray
1513    fn col_as_string_view<'b>(name: &str, batch: &'b RecordBatch) -> &'b StringViewArray {
1514        batch
1515            .column_by_name(name)
1516            .expect("column not found")
1517            .as_string_view_opt()
1518            .expect("column is not a string view")
1519    }
1520
1521    /// Normalize the `RecordBatch` so that the memory layout is consistent
1522    /// (e.g. StringViewArray is compacted).
1523    fn normalize_batch(batch: RecordBatch) -> RecordBatch {
1524        // Only StringViewArrays need normalizing (since `==` also compares memory layout)
1525        let (schema, mut columns, row_count) = batch.into_parts();
1526
1527        for column in columns.iter_mut() {
1528            let Some(string_view) = column.as_string_view_opt() else {
1529                continue;
1530            };
1531
1532            // Re-create the StringViewArray to ensure memory layout is
1533            // consistent
1534            let mut builder = StringViewBuilder::new();
1535            for s in string_view.iter() {
1536                builder.append_option(s);
1537            }
1538            // Update the column with the new StringViewArray
1539            *column = Arc::new(builder.finish());
1540        }
1541
1542        let options = RecordBatchOptions::new().with_row_count(Some(row_count));
1543        RecordBatch::try_new_with_options(schema, columns, &options).unwrap()
1544    }
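
    /// Illustrative sketch (added example): `normalize_batch` rebuilds
    /// StringViewArray columns into a fresh layout but must not change their
    /// logical contents.
    #[test]
    fn normalize_batch_preserves_values() {
        let batch = stringview_batch_repeated(
            16,
            vec![Some("short"), Some("a string longer than 12 bytes"), None],
        );
        let normalized = normalize_batch(batch.clone());
        // compare the logical values, element by element
        let before: Vec<_> = col_as_string_view("c0", &batch).iter().collect();
        let after: Vec<_> = col_as_string_view("c0", &normalized).iter().collect();
        assert_eq!(before, after);
    }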
1545
1546    /// Helper function to create a single-column Int32 batch with the specified number of rows
1547    fn create_test_batch(num_rows: usize) -> RecordBatch {
1548        let schema = Arc::new(Schema::new(vec![Field::new("c0", DataType::Int32, false)]));
1549        let array = Int32Array::from_iter_values(0..num_rows as i32);
1550        RecordBatch::try_new(schema, vec![Arc::new(array)]).unwrap()
1551    }
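
    /// Illustrative sketch (helper added for exposition; the tests below
    /// inline this pattern): the canonical way to consume a `BatchCoalescer`
    /// is to drain completed batches, flush the remaining buffered rows, and
    /// drain again.
    #[allow(dead_code)]
    fn drain_all(coalescer: &mut BatchCoalescer) -> Vec<RecordBatch> {
        let mut output = vec![];
        while let Some(batch) = coalescer.next_completed_batch() {
            output.push(batch);
        }
        // flush any partially filled buffer into one final (smaller) batch
        coalescer.finish_buffered_batch().unwrap();
        while let Some(batch) = coalescer.next_completed_batch() {
            output.push(batch);
        }
        output
    }
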
1552    #[test]
1553    fn test_biggest_coalesce_batch_size_none_default() {
1554        // Test that default behavior (None) coalesces all batches
1555        let mut coalescer = BatchCoalescer::new(
1556            Arc::new(Schema::new(vec![Field::new("c0", DataType::Int32, false)])),
1557            100,
1558        );
1559
1560        // Push a large batch (1000 rows) - should be coalesced normally
1561        let large_batch = create_test_batch(1000);
1562        coalescer.push_batch(large_batch).unwrap();
1563
1564        // Should produce multiple batches of target size (100)
1565        let mut output_batches = vec![];
1566        while let Some(batch) = coalescer.next_completed_batch() {
1567            output_batches.push(batch);
1568        }
1569
1570        coalescer.finish_buffered_batch().unwrap();
1571        while let Some(batch) = coalescer.next_completed_batch() {
1572            output_batches.push(batch);
1573        }
1574
1575        // Should have 10 batches of 100 rows each
1576        assert_eq!(output_batches.len(), 10);
1577        for batch in output_batches {
1578            assert_eq!(batch.num_rows(), 100);
1579        }
1580    }
1581
1582    #[test]
1583    fn test_biggest_coalesce_batch_size_bypass_large_batch() {
1584        // Test that batches larger than biggest_coalesce_batch_size bypass coalescing
1585        let mut coalescer = BatchCoalescer::new(
1586            Arc::new(Schema::new(vec![Field::new("c0", DataType::Int32, false)])),
1587            100,
1588        );
1589        coalescer.set_biggest_coalesce_batch_size(Some(500));
1590
1591        // Push a large batch (1000 rows) - should bypass coalescing
1592        let large_batch = create_test_batch(1000);
1593        coalescer.push_batch(large_batch.clone()).unwrap();
1594
1595        // Should have one completed batch immediately (the original large batch)
1596        assert!(coalescer.has_completed_batch());
1597        let output_batch = coalescer.next_completed_batch().unwrap();
1598        assert_eq!(output_batch.num_rows(), 1000);
1599
1600        // Should be no more completed batches
1601        assert!(!coalescer.has_completed_batch());
1602        assert_eq!(coalescer.get_buffered_rows(), 0);
1603    }
1604
1605    #[test]
1606    fn test_biggest_coalesce_batch_size_coalesce_small_batch() {
1607        // Test that batches smaller than biggest_coalesce_batch_size are coalesced normally
1608        let mut coalescer = BatchCoalescer::new(
1609            Arc::new(Schema::new(vec![Field::new("c0", DataType::Int32, false)])),
1610            100,
1611        );
1612        coalescer.set_biggest_coalesce_batch_size(Some(500));
1613
1614        // Push small batches that should be coalesced
1615        let small_batch = create_test_batch(50);
1616        coalescer.push_batch(small_batch.clone()).unwrap();
1617
1618        // Should not have completed batch yet (only 50 rows, target is 100)
1619        assert!(!coalescer.has_completed_batch());
1620        assert_eq!(coalescer.get_buffered_rows(), 50);
1621
1622        // Push another small batch
1623        coalescer.push_batch(small_batch).unwrap();
1624
1625        // Now should have a completed batch (100 rows total)
1626        assert!(coalescer.has_completed_batch());
1627        let output_batch = coalescer.next_completed_batch().unwrap();
1628        assert_eq!(output_batch.num_rows(), 100);
1629
1630        assert_eq!(coalescer.get_buffered_rows(), 0);
1631    }
1632
1633    #[test]
1634    fn test_biggest_coalesce_batch_size_equal_boundary() {
1635        // Test behavior when batch size equals biggest_coalesce_batch_size
1636        let mut coalescer = BatchCoalescer::new(
1637            Arc::new(Schema::new(vec![Field::new("c0", DataType::Int32, false)])),
1638            100,
1639        );
1640        coalescer.set_biggest_coalesce_batch_size(Some(500));
1641
1642        // Push a batch exactly equal to the limit
1643        let boundary_batch = create_test_batch(500);
1644        coalescer.push_batch(boundary_batch).unwrap();
1645
1646        // Should be coalesced (not bypassed) since its size equals the limit rather than exceeding it
1647        let mut output_count = 0;
1648        while coalescer.next_completed_batch().is_some() {
1649            output_count += 1;
1650        }
1651
1652        coalescer.finish_buffered_batch().unwrap();
1653        while coalescer.next_completed_batch().is_some() {
1654            output_count += 1;
1655        }
1656
1657        // Should have 5 batches of 100 rows each
1658        assert_eq!(output_count, 5);
1659    }
1660
1661    #[test]
1662    fn test_biggest_coalesce_batch_size_first_large_then_consecutive_bypass() {
1663        // Test the consecutive large batch bypass behavior.
1664        // Pattern: small batches -> first large batch (coalesced) -> consecutive large batches (bypass)
1665        let mut coalescer = BatchCoalescer::new(
1666            Arc::new(Schema::new(vec![Field::new("c0", DataType::Int32, false)])),
1667            100,
1668        );
1669        coalescer.set_biggest_coalesce_batch_size(Some(200));
1670
1671        let small_batch = create_test_batch(50);
1672
1673        // Push small batch first to create buffered data
1674        coalescer.push_batch(small_batch).unwrap();
1675        assert_eq!(coalescer.get_buffered_rows(), 50);
1676        assert!(!coalescer.has_completed_batch());
1677
1678        // Push first large batch - should go through normal coalescing due to buffered data
1679        let large_batch1 = create_test_batch(250);
1680        coalescer.push_batch(large_batch1).unwrap();
1681
1682        // 50 + 250 = 300 -> 3 complete batches of 100, 0 rows buffered
1683        let mut completed_batches = vec![];
1684        while let Some(batch) = coalescer.next_completed_batch() {
1685            completed_batches.push(batch);
1686        }
1687        assert_eq!(completed_batches.len(), 3);
1688        assert_eq!(coalescer.get_buffered_rows(), 0);
1689
1690        // Now push consecutive large batches - they should bypass
1691        let large_batch2 = create_test_batch(300);
1692        let large_batch3 = create_test_batch(400);
1693
1694        // Push second large batch - should bypass since it's consecutive and buffer is empty
1695        coalescer.push_batch(large_batch2).unwrap();
1696        assert!(coalescer.has_completed_batch());
1697        let output = coalescer.next_completed_batch().unwrap();
1698        assert_eq!(output.num_rows(), 300); // bypassed with original size
1699        assert_eq!(coalescer.get_buffered_rows(), 0);
1700
1701        // Push third large batch - should also bypass
1702        coalescer.push_batch(large_batch3).unwrap();
1703        assert!(coalescer.has_completed_batch());
1704        let output = coalescer.next_completed_batch().unwrap();
1705        assert_eq!(output.num_rows(), 400); // bypassed with original size
1706        assert_eq!(coalescer.get_buffered_rows(), 0);
1707    }
1708
1709    #[test]
1710    fn test_biggest_coalesce_batch_size_empty_batch() {
1711        // Test that empty batches don't trigger the bypass logic
1712        let mut coalescer = BatchCoalescer::new(
1713            Arc::new(Schema::new(vec![Field::new("c0", DataType::Int32, false)])),
1714            100,
1715        );
1716        coalescer.set_biggest_coalesce_batch_size(Some(50));
1717
1718        let empty_batch = create_test_batch(0);
1719        coalescer.push_batch(empty_batch).unwrap();
1720
1721        // Empty batch should be handled normally (no effect)
1722        assert!(!coalescer.has_completed_batch());
1723        assert_eq!(coalescer.get_buffered_rows(), 0);
1724    }
1725
1726    #[test]
1727    fn test_biggest_coalesce_batch_size_with_buffered_data_no_bypass() {
1728        // Test that when there is buffered data, large batches do NOT bypass (unless consecutive)
1729        let mut coalescer = BatchCoalescer::new(
1730            Arc::new(Schema::new(vec![Field::new("c0", DataType::Int32, false)])),
1731            100,
1732        );
1733        coalescer.set_biggest_coalesce_batch_size(Some(200));
1734
1735        // Add some buffered data first
1736        let small_batch = create_test_batch(30);
1737        coalescer.push_batch(small_batch.clone()).unwrap();
1738        coalescer.push_batch(small_batch).unwrap();
1739        assert_eq!(coalescer.get_buffered_rows(), 60);
1740
1741        // Push large batch that would normally bypass, but shouldn't because buffered_rows > 0
1742        let large_batch = create_test_batch(250);
1743        coalescer.push_batch(large_batch).unwrap();
1744
1745        // The large batch should be processed through normal coalescing logic
1746        // Total: 60 (buffered) + 250 (new) = 310 rows
1747        // Output: 3 complete batches of 100 rows each, 10 rows remain buffered
1748
1749        let mut completed_batches = vec![];
1750        while let Some(batch) = coalescer.next_completed_batch() {
1751            completed_batches.push(batch);
1752        }
1753
1754        assert_eq!(completed_batches.len(), 3);
1755        for batch in &completed_batches {
1756            assert_eq!(batch.num_rows(), 100);
1757        }
1758        assert_eq!(coalescer.get_buffered_rows(), 10);
1759    }
1760
1761    #[test]
1762    fn test_biggest_coalesce_batch_size_zero_limit() {
1763        // Test edge case where the limit is 0 (every non-empty batch bypasses when there is no buffered data)
1764        let mut coalescer = BatchCoalescer::new(
1765            Arc::new(Schema::new(vec![Field::new("c0", DataType::Int32, false)])),
1766            100,
1767        );
1768        coalescer.set_biggest_coalesce_batch_size(Some(0));
1769
1770        // Even a 1-row batch should bypass when there's no buffered data
1771        let tiny_batch = create_test_batch(1);
1772        coalescer.push_batch(tiny_batch).unwrap();
1773
1774        assert!(coalescer.has_completed_batch());
1775        let output = coalescer.next_completed_batch().unwrap();
1776        assert_eq!(output.num_rows(), 1);
1777    }
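
    /// Illustrative sketch (added example): even with a zero limit, an empty
    /// batch should produce no output, assuming the bypass comparison is
    /// strict (`num_rows > limit`), as the boundary test above suggests.
    #[test]
    fn test_biggest_coalesce_batch_size_zero_limit_empty_batch() {
        let mut coalescer = BatchCoalescer::new(
            Arc::new(Schema::new(vec![Field::new("c0", DataType::Int32, false)])),
            100,
        );
        coalescer.set_biggest_coalesce_batch_size(Some(0));
        coalescer.push_batch(create_test_batch(0)).unwrap();

        // 0 rows is not greater than the 0-row limit, and empty batches are
        // handled as no-ops by normal coalescing, so nothing should come out
        assert!(!coalescer.has_completed_batch());
        assert_eq!(coalescer.get_buffered_rows(), 0);
    }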
1778
1779    #[test]
1780    fn test_biggest_coalesce_batch_size_bypass_only_when_no_buffer() {
1781        // Test that bypass only occurs when buffered_rows == 0
1782        let mut coalescer = BatchCoalescer::new(
1783            Arc::new(Schema::new(vec![Field::new("c0", DataType::Int32, false)])),
1784            100,
1785        );
1786        coalescer.set_biggest_coalesce_batch_size(Some(200));
1787
1788        // First, push a large batch with no buffered data - should bypass
1789        let large_batch = create_test_batch(300);
1790        coalescer.push_batch(large_batch.clone()).unwrap();
1791
1792        assert!(coalescer.has_completed_batch());
1793        let output = coalescer.next_completed_batch().unwrap();
1794        assert_eq!(output.num_rows(), 300); // bypassed
1795        assert_eq!(coalescer.get_buffered_rows(), 0);
1796
1797        // Now add some buffered data
1798        let small_batch = create_test_batch(50);
1799        coalescer.push_batch(small_batch).unwrap();
1800        assert_eq!(coalescer.get_buffered_rows(), 50);
1801
1802        // Push the same large batch again - should NOT bypass this time (not consecutive)
1803        coalescer.push_batch(large_batch).unwrap();
1804
1805        // Should process through normal coalescing: 50 + 300 = 350 rows
1806        // Output: 3 complete batches of 100 rows, 50 rows buffered
1807        let mut completed_batches = vec![];
1808        while let Some(batch) = coalescer.next_completed_batch() {
1809            completed_batches.push(batch);
1810        }
1811
1812        assert_eq!(completed_batches.len(), 3);
1813        for batch in &completed_batches {
1814            assert_eq!(batch.num_rows(), 100);
1815        }
1816        assert_eq!(coalescer.get_buffered_rows(), 50);
1817    }
1818
1819    #[test]
1820    fn test_biggest_coalesce_batch_size_consecutive_large_batches_scenario() {
1821        // Test a mixed scenario of batch sizes: 20, 20, 30, 700, 600, 700, 900, 700, 600
1822        let mut coalescer = BatchCoalescer::new(
1823            Arc::new(Schema::new(vec![Field::new("c0", DataType::Int32, false)])),
1824            1000,
1825        );
1826        coalescer.set_biggest_coalesce_batch_size(Some(500));
1827
1828        // Push small batches first
1829        coalescer.push_batch(create_test_batch(20)).unwrap();
1830        coalescer.push_batch(create_test_batch(20)).unwrap();
1831        coalescer.push_batch(create_test_batch(30)).unwrap();
1832
1833        assert_eq!(coalescer.get_buffered_rows(), 70);
1834        assert!(!coalescer.has_completed_batch());
1835
1836        // Push first large batch (700) - should coalesce due to buffered data
1837        coalescer.push_batch(create_test_batch(700)).unwrap();
1838
1839        // 70 + 700 = 770 rows, not enough for 1000, so all stay buffered
1840        assert_eq!(coalescer.get_buffered_rows(), 770);
1841        assert!(!coalescer.has_completed_batch());
1842
1843        // Push second large batch (600) - should bypass since previous was large
1844        coalescer.push_batch(create_test_batch(600)).unwrap();
1845
1846        // Should flush buffer (770 rows) and bypass the 600
1847        let mut outputs = vec![];
1848        while let Some(batch) = coalescer.next_completed_batch() {
1849            outputs.push(batch);
1850        }
1851        assert_eq!(outputs.len(), 2); // one flushed buffer batch (770) + one bypassed (600)
1852        assert_eq!(outputs[0].num_rows(), 770);
1853        assert_eq!(outputs[1].num_rows(), 600);
1854        assert_eq!(coalescer.get_buffered_rows(), 0);
1855
1856        // Push remaining large batches - should all bypass
1857        let remaining_batches = [700, 900, 700, 600];
1858        for &size in &remaining_batches {
1859            coalescer.push_batch(create_test_batch(size)).unwrap();
1860
1861            assert!(coalescer.has_completed_batch());
1862            let output = coalescer.next_completed_batch().unwrap();
1863            assert_eq!(output.num_rows(), size);
1864            assert_eq!(coalescer.get_buffered_rows(), 0);
1865        }
1866    }
1867
1868    #[test]
1869    fn test_biggest_coalesce_batch_size_truly_consecutive_large_bypass() {
1870        // Test truly consecutive large batches that should all bypass
1871        // This test ensures buffer is completely empty between large batches
1872        let mut coalescer = BatchCoalescer::new(
1873            Arc::new(Schema::new(vec![Field::new("c0", DataType::Int32, false)])),
1874            100,
1875        );
1876        coalescer.set_biggest_coalesce_batch_size(Some(200));
1877
1878        // Push consecutive large batches with no prior buffered data
1879        let large_batches = vec![
1880            create_test_batch(300),
1881            create_test_batch(400),
1882            create_test_batch(350),
1883            create_test_batch(500),
1884        ];
1885
1886        let mut all_outputs = vec![];
1887
1888        for (i, large_batch) in large_batches.into_iter().enumerate() {
1889            let expected_size = large_batch.num_rows();
1890
1891            // Buffer should be empty before each large batch
1892            assert_eq!(
1893                coalescer.get_buffered_rows(),
1894                0,
1895                "Buffer should be empty before batch {}",
1896                i
1897            );
1898
1899            coalescer.push_batch(large_batch).unwrap();
1900
1901            // Each large batch should bypass and produce exactly one output batch
1902            assert!(
1903                coalescer.has_completed_batch(),
1904                "Should have completed batch after pushing batch {}",
1905                i
1906            );
1907
1908            let output = coalescer.next_completed_batch().unwrap();
1909            assert_eq!(
1910                output.num_rows(),
1911                expected_size,
1912                "Batch {} should have bypassed with original size",
1913                i
1914            );
1915
1916            // Should be no more batches and buffer should be empty
1917            assert!(
1918                !coalescer.has_completed_batch(),
1919                "Should have no more completed batches after batch {}",
1920                i
1921            );
1922            assert_eq!(
1923                coalescer.get_buffered_rows(),
1924                0,
1925                "Buffer should be empty after batch {}",
1926                i
1927            );
1928
1929            all_outputs.push(output);
1930        }
1931
1932        // Verify we got exactly 4 output batches with original sizes
1933        assert_eq!(all_outputs.len(), 4);
1934        assert_eq!(all_outputs[0].num_rows(), 300);
1935        assert_eq!(all_outputs[1].num_rows(), 400);
1936        assert_eq!(all_outputs[2].num_rows(), 350);
1937        assert_eq!(all_outputs[3].num_rows(), 500);
1938    }
1939
1940    #[test]
1941    fn test_biggest_coalesce_batch_size_reset_consecutive_on_small_batch() {
1942        // Test that small batches reset the consecutive large batch tracking
1943        let mut coalescer = BatchCoalescer::new(
1944            Arc::new(Schema::new(vec![Field::new("c0", DataType::Int32, false)])),
1945            100,
1946        );
1947        coalescer.set_biggest_coalesce_batch_size(Some(200));
1948
1949        // Push first large batch - should bypass (no buffered data)
1950        coalescer.push_batch(create_test_batch(300)).unwrap();
1951        let output = coalescer.next_completed_batch().unwrap();
1952        assert_eq!(output.num_rows(), 300);
1953
1954        // Push second large batch - should bypass (consecutive)
1955        coalescer.push_batch(create_test_batch(400)).unwrap();
1956        let output = coalescer.next_completed_batch().unwrap();
1957        assert_eq!(output.num_rows(), 400);
1958
1959        // Push small batch - resets consecutive tracking
1960        coalescer.push_batch(create_test_batch(50)).unwrap();
1961        assert_eq!(coalescer.get_buffered_rows(), 50);
1962
1963        // Push large batch again - should NOT bypass due to buffered data
1964        coalescer.push_batch(create_test_batch(350)).unwrap();
1965
1966        // Should coalesce: 50 + 350 = 400 -> 4 complete batches of 100
1967        let mut outputs = vec![];
1968        while let Some(batch) = coalescer.next_completed_batch() {
1969            outputs.push(batch);
1970        }
1971        assert_eq!(outputs.len(), 4);
1972        for batch in outputs {
1973            assert_eq!(batch.num_rows(), 100);
1974        }
1975        assert_eq!(coalescer.get_buffered_rows(), 0);
1976    }
1977
1978    #[test]
1979    fn test_coalesce_push_batch_with_indices() {
1980        const MID_POINT: u32 = 2333;
1981        const TOTAL_ROWS: u32 = 23333;
1982        let batch1 = uint32_batch_non_null(0..MID_POINT);
1983        let batch2 = uint32_batch_non_null((MID_POINT..TOTAL_ROWS).rev());
1984
1985        let mut coalescer = BatchCoalescer::new(
1986            Arc::new(Schema::new(vec![Field::new("c0", DataType::UInt32, false)])),
1987            TOTAL_ROWS as usize,
1988        );
1989        coalescer.push_batch(batch1).unwrap();
1990
1991        let rev_indices = (0..((TOTAL_ROWS - MID_POINT) as u64)).rev();
1992        let reversed_indices_batch = uint64_batch_non_null(rev_indices);
1993
1994        let reverse_indices = UInt64Array::from(reversed_indices_batch.column(0).to_data());
1995        coalescer
1996            .push_batch_with_indices(batch2, &reverse_indices)
1997            .unwrap();
1998
1999        coalescer.finish_buffered_batch().unwrap();
2000        let actual = coalescer.next_completed_batch().unwrap();
2001
2002        let expected = uint32_batch_non_null(0..TOTAL_ROWS);
2003
2004        assert_eq!(expected, actual);
2005    }
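
    /// Illustrative sketch (added example): assuming `push_batch_with_indices`
    /// gathers arbitrary row offsets (like `take`), it can also select a
    /// subset of the rows rather than a full permutation, as the test above does.
    #[test]
    fn test_coalesce_push_batch_with_indices_subset() {
        let batch = uint32_batch_non_null(0..10u32);
        let mut coalescer = BatchCoalescer::new(batch.schema(), 8);

        // keep only the even rows: 0, 2, 4, 6, 8
        let indices = UInt64Array::from(vec![0u64, 2, 4, 6, 8]);
        coalescer.push_batch_with_indices(batch, &indices).unwrap();
        coalescer.finish_buffered_batch().unwrap();

        let actual = coalescer.next_completed_batch().unwrap();
        let expected = uint32_batch_non_null((0..10u32).step_by(2));
        assert_eq!(expected, actual);
    }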
2006}