Streaming Execution (Acero)

Creating and running execution plans

enum class UnalignedBufferHandling

How to handle unaligned buffers.

Values:

enumerator kWarn
enumerator kIgnore
enumerator kReallocate
enumerator kError
UnalignedBufferHandling GetDefaultUnalignedBufferHandling()

get the default behavior of unaligned buffer handling

This is configurable via the ACERO_ALIGNMENT_HANDLING environment variable which can be set to “warn”, “ignore”, “reallocate”, or “error”. If the environment variable is not set, or is set to an invalid value, this will return kWarn
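The fallback rule described above can be sketched as follows. This is a self-contained model and does not call Arrow: the `Handling` enum and the functions `ParseHandling` and `DefaultHandlingFromEnv` are hypothetical names for illustration, and exact lowercase matching is an assumption of the sketch.

```cpp
#include <cassert>
#include <cstdlib>
#include <string>

// Hypothetical stand-in for arrow::acero::UnalignedBufferHandling.
enum class Handling { kWarn, kIgnore, kReallocate, kError };

// Map a raw setting to a policy. A null pointer (variable not set) and any
// unrecognized value both fall back to kWarn, as the text above describes.
Handling ParseHandling(const char* raw) {
  if (raw == nullptr) return Handling::kWarn;
  const std::string value(raw);
  if (value == "ignore") return Handling::kIgnore;
  if (value == "reallocate") return Handling::kReallocate;
  if (value == "error") return Handling::kError;
  return Handling::kWarn;  // covers "warn" and every invalid value
}

// Read the environment variable named in the text and apply the same rule.
Handling DefaultHandlingFromEnv() {
  return ParseHandling(std::getenv("ACERO_ALIGNMENT_HANDLING"));
}
```

In real code, call GetDefaultUnalignedBufferHandling() rather than reading the variable yourself.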

Result<std::shared_ptr<Schema>> DeclarationToSchema(const Declaration &declaration, FunctionRegistry *function_registry = NULLPTR)

Calculate the output schema of a declaration.

This does not actually execute the plan. This operation may fail if the declaration represents an invalid plan (e.g. a project node with multiple inputs)

Parameters:
  • declaration – A declaration describing an execution plan

  • function_registry – The function registry to use for function execution. If null then the default function registry will be used.

Returns:

the schema that batches would have after going through the execution plan

Result<std::string> DeclarationToString(const Declaration &declaration, FunctionRegistry *function_registry = NULLPTR)

Create a string representation of a plan.

This representation is for debug purposes only.

Conversion to a string may fail if the declaration represents an invalid plan.

Use Substrait for complete serialization of plans

Parameters:
  • declaration – A declaration describing an execution plan

  • function_registry – The function registry to use for function execution. If null then the default function registry will be used.

Returns:

a string representation of the plan suitable for debugging output

Result<std::shared_ptr<Table>> DeclarationToTable(Declaration declaration, bool use_threads = true, MemoryPool *memory_pool = default_memory_pool(), FunctionRegistry *function_registry = NULLPTR)

Utility method to run a declaration and collect the results into a table.

This method will add a sink node to the declaration to collect results into a table. It will then create an ExecPlan from the declaration, start the exec plan, block until the plan has finished, and return the created table.

Parameters:
  • declaration – A declaration describing the plan to run

  • use_threads – If use_threads is false then all CPU work will be done on the calling thread. I/O tasks will still happen on the I/O executor and may be multi-threaded (but should not use significant CPU resources).

  • memory_pool – The memory pool to use for allocations made while running the plan.

  • function_registry – The function registry to use for function execution. If null then the default function registry will be used.

Result<std::shared_ptr<Table>> DeclarationToTable(Declaration declaration, QueryOptions query_options)
Future<std::shared_ptr<Table>> DeclarationToTableAsync(Declaration declaration, bool use_threads = true, MemoryPool *memory_pool = default_memory_pool(), FunctionRegistry *function_registry = NULLPTR)

Asynchronous version of DeclarationToTable.

Parameters:
  • declaration – A declaration describing the plan to run

  • use_threads – The behavior of use_threads is slightly different than the synchronous version since we cannot run synchronously on the calling thread. Instead, if use_threads=false then a new thread pool will be created with a single thread and this will be used for all compute work.

  • memory_pool – The memory pool to use for allocations made while running the plan.

  • function_registry – The function registry to use for function execution. If null then the default function registry will be used.

Future<std::shared_ptr<Table>> DeclarationToTableAsync(Declaration declaration, ExecContext custom_exec_context)

Overload of DeclarationToTableAsync accepting a custom exec context.

The executor must be specified (cannot be null) and must be kept alive until the returned future finishes.

Result<BatchesWithCommonSchema> DeclarationToExecBatches(Declaration declaration, bool use_threads = true, MemoryPool *memory_pool = default_memory_pool(), FunctionRegistry *function_registry = NULLPTR)

Utility method to run a declaration and collect the results into ExecBatch vector.

See also

DeclarationToTable for details on threading & execution

Result<BatchesWithCommonSchema> DeclarationToExecBatches(Declaration declaration, QueryOptions query_options)
Future<BatchesWithCommonSchema> DeclarationToExecBatchesAsync(Declaration declaration, bool use_threads = true, MemoryPool *memory_pool = default_memory_pool(), FunctionRegistry *function_registry = NULLPTR)

Asynchronous version of DeclarationToExecBatches.

See also

DeclarationToTableAsync for details on threading & execution

Future<BatchesWithCommonSchema> DeclarationToExecBatchesAsync(Declaration declaration, ExecContext custom_exec_context)

Overload of DeclarationToExecBatchesAsync accepting a custom exec context.

See also

DeclarationToTableAsync for details on threading & execution

Result<std::vector<std::shared_ptr<RecordBatch>>> DeclarationToBatches(Declaration declaration, bool use_threads = true, MemoryPool *memory_pool = default_memory_pool(), FunctionRegistry *function_registry = NULLPTR)

Utility method to run a declaration and collect the results into a vector.

See also

DeclarationToTable for details on threading & execution

Result<std::vector<std::shared_ptr<RecordBatch>>> DeclarationToBatches(Declaration declaration, QueryOptions query_options)
Future<std::vector<std::shared_ptr<RecordBatch>>> DeclarationToBatchesAsync(Declaration declaration, bool use_threads = true, MemoryPool *memory_pool = default_memory_pool(), FunctionRegistry *function_registry = NULLPTR)

Asynchronous version of DeclarationToBatches.

See also

DeclarationToTableAsync for details on threading & execution

Future<std::vector<std::shared_ptr<RecordBatch>>> DeclarationToBatchesAsync(Declaration declaration, ExecContext exec_context)

Overload of DeclarationToBatchesAsync accepting a custom exec context.

See also

DeclarationToTableAsync for details on threading & execution

Result<std::unique_ptr<RecordBatchReader>> DeclarationToReader(Declaration declaration, bool use_threads = true, MemoryPool *memory_pool = default_memory_pool(), FunctionRegistry *function_registry = NULLPTR)

Utility method to run a declaration and return results as a RecordBatchReader.

If an exec context is not provided then a default exec context will be used based on the value of use_threads. If use_threads is false then the CPU executor will be a serial executor and all CPU work will be done on the calling thread. I/O tasks will still happen on the I/O executor and may be multi-threaded.

If use_threads is false then all CPU work will happen during the calls to RecordBatchReader::Next and no CPU work will happen in the background. If use_threads is true then CPU work will happen on the CPU thread pool and tasks may run in between calls to RecordBatchReader::Next. If the returned reader is not consumed quickly enough then the plan will eventually pause as the backpressure queue fills up.

If a custom exec context is provided then the value of use_threads will be ignored.

The returned RecordBatchReader can be closed early to cancel the computation of record batches. In this case, only errors encountered by the computation may be reported. In particular, no cancellation error may be reported.

Result<std::unique_ptr<RecordBatchReader>> DeclarationToReader(Declaration declaration, QueryOptions query_options)
Status DeclarationToStatus(Declaration declaration, bool use_threads = true, MemoryPool *memory_pool = default_memory_pool(), FunctionRegistry *function_registry = NULLPTR)

Utility method to run a declaration and ignore results.

This can be useful when the data are consumed as part of the plan itself, for example, when the plan ends with a write node.

See also

DeclarationToTable for details on threading & execution

Status DeclarationToStatus(Declaration declaration, QueryOptions query_options)
Future<> DeclarationToStatusAsync(Declaration declaration, bool use_threads = true, MemoryPool *memory_pool = default_memory_pool(), FunctionRegistry *function_registry = NULLPTR)

Asynchronous version of DeclarationToStatus.

This can be useful when the data are consumed as part of the plan itself, for example, when the plan ends with a write node.

See also

DeclarationToTableAsync for details on threading & execution

Future<> DeclarationToStatusAsync(Declaration declaration, ExecContext exec_context)

Overload of DeclarationToStatusAsync accepting a custom exec context.

See also

DeclarationToTableAsync for details on threading & execution

struct Declaration
#include <arrow/acero/exec_plan.h>

Helper class for declaring execution nodes.

A Declaration represents an unconstructed ExecNode (and potentially an entire graph since its inputs may also be Declarations)

A Declaration can be converted to a plan and executed using one of the DeclarationToXyz methods.

For more direct control, a Declaration can be added to an existing execution plan with Declaration::AddToPlan, which will recursively construct any inputs as necessary.

Public Types

using Input = std::variant<ExecNode*, Declaration>

Public Functions

inline Declaration()
inline Declaration(std::string factory_name, std::vector<Input> inputs, std::shared_ptr<ExecNodeOptions> options, std::string label)

construct a declaration

Parameters:
  • factory_name – the name of the exec node to construct. The node must have been added to the exec node registry with this name.

  • inputs – the inputs to the node, these should be other declarations

  • options – options that control the behavior of the node. You must use the appropriate subclass. For example, if factory_name is “project” then options should be ProjectNodeOptions.

  • label – a label to give the node. Can be used to distinguish it from other nodes of the same type in the plan.

template<typename Options>
inline Declaration(std::string factory_name, std::vector<Input> inputs, Options options, std::string label)
template<typename Options>
inline Declaration(std::string factory_name, std::vector<Input> inputs, Options options)
template<typename Options>
inline Declaration(std::string factory_name, Options options)
template<typename Options>
inline Declaration(std::string factory_name, Options options, std::string label)
Result<ExecNode*> AddToPlan(ExecPlan *plan, ExecFactoryRegistry *registry = default_exec_factory_registry()) const

add the declaration to an already created execution plan

This method will recursively call AddToPlan on all of the declaration’s inputs. This method is only for advanced use when the DeclarationToXyz methods are not sufficient.

Parameters:
  • plan – the plan to add the node to

  • registry – the registry to use to lookup the node factory

Returns:

the instantiated execution node

bool IsValid(ExecFactoryRegistry *registry = default_exec_factory_registry()) const

Public Members

std::string factory_name

the name of the factory to use when creating a node

std::vector<Input> inputs

the declaration’s inputs

std::shared_ptr<ExecNodeOptions> options

options to control the behavior of the node

std::string label

a label to give the node in the plan

Public Static Functions

static Declaration Sequence(std::vector<Declaration> decls)

Convenience factory for the common case of a simple sequence of nodes.

Each of decls will be appended to the inputs of the subsequent declaration, and the final modified declaration will be returned.

Without this convenience factory, constructing a sequence would require explicit, difficult-to-read nesting:

Declaration{"n3",
              {
                  Declaration{"n2",
                              {
                                  Declaration{"n1",
                                              {
                                                  Declaration{"n0", N0Opts{}},
                                              },
                                              N1Opts{}},
                              },
                              N2Opts{}},
              },
              N3Opts{}};

An equivalent Declaration can be constructed more tersely using Sequence:

Declaration::Sequence({
    {"n0", N0Opts{}},
    {"n1", N1Opts{}},
    {"n2", N2Opts{}},
    {"n3", N3Opts{}},
});
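The chaining rule ("each of decls will be appended to the inputs of the subsequent declaration") can be modeled without Arrow. In this sketch `ToyDecl` and `ToySequence` are hypothetical stand-ins for Declaration and Declaration::Sequence; rejecting an empty list with an assertion is an assumption of the sketch, not documented behavior.

```cpp
#include <cassert>
#include <cstddef>
#include <string>
#include <utility>
#include <vector>

// Hypothetical, simplified stand-in for Declaration: a name plus its inputs.
struct ToyDecl {
  std::string name;
  std::vector<ToyDecl> inputs;
};

// Chain the declarations so that each one becomes an input of the next,
// then return the final (outermost) declaration.
ToyDecl ToySequence(std::vector<ToyDecl> decls) {
  assert(!decls.empty());  // assumption of this sketch
  ToyDecl current = std::move(decls.front());
  for (std::size_t i = 1; i < decls.size(); ++i) {
    ToyDecl next = std::move(decls[i]);
    next.inputs.push_back(std::move(current));
    current = std::move(next);
  }
  return current;
}
```

Passing n0 through n3 yields the same nesting as the hand-written form above: n3 wraps n2, which wraps n1, which wraps n0.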

struct QueryOptions
#include <arrow/acero/exec_plan.h>

plan-wide options that can be specified when executing an execution plan

Public Members

bool use_legacy_batching = false

Should the plan use a legacy batching strategy.

This is currently in place only to support the Scanner::ToTable method. This method relies on batch indices from the scanner remaining consistent. This is impractical in the ExecPlan which might slice batches as needed (e.g. for a join)

However, it still works for simple plans and this is the only way we have at the moment for maintaining implicit order.

std::optional<bool> sequence_output = std::nullopt

If the output has a meaningful order then sequence the output of the plan.

The default behavior (std::nullopt) will sequence output batches if there is a meaningful ordering in the final node and will emit batches immediately otherwise.

If explicitly set to true then plan execution will fail if there is no meaningful ordering. This can be useful to validate a query that should be emitting ordered results.

If explicitly set to false then batches will be emitted immediately even if there is a meaningful ordering. This could cause batches to be emitted out of order but may offer a small decrease in latency.
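The three settings of sequence_output can be summarized as a small decision function. This is a model of the rules stated above and not Acero code: `OutputMode` and `ResolveOutputMode` are hypothetical names, and `has_ordering` stands for "the final node has a meaningful ordering".

```cpp
#include <cassert>
#include <optional>

// Hypothetical outcome labels for this sketch; these are not Acero types.
enum class OutputMode { kSequenced, kImmediate, kPlanFails };

// Apply the documented rules for an unset, true, or false sequence_output.
OutputMode ResolveOutputMode(std::optional<bool> sequence_output,
                             bool has_ordering) {
  if (!sequence_output.has_value()) {
    // Default: sequence only when there is something to sequence by.
    return has_ordering ? OutputMode::kSequenced : OutputMode::kImmediate;
  }
  if (*sequence_output) {
    // Explicit true: a missing ordering is treated as an error.
    return has_ordering ? OutputMode::kSequenced : OutputMode::kPlanFails;
  }
  // Explicit false: never sequence, even if an ordering exists.
  return OutputMode::kImmediate;
}
```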

bool use_threads = true

should the plan use multiple background threads for CPU-intensive work

If this is false then all CPU work will be done on the calling thread. I/O tasks will still happen on the I/O executor and may be multi-threaded (but should not use significant CPU resources).

Will be ignored if custom_cpu_executor is set

::arrow::internal::Executor *custom_cpu_executor = NULLPTR

custom executor to use for CPU-intensive work

Must be null or remain valid for the duration of the plan. If this is null then a default thread pool will be chosen whose behavior will be controlled by the use_threads option.

::arrow::internal::Executor *custom_io_executor = NULLPTR

custom executor to use for IO work

Must be null or remain valid for the duration of the plan. If this is null then the global I/O thread pool will be chosen, whose behavior is controlled by the “ARROW_IO_THREADS” environment variable.

MemoryPool *memory_pool = default_memory_pool()

a memory pool to use for allocations

Must remain valid for the duration of the plan.

FunctionRegistry *function_registry = GetFunctionRegistry()

a function registry to use for the plan

Must remain valid for the duration of the plan.

std::vector<std::string> field_names

the names of the output columns

If this is empty then names will be generated based on the input columns

If set then the number of names must equal the number of output columns

std::optional<UnalignedBufferHandling> unaligned_buffer_handling

Policy for unaligned buffers in source data.

Various compute functions and acero internals will type pun array buffers from uint8_t* to some kind of value type (e.g. we might cast to int32_t* to add two int32 arrays)

If the buffer is poorly aligned (e.g. an int32 array is not aligned on a 4-byte boundary) then this is technically undefined behavior in C++. However, most modern compilers and CPUs are fairly tolerant of this behavior and nothing bad (beyond a small hit to performance) is likely to happen.

Note that this only applies to source buffers. All buffers allocated internally by Acero will be suitably aligned.

If this field is set to kWarn then Acero will check if any buffers are unaligned and, if they are, will emit a warning.

If this field is set to kReallocate then Acero will allocate a new, suitably aligned buffer and copy the contents from the old buffer into this new buffer.

If this field is set to kError then Acero will gracefully abort the plan instead.

If this field is set to kIgnore then Acero will not even check if the buffers are unaligned.

If this field is not set then it will be treated as kWarn unless overridden by the ACERO_ALIGNMENT_HANDLING environment variable
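To make the alignment condition concrete, the sketch below shows what an alignment check and a kReallocate-style copy amount to. It is an illustration in plain C++ and not Acero's implementation; `IsAligned` and `CopyToAligned` are hypothetical helpers.

```cpp
#include <cassert>
#include <cstddef>
#include <cstdint>
#include <cstring>
#include <vector>

// True if ptr sits on an address that is a multiple of alignment bytes.
bool IsAligned(const void* ptr, std::size_t alignment) {
  return reinterpret_cast<std::uintptr_t>(ptr) % alignment == 0;
}

// Copy count values of type T out of a possibly unaligned byte buffer into
// fresh storage that is suitably aligned for T. std::memcpy is used so the
// unaligned source is never dereferenced as a T.
template <typename T>
std::vector<T> CopyToAligned(const std::uint8_t* bytes, std::size_t count) {
  std::vector<T> out(count);
  if (count > 0) std::memcpy(out.data(), bytes, count * sizeof(T));
  return out;
}
```

For example, an int32 value stored one byte into an 8-byte-aligned buffer fails `IsAligned(ptr, 4)`, while the vector returned by `CopyToAligned<std::int32_t>` passes it and holds the same value.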

struct BatchesWithCommonSchema
#include <arrow/acero/exec_plan.h>

a collection of exec batches with a common schema

Public Members

std::vector<ExecBatch> batches
std::shared_ptr<Schema> schema

Configuration for execution nodes

enum class JoinType

Values:

enumerator LEFT_SEMI
enumerator RIGHT_SEMI
enumerator LEFT_ANTI
enumerator RIGHT_ANTI
enumerator INNER
enumerator LEFT_OUTER
enumerator RIGHT_OUTER
enumerator FULL_OUTER
enum class JoinKeyCmp

Values:

enumerator EQ
enumerator IS
using ArrayVectorIteratorMaker = std::function<Iterator<std::shared_ptr<ArrayVector>>()>

a source node that reads from an iterator of array vectors

using ExecBatchIteratorMaker = std::function<Iterator<std::shared_ptr<ExecBatch>>()>

a source node that reads from an iterator of ExecBatch

using RecordBatchIteratorMaker = std::function<Iterator<std::shared_ptr<RecordBatch>>()>
constexpr int32_t kDefaultBackpressureHighBytes = 1 << 30

a default value at which backpressure will be applied

constexpr int32_t kDefaultBackpressureLowBytes = 1 << 28

a default value at which backpressure will be removed

std::string ToString(JoinType t)
class ExecNodeOptions
#include <arrow/acero/options.h>

A base class for all options objects.

The only time this is used directly is when a node has no configuration

Subclassed by arrow::acero::AggregateNodeOptions, arrow::acero::AsofJoinNodeOptions, arrow::acero::ConsumingSinkNodeOptions, arrow::acero::FetchNodeOptions, arrow::acero::FilterNodeOptions, arrow::acero::HashJoinNodeOptions, arrow::acero::NamedTableNodeOptions, arrow::acero::OrderByNodeOptions, arrow::acero::PivotLongerNodeOptions, arrow::acero::ProjectNodeOptions, arrow::acero::RecordBatchReaderSourceNodeOptions, arrow::acero::SchemaSourceNodeOptions< ItMaker >, arrow::acero::SinkNodeOptions, arrow::acero::SourceNodeOptions, arrow::acero::TableSinkNodeOptions, arrow::acero::TableSourceNodeOptions, arrow::dataset::ScanNodeOptions, arrow::dataset::ScanV2Options, arrow::dataset::WriteNodeOptions, arrow::acero::SchemaSourceNodeOptions< ArrayVectorIteratorMaker >, arrow::acero::SchemaSourceNodeOptions< ExecBatchIteratorMaker >, arrow::acero::SchemaSourceNodeOptions< RecordBatchIteratorMaker >

Public Functions

virtual ~ExecNodeOptions() = default

Public Members

std::shared_ptr<DebugOptions> debug_opts

This must not be used in release-mode.

class SourceNodeOptions : public arrow::acero::ExecNodeOptions
#include <arrow/acero/options.h>

A node representing a generic source of data for Acero.

The source node will start calling generator during StartProducing. An initial task will be created that will call generator. It will not call generator reentrantly. If the source can be read in parallel then those details should be encapsulated within generator.

For each batch received a new task will be created to push that batch downstream. This task will slice smaller units of size ExecPlan::kMaxBatchSize from the parent batch and call InputReceived. Thus, if the generator yields a large batch it may result in several calls to InputReceived.
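The slicing step can be pictured as cutting a row range into fixed-size units. The sketch below is a model only: `SliceRanges` is a hypothetical helper, `max_rows` plays the role of ExecPlan::kMaxBatchSize, and how Acero treats a zero-row batch is not modeled.

```cpp
#include <algorithm>
#include <cassert>
#include <cstdint>
#include <utility>
#include <vector>

// Return (offset, length) pairs that cover [0, num_rows) in units of at most
// max_rows rows. Each pair corresponds to one downstream call in the model.
std::vector<std::pair<std::int64_t, std::int64_t>> SliceRanges(
    std::int64_t num_rows, std::int64_t max_rows) {
  assert(num_rows >= 0 && max_rows > 0);
  std::vector<std::pair<std::int64_t, std::int64_t>> ranges;
  for (std::int64_t offset = 0; offset < num_rows; offset += max_rows) {
    ranges.emplace_back(offset, std::min(max_rows, num_rows - offset));
  }
  return ranges;
}
```

A batch of 10 rows with a unit size of 4 produces three slices of lengths 4, 4 and 2.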

The SourceNode will, by default, assign an implicit ordering to outgoing batches. This is valid as long as the generator generates batches in a deterministic fashion. Currently, the only way to override this is to subclass the SourceNode.

This node is not generally used directly but can serve as the basis for various specialized nodes.

Public Functions

inline SourceNodeOptions(std::shared_ptr<Schema> output_schema, std::function<Future<std::optional<ExecBatch>>()> generator)

Create an instance from values.

Public Members

std::shared_ptr<Schema> output_schema

the schema for batches that will be generated by this source

std::function<Future<std::optional<ExecBatch>>()> generator

an asynchronous stream of batches ending with std::nullopt

class TableSourceNodeOptions : public arrow::acero::ExecNodeOptions
#include <arrow/acero/options.h>

a node that generates data from a table already loaded in memory

The table source node will slice off chunks of at most max_batch_size rows for parallel processing. The table source node extends the source node, so these chunks will in turn be processed iteratively in small batches.

See also

SourceNode for details.

Public Functions

inline TableSourceNodeOptions(std::shared_ptr<Table> table, int64_t max_batch_size = kDefaultMaxBatchSize)

Create an instance from values.

Public Members

std::shared_ptr<Table> table

a table which acts as the data source

int64_t max_batch_size

The size of batches to emit from this node. If the table is larger, the node will emit multiple batches from the table to be processed in parallel.

Public Static Attributes

static constexpr int64_t kDefaultMaxBatchSize = 1 << 20
class NamedTableNodeOptions : public arrow::acero::ExecNodeOptions
#include <arrow/acero/options.h>

define a lazily resolved Arrow table.

The table uniquely identified by the names can typically be resolved at the time when the plan is to be consumed.

This node is for serialization purposes only and can never be executed.

Public Functions

inline NamedTableNodeOptions(std::vector<std::string> names, std::shared_ptr<Schema> schema)

Create an instance from values.

Public Members

std::vector<std::string> names

the names to put in the serialized plan

std::shared_ptr<Schema> schema

the output schema of the table

template<typename ItMaker>
class SchemaSourceNodeOptions : public arrow::acero::ExecNodeOptions
#include <arrow/acero/options.h>

a source node which feeds data from a synchronous iterator of batches

ItMaker is a maker of an iterator of tabular data.

The node can be configured to use an I/O executor. If set then each time the iterator is polled a new I/O thread task will be created to do the polling. This allows a blocking iterator to stay off the CPU thread pool.

Public Functions

inline SchemaSourceNodeOptions(std::shared_ptr<Schema> schema, ItMaker it_maker, arrow::internal::Executor *io_executor)

Create an instance that will create a new task on io_executor for each iteration.

inline SchemaSourceNodeOptions(std::shared_ptr<Schema> schema, ItMaker it_maker, bool requires_io = false)

Create an instance that will either iterate synchronously or use the default I/O executor.

Public Members

std::shared_ptr<Schema> schema

The schema of the record batches from the iterator.

ItMaker it_maker

A maker of an iterator which acts as the data source.

arrow::internal::Executor *io_executor

The executor to use for scanning the iterator.

Defaults to the default I/O executor. Only used if requires_io is true. If requires_io is false then this MUST be nullptr.

bool requires_io

If true then items will be fetched from the iterator on a dedicated I/O thread to keep I/O off the CPU thread.

class RecordBatchReaderSourceNodeOptions : public arrow::acero::ExecNodeOptions
#include <arrow/acero/options.h>

a source node that reads from a RecordBatchReader

Each iteration of the RecordBatchReader will be run on a new thread task created on the I/O thread pool.

Public Functions

inline RecordBatchReaderSourceNodeOptions(std::shared_ptr<RecordBatchReader> reader, arrow::internal::Executor *io_executor = NULLPTR)

Create an instance from values.

Public Members

std::shared_ptr<RecordBatchReader> reader

The RecordBatchReader which acts as the data source.

arrow::internal::Executor *io_executor

The executor to use for the reader.

Defaults to the default I/O executor.

class ArrayVectorSourceNodeOptions : public arrow::acero::SchemaSourceNodeOptions<ArrayVectorIteratorMaker>
#include <arrow/acero/options.h>

An extended Source node which accepts a schema and array-vectors.

class ExecBatchSourceNodeOptions : public arrow::acero::SchemaSourceNodeOptions<ExecBatchIteratorMaker>
#include <arrow/acero/options.h>

An extended Source node which accepts a schema and exec-batches.

Public Functions

ExecBatchSourceNodeOptions(std::shared_ptr<Schema> schema, std::vector<ExecBatch> batches, ::arrow::internal::Executor *io_executor)
ExecBatchSourceNodeOptions(std::shared_ptr<Schema> schema, std::vector<ExecBatch> batches, bool requires_io = false)
inline SchemaSourceNodeOptions(std::shared_ptr<Schema> schema, ItMaker it_maker, arrow::internal::Executor *io_executor)

Create an instance that will create a new task on io_executor for each iteration.

inline SchemaSourceNodeOptions(std::shared_ptr<Schema> schema, ItMaker it_maker, bool requires_io = false)

Create an instance that will either iterate synchronously or use the default I/O executor.

class RecordBatchSourceNodeOptions : public arrow::acero::SchemaSourceNodeOptions<RecordBatchIteratorMaker>
#include <arrow/acero/options.h>

a source node that reads from an iterator of RecordBatch

class FilterNodeOptions : public arrow::acero::ExecNodeOptions
#include <arrow/acero/options.h>

a node which excludes some rows from batches passed through it

filter_expression will be evaluated against each batch which is pushed to this node. Any rows for which filter_expression does not evaluate to true will be excluded in the batch emitted by this node.

This node will emit empty batches if all rows are excluded. This is done to avoid gaps in the ordering.
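The phrase "does not evaluate to true" covers both false and null results. The sketch below models that rule on a single column of integers; it does not use Arrow, `FilterRows` is a hypothetical helper, and `std::nullopt` stands in for a null mask value.

```cpp
#include <cassert>
#include <cstddef>
#include <optional>
#include <vector>

// Keep values[i] only when mask[i] is exactly true. A false mask value and a
// null (std::nullopt) mask value both exclude the row.
std::vector<int> FilterRows(const std::vector<int>& values,
                            const std::vector<std::optional<bool>>& mask) {
  assert(values.size() == mask.size());
  std::vector<int> kept;
  for (std::size_t i = 0; i < values.size(); ++i) {
    if (mask[i].value_or(false)) kept.push_back(values[i]);
  }
  return kept;
}
```

When every row is excluded the result is an empty vector rather than no result at all, which mirrors the node emitting an empty batch to keep the batch ordering free of gaps.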

Public Functions

inline explicit FilterNodeOptions(Expression filter_expression)

create an instance from values

Public Members

Expression filter_expression

the expression to filter batches

The return type of this expression must be boolean

class FetchNodeOptions : public arrow::acero::ExecNodeOptions
#include <arrow/acero/options.h>

a node which selects a specified subset from the input

Public Functions

inline FetchNodeOptions(int64_t offset, int64_t count)

create an instance from values

Public Members

int64_t offset

the number of rows to skip

int64_t count

the number of rows to keep (not counting skipped rows)
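The meaning of offset and count can be shown on a plain vector of rows. This is a model and not the fetch node itself: `FetchRows` is a hypothetical helper, and clamping to the end of the input is an assumption of the sketch.

```cpp
#include <algorithm>
#include <cassert>
#include <cstdint>
#include <vector>

// Skip the first offset rows, then keep at most count of the rows that follow.
std::vector<int> FetchRows(const std::vector<int>& rows, std::int64_t offset,
                           std::int64_t count) {
  assert(offset >= 0 && count >= 0);
  const std::int64_t size = static_cast<std::int64_t>(rows.size());
  const std::int64_t begin = std::min(offset, size);
  const std::int64_t length = std::min(count, size - begin);
  return std::vector<int>(rows.begin() + begin, rows.begin() + begin + length);
}
```

With ten rows numbered 0 to 9, an offset of 2 and a count of 3 keeps rows 2, 3 and 4.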

Public Static Attributes

static constexpr std::string_view kName = "fetch"
class ProjectNodeOptions : public arrow::acero::ExecNodeOptions
#include <arrow/acero/options.h>

a node which executes expressions on input batches, producing batches of the same length with new columns.

Each expression will be evaluated against each batch which is pushed to this node to produce a corresponding output column.

If names are not provided, the string representations of exprs will be used.

Public Functions

inline explicit ProjectNodeOptions(std::vector<Expression> expressions, std::vector<std::string> names = {})

create an instance from values

Public Members

std::vector<Expression> expressions

the expressions to run on the batches

The output will have one column for each expression. If you wish to keep any of the columns from the input then you should create a simple field_ref expression for that column.

std::vector<std::string> names

the names of the output columns

If this is not specified then the result of calling ToString on the expression will be used instead

This list should either be empty or have the same length as expressions

class AggregateNodeOptions : public arrow::acero::ExecNodeOptions
#include <arrow/acero/options.h>

a node which aggregates input batches and calculates summary statistics

The node can summarize the entire input or it can group the input with grouping keys and segment keys.

By default, the aggregate node is a pipeline breaker. It must accumulate all input before any output is produced. Segment keys are a performance optimization. If you know your input is already partitioned by one or more columns then you can specify these as segment keys. At each change in the segment keys the node will emit values for all data seen so far.

Segment keys are currently limited to single-threaded mode.

Both keys and segment-keys determine the group. However segment-keys are also used for determining grouping segments, which should be large, and allow streaming a partial aggregation result after processing each segment. One common use-case for segment-keys is ordered aggregation, in which the segment-key attribute specifies a column with non-decreasing values or a lexicographically-ordered set of such columns.

If the keys attribute is a non-empty vector, then each aggregate in aggregates is expected to be a HashAggregate function. If the keys attribute is an empty vector, then each aggregate is assumed to be a ScalarAggregate function.

If the segment_keys attribute is a non-empty vector, then segmented aggregation, as described above, applies.

The keys and segment_keys vectors must be disjoint.

If no measures are provided then you will simply get the list of unique keys.

This node outputs segment keys first, followed by regular keys, followed by one column for each aggregate.
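Segmented aggregation can be pictured as a running aggregate that is finalized whenever the segment key changes. The sketch below models this for a sum over one integer segment key; it does not use Arrow, and `Row` and `SegmentedSum` are hypothetical names.

```cpp
#include <cassert>
#include <utility>
#include <vector>

// One input row: a segment key and a value to aggregate.
struct Row {
  int segment;
  int value;
};

// Sum values per segment, starting a new output entry at each change in the
// segment key. Once a new entry starts, earlier entries are final, which is
// what allows a streaming node to emit them early.
std::vector<std::pair<int, int>> SegmentedSum(const std::vector<Row>& rows) {
  std::vector<std::pair<int, int>> out;
  for (const Row& row : rows) {
    if (out.empty() || out.back().first != row.segment) {
      out.emplace_back(row.segment, 0);
    }
    out.back().second += row.value;
  }
  return out;
}
```

If the input is not already partitioned by the segment key, the same key shows up in more than one output entry, which is why segment keys should only be used on input known to be partitioned that way.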

Public Functions

inline explicit AggregateNodeOptions(std::vector<Aggregate> aggregates, std::vector<FieldRef> keys = {}, std::vector<FieldRef> segment_keys = {})

create an instance from values

Public Members

std::vector<Aggregate> aggregates
std::vector<FieldRef> keys
std::vector<FieldRef> segment_keys
class BackpressureMonitor
#include <arrow/acero/options.h>

an interface that can be queried for backpressure statistics

Public Functions

virtual ~BackpressureMonitor() = default
virtual uint64_t bytes_in_use() = 0

fetches the number of bytes currently queued up

virtual bool is_paused() = 0

checks to see if backpressure is currently applied

struct BackpressureOptions
#include <arrow/acero/options.h>

Options to control backpressure behavior.

Public Functions

inline BackpressureOptions()

Create default options that perform no backpressure.

inline BackpressureOptions(uint64_t resume_if_below, uint64_t pause_if_above)

Create options that will perform backpressure.

Parameters:
  • resume_if_below – The producer should resume producing if the backpressure queue holds fewer than resume_if_below bytes.

  • pause_if_above – The producer should pause producing if the backpressure queue holds more than pause_if_above bytes.

inline bool should_apply_backpressure() const

helper method to determine if backpressure is enabled

Returns:

true if pause_if_above is greater than zero, false otherwise

Public Members

uint64_t resume_if_below

the number of bytes at which the producer should resume producing

uint64_t pause_if_above

the number of bytes at which the producer should pause producing

If this is 0 then backpressure will be disabled

Public Static Functions

static inline BackpressureOptions DefaultBackpressure()

create an instance using default values for backpressure limits
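The two thresholds form a hysteresis band: production pauses above the high mark and resumes only once the queue drains below the low mark. The sketch below models that behavior; it is not the sink node's implementation, and `ToyBackpressure` with its `Add` and `Remove` methods is a hypothetical class.

```cpp
#include <cassert>
#include <cstdint>

// Hypothetical tracker that mirrors the pause_if_above / resume_if_below
// thresholds of BackpressureOptions.
class ToyBackpressure {
 public:
  ToyBackpressure(std::uint64_t resume_if_below, std::uint64_t pause_if_above)
      : resume_if_below_(resume_if_below), pause_if_above_(pause_if_above) {}

  // Record bytes entering the queue; pause once the high mark is exceeded.
  // A pause_if_above of zero means backpressure is disabled.
  void Add(std::uint64_t bytes) {
    bytes_in_use_ += bytes;
    if (pause_if_above_ > 0 && bytes_in_use_ > pause_if_above_) paused_ = true;
  }

  // Record bytes leaving the queue; resume once below the low mark.
  void Remove(std::uint64_t bytes) {
    assert(bytes <= bytes_in_use_);
    bytes_in_use_ -= bytes;
    if (paused_ && bytes_in_use_ < resume_if_below_) paused_ = false;
  }

  bool is_paused() const { return paused_; }
  std::uint64_t bytes_in_use() const { return bytes_in_use_; }

 private:
  std::uint64_t resume_if_below_;
  std::uint64_t pause_if_above_;
  std::uint64_t bytes_in_use_ = 0;
  bool paused_ = false;
};
```

Keeping the two marks apart avoids rapid toggling between paused and running when the queue size hovers around a single threshold.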

class SinkNodeOptions : public arrow::acero::ExecNodeOptions
#include <arrow/acero/options.h>

a sink node which collects results in a queue

Emitted batches will only be ordered if there is a meaningful ordering and sequence_output is not set to false.

Subclassed by arrow::acero::OrderBySinkNodeOptions, arrow::acero::SelectKSinkNodeOptions

Public Functions

inline explicit SinkNodeOptions(std::function<Future<std::optional<ExecBatch>>()> *generator, std::shared_ptr<Schema> *schema, BackpressureOptions backpressure = {}, BackpressureMonitor **backpressure_monitor = NULLPTR, std::optional<bool> sequence_output = std::nullopt)
inline explicit SinkNodeOptions(std::function<Future<std::optional<ExecBatch>>()> *generator, BackpressureOptions backpressure = {}, BackpressureMonitor **backpressure_monitor = NULLPTR, std::optional<bool> sequence_output = std::nullopt)

Public Members

std::function<Future<std::optional<ExecBatch>>()> *generator

A pointer to a generator of batches.

This will be set when the node is added to the plan and should be used to consume data from the plan. If this function is not called frequently enough then the sink node will start to accumulate data and may apply backpressure.

std::shared_ptr<Schema> *schema

A pointer which will be set to the schema of the generated batches.

This is optional. If nullptr is passed in then it will be ignored. This will be set when the node is added to the plan, before StartProducing is called.

BackpressureOptions backpressure

Options to control when to apply backpressure.

This is optional. The default is to never apply backpressure. If the plan is not consumed quickly enough the system may eventually run out of memory.

BackpressureMonitor **backpressure_monitor

A pointer to a backpressure monitor.

This will be set when the node is added to the plan. This can be used to inspect the amount of data currently queued in the sink node. This is an optional utility and backpressure can be applied even if this is not used.

std::optional<bool> sequence_output

Controls whether batches should be emitted immediately or sequenced in order.

See also

QueryOptions for more details

class BackpressureControl
#include <arrow/acero/options.h>

Control used by a SinkNodeConsumer to pause & resume.

Callers should ensure that they do not call Pause and Resume simultaneously, and they should sequence things so that a call to Pause() is always followed by an eventual call to Resume().

Public Functions

virtual ~BackpressureControl() = default
virtual void Pause() = 0

Ask the input to pause.

This is best effort; batches may continue to arrive. Must eventually be followed by a call to Resume() or deadlock will occur.

virtual void Resume() = 0

Ask the input to resume.

class SinkNodeConsumer
#include <arrow/acero/options.h>

a sink node that consumes the data as part of the plan using callbacks

Subclassed by arrow::acero::NullSinkNodeConsumer, arrow::acero::TableSinkNodeConsumer

Public Functions

virtual ~SinkNodeConsumer() = default
virtual Status Init(const std::shared_ptr<Schema> &schema, BackpressureControl *backpressure_control, ExecPlan *plan) = 0

Prepare any consumer state.

This will be run once the schema is finalized as the plan is starting and before any calls to Consume. A common use is to save off the schema so that batches can be interpreted.

virtual Status Consume(ExecBatch batch) = 0

Consume a batch of data.

virtual Future<> Finish() = 0

Signal to the consumer that the last batch has been delivered.

The returned future should only finish when all outstanding tasks have completed.

If the plan is ended early or aborts due to an error then this will not be called.

class ConsumingSinkNodeOptions : public arrow::acero::ExecNodeOptions
#include <arrow/acero/options.h>

Add a sink node which consumes data within the exec plan run.

Public Functions

inline explicit ConsumingSinkNodeOptions(std::shared_ptr<SinkNodeConsumer> consumer, std::vector<std::string> names = {}, std::optional<bool> sequence_output = std::nullopt)

Public Members

std::shared_ptr<SinkNodeConsumer> consumer
std::vector<std::string> names

Names to rename the sink’s schema fields to.

If specified then names must be provided for all fields. Currently, only a flat schema is supported (see GH-31875).

If not specified then names will be generated based on the source data.

std::optional<bool> sequence_output

Controls whether batches should be emitted immediately or sequenced in order.

See also

QueryOptions for more details

class OrderBySinkNodeOptions : public arrow::acero::SinkNodeOptions
#include <arrow/acero/options.h>

Make a node which sorts rows passed through it.

All batches pushed to this node will be accumulated and then sorted by the given fields. The sorted batches are then forwarded to the generator in sorted order.

Public Functions

inline explicit OrderBySinkNodeOptions(SortOptions sort_options, std::function<Future<std::optional<ExecBatch>>()> *generator)

create an instance from values

Public Members

SortOptions sort_options

options describing which columns and direction to sort

class OrderByNodeOptions : public arrow::acero::ExecNodeOptions
#include <arrow/acero/options.h>

Apply a new ordering to data.

Currently this node works by accumulating all data, sorting, and then emitting the new data with an updated batch index.

Larger-than-memory sort is not currently supported.

Public Functions

inline explicit OrderByNodeOptions(Ordering ordering)

Public Members

Ordering ordering

The new ordering to apply to outgoing data.

Public Static Attributes

static constexpr std::string_view kName = "order_by"
class HashJoinNodeOptions : public arrow::acero::ExecNodeOptions
#include <arrow/acero/options.h>

a node which implements a join operation using a hash table

Public Functions

inline HashJoinNodeOptions(JoinType in_join_type, std::vector<FieldRef> in_left_keys, std::vector<FieldRef> in_right_keys, Expression filter = literal(true), std::string output_suffix_for_left = default_output_suffix_for_left, std::string output_suffix_for_right = default_output_suffix_for_right, bool disable_bloom_filter = false)

create an instance from values that outputs all columns

inline HashJoinNodeOptions(std::vector<FieldRef> in_left_keys, std::vector<FieldRef> in_right_keys)

create an instance from keys

This will create an inner join that outputs all columns and has no post join filter

in_left_keys should have the same length and types as in_right_keys

Parameters:
  • in_left_keys – the keys in the left input

  • in_right_keys – the keys in the right input

inline HashJoinNodeOptions(JoinType join_type, std::vector<FieldRef> left_keys, std::vector<FieldRef> right_keys, std::vector<FieldRef> left_output, std::vector<FieldRef> right_output, Expression filter = literal(true), std::string output_suffix_for_left = default_output_suffix_for_left, std::string output_suffix_for_right = default_output_suffix_for_right, bool disable_bloom_filter = false)

create an instance from values using JoinKeyCmp::EQ for all comparisons

inline HashJoinNodeOptions(JoinType join_type, std::vector<FieldRef> left_keys, std::vector<FieldRef> right_keys, std::vector<FieldRef> left_output, std::vector<FieldRef> right_output, std::vector<JoinKeyCmp> key_cmp, Expression filter = literal(true), std::string output_suffix_for_left = default_output_suffix_for_left, std::string output_suffix_for_right = default_output_suffix_for_right, bool disable_bloom_filter = false)

create an instance from values

HashJoinNodeOptions() = default

Public Members

JoinType join_type = JoinType::INNER
std::vector<FieldRef> left_keys
std::vector<FieldRef> right_keys
bool output_all = false
std::vector<FieldRef> left_output
std::vector<FieldRef> right_output
std::vector<JoinKeyCmp> key_cmp
std::string output_suffix_for_left
std::string output_suffix_for_right
Expression filter = literal(true)
bool disable_bloom_filter = false

Public Static Attributes

static constexpr const char *default_output_suffix_for_left = ""
static constexpr const char *default_output_suffix_for_right = ""
class AsofJoinNodeOptions : public arrow::acero::ExecNodeOptions
#include <arrow/acero/options.h>

a node which implements the asof join operation

Note, this API is experimental and will change in the future

This node takes one left table and any number of right tables, and asof joins them together. Batches produced by each input must be ordered by the “on” key. This node will output one row for each row in the left table.

Public Functions

inline AsofJoinNodeOptions(std::vector<Keys> input_keys, int64_t tolerance)

Public Members

std::vector<Keys> input_keys

AsofJoin keys per input table.

At least two keys must be given. The first key corresponds to a left table and all other keys correspond to right tables for the as-of-join.

See also

Keys for details.

int64_t tolerance

Tolerance for inexact “on” key matching.

A right row is considered a match with the left row if right.on - left.on lies between zero and tolerance, inclusive. The tolerance may be:

  • negative, in which case a past-as-of-join occurs;

  • or positive, in which case a future-as-of-join occurs;

  • or zero, in which case an exact-as-of-join occurs.

The tolerance is interpreted in the same units as the “on” key.
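The sign convention above can be sketched as a small predicate. This is an illustration under stated assumptions, not Acero's implementation: `AsofMatches` is a hypothetical name, the "on" values are plain 64-bit integers, and integer overflow of the difference is ignored. It only decides whether a right row is eligible. Choosing among several eligible right rows is outside the sketch.

```cpp
#include <cstdint>

// Hypothetical helper: is right_on an acceptable match for left_on, given a
// signed tolerance expressed in the same units as the "on" key?
constexpr bool AsofMatches(int64_t left_on, int64_t right_on, int64_t tolerance) {
  return tolerance < 0
             // past as-of join: right row at or before the left row
             ? (right_on <= left_on && right_on - left_on >= tolerance)
             : tolerance > 0
                   // future as-of join: right row at or after the left row
                   ? (right_on >= left_on && right_on - left_on <= tolerance)
                   // zero tolerance: exact match only
                   : right_on == left_on;
}

// Compile-time usage checks.
static_assert(AsofMatches(10, 8, -5), "a row 2 units in the past is within -5");
static_assert(!AsofMatches(10, 11, -5), "a future row never matches a past join");
```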

struct Keys
#include <arrow/acero/options.h>

Keys for one input table of the AsofJoin operation.

The keys must be consistent across the input tables: Each “on” key must refer to a field of the same type and units across the tables. Each “by” key must refer to a list of fields of the same types across the tables.

Public Members

FieldRef on_key

“on” key for the join.

The input table must be sorted by the “on” key. Must be a single field of a common type. Inexact match is used on the “on” key, i.e., a row is considered a match iff right_on - left_on lies between zero and the tolerance, inclusive (see tolerance for the sign convention). Currently, the “on” key must be of an integer, date, or timestamp type.

std::vector<FieldRef> by_key

“by” key for the join.

Each input table must have each field of the “by” key. Exact equality is used for each field of the “by” key. Currently, each field of the “by” key must be of an integer, date, timestamp, or base-binary type.

class SelectKSinkNodeOptions : public arrow::acero::SinkNodeOptions
#include <arrow/acero/options.h>

a node which selects the top_k/bottom_k rows passed through it

All batches pushed to this node will be accumulated, and the top or bottom k rows will then be selected according to the given fields. The selected rows are forwarded to the generator in sorted order.

Public Functions

inline explicit SelectKSinkNodeOptions(SelectKOptions select_k_options, std::function<Future<std::optional<ExecBatch>>()> *generator)

Public Members

SelectKOptions select_k_options

SelectK options.

class TableSinkNodeOptions : public arrow::acero::ExecNodeOptions
#include <arrow/acero/options.h>

a sink node which accumulates all output into a table

Public Functions

inline explicit TableSinkNodeOptions(std::shared_ptr<Table> *output_table, std::optional<bool> sequence_output = std::nullopt)

create an instance from values

Public Members

std::shared_ptr<Table> *output_table

an “out parameter” specifying the table that will be created

Must not be null and must remain valid for the entirety of the plan execution. After the plan has completed this will be set to point to the result table.

std::optional<bool> sequence_output

Controls whether batches should be emitted immediately or sequenced in order.

See also

QueryOptions for more details

std::vector<std::string> names

Custom names to use for the columns.

If specified then names must be provided for all fields. Currently, only a flat schema is supported (see GH-31875).

If not specified then names will be generated based on the source data.

struct PivotLongerRowTemplate
#include <arrow/acero/options.h>

a row template that describes one row that will be generated for each input row

Public Functions

inline PivotLongerRowTemplate(std::vector<std::string> feature_values, std::vector<std::optional<FieldRef>> measurement_values)

Public Members

std::vector<std::string> feature_values

A (typically unique) set of feature values for the template, usually derived from a column name.

These will be used to populate the feature columns

std::vector<std::optional<FieldRef>> measurement_values

The fields containing the measurements to use for this row.

These will be used to populate the measurement columns. If nullopt then nulls will be inserted for the given value.

class PivotLongerNodeOptions : public arrow::acero::ExecNodeOptions
#include <arrow/acero/options.h>

Reshape a table by turning some columns into additional rows.

This operation is sometimes also referred to as UNPIVOT

This is typically done when there are multiple observations in each row in order to transform to a table containing a single observation per row.

For example:

| time | left_temp | right_temp |
| ---- | --------- | ---------- |
| 1    | 10        | 20         |
| 2    | 15        | 18         |

The above table contains two observations per row. There is an implicit feature “location” (left vs right) and a measurement “temp”. What we really want is:

| time | location | temp |
| ---- | -------- | ---- |
| 1    | left     | 10   |
| 1    | right    | 20   |
| 2    | left     | 15   |
| 2    | right    | 18   |

For a more complex example consider:

| time | ax1 | ay1 | bx1 | ay2 |
| ---- | --- | --- | --- | --- |
| 0    | 1   | 2   | 3   | 4   |

We can pretend a vs b and x vs y are features while 1 and 2 are two different kinds of measurements. We thus want to pivot to

| time | a/b | x/y | f1 | f2   |
| ---- | --- | --- | -- | ---- |
| 0    | a   | x   | 1  | null |
| 0    | a   | y   | 2  | 4    |
| 0    | b   | x   | 3  | null |

To do this we create a row template for each combination of features. One should be able to do this purely by looking at the column names. For example, given the above columns “ax1”, “ay1”, “bx1”, and “ay2” we know we have three feature combinations (a, x), (a, y), and (b, x). Similarly, we know we have two possible measurements, “1” and “2”.

For each combination of features we create a row template. In each row template we describe the combination and then list which columns to use for the measurements. If a measurement doesn’t exist for a given combination then we use nullopt.

So, for our above example, we have:

(a, x): names={“a”, “x”}, values={“ax1”, nullopt}
(a, y): names={“a”, “y”}, values={“ay1”, “ay2”}
(b, x): names={“b”, “x”}, values={“bx1”, nullopt}

Finishing it off we name our new columns:
feature_field_names={“a/b”, “x/y”}
measurement_field_names={“f1”, “f2”}
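The expansion described above can be sketched in plain C++ without any Arrow types. This is a hedged illustration rather than the node's implementation: `RowTemplate`, `LongRow`, and `PivotLonger` are hypothetical names, column names stand in for FieldRef, values are plain ints, and pass-through columns such as time are left out.

```cpp
#include <cassert>
#include <map>
#include <optional>
#include <string>
#include <utility>
#include <vector>

// Hypothetical stand-in for PivotLongerRowTemplate, using column names
// (strings) in place of FieldRef.
struct RowTemplate {
  std::vector<std::string> feature_values;
  std::vector<std::optional<std::string>> measurement_values;
};

// One generated output row: feature values, then measurements (nullopt is null).
struct LongRow {
  std::vector<std::string> features;
  std::vector<std::optional<int>> measurements;
};

// Expand a single input row (column name -> value) into one row per template.
// Throws std::out_of_range if a template names a column missing from the row.
std::vector<LongRow> PivotLonger(const std::map<std::string, int>& input_row,
                                 const std::vector<RowTemplate>& templates) {
  assert(!templates.empty());  // the options call for one or more templates
  std::vector<LongRow> out;
  for (const RowTemplate& t : templates) {
    LongRow row{t.feature_values, {}};
    for (const auto& column : t.measurement_values) {
      if (column.has_value()) {
        row.measurements.push_back(input_row.at(*column));
      } else {
        row.measurements.push_back(std::nullopt);  // no such measurement
      }
    }
    out.push_back(std::move(row));
  }
  return out;
}
```

Feeding the single input row {ax1=1, ay1=2, bx1=3, ay2=4} through the three templates listed above yields three output rows, matching the pivoted table in the example.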

Public Members

std::vector<PivotLongerRowTemplate> row_templates

One or more row templates to create new output rows.

Normally there are at least two row templates. The number of output rows will be the number of input rows multiplied by the number of row templates.

std::vector<std::string> feature_field_names

The names of the columns which describe the new features.

std::vector<std::string> measurement_field_names

The names of the columns which represent the measurements.

Public Static Attributes

static constexpr std::string_view kName = "pivot_longer"

Internals for creating custom nodes

ExecFactoryRegistry *default_exec_factory_registry()

The default registry, which includes built-in factories.

inline Result<ExecNode*> MakeExecNode(const std::string &factory_name, ExecPlan *plan, std::vector<ExecNode*> inputs, const ExecNodeOptions &options, ExecFactoryRegistry *registry = default_exec_factory_registry())

Construct an ExecNode using the named factory.

inline bool operator==(const ExecBatch &l, const ExecBatch &r)
inline bool operator!=(const ExecBatch &l, const ExecBatch &r)
void PrintTo(const ExecBatch&, std::ostream*)
class ExecPlan : public std::enable_shared_from_this<ExecPlan>
#include <arrow/acero/exec_plan.h>

Public Types

using NodeVector = std::vector<ExecNode*>

Public Functions

virtual ~ExecPlan() = default
QueryContext *query_context()
const NodeVector &nodes() const

retrieve the nodes in the plan

ExecNode *AddNode(std::unique_ptr<ExecNode> node)
template<typename Node, typename ...Args>
inline Node *EmplaceNode(Args&&... args)
Status Validate()
void StartProducing()

Start producing on all nodes.

Nodes are started in reverse topological order, such that any node is started before all of its inputs.

void StopProducing()

Stop producing on all nodes.

Triggers all sources to stop producing new data. In order to stop cleanly, the plan will continue to run any tasks that are already in progress. The caller should still wait for finished to complete before destroying the plan.

Future<> finished()

A future which will be marked finished when all tasks have finished.

bool HasMetadata() const

Return whether the plan has non-empty metadata.

std::shared_ptr<const KeyValueMetadata> metadata() const

Return the plan’s attached metadata.

std::string ToString() const

Public Static Functions

static Result<std::shared_ptr<ExecPlan>> Make(QueryOptions options, ExecContext exec_context = *threaded_exec_context(), std::shared_ptr<const KeyValueMetadata> metadata = NULLPTR)

Make an empty exec plan.

static Result<std::shared_ptr<ExecPlan>> Make(ExecContext exec_context = *threaded_exec_context(), std::shared_ptr<const KeyValueMetadata> metadata = NULLPTR)
static Result<std::shared_ptr<ExecPlan>> Make(QueryOptions options, ExecContext *exec_context, std::shared_ptr<const KeyValueMetadata> metadata = NULLPTR)
static Result<std::shared_ptr<ExecPlan>> Make(ExecContext *exec_context, std::shared_ptr<const KeyValueMetadata> metadata = NULLPTR)

Public Static Attributes

static const uint32_t kMaxBatchSize = 1 << 15
class ExecNode
#include <arrow/acero/exec_plan.h>

Subclassed by arrow::acero::MapNode

Public Types

using NodeVector = std::vector<ExecNode*>

Public Functions

virtual ~ExecNode() = default
virtual const char *kind_name() const = 0
inline int num_inputs() const
inline const NodeVector &inputs() const

This node’s predecessors in the exec plan.

inline bool is_sink() const

True if the node has no output schema (is a sink)

inline const std::vector<std::string> &input_labels() const

Labels identifying the function of each input.

inline const ExecNode *output() const

This node’s successor in the exec plan.

inline const std::shared_ptr<Schema> &output_schema() const

The datatypes for batches produced by this node.

inline ExecPlan *plan()

This node’s exec plan.

inline const std::string &label() const

An optional label, for display and debugging.

There is no guarantee that this value is non-empty or unique.

inline void SetLabel(std::string label)
virtual Status Validate() const
virtual const Ordering &ordering() const

the ordering of the output batches

This does not guarantee the batches will be emitted by this node in order. Instead it guarantees that the batches will have their ExecBatch::index property set in a way that respects this ordering.

In other words, given the ordering {{“x”, SortOrder::Ascending}} we know that all values of x in a batch with index N will be less than or equal to all values of x in a batch with index N+k (assuming k > 0). Furthermore, we also know that values will be sorted within a batch. Any row N will have a value of x that is less than or equal to the value for any row N+k.

Note that an ordering can also be Ordering::Unordered or Ordering::Implicit. A node’s output should be marked Ordering::Unordered if the order is non-deterministic. For example, a hash-join has no predictable output order.

If the ordering is Ordering::Implicit then there is a meaningful order but that ordering is not represented by any column in the data. The most common case for this is when reading data from an in-memory table. The data has an implicit “row order” which is not necessarily represented in the data set.

A filter or project node will not modify the ordering. Nothing needs to be done other than ensure the index assigned to output batches is the same as the input batch that was mapped.

Other nodes may introduce order. For example, an order-by node will emit a brand new ordering independent of the input ordering.

Finally, as described above, some nodes, such as a hash-join or aggregation, may destroy ordering (although these nodes could also choose to establish a new ordering based on the hash keys).

Some nodes will require an ordering. For example, a fetch node or an asof join node will only function if the input data is ordered. For fetch it is enough to be implicitly ordered; for an asof join the ordering must be explicit and compatible with the on key.

Nodes that maintain ordering should be careful to avoid introducing gaps in the batch index. This may require emitting empty batches in order to maintain continuity.
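To see why gaps in the batch index matter, consider a downstream consumer that re-sequences batches by index. The following is a minimal sketch, not Acero's implementation. `Sequencer` and `Insert` are hypothetical names, and payloads are plain strings rather than ExecBatch.

```cpp
#include <cassert>
#include <cstdint>
#include <map>
#include <string>
#include <utility>
#include <vector>

// Hypothetical re-sequencer: accepts batches in any arrival order and
// releases them in index order (0, 1, 2, ...).
class Sequencer {
 public:
  // Insert a batch; returns the payloads that became ready to emit, in order.
  std::vector<std::string> Insert(int64_t index, std::string payload) {
    assert(index >= next_ && pending_.count(index) == 0);
    pending_.emplace(index, std::move(payload));
    std::vector<std::string> ready;
    // Drain while the next expected index is available. A missing index
    // (a gap) would stall everything queued behind it.
    for (auto it = pending_.find(next_); it != pending_.end();
         it = pending_.find(next_)) {
      ready.push_back(std::move(it->second));
      pending_.erase(it);
      ++next_;
    }
    return ready;
  }

 private:
  int64_t next_ = 0;
  std::map<int64_t, std::string> pending_;
};
```

If a node dropped index 0 without emitting an empty batch in its place, a consumer like this one would hold batches 1 and 2 indefinitely.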

virtual Status InputReceived(ExecNode *input, ExecBatch batch) = 0

Upstream API: These functions are called by input nodes that want to inform this node about an updated condition (a new input batch or an impending end of stream).

Implementation rules:

A node will typically perform some kind of operation on the batch and then call InputReceived on its outputs with the result.

Other nodes may need to accumulate some number of inputs before any output can be produced. These nodes will add the batch to some kind of in-memory accumulation queue and return.

virtual Status InputFinished(ExecNode *input, int total_batches) = 0

Mark the inputs finished after the given number of batches.

This may be called before all inputs are received. This simply fixes the total number of incoming batches for an input, so that the ExecNode knows when it has received all input, regardless of order.
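Because the total may be announced before or after the last batch arrives, a node typically checks for completion in both places. The following is a minimal single-threaded sketch of that bookkeeping. `InputTracker`, `OnBatch`, and `OnFinished` are hypothetical names, and a real node would need to make these updates thread-safe.

```cpp
#include <cassert>

// Hypothetical bookkeeping for one input of a node.
class InputTracker {
 public:
  // Called once per received batch; returns true if this completed the input.
  bool OnBatch() {
    ++received_;
    return Done();
  }

  // Called once when the total is announced; returns true if this completed
  // the input (i.e. every batch had already arrived).
  bool OnFinished(int total_batches) {
    assert(total_ < 0 && total_batches >= received_);
    total_ = total_batches;
    return Done();
  }

 private:
  bool Done() const { return total_ >= 0 && received_ == total_; }

  int received_ = 0;
  int total_ = -1;  // unknown until OnFinished is called
};
```

Exactly one of the calls returns true, whichever happens to arrive last, so the node knows when it is safe to finalize its output.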

virtual Status Init()

Perform any needed initialization.

This hook performs any actions in between creation of ExecPlan and the call to StartProducing. An example could be Bloom filter pushdown. The order of ExecNodes that executes this method is undefined, but the calls are made synchronously.

At this point a node can rely on all inputs & outputs (and the input schemas) being well defined.

virtual Status StartProducing() = 0

Lifecycle API:

  • start / stop to initiate and terminate production

  • pause / resume to apply backpressure

Implementation rules:

StopProducing may be called due to an error, by the user (e.g. cancel), or because a node has all the data it needs (e.g. limit, top-k on sorted data). This means the method may be called multiple times, and we have the following additional rules:

  • StopProducing() must be idempotent

  • StopProducing() must be forwarded to inputs (this is needed for the limit/top-k case because we may not be stopping the entire plan)

Start producing

This must only be called once.

This is typically called automatically by ExecPlan::StartProducing().

virtual void PauseProducing(ExecNode *output, int32_t counter) = 0

Pause producing temporarily.

This call is a hint that an output node is currently not willing to receive data.

This may be called any number of times. However, the node is still free to produce data (which may be difficult to prevent anyway if data is produced using multiple threads).

Parameters:
  • output – Pointer to the output that is full

  • counter – Counter used to sequence calls to pause/resume

virtual void ResumeProducing(ExecNode *output, int32_t counter) = 0

Resume producing after a temporary pause.

This call is a hint that an output node is willing to receive data again.

This may be called any number of times.

Parameters:
  • output – Pointer to the output that is now free

  • counter – Counter used to sequence calls to pause/resume
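One way a producer can use the counter is to ignore any request that is older than one it has already applied. The sketch below assumes that counters increase over time, so a larger value means a more recent request. That is an assumption of this example, as are the names `PauseState` and `DemoOutOfOrder`.

```cpp
#include <cassert>
#include <cstdint>

// Hypothetical source-side state that ignores stale pause/resume requests.
class PauseState {
 public:
  void PauseProducing(int32_t counter) { Apply(counter, /*pause=*/true); }
  void ResumeProducing(int32_t counter) { Apply(counter, /*pause=*/false); }
  bool paused() const { return paused_; }

 private:
  void Apply(int32_t counter, bool pause) {
    if (counter <= last_counter_) return;  // older than a request already seen
    last_counter_ = counter;
    paused_ = pause;
  }

  int32_t last_counter_ = 0;
  bool paused_ = false;
};

// Usage: requests arriving out of order leave the state decided by the
// highest counter seen so far.
inline void DemoOutOfOrder() {
  PauseState s;
  s.PauseProducing(1);
  s.ResumeProducing(3);
  s.PauseProducing(2);  // stale: counter 3 was already applied
  assert(!s.paused());
}
```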

virtual Status StopProducing()

Stop producing new data.

If this node is a source then the source should stop generating data as quickly as possible. If this node is not a source then there is typically nothing that needs to be done although a node may choose to start ignoring incoming data.

This method will be called when an error occurs in the plan. This method may also be called by the user if they wish to end a plan early. Finally, this method may be called if a node determines it no longer needs any more input (for example, a limit node).

This method may be called multiple times.

This is not a pause. There will be no way to start the source again after this has been called.
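Since StopProducing may be called multiple times and must be forwarded to inputs, an atomic flag is one common way to make it idempotent. The following is a minimal sketch under that assumption. `StoppableNode` is a hypothetical stand-in rather than an ExecNode subclass, and it returns void instead of Status for brevity.

```cpp
#include <atomic>
#include <cassert>
#include <utility>
#include <vector>

// Hypothetical node that performs its stop work and forwards the call to
// its inputs exactly once, however often StopProducing is invoked.
class StoppableNode {
 public:
  explicit StoppableNode(std::vector<StoppableNode*> inputs = {})
      : inputs_(std::move(inputs)) {
    for (const StoppableNode* input : inputs_) assert(input != nullptr);
  }

  void StopProducing() {
    // exchange() returns the previous value, so only the first caller proceeds.
    if (stopped_.exchange(true)) return;
    ++stop_actions_;
    for (StoppableNode* input : inputs_) input->StopProducing();
  }

  bool stopped() const { return stopped_.load(); }
  int stop_actions() const { return stop_actions_; }

 private:
  std::vector<StoppableNode*> inputs_;
  std::atomic<bool> stopped_{false};
  int stop_actions_ = 0;  // counts how often the real stop work ran
};
```

Calling StopProducing on a downstream node, for example one standing in for a limit node, stops its source as well, and any later call on either node is a no-op.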

std::string ToString(int indent = 0) const
class ExecFactoryRegistry
#include <arrow/acero/exec_plan.h>

An extensible registry for factories of ExecNodes.

Public Types

using Factory = std::function<Result<ExecNode*>(ExecPlan*, std::vector<ExecNode*>, const ExecNodeOptions&)>

Public Functions

virtual ~ExecFactoryRegistry() = default
virtual Result<Factory> GetFactory(const std::string &factory_name) = 0

Get the named factory from this registry.

Returns an error if factory_name is not found.

virtual Status AddFactory(std::string factory_name, Factory factory) = 0

Add a factory to this registry with the provided name.

Returns an error if factory_name is already in the registry.
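The contract of the two methods (a duplicate name is rejected, an unknown name is an error) can be illustrated with a miniature registry. This is a simplified sketch with hypothetical names: `MiniRegistry` is not the Arrow class, its factories return an int instead of building an ExecNode, and bool/optional stand in for Status/Result.

```cpp
#include <cassert>
#include <functional>
#include <map>
#include <optional>
#include <string>
#include <utility>

// Hypothetical miniature registry; factories here just return an int.
class MiniRegistry {
 public:
  using Factory = std::function<int()>;

  // Returns false (standing in for an error Status) if the name is taken.
  bool AddFactory(std::string name, Factory factory) {
    assert(factory != nullptr);
    return factories_.emplace(std::move(name), std::move(factory)).second;
  }

  // Returns nullopt (standing in for an error Result) if the name is unknown.
  std::optional<Factory> GetFactory(const std::string& name) const {
    auto it = factories_.find(name);
    if (it == factories_.end()) return std::nullopt;
    return it->second;
  }

 private:
  std::map<std::string, Factory> factories_;
};
```

Registering the same name twice leaves the first factory in place, which is why custom node names should be chosen so that they do not collide with built-in ones.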

struct ExecBatch
#include <arrow/compute/exec.h>

Public Functions

ExecBatch() = default
inline ExecBatch(std::vector<Datum> values, int64_t length)
explicit ExecBatch(const RecordBatch &batch)
Result<std::shared_ptr<RecordBatch>> ToRecordBatch(std::shared_ptr<Schema> schema, MemoryPool *pool = default_memory_pool()) const
int64_t TotalBufferSize() const

The sum of bytes in each buffer referenced by the batch.

Note: Scalars are not counted.

Note: Some values may reference only part of a buffer, for example, an array with an offset. The actual data visible to this batch will be smaller than the total buffer size in this case.

template<typename index_type>
inline const Datum &operator[](index_type i) const

Return the value at the i-th index.

bool Equals(const ExecBatch &other) const
inline int num_values() const

A convenience for the number of values / arguments.

ExecBatch Slice(int64_t offset, int64_t length) const
Result<ExecBatch> SelectValues(const std::vector<int> &ids) const
inline std::vector<TypeHolder> GetTypes() const

A convenience for returning the types from the batch.

std::string ToString() const

Public Members

std::vector<Datum> values

The values representing positional arguments to be passed to a kernel’s exec function for processing.

std::shared_ptr<SelectionVector> selection_vector

A deferred filter represented as an array of indices into the values.

For example, the filter [true, true, false, true] would be represented as the selection vector [0, 1, 3]. When the selection vector is set, ExecBatch::length is equal to the length of this array.

Expression guarantee = literal(true)

A predicate Expression guaranteed to evaluate to true for all rows in this batch.

int64_t length = 0

The semantic length of the ExecBatch.

When the values are all scalars, the length should be set to 1 for non-aggregate kernels, otherwise the length is taken from the array values, except when there is a selection vector. When there is a selection vector set, the length of the batch is the length of the selection. Aggregate kernels can have an ExecBatch formed by projecting just the partition columns from a batch, in which case it would have scalar rows with length greater than 1.

If the array values are of length 0 then the length is 0 regardless of whether any values are Scalar.

int64_t index = kUnsequencedIndex

index of this batch in a sorted stream of batches

This index must be strictly monotonic, starting at 0 and without gaps, or it can be set to kUnsequencedIndex if there is no meaningful order.

Public Static Functions

static Result<int64_t> InferLength(const std::vector<Datum> &values)

Infer the ExecBatch length from values.

static Result<ExecBatch> Make(std::vector<Datum> values, int64_t length = -1)

Creates an ExecBatch with length-validation.

If any value is given, then all values must have a common length. If the given length is negative, then the length of the ExecBatch is set to this common length, or to 1 if no values are given. Otherwise, the given length must equal the common length, if any value is given.
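The length rules above can be written out as a small decision function. This is a sketch of the rules as stated here, not the library code. `ResolveLength` is a hypothetical name, each value is represented only by its length, scalar values are not modelled, and nullopt stands in for a failed Result.

```cpp
#include <cassert>
#include <cstdint>
#include <optional>
#include <vector>

// Hypothetical model: each value is represented only by its length.
// Returns the resolved batch length, or nullopt where validation would fail.
std::optional<int64_t> ResolveLength(const std::vector<int64_t>& value_lengths,
                                     int64_t length = -1) {
  if (value_lengths.empty()) {
    return length < 0 ? 1 : length;  // no values: a negative length becomes 1
  }
  const int64_t common = value_lengths.front();
  for (int64_t len : value_lengths) {
    assert(len >= 0);
    if (len != common) return std::nullopt;  // values disagree on length
  }
  if (length < 0) return common;              // adopt the common length
  if (length != common) return std::nullopt;  // explicit length must agree
  return length;
}
```

Passing a negative length is the convenient default. An explicit length is mainly useful as a consistency check, or when there are no values from which to infer one.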