Skip to main content

arrow_select/
coalesce.rs

1// Licensed to the Apache Software Foundation (ASF) under one
2// or more contributor license agreements.  See the NOTICE file
3// distributed with this work for additional information
4// regarding copyright ownership.  The ASF licenses this file
5// to you under the Apache License, Version 2.0 (the
6// "License"); you may not use this file except in compliance
7// with the License.  You may obtain a copy of the License at
8//
9//   http://www.apache.org/licenses/LICENSE-2.0
10//
11// Unless required by applicable law or agreed to in writing,
12// software distributed under the License is distributed on an
13// "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
14// KIND, either express or implied.  See the License for the
15// specific language governing permissions and limitations
16// under the License.
17
18//! [`BatchCoalescer`]  concatenates multiple [`RecordBatch`]es after
19//! operations such as [`filter`] and [`take`].
20//!
21//! [`filter`]: crate::filter::filter
22//! [`take`]: crate::take::take
23use crate::filter::{FilterBuilder, FilterPredicate, FilterSelection};
24use crate::take::take_record_batch;
25use arrow_array::types::{BinaryViewType, StringViewType};
26use arrow_array::{Array, ArrayRef, BooleanArray, RecordBatch, downcast_primitive};
27use arrow_schema::{ArrowError, DataType, SchemaRef};
28use std::collections::VecDeque;
29use std::sync::Arc;
30// Originally From DataFusion's coalesce module:
31// https://github.com/apache/datafusion/blob/9d2f04996604e709ee440b65f41e7b882f50b788/datafusion/physical-plan/src/coalesce/mod.rs#L26-L25
32
33mod byte_view;
34mod generic;
35mod primitive;
36
37use byte_view::InProgressByteViewArray;
38use generic::GenericInProgressArray;
39use primitive::InProgressPrimitiveArray;
40
41fn has_sparse_filter_copy(data_type: &DataType) -> bool {
42    data_type.is_primitive() || matches!(data_type, DataType::Utf8View | DataType::BinaryView)
43}
44
/// Denominator of the maximum selected-row fraction for which the fused
/// sparse-filter copy path is used.
///
/// Benchmarks indicate the sparse path pays off for low-selectivity filters but
/// can regress as the filter gets denser. The check is kept as a cheap integer
/// comparison on the hot path: `selected_count <= filter_len / 16`.
const SPARSE_FILTER_COPY_MAX_SELECTIVITY_DENOMINATOR: usize = 16;

/// Returns true when `selected_count` out of `filter_len` rows is sparse enough
/// to prefer the fused sparse-filter copy over materializing the filter.
fn should_use_sparse_filter_copy(filter_len: usize, selected_count: usize) -> bool {
    let max_selected = filter_len / SPARSE_FILTER_COPY_MAX_SELECTIVITY_DENOMINATOR;
    selected_count <= max_selected
}
55
56/// Concatenate multiple [`RecordBatch`]es
57///
58/// Implements the common pattern of incrementally creating output
59/// [`RecordBatch`]es of a specific size from an input stream of
60/// [`RecordBatch`]es.
61///
62/// This is useful after operations such as [`filter`] and [`take`] that produce
63/// smaller batches, and we want to coalesce them into larger batches for
64/// further processing.
65///
66/// # Motivation
67///
68/// If we use [`concat_batches`] to implement the same functionality, there are 2 potential issues:
69/// 1. At least 2x peak memory (holding the input and output of concat)
70/// 2. 2 copies of the data (to create the output of filter and then create the output of concat)
71///
72/// See: <https://github.com/apache/arrow-rs/issues/6692> for more discussions
73/// about the motivation.
74///
75/// [`filter`]: crate::filter::filter
76/// [`take`]: crate::take::take
77/// [`concat_batches`]: crate::concat::concat_batches
78///
79/// # Example
80/// ```
81/// use arrow_array::record_batch;
82/// use arrow_select::coalesce::{BatchCoalescer};
83/// let batch1 = record_batch!(("a", Int32, [1, 2, 3])).unwrap();
84/// let batch2 = record_batch!(("a", Int32, [4, 5])).unwrap();
85///
86/// // Create a `BatchCoalescer` that will produce batches with at least 4 rows
87/// let target_batch_size = 4;
88/// let mut coalescer = BatchCoalescer::new(batch1.schema(), 4);
89///
90/// // push the batches
91/// coalescer.push_batch(batch1).unwrap();
92/// // only pushed 3 rows (not yet 4, enough to produce a batch)
93/// assert!(coalescer.next_completed_batch().is_none());
94/// coalescer.push_batch(batch2).unwrap();
95/// // now we have 5 rows, so we can produce a batch
96/// let finished = coalescer.next_completed_batch().unwrap();
97/// // 4 rows came out (target batch size is 4)
98/// let expected = record_batch!(("a", Int32, [1, 2, 3, 4])).unwrap();
99/// assert_eq!(finished, expected);
100///
101/// // Have no more input, but still have an in-progress batch
102/// assert!(coalescer.next_completed_batch().is_none());
103/// // We can finish the batch, which will produce the remaining rows
104/// coalescer.finish_buffered_batch().unwrap();
105/// let expected = record_batch!(("a", Int32, [5])).unwrap();
106/// assert_eq!(coalescer.next_completed_batch().unwrap(), expected);
107///
108/// // The coalescer is now empty
109/// assert!(coalescer.next_completed_batch().is_none());
110/// ```
111///
112/// # Background
113///
114/// Generally speaking, larger [`RecordBatch`]es are more efficient to process
115/// than smaller [`RecordBatch`]es (until the CPU cache is exceeded) because
116/// there is fixed processing overhead per batch. This coalescer builds up these
117/// larger batches incrementally.
118///
119/// ```text
120/// ┌────────────────────┐
121/// │    RecordBatch     │
122/// │   num_rows = 100   │
123/// └────────────────────┘                 ┌────────────────────┐
124///                                        │                    │
125/// ┌────────────────────┐     Coalesce    │                    │
126/// │                    │      Batches    │                    │
127/// │    RecordBatch     │                 │                    │
128/// │   num_rows = 200   │  ─ ─ ─ ─ ─ ─ ▶  │                    │
129/// │                    │                 │    RecordBatch     │
130/// │                    │                 │   num_rows = 400   │
131/// └────────────────────┘                 │                    │
132///                                        │                    │
133/// ┌────────────────────┐                 │                    │
134/// │                    │                 │                    │
135/// │    RecordBatch     │                 │                    │
136/// │   num_rows = 100   │                 └────────────────────┘
137/// │                    │
138/// └────────────────────┘
139/// ```
140///
141/// # Notes:
142///
143/// 1. Output rows are produced in the same order as the input rows
144///
145/// 2. The output is a sequence of batches, with all but the last being at exactly
146///    `target_batch_size` rows.
147#[derive(Debug)]
148pub struct BatchCoalescer {
149    /// The input schema
150    schema: SchemaRef,
151    /// The target batch size (and thus size for views allocation). This is a
152    /// hard limit: the output batch will be exactly `target_batch_size`,
153    /// rather than possibly being slightly above.
154    target_batch_size: usize,
155    /// In-progress arrays
156    in_progress_arrays: Vec<Box<dyn InProgressArray>>,
157    /// True if some column still needs the materialized filter path.
158    has_non_specialized_filter_columns: bool,
159    /// Buffered row count. Always less than `batch_size`
160    buffered_rows: usize,
161    /// Completed batches
162    completed: VecDeque<RecordBatch>,
163    /// Biggest coalesce batch size. See [`Self::with_biggest_coalesce_batch_size`]
164    biggest_coalesce_batch_size: Option<usize>,
165}
166
167impl BatchCoalescer {
168    /// Create a new `BatchCoalescer`
169    ///
170    /// # Arguments
171    /// - `schema` - the schema of the output batches
172    /// - `target_batch_size` - the number of rows in each output batch.
173    ///   Typical values are `4096` or `8192` rows.
174    ///
175    pub fn new(schema: SchemaRef, target_batch_size: usize) -> Self {
176        let has_non_specialized_filter_columns = schema
177            .fields()
178            .iter()
179            .any(|field| !has_sparse_filter_copy(field.data_type()));
180        let in_progress_arrays = schema
181            .fields()
182            .iter()
183            .map(|field| create_in_progress_array(field.data_type(), target_batch_size))
184            .collect::<Vec<_>>();
185
186        Self {
187            schema,
188            target_batch_size,
189            in_progress_arrays,
190            has_non_specialized_filter_columns,
191            // We will for sure store at least one completed batch
192            completed: VecDeque::with_capacity(1),
193            buffered_rows: 0,
194            biggest_coalesce_batch_size: None,
195        }
196    }
197
198    /// Set the coalesce batch size limit (default `None`)
199    ///
200    /// This limit determine when batches should bypass coalescing. Intuitively,
201    /// batches that are already large are costly to coalesce and are efficient
202    /// enough to process directly without coalescing.
203    ///
204    /// If `Some(limit)`, batches larger than this limit will bypass coalescing
205    /// when there is no buffered data, or when the previously buffered data
206    /// already exceeds this limit.
207    ///
208    /// If `None`, all batches will be coalesced according to the
209    /// target_batch_size.
210    pub fn with_biggest_coalesce_batch_size(mut self, limit: Option<usize>) -> Self {
211        self.biggest_coalesce_batch_size = limit;
212        self
213    }
214
215    /// Get the current biggest coalesce batch size limit
216    ///
217    /// See [`Self::with_biggest_coalesce_batch_size`] for details
218    pub fn biggest_coalesce_batch_size(&self) -> Option<usize> {
219        self.biggest_coalesce_batch_size
220    }
221
222    /// Set the biggest coalesce batch size limit
223    ///
224    /// See [`Self::with_biggest_coalesce_batch_size`] for details
225    pub fn set_biggest_coalesce_batch_size(&mut self, limit: Option<usize>) {
226        self.biggest_coalesce_batch_size = limit;
227    }
228
229    /// Return the schema of the output batches
230    pub fn schema(&self) -> SchemaRef {
231        Arc::clone(&self.schema)
232    }
233
234    /// Push a batch into the Coalescer after applying a filter
235    ///
236    /// This is semantically equivalent of calling [`Self::push_batch`]
237    /// with the results from [`crate::filter::filter_record_batch`]
238    ///
239    /// # Example
240    /// ```
241    /// # use arrow_array::{record_batch, BooleanArray};
242    /// # use arrow_select::coalesce::BatchCoalescer;
243    /// let batch1 = record_batch!(("a", Int32, [1, 2, 3])).unwrap();
244    /// let batch2 = record_batch!(("a", Int32, [4, 5, 6])).unwrap();
245    /// // Apply a filter to each batch to pick the first and last row
246    /// let filter = BooleanArray::from(vec![true, false, true]);
247    /// // create a new Coalescer that targets creating 1000 row batches
248    /// let mut coalescer = BatchCoalescer::new(batch1.schema(), 1000);
249    /// coalescer.push_batch_with_filter(batch1, &filter);
250    /// coalescer.push_batch_with_filter(batch2, &filter);
251    /// // finsh and retrieve the created batch
252    /// coalescer.finish_buffered_batch().unwrap();
253    /// let completed_batch = coalescer.next_completed_batch().unwrap();
254    /// // filtered out 2 and 5:
255    /// let expected_batch = record_batch!(("a", Int32, [1, 3, 4, 6])).unwrap();
256    /// assert_eq!(completed_batch, expected_batch);
257    /// ```
258    pub fn push_batch_with_filter(
259        &mut self,
260        batch: RecordBatch,
261        filter: &BooleanArray,
262    ) -> Result<(), ArrowError> {
263        self.push_batch_with_filtered_columns(batch, filter)
264    }
265
266    /// Push a batch into the Coalescer after applying a set of indices
267    /// This is semantically equivalent of calling [`Self::push_batch`]
268    /// with the results from  [`take_record_batch`]
269    ///
270    /// # Example
271    /// ```
272    /// # use arrow_array::{record_batch, UInt64Array};
273    /// # use arrow_select::coalesce::BatchCoalescer;
274    /// let batch1 = record_batch!(("a", Int32, [0, 0, 0])).unwrap();
275    /// let batch2 = record_batch!(("a", Int32, [1, 1, 4, 5, 1, 4])).unwrap();
276    /// // Sorted indices to create a sorted output, this can be obtained with
277    /// // `arrow-ord`'s sort_to_indices operation
278    /// let indices = UInt64Array::from(vec![0, 1, 4, 2, 5, 3]);
279    /// // create a new Coalescer that targets creating 1000 row batches
280    /// let mut coalescer = BatchCoalescer::new(batch1.schema(), 1000);
281    /// coalescer.push_batch(batch1);
282    /// coalescer.push_batch_with_indices(batch2, &indices);
283    /// // finsh and retrieve the created batch
284    /// coalescer.finish_buffered_batch().unwrap();
285    /// let completed_batch = coalescer.next_completed_batch().unwrap();
286    /// let expected_batch = record_batch!(("a", Int32, [0, 0, 0, 1, 1, 1, 4, 4, 5])).unwrap();
287    /// assert_eq!(completed_batch, expected_batch);
288    /// ```
289    pub fn push_batch_with_indices(
290        &mut self,
291        batch: RecordBatch,
292        indices: &dyn Array,
293    ) -> Result<(), ArrowError> {
294        // todo: optimize this to avoid materializing (copying the results of take indices to a new batch)
295        let taken_batch = take_record_batch(&batch, indices)?;
296        self.push_batch(taken_batch)
297    }
298
299    /// Push all the rows from `batch` into the Coalescer
300    ///
301    /// When buffered data plus incoming rows reach `target_batch_size` ,
302    /// completed batches are generated eagerly and can be retrieved via
303    /// [`Self::next_completed_batch()`].
304    /// Output batches contain exactly `target_batch_size` rows, so the tail of
305    /// the input batch may remain buffered.
306    /// Remaining partial data either waits for future input batches or can be
307    /// materialized immediately by calling [`Self::finish_buffered_batch()`].
308    ///
309    /// # Example
310    /// ```
311    /// # use arrow_array::record_batch;
312    /// # use arrow_select::coalesce::BatchCoalescer;
313    /// let batch1 = record_batch!(("a", Int32, [1, 2, 3])).unwrap();
314    /// let batch2 = record_batch!(("a", Int32, [4, 5, 6])).unwrap();
315    /// // create a new Coalescer that targets creating 1000 row batches
316    /// let mut coalescer = BatchCoalescer::new(batch1.schema(), 1000);
317    /// coalescer.push_batch(batch1);
318    /// coalescer.push_batch(batch2);
319    /// // finsh and retrieve the created batch
320    /// coalescer.finish_buffered_batch().unwrap();
321    /// let completed_batch = coalescer.next_completed_batch().unwrap();
322    /// let expected_batch = record_batch!(("a", Int32, [1, 2, 3, 4, 5, 6])).unwrap();
323    /// assert_eq!(completed_batch, expected_batch);
324    /// ```
325    pub fn push_batch(&mut self, batch: RecordBatch) -> Result<(), ArrowError> {
326        // Large batch bypass optimization:
327        // When biggest_coalesce_batch_size is configured and a batch exceeds this limit,
328        // we can avoid expensive split-and-merge operations by passing it through directly.
329        //
330        // IMPORTANT: This optimization is OPTIONAL and only active when biggest_coalesce_batch_size
331        // is explicitly set via with_biggest_coalesce_batch_size(Some(limit)).
332        // If not set (None), ALL batches follow normal coalescing behavior regardless of size.
333
334        // =============================================================================
335        // CASE 1: No buffer + large batch → Direct bypass
336        // =============================================================================
337        // Example scenario (target_batch_size=1000, biggest_coalesce_batch_size=Some(500)):
338        // Input sequence: [600, 1200, 300]
339        //
340        // With biggest_coalesce_batch_size=Some(500) (optimization enabled):
341        //   600 → large batch detected! buffered_rows=0 → Case 1: direct bypass
342        //        → output: [600] (bypass, preserves large batch)
343        //   1200 → large batch detected! buffered_rows=0 → Case 1: direct bypass
344        //         → output: [1200] (bypass, preserves large batch)
345        //   300 → normal batch, buffer: [300]
346        //   Result: [600], [1200], [300] - large batches preserved, mixed sizes
347
348        // =============================================================================
349        // CASE 2: Buffer too large + large batch → Flush first, then bypass
350        // =============================================================================
351        // This case prevents creating extremely large merged batches that would
352        // significantly exceed both target_batch_size and biggest_coalesce_batch_size.
353        //
354        // Example 1: Buffer exceeds limit before large batch arrives
355        // target_batch_size=1000, biggest_coalesce_batch_size=Some(400)
356        // Input: [350, 200, 800]
357        //
358        // Step 1: push_batch([350])
359        //   → batch_size=350 <= 400, normal path
360        //   → buffer: [350], buffered_rows=350
361        //
362        // Step 2: push_batch([200])
363        //   → batch_size=200 <= 400, normal path
364        //   → buffer: [350, 200], buffered_rows=550
365        //
366        // Step 3: push_batch([800])
367        //   → batch_size=800 > 400, large batch path
368        //   → buffered_rows=550 > 400 → Case 2: flush first
369        //   → flush: output [550] (combined [350, 200])
370        //   → then bypass: output [800]
371        //   Result: [550], [800] - buffer flushed to prevent oversized merge
372        //
373        // Example 2: Multiple small batches accumulate before large batch
374        // target_batch_size=1000, biggest_coalesce_batch_size=Some(300)
375        // Input: [150, 100, 80, 900]
376        //
377        // Step 1-3: Accumulate small batches
378        //   150 → buffer: [150], buffered_rows=150
379        //   100 → buffer: [150, 100], buffered_rows=250
380        //   80  → buffer: [150, 100, 80], buffered_rows=330
381        //
382        // Step 4: push_batch([900])
383        //   → batch_size=900 > 300, large batch path
384        //   → buffered_rows=330 > 300 → Case 2: flush first
385        //   → flush: output [330] (combined [150, 100, 80])
386        //   → then bypass: output [900]
387        //   Result: [330], [900] - prevents merge into [1230] which would be too large
388
389        // =============================================================================
390        // CASE 3: Small buffer + large batch → Normal coalescing (no bypass)
391        // =============================================================================
392        // When buffer is small enough, we still merge to maintain efficiency
393        // Example: target_batch_size=1000, biggest_coalesce_batch_size=Some(500)
394        // Input: [300, 1200]
395        //
396        // Step 1: push_batch([300])
397        //   → batch_size=300 <= 500, normal path
398        //   → buffer: [300], buffered_rows=300
399        //
400        // Step 2: push_batch([1200])
401        //   → batch_size=1200 > 500, large batch path
402        //   → buffered_rows=300 <= 500 → Case 3: normal merge
403        //   → buffer: [300, 1200] (1500 total)
404        //   → 1500 > target_batch_size → split: output [1000], buffer [500]
405        //   Result: [1000], [500] - normal split/merge behavior maintained
406
407        // =============================================================================
408        // Comparison: Default vs Optimized Behavior
409        // =============================================================================
410        // target_batch_size=1000, biggest_coalesce_batch_size=Some(500)
411        // Input: [600, 1200, 300]
412        //
413        // DEFAULT BEHAVIOR (biggest_coalesce_batch_size=None):
414        //   600 → buffer: [600]
415        //   1200 → buffer: [600, 1200] (1800 rows total)
416        //         → split: output [1000 rows], buffer [800 rows remaining]
417        //   300 → buffer: [800, 300] (1100 rows total)
418        //        → split: output [1000 rows], buffer [100 rows remaining]
419        //   Result: [1000], [1000], [100] - all outputs respect target_batch_size
420        //
421        // OPTIMIZED BEHAVIOR (biggest_coalesce_batch_size=Some(500)):
422        //   600 → Case 1: direct bypass → output: [600]
423        //   1200 → Case 1: direct bypass → output: [1200]
424        //   300 → normal path → buffer: [300]
425        //   Result: [600], [1200], [300] - large batches preserved
426
427        // =============================================================================
428        // Benefits and Trade-offs
429        // =============================================================================
430        // Benefits of the optimization:
431        // - Large batches stay intact (better for downstream vectorized processing)
432        // - Fewer split/merge operations (better CPU performance)
433        // - More predictable memory usage patterns
434        // - Maintains streaming efficiency while preserving batch boundaries
435        //
436        // Trade-offs:
437        // - Output batch sizes become variable (not always target_batch_size)
438        // - May produce smaller partial batches when flushing before large batches
439        // - Requires tuning biggest_coalesce_batch_size parameter for optimal performance
440
441        // TODO, for unsorted batches, we may can filter all large batches, and coalesce all
442        // small batches together?
443
444        let batch_size = batch.num_rows();
445
446        // Fast path: skip empty batches
447        if batch_size == 0 {
448            return Ok(());
449        }
450
451        // Large batch optimization: bypass coalescing for oversized batches
452        if let Some(limit) = self.biggest_coalesce_batch_size {
453            if batch_size > limit {
454                // Case 1: No buffered data - emit large batch directly
455                // Example: [] + [1200] → output [1200], buffer []
456                if self.buffered_rows == 0 {
457                    self.completed.push_back(batch);
458                    return Ok(());
459                }
460
461                // Case 2: Buffer too large - flush then emit to avoid oversized merge
462                // Example: [850] + [1200] → output [850], then output [1200]
463                // This prevents creating batches much larger than both target_batch_size
464                // and biggest_coalesce_batch_size, which could cause memory issues
465                if self.buffered_rows > limit {
466                    self.finish_buffered_batch()?;
467                    self.completed.push_back(batch);
468                    return Ok(());
469                }
470
471                // Case 3: Small buffer - proceed with normal coalescing
472                // Example: [300] + [1200] → split and merge normally
473                // This ensures small batches still get properly coalesced
474                // while allowing some controlled growth beyond the limit
475            }
476        }
477
478        let (_schema, arrays, mut num_rows) = batch.into_parts();
479
480        // Validate column count matches the expected schema
481        if arrays.len() != self.in_progress_arrays.len() {
482            return Err(ArrowError::InvalidArgumentError(format!(
483                "Batch has {} columns but BatchCoalescer expects {}",
484                arrays.len(),
485                self.in_progress_arrays.len()
486            )));
487        }
488        self.in_progress_arrays
489            .iter_mut()
490            .zip(arrays)
491            .for_each(|(in_progress, array)| {
492                in_progress.set_source(Some(array));
493            });
494
495        // If pushing this batch would exceed the target batch size,
496        // finish the current batch and start a new one
497        let mut offset = 0;
498        while num_rows > (self.target_batch_size - self.buffered_rows) {
499            let remaining_rows = self.target_batch_size - self.buffered_rows;
500            debug_assert!(remaining_rows > 0);
501
502            // Copy remaining_rows from each array
503            for in_progress in self.in_progress_arrays.iter_mut() {
504                in_progress.copy_rows(offset, remaining_rows)?;
505            }
506
507            self.buffered_rows += remaining_rows;
508            offset += remaining_rows;
509            num_rows -= remaining_rows;
510
511            self.finish_buffered_batch()?;
512        }
513
514        // Add any the remaining rows to the buffer
515        self.buffered_rows += num_rows;
516        if num_rows > 0 {
517            for in_progress in self.in_progress_arrays.iter_mut() {
518                in_progress.copy_rows(offset, num_rows)?;
519            }
520        }
521
522        // If we have reached the target batch size, finalize the buffered batch
523        if self.buffered_rows >= self.target_batch_size {
524            self.finish_buffered_batch()?;
525        }
526
527        // clear in progress sources (to allow the memory to be freed)
528        for in_progress in self.in_progress_arrays.iter_mut() {
529            in_progress.set_source(None);
530        }
531
532        Ok(())
533    }
534
535    /// Returns the number of buffered rows
536    pub fn get_buffered_rows(&self) -> usize {
537        self.buffered_rows
538    }
539
540    /// Concatenates any buffered batches into a single `RecordBatch` and
541    /// clears any output buffers
542    ///
543    /// Normally this is called when the input stream is exhausted, and
544    /// we want to finalize the last batch of rows.
545    ///
546    /// See [`Self::next_completed_batch()`] for the completed batches.
547    pub fn finish_buffered_batch(&mut self) -> Result<(), ArrowError> {
548        if self.buffered_rows == 0 {
549            return Ok(());
550        }
551        let new_arrays = self
552            .in_progress_arrays
553            .iter_mut()
554            .map(|array| array.finish())
555            .collect::<Result<Vec<_>, ArrowError>>()?;
556
557        for (array, field) in new_arrays.iter().zip(self.schema.fields().iter()) {
558            debug_assert_eq!(array.data_type(), field.data_type());
559            debug_assert_eq!(array.len(), self.buffered_rows);
560        }
561
562        // SAFETY: each array was created of the correct type and length.
563        let batch = unsafe {
564            RecordBatch::new_unchecked(Arc::clone(&self.schema), new_arrays, self.buffered_rows)
565        };
566
567        self.buffered_rows = 0;
568        self.completed.push_back(batch);
569        Ok(())
570    }
571
572    /// Returns true if there is any buffered data
573    pub fn is_empty(&self) -> bool {
574        self.buffered_rows == 0 && self.completed.is_empty()
575    }
576
577    /// Returns true if there are any completed batches
578    pub fn has_completed_batch(&self) -> bool {
579        !self.completed.is_empty()
580    }
581
582    /// Removes and returns the next completed batch, if any.
583    pub fn next_completed_batch(&mut self) -> Option<RecordBatch> {
584        self.completed.pop_front()
585    }
586}
587
588impl BatchCoalescer {
589    fn filter_predicate_for_batch(
590        batch: &RecordBatch,
591        filter: &BooleanArray,
592        selected_count: usize,
593    ) -> FilterPredicate {
594        let mut filter_builder = FilterBuilder::new_with_count(filter, selected_count);
595        if batch.num_columns() > 1
596            || (batch.num_columns() > 0
597                && FilterBuilder::is_optimize_beneficial(batch.schema_ref().field(0).data_type()))
598        {
599            filter_builder = filter_builder.optimize();
600        }
601        filter_builder.build()
602    }
603
604    fn push_batch_with_filtered_columns(
605        &mut self,
606        batch: RecordBatch,
607        filter: &BooleanArray,
608    ) -> Result<(), ArrowError> {
609        let filter_len = filter.len();
610        let batch_num_rows = batch.num_rows();
611        let batch_num_columns = batch.num_columns();
612
613        if filter_len > batch_num_rows {
614            return Err(ArrowError::InvalidArgumentError(format!(
615                "Filter predicate of length {} is larger than target array of length {}",
616                filter_len, batch_num_rows
617            )));
618        }
619
620        let selected_count = filter.true_count();
621        if selected_count == 0 {
622            return Ok(());
623        }
624
625        if selected_count == batch_num_rows && filter_len == batch_num_rows {
626            return self.push_batch(batch);
627        }
628
629        if batch_num_columns != self.in_progress_arrays.len() {
630            return Err(ArrowError::InvalidArgumentError(format!(
631                "Batch has {} columns but BatchCoalescer expects {}",
632                batch_num_columns,
633                self.in_progress_arrays.len()
634            )));
635        }
636
637        let exceeds_coalesce_limit = self
638            .biggest_coalesce_batch_size
639            .is_some_and(|limit| selected_count > limit);
640        let does_not_fit_buffer = selected_count > self.target_batch_size - self.buffered_rows;
641        let should_materialize_filter = exceeds_coalesce_limit
642            || self.has_non_specialized_filter_columns
643            || does_not_fit_buffer
644            || !should_use_sparse_filter_copy(filter_len, selected_count);
645
646        if should_materialize_filter {
647            // Use materialized filtering when sparse per-column copying is unavailable.
648            let predicate = Self::filter_predicate_for_batch(&batch, filter, selected_count);
649            let filtered_batch = predicate.filter_record_batch(&batch)?;
650            return self.push_batch(filtered_batch);
651        }
652
653        let predicate = Self::filter_predicate_for_batch(&batch, filter, selected_count);
654        let (_schema, arrays, _num_rows) = batch.into_parts();
655
656        for (in_progress, array) in self.in_progress_arrays.iter_mut().zip(arrays) {
657            in_progress.copy_rows_by_filter_from(array, &predicate)?;
658        }
659
660        self.buffered_rows += selected_count;
661        if self.buffered_rows >= self.target_batch_size {
662            self.finish_buffered_batch()?;
663        }
664
665        Ok(())
666    }
667}
668
669/// Return a new `InProgressArray` for the given data type
670fn create_in_progress_array(data_type: &DataType, batch_size: usize) -> Box<dyn InProgressArray> {
671    macro_rules! instantiate_primitive {
672        ($t:ty) => {
673            Box::new(InProgressPrimitiveArray::<$t>::new(
674                batch_size,
675                data_type.clone(),
676            ))
677        };
678    }
679
680    downcast_primitive! {
681        // Instantiate InProgressPrimitiveArray for each primitive type
682        data_type => (instantiate_primitive),
683        DataType::Utf8View => Box::new(InProgressByteViewArray::<StringViewType>::new(batch_size)),
684        DataType::BinaryView => {
685            Box::new(InProgressByteViewArray::<BinaryViewType>::new(batch_size))
686        }
687        _ => Box::new(GenericInProgressArray::new()),
688    }
689}
690
691/// Incrementally builds up arrays
692///
693/// [`GenericInProgressArray`] is the default implementation that buffers
694/// arrays and uses other kernels concatenates them when finished.
695///
696/// Some types have specialized implementations for this array types (e.g.,
697/// [`StringViewArray`], etc.).
698///
699/// [`StringViewArray`]: arrow_array::StringViewArray
700trait InProgressArray: std::fmt::Debug + Send + Sync {
701    /// Set the source array.
702    ///
703    /// Calls to [`Self::copy_rows`] will copy rows from this array into the
704    /// current in-progress array
705    fn set_source(&mut self, source: Option<ArrayRef>);
706
707    /// Copy rows from the current source array into the in-progress array
708    ///
709    /// The source array is set by [`Self::set_source`].
710    ///
711    /// Return an error if the source array is not set
712    fn copy_rows(&mut self, offset: usize, len: usize) -> Result<(), ArrowError>;
713
714    /// Copy rows selected by `filter` from the current source array.
715    fn copy_rows_by_filter(&mut self, filter: &FilterPredicate) -> Result<(), ArrowError> {
716        self.copy_rows_by_selection(filter.selection())
717    }
718
719    /// Copy rows selected by `filter` from `source`.
720    fn copy_rows_by_filter_from(
721        &mut self,
722        source: ArrayRef,
723        filter: &FilterPredicate,
724    ) -> Result<(), ArrowError> {
725        self.set_source(Some(source));
726        let result = self.copy_rows_by_filter(filter);
727        self.set_source(None);
728        result
729    }
730
731    /// Copy rows described by a [`FilterSelection`] from the current source array.
732    fn copy_rows_by_selection(&mut self, selection: FilterSelection<'_>) -> Result<(), ArrowError> {
733        match selection {
734            FilterSelection::None => Ok(()),
735            FilterSelection::All { len } => self.copy_rows(0, len),
736            FilterSelection::Slices(slices) => {
737                slices.try_for_each(|(start, end)| self.copy_rows(start, end - start))
738            }
739            FilterSelection::Indices(indices) => indices.try_for_each(|idx| self.copy_rows(idx, 1)),
740        }
741    }
742
743    /// Finish the currently in-progress array and return it as an `ArrayRef`
744    fn finish(&mut self) -> Result<ArrayRef, ArrowError>;
745}
746
747#[cfg(test)]
748mod tests {
749    use super::*;
750    use crate::concat::concat_batches;
751    use crate::filter::filter_record_batch;
752    use arrow_array::builder::StringViewBuilder;
753    use arrow_array::cast::AsArray;
754    use arrow_array::types::Int32Type;
755    use arrow_array::{
756        BinaryViewArray, Int32Array, Int64Array, RecordBatchOptions, StringArray, StringViewArray,
757        TimestampNanosecondArray, UInt32Array, UInt64Array, make_array,
758    };
759    use arrow_buffer::BooleanBufferBuilder;
760    use arrow_schema::{DataType, Field, Schema};
761    use rand::{Rng, SeedableRng};
762    use std::ops::Range;
763
764    #[test]
765    fn test_coalesce() {
766        let batch = uint32_batch(0..8);
767        Test::new("coalesce")
768            .with_batches(std::iter::repeat_n(batch, 10))
769            // expected output is exactly 21 rows (except for the final batch)
770            .with_batch_size(21)
771            .with_expected_output_sizes(vec![21, 21, 21, 17])
772            .run();
773    }
774
775    #[test]
776    fn test_coalesce_one_by_one() {
777        let batch = uint32_batch(0..1); // single row input
778        Test::new("coalesce_one_by_one")
779            .with_batches(std::iter::repeat_n(batch, 97))
780            // expected output is exactly 20 rows (except for the final batch)
781            .with_batch_size(20)
782            .with_expected_output_sizes(vec![20, 20, 20, 20, 17])
783            .run();
784    }
785
786    #[test]
787    fn test_coalesce_empty() {
788        let schema = Arc::new(Schema::new(vec![Field::new("c0", DataType::UInt32, false)]));
789
790        Test::new("coalesce_empty")
791            .with_batches(vec![])
792            .with_schema(schema)
793            .with_batch_size(21)
794            .with_expected_output_sizes(vec![])
795            .run();
796    }
797
798    #[test]
799    fn test_sparse_filter_copy_threshold() {
800        assert!(should_use_sparse_filter_copy(8192, 8));
801        assert!(should_use_sparse_filter_copy(8192, 81));
802        assert!(!should_use_sparse_filter_copy(8192, 819));
803        assert!(!should_use_sparse_filter_copy(8192, 6553));
804    }
805
806    #[test]
807    fn test_single_large_batch_greater_than_target() {
808        // test a single large batch
809        let batch = uint32_batch(0..4096);
810        Test::new("coalesce_single_large_batch_greater_than_target")
811            .with_batch(batch)
812            .with_batch_size(1000)
813            .with_expected_output_sizes(vec![1000, 1000, 1000, 1000, 96])
814            .run();
815    }
816
817    #[test]
818    fn test_single_large_batch_smaller_than_target() {
819        // test a single large batch
820        let batch = uint32_batch(0..4096);
821        Test::new("coalesce_single_large_batch_smaller_than_target")
822            .with_batch(batch)
823            .with_batch_size(8192)
824            .with_expected_output_sizes(vec![4096])
825            .run();
826    }
827
828    #[test]
829    fn test_single_large_batch_equal_to_target() {
830        // test a single large batch
831        let batch = uint32_batch(0..4096);
832        Test::new("coalesce_single_large_batch_equal_to_target")
833            .with_batch(batch)
834            .with_batch_size(4096)
835            .with_expected_output_sizes(vec![4096])
836            .run();
837    }
838
839    #[test]
840    fn test_single_large_batch_equally_divisible_in_target() {
841        // test a single large batch
842        let batch = uint32_batch(0..4096);
843        Test::new("coalesce_single_large_batch_equally_divisible_in_target")
844            .with_batch(batch)
845            .with_batch_size(1024)
846            .with_expected_output_sizes(vec![1024, 1024, 1024, 1024])
847            .run();
848    }
849
850    #[test]
851    fn test_empty_schema() {
852        let schema = Schema::empty();
853        let batch = RecordBatch::new_empty(schema.into());
854        Test::new("coalesce_empty_schema")
855            .with_batch(batch)
856            .with_expected_output_sizes(vec![])
857            .run();
858    }
859
860    /// Coalesce multiple batches, 80k rows, with a 0.1% selectivity filter
861    #[test]
862    fn test_coalesce_filtered_001() {
863        let mut filter_builder = RandomFilterBuilder {
864            num_rows: 8000,
865            selectivity: 0.001,
866            seed: 0,
867        };
868
869        // add 10 batches of 8000 rows each
870        // 80k rows, selecting 0.1% means 80 rows
871        // not exactly 80 as the rows are random;
872        let mut test = Test::new("coalesce_filtered_001");
873        for _ in 0..10 {
874            test = test
875                .with_batch(multi_column_batch(0..8000))
876                .with_filter(filter_builder.next_filter())
877        }
878        test.with_batch_size(15)
879            .with_expected_output_sizes(vec![15, 15, 15, 13])
880            .run();
881    }
882
883    /// Coalesce multiple batches, 80k rows, with a 1% selectivity filter
884    #[test]
885    fn test_coalesce_filtered_01() {
886        let mut filter_builder = RandomFilterBuilder {
887            num_rows: 8000,
888            selectivity: 0.01,
889            seed: 0,
890        };
891
892        // add 10 batches of 8000 rows each
893        // 80k rows, selecting 1% means 800 rows
894        // not exactly 800 as the rows are random;
895        let mut test = Test::new("coalesce_filtered_01");
896        for _ in 0..10 {
897            test = test
898                .with_batch(multi_column_batch(0..8000))
899                .with_filter(filter_builder.next_filter())
900        }
901        test.with_batch_size(128)
902            .with_expected_output_sizes(vec![128, 128, 128, 128, 128, 128, 15])
903            .run();
904    }
905
906    /// Coalesce multiple batches, 80k rows, with a 10% selectivity filter
907    #[test]
908    fn test_coalesce_filtered_10() {
909        let mut filter_builder = RandomFilterBuilder {
910            num_rows: 8000,
911            selectivity: 0.1,
912            seed: 0,
913        };
914
915        // add 10 batches of 8000 rows each
916        // 80k rows, selecting 10% means 8000 rows
917        // not exactly 800 as the rows are random;
918        let mut test = Test::new("coalesce_filtered_10");
919        for _ in 0..10 {
920            test = test
921                .with_batch(multi_column_batch(0..8000))
922                .with_filter(filter_builder.next_filter())
923        }
924        test.with_batch_size(1024)
925            .with_expected_output_sizes(vec![1024, 1024, 1024, 1024, 1024, 1024, 1024, 840])
926            .run();
927    }
928
929    /// Coalesce multiple batches, 8k rows, with a 90% selectivity filter
930    #[test]
931    fn test_coalesce_filtered_90() {
932        let mut filter_builder = RandomFilterBuilder {
933            num_rows: 800,
934            selectivity: 0.90,
935            seed: 0,
936        };
937
938        // add 10 batches of 800 rows each
939        // 8k rows, selecting 99% means 7200 rows
940        // not exactly 7200 as the rows are random;
941        let mut test = Test::new("coalesce_filtered_90");
942        for _ in 0..10 {
943            test = test
944                .with_batch(multi_column_batch(0..800))
945                .with_filter(filter_builder.next_filter())
946        }
947        test.with_batch_size(1024)
948            .with_expected_output_sizes(vec![1024, 1024, 1024, 1024, 1024, 1024, 1024, 13])
949            .run();
950    }
951
952    /// Coalesce multiple batches, 8k rows, with mixed filers, including 100%
953    #[test]
954    fn test_coalesce_filtered_mixed() {
955        let mut filter_builder = RandomFilterBuilder {
956            num_rows: 800,
957            selectivity: 0.90,
958            seed: 0,
959        };
960
961        let mut test = Test::new("coalesce_filtered_mixed");
962        for _ in 0..3 {
963            // also add in a batch that selects almost all rows and when
964            // sliced will have some batches that are entirely used
965            let mut all_filter_builder = BooleanBufferBuilder::new(1000);
966            all_filter_builder.append_n(500, true);
967            all_filter_builder.append_n(1, false);
968            all_filter_builder.append_n(499, false);
969            let all_filter = all_filter_builder.build();
970
971            test = test
972                .with_batch(multi_column_batch(0..1000))
973                .with_filter(BooleanArray::from(all_filter))
974                .with_batch(multi_column_batch(0..800))
975                .with_filter(filter_builder.next_filter());
976            // decrease selectivity
977            filter_builder.selectivity *= 0.6;
978        }
979
980        // use a small batch size to ensure the filter is appended in slices
981        // and some of those slides will select the entire thing.
982        test.with_batch_size(250)
983            .with_expected_output_sizes(vec![
984                250, 250, 250, 250, 250, 250, 250, 250, 250, 250, 250, 179,
985            ])
986            .run();
987    }
988
989    #[test]
990    fn test_coalesce_non_null() {
991        Test::new("coalesce_non_null")
992            // 4040 rows of unit32
993            .with_batch(uint32_batch_non_null(0..3000))
994            .with_batch(uint32_batch_non_null(0..1040))
995            .with_batch_size(1024)
996            .with_expected_output_sizes(vec![1024, 1024, 1024, 968])
997            .run();
998    }
999    #[test]
1000    fn test_utf8_split() {
1001        Test::new("coalesce_utf8")
1002            // 4040 rows of utf8 strings in total, split into batches of 1024
1003            .with_batch(utf8_batch(0..3000))
1004            .with_batch(utf8_batch(0..1040))
1005            .with_batch_size(1024)
1006            .with_expected_output_sizes(vec![1024, 1024, 1024, 968])
1007            .run();
1008    }
1009
1010    #[test]
1011    fn test_string_view_no_views() {
1012        let output_batches = Test::new("coalesce_string_view_no_views")
1013            // both input batches have no views, so no need to compact
1014            .with_batch(stringview_batch([Some("foo"), Some("bar")]))
1015            .with_batch(stringview_batch([Some("baz"), Some("qux")]))
1016            .with_expected_output_sizes(vec![4])
1017            .run();
1018
1019        expect_buffer_layout(
1020            col_as_string_view("c0", output_batches.first().unwrap()),
1021            vec![],
1022        );
1023    }
1024
1025    #[test]
1026    fn test_string_view_batch_small_no_compact() {
1027        // view with only short strings (no buffers) --> no need to compact
1028        let batch = stringview_batch_repeated(1000, [Some("a"), Some("b"), Some("c")]);
1029        let output_batches = Test::new("coalesce_string_view_batch_small_no_compact")
1030            .with_batch(batch.clone())
1031            .with_expected_output_sizes(vec![1000])
1032            .run();
1033
1034        let array = col_as_string_view("c0", &batch);
1035        let gc_array = col_as_string_view("c0", output_batches.first().unwrap());
1036        assert_eq!(array.data_buffers().len(), 0);
1037        assert_eq!(array.data_buffers().len(), gc_array.data_buffers().len()); // no compaction
1038
1039        expect_buffer_layout(gc_array, vec![]);
1040    }
1041
1042    #[test]
1043    fn test_string_view_batch_large_no_compact() {
1044        // view with large strings (has buffers) but full --> no need to compact
1045        let batch = stringview_batch_repeated(1000, [Some("This string is longer than 12 bytes")]);
1046        let output_batches = Test::new("coalesce_string_view_batch_large_no_compact")
1047            .with_batch(batch.clone())
1048            .with_batch_size(1000)
1049            .with_expected_output_sizes(vec![1000])
1050            .run();
1051
1052        let array = col_as_string_view("c0", &batch);
1053        let gc_array = col_as_string_view("c0", output_batches.first().unwrap());
1054        assert_eq!(array.data_buffers().len(), 5);
1055        assert_eq!(array.data_buffers().len(), gc_array.data_buffers().len()); // no compaction
1056
1057        expect_buffer_layout(
1058            gc_array,
1059            vec![
1060                ExpectedLayout {
1061                    len: 8190,
1062                    capacity: 8192,
1063                },
1064                ExpectedLayout {
1065                    len: 8190,
1066                    capacity: 8192,
1067                },
1068                ExpectedLayout {
1069                    len: 8190,
1070                    capacity: 8192,
1071                },
1072                ExpectedLayout {
1073                    len: 8190,
1074                    capacity: 8192,
1075                },
1076                ExpectedLayout {
1077                    len: 2240,
1078                    capacity: 8192,
1079                },
1080            ],
1081        );
1082    }
1083
1084    #[test]
1085    fn test_string_view_batch_small_with_buffers_no_compact() {
1086        // view with buffers but only short views
1087        let short_strings = std::iter::repeat(Some("SmallString"));
1088        let long_strings = std::iter::once(Some("This string is longer than 12 bytes"));
1089        // 20 short strings, then a long ones
1090        let values = short_strings.take(20).chain(long_strings);
1091        let batch = stringview_batch_repeated(1000, values)
1092            // take only 10 short strings (no long ones)
1093            .slice(5, 10);
1094        let output_batches = Test::new("coalesce_string_view_batch_small_with_buffers_no_compact")
1095            .with_batch(batch.clone())
1096            .with_batch_size(1000)
1097            .with_expected_output_sizes(vec![10])
1098            .run();
1099
1100        let array = col_as_string_view("c0", &batch);
1101        let gc_array = col_as_string_view("c0", output_batches.first().unwrap());
1102        assert_eq!(array.data_buffers().len(), 1); // input has one buffer
1103        assert_eq!(gc_array.data_buffers().len(), 0); // output has no buffers as only short strings
1104    }
1105
1106    #[test]
1107    fn test_string_view_batch_large_slice_compact() {
1108        // view with large strings (has buffers) and only partially used  --> no need to compact
1109        let batch = stringview_batch_repeated(1000, [Some("This string is longer than 12 bytes")])
1110            // slice only 22 rows, so most of the buffer is not used
1111            .slice(11, 22);
1112
1113        let output_batches = Test::new("coalesce_string_view_batch_large_slice_compact")
1114            .with_batch(batch.clone())
1115            .with_batch_size(1000)
1116            .with_expected_output_sizes(vec![22])
1117            .run();
1118
1119        let array = col_as_string_view("c0", &batch);
1120        let gc_array = col_as_string_view("c0", output_batches.first().unwrap());
1121        assert_eq!(array.data_buffers().len(), 5);
1122
1123        expect_buffer_layout(
1124            gc_array,
1125            vec![ExpectedLayout {
1126                len: 770,
1127                capacity: 8192,
1128            }],
1129        );
1130    }
1131
1132    #[test]
1133    fn test_string_view_mixed() {
1134        let large_view_batch =
1135            stringview_batch_repeated(1000, [Some("This string is longer than 12 bytes")]);
1136        let small_view_batch = stringview_batch_repeated(1000, [Some("SmallString")]);
1137        let mixed_batch = stringview_batch_repeated(
1138            1000,
1139            [Some("This string is longer than 12 bytes"), Some("Small")],
1140        );
1141        let mixed_batch_nulls = stringview_batch_repeated(
1142            1000,
1143            [
1144                Some("This string is longer than 12 bytes"),
1145                Some("Small"),
1146                None,
1147            ],
1148        );
1149
1150        // Several batches with mixed inline / non inline
1151        // 4k rows in
1152        let output_batches = Test::new("coalesce_string_view_mixed")
1153            .with_batch(large_view_batch.clone())
1154            .with_batch(small_view_batch)
1155            // this batch needs to be compacted (less than 1/2 full)
1156            .with_batch(large_view_batch.slice(10, 20))
1157            .with_batch(mixed_batch_nulls)
1158            // this batch needs to be compacted (less than 1/2 full)
1159            .with_batch(large_view_batch.slice(10, 20))
1160            .with_batch(mixed_batch)
1161            .with_expected_output_sizes(vec![1024, 1024, 1024, 968])
1162            .run();
1163
1164        expect_buffer_layout(
1165            col_as_string_view("c0", output_batches.first().unwrap()),
1166            vec![
1167                ExpectedLayout {
1168                    len: 8190,
1169                    capacity: 8192,
1170                },
1171                ExpectedLayout {
1172                    len: 8190,
1173                    capacity: 8192,
1174                },
1175                ExpectedLayout {
1176                    len: 8190,
1177                    capacity: 8192,
1178                },
1179                ExpectedLayout {
1180                    len: 8190,
1181                    capacity: 8192,
1182                },
1183                ExpectedLayout {
1184                    len: 2240,
1185                    capacity: 8192,
1186                },
1187            ],
1188        );
1189    }
1190
1191    #[test]
1192    fn test_string_view_many_small_compact() {
1193        // 200 rows alternating long (28) and short (≤12) strings.
1194        // Only the 100 long strings go into data buffers: 100 × 28 = 2800.
1195        let batch = stringview_batch_repeated(
1196            200,
1197            [Some("This string is 28 bytes long"), Some("small string")],
1198        );
1199        let output_batches = Test::new("coalesce_string_view_many_small_compact")
1200            // First allocated buffer is 8kb.
1201            // Appending 10 batches of 2800 bytes will use 2800 * 10 = 14kb (8kb, an 16kb and 32kbkb)
1202            .with_batch(batch.clone())
1203            .with_batch(batch.clone())
1204            .with_batch(batch.clone())
1205            .with_batch(batch.clone())
1206            .with_batch(batch.clone())
1207            .with_batch(batch.clone())
1208            .with_batch(batch.clone())
1209            .with_batch(batch.clone())
1210            .with_batch(batch.clone())
1211            .with_batch(batch.clone())
1212            .with_batch_size(8000)
1213            .with_expected_output_sizes(vec![2000]) // only 1000 rows total
1214            .run();
1215
1216        // expect a nice even distribution of buffers
1217        expect_buffer_layout(
1218            col_as_string_view("c0", output_batches.first().unwrap()),
1219            vec![
1220                ExpectedLayout {
1221                    len: 8176,
1222                    capacity: 8192,
1223                },
1224                ExpectedLayout {
1225                    len: 16380,
1226                    capacity: 16384,
1227                },
1228                ExpectedLayout {
1229                    len: 3444,
1230                    capacity: 32768,
1231                },
1232            ],
1233        );
1234    }
1235
1236    #[test]
1237    fn test_string_view_many_small_boundary() {
1238        // The strings are designed to exactly fit into buffers that are powers of 2 long
1239        let batch = stringview_batch_repeated(100, [Some("This string is a power of two=32")]);
1240        let output_batches = Test::new("coalesce_string_view_many_small_boundary")
1241            .with_batches(std::iter::repeat_n(batch, 20))
1242            .with_batch_size(900)
1243            .with_expected_output_sizes(vec![900, 900, 200])
1244            .run();
1245
1246        // expect each buffer to be entirely full except the last one
1247        expect_buffer_layout(
1248            col_as_string_view("c0", output_batches.first().unwrap()),
1249            vec![
1250                ExpectedLayout {
1251                    len: 8192,
1252                    capacity: 8192,
1253                },
1254                ExpectedLayout {
1255                    len: 16384,
1256                    capacity: 16384,
1257                },
1258                ExpectedLayout {
1259                    len: 4224,
1260                    capacity: 32768,
1261                },
1262            ],
1263        );
1264    }
1265
1266    #[test]
1267    fn test_string_view_large_small() {
1268        // The strings are 37 bytes long, so each batch has 100 * 28 = 2800 bytes
1269        let mixed_batch = stringview_batch_repeated(
1270            200,
1271            [Some("This string is 28 bytes long"), Some("small string")],
1272        );
1273        // These strings aren't copied, this array has an 8k buffer
1274        let all_large = stringview_batch_repeated(
1275            50,
1276            [Some(
1277                "This buffer has only large strings in it so there are no buffer copies",
1278            )],
1279        );
1280
1281        let output_batches = Test::new("coalesce_string_view_large_small")
1282            // First allocated buffer is 8kb.
1283            // Appending five batches of 2800 bytes will use 2800 * 10 = 28kb (8kb, an 16kb and 32kbkb)
1284            .with_batch(mixed_batch.clone())
1285            .with_batch(mixed_batch.clone())
1286            .with_batch(all_large.clone())
1287            .with_batch(mixed_batch.clone())
1288            .with_batch(all_large.clone())
1289            .with_batch(mixed_batch.clone())
1290            .with_batch(mixed_batch.clone())
1291            .with_batch(all_large.clone())
1292            .with_batch(mixed_batch.clone())
1293            .with_batch(all_large.clone())
1294            .with_batch_size(8000)
1295            .with_expected_output_sizes(vec![1400])
1296            .run();
1297
1298        expect_buffer_layout(
1299            col_as_string_view("c0", output_batches.first().unwrap()),
1300            vec![
1301                ExpectedLayout {
1302                    len: 8190,
1303                    capacity: 8192,
1304                },
1305                ExpectedLayout {
1306                    len: 16366,
1307                    capacity: 16384,
1308                },
1309                ExpectedLayout {
1310                    len: 6244,
1311                    capacity: 32768,
1312                },
1313            ],
1314        );
1315    }
1316
1317    #[test]
1318    fn test_binary_view() {
1319        let values: Vec<Option<&[u8]>> = vec![
1320            Some(b"foo"),
1321            None,
1322            Some(b"A longer string that is more than 12 bytes"),
1323        ];
1324
1325        let binary_view =
1326            BinaryViewArray::from_iter(std::iter::repeat(values.iter()).flatten().take(1000));
1327        let batch =
1328            RecordBatch::try_from_iter(vec![("c0", Arc::new(binary_view) as ArrayRef)]).unwrap();
1329
1330        Test::new("coalesce_binary_view")
1331            .with_batch(batch.clone())
1332            .with_batch(batch.clone())
1333            .with_batch_size(512)
1334            .with_expected_output_sizes(vec![512, 512, 512, 464])
1335            .run();
1336    }
1337
1338    #[test]
1339    fn test_binary_view_filtered() {
1340        let values: Vec<Option<&[u8]>> = vec![
1341            Some(b"foo"),
1342            None,
1343            Some(b"A longer string that is more than 12 bytes"),
1344        ];
1345
1346        let binary_view =
1347            BinaryViewArray::from_iter(std::iter::repeat(values.iter()).flatten().take(1000));
1348        let batch =
1349            RecordBatch::try_from_iter(vec![("c0", Arc::new(binary_view) as ArrayRef)]).unwrap();
1350        let filter = sparse_filter(1000);
1351
1352        Test::new("coalesce_binary_view_filtered")
1353            .with_batch(batch.clone())
1354            .with_filter(filter.clone())
1355            .with_batch(batch)
1356            .with_filter(filter)
1357            .with_batch_size(256)
1358            .with_expected_output_sizes(vec![250])
1359            .run();
1360    }
1361
1362    #[test]
1363    fn test_binary_view_filtered_inline() {
1364        let values: Vec<Option<&[u8]>> = vec![Some(b"foo"), None, Some(b"barbaz")];
1365
1366        let binary_view =
1367            BinaryViewArray::from_iter(std::iter::repeat(values.iter()).flatten().take(1000));
1368        let batch =
1369            RecordBatch::try_from_iter(vec![("c0", Arc::new(binary_view) as ArrayRef)]).unwrap();
1370        let filter = sparse_filter(1000);
1371
1372        Test::new("coalesce_binary_view_filtered_inline")
1373            .with_batch(batch.clone())
1374            .with_filter(filter.clone())
1375            .with_batch(batch)
1376            .with_filter(filter)
1377            .with_batch_size(300)
1378            .with_expected_output_sizes(vec![250])
1379            .run();
1380    }
1381
1382    #[test]
1383    fn test_string_view_filtered_inline() {
1384        let values: Vec<Option<&str>> = vec![Some("foo"), None, Some("barbaz")];
1385
1386        let string_view =
1387            StringViewArray::from_iter(std::iter::repeat(values.iter()).flatten().take(1000));
1388        let batch =
1389            RecordBatch::try_from_iter(vec![("c0", Arc::new(string_view) as ArrayRef)]).unwrap();
1390        let filter = sparse_filter(1000);
1391
1392        Test::new("coalesce_string_view_filtered_inline")
1393            .with_batch(batch.clone())
1394            .with_filter(filter.clone())
1395            .with_batch(batch)
1396            .with_filter(filter)
1397            .with_batch_size(300)
1398            .with_expected_output_sizes(vec![250])
1399            .run();
1400    }
1401
1402    #[test]
1403    fn test_mixed_inline_binary_view_filtered() {
1404        let int_values =
1405            Int32Array::from_iter((0..1000).map(|v| if v % 5 == 0 { None } else { Some(v) }));
1406        let float_values = arrow_array::Float64Array::from_iter((0..1000).map(|v| Some(v as f64)));
1407        let binary_values: Vec<Option<&[u8]>> = vec![Some(b"foo"), None, Some(b"barbaz")];
1408        let binary_view = BinaryViewArray::from_iter(
1409            std::iter::repeat(binary_values.iter()).flatten().take(1000),
1410        );
1411
1412        let batch = RecordBatch::try_from_iter(vec![
1413            ("i", Arc::new(int_values) as ArrayRef),
1414            ("f", Arc::new(float_values) as ArrayRef),
1415            ("b", Arc::new(binary_view) as ArrayRef),
1416        ])
1417        .unwrap();
1418
1419        let filter = sparse_filter(1000);
1420
1421        Test::new("coalesce_mixed_inline_binary_view_filtered")
1422            .with_batch(batch.clone())
1423            .with_filter(filter.clone())
1424            .with_batch(batch)
1425            .with_filter(filter)
1426            .with_batch_size(300)
1427            .with_expected_output_sizes(vec![250])
1428            .run();
1429    }
1430
1431    #[test]
1432    fn test_mixed_inline_string_view_filtered() {
1433        let int_values =
1434            Int32Array::from_iter((0..1000).map(|v| if v % 5 == 0 { None } else { Some(v) }));
1435        let float_values = arrow_array::Float64Array::from_iter((0..1000).map(|v| Some(v as f64)));
1436        let string_values: Vec<Option<&str>> = vec![Some("foo"), None, Some("barbaz")];
1437        let string_view = StringViewArray::from_iter(
1438            std::iter::repeat(string_values.iter()).flatten().take(1000),
1439        );
1440
1441        let batch = RecordBatch::try_from_iter(vec![
1442            ("i", Arc::new(int_values) as ArrayRef),
1443            ("f", Arc::new(float_values) as ArrayRef),
1444            ("s", Arc::new(string_view) as ArrayRef),
1445        ])
1446        .unwrap();
1447
1448        let filter = sparse_filter(1000);
1449
1450        Test::new("coalesce_mixed_inline_string_view_filtered")
1451            .with_batch(batch.clone())
1452            .with_filter(filter.clone())
1453            .with_batch(batch)
1454            .with_filter(filter)
1455            .with_batch_size(300)
1456            .with_expected_output_sizes(vec![250])
1457            .run();
1458    }
1459
1460    #[test]
1461    fn test_inline_binary_view_sparse() {
1462        // All-inline binary views (<=12 bytes) + 5% selectivity
1463        let values: Vec<Option<&[u8]>> = vec![Some(b"foo"), None, Some(b"barbaz")];
1464        let binary_view =
1465            BinaryViewArray::from_iter(std::iter::repeat(values.iter()).flatten().take(1000));
1466        let batch =
1467            RecordBatch::try_from_iter(vec![("c0", Arc::new(binary_view) as ArrayRef)]).unwrap();
1468        let filter = very_sparse_filter(1000);
1469
1470        Test::new("inline_binary_view_sparse")
1471            .with_batch(batch.clone())
1472            .with_filter(filter.clone())
1473            .with_batch(batch)
1474            .with_filter(filter)
1475            .with_batch_size(1024)
1476            .with_expected_output_sizes(vec![100])
1477            .run();
1478    }
1479
1480    #[test]
1481    fn test_inline_string_view_sparse() {
1482        let values: Vec<Option<&str>> = vec![Some("foo"), None, Some("barbaz")];
1483        let string_view =
1484            StringViewArray::from_iter(std::iter::repeat(values.iter()).flatten().take(1000));
1485        let batch =
1486            RecordBatch::try_from_iter(vec![("c0", Arc::new(string_view) as ArrayRef)]).unwrap();
1487        let filter = very_sparse_filter(1000);
1488
1489        Test::new("inline_string_view_sparse")
1490            .with_batch(batch.clone())
1491            .with_filter(filter.clone())
1492            .with_batch(batch)
1493            .with_filter(filter)
1494            .with_batch_size(1024)
1495            .with_expected_output_sizes(vec![100])
1496            .run();
1497    }
1498
1499    #[test]
1500    fn test_inline_mixed_sparse() {
1501        // Mixing primitives with inline views exercises materialized `Indices`
1502        // selection for both the primitive and the byte-view.
1503        let int_values =
1504            Int32Array::from_iter((0..1000).map(|v| if v % 5 == 0 { None } else { Some(v) }));
1505        let float_values = arrow_array::Float64Array::from_iter((0..1000).map(|v| Some(v as f64)));
1506        let string_values: Vec<Option<&str>> = vec![Some("foo"), None, Some("barbaz")];
1507        let string_view = StringViewArray::from_iter(
1508            std::iter::repeat(string_values.iter()).flatten().take(1000),
1509        );
1510        let binary_values: Vec<Option<&[u8]>> = vec![Some(b"x"), None, Some(b"abcdef")];
1511        let binary_view = BinaryViewArray::from_iter(
1512            std::iter::repeat(binary_values.iter()).flatten().take(1000),
1513        );
1514
1515        let batch = RecordBatch::try_from_iter(vec![
1516            ("i", Arc::new(int_values) as ArrayRef),
1517            ("f", Arc::new(float_values) as ArrayRef),
1518            ("s", Arc::new(string_view) as ArrayRef),
1519            ("b", Arc::new(binary_view) as ArrayRef),
1520        ])
1521        .unwrap();
1522        let filter = very_sparse_filter(1000);
1523
1524        Test::new("inline_mixed_sparse")
1525            .with_batch(batch.clone())
1526            .with_filter(filter.clone())
1527            .with_batch(batch)
1528            .with_filter(filter)
1529            .with_batch_size(1024)
1530            .with_expected_output_sizes(vec![100])
1531            .run();
1532    }
1533
1534    #[test]
1535    fn test_inline_crosses_target_batch_size() {
1536        // Each 1000-row batch selects 50 rows; target_batch_size 100
1537        // exercises intermediate batch flushing batch (not just the final
1538        // flush).
1539        let values: Vec<Option<&[u8]>> = vec![Some(b"foo"), None, Some(b"barbaz")];
1540        let make_batch = || {
1541            let binary_view =
1542                BinaryViewArray::from_iter(std::iter::repeat(values.iter()).flatten().take(1000));
1543            RecordBatch::try_from_iter(vec![("c0", Arc::new(binary_view) as ArrayRef)]).unwrap()
1544        };
1545        let filter = very_sparse_filter(1000);
1546
1547        Test::new("inline_crosses_target_batch_size")
1548            .with_batch(make_batch())
1549            .with_filter(filter.clone())
1550            .with_batch(make_batch())
1551            .with_filter(filter.clone())
1552            .with_batch(make_batch())
1553            .with_filter(filter)
1554            .with_batch_size(100)
1555            .with_expected_output_sizes(vec![100, 50])
1556            .run();
1557    }
1558
1559    #[test]
1560    fn test_inline_filter_rejects_filter_longer_than_batch() {
1561        let values: Vec<Option<&[u8]>> = vec![Some(b"foo"), Some(b"bar")];
1562        let binary_view = BinaryViewArray::from_iter(values);
1563        let batch =
1564            RecordBatch::try_from_iter(vec![("c0", Arc::new(binary_view) as ArrayRef)]).unwrap();
1565        let filter = BooleanArray::from(vec![true, false, true]);
1566
1567        let mut coalescer = BatchCoalescer::new(batch.schema(), 100);
1568        let result = coalescer.push_batch_with_filter(batch, &filter);
1569        assert!(result.is_err());
1570        let err = result.unwrap_err().to_string();
1571        assert!(
1572            err.contains("Filter predicate of length 3 is larger than target array of length 2"),
1573            "unexpected error: {err}"
1574        );
1575    }
1576
1577    #[test]
1578    fn test_mixed_boolean_inline_string_view_filtered() {
1579        let bool_values = BooleanArray::from_iter((0..1000).map(|v| Some(v % 3 == 0)));
1580        let string_values: Vec<Option<&str>> = vec![Some("foo"), None, Some("barbaz")];
1581        let string_view = StringViewArray::from_iter(
1582            std::iter::repeat(string_values.iter()).flatten().take(1000),
1583        );
1584
1585        let batch = RecordBatch::try_from_iter(vec![
1586            ("b", Arc::new(bool_values) as ArrayRef),
1587            ("s", Arc::new(string_view) as ArrayRef),
1588        ])
1589        .unwrap();
1590
1591        let filter = sparse_filter(1000);
1592
1593        Test::new("coalesce_mixed_boolean_inline_string_view_filtered")
1594            .with_batch(batch.clone())
1595            .with_filter(filter.clone())
1596            .with_batch(batch)
1597            .with_filter(filter)
1598            .with_batch_size(300)
1599            .with_expected_output_sizes(vec![250])
1600            .run();
1601    }
1602
1603    #[test]
1604    fn test_filter_fast_path_schema_capability() {
1605        let supported = Arc::new(Schema::new(vec![
1606            Field::new("primitive", DataType::UInt32, false),
1607            Field::new("utf8_view", DataType::Utf8View, true),
1608            Field::new("binary_view", DataType::BinaryView, true),
1609        ]));
1610        let coalescer = BatchCoalescer::new(supported, 100);
1611        assert!(!coalescer.has_non_specialized_filter_columns);
1612
1613        let utf8 = Arc::new(Schema::new(vec![Field::new("utf8", DataType::Utf8, true)]));
1614        let coalescer = BatchCoalescer::new(utf8, 100);
1615        assert!(coalescer.has_non_specialized_filter_columns);
1616
1617        let boolean = Arc::new(Schema::new(vec![Field::new(
1618            "boolean",
1619            DataType::Boolean,
1620            true,
1621        )]));
1622        let coalescer = BatchCoalescer::new(boolean, 100);
1623        assert!(coalescer.has_non_specialized_filter_columns);
1624    }
1625
1626    #[derive(Debug, Clone, PartialEq)]
1627    struct ExpectedLayout {
1628        len: usize,
1629        capacity: usize,
1630    }
1631
1632    /// Asserts that the buffer layout of the specified StringViewArray matches the expected layout
1633    fn expect_buffer_layout(array: &StringViewArray, expected: Vec<ExpectedLayout>) {
1634        let actual = array
1635            .data_buffers()
1636            .iter()
1637            .map(|b| ExpectedLayout {
1638                len: b.len(),
1639                capacity: b.capacity(),
1640            })
1641            .collect::<Vec<_>>();
1642
1643        assert_eq!(
1644            actual, expected,
1645            "Expected buffer layout {expected:#?} but got {actual:#?}"
1646        );
1647    }
1648
1649    /// Test for [`BatchCoalescer`]
1650    ///
1651    /// Pushes the input batches to the coalescer and verifies that the resulting
1652    /// batches have the
1653    /// 1. expected number of rows
1654    /// 2. The same results when the batches are filtered using the filter kernel
1655    #[derive(Debug, Clone)]
1656    struct Test {
1657        /// A human readable name to assist in debugging
1658        name: String,
1659        /// Batches to feed to the coalescer.
1660        input_batches: Vec<RecordBatch>,
1661        /// Filters to apply to the corresponding input batches.
1662        ///
1663        /// If there are no filters for the input batches, the batch will be
1664        /// pushed as is.
1665        filters: Vec<BooleanArray>,
1666        /// The schema. If not provided, the first batch's schema is used.
1667        schema: Option<SchemaRef>,
1668        /// Expected output sizes of the resulting batches
1669        expected_output_sizes: Vec<usize>,
1670        /// target batch size (default to 1024)
1671        target_batch_size: usize,
1672    }
1673
1674    impl Default for Test {
1675        fn default() -> Self {
1676            Self {
1677                name: "".to_string(),
1678                input_batches: vec![],
1679                filters: vec![],
1680                schema: None,
1681                expected_output_sizes: vec![],
1682                target_batch_size: 1024,
1683            }
1684        }
1685    }
1686
1687    impl Test {
1688        fn new(name: impl Into<String>) -> Self {
1689            Self {
1690                name: name.into(),
1691                ..Self::default()
1692            }
1693        }
1694
1695        /// Append the description to the test name
1696        fn with_description(mut self, description: &str) -> Self {
1697            self.name.push_str(": ");
1698            self.name.push_str(description);
1699            self
1700        }
1701
1702        /// Set the target batch size
1703        fn with_batch_size(mut self, target_batch_size: usize) -> Self {
1704            self.target_batch_size = target_batch_size;
1705            self
1706        }
1707
1708        /// Extend the input batches with `batch`
1709        fn with_batch(mut self, batch: RecordBatch) -> Self {
1710            self.input_batches.push(batch);
1711            self
1712        }
1713
1714        /// Extend the filters with `filter`
1715        fn with_filter(mut self, filter: BooleanArray) -> Self {
1716            self.filters.push(filter);
1717            self
1718        }
1719
1720        /// Replaces the input batches with `batches`
1721        fn with_batches(mut self, batches: impl IntoIterator<Item = RecordBatch>) -> Self {
1722            self.input_batches = batches.into_iter().collect();
1723            self
1724        }
1725
1726        /// Specifies the schema for the test
1727        fn with_schema(mut self, schema: SchemaRef) -> Self {
1728            self.schema = Some(schema);
1729            self
1730        }
1731
1732        /// Extends `sizes` to expected output sizes
1733        fn with_expected_output_sizes(mut self, sizes: impl IntoIterator<Item = usize>) -> Self {
1734            self.expected_output_sizes.extend(sizes);
1735            self
1736        }
1737
1738        /// Runs the test -- see documentation on [`Test`] for details
1739        ///
1740        /// Returns the resulting output batches
1741        fn run(self) -> Vec<RecordBatch> {
1742            // Test several permutations of input batches:
1743            // 1. Removing nulls from some batches (test non-null fast paths)
1744            // 2. Empty batches
1745            // 3. One column (from the batch)
1746            let mut extra_tests = vec![];
1747            extra_tests.push(self.clone().make_half_non_nullable());
1748            extra_tests.push(self.clone().insert_empty_batches());
1749            let single_column_tests = self.make_single_column_tests();
1750            for test in single_column_tests {
1751                extra_tests.push(test.clone().make_half_non_nullable());
1752                extra_tests.push(test);
1753            }
1754
1755            // Run original test case first, so any obvious errors are caught
1756            // by an easier to understand test case
1757            let results = self.run_inner();
1758            // Run the extra cases to expand coverage
1759            for extra in extra_tests {
1760                extra.run_inner();
1761            }
1762
1763            results
1764        }
1765
1766        /// Runs the current test instance
1767        fn run_inner(self) -> Vec<RecordBatch> {
1768            let expected_output = self.expected_output();
1769            let schema = self.schema();
1770
1771            let Self {
1772                name,
1773                input_batches,
1774                filters,
1775                schema: _,
1776                target_batch_size,
1777                expected_output_sizes,
1778            } = self;
1779
1780            println!("Running test '{name}'");
1781
1782            let had_input = input_batches.iter().any(|b| b.num_rows() > 0);
1783
1784            let mut coalescer = BatchCoalescer::new(Arc::clone(&schema), target_batch_size);
1785
1786            // feed input batches and filters to the coalescer
1787            let mut filters = filters.into_iter();
1788            for batch in input_batches {
1789                if let Some(filter) = filters.next() {
1790                    coalescer.push_batch_with_filter(batch, &filter).unwrap();
1791                } else {
1792                    coalescer.push_batch(batch).unwrap();
1793                }
1794            }
1795            assert_eq!(schema, coalescer.schema());
1796
1797            if had_input {
1798                assert!(!coalescer.is_empty(), "Coalescer should not be empty");
1799            } else {
1800                assert!(coalescer.is_empty(), "Coalescer should be empty");
1801            }
1802
1803            coalescer.finish_buffered_batch().unwrap();
1804            if had_input {
1805                assert!(
1806                    coalescer.has_completed_batch(),
1807                    "Coalescer should have completed batches"
1808                );
1809            }
1810
1811            let mut output_batches = vec![];
1812            while let Some(batch) = coalescer.next_completed_batch() {
1813                output_batches.push(batch);
1814            }
1815
1816            // make sure we got the expected number of output batches and content
1817            let mut starting_idx = 0;
1818            let actual_output_sizes: Vec<usize> =
1819                output_batches.iter().map(|b| b.num_rows()).collect();
1820            assert_eq!(
1821                expected_output_sizes, actual_output_sizes,
1822                "Unexpected number of rows in output batches\n\
1823                Expected\n{expected_output_sizes:#?}\nActual:{actual_output_sizes:#?}"
1824            );
1825            let iter = expected_output_sizes
1826                .iter()
1827                .zip(output_batches.iter())
1828                .enumerate();
1829
1830            // Verify that the actual contents of each output batch matches the expected output
1831            for (i, (expected_size, batch)) in iter {
1832                // compare the contents of the batch after normalization (using
1833                // `==` compares the underlying memory layout too)
1834                let expected_batch = expected_output.slice(starting_idx, *expected_size);
1835                let expected_batch = normalize_batch(expected_batch);
1836                let batch = normalize_batch(batch.clone());
1837                assert_eq!(
1838                    expected_batch, batch,
1839                    "Unexpected content in batch {i}:\
1840                    \n\nExpected:\n{expected_batch:#?}\n\nActual:\n{batch:#?}"
1841                );
1842                starting_idx += *expected_size;
1843            }
1844            output_batches
1845        }
1846
1847        /// Return the expected output schema. If not overridden by `with_schema`, it
1848        /// returns the schema of the first input batch.
1849        fn schema(&self) -> SchemaRef {
1850            self.schema
1851                .clone()
1852                .unwrap_or_else(|| Arc::clone(&self.input_batches[0].schema()))
1853        }
1854
1855        /// Returns the expected output as a single `RecordBatch`
1856        fn expected_output(&self) -> RecordBatch {
1857            let schema = self.schema();
1858            if self.filters.is_empty() {
1859                return concat_batches(&schema, &self.input_batches).unwrap();
1860            }
1861
1862            let mut filters = self.filters.iter();
1863            let filtered_batches = self
1864                .input_batches
1865                .iter()
1866                .map(|batch| {
1867                    if let Some(filter) = filters.next() {
1868                        filter_record_batch(batch, filter).unwrap()
1869                    } else {
1870                        batch.clone()
1871                    }
1872                })
1873                .collect::<Vec<_>>();
1874            concat_batches(&schema, &filtered_batches).unwrap()
1875        }
1876
1877        /// Return a copy of self where every other batch has had its nulls removed
1878        /// (there are often fast paths that are used when there are no nulls)
1879        fn make_half_non_nullable(mut self) -> Self {
1880            // remove the nulls from every other batch
1881            self.input_batches = self
1882                .input_batches
1883                .iter()
1884                .enumerate()
1885                .map(|(i, batch)| {
1886                    if i % 2 == 1 {
1887                        batch.clone()
1888                    } else {
1889                        Self::remove_nulls_from_batch(batch)
1890                    }
1891                })
1892                .collect();
1893            self.with_description("non-nullable")
1894        }
1895
1896        /// Insert several empty batches into the input before each existing input
1897        fn insert_empty_batches(mut self) -> Self {
1898            let empty_batch = RecordBatch::new_empty(self.schema());
1899            self.input_batches = self
1900                .input_batches
1901                .into_iter()
1902                .flat_map(|batch| [empty_batch.clone(), batch])
1903                .collect();
1904            let empty_filters = BooleanArray::builder(0).finish();
1905            self.filters = self
1906                .filters
1907                .into_iter()
1908                .flat_map(|filter| [empty_filters.clone(), filter])
1909                .collect();
1910            self.with_description("empty batches inserted")
1911        }
1912
1913        /// Sets one batch to be non-nullable by removing nulls from all columns
1914        fn remove_nulls_from_batch(batch: &RecordBatch) -> RecordBatch {
1915            let new_columns = batch
1916                .columns()
1917                .iter()
1918                .map(Self::remove_nulls_from_array)
1919                .collect::<Vec<_>>();
1920            let options = RecordBatchOptions::new().with_row_count(Some(batch.num_rows()));
1921            RecordBatch::try_new_with_options(batch.schema(), new_columns, &options).unwrap()
1922        }
1923
1924        fn remove_nulls_from_array(array: &ArrayRef) -> ArrayRef {
1925            make_array(array.to_data().into_builder().nulls(None).build().unwrap())
1926        }
1927
1928        /// Returns a set of tests where each test that is the sae as self, but
1929        /// has a single column from the original input batch
1930        ///
1931        /// This can be useful to single column optimizations, specifically
1932        /// filter optimization.
1933        fn make_single_column_tests(&self) -> Vec<Self> {
1934            let original_schema = self.schema();
1935            let mut new_tests = vec![];
1936            for column in original_schema.fields() {
1937                let single_column_schema = Arc::new(Schema::new(vec![column.clone()]));
1938
1939                let single_column_batches = self.input_batches.iter().map(|batch| {
1940                    let single_column = batch.column_by_name(column.name()).unwrap();
1941                    RecordBatch::try_new(
1942                        Arc::clone(&single_column_schema),
1943                        vec![single_column.clone()],
1944                    )
1945                    .unwrap()
1946                });
1947
1948                let single_column_test = self
1949                    .clone()
1950                    .with_schema(Arc::clone(&single_column_schema))
1951                    .with_batches(single_column_batches)
1952                    .with_description("single column")
1953                    .with_description(column.name());
1954
1955                new_tests.push(single_column_test);
1956            }
1957            new_tests
1958        }
1959    }
1960
1961    /// Return a RecordBatch with a UInt32Array with the specified range and
1962    /// every third value is null.
1963    fn uint32_batch<T: std::iter::Iterator<Item = u32>>(range: T) -> RecordBatch {
1964        let schema = Arc::new(Schema::new(vec![Field::new("c0", DataType::UInt32, true)]));
1965
1966        let array = UInt32Array::from_iter(range.map(|i| if i % 3 == 0 { None } else { Some(i) }));
1967        RecordBatch::try_new(Arc::clone(&schema), vec![Arc::new(array)]).unwrap()
1968    }
1969
1970    /// Return a RecordBatch with a UInt32Array with no nulls specified range
1971    fn uint32_batch_non_null<T: std::iter::Iterator<Item = u32>>(range: T) -> RecordBatch {
1972        let schema = Arc::new(Schema::new(vec![Field::new("c0", DataType::UInt32, false)]));
1973
1974        let array = UInt32Array::from_iter_values(range);
1975        RecordBatch::try_new(Arc::clone(&schema), vec![Arc::new(array)]).unwrap()
1976    }
1977
1978    /// Return a RecordBatch with a UInt64Array with no nulls specified range
1979    fn uint64_batch_non_null<T: std::iter::Iterator<Item = u64>>(range: T) -> RecordBatch {
1980        let schema = Arc::new(Schema::new(vec![Field::new("c0", DataType::UInt64, false)]));
1981
1982        let array = UInt64Array::from_iter_values(range);
1983        RecordBatch::try_new(Arc::clone(&schema), vec![Arc::new(array)]).unwrap()
1984    }
1985
1986    /// Return a RecordBatch with a StringArrary with values `value0`, `value1`, ...
1987    /// and every third value is `None`.
1988    fn utf8_batch(range: Range<u32>) -> RecordBatch {
1989        let schema = Arc::new(Schema::new(vec![Field::new("c0", DataType::Utf8, true)]));
1990
1991        let array = StringArray::from_iter(range.map(|i| {
1992            if i % 3 == 0 {
1993                None
1994            } else {
1995                Some(format!("value{i}"))
1996            }
1997        }));
1998
1999        RecordBatch::try_new(Arc::clone(&schema), vec![Arc::new(array)]).unwrap()
2000    }
2001
2002    /// Return a RecordBatch with a StringViewArray with (only) the specified values
2003    fn stringview_batch<'a>(values: impl IntoIterator<Item = Option<&'a str>>) -> RecordBatch {
2004        let schema = Arc::new(Schema::new(vec![Field::new(
2005            "c0",
2006            DataType::Utf8View,
2007            false,
2008        )]));
2009
2010        let array = StringViewArray::from_iter(values);
2011        RecordBatch::try_new(Arc::clone(&schema), vec![Arc::new(array)]).unwrap()
2012    }
2013
2014    /// Return a RecordBatch with a StringViewArray with num_rows by repeating
2015    /// values over and over.
2016    fn stringview_batch_repeated<'a>(
2017        num_rows: usize,
2018        values: impl IntoIterator<Item = Option<&'a str>>,
2019    ) -> RecordBatch {
2020        let schema = Arc::new(Schema::new(vec![Field::new(
2021            "c0",
2022            DataType::Utf8View,
2023            true,
2024        )]));
2025
2026        // Repeat the values to a total of num_rows
2027        let values: Vec<_> = values.into_iter().collect();
2028        let values_iter = std::iter::repeat(values.iter())
2029            .flatten()
2030            .cloned()
2031            .take(num_rows);
2032
2033        let mut builder = StringViewBuilder::with_capacity(100).with_fixed_block_size(8192);
2034        for val in values_iter {
2035            builder.append_option(val);
2036        }
2037
2038        let array = builder.finish();
2039        RecordBatch::try_new(Arc::clone(&schema), vec![Arc::new(array)]).unwrap()
2040    }
2041
2042    /// Return a RecordBatch of 100 rows
2043    fn multi_column_batch(range: Range<i32>) -> RecordBatch {
2044        let int64_array = Int64Array::from_iter(
2045            range
2046                .clone()
2047                .map(|v| if v % 5 == 0 { None } else { Some(v as i64) }),
2048        );
2049        let string_view_array = StringViewArray::from_iter(range.clone().map(|v| {
2050            if v % 5 == 0 {
2051                None
2052            } else if v % 7 == 0 {
2053                Some(format!("This is a string longer than 12 bytes{v}"))
2054            } else {
2055                Some(format!("Short {v}"))
2056            }
2057        }));
2058        let string_array = StringArray::from_iter(range.clone().map(|v| {
2059            if v % 11 == 0 {
2060                None
2061            } else {
2062                Some(format!("Value {v}"))
2063            }
2064        }));
2065        let timestamp_array = TimestampNanosecondArray::from_iter(range.map(|v| {
2066            if v % 3 == 0 {
2067                None
2068            } else {
2069                Some(v as i64 * 1000) // simulate a timestamp in milliseconds
2070            }
2071        }))
2072        .with_timezone("America/New_York");
2073
2074        RecordBatch::try_from_iter(vec![
2075            ("int64", Arc::new(int64_array) as ArrayRef),
2076            ("stringview", Arc::new(string_view_array) as ArrayRef),
2077            ("string", Arc::new(string_array) as ArrayRef),
2078            ("timestamp", Arc::new(timestamp_array) as ArrayRef),
2079        ])
2080        .unwrap()
2081    }
2082
2083    /// Return a boolean array that filters out randomly selected rows
2084    /// from the input batch with a `selectivity`.
2085    ///
2086    /// For example a `selectivity` of 0.1 will filter out
2087    /// 90% of the rows.
2088    #[derive(Debug)]
2089    struct RandomFilterBuilder {
2090        /// Number of rows to add to each filter
2091        num_rows: usize,
2092        /// selectivity of the filter (between 0.0 and 1.0)
2093        /// 0 selects no rows, 1.0 selects all rows
2094        selectivity: f64,
2095        /// seed for random number generator, increases by one each time
2096        /// `next_filter` is called
2097        seed: u64,
2098    }
2099    impl RandomFilterBuilder {
2100        /// Build the next filter with the current seed and increment the seed
2101        /// by one.
2102        fn next_filter(&mut self) -> BooleanArray {
2103            assert!(self.selectivity >= 0.0 && self.selectivity <= 1.0);
2104            let mut rng = rand::rngs::StdRng::seed_from_u64(self.seed);
2105            self.seed += 1;
2106            BooleanArray::from_iter(
2107                (0..self.num_rows)
2108                    .map(|_| rng.random_bool(self.selectivity))
2109                    .map(Some),
2110            )
2111        }
2112    }
2113
2114    /// Returns the named column as a StringViewArray
2115    fn col_as_string_view<'b>(name: &str, batch: &'b RecordBatch) -> &'b StringViewArray {
2116        batch
2117            .column_by_name(name)
2118            .expect("column not found")
2119            .as_string_view_opt()
2120            .expect("column is not a string view")
2121    }
2122
2123    /// Filter that selects 12.5% of the rows (`idx % 8 == 0`)
2124    fn sparse_filter(len: usize) -> BooleanArray {
2125        BooleanArray::from_iter((0..len).map(|idx| Some(idx % 8 == 0)))
2126    }
2127
2128    /// Filter that selects 5% of the rows (`idx % 20 == 0`)
2129    ///
2130    /// Different code paths are taken for very sparse filters
2131    fn very_sparse_filter(len: usize) -> BooleanArray {
2132        BooleanArray::from_iter((0..len).map(|idx| Some(idx % 20 == 0)))
2133    }
2134
2135    /// Normalize the `RecordBatch` so that the memory layout is consistent
2136    /// (e.g. StringArray is compacted).
2137    fn normalize_batch(batch: RecordBatch) -> RecordBatch {
2138        // Only need to normalize StringViews (as == also tests for memory layout)
2139        let (schema, mut columns, row_count) = batch.into_parts();
2140
2141        for column in columns.iter_mut() {
2142            if let Some(string_view) = column.as_string_view_opt() {
2143                // Re-create the StringViewArray to ensure memory layout is
2144                // consistent
2145                let mut builder = StringViewBuilder::new();
2146                for s in string_view.iter() {
2147                    builder.append_option(s);
2148                }
2149                *column = Arc::new(builder.finish());
2150                continue;
2151            }
2152
2153            if let Some(binary_view) = column.as_binary_view_opt() {
2154                *column = Arc::new(BinaryViewArray::from_iter(binary_view.iter()));
2155            }
2156        }
2157
2158        let options = RecordBatchOptions::new().with_row_count(Some(row_count));
2159        RecordBatch::try_new_with_options(schema, columns, &options).unwrap()
2160    }
2161
2162    /// Helper function to create a test batch with specified number of rows
2163    fn create_test_batch(num_rows: usize) -> RecordBatch {
2164        let schema = Arc::new(Schema::new(vec![Field::new("c0", DataType::Int32, false)]));
2165        let array = Int32Array::from_iter_values(0..num_rows as i32);
2166        RecordBatch::try_new(schema, vec![Arc::new(array)]).unwrap()
2167    }
2168    #[test]
2169    fn test_biggest_coalesce_batch_size_none_default() {
2170        // Test that default behavior (None) coalesces all batches
2171        let mut coalescer = BatchCoalescer::new(
2172            Arc::new(Schema::new(vec![Field::new("c0", DataType::Int32, false)])),
2173            100,
2174        );
2175
2176        // Push a large batch (1000 rows) - should be coalesced normally
2177        let large_batch = create_test_batch(1000);
2178        coalescer.push_batch(large_batch).unwrap();
2179
2180        // Should produce multiple batches of target size (100)
2181        let mut output_batches = vec![];
2182        while let Some(batch) = coalescer.next_completed_batch() {
2183            output_batches.push(batch);
2184        }
2185
2186        coalescer.finish_buffered_batch().unwrap();
2187        while let Some(batch) = coalescer.next_completed_batch() {
2188            output_batches.push(batch);
2189        }
2190
2191        // Should have 10 batches of 100 rows each
2192        assert_eq!(output_batches.len(), 10);
2193        for batch in output_batches {
2194            assert_eq!(batch.num_rows(), 100);
2195        }
2196    }
2197
2198    #[test]
2199    fn test_biggest_coalesce_batch_size_bypass_large_batch() {
2200        // Test that batches larger than biggest_coalesce_batch_size bypass coalescing
2201        let mut coalescer = BatchCoalescer::new(
2202            Arc::new(Schema::new(vec![Field::new("c0", DataType::Int32, false)])),
2203            100,
2204        );
2205        coalescer.set_biggest_coalesce_batch_size(Some(500));
2206
2207        // Push a large batch (1000 rows) - should bypass coalescing
2208        let large_batch = create_test_batch(1000);
2209        coalescer.push_batch(large_batch.clone()).unwrap();
2210
2211        // Should have one completed batch immediately (the original large batch)
2212        assert!(coalescer.has_completed_batch());
2213        let output_batch = coalescer.next_completed_batch().unwrap();
2214        assert_eq!(output_batch.num_rows(), 1000);
2215
2216        // Should be no more completed batches
2217        assert!(!coalescer.has_completed_batch());
2218        assert_eq!(coalescer.get_buffered_rows(), 0);
2219    }
2220
2221    #[test]
2222    fn test_biggest_coalesce_batch_size_coalesce_small_batch() {
2223        // Test that batches smaller than biggest_coalesce_batch_size are coalesced normally
2224        let mut coalescer = BatchCoalescer::new(
2225            Arc::new(Schema::new(vec![Field::new("c0", DataType::Int32, false)])),
2226            100,
2227        )
2228        .with_biggest_coalesce_batch_size(Some(500));
2229
2230        // Push small batches that should be coalesced
2231        let small_batch = create_test_batch(50);
2232        coalescer.push_batch(small_batch.clone()).unwrap();
2233
2234        // Should not have completed batch yet (only 50 rows, target is 100)
2235        assert!(!coalescer.has_completed_batch());
2236        assert_eq!(coalescer.get_buffered_rows(), 50);
2237
2238        // Push another small batch
2239        coalescer.push_batch(small_batch).unwrap();
2240
2241        // Now should have a completed batch (100 rows total)
2242        assert!(coalescer.has_completed_batch());
2243        let output_batch = coalescer.next_completed_batch().unwrap();
2244        let size = output_batch
2245            .column(0)
2246            .as_primitive::<Int32Type>()
2247            .get_buffer_memory_size();
2248        assert_eq!(size, 400); // 100 rows * 4 bytes each
2249        assert_eq!(output_batch.num_rows(), 100);
2250
2251        assert_eq!(coalescer.get_buffered_rows(), 0);
2252    }
2253
2254    #[test]
2255    fn test_biggest_coalesce_batch_size_equal_boundary() {
2256        // Test behavior when batch size equals biggest_coalesce_batch_size
2257        let mut coalescer = BatchCoalescer::new(
2258            Arc::new(Schema::new(vec![Field::new("c0", DataType::Int32, false)])),
2259            100,
2260        );
2261        coalescer.set_biggest_coalesce_batch_size(Some(500));
2262
2263        // Push a batch exactly equal to the limit
2264        let boundary_batch = create_test_batch(500);
2265        coalescer.push_batch(boundary_batch).unwrap();
2266
2267        // Should be coalesced (not bypass) since it's equal, not greater
2268        let mut output_count = 0;
2269        while coalescer.next_completed_batch().is_some() {
2270            output_count += 1;
2271        }
2272
2273        coalescer.finish_buffered_batch().unwrap();
2274        while coalescer.next_completed_batch().is_some() {
2275            output_count += 1;
2276        }
2277
2278        // Should have 5 batches of 100 rows each
2279        assert_eq!(output_count, 5);
2280    }
2281
2282    #[test]
2283    fn test_biggest_coalesce_batch_size_first_large_then_consecutive_bypass() {
2284        // Test the new consecutive large batch bypass behavior
2285        // Pattern: small batches -> first large batch (coalesced) -> consecutive large batches (bypass)
2286        let mut coalescer = BatchCoalescer::new(
2287            Arc::new(Schema::new(vec![Field::new("c0", DataType::Int32, false)])),
2288            100,
2289        );
2290        coalescer.set_biggest_coalesce_batch_size(Some(200));
2291
2292        let small_batch = create_test_batch(50);
2293
2294        // Push small batch first to create buffered data
2295        coalescer.push_batch(small_batch).unwrap();
2296        assert_eq!(coalescer.get_buffered_rows(), 50);
2297        assert!(!coalescer.has_completed_batch());
2298
2299        // Push first large batch - should go through normal coalescing due to buffered data
2300        let large_batch1 = create_test_batch(250);
2301        coalescer.push_batch(large_batch1).unwrap();
2302
2303        // 50 + 250 = 300 -> 3 complete batches of 100, 0 rows buffered
2304        let mut completed_batches = vec![];
2305        while let Some(batch) = coalescer.next_completed_batch() {
2306            completed_batches.push(batch);
2307        }
2308        assert_eq!(completed_batches.len(), 3);
2309        assert_eq!(coalescer.get_buffered_rows(), 0);
2310
2311        // Now push consecutive large batches - they should bypass
2312        let large_batch2 = create_test_batch(300);
2313        let large_batch3 = create_test_batch(400);
2314
2315        // Push second large batch - should bypass since it's consecutive and buffer is empty
2316        coalescer.push_batch(large_batch2).unwrap();
2317        assert!(coalescer.has_completed_batch());
2318        let output = coalescer.next_completed_batch().unwrap();
2319        assert_eq!(output.num_rows(), 300); // bypassed with original size
2320        assert_eq!(coalescer.get_buffered_rows(), 0);
2321
2322        // Push third large batch - should also bypass
2323        coalescer.push_batch(large_batch3).unwrap();
2324        assert!(coalescer.has_completed_batch());
2325        let output = coalescer.next_completed_batch().unwrap();
2326        assert_eq!(output.num_rows(), 400); // bypassed with original size
2327        assert_eq!(coalescer.get_buffered_rows(), 0);
2328    }
2329
2330    #[test]
2331    fn test_biggest_coalesce_batch_size_empty_batch() {
2332        // Test that empty batches don't trigger the bypass logic
2333        let mut coalescer = BatchCoalescer::new(
2334            Arc::new(Schema::new(vec![Field::new("c0", DataType::Int32, false)])),
2335            100,
2336        );
2337        coalescer.set_biggest_coalesce_batch_size(Some(50));
2338
2339        let empty_batch = create_test_batch(0);
2340        coalescer.push_batch(empty_batch).unwrap();
2341
2342        // Empty batch should be handled normally (no effect)
2343        assert!(!coalescer.has_completed_batch());
2344        assert_eq!(coalescer.get_buffered_rows(), 0);
2345    }
2346
2347    #[test]
2348    fn test_biggest_coalesce_batch_size_with_buffered_data_no_bypass() {
2349        // Test that when there is buffered data, large batches do NOT bypass (unless consecutive)
2350        let mut coalescer = BatchCoalescer::new(
2351            Arc::new(Schema::new(vec![Field::new("c0", DataType::Int32, false)])),
2352            100,
2353        );
2354        coalescer.set_biggest_coalesce_batch_size(Some(200));
2355
2356        // Add some buffered data first
2357        let small_batch = create_test_batch(30);
2358        coalescer.push_batch(small_batch.clone()).unwrap();
2359        coalescer.push_batch(small_batch).unwrap();
2360        assert_eq!(coalescer.get_buffered_rows(), 60);
2361
2362        // Push large batch that would normally bypass, but shouldn't because buffered_rows > 0
2363        let large_batch = create_test_batch(250);
2364        coalescer.push_batch(large_batch).unwrap();
2365
2366        // The large batch should be processed through normal coalescing logic
2367        // Total: 60 (buffered) + 250 (new) = 310 rows
2368        // Output: 3 complete batches of 100 rows each, 10 rows remain buffered
2369
2370        let mut completed_batches = vec![];
2371        while let Some(batch) = coalescer.next_completed_batch() {
2372            completed_batches.push(batch);
2373        }
2374
2375        assert_eq!(completed_batches.len(), 3);
2376        for batch in &completed_batches {
2377            assert_eq!(batch.num_rows(), 100);
2378        }
2379        assert_eq!(coalescer.get_buffered_rows(), 10);
2380    }
2381
2382    #[test]
2383    fn test_biggest_coalesce_batch_size_zero_limit() {
2384        // Test edge case where limit is 0 (all batches bypass when no buffered data)
2385        let mut coalescer = BatchCoalescer::new(
2386            Arc::new(Schema::new(vec![Field::new("c0", DataType::Int32, false)])),
2387            100,
2388        );
2389        coalescer.set_biggest_coalesce_batch_size(Some(0));
2390
2391        // Even a 1-row batch should bypass when there's no buffered data
2392        let tiny_batch = create_test_batch(1);
2393        coalescer.push_batch(tiny_batch).unwrap();
2394
2395        assert!(coalescer.has_completed_batch());
2396        let output = coalescer.next_completed_batch().unwrap();
2397        assert_eq!(output.num_rows(), 1);
2398    }
2399
2400    #[test]
2401    fn test_biggest_coalesce_batch_size_bypass_only_when_no_buffer() {
2402        // Test that bypass only occurs when buffered_rows == 0
2403        let mut coalescer = BatchCoalescer::new(
2404            Arc::new(Schema::new(vec![Field::new("c0", DataType::Int32, false)])),
2405            100,
2406        );
2407        coalescer.set_biggest_coalesce_batch_size(Some(200));
2408
2409        // First, push a large batch with no buffered data - should bypass
2410        let large_batch = create_test_batch(300);
2411        coalescer.push_batch(large_batch.clone()).unwrap();
2412
2413        assert!(coalescer.has_completed_batch());
2414        let output = coalescer.next_completed_batch().unwrap();
2415        assert_eq!(output.num_rows(), 300); // bypassed
2416        assert_eq!(coalescer.get_buffered_rows(), 0);
2417
2418        // Now add some buffered data
2419        let small_batch = create_test_batch(50);
2420        coalescer.push_batch(small_batch).unwrap();
2421        assert_eq!(coalescer.get_buffered_rows(), 50);
2422
2423        // Push the same large batch again - should NOT bypass this time (not consecutive)
2424        coalescer.push_batch(large_batch).unwrap();
2425
2426        // Should process through normal coalescing: 50 + 300 = 350 rows
2427        // Output: 3 complete batches of 100 rows, 50 rows buffered
2428        let mut completed_batches = vec![];
2429        while let Some(batch) = coalescer.next_completed_batch() {
2430            completed_batches.push(batch);
2431        }
2432
2433        assert_eq!(completed_batches.len(), 3);
2434        for batch in &completed_batches {
2435            assert_eq!(batch.num_rows(), 100);
2436        }
2437        assert_eq!(coalescer.get_buffered_rows(), 50);
2438    }
2439
2440    #[test]
2441    fn test_biggest_coalesce_batch_size_consecutive_large_batches_scenario() {
2442        // Test your exact scenario: 20, 20, 30, 700, 600, 700, 900, 700, 600
2443        let mut coalescer = BatchCoalescer::new(
2444            Arc::new(Schema::new(vec![Field::new("c0", DataType::Int32, false)])),
2445            1000,
2446        );
2447        coalescer.set_biggest_coalesce_batch_size(Some(500));
2448
2449        // Push small batches first
2450        coalescer.push_batch(create_test_batch(20)).unwrap();
2451        coalescer.push_batch(create_test_batch(20)).unwrap();
2452        coalescer.push_batch(create_test_batch(30)).unwrap();
2453
2454        assert_eq!(coalescer.get_buffered_rows(), 70);
2455        assert!(!coalescer.has_completed_batch());
2456
2457        // Push first large batch (700) - should coalesce due to buffered data
2458        coalescer.push_batch(create_test_batch(700)).unwrap();
2459
2460        // 70 + 700 = 770 rows, not enough for 1000, so all stay buffered
2461        assert_eq!(coalescer.get_buffered_rows(), 770);
2462        assert!(!coalescer.has_completed_batch());
2463
2464        // Push second large batch (600) - should bypass since previous was large
2465        coalescer.push_batch(create_test_batch(600)).unwrap();
2466
2467        // Should flush buffer (770 rows) and bypass the 600
2468        let mut outputs = vec![];
2469        while let Some(batch) = coalescer.next_completed_batch() {
2470            outputs.push(batch);
2471        }
2472        assert_eq!(outputs.len(), 2); // one flushed buffer batch (770) + one bypassed (600)
2473        assert_eq!(outputs[0].num_rows(), 770);
2474        assert_eq!(outputs[1].num_rows(), 600);
2475        assert_eq!(coalescer.get_buffered_rows(), 0);
2476
2477        // Push remaining large batches - should all bypass
2478        let remaining_batches = [700, 900, 700, 600];
2479        for &size in &remaining_batches {
2480            coalescer.push_batch(create_test_batch(size)).unwrap();
2481
2482            assert!(coalescer.has_completed_batch());
2483            let output = coalescer.next_completed_batch().unwrap();
2484            assert_eq!(output.num_rows(), size);
2485            assert_eq!(coalescer.get_buffered_rows(), 0);
2486        }
2487    }
2488
2489    #[test]
2490    fn test_biggest_coalesce_batch_size_truly_consecutive_large_bypass() {
2491        // Test truly consecutive large batches that should all bypass
2492        // This test ensures buffer is completely empty between large batches
2493        let mut coalescer = BatchCoalescer::new(
2494            Arc::new(Schema::new(vec![Field::new("c0", DataType::Int32, false)])),
2495            100,
2496        );
2497        coalescer.set_biggest_coalesce_batch_size(Some(200));
2498
2499        // Push consecutive large batches with no prior buffered data
2500        let large_batches = vec![
2501            create_test_batch(300),
2502            create_test_batch(400),
2503            create_test_batch(350),
2504            create_test_batch(500),
2505        ];
2506
2507        let mut all_outputs = vec![];
2508
2509        for (i, large_batch) in large_batches.into_iter().enumerate() {
2510            let expected_size = large_batch.num_rows();
2511
2512            // Buffer should be empty before each large batch
2513            assert_eq!(
2514                coalescer.get_buffered_rows(),
2515                0,
2516                "Buffer should be empty before batch {}",
2517                i
2518            );
2519
2520            coalescer.push_batch(large_batch).unwrap();
2521
2522            // Each large batch should bypass and produce exactly one output batch
2523            assert!(
2524                coalescer.has_completed_batch(),
2525                "Should have completed batch after pushing batch {}",
2526                i
2527            );
2528
2529            let output = coalescer.next_completed_batch().unwrap();
2530            assert_eq!(
2531                output.num_rows(),
2532                expected_size,
2533                "Batch {} should have bypassed with original size",
2534                i
2535            );
2536
2537            // Should be no more batches and buffer should be empty
2538            assert!(
2539                !coalescer.has_completed_batch(),
2540                "Should have no more completed batches after batch {}",
2541                i
2542            );
2543            assert_eq!(
2544                coalescer.get_buffered_rows(),
2545                0,
2546                "Buffer should be empty after batch {}",
2547                i
2548            );
2549
2550            all_outputs.push(output);
2551        }
2552
2553        // Verify we got exactly 4 output batches with original sizes
2554        assert_eq!(all_outputs.len(), 4);
2555        assert_eq!(all_outputs[0].num_rows(), 300);
2556        assert_eq!(all_outputs[1].num_rows(), 400);
2557        assert_eq!(all_outputs[2].num_rows(), 350);
2558        assert_eq!(all_outputs[3].num_rows(), 500);
2559    }
2560
2561    #[test]
2562    fn test_biggest_coalesce_batch_size_reset_consecutive_on_small_batch() {
2563        // Test that small batches reset the consecutive large batch tracking
2564        let mut coalescer = BatchCoalescer::new(
2565            Arc::new(Schema::new(vec![Field::new("c0", DataType::Int32, false)])),
2566            100,
2567        );
2568        coalescer.set_biggest_coalesce_batch_size(Some(200));
2569
2570        // Push first large batch - should bypass (no buffered data)
2571        coalescer.push_batch(create_test_batch(300)).unwrap();
2572        let output = coalescer.next_completed_batch().unwrap();
2573        assert_eq!(output.num_rows(), 300);
2574
2575        // Push second large batch - should bypass (consecutive)
2576        coalescer.push_batch(create_test_batch(400)).unwrap();
2577        let output = coalescer.next_completed_batch().unwrap();
2578        assert_eq!(output.num_rows(), 400);
2579
2580        // Push small batch - resets consecutive tracking
2581        coalescer.push_batch(create_test_batch(50)).unwrap();
2582        assert_eq!(coalescer.get_buffered_rows(), 50);
2583
2584        // Push large batch again - should NOT bypass due to buffered data
2585        coalescer.push_batch(create_test_batch(350)).unwrap();
2586
2587        // Should coalesce: 50 + 350 = 400 -> 4 complete batches of 100
2588        let mut outputs = vec![];
2589        while let Some(batch) = coalescer.next_completed_batch() {
2590            outputs.push(batch);
2591        }
2592        assert_eq!(outputs.len(), 4);
2593        for batch in outputs {
2594            assert_eq!(batch.num_rows(), 100);
2595        }
2596        assert_eq!(coalescer.get_buffered_rows(), 0);
2597    }
2598
2599    #[test]
2600    fn test_coalasce_push_batch_with_indices() {
2601        const MID_POINT: u32 = 2333;
2602        const TOTAL_ROWS: u32 = 23333;
2603        let batch1 = uint32_batch_non_null(0..MID_POINT);
2604        let batch2 = uint32_batch_non_null((MID_POINT..TOTAL_ROWS).rev());
2605
2606        let mut coalescer = BatchCoalescer::new(
2607            Arc::new(Schema::new(vec![Field::new("c0", DataType::UInt32, false)])),
2608            TOTAL_ROWS as usize,
2609        );
2610        coalescer.push_batch(batch1).unwrap();
2611
2612        let rev_indices = (0..((TOTAL_ROWS - MID_POINT) as u64)).rev();
2613        let reversed_indices_batch = uint64_batch_non_null(rev_indices);
2614
2615        let reverse_indices = UInt64Array::from(reversed_indices_batch.column(0).to_data());
2616        coalescer
2617            .push_batch_with_indices(batch2, &reverse_indices)
2618            .unwrap();
2619
2620        coalescer.finish_buffered_batch().unwrap();
2621        let actual = coalescer.next_completed_batch().unwrap();
2622
2623        let expected = uint32_batch_non_null(0..TOTAL_ROWS);
2624
2625        assert_eq!(expected, actual);
2626    }
2627
2628    #[test]
2629    fn test_push_batch_schema_mismatch_fewer_columns() {
2630        // Coalescer expects 0 columns, batch has 1
2631        let empty_schema = Arc::new(Schema::empty());
2632        let mut coalescer = BatchCoalescer::new(empty_schema, 100);
2633        let batch = uint32_batch(0..5);
2634        let result = coalescer.push_batch(batch);
2635        assert!(result.is_err());
2636        let err = result.unwrap_err().to_string();
2637        assert!(
2638            err.contains("Batch has 1 columns but BatchCoalescer expects 0"),
2639            "unexpected error: {err}"
2640        );
2641    }
2642
2643    #[test]
2644    fn test_push_batch_schema_mismatch_more_columns() {
2645        // Coalescer expects 2 columns, batch has 1
2646        let schema = Arc::new(Schema::new(vec![
2647            Field::new("c0", DataType::UInt32, false),
2648            Field::new("c1", DataType::UInt32, false),
2649        ]));
2650        let mut coalescer = BatchCoalescer::new(schema, 100);
2651        let batch = uint32_batch(0..5);
2652        let result = coalescer.push_batch(batch);
2653        assert!(result.is_err());
2654        let err = result.unwrap_err().to_string();
2655        assert!(
2656            err.contains("Batch has 1 columns but BatchCoalescer expects 2"),
2657            "unexpected error: {err}"
2658        );
2659    }
2660
2661    #[test]
2662    fn test_push_batch_schema_mismatch_two_vs_zero() {
2663        // Coalescer expects 0 columns, batch has 2
2664        let empty_schema = Arc::new(Schema::empty());
2665        let mut coalescer = BatchCoalescer::new(empty_schema, 100);
2666        let schema = Arc::new(Schema::new(vec![
2667            Field::new("c0", DataType::UInt32, false),
2668            Field::new("c1", DataType::UInt32, false),
2669        ]));
2670        let batch = RecordBatch::try_new(
2671            schema,
2672            vec![
2673                Arc::new(UInt32Array::from(vec![1, 2, 3])),
2674                Arc::new(UInt32Array::from(vec![4, 5, 6])),
2675            ],
2676        )
2677        .unwrap();
2678        let result = coalescer.push_batch(batch);
2679        assert!(result.is_err());
2680        let err = result.unwrap_err().to_string();
2681        assert!(
2682            err.contains("Batch has 2 columns but BatchCoalescer expects 0"),
2683            "unexpected error: {err}"
2684        );
2685    }
2686}