Streaming execution engine

Warning

The streaming execution engine is experimental, and a stable API is not yet guaranteed.

Motivation

For many complex computations, successive direct invocation of compute functions is not feasible in either memory or computation time. Doing so causes all intermediate data to be fully materialized. To facilitate arbitrarily large inputs and more efficient resource usage, Arrow also provides a streaming query engine with which computations can be formulated and executed.

An example graph of a streaming execution workflow.

ExecNode is provided to reify the graph of operations in a query. Batches of data (ExecBatch) flow along edges of the graph from node to node. Structuring the API around streams of batches allows the working set for each node to be tuned for optimal performance independent of any other nodes in the graph. Each ExecNode processes batches as they are pushed to it along an edge of the graph by upstream nodes (its inputs), and pushes batches along an edge of the graph to downstream nodes (its outputs) as they are finalized.

Overview

ExecNode

Each node in the graph is an implementation of the ExecNode interface.

ExecPlan

A set of ExecNodes is contained and (to an extent) coordinated by an ExecPlan.

ExecFactoryRegistry

Instances of ExecNode are constructed by factory functions held in an ExecFactoryRegistry.

ExecNodeOptions

Heterogeneous parameters for factories of ExecNode are bundled in an ExecNodeOptions.

Declaration

A dplyr-inspired helper for efficient construction of an ExecPlan.

ExecBatch

A lightweight container for a single chunk of data in the Arrow format. In contrast to RecordBatch, ExecBatch is intended for use exclusively in a streaming execution context (for example, it doesn’t have a corresponding Python binding). Furthermore, columns which happen to have a constant value may be represented by a Scalar instead of an Array. In addition, ExecBatch may carry execution-relevant properties including a guaranteed-true filter for Expression simplification.
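
For illustration, a minimal sketch (not taken from the Arrow examples) of constructing such a batch, mixing an Array column with a constant column represented as a Scalar:

arrow::Int32Builder builder;
ARROW_RETURN_NOT_OK(builder.AppendValues({1, 2, 3}));
ARROW_ASSIGN_OR_RAISE(std::shared_ptr<arrow::Array> column, builder.Finish());

// one Array column plus one constant Scalar column; the single
// length applies to both
arrow::compute::ExecBatch batch({column, arrow::Datum(arrow::MakeScalar(7))},
                                /*length=*/3);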

An example ExecNode implementation which simply passes all input batches through unchanged:

class PassthruNode : public ExecNode {
 public:
  // InputReceived is the main entry point for ExecNodes. It is invoked
  // by an input of this node to push a batch here for processing.
  void InputReceived(ExecNode* input, ExecBatch batch) override {
    // Since this is a passthru node we simply push the batch to our
    // only output here.
    outputs_[0]->InputReceived(this, batch);
  }

  // ErrorReceived is called by an input of this node to report an error.
  // ExecNodes should always forward errors to their outputs unless they
  // are able to fully handle the error (this is rare).
  void ErrorReceived(ExecNode* input, Status error) override {
    outputs_[0]->ErrorReceived(this, error);
  }

  // InputFinished is used to signal how many batches will ultimately arrive.
  // It may be called with any ordering relative to InputReceived/ErrorReceived.
  void InputFinished(ExecNode* input, int total_batches) override {
    outputs_[0]->InputFinished(this, total_batches);
  }

  // ExecNodes may request that their inputs throttle production of batches
  // until they are ready for more, or stop production if no further batches
  // are required.  These signals should typically be forwarded to the inputs
  // of the ExecNode.
  void ResumeProducing(ExecNode* output) override { inputs_[0]->ResumeProducing(this); }
  void PauseProducing(ExecNode* output) override { inputs_[0]->PauseProducing(this); }
  void StopProducing(ExecNode* output) override { inputs_[0]->StopProducing(this); }

  // An ExecNode has a single output schema to which all its batches conform.
  using ExecNode::output_schema;

  // ExecNodes carry basic introspection for debugging purposes
  const char* kind_name() const override { return "PassthruNode"; }
  using ExecNode::label;
  using ExecNode::SetLabel;
  using ExecNode::ToString;

  // An ExecNode holds references to its inputs and outputs, so it is possible
  // to walk the graph of execution if necessary.
  using ExecNode::inputs;
  using ExecNode::outputs;

  // StartProducing() and StopProducing() are invoked by an ExecPlan to
  // coordinate the graph-wide execution state.  These do not need to be
  // forwarded to inputs or outputs.
  Status StartProducing() override { return Status::OK(); }
  void StopProducing() override {}
  Future<> finished() override { return inputs_[0]->finished(); }
};

Note that each method which is associated with an edge of the graph must be invoked with an ExecNode* to identify the node which invoked it. For example, in an ExecNode which implements JOIN this tagging might be used to differentiate between batches from the left or right inputs. InputReceived, ErrorReceived, InputFinished may only be invoked by the inputs of a node, while ResumeProducing, PauseProducing, StopProducing may only be invoked by outputs of a node.

ExecPlan contains the associated instances of ExecNode and is used to start and stop execution of all nodes and for querying/awaiting their completion:

// construct an ExecPlan first to hold your nodes
ARROW_ASSIGN_OR_RAISE(auto plan, ExecPlan::Make(default_exec_context()));

// ... add nodes to your ExecPlan

// start all nodes in the graph
ARROW_RETURN_NOT_OK(plan->StartProducing());

SetUserCancellationCallback([plan] {
  // stop all nodes in the graph
  plan->StopProducing();
});

// Complete will be marked finished when all nodes have run to completion
// or acknowledged a StopProducing() signal. The ExecPlan should be kept
// alive until this future is marked finished.
Future<> complete = plan->finished();

Constructing ExecPlan objects

Warning

The following will be superseded by construction from Compute IR, see ARROW-14074.

None of the concrete implementations of ExecNode are exposed in headers, so they can’t be constructed directly outside the translation unit where they are defined. Instead, factories to create them are provided in an extensible registry. This structure provides a number of benefits:

  • This enforces consistent construction.

  • It decouples implementations from consumers of the interface (for example, we have separate classes for scalar and grouped aggregation; the single factory can choose which to construct by checking whether grouping keys are provided).

  • This expedites integration with out-of-library extensions. For example “scan” nodes are implemented in the separate libarrow_dataset.so library.

  • Since the class is not referenceable outside the translation unit in which it is defined, compilers can optimize more aggressively.

Factories of ExecNode can be retrieved by name from the registry. The default registry is available through arrow::compute::default_exec_factory_registry() and can be queried for the built-in factories:

// get the factory for "filter" nodes:
ARROW_ASSIGN_OR_RAISE(auto make_filter,
                      default_exec_factory_registry()->GetFactory("filter"));

// factories take three arguments:
ARROW_ASSIGN_OR_RAISE(ExecNode* filter_node, *make_filter(
    // the ExecPlan which should own this node
    plan.get(),

    // nodes which will send batches to this node (inputs)
    {scan_node},

    // parameters unique to "filter" nodes
    FilterNodeOptions{filter_expression}));

// alternative shorthand:
ARROW_ASSIGN_OR_RAISE(filter_node, MakeExecNode("filter",
    plan.get(), {scan_node}, FilterNodeOptions{filter_expression}));

Factories can also be added to the default registry as long as they are convertible to std::function<Result<ExecNode*>(ExecPlan*, std::vector<ExecNode*>, const ExecNodeOptions&)>.
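
For instance, a minimal sketch of registering the PassthruNode shown earlier, assuming it has a constructor taking the plan and its inputs (that constructor is a hypothetical detail not shown above); ExecPlan::EmplaceNode ties the node's lifetime to the plan:

ARROW_RETURN_NOT_OK(default_exec_factory_registry()->AddFactory(
    "passthru",
    [](ExecPlan* plan, std::vector<ExecNode*> inputs,
       const ExecNodeOptions& options) -> Result<ExecNode*> {
      if (inputs.size() != 1) {
        return Status::Invalid("passthru requires exactly one input");
      }
      // EmplaceNode constructs the node and transfers ownership to the plan
      return plan->EmplaceNode<PassthruNode>(plan, std::move(inputs));
    }));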

To build an ExecPlan representing a simple pipeline which reads from a RecordBatchReader then filters, projects, and writes to disk:

std::shared_ptr<RecordBatchReader> reader = GetStreamOfBatches();
ExecNode* source_node = *MakeExecNode("source", plan.get(), {},
                                      SourceNodeOptions::FromReader(
                                          reader,
                                          GetCpuThreadPool()));

ExecNode* filter_node = *MakeExecNode("filter", plan.get(), {source_node},
                                      FilterNodeOptions{
                                        greater(field_ref("score"), literal(3))
                                      });

ExecNode* project_node = *MakeExecNode("project", plan.get(), {filter_node},
                                       ProjectNodeOptions{
                                         {add(field_ref("score"), literal(1))},
                                         {"score + 1"}
                                       });

arrow::dataset::internal::Initialize();
MakeExecNode("write", plan.get(), {project_node},
             WriteNodeOptions{/*base_dir=*/"/dat", /*...*/});

Declaration is a dplyr-inspired helper which further decreases the boilerplate associated with populating an ExecPlan from C++:

arrow::dataset::internal::Initialize();

std::shared_ptr<RecordBatchReader> reader = GetStreamOfBatches();
ASSERT_OK(Declaration::Sequence(
              {
                  {"source", SourceNodeOptions::FromReader(
                       reader,
                       GetCpuThreadPool())},
                  {"filter", FilterNodeOptions{
                       greater(field_ref("score"), literal(3))}},
                  {"project", ProjectNodeOptions{
                       {add(field_ref("score"), literal(1))},
                       {"score + 1"}}},
                  {"write", WriteNodeOptions{/*base_dir=*/"/dat", /*...*/}},
              })
              .AddToPlan(plan.get()));

Note that a source node can wrap anything which resembles a stream of batches. For example, PR#11032 adds support for use of a DuckDB query as a source node. Similarly, a sink node can wrap anything which absorbs a stream of batches. In the example above we’re writing completed batches to disk. However we can also collect these in memory into a Table or forward them to a RecordBatchReader as an out-of-graph stream. This flexibility allows an ExecPlan to be used as streaming middleware between any endpoints which support Arrow formatted batches.
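
For example, a minimal sketch of the in-memory alternative, replacing the "write" node above with a "sink" node and draining it into a Table (MakeGeneratorReader and Table::FromRecordBatchReader are the same utilities used by the complete example at the end of this document):

arrow::AsyncGenerator<arrow::util::optional<ExecBatch>> sink_gen;
ARROW_RETURN_NOT_OK(MakeExecNode("sink", plan.get(), {project_node},
                                 SinkNodeOptions{&sink_gen}));

// wrap the async generator in a synchronous RecordBatchReader
std::shared_ptr<RecordBatchReader> sink_reader = MakeGeneratorReader(
    project_node->output_schema(), std::move(sink_gen), default_memory_pool());

ARROW_RETURN_NOT_OK(plan->StartProducing());
ARROW_ASSIGN_OR_RAISE(std::shared_ptr<Table> table,
                      Table::FromRecordBatchReader(sink_reader.get()));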

An arrow::dataset::Dataset can also be wrapped as a source node which pushes all the dataset’s batches into an ExecPlan. This factory is added to the default registry with the name "scan" by calling arrow::dataset::internal::Initialize():

arrow::dataset::internal::Initialize();

std::shared_ptr<Dataset> dataset = GetDataset();

ASSERT_OK(Declaration::Sequence(
              {
                  {"scan", ScanNodeOptions{dataset,
                     /* push down predicate, projection, ... */}},
                  {"filter", FilterNodeOptions{/* ... */}},
                  // ...
              })
              .AddToPlan(plan.get()));

Datasets may be scanned multiple times; just make multiple scan nodes from that dataset. (Useful for a self-join, for example.) Note that producing two scan nodes like this will perform all reads and decodes twice.
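
A rough sketch of such a self-join, following the Declaration pattern used elsewhere in this document (scan_options stands for the ScanOptions you would otherwise pass, the join options mirror the hash_join example later on, and "hashjoin" is the factory name used there):

Declaration left_scan{"scan", ScanNodeOptions{dataset, scan_options}};
Declaration right_scan{"scan", ScanNodeOptions{dataset, scan_options}};

HashJoinNodeOptions join_opts{JoinType::INNER,
                              /*left_keys=*/{"a"}, /*right_keys=*/{"a"},
                              /*filter=*/literal(true), "l_", "r_"};
Declaration join{"hashjoin", join_opts};
// each scan node reads and decodes the dataset independently
join.inputs.emplace_back(left_scan);
join.inputs.emplace_back(right_scan);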

Constructing ExecNode using Options

ExecNode is the building block of an execution plan. Arrow provides a number of built-in node implementations, each configured through its own options class.

This is the list of operations associated with the execution plan:

Operations and Options

Operation       | Options
--------------- | -----------------------------------------
source          | arrow::compute::SourceNodeOptions
filter          | arrow::compute::FilterNodeOptions
project         | arrow::compute::ProjectNodeOptions
aggregate       | arrow::compute::AggregateNodeOptions
sink            | arrow::compute::SinkNodeOptions
consuming_sink  | arrow::compute::ConsumingSinkNodeOptions
order_by_sink   | arrow::compute::OrderBySinkNodeOptions
select_k_sink   | arrow::compute::SelectKSinkNodeOptions
scan            | arrow::dataset::ScanNodeOptions
hash_join       | arrow::compute::HashJoinNodeOptions
write           | arrow::dataset::WriteNodeOptions
union           | N/A

source

A source operation can be considered an entry point to create a streaming execution plan. arrow::compute::SourceNodeOptions are used to create the source operation. The source operation is the most generic and flexible type of source currently available, but it can be quite tricky to configure. To process data from files, the scan operation is likely a simpler choice.

The source node requires some kind of function that can be called to poll for more data. This function should take no arguments and should return an arrow::Future<arrow::util::optional<cp::ExecBatch>> (matching the generators used throughout the examples below). This function might be reading a file, iterating through an in-memory structure, or receiving data from a network connection. The Arrow library refers to these functions as arrow::AsyncGenerator and there are a number of utilities for working with them. For this example we use a vector of record batches that we’ve already stored in memory. In addition, the schema of the data must be known up front. Arrow’s streaming execution engine must know the schema of the data at each stage of the execution graph before any processing has begun. This means we must supply the schema for a source node separately from the data itself.

Here we define a struct to hold the data generator definition. This includes in-memory batches, the schema, and a function that serves as a data generator:

struct BatchesWithSchema {
  std::vector<cp::ExecBatch> batches;
  std::shared_ptr<arrow::Schema> schema;
  // This method uses internal arrow utilities to
  // convert a vector of record batches to an AsyncGenerator of optional batches
  arrow::AsyncGenerator<arrow::util::optional<cp::ExecBatch>> gen() const {
    auto opt_batches = ::arrow::internal::MapVector(
        [](cp::ExecBatch batch) { return arrow::util::make_optional(std::move(batch)); },
        batches);
    arrow::AsyncGenerator<arrow::util::optional<cp::ExecBatch>> gen;
    gen = arrow::MakeVectorGenerator(std::move(opt_batches));
    return gen;
  }
};

Generating sample batches for computation:

arrow::Result<BatchesWithSchema> MakeBasicBatches() {
  BatchesWithSchema out;
  auto field_vector = {arrow::field("a", arrow::int32()),
                       arrow::field("b", arrow::boolean())};
  ARROW_ASSIGN_OR_RAISE(auto b1_int, GetArrayDataSample<arrow::Int32Type>({0, 4}));
  ARROW_ASSIGN_OR_RAISE(auto b2_int, GetArrayDataSample<arrow::Int32Type>({5, 6, 7}));
  ARROW_ASSIGN_OR_RAISE(auto b3_int, GetArrayDataSample<arrow::Int32Type>({8, 9, 10}));

  ARROW_ASSIGN_OR_RAISE(auto b1_bool,
                        GetArrayDataSample<arrow::BooleanType>({false, true}));
  ARROW_ASSIGN_OR_RAISE(auto b2_bool,
                        GetArrayDataSample<arrow::BooleanType>({true, false, true}));
  ARROW_ASSIGN_OR_RAISE(auto b3_bool,
                        GetArrayDataSample<arrow::BooleanType>({false, true, false}));

  ARROW_ASSIGN_OR_RAISE(auto b1,
                        GetExecBatchFromVectors(field_vector, {b1_int, b1_bool}));
  ARROW_ASSIGN_OR_RAISE(auto b2,
                        GetExecBatchFromVectors(field_vector, {b2_int, b2_bool}));
  ARROW_ASSIGN_OR_RAISE(auto b3,
                        GetExecBatchFromVectors(field_vector, {b3_int, b3_bool}));

  out.batches = {b1, b2, b3};
  out.schema = arrow::schema(field_vector);
  return out;
}

Example of using source (usage of sink is explained in detail in the sink section):

/**
 * \brief
 * Source-Sink Example
 * This example shows how a source and sink can be used
 * in an execution plan. The source node receives the data and
 * the sink node emits that data as an output represented in
 * a table.
 * \param exec_context : execution context
 * \return arrow::Status
 */
arrow::Status SourceSinkExample(cp::ExecContext& exec_context) {
  ARROW_ASSIGN_OR_RAISE(std::shared_ptr<cp::ExecPlan> plan,
                        cp::ExecPlan::Make(&exec_context));

  ARROW_ASSIGN_OR_RAISE(auto basic_data, MakeBasicBatches());

  arrow::AsyncGenerator<arrow::util::optional<cp::ExecBatch>> sink_gen;

  auto source_node_options = cp::SourceNodeOptions{basic_data.schema, basic_data.gen()};

  ARROW_ASSIGN_OR_RAISE(cp::ExecNode * source,
                        cp::MakeExecNode("source", plan.get(), {}, source_node_options));

  ARROW_RETURN_NOT_OK(
      cp::MakeExecNode("sink", plan.get(), {source}, cp::SinkNodeOptions{&sink_gen}));

  return ExecutePlanAndCollectAsTable(exec_context, plan, basic_data.schema, sink_gen);
}

filter

filter operation, as the name suggests, provides an option to define data filtering criteria. It selects rows where the given expression evaluates to true. Filters can be written using arrow::compute::Expression. For example, if we wish to keep rows where the value of column a is greater than 3, then we can use the following expression.

Filter example:

/**
 * \brief
 * Source-Filter-Sink
 * This example shows how a filter can be used in an execution plan,
 * along with the source and sink operations. The output from the
 * execution plan is obtained as a table via the sink node.
 * \param exec_context : execution context
 * \return arrow::Status
 */
arrow::Status ScanFilterSinkExample(cp::ExecContext& exec_context) {
  ARROW_ASSIGN_OR_RAISE(std::shared_ptr<cp::ExecPlan> plan,
                        cp::ExecPlan::Make(&exec_context));

  ARROW_ASSIGN_OR_RAISE(std::shared_ptr<arrow::dataset::Dataset> dataset, GetDataset());

  auto options = std::make_shared<arrow::dataset::ScanOptions>();
  // specify the filter.  This filter keeps only the rows where the
  // value of the "a" column is greater than 3.
  cp::Expression filter_opt = cp::greater(cp::field_ref("a"), cp::literal(3));
  // set filter for scanner : on-disk / push-down filtering.
  // This step can be skipped if you are not reading from disk.
  options->filter = filter_opt;
  // empty projection
  options->projection = cp::project({}, {});

  // construct the scan node
  std::cout << "Initialized Scanning Options" << std::endl;

  cp::ExecNode* scan;

  auto scan_node_options = arrow::dataset::ScanNodeOptions{dataset, options};
  std::cout << "Scan node options created" << std::endl;

  ARROW_ASSIGN_OR_RAISE(scan,
                        cp::MakeExecNode("scan", plan.get(), {}, scan_node_options));

  // pipe the scan node into a filter node.
  // The filter must be set in both the scan node options and the filter
  // node options: at the scan node it is used for on-disk / push-down
  // filtering, at the filter node it is used for in-memory filtering.
  cp::ExecNode* filter;
  ARROW_ASSIGN_OR_RAISE(filter, cp::MakeExecNode("filter", plan.get(), {scan},
                                                 cp::FilterNodeOptions{filter_opt}));

  // finally, pipe the filter node into a sink node
  arrow::AsyncGenerator<arrow::util::optional<cp::ExecBatch>> sink_gen;
  ARROW_RETURN_NOT_OK(
      cp::MakeExecNode("sink", plan.get(), {filter}, cp::SinkNodeOptions{&sink_gen}));

  return ExecutePlanAndCollectAsTable(exec_context, plan, dataset->schema(), sink_gen);
}

project

project operation rearranges, deletes, transforms, and creates columns. Each output column is computed by evaluating an expression against the source record batch. This is exposed via arrow::compute::ProjectNodeOptions, which requires an arrow::compute::Expression and a name for each of the output columns (if names are not provided, the string representations of the expressions will be used).

Project example:

/**
 * \brief
 * Scan-Project-Sink
 * This example shows how Scan operation can be used to load the data
 * into the execution plan, how project operation can be applied on the
 * data stream and how the output is obtained as a table via the sink node.
 *
 * \param exec_context : execution context
 * \return arrow::Status
 */
arrow::Status ScanProjectSinkExample(cp::ExecContext& exec_context) {
  ARROW_ASSIGN_OR_RAISE(std::shared_ptr<cp::ExecPlan> plan,
                        cp::ExecPlan::Make(&exec_context));

  ARROW_ASSIGN_OR_RAISE(std::shared_ptr<arrow::dataset::Dataset> dataset, GetDataset());

  auto options = std::make_shared<arrow::dataset::ScanOptions>();
  // projection
  cp::Expression a_times_2 = cp::call("multiply", {cp::field_ref("a"), cp::literal(2)});
  options->projection = cp::project({}, {});

  cp::ExecNode* scan;

  auto scan_node_options = arrow::dataset::ScanNodeOptions{dataset, options};

  ARROW_ASSIGN_OR_RAISE(scan,
                        cp::MakeExecNode("scan", plan.get(), {}, scan_node_options));

  cp::ExecNode* project;
  ARROW_ASSIGN_OR_RAISE(project, cp::MakeExecNode("project", plan.get(), {scan},
                                                  cp::ProjectNodeOptions{{a_times_2}}));
  // schema after projection => multiply(a, 2): int64
  std::cout << "Schema after projection : \n"
            << project->output_schema()->ToString() << std::endl;

  arrow::AsyncGenerator<arrow::util::optional<cp::ExecBatch>> sink_gen;
  ARROW_RETURN_NOT_OK(
      cp::MakeExecNode("sink", plan.get(), {project}, cp::SinkNodeOptions{&sink_gen}));
  auto schema = arrow::schema({arrow::field("a * 2", arrow::int32())});

  return ExecutePlanAndCollectAsTable(exec_context, plan, schema, sink_gen);
}

aggregate

The aggregate node computes various types of aggregates over data.

Arrow supports two types of aggregates: “scalar” aggregates, and “hash” aggregates. Scalar aggregates reduce an array or scalar input to a single scalar output (e.g. computing the mean of a column). Hash aggregates act like GROUP BY in SQL and first partition data based on one or more key columns, then reduce the data in each partition. The aggregate node supports both types of computation, and can compute any number of aggregations at once.

arrow::compute::AggregateNodeOptions is used to define the aggregation criteria. It takes a list of aggregation functions and their options; a list of target fields to aggregate, one per function; and a list of names for the output fields, one per function. Optionally, it takes a list of columns that are used to partition the data, in the case of a hash aggregation. The aggregation functions can be selected from this list of aggregation functions.

Note

This node is a “pipeline breaker” and will fully materialize the dataset in memory. In the future, spillover mechanisms will be added which should alleviate this constraint.

The aggregation can provide results as a group or scalar. For instance, an operation like hash_count provides a count per each unique value as a grouped result, while an operation like sum provides a single record.

Scalar Aggregation example:

/**
 * \brief
 * Source-Aggregation-Sink
 * This example shows how an aggregation operation can be applied on an
 * execution plan resulting in a scalar output. The source node loads the
 * data and the aggregation (the sum of column 'a') is applied on this
 * data. The output is obtained from the sink node as a table.
 * \param exec_context : execution context
 * \return arrow::Status
 */
arrow::Status SourceScalarAggregateSinkExample(cp::ExecContext& exec_context) {
  ARROW_ASSIGN_OR_RAISE(std::shared_ptr<cp::ExecPlan> plan,
                        cp::ExecPlan::Make(&exec_context));

  ARROW_ASSIGN_OR_RAISE(auto basic_data, MakeBasicBatches());

  arrow::AsyncGenerator<arrow::util::optional<cp::ExecBatch>> sink_gen;

  auto source_node_options = cp::SourceNodeOptions{basic_data.schema, basic_data.gen()};

  ARROW_ASSIGN_OR_RAISE(cp::ExecNode * source,
                        cp::MakeExecNode("source", plan.get(), {}, source_node_options));
  auto aggregate_options = cp::AggregateNodeOptions{/*aggregates=*/{{"sum", nullptr}},
                                                    /*targets=*/{"a"},
                                                    /*names=*/{"sum(a)"}};
  ARROW_ASSIGN_OR_RAISE(
      cp::ExecNode * aggregate,
      cp::MakeExecNode("aggregate", plan.get(), {source}, aggregate_options));

  ARROW_RETURN_NOT_OK(
      cp::MakeExecNode("sink", plan.get(), {aggregate}, cp::SinkNodeOptions{&sink_gen}));
  auto schema = arrow::schema({arrow::field("sum(a)", arrow::int32())});

  return ExecutePlanAndCollectAsTable(exec_context, plan, schema, sink_gen);
}

Group Aggregation example:

/**
 * \brief
 * Source-Aggregation-Sink
 * This example shows how an aggregation operation can be applied on an
 * execution plan resulting in a grouped output. The source node loads the
 * data and the aggregation (counting the non-null values of column 'a',
 * grouped by column 'b') is applied on this data. The output is obtained
 * from the sink node as a table.
 * \param exec_context : execution context
 * \return arrow::Status
 */
arrow::Status SourceGroupAggregateSinkExample(cp::ExecContext& exec_context) {
  ARROW_ASSIGN_OR_RAISE(std::shared_ptr<cp::ExecPlan> plan,
                        cp::ExecPlan::Make(&exec_context));

  ARROW_ASSIGN_OR_RAISE(auto basic_data, MakeBasicBatches());

  arrow::AsyncGenerator<arrow::util::optional<cp::ExecBatch>> sink_gen;

  auto source_node_options = cp::SourceNodeOptions{basic_data.schema, basic_data.gen()};

  ARROW_ASSIGN_OR_RAISE(cp::ExecNode * source,
                        cp::MakeExecNode("source", plan.get(), {}, source_node_options));
  cp::CountOptions options(cp::CountOptions::ONLY_VALID);
  auto aggregate_options =
      cp::AggregateNodeOptions{/*aggregates=*/{{"hash_count", &options}},
                               /*targets=*/{"a"},
                               /*names=*/{"count(a)"},
                               /*keys=*/{"b"}};
  ARROW_ASSIGN_OR_RAISE(
      cp::ExecNode * aggregate,
      cp::MakeExecNode("aggregate", plan.get(), {source}, aggregate_options));

  ARROW_RETURN_NOT_OK(
      cp::MakeExecNode("sink", plan.get(), {aggregate}, cp::SinkNodeOptions{&sink_gen}));
  auto schema = arrow::schema({
      arrow::field("count(a)", arrow::int32()),
      arrow::field("b", arrow::boolean()),
  });

  return ExecutePlanAndCollectAsTable(exec_context, plan, schema, sink_gen);
}

sink

sink operation provides output and is the final node of a streaming execution definition. The arrow::compute::SinkNodeOptions interface is used to pass the required options. Similar to the source operator, the sink operator exposes the output with a function that returns a record batch future each time it is called. It is expected that the caller will repeatedly call this function until the generator is exhausted (returns arrow::util::nullopt). If this function is not called often enough then record batches will accumulate in memory. An execution plan should only have one "terminal" node (one sink node). An ExecPlan can terminate early due to cancellation or an error, before the output is fully consumed. However, the plan can be safely destroyed independently of the sink, which will hold on to the unconsumed batches via exec_plan->finished().
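
For illustration, a minimal sketch of draining such a generator by hand (the synchronous helper cp::MakeGeneratorReader used elsewhere in this document is usually more convenient):

while (true) {
  // each call yields a future; result() blocks until the batch is ready
  auto batch_fut = sink_gen();
  ARROW_ASSIGN_OR_RAISE(arrow::util::optional<cp::ExecBatch> maybe_batch,
                        batch_fut.result());
  if (!maybe_batch) break;  // generator exhausted
  // ... consume *maybe_batch ...
}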

As a part of the Source Example, the Sink operation is also included:

/**
 * \brief
 * Source-Sink Example
 * This example shows how a source and sink can be used
 * in an execution plan. The source node receives the data and
 * the sink node emits that data as an output represented in
 * a table.
 * \param exec_context : execution context
 * \return arrow::Status
 */
arrow::Status SourceSinkExample(cp::ExecContext& exec_context) {
  ARROW_ASSIGN_OR_RAISE(std::shared_ptr<cp::ExecPlan> plan,
                        cp::ExecPlan::Make(&exec_context));

  ARROW_ASSIGN_OR_RAISE(auto basic_data, MakeBasicBatches());

  arrow::AsyncGenerator<arrow::util::optional<cp::ExecBatch>> sink_gen;

  auto source_node_options = cp::SourceNodeOptions{basic_data.schema, basic_data.gen()};

  ARROW_ASSIGN_OR_RAISE(cp::ExecNode * source,
                        cp::MakeExecNode("source", plan.get(), {}, source_node_options));

  ARROW_RETURN_NOT_OK(
      cp::MakeExecNode("sink", plan.get(), {source}, cp::SinkNodeOptions{&sink_gen}));

  return ExecutePlanAndCollectAsTable(exec_context, plan, basic_data.schema, sink_gen);
}

consuming_sink

consuming_sink operator is a sink operation that consumes the data within the execution plan (i.e. the exec plan should not complete until the consumption has completed). Unlike the sink node, this node takes a callback function that is expected to consume each batch. Once this callback has finished, the execution plan will no longer hold any reference to the batch. The consuming function may be called before a previous invocation has completed; if it does not run quickly enough then many concurrent executions could pile up, blocking the CPU thread pool. The execution plan will not be marked finished until all consuming function callbacks have been completed. Once all batches have been delivered, the execution plan will wait for the finish future to complete before marking the execution plan finished. This allows for workflows where the consumption function converts batches into async tasks (this is currently done internally for the dataset write node).

Example:

// define a Custom SinkNodeConsumer
std::atomic<uint32_t> batches_seen{0};
arrow::Future<> finish = arrow::Future<>::Make();

struct CustomSinkNodeConsumer : public cp::SinkNodeConsumer {
  CustomSinkNodeConsumer(std::atomic<uint32_t>* batches_seen, arrow::Future<> finish)
      : batches_seen(batches_seen), finish(std::move(finish)) {}

  // Consumption logic can be written here: the data can be consumed in
  // the expected way, transferred to another system, or processed and
  // written to disk.
  arrow::Status Consume(cp::ExecBatch batch) override {
    (*batches_seen)++;
    return arrow::Status::OK();
  }

  arrow::Future<> Finish() override { return finish; }

  std::atomic<uint32_t>* batches_seen;
  arrow::Future<> finish;
};

std::shared_ptr<CustomSinkNodeConsumer> consumer =
    std::make_shared<CustomSinkNodeConsumer>(&batches_seen, finish);

cp::ExecNode* consuming_sink;

ARROW_ASSIGN_OR_RAISE(consuming_sink, MakeExecNode("consuming_sink", plan.get(),
    {source}, cp::ConsumingSinkNodeOptions(consumer)));

Consuming-Sink example:

/**
 * \brief
 * Source-ConsumingSink
 * This example shows how the data can be consumed within the execution plan
 * by using a ConsumingSink node. There is no data output from this execution plan.
 * \param exec_context : execution context
 * \return arrow::Status
 */
arrow::Status SourceConsumingSinkExample(cp::ExecContext& exec_context) {
  ARROW_ASSIGN_OR_RAISE(std::shared_ptr<cp::ExecPlan> plan,
                        cp::ExecPlan::Make(&exec_context));

  ARROW_ASSIGN_OR_RAISE(auto basic_data, MakeBasicBatches());

  auto source_node_options = cp::SourceNodeOptions{basic_data.schema, basic_data.gen()};

  ARROW_ASSIGN_OR_RAISE(cp::ExecNode * source,
                        cp::MakeExecNode("source", plan.get(), {}, source_node_options));

  std::atomic<uint32_t> batches_seen{0};
  arrow::Future<> finish = arrow::Future<>::Make();
  struct CustomSinkNodeConsumer : public cp::SinkNodeConsumer {
    CustomSinkNodeConsumer(std::atomic<uint32_t>* batches_seen, arrow::Future<> finish)
        : batches_seen(batches_seen), finish(std::move(finish)) {}

    arrow::Status Consume(cp::ExecBatch batch) override {
      (*batches_seen)++;
      return arrow::Status::OK();
    }

    arrow::Future<> Finish() override { return finish; }

    std::atomic<uint32_t>* batches_seen;
    arrow::Future<> finish;
  };
  std::shared_ptr<CustomSinkNodeConsumer> consumer =
      std::make_shared<CustomSinkNodeConsumer>(&batches_seen, finish);

  cp::ExecNode* consuming_sink;

  ARROW_ASSIGN_OR_RAISE(consuming_sink,
                        MakeExecNode("consuming_sink", plan.get(), {source},
                                     cp::ConsumingSinkNodeOptions(consumer)));

  ARROW_RETURN_NOT_OK(consuming_sink->Validate());

  ARROW_RETURN_NOT_OK(plan->Validate());
  std::cout << "Exec Plan created: " << plan->ToString() << std::endl;
  // start producing the plan
  ARROW_RETURN_NOT_OK(plan->StartProducing());
  // Source should finish fairly quickly
  ARROW_RETURN_NOT_OK(source->finished().status());
  std::cout << "Source Finished!" << std::endl;
  // Mark consumption complete, plan should finish
  finish.MarkFinished(arrow::Status::OK());
  ARROW_RETURN_NOT_OK(plan->finished().status());
  return arrow::Status::OK();
}

order_by_sink

order_by_sink operation is an extension to the sink operation. This operation provides the ability to guarantee the ordering of the stream by providing the arrow::compute::OrderBySinkNodeOptions. Here the arrow::compute::SortOptions are provided to define which columns are used for sorting and whether to sort in ascending or descending order.

Note

This node is a “pipeline breaker” and will fully materialize the dataset in memory. In the future, spillover mechanisms will be added which should alleviate this constraint.

Order-By-Sink example:

/**
 * \brief
 * Source-OrderBySink
 * In this example, the data enters through the source node
 * and the data is ordered in the sink node. The order can be
 * ASCENDING or DESCENDING and it is configurable. The output
 * is obtained as a table from the sink node.
 * \param exec_context : execution context
 * \return arrow::Status
 */
arrow::Status SourceOrderBySinkExample(cp::ExecContext& exec_context) {
  ARROW_ASSIGN_OR_RAISE(std::shared_ptr<cp::ExecPlan> plan,
                        cp::ExecPlan::Make(&exec_context));

  ARROW_ASSIGN_OR_RAISE(auto basic_data, MakeSortTestBasicBatches());

  std::cout << "basic data created" << std::endl;

  arrow::AsyncGenerator<arrow::util::optional<cp::ExecBatch>> sink_gen;

  auto source_node_options = cp::SourceNodeOptions{basic_data.schema, basic_data.gen()};
  ARROW_ASSIGN_OR_RAISE(cp::ExecNode * source,
                        cp::MakeExecNode("source", plan.get(), {}, source_node_options));

  ARROW_RETURN_NOT_OK(cp::MakeExecNode(
      "order_by_sink", plan.get(), {source},
      cp::OrderBySinkNodeOptions{
          cp::SortOptions{{cp::SortKey{"a", cp::SortOrder::Descending}}}, &sink_gen}));

  return ExecutePlanAndCollectAsTable(exec_context, plan, basic_data.schema, sink_gen);
}

select_k_sink

select_k_sink option enables selecting the top/bottom K elements, similar to a SQL ORDER BY ... LIMIT K clause. The selection criteria are defined by arrow::compute::SelectKOptions, which builds on the OrderBySinkNode definition. This option returns a sink node that receives inputs and then computes top_k/bottom_k.

Note

This node is a “pipeline breaker” and will fully materialize the input in memory. In the future, spillover mechanisms will be added which should alleviate this constraint.

SelectK example:

/**
 * \brief
 * Source-KSelect
 * This example shows how K number of elements can be selected
 * either from the top or bottom. The output node is a modified
 * sink node where output can be obtained as a table.
 * \param exec_context : execution context
 * \return arrow::Status
 */
arrow::Status SourceKSelectExample(cp::ExecContext& exec_context) {
  ARROW_ASSIGN_OR_RAISE(auto input, MakeGroupableBatches());
  ARROW_ASSIGN_OR_RAISE(std::shared_ptr<cp::ExecPlan> plan,
                        cp::ExecPlan::Make(&exec_context));
  arrow::AsyncGenerator<arrow::util::optional<cp::ExecBatch>> sink_gen;

  ARROW_ASSIGN_OR_RAISE(
      cp::ExecNode * source,
      cp::MakeExecNode("source", plan.get(), {},
                       cp::SourceNodeOptions{input.schema, input.gen()}));

  cp::SelectKOptions options = cp::SelectKOptions::TopKDefault(/*k=*/2, {"i32"});

  ARROW_RETURN_NOT_OK(cp::MakeExecNode("select_k_sink", plan.get(), {source},
                                       cp::SelectKSinkNodeOptions{options, &sink_gen}));

  auto schema = arrow::schema(
      {arrow::field("i32", arrow::int32()), arrow::field("str", arrow::utf8())});

  return ExecutePlanAndCollectAsTable(exec_context, plan, schema, sink_gen);
}

scan

scan is an operation used to load and process datasets. It should be preferred over the more generic source node when your input is a dataset. The behavior is defined using arrow::dataset::ScanNodeOptions. More information on datasets and the various scan options can be found in Tabular Datasets.

This node is capable of applying pushdown filters to the file readers, which reduces the amount of data that needs to be read. This means you may supply the same filter expression to the scan node that you also supply to the FilterNode, because the filtering is done in two different places: the scan applies it as a best-effort pushdown while reading, and the filter node applies it exactly in memory.

Scan example:

/**
 * \brief
 * Scan-Sink
 * This example shows how scan operation can be applied on a dataset.
 * There are operations that can be applied on the scan (project, filter)
 * and the input data can be processed. The output is obtained as a table
 * via the sink node.
 * \param exec_context : execution context
 * \return arrow::Status
 */
arrow::Status ScanSinkExample(cp::ExecContext& exec_context) {
  // Execution plan created
  ARROW_ASSIGN_OR_RAISE(std::shared_ptr<cp::ExecPlan> plan,
                        cp::ExecPlan::Make(&exec_context));

  ARROW_ASSIGN_OR_RAISE(std::shared_ptr<arrow::dataset::Dataset> dataset, GetDataset());

  auto options = std::make_shared<arrow::dataset::ScanOptions>();
  options->projection = cp::project({}, {});  // create empty projection

  // construct the scan node
  cp::ExecNode* scan;
  auto scan_node_options = arrow::dataset::ScanNodeOptions{dataset, options};

  ARROW_ASSIGN_OR_RAISE(scan,
                        cp::MakeExecNode("scan", plan.get(), {}, scan_node_options));

  arrow::AsyncGenerator<arrow::util::optional<cp::ExecBatch>> sink_gen;

  ARROW_RETURN_NOT_OK(
      cp::MakeExecNode("sink", plan.get(), {scan}, cp::SinkNodeOptions{&sink_gen}));

  return ExecutePlanAndCollectAsTable(exec_context, plan, dataset->schema(), sink_gen);
}

write

The write node saves query results as a dataset of files in a format like Parquet, Feather, CSV, etc., using the Tabular Datasets functionality in Arrow. The write options are provided via arrow::dataset::WriteNodeOptions, which in turn contains arrow::dataset::FileSystemDatasetWriteOptions. The latter provides control over the written dataset, including options like the output directory, file naming scheme, and so on.

Write example:

/**
 * \brief
 * Scan-Filter-Write
 * This example shows how scan node can be used to load the data
 * and after processing how it can be written to disk.
 * \param exec_context : execution context
 * \param file_path : file saving path
 * \return arrow::Status
 */
arrow::Status ScanFilterWriteExample(cp::ExecContext& exec_context,
                                     const std::string& file_path) {
  ARROW_ASSIGN_OR_RAISE(std::shared_ptr<cp::ExecPlan> plan,
                        cp::ExecPlan::Make(&exec_context));

  ARROW_ASSIGN_OR_RAISE(std::shared_ptr<arrow::dataset::Dataset> dataset, GetDataset());

  auto options = std::make_shared<arrow::dataset::ScanOptions>();
  // empty projection
  options->projection = cp::project({}, {});

  cp::ExecNode* scan;

  auto scan_node_options = arrow::dataset::ScanNodeOptions{dataset, options};

  ARROW_ASSIGN_OR_RAISE(scan,
                        cp::MakeExecNode("scan", plan.get(), {}, scan_node_options));

  arrow::AsyncGenerator<arrow::util::optional<cp::ExecBatch>> sink_gen;

  std::string root_path = "";
  std::string uri = "file://" + file_path;
  std::shared_ptr<arrow::fs::FileSystem> filesystem =
      arrow::fs::FileSystemFromUri(uri, &root_path).ValueOrDie();

  auto base_path = root_path + "/parquet_dataset";
  // Uncomment the following line, if run repeatedly
  // ARROW_RETURN_NOT_OK(filesystem->DeleteDirContents(base_path));
  ARROW_RETURN_NOT_OK(filesystem->CreateDir(base_path));

  // The partition schema determines which fields are part of the partitioning.
  auto partition_schema = arrow::schema({arrow::field("a", arrow::int32())});
  // We'll use Hive-style partitioning,
  // which creates directories with "key=value" pairs.

  auto partitioning =
      std::make_shared<arrow::dataset::HivePartitioning>(partition_schema);
  // We'll write Parquet files.
  auto format = std::make_shared<arrow::dataset::ParquetFileFormat>();

  arrow::dataset::FileSystemDatasetWriteOptions write_options;
  write_options.file_write_options = format->DefaultWriteOptions();
  write_options.filesystem = filesystem;
  write_options.base_dir = base_path;
  write_options.partitioning = partitioning;
  write_options.basename_template = "part{i}.parquet";

  arrow::dataset::WriteNodeOptions write_node_options{write_options, dataset->schema()};

  ARROW_RETURN_NOT_OK(cp::MakeExecNode("write", plan.get(), {scan}, write_node_options));

  ARROW_RETURN_NOT_OK(plan->Validate());
  std::cout << "Execution Plan Created : " << plan->ToString() << std::endl;
  // start the ExecPlan
  ARROW_RETURN_NOT_OK(plan->StartProducing());
  auto future = plan->finished();
  ARROW_RETURN_NOT_OK(future.status());
  future.Wait();
  return arrow::Status::OK();
}

union

union merges multiple data streams with the same schema into one, similar to a SQL UNION ALL clause.

The following example demonstrates how this can be achieved using two data sources.

Union example:

/**
 * \brief
 * Source-Union-Sink
 * This example shows how a union operation can be applied on two
 * data sources. The output is obtained as a table via the sink
 * node.
 * \param exec_context : execution context
 * \return arrow::Status
 */
arrow::Status SourceUnionSinkExample(cp::ExecContext& exec_context) {
  ARROW_ASSIGN_OR_RAISE(auto basic_data, MakeBasicBatches());

  std::shared_ptr<cp::ExecPlan> plan = cp::ExecPlan::Make(&exec_context).ValueOrDie();
  arrow::AsyncGenerator<arrow::util::optional<cp::ExecBatch>> sink_gen;

  cp::Declaration union_node{"union", cp::ExecNodeOptions{}};
  cp::Declaration lhs{"source",
                      cp::SourceNodeOptions{basic_data.schema, basic_data.gen()}};
  lhs.label = "lhs";
  cp::Declaration rhs{"source",
                      cp::SourceNodeOptions{basic_data.schema, basic_data.gen()}};
  rhs.label = "rhs";
  union_node.inputs.emplace_back(lhs);
  union_node.inputs.emplace_back(rhs);

  ARROW_ASSIGN_OR_RAISE(
      auto declr, cp::Declaration::Sequence({
                                                union_node,
                                                {"sink", cp::SinkNodeOptions{&sink_gen}},
                                            })
                      .AddToPlan(plan.get()));

  ARROW_RETURN_NOT_OK(declr->Validate());

  ARROW_RETURN_NOT_OK(plan->Validate());
  return ExecutePlanAndCollectAsTable(exec_context, plan, basic_data.schema, sink_gen);
}

hash_join

hash_join operation provides the relational algebra join operation using a hash-based algorithm. arrow::compute::HashJoinNodeOptions contains the options required to define a join. hash_join supports left/right/full semi/anti/outer joins. The join key (i.e. the column(s) to join on) and suffixes (e.g. a suffix term like "_x" which can be appended to column names duplicated in both the left and right relations) can be set via the join options. Read more on hash joins.

Hash-Join example:

/**
 * \brief
 * Source-HashJoin-Sink
 * This example shows how source node gets the data and how a self-join
 * is applied on the data. The join options are configurable. The output
 * is obtained as a table via the sink node.
 * \param exec_context : execution context
 * \return arrow::Status
 */
arrow::Status SourceHashJoinSinkExample(cp::ExecContext& exec_context) {
  ARROW_ASSIGN_OR_RAISE(auto input, MakeGroupableBatches());
  ARROW_ASSIGN_OR_RAISE(std::shared_ptr<cp::ExecPlan> plan,
                        cp::ExecPlan::Make(&exec_context));

  arrow::AsyncGenerator<arrow::util::optional<cp::ExecBatch>> sink_gen;

  cp::ExecNode* left_source;
  cp::ExecNode* right_source;
  for (auto source : {&left_source, &right_source}) {
    ARROW_ASSIGN_OR_RAISE(*source,
                          MakeExecNode("source", plan.get(), {},
                                       cp::SourceNodeOptions{input.schema, input.gen()}));
  }

  cp::HashJoinNodeOptions join_opts{
      cp::JoinType::INNER,
      /*left_keys=*/{"str"},
      /*right_keys=*/{"str"}, cp::literal(true), "l_", "r_"};

  ARROW_ASSIGN_OR_RAISE(
      auto hashjoin,
      cp::MakeExecNode("hashjoin", plan.get(), {left_source, right_source}, join_opts));

  ARROW_RETURN_NOT_OK(
      cp::MakeExecNode("sink", plan.get(), {hashjoin}, cp::SinkNodeOptions{&sink_gen}));
  // expected columns i32, str, l_str, r_str
  auto schema = arrow::schema(
      {arrow::field("i32", arrow::int32()), arrow::field("str", arrow::utf8()),
       arrow::field("l_str", arrow::utf8()), arrow::field("r_str", arrow::utf8())});

  return ExecutePlanAndCollectAsTable(exec_context, plan, schema, sink_gen);
}

Summary

Examples of these nodes can be found in cpp/examples/arrow/execution_plan_documentation_examples.cc in the Arrow source.

Complete Example:

#include <arrow/array.h>
#include <arrow/builder.h>

#include <arrow/compute/api.h>
#include <arrow/compute/api_vector.h>
#include <arrow/compute/cast.h>
#include <arrow/compute/exec/exec_plan.h>

#include <arrow/csv/api.h>

#include <arrow/dataset/dataset.h>
#include <arrow/dataset/file_base.h>
#include <arrow/dataset/file_parquet.h>
#include <arrow/dataset/plan.h>
#include <arrow/dataset/scanner.h>

#include <arrow/io/interfaces.h>
#include <arrow/io/memory.h>

#include <arrow/result.h>
#include <arrow/status.h>
#include <arrow/table.h>

#include <arrow/ipc/api.h>

#include <arrow/util/future.h>
#include <arrow/util/range.h>
#include <arrow/util/thread_pool.h>
#include <arrow/util/vector.h>

#include <iostream>
#include <memory>
#include <utility>

// Demonstrate various operators in Arrow Streaming Execution Engine

namespace cp = ::arrow::compute;

constexpr char kSep[] = "******";

void PrintBlock(const std::string& msg) {
  std::cout << "\n\t" << kSep << " " << msg << " " << kSep << "\n" << std::endl;
}

template <typename TYPE,
          typename = typename std::enable_if<arrow::is_number_type<TYPE>::value |
                                             arrow::is_boolean_type<TYPE>::value |
                                             arrow::is_temporal_type<TYPE>::value>::type>
arrow::Result<std::shared_ptr<arrow::Array>> GetArrayDataSample(
    const std::vector<typename TYPE::c_type>& values) {
  using ARROW_ARRAY_TYPE = typename arrow::TypeTraits<TYPE>::ArrayType;
  using ARROW_BUILDER_TYPE = typename arrow::TypeTraits<TYPE>::BuilderType;
  ARROW_BUILDER_TYPE builder;
  ARROW_RETURN_NOT_OK(builder.Reserve(values.size()));
  std::shared_ptr<ARROW_ARRAY_TYPE> array;
  ARROW_RETURN_NOT_OK(builder.AppendValues(values));
  ARROW_RETURN_NOT_OK(builder.Finish(&array));
  return array;
}

template <class TYPE>
arrow::Result<std::shared_ptr<arrow::Array>> GetBinaryArrayDataSample(
    const std::vector<std::string>& values) {
  using ARROW_ARRAY_TYPE = typename arrow::TypeTraits<TYPE>::ArrayType;
  using ARROW_BUILDER_TYPE = typename arrow::TypeTraits<TYPE>::BuilderType;
  ARROW_BUILDER_TYPE builder;
  ARROW_RETURN_NOT_OK(builder.Reserve(values.size()));
  std::shared_ptr<ARROW_ARRAY_TYPE> array;
  ARROW_RETURN_NOT_OK(builder.AppendValues(values));
  ARROW_RETURN_NOT_OK(builder.Finish(&array));
  return array;
}

arrow::Result<std::shared_ptr<arrow::RecordBatch>> GetSampleRecordBatch(
    const arrow::ArrayVector array_vector, const arrow::FieldVector& field_vector) {
  ARROW_ASSIGN_OR_RAISE(auto struct_result,
                        arrow::StructArray::Make(array_vector, field_vector));
  return arrow::RecordBatch::FromStructArray(struct_result);
}

/**
 * \brief Get the Dataset object
 *  Creating Dataset
 *  a, b
    1,null
    2,true
    null,true
    3,false
    null,true
    4,false
    5,null
    6,false
    7,false
    8,true
 * \return arrow::Result<std::shared_ptr<arrow::dataset::Dataset>>
 */
arrow::Result<std::shared_ptr<arrow::dataset::Dataset>> GetDataset() {
  // Note: quiet_NaN() converts to 0 for an integer type; these entries
  // stand in for the null values shown in the comment above.
  auto null_long = std::numeric_limits<int64_t>::quiet_NaN();
  ARROW_ASSIGN_OR_RAISE(auto int64_array,
                        GetArrayDataSample<arrow::Int64Type>(
                            {1, 2, null_long, 3, null_long, 4, 5, 6, 7, 8}));

  arrow::BooleanBuilder boolean_builder;
  std::shared_ptr<arrow::BooleanArray> bool_array;

  std::vector<uint8_t> bool_values = {false, true,  true,  false, true,
                                      false, false, false, false, true};
  std::vector<bool> is_valid = {false, true,  true, true, true,
                                true,  false, true, true, true};

  ARROW_RETURN_NOT_OK(boolean_builder.Reserve(10));

  ARROW_RETURN_NOT_OK(boolean_builder.AppendValues(bool_values, is_valid));

  ARROW_RETURN_NOT_OK(boolean_builder.Finish(&bool_array));

  auto record_batch =
      arrow::RecordBatch::Make(arrow::schema({arrow::field("a", arrow::int64()),
                                              arrow::field("b", arrow::boolean())}),
                               10, {int64_array, bool_array});
  ARROW_ASSIGN_OR_RAISE(auto table, arrow::Table::FromRecordBatches({record_batch}));
  auto ds = std::make_shared<arrow::dataset::InMemoryDataset>(table);
  return ds;
}

arrow::Result<cp::ExecBatch> GetExecBatchFromVectors(
    const arrow::FieldVector& field_vector, const arrow::ArrayVector& array_vector) {
  ARROW_ASSIGN_OR_RAISE(auto res_batch, GetSampleRecordBatch(array_vector, field_vector));
  cp::ExecBatch batch{*res_batch};
  return batch;
}

// (Doc section: BatchesWithSchema Definition)
struct BatchesWithSchema {
  std::vector<cp::ExecBatch> batches;
  std::shared_ptr<arrow::Schema> schema;
  // This method uses internal arrow utilities to
  // convert a vector of record batches to an AsyncGenerator of optional batches
  arrow::AsyncGenerator<arrow::util::optional<cp::ExecBatch>> gen() const {
    auto opt_batches = ::arrow::internal::MapVector(
        [](cp::ExecBatch batch) { return arrow::util::make_optional(std::move(batch)); },
        batches);
    arrow::AsyncGenerator<arrow::util::optional<cp::ExecBatch>> gen;
    gen = arrow::MakeVectorGenerator(std::move(opt_batches));
    return gen;
  }
};
// (Doc section: BatchesWithSchema Definition)

// (Doc section: MakeBasicBatches Definition)
arrow::Result<BatchesWithSchema> MakeBasicBatches() {
  BatchesWithSchema out;
  auto field_vector = {arrow::field("a", arrow::int32()),
                       arrow::field("b", arrow::boolean())};
  ARROW_ASSIGN_OR_RAISE(auto b1_int, GetArrayDataSample<arrow::Int32Type>({0, 4}));
  ARROW_ASSIGN_OR_RAISE(auto b2_int, GetArrayDataSample<arrow::Int32Type>({5, 6, 7}));
  ARROW_ASSIGN_OR_RAISE(auto b3_int, GetArrayDataSample<arrow::Int32Type>({8, 9, 10}));

  ARROW_ASSIGN_OR_RAISE(auto b1_bool,
                        GetArrayDataSample<arrow::BooleanType>({false, true}));
  ARROW_ASSIGN_OR_RAISE(auto b2_bool,
                        GetArrayDataSample<arrow::BooleanType>({true, false, true}));
  ARROW_ASSIGN_OR_RAISE(auto b3_bool,
                        GetArrayDataSample<arrow::BooleanType>({false, true, false}));

  ARROW_ASSIGN_OR_RAISE(auto b1,
                        GetExecBatchFromVectors(field_vector, {b1_int, b1_bool}));
  ARROW_ASSIGN_OR_RAISE(auto b2,
                        GetExecBatchFromVectors(field_vector, {b2_int, b2_bool}));
  ARROW_ASSIGN_OR_RAISE(auto b3,
                        GetExecBatchFromVectors(field_vector, {b3_int, b3_bool}));

  out.batches = {b1, b2, b3};
  out.schema = arrow::schema(field_vector);
  return out;
}
// (Doc section: MakeBasicBatches Definition)
198 
199 arrow::Result<BatchesWithSchema> MakeSortTestBasicBatches() {
200   BatchesWithSchema out;
201   auto field = arrow::field("a", arrow::int32());
202   ARROW_ASSIGN_OR_RAISE(auto b1_int, GetArrayDataSample<arrow::Int32Type>({1, 3, 0, 2}));
203   ARROW_ASSIGN_OR_RAISE(auto b2_int,
204                         GetArrayDataSample<arrow::Int32Type>({121, 101, 120, 12}));
205   ARROW_ASSIGN_OR_RAISE(auto b3_int,
206                         GetArrayDataSample<arrow::Int32Type>({10, 110, 210, 121}));
207   ARROW_ASSIGN_OR_RAISE(auto b4_int,
208                         GetArrayDataSample<arrow::Int32Type>({51, 101, 2, 34}));
209   ARROW_ASSIGN_OR_RAISE(auto b5_int,
210                         GetArrayDataSample<arrow::Int32Type>({11, 31, 1, 12}));
211   ARROW_ASSIGN_OR_RAISE(auto b6_int,
212                         GetArrayDataSample<arrow::Int32Type>({12, 101, 120, 12}));
213   ARROW_ASSIGN_OR_RAISE(auto b7_int,
214                         GetArrayDataSample<arrow::Int32Type>({0, 110, 210, 11}));
215   ARROW_ASSIGN_OR_RAISE(auto b8_int,
216                         GetArrayDataSample<arrow::Int32Type>({51, 10, 2, 3}));
217 
218   ARROW_ASSIGN_OR_RAISE(auto b1, GetExecBatchFromVectors({field}, {b1_int}));
219   ARROW_ASSIGN_OR_RAISE(auto b2, GetExecBatchFromVectors({field}, {b2_int}));
220   ARROW_ASSIGN_OR_RAISE(auto b3,
221                         GetExecBatchFromVectors({field, field}, {b3_int, b8_int}));
222   ARROW_ASSIGN_OR_RAISE(auto b4,
223                         GetExecBatchFromVectors({field, field, field, field},
224                                                 {b4_int, b5_int, b6_int, b7_int}));
225   out.batches = {b1, b2, b3, b4};
226   out.schema = arrow::schema({field});
227   return out;
228 }
229 
arrow::Result<BatchesWithSchema> MakeGroupableBatches(int multiplicity = 1) {
  BatchesWithSchema out;
  auto fields = {arrow::field("i32", arrow::int32()), arrow::field("str", arrow::utf8())};
  ARROW_ASSIGN_OR_RAISE(auto b1_int, GetArrayDataSample<arrow::Int32Type>({12, 7, 3}));
  ARROW_ASSIGN_OR_RAISE(auto b2_int, GetArrayDataSample<arrow::Int32Type>({-2, -1, 3}));
  ARROW_ASSIGN_OR_RAISE(auto b3_int, GetArrayDataSample<arrow::Int32Type>({5, 3, -8}));
  ARROW_ASSIGN_OR_RAISE(auto b1_str, GetBinaryArrayDataSample<arrow::StringType>(
                                         {"alpha", "beta", "alpha"}));
  ARROW_ASSIGN_OR_RAISE(auto b2_str, GetBinaryArrayDataSample<arrow::StringType>(
                                         {"alpha", "gamma", "alpha"}));
  ARROW_ASSIGN_OR_RAISE(auto b3_str, GetBinaryArrayDataSample<arrow::StringType>(
                                         {"gamma", "beta", "alpha"}));
  ARROW_ASSIGN_OR_RAISE(auto b1, GetExecBatchFromVectors(fields, {b1_int, b1_str}));
  ARROW_ASSIGN_OR_RAISE(auto b2, GetExecBatchFromVectors(fields, {b2_int, b2_str}));
  ARROW_ASSIGN_OR_RAISE(auto b3, GetExecBatchFromVectors(fields, {b3_int, b3_str}));
  out.batches = {b1, b2, b3};

  // Replicate the three batches (multiplicity - 1) more times so callers
  // can request a larger input with the same key distribution.
  size_t batch_count = out.batches.size();
  for (int repeat = 1; repeat < multiplicity; ++repeat) {
    for (size_t i = 0; i < batch_count; ++i) {
      out.batches.push_back(out.batches[i]);
    }
  }

  out.schema = arrow::schema(fields);
  return out;
}
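// For example, MakeGroupableBatches(/*multiplicity=*/3) yields nine batches:
// the three batches above, repeated three times. This is convenient for
// feeding the grouped-aggregation and join examples below with more rows.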

arrow::Status ExecutePlanAndCollectAsTable(
    cp::ExecContext& exec_context, std::shared_ptr<cp::ExecPlan> plan,
    std::shared_ptr<arrow::Schema> schema,
    arrow::AsyncGenerator<arrow::util::optional<cp::ExecBatch>> sink_gen) {
  // translate sink_gen (async) to sink_reader (sync)
  std::shared_ptr<arrow::RecordBatchReader> sink_reader =
      cp::MakeGeneratorReader(schema, std::move(sink_gen), exec_context.memory_pool());

  // validate the ExecPlan
  ARROW_RETURN_NOT_OK(plan->Validate());
  std::cout << "ExecPlan created : " << plan->ToString() << std::endl;
  // start the ExecPlan
  ARROW_RETURN_NOT_OK(plan->StartProducing());

  // collect sink_reader into a Table
  std::shared_ptr<arrow::Table> response_table;

  ARROW_ASSIGN_OR_RAISE(response_table,
                        arrow::Table::FromRecordBatchReader(sink_reader.get()));

  std::cout << "Results : " << response_table->ToString() << std::endl;

  // stop producing
  plan->StopProducing();
  // wait until the plan is marked finished
  auto future = plan->finished();
  return future.status();
}
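// Every sink-based example below reuses this helper and follows the same
// lifecycle: Validate() the plan, StartProducing() to begin execution, drain
// the sink's generator through a RecordBatchReader into a Table, then
// StopProducing() and wait on plan->finished().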

// (Doc section: Scan Example)
/**
 * \brief
 * Scan-Sink
 * This example shows how a scan operation can be applied on a dataset.
 * Scan-level operations such as projection and filtering can be attached
 * to the scan so the input data is pruned as it is read. The output is
 * obtained as a table via the sink node.
 * \param exec_context : execution context
 * \return arrow::Status
 */
arrow::Status ScanSinkExample(cp::ExecContext& exec_context) {
  // create the execution plan
  ARROW_ASSIGN_OR_RAISE(std::shared_ptr<cp::ExecPlan> plan,
                        cp::ExecPlan::Make(&exec_context));

  ARROW_ASSIGN_OR_RAISE(std::shared_ptr<arrow::dataset::Dataset> dataset, GetDataset());

  auto options = std::make_shared<arrow::dataset::ScanOptions>();
  options->projection = cp::project({}, {});  // create empty projection

  // construct the scan node
  cp::ExecNode* scan;
  auto scan_node_options = arrow::dataset::ScanNodeOptions{dataset, options};

  ARROW_ASSIGN_OR_RAISE(scan,
                        cp::MakeExecNode("scan", plan.get(), {}, scan_node_options));

  arrow::AsyncGenerator<arrow::util::optional<cp::ExecBatch>> sink_gen;

  ARROW_RETURN_NOT_OK(
      cp::MakeExecNode("sink", plan.get(), {scan}, cp::SinkNodeOptions{&sink_gen}));

  return ExecutePlanAndCollectAsTable(exec_context, plan, dataset->schema(), sink_gen);
}
// (Doc section: Scan Example)
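// Note: a "scan" node pulls batches from an arrow::dataset::Dataset (for
// example Parquet files on disk), while the "source" node used in the next
// example is fed from an in-memory batch generator. The rest of the plan is
// wired up identically in both cases.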

// (Doc section: Source Example)
/**
 * \brief
 * Source-Sink Example
 * This example shows how a source and a sink can be used in an execution
 * plan: the source node feeds data into the plan and the sink node emits
 * the output, which is collected into a table.
 * \param exec_context : execution context
 * \return arrow::Status
 */
arrow::Status SourceSinkExample(cp::ExecContext& exec_context) {
  ARROW_ASSIGN_OR_RAISE(std::shared_ptr<cp::ExecPlan> plan,
                        cp::ExecPlan::Make(&exec_context));

  ARROW_ASSIGN_OR_RAISE(auto basic_data, MakeBasicBatches());

  arrow::AsyncGenerator<arrow::util::optional<cp::ExecBatch>> sink_gen;

  auto source_node_options = cp::SourceNodeOptions{basic_data.schema, basic_data.gen()};

  ARROW_ASSIGN_OR_RAISE(cp::ExecNode * source,
                        cp::MakeExecNode("source", plan.get(), {}, source_node_options));

  ARROW_RETURN_NOT_OK(
      cp::MakeExecNode("sink", plan.get(), {source}, cp::SinkNodeOptions{&sink_gen}));

  return ExecutePlanAndCollectAsTable(exec_context, plan, basic_data.schema, sink_gen);
}
// (Doc section: Source Example)

// (Doc section: Filter Example)
/**
 * \brief
 * Scan-Filter-Sink
 * This example shows how a filter can be used in an execution plan,
 * along with the scan and sink operations. The output from the
 * execution plan is obtained as a table via the sink node.
 * \param exec_context : execution context
 * \return arrow::Status
 */
arrow::Status ScanFilterSinkExample(cp::ExecContext& exec_context) {
  ARROW_ASSIGN_OR_RAISE(std::shared_ptr<cp::ExecPlan> plan,
                        cp::ExecPlan::Make(&exec_context));

  ARROW_ASSIGN_OR_RAISE(std::shared_ptr<arrow::dataset::Dataset> dataset, GetDataset());

  auto options = std::make_shared<arrow::dataset::ScanOptions>();
  // Specify the filter: it keeps only the rows where the value of the
  // "a" column is greater than 3.
  cp::Expression filter_opt = cp::greater(cp::field_ref("a"), cp::literal(3));
  // Set the filter on the scanner for on-disk / push-down filtering.
  // This step can be skipped if you are not reading from disk.
  options->filter = filter_opt;
  // empty projection
  options->projection = cp::project({}, {});

  // construct the scan node
  std::cout << "Initialized Scanning Options" << std::endl;

  cp::ExecNode* scan;

  auto scan_node_options = arrow::dataset::ScanNodeOptions{dataset, options};
  std::cout << "Scan node options created" << std::endl;

  ARROW_ASSIGN_OR_RAISE(scan,
                        cp::MakeExecNode("scan", plan.get(), {}, scan_node_options));

  // Pipe the scan node into a filter node. The filter must be set in both
  // the scan node options and the filter node options: at the scan node it
  // enables on-disk / push-down filtering, while at the filter node it
  // performs in-memory filtering.
  cp::ExecNode* filter;
  ARROW_ASSIGN_OR_RAISE(filter, cp::MakeExecNode("filter", plan.get(), {scan},
                                                 cp::FilterNodeOptions{filter_opt}));

  // finally, pipe the filter node into a sink node
  arrow::AsyncGenerator<arrow::util::optional<cp::ExecBatch>> sink_gen;
  ARROW_RETURN_NOT_OK(
      cp::MakeExecNode("sink", plan.get(), {filter}, cp::SinkNodeOptions{&sink_gen}));

  return ExecutePlanAndCollectAsTable(exec_context, plan, dataset->schema(), sink_gen);
}

// (Doc section: Filter Example)
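// Filter expressions compose like ordinary compute expressions. As a
// hypothetical variant of the filter above,
//   cp::and_(cp::greater(cp::field_ref("a"), cp::literal(3)),
//            cp::equal(cp::field_ref("b"), cp::literal(true)))
// would keep only rows where "a" is greater than 3 and "b" is true.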

// (Doc section: Project Example)
/**
 * \brief
 * Scan-Project-Sink
 * This example shows how a scan operation can be used to load the data
 * into the execution plan, how a projection can be applied to the
 * data stream, and how the output is obtained as a table via the sink node.
 *
 * \param exec_context : execution context
 * \return arrow::Status
 */
arrow::Status ScanProjectSinkExample(cp::ExecContext& exec_context) {
  ARROW_ASSIGN_OR_RAISE(std::shared_ptr<cp::ExecPlan> plan,
                        cp::ExecPlan::Make(&exec_context));

  ARROW_ASSIGN_OR_RAISE(std::shared_ptr<arrow::dataset::Dataset> dataset, GetDataset());

  auto options = std::make_shared<arrow::dataset::ScanOptions>();
  // The projection expression is evaluated in the project node below;
  // the scan itself is given an empty projection.
  cp::Expression a_times_2 = cp::call("multiply", {cp::field_ref("a"), cp::literal(2)});
  options->projection = cp::project({}, {});

  cp::ExecNode* scan;

  auto scan_node_options = arrow::dataset::ScanNodeOptions{dataset, options};

  ARROW_ASSIGN_OR_RAISE(scan,
                        cp::MakeExecNode("scan", plan.get(), {}, scan_node_options));

  cp::ExecNode* project;
  ARROW_ASSIGN_OR_RAISE(project, cp::MakeExecNode("project", plan.get(), {scan},
                                                  cp::ProjectNodeOptions{{a_times_2}}));
  // schema after projection => multiply(a, 2): int32
  std::cout << "Schema after projection : \n"
            << project->output_schema()->ToString() << std::endl;

  arrow::AsyncGenerator<arrow::util::optional<cp::ExecBatch>> sink_gen;
  ARROW_RETURN_NOT_OK(
      cp::MakeExecNode("sink", plan.get(), {project}, cp::SinkNodeOptions{&sink_gen}));
  auto schema = arrow::schema({arrow::field("a * 2", arrow::int32())});

  return ExecutePlanAndCollectAsTable(exec_context, plan, schema, sink_gen);
}

// (Doc section: Project Example)
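// A projection can also rename or pass columns through. For instance, a
// hypothetical variant that keeps "a" alongside the computed column:
//   cp::ProjectNodeOptions{{cp::field_ref("a"), a_times_2}, {"a", "a * 2"}}
// The second argument supplies the output column names.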

// (Doc section: Scalar Aggregate Example)
/**
 * \brief
 * Source-Aggregation-Sink
 * This example shows how a scalar aggregation can be applied in an
 * execution plan. The source node loads the data and the aggregation
 * (here the sum of column 'a') is applied to it, producing a single
 * scalar result. The output is obtained from the sink node as a table.
 * \param exec_context : execution context
 * \return arrow::Status
 */
arrow::Status SourceScalarAggregateSinkExample(cp::ExecContext& exec_context) {
  ARROW_ASSIGN_OR_RAISE(std::shared_ptr<cp::ExecPlan> plan,
                        cp::ExecPlan::Make(&exec_context));

  ARROW_ASSIGN_OR_RAISE(auto basic_data, MakeBasicBatches());

  arrow::AsyncGenerator<arrow::util::optional<cp::ExecBatch>> sink_gen;

  auto source_node_options = cp::SourceNodeOptions{basic_data.schema, basic_data.gen()};

  ARROW_ASSIGN_OR_RAISE(cp::ExecNode * source,
                        cp::MakeExecNode("source", plan.get(), {}, source_node_options));
  auto aggregate_options = cp::AggregateNodeOptions{/*aggregates=*/{{"sum", nullptr}},
                                                    /*targets=*/{"a"},
                                                    /*names=*/{"sum(a)"}};
  ARROW_ASSIGN_OR_RAISE(
      cp::ExecNode * aggregate,
      cp::MakeExecNode("aggregate", plan.get(), {source}, aggregate_options));

  ARROW_RETURN_NOT_OK(
      cp::MakeExecNode("sink", plan.get(), {aggregate}, cp::SinkNodeOptions{&sink_gen}));
  // "sum" promotes signed integer inputs to 64 bits, so the result is int64
  auto schema = arrow::schema({arrow::field("sum(a)", arrow::int64())});

  return ExecutePlanAndCollectAsTable(exec_context, plan, schema, sink_gen);
}
// (Doc section: Scalar Aggregate Example)
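// Aggregate functions that are not prefixed with "hash_" are scalar
// aggregates: they reduce the entire input to a single row. The grouped
// ("hash_") variant, which requires key columns, is shown next.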

// (Doc section: Group Aggregate Example)
/**
 * \brief
 * Source-Aggregation-Sink
 * This example shows how a grouped aggregation can be applied in an
 * execution plan. The source node loads the data and the aggregation
 * (counting the non-null values of column 'a', grouped by column 'b')
 * is applied to it. The output is obtained from the sink node as a table.
 * \param exec_context : execution context
 * \return arrow::Status
 */
arrow::Status SourceGroupAggregateSinkExample(cp::ExecContext& exec_context) {
  ARROW_ASSIGN_OR_RAISE(std::shared_ptr<cp::ExecPlan> plan,
                        cp::ExecPlan::Make(&exec_context));

  ARROW_ASSIGN_OR_RAISE(auto basic_data, MakeBasicBatches());

  arrow::AsyncGenerator<arrow::util::optional<cp::ExecBatch>> sink_gen;

  auto source_node_options = cp::SourceNodeOptions{basic_data.schema, basic_data.gen()};

  ARROW_ASSIGN_OR_RAISE(cp::ExecNode * source,
                        cp::MakeExecNode("source", plan.get(), {}, source_node_options));
  cp::CountOptions options(cp::CountOptions::ONLY_VALID);
  auto aggregate_options =
      cp::AggregateNodeOptions{/*aggregates=*/{{"hash_count", &options}},
                               /*targets=*/{"a"},
                               /*names=*/{"count(a)"},
                               /*keys=*/{"b"}};
  ARROW_ASSIGN_OR_RAISE(
      cp::ExecNode * aggregate,
      cp::MakeExecNode("aggregate", plan.get(), {source}, aggregate_options));

  ARROW_RETURN_NOT_OK(
      cp::MakeExecNode("sink", plan.get(), {aggregate}, cp::SinkNodeOptions{&sink_gen}));
  // "hash_count" emits int64 counts alongside the group keys
  auto schema = arrow::schema({
      arrow::field("count(a)", arrow::int64()),
      arrow::field("b", arrow::boolean()),
  });

  return ExecutePlanAndCollectAsTable(exec_context, plan, schema, sink_gen);
}
// (Doc section: Group Aggregate Example)
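// CountOptions::ONLY_VALID counts only non-null values; CountOptions::ONLY_NULL
// and CountOptions::ALL are the other counting modes.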

// (Doc section: ConsumingSink Example)
/**
 * \brief
 * Source-ConsumingSink
 * This example shows how the data can be consumed within the execution plan
 * by using a ConsumingSink node. There is no data output from this execution plan.
 * \param exec_context : execution context
 * \return arrow::Status
 */
arrow::Status SourceConsumingSinkExample(cp::ExecContext& exec_context) {
  ARROW_ASSIGN_OR_RAISE(std::shared_ptr<cp::ExecPlan> plan,
                        cp::ExecPlan::Make(&exec_context));

  ARROW_ASSIGN_OR_RAISE(auto basic_data, MakeBasicBatches());

  auto source_node_options = cp::SourceNodeOptions{basic_data.schema, basic_data.gen()};

  ARROW_ASSIGN_OR_RAISE(cp::ExecNode * source,
                        cp::MakeExecNode("source", plan.get(), {}, source_node_options));

  std::atomic<uint32_t> batches_seen{0};
  arrow::Future<> finish = arrow::Future<>::Make();
  struct CustomSinkNodeConsumer : public cp::SinkNodeConsumer {
    CustomSinkNodeConsumer(std::atomic<uint32_t>* batches_seen, arrow::Future<> finish)
        : batches_seen(batches_seen), finish(std::move(finish)) {}

    // Consume may be invoked from multiple executor threads, hence the
    // atomic counter.
    arrow::Status Consume(cp::ExecBatch batch) override {
      (*batches_seen)++;
      return arrow::Status::OK();
    }

    arrow::Future<> Finish() override { return finish; }

    std::atomic<uint32_t>* batches_seen;
    arrow::Future<> finish;
  };
  std::shared_ptr<CustomSinkNodeConsumer> consumer =
      std::make_shared<CustomSinkNodeConsumer>(&batches_seen, finish);

  cp::ExecNode* consuming_sink;

  ARROW_ASSIGN_OR_RAISE(consuming_sink,
                        cp::MakeExecNode("consuming_sink", plan.get(), {source},
                                         cp::ConsumingSinkNodeOptions(consumer)));

  ARROW_RETURN_NOT_OK(consuming_sink->Validate());

  ARROW_RETURN_NOT_OK(plan->Validate());
  std::cout << "Exec Plan created: " << plan->ToString() << std::endl;
  // start the ExecPlan
  ARROW_RETURN_NOT_OK(plan->StartProducing());
  // The source should finish fairly quickly
  ARROW_RETURN_NOT_OK(source->finished().status());
  std::cout << "Source Finished!" << std::endl;
  // Mark consumption complete; the plan should then finish
  finish.MarkFinished(arrow::Status::OK());
  ARROW_RETURN_NOT_OK(plan->finished().status());
  return arrow::Status::OK();
}
// (Doc section: ConsumingSink Example)
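// The plan does not complete until the consumer's Finish() future resolves;
// the example exploits this by holding the plan open until
// finish.MarkFinished() is called, after the source has been drained.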

// (Doc section: OrderBySink Example)

/**
 * \brief
 * Source-OrderBySink
 * In this example, the data enters through the source node and is ordered
 * in the sink node. The sort order (ascending or descending) is
 * configurable per sort key. The output is obtained as a table from the
 * sink node.
 * \param exec_context : execution context
 * \return arrow::Status
 */
arrow::Status SourceOrderBySinkExample(cp::ExecContext& exec_context) {
  ARROW_ASSIGN_OR_RAISE(std::shared_ptr<cp::ExecPlan> plan,
                        cp::ExecPlan::Make(&exec_context));

  ARROW_ASSIGN_OR_RAISE(auto basic_data, MakeSortTestBasicBatches());

  std::cout << "basic data created" << std::endl;

  arrow::AsyncGenerator<arrow::util::optional<cp::ExecBatch>> sink_gen;

  auto source_node_options = cp::SourceNodeOptions{basic_data.schema, basic_data.gen()};
  ARROW_ASSIGN_OR_RAISE(cp::ExecNode * source,
                        cp::MakeExecNode("source", plan.get(), {}, source_node_options));

  ARROW_RETURN_NOT_OK(cp::MakeExecNode(
      "order_by_sink", plan.get(), {source},
      cp::OrderBySinkNodeOptions{
          cp::SortOptions{{cp::SortKey{"a", cp::SortOrder::Descending}}}, &sink_gen}));

  return ExecutePlanAndCollectAsTable(exec_context, plan, basic_data.schema, sink_gen);
}

// (Doc section: OrderBySink Example)
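// Sorting cannot emit anything until it has seen its entire input, so the
// order_by_sink node accumulates all batches before producing output; for
// large inputs its working set is the full dataset.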

// (Doc section: HashJoin Example)
/**
 * \brief
 * Source-HashJoin-Sink
 * This example shows how two source nodes feed a hash join; here the same
 * input is used on both sides, making it a self-join. The join options are
 * configurable. The output is obtained as a table via the sink node.
 * \param exec_context : execution context
 * \return arrow::Status
 */
arrow::Status SourceHashJoinSinkExample(cp::ExecContext& exec_context) {
  ARROW_ASSIGN_OR_RAISE(auto input, MakeGroupableBatches());
  ARROW_ASSIGN_OR_RAISE(std::shared_ptr<cp::ExecPlan> plan,
                        cp::ExecPlan::Make(&exec_context));

  arrow::AsyncGenerator<arrow::util::optional<cp::ExecBatch>> sink_gen;

  cp::ExecNode* left_source;
  cp::ExecNode* right_source;
  for (auto source : {&left_source, &right_source}) {
    ARROW_ASSIGN_OR_RAISE(*source,
                          cp::MakeExecNode("source", plan.get(), {},
                                           cp::SourceNodeOptions{input.schema, input.gen()}));
  }

  cp::HashJoinNodeOptions join_opts{
      cp::JoinType::INNER,
      /*left_keys=*/{"str"},
      /*right_keys=*/{"str"}, cp::literal(true), "l_", "r_"};

  ARROW_ASSIGN_OR_RAISE(
      auto hashjoin,
      cp::MakeExecNode("hashjoin", plan.get(), {left_source, right_source}, join_opts));

  ARROW_RETURN_NOT_OK(
      cp::MakeExecNode("sink", plan.get(), {hashjoin}, cp::SinkNodeOptions{&sink_gen}));
  // expected columns: i32, str, l_str, r_str
  auto schema = arrow::schema(
      {arrow::field("i32", arrow::int32()), arrow::field("str", arrow::utf8()),
       arrow::field("l_str", arrow::utf8()), arrow::field("r_str", arrow::utf8())});

  return ExecutePlanAndCollectAsTable(exec_context, plan, schema, sink_gen);
}

// (Doc section: HashJoin Example)
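// The "l_" and "r_" arguments disambiguate non-key columns that appear on
// both sides of the join, which is why the expected output carries the
// l_str and r_str columns in addition to the key.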

// (Doc section: KSelect Example)
/**
 * \brief
 * Source-KSelect
 * This example shows how the top (or bottom) k elements can be selected
 * from a stream. The output node is a modified sink node from which the
 * output can be obtained as a table.
 * \param exec_context : execution context
 * \return arrow::Status
 */
arrow::Status SourceKSelectExample(cp::ExecContext& exec_context) {
  ARROW_ASSIGN_OR_RAISE(auto input, MakeGroupableBatches());
  ARROW_ASSIGN_OR_RAISE(std::shared_ptr<cp::ExecPlan> plan,
                        cp::ExecPlan::Make(&exec_context));
  arrow::AsyncGenerator<arrow::util::optional<cp::ExecBatch>> sink_gen;

  ARROW_ASSIGN_OR_RAISE(
      cp::ExecNode * source,
      cp::MakeExecNode("source", plan.get(), {},
                       cp::SourceNodeOptions{input.schema, input.gen()}));

  // select the two rows with the largest values of "i32"
  cp::SelectKOptions options = cp::SelectKOptions::TopKDefault(/*k=*/2, {"i32"});

  ARROW_RETURN_NOT_OK(cp::MakeExecNode("select_k_sink", plan.get(), {source},
                                       cp::SelectKSinkNodeOptions{options, &sink_gen}));

  auto schema = arrow::schema(
      {arrow::field("i32", arrow::int32()), arrow::field("str", arrow::utf8())});

  return ExecutePlanAndCollectAsTable(exec_context, plan, schema, sink_gen);
}

// (Doc section: KSelect Example)
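// SelectKOptions::BottomKDefault(k, keys) is the mirror-image helper for
// selecting the k smallest rows.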

// (Doc section: Write Example)

/**
 * \brief
 * Scan-Filter-Write
 * This example shows how a scan node can be used to load data and how,
 * after processing, the result can be written to disk.
 * \param exec_context : execution context
 * \param file_path : base path under which the output dataset is saved
 * \return arrow::Status
 */
arrow::Status ScanFilterWriteExample(cp::ExecContext& exec_context,
                                     const std::string& file_path) {
  ARROW_ASSIGN_OR_RAISE(std::shared_ptr<cp::ExecPlan> plan,
                        cp::ExecPlan::Make(&exec_context));

  ARROW_ASSIGN_OR_RAISE(std::shared_ptr<arrow::dataset::Dataset> dataset, GetDataset());

  auto options = std::make_shared<arrow::dataset::ScanOptions>();
  // empty projection
  options->projection = cp::project({}, {});

  cp::ExecNode* scan;

  auto scan_node_options = arrow::dataset::ScanNodeOptions{dataset, options};

  ARROW_ASSIGN_OR_RAISE(scan,
                        cp::MakeExecNode("scan", plan.get(), {}, scan_node_options));

  std::string root_path = "";
  std::string uri = "file://" + file_path;
  ARROW_ASSIGN_OR_RAISE(std::shared_ptr<arrow::fs::FileSystem> filesystem,
                        arrow::fs::FileSystemFromUri(uri, &root_path));

  auto base_path = root_path + "/parquet_dataset";
  // Uncomment the following line if running repeatedly:
  // ARROW_RETURN_NOT_OK(filesystem->DeleteDirContents(base_path));
  ARROW_RETURN_NOT_OK(filesystem->CreateDir(base_path));

  // The partition schema determines which fields are part of the partitioning.
  auto partition_schema = arrow::schema({arrow::field("a", arrow::int32())});
  // We'll use Hive-style partitioning,
  // which creates directories with "key=value" pairs.

  auto partitioning =
      std::make_shared<arrow::dataset::HivePartitioning>(partition_schema);
  // We'll write Parquet files.
  auto format = std::make_shared<arrow::dataset::ParquetFileFormat>();

  arrow::dataset::FileSystemDatasetWriteOptions write_options;
  write_options.file_write_options = format->DefaultWriteOptions();
  write_options.filesystem = filesystem;
  write_options.base_dir = base_path;
  write_options.partitioning = partitioning;
  write_options.basename_template = "part{i}.parquet";

  arrow::dataset::WriteNodeOptions write_node_options{write_options, dataset->schema()};

  ARROW_RETURN_NOT_OK(cp::MakeExecNode("write", plan.get(), {scan}, write_node_options));

  ARROW_RETURN_NOT_OK(plan->Validate());
  std::cout << "Execution Plan Created : " << plan->ToString() << std::endl;
  // start the ExecPlan
  ARROW_RETURN_NOT_OK(plan->StartProducing());
  // wait for the plan to finish and surface any error it produced
  auto future = plan->finished();
  return future.status();
}

// (Doc section: Write Example)
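// Note that the write node is itself a terminal node, so no sink is attached
// here; instead of draining a sink generator, the example simply waits on
// plan->finished().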

// (Doc section: Union Example)

/**
 * \brief
 * Source-Union-Sink
 * This example shows how a union operation can be applied to two
 * data sources. The output is obtained as a table via the sink
 * node.
 * \param exec_context : execution context
 * \return arrow::Status
 */
arrow::Status SourceUnionSinkExample(cp::ExecContext& exec_context) {
  ARROW_ASSIGN_OR_RAISE(auto basic_data, MakeBasicBatches());

  ARROW_ASSIGN_OR_RAISE(std::shared_ptr<cp::ExecPlan> plan,
                        cp::ExecPlan::Make(&exec_context));
  arrow::AsyncGenerator<arrow::util::optional<cp::ExecBatch>> sink_gen;

  cp::Declaration union_node{"union", cp::ExecNodeOptions{}};
  cp::Declaration lhs{"source",
                      cp::SourceNodeOptions{basic_data.schema, basic_data.gen()}};
  lhs.label = "lhs";
  cp::Declaration rhs{"source",
                      cp::SourceNodeOptions{basic_data.schema, basic_data.gen()}};
  rhs.label = "rhs";
  union_node.inputs.emplace_back(lhs);
  union_node.inputs.emplace_back(rhs);

  ARROW_ASSIGN_OR_RAISE(
      auto declr, cp::Declaration::Sequence({
                                                union_node,
                                                {"sink", cp::SinkNodeOptions{&sink_gen}},
                                            })
                      .AddToPlan(plan.get()));

  ARROW_RETURN_NOT_OK(declr->Validate());

  ARROW_RETURN_NOT_OK(plan->Validate());
  return ExecutePlanAndCollectAsTable(exec_context, plan, basic_data.schema, sink_gen);
}

// (Doc section: Union Example)
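// Declaration::Sequence chains the listed declarations so that each one
// becomes the input of the next; AddToPlan then instantiates the whole graph
// (including the union node's own inputs) and returns the last node added,
// here the sink.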

enum ExampleMode {
  SOURCE_SINK = 0,
  SCAN = 1,
  FILTER = 2,
  PROJECT = 3,
  SCALAR_AGGREGATION = 4,
  GROUP_AGGREGATION = 5,
  CONSUMING_SINK = 6,
  ORDER_BY_SINK = 7,
  HASHJOIN = 8,
  KSELECT = 9,
  WRITE = 10,
  UNION = 11,
};
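// Usage sketch (the binary name is illustrative): pass a scratch directory
// and one of the mode numbers above, e.g.
//   ./execution_plan_examples /tmp 2
// runs the filter example. The directory argument is only used by the write
// example (mode 10).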

int main(int argc, char** argv) {
  if (argc < 3) {
    // Fake success for CI purposes.
    return EXIT_SUCCESS;
  }

  std::string base_save_path = argv[1];
  int mode = std::atoi(argv[2]);
  arrow::Status status;
  // ensure arrow::dataset node factories are in the registry
  arrow::dataset::internal::Initialize();
  // execution context
  cp::ExecContext exec_context(arrow::default_memory_pool(),
                               ::arrow::internal::GetCpuThreadPool());
  switch (mode) {
    case SOURCE_SINK:
      PrintBlock("Source Sink Example");
      status = SourceSinkExample(exec_context);
      break;
    case SCAN:
      PrintBlock("Scan Example");
      status = ScanSinkExample(exec_context);
      break;
    case FILTER:
      PrintBlock("Filter Example");
      status = ScanFilterSinkExample(exec_context);
      break;
    case PROJECT:
      PrintBlock("Project Example");
      status = ScanProjectSinkExample(exec_context);
      break;
    case GROUP_AGGREGATION:
      PrintBlock("Group Aggregate Example");
      status = SourceGroupAggregateSinkExample(exec_context);
      break;
    case SCALAR_AGGREGATION:
      PrintBlock("Scalar Aggregate Example");
      status = SourceScalarAggregateSinkExample(exec_context);
      break;
    case CONSUMING_SINK:
      PrintBlock("Consuming-Sink Example");
      status = SourceConsumingSinkExample(exec_context);
      break;
    case ORDER_BY_SINK:
      PrintBlock("OrderBy Example");
      status = SourceOrderBySinkExample(exec_context);
      break;
    case HASHJOIN:
      PrintBlock("HashJoin Example");
      status = SourceHashJoinSinkExample(exec_context);
      break;
    case KSELECT:
      PrintBlock("KSelect Example");
      status = SourceKSelectExample(exec_context);
      break;
    case WRITE:
      PrintBlock("Write Example");
      status = ScanFilterWriteExample(exec_context, base_save_path);
      break;
    case UNION:
      PrintBlock("Union Example");
      status = SourceUnionSinkExample(exec_context);
      break;
    default:
      break;
  }

  if (status.ok()) {
    return EXIT_SUCCESS;
  } else {
    std::cout << "Error occurred: " << status.message() << std::endl;
    return EXIT_FAILURE;
  }
}