Acero User’s Guide#
This page describes how to use Acero. It’s recommended that you read the overview first and familiarize yourself with the basic concepts.
Using Acero#
The basic workflow for Acero is this:
First, create a graph of
Declaration
objects describing the planCall one of the DeclarationToXyz methods to execute the Declaration.
A new ExecPlan is created from the graph of Declarations. Each Declaration will correspond to one ExecNode in the plan. In addition, a sink node will be added, depending on which DeclarationToXyz method was used.
The ExecPlan is executed. Typically this happens as part of the DeclarationToXyz call but in DeclarationToReader the reader is returned before the plan is finished executing.
Once the plan is finished it is destroyed
Creating a Plan#
Using Substrait#
Substrait is the preferred mechanism for creating a plan (graph of Declaration
). There are a few
reasons for this:
Substrait producers spend a lot of time and energy in creating user-friendly APIs for producing complex execution plans in a simple way. For example, the
pivot_wider
operation can be achieved using a complex series ofaggregate
nodes. Rather than create all of thoseaggregate
nodes by hand a producer will give you a much simpler API.If you are using Substrait then you can easily switch out to any other Substrait-consuming engine should you at some point find that it serves your needs better than Acero.
We hope that tools will eventually emerge for Substrait-based optimizers and planners. By using Substrait you will be making it much easier to use these tools in the future.
You could create the Substrait plan yourself but you’ll probably have a much easier time finding an existing Substrait producer. For example, you could use ibis-substrait to easily create Substrait plans from python expressions. There are a few different tools that are able to create Substrait plans from SQL. Eventually, we hope that C++ based Substrait producers will emerge. However, we are not aware of any at this time.
Detailed instructions on creating an execution plan from Substrait can be found in the Substrait page
Programmatic Plan Creation#
Creating an execution plan programmatically is simpler than creating a plan from Substrait, though loses some of the flexibility and future-proofing guarantees. The simplest way to create a Declaration is to simply instantiate one. You will need the name of the declaration, a vector of inputs, and an options object. For example:
381/// \brief An example showing a project node
382///
383/// Scan-Project-Table
384/// This example shows how a Scan operation can be used to load the data
385/// into the execution plan, how a project operation can be applied on the
386/// data stream and how the output is collected into a table
387arrow::Status ScanProjectSinkExample() {
388 ARROW_ASSIGN_OR_RAISE(std::shared_ptr<arrow::dataset::Dataset> dataset, GetDataset());
389
390 auto options = std::make_shared<arrow::dataset::ScanOptions>();
391 // projection
392 cp::Expression a_times_2 = cp::call("multiply", {cp::field_ref("a"), cp::literal(2)});
393 options->projection = cp::project({}, {});
394
395 auto scan_node_options = arrow::dataset::ScanNodeOptions{dataset, options};
396
397 ac::Declaration scan{"scan", std::move(scan_node_options)};
398 ac::Declaration project{
399 "project", {std::move(scan)}, ac::ProjectNodeOptions({a_times_2})};
400
401 return ExecutePlanAndCollectAsTable(std::move(project));
402}
The above code creates a scan declaration (which has no inputs) and a project declaration (using the scan as
input). This is simple enough but we can make it slightly easier. If you are creating a linear sequence of
declarations (like in the above example) then you can also use the Declaration::Sequence()
function.
420 // Inputs do not have to be passed to the project node when using Sequence
421 ac::Declaration plan =
422 ac::Declaration::Sequence({{"scan", std::move(scan_node_options)},
423 {"project", ac::ProjectNodeOptions({a_times_2})}});
There are many more examples of programmatic plan creation later in this document.
Executing a Plan#
There are a number of different methods that can be used to execute a declaration. Each one provides the
data in a slightly different form. Since all of these methods start with DeclarationTo...
this guide
will often refer to these methods as the DeclarationToXyz
methods.
DeclarationToTable#
The DeclarationToTable()
method will accumulate all of the results into a single arrow::Table
.
This is perhaps the simplest way to collect results from Acero. The main disadvantage to this approach is
that it requires accumulating all results into memory.
Note
Acero processes large datasets in small chunks. This is described in more detail in the developer’s guide. As a result, you may be surprised to find that a table collected with DeclarationToTable is chunked differently than your input. For example, your input might be a large table with a single chunk with 2 million rows. Your output table might then have 64 chunks with 32Ki rows each. There is a current request to specify the chunk size for the output in GH-15155.
DeclarationToReader#
The DeclarationToReader()
method allows you to iteratively consume the results. It will create an
arrow::RecordBatchReader
which you can read from at your leisure. If you do not read from the
reader quickly enough then backpressure will be applied and the execution plan will pause. Closing the
reader will cancel the running execution plan and the reader’s destructor will wait for the execution plan
to finish whatever it is doing and so it may block.
DeclarationToStatus#
The DeclarationToStatus()
method is useful if you want to run the plan but do not actually want to
consume the results. For example, this is useful when benchmarking or when the plan has side effects such
as a dataset write node. If the plan generates any results then they will be immediately discarded.
Running a Plan Directly#
If one of the DeclarationToXyz
methods is not sufficient for some reason then it is possible to run a plan
directly. This should only be needed if you are doing something unique. For example, if you have created a
custom sink node or if you need a plan that has multiple outputs.
Note
In academic literature and many existing systems there is a general assumption that an execution plan has at most one output. There are some things in Acero, such as the DeclarationToXyz methods, which will expect this. However, there is nothing in the design that strictly prevents having multiple sink nodes.
Detailed instructions on how to do this are out of scope for this guide but the rough steps are:
Create a new
ExecPlan
object.Add sink nodes to your graph of
Declaration
objects (this is the only type you will need to create declarations for sink nodes)Use
Declaration::AddToPlan()
to add your declaration to your plan (if you have more than one output then you will not be able to use this method and will need to add your nodes one at a time)Validate the plan with
ExecPlan::Validate()
Start the plan with
ExecPlan::StartProducing()
Wait for the future returned by
ExecPlan::finished()
to complete.
Providing Input#
Input data for an exec plan can come from a variety of sources. It is often read from files stored on some kind of filesystem. It is also common for input to come from in-memory data. In-memory data is typical, for example, in a pandas-like frontend. Input could also come from network streams like a Flight request. Acero can support all of these cases and can even support unique and custom situations not mentioned here.
There are pre-defined source nodes that cover the most common input scenarios. These are listed below. However,
if your source data is unique then you will need to use the generic source
node. This node expects you to
provide an asycnhronous stream of batches and is covered in more detail here.
Available ExecNode
Implementations#
The following tables quickly summarize the available operators.
Sources#
These nodes can be used as sources of data
Factory Name |
Options |
Brief Description |
---|---|---|
|
A generic source node that wraps an asynchronous stream of data (example) |
|
|
Generates data from an |
|
|
Generates data from an iterator of |
|
|
Generates data from an |
|
|
Generates data from an iterator of |
|
|
Generates data from an iterator of vectors of |
|
|
Generates data from an |
Compute Nodes#
These nodes perform computations on data and may transform or reshape the data
Factory Name |
Options |
Brief Description |
---|---|---|
|
Removes rows that do not match a given filter expression (example) |
|
|
Creates new columns by evaluating compute expressions. Can also drop and reorder columns (example) |
|
|
Calculates summary statistics across the entire input stream or on groups of data (example) |
|
|
Reshapes data by converting some columns into additional rows |
Arrangement Nodes#
These nodes reorder, combine, or slice streams of data
Factory Name |
Options |
Brief Description |
---|---|---|
|
Joins two inputs based on common columns (example) |
|
|
Joins multiple inputs to the first input based on a common ordered column (often time) |
|
|
N/A |
Merges two inputs with identical schemas (example) |
|
Reorders a stream |
|
|
Slices a range of rows from a stream |
Sink Nodes#
These nodes terminate a plan. Users do not typically create sink nodes as they are selected based on the DeclarationToXyz method used to consume the plan. However, this list may be useful for those developing new sink nodes or using Acero in advanced ways.
Factory Name |
Options |
Brief Description |
---|---|---|
|
Collects batches into a FIFO queue with optional backpressure |
|
|
Writes batches to a filesystem (example) |
|
|
Consumes batches using a user provided callback function |
|
|
Collects batches into an |
|
|
Deprecated |
|
|
Deprecated |
Examples#
The rest of this document contains example execution plans. Each example highlights the behavior of a specific execution node.
source
#
A source
operation can be considered as an entry point to create a streaming execution plan.
SourceNodeOptions
are used to create the source
operation. The
source
operation is the most generic and flexible type of source currently available but it can
be quite tricky to configure. First you should review the other source node types to ensure there
isn’t a simpler choice.
The source node requires some kind of function that can be called to poll for more data. This
function should take no arguments and should return an
arrow::Future<std::optional<arrow::ExecBatch>>
.
This function might be reading a file, iterating through an in memory structure, or receiving data
from a network connection. The arrow library refers to these functions as arrow::AsyncGenerator
and there are a number of utilities for working with these functions. For this example we use
a vector of record batches that we’ve already stored in memory.
In addition, the schema of the data must be known up front. Acero must know the schema of the data
at each stage of the execution graph before any processing has begun. This means we must supply the
schema for a source node separately from the data itself.
Here we define a struct to hold the data generator definition. This includes in-memory batches, schema and a function that serves as a data generator :
156struct BatchesWithSchema {
157 std::vector<cp::ExecBatch> batches;
158 std::shared_ptr<arrow::Schema> schema;
159 // This method uses internal arrow utilities to
160 // convert a vector of record batches to an AsyncGenerator of optional batches
161 arrow::AsyncGenerator<std::optional<cp::ExecBatch>> gen() const {
162 auto opt_batches = ::arrow::internal::MapVector(
163 [](cp::ExecBatch batch) { return std::make_optional(std::move(batch)); },
164 batches);
165 arrow::AsyncGenerator<std::optional<cp::ExecBatch>> gen;
166 gen = arrow::MakeVectorGenerator(std::move(opt_batches));
167 return gen;
168 }
169};
Generating sample batches for computation:
173arrow::Result<BatchesWithSchema> MakeBasicBatches() {
174 BatchesWithSchema out;
175 auto field_vector = {arrow::field("a", arrow::int32()),
176 arrow::field("b", arrow::boolean())};
177 ARROW_ASSIGN_OR_RAISE(auto b1_int, GetArrayDataSample<arrow::Int32Type>({0, 4}));
178 ARROW_ASSIGN_OR_RAISE(auto b2_int, GetArrayDataSample<arrow::Int32Type>({5, 6, 7}));
179 ARROW_ASSIGN_OR_RAISE(auto b3_int, GetArrayDataSample<arrow::Int32Type>({8, 9, 10}));
180
181 ARROW_ASSIGN_OR_RAISE(auto b1_bool,
182 GetArrayDataSample<arrow::BooleanType>({false, true}));
183 ARROW_ASSIGN_OR_RAISE(auto b2_bool,
184 GetArrayDataSample<arrow::BooleanType>({true, false, true}));
185 ARROW_ASSIGN_OR_RAISE(auto b3_bool,
186 GetArrayDataSample<arrow::BooleanType>({false, true, false}));
187
188 ARROW_ASSIGN_OR_RAISE(auto b1,
189 GetExecBatchFromVectors(field_vector, {b1_int, b1_bool}));
190 ARROW_ASSIGN_OR_RAISE(auto b2,
191 GetExecBatchFromVectors(field_vector, {b2_int, b2_bool}));
192 ARROW_ASSIGN_OR_RAISE(auto b3,
193 GetExecBatchFromVectors(field_vector, {b3_int, b3_bool}));
194
195 out.batches = {b1, b2, b3};
196 out.schema = arrow::schema(field_vector);
197 return out;
198}
Example of using source
(usage of sink is explained in detail in sink):
294/// \brief An example demonstrating a source and sink node
295///
296/// Source-Table Example
297/// This example shows how a custom source can be used
298/// in an execution plan. This includes source node using pregenerated
299/// data and collecting it into a table.
300///
301/// This sort of custom source is often not needed. In most cases you can
302/// use a scan (for a dataset source) or a source like table_source, array_vector_source,
303/// exec_batch_source, or record_batch_source (for in-memory data)
304arrow::Status SourceSinkExample() {
305 ARROW_ASSIGN_OR_RAISE(auto basic_data, MakeBasicBatches());
306
307 auto source_node_options = ac::SourceNodeOptions{basic_data.schema, basic_data.gen()};
308
309 ac::Declaration source{"source", std::move(source_node_options)};
310
311 return ExecutePlanAndCollectAsTable(std::move(source));
312}
table_source
#
In the previous example, source node, a source node
was used to input the data. But when developing an application, if the data is already in memory
as a table, it is much easier, and more performant to use TableSourceNodeOptions
.
Here the input data can be passed as a std::shared_ptr<arrow::Table>
along with a max_batch_size
.
The max_batch_size
is to break up large record batches so that they can be processed in parallel.
It is important to note that the table batches will not get merged to form larger batches when the source
table has a smaller batch size.
Example of using table_source
317/// \brief An example showing a table source node
318///
319/// TableSource-Table Example
320/// This example shows how a table_source can be used
321/// in an execution plan. This includes a table source node
322/// receiving data from a table. This plan simply collects the
323/// data back into a table but nodes could be added that modify
324/// or transform the data as well (as is shown in later examples)
325arrow::Status TableSourceSinkExample() {
326 ARROW_ASSIGN_OR_RAISE(auto table, GetTable());
327
328 arrow::AsyncGenerator<std::optional<cp::ExecBatch>> sink_gen;
329 int max_batch_size = 2;
330 auto table_source_options = ac::TableSourceNodeOptions{table, max_batch_size};
331
332 ac::Declaration source{"table_source", std::move(table_source_options)};
333
334 return ExecutePlanAndCollectAsTable(std::move(source));
335}
filter
#
filter
operation, as the name suggests, provides an option to define data filtering
criteria. It selects rows where the given expression evaluates to true. Filters can be written using
arrow::compute::Expression
, and the expression should have a return type of boolean.
For example, if we wish to keep rows where the value
of column b
is greater than 3, then we can use the following expression.
Filter example:
340/// \brief An example showing a filter node
341///
342/// Source-Filter-Table
343/// This example shows how a filter can be used in an execution plan,
344/// to filter data from a source. The output from the exeuction plan
345/// is collected into a table.
346arrow::Status ScanFilterSinkExample() {
347 ARROW_ASSIGN_OR_RAISE(std::shared_ptr<arrow::dataset::Dataset> dataset, GetDataset());
348
349 auto options = std::make_shared<arrow::dataset::ScanOptions>();
350 // specify the filter. This filter removes all rows where the
351 // value of the "a" column is greater than 3.
352 cp::Expression filter_expr = cp::greater(cp::field_ref("a"), cp::literal(3));
353 // set filter for scanner : on-disk / push-down filtering.
354 // This step can be skipped if you are not reading from disk.
355 options->filter = filter_expr;
356 // empty projection
357 options->projection = cp::project({}, {});
358
359 // construct the scan node
360 std::cout << "Initialized Scanning Options" << std::endl;
361
362 auto scan_node_options = arrow::dataset::ScanNodeOptions{dataset, options};
363 std::cout << "Scan node options created" << std::endl;
364
365 ac::Declaration scan{"scan", std::move(scan_node_options)};
366
367 // pipe the scan node into the filter node
368 // Need to set the filter in scan node options and filter node options.
369 // At scan node it is used for on-disk / push-down filtering.
370 // At filter node it is used for in-memory filtering.
371 ac::Declaration filter{
372 "filter", {std::move(scan)}, ac::FilterNodeOptions(std::move(filter_expr))};
373
374 return ExecutePlanAndCollectAsTable(std::move(filter));
375}
project
#
project
operation rearranges, deletes, transforms, and creates columns.
Each output column is computed by evaluating an expression
against the source record batch. These must be scalar expressions
(expressions consisting of scalar literals, field references and scalar
functions, i.e. elementwise functions that return one value for each input
row independent of the value of all other rows).
This is exposed via ProjectNodeOptions
which requires,
an arrow::compute::Expression
and name for each of the output columns (if names are not
provided, the string representations of exprs will be used).
Project example:
381/// \brief An example showing a project node
382///
383/// Scan-Project-Table
384/// This example shows how a Scan operation can be used to load the data
385/// into the execution plan, how a project operation can be applied on the
386/// data stream and how the output is collected into a table
387arrow::Status ScanProjectSinkExample() {
388 ARROW_ASSIGN_OR_RAISE(std::shared_ptr<arrow::dataset::Dataset> dataset, GetDataset());
389
390 auto options = std::make_shared<arrow::dataset::ScanOptions>();
391 // projection
392 cp::Expression a_times_2 = cp::call("multiply", {cp::field_ref("a"), cp::literal(2)});
393 options->projection = cp::project({}, {});
394
395 auto scan_node_options = arrow::dataset::ScanNodeOptions{dataset, options};
396
397 ac::Declaration scan{"scan", std::move(scan_node_options)};
398 ac::Declaration project{
399 "project", {std::move(scan)}, ac::ProjectNodeOptions({a_times_2})};
400
401 return ExecutePlanAndCollectAsTable(std::move(project));
402}
aggregate
#
The aggregate
node computes various types of aggregates over data.
Arrow supports two types of aggregates: “scalar” aggregates, and
“hash” aggregates. Scalar aggregates reduce an array or scalar input
to a single scalar output (e.g. computing the mean of a column). Hash
aggregates act like GROUP BY
in SQL and first partition data based
on one or more key columns, then reduce the data in each
partition. The aggregate
node supports both types of computation,
and can compute any number of aggregations at once.
AggregateNodeOptions
is used to define the
aggregation criteria. It takes a list of aggregation functions and
their options; a list of target fields to aggregate, one per function;
and a list of names for the output fields, one per function.
Optionally, it takes a list of columns that are used to partition the
data, in the case of a hash aggregation. The aggregation functions
can be selected from this list of aggregation functions.
Note
This node is a “pipeline breaker” and will fully materialize the dataset in memory. In the future, spillover mechanisms will be added which should alleviate this constraint.
The aggregation can provide results as a group or scalar. For instances, an operation like hash_count provides the counts per each unique record as a grouped result while an operation like sum provides a single record.
Scalar Aggregation example:
430/// \brief An example showing an aggregation node to aggregate an entire table
431///
432/// Source-Aggregation-Table
433/// This example shows how an aggregation operation can be applied on a
434/// execution plan resulting in a scalar output. The source node loads the
435/// data and the aggregation (counting unique types in column 'a')
436/// is applied on this data. The output is collected into a table (that will
437/// have exactly one row)
438arrow::Status SourceScalarAggregateSinkExample() {
439 ARROW_ASSIGN_OR_RAISE(auto basic_data, MakeBasicBatches());
440
441 auto source_node_options = ac::SourceNodeOptions{basic_data.schema, basic_data.gen()};
442
443 ac::Declaration source{"source", std::move(source_node_options)};
444 auto aggregate_options =
445 ac::AggregateNodeOptions{/*aggregates=*/{{"sum", nullptr, "a", "sum(a)"}}};
446 ac::Declaration aggregate{
447 "aggregate", {std::move(source)}, std::move(aggregate_options)};
448
449 return ExecutePlanAndCollectAsTable(std::move(aggregate));
450}
Group Aggregation example:
455/// \brief An example showing an aggregation node to perform a group-by operation
456///
457/// Source-Aggregation-Table
458/// This example shows how an aggregation operation can be applied on a
459/// execution plan resulting in grouped output. The source node loads the
460/// data and the aggregation (counting unique types in column 'a') is
461/// applied on this data. The output is collected into a table that will contain
462/// one row for each unique combination of group keys.
463arrow::Status SourceGroupAggregateSinkExample() {
464 ARROW_ASSIGN_OR_RAISE(auto basic_data, MakeBasicBatches());
465
466 arrow::AsyncGenerator<std::optional<cp::ExecBatch>> sink_gen;
467
468 auto source_node_options = ac::SourceNodeOptions{basic_data.schema, basic_data.gen()};
469
470 ac::Declaration source{"source", std::move(source_node_options)};
471 auto options = std::make_shared<cp::CountOptions>(cp::CountOptions::ONLY_VALID);
472 auto aggregate_options =
473 ac::AggregateNodeOptions{/*aggregates=*/{{"hash_count", options, "a", "count(a)"}},
474 /*keys=*/{"b"}};
475 ac::Declaration aggregate{
476 "aggregate", {std::move(source)}, std::move(aggregate_options)};
477
478 return ExecutePlanAndCollectAsTable(std::move(aggregate));
479}
sink
#
sink
operation provides output and is the final node of a streaming
execution definition. SinkNodeOptions
interface is used to pass
the required options. Similar to the source operator the sink operator exposes the output
with a function that returns a record batch future each time it is called. It is expected the
caller will repeatedly call this function until the generator function is exhausted (returns
std::optional::nullopt
). If this function is not called often enough then record batches
will accumulate in memory. An execution plan should only have one
“terminal” node (one sink node). An ExecPlan
can terminate early due to cancellation or
an error, before the output is fully consumed. However, the plan can be safely destroyed independently
of the sink, which will hold the unconsumed batches by exec_plan->finished().
As a part of the Source Example, the Sink operation is also included;
294/// \brief An example demonstrating a source and sink node
295///
296/// Source-Table Example
297/// This example shows how a custom source can be used
298/// in an execution plan. This includes source node using pregenerated
299/// data and collecting it into a table.
300///
301/// This sort of custom source is often not needed. In most cases you can
302/// use a scan (for a dataset source) or a source like table_source, array_vector_source,
303/// exec_batch_source, or record_batch_source (for in-memory data)
304arrow::Status SourceSinkExample() {
305 ARROW_ASSIGN_OR_RAISE(auto basic_data, MakeBasicBatches());
306
307 auto source_node_options = ac::SourceNodeOptions{basic_data.schema, basic_data.gen()};
308
309 ac::Declaration source{"source", std::move(source_node_options)};
310
311 return ExecutePlanAndCollectAsTable(std::move(source));
312}
consuming_sink
#
consuming_sink
operator is a sink operation containing consuming operation within the
execution plan (i.e. the exec plan should not complete until the consumption has completed).
Unlike the sink
node this node takes in a callback function that is expected to consume the
batch. Once this callback has finished the execution plan will no longer hold any reference to
the batch.
The consuming function may be called before a previous invocation has completed. If the consuming
function does not run quickly enough then many concurrent executions could pile up, blocking the
CPU thread pool. The execution plan will not be marked finished until all consuming function callbacks
have been completed.
Once all batches have been delivered the execution plan will wait for the finish future to complete
before marking the execution plan finished. This allows for workflows where the consumption function
converts batches into async tasks (this is currently done internally for the dataset write node).
Example:
// define a Custom SinkNodeConsumer
std::atomic<uint32_t> batches_seen{0};
arrow::Future<> finish = arrow::Future<>::Make();
struct CustomSinkNodeConsumer : public cp::SinkNodeConsumer {
CustomSinkNodeConsumer(std::atomic<uint32_t> *batches_seen, arrow::Future<>finish):
batches_seen(batches_seen), finish(std::move(finish)) {}
// Consumption logic can be written here
arrow::Status Consume(cp::ExecBatch batch) override {
// data can be consumed in the expected way
// transfer to another system or just do some work
// and write to disk
(*batches_seen)++;
return arrow::Status::OK();
}
arrow::Future<> Finish() override { return finish; }
std::atomic<uint32_t> *batches_seen;
arrow::Future<> finish;
};
std::shared_ptr<CustomSinkNodeConsumer> consumer =
std::make_shared<CustomSinkNodeConsumer>(&batches_seen, finish);
arrow::acero::ExecNode *consuming_sink;
ARROW_ASSIGN_OR_RAISE(consuming_sink, MakeExecNode("consuming_sink", plan.get(),
{source}, cp::ConsumingSinkNodeOptions(consumer)));
Consuming-Sink example:
484/// \brief An example showing a consuming sink node
485///
486/// Source-Consuming-Sink
487/// This example shows how the data can be consumed within the execution plan
488/// by using a ConsumingSink node. There is no data output from this execution plan.
489arrow::Status SourceConsumingSinkExample() {
490 ARROW_ASSIGN_OR_RAISE(auto basic_data, MakeBasicBatches());
491
492 auto source_node_options = ac::SourceNodeOptions{basic_data.schema, basic_data.gen()};
493
494 ac::Declaration source{"source", std::move(source_node_options)};
495
496 std::atomic<uint32_t> batches_seen{0};
497 arrow::Future<> finish = arrow::Future<>::Make();
498 struct CustomSinkNodeConsumer : public ac::SinkNodeConsumer {
499 CustomSinkNodeConsumer(std::atomic<uint32_t>* batches_seen, arrow::Future<> finish)
500 : batches_seen(batches_seen), finish(std::move(finish)) {}
501
502 arrow::Status Init(const std::shared_ptr<arrow::Schema>& schema,
503 ac::BackpressureControl* backpressure_control,
504 ac::ExecPlan* plan) override {
505 // This will be called as the plan is started (before the first call to Consume)
506 // and provides the schema of the data coming into the node, controls for pausing /
507 // resuming input, and a pointer to the plan itself which can be used to access
508 // other utilities such as the thread indexer or async task scheduler.
509 return arrow::Status::OK();
510 }
511
512 arrow::Status Consume(cp::ExecBatch batch) override {
513 (*batches_seen)++;
514 return arrow::Status::OK();
515 }
516
517 arrow::Future<> Finish() override {
518 // Here you can perform whatever (possibly async) cleanup is needed, e.g. closing
519 // output file handles and flushing remaining work
520 return arrow::Future<>::MakeFinished();
521 }
522
523 std::atomic<uint32_t>* batches_seen;
524 arrow::Future<> finish;
525 };
526 std::shared_ptr<CustomSinkNodeConsumer> consumer =
527 std::make_shared<CustomSinkNodeConsumer>(&batches_seen, finish);
528
529 ac::Declaration consuming_sink{"consuming_sink",
530 {std::move(source)},
531 ac::ConsumingSinkNodeOptions(std::move(consumer))};
532
533 // Since we are consuming the data within the plan there is no output and we simply
534 // run the plan to completion instead of collecting into a table.
535 ARROW_RETURN_NOT_OK(ac::DeclarationToStatus(std::move(consuming_sink)));
536
537 std::cout << "The consuming sink node saw " << batches_seen.load() << " batches"
538 << std::endl;
539 return arrow::Status::OK();
540}
order_by_sink
#
order_by_sink
operation is an extension to the sink
operation.
This operation provides the ability to guarantee the ordering of the
stream by providing the OrderBySinkNodeOptions
.
Here the arrow::compute::SortOptions
are provided to define which columns
are used for sorting and whether to sort by ascending or descending values.
Note
This node is a “pipeline breaker” and will fully materialize the dataset in memory. In the future, spillover mechanisms will be added which should alleviate this constraint.
Order-By-Sink example:
545arrow::Status ExecutePlanAndCollectAsTableWithCustomSink(
546 std::shared_ptr<ac::ExecPlan> plan, std::shared_ptr<arrow::Schema> schema,
547 arrow::AsyncGenerator<std::optional<cp::ExecBatch>> sink_gen) {
548 // translate sink_gen (async) to sink_reader (sync)
549 std::shared_ptr<arrow::RecordBatchReader> sink_reader =
550 ac::MakeGeneratorReader(schema, std::move(sink_gen), arrow::default_memory_pool());
551
552 // validate the ExecPlan
553 ARROW_RETURN_NOT_OK(plan->Validate());
554 std::cout << "ExecPlan created : " << plan->ToString() << std::endl;
555 // start the ExecPlan
556 plan->StartProducing();
557
558 // collect sink_reader into a Table
559 std::shared_ptr<arrow::Table> response_table;
560
561 ARROW_ASSIGN_OR_RAISE(response_table,
562 arrow::Table::FromRecordBatchReader(sink_reader.get()));
563
564 std::cout << "Results : " << response_table->ToString() << std::endl;
565
566 // stop producing
567 plan->StopProducing();
568 // plan mark finished
569 auto future = plan->finished();
570 return future.status();
571}
572
573/// \brief An example showing an order-by node
574///
575/// Source-OrderBy-Sink
576/// In this example, the data enters through the source node
577/// and the data is ordered in the sink node. The order can be
578/// ASCENDING or DESCENDING and it is configurable. The output
579/// is obtained as a table from the sink node.
580arrow::Status SourceOrderBySinkExample() {
581 ARROW_ASSIGN_OR_RAISE(std::shared_ptr<ac::ExecPlan> plan,
582 ac::ExecPlan::Make(*cp::threaded_exec_context()));
583
584 ARROW_ASSIGN_OR_RAISE(auto basic_data, MakeSortTestBasicBatches());
585
586 arrow::AsyncGenerator<std::optional<cp::ExecBatch>> sink_gen;
587
588 auto source_node_options = ac::SourceNodeOptions{basic_data.schema, basic_data.gen()};
589 ARROW_ASSIGN_OR_RAISE(ac::ExecNode * source,
590 ac::MakeExecNode("source", plan.get(), {}, source_node_options));
591
592 ARROW_RETURN_NOT_OK(ac::MakeExecNode(
593 "order_by_sink", plan.get(), {source},
594 ac::OrderBySinkNodeOptions{
595 cp::SortOptions{{cp::SortKey{"a", cp::SortOrder::Descending}}}, &sink_gen}));
596
597 return ExecutePlanAndCollectAsTableWithCustomSink(plan, basic_data.schema, sink_gen);
598}
select_k_sink
#
select_k_sink
option enables selecting the top/bottom K elements,
similar to a SQL ORDER BY ... LIMIT K
clause.
SelectKOptions
which is a defined by
using OrderBySinkNode
definition. This option returns a sink node that receives
inputs and then compute top_k/bottom_k.
Note
This node is a “pipeline breaker” and will fully materialize the input in memory. In the future, spillover mechanisms will be added which should alleviate this constraint.
SelectK example:
631/// \brief An example showing a select-k node
632///
633/// Source-KSelect
634/// This example shows how K number of elements can be selected
635/// either from the top or bottom. The output node is a modified
636/// sink node where output can be obtained as a table.
637arrow::Status SourceKSelectExample() {
638 ARROW_ASSIGN_OR_RAISE(auto input, MakeGroupableBatches());
639 ARROW_ASSIGN_OR_RAISE(std::shared_ptr<ac::ExecPlan> plan,
640 ac::ExecPlan::Make(*cp::threaded_exec_context()));
641 arrow::AsyncGenerator<std::optional<cp::ExecBatch>> sink_gen;
642
643 ARROW_ASSIGN_OR_RAISE(
644 ac::ExecNode * source,
645 ac::MakeExecNode("source", plan.get(), {},
646 ac::SourceNodeOptions{input.schema, input.gen()}));
647
648 cp::SelectKOptions options = cp::SelectKOptions::TopKDefault(/*k=*/2, {"i32"});
649
650 ARROW_RETURN_NOT_OK(ac::MakeExecNode("select_k_sink", plan.get(), {source},
651 ac::SelectKSinkNodeOptions{options, &sink_gen}));
652
653 auto schema = arrow::schema(
654 {arrow::field("i32", arrow::int32()), arrow::field("str", arrow::utf8())});
655
656 return ExecutePlanAndCollectAsTableWithCustomSink(plan, schema, sink_gen);
657}
table_sink
#
The table_sink
node provides the ability to receive the output as an in-memory table.
This is simpler to use than the other sink nodes provided by the streaming execution engine
but it only makes sense when the output fits comfortably in memory.
The node is created using TableSinkNodeOptions
.
Example of using table_sink
749/// \brief An example showing a table sink node
750///
751/// TableSink Example
752/// This example shows how a table_sink can be used
753/// in an execution plan. This includes a source node
754/// receiving data as batches and the table sink node
755/// which emits the output as a table.
756arrow::Status TableSinkExample() {
757 ARROW_ASSIGN_OR_RAISE(std::shared_ptr<ac::ExecPlan> plan,
758 ac::ExecPlan::Make(*cp::threaded_exec_context()));
759
760 ARROW_ASSIGN_OR_RAISE(auto basic_data, MakeBasicBatches());
761
762 auto source_node_options = ac::SourceNodeOptions{basic_data.schema, basic_data.gen()};
763
764 ARROW_ASSIGN_OR_RAISE(ac::ExecNode * source,
765 ac::MakeExecNode("source", plan.get(), {}, source_node_options));
766
767 std::shared_ptr<arrow::Table> output_table;
768 auto table_sink_options = ac::TableSinkNodeOptions{&output_table};
769
770 ARROW_RETURN_NOT_OK(
771 ac::MakeExecNode("table_sink", plan.get(), {source}, table_sink_options));
772 // validate the ExecPlan
773 ARROW_RETURN_NOT_OK(plan->Validate());
774 std::cout << "ExecPlan created : " << plan->ToString() << std::endl;
775 // start the ExecPlan
776 plan->StartProducing();
777
778 // Wait for the plan to finish
779 auto finished = plan->finished();
780 RETURN_NOT_OK(finished.status());
781 std::cout << "Results : " << output_table->ToString() << std::endl;
782 return arrow::Status::OK();
783}
scan
#
scan
is an operation used to load and process datasets. It should be preferred over the
more generic source
node when your input is a dataset. The behavior is defined using
arrow::dataset::ScanNodeOptions
. More information on datasets and the various
scan options can be found in Tabular Datasets.
This node is capable of applying pushdown filters to the file readers which reduce the amount of data that needs to be read. This means you may supply the same filter expression to the scan node that you also supply to the FilterNode because the filtering is done in two different places.
Scan example:
271/// \brief An example demonstrating a scan and sink node
272///
273/// Scan-Table
274/// This example shows how scan operation can be applied on a dataset.
275/// There are operations that can be applied on the scan (project, filter)
276/// and the input data can be processed. The output is obtained as a table
277arrow::Status ScanSinkExample() {
278 ARROW_ASSIGN_OR_RAISE(std::shared_ptr<arrow::dataset::Dataset> dataset, GetDataset());
279
280 auto options = std::make_shared<arrow::dataset::ScanOptions>();
281 options->projection = cp::project({}, {}); // create empty projection
282
283 // construct the scan node
284 auto scan_node_options = arrow::dataset::ScanNodeOptions{dataset, options};
285
286 ac::Declaration scan{"scan", std::move(scan_node_options)};
287
288 return ExecutePlanAndCollectAsTable(std::move(scan));
289}
write
#
The write
node saves query results as a dataset of files in a
format like Parquet, Feather, CSV, etc. using the Tabular Datasets
functionality in Arrow. The write options are provided via the
arrow::dataset::WriteNodeOptions
which in turn contains
arrow::dataset::FileSystemDatasetWriteOptions
.
arrow::dataset::FileSystemDatasetWriteOptions
provides
control over the written dataset, including options like the output
directory, file naming scheme, and so on.
Write example:
663/// \brief An example showing a write node
664/// \param file_path The destination to write to
665///
666/// Scan-Filter-Write
667/// This example shows how scan node can be used to load the data
668/// and after processing how it can be written to disk.
669arrow::Status ScanFilterWriteExample(const std::string& file_path) {
670 ARROW_ASSIGN_OR_RAISE(std::shared_ptr<arrow::dataset::Dataset> dataset, GetDataset());
671
672 auto options = std::make_shared<arrow::dataset::ScanOptions>();
673 // empty projection
674 options->projection = cp::project({}, {});
675
676 auto scan_node_options = arrow::dataset::ScanNodeOptions{dataset, options};
677
678 ac::Declaration scan{"scan", std::move(scan_node_options)};
679
680 arrow::AsyncGenerator<std::optional<cp::ExecBatch>> sink_gen;
681
682 std::string root_path = "";
683 std::string uri = "file://" + file_path;
684 ARROW_ASSIGN_OR_RAISE(std::shared_ptr<arrow::fs::FileSystem> filesystem,
685 arrow::fs::FileSystemFromUri(uri, &root_path));
686
687 auto base_path = root_path + "/parquet_dataset";
688 // Uncomment the following line, if run repeatedly
689 // ARROW_RETURN_NOT_OK(filesystem->DeleteDirContents(base_path));
690 ARROW_RETURN_NOT_OK(filesystem->CreateDir(base_path));
691
692 // The partition schema determines which fields are part of the partitioning.
693 auto partition_schema = arrow::schema({arrow::field("a", arrow::int32())});
694 // We'll use Hive-style partitioning,
695 // which creates directories with "key=value" pairs.
696
697 auto partitioning =
698 std::make_shared<arrow::dataset::HivePartitioning>(partition_schema);
699 // We'll write Parquet files.
700 auto format = std::make_shared<arrow::dataset::ParquetFileFormat>();
701
702 arrow::dataset::FileSystemDatasetWriteOptions write_options;
703 write_options.file_write_options = format->DefaultWriteOptions();
704 write_options.filesystem = filesystem;
705 write_options.base_dir = base_path;
706 write_options.partitioning = partitioning;
707 write_options.basename_template = "part{i}.parquet";
708
709 arrow::dataset::WriteNodeOptions write_node_options{write_options};
710
711 ac::Declaration write{"write", {std::move(scan)}, std::move(write_node_options)};
712
713 // Since the write node has no output we simply run the plan to completion and the
714 // data should be written
715 ARROW_RETURN_NOT_OK(ac::DeclarationToStatus(std::move(write)));
716
717 std::cout << "Dataset written to " << base_path << std::endl;
718 return arrow::Status::OK();
719}
union
#
union
merges multiple data streams with the same schema into one, similar to
a SQL UNION ALL
clause.
The following example demonstrates how this can be achieved using two data sources.
Union example:
725/// \brief An example showing a union node
726///
727/// Source-Union-Table
728/// This example shows how a union operation can be applied on two
729/// data sources. The output is collected into a table.
730arrow::Status SourceUnionSinkExample() {
731 ARROW_ASSIGN_OR_RAISE(auto basic_data, MakeBasicBatches());
732
733 ac::Declaration lhs{"source",
734 ac::SourceNodeOptions{basic_data.schema, basic_data.gen()}};
735 lhs.label = "lhs";
736 ac::Declaration rhs{"source",
737 ac::SourceNodeOptions{basic_data.schema, basic_data.gen()}};
738 rhs.label = "rhs";
739 ac::Declaration union_plan{
740 "union", {std::move(lhs), std::move(rhs)}, ac::ExecNodeOptions{}};
741
742 return ExecutePlanAndCollectAsTable(std::move(union_plan));
743}
hash_join
#
hash_join
operation provides the relational algebra operation, join using hash-based
algorithm. HashJoinNodeOptions
contains the options required in
defining a join. The hash_join supports
left/right/full semi/anti/outerjoins.
Also the join-key (i.e. the column(s) to join on), and suffixes (i.e a suffix term like “_x”
which can be appended as a suffix for column names duplicated in both left and right
relations.) can be set via the the join options.
Read more on hash-joins.
Hash-Join example:
604/// \brief An example showing a hash join node
605///
606/// Source-HashJoin-Table
607/// This example shows how source node gets the data and how a self-join
608/// is applied on the data. The join options are configurable. The output
609/// is collected into a table.
610arrow::Status SourceHashJoinSinkExample() {
611 ARROW_ASSIGN_OR_RAISE(auto input, MakeGroupableBatches());
612
613 ac::Declaration left{"source", ac::SourceNodeOptions{input.schema, input.gen()}};
614 ac::Declaration right{"source", ac::SourceNodeOptions{input.schema, input.gen()}};
615
616 ac::HashJoinNodeOptions join_opts{
617 ac::JoinType::INNER,
618 /*left_keys=*/{"str"},
619 /*right_keys=*/{"str"}, cp::literal(true), "l_", "r_"};
620
621 ac::Declaration hashjoin{
622 "hashjoin", {std::move(left), std::move(right)}, std::move(join_opts)};
623
624 return ExecutePlanAndCollectAsTable(std::move(hashjoin));
625}
Summary#
There are examples of these nodes which can be found in
cpp/examples/arrow/execution_plan_documentation_examples.cc
in the Arrow source.
Complete Example:
19#include <arrow/array.h>
20#include <arrow/builder.h>
21
22#include <arrow/acero/exec_plan.h>
23#include <arrow/compute/api.h>
24#include <arrow/compute/api_vector.h>
25#include <arrow/compute/cast.h>
26
27#include <arrow/csv/api.h>
28
29#include <arrow/dataset/dataset.h>
30#include <arrow/dataset/file_base.h>
31#include <arrow/dataset/file_parquet.h>
32#include <arrow/dataset/plan.h>
33#include <arrow/dataset/scanner.h>
34
35#include <arrow/io/interfaces.h>
36#include <arrow/io/memory.h>
37
38#include <arrow/result.h>
39#include <arrow/status.h>
40#include <arrow/table.h>
41
42#include <arrow/ipc/api.h>
43
44#include <arrow/util/future.h>
45#include <arrow/util/range.h>
46#include <arrow/util/thread_pool.h>
47#include <arrow/util/vector.h>
48
49#include <iostream>
50#include <memory>
51#include <utility>
52
53// Demonstrate various operators in Arrow Streaming Execution Engine
54
55namespace cp = ::arrow::compute;
56namespace ac = ::arrow::acero;
57
58constexpr char kSep[] = "******";
59
60void PrintBlock(const std::string& msg) {
61 std::cout << "\n\t" << kSep << " " << msg << " " << kSep << "\n" << std::endl;
62}
63
64template <typename TYPE,
65 typename = typename std::enable_if<arrow::is_number_type<TYPE>::value |
66 arrow::is_boolean_type<TYPE>::value |
67 arrow::is_temporal_type<TYPE>::value>::type>
68arrow::Result<std::shared_ptr<arrow::Array>> GetArrayDataSample(
69 const std::vector<typename TYPE::c_type>& values) {
70 using ArrowBuilderType = typename arrow::TypeTraits<TYPE>::BuilderType;
71 ArrowBuilderType builder;
72 ARROW_RETURN_NOT_OK(builder.Reserve(values.size()));
73 ARROW_RETURN_NOT_OK(builder.AppendValues(values));
74 return builder.Finish();
75}
76
77template <class TYPE>
78arrow::Result<std::shared_ptr<arrow::Array>> GetBinaryArrayDataSample(
79 const std::vector<std::string>& values) {
80 using ArrowBuilderType = typename arrow::TypeTraits<TYPE>::BuilderType;
81 ArrowBuilderType builder;
82 ARROW_RETURN_NOT_OK(builder.Reserve(values.size()));
83 ARROW_RETURN_NOT_OK(builder.AppendValues(values));
84 return builder.Finish();
85}
86
87arrow::Result<std::shared_ptr<arrow::RecordBatch>> GetSampleRecordBatch(
88 const arrow::ArrayVector array_vector, const arrow::FieldVector& field_vector) {
89 std::shared_ptr<arrow::RecordBatch> record_batch;
90 ARROW_ASSIGN_OR_RAISE(auto struct_result,
91 arrow::StructArray::Make(array_vector, field_vector));
92 return record_batch->FromStructArray(struct_result);
93}
94
95/// \brief Create a sample table
96/// The table's contents will be:
97/// a,b
98/// 1,null
99/// 2,true
100/// null,true
101/// 3,false
102/// null,true
103/// 4,false
104/// 5,null
105/// 6,false
106/// 7,false
107/// 8,true
108/// \return The created table
109
110arrow::Result<std::shared_ptr<arrow::Table>> GetTable() {
111 auto null_long = std::numeric_limits<int64_t>::quiet_NaN();
112 ARROW_ASSIGN_OR_RAISE(auto int64_array,
113 GetArrayDataSample<arrow::Int64Type>(
114 {1, 2, null_long, 3, null_long, 4, 5, 6, 7, 8}));
115
116 arrow::BooleanBuilder boolean_builder;
117 std::shared_ptr<arrow::BooleanArray> bool_array;
118
119 std::vector<uint8_t> bool_values = {false, true, true, false, true,
120 false, false, false, false, true};
121 std::vector<bool> is_valid = {false, true, true, true, true,
122 true, false, true, true, true};
123
124 ARROW_RETURN_NOT_OK(boolean_builder.Reserve(10));
125
126 ARROW_RETURN_NOT_OK(boolean_builder.AppendValues(bool_values, is_valid));
127
128 ARROW_RETURN_NOT_OK(boolean_builder.Finish(&bool_array));
129
130 auto record_batch =
131 arrow::RecordBatch::Make(arrow::schema({arrow::field("a", arrow::int64()),
132 arrow::field("b", arrow::boolean())}),
133 10, {int64_array, bool_array});
134 ARROW_ASSIGN_OR_RAISE(auto table, arrow::Table::FromRecordBatches({record_batch}));
135 return table;
136}
137
138/// \brief Create a sample dataset
139/// \return An in-memory dataset based on GetTable()
140arrow::Result<std::shared_ptr<arrow::dataset::Dataset>> GetDataset() {
141 ARROW_ASSIGN_OR_RAISE(auto table, GetTable());
142 auto ds = std::make_shared<arrow::dataset::InMemoryDataset>(table);
143 return ds;
144}
145
146arrow::Result<cp::ExecBatch> GetExecBatchFromVectors(
147 const arrow::FieldVector& field_vector, const arrow::ArrayVector& array_vector) {
148 std::shared_ptr<arrow::RecordBatch> record_batch;
149 ARROW_ASSIGN_OR_RAISE(auto res_batch, GetSampleRecordBatch(array_vector, field_vector));
150 cp::ExecBatch batch{*res_batch};
151 return batch;
152}
153
154// (Doc section: BatchesWithSchema Definition)
155struct BatchesWithSchema {
156 std::vector<cp::ExecBatch> batches;
157 std::shared_ptr<arrow::Schema> schema;
158 // This method uses internal arrow utilities to
159 // convert a vector of record batches to an AsyncGenerator of optional batches
160 arrow::AsyncGenerator<std::optional<cp::ExecBatch>> gen() const {
161 auto opt_batches = ::arrow::internal::MapVector(
162 [](cp::ExecBatch batch) { return std::make_optional(std::move(batch)); },
163 batches);
164 arrow::AsyncGenerator<std::optional<cp::ExecBatch>> gen;
165 gen = arrow::MakeVectorGenerator(std::move(opt_batches));
166 return gen;
167 }
168};
169// (Doc section: BatchesWithSchema Definition)
170
171// (Doc section: MakeBasicBatches Definition)
172arrow::Result<BatchesWithSchema> MakeBasicBatches() {
173 BatchesWithSchema out;
174 auto field_vector = {arrow::field("a", arrow::int32()),
175 arrow::field("b", arrow::boolean())};
176 ARROW_ASSIGN_OR_RAISE(auto b1_int, GetArrayDataSample<arrow::Int32Type>({0, 4}));
177 ARROW_ASSIGN_OR_RAISE(auto b2_int, GetArrayDataSample<arrow::Int32Type>({5, 6, 7}));
178 ARROW_ASSIGN_OR_RAISE(auto b3_int, GetArrayDataSample<arrow::Int32Type>({8, 9, 10}));
179
180 ARROW_ASSIGN_OR_RAISE(auto b1_bool,
181 GetArrayDataSample<arrow::BooleanType>({false, true}));
182 ARROW_ASSIGN_OR_RAISE(auto b2_bool,
183 GetArrayDataSample<arrow::BooleanType>({true, false, true}));
184 ARROW_ASSIGN_OR_RAISE(auto b3_bool,
185 GetArrayDataSample<arrow::BooleanType>({false, true, false}));
186
187 ARROW_ASSIGN_OR_RAISE(auto b1,
188 GetExecBatchFromVectors(field_vector, {b1_int, b1_bool}));
189 ARROW_ASSIGN_OR_RAISE(auto b2,
190 GetExecBatchFromVectors(field_vector, {b2_int, b2_bool}));
191 ARROW_ASSIGN_OR_RAISE(auto b3,
192 GetExecBatchFromVectors(field_vector, {b3_int, b3_bool}));
193
194 out.batches = {b1, b2, b3};
195 out.schema = arrow::schema(field_vector);
196 return out;
197}
198// (Doc section: MakeBasicBatches Definition)
199
200arrow::Result<BatchesWithSchema> MakeSortTestBasicBatches() {
201 BatchesWithSchema out;
202 auto field = arrow::field("a", arrow::int32());
203 ARROW_ASSIGN_OR_RAISE(auto b1_int, GetArrayDataSample<arrow::Int32Type>({1, 3, 0, 2}));
204 ARROW_ASSIGN_OR_RAISE(auto b2_int,
205 GetArrayDataSample<arrow::Int32Type>({121, 101, 120, 12}));
206 ARROW_ASSIGN_OR_RAISE(auto b3_int,
207 GetArrayDataSample<arrow::Int32Type>({10, 110, 210, 121}));
208 ARROW_ASSIGN_OR_RAISE(auto b4_int,
209 GetArrayDataSample<arrow::Int32Type>({51, 101, 2, 34}));
210 ARROW_ASSIGN_OR_RAISE(auto b5_int,
211 GetArrayDataSample<arrow::Int32Type>({11, 31, 1, 12}));
212 ARROW_ASSIGN_OR_RAISE(auto b6_int,
213 GetArrayDataSample<arrow::Int32Type>({12, 101, 120, 12}));
214 ARROW_ASSIGN_OR_RAISE(auto b7_int,
215 GetArrayDataSample<arrow::Int32Type>({0, 110, 210, 11}));
216 ARROW_ASSIGN_OR_RAISE(auto b8_int,
217 GetArrayDataSample<arrow::Int32Type>({51, 10, 2, 3}));
218
219 ARROW_ASSIGN_OR_RAISE(auto b1, GetExecBatchFromVectors({field}, {b1_int}));
220 ARROW_ASSIGN_OR_RAISE(auto b2, GetExecBatchFromVectors({field}, {b2_int}));
221 ARROW_ASSIGN_OR_RAISE(auto b3,
222 GetExecBatchFromVectors({field, field}, {b3_int, b8_int}));
223 ARROW_ASSIGN_OR_RAISE(auto b4,
224 GetExecBatchFromVectors({field, field, field, field},
225 {b4_int, b5_int, b6_int, b7_int}));
226 out.batches = {b1, b2, b3, b4};
227 out.schema = arrow::schema({field});
228 return out;
229}
230
231arrow::Result<BatchesWithSchema> MakeGroupableBatches(int multiplicity = 1) {
232 BatchesWithSchema out;
233 auto fields = {arrow::field("i32", arrow::int32()), arrow::field("str", arrow::utf8())};
234 ARROW_ASSIGN_OR_RAISE(auto b1_int, GetArrayDataSample<arrow::Int32Type>({12, 7, 3}));
235 ARROW_ASSIGN_OR_RAISE(auto b2_int, GetArrayDataSample<arrow::Int32Type>({-2, -1, 3}));
236 ARROW_ASSIGN_OR_RAISE(auto b3_int, GetArrayDataSample<arrow::Int32Type>({5, 3, -8}));
237 ARROW_ASSIGN_OR_RAISE(auto b1_str, GetBinaryArrayDataSample<arrow::StringType>(
238 {"alpha", "beta", "alpha"}));
239 ARROW_ASSIGN_OR_RAISE(auto b2_str, GetBinaryArrayDataSample<arrow::StringType>(
240 {"alpha", "gamma", "alpha"}));
241 ARROW_ASSIGN_OR_RAISE(auto b3_str, GetBinaryArrayDataSample<arrow::StringType>(
242 {"gamma", "beta", "alpha"}));
243 ARROW_ASSIGN_OR_RAISE(auto b1, GetExecBatchFromVectors(fields, {b1_int, b1_str}));
244 ARROW_ASSIGN_OR_RAISE(auto b2, GetExecBatchFromVectors(fields, {b2_int, b2_str}));
245 ARROW_ASSIGN_OR_RAISE(auto b3, GetExecBatchFromVectors(fields, {b3_int, b3_str}));
246 out.batches = {b1, b2, b3};
247
248 size_t batch_count = out.batches.size();
249 for (int repeat = 1; repeat < multiplicity; ++repeat) {
250 for (size_t i = 0; i < batch_count; ++i) {
251 out.batches.push_back(out.batches[i]);
252 }
253 }
254
255 out.schema = arrow::schema(fields);
256 return out;
257}
258
259arrow::Status ExecutePlanAndCollectAsTable(ac::Declaration plan) {
260 // collect sink_reader into a Table
261 std::shared_ptr<arrow::Table> response_table;
262 ARROW_ASSIGN_OR_RAISE(response_table, ac::DeclarationToTable(std::move(plan)));
263
264 std::cout << "Results : " << response_table->ToString() << std::endl;
265
266 return arrow::Status::OK();
267}
268
269// (Doc section: Scan Example)
270
271/// \brief An example demonstrating a scan and sink node
272///
273/// Scan-Table
274/// This example shows how scan operation can be applied on a dataset.
275/// There are operations that can be applied on the scan (project, filter)
276/// and the input data can be processed. The output is obtained as a table
277arrow::Status ScanSinkExample() {
278 ARROW_ASSIGN_OR_RAISE(std::shared_ptr<arrow::dataset::Dataset> dataset, GetDataset());
279
280 auto options = std::make_shared<arrow::dataset::ScanOptions>();
281 options->projection = cp::project({}, {}); // create empty projection
282
283 // construct the scan node
284 auto scan_node_options = arrow::dataset::ScanNodeOptions{dataset, options};
285
286 ac::Declaration scan{"scan", std::move(scan_node_options)};
287
288 return ExecutePlanAndCollectAsTable(std::move(scan));
289}
290// (Doc section: Scan Example)
291
292// (Doc section: Source Example)
293
294/// \brief An example demonstrating a source and sink node
295///
296/// Source-Table Example
297/// This example shows how a custom source can be used
298/// in an execution plan. This includes source node using pregenerated
299/// data and collecting it into a table.
300///
301/// This sort of custom source is often not needed. In most cases you can
302/// use a scan (for a dataset source) or a source like table_source, array_vector_source,
303/// exec_batch_source, or record_batch_source (for in-memory data)
304arrow::Status SourceSinkExample() {
305 ARROW_ASSIGN_OR_RAISE(auto basic_data, MakeBasicBatches());
306
307 auto source_node_options = ac::SourceNodeOptions{basic_data.schema, basic_data.gen()};
308
309 ac::Declaration source{"source", std::move(source_node_options)};
310
311 return ExecutePlanAndCollectAsTable(std::move(source));
312}
313// (Doc section: Source Example)
314
315// (Doc section: Table Source Example)
316
317/// \brief An example showing a table source node
318///
319/// TableSource-Table Example
320/// This example shows how a table_source can be used
321/// in an execution plan. This includes a table source node
322/// receiving data from a table. This plan simply collects the
323/// data back into a table but nodes could be added that modify
324/// or transform the data as well (as is shown in later examples)
325arrow::Status TableSourceSinkExample() {
326 ARROW_ASSIGN_OR_RAISE(auto table, GetTable());
327
328 arrow::AsyncGenerator<std::optional<cp::ExecBatch>> sink_gen;
329 int max_batch_size = 2;
330 auto table_source_options = ac::TableSourceNodeOptions{table, max_batch_size};
331
332 ac::Declaration source{"table_source", std::move(table_source_options)};
333
334 return ExecutePlanAndCollectAsTable(std::move(source));
335}
336// (Doc section: Table Source Example)
337
338// (Doc section: Filter Example)
339
340/// \brief An example showing a filter node
341///
342/// Source-Filter-Table
343/// This example shows how a filter can be used in an execution plan,
344/// to filter data from a source. The output from the exeuction plan
345/// is collected into a table.
346arrow::Status ScanFilterSinkExample() {
347 ARROW_ASSIGN_OR_RAISE(std::shared_ptr<arrow::dataset::Dataset> dataset, GetDataset());
348
349 auto options = std::make_shared<arrow::dataset::ScanOptions>();
350 // specify the filter. This filter removes all rows where the
351 // value of the "a" column is greater than 3.
352 cp::Expression filter_expr = cp::greater(cp::field_ref("a"), cp::literal(3));
353 // set filter for scanner : on-disk / push-down filtering.
354 // This step can be skipped if you are not reading from disk.
355 options->filter = filter_expr;
356 // empty projection
357 options->projection = cp::project({}, {});
358
359 // construct the scan node
360 std::cout << "Initialized Scanning Options" << std::endl;
361
362 auto scan_node_options = arrow::dataset::ScanNodeOptions{dataset, options};
363 std::cout << "Scan node options created" << std::endl;
364
365 ac::Declaration scan{"scan", std::move(scan_node_options)};
366
367 // pipe the scan node into the filter node
368 // Need to set the filter in scan node options and filter node options.
369 // At scan node it is used for on-disk / push-down filtering.
370 // At filter node it is used for in-memory filtering.
371 ac::Declaration filter{
372 "filter", {std::move(scan)}, ac::FilterNodeOptions(std::move(filter_expr))};
373
374 return ExecutePlanAndCollectAsTable(std::move(filter));
375}
376
377// (Doc section: Filter Example)
378
379// (Doc section: Project Example)
380
381/// \brief An example showing a project node
382///
383/// Scan-Project-Table
384/// This example shows how a Scan operation can be used to load the data
385/// into the execution plan, how a project operation can be applied on the
386/// data stream and how the output is collected into a table
387arrow::Status ScanProjectSinkExample() {
388 ARROW_ASSIGN_OR_RAISE(std::shared_ptr<arrow::dataset::Dataset> dataset, GetDataset());
389
390 auto options = std::make_shared<arrow::dataset::ScanOptions>();
391 // projection
392 cp::Expression a_times_2 = cp::call("multiply", {cp::field_ref("a"), cp::literal(2)});
393 options->projection = cp::project({}, {});
394
395 auto scan_node_options = arrow::dataset::ScanNodeOptions{dataset, options};
396
397 ac::Declaration scan{"scan", std::move(scan_node_options)};
398 ac::Declaration project{
399 "project", {std::move(scan)}, ac::ProjectNodeOptions({a_times_2})};
400
401 return ExecutePlanAndCollectAsTable(std::move(project));
402}
403
404// (Doc section: Project Example)
405
406// This is a variation of ScanProjectSinkExample introducing how to use the
407// Declaration::Sequence function
408arrow::Status ScanProjectSequenceSinkExample() {
409 ARROW_ASSIGN_OR_RAISE(std::shared_ptr<arrow::dataset::Dataset> dataset, GetDataset());
410
411 auto options = std::make_shared<arrow::dataset::ScanOptions>();
412 // projection
413 cp::Expression a_times_2 = cp::call("multiply", {cp::field_ref("a"), cp::literal(2)});
414 options->projection = cp::project({}, {});
415
416 auto scan_node_options = arrow::dataset::ScanNodeOptions{dataset, options};
417
418 // (Doc section: Project Sequence Example)
419 // Inputs do not have to be passed to the project node when using Sequence
420 ac::Declaration plan =
421 ac::Declaration::Sequence({{"scan", std::move(scan_node_options)},
422 {"project", ac::ProjectNodeOptions({a_times_2})}});
423 // (Doc section: Project Sequence Example)
424
425 return ExecutePlanAndCollectAsTable(std::move(plan));
426}
427
428// (Doc section: Scalar Aggregate Example)
429
430/// \brief An example showing an aggregation node to aggregate an entire table
431///
432/// Source-Aggregation-Table
433/// This example shows how an aggregation operation can be applied on a
434/// execution plan resulting in a scalar output. The source node loads the
435/// data and the aggregation (counting unique types in column 'a')
436/// is applied on this data. The output is collected into a table (that will
437/// have exactly one row)
438arrow::Status SourceScalarAggregateSinkExample() {
439 ARROW_ASSIGN_OR_RAISE(auto basic_data, MakeBasicBatches());
440
441 auto source_node_options = ac::SourceNodeOptions{basic_data.schema, basic_data.gen()};
442
443 ac::Declaration source{"source", std::move(source_node_options)};
444 auto aggregate_options =
445 ac::AggregateNodeOptions{/*aggregates=*/{{"sum", nullptr, "a", "sum(a)"}}};
446 ac::Declaration aggregate{
447 "aggregate", {std::move(source)}, std::move(aggregate_options)};
448
449 return ExecutePlanAndCollectAsTable(std::move(aggregate));
450}
451// (Doc section: Scalar Aggregate Example)
452
453// (Doc section: Group Aggregate Example)
454
455/// \brief An example showing an aggregation node to perform a group-by operation
456///
457/// Source-Aggregation-Table
458/// This example shows how an aggregation operation can be applied on a
459/// execution plan resulting in grouped output. The source node loads the
460/// data and the aggregation (counting unique types in column 'a') is
461/// applied on this data. The output is collected into a table that will contain
462/// one row for each unique combination of group keys.
463arrow::Status SourceGroupAggregateSinkExample() {
464 ARROW_ASSIGN_OR_RAISE(auto basic_data, MakeBasicBatches());
465
466 arrow::AsyncGenerator<std::optional<cp::ExecBatch>> sink_gen;
467
468 auto source_node_options = ac::SourceNodeOptions{basic_data.schema, basic_data.gen()};
469
470 ac::Declaration source{"source", std::move(source_node_options)};
471 auto options = std::make_shared<cp::CountOptions>(cp::CountOptions::ONLY_VALID);
472 auto aggregate_options =
473 ac::AggregateNodeOptions{/*aggregates=*/{{"hash_count", options, "a", "count(a)"}},
474 /*keys=*/{"b"}};
475 ac::Declaration aggregate{
476 "aggregate", {std::move(source)}, std::move(aggregate_options)};
477
478 return ExecutePlanAndCollectAsTable(std::move(aggregate));
479}
480// (Doc section: Group Aggregate Example)
481
482// (Doc section: ConsumingSink Example)
483
484/// \brief An example showing a consuming sink node
485///
486/// Source-Consuming-Sink
487/// This example shows how the data can be consumed within the execution plan
488/// by using a ConsumingSink node. There is no data output from this execution plan.
489arrow::Status SourceConsumingSinkExample() {
490 ARROW_ASSIGN_OR_RAISE(auto basic_data, MakeBasicBatches());
491
492 auto source_node_options = ac::SourceNodeOptions{basic_data.schema, basic_data.gen()};
493
494 ac::Declaration source{"source", std::move(source_node_options)};
495
496 std::atomic<uint32_t> batches_seen{0};
497 arrow::Future<> finish = arrow::Future<>::Make();
498 struct CustomSinkNodeConsumer : public ac::SinkNodeConsumer {
499 CustomSinkNodeConsumer(std::atomic<uint32_t>* batches_seen, arrow::Future<> finish)
500 : batches_seen(batches_seen), finish(std::move(finish)) {}
501
502 arrow::Status Init(const std::shared_ptr<arrow::Schema>& schema,
503 ac::BackpressureControl* backpressure_control,
504 ac::ExecPlan* plan) override {
505 // This will be called as the plan is started (before the first call to Consume)
506 // and provides the schema of the data coming into the node, controls for pausing /
507 // resuming input, and a pointer to the plan itself which can be used to access
508 // other utilities such as the thread indexer or async task scheduler.
509 return arrow::Status::OK();
510 }
511
512 arrow::Status Consume(cp::ExecBatch batch) override {
513 (*batches_seen)++;
514 return arrow::Status::OK();
515 }
516
517 arrow::Future<> Finish() override {
518 // Here you can perform whatever (possibly async) cleanup is needed, e.g. closing
519 // output file handles and flushing remaining work
520 return arrow::Future<>::MakeFinished();
521 }
522
523 std::atomic<uint32_t>* batches_seen;
524 arrow::Future<> finish;
525 };
526 std::shared_ptr<CustomSinkNodeConsumer> consumer =
527 std::make_shared<CustomSinkNodeConsumer>(&batches_seen, finish);
528
529 ac::Declaration consuming_sink{"consuming_sink",
530 {std::move(source)},
531 ac::ConsumingSinkNodeOptions(std::move(consumer))};
532
533 // Since we are consuming the data within the plan there is no output and we simply
534 // run the plan to completion instead of collecting into a table.
535 ARROW_RETURN_NOT_OK(ac::DeclarationToStatus(std::move(consuming_sink)));
536
537 std::cout << "The consuming sink node saw " << batches_seen.load() << " batches"
538 << std::endl;
539 return arrow::Status::OK();
540}
541// (Doc section: ConsumingSink Example)
542
543// (Doc section: OrderBySink Example)
544
545arrow::Status ExecutePlanAndCollectAsTableWithCustomSink(
546 std::shared_ptr<ac::ExecPlan> plan, std::shared_ptr<arrow::Schema> schema,
547 arrow::AsyncGenerator<std::optional<cp::ExecBatch>> sink_gen) {
548 // translate sink_gen (async) to sink_reader (sync)
549 std::shared_ptr<arrow::RecordBatchReader> sink_reader =
550 ac::MakeGeneratorReader(schema, std::move(sink_gen), arrow::default_memory_pool());
551
552 // validate the ExecPlan
553 ARROW_RETURN_NOT_OK(plan->Validate());
554 std::cout << "ExecPlan created : " << plan->ToString() << std::endl;
555 // start the ExecPlan
556 plan->StartProducing();
557
558 // collect sink_reader into a Table
559 std::shared_ptr<arrow::Table> response_table;
560
561 ARROW_ASSIGN_OR_RAISE(response_table,
562 arrow::Table::FromRecordBatchReader(sink_reader.get()));
563
564 std::cout << "Results : " << response_table->ToString() << std::endl;
565
566 // stop producing
567 plan->StopProducing();
568 // plan mark finished
569 auto future = plan->finished();
570 return future.status();
571}
572
573/// \brief An example showing an order-by node
574///
575/// Source-OrderBy-Sink
576/// In this example, the data enters through the source node
577/// and the data is ordered in the sink node. The order can be
578/// ASCENDING or DESCENDING and it is configurable. The output
579/// is obtained as a table from the sink node.
580arrow::Status SourceOrderBySinkExample() {
581 ARROW_ASSIGN_OR_RAISE(std::shared_ptr<ac::ExecPlan> plan,
582 ac::ExecPlan::Make(*cp::threaded_exec_context()));
583
584 ARROW_ASSIGN_OR_RAISE(auto basic_data, MakeSortTestBasicBatches());
585
586 arrow::AsyncGenerator<std::optional<cp::ExecBatch>> sink_gen;
587
588 auto source_node_options = ac::SourceNodeOptions{basic_data.schema, basic_data.gen()};
589 ARROW_ASSIGN_OR_RAISE(ac::ExecNode * source,
590 ac::MakeExecNode("source", plan.get(), {}, source_node_options));
591
592 ARROW_RETURN_NOT_OK(ac::MakeExecNode(
593 "order_by_sink", plan.get(), {source},
594 ac::OrderBySinkNodeOptions{
595 cp::SortOptions{{cp::SortKey{"a", cp::SortOrder::Descending}}}, &sink_gen}));
596
597 return ExecutePlanAndCollectAsTableWithCustomSink(plan, basic_data.schema, sink_gen);
598}
599
600// (Doc section: OrderBySink Example)
601
602// (Doc section: HashJoin Example)
603
604/// \brief An example showing a hash join node
605///
606/// Source-HashJoin-Table
607/// This example shows how source node gets the data and how a self-join
608/// is applied on the data. The join options are configurable. The output
609/// is collected into a table.
610arrow::Status SourceHashJoinSinkExample() {
611 ARROW_ASSIGN_OR_RAISE(auto input, MakeGroupableBatches());
612
613 ac::Declaration left{"source", ac::SourceNodeOptions{input.schema, input.gen()}};
614 ac::Declaration right{"source", ac::SourceNodeOptions{input.schema, input.gen()}};
615
616 ac::HashJoinNodeOptions join_opts{
617 ac::JoinType::INNER,
618 /*left_keys=*/{"str"},
619 /*right_keys=*/{"str"}, cp::literal(true), "l_", "r_"};
620
621 ac::Declaration hashjoin{
622 "hashjoin", {std::move(left), std::move(right)}, std::move(join_opts)};
623
624 return ExecutePlanAndCollectAsTable(std::move(hashjoin));
625}
626
627// (Doc section: HashJoin Example)
628
629// (Doc section: KSelect Example)
630
631/// \brief An example showing a select-k node
632///
633/// Source-KSelect
634/// This example shows how K number of elements can be selected
635/// either from the top or bottom. The output node is a modified
636/// sink node where output can be obtained as a table.
637arrow::Status SourceKSelectExample() {
638 ARROW_ASSIGN_OR_RAISE(auto input, MakeGroupableBatches());
639 ARROW_ASSIGN_OR_RAISE(std::shared_ptr<ac::ExecPlan> plan,
640 ac::ExecPlan::Make(*cp::threaded_exec_context()));
641 arrow::AsyncGenerator<std::optional<cp::ExecBatch>> sink_gen;
642
643 ARROW_ASSIGN_OR_RAISE(
644 ac::ExecNode * source,
645 ac::MakeExecNode("source", plan.get(), {},
646 ac::SourceNodeOptions{input.schema, input.gen()}));
647
648 cp::SelectKOptions options = cp::SelectKOptions::TopKDefault(/*k=*/2, {"i32"});
649
650 ARROW_RETURN_NOT_OK(ac::MakeExecNode("select_k_sink", plan.get(), {source},
651 ac::SelectKSinkNodeOptions{options, &sink_gen}));
652
653 auto schema = arrow::schema(
654 {arrow::field("i32", arrow::int32()), arrow::field("str", arrow::utf8())});
655
656 return ExecutePlanAndCollectAsTableWithCustomSink(plan, schema, sink_gen);
657}
658
659// (Doc section: KSelect Example)
660
661// (Doc section: Write Example)
662
663/// \brief An example showing a write node
664/// \param file_path The destination to write to
665///
666/// Scan-Filter-Write
667/// This example shows how scan node can be used to load the data
668/// and after processing how it can be written to disk.
669arrow::Status ScanFilterWriteExample(const std::string& file_path) {
670 ARROW_ASSIGN_OR_RAISE(std::shared_ptr<arrow::dataset::Dataset> dataset, GetDataset());
671
672 auto options = std::make_shared<arrow::dataset::ScanOptions>();
673 // empty projection
674 options->projection = cp::project({}, {});
675
676 auto scan_node_options = arrow::dataset::ScanNodeOptions{dataset, options};
677
678 ac::Declaration scan{"scan", std::move(scan_node_options)};
679
680 arrow::AsyncGenerator<std::optional<cp::ExecBatch>> sink_gen;
681
682 std::string root_path = "";
683 std::string uri = "file://" + file_path;
684 ARROW_ASSIGN_OR_RAISE(std::shared_ptr<arrow::fs::FileSystem> filesystem,
685 arrow::fs::FileSystemFromUri(uri, &root_path));
686
687 auto base_path = root_path + "/parquet_dataset";
688 // Uncomment the following line, if run repeatedly
689 // ARROW_RETURN_NOT_OK(filesystem->DeleteDirContents(base_path));
690 ARROW_RETURN_NOT_OK(filesystem->CreateDir(base_path));
691
692 // The partition schema determines which fields are part of the partitioning.
693 auto partition_schema = arrow::schema({arrow::field("a", arrow::int32())});
694 // We'll use Hive-style partitioning,
695 // which creates directories with "key=value" pairs.
696
697 auto partitioning =
698 std::make_shared<arrow::dataset::HivePartitioning>(partition_schema);
699 // We'll write Parquet files.
700 auto format = std::make_shared<arrow::dataset::ParquetFileFormat>();
701
702 arrow::dataset::FileSystemDatasetWriteOptions write_options;
703 write_options.file_write_options = format->DefaultWriteOptions();
704 write_options.filesystem = filesystem;
705 write_options.base_dir = base_path;
706 write_options.partitioning = partitioning;
707 write_options.basename_template = "part{i}.parquet";
708
709 arrow::dataset::WriteNodeOptions write_node_options{write_options};
710
711 ac::Declaration write{"write", {std::move(scan)}, std::move(write_node_options)};
712
713 // Since the write node has no output we simply run the plan to completion and the
714 // data should be written
715 ARROW_RETURN_NOT_OK(ac::DeclarationToStatus(std::move(write)));
716
717 std::cout << "Dataset written to " << base_path << std::endl;
718 return arrow::Status::OK();
719}
720
721// (Doc section: Write Example)
722
723// (Doc section: Union Example)
724
725/// \brief An example showing a union node
726///
727/// Source-Union-Table
728/// This example shows how a union operation can be applied on two
729/// data sources. The output is collected into a table.
730arrow::Status SourceUnionSinkExample() {
731 ARROW_ASSIGN_OR_RAISE(auto basic_data, MakeBasicBatches());
732
733 ac::Declaration lhs{"source",
734 ac::SourceNodeOptions{basic_data.schema, basic_data.gen()}};
735 lhs.label = "lhs";
736 ac::Declaration rhs{"source",
737 ac::SourceNodeOptions{basic_data.schema, basic_data.gen()}};
738 rhs.label = "rhs";
739 ac::Declaration union_plan{
740 "union", {std::move(lhs), std::move(rhs)}, ac::ExecNodeOptions{}};
741
742 return ExecutePlanAndCollectAsTable(std::move(union_plan));
743}
744
745// (Doc section: Union Example)
746
747// (Doc section: Table Sink Example)
748
749/// \brief An example showing a table sink node
750///
751/// TableSink Example
752/// This example shows how a table_sink can be used
753/// in an execution plan. This includes a source node
754/// receiving data as batches and the table sink node
755/// which emits the output as a table.
756arrow::Status TableSinkExample() {
757 ARROW_ASSIGN_OR_RAISE(std::shared_ptr<ac::ExecPlan> plan,
758 ac::ExecPlan::Make(*cp::threaded_exec_context()));
759
760 ARROW_ASSIGN_OR_RAISE(auto basic_data, MakeBasicBatches());
761
762 auto source_node_options = ac::SourceNodeOptions{basic_data.schema, basic_data.gen()};
763
764 ARROW_ASSIGN_OR_RAISE(ac::ExecNode * source,
765 ac::MakeExecNode("source", plan.get(), {}, source_node_options));
766
767 std::shared_ptr<arrow::Table> output_table;
768 auto table_sink_options = ac::TableSinkNodeOptions{&output_table};
769
770 ARROW_RETURN_NOT_OK(
771 ac::MakeExecNode("table_sink", plan.get(), {source}, table_sink_options));
772 // validate the ExecPlan
773 ARROW_RETURN_NOT_OK(plan->Validate());
774 std::cout << "ExecPlan created : " << plan->ToString() << std::endl;
775 // start the ExecPlan
776 plan->StartProducing();
777
778 // Wait for the plan to finish
779 auto finished = plan->finished();
780 RETURN_NOT_OK(finished.status());
781 std::cout << "Results : " << output_table->ToString() << std::endl;
782 return arrow::Status::OK();
783}
784
785// (Doc section: Table Sink Example)
786
787// (Doc section: RecordBatchReaderSource Example)
788
789/// \brief An example showing the usage of a RecordBatchReader as the data source.
790///
791/// RecordBatchReaderSourceSink Example
792/// This example shows how a record_batch_reader_source can be used
793/// in an execution plan. This includes the source node
794/// receiving data from a TableRecordBatchReader.
795
796arrow::Status RecordBatchReaderSourceSinkExample() {
797 ARROW_ASSIGN_OR_RAISE(auto table, GetTable());
798 std::shared_ptr<arrow::RecordBatchReader> reader =
799 std::make_shared<arrow::TableBatchReader>(table);
800 ac::Declaration reader_source{"record_batch_reader_source",
801 ac::RecordBatchReaderSourceNodeOptions{reader}};
802 return ExecutePlanAndCollectAsTable(std::move(reader_source));
803}
804
805// (Doc section: RecordBatchReaderSource Example)
806
807enum ExampleMode {
808 SOURCE_SINK = 0,
809 TABLE_SOURCE_SINK = 1,
810 SCAN = 2,
811 FILTER = 3,
812 PROJECT = 4,
813 SCALAR_AGGREGATION = 5,
814 GROUP_AGGREGATION = 6,
815 CONSUMING_SINK = 7,
816 ORDER_BY_SINK = 8,
817 HASHJOIN = 9,
818 KSELECT = 10,
819 WRITE = 11,
820 UNION = 12,
821 TABLE_SOURCE_TABLE_SINK = 13,
822 RECORD_BATCH_READER_SOURCE = 14,
823 PROJECT_SEQUENCE = 15
824};
825
826int main(int argc, char** argv) {
827 if (argc < 3) {
828 // Fake success for CI purposes.
829 return EXIT_SUCCESS;
830 }
831
832 std::string base_save_path = argv[1];
833 int mode = std::atoi(argv[2]);
834 arrow::Status status;
835 // ensure arrow::dataset node factories are in the registry
836 arrow::dataset::internal::Initialize();
837 switch (mode) {
838 case SOURCE_SINK:
839 PrintBlock("Source Sink Example");
840 status = SourceSinkExample();
841 break;
842 case TABLE_SOURCE_SINK:
843 PrintBlock("Table Source Sink Example");
844 status = TableSourceSinkExample();
845 break;
846 case SCAN:
847 PrintBlock("Scan Example");
848 status = ScanSinkExample();
849 break;
850 case FILTER:
851 PrintBlock("Filter Example");
852 status = ScanFilterSinkExample();
853 break;
854 case PROJECT:
855 PrintBlock("Project Example");
856 status = ScanProjectSinkExample();
857 break;
858 case PROJECT_SEQUENCE:
859 PrintBlock("Project Example (using Declaration::Sequence)");
860 status = ScanProjectSequenceSinkExample();
861 break;
862 case GROUP_AGGREGATION:
863 PrintBlock("Aggregate Example");
864 status = SourceGroupAggregateSinkExample();
865 break;
866 case SCALAR_AGGREGATION:
867 PrintBlock("Aggregate Example");
868 status = SourceScalarAggregateSinkExample();
869 break;
870 case CONSUMING_SINK:
871 PrintBlock("Consuming-Sink Example");
872 status = SourceConsumingSinkExample();
873 break;
874 case ORDER_BY_SINK:
875 PrintBlock("OrderBy Example");
876 status = SourceOrderBySinkExample();
877 break;
878 case HASHJOIN:
879 PrintBlock("HashJoin Example");
880 status = SourceHashJoinSinkExample();
881 break;
882 case KSELECT:
883 PrintBlock("KSelect Example");
884 status = SourceKSelectExample();
885 break;
886 case WRITE:
887 PrintBlock("Write Example");
888 status = ScanFilterWriteExample(base_save_path);
889 break;
890 case UNION:
891 PrintBlock("Union Example");
892 status = SourceUnionSinkExample();
893 break;
894 case TABLE_SOURCE_TABLE_SINK:
895 PrintBlock("TableSink Example");
896 status = TableSinkExample();
897 break;
898 case RECORD_BATCH_READER_SOURCE:
899 PrintBlock("RecordBatchReaderSource Example");
900 status = RecordBatchReaderSourceSinkExample();
901 break;
902 default:
903 break;
904 }
905
906 if (status.ok()) {
907 return EXIT_SUCCESS;
908 } else {
909 std::cout << "Error occurred: " << status.message() << std::endl;
910 return EXIT_FAILURE;
911 }
912}