pub struct BatchCoalescer {
    schema: SchemaRef,
    batch_size: usize,
    in_progress_arrays: Vec<Box<dyn InProgressArray>>,
    buffered_rows: usize,
    completed: VecDeque<RecordBatch>,
}
Concatenate multiple RecordBatches
Implements the common pattern of incrementally creating output RecordBatches of a specific size from an input stream of RecordBatches.
This is useful after operations such as filter and take, which produce smaller batches that we want to coalesce into larger batches for further processing.
See: https://github.com/apache/arrow-rs/issues/6692
§Example
use arrow_array::record_batch;
use arrow_select::coalesce::BatchCoalescer;
let batch1 = record_batch!(("a", Int32, [1, 2, 3])).unwrap();
let batch2 = record_batch!(("a", Int32, [4, 5])).unwrap();
// Create a `BatchCoalescer` that will produce batches of exactly 4 rows
// (only the final batch may be smaller)
let target_batch_size = 4;
let mut coalescer = BatchCoalescer::new(batch1.schema(), target_batch_size);
// push the batches
coalescer.push_batch(batch1).unwrap();
// only 3 rows pushed so far (fewer than 4), so no batch is ready yet
assert!(coalescer.next_completed_batch().is_none());
coalescer.push_batch(batch2).unwrap();
// now we have 5 rows, so we can produce a batch
let finished = coalescer.next_completed_batch().unwrap();
// 4 rows came out (target batch size is 4)
let expected = record_batch!(("a", Int32, [1, 2, 3, 4])).unwrap();
assert_eq!(finished, expected);
// Have no more input, but still have an in-progress batch
assert!(coalescer.next_completed_batch().is_none());
// We can finish the batch, which will produce the remaining rows
coalescer.finish_buffered_batch().unwrap();
let expected = record_batch!(("a", Int32, [5])).unwrap();
assert_eq!(coalescer.next_completed_batch().unwrap(), expected);
// The coalescer is now empty
assert!(coalescer.next_completed_batch().is_none());
§Background
Generally speaking, larger RecordBatches are more efficient to process than smaller RecordBatches (until the CPU cache is exceeded) because there is fixed processing overhead per batch. This coalescer builds up these larger batches incrementally.
┌────────────────────┐
│    RecordBatch     │
│   num_rows = 100   │
└────────────────────┘                 ┌────────────────────┐
                                       │                    │
┌────────────────────┐     Coalesce    │                    │
│                    │      Batches    │                    │
│    RecordBatch     │                 │                    │
│   num_rows = 200   │  ─ ─ ─ ─ ─ ─ ▶  │                    │
│                    │                 │    RecordBatch     │
│                    │                 │   num_rows = 400   │
└────────────────────┘                 │                    │
                                       │                    │
┌────────────────────┐                 │                    │
│                    │                 │                    │
│    RecordBatch     │                 │                    │
│   num_rows = 100   │                 └────────────────────┘
│                    │
└────────────────────┘
§Notes:
- Output rows are produced in the same order as the input rows.
- The output is a sequence of batches, with all but the last being exactly target_batch_size rows.
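The two notes above can be illustrated with a toy model that uses only the standard library. This is a minimal sketch, not the arrow-rs implementation: rows are plain `i32` values, a "batch" is a `Vec<i32>`, and `ToyCoalescer` and `coalesce_all` are hypothetical names introduced here. The method names mirror the ones documented on this page so the control flow is easy to compare.

```rust
use std::collections::VecDeque;

/// Toy stand-in for a coalescer (hypothetical, not part of arrow-rs).
struct ToyCoalescer {
    batch_size: usize,
    /// Rows waiting for a full batch; fewer than `batch_size` between calls.
    buffered: Vec<i32>,
    /// Finished batches, oldest first.
    completed: VecDeque<Vec<i32>>,
}

impl ToyCoalescer {
    fn new(batch_size: usize) -> Self {
        assert!(batch_size > 0, "batch_size must be non-zero");
        Self { batch_size, buffered: Vec::new(), completed: VecDeque::new() }
    }

    /// Append rows in input order, emitting a batch each time the buffer fills.
    fn push_batch(&mut self, batch: Vec<i32>) {
        for row in batch {
            self.buffered.push(row);
            if self.buffered.len() == self.batch_size {
                let full = std::mem::take(&mut self.buffered);
                self.completed.push_back(full);
            }
        }
    }

    /// Flush whatever is buffered as a final, possibly short, batch.
    fn finish_buffered_batch(&mut self) {
        if !self.buffered.is_empty() {
            let rest = std::mem::take(&mut self.buffered);
            self.completed.push_back(rest);
        }
    }

    fn next_completed_batch(&mut self) -> Option<Vec<i32>> {
        self.completed.pop_front()
    }
}

/// Coalesce a whole input stream and return the output batches in order.
fn coalesce_all(inputs: Vec<Vec<i32>>, batch_size: usize) -> Vec<Vec<i32>> {
    let mut coalescer = ToyCoalescer::new(batch_size);
    let mut out = Vec::new();
    for batch in inputs {
        coalescer.push_batch(batch);
        // Drain eagerly so completed batches do not pile up.
        while let Some(done) = coalescer.next_completed_batch() {
            out.push(done);
        }
    }
    // Input is exhausted: flush the remainder, then drain once more.
    coalescer.finish_buffered_batch();
    while let Some(done) = coalescer.next_completed_batch() {
        out.push(done);
    }
    out
}
```

For example, `coalesce_all(vec![vec![1, 2, 3], vec![4, 5]], 4)` yields `[1, 2, 3, 4]` followed by `[5]`, the same shape as the example at the top of this page. The toy copies row by row for clarity; a real columnar implementation would work on whole arrays.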
Fields§
schema: SchemaRef
The input schema
batch_size: usize
output batch size
in_progress_arrays: Vec<Box<dyn InProgressArray>>
In-progress arrays
buffered_rows: usize
Buffered row count. Always less than batch_size
completed: VecDeque<RecordBatch>
Completed batches
Implementations§
impl BatchCoalescer
pub fn new(schema: SchemaRef, batch_size: usize) -> Self
Create a new BatchCoalescer
§Arguments
- schema - the schema of the output batches
- batch_size - the number of rows in each output batch. Typical values are 4096 or 8192 rows.
pub fn push_batch_with_filter( &mut self, batch: RecordBatch, filter: &BooleanArray, ) -> Result<(), ArrowError>
Push a batch into the Coalescer after applying a filter.
This is semantically equivalent to calling Self::push_batch with the results from filter_record_batch.
§Example
use arrow_array::{record_batch, BooleanArray};
use arrow_select::coalesce::BatchCoalescer;
let batch1 = record_batch!(("a", Int32, [1, 2, 3])).unwrap();
let batch2 = record_batch!(("a", Int32, [4, 5, 6])).unwrap();
// Apply a filter to each batch to pick the first and last row
let filter = BooleanArray::from(vec![true, false, true]);
// create a new Coalescer that targets creating 1000 row batches
let mut coalescer = BatchCoalescer::new(batch1.schema(), 1000);
// pushing can fail, so check the returned `Result`
coalescer.push_batch_with_filter(batch1, &filter).unwrap();
coalescer.push_batch_with_filter(batch2, &filter).unwrap();
// finish and retrieve the created batch
coalescer.finish_buffered_batch().unwrap();
let completed_batch = coalescer.next_completed_batch().unwrap();
// filtered out 2 and 5:
let expected_batch = record_batch!(("a", Int32, [1, 3, 4, 6])).unwrap();
assert_eq!(completed_batch, expected_batch);
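The "filter, then push" semantics can be sketched without arrow at all. The snippet below is an illustration under simplifying assumptions: rows are `i32` values, the filter is a `&[bool]` mask, and `filter_rows` and `filter_then_concat` are hypothetical helpers rather than arrow-rs functions. It shows only which rows survive and in what order, not how the library implements the operation.

```rust
/// Keep the rows whose mask entry is `true`, preserving input order.
/// Hypothetical plain-`Vec` stand-in for filtering a batch.
fn filter_rows(rows: &[i32], mask: &[bool]) -> Vec<i32> {
    assert_eq!(rows.len(), mask.len(), "mask must have one entry per row");
    rows.iter()
        .zip(mask)
        .filter(|(_, keep)| **keep)
        .map(|(row, _)| *row)
        .collect()
}

/// Filter every batch with the same mask, then concatenate the survivors,
/// which is what pushing each filtered batch and finishing would produce
/// when the target batch size is larger than the total row count.
fn filter_then_concat(batches: &[Vec<i32>], mask: &[bool]) -> Vec<i32> {
    batches
        .iter()
        .flat_map(|batch| filter_rows(batch, mask))
        .collect()
}
```

With the mask `[true, false, true]` applied to `[1, 2, 3]` and `[4, 5, 6]`, the surviving rows are `[1, 3, 4, 6]`, matching the expected batch in the example above.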
pub fn push_batch(&mut self, batch: RecordBatch) -> Result<(), ArrowError>
Push all the rows from batch into the Coalescer.
See Self::next_completed_batch() to retrieve any completed batches.
§Example
use arrow_array::record_batch;
use arrow_select::coalesce::BatchCoalescer;
let batch1 = record_batch!(("a", Int32, [1, 2, 3])).unwrap();
let batch2 = record_batch!(("a", Int32, [4, 5, 6])).unwrap();
// create a new Coalescer that targets creating 1000 row batches
let mut coalescer = BatchCoalescer::new(batch1.schema(), 1000);
// pushing can fail, so check the returned `Result`
coalescer.push_batch(batch1).unwrap();
coalescer.push_batch(batch2).unwrap();
// finish and retrieve the created batch
coalescer.finish_buffered_batch().unwrap();
let completed_batch = coalescer.next_completed_batch().unwrap();
let expected_batch = record_batch!(("a", Int32, [1, 2, 3, 4, 5, 6])).unwrap();
assert_eq!(completed_batch, expected_batch);
pub fn finish_buffered_batch(&mut self) -> Result<(), ArrowError>
Concatenates any buffered batches into a single RecordBatch and clears any output buffers.
Normally this is called when the input stream is exhausted, and we want to finalize the last batch of rows.
See Self::next_completed_batch() for the completed batches.
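To see what the final flush contributes, the output batch sizes can be worked out from the input row counts alone. The helper below is a hypothetical sketch (`output_batch_sizes` is not an arrow-rs function) and assumes the behaviour described in the Notes above: every batch except possibly the last has exactly `batch_size` rows, and the short remainder only becomes available after the buffered rows are finished.

```rust
/// Given the row counts of the input batches, return
/// (sizes of the full batches, size of the remainder flushed at the end).
/// Hypothetical helper for illustration only.
fn output_batch_sizes(input_rows: &[usize], batch_size: usize) -> (Vec<usize>, Option<usize>) {
    assert!(batch_size > 0, "batch_size must be non-zero");
    let total: usize = input_rows.iter().sum();
    // Each full batch holds exactly `batch_size` rows.
    let full = vec![batch_size; total / batch_size];
    // Leftover rows stay buffered until the final flush.
    let remainder = total % batch_size;
    (full, if remainder > 0 { Some(remainder) } else { None })
}
```

For the diagram in the Background section (inputs of 100, 200 and 100 rows, target 400), this gives one full batch of 400 rows and no remainder. For inputs of 3 and 2 rows with a target of 4, it gives one full batch and a remainder of 1 row that appears only after the flush.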
pub fn has_completed_batch(&self) -> bool
Returns true if there are any completed batches
pub fn next_completed_batch(&mut self) -> Option<RecordBatch>
Returns the next completed batch, if any