pub struct BatchCoalescer {
    schema: SchemaRef,
    batch_size: usize,
    buffer: Vec<RecordBatch>,
    buffered_rows: usize,
    completed: VecDeque<RecordBatch>,
}
Concatenate multiple RecordBatches
Implements the common pattern of incrementally creating output RecordBatches of a specific size from an input stream of RecordBatches.
This is useful after operations such as filter and take that produce smaller batches, which we want to coalesce into larger batches.
See: https://github.com/apache/arrow-rs/issues/6692
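As a rough illustration of the pattern, the following std-only sketch tracks row counts only: it takes the sizes of the input batches and returns the sizes of the output batches, assuming any leftover rows are flushed as a final, smaller batch at the end of input. `coalesced_sizes` is a hypothetical helper, not part of arrow-rs.

```rust
/// Hypothetical helper (not an arrow-rs API): given the row counts of the
/// input batches, return the row counts of the batches a coalescer with
/// `target` rows per batch would emit, assuming the final partial batch is
/// flushed at end of input.
fn coalesced_sizes(input_sizes: &[usize], target: usize) -> Vec<usize> {
    assert!(target > 0, "target batch size must be positive");
    let mut out = Vec::new();
    let mut buffered = 0;
    for &n in input_sizes {
        buffered += n;
        // Emit as many full batches as the buffered rows allow.
        while buffered >= target {
            out.push(target);
            buffered -= target;
        }
    }
    // Flush any leftover rows as a final, smaller batch.
    if buffered > 0 {
        out.push(buffered);
    }
    out
}
```

For the inputs used in the example (3 rows, then 2 rows, target 4) this returns `[4, 1]`, and for the 100 + 200 + 100 rows in the diagram with a target of 400 it returns `[400]`.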
§Example
use arrow_array::record_batch;
use arrow_select::coalesce::BatchCoalescer;
let batch1 = record_batch!(("a", Int32, [1, 2, 3])).unwrap();
let batch2 = record_batch!(("a", Int32, [4, 5])).unwrap();
// Create a `BatchCoalescer` that will produce batches of exactly 4 rows
// (only the final batch may be smaller)
let target_batch_size = 4;
let mut coalescer = BatchCoalescer::new(batch1.schema(), target_batch_size);
// push the batches
coalescer.push_batch(batch1).unwrap();
// only 3 rows pushed so far (fewer than 4), so no batch is complete yet
assert!(coalescer.next_completed_batch().is_none());
coalescer.push_batch(batch2).unwrap();
// now we have 5 rows, so we can produce a batch
let finished = coalescer.next_completed_batch().unwrap();
// 4 rows came out (target batch size is 4)
let expected = record_batch!(("a", Int32, [1, 2, 3, 4])).unwrap();
assert_eq!(finished, expected);
// No more input, but 1 row is still buffered in an in-progress batch
assert!(coalescer.next_completed_batch().is_none());
// We can finish the batch, which will produce the remaining rows
coalescer.finish_buffered_batch().unwrap();
let expected = record_batch!(("a", Int32, [5])).unwrap();
assert_eq!(coalescer.next_completed_batch().unwrap(), expected);
// The coalescer is now empty
assert!(coalescer.next_completed_batch().is_none());
§Background
Generally speaking, larger RecordBatches are more efficient to process than smaller RecordBatches (until the CPU cache is exceeded) because there is fixed processing overhead per batch. This coalescer builds up these larger batches incrementally.
┌────────────────────┐
│    RecordBatch     │
│   num_rows = 100   │
└────────────────────┘                 ┌────────────────────┐
                                       │                    │
┌────────────────────┐     Coalesce    │                    │
│                    │      Batches    │                    │
│    RecordBatch     │                 │                    │
│   num_rows = 200   │  ─ ─ ─ ─ ─ ─ ▶  │                    │
│                    │                 │    RecordBatch     │
│                    │                 │   num_rows = 400   │
└────────────────────┘                 │                    │
                                       │                    │
┌────────────────────┐                 │                    │
│                    │                 │                    │
│    RecordBatch     │                 │                    │
│   num_rows = 100   │                 └────────────────────┘
│                    │
└────────────────────┘
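The fixed-overhead argument can be made concrete with a toy linear cost model. The per-batch and per-row constants are made-up illustration values, not measurements, and the model deliberately ignores the CPU-cache effect mentioned above; `processing_cost` is a hypothetical name.

```rust
/// Hypothetical linear cost model (illustrative only, not measured):
/// every batch pays a fixed `per_batch` overhead and every row pays `per_row`.
/// It ignores CPU-cache effects, which penalize batches that grow too large.
fn processing_cost(batch_sizes: &[u64], per_batch: u64, per_row: u64) -> u64 {
    batch_sizes
        .iter()
        .map(|&rows| per_batch + rows * per_row)
        .sum()
}
```

With `per_batch = 1000` and `per_row = 1` (arbitrary units), the three input batches in the diagram cost 3 × 1000 + 400 = 3400, while the single coalesced batch costs 1000 + 400 = 1400: the per-row work is unchanged and only the per-batch overhead shrinks.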
§Notes
- Output rows are produced in the same order as the input rows.
- The output is a sequence of batches, with all but the last containing exactly target_batch_size rows.
- Eventually this may also be able to handle other optimizations such as a combined filter/coalesce operation. See https://github.com/apache/arrow-rs/issues/6692
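The ordering and sizing guarantees can be phrased as a check over the data. This sketch models each batch as a `Vec<i32>` holding a single column (an illustration-only assumption; real batches are Arrow `RecordBatch`es), and `satisfies_notes` is a hypothetical name.

```rust
/// Hypothetical checker for the first two notes, with each batch modelled as
/// a `Vec<i32>` of values from a single column:
/// 1. rows come out in the same order they went in, and
/// 2. every output batch except the last has exactly `target` rows
///    (we additionally assume the last one has at most `target` rows).
fn satisfies_notes(inputs: &[Vec<i32>], outputs: &[Vec<i32>], target: usize) -> bool {
    // Note 1: flattening both sides must yield identical row sequences.
    let in_rows: Vec<i32> = inputs.iter().flatten().copied().collect();
    let out_rows: Vec<i32> = outputs.iter().flatten().copied().collect();
    if in_rows != out_rows {
        return false;
    }
    // Note 2: check the sizes of the output batches.
    match outputs.split_last() {
        None => true, // no output batches at all (empty input)
        Some((last, rest)) => {
            rest.iter().all(|b| b.len() == target) && last.len() <= target
        }
    }
}
```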
§Fields
schema: SchemaRef
The input schema
batch_size: usize
The output batch size
buffer: Vec<RecordBatch>
In-progress buffered batches
buffered_rows: usize
Buffered row count. Always less than batch_size
completed: VecDeque<RecordBatch>
Completed batches
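The comments on `buffer` and `buffered_rows` imply two invariants. The sketch below mirrors the fields with std types, using a `Vec<i32>` in place of a `RecordBatch` and dropping `schema`; `CoalescerState` and `invariants_hold` are hypothetical names, and the first invariant (that `buffered_rows` equals the number of rows held in `buffer`) is our reading of "Buffered row count" rather than something stated outright.

```rust
use std::collections::VecDeque;

/// Hypothetical, simplified stand-in for the fields above: a "batch" is a
/// `Vec<i32>` (one column, no schema) instead of an Arrow `RecordBatch`.
struct CoalescerState {
    batch_size: usize,
    buffer: Vec<Vec<i32>>,
    buffered_rows: usize,
    completed: VecDeque<Vec<i32>>,
}

impl CoalescerState {
    /// Checks the invariants implied by the field docs: `buffered_rows`
    /// counts exactly the rows held in `buffer`, and it is always less than
    /// `batch_size` (a full buffer would already be in `completed`).
    fn invariants_hold(&self) -> bool {
        let rows_in_buffer: usize = self.buffer.iter().map(Vec::len).sum();
        rows_in_buffer == self.buffered_rows && self.buffered_rows < self.batch_size
    }
}
```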
§Implementations
impl BatchCoalescer
pub fn new(schema: SchemaRef, batch_size: usize) -> Self
Create a new BatchCoalescer
§Arguments
- schema - the schema of the output batches
- batch_size - the number of rows in each output batch. Typical values are 4096 or 8192 rows.
pub fn push_batch(&mut self, batch: RecordBatch) -> Result<(), ArrowError>
Push the next batch into the coalescer.
See Self::next_completed_batch() to retrieve any completed batches.
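When a pushed batch does not fit in the in-progress batch, the example above suggests it is split: pushing 2 rows onto 3 buffered rows with a target of 4 yields one completed batch of 4 rows and leaves 1 row buffered. The hypothetical `plan_push` helper below computes which row ranges of the incoming batch complete an output batch, assuming the coalescer fills the in-progress batch from the front of the incoming one. It is arithmetic only, not arrow-rs code.

```rust
use std::ops::Range;

/// Hypothetical helper (not an arrow-rs API): when a batch of `incoming` rows
/// is pushed while `buffered` rows are already waiting, return the row ranges
/// of the incoming batch that each complete an output batch of `batch_size`
/// rows, plus the number of rows left buffered afterwards.
/// Assumes `buffered < batch_size`, matching the `buffered_rows` field doc.
fn plan_push(buffered: usize, incoming: usize, batch_size: usize) -> (Vec<Range<usize>>, usize) {
    assert!(buffered < batch_size, "buffered rows must be below batch_size");
    let mut completing = Vec::new();
    let mut start = 0;
    // Room left in the current in-progress batch.
    let mut free = batch_size - buffered;
    // Repeatedly slice off a head that exactly fills the in-progress batch.
    while incoming - start >= free {
        completing.push(start..start + free);
        start += free;
        // The next in-progress batch starts empty.
        free = batch_size;
    }
    // If nothing completed, the old buffered rows are still waiting too.
    let left = if completing.is_empty() {
        buffered + (incoming - start)
    } else {
        incoming - start
    };
    (completing, left)
}
```

For instance, pushing 10 rows onto an empty coalescer with a target of 4 completes two batches (rows `0..4` and `4..8`) and leaves 2 rows buffered.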
pub fn finish_buffered_batch(&mut self) -> Result<(), ArrowError>
Concatenates any buffered batches into a single RecordBatch, adds it to the completed batches, and clears the buffer.
Normally this is called when the input stream is exhausted, to finalize the last batch of rows.
See Self::next_completed_batch() to retrieve the completed batches.
pub fn has_completed_batch(&self) -> bool
Returns true if there are any completed batches
pub fn next_completed_batch(&mut self) -> Option<RecordBatch>
Returns the next completed batch, if any
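Putting the methods together, here is a std-only model of the documented behaviour in which a batch is a `Vec<i32>`, there is no schema, and errors are omitted. `MiniCoalescer` is hypothetical and mirrors the semantics described on this page, not the arrow-rs implementation; the calls in the §Example section map one-to-one onto its methods.

```rust
use std::collections::VecDeque;

/// Hypothetical, std-only model of the coalescer API documented above.
/// A "batch" is a `Vec<i32>` (one column, no schema, no error type).
struct MiniCoalescer {
    batch_size: usize,
    buffer: Vec<Vec<i32>>,
    buffered_rows: usize,
    completed: VecDeque<Vec<i32>>,
}

impl MiniCoalescer {
    fn new(batch_size: usize) -> Self {
        assert!(batch_size > 0, "batch_size must be positive");
        Self {
            batch_size,
            buffer: Vec::new(),
            buffered_rows: 0,
            completed: VecDeque::new(),
        }
    }

    /// Buffers `batch`, completing a batch each time `batch_size` rows accumulate.
    fn push_batch(&mut self, mut batch: Vec<i32>) {
        while !batch.is_empty() {
            // Always >= 1, because a full buffer is finished immediately.
            let free = self.batch_size - self.buffered_rows;
            // Keep at most `free` rows at the front; `rest` gets the tail.
            let rest = batch.split_off(free.min(batch.len()));
            self.buffered_rows += batch.len();
            self.buffer.push(batch);
            batch = rest;
            if self.buffered_rows == self.batch_size {
                self.finish_buffered_batch();
            }
        }
    }

    /// Concatenates the buffered pieces into one batch and queues it as completed.
    fn finish_buffered_batch(&mut self) {
        if self.buffer.is_empty() {
            return; // nothing buffered: no empty batch is queued
        }
        let batch: Vec<i32> = self.buffer.drain(..).flatten().collect();
        self.buffered_rows = 0;
        self.completed.push_back(batch);
    }

    fn has_completed_batch(&self) -> bool {
        !self.completed.is_empty()
    }

    /// Completed batches come out in first-in, first-out order.
    fn next_completed_batch(&mut self) -> Option<Vec<i32>> {
        self.completed.pop_front()
    }
}
```

The sketch splits with `Vec::split_off`, which moves the tail into a new allocation; with Arrow data the natural counterpart is the zero-copy `RecordBatch::slice`.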