arrow_select/coalesce.rs

// Licensed to the Apache Software Foundation (ASF) under one
// or more contributor license agreements.  See the NOTICE file
// distributed with this work for additional information
// regarding copyright ownership.  The ASF licenses this file
// to you under the Apache License, Version 2.0 (the
// "License"); you may not use this file except in compliance
// with the License.  You may obtain a copy of the License at
//
//   http://www.apache.org/licenses/LICENSE-2.0
//
// Unless required by applicable law or agreed to in writing,
// software distributed under the License is distributed on an
// "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
// KIND, either express or implied.  See the License for the
// specific language governing permissions and limitations
// under the License.

//! [`BatchCoalescer`] concatenates multiple [`RecordBatch`]es after
//! operations such as [`filter`] and [`take`].
//!
//! [`filter`]: crate::filter::filter
//! [`take`]: crate::take::take
use crate::filter::filter_record_batch;
use crate::take::take_record_batch;
use arrow_array::types::{BinaryViewType, StringViewType};
use arrow_array::{Array, ArrayRef, BooleanArray, RecordBatch, downcast_primitive};
use arrow_schema::{ArrowError, DataType, SchemaRef};
use std::collections::VecDeque;
use std::sync::Arc;
// Originally from DataFusion's coalesce module:
// https://github.com/apache/datafusion/blob/9d2f04996604e709ee440b65f41e7b882f50b788/datafusion/physical-plan/src/coalesce/mod.rs#L26-L25

mod byte_view;
mod generic;
mod primitive;

use byte_view::InProgressByteViewArray;
use generic::GenericInProgressArray;
use primitive::InProgressPrimitiveArray;

/// Concatenate multiple [`RecordBatch`]es
///
/// Implements the common pattern of incrementally creating output
/// [`RecordBatch`]es of a specific size from an input stream of
/// [`RecordBatch`]es.
///
/// This is useful after operations such as [`filter`] and [`take`], which
/// often produce smaller batches that we want to coalesce into larger batches
/// for further processing.
///
/// # Motivation
///
/// If we used [`concat_batches`] to implement the same functionality, there would be 2 potential issues:
/// 1. At least 2x peak memory (holding the input and output of concat)
/// 2. 2 copies of the data (to create the output of filter and then create the output of concat)
///
/// See: <https://github.com/apache/arrow-rs/issues/6692> for more discussion
/// of the motivation.
///
/// [`filter`]: crate::filter::filter
/// [`take`]: crate::take::take
/// [`concat_batches`]: crate::concat::concat_batches
///
/// # Example
/// ```
/// use arrow_array::record_batch;
/// use arrow_select::coalesce::BatchCoalescer;
/// let batch1 = record_batch!(("a", Int32, [1, 2, 3])).unwrap();
/// let batch2 = record_batch!(("a", Int32, [4, 5])).unwrap();
///
/// // Create a `BatchCoalescer` that will produce batches of exactly 4 rows
/// // (except possibly the last)
/// let target_batch_size = 4;
/// let mut coalescer = BatchCoalescer::new(batch1.schema(), target_batch_size);
///
/// // push the batches
/// coalescer.push_batch(batch1).unwrap();
/// // only 3 rows pushed so far (less than 4, not enough to produce a batch)
/// assert!(coalescer.next_completed_batch().is_none());
/// coalescer.push_batch(batch2).unwrap();
/// // now we have 5 rows, so we can produce a batch
/// let finished = coalescer.next_completed_batch().unwrap();
/// // 4 rows came out (target batch size is 4)
/// let expected = record_batch!(("a", Int32, [1, 2, 3, 4])).unwrap();
/// assert_eq!(finished, expected);
///
/// // There is no more input, but still an in-progress batch
/// assert!(coalescer.next_completed_batch().is_none());
/// // We can finish the batch, which will produce the remaining rows
/// coalescer.finish_buffered_batch().unwrap();
/// let expected = record_batch!(("a", Int32, [5])).unwrap();
/// assert_eq!(coalescer.next_completed_batch().unwrap(), expected);
///
/// // The coalescer is now empty
/// assert!(coalescer.next_completed_batch().is_none());
/// ```
///
/// # Background
///
/// Generally speaking, larger [`RecordBatch`]es are more efficient to process
/// than smaller [`RecordBatch`]es (until the CPU cache is exceeded) because
/// there is fixed processing overhead per batch. This coalescer builds up these
/// larger batches incrementally.
///
/// ```text
/// ┌────────────────────┐
/// │    RecordBatch     │
/// │   num_rows = 100   │
/// └────────────────────┘                 ┌────────────────────┐
///                                        │                    │
/// ┌────────────────────┐     Coalesce    │                    │
/// │                    │      Batches    │                    │
/// │    RecordBatch     │                 │                    │
/// │   num_rows = 200   │  ─ ─ ─ ─ ─ ─ ▶  │                    │
/// │                    │                 │    RecordBatch     │
/// │                    │                 │   num_rows = 400   │
/// └────────────────────┘                 │                    │
///                                        │                    │
/// ┌────────────────────┐                 │                    │
/// │                    │                 │                    │
/// │    RecordBatch     │                 │                    │
/// │   num_rows = 100   │                 └────────────────────┘
/// │                    │
/// └────────────────────┘
/// ```
///
/// # Notes:
///
/// 1. Output rows are produced in the same order as the input rows
///
/// 2. The output is a sequence of batches, with all but the last having
///    exactly `target_batch_size` rows.
#[derive(Debug)]
pub struct BatchCoalescer {
    /// The input schema
    schema: SchemaRef,
    /// The target batch size (and thus size for views allocation). This is a
    /// hard limit: the output batch will be exactly `target_batch_size`,
    /// rather than possibly being slightly above.
    target_batch_size: usize,
    /// In-progress arrays
    in_progress_arrays: Vec<Box<dyn InProgressArray>>,
    /// Buffered row count. Always less than `target_batch_size`
    buffered_rows: usize,
    /// Completed batches
    completed: VecDeque<RecordBatch>,
    /// Biggest coalesce batch size. See [`Self::with_biggest_coalesce_batch_size`]
    biggest_coalesce_batch_size: Option<usize>,
}

impl BatchCoalescer {
    /// Create a new `BatchCoalescer`
    ///
    /// # Arguments
    /// - `schema` - the schema of the output batches
    /// - `target_batch_size` - the number of rows in each output batch.
    ///   Typical values are `4096` or `8192` rows.
    ///
    pub fn new(schema: SchemaRef, target_batch_size: usize) -> Self {
        let in_progress_arrays = schema
            .fields()
            .iter()
            .map(|field| create_in_progress_array(field.data_type(), target_batch_size))
            .collect::<Vec<_>>();

        Self {
            schema,
            target_batch_size,
            in_progress_arrays,
            // We will almost always store at least one completed batch
            completed: VecDeque::with_capacity(1),
            buffered_rows: 0,
            biggest_coalesce_batch_size: None,
        }
    }

    /// Set the coalesce batch size limit (default `None`)
    ///
    /// This limit determines when batches should bypass coalescing. Intuitively,
    /// batches that are already large are costly to coalesce and are efficient
    /// enough to process directly without coalescing.
    ///
    /// If `Some(limit)`, batches larger than this limit will bypass coalescing
    /// when there is no buffered data, or when the previously buffered data
    /// already exceeds this limit.
    ///
    /// If `None`, all batches will be coalesced according to the
    /// `target_batch_size`.
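    ///
    /// # Example
    ///
    /// A minimal sketch of the bypass behavior (the batch and limit sizes here
    /// are chosen arbitrarily for illustration):
    ///
    /// ```
    /// # use arrow_array::record_batch;
    /// # use arrow_select::coalesce::BatchCoalescer;
    /// let big = record_batch!(("a", Int32, [1, 2, 3, 4, 5, 6])).unwrap();
    /// // target batches of 8 rows, but emit batches larger than 4 rows as-is
    /// let mut coalescer = BatchCoalescer::new(big.schema(), 8)
    ///     .with_biggest_coalesce_batch_size(Some(4));
    /// // 6 rows > limit of 4 and nothing is buffered, so the batch is
    /// // emitted directly without being copied or split
    /// coalescer.push_batch(big.clone()).unwrap();
    /// assert_eq!(coalescer.next_completed_batch().unwrap(), big);
    /// ```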
    pub fn with_biggest_coalesce_batch_size(mut self, limit: Option<usize>) -> Self {
        self.biggest_coalesce_batch_size = limit;
        self
    }

    /// Get the current biggest coalesce batch size limit
    ///
    /// See [`Self::with_biggest_coalesce_batch_size`] for details
    pub fn biggest_coalesce_batch_size(&self) -> Option<usize> {
        self.biggest_coalesce_batch_size
    }

    /// Set the biggest coalesce batch size limit
    ///
    /// See [`Self::with_biggest_coalesce_batch_size`] for details
    pub fn set_biggest_coalesce_batch_size(&mut self, limit: Option<usize>) {
        self.biggest_coalesce_batch_size = limit;
    }

    /// Return the schema of the output batches
    pub fn schema(&self) -> SchemaRef {
        Arc::clone(&self.schema)
    }

    /// Push a batch into the Coalescer after applying a filter
    ///
    /// This is semantically equivalent to calling [`Self::push_batch`]
    /// with the results from [`filter_record_batch`]
    ///
    /// # Example
    /// ```
    /// # use arrow_array::{record_batch, BooleanArray};
    /// # use arrow_select::coalesce::BatchCoalescer;
    /// let batch1 = record_batch!(("a", Int32, [1, 2, 3])).unwrap();
    /// let batch2 = record_batch!(("a", Int32, [4, 5, 6])).unwrap();
    /// // Apply a filter to each batch to pick the first and last row
    /// let filter = BooleanArray::from(vec![true, false, true]);
    /// // create a new Coalescer that targets creating 1000 row batches
    /// let mut coalescer = BatchCoalescer::new(batch1.schema(), 1000);
    /// coalescer.push_batch_with_filter(batch1, &filter).unwrap();
    /// coalescer.push_batch_with_filter(batch2, &filter).unwrap();
    /// // finish and retrieve the created batch
    /// coalescer.finish_buffered_batch().unwrap();
    /// let completed_batch = coalescer.next_completed_batch().unwrap();
    /// // filtered out 2 and 5:
    /// let expected_batch = record_batch!(("a", Int32, [1, 3, 4, 6])).unwrap();
    /// assert_eq!(completed_batch, expected_batch);
    /// ```
    pub fn push_batch_with_filter(
        &mut self,
        batch: RecordBatch,
        filter: &BooleanArray,
    ) -> Result<(), ArrowError> {
        // TODO: optimize this to avoid materializing (copying the results
        // of filter to a new batch)
        let filtered_batch = filter_record_batch(&batch, filter)?;
        self.push_batch(filtered_batch)
    }

    /// Push a batch into the Coalescer after applying a set of indices
    ///
    /// This is semantically equivalent to calling [`Self::push_batch`]
    /// with the results from [`take_record_batch`]
    ///
    /// # Example
    /// ```
    /// # use arrow_array::{record_batch, UInt64Array};
    /// # use arrow_select::coalesce::BatchCoalescer;
    /// let batch1 = record_batch!(("a", Int32, [0, 0, 0])).unwrap();
    /// let batch2 = record_batch!(("a", Int32, [1, 1, 4, 5, 1, 4])).unwrap();
    /// // Sorted indices to create a sorted output, this can be obtained with
    /// // `arrow-ord`'s sort_to_indices operation
    /// let indices = UInt64Array::from(vec![0, 1, 4, 2, 5, 3]);
    /// // create a new Coalescer that targets creating 1000 row batches
    /// let mut coalescer = BatchCoalescer::new(batch1.schema(), 1000);
    /// coalescer.push_batch(batch1).unwrap();
    /// coalescer.push_batch_with_indices(batch2, &indices).unwrap();
    /// // finish and retrieve the created batch
    /// coalescer.finish_buffered_batch().unwrap();
    /// let completed_batch = coalescer.next_completed_batch().unwrap();
    /// let expected_batch = record_batch!(("a", Int32, [0, 0, 0, 1, 1, 1, 4, 4, 5])).unwrap();
    /// assert_eq!(completed_batch, expected_batch);
    /// ```
    pub fn push_batch_with_indices(
        &mut self,
        batch: RecordBatch,
        indices: &dyn Array,
    ) -> Result<(), ArrowError> {
        // TODO: optimize this to avoid materializing (copying the results
        // of take indices to a new batch)
        let taken_batch = take_record_batch(&batch, indices)?;
        self.push_batch(taken_batch)
    }

    /// Push all the rows from `batch` into the Coalescer
    ///
    /// When the buffered data plus the incoming rows reach `target_batch_size`,
    /// completed batches are generated eagerly and can be retrieved via
    /// [`Self::next_completed_batch()`].
    /// Output batches contain exactly `target_batch_size` rows, so the tail of
    /// the input batch may remain buffered.
    /// Remaining partial data either waits for future input batches or can be
    /// materialized immediately by calling [`Self::finish_buffered_batch()`].
    ///
    /// # Example
    /// ```
    /// # use arrow_array::record_batch;
    /// # use arrow_select::coalesce::BatchCoalescer;
    /// let batch1 = record_batch!(("a", Int32, [1, 2, 3])).unwrap();
    /// let batch2 = record_batch!(("a", Int32, [4, 5, 6])).unwrap();
    /// // create a new Coalescer that targets creating 1000 row batches
    /// let mut coalescer = BatchCoalescer::new(batch1.schema(), 1000);
    /// coalescer.push_batch(batch1).unwrap();
    /// coalescer.push_batch(batch2).unwrap();
    /// // finish and retrieve the created batch
    /// coalescer.finish_buffered_batch().unwrap();
    /// let completed_batch = coalescer.next_completed_batch().unwrap();
    /// let expected_batch = record_batch!(("a", Int32, [1, 2, 3, 4, 5, 6])).unwrap();
    /// assert_eq!(completed_batch, expected_batch);
    /// ```
    pub fn push_batch(&mut self, batch: RecordBatch) -> Result<(), ArrowError> {
        // Large batch bypass optimization:
        // When biggest_coalesce_batch_size is configured and a batch exceeds this limit,
        // we can avoid expensive split-and-merge operations by passing it through directly.
        //
        // IMPORTANT: This optimization is OPTIONAL and only active when biggest_coalesce_batch_size
        // is explicitly set via with_biggest_coalesce_batch_size(Some(limit)).
        // If not set (None), ALL batches follow normal coalescing behavior regardless of size.

        // =============================================================================
        // CASE 1: No buffer + large batch → Direct bypass
        // =============================================================================
        // Example scenario (target_batch_size=1000, biggest_coalesce_batch_size=Some(500)):
        // Input sequence: [600, 1200, 300]
        //
        // With biggest_coalesce_batch_size=Some(500) (optimization enabled):
        //   600 → large batch detected! buffered_rows=0 → Case 1: direct bypass
        //        → output: [600] (bypass, preserves large batch)
        //   1200 → large batch detected! buffered_rows=0 → Case 1: direct bypass
        //         → output: [1200] (bypass, preserves large batch)
        //   300 → normal batch, buffer: [300]
        //   Result: [600], [1200], [300] - large batches preserved, mixed sizes

        // =============================================================================
        // CASE 2: Buffer too large + large batch → Flush first, then bypass
        // =============================================================================
        // This case prevents creating extremely large merged batches that would
        // significantly exceed both target_batch_size and biggest_coalesce_batch_size.
        //
        // Example 1: Buffer exceeds limit before large batch arrives
        // target_batch_size=1000, biggest_coalesce_batch_size=Some(400)
        // Input: [350, 200, 800]
        //
        // Step 1: push_batch([350])
        //   → batch_size=350 <= 400, normal path
        //   → buffer: [350], buffered_rows=350
        //
        // Step 2: push_batch([200])
        //   → batch_size=200 <= 400, normal path
        //   → buffer: [350, 200], buffered_rows=550
        //
        // Step 3: push_batch([800])
        //   → batch_size=800 > 400, large batch path
        //   → buffered_rows=550 > 400 → Case 2: flush first
        //   → flush: output [550] (combined [350, 200])
        //   → then bypass: output [800]
        //   Result: [550], [800] - buffer flushed to prevent oversized merge
        //
        // Example 2: Multiple small batches accumulate before large batch
        // target_batch_size=1000, biggest_coalesce_batch_size=Some(300)
        // Input: [150, 100, 80, 900]
        //
        // Step 1-3: Accumulate small batches
        //   150 → buffer: [150], buffered_rows=150
        //   100 → buffer: [150, 100], buffered_rows=250
        //   80  → buffer: [150, 100, 80], buffered_rows=330
        //
        // Step 4: push_batch([900])
        //   → batch_size=900 > 300, large batch path
        //   → buffered_rows=330 > 300 → Case 2: flush first
        //   → flush: output [330] (combined [150, 100, 80])
        //   → then bypass: output [900]
        //   Result: [330], [900] - prevents merge into [1230] which would be too large

        // =============================================================================
        // CASE 3: Small buffer + large batch → Normal coalescing (no bypass)
        // =============================================================================
        // When the buffer is small enough, we still merge to maintain efficiency
        // Example: target_batch_size=1000, biggest_coalesce_batch_size=Some(500)
        // Input: [300, 1200]
        //
        // Step 1: push_batch([300])
        //   → batch_size=300 <= 500, normal path
        //   → buffer: [300], buffered_rows=300
        //
        // Step 2: push_batch([1200])
        //   → batch_size=1200 > 500, large batch path
        //   → buffered_rows=300 <= 500 → Case 3: normal merge
        //   → buffer: [300, 1200] (1500 total)
        //   → 1500 > target_batch_size → split: output [1000], buffer [500]
        //   Result: [1000], [500] - normal split/merge behavior maintained

        // =============================================================================
        // Comparison: Default vs Optimized Behavior
        // =============================================================================
        // target_batch_size=1000, biggest_coalesce_batch_size=Some(500)
        // Input: [600, 1200, 300]
        //
        // DEFAULT BEHAVIOR (biggest_coalesce_batch_size=None):
        //   600 → buffer: [600]
        //   1200 → buffer: [600, 1200] (1800 rows total)
        //         → split: output [1000 rows], buffer [800 rows remaining]
        //   300 → buffer: [800, 300] (1100 rows total)
        //        → split: output [1000 rows], buffer [100 rows remaining]
        //   Result: [1000], [1000], [100] - all outputs respect target_batch_size
        //
        // OPTIMIZED BEHAVIOR (biggest_coalesce_batch_size=Some(500)):
        //   600 → Case 1: direct bypass → output: [600]
        //   1200 → Case 1: direct bypass → output: [1200]
        //   300 → normal path → buffer: [300]
        //   Result: [600], [1200], [300] - large batches preserved

        // =============================================================================
        // Benefits and Trade-offs
        // =============================================================================
        // Benefits of the optimization:
        // - Large batches stay intact (better for downstream vectorized processing)
        // - Fewer split/merge operations (better CPU performance)
        // - More predictable memory usage patterns
        // - Maintains streaming efficiency while preserving batch boundaries
        //
        // Trade-offs:
        // - Output batch sizes become variable (not always target_batch_size)
        // - May produce smaller partial batches when flushing before large batches
        // - Requires tuning the biggest_coalesce_batch_size parameter for optimal performance

        // TODO: for unsorted batches, we may be able to filter all large batches,
        // and coalesce all small batches together?

        let batch_size = batch.num_rows();

        // Fast path: skip empty batches
        if batch_size == 0 {
            return Ok(());
        }

        // Large batch optimization: bypass coalescing for oversized batches
        if let Some(limit) = self.biggest_coalesce_batch_size {
            if batch_size > limit {
                // Case 1: No buffered data - emit large batch directly
                // Example: [] + [1200] → output [1200], buffer []
                if self.buffered_rows == 0 {
                    self.completed.push_back(batch);
                    return Ok(());
                }

                // Case 2: Buffer too large - flush then emit to avoid oversized merge
                // Example: [850] + [1200] → output [850], then output [1200]
                // This prevents creating batches much larger than both target_batch_size
                // and biggest_coalesce_batch_size, which could cause memory issues
                if self.buffered_rows > limit {
                    self.finish_buffered_batch()?;
                    self.completed.push_back(batch);
                    return Ok(());
                }

                // Case 3: Small buffer - proceed with normal coalescing
                // Example: [300] + [1200] → split and merge normally
                // This ensures small batches still get properly coalesced
                // while allowing some controlled growth beyond the limit
            }
        }

        let (_schema, arrays, mut num_rows) = batch.into_parts();

        // setup input rows
        assert_eq!(arrays.len(), self.in_progress_arrays.len());
        self.in_progress_arrays
            .iter_mut()
            .zip(arrays)
            .for_each(|(in_progress, array)| {
                in_progress.set_source(Some(array));
            });

        // If pushing this batch would exceed the target batch size,
        // finish the current batch and start a new one
        let mut offset = 0;
        while num_rows > (self.target_batch_size - self.buffered_rows) {
            let remaining_rows = self.target_batch_size - self.buffered_rows;
            debug_assert!(remaining_rows > 0);

            // Copy remaining_rows from each array
            for in_progress in self.in_progress_arrays.iter_mut() {
                in_progress.copy_rows(offset, remaining_rows)?;
            }

            self.buffered_rows += remaining_rows;
            offset += remaining_rows;
            num_rows -= remaining_rows;

            self.finish_buffered_batch()?;
        }

        // Add any remaining rows to the buffer
        self.buffered_rows += num_rows;
        if num_rows > 0 {
            for in_progress in self.in_progress_arrays.iter_mut() {
                in_progress.copy_rows(offset, num_rows)?;
            }
        }

        // If we have reached the target batch size, finalize the buffered batch
        if self.buffered_rows >= self.target_batch_size {
            self.finish_buffered_batch()?;
        }

        // clear in progress sources (to allow the memory to be freed)
        for in_progress in self.in_progress_arrays.iter_mut() {
            in_progress.set_source(None);
        }

        Ok(())
    }

    /// Returns the number of buffered rows
    pub fn get_buffered_rows(&self) -> usize {
        self.buffered_rows
    }

    /// Concatenates any buffered batches into a single `RecordBatch` and
    /// clears any output buffers
    ///
    /// Normally this is called when the input stream is exhausted, and
    /// we want to finalize the last batch of rows.
    ///
    /// See [`Self::next_completed_batch()`] for the completed batches.
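    ///
    /// # Example
    ///
    /// A minimal sketch of finalizing a partial batch (the sizes here are
    /// chosen arbitrarily for illustration):
    ///
    /// ```
    /// # use arrow_array::record_batch;
    /// # use arrow_select::coalesce::BatchCoalescer;
    /// let batch = record_batch!(("a", Int32, [1, 2])).unwrap();
    /// let mut coalescer = BatchCoalescer::new(batch.schema(), 10);
    /// coalescer.push_batch(batch.clone()).unwrap();
    /// // only 2 of the target 10 rows are buffered: nothing is completed yet
    /// assert!(coalescer.next_completed_batch().is_none());
    /// // finishing materializes the buffered rows as a (smaller) batch
    /// coalescer.finish_buffered_batch().unwrap();
    /// assert_eq!(coalescer.next_completed_batch().unwrap(), batch);
    /// ```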
    pub fn finish_buffered_batch(&mut self) -> Result<(), ArrowError> {
        if self.buffered_rows == 0 {
            return Ok(());
        }
        let new_arrays = self
            .in_progress_arrays
            .iter_mut()
            .map(|array| array.finish())
            .collect::<Result<Vec<_>, ArrowError>>()?;

        for (array, field) in new_arrays.iter().zip(self.schema.fields().iter()) {
            debug_assert_eq!(array.data_type(), field.data_type());
            debug_assert_eq!(array.len(), self.buffered_rows);
        }

        // SAFETY: each array was created with the correct type and length.
        let batch = unsafe {
            RecordBatch::new_unchecked(Arc::clone(&self.schema), new_arrays, self.buffered_rows)
        };

        self.buffered_rows = 0;
        self.completed.push_back(batch);
        Ok(())
    }

    /// Returns true if there is no buffered data and no completed batches
    pub fn is_empty(&self) -> bool {
        self.buffered_rows == 0 && self.completed.is_empty()
    }

    /// Returns true if there are any completed batches
    pub fn has_completed_batch(&self) -> bool {
        !self.completed.is_empty()
    }

    /// Removes and returns the next completed batch, if any.
    pub fn next_completed_batch(&mut self) -> Option<RecordBatch> {
        self.completed.pop_front()
    }
}

/// Return a new `InProgressArray` for the given data type
fn create_in_progress_array(data_type: &DataType, batch_size: usize) -> Box<dyn InProgressArray> {
    macro_rules! instantiate_primitive {
        ($t:ty) => {
            Box::new(InProgressPrimitiveArray::<$t>::new(
                batch_size,
                data_type.clone(),
            ))
        };
    }

    downcast_primitive! {
        // Instantiate InProgressPrimitiveArray for each primitive type
        data_type => (instantiate_primitive),
        DataType::Utf8View => Box::new(InProgressByteViewArray::<StringViewType>::new(batch_size)),
        DataType::BinaryView => {
            Box::new(InProgressByteViewArray::<BinaryViewType>::new(batch_size))
        }
        _ => Box::new(GenericInProgressArray::new()),
    }
}

/// Incrementally builds up arrays
///
/// [`GenericInProgressArray`] is the default implementation, which buffers
/// arrays and uses other kernels to concatenate them when finished.
///
/// Some array types (e.g., [`StringViewArray`]) have specialized
/// implementations.
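///
/// A typical call sequence, as driven by [`BatchCoalescer::push_batch`] and
/// [`BatchCoalescer::finish_buffered_batch`] (a sketch, not a doctest;
/// `in_progress` stands for any implementation of this trait):
///
/// ```text
/// in_progress.set_source(Some(array)); // attach a source array
/// in_progress.copy_rows(0, 10)?;       // buffer rows 0..10 of the source
/// in_progress.set_source(None);        // release the source
/// let array = in_progress.finish()?;   // materialize the buffered rows
/// ```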
///
/// [`StringViewArray`]: arrow_array::StringViewArray
trait InProgressArray: std::fmt::Debug + Send + Sync {
    /// Set the source array.
    ///
    /// Calls to [`Self::copy_rows`] will copy rows from this array into the
    /// current in-progress array
    fn set_source(&mut self, source: Option<ArrayRef>);

    /// Copy rows from the current source array into the in-progress array
    ///
    /// The source array is set by [`Self::set_source`].
    ///
    /// Return an error if the source array is not set
    fn copy_rows(&mut self, offset: usize, len: usize) -> Result<(), ArrowError>;

    /// Finish the currently in-progress array and return it as an `ArrayRef`
    fn finish(&mut self) -> Result<ArrayRef, ArrowError>;
}

#[cfg(test)]
mod tests {
    use super::*;
    use crate::concat::concat_batches;
    use arrow_array::builder::StringViewBuilder;
    use arrow_array::cast::AsArray;
    use arrow_array::types::Int32Type;
    use arrow_array::{
        BinaryViewArray, Int32Array, Int64Array, RecordBatchOptions, StringArray, StringViewArray,
        TimestampNanosecondArray, UInt32Array, UInt64Array, make_array,
    };
    use arrow_buffer::BooleanBufferBuilder;
    use arrow_schema::{DataType, Field, Schema};
    use rand::{Rng, SeedableRng};
    use std::ops::Range;

    #[test]
    fn test_coalesce() {
        let batch = uint32_batch(0..8);
        Test::new("coalesce")
            .with_batches(std::iter::repeat_n(batch, 10))
            // expected output is exactly 21 rows (except for the final batch)
            .with_batch_size(21)
            .with_expected_output_sizes(vec![21, 21, 21, 17])
            .run();
    }

    #[test]
    fn test_coalesce_one_by_one() {
        let batch = uint32_batch(0..1); // single row input
        Test::new("coalesce_one_by_one")
            .with_batches(std::iter::repeat_n(batch, 97))
            // expected output is exactly 20 rows (except for the final batch)
            .with_batch_size(20)
            .with_expected_output_sizes(vec![20, 20, 20, 20, 17])
            .run();
    }

    #[test]
    fn test_coalesce_empty() {
        let schema = Arc::new(Schema::new(vec![Field::new("c0", DataType::UInt32, false)]));

        Test::new("coalesce_empty")
            .with_batches(vec![])
            .with_schema(schema)
            .with_batch_size(21)
            .with_expected_output_sizes(vec![])
            .run();
    }

    #[test]
    fn test_single_large_batch_greater_than_target() {
        // test a single large batch
        let batch = uint32_batch(0..4096);
        Test::new("coalesce_single_large_batch_greater_than_target")
            .with_batch(batch)
            .with_batch_size(1000)
            .with_expected_output_sizes(vec![1000, 1000, 1000, 1000, 96])
            .run();
    }

    #[test]
    fn test_single_large_batch_smaller_than_target() {
        // test a single large batch
        let batch = uint32_batch(0..4096);
        Test::new("coalesce_single_large_batch_smaller_than_target")
            .with_batch(batch)
            .with_batch_size(8192)
            .with_expected_output_sizes(vec![4096])
            .run();
    }

    #[test]
    fn test_single_large_batch_equal_to_target() {
        // test a single large batch
        let batch = uint32_batch(0..4096);
        Test::new("coalesce_single_large_batch_equal_to_target")
            .with_batch(batch)
            .with_batch_size(4096)
            .with_expected_output_sizes(vec![4096])
            .run();
    }

    #[test]
    fn test_single_large_batch_equally_divisible_in_target() {
        // test a single large batch
        let batch = uint32_batch(0..4096);
        Test::new("coalesce_single_large_batch_equally_divisible_in_target")
            .with_batch(batch)
            .with_batch_size(1024)
            .with_expected_output_sizes(vec![1024, 1024, 1024, 1024])
            .run();
    }

    #[test]
    fn test_empty_schema() {
        let schema = Schema::empty();
        let batch = RecordBatch::new_empty(schema.into());
        Test::new("coalesce_empty_schema")
            .with_batch(batch)
            .with_expected_output_sizes(vec![])
            .run();
    }

    /// Coalesce multiple batches, 80k rows, with a 0.1% selectivity filter
    #[test]
    fn test_coalesce_filtered_001() {
        let mut filter_builder = RandomFilterBuilder {
            num_rows: 8000,
            selectivity: 0.001,
            seed: 0,
        };

        // add 10 batches of 8000 rows each
        // 80k rows, selecting 0.1% means ~80 rows
        // (not exactly 80, as the rows are chosen at random)
        let mut test = Test::new("coalesce_filtered_001");
        for _ in 0..10 {
            test = test
                .with_batch(multi_column_batch(0..8000))
                .with_filter(filter_builder.next_filter())
        }
        test.with_batch_size(15)
            .with_expected_output_sizes(vec![15, 15, 15, 13])
            .run();
    }

    /// Coalesce multiple batches, 80k rows, with a 1% selectivity filter
    #[test]
    fn test_coalesce_filtered_01() {
        let mut filter_builder = RandomFilterBuilder {
            num_rows: 8000,
            selectivity: 0.01,
            seed: 0,
        };

        // add 10 batches of 8000 rows each
        // 80k rows, selecting 1% means ~800 rows
        // (not exactly 800, as the rows are chosen at random)
        let mut test = Test::new("coalesce_filtered_01");
        for _ in 0..10 {
            test = test
                .with_batch(multi_column_batch(0..8000))
                .with_filter(filter_builder.next_filter())
        }
        test.with_batch_size(128)
            .with_expected_output_sizes(vec![128, 128, 128, 128, 128, 128, 15])
            .run();
    }

    /// Coalesce multiple batches, 80k rows, with a 10% selectivity filter
    #[test]
    fn test_coalesce_filtered_10() {
        let mut filter_builder = RandomFilterBuilder {
            num_rows: 8000,
            selectivity: 0.1,
            seed: 0,
        };

        // add 10 batches of 8000 rows each
        // 80k rows, selecting 10% means ~8000 rows
        // (not exactly 8000, as the rows are chosen at random)
        let mut test = Test::new("coalesce_filtered_10");
        for _ in 0..10 {
            test = test
                .with_batch(multi_column_batch(0..8000))
                .with_filter(filter_builder.next_filter())
        }
        test.with_batch_size(1024)
            .with_expected_output_sizes(vec![1024, 1024, 1024, 1024, 1024, 1024, 1024, 840])
            .run();
    }

    /// Coalesce multiple batches, 8k rows, with a 90% selectivity filter
    #[test]
    fn test_coalesce_filtered_90() {
        let mut filter_builder = RandomFilterBuilder {
            num_rows: 800,
            selectivity: 0.90,
            seed: 0,
        };

        // add 10 batches of 800 rows each
        // 8k rows, selecting 90% means ~7200 rows
        // (not exactly 7200, as the rows are chosen at random)
        let mut test = Test::new("coalesce_filtered_90");
        for _ in 0..10 {
            test = test
                .with_batch(multi_column_batch(0..800))
                .with_filter(filter_builder.next_filter())
        }
        test.with_batch_size(1024)
            .with_expected_output_sizes(vec![1024, 1024, 1024, 1024, 1024, 1024, 1024, 13])
            .run();
    }

    /// Coalesce multiple batches, 8k rows, with mixed filters, including some
    /// slices that are 100% selected
    #[test]
    fn test_coalesce_filtered_mixed() {
        let mut filter_builder = RandomFilterBuilder {
            num_rows: 800,
            selectivity: 0.90,
            seed: 0,
        };

        let mut test = Test::new("coalesce_filtered_mixed");
        for _ in 0..3 {
            // also add in a batch whose filter selects a long contiguous run
            // of rows, so that when processed in slices, some slices are
            // entirely selected
            let mut all_filter_builder = BooleanBufferBuilder::new(1000);
            all_filter_builder.append_n(500, true);
            all_filter_builder.append_n(1, false);
            all_filter_builder.append_n(499, false);
            let all_filter = all_filter_builder.build();

            test = test
                .with_batch(multi_column_batch(0..1000))
                .with_filter(BooleanArray::from(all_filter))
                .with_batch(multi_column_batch(0..800))
                .with_filter(filter_builder.next_filter());
            // decrease selectivity
            filter_builder.selectivity *= 0.6;
        }

        // use a small batch size to ensure the filter is appended in slices
        // and some of those slices will select the entire slice.
        test.with_batch_size(250)
            .with_expected_output_sizes(vec![
                250, 250, 250, 250, 250, 250, 250, 250, 250, 250, 250, 179,
            ])
            .run();
    }

    #[test]
    fn test_coalesce_non_null() {
        Test::new("coalesce_non_null")
            // 4040 rows of uint32
            .with_batch(uint32_batch_non_null(0..3000))
            .with_batch(uint32_batch_non_null(0..1040))
            .with_batch_size(1024)
            .with_expected_output_sizes(vec![1024, 1024, 1024, 968])
            .run();
    }

    #[test]
    fn test_utf8_split() {
        Test::new("coalesce_utf8")
            // 4040 rows of utf8 strings in total, split into batches of 1024
            .with_batch(utf8_batch(0..3000))
            .with_batch(utf8_batch(0..1040))
            .with_batch_size(1024)
            .with_expected_output_sizes(vec![1024, 1024, 1024, 968])
            .run();
    }

    #[test]
    fn test_string_view_no_views() {
        let output_batches = Test::new("coalesce_string_view_no_views")
            // both input batches have only inlined views (no data buffers),
            // so there is no need to compact
            .with_batch(stringview_batch([Some("foo"), Some("bar")]))
            .with_batch(stringview_batch([Some("baz"), Some("qux")]))
            .with_expected_output_sizes(vec![4])
            .run();

        expect_buffer_layout(
            col_as_string_view("c0", output_batches.first().unwrap()),
            vec![],
        );
    }

    #[test]
    fn test_string_view_batch_small_no_compact() {
        // view with only short strings (no buffers) --> no need to compact
        let batch = stringview_batch_repeated(1000, [Some("a"), Some("b"), Some("c")]);
        let output_batches = Test::new("coalesce_string_view_batch_small_no_compact")
            .with_batch(batch.clone())
            .with_expected_output_sizes(vec![1000])
            .run();

        let array = col_as_string_view("c0", &batch);
        let gc_array = col_as_string_view("c0", output_batches.first().unwrap());
        assert_eq!(array.data_buffers().len(), 0);
        assert_eq!(array.data_buffers().len(), gc_array.data_buffers().len()); // no compaction

        expect_buffer_layout(gc_array, vec![]);
    }

    #[test]
    fn test_string_view_batch_large_no_compact() {
        // view with large strings (has buffers) but full --> no need to compact
        let batch = stringview_batch_repeated(1000, [Some("This string is longer than 12 bytes")]);
        let output_batches = Test::new("coalesce_string_view_batch_large_no_compact")
            .with_batch(batch.clone())
            .with_batch_size(1000)
            .with_expected_output_sizes(vec![1000])
            .run();

        let array = col_as_string_view("c0", &batch);
        let gc_array = col_as_string_view("c0", output_batches.first().unwrap());
        assert_eq!(array.data_buffers().len(), 5);
        assert_eq!(array.data_buffers().len(), gc_array.data_buffers().len()); // no compaction

        expect_buffer_layout(
            gc_array,
            vec![
                ExpectedLayout {
                    len: 8190,
                    capacity: 8192,
                },
                ExpectedLayout {
                    len: 8190,
                    capacity: 8192,
                },
                ExpectedLayout {
                    len: 8190,
                    capacity: 8192,
                },
                ExpectedLayout {
                    len: 8190,
                    capacity: 8192,
                },
                ExpectedLayout {
                    len: 2240,
                    capacity: 8192,
                },
            ],
        );
    }

    #[test]
    fn test_string_view_batch_small_with_buffers_no_compact() {
        // view with buffers but only short views
        let short_strings = std::iter::repeat(Some("SmallString"));
        let long_strings = std::iter::once(Some("This string is longer than 12 bytes"));
        // 20 short strings, then a long one
        let values = short_strings.take(20).chain(long_strings);
        let batch = stringview_batch_repeated(1000, values)
            // take only 10 short strings (no long ones)
            .slice(5, 10);
        let output_batches = Test::new("coalesce_string_view_batch_small_with_buffers_no_compact")
            .with_batch(batch.clone())
            .with_batch_size(1000)
            .with_expected_output_sizes(vec![10])
            .run();

        let array = col_as_string_view("c0", &batch);
        let gc_array = col_as_string_view("c0", output_batches.first().unwrap());
        assert_eq!(array.data_buffers().len(), 1); // input has one buffer
        assert_eq!(gc_array.data_buffers().len(), 0); // output has no buffers as only short strings
    }

    #[test]
    fn test_string_view_batch_large_slice_compact() {
        // view with large strings (has buffers) that is only partially used --> needs compaction
        let batch = stringview_batch_repeated(1000, [Some("This string is longer than 12 bytes")])
            // slice only 22 rows, so most of the buffer is not used
            .slice(11, 22);

        let output_batches = Test::new("coalesce_string_view_batch_large_slice_compact")
            .with_batch(batch.clone())
            .with_batch_size(1000)
            .with_expected_output_sizes(vec![22])
            .run();

        let array = col_as_string_view("c0", &batch);
        let gc_array = col_as_string_view("c0", output_batches.first().unwrap());
        assert_eq!(array.data_buffers().len(), 5);

        expect_buffer_layout(
            gc_array,
            vec![ExpectedLayout {
                len: 770,
                capacity: 8192,
            }],
        );
    }

    #[test]
    fn test_string_view_mixed() {
        let large_view_batch =
            stringview_batch_repeated(1000, [Some("This string is longer than 12 bytes")]);
        let small_view_batch = stringview_batch_repeated(1000, [Some("SmallString")]);
        let mixed_batch = stringview_batch_repeated(
            1000,
            [Some("This string is longer than 12 bytes"), Some("Small")],
        );
        let mixed_batch_nulls = stringview_batch_repeated(
            1000,
            [
                Some("This string is longer than 12 bytes"),
                Some("Small"),
                None,
            ],
        );

        // Several batches with mixed inline / non-inline views,
        // ~4k rows in total
        let output_batches = Test::new("coalesce_string_view_mixed")
            .with_batch(large_view_batch.clone())
            .with_batch(small_view_batch)
            // this batch needs to be compacted (less than 1/2 full)
            .with_batch(large_view_batch.slice(10, 20))
            .with_batch(mixed_batch_nulls)
            // this batch needs to be compacted (less than 1/2 full)
            .with_batch(large_view_batch.slice(10, 20))
            .with_batch(mixed_batch)
            .with_expected_output_sizes(vec![1024, 1024, 1024, 968])
            .run();

        expect_buffer_layout(
            col_as_string_view("c0", output_batches.first().unwrap()),
            vec![
                ExpectedLayout {
                    len: 8190,
                    capacity: 8192,
                },
                ExpectedLayout {
                    len: 8190,
                    capacity: 8192,
                },
                ExpectedLayout {
                    len: 8190,
                    capacity: 8192,
                },
                ExpectedLayout {
                    len: 8190,
                    capacity: 8192,
                },
                ExpectedLayout {
                    len: 2240,
                    capacity: 8192,
                },
            ],
        );
    }

    #[test]
    fn test_string_view_many_small_compact() {
        // 200 rows alternating long (28 bytes) and short (≤12 bytes) strings.
        // Only the 100 long strings go into data buffers: 100 × 28 = 2800 bytes.
        let batch = stringview_batch_repeated(
            200,
            [Some("This string is 28 bytes long"), Some("small string")],
        );
        let output_batches = Test::new("coalesce_string_view_many_small_compact")
            // The first allocated buffer is 8KB.
            // Appending 10 batches of 2800 bytes each uses 2800 * 10 = 28000 bytes,
            // filling an 8KB buffer, a 16KB buffer, and part of a 32KB buffer
            .with_batch(batch.clone())
            .with_batch(batch.clone())
            .with_batch(batch.clone())
            .with_batch(batch.clone())
            .with_batch(batch.clone())
            .with_batch(batch.clone())
            .with_batch(batch.clone())
            .with_batch(batch.clone())
            .with_batch(batch.clone())
            .with_batch(batch.clone())
            .with_batch_size(8000)
            .with_expected_output_sizes(vec![2000]) // 10 batches of 200 rows = 2000 rows total
            .run();

        // expect a nice even distribution of buffers
        expect_buffer_layout(
            col_as_string_view("c0", output_batches.first().unwrap()),
            vec![
                ExpectedLayout {
                    len: 8176,
                    capacity: 8192,
                },
                ExpectedLayout {
                    len: 16380,
                    capacity: 16384,
                },
                ExpectedLayout {
                    len: 3444,
                    capacity: 32768,
                },
            ],
        );
    }

    #[test]
    fn test_string_view_many_small_boundary() {
        // The strings are designed to exactly fit into buffers that are powers of 2 long
        let batch = stringview_batch_repeated(100, [Some("This string is a power of two=32")]);
        let output_batches = Test::new("coalesce_string_view_many_small_boundary")
            .with_batches(std::iter::repeat_n(batch, 20))
            .with_batch_size(900)
            .with_expected_output_sizes(vec![900, 900, 200])
            .run();

        // expect each buffer to be entirely full except the last one
        expect_buffer_layout(
            col_as_string_view("c0", output_batches.first().unwrap()),
            vec![
                ExpectedLayout {
                    len: 8192,
                    capacity: 8192,
                },
                ExpectedLayout {
                    len: 16384,
                    capacity: 16384,
                },
                ExpectedLayout {
                    len: 4224,
                    capacity: 32768,
                },
            ],
        );
    }

    #[test]
    fn test_string_view_large_small() {
        // The long strings are 28 bytes each, so each mixed batch buffers 100 * 28 = 2800 bytes
        let mixed_batch = stringview_batch_repeated(
            200,
            [Some("This string is 28 bytes long"), Some("small string")],
        );
        // These strings aren't copied; this array has an 8k buffer
        let all_large = stringview_batch_repeated(
            50,
            [Some(
                "This buffer has only large strings in it so there are no buffer copies",
            )],
        );

        let output_batches = Test::new("coalesce_string_view_large_small")
            // The first allocated buffer is 8KB; as more data is appended,
            // subsequent buffers double in size (16KB, then 32KB)
            .with_batch(mixed_batch.clone())
            .with_batch(mixed_batch.clone())
            .with_batch(all_large.clone())
            .with_batch(mixed_batch.clone())
            .with_batch(all_large.clone())
            .with_batch(mixed_batch.clone())
            .with_batch(mixed_batch.clone())
            .with_batch(all_large.clone())
            .with_batch(mixed_batch.clone())
            .with_batch(all_large.clone())
            .with_batch_size(8000)
            .with_expected_output_sizes(vec![1400])
            .run();

        expect_buffer_layout(
            col_as_string_view("c0", output_batches.first().unwrap()),
            vec![
                ExpectedLayout {
                    len: 8190,
                    capacity: 8192,
                },
                ExpectedLayout {
                    len: 16366,
                    capacity: 16384,
                },
                ExpectedLayout {
                    len: 6244,
                    capacity: 32768,
                },
            ],
        );
    }

    #[test]
    fn test_binary_view() {
        let values: Vec<Option<&[u8]>> = vec![
            Some(b"foo"),
            None,
            Some(b"A longer string that is more than 12 bytes"),
        ];

        let binary_view =
            BinaryViewArray::from_iter(std::iter::repeat(values.iter()).flatten().take(1000));
        let batch =
            RecordBatch::try_from_iter(vec![("c0", Arc::new(binary_view) as ArrayRef)]).unwrap();

        Test::new("coalesce_binary_view")
            .with_batch(batch.clone())
            .with_batch(batch.clone())
            .with_batch_size(512)
            .with_expected_output_sizes(vec![512, 512, 512, 464])
            .run();
    }

    #[derive(Debug, Clone, PartialEq)]
    struct ExpectedLayout {
        len: usize,
        capacity: usize,
    }

    /// Asserts that the buffer layout of the specified StringViewArray matches the expected layout
    fn expect_buffer_layout(array: &StringViewArray, expected: Vec<ExpectedLayout>) {
        let actual = array
            .data_buffers()
            .iter()
            .map(|b| ExpectedLayout {
                len: b.len(),
                capacity: b.capacity(),
            })
            .collect::<Vec<_>>();

        assert_eq!(
            actual, expected,
            "Expected buffer layout {expected:#?} but got {actual:#?}"
        );
    }

    /// Test for [`BatchCoalescer`]
    ///
    /// Pushes the input batches to the coalescer and verifies that the
    /// resulting batches have:
    /// 1. the expected number of rows
    /// 2. the same contents as filtering the input with the filter kernel
    #[derive(Debug, Clone)]
    struct Test {
        /// A human readable name to assist in debugging
        name: String,
        /// Batches to feed to the coalescer.
        input_batches: Vec<RecordBatch>,
        /// Filters to apply to the corresponding input batches.
        ///
        /// If there is no filter for an input batch, the batch is
        /// pushed as is.
        filters: Vec<BooleanArray>,
        /// The schema. If not provided, the first batch's schema is used.
        schema: Option<SchemaRef>,
        /// Expected output sizes of the resulting batches
        expected_output_sizes: Vec<usize>,
        /// target batch size (defaults to 1024)
        target_batch_size: usize,
    }

    impl Default for Test {
        fn default() -> Self {
            Self {
                name: "".to_string(),
                input_batches: vec![],
                filters: vec![],
                schema: None,
                expected_output_sizes: vec![],
                target_batch_size: 1024,
            }
        }
    }

    impl Test {
        fn new(name: impl Into<String>) -> Self {
            Self {
                name: name.into(),
                ..Self::default()
            }
        }

        /// Append the description to the test name
        fn with_description(mut self, description: &str) -> Self {
            self.name.push_str(": ");
            self.name.push_str(description);
            self
        }

        /// Set the target batch size
        fn with_batch_size(mut self, target_batch_size: usize) -> Self {
            self.target_batch_size = target_batch_size;
            self
        }

        /// Extend the input batches with `batch`
        fn with_batch(mut self, batch: RecordBatch) -> Self {
            self.input_batches.push(batch);
            self
        }

        /// Extend the filters with `filter`
        fn with_filter(mut self, filter: BooleanArray) -> Self {
            self.filters.push(filter);
            self
        }

        /// Replaces the input batches with `batches`
        fn with_batches(mut self, batches: impl IntoIterator<Item = RecordBatch>) -> Self {
            self.input_batches = batches.into_iter().collect();
            self
        }

        /// Specifies the schema for the test
        fn with_schema(mut self, schema: SchemaRef) -> Self {
            self.schema = Some(schema);
            self
        }

        /// Extends the expected output sizes with `sizes`
        fn with_expected_output_sizes(mut self, sizes: impl IntoIterator<Item = usize>) -> Self {
            self.expected_output_sizes.extend(sizes);
            self
        }

        /// Runs the test -- see documentation on [`Test`] for details
        ///
        /// Returns the resulting output batches
        fn run(self) -> Vec<RecordBatch> {
            // Test several permutations of input batches:
            // 1. Removing nulls from some batches (test non-null fast paths)
            // 2. Empty batches
            // 3. One column (from the batch)
            let mut extra_tests = vec![];
            extra_tests.push(self.clone().make_half_non_nullable());
            extra_tests.push(self.clone().insert_empty_batches());
            let single_column_tests = self.make_single_column_tests();
            for test in single_column_tests {
                extra_tests.push(test.clone().make_half_non_nullable());
                extra_tests.push(test);
            }

            // Run the original test case first, so any obvious errors are caught
            // by an easier to understand test case
            let results = self.run_inner();
            // Run the extra cases to expand coverage
            for extra in extra_tests {
                extra.run_inner();
            }

            results
        }
1333
1334        /// Runs the current test instance
1335        fn run_inner(self) -> Vec<RecordBatch> {
1336            let expected_output = self.expected_output();
1337            let schema = self.schema();
1338
1339            let Self {
1340                name,
1341                input_batches,
1342                filters,
1343                schema: _,
1344                target_batch_size,
1345                expected_output_sizes,
1346            } = self;
1347
1348            println!("Running test '{name}'");
1349
1350            let had_input = input_batches.iter().any(|b| b.num_rows() > 0);
1351
1352            let mut coalescer = BatchCoalescer::new(Arc::clone(&schema), target_batch_size);
1353
1354            // feed input batches and filters to the coalescer
1355            let mut filters = filters.into_iter();
1356            for batch in input_batches {
1357                if let Some(filter) = filters.next() {
1358                    coalescer.push_batch_with_filter(batch, &filter).unwrap();
1359                } else {
1360                    coalescer.push_batch(batch).unwrap();
1361                }
1362            }
1363            assert_eq!(schema, coalescer.schema());
1364
1365            if had_input {
1366                assert!(!coalescer.is_empty(), "Coalescer should not be empty");
1367            } else {
1368                assert!(coalescer.is_empty(), "Coalescer should be empty");
1369            }
1370
1371            coalescer.finish_buffered_batch().unwrap();
1372            if had_input {
1373                assert!(
1374                    coalescer.has_completed_batch(),
1375                    "Coalescer should have completed batches"
1376                );
1377            }
1378
1379            let mut output_batches = vec![];
1380            while let Some(batch) = coalescer.next_completed_batch() {
1381                output_batches.push(batch);
1382            }
1383
1384            // make sure we got the expected number of output batches and content
1385            let mut starting_idx = 0;
1386            let actual_output_sizes: Vec<usize> =
1387                output_batches.iter().map(|b| b.num_rows()).collect();
1388            assert_eq!(
1389                expected_output_sizes, actual_output_sizes,
1390                "Unexpected number of rows in output batches\n\
1391                Expected:\n{expected_output_sizes:#?}\nActual:\n{actual_output_sizes:#?}"
1392            );
1393            let iter = expected_output_sizes
1394                .iter()
1395                .zip(output_batches.iter())
1396                .enumerate();
1397
1398            // Verify that the actual contents of each output batch matches the expected output
1399            for (i, (expected_size, batch)) in iter {
1400                // compare the contents of the batch after normalization (plain
1401                // `==` would also compare the underlying memory layout)
1402                let expected_batch = expected_output.slice(starting_idx, *expected_size);
1403                let expected_batch = normalize_batch(expected_batch);
1404                let batch = normalize_batch(batch.clone());
1405                assert_eq!(
1406                    expected_batch, batch,
1407                    "Unexpected content in batch {i}:\
1408                    \n\nExpected:\n{expected_batch:#?}\n\nActual:\n{batch:#?}"
1409                );
1410                starting_idx += *expected_size;
1411            }
1412            output_batches
1413        }
1414
1415        /// Return the expected output schema. If not overridden by `with_schema`, it
1416        /// returns the schema of the first input batch.
1417        fn schema(&self) -> SchemaRef {
1418            self.schema
1419                .clone()
1420                .unwrap_or_else(|| self.input_batches[0].schema())
1421        }
1422
1423        /// Returns the expected output as a single `RecordBatch`
1424        fn expected_output(&self) -> RecordBatch {
1425            let schema = self.schema();
1426            if self.filters.is_empty() {
1427                return concat_batches(&schema, &self.input_batches).unwrap();
1428            }
1429
1430            let mut filters = self.filters.iter();
1431            let filtered_batches = self
1432                .input_batches
1433                .iter()
1434                .map(|batch| {
1435                    if let Some(filter) = filters.next() {
1436                        filter_record_batch(batch, filter).unwrap()
1437                    } else {
1438                        batch.clone()
1439                    }
1440                })
1441                .collect::<Vec<_>>();
1442            concat_batches(&schema, &filtered_batches).unwrap()
1443        }
1444
1445        /// Return a copy of self where every other batch has had its nulls removed
1446        /// (there are often fast paths that are used when there are no nulls)
1447        fn make_half_non_nullable(mut self) -> Self {
1448            // remove the nulls from every other batch
1449            self.input_batches = self
1450                .input_batches
1451                .iter()
1452                .enumerate()
1453                .map(|(i, batch)| {
1454                    if i % 2 == 1 {
1455                        batch.clone()
1456                    } else {
1457                        Self::remove_nulls_from_batch(batch)
1458                    }
1459                })
1460                .collect();
1461            self.with_description("non-nullable")
1462        }
1463
1464        /// Insert an empty batch (and an empty filter) before each existing one
1465        fn insert_empty_batches(mut self) -> Self {
1466            let empty_batch = RecordBatch::new_empty(self.schema());
1467            self.input_batches = self
1468                .input_batches
1469                .into_iter()
1470                .flat_map(|batch| [empty_batch.clone(), batch])
1471                .collect();
1472            let empty_filter = BooleanArray::builder(0).finish();
1473            self.filters = self
1474                .filters
1475                .into_iter()
1476                .flat_map(|filter| [empty_filter.clone(), filter])
1477                .collect();
1478            self.with_description("empty batches inserted")
1479        }
1480
1481        /// Returns a copy of `batch` with the nulls removed from all of its columns
1482        fn remove_nulls_from_batch(batch: &RecordBatch) -> RecordBatch {
1483            let new_columns = batch
1484                .columns()
1485                .iter()
1486                .map(Self::remove_nulls_from_array)
1487                .collect::<Vec<_>>();
1488            let options = RecordBatchOptions::new().with_row_count(Some(batch.num_rows()));
1489            RecordBatch::try_new_with_options(batch.schema(), new_columns, &options).unwrap()
1490        }
1491
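        /// Returns a copy of `array` with its validity (null) buffer removed;
        /// the underlying values (whatever they happen to be) become non-null.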
1492        fn remove_nulls_from_array(array: &ArrayRef) -> ArrayRef {
1493            make_array(array.to_data().into_builder().nulls(None).build().unwrap())
1494        }
1495
1496        /// Returns a set of tests, each the same as self but with a single
1497        /// column from the original input batch
1498        ///
1499        /// This can be useful to exercise single column optimizations,
1500        /// specifically filter optimization.
1501        fn make_single_column_tests(&self) -> Vec<Self> {
1502            let original_schema = self.schema();
1503            let mut new_tests = vec![];
1504            for column in original_schema.fields() {
1505                let single_column_schema = Arc::new(Schema::new(vec![column.clone()]));
1506
1507                let single_column_batches = self.input_batches.iter().map(|batch| {
1508                    let single_column = batch.column_by_name(column.name()).unwrap();
1509                    RecordBatch::try_new(
1510                        Arc::clone(&single_column_schema),
1511                        vec![single_column.clone()],
1512                    )
1513                    .unwrap()
1514                });
1515
1516                let single_column_test = self
1517                    .clone()
1518                    .with_schema(Arc::clone(&single_column_schema))
1519                    .with_batches(single_column_batches)
1520                    .with_description("single column")
1521                    .with_description(column.name());
1522
1523                new_tests.push(single_column_test);
1524            }
1525            new_tests
1526        }
1527    }
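
    // For illustration, a typical `Test` is assembled with the builder methods
    // above and then `run` (a hypothetical sketch, not a case from this suite;
    // the expected sizes assume one output batch per `target_batch_size` rows
    // plus the remainder on finish):
    //
    //     Test::new("two batches")
    //         .with_batch(uint32_batch(0..10))
    //         .with_batch(uint32_batch(10..20))
    //         .with_batch_size(12)
    //         .with_expected_output_sizes([12, 8])
    //         .run();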
1528
1529    /// Return a RecordBatch with a UInt32Array covering the specified range,
1530    /// where every third value is null.
1531    fn uint32_batch<T: std::iter::Iterator<Item = u32>>(range: T) -> RecordBatch {
1532        let schema = Arc::new(Schema::new(vec![Field::new("c0", DataType::UInt32, true)]));
1533
1534        let array = UInt32Array::from_iter(range.map(|i| if i % 3 == 0 { None } else { Some(i) }));
1535        RecordBatch::try_new(Arc::clone(&schema), vec![Arc::new(array)]).unwrap()
1536    }
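
    // For example (illustrative): `uint32_batch(0..5)` produces the values
    // `[None, Some(1), Some(2), None, Some(4)]`.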
1537
1538    /// Return a RecordBatch with a UInt32Array covering the specified range, with no nulls
1539    fn uint32_batch_non_null<T: std::iter::Iterator<Item = u32>>(range: T) -> RecordBatch {
1540        let schema = Arc::new(Schema::new(vec![Field::new("c0", DataType::UInt32, false)]));
1541
1542        let array = UInt32Array::from_iter_values(range);
1543        RecordBatch::try_new(Arc::clone(&schema), vec![Arc::new(array)]).unwrap()
1544    }
1545
1546    /// Return a RecordBatch with a UInt64Array covering the specified range, with no nulls
1547    fn uint64_batch_non_null<T: std::iter::Iterator<Item = u64>>(range: T) -> RecordBatch {
1548        let schema = Arc::new(Schema::new(vec![Field::new("c0", DataType::UInt64, false)]));
1549
1550        let array = UInt64Array::from_iter_values(range);
1551        RecordBatch::try_new(Arc::clone(&schema), vec![Arc::new(array)]).unwrap()
1552    }
1553
1554    /// Return a RecordBatch with a StringArray with values `value0`, `value1`, ...
1555    /// and every third value is `None`.
1556    fn utf8_batch(range: Range<u32>) -> RecordBatch {
1557        let schema = Arc::new(Schema::new(vec![Field::new("c0", DataType::Utf8, true)]));
1558
1559        let array = StringArray::from_iter(range.map(|i| {
1560            if i % 3 == 0 {
1561                None
1562            } else {
1563                Some(format!("value{i}"))
1564            }
1565        }));
1566
1567        RecordBatch::try_new(Arc::clone(&schema), vec![Arc::new(array)]).unwrap()
1568    }
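
    // For example (illustrative): `utf8_batch(0..4)` produces
    // `[None, Some("value1"), Some("value2"), None]`.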
1569
1570    /// Return a RecordBatch with a StringViewArray with (only) the specified values
1571    fn stringview_batch<'a>(values: impl IntoIterator<Item = Option<&'a str>>) -> RecordBatch {
1572        let schema = Arc::new(Schema::new(vec![Field::new(
1573            "c0",
1574            DataType::Utf8View,
1575            false,
1576        )]));
1577
1578        let array = StringViewArray::from_iter(values);
1579        RecordBatch::try_new(Arc::clone(&schema), vec![Arc::new(array)]).unwrap()
1580    }
1581
1582    /// Return a RecordBatch with a StringViewArray of `num_rows` rows, created
1583    /// by repeating `values` over and over.
1584    fn stringview_batch_repeated<'a>(
1585        num_rows: usize,
1586        values: impl IntoIterator<Item = Option<&'a str>>,
1587    ) -> RecordBatch {
1588        let schema = Arc::new(Schema::new(vec![Field::new(
1589            "c0",
1590            DataType::Utf8View,
1591            true,
1592        )]));
1593
1594        // Repeat the values to a total of num_rows
1595        let values: Vec<_> = values.into_iter().collect();
1596        let values_iter = std::iter::repeat(values.iter())
1597            .flatten()
1598            .cloned()
1599            .take(num_rows);
1600
1601        let mut builder = StringViewBuilder::with_capacity(100).with_fixed_block_size(8192);
1602        for val in values_iter {
1603            builder.append_option(val);
1604        }
1605
1606        let array = builder.finish();
1607        RecordBatch::try_new(Arc::clone(&schema), vec![Arc::new(array)]).unwrap()
1608    }
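
    // For example (illustrative): `stringview_batch_repeated(5, [Some("a"), None])`
    // produces the values `["a", None, "a", None, "a"]`.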
1609
1610    /// Return a multi-column RecordBatch with one row per value in `range`
1611    fn multi_column_batch(range: Range<i32>) -> RecordBatch {
1612        let int64_array = Int64Array::from_iter(
1613            range
1614                .clone()
1615                .map(|v| if v % 5 == 0 { None } else { Some(v as i64) }),
1616        );
1617        let string_view_array = StringViewArray::from_iter(range.clone().map(|v| {
1618            if v % 5 == 0 {
1619                None
1620            } else if v % 7 == 0 {
1621                Some(format!("This is a string longer than 12 bytes{v}"))
1622            } else {
1623                Some(format!("Short {v}"))
1624            }
1625        }));
1626        let string_array = StringArray::from_iter(range.clone().map(|v| {
1627            if v % 11 == 0 {
1628                None
1629            } else {
1630                Some(format!("Value {v}"))
1631            }
1632        }));
1633        let timestamp_array = TimestampNanosecondArray::from_iter(range.map(|v| {
1634            if v % 3 == 0 {
1635                None
1636            } else {
1637                Some(v as i64 * 1000) // v * 1000 nanoseconds (arbitrary timestamp values)
1638            }
1639        }))
1640        .with_timezone("America/New_York");
1641
1642        RecordBatch::try_from_iter(vec![
1643            ("int64", Arc::new(int64_array) as ArrayRef),
1644            ("stringview", Arc::new(string_view_array) as ArrayRef),
1645            ("string", Arc::new(string_array) as ArrayRef),
1646            ("timestamp", Arc::new(timestamp_array) as ArrayRef),
1647        ])
1648        .unwrap()
1649    }
1650
1651    /// Builds boolean arrays that randomly select rows from an input batch
1652    /// with a given `selectivity`.
1653    ///
1654    /// For example, a `selectivity` of 0.1 selects roughly 10% of the rows
1655    /// (filtering out roughly 90%).
1656    #[derive(Debug)]
1657    struct RandomFilterBuilder {
1658        /// Number of rows to add to each filter
1659        num_rows: usize,
1660        /// selectivity of the filter (between 0.0 and 1.0);
1661        /// 0.0 selects no rows, 1.0 selects all rows
1662        selectivity: f64,
1663        /// seed for random number generator, increases by one each time
1664        /// `next_filter` is called
1665        seed: u64,
1666    }
1667    impl RandomFilterBuilder {
1668        /// Build the next filter with the current seed and increment the seed
1669        /// by one.
1670        fn next_filter(&mut self) -> BooleanArray {
1671            assert!((0.0..=1.0).contains(&self.selectivity));
1672            let mut rng = rand::rngs::StdRng::seed_from_u64(self.seed);
1673            self.seed += 1;
1674            BooleanArray::from_iter(
1675                (0..self.num_rows)
1676                    .map(|_| rng.random_bool(self.selectivity))
1677                    .map(Some),
1678            )
1679        }
1680    }
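
    // For illustration (a hypothetical sketch): each call to `next_filter`
    // draws from a freshly seeded RNG, so the generated filters are
    // deterministic and reproducible across runs:
    //
    //     let mut builder = RandomFilterBuilder {
    //         num_rows: 8,
    //         selectivity: 0.5,
    //         seed: 42,
    //     };
    //     let f1 = builder.next_filter(); // seeded with 42
    //     let f2 = builder.next_filter(); // seeded with 43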
1681
1682    /// Returns the named column as a StringViewArray
1683    fn col_as_string_view<'b>(name: &str, batch: &'b RecordBatch) -> &'b StringViewArray {
1684        batch
1685            .column_by_name(name)
1686            .expect("column not found")
1687            .as_string_view_opt()
1688            .expect("column is not a string view")
1689    }
1690
1691    /// Normalize the `RecordBatch` so that the memory layout is consistent
1692    /// (e.g. StringViewArray is compacted).
1693    fn normalize_batch(batch: RecordBatch) -> RecordBatch {
1694        // Only need to normalize StringViews (as == also tests for memory layout)
1695        let (schema, mut columns, row_count) = batch.into_parts();
1696
1697        for column in columns.iter_mut() {
1698            let Some(string_view) = column.as_string_view_opt() else {
1699                continue;
1700            };
1701
1702            // Re-create the StringViewArray to ensure memory layout is
1703            // consistent
1704            let mut builder = StringViewBuilder::new();
1705            for s in string_view.iter() {
1706                builder.append_option(s);
1707            }
1708            // Update the column with the new StringViewArray
1709            *column = Arc::new(builder.finish());
1710        }
1711
1712        let options = RecordBatchOptions::new().with_row_count(Some(row_count));
1713        RecordBatch::try_new_with_options(schema, columns, &options).unwrap()
1714    }
1715
1716    /// Helper function to create a test batch with the specified number of rows
1717    fn create_test_batch(num_rows: usize) -> RecordBatch {
1718        let schema = Arc::new(Schema::new(vec![Field::new("c0", DataType::Int32, false)]));
1719        let array = Int32Array::from_iter_values(0..num_rows as i32);
1720        RecordBatch::try_new(schema, vec![Arc::new(array)]).unwrap()
1721    }

1722    #[test]
1723    fn test_biggest_coalesce_batch_size_none_default() {
1724        // Test that default behavior (None) coalesces all batches
1725        let mut coalescer = BatchCoalescer::new(
1726            Arc::new(Schema::new(vec![Field::new("c0", DataType::Int32, false)])),
1727            100,
1728        );
1729
1730        // Push a large batch (1000 rows) - should be coalesced normally
1731        let large_batch = create_test_batch(1000);
1732        coalescer.push_batch(large_batch).unwrap();
1733
1734        // Should produce multiple batches of target size (100)
1735        let mut output_batches = vec![];
1736        while let Some(batch) = coalescer.next_completed_batch() {
1737            output_batches.push(batch);
1738        }
1739
1740        coalescer.finish_buffered_batch().unwrap();
1741        while let Some(batch) = coalescer.next_completed_batch() {
1742            output_batches.push(batch);
1743        }
1744
1745        // Should have 10 batches of 100 rows each
1746        assert_eq!(output_batches.len(), 10);
1747        for batch in output_batches {
1748            assert_eq!(batch.num_rows(), 100);
1749        }
1750    }
1751
1752    #[test]
1753    fn test_biggest_coalesce_batch_size_bypass_large_batch() {
1754        // Test that batches larger than biggest_coalesce_batch_size bypass coalescing
1755        let mut coalescer = BatchCoalescer::new(
1756            Arc::new(Schema::new(vec![Field::new("c0", DataType::Int32, false)])),
1757            100,
1758        );
1759        coalescer.set_biggest_coalesce_batch_size(Some(500));
1760
1761        // Push a large batch (1000 rows) - should bypass coalescing
1762        let large_batch = create_test_batch(1000);
1763        coalescer.push_batch(large_batch.clone()).unwrap();
1764
1765        // Should have one completed batch immediately (the original large batch)
1766        assert!(coalescer.has_completed_batch());
1767        let output_batch = coalescer.next_completed_batch().unwrap();
1768        assert_eq!(output_batch.num_rows(), 1000);
1769
1770        // Should be no more completed batches
1771        assert!(!coalescer.has_completed_batch());
1772        assert_eq!(coalescer.get_buffered_rows(), 0);
1773    }
1774
1775    #[test]
1776    fn test_biggest_coalesce_batch_size_coalesce_small_batch() {
1777        // Test that batches smaller than biggest_coalesce_batch_size are coalesced normally
1778        let mut coalescer = BatchCoalescer::new(
1779            Arc::new(Schema::new(vec![Field::new("c0", DataType::Int32, false)])),
1780            100,
1781        )
1782        .with_biggest_coalesce_batch_size(Some(500));
1783
1784        // Push small batches that should be coalesced
1785        let small_batch = create_test_batch(50);
1786        coalescer.push_batch(small_batch.clone()).unwrap();
1787
1788        // Should not have completed batch yet (only 50 rows, target is 100)
1789        assert!(!coalescer.has_completed_batch());
1790        assert_eq!(coalescer.get_buffered_rows(), 50);
1791
1792        // Push another small batch
1793        coalescer.push_batch(small_batch).unwrap();
1794
1795        // Now should have a completed batch (100 rows total)
1796        assert!(coalescer.has_completed_batch());
1797        let output_batch = coalescer.next_completed_batch().unwrap();
1798        let size = output_batch
1799            .column(0)
1800            .as_primitive::<Int32Type>()
1801            .get_buffer_memory_size();
1802        assert_eq!(size, 400); // 100 rows * 4 bytes each
1803        assert_eq!(output_batch.num_rows(), 100);
1804
1805        assert_eq!(coalescer.get_buffered_rows(), 0);
1806    }
1807
1808    #[test]
1809    fn test_biggest_coalesce_batch_size_equal_boundary() {
1810        // Test behavior when batch size equals biggest_coalesce_batch_size
1811        let mut coalescer = BatchCoalescer::new(
1812            Arc::new(Schema::new(vec![Field::new("c0", DataType::Int32, false)])),
1813            100,
1814        );
1815        coalescer.set_biggest_coalesce_batch_size(Some(500));
1816
1817        // Push a batch exactly equal to the limit
1818        let boundary_batch = create_test_batch(500);
1819        coalescer.push_batch(boundary_batch).unwrap();
1820
1821        // Should be coalesced (not bypassed) since it's equal to, not greater than, the limit
1822        let mut output_count = 0;
1823        while coalescer.next_completed_batch().is_some() {
1824            output_count += 1;
1825        }
1826
1827        coalescer.finish_buffered_batch().unwrap();
1828        while coalescer.next_completed_batch().is_some() {
1829            output_count += 1;
1830        }
1831
1832        // Should have 5 batches of 100 rows each
1833        assert_eq!(output_count, 5);
1834    }
1835
1836    #[test]
1837    fn test_biggest_coalesce_batch_size_first_large_then_consecutive_bypass() {
1838        // Test the consecutive large batch bypass behavior
1839        // Pattern: small batches -> first large batch (coalesced) -> consecutive large batches (bypass)
1840        let mut coalescer = BatchCoalescer::new(
1841            Arc::new(Schema::new(vec![Field::new("c0", DataType::Int32, false)])),
1842            100,
1843        );
1844        coalescer.set_biggest_coalesce_batch_size(Some(200));
1845
1846        let small_batch = create_test_batch(50);
1847
1848        // Push small batch first to create buffered data
1849        coalescer.push_batch(small_batch).unwrap();
1850        assert_eq!(coalescer.get_buffered_rows(), 50);
1851        assert!(!coalescer.has_completed_batch());
1852
1853        // Push first large batch - should go through normal coalescing due to buffered data
1854        let large_batch1 = create_test_batch(250);
1855        coalescer.push_batch(large_batch1).unwrap();
1856
1857        // 50 + 250 = 300 -> 3 complete batches of 100, 0 rows buffered
1858        let mut completed_batches = vec![];
1859        while let Some(batch) = coalescer.next_completed_batch() {
1860            completed_batches.push(batch);
1861        }
1862        assert_eq!(completed_batches.len(), 3);
1863        assert_eq!(coalescer.get_buffered_rows(), 0);
1864
1865        // Now push consecutive large batches - they should bypass
1866        let large_batch2 = create_test_batch(300);
1867        let large_batch3 = create_test_batch(400);
1868
1869        // Push second large batch - should bypass since it's consecutive and buffer is empty
1870        coalescer.push_batch(large_batch2).unwrap();
1871        assert!(coalescer.has_completed_batch());
1872        let output = coalescer.next_completed_batch().unwrap();
1873        assert_eq!(output.num_rows(), 300); // bypassed with original size
1874        assert_eq!(coalescer.get_buffered_rows(), 0);
1875
1876        // Push third large batch - should also bypass
1877        coalescer.push_batch(large_batch3).unwrap();
1878        assert!(coalescer.has_completed_batch());
1879        let output = coalescer.next_completed_batch().unwrap();
1880        assert_eq!(output.num_rows(), 400); // bypassed with original size
1881        assert_eq!(coalescer.get_buffered_rows(), 0);
1882    }
1883
1884    #[test]
1885    fn test_biggest_coalesce_batch_size_empty_batch() {
1886        // Test that empty batches don't trigger the bypass logic
1887        let mut coalescer = BatchCoalescer::new(
1888            Arc::new(Schema::new(vec![Field::new("c0", DataType::Int32, false)])),
1889            100,
1890        );
1891        coalescer.set_biggest_coalesce_batch_size(Some(50));
1892
1893        let empty_batch = create_test_batch(0);
1894        coalescer.push_batch(empty_batch).unwrap();
1895
1896        // Empty batch should be handled normally (no effect)
1897        assert!(!coalescer.has_completed_batch());
1898        assert_eq!(coalescer.get_buffered_rows(), 0);
1899    }
1900
1901    #[test]
1902    fn test_biggest_coalesce_batch_size_with_buffered_data_no_bypass() {
1903        // Test that when there is buffered data, large batches do NOT bypass (unless consecutive)
1904        let mut coalescer = BatchCoalescer::new(
1905            Arc::new(Schema::new(vec![Field::new("c0", DataType::Int32, false)])),
1906            100,
1907        );
1908        coalescer.set_biggest_coalesce_batch_size(Some(200));
1909
1910        // Add some buffered data first
1911        let small_batch = create_test_batch(30);
1912        coalescer.push_batch(small_batch.clone()).unwrap();
1913        coalescer.push_batch(small_batch).unwrap();
1914        assert_eq!(coalescer.get_buffered_rows(), 60);
1915
1916        // Push large batch that would normally bypass, but shouldn't because buffered_rows > 0
1917        let large_batch = create_test_batch(250);
1918        coalescer.push_batch(large_batch).unwrap();
1919
1920        // The large batch should be processed through normal coalescing logic
1921        // Total: 60 (buffered) + 250 (new) = 310 rows
1922        // Output: 3 complete batches of 100 rows each, 10 rows remain buffered
1923
1924        let mut completed_batches = vec![];
1925        while let Some(batch) = coalescer.next_completed_batch() {
1926            completed_batches.push(batch);
1927        }
1928
1929        assert_eq!(completed_batches.len(), 3);
1930        for batch in &completed_batches {
1931            assert_eq!(batch.num_rows(), 100);
1932        }
1933        assert_eq!(coalescer.get_buffered_rows(), 10);
1934    }
1935
1936    #[test]
1937    fn test_biggest_coalesce_batch_size_zero_limit() {
1938        // Test edge case where limit is 0 (all batches bypass when no buffered data)
1939        let mut coalescer = BatchCoalescer::new(
1940            Arc::new(Schema::new(vec![Field::new("c0", DataType::Int32, false)])),
1941            100,
1942        );
1943        coalescer.set_biggest_coalesce_batch_size(Some(0));
1944
1945        // Even a 1-row batch should bypass when there's no buffered data
1946        let tiny_batch = create_test_batch(1);
1947        coalescer.push_batch(tiny_batch).unwrap();
1948
1949        assert!(coalescer.has_completed_batch());
1950        let output = coalescer.next_completed_batch().unwrap();
1951        assert_eq!(output.num_rows(), 1);
1952    }
1953
1954    #[test]
1955    fn test_biggest_coalesce_batch_size_bypass_only_when_no_buffer() {
1956        // Test that bypass only occurs when buffered_rows == 0
1957        let mut coalescer = BatchCoalescer::new(
1958            Arc::new(Schema::new(vec![Field::new("c0", DataType::Int32, false)])),
1959            100,
1960        );
1961        coalescer.set_biggest_coalesce_batch_size(Some(200));
1962
1963        // First, push a large batch with no buffered data - should bypass
1964        let large_batch = create_test_batch(300);
1965        coalescer.push_batch(large_batch.clone()).unwrap();
1966
1967        assert!(coalescer.has_completed_batch());
1968        let output = coalescer.next_completed_batch().unwrap();
1969        assert_eq!(output.num_rows(), 300); // bypassed
1970        assert_eq!(coalescer.get_buffered_rows(), 0);
1971
1972        // Now add some buffered data
1973        let small_batch = create_test_batch(50);
1974        coalescer.push_batch(small_batch).unwrap();
1975        assert_eq!(coalescer.get_buffered_rows(), 50);
1976
1977        // Push the same large batch again - should NOT bypass this time (not consecutive)
1978        coalescer.push_batch(large_batch).unwrap();
1979
1980        // Should process through normal coalescing: 50 + 300 = 350 rows
1981        // Output: 3 complete batches of 100 rows, 50 rows buffered
1982        let mut completed_batches = vec![];
1983        while let Some(batch) = coalescer.next_completed_batch() {
1984            completed_batches.push(batch);
1985        }
1986
1987        assert_eq!(completed_batches.len(), 3);
1988        for batch in &completed_batches {
1989            assert_eq!(batch.num_rows(), 100);
1990        }
1991        assert_eq!(coalescer.get_buffered_rows(), 50);
1992    }
1993
1994    #[test]
1995    fn test_biggest_coalesce_batch_size_consecutive_large_batches_scenario() {
1996        // Test an exact scenario of batch sizes: 20, 20, 30, 700, 600, 700, 900, 700, 600
1997        let mut coalescer = BatchCoalescer::new(
1998            Arc::new(Schema::new(vec![Field::new("c0", DataType::Int32, false)])),
1999            1000,
2000        );
2001        coalescer.set_biggest_coalesce_batch_size(Some(500));
2002
2003        // Push small batches first
2004        coalescer.push_batch(create_test_batch(20)).unwrap();
2005        coalescer.push_batch(create_test_batch(20)).unwrap();
2006        coalescer.push_batch(create_test_batch(30)).unwrap();
2007
2008        assert_eq!(coalescer.get_buffered_rows(), 70);
2009        assert!(!coalescer.has_completed_batch());
2010
2011        // Push first large batch (700) - should coalesce due to buffered data
2012        coalescer.push_batch(create_test_batch(700)).unwrap();
2013
2014        // 70 + 700 = 770 rows, not enough for 1000, so all stay buffered
2015        assert_eq!(coalescer.get_buffered_rows(), 770);
2016        assert!(!coalescer.has_completed_batch());
2017
2018        // Push second large batch (600) - should bypass since previous was large
2019        coalescer.push_batch(create_test_batch(600)).unwrap();
2020
2021        // Should flush buffer (770 rows) and bypass the 600
2022        let mut outputs = vec![];
2023        while let Some(batch) = coalescer.next_completed_batch() {
2024            outputs.push(batch);
2025        }
2026        assert_eq!(outputs.len(), 2); // one flushed buffer batch (770) + one bypassed (600)
2027        assert_eq!(outputs[0].num_rows(), 770);
2028        assert_eq!(outputs[1].num_rows(), 600);
2029        assert_eq!(coalescer.get_buffered_rows(), 0);
2030
2031        // Push remaining large batches - should all bypass
2032        let remaining_batches = [700, 900, 700, 600];
2033        for &size in &remaining_batches {
2034            coalescer.push_batch(create_test_batch(size)).unwrap();
2035
2036            assert!(coalescer.has_completed_batch());
2037            let output = coalescer.next_completed_batch().unwrap();
2038            assert_eq!(output.num_rows(), size);
2039            assert_eq!(coalescer.get_buffered_rows(), 0);
2040        }
2041    }
2042
2043    #[test]
2044    fn test_biggest_coalesce_batch_size_truly_consecutive_large_bypass() {
2045        // Test truly consecutive large batches that should all bypass
2046        // This test ensures buffer is completely empty between large batches
2047        let mut coalescer = BatchCoalescer::new(
2048            Arc::new(Schema::new(vec![Field::new("c0", DataType::Int32, false)])),
2049            100,
2050        );
2051        coalescer.set_biggest_coalesce_batch_size(Some(200));
2052
2053        // Push consecutive large batches with no prior buffered data
2054        let large_batches = vec![
2055            create_test_batch(300),
2056            create_test_batch(400),
2057            create_test_batch(350),
2058            create_test_batch(500),
2059        ];
2060
2061        let mut all_outputs = vec![];
2062
2063        for (i, large_batch) in large_batches.into_iter().enumerate() {
2064            let expected_size = large_batch.num_rows();
2065
2066            // Buffer should be empty before each large batch
2067            assert_eq!(
2068                coalescer.get_buffered_rows(),
2069                0,
2070                "Buffer should be empty before batch {i}"
2072            );
2073
2074            coalescer.push_batch(large_batch).unwrap();
2075
2076            // Each large batch should bypass and produce exactly one output batch
2077            assert!(
2078                coalescer.has_completed_batch(),
2079                "Should have completed batch after pushing batch {i}"
2081            );
2082
2083            let output = coalescer.next_completed_batch().unwrap();
2084            assert_eq!(
2085                output.num_rows(),
2086                expected_size,
2087                "Batch {i} should have bypassed with original size"
2089            );
2090
2091            // Should be no more batches and buffer should be empty
2092            assert!(
2093                !coalescer.has_completed_batch(),
2094                "Should have no more completed batches after batch {i}"
2096            );
2097            assert_eq!(
2098                coalescer.get_buffered_rows(),
2099                0,
2100                "Buffer should be empty after batch {i}"
2102            );
2103
2104            all_outputs.push(output);
2105        }
2106
2107        // Verify we got exactly 4 output batches with original sizes
2108        assert_eq!(all_outputs.len(), 4);
2109        assert_eq!(all_outputs[0].num_rows(), 300);
2110        assert_eq!(all_outputs[1].num_rows(), 400);
2111        assert_eq!(all_outputs[2].num_rows(), 350);
2112        assert_eq!(all_outputs[3].num_rows(), 500);
2113    }
2114
2115    #[test]
2116    fn test_biggest_coalesce_batch_size_reset_consecutive_on_small_batch() {
2117        // Test that small batches reset the consecutive large batch tracking
2118        let mut coalescer = BatchCoalescer::new(
2119            Arc::new(Schema::new(vec![Field::new("c0", DataType::Int32, false)])),
2120            100,
2121        );
2122        coalescer.set_biggest_coalesce_batch_size(Some(200));
2123
2124        // Push first large batch - should bypass (no buffered data)
2125        coalescer.push_batch(create_test_batch(300)).unwrap();
2126        let output = coalescer.next_completed_batch().unwrap();
2127        assert_eq!(output.num_rows(), 300);
2128
2129        // Push second large batch - should bypass (consecutive)
2130        coalescer.push_batch(create_test_batch(400)).unwrap();
2131        let output = coalescer.next_completed_batch().unwrap();
2132        assert_eq!(output.num_rows(), 400);
2133
2134        // Push small batch - resets consecutive tracking
2135        coalescer.push_batch(create_test_batch(50)).unwrap();
2136        assert_eq!(coalescer.get_buffered_rows(), 50);
2137
2138        // Push large batch again - should NOT bypass due to buffered data
2139        coalescer.push_batch(create_test_batch(350)).unwrap();
2140
2141        // Should coalesce: 50 + 350 = 400 -> 4 complete batches of 100
2142        let mut outputs = vec![];
2143        while let Some(batch) = coalescer.next_completed_batch() {
2144            outputs.push(batch);
2145        }
2146        assert_eq!(outputs.len(), 4);
2147        for batch in outputs {
2148            assert_eq!(batch.num_rows(), 100);
2149        }
2150        assert_eq!(coalescer.get_buffered_rows(), 0);
2151    }
2152
2153    #[test]
2154    fn test_coalesce_push_batch_with_indices() {
2155        const MID_POINT: u32 = 2333;
2156        const TOTAL_ROWS: u32 = 23333;
2157        let batch1 = uint32_batch_non_null(0..MID_POINT);
2158        let batch2 = uint32_batch_non_null((MID_POINT..TOTAL_ROWS).rev());
2159
2160        let mut coalescer = BatchCoalescer::new(
2161            Arc::new(Schema::new(vec![Field::new("c0", DataType::UInt32, false)])),
2162            TOTAL_ROWS as usize,
2163        );
2164        coalescer.push_batch(batch1).unwrap();
2165
2166        let rev_indices = (0..((TOTAL_ROWS - MID_POINT) as u64)).rev();
2167        let reversed_indices_batch = uint64_batch_non_null(rev_indices);
2168
2169        let reverse_indices = UInt64Array::from(reversed_indices_batch.column(0).to_data());
2170        coalescer
2171            .push_batch_with_indices(batch2, &reverse_indices)
2172            .unwrap();
2173
2174        coalescer.finish_buffered_batch().unwrap();
2175        let actual = coalescer.next_completed_batch().unwrap();
2176
2177        let expected = uint32_batch_non_null(0..TOTAL_ROWS);
2178
2179        assert_eq!(expected, actual);
2180    }
2181}