Streaming execution engine

Warning

The streaming execution engine is experimental, and a stable API is not yet guaranteed.

Motivation

For many complex computations, successive direct invocation of compute functions is not feasible in either memory or computation time, because doing so causes all intermediate data to be fully materialized. To facilitate arbitrarily large inputs and more efficient resource usage, Arrow also provides a streaming query engine with which computations can be formulated and executed.

An example graph of a streaming execution workflow.

ExecNode is provided to reify the graph of operations in a query. Batches of data (ExecBatch) flow along edges of the graph from node to node. Structuring the API around streams of batches allows the working set for each node to be tuned for optimal performance independent of any other nodes in the graph. Each ExecNode processes batches as they are pushed to it along an edge of the graph by upstream nodes (its inputs), and pushes batches along an edge of the graph to downstream nodes (its outputs) as they are finalized.
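To make the push model concrete, here is a minimal sketch in plain standard C++ (not the Arrow API). Each hypothetical `Node` receives a batch from an upstream node, processes it, and immediately pushes the result to its downstream nodes, so each node only needs to hold the batch it is currently working on. `Node`, `Batch`, `MapNode`, `CollectNode`, and `RunDoublingPipeline` are illustrative names invented for this example.

```cpp
#include <cassert>
#include <cstdint>
#include <functional>
#include <utility>
#include <vector>

// Hypothetical stand-in for a batch of data: a single int64 column.
using Batch = std::vector<int64_t>;

// Hypothetical node interface: upstream nodes push batches into it.
class Node {
 public:
  virtual ~Node() = default;
  // Called by an input node to push one batch here for processing.
  virtual void InputReceived(Node* input, Batch batch) = 0;
  void AddOutput(Node* output) { outputs_.push_back(output); }

 protected:
  // Forward a finished batch to every downstream node.
  void Emit(const Batch& batch) {
    for (Node* out : outputs_) out->InputReceived(this, batch);
  }
  std::vector<Node*> outputs_;
};

// Applies a function to each value, then pushes the result downstream.
class MapNode : public Node {
 public:
  explicit MapNode(std::function<int64_t(int64_t)> fn) : fn_(std::move(fn)) {}
  void InputReceived(Node*, Batch batch) override {
    for (auto& v : batch) v = fn_(v);
    Emit(batch);
  }

 private:
  std::function<int64_t(int64_t)> fn_;
};

// Terminal node that accumulates everything it receives.
class CollectNode : public Node {
 public:
  void InputReceived(Node*, Batch batch) override {
    collected.push_back(std::move(batch));
  }
  std::vector<Batch> collected;
};

// Drive a two-node graph by pushing batches into the first node.
std::vector<Batch> RunDoublingPipeline(const std::vector<Batch>& input) {
  MapNode doubler([](int64_t v) { return v * 2; });
  CollectNode sink;
  doubler.AddOutput(&sink);
  for (const Batch& b : input) doubler.InputReceived(nullptr, b);
  return sink.collected;
}
```

Because batches are pushed one at a time, the doubling node never sees more than one batch at once, regardless of how many batches the input contains.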

Overview

ExecNode

Each node in the graph is an implementation of the ExecNode interface.

ExecPlan

A set of ExecNodes is contained and (to an extent) coordinated by an ExecPlan.

ExecFactoryRegistry

Instances of ExecNode are constructed by factory functions held in an ExecFactoryRegistry.

ExecNodeOptions

Heterogeneous parameters for factories of ExecNode are bundled in an ExecNodeOptions.

Declaration

dplyr-inspired helper for efficient construction of an ExecPlan.

ExecBatch

A lightweight container for a single chunk of data in the Arrow format. In contrast to RecordBatch, ExecBatch is intended for use exclusively in a streaming execution context (for example, it doesn’t have a corresponding Python binding). Furthermore, columns that happen to have a constant value may be represented by a Scalar instead of an Array. In addition, ExecBatch may carry execution-relevant properties, including a guaranteed-true filter for Expression simplification.
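The idea that a constant column can be stored as a single Scalar rather than a full Array can be sketched with `std::variant`. This is a hypothetical illustration in plain C++, not Arrow's ExecBatch; `Column`, `MiniExecBatch`, and `ValueAt` are names invented here. A scalar column occupies constant space no matter how many rows the batch has, which is why the batch must carry its row count explicitly.

```cpp
#include <cassert>
#include <cstddef>
#include <cstdint>
#include <variant>
#include <vector>

// A column is either a full array of values or one value repeated for every row.
using ArrayColumn = std::vector<int64_t>;
struct ScalarColumn {
  int64_t value;
};
using Column = std::variant<ArrayColumn, ScalarColumn>;

// Hypothetical, simplified analogue of an ExecBatch: columns plus an explicit
// row count (needed because a scalar column does not carry a length).
struct MiniExecBatch {
  std::vector<Column> columns;
  int64_t length;
};

// Read the value of column `col` at row `row`, whichever representation is used.
int64_t ValueAt(const MiniExecBatch& batch, std::size_t col, int64_t row) {
  assert(row >= 0 && row < batch.length);
  const Column& c = batch.columns[col];
  if (const auto* arr = std::get_if<ArrayColumn>(&c)) {
    return (*arr)[static_cast<std::size_t>(row)];
  }
  return std::get<ScalarColumn>(c).value;  // same value for every row
}
```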

An example ExecNode implementation which simply passes all input batches through unchanged:

class PassthruNode : public ExecNode {
 public:
  // InputReceived is the main entry point for ExecNodes. It is invoked
  // by an input of this node to push a batch here for processing.
  void InputReceived(ExecNode* input, ExecBatch batch) override {
    // Since this is a passthru node we simply push the batch to our
    // only output here.
    outputs_[0]->InputReceived(this, std::move(batch));
  }

  // ErrorReceived is called by an input of this node to report an error.
  // ExecNodes should always forward errors to their outputs unless they
  // are able to fully handle the error (this is rare).
  void ErrorReceived(ExecNode* input, Status error) override {
    outputs_[0]->ErrorReceived(this, error);
  }

  // InputFinished is used to signal how many batches will ultimately arrive.
  // It may be called with any ordering relative to InputReceived/ErrorReceived.
  void InputFinished(ExecNode* input, int total_batches) override {
    outputs_[0]->InputFinished(this, total_batches);
  }

  // ExecNodes may request that their inputs throttle production of batches
  // until they are ready for more, or stop production if no further batches
  // are required.  These signals should typically be forwarded to the inputs
  // of the ExecNode.
  void ResumeProducing(ExecNode* output) override { inputs_[0]->ResumeProducing(this); }
  void PauseProducing(ExecNode* output) override { inputs_[0]->PauseProducing(this); }
  void StopProducing(ExecNode* output) override { inputs_[0]->StopProducing(this); }

  // An ExecNode has a single output schema to which all its batches conform.
  using ExecNode::output_schema;

  // ExecNodes carry basic introspection for debugging purposes
  const char* kind_name() const override { return "PassthruNode"; }
  using ExecNode::label;
  using ExecNode::SetLabel;
  using ExecNode::ToString;

  // An ExecNode holds references to its inputs and outputs, so it is possible
  // to walk the graph of execution if necessary.
  using ExecNode::inputs;
  using ExecNode::outputs;

  // StartProducing() and StopProducing() are invoked by an ExecPlan to
  // coordinate the graph-wide execution state.  These do not need to be
  // forwarded to inputs or outputs.
  Status StartProducing() override { return Status::OK(); }
  void StopProducing() override {}
  Future<> finished() override { return inputs_[0]->finished(); }
};

Note that each method associated with an edge of the graph must be invoked with an ExecNode* that identifies the node which invoked it. For example, in an ExecNode which implements JOIN, this tagging might be used to differentiate between batches from the left and right inputs. InputReceived, ErrorReceived, and InputFinished may only be invoked by the inputs of a node, while ResumeProducing, PauseProducing, and StopProducing may only be invoked by the outputs of a node.
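As a sketch of how the `ExecNode*` argument can be used to tell inputs apart, the hypothetical two-input node below (plain C++, not Arrow's hash join) compares the caller's pointer against its registered left and right inputs and buffers each side separately. `Producer`, `TwoInputNode`, and their members are invented for illustration.

```cpp
#include <cassert>
#include <cstdint>
#include <stdexcept>
#include <utility>
#include <vector>

using Batch = std::vector<int64_t>;

// Minimal hypothetical node identity; only its address matters here.
struct Producer {};

// A node with two inputs that routes incoming batches by the sender's identity,
// as a join would need to distinguish its left and right sides.
class TwoInputNode {
 public:
  TwoInputNode(const Producer* left, const Producer* right)
      : left_(left), right_(right) {}

  void InputReceived(const Producer* input, Batch batch) {
    if (input == left_) {
      left_batches.push_back(std::move(batch));
    } else if (input == right_) {
      right_batches.push_back(std::move(batch));
    } else {
      // Only registered inputs may push batches to this node.
      throw std::invalid_argument("batch received from a non-input node");
    }
  }

  std::vector<Batch> left_batches;
  std::vector<Batch> right_batches;

 private:
  const Producer* left_;
  const Producer* right_;
};
```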

An ExecPlan contains the associated ExecNode instances and is used to start and stop execution of all nodes and to query or await their completion:

// construct an ExecPlan first to hold your nodes
ARROW_ASSIGN_OR_RAISE(auto plan, ExecPlan::Make(default_exec_context()));

// ... add nodes to your ExecPlan

// start all nodes in the graph
ARROW_RETURN_NOT_OK(plan->StartProducing());

SetUserCancellationCallback([plan] {
  // stop all nodes in the graph
  plan->StopProducing();
});

// Complete will be marked finished when all nodes have run to completion
// or acknowledged a StopProducing() signal. The ExecPlan should be kept
// alive until this future is marked finished.
Future<> complete = plan->finished();
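The completion contract in the comments above can be modeled with standard C++ promises. In this hypothetical sketch (`MiniPlan` is an invented name, not Arrow's ExecPlan), the shared future becomes ready either when every node has reported completion or when StopProducing() is called, whichever comes first. A caller can wait on it before destroying the plan.

```cpp
#include <cassert>
#include <chrono>
#include <future>
#include <mutex>

// Hypothetical plan that tracks how many nodes are still running.
class MiniPlan {
 public:
  explicit MiniPlan(int num_nodes)
      : remaining_(num_nodes), finished_(promise_.get_future().share()) {}

  // Called by a node once it has run to completion.
  void NodeFinished() {
    std::lock_guard<std::mutex> lock(mu_);
    if (--remaining_ == 0) MarkFinished();
  }

  // Request early termination; the plan counts as finished afterwards.
  void StopProducing() {
    std::lock_guard<std::mutex> lock(mu_);
    MarkFinished();
  }

  std::shared_future<void> finished() const { return finished_; }

 private:
  // Must be called with mu_ held; fulfils the promise at most once.
  void MarkFinished() {
    if (!done_) {
      done_ = true;
      promise_.set_value();
    }
  }

  std::mutex mu_;
  int remaining_;
  bool done_ = false;
  std::promise<void> promise_;
  std::shared_future<void> finished_;
};
```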

Constructing ExecPlan objects

Warning

The following will be superseded by construction from Compute IR, see ARROW-14074.

None of the concrete implementations of ExecNode are exposed in headers, so they can’t be constructed directly outside the translation unit where they are defined. Instead, factories to create them are provided in an extensible registry. This structure provides a number of benefits:

  • This enforces consistent construction.

  • It decouples implementations from consumers of the interface. For example, there are two classes for scalar and grouped aggregation; a single factory can choose which one to construct by checking whether grouping keys are provided.

  • This expedites integration with out-of-library extensions. For example, “scan” nodes are implemented in the separate libarrow_dataset.so library.

  • Since the class is not referenceable outside the translation unit in which it is defined, compilers can optimize more aggressively.

Factories of ExecNode can be retrieved by name from the registry. The default registry is available through arrow::compute::default_exec_factory_registry() and can be queried for the built-in factories:

// get the factory for "filter" nodes:
ARROW_ASSIGN_OR_RAISE(auto make_filter,
                      default_exec_factory_registry()->GetFactory("filter"));

// factories take three arguments:
ARROW_ASSIGN_OR_RAISE(ExecNode* filter_node, make_filter(
    // the ExecPlan which should own this node
    plan.get(),

    // nodes which will send batches to this node (inputs)
    {scan_node},

    // parameters unique to "filter" nodes
    FilterNodeOptions{filter_expression}));

// alternative shorthand:
ARROW_ASSIGN_OR_RAISE(filter_node, MakeExecNode("filter",
    plan.get(), {scan_node}, FilterNodeOptions{filter_expression}));

Factories can also be added to the default registry as long as they are convertible to std::function<Result<ExecNode*>( ExecPlan*, std::vector<ExecNode*>, const ExecNodeOptions&)>.
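The registry pattern can be sketched in a self-contained way. The following is a hypothetical analogue in standard C++, not Arrow's ExecFactoryRegistry. Factories are stored as `std::function` objects keyed by name, and looking up an unknown name reports an error. A single `"aggregate"` factory picks between two concrete classes by checking whether grouping keys were supplied, matching the decoupling benefit listed above. `MiniNode`, `MiniOptions`, `MiniRegistry`, and `MakeAggregate` are invented names.

```cpp
#include <cassert>
#include <functional>
#include <map>
#include <memory>
#include <stdexcept>
#include <string>
#include <utility>
#include <vector>

// Hypothetical options bag; real factories take a polymorphic options type.
struct MiniOptions {
  std::vector<std::string> keys;  // grouping keys, empty for scalar aggregation
};

struct MiniNode {
  virtual ~MiniNode() = default;
  virtual std::string kind() const = 0;
};

// Concrete classes stay private to this translation unit; callers only see MiniNode.
namespace {
struct ScalarAggregateNode : MiniNode {
  std::string kind() const override { return "ScalarAggregateNode"; }
};
struct GroupByNode : MiniNode {
  std::string kind() const override { return "GroupByNode"; }
};
}  // namespace

using Factory = std::function<std::unique_ptr<MiniNode>(const MiniOptions&)>;

class MiniRegistry {
 public:
  void AddFactory(const std::string& name, Factory factory) {
    if (!factories_.emplace(name, std::move(factory)).second) {
      throw std::invalid_argument("factory already registered: " + name);
    }
  }
  const Factory& GetFactory(const std::string& name) const {
    auto it = factories_.find(name);
    if (it == factories_.end()) {
      throw std::out_of_range("no factory named: " + name);
    }
    return it->second;
  }

 private:
  std::map<std::string, Factory> factories_;
};

// One factory, two implementations: the choice depends on the options.
std::unique_ptr<MiniNode> MakeAggregate(const MiniOptions& options) {
  if (options.keys.empty()) return std::make_unique<ScalarAggregateNode>();
  return std::make_unique<GroupByNode>();
}
```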

To build an ExecPlan representing a simple pipeline which reads from a RecordBatchReader then filters, projects, and writes to disk:

std::shared_ptr<RecordBatchReader> reader = GetStreamOfBatches();
ExecNode* source_node = *MakeExecNode("source", plan.get(), {},
                                      SourceNodeOptions::FromReader(
                                          reader,
                                          GetCpuThreadPool()));

ExecNode* filter_node = *MakeExecNode("filter", plan.get(), {source_node},
                                      FilterNodeOptions{
                                        greater(field_ref("score"), literal(3))
                                      });

ExecNode* project_node = *MakeExecNode("project", plan.get(), {filter_node},
                                       ProjectNodeOptions{
                                         {add(field_ref("score"), literal(1))},
                                         {"score + 1"}
                                       });

arrow::dataset::internal::Initialize();
MakeExecNode("write", plan.get(), {project_node},
             WriteNodeOptions{/*base_dir=*/"/dat", /*...*/});

Declaration is a dplyr-inspired helper which further decreases the boilerplate associated with populating an ExecPlan from C++:

arrow::dataset::internal::Initialize();

std::shared_ptr<RecordBatchReader> reader = GetStreamOfBatches();
ASSERT_OK(Declaration::Sequence(
              {
                  {"source", SourceNodeOptions::FromReader(
                       reader,
                       GetCpuThreadPool())},
                  {"filter", FilterNodeOptions{
                       greater(field_ref("score"), literal(3))}},
                  {"project", ProjectNodeOptions{
                       {add(field_ref("score"), literal(1))},
                       {"score + 1"}}},
                  {"write", WriteNodeOptions{/*base_dir=*/"/dat", /*...*/}},
              })
              .AddToPlan(plan.get()));

Note that a source node can wrap anything which resembles a stream of batches. For example, PR#11032 adds support for use of a DuckDB query as a source node. Similarly, a sink node can wrap anything which absorbs a stream of batches. In the example above we’re writing completed batches to disk. However, we can also collect these in memory into a Table or forward them to a RecordBatchReader as an out-of-graph stream. This flexibility allows an ExecPlan to be used as streaming middleware between any endpoints which support Arrow formatted batches.

An arrow::dataset::Dataset can also be wrapped as a source node which pushes all the dataset’s batches into an ExecPlan. This factory is added to the default registry with the name "scan" by calling arrow::dataset::internal::Initialize():

arrow::dataset::internal::Initialize();

std::shared_ptr<Dataset> dataset = GetDataset();

ASSERT_OK(Declaration::Sequence(
              {
                  {"scan", ScanNodeOptions{dataset,
                     /* push down predicate, projection, ... */}},
                  {"filter", FilterNodeOptions{/* ... */}},
                  // ...
              })
              .AddToPlan(plan.get()));

Datasets may be scanned multiple times; just make multiple scan nodes from that dataset. (Useful for a self-join, for example.) Note that producing two scan nodes like this will perform all reads and decodes twice.

Constructing ExecNode using Options

ExecNode is the building block of an execution plan, and Arrow provides built-in ExecNode implementations for a variety of operations.

The built-in operations, along with the options class used to configure each one, are:

Operations and Options

Operation        Options
---------------  ----------------------------------------
source           arrow::compute::SourceNodeOptions
table_source     arrow::compute::TableSourceNodeOptions
filter           arrow::compute::FilterNodeOptions
project          arrow::compute::ProjectNodeOptions
aggregate        arrow::compute::AggregateNodeOptions
sink             arrow::compute::SinkNodeOptions
consuming_sink   arrow::compute::ConsumingSinkNodeOptions
order_by_sink    arrow::compute::OrderBySinkNodeOptions
select_k_sink    arrow::compute::SelectKSinkNodeOptions
scan             arrow::dataset::ScanNodeOptions
hash_join        arrow::compute::HashJoinNodeOptions
write            arrow::dataset::WriteNodeOptions
union            N/A
table_sink       arrow::compute::TableSinkNodeOptions

source

A source operation can be considered an entry point for creating a streaming execution plan. arrow::compute::SourceNodeOptions is used to create the source operation. The source operation is the most generic and flexible type of source currently available, but it can be quite tricky to configure. To process data from files, the scan operation is likely a simpler choice.

The source node requires some kind of function that can be called to poll for more data. This function should take no arguments and should return an arrow::Future<arrow::util::optional<arrow::compute::ExecBatch>>. This function might be reading a file, iterating through an in-memory structure, or receiving data from a network connection. The Arrow library refers to these functions as arrow::AsyncGenerator, and there are a number of utilities for working with them. For this example we use a vector of batches that we’ve already stored in memory. In addition, the schema of the data must be known up front: Arrow’s streaming execution engine must know the schema of the data at each stage of the execution graph before any processing has begun. This means we must supply the schema for a source node separately from the data itself.
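The contract of such a generator can be sketched in standard C++ with `std::future` and `std::optional`. The caller invokes it repeatedly, and each call returns a future that resolves either to the next batch or to an empty optional marking the end of the stream. This is a hypothetical analogue of what `arrow::MakeVectorGenerator` provides, not Arrow code. `Batch`, `Generator`, `MakeVectorGen`, and `DrainGenerator` are invented names, and the futures here are already completed when returned, as an in-memory source would allow.

```cpp
#include <cassert>
#include <cstddef>
#include <cstdint>
#include <functional>
#include <future>
#include <memory>
#include <optional>
#include <utility>
#include <vector>

using Batch = std::vector<int64_t>;
// Each call yields a future for the next batch; an empty optional ends the stream.
using Generator = std::function<std::future<std::optional<Batch>>()>;

// Wrap an in-memory vector of batches as a generator.
Generator MakeVectorGen(std::vector<Batch> batches) {
  auto state = std::make_shared<std::vector<Batch>>(std::move(batches));
  auto index = std::make_shared<std::size_t>(0);
  return [state, index]() {
    std::promise<std::optional<Batch>> promise;
    if (*index < state->size()) {
      promise.set_value(std::optional<Batch>((*state)[(*index)++]));
    } else {
      promise.set_value(std::nullopt);  // end of stream
    }
    return promise.get_future();
  };
}

// Consumer side: keep calling the generator until it signals the end.
std::vector<Batch> DrainGenerator(const Generator& gen) {
  std::vector<Batch> out;
  while (true) {
    std::optional<Batch> next = gen().get();
    if (!next) break;
    out.push_back(std::move(*next));
  }
  return out;
}
```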

Here we define a struct to hold the data generator definition. This includes the in-memory batches, the schema, and a function that serves as a data generator:

struct BatchesWithSchema {
  std::vector<cp::ExecBatch> batches;
  std::shared_ptr<arrow::Schema> schema;
  // This method uses internal arrow utilities to
  // convert a vector of ExecBatches to an AsyncGenerator of optional batches
  arrow::AsyncGenerator<arrow::util::optional<cp::ExecBatch>> gen() const {
    auto opt_batches = ::arrow::internal::MapVector(
        [](cp::ExecBatch batch) { return arrow::util::make_optional(std::move(batch)); },
        batches);
    arrow::AsyncGenerator<arrow::util::optional<cp::ExecBatch>> gen;
    gen = arrow::MakeVectorGenerator(std::move(opt_batches));
    return gen;
  }
};

Generating sample batches for computation:

arrow::Result<BatchesWithSchema> MakeBasicBatches() {
  BatchesWithSchema out;
  auto field_vector = {arrow::field("a", arrow::int32()),
                       arrow::field("b", arrow::boolean())};
  ARROW_ASSIGN_OR_RAISE(auto b1_int, GetArrayDataSample<arrow::Int32Type>({0, 4}));
  ARROW_ASSIGN_OR_RAISE(auto b2_int, GetArrayDataSample<arrow::Int32Type>({5, 6, 7}));
  ARROW_ASSIGN_OR_RAISE(auto b3_int, GetArrayDataSample<arrow::Int32Type>({8, 9, 10}));

  ARROW_ASSIGN_OR_RAISE(auto b1_bool,
                        GetArrayDataSample<arrow::BooleanType>({false, true}));
  ARROW_ASSIGN_OR_RAISE(auto b2_bool,
                        GetArrayDataSample<arrow::BooleanType>({true, false, true}));
  ARROW_ASSIGN_OR_RAISE(auto b3_bool,
                        GetArrayDataSample<arrow::BooleanType>({false, true, false}));

  ARROW_ASSIGN_OR_RAISE(auto b1,
                        GetExecBatchFromVectors(field_vector, {b1_int, b1_bool}));
  ARROW_ASSIGN_OR_RAISE(auto b2,
                        GetExecBatchFromVectors(field_vector, {b2_int, b2_bool}));
  ARROW_ASSIGN_OR_RAISE(auto b3,
                        GetExecBatchFromVectors(field_vector, {b3_int, b3_bool}));

  out.batches = {b1, b2, b3};
  out.schema = arrow::schema(field_vector);
  return out;
}

Example of using source (usage of sink is explained in detail in sink):

/// \brief An example demonstrating a source and sink node
/// \param exec_context The execution context to run the plan in
///
/// Source-Sink Example
/// This example shows how a source and sink can be used
/// in an execution plan. The source node receives the data
/// and the sink node emits it as output, which is collected
/// into a table.
arrow::Status SourceSinkExample(cp::ExecContext& exec_context) {
  ARROW_ASSIGN_OR_RAISE(std::shared_ptr<cp::ExecPlan> plan,
                        cp::ExecPlan::Make(&exec_context));

  ARROW_ASSIGN_OR_RAISE(auto basic_data, MakeBasicBatches());

  arrow::AsyncGenerator<arrow::util::optional<cp::ExecBatch>> sink_gen;

  auto source_node_options = cp::SourceNodeOptions{basic_data.schema, basic_data.gen()};

  ARROW_ASSIGN_OR_RAISE(cp::ExecNode * source,
                        cp::MakeExecNode("source", plan.get(), {}, source_node_options));

  ARROW_RETURN_NOT_OK(
      cp::MakeExecNode("sink", plan.get(), {source}, cp::SinkNodeOptions{&sink_gen}));

  return ExecutePlanAndCollectAsTable(exec_context, plan, basic_data.schema, sink_gen);
}

table_source

In the previous example, a source node was used to input the data. But when developing an application, if the data is already in memory as a table, it is much easier and more performant to use arrow::compute::TableSourceNodeOptions. Here the input data can be passed as a std::shared_ptr<arrow::Table> along with a max_batch_size. The max_batch_size is used to break up large record batches so that they can be processed in parallel. Note that the table's batches will not be merged to form larger batches when the source table has a smaller batch size.
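The splitting rule can be illustrated with a short sketch that works only on batch lengths. The hypothetical `SplitBatchLengths` function (plain C++, not Arrow's implementation) slices every batch longer than `max_batch_size` into chunks of at most that size. It never merges short batches together.

```cpp
#include <cassert>
#include <cstdint>
#include <stdexcept>
#include <vector>

// Given the row counts of a table's existing batches, return the row counts of
// the batches emitted under the "split large, never merge small" rule.
std::vector<int64_t> SplitBatchLengths(const std::vector<int64_t>& lengths,
                                       int64_t max_batch_size) {
  if (max_batch_size <= 0) throw std::invalid_argument("max_batch_size must be > 0");
  std::vector<int64_t> out;
  for (int64_t remaining : lengths) {
    // Large batches are cut into slices of at most max_batch_size rows...
    while (remaining > max_batch_size) {
      out.push_back(max_batch_size);
      remaining -= max_batch_size;
    }
    // ...and whatever is left (possibly a short batch) is emitted as-is.
    if (remaining > 0) out.push_back(remaining);
  }
  return out;
}
```

For example, with `max_batch_size = 2`, input batches of 5 and 1 rows become batches of 2, 2, 1, and 1 rows. The trailing 1-row pieces are not combined.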

Example of using table_source

/// \brief An example showing a table source node
/// \param exec_context The execution context to run the plan in
///
/// TableSource-Sink Example
/// This example shows how a table_source and sink can be used
/// in an execution plan. The table source node receives data
/// from a table and the sink node emits the data to a generator,
/// which we collect into a table.
arrow::Status TableSourceSinkExample(cp::ExecContext& exec_context) {
  ARROW_ASSIGN_OR_RAISE(std::shared_ptr<cp::ExecPlan> plan,
                        cp::ExecPlan::Make(&exec_context));

  ARROW_ASSIGN_OR_RAISE(auto table, GetTable());

  arrow::AsyncGenerator<arrow::util::optional<cp::ExecBatch>> sink_gen;
  int max_batch_size = 2;
  auto table_source_options = cp::TableSourceNodeOptions{table, max_batch_size};

  ARROW_ASSIGN_OR_RAISE(
      cp::ExecNode * source,
      cp::MakeExecNode("table_source", plan.get(), {}, table_source_options));

  ARROW_RETURN_NOT_OK(
      cp::MakeExecNode("sink", plan.get(), {source}, cp::SinkNodeOptions{&sink_gen}));

  return ExecutePlanAndCollectAsTable(exec_context, plan, table->schema(), sink_gen);
}

filter

The filter operation, as the name suggests, provides an option to define data filtering criteria. It selects rows matching a given expression. Filters can be written using arrow::compute::Expression. For example, to keep rows where the value of column a is greater than 3, we can use the expression greater(field_ref("a"), literal(3)).
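Conceptually, a filter evaluates a boolean predicate for every row of a batch and keeps only the rows where it is true. The sketch below shows that row-selection step on a hypothetical two-column batch in plain C++. It is not how Arrow evaluates an arrow::compute::Expression, and `TwoColumnBatch` and `FilterRows` are invented names. The predicate used in the usage example mirrors greater(field_ref("a"), literal(3)).

```cpp
#include <cassert>
#include <cstddef>
#include <cstdint>
#include <functional>
#include <vector>

// Hypothetical batch with an int32 column "a" and a boolean column "b".
struct TwoColumnBatch {
  std::vector<int32_t> a;
  std::vector<bool> b;
};

// Keep only the rows for which the predicate on column "a" holds;
// every column is filtered with the same row selection.
TwoColumnBatch FilterRows(const TwoColumnBatch& in,
                          const std::function<bool(int32_t)>& predicate) {
  TwoColumnBatch out;
  for (std::size_t i = 0; i < in.a.size(); ++i) {
    if (predicate(in.a[i])) {
      out.a.push_back(in.a[i]);
      out.b.push_back(in.b[i]);
    }
  }
  return out;
}
```

Usage: `FilterRows(batch, [](int32_t a) { return a > 3; })` keeps the rows whose `a` value exceeds 3, along with the matching `b` values.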

Filter example:

/// \brief An example showing a filter node
/// \param exec_context The execution context to run the plan in
///
/// Scan-Filter-Sink
/// This example shows how a filter can be used in an execution plan,
/// along with the scan and sink operations. The output from the
/// execution plan is obtained as a table via the sink node.
arrow::Status ScanFilterSinkExample(cp::ExecContext& exec_context) {
  ARROW_ASSIGN_OR_RAISE(std::shared_ptr<cp::ExecPlan> plan,
                        cp::ExecPlan::Make(&exec_context));

  ARROW_ASSIGN_OR_RAISE(std::shared_ptr<arrow::dataset::Dataset> dataset, GetDataset());

  auto options = std::make_shared<arrow::dataset::ScanOptions>();
  // specify the filter. This filter keeps only the rows where the
  // value of the "a" column is greater than 3.
  cp::Expression filter_opt = cp::greater(cp::field_ref("a"), cp::literal(3));
  // set filter for scanner : on-disk / push-down filtering.
  // This step can be skipped if you are not reading from disk.
  options->filter = filter_opt;
  // empty projection
  options->projection = cp::project({}, {});

  // construct the scan node
  std::cout << "Initialized Scanning Options" << std::endl;

  cp::ExecNode* scan;

  auto scan_node_options = arrow::dataset::ScanNodeOptions{dataset, options};
  std::cout << "Scan node options created" << std::endl;

  ARROW_ASSIGN_OR_RAISE(scan,
                        cp::MakeExecNode("scan", plan.get(), {}, scan_node_options));

  // pipe the scan node into the filter node
  // Need to set the filter in scan node options and filter node options.
  // At scan node it is used for on-disk / push-down filtering.
  // At filter node it is used for in-memory filtering.
  cp::ExecNode* filter;
  ARROW_ASSIGN_OR_RAISE(filter, cp::MakeExecNode("filter", plan.get(), {scan},
                                                 cp::FilterNodeOptions{filter_opt}));

  // finally, pipe the filter node into a sink node
  arrow::AsyncGenerator<arrow::util::optional<cp::ExecBatch>> sink_gen;
  ARROW_RETURN_NOT_OK(
      cp::MakeExecNode("sink", plan.get(), {filter}, cp::SinkNodeOptions{&sink_gen}));

  return ExecutePlanAndCollectAsTable(exec_context, plan, dataset->schema(), sink_gen);
}

project

The project operation rearranges, deletes, transforms, and creates columns. Each output column is computed by evaluating an expression against the source record batch. This is exposed via arrow::compute::ProjectNodeOptions, which requires an arrow::compute::Expression and a name for each of the output columns (if names are not provided, the string representations of the expressions will be used).
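A projection can be pictured as a list of (expression, name) pairs evaluated against every input row. In this hypothetical plain-C++ sketch (not Arrow's Expression machinery), each output column is computed by a callable. When no name is given, the column falls back to a caller-supplied string form of the expression, mimicking the naming rule described above. `Row`, `NamedExpr`, `ProjectedBatch`, and `Project` are invented for illustration.

```cpp
#include <cassert>
#include <cstdint>
#include <functional>
#include <map>
#include <optional>
#include <string>
#include <utility>
#include <vector>

// A row of the input batch, keyed by column name.
using Row = std::map<std::string, int64_t>;

struct NamedExpr {
  std::function<int64_t(const Row&)> eval;  // computes the output value
  std::string repr;                         // string form, e.g. "multiply(a, 2)"
  std::optional<std::string> name;          // explicit output name, if any
};

struct ProjectedBatch {
  std::vector<std::string> names;
  std::vector<std::vector<int64_t>> columns;
};

// Evaluate every expression against every row to build the output columns.
ProjectedBatch Project(const std::vector<Row>& rows, const std::vector<NamedExpr>& exprs) {
  ProjectedBatch out;
  for (const NamedExpr& e : exprs) {
    out.names.push_back(e.name ? *e.name : e.repr);  // default to the expression text
    std::vector<int64_t> column;
    for (const Row& row : rows) column.push_back(e.eval(row));
    out.columns.push_back(std::move(column));
  }
  return out;
}
```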

Project example:

/// \brief An example showing a project node
/// \param exec_context The execution context to run the plan in
///
/// Scan-Project-Sink
/// This example shows how Scan operation can be used to load the data
/// into the execution plan, how project operation can be applied on the
/// data stream and how the output is obtained as a table via the sink node.
arrow::Status ScanProjectSinkExample(cp::ExecContext& exec_context) {
  ARROW_ASSIGN_OR_RAISE(std::shared_ptr<cp::ExecPlan> plan,
                        cp::ExecPlan::Make(&exec_context));

  ARROW_ASSIGN_OR_RAISE(std::shared_ptr<arrow::dataset::Dataset> dataset, GetDataset());

  auto options = std::make_shared<arrow::dataset::ScanOptions>();
  // projection
  cp::Expression a_times_2 = cp::call("multiply", {cp::field_ref("a"), cp::literal(2)});
  options->projection = cp::project({}, {});

  cp::ExecNode* scan;

  auto scan_node_options = arrow::dataset::ScanNodeOptions{dataset, options};

  ARROW_ASSIGN_OR_RAISE(scan,
                        cp::MakeExecNode("scan", plan.get(), {}, scan_node_options));

  cp::ExecNode* project;
  ARROW_ASSIGN_OR_RAISE(project, cp::MakeExecNode("project", plan.get(), {scan},
                                                  cp::ProjectNodeOptions{{a_times_2}}));
  // schema after projection => multiply(a, 2): int64
  std::cout << "Schema after projection : \n"
            << project->output_schema()->ToString() << std::endl;

  arrow::AsyncGenerator<arrow::util::optional<cp::ExecBatch>> sink_gen;
  ARROW_RETURN_NOT_OK(
      cp::MakeExecNode("sink", plan.get(), {project}, cp::SinkNodeOptions{&sink_gen}));
  auto schema = arrow::schema({arrow::field("a * 2", arrow::int32())});

  return ExecutePlanAndCollectAsTable(exec_context, plan, schema, sink_gen);
}

aggregate

The aggregate node computes various types of aggregates over data.

Arrow supports two types of aggregates: “scalar” aggregates, and “hash” aggregates. Scalar aggregates reduce an array or scalar input to a single scalar output (e.g. computing the mean of a column). Hash aggregates act like GROUP BY in SQL and first partition data based on one or more key columns, then reduce the data in each partition. The aggregate node supports both types of computation, and can compute any number of aggregations at once.

arrow::compute::AggregateNodeOptions is used to define the aggregation criteria. It takes a list of aggregation functions and their options; a list of target fields to aggregate, one per function; and a list of names for the output fields, one per function. Optionally, it takes a list of columns that are used to partition the data, in the case of a hash aggregation. The aggregation functions can be selected from this list of aggregation functions.

Note

This node is a “pipeline breaker” and will fully materialize the dataset in memory. In the future, spillover mechanisms will be added which should alleviate this constraint.

The aggregation can provide results as a group or as a scalar. For instance, an operation like hash_count provides a count for each unique key as a grouped result, while an operation like sum provides a single record.
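The difference between the two kinds of aggregation can be shown on the sample data produced by MakeBasicBatches above: column a is 0, 4, 5, 6, 7, 8, 9, 10, and column b is false, true, true, false, true, false, true, false across three batches. In this plain-C++ sketch (not Arrow's kernels; `SampleBatch`, `ScalarSum`, and `HashCount` are hypothetical names), a scalar aggregate folds every batch into one running total. A hash aggregate instead keeps one running state per distinct key. In both cases the result is only known after the last batch has been seen.

```cpp
#include <cassert>
#include <cstddef>
#include <cstdint>
#include <map>
#include <vector>

// Hypothetical batch with column "a" (int32) and column "b" (boolean).
struct SampleBatch {
  std::vector<int32_t> a;
  std::vector<bool> b;
};

// Scalar aggregation: reduce all rows of all batches to one value.
int64_t ScalarSum(const std::vector<SampleBatch>& batches) {
  int64_t total = 0;
  for (const SampleBatch& batch : batches) {
    for (int32_t v : batch.a) total += v;
  }
  return total;
}

// Hash aggregation: partition rows by key "b", then count the values of "a"
// per key (the sample data has no nulls, so every row is counted).
std::map<bool, int64_t> HashCount(const std::vector<SampleBatch>& batches) {
  std::map<bool, int64_t> counts;
  for (const SampleBatch& batch : batches) {
    for (std::size_t i = 0; i < batch.a.size(); ++i) ++counts[batch.b[i]];
  }
  return counts;
}
```

On the sample data, `ScalarSum` returns 49, and `HashCount` returns a count of 4 for both `false` and `true`.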

Scalar Aggregation example:

/// \brief An example showing an aggregation node to aggregate an entire table
/// \param exec_context The execution context to run the plan in
///
/// Source-Aggregation-Sink
/// This example shows how an aggregation operation can be applied to an
/// execution plan, resulting in a scalar output. The source node loads the
/// data and the aggregation (summing column 'a') is applied to this data.
/// The output is obtained from the sink node as a table.
arrow::Status SourceScalarAggregateSinkExample(cp::ExecContext& exec_context) {
  ARROW_ASSIGN_OR_RAISE(std::shared_ptr<cp::ExecPlan> plan,
                        cp::ExecPlan::Make(&exec_context));

  ARROW_ASSIGN_OR_RAISE(auto basic_data, MakeBasicBatches());

  arrow::AsyncGenerator<arrow::util::optional<cp::ExecBatch>> sink_gen;

  auto source_node_options = cp::SourceNodeOptions{basic_data.schema, basic_data.gen()};

  ARROW_ASSIGN_OR_RAISE(cp::ExecNode * source,
                        cp::MakeExecNode("source", plan.get(), {}, source_node_options));
  auto aggregate_options = cp::AggregateNodeOptions{/*aggregates=*/{{"sum", nullptr}},
                                                    /*targets=*/{"a"},
                                                    /*names=*/{"sum(a)"}};
  ARROW_ASSIGN_OR_RAISE(
      cp::ExecNode * aggregate,
      cp::MakeExecNode("aggregate", plan.get(), {source}, aggregate_options));

  ARROW_RETURN_NOT_OK(
      cp::MakeExecNode("sink", plan.get(), {aggregate}, cp::SinkNodeOptions{&sink_gen}));
  auto schema = arrow::schema({arrow::field("sum(a)", arrow::int32())});

  return ExecutePlanAndCollectAsTable(exec_context, plan, schema, sink_gen);
}

Group Aggregation example:

/// \brief An example showing an aggregation node to perform a group-by operation
/// \param exec_context The execution context to run the plan in
///
/// Source-Aggregation-Sink
/// This example shows how an aggregation operation can be applied to an
/// execution plan, resulting in a grouped output. The source node loads the
/// data and the aggregation (counting the valid values of column 'a' for each
/// value of column 'b') is applied to this data. The output is obtained from
/// the sink node as a table.
arrow::Status SourceGroupAggregateSinkExample(cp::ExecContext& exec_context) {
  ARROW_ASSIGN_OR_RAISE(std::shared_ptr<cp::ExecPlan> plan,
                        cp::ExecPlan::Make(&exec_context));

  ARROW_ASSIGN_OR_RAISE(auto basic_data, MakeBasicBatches());

  arrow::AsyncGenerator<arrow::util::optional<cp::ExecBatch>> sink_gen;

  auto source_node_options = cp::SourceNodeOptions{basic_data.schema, basic_data.gen()};

  ARROW_ASSIGN_OR_RAISE(cp::ExecNode * source,
                        cp::MakeExecNode("source", plan.get(), {}, source_node_options));
  cp::CountOptions options(cp::CountOptions::ONLY_VALID);
  auto aggregate_options =
      cp::AggregateNodeOptions{/*aggregates=*/{{"hash_count", &options}},
                               /*targets=*/{"a"},
                               /*names=*/{"count(a)"},
                               /*keys=*/{"b"}};
  ARROW_ASSIGN_OR_RAISE(
      cp::ExecNode * aggregate,
      cp::MakeExecNode("aggregate", plan.get(), {source}, aggregate_options));

  ARROW_RETURN_NOT_OK(
      cp::MakeExecNode("sink", plan.get(), {aggregate}, cp::SinkNodeOptions{&sink_gen}));
  auto schema = arrow::schema({
      arrow::field("count(a)", arrow::int32()),
      arrow::field("b", arrow::boolean()),
  });

  return ExecutePlanAndCollectAsTable(exec_context, plan, schema, sink_gen);
}

sink

The sink operation provides output and is the final node of a streaming execution definition. The arrow::compute::SinkNodeOptions interface is used to pass the required options. Like the source operator, the sink operator exposes its output through a function that returns a future of the next batch each time it is called. The caller is expected to call this function repeatedly until the generator is exhausted (returns an empty optional). If this function is not called often enough, record batches will accumulate in memory. An execution plan should have only one “terminal” node (one sink node). An ExecPlan can terminate early, before the output is fully consumed, due to cancellation or an error. However, once exec_plan->finished() has completed, the plan can be safely destroyed independently of the sink, which will continue to hold any unconsumed batches.

The source example above also includes a sink operation:

/// \brief An example demonstrating a source and sink node
/// \param exec_context The execution context to run the plan in
///
/// Source-Sink Example
/// This example shows how a source and sink can be used
/// in an execution plan. The source node receives the data
/// and the sink node emits it as output, which is collected
/// into a table.
arrow::Status SourceSinkExample(cp::ExecContext& exec_context) {
  ARROW_ASSIGN_OR_RAISE(std::shared_ptr<cp::ExecPlan> plan,
                        cp::ExecPlan::Make(&exec_context));

  ARROW_ASSIGN_OR_RAISE(auto basic_data, MakeBasicBatches());

  arrow::AsyncGenerator<arrow::util::optional<cp::ExecBatch>> sink_gen;

  auto source_node_options = cp::SourceNodeOptions{basic_data.schema, basic_data.gen()};

  ARROW_ASSIGN_OR_RAISE(cp::ExecNode * source,
                        cp::MakeExecNode("source", plan.get(), {}, source_node_options));

  ARROW_RETURN_NOT_OK(
      cp::MakeExecNode("sink", plan.get(), {source}, cp::SinkNodeOptions{&sink_gen}));

  return ExecutePlanAndCollectAsTable(exec_context, plan, basic_data.schema, sink_gen);
}
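A small producer/consumer sketch shows why unconsumed output accumulates. The hypothetical `SinkQueue` below (plain C++, not Arrow's sink node) buffers every batch the plan pushes until the caller pulls it. Once the producer marks the end of the stream, `Next()` returns an empty optional after the buffer drains, matching the exhaustion signal described above. Memory use grows with the number of batches that have been pushed but not yet pulled.

```cpp
#include <cassert>
#include <condition_variable>
#include <cstddef>
#include <cstdint>
#include <deque>
#include <mutex>
#include <optional>
#include <utility>
#include <vector>

using Batch = std::vector<int64_t>;

// Hypothetical buffer between the last node of a plan and an external consumer.
class SinkQueue {
 public:
  // Producer side: called by the plan for every output batch.
  void Push(Batch batch) {
    {
      std::lock_guard<std::mutex> lock(mu_);
      buffer_.push_back(std::move(batch));
    }
    cv_.notify_one();
  }

  // Producer side: no more batches will arrive.
  void Finish() {
    {
      std::lock_guard<std::mutex> lock(mu_);
      finished_ = true;
    }
    cv_.notify_all();
  }

  // Consumer side: block until a batch is available or the stream has ended.
  std::optional<Batch> Next() {
    std::unique_lock<std::mutex> lock(mu_);
    cv_.wait(lock, [this] { return !buffer_.empty() || finished_; });
    if (buffer_.empty()) return std::nullopt;  // exhausted
    Batch batch = std::move(buffer_.front());
    buffer_.pop_front();
    return batch;
  }

  // Number of batches held in memory because nobody has pulled them yet.
  std::size_t Buffered() {
    std::lock_guard<std::mutex> lock(mu_);
    return buffer_.size();
  }

 private:
  std::mutex mu_;
  std::condition_variable cv_;
  std::deque<Batch> buffer_;
  bool finished_ = false;
};
```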

consuming_sink

The consuming_sink operator is a sink operation that performs consumption within the execution plan (i.e., the exec plan should not complete until the consumption has completed). Unlike the sink node, this node takes in a callback function that is expected to consume the batch. Once this callback has finished, the execution plan will no longer hold any reference to the batch. The consuming function may be called before a previous invocation has completed. If the consuming function does not run quickly enough, many concurrent executions could pile up, blocking the CPU thread pool. The execution plan will not be marked finished until all consuming function callbacks have completed. Once all batches have been delivered, the execution plan will wait for the finish future to complete before marking the execution plan finished. This allows for workflows where the consumption function converts batches into async tasks (this is currently done internally for the dataset write node).
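The two guarantees described above can be sketched with standard threads and a future. First, the consume callback may run concurrently. Second, the plan counts as finished only after every callback has returned and the finish future has completed. `CountingConsumer` and `RunConsumingSink` are hypothetical names. This is a simplified model of the protocol, not Arrow's ConsumingSinkNode, and it dispatches each batch on its own thread purely for illustration.

```cpp
#include <atomic>
#include <cassert>
#include <cstdint>
#include <future>
#include <thread>
#include <vector>

using Batch = std::vector<int64_t>;

// Consumer whose Consume() may be invoked from several threads at once,
// so its shared state must be thread-safe (here, atomic counters).
struct CountingConsumer {
  std::atomic<uint32_t> batches_seen{0};
  std::atomic<int64_t> rows_seen{0};

  void Consume(const Batch& batch) {
    batches_seen++;
    rows_seen += static_cast<int64_t>(batch.size());
  }

  // A consumer that started async work would complete this future later;
  // this sketch has no pending work, so the future is ready immediately.
  std::future<void> Finish() {
    std::promise<void> done;
    done.set_value();
    return done.get_future();
  }
};

// Deliver every batch concurrently, wait for all callbacks to return,
// then wait on the consumer's finish future before reporting completion.
void RunConsumingSink(const std::vector<Batch>& batches, CountingConsumer& consumer) {
  std::vector<std::thread> workers;
  for (const Batch& batch : batches) {
    workers.emplace_back([&consumer, &batch] { consumer.Consume(batch); });
  }
  for (std::thread& t : workers) t.join();  // all callbacks have completed
  consumer.Finish().wait();                 // only now is the "plan" finished
}
```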

Example:

// define a custom SinkNodeConsumer
std::atomic<uint32_t> batches_seen{0};
arrow::Future<> finish = arrow::Future<>::Make();
struct CustomSinkNodeConsumer : public cp::SinkNodeConsumer {
  CustomSinkNodeConsumer(std::atomic<uint32_t>* batches_seen, arrow::Future<> finish)
      : batches_seen(batches_seen), finish(std::move(finish)) {}

  // Consumption logic can be written here
  arrow::Status Consume(cp::ExecBatch batch) override {
    // data can be consumed in the expected way:
    // transfer to another system or just do some work
    // and write to disk
    (*batches_seen)++;
    return arrow::Status::OK();
  }

  // The plan is not marked finished until this future completes
  arrow::Future<> Finish() override { return finish; }

  std::atomic<uint32_t>* batches_seen;
  arrow::Future<> finish;
};

std::shared_ptr<CustomSinkNodeConsumer> consumer =
    std::make_shared<CustomSinkNodeConsumer>(&batches_seen, finish);

cp::ExecNode* consuming_sink;

ARROW_ASSIGN_OR_RAISE(consuming_sink,
                      cp::MakeExecNode("consuming_sink", plan.get(), {source},
                                       cp::ConsumingSinkNodeOptions(consumer)));

Consuming-Sink example:

/// \brief An example showing a consuming sink node
/// \param exec_context The execution context to run the plan in
///
/// Source-Consuming-Sink
/// This example shows how the data can be consumed within the execution plan
/// by using a ConsumingSink node. There is no data output from this execution plan.
arrow::Status SourceConsumingSinkExample(cp::ExecContext& exec_context) {
  ARROW_ASSIGN_OR_RAISE(std::shared_ptr<cp::ExecPlan> plan,
                        cp::ExecPlan::Make(&exec_context));

  ARROW_ASSIGN_OR_RAISE(auto basic_data, MakeBasicBatches());

  auto source_node_options = cp::SourceNodeOptions{basic_data.schema, basic_data.gen()};

  ARROW_ASSIGN_OR_RAISE(cp::ExecNode * source,
                        cp::MakeExecNode("source", plan.get(), {}, source_node_options));

  std::atomic<uint32_t> batches_seen{0};
  arrow::Future<> finish = arrow::Future<>::Make();
  struct CustomSinkNodeConsumer : public cp::SinkNodeConsumer {
    CustomSinkNodeConsumer(std::atomic<uint32_t>* batches_seen, arrow::Future<> finish)
        : batches_seen(batches_seen), finish(std::move(finish)) {}

    arrow::Status Init(const std::shared_ptr<arrow::Schema>& schema,
                       cp::BackpressureControl* backpressure_control) override {
      return arrow::Status::OK();
    }

    arrow::Status Consume(cp::ExecBatch batch) override {
      (*batches_seen)++;
      return arrow::Status::OK();
    }

    arrow::Future<> Finish() override { return finish; }

    std::atomic<uint32_t>* batches_seen;
    arrow::Future<> finish;
  };
  std::shared_ptr<CustomSinkNodeConsumer> consumer =
      std::make_shared<CustomSinkNodeConsumer>(&batches_seen, finish);

  cp::ExecNode* consuming_sink;

  ARROW_ASSIGN_OR_RAISE(consuming_sink,
                        MakeExecNode("consuming_sink", plan.get(), {source},
                                     cp::ConsumingSinkNodeOptions(consumer)));

  ARROW_RETURN_NOT_OK(consuming_sink->Validate());

  ARROW_RETURN_NOT_OK(plan->Validate());
  std::cout << "Exec Plan created: " << plan->ToString() << std::endl;
  // plan start producing
  ARROW_RETURN_NOT_OK(plan->StartProducing());
  // Source should finish fairly quickly
  ARROW_RETURN_NOT_OK(source->finished().status());
  std::cout << "Source Finished!" << std::endl;
  // Mark consumption complete, plan should finish
  finish.MarkFinished(arrow::Status::OK());
  ARROW_RETURN_NOT_OK(plan->finished().status());
  return arrow::Status::OK();
}
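The contract described above (Consume() calls that may overlap, plus a finish future the plan waits on) can be modeled without Arrow. The following is a toy sketch using only the C++ standard library; ToyBatch, CountingConsumer, and RunToyConsumingSink are hypothetical names that are not part of the Arrow API, and std::shared_future stands in for arrow::Future<>.

```cpp
#include <atomic>
#include <cassert>
#include <cstddef>
#include <future>
#include <thread>
#include <vector>

// Hypothetical stand-in for cp::ExecBatch: only the row count matters here.
struct ToyBatch {
  std::size_t num_rows;
};

// Hypothetical consumer shaped like cp::SinkNodeConsumer. Consume() may be
// called from several threads at once, so all shared state is atomic.
struct CountingConsumer {
  std::atomic<std::size_t> batches_seen{0};
  std::atomic<std::size_t> rows_seen{0};
  std::promise<void> finish_promise;
  // Declared after finish_promise, so it is initialized from it safely.
  std::shared_future<void> finish = finish_promise.get_future().share();

  void Consume(const ToyBatch& batch) {
    batches_seen.fetch_add(1);
    rows_seen.fetch_add(batch.num_rows);
  }

  // The toy "plan" waits on this future before it reports completion.
  std::shared_future<void> Finish() const { return finish; }
};

// Toy driver: deliver each batch on its own thread (so callbacks overlap),
// wait for every callback to return, then wait for the consumer's future.
void RunToyConsumingSink(const std::vector<ToyBatch>& batches,
                         CountingConsumer& consumer) {
  std::vector<std::thread> workers;
  workers.reserve(batches.size());
  for (const ToyBatch& batch : batches) {
    workers.emplace_back([&consumer, batch] { consumer.Consume(batch); });
  }
  for (std::thread& worker : workers) {
    worker.join();  // after this loop, every Consume() call has returned
  }
  consumer.Finish().wait();  // not "finished" until the consumer says so
}
```

In the Arrow example, the role of `finish_promise.set_value()` is played by `finish.MarkFinished(...)`, which is why `plan->finished()` only completes after that call.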

order_by_sink#

The order_by_sink operation is an extension of the sink operation. It guarantees the ordering of the output stream, configured through arrow::compute::OrderBySinkNodeOptions. The arrow::compute::SortOptions it contains define which columns are used for sorting and whether each one is sorted in ascending or descending order.

Note

This node is a “pipeline breaker” and will fully materialize the dataset in memory. In the future, spillover mechanisms will be added which should alleviate this constraint.

Order-By-Sink example:

/// \brief An example showing an order-by node
/// \param exec_context The execution context to run the plan in
///
/// Source-OrderBy-Sink
/// In this example, the data enters through the source node
/// and the data is ordered in the sink node. The order can be
/// ASCENDING or DESCENDING and it is configurable. The output
/// is obtained as a table from the sink node.
arrow::Status SourceOrderBySinkExample(cp::ExecContext& exec_context) {
  ARROW_ASSIGN_OR_RAISE(std::shared_ptr<cp::ExecPlan> plan,
                        cp::ExecPlan::Make(&exec_context));

  ARROW_ASSIGN_OR_RAISE(auto basic_data, MakeSortTestBasicBatches());

  std::cout << "basic data created" << std::endl;

  arrow::AsyncGenerator<arrow::util::optional<cp::ExecBatch>> sink_gen;

  auto source_node_options = cp::SourceNodeOptions{basic_data.schema, basic_data.gen()};
  ARROW_ASSIGN_OR_RAISE(cp::ExecNode * source,
                        cp::MakeExecNode("source", plan.get(), {}, source_node_options));

  ARROW_RETURN_NOT_OK(cp::MakeExecNode(
      "order_by_sink", plan.get(), {source},
      cp::OrderBySinkNodeOptions{
          cp::SortOptions{{cp::SortKey{"a", cp::SortOrder::Descending}}}, &sink_gen}));

  return ExecutePlanAndCollectAsTable(exec_context, plan, basic_data.schema, sink_gen);
}
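To see why order_by_sink is a pipeline breaker, consider a toy sketch in plain C++: no row can be emitted until the last batch has arrived, because that batch might contain the value that belongs first. ToyOrderBySink is a hypothetical class, and a single int32 column stands in for an ExecBatch; the sketch sorts descending to match the `SortKey{"a", SortOrder::Descending}` in the example.

```cpp
#include <algorithm>
#include <cassert>
#include <cstdint>
#include <functional>
#include <vector>

// Hypothetical toy: each batch is a single int32 column "a".
using ToyBatch = std::vector<int32_t>;

class ToyOrderBySink {
 public:
  explicit ToyOrderBySink(bool descending) : descending_(descending) {}

  // Buffer every row; nothing can be emitted yet (pipeline breaker).
  void InputReceived(const ToyBatch& batch) {
    rows_.insert(rows_.end(), batch.begin(), batch.end());
  }

  // Only once all input has arrived is the global order known.
  std::vector<int32_t> InputFinished() {
    if (descending_) {
      std::sort(rows_.begin(), rows_.end(), std::greater<int32_t>());
    } else {
      std::sort(rows_.begin(), rows_.end());
    }
    return rows_;
  }

 private:
  bool descending_;
  std::vector<int32_t> rows_;
};
```

Because all rows are held in `rows_` until the end, memory use grows with the input size, which is exactly the limitation the note above describes.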

select_k_sink#

The select_k_sink node selects the top or bottom K elements, similar to a SQL ORDER BY ... LIMIT K clause. The selection is described by arrow::compute::SelectKOptions, which is passed to the node through arrow::compute::SelectKSinkNodeOptions together with the output generator. The node is built on the same machinery as order_by_sink: it receives its inputs and then computes the top-k or bottom-k rows.

Note

This node is a “pipeline breaker” and will fully materialize the input in memory. In the future, spillover mechanisms will be added which should alleviate this constraint.

SelectK example:

/// \brief An example showing a select-k node
/// \param exec_context The execution context to run the plan in
///
/// Source-KSelect
/// This example shows how K number of elements can be selected
/// either from the top or bottom. The output node is a modified
/// sink node where output can be obtained as a table.
arrow::Status SourceKSelectExample(cp::ExecContext& exec_context) {
  ARROW_ASSIGN_OR_RAISE(auto input, MakeGroupableBatches());
  ARROW_ASSIGN_OR_RAISE(std::shared_ptr<cp::ExecPlan> plan,
                        cp::ExecPlan::Make(&exec_context));
  arrow::AsyncGenerator<arrow::util::optional<cp::ExecBatch>> sink_gen;

  ARROW_ASSIGN_OR_RAISE(
      cp::ExecNode * source,
      cp::MakeExecNode("source", plan.get(), {},
                       cp::SourceNodeOptions{input.schema, input.gen()}));

  cp::SelectKOptions options = cp::SelectKOptions::TopKDefault(/*k=*/2, {"i32"});

  ARROW_RETURN_NOT_OK(cp::MakeExecNode("select_k_sink", plan.get(), {source},
                                       cp::SelectKSinkNodeOptions{options, &sink_gen}));

  auto schema = arrow::schema(
      {arrow::field("i32", arrow::int32()), arrow::field("str", arrow::utf8())});

  return ExecutePlanAndCollectAsTable(exec_context, plan, schema, sink_gen);
}
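Conceptually, top-k selection only needs to remember the k best candidates seen so far. The toy sketch below (plain C++, with a hypothetical TopK function) does this with a bounded min-heap over batches of an int32 column. It illustrates the selection semantics only; per the note above, the actual select_k_sink node may still materialize its whole input.

```cpp
#include <algorithm>
#include <cassert>
#include <cstddef>
#include <cstdint>
#include <functional>
#include <queue>
#include <vector>

// Return the k largest values across all batches, largest first.
std::vector<int32_t> TopK(const std::vector<std::vector<int32_t>>& batches,
                          std::size_t k) {
  // Min-heap: the smallest of the current top-k candidates sits on top.
  std::priority_queue<int32_t, std::vector<int32_t>, std::greater<int32_t>> heap;
  for (const auto& batch : batches) {
    for (int32_t value : batch) {
      if (heap.size() < k) {
        heap.push(value);
      } else if (k > 0 && value > heap.top()) {
        heap.pop();  // evict the weakest candidate
        heap.push(value);
      }
    }
  }
  std::vector<int32_t> out;
  while (!heap.empty()) {
    out.push_back(heap.top());
    heap.pop();
  }
  std::reverse(out.begin(), out.end());  // heap pops ascending; flip to descending
  return out;
}
```

With the `i32` values from `MakeGroupableBatches()` and k = 2, this returns `{12, 7}`, matching what `TopKDefault(/*k=*/2, {"i32"})` asks for.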

table_sink#

The table_sink node provides the ability to receive the output as an in-memory table. This is simpler to use than the other sink nodes provided by the streaming execution engine but it only makes sense when the output fits comfortably in memory. The node is created using arrow::compute::TableSinkNodeOptions.

Example of using table_sink

/// \brief An example showing a table sink node
/// \param exec_context The execution context to run the plan in
///
/// TableSink Example
/// This example shows how a table_sink can be used
/// in an execution plan. This includes a source node
/// receiving data as batches and the table sink node
/// which emits the output as a table.
arrow::Status TableSinkExample(cp::ExecContext& exec_context) {
  ARROW_ASSIGN_OR_RAISE(std::shared_ptr<cp::ExecPlan> plan,
                        cp::ExecPlan::Make(&exec_context));

  ARROW_ASSIGN_OR_RAISE(auto basic_data, MakeBasicBatches());

  auto source_node_options = cp::SourceNodeOptions{basic_data.schema, basic_data.gen()};

  ARROW_ASSIGN_OR_RAISE(cp::ExecNode * source,
                        cp::MakeExecNode("source", plan.get(), {}, source_node_options));

  std::shared_ptr<arrow::Table> output_table;
  auto table_sink_options = cp::TableSinkNodeOptions{&output_table};

  ARROW_RETURN_NOT_OK(
      cp::MakeExecNode("table_sink", plan.get(), {source}, table_sink_options));
  // validate the ExecPlan
  ARROW_RETURN_NOT_OK(plan->Validate());
  std::cout << "ExecPlan created : " << plan->ToString() << std::endl;
  // start the ExecPlan
  ARROW_RETURN_NOT_OK(plan->StartProducing());

  // Wait for the plan to finish before reading output_table
  auto finished = plan->finished();
  ARROW_RETURN_NOT_OK(finished.status());
  std::cout << "Results : " << output_table->ToString() << std::endl;
  return arrow::Status::OK();
}
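As a rough mental model, a table sink gathers every batch and, once its input is finished, publishes one table whose columns keep one chunk per received batch (consistent with how arrow::Table::FromRecordBatches builds chunked columns). The sketch below is a toy in plain C++; ToyChunkedTable and ToyTableSink are hypothetical, and the output pointer mirrors the `&output_table` argument of TableSinkNodeOptions.

```cpp
#include <cassert>
#include <cstddef>
#include <cstdint>
#include <utility>
#include <vector>

// Hypothetical single-column table made of chunks (one per input batch).
struct ToyChunkedTable {
  std::vector<std::vector<int32_t>> chunks;

  std::size_t num_rows() const {
    std::size_t total = 0;
    for (const auto& chunk : chunks) total += chunk.size();
    return total;
  }
};

class ToyTableSink {
 public:
  // Like TableSinkNodeOptions{&output_table}: the result is written through a pointer.
  explicit ToyTableSink(ToyChunkedTable* out) : out_(out) {}

  void InputReceived(std::vector<int32_t> batch) {
    pending_.push_back(std::move(batch));
  }

  // Publish the collected batches as one table; the rows are moved, not copied.
  void InputFinished() { out_->chunks = std::move(pending_); }

 private:
  ToyChunkedTable* out_;
  std::vector<std::vector<int32_t>> pending_;
};
```

The output is only valid after `InputFinished()`, which is why the Arrow example waits on `plan->finished()` before printing `output_table`.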

scan#

scan is an operation used to load and process datasets. It should be preferred over the more generic source node when your input is a dataset. The behavior is defined using arrow::dataset::ScanNodeOptions. More information on datasets and the various scan options can be found in Tabular Datasets.

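This node can push filters down to the file readers, which reduces the amount of data that needs to be read. Pushdown and the filter node act at two different places, so you may supply the same filter expression both to the scan node and to a downstream FilterNode: the scan node uses it to avoid reading data that cannot match (for example, whole files or row groups), while the FilterNode evaluates it row by row and guarantees that every non-matching row is removed.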

Scan example:

/// \brief An example demonstrating a scan and sink node
/// \param exec_context The execution context to run the plan in
///
/// Scan-Sink
/// This example shows how scan operation can be applied on a dataset.
/// There are operations that can be applied on the scan (project, filter)
/// and the input data can be processed. The output is obtained as a table
/// via the sink node.
arrow::Status ScanSinkExample(cp::ExecContext& exec_context) {
  // Execution plan created
  ARROW_ASSIGN_OR_RAISE(std::shared_ptr<cp::ExecPlan> plan,
                        cp::ExecPlan::Make(&exec_context));

  ARROW_ASSIGN_OR_RAISE(std::shared_ptr<arrow::dataset::Dataset> dataset, GetDataset());

  auto options = std::make_shared<arrow::dataset::ScanOptions>();
  options->projection = cp::project({}, {});  // create empty projection

  // construct the scan node
  cp::ExecNode* scan;
  auto scan_node_options = arrow::dataset::ScanNodeOptions{dataset, options};

  ARROW_ASSIGN_OR_RAISE(scan,
                        cp::MakeExecNode("scan", plan.get(), {}, scan_node_options));

  arrow::AsyncGenerator<arrow::util::optional<cp::ExecBatch>> sink_gen;

  ARROW_RETURN_NOT_OK(
      cp::MakeExecNode("sink", plan.get(), {scan}, cp::SinkNodeOptions{&sink_gen}));

  return ExecutePlanAndCollectAsTable(exec_context, plan, dataset->schema(), sink_gen);
}
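Why a pushdown filter and a filter node can both be given the same expression is easiest to see with a toy model. In the sketch below (plain C++; ToyRowGroup, MakeRowGroup, ScanWithPushdown, and ApplyFilter are hypothetical), pushdown is assumed to work at row-group granularity using min/max statistics, as Parquet readers commonly do: it skips groups that cannot match but passes through every row of groups that might, so an exact filter is still applied downstream.

```cpp
#include <algorithm>
#include <cassert>
#include <cstdint>
#include <iterator>
#include <utility>
#include <vector>

// Hypothetical row group carrying min/max statistics, like a Parquet row group.
struct ToyRowGroup {
  std::vector<int64_t> values;
  int64_t min;
  int64_t max;
};

ToyRowGroup MakeRowGroup(std::vector<int64_t> values) {
  assert(!values.empty());
  auto bounds = std::minmax_element(values.begin(), values.end());
  int64_t min_value = *bounds.first;
  int64_t max_value = *bounds.second;
  return ToyRowGroup{std::move(values), min_value, max_value};
}

// Pushdown for the predicate "value > threshold": skip whole groups whose
// statistics prove no row can match, but keep every row of the others.
std::vector<int64_t> ScanWithPushdown(const std::vector<ToyRowGroup>& groups,
                                      int64_t threshold) {
  std::vector<int64_t> out;
  for (const ToyRowGroup& group : groups) {
    if (group.max <= threshold) continue;  // this group is never read
    out.insert(out.end(), group.values.begin(), group.values.end());
  }
  return out;
}

// Exact row-by-row evaluation of the same predicate, as a filter node would do.
std::vector<int64_t> ApplyFilter(const std::vector<int64_t>& rows, int64_t threshold) {
  std::vector<int64_t> out;
  std::copy_if(rows.begin(), rows.end(), std::back_inserter(out),
               [threshold](int64_t v) { return v > threshold; });
  return out;
}
```

With groups `{1, 2, 3}` and `{4, 9, 5}` and the predicate `value > 4`, pushdown skips the first group entirely but still returns `4`, which only the exact filter removes.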

write#

The write node saves query results as a dataset of files in a format like Parquet, Feather, CSV, etc. using the Tabular Datasets functionality in Arrow. The write options are provided via the arrow::dataset::WriteNodeOptions which in turn contains arrow::dataset::FileSystemDatasetWriteOptions. arrow::dataset::FileSystemDatasetWriteOptions provides control over the written dataset, including options like the output directory, file naming scheme, and so on.

Write example:

/// \brief An example showing a write node
/// \param exec_context The execution context to run the plan in
/// \param file_path The destination to write to
///
/// Scan-Filter-Write
/// This example shows how scan node can be used to load the data
/// and after processing how it can be written to disk.
arrow::Status ScanFilterWriteExample(cp::ExecContext& exec_context,
                                     const std::string& file_path) {
  ARROW_ASSIGN_OR_RAISE(std::shared_ptr<cp::ExecPlan> plan,
                        cp::ExecPlan::Make(&exec_context));

  ARROW_ASSIGN_OR_RAISE(std::shared_ptr<arrow::dataset::Dataset> dataset, GetDataset());

  auto options = std::make_shared<arrow::dataset::ScanOptions>();
  // empty projection
  options->projection = cp::project({}, {});

  cp::ExecNode* scan;

  auto scan_node_options = arrow::dataset::ScanNodeOptions{dataset, options};

  ARROW_ASSIGN_OR_RAISE(scan,
                        cp::MakeExecNode("scan", plan.get(), {}, scan_node_options));

  std::string root_path = "";
  std::string uri = "file://" + file_path;
  // Resolve the filesystem from the URI, returning an error instead of aborting
  ARROW_ASSIGN_OR_RAISE(std::shared_ptr<arrow::fs::FileSystem> filesystem,
                        arrow::fs::FileSystemFromUri(uri, &root_path));

  auto base_path = root_path + "/parquet_dataset";
  // Uncomment the following line, if run repeatedly
  // ARROW_RETURN_NOT_OK(filesystem->DeleteDirContents(base_path));
  ARROW_RETURN_NOT_OK(filesystem->CreateDir(base_path));

  // The partition schema determines which fields are part of the partitioning.
  auto partition_schema = arrow::schema({arrow::field("a", arrow::int32())});
  // We'll use Hive-style partitioning,
  // which creates directories with "key=value" pairs.

  auto partitioning =
      std::make_shared<arrow::dataset::HivePartitioning>(partition_schema);
  // We'll write Parquet files.
  auto format = std::make_shared<arrow::dataset::ParquetFileFormat>();

  arrow::dataset::FileSystemDatasetWriteOptions write_options;
  write_options.file_write_options = format->DefaultWriteOptions();
  write_options.filesystem = filesystem;
  write_options.base_dir = base_path;
  write_options.partitioning = partitioning;
  write_options.basename_template = "part{i}.parquet";

  arrow::dataset::WriteNodeOptions write_node_options{write_options};

  ARROW_RETURN_NOT_OK(cp::MakeExecNode("write", plan.get(), {scan}, write_node_options));

  ARROW_RETURN_NOT_OK(plan->Validate());
  std::cout << "Execution Plan Created : " << plan->ToString() << std::endl;
  // start the ExecPlan
  ARROW_RETURN_NOT_OK(plan->StartProducing());
  // status() blocks until the plan, and therefore the write, has finished
  ARROW_RETURN_NOT_OK(plan->finished().status());
  return arrow::Status::OK();
}
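The HivePartitioning configured above lays files out in "key=value" directories under base_dir, and basename_template substitutes a counter for `{i}`. The toy sketch below shows that layout for the partition column a. HiveDirectory, ExpandBasename, and PlanFilePaths are hypothetical helpers written for illustration; the exact numbering Arrow assigns to `{i}`, and how many files it writes per directory, may differ.

```cpp
#include <cassert>
#include <cstddef>
#include <cstdint>
#include <map>
#include <string>
#include <vector>

// "<base_dir>/<key>=<value>": the Hive-style directory for one partition value.
std::string HiveDirectory(const std::string& base_dir, const std::string& key,
                          const std::string& value) {
  return base_dir + "/" + key + "=" + value;
}

// Replace the first "{i}" in the template with the given counter.
std::string ExpandBasename(std::string tmpl, int i) {
  const std::string placeholder = "{i}";
  std::size_t pos = tmpl.find(placeholder);
  assert(pos != std::string::npos);
  return tmpl.replace(pos, placeholder.size(), std::to_string(i));
}

// Toy planner: one file per distinct value of the int32 partition column "a",
// mapped to the number of rows that file would receive.
std::map<std::string, std::size_t> PlanFilePaths(const std::string& base_dir,
                                                 const std::vector<int32_t>& a,
                                                 const std::string& tmpl) {
  std::map<std::string, std::size_t> files;
  for (int32_t value : a) {
    std::string path = HiveDirectory(base_dir, "a", std::to_string(value)) + "/" +
                       ExpandBasename(tmpl, 0);
    ++files[path];
  }
  return files;
}
```

Rows sharing a partition value land in the same directory, and the partition column's value is recoverable from the path itself.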

union#

union merges multiple data streams with the same schema into one, similar to a SQL UNION ALL clause.

The following example demonstrates how this can be achieved using two data sources.

Union example:

/// \brief An example showing a union node
/// \param exec_context The execution context to run the plan in
///
/// Source-Union-Sink
/// This example shows how a union operation can be applied on two
/// data sources. The output is obtained as a table via the sink
/// node.
arrow::Status SourceUnionSinkExample(cp::ExecContext& exec_context) {
  ARROW_ASSIGN_OR_RAISE(auto basic_data, MakeBasicBatches());

  ARROW_ASSIGN_OR_RAISE(std::shared_ptr<cp::ExecPlan> plan,
                        cp::ExecPlan::Make(&exec_context));
  arrow::AsyncGenerator<arrow::util::optional<cp::ExecBatch>> sink_gen;

  cp::Declaration union_node{"union", cp::ExecNodeOptions{}};
  cp::Declaration lhs{"source",
                      cp::SourceNodeOptions{basic_data.schema, basic_data.gen()}};
  lhs.label = "lhs";
  cp::Declaration rhs{"source",
                      cp::SourceNodeOptions{basic_data.schema, basic_data.gen()}};
  rhs.label = "rhs";
  union_node.inputs.emplace_back(lhs);
  union_node.inputs.emplace_back(rhs);

  ARROW_ASSIGN_OR_RAISE(
      auto declr, cp::Declaration::Sequence({
                                                union_node,
                                                {"sink", cp::SinkNodeOptions{&sink_gen}},
                                            })
                      .AddToPlan(plan.get()));

  ARROW_RETURN_NOT_OK(declr->Validate());

  ARROW_RETURN_NOT_OK(plan->Validate());
  return ExecutePlanAndCollectAsTable(exec_context, plan, basic_data.schema, sink_gen);
}
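UNION ALL semantics mean every input row is forwarded and nothing is deduplicated, which is why the example above yields each row of basic_data twice. A toy sketch in plain C++ (ToyStream and UnionAll are hypothetical) makes this explicit: it rejects inputs whose schemas differ and interleaves batches round-robin. The real node forwards batches in whatever order its inputs produce them, so output order should not be relied on.

```cpp
#include <cassert>
#include <cstddef>
#include <cstdint>
#include <stdexcept>
#include <string>
#include <vector>

// Hypothetical stream: field names plus batches of a single int32 column.
struct ToyStream {
  std::vector<std::string> schema;
  std::vector<std::vector<int32_t>> batches;
};

// UNION ALL: forward every batch from every input; no deduplication.
std::vector<std::vector<int32_t>> UnionAll(const std::vector<ToyStream>& inputs) {
  if (inputs.empty()) throw std::invalid_argument("union needs at least one input");
  for (const ToyStream& input : inputs) {
    if (input.schema != inputs.front().schema) {
      throw std::invalid_argument("all union inputs must share one schema");
    }
  }
  std::vector<std::vector<int32_t>> out;
  // Round-robin over the inputs until every stream is drained.
  for (std::size_t i = 0;; ++i) {
    bool any = false;
    for (const ToyStream& input : inputs) {
      if (i < input.batches.size()) {
        out.push_back(input.batches[i]);
        any = true;
      }
    }
    if (!any) break;
  }
  return out;
}
```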

hash_join#

The hash_join operation provides the relational-algebra join operation, implemented with a hash-based algorithm. arrow::compute::HashJoinNodeOptions contains the options required to define a join. hash_join supports inner, left/right semi, left/right anti, and left/right/full outer joins. The join keys (i.e. the column(s) to join on) and suffixes (i.e. a suffix term like “_x” that is appended to the names of columns present in both the left and right relations) can also be set via the join options. Read more on hash-joins.

Hash-Join example:

/// \brief An example showing a hash join node
/// \param exec_context The execution context to run the plan in
///
/// Source-HashJoin-Sink
/// This example shows how source node gets the data and how a self-join
/// is applied on the data. The join options are configurable. The output
/// is obtained as a table via the sink node.
arrow::Status SourceHashJoinSinkExample(cp::ExecContext& exec_context) {
  ARROW_ASSIGN_OR_RAISE(auto input, MakeGroupableBatches());
  ARROW_ASSIGN_OR_RAISE(std::shared_ptr<cp::ExecPlan> plan,
                        cp::ExecPlan::Make(&exec_context));

  arrow::AsyncGenerator<arrow::util::optional<cp::ExecBatch>> sink_gen;

  cp::ExecNode* left_source;
  cp::ExecNode* right_source;
  for (auto source : {&left_source, &right_source}) {
    ARROW_ASSIGN_OR_RAISE(*source,
                          MakeExecNode("source", plan.get(), {},
                                       cp::SourceNodeOptions{input.schema, input.gen()}));
  }

  cp::HashJoinNodeOptions join_opts{
      cp::JoinType::INNER,
      /*left_keys=*/{"str"},
      /*right_keys=*/{"str"}, cp::literal(true), "l_", "r_"};

  ARROW_ASSIGN_OR_RAISE(
      auto hashjoin,
      cp::MakeExecNode("hashjoin", plan.get(), {left_source, right_source}, join_opts));

  ARROW_RETURN_NOT_OK(
      cp::MakeExecNode("sink", plan.get(), {hashjoin}, cp::SinkNodeOptions{&sink_gen}));
  // Both inputs have columns i32 and str, so every output name receives the
  // left ("l_") or right ("r_") suffix: i32l_, strl_, i32r_, strr_
  auto schema = arrow::schema(
      {arrow::field("i32l_", arrow::int32()), arrow::field("strl_", arrow::utf8()),
       arrow::field("i32r_", arrow::int32()), arrow::field("strr_", arrow::utf8())});

  return ExecutePlanAndCollectAsTable(exec_context, plan, schema, sink_gen);
}
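A hash join has two phases: build a hash table on the key column(s) of one input, then probe it with each row of the other. The toy sketch below (plain C++; Row, InnerHashJoin, and OutputNames are hypothetical) performs an inner join on str, building on the right input, and appends suffixes to column names that appear on both sides, mirroring the "l_"/"r_" options in the example. It illustrates the technique and is not Arrow's implementation.

```cpp
#include <cassert>
#include <cstdint>
#include <string>
#include <unordered_map>
#include <unordered_set>
#include <utility>
#include <vector>

struct Row {
  int32_t i32;
  std::string str;
};

// Inner equi-join on "str": build a multimap over the right input, then
// probe it with every left row, emitting one output pair per match.
std::vector<std::pair<Row, Row>> InnerHashJoin(const std::vector<Row>& left,
                                               const std::vector<Row>& right) {
  std::unordered_multimap<std::string, const Row*> table;  // build phase
  for (const Row& r : right) table.emplace(r.str, &r);

  std::vector<std::pair<Row, Row>> out;
  for (const Row& l : left) {  // probe phase
    auto range = table.equal_range(l.str);
    for (auto it = range.first; it != range.second; ++it) {
      out.emplace_back(l, *it->second);
    }
  }
  return out;
}

// Append the side's suffix to every column name that occurs on both sides.
std::vector<std::string> OutputNames(const std::vector<std::string>& left,
                                     const std::vector<std::string>& right,
                                     const std::string& left_suffix,
                                     const std::string& right_suffix) {
  std::unordered_set<std::string> left_set(left.begin(), left.end());
  std::unordered_set<std::string> right_set(right.begin(), right.end());
  std::vector<std::string> names;
  for (const auto& n : left) names.push_back(right_set.count(n) ? n + left_suffix : n);
  for (const auto& n : right) names.push_back(left_set.count(n) ? n + right_suffix : n);
  return names;
}
```

For a self-join of the first groupable batch (`alpha`, `beta`, `alpha`), the two `alpha` rows match each other pairwise (4 pairs) and `beta` matches itself, giving 5 output rows.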

Summary#

Examples of these nodes can be found in cpp/examples/arrow/execution_plan_documentation_examples.cc in the Arrow source.

Complete Example:

#include <arrow/array.h>
#include <arrow/builder.h>

#include <arrow/compute/api.h>
#include <arrow/compute/api_vector.h>
#include <arrow/compute/cast.h>
#include <arrow/compute/exec/exec_plan.h>

#include <arrow/csv/api.h>

#include <arrow/dataset/dataset.h>
#include <arrow/dataset/file_base.h>
#include <arrow/dataset/file_parquet.h>
#include <arrow/dataset/plan.h>
#include <arrow/dataset/scanner.h>

#include <arrow/io/interfaces.h>
#include <arrow/io/memory.h>

#include <arrow/result.h>
#include <arrow/status.h>
#include <arrow/table.h>

#include <arrow/ipc/api.h>

#include <arrow/util/future.h>
#include <arrow/util/range.h>
#include <arrow/util/thread_pool.h>
#include <arrow/util/vector.h>

#include <atomic>
#include <iostream>
#include <memory>
#include <utility>

// Demonstrate various operators in Arrow Streaming Execution Engine

namespace cp = ::arrow::compute;

constexpr char kSep[] = "******";

void PrintBlock(const std::string& msg) {
  std::cout << "\n\t" << kSep << " " << msg << " " << kSep << "\n" << std::endl;
}

template <typename TYPE,
          typename = typename std::enable_if<arrow::is_number_type<TYPE>::value |
                                             arrow::is_boolean_type<TYPE>::value |
                                             arrow::is_temporal_type<TYPE>::value>::type>
arrow::Result<std::shared_ptr<arrow::Array>> GetArrayDataSample(
    const std::vector<typename TYPE::c_type>& values) {
  using ARROW_ARRAY_TYPE = typename arrow::TypeTraits<TYPE>::ArrayType;
  using ARROW_BUILDER_TYPE = typename arrow::TypeTraits<TYPE>::BuilderType;
  ARROW_BUILDER_TYPE builder;
  ARROW_RETURN_NOT_OK(builder.Reserve(values.size()));
  std::shared_ptr<ARROW_ARRAY_TYPE> array;
  ARROW_RETURN_NOT_OK(builder.AppendValues(values));
  ARROW_RETURN_NOT_OK(builder.Finish(&array));
  return array;
}

template <class TYPE>
arrow::Result<std::shared_ptr<arrow::Array>> GetBinaryArrayDataSample(
    const std::vector<std::string>& values) {
  using ARROW_ARRAY_TYPE = typename arrow::TypeTraits<TYPE>::ArrayType;
  using ARROW_BUILDER_TYPE = typename arrow::TypeTraits<TYPE>::BuilderType;
  ARROW_BUILDER_TYPE builder;
  ARROW_RETURN_NOT_OK(builder.Reserve(values.size()));
  std::shared_ptr<ARROW_ARRAY_TYPE> array;
  ARROW_RETURN_NOT_OK(builder.AppendValues(values));
  ARROW_RETURN_NOT_OK(builder.Finish(&array));
  return array;
}

arrow::Result<std::shared_ptr<arrow::RecordBatch>> GetSampleRecordBatch(
    const arrow::ArrayVector& array_vector, const arrow::FieldVector& field_vector) {
  ARROW_ASSIGN_OR_RAISE(auto struct_result,
                        arrow::StructArray::Make(array_vector, field_vector));
  // FromStructArray is a static factory, so call it on the class itself
  return arrow::RecordBatch::FromStructArray(struct_result);
}

/// \brief Create a sample table
/// The table's contents will be:
/// a,b
/// 1,null
/// 2,true
/// null,true
/// 3,false
/// null,true
/// 4,false
/// 5,null
/// 6,false
/// 7,false
/// 8,true
/// \return The created table

arrow::Result<std::shared_ptr<arrow::Table>> GetTable() {
  // Column "a": slots marked false in int64_is_valid are stored as nulls
  arrow::Int64Builder int64_builder;
  std::shared_ptr<arrow::Int64Array> int64_array;

  std::vector<int64_t> int64_values = {1, 2, 0, 3, 0, 4, 5, 6, 7, 8};
  std::vector<bool> int64_is_valid = {true, true, false, true, true,
                                      true, true, true,  true, true};
  int64_is_valid[4] = false;

  ARROW_RETURN_NOT_OK(int64_builder.Reserve(10));
  ARROW_RETURN_NOT_OK(int64_builder.AppendValues(int64_values, int64_is_valid));
  ARROW_RETURN_NOT_OK(int64_builder.Finish(&int64_array));

  // Column "b": boolean values with their own validity bitmap
  arrow::BooleanBuilder boolean_builder;
  std::shared_ptr<arrow::BooleanArray> bool_array;

  std::vector<uint8_t> bool_values = {false, true,  true,  false, true,
                                      false, false, false, false, true};
  std::vector<bool> is_valid = {false, true,  true, true, true,
                                true,  false, true, true, true};

  ARROW_RETURN_NOT_OK(boolean_builder.Reserve(10));

  ARROW_RETURN_NOT_OK(boolean_builder.AppendValues(bool_values, is_valid));

  ARROW_RETURN_NOT_OK(boolean_builder.Finish(&bool_array));

  auto record_batch =
      arrow::RecordBatch::Make(arrow::schema({arrow::field("a", arrow::int64()),
                                              arrow::field("b", arrow::boolean())}),
                               10, {int64_array, bool_array});
  ARROW_ASSIGN_OR_RAISE(auto table, arrow::Table::FromRecordBatches({record_batch}));
  return table;
}

/// \brief Create a sample dataset
/// \return An in-memory dataset based on GetTable()
arrow::Result<std::shared_ptr<arrow::dataset::Dataset>> GetDataset() {
  ARROW_ASSIGN_OR_RAISE(auto table, GetTable());
  auto ds = std::make_shared<arrow::dataset::InMemoryDataset>(table);
  return ds;
}

arrow::Result<cp::ExecBatch> GetExecBatchFromVectors(
    const arrow::FieldVector& field_vector, const arrow::ArrayVector& array_vector) {
  ARROW_ASSIGN_OR_RAISE(auto res_batch, GetSampleRecordBatch(array_vector, field_vector));
  cp::ExecBatch batch{*res_batch};
  return batch;
}

// (Doc section: BatchesWithSchema Definition)
struct BatchesWithSchema {
  std::vector<cp::ExecBatch> batches;
  std::shared_ptr<arrow::Schema> schema;
  // This method uses internal arrow utilities to
  // convert a vector of record batches to an AsyncGenerator of optional batches
  arrow::AsyncGenerator<arrow::util::optional<cp::ExecBatch>> gen() const {
    auto opt_batches = ::arrow::internal::MapVector(
        [](cp::ExecBatch batch) { return arrow::util::make_optional(std::move(batch)); },
        batches);
    arrow::AsyncGenerator<arrow::util::optional<cp::ExecBatch>> gen;
    gen = arrow::MakeVectorGenerator(std::move(opt_batches));
    return gen;
  }
};
// (Doc section: BatchesWithSchema Definition)

// (Doc section: MakeBasicBatches Definition)
arrow::Result<BatchesWithSchema> MakeBasicBatches() {
  BatchesWithSchema out;
  auto field_vector = {arrow::field("a", arrow::int32()),
                       arrow::field("b", arrow::boolean())};
  ARROW_ASSIGN_OR_RAISE(auto b1_int, GetArrayDataSample<arrow::Int32Type>({0, 4}));
  ARROW_ASSIGN_OR_RAISE(auto b2_int, GetArrayDataSample<arrow::Int32Type>({5, 6, 7}));
  ARROW_ASSIGN_OR_RAISE(auto b3_int, GetArrayDataSample<arrow::Int32Type>({8, 9, 10}));

  ARROW_ASSIGN_OR_RAISE(auto b1_bool,
                        GetArrayDataSample<arrow::BooleanType>({false, true}));
  ARROW_ASSIGN_OR_RAISE(auto b2_bool,
                        GetArrayDataSample<arrow::BooleanType>({true, false, true}));
  ARROW_ASSIGN_OR_RAISE(auto b3_bool,
                        GetArrayDataSample<arrow::BooleanType>({false, true, false}));

  ARROW_ASSIGN_OR_RAISE(auto b1,
                        GetExecBatchFromVectors(field_vector, {b1_int, b1_bool}));
  ARROW_ASSIGN_OR_RAISE(auto b2,
                        GetExecBatchFromVectors(field_vector, {b2_int, b2_bool}));
  ARROW_ASSIGN_OR_RAISE(auto b3,
                        GetExecBatchFromVectors(field_vector, {b3_int, b3_bool}));

  out.batches = {b1, b2, b3};
  out.schema = arrow::schema(field_vector);
  return out;
}
// (Doc section: MakeBasicBatches Definition)

arrow::Result<BatchesWithSchema> MakeSortTestBasicBatches() {
  BatchesWithSchema out;
  auto field = arrow::field("a", arrow::int32());
  ARROW_ASSIGN_OR_RAISE(auto b1_int, GetArrayDataSample<arrow::Int32Type>({1, 3, 0, 2}));
  ARROW_ASSIGN_OR_RAISE(auto b2_int,
                        GetArrayDataSample<arrow::Int32Type>({121, 101, 120, 12}));
  ARROW_ASSIGN_OR_RAISE(auto b3_int,
                        GetArrayDataSample<arrow::Int32Type>({10, 110, 210, 121}));
  ARROW_ASSIGN_OR_RAISE(auto b4_int,
                        GetArrayDataSample<arrow::Int32Type>({51, 101, 2, 34}));
  ARROW_ASSIGN_OR_RAISE(auto b5_int,
                        GetArrayDataSample<arrow::Int32Type>({11, 31, 1, 12}));
  ARROW_ASSIGN_OR_RAISE(auto b6_int,
                        GetArrayDataSample<arrow::Int32Type>({12, 101, 120, 12}));
  ARROW_ASSIGN_OR_RAISE(auto b7_int,
                        GetArrayDataSample<arrow::Int32Type>({0, 110, 210, 11}));
  ARROW_ASSIGN_OR_RAISE(auto b8_int,
                        GetArrayDataSample<arrow::Int32Type>({51, 10, 2, 3}));

  // Every batch carries exactly one column so that it matches the one-field schema
  ARROW_ASSIGN_OR_RAISE(auto b1, GetExecBatchFromVectors({field}, {b1_int}));
  ARROW_ASSIGN_OR_RAISE(auto b2, GetExecBatchFromVectors({field}, {b2_int}));
  ARROW_ASSIGN_OR_RAISE(auto b3, GetExecBatchFromVectors({field}, {b3_int}));
  ARROW_ASSIGN_OR_RAISE(auto b4, GetExecBatchFromVectors({field}, {b4_int}));
  ARROW_ASSIGN_OR_RAISE(auto b5, GetExecBatchFromVectors({field}, {b5_int}));
  ARROW_ASSIGN_OR_RAISE(auto b6, GetExecBatchFromVectors({field}, {b6_int}));
  ARROW_ASSIGN_OR_RAISE(auto b7, GetExecBatchFromVectors({field}, {b7_int}));
  ARROW_ASSIGN_OR_RAISE(auto b8, GetExecBatchFromVectors({field}, {b8_int}));
  out.batches = {b1, b2, b3, b4, b5, b6, b7, b8};
  out.schema = arrow::schema({field});
  return out;
}

arrow::Result<BatchesWithSchema> MakeGroupableBatches(int multiplicity = 1) {
  BatchesWithSchema out;
  auto fields = {arrow::field("i32", arrow::int32()), arrow::field("str", arrow::utf8())};
  ARROW_ASSIGN_OR_RAISE(auto b1_int, GetArrayDataSample<arrow::Int32Type>({12, 7, 3}));
  ARROW_ASSIGN_OR_RAISE(auto b2_int, GetArrayDataSample<arrow::Int32Type>({-2, -1, 3}));
  ARROW_ASSIGN_OR_RAISE(auto b3_int, GetArrayDataSample<arrow::Int32Type>({5, 3, -8}));
  ARROW_ASSIGN_OR_RAISE(auto b1_str, GetBinaryArrayDataSample<arrow::StringType>(
                                         {"alpha", "beta", "alpha"}));
  ARROW_ASSIGN_OR_RAISE(auto b2_str, GetBinaryArrayDataSample<arrow::StringType>(
                                         {"alpha", "gamma", "alpha"}));
  ARROW_ASSIGN_OR_RAISE(auto b3_str, GetBinaryArrayDataSample<arrow::StringType>(
                                         {"gamma", "beta", "alpha"}));
  ARROW_ASSIGN_OR_RAISE(auto b1, GetExecBatchFromVectors(fields, {b1_int, b1_str}));
  ARROW_ASSIGN_OR_RAISE(auto b2, GetExecBatchFromVectors(fields, {b2_int, b2_str}));
  ARROW_ASSIGN_OR_RAISE(auto b3, GetExecBatchFromVectors(fields, {b3_int, b3_str}));
  out.batches = {b1, b2, b3};

  size_t batch_count = out.batches.size();
  for (int repeat = 1; repeat < multiplicity; ++repeat) {
    for (size_t i = 0; i < batch_count; ++i) {
      out.batches.push_back(out.batches[i]);
    }
  }

  out.schema = arrow::schema(fields);
  return out;
}

arrow::Status ExecutePlanAndCollectAsTable(
    cp::ExecContext& exec_context, std::shared_ptr<cp::ExecPlan> plan,
    std::shared_ptr<arrow::Schema> schema,
    arrow::AsyncGenerator<arrow::util::optional<cp::ExecBatch>> sink_gen) {
  // translate sink_gen (async) to sink_reader (sync)
  std::shared_ptr<arrow::RecordBatchReader> sink_reader =
      cp::MakeGeneratorReader(schema, std::move(sink_gen), exec_context.memory_pool());

  // validate the ExecPlan
  ARROW_RETURN_NOT_OK(plan->Validate());
  std::cout << "ExecPlan created : " << plan->ToString() << std::endl;
  // start the ExecPlan
  ARROW_RETURN_NOT_OK(plan->StartProducing());

  // collect sink_reader into a Table
  std::shared_ptr<arrow::Table> response_table;

  ARROW_ASSIGN_OR_RAISE(response_table,
                        arrow::Table::FromRecordBatchReader(sink_reader.get()));

  std::cout << "Results : " << response_table->ToString() << std::endl;

  // stop producing
  plan->StopProducing();
  // plan mark finished
  auto future = plan->finished();
  return future.status();
}

// (Doc section: Scan Example)

/// \brief An example demonstrating a scan and sink node
/// \param exec_context The execution context to run the plan in
///
/// Scan-Sink
/// This example shows how a scan operation can be applied to a dataset.
/// Further operations (for example project and filter) can be applied to
/// the scanned data. The output is obtained as a table via the sink node.
arrow::Status ScanSinkExample(cp::ExecContext& exec_context) {
  // create the execution plan
  ARROW_ASSIGN_OR_RAISE(std::shared_ptr<cp::ExecPlan> plan,
                        cp::ExecPlan::Make(&exec_context));

  ARROW_ASSIGN_OR_RAISE(std::shared_ptr<arrow::dataset::Dataset> dataset, GetDataset());

  auto options = std::make_shared<arrow::dataset::ScanOptions>();
  options->projection = cp::project({}, {});  // create empty projection

  // construct the scan node
  cp::ExecNode* scan;
  auto scan_node_options = arrow::dataset::ScanNodeOptions{dataset, options};

  ARROW_ASSIGN_OR_RAISE(scan,
                        cp::MakeExecNode("scan", plan.get(), {}, scan_node_options));

  arrow::AsyncGenerator<arrow::util::optional<cp::ExecBatch>> sink_gen;

  ARROW_RETURN_NOT_OK(
      cp::MakeExecNode("sink", plan.get(), {scan}, cp::SinkNodeOptions{&sink_gen}));

  return ExecutePlanAndCollectAsTable(exec_context, plan, dataset->schema(), sink_gen);
}
// (Doc section: Scan Example)

// (Doc section: Source Example)

/// \brief An example demonstrating a source and sink node
/// \param exec_context The execution context to run the plan in
///
/// Source-Sink Example
/// This example shows how a source and sink can be used
/// in an execution plan. The source node receives the data
/// as batches, and the sink node emits it as output, which
/// is collected into a table.
arrow::Status SourceSinkExample(cp::ExecContext& exec_context) {
  ARROW_ASSIGN_OR_RAISE(std::shared_ptr<cp::ExecPlan> plan,
                        cp::ExecPlan::Make(&exec_context));

  ARROW_ASSIGN_OR_RAISE(auto basic_data, MakeBasicBatches());

  arrow::AsyncGenerator<arrow::util::optional<cp::ExecBatch>> sink_gen;

  auto source_node_options = cp::SourceNodeOptions{basic_data.schema, basic_data.gen()};

  ARROW_ASSIGN_OR_RAISE(cp::ExecNode * source,
                        cp::MakeExecNode("source", plan.get(), {}, source_node_options));

  ARROW_RETURN_NOT_OK(
      cp::MakeExecNode("sink", plan.get(), {source}, cp::SinkNodeOptions{&sink_gen}));

  return ExecutePlanAndCollectAsTable(exec_context, plan, basic_data.schema, sink_gen);
}
// (Doc section: Source Example)

// (Doc section: Table Source Example)

/// \brief An example showing a table source node
/// \param exec_context The execution context to run the plan in
///
/// TableSource-Sink Example
/// This example shows how a table_source and sink can be used
/// in an execution plan. The table source node reads data from
/// a table in batches of at most max_batch_size rows, and the sink
/// node emits the data to a generator, which we collect into a table.
arrow::Status TableSourceSinkExample(cp::ExecContext& exec_context) {
  ARROW_ASSIGN_OR_RAISE(std::shared_ptr<cp::ExecPlan> plan,
                        cp::ExecPlan::Make(&exec_context));

  ARROW_ASSIGN_OR_RAISE(auto table, GetTable());

  arrow::AsyncGenerator<arrow::util::optional<cp::ExecBatch>> sink_gen;
  int max_batch_size = 2;
  auto table_source_options = cp::TableSourceNodeOptions{table, max_batch_size};

  ARROW_ASSIGN_OR_RAISE(
      cp::ExecNode * source,
      cp::MakeExecNode("table_source", plan.get(), {}, table_source_options));

  ARROW_RETURN_NOT_OK(
      cp::MakeExecNode("sink", plan.get(), {source}, cp::SinkNodeOptions{&sink_gen}));

  return ExecutePlanAndCollectAsTable(exec_context, plan, table->schema(), sink_gen);
}
// (Doc section: Table Source Example)

// (Doc section: Filter Example)

/// \brief An example showing a filter node
/// \param exec_context The execution context to run the plan in
///
/// Scan-Filter-Sink
/// This example shows how a filter can be used in an execution plan,
/// along with the scan and sink operations. The output from the
/// execution plan is obtained as a table via the sink node.
arrow::Status ScanFilterSinkExample(cp::ExecContext& exec_context) {
  ARROW_ASSIGN_OR_RAISE(std::shared_ptr<cp::ExecPlan> plan,
                        cp::ExecPlan::Make(&exec_context));

  ARROW_ASSIGN_OR_RAISE(std::shared_ptr<arrow::dataset::Dataset> dataset, GetDataset());

  auto options = std::make_shared<arrow::dataset::ScanOptions>();
  // specify the filter. This filter keeps only the rows where the
  // value of the "a" column is greater than 3.
  cp::Expression filter_opt = cp::greater(cp::field_ref("a"), cp::literal(3));
  // set filter for scanner : on-disk / push-down filtering.
  // This step can be skipped if you are not reading from disk.
  options->filter = filter_opt;
  // empty projection
  options->projection = cp::project({}, {});

  std::cout << "Initialized Scanning Options" << std::endl;

  // construct the scan node
  cp::ExecNode* scan;

  auto scan_node_options = arrow::dataset::ScanNodeOptions{dataset, options};
  std::cout << "Scan node options created" << std::endl;

  ARROW_ASSIGN_OR_RAISE(scan,
                        cp::MakeExecNode("scan", plan.get(), {}, scan_node_options));

  // pipe the scan node into the filter node.
  // The filter has to be set in both the scan node options and the filter
  // node options: at the scan node it is used for on-disk / push-down
  // filtering, at the filter node for in-memory filtering.
  cp::ExecNode* filter;
  ARROW_ASSIGN_OR_RAISE(filter, cp::MakeExecNode("filter", plan.get(), {scan},
                                                 cp::FilterNodeOptions{filter_opt}));

  // finally, pipe the filter node into a sink node
  arrow::AsyncGenerator<arrow::util::optional<cp::ExecBatch>> sink_gen;
  ARROW_RETURN_NOT_OK(
      cp::MakeExecNode("sink", plan.get(), {filter}, cp::SinkNodeOptions{&sink_gen}));

  return ExecutePlanAndCollectAsTable(exec_context, plan, dataset->schema(), sink_gen);
}

// (Doc section: Filter Example)

// (Doc section: Project Example)

/// \brief An example showing a project node
/// \param exec_context The execution context to run the plan in
///
/// Scan-Project-Sink
/// This example shows how a scan operation can be used to load the data
/// into the execution plan, how a project operation can be applied to the
/// data stream, and how the output is obtained as a table via the sink node.
arrow::Status ScanProjectSinkExample(cp::ExecContext& exec_context) {
  ARROW_ASSIGN_OR_RAISE(std::shared_ptr<cp::ExecPlan> plan,
                        cp::ExecPlan::Make(&exec_context));

  ARROW_ASSIGN_OR_RAISE(std::shared_ptr<arrow::dataset::Dataset> dataset, GetDataset());

  auto options = std::make_shared<arrow::dataset::ScanOptions>();
  // projection expression evaluated by the project node: a * 2
  cp::Expression a_times_2 = cp::call("multiply", {cp::field_ref("a"), cp::literal(2)});
  // empty projection at the scanner
  options->projection = cp::project({}, {});

  cp::ExecNode* scan;

  auto scan_node_options = arrow::dataset::ScanNodeOptions{dataset, options};

  ARROW_ASSIGN_OR_RAISE(scan,
                        cp::MakeExecNode("scan", plan.get(), {}, scan_node_options));

  cp::ExecNode* project;
  ARROW_ASSIGN_OR_RAISE(project, cp::MakeExecNode("project", plan.get(), {scan},
                                                  cp::ProjectNodeOptions{{a_times_2}}));
  // schema after projection => multiply(a, 2): int64
  std::cout << "Schema after projection : \n"
            << project->output_schema()->ToString() << std::endl;

  arrow::AsyncGenerator<arrow::util::optional<cp::ExecBatch>> sink_gen;
  ARROW_RETURN_NOT_OK(
      cp::MakeExecNode("sink", plan.get(), {project}, cp::SinkNodeOptions{&sink_gen}));

  // collect using the schema computed by the project node, so that the
  // column type matches the data actually produced
  return ExecutePlanAndCollectAsTable(exec_context, plan, project->output_schema(),
                                      sink_gen);
}

// (Doc section: Project Example)

// (Doc section: Scalar Aggregate Example)

/// \brief An example showing an aggregation node to aggregate an entire table
/// \param exec_context The execution context to run the plan in
///
/// Source-Aggregation-Sink
/// This example shows how an aggregation operation can be applied to an
/// execution plan, producing a scalar output. The source node loads the
/// data and the aggregation (the sum of column 'a') is applied to
/// this data. The output is obtained from the sink node as a table.
arrow::Status SourceScalarAggregateSinkExample(cp::ExecContext& exec_context) {
  ARROW_ASSIGN_OR_RAISE(std::shared_ptr<cp::ExecPlan> plan,
                        cp::ExecPlan::Make(&exec_context));

  ARROW_ASSIGN_OR_RAISE(auto basic_data, MakeBasicBatches());

  arrow::AsyncGenerator<arrow::util::optional<cp::ExecBatch>> sink_gen;

  auto source_node_options = cp::SourceNodeOptions{basic_data.schema, basic_data.gen()};

  ARROW_ASSIGN_OR_RAISE(cp::ExecNode * source,
                        cp::MakeExecNode("source", plan.get(), {}, source_node_options));
  auto aggregate_options = cp::AggregateNodeOptions{/*aggregates=*/{{"sum", nullptr}},
                                                    /*targets=*/{"a"},
                                                    /*names=*/{"sum(a)"}};
  ARROW_ASSIGN_OR_RAISE(
      cp::ExecNode * aggregate,
      cp::MakeExecNode("aggregate", plan.get(), {source}, aggregate_options));

  ARROW_RETURN_NOT_OK(
      cp::MakeExecNode("sink", plan.get(), {aggregate}, cp::SinkNodeOptions{&sink_gen}));

  // collect using the output schema computed by the aggregate node instead of
  // hard-coding it ("sum" widens integer inputs, so sum(a) is not int32)
  return ExecutePlanAndCollectAsTable(exec_context, plan, aggregate->output_schema(),
                                      sink_gen);
}
// (Doc section: Scalar Aggregate Example)

// (Doc section: Group Aggregate Example)

/// \brief An example showing an aggregation node to perform a group-by operation
/// \param exec_context The execution context to run the plan in
///
/// Source-Aggregation-Sink
/// This example shows how an aggregation operation can be applied to an
/// execution plan, producing a grouped output. The source node loads the
/// data and the aggregation (counting the non-null values of column 'a'
/// for each value of key column 'b') is applied to this data. The output
/// is obtained from the sink node as a table.
arrow::Status SourceGroupAggregateSinkExample(cp::ExecContext& exec_context) {
  ARROW_ASSIGN_OR_RAISE(std::shared_ptr<cp::ExecPlan> plan,
                        cp::ExecPlan::Make(&exec_context));

  ARROW_ASSIGN_OR_RAISE(auto basic_data, MakeBasicBatches());

  arrow::AsyncGenerator<arrow::util::optional<cp::ExecBatch>> sink_gen;

  auto source_node_options = cp::SourceNodeOptions{basic_data.schema, basic_data.gen()};

  ARROW_ASSIGN_OR_RAISE(cp::ExecNode * source,
                        cp::MakeExecNode("source", plan.get(), {}, source_node_options));
  cp::CountOptions options(cp::CountOptions::ONLY_VALID);
  auto aggregate_options =
      cp::AggregateNodeOptions{/*aggregates=*/{{"hash_count", &options}},
                               /*targets=*/{"a"},
                               /*names=*/{"count(a)"},
                               /*keys=*/{"b"}};
  ARROW_ASSIGN_OR_RAISE(
      cp::ExecNode * aggregate,
      cp::MakeExecNode("aggregate", plan.get(), {source}, aggregate_options));

  ARROW_RETURN_NOT_OK(
      cp::MakeExecNode("sink", plan.get(), {aggregate}, cp::SinkNodeOptions{&sink_gen}));

  // collect using the output schema computed by the aggregate node instead of
  // hard-coding it ("hash_count" produces int64 counts)
  return ExecutePlanAndCollectAsTable(exec_context, plan, aggregate->output_schema(),
                                      sink_gen);
}
// (Doc section: Group Aggregate Example)

// (Doc section: ConsumingSink Example)

/// \brief An example showing a consuming sink node
/// \param exec_context The execution context to run the plan in
///
/// Source-Consuming-Sink
/// This example shows how the data can be consumed within the execution plan
/// by using a ConsumingSink node. There is no data output from this execution plan.
arrow::Status SourceConsumingSinkExample(cp::ExecContext& exec_context) {
  ARROW_ASSIGN_OR_RAISE(std::shared_ptr<cp::ExecPlan> plan,
                        cp::ExecPlan::Make(&exec_context));

  ARROW_ASSIGN_OR_RAISE(auto basic_data, MakeBasicBatches());

  auto source_node_options = cp::SourceNodeOptions{basic_data.schema, basic_data.gen()};

  ARROW_ASSIGN_OR_RAISE(cp::ExecNode * source,
                        cp::MakeExecNode("source", plan.get(), {}, source_node_options));

  std::atomic<uint32_t> batches_seen{0};
  arrow::Future<> finish = arrow::Future<>::Make();
  struct CustomSinkNodeConsumer : public cp::SinkNodeConsumer {
    CustomSinkNodeConsumer(std::atomic<uint32_t>* batches_seen, arrow::Future<> finish)
        : batches_seen(batches_seen), finish(std::move(finish)) {}

    arrow::Status Init(const std::shared_ptr<arrow::Schema>& schema,
                       cp::BackpressureControl* backpressure_control) override {
      return arrow::Status::OK();
    }

    arrow::Status Consume(cp::ExecBatch batch) override {
      (*batches_seen)++;
      return arrow::Status::OK();
    }

    arrow::Future<> Finish() override { return finish; }

    std::atomic<uint32_t>* batches_seen;
    arrow::Future<> finish;
  };
  std::shared_ptr<CustomSinkNodeConsumer> consumer =
      std::make_shared<CustomSinkNodeConsumer>(&batches_seen, finish);

  cp::ExecNode* consuming_sink;

  ARROW_ASSIGN_OR_RAISE(consuming_sink,
                        MakeExecNode("consuming_sink", plan.get(), {source},
                                     cp::ConsumingSinkNodeOptions(consumer)));

  ARROW_RETURN_NOT_OK(consuming_sink->Validate());

  ARROW_RETURN_NOT_OK(plan->Validate());
  std::cout << "Exec Plan created: " << plan->ToString() << std::endl;
  // plan start producing
  ARROW_RETURN_NOT_OK(plan->StartProducing());
  // Source should finish fairly quickly
  ARROW_RETURN_NOT_OK(source->finished().status());
  std::cout << "Source Finished!" << std::endl;
  // Mark consumption complete, plan should finish
  finish.MarkFinished(arrow::Status::OK());
  ARROW_RETURN_NOT_OK(plan->finished().status());
  return arrow::Status::OK();
}
// (Doc section: ConsumingSink Example)

// (Doc section: OrderBySink Example)

/// \brief An example showing an order-by node
/// \param exec_context The execution context to run the plan in
///
/// Source-OrderBy-Sink
/// In this example, the data enters through the source node
/// and is sorted by the order-by sink node. The sort order
/// (ascending or descending) is configurable; here column "a"
/// is sorted in descending order. The output is obtained as a
/// table from the sink node.
arrow::Status SourceOrderBySinkExample(cp::ExecContext& exec_context) {
  ARROW_ASSIGN_OR_RAISE(std::shared_ptr<cp::ExecPlan> plan,
                        cp::ExecPlan::Make(&exec_context));

  ARROW_ASSIGN_OR_RAISE(auto basic_data, MakeSortTestBasicBatches());

  std::cout << "basic data created" << std::endl;

  arrow::AsyncGenerator<arrow::util::optional<cp::ExecBatch>> sink_gen;

  auto source_node_options = cp::SourceNodeOptions{basic_data.schema, basic_data.gen()};
  ARROW_ASSIGN_OR_RAISE(cp::ExecNode * source,
                        cp::MakeExecNode("source", plan.get(), {}, source_node_options));

  ARROW_RETURN_NOT_OK(cp::MakeExecNode(
      "order_by_sink", plan.get(), {source},
      cp::OrderBySinkNodeOptions{
          cp::SortOptions{{cp::SortKey{"a", cp::SortOrder::Descending}}}, &sink_gen}));

  return ExecutePlanAndCollectAsTable(exec_context, plan, basic_data.schema, sink_gen);
}

// (Doc section: OrderBySink Example)

// (Doc section: HashJoin Example)

/// \brief An example showing a hash join node
/// \param exec_context The execution context to run the plan in
///
/// Source-HashJoin-Sink
/// This example shows how the source nodes get the data and how a self-join
/// is applied to it. The join options are configurable. The output
/// is obtained as a table via the sink node.
arrow::Status SourceHashJoinSinkExample(cp::ExecContext& exec_context) {
  ARROW_ASSIGN_OR_RAISE(auto input, MakeGroupableBatches());
  ARROW_ASSIGN_OR_RAISE(std::shared_ptr<cp::ExecPlan> plan,
                        cp::ExecPlan::Make(&exec_context));

  arrow::AsyncGenerator<arrow::util::optional<cp::ExecBatch>> sink_gen;

  cp::ExecNode* left_source;
  cp::ExecNode* right_source;
  for (auto source : {&left_source, &right_source}) {
    ARROW_ASSIGN_OR_RAISE(*source,
                          MakeExecNode("source", plan.get(), {},
                                       cp::SourceNodeOptions{input.schema, input.gen()}));
  }

  cp::HashJoinNodeOptions join_opts{
      cp::JoinType::INNER,
      /*left_keys=*/{"str"},
      /*right_keys=*/{"str"},
      /*filter=*/cp::literal(true),
      /*output_suffix_for_left=*/"l_",
      /*output_suffix_for_right=*/"r_"};

  ARROW_ASSIGN_OR_RAISE(
      auto hashjoin,
      cp::MakeExecNode("hashjoin", plan.get(), {left_source, right_source}, join_opts));

  ARROW_RETURN_NOT_OK(
      cp::MakeExecNode("sink", plan.get(), {hashjoin}, cp::SinkNodeOptions{&sink_gen}));

  // collect using the output schema computed by the join node (columns from
  // both inputs) instead of hard-coding the names and types
  return ExecutePlanAndCollectAsTable(exec_context, plan, hashjoin->output_schema(),
                                      sink_gen);
}

// (Doc section: HashJoin Example)

// (Doc section: KSelect Example)

/// \brief An example showing a select-k node
/// \param exec_context The execution context to run the plan in
///
/// Source-KSelect
/// This example shows how the top or bottom K elements can be
/// selected. The output node is a modified sink node from which
/// the output can be obtained as a table.
arrow::Status SourceKSelectExample(cp::ExecContext& exec_context) {
  ARROW_ASSIGN_OR_RAISE(auto input, MakeGroupableBatches());
  ARROW_ASSIGN_OR_RAISE(std::shared_ptr<cp::ExecPlan> plan,
                        cp::ExecPlan::Make(&exec_context));
  arrow::AsyncGenerator<arrow::util::optional<cp::ExecBatch>> sink_gen;

  ARROW_ASSIGN_OR_RAISE(
      cp::ExecNode * source,
      cp::MakeExecNode("source", plan.get(), {},
                       cp::SourceNodeOptions{input.schema, input.gen()}));

  // select the 2 rows with the largest values of "i32"
  cp::SelectKOptions options = cp::SelectKOptions::TopKDefault(/*k=*/2, {"i32"});

  ARROW_RETURN_NOT_OK(cp::MakeExecNode("select_k_sink", plan.get(), {source},
                                       cp::SelectKSinkNodeOptions{options, &sink_gen}));

  auto schema = arrow::schema(
      {arrow::field("i32", arrow::int32()), arrow::field("str", arrow::utf8())});

  return ExecutePlanAndCollectAsTable(exec_context, plan, schema, sink_gen);
}

// (Doc section: KSelect Example)

// (Doc section: Write Example)

/// \brief An example showing a write node
/// \param exec_context The execution context to run the plan in
/// \param file_path The destination to write to
///
/// Scan-Write
/// This example shows how a scan node can be used to load the data
/// and how a write node can then write it to disk as a Parquet
/// dataset with Hive-style partitioning.
arrow::Status ScanFilterWriteExample(cp::ExecContext& exec_context,
                                     const std::string& file_path) {
  ARROW_ASSIGN_OR_RAISE(std::shared_ptr<cp::ExecPlan> plan,
                        cp::ExecPlan::Make(&exec_context));

  ARROW_ASSIGN_OR_RAISE(std::shared_ptr<arrow::dataset::Dataset> dataset, GetDataset());

  auto options = std::make_shared<arrow::dataset::ScanOptions>();
  // empty projection
  options->projection = cp::project({}, {});

  cp::ExecNode* scan;

  auto scan_node_options = arrow::dataset::ScanNodeOptions{dataset, options};

  ARROW_ASSIGN_OR_RAISE(scan,
                        cp::MakeExecNode("scan", plan.get(), {}, scan_node_options));

  std::string root_path = "";
  std::string uri = "file://" + file_path;
  ARROW_ASSIGN_OR_RAISE(std::shared_ptr<arrow::fs::FileSystem> filesystem,
                        arrow::fs::FileSystemFromUri(uri, &root_path));

  auto base_path = root_path + "/parquet_dataset";
  // Uncomment the following line, if run repeatedly
  // ARROW_RETURN_NOT_OK(filesystem->DeleteDirContents(base_path));
  ARROW_RETURN_NOT_OK(filesystem->CreateDir(base_path));

  // The partition schema determines which fields are part of the partitioning.
  auto partition_schema = arrow::schema({arrow::field("a", arrow::int32())});
  // We'll use Hive-style partitioning,
  // which creates directories with "key=value" pairs.

  auto partitioning =
      std::make_shared<arrow::dataset::HivePartitioning>(partition_schema);
  // We'll write Parquet files.
  auto format = std::make_shared<arrow::dataset::ParquetFileFormat>();

  arrow::dataset::FileSystemDatasetWriteOptions write_options;
  write_options.file_write_options = format->DefaultWriteOptions();
  write_options.filesystem = filesystem;
  write_options.base_dir = base_path;
  write_options.partitioning = partitioning;
  write_options.basename_template = "part{i}.parquet";

  arrow::dataset::WriteNodeOptions write_node_options{write_options};

  ARROW_RETURN_NOT_OK(cp::MakeExecNode("write", plan.get(), {scan}, write_node_options));

  ARROW_RETURN_NOT_OK(plan->Validate());
  std::cout << "Execution Plan Created : " << plan->ToString() << std::endl;
  // start the ExecPlan
  ARROW_RETURN_NOT_OK(plan->StartProducing());
  // status() blocks until the plan has finished writing
  ARROW_RETURN_NOT_OK(plan->finished().status());
  return arrow::Status::OK();
}

// (Doc section: Write Example)

// (Doc section: Union Example)

/// \brief An example showing a union node
/// \param exec_context The execution context to run the plan in
///
/// Source-Union-Sink
/// This example shows how a union operation can be applied to two
/// data sources. The output is obtained as a table via the sink
/// node.
arrow::Status SourceUnionSinkExample(cp::ExecContext& exec_context) {
  ARROW_ASSIGN_OR_RAISE(auto basic_data, MakeBasicBatches());

  ARROW_ASSIGN_OR_RAISE(std::shared_ptr<cp::ExecPlan> plan,
                        cp::ExecPlan::Make(&exec_context));
  arrow::AsyncGenerator<arrow::util::optional<cp::ExecBatch>> sink_gen;

  cp::Declaration union_node{"union", cp::ExecNodeOptions{}};
  cp::Declaration lhs{"source",
                      cp::SourceNodeOptions{basic_data.schema, basic_data.gen()}};
  lhs.label = "lhs";
  cp::Declaration rhs{"source",
                      cp::SourceNodeOptions{basic_data.schema, basic_data.gen()}};
  rhs.label = "rhs";
  union_node.inputs.emplace_back(lhs);
  union_node.inputs.emplace_back(rhs);

  ARROW_ASSIGN_OR_RAISE(
      auto declr, cp::Declaration::Sequence({
                                                union_node,
                                                {"sink", cp::SinkNodeOptions{&sink_gen}},
                                            })
                      .AddToPlan(plan.get()));

  ARROW_RETURN_NOT_OK(declr->Validate());

  ARROW_RETURN_NOT_OK(plan->Validate());
  return ExecutePlanAndCollectAsTable(exec_context, plan, basic_data.schema, sink_gen);
}

// (Doc section: Union Example)

// (Doc section: Table Sink Example)

/// \brief An example showing a table sink node
/// \param exec_context The execution context to run the plan in
///
/// TableSink Example
/// This example shows how a table_sink can be used
/// in an execution plan. This includes a source node
/// receiving data as batches and the table sink node
/// which emits the output as a table.
arrow::Status TableSinkExample(cp::ExecContext& exec_context) {
  ARROW_ASSIGN_OR_RAISE(std::shared_ptr<cp::ExecPlan> plan,
                        cp::ExecPlan::Make(&exec_context));

  ARROW_ASSIGN_OR_RAISE(auto basic_data, MakeBasicBatches());

  auto source_node_options = cp::SourceNodeOptions{basic_data.schema, basic_data.gen()};

  ARROW_ASSIGN_OR_RAISE(cp::ExecNode * source,
                        cp::MakeExecNode("source", plan.get(), {}, source_node_options));

  std::shared_ptr<arrow::Table> output_table;
  auto table_sink_options = cp::TableSinkNodeOptions{&output_table};

  ARROW_RETURN_NOT_OK(
      cp::MakeExecNode("table_sink", plan.get(), {source}, table_sink_options));
  // validate the ExecPlan
  ARROW_RETURN_NOT_OK(plan->Validate());
  std::cout << "ExecPlan created : " << plan->ToString() << std::endl;
  // start the ExecPlan
  ARROW_RETURN_NOT_OK(plan->StartProducing());

  // Wait for the plan to finish
  auto finished = plan->finished();
  ARROW_RETURN_NOT_OK(finished.status());
  std::cout << "Results : " << output_table->ToString() << std::endl;
  return arrow::Status::OK();
}
// (Doc section: Table Sink Example)

enum ExampleMode {
  SOURCE_SINK = 0,
  TABLE_SOURCE_SINK = 1,
  SCAN = 2,
  FILTER = 3,
  PROJECT = 4,
  SCALAR_AGGREGATION = 5,
  GROUP_AGGREGATION = 6,
  CONSUMING_SINK = 7,
  ORDER_BY_SINK = 8,
  HASHJOIN = 9,
  KSELECT = 10,
  WRITE = 11,
  UNION = 12,
  TABLE_SOURCE_TABLE_SINK = 13
};

int main(int argc, char** argv) {
  // Usage: <program> <base_save_path> <mode>
  if (argc < 3) {
    // Fake success for CI purposes.
    return EXIT_SUCCESS;
  }

  std::string base_save_path = argv[1];
  int mode = std::atoi(argv[2]);
  arrow::Status status;
  // ensure arrow::dataset node factories are in the registry
  arrow::dataset::internal::Initialize();
  // execution context
  cp::ExecContext exec_context;
  switch (mode) {
    case SOURCE_SINK:
      PrintBlock("Source Sink Example");
      status = SourceSinkExample(exec_context);
      break;
    case TABLE_SOURCE_SINK:
      PrintBlock("Table Source Sink Example");
      status = TableSourceSinkExample(exec_context);
      break;
    case SCAN:
      PrintBlock("Scan Example");
      status = ScanSinkExample(exec_context);
      break;
    case FILTER:
      PrintBlock("Filter Example");
      status = ScanFilterSinkExample(exec_context);
      break;
    case PROJECT:
      PrintBlock("Project Example");
      status = ScanProjectSinkExample(exec_context);
      break;
    case GROUP_AGGREGATION:
      PrintBlock("Group Aggregate Example");
      status = SourceGroupAggregateSinkExample(exec_context);
      break;
    case SCALAR_AGGREGATION:
      PrintBlock("Scalar Aggregate Example");
      status = SourceScalarAggregateSinkExample(exec_context);
      break;
    case CONSUMING_SINK:
      PrintBlock("Consuming-Sink Example");
      status = SourceConsumingSinkExample(exec_context);
      break;
    case ORDER_BY_SINK:
      PrintBlock("OrderBy Example");
      status = SourceOrderBySinkExample(exec_context);
      break;
    case HASHJOIN:
      PrintBlock("HashJoin Example");
      status = SourceHashJoinSinkExample(exec_context);
      break;
    case KSELECT:
      PrintBlock("KSelect Example");
      status = SourceKSelectExample(exec_context);
      break;
    case WRITE:
      PrintBlock("Write Example");
      status = ScanFilterWriteExample(exec_context, base_save_path);
      break;
    case UNION:
      PrintBlock("Union Example");
      status = SourceUnionSinkExample(exec_context);
      break;
    case TABLE_SOURCE_TABLE_SINK:
      PrintBlock("TableSink Example");
      status = TableSinkExample(exec_context);
      break;
    default:
      status = arrow::Status::Invalid("Unknown example mode: ", mode);
      break;
  }

  if (status.ok()) {
    return EXIT_SUCCESS;
  } else {
    std::cout << "Error occurred: " << status.message() << std::endl;
    return EXIT_FAILURE;
  }
}
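Every example above relies on the push-based model described in the overview: each node receives batches from its inputs and pushes its results to its outputs as soon as they are ready. The following is a minimal, Arrow-independent sketch of that flow, assuming a batch is just a `std::vector<int32_t>` holding one column. `ToyBatch`, `ToyNode` and the other `Toy*` types are hypothetical stand-ins for `ExecBatch` and `ExecNode`, not Arrow API. The wiring mirrors the filter (`a > 3`) and project (`a * 2`) examples.

```cpp
#include <cassert>
#include <cstdint>
#include <functional>
#include <utility>
#include <vector>

// Hypothetical stand-in for cp::ExecBatch: a single int32 column.
using ToyBatch = std::vector<int32_t>;

// Hypothetical stand-in for cp::ExecNode: upstream nodes push batches into it.
struct ToyNode {
  virtual ~ToyNode() = default;
  virtual void InputReceived(const ToyBatch& batch) = 0;
};

// Keeps only the rows that satisfy the predicate, like a filter node.
class ToyFilterNode : public ToyNode {
 public:
  ToyFilterNode(std::function<bool(int32_t)> predicate, ToyNode* output)
      : predicate_(std::move(predicate)), output_(output) {}

  void InputReceived(const ToyBatch& batch) override {
    ToyBatch kept;
    for (int32_t value : batch) {
      if (predicate_(value)) kept.push_back(value);
    }
    // Only non-empty batches are pushed downstream.
    if (!kept.empty()) output_->InputReceived(kept);
  }

 private:
  std::function<bool(int32_t)> predicate_;
  ToyNode* output_;
};

// Computes a * 2 for every row, like a project node.
class ToyProjectNode : public ToyNode {
 public:
  explicit ToyProjectNode(ToyNode* output) : output_(output) {}

  void InputReceived(const ToyBatch& batch) override {
    ToyBatch projected;
    projected.reserve(batch.size());
    for (int32_t value : batch) projected.push_back(value * 2);
    output_->InputReceived(projected);
  }

 private:
  ToyNode* output_;
};

// Collects every batch it receives, like a sink feeding a table.
class ToySinkNode : public ToyNode {
 public:
  void InputReceived(const ToyBatch& batch) override { batches.push_back(batch); }
  std::vector<ToyBatch> batches;
};

// Wires filter(a > 3) -> project(a * 2) -> sink and pushes each source
// batch through the graph as soon as it is "produced".
std::vector<ToyBatch> RunToyPlan(const std::vector<ToyBatch>& source_batches) {
  ToySinkNode sink;
  ToyProjectNode project(&sink);
  ToyFilterNode filter([](int32_t a) { return a > 3; }, &project);
  for (const ToyBatch& batch : source_batches) filter.InputReceived(batch);
  return sink.batches;
}
```

For example, pushing the source batches `{1, 5, 2}`, `{7, 3}` and `{0}` yields two output batches, `{10}` and `{14}`; the last source batch is dropped entirely by the filter. Each node only ever holds one batch at a time, which is the memory property the streaming engine is built around. Unlike this single-threaded sketch, a real ExecPlan may run nodes concurrently and apply backpressure.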
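The group aggregate example asks the `aggregate` node for `hash_count` with `CountOptions::ONLY_VALID`, grouped by column `b`. As an illustration only, the sketch below shows the kind of per-group state such a node has to keep: counts are updated as each batch arrives, but results can only be emitted once all input has been consumed, because any later batch may still change any group. `ToyGroupBatch` and `ToyHashCount` are hypothetical names, and this is not how Arrow's hash aggregation kernels are implemented.

```cpp
#include <cassert>
#include <cstddef>
#include <cstdint>
#include <map>
#include <optional>
#include <vector>

// Hypothetical batch: a nullable int32 column "a" and a boolean key column "b".
struct ToyGroupBatch {
  std::vector<std::optional<int32_t>> a;
  std::vector<bool> b;
};

// Sketch of a grouped count of non-null values (the behaviour requested by
// CountOptions::ONLY_VALID). State is updated per batch; results are only
// returned by Finish(), after all input has arrived.
class ToyHashCount {
 public:
  void Consume(const ToyGroupBatch& batch) {
    for (std::size_t i = 0; i < batch.a.size(); ++i) {
      const bool key = batch.b[i];
      // operator[] creates the group with a count of 0 on first sight, so a
      // group whose values are all null still appears with count 0.
      std::int64_t& count = counts_[key];
      if (batch.a[i].has_value()) ++count;
    }
  }

  std::map<bool, std::int64_t> Finish() const { return counts_; }

 private:
  std::map<bool, std::int64_t> counts_;
};
```

Feeding the batches `{a: 1, null, 3; b: true, true, false}` and `{a: null, 5; b: false, false}` gives a count of 1 for `true` and 2 for `false`: nulls create or touch a group but are not counted.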