Acero User’s Guide#

This page describes how to use Acero. It’s recommended that you read the overview first and familiarize yourself with the basic concepts.

Using Acero#

The basic workflow for Acero is this:

  1. First, create a graph of Declaration objects describing the plan.

  2. Call one of the DeclarationToXyz methods to execute the Declaration.

    1. A new ExecPlan is created from the graph of Declarations. Each Declaration will correspond to one ExecNode in the plan. In addition, a sink node is added; which one depends on the DeclarationToXyz method used.

    2. The ExecPlan is executed. Typically this happens as part of the DeclarationToXyz call, but DeclarationToReader returns the reader before the plan has finished executing.

    3. Once the plan is finished, it is destroyed.
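As a rough mental model of step 2.1, the sketch below counts how many nodes a plan would contain: one per Declaration, plus the sink that the DeclarationToXyz call appends. ToyDeclaration, CountDeclarations, and CountPlanNodes are hypothetical stand-ins written for illustration only; a real arrow::acero::Declaration also carries an options object, and the plan itself is built by Acero, not by code like this.

```cpp
#include <cassert>
#include <cstddef>
#include <string>
#include <vector>

// Hypothetical stand-in for arrow::acero::Declaration: only the factory name
// and the inputs are modeled (a real Declaration also carries options).
struct ToyDeclaration {
  std::string factory_name;
  std::vector<ToyDeclaration> inputs;
};

// Counts the declarations in the graph rooted at `decl` (one per ExecNode).
std::size_t CountDeclarations(const ToyDeclaration& decl) {
  assert(!decl.factory_name.empty());  // every declaration names a factory
  std::size_t count = 1;               // this declaration
  for (const ToyDeclaration& input : decl.inputs) {
    count += CountDeclarations(input);
  }
  return count;
}

// Nodes in the resulting plan: the declarations plus the one sink node that a
// DeclarationToXyz-style call would append after the root.
std::size_t CountPlanNodes(const ToyDeclaration& root) {
  return CountDeclarations(root) + 1;
}
```

For a scan feeding a project, this toy reports two declarations and three plan nodes.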

Creating a Plan#

Using Substrait#

Substrait is the preferred mechanism for creating a plan (graph of Declaration). There are a few reasons for this:

  • Substrait producers spend a lot of time and energy on creating user-friendly APIs for producing complex execution plans in a simple way. For example, the pivot_wider operation can be achieved using a complex series of aggregate nodes. Rather than create all of those aggregate nodes by hand, you can use a producer, which will give you a much simpler API.

  • If you are using Substrait, you can easily switch to any other Substrait-consuming engine should you at some point find that it serves your needs better than Acero.

  • We hope that tools will eventually emerge for Substrait-based optimizers and planners. By using Substrait you will be making it much easier to use these tools in the future.

You could create the Substrait plan yourself, but you’ll probably have a much easier time finding an existing Substrait producer. For example, you could use ibis-substrait to easily create Substrait plans from Python expressions. There are a few different tools that are able to create Substrait plans from SQL. Eventually, we hope that C++-based Substrait producers will emerge. However, we are not aware of any at this time.

Detailed instructions on creating an execution plan from Substrait can be found on the Substrait page.

Programmatic Plan Creation#

Creating an execution plan programmatically is simpler than creating a plan from Substrait, though it loses some of the flexibility and future-proofing guarantees. The simplest way to create a Declaration is to instantiate one directly. You will need the name of the declaration, a vector of inputs, and an options object. For example:

/// \brief An example showing a project node
///
/// Scan-Project-Table
/// This example shows how a Scan operation can be used to load the data
/// into the execution plan, how a project operation can be applied on the
/// data stream and how the output is collected into a table
///
/// (This snippet assumes the namespace aliases cp = arrow::compute and
/// ac = arrow::acero, and the helpers GetDataset() and
/// ExecutePlanAndCollectAsTable() from the surrounding example file, which
/// are not shown here.)
arrow::Status ScanProjectSinkExample() {
  ARROW_ASSIGN_OR_RAISE(std::shared_ptr<arrow::dataset::Dataset> dataset, GetDataset());

  auto options = std::make_shared<arrow::dataset::ScanOptions>();
  // the expression a * 2, evaluated by the project node below
  cp::Expression a_times_2 = cp::call("multiply", {cp::field_ref("a"), cp::literal(2)});
  options->projection = cp::project({}, {});

  auto scan_node_options = arrow::dataset::ScanNodeOptions{dataset, options};

  ac::Declaration scan{"scan", std::move(scan_node_options)};
  ac::Declaration project{
      "project", {std::move(scan)}, ac::ProjectNodeOptions({a_times_2})};

  return ExecutePlanAndCollectAsTable(std::move(project));
}

The above code creates a scan declaration (which has no inputs) and a project declaration (using the scan as input). This is simple enough, but we can make it slightly easier. If you are creating a linear sequence of declarations (like in the above example), you can also use the Declaration::Sequence() function.

  // Inputs do not have to be passed to the project node when using Sequence
  ac::Declaration plan =
      ac::Declaration::Sequence({{"scan", std::move(scan_node_options)},
                                 {"project", ac::ProjectNodeOptions({a_times_2})}});
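To make the behavior of Declaration::Sequence() concrete, here is a stdlib-only model of what it does to the graph, assuming each element becomes the single input of the element after it and the last element becomes the root. SeqDeclaration and MakeSequence are hypothetical names; this is not Acero's implementation.

```cpp
#include <cassert>
#include <cstddef>
#include <string>
#include <utility>
#include <vector>

// Hypothetical stand-in for arrow::acero::Declaration (factory name + inputs only).
struct SeqDeclaration {
  std::string factory_name;
  std::vector<SeqDeclaration> inputs;
};

// Conceptual model of Declaration::Sequence: each element is attached as the
// only input of the element that follows it, and the last one is returned as
// the root of the resulting graph.
SeqDeclaration MakeSequence(std::vector<SeqDeclaration> decls) {
  assert(!decls.empty());
  SeqDeclaration current = std::move(decls.front());
  for (std::size_t i = 1; i < decls.size(); ++i) {
    SeqDeclaration next = std::move(decls[i]);
    next.inputs.push_back(std::move(current));  // previous step feeds this one
    current = std::move(next);
  }
  return current;
}
```

With {scan, project} as input, the returned root is the project declaration and its only input is the scan, which matches the explicit construction in the first example.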

There are many more examples of programmatic plan creation later in this document.

Executing a Plan#

There are a number of different methods that can be used to execute a declaration. Each one provides the data in a slightly different form. Since all of these methods start with DeclarationTo..., this guide will often refer to them as the DeclarationToXyz methods.

DeclarationToTable#

The DeclarationToTable() method will accumulate all of the results into a single arrow::Table. This is perhaps the simplest way to collect results from Acero. The main disadvantage to this approach is that it requires accumulating all results into memory.

Note

Acero processes large datasets in small chunks. This is described in more detail in the developer’s guide. As a result, you may be surprised to find that a table collected with DeclarationToTable is chunked differently than your input. For example, your input might be a large table with a single chunk with 2 million rows. Your output table might then have 64 chunks with 32Ki rows each. There is an open request, GH-15155, to allow specifying the chunk size of the output.

DeclarationToReader#

The DeclarationToReader() method allows you to iteratively consume the results. It will create an arrow::RecordBatchReader which you can read from at your leisure. If you do not read from the reader quickly enough, backpressure will be applied and the execution plan will pause. Closing the reader will cancel the running execution plan. The reader’s destructor waits for the execution plan to finish whatever it is currently doing, so it may block.
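The backpressure behavior described above can be pictured as a bounded queue between a producer (standing in for the plan) and a slow reader. This is a minimal stdlib sketch, assuming batches are plain ints and that "pausing" means the producer blocks while the queue is full; BoundedBatchQueue and ProduceAndRead are hypothetical and are not how Acero implements backpressure internally.

```cpp
#include <algorithm>
#include <cassert>
#include <chrono>
#include <condition_variable>
#include <cstddef>
#include <mutex>
#include <optional>
#include <queue>
#include <thread>
#include <vector>

// Toy backpressure model: the producer blocks once `capacity` batches are
// waiting, so it can only run a bounded distance ahead of the reader.
class BoundedBatchQueue {
 public:
  explicit BoundedBatchQueue(std::size_t capacity) : capacity_(capacity) {
    assert(capacity_ > 0);
  }

  // Producer side ("the plan"): pauses while the queue is full.
  void Push(int batch) {
    std::unique_lock<std::mutex> lock(mutex_);
    not_full_.wait(lock, [&] { return queue_.size() < capacity_; });
    queue_.push(batch);
    max_observed_ = std::max(max_observed_, queue_.size());
    not_empty_.notify_one();
  }

  // Signals that no more batches will be produced.
  void Close() {
    std::lock_guard<std::mutex> lock(mutex_);
    closed_ = true;
    not_empty_.notify_all();
  }

  // Reader side: returns std::nullopt once the stream is exhausted.
  std::optional<int> Pop() {
    std::unique_lock<std::mutex> lock(mutex_);
    not_empty_.wait(lock, [&] { return !queue_.empty() || closed_; });
    if (queue_.empty()) return std::nullopt;
    int batch = queue_.front();
    queue_.pop();
    not_full_.notify_one();  // room for the paused producer to continue
    return batch;
  }

  std::size_t max_observed() const {
    std::lock_guard<std::mutex> lock(mutex_);
    return max_observed_;
  }

 private:
  const std::size_t capacity_;
  mutable std::mutex mutex_;
  std::condition_variable not_full_, not_empty_;
  std::queue<int> queue_;
  std::size_t max_observed_ = 0;
  bool closed_ = false;
};

// Runs a producer thread against a deliberately slow reader and returns the
// batches in the order the reader saw them.
std::vector<int> ProduceAndRead(BoundedBatchQueue& queue, int num_batches) {
  std::thread producer([&] {
    for (int i = 0; i < num_batches; ++i) queue.Push(i);
    queue.Close();
  });
  std::vector<int> seen;
  while (std::optional<int> batch = queue.Pop()) {
    std::this_thread::sleep_for(std::chrono::milliseconds(1));  // slow reader
    seen.push_back(*batch);
  }
  producer.join();
  return seen;
}
```

However many batches are produced, max_observed() never exceeds the queue's capacity, which is the property backpressure is meant to guarantee.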

DeclarationToStatus#

The DeclarationToStatus() method is useful if you want to run the plan but do not actually want to consume the results. For example, this is useful when benchmarking or when the plan has side effects such as a dataset write node. If the plan generates any results then they will be immediately discarded.

Running a Plan Directly#

If none of the DeclarationToXyz methods is sufficient for some reason, it is possible to run a plan directly. This should only be needed if you are doing something unique, for example, if you have created a custom sink node or if you need a plan that has multiple outputs.

Note

In academic literature and many existing systems there is a general assumption that an execution plan has at most one output. There are some things in Acero, such as the DeclarationToXyz methods, which will expect this. However, there is nothing in the design that strictly prevents having multiple sink nodes.

Detailed instructions on how to do this are out of scope for this guide, but the rough steps are:

  1. Create a new ExecPlan object.

  2. Add sink nodes to your graph of Declaration objects (this is the only time you will need to create declarations for sink nodes).

  3. Use Declaration::AddToPlan() to add your declaration to your plan (if you have more than one output, you will not be able to use this method and will need to add your nodes one at a time).

  4. Validate the plan with ExecPlan::Validate().

  5. Start the plan with ExecPlan::StartProducing().

  6. Wait for the future returned by ExecPlan::finished() to complete.
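The order of the steps above can be sketched with a hypothetical ToyPlan class: add nodes, Validate(), StartProducing(), then wait on finished(). It uses std::promise and std::shared_future in place of arrow::Future, and its validation rule (at least one node whose name contains "sink", or the write node) is an assumption made for illustration; the real ExecPlan::Validate() performs its own checks.

```cpp
#include <atomic>
#include <cassert>
#include <future>
#include <string>
#include <thread>
#include <utility>
#include <vector>

// Hypothetical toy plan mirroring the manual steps; not an Acero class.
class ToyPlan {
 public:
  ToyPlan() = default;
  ToyPlan(const ToyPlan&) = delete;
  ToyPlan& operator=(const ToyPlan&) = delete;
  ~ToyPlan() {
    if (worker_.joinable()) worker_.join();
  }

  // Steps 2-3: nodes (including sinks) are added to the plan, by name only.
  void AddNode(std::string factory_name) { nodes_.push_back(std::move(factory_name)); }

  // Step 4 (toy rule): a plan needs at least one sink to deliver its output.
  bool Validate() const {
    for (const std::string& name : nodes_) {
      if (name.find("sink") != std::string::npos || name == "write") return true;
    }
    return false;
  }

  // Step 5: runs the work on another thread and completes finished() when done.
  void StartProducing() {
    assert(Validate() && !worker_.joinable());
    worker_ = std::thread([this] {
      ran_ = true;  // a real plan would process batches here
      done_.set_value();
    });
  }

  // Step 6: callers wait on this future for the plan to complete.
  std::shared_future<void> finished() const { return finished_; }
  bool ran() const { return ran_.load(); }

 private:
  std::vector<std::string> nodes_;
  std::atomic<bool> ran_{false};
  std::promise<void> done_;
  std::shared_future<void> finished_ = done_.get_future().share();
  std::thread worker_;
};
```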

Providing Input#

Input data for an exec plan can come from a variety of sources. It is often read from files stored on some kind of filesystem. It is also common for input to come from in-memory data. In-memory data is typical, for example, in a pandas-like frontend. Input could also come from network streams like a Flight request. Acero can support all of these cases and can even support unique and custom situations not mentioned here.

There are pre-defined source nodes that cover the most common input scenarios. These are listed below. However, if your source data is unique, you will need to use the generic source node. This node expects you to provide an asynchronous stream of batches and is covered in more detail in the source example later in this document.

Available ExecNode Implementations#

The following tables quickly summarize the available operators.

Sources#

These nodes can be used as sources of data.

Source Nodes#

| Factory Name | Options | Brief Description |
|---|---|---|
| source | SourceNodeOptions | A generic source node that wraps an asynchronous stream of data (example) |
| table_source | TableSourceNodeOptions | Generates data from an arrow::Table (example) |
| record_batch_source | RecordBatchSourceNodeOptions | Generates data from an iterator of arrow::RecordBatch |
| record_batch_reader_source | RecordBatchReaderSourceNodeOptions | Generates data from an arrow::RecordBatchReader |
| exec_batch_source | ExecBatchSourceNodeOptions | Generates data from an iterator of arrow::compute::ExecBatch |
| array_vector_source | ArrayVectorSourceNodeOptions | Generates data from an iterator of vectors of arrow::Array |
| scan | arrow::dataset::ScanNodeOptions | Generates data from an arrow::dataset::Dataset (requires the datasets module) (example) |

Compute Nodes#

These nodes perform computations on data and may transform or reshape the data.

| Factory Name | Options | Brief Description |
|---|---|---|
| filter | FilterNodeOptions | Removes rows that do not match a given filter expression (example) |
| project | ProjectNodeOptions | Creates new columns by evaluating compute expressions. Can also drop and reorder columns (example) |
| aggregate | AggregateNodeOptions | Calculates summary statistics across the entire input stream or on groups of data (example) |
| pivot_longer | PivotLongerNodeOptions | Reshapes data by converting some columns into additional rows |

Arrangement Nodes#

These nodes reorder, combine, or slice streams of data.

| Factory Name | Options | Brief Description |
|---|---|---|
| hash_join | HashJoinNodeOptions | Joins two inputs based on common columns (example) |
| asofjoin | AsofJoinNodeOptions | Joins multiple inputs to the first input based on a common ordered column (often time) |
| union | N/A | Merges two inputs with identical schemas (example) |
| order_by | OrderByNodeOptions | Reorders a stream |
| fetch | FetchNodeOptions | Slices a range of rows from a stream |

Sink Nodes#

These nodes terminate a plan. Users do not typically create sink nodes as they are selected based on the DeclarationToXyz method used to consume the plan. However, this list may be useful for those developing new sink nodes or using Acero in advanced ways.

| Factory Name | Options | Brief Description |
|---|---|---|
| sink | SinkNodeOptions | Collects batches into a FIFO queue with optional backpressure |
| write | arrow::dataset::WriteNodeOptions | Writes batches to a filesystem (example) |
| consuming_sink | ConsumingSinkNodeOptions | Consumes batches using a user-provided callback function |
| table_sink | TableSinkNodeOptions | Collects batches into an arrow::Table |
| order_by_sink | OrderBySinkNodeOptions | Deprecated |
| select_k_sink | SelectKSinkNodeOptions | Deprecated |

Examples#

The rest of this document contains example execution plans. Each example highlights the behavior of a specific execution node.

source#

A source operation can be considered an entry point to create a streaming execution plan. SourceNodeOptions are used to create the source operation. The source operation is the most generic and flexible type of source currently available, but it can be quite tricky to configure. Before using it, you should review the other source node types to ensure there isn’t a simpler choice.

The source node requires some kind of function that can be called to poll for more data. This function should take no arguments and should return an arrow::Future<std::optional<arrow::compute::ExecBatch>>. This function might be reading a file, iterating through an in-memory structure, or receiving data from a network connection. The Arrow library refers to these functions as arrow::AsyncGenerator, and there are a number of utilities for working with them. For this example we use a vector of batches that we’ve already stored in memory. In addition, the schema of the data must be known up front: Acero must know the schema of the data at each stage of the execution graph before any processing has begun. This means we must supply the schema for a source node separately from the data itself.
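The polling contract described above (a no-argument function returning a future of an optional batch, with an empty optional marking the end of the stream) can be modeled without Arrow. In this sketch std::shared_future stands in for arrow::Future and std::vector<int> stands in for arrow::compute::ExecBatch; ToyAsyncGenerator, MakeToyVectorGenerator, and DrainToyGenerator are hypothetical names that only loosely mirror arrow::AsyncGenerator and arrow::MakeVectorGenerator.

```cpp
#include <cassert>
#include <cstddef>
#include <functional>
#include <future>
#include <memory>
#include <optional>
#include <utility>
#include <vector>

// Toy batch type; Acero would use arrow::compute::ExecBatch here.
using ToyBatch = std::vector<int>;

// Same shape as an async generator of optional batches: each call returns a
// future, and an empty optional signals that the stream is exhausted.
using ToyAsyncGenerator = std::function<std::shared_future<std::optional<ToyBatch>>()>;

// Wraps batches that are already in memory, one batch per call.
ToyAsyncGenerator MakeToyVectorGenerator(std::vector<ToyBatch> batches) {
  auto state = std::make_shared<std::pair<std::vector<ToyBatch>, std::size_t>>(
      std::move(batches), 0);
  return [state]() {
    std::promise<std::optional<ToyBatch>> promise;
    auto& [items, next] = *state;
    if (next < items.size()) {
      promise.set_value(std::optional<ToyBatch>(items[next++]));
    } else {
      promise.set_value(std::optional<ToyBatch>());  // end of stream
    }
    return promise.get_future().share();
  };
}

// Polls the generator until it signals exhaustion, as a source node would.
std::vector<ToyBatch> DrainToyGenerator(const ToyAsyncGenerator& gen) {
  assert(gen);  // the generator must be callable
  std::vector<ToyBatch> out;
  while (true) {
    std::optional<ToyBatch> batch = gen().get();
    if (!batch) break;
    out.push_back(std::move(*batch));
  }
  return out;
}
```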

Here we define a struct to hold the data generator definition. This includes in-memory batches, a schema, and a function that serves as a data generator:

struct BatchesWithSchema {
  std::vector<cp::ExecBatch> batches;
  std::shared_ptr<arrow::Schema> schema;
  // This method uses internal arrow utilities to
  // convert a vector of record batches to an AsyncGenerator of optional batches
  arrow::AsyncGenerator<std::optional<cp::ExecBatch>> gen() const {
    auto opt_batches = ::arrow::internal::MapVector(
        [](cp::ExecBatch batch) { return std::make_optional(std::move(batch)); },
        batches);
    arrow::AsyncGenerator<std::optional<cp::ExecBatch>> gen;
    gen = arrow::MakeVectorGenerator(std::move(opt_batches));
    return gen;
  }
};

Generating sample batches for computation:

arrow::Result<BatchesWithSchema> MakeBasicBatches() {
  BatchesWithSchema out;
  auto field_vector = {arrow::field("a", arrow::int32()),
                       arrow::field("b", arrow::boolean())};
  ARROW_ASSIGN_OR_RAISE(auto b1_int, GetArrayDataSample<arrow::Int32Type>({0, 4}));
  ARROW_ASSIGN_OR_RAISE(auto b2_int, GetArrayDataSample<arrow::Int32Type>({5, 6, 7}));
  ARROW_ASSIGN_OR_RAISE(auto b3_int, GetArrayDataSample<arrow::Int32Type>({8, 9, 10}));

  ARROW_ASSIGN_OR_RAISE(auto b1_bool,
                        GetArrayDataSample<arrow::BooleanType>({false, true}));
  ARROW_ASSIGN_OR_RAISE(auto b2_bool,
                        GetArrayDataSample<arrow::BooleanType>({true, false, true}));
  ARROW_ASSIGN_OR_RAISE(auto b3_bool,
                        GetArrayDataSample<arrow::BooleanType>({false, true, false}));

  ARROW_ASSIGN_OR_RAISE(auto b1,
                        GetExecBatchFromVectors(field_vector, {b1_int, b1_bool}));
  ARROW_ASSIGN_OR_RAISE(auto b2,
                        GetExecBatchFromVectors(field_vector, {b2_int, b2_bool}));
  ARROW_ASSIGN_OR_RAISE(auto b3,
                        GetExecBatchFromVectors(field_vector, {b3_int, b3_bool}));

  out.batches = {b1, b2, b3};
  out.schema = arrow::schema(field_vector);
  return out;
}

Example of using source (usage of the sink is explained in detail in the sink section):

/// \brief An example demonstrating a source and sink node
///
/// Source-Table Example
/// This example shows how a custom source can be used
/// in an execution plan. This includes source node using pregenerated
/// data and collecting it into a table.
///
/// This sort of custom source is often not needed.  In most cases you can
/// use a scan (for a dataset source) or a source like table_source, array_vector_source,
/// exec_batch_source, or record_batch_source (for in-memory data)
arrow::Status SourceSinkExample() {
  ARROW_ASSIGN_OR_RAISE(auto basic_data, MakeBasicBatches());

  auto source_node_options = ac::SourceNodeOptions{basic_data.schema, basic_data.gen()};

  ac::Declaration source{"source", std::move(source_node_options)};

  return ExecutePlanAndCollectAsTable(std::move(source));
}

table_source#

In the previous example, a source node was used to input the data. But when developing an application, if the data is already in memory as a table, it is much easier, and more performant, to use TableSourceNodeOptions. Here the input data can be passed as a std::shared_ptr<arrow::Table> along with a max_batch_size. The max_batch_size is used to break up large record batches so that they can be processed in parallel. Note that batches are never merged: if the source table's chunks are already smaller than max_batch_size, they are passed through as they are rather than combined into larger batches.
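The interaction between max_batch_size and the table's existing chunks can be shown with row counts alone. The sketch below assumes each chunk is sliced independently into pieces of at most max_batch_size rows and that small chunks are never combined; SliceChunkLengths is a hypothetical helper, not part of the TableSourceNodeOptions API.

```cpp
#include <algorithm>
#include <cassert>
#include <cstdint>
#include <vector>

// Toy model: returns the row count of each batch that would be emitted when
// a table with the given chunk lengths is split using max_batch_size.
std::vector<int64_t> SliceChunkLengths(const std::vector<int64_t>& chunk_lengths,
                                       int64_t max_batch_size) {
  assert(max_batch_size > 0);
  std::vector<int64_t> batches;
  for (int64_t remaining : chunk_lengths) {
    // Large chunks are sliced; a chunk never borrows rows from its neighbor.
    while (remaining > 0) {
      int64_t len = std::min(remaining, max_batch_size);
      batches.push_back(len);
      remaining -= len;
    }
  }
  return batches;
}
```

For example, chunks of 5 and 1 rows with max_batch_size = 2 yield batches of 2, 2, 1, and 1 rows; the two 1-row pieces are not merged.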

Example of using table_source:

/// \brief An example showing a table source node
///
/// TableSource-Table Example
/// This example shows how a table_source can be used
/// in an execution plan. This includes a table source node
/// receiving data from a table.  This plan simply collects the
/// data back into a table but nodes could be added that modify
/// or transform the data as well (as is shown in later examples)
arrow::Status TableSourceSinkExample() {
  ARROW_ASSIGN_OR_RAISE(auto table, GetTable());

  int max_batch_size = 2;
  auto table_source_options = ac::TableSourceNodeOptions{table, max_batch_size};

  ac::Declaration source{"table_source", std::move(table_source_options)};

  return ExecutePlanAndCollectAsTable(std::move(source));
}

filter#

The filter operation, as the name suggests, provides an option to define data filtering criteria. It selects rows where the given expression evaluates to true. Filters can be written using arrow::compute::Expression, and the expression should have a return type of boolean. For example, if we wish to keep rows where the value of column a is greater than 3, we can use the following expression.

Filter example:

/// \brief An example showing a filter node
///
/// Scan-Filter-Table
/// This example shows how a filter can be used in an execution plan,
/// to filter data from a source. The output from the execution plan
/// is collected into a table.
arrow::Status ScanFilterSinkExample() {
  ARROW_ASSIGN_OR_RAISE(std::shared_ptr<arrow::dataset::Dataset> dataset, GetDataset());

  auto options = std::make_shared<arrow::dataset::ScanOptions>();
  // specify the filter.  This filter keeps only the rows where the
  // value of the "a" column is greater than 3.
  cp::Expression filter_expr = cp::greater(cp::field_ref("a"), cp::literal(3));
  // set filter for scanner: on-disk / push-down filtering.
  // This step can be skipped if you are not reading from disk.
  options->filter = filter_expr;
  // empty projection
  options->projection = cp::project({}, {});

  // construct the scan node
  std::cout << "Initialized Scanning Options" << std::endl;

  auto scan_node_options = arrow::dataset::ScanNodeOptions{dataset, options};
  std::cout << "Scan node options created" << std::endl;

  ac::Declaration scan{"scan", std::move(scan_node_options)};

  // pipe the scan node into the filter node
  // Need to set the filter in scan node options and filter node options.
  // At scan node it is used for on-disk / push-down filtering.
  // At filter node it is used for in-memory filtering.
  ac::Declaration filter{
      "filter", {std::move(scan)}, ac::FilterNodeOptions(std::move(filter_expr))};

  return ExecutePlanAndCollectAsTable(std::move(filter));
}
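To make the filter semantics concrete, the sketch below applies the same predicate as the example (a > 3) row by row to plain vectors holding illustrative values. ToyColumns and FilterAGreaterThan are hypothetical; Arrow arrays, null handling, and push-down into the scan are not modeled.

```cpp
#include <cassert>
#include <cstddef>
#include <cstdint>
#include <vector>

// Toy columnar data: two columns of equal length, not Arrow arrays.
struct ToyColumns {
  std::vector<int32_t> a;
  std::vector<bool> b;
};

// Keeps exactly the rows where the predicate a > threshold evaluates to true;
// every column of a kept row is carried along.
ToyColumns FilterAGreaterThan(const ToyColumns& in, int32_t threshold) {
  assert(in.a.size() == in.b.size());
  ToyColumns out;
  for (std::size_t row = 0; row < in.a.size(); ++row) {
    if (in.a[row] > threshold) {  // predicate is true -> keep the row
      out.a.push_back(in.a[row]);
      out.b.push_back(in.b[row]);
    }
  }
  return out;
}
```

With a = 0, 4, 5, 6, 7, 8, 9, 10, seven of the eight rows survive; only the row with a = 0 is dropped.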

project#

The project operation rearranges, deletes, transforms, and creates columns. Each output column is computed by evaluating an expression against the source record batch. These must be scalar expressions (expressions consisting of scalar literals, field references and scalar functions, i.e. elementwise functions that return one value for each input row independent of the value of all other rows). This is exposed via ProjectNodeOptions, which requires an arrow::compute::Expression and a name for each of the output columns (if names are not provided, the string representations of the expressions will be used).
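The key property of a project node, that each output value depends only on the corresponding input row, can be illustrated with plain functions standing in for scalar expressions. ToyProjection and Project are hypothetical names; real projections are arrow::compute::Expression objects evaluated on whole batches.

```cpp
#include <cassert>
#include <cstdint>
#include <functional>
#include <string>
#include <utility>
#include <vector>

// One output column: a name plus a scalar (elementwise) expression over "a".
struct ToyProjection {
  std::string name;
  std::function<int64_t(int64_t)> expr;
};

// Evaluates every expression against every row of column "a"; each output
// column has exactly one value per input row.
std::vector<std::vector<int64_t>> Project(const std::vector<int64_t>& a,
                                          const std::vector<ToyProjection>& exprs) {
  std::vector<std::vector<int64_t>> outputs;
  for (const ToyProjection& p : exprs) {
    assert(p.expr);
    std::vector<int64_t> column;
    column.reserve(a.size());
    for (int64_t value : a) column.push_back(p.expr(value));  // row-independent
    outputs.push_back(std::move(column));
  }
  return outputs;
}
```

Passing a column through unchanged, as the second expression in the check below does, is how a projection keeps or reorders existing columns alongside computed ones.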

Project example:

/// \brief An example showing a project node
///
/// Scan-Project-Table
/// This example shows how a Scan operation can be used to load the data
/// into the execution plan, how a project operation can be applied on the
/// data stream and how the output is collected into a table
arrow::Status ScanProjectSinkExample() {
  ARROW_ASSIGN_OR_RAISE(std::shared_ptr<arrow::dataset::Dataset> dataset, GetDataset());

  auto options = std::make_shared<arrow::dataset::ScanOptions>();
  // projection
  cp::Expression a_times_2 = cp::call("multiply", {cp::field_ref("a"), cp::literal(2)});
  options->projection = cp::project({}, {});

  auto scan_node_options = arrow::dataset::ScanNodeOptions{dataset, options};

  ac::Declaration scan{"scan", std::move(scan_node_options)};
  ac::Declaration project{
      "project", {std::move(scan)}, ac::ProjectNodeOptions({a_times_2})};

  return ExecutePlanAndCollectAsTable(std::move(project));
}

aggregate#

The aggregate node computes various types of aggregates over data.

Arrow supports two types of aggregates: “scalar” aggregates, and “hash” aggregates. Scalar aggregates reduce an array or scalar input to a single scalar output (e.g. computing the mean of a column). Hash aggregates act like GROUP BY in SQL and first partition data based on one or more key columns, then reduce the data in each partition. The aggregate node supports both types of computation, and can compute any number of aggregations at once.

AggregateNodeOptions is used to define the aggregation criteria. It takes a list of aggregates, each of which names an aggregation function, its options (nullptr for the defaults), the target field to aggregate, and the name of the output field. Optionally, it takes a list of key columns that are used to partition the data, in the case of a hash aggregation. The available aggregation functions are listed in the Arrow compute function documentation.

Note

This node is a “pipeline breaker” and will fully materialize the dataset in memory. In the future, spillover mechanisms will be added which should alleviate this constraint.

The aggregation can provide results as a group or scalar. For instance, an operation like hash_count provides the count for each unique group as a grouped result, while an operation like sum provides a single record.
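The difference between the two aggregate kinds can be checked by hand on the sample data produced by MakeBasicBatches() in the source example (a = 0, 4, 5, 6, 7, 8, 9, 10 paired with b = false, true, true, false, true, false, true, false). This sketch assumes sum skips nulls and that hash_count with CountOptions::ONLY_VALID counts non-null values per key; ScalarSum and HashCountValid are hypothetical helpers, not Acero functions.

```cpp
#include <cassert>
#include <cstddef>
#include <cstdint>
#include <map>
#include <optional>
#include <vector>

// Scalar aggregate: the whole input reduces to a single value (like "sum").
int64_t ScalarSum(const std::vector<std::optional<int32_t>>& a) {
  int64_t total = 0;
  for (const auto& v : a) {
    if (v) total += *v;  // nulls are skipped
  }
  return total;
}

// Hash aggregate: rows are first partitioned by key, then each partition is
// reduced separately (like "hash_count" counting only valid values of "a").
std::map<bool, int64_t> HashCountValid(const std::vector<std::optional<int32_t>>& a,
                                       const std::vector<bool>& keys) {
  assert(a.size() == keys.size());
  std::map<bool, int64_t> counts;
  for (std::size_t row = 0; row < a.size(); ++row) {
    counts[keys[row]] += a[row].has_value() ? 1 : 0;  // group exists even if all null
  }
  return counts;
}
```

On the sample data the scalar sum is 49 (one output row), while the grouped count yields one row per value of b, each with four valid values of a.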

Scalar Aggregation example:

/// \brief An example showing an aggregation node to aggregate an entire table
///
/// Source-Aggregation-Table
/// This example shows how an aggregation operation can be applied on an
/// execution plan resulting in a scalar output. The source node loads the
/// data and the aggregation (summing the values in column 'a')
/// is applied on this data. The output is collected into a table (that will
/// have exactly one row)
arrow::Status SourceScalarAggregateSinkExample() {
  ARROW_ASSIGN_OR_RAISE(auto basic_data, MakeBasicBatches());

  auto source_node_options = ac::SourceNodeOptions{basic_data.schema, basic_data.gen()};

  ac::Declaration source{"source", std::move(source_node_options)};
  auto aggregate_options =
      ac::AggregateNodeOptions{/*aggregates=*/{{"sum", nullptr, "a", "sum(a)"}}};
  ac::Declaration aggregate{
      "aggregate", {std::move(source)}, std::move(aggregate_options)};

  return ExecutePlanAndCollectAsTable(std::move(aggregate));
}

Group Aggregation example:

/// \brief An example showing an aggregation node to perform a group-by operation
///
/// Source-Aggregation-Table
/// This example shows how an aggregation operation can be applied on an
/// execution plan resulting in grouped output. The source node loads the
/// data and the aggregation (counting the valid values of column 'a' for each
/// value of column 'b') is applied on this data. The output is collected into a
/// table that will contain one row for each unique combination of group keys.
arrow::Status SourceGroupAggregateSinkExample() {
  ARROW_ASSIGN_OR_RAISE(auto basic_data, MakeBasicBatches());

  auto source_node_options = ac::SourceNodeOptions{basic_data.schema, basic_data.gen()};

  ac::Declaration source{"source", std::move(source_node_options)};
  auto options = std::make_shared<cp::CountOptions>(cp::CountOptions::ONLY_VALID);
  auto aggregate_options =
      ac::AggregateNodeOptions{/*aggregates=*/{{"hash_count", options, "a", "count(a)"}},
                               /*keys=*/{"b"}};
  ac::Declaration aggregate{
      "aggregate", {std::move(source)}, std::move(aggregate_options)};

  return ExecutePlanAndCollectAsTable(std::move(aggregate));
}

sink#

The sink operation provides output and is the final node of a streaming execution definition. The SinkNodeOptions interface is used to pass the required options. Similar to the source operator, the sink operator exposes the output with a function that returns a record batch future each time it is called. The caller is expected to call this function repeatedly until the generator is exhausted (returns std::nullopt). If this function is not called often enough, record batches will accumulate in memory. An execution plan should typically have only one “terminal” node (one sink node). An ExecPlan can terminate early due to cancellation or an error, before the output is fully consumed. However, the plan can safely be destroyed independently of the sink: once exec_plan->finished() has completed, the sink holds any batches that have not yet been consumed.
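The pull-based contract of the sink, and why unread batches pile up, can be shown with a single-threaded queue. This is a toy model that assumes the caller only pulls after the producer has finished (a real caller would instead wait on a future); ToySinkQueue is a hypothetical name and does not reflect how the sink node is implemented.

```cpp
#include <cassert>
#include <cstddef>
#include <deque>
#include <optional>
#include <utility>
#include <vector>

// Toy sink: the plan pushes batches, the caller pulls them one at a time, and
// std::nullopt marks the end of the stream. Unpulled batches stay buffered.
class ToySinkQueue {
 public:
  void Push(std::vector<int> batch) { pending_.push_back(std::move(batch)); }
  void Finish() { finished_ = true; }

  // Returns the next batch, or std::nullopt once finished and fully drained.
  std::optional<std::vector<int>> Next() {
    if (pending_.empty()) {
      assert(finished_ && "toy model: a real caller would wait here instead");
      return std::nullopt;
    }
    std::vector<int> batch = std::move(pending_.front());
    pending_.pop_front();
    return batch;
  }

  // Number of batches held in memory because nobody has pulled them yet.
  std::size_t pending() const { return pending_.size(); }

 private:
  std::deque<std::vector<int>> pending_;
  bool finished_ = false;
};
```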

The sink operation is also included as part of the source example:

/// \brief An example demonstrating a source and sink node
///
/// Source-Table Example
/// This example shows how a custom source can be used
/// in an execution plan. This includes source node using pregenerated
/// data and collecting it into a table.
///
/// This sort of custom source is often not needed.  In most cases you can
/// use a scan (for a dataset source) or a source like table_source, array_vector_source,
/// exec_batch_source, or record_batch_source (for in-memory data)
arrow::Status SourceSinkExample() {
  ARROW_ASSIGN_OR_RAISE(auto basic_data, MakeBasicBatches());

  auto source_node_options = ac::SourceNodeOptions{basic_data.schema, basic_data.gen()};

  ac::Declaration source{"source", std::move(source_node_options)};

  return ExecutePlanAndCollectAsTable(std::move(source));
}

consuming_sink#

The consuming_sink operator is a sink that performs the consumption within the execution plan (i.e. the exec plan should not complete until the consumption has completed). Unlike the sink node, this node takes a callback function that is expected to consume each batch. Once this callback has finished, the execution plan will no longer hold any reference to the batch. The consuming function may be called before a previous invocation has completed. If the consuming function does not run quickly enough, many concurrent executions could pile up, blocking the CPU thread pool. The execution plan will not be marked finished until all consuming function callbacks have completed. Once all batches have been delivered, the execution plan will wait for the finish future to complete before marking the execution plan finished. This allows for workflows where the consumption function converts batches into async tasks (this is currently done internally for the dataset write node).

Example:

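// define a custom SinkNodeConsumer
// (assumes namespace ac = arrow::acero and namespace cp = arrow::compute)
std::atomic<uint32_t> batches_seen{0};
arrow::Future<> finish = arrow::Future<>::Make();
struct CustomSinkNodeConsumer : public ac::SinkNodeConsumer {
  CustomSinkNodeConsumer(std::atomic<uint32_t>* batches_seen, arrow::Future<> finish)
      : batches_seen(batches_seen), finish(std::move(finish)) {}

  // Called once as the plan starts, before the first call to Consume
  arrow::Status Init(const std::shared_ptr<arrow::Schema>& schema,
                     ac::BackpressureControl* backpressure_control,
                     ac::ExecPlan* plan) override {
    return arrow::Status::OK();
  }

  // Consumption logic goes here: the data can be consumed in the expected
  // way, transferred to another system, or processed and written to disk
  arrow::Status Consume(cp::ExecBatch batch) override {
    (*batches_seen)++;
    return arrow::Status::OK();
  }

  // The plan is not marked finished until this future completes, so some
  // other code must eventually call finish.MarkFinished()
  arrow::Future<> Finish() override { return finish; }

  std::atomic<uint32_t>* batches_seen;
  arrow::Future<> finish;
};

std::shared_ptr<CustomSinkNodeConsumer> consumer =
    std::make_shared<CustomSinkNodeConsumer>(&batches_seen, finish);

// `plan` (a std::shared_ptr<ac::ExecPlan>) and `source` (an ac::ExecNode*)
// are assumed to have been created earlier
ac::ExecNode* consuming_sink;

ARROW_ASSIGN_OR_RAISE(consuming_sink,
                      ac::MakeExecNode("consuming_sink", plan.get(), {source},
                                       ac::ConsumingSinkNodeOptions(consumer)));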

Consuming-Sink example:

/// \brief An example showing a consuming sink node
///
/// Source-Consuming-Sink
/// This example shows how the data can be consumed within the execution plan
/// by using a ConsumingSink node. There is no data output from this execution plan.
arrow::Status SourceConsumingSinkExample() {
  ARROW_ASSIGN_OR_RAISE(auto basic_data, MakeBasicBatches());

  auto source_node_options = ac::SourceNodeOptions{basic_data.schema, basic_data.gen()};

  ac::Declaration source{"source", std::move(source_node_options)};

  std::atomic<uint32_t> batches_seen{0};
  arrow::Future<> finish = arrow::Future<>::Make();
  struct CustomSinkNodeConsumer : public ac::SinkNodeConsumer {
    CustomSinkNodeConsumer(std::atomic<uint32_t>* batches_seen, arrow::Future<> finish)
        : batches_seen(batches_seen), finish(std::move(finish)) {}

    arrow::Status Init(const std::shared_ptr<arrow::Schema>& schema,
                       ac::BackpressureControl* backpressure_control,
                       ac::ExecPlan* plan) override {
      // This will be called as the plan is started (before the first call to Consume)
      // and provides the schema of the data coming into the node, controls for pausing /
      // resuming input, and a pointer to the plan itself which can be used to access
      // other utilities such as the thread indexer or async task scheduler.
      return arrow::Status::OK();
    }

    arrow::Status Consume(cp::ExecBatch batch) override {
      (*batches_seen)++;
      return arrow::Status::OK();
    }

    arrow::Future<> Finish() override {
      // Here you can perform whatever (possibly async) cleanup is needed, e.g. closing
      // output file handles and flushing remaining work
      return arrow::Future<>::MakeFinished();
    }

    std::atomic<uint32_t>* batches_seen;
    arrow::Future<> finish;
  };
  std::shared_ptr<CustomSinkNodeConsumer> consumer =
      std::make_shared<CustomSinkNodeConsumer>(&batches_seen, finish);

  ac::Declaration consuming_sink{"consuming_sink",
                                 {std::move(source)},
                                 ac::ConsumingSinkNodeOptions(std::move(consumer))};

  // Since we are consuming the data within the plan there is no output and we simply
  // run the plan to completion instead of collecting into a table.
  ARROW_RETURN_NOT_OK(ac::DeclarationToStatus(std::move(consuming_sink)));

  std::cout << "The consuming sink node saw " << batches_seen.load() << " batches"
            << std::endl;
  return arrow::Status::OK();
}
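The two guarantees described for consuming_sink (Consume() calls may overlap, and the run completes only after every call has returned and the Finish() future has completed) can be modeled with std::async. ToyCountingConsumer and RunConsumingSink are hypothetical; the sketch launches one task per batch, which is not how Acero schedules work.

```cpp
#include <atomic>
#include <cassert>
#include <cstdint>
#include <future>
#include <vector>

// Toy consumer: Consume() may run on several threads at once, so its shared
// state uses atomics rather than plain integers.
class ToyCountingConsumer {
 public:
  void Consume(const std::vector<int>& batch) {
    rows_seen_ += static_cast<int64_t>(batch.size());
    ++batches_seen_;
  }

  // Nothing to flush in this toy, so an already-completed future is returned.
  std::future<void> Finish() {
    std::promise<void> done;
    done.set_value();
    return done.get_future();
  }

  int64_t batches_seen() const { return batches_seen_.load(); }
  int64_t rows_seen() const { return rows_seen_.load(); }

 private:
  std::atomic<int64_t> batches_seen_{0};
  std::atomic<int64_t> rows_seen_{0};
};

// Delivers every batch on its own task (so calls can overlap), waits for all
// of them, and only then waits on Finish() before treating the run as done.
void RunConsumingSink(ToyCountingConsumer& consumer,
                      const std::vector<std::vector<int>>& batches) {
  std::vector<std::future<void>> in_flight;
  for (const std::vector<int>& batch : batches) {
    in_flight.push_back(std::async(std::launch::async,
                                   [&consumer, &batch] { consumer.Consume(batch); }));
  }
  for (std::future<void>& f : in_flight) f.get();  // every Consume call returned
  consumer.Finish().get();                          // then the finish future
  assert(consumer.batches_seen() >= static_cast<int64_t>(batches.size()));
}
```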

order_by_sink#

The order_by_sink operation is an extension of the sink operation. It guarantees the ordering of the stream according to the provided OrderBySinkNodeOptions. Here, arrow::compute::SortOptions are provided to define which columns are used for sorting and whether to sort by ascending or descending values.

Note

This node is a “pipeline breaker” and will fully materialize the dataset in memory. In the future, spillover mechanisms will be added which should alleviate this constraint.
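The reason order_by_sink must materialize its input can be seen in a small sketch: no row can be emitted until every batch has arrived, since the largest key might be in the last batch. CollectAndSortDescending is a hypothetical helper that sorts a single int32 key in descending order with std::stable_sort; Acero's actual sort is configured through arrow::compute::SortOptions.

```cpp
#include <algorithm>
#include <cassert>
#include <cstdint>
#include <vector>

// Toy pipeline breaker: gathers every batch first, then sorts descending.
std::vector<int32_t> CollectAndSortDescending(
    const std::vector<std::vector<int32_t>>& batches) {
  std::vector<int32_t> all_rows;  // the fully materialized input
  for (const auto& batch : batches) {
    all_rows.insert(all_rows.end(), batch.begin(), batch.end());
  }
  // stable_sort keeps rows with equal keys in arrival order
  std::stable_sort(all_rows.begin(), all_rows.end(),
                   [](int32_t lhs, int32_t rhs) { return lhs > rhs; });
  return all_rows;
}
```

Here the first output row (5) only becomes known once the third batch has been read, which is why the whole input has to be held in memory.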

Order-By-Sink example:

arrow::Status ExecutePlanAndCollectAsTableWithCustomSink(
    std::shared_ptr<ac::ExecPlan> plan, std::shared_ptr<arrow::Schema> schema,
    arrow::AsyncGenerator<std::optional<cp::ExecBatch>> sink_gen) {
  // translate sink_gen (async) to sink_reader (sync)
  std::shared_ptr<arrow::RecordBatchReader> sink_reader =
      ac::MakeGeneratorReader(schema, std::move(sink_gen), arrow::default_memory_pool());

  // validate the ExecPlan
  ARROW_RETURN_NOT_OK(plan->Validate());
  std::cout << "ExecPlan created : " << plan->ToString() << std::endl;
  // start the ExecPlan
  plan->StartProducing();

  // collect sink_reader into a Table
  std::shared_ptr<arrow::Table> response_table;

  ARROW_ASSIGN_OR_RAISE(response_table,
                        arrow::Table::FromRecordBatchReader(sink_reader.get()));

  std::cout << "Results : " << response_table->ToString() << std::endl;

  // stop producing
  plan->StopProducing();
  // plan mark finished
  auto future = plan->finished();
  return future.status();
}

/// \brief An example showing an order-by node
///
/// Source-OrderBy-Sink
/// In this example, the data enters through the source node
/// and the data is ordered in the sink node. The order can be
/// ASCENDING or DESCENDING and it is configurable. The output
/// is obtained as a table from the sink node.
arrow::Status SourceOrderBySinkExample() {
  ARROW_ASSIGN_OR_RAISE(std::shared_ptr<ac::ExecPlan> plan,
                        ac::ExecPlan::Make(*cp::threaded_exec_context()));

  ARROW_ASSIGN_OR_RAISE(auto basic_data, MakeSortTestBasicBatches());

  arrow::AsyncGenerator<std::optional<cp::ExecBatch>> sink_gen;

  auto source_node_options = ac::SourceNodeOptions{basic_data.schema, basic_data.gen()};
  ARROW_ASSIGN_OR_RAISE(ac::ExecNode * source,
                        ac::MakeExecNode("source", plan.get(), {}, source_node_options));

  ARROW_RETURN_NOT_OK(ac::MakeExecNode(
      "order_by_sink", plan.get(), {source},
      ac::OrderBySinkNodeOptions{
          cp::SortOptions{{cp::SortKey{"a", cp::SortOrder::Descending}}}, &sink_gen}));

  return ExecutePlanAndCollectAsTableWithCustomSink(plan, basic_data.schema, sink_gen);
}

select_k_sink

The select_k_sink node selects the top or bottom K elements, similar to a SQL ORDER BY ... LIMIT K clause. The selection is configured with an arrow::compute::SelectKOptions, which is passed to the node through SelectKSinkNodeOptions, following the same pattern as the OrderBySinkNode options. The node receives its input and then computes the top_k/bottom_k.

Note

This node is a “pipeline breaker” and will fully materialize the input in memory. In the future, spillover mechanisms will be added which should alleviate this constraint.
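The selection itself can be sketched with the standard library. This illustrates the semantics only and is not Acero's implementation; SelectK, TopK and BottomK are hypothetical helpers, and the sketch assumes the whole input has already been gathered, matching the note above. std::partial_sort_copy orders only the first k output positions, which is cheaper than a full sort when k is small.

```cpp
#include <algorithm>
#include <cassert>
#include <cstddef>
#include <cstdint>
#include <functional>
#include <vector>

// Return the k values that come first under `comp`, in that order.
// If k exceeds the input size, every value is returned (fully sorted).
template <typename Compare>
std::vector<int32_t> SelectK(const std::vector<int32_t>& values, std::size_t k,
                             Compare comp) {
  std::vector<int32_t> out(std::min(k, values.size()));
  // Only the k output slots are sorted; the input is left untouched.
  std::partial_sort_copy(values.begin(), values.end(), out.begin(), out.end(), comp);
  return out;
}

// Like ORDER BY x DESC LIMIT k.
std::vector<int32_t> TopK(const std::vector<int32_t>& values, std::size_t k) {
  return SelectK(values, k, std::greater<int32_t>());
}

// Like ORDER BY x ASC LIMIT k.
std::vector<int32_t> BottomK(const std::vector<int32_t>& values, std::size_t k) {
  return SelectK(values, k, std::less<int32_t>());
}
```

With the i32 column produced by MakeGroupableBatches in the complete example (12, 7, 3, -2, -1, 3, 5, 3, -8), TopK(values, 2) returns {12, 7} and BottomK(values, 2) returns {-8, -2}.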

SelectK example:

/// \brief An example showing a select-k node
///
/// Source-KSelect
/// This example shows how K number of elements can be selected
/// either from the top or bottom. The output node is a modified
/// sink node where output can be obtained as a table.
arrow::Status SourceKSelectExample() {
  ARROW_ASSIGN_OR_RAISE(auto input, MakeGroupableBatches());
  ARROW_ASSIGN_OR_RAISE(std::shared_ptr<ac::ExecPlan> plan,
                        ac::ExecPlan::Make(*cp::threaded_exec_context()));
  arrow::AsyncGenerator<std::optional<cp::ExecBatch>> sink_gen;

  ARROW_ASSIGN_OR_RAISE(
      ac::ExecNode * source,
      ac::MakeExecNode("source", plan.get(), {},
                       ac::SourceNodeOptions{input.schema, input.gen()}));

  cp::SelectKOptions options = cp::SelectKOptions::TopKDefault(/*k=*/2, {"i32"});

  ARROW_RETURN_NOT_OK(ac::MakeExecNode("select_k_sink", plan.get(), {source},
                                       ac::SelectKSinkNodeOptions{options, &sink_gen}));

  auto schema = arrow::schema(
      {arrow::field("i32", arrow::int32()), arrow::field("str", arrow::utf8())});

  return ExecutePlanAndCollectAsTableWithCustomSink(plan, schema, sink_gen);
}

table_sink

The table_sink node collects the output into an in-memory table. This is simpler to use than the other sink nodes provided by the streaming execution engine, but it only makes sense when the output fits comfortably in memory. The node is created using TableSinkNodeOptions.

Example of using table_sink

/// \brief An example showing a table sink node
///
/// TableSink Example
/// This example shows how a table_sink can be used
/// in an execution plan. This includes a source node
/// receiving data as batches and the table sink node
/// which emits the output as a table.
arrow::Status TableSinkExample() {
  ARROW_ASSIGN_OR_RAISE(std::shared_ptr<ac::ExecPlan> plan,
                        ac::ExecPlan::Make(*cp::threaded_exec_context()));

  ARROW_ASSIGN_OR_RAISE(auto basic_data, MakeBasicBatches());

  auto source_node_options = ac::SourceNodeOptions{basic_data.schema, basic_data.gen()};

  ARROW_ASSIGN_OR_RAISE(ac::ExecNode * source,
                        ac::MakeExecNode("source", plan.get(), {}, source_node_options));

  std::shared_ptr<arrow::Table> output_table;
  auto table_sink_options = ac::TableSinkNodeOptions{&output_table};

  ARROW_RETURN_NOT_OK(
      ac::MakeExecNode("table_sink", plan.get(), {source}, table_sink_options));
  // validate the ExecPlan
  ARROW_RETURN_NOT_OK(plan->Validate());
  std::cout << "ExecPlan created : " << plan->ToString() << std::endl;
  // start the ExecPlan
  plan->StartProducing();

  // Wait for the plan to finish
  auto finished = plan->finished();
  ARROW_RETURN_NOT_OK(finished.status());
  std::cout << "Results : " << output_table->ToString() << std::endl;
  return arrow::Status::OK();
}

scan

scan is an operation used to load and process datasets. It should be preferred over the more generic source node when your input is a dataset. The behavior is defined using arrow::dataset::ScanNodeOptions. More information on datasets and the various scan options can be found in Tabular Datasets.

This node can apply pushdown filters in the file readers, which reduces the amount of data that needs to be read (if the file format supports it). Pushdown does not guarantee that every non-matching row is removed, however, so it does not replace a FilterNode. Instead, you may supply the same filter expression to both the scan node and the FilterNode, because the filtering is done in two different places.
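The following sketch illustrates why filtering happens in two places. It assumes a hypothetical file split into row groups that carry min/max statistics, the kind of metadata a format such as Parquet can store; RowGroup, ScanWithPushdown and FilterRows are illustrative names, not Arrow APIs. For the predicate a > 3, pushdown can skip whole row groups whose maximum is at most 3, but a group that merely might match is read in full, so the filter still has to run on every row that was read.

```cpp
#include <algorithm>
#include <cassert>
#include <cstdint>
#include <iterator>
#include <vector>

// Hypothetical row group with min/max statistics. Not an Arrow or Parquet type.
struct RowGroup {
  std::vector<int64_t> values;
  int64_t min;
  int64_t max;
};

// Pushdown for "a > threshold": skip row groups whose statistics prove that no
// row can match. Groups that might match are returned whole, so rows that fail
// the predicate can still come through.
std::vector<int64_t> ScanWithPushdown(const std::vector<RowGroup>& groups,
                                      int64_t threshold) {
  std::vector<int64_t> out;
  for (const RowGroup& group : groups) {
    if (group.max <= threshold) continue;  // pruned without reading its rows
    out.insert(out.end(), group.values.begin(), group.values.end());
  }
  return out;
}

// The filter node's job: evaluate the predicate on every row that was read.
std::vector<int64_t> FilterRows(const std::vector<int64_t>& rows, int64_t threshold) {
  std::vector<int64_t> out;
  std::copy_if(rows.begin(), rows.end(), std::back_inserter(out),
               [threshold](int64_t v) { return v > threshold; });
  return out;
}
```

For row groups {1, 2, 3} (max 3) and {2, 5, 8} (max 8), the scan skips the first group and reads 2, 5, 8; only the filter reduces that to 5, 8.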

Scan example:

/// \brief An example demonstrating a scan and sink node
///
/// Scan-Table
/// This example shows how scan operation can be applied on a dataset.
/// There are operations that can be applied on the scan (project, filter)
/// and the input data can be processed. The output is obtained as a table
arrow::Status ScanSinkExample() {
  ARROW_ASSIGN_OR_RAISE(std::shared_ptr<arrow::dataset::Dataset> dataset, GetDataset());

  auto options = std::make_shared<arrow::dataset::ScanOptions>();
  options->projection = cp::project({}, {});  // create empty projection

  // construct the scan node
  auto scan_node_options = arrow::dataset::ScanNodeOptions{dataset, options};

  ac::Declaration scan{"scan", std::move(scan_node_options)};

  return ExecutePlanAndCollectAsTable(std::move(scan));
}

write

The write node saves query results as a dataset of files in a format like Parquet, Feather, CSV, etc. using the Tabular Datasets functionality in Arrow. The write options are provided via the arrow::dataset::WriteNodeOptions which in turn contains arrow::dataset::FileSystemDatasetWriteOptions. arrow::dataset::FileSystemDatasetWriteOptions provides control over the written dataset, including options like the output directory, file naming scheme, and so on.
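For a rough picture of the layout that Hive-style partitioning produces (directories named key=value, as the comments in the example below describe), this sketch computes file paths from a base directory, a partition column name, and a basename_template containing the {i} token. It is standard C++ with hypothetical helper names, not the dataset writer: numbering the first file 0, ignoring null partition values, and not escaping values are simplifying assumptions.

```cpp
#include <cassert>
#include <cstddef>
#include <set>
#include <string>
#include <vector>

// Replace the "{i}" token in a basename template (e.g. "part{i}.parquet")
// with a file counter.
std::string ExpandBasename(const std::string& basename_template, int i) {
  std::string out = basename_template;
  const std::string token = "{i}";
  const std::size_t pos = out.find(token);
  if (pos != std::string::npos) {
    out.replace(pos, token.size(), std::to_string(i));
  }
  return out;
}

// Hive-style partition directory: "<base_dir>/<key>=<value>".
std::string HivePartitionDir(const std::string& base_dir, const std::string& key,
                             int value) {
  return base_dir + "/" + key + "=" + std::to_string(value);
}

// Path of the first file written for each distinct partition value.
std::set<std::string> FirstFilePaths(const std::string& base_dir, const std::string& key,
                                     const std::vector<int>& partition_values,
                                     const std::string& basename_template) {
  std::set<std::string> paths;
  for (int value : partition_values) {
    paths.insert(HivePartitionDir(base_dir, key, value) + "/" +
                 ExpandBasename(basename_template, 0));
  }
  return paths;
}
```

For example, partition values {1, 2, 1} under a hypothetical /tmp/parquet_dataset with the template part{i}.parquet give /tmp/parquet_dataset/a=1/part0.parquet and /tmp/parquet_dataset/a=2/part0.parquet.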

Write example:

/// \brief An example showing a write node
/// \param file_path The destination to write to
///
/// Scan-Filter-Write
/// This example shows how scan node can be used to load the data
/// and after processing how it can be written to disk.
arrow::Status ScanFilterWriteExample(const std::string& file_path) {
  ARROW_ASSIGN_OR_RAISE(std::shared_ptr<arrow::dataset::Dataset> dataset, GetDataset());

  auto options = std::make_shared<arrow::dataset::ScanOptions>();
  // empty projection
  options->projection = cp::project({}, {});

  auto scan_node_options = arrow::dataset::ScanNodeOptions{dataset, options};

  ac::Declaration scan{"scan", std::move(scan_node_options)};

  std::string root_path = "";
  std::string uri = "file://" + file_path;
  ARROW_ASSIGN_OR_RAISE(std::shared_ptr<arrow::fs::FileSystem> filesystem,
                        arrow::fs::FileSystemFromUri(uri, &root_path));

  auto base_path = root_path + "/parquet_dataset";
  // Uncomment the following line, if run repeatedly
  // ARROW_RETURN_NOT_OK(filesystem->DeleteDirContents(base_path));
  ARROW_RETURN_NOT_OK(filesystem->CreateDir(base_path));

  // The partition schema determines which fields are part of the partitioning.
  auto partition_schema = arrow::schema({arrow::field("a", arrow::int32())});
  // We'll use Hive-style partitioning,
  // which creates directories with "key=value" pairs.

  auto partitioning =
      std::make_shared<arrow::dataset::HivePartitioning>(partition_schema);
  // We'll write Parquet files.
  auto format = std::make_shared<arrow::dataset::ParquetFileFormat>();

  arrow::dataset::FileSystemDatasetWriteOptions write_options;
  write_options.file_write_options = format->DefaultWriteOptions();
  write_options.filesystem = filesystem;
  write_options.base_dir = base_path;
  write_options.partitioning = partitioning;
  write_options.basename_template = "part{i}.parquet";

  arrow::dataset::WriteNodeOptions write_node_options{write_options};

  ac::Declaration write{"write", {std::move(scan)}, std::move(write_node_options)};

  // Since the write node has no output we simply run the plan to completion and the
  // data should be written
  ARROW_RETURN_NOT_OK(ac::DeclarationToStatus(std::move(write)));

  std::cout << "Dataset written to " << base_path << std::endl;
  return arrow::Status::OK();
}

union

union merges multiple data streams with the same schema into one, similar to a SQL UNION ALL clause.

The following example demonstrates how this can be achieved using two data sources.

Union example:

/// \brief An example showing a union node
///
/// Source-Union-Table
/// This example shows how a union operation can be applied on two
/// data sources. The output is collected into a table.
arrow::Status SourceUnionSinkExample() {
  ARROW_ASSIGN_OR_RAISE(auto basic_data, MakeBasicBatches());

  ac::Declaration lhs{"source",
                      ac::SourceNodeOptions{basic_data.schema, basic_data.gen()}};
  lhs.label = "lhs";
  ac::Declaration rhs{"source",
                      ac::SourceNodeOptions{basic_data.schema, basic_data.gen()}};
  rhs.label = "rhs";
  ac::Declaration union_plan{
      "union", {std::move(lhs), std::move(rhs)}, ac::ExecNodeOptions{}};

  return ExecutePlanAndCollectAsTable(std::move(union_plan));
}

hash_join

The hash_join operation implements the relational join operation using a hash-based algorithm. HashJoinNodeOptions contains the options required to define a join. hash_join supports inner, left/right semi, left/right anti, and left/right/full outer joins. The join keys (i.e. the column(s) to join on) and suffixes (i.e. a suffix such as “_x” that is appended to column names present in both the left and right relations) can also be set via the join options. Read more on hash-joins.
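The classic build/probe scheme behind a hash join can be sketched with the standard library. This is a simplified model, not Acero's implementation: Row and JoinedRow are hypothetical structs with the same i32/str columns as MakeGroupableBatches, only an inner join on str is shown, and building the hash table from the right input is a choice made for this sketch. Because both inputs have columns with the same names, each output column is tagged with its side, which is the role the left/right suffixes play in HashJoinNodeOptions.

```cpp
#include <cassert>
#include <cstdint>
#include <string>
#include <unordered_map>
#include <vector>

// Hypothetical row with the same columns as MakeGroupableBatches: (i32, str).
struct Row {
  int32_t i32;
  std::string str;
};

// Output row of the join. Both inputs have columns named i32 and str, so each
// output column is tagged with the side it came from.
struct JoinedRow {
  int32_t i32_l;
  std::string str_l;
  int32_t i32_r;
  std::string str_r;
};

// Inner equi-join on `str` using the build/probe scheme.
std::vector<JoinedRow> InnerHashJoin(const std::vector<Row>& left,
                                     const std::vector<Row>& right) {
  // Build phase: index every right-side row by its join key.
  std::unordered_multimap<std::string, const Row*> build_table;
  for (const Row& r : right) build_table.emplace(r.str, &r);

  // Probe phase: look up each left row and emit one output row per match.
  std::vector<JoinedRow> out;
  for (const Row& l : left) {
    auto matches = build_table.equal_range(l.str);
    for (auto it = matches.first; it != matches.second; ++it) {
      const Row& r = *it->second;
      out.push_back(JoinedRow{l.i32, l.str, r.i32, r.str});
    }
  }
  return out;
}
```

A left row with no match is dropped, which is what makes this an inner join; an outer join would also emit it, with the right-side columns filled with nulls.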

Hash-Join example:

/// \brief An example showing a hash join node
///
/// Source-HashJoin-Table
/// This example shows how source node gets the data and how a self-join
/// is applied on the data. The join options are configurable. The output
/// is collected into a table.
arrow::Status SourceHashJoinSinkExample() {
  ARROW_ASSIGN_OR_RAISE(auto input, MakeGroupableBatches());

  ac::Declaration left{"source", ac::SourceNodeOptions{input.schema, input.gen()}};
  ac::Declaration right{"source", ac::SourceNodeOptions{input.schema, input.gen()}};

  ac::HashJoinNodeOptions join_opts{
      ac::JoinType::INNER,
      /*left_keys=*/{"str"},
      /*right_keys=*/{"str"}, cp::literal(true), "l_", "r_"};

  ac::Declaration hashjoin{
      "hashjoin", {std::move(left), std::move(right)}, std::move(join_opts)};

  return ExecutePlanAndCollectAsTable(std::move(hashjoin));
}

Summary

Examples of these nodes can be found in cpp/examples/arrow/execution_plan_documentation_examples.cc in the Arrow source.

Complete Example:

#include <arrow/array.h>
#include <arrow/builder.h>

#include <arrow/acero/exec_plan.h>
#include <arrow/acero/options.h>
#include <arrow/compute/api.h>
#include <arrow/compute/api_vector.h>
#include <arrow/compute/cast.h>

#include <arrow/csv/api.h>

#include <arrow/dataset/dataset.h>
#include <arrow/dataset/file_base.h>
#include <arrow/dataset/file_parquet.h>
#include <arrow/dataset/plan.h>
#include <arrow/dataset/scanner.h>

#include <arrow/io/interfaces.h>
#include <arrow/io/memory.h>

#include <arrow/result.h>
#include <arrow/status.h>
#include <arrow/table.h>

#include <arrow/ipc/api.h>

#include <arrow/util/future.h>
#include <arrow/util/range.h>
#include <arrow/util/thread_pool.h>
#include <arrow/util/vector.h>

#include <atomic>
#include <iostream>
#include <memory>
#include <optional>
#include <string>
#include <type_traits>
#include <utility>
#include <vector>

// Demonstrate various operators in Arrow Streaming Execution Engine

namespace cp = ::arrow::compute;
namespace ac = ::arrow::acero;

constexpr char kSep[] = "******";

void PrintBlock(const std::string& msg) {
  std::cout << "\n\t" << kSep << " " << msg << " " << kSep << "\n" << std::endl;
}

template <typename TYPE,
          typename = typename std::enable_if<arrow::is_number_type<TYPE>::value ||
                                             arrow::is_boolean_type<TYPE>::value ||
                                             arrow::is_temporal_type<TYPE>::value>::type>
arrow::Result<std::shared_ptr<arrow::Array>> GetArrayDataSample(
    const std::vector<typename TYPE::c_type>& values) {
  using ArrowBuilderType = typename arrow::TypeTraits<TYPE>::BuilderType;
  ArrowBuilderType builder;
  ARROW_RETURN_NOT_OK(builder.Reserve(values.size()));
  ARROW_RETURN_NOT_OK(builder.AppendValues(values));
  return builder.Finish();
}

template <class TYPE>
arrow::Result<std::shared_ptr<arrow::Array>> GetBinaryArrayDataSample(
    const std::vector<std::string>& values) {
  using ArrowBuilderType = typename arrow::TypeTraits<TYPE>::BuilderType;
  ArrowBuilderType builder;
  ARROW_RETURN_NOT_OK(builder.Reserve(values.size()));
  ARROW_RETURN_NOT_OK(builder.AppendValues(values));
  return builder.Finish();
}

arrow::Result<std::shared_ptr<arrow::RecordBatch>> GetSampleRecordBatch(
    const arrow::ArrayVector& array_vector, const arrow::FieldVector& field_vector) {
  ARROW_ASSIGN_OR_RAISE(auto struct_result,
                        arrow::StructArray::Make(array_vector, field_vector));
  // FromStructArray is a static factory, so call it on the class rather than
  // through a null RecordBatch pointer.
  return arrow::RecordBatch::FromStructArray(struct_result);
}

/// \brief Create a sample table
/// The table's contents will be:
/// a,b
/// 1,null
/// 2,true
/// null,true
/// 3,false
/// null,true
/// 4,false
/// 5,null
/// 6,false
/// 7,false
/// 8,true
/// \return The created table

arrow::Result<std::shared_ptr<arrow::Table>> GetTable() {
  // Column "a": the nulls are marked through a validity vector.
  // (std::numeric_limits<int64_t>::quiet_NaN() returns 0 for integer types,
  // so it cannot be used to produce nulls.)
  arrow::Int64Builder int64_builder;
  std::vector<int64_t> int64_values = {1, 2, 0, 3, 0, 4, 5, 6, 7, 8};
  std::vector<bool> int64_is_valid = {true, true, false, true, false,
                                      true, true, true,  true, true};
  ARROW_RETURN_NOT_OK(int64_builder.Reserve(10));
  ARROW_RETURN_NOT_OK(int64_builder.AppendValues(int64_values, int64_is_valid));
  ARROW_ASSIGN_OR_RAISE(auto int64_array, int64_builder.Finish());

  arrow::BooleanBuilder boolean_builder;
  std::shared_ptr<arrow::BooleanArray> bool_array;

  std::vector<uint8_t> bool_values = {false, true,  true,  false, true,
                                      false, false, false, false, true};
  std::vector<bool> is_valid = {false, true,  true, true, true,
                                true,  false, true, true, true};

  ARROW_RETURN_NOT_OK(boolean_builder.Reserve(10));

  ARROW_RETURN_NOT_OK(boolean_builder.AppendValues(bool_values, is_valid));

  ARROW_RETURN_NOT_OK(boolean_builder.Finish(&bool_array));

  auto record_batch =
      arrow::RecordBatch::Make(arrow::schema({arrow::field("a", arrow::int64()),
                                              arrow::field("b", arrow::boolean())}),
                               10, {int64_array, bool_array});
  ARROW_ASSIGN_OR_RAISE(auto table, arrow::Table::FromRecordBatches({record_batch}));
  return table;
}

/// \brief Create a sample dataset
/// \return An in-memory dataset based on GetTable()
arrow::Result<std::shared_ptr<arrow::dataset::Dataset>> GetDataset() {
  ARROW_ASSIGN_OR_RAISE(auto table, GetTable());
  auto ds = std::make_shared<arrow::dataset::InMemoryDataset>(table);
  return ds;
}

arrow::Result<cp::ExecBatch> GetExecBatchFromVectors(
    const arrow::FieldVector& field_vector, const arrow::ArrayVector& array_vector) {
  ARROW_ASSIGN_OR_RAISE(auto res_batch, GetSampleRecordBatch(array_vector, field_vector));
  cp::ExecBatch batch{*res_batch};
  return batch;
}

// (Doc section: BatchesWithSchema Definition)
struct BatchesWithSchema {
  std::vector<cp::ExecBatch> batches;
  std::shared_ptr<arrow::Schema> schema;
  // This method uses internal arrow utilities to
  // convert a vector of record batches to an AsyncGenerator of optional batches
  arrow::AsyncGenerator<std::optional<cp::ExecBatch>> gen() const {
    auto opt_batches = ::arrow::internal::MapVector(
        [](cp::ExecBatch batch) { return std::make_optional(std::move(batch)); },
        batches);
    arrow::AsyncGenerator<std::optional<cp::ExecBatch>> gen;
    gen = arrow::MakeVectorGenerator(std::move(opt_batches));
    return gen;
  }
};
// (Doc section: BatchesWithSchema Definition)

// (Doc section: MakeBasicBatches Definition)
arrow::Result<BatchesWithSchema> MakeBasicBatches() {
  BatchesWithSchema out;
  auto field_vector = {arrow::field("a", arrow::int32()),
                       arrow::field("b", arrow::boolean())};
  ARROW_ASSIGN_OR_RAISE(auto b1_int, GetArrayDataSample<arrow::Int32Type>({0, 4}));
  ARROW_ASSIGN_OR_RAISE(auto b2_int, GetArrayDataSample<arrow::Int32Type>({5, 6, 7}));
  ARROW_ASSIGN_OR_RAISE(auto b3_int, GetArrayDataSample<arrow::Int32Type>({8, 9, 10}));

  ARROW_ASSIGN_OR_RAISE(auto b1_bool,
                        GetArrayDataSample<arrow::BooleanType>({false, true}));
  ARROW_ASSIGN_OR_RAISE(auto b2_bool,
                        GetArrayDataSample<arrow::BooleanType>({true, false, true}));
  ARROW_ASSIGN_OR_RAISE(auto b3_bool,
                        GetArrayDataSample<arrow::BooleanType>({false, true, false}));

  ARROW_ASSIGN_OR_RAISE(auto b1,
                        GetExecBatchFromVectors(field_vector, {b1_int, b1_bool}));
  ARROW_ASSIGN_OR_RAISE(auto b2,
                        GetExecBatchFromVectors(field_vector, {b2_int, b2_bool}));
  ARROW_ASSIGN_OR_RAISE(auto b3,
                        GetExecBatchFromVectors(field_vector, {b3_int, b3_bool}));

  out.batches = {b1, b2, b3};
  out.schema = arrow::schema(field_vector);
  return out;
}
// (Doc section: MakeBasicBatches Definition)

arrow::Result<BatchesWithSchema> MakeSortTestBasicBatches() {
  BatchesWithSchema out;
  auto field = arrow::field("a", arrow::int32());
  ARROW_ASSIGN_OR_RAISE(auto b1_int, GetArrayDataSample<arrow::Int32Type>({1, 3, 0, 2}));
  ARROW_ASSIGN_OR_RAISE(auto b2_int,
                        GetArrayDataSample<arrow::Int32Type>({121, 101, 120, 12}));
  ARROW_ASSIGN_OR_RAISE(auto b3_int,
                        GetArrayDataSample<arrow::Int32Type>({10, 110, 210, 121}));
  ARROW_ASSIGN_OR_RAISE(auto b4_int,
                        GetArrayDataSample<arrow::Int32Type>({51, 101, 2, 34}));
  ARROW_ASSIGN_OR_RAISE(auto b5_int,
                        GetArrayDataSample<arrow::Int32Type>({11, 31, 1, 12}));
  ARROW_ASSIGN_OR_RAISE(auto b6_int,
                        GetArrayDataSample<arrow::Int32Type>({12, 101, 120, 12}));
  ARROW_ASSIGN_OR_RAISE(auto b7_int,
                        GetArrayDataSample<arrow::Int32Type>({0, 110, 210, 11}));
  ARROW_ASSIGN_OR_RAISE(auto b8_int,
                        GetArrayDataSample<arrow::Int32Type>({51, 10, 2, 3}));

  ARROW_ASSIGN_OR_RAISE(auto b1, GetExecBatchFromVectors({field}, {b1_int}));
  ARROW_ASSIGN_OR_RAISE(auto b2, GetExecBatchFromVectors({field}, {b2_int}));
  ARROW_ASSIGN_OR_RAISE(auto b3,
                        GetExecBatchFromVectors({field, field}, {b3_int, b8_int}));
  ARROW_ASSIGN_OR_RAISE(auto b4,
                        GetExecBatchFromVectors({field, field, field, field},
                                                {b4_int, b5_int, b6_int, b7_int}));
  out.batches = {b1, b2, b3, b4};
  out.schema = arrow::schema({field});
  return out;
}

arrow::Result<BatchesWithSchema> MakeGroupableBatches(int multiplicity = 1) {
  BatchesWithSchema out;
  auto fields = {arrow::field("i32", arrow::int32()), arrow::field("str", arrow::utf8())};
  ARROW_ASSIGN_OR_RAISE(auto b1_int, GetArrayDataSample<arrow::Int32Type>({12, 7, 3}));
  ARROW_ASSIGN_OR_RAISE(auto b2_int, GetArrayDataSample<arrow::Int32Type>({-2, -1, 3}));
  ARROW_ASSIGN_OR_RAISE(auto b3_int, GetArrayDataSample<arrow::Int32Type>({5, 3, -8}));
  ARROW_ASSIGN_OR_RAISE(auto b1_str, GetBinaryArrayDataSample<arrow::StringType>(
                                         {"alpha", "beta", "alpha"}));
  ARROW_ASSIGN_OR_RAISE(auto b2_str, GetBinaryArrayDataSample<arrow::StringType>(
                                         {"alpha", "gamma", "alpha"}));
  ARROW_ASSIGN_OR_RAISE(auto b3_str, GetBinaryArrayDataSample<arrow::StringType>(
                                         {"gamma", "beta", "alpha"}));
  ARROW_ASSIGN_OR_RAISE(auto b1, GetExecBatchFromVectors(fields, {b1_int, b1_str}));
  ARROW_ASSIGN_OR_RAISE(auto b2, GetExecBatchFromVectors(fields, {b2_int, b2_str}));
  ARROW_ASSIGN_OR_RAISE(auto b3, GetExecBatchFromVectors(fields, {b3_int, b3_str}));
  out.batches = {b1, b2, b3};

  size_t batch_count = out.batches.size();
  for (int repeat = 1; repeat < multiplicity; ++repeat) {
    for (size_t i = 0; i < batch_count; ++i) {
      out.batches.push_back(out.batches[i]);
    }
  }

  out.schema = arrow::schema(fields);
  return out;
}

arrow::Status ExecutePlanAndCollectAsTable(ac::Declaration plan) {
  // run the plan and collect its output into a Table
  std::shared_ptr<arrow::Table> response_table;
  ARROW_ASSIGN_OR_RAISE(response_table, ac::DeclarationToTable(std::move(plan)));

  std::cout << "Results : " << response_table->ToString() << std::endl;

  return arrow::Status::OK();
}

// (Doc section: Scan Example)

/// \brief An example demonstrating a scan and sink node
///
/// Scan-Table
/// This example shows how scan operation can be applied on a dataset.
/// There are operations that can be applied on the scan (project, filter)
/// and the input data can be processed. The output is obtained as a table
arrow::Status ScanSinkExample() {
  ARROW_ASSIGN_OR_RAISE(std::shared_ptr<arrow::dataset::Dataset> dataset, GetDataset());

  auto options = std::make_shared<arrow::dataset::ScanOptions>();
  options->projection = cp::project({}, {});  // create empty projection

  // construct the scan node
  auto scan_node_options = arrow::dataset::ScanNodeOptions{dataset, options};

  ac::Declaration scan{"scan", std::move(scan_node_options)};

  return ExecutePlanAndCollectAsTable(std::move(scan));
}
// (Doc section: Scan Example)

// (Doc section: Source Example)

/// \brief An example demonstrating a source and sink node
///
/// Source-Table Example
/// This example shows how a custom source can be used
/// in an execution plan. This includes source node using pregenerated
/// data and collecting it into a table.
///
/// This sort of custom source is often not needed.  In most cases you can
/// use a scan (for a dataset source) or a source like table_source, array_vector_source,
/// exec_batch_source, or record_batch_source (for in-memory data)
arrow::Status SourceSinkExample() {
  ARROW_ASSIGN_OR_RAISE(auto basic_data, MakeBasicBatches());

  auto source_node_options = ac::SourceNodeOptions{basic_data.schema, basic_data.gen()};

  ac::Declaration source{"source", std::move(source_node_options)};

  return ExecutePlanAndCollectAsTable(std::move(source));
}
// (Doc section: Source Example)

// (Doc section: Table Source Example)

/// \brief An example showing a table source node
///
/// TableSource-Table Example
/// This example shows how a table_source can be used
/// in an execution plan. This includes a table source node
/// receiving data from a table.  This plan simply collects the
/// data back into a table but nodes could be added that modify
/// or transform the data as well (as is shown in later examples)
arrow::Status TableSourceSinkExample() {
  ARROW_ASSIGN_OR_RAISE(auto table, GetTable());

  int max_batch_size = 2;
  auto table_source_options = ac::TableSourceNodeOptions{table, max_batch_size};

  ac::Declaration source{"table_source", std::move(table_source_options)};

  return ExecutePlanAndCollectAsTable(std::move(source));
}
// (Doc section: Table Source Example)

// (Doc section: Filter Example)

/// \brief An example showing a filter node
///
/// Source-Filter-Table
/// This example shows how a filter can be used in an execution plan,
/// to filter data from a source. The output from the execution plan
/// is collected into a table.
arrow::Status ScanFilterSinkExample() {
  ARROW_ASSIGN_OR_RAISE(std::shared_ptr<arrow::dataset::Dataset> dataset, GetDataset());

  auto options = std::make_shared<arrow::dataset::ScanOptions>();
  // specify the filter.  This filter keeps only the rows where the
  // value of the "a" column is greater than 3.
  cp::Expression filter_expr = cp::greater(cp::field_ref("a"), cp::literal(3));
  // set filter for scanner : on-disk / push-down filtering.
  // This step can be skipped if you are not reading from disk.
  options->filter = filter_expr;
  // empty projection
  options->projection = cp::project({}, {});

  // construct the scan node
  std::cout << "Initialized Scanning Options" << std::endl;

  auto scan_node_options = arrow::dataset::ScanNodeOptions{dataset, options};
  std::cout << "Scan node options created" << std::endl;

  ac::Declaration scan{"scan", std::move(scan_node_options)};

  // pipe the scan node into the filter node
  // Need to set the filter in scan node options and filter node options.
  // At scan node it is used for on-disk / push-down filtering.
  // At filter node it is used for in-memory filtering.
  ac::Declaration filter{
      "filter", {std::move(scan)}, ac::FilterNodeOptions(std::move(filter_expr))};

  return ExecutePlanAndCollectAsTable(std::move(filter));
}

// (Doc section: Filter Example)

// (Doc section: Project Example)

/// \brief An example showing a project node
///
/// Scan-Project-Table
/// This example shows how a Scan operation can be used to load the data
/// into the execution plan, how a project operation can be applied on the
/// data stream and how the output is collected into a table
arrow::Status ScanProjectSinkExample() {
  ARROW_ASSIGN_OR_RAISE(std::shared_ptr<arrow::dataset::Dataset> dataset, GetDataset());

  auto options = std::make_shared<arrow::dataset::ScanOptions>();
  // projection
  cp::Expression a_times_2 = cp::call("multiply", {cp::field_ref("a"), cp::literal(2)});
  options->projection = cp::project({}, {});

  auto scan_node_options = arrow::dataset::ScanNodeOptions{dataset, options};

  ac::Declaration scan{"scan", std::move(scan_node_options)};
  ac::Declaration project{
      "project", {std::move(scan)}, ac::ProjectNodeOptions({a_times_2})};

  return ExecutePlanAndCollectAsTable(std::move(project));
}

// (Doc section: Project Example)

// This is a variation of ScanProjectSinkExample introducing how to use the
// Declaration::Sequence function
arrow::Status ScanProjectSequenceSinkExample() {
  ARROW_ASSIGN_OR_RAISE(std::shared_ptr<arrow::dataset::Dataset> dataset, GetDataset());

  auto options = std::make_shared<arrow::dataset::ScanOptions>();
  // projection
  cp::Expression a_times_2 = cp::call("multiply", {cp::field_ref("a"), cp::literal(2)});
  options->projection = cp::project({}, {});

  auto scan_node_options = arrow::dataset::ScanNodeOptions{dataset, options};

  // (Doc section: Project Sequence Example)
  // Inputs do not have to be passed to the project node when using Sequence
  ac::Declaration plan =
      ac::Declaration::Sequence({{"scan", std::move(scan_node_options)},
                                 {"project", ac::ProjectNodeOptions({a_times_2})}});
  // (Doc section: Project Sequence Example)

  return ExecutePlanAndCollectAsTable(std::move(plan));
}

// (Doc section: Scalar Aggregate Example)

/// \brief An example showing an aggregation node to aggregate an entire table
///
/// Source-Aggregation-Table
/// This example shows how an aggregation operation can be applied on an
/// execution plan resulting in a scalar output. The source node loads the
/// data and the aggregation (summing the values in column 'a')
/// is applied on this data. The output is collected into a table (that will
/// have exactly one row)
arrow::Status SourceScalarAggregateSinkExample() {
  ARROW_ASSIGN_OR_RAISE(auto basic_data, MakeBasicBatches());

  auto source_node_options = ac::SourceNodeOptions{basic_data.schema, basic_data.gen()};

  ac::Declaration source{"source", std::move(source_node_options)};
  auto aggregate_options =
      ac::AggregateNodeOptions{/*aggregates=*/{{"sum", nullptr, "a", "sum(a)"}}};
  ac::Declaration aggregate{
      "aggregate", {std::move(source)}, std::move(aggregate_options)};

  return ExecutePlanAndCollectAsTable(std::move(aggregate));
}
// (Doc section: Scalar Aggregate Example)

// (Doc section: Group Aggregate Example)

/// \brief An example showing an aggregation node to perform a group-by operation
///
/// Source-Aggregation-Table
/// This example shows how an aggregation operation can be applied on an
/// execution plan resulting in grouped output. The source node loads the
/// data and the aggregation (counting the non-null values in column 'a' for
/// each value of the key column 'b') is applied on this data. The output is
/// collected into a table that will contain one row for each unique
/// combination of group keys.
arrow::Status SourceGroupAggregateSinkExample() {
  ARROW_ASSIGN_OR_RAISE(auto basic_data, MakeBasicBatches());

  auto source_node_options = ac::SourceNodeOptions{basic_data.schema, basic_data.gen()};

  ac::Declaration source{"source", std::move(source_node_options)};
  // ONLY_VALID: null values in column 'a' are not counted
  auto options = std::make_shared<cp::CountOptions>(cp::CountOptions::ONLY_VALID);
  auto aggregate_options =
      ac::AggregateNodeOptions{/*aggregates=*/{{"hash_count", options, "a", "count(a)"}},
                               /*keys=*/{"b"}};
  ac::Declaration aggregate{
      "aggregate", {std::move(source)}, std::move(aggregate_options)};

  return ExecutePlanAndCollectAsTable(std::move(aggregate));
}
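
// Sketch (not Acero's implementation) of the grouped "hash_count" above with
// CountOptions::ONLY_VALID: rows are bucketed by their key, every key seen gets
// a group (even one whose values are all null), and only non-null values are
// counted. The Row struct, its string key type, and the function name are
// hypothetical; the real example groups on column 'b'. Output order is not
// meaningful, so results are returned in an unordered map. Relies only on
// standard headers assumed to be included above.
namespace sketch_group_count {

struct Row {
  std::optional<int32_t> a;  // value column being counted
  std::string b;             // group key (a string here, for illustration only)
};

std::unordered_map<std::string, int64_t> CountValidByKey(const std::vector<Row>& rows) {
  std::unordered_map<std::string, int64_t> counts;
  for (const Row& row : rows) {
    int64_t& count = counts[row.b];  // creates the group with count 0 if it is new
    if (row.a.has_value()) ++count;  // ONLY_VALID: nulls are not counted
  }
  return counts;
}

}  // namespace sketch_group_count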
// (Doc section: Group Aggregate Example)

// (Doc section: ConsumingSink Example)

/// \brief An example showing a consuming sink node
///
/// Source-Consuming-Sink
/// This example shows how the data can be consumed within the execution plan
/// by using a consuming_sink node. There is no data output from this execution plan.
arrow::Status SourceConsumingSinkExample() {
  ARROW_ASSIGN_OR_RAISE(auto basic_data, MakeBasicBatches());

  auto source_node_options = ac::SourceNodeOptions{basic_data.schema, basic_data.gen()};

  ac::Declaration source{"source", std::move(source_node_options)};

  std::atomic<uint32_t> batches_seen{0};
  arrow::Future<> finish = arrow::Future<>::Make();
  struct CustomSinkNodeConsumer : public ac::SinkNodeConsumer {
    CustomSinkNodeConsumer(std::atomic<uint32_t>* batches_seen, arrow::Future<> finish)
        : batches_seen(batches_seen), finish(std::move(finish)) {}

    arrow::Status Init(const std::shared_ptr<arrow::Schema>& schema,
                       ac::BackpressureControl* backpressure_control,
                       ac::ExecPlan* plan) override {
      // This will be called as the plan is started (before the first call to Consume)
      // and provides the schema of the data coming into the node, controls for pausing /
      // resuming input, and a pointer to the plan itself which can be used to access
      // other utilities such as the thread indexer or async task scheduler.
      return arrow::Status::OK();
    }

    arrow::Status Consume(cp::ExecBatch batch) override {
      // Consume may be called concurrently from several threads, hence the atomic
      // counter
      (*batches_seen)++;
      return arrow::Status::OK();
    }

    arrow::Future<> Finish() override {
      // Here you can perform whatever (possibly async) cleanup is needed, e.g. closing
      // output file handles and flushing remaining work
      return arrow::Future<>::MakeFinished();
    }

    std::atomic<uint32_t>* batches_seen;
    arrow::Future<> finish;
  };
  std::shared_ptr<CustomSinkNodeConsumer> consumer =
      std::make_shared<CustomSinkNodeConsumer>(&batches_seen, finish);

  ac::Declaration consuming_sink{"consuming_sink",
                                 {std::move(source)},
                                 ac::ConsumingSinkNodeOptions(std::move(consumer))};

  // Since we are consuming the data within the plan there is no output and we simply
  // run the plan to completion instead of collecting into a table.
  ARROW_RETURN_NOT_OK(ac::DeclarationToStatus(std::move(consuming_sink)));

  std::cout << "The consuming sink node saw " << batches_seen.load() << " batches"
            << std::endl;
  return arrow::Status::OK();
}
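
// Sketch (not Acero's implementation) of the order in which a consuming sink
// drives its consumer: Init once before any data, Consume once per batch, then
// Finish once at the end of the stream. CountingConsumer and Drive are
// hypothetical, single-threaded stand-ins for ac::SinkNodeConsumer and the sink
// node; in Acero, Consume may run concurrently, which is why the example above
// uses an atomic counter. Relies only on standard headers assumed included above.
namespace sketch_consumer_protocol {

struct CountingConsumer {
  bool initialized = false;
  bool finished = false;
  uint32_t batches_seen = 0;

  void Init() { initialized = true; }
  void Consume(const std::vector<int32_t>& /*batch*/) { ++batches_seen; }
  void Finish() { finished = true; }
};

void Drive(CountingConsumer& consumer, const std::vector<std::vector<int32_t>>& batches) {
  consumer.Init();  // called once, before the first batch
  for (const auto& batch : batches) consumer.Consume(batch);
  consumer.Finish();  // called once, after the last batch
}

}  // namespace sketch_consumer_protocol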
// (Doc section: ConsumingSink Example)

// (Doc section: OrderBySink Example)

arrow::Status ExecutePlanAndCollectAsTableWithCustomSink(
    std::shared_ptr<ac::ExecPlan> plan, std::shared_ptr<arrow::Schema> schema,
    arrow::AsyncGenerator<std::optional<cp::ExecBatch>> sink_gen) {
  // translate sink_gen (async) to sink_reader (sync)
  std::shared_ptr<arrow::RecordBatchReader> sink_reader =
      ac::MakeGeneratorReader(schema, std::move(sink_gen), arrow::default_memory_pool());

  // validate the ExecPlan
  ARROW_RETURN_NOT_OK(plan->Validate());
  std::cout << "ExecPlan created : " << plan->ToString() << std::endl;
  // start the ExecPlan
  plan->StartProducing();

  // collect sink_reader into a Table
  std::shared_ptr<arrow::Table> response_table;

  ARROW_ASSIGN_OR_RAISE(response_table,
                        arrow::Table::FromRecordBatchReader(sink_reader.get()));

  std::cout << "Results : " << response_table->ToString() << std::endl;

  // request that the plan stop producing
  plan->StopProducing();
  // wait for the plan to finish and return its final status
  auto future = plan->finished();
  return future.status();
}
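
// Sketch of the end-of-stream convention the helper above relies on: sink_gen
// produces futures of std::optional<cp::ExecBatch>, and an empty optional
// signals that no more batches will arrive. This simplified, synchronous model
// drops the futures and keeps only that convention: it pulls items until the
// generator returns std::nullopt. DrainGenerator is a hypothetical name, not an
// Arrow API; it relies only on standard headers assumed included above.
namespace sketch_drain {

// Gen is any callable returning std::optional<T>.
template <typename T, typename Gen>
std::vector<T> DrainGenerator(Gen&& next) {
  std::vector<T> out;
  for (std::optional<T> item = next(); item.has_value(); item = next()) {
    out.push_back(std::move(*item));
  }
  return out;
}

}  // namespace sketch_drain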

/// \brief An example showing an order-by node
///
/// Source-OrderBy-Sink
/// In this example, the data enters through the source node
/// and the data is ordered in the sink node. The sort order
/// (ascending or descending) is configurable; here column 'a'
/// is sorted in descending order. The output is obtained as a
/// table from the sink node.
arrow::Status SourceOrderBySinkExample() {
  ARROW_ASSIGN_OR_RAISE(std::shared_ptr<ac::ExecPlan> plan,
                        ac::ExecPlan::Make(*cp::threaded_exec_context()));

  ARROW_ASSIGN_OR_RAISE(auto basic_data, MakeSortTestBasicBatches());

  arrow::AsyncGenerator<std::optional<cp::ExecBatch>> sink_gen;

  auto source_node_options = ac::SourceNodeOptions{basic_data.schema, basic_data.gen()};
  ARROW_ASSIGN_OR_RAISE(ac::ExecNode * source,
                        ac::MakeExecNode("source", plan.get(), {}, source_node_options));

  ARROW_RETURN_NOT_OK(ac::MakeExecNode(
      "order_by_sink", plan.get(), {source},
      ac::OrderBySinkNodeOptions{
          cp::SortOptions{{cp::SortKey{"a", cp::SortOrder::Descending}}}, &sink_gen}));

  return ExecutePlanAndCollectAsTableWithCustomSink(plan, basic_data.schema, sink_gen);
}
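
// Sketch (not Acero's implementation) of what order_by_sink does with the
// descending sort key on 'a': sorting is a pipeline breaker, so all incoming
// batches are gathered before any output is produced, and the rows are then
// sorted by the key. The Row struct and function name are hypothetical, and the
// stable sort is a choice made here, not a claim about Acero's tie-breaking.
// Relies only on standard headers assumed included above.
namespace sketch_order_by {

struct Row {
  int32_t a;
  std::string b;
};

std::vector<Row> OrderByDescending(const std::vector<std::vector<Row>>& batches) {
  std::vector<Row> rows;
  for (const auto& batch : batches) {
    rows.insert(rows.end(), batch.begin(), batch.end());  // accumulate every batch first
  }
  std::stable_sort(rows.begin(), rows.end(),
                   [](const Row& l, const Row& r) { return l.a > r.a; });
  return rows;
}

}  // namespace sketch_order_by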

// (Doc section: OrderBySink Example)

// (Doc section: HashJoin Example)

/// \brief An example showing a hash join node
///
/// Source-HashJoin-Table
/// This example shows how the source nodes get the data and how a self-join
/// is applied on the data. The join options are configurable. The output
/// is collected into a table.
arrow::Status SourceHashJoinSinkExample() {
  ARROW_ASSIGN_OR_RAISE(auto input, MakeGroupableBatches());

  ac::Declaration left{"source", ac::SourceNodeOptions{input.schema, input.gen()}};
  ac::Declaration right{"source", ac::SourceNodeOptions{input.schema, input.gen()}};

  ac::HashJoinNodeOptions join_opts{
      ac::JoinType::INNER,
      /*left_keys=*/{"str"},
      /*right_keys=*/{"str"},
      /*filter=*/cp::literal(true),
      /*output_suffix_for_left=*/"l_",
      /*output_suffix_for_right=*/"r_"};

  ac::Declaration hashjoin{
      "hashjoin", {std::move(left), std::move(right)}, std::move(join_opts)};

  return ExecutePlanAndCollectAsTable(std::move(hashjoin));
}
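
// Sketch (not Acero's implementation) of an inner hash equi-join on "str", as
// configured above: a hash table is built over one input (the right one here;
// treat the choice of build side as an assumption), then every row of the other
// input probes it, and each match emits one joined row. In a self-join, a key
// that appears n times therefore yields n * n output rows. Row, JoinedRow, and
// the function name are hypothetical; the _l / _r member names only loosely echo
// the "l_" / "r_" suffixes. Relies only on standard headers assumed included above.
namespace sketch_hash_join {

struct Row {
  int32_t i32;
  std::string str;
};

struct JoinedRow {
  int32_t i32_l;
  std::string str_l;
  int32_t i32_r;
  std::string str_r;
};

std::vector<JoinedRow> InnerJoinOnStr(const std::vector<Row>& left,
                                      const std::vector<Row>& right) {
  // Build phase: key -> all right rows with that key
  std::unordered_map<std::string, std::vector<const Row*>> build;
  for (const Row& r : right) build[r.str].push_back(&r);

  // Probe phase: unmatched left rows are dropped (inner join)
  std::vector<JoinedRow> out;
  for (const Row& l : left) {
    auto it = build.find(l.str);
    if (it == build.end()) continue;
    for (const Row* r : it->second) out.push_back({l.i32, l.str, r->i32, r->str});
  }
  return out;
}

}  // namespace sketch_hash_join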

// (Doc section: HashJoin Example)

// (Doc section: KSelect Example)

/// \brief An example showing a select-k node
///
/// Source-KSelect
/// This example shows how the top or bottom k elements (ordered by a sort
/// key) can be selected. The output node is a modified sink node from which
/// the output can be obtained as a table.
arrow::Status SourceKSelectExample() {
  ARROW_ASSIGN_OR_RAISE(auto input, MakeGroupableBatches());
  ARROW_ASSIGN_OR_RAISE(std::shared_ptr<ac::ExecPlan> plan,
                        ac::ExecPlan::Make(*cp::threaded_exec_context()));
  arrow::AsyncGenerator<std::optional<cp::ExecBatch>> sink_gen;

  ARROW_ASSIGN_OR_RAISE(
      ac::ExecNode * source,
      ac::MakeExecNode("source", plan.get(), {},
                       ac::SourceNodeOptions{input.schema, input.gen()}));

  // Select the 2 rows with the largest values in column "i32"
  cp::SelectKOptions options = cp::SelectKOptions::TopKDefault(/*k=*/2, {"i32"});

  ARROW_RETURN_NOT_OK(ac::MakeExecNode("select_k_sink", plan.get(), {source},
                                       ac::SelectKSinkNodeOptions{options, &sink_gen}));

  auto schema = arrow::schema(
      {arrow::field("i32", arrow::int32()), arrow::field("str", arrow::utf8())});

  return ExecutePlanAndCollectAsTableWithCustomSink(plan, schema, sink_gen);
}
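
// Sketch (not Acero's implementation) of top-k selection on "i32", like
// TopKDefault(/*k=*/2, {"i32"}) above: std::partial_sort orders only the first
// k positions, roughly O(n log k), which is cheaper than a full sort when k is
// small. k is clamped to the number of rows. Row and TopK are hypothetical
// names; relies only on standard headers assumed included above.
namespace sketch_select_k {

struct Row {
  int32_t i32;
  std::string str;
};

std::vector<Row> TopK(std::vector<Row> rows, size_t k) {
  k = std::min(k, rows.size());
  const auto middle = rows.begin() + static_cast<std::ptrdiff_t>(k);
  std::partial_sort(rows.begin(), middle, rows.end(),
                    [](const Row& l, const Row& r) { return l.i32 > r.i32; });
  rows.erase(middle, rows.end());  // keep only the k largest, in descending order
  return rows;
}

}  // namespace sketch_select_k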

// (Doc section: KSelect Example)

// (Doc section: Write Example)

/// \brief An example showing a write node
/// \param file_path The destination to write to
///
/// Scan-Filter-Write
/// This example shows how a scan node can be used to load the data
/// and how, after processing, the data can be written to disk.
arrow::Status ScanFilterWriteExample(const std::string& file_path) {
  ARROW_ASSIGN_OR_RAISE(std::shared_ptr<arrow::dataset::Dataset> dataset, GetDataset());

  auto options = std::make_shared<arrow::dataset::ScanOptions>();
  // empty projection
  options->projection = cp::project({}, {});

  auto scan_node_options = arrow::dataset::ScanNodeOptions{dataset, options};

  ac::Declaration scan{"scan", std::move(scan_node_options)};

  std::string root_path = "";
  std::string uri = "file://" + file_path;
  ARROW_ASSIGN_OR_RAISE(std::shared_ptr<arrow::fs::FileSystem> filesystem,
                        arrow::fs::FileSystemFromUri(uri, &root_path));

  auto base_path = root_path + "/parquet_dataset";
  // Uncomment the following line if running this example repeatedly
  // ARROW_RETURN_NOT_OK(filesystem->DeleteDirContents(base_path));
  ARROW_RETURN_NOT_OK(filesystem->CreateDir(base_path));

  // The partition schema determines which fields are part of the partitioning.
  auto partition_schema = arrow::schema({arrow::field("a", arrow::int32())});
  // We'll use Hive-style partitioning,
  // which creates directories with "key=value" pairs.

  auto partitioning =
      std::make_shared<arrow::dataset::HivePartitioning>(partition_schema);
  // We'll write Parquet files.
  auto format = std::make_shared<arrow::dataset::ParquetFileFormat>();

  arrow::dataset::FileSystemDatasetWriteOptions write_options;
  write_options.file_write_options = format->DefaultWriteOptions();
  write_options.filesystem = filesystem;
  write_options.base_dir = base_path;
  write_options.partitioning = partitioning;
  write_options.basename_template = "part{i}.parquet";

  arrow::dataset::WriteNodeOptions write_node_options{write_options};

  ac::Declaration write{"write", {std::move(scan)}, std::move(write_node_options)};

  // Since the write node has no output we simply run the plan to completion and the
  // data should be written
  ARROW_RETURN_NOT_OK(ac::DeclarationToStatus(std::move(write)));

  std::cout << "Dataset written to " << base_path << std::endl;
  return arrow::Status::OK();
}
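
// Sketch of the directory layout the Hive-style partitioning above aims for:
// a row whose partition field "a" equals 7 lands under "<base_dir>/a=7/", in a
// file named from basename_template with "{i}" replaced by a file counter. This
// is a simplified model, not the dataset writer's code; the exact counter values
// the writer uses are an implementation detail. HivePartitionPath is a
// hypothetical name; relies only on standard headers assumed included above.
namespace sketch_hive_path {

std::string HivePartitionPath(const std::string& base_dir, const std::string& field,
                              int32_t value, const std::string& basename_template,
                              int file_index) {
  std::string basename = basename_template;
  const std::string placeholder = "{i}";
  const size_t pos = basename.find(placeholder);
  if (pos != std::string::npos) {
    basename.replace(pos, placeholder.size(), std::to_string(file_index));
  }
  // Hive style: one "key=value" directory level per partition field
  return base_dir + "/" + field + "=" + std::to_string(value) + "/" + basename;
}

}  // namespace sketch_hive_path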

// (Doc section: Write Example)

// (Doc section: Union Example)

/// \brief An example showing a union node
///
/// Source-Union-Table
/// This example shows how a union operation can be applied on two
/// data sources. The output is collected into a table.
arrow::Status SourceUnionSinkExample() {
  ARROW_ASSIGN_OR_RAISE(auto basic_data, MakeBasicBatches());

  ac::Declaration lhs{"source",
                      ac::SourceNodeOptions{basic_data.schema, basic_data.gen()}};
  lhs.label = "lhs";
  ac::Declaration rhs{"source",
                      ac::SourceNodeOptions{basic_data.schema, basic_data.gen()}};
  rhs.label = "rhs";
  ac::Declaration union_plan{
      "union", {std::move(lhs), std::move(rhs)}, ac::ExecNodeOptions{}};

  return ExecutePlanAndCollectAsTable(std::move(union_plan));
}
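
// Sketch (not Acero's implementation) of the union node's semantics: every
// batch from every input is forwarded unchanged, and duplicates are kept (UNION
// ALL, not SQL's deduplicating UNION). Acero may interleave batches from the
// inputs in any order; this sketch simply appends lhs then rhs. Batch and
// UnionAll are hypothetical names; relies only on standard headers assumed
// included above.
namespace sketch_union {

using Batch = std::vector<int32_t>;

std::vector<Batch> UnionAll(const std::vector<Batch>& lhs, const std::vector<Batch>& rhs) {
  std::vector<Batch> out(lhs);
  out.insert(out.end(), rhs.begin(), rhs.end());  // no deduplication
  return out;
}

}  // namespace sketch_union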

// (Doc section: Union Example)

// (Doc section: Table Sink Example)

/// \brief An example showing a table sink node
///
/// TableSink Example
/// This example shows how a table_sink can be used
/// in an execution plan. This includes a source node
/// receiving data as batches and the table sink node,
/// which emits the output as a table.
arrow::Status TableSinkExample() {
  ARROW_ASSIGN_OR_RAISE(std::shared_ptr<ac::ExecPlan> plan,
                        ac::ExecPlan::Make(*cp::threaded_exec_context()));

  ARROW_ASSIGN_OR_RAISE(auto basic_data, MakeBasicBatches());

  auto source_node_options = ac::SourceNodeOptions{basic_data.schema, basic_data.gen()};

  ARROW_ASSIGN_OR_RAISE(ac::ExecNode * source,
                        ac::MakeExecNode("source", plan.get(), {}, source_node_options));

  std::shared_ptr<arrow::Table> output_table;
  auto table_sink_options = ac::TableSinkNodeOptions{&output_table};

  ARROW_RETURN_NOT_OK(
      ac::MakeExecNode("table_sink", plan.get(), {source}, table_sink_options));
  // validate the ExecPlan
  ARROW_RETURN_NOT_OK(plan->Validate());
  std::cout << "ExecPlan created : " << plan->ToString() << std::endl;
  // start the ExecPlan
  plan->StartProducing();

  // Wait for the plan to finish
  auto finished = plan->finished();
  ARROW_RETURN_NOT_OK(finished.status());
  std::cout << "Results : " << output_table->ToString() << std::endl;
  return arrow::Status::OK();
}

// (Doc section: Table Sink Example)

// (Doc section: RecordBatchReaderSource Example)

/// \brief An example showing the usage of a RecordBatchReader as the data source.
///
/// RecordBatchReaderSourceSink Example
/// This example shows how a record_batch_reader_source can be used
/// in an execution plan. This includes the source node
/// receiving data from an arrow::TableBatchReader.
arrow::Status RecordBatchReaderSourceSinkExample() {
  ARROW_ASSIGN_OR_RAISE(auto table, GetTable());
  std::shared_ptr<arrow::RecordBatchReader> reader =
      std::make_shared<arrow::TableBatchReader>(table);
  ac::Declaration reader_source{"record_batch_reader_source",
                                ac::RecordBatchReaderSourceNodeOptions{reader}};
  return ExecutePlanAndCollectAsTable(std::move(reader_source));
}
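
// Sketch of how a reader can hand a table to the source node as a sequence of
// batches: the rows are cut into consecutive [offset, offset + length) slices of
// at most max_chunk rows, similar in spirit to arrow::TableBatchReader with
// set_chunksize(). This simplified model ignores the table's existing chunk
// layout and is not Arrow's code. SliceIntoBatches is a hypothetical name;
// relies only on standard headers assumed included above.
namespace sketch_chunking {

std::vector<std::pair<int64_t, int64_t>> SliceIntoBatches(int64_t num_rows,
                                                          int64_t max_chunk) {
  std::vector<std::pair<int64_t, int64_t>> slices;  // (offset, length) pairs
  if (max_chunk <= 0) return slices;  // guard against an infinite loop
  for (int64_t offset = 0; offset < num_rows; offset += max_chunk) {
    slices.emplace_back(offset, std::min(max_chunk, num_rows - offset));
  }
  return slices;
}

}  // namespace sketch_chunking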

// (Doc section: RecordBatchReaderSource Example)

enum ExampleMode {
  SOURCE_SINK = 0,
  TABLE_SOURCE_SINK = 1,
  SCAN = 2,
  FILTER = 3,
  PROJECT = 4,
  SCALAR_AGGREGATION = 5,
  GROUP_AGGREGATION = 6,
  CONSUMING_SINK = 7,
  ORDER_BY_SINK = 8,
  HASHJOIN = 9,
  KSELECT = 10,
  WRITE = 11,
  UNION = 12,
  TABLE_SOURCE_TABLE_SINK = 13,
  RECORD_BATCH_READER_SOURCE = 14,
  PROJECT_SEQUENCE = 15
};

int main(int argc, char** argv) {
  if (argc < 3) {
    // Fake success for CI purposes.
    return EXIT_SUCCESS;
  }

  std::string base_save_path = argv[1];
  int mode = std::atoi(argv[2]);
  arrow::Status status;
  // ensure arrow::dataset node factories are in the registry
  arrow::dataset::internal::Initialize();
  switch (mode) {
    case SOURCE_SINK:
      PrintBlock("Source Sink Example");
      status = SourceSinkExample();
      break;
    case TABLE_SOURCE_SINK:
      PrintBlock("Table Source Sink Example");
      status = TableSourceSinkExample();
      break;
    case SCAN:
      PrintBlock("Scan Example");
      status = ScanSinkExample();
      break;
    case FILTER:
      PrintBlock("Filter Example");
      status = ScanFilterSinkExample();
      break;
    case PROJECT:
      PrintBlock("Project Example");
      status = ScanProjectSinkExample();
      break;
    case PROJECT_SEQUENCE:
      PrintBlock("Project Example (using Declaration::Sequence)");
      status = ScanProjectSequenceSinkExample();
      break;
    case GROUP_AGGREGATION:
      PrintBlock("Group Aggregate Example");
      status = SourceGroupAggregateSinkExample();
      break;
    case SCALAR_AGGREGATION:
      PrintBlock("Scalar Aggregate Example");
      status = SourceScalarAggregateSinkExample();
      break;
    case CONSUMING_SINK:
      PrintBlock("Consuming-Sink Example");
      status = SourceConsumingSinkExample();
      break;
    case ORDER_BY_SINK:
      PrintBlock("OrderBy Example");
      status = SourceOrderBySinkExample();
      break;
    case HASHJOIN:
      PrintBlock("HashJoin Example");
      status = SourceHashJoinSinkExample();
      break;
    case KSELECT:
      PrintBlock("KSelect Example");
      status = SourceKSelectExample();
      break;
    case WRITE:
      PrintBlock("Write Example");
      status = ScanFilterWriteExample(base_save_path);
      break;
    case UNION:
      PrintBlock("Union Example");
      status = SourceUnionSinkExample();
      break;
    case TABLE_SOURCE_TABLE_SINK:
      PrintBlock("TableSink Example");
      status = TableSinkExample();
      break;
    case RECORD_BATCH_READER_SOURCE:
      PrintBlock("RecordBatchReaderSource Example");
      status = RecordBatchReaderSourceSinkExample();
      break;
    default:
      break;
  }

  if (status.ok()) {
    return EXIT_SUCCESS;
  } else {
    std::cout << "Error occurred: " << status.message() << std::endl;
    return EXIT_FAILURE;
  }
}