Acero: A C++ streaming execution engine

Warning

Acero is experimental and a stable API is not yet guaranteed.

Motivation

For many complex computations, successive direct invocation of compute functions is not feasible in either memory or computation time. Doing so causes all intermediate data to be fully materialized. To facilitate arbitrarily large inputs and more efficient resource usage, the Arrow C++ implementation also provides Acero, a streaming query engine with which computations can be formulated and executed.

An example graph of a streaming execution workflow.

Acero allows computation to be expressed as an “execution plan” (ExecPlan) which is a directed graph of operators. Each operator (ExecNode) provides, transforms, or consumes the data passing through it. Batches of data (ExecBatch) flow along edges of the graph from node to node. Structuring the API around streams of batches allows the working set for each node to be tuned for optimal performance independent of any other nodes in the graph. Each ExecNode processes batches as they are pushed to it along an edge of the graph by upstream nodes (its inputs), and pushes batches along an edge of the graph to downstream nodes (its outputs) as they are finalized.

Substrait

In order to use Acero you will need to create an execution plan. This is the model that describes the computation you want to apply to your data. Acero has its own internal representation for execution plans but most users should not interact with this directly as it will couple their code to Acero.

Substrait is an open standard for execution plans. Acero implements the Substrait “consumer” interface. This means that Acero can accept a Substrait plan and fulfill the plan, loading the requested data and applying the desired computation. By using Substrait plans users can easily switch to a different execution engine at a later time.

Substrait Conformance

Substrait defines a broad set of operators and functions for many different situations and it is unlikely that Acero will ever completely satisfy all defined Substrait operators and functions. To help you understand what is available, the following sections describe which features are currently implemented in Acero and any caveats that apply.

Plans

  • A plan should have a single top-level relation.

  • The consumer is currently based on a custom build of Substrait that is older than 0.1.0. Any features added that are newer than 0.1.0 will not be supported.

Extensions

  • If a plan contains any extension type variations it will be rejected.

  • If a plan contains any advanced extensions it will be rejected.

Relations (in general)

  • The emit property (to customize output order of a node or to drop columns) is not supported and plans containing this property will be rejected.

  • The hint property is not supported and plans containing this property will be rejected.

  • Any advanced extensions will cause a plan to be rejected.

  • Any relation not explicitly listed below will not be supported and will cause the plan to be rejected.

Read Relations

  • The projection property is not supported and plans containing this property will be rejected.

  • The only supported read type is LocalFiles. Plans with any other type will be rejected.

  • Only the parquet file format is currently supported.

  • All URIs must use the file scheme.

  • partition_index, start, and length are not supported. Plans containing these properties will be rejected.

  • The Substrait spec requires that a filter be completely satisfied by a read relation. However, Acero only uses a read filter for pushdown projection and it may not be fully satisfied. Users should generally attach an additional filter relation with the same filter expression after the read relation.

Filter Relations

  • No known caveats

Project Relations

  • No known caveats

Join Relations

  • The join type JOIN_TYPE_SINGLE is not supported and plans containing this will be rejected.

  • The join expression must be a call to either the equal or is_not_distinct_from functions. Both arguments to the call must be direct references. Only a single join key is supported.

  • The post_join_filter property is not supported and will be ignored.

Aggregate Relations

  • At most one grouping set is supported.

  • Each grouping expression must be a direct reference.

  • Each measure’s arguments must be direct references.

  • A measure may not have a filter.

  • A measure may not have sorts.

  • A measure’s invocation must be AGGREGATION_INVOCATION_ALL.

  • A measure’s phase must be AGGREGATION_PHASE_INITIAL_TO_RESULT.

Expressions (general)

  • Various places in the Substrait spec allow for expressions to be used outside of a filter or project relation. For example, a join expression or an aggregate grouping set. Acero typically expects these expressions to be direct references. Planners should extract the implicit projection into a formal project relation before delivering the plan to Acero.

  • Older versions of Isthmus would omit optional arguments instead of including them as unspecified enums. Acero will not support these plans.

Literals

  • A literal with non-default nullability will cause a plan to be rejected.

Types

  • Acero does not have full support for non-nullable types and may allow input to have nulls without rejecting it.

  • The table below shows the mapping between Arrow types and Substrait type classes that are currently supported.

Functions

  • Acero does not support the legacy args style of declaring arguments.

  • The following functions have caveats or are not supported at all. Note that this is not a comprehensive list. Functions are being added to Substrait at a rapid pace and new functions may be missing.

    • Acero does not support the SATURATE option for overflow

    • Acero does not support kernels that take more than two arguments for the functions and, or, xor

    • Acero does not support temporal arithmetic

    • Acero does not support the following standard functions:

      • is_not_distinct_from

      • like

      • substring

      • starts_with

      • ends_with

      • contains

      • count

      • count_distinct

      • approx_count_distinct

  • The functions above must be referenced using the URI https://github.com/apache/arrow/blob/master/format/substrait/extension_types.yaml

Architecture Overview

ExecNode

Each node in the graph is an implementation of the ExecNode interface.

ExecPlan

A set of ExecNodes is contained and (to an extent) coordinated by an ExecPlan.

ExecFactoryRegistry

Instances of ExecNode are constructed by factory functions held in a ExecFactoryRegistry.

ExecNodeOptions

Heterogeneous parameters for factories of ExecNode are bundled in an ExecNodeOptions.

Declaration

A dplyr-inspired helper for efficient construction of an ExecPlan.

ExecBatch

A lightweight container for a single chunk of data in the Arrow format. In contrast to RecordBatch, ExecBatch is intended for use exclusively in a streaming execution context (for example, it doesn’t have a corresponding Python binding). Furthermore, columns which happen to have a constant value may be represented by a Scalar instead of an Array. In addition, ExecBatch may carry execution-relevant properties including a guaranteed-true-filter for Expression simplification.
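To make this concrete, below is a minimal, hypothetical sketch (not taken from the Arrow examples) of building an ExecBatch that mixes a regular Array column with a constant Scalar column. The guarantee member used at the end is an assumption based on the guaranteed-true-filter property described above; check arrow/compute/exec.h for the exact field name in your version.

#include <arrow/api.h>
#include <arrow/compute/api.h>
#include <arrow/compute/exec/exec_plan.h>

namespace cp = arrow::compute;

arrow::Result<cp::ExecBatch> MakeExampleExecBatch() {
  // Column 0: an ordinary Array of three int32 values
  arrow::Int32Builder builder;
  ARROW_RETURN_NOT_OK(builder.AppendValues({1, 2, 3}));
  ARROW_ASSIGN_OR_RAISE(std::shared_ptr<arrow::Array> ints, builder.Finish());

  // Column 1: a constant column represented by a single Scalar instead of an Array
  std::shared_ptr<arrow::Scalar> constant = arrow::MakeScalar(int64_t{42});

  // Both columns logically have the same number of rows (3)
  cp::ExecBatch batch({ints, constant}, /*length=*/3);

  // Optionally attach an expression that is known to be true for every row of
  // this batch so that downstream expressions can be simplified against it.
  // (Member name assumed; see the note above.)
  batch.guarantee = cp::greater(cp::field_ref("x"), cp::literal(0));
  return batch;
}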

An example ExecNode implementation which simply passes all input batches through unchanged:

class PassthruNode : public ExecNode {
 public:
  // InputReceived is the main entry point for ExecNodes. It is invoked
  // by an input of this node to push a batch here for processing.
  void InputReceived(ExecNode* input, ExecBatch batch) override {
    // Since this is a passthru node we simply push the batch to our
    // only output here.
    outputs_[0]->InputReceived(this, batch);
  }

  // ErrorReceived is called by an input of this node to report an error.
  // ExecNodes should always forward errors to their outputs unless they
  // are able to fully handle the error (this is rare).
  void ErrorReceived(ExecNode* input, Status error) override {
    outputs_[0]->ErrorReceived(this, error);
  }

  // InputFinished is used to signal how many batches will ultimately arrive.
  // It may be called with any ordering relative to InputReceived/ErrorReceived.
  void InputFinished(ExecNode* input, int total_batches) override {
    outputs_[0]->InputFinished(this, total_batches);
  }

  // ExecNodes may request that their inputs throttle production of batches
  // until they are ready for more, or stop production if no further batches
  // are required.  These signals should typically be forwarded to the inputs
  // of the ExecNode.
  void ResumeProducing(ExecNode* output) override { inputs_[0]->ResumeProducing(this); }
  void PauseProducing(ExecNode* output) override { inputs_[0]->PauseProducing(this); }
  void StopProducing(ExecNode* output) override { inputs_[0]->StopProducing(this); }

  // An ExecNode has a single output schema to which all its batches conform.
  using ExecNode::output_schema;

  // ExecNodes carry basic introspection for debugging purposes
  const char* kind_name() const override { return "PassthruNode"; }
  using ExecNode::label;
  using ExecNode::SetLabel;
  using ExecNode::ToString;

  // An ExecNode holds references to its inputs and outputs, so it is possible
  // to walk the graph of execution if necessary.
  using ExecNode::inputs;
  using ExecNode::outputs;

  // StartProducing() and StopProducing() are invoked by an ExecPlan to
  // coordinate the graph-wide execution state.  These do not need to be
  // forwarded to inputs or outputs.
  Status StartProducing() override { return Status::OK(); }
  void StopProducing() override {}
  Future<> finished() override { return inputs_[0]->finished(); }
};

Note that each method which is associated with an edge of the graph must be invoked with an ExecNode* to identify the node which invoked it. For example, in an ExecNode which implements JOIN this tagging might be used to differentiate between batches from the left or right inputs. InputReceived, ErrorReceived, InputFinished may only be invoked by the inputs of a node, while ResumeProducing, PauseProducing, StopProducing may only be invoked by outputs of a node.

ExecPlan contains the associated instances of ExecNode and is used to start and stop execution of all nodes and for querying/awaiting their completion:

// construct an ExecPlan first to hold your nodes
ARROW_ASSIGN_OR_RAISE(auto plan, ExecPlan::Make(default_exec_context()));

// ... add nodes to your ExecPlan

// start all nodes in the graph
ARROW_RETURN_NOT_OK(plan->StartProducing());

SetUserCancellationCallback([plan] {
  // stop all nodes in the graph
  plan->StopProducing();
});

// Complete will be marked finished when all nodes have run to completion
// or acknowledged a StopProducing() signal. The ExecPlan should be kept
// alive until this future is marked finished.
Future<> complete = plan->finished();

Constructing ExecPlan objects

Warning

The following will be superseded by construction from Compute IR, see ARROW-14074.

None of the concrete implementations of ExecNode are exposed in headers, so they can’t be constructed directly outside the translation unit where they are defined. Instead, factories to create them are provided in an extensible registry. This structure provides a number of benefits:

  • This enforces consistent construction.

  • It decouples implementations from consumers of the interface (for example: we have two classes for scalar and grouped aggregation, and we can choose which to construct within the single factory by checking whether grouping keys are provided).

  • This expedites integration with out-of-library extensions. For example, “scan” nodes are implemented in the separate libarrow_dataset.so library.

  • Since the class is not referencable outside the translation unit in which it is defined, compilers can optimize more aggressively.

Factories of ExecNode can be retrieved by name from the registry. The default registry is available through arrow::compute::default_exec_factory_registry() and can be queried for the built-in factories:

// get the factory for "filter" nodes:
ARROW_ASSIGN_OR_RAISE(auto make_filter,
                      default_exec_factory_registry()->GetFactory("filter"));

// factories take three arguments:
ARROW_ASSIGN_OR_RAISE(ExecNode* filter_node, make_filter(
    // the ExecPlan which should own this node
    plan.get(),

    // nodes which will send batches to this node (inputs)
    {scan_node},

    // parameters unique to "filter" nodes
    FilterNodeOptions{filter_expression}));

// alternative shorthand:
ARROW_ASSIGN_OR_RAISE(filter_node, MakeExecNode("filter",
    plan.get(), {scan_node}, FilterNodeOptions{filter_expression}));

Factories can also be added to the default registry as long as they are convertible to std::function<Result<ExecNode*>(ExecPlan*, std::vector<ExecNode*>, const ExecNodeOptions&)>.
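For instance, registering a custom node so that it can be created by name might look like the following sketch. This assumes the registry exposes an AddFactory counterpart to the GetFactory call shown above, and it reuses the PassthruNode class from the Architecture Overview (whose constructor signature here is also an assumption):

#include <arrow/compute/exec/exec_plan.h>
#include <arrow/compute/exec/options.h>

namespace cp = arrow::compute;

arrow::Status RegisterPassthruNode() {
  return cp::default_exec_factory_registry()->AddFactory(
      "passthru",
      [](cp::ExecPlan* plan, std::vector<cp::ExecNode*> inputs,
         const cp::ExecNodeOptions&) -> arrow::Result<cp::ExecNode*> {
        if (inputs.size() != 1) {
          return arrow::Status::Invalid("passthru requires exactly one input");
        }
        // The plan owns the node; the PassthruNode constructor used here is
        // hypothetical and would forward whatever the ExecNode base requires.
        return plan->EmplaceNode<PassthruNode>(plan, std::move(inputs));
      });
}

// Once registered, the node can be constructed like any built-in node:
// ARROW_ASSIGN_OR_RAISE(ExecNode* passthru,
//                       MakeExecNode("passthru", plan.get(), {source_node}, {}));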

To build an ExecPlan representing a simple pipeline which reads from a RecordBatchReader then filters, projects, and writes to disk:

std::shared_ptr<RecordBatchReader> reader = GetStreamOfBatches();
ExecNode* source_node = *MakeExecNode("source", plan.get(), {},
                                      SourceNodeOptions::FromReader(
                                          reader,
                                          GetCpuThreadPool()));

ExecNode* filter_node = *MakeExecNode("filter", plan.get(), {source_node},
                                      FilterNodeOptions{
                                        greater(field_ref("score"), literal(3))
                                      });

ExecNode* project_node = *MakeExecNode("project", plan.get(), {filter_node},
                                       ProjectNodeOptions{
                                         {add(field_ref("score"), literal(1))},
                                         {"score + 1"}
                                       });

arrow::dataset::internal::Initialize();
MakeExecNode("write", plan.get(), {project_node},
             WriteNodeOptions{/*base_dir=*/"/dat", /*...*/});

Declaration is a dplyr-inspired helper which further decreases the boilerplate associated with populating an ExecPlan from C++:

arrow::dataset::internal::Initialize();

std::shared_ptr<RecordBatchReader> reader = GetStreamOfBatches();
ASSERT_OK(Declaration::Sequence(
              {
                  {"source", SourceNodeOptions::FromReader(
                       reader,
                       GetCpuThreadPool())},
                  {"filter", FilterNodeOptions{
                       greater(field_ref("score"), literal(3))}},
                  {"project", ProjectNodeOptions{
                       {add(field_ref("score"), literal(1))},
                       {"score + 1"}}},
                  {"write", WriteNodeOptions{/*base_dir=*/"/dat", /*...*/}},
              })
              .AddToPlan(plan.get()));

Note that a source node can wrap anything which resembles a stream of batches. For example, PR#11032 adds support for use of a DuckDB query as a source node. Similarly, a sink node can wrap anything which absorbs a stream of batches. In the example above we’re writing completed batches to disk. However we can also collect these in memory into a Table or forward them to a RecordBatchReader as an out-of-graph stream. This flexibility allows an ExecPlan to be used as streaming middleware between any endpoints which support Arrow formatted batches.

An arrow::dataset::Dataset can also be wrapped as a source node which pushes all the dataset’s batches into an ExecPlan. This factory is added to the default registry with the name "scan" by calling arrow::dataset::internal::Initialize():

arrow::dataset::internal::Initialize();

std::shared_ptr<Dataset> dataset = GetDataset();

ASSERT_OK(Declaration::Sequence(
              {
                  {"scan", ScanNodeOptions{dataset,
                     /* push down predicate, projection, ... */}},
                  {"filter", FilterNodeOptions{/* ... */}},
                  // ...
              })
              .AddToPlan(plan.get()));

Datasets may be scanned multiple times; just make multiple scan nodes from that dataset. (Useful for a self-join, for example.) Note that producing two scan nodes like this will perform all reads and decodes twice.
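A sketch of such a self-join, following the Declaration pattern used by the union example later on this page; the "key" column name is hypothetical, and arrow::dataset::internal::Initialize() must have been called so that the "scan" factory is registered:

#include <arrow/compute/api.h>
#include <arrow/compute/exec/exec_plan.h>
#include <arrow/compute/exec/options.h>
#include <arrow/dataset/plan.h>
#include <arrow/dataset/scanner.h>

namespace cp = arrow::compute;

cp::Declaration MakeSelfJoin(std::shared_ptr<arrow::dataset::Dataset> dataset,
                             std::shared_ptr<arrow::dataset::ScanOptions> scan_options) {
  // Two independent scan nodes over the same dataset; each performs its own
  // reads and decodes.
  cp::Declaration lhs{"scan", arrow::dataset::ScanNodeOptions{dataset, scan_options}};
  cp::Declaration rhs{"scan", arrow::dataset::ScanNodeOptions{dataset, scan_options}};

  // Join the two scans on the hypothetical "key" column.
  cp::Declaration join{"hashjoin",
                       cp::HashJoinNodeOptions{cp::JoinType::INNER,
                                               /*left_keys=*/{"key"},
                                               /*right_keys=*/{"key"},
                                               cp::literal(true), "_l", "_r"}};
  join.inputs.emplace_back(lhs);
  join.inputs.emplace_back(rhs);
  // The declaration can now be added to a plan (e.g. via AddToPlan) or
  // extended with further nodes using Declaration::Sequence.
  return join;
}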

Constructing ExecNode using Options

ExecNode is the basic building block of an execution plan; each built-in node implements a particular operation.

The following list shows each available operation together with the options class used to configure it:

Operations and Options

  • source: arrow::compute::SourceNodeOptions

  • table_source: arrow::compute::TableSourceNodeOptions

  • filter: arrow::compute::FilterNodeOptions

  • project: arrow::compute::ProjectNodeOptions

  • aggregate: arrow::compute::AggregateNodeOptions

  • sink: arrow::compute::SinkNodeOptions

  • consuming_sink: arrow::compute::ConsumingSinkNodeOptions

  • order_by_sink: arrow::compute::OrderBySinkNodeOptions

  • select_k_sink: arrow::compute::SelectKSinkNodeOptions

  • scan: arrow::dataset::ScanNodeOptions

  • hash_join: arrow::compute::HashJoinNodeOptions

  • write: arrow::dataset::WriteNodeOptions

  • union: N/A

  • table_sink: arrow::compute::TableSinkNodeOptions

source

A source operation can be considered an entry point to create a streaming execution plan. arrow::compute::SourceNodeOptions are used to create the source operation. The source operation is the most generic and flexible type of source currently available but it can be quite tricky to configure. To process data from files the scan operation is likely a simpler choice.

The source node requires some kind of function that can be called to poll for more data. This function should take no arguments and should return an arrow::Future<std::optional<arrow::ExecBatch>>. This function might be reading a file, iterating through an in-memory structure, or receiving data from a network connection. The Arrow library refers to these functions as arrow::AsyncGenerator and there are a number of utilities for working with them. For this example we use a vector of record batches that we’ve already stored in memory. In addition, the schema of the data must be known up front. Acero must know the schema of the data at each stage of the execution graph before any processing has begun. This means we must supply the schema for a source node separately from the data itself.

Here we define a struct to hold the data generator definition. This includes the in-memory batches, the schema, and a function that serves as a data generator:

struct BatchesWithSchema {
  std::vector<cp::ExecBatch> batches;
  std::shared_ptr<arrow::Schema> schema;
  // This method uses internal arrow utilities to
  // convert a vector of record batches to an AsyncGenerator of optional batches
  arrow::AsyncGenerator<std::optional<cp::ExecBatch>> gen() const {
    auto opt_batches = ::arrow::internal::MapVector(
        [](cp::ExecBatch batch) { return std::make_optional(std::move(batch)); },
        batches);
    arrow::AsyncGenerator<std::optional<cp::ExecBatch>> gen;
    gen = arrow::MakeVectorGenerator(std::move(opt_batches));
    return gen;
  }
};

Generating sample batches for computation:

arrow::Result<BatchesWithSchema> MakeBasicBatches() {
  BatchesWithSchema out;
  auto field_vector = {arrow::field("a", arrow::int32()),
                       arrow::field("b", arrow::boolean())};
  ARROW_ASSIGN_OR_RAISE(auto b1_int, GetArrayDataSample<arrow::Int32Type>({0, 4}));
  ARROW_ASSIGN_OR_RAISE(auto b2_int, GetArrayDataSample<arrow::Int32Type>({5, 6, 7}));
  ARROW_ASSIGN_OR_RAISE(auto b3_int, GetArrayDataSample<arrow::Int32Type>({8, 9, 10}));

  ARROW_ASSIGN_OR_RAISE(auto b1_bool,
                        GetArrayDataSample<arrow::BooleanType>({false, true}));
  ARROW_ASSIGN_OR_RAISE(auto b2_bool,
                        GetArrayDataSample<arrow::BooleanType>({true, false, true}));
  ARROW_ASSIGN_OR_RAISE(auto b3_bool,
                        GetArrayDataSample<arrow::BooleanType>({false, true, false}));

  ARROW_ASSIGN_OR_RAISE(auto b1,
                        GetExecBatchFromVectors(field_vector, {b1_int, b1_bool}));
  ARROW_ASSIGN_OR_RAISE(auto b2,
                        GetExecBatchFromVectors(field_vector, {b2_int, b2_bool}));
  ARROW_ASSIGN_OR_RAISE(auto b3,
                        GetExecBatchFromVectors(field_vector, {b3_int, b3_bool}));

  out.batches = {b1, b2, b3};
  out.schema = arrow::schema(field_vector);
  return out;
}

Example of using source (usage of sink is explained in detail in sink):

/// \brief An example demonstrating a source and sink node
/// \param exec_context The execution context to run the plan in
///
/// Source-Sink Example
/// This example shows how a source and sink can be used
/// in an execution plan. This includes a source node receiving data
/// and a sink node emitting the data as an output represented in
/// a table.
arrow::Status SourceSinkExample(cp::ExecContext& exec_context) {
  ARROW_ASSIGN_OR_RAISE(std::shared_ptr<cp::ExecPlan> plan,
                        cp::ExecPlan::Make(&exec_context));

  ARROW_ASSIGN_OR_RAISE(auto basic_data, MakeBasicBatches());

  arrow::AsyncGenerator<std::optional<cp::ExecBatch>> sink_gen;

  auto source_node_options = cp::SourceNodeOptions{basic_data.schema, basic_data.gen()};

  ARROW_ASSIGN_OR_RAISE(cp::ExecNode * source,
                        cp::MakeExecNode("source", plan.get(), {}, source_node_options));

  ARROW_RETURN_NOT_OK(
      cp::MakeExecNode("sink", plan.get(), {source}, cp::SinkNodeOptions{&sink_gen}));

  return ExecutePlanAndCollectAsTable(exec_context, plan, basic_data.schema, sink_gen);
}

table_source

In the previous example (source), a source node was used to input the data. But when developing an application, if the data is already in memory as a table, it is much easier and more performant to use arrow::compute::TableSourceNodeOptions. Here the input data can be passed as a std::shared_ptr<arrow::Table> along with a max_batch_size. The max_batch_size is used to break up large record batches so that they can be processed in parallel. It is important to note that the table batches will not get merged to form larger batches when the source table has a smaller batch size.

Example of using table_source

/// \brief An example showing a table source node
/// \param exec_context The execution context to run the plan in
///
/// TableSource-Sink Example
/// This example shows how a table_source and sink can be used
/// in an execution plan. This includes a table source node
/// receiving data from a table and the sink node emits
/// the data to a generator which we collect into a table.
arrow::Status TableSourceSinkExample(cp::ExecContext& exec_context) {
  ARROW_ASSIGN_OR_RAISE(std::shared_ptr<cp::ExecPlan> plan,
                        cp::ExecPlan::Make(&exec_context));

  ARROW_ASSIGN_OR_RAISE(auto table, GetTable());

  arrow::AsyncGenerator<std::optional<cp::ExecBatch>> sink_gen;
  int max_batch_size = 2;
  auto table_source_options = cp::TableSourceNodeOptions{table, max_batch_size};

  ARROW_ASSIGN_OR_RAISE(
      cp::ExecNode * source,
      cp::MakeExecNode("table_source", plan.get(), {}, table_source_options));

  ARROW_RETURN_NOT_OK(
      cp::MakeExecNode("sink", plan.get(), {source}, cp::SinkNodeOptions{&sink_gen}));

  return ExecutePlanAndCollectAsTable(exec_context, plan, table->schema(), sink_gen);
}

filter

The filter operation, as the name suggests, provides an option to define data filtering criteria. It selects rows matching a given expression. Filters can be written using arrow::compute::Expression. For example, if we wish to keep rows where the value of column b is greater than 3, then we can use the following expression.
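A minimal sketch of that expression, using the same expression helpers that appear in the examples on this page (the filter example below applies the identical pattern to column a):

#include <arrow/compute/api.h>
#include <arrow/compute/exec/exec_plan.h>

namespace cp = arrow::compute;

// Keep only the rows where the value of column "b" is greater than 3.
cp::Expression filter_expr = cp::greater(cp::field_ref("b"), cp::literal(3));

// The same expression object is then passed to the filter node:
//   cp::FilterNodeOptions{filter_expr}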

Filter example:

/// \brief An example showing a filter node
/// \param exec_context The execution context to run the plan in
///
/// Source-Filter-Sink
/// This example shows how a filter can be used in an execution plan,
/// along with the source and sink operations. The output from the
/// execution plan is obtained as a table via the sink node.
arrow::Status ScanFilterSinkExample(cp::ExecContext& exec_context) {
  ARROW_ASSIGN_OR_RAISE(std::shared_ptr<cp::ExecPlan> plan,
                        cp::ExecPlan::Make(&exec_context));

  ARROW_ASSIGN_OR_RAISE(std::shared_ptr<arrow::dataset::Dataset> dataset, GetDataset());

  auto options = std::make_shared<arrow::dataset::ScanOptions>();
  // specify the filter.  This filter keeps only the rows where the
  // value of the "a" column is greater than 3.
  cp::Expression filter_opt = cp::greater(cp::field_ref("a"), cp::literal(3));
  // set filter for scanner : on-disk / push-down filtering.
  // This step can be skipped if you are not reading from disk.
  options->filter = filter_opt;
  // empty projection
  options->projection = cp::project({}, {});

  // construct the scan node
  std::cout << "Initialized Scanning Options" << std::endl;

  cp::ExecNode* scan;

  auto scan_node_options = arrow::dataset::ScanNodeOptions{dataset, options};
  std::cout << "Scan node options created" << std::endl;

  ARROW_ASSIGN_OR_RAISE(scan,
                        cp::MakeExecNode("scan", plan.get(), {}, scan_node_options));

  // pipe the scan node into the filter node
  // Need to set the filter in scan node options and filter node options.
  // At scan node it is used for on-disk / push-down filtering.
  // At filter node it is used for in-memory filtering.
  cp::ExecNode* filter;
  ARROW_ASSIGN_OR_RAISE(filter, cp::MakeExecNode("filter", plan.get(), {scan},
                                                 cp::FilterNodeOptions{filter_opt}));

  // finally, pipe the filter node into a sink node
  arrow::AsyncGenerator<std::optional<cp::ExecBatch>> sink_gen;
  ARROW_RETURN_NOT_OK(
      cp::MakeExecNode("sink", plan.get(), {filter}, cp::SinkNodeOptions{&sink_gen}));

  return ExecutePlanAndCollectAsTable(exec_context, plan, dataset->schema(), sink_gen);
}

project

The project operation rearranges, deletes, transforms, and creates columns. Each output column is computed by evaluating an expression against the source record batch. This is exposed via arrow::compute::ProjectNodeOptions, which requires an arrow::compute::Expression and a name for each of the output columns (if names are not provided, the string representations of the expressions will be used).

Project example:

/// \brief An example showing a project node
/// \param exec_context The execution context to run the plan in
///
/// Scan-Project-Sink
/// This example shows how Scan operation can be used to load the data
/// into the execution plan, how project operation can be applied on the
/// data stream and how the output is obtained as a table via the sink node.
arrow::Status ScanProjectSinkExample(cp::ExecContext& exec_context) {
  ARROW_ASSIGN_OR_RAISE(std::shared_ptr<cp::ExecPlan> plan,
                        cp::ExecPlan::Make(&exec_context));

  ARROW_ASSIGN_OR_RAISE(std::shared_ptr<arrow::dataset::Dataset> dataset, GetDataset());

  auto options = std::make_shared<arrow::dataset::ScanOptions>();
  // projection
  cp::Expression a_times_2 = cp::call("multiply", {cp::field_ref("a"), cp::literal(2)});
  options->projection = cp::project({}, {});

  cp::ExecNode* scan;

  auto scan_node_options = arrow::dataset::ScanNodeOptions{dataset, options};

  ARROW_ASSIGN_OR_RAISE(scan,
                        cp::MakeExecNode("scan", plan.get(), {}, scan_node_options));

  cp::ExecNode* project;
  ARROW_ASSIGN_OR_RAISE(project, cp::MakeExecNode("project", plan.get(), {scan},
                                                  cp::ProjectNodeOptions{{a_times_2}}));
  // schema after projection => multiply(a, 2): int64
  std::cout << "Schema after projection : \n"
            << project->output_schema()->ToString() << std::endl;

  arrow::AsyncGenerator<std::optional<cp::ExecBatch>> sink_gen;
  ARROW_RETURN_NOT_OK(
      cp::MakeExecNode("sink", plan.get(), {project}, cp::SinkNodeOptions{&sink_gen}));
  auto schema = arrow::schema({arrow::field("a * 2", arrow::int32())});

  return ExecutePlanAndCollectAsTable(exec_context, plan, schema, sink_gen);
}

aggregate

The aggregate node computes various types of aggregates over data.

Arrow supports two types of aggregates: “scalar” aggregates, and “hash” aggregates. Scalar aggregates reduce an array or scalar input to a single scalar output (e.g. computing the mean of a column). Hash aggregates act like GROUP BY in SQL and first partition data based on one or more key columns, then reduce the data in each partition. The aggregate node supports both types of computation, and can compute any number of aggregations at once.

arrow::compute::AggregateNodeOptions is used to define the aggregation criteria. It takes a list of aggregation functions and their options; a list of target fields to aggregate, one per function; and a list of names for the output fields, one per function. Optionally, it takes a list of columns that are used to partition the data, in the case of a hash aggregation. The aggregation functions can be selected from the list of available aggregation functions.

Note

This node is a “pipeline breaker” and will fully materialize the dataset in memory. In the future, spillover mechanisms will be added which should alleviate this constraint.

The aggregation can provide results as a grouped or scalar output. For instance, an operation like hash_count provides a count per each unique group as a grouped result, while an operation like sum provides a single record.

Scalar Aggregation example:

/// \brief An example showing an aggregation node to aggregate an entire table
/// \param exec_context The execution context to run the plan in
///
/// Source-Aggregation-Sink
/// This example shows how an aggregation operation can be applied on an
/// execution plan resulting in a scalar output. The source node loads the
/// data and the aggregation (summing the values in column 'a')
/// is applied on this data. The output is obtained from the sink node as a table.
arrow::Status SourceScalarAggregateSinkExample(cp::ExecContext& exec_context) {
  ARROW_ASSIGN_OR_RAISE(std::shared_ptr<cp::ExecPlan> plan,
                        cp::ExecPlan::Make(&exec_context));

  ARROW_ASSIGN_OR_RAISE(auto basic_data, MakeBasicBatches());

  arrow::AsyncGenerator<std::optional<cp::ExecBatch>> sink_gen;

  auto source_node_options = cp::SourceNodeOptions{basic_data.schema, basic_data.gen()};

  ARROW_ASSIGN_OR_RAISE(cp::ExecNode * source,
                        cp::MakeExecNode("source", plan.get(), {}, source_node_options));
  auto aggregate_options =
      cp::AggregateNodeOptions{/*aggregates=*/{{"sum", nullptr, "a", "sum(a)"}}};
  ARROW_ASSIGN_OR_RAISE(
      cp::ExecNode * aggregate,
      cp::MakeExecNode("aggregate", plan.get(), {source}, std::move(aggregate_options)));

  ARROW_RETURN_NOT_OK(
      cp::MakeExecNode("sink", plan.get(), {aggregate}, cp::SinkNodeOptions{&sink_gen}));
  auto schema = arrow::schema({arrow::field("sum(a)", arrow::int32())});

  return ExecutePlanAndCollectAsTable(exec_context, plan, schema, sink_gen);
}

Group Aggregation example:

/// \brief An example showing an aggregation node to perform a group-by operation
/// \param exec_context The execution context to run the plan in
///
/// Source-Aggregation-Sink
/// This example shows how an aggregation operation can be applied on an
/// execution plan resulting in a grouped output. The source node loads the
/// data and the aggregation (counting the non-null values of column 'a',
/// grouped by column 'b') is applied on this data. The output is obtained
/// from the sink node as a table.
arrow::Status SourceGroupAggregateSinkExample(cp::ExecContext& exec_context) {
  ARROW_ASSIGN_OR_RAISE(std::shared_ptr<cp::ExecPlan> plan,
                        cp::ExecPlan::Make(&exec_context));

  ARROW_ASSIGN_OR_RAISE(auto basic_data, MakeBasicBatches());

  arrow::AsyncGenerator<std::optional<cp::ExecBatch>> sink_gen;

  auto source_node_options = cp::SourceNodeOptions{basic_data.schema, basic_data.gen()};

  ARROW_ASSIGN_OR_RAISE(cp::ExecNode * source,
                        cp::MakeExecNode("source", plan.get(), {}, source_node_options));
  auto options = std::make_shared<cp::CountOptions>(cp::CountOptions::ONLY_VALID);
  auto aggregate_options =
      cp::AggregateNodeOptions{/*aggregates=*/{{"hash_count", options, "a", "count(a)"}},
                               /*keys=*/{"b"}};
  ARROW_ASSIGN_OR_RAISE(
      cp::ExecNode * aggregate,
      cp::MakeExecNode("aggregate", plan.get(), {source}, aggregate_options));

  ARROW_RETURN_NOT_OK(
      cp::MakeExecNode("sink", plan.get(), {aggregate}, cp::SinkNodeOptions{&sink_gen}));
  auto schema = arrow::schema({
      arrow::field("count(a)", arrow::int32()),
      arrow::field("b", arrow::boolean()),
  });

  return ExecutePlanAndCollectAsTable(exec_context, plan, schema, sink_gen);
}

sink

The sink operation provides output and is the final node of a streaming execution definition. The arrow::compute::SinkNodeOptions interface is used to pass the required options. Similar to the source operator, the sink operator exposes the output with a generator function that returns a future for the next batch each time it is called. It is expected the caller will repeatedly call this function until the generator is exhausted (returns an empty std::optional). If this function is not called often enough then record batches will accumulate in memory. An execution plan should only have one “terminal” node (one sink node). An ExecPlan can terminate early due to cancellation or an error, before the output is fully consumed. However, the plan can be safely destroyed independently of the sink, which will hold the unconsumed batches by exec_plan->finished().
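The examples on this page return through a helper named ExecutePlanAndCollectAsTable that is defined in the example source file but not reproduced here. A minimal sketch of how such a helper might be written, assuming the arrow::compute::MakeGeneratorReader utility and arrow::Table::FromRecordBatchReader (the actual helper in the example file may differ):

#include <arrow/compute/api.h>
#include <arrow/compute/exec/exec_plan.h>
#include <arrow/table.h>

#include <iostream>

namespace cp = arrow::compute;

arrow::Status ExecutePlanAndCollectAsTable(
    cp::ExecContext& exec_context, std::shared_ptr<cp::ExecPlan> plan,
    std::shared_ptr<arrow::Schema> schema,
    arrow::AsyncGenerator<std::optional<cp::ExecBatch>> sink_gen) {
  // Wrap the sink generator in a RecordBatchReader so that batches can be
  // pulled from outside the plan.
  std::shared_ptr<arrow::RecordBatchReader> reader =
      cp::MakeGeneratorReader(schema, std::move(sink_gen), exec_context.memory_pool());

  // Start the plan; batches now flow from the sources toward the sink.
  ARROW_RETURN_NOT_OK(plan->Validate());
  ARROW_RETURN_NOT_OK(plan->StartProducing());

  // Drain the reader and collect everything into a single in-memory table.
  ARROW_ASSIGN_OR_RAISE(std::shared_ptr<arrow::Table> response_table,
                        arrow::Table::FromRecordBatchReader(reader.get()));
  std::cout << "Results : " << response_table->ToString() << std::endl;

  // Stop producing and wait until the plan has finished before returning.
  plan->StopProducing();
  ARROW_RETURN_NOT_OK(plan->finished().status());
  return arrow::Status::OK();
}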

As a part of the Source Example, the Sink operation is also included:

/// \brief An example demonstrating a source and sink node
/// \param exec_context The execution context to run the plan in
///
/// Source-Sink Example
/// This example shows how a source and sink can be used
/// in an execution plan. This includes a source node receiving data
/// and a sink node emitting the data as an output represented in
/// a table.
arrow::Status SourceSinkExample(cp::ExecContext& exec_context) {
  ARROW_ASSIGN_OR_RAISE(std::shared_ptr<cp::ExecPlan> plan,
                        cp::ExecPlan::Make(&exec_context));

  ARROW_ASSIGN_OR_RAISE(auto basic_data, MakeBasicBatches());

  arrow::AsyncGenerator<std::optional<cp::ExecBatch>> sink_gen;

  auto source_node_options = cp::SourceNodeOptions{basic_data.schema, basic_data.gen()};

  ARROW_ASSIGN_OR_RAISE(cp::ExecNode * source,
                        cp::MakeExecNode("source", plan.get(), {}, source_node_options));

  ARROW_RETURN_NOT_OK(
      cp::MakeExecNode("sink", plan.get(), {source}, cp::SinkNodeOptions{&sink_gen}));

  return ExecutePlanAndCollectAsTable(exec_context, plan, basic_data.schema, sink_gen);
}

consuming_sink

The consuming_sink operator is a sink operation whose consuming logic runs within the execution plan (i.e. the exec plan should not complete until the consumption has completed). Unlike the sink node, this node takes in a callback function that is expected to consume the batch. Once this callback has finished the execution plan will no longer hold any reference to the batch. The consuming function may be called before a previous invocation has completed. If the consuming function does not run quickly enough then many concurrent executions could pile up, blocking the CPU thread pool. The execution plan will not be marked finished until all consuming function callbacks have been completed. Once all batches have been delivered the execution plan will wait for the finish future to complete before marking the execution plan finished. This allows for workflows where the consumption function converts batches into async tasks (this is currently done internally for the dataset write node).

Example:

// define a Custom SinkNodeConsumer
std::atomic<uint32_t> batches_seen{0};
arrow::Future<> finish = arrow::Future<>::Make();
struct CustomSinkNodeConsumer : public cp::SinkNodeConsumer {
  CustomSinkNodeConsumer(std::atomic<uint32_t>* batches_seen, arrow::Future<> finish)
      : batches_seen(batches_seen), finish(std::move(finish)) {}

  // Consumption logic can be written here
  arrow::Status Consume(cp::ExecBatch batch) override {
    // data can be consumed in the expected way
    // transfer to another system or just do some work
    // and write to disk
    (*batches_seen)++;
    return arrow::Status::OK();
  }

  arrow::Future<> Finish() override { return finish; }

  std::atomic<uint32_t>* batches_seen;
  arrow::Future<> finish;
};

std::shared_ptr<CustomSinkNodeConsumer> consumer =
    std::make_shared<CustomSinkNodeConsumer>(&batches_seen, finish);

arrow::compute::ExecNode* consuming_sink;

ARROW_ASSIGN_OR_RAISE(consuming_sink, MakeExecNode("consuming_sink", plan.get(),
    {source}, cp::ConsumingSinkNodeOptions(consumer)));

Consuming-Sink example:

/// \brief An example showing a consuming sink node
/// \param exec_context The execution context to run the plan in
///
/// Source-Consuming-Sink
/// This example shows how the data can be consumed within the execution plan
/// by using a ConsumingSink node. There is no data output from this execution plan.
arrow::Status SourceConsumingSinkExample(cp::ExecContext& exec_context) {
  ARROW_ASSIGN_OR_RAISE(std::shared_ptr<cp::ExecPlan> plan,
                        cp::ExecPlan::Make(&exec_context));

  ARROW_ASSIGN_OR_RAISE(auto basic_data, MakeBasicBatches());

  auto source_node_options = cp::SourceNodeOptions{basic_data.schema, basic_data.gen()};

  ARROW_ASSIGN_OR_RAISE(cp::ExecNode * source,
                        cp::MakeExecNode("source", plan.get(), {}, source_node_options));

  std::atomic<uint32_t> batches_seen{0};
  arrow::Future<> finish = arrow::Future<>::Make();
  struct CustomSinkNodeConsumer : public cp::SinkNodeConsumer {
    CustomSinkNodeConsumer(std::atomic<uint32_t>* batches_seen, arrow::Future<> finish)
        : batches_seen(batches_seen), finish(std::move(finish)) {}

    arrow::Status Init(const std::shared_ptr<arrow::Schema>& schema,
                       cp::BackpressureControl* backpressure_control,
                       cp::ExecPlan* plan) override {
      return arrow::Status::OK();
    }

    arrow::Status Consume(cp::ExecBatch batch) override {
      (*batches_seen)++;
      return arrow::Status::OK();
    }

    arrow::Future<> Finish() override { return finish; }

    std::atomic<uint32_t>* batches_seen;
    arrow::Future<> finish;
  };
  std::shared_ptr<CustomSinkNodeConsumer> consumer =
      std::make_shared<CustomSinkNodeConsumer>(&batches_seen, finish);

  cp::ExecNode* consuming_sink;

  ARROW_ASSIGN_OR_RAISE(consuming_sink,
                        MakeExecNode("consuming_sink", plan.get(), {source},
                                     cp::ConsumingSinkNodeOptions(consumer)));

  ARROW_RETURN_NOT_OK(consuming_sink->Validate());

  ARROW_RETURN_NOT_OK(plan->Validate());
  std::cout << "Exec Plan created: " << plan->ToString() << std::endl;
  // plan start producing
  ARROW_RETURN_NOT_OK(plan->StartProducing());
  // Source should finish fairly quickly
  ARROW_RETURN_NOT_OK(source->finished().status());
  std::cout << "Source Finished!" << std::endl;
  // Mark consumption complete, plan should finish
  finish.MarkFinished(arrow::Status::OK());
  ARROW_RETURN_NOT_OK(plan->finished().status());
  return arrow::Status::OK();
}

order_by_sink

order_by_sink operation is an extension to the sink operation. This operation provides the ability to guarantee the ordering of the stream by providing the arrow::compute::OrderBySinkNodeOptions. Here the arrow::compute::SortOptions are provided to define which columns are used for sorting and whether to sort by ascending or descending values.

Note

This node is a “pipeline breaker” and will fully materialize the dataset in memory. In the future, spillover mechanisms will be added which should alleviate this constraint.

Order-By-Sink example:

/// \brief An example showing an order-by node
/// \param exec_context The execution context to run the plan in
///
/// Source-OrderBy-Sink
/// In this example, the data enters through the source node
/// and the data is ordered in the sink node. The order can be
/// ASCENDING or DESCENDING and it is configurable. The output
/// is obtained as a table from the sink node.
arrow::Status SourceOrderBySinkExample(cp::ExecContext& exec_context) {
  ARROW_ASSIGN_OR_RAISE(std::shared_ptr<cp::ExecPlan> plan,
                        cp::ExecPlan::Make(&exec_context));

  ARROW_ASSIGN_OR_RAISE(auto basic_data, MakeSortTestBasicBatches());

  std::cout << "basic data created" << std::endl;

  arrow::AsyncGenerator<std::optional<cp::ExecBatch>> sink_gen;

  auto source_node_options = cp::SourceNodeOptions{basic_data.schema, basic_data.gen()};
  ARROW_ASSIGN_OR_RAISE(cp::ExecNode * source,
                        cp::MakeExecNode("source", plan.get(), {}, source_node_options));

  ARROW_RETURN_NOT_OK(cp::MakeExecNode(
      "order_by_sink", plan.get(), {source},
      cp::OrderBySinkNodeOptions{
          cp::SortOptions{{cp::SortKey{"a", cp::SortOrder::Descending}}}, &sink_gen}));

  return ExecutePlanAndCollectAsTable(exec_context, plan, basic_data.schema, sink_gen);
}

select_k_sink

The select_k_sink option enables selecting the top/bottom K elements, similar to a SQL ORDER BY ... LIMIT K clause. The selection criteria are defined with arrow::compute::SelectKOptions and passed to the node via arrow::compute::SelectKSinkNodeOptions. This option returns a sink node that receives the input and then computes the top_k/bottom_k rows.

Note

This node is a “pipeline breaker” and will fully materialize the input in memory. In the future, spillover mechanisms will be added which should alleviate this constraint.

SelectK example:

/// \brief An example showing a select-k node
/// \param exec_context The execution context to run the plan in
///
/// Source-KSelect
/// This example shows how K number of elements can be selected
/// either from the top or bottom. The output node is a modified
/// sink node where output can be obtained as a table.
arrow::Status SourceKSelectExample(cp::ExecContext& exec_context) {
  ARROW_ASSIGN_OR_RAISE(auto input, MakeGroupableBatches());
  ARROW_ASSIGN_OR_RAISE(std::shared_ptr<cp::ExecPlan> plan,
                        cp::ExecPlan::Make(&exec_context));
  arrow::AsyncGenerator<std::optional<cp::ExecBatch>> sink_gen;

  ARROW_ASSIGN_OR_RAISE(
      cp::ExecNode * source,
      cp::MakeExecNode("source", plan.get(), {},
                       cp::SourceNodeOptions{input.schema, input.gen()}));

  cp::SelectKOptions options = cp::SelectKOptions::TopKDefault(/*k=*/2, {"i32"});

  ARROW_RETURN_NOT_OK(cp::MakeExecNode("select_k_sink", plan.get(), {source},
                                       cp::SelectKSinkNodeOptions{options, &sink_gen}));

  auto schema = arrow::schema(
      {arrow::field("i32", arrow::int32()), arrow::field("str", arrow::utf8())});

  return ExecutePlanAndCollectAsTable(exec_context, plan, schema, sink_gen);
}

table_sink

The table_sink node provides the ability to receive the output as an in-memory table. This is simpler to use than the other sink nodes provided by the streaming execution engine but it only makes sense when the output fits comfortably in memory. The node is created using arrow::compute::TableSinkNodeOptions.

Example of using table_sink

/// \brief An example showing a table sink node
/// \param exec_context The execution context to run the plan in
///
/// TableSink Example
/// This example shows how a table_sink can be used
/// in an execution plan. This includes a source node
/// receiving data as batches and the table sink node
/// which emits the output as a table.
arrow::Status TableSinkExample(cp::ExecContext& exec_context) {
  ARROW_ASSIGN_OR_RAISE(std::shared_ptr<cp::ExecPlan> plan,
                        cp::ExecPlan::Make(&exec_context));

  ARROW_ASSIGN_OR_RAISE(auto basic_data, MakeBasicBatches());

  auto source_node_options = cp::SourceNodeOptions{basic_data.schema, basic_data.gen()};

  ARROW_ASSIGN_OR_RAISE(cp::ExecNode * source,
                        cp::MakeExecNode("source", plan.get(), {}, source_node_options));

  std::shared_ptr<arrow::Table> output_table;
  auto table_sink_options = cp::TableSinkNodeOptions{&output_table};

  ARROW_RETURN_NOT_OK(
      cp::MakeExecNode("table_sink", plan.get(), {source}, table_sink_options));
  // validate the ExecPlan
  ARROW_RETURN_NOT_OK(plan->Validate());
  std::cout << "ExecPlan created : " << plan->ToString() << std::endl;
  // start the ExecPlan
  ARROW_RETURN_NOT_OK(plan->StartProducing());

  // Wait for the plan to finish
  auto finished = plan->finished();
  ARROW_RETURN_NOT_OK(finished.status());
  std::cout << "Results : " << output_table->ToString() << std::endl;
  return arrow::Status::OK();
}

scan

scan is an operation used to load and process datasets. It should be preferred over the more generic source node when your input is a dataset. The behavior is defined using arrow::dataset::ScanNodeOptions. More information on datasets and the various scan options can be found in Tabular Datasets.

This node is capable of applying pushdown filters to the file readers, which reduces the amount of data that needs to be read. Because the pushdown filter may not eliminate every unwanted row, you should generally still supply the same filter expression to a downstream FilterNode: the scan node uses it for on-disk / push-down filtering while the filter node performs the exact in-memory filtering.

Scan example:

/// \brief An example demonstrating a scan and sink node
/// \param exec_context The execution context to run the plan in
///
/// Scan-Sink
/// This example shows how scan operation can be applied on a dataset.
/// There are operations that can be applied on the scan (project, filter)
/// and the input data can be processed. The output is obtained as a table
/// via the sink node.
arrow::Status ScanSinkExample(cp::ExecContext& exec_context) {
  // Execution plan created
  ARROW_ASSIGN_OR_RAISE(std::shared_ptr<cp::ExecPlan> plan,
                        cp::ExecPlan::Make(&exec_context));

  ARROW_ASSIGN_OR_RAISE(std::shared_ptr<arrow::dataset::Dataset> dataset, GetDataset());

  auto options = std::make_shared<arrow::dataset::ScanOptions>();
  options->projection = cp::project({}, {});  // create empty projection

  // construct the scan node
  cp::ExecNode* scan;
  auto scan_node_options = arrow::dataset::ScanNodeOptions{dataset, options};

  ARROW_ASSIGN_OR_RAISE(scan,
                        cp::MakeExecNode("scan", plan.get(), {}, scan_node_options));

  arrow::AsyncGenerator<std::optional<cp::ExecBatch>> sink_gen;

  ARROW_RETURN_NOT_OK(
      cp::MakeExecNode("sink", plan.get(), {scan}, cp::SinkNodeOptions{&sink_gen}));

  return ExecutePlanAndCollectAsTable(exec_context, plan, dataset->schema(), sink_gen);
}

write

The write node saves query results as a dataset of files in a format like Parquet, Feather, CSV, etc. using the Tabular Datasets functionality in Arrow. The write options are provided via the arrow::dataset::WriteNodeOptions which in turn contains arrow::dataset::FileSystemDatasetWriteOptions. arrow::dataset::FileSystemDatasetWriteOptions provides control over the written dataset, including options like the output directory, file naming scheme, and so on.

Write example:

/// \brief An example showing a write node
/// \param exec_context The execution context to run the plan in
/// \param file_path The destination to write to
///
/// Scan-Filter-Write
/// This example shows how scan node can be used to load the data
/// and after processing how it can be written to disk.
arrow::Status ScanFilterWriteExample(cp::ExecContext& exec_context,
                                     const std::string& file_path) {
  ARROW_ASSIGN_OR_RAISE(std::shared_ptr<cp::ExecPlan> plan,
                        cp::ExecPlan::Make(&exec_context));

  ARROW_ASSIGN_OR_RAISE(std::shared_ptr<arrow::dataset::Dataset> dataset, GetDataset());

  auto options = std::make_shared<arrow::dataset::ScanOptions>();
  // empty projection
  options->projection = cp::project({}, {});

  cp::ExecNode* scan;

  auto scan_node_options = arrow::dataset::ScanNodeOptions{dataset, options};

  ARROW_ASSIGN_OR_RAISE(scan,
                        cp::MakeExecNode("scan", plan.get(), {}, scan_node_options));

  arrow::AsyncGenerator<std::optional<cp::ExecBatch>> sink_gen;

  std::string root_path = "";
  std::string uri = "file://" + file_path;
  ARROW_ASSIGN_OR_RAISE(std::shared_ptr<arrow::fs::FileSystem> filesystem,
                        arrow::fs::FileSystemFromUri(uri, &root_path));

  auto base_path = root_path + "/parquet_dataset";
  // Uncomment the following line, if run repeatedly
  // ARROW_RETURN_NOT_OK(filesystem->DeleteDirContents(base_path));
  ARROW_RETURN_NOT_OK(filesystem->CreateDir(base_path));

  // The partition schema determines which fields are part of the partitioning.
  auto partition_schema = arrow::schema({arrow::field("a", arrow::int32())});
  // We'll use Hive-style partitioning,
  // which creates directories with "key=value" pairs.

  auto partitioning =
      std::make_shared<arrow::dataset::HivePartitioning>(partition_schema);
  // We'll write Parquet files.
  auto format = std::make_shared<arrow::dataset::ParquetFileFormat>();

  arrow::dataset::FileSystemDatasetWriteOptions write_options;
  write_options.file_write_options = format->DefaultWriteOptions();
  write_options.filesystem = filesystem;
  write_options.base_dir = base_path;
  write_options.partitioning = partitioning;
  write_options.basename_template = "part{i}.parquet";

  arrow::dataset::WriteNodeOptions write_node_options{write_options};

  ARROW_RETURN_NOT_OK(cp::MakeExecNode("write", plan.get(), {scan}, write_node_options));

  ARROW_RETURN_NOT_OK(plan->Validate());
  std::cout << "Execution Plan Created : " << plan->ToString() << std::endl;
  // start the ExecPlan
  ARROW_RETURN_NOT_OK(plan->StartProducing());
  auto future = plan->finished();
  ARROW_RETURN_NOT_OK(future.status());
  future.Wait();
  return arrow::Status::OK();
}

union

union merges multiple data streams with the same schema into one, similar to a SQL UNION ALL clause.

The following example demonstrates how this can be achieved using two data sources.

Union example:

/// \brief An example showing a union node
/// \param exec_context The execution context to run the plan in
///
/// Source-Union-Sink
/// This example shows how a union operation can be applied on two
/// data sources. The output is obtained as a table via the sink
/// node.
arrow::Status SourceUnionSinkExample(cp::ExecContext& exec_context) {
  ARROW_ASSIGN_OR_RAISE(auto basic_data, MakeBasicBatches());

  ARROW_ASSIGN_OR_RAISE(std::shared_ptr<cp::ExecPlan> plan,
                        cp::ExecPlan::Make(&exec_context));
  arrow::AsyncGenerator<std::optional<cp::ExecBatch>> sink_gen;

  cp::Declaration union_node{"union", cp::ExecNodeOptions{}};
  cp::Declaration lhs{"source",
                      cp::SourceNodeOptions{basic_data.schema, basic_data.gen()}};
  lhs.label = "lhs";
  cp::Declaration rhs{"source",
                      cp::SourceNodeOptions{basic_data.schema, basic_data.gen()}};
  rhs.label = "rhs";
  union_node.inputs.emplace_back(lhs);
  union_node.inputs.emplace_back(rhs);

  cp::CountOptions options(cp::CountOptions::ONLY_VALID);
  ARROW_ASSIGN_OR_RAISE(
      auto declr, cp::Declaration::Sequence({
                                                union_node,
                                                {"sink", cp::SinkNodeOptions{&sink_gen}},
                                            })
                      .AddToPlan(plan.get()));

  ARROW_RETURN_NOT_OK(declr->Validate());

  ARROW_RETURN_NOT_OK(plan->Validate());
  return ExecutePlanAndCollectAsTable(exec_context, plan, basic_data.schema, sink_gen);
}

hash_join

The hash_join operation provides the relational algebra join operation using a hash-based algorithm. arrow::compute::HashJoinNodeOptions contains the options required to define a join. hash_join supports left/right/full semi/anti/outer joins. The join keys (i.e. the column(s) to join on) and suffixes (i.e. a term like “_x” that can be appended to column names duplicated in both the left and right relations) can also be set via the join options. Read more on hash joins.

Hash-Join example:

/// \brief An example showing a hash join node
/// \param exec_context The execution context to run the plan in
///
/// Source-HashJoin-Sink
/// This example shows how source node gets the data and how a self-join
/// is applied on the data. The join options are configurable. The output
/// is obtained as a table via the sink node.
arrow::Status SourceHashJoinSinkExample(cp::ExecContext& exec_context) {
  ARROW_ASSIGN_OR_RAISE(auto input, MakeGroupableBatches());
  ARROW_ASSIGN_OR_RAISE(std::shared_ptr<cp::ExecPlan> plan,
                        cp::ExecPlan::Make(&exec_context));

  arrow::AsyncGenerator<std::optional<cp::ExecBatch>> sink_gen;

  cp::ExecNode* left_source;
  cp::ExecNode* right_source;
  for (auto source : {&left_source, &right_source}) {
    ARROW_ASSIGN_OR_RAISE(*source,
                          MakeExecNode("source", plan.get(), {},
                                       cp::SourceNodeOptions{input.schema, input.gen()}));
  }

  cp::HashJoinNodeOptions join_opts{
      cp::JoinType::INNER,
      /*left_keys=*/{"str"},
      /*right_keys=*/{"str"}, cp::literal(true), "l_", "r_"};

  ARROW_ASSIGN_OR_RAISE(
      auto hashjoin,
      cp::MakeExecNode("hashjoin", plan.get(), {left_source, right_source}, join_opts));

  ARROW_RETURN_NOT_OK(
      cp::MakeExecNode("sink", plan.get(), {hashjoin}, cp::SinkNodeOptions{&sink_gen}));
  // expected columns i32, str, l_str, r_str
  auto schema = arrow::schema(
      {arrow::field("i32", arrow::int32()), arrow::field("str", arrow::utf8()),
       arrow::field("l_str", arrow::utf8()), arrow::field("r_str", arrow::utf8())});

  return ExecutePlanAndCollectAsTable(exec_context, plan, schema, sink_gen);
}

Summary

There are examples of these nodes which can be found in cpp/examples/arrow/execution_plan_documentation_examples.cc in the Arrow source.

Complete Example:

 19#include <arrow/array.h>
 20#include <arrow/builder.h>
 21
 22#include <arrow/compute/api.h>
 23#include <arrow/compute/api_vector.h>
 24#include <arrow/compute/cast.h>
 25#include <arrow/compute/exec/exec_plan.h>
 26
 27#include <arrow/csv/api.h>
 28
 29#include <arrow/dataset/dataset.h>
 30#include <arrow/dataset/file_base.h>
 31#include <arrow/dataset/file_parquet.h>
 32#include <arrow/dataset/plan.h>
 33#include <arrow/dataset/scanner.h>
 34
 35#include <arrow/io/interfaces.h>
 36#include <arrow/io/memory.h>
 37
 38#include <arrow/result.h>
 39#include <arrow/status.h>
 40#include <arrow/table.h>
 41
 42#include <arrow/ipc/api.h>
 43
 44#include <arrow/util/future.h>
 45#include <arrow/util/range.h>
 46#include <arrow/util/thread_pool.h>
 47#include <arrow/util/vector.h>
 48
 49#include <iostream>
 50#include <memory>
 51#include <utility>
 52
 53// Demonstrate various operators in Arrow Streaming Execution Engine
 54
 55namespace cp = ::arrow::compute;
 56
 57constexpr char kSep[] = "******";
 58
 59void PrintBlock(const std::string& msg) {
 60  std::cout << "\n\t" << kSep << " " << msg << " " << kSep << "\n" << std::endl;
 61}
 62
 63template <typename TYPE,
 64          typename = typename std::enable_if<arrow::is_number_type<TYPE>::value |
 65                                             arrow::is_boolean_type<TYPE>::value |
 66                                             arrow::is_temporal_type<TYPE>::value>::type>
 67arrow::Result<std::shared_ptr<arrow::Array>> GetArrayDataSample(
 68    const std::vector<typename TYPE::c_type>& values) {
 69  using ArrowBuilderType = typename arrow::TypeTraits<TYPE>::BuilderType;
 70  ArrowBuilderType builder;
 71  ARROW_RETURN_NOT_OK(builder.Reserve(values.size()));
 72  ARROW_RETURN_NOT_OK(builder.AppendValues(values));
 73  return builder.Finish();
 74}
 75
 76template <class TYPE>
 77arrow::Result<std::shared_ptr<arrow::Array>> GetBinaryArrayDataSample(
 78    const std::vector<std::string>& values) {
 79  using ArrowBuilderType = typename arrow::TypeTraits<TYPE>::BuilderType;
 80  ArrowBuilderType builder;
 81  ARROW_RETURN_NOT_OK(builder.Reserve(values.size()));
 82  ARROW_RETURN_NOT_OK(builder.AppendValues(values));
 83  return builder.Finish();
 84}
 85
 86arrow::Result<std::shared_ptr<arrow::RecordBatch>> GetSampleRecordBatch(
 87    const arrow::ArrayVector array_vector, const arrow::FieldVector& field_vector) {
 88  // Pack the arrays into a struct array, then convert it into a record batch
 89  ARROW_ASSIGN_OR_RAISE(auto struct_result,
 90                        arrow::StructArray::Make(array_vector, field_vector));
 91  return arrow::RecordBatch::FromStructArray(struct_result);
 92}
 93
 94/// \brief Create a sample table
 95/// The table's contents will be:
 96/// a,b
 97/// 1,null
 98/// 2,true
 99/// null,true
100/// 3,false
101/// null,true
102/// 4,false
103/// 5,null
104/// 6,false
105/// 7,false
106/// 8,true
107/// \return The created table
108
109arrow::Result<std::shared_ptr<arrow::Table>> GetTable() {
110  arrow::Int64Builder int64_builder;  // build column "a" with real nulls
111  ARROW_RETURN_NOT_OK(int64_builder.AppendValues(
112      {1, 2, 0, 3, 0, 4, 5, 6, 7, 8},
113      {true, true, false, true, false, true, true, true, true, true}));
114  ARROW_ASSIGN_OR_RAISE(auto int64_array, int64_builder.Finish());
115  arrow::BooleanBuilder boolean_builder;
116  std::shared_ptr<arrow::BooleanArray> bool_array;
117
118  std::vector<uint8_t> bool_values = {false, true,  true,  false, true,
119                                      false, false, false, false, true};
120  std::vector<bool> is_valid = {false, true,  true, true, true,
121                                true,  false, true, true, true};
122
123  ARROW_RETURN_NOT_OK(boolean_builder.Reserve(10));
124
125  ARROW_RETURN_NOT_OK(boolean_builder.AppendValues(bool_values, is_valid));
126
127  ARROW_RETURN_NOT_OK(boolean_builder.Finish(&bool_array));
128
129  auto record_batch =
130      arrow::RecordBatch::Make(arrow::schema({arrow::field("a", arrow::int64()),
131                                              arrow::field("b", arrow::boolean())}),
132                               10, {int64_array, bool_array});
133  ARROW_ASSIGN_OR_RAISE(auto table, arrow::Table::FromRecordBatches({record_batch}));
134  return table;
135}
136
137/// \brief Create a sample dataset
138/// \return An in-memory dataset based on GetTable()
139arrow::Result<std::shared_ptr<arrow::dataset::Dataset>> GetDataset() {
140  ARROW_ASSIGN_OR_RAISE(auto table, GetTable());
141  auto ds = std::make_shared<arrow::dataset::InMemoryDataset>(table);
142  return ds;
143}
144
145arrow::Result<cp::ExecBatch> GetExecBatchFromVectors(
146    const arrow::FieldVector& field_vector, const arrow::ArrayVector& array_vector) {
147  // Make a record batch from the arrays and wrap it in an ExecBatch
148  ARROW_ASSIGN_OR_RAISE(auto res_batch, GetSampleRecordBatch(array_vector, field_vector));
149  cp::ExecBatch batch{*res_batch};
150  return batch;
151}
152
153// (Doc section: BatchesWithSchema Definition)
154struct BatchesWithSchema {
155  std::vector<cp::ExecBatch> batches;
156  std::shared_ptr<arrow::Schema> schema;
157  // This method uses internal arrow utilities to
158  // convert a vector of record batches to an AsyncGenerator of optional batches
159  arrow::AsyncGenerator<std::optional<cp::ExecBatch>> gen() const {
160    auto opt_batches = ::arrow::internal::MapVector(
161        [](cp::ExecBatch batch) { return std::make_optional(std::move(batch)); },
162        batches);
163    arrow::AsyncGenerator<std::optional<cp::ExecBatch>> gen;
164    gen = arrow::MakeVectorGenerator(std::move(opt_batches));
165    return gen;
166  }
167};
168// (Doc section: BatchesWithSchema Definition)
169
170// (Doc section: MakeBasicBatches Definition)
171arrow::Result<BatchesWithSchema> MakeBasicBatches() {
172  BatchesWithSchema out;
173  auto field_vector = {arrow::field("a", arrow::int32()),
174                       arrow::field("b", arrow::boolean())};
175  ARROW_ASSIGN_OR_RAISE(auto b1_int, GetArrayDataSample<arrow::Int32Type>({0, 4}));
176  ARROW_ASSIGN_OR_RAISE(auto b2_int, GetArrayDataSample<arrow::Int32Type>({5, 6, 7}));
177  ARROW_ASSIGN_OR_RAISE(auto b3_int, GetArrayDataSample<arrow::Int32Type>({8, 9, 10}));
178
179  ARROW_ASSIGN_OR_RAISE(auto b1_bool,
180                        GetArrayDataSample<arrow::BooleanType>({false, true}));
181  ARROW_ASSIGN_OR_RAISE(auto b2_bool,
182                        GetArrayDataSample<arrow::BooleanType>({true, false, true}));
183  ARROW_ASSIGN_OR_RAISE(auto b3_bool,
184                        GetArrayDataSample<arrow::BooleanType>({false, true, false}));
185
186  ARROW_ASSIGN_OR_RAISE(auto b1,
187                        GetExecBatchFromVectors(field_vector, {b1_int, b1_bool}));
188  ARROW_ASSIGN_OR_RAISE(auto b2,
189                        GetExecBatchFromVectors(field_vector, {b2_int, b2_bool}));
190  ARROW_ASSIGN_OR_RAISE(auto b3,
191                        GetExecBatchFromVectors(field_vector, {b3_int, b3_bool}));
192
193  out.batches = {b1, b2, b3};
194  out.schema = arrow::schema(field_vector);
195  return out;
196}
197// (Doc section: MakeBasicBatches Definition)
198
199arrow::Result<BatchesWithSchema> MakeSortTestBasicBatches() {
200  BatchesWithSchema out;
201  auto field = arrow::field("a", arrow::int32());
202  ARROW_ASSIGN_OR_RAISE(auto b1_int, GetArrayDataSample<arrow::Int32Type>({1, 3, 0, 2}));
203  ARROW_ASSIGN_OR_RAISE(auto b2_int,
204                        GetArrayDataSample<arrow::Int32Type>({121, 101, 120, 12}));
205  ARROW_ASSIGN_OR_RAISE(auto b3_int,
206                        GetArrayDataSample<arrow::Int32Type>({10, 110, 210, 121}));
207  ARROW_ASSIGN_OR_RAISE(auto b4_int,
208                        GetArrayDataSample<arrow::Int32Type>({51, 101, 2, 34}));
209  ARROW_ASSIGN_OR_RAISE(auto b5_int,
210                        GetArrayDataSample<arrow::Int32Type>({11, 31, 1, 12}));
211  ARROW_ASSIGN_OR_RAISE(auto b6_int,
212                        GetArrayDataSample<arrow::Int32Type>({12, 101, 120, 12}));
213  ARROW_ASSIGN_OR_RAISE(auto b7_int,
214                        GetArrayDataSample<arrow::Int32Type>({0, 110, 210, 11}));
215  ARROW_ASSIGN_OR_RAISE(auto b8_int,
216                        GetArrayDataSample<arrow::Int32Type>({51, 10, 2, 3}));
217
218  ARROW_ASSIGN_OR_RAISE(auto b1, GetExecBatchFromVectors({field}, {b1_int}));
219  ARROW_ASSIGN_OR_RAISE(auto b2, GetExecBatchFromVectors({field}, {b2_int}));
220  ARROW_ASSIGN_OR_RAISE(auto b3,
221                        GetExecBatchFromVectors({field, field}, {b3_int, b8_int}));
222  ARROW_ASSIGN_OR_RAISE(auto b4,
223                        GetExecBatchFromVectors({field, field, field, field},
224                                                {b4_int, b5_int, b6_int, b7_int}));
225  out.batches = {b1, b2, b3, b4};
226  out.schema = arrow::schema({field});
227  return out;
228}
229
230arrow::Result<BatchesWithSchema> MakeGroupableBatches(int multiplicity = 1) {
231  BatchesWithSchema out;
232  auto fields = {arrow::field("i32", arrow::int32()), arrow::field("str", arrow::utf8())};
233  ARROW_ASSIGN_OR_RAISE(auto b1_int, GetArrayDataSample<arrow::Int32Type>({12, 7, 3}));
234  ARROW_ASSIGN_OR_RAISE(auto b2_int, GetArrayDataSample<arrow::Int32Type>({-2, -1, 3}));
235  ARROW_ASSIGN_OR_RAISE(auto b3_int, GetArrayDataSample<arrow::Int32Type>({5, 3, -8}));
236  ARROW_ASSIGN_OR_RAISE(auto b1_str, GetBinaryArrayDataSample<arrow::StringType>(
237                                         {"alpha", "beta", "alpha"}));
238  ARROW_ASSIGN_OR_RAISE(auto b2_str, GetBinaryArrayDataSample<arrow::StringType>(
239                                         {"alpha", "gamma", "alpha"}));
240  ARROW_ASSIGN_OR_RAISE(auto b3_str, GetBinaryArrayDataSample<arrow::StringType>(
241                                         {"gamma", "beta", "alpha"}));
242  ARROW_ASSIGN_OR_RAISE(auto b1, GetExecBatchFromVectors(fields, {b1_int, b1_str}));
243  ARROW_ASSIGN_OR_RAISE(auto b2, GetExecBatchFromVectors(fields, {b2_int, b2_str}));
244  ARROW_ASSIGN_OR_RAISE(auto b3, GetExecBatchFromVectors(fields, {b3_int, b3_str}));
245  out.batches = {b1, b2, b3};
246
247  size_t batch_count = out.batches.size();
248  for (int repeat = 1; repeat < multiplicity; ++repeat) {
249    for (size_t i = 0; i < batch_count; ++i) {
250      out.batches.push_back(out.batches[i]);
251    }
252  }
253
254  out.schema = arrow::schema(fields);
255  return out;
256}
257
258arrow::Status ExecutePlanAndCollectAsTable(
259    cp::ExecContext& exec_context, std::shared_ptr<cp::ExecPlan> plan,
260    std::shared_ptr<arrow::Schema> schema,
261    arrow::AsyncGenerator<std::optional<cp::ExecBatch>> sink_gen) {
262  // translate sink_gen (async) to sink_reader (sync)
263  std::shared_ptr<arrow::RecordBatchReader> sink_reader =
264      cp::MakeGeneratorReader(schema, std::move(sink_gen), exec_context.memory_pool());
265
266  // validate the ExecPlan
267  ARROW_RETURN_NOT_OK(plan->Validate());
268  std::cout << "ExecPlan created : " << plan->ToString() << std::endl;
269  // start the ExecPlan
270  ARROW_RETURN_NOT_OK(plan->StartProducing());
271
272  // collect sink_reader into a Table
273  std::shared_ptr<arrow::Table> response_table;
274
275  ARROW_ASSIGN_OR_RAISE(response_table,
276                        arrow::Table::FromRecordBatchReader(sink_reader.get()));
277
278  std::cout << "Results : " << response_table->ToString() << std::endl;
279
280  // stop producing
281  plan->StopProducing();
283  // wait until the plan has finished
283  auto future = plan->finished();
284  return future.status();
285}
286
287// (Doc section: Scan Example)
288
289/// \brief An example demonstrating a scan and sink node
290/// \param exec_context The execution context to run the plan in
291///
292/// Scan-Sink
293/// This example shows how a scan operation can be applied on a dataset.
294/// There are operations that can be applied on the scan (project, filter)
295/// to process the input data. The output is obtained as a table
296/// via the sink node.
297arrow::Status ScanSinkExample(cp::ExecContext& exec_context) {
298  // Execution plan created
299  ARROW_ASSIGN_OR_RAISE(std::shared_ptr<cp::ExecPlan> plan,
300                        cp::ExecPlan::Make(&exec_context));
301
302  ARROW_ASSIGN_OR_RAISE(std::shared_ptr<arrow::dataset::Dataset> dataset, GetDataset());
303
304  auto options = std::make_shared<arrow::dataset::ScanOptions>();
305  options->projection = cp::project({}, {});  // create empty projection
306
307  // construct the scan node
308  cp::ExecNode* scan;
309  auto scan_node_options = arrow::dataset::ScanNodeOptions{dataset, options};
310
311  ARROW_ASSIGN_OR_RAISE(scan,
312                        cp::MakeExecNode("scan", plan.get(), {}, scan_node_options));
313
314  arrow::AsyncGenerator<std::optional<cp::ExecBatch>> sink_gen;
315
316  ARROW_RETURN_NOT_OK(
317      cp::MakeExecNode("sink", plan.get(), {scan}, cp::SinkNodeOptions{&sink_gen}));
318
319  return ExecutePlanAndCollectAsTable(exec_context, plan, dataset->schema(), sink_gen);
320}
321// (Doc section: Scan Example)
322
323// (Doc section: Source Example)
324
325/// \brief An example demonstrating a source and sink node
326/// \param exec_context The execution context to run the plan in
327///
328/// Source-Sink Example
329/// This example shows how a source and sink can be used
330/// in an execution plan. This includes the source node receiving data
331/// and the sink node emitting the data as an output represented in
332/// a table.
333arrow::Status SourceSinkExample(cp::ExecContext& exec_context) {
334  ARROW_ASSIGN_OR_RAISE(std::shared_ptr<cp::ExecPlan> plan,
335                        cp::ExecPlan::Make(&exec_context));
336
337  ARROW_ASSIGN_OR_RAISE(auto basic_data, MakeBasicBatches());
338
339  arrow::AsyncGenerator<std::optional<cp::ExecBatch>> sink_gen;
340
341  auto source_node_options = cp::SourceNodeOptions{basic_data.schema, basic_data.gen()};
342
343  ARROW_ASSIGN_OR_RAISE(cp::ExecNode * source,
344                        cp::MakeExecNode("source", plan.get(), {}, source_node_options));
345
346  ARROW_RETURN_NOT_OK(
347      cp::MakeExecNode("sink", plan.get(), {source}, cp::SinkNodeOptions{&sink_gen}));
348
349  return ExecutePlanAndCollectAsTable(exec_context, plan, basic_data.schema, sink_gen);
350}
351// (Doc section: Source Example)
352
353// (Doc section: Table Source Example)
354
355/// \brief An example showing a table source node
356/// \param exec_context The execution context to run the plan in
357///
358/// TableSource-Sink Example
359/// This example shows how a table_source and sink can be used
360/// in an execution plan. This includes a table source node
361/// receiving data from a table and the sink node emitting
362/// the data to a generator, which we collect into a table.
363arrow::Status TableSourceSinkExample(cp::ExecContext& exec_context) {
364  ARROW_ASSIGN_OR_RAISE(std::shared_ptr<cp::ExecPlan> plan,
365                        cp::ExecPlan::Make(&exec_context));
366
367  ARROW_ASSIGN_OR_RAISE(auto table, GetTable());
368
369  arrow::AsyncGenerator<std::optional<cp::ExecBatch>> sink_gen;
370  int max_batch_size = 2;
371  auto table_source_options = cp::TableSourceNodeOptions{table, max_batch_size};
372
373  ARROW_ASSIGN_OR_RAISE(
374      cp::ExecNode * source,
375      cp::MakeExecNode("table_source", plan.get(), {}, table_source_options));
376
377  ARROW_RETURN_NOT_OK(
378      cp::MakeExecNode("sink", plan.get(), {source}, cp::SinkNodeOptions{&sink_gen}));
379
380  return ExecutePlanAndCollectAsTable(exec_context, plan, table->schema(), sink_gen);
381}
382// (Doc section: Table Source Example)
383
384// (Doc section: Filter Example)
385
386/// \brief An example showing a filter node
387/// \param exec_context The execution context to run the plan in
388///
389/// Source-Filter-Sink
390/// This example shows how a filter can be used in an execution plan,
391/// along with the source and sink operations. The output from the
392/// execution plan is obtained as a table via the sink node.
393arrow::Status ScanFilterSinkExample(cp::ExecContext& exec_context) {
394  ARROW_ASSIGN_OR_RAISE(std::shared_ptr<cp::ExecPlan> plan,
395                        cp::ExecPlan::Make(&exec_context));
396
397  ARROW_ASSIGN_OR_RAISE(std::shared_ptr<arrow::dataset::Dataset> dataset, GetDataset());
398
399  auto options = std::make_shared<arrow::dataset::ScanOptions>();
400  // specify the filter: this filter keeps only the rows where the
401  // value of the "a" column is greater than 3.
402  cp::Expression filter_opt = cp::greater(cp::field_ref("a"), cp::literal(3));
403  // set filter for scanner : on-disk / push-down filtering.
404  // This step can be skipped if you are not reading from disk.
405  options->filter = filter_opt;
406  // empty projection
407  options->projection = cp::project({}, {});
408
409  // construct the scan node
410  std::cout << "Initialized Scanning Options" << std::endl;
411
412  cp::ExecNode* scan;
413
414  auto scan_node_options = arrow::dataset::ScanNodeOptions{dataset, options};
415  std::cout << "Scan node options created" << std::endl;
416
417  ARROW_ASSIGN_OR_RAISE(scan,
418                        cp::MakeExecNode("scan", plan.get(), {}, scan_node_options));
419
420  // pipe the scan node into the filter node
421  // Need to set the filter in scan node options and filter node options.
422  // At scan node it is used for on-disk / push-down filtering.
423  // At filter node it is used for in-memory filtering.
424  cp::ExecNode* filter;
425  ARROW_ASSIGN_OR_RAISE(filter, cp::MakeExecNode("filter", plan.get(), {scan},
426                                                 cp::FilterNodeOptions{filter_opt}));
427
428  // finally, pipe the filter node into a sink node
429  arrow::AsyncGenerator<std::optional<cp::ExecBatch>> sink_gen;
430  ARROW_RETURN_NOT_OK(
431      cp::MakeExecNode("sink", plan.get(), {filter}, cp::SinkNodeOptions{&sink_gen}));
432
433  return ExecutePlanAndCollectAsTable(exec_context, plan, dataset->schema(), sink_gen);
434}
435
436// (Doc section: Filter Example)
437
438// (Doc section: Project Example)
439
440/// \brief An example showing a project node
441/// \param exec_context The execution context to run the plan in
442///
443/// Scan-Project-Sink
444/// This example shows how a scan operation can be used to load the data
445/// into the execution plan, how a project operation can be applied on the
446/// data stream, and how the output is obtained as a table via the sink node.
447arrow::Status ScanProjectSinkExample(cp::ExecContext& exec_context) {
448  ARROW_ASSIGN_OR_RAISE(std::shared_ptr<cp::ExecPlan> plan,
449                        cp::ExecPlan::Make(&exec_context));
450
451  ARROW_ASSIGN_OR_RAISE(std::shared_ptr<arrow::dataset::Dataset> dataset, GetDataset());
452
453  auto options = std::make_shared<arrow::dataset::ScanOptions>();
454  // projection
455  cp::Expression a_times_2 = cp::call("multiply", {cp::field_ref("a"), cp::literal(2)});
456  options->projection = cp::project({}, {});
457
458  cp::ExecNode* scan;
459
460  auto scan_node_options = arrow::dataset::ScanNodeOptions{dataset, options};
461
462  ARROW_ASSIGN_OR_RAISE(scan,
463                        cp::MakeExecNode("scan", plan.get(), {}, scan_node_options));
464
465  cp::ExecNode* project;
466  ARROW_ASSIGN_OR_RAISE(project, cp::MakeExecNode("project", plan.get(), {scan},
467                                                  cp::ProjectNodeOptions{{a_times_2}}));
468  // schema after projection => multiply(a, 2): int64
469  std::cout << "Schema after projection : \n"
470            << project->output_schema()->ToString() << std::endl;
471
472  arrow::AsyncGenerator<std::optional<cp::ExecBatch>> sink_gen;
473  ARROW_RETURN_NOT_OK(
474      cp::MakeExecNode("sink", plan.get(), {project}, cp::SinkNodeOptions{&sink_gen}));
475  auto schema = arrow::schema({arrow::field("a * 2", arrow::int64())});
476
477  return ExecutePlanAndCollectAsTable(exec_context, plan, schema, sink_gen);
478}
479
480// (Doc section: Project Example)
481
482// (Doc section: Scalar Aggregate Example)
483
484/// \brief An example showing an aggregation node to aggregate an entire table
485/// \param exec_context The execution context to run the plan in
486///
487/// Source-Aggregation-Sink
488/// This example shows how an aggregation operation can be applied on an
489/// execution plan, resulting in a scalar output. The source node loads the
490/// data and the aggregation (summing the values in column 'a')
491/// is applied on this data. The output is obtained from the sink node as a table.
492arrow::Status SourceScalarAggregateSinkExample(cp::ExecContext& exec_context) {
493  ARROW_ASSIGN_OR_RAISE(std::shared_ptr<cp::ExecPlan> plan,
494                        cp::ExecPlan::Make(&exec_context));
495
496  ARROW_ASSIGN_OR_RAISE(auto basic_data, MakeBasicBatches());
497
498  arrow::AsyncGenerator<std::optional<cp::ExecBatch>> sink_gen;
499
500  auto source_node_options = cp::SourceNodeOptions{basic_data.schema, basic_data.gen()};
501
502  ARROW_ASSIGN_OR_RAISE(cp::ExecNode * source,
503                        cp::MakeExecNode("source", plan.get(), {}, source_node_options));
504  auto aggregate_options =
505      cp::AggregateNodeOptions{/*aggregates=*/{{"sum", nullptr, "a", "sum(a)"}}};
506  ARROW_ASSIGN_OR_RAISE(
507      cp::ExecNode * aggregate,
508      cp::MakeExecNode("aggregate", plan.get(), {source}, std::move(aggregate_options)));
509
510  ARROW_RETURN_NOT_OK(
511      cp::MakeExecNode("sink", plan.get(), {aggregate}, cp::SinkNodeOptions{&sink_gen}));
512  auto schema = arrow::schema({arrow::field("sum(a)", arrow::int64())});
513
514  return ExecutePlanAndCollectAsTable(exec_context, plan, schema, sink_gen);
515}
516// (Doc section: Scalar Aggregate Example)
517
518// (Doc section: Group Aggregate Example)
519
520/// \brief An example showing an aggregation node to perform a group-by operation
521/// \param exec_context The execution context to run the plan in
522///
523/// Source-Aggregation-Sink
524/// This example shows how an aggregation operation can be applied on an
525/// execution plan, resulting in a grouped output. The source node loads
526/// the data and a count of the non-null values in column 'a', grouped by
527/// column 'b', is computed. The output is obtained from the sink node as a table.
528arrow::Status SourceGroupAggregateSinkExample(cp::ExecContext& exec_context) {
529  ARROW_ASSIGN_OR_RAISE(std::shared_ptr<cp::ExecPlan> plan,
530                        cp::ExecPlan::Make(&exec_context));
531
532  ARROW_ASSIGN_OR_RAISE(auto basic_data, MakeBasicBatches());
533
534  arrow::AsyncGenerator<std::optional<cp::ExecBatch>> sink_gen;
535
536  auto source_node_options = cp::SourceNodeOptions{basic_data.schema, basic_data.gen()};
537
538  ARROW_ASSIGN_OR_RAISE(cp::ExecNode * source,
539                        cp::MakeExecNode("source", plan.get(), {}, source_node_options));
540  auto options = std::make_shared<cp::CountOptions>(cp::CountOptions::ONLY_VALID);
541  auto aggregate_options =
542      cp::AggregateNodeOptions{/*aggregates=*/{{"hash_count", options, "a", "count(a)"}},
543                               /*keys=*/{"b"}};
544  ARROW_ASSIGN_OR_RAISE(
545      cp::ExecNode * aggregate,
546      cp::MakeExecNode("aggregate", plan.get(), {source}, aggregate_options));
547
548  ARROW_RETURN_NOT_OK(
549      cp::MakeExecNode("sink", plan.get(), {aggregate}, cp::SinkNodeOptions{&sink_gen}));
550  auto schema = arrow::schema({
551      arrow::field("count(a)", arrow::int64()),
552      arrow::field("b", arrow::boolean()),
553  });
554
555  return ExecutePlanAndCollectAsTable(exec_context, plan, schema, sink_gen);
556}
557// (Doc section: Group Aggregate Example)
558
559// (Doc section: ConsumingSink Example)
560
561/// \brief An example showing a consuming sink node
562/// \param exec_context The execution context to run the plan in
563///
564/// Source-Consuming-Sink
565/// This example shows how the data can be consumed within the execution plan
566/// by using a ConsumingSink node. There is no data output from this execution plan.
567arrow::Status SourceConsumingSinkExample(cp::ExecContext& exec_context) {
568  ARROW_ASSIGN_OR_RAISE(std::shared_ptr<cp::ExecPlan> plan,
569                        cp::ExecPlan::Make(&exec_context));
570
571  ARROW_ASSIGN_OR_RAISE(auto basic_data, MakeBasicBatches());
572
573  auto source_node_options = cp::SourceNodeOptions{basic_data.schema, basic_data.gen()};
574
575  ARROW_ASSIGN_OR_RAISE(cp::ExecNode * source,
576                        cp::MakeExecNode("source", plan.get(), {}, source_node_options));
577
578  std::atomic<uint32_t> batches_seen{0};
579  arrow::Future<> finish = arrow::Future<>::Make();
580  struct CustomSinkNodeConsumer : public cp::SinkNodeConsumer {
581    CustomSinkNodeConsumer(std::atomic<uint32_t>* batches_seen, arrow::Future<> finish)
582        : batches_seen(batches_seen), finish(std::move(finish)) {}
583
584    arrow::Status Init(const std::shared_ptr<arrow::Schema>& schema,
585                       cp::BackpressureControl* backpressure_control,
586                       cp::ExecPlan* plan) override {
587      return arrow::Status::OK();
588    }
589
590    arrow::Status Consume(cp::ExecBatch batch) override {
591      (*batches_seen)++;
592      return arrow::Status::OK();
593    }
594
595    arrow::Future<> Finish() override { return finish; }
596
597    std::atomic<uint32_t>* batches_seen;
598    arrow::Future<> finish;
599  };
600  std::shared_ptr<CustomSinkNodeConsumer> consumer =
601      std::make_shared<CustomSinkNodeConsumer>(&batches_seen, finish);
602
603  cp::ExecNode* consuming_sink;
604
605  ARROW_ASSIGN_OR_RAISE(consuming_sink,
606                        MakeExecNode("consuming_sink", plan.get(), {source},
607                                     cp::ConsumingSinkNodeOptions(consumer)));
608
609  ARROW_RETURN_NOT_OK(consuming_sink->Validate());
610
611  ARROW_RETURN_NOT_OK(plan->Validate());
612  std::cout << "Exec Plan created: " << plan->ToString() << std::endl;
613  // plan start producing
614  ARROW_RETURN_NOT_OK(plan->StartProducing());
615  // Source should finish fairly quickly
616  ARROW_RETURN_NOT_OK(source->finished().status());
617  std::cout << "Source Finished!" << std::endl;
618  // Mark consumption complete, plan should finish
619  finish.MarkFinished(arrow::Status::OK());
620  ARROW_RETURN_NOT_OK(plan->finished().status());
621  return arrow::Status::OK();
622}
623// (Doc section: ConsumingSink Example)
624
625// (Doc section: OrderBySink Example)
626
627/// \brief An example showing an order-by node
628/// \param exec_context The execution context to run the plan in
629///
630/// Source-OrderBy-Sink
631/// In this example, the data enters through the source node
632/// and the data is ordered in the sink node. The order can be
633/// ASCENDING or DESCENDING and it is configurable. The output
634/// is obtained as a table from the sink node.
635arrow::Status SourceOrderBySinkExample(cp::ExecContext& exec_context) {
636  ARROW_ASSIGN_OR_RAISE(std::shared_ptr<cp::ExecPlan> plan,
637                        cp::ExecPlan::Make(&exec_context));
638
639  ARROW_ASSIGN_OR_RAISE(auto basic_data, MakeSortTestBasicBatches());
640
641  std::cout << "basic data created" << std::endl;
642
643  arrow::AsyncGenerator<std::optional<cp::ExecBatch>> sink_gen;
644
645  auto source_node_options = cp::SourceNodeOptions{basic_data.schema, basic_data.gen()};
646  ARROW_ASSIGN_OR_RAISE(cp::ExecNode * source,
647                        cp::MakeExecNode("source", plan.get(), {}, source_node_options));
648
649  ARROW_RETURN_NOT_OK(cp::MakeExecNode(
650      "order_by_sink", plan.get(), {source},
651      cp::OrderBySinkNodeOptions{
652          cp::SortOptions{{cp::SortKey{"a", cp::SortOrder::Descending}}}, &sink_gen}));
653
654  return ExecutePlanAndCollectAsTable(exec_context, plan, basic_data.schema, sink_gen);
655}
656
657// (Doc section: OrderBySink Example)
658
659// (Doc section: HashJoin Example)
660
661/// \brief An example showing a hash join node
662/// \param exec_context The execution context to run the plan in
663///
664/// Source-HashJoin-Sink
665/// This example shows how the source node gets the data and how a self-join
666/// is applied on the data. The join options are configurable. The output
667/// is obtained as a table via the sink node.
668arrow::Status SourceHashJoinSinkExample(cp::ExecContext& exec_context) {
669  ARROW_ASSIGN_OR_RAISE(auto input, MakeGroupableBatches());
670  ARROW_ASSIGN_OR_RAISE(std::shared_ptr<cp::ExecPlan> plan,
671                        cp::ExecPlan::Make(&exec_context));
672
673  arrow::AsyncGenerator<std::optional<cp::ExecBatch>> sink_gen;
674
675  cp::ExecNode* left_source;
676  cp::ExecNode* right_source;
677  for (auto source : {&left_source, &right_source}) {
678    ARROW_ASSIGN_OR_RAISE(*source,
679                          MakeExecNode("source", plan.get(), {},
680                                       cp::SourceNodeOptions{input.schema, input.gen()}));
681  }
682
683  cp::HashJoinNodeOptions join_opts{
684      cp::JoinType::INNER,
685      /*left_keys=*/{"str"},
686      /*right_keys=*/{"str"}, cp::literal(true), "l_", "r_"};
687
688  ARROW_ASSIGN_OR_RAISE(
689      auto hashjoin,
690      cp::MakeExecNode("hashjoin", plan.get(), {left_source, right_source}, join_opts));
691
692  ARROW_RETURN_NOT_OK(
693      cp::MakeExecNode("sink", plan.get(), {hashjoin}, cp::SinkNodeOptions{&sink_gen}));
694  // expected columns i32, str, l_str, r_str
695  auto schema = arrow::schema(
696      {arrow::field("i32", arrow::int32()), arrow::field("str", arrow::utf8()),
697       arrow::field("l_str", arrow::utf8()), arrow::field("r_str", arrow::utf8())});
698
699  return ExecutePlanAndCollectAsTable(exec_context, plan, schema, sink_gen);
700}
701
702// (Doc section: HashJoin Example)
703
704// (Doc section: KSelect Example)
705
706/// \brief An example showing a select-k node
707/// \param exec_context The execution context to run the plan in
708///
709/// Source-KSelect
710/// This example shows how K number of elements can be selected
711/// either from the top or bottom. The output node is a modified
712/// sink node where output can be obtained as a table.
713arrow::Status SourceKSelectExample(cp::ExecContext& exec_context) {
714  ARROW_ASSIGN_OR_RAISE(auto input, MakeGroupableBatches());
715  ARROW_ASSIGN_OR_RAISE(std::shared_ptr<cp::ExecPlan> plan,
716                        cp::ExecPlan::Make(&exec_context));
717  arrow::AsyncGenerator<std::optional<cp::ExecBatch>> sink_gen;
718
719  ARROW_ASSIGN_OR_RAISE(
720      cp::ExecNode * source,
721      cp::MakeExecNode("source", plan.get(), {},
722                       cp::SourceNodeOptions{input.schema, input.gen()}));
723
724  cp::SelectKOptions options = cp::SelectKOptions::TopKDefault(/*k=*/2, {"i32"});
725
726  ARROW_RETURN_NOT_OK(cp::MakeExecNode("select_k_sink", plan.get(), {source},
727                                       cp::SelectKSinkNodeOptions{options, &sink_gen}));
728
729  auto schema = arrow::schema(
730      {arrow::field("i32", arrow::int32()), arrow::field("str", arrow::utf8())});
731
732  return ExecutePlanAndCollectAsTable(exec_context, plan, schema, sink_gen);
733}
734
735// (Doc section: KSelect Example)
736
737// (Doc section: Write Example)
738
739/// \brief An example showing a write node
740/// \param exec_context The execution context to run the plan in
741/// \param file_path The destination to write to
742///
743/// Scan-Filter-Write
744/// This example shows how the scan node can be used to load the data
745/// and how, after processing, it can be written to disk.
746arrow::Status ScanFilterWriteExample(cp::ExecContext& exec_context,
747                                     const std::string& file_path) {
748  ARROW_ASSIGN_OR_RAISE(std::shared_ptr<cp::ExecPlan> plan,
749                        cp::ExecPlan::Make(&exec_context));
750
751  ARROW_ASSIGN_OR_RAISE(std::shared_ptr<arrow::dataset::Dataset> dataset, GetDataset());
752
753  auto options = std::make_shared<arrow::dataset::ScanOptions>();
754  // empty projection
755  options->projection = cp::project({}, {});
756
757  cp::ExecNode* scan;
758
759  auto scan_node_options = arrow::dataset::ScanNodeOptions{dataset, options};
760
761  ARROW_ASSIGN_OR_RAISE(scan,
762                        cp::MakeExecNode("scan", plan.get(), {}, scan_node_options));
763
764  arrow::AsyncGenerator<std::optional<cp::ExecBatch>> sink_gen;
765
766  std::string root_path = "";
767  std::string uri = "file://" + file_path;
768  ARROW_ASSIGN_OR_RAISE(std::shared_ptr<arrow::fs::FileSystem> filesystem,
769                        arrow::fs::FileSystemFromUri(uri, &root_path));
770
771  auto base_path = root_path + "/parquet_dataset";
772  // Uncomment the following line, if run repeatedly
773  // ARROW_RETURN_NOT_OK(filesystem->DeleteDirContents(base_path));
774  ARROW_RETURN_NOT_OK(filesystem->CreateDir(base_path));
775
776  // The partition schema determines which fields are part of the partitioning.
777  auto partition_schema = arrow::schema({arrow::field("a", arrow::int32())});
778  // We'll use Hive-style partitioning,
779  // which creates directories with "key=value" pairs.
780
781  auto partitioning =
782      std::make_shared<arrow::dataset::HivePartitioning>(partition_schema);
783  // We'll write Parquet files.
784  auto format = std::make_shared<arrow::dataset::ParquetFileFormat>();
785
786  arrow::dataset::FileSystemDatasetWriteOptions write_options;
787  write_options.file_write_options = format->DefaultWriteOptions();
788  write_options.filesystem = filesystem;
789  write_options.base_dir = base_path;
790  write_options.partitioning = partitioning;
791  write_options.basename_template = "part{i}.parquet";
792
793  arrow::dataset::WriteNodeOptions write_node_options{write_options};
794
795  ARROW_RETURN_NOT_OK(cp::MakeExecNode("write", plan.get(), {scan}, write_node_options));
796
797  ARROW_RETURN_NOT_OK(plan->Validate());
798  std::cout << "Execution Plan Created : " << plan->ToString() << std::endl;
799  // start the ExecPlan
800  ARROW_RETURN_NOT_OK(plan->StartProducing());
801  auto future = plan->finished();
802  ARROW_RETURN_NOT_OK(future.status());
803  future.Wait();
804  return arrow::Status::OK();
805}
806
807// (Doc section: Write Example)
808
809// (Doc section: Union Example)
810
811/// \brief An example showing a union node
812/// \param exec_context The execution context to run the plan in
813///
814/// Source-Union-Sink
815/// This example shows how a union operation can be applied on two
816/// data sources. The output is obtained as a table via the sink
817/// node.
818arrow::Status SourceUnionSinkExample(cp::ExecContext& exec_context) {
819  ARROW_ASSIGN_OR_RAISE(auto basic_data, MakeBasicBatches());
820
821  ARROW_ASSIGN_OR_RAISE(std::shared_ptr<cp::ExecPlan> plan,
822                        cp::ExecPlan::Make(&exec_context));
823  arrow::AsyncGenerator<std::optional<cp::ExecBatch>> sink_gen;
824
825  cp::Declaration union_node{"union", cp::ExecNodeOptions{}};
826  cp::Declaration lhs{"source",
827                      cp::SourceNodeOptions{basic_data.schema, basic_data.gen()}};
828  lhs.label = "lhs";
829  cp::Declaration rhs{"source",
830                      cp::SourceNodeOptions{basic_data.schema, basic_data.gen()}};
831  rhs.label = "rhs";
832  union_node.inputs.emplace_back(lhs);
833  union_node.inputs.emplace_back(rhs);
834
835  cp::CountOptions options(cp::CountOptions::ONLY_VALID);
836  ARROW_ASSIGN_OR_RAISE(
837      auto declr, cp::Declaration::Sequence({
838                                                union_node,
839                                                {"sink", cp::SinkNodeOptions{&sink_gen}},
840                                            })
841                      .AddToPlan(plan.get()));
842
843  ARROW_RETURN_NOT_OK(declr->Validate());
844
845  ARROW_RETURN_NOT_OK(plan->Validate());
846  return ExecutePlanAndCollectAsTable(exec_context, plan, basic_data.schema, sink_gen);
847}
848
849// (Doc section: Union Example)
850
851// (Doc section: Table Sink Example)
852
853/// \brief An example showing a table sink node
854/// \param exec_context The execution context to run the plan in
855///
856/// TableSink Example
857/// This example shows how a table_sink can be used
858/// in an execution plan. This includes a source node
859/// receiving data as batches and the table sink node
860/// which emits the output as a table.
861arrow::Status TableSinkExample(cp::ExecContext& exec_context) {
862  ARROW_ASSIGN_OR_RAISE(std::shared_ptr<cp::ExecPlan> plan,
863                        cp::ExecPlan::Make(&exec_context));
864
865  ARROW_ASSIGN_OR_RAISE(auto basic_data, MakeBasicBatches());
866
867  auto source_node_options = cp::SourceNodeOptions{basic_data.schema, basic_data.gen()};
868
869  ARROW_ASSIGN_OR_RAISE(cp::ExecNode * source,
870                        cp::MakeExecNode("source", plan.get(), {}, source_node_options));
871
872  std::shared_ptr<arrow::Table> output_table;
873  auto table_sink_options = cp::TableSinkNodeOptions{&output_table};
874
875  ARROW_RETURN_NOT_OK(
876      cp::MakeExecNode("table_sink", plan.get(), {source}, table_sink_options));
877  // validate the ExecPlan
878  ARROW_RETURN_NOT_OK(plan->Validate());
879  std::cout << "ExecPlan created : " << plan->ToString() << std::endl;
880  // start the ExecPlan
881  ARROW_RETURN_NOT_OK(plan->StartProducing());
882
883  // Wait for the plan to finish
884  auto finished = plan->finished();
885  ARROW_RETURN_NOT_OK(finished.status());
886  std::cout << "Results : " << output_table->ToString() << std::endl;
887  return arrow::Status::OK();
888}
889// (Doc section: Table Sink Example)
890
891enum ExampleMode {
892  SOURCE_SINK = 0,
893  TABLE_SOURCE_SINK = 1,
894  SCAN = 2,
895  FILTER = 3,
896  PROJECT = 4,
897  SCALAR_AGGREGATION = 5,
898  GROUP_AGGREGATION = 6,
899  CONSUMING_SINK = 7,
900  ORDER_BY_SINK = 8,
901  HASHJOIN = 9,
902  KSELECT = 10,
903  WRITE = 11,
904  UNION = 12,
905  TABLE_SOURCE_TABLE_SINK = 13
906};
907
908int main(int argc, char** argv) {
909  if (argc < 3) {
910    // Fake success for CI purposes.
911    return EXIT_SUCCESS;
912  }
913
914  std::string base_save_path = argv[1];
915  int mode = std::atoi(argv[2]);
916  arrow::Status status;
917  // ensure arrow::dataset node factories are in the registry
918  arrow::dataset::internal::Initialize();
919  // execution context
920  cp::ExecContext exec_context;
921  switch (mode) {
922    case SOURCE_SINK:
923      PrintBlock("Source Sink Example");
924      status = SourceSinkExample(exec_context);
925      break;
926    case TABLE_SOURCE_SINK:
927      PrintBlock("Table Source Sink Example");
928      status = TableSourceSinkExample(exec_context);
929      break;
930    case SCAN:
931      PrintBlock("Scan Example");
932      status = ScanSinkExample(exec_context);
933      break;
934    case FILTER:
935      PrintBlock("Filter Example");
936      status = ScanFilterSinkExample(exec_context);
937      break;
938    case PROJECT:
939      PrintBlock("Project Example");
940      status = ScanProjectSinkExample(exec_context);
941      break;
942    case GROUP_AGGREGATION:
943      PrintBlock("Aggregate Example");
944      status = SourceGroupAggregateSinkExample(exec_context);
945      break;
946    case SCALAR_AGGREGATION:
947      PrintBlock("Aggregate Example");
948      status = SourceScalarAggregateSinkExample(exec_context);
949      break;
950    case CONSUMING_SINK:
951      PrintBlock("Consuming-Sink Example");
952      status = SourceConsumingSinkExample(exec_context);
953      break;
954    case ORDER_BY_SINK:
955      PrintBlock("OrderBy Example");
956      status = SourceOrderBySinkExample(exec_context);
957      break;
958    case HASHJOIN:
959      PrintBlock("HashJoin Example");
960      status = SourceHashJoinSinkExample(exec_context);
961      break;
962    case KSELECT:
963      PrintBlock("KSelect Example");
964      status = SourceKSelectExample(exec_context);
965      break;
966    case WRITE:
967      PrintBlock("Write Example");
968      status = ScanFilterWriteExample(exec_context, base_save_path);
969      break;
970    case UNION:
971      PrintBlock("Union Example");
972      status = SourceUnionSinkExample(exec_context);
973      break;
974    case TABLE_SOURCE_TABLE_SINK:
975      PrintBlock("TableSink Example");
976      status = TableSinkExample(exec_context);
977      break;
978    default:
979      break;
980  }
981
982  if (status.ok()) {
983    return EXIT_SUCCESS;
984  } else {
985    std::cout << "Error occurred: " << status.message() << std::endl;
986    return EXIT_FAILURE;
987  }
988}