Skip to main content

arrow_select/
coalesce.rs

1// Licensed to the Apache Software Foundation (ASF) under one
2// or more contributor license agreements.  See the NOTICE file
3// distributed with this work for additional information
4// regarding copyright ownership.  The ASF licenses this file
5// to you under the Apache License, Version 2.0 (the
6// "License"); you may not use this file except in compliance
7// with the License.  You may obtain a copy of the License at
8//
9//   http://www.apache.org/licenses/LICENSE-2.0
10//
11// Unless required by applicable law or agreed to in writing,
12// software distributed under the License is distributed on an
13// "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
14// KIND, either express or implied.  See the License for the
15// specific language governing permissions and limitations
16// under the License.
17
18//! [`BatchCoalescer`]  concatenates multiple [`RecordBatch`]es after
19//! operations such as [`filter`] and [`take`].
20//!
21//! [`filter`]: crate::filter::filter
22//! [`take`]: crate::take::take
23use crate::filter::filter_record_batch;
24use crate::take::take_record_batch;
25use arrow_array::types::{BinaryViewType, StringViewType};
26use arrow_array::{Array, ArrayRef, BooleanArray, RecordBatch, downcast_primitive};
27use arrow_schema::{ArrowError, DataType, SchemaRef};
28use std::collections::VecDeque;
29use std::sync::Arc;
30// Originally From DataFusion's coalesce module:
31// https://github.com/apache/datafusion/blob/9d2f04996604e709ee440b65f41e7b882f50b788/datafusion/physical-plan/src/coalesce/mod.rs#L26-L25
32
33mod byte_view;
34mod generic;
35mod primitive;
36
37use byte_view::InProgressByteViewArray;
38use generic::GenericInProgressArray;
39use primitive::InProgressPrimitiveArray;
40
41/// Concatenate multiple [`RecordBatch`]es
42///
43/// Implements the common pattern of incrementally creating output
44/// [`RecordBatch`]es of a specific size from an input stream of
45/// [`RecordBatch`]es.
46///
47/// This is useful after operations such as [`filter`] and [`take`] that produce
48/// smaller batches, and we want to coalesce them into larger batches for
49/// further processing.
50///
51/// # Motivation
52///
53/// If we use [`concat_batches`] to implement the same functionality, there are 2 potential issues:
54/// 1. At least 2x peak memory (holding the input and output of concat)
55/// 2. 2 copies of the data (to create the output of filter and then create the output of concat)
56///
57/// See: <https://github.com/apache/arrow-rs/issues/6692> for more discussions
58/// about the motivation.
59///
60/// [`filter`]: crate::filter::filter
61/// [`take`]: crate::take::take
62/// [`concat_batches`]: crate::concat::concat_batches
63///
64/// # Example
65/// ```
66/// use arrow_array::record_batch;
67/// use arrow_select::coalesce::{BatchCoalescer};
68/// let batch1 = record_batch!(("a", Int32, [1, 2, 3])).unwrap();
69/// let batch2 = record_batch!(("a", Int32, [4, 5])).unwrap();
70///
71/// // Create a `BatchCoalescer` that will produce batches with at least 4 rows
72/// let target_batch_size = 4;
73/// let mut coalescer = BatchCoalescer::new(batch1.schema(), 4);
74///
75/// // push the batches
76/// coalescer.push_batch(batch1).unwrap();
/// // only pushed 3 rows (not yet 4, so not enough to produce a batch)
78/// assert!(coalescer.next_completed_batch().is_none());
79/// coalescer.push_batch(batch2).unwrap();
80/// // now we have 5 rows, so we can produce a batch
81/// let finished = coalescer.next_completed_batch().unwrap();
82/// // 4 rows came out (target batch size is 4)
83/// let expected = record_batch!(("a", Int32, [1, 2, 3, 4])).unwrap();
84/// assert_eq!(finished, expected);
85///
86/// // Have no more input, but still have an in-progress batch
87/// assert!(coalescer.next_completed_batch().is_none());
88/// // We can finish the batch, which will produce the remaining rows
89/// coalescer.finish_buffered_batch().unwrap();
90/// let expected = record_batch!(("a", Int32, [5])).unwrap();
91/// assert_eq!(coalescer.next_completed_batch().unwrap(), expected);
92///
93/// // The coalescer is now empty
94/// assert!(coalescer.next_completed_batch().is_none());
95/// ```
96///
97/// # Background
98///
99/// Generally speaking, larger [`RecordBatch`]es are more efficient to process
100/// than smaller [`RecordBatch`]es (until the CPU cache is exceeded) because
101/// there is fixed processing overhead per batch. This coalescer builds up these
102/// larger batches incrementally.
103///
104/// ```text
105/// ┌────────────────────┐
106/// │    RecordBatch     │
107/// │   num_rows = 100   │
108/// └────────────────────┘                 ┌────────────────────┐
109///                                        │                    │
110/// ┌────────────────────┐     Coalesce    │                    │
111/// │                    │      Batches    │                    │
112/// │    RecordBatch     │                 │                    │
113/// │   num_rows = 200   │  ─ ─ ─ ─ ─ ─ ▶  │                    │
114/// │                    │                 │    RecordBatch     │
115/// │                    │                 │   num_rows = 400   │
116/// └────────────────────┘                 │                    │
117///                                        │                    │
118/// ┌────────────────────┐                 │                    │
119/// │                    │                 │                    │
120/// │    RecordBatch     │                 │                    │
121/// │   num_rows = 100   │                 └────────────────────┘
122/// │                    │
123/// └────────────────────┘
124/// ```
125///
126/// # Notes:
127///
128/// 1. Output rows are produced in the same order as the input rows
129///
130/// 2. The output is a sequence of batches, with all but the last being at exactly
131///    `target_batch_size` rows.
132#[derive(Debug)]
133pub struct BatchCoalescer {
134    /// The input schema
135    schema: SchemaRef,
136    /// The target batch size (and thus size for views allocation). This is a
137    /// hard limit: the output batch will be exactly `target_batch_size`,
138    /// rather than possibly being slightly above.
139    target_batch_size: usize,
140    /// In-progress arrays
141    in_progress_arrays: Vec<Box<dyn InProgressArray>>,
142    /// Buffered row count. Always less than `batch_size`
143    buffered_rows: usize,
144    /// Completed batches
145    completed: VecDeque<RecordBatch>,
146    /// Biggest coalesce batch size. See [`Self::with_biggest_coalesce_batch_size`]
147    biggest_coalesce_batch_size: Option<usize>,
148}
149
150impl BatchCoalescer {
151    /// Create a new `BatchCoalescer`
152    ///
153    /// # Arguments
154    /// - `schema` - the schema of the output batches
155    /// - `target_batch_size` - the number of rows in each output batch.
156    ///   Typical values are `4096` or `8192` rows.
157    ///
158    pub fn new(schema: SchemaRef, target_batch_size: usize) -> Self {
159        let in_progress_arrays = schema
160            .fields()
161            .iter()
162            .map(|field| create_in_progress_array(field.data_type(), target_batch_size))
163            .collect::<Vec<_>>();
164
165        Self {
166            schema,
167            target_batch_size,
168            in_progress_arrays,
169            // We will for sure store at least one completed batch
170            completed: VecDeque::with_capacity(1),
171            buffered_rows: 0,
172            biggest_coalesce_batch_size: None,
173        }
174    }
175
176    /// Set the coalesce batch size limit (default `None`)
177    ///
178    /// This limit determine when batches should bypass coalescing. Intuitively,
179    /// batches that are already large are costly to coalesce and are efficient
180    /// enough to process directly without coalescing.
181    ///
182    /// If `Some(limit)`, batches larger than this limit will bypass coalescing
183    /// when there is no buffered data, or when the previously buffered data
184    /// already exceeds this limit.
185    ///
186    /// If `None`, all batches will be coalesced according to the
187    /// target_batch_size.
188    pub fn with_biggest_coalesce_batch_size(mut self, limit: Option<usize>) -> Self {
189        self.biggest_coalesce_batch_size = limit;
190        self
191    }
192
193    /// Get the current biggest coalesce batch size limit
194    ///
195    /// See [`Self::with_biggest_coalesce_batch_size`] for details
196    pub fn biggest_coalesce_batch_size(&self) -> Option<usize> {
197        self.biggest_coalesce_batch_size
198    }
199
200    /// Set the biggest coalesce batch size limit
201    ///
202    /// See [`Self::with_biggest_coalesce_batch_size`] for details
203    pub fn set_biggest_coalesce_batch_size(&mut self, limit: Option<usize>) {
204        self.biggest_coalesce_batch_size = limit;
205    }
206
207    /// Return the schema of the output batches
208    pub fn schema(&self) -> SchemaRef {
209        Arc::clone(&self.schema)
210    }
211
212    /// Push a batch into the Coalescer after applying a filter
213    ///
214    /// This is semantically equivalent of calling [`Self::push_batch`]
215    /// with the results from  [`filter_record_batch`]
216    ///
217    /// # Example
218    /// ```
219    /// # use arrow_array::{record_batch, BooleanArray};
220    /// # use arrow_select::coalesce::BatchCoalescer;
221    /// let batch1 = record_batch!(("a", Int32, [1, 2, 3])).unwrap();
222    /// let batch2 = record_batch!(("a", Int32, [4, 5, 6])).unwrap();
223    /// // Apply a filter to each batch to pick the first and last row
224    /// let filter = BooleanArray::from(vec![true, false, true]);
225    /// // create a new Coalescer that targets creating 1000 row batches
226    /// let mut coalescer = BatchCoalescer::new(batch1.schema(), 1000);
227    /// coalescer.push_batch_with_filter(batch1, &filter);
228    /// coalescer.push_batch_with_filter(batch2, &filter);
229    /// // finsh and retrieve the created batch
230    /// coalescer.finish_buffered_batch().unwrap();
231    /// let completed_batch = coalescer.next_completed_batch().unwrap();
232    /// // filtered out 2 and 5:
233    /// let expected_batch = record_batch!(("a", Int32, [1, 3, 4, 6])).unwrap();
234    /// assert_eq!(completed_batch, expected_batch);
235    /// ```
236    pub fn push_batch_with_filter(
237        &mut self,
238        batch: RecordBatch,
239        filter: &BooleanArray,
240    ) -> Result<(), ArrowError> {
241        // TODO: optimize this to avoid materializing (copying the results
242        // of filter to a new batch)
243        let filtered_batch = filter_record_batch(&batch, filter)?;
244        self.push_batch(filtered_batch)
245    }
246
247    /// Push a batch into the Coalescer after applying a set of indices
248    /// This is semantically equivalent of calling [`Self::push_batch`]
249    /// with the results from  [`take_record_batch`]
250    ///
251    /// # Example
252    /// ```
253    /// # use arrow_array::{record_batch, UInt64Array};
254    /// # use arrow_select::coalesce::BatchCoalescer;
255    /// let batch1 = record_batch!(("a", Int32, [0, 0, 0])).unwrap();
256    /// let batch2 = record_batch!(("a", Int32, [1, 1, 4, 5, 1, 4])).unwrap();
257    /// // Sorted indices to create a sorted output, this can be obtained with
258    /// // `arrow-ord`'s sort_to_indices operation
259    /// let indices = UInt64Array::from(vec![0, 1, 4, 2, 5, 3]);
260    /// // create a new Coalescer that targets creating 1000 row batches
261    /// let mut coalescer = BatchCoalescer::new(batch1.schema(), 1000);
262    /// coalescer.push_batch(batch1);
263    /// coalescer.push_batch_with_indices(batch2, &indices);
264    /// // finsh and retrieve the created batch
265    /// coalescer.finish_buffered_batch().unwrap();
266    /// let completed_batch = coalescer.next_completed_batch().unwrap();
267    /// let expected_batch = record_batch!(("a", Int32, [0, 0, 0, 1, 1, 1, 4, 4, 5])).unwrap();
268    /// assert_eq!(completed_batch, expected_batch);
269    /// ```
270    pub fn push_batch_with_indices(
271        &mut self,
272        batch: RecordBatch,
273        indices: &dyn Array,
274    ) -> Result<(), ArrowError> {
275        // todo: optimize this to avoid materializing (copying the results of take indices to a new batch)
276        let taken_batch = take_record_batch(&batch, indices)?;
277        self.push_batch(taken_batch)
278    }
279
280    /// Push all the rows from `batch` into the Coalescer
281    ///
282    /// When buffered data plus incoming rows reach `target_batch_size` ,
283    /// completed batches are generated eagerly and can be retrieved via
284    /// [`Self::next_completed_batch()`].
285    /// Output batches contain exactly `target_batch_size` rows, so the tail of
286    /// the input batch may remain buffered.
287    /// Remaining partial data either waits for future input batches or can be
288    /// materialized immediately by calling [`Self::finish_buffered_batch()`].
289    ///
290    /// # Example
291    /// ```
292    /// # use arrow_array::record_batch;
293    /// # use arrow_select::coalesce::BatchCoalescer;
294    /// let batch1 = record_batch!(("a", Int32, [1, 2, 3])).unwrap();
295    /// let batch2 = record_batch!(("a", Int32, [4, 5, 6])).unwrap();
296    /// // create a new Coalescer that targets creating 1000 row batches
297    /// let mut coalescer = BatchCoalescer::new(batch1.schema(), 1000);
298    /// coalescer.push_batch(batch1);
299    /// coalescer.push_batch(batch2);
300    /// // finsh and retrieve the created batch
301    /// coalescer.finish_buffered_batch().unwrap();
302    /// let completed_batch = coalescer.next_completed_batch().unwrap();
303    /// let expected_batch = record_batch!(("a", Int32, [1, 2, 3, 4, 5, 6])).unwrap();
304    /// assert_eq!(completed_batch, expected_batch);
305    /// ```
306    pub fn push_batch(&mut self, batch: RecordBatch) -> Result<(), ArrowError> {
307        // Large batch bypass optimization:
308        // When biggest_coalesce_batch_size is configured and a batch exceeds this limit,
309        // we can avoid expensive split-and-merge operations by passing it through directly.
310        //
311        // IMPORTANT: This optimization is OPTIONAL and only active when biggest_coalesce_batch_size
312        // is explicitly set via with_biggest_coalesce_batch_size(Some(limit)).
313        // If not set (None), ALL batches follow normal coalescing behavior regardless of size.
314
315        // =============================================================================
316        // CASE 1: No buffer + large batch → Direct bypass
317        // =============================================================================
318        // Example scenario (target_batch_size=1000, biggest_coalesce_batch_size=Some(500)):
319        // Input sequence: [600, 1200, 300]
320        //
321        // With biggest_coalesce_batch_size=Some(500) (optimization enabled):
322        //   600 → large batch detected! buffered_rows=0 → Case 1: direct bypass
323        //        → output: [600] (bypass, preserves large batch)
324        //   1200 → large batch detected! buffered_rows=0 → Case 1: direct bypass
325        //         → output: [1200] (bypass, preserves large batch)
326        //   300 → normal batch, buffer: [300]
327        //   Result: [600], [1200], [300] - large batches preserved, mixed sizes
328
329        // =============================================================================
330        // CASE 2: Buffer too large + large batch → Flush first, then bypass
331        // =============================================================================
332        // This case prevents creating extremely large merged batches that would
333        // significantly exceed both target_batch_size and biggest_coalesce_batch_size.
334        //
335        // Example 1: Buffer exceeds limit before large batch arrives
336        // target_batch_size=1000, biggest_coalesce_batch_size=Some(400)
337        // Input: [350, 200, 800]
338        //
339        // Step 1: push_batch([350])
340        //   → batch_size=350 <= 400, normal path
341        //   → buffer: [350], buffered_rows=350
342        //
343        // Step 2: push_batch([200])
344        //   → batch_size=200 <= 400, normal path
345        //   → buffer: [350, 200], buffered_rows=550
346        //
347        // Step 3: push_batch([800])
348        //   → batch_size=800 > 400, large batch path
349        //   → buffered_rows=550 > 400 → Case 2: flush first
350        //   → flush: output [550] (combined [350, 200])
351        //   → then bypass: output [800]
352        //   Result: [550], [800] - buffer flushed to prevent oversized merge
353        //
354        // Example 2: Multiple small batches accumulate before large batch
355        // target_batch_size=1000, biggest_coalesce_batch_size=Some(300)
356        // Input: [150, 100, 80, 900]
357        //
358        // Step 1-3: Accumulate small batches
359        //   150 → buffer: [150], buffered_rows=150
360        //   100 → buffer: [150, 100], buffered_rows=250
361        //   80  → buffer: [150, 100, 80], buffered_rows=330
362        //
363        // Step 4: push_batch([900])
364        //   → batch_size=900 > 300, large batch path
365        //   → buffered_rows=330 > 300 → Case 2: flush first
366        //   → flush: output [330] (combined [150, 100, 80])
367        //   → then bypass: output [900]
368        //   Result: [330], [900] - prevents merge into [1230] which would be too large
369
370        // =============================================================================
371        // CASE 3: Small buffer + large batch → Normal coalescing (no bypass)
372        // =============================================================================
373        // When buffer is small enough, we still merge to maintain efficiency
374        // Example: target_batch_size=1000, biggest_coalesce_batch_size=Some(500)
375        // Input: [300, 1200]
376        //
377        // Step 1: push_batch([300])
378        //   → batch_size=300 <= 500, normal path
379        //   → buffer: [300], buffered_rows=300
380        //
381        // Step 2: push_batch([1200])
382        //   → batch_size=1200 > 500, large batch path
383        //   → buffered_rows=300 <= 500 → Case 3: normal merge
384        //   → buffer: [300, 1200] (1500 total)
385        //   → 1500 > target_batch_size → split: output [1000], buffer [500]
386        //   Result: [1000], [500] - normal split/merge behavior maintained
387
388        // =============================================================================
389        // Comparison: Default vs Optimized Behavior
390        // =============================================================================
391        // target_batch_size=1000, biggest_coalesce_batch_size=Some(500)
392        // Input: [600, 1200, 300]
393        //
394        // DEFAULT BEHAVIOR (biggest_coalesce_batch_size=None):
395        //   600 → buffer: [600]
396        //   1200 → buffer: [600, 1200] (1800 rows total)
397        //         → split: output [1000 rows], buffer [800 rows remaining]
398        //   300 → buffer: [800, 300] (1100 rows total)
399        //        → split: output [1000 rows], buffer [100 rows remaining]
400        //   Result: [1000], [1000], [100] - all outputs respect target_batch_size
401        //
402        // OPTIMIZED BEHAVIOR (biggest_coalesce_batch_size=Some(500)):
403        //   600 → Case 1: direct bypass → output: [600]
404        //   1200 → Case 1: direct bypass → output: [1200]
405        //   300 → normal path → buffer: [300]
406        //   Result: [600], [1200], [300] - large batches preserved
407
408        // =============================================================================
409        // Benefits and Trade-offs
410        // =============================================================================
411        // Benefits of the optimization:
412        // - Large batches stay intact (better for downstream vectorized processing)
413        // - Fewer split/merge operations (better CPU performance)
414        // - More predictable memory usage patterns
415        // - Maintains streaming efficiency while preserving batch boundaries
416        //
417        // Trade-offs:
418        // - Output batch sizes become variable (not always target_batch_size)
419        // - May produce smaller partial batches when flushing before large batches
420        // - Requires tuning biggest_coalesce_batch_size parameter for optimal performance
421
422        // TODO, for unsorted batches, we may can filter all large batches, and coalesce all
423        // small batches together?
424
425        let batch_size = batch.num_rows();
426
427        // Fast path: skip empty batches
428        if batch_size == 0 {
429            return Ok(());
430        }
431
432        // Large batch optimization: bypass coalescing for oversized batches
433        if let Some(limit) = self.biggest_coalesce_batch_size {
434            if batch_size > limit {
435                // Case 1: No buffered data - emit large batch directly
436                // Example: [] + [1200] → output [1200], buffer []
437                if self.buffered_rows == 0 {
438                    self.completed.push_back(batch);
439                    return Ok(());
440                }
441
442                // Case 2: Buffer too large - flush then emit to avoid oversized merge
443                // Example: [850] + [1200] → output [850], then output [1200]
444                // This prevents creating batches much larger than both target_batch_size
445                // and biggest_coalesce_batch_size, which could cause memory issues
446                if self.buffered_rows > limit {
447                    self.finish_buffered_batch()?;
448                    self.completed.push_back(batch);
449                    return Ok(());
450                }
451
452                // Case 3: Small buffer - proceed with normal coalescing
453                // Example: [300] + [1200] → split and merge normally
454                // This ensures small batches still get properly coalesced
455                // while allowing some controlled growth beyond the limit
456            }
457        }
458
459        let (_schema, arrays, mut num_rows) = batch.into_parts();
460
461        // Validate column count matches the expected schema
462        if arrays.len() != self.in_progress_arrays.len() {
463            return Err(ArrowError::InvalidArgumentError(format!(
464                "Batch has {} columns but BatchCoalescer expects {}",
465                arrays.len(),
466                self.in_progress_arrays.len()
467            )));
468        }
469        self.in_progress_arrays
470            .iter_mut()
471            .zip(arrays)
472            .for_each(|(in_progress, array)| {
473                in_progress.set_source(Some(array));
474            });
475
476        // If pushing this batch would exceed the target batch size,
477        // finish the current batch and start a new one
478        let mut offset = 0;
479        while num_rows > (self.target_batch_size - self.buffered_rows) {
480            let remaining_rows = self.target_batch_size - self.buffered_rows;
481            debug_assert!(remaining_rows > 0);
482
483            // Copy remaining_rows from each array
484            for in_progress in self.in_progress_arrays.iter_mut() {
485                in_progress.copy_rows(offset, remaining_rows)?;
486            }
487
488            self.buffered_rows += remaining_rows;
489            offset += remaining_rows;
490            num_rows -= remaining_rows;
491
492            self.finish_buffered_batch()?;
493        }
494
495        // Add any the remaining rows to the buffer
496        self.buffered_rows += num_rows;
497        if num_rows > 0 {
498            for in_progress in self.in_progress_arrays.iter_mut() {
499                in_progress.copy_rows(offset, num_rows)?;
500            }
501        }
502
503        // If we have reached the target batch size, finalize the buffered batch
504        if self.buffered_rows >= self.target_batch_size {
505            self.finish_buffered_batch()?;
506        }
507
508        // clear in progress sources (to allow the memory to be freed)
509        for in_progress in self.in_progress_arrays.iter_mut() {
510            in_progress.set_source(None);
511        }
512
513        Ok(())
514    }
515
516    /// Returns the number of buffered rows
517    pub fn get_buffered_rows(&self) -> usize {
518        self.buffered_rows
519    }
520
521    /// Concatenates any buffered batches into a single `RecordBatch` and
522    /// clears any output buffers
523    ///
524    /// Normally this is called when the input stream is exhausted, and
525    /// we want to finalize the last batch of rows.
526    ///
527    /// See [`Self::next_completed_batch()`] for the completed batches.
528    pub fn finish_buffered_batch(&mut self) -> Result<(), ArrowError> {
529        if self.buffered_rows == 0 {
530            return Ok(());
531        }
532        let new_arrays = self
533            .in_progress_arrays
534            .iter_mut()
535            .map(|array| array.finish())
536            .collect::<Result<Vec<_>, ArrowError>>()?;
537
538        for (array, field) in new_arrays.iter().zip(self.schema.fields().iter()) {
539            debug_assert_eq!(array.data_type(), field.data_type());
540            debug_assert_eq!(array.len(), self.buffered_rows);
541        }
542
543        // SAFETY: each array was created of the correct type and length.
544        let batch = unsafe {
545            RecordBatch::new_unchecked(Arc::clone(&self.schema), new_arrays, self.buffered_rows)
546        };
547
548        self.buffered_rows = 0;
549        self.completed.push_back(batch);
550        Ok(())
551    }
552
553    /// Returns true if there is any buffered data
554    pub fn is_empty(&self) -> bool {
555        self.buffered_rows == 0 && self.completed.is_empty()
556    }
557
558    /// Returns true if there are any completed batches
559    pub fn has_completed_batch(&self) -> bool {
560        !self.completed.is_empty()
561    }
562
563    /// Removes and returns the next completed batch, if any.
564    pub fn next_completed_batch(&mut self) -> Option<RecordBatch> {
565        self.completed.pop_front()
566    }
567}
568
569/// Return a new `InProgressArray` for the given data type
570fn create_in_progress_array(data_type: &DataType, batch_size: usize) -> Box<dyn InProgressArray> {
571    macro_rules! instantiate_primitive {
572        ($t:ty) => {
573            Box::new(InProgressPrimitiveArray::<$t>::new(
574                batch_size,
575                data_type.clone(),
576            ))
577        };
578    }
579
580    downcast_primitive! {
581        // Instantiate InProgressPrimitiveArray for each primitive type
582        data_type => (instantiate_primitive),
583        DataType::Utf8View => Box::new(InProgressByteViewArray::<StringViewType>::new(batch_size)),
584        DataType::BinaryView => {
585            Box::new(InProgressByteViewArray::<BinaryViewType>::new(batch_size))
586        }
587        _ => Box::new(GenericInProgressArray::new()),
588    }
589}
590
591/// Incrementally builds up arrays
592///
593/// [`GenericInProgressArray`] is the default implementation that buffers
594/// arrays and uses other kernels concatenates them when finished.
595///
596/// Some types have specialized implementations for this array types (e.g.,
597/// [`StringViewArray`], etc.).
598///
599/// [`StringViewArray`]: arrow_array::StringViewArray
600trait InProgressArray: std::fmt::Debug + Send + Sync {
601    /// Set the source array.
602    ///
603    /// Calls to [`Self::copy_rows`] will copy rows from this array into the
604    /// current in-progress array
605    fn set_source(&mut self, source: Option<ArrayRef>);
606
607    /// Copy rows from the current source array into the in-progress array
608    ///
609    /// The source array is set by [`Self::set_source`].
610    ///
611    /// Return an error if the source array is not set
612    fn copy_rows(&mut self, offset: usize, len: usize) -> Result<(), ArrowError>;
613
614    /// Finish the currently in-progress array and return it as an `ArrayRef`
615    fn finish(&mut self) -> Result<ArrayRef, ArrowError>;
616}
617
618#[cfg(test)]
619mod tests {
620    use super::*;
621    use crate::concat::concat_batches;
622    use arrow_array::builder::StringViewBuilder;
623    use arrow_array::cast::AsArray;
624    use arrow_array::types::Int32Type;
625    use arrow_array::{
626        BinaryViewArray, Int32Array, Int64Array, RecordBatchOptions, StringArray, StringViewArray,
627        TimestampNanosecondArray, UInt32Array, UInt64Array, make_array,
628    };
629    use arrow_buffer::BooleanBufferBuilder;
630    use arrow_schema::{DataType, Field, Schema};
631    use rand::{Rng, SeedableRng};
632    use std::ops::Range;
633
634    #[test]
635    fn test_coalesce() {
636        let batch = uint32_batch(0..8);
637        Test::new("coalesce")
638            .with_batches(std::iter::repeat_n(batch, 10))
639            // expected output is exactly 21 rows (except for the final batch)
640            .with_batch_size(21)
641            .with_expected_output_sizes(vec![21, 21, 21, 17])
642            .run();
643    }
644
645    #[test]
646    fn test_coalesce_one_by_one() {
647        let batch = uint32_batch(0..1); // single row input
648        Test::new("coalesce_one_by_one")
649            .with_batches(std::iter::repeat_n(batch, 97))
650            // expected output is exactly 20 rows (except for the final batch)
651            .with_batch_size(20)
652            .with_expected_output_sizes(vec![20, 20, 20, 20, 17])
653            .run();
654    }
655
656    #[test]
657    fn test_coalesce_empty() {
658        let schema = Arc::new(Schema::new(vec![Field::new("c0", DataType::UInt32, false)]));
659
660        Test::new("coalesce_empty")
661            .with_batches(vec![])
662            .with_schema(schema)
663            .with_batch_size(21)
664            .with_expected_output_sizes(vec![])
665            .run();
666    }
667
668    #[test]
669    fn test_single_large_batch_greater_than_target() {
670        // test a single large batch
671        let batch = uint32_batch(0..4096);
672        Test::new("coalesce_single_large_batch_greater_than_target")
673            .with_batch(batch)
674            .with_batch_size(1000)
675            .with_expected_output_sizes(vec![1000, 1000, 1000, 1000, 96])
676            .run();
677    }
678
679    #[test]
680    fn test_single_large_batch_smaller_than_target() {
681        // test a single large batch
682        let batch = uint32_batch(0..4096);
683        Test::new("coalesce_single_large_batch_smaller_than_target")
684            .with_batch(batch)
685            .with_batch_size(8192)
686            .with_expected_output_sizes(vec![4096])
687            .run();
688    }
689
690    #[test]
691    fn test_single_large_batch_equal_to_target() {
692        // test a single large batch
693        let batch = uint32_batch(0..4096);
694        Test::new("coalesce_single_large_batch_equal_to_target")
695            .with_batch(batch)
696            .with_batch_size(4096)
697            .with_expected_output_sizes(vec![4096])
698            .run();
699    }
700
701    #[test]
702    fn test_single_large_batch_equally_divisible_in_target() {
703        // test a single large batch
704        let batch = uint32_batch(0..4096);
705        Test::new("coalesce_single_large_batch_equally_divisible_in_target")
706            .with_batch(batch)
707            .with_batch_size(1024)
708            .with_expected_output_sizes(vec![1024, 1024, 1024, 1024])
709            .run();
710    }
711
712    #[test]
713    fn test_empty_schema() {
714        let schema = Schema::empty();
715        let batch = RecordBatch::new_empty(schema.into());
716        Test::new("coalesce_empty_schema")
717            .with_batch(batch)
718            .with_expected_output_sizes(vec![])
719            .run();
720    }
721
    /// Coalesce multiple batches, 80k rows, with a 0.1% selectivity filter
    #[test]
    fn test_coalesce_filtered_001() {
        // Pseudo-random filters over 8000-row batches, each keeping ~0.1% of rows
        let mut filter_builder = RandomFilterBuilder {
            num_rows: 8000,
            selectivity: 0.001,
            seed: 0,
        };

        // add 10 batches of 8000 rows each
        // 80k rows, selecting 0.1% means 80 rows
        // not exactly 80 as the rows are random;
        let mut test = Test::new("coalesce_filtered_001");
        for _ in 0..10 {
            test = test
                .with_batch(multi_column_batch(0..8000))
                .with_filter(filter_builder.next_filter())
        }
        // small target size so the few surviving rows still span several outputs
        test.with_batch_size(15)
            .with_expected_output_sizes(vec![15, 15, 15, 13])
            .run();
    }
744
    /// Coalesce multiple batches, 80k rows, with a 1% selectivity filter
    #[test]
    fn test_coalesce_filtered_01() {
        // Pseudo-random filters over 8000-row batches, each keeping ~1% of rows
        let mut filter_builder = RandomFilterBuilder {
            num_rows: 8000,
            selectivity: 0.01,
            seed: 0,
        };

        // add 10 batches of 8000 rows each
        // 80k rows, selecting 1% means 800 rows
        // not exactly 800 as the rows are random;
        let mut test = Test::new("coalesce_filtered_01");
        for _ in 0..10 {
            test = test
                .with_batch(multi_column_batch(0..8000))
                .with_filter(filter_builder.next_filter())
        }
        test.with_batch_size(128)
            .with_expected_output_sizes(vec![128, 128, 128, 128, 128, 128, 15])
            .run();
    }
767
    /// Coalesce multiple batches, 80k rows, with a 10% selectivity filter
    #[test]
    fn test_coalesce_filtered_10() {
        // Pseudo-random filters over 8000-row batches, each keeping ~10% of rows
        let mut filter_builder = RandomFilterBuilder {
            num_rows: 8000,
            selectivity: 0.1,
            seed: 0,
        };

        // add 10 batches of 8000 rows each
        // 80k rows, selecting 10% means 8000 rows
        // not exactly 8000 as the rows are random;
        let mut test = Test::new("coalesce_filtered_10");
        for _ in 0..10 {
            test = test
                .with_batch(multi_column_batch(0..8000))
                .with_filter(filter_builder.next_filter())
        }
        test.with_batch_size(1024)
            .with_expected_output_sizes(vec![1024, 1024, 1024, 1024, 1024, 1024, 1024, 840])
            .run();
    }
790
    /// Coalesce multiple batches, 8k rows, with a 90% selectivity filter
    #[test]
    fn test_coalesce_filtered_90() {
        // Pseudo-random filters over 800-row batches, each keeping ~90% of rows
        let mut filter_builder = RandomFilterBuilder {
            num_rows: 800,
            selectivity: 0.90,
            seed: 0,
        };

        // add 10 batches of 800 rows each
        // 8k rows, selecting 90% means 7200 rows
        // not exactly 7200 as the rows are random;
        let mut test = Test::new("coalesce_filtered_90");
        for _ in 0..10 {
            test = test
                .with_batch(multi_column_batch(0..800))
                .with_filter(filter_builder.next_filter())
        }
        test.with_batch_size(1024)
            .with_expected_output_sizes(vec![1024, 1024, 1024, 1024, 1024, 1024, 1024, 13])
            .run();
    }
813
    /// Coalesce multiple batches, 8k rows, with mixed filters, including ones
    /// whose slices are fully selected
    #[test]
    fn test_coalesce_filtered_mixed() {
        let mut filter_builder = RandomFilterBuilder {
            num_rows: 800,
            selectivity: 0.90,
            seed: 0,
        };

        let mut test = Test::new("coalesce_filtered_mixed");
        for _ in 0..3 {
            // also add in a batch whose filter keeps a contiguous run of 500
            // rows (and rejects the other 500), so that when the batch is
            // processed in slices, some slices are entirely selected
            let mut all_filter_builder = BooleanBufferBuilder::new(1000);
            all_filter_builder.append_n(500, true);
            all_filter_builder.append_n(1, false);
            all_filter_builder.append_n(499, false);
            let all_filter = all_filter_builder.build();

            test = test
                .with_batch(multi_column_batch(0..1000))
                .with_filter(BooleanArray::from(all_filter))
                .with_batch(multi_column_batch(0..800))
                .with_filter(filter_builder.next_filter());
            // decrease selectivity on each iteration
            filter_builder.selectivity *= 0.6;
        }

        // use a small batch size to ensure the filter is appended in slices
        // and some of those slices will select the entire thing.
        test.with_batch_size(250)
            .with_expected_output_sizes(vec![
                250, 250, 250, 250, 250, 250, 250, 250, 250, 250, 250, 179,
            ])
            .run();
    }
850
    #[test]
    fn test_coalesce_non_null() {
        Test::new("coalesce_non_null")
            // 4040 rows of uint32 in total (3000 + 1040), with no null values
            .with_batch(uint32_batch_non_null(0..3000))
            .with_batch(uint32_batch_non_null(0..1040))
            .with_batch_size(1024)
            .with_expected_output_sizes(vec![1024, 1024, 1024, 968])
            .run();
    }
861    #[test]
862    fn test_utf8_split() {
863        Test::new("coalesce_utf8")
864            // 4040 rows of utf8 strings in total, split into batches of 1024
865            .with_batch(utf8_batch(0..3000))
866            .with_batch(utf8_batch(0..1040))
867            .with_batch_size(1024)
868            .with_expected_output_sizes(vec![1024, 1024, 1024, 968])
869            .run();
870    }
871
872    #[test]
873    fn test_string_view_no_views() {
874        let output_batches = Test::new("coalesce_string_view_no_views")
875            // both input batches have no views, so no need to compact
876            .with_batch(stringview_batch([Some("foo"), Some("bar")]))
877            .with_batch(stringview_batch([Some("baz"), Some("qux")]))
878            .with_expected_output_sizes(vec![4])
879            .run();
880
881        expect_buffer_layout(
882            col_as_string_view("c0", output_batches.first().unwrap()),
883            vec![],
884        );
885    }
886
    #[test]
    fn test_string_view_batch_small_no_compact() {
        // view with only short strings (no buffers) --> no need to compact
        let batch = stringview_batch_repeated(1000, [Some("a"), Some("b"), Some("c")]);
        let output_batches = Test::new("coalesce_string_view_batch_small_no_compact")
            .with_batch(batch.clone())
            .with_expected_output_sizes(vec![1000])
            .run();

        let array = col_as_string_view("c0", &batch);
        let gc_array = col_as_string_view("c0", output_batches.first().unwrap());
        // all values are inline, so neither input nor output has data buffers
        assert_eq!(array.data_buffers().len(), 0);
        assert_eq!(array.data_buffers().len(), gc_array.data_buffers().len()); // no compaction

        expect_buffer_layout(gc_array, vec![]);
    }
903
    #[test]
    fn test_string_view_batch_large_no_compact() {
        // view with large strings (has buffers) but full --> no need to compact
        // (1000 rows x 35 byte strings = 35000 bytes spread over five buffers)
        let batch = stringview_batch_repeated(1000, [Some("This string is longer than 12 bytes")]);
        let output_batches = Test::new("coalesce_string_view_batch_large_no_compact")
            .with_batch(batch.clone())
            .with_batch_size(1000)
            .with_expected_output_sizes(vec![1000])
            .run();

        let array = col_as_string_view("c0", &batch);
        let gc_array = col_as_string_view("c0", output_batches.first().unwrap());
        assert_eq!(array.data_buffers().len(), 5);
        assert_eq!(array.data_buffers().len(), gc_array.data_buffers().len()); // no compaction

        // output buffers are passed through unchanged
        expect_buffer_layout(
            gc_array,
            vec![
                ExpectedLayout {
                    len: 8190,
                    capacity: 8192,
                },
                ExpectedLayout {
                    len: 8190,
                    capacity: 8192,
                },
                ExpectedLayout {
                    len: 8190,
                    capacity: 8192,
                },
                ExpectedLayout {
                    len: 8190,
                    capacity: 8192,
                },
                ExpectedLayout {
                    len: 2240,
                    capacity: 8192,
                },
            ],
        );
    }
945
    #[test]
    fn test_string_view_batch_small_with_buffers_no_compact() {
        // view with buffers but only short views
        let short_strings = std::iter::repeat(Some("SmallString"));
        let long_strings = std::iter::once(Some("This string is longer than 12 bytes"));
        // 20 short strings, then a long one
        let values = short_strings.take(20).chain(long_strings);
        let batch = stringview_batch_repeated(1000, values)
            // take only 10 short strings (no long ones)
            .slice(5, 10);
        let output_batches = Test::new("coalesce_string_view_batch_small_with_buffers_no_compact")
            .with_batch(batch.clone())
            .with_batch_size(1000)
            .with_expected_output_sizes(vec![10])
            .run();

        let array = col_as_string_view("c0", &batch);
        let gc_array = col_as_string_view("c0", output_batches.first().unwrap());
        assert_eq!(array.data_buffers().len(), 1); // input has one buffer
        assert_eq!(gc_array.data_buffers().len(), 0); // output has no buffers as only short strings
    }
967
    #[test]
    fn test_string_view_batch_large_slice_compact() {
        // view with large strings (has buffers) but only partially used
        // --> compaction is needed (only the referenced bytes are copied out)
        let batch = stringview_batch_repeated(1000, [Some("This string is longer than 12 bytes")])
            // slice only 22 rows, so most of the buffer is not used
            .slice(11, 22);

        let output_batches = Test::new("coalesce_string_view_batch_large_slice_compact")
            .with_batch(batch.clone())
            .with_batch_size(1000)
            .with_expected_output_sizes(vec![22])
            .run();

        let array = col_as_string_view("c0", &batch);
        let gc_array = col_as_string_view("c0", output_batches.first().unwrap());
        assert_eq!(array.data_buffers().len(), 5);

        // output holds a single small buffer: 22 rows x 35 bytes = 770 bytes
        expect_buffer_layout(
            gc_array,
            vec![ExpectedLayout {
                len: 770,
                capacity: 8192,
            }],
        );
    }
993
    #[test]
    fn test_string_view_mixed() {
        // Mixture of inline-only, out-of-line, sliced and nullable inputs;
        // sliced batches whose buffers are mostly unused must be compacted.
        let large_view_batch =
            stringview_batch_repeated(1000, [Some("This string is longer than 12 bytes")]);
        let small_view_batch = stringview_batch_repeated(1000, [Some("SmallString")]);
        let mixed_batch = stringview_batch_repeated(
            1000,
            [Some("This string is longer than 12 bytes"), Some("Small")],
        );
        let mixed_batch_nulls = stringview_batch_repeated(
            1000,
            [
                Some("This string is longer than 12 bytes"),
                Some("Small"),
                None,
            ],
        );

        // Several batches with mixed inline / non inline
        // 4k rows in
        let output_batches = Test::new("coalesce_string_view_mixed")
            .with_batch(large_view_batch.clone())
            .with_batch(small_view_batch)
            // this batch needs to be compacted (less than 1/2 full)
            .with_batch(large_view_batch.slice(10, 20))
            .with_batch(mixed_batch_nulls)
            // this batch needs to be compacted (less than 1/2 full)
            .with_batch(large_view_batch.slice(10, 20))
            .with_batch(mixed_batch)
            .with_expected_output_sizes(vec![1024, 1024, 1024, 968])
            .run();

        expect_buffer_layout(
            col_as_string_view("c0", output_batches.first().unwrap()),
            vec![
                ExpectedLayout {
                    len: 8190,
                    capacity: 8192,
                },
                ExpectedLayout {
                    len: 8190,
                    capacity: 8192,
                },
                ExpectedLayout {
                    len: 8190,
                    capacity: 8192,
                },
                ExpectedLayout {
                    len: 8190,
                    capacity: 8192,
                },
                ExpectedLayout {
                    len: 2240,
                    capacity: 8192,
                },
            ],
        );
    }
1052
    #[test]
    fn test_string_view_many_small_compact() {
        // 200 rows alternating long (28) and short (≤12) strings.
        // Only the 100 long strings go into data buffers: 100 × 28 = 2800.
        let batch = stringview_batch_repeated(
            200,
            [Some("This string is 28 bytes long"), Some("small string")],
        );
        let output_batches = Test::new("coalesce_string_view_many_small_compact")
            // First allocated buffer is 8kb.
            // Appending 10 batches of 2800 bytes each uses 2800 * 10 = 28000
            // bytes, filling the 8kb and 16kb buffers and spilling into a 32kb one
            .with_batch(batch.clone())
            .with_batch(batch.clone())
            .with_batch(batch.clone())
            .with_batch(batch.clone())
            .with_batch(batch.clone())
            .with_batch(batch.clone())
            .with_batch(batch.clone())
            .with_batch(batch.clone())
            .with_batch(batch.clone())
            .with_batch(batch.clone())
            .with_batch_size(8000)
            .with_expected_output_sizes(vec![2000]) // 10 x 200 = 2000 rows total
            .run();

        // expect a nice even distribution of buffers
        expect_buffer_layout(
            col_as_string_view("c0", output_batches.first().unwrap()),
            vec![
                ExpectedLayout {
                    len: 8176,
                    capacity: 8192,
                },
                ExpectedLayout {
                    len: 16380,
                    capacity: 16384,
                },
                ExpectedLayout {
                    len: 3444,
                    capacity: 32768,
                },
            ],
        );
    }
1097
    #[test]
    fn test_string_view_many_small_boundary() {
        // The strings are designed to exactly fit into buffers that are powers of 2 long
        // (each string is 32 bytes, so whole strings pack buffer capacities exactly)
        let batch = stringview_batch_repeated(100, [Some("This string is a power of two=32")]);
        let output_batches = Test::new("coalesce_string_view_many_small_boundary")
            .with_batches(std::iter::repeat_n(batch, 20))
            .with_batch_size(900)
            .with_expected_output_sizes(vec![900, 900, 200])
            .run();

        // expect each buffer to be entirely full except the last one
        expect_buffer_layout(
            col_as_string_view("c0", output_batches.first().unwrap()),
            vec![
                ExpectedLayout {
                    len: 8192,
                    capacity: 8192,
                },
                ExpectedLayout {
                    len: 16384,
                    capacity: 16384,
                },
                ExpectedLayout {
                    len: 4224,
                    capacity: 32768,
                },
            ],
        );
    }
1127
    #[test]
    fn test_string_view_large_small() {
        // The long strings are 28 bytes, so each mixed batch carries
        // 100 * 28 = 2800 bytes of data-buffer contents
        let mixed_batch = stringview_batch_repeated(
            200,
            [Some("This string is 28 bytes long"), Some("small string")],
        );
        // These strings aren't copied, this array has an 8k buffer
        let all_large = stringview_batch_repeated(
            50,
            [Some(
                "This buffer has only large strings in it so there are no buffer copies",
            )],
        );

        let output_batches = Test::new("coalesce_string_view_large_small")
            // First allocated buffer is 8kb.
            // Appending six mixed batches of 2800 bytes each plus the
            // all-large buffers fills the 8kb and 16kb buffers and spills
            // into a 32kb one
            .with_batch(mixed_batch.clone())
            .with_batch(mixed_batch.clone())
            .with_batch(all_large.clone())
            .with_batch(mixed_batch.clone())
            .with_batch(all_large.clone())
            .with_batch(mixed_batch.clone())
            .with_batch(mixed_batch.clone())
            .with_batch(all_large.clone())
            .with_batch(mixed_batch.clone())
            .with_batch(all_large.clone())
            .with_batch_size(8000)
            .with_expected_output_sizes(vec![1400])
            .run();

        expect_buffer_layout(
            col_as_string_view("c0", output_batches.first().unwrap()),
            vec![
                ExpectedLayout {
                    len: 8190,
                    capacity: 8192,
                },
                ExpectedLayout {
                    len: 16366,
                    capacity: 16384,
                },
                ExpectedLayout {
                    len: 6244,
                    capacity: 32768,
                },
            ],
        );
    }
1178
1179    #[test]
1180    fn test_binary_view() {
1181        let values: Vec<Option<&[u8]>> = vec![
1182            Some(b"foo"),
1183            None,
1184            Some(b"A longer string that is more than 12 bytes"),
1185        ];
1186
1187        let binary_view =
1188            BinaryViewArray::from_iter(std::iter::repeat(values.iter()).flatten().take(1000));
1189        let batch =
1190            RecordBatch::try_from_iter(vec![("c0", Arc::new(binary_view) as ArrayRef)]).unwrap();
1191
1192        Test::new("coalesce_binary_view")
1193            .with_batch(batch.clone())
1194            .with_batch(batch.clone())
1195            .with_batch_size(512)
1196            .with_expected_output_sizes(vec![512, 512, 512, 464])
1197            .run();
1198    }
1199
    /// Expected length and capacity of a single StringView/BinaryView data buffer
    #[derive(Debug, Clone, PartialEq)]
    struct ExpectedLayout {
        len: usize,
        capacity: usize,
    }
1205
1206    /// Asserts that the buffer layout of the specified StringViewArray matches the expected layout
1207    fn expect_buffer_layout(array: &StringViewArray, expected: Vec<ExpectedLayout>) {
1208        let actual = array
1209            .data_buffers()
1210            .iter()
1211            .map(|b| ExpectedLayout {
1212                len: b.len(),
1213                capacity: b.capacity(),
1214            })
1215            .collect::<Vec<_>>();
1216
1217        assert_eq!(
1218            actual, expected,
1219            "Expected buffer layout {expected:#?} but got {actual:#?}"
1220        );
1221    }
1222
    /// Test for [`BatchCoalescer`]
    ///
    /// Pushes the input batches to the coalescer and verifies that the resulting
    /// batches have the
    /// 1. expected number of rows
    /// 2. The same results when the batches are filtered using the filter kernel
    #[derive(Debug, Clone)]
    struct Test {
        /// A human readable name to assist in debugging
        name: String,
        /// Batches to feed to the coalescer.
        input_batches: Vec<RecordBatch>,
        /// Filters to apply to the corresponding input batches.
        ///
        /// If there is no filter for an input batch, that batch will be
        /// pushed as is.
        filters: Vec<BooleanArray>,
        /// The schema. If not provided, the first batch's schema is used.
        schema: Option<SchemaRef>,
        /// Expected output sizes of the resulting batches
        expected_output_sizes: Vec<usize>,
        /// target batch size (defaults to 1024)
        target_batch_size: usize,
    }
1247
1248    impl Default for Test {
1249        fn default() -> Self {
1250            Self {
1251                name: "".to_string(),
1252                input_batches: vec![],
1253                filters: vec![],
1254                schema: None,
1255                expected_output_sizes: vec![],
1256                target_batch_size: 1024,
1257            }
1258        }
1259    }
1260
    impl Test {
        /// Creates a new test with the given name and default settings
        fn new(name: impl Into<String>) -> Self {
            Self {
                name: name.into(),
                ..Self::default()
            }
        }

        /// Append the description to the test name
        fn with_description(mut self, description: &str) -> Self {
            self.name.push_str(": ");
            self.name.push_str(description);
            self
        }

        /// Set the target batch size
        fn with_batch_size(mut self, target_batch_size: usize) -> Self {
            self.target_batch_size = target_batch_size;
            self
        }

        /// Extend the input batches with `batch`
        fn with_batch(mut self, batch: RecordBatch) -> Self {
            self.input_batches.push(batch);
            self
        }

        /// Extend the filters with `filter`
        fn with_filter(mut self, filter: BooleanArray) -> Self {
            self.filters.push(filter);
            self
        }

        /// Replaces the input batches with `batches`
        fn with_batches(mut self, batches: impl IntoIterator<Item = RecordBatch>) -> Self {
            self.input_batches = batches.into_iter().collect();
            self
        }

        /// Specifies the schema for the test
        fn with_schema(mut self, schema: SchemaRef) -> Self {
            self.schema = Some(schema);
            self
        }

        /// Extends `sizes` to expected output sizes
        fn with_expected_output_sizes(mut self, sizes: impl IntoIterator<Item = usize>) -> Self {
            self.expected_output_sizes.extend(sizes);
            self
        }

        /// Runs the test -- see documentation on [`Test`] for details
        ///
        /// Returns the resulting output batches
        fn run(self) -> Vec<RecordBatch> {
            // Test several permutations of input batches:
            // 1. Removing nulls from some batches (test non-null fast paths)
            // 2. Empty batches
            // 3. One column (from the batch)
            let mut extra_tests = vec![];
            extra_tests.push(self.clone().make_half_non_nullable());
            extra_tests.push(self.clone().insert_empty_batches());
            let single_column_tests = self.make_single_column_tests();
            for test in single_column_tests {
                extra_tests.push(test.clone().make_half_non_nullable());
                extra_tests.push(test);
            }

            // Run original test case first, so any obvious errors are caught
            // by an easier to understand test case
            let results = self.run_inner();
            // Run the extra cases to expand coverage
            for extra in extra_tests {
                extra.run_inner();
            }

            results
        }

        /// Runs the current test instance
        fn run_inner(self) -> Vec<RecordBatch> {
            // compute expectations before `self` is destructured below
            let expected_output = self.expected_output();
            let schema = self.schema();

            let Self {
                name,
                input_batches,
                filters,
                schema: _,
                target_batch_size,
                expected_output_sizes,
            } = self;

            println!("Running test '{name}'");

            let had_input = input_batches.iter().any(|b| b.num_rows() > 0);

            let mut coalescer = BatchCoalescer::new(Arc::clone(&schema), target_batch_size);

            // feed input batches and filters to the coalescer
            let mut filters = filters.into_iter();
            for batch in input_batches {
                if let Some(filter) = filters.next() {
                    coalescer.push_batch_with_filter(batch, &filter).unwrap();
                } else {
                    coalescer.push_batch(batch).unwrap();
                }
            }
            assert_eq!(schema, coalescer.schema());

            // any non-empty input must leave buffered or completed rows
            if had_input {
                assert!(!coalescer.is_empty(), "Coalescer should not be empty");
            } else {
                assert!(coalescer.is_empty(), "Coalescer should be empty");
            }

            coalescer.finish_buffered_batch().unwrap();
            if had_input {
                assert!(
                    coalescer.has_completed_batch(),
                    "Coalescer should have completed batches"
                );
            }

            // drain all completed batches
            let mut output_batches = vec![];
            while let Some(batch) = coalescer.next_completed_batch() {
                output_batches.push(batch);
            }

            // make sure we got the expected number of output batches and content
            let mut starting_idx = 0;
            let actual_output_sizes: Vec<usize> =
                output_batches.iter().map(|b| b.num_rows()).collect();
            assert_eq!(
                expected_output_sizes, actual_output_sizes,
                "Unexpected number of rows in output batches\n\
                Expected\n{expected_output_sizes:#?}\nActual:{actual_output_sizes:#?}"
            );
            let iter = expected_output_sizes
                .iter()
                .zip(output_batches.iter())
                .enumerate();

            // Verify that the actual contents of each output batch matches the expected output
            for (i, (expected_size, batch)) in iter {
                // compare the contents of the batch after normalization (using
                // `==` compares the underlying memory layout too)
                let expected_batch = expected_output.slice(starting_idx, *expected_size);
                let expected_batch = normalize_batch(expected_batch);
                let batch = normalize_batch(batch.clone());
                assert_eq!(
                    expected_batch, batch,
                    "Unexpected content in batch {i}:\
                    \n\nExpected:\n{expected_batch:#?}\n\nActual:\n{batch:#?}"
                );
                starting_idx += *expected_size;
            }
            output_batches
        }

        /// Return the expected output schema. If not overridden by `with_schema`, it
        /// returns the schema of the first input batch.
        fn schema(&self) -> SchemaRef {
            self.schema
                .clone()
                .unwrap_or_else(|| Arc::clone(&self.input_batches[0].schema()))
        }

        /// Returns the expected output as a single `RecordBatch`
        /// (input batches filtered where a filter exists, then concatenated)
        fn expected_output(&self) -> RecordBatch {
            let schema = self.schema();
            if self.filters.is_empty() {
                return concat_batches(&schema, &self.input_batches).unwrap();
            }

            let mut filters = self.filters.iter();
            let filtered_batches = self
                .input_batches
                .iter()
                .map(|batch| {
                    if let Some(filter) = filters.next() {
                        filter_record_batch(batch, filter).unwrap()
                    } else {
                        batch.clone()
                    }
                })
                .collect::<Vec<_>>();
            concat_batches(&schema, &filtered_batches).unwrap()
        }

        /// Return a copy of self where every other batch has had its nulls removed
        /// (there are often fast paths that are used when there are no nulls)
        fn make_half_non_nullable(mut self) -> Self {
            // remove the nulls from every other batch
            self.input_batches = self
                .input_batches
                .iter()
                .enumerate()
                .map(|(i, batch)| {
                    if i % 2 == 1 {
                        batch.clone()
                    } else {
                        Self::remove_nulls_from_batch(batch)
                    }
                })
                .collect();
            self.with_description("non-nullable")
        }

        /// Insert several empty batches into the input before each existing input
        fn insert_empty_batches(mut self) -> Self {
            let empty_batch = RecordBatch::new_empty(self.schema());
            self.input_batches = self
                .input_batches
                .into_iter()
                .flat_map(|batch| [empty_batch.clone(), batch])
                .collect();
            // keep filters aligned with batches: each empty batch gets an empty filter
            let empty_filters = BooleanArray::builder(0).finish();
            self.filters = self
                .filters
                .into_iter()
                .flat_map(|filter| [empty_filters.clone(), filter])
                .collect();
            self.with_description("empty batches inserted")
        }

        /// Returns a copy of `batch` with nulls removed from all columns
        fn remove_nulls_from_batch(batch: &RecordBatch) -> RecordBatch {
            let new_columns = batch
                .columns()
                .iter()
                .map(Self::remove_nulls_from_array)
                .collect::<Vec<_>>();
            let options = RecordBatchOptions::new().with_row_count(Some(batch.num_rows()));
            RecordBatch::try_new_with_options(batch.schema(), new_columns, &options).unwrap()
        }

        /// Returns a copy of `array` with its null buffer dropped
        fn remove_nulls_from_array(array: &ArrayRef) -> ArrayRef {
            make_array(array.to_data().into_builder().nulls(None).build().unwrap())
        }

        /// Returns a set of tests where each test is the same as self, but
        /// has a single column from the original input batch
        ///
        /// This can be useful to test single column optimizations, specifically
        /// filter optimization.
        fn make_single_column_tests(&self) -> Vec<Self> {
            let original_schema = self.schema();
            let mut new_tests = vec![];
            for column in original_schema.fields() {
                let single_column_schema = Arc::new(Schema::new(vec![column.clone()]));

                let single_column_batches = self.input_batches.iter().map(|batch| {
                    let single_column = batch.column_by_name(column.name()).unwrap();
                    RecordBatch::try_new(
                        Arc::clone(&single_column_schema),
                        vec![single_column.clone()],
                    )
                    .unwrap()
                });

                let single_column_test = self
                    .clone()
                    .with_schema(Arc::clone(&single_column_schema))
                    .with_batches(single_column_batches)
                    .with_description("single column")
                    .with_description(column.name());

                new_tests.push(single_column_test);
            }
            new_tests
        }
    }
1534
1535    /// Return a RecordBatch with a UInt32Array with the specified range and
1536    /// every third value is null.
1537    fn uint32_batch<T: std::iter::Iterator<Item = u32>>(range: T) -> RecordBatch {
1538        let schema = Arc::new(Schema::new(vec![Field::new("c0", DataType::UInt32, true)]));
1539
1540        let array = UInt32Array::from_iter(range.map(|i| if i % 3 == 0 { None } else { Some(i) }));
1541        RecordBatch::try_new(Arc::clone(&schema), vec![Arc::new(array)]).unwrap()
1542    }
1543
1544    /// Return a RecordBatch with a UInt32Array with no nulls specified range
1545    fn uint32_batch_non_null<T: std::iter::Iterator<Item = u32>>(range: T) -> RecordBatch {
1546        let schema = Arc::new(Schema::new(vec![Field::new("c0", DataType::UInt32, false)]));
1547
1548        let array = UInt32Array::from_iter_values(range);
1549        RecordBatch::try_new(Arc::clone(&schema), vec![Arc::new(array)]).unwrap()
1550    }
1551
1552    /// Return a RecordBatch with a UInt64Array with no nulls specified range
1553    fn uint64_batch_non_null<T: std::iter::Iterator<Item = u64>>(range: T) -> RecordBatch {
1554        let schema = Arc::new(Schema::new(vec![Field::new("c0", DataType::UInt64, false)]));
1555
1556        let array = UInt64Array::from_iter_values(range);
1557        RecordBatch::try_new(Arc::clone(&schema), vec![Arc::new(array)]).unwrap()
1558    }
1559
1560    /// Return a RecordBatch with a StringArrary with values `value0`, `value1`, ...
1561    /// and every third value is `None`.
1562    fn utf8_batch(range: Range<u32>) -> RecordBatch {
1563        let schema = Arc::new(Schema::new(vec![Field::new("c0", DataType::Utf8, true)]));
1564
1565        let array = StringArray::from_iter(range.map(|i| {
1566            if i % 3 == 0 {
1567                None
1568            } else {
1569                Some(format!("value{i}"))
1570            }
1571        }));
1572
1573        RecordBatch::try_new(Arc::clone(&schema), vec![Arc::new(array)]).unwrap()
1574    }
1575
1576    /// Return a RecordBatch with a StringViewArray with (only) the specified values
1577    fn stringview_batch<'a>(values: impl IntoIterator<Item = Option<&'a str>>) -> RecordBatch {
1578        let schema = Arc::new(Schema::new(vec![Field::new(
1579            "c0",
1580            DataType::Utf8View,
1581            false,
1582        )]));
1583
1584        let array = StringViewArray::from_iter(values);
1585        RecordBatch::try_new(Arc::clone(&schema), vec![Arc::new(array)]).unwrap()
1586    }
1587
1588    /// Return a RecordBatch with a StringViewArray with num_rows by repeating
1589    /// values over and over.
1590    fn stringview_batch_repeated<'a>(
1591        num_rows: usize,
1592        values: impl IntoIterator<Item = Option<&'a str>>,
1593    ) -> RecordBatch {
1594        let schema = Arc::new(Schema::new(vec![Field::new(
1595            "c0",
1596            DataType::Utf8View,
1597            true,
1598        )]));
1599
1600        // Repeat the values to a total of num_rows
1601        let values: Vec<_> = values.into_iter().collect();
1602        let values_iter = std::iter::repeat(values.iter())
1603            .flatten()
1604            .cloned()
1605            .take(num_rows);
1606
1607        let mut builder = StringViewBuilder::with_capacity(100).with_fixed_block_size(8192);
1608        for val in values_iter {
1609            builder.append_option(val);
1610        }
1611
1612        let array = builder.finish();
1613        RecordBatch::try_new(Arc::clone(&schema), vec![Arc::new(array)]).unwrap()
1614    }
1615
1616    /// Return a RecordBatch of 100 rows
1617    fn multi_column_batch(range: Range<i32>) -> RecordBatch {
1618        let int64_array = Int64Array::from_iter(
1619            range
1620                .clone()
1621                .map(|v| if v % 5 == 0 { None } else { Some(v as i64) }),
1622        );
1623        let string_view_array = StringViewArray::from_iter(range.clone().map(|v| {
1624            if v % 5 == 0 {
1625                None
1626            } else if v % 7 == 0 {
1627                Some(format!("This is a string longer than 12 bytes{v}"))
1628            } else {
1629                Some(format!("Short {v}"))
1630            }
1631        }));
1632        let string_array = StringArray::from_iter(range.clone().map(|v| {
1633            if v % 11 == 0 {
1634                None
1635            } else {
1636                Some(format!("Value {v}"))
1637            }
1638        }));
1639        let timestamp_array = TimestampNanosecondArray::from_iter(range.map(|v| {
1640            if v % 3 == 0 {
1641                None
1642            } else {
1643                Some(v as i64 * 1000) // simulate a timestamp in milliseconds
1644            }
1645        }))
1646        .with_timezone("America/New_York");
1647
1648        RecordBatch::try_from_iter(vec![
1649            ("int64", Arc::new(int64_array) as ArrayRef),
1650            ("stringview", Arc::new(string_view_array) as ArrayRef),
1651            ("string", Arc::new(string_array) as ArrayRef),
1652            ("timestamp", Arc::new(timestamp_array) as ArrayRef),
1653        ])
1654        .unwrap()
1655    }
1656
    /// Builds boolean filter arrays that keep randomly selected rows
    /// with a given `selectivity`.
    ///
    /// For example a `selectivity` of 0.1 keeps (on average) 10% of the
    /// rows and filters out the other 90%.
    #[derive(Debug)]
    struct RandomFilterBuilder {
        /// Number of rows to add to each filter
        num_rows: usize,
        /// selectivity of the filter (between 0.0 and 1.0)
        /// 0 selects no rows, 1.0 selects all rows
        selectivity: f64,
        /// seed for random number generator, increases by one each time
        /// `next_filter` is called
        seed: u64,
    }
1673    impl RandomFilterBuilder {
1674        /// Build the next filter with the current seed and increment the seed
1675        /// by one.
1676        fn next_filter(&mut self) -> BooleanArray {
1677            assert!(self.selectivity >= 0.0 && self.selectivity <= 1.0);
1678            let mut rng = rand::rngs::StdRng::seed_from_u64(self.seed);
1679            self.seed += 1;
1680            BooleanArray::from_iter(
1681                (0..self.num_rows)
1682                    .map(|_| rng.random_bool(self.selectivity))
1683                    .map(Some),
1684            )
1685        }
1686    }
1687
1688    /// Returns the named column as a StringViewArray
1689    fn col_as_string_view<'b>(name: &str, batch: &'b RecordBatch) -> &'b StringViewArray {
1690        batch
1691            .column_by_name(name)
1692            .expect("column not found")
1693            .as_string_view_opt()
1694            .expect("column is not a string view")
1695    }
1696
1697    /// Normalize the `RecordBatch` so that the memory layout is consistent
1698    /// (e.g. StringArray is compacted).
1699    fn normalize_batch(batch: RecordBatch) -> RecordBatch {
1700        // Only need to normalize StringViews (as == also tests for memory layout)
1701        let (schema, mut columns, row_count) = batch.into_parts();
1702
1703        for column in columns.iter_mut() {
1704            let Some(string_view) = column.as_string_view_opt() else {
1705                continue;
1706            };
1707
1708            // Re-create the StringViewArray to ensure memory layout is
1709            // consistent
1710            let mut builder = StringViewBuilder::new();
1711            for s in string_view.iter() {
1712                builder.append_option(s);
1713            }
1714            // Update the column with the new StringViewArray
1715            *column = Arc::new(builder.finish());
1716        }
1717
1718        let options = RecordBatchOptions::new().with_row_count(Some(row_count));
1719        RecordBatch::try_new_with_options(schema, columns, &options).unwrap()
1720    }
1721
1722    /// Helper function to create a test batch with specified number of rows
1723    fn create_test_batch(num_rows: usize) -> RecordBatch {
1724        let schema = Arc::new(Schema::new(vec![Field::new("c0", DataType::Int32, false)]));
1725        let array = Int32Array::from_iter_values(0..num_rows as i32);
1726        RecordBatch::try_new(schema, vec![Arc::new(array)]).unwrap()
1727    }
1728    #[test]
1729    fn test_biggest_coalesce_batch_size_none_default() {
1730        // Test that default behavior (None) coalesces all batches
1731        let mut coalescer = BatchCoalescer::new(
1732            Arc::new(Schema::new(vec![Field::new("c0", DataType::Int32, false)])),
1733            100,
1734        );
1735
1736        // Push a large batch (1000 rows) - should be coalesced normally
1737        let large_batch = create_test_batch(1000);
1738        coalescer.push_batch(large_batch).unwrap();
1739
1740        // Should produce multiple batches of target size (100)
1741        let mut output_batches = vec![];
1742        while let Some(batch) = coalescer.next_completed_batch() {
1743            output_batches.push(batch);
1744        }
1745
1746        coalescer.finish_buffered_batch().unwrap();
1747        while let Some(batch) = coalescer.next_completed_batch() {
1748            output_batches.push(batch);
1749        }
1750
1751        // Should have 10 batches of 100 rows each
1752        assert_eq!(output_batches.len(), 10);
1753        for batch in output_batches {
1754            assert_eq!(batch.num_rows(), 100);
1755        }
1756    }
1757
1758    #[test]
1759    fn test_biggest_coalesce_batch_size_bypass_large_batch() {
1760        // Test that batches larger than biggest_coalesce_batch_size bypass coalescing
1761        let mut coalescer = BatchCoalescer::new(
1762            Arc::new(Schema::new(vec![Field::new("c0", DataType::Int32, false)])),
1763            100,
1764        );
1765        coalescer.set_biggest_coalesce_batch_size(Some(500));
1766
1767        // Push a large batch (1000 rows) - should bypass coalescing
1768        let large_batch = create_test_batch(1000);
1769        coalescer.push_batch(large_batch.clone()).unwrap();
1770
1771        // Should have one completed batch immediately (the original large batch)
1772        assert!(coalescer.has_completed_batch());
1773        let output_batch = coalescer.next_completed_batch().unwrap();
1774        assert_eq!(output_batch.num_rows(), 1000);
1775
1776        // Should be no more completed batches
1777        assert!(!coalescer.has_completed_batch());
1778        assert_eq!(coalescer.get_buffered_rows(), 0);
1779    }
1780
1781    #[test]
1782    fn test_biggest_coalesce_batch_size_coalesce_small_batch() {
1783        // Test that batches smaller than biggest_coalesce_batch_size are coalesced normally
1784        let mut coalescer = BatchCoalescer::new(
1785            Arc::new(Schema::new(vec![Field::new("c0", DataType::Int32, false)])),
1786            100,
1787        )
1788        .with_biggest_coalesce_batch_size(Some(500));
1789
1790        // Push small batches that should be coalesced
1791        let small_batch = create_test_batch(50);
1792        coalescer.push_batch(small_batch.clone()).unwrap();
1793
1794        // Should not have completed batch yet (only 50 rows, target is 100)
1795        assert!(!coalescer.has_completed_batch());
1796        assert_eq!(coalescer.get_buffered_rows(), 50);
1797
1798        // Push another small batch
1799        coalescer.push_batch(small_batch).unwrap();
1800
1801        // Now should have a completed batch (100 rows total)
1802        assert!(coalescer.has_completed_batch());
1803        let output_batch = coalescer.next_completed_batch().unwrap();
1804        let size = output_batch
1805            .column(0)
1806            .as_primitive::<Int32Type>()
1807            .get_buffer_memory_size();
1808        assert_eq!(size, 400); // 100 rows * 4 bytes each
1809        assert_eq!(output_batch.num_rows(), 100);
1810
1811        assert_eq!(coalescer.get_buffered_rows(), 0);
1812    }
1813
1814    #[test]
1815    fn test_biggest_coalesce_batch_size_equal_boundary() {
1816        // Test behavior when batch size equals biggest_coalesce_batch_size
1817        let mut coalescer = BatchCoalescer::new(
1818            Arc::new(Schema::new(vec![Field::new("c0", DataType::Int32, false)])),
1819            100,
1820        );
1821        coalescer.set_biggest_coalesce_batch_size(Some(500));
1822
1823        // Push a batch exactly equal to the limit
1824        let boundary_batch = create_test_batch(500);
1825        coalescer.push_batch(boundary_batch).unwrap();
1826
1827        // Should be coalesced (not bypass) since it's equal, not greater
1828        let mut output_count = 0;
1829        while coalescer.next_completed_batch().is_some() {
1830            output_count += 1;
1831        }
1832
1833        coalescer.finish_buffered_batch().unwrap();
1834        while coalescer.next_completed_batch().is_some() {
1835            output_count += 1;
1836        }
1837
1838        // Should have 5 batches of 100 rows each
1839        assert_eq!(output_count, 5);
1840    }
1841
1842    #[test]
1843    fn test_biggest_coalesce_batch_size_first_large_then_consecutive_bypass() {
1844        // Test the new consecutive large batch bypass behavior
1845        // Pattern: small batches -> first large batch (coalesced) -> consecutive large batches (bypass)
1846        let mut coalescer = BatchCoalescer::new(
1847            Arc::new(Schema::new(vec![Field::new("c0", DataType::Int32, false)])),
1848            100,
1849        );
1850        coalescer.set_biggest_coalesce_batch_size(Some(200));
1851
1852        let small_batch = create_test_batch(50);
1853
1854        // Push small batch first to create buffered data
1855        coalescer.push_batch(small_batch).unwrap();
1856        assert_eq!(coalescer.get_buffered_rows(), 50);
1857        assert!(!coalescer.has_completed_batch());
1858
1859        // Push first large batch - should go through normal coalescing due to buffered data
1860        let large_batch1 = create_test_batch(250);
1861        coalescer.push_batch(large_batch1).unwrap();
1862
1863        // 50 + 250 = 300 -> 3 complete batches of 100, 0 rows buffered
1864        let mut completed_batches = vec![];
1865        while let Some(batch) = coalescer.next_completed_batch() {
1866            completed_batches.push(batch);
1867        }
1868        assert_eq!(completed_batches.len(), 3);
1869        assert_eq!(coalescer.get_buffered_rows(), 0);
1870
1871        // Now push consecutive large batches - they should bypass
1872        let large_batch2 = create_test_batch(300);
1873        let large_batch3 = create_test_batch(400);
1874
1875        // Push second large batch - should bypass since it's consecutive and buffer is empty
1876        coalescer.push_batch(large_batch2).unwrap();
1877        assert!(coalescer.has_completed_batch());
1878        let output = coalescer.next_completed_batch().unwrap();
1879        assert_eq!(output.num_rows(), 300); // bypassed with original size
1880        assert_eq!(coalescer.get_buffered_rows(), 0);
1881
1882        // Push third large batch - should also bypass
1883        coalescer.push_batch(large_batch3).unwrap();
1884        assert!(coalescer.has_completed_batch());
1885        let output = coalescer.next_completed_batch().unwrap();
1886        assert_eq!(output.num_rows(), 400); // bypassed with original size
1887        assert_eq!(coalescer.get_buffered_rows(), 0);
1888    }
1889
1890    #[test]
1891    fn test_biggest_coalesce_batch_size_empty_batch() {
1892        // Test that empty batches don't trigger the bypass logic
1893        let mut coalescer = BatchCoalescer::new(
1894            Arc::new(Schema::new(vec![Field::new("c0", DataType::Int32, false)])),
1895            100,
1896        );
1897        coalescer.set_biggest_coalesce_batch_size(Some(50));
1898
1899        let empty_batch = create_test_batch(0);
1900        coalescer.push_batch(empty_batch).unwrap();
1901
1902        // Empty batch should be handled normally (no effect)
1903        assert!(!coalescer.has_completed_batch());
1904        assert_eq!(coalescer.get_buffered_rows(), 0);
1905    }
1906
1907    #[test]
1908    fn test_biggest_coalesce_batch_size_with_buffered_data_no_bypass() {
1909        // Test that when there is buffered data, large batches do NOT bypass (unless consecutive)
1910        let mut coalescer = BatchCoalescer::new(
1911            Arc::new(Schema::new(vec![Field::new("c0", DataType::Int32, false)])),
1912            100,
1913        );
1914        coalescer.set_biggest_coalesce_batch_size(Some(200));
1915
1916        // Add some buffered data first
1917        let small_batch = create_test_batch(30);
1918        coalescer.push_batch(small_batch.clone()).unwrap();
1919        coalescer.push_batch(small_batch).unwrap();
1920        assert_eq!(coalescer.get_buffered_rows(), 60);
1921
1922        // Push large batch that would normally bypass, but shouldn't because buffered_rows > 0
1923        let large_batch = create_test_batch(250);
1924        coalescer.push_batch(large_batch).unwrap();
1925
1926        // The large batch should be processed through normal coalescing logic
1927        // Total: 60 (buffered) + 250 (new) = 310 rows
1928        // Output: 3 complete batches of 100 rows each, 10 rows remain buffered
1929
1930        let mut completed_batches = vec![];
1931        while let Some(batch) = coalescer.next_completed_batch() {
1932            completed_batches.push(batch);
1933        }
1934
1935        assert_eq!(completed_batches.len(), 3);
1936        for batch in &completed_batches {
1937            assert_eq!(batch.num_rows(), 100);
1938        }
1939        assert_eq!(coalescer.get_buffered_rows(), 10);
1940    }
1941
1942    #[test]
1943    fn test_biggest_coalesce_batch_size_zero_limit() {
1944        // Test edge case where limit is 0 (all batches bypass when no buffered data)
1945        let mut coalescer = BatchCoalescer::new(
1946            Arc::new(Schema::new(vec![Field::new("c0", DataType::Int32, false)])),
1947            100,
1948        );
1949        coalescer.set_biggest_coalesce_batch_size(Some(0));
1950
1951        // Even a 1-row batch should bypass when there's no buffered data
1952        let tiny_batch = create_test_batch(1);
1953        coalescer.push_batch(tiny_batch).unwrap();
1954
1955        assert!(coalescer.has_completed_batch());
1956        let output = coalescer.next_completed_batch().unwrap();
1957        assert_eq!(output.num_rows(), 1);
1958    }
1959
1960    #[test]
1961    fn test_biggest_coalesce_batch_size_bypass_only_when_no_buffer() {
1962        // Test that bypass only occurs when buffered_rows == 0
1963        let mut coalescer = BatchCoalescer::new(
1964            Arc::new(Schema::new(vec![Field::new("c0", DataType::Int32, false)])),
1965            100,
1966        );
1967        coalescer.set_biggest_coalesce_batch_size(Some(200));
1968
1969        // First, push a large batch with no buffered data - should bypass
1970        let large_batch = create_test_batch(300);
1971        coalescer.push_batch(large_batch.clone()).unwrap();
1972
1973        assert!(coalescer.has_completed_batch());
1974        let output = coalescer.next_completed_batch().unwrap();
1975        assert_eq!(output.num_rows(), 300); // bypassed
1976        assert_eq!(coalescer.get_buffered_rows(), 0);
1977
1978        // Now add some buffered data
1979        let small_batch = create_test_batch(50);
1980        coalescer.push_batch(small_batch).unwrap();
1981        assert_eq!(coalescer.get_buffered_rows(), 50);
1982
1983        // Push the same large batch again - should NOT bypass this time (not consecutive)
1984        coalescer.push_batch(large_batch).unwrap();
1985
1986        // Should process through normal coalescing: 50 + 300 = 350 rows
1987        // Output: 3 complete batches of 100 rows, 50 rows buffered
1988        let mut completed_batches = vec![];
1989        while let Some(batch) = coalescer.next_completed_batch() {
1990            completed_batches.push(batch);
1991        }
1992
1993        assert_eq!(completed_batches.len(), 3);
1994        for batch in &completed_batches {
1995            assert_eq!(batch.num_rows(), 100);
1996        }
1997        assert_eq!(coalescer.get_buffered_rows(), 50);
1998    }
1999
2000    #[test]
2001    fn test_biggest_coalesce_batch_size_consecutive_large_batches_scenario() {
2002        // Test your exact scenario: 20, 20, 30, 700, 600, 700, 900, 700, 600
2003        let mut coalescer = BatchCoalescer::new(
2004            Arc::new(Schema::new(vec![Field::new("c0", DataType::Int32, false)])),
2005            1000,
2006        );
2007        coalescer.set_biggest_coalesce_batch_size(Some(500));
2008
2009        // Push small batches first
2010        coalescer.push_batch(create_test_batch(20)).unwrap();
2011        coalescer.push_batch(create_test_batch(20)).unwrap();
2012        coalescer.push_batch(create_test_batch(30)).unwrap();
2013
2014        assert_eq!(coalescer.get_buffered_rows(), 70);
2015        assert!(!coalescer.has_completed_batch());
2016
2017        // Push first large batch (700) - should coalesce due to buffered data
2018        coalescer.push_batch(create_test_batch(700)).unwrap();
2019
2020        // 70 + 700 = 770 rows, not enough for 1000, so all stay buffered
2021        assert_eq!(coalescer.get_buffered_rows(), 770);
2022        assert!(!coalescer.has_completed_batch());
2023
2024        // Push second large batch (600) - should bypass since previous was large
2025        coalescer.push_batch(create_test_batch(600)).unwrap();
2026
2027        // Should flush buffer (770 rows) and bypass the 600
2028        let mut outputs = vec![];
2029        while let Some(batch) = coalescer.next_completed_batch() {
2030            outputs.push(batch);
2031        }
2032        assert_eq!(outputs.len(), 2); // one flushed buffer batch (770) + one bypassed (600)
2033        assert_eq!(outputs[0].num_rows(), 770);
2034        assert_eq!(outputs[1].num_rows(), 600);
2035        assert_eq!(coalescer.get_buffered_rows(), 0);
2036
2037        // Push remaining large batches - should all bypass
2038        let remaining_batches = [700, 900, 700, 600];
2039        for &size in &remaining_batches {
2040            coalescer.push_batch(create_test_batch(size)).unwrap();
2041
2042            assert!(coalescer.has_completed_batch());
2043            let output = coalescer.next_completed_batch().unwrap();
2044            assert_eq!(output.num_rows(), size);
2045            assert_eq!(coalescer.get_buffered_rows(), 0);
2046        }
2047    }
2048
    #[test]
    fn test_biggest_coalesce_batch_size_truly_consecutive_large_bypass() {
        // Test truly consecutive large batches that should all bypass
        // This test ensures buffer is completely empty between large batches
        // (target size 100, bypass limit 200 -- every batch below exceeds it)
        let mut coalescer = BatchCoalescer::new(
            Arc::new(Schema::new(vec![Field::new("c0", DataType::Int32, false)])),
            100,
        );
        coalescer.set_biggest_coalesce_batch_size(Some(200));

        // Push consecutive large batches with no prior buffered data
        let large_batches = vec![
            create_test_batch(300),
            create_test_batch(400),
            create_test_batch(350),
            create_test_batch(500),
        ];

        let mut all_outputs = vec![];

        for (i, large_batch) in large_batches.into_iter().enumerate() {
            let expected_size = large_batch.num_rows();

            // Buffer should be empty before each large batch
            assert_eq!(
                coalescer.get_buffered_rows(),
                0,
                "Buffer should be empty before batch {}",
                i
            );

            coalescer.push_batch(large_batch).unwrap();

            // Each large batch should bypass and produce exactly one output batch
            assert!(
                coalescer.has_completed_batch(),
                "Should have completed batch after pushing batch {}",
                i
            );

            let output = coalescer.next_completed_batch().unwrap();
            assert_eq!(
                output.num_rows(),
                expected_size,
                "Batch {} should have bypassed with original size",
                i
            );

            // Should be no more batches and buffer should be empty
            assert!(
                !coalescer.has_completed_batch(),
                "Should have no more completed batches after batch {}",
                i
            );
            assert_eq!(
                coalescer.get_buffered_rows(),
                0,
                "Buffer should be empty after batch {}",
                i
            );

            all_outputs.push(output);
        }

        // Verify we got exactly 4 output batches with original sizes
        assert_eq!(all_outputs.len(), 4);
        assert_eq!(all_outputs[0].num_rows(), 300);
        assert_eq!(all_outputs[1].num_rows(), 400);
        assert_eq!(all_outputs[2].num_rows(), 350);
        assert_eq!(all_outputs[3].num_rows(), 500);
    }
2120
2121    #[test]
2122    fn test_biggest_coalesce_batch_size_reset_consecutive_on_small_batch() {
2123        // Test that small batches reset the consecutive large batch tracking
2124        let mut coalescer = BatchCoalescer::new(
2125            Arc::new(Schema::new(vec![Field::new("c0", DataType::Int32, false)])),
2126            100,
2127        );
2128        coalescer.set_biggest_coalesce_batch_size(Some(200));
2129
2130        // Push first large batch - should bypass (no buffered data)
2131        coalescer.push_batch(create_test_batch(300)).unwrap();
2132        let output = coalescer.next_completed_batch().unwrap();
2133        assert_eq!(output.num_rows(), 300);
2134
2135        // Push second large batch - should bypass (consecutive)
2136        coalescer.push_batch(create_test_batch(400)).unwrap();
2137        let output = coalescer.next_completed_batch().unwrap();
2138        assert_eq!(output.num_rows(), 400);
2139
2140        // Push small batch - resets consecutive tracking
2141        coalescer.push_batch(create_test_batch(50)).unwrap();
2142        assert_eq!(coalescer.get_buffered_rows(), 50);
2143
2144        // Push large batch again - should NOT bypass due to buffered data
2145        coalescer.push_batch(create_test_batch(350)).unwrap();
2146
2147        // Should coalesce: 50 + 350 = 400 -> 4 complete batches of 100
2148        let mut outputs = vec![];
2149        while let Some(batch) = coalescer.next_completed_batch() {
2150            outputs.push(batch);
2151        }
2152        assert_eq!(outputs.len(), 4);
2153        for batch in outputs {
2154            assert_eq!(batch.num_rows(), 100);
2155        }
2156        assert_eq!(coalescer.get_buffered_rows(), 0);
2157    }
2158
2159    #[test]
2160    fn test_coalasce_push_batch_with_indices() {
2161        const MID_POINT: u32 = 2333;
2162        const TOTAL_ROWS: u32 = 23333;
2163        let batch1 = uint32_batch_non_null(0..MID_POINT);
2164        let batch2 = uint32_batch_non_null((MID_POINT..TOTAL_ROWS).rev());
2165
2166        let mut coalescer = BatchCoalescer::new(
2167            Arc::new(Schema::new(vec![Field::new("c0", DataType::UInt32, false)])),
2168            TOTAL_ROWS as usize,
2169        );
2170        coalescer.push_batch(batch1).unwrap();
2171
2172        let rev_indices = (0..((TOTAL_ROWS - MID_POINT) as u64)).rev();
2173        let reversed_indices_batch = uint64_batch_non_null(rev_indices);
2174
2175        let reverse_indices = UInt64Array::from(reversed_indices_batch.column(0).to_data());
2176        coalescer
2177            .push_batch_with_indices(batch2, &reverse_indices)
2178            .unwrap();
2179
2180        coalescer.finish_buffered_batch().unwrap();
2181        let actual = coalescer.next_completed_batch().unwrap();
2182
2183        let expected = uint32_batch_non_null(0..TOTAL_ROWS);
2184
2185        assert_eq!(expected, actual);
2186    }
2187
2188    #[test]
2189    fn test_push_batch_schema_mismatch_fewer_columns() {
2190        // Coalescer expects 0 columns, batch has 1
2191        let empty_schema = Arc::new(Schema::empty());
2192        let mut coalescer = BatchCoalescer::new(empty_schema, 100);
2193        let batch = uint32_batch(0..5);
2194        let result = coalescer.push_batch(batch);
2195        assert!(result.is_err());
2196        let err = result.unwrap_err().to_string();
2197        assert!(
2198            err.contains("Batch has 1 columns but BatchCoalescer expects 0"),
2199            "unexpected error: {err}"
2200        );
2201    }
2202
2203    #[test]
2204    fn test_push_batch_schema_mismatch_more_columns() {
2205        // Coalescer expects 2 columns, batch has 1
2206        let schema = Arc::new(Schema::new(vec![
2207            Field::new("c0", DataType::UInt32, false),
2208            Field::new("c1", DataType::UInt32, false),
2209        ]));
2210        let mut coalescer = BatchCoalescer::new(schema, 100);
2211        let batch = uint32_batch(0..5);
2212        let result = coalescer.push_batch(batch);
2213        assert!(result.is_err());
2214        let err = result.unwrap_err().to_string();
2215        assert!(
2216            err.contains("Batch has 1 columns but BatchCoalescer expects 2"),
2217            "unexpected error: {err}"
2218        );
2219    }
2220
2221    #[test]
2222    fn test_push_batch_schema_mismatch_two_vs_zero() {
2223        // Coalescer expects 0 columns, batch has 2
2224        let empty_schema = Arc::new(Schema::empty());
2225        let mut coalescer = BatchCoalescer::new(empty_schema, 100);
2226        let schema = Arc::new(Schema::new(vec![
2227            Field::new("c0", DataType::UInt32, false),
2228            Field::new("c1", DataType::UInt32, false),
2229        ]));
2230        let batch = RecordBatch::try_new(
2231            schema,
2232            vec![
2233                Arc::new(UInt32Array::from(vec![1, 2, 3])),
2234                Arc::new(UInt32Array::from(vec![4, 5, 6])),
2235            ],
2236        )
2237        .unwrap();
2238        let result = coalescer.push_batch(batch);
2239        assert!(result.is_err());
2240        let err = result.unwrap_err().to_string();
2241        assert!(
2242            err.contains("Batch has 2 columns but BatchCoalescer expects 0"),
2243            "unexpected error: {err}"
2244        );
2245    }
2246}