arrow_select/coalesce.rs
// Licensed to the Apache Software Foundation (ASF) under one
// or more contributor license agreements. See the NOTICE file
// distributed with this work for additional information
// regarding copyright ownership. The ASF licenses this file
// to you under the Apache License, Version 2.0 (the
// "License"); you may not use this file except in compliance
// with the License. You may obtain a copy of the License at
//
//   http://www.apache.org/licenses/LICENSE-2.0
//
// Unless required by applicable law or agreed to in writing,
// software distributed under the License is distributed on an
// "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
// KIND, either express or implied. See the License for the
// specific language governing permissions and limitations
// under the License.

//! [`BatchCoalescer`] concatenates multiple [`RecordBatch`]es after
//! operations such as [`filter`] and [`take`].
//!
//! [`filter`]: crate::filter::filter
//! [`take`]: crate::take::take
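//!
//! A minimal end-to-end sketch (mirroring the example on [`BatchCoalescer`]):
//!
//! ```
//! # use arrow_array::record_batch;
//! # use arrow_select::coalesce::BatchCoalescer;
//! let batch = record_batch!(("a", Int32, [1, 2, 3])).unwrap();
//! // combine small batches into batches of (up to) 4096 rows
//! let mut coalescer = BatchCoalescer::new(batch.schema(), 4096);
//! coalescer.push_batch(batch).unwrap();
//! // ... push more batches ...
//! // flush the remaining buffered rows and drain the completed batches
//! coalescer.finish_buffered_batch().unwrap();
//! while let Some(completed) = coalescer.next_completed_batch() {
//!     assert_eq!(completed.num_rows(), 3); // only 3 rows were pushed here
//! }
//! ```
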
use crate::filter::filter_record_batch;
use crate::take::take_record_batch;
use arrow_array::types::{BinaryViewType, StringViewType};
use arrow_array::{Array, ArrayRef, BooleanArray, RecordBatch, downcast_primitive};
use arrow_schema::{ArrowError, DataType, SchemaRef};
use std::collections::VecDeque;
use std::sync::Arc;
// Originally from DataFusion's coalesce module:
// https://github.com/apache/datafusion/blob/9d2f04996604e709ee440b65f41e7b882f50b788/datafusion/physical-plan/src/coalesce/mod.rs#L26-L25

mod byte_view;
mod generic;
mod primitive;

use byte_view::InProgressByteViewArray;
use generic::GenericInProgressArray;
use primitive::InProgressPrimitiveArray;

/// Concatenate multiple [`RecordBatch`]es
///
/// Implements the common pattern of incrementally creating output
/// [`RecordBatch`]es of a specific size from an input stream of
/// [`RecordBatch`]es.
///
/// This is useful after operations such as [`filter`] and [`take`] that
/// produce smaller batches, when we want to coalesce them into larger batches
/// for further processing.
///
/// # Motivation
///
/// Implementing the same functionality with [`concat_batches`] has two
/// potential issues:
/// 1. At least 2x peak memory (holding both the input and the output of concat)
/// 2. 2 copies of the data (once to create the output of filter and once to
///    create the output of concat)
///
/// See <https://github.com/apache/arrow-rs/issues/6692> for more discussion
/// of the motivation.
///
/// [`filter`]: crate::filter::filter
/// [`take`]: crate::take::take
/// [`concat_batches`]: crate::concat::concat_batches
///
/// # Example
/// ```
/// use arrow_array::record_batch;
/// use arrow_select::coalesce::BatchCoalescer;
/// let batch1 = record_batch!(("a", Int32, [1, 2, 3])).unwrap();
/// let batch2 = record_batch!(("a", Int32, [4, 5])).unwrap();
///
/// // Create a `BatchCoalescer` that will produce batches of 4 rows
/// let target_batch_size = 4;
/// let mut coalescer = BatchCoalescer::new(batch1.schema(), target_batch_size);
///
/// // push the batches
/// coalescer.push_batch(batch1).unwrap();
/// // only 3 rows pushed so far (not yet enough to produce a batch)
/// assert!(coalescer.next_completed_batch().is_none());
/// coalescer.push_batch(batch2).unwrap();
/// // now we have 5 rows, so we can produce a batch
/// let finished = coalescer.next_completed_batch().unwrap();
/// // 4 rows came out (target batch size is 4)
/// let expected = record_batch!(("a", Int32, [1, 2, 3, 4])).unwrap();
/// assert_eq!(finished, expected);
///
/// // There is no more input, but there is still an in-progress batch
/// assert!(coalescer.next_completed_batch().is_none());
/// // We can finish the batch, which will produce the remaining rows
/// coalescer.finish_buffered_batch().unwrap();
/// let expected = record_batch!(("a", Int32, [5])).unwrap();
/// assert_eq!(coalescer.next_completed_batch().unwrap(), expected);
///
/// // The coalescer is now empty
/// assert!(coalescer.next_completed_batch().is_none());
/// ```
///
/// # Background
///
/// Generally speaking, larger [`RecordBatch`]es are more efficient to process
/// than smaller [`RecordBatch`]es (until the CPU cache is exceeded) because
/// there is fixed processing overhead per batch. This coalescer builds up
/// these larger batches incrementally.
///
/// ```text
/// ┌────────────────────┐
/// │    RecordBatch     │
/// │   num_rows = 100   │
/// └────────────────────┘               ┌────────────────────┐
///                                      │                    │
/// ┌────────────────────┐   Coalesce    │                    │
/// │                    │    Batches    │                    │
/// │    RecordBatch     │               │                    │
/// │   num_rows = 200   │ ─ ─ ─ ─ ─ ─ ▶ │                    │
/// │                    │               │    RecordBatch     │
/// │                    │               │   num_rows = 400   │
/// └────────────────────┘               │                    │
///                                      │                    │
/// ┌────────────────────┐               │                    │
/// │                    │               │                    │
/// │    RecordBatch     │               │                    │
/// │   num_rows = 100   │               └────────────────────┘
/// │                    │
/// └────────────────────┘
/// ```
///
/// # Notes:
///
/// 1. Output rows are produced in the same order as the input rows
///
/// 2. The output is a sequence of batches, with all but the last having
///    exactly `target_batch_size` rows.
#[derive(Debug)]
pub struct BatchCoalescer {
    /// The input schema
    schema: SchemaRef,
    /// The target batch size (and thus the size used for view allocations).
    /// This is a hard limit: each output batch has exactly
    /// `target_batch_size` rows, rather than possibly being slightly above.
    target_batch_size: usize,
    /// In-progress arrays
    in_progress_arrays: Vec<Box<dyn InProgressArray>>,
    /// Buffered row count. Always less than `target_batch_size`
    buffered_rows: usize,
    /// Completed batches
    completed: VecDeque<RecordBatch>,
    /// Biggest coalesce batch size. See [`Self::with_biggest_coalesce_batch_size`]
    biggest_coalesce_batch_size: Option<usize>,
}

impl BatchCoalescer {
    /// Create a new `BatchCoalescer`
    ///
    /// # Arguments
    /// - `schema` - the schema of the output batches
    /// - `target_batch_size` - the number of rows in each output batch.
    ///   Typical values are `4096` or `8192` rows.
    ///
    pub fn new(schema: SchemaRef, target_batch_size: usize) -> Self {
        let in_progress_arrays = schema
            .fields()
            .iter()
            .map(|field| create_in_progress_array(field.data_type(), target_batch_size))
            .collect::<Vec<_>>();

        Self {
            schema,
            target_batch_size,
            in_progress_arrays,
            // We will for sure store at least one completed batch
            completed: VecDeque::with_capacity(1),
            buffered_rows: 0,
            biggest_coalesce_batch_size: None,
        }
    }

    /// Set the coalesce batch size limit (default `None`)
    ///
    /// This limit determines when batches should bypass coalescing.
    /// Intuitively, batches that are already large are costly to coalesce and
    /// are efficient enough to process directly without coalescing.
    ///
    /// If `Some(limit)`, batches larger than this limit will bypass coalescing
    /// when there is no buffered data, or when the previously buffered data
    /// already exceeds this limit.
    ///
    /// If `None`, all batches will be coalesced according to the
    /// `target_batch_size`.
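    ///
    /// # Example
    ///
    /// A sketch of the bypass behavior described above (a batch bigger than
    /// the limit arriving while nothing is buffered):
    ///
    /// ```
    /// # use std::sync::Arc;
    /// # use arrow_array::{ArrayRef, Int32Array, RecordBatch};
    /// # use arrow_select::coalesce::BatchCoalescer;
    /// // a 600 row batch
    /// let array = Int32Array::from_iter_values(0..600);
    /// let batch =
    ///     RecordBatch::try_from_iter(vec![("a", Arc::new(array) as ArrayRef)]).unwrap();
    /// let mut coalescer = BatchCoalescer::new(batch.schema(), 1000)
    ///     .with_biggest_coalesce_batch_size(Some(500));
    /// // 600 > 500 and nothing is buffered, so the batch is emitted as-is
    /// // rather than being buffered up to the 1000 row target
    /// coalescer.push_batch(batch).unwrap();
    /// assert_eq!(coalescer.next_completed_batch().unwrap().num_rows(), 600);
    /// ```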
    pub fn with_biggest_coalesce_batch_size(mut self, limit: Option<usize>) -> Self {
        self.biggest_coalesce_batch_size = limit;
        self
    }

    /// Get the current biggest coalesce batch size limit
    ///
    /// See [`Self::with_biggest_coalesce_batch_size`] for details
    pub fn biggest_coalesce_batch_size(&self) -> Option<usize> {
        self.biggest_coalesce_batch_size
    }

    /// Set the biggest coalesce batch size limit
    ///
    /// See [`Self::with_biggest_coalesce_batch_size`] for details
    pub fn set_biggest_coalesce_batch_size(&mut self, limit: Option<usize>) {
        self.biggest_coalesce_batch_size = limit;
    }

    /// Return the schema of the output batches
    pub fn schema(&self) -> SchemaRef {
        Arc::clone(&self.schema)
    }

    /// Push a batch into the Coalescer after applying a filter
    ///
    /// This is semantically equivalent to calling [`Self::push_batch`]
    /// with the results from [`filter_record_batch`]
    ///
    /// # Example
    /// ```
    /// # use arrow_array::{record_batch, BooleanArray};
    /// # use arrow_select::coalesce::BatchCoalescer;
    /// let batch1 = record_batch!(("a", Int32, [1, 2, 3])).unwrap();
    /// let batch2 = record_batch!(("a", Int32, [4, 5, 6])).unwrap();
    /// // Apply a filter to each batch to pick the first and last row
    /// let filter = BooleanArray::from(vec![true, false, true]);
    /// // create a new Coalescer that targets creating 1000 row batches
    /// let mut coalescer = BatchCoalescer::new(batch1.schema(), 1000);
    /// coalescer.push_batch_with_filter(batch1, &filter).unwrap();
    /// coalescer.push_batch_with_filter(batch2, &filter).unwrap();
    /// // finish and retrieve the created batch
    /// coalescer.finish_buffered_batch().unwrap();
    /// let completed_batch = coalescer.next_completed_batch().unwrap();
    /// // filtered out 2 and 5:
    /// let expected_batch = record_batch!(("a", Int32, [1, 3, 4, 6])).unwrap();
    /// assert_eq!(completed_batch, expected_batch);
    /// ```
    pub fn push_batch_with_filter(
        &mut self,
        batch: RecordBatch,
        filter: &BooleanArray,
    ) -> Result<(), ArrowError> {
        // TODO: optimize this to avoid materializing (copying the results
        // of filter to a new batch)
        let filtered_batch = filter_record_batch(&batch, filter)?;
        self.push_batch(filtered_batch)
    }

    /// Push a batch into the Coalescer after applying a set of indices
    ///
    /// This is semantically equivalent to calling [`Self::push_batch`]
    /// with the results from [`take_record_batch`]
    ///
    /// # Example
    /// ```
    /// # use arrow_array::{record_batch, UInt64Array};
    /// # use arrow_select::coalesce::BatchCoalescer;
    /// let batch1 = record_batch!(("a", Int32, [0, 0, 0])).unwrap();
    /// let batch2 = record_batch!(("a", Int32, [1, 1, 4, 5, 1, 4])).unwrap();
    /// // Sorted indices to create a sorted output; these can be obtained
    /// // with `arrow-ord`'s sort_to_indices operation
    /// let indices = UInt64Array::from(vec![0, 1, 4, 2, 5, 3]);
    /// // create a new Coalescer that targets creating 1000 row batches
    /// let mut coalescer = BatchCoalescer::new(batch1.schema(), 1000);
    /// coalescer.push_batch(batch1).unwrap();
    /// coalescer.push_batch_with_indices(batch2, &indices).unwrap();
    /// // finish and retrieve the created batch
    /// coalescer.finish_buffered_batch().unwrap();
    /// let completed_batch = coalescer.next_completed_batch().unwrap();
    /// let expected_batch = record_batch!(("a", Int32, [0, 0, 0, 1, 1, 1, 4, 4, 5])).unwrap();
    /// assert_eq!(completed_batch, expected_batch);
    /// ```
    pub fn push_batch_with_indices(
        &mut self,
        batch: RecordBatch,
        indices: &dyn Array,
    ) -> Result<(), ArrowError> {
        // TODO: optimize this to avoid materializing (copying the results
        // of take to a new batch)
        let taken_batch = take_record_batch(&batch, indices)?;
        self.push_batch(taken_batch)
    }

    /// Push all the rows from `batch` into the Coalescer
    ///
    /// When the buffered data plus the incoming rows reach `target_batch_size`,
    /// completed batches are generated eagerly and can be retrieved via
    /// [`Self::next_completed_batch()`].
    /// Output batches contain exactly `target_batch_size` rows, so the tail of
    /// the input batch may remain buffered.
    /// Remaining partial data either waits for future input batches or can be
    /// materialized immediately by calling [`Self::finish_buffered_batch()`].
    ///
    /// # Example
    /// ```
    /// # use arrow_array::record_batch;
    /// # use arrow_select::coalesce::BatchCoalescer;
    /// let batch1 = record_batch!(("a", Int32, [1, 2, 3])).unwrap();
    /// let batch2 = record_batch!(("a", Int32, [4, 5, 6])).unwrap();
    /// // create a new Coalescer that targets creating 1000 row batches
    /// let mut coalescer = BatchCoalescer::new(batch1.schema(), 1000);
    /// coalescer.push_batch(batch1).unwrap();
    /// coalescer.push_batch(batch2).unwrap();
    /// // finish and retrieve the created batch
    /// coalescer.finish_buffered_batch().unwrap();
    /// let completed_batch = coalescer.next_completed_batch().unwrap();
    /// let expected_batch = record_batch!(("a", Int32, [1, 2, 3, 4, 5, 6])).unwrap();
    /// assert_eq!(completed_batch, expected_batch);
    /// ```
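    ///
    /// A sketch of the splitting behavior: with a target of 4 rows, pushing 6
    /// rows completes one 4 row batch and leaves 2 rows buffered:
    ///
    /// ```
    /// # use arrow_array::record_batch;
    /// # use arrow_select::coalesce::BatchCoalescer;
    /// let batch = record_batch!(("a", Int32, [1, 2, 3, 4, 5, 6])).unwrap();
    /// let mut coalescer = BatchCoalescer::new(batch.schema(), 4);
    /// coalescer.push_batch(batch).unwrap();
    /// // one completed batch of exactly 4 rows ...
    /// assert_eq!(coalescer.next_completed_batch().unwrap().num_rows(), 4);
    /// // ... and the remaining 2 rows stay buffered
    /// assert_eq!(coalescer.get_buffered_rows(), 2);
    /// ```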
    pub fn push_batch(&mut self, batch: RecordBatch) -> Result<(), ArrowError> {
        // Large batch bypass optimization:
        // When biggest_coalesce_batch_size is configured and a batch exceeds this limit,
        // we can avoid expensive split-and-merge operations by passing it through directly.
        //
        // IMPORTANT: This optimization is OPTIONAL and only active when biggest_coalesce_batch_size
        // is explicitly set via with_biggest_coalesce_batch_size(Some(limit)).
        // If not set (None), ALL batches follow normal coalescing behavior regardless of size.

        // =============================================================================
        // CASE 1: No buffer + large batch → Direct bypass
        // =============================================================================
        // Example scenario (target_batch_size=1000, biggest_coalesce_batch_size=Some(500)):
        // Input sequence: [600, 1200, 300]
        //
        // With biggest_coalesce_batch_size=Some(500) (optimization enabled):
        //   600  → large batch detected! buffered_rows=0 → Case 1: direct bypass
        //        → output: [600] (bypass, preserves large batch)
        //   1200 → large batch detected! buffered_rows=0 → Case 1: direct bypass
        //        → output: [1200] (bypass, preserves large batch)
        //   300  → normal batch, buffer: [300]
        // Result: [600], [1200], [300] - large batches preserved, mixed sizes

        // =============================================================================
        // CASE 2: Buffer too large + large batch → Flush first, then bypass
        // =============================================================================
        // This case prevents creating extremely large merged batches that would
        // significantly exceed both target_batch_size and biggest_coalesce_batch_size.
        //
        // Example 1: Buffer exceeds limit before large batch arrives
        //   target_batch_size=1000, biggest_coalesce_batch_size=Some(400)
        //   Input: [350, 200, 800]
        //
        //   Step 1: push_batch([350])
        //     → batch_size=350 <= 400, normal path
        //     → buffer: [350], buffered_rows=350
        //
        //   Step 2: push_batch([200])
        //     → batch_size=200 <= 400, normal path
        //     → buffer: [350, 200], buffered_rows=550
        //
        //   Step 3: push_batch([800])
        //     → batch_size=800 > 400, large batch path
        //     → buffered_rows=550 > 400 → Case 2: flush first
        //     → flush: output [550] (combined [350, 200])
        //     → then bypass: output [800]
        //   Result: [550], [800] - buffer flushed to prevent an oversized merge
        //
        // Example 2: Multiple small batches accumulate before a large batch
        //   target_batch_size=1000, biggest_coalesce_batch_size=Some(300)
        //   Input: [150, 100, 80, 900]
        //
        //   Steps 1-3: Accumulate small batches
        //     150 → buffer: [150], buffered_rows=150
        //     100 → buffer: [150, 100], buffered_rows=250
        //     80  → buffer: [150, 100, 80], buffered_rows=330
        //
        //   Step 4: push_batch([900])
        //     → batch_size=900 > 300, large batch path
        //     → buffered_rows=330 > 300 → Case 2: flush first
        //     → flush: output [330] (combined [150, 100, 80])
        //     → then bypass: output [900]
        //   Result: [330], [900] - prevents merging into [1230], which would be too large

        // =============================================================================
        // CASE 3: Small buffer + large batch → Normal coalescing (no bypass)
        // =============================================================================
        // When the buffer is small enough, we still merge to maintain efficiency
        // Example: target_batch_size=1000, biggest_coalesce_batch_size=Some(500)
        //   Input: [300, 1200]
        //
        //   Step 1: push_batch([300])
        //     → batch_size=300 <= 500, normal path
        //     → buffer: [300], buffered_rows=300
        //
        //   Step 2: push_batch([1200])
        //     → batch_size=1200 > 500, large batch path
        //     → buffered_rows=300 <= 500 → Case 3: normal merge
        //     → buffer: [300, 1200] (1500 total)
        //     → 1500 > target_batch_size → split: output [1000], buffer [500]
        //   Result: [1000], [500] - normal split/merge behavior maintained

        // =============================================================================
        // Comparison: Default vs Optimized Behavior
        // =============================================================================
        // target_batch_size=1000, biggest_coalesce_batch_size=Some(500)
        // Input: [600, 1200, 300]
        //
        // DEFAULT BEHAVIOR (biggest_coalesce_batch_size=None):
        //   600  → buffer: [600]
        //   1200 → buffer: [600, 1200] (1800 rows total)
        //        → split: output [1000 rows], buffer [800 rows remaining]
        //   300  → buffer: [800, 300] (1100 rows total)
        //        → split: output [1000 rows], buffer [100 rows remaining]
        // Result: [1000], [1000], [100] - all outputs respect target_batch_size
        //
        // OPTIMIZED BEHAVIOR (biggest_coalesce_batch_size=Some(500)):
        //   600  → Case 1: direct bypass → output: [600]
        //   1200 → Case 1: direct bypass → output: [1200]
        //   300  → normal path → buffer: [300]
        // Result: [600], [1200], [300] - large batches preserved

        // =============================================================================
        // Benefits and Trade-offs
        // =============================================================================
        // Benefits of the optimization:
        // - Large batches stay intact (better for downstream vectorized processing)
        // - Fewer split/merge operations (better CPU performance)
        // - More predictable memory usage patterns
        // - Maintains streaming efficiency while preserving batch boundaries
        //
        // Trade-offs:
        // - Output batch sizes become variable (not always target_batch_size)
        // - May produce smaller partial batches when flushing before large batches
        // - Requires tuning biggest_coalesce_batch_size for optimal performance

        // TODO: for unsorted inputs, could we pass through all large batches
        // and coalesce all the small batches together?

        let batch_size = batch.num_rows();

        // Fast path: skip empty batches
        if batch_size == 0 {
            return Ok(());
        }

        // Large batch optimization: bypass coalescing for oversized batches
        if let Some(limit) = self.biggest_coalesce_batch_size {
            if batch_size > limit {
                // Case 1: No buffered data - emit large batch directly
                // Example: [] + [1200] → output [1200], buffer []
                if self.buffered_rows == 0 {
                    self.completed.push_back(batch);
                    return Ok(());
                }

                // Case 2: Buffer too large - flush then emit to avoid an oversized merge
                // Example: [850] + [1200] → output [850], then output [1200]
                // This prevents creating batches much larger than both target_batch_size
                // and biggest_coalesce_batch_size, which could cause memory issues
                if self.buffered_rows > limit {
                    self.finish_buffered_batch()?;
                    self.completed.push_back(batch);
                    return Ok(());
                }

                // Case 3: Small buffer - proceed with normal coalescing
                // Example: [300] + [1200] → split and merge normally
                // This ensures small batches still get properly coalesced
                // while allowing some controlled growth beyond the limit
            }
        }

        let (_schema, arrays, mut num_rows) = batch.into_parts();

        // set up the input sources
        assert_eq!(arrays.len(), self.in_progress_arrays.len());
        self.in_progress_arrays
            .iter_mut()
            .zip(arrays)
            .for_each(|(in_progress, array)| {
                in_progress.set_source(Some(array));
            });

        // If pushing this batch would exceed the target batch size,
        // finish the current batch and start a new one
        let mut offset = 0;
        while num_rows > (self.target_batch_size - self.buffered_rows) {
            let remaining_rows = self.target_batch_size - self.buffered_rows;
            debug_assert!(remaining_rows > 0);

            // Copy remaining_rows from each array
            for in_progress in self.in_progress_arrays.iter_mut() {
                in_progress.copy_rows(offset, remaining_rows)?;
            }

            self.buffered_rows += remaining_rows;
            offset += remaining_rows;
            num_rows -= remaining_rows;

            self.finish_buffered_batch()?;
        }

        // Add any remaining rows to the buffer
        self.buffered_rows += num_rows;
        if num_rows > 0 {
            for in_progress in self.in_progress_arrays.iter_mut() {
                in_progress.copy_rows(offset, num_rows)?;
            }
        }

        // If we have reached the target batch size, finalize the buffered batch
        if self.buffered_rows >= self.target_batch_size {
            self.finish_buffered_batch()?;
        }

        // clear the in-progress sources (to allow the memory to be freed)
        for in_progress in self.in_progress_arrays.iter_mut() {
            in_progress.set_source(None);
        }

        Ok(())
    }

    /// Returns the number of buffered rows
    pub fn get_buffered_rows(&self) -> usize {
        self.buffered_rows
    }

    /// Concatenates any buffered batches into a single `RecordBatch` and
    /// clears any output buffers
    ///
    /// Normally this is called when the input stream is exhausted and
    /// we want to finalize the last batch of rows.
    ///
    /// See [`Self::next_completed_batch()`] for the completed batches.
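    ///
    /// # Example
    ///
    /// A sketch of flushing a partially filled buffer (2 rows buffered
    /// against a target of 1000):
    ///
    /// ```
    /// # use arrow_array::record_batch;
    /// # use arrow_select::coalesce::BatchCoalescer;
    /// let batch = record_batch!(("a", Int32, [1, 2])).unwrap();
    /// let mut coalescer = BatchCoalescer::new(batch.schema(), 1000);
    /// coalescer.push_batch(batch.clone()).unwrap();
    /// // 2 rows < 1000, so nothing is completed yet
    /// assert!(coalescer.next_completed_batch().is_none());
    /// // finish_buffered_batch materializes the partial batch
    /// coalescer.finish_buffered_batch().unwrap();
    /// assert_eq!(coalescer.next_completed_batch().unwrap(), batch);
    /// ```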
    pub fn finish_buffered_batch(&mut self) -> Result<(), ArrowError> {
        if self.buffered_rows == 0 {
            return Ok(());
        }
        let new_arrays = self
            .in_progress_arrays
            .iter_mut()
            .map(|array| array.finish())
            .collect::<Result<Vec<_>, ArrowError>>()?;

        for (array, field) in new_arrays.iter().zip(self.schema.fields().iter()) {
            debug_assert_eq!(array.data_type(), field.data_type());
            debug_assert_eq!(array.len(), self.buffered_rows);
        }

        // SAFETY: each array was created with the correct type and length.
        let batch = unsafe {
            RecordBatch::new_unchecked(Arc::clone(&self.schema), new_arrays, self.buffered_rows)
        };

        self.buffered_rows = 0;
        self.completed.push_back(batch);
        Ok(())
    }

    /// Returns true if this coalescer has no buffered rows and no completed
    /// batches
    pub fn is_empty(&self) -> bool {
        self.buffered_rows == 0 && self.completed.is_empty()
    }

    /// Returns true if there are any completed batches
    pub fn has_completed_batch(&self) -> bool {
        !self.completed.is_empty()
    }

    /// Removes and returns the next completed batch, if any.
    pub fn next_completed_batch(&mut self) -> Option<RecordBatch> {
        self.completed.pop_front()
    }
}

/// Return a new `InProgressArray` for the given data type
fn create_in_progress_array(data_type: &DataType, batch_size: usize) -> Box<dyn InProgressArray> {
    macro_rules! instantiate_primitive {
        ($t:ty) => {
            Box::new(InProgressPrimitiveArray::<$t>::new(
                batch_size,
                data_type.clone(),
            ))
        };
    }

    downcast_primitive! {
        // Instantiate InProgressPrimitiveArray for each primitive type
        data_type => (instantiate_primitive),
        DataType::Utf8View => Box::new(InProgressByteViewArray::<StringViewType>::new(batch_size)),
        DataType::BinaryView => {
            Box::new(InProgressByteViewArray::<BinaryViewType>::new(batch_size))
        }
        _ => Box::new(GenericInProgressArray::new()),
    }
}

/// Incrementally builds up arrays
///
/// [`GenericInProgressArray`] is the default implementation, which buffers
/// arrays and uses other kernels to concatenate them when finished.
///
/// Some array types have specialized implementations (e.g.,
/// [`StringViewArray`]).
///
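/// For example, splitting a 100 row source array across two output batches
/// looks like this (a sketch, not a doctest):
///
/// ```text
/// in_progress.set_source(Some(array)); // 100 row source
/// in_progress.copy_rows(0, 60)?;       // rows 0..60 for the first output
/// let first = in_progress.finish()?;   // 60 row array
/// in_progress.copy_rows(60, 40)?;      // rows 60..100 for the second output
/// let second = in_progress.finish()?;  // 40 row array
/// in_progress.set_source(None);        // release the source memory
/// ```
///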
593/// [`StringViewArray`]: arrow_array::StringViewArray
594trait InProgressArray: std::fmt::Debug + Send + Sync {
595 /// Set the source array.
596 ///
597 /// Calls to [`Self::copy_rows`] will copy rows from this array into the
598 /// current in-progress array
599 fn set_source(&mut self, source: Option<ArrayRef>);
600
601 /// Copy rows from the current source array into the in-progress array
602 ///
603 /// The source array is set by [`Self::set_source`].
604 ///
605 /// Return an error if the source array is not set
606 fn copy_rows(&mut self, offset: usize, len: usize) -> Result<(), ArrowError>;
607
608 /// Finish the currently in-progress array and return it as an `ArrayRef`
609 fn finish(&mut self) -> Result<ArrayRef, ArrowError>;
610}
611
612#[cfg(test)]
613mod tests {
614 use super::*;
615 use crate::concat::concat_batches;
616 use arrow_array::builder::StringViewBuilder;
617 use arrow_array::cast::AsArray;
618 use arrow_array::types::Int32Type;
619 use arrow_array::{
620 BinaryViewArray, Int32Array, Int64Array, RecordBatchOptions, StringArray, StringViewArray,
621 TimestampNanosecondArray, UInt32Array, UInt64Array, make_array,
622 };
623 use arrow_buffer::BooleanBufferBuilder;
624 use arrow_schema::{DataType, Field, Schema};
625 use rand::{Rng, SeedableRng};
626 use std::ops::Range;
627
628 #[test]
629 fn test_coalesce() {
630 let batch = uint32_batch(0..8);
631 Test::new("coalesce")
632 .with_batches(std::iter::repeat_n(batch, 10))
633 // expected output is exactly 21 rows (except for the final batch)
634 .with_batch_size(21)
635 .with_expected_output_sizes(vec![21, 21, 21, 17])
636 .run();
637 }
638
639 #[test]
640 fn test_coalesce_one_by_one() {
641 let batch = uint32_batch(0..1); // single row input
642 Test::new("coalesce_one_by_one")
643 .with_batches(std::iter::repeat_n(batch, 97))
644 // expected output is exactly 20 rows (except for the final batch)
645 .with_batch_size(20)
646 .with_expected_output_sizes(vec![20, 20, 20, 20, 17])
647 .run();
648 }
649
650 #[test]
651 fn test_coalesce_empty() {
652 let schema = Arc::new(Schema::new(vec![Field::new("c0", DataType::UInt32, false)]));
653
654 Test::new("coalesce_empty")
655 .with_batches(vec![])
656 .with_schema(schema)
657 .with_batch_size(21)
658 .with_expected_output_sizes(vec![])
659 .run();
660 }
661
662 #[test]
663 fn test_single_large_batch_greater_than_target() {
664 // test a single large batch
665 let batch = uint32_batch(0..4096);
666 Test::new("coalesce_single_large_batch_greater_than_target")
667 .with_batch(batch)
668 .with_batch_size(1000)
669 .with_expected_output_sizes(vec![1000, 1000, 1000, 1000, 96])
670 .run();
671 }
672
673 #[test]
674 fn test_single_large_batch_smaller_than_target() {
675 // test a single large batch
676 let batch = uint32_batch(0..4096);
677 Test::new("coalesce_single_large_batch_smaller_than_target")
678 .with_batch(batch)
679 .with_batch_size(8192)
680 .with_expected_output_sizes(vec![4096])
681 .run();
682 }
683
684 #[test]
685 fn test_single_large_batch_equal_to_target() {
686 // test a single large batch
687 let batch = uint32_batch(0..4096);
688 Test::new("coalesce_single_large_batch_equal_to_target")
689 .with_batch(batch)
690 .with_batch_size(4096)
691 .with_expected_output_sizes(vec![4096])
692 .run();
693 }
694
695 #[test]
696 fn test_single_large_batch_equally_divisible_in_target() {
697 // test a single large batch
698 let batch = uint32_batch(0..4096);
699 Test::new("coalesce_single_large_batch_equally_divisible_in_target")
700 .with_batch(batch)
701 .with_batch_size(1024)
702 .with_expected_output_sizes(vec![1024, 1024, 1024, 1024])
703 .run();
704 }
705
706 #[test]
707 fn test_empty_schema() {
708 let schema = Schema::empty();
709 let batch = RecordBatch::new_empty(schema.into());
710 Test::new("coalesce_empty_schema")
711 .with_batch(batch)
712 .with_expected_output_sizes(vec![])
713 .run();
714 }
715
716 /// Coalesce multiple batches, 80k rows, with a 0.1% selectivity filter
717 #[test]
718 fn test_coalesce_filtered_001() {
719 let mut filter_builder = RandomFilterBuilder {
720 num_rows: 8000,
721 selectivity: 0.001,
722 seed: 0,
723 };
724
725 // add 10 batches of 8000 rows each
726 // 80k rows, selecting 0.1% means 80 rows
727 // not exactly 80 as the rows are random;
728 let mut test = Test::new("coalesce_filtered_001");
729 for _ in 0..10 {
730 test = test
731 .with_batch(multi_column_batch(0..8000))
732 .with_filter(filter_builder.next_filter())
733 }
734 test.with_batch_size(15)
735 .with_expected_output_sizes(vec![15, 15, 15, 13])
736 .run();
737 }
738
739 /// Coalesce multiple batches, 80k rows, with a 1% selectivity filter
740 #[test]
741 fn test_coalesce_filtered_01() {
742 let mut filter_builder = RandomFilterBuilder {
743 num_rows: 8000,
744 selectivity: 0.01,
745 seed: 0,
746 };
747
748 // add 10 batches of 8000 rows each
749 // 80k rows, selecting 1% means 800 rows
750 // not exactly 800 as the rows are random;
751 let mut test = Test::new("coalesce_filtered_01");
752 for _ in 0..10 {
753 test = test
754 .with_batch(multi_column_batch(0..8000))
755 .with_filter(filter_builder.next_filter())
756 }
757 test.with_batch_size(128)
758 .with_expected_output_sizes(vec![128, 128, 128, 128, 128, 128, 15])
759 .run();
760 }
761
762 /// Coalesce multiple batches, 80k rows, with a 10% selectivity filter
763 #[test]
764 fn test_coalesce_filtered_10() {
765 let mut filter_builder = RandomFilterBuilder {
766 num_rows: 8000,
767 selectivity: 0.1,
768 seed: 0,
769 };
770
771 // add 10 batches of 8000 rows each
772 // 80k rows, selecting 10% means 8000 rows
773 // not exactly 800 as the rows are random;
774 let mut test = Test::new("coalesce_filtered_10");
775 for _ in 0..10 {
776 test = test
777 .with_batch(multi_column_batch(0..8000))
778 .with_filter(filter_builder.next_filter())
779 }
780 test.with_batch_size(1024)
781 .with_expected_output_sizes(vec![1024, 1024, 1024, 1024, 1024, 1024, 1024, 840])
782 .run();
783 }
784
785 /// Coalesce multiple batches, 8k rows, with a 90% selectivity filter
786 #[test]
787 fn test_coalesce_filtered_90() {
788 let mut filter_builder = RandomFilterBuilder {
789 num_rows: 800,
790 selectivity: 0.90,
791 seed: 0,
792 };
793
794 // add 10 batches of 800 rows each
795 // 8k rows, selecting 99% means 7200 rows
796 // not exactly 7200 as the rows are random;
797 let mut test = Test::new("coalesce_filtered_90");
798 for _ in 0..10 {
799 test = test
800 .with_batch(multi_column_batch(0..800))
801 .with_filter(filter_builder.next_filter())
802 }
803 test.with_batch_size(1024)
804 .with_expected_output_sizes(vec![1024, 1024, 1024, 1024, 1024, 1024, 1024, 13])
805 .run();
806 }
807
808 /// Coalesce multiple batches, 8k rows, with mixed filers, including 100%
809 #[test]
810 fn test_coalesce_filtered_mixed() {
811 let mut filter_builder = RandomFilterBuilder {
812 num_rows: 800,
813 selectivity: 0.90,
814 seed: 0,
815 };
816
817 let mut test = Test::new("coalesce_filtered_mixed");
818 for _ in 0..3 {
819 // also add in a batch that selects almost all rows and when
820 // sliced will have some batches that are entirely used
821 let mut all_filter_builder = BooleanBufferBuilder::new(1000);
822 all_filter_builder.append_n(500, true);
823 all_filter_builder.append_n(1, false);
824 all_filter_builder.append_n(499, false);
825 let all_filter = all_filter_builder.build();
826
827 test = test
828 .with_batch(multi_column_batch(0..1000))
829 .with_filter(BooleanArray::from(all_filter))
830 .with_batch(multi_column_batch(0..800))
831 .with_filter(filter_builder.next_filter());
832 // decrease selectivity
833 filter_builder.selectivity *= 0.6;
834 }
835
836 // use a small batch size to ensure the filter is appended in slices
837 // and some of those slides will select the entire thing.
838 test.with_batch_size(250)
839 .with_expected_output_sizes(vec![
840 250, 250, 250, 250, 250, 250, 250, 250, 250, 250, 250, 179,
841 ])
842 .run();
843 }
844
845 #[test]
846 fn test_coalesce_non_null() {
847 Test::new("coalesce_non_null")
848 // 4040 rows of unit32
849 .with_batch(uint32_batch_non_null(0..3000))
850 .with_batch(uint32_batch_non_null(0..1040))
851 .with_batch_size(1024)
852 .with_expected_output_sizes(vec![1024, 1024, 1024, 968])
853 .run();
854 }
855 #[test]
856 fn test_utf8_split() {
857 Test::new("coalesce_utf8")
858 // 4040 rows of utf8 strings in total, split into batches of 1024
859 .with_batch(utf8_batch(0..3000))
860 .with_batch(utf8_batch(0..1040))
861 .with_batch_size(1024)
862 .with_expected_output_sizes(vec![1024, 1024, 1024, 968])
863 .run();
864 }
865
866 #[test]
867 fn test_string_view_no_views() {
868 let output_batches = Test::new("coalesce_string_view_no_views")
869 // both input batches have no views, so no need to compact
870 .with_batch(stringview_batch([Some("foo"), Some("bar")]))
871 .with_batch(stringview_batch([Some("baz"), Some("qux")]))
872 .with_expected_output_sizes(vec![4])
873 .run();
874
875 expect_buffer_layout(
876 col_as_string_view("c0", output_batches.first().unwrap()),
877 vec![],
878 );
879 }
880
881 #[test]
882 fn test_string_view_batch_small_no_compact() {
883 // view with only short strings (no buffers) --> no need to compact
884 let batch = stringview_batch_repeated(1000, [Some("a"), Some("b"), Some("c")]);
885 let output_batches = Test::new("coalesce_string_view_batch_small_no_compact")
886 .with_batch(batch.clone())
887 .with_expected_output_sizes(vec![1000])
888 .run();
889
890 let array = col_as_string_view("c0", &batch);
891 let gc_array = col_as_string_view("c0", output_batches.first().unwrap());
892 assert_eq!(array.data_buffers().len(), 0);
893 assert_eq!(array.data_buffers().len(), gc_array.data_buffers().len()); // no compaction
894
895 expect_buffer_layout(gc_array, vec![]);
896 }
897
898 #[test]
899 fn test_string_view_batch_large_no_compact() {
900 // view with large strings (has buffers) but full --> no need to compact
901 let batch = stringview_batch_repeated(1000, [Some("This string is longer than 12 bytes")]);
902 let output_batches = Test::new("coalesce_string_view_batch_large_no_compact")
903 .with_batch(batch.clone())
904 .with_batch_size(1000)
905 .with_expected_output_sizes(vec![1000])
906 .run();
907
908 let array = col_as_string_view("c0", &batch);
909 let gc_array = col_as_string_view("c0", output_batches.first().unwrap());
910 assert_eq!(array.data_buffers().len(), 5);
911 assert_eq!(array.data_buffers().len(), gc_array.data_buffers().len()); // no compaction
912
913 expect_buffer_layout(
914 gc_array,
915 vec![
916 ExpectedLayout {
917 len: 8190,
918 capacity: 8192,
919 },
920 ExpectedLayout {
921 len: 8190,
922 capacity: 8192,
923 },
924 ExpectedLayout {
925 len: 8190,
926 capacity: 8192,
927 },
928 ExpectedLayout {
929 len: 8190,
930 capacity: 8192,
931 },
932 ExpectedLayout {
933 len: 2240,
934 capacity: 8192,
935 },
936 ],
937 );
938 }
939
940 #[test]
941 fn test_string_view_batch_small_with_buffers_no_compact() {
942 // view with buffers but only short views
943 let short_strings = std::iter::repeat(Some("SmallString"));
944 let long_strings = std::iter::once(Some("This string is longer than 12 bytes"));
945 // 20 short strings, then a long ones
946 let values = short_strings.take(20).chain(long_strings);
947 let batch = stringview_batch_repeated(1000, values)
948 // take only 10 short strings (no long ones)
949 .slice(5, 10);
950 let output_batches = Test::new("coalesce_string_view_batch_small_with_buffers_no_compact")
951 .with_batch(batch.clone())
952 .with_batch_size(1000)
953 .with_expected_output_sizes(vec![10])
954 .run();
955
956 let array = col_as_string_view("c0", &batch);
957 let gc_array = col_as_string_view("c0", output_batches.first().unwrap());
958 assert_eq!(array.data_buffers().len(), 1); // input has one buffer
959 assert_eq!(gc_array.data_buffers().len(), 0); // output has no buffers as only short strings
960 }
961
962 #[test]
963 fn test_string_view_batch_large_slice_compact() {
964 // view with large strings (has buffers) and only partially used --> no need to compact
965 let batch = stringview_batch_repeated(1000, [Some("This string is longer than 12 bytes")])
966 // slice only 22 rows, so most of the buffer is not used
967 .slice(11, 22);
968
969 let output_batches = Test::new("coalesce_string_view_batch_large_slice_compact")
970 .with_batch(batch.clone())
971 .with_batch_size(1000)
972 .with_expected_output_sizes(vec![22])
973 .run();
974
975 let array = col_as_string_view("c0", &batch);
976 let gc_array = col_as_string_view("c0", output_batches.first().unwrap());
977 assert_eq!(array.data_buffers().len(), 5);
978
979 expect_buffer_layout(
980 gc_array,
981 vec![ExpectedLayout {
982 len: 770,
983 capacity: 8192,
984 }],
985 );
986 }
987
988 #[test]
989 fn test_string_view_mixed() {
990 let large_view_batch =
991 stringview_batch_repeated(1000, [Some("This string is longer than 12 bytes")]);
992 let small_view_batch = stringview_batch_repeated(1000, [Some("SmallString")]);
993 let mixed_batch = stringview_batch_repeated(
994 1000,
995 [Some("This string is longer than 12 bytes"), Some("Small")],
996 );
997 let mixed_batch_nulls = stringview_batch_repeated(
998 1000,
999 [
1000 Some("This string is longer than 12 bytes"),
1001 Some("Small"),
1002 None,
1003 ],
1004 );
1005
1006 // Several batches with mixed inline / non inline
1007 // 4k rows in
1008 let output_batches = Test::new("coalesce_string_view_mixed")
1009 .with_batch(large_view_batch.clone())
1010 .with_batch(small_view_batch)
1011 // this batch needs to be compacted (less than 1/2 full)
1012 .with_batch(large_view_batch.slice(10, 20))
1013 .with_batch(mixed_batch_nulls)
1014 // this batch needs to be compacted (less than 1/2 full)
1015 .with_batch(large_view_batch.slice(10, 20))
1016 .with_batch(mixed_batch)
1017 .with_expected_output_sizes(vec![1024, 1024, 1024, 968])
1018 .run();
1019
1020 expect_buffer_layout(
1021 col_as_string_view("c0", output_batches.first().unwrap()),
1022 vec![
1023 ExpectedLayout {
1024 len: 8190,
1025 capacity: 8192,
1026 },
1027 ExpectedLayout {
1028 len: 8190,
1029 capacity: 8192,
1030 },
1031 ExpectedLayout {
1032 len: 8190,
1033 capacity: 8192,
1034 },
1035 ExpectedLayout {
1036 len: 8190,
1037 capacity: 8192,
1038 },
1039 ExpectedLayout {
1040 len: 2240,
1041 capacity: 8192,
1042 },
1043 ],
1044 );
1045 }
1046
1047 #[test]
1048 fn test_string_view_many_small_compact() {
1049 // 200 rows alternating long (28) and short (≤12) strings.
1050 // Only the 100 long strings go into data buffers: 100 × 28 = 2800.
1051 let batch = stringview_batch_repeated(
1052 200,
1053 [Some("This string is 28 bytes long"), Some("small string")],
1054 );
1055 let output_batches = Test::new("coalesce_string_view_many_small_compact")
1056 // First allocated buffer is 8kb.
1057 // Appending 10 batches of 2800 bytes will use 2800 * 10 = 14kb (8kb, an 16kb and 32kbkb)
1058 .with_batch(batch.clone())
1059 .with_batch(batch.clone())
1060 .with_batch(batch.clone())
1061 .with_batch(batch.clone())
1062 .with_batch(batch.clone())
1063 .with_batch(batch.clone())
1064 .with_batch(batch.clone())
1065 .with_batch(batch.clone())
1066 .with_batch(batch.clone())
1067 .with_batch(batch.clone())
1068 .with_batch_size(8000)
1069 .with_expected_output_sizes(vec![2000]) // only 1000 rows total
1070 .run();
1071
1072 // expect a nice even distribution of buffers
1073 expect_buffer_layout(
1074 col_as_string_view("c0", output_batches.first().unwrap()),
1075 vec![
1076 ExpectedLayout {
1077 len: 8176,
1078 capacity: 8192,
1079 },
1080 ExpectedLayout {
1081 len: 16380,
1082 capacity: 16384,
1083 },
1084 ExpectedLayout {
1085 len: 3444,
1086 capacity: 32768,
1087 },
1088 ],
1089 );
1090 }
1091
1092 #[test]
1093 fn test_string_view_many_small_boundary() {
1094 // The strings are designed to exactly fit into buffers that are powers of 2 long
1095 let batch = stringview_batch_repeated(100, [Some("This string is a power of two=32")]);
1096 let output_batches = Test::new("coalesce_string_view_many_small_boundary")
1097 .with_batches(std::iter::repeat_n(batch, 20))
1098 .with_batch_size(900)
1099 .with_expected_output_sizes(vec![900, 900, 200])
1100 .run();
1101
1102 // expect each buffer to be entirely full except the last one
1103 expect_buffer_layout(
1104 col_as_string_view("c0", output_batches.first().unwrap()),
1105 vec![
1106 ExpectedLayout {
1107 len: 8192,
1108 capacity: 8192,
1109 },
1110 ExpectedLayout {
1111 len: 16384,
1112 capacity: 16384,
1113 },
1114 ExpectedLayout {
1115 len: 4224,
1116 capacity: 32768,
1117 },
1118 ],
1119 );
1120 }
1121
1122 #[test]
1123 fn test_string_view_large_small() {
1124 // The strings are 37 bytes long, so each batch has 100 * 28 = 2800 bytes
1125 let mixed_batch = stringview_batch_repeated(
1126 200,
1127 [Some("This string is 28 bytes long"), Some("small string")],
1128 );
1129 // These strings aren't copied, this array has an 8k buffer
1130 let all_large = stringview_batch_repeated(
1131 50,
1132 [Some(
1133 "This buffer has only large strings in it so there are no buffer copies",
1134 )],
1135 );
1136
1137 let output_batches = Test::new("coalesce_string_view_large_small")
1138 // First allocated buffer is 8kb.
1139 // Appending five batches of 2800 bytes will use 2800 * 10 = 28kb (8kb, an 16kb and 32kbkb)
1140 .with_batch(mixed_batch.clone())
1141 .with_batch(mixed_batch.clone())
1142 .with_batch(all_large.clone())
1143 .with_batch(mixed_batch.clone())
1144 .with_batch(all_large.clone())
1145 .with_batch(mixed_batch.clone())
1146 .with_batch(mixed_batch.clone())
1147 .with_batch(all_large.clone())
1148 .with_batch(mixed_batch.clone())
1149 .with_batch(all_large.clone())
1150 .with_batch_size(8000)
1151 .with_expected_output_sizes(vec![1400])
1152 .run();
1153
1154 expect_buffer_layout(
1155 col_as_string_view("c0", output_batches.first().unwrap()),
1156 vec![
1157 ExpectedLayout {
1158 len: 8190,
1159 capacity: 8192,
1160 },
1161 ExpectedLayout {
1162 len: 16366,
1163 capacity: 16384,
1164 },
1165 ExpectedLayout {
1166 len: 6244,
1167 capacity: 32768,
1168 },
1169 ],
1170 );
1171 }
1172
1173 #[test]
1174 fn test_binary_view() {
1175 let values: Vec<Option<&[u8]>> = vec![
1176 Some(b"foo"),
1177 None,
1178 Some(b"A longer string that is more than 12 bytes"),
1179 ];
1180
1181 let binary_view =
1182 BinaryViewArray::from_iter(std::iter::repeat(values.iter()).flatten().take(1000));
1183 let batch =
1184 RecordBatch::try_from_iter(vec![("c0", Arc::new(binary_view) as ArrayRef)]).unwrap();
1185
1186 Test::new("coalesce_binary_view")
1187 .with_batch(batch.clone())
1188 .with_batch(batch.clone())
1189 .with_batch_size(512)
1190 .with_expected_output_sizes(vec![512, 512, 512, 464])
1191 .run();
1192 }
1193
1194 #[derive(Debug, Clone, PartialEq)]
1195 struct ExpectedLayout {
1196 len: usize,
1197 capacity: usize,
1198 }
1199
1200 /// Asserts that the buffer layout of the specified StringViewArray matches the expected layout
1201 fn expect_buffer_layout(array: &StringViewArray, expected: Vec<ExpectedLayout>) {
1202 let actual = array
1203 .data_buffers()
1204 .iter()
1205 .map(|b| ExpectedLayout {
1206 len: b.len(),
1207 capacity: b.capacity(),
1208 })
1209 .collect::<Vec<_>>();
1210
1211 assert_eq!(
1212 actual, expected,
1213 "Expected buffer layout {expected:#?} but got {actual:#?}"
1214 );
1215 }
1216
1217 /// Test for [`BatchCoalescer`]
1218 ///
1219 /// Pushes the input batches to the coalescer and verifies that the resulting
1220 /// batches have the
1221 /// 1. expected number of rows
1222 /// 2. The same results when the batches are filtered using the filter kernel
1223 #[derive(Debug, Clone)]
1224 struct Test {
1225 /// A human readable name to assist in debugging
1226 name: String,
1227 /// Batches to feed to the coalescer.
1228 input_batches: Vec<RecordBatch>,
1229 /// Filters to apply to the corresponding input batches.
1230 ///
1231 /// If there are no filters for the input batches, the batch will be
1232 /// pushed as is.
1233 filters: Vec<BooleanArray>,
1234 /// The schema. If not provided, the first batch's schema is used.
1235 schema: Option<SchemaRef>,
1236 /// Expected output sizes of the resulting batches
1237 expected_output_sizes: Vec<usize>,
1238 /// target batch size (default to 1024)
1239 target_batch_size: usize,
1240 }
1241
1242 impl Default for Test {
1243 fn default() -> Self {
1244 Self {
1245 name: "".to_string(),
1246 input_batches: vec![],
1247 filters: vec![],
1248 schema: None,
1249 expected_output_sizes: vec![],
1250 target_batch_size: 1024,
1251 }
1252 }
1253 }
1254
1255 impl Test {
1256 fn new(name: impl Into<String>) -> Self {
1257 Self {
1258 name: name.into(),
1259 ..Self::default()
1260 }
1261 }
1262
1263 /// Append the description to the test name
1264 fn with_description(mut self, description: &str) -> Self {
1265 self.name.push_str(": ");
1266 self.name.push_str(description);
1267 self
1268 }
1269
1270 /// Set the target batch size
1271 fn with_batch_size(mut self, target_batch_size: usize) -> Self {
1272 self.target_batch_size = target_batch_size;
1273 self
1274 }
1275
1276 /// Extend the input batches with `batch`
1277 fn with_batch(mut self, batch: RecordBatch) -> Self {
1278 self.input_batches.push(batch);
1279 self
1280 }
1281
1282 /// Extend the filters with `filter`
1283 fn with_filter(mut self, filter: BooleanArray) -> Self {
1284 self.filters.push(filter);
1285 self
1286 }
1287
1288 /// Replaces the input batches with `batches`
1289 fn with_batches(mut self, batches: impl IntoIterator<Item = RecordBatch>) -> Self {
1290 self.input_batches = batches.into_iter().collect();
1291 self
1292 }
1293
1294 /// Specifies the schema for the test
1295 fn with_schema(mut self, schema: SchemaRef) -> Self {
1296 self.schema = Some(schema);
1297 self
1298 }
1299
1300 /// Extends `sizes` to expected output sizes
1301 fn with_expected_output_sizes(mut self, sizes: impl IntoIterator<Item = usize>) -> Self {
1302 self.expected_output_sizes.extend(sizes);
1303 self
1304 }
1305
1306 /// Runs the test -- see documentation on [`Test`] for details
1307 ///
1308 /// Returns the resulting output batches
1309 fn run(self) -> Vec<RecordBatch> {
1310 // Test several permutations of input batches:
1311 // 1. Removing nulls from some batches (test non-null fast paths)
1312 // 2. Empty batches
1313 // 3. One column (from the batch)
1314 let mut extra_tests = vec![];
1315 extra_tests.push(self.clone().make_half_non_nullable());
1316 extra_tests.push(self.clone().insert_empty_batches());
1317 let single_column_tests = self.make_single_column_tests();
1318 for test in single_column_tests {
1319 extra_tests.push(test.clone().make_half_non_nullable());
1320 extra_tests.push(test);
1321 }
1322
1323 // Run original test case first, so any obvious errors are caught
1324 // by an easier to understand test case
1325 let results = self.run_inner();
1326 // Run the extra cases to expand coverage
1327 for extra in extra_tests {
1328 extra.run_inner();
1329 }
1330
1331 results
1332 }
1333
1334 /// Runs the current test instance
1335 fn run_inner(self) -> Vec<RecordBatch> {
1336 let expected_output = self.expected_output();
1337 let schema = self.schema();
1338
1339 let Self {
1340 name,
1341 input_batches,
1342 filters,
1343 schema: _,
1344 target_batch_size,
1345 expected_output_sizes,
1346 } = self;
1347
1348 println!("Running test '{name}'");
1349
1350 let had_input = input_batches.iter().any(|b| b.num_rows() > 0);
1351
1352 let mut coalescer = BatchCoalescer::new(Arc::clone(&schema), target_batch_size);
1353
1354 // feed input batches and filters to the coalescer
1355 let mut filters = filters.into_iter();
1356 for batch in input_batches {
1357 if let Some(filter) = filters.next() {
1358 coalescer.push_batch_with_filter(batch, &filter).unwrap();
1359 } else {
1360 coalescer.push_batch(batch).unwrap();
1361 }
1362 }
1363 assert_eq!(schema, coalescer.schema());
1364
1365 if had_input {
1366 assert!(!coalescer.is_empty(), "Coalescer should not be empty");
1367 } else {
1368 assert!(coalescer.is_empty(), "Coalescer should be empty");
1369 }
1370
1371 coalescer.finish_buffered_batch().unwrap();
1372 if had_input {
1373 assert!(
1374 coalescer.has_completed_batch(),
1375 "Coalescer should have completed batches"
1376 );
1377 }
1378
1379 let mut output_batches = vec![];
1380 while let Some(batch) = coalescer.next_completed_batch() {
1381 output_batches.push(batch);
1382 }
1383
1384 // make sure we got the expected number of output batches and content
1385 let mut starting_idx = 0;
1386 let actual_output_sizes: Vec<usize> =
1387 output_batches.iter().map(|b| b.num_rows()).collect();
1388 assert_eq!(
1389 expected_output_sizes, actual_output_sizes,
1390 "Unexpected number of rows in output batches\n\
1391 Expected\n{expected_output_sizes:#?}\nActual:{actual_output_sizes:#?}"
1392 );
1393 let iter = expected_output_sizes
1394 .iter()
1395 .zip(output_batches.iter())
1396 .enumerate();
1397
1398 // Verify that the actual contents of each output batch matches the expected output
1399 for (i, (expected_size, batch)) in iter {
1400 // compare the contents of the batch after normalization (using
1401 // `==` compares the underlying memory layout too)
1402 let expected_batch = expected_output.slice(starting_idx, *expected_size);
1403 let expected_batch = normalize_batch(expected_batch);
1404 let batch = normalize_batch(batch.clone());
1405 assert_eq!(
1406 expected_batch, batch,
1407 "Unexpected content in batch {i}:\
1408 \n\nExpected:\n{expected_batch:#?}\n\nActual:\n{batch:#?}"
1409 );
1410 starting_idx += *expected_size;
1411 }
1412 output_batches
1413 }
1414
1415 /// Return the expected output schema. If not overridden by `with_schema`, it
1416 /// returns the schema of the first input batch.
1417 fn schema(&self) -> SchemaRef {
1418 self.schema
1419 .clone()
1420 .unwrap_or_else(|| Arc::clone(&self.input_batches[0].schema()))
1421 }
1422
1423 /// Returns the expected output as a single `RecordBatch`
1424 fn expected_output(&self) -> RecordBatch {
1425 let schema = self.schema();
1426 if self.filters.is_empty() {
1427 return concat_batches(&schema, &self.input_batches).unwrap();
1428 }
1429
1430 let mut filters = self.filters.iter();
1431 let filtered_batches = self
1432 .input_batches
1433 .iter()
1434 .map(|batch| {
1435 if let Some(filter) = filters.next() {
1436 filter_record_batch(batch, filter).unwrap()
1437 } else {
1438 batch.clone()
1439 }
1440 })
1441 .collect::<Vec<_>>();
1442 concat_batches(&schema, &filtered_batches).unwrap()
1443 }
1444
1445 /// Return a copy of self where every other batch has had its nulls removed
1446 /// (there are often fast paths that are used when there are no nulls)
1447 fn make_half_non_nullable(mut self) -> Self {
1448 // remove the nulls from every other batch
1449 self.input_batches = self
1450 .input_batches
1451 .iter()
1452 .enumerate()
1453 .map(|(i, batch)| {
1454 if i % 2 == 1 {
1455 batch.clone()
1456 } else {
1457 Self::remove_nulls_from_batch(batch)
1458 }
1459 })
1460 .collect();
1461 self.with_description("non-nullable")
1462 }
1463
1464 /// Insert several empty batches into the input before each existing input
1465 fn insert_empty_batches(mut self) -> Self {
1466 let empty_batch = RecordBatch::new_empty(self.schema());
1467 self.input_batches = self
1468 .input_batches
1469 .into_iter()
1470 .flat_map(|batch| [empty_batch.clone(), batch])
1471 .collect();
1472 let empty_filters = BooleanArray::builder(0).finish();
1473 self.filters = self
1474 .filters
1475 .into_iter()
1476 .flat_map(|filter| [empty_filters.clone(), filter])
1477 .collect();
1478 self.with_description("empty batches inserted")
1479 }
1480
1481 /// Sets one batch to be non-nullable by removing nulls from all columns
1482 fn remove_nulls_from_batch(batch: &RecordBatch) -> RecordBatch {
1483 let new_columns = batch
1484 .columns()
1485 .iter()
1486 .map(Self::remove_nulls_from_array)
1487 .collect::<Vec<_>>();
1488 let options = RecordBatchOptions::new().with_row_count(Some(batch.num_rows()));
1489 RecordBatch::try_new_with_options(batch.schema(), new_columns, &options).unwrap()
1490 }
1491
1492 fn remove_nulls_from_array(array: &ArrayRef) -> ArrayRef {
1493 make_array(array.to_data().into_builder().nulls(None).build().unwrap())
1494 }
1495
1496 /// Returns a set of tests where each test that is the sae as self, but
1497 /// has a single column from the original input batch
1498 ///
1499 /// This can be useful to single column optimizations, specifically
1500 /// filter optimization.
1501 fn make_single_column_tests(&self) -> Vec<Self> {
1502 let original_schema = self.schema();
1503 let mut new_tests = vec![];
1504 for column in original_schema.fields() {
1505 let single_column_schema = Arc::new(Schema::new(vec![column.clone()]));
1506
1507 let single_column_batches = self.input_batches.iter().map(|batch| {
1508 let single_column = batch.column_by_name(column.name()).unwrap();
1509 RecordBatch::try_new(
1510 Arc::clone(&single_column_schema),
1511 vec![single_column.clone()],
1512 )
1513 .unwrap()
1514 });
1515
1516 let single_column_test = self
1517 .clone()
1518 .with_schema(Arc::clone(&single_column_schema))
1519 .with_batches(single_column_batches)
1520 .with_description("single column")
1521 .with_description(column.name());
1522
1523 new_tests.push(single_column_test);
1524 }
1525 new_tests
1526 }
1527 }
1528
1529 /// Return a RecordBatch with a UInt32Array with the specified range and
1530 /// every third value is null.
1531 fn uint32_batch<T: std::iter::Iterator<Item = u32>>(range: T) -> RecordBatch {
1532 let schema = Arc::new(Schema::new(vec![Field::new("c0", DataType::UInt32, true)]));
1533
1534 let array = UInt32Array::from_iter(range.map(|i| if i % 3 == 0 { None } else { Some(i) }));
1535 RecordBatch::try_new(Arc::clone(&schema), vec![Arc::new(array)]).unwrap()
1536 }
1537
1538 /// Return a RecordBatch with a UInt32Array with no nulls specified range
1539 fn uint32_batch_non_null<T: std::iter::Iterator<Item = u32>>(range: T) -> RecordBatch {
1540 let schema = Arc::new(Schema::new(vec![Field::new("c0", DataType::UInt32, false)]));
1541
1542 let array = UInt32Array::from_iter_values(range);
1543 RecordBatch::try_new(Arc::clone(&schema), vec![Arc::new(array)]).unwrap()
1544 }
1545
1546 /// Return a RecordBatch with a UInt64Array with no nulls specified range
1547 fn uint64_batch_non_null<T: std::iter::Iterator<Item = u64>>(range: T) -> RecordBatch {
1548 let schema = Arc::new(Schema::new(vec![Field::new("c0", DataType::UInt64, false)]));
1549
1550 let array = UInt64Array::from_iter_values(range);
1551 RecordBatch::try_new(Arc::clone(&schema), vec![Arc::new(array)]).unwrap()
1552 }
1553
1554 /// Return a RecordBatch with a StringArrary with values `value0`, `value1`, ...
1555 /// and every third value is `None`.
1556 fn utf8_batch(range: Range<u32>) -> RecordBatch {
1557 let schema = Arc::new(Schema::new(vec![Field::new("c0", DataType::Utf8, true)]));
1558
1559 let array = StringArray::from_iter(range.map(|i| {
1560 if i % 3 == 0 {
1561 None
1562 } else {
1563 Some(format!("value{i}"))
1564 }
1565 }));
1566
1567 RecordBatch::try_new(Arc::clone(&schema), vec![Arc::new(array)]).unwrap()
1568 }
1569
1570 /// Return a RecordBatch with a StringViewArray with (only) the specified values
1571 fn stringview_batch<'a>(values: impl IntoIterator<Item = Option<&'a str>>) -> RecordBatch {
1572 let schema = Arc::new(Schema::new(vec![Field::new(
1573 "c0",
1574 DataType::Utf8View,
1575 false,
1576 )]));
1577
1578 let array = StringViewArray::from_iter(values);
1579 RecordBatch::try_new(Arc::clone(&schema), vec![Arc::new(array)]).unwrap()
1580 }
1581
1582 /// Return a RecordBatch with a StringViewArray with num_rows by repeating
1583 /// values over and over.
1584 fn stringview_batch_repeated<'a>(
1585 num_rows: usize,
1586 values: impl IntoIterator<Item = Option<&'a str>>,
1587 ) -> RecordBatch {
1588 let schema = Arc::new(Schema::new(vec![Field::new(
1589 "c0",
1590 DataType::Utf8View,
1591 true,
1592 )]));
1593
1594 // Repeat the values to a total of num_rows
1595 let values: Vec<_> = values.into_iter().collect();
1596 let values_iter = std::iter::repeat(values.iter())
1597 .flatten()
1598 .cloned()
1599 .take(num_rows);
1600
1601 let mut builder = StringViewBuilder::with_capacity(100).with_fixed_block_size(8192);
1602 for val in values_iter {
1603 builder.append_option(val);
1604 }
1605
1606 let array = builder.finish();
1607 RecordBatch::try_new(Arc::clone(&schema), vec![Arc::new(array)]).unwrap()
1608 }
1609
1610 /// Return a RecordBatch of 100 rows
    fn multi_column_batch(range: Range<i32>) -> RecordBatch {
        let int64_array = Int64Array::from_iter(
            range
                .clone()
                .map(|v| if v % 5 == 0 { None } else { Some(v as i64) }),
        );
        let string_view_array = StringViewArray::from_iter(range.clone().map(|v| {
            if v % 5 == 0 {
                None
            } else if v % 7 == 0 {
                Some(format!("This is a string longer than 12 bytes{v}"))
            } else {
                Some(format!("Short {v}"))
            }
        }));
        let string_array = StringArray::from_iter(range.clone().map(|v| {
            if v % 11 == 0 {
                None
            } else {
                Some(format!("Value {v}"))
            }
        }));
        let timestamp_array = TimestampNanosecondArray::from_iter(range.map(|v| {
            if v % 3 == 0 {
                None
            } else {
                Some(v as i64 * 1000) // v microseconds, stored as nanoseconds
            }
        }))
        .with_timezone("America/New_York");

        RecordBatch::try_from_iter(vec![
            ("int64", Arc::new(int64_array) as ArrayRef),
            ("stringview", Arc::new(string_view_array) as ArrayRef),
            ("string", Arc::new(string_array) as ArrayRef),
            ("timestamp", Arc::new(timestamp_array) as ArrayRef),
        ])
        .unwrap()
    }

    /// Builds boolean arrays that filter out randomly selected rows from the
    /// input batch with a given `selectivity`.
    ///
    /// For example, a `selectivity` of 0.1 keeps roughly 10% of the rows and
    /// filters out the other 90%.
    #[derive(Debug)]
    struct RandomFilterBuilder {
        /// Number of rows to add to each filter
        num_rows: usize,
        /// selectivity of the filter (between 0.0 and 1.0)
        /// 0.0 selects no rows, 1.0 selects all rows
        selectivity: f64,
        /// seed for random number generator, increases by one each time
        /// `next_filter` is called
        seed: u64,
    }
    impl RandomFilterBuilder {
        /// Build the next filter with the current seed and increment the seed
        /// by one.
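        ///
        /// Successive calls reseed the RNG with `s`, `s + 1`, ..., so a given
        /// builder produces a reproducible sequence of pseudo-random filters.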
        fn next_filter(&mut self) -> BooleanArray {
            assert!(self.selectivity >= 0.0 && self.selectivity <= 1.0);
            let mut rng = rand::rngs::StdRng::seed_from_u64(self.seed);
            self.seed += 1;
            BooleanArray::from_iter(
                (0..self.num_rows)
                    .map(|_| rng.random_bool(self.selectivity))
                    .map(Some),
            )
        }
    }

    /// Returns the named column as a StringViewArray
    fn col_as_string_view<'b>(name: &str, batch: &'b RecordBatch) -> &'b StringViewArray {
        batch
            .column_by_name(name)
            .expect("column not found")
            .as_string_view_opt()
            .expect("column is not a string view")
    }

    /// Normalize the `RecordBatch` so that the memory layout is consistent
    /// (e.g. each StringViewArray is rebuilt with a compact layout).
    fn normalize_batch(batch: RecordBatch) -> RecordBatch {
        // Only StringViewArrays need normalizing (since `==` also compares the
        // memory layout)
        let (schema, mut columns, row_count) = batch.into_parts();

        for column in columns.iter_mut() {
            let Some(string_view) = column.as_string_view_opt() else {
                continue;
            };

            // Re-create the StringViewArray to ensure memory layout is
            // consistent
            let mut builder = StringViewBuilder::new();
            for s in string_view.iter() {
                builder.append_option(s);
            }
            // Update the column with the new StringViewArray
            *column = Arc::new(builder.finish());
        }

        let options = RecordBatchOptions::new().with_row_count(Some(row_count));
        RecordBatch::try_new_with_options(schema, columns, &options).unwrap()
    }

    /// Helper function to create a test batch with the specified number of rows
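    /// (a single non-null `Int32` column `c0` containing `0..num_rows`)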
    fn create_test_batch(num_rows: usize) -> RecordBatch {
        let schema = Arc::new(Schema::new(vec![Field::new("c0", DataType::Int32, false)]));
        let array = Int32Array::from_iter_values(0..num_rows as i32);
        RecordBatch::try_new(schema, vec![Arc::new(array)]).unwrap()
    }

    #[test]
    fn test_biggest_coalesce_batch_size_none_default() {
        // Test that default behavior (None) coalesces all batches
        let mut coalescer = BatchCoalescer::new(
            Arc::new(Schema::new(vec![Field::new("c0", DataType::Int32, false)])),
            100,
        );

        // Push a large batch (1000 rows) - should be coalesced normally
        let large_batch = create_test_batch(1000);
        coalescer.push_batch(large_batch).unwrap();

        // Should produce multiple batches of target size (100)
        let mut output_batches = vec![];
        while let Some(batch) = coalescer.next_completed_batch() {
            output_batches.push(batch);
        }

        coalescer.finish_buffered_batch().unwrap();
        while let Some(batch) = coalescer.next_completed_batch() {
            output_batches.push(batch);
        }

        // Should have 10 batches of 100 rows each
        assert_eq!(output_batches.len(), 10);
        for batch in output_batches {
            assert_eq!(batch.num_rows(), 100);
        }
    }

    #[test]
    fn test_biggest_coalesce_batch_size_bypass_large_batch() {
        // Test that batches larger than biggest_coalesce_batch_size bypass coalescing
        let mut coalescer = BatchCoalescer::new(
            Arc::new(Schema::new(vec![Field::new("c0", DataType::Int32, false)])),
            100,
        );
        coalescer.set_biggest_coalesce_batch_size(Some(500));

        // Push a large batch (1000 rows) - should bypass coalescing
        let large_batch = create_test_batch(1000);
        coalescer.push_batch(large_batch.clone()).unwrap();

        // Should have one completed batch immediately (the original large batch)
        assert!(coalescer.has_completed_batch());
        let output_batch = coalescer.next_completed_batch().unwrap();
        assert_eq!(output_batch.num_rows(), 1000);

        // Should be no more completed batches
        assert!(!coalescer.has_completed_batch());
        assert_eq!(coalescer.get_buffered_rows(), 0);
    }

    #[test]
    fn test_biggest_coalesce_batch_size_coalesce_small_batch() {
        // Test that batches smaller than biggest_coalesce_batch_size are coalesced normally
        let mut coalescer = BatchCoalescer::new(
            Arc::new(Schema::new(vec![Field::new("c0", DataType::Int32, false)])),
            100,
        )
        .with_biggest_coalesce_batch_size(Some(500));

        // Push small batches that should be coalesced
        let small_batch = create_test_batch(50);
        coalescer.push_batch(small_batch.clone()).unwrap();

        // Should not have a completed batch yet (only 50 rows, target is 100)
        assert!(!coalescer.has_completed_batch());
        assert_eq!(coalescer.get_buffered_rows(), 50);

        // Push another small batch
        coalescer.push_batch(small_batch).unwrap();

        // Now should have a completed batch (100 rows total)
        assert!(coalescer.has_completed_batch());
        let output_batch = coalescer.next_completed_batch().unwrap();
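        // Verify the output buffer is exactly sized (no over-allocation):
        // 100 Int32 values should occupy exactly 100 * 4 = 400 bytes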
        let size = output_batch
            .column(0)
            .as_primitive::<Int32Type>()
            .get_buffer_memory_size();
        assert_eq!(size, 400); // 100 rows * 4 bytes each
        assert_eq!(output_batch.num_rows(), 100);

        assert_eq!(coalescer.get_buffered_rows(), 0);
    }

    #[test]
    fn test_biggest_coalesce_batch_size_equal_boundary() {
        // Test behavior when batch size equals biggest_coalesce_batch_size
        let mut coalescer = BatchCoalescer::new(
            Arc::new(Schema::new(vec![Field::new("c0", DataType::Int32, false)])),
            100,
        );
        coalescer.set_biggest_coalesce_batch_size(Some(500));

        // Push a batch exactly equal to the limit
        let boundary_batch = create_test_batch(500);
        coalescer.push_batch(boundary_batch).unwrap();

        // Should be coalesced (not bypass) since it's equal, not greater
        let mut output_count = 0;
        while coalescer.next_completed_batch().is_some() {
            output_count += 1;
        }

        coalescer.finish_buffered_batch().unwrap();
        while coalescer.next_completed_batch().is_some() {
            output_count += 1;
        }

        // Should have 5 batches of 100 rows each
        assert_eq!(output_count, 5);
    }

    #[test]
    fn test_biggest_coalesce_batch_size_first_large_then_consecutive_bypass() {
        // Test the consecutive large batch bypass behavior
        // Pattern: small batches -> first large batch (coalesced) -> consecutive large batches (bypass)
        let mut coalescer = BatchCoalescer::new(
            Arc::new(Schema::new(vec![Field::new("c0", DataType::Int32, false)])),
            100,
        );
        coalescer.set_biggest_coalesce_batch_size(Some(200));

        let small_batch = create_test_batch(50);

        // Push small batch first to create buffered data
        coalescer.push_batch(small_batch).unwrap();
        assert_eq!(coalescer.get_buffered_rows(), 50);
        assert!(!coalescer.has_completed_batch());

        // Push first large batch - should go through normal coalescing due to buffered data
        let large_batch1 = create_test_batch(250);
        coalescer.push_batch(large_batch1).unwrap();

        // 50 + 250 = 300 -> 3 complete batches of 100, 0 rows buffered
        let mut completed_batches = vec![];
        while let Some(batch) = coalescer.next_completed_batch() {
            completed_batches.push(batch);
        }
        assert_eq!(completed_batches.len(), 3);
        assert_eq!(coalescer.get_buffered_rows(), 0);

        // Now push consecutive large batches - they should bypass
        let large_batch2 = create_test_batch(300);
        let large_batch3 = create_test_batch(400);

        // Push second large batch - should bypass since it's consecutive and buffer is empty
        coalescer.push_batch(large_batch2).unwrap();
        assert!(coalescer.has_completed_batch());
        let output = coalescer.next_completed_batch().unwrap();
        assert_eq!(output.num_rows(), 300); // bypassed with original size
        assert_eq!(coalescer.get_buffered_rows(), 0);

        // Push third large batch - should also bypass
        coalescer.push_batch(large_batch3).unwrap();
        assert!(coalescer.has_completed_batch());
        let output = coalescer.next_completed_batch().unwrap();
        assert_eq!(output.num_rows(), 400); // bypassed with original size
        assert_eq!(coalescer.get_buffered_rows(), 0);
    }

    #[test]
    fn test_biggest_coalesce_batch_size_empty_batch() {
        // Test that empty batches don't trigger the bypass logic
        let mut coalescer = BatchCoalescer::new(
            Arc::new(Schema::new(vec![Field::new("c0", DataType::Int32, false)])),
            100,
        );
        coalescer.set_biggest_coalesce_batch_size(Some(50));

        let empty_batch = create_test_batch(0);
        coalescer.push_batch(empty_batch).unwrap();

        // Empty batch should be handled normally (no effect)
        assert!(!coalescer.has_completed_batch());
        assert_eq!(coalescer.get_buffered_rows(), 0);
    }

    #[test]
    fn test_biggest_coalesce_batch_size_with_buffered_data_no_bypass() {
        // Test that when there is buffered data, large batches do NOT bypass (unless consecutive)
        let mut coalescer = BatchCoalescer::new(
            Arc::new(Schema::new(vec![Field::new("c0", DataType::Int32, false)])),
            100,
        );
        coalescer.set_biggest_coalesce_batch_size(Some(200));

        // Add some buffered data first
        let small_batch = create_test_batch(30);
        coalescer.push_batch(small_batch.clone()).unwrap();
        coalescer.push_batch(small_batch).unwrap();
        assert_eq!(coalescer.get_buffered_rows(), 60);

        // Push large batch that would normally bypass, but shouldn't because buffered_rows > 0
        let large_batch = create_test_batch(250);
        coalescer.push_batch(large_batch).unwrap();

        // The large batch should be processed through normal coalescing logic
        // Total: 60 (buffered) + 250 (new) = 310 rows
        // Output: 3 complete batches of 100 rows each, 10 rows remain buffered

        let mut completed_batches = vec![];
        while let Some(batch) = coalescer.next_completed_batch() {
            completed_batches.push(batch);
        }

        assert_eq!(completed_batches.len(), 3);
        for batch in &completed_batches {
            assert_eq!(batch.num_rows(), 100);
        }
        assert_eq!(coalescer.get_buffered_rows(), 10);
    }

    #[test]
    fn test_biggest_coalesce_batch_size_zero_limit() {
        // Test edge case where limit is 0 (all batches bypass when no buffered data)
        let mut coalescer = BatchCoalescer::new(
            Arc::new(Schema::new(vec![Field::new("c0", DataType::Int32, false)])),
            100,
        );
        coalescer.set_biggest_coalesce_batch_size(Some(0));

        // Even a 1-row batch should bypass when there's no buffered data
        let tiny_batch = create_test_batch(1);
        coalescer.push_batch(tiny_batch).unwrap();

        assert!(coalescer.has_completed_batch());
        let output = coalescer.next_completed_batch().unwrap();
        assert_eq!(output.num_rows(), 1);
    }

    #[test]
    fn test_biggest_coalesce_batch_size_bypass_only_when_no_buffer() {
        // Test that bypass only occurs when buffered_rows == 0
        let mut coalescer = BatchCoalescer::new(
            Arc::new(Schema::new(vec![Field::new("c0", DataType::Int32, false)])),
            100,
        );
        coalescer.set_biggest_coalesce_batch_size(Some(200));

        // First, push a large batch with no buffered data - should bypass
        let large_batch = create_test_batch(300);
        coalescer.push_batch(large_batch.clone()).unwrap();

        assert!(coalescer.has_completed_batch());
        let output = coalescer.next_completed_batch().unwrap();
        assert_eq!(output.num_rows(), 300); // bypassed
        assert_eq!(coalescer.get_buffered_rows(), 0);

        // Now add some buffered data
        let small_batch = create_test_batch(50);
        coalescer.push_batch(small_batch).unwrap();
        assert_eq!(coalescer.get_buffered_rows(), 50);

        // Push the same large batch again - should NOT bypass this time (not consecutive)
        coalescer.push_batch(large_batch).unwrap();

        // Should process through normal coalescing: 50 + 300 = 350 rows
        // Output: 3 complete batches of 100 rows, 50 rows buffered
        let mut completed_batches = vec![];
        while let Some(batch) = coalescer.next_completed_batch() {
            completed_batches.push(batch);
        }

        assert_eq!(completed_batches.len(), 3);
        for batch in &completed_batches {
            assert_eq!(batch.num_rows(), 100);
        }
        assert_eq!(coalescer.get_buffered_rows(), 50);
    }

    #[test]
    fn test_biggest_coalesce_batch_size_consecutive_large_batches_scenario() {
        // Test a mixed stream of batch sizes: 20, 20, 30, 700, 600, 700, 900, 700, 600
        let mut coalescer = BatchCoalescer::new(
            Arc::new(Schema::new(vec![Field::new("c0", DataType::Int32, false)])),
            1000,
        );
        coalescer.set_biggest_coalesce_batch_size(Some(500));

        // Push small batches first
        coalescer.push_batch(create_test_batch(20)).unwrap();
        coalescer.push_batch(create_test_batch(20)).unwrap();
        coalescer.push_batch(create_test_batch(30)).unwrap();

        assert_eq!(coalescer.get_buffered_rows(), 70);
        assert!(!coalescer.has_completed_batch());

        // Push first large batch (700) - should coalesce due to buffered data
        coalescer.push_batch(create_test_batch(700)).unwrap();

        // 70 + 700 = 770 rows, not enough for 1000, so all stay buffered
        assert_eq!(coalescer.get_buffered_rows(), 770);
        assert!(!coalescer.has_completed_batch());

        // Push second large batch (600) - should bypass since previous was large
        coalescer.push_batch(create_test_batch(600)).unwrap();

        // Should flush buffer (770 rows) and bypass the 600
        let mut outputs = vec![];
        while let Some(batch) = coalescer.next_completed_batch() {
            outputs.push(batch);
        }
        assert_eq!(outputs.len(), 2); // one flushed buffer batch (770) + one bypassed (600)
        assert_eq!(outputs[0].num_rows(), 770);
        assert_eq!(outputs[1].num_rows(), 600);
        assert_eq!(coalescer.get_buffered_rows(), 0);

        // Push remaining large batches - should all bypass
        let remaining_batches = [700, 900, 700, 600];
        for &size in &remaining_batches {
            coalescer.push_batch(create_test_batch(size)).unwrap();

            assert!(coalescer.has_completed_batch());
            let output = coalescer.next_completed_batch().unwrap();
            assert_eq!(output.num_rows(), size);
            assert_eq!(coalescer.get_buffered_rows(), 0);
        }
    }

    #[test]
    fn test_biggest_coalesce_batch_size_truly_consecutive_large_bypass() {
        // Test truly consecutive large batches that should all bypass
        // This test ensures the buffer is completely empty between large batches
        let mut coalescer = BatchCoalescer::new(
            Arc::new(Schema::new(vec![Field::new("c0", DataType::Int32, false)])),
            100,
        );
        coalescer.set_biggest_coalesce_batch_size(Some(200));

        // Push consecutive large batches with no prior buffered data
        let large_batches = vec![
            create_test_batch(300),
            create_test_batch(400),
            create_test_batch(350),
            create_test_batch(500),
        ];

        let mut all_outputs = vec![];

        for (i, large_batch) in large_batches.into_iter().enumerate() {
            let expected_size = large_batch.num_rows();

            // Buffer should be empty before each large batch
            assert_eq!(
                coalescer.get_buffered_rows(),
                0,
                "Buffer should be empty before batch {}",
                i
            );

            coalescer.push_batch(large_batch).unwrap();

            // Each large batch should bypass and produce exactly one output batch
            assert!(
                coalescer.has_completed_batch(),
                "Should have completed batch after pushing batch {}",
                i
            );

            let output = coalescer.next_completed_batch().unwrap();
            assert_eq!(
                output.num_rows(),
                expected_size,
                "Batch {} should have bypassed with original size",
                i
            );

            // Should be no more batches and buffer should be empty
            assert!(
                !coalescer.has_completed_batch(),
                "Should have no more completed batches after batch {}",
                i
            );
            assert_eq!(
                coalescer.get_buffered_rows(),
                0,
                "Buffer should be empty after batch {}",
                i
            );

            all_outputs.push(output);
        }

        // Verify we got exactly 4 output batches with original sizes
        assert_eq!(all_outputs.len(), 4);
        assert_eq!(all_outputs[0].num_rows(), 300);
        assert_eq!(all_outputs[1].num_rows(), 400);
        assert_eq!(all_outputs[2].num_rows(), 350);
        assert_eq!(all_outputs[3].num_rows(), 500);
    }

    #[test]
    fn test_biggest_coalesce_batch_size_reset_consecutive_on_small_batch() {
        // Test that small batches reset the consecutive large batch tracking
        let mut coalescer = BatchCoalescer::new(
            Arc::new(Schema::new(vec![Field::new("c0", DataType::Int32, false)])),
            100,
        );
        coalescer.set_biggest_coalesce_batch_size(Some(200));

        // Push first large batch - should bypass (no buffered data)
        coalescer.push_batch(create_test_batch(300)).unwrap();
        let output = coalescer.next_completed_batch().unwrap();
        assert_eq!(output.num_rows(), 300);

        // Push second large batch - should bypass (consecutive)
        coalescer.push_batch(create_test_batch(400)).unwrap();
        let output = coalescer.next_completed_batch().unwrap();
        assert_eq!(output.num_rows(), 400);

        // Push small batch - resets consecutive tracking
        coalescer.push_batch(create_test_batch(50)).unwrap();
        assert_eq!(coalescer.get_buffered_rows(), 50);

        // Push large batch again - should NOT bypass due to buffered data
        coalescer.push_batch(create_test_batch(350)).unwrap();

        // Should coalesce: 50 + 350 = 400 -> 4 complete batches of 100
        let mut outputs = vec![];
        while let Some(batch) = coalescer.next_completed_batch() {
            outputs.push(batch);
        }
        assert_eq!(outputs.len(), 4);
        for batch in outputs {
            assert_eq!(batch.num_rows(), 100);
        }
        assert_eq!(coalescer.get_buffered_rows(), 0);
    }

    #[test]
    fn test_coalesce_push_batch_with_indices() {
        const MID_POINT: u32 = 2333;
        const TOTAL_ROWS: u32 = 23333;
        let batch1 = uint32_batch_non_null(0..MID_POINT);
        let batch2 = uint32_batch_non_null((MID_POINT..TOTAL_ROWS).rev());

        let mut coalescer = BatchCoalescer::new(
            Arc::new(Schema::new(vec![Field::new("c0", DataType::UInt32, false)])),
            TOTAL_ROWS as usize,
        );
        coalescer.push_batch(batch1).unwrap();

        let rev_indices = (0..((TOTAL_ROWS - MID_POINT) as u64)).rev();
        let reversed_indices_batch = uint64_batch_non_null(rev_indices);

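        // `batch2` holds MID_POINT..TOTAL_ROWS in reverse, so taking it with
        // reversed indices restores ascending order; the coalesced output
        // should then equal 0..TOTAL_ROWS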
        let reverse_indices = UInt64Array::from(reversed_indices_batch.column(0).to_data());
        coalescer
            .push_batch_with_indices(batch2, &reverse_indices)
            .unwrap();

        coalescer.finish_buffered_batch().unwrap();
        let actual = coalescer.next_completed_batch().unwrap();

        let expected = uint32_batch_non_null(0..TOTAL_ROWS);

        assert_eq!(expected, actual);
    }
}
2181}