Struct BatchCoalescer

Source
pub struct BatchCoalescer {
    schema: SchemaRef,
    batch_size: usize,
    buffer: Vec<RecordBatch>,
    buffered_rows: usize,
    completed: VecDeque<RecordBatch>,
}

Concatenate multiple [RecordBatch]es

Implements the common pattern of incrementally creating output [RecordBatch]es of a specific size from an input stream of [RecordBatch]es.

This is useful after operations such as filter and take, which produce smaller batches that we then want to coalesce into larger batches.

See: https://github.com/apache/arrow-rs/issues/6692

§Example

use arrow_array::record_batch;
use arrow_select::coalesce::BatchCoalescer;
let batch1 = record_batch!(("a", Int32, [1, 2, 3])).unwrap();
let batch2 = record_batch!(("a", Int32, [4, 5])).unwrap();

// Create a `BatchCoalescer` that will produce batches of exactly 4 rows
// (only the final batch may be smaller)
let target_batch_size = 4;
let mut coalescer = BatchCoalescer::new(batch1.schema(), target_batch_size);

// push the batches
coalescer.push_batch(batch1).unwrap();
// only 3 rows pushed so far (fewer than the 4 needed to produce a batch)
assert!(coalescer.next_completed_batch().is_none());
coalescer.push_batch(batch2).unwrap();
// now we have 5 rows, so we can produce a batch
let finished = coalescer.next_completed_batch().unwrap();
// 4 rows came out (target batch size is 4)
let expected = record_batch!(("a", Int32, [1, 2, 3, 4])).unwrap();
assert_eq!(finished, expected);

// Have no more input, but still have an in-progress batch
assert!(coalescer.next_completed_batch().is_none());
// We can finish the batch, which will produce the remaining rows
coalescer.finish_buffered_batch().unwrap();
let expected = record_batch!(("a", Int32, [5])).unwrap();
assert_eq!(coalescer.next_completed_batch().unwrap(), expected);

// The coalescer is now empty
assert!(coalescer.next_completed_batch().is_none());

§Background

Generally speaking, larger [RecordBatch]es are more efficient to process than smaller [RecordBatch]es (until the CPU cache is exceeded) because there is fixed processing overhead per batch. This coalescer builds up these larger batches incrementally.

┌────────────────────┐
│    RecordBatch     │
│   num_rows = 100   │
└────────────────────┘                 ┌────────────────────┐
                                       │                    │
┌────────────────────┐     Coalesce    │                    │
│                    │      Batches    │                    │
│    RecordBatch     │                 │                    │
│   num_rows = 200   │  ─ ─ ─ ─ ─ ─ ▶  │                    │
│                    │                 │    RecordBatch     │
│                    │                 │   num_rows = 400   │
└────────────────────┘                 │                    │
                                       │                    │
┌────────────────────┐                 │                    │
│                    │                 │                    │
│    RecordBatch     │                 │                    │
│   num_rows = 100   │                 └────────────────────┘
│                    │
└────────────────────┘
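
The fixed per-batch overhead argument can be made concrete with a toy cost model. This is only a sketch: `total_cost`, `fixed_cost`, and `per_row_cost` are hypothetical names, and the numbers are arbitrary units chosen for illustration, not measurements of any real operator.

```rust
/// Hypothetical cost model: every batch pays `fixed_cost` once, plus
/// `per_row_cost` for each row it contains. The units are arbitrary.
fn total_cost(batch_sizes: &[usize], fixed_cost: usize, per_row_cost: usize) -> usize {
    batch_sizes
        .iter()
        .map(|&rows| fixed_cost + rows * per_row_cost)
        .sum()
}

/// Cost of processing the three small input batches from the diagram.
fn cost_uncoalesced() -> usize {
    total_cost(&[100, 200, 100], 50, 1)
}

/// Cost of processing the single coalesced 400-row batch.
fn cost_coalesced() -> usize {
    total_cost(&[400], 50, 1)
}
```

Under these assumed costs the per-row work is identical in both cases; the only saving comes from paying the fixed cost once instead of three times.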

§Notes:

  1. Output rows are produced in the same order as the input rows

  2. The output is a sequence of batches; every batch except possibly the last has exactly batch_size rows.

  3. Eventually this may also be able to handle other optimizations such as a combined filter/coalesce operation. See https://github.com/apache/arrow-rs/issues/6692
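
Notes 1 and 2 together describe an output contract that can be stated as a small reference function. The sketch below is a std-only model, not the arrow-rs implementation: `coalesce_all` is a hypothetical name, and `Vec<i32>` stands in for a single-column RecordBatch.

```rust
/// Reference model of the output contract, using `Vec<i32>` as a stand-in
/// for a single-column `RecordBatch`. Not the arrow-rs implementation.
fn coalesce_all(inputs: &[Vec<i32>], batch_size: usize) -> Vec<Vec<i32>> {
    assert!(batch_size > 0, "batch_size must be non-zero");
    // Flattening keeps rows in input order (note 1).
    let rows: Vec<i32> = inputs.iter().flatten().copied().collect();
    // `chunks` yields groups of exactly `batch_size` rows, except that the
    // final group may be shorter (note 2).
    rows.chunks(batch_size).map(|chunk| chunk.to_vec()).collect()
}
```

For the inputs `[1, 2, 3]` and `[4, 5]` with a batch size of 4, this model yields `[1, 2, 3, 4]` followed by `[5]`, matching the example at the top of this page.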

Fields§

§schema: SchemaRef

The input schema

§batch_size: usize

Output batch size (the number of rows in each output batch)

§buffer: Vec<RecordBatch>

In-progress buffered batches

§buffered_rows: usize

Buffered row count. Always less than batch_size

§completed: VecDeque<RecordBatch>

Completed batches
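
To illustrate how these fields might cooperate, here is a simplified std-only model of the state machine. It is a sketch under stated assumptions, not the arrow-rs source: `ModelCoalescer` and `run_model` are hypothetical names, `Vec<i32>` stands in for RecordBatch, the schema field is omitted, and errors are not modeled.

```rust
use std::collections::VecDeque;

/// Simplified model of the coalescer's state. Illustrative only.
struct ModelCoalescer {
    batch_size: usize,
    buffer: Vec<Vec<i32>>,         // in-progress batches
    buffered_rows: usize,          // stays below `batch_size` between calls
    completed: VecDeque<Vec<i32>>, // finished batches, oldest first
}

impl ModelCoalescer {
    fn new(batch_size: usize) -> Self {
        assert!(batch_size > 0, "batch_size must be non-zero");
        Self {
            batch_size,
            buffer: Vec::new(),
            buffered_rows: 0,
            completed: VecDeque::new(),
        }
    }

    fn push_batch(&mut self, mut batch: Vec<i32>) {
        // While the buffer plus the input can fill an output batch,
        // split off exactly the rows needed and complete that batch.
        while self.buffered_rows + batch.len() >= self.batch_size {
            let needed = self.batch_size - self.buffered_rows;
            let tail = batch.split_off(needed);
            self.buffered_rows += batch.len();
            self.buffer.push(batch);
            self.finish_buffered_batch();
            batch = tail;
        }
        // Whatever remains is smaller than a full batch; keep it buffered.
        if !batch.is_empty() {
            self.buffered_rows += batch.len();
            self.buffer.push(batch);
        }
    }

    fn finish_buffered_batch(&mut self) {
        if self.buffer.is_empty() {
            return;
        }
        // Concatenate the buffered pieces in order and queue the result.
        let merged: Vec<i32> = self.buffer.drain(..).flatten().collect();
        self.buffered_rows = 0;
        self.completed.push_back(merged);
    }

    fn next_completed_batch(&mut self) -> Option<Vec<i32>> {
        self.completed.pop_front()
    }
}

/// Pushes every input, flushes the remainder, and drains all output.
fn run_model(inputs: Vec<Vec<i32>>, batch_size: usize) -> Vec<Vec<i32>> {
    let mut coalescer = ModelCoalescer::new(batch_size);
    for batch in inputs {
        coalescer.push_batch(batch);
    }
    coalescer.finish_buffered_batch();
    let mut out = Vec::new();
    while let Some(batch) = coalescer.next_completed_batch() {
        out.push(batch);
    }
    out
}
```

In this model a completed batch is emitted as soon as the buffer reaches batch_size rows, which is why the buffered row count is always smaller than batch_size once `push_batch` returns.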

Implementations§

Source§

impl BatchCoalescer

Source

pub fn new(schema: SchemaRef, batch_size: usize) -> Self

Create a new BatchCoalescer

§Arguments
  • schema - the schema of the output batches
  • batch_size - the number of rows in each output batch. Typical values are 4096 or 8192 rows.
Source

pub fn schema(&self) -> SchemaRef

Return the schema of the output batches

Source

pub fn push_batch(&mut self, batch: RecordBatch) -> Result<(), ArrowError>

Push the next batch into the coalescer

See Self::next_completed_batch() to retrieve any completed batches.

Source

pub fn finish_buffered_batch(&mut self) -> Result<(), ArrowError>

Concatenates any buffered batches into a single RecordBatch, clears the in-progress buffer, and queues the result as a completed batch

Normally this is called when the input stream is exhausted, and we want to finalize the last batch of rows.

See Self::next_completed_batch() for the completed batches.

Source

pub fn is_empty(&self) -> bool

Returns true if there is no buffered data

Source

pub fn has_completed_batch(&self) -> bool

Returns true if there are any completed batches

Source

pub fn next_completed_batch(&mut self) -> Option<RecordBatch>

Returns the next completed batch, if any

Trait Implementations§

Source§

impl Debug for BatchCoalescer

Source§

fn fmt(&self, f: &mut Formatter<'_>) -> Result

Formats the value using the given formatter.

Auto Trait Implementations§

Blanket Implementations§

Source§

impl<T> Any for T
where T: 'static + ?Sized,

Source§

fn type_id(&self) -> TypeId

Gets the TypeId of self.
Source§

impl<T> Borrow<T> for T
where T: ?Sized,

Source§

fn borrow(&self) -> &T

Immutably borrows from an owned value.
Source§

impl<T> BorrowMut<T> for T
where T: ?Sized,

Source§

fn borrow_mut(&mut self) -> &mut T

Mutably borrows from an owned value.
Source§

impl<T> From<T> for T

Source§

fn from(t: T) -> T

Returns the argument unchanged.

Source§

impl<T, U> Into<U> for T
where U: From<T>,

Source§

fn into(self) -> U

Calls U::from(self).

That is, this conversion is whatever the implementation of From<T> for U chooses to do.

Source§

impl<T, U> TryFrom<U> for T
where U: Into<T>,

Source§

type Error = Infallible

The type returned in the event of a conversion error.
Source§

fn try_from(value: U) -> Result<T, <T as TryFrom<U>>::Error>

Performs the conversion.
Source§

impl<T, U> TryInto<U> for T
where U: TryFrom<T>,

Source§

type Error = <U as TryFrom<T>>::Error

The type returned in the event of a conversion error.
Source§

fn try_into(self) -> Result<U, <U as TryFrom<T>>::Error>

Performs the conversion.