Acero: A C++ streaming execution engine¶
Warning
Acero is experimental and a stable API is not yet guaranteed.
Motivation¶
For many complex computations, successive direct invocation of compute functions is not feasible in either memory or computation time. Doing so causes all intermediate data to be fully materialized. To facilitate arbitrarily large inputs and more efficient resource usage, the Arrow C++ implementation also provides Acero, a streaming query engine with which computations can be formulated and executed.
Acero allows computation to be expressed as an "execution plan" (ExecPlan), which is a directed graph of operators. Each operator (ExecNode) provides, transforms, or consumes the data passing through it. Batches of data (ExecBatch) flow along the edges of the graph from node to node. Structuring the API around streams of batches allows the working set for each node to be tuned for optimal performance independent of any other nodes in the graph. Each ExecNode processes batches as they are pushed to it along an edge of the graph by upstream nodes (its inputs), and pushes batches along an edge of the graph to downstream nodes (its outputs) as they are finalized.
Substrait¶
In order to use Acero you will need to create an execution plan. This is the model that describes the computation you want to apply to your data. Acero has its own internal representation for execution plans but most users should not interact with this directly as it will couple their code to Acero.
Substrait is an open standard for execution plans. Acero implements the Substrait “consumer” interface. This means that Acero can accept a Substrait plan and fulfill the plan, loading the requested data and applying the desired computation. By using Substrait plans users can easily switch out to a different execution engine at a later time.
Substrait Conformance¶
Substrait defines a broad set of operators and functions for many different situations and it is unlikely that Acero will ever completely satisfy all defined Substrait operators and functions. To help understand what features are available the following sections define which features have been currently implemented in Acero and any caveats that apply.
Plans¶
A plan should have a single top-level relation.
The consumer is currently based on version 0.20.0 of Substrait. Any features added that are newer will not be supported.
Due to a breaking change in 0.20.0 any Substrait plan older than 0.20.0 will be rejected.
Extensions¶
If a plan contains any extension type variations it will be rejected.
Advanced extensions can be provided by supplying a custom implementation of
arrow::engine::ExtensionProvider
.
Relations (in general)¶
Any relation not explicitly listed below will not be supported and will cause the plan to be rejected.
Read Relations¶
The projection property is not supported and plans containing this property will be rejected.
The VirtualTable and ExtensionTable read types are not supported. Plans containing these types will be rejected.
Only the parquet and arrow file formats are currently supported.
All URIs must use the file scheme.
partition_index, start, and length are not supported. Plans containing non-default values for these properties will be rejected.
The Substrait spec requires that a filter be completely satisfied by a read relation. However, Acero only uses a read filter for pushdown projection and it may not be fully satisfied. Users should generally attach an additional filter relation with the same filter expression after the read relation.
Filter Relations¶
No known caveats
Project Relations¶
No known caveats
Join Relations¶
The join type
JOIN_TYPE_SINGLE
is not supported and plans containing this will be rejected.The join expression must be a call to either the
equal
oris_not_distinct_from
functions. Both arguments to the call must be direct references. Only a single join key is supported.The
post_join_filter
property is not supported and will be ignored.
Aggregate Relations¶
At most one grouping set is supported.
Each grouping expression must be a direct reference.
Each measure’s arguments must be direct references.
A measure may not have a filter
A measure may not have sorts
A measure’s invocation must be AGGREGATION_INVOCATION_ALL or AGGREGATION_INVOCATION_UNSPECIFIED
A measure’s phase must be AGGREGATION_PHASE_INITIAL_TO_RESULT
Expressions (general)¶
Various places in the Substrait spec allow for expressions to be used outside of a filter or project relation. For example, a join expression or an aggregate grouping set. Acero typically expects these expressions to be direct references. Planners should extract the implicit projection into a formal project relation before delivering the plan to Acero.
Literals¶
A literal with non-default nullability will cause a plan to be rejected.
Types¶
Acero does not have full support for non-nullable types and may allow input to have nulls without rejecting it.
The table below shows the mapping between Arrow types and Substrait type classes that are currently supported:

Substrait Type | Arrow Type | Caveat
boolean | boolean |
i8 | int8 |
i16 | int16 |
i32 | int32 |
i64 | int64 |
fp32 | float32 |
fp64 | float64 |
string | string |
binary | binary |
timestamp | timestamp<MICRO,""> |
timestamp_tz | timestamp<MICRO,"UTC"> |
date | date32<DAY> |
time | time64<MICRO> |
interval_year | Not currently supported |
interval_day | Not currently supported |
uuid | Not currently supported |
FIXEDCHAR<L> | Not currently supported |
VARCHAR<L> | Not currently supported |
FIXEDBINARY<L> | fixed_size_binary<L> |
DECIMAL<P,S> | decimal128<P,S> |
STRUCT<T1…TN> | struct<T1…TN> | Arrow struct fields will have no name (empty string)
NSTRUCT<N:T1…N:Tn> | Not currently supported |
LIST<T> | list<T> |
MAP<K,V> | map<K,V> | K must not be nullable
Functions¶
The following functions have caveats or are not supported at all. Note that this is not a comprehensive list. Functions are being added to Substrait at a rapid pace and new functions may be missing.
Acero does not support the SATURATE option for overflow
Acero does not support kernels that take more than two arguments for the and, or, and xor functions
Acero does not support temporal arithmetic
Acero does not support the following standard functions:
is_not_distinct_from
like
substring
starts_with
ends_with
contains
count
count_distinct
approx_count_distinct
The functions above should be referenced using the URI
https://github.com/apache/arrow/blob/main/format/substrait/extension_types.yaml
Alternatively, the URI can be left completely empty and Acero will match based only on function name. This fallback mechanism is non-standard and should be avoided if possible.
Architecture Overview¶
ExecNode
  Each node in the graph is an implementation of the ExecNode interface.
ExecPlan
  A set of ExecNodes is contained and (to an extent) coordinated by an ExecPlan.
ExecFactoryRegistry
  Instances of ExecNode are constructed by factory functions held in an ExecFactoryRegistry.
ExecNodeOptions
  Heterogeneous parameters for factories of ExecNode are bundled in an ExecNodeOptions.
Declaration
  A dplyr-inspired helper for efficient construction of an ExecPlan.
ExecBatch
  A lightweight container for a single chunk of data in the Arrow format. In contrast to RecordBatch, ExecBatch is intended for use exclusively in a streaming execution context (for example, it doesn't have a corresponding Python binding). Furthermore, columns which happen to have a constant value may be represented by a Scalar instead of an Array. In addition, ExecBatch may carry execution-relevant properties including a guaranteed-true filter for Expression simplification.
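As a rough sketch of that last point (not taken from the Arrow examples; the helper name is ours), an ExecBatch can mix a per-row Array column with a constant column carried as a Scalar:

#include <arrow/api.h>
#include <arrow/compute/api.h>

// Hypothetical helper: builds a three-row ExecBatch whose second column is the
// constant 42, carried as a Scalar rather than a materialized Array.
arrow::Result<arrow::compute::ExecBatch> MakeBatchWithConstantColumn() {
  arrow::Int32Builder builder;
  ARROW_RETURN_NOT_OK(builder.AppendValues({1, 2, 3}));
  ARROW_ASSIGN_OR_RAISE(std::shared_ptr<arrow::Array> values, builder.Finish());

  // Column 0 varies per row; column 1 is a constant Scalar shared by all rows.
  std::vector<arrow::Datum> columns = {values, arrow::MakeScalar(42)};
  return arrow::compute::ExecBatch(std::move(columns), /*length=*/3);
}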
An example ExecNode
implementation which simply passes all input batches
through unchanged:
class PassthruNode : public ExecNode {
 public:
  // InputReceived is the main entry point for ExecNodes. It is invoked
  // by an input of this node to push a batch here for processing.
  void InputReceived(ExecNode* input, ExecBatch batch) override {
    // Since this is a passthru node we simply push the batch to our
    // only output here.
    outputs_[0]->InputReceived(this, batch);
  }

  // ErrorReceived is called by an input of this node to report an error.
  // ExecNodes should always forward errors to their outputs unless they
  // are able to fully handle the error (this is rare).
  void ErrorReceived(ExecNode* input, Status error) override {
    outputs_[0]->ErrorReceived(this, error);
  }

  // InputFinished is used to signal how many batches will ultimately arrive.
  // It may be called with any ordering relative to InputReceived/ErrorReceived.
  void InputFinished(ExecNode* input, int total_batches) override {
    outputs_[0]->InputFinished(this, total_batches);
  }

  // ExecNodes may request that their inputs throttle production of batches
  // until they are ready for more, or stop production if no further batches
  // are required. These signals should typically be forwarded to the inputs
  // of the ExecNode.
  void ResumeProducing(ExecNode* output) override { inputs_[0]->ResumeProducing(this); }
  void PauseProducing(ExecNode* output) override { inputs_[0]->PauseProducing(this); }
  void StopProducing(ExecNode* output) override { inputs_[0]->StopProducing(this); }

  // An ExecNode has a single output schema to which all its batches conform.
  using ExecNode::output_schema;

  // ExecNodes carry basic introspection for debugging purposes
  const char* kind_name() const override { return "PassthruNode"; }
  using ExecNode::label;
  using ExecNode::SetLabel;
  using ExecNode::ToString;

  // An ExecNode holds references to its inputs and outputs, so it is possible
  // to walk the graph of execution if necessary.
  using ExecNode::inputs;
  using ExecNode::outputs;

  // StartProducing() and StopProducing() are invoked by an ExecPlan to
  // coordinate the graph-wide execution state. These do not need to be
  // forwarded to inputs or outputs.
  Status StartProducing() override { return Status::OK(); }
  void StopProducing() override {}
  Future<> finished() override { return inputs_[0]->finished(); }
};
Note that each method which is associated with an edge of the graph must be invoked
with an ExecNode*
to identify the node which invoked it. For example, in an
ExecNode
which implements JOIN
this tagging might be used to differentiate
between batches from the left or right inputs.
InputReceived, ErrorReceived, and InputFinished may only be invoked by
the inputs of a node, while ResumeProducing, PauseProducing, and StopProducing
may only be invoked by the outputs of a node.
ExecPlan
contains the associated instances of ExecNode
and is used to start and stop execution of all nodes and for querying/awaiting
their completion:
// construct an ExecPlan first to hold your nodes
ARROW_ASSIGN_OR_RAISE(auto plan, ExecPlan::Make(default_exec_context()));
// ... add nodes to your ExecPlan
// start all nodes in the graph
ARROW_RETURN_NOT_OK(plan->StartProducing());
SetUserCancellationCallback([plan] {
// stop all nodes in the graph
plan->StopProducing();
});
// Complete will be marked finished when all nodes have run to completion
// or acknowledged a StopProducing() signal. The ExecPlan should be kept
// alive until this future is marked finished.
Future<> complete = plan->finished();
Constructing ExecPlan objects¶
None of the concrete implementations of ExecNode
are exposed
in headers, so they can’t be constructed directly outside the
translation unit where they are defined. Instead, factories to
create them are provided in an extensible registry. This structure
provides a number of benefits:
This enforces consistent construction.
It decouples implementations from consumers of the interface (for example: we have two classes for scalar and grouped aggregate, we can choose which to construct within the single factory by checking whether grouping keys are provided)
This expedites integration with out-of-library extensions. For example "scan" nodes are implemented in the separate libarrow_dataset.so library.
Since the class is not referenceable outside the translation unit in which it is defined, compilers can optimize more aggressively.
Factories of ExecNode
can be retrieved by name from the registry.
The default registry is available through
arrow::compute::default_exec_factory_registry()
and can be queried for the built-in factories:
// get the factory for "filter" nodes:
ARROW_ASSIGN_OR_RAISE(auto make_filter,
                      default_exec_factory_registry()->GetFactory("filter"));

// factories take three arguments:
ARROW_ASSIGN_OR_RAISE(ExecNode* filter_node, make_filter(
    // the ExecPlan which should own this node
    plan.get(),
    // nodes which will send batches to this node (inputs)
    {scan_node},
    // parameters unique to "filter" nodes
    FilterNodeOptions{filter_expression}));

// alternative shorthand:
ARROW_ASSIGN_OR_RAISE(filter_node, MakeExecNode("filter",
    plan.get(), {scan_node}, FilterNodeOptions{filter_expression}));
Factories can also be added to the default registry as long as they are
convertible to std::function<Result<ExecNode*>(
ExecPlan*, std::vector<ExecNode*>, const ExecNodeOptions&)>
.
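For example, a custom node like the PassthruNode shown earlier could be registered under a name of our choosing. This is only a sketch: it assumes PassthruNode has a constructor taking the plan and its inputs, and relies on the registry's AddFactory method together with ExecPlan::EmplaceNode to hand ownership of the node to the plan.

// Sketch: register a factory for the (hypothetical) PassthruNode under the name "passthru",
// assuming a constructor of the form PassthruNode(ExecPlan*, std::vector<ExecNode*>).
ARROW_RETURN_NOT_OK(default_exec_factory_registry()->AddFactory(
    "passthru",
    [](ExecPlan* plan, std::vector<ExecNode*> inputs,
       const ExecNodeOptions& options) -> Result<ExecNode*> {
      // EmplaceNode constructs the node and transfers ownership to the plan.
      return plan->EmplaceNode<PassthruNode>(plan, std::move(inputs));
    }));

// The node can then be created by name like any built-in node:
ARROW_ASSIGN_OR_RAISE(ExecNode* passthru_node,
                      MakeExecNode("passthru", plan.get(), {source_node}, ExecNodeOptions{}));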
To build an ExecPlan
representing a simple pipeline which
reads from a RecordBatchReader
then filters, projects, and
writes to disk:
std::shared_ptr<RecordBatchReader> reader = GetStreamOfBatches();
ExecNode* source_node = *MakeExecNode("source", plan.get(), {},
                                      SourceNodeOptions::FromReader(
                                          reader,
                                          GetCpuThreadPool()));

ExecNode* filter_node = *MakeExecNode("filter", plan.get(), {source_node},
                                      FilterNodeOptions{
                                          greater(field_ref("score"), literal(3))});

ExecNode* project_node = *MakeExecNode("project", plan.get(), {filter_node},
                                       ProjectNodeOptions{
                                           {add(field_ref("score"), literal(1))},
                                           {"score + 1"}});

arrow::dataset::internal::Initialize();
MakeExecNode("write", plan.get(), {project_node},
             WriteNodeOptions{/*base_dir=*/"/dat", /*...*/});
Declaration
is a dplyr-inspired
helper which further decreases the boilerplate associated with populating
an ExecPlan
from C++:
arrow::dataset::internal::Initialize();

std::shared_ptr<RecordBatchReader> reader = GetStreamOfBatches();
ASSERT_OK(Declaration::Sequence(
              {
                  {"source", SourceNodeOptions::FromReader(
                                 reader,
                                 GetCpuThreadPool())},
                  {"filter", FilterNodeOptions{
                                 greater(field_ref("score"), literal(3))}},
                  {"project", ProjectNodeOptions{
                                  {add(field_ref("score"), literal(1))},
                                  {"score + 1"}}},
                  {"write", WriteNodeOptions{/*base_dir=*/"/dat", /*...*/}},
              })
              .AddToPlan(plan.get()));
Note that a source node can wrap anything which resembles a stream of batches.
For example, PR#11032 adds
support for use of a DuckDB query as a source node.
Similarly, a sink node can wrap anything which absorbs a stream of batches.
In the example above we’re writing completed
batches to disk. However we can also collect these in memory into a Table
or forward them to a RecordBatchReader
as an out-of-graph stream.
This flexibility allows an ExecPlan
to be used as streaming middleware
between any endpoints which support Arrow formatted batches.
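For instance, once a plan's terminal sink node has populated an AsyncGenerator (as shown in the sink examples later in this document), the output can be drained into a Table or exposed as a RecordBatchReader. This sketch assumes sink_gen and schema were filled in when the sink node was created:

// Sketch: turn the sink generator into a synchronous RecordBatchReader ...
std::shared_ptr<RecordBatchReader> sink_reader =
    MakeGeneratorReader(schema, std::move(sink_gen), default_memory_pool());

// ... then collect everything into an in-memory Table,
ARROW_ASSIGN_OR_RAISE(std::shared_ptr<Table> table,
                      Table::FromRecordBatchReader(sink_reader.get()));
// or hand sink_reader to any consumer that accepts Arrow record batches.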
An arrow::dataset::Dataset
can also be wrapped as a source node which
pushes all the dataset’s batches into an ExecPlan
. This factory is added
to the default registry with the name "scan"
by calling
arrow::dataset::internal::Initialize()
:
arrow::dataset::internal::Initialize();

std::shared_ptr<Dataset> dataset = GetDataset();

ASSERT_OK(Declaration::Sequence(
              {
                  {"scan", ScanNodeOptions{dataset,
                                           /* push down predicate, projection, ... */}},
                  {"filter", FilterNodeOptions{/* ... */}},
                  // ...
              })
              .AddToPlan(plan.get()));
Datasets may be scanned multiple times; just make multiple scan nodes from that dataset. (Useful for a self-join, for example.) Note that producing two scan nodes like this will perform all reads and decodes twice.
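A sketch of such a self-join, assuming the dataset has an "id" column to join on (the column name and suffixes are purely illustrative), could look like this:

auto scan_options = std::make_shared<arrow::dataset::ScanOptions>();
scan_options->projection = project({}, {});  // empty projection

// Two scan declarations over the same dataset; each performs its own read/decode.
Declaration left{"scan", ScanNodeOptions{dataset, scan_options}};
Declaration right{"scan", ScanNodeOptions{dataset, scan_options}};

// Join the dataset with itself on the (illustrative) "id" column.
HashJoinNodeOptions join_opts{JoinType::INNER,
                              /*left_keys=*/{"id"},
                              /*right_keys=*/{"id"},
                              literal(true), "l_", "r_"};
Declaration self_join{"hashjoin", {std::move(left), std::move(right)},
                      std::move(join_opts)};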
Constructing ExecNode using Options¶
ExecNode is the component we use as a building block, and Acero provides in-built implementations covering a variety of operations.
The following table lists the operations associated with the execution plan and the options used to configure them:
Operation | Options
source | arrow::compute::SourceNodeOptions
table_source | arrow::compute::TableSourceNodeOptions
filter | arrow::compute::FilterNodeOptions
project | arrow::compute::ProjectNodeOptions
aggregate | arrow::compute::AggregateNodeOptions
sink | arrow::compute::SinkNodeOptions
consuming_sink | arrow::compute::ConsumingSinkNodeOptions
order_by_sink | arrow::compute::OrderBySinkNodeOptions
select_k_sink | arrow::compute::SelectKSinkNodeOptions
scan | arrow::dataset::ScanNodeOptions
hash_join | arrow::compute::HashJoinNodeOptions
write | arrow::dataset::WriteNodeOptions
union | N/A
table_sink | arrow::compute::TableSinkNodeOptions
source¶
A source operation can be considered as an entry point to create a streaming execution plan. arrow::compute::SourceNodeOptions are used to create the source operation. The source operation is the most generic and flexible type of source currently available but it can be quite tricky to configure. To process data from files the scan operation is likely a simpler choice.
The source node requires some kind of function that can be called to poll for more data. This
function should take no arguments and should return an
arrow::Future<std::optional<arrow::ExecBatch>>
.
This function might be reading a file, iterating through an in memory structure, or receiving data
from a network connection. The arrow library refers to these functions as arrow::AsyncGenerator
and there are a number of utilities for working with these functions. For this example we use
a vector of record batches that we’ve already stored in memory.
In addition, the schema of the data must be known up front. Acero must know the schema of the data
at each stage of the execution graph before any processing has begun. This means we must supply the
schema for a source node separately from the data itself.
Here we define a struct to hold the data generator definition. This includes the in-memory batches, the schema, and a function that serves as a data generator:
156struct BatchesWithSchema {
157 std::vector<cp::ExecBatch> batches;
158 std::shared_ptr<arrow::Schema> schema;
159 // This method uses internal arrow utilities to
160 // convert a vector of record batches to an AsyncGenerator of optional batches
161 arrow::AsyncGenerator<std::optional<cp::ExecBatch>> gen() const {
162 auto opt_batches = ::arrow::internal::MapVector(
163 [](cp::ExecBatch batch) { return std::make_optional(std::move(batch)); },
164 batches);
165 arrow::AsyncGenerator<std::optional<cp::ExecBatch>> gen;
166 gen = arrow::MakeVectorGenerator(std::move(opt_batches));
167 return gen;
168 }
169};
Generating sample batches for computation:
173arrow::Result<BatchesWithSchema> MakeBasicBatches() {
174 BatchesWithSchema out;
175 auto field_vector = {arrow::field("a", arrow::int32()),
176 arrow::field("b", arrow::boolean())};
177 ARROW_ASSIGN_OR_RAISE(auto b1_int, GetArrayDataSample<arrow::Int32Type>({0, 4}));
178 ARROW_ASSIGN_OR_RAISE(auto b2_int, GetArrayDataSample<arrow::Int32Type>({5, 6, 7}));
179 ARROW_ASSIGN_OR_RAISE(auto b3_int, GetArrayDataSample<arrow::Int32Type>({8, 9, 10}));
180
181 ARROW_ASSIGN_OR_RAISE(auto b1_bool,
182 GetArrayDataSample<arrow::BooleanType>({false, true}));
183 ARROW_ASSIGN_OR_RAISE(auto b2_bool,
184 GetArrayDataSample<arrow::BooleanType>({true, false, true}));
185 ARROW_ASSIGN_OR_RAISE(auto b3_bool,
186 GetArrayDataSample<arrow::BooleanType>({false, true, false}));
187
188 ARROW_ASSIGN_OR_RAISE(auto b1,
189 GetExecBatchFromVectors(field_vector, {b1_int, b1_bool}));
190 ARROW_ASSIGN_OR_RAISE(auto b2,
191 GetExecBatchFromVectors(field_vector, {b2_int, b2_bool}));
192 ARROW_ASSIGN_OR_RAISE(auto b3,
193 GetExecBatchFromVectors(field_vector, {b3_int, b3_bool}));
194
195 out.batches = {b1, b2, b3};
196 out.schema = arrow::schema(field_vector);
197 return out;
198}
Example of using source
(usage of sink is explained in detail in sink):
294/// \brief An example demonstrating a source and sink node
295///
296/// Source-Table Example
297/// This example shows how a custom source can be used
298/// in an execution plan. This includes source node using pregenerated
299/// data and collecting it into a table.
300///
301/// This sort of custom source is often not needed. In most cases you can
302/// use a scan (for a dataset source) or a source like table_source, array_vector_source,
303/// exec_batch_source, or record_batch_source (for in-memory data)
304arrow::Status SourceSinkExample() {
305 ARROW_ASSIGN_OR_RAISE(auto basic_data, MakeBasicBatches());
306
307 auto source_node_options = ac::SourceNodeOptions{basic_data.schema, basic_data.gen()};
308
309 ac::Declaration source{"source", std::move(source_node_options)};
310
311 return ExecutePlanAndCollectAsTable(std::move(source));
312}
table_source¶
In the previous source example, a source node was used to input the data. But when developing an application, if the data is already in memory as a table, it is much easier and more performant to use arrow::compute::TableSourceNodeOptions.
Here the input data can be passed as a std::shared_ptr<arrow::Table>
along with a max_batch_size
.
The max_batch_size
is to break up large record batches so that they can be processed in parallel.
It is important to note that the table batches will not get merged to form larger batches when the source
table has a smaller batch size.
Example of using table_source
317/// \brief An example showing a table source node
318///
319/// TableSource-Table Example
320/// This example shows how a table_source can be used
321/// in an execution plan. This includes a table source node
322/// receiving data from a table. This plan simply collects the
323/// data back into a table but nodes could be added that modify
324/// or transform the data as well (as is shown in later examples)
325arrow::Status TableSourceSinkExample() {
326 ARROW_ASSIGN_OR_RAISE(auto table, GetTable());
327
328 arrow::AsyncGenerator<std::optional<cp::ExecBatch>> sink_gen;
329 int max_batch_size = 2;
330 auto table_source_options = ac::TableSourceNodeOptions{table, max_batch_size};
331
332 ac::Declaration source{"table_source", std::move(table_source_options)};
333
334 return ExecutePlanAndCollectAsTable(std::move(source));
335}
filter¶
The filter operation, as the name suggests, provides an option to define data filtering criteria. It selects rows where the given expression evaluates to true. Filters can be written using arrow::compute::Expression, and the expression should have a return type of boolean. For example, if we wish to keep rows where the value of column a is greater than 3, then we can use the following expression.
Filter example:
340/// \brief An example showing a filter node
341///
342/// Source-Filter-Table
343/// This example shows how a filter can be used in an execution plan,
344/// to filter data from a source. The output from the execution plan
345/// is collected into a table.
346arrow::Status ScanFilterSinkExample() {
347 ARROW_ASSIGN_OR_RAISE(std::shared_ptr<arrow::dataset::Dataset> dataset, GetDataset());
348
349 auto options = std::make_shared<arrow::dataset::ScanOptions>();
350 // specify the filter. This filter keeps only rows where the
351 // value of the "a" column is greater than 3.
352 cp::Expression filter_expr = cp::greater(cp::field_ref("a"), cp::literal(3));
353 // set filter for scanner : on-disk / push-down filtering.
354 // This step can be skipped if you are not reading from disk.
355 options->filter = filter_expr;
356 // empty projection
357 options->projection = cp::project({}, {});
358
359 // construct the scan node
360 std::cout << "Initialized Scanning Options" << std::endl;
361
362 auto scan_node_options = arrow::dataset::ScanNodeOptions{dataset, options};
363 std::cout << "Scan node options created" << std::endl;
364
365 ac::Declaration scan{"scan", std::move(scan_node_options)};
366
367 // pipe the scan node into the filter node
368 // Need to set the filter in scan node options and filter node options.
369 // At scan node it is used for on-disk / push-down filtering.
370 // At filter node it is used for in-memory filtering.
371 ac::Declaration filter{
372 "filter", {std::move(scan)}, ac::FilterNodeOptions(std::move(filter_expr))};
373
374 return ExecutePlanAndCollectAsTable(std::move(filter));
375}
project¶
The project operation rearranges, deletes, transforms, and creates columns.
Each output column is computed by evaluating an expression
against the source record batch. These must be scalar expressions
(expressions consisting of scalar literals, field references and scalar
functions, i.e. elementwise functions that return one value for each input
row independent of the value of all other rows).
This is exposed via arrow::compute::ProjectNodeOptions which requires an arrow::compute::Expression and a name for each of the output columns (if names are not provided, the string representations of exprs will be used).
Project example:
381/// \brief An example showing a project node
382///
383/// Scan-Project-Table
384/// This example shows how a Scan operation can be used to load the data
385/// into the execution plan, how a project operation can be applied on the
386/// data stream and how the output is collected into a table
387arrow::Status ScanProjectSinkExample() {
388 ARROW_ASSIGN_OR_RAISE(std::shared_ptr<arrow::dataset::Dataset> dataset, GetDataset());
389
390 auto options = std::make_shared<arrow::dataset::ScanOptions>();
391 // projection
392 cp::Expression a_times_2 = cp::call("multiply", {cp::field_ref("a"), cp::literal(2)});
393 options->projection = cp::project({}, {});
394
395 auto scan_node_options = arrow::dataset::ScanNodeOptions{dataset, options};
396
397 ac::Declaration scan{"scan", std::move(scan_node_options)};
398 ac::Declaration project{
399 "project", {std::move(scan)}, ac::ProjectNodeOptions({a_times_2})};
400
401 return ExecutePlanAndCollectAsTable(std::move(project));
402}
aggregate¶
The aggregate node computes various types of aggregates over data.
Arrow supports two types of aggregates: “scalar” aggregates, and
“hash” aggregates. Scalar aggregates reduce an array or scalar input
to a single scalar output (e.g. computing the mean of a column). Hash
aggregates act like GROUP BY
in SQL and first partition data based
on one or more key columns, then reduce the data in each
partition. The aggregate
node supports both types of computation,
and can compute any number of aggregations at once.
arrow::compute::AggregateNodeOptions
is used to define the
aggregation criteria. It takes a list of aggregation functions and
their options; a list of target fields to aggregate, one per function;
and a list of names for the output fields, one per function.
Optionally, it takes a list of columns that are used to partition the
data, in the case of a hash aggregation. The aggregation functions
can be selected from this list of aggregation functions.
Note
This node is a “pipeline breaker” and will fully materialize the dataset in memory. In the future, spillover mechanisms will be added which should alleviate this constraint.
The aggregation can provide results as a group or scalar. For instance, an operation like hash_count provides the count for each unique record as a grouped result, while an operation like sum provides a single record.
Scalar Aggregation example:
408/// \brief An example showing an aggregation node to aggregate an entire table
409///
410/// Source-Aggregation-Table
411/// This example shows how an aggregation operation can be applied on a
412/// execution plan resulting in a scalar output. The source node loads the
413/// data and the aggregation (here, the sum of the values in column 'a')
414/// is applied on this data. The output is collected into a table (that will
415/// have exactly one row)
416arrow::Status SourceScalarAggregateSinkExample() {
417 ARROW_ASSIGN_OR_RAISE(auto basic_data, MakeBasicBatches());
418
419 auto source_node_options = ac::SourceNodeOptions{basic_data.schema, basic_data.gen()};
420
421 ac::Declaration source{"source", std::move(source_node_options)};
422 auto aggregate_options =
423 ac::AggregateNodeOptions{/*aggregates=*/{{"sum", nullptr, "a", "sum(a)"}}};
424 ac::Declaration aggregate{
425 "aggregate", {std::move(source)}, std::move(aggregate_options)};
426
427 return ExecutePlanAndCollectAsTable(std::move(aggregate));
428}
Group Aggregation example:
433/// \brief An example showing an aggregation node to perform a group-by operation
434///
435/// Source-Aggregation-Table
436/// This example shows how an aggregation operation can be applied on a
437/// execution plan resulting in grouped output. The source node loads the
438/// data and the aggregation (counting the non-null values in column 'a' per group) is
439/// applied on this data. The output is collected into a table that will contain
440/// one row for each unique combination of group keys.
441arrow::Status SourceGroupAggregateSinkExample() {
442 ARROW_ASSIGN_OR_RAISE(auto basic_data, MakeBasicBatches());
443
444 arrow::AsyncGenerator<std::optional<cp::ExecBatch>> sink_gen;
445
446 auto source_node_options = ac::SourceNodeOptions{basic_data.schema, basic_data.gen()};
447
448 ac::Declaration source{"source", std::move(source_node_options)};
449 auto options = std::make_shared<cp::CountOptions>(cp::CountOptions::ONLY_VALID);
450 auto aggregate_options =
451 ac::AggregateNodeOptions{/*aggregates=*/{{"hash_count", options, "a", "count(a)"}},
452 /*keys=*/{"b"}};
453 ac::Declaration aggregate{
454 "aggregate", {std::move(source)}, std::move(aggregate_options)};
455
456 return ExecutePlanAndCollectAsTable(std::move(aggregate));
457}
sink¶
The sink operation provides output and is the final node of a streaming execution definition. The arrow::compute::SinkNodeOptions interface is used to pass the required options. Similar to the source operator, the sink operator exposes the output with a function that returns a record batch future each time it is called. It is expected the caller will repeatedly call this function until the generator function is exhausted (returns std::optional::nullopt). If this function is not called often enough then record batches will accumulate in memory. An execution plan should only have one "terminal" node (one sink node). An ExecPlan can terminate early due to cancellation or an error, before the output is fully consumed. However, the plan can be safely destroyed independently of the sink, which will hold the unconsumed batches by exec_plan->finished().
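As a sketch, assuming a plan and a source ExecNode already exist and schema describes the source's output, a basic sink node can be attached and its generator wrapped in a reader as follows:

// Declare the generator the sink node will populate
arrow::AsyncGenerator<std::optional<cp::ExecBatch>> sink_gen;
ARROW_RETURN_NOT_OK(ac::MakeExecNode("sink", plan.get(), {source},
                                     ac::SinkNodeOptions{&sink_gen}));

// Wrap the generator in a RecordBatchReader that the caller can drain
std::shared_ptr<arrow::RecordBatchReader> sink_reader =
    ac::MakeGeneratorReader(schema, std::move(sink_gen), arrow::default_memory_pool());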
As a part of the Source Example, the sink operation is also included:
294/// \brief An example demonstrating a source and sink node
295///
296/// Source-Table Example
297/// This example shows how a custom source can be used
298/// in an execution plan. This includes source node using pregenerated
299/// data and collecting it into a table.
300///
301/// This sort of custom source is often not needed. In most cases you can
302/// use a scan (for a dataset source) or a source like table_source, array_vector_source,
303/// exec_batch_source, or record_batch_source (for in-memory data)
304arrow::Status SourceSinkExample() {
305 ARROW_ASSIGN_OR_RAISE(auto basic_data, MakeBasicBatches());
306
307 auto source_node_options = ac::SourceNodeOptions{basic_data.schema, basic_data.gen()};
308
309 ac::Declaration source{"source", std::move(source_node_options)};
310
311 return ExecutePlanAndCollectAsTable(std::move(source));
312}
consuming_sink¶
The consuming_sink operator is a sink operation containing a consuming operation within the execution plan (i.e. the exec plan should not complete until the consumption has completed). Unlike the sink node, this node takes in a callback function that is expected to consume the batch. Once this callback has finished, the execution plan will no longer hold any reference to the batch.
The consuming function may be called before a previous invocation has completed. If the consuming
function does not run quickly enough then many concurrent executions could pile up, blocking the
CPU thread pool. The execution plan will not be marked finished until all consuming function callbacks
have been completed.
Once all batches have been delivered the execution plan will wait for the finish future to complete
before marking the execution plan finished. This allows for workflows where the consumption function
converts batches into async tasks (this is currently done internally for the dataset write node).
Example:
// define a Custom SinkNodeConsumer
std::atomic<uint32_t> batches_seen{0};
arrow::Future<> finish = arrow::Future<>::Make();

struct CustomSinkNodeConsumer : public cp::SinkNodeConsumer {
  CustomSinkNodeConsumer(std::atomic<uint32_t>* batches_seen, arrow::Future<> finish)
      : batches_seen(batches_seen), finish(std::move(finish)) {}

  // Consumption logic can be written here
  arrow::Status Consume(cp::ExecBatch batch) override {
    // data can be consumed in the expected way
    // transfer to another system or just do some work
    // and write to disk
    (*batches_seen)++;
    return arrow::Status::OK();
  }

  arrow::Future<> Finish() override { return finish; }

  std::atomic<uint32_t>* batches_seen;
  arrow::Future<> finish;
};

std::shared_ptr<CustomSinkNodeConsumer> consumer =
    std::make_shared<CustomSinkNodeConsumer>(&batches_seen, finish);

arrow::compute::ExecNode* consuming_sink;

ARROW_ASSIGN_OR_RAISE(consuming_sink,
                      MakeExecNode("consuming_sink", plan.get(), {source},
                                   cp::ConsumingSinkNodeOptions(consumer)));
Consuming-Sink example:
462/// \brief An example showing a consuming sink node
463///
464/// Source-Consuming-Sink
465/// This example shows how the data can be consumed within the execution plan
466/// by using a ConsumingSink node. There is no data output from this execution plan.
467arrow::Status SourceConsumingSinkExample() {
468 ARROW_ASSIGN_OR_RAISE(auto basic_data, MakeBasicBatches());
469
470 auto source_node_options = ac::SourceNodeOptions{basic_data.schema, basic_data.gen()};
471
472 ac::Declaration source{"source", std::move(source_node_options)};
473
474 std::atomic<uint32_t> batches_seen{0};
475 arrow::Future<> finish = arrow::Future<>::Make();
476 struct CustomSinkNodeConsumer : public ac::SinkNodeConsumer {
477 CustomSinkNodeConsumer(std::atomic<uint32_t>* batches_seen, arrow::Future<> finish)
478 : batches_seen(batches_seen), finish(std::move(finish)) {}
479
480 arrow::Status Init(const std::shared_ptr<arrow::Schema>& schema,
481 ac::BackpressureControl* backpressure_control,
482 ac::ExecPlan* plan) override {
483 // This will be called as the plan is started (before the first call to Consume)
484 // and provides the schema of the data coming into the node, controls for pausing /
485 // resuming input, and a pointer to the plan itself which can be used to access
486 // other utilities such as the thread indexer or async task scheduler.
487 return arrow::Status::OK();
488 }
489
490 arrow::Status Consume(cp::ExecBatch batch) override {
491 (*batches_seen)++;
492 return arrow::Status::OK();
493 }
494
495 arrow::Future<> Finish() override {
496 // Here you can perform whatever (possibly async) cleanup is needed, e.g. closing
497 // output file handles and flushing remaining work
498 return arrow::Future<>::MakeFinished();
499 }
500
501 std::atomic<uint32_t>* batches_seen;
502 arrow::Future<> finish;
503 };
504 std::shared_ptr<CustomSinkNodeConsumer> consumer =
505 std::make_shared<CustomSinkNodeConsumer>(&batches_seen, finish);
506
507 ac::Declaration consuming_sink{"consuming_sink",
508 {std::move(source)},
509 ac::ConsumingSinkNodeOptions(std::move(consumer))};
510
511 // Since we are consuming the data within the plan there is no output and we simply
512 // run the plan to completion instead of collecting into a table.
513 ARROW_RETURN_NOT_OK(ac::DeclarationToStatus(std::move(consuming_sink)));
514
515 std::cout << "The consuming sink node saw " << batches_seen.load() << " batches"
516 << std::endl;
517 return arrow::Status::OK();
518}
order_by_sink¶
The order_by_sink operation is an extension to the sink operation. This operation provides the ability to guarantee the ordering of the stream by providing the arrow::compute::OrderBySinkNodeOptions. Here the arrow::compute::SortOptions are provided to define which columns are used for sorting and whether to sort by ascending or descending values.
Note
This node is a “pipeline breaker” and will fully materialize the dataset in memory. In the future, spillover mechanisms will be added which should alleviate this constraint.
Order-By-Sink example:
523arrow::Status ExecutePlanAndCollectAsTableWithCustomSink(
524 std::shared_ptr<ac::ExecPlan> plan, std::shared_ptr<arrow::Schema> schema,
525 arrow::AsyncGenerator<std::optional<cp::ExecBatch>> sink_gen) {
526 // translate sink_gen (async) to sink_reader (sync)
527 std::shared_ptr<arrow::RecordBatchReader> sink_reader =
528 ac::MakeGeneratorReader(schema, std::move(sink_gen), arrow::default_memory_pool());
529
530 // validate the ExecPlan
531 ARROW_RETURN_NOT_OK(plan->Validate());
532 std::cout << "ExecPlan created : " << plan->ToString() << std::endl;
533 // start the ExecPlan
534 plan->StartProducing();
535
536 // collect sink_reader into a Table
537 std::shared_ptr<arrow::Table> response_table;
538
539 ARROW_ASSIGN_OR_RAISE(response_table,
540 arrow::Table::FromRecordBatchReader(sink_reader.get()));
541
542 std::cout << "Results : " << response_table->ToString() << std::endl;
543
544 // stop producing
545 plan->StopProducing();
546 // plan mark finished
547 auto future = plan->finished();
548 return future.status();
549}
550
551/// \brief An example showing an order-by node
552///
553/// Source-OrderBy-Sink
554/// In this example, the data enters through the source node
555/// and the data is ordered in the sink node. The order can be
556/// ASCENDING or DESCENDING and it is configurable. The output
557/// is obtained as a table from the sink node.
558arrow::Status SourceOrderBySinkExample() {
559 ARROW_ASSIGN_OR_RAISE(std::shared_ptr<ac::ExecPlan> plan,
560 ac::ExecPlan::Make(*cp::threaded_exec_context()));
561
562 ARROW_ASSIGN_OR_RAISE(auto basic_data, MakeSortTestBasicBatches());
563
564 arrow::AsyncGenerator<std::optional<cp::ExecBatch>> sink_gen;
565
566 auto source_node_options = ac::SourceNodeOptions{basic_data.schema, basic_data.gen()};
567 ARROW_ASSIGN_OR_RAISE(ac::ExecNode * source,
568 ac::MakeExecNode("source", plan.get(), {}, source_node_options));
569
570 ARROW_RETURN_NOT_OK(ac::MakeExecNode(
571 "order_by_sink", plan.get(), {source},
572 ac::OrderBySinkNodeOptions{
573 cp::SortOptions{{cp::SortKey{"a", cp::SortOrder::Descending}}}, &sink_gen}));
574
575 return ExecutePlanAndCollectAsTableWithCustomSink(plan, basic_data.schema, sink_gen);
576}
select_k_sink¶
The select_k_sink option enables selecting the top/bottom K elements, similar to a SQL ORDER BY ... LIMIT K clause. This is configured via arrow::compute::SelectKOptions, which is defined using the OrderBySinkNode definition. This option returns a sink node that receives inputs and then computes top_k/bottom_k.
Note
This node is a “pipeline breaker” and will fully materialize the input in memory. In the future, spillover mechanisms will be added which should alleviate this constraint.
SelectK example:
609/// \brief An example showing a select-k node
610///
611/// Source-KSelect
612/// This example shows how K number of elements can be selected
613/// either from the top or bottom. The output node is a modified
614/// sink node where output can be obtained as a table.
615arrow::Status SourceKSelectExample() {
616 ARROW_ASSIGN_OR_RAISE(auto input, MakeGroupableBatches());
617 ARROW_ASSIGN_OR_RAISE(std::shared_ptr<ac::ExecPlan> plan,
618 ac::ExecPlan::Make(*cp::threaded_exec_context()));
619 arrow::AsyncGenerator<std::optional<cp::ExecBatch>> sink_gen;
620
621 ARROW_ASSIGN_OR_RAISE(
622 ac::ExecNode * source,
623 ac::MakeExecNode("source", plan.get(), {},
624 ac::SourceNodeOptions{input.schema, input.gen()}));
625
626 cp::SelectKOptions options = cp::SelectKOptions::TopKDefault(/*k=*/2, {"i32"});
627
628 ARROW_RETURN_NOT_OK(ac::MakeExecNode("select_k_sink", plan.get(), {source},
629 ac::SelectKSinkNodeOptions{options, &sink_gen}));
630
631 auto schema = arrow::schema(
632 {arrow::field("i32", arrow::int32()), arrow::field("str", arrow::utf8())});
633
634 return ExecutePlanAndCollectAsTableWithCustomSink(plan, schema, sink_gen);
635}
table_sink¶
The table_sink
node provides the ability to receive the output as an in-memory table.
This is simpler to use than the other sink nodes provided by the streaming execution engine
but it only makes sense when the output fits comfortably in memory.
The node is created using arrow::compute::TableSinkNodeOptions
.
Example of using table_sink
727/// \brief An example showing a table sink node
728///
729/// TableSink Example
730/// This example shows how a table_sink can be used
731/// in an execution plan. This includes a source node
732/// receiving data as batches and the table sink node
733/// which emits the output as a table.
734arrow::Status TableSinkExample() {
735 ARROW_ASSIGN_OR_RAISE(std::shared_ptr<ac::ExecPlan> plan,
736 ac::ExecPlan::Make(*cp::threaded_exec_context()));
737
738 ARROW_ASSIGN_OR_RAISE(auto basic_data, MakeBasicBatches());
739
740 auto source_node_options = ac::SourceNodeOptions{basic_data.schema, basic_data.gen()};
741
742 ARROW_ASSIGN_OR_RAISE(ac::ExecNode * source,
743 ac::MakeExecNode("source", plan.get(), {}, source_node_options));
744
745 std::shared_ptr<arrow::Table> output_table;
746 auto table_sink_options = ac::TableSinkNodeOptions{&output_table};
747
748 ARROW_RETURN_NOT_OK(
749 ac::MakeExecNode("table_sink", plan.get(), {source}, table_sink_options));
750 // validate the ExecPlan
751 ARROW_RETURN_NOT_OK(plan->Validate());
752 std::cout << "ExecPlan created : " << plan->ToString() << std::endl;
753 // start the ExecPlan
754 plan->StartProducing();
755
756 // Wait for the plan to finish
757 auto finished = plan->finished();
758 RETURN_NOT_OK(finished.status());
759 std::cout << "Results : " << output_table->ToString() << std::endl;
760 return arrow::Status::OK();
761}
scan¶
scan
is an operation used to load and process datasets. It should be preferred over the
more generic source
node when your input is a dataset. The behavior is defined using
arrow::dataset::ScanNodeOptions
. More information on datasets and the various
scan options can be found in Tabular Datasets.
This node is capable of applying pushdown filters to the file readers which reduce the amount of data that needs to be read. This means you may supply the same filter expression to the scan node that you also supply to the FilterNode because the filtering is done in two different places.
Scan example:
271/// \brief An example demonstrating a scan and sink node
272///
273/// Scan-Table
274/// This example shows how scan operation can be applied on a dataset.
275/// There are operations that can be applied on the scan (project, filter)
276/// and the input data can be processed. The output is obtained as a table
277arrow::Status ScanSinkExample() {
278 ARROW_ASSIGN_OR_RAISE(std::shared_ptr<arrow::dataset::Dataset> dataset, GetDataset());
279
280 auto options = std::make_shared<arrow::dataset::ScanOptions>();
281 options->projection = cp::project({}, {}); // create empty projection
282
283 // construct the scan node
284 auto scan_node_options = arrow::dataset::ScanNodeOptions{dataset, options};
285
286 ac::Declaration scan{"scan", std::move(scan_node_options)};
287
288 return ExecutePlanAndCollectAsTable(std::move(scan));
289}
write¶
The write
node saves query results as a dataset of files in a
format like Parquet, Feather, CSV, etc. using the Tabular Datasets
functionality in Arrow. The write options are provided via the
arrow::dataset::WriteNodeOptions
which in turn contains
arrow::dataset::FileSystemDatasetWriteOptions
.
arrow::dataset::FileSystemDatasetWriteOptions
provides
control over the written dataset, including options like the output
directory, file naming scheme, and so on.
Write example:
641/// \brief An example showing a write node
642/// \param file_path The destination to write to
643///
644/// Scan-Filter-Write
645/// This example shows how scan node can be used to load the data
646/// and after processing how it can be written to disk.
647arrow::Status ScanFilterWriteExample(const std::string& file_path) {
648 ARROW_ASSIGN_OR_RAISE(std::shared_ptr<arrow::dataset::Dataset> dataset, GetDataset());
649
650 auto options = std::make_shared<arrow::dataset::ScanOptions>();
651 // empty projection
652 options->projection = cp::project({}, {});
653
654 auto scan_node_options = arrow::dataset::ScanNodeOptions{dataset, options};
655
656 ac::Declaration scan{"scan", std::move(scan_node_options)};
657
658 arrow::AsyncGenerator<std::optional<cp::ExecBatch>> sink_gen;
659
660 std::string root_path = "";
661 std::string uri = "file://" + file_path;
662 ARROW_ASSIGN_OR_RAISE(std::shared_ptr<arrow::fs::FileSystem> filesystem,
663 arrow::fs::FileSystemFromUri(uri, &root_path));
664
665 auto base_path = root_path + "/parquet_dataset";
666 // Uncomment the following line, if run repeatedly
667 // ARROW_RETURN_NOT_OK(filesystem->DeleteDirContents(base_path));
668 ARROW_RETURN_NOT_OK(filesystem->CreateDir(base_path));
669
670 // The partition schema determines which fields are part of the partitioning.
671 auto partition_schema = arrow::schema({arrow::field("a", arrow::int32())});
672 // We'll use Hive-style partitioning,
673 // which creates directories with "key=value" pairs.
674
675 auto partitioning =
676 std::make_shared<arrow::dataset::HivePartitioning>(partition_schema);
677 // We'll write Parquet files.
678 auto format = std::make_shared<arrow::dataset::ParquetFileFormat>();
679
680 arrow::dataset::FileSystemDatasetWriteOptions write_options;
681 write_options.file_write_options = format->DefaultWriteOptions();
682 write_options.filesystem = filesystem;
683 write_options.base_dir = base_path;
684 write_options.partitioning = partitioning;
685 write_options.basename_template = "part{i}.parquet";
686
687 arrow::dataset::WriteNodeOptions write_node_options{write_options};
688
689 ac::Declaration write{"write", {std::move(scan)}, std::move(write_node_options)};
690
691 // Since the write node has no output we simply run the plan to completion and the
692 // data should be written
693 ARROW_RETURN_NOT_OK(ac::DeclarationToStatus(std::move(write)));
694
695 std::cout << "Dataset written to " << base_path << std::endl;
696 return arrow::Status::OK();
697}
union¶
union
merges multiple data streams with the same schema into one, similar to
a SQL UNION ALL
clause.
The following example demonstrates how this can be achieved using two data sources.
Union example:
703/// \brief An example showing a union node
704///
705/// Source-Union-Table
706/// This example shows how a union operation can be applied on two
707/// data sources. The output is collected into a table.
708arrow::Status SourceUnionSinkExample() {
709 ARROW_ASSIGN_OR_RAISE(auto basic_data, MakeBasicBatches());
710
711 ac::Declaration lhs{"source",
712 ac::SourceNodeOptions{basic_data.schema, basic_data.gen()}};
713 lhs.label = "lhs";
714 ac::Declaration rhs{"source",
715 ac::SourceNodeOptions{basic_data.schema, basic_data.gen()}};
716 rhs.label = "rhs";
717 ac::Declaration union_plan{
718 "union", {std::move(lhs), std::move(rhs)}, ac::ExecNodeOptions{}};
719
720 return ExecutePlanAndCollectAsTable(std::move(union_plan));
721}
hash_join¶
The hash_join operation provides the relational algebra join operation using a hash-based algorithm. arrow::compute::HashJoinNodeOptions contains the options required to define a join. The hash_join supports left/right/full semi/anti/outer joins. Also, the join key (i.e. the column(s) to join on) and suffixes (i.e. a suffix term like "_x" which can be appended as a suffix for column names duplicated in both left and right relations) can be set via the join options.
Read more on hash-joins.
Hash-Join example:
582/// \brief An example showing a hash join node
583///
584/// Source-HashJoin-Table
585/// This example shows how source node gets the data and how a self-join
586/// is applied on the data. The join options are configurable. The output
587/// is collected into a table.
588arrow::Status SourceHashJoinSinkExample() {
589 ARROW_ASSIGN_OR_RAISE(auto input, MakeGroupableBatches());
590
591 ac::Declaration left{"source", ac::SourceNodeOptions{input.schema, input.gen()}};
592 ac::Declaration right{"source", ac::SourceNodeOptions{input.schema, input.gen()}};
593
594 ac::HashJoinNodeOptions join_opts{
595 ac::JoinType::INNER,
596 /*left_keys=*/{"str"},
597 /*right_keys=*/{"str"}, cp::literal(true), "l_", "r_"};
598
599 ac::Declaration hashjoin{
600 "hashjoin", {std::move(left), std::move(right)}, std::move(join_opts)};
601
602 return ExecutePlanAndCollectAsTable(std::move(hashjoin));
603}
Summary¶
There are examples of these nodes which can be found in
cpp/examples/arrow/execution_plan_documentation_examples.cc
in the Arrow source.
Complete Example:
19#include <arrow/array.h>
20#include <arrow/builder.h>
21
22#include <arrow/acero/exec_plan.h>
23#include <arrow/compute/api.h>
24#include <arrow/compute/api_vector.h>
25#include <arrow/compute/cast.h>
26
27#include <arrow/csv/api.h>
28
29#include <arrow/dataset/dataset.h>
30#include <arrow/dataset/file_base.h>
31#include <arrow/dataset/file_parquet.h>
32#include <arrow/dataset/plan.h>
33#include <arrow/dataset/scanner.h>
34
35#include <arrow/io/interfaces.h>
36#include <arrow/io/memory.h>
37
38#include <arrow/result.h>
39#include <arrow/status.h>
40#include <arrow/table.h>
41
42#include <arrow/ipc/api.h>
43
44#include <arrow/util/future.h>
45#include <arrow/util/range.h>
46#include <arrow/util/thread_pool.h>
47#include <arrow/util/vector.h>
48
49#include <iostream>
50#include <memory>
51#include <utility>
52
53// Demonstrate various operators in Arrow Streaming Execution Engine
54
55namespace cp = ::arrow::compute;
56namespace ac = ::arrow::acero;
57
58constexpr char kSep[] = "******";
59
60void PrintBlock(const std::string& msg) {
61 std::cout << "\n\t" << kSep << " " << msg << " " << kSep << "\n" << std::endl;
62}
63
64template <typename TYPE,
65 typename = typename std::enable_if<arrow::is_number_type<TYPE>::value |
66 arrow::is_boolean_type<TYPE>::value |
67 arrow::is_temporal_type<TYPE>::value>::type>
68arrow::Result<std::shared_ptr<arrow::Array>> GetArrayDataSample(
69 const std::vector<typename TYPE::c_type>& values) {
70 using ArrowBuilderType = typename arrow::TypeTraits<TYPE>::BuilderType;
71 ArrowBuilderType builder;
72 ARROW_RETURN_NOT_OK(builder.Reserve(values.size()));
73 ARROW_RETURN_NOT_OK(builder.AppendValues(values));
74 return builder.Finish();
75}
76
77template <class TYPE>
78arrow::Result<std::shared_ptr<arrow::Array>> GetBinaryArrayDataSample(
79 const std::vector<std::string>& values) {
80 using ArrowBuilderType = typename arrow::TypeTraits<TYPE>::BuilderType;
81 ArrowBuilderType builder;
82 ARROW_RETURN_NOT_OK(builder.Reserve(values.size()));
83 ARROW_RETURN_NOT_OK(builder.AppendValues(values));
84 return builder.Finish();
85}
86
87arrow::Result<std::shared_ptr<arrow::RecordBatch>> GetSampleRecordBatch(
88 const arrow::ArrayVector array_vector, const arrow::FieldVector& field_vector) {
89 std::shared_ptr<arrow::RecordBatch> record_batch;
90 ARROW_ASSIGN_OR_RAISE(auto struct_result,
91 arrow::StructArray::Make(array_vector, field_vector));
92 return record_batch->FromStructArray(struct_result);
93}
94
95/// \brief Create a sample table
96/// The table's contents will be:
97/// a,b
98/// 1,null
99/// 2,true
100/// null,true
101/// 3,false
102/// null,true
103/// 4,false
104/// 5,null
105/// 6,false
106/// 7,false
107/// 8,true
108/// \return The created table
109
110arrow::Result<std::shared_ptr<arrow::Table>> GetTable() {
111 auto null_long = std::numeric_limits<int64_t>::quiet_NaN();
112 ARROW_ASSIGN_OR_RAISE(auto int64_array,
113 GetArrayDataSample<arrow::Int64Type>(
114 {1, 2, null_long, 3, null_long, 4, 5, 6, 7, 8}));
115
116 arrow::BooleanBuilder boolean_builder;
117 std::shared_ptr<arrow::BooleanArray> bool_array;
118
119 std::vector<uint8_t> bool_values = {false, true, true, false, true,
120 false, false, false, false, true};
121 std::vector<bool> is_valid = {false, true, true, true, true,
122 true, false, true, true, true};
123
124 ARROW_RETURN_NOT_OK(boolean_builder.Reserve(10));
125
126 ARROW_RETURN_NOT_OK(boolean_builder.AppendValues(bool_values, is_valid));
127
128 ARROW_RETURN_NOT_OK(boolean_builder.Finish(&bool_array));
129
130 auto record_batch =
131 arrow::RecordBatch::Make(arrow::schema({arrow::field("a", arrow::int64()),
132 arrow::field("b", arrow::boolean())}),
133 10, {int64_array, bool_array});
134 ARROW_ASSIGN_OR_RAISE(auto table, arrow::Table::FromRecordBatches({record_batch}));
135 return table;
136}
137
138/// \brief Create a sample dataset
139/// \return An in-memory dataset based on GetTable()
140arrow::Result<std::shared_ptr<arrow::dataset::Dataset>> GetDataset() {
141 ARROW_ASSIGN_OR_RAISE(auto table, GetTable());
142 auto ds = std::make_shared<arrow::dataset::InMemoryDataset>(table);
143 return ds;
144}
145
146arrow::Result<cp::ExecBatch> GetExecBatchFromVectors(
147 const arrow::FieldVector& field_vector, const arrow::ArrayVector& array_vector) {
148 std::shared_ptr<arrow::RecordBatch> record_batch;
149 ARROW_ASSIGN_OR_RAISE(auto res_batch, GetSampleRecordBatch(array_vector, field_vector));
150 cp::ExecBatch batch{*res_batch};
151 return batch;
152}
153
154// (Doc section: BatchesWithSchema Definition)
155struct BatchesWithSchema {
156 std::vector<cp::ExecBatch> batches;
157 std::shared_ptr<arrow::Schema> schema;
158 // This method uses internal arrow utilities to
159 // convert a vector of record batches to an AsyncGenerator of optional batches
160 arrow::AsyncGenerator<std::optional<cp::ExecBatch>> gen() const {
161 auto opt_batches = ::arrow::internal::MapVector(
162 [](cp::ExecBatch batch) { return std::make_optional(std::move(batch)); },
163 batches);
164 arrow::AsyncGenerator<std::optional<cp::ExecBatch>> gen;
165 gen = arrow::MakeVectorGenerator(std::move(opt_batches));
166 return gen;
167 }
168};
169// (Doc section: BatchesWithSchema Definition)
170
171// (Doc section: MakeBasicBatches Definition)
172arrow::Result<BatchesWithSchema> MakeBasicBatches() {
173 BatchesWithSchema out;
174 auto field_vector = {arrow::field("a", arrow::int32()),
175 arrow::field("b", arrow::boolean())};
176 ARROW_ASSIGN_OR_RAISE(auto b1_int, GetArrayDataSample<arrow::Int32Type>({0, 4}));
177 ARROW_ASSIGN_OR_RAISE(auto b2_int, GetArrayDataSample<arrow::Int32Type>({5, 6, 7}));
178 ARROW_ASSIGN_OR_RAISE(auto b3_int, GetArrayDataSample<arrow::Int32Type>({8, 9, 10}));
179
180 ARROW_ASSIGN_OR_RAISE(auto b1_bool,
181 GetArrayDataSample<arrow::BooleanType>({false, true}));
182 ARROW_ASSIGN_OR_RAISE(auto b2_bool,
183 GetArrayDataSample<arrow::BooleanType>({true, false, true}));
184 ARROW_ASSIGN_OR_RAISE(auto b3_bool,
185 GetArrayDataSample<arrow::BooleanType>({false, true, false}));
186
187 ARROW_ASSIGN_OR_RAISE(auto b1,
188 GetExecBatchFromVectors(field_vector, {b1_int, b1_bool}));
189 ARROW_ASSIGN_OR_RAISE(auto b2,
190 GetExecBatchFromVectors(field_vector, {b2_int, b2_bool}));
191 ARROW_ASSIGN_OR_RAISE(auto b3,
192 GetExecBatchFromVectors(field_vector, {b3_int, b3_bool}));
193
194 out.batches = {b1, b2, b3};
195 out.schema = arrow::schema(field_vector);
196 return out;
197}
198// (Doc section: MakeBasicBatches Definition)
199
200arrow::Result<BatchesWithSchema> MakeSortTestBasicBatches() {
201 BatchesWithSchema out;
202 auto field = arrow::field("a", arrow::int32());
203 ARROW_ASSIGN_OR_RAISE(auto b1_int, GetArrayDataSample<arrow::Int32Type>({1, 3, 0, 2}));
204 ARROW_ASSIGN_OR_RAISE(auto b2_int,
205 GetArrayDataSample<arrow::Int32Type>({121, 101, 120, 12}));
206 ARROW_ASSIGN_OR_RAISE(auto b3_int,
207 GetArrayDataSample<arrow::Int32Type>({10, 110, 210, 121}));
208 ARROW_ASSIGN_OR_RAISE(auto b4_int,
209 GetArrayDataSample<arrow::Int32Type>({51, 101, 2, 34}));
210 ARROW_ASSIGN_OR_RAISE(auto b5_int,
211 GetArrayDataSample<arrow::Int32Type>({11, 31, 1, 12}));
212 ARROW_ASSIGN_OR_RAISE(auto b6_int,
213 GetArrayDataSample<arrow::Int32Type>({12, 101, 120, 12}));
214 ARROW_ASSIGN_OR_RAISE(auto b7_int,
215 GetArrayDataSample<arrow::Int32Type>({0, 110, 210, 11}));
216 ARROW_ASSIGN_OR_RAISE(auto b8_int,
217 GetArrayDataSample<arrow::Int32Type>({51, 10, 2, 3}));
218
219 ARROW_ASSIGN_OR_RAISE(auto b1, GetExecBatchFromVectors({field}, {b1_int}));
220 ARROW_ASSIGN_OR_RAISE(auto b2, GetExecBatchFromVectors({field}, {b2_int}));
221 ARROW_ASSIGN_OR_RAISE(auto b3,
222 GetExecBatchFromVectors({field, field}, {b3_int, b8_int}));
223 ARROW_ASSIGN_OR_RAISE(auto b4,
224 GetExecBatchFromVectors({field, field, field, field},
225 {b4_int, b5_int, b6_int, b7_int}));
226 out.batches = {b1, b2, b3, b4};
227 out.schema = arrow::schema({field});
228 return out;
229}
230
231arrow::Result<BatchesWithSchema> MakeGroupableBatches(int multiplicity = 1) {
232 BatchesWithSchema out;
233 auto fields = {arrow::field("i32", arrow::int32()), arrow::field("str", arrow::utf8())};
234 ARROW_ASSIGN_OR_RAISE(auto b1_int, GetArrayDataSample<arrow::Int32Type>({12, 7, 3}));
235 ARROW_ASSIGN_OR_RAISE(auto b2_int, GetArrayDataSample<arrow::Int32Type>({-2, -1, 3}));
236 ARROW_ASSIGN_OR_RAISE(auto b3_int, GetArrayDataSample<arrow::Int32Type>({5, 3, -8}));
237 ARROW_ASSIGN_OR_RAISE(auto b1_str, GetBinaryArrayDataSample<arrow::StringType>(
238 {"alpha", "beta", "alpha"}));
239 ARROW_ASSIGN_OR_RAISE(auto b2_str, GetBinaryArrayDataSample<arrow::StringType>(
240 {"alpha", "gamma", "alpha"}));
241 ARROW_ASSIGN_OR_RAISE(auto b3_str, GetBinaryArrayDataSample<arrow::StringType>(
242 {"gamma", "beta", "alpha"}));
243 ARROW_ASSIGN_OR_RAISE(auto b1, GetExecBatchFromVectors(fields, {b1_int, b1_str}));
244 ARROW_ASSIGN_OR_RAISE(auto b2, GetExecBatchFromVectors(fields, {b2_int, b2_str}));
245 ARROW_ASSIGN_OR_RAISE(auto b3, GetExecBatchFromVectors(fields, {b3_int, b3_str}));
246 out.batches = {b1, b2, b3};
247
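  // Repeat the three batches `multiplicity` times so callers can request a larger input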
248 size_t batch_count = out.batches.size();
249 for (int repeat = 1; repeat < multiplicity; ++repeat) {
250 for (size_t i = 0; i < batch_count; ++i) {
251 out.batches.push_back(out.batches[i]);
252 }
253 }
254
255 out.schema = arrow::schema(fields);
256 return out;
257}
258
259arrow::Status ExecutePlanAndCollectAsTable(ac::Declaration plan) {
260  // DeclarationToTable runs the plan to completion and collects its output into a table
261 std::shared_ptr<arrow::Table> response_table;
262 ARROW_ASSIGN_OR_RAISE(response_table, ac::DeclarationToTable(std::move(plan)));
263
264 std::cout << "Results : " << response_table->ToString() << std::endl;
265
266 return arrow::Status::OK();
267}
268
269// (Doc section: Scan Example)
270
271/// \brief An example demonstrating a scan and sink node
272///
273/// Scan-Table
274/// This example shows how a scan operation can be applied to a dataset.
275/// Operations (project, filter) can be attached to the scan so that the
276/// input data is processed as it is read. The output is collected into a table.
277arrow::Status ScanSinkExample() {
278 ARROW_ASSIGN_OR_RAISE(std::shared_ptr<arrow::dataset::Dataset> dataset, GetDataset());
279
280 auto options = std::make_shared<arrow::dataset::ScanOptions>();
281 options->projection = cp::project({}, {}); // create empty projection
282
283 // construct the scan node
284 auto scan_node_options = arrow::dataset::ScanNodeOptions{dataset, options};
285
286 ac::Declaration scan{"scan", std::move(scan_node_options)};
287
288 return ExecutePlanAndCollectAsTable(std::move(scan));
289}
290// (Doc section: Scan Example)
291
292// (Doc section: Source Example)
293
294/// \brief An example demonstrating a source and sink node
295///
296/// Source-Table Example
297/// This example shows how a custom source can be used
298/// in an execution plan. This includes a source node using pre-generated
299/// data and collecting it into a table.
300///
301/// This sort of custom source is often not needed. In most cases you can
302/// use a scan (for a dataset source) or a source like table_source, array_vector_source,
303/// exec_batch_source, or record_batch_source (for in-memory data).
304arrow::Status SourceSinkExample() {
305 ARROW_ASSIGN_OR_RAISE(auto basic_data, MakeBasicBatches());
306
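  // SourceNodeOptions takes the schema of the data and an asynchronous generator
  // that yields the batches (here, the pre-generated batches from MakeBasicBatches)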
307 auto source_node_options = ac::SourceNodeOptions{basic_data.schema, basic_data.gen()};
308
309 ac::Declaration source{"source", std::move(source_node_options)};
310
311 return ExecutePlanAndCollectAsTable(std::move(source));
312}
313// (Doc section: Source Example)
314
315// (Doc section: Table Source Example)
316
317/// \brief An example showing a table source node
318///
319/// TableSource-Table Example
320/// This example shows how a table_source can be used
321/// in an execution plan. This includes a table source node
322/// receiving data from a table. This plan simply collects the
323/// data back into a table, but nodes that modify or transform the
324/// data could be added as well (as shown in later examples).
325arrow::Status TableSourceSinkExample() {
326 ARROW_ASSIGN_OR_RAISE(auto table, GetTable());
327
328  // The table source slices the table into batches of at most max_batch_size rows
329  int max_batch_size = 2;
330 auto table_source_options = ac::TableSourceNodeOptions{table, max_batch_size};
331
332 ac::Declaration source{"table_source", std::move(table_source_options)};
333
334 return ExecutePlanAndCollectAsTable(std::move(source));
335}
336// (Doc section: Table Source Example)
337
338// (Doc section: Filter Example)
339
340/// \brief An example showing a filter node
341///
342/// Source-Filter-Table
343/// This example shows how a filter can be used in an execution plan
344/// to filter data from a source. The output from the execution plan
345/// is collected into a table.
346arrow::Status ScanFilterSinkExample() {
347 ARROW_ASSIGN_OR_RAISE(std::shared_ptr<arrow::dataset::Dataset> dataset, GetDataset());
348
349 auto options = std::make_shared<arrow::dataset::ScanOptions>();
350  // specify the filter.  This filter keeps rows where the value of
351  // the "a" column is greater than 3 (all other rows are removed).
352 cp::Expression filter_expr = cp::greater(cp::field_ref("a"), cp::literal(3));
353 // set filter for scanner : on-disk / push-down filtering.
354 // This step can be skipped if you are not reading from disk.
355 options->filter = filter_expr;
356 // empty projection
357 options->projection = cp::project({}, {});
358
359 // construct the scan node
360 std::cout << "Initialized Scanning Options" << std::endl;
361
362 auto scan_node_options = arrow::dataset::ScanNodeOptions{dataset, options};
363 std::cout << "Scan node options created" << std::endl;
364
365 ac::Declaration scan{"scan", std::move(scan_node_options)};
366
367 // pipe the scan node into the filter node
368 // Need to set the filter in scan node options and filter node options.
369 // At scan node it is used for on-disk / push-down filtering.
370 // At filter node it is used for in-memory filtering.
371 ac::Declaration filter{
372 "filter", {std::move(scan)}, ac::FilterNodeOptions(std::move(filter_expr))};
373
374 return ExecutePlanAndCollectAsTable(std::move(filter));
375}
376
377// (Doc section: Filter Example)
378
379// (Doc section: Project Example)
380
381/// \brief An example showing a project node
382///
383/// Scan-Project-Table
384/// This example shows how a Scan operation can be used to load the data
385/// into the execution plan, how a project operation can be applied on the
386/// data stream and how the output is collected into a table
387arrow::Status ScanProjectSinkExample() {
388 ARROW_ASSIGN_OR_RAISE(std::shared_ptr<arrow::dataset::Dataset> dataset, GetDataset());
389
390 auto options = std::make_shared<arrow::dataset::ScanOptions>();
391  // the expression to be evaluated by the project node: multiply column "a" by 2
392  cp::Expression a_times_2 = cp::call("multiply", {cp::field_ref("a"), cp::literal(2)});
393  options->projection = cp::project({}, {});  // empty projection for the scan itself
394
395 auto scan_node_options = arrow::dataset::ScanNodeOptions{dataset, options};
396
397 ac::Declaration scan{"scan", std::move(scan_node_options)};
398 ac::Declaration project{
399 "project", {std::move(scan)}, ac::ProjectNodeOptions({a_times_2})};
400
401 return ExecutePlanAndCollectAsTable(std::move(project));
402}
403
404// (Doc section: Project Example)
405
406// (Doc section: Scalar Aggregate Example)
407
408/// \brief An example showing an aggregation node to aggregate an entire table
409///
410/// Source-Aggregation-Table
411/// This example shows how an aggregation operation can be applied to an
412/// execution plan, resulting in a scalar output. The source node loads the
413/// data and the aggregation (summing the values in column 'a')
414/// is applied to this data. The output is collected into a table (that will
415/// have exactly one row).
416arrow::Status SourceScalarAggregateSinkExample() {
417 ARROW_ASSIGN_OR_RAISE(auto basic_data, MakeBasicBatches());
418
419 auto source_node_options = ac::SourceNodeOptions{basic_data.schema, basic_data.gen()};
420
421 ac::Declaration source{"source", std::move(source_node_options)};
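  // Each aggregate is {function, function options, target column, output column name};
  // with no keys given, "sum" reduces the whole of column "a" to a single value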
422 auto aggregate_options =
423 ac::AggregateNodeOptions{/*aggregates=*/{{"sum", nullptr, "a", "sum(a)"}}};
424 ac::Declaration aggregate{
425 "aggregate", {std::move(source)}, std::move(aggregate_options)};
426
427 return ExecutePlanAndCollectAsTable(std::move(aggregate));
428}
429// (Doc section: Scalar Aggregate Example)
430
431// (Doc section: Group Aggregate Example)
432
433/// \brief An example showing an aggregation node to perform a group-by operation
434///
435/// Source-Aggregation-Table
436/// This example shows how an aggregation operation can be applied to an
437/// execution plan, resulting in grouped output. The source node loads the
438/// data and the aggregation (counting the non-null values in column 'a',
439/// grouped by column 'b') is applied to this data. The output is collected into
440/// a table that will contain one row for each unique combination of group keys.
441arrow::Status SourceGroupAggregateSinkExample() {
442 ARROW_ASSIGN_OR_RAISE(auto basic_data, MakeBasicBatches());
445
446 auto source_node_options = ac::SourceNodeOptions{basic_data.schema, basic_data.gen()};
447
448 ac::Declaration source{"source", std::move(source_node_options)};
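  // Grouped aggregate functions use the "hash_" prefix; the "keys" option lists the
  // columns to group by (here column "b"), and CountOptions::ONLY_VALID counts non-null values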
449 auto options = std::make_shared<cp::CountOptions>(cp::CountOptions::ONLY_VALID);
450 auto aggregate_options =
451 ac::AggregateNodeOptions{/*aggregates=*/{{"hash_count", options, "a", "count(a)"}},
452 /*keys=*/{"b"}};
453 ac::Declaration aggregate{
454 "aggregate", {std::move(source)}, std::move(aggregate_options)};
455
456 return ExecutePlanAndCollectAsTable(std::move(aggregate));
457}
458// (Doc section: Group Aggregate Example)
459
460// (Doc section: ConsumingSink Example)
461
462/// \brief An example showing a consuming sink node
463///
464/// Source-Consuming-Sink
465/// This example shows how the data can be consumed within the execution plan
466/// by using a ConsumingSink node. There is no data output from this execution plan.
467arrow::Status SourceConsumingSinkExample() {
468 ARROW_ASSIGN_OR_RAISE(auto basic_data, MakeBasicBatches());
469
470 auto source_node_options = ac::SourceNodeOptions{basic_data.schema, basic_data.gen()};
471
472 ac::Declaration source{"source", std::move(source_node_options)};
473
474 std::atomic<uint32_t> batches_seen{0};
475 arrow::Future<> finish = arrow::Future<>::Make();
476 struct CustomSinkNodeConsumer : public ac::SinkNodeConsumer {
477 CustomSinkNodeConsumer(std::atomic<uint32_t>* batches_seen, arrow::Future<> finish)
478 : batches_seen(batches_seen), finish(std::move(finish)) {}
479
480 arrow::Status Init(const std::shared_ptr<arrow::Schema>& schema,
481 ac::BackpressureControl* backpressure_control,
482 ac::ExecPlan* plan) override {
483 // This will be called as the plan is started (before the first call to Consume)
484 // and provides the schema of the data coming into the node, controls for pausing /
485 // resuming input, and a pointer to the plan itself which can be used to access
486 // other utilities such as the thread indexer or async task scheduler.
487 return arrow::Status::OK();
488 }
489
490 arrow::Status Consume(cp::ExecBatch batch) override {
491 (*batches_seen)++;
492 return arrow::Status::OK();
493 }
494
495 arrow::Future<> Finish() override {
496 // Here you can perform whatever (possibly async) cleanup is needed, e.g. closing
497 // output file handles and flushing remaining work
498 return arrow::Future<>::MakeFinished();
499 }
500
501 std::atomic<uint32_t>* batches_seen;
502 arrow::Future<> finish;
503 };
504 std::shared_ptr<CustomSinkNodeConsumer> consumer =
505 std::make_shared<CustomSinkNodeConsumer>(&batches_seen, finish);
506
507 ac::Declaration consuming_sink{"consuming_sink",
508 {std::move(source)},
509 ac::ConsumingSinkNodeOptions(std::move(consumer))};
510
511 // Since we are consuming the data within the plan there is no output and we simply
512 // run the plan to completion instead of collecting into a table.
513 ARROW_RETURN_NOT_OK(ac::DeclarationToStatus(std::move(consuming_sink)));
514
515 std::cout << "The consuming sink node saw " << batches_seen.load() << " batches"
516 << std::endl;
517 return arrow::Status::OK();
518}
519// (Doc section: ConsumingSink Example)
520
521// (Doc section: OrderBySink Example)
522
523arrow::Status ExecutePlanAndCollectAsTableWithCustomSink(
524 std::shared_ptr<ac::ExecPlan> plan, std::shared_ptr<arrow::Schema> schema,
525 arrow::AsyncGenerator<std::optional<cp::ExecBatch>> sink_gen) {
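  // This helper shows the lower-level approach: an ExecPlan is built and started by hand
  // and the results are pulled through a sink generator. The Declaration-based helpers
  // used above (e.g. DeclarationToTable) are usually more convenient.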
526 // translate sink_gen (async) to sink_reader (sync)
527 std::shared_ptr<arrow::RecordBatchReader> sink_reader =
528 ac::MakeGeneratorReader(schema, std::move(sink_gen), arrow::default_memory_pool());
529
530 // validate the ExecPlan
531 ARROW_RETURN_NOT_OK(plan->Validate());
532 std::cout << "ExecPlan created : " << plan->ToString() << std::endl;
533 // start the ExecPlan
534 plan->StartProducing();
535
536 // collect sink_reader into a Table
537 std::shared_ptr<arrow::Table> response_table;
538
539 ARROW_ASSIGN_OR_RAISE(response_table,
540 arrow::Table::FromRecordBatchReader(sink_reader.get()));
541
542 std::cout << "Results : " << response_table->ToString() << std::endl;
543
544 // stop producing
545 plan->StopProducing();
546  // wait for the plan to be marked finished
547 auto future = plan->finished();
548 return future.status();
549}
550
551/// \brief An example showing an order-by node
552///
553/// Source-OrderBy-Sink
554/// In this example, the data enters through the source node
555/// and is ordered in the sink node. The sort order (ascending
556/// or descending) is configurable. The output
557/// is obtained as a table from the sink node.
558arrow::Status SourceOrderBySinkExample() {
559 ARROW_ASSIGN_OR_RAISE(std::shared_ptr<ac::ExecPlan> plan,
560 ac::ExecPlan::Make(*cp::threaded_exec_context()));
561
562 ARROW_ASSIGN_OR_RAISE(auto basic_data, MakeSortTestBasicBatches());
563
564 arrow::AsyncGenerator<std::optional<cp::ExecBatch>> sink_gen;
565
566 auto source_node_options = ac::SourceNodeOptions{basic_data.schema, basic_data.gen()};
567 ARROW_ASSIGN_OR_RAISE(ac::ExecNode * source,
568 ac::MakeExecNode("source", plan.get(), {}, source_node_options));
569
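  // The order_by_sink node accumulates its input, sorts it by column "a" in
  // descending order, and makes the sorted batches available through sink_gen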
570 ARROW_RETURN_NOT_OK(ac::MakeExecNode(
571 "order_by_sink", plan.get(), {source},
572 ac::OrderBySinkNodeOptions{
573 cp::SortOptions{{cp::SortKey{"a", cp::SortOrder::Descending}}}, &sink_gen}));
574
575 return ExecutePlanAndCollectAsTableWithCustomSink(plan, basic_data.schema, sink_gen);
576}
577
578// (Doc section: OrderBySink Example)
579
580// (Doc section: HashJoin Example)
581
582/// \brief An example showing a hash join node
583///
584/// Source-HashJoin-Table
585/// This example shows how the source nodes provide the data and how a self-join
586/// is applied to the data. The join options are configurable. The output
587/// is collected into a table.
588arrow::Status SourceHashJoinSinkExample() {
589 ARROW_ASSIGN_OR_RAISE(auto input, MakeGroupableBatches());
590
591 ac::Declaration left{"source", ac::SourceNodeOptions{input.schema, input.gen()}};
592 ac::Declaration right{"source", ac::SourceNodeOptions{input.schema, input.gen()}};
593
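  // Join the two inputs on the "str" column; the last two arguments are output
  // prefixes used to disambiguate field names coming from the left and right inputs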
594 ac::HashJoinNodeOptions join_opts{
595 ac::JoinType::INNER,
596 /*left_keys=*/{"str"},
597 /*right_keys=*/{"str"}, cp::literal(true), "l_", "r_"};
598
599 ac::Declaration hashjoin{
600 "hashjoin", {std::move(left), std::move(right)}, std::move(join_opts)};
601
602 return ExecutePlanAndCollectAsTable(std::move(hashjoin));
603}
604
605// (Doc section: HashJoin Example)
606
607// (Doc section: KSelect Example)
608
609/// \brief An example showing a select-k node
610///
611/// Source-KSelect
612/// This example shows how the top (or bottom) K elements can be
613/// selected. The output node is a modified
614/// sink node from which the output can be obtained as a table.
615arrow::Status SourceKSelectExample() {
616 ARROW_ASSIGN_OR_RAISE(auto input, MakeGroupableBatches());
617 ARROW_ASSIGN_OR_RAISE(std::shared_ptr<ac::ExecPlan> plan,
618 ac::ExecPlan::Make(*cp::threaded_exec_context()));
619 arrow::AsyncGenerator<std::optional<cp::ExecBatch>> sink_gen;
620
621 ARROW_ASSIGN_OR_RAISE(
622 ac::ExecNode * source,
623 ac::MakeExecNode("source", plan.get(), {},
624 ac::SourceNodeOptions{input.schema, input.gen()}));
625
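  // Select the 2 rows with the largest values in column "i32"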
626 cp::SelectKOptions options = cp::SelectKOptions::TopKDefault(/*k=*/2, {"i32"});
627
628 ARROW_RETURN_NOT_OK(ac::MakeExecNode("select_k_sink", plan.get(), {source},
629 ac::SelectKSinkNodeOptions{options, &sink_gen}));
630
631 auto schema = arrow::schema(
632 {arrow::field("i32", arrow::int32()), arrow::field("str", arrow::utf8())});
633
634 return ExecutePlanAndCollectAsTableWithCustomSink(plan, schema, sink_gen);
635}
636
637// (Doc section: KSelect Example)
638
639// (Doc section: Write Example)
640
641/// \brief An example showing a write node
642/// \param file_path The destination to write to
643///
644/// Scan-Filter-Write
645/// This example shows how a scan node can be used to load the data
646/// and how, after processing, the result can be written to disk.
647arrow::Status ScanFilterWriteExample(const std::string& file_path) {
648 ARROW_ASSIGN_OR_RAISE(std::shared_ptr<arrow::dataset::Dataset> dataset, GetDataset());
649
650 auto options = std::make_shared<arrow::dataset::ScanOptions>();
651 // empty projection
652 options->projection = cp::project({}, {});
653
654 auto scan_node_options = arrow::dataset::ScanNodeOptions{dataset, options};
655
656 ac::Declaration scan{"scan", std::move(scan_node_options)};
657
660 std::string root_path = "";
661 std::string uri = "file://" + file_path;
662 ARROW_ASSIGN_OR_RAISE(std::shared_ptr<arrow::fs::FileSystem> filesystem,
663 arrow::fs::FileSystemFromUri(uri, &root_path));
664
665 auto base_path = root_path + "/parquet_dataset";
666  // Uncomment the following line if running this example repeatedly
667 // ARROW_RETURN_NOT_OK(filesystem->DeleteDirContents(base_path));
668 ARROW_RETURN_NOT_OK(filesystem->CreateDir(base_path));
669
670 // The partition schema determines which fields are part of the partitioning.
671 auto partition_schema = arrow::schema({arrow::field("a", arrow::int32())});
672 // We'll use Hive-style partitioning,
673 // which creates directories with "key=value" pairs.
674
675 auto partitioning =
676 std::make_shared<arrow::dataset::HivePartitioning>(partition_schema);
677 // We'll write Parquet files.
678 auto format = std::make_shared<arrow::dataset::ParquetFileFormat>();
679
680 arrow::dataset::FileSystemDatasetWriteOptions write_options;
681 write_options.file_write_options = format->DefaultWriteOptions();
682 write_options.filesystem = filesystem;
683 write_options.base_dir = base_path;
684 write_options.partitioning = partitioning;
685 write_options.basename_template = "part{i}.parquet";
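  // "{i}" in the basename template is replaced with an auto-incrementing counter
  // so each written file gets a unique name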
686
687 arrow::dataset::WriteNodeOptions write_node_options{write_options};
688
689 ac::Declaration write{"write", {std::move(scan)}, std::move(write_node_options)};
690
691 // Since the write node has no output we simply run the plan to completion and the
692 // data should be written
693 ARROW_RETURN_NOT_OK(ac::DeclarationToStatus(std::move(write)));
694
695 std::cout << "Dataset written to " << base_path << std::endl;
696 return arrow::Status::OK();
697}
698
699// (Doc section: Write Example)
700
701// (Doc section: Union Example)
702
703/// \brief An example showing a union node
704///
705/// Source-Union-Table
706/// This example shows how a union operation can be applied to two
707/// data sources. The output is collected into a table.
708arrow::Status SourceUnionSinkExample() {
709 ARROW_ASSIGN_OR_RAISE(auto basic_data, MakeBasicBatches());
710
711 ac::Declaration lhs{"source",
712 ac::SourceNodeOptions{basic_data.schema, basic_data.gen()}};
713 lhs.label = "lhs";
714 ac::Declaration rhs{"source",
715 ac::SourceNodeOptions{basic_data.schema, basic_data.gen()}};
716 rhs.label = "rhs";
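  // The union node concatenates the batches from its inputs; both inputs must have
  // the same schema. It takes no options, so a default ExecNodeOptions is passed.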
717 ac::Declaration union_plan{
718 "union", {std::move(lhs), std::move(rhs)}, ac::ExecNodeOptions{}};
719
720 return ExecutePlanAndCollectAsTable(std::move(union_plan));
721}
722
723// (Doc section: Union Example)
724
725// (Doc section: Table Sink Example)
726
727/// \brief An example showing a table sink node
728///
729/// TableSink Example
730/// This example shows how a table_sink can be used
731/// in an execution plan. This includes a source node
732/// receiving data as batches and the table sink node
733/// which emits the output as a table.
734arrow::Status TableSinkExample() {
735 ARROW_ASSIGN_OR_RAISE(std::shared_ptr<ac::ExecPlan> plan,
736 ac::ExecPlan::Make(*cp::threaded_exec_context()));
737
738 ARROW_ASSIGN_OR_RAISE(auto basic_data, MakeBasicBatches());
739
740 auto source_node_options = ac::SourceNodeOptions{basic_data.schema, basic_data.gen()};
741
742 ARROW_ASSIGN_OR_RAISE(ac::ExecNode * source,
743 ac::MakeExecNode("source", plan.get(), {}, source_node_options));
744
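  // The table_sink node will populate output_table once the plan has finished running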
745 std::shared_ptr<arrow::Table> output_table;
746 auto table_sink_options = ac::TableSinkNodeOptions{&output_table};
747
748 ARROW_RETURN_NOT_OK(
749 ac::MakeExecNode("table_sink", plan.get(), {source}, table_sink_options));
750 // validate the ExecPlan
751 ARROW_RETURN_NOT_OK(plan->Validate());
752 std::cout << "ExecPlan created : " << plan->ToString() << std::endl;
753 // start the ExecPlan
754 plan->StartProducing();
755
756 // Wait for the plan to finish
757 auto finished = plan->finished();
758  ARROW_RETURN_NOT_OK(finished.status());
759 std::cout << "Results : " << output_table->ToString() << std::endl;
760 return arrow::Status::OK();
761}
762
763// (Doc section: Table Sink Example)
764
765// (Doc section: RecordBatchReaderSource Example)
766
767/// \brief An example showing the usage of a RecordBatchReader as the data source.
768///
769/// RecordBatchReaderSourceSink Example
770/// This example shows how a record_batch_reader_source can be used
771/// in an execution plan. This includes the source node
772/// receiving data from a TableBatchReader.
774arrow::Status RecordBatchReaderSourceSinkExample() {
775 ARROW_ASSIGN_OR_RAISE(auto table, GetTable());
776 std::shared_ptr<arrow::RecordBatchReader> reader =
777 std::make_shared<arrow::TableBatchReader>(table);
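  // record_batch_reader_source pulls batches from any arrow::RecordBatchReader;
  // here a TableBatchReader re-reads the in-memory table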
778 ac::Declaration reader_source{"record_batch_reader_source",
779 ac::RecordBatchReaderSourceNodeOptions{reader}};
780 return ExecutePlanAndCollectAsTable(std::move(reader_source));
781}
782
783// (Doc section: RecordBatchReaderSource Example)
784
785enum ExampleMode {
786 SOURCE_SINK = 0,
787 TABLE_SOURCE_SINK = 1,
788 SCAN = 2,
789 FILTER = 3,
790 PROJECT = 4,
791 SCALAR_AGGREGATION = 5,
792 GROUP_AGGREGATION = 6,
793 CONSUMING_SINK = 7,
794 ORDER_BY_SINK = 8,
795 HASHJOIN = 9,
796 KSELECT = 10,
797 WRITE = 11,
798 UNION = 12,
799 TABLE_SOURCE_TABLE_SINK = 13,
800 RECORD_BATCH_READER_SOURCE = 14
801};
802
803int main(int argc, char** argv) {
804 if (argc < 3) {
805 // Fake success for CI purposes.
806 return EXIT_SUCCESS;
807 }
808
809 std::string base_save_path = argv[1];
810 int mode = std::atoi(argv[2]);
811 arrow::Status status;
812 // ensure arrow::dataset node factories are in the registry
813 arrow::dataset::internal::Initialize();
814 switch (mode) {
815 case SOURCE_SINK:
816 PrintBlock("Source Sink Example");
817 status = SourceSinkExample();
818 break;
819 case TABLE_SOURCE_SINK:
820 PrintBlock("Table Source Sink Example");
821 status = TableSourceSinkExample();
822 break;
823 case SCAN:
824 PrintBlock("Scan Example");
825 status = ScanSinkExample();
826 break;
827 case FILTER:
828 PrintBlock("Filter Example");
829 status = ScanFilterSinkExample();
830 break;
831 case PROJECT:
832 PrintBlock("Project Example");
833 status = ScanProjectSinkExample();
834 break;
835    case GROUP_AGGREGATION:
836      PrintBlock("Group Aggregate Example");
837      status = SourceGroupAggregateSinkExample();
838      break;
839    case SCALAR_AGGREGATION:
840      PrintBlock("Scalar Aggregate Example");
841      status = SourceScalarAggregateSinkExample();
842      break;
843 case CONSUMING_SINK:
844 PrintBlock("Consuming-Sink Example");
845 status = SourceConsumingSinkExample();
846 break;
847 case ORDER_BY_SINK:
848 PrintBlock("OrderBy Example");
849 status = SourceOrderBySinkExample();
850 break;
851 case HASHJOIN:
852 PrintBlock("HashJoin Example");
853 status = SourceHashJoinSinkExample();
854 break;
855 case KSELECT:
856 PrintBlock("KSelect Example");
857 status = SourceKSelectExample();
858 break;
859 case WRITE:
860 PrintBlock("Write Example");
861 status = ScanFilterWriteExample(base_save_path);
862 break;
863 case UNION:
864 PrintBlock("Union Example");
865 status = SourceUnionSinkExample();
866 break;
867 case TABLE_SOURCE_TABLE_SINK:
868 PrintBlock("TableSink Example");
869 status = TableSinkExample();
870 break;
871 case RECORD_BATCH_READER_SOURCE:
872 PrintBlock("RecordBatchReaderSource Example");
873 status = RecordBatchReaderSourceSinkExample();
874 break;
875 default:
876 break;
877 }
878
879 if (status.ok()) {
880 return EXIT_SUCCESS;
881 } else {
882 std::cout << "Error occurred: " << status.message() << std::endl;
883 return EXIT_FAILURE;
884 }
885}