Acero: A C++ streaming execution engine

Warning

Acero is experimental and a stable API is not yet guaranteed.

Motivation

For many complex computations, successive direct invocation of compute functions is not feasible in either memory or computation time. Doing so causes all intermediate data to be fully materialized. To facilitate arbitrarily large inputs and more efficient resource usage, the Arrow C++ implementation also provides Acero, a streaming query engine with which computations can be formulated and executed.

An example graph of a streaming execution workflow.

Acero allows computation to be expressed as an “execution plan” (ExecPlan) which is a directed graph of operators. Each operator (ExecNode) provides, transforms, or consumes the data passing through it. Batches of data (ExecBatch) flow along edges of the graph from node to node. Structuring the API around streams of batches allows the working set for each node to be tuned for optimal performance independent of any other nodes in the graph. Each ExecNode processes batches as they are pushed to it along an edge of the graph by upstream nodes (its inputs), and pushes batches along an edge of the graph to downstream nodes (its outputs) as they are finalized.

Substrait

In order to use Acero you will need to create an execution plan. This is the model that describes the computation you want to apply to your data. Acero has its own internal representation for execution plans but most users should not interact with this directly as it will couple their code to Acero.

Substrait is an open standard for execution plans. Acero implements the Substrait “consumer” interface. This means that Acero can accept a Substrait plan and fulfill the plan, loading the requested data and applying the desired computation. By using Substrait plans users can easily switch out to a different execution engine at a later time.

Substrait Conformance

Substrait defines a broad set of operators and functions for many different situations, and it is unlikely that Acero will ever completely satisfy all of them. To help understand which features are available, the following sections describe what is currently implemented in Acero and any caveats that apply.

Plans

  • A plan should have a single top-level relation.

  • The consumer is currently based on version 0.20.0 of Substrait. Any features added that are newer will not be supported.

  • Due to a breaking change in 0.20.0 any Substrait plan older than 0.20.0 will be rejected.

Extensions

  • If a plan contains any extension type variations it will be rejected.

  • Advanced extensions can be provided by supplying a custom implementation of arrow::engine::ExtensionProvider.

Relations (in general)

  • Any relation not explicitly listed below will not be supported and will cause the plan to be rejected.

Read Relations

  • The projection property is not supported and plans containing this property will be rejected.

  • The VirtualTable and ExtensionTable read types are not supported. Plans containing these types will be rejected.

  • Only the parquet and arrow file formats are currently supported.

  • All URIs must use the file scheme.

  • partition_index, start, and length are not supported. Plans containing non-default values for these properties will be rejected.

  • The Substrait spec requires that a filter be completely satisfied by a read relation. However, Acero only uses a read filter for pushdown filtering and the filter may not be fully satisfied. Users should generally attach an additional filter relation with the same filter expression after the read relation.

Filter Relations

  • No known caveats

Project Relations

  • No known caveats

Join Relations

  • The join type JOIN_TYPE_SINGLE is not supported and plans containing this will be rejected.

  • The join expression must be a call to either the equal or is_not_distinct_from functions. Both arguments to the call must be direct references. Only a single join key is supported.

  • The post_join_filter property is not supported and will be ignored.

Aggregate Relations

  • At most one grouping set is supported.

  • Each grouping expression must be a direct reference.

  • Each measure’s arguments must be direct references.

  • A measure may not have a filter.

  • A measure may not have sorts.

  • A measure’s invocation must be AGGREGATION_INVOCATION_ALL or AGGREGATION_INVOCATION_UNSPECIFIED.

  • A measure’s phase must be AGGREGATION_PHASE_INITIAL_TO_RESULT.

Expressions (general)

  • Various places in the Substrait spec allow for expressions to be used outside of a filter or project relation. For example, a join expression or an aggregate grouping set. Acero typically expects these expressions to be direct references. Planners should extract the implicit projection into a formal project relation before delivering the plan to Acero.

Literals

  • A literal with non-default nullability will cause a plan to be rejected.

Types

  • Acero does not have full support for non-nullable types and may allow input to have nulls without rejecting it.

  • The table below shows the mapping between Arrow types and Substrait type classes that are currently supported.

Substrait / Arrow Type Mapping

Substrait Type        Arrow Type                Caveat
--------------------  ------------------------  ------------------------------------
boolean               boolean
i8                    int8
i16                   int16
i32                   int32
i64                   int64
fp32                  float32
fp64                  float64
string                string
binary                binary
timestamp             timestamp<MICRO,"">
timestamp_tz          timestamp<MICRO,"UTC">
date                  date32<DAY>
time                  time64<MICRO>
interval_year                                   Not currently supported
interval_day                                    Not currently supported
uuid                                            Not currently supported
FIXEDCHAR<L>                                    Not currently supported
VARCHAR<L>                                      Not currently supported
FIXEDBINARY<L>        fixed_size_binary<L>
DECIMAL<P,S>          decimal128<P,S>
STRUCT<T1…TN>         struct<T1…TN>             Arrow struct fields will have no name (empty string)
NSTRUCT<N:T1…N:Tn>                              Not currently supported
LIST<T>               list<T>
MAP<K,V>              map<K,V>                  K must not be nullable

Functions

  • The following functions have caveats or are not supported at all. Note that this is not a comprehensive list. Functions are being added to Substrait at a rapid pace and new functions may be missing.

    • Acero does not support the SATURATE option for overflow

    • Acero does not support kernels that take more than two arguments for the functions and, or, xor

    • Acero does not support temporal arithmetic

    • Acero does not support the following standard functions:

      • is_not_distinct_from

      • like

      • substring

      • starts_with

      • ends_with

      • contains

      • count

      • count_distinct

      • approx_count_distinct

  • The functions above should be referenced using the URI https://github.com/apache/arrow/blob/main/format/substrait/extension_types.yaml

    • Alternatively, the URI can be left completely empty and Acero will match based only on function name. This fallback mechanism is non-standard and should be avoided if possible.

Architecture Overview

ExecNode

Each node in the graph is an implementation of the ExecNode interface.

ExecPlan

A set of ExecNode is contained and (to an extent) coordinated by an ExecPlan.

ExecFactoryRegistry

Instances of ExecNode are constructed by factory functions held in an ExecFactoryRegistry.

ExecNodeOptions

Heterogeneous parameters for factories of ExecNode are bundled in an ExecNodeOptions.

Declaration

dplyr-inspired helper for efficient construction of an ExecPlan.

ExecBatch

A lightweight container for a single chunk of data in the Arrow format. In contrast to RecordBatch, ExecBatch is intended for use exclusively in a streaming execution context (for example, it doesn’t have a corresponding Python binding). Furthermore, columns which happen to have a constant value may be represented by a Scalar instead of an Array. In addition, ExecBatch may carry execution-relevant properties including a guaranteed-true-filter for Expression simplification.
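To illustrate, here is a minimal sketch (not taken from the Arrow codebase) of constructing an ExecBatch in which one column is backed by an Array and another by a constant Scalar; the header path, the helper function name, and the four-row length are assumptions made for the example:

#include <arrow/api.h>
#include <arrow/compute/exec.h>  // declares arrow::compute::ExecBatch (location assumed)

// Build a two-column ExecBatch: a materialized Int32 Array plus a constant
// column represented by a Scalar that is logically repeated for every row.
arrow::Result<arrow::compute::ExecBatch> MakeExecBatchWithConstantColumn() {
  arrow::Int32Builder builder;
  ARROW_RETURN_NOT_OK(builder.AppendValues({1, 2, 3, 4}));
  ARROW_ASSIGN_OR_RAISE(std::shared_ptr<arrow::Array> values, builder.Finish());

  std::vector<arrow::Datum> columns = {values, arrow::Datum(arrow::MakeScalar(42))};
  return arrow::compute::ExecBatch(std::move(columns), /*length=*/4);
}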

An example ExecNode implementation which simply passes all input batches through unchanged:

class PassthruNode : public ExecNode {
 public:
  // InputReceived is the main entry point for ExecNodes. It is invoked
  // by an input of this node to push a batch here for processing.
  void InputReceived(ExecNode* input, ExecBatch batch) override {
    // Since this is a passthru node we simply push the batch to our
    // only output here.
    outputs_[0]->InputReceived(this, batch);
  }

  // ErrorReceived is called by an input of this node to report an error.
  // ExecNodes should always forward errors to their outputs unless they
  // are able to fully handle the error (this is rare).
  void ErrorReceived(ExecNode* input, Status error) override {
    outputs_[0]->ErrorReceived(this, error);
  }

  // InputFinished is used to signal how many batches will ultimately arrive.
  // It may be called with any ordering relative to InputReceived/ErrorReceived.
  void InputFinished(ExecNode* input, int total_batches) override {
    outputs_[0]->InputFinished(this, total_batches);
  }

  // ExecNodes may request that their inputs throttle production of batches
  // until they are ready for more, or stop production if no further batches
  // are required.  These signals should typically be forwarded to the inputs
  // of the ExecNode.
  void ResumeProducing(ExecNode* output) override { inputs_[0]->ResumeProducing(this); }
  void PauseProducing(ExecNode* output) override { inputs_[0]->PauseProducing(this); }
  void StopProducing(ExecNode* output) override { inputs_[0]->StopProducing(this); }

  // An ExecNode has a single output schema to which all its batches conform.
  using ExecNode::output_schema;

  // ExecNodes carry basic introspection for debugging purposes
  const char* kind_name() const override { return "PassthruNode"; }
  using ExecNode::label;
  using ExecNode::SetLabel;
  using ExecNode::ToString;

  // An ExecNode holds references to its inputs and outputs, so it is possible
  // to walk the graph of execution if necessary.
  using ExecNode::inputs;
  using ExecNode::outputs;

  // StartProducing() and StopProducing() are invoked by an ExecPlan to
  // coordinate the graph-wide execution state.  These do not need to be
  // forwarded to inputs or outputs.
  Status StartProducing() override { return Status::OK(); }
  void StopProducing() override {}
  Future<> finished() override { return inputs_[0]->finished(); }
};

Note that each method which is associated with an edge of the graph must be invoked with an ExecNode* to identify the node which invoked it. For example, in an ExecNode which implements JOIN this tagging might be used to differentiate between batches from the left or right inputs. InputReceived, ErrorReceived, InputFinished may only be invoked by the inputs of a node, while ResumeProducing, PauseProducing, StopProducing may only be invoked by outputs of a node.

ExecPlan contains the associated instances of ExecNode and is used to start and stop execution of all nodes and for querying/awaiting their completion:

// construct an ExecPlan first to hold your nodes
ARROW_ASSIGN_OR_RAISE(auto plan, ExecPlan::Make(default_exec_context()));

// ... add nodes to your ExecPlan

// start all nodes in the graph
ARROW_RETURN_NOT_OK(plan->StartProducing());

SetUserCancellationCallback([plan] {
  // stop all nodes in the graph
  plan->StopProducing();
});

// Complete will be marked finished when all nodes have run to completion
// or acknowledged a StopProducing() signal. The ExecPlan should be kept
// alive until this future is marked finished.
Future<> complete = plan->finished();

Constructing ExecPlan objects

None of the concrete implementations of ExecNode are exposed in headers, so they can’t be constructed directly outside the translation unit where they are defined. Instead, factories to create them are provided in an extensible registry. This structure provides a number of benefits:

  • This enforces consistent construction.

  • It decouples implementations from consumers of the interface (for example, we have two classes for scalar and grouped aggregation and can choose which to construct within a single factory by checking whether grouping keys are provided).

  • This expedites integration with out-of-library extensions. For example, “scan” nodes are implemented in the separate libarrow_dataset.so library.

  • Since the class is not referenceable outside the translation unit in which it is defined, compilers can optimize more aggressively.

Factories of ExecNode can be retrieved by name from the registry. The default registry is available through arrow::compute::default_exec_factory_registry() and can be queried for the built-in factories:

// get the factory for "filter" nodes:
ARROW_ASSIGN_OR_RAISE(auto make_filter,
                      default_exec_factory_registry()->GetFactory("filter"));

// factories take three arguments:
ARROW_ASSIGN_OR_RAISE(ExecNode* filter_node, make_filter(
    // the ExecPlan which should own this node
    plan.get(),

    // nodes which will send batches to this node (inputs)
    {scan_node},

    // parameters unique to "filter" nodes
    FilterNodeOptions{filter_expression}));

// alternative shorthand:
ARROW_ASSIGN_OR_RAISE(filter_node, MakeExecNode("filter",
    plan.get(), {scan_node}, FilterNodeOptions{filter_expression}));

Factories can also be added to the default registry as long as they are convertible to std::function<Result<ExecNode*>( ExecPlan*, std::vector<ExecNode*>, const ExecNodeOptions&)>.
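For illustration, a custom node could be registered with the default registry roughly as follows. This is a sketch: RecordCountNode, its constructor arguments, and the "record_count" name are hypothetical, and it assumes the node constructs itself inside the plan via ExecPlan::EmplaceNode as the built-in nodes do:

// Register a hypothetical "record_count" factory with the default registry.
Status RegisterRecordCountNode() {
  return default_exec_factory_registry()->AddFactory(
      "record_count",
      [](ExecPlan* plan, std::vector<ExecNode*> inputs,
         const ExecNodeOptions& options) -> Result<ExecNode*> {
        // A real factory would validate the inputs and options here before
        // constructing the node inside the plan.
        return plan->EmplaceNode<RecordCountNode>(plan, std::move(inputs), options);
      });
}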

To build an ExecPlan representing a simple pipeline which reads from a RecordBatchReader then filters, projects, and writes to disk:

std::shared_ptr<RecordBatchReader> reader = GetStreamOfBatches();
ExecNode* source_node = *MakeExecNode("source", plan.get(), {},
                                      SourceNodeOptions::FromReader(
                                          reader,
                                          GetCpuThreadPool()));

ExecNode* filter_node = *MakeExecNode("filter", plan.get(), {source_node},
                                      FilterNodeOptions{
                                        greater(field_ref("score"), literal(3))
                                      });

ExecNode* project_node = *MakeExecNode("project", plan.get(), {filter_node},
                                       ProjectNodeOptions{
                                         {add(field_ref("score"), literal(1))},
                                         {"score + 1"}
                                       });

arrow::dataset::internal::Initialize();
MakeExecNode("write", plan.get(), {project_node},
             WriteNodeOptions{/*base_dir=*/"/dat", /*...*/});

Declaration is a dplyr-inspired helper which further decreases the boilerplate associated with populating an ExecPlan from C++:

arrow::dataset::internal::Initialize();

std::shared_ptr<RecordBatchReader> reader = GetStreamOfBatches();
ASSERT_OK(Declaration::Sequence(
              {
                  {"source", SourceNodeOptions::FromReader(
                       reader,
                       GetCpuThreadPool())},
                  {"filter", FilterNodeOptions{
                       greater(field_ref("score"), literal(3))}},
                  {"project", ProjectNodeOptions{
                       {add(field_ref("score"), literal(1))},
                       {"score + 1"}}},
                  {"write", WriteNodeOptions{/*base_dir=*/"/dat", /*...*/}},
              })
              .AddToPlan(plan.get()));

Note that a source node can wrap anything which resembles a stream of batches. For example, PR#11032 adds support for use of a DuckDB query as a source node. Similarly, a sink node can wrap anything which absorbs a stream of batches. In the example above we’re writing completed batches to disk. However we can also collect these in memory into a Table or forward them to a RecordBatchReader as an out-of-graph stream. This flexibility allows an ExecPlan to be used as streaming middleware between any endpoints which support Arrow formatted batches.
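For instance, instead of ending the plan with a write node, the same stream could be collected into an in-memory Table with one of the Declaration helpers. This is a sketch assuming the DeclarationToTable helper (a DeclarationToReader variant also exists in recent releases); exact signatures may vary between Arrow versions:

std::shared_ptr<RecordBatchReader> reader = GetStreamOfBatches();

Declaration decl = Declaration::Sequence({
    {"source", SourceNodeOptions::FromReader(reader, GetCpuThreadPool())},
    {"filter", FilterNodeOptions{greater(field_ref("score"), literal(3))}},
});

// Run the plan and gather every output batch into an in-memory Table.
// DeclarationToReader could be used instead to obtain a RecordBatchReader.
ARROW_ASSIGN_OR_RAISE(std::shared_ptr<Table> table, DeclarationToTable(std::move(decl)));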

An arrow::dataset::Dataset can also be wrapped as a source node which pushes all the dataset’s batches into an ExecPlan. This factory is added to the default registry with the name "scan" by calling arrow::dataset::internal::Initialize():

arrow::dataset::internal::Initialize();

std::shared_ptr<Dataset> dataset = GetDataset();

ASSERT_OK(Declaration::Sequence(
              {
                  {"scan", ScanNodeOptions{dataset,
                     /* push down predicate, projection, ... */}},
                  {"filter", FilterNodeOptions{/* ... */}},
                  // ...
              })
              .AddToPlan(plan.get()));

Datasets may be scanned multiple times; just make multiple scan nodes from that dataset. (Useful for a self-join, for example.) Note that producing two scan nodes like this will perform all reads and decodes twice.
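As a sketch of that self-join case (the key column "id" is hypothetical, and the join options mirror the hash_join example later in this document):

std::shared_ptr<Dataset> dataset = GetDataset();
auto scan_options = std::make_shared<arrow::dataset::ScanOptions>();
// set scan_options->filter / projection as needed

// Two independent scan nodes over the same dataset; each performs its own
// reads and decodes.
Declaration left{"scan", ScanNodeOptions{dataset, scan_options}};
Declaration right{"scan", ScanNodeOptions{dataset, scan_options}};

// Join the two scans back together on a hypothetical "id" column.
Declaration self_join{"hashjoin",
                      {std::move(left), std::move(right)},
                      HashJoinNodeOptions{JoinType::INNER,
                                          /*left_keys=*/{"id"},
                                          /*right_keys=*/{"id"},
                                          literal(true), "l_", "r_"}};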

Constructing ExecNode using Options

ExecNode is the building block of an execution plan. Acero provides a number of built-in node implementations covering common operations.

This is the list of operations associated with the execution plan:

Operations and Options

Operation          Options
-----------------  ------------------------------------------
source             arrow::compute::SourceNodeOptions
table_source       arrow::compute::TableSourceNodeOptions
filter             arrow::compute::FilterNodeOptions
project            arrow::compute::ProjectNodeOptions
aggregate          arrow::compute::AggregateNodeOptions
sink               arrow::compute::SinkNodeOptions
consuming_sink     arrow::compute::ConsumingSinkNodeOptions
order_by_sink      arrow::compute::OrderBySinkNodeOptions
select_k_sink      arrow::compute::SelectKSinkNodeOptions
scan               arrow::dataset::ScanNodeOptions
hash_join          arrow::compute::HashJoinNodeOptions
write              arrow::dataset::WriteNodeOptions
union              N/A
table_sink         arrow::compute::TableSinkNodeOptions

source

A source operation can be considered as an entry point to create a streaming execution plan. arrow::compute::SourceNodeOptions are used to create the source operation. The source operation is the most generic and flexible type of source currently available but it can be quite tricky to configure. To process data from files the scan operation is likely a simpler choice.

The source node requires some kind of function that can be called to poll for more data. This function should take no arguments and should return an arrow::Future<std::optional<arrow::ExecBatch>>. This function might be reading a file, iterating through an in-memory structure, or receiving data from a network connection. The Arrow library refers to these functions as arrow::AsyncGenerator and there are a number of utilities for working with them. For this example we use a vector of record batches that we’ve already stored in memory. In addition, the schema of the data must be known up front: Acero must know the schema of the data at each stage of the execution graph before any processing has begun, so the schema for a source node must be supplied separately from the data itself.

Here we define a struct to hold the data generator definition. This includes the in-memory batches, the schema, and a function that serves as a data generator:

156struct BatchesWithSchema {
157  std::vector<cp::ExecBatch> batches;
158  std::shared_ptr<arrow::Schema> schema;
159  // This method uses internal arrow utilities to
160  // convert a vector of record batches to an AsyncGenerator of optional batches
161  arrow::AsyncGenerator<std::optional<cp::ExecBatch>> gen() const {
162    auto opt_batches = ::arrow::internal::MapVector(
163        [](cp::ExecBatch batch) { return std::make_optional(std::move(batch)); },
164        batches);
165    arrow::AsyncGenerator<std::optional<cp::ExecBatch>> gen;
166    gen = arrow::MakeVectorGenerator(std::move(opt_batches));
167    return gen;
168  }
169};

Generating sample batches for computation:

173arrow::Result<BatchesWithSchema> MakeBasicBatches() {
174  BatchesWithSchema out;
175  auto field_vector = {arrow::field("a", arrow::int32()),
176                       arrow::field("b", arrow::boolean())};
177  ARROW_ASSIGN_OR_RAISE(auto b1_int, GetArrayDataSample<arrow::Int32Type>({0, 4}));
178  ARROW_ASSIGN_OR_RAISE(auto b2_int, GetArrayDataSample<arrow::Int32Type>({5, 6, 7}));
179  ARROW_ASSIGN_OR_RAISE(auto b3_int, GetArrayDataSample<arrow::Int32Type>({8, 9, 10}));
180
181  ARROW_ASSIGN_OR_RAISE(auto b1_bool,
182                        GetArrayDataSample<arrow::BooleanType>({false, true}));
183  ARROW_ASSIGN_OR_RAISE(auto b2_bool,
184                        GetArrayDataSample<arrow::BooleanType>({true, false, true}));
185  ARROW_ASSIGN_OR_RAISE(auto b3_bool,
186                        GetArrayDataSample<arrow::BooleanType>({false, true, false}));
187
188  ARROW_ASSIGN_OR_RAISE(auto b1,
189                        GetExecBatchFromVectors(field_vector, {b1_int, b1_bool}));
190  ARROW_ASSIGN_OR_RAISE(auto b2,
191                        GetExecBatchFromVectors(field_vector, {b2_int, b2_bool}));
192  ARROW_ASSIGN_OR_RAISE(auto b3,
193                        GetExecBatchFromVectors(field_vector, {b3_int, b3_bool}));
194
195  out.batches = {b1, b2, b3};
196  out.schema = arrow::schema(field_vector);
197  return out;
198}

Example of using source (usage of sink is explained in detail in sink):

294/// \brief An example demonstrating a source and sink node
295///
296/// Source-Table Example
297/// This example shows how a custom source can be used
298/// in an execution plan. This includes source node using pregenerated
299/// data and collecting it into a table.
300///
301/// This sort of custom source is often not needed.  In most cases you can
302/// use a scan (for a dataset source) or a source like table_source, array_vector_source,
303/// exec_batch_source, or record_batch_source (for in-memory data)
304arrow::Status SourceSinkExample() {
305  ARROW_ASSIGN_OR_RAISE(auto basic_data, MakeBasicBatches());
306
307  auto source_node_options = ac::SourceNodeOptions{basic_data.schema, basic_data.gen()};
308
309  ac::Declaration source{"source", std::move(source_node_options)};
310
311  return ExecutePlanAndCollectAsTable(std::move(source));
312}

table_source

In the previous example (source), a source node was used to input the data. But when developing an application, if the data is already in memory as a table, it is much easier and more performant to use arrow::compute::TableSourceNodeOptions. Here the input data can be passed as a std::shared_ptr<arrow::Table> along with a max_batch_size. The max_batch_size is used to break up large record batches so that they can be processed in parallel. It is important to note that the table batches will not be merged to form larger batches when the source table has a smaller batch size.

Example of using table_source

317/// \brief An example showing a table source node
318///
319/// TableSource-Table Example
320/// This example shows how a table_source can be used
321/// in an execution plan. This includes a table source node
322/// receiving data from a table.  This plan simply collects the
323/// data back into a table but nodes could be added that modify
324/// or transform the data as well (as is shown in later examples)
325arrow::Status TableSourceSinkExample() {
326  ARROW_ASSIGN_OR_RAISE(auto table, GetTable());
327
328  arrow::AsyncGenerator<std::optional<cp::ExecBatch>> sink_gen;
329  int max_batch_size = 2;
330  auto table_source_options = ac::TableSourceNodeOptions{table, max_batch_size};
331
332  ac::Declaration source{"table_source", std::move(table_source_options)};
333
334  return ExecutePlanAndCollectAsTable(std::move(source));
335}

filter

The filter operation, as the name suggests, provides an option to define data filtering criteria. It selects rows where the given expression evaluates to true. Filters can be written using arrow::compute::Expression, and the expression must have a return type of boolean. For example, if we wish to keep rows where the value of column a is greater than 3, then we can use the following expression.

Filter example:

340/// \brief An example showing a filter node
341///
342/// Source-Filter-Table
343/// This example shows how a filter can be used in an execution plan,
344/// to filter data from a source. The output from the execution plan
345/// is collected into a table.
346arrow::Status ScanFilterSinkExample() {
347  ARROW_ASSIGN_OR_RAISE(std::shared_ptr<arrow::dataset::Dataset> dataset, GetDataset());
348
349  auto options = std::make_shared<arrow::dataset::ScanOptions>();
350  // specify the filter.  This filter keeps only rows where the
351  // value of the "a" column is greater than 3.
352  cp::Expression filter_expr = cp::greater(cp::field_ref("a"), cp::literal(3));
353  // set filter for scanner : on-disk / push-down filtering.
354  // This step can be skipped if you are not reading from disk.
355  options->filter = filter_expr;
356  // empty projection
357  options->projection = cp::project({}, {});
358
359  // construct the scan node
360  std::cout << "Initialized Scanning Options" << std::endl;
361
362  auto scan_node_options = arrow::dataset::ScanNodeOptions{dataset, options};
363  std::cout << "Scan node options created" << std::endl;
364
365  ac::Declaration scan{"scan", std::move(scan_node_options)};
366
367  // pipe the scan node into the filter node
368  // Need to set the filter in scan node options and filter node options.
369  // At scan node it is used for on-disk / push-down filtering.
370  // At filter node it is used for in-memory filtering.
371  ac::Declaration filter{
372      "filter", {std::move(scan)}, ac::FilterNodeOptions(std::move(filter_expr))};
373
374  return ExecutePlanAndCollectAsTable(std::move(filter));
375}

project

The project operation rearranges, deletes, transforms, and creates columns. Each output column is computed by evaluating an expression against the source record batch. These must be scalar expressions (expressions consisting of scalar literals, field references, and scalar functions, i.e. elementwise functions that return one value for each input row, independent of the values of all other rows). This is exposed via arrow::compute::ProjectNodeOptions, which requires an arrow::compute::Expression and a name for each of the output columns (if names are not provided, the string representations of the expressions will be used).

Project example:

381/// \brief An example showing a project node
382///
383/// Scan-Project-Table
384/// This example shows how a Scan operation can be used to load the data
385/// into the execution plan, how a project operation can be applied on the
386/// data stream and how the output is collected into a table
387arrow::Status ScanProjectSinkExample() {
388  ARROW_ASSIGN_OR_RAISE(std::shared_ptr<arrow::dataset::Dataset> dataset, GetDataset());
389
390  auto options = std::make_shared<arrow::dataset::ScanOptions>();
391  // projection
392  cp::Expression a_times_2 = cp::call("multiply", {cp::field_ref("a"), cp::literal(2)});
393  options->projection = cp::project({}, {});
394
395  auto scan_node_options = arrow::dataset::ScanNodeOptions{dataset, options};
396
397  ac::Declaration scan{"scan", std::move(scan_node_options)};
398  ac::Declaration project{
399      "project", {std::move(scan)}, ac::ProjectNodeOptions({a_times_2})};
400
401  return ExecutePlanAndCollectAsTable(std::move(project));
402}

aggregate

The aggregate node computes various types of aggregates over data.

Arrow supports two types of aggregates: “scalar” aggregates, and “hash” aggregates. Scalar aggregates reduce an array or scalar input to a single scalar output (e.g. computing the mean of a column). Hash aggregates act like GROUP BY in SQL and first partition data based on one or more key columns, then reduce the data in each partition. The aggregate node supports both types of computation, and can compute any number of aggregations at once.

arrow::compute::AggregateNodeOptions is used to define the aggregation criteria. It takes a list of aggregation functions and their options; a list of target fields to aggregate, one per function; and a list of names for the output fields, one per function. Optionally, it takes a list of columns that are used to partition the data, in the case of a hash aggregation. The aggregation functions can be selected from this list of aggregation functions.

Note

This node is a “pipeline breaker” and will fully materialize the dataset in memory. In the future, spillover mechanisms will be added which should alleviate this constraint.

The aggregation can provide results as a group or scalar. For instance, an operation like hash_count provides a count for each distinct group as a grouped result, while an operation like sum provides a single record.

Scalar Aggregation example:

408/// \brief An example showing an aggregation node to aggregate an entire table
409///
410/// Source-Aggregation-Table
411/// This example shows how an aggregation operation can be applied on an
412/// execution plan resulting in a scalar output. The source node loads the
413/// data and the aggregation (summing the values of column 'a')
414/// is applied on this data. The output is collected into a table (that will
415/// have exactly one row)
416arrow::Status SourceScalarAggregateSinkExample() {
417  ARROW_ASSIGN_OR_RAISE(auto basic_data, MakeBasicBatches());
418
419  auto source_node_options = ac::SourceNodeOptions{basic_data.schema, basic_data.gen()};
420
421  ac::Declaration source{"source", std::move(source_node_options)};
422  auto aggregate_options =
423      ac::AggregateNodeOptions{/*aggregates=*/{{"sum", nullptr, "a", "sum(a)"}}};
424  ac::Declaration aggregate{
425      "aggregate", {std::move(source)}, std::move(aggregate_options)};
426
427  return ExecutePlanAndCollectAsTable(std::move(aggregate));
428}

Group Aggregation example:

433/// \brief An example showing an aggregation node to perform a group-by operation
434///
435/// Source-Aggregation-Table
436/// This example shows how an aggregation operation can be applied on an
437/// execution plan resulting in grouped output. The source node loads the
438/// data and the aggregation (counting the values of column 'a' per group) is
439/// applied on this data. The output is collected into a table that will contain
440/// one row for each unique combination of group keys.
441arrow::Status SourceGroupAggregateSinkExample() {
442  ARROW_ASSIGN_OR_RAISE(auto basic_data, MakeBasicBatches());
443
444  arrow::AsyncGenerator<std::optional<cp::ExecBatch>> sink_gen;
445
446  auto source_node_options = ac::SourceNodeOptions{basic_data.schema, basic_data.gen()};
447
448  ac::Declaration source{"source", std::move(source_node_options)};
449  auto options = std::make_shared<cp::CountOptions>(cp::CountOptions::ONLY_VALID);
450  auto aggregate_options =
451      ac::AggregateNodeOptions{/*aggregates=*/{{"hash_count", options, "a", "count(a)"}},
452                               /*keys=*/{"b"}};
453  ac::Declaration aggregate{
454      "aggregate", {std::move(source)}, std::move(aggregate_options)};
455
456  return ExecutePlanAndCollectAsTable(std::move(aggregate));
457}

sink

The sink operation provides output and is the final node of a streaming execution definition. The arrow::compute::SinkNodeOptions interface is used to pass the required options. Similar to the source operator, the sink operator exposes the output with a function that returns a record batch future each time it is called. It is expected that the caller will repeatedly call this function until the generator is exhausted (returns std::nullopt). If this function is not called often enough then record batches will accumulate in memory. An execution plan should only have one “terminal” node (one sink node). An ExecPlan can terminate early due to cancellation or an error, before the output is fully consumed. However, the plan can be safely destroyed independently of the sink, which will hold the unconsumed batches by exec_plan->finished().
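Before the combined example below, here is a minimal sketch of wiring up an explicit sink node, assuming a plan, source options, and schema like those used in the surrounding examples; the generator handed to arrow::compute::SinkNodeOptions is populated by the plan and then drained through a reader:

arrow::AsyncGenerator<std::optional<cp::ExecBatch>> sink_gen;

// The sink node fills `sink_gen` with the plan's output batches.
ARROW_ASSIGN_OR_RAISE(ac::ExecNode * source,
                      ac::MakeExecNode("source", plan.get(), {}, source_node_options));
ARROW_RETURN_NOT_OK(
    ac::MakeExecNode("sink", plan.get(), {source}, ac::SinkNodeOptions{&sink_gen}));

// Translate the async generator into a synchronous RecordBatchReader and
// drain it; undrained batches accumulate in memory.
std::shared_ptr<arrow::RecordBatchReader> sink_reader =
    ac::MakeGeneratorReader(schema, std::move(sink_gen), arrow::default_memory_pool());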

As a part of the Source Example, the Sink operation is also included;

294/// \brief An example demonstrating a source and sink node
295///
296/// Source-Table Example
297/// This example shows how a custom source can be used
298/// in an execution plan. This includes source node using pregenerated
299/// data and collecting it into a table.
300///
301/// This sort of custom source is often not needed.  In most cases you can
302/// use a scan (for a dataset source) or a source like table_source, array_vector_source,
303/// exec_batch_source, or record_batch_source (for in-memory data)
304arrow::Status SourceSinkExample() {
305  ARROW_ASSIGN_OR_RAISE(auto basic_data, MakeBasicBatches());
306
307  auto source_node_options = ac::SourceNodeOptions{basic_data.schema, basic_data.gen()};
308
309  ac::Declaration source{"source", std::move(source_node_options)};
310
311  return ExecutePlanAndCollectAsTable(std::move(source));
312}

consuming_sink

The consuming_sink operator is a sink operation whose consuming logic lives within the execution plan (i.e. the exec plan should not complete until the consumption has completed). Unlike the sink node, this node takes a callback function that is expected to consume the batch. Once this callback has finished, the execution plan will no longer hold any reference to the batch. The consuming function may be called before a previous invocation has completed. If the consuming function does not run quickly enough then many concurrent executions could pile up, blocking the CPU thread pool. The execution plan will not be marked finished until all consuming function callbacks have been completed. Once all batches have been delivered, the execution plan will wait for the finish future to complete before marking the execution plan finished. This allows for workflows where the consumption function converts batches into async tasks (this is currently done internally for the dataset write node).

Example:

// define a Custom SinkNodeConsumer
std::atomic<uint32_t> batches_seen{0};
arrow::Future<> finish = arrow::Future<>::Make();

struct CustomSinkNodeConsumer : public cp::SinkNodeConsumer {
  CustomSinkNodeConsumer(std::atomic<uint32_t>* batches_seen, arrow::Future<> finish)
      : batches_seen(batches_seen), finish(std::move(finish)) {}

  // Consumption logic can be written here
  arrow::Status Consume(cp::ExecBatch batch) override {
    // data can be consumed in the expected way
    // transfer to another system or just do some work
    // and write to disk
    (*batches_seen)++;
    return arrow::Status::OK();
  }

  arrow::Future<> Finish() override { return finish; }

  std::atomic<uint32_t>* batches_seen;
  arrow::Future<> finish;
};

std::shared_ptr<CustomSinkNodeConsumer> consumer =
    std::make_shared<CustomSinkNodeConsumer>(&batches_seen, finish);

arrow::compute::ExecNode* consuming_sink;

ARROW_ASSIGN_OR_RAISE(consuming_sink, MakeExecNode("consuming_sink", plan.get(),
    {source}, cp::ConsumingSinkNodeOptions(consumer)));

Consuming-Sink example:

462/// \brief An example showing a consuming sink node
463///
464/// Source-Consuming-Sink
465/// This example shows how the data can be consumed within the execution plan
466/// by using a ConsumingSink node. There is no data output from this execution plan.
467arrow::Status SourceConsumingSinkExample() {
468  ARROW_ASSIGN_OR_RAISE(auto basic_data, MakeBasicBatches());
469
470  auto source_node_options = ac::SourceNodeOptions{basic_data.schema, basic_data.gen()};
471
472  ac::Declaration source{"source", std::move(source_node_options)};
473
474  std::atomic<uint32_t> batches_seen{0};
475  arrow::Future<> finish = arrow::Future<>::Make();
476  struct CustomSinkNodeConsumer : public ac::SinkNodeConsumer {
477    CustomSinkNodeConsumer(std::atomic<uint32_t>* batches_seen, arrow::Future<> finish)
478        : batches_seen(batches_seen), finish(std::move(finish)) {}
479
480    arrow::Status Init(const std::shared_ptr<arrow::Schema>& schema,
481                       ac::BackpressureControl* backpressure_control,
482                       ac::ExecPlan* plan) override {
483      // This will be called as the plan is started (before the first call to Consume)
484      // and provides the schema of the data coming into the node, controls for pausing /
485      // resuming input, and a pointer to the plan itself which can be used to access
486      // other utilities such as the thread indexer or async task scheduler.
487      return arrow::Status::OK();
488    }
489
490    arrow::Status Consume(cp::ExecBatch batch) override {
491      (*batches_seen)++;
492      return arrow::Status::OK();
493    }
494
495    arrow::Future<> Finish() override {
496      // Here you can perform whatever (possibly async) cleanup is needed, e.g. closing
497      // output file handles and flushing remaining work
498      return arrow::Future<>::MakeFinished();
499    }
500
501    std::atomic<uint32_t>* batches_seen;
502    arrow::Future<> finish;
503  };
504  std::shared_ptr<CustomSinkNodeConsumer> consumer =
505      std::make_shared<CustomSinkNodeConsumer>(&batches_seen, finish);
506
507  ac::Declaration consuming_sink{"consuming_sink",
508                                 {std::move(source)},
509                                 ac::ConsumingSinkNodeOptions(std::move(consumer))};
510
511  // Since we are consuming the data within the plan there is no output and we simply
512  // run the plan to completion instead of collecting into a table.
513  ARROW_RETURN_NOT_OK(ac::DeclarationToStatus(std::move(consuming_sink)));
514
515  std::cout << "The consuming sink node saw " << batches_seen.load() << " batches"
516            << std::endl;
517  return arrow::Status::OK();
518}

order_by_sink

The order_by_sink operation is an extension to the sink operation. This operation provides the ability to guarantee the ordering of the stream by using arrow::compute::OrderBySinkNodeOptions. Here the arrow::compute::SortOptions are provided to define which columns are used for sorting and whether to sort in ascending or descending order.

Note

This node is a “pipeline breaker” and will fully materialize the dataset in memory. In the future, spillover mechanisms will be added which should alleviate this constraint.

Order-By-Sink example:

523arrow::Status ExecutePlanAndCollectAsTableWithCustomSink(
524    std::shared_ptr<ac::ExecPlan> plan, std::shared_ptr<arrow::Schema> schema,
525    arrow::AsyncGenerator<std::optional<cp::ExecBatch>> sink_gen) {
526  // translate sink_gen (async) to sink_reader (sync)
527  std::shared_ptr<arrow::RecordBatchReader> sink_reader =
528      ac::MakeGeneratorReader(schema, std::move(sink_gen), arrow::default_memory_pool());
529
530  // validate the ExecPlan
531  ARROW_RETURN_NOT_OK(plan->Validate());
532  std::cout << "ExecPlan created : " << plan->ToString() << std::endl;
533  // start the ExecPlan
534  plan->StartProducing();
535
536  // collect sink_reader into a Table
537  std::shared_ptr<arrow::Table> response_table;
538
539  ARROW_ASSIGN_OR_RAISE(response_table,
540                        arrow::Table::FromRecordBatchReader(sink_reader.get()));
541
542  std::cout << "Results : " << response_table->ToString() << std::endl;
543
544  // stop producing
545  plan->StopProducing();
546  // plan mark finished
547  auto future = plan->finished();
548  return future.status();
549}
550
551/// \brief An example showing an order-by node
552///
553/// Source-OrderBy-Sink
554/// In this example, the data enters through the source node
555/// and the data is ordered in the sink node. The order can be
556/// ASCENDING or DESCENDING and it is configurable. The output
557/// is obtained as a table from the sink node.
558arrow::Status SourceOrderBySinkExample() {
559  ARROW_ASSIGN_OR_RAISE(std::shared_ptr<ac::ExecPlan> plan,
560                        ac::ExecPlan::Make(*cp::threaded_exec_context()));
561
562  ARROW_ASSIGN_OR_RAISE(auto basic_data, MakeSortTestBasicBatches());
563
564  arrow::AsyncGenerator<std::optional<cp::ExecBatch>> sink_gen;
565
566  auto source_node_options = ac::SourceNodeOptions{basic_data.schema, basic_data.gen()};
567  ARROW_ASSIGN_OR_RAISE(ac::ExecNode * source,
568                        ac::MakeExecNode("source", plan.get(), {}, source_node_options));
569
570  ARROW_RETURN_NOT_OK(ac::MakeExecNode(
571      "order_by_sink", plan.get(), {source},
572      ac::OrderBySinkNodeOptions{
573          cp::SortOptions{{cp::SortKey{"a", cp::SortOrder::Descending}}}, &sink_gen}));
574
575  return ExecutePlanAndCollectAsTableWithCustomSink(plan, basic_data.schema, sink_gen);
576}

select_k_sink

The select_k_sink option enables selecting the top/bottom K elements, similar to a SQL ORDER BY ... LIMIT K clause. The selection criteria are defined using arrow::compute::SelectKOptions, which are passed via arrow::compute::SelectKSinkNodeOptions. This option returns a sink node that receives the input and then computes the top_k/bottom_k rows.

Note

This node is a “pipeline breaker” and will fully materialize the input in memory. In the future, spillover mechanisms will be added which should alleviate this constraint.

SelectK example:

609/// \brief An example showing a select-k node
610///
611/// Source-KSelect
612/// This example shows how K number of elements can be selected
613/// either from the top or bottom. The output node is a modified
614/// sink node where output can be obtained as a table.
615arrow::Status SourceKSelectExample() {
616  ARROW_ASSIGN_OR_RAISE(auto input, MakeGroupableBatches());
617  ARROW_ASSIGN_OR_RAISE(std::shared_ptr<ac::ExecPlan> plan,
618                        ac::ExecPlan::Make(*cp::threaded_exec_context()));
619  arrow::AsyncGenerator<std::optional<cp::ExecBatch>> sink_gen;
620
621  ARROW_ASSIGN_OR_RAISE(
622      ac::ExecNode * source,
623      ac::MakeExecNode("source", plan.get(), {},
624                       ac::SourceNodeOptions{input.schema, input.gen()}));
625
626  cp::SelectKOptions options = cp::SelectKOptions::TopKDefault(/*k=*/2, {"i32"});
627
628  ARROW_RETURN_NOT_OK(ac::MakeExecNode("select_k_sink", plan.get(), {source},
629                                       ac::SelectKSinkNodeOptions{options, &sink_gen}));
630
631  auto schema = arrow::schema(
632      {arrow::field("i32", arrow::int32()), arrow::field("str", arrow::utf8())});
633
634  return ExecutePlanAndCollectAsTableWithCustomSink(plan, schema, sink_gen);
635}

table_sink

The table_sink node provides the ability to receive the output as an in-memory table. This is simpler to use than the other sink nodes provided by the streaming execution engine but it only makes sense when the output fits comfortably in memory. The node is created using arrow::compute::TableSinkNodeOptions.

Example of using table_sink

727/// \brief An example showing a table sink node
728///
729/// TableSink Example
730/// This example shows how a table_sink can be used
731/// in an execution plan. This includes a source node
732/// receiving data as batches and the table sink node
733/// which emits the output as a table.
734arrow::Status TableSinkExample() {
735  ARROW_ASSIGN_OR_RAISE(std::shared_ptr<ac::ExecPlan> plan,
736                        ac::ExecPlan::Make(*cp::threaded_exec_context()));
737
738  ARROW_ASSIGN_OR_RAISE(auto basic_data, MakeBasicBatches());
739
740  auto source_node_options = ac::SourceNodeOptions{basic_data.schema, basic_data.gen()};
741
742  ARROW_ASSIGN_OR_RAISE(ac::ExecNode * source,
743                        ac::MakeExecNode("source", plan.get(), {}, source_node_options));
744
745  std::shared_ptr<arrow::Table> output_table;
746  auto table_sink_options = ac::TableSinkNodeOptions{&output_table};
747
748  ARROW_RETURN_NOT_OK(
749      ac::MakeExecNode("table_sink", plan.get(), {source}, table_sink_options));
750  // validate the ExecPlan
751  ARROW_RETURN_NOT_OK(plan->Validate());
752  std::cout << "ExecPlan created : " << plan->ToString() << std::endl;
753  // start the ExecPlan
754  plan->StartProducing();
755
756  // Wait for the plan to finish
757  auto finished = plan->finished();
758  RETURN_NOT_OK(finished.status());
759  std::cout << "Results : " << output_table->ToString() << std::endl;
760  return arrow::Status::OK();
761}

scan

scan is an operation used to load and process datasets. It should be preferred over the more generic source node when your input is a dataset. The behavior is defined using arrow::dataset::ScanNodeOptions. More information on datasets and the various scan options can be found in Tabular Datasets.

This node is capable of applying pushdown filters to the file readers, which reduces the amount of data that needs to be read. This means you may want to supply the same filter expression to the scan node that you also supply to the filter node, because the filtering is done in two different places.

Scan example:

271/// \brief An example demonstrating a scan and sink node
272///
273/// Scan-Table
274/// This example shows how scan operation can be applied on a dataset.
275/// There are operations that can be applied on the scan (project, filter)
276/// and the input data can be processed. The output is obtained as a table
277arrow::Status ScanSinkExample() {
278  ARROW_ASSIGN_OR_RAISE(std::shared_ptr<arrow::dataset::Dataset> dataset, GetDataset());
279
280  auto options = std::make_shared<arrow::dataset::ScanOptions>();
281  options->projection = cp::project({}, {});  // create empty projection
282
283  // construct the scan node
284  auto scan_node_options = arrow::dataset::ScanNodeOptions{dataset, options};
285
286  ac::Declaration scan{"scan", std::move(scan_node_options)};
287
288  return ExecutePlanAndCollectAsTable(std::move(scan));
289}

write

The write node saves query results as a dataset of files in a format like Parquet, Feather, CSV, etc. using the Tabular Datasets functionality in Arrow. The write options are provided via the arrow::dataset::WriteNodeOptions which in turn contains arrow::dataset::FileSystemDatasetWriteOptions. arrow::dataset::FileSystemDatasetWriteOptions provides control over the written dataset, including options like the output directory, file naming scheme, and so on.

Write example:

641/// \brief An example showing a write node
642/// \param file_path The destination to write to
643///
644/// Scan-Filter-Write
645/// This example shows how scan node can be used to load the data
646/// and after processing how it can be written to disk.
647arrow::Status ScanFilterWriteExample(const std::string& file_path) {
648  ARROW_ASSIGN_OR_RAISE(std::shared_ptr<arrow::dataset::Dataset> dataset, GetDataset());
649
650  auto options = std::make_shared<arrow::dataset::ScanOptions>();
651  // empty projection
652  options->projection = cp::project({}, {});
653
654  auto scan_node_options = arrow::dataset::ScanNodeOptions{dataset, options};
655
656  ac::Declaration scan{"scan", std::move(scan_node_options)};
657
658  arrow::AsyncGenerator<std::optional<cp::ExecBatch>> sink_gen;
659
660  std::string root_path = "";
661  std::string uri = "file://" + file_path;
662  ARROW_ASSIGN_OR_RAISE(std::shared_ptr<arrow::fs::FileSystem> filesystem,
663                        arrow::fs::FileSystemFromUri(uri, &root_path));
664
665  auto base_path = root_path + "/parquet_dataset";
666  // Uncomment the following line, if run repeatedly
667  // ARROW_RETURN_NOT_OK(filesystem->DeleteDirContents(base_path));
668  ARROW_RETURN_NOT_OK(filesystem->CreateDir(base_path));
669
670  // The partition schema determines which fields are part of the partitioning.
671  auto partition_schema = arrow::schema({arrow::field("a", arrow::int32())});
672  // We'll use Hive-style partitioning,
673  // which creates directories with "key=value" pairs.
674
675  auto partitioning =
676      std::make_shared<arrow::dataset::HivePartitioning>(partition_schema);
677  // We'll write Parquet files.
678  auto format = std::make_shared<arrow::dataset::ParquetFileFormat>();
679
680  arrow::dataset::FileSystemDatasetWriteOptions write_options;
681  write_options.file_write_options = format->DefaultWriteOptions();
682  write_options.filesystem = filesystem;
683  write_options.base_dir = base_path;
684  write_options.partitioning = partitioning;
685  write_options.basename_template = "part{i}.parquet";
686
687  arrow::dataset::WriteNodeOptions write_node_options{write_options};
688
689  ac::Declaration write{"write", {std::move(scan)}, std::move(write_node_options)};
690
691  // Since the write node has no output we simply run the plan to completion and the
692  // data should be written
693  ARROW_RETURN_NOT_OK(ac::DeclarationToStatus(std::move(write)));
694
695  std::cout << "Dataset written to " << base_path << std::endl;
696  return arrow::Status::OK();
697}

union

union merges multiple data streams with the same schema into one, similar to a SQL UNION ALL clause.

The following example demonstrates how this can be achieved using two data sources.

Union example:

703/// \brief An example showing a union node
704///
705/// Source-Union-Table
706/// This example shows how a union operation can be applied on two
707/// data sources. The output is collected into a table.
708arrow::Status SourceUnionSinkExample() {
709  ARROW_ASSIGN_OR_RAISE(auto basic_data, MakeBasicBatches());
710
711  ac::Declaration lhs{"source",
712                      ac::SourceNodeOptions{basic_data.schema, basic_data.gen()}};
713  lhs.label = "lhs";
714  ac::Declaration rhs{"source",
715                      ac::SourceNodeOptions{basic_data.schema, basic_data.gen()}};
716  rhs.label = "rhs";
717  ac::Declaration union_plan{
718      "union", {std::move(lhs), std::move(rhs)}, ac::ExecNodeOptions{}};
719
720  return ExecutePlanAndCollectAsTable(std::move(union_plan));
721}

hash_join

The hash_join operation provides the relational-algebra join operation using a hash-based algorithm. arrow::compute::HashJoinNodeOptions contains the options required to define a join. The hash_join supports left/right/full semi/anti/outer joins. The join keys (i.e. the column(s) to join on) and suffixes (i.e. a suffix term like “_x” which can be appended to column names duplicated in both the left and right relations) can be set via the join options. Read more on hash joins.

Hash-Join example:

582/// \brief An example showing a hash join node
583///
584/// Source-HashJoin-Table
585/// This example shows how source node gets the data and how a self-join
586/// is applied on the data. The join options are configurable. The output
587/// is collected into a table.
588arrow::Status SourceHashJoinSinkExample() {
589  ARROW_ASSIGN_OR_RAISE(auto input, MakeGroupableBatches());
590
591  ac::Declaration left{"source", ac::SourceNodeOptions{input.schema, input.gen()}};
592  ac::Declaration right{"source", ac::SourceNodeOptions{input.schema, input.gen()}};
593
594  ac::HashJoinNodeOptions join_opts{
595      ac::JoinType::INNER,
596      /*left_keys=*/{"str"},
597      /*right_keys=*/{"str"}, cp::literal(true), "l_", "r_"};
598
599  ac::Declaration hashjoin{
600      "hashjoin", {std::move(left), std::move(right)}, std::move(join_opts)};
601
602  return ExecutePlanAndCollectAsTable(std::move(hashjoin));
603}

Summary

There are examples of these nodes which can be found in cpp/examples/arrow/execution_plan_documentation_examples.cc in the Arrow source.

Complete Example:

 19#include <arrow/array.h>
 20#include <arrow/builder.h>
 21
 22#include <arrow/acero/exec_plan.h>
 23#include <arrow/compute/api.h>
 24#include <arrow/compute/api_vector.h>
 25#include <arrow/compute/cast.h>
 26
 27#include <arrow/csv/api.h>
 28
 29#include <arrow/dataset/dataset.h>
 30#include <arrow/dataset/file_base.h>
 31#include <arrow/dataset/file_parquet.h>
 32#include <arrow/dataset/plan.h>
 33#include <arrow/dataset/scanner.h>
 34
 35#include <arrow/io/interfaces.h>
 36#include <arrow/io/memory.h>
 37
 38#include <arrow/result.h>
 39#include <arrow/status.h>
 40#include <arrow/table.h>
 41
 42#include <arrow/ipc/api.h>
 43
 44#include <arrow/util/future.h>
 45#include <arrow/util/range.h>
 46#include <arrow/util/thread_pool.h>
 47#include <arrow/util/vector.h>
 48
 49#include <iostream>
 50#include <memory>
 51#include <utility>
 52
 53// Demonstrate various operators in Arrow Streaming Execution Engine
 54
 55namespace cp = ::arrow::compute;
 56namespace ac = ::arrow::acero;
 57
 58constexpr char kSep[] = "******";
 59
 60void PrintBlock(const std::string& msg) {
 61  std::cout << "\n\t" << kSep << " " << msg << " " << kSep << "\n" << std::endl;
 62}
 63
 64template <typename TYPE,
 65          typename = typename std::enable_if<arrow::is_number_type<TYPE>::value |
 66                                             arrow::is_boolean_type<TYPE>::value |
 67                                             arrow::is_temporal_type<TYPE>::value>::type>
 68arrow::Result<std::shared_ptr<arrow::Array>> GetArrayDataSample(
 69    const std::vector<typename TYPE::c_type>& values) {
 70  using ArrowBuilderType = typename arrow::TypeTraits<TYPE>::BuilderType;
 71  ArrowBuilderType builder;
 72  ARROW_RETURN_NOT_OK(builder.Reserve(values.size()));
 73  ARROW_RETURN_NOT_OK(builder.AppendValues(values));
 74  return builder.Finish();
 75}
 76
 77template <class TYPE>
 78arrow::Result<std::shared_ptr<arrow::Array>> GetBinaryArrayDataSample(
 79    const std::vector<std::string>& values) {
 80  using ArrowBuilderType = typename arrow::TypeTraits<TYPE>::BuilderType;
 81  ArrowBuilderType builder;
 82  ARROW_RETURN_NOT_OK(builder.Reserve(values.size()));
 83  ARROW_RETURN_NOT_OK(builder.AppendValues(values));
 84  return builder.Finish();
 85}
 86
 87arrow::Result<std::shared_ptr<arrow::RecordBatch>> GetSampleRecordBatch(
 88    const arrow::ArrayVector array_vector, const arrow::FieldVector& field_vector) {
 89  std::shared_ptr<arrow::RecordBatch> record_batch;
 90  ARROW_ASSIGN_OR_RAISE(auto struct_result,
 91                        arrow::StructArray::Make(array_vector, field_vector));
 92  return record_batch->FromStructArray(struct_result);
 93}
 94
 95/// \brief Create a sample table
 96/// The table's contents will be:
 97/// a,b
 98/// 1,null
 99/// 2,true
100/// null,true
101/// 3,false
102/// null,true
103/// 4,false
104/// 5,null
105/// 6,false
106/// 7,false
107/// 8,true
108/// \return The created table
109
110arrow::Result<std::shared_ptr<arrow::Table>> GetTable() {
111  auto null_long = std::numeric_limits<int64_t>::quiet_NaN();
112  ARROW_ASSIGN_OR_RAISE(auto int64_array,
113                        GetArrayDataSample<arrow::Int64Type>(
114                            {1, 2, null_long, 3, null_long, 4, 5, 6, 7, 8}));
115
116  arrow::BooleanBuilder boolean_builder;
117  std::shared_ptr<arrow::BooleanArray> bool_array;
118
119  std::vector<uint8_t> bool_values = {false, true,  true,  false, true,
120                                      false, false, false, false, true};
121  std::vector<bool> is_valid = {false, true,  true, true, true,
122                                true,  false, true, true, true};
123
124  ARROW_RETURN_NOT_OK(boolean_builder.Reserve(10));
125
126  ARROW_RETURN_NOT_OK(boolean_builder.AppendValues(bool_values, is_valid));
127
128  ARROW_RETURN_NOT_OK(boolean_builder.Finish(&bool_array));
129
130  auto record_batch =
131      arrow::RecordBatch::Make(arrow::schema({arrow::field("a", arrow::int64()),
132                                              arrow::field("b", arrow::boolean())}),
133                               10, {int64_array, bool_array});
134  ARROW_ASSIGN_OR_RAISE(auto table, arrow::Table::FromRecordBatches({record_batch}));
135  return table;
136}
137
138/// \brief Create a sample dataset
139/// \return An in-memory dataset based on GetTable()
140arrow::Result<std::shared_ptr<arrow::dataset::Dataset>> GetDataset() {
141  ARROW_ASSIGN_OR_RAISE(auto table, GetTable());
142  auto ds = std::make_shared<arrow::dataset::InMemoryDataset>(table);
143  return ds;
144}
145
146arrow::Result<cp::ExecBatch> GetExecBatchFromVectors(
147    const arrow::FieldVector& field_vector, const arrow::ArrayVector& array_vector) {
148  std::shared_ptr<arrow::RecordBatch> record_batch;
149  ARROW_ASSIGN_OR_RAISE(auto res_batch, GetSampleRecordBatch(array_vector, field_vector));
150  cp::ExecBatch batch{*res_batch};
151  return batch;
152}
153
154// (Doc section: BatchesWithSchema Definition)
155struct BatchesWithSchema {
156  std::vector<cp::ExecBatch> batches;
157  std::shared_ptr<arrow::Schema> schema;
158  // This method uses internal arrow utilities to
159  // convert a vector of record batches to an AsyncGenerator of optional batches
160  arrow::AsyncGenerator<std::optional<cp::ExecBatch>> gen() const {
161    auto opt_batches = ::arrow::internal::MapVector(
162        [](cp::ExecBatch batch) { return std::make_optional(std::move(batch)); },
163        batches);
164    arrow::AsyncGenerator<std::optional<cp::ExecBatch>> gen;
165    gen = arrow::MakeVectorGenerator(std::move(opt_batches));
166    return gen;
167  }
168};
169// (Doc section: BatchesWithSchema Definition)
170
171// (Doc section: MakeBasicBatches Definition)
172arrow::Result<BatchesWithSchema> MakeBasicBatches() {
173  BatchesWithSchema out;
174  auto field_vector = {arrow::field("a", arrow::int32()),
175                       arrow::field("b", arrow::boolean())};
176  ARROW_ASSIGN_OR_RAISE(auto b1_int, GetArrayDataSample<arrow::Int32Type>({0, 4}));
177  ARROW_ASSIGN_OR_RAISE(auto b2_int, GetArrayDataSample<arrow::Int32Type>({5, 6, 7}));
178  ARROW_ASSIGN_OR_RAISE(auto b3_int, GetArrayDataSample<arrow::Int32Type>({8, 9, 10}));
179
180  ARROW_ASSIGN_OR_RAISE(auto b1_bool,
181                        GetArrayDataSample<arrow::BooleanType>({false, true}));
182  ARROW_ASSIGN_OR_RAISE(auto b2_bool,
183                        GetArrayDataSample<arrow::BooleanType>({true, false, true}));
184  ARROW_ASSIGN_OR_RAISE(auto b3_bool,
185                        GetArrayDataSample<arrow::BooleanType>({false, true, false}));
186
187  ARROW_ASSIGN_OR_RAISE(auto b1,
188                        GetExecBatchFromVectors(field_vector, {b1_int, b1_bool}));
189  ARROW_ASSIGN_OR_RAISE(auto b2,
190                        GetExecBatchFromVectors(field_vector, {b2_int, b2_bool}));
191  ARROW_ASSIGN_OR_RAISE(auto b3,
192                        GetExecBatchFromVectors(field_vector, {b3_int, b3_bool}));
193
194  out.batches = {b1, b2, b3};
195  out.schema = arrow::schema(field_vector);
196  return out;
197}
198// (Doc section: MakeBasicBatches Definition)
199
200arrow::Result<BatchesWithSchema> MakeSortTestBasicBatches() {
201  BatchesWithSchema out;
202  auto field = arrow::field("a", arrow::int32());
203  ARROW_ASSIGN_OR_RAISE(auto b1_int, GetArrayDataSample<arrow::Int32Type>({1, 3, 0, 2}));
204  ARROW_ASSIGN_OR_RAISE(auto b2_int,
205                        GetArrayDataSample<arrow::Int32Type>({121, 101, 120, 12}));
206  ARROW_ASSIGN_OR_RAISE(auto b3_int,
207                        GetArrayDataSample<arrow::Int32Type>({10, 110, 210, 121}));
208  ARROW_ASSIGN_OR_RAISE(auto b4_int,
209                        GetArrayDataSample<arrow::Int32Type>({51, 101, 2, 34}));
210  ARROW_ASSIGN_OR_RAISE(auto b5_int,
211                        GetArrayDataSample<arrow::Int32Type>({11, 31, 1, 12}));
212  ARROW_ASSIGN_OR_RAISE(auto b6_int,
213                        GetArrayDataSample<arrow::Int32Type>({12, 101, 120, 12}));
214  ARROW_ASSIGN_OR_RAISE(auto b7_int,
215                        GetArrayDataSample<arrow::Int32Type>({0, 110, 210, 11}));
216  ARROW_ASSIGN_OR_RAISE(auto b8_int,
217                        GetArrayDataSample<arrow::Int32Type>({51, 10, 2, 3}));
218
219  ARROW_ASSIGN_OR_RAISE(auto b1, GetExecBatchFromVectors({field}, {b1_int}));
220  ARROW_ASSIGN_OR_RAISE(auto b2, GetExecBatchFromVectors({field}, {b2_int}));
221  ARROW_ASSIGN_OR_RAISE(auto b3,
222                        GetExecBatchFromVectors({field, field}, {b3_int, b8_int}));
223  ARROW_ASSIGN_OR_RAISE(auto b4,
224                        GetExecBatchFromVectors({field, field, field, field},
225                                                {b4_int, b5_int, b6_int, b7_int}));
226  out.batches = {b1, b2, b3, b4};
227  out.schema = arrow::schema({field});
228  return out;
229}
230
231arrow::Result<BatchesWithSchema> MakeGroupableBatches(int multiplicity = 1) {
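     // multiplicity repeats the three base batches so callers can create a larger input stream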
232  BatchesWithSchema out;
233  auto fields = {arrow::field("i32", arrow::int32()), arrow::field("str", arrow::utf8())};
234  ARROW_ASSIGN_OR_RAISE(auto b1_int, GetArrayDataSample<arrow::Int32Type>({12, 7, 3}));
235  ARROW_ASSIGN_OR_RAISE(auto b2_int, GetArrayDataSample<arrow::Int32Type>({-2, -1, 3}));
236  ARROW_ASSIGN_OR_RAISE(auto b3_int, GetArrayDataSample<arrow::Int32Type>({5, 3, -8}));
237  ARROW_ASSIGN_OR_RAISE(auto b1_str, GetBinaryArrayDataSample<arrow::StringType>(
238                                         {"alpha", "beta", "alpha"}));
239  ARROW_ASSIGN_OR_RAISE(auto b2_str, GetBinaryArrayDataSample<arrow::StringType>(
240                                         {"alpha", "gamma", "alpha"}));
241  ARROW_ASSIGN_OR_RAISE(auto b3_str, GetBinaryArrayDataSample<arrow::StringType>(
242                                         {"gamma", "beta", "alpha"}));
243  ARROW_ASSIGN_OR_RAISE(auto b1, GetExecBatchFromVectors(fields, {b1_int, b1_str}));
244  ARROW_ASSIGN_OR_RAISE(auto b2, GetExecBatchFromVectors(fields, {b2_int, b2_str}));
245  ARROW_ASSIGN_OR_RAISE(auto b3, GetExecBatchFromVectors(fields, {b3_int, b3_str}));
246  out.batches = {b1, b2, b3};
247
248  size_t batch_count = out.batches.size();
249  for (int repeat = 1; repeat < multiplicity; ++repeat) {
250    for (size_t i = 0; i < batch_count; ++i) {
251      out.batches.push_back(out.batches[i]);
252    }
253  }
254
255  out.schema = arrow::schema(fields);
256  return out;
257}
258
259arrow::Status ExecutePlanAndCollectAsTable(ac::Declaration plan) {
260  // run the plan to completion and collect its output into a Table
261  std::shared_ptr<arrow::Table> response_table;
262  ARROW_ASSIGN_OR_RAISE(response_table, ac::DeclarationToTable(std::move(plan)));
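     // DeclarationToTable attaches a sink to the plan, runs it to completion, and gathers the
     // output into a single table (DeclarationToBatches / DeclarationToReader are similar helpers)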
263
264  std::cout << "Results : " << response_table->ToString() << std::endl;
265
266  return arrow::Status::OK();
267}
268
269// (Doc section: Scan Example)
270
271/// \brief An example demonstrating a scan and sink node
272///
273/// Scan-Table
274/// This example shows how a scan operation can be applied to a dataset.
275/// Operations such as project and filter can be applied as part of the scan
276/// to process the input data. The output is collected into a table.
277arrow::Status ScanSinkExample() {
278  ARROW_ASSIGN_OR_RAISE(std::shared_ptr<arrow::dataset::Dataset> dataset, GetDataset());
279
280  auto options = std::make_shared<arrow::dataset::ScanOptions>();
281  options->projection = cp::project({}, {});  // create empty projection
282
283  // construct the scan node
284  auto scan_node_options = arrow::dataset::ScanNodeOptions{dataset, options};
285
286  ac::Declaration scan{"scan", std::move(scan_node_options)};
287
288  return ExecutePlanAndCollectAsTable(std::move(scan));
289}
290// (Doc section: Scan Example)
291
292// (Doc section: Source Example)
293
294/// \brief An example demonstrating a source and sink node
295///
296/// Source-Table Example
297/// This example shows how a custom source can be used
298/// in an execution plan. This includes source node using pregenerated
299/// data and collecting it into a table.
300///
301/// This sort of custom source is often not needed.  In most cases you can
302/// use a scan (for a dataset source) or a source like table_source, array_vector_source,
303/// exec_batch_source, or record_batch_source (for in-memory data).
304arrow::Status SourceSinkExample() {
305  ARROW_ASSIGN_OR_RAISE(auto basic_data, MakeBasicBatches());
306
307  auto source_node_options = ac::SourceNodeOptions{basic_data.schema, basic_data.gen()};
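     // the source node will pull batches from the asynchronous generator returned by gen()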
308
309  ac::Declaration source{"source", std::move(source_node_options)};
310
311  return ExecutePlanAndCollectAsTable(std::move(source));
312}
313// (Doc section: Source Example)
314
315// (Doc section: Table Source Example)
316
317/// \brief An example showing a table source node
318///
319/// TableSource-Table Example
320/// This example shows how a table_source can be used
321/// in an execution plan. This includes a table source node
322/// receiving data from a table.  This plan simply collects the
323/// data back into a table but nodes could be added that modify
324/// or transform the data as well (as is shown in later examples)
325arrow::Status TableSourceSinkExample() {
326  ARROW_ASSIGN_OR_RAISE(auto table, GetTable());
327
329  int max_batch_size = 2;
330  auto table_source_options = ac::TableSourceNodeOptions{table, max_batch_size};
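     // the table source re-slices the table into ExecBatches of at most max_batch_size rows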
331
332  ac::Declaration source{"table_source", std::move(table_source_options)};
333
334  return ExecutePlanAndCollectAsTable(std::move(source));
335}
336// (Doc section: Table Source Example)
337
338// (Doc section: Filter Example)
339
340/// \brief An example showing a filter node
341///
342/// Source-Filter-Table
343/// This example shows how a filter can be used in an execution plan,
344/// to filter data from a source. The output from the execution plan
345/// is collected into a table.
346arrow::Status ScanFilterSinkExample() {
347  ARROW_ASSIGN_OR_RAISE(std::shared_ptr<arrow::dataset::Dataset> dataset, GetDataset());
348
349  auto options = std::make_shared<arrow::dataset::ScanOptions>();
350  // specify the filter.  This filter keeps only the rows where the
351  // value of the "a" column is greater than 3.
352  cp::Expression filter_expr = cp::greater(cp::field_ref("a"), cp::literal(3));
353  // set filter for scanner : on-disk / push-down filtering.
354  // This step can be skipped if you are not reading from disk.
355  options->filter = filter_expr;
356  // empty projection
357  options->projection = cp::project({}, {});
358
359  std::cout << "Initialized Scanning Options" << std::endl;
360
361  // construct the scan node
362  auto scan_node_options = arrow::dataset::ScanNodeOptions{dataset, options};
363  std::cout << "Scan node options created" << std::endl;
364
365  ac::Declaration scan{"scan", std::move(scan_node_options)};
366
367  // pipe the scan node into the filter node
368  // Need to set the filter in scan node options and filter node options.
369  // At scan node it is used for on-disk / push-down filtering.
370  // At filter node it is used for in-memory filtering.
371  ac::Declaration filter{
372      "filter", {std::move(scan)}, ac::FilterNodeOptions(std::move(filter_expr))};
373
374  return ExecutePlanAndCollectAsTable(std::move(filter));
375}
376
377// (Doc section: Filter Example)
378
379// (Doc section: Project Example)
380
381/// \brief An example showing a project node
382///
383/// Scan-Project-Table
384/// This example shows how a Scan operation can be used to load the data
385/// into the execution plan, how a project operation can be applied on the
386/// data stream and how the output is collected into a table
387arrow::Status ScanProjectSinkExample() {
388  ARROW_ASSIGN_OR_RAISE(std::shared_ptr<arrow::dataset::Dataset> dataset, GetDataset());
389
390  auto options = std::make_shared<arrow::dataset::ScanOptions>();
391  // the expression the project node will evaluate: column "a" multiplied by 2
392  cp::Expression a_times_2 = cp::call("multiply", {cp::field_ref("a"), cp::literal(2)});
393  options->projection = cp::project({}, {});  // the scan itself keeps an empty projection
394
395  auto scan_node_options = arrow::dataset::ScanNodeOptions{dataset, options};
396
397  ac::Declaration scan{"scan", std::move(scan_node_options)};
398  ac::Declaration project{
399      "project", {std::move(scan)}, ac::ProjectNodeOptions({a_times_2})};
400
401  return ExecutePlanAndCollectAsTable(std::move(project));
402}
403
404// (Doc section: Project Example)
405
406// (Doc section: Scalar Aggregate Example)
407
408/// \brief An example showing an aggregation node to aggregate an entire table
409///
410/// Source-Aggregation-Table
411/// This example shows how an aggregation operation can be applied in an
412/// execution plan, resulting in a scalar output. The source node loads the
413/// data and the aggregation (the sum of column 'a')
414/// is applied to this data. The output is collected into a table (that will
415/// have exactly one row).
416arrow::Status SourceScalarAggregateSinkExample() {
417  ARROW_ASSIGN_OR_RAISE(auto basic_data, MakeBasicBatches());
418
419  auto source_node_options = ac::SourceNodeOptions{basic_data.schema, basic_data.gen()};
420
421  ac::Declaration source{"source", std::move(source_node_options)};
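     // with no grouping keys, the "sum" aggregate consumes the whole stream and emits a single row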
422  auto aggregate_options =
423      ac::AggregateNodeOptions{/*aggregates=*/{{"sum", nullptr, "a", "sum(a)"}}};
424  ac::Declaration aggregate{
425      "aggregate", {std::move(source)}, std::move(aggregate_options)};
426
427  return ExecutePlanAndCollectAsTable(std::move(aggregate));
428}
429// (Doc section: Scalar Aggregate Example)
430
431// (Doc section: Group Aggregate Example)
432
433/// \brief An example showing an aggregation node to perform a group-by operation
434///
435/// Source-Aggregation-Table
436/// This example shows how an aggregation operation can be applied in an
437/// execution plan, resulting in grouped output. The source node loads the
438/// data and the aggregation (counting the non-null values in column 'a',
439/// grouped by column 'b') is applied to this data. The output is collected
440/// into a table that will contain one row for each unique combination of group keys.
441arrow::Status SourceGroupAggregateSinkExample() {
442  ARROW_ASSIGN_OR_RAISE(auto basic_data, MakeBasicBatches());
443
446  auto source_node_options = ac::SourceNodeOptions{basic_data.schema, basic_data.gen()};
447
448  ac::Declaration source{"source", std::move(source_node_options)};
449  auto options = std::make_shared<cp::CountOptions>(cp::CountOptions::ONLY_VALID);
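     // grouped aggregations use the "hash_"-prefixed function names and need at least one key;
     // "hash_count" counts the non-null values of "a" within each group defined by "b"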
450  auto aggregate_options =
451      ac::AggregateNodeOptions{/*aggregates=*/{{"hash_count", options, "a", "count(a)"}},
452                               /*keys=*/{"b"}};
453  ac::Declaration aggregate{
454      "aggregate", {std::move(source)}, std::move(aggregate_options)};
455
456  return ExecutePlanAndCollectAsTable(std::move(aggregate));
457}
458// (Doc section: Group Aggregate Example)
459
460// (Doc section: ConsumingSink Example)
461
462/// \brief An example showing a consuming sink node
463///
464/// Source-Consuming-Sink
465/// This example shows how the data can be consumed within the execution plan
466/// by using a ConsumingSink node. There is no data output from this execution plan.
467arrow::Status SourceConsumingSinkExample() {
468  ARROW_ASSIGN_OR_RAISE(auto basic_data, MakeBasicBatches());
469
470  auto source_node_options = ac::SourceNodeOptions{basic_data.schema, basic_data.gen()};
471
472  ac::Declaration source{"source", std::move(source_node_options)};
473
474  std::atomic<uint32_t> batches_seen{0};
475  arrow::Future<> finish = arrow::Future<>::Make();
476  struct CustomSinkNodeConsumer : public ac::SinkNodeConsumer {
477    CustomSinkNodeConsumer(std::atomic<uint32_t>* batches_seen, arrow::Future<> finish)
478        : batches_seen(batches_seen), finish(std::move(finish)) {}
479
480    arrow::Status Init(const std::shared_ptr<arrow::Schema>& schema,
481                       ac::BackpressureControl* backpressure_control,
482                       ac::ExecPlan* plan) override {
483      // This will be called as the plan is started (before the first call to Consume)
484      // and provides the schema of the data coming into the node, controls for pausing /
485      // resuming input, and a pointer to the plan itself which can be used to access
486      // other utilities such as the thread indexer or async task scheduler.
487      return arrow::Status::OK();
488    }
489
490    arrow::Status Consume(cp::ExecBatch batch) override {
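         // Consume is called once for every batch pushed into the sink; here we just count them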
491      (*batches_seen)++;
492      return arrow::Status::OK();
493    }
494
495    arrow::Future<> Finish() override {
496      // Here you can perform whatever (possibly async) cleanup is needed, e.g. closing
497      // output file handles and flushing remaining work
498      return arrow::Future<>::MakeFinished();
499    }
500
501    std::atomic<uint32_t>* batches_seen;
502    arrow::Future<> finish;
503  };
504  std::shared_ptr<CustomSinkNodeConsumer> consumer =
505      std::make_shared<CustomSinkNodeConsumer>(&batches_seen, finish);
506
507  ac::Declaration consuming_sink{"consuming_sink",
508                                 {std::move(source)},
509                                 ac::ConsumingSinkNodeOptions(std::move(consumer))};
510
511  // Since we are consuming the data within the plan there is no output and we simply
512  // run the plan to completion instead of collecting into a table.
513  ARROW_RETURN_NOT_OK(ac::DeclarationToStatus(std::move(consuming_sink)));
514
515  std::cout << "The consuming sink node saw " << batches_seen.load() << " batches"
516            << std::endl;
517  return arrow::Status::OK();
518}
519// (Doc section: ConsumingSink Example)
520
521// (Doc section: OrderBySink Example)
522
523arrow::Status ExecutePlanAndCollectAsTableWithCustomSink(
524    std::shared_ptr<ac::ExecPlan> plan, std::shared_ptr<arrow::Schema> schema,
525    arrow::AsyncGenerator<std::optional<cp::ExecBatch>> sink_gen) {
526  // translate sink_gen (async) to sink_reader (sync)
527  std::shared_ptr<arrow::RecordBatchReader> sink_reader =
528      ac::MakeGeneratorReader(schema, std::move(sink_gen), arrow::default_memory_pool());
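     // the reader wraps the asynchronous sink generator in a synchronous RecordBatchReader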
529
530  // validate the ExecPlan
531  ARROW_RETURN_NOT_OK(plan->Validate());
532  std::cout << "ExecPlan created : " << plan->ToString() << std::endl;
533  // start the ExecPlan
534  plan->StartProducing();
535
536  // collect sink_reader into a Table
537  std::shared_ptr<arrow::Table> response_table;
538
539  ARROW_ASSIGN_OR_RAISE(response_table,
540                        arrow::Table::FromRecordBatchReader(sink_reader.get()));
541
542  std::cout << "Results : " << response_table->ToString() << std::endl;
543
544  // stop producing
545  plan->StopProducing();
546  // wait until the plan has finished
547  auto future = plan->finished();
548  return future.status();
549}
550
551/// \brief An example showing an order-by node
552///
553/// Source-OrderBy-Sink
554/// In this example, the data enters through the source node
555/// and is ordered in the sink node. The sort order (ascending or
556/// descending) is configurable. The output
557/// is obtained as a table from the sink node.
558arrow::Status SourceOrderBySinkExample() {
559  ARROW_ASSIGN_OR_RAISE(std::shared_ptr<ac::ExecPlan> plan,
560                        ac::ExecPlan::Make(*cp::threaded_exec_context()));
561
562  ARROW_ASSIGN_OR_RAISE(auto basic_data, MakeSortTestBasicBatches());
563
564  arrow::AsyncGenerator<std::optional<cp::ExecBatch>> sink_gen;
565
566  auto source_node_options = ac::SourceNodeOptions{basic_data.schema, basic_data.gen()};
567  ARROW_ASSIGN_OR_RAISE(ac::ExecNode * source,
568                        ac::MakeExecNode("source", plan.get(), {}, source_node_options));
569
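     // order_by_sink combines sorting and sinking: it accumulates its entire input, sorts it
     // by the given keys, and then emits the sorted batches through sink_gen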
570  ARROW_RETURN_NOT_OK(ac::MakeExecNode(
571      "order_by_sink", plan.get(), {source},
572      ac::OrderBySinkNodeOptions{
573          cp::SortOptions{{cp::SortKey{"a", cp::SortOrder::Descending}}}, &sink_gen}));
574
575  return ExecutePlanAndCollectAsTableWithCustomSink(plan, basic_data.schema, sink_gen);
576}
577
578// (Doc section: OrderBySink Example)
579
580// (Doc section: HashJoin Example)
581
582/// \brief An example showing a hash join node
583///
584/// Source-HashJoin-Table
585/// This example shows how two source nodes provide the data and how a self-join
586/// is applied to it. The join options are configurable. The output
587/// is collected into a table.
588arrow::Status SourceHashJoinSinkExample() {
589  ARROW_ASSIGN_OR_RAISE(auto input, MakeGroupableBatches());
590
591  ac::Declaration left{"source", ac::SourceNodeOptions{input.schema, input.gen()}};
592  ac::Declaration right{"source", ac::SourceNodeOptions{input.schema, input.gen()}};
593
594  ac::HashJoinNodeOptions join_opts{
595      ac::JoinType::INNER,
596      /*left_keys=*/{"str"},
597      /*right_keys=*/{"str"}, cp::literal(true), "l_", "r_"};
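     // cp::literal(true) is the residual join filter (it accepts every matched row); "l_" and
     // "r_" are used to disambiguate output field names that appear in both inputs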
598
599  ac::Declaration hashjoin{
600      "hashjoin", {std::move(left), std::move(right)}, std::move(join_opts)};
601
602  return ExecutePlanAndCollectAsTable(std::move(hashjoin));
603}
604
605// (Doc section: HashJoin Example)
606
607// (Doc section: KSelect Example)
608
609/// \brief An example showing a select-k node
610///
611/// Source-KSelect
612/// This example shows how the top (or bottom) K elements can be
613/// selected from the input. The output node is a modified
614/// sink node where output can be obtained as a table.
615arrow::Status SourceKSelectExample() {
616  ARROW_ASSIGN_OR_RAISE(auto input, MakeGroupableBatches());
617  ARROW_ASSIGN_OR_RAISE(std::shared_ptr<ac::ExecPlan> plan,
618                        ac::ExecPlan::Make(*cp::threaded_exec_context()));
619  arrow::AsyncGenerator<std::optional<cp::ExecBatch>> sink_gen;
620
621  ARROW_ASSIGN_OR_RAISE(
622      ac::ExecNode * source,
623      ac::MakeExecNode("source", plan.get(), {},
624                       ac::SourceNodeOptions{input.schema, input.gen()}));
625
626  cp::SelectKOptions options = cp::SelectKOptions::TopKDefault(/*k=*/2, {"i32"});
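     // TopKDefault selects the k rows with the largest values in the given sort key ("i32")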
627
628  ARROW_RETURN_NOT_OK(ac::MakeExecNode("select_k_sink", plan.get(), {source},
629                                       ac::SelectKSinkNodeOptions{options, &sink_gen}));
630
631  auto schema = arrow::schema(
632      {arrow::field("i32", arrow::int32()), arrow::field("str", arrow::utf8())});
633
634  return ExecutePlanAndCollectAsTableWithCustomSink(plan, schema, sink_gen);
635}
636
637// (Doc section: KSelect Example)
638
639// (Doc section: Write Example)
640
641/// \brief An example showing a write node
642/// \param file_path The destination to write to
643///
644/// Scan-Filter-Write
645/// This example shows how a scan node can be used to load the data
646/// and how the processed data can then be written to disk.
647arrow::Status ScanFilterWriteExample(const std::string& file_path) {
648  ARROW_ASSIGN_OR_RAISE(std::shared_ptr<arrow::dataset::Dataset> dataset, GetDataset());
649
650  auto options = std::make_shared<arrow::dataset::ScanOptions>();
651  // empty projection
652  options->projection = cp::project({}, {});
653
654  auto scan_node_options = arrow::dataset::ScanNodeOptions{dataset, options};
655
656  ac::Declaration scan{"scan", std::move(scan_node_options)};
657
658  arrow::AsyncGenerator<std::optional<cp::ExecBatch>> sink_gen;
659
660  std::string root_path = "";
661  std::string uri = "file://" + file_path;
662  ARROW_ASSIGN_OR_RAISE(std::shared_ptr<arrow::fs::FileSystem> filesystem,
663                        arrow::fs::FileSystemFromUri(uri, &root_path));
664
665  auto base_path = root_path + "/parquet_dataset";
666  // Uncomment the following line if you run this example repeatedly
667  // ARROW_RETURN_NOT_OK(filesystem->DeleteDirContents(base_path));
668  ARROW_RETURN_NOT_OK(filesystem->CreateDir(base_path));
669
670  // The partition schema determines which fields are part of the partitioning.
671  auto partition_schema = arrow::schema({arrow::field("a", arrow::int32())});
672  // We'll use Hive-style partitioning,
673  // which creates directories with "key=value" pairs.
674
675  auto partitioning =
676      std::make_shared<arrow::dataset::HivePartitioning>(partition_schema);
677  // We'll write Parquet files.
678  auto format = std::make_shared<arrow::dataset::ParquetFileFormat>();
679
680  arrow::dataset::FileSystemDatasetWriteOptions write_options;
681  write_options.file_write_options = format->DefaultWriteOptions();
682  write_options.filesystem = filesystem;
683  write_options.base_dir = base_path;
684  write_options.partitioning = partitioning;
685  write_options.basename_template = "part{i}.parquet";
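     // "{i}" in the basename template is replaced with an auto-incremented integer per file written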
686
687  arrow::dataset::WriteNodeOptions write_node_options{write_options};
688
689  ac::Declaration write{"write", {std::move(scan)}, std::move(write_node_options)};
690
691  // Since the write node has no output we simply run the plan to completion and the
692  // data should be written
693  ARROW_RETURN_NOT_OK(ac::DeclarationToStatus(std::move(write)));
694
695  std::cout << "Dataset written to " << base_path << std::endl;
696  return arrow::Status::OK();
697}
698
699// (Doc section: Write Example)
700
701// (Doc section: Union Example)
702
703/// \brief An example showing a union node
704///
705/// Source-Union-Table
706/// This example shows how a union operation can be applied on two
707/// data sources. The output is collected into a table.
708arrow::Status SourceUnionSinkExample() {
709  ARROW_ASSIGN_OR_RAISE(auto basic_data, MakeBasicBatches());
710
711  ac::Declaration lhs{"source",
712                      ac::SourceNodeOptions{basic_data.schema, basic_data.gen()}};
713  lhs.label = "lhs";
714  ac::Declaration rhs{"source",
715                      ac::SourceNodeOptions{basic_data.schema, basic_data.gen()}};
716  rhs.label = "rhs";
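     // "union" simply concatenates its inputs; both inputs must have the same schema and no
     // particular output order is guaranteed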
717  ac::Declaration union_plan{
718      "union", {std::move(lhs), std::move(rhs)}, ac::ExecNodeOptions{}};
719
720  return ExecutePlanAndCollectAsTable(std::move(union_plan));
721}
722
723// (Doc section: Union Example)
724
725// (Doc section: Table Sink Example)
726
727/// \brief An example showing a table sink node
728///
729/// TableSink Example
730/// This example shows how a table_sink can be used
731/// in an execution plan. This includes a source node
732/// receiving data as batches and the table sink node
733/// which emits the output as a table.
734arrow::Status TableSinkExample() {
735  ARROW_ASSIGN_OR_RAISE(std::shared_ptr<ac::ExecPlan> plan,
736                        ac::ExecPlan::Make(*cp::threaded_exec_context()));
737
738  ARROW_ASSIGN_OR_RAISE(auto basic_data, MakeBasicBatches());
739
740  auto source_node_options = ac::SourceNodeOptions{basic_data.schema, basic_data.gen()};
741
742  ARROW_ASSIGN_OR_RAISE(ac::ExecNode * source,
743                        ac::MakeExecNode("source", plan.get(), {}, source_node_options));
744
745  std::shared_ptr<arrow::Table> output_table;
746  auto table_sink_options = ac::TableSinkNodeOptions{&output_table};
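     // the table sink will populate output_table once the plan has finished running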
747
748  ARROW_RETURN_NOT_OK(
749      ac::MakeExecNode("table_sink", plan.get(), {source}, table_sink_options));
750  // validate the ExecPlan
751  ARROW_RETURN_NOT_OK(plan->Validate());
752  std::cout << "ExecPlan created : " << plan->ToString() << std::endl;
753  // start the ExecPlan
754  plan->StartProducing();
755
756  // Wait for the plan to finish
757  auto finished = plan->finished();
758  ARROW_RETURN_NOT_OK(finished.status());
759  std::cout << "Results : " << output_table->ToString() << std::endl;
760  return arrow::Status::OK();
761}
762
763// (Doc section: Table Sink Example)
764
765// (Doc section: RecordBatchReaderSource Example)
766
767/// \brief An example showing the usage of a RecordBatchReader as the data source.
768///
769/// RecordBatchReaderSourceSink Example
770/// This example shows how a record_batch_reader_source can be used
771/// in an execution plan. This includes the source node
772/// receiving data from a TableBatchReader.
773
774arrow::Status RecordBatchReaderSourceSinkExample() {
775  ARROW_ASSIGN_OR_RAISE(auto table, GetTable());
776  std::shared_ptr<arrow::RecordBatchReader> reader =
777      std::make_shared<arrow::TableBatchReader>(table);
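     // any RecordBatchReader can be used here; a TableBatchReader simply replays an in-memory table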
778  ac::Declaration reader_source{"record_batch_reader_source",
779                                ac::RecordBatchReaderSourceNodeOptions{reader}};
780  return ExecutePlanAndCollectAsTable(std::move(reader_source));
781}
782
783// (Doc section: RecordBatchReaderSource Example)
784
785enum ExampleMode {
786  SOURCE_SINK = 0,
787  TABLE_SOURCE_SINK = 1,
788  SCAN = 2,
789  FILTER = 3,
790  PROJECT = 4,
791  SCALAR_AGGREGATION = 5,
792  GROUP_AGGREGATION = 6,
793  CONSUMING_SINK = 7,
794  ORDER_BY_SINK = 8,
795  HASHJOIN = 9,
796  KSELECT = 10,
797  WRITE = 11,
798  UNION = 12,
799  TABLE_SOURCE_TABLE_SINK = 13,
800  RECORD_BATCH_READER_SOURCE = 14
801};
802
803int main(int argc, char** argv) {
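     // usage: <program> <output-path> <example-mode>; the path is only used by the write example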
804  if (argc < 3) {
805    // Fake success for CI purposes.
806    return EXIT_SUCCESS;
807  }
808
809  std::string base_save_path = argv[1];
810  int mode = std::atoi(argv[2]);
811  arrow::Status status;
812  // ensure arrow::dataset node factories are in the registry
813  arrow::dataset::internal::Initialize();
814  switch (mode) {
815    case SOURCE_SINK:
816      PrintBlock("Source Sink Example");
817      status = SourceSinkExample();
818      break;
819    case TABLE_SOURCE_SINK:
820      PrintBlock("Table Source Sink Example");
821      status = TableSourceSinkExample();
822      break;
823    case SCAN:
824      PrintBlock("Scan Example");
825      status = ScanSinkExample();
826      break;
827    case FILTER:
828      PrintBlock("Filter Example");
829      status = ScanFilterSinkExample();
830      break;
831    case PROJECT:
832      PrintBlock("Project Example");
833      status = ScanProjectSinkExample();
834      break;
835    case GROUP_AGGREGATION:
836      PrintBlock("Group Aggregate Example");
837      status = SourceGroupAggregateSinkExample();
838      break;
839    case SCALAR_AGGREGATION:
840      PrintBlock("Scalar Aggregate Example");
841      status = SourceScalarAggregateSinkExample();
842      break;
843    case CONSUMING_SINK:
844      PrintBlock("Consuming-Sink Example");
845      status = SourceConsumingSinkExample();
846      break;
847    case ORDER_BY_SINK:
848      PrintBlock("OrderBy Example");
849      status = SourceOrderBySinkExample();
850      break;
851    case HASHJOIN:
852      PrintBlock("HashJoin Example");
853      status = SourceHashJoinSinkExample();
854      break;
855    case KSELECT:
856      PrintBlock("KSelect Example");
857      status = SourceKSelectExample();
858      break;
859    case WRITE:
860      PrintBlock("Write Example");
861      status = ScanFilterWriteExample(base_save_path);
862      break;
863    case UNION:
864      PrintBlock("Union Example");
865      status = SourceUnionSinkExample();
866      break;
867    case TABLE_SOURCE_TABLE_SINK:
868      PrintBlock("TableSink Example");
869      status = TableSinkExample();
870      break;
871    case RECORD_BATCH_READER_SOURCE:
872      PrintBlock("RecordBatchReaderSource Example");
873      status = RecordBatchReaderSourceSinkExample();
874      break;
875    default:
876      break;
877  }
878
879  if (status.ok()) {
880    return EXIT_SUCCESS;
881  } else {
882    std::cout << "Error occurred: " << status.message() << std::endl;
883    return EXIT_FAILURE;
884  }
885}