Acero: A C++ streaming execution engine
Warning
Acero is experimental and a stable API is not yet guaranteed.
Motivation
For many complex computations, successive direct invocation of compute functions is not feasible in either memory or computation time. Doing so causes all intermediate data to be fully materialized. To facilitate arbitrarily large inputs and more efficient resource usage, the Arrow C++ implementation also provides Acero, a streaming query engine with which computations can be formulated and executed.
Acero allows computation to be expressed as an “execution plan” (ExecPlan) which is a directed graph of operators. Each operator (ExecNode) provides, transforms, or consumes the data passing through it. Batches of data (ExecBatch) flow along edges of the graph from node to node. Structuring the API around streams of batches allows the working set for each node to be tuned for optimal performance independent of any other nodes in the graph. Each ExecNode processes batches as they are pushed to it along an edge of the graph by upstream nodes (its inputs), and pushes batches along an edge of the graph to downstream nodes (its outputs) as they are finalized.
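The push model can be illustrated without Arrow at all. The following is a minimal standard-library sketch, not Acero's API: ToyBatch, ToyNode, ToyFilter, SumSink, and RunToySource are hypothetical names invented for this illustration. It shows a source pushing fixed-size batches through a filter into a sink, so that no more than one batch is materialized at a time.

```cpp
#include <cassert>
#include <cstddef>
#include <cstdint>
#include <functional>
#include <utility>
#include <vector>

// Hypothetical stand-in for ExecBatch: one chunk of a single int64 column.
using ToyBatch = std::vector<std::int64_t>;

// Hypothetical stand-in for ExecNode: receives batches pushed by its input.
struct ToyNode {
  virtual ~ToyNode() = default;
  virtual void InputReceived(ToyBatch batch) = 0;
};

// Terminal node: keeps only a running sum, never the batches themselves.
struct SumSink : ToyNode {
  std::int64_t sum = 0;
  void InputReceived(ToyBatch batch) override {
    for (std::int64_t v : batch) sum += v;
  }
};

// Intermediate node: drops rows failing the predicate, then pushes the
// remaining rows to its single output.
class ToyFilter : public ToyNode {
 public:
  ToyFilter(std::function<bool(std::int64_t)> pred, ToyNode* output)
      : pred_(std::move(pred)), output_(output) {}

  void InputReceived(ToyBatch batch) override {
    ToyBatch kept;
    for (std::int64_t v : batch) {
      if (pred_(v)) kept.push_back(v);
    }
    output_->InputReceived(std::move(kept));
  }

 private:
  std::function<bool(std::int64_t)> pred_;
  ToyNode* output_;
};

// Source: pushes the values 0..n-1 downstream in batches of at most
// batch_size rows.
void RunToySource(std::int64_t n, std::size_t batch_size, ToyNode* output) {
  assert(batch_size > 0);
  ToyBatch batch;
  for (std::int64_t v = 0; v < n; ++v) {
    batch.push_back(v);
    if (batch.size() == batch_size) {
      output->InputReceived(std::move(batch));
      batch.clear();  // a moved-from vector is valid; clear() empties it for reuse
    }
  }
  if (!batch.empty()) output->InputReceived(std::move(batch));
}
```

Wiring a SumSink behind a ToyFilter and calling RunToySource with a small batch_size keeps memory use bounded by the batch size rather than by n.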
Substrait
In order to use Acero you will need to create an execution plan. This is the model that describes the computation you want to apply to your data. Acero has its own internal representation for execution plans but most users should not interact with this directly as it will couple their code to Acero.
Substrait is an open standard for execution plans. Acero implements the Substrait “consumer” interface. This means that Acero can accept a Substrait plan and fulfill the plan, loading the requested data and applying the desired computation. By using Substrait plans users can easily switch out to a different execution engine at a later time.
Substrait Conformance
Substrait defines a broad set of operators and functions for many different situations and it is unlikely that Acero will ever completely satisfy all defined Substrait operators and functions. To help understand what features are available the following sections define which features have been currently implemented in Acero and any caveats that apply.
Plans
A plan should have a single top-level relation.
The consumer is currently based on a custom build of Substrait that is older than 0.1.0. Any features added that are newer than 0.1.0 will not be supported.
Extensions
If a plan contains any extension type variations it will be rejected.
If a plan contains any advanced extensions it will be rejected.
Relations (in general)
The emit property (to customize output order of a node or to drop columns) is not supported and plans containing this property will be rejected.
The hint property is not supported and plans containing this property will be rejected.
Any advanced extensions will cause a plan to be rejected.
Any relation not explicitly listed below will not be supported and will cause the plan to be rejected.
Read Relations
The projection property is not supported and plans containing this property will be rejected.
The only supported read type is LocalFiles. Plans with any other type will be rejected.
Only the parquet file format is currently supported.
All URIs must use the file scheme.
partition_index, start, and length are not supported. Plans containing these properties will be rejected.
The Substrait spec requires that a filter be completely satisfied by a read relation. However, Acero only uses a read filter for pushdown projection and it may not be fully satisfied. Users should generally attach an additional filter relation with the same filter expression after the read relation.
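To see why a second filter relation is needed, consider a toy model of a best-effort read filter. This is a standard-library sketch under stated assumptions, not Acero code: ToyRowGroup, ReadWithPushdown, and FilterGreater are hypothetical names, and pruning at row-group granularity is an assumption chosen only to illustrate a filter that is applied but not fully satisfied.

```cpp
#include <algorithm>
#include <cassert>
#include <cstdint>
#include <iterator>
#include <vector>

// Hypothetical row group: a non-empty chunk of values read as one unit.
struct ToyRowGroup {
  std::vector<std::int64_t> values;
};

// Best-effort read filter for "value > threshold": a row group is skipped
// when its maximum cannot match, but a surviving group is returned whole,
// including rows that do not satisfy the filter.
std::vector<std::int64_t> ReadWithPushdown(const std::vector<ToyRowGroup>& groups,
                                           std::int64_t threshold) {
  std::vector<std::int64_t> out;
  for (const ToyRowGroup& group : groups) {
    assert(!group.values.empty());
    const std::int64_t max =
        *std::max_element(group.values.begin(), group.values.end());
    if (max <= threshold) continue;  // the whole group is pruned
    out.insert(out.end(), group.values.begin(), group.values.end());
  }
  return out;
}

// Exact filter relation attached after the read, using the same expression.
std::vector<std::int64_t> FilterGreater(const std::vector<std::int64_t>& in,
                                        std::int64_t threshold) {
  std::vector<std::int64_t> out;
  std::copy_if(in.begin(), in.end(), std::back_inserter(out),
               [threshold](std::int64_t v) { return v > threshold; });
  return out;
}
```

In this model, reading the groups {1, 2, 3}, {2, 5}, and {7, 8} with a threshold of 3 drops the first group but still returns the value 2 from the second; only the follow-up FilterGreater call removes it.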
Filter Relations
No known caveats
Project Relations
No known caveats
Join Relations
The join type JOIN_TYPE_SINGLE is not supported and plans containing this will be rejected.
The join expression must be a call to either the equal or is_not_distinct_from functions. Both arguments to the call must be direct references. Only a single join key is supported.
The post_join_filter property is not supported and will be ignored.
Aggregate Relations
At most one grouping set is supported.
Each grouping expression must be a direct reference.
Each measure’s arguments must be direct references.
A measure may not have a filter
A measure may not have sorts
A measure’s invocation must be AGGREGATION_INVOCATION_ALL
A measure’s phase must be AGGREGATION_PHASE_INITIAL_TO_RESULT
Expressions (general)
Various places in the Substrait spec allow for expressions to be used outside of a filter or project relation. For example, a join expression or an aggregate grouping set. Acero typically expects these expressions to be direct references. Planners should extract the implicit projection into a formal project relation before delivering the plan to Acero.
Older versions of Isthmus would omit optional arguments instead of including them as unspecified enums. Acero will not support these plans.
Literals
A literal with non-default nullability will cause a plan to be rejected.
Types
Acero does not have full support for non-nullable types and may allow input to have nulls without rejecting it.
The table below shows the mapping between Arrow types and Substrait type classes that are currently supported
Functions
Acero does not support the legacy args style of declaring arguments.
The following functions have caveats or are not supported at all. Note that this is not a comprehensive list. Functions are being added to Substrait at a rapid pace and new functions may be missing.
Acero does not support the SATURATE option for overflow.
Acero does not support kernels that take more than two arguments for the functions and, or, xor.
Acero does not support temporal arithmetic.
Acero does not support the following standard functions: is_not_distinct_from, like, substring, starts_with, ends_with, contains, count, count_distinct, approx_count_distinct.
The functions above must be referenced using the URI https://github.com/apache/arrow/blob/master/format/substrait/extension_types.yaml
Architecture Overview
ExecNode
Each node in the graph is an implementation of the ExecNode interface.
ExecPlan
A set of ExecNode is contained and (to an extent) coordinated by an ExecPlan.
ExecFactoryRegistry
Instances of ExecNode are constructed by factory functions held in an ExecFactoryRegistry.
ExecNodeOptions
Heterogeneous parameters for factories of ExecNode are bundled in an ExecNodeOptions.
Declaration
dplyr-inspired helper for efficient construction of an ExecPlan.
ExecBatch
A lightweight container for a single chunk of data in the Arrow format. In contrast to RecordBatch, ExecBatch is intended for use exclusively in a streaming execution context (for example, it doesn’t have a corresponding Python binding). Furthermore, columns which happen to have a constant value may be represented by a Scalar instead of an Array. In addition, ExecBatch may carry execution-relevant properties including a guaranteed-true-filter for Expression simplification.
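The idea of representing a constant column by a single scalar can be sketched with std::variant. This is a toy model that assumes C++17; ToyScalar, ToyArray, ToyColumn, and ToyExecBatch are hypothetical names and do not reflect how ExecBatch is actually implemented.

```cpp
#include <cassert>
#include <cstddef>
#include <cstdint>
#include <variant>
#include <vector>

using ToyScalar = std::int64_t;               // one value shared by every row
using ToyArray = std::vector<std::int64_t>;   // one value per row
using ToyColumn = std::variant<ToyScalar, ToyArray>;

struct ToyExecBatch {
  std::vector<ToyColumn> columns;
  std::size_t length = 0;  // row count, needed because scalars carry no length

  // Value of column `col` at row `row`. A scalar column yields the same
  // value for every row without storing `length` copies of it.
  std::int64_t Value(std::size_t col, std::size_t row) const {
    assert(row < length);
    const ToyColumn& column = columns.at(col);
    if (const ToyScalar* scalar = std::get_if<ToyScalar>(&column)) {
      return *scalar;
    }
    return std::get<ToyArray>(column).at(row);
  }
};
```

Because a scalar column has no length of its own, the batch must record its row count explicitly, which is why the sketch stores length next to the columns.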
An example ExecNode implementation which simply passes all input batches through unchanged:
class PassthruNode : public ExecNode {
 public:
  // InputReceived is the main entry point for ExecNodes. It is invoked
  // by an input of this node to push a batch here for processing.
  void InputReceived(ExecNode* input, ExecBatch batch) override {
    // Since this is a passthru node we simply push the batch to our
    // only output here.
    outputs_[0]->InputReceived(this, batch);
  }

  // ErrorReceived is called by an input of this node to report an error.
  // ExecNodes should always forward errors to their outputs unless they
  // are able to fully handle the error (this is rare).
  void ErrorReceived(ExecNode* input, Status error) override {
    outputs_[0]->ErrorReceived(this, error);
  }

  // InputFinished is used to signal how many batches will ultimately arrive.
  // It may be called with any ordering relative to InputReceived/ErrorReceived.
  void InputFinished(ExecNode* input, int total_batches) override {
    outputs_[0]->InputFinished(this, total_batches);
  }

  // ExecNodes may request that their inputs throttle production of batches
  // until they are ready for more, or stop production if no further batches
  // are required. These signals should typically be forwarded to the inputs
  // of the ExecNode.
  void ResumeProducing(ExecNode* output) override { inputs_[0]->ResumeProducing(this); }
  void PauseProducing(ExecNode* output) override { inputs_[0]->PauseProducing(this); }
  void StopProducing(ExecNode* output) override { inputs_[0]->StopProducing(this); }

  // An ExecNode has a single output schema to which all its batches conform.
  using ExecNode::output_schema;

  // ExecNodes carry basic introspection for debugging purposes
  const char* kind_name() const override { return "PassthruNode"; }
  using ExecNode::label;
  using ExecNode::SetLabel;
  using ExecNode::ToString;

  // An ExecNode holds references to its inputs and outputs, so it is possible
  // to walk the graph of execution if necessary.
  using ExecNode::inputs;
  using ExecNode::outputs;

  // StartProducing() and StopProducing() are invoked by an ExecPlan to
  // coordinate the graph-wide execution state. These do not need to be
  // forwarded to inputs or outputs.
  Status StartProducing() override { return Status::OK(); }
  void StopProducing() override {}
  Future<> finished() override { return inputs_[0]->finished(); }
};
Note that each method which is associated with an edge of the graph must be invoked with an ExecNode* to identify the node which invoked it. For example, in an ExecNode which implements JOIN this tagging might be used to differentiate between batches from the left or right inputs. InputReceived, ErrorReceived, InputFinished may only be invoked by the inputs of a node, while ResumeProducing, PauseProducing, StopProducing may only be invoked by outputs of a node.
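Because InputFinished may arrive before, between, or after the InputReceived calls, a node needs some way to detect completion regardless of ordering. One possible approach is sketched below with standard atomics; ToyBatchCounter is a hypothetical helper written for this illustration, and the sketch does not claim to match how any Acero node tracks its inputs.

```cpp
#include <atomic>
#include <cassert>

// Hypothetical helper: compares the number of batches received so far
// against the total announced by InputFinished.
class ToyBatchCounter {
 public:
  // Call once per received batch. Returns true if this call completed the input.
  bool Increment() {
    const int received = received_.fetch_add(1) + 1;
    return received == total_.load() && Claim();
  }

  // Call once, when the total is announced. Returns true if every batch had
  // already arrived, i.e. this call completed the input.
  bool SetTotal(int total) {
    assert(total >= 0);
    total_.store(total);
    return received_.load() == total && Claim();
  }

 private:
  // Ensures that at most one caller ever observes completion.
  bool Claim() {
    bool expected = false;
    return done_.compare_exchange_strong(expected, true);
  }

  std::atomic<int> received_{0};
  std::atomic<int> total_{-1};  // -1 means the total is not yet known
  std::atomic<bool> done_{false};
};
```

Whichever call returns true is the one that should trigger end-of-input work, such as forwarding InputFinished downstream or emitting a final result.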
ExecPlan contains the associated instances of ExecNode and is used to start and stop execution of all nodes and for querying/awaiting their completion:
// construct an ExecPlan first to hold your nodes
ARROW_ASSIGN_OR_RAISE(auto plan, ExecPlan::Make(default_exec_context()));
// ... add nodes to your ExecPlan
// start all nodes in the graph
ARROW_RETURN_NOT_OK(plan->StartProducing());
SetUserCancellationCallback([plan] {
// stop all nodes in the graph
plan->StopProducing();
});
// Complete will be marked finished when all nodes have run to completion
// or acknowledged a StopProducing() signal. The ExecPlan should be kept
// alive until this future is marked finished.
Future<> complete = plan->finished();
Constructing ExecPlan objects
Warning
The following will be superseded by construction from Compute IR, see ARROW-14074.
None of the concrete implementations of ExecNode are exposed in headers, so they can’t be constructed directly outside the translation unit where they are defined. Instead, factories to create them are provided in an extensible registry. This structure provides a number of benefits:
This enforces consistent construction.
It decouples implementations from consumers of the interface (for example: we have two classes for scalar and grouped aggregate, we can choose which to construct within the single factory by checking whether grouping keys are provided).
This expedites integration with out-of-library extensions. For example “scan” nodes are implemented in the separate libarrow_dataset.so library.
Since the class is not referenceable outside the translation unit in which it is defined, compilers can optimize more aggressively.
Factories of ExecNode can be retrieved by name from the registry. The default registry is available through arrow::compute::default_exec_factory_registry() and can be queried for the built-in factories:
// get the factory for "filter" nodes:
ARROW_ASSIGN_OR_RAISE(auto make_filter,
                      default_exec_factory_registry()->GetFactory("filter"));

// factories take three arguments:
ARROW_ASSIGN_OR_RAISE(ExecNode* filter_node, *make_filter(
    // the ExecPlan which should own this node
    plan.get(),
    // nodes which will send batches to this node (inputs)
    {scan_node},
    // parameters unique to "filter" nodes
    FilterNodeOptions{filter_expression}));

// alternative shorthand:
ARROW_ASSIGN_OR_RAISE(filter_node, MakeExecNode("filter",
    plan.get(), {scan_node}, FilterNodeOptions{filter_expression}));
Factories can also be added to the default registry as long as they are convertible to std::function<Result<ExecNode*>(ExecPlan*, std::vector<ExecNode*>, const ExecNodeOptions&)>.
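The registry pattern itself is independent of Arrow and can be sketched with the standard library. In the following toy model, ToyNode, ToyFactory, ToyRegistry, and RegisterToyAggregate are hypothetical names, and the single int option stands in for ExecNodeOptions. It mirrors the benefit mentioned above: one factory chooses between a scalar and a grouped implementation by checking whether grouping keys are provided, while both concrete classes stay private to the translation unit.

```cpp
#include <cassert>
#include <functional>
#include <map>
#include <memory>
#include <string>
#include <utility>

// Hypothetical public interface; concrete node classes are never exposed.
struct ToyNode {
  virtual ~ToyNode() = default;
  virtual std::string kind_name() const = 0;
};

// A factory builds a node from its options (here just a number of keys).
using ToyFactory = std::function<std::unique_ptr<ToyNode>(int num_keys)>;

class ToyRegistry {
 public:
  // Returns false if a factory with this name is already registered.
  bool AddFactory(const std::string& name, ToyFactory factory) {
    return factories_.emplace(name, std::move(factory)).second;
  }

  // Throws std::out_of_range if no factory has this name.
  std::unique_ptr<ToyNode> Make(const std::string& name, int num_keys) const {
    return factories_.at(name)(num_keys);
  }

 private:
  std::map<std::string, ToyFactory> factories_;
};

namespace {
// Implementations with internal linkage: only the factory can construct them.
struct ScalarAggregate : ToyNode {
  std::string kind_name() const override { return "ScalarAggregate"; }
};
struct GroupedAggregate : ToyNode {
  std::string kind_name() const override { return "GroupedAggregate"; }
};
}  // namespace

// Registers one "aggregate" factory that picks the implementation itself.
void RegisterToyAggregate(ToyRegistry& registry) {
  const bool added = registry.AddFactory(
      "aggregate", [](int num_keys) -> std::unique_ptr<ToyNode> {
        if (num_keys == 0) return std::make_unique<ScalarAggregate>();
        return std::make_unique<GroupedAggregate>();
      });
  assert(added);
  (void)added;  // silences unused-variable warnings when NDEBUG is defined
}
```

Callers only ever name the operation ("aggregate") and pass options, so swapping or adding implementations does not affect them.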
To build an ExecPlan representing a simple pipeline which reads from a RecordBatchReader then filters, projects, and writes to disk:
std::shared_ptr<RecordBatchReader> reader = GetStreamOfBatches();
ExecNode* source_node = *MakeExecNode("source", plan.get(), {},
SourceNodeOptions::FromReader(
reader,
GetCpuThreadPool()));
ExecNode* filter_node = *MakeExecNode("filter", plan.get(), {source_node},
FilterNodeOptions{
greater(field_ref("score"), literal(3))
});
ExecNode* project_node = *MakeExecNode("project", plan.get(), {filter_node},
ProjectNodeOptions{
{add(field_ref("score"), literal(1))},
{"score + 1"}
});
arrow::dataset::internal::Initialize();
MakeExecNode("write", plan.get(), {project_node},
WriteNodeOptions{/*base_dir=*/"/dat", /*...*/});
Declaration is a dplyr-inspired helper which further decreases the boilerplate associated with populating an ExecPlan from C++:
arrow::dataset::internal::Initialize();
std::shared_ptr<RecordBatchReader> reader = GetStreamOfBatches();
ASSERT_OK(Declaration::Sequence(
{
{"source", SourceNodeOptions::FromReader(
reader,
GetCpuThreadPool())},
{"filter", FilterNodeOptions{
greater(field_ref("score"), literal(3))}},
{"project", ProjectNodeOptions{
{add(field_ref("score"), literal(1))},
{"score + 1"}}},
{"write", WriteNodeOptions{/*base_dir=*/"/dat", /*...*/}},
})
.AddToPlan(plan.get()));
Note that a source node can wrap anything which resembles a stream of batches. For example, PR#11032 adds support for use of a DuckDB query as a source node. Similarly, a sink node can wrap anything which absorbs a stream of batches. In the example above we’re writing completed batches to disk. However we can also collect these in memory into a Table or forward them to a RecordBatchReader as an out-of-graph stream. This flexibility allows an ExecPlan to be used as streaming middleware between any endpoints which support Arrow formatted batches.
An arrow::dataset::Dataset can also be wrapped as a source node which pushes all the dataset’s batches into an ExecPlan. This factory is added to the default registry with the name "scan" by calling arrow::dataset::internal::Initialize():
arrow::dataset::internal::Initialize();
std::shared_ptr<Dataset> dataset = GetDataset();
ASSERT_OK(Declaration::Sequence(
{
{"scan", ScanNodeOptions{dataset,
/* push down predicate, projection, ... */}},
{"filter", FilterNodeOptions{/* ... */}},
// ...
})
.AddToPlan(plan.get()));
Datasets may be scanned multiple times; just make multiple scan nodes from that dataset. (Useful for a self-join, for example.) Note that producing two scan nodes like this will perform all reads and decodes twice.
Constructing ExecNode using Options
ExecNode is the building block of an execution plan, and the built-in node types provide a range of operations. The operations that can be used in an execution plan are listed below, together with the options class used to configure each one:
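Operation | Options |
---|---|
source | arrow::compute::SourceNodeOptions |
table_source | arrow::compute::TableSourceNodeOptions |
filter | arrow::compute::FilterNodeOptions |
project | arrow::compute::ProjectNodeOptions |
aggregate | arrow::compute::AggregateNodeOptions |
sink | arrow::compute::SinkNodeOptions |
consuming_sink | arrow::compute::ConsumingSinkNodeOptions |
order_by_sink | arrow::compute::OrderBySinkNodeOptions |
select_k_sink | arrow::compute::SelectKSinkNodeOptions |
scan | arrow::dataset::ScanNodeOptions |
hash_join | arrow::compute::HashJoinNodeOptions |
write | arrow::dataset::WriteNodeOptions |
union | N/A |
table_sink | arrow::compute::TableSinkNodeOptions |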
source
A source operation can be considered as an entry point to create a streaming execution plan. arrow::compute::SourceNodeOptions are used to create the source operation. The source operation is the most generic and flexible type of source currently available but it can be quite tricky to configure. To process data from files the scan operation is likely a simpler choice.
The source node requires some kind of function that can be called to poll for more data. This function should take no arguments and should return an arrow::Future<std::optional<arrow::compute::ExecBatch>>. This function might be reading a file, iterating through an in-memory structure, or receiving data from a network connection. The arrow library refers to these functions as arrow::AsyncGenerator and there are a number of utilities for working with these functions. For this example we use a vector of record batches that we’ve already stored in memory.
In addition, the schema of the data must be known up front. Acero must know the schema of the data at each stage of the execution graph before any processing has begun. This means we must supply the schema for a source node separately from the data itself.
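The polling contract can be sketched without Arrow. The following is a synchronous simplification using only the standard library: a real AsyncGenerator returns a Future, whereas the hypothetical ToyGenerator here returns its std::optional directly. ToyBatch, ToyVectorGenerator, and DrainAndCountRows are names invented for this illustration.

```cpp
#include <cassert>
#include <cstddef>
#include <functional>
#include <memory>
#include <optional>
#include <utility>
#include <vector>

using ToyBatch = std::vector<int>;

// Hypothetical, synchronous analogue of a generator: it takes no arguments,
// and an empty optional marks the end of the stream.
using ToyGenerator = std::function<std::optional<ToyBatch>()>;

// Wraps batches that are already in memory. State is held in shared
// pointers so that copies of the generator continue the same stream.
ToyGenerator ToyVectorGenerator(std::vector<ToyBatch> batches) {
  auto remaining = std::make_shared<std::vector<ToyBatch>>(std::move(batches));
  auto next = std::make_shared<std::size_t>(0);
  return [remaining, next]() -> std::optional<ToyBatch> {
    if (*next >= remaining->size()) return std::nullopt;
    return std::move((*remaining)[(*next)++]);
  };
}

// Polls the generator until the end marker, as a source node would, and
// returns the number of rows seen.
std::size_t DrainAndCountRows(const ToyGenerator& gen) {
  assert(gen && "generator must be callable");
  std::size_t rows = 0;
  while (std::optional<ToyBatch> batch = gen()) rows += batch->size();
  return rows;
}
```

Note that nothing in the generator describes the shape of the data before the first batch is produced, which is why the schema has to be supplied to the source node separately.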
Here we define a struct to hold the data generator definition. This includes in-memory batches, schema and a function that serves as a data generator:
struct BatchesWithSchema {
  std::vector<cp::ExecBatch> batches;
  std::shared_ptr<arrow::Schema> schema;
  // This method uses internal arrow utilities to
  // convert a vector of record batches to an AsyncGenerator of optional batches
  arrow::AsyncGenerator<std::optional<cp::ExecBatch>> gen() const {
    auto opt_batches = ::arrow::internal::MapVector(
        [](cp::ExecBatch batch) { return std::make_optional(std::move(batch)); },
        batches);
    arrow::AsyncGenerator<std::optional<cp::ExecBatch>> gen;
    gen = arrow::MakeVectorGenerator(std::move(opt_batches));
    return gen;
  }
};
Generating sample batches for computation:
arrow::Result<BatchesWithSchema> MakeBasicBatches() {
  BatchesWithSchema out;
  auto field_vector = {arrow::field("a", arrow::int32()),
                       arrow::field("b", arrow::boolean())};
  ARROW_ASSIGN_OR_RAISE(auto b1_int, GetArrayDataSample<arrow::Int32Type>({0, 4}));
  ARROW_ASSIGN_OR_RAISE(auto b2_int, GetArrayDataSample<arrow::Int32Type>({5, 6, 7}));
  ARROW_ASSIGN_OR_RAISE(auto b3_int, GetArrayDataSample<arrow::Int32Type>({8, 9, 10}));

  ARROW_ASSIGN_OR_RAISE(auto b1_bool,
                        GetArrayDataSample<arrow::BooleanType>({false, true}));
  ARROW_ASSIGN_OR_RAISE(auto b2_bool,
                        GetArrayDataSample<arrow::BooleanType>({true, false, true}));
  ARROW_ASSIGN_OR_RAISE(auto b3_bool,
                        GetArrayDataSample<arrow::BooleanType>({false, true, false}));

  ARROW_ASSIGN_OR_RAISE(auto b1,
                        GetExecBatchFromVectors(field_vector, {b1_int, b1_bool}));
  ARROW_ASSIGN_OR_RAISE(auto b2,
                        GetExecBatchFromVectors(field_vector, {b2_int, b2_bool}));
  ARROW_ASSIGN_OR_RAISE(auto b3,
                        GetExecBatchFromVectors(field_vector, {b3_int, b3_bool}));

  out.batches = {b1, b2, b3};
  out.schema = arrow::schema(field_vector);
  return out;
}
Example of using source (usage of sink is explained in detail in sink):
/// \brief An example demonstrating a source and sink node
/// \param exec_context The execution context to run the plan in
///
/// Source-Sink Example
/// This example shows how a source and sink can be used
/// in an execution plan. This includes source node receiving data
/// and the sink node emits the data as an output represented in
/// a table.
arrow::Status SourceSinkExample(cp::ExecContext& exec_context) {
  ARROW_ASSIGN_OR_RAISE(std::shared_ptr<cp::ExecPlan> plan,
                        cp::ExecPlan::Make(&exec_context));

  ARROW_ASSIGN_OR_RAISE(auto basic_data, MakeBasicBatches());

  arrow::AsyncGenerator<std::optional<cp::ExecBatch>> sink_gen;

  auto source_node_options = cp::SourceNodeOptions{basic_data.schema, basic_data.gen()};

  ARROW_ASSIGN_OR_RAISE(cp::ExecNode * source,
                        cp::MakeExecNode("source", plan.get(), {}, source_node_options));

  ARROW_RETURN_NOT_OK(
      cp::MakeExecNode("sink", plan.get(), {source}, cp::SinkNodeOptions{&sink_gen}));

  return ExecutePlanAndCollectAsTable(exec_context, plan, basic_data.schema, sink_gen);
}
table_source
In the previous example, a source node was used to input the data. But when developing an application, if the data is already in memory as a table, it is much easier and more performant to use arrow::compute::TableSourceNodeOptions. Here the input data can be passed as a std::shared_ptr<arrow::Table> along with a max_batch_size. The max_batch_size is used to break up large record batches so that they can be processed in parallel. It is important to note that the table batches will not get merged to form larger batches when the source table has a smaller batch size.
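The split-but-never-merge behaviour of max_batch_size can be made concrete with a small standard-library sketch. ToyBatch and SplitBatches are hypothetical names; this models the rule described above and is not the table_source implementation.

```cpp
#include <algorithm>
#include <cassert>
#include <cstddef>
#include <vector>

using ToyBatch = std::vector<int>;

// Splits every batch into slices of at most max_batch_size rows. Batches that
// are already small enough pass through unchanged, and neighbouring small
// batches are never merged into a larger one.
std::vector<ToyBatch> SplitBatches(const std::vector<ToyBatch>& batches,
                                   std::size_t max_batch_size) {
  assert(max_batch_size > 0);
  std::vector<ToyBatch> out;
  for (const ToyBatch& batch : batches) {
    for (std::size_t offset = 0; offset < batch.size(); offset += max_batch_size) {
      const std::size_t end = std::min(batch.size(), offset + max_batch_size);
      out.emplace_back(batch.begin() + static_cast<std::ptrdiff_t>(offset),
                       batch.begin() + static_cast<std::ptrdiff_t>(end));
    }
  }
  return out;
}
```

With max_batch_size set to 2, an input of one 5-row batch followed by one 1-row batch yields four batches of 2, 2, 1, and 1 rows; the two trailing single-row batches stay separate.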
Example of using table_source:
/// \brief An example showing a table source node
/// \param exec_context The execution context to run the plan in
///
/// TableSource-Sink Example
/// This example shows how a table_source and sink can be used
/// in an execution plan. This includes a table source node
/// receiving data from a table and the sink node emits
/// the data to a generator which we collect into a table.
arrow::Status TableSourceSinkExample(cp::ExecContext& exec_context) {
  ARROW_ASSIGN_OR_RAISE(std::shared_ptr<cp::ExecPlan> plan,
                        cp::ExecPlan::Make(&exec_context));

  ARROW_ASSIGN_OR_RAISE(auto table, GetTable());

  arrow::AsyncGenerator<std::optional<cp::ExecBatch>> sink_gen;
  int max_batch_size = 2;
  auto table_source_options = cp::TableSourceNodeOptions{table, max_batch_size};

  ARROW_ASSIGN_OR_RAISE(
      cp::ExecNode * source,
      cp::MakeExecNode("table_source", plan.get(), {}, table_source_options));

  ARROW_RETURN_NOT_OK(
      cp::MakeExecNode("sink", plan.get(), {source}, cp::SinkNodeOptions{&sink_gen}));

  return ExecutePlanAndCollectAsTable(exec_context, plan, table->schema(), sink_gen);
}
filter
The filter operation, as the name suggests, provides an option to define data filtering criteria. It selects rows matching a given expression. Filters can be written using arrow::compute::Expression. For example, if we wish to keep rows where the value of column a is greater than 3, then we can use the following expression.
Filter example:
/// \brief An example showing a filter node
/// \param exec_context The execution context to run the plan in
///
/// Source-Filter-Sink
/// This example shows how a filter can be used in an execution plan,
/// along with the source and sink operations. The output from the
/// execution plan is obtained as a table via the sink node.
arrow::Status ScanFilterSinkExample(cp::ExecContext& exec_context) {
  ARROW_ASSIGN_OR_RAISE(std::shared_ptr<cp::ExecPlan> plan,
                        cp::ExecPlan::Make(&exec_context));

  ARROW_ASSIGN_OR_RAISE(std::shared_ptr<arrow::dataset::Dataset> dataset, GetDataset());

  auto options = std::make_shared<arrow::dataset::ScanOptions>();
  // specify the filter. This filter keeps only the rows where the
  // value of the "a" column is greater than 3.
  cp::Expression filter_opt = cp::greater(cp::field_ref("a"), cp::literal(3));
  // set filter for scanner : on-disk / push-down filtering.
  // This step can be skipped if you are not reading from disk.
  options->filter = filter_opt;
  // empty projection
  options->projection = cp::project({}, {});

  // construct the scan node
  std::cout << "Initialized Scanning Options" << std::endl;

  cp::ExecNode* scan;

  auto scan_node_options = arrow::dataset::ScanNodeOptions{dataset, options};
  std::cout << "Scan node options created" << std::endl;

  ARROW_ASSIGN_OR_RAISE(scan,
                        cp::MakeExecNode("scan", plan.get(), {}, scan_node_options));

  // pipe the scan node into the filter node
  // Need to set the filter in scan node options and filter node options.
  // At scan node it is used for on-disk / push-down filtering.
  // At filter node it is used for in-memory filtering.
  cp::ExecNode* filter;
  ARROW_ASSIGN_OR_RAISE(filter, cp::MakeExecNode("filter", plan.get(), {scan},
                                                 cp::FilterNodeOptions{filter_opt}));

  // finally, pipe the filter node into a sink node
  arrow::AsyncGenerator<std::optional<cp::ExecBatch>> sink_gen;
  ARROW_RETURN_NOT_OK(
      cp::MakeExecNode("sink", plan.get(), {filter}, cp::SinkNodeOptions{&sink_gen}));

  return ExecutePlanAndCollectAsTable(exec_context, plan, dataset->schema(), sink_gen);
}
project
The project operation rearranges, deletes, transforms, and creates columns. Each output column is computed by evaluating an expression against the source record batch. This is exposed via arrow::compute::ProjectNodeOptions, which requires an arrow::compute::Expression and a name for each of the output columns (if names are not provided, the string representations of exprs will be used).
Project example:
/// \brief An example showing a project node
/// \param exec_context The execution context to run the plan in
///
/// Scan-Project-Sink
/// This example shows how Scan operation can be used to load the data
/// into the execution plan, how project operation can be applied on the
/// data stream and how the output is obtained as a table via the sink node.
arrow::Status ScanProjectSinkExample(cp::ExecContext& exec_context) {
  ARROW_ASSIGN_OR_RAISE(std::shared_ptr<cp::ExecPlan> plan,
                        cp::ExecPlan::Make(&exec_context));

  ARROW_ASSIGN_OR_RAISE(std::shared_ptr<arrow::dataset::Dataset> dataset, GetDataset());

  auto options = std::make_shared<arrow::dataset::ScanOptions>();
  // projection
  cp::Expression a_times_2 = cp::call("multiply", {cp::field_ref("a"), cp::literal(2)});
  options->projection = cp::project({}, {});

  cp::ExecNode* scan;

  auto scan_node_options = arrow::dataset::ScanNodeOptions{dataset, options};

  ARROW_ASSIGN_OR_RAISE(scan,
                        cp::MakeExecNode("scan", plan.get(), {}, scan_node_options));

  cp::ExecNode* project;
  ARROW_ASSIGN_OR_RAISE(project, cp::MakeExecNode("project", plan.get(), {scan},
                                                  cp::ProjectNodeOptions{{a_times_2}}));
  // schema after projection => multiply(a, 2): int64
  std::cout << "Schema after projection : \n"
            << project->output_schema()->ToString() << std::endl;

  arrow::AsyncGenerator<std::optional<cp::ExecBatch>> sink_gen;
  ARROW_RETURN_NOT_OK(
      cp::MakeExecNode("sink", plan.get(), {project}, cp::SinkNodeOptions{&sink_gen}));
  auto schema = arrow::schema({arrow::field("a * 2", arrow::int32())});

  return ExecutePlanAndCollectAsTable(exec_context, plan, schema, sink_gen);
}
aggregate
The aggregate node computes various types of aggregates over data.
Arrow supports two types of aggregates: “scalar” aggregates, and “hash” aggregates. Scalar aggregates reduce an array or scalar input to a single scalar output (e.g. computing the mean of a column). Hash aggregates act like GROUP BY in SQL and first partition data based on one or more key columns, then reduce the data in each partition. The aggregate node supports both types of computation, and can compute any number of aggregations at once.
arrow::compute::AggregateNodeOptions is used to define the aggregation criteria. It takes a list of aggregation functions and their options; a list of target fields to aggregate, one per function; and a list of names for the output fields, one per function. Optionally, it takes a list of columns that are used to partition the data, in the case of a hash aggregation. The aggregation functions can be selected from this list of aggregation functions.
Note
This node is a “pipeline breaker” and will fully materialize the dataset in memory. In the future, spillover mechanisms will be added which should alleviate this constraint.
The aggregation can provide results as a group or scalar. For instance, an operation like hash_count provides the counts per each unique record as a grouped result while an operation like sum provides a single record.
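The difference between the two kinds of aggregate can be shown on the sample values produced by MakeBasicBatches earlier in this document. The following is a plain standard-library sketch rather than Acero code; ToyRow, ToyBatch, SumA, CountAByB, SampleBatches, and CheckSampleAggregates are hypothetical names.

```cpp
#include <cassert>
#include <cstdint>
#include <map>
#include <vector>

// One row of the sample data: column a (int32) and column b (boolean).
struct ToyRow {
  std::int32_t a;
  bool b;
};
using ToyBatch = std::vector<ToyRow>;

// Scalar aggregate: every row of every batch is reduced to a single value.
std::int64_t SumA(const std::vector<ToyBatch>& batches) {
  std::int64_t sum = 0;
  for (const ToyBatch& batch : batches) {
    for (const ToyRow& row : batch) sum += row.a;
  }
  return sum;
}

// Hash aggregate: rows are partitioned by the key column b, and one count of
// a is produced per distinct key (the sample data contains no nulls).
std::map<bool, std::int64_t> CountAByB(const std::vector<ToyBatch>& batches) {
  std::map<bool, std::int64_t> counts;
  for (const ToyBatch& batch : batches) {
    for (const ToyRow& row : batch) ++counts[row.b];
  }
  return counts;
}

// The same three batches that MakeBasicBatches builds.
std::vector<ToyBatch> SampleBatches() {
  return {
      ToyBatch{{0, false}, {4, true}},
      ToyBatch{{5, true}, {6, false}, {7, true}},
      ToyBatch{{8, false}, {9, true}, {10, false}},
  };
}

// Usage: one output row for the scalar case, one row per key otherwise.
void CheckSampleAggregates() {
  const std::vector<ToyBatch> batches = SampleBatches();
  assert(SumA(batches) == 49);
  const std::map<bool, std::int64_t> counts = CountAByB(batches);
  assert(counts.size() == 2);
  assert(counts.at(false) == 4);
  assert(counts.at(true) == 4);
}
```

Both functions can only return once the last batch has been seen, which is the sense in which an aggregation breaks the pipeline.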
Scalar Aggregation example:
/// \brief An example showing an aggregation node to aggregate an entire table
/// \param exec_context The execution context to run the plan in
///
/// Source-Aggregation-Sink
/// This example shows how an aggregation operation can be applied in an
/// execution plan, resulting in a scalar output. The source node loads the
/// data and the aggregation (summing the values in column 'a')
/// is applied on this data. The output is obtained from the sink node as a table.
arrow::Status SourceScalarAggregateSinkExample(cp::ExecContext& exec_context) {
  ARROW_ASSIGN_OR_RAISE(std::shared_ptr<cp::ExecPlan> plan,
                        cp::ExecPlan::Make(&exec_context));

  ARROW_ASSIGN_OR_RAISE(auto basic_data, MakeBasicBatches());

  arrow::AsyncGenerator<std::optional<cp::ExecBatch>> sink_gen;

  auto source_node_options = cp::SourceNodeOptions{basic_data.schema, basic_data.gen()};

  ARROW_ASSIGN_OR_RAISE(cp::ExecNode * source,
                        cp::MakeExecNode("source", plan.get(), {}, source_node_options));
  auto aggregate_options =
      cp::AggregateNodeOptions{/*aggregates=*/{{"sum", nullptr, "a", "sum(a)"}}};
  ARROW_ASSIGN_OR_RAISE(
      cp::ExecNode * aggregate,
      cp::MakeExecNode("aggregate", plan.get(), {source}, std::move(aggregate_options)));

  ARROW_RETURN_NOT_OK(
      cp::MakeExecNode("sink", plan.get(), {aggregate}, cp::SinkNodeOptions{&sink_gen}));
  auto schema = arrow::schema({arrow::field("sum(a)", arrow::int32())});

  return ExecutePlanAndCollectAsTable(exec_context, plan, schema, sink_gen);
}
Group Aggregation example:
/// \brief An example showing an aggregation node to perform a group-by operation
/// \param exec_context The execution context to run the plan in
///
/// Source-Aggregation-Sink
/// This example shows how an aggregation operation can be applied in an
/// execution plan, resulting in a grouped output. The source node loads the
/// data and the aggregation (counting the valid values in column 'a' for
/// each value of column 'b') is applied on this data. The output is
/// obtained from the sink node as a table.
arrow::Status SourceGroupAggregateSinkExample(cp::ExecContext& exec_context) {
  ARROW_ASSIGN_OR_RAISE(std::shared_ptr<cp::ExecPlan> plan,
                        cp::ExecPlan::Make(&exec_context));

  ARROW_ASSIGN_OR_RAISE(auto basic_data, MakeBasicBatches());

  arrow::AsyncGenerator<std::optional<cp::ExecBatch>> sink_gen;

  auto source_node_options = cp::SourceNodeOptions{basic_data.schema, basic_data.gen()};

  ARROW_ASSIGN_OR_RAISE(cp::ExecNode * source,
                        cp::MakeExecNode("source", plan.get(), {}, source_node_options));
  auto options = std::make_shared<cp::CountOptions>(cp::CountOptions::ONLY_VALID);
  auto aggregate_options =
      cp::AggregateNodeOptions{/*aggregates=*/{{"hash_count", options, "a", "count(a)"}},
                               /*keys=*/{"b"}};
  ARROW_ASSIGN_OR_RAISE(
      cp::ExecNode * aggregate,
      cp::MakeExecNode("aggregate", plan.get(), {source}, aggregate_options));

  ARROW_RETURN_NOT_OK(
      cp::MakeExecNode("sink", plan.get(), {aggregate}, cp::SinkNodeOptions{&sink_gen}));
  auto schema = arrow::schema({
      arrow::field("count(a)", arrow::int32()),
      arrow::field("b", arrow::boolean()),
  });

  return ExecutePlanAndCollectAsTable(exec_context, plan, schema, sink_gen);
}
sink
The sink operation provides output and is the final node of a streaming execution definition. The arrow::compute::SinkNodeOptions interface is used to pass the required options. Similar to the source operator, the sink operator exposes the output with a function that returns a record batch future each time it is called. It is expected the caller will repeatedly call this function until the generator function is exhausted (returns std::nullopt). If this function is not called often enough then record batches will accumulate in memory. An execution plan should only have one “terminal” node (one sink node). An ExecPlan can terminate early due to cancellation or an error, before the output is fully consumed. However, the plan can be safely destroyed independently of the sink, which will hold the unconsumed batches by exec_plan->finished().
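The pull side of a sink, and the way unconsumed batches pile up, can be sketched with a simple queue. This is a single-threaded, synchronous toy model using only the standard library: ToySink and its methods are hypothetical names, and a real sink generator would return a pending future instead of requiring that a batch is already available.

```cpp
#include <cassert>
#include <cstddef>
#include <deque>
#include <optional>
#include <utility>
#include <vector>

using ToyBatch = std::vector<int>;

// Hypothetical sink: the plan pushes batches in, the caller pulls them out.
class ToySink {
 public:
  // Called by the plan for each finished batch.
  void Push(ToyBatch batch) {
    assert(!finished_);
    queue_.push_back(std::move(batch));
  }

  // Called by the plan once no further batches will be pushed.
  void Finish() { finished_ = true; }

  // Called by the consumer. Returns the next batch, or std::nullopt once the
  // plan has finished and every buffered batch has been handed out.
  std::optional<ToyBatch> Next() {
    // Simplification: this sketch cannot wait, so a batch must be available
    // or the stream must already be finished.
    assert(!queue_.empty() || finished_);
    if (queue_.empty()) return std::nullopt;
    ToyBatch batch = std::move(queue_.front());
    queue_.pop_front();
    return batch;
  }

  // Number of batches currently held in memory on behalf of the consumer.
  std::size_t buffered() const { return queue_.size(); }

 private:
  std::deque<ToyBatch> queue_;
  bool finished_ = false;
};
```

Every Push that is not matched by a Next increases buffered(), which is the toy equivalent of record batches accumulating in memory when the generator is not polled often enough.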
As a part of the Source Example, the Sink operation is also included:
/// \brief An example demonstrating a source and sink node
/// \param exec_context The execution context to run the plan in
///
/// Source-Sink Example
/// This example shows how a source and sink can be used
/// in an execution plan. This includes source node receiving data
/// and the sink node emits the data as an output represented in
/// a table.
arrow::Status SourceSinkExample(cp::ExecContext& exec_context) {
  ARROW_ASSIGN_OR_RAISE(std::shared_ptr<cp::ExecPlan> plan,
                        cp::ExecPlan::Make(&exec_context));

  ARROW_ASSIGN_OR_RAISE(auto basic_data, MakeBasicBatches());

  arrow::AsyncGenerator<std::optional<cp::ExecBatch>> sink_gen;

  auto source_node_options = cp::SourceNodeOptions{basic_data.schema, basic_data.gen()};

  ARROW_ASSIGN_OR_RAISE(cp::ExecNode * source,
                        cp::MakeExecNode("source", plan.get(), {}, source_node_options));

  ARROW_RETURN_NOT_OK(
      cp::MakeExecNode("sink", plan.get(), {source}, cp::SinkNodeOptions{&sink_gen}));

  return ExecutePlanAndCollectAsTable(exec_context, plan, basic_data.schema, sink_gen);
}
consuming_sink¶
The consuming_sink operator is a sink operation that consumes its input within the execution
plan (i.e. the exec plan should not complete until the consumption has completed). Unlike the
sink node, this node takes a callback function that is expected to consume the batch. Once this
callback has finished, the execution plan will no longer hold any reference to the batch.
The consuming function may be called before a previous invocation has completed. If the
consuming function does not run quickly enough, many concurrent executions could pile up,
blocking the CPU thread pool. The execution plan will not be marked finished until all consuming
function callbacks have completed.
Once all batches have been delivered, the execution plan will wait for the finish future to
complete before marking the execution plan finished. This allows for workflows where the
consumption function converts batches into async tasks (this is currently done internally for
the dataset write node).
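The completion rule above combines three conditions: all input has been delivered, no consume callback is still running, and the finish signal has fired. The following single-threaded sketch only models that bookkeeping. ConsumingSinkState and its methods are hypothetical names, not part of Acero, and a real implementation would need thread-safe counters and futures.

```cpp
// Hypothetical model of when a plan with a consuming sink counts as finished.
class ConsumingSinkState {
 public:
  // A consume callback has started for one batch.
  void BeginConsume() { ++in_flight_; }
  // A consume callback has returned.
  void EndConsume() { --in_flight_; }
  // The upstream node has delivered its last batch.
  void InputFinished() { input_finished_ = true; }
  // The consumer's finish signal has completed.
  void MarkFinishComplete() { finish_complete_ = true; }

  // Finished only when every condition from the text holds at once.
  bool PlanFinished() const {
    return input_finished_ && in_flight_ == 0 && finish_complete_;
  }

 private:
  int in_flight_ = 0;
  bool input_finished_ = false;
  bool finish_complete_ = false;
};
```

Keeping the finish signal separate from the callbacks is what lets a consumer hand work to background tasks and report completion later.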
Example:
// define a custom SinkNodeConsumer
std::atomic<uint32_t> batches_seen{0};
arrow::Future<> finish = arrow::Future<>::Make();
struct CustomSinkNodeConsumer : public cp::SinkNodeConsumer {
  CustomSinkNodeConsumer(std::atomic<uint32_t>* batches_seen, arrow::Future<> finish)
      : batches_seen(batches_seen), finish(std::move(finish)) {}

  // Init is overridden with the same signature as in the full example below
  arrow::Status Init(const std::shared_ptr<arrow::Schema>& schema,
                     cp::BackpressureControl* backpressure_control) override {
    return arrow::Status::OK();
  }

  // Consumption logic can be written here
  arrow::Status Consume(cp::ExecBatch batch) override {
    // data can be consumed in the expected way:
    // transfer to another system or just do some work
    // and write to disk
    (*batches_seen)++;
    return arrow::Status::OK();
  }

  arrow::Future<> Finish() override { return finish; }

  std::atomic<uint32_t>* batches_seen;
  arrow::Future<> finish;
};
std::shared_ptr<CustomSinkNodeConsumer> consumer =
    std::make_shared<CustomSinkNodeConsumer>(&batches_seen, finish);

arrow::compute::ExecNode* consuming_sink;
ARROW_ASSIGN_OR_RAISE(consuming_sink,
                      MakeExecNode("consuming_sink", plan.get(), {source},
                                   cp::ConsumingSinkNodeOptions(consumer)));
Consuming-Sink example:
/// \brief An example showing a consuming sink node
/// \param exec_context The execution context to run the plan in
///
/// Source-Consuming-Sink
/// This example shows how the data can be consumed within the execution plan
/// by using a ConsumingSink node. There is no data output from this execution plan.
arrow::Status SourceConsumingSinkExample(cp::ExecContext& exec_context) {
  ARROW_ASSIGN_OR_RAISE(std::shared_ptr<cp::ExecPlan> plan,
                        cp::ExecPlan::Make(&exec_context));

  ARROW_ASSIGN_OR_RAISE(auto basic_data, MakeBasicBatches());

  auto source_node_options = cp::SourceNodeOptions{basic_data.schema, basic_data.gen()};

  ARROW_ASSIGN_OR_RAISE(cp::ExecNode * source,
                        cp::MakeExecNode("source", plan.get(), {}, source_node_options));

  std::atomic<uint32_t> batches_seen{0};
  arrow::Future<> finish = arrow::Future<>::Make();
  struct CustomSinkNodeConsumer : public cp::SinkNodeConsumer {
    CustomSinkNodeConsumer(std::atomic<uint32_t>* batches_seen, arrow::Future<> finish)
        : batches_seen(batches_seen), finish(std::move(finish)) {}

    arrow::Status Init(const std::shared_ptr<arrow::Schema>& schema,
                       cp::BackpressureControl* backpressure_control) override {
      return arrow::Status::OK();
    }

    arrow::Status Consume(cp::ExecBatch batch) override {
      (*batches_seen)++;
      return arrow::Status::OK();
    }

    arrow::Future<> Finish() override { return finish; }

    std::atomic<uint32_t>* batches_seen;
    arrow::Future<> finish;
  };
  std::shared_ptr<CustomSinkNodeConsumer> consumer =
      std::make_shared<CustomSinkNodeConsumer>(&batches_seen, finish);

  cp::ExecNode* consuming_sink;

  ARROW_ASSIGN_OR_RAISE(consuming_sink,
                        MakeExecNode("consuming_sink", plan.get(), {source},
                                     cp::ConsumingSinkNodeOptions(consumer)));

  ARROW_RETURN_NOT_OK(consuming_sink->Validate());

  ARROW_RETURN_NOT_OK(plan->Validate());
  std::cout << "Exec Plan created: " << plan->ToString() << std::endl;
  // plan start producing
  ARROW_RETURN_NOT_OK(plan->StartProducing());
  // Source should finish fairly quickly
  ARROW_RETURN_NOT_OK(source->finished().status());
  std::cout << "Source Finished!" << std::endl;
  // Mark consumption complete, plan should finish
  finish.MarkFinished(arrow::Status::OK());
  ARROW_RETURN_NOT_OK(plan->finished().status());
  return arrow::Status::OK();
}
order_by_sink¶
The order_by_sink operation is an extension to the sink operation. It guarantees the ordering
of the output stream and is configured through arrow::compute::OrderBySinkNodeOptions. The
arrow::compute::SortOptions provided there define which columns are used for sorting and
whether to sort by ascending or descending values.
Note
This node is a “pipeline breaker” and will fully materialize the dataset in memory. In the future, spillover mechanisms will be added which should alleviate this constraint.
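To illustrate why a sorting sink is a pipeline breaker, the sketch below buffers every batch and sorts only once the input ends. It is a conceptual model under simplifying assumptions (a single integer column), and OrderBySink here is a hypothetical class, not the Acero node.

```cpp
#include <algorithm>
#include <functional>
#include <vector>

// Hypothetical stand-in for a record batch with one integer column.
using Batch = std::vector<int>;

// Buffers all input and produces sorted output only at the end.
class OrderBySink {
 public:
  explicit OrderBySink(bool descending) : descending_(descending) {}

  // Nothing can be emitted yet: a later batch may hold a value that sorts first.
  void InputReceived(const Batch& batch) {
    rows_.insert(rows_.end(), batch.begin(), batch.end());
  }

  // Called once the input is exhausted; returns the fully sorted rows.
  std::vector<int> Finish() {
    if (descending_) {
      std::sort(rows_.begin(), rows_.end(), std::greater<int>());
    } else {
      std::sort(rows_.begin(), rows_.end());
    }
    return rows_;
  }

 private:
  bool descending_;
  std::vector<int> rows_;
};
```

The buffer grows with the size of the whole input, which is the memory cost the note above refers to.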
Order-By-Sink example:
/// \brief An example showing an order-by node
/// \param exec_context The execution context to run the plan in
///
/// Source-OrderBy-Sink
/// In this example, the data enters through the source node
/// and the data is ordered in the sink node. The order can be
/// ASCENDING or DESCENDING and it is configurable. The output
/// is obtained as a table from the sink node.
arrow::Status SourceOrderBySinkExample(cp::ExecContext& exec_context) {
  ARROW_ASSIGN_OR_RAISE(std::shared_ptr<cp::ExecPlan> plan,
                        cp::ExecPlan::Make(&exec_context));

  ARROW_ASSIGN_OR_RAISE(auto basic_data, MakeSortTestBasicBatches());

  std::cout << "basic data created" << std::endl;

  arrow::AsyncGenerator<std::optional<cp::ExecBatch>> sink_gen;

  auto source_node_options = cp::SourceNodeOptions{basic_data.schema, basic_data.gen()};
  ARROW_ASSIGN_OR_RAISE(cp::ExecNode * source,
                        cp::MakeExecNode("source", plan.get(), {}, source_node_options));

  ARROW_RETURN_NOT_OK(cp::MakeExecNode(
      "order_by_sink", plan.get(), {source},
      cp::OrderBySinkNodeOptions{
          cp::SortOptions{{cp::SortKey{"a", cp::SortOrder::Descending}}}, &sink_gen}));

  return ExecutePlanAndCollectAsTable(exec_context, plan, basic_data.schema, sink_gen);
}
select_k_sink¶
The select_k_sink node selects the top/bottom K elements, similar to a SQL
ORDER BY ... LIMIT K clause. The selection is configured with arrow::compute::SelectKOptions,
and the node is defined using the OrderBySinkNode definition. It is a sink node that receives
inputs and then computes top_k/bottom_k.
Note
This node is a “pipeline breaker” and will fully materialize the input in memory. In the future, spillover mechanisms will be added which should alleviate this constraint.
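As a rough illustration of the ORDER BY ... LIMIT K semantics, the following sketch selects the K largest values from an already collected column. TopK is a hypothetical helper written against the standard library only and says nothing about how Acero implements the node.

```cpp
#include <algorithm>
#include <cstddef>
#include <functional>
#include <vector>

// Return the k largest values in descending order
// (the equivalent of ORDER BY v DESC LIMIT k on one column).
std::vector<int> TopK(std::vector<int> rows, std::size_t k) {
  k = std::min(k, rows.size());
  // Only the first k positions need to end up sorted.
  std::partial_sort(rows.begin(), rows.begin() + static_cast<std::ptrdiff_t>(k),
                    rows.end(), std::greater<int>());
  rows.resize(k);
  return rows;
}
```

The function takes the full column by value, mirroring the note above that the input is materialized before the selection is made.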
SelectK example:
/// \brief An example showing a select-k node
/// \param exec_context The execution context to run the plan in
///
/// Source-KSelect
/// This example shows how K number of elements can be selected
/// either from the top or bottom. The output node is a modified
/// sink node where output can be obtained as a table.
arrow::Status SourceKSelectExample(cp::ExecContext& exec_context) {
  ARROW_ASSIGN_OR_RAISE(auto input, MakeGroupableBatches());
  ARROW_ASSIGN_OR_RAISE(std::shared_ptr<cp::ExecPlan> plan,
                        cp::ExecPlan::Make(&exec_context));
  arrow::AsyncGenerator<std::optional<cp::ExecBatch>> sink_gen;

  ARROW_ASSIGN_OR_RAISE(
      cp::ExecNode * source,
      cp::MakeExecNode("source", plan.get(), {},
                       cp::SourceNodeOptions{input.schema, input.gen()}));

  cp::SelectKOptions options = cp::SelectKOptions::TopKDefault(/*k=*/2, {"i32"});

  ARROW_RETURN_NOT_OK(cp::MakeExecNode("select_k_sink", plan.get(), {source},
                                       cp::SelectKSinkNodeOptions{options, &sink_gen}));

  auto schema = arrow::schema(
      {arrow::field("i32", arrow::int32()), arrow::field("str", arrow::utf8())});

  return ExecutePlanAndCollectAsTable(exec_context, plan, schema, sink_gen);
}
table_sink¶
The table_sink node provides the ability to receive the output as an in-memory table.
This is simpler to use than the other sink nodes provided by the streaming execution engine,
but it only makes sense when the output fits comfortably in memory.
The node is created using arrow::compute::TableSinkNodeOptions.
Example of using table_sink:
/// \brief An example showing a table sink node
/// \param exec_context The execution context to run the plan in
///
/// TableSink Example
/// This example shows how a table_sink can be used
/// in an execution plan. This includes a source node
/// receiving data as batches and the table sink node
/// which emits the output as a table.
arrow::Status TableSinkExample(cp::ExecContext& exec_context) {
  ARROW_ASSIGN_OR_RAISE(std::shared_ptr<cp::ExecPlan> plan,
                        cp::ExecPlan::Make(&exec_context));

  ARROW_ASSIGN_OR_RAISE(auto basic_data, MakeBasicBatches());

  auto source_node_options = cp::SourceNodeOptions{basic_data.schema, basic_data.gen()};

  ARROW_ASSIGN_OR_RAISE(cp::ExecNode * source,
                        cp::MakeExecNode("source", plan.get(), {}, source_node_options));

  std::shared_ptr<arrow::Table> output_table;
  auto table_sink_options = cp::TableSinkNodeOptions{&output_table};

  ARROW_RETURN_NOT_OK(
      cp::MakeExecNode("table_sink", plan.get(), {source}, table_sink_options));
  // validate the ExecPlan
  ARROW_RETURN_NOT_OK(plan->Validate());
  std::cout << "ExecPlan created : " << plan->ToString() << std::endl;
  // start the ExecPlan
  ARROW_RETURN_NOT_OK(plan->StartProducing());

  // Wait for the plan to finish
  auto finished = plan->finished();
  ARROW_RETURN_NOT_OK(finished.status());
  std::cout << "Results : " << output_table->ToString() << std::endl;
  return arrow::Status::OK();
}
scan¶
The scan operation is used to load and process datasets. It should be preferred over the
more generic source node when your input is a dataset. The behavior is defined using
arrow::dataset::ScanNodeOptions. More information on datasets and the various
scan options can be found in Tabular Datasets.
This node can apply pushdown filters to the file readers, which reduces the amount of data that needs to be read. Because the filtering is done in two different places, you may supply the same filter expression to the scan node that you also supply to the FilterNode.
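One way to picture filtering in two places is the sketch below. It assumes, purely for illustration, that the file reader can skip whole row groups using min/max statistics. RowGroup and ScanThenFilter are hypothetical names, and what a given file format can actually prune is format-specific.

```cpp
#include <vector>

// Hypothetical row group: values plus min/max statistics from file metadata.
struct RowGroup {
  int min;
  int max;
  std::vector<int> values;
};

// Applies the filter "value > threshold" at two levels and reports how many
// row groups actually had to be read.
std::vector<int> ScanThenFilter(const std::vector<RowGroup>& groups, int threshold,
                                int* groups_read) {
  std::vector<int> out;
  *groups_read = 0;
  for (const RowGroup& group : groups) {
    // Pushdown: the statistics prove no row can match, so skip the group.
    if (group.max <= threshold) continue;
    ++*groups_read;
    // Exact filter: a group that was read may still hold non-matching rows.
    for (int v : group.values) {
      if (v > threshold) out.push_back(v);
    }
  }
  return out;
}
```

The pushdown step only reduces how much is read. The exact per-row check is still needed, which is why the same expression appears in both places.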
Scan example:
/// \brief An example demonstrating a scan and sink node
/// \param exec_context The execution context to run the plan in
///
/// Scan-Sink
/// This example shows how scan operation can be applied on a dataset.
/// There are operations that can be applied on the scan (project, filter)
/// and the input data can be processed. The output is obtained as a table
/// via the sink node.
arrow::Status ScanSinkExample(cp::ExecContext& exec_context) {
  // Execution plan created
  ARROW_ASSIGN_OR_RAISE(std::shared_ptr<cp::ExecPlan> plan,
                        cp::ExecPlan::Make(&exec_context));

  ARROW_ASSIGN_OR_RAISE(std::shared_ptr<arrow::dataset::Dataset> dataset, GetDataset());

  auto options = std::make_shared<arrow::dataset::ScanOptions>();
  options->projection = cp::project({}, {});  // create empty projection

  // construct the scan node
  cp::ExecNode* scan;
  auto scan_node_options = arrow::dataset::ScanNodeOptions{dataset, options};

  ARROW_ASSIGN_OR_RAISE(scan,
                        cp::MakeExecNode("scan", plan.get(), {}, scan_node_options));

  arrow::AsyncGenerator<std::optional<cp::ExecBatch>> sink_gen;

  ARROW_RETURN_NOT_OK(
      cp::MakeExecNode("sink", plan.get(), {scan}, cp::SinkNodeOptions{&sink_gen}));

  return ExecutePlanAndCollectAsTable(exec_context, plan, dataset->schema(), sink_gen);
}
write¶
The write node saves query results as a dataset of files in a format like Parquet, Feather,
CSV, etc. using the Tabular Datasets functionality in Arrow. The write options are provided via
arrow::dataset::WriteNodeOptions, which in turn contains
arrow::dataset::FileSystemDatasetWriteOptions.
arrow::dataset::FileSystemDatasetWriteOptions provides control over the written dataset,
including options like the output directory, file naming scheme, and so on.
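The combination of an output directory, a partitioning and a file naming scheme can be pictured with a small path builder. This sketch assumes Hive-style "key=value" directories and a basename template with an "{i}" placeholder. HivePath is a hypothetical helper and does not claim to reproduce the exact paths that Arrow writes.

```cpp
#include <cstddef>
#include <string>
#include <utility>
#include <vector>

// Build "base_dir/key=value/.../basename", replacing "{i}" with file_index.
std::string HivePath(
    const std::string& base_dir,
    const std::vector<std::pair<std::string, std::string>>& partition_values,
    const std::string& basename_template, int file_index) {
  std::string path = base_dir;
  // One directory level per partition field.
  for (const auto& kv : partition_values) {
    path += "/" + kv.first + "=" + kv.second;
  }
  std::string basename = basename_template;
  const std::string placeholder = "{i}";
  const std::size_t pos = basename.find(placeholder);
  if (pos != std::string::npos) {
    basename.replace(pos, placeholder.size(), std::to_string(file_index));
  }
  return path + "/" + basename;
}
```

Under these assumptions, rows with a equal to 4 written as the first file under /tmp/parquet_dataset would land in /tmp/parquet_dataset/a=4/part0.parquet.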
Write example:
/// \brief An example showing a write node
/// \param exec_context The execution context to run the plan in
/// \param file_path The destination to write to
///
/// Scan-Filter-Write
/// This example shows how scan node can be used to load the data
/// and after processing how it can be written to disk.
arrow::Status ScanFilterWriteExample(cp::ExecContext& exec_context,
                                     const std::string& file_path) {
  ARROW_ASSIGN_OR_RAISE(std::shared_ptr<cp::ExecPlan> plan,
                        cp::ExecPlan::Make(&exec_context));

  ARROW_ASSIGN_OR_RAISE(std::shared_ptr<arrow::dataset::Dataset> dataset, GetDataset());

  auto options = std::make_shared<arrow::dataset::ScanOptions>();
  // empty projection
  options->projection = cp::project({}, {});

  cp::ExecNode* scan;

  auto scan_node_options = arrow::dataset::ScanNodeOptions{dataset, options};

  ARROW_ASSIGN_OR_RAISE(scan,
                        cp::MakeExecNode("scan", plan.get(), {}, scan_node_options));

  arrow::AsyncGenerator<std::optional<cp::ExecBatch>> sink_gen;

  std::string root_path = "";
  std::string uri = "file://" + file_path;
  ARROW_ASSIGN_OR_RAISE(std::shared_ptr<arrow::fs::FileSystem> filesystem,
                        arrow::fs::FileSystemFromUri(uri, &root_path));

  auto base_path = root_path + "/parquet_dataset";
  // Uncomment the following line, if run repeatedly
  // ARROW_RETURN_NOT_OK(filesystem->DeleteDirContents(base_path));
  ARROW_RETURN_NOT_OK(filesystem->CreateDir(base_path));

  // The partition schema determines which fields are part of the partitioning.
  auto partition_schema = arrow::schema({arrow::field("a", arrow::int32())});
  // We'll use Hive-style partitioning,
  // which creates directories with "key=value" pairs.

  auto partitioning =
      std::make_shared<arrow::dataset::HivePartitioning>(partition_schema);
  // We'll write Parquet files.
  auto format = std::make_shared<arrow::dataset::ParquetFileFormat>();

  arrow::dataset::FileSystemDatasetWriteOptions write_options;
  write_options.file_write_options = format->DefaultWriteOptions();
  write_options.filesystem = filesystem;
  write_options.base_dir = base_path;
  write_options.partitioning = partitioning;
  write_options.basename_template = "part{i}.parquet";

  arrow::dataset::WriteNodeOptions write_node_options{write_options};

  ARROW_RETURN_NOT_OK(cp::MakeExecNode("write", plan.get(), {scan}, write_node_options));

  ARROW_RETURN_NOT_OK(plan->Validate());
  std::cout << "Execution Plan Created : " << plan->ToString() << std::endl;
  // start the ExecPlan
  ARROW_RETURN_NOT_OK(plan->StartProducing());
  // status() waits for the plan to finish, so no separate Wait() is needed
  auto future = plan->finished();
  ARROW_RETURN_NOT_OK(future.status());
  return arrow::Status::OK();
}
union¶
The union node merges multiple data streams with the same schema into one, similar to
a SQL UNION ALL clause.
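The UNION ALL semantics can be stated in a few lines of standard C++: every batch of every input is forwarded and nothing is deduplicated. UnionAll below is a hypothetical helper. It emits the inputs one after another only to keep the sketch deterministic and implies nothing about batch ordering in a streaming engine.

```cpp
#include <vector>

// Hypothetical stand-in for a record batch with one integer column.
using Batch = std::vector<int>;

// Forward every batch from every input stream; duplicates are kept.
std::vector<Batch> UnionAll(const std::vector<std::vector<Batch>>& inputs) {
  std::vector<Batch> out;
  for (const auto& stream : inputs) {
    out.insert(out.end(), stream.begin(), stream.end());
  }
  return out;
}
```

Feeding the same stream in twice therefore yields every batch twice.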
The following example demonstrates how this can be achieved using two data sources.
Union example:
/// \brief An example showing a union node
/// \param exec_context The execution context to run the plan in
///
/// Source-Union-Sink
/// This example shows how a union operation can be applied on two
/// data sources. The output is obtained as a table via the sink
/// node.
arrow::Status SourceUnionSinkExample(cp::ExecContext& exec_context) {
  ARROW_ASSIGN_OR_RAISE(auto basic_data, MakeBasicBatches());

  ARROW_ASSIGN_OR_RAISE(std::shared_ptr<cp::ExecPlan> plan,
                        cp::ExecPlan::Make(&exec_context));
  arrow::AsyncGenerator<std::optional<cp::ExecBatch>> sink_gen;

  // Both inputs are source nodes reading the same data
  cp::Declaration union_node{"union", cp::ExecNodeOptions{}};
  cp::Declaration lhs{"source",
                      cp::SourceNodeOptions{basic_data.schema, basic_data.gen()}};
  lhs.label = "lhs";
  cp::Declaration rhs{"source",
                      cp::SourceNodeOptions{basic_data.schema, basic_data.gen()}};
  rhs.label = "rhs";
  union_node.inputs.emplace_back(lhs);
  union_node.inputs.emplace_back(rhs);

  ARROW_ASSIGN_OR_RAISE(
      auto declr, cp::Declaration::Sequence({
                      union_node,
                      {"sink", cp::SinkNodeOptions{&sink_gen}},
                  })
                      .AddToPlan(plan.get()));

  ARROW_RETURN_NOT_OK(declr->Validate());

  ARROW_RETURN_NOT_OK(plan->Validate());
  return ExecutePlanAndCollectAsTable(exec_context, plan, basic_data.schema, sink_gen);
}
hash_join¶
The hash_join operation provides the relational algebra join operation using a hash-based
algorithm. arrow::compute::HashJoinNodeOptions contains the options required to define a join.
hash_join supports left/right/full semi/anti/outer joins.
The join key (i.e. the column(s) to join on) and suffixes (i.e. a term like “_x” that is
appended to column names duplicated in both the left and right relations) can also be set via
the join options.
Read more on hash-joins.
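For readers unfamiliar with the hash-based algorithm, the sketch below shows the usual build and probe phases for an inner join on a string key. Row and HashInnerJoin are hypothetical names, and building the table from the right input is an assumption of this sketch rather than a statement about Acero internals.

```cpp
#include <string>
#include <unordered_map>
#include <utility>
#include <vector>

// Hypothetical row with an integer column and a string join key.
struct Row {
  int i32;
  std::string str;
};

// Inner join on `str`: build a hash table from one side, probe with the other.
std::vector<std::pair<Row, Row>> HashInnerJoin(const std::vector<Row>& left,
                                               const std::vector<Row>& right) {
  // Build phase: map each key to all right rows that carry it.
  std::unordered_multimap<std::string, const Row*> table;
  for (const Row& r : right) table.emplace(r.str, &r);

  // Probe phase: each left row is paired with every right row of equal key.
  std::vector<std::pair<Row, Row>> out;
  for (const Row& l : left) {
    auto range = table.equal_range(l.str);
    for (auto it = range.first; it != range.second; ++it) {
      out.emplace_back(l, *it->second);
    }
  }
  return out;
}
```

In a self-join, a key that occurs n times produces n times n output pairs.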
Hash-Join example:
/// \brief An example showing a hash join node
/// \param exec_context The execution context to run the plan in
///
/// Source-HashJoin-Sink
/// This example shows how source node gets the data and how a self-join
/// is applied on the data. The join options are configurable. The output
/// is obtained as a table via the sink node.
arrow::Status SourceHashJoinSinkExample(cp::ExecContext& exec_context) {
  ARROW_ASSIGN_OR_RAISE(auto input, MakeGroupableBatches());
  ARROW_ASSIGN_OR_RAISE(std::shared_ptr<cp::ExecPlan> plan,
                        cp::ExecPlan::Make(&exec_context));

  arrow::AsyncGenerator<std::optional<cp::ExecBatch>> sink_gen;

  cp::ExecNode* left_source;
  cp::ExecNode* right_source;
  for (auto source : {&left_source, &right_source}) {
    ARROW_ASSIGN_OR_RAISE(*source,
                          MakeExecNode("source", plan.get(), {},
                                       cp::SourceNodeOptions{input.schema, input.gen()}));
  }

  cp::HashJoinNodeOptions join_opts{
      cp::JoinType::INNER,
      /*left_keys=*/{"str"},
      /*right_keys=*/{"str"}, cp::literal(true), "l_", "r_"};

  ARROW_ASSIGN_OR_RAISE(
      auto hashjoin,
      cp::MakeExecNode("hashjoin", plan.get(), {left_source, right_source}, join_opts));

  ARROW_RETURN_NOT_OK(
      cp::MakeExecNode("sink", plan.get(), {hashjoin}, cp::SinkNodeOptions{&sink_gen}));
  // expected columns i32, str, l_str, r_str
  auto schema = arrow::schema(
      {arrow::field("i32", arrow::int32()), arrow::field("str", arrow::utf8()),
       arrow::field("l_str", arrow::utf8()), arrow::field("r_str", arrow::utf8())});

  return ExecutePlanAndCollectAsTable(exec_context, plan, schema, sink_gen);
}
Summary¶
Examples of these nodes can be found in
cpp/examples/arrow/execution_plan_documentation_examples.cc in the Arrow source.
Complete Example:
#include <arrow/array.h>
#include <arrow/builder.h>

#include <arrow/compute/api.h>
#include <arrow/compute/api_vector.h>
#include <arrow/compute/cast.h>
#include <arrow/compute/exec/exec_plan.h>

#include <arrow/csv/api.h>

#include <arrow/dataset/dataset.h>
#include <arrow/dataset/file_base.h>
#include <arrow/dataset/file_parquet.h>
#include <arrow/dataset/plan.h>
#include <arrow/dataset/scanner.h>

#include <arrow/io/interfaces.h>
#include <arrow/io/memory.h>

#include <arrow/result.h>
#include <arrow/status.h>
#include <arrow/table.h>

#include <arrow/ipc/api.h>

#include <arrow/util/future.h>
#include <arrow/util/range.h>
#include <arrow/util/thread_pool.h>
#include <arrow/util/vector.h>

#include <atomic>
#include <iostream>
#include <limits>
#include <memory>
#include <optional>
#include <string>
#include <utility>
#include <vector>

// Demonstrate various operators in Arrow Streaming Execution Engine

namespace cp = ::arrow::compute;

constexpr char kSep[] = "******";

void PrintBlock(const std::string& msg) {
  std::cout << "\n\t" << kSep << " " << msg << " " << kSep << "\n" << std::endl;
}

template <typename TYPE,
          typename = typename std::enable_if<arrow::is_number_type<TYPE>::value |
                                             arrow::is_boolean_type<TYPE>::value |
                                             arrow::is_temporal_type<TYPE>::value>::type>
arrow::Result<std::shared_ptr<arrow::Array>> GetArrayDataSample(
    const std::vector<typename TYPE::c_type>& values) {
  using ArrowBuilderType = typename arrow::TypeTraits<TYPE>::BuilderType;
  ArrowBuilderType builder;
  ARROW_RETURN_NOT_OK(builder.Reserve(values.size()));
  ARROW_RETURN_NOT_OK(builder.AppendValues(values));
  return builder.Finish();
}

template <class TYPE>
arrow::Result<std::shared_ptr<arrow::Array>> GetBinaryArrayDataSample(
    const std::vector<std::string>& values) {
  using ArrowBuilderType = typename arrow::TypeTraits<TYPE>::BuilderType;
  ArrowBuilderType builder;
  ARROW_RETURN_NOT_OK(builder.Reserve(values.size()));
  ARROW_RETURN_NOT_OK(builder.AppendValues(values));
  return builder.Finish();
}

arrow::Result<std::shared_ptr<arrow::RecordBatch>> GetSampleRecordBatch(
    const arrow::ArrayVector array_vector, const arrow::FieldVector& field_vector) {
  ARROW_ASSIGN_OR_RAISE(auto struct_result,
                        arrow::StructArray::Make(array_vector, field_vector));
  // FromStructArray is a static factory, so call it on the class directly
  return arrow::RecordBatch::FromStructArray(struct_result);
}

/// \brief Create a sample table
/// The table's contents will be:
/// a,b
/// 1,null
/// 2,true
/// null,true
/// 3,false
/// null,true
/// 4,false
/// 5,null
/// 6,false
/// 7,false
/// 8,true
/// \return The created table

arrow::Result<std::shared_ptr<arrow::Table>> GetTable() {
  // Note: std::numeric_limits<int64_t>::quiet_NaN() is 0 for integer types, and
  // AppendValues is called without a validity vector, so the "null" entries of
  // column a listed above are stored as valid zeros rather than as nulls.
  auto null_long = std::numeric_limits<int64_t>::quiet_NaN();
  ARROW_ASSIGN_OR_RAISE(auto int64_array,
                        GetArrayDataSample<arrow::Int64Type>(
                            {1, 2, null_long, 3, null_long, 4, 5, 6, 7, 8}));

  arrow::BooleanBuilder boolean_builder;
  std::shared_ptr<arrow::BooleanArray> bool_array;

  std::vector<uint8_t> bool_values = {false, true,  true,  false, true,
                                      false, false, false, false, true};
  std::vector<bool> is_valid = {false, true,  true, true, true,
                                true,  false, true, true, true};

  ARROW_RETURN_NOT_OK(boolean_builder.Reserve(10));

  ARROW_RETURN_NOT_OK(boolean_builder.AppendValues(bool_values, is_valid));

  ARROW_RETURN_NOT_OK(boolean_builder.Finish(&bool_array));

  auto record_batch =
      arrow::RecordBatch::Make(arrow::schema({arrow::field("a", arrow::int64()),
                                              arrow::field("b", arrow::boolean())}),
                               10, {int64_array, bool_array});
  ARROW_ASSIGN_OR_RAISE(auto table, arrow::Table::FromRecordBatches({record_batch}));
  return table;
}

/// \brief Create a sample dataset
/// \return An in-memory dataset based on GetTable()
arrow::Result<std::shared_ptr<arrow::dataset::Dataset>> GetDataset() {
  ARROW_ASSIGN_OR_RAISE(auto table, GetTable());
  auto ds = std::make_shared<arrow::dataset::InMemoryDataset>(table);
  return ds;
}

arrow::Result<cp::ExecBatch> GetExecBatchFromVectors(
    const arrow::FieldVector& field_vector, const arrow::ArrayVector& array_vector) {
  std::shared_ptr<arrow::RecordBatch> record_batch;
  ARROW_ASSIGN_OR_RAISE(auto res_batch, GetSampleRecordBatch(array_vector, field_vector));
  cp::ExecBatch batch{*res_batch};
  return batch;
}

// (Doc section: BatchesWithSchema Definition)
struct BatchesWithSchema {
  std::vector<cp::ExecBatch> batches;
  std::shared_ptr<arrow::Schema> schema;
  // This method uses internal arrow utilities to
  // convert a vector of record batches to an AsyncGenerator of optional batches
  arrow::AsyncGenerator<std::optional<cp::ExecBatch>> gen() const {
    auto opt_batches = ::arrow::internal::MapVector(
        [](cp::ExecBatch batch) { return std::make_optional(std::move(batch)); },
        batches);
    arrow::AsyncGenerator<std::optional<cp::ExecBatch>> gen;
    gen = arrow::MakeVectorGenerator(std::move(opt_batches));
    return gen;
  }
};
// (Doc section: BatchesWithSchema Definition)

// (Doc section: MakeBasicBatches Definition)
arrow::Result<BatchesWithSchema> MakeBasicBatches() {
  BatchesWithSchema out;
  auto field_vector = {arrow::field("a", arrow::int32()),
                       arrow::field("b", arrow::boolean())};
  ARROW_ASSIGN_OR_RAISE(auto b1_int, GetArrayDataSample<arrow::Int32Type>({0, 4}));
  ARROW_ASSIGN_OR_RAISE(auto b2_int, GetArrayDataSample<arrow::Int32Type>({5, 6, 7}));
  ARROW_ASSIGN_OR_RAISE(auto b3_int, GetArrayDataSample<arrow::Int32Type>({8, 9, 10}));

  ARROW_ASSIGN_OR_RAISE(auto b1_bool,
                        GetArrayDataSample<arrow::BooleanType>({false, true}));
  ARROW_ASSIGN_OR_RAISE(auto b2_bool,
                        GetArrayDataSample<arrow::BooleanType>({true, false, true}));
  ARROW_ASSIGN_OR_RAISE(auto b3_bool,
                        GetArrayDataSample<arrow::BooleanType>({false, true, false}));

  ARROW_ASSIGN_OR_RAISE(auto b1,
                        GetExecBatchFromVectors(field_vector, {b1_int, b1_bool}));
  ARROW_ASSIGN_OR_RAISE(auto b2,
                        GetExecBatchFromVectors(field_vector, {b2_int, b2_bool}));
  ARROW_ASSIGN_OR_RAISE(auto b3,
                        GetExecBatchFromVectors(field_vector, {b3_int, b3_bool}));

  out.batches = {b1, b2, b3};
  out.schema = arrow::schema(field_vector);
  return out;
}
// (Doc section: MakeBasicBatches Definition)

arrow::Result<BatchesWithSchema> MakeSortTestBasicBatches() {
  BatchesWithSchema out;
  auto field = arrow::field("a", arrow::int32());
  ARROW_ASSIGN_OR_RAISE(auto b1_int, GetArrayDataSample<arrow::Int32Type>({1, 3, 0, 2}));
  ARROW_ASSIGN_OR_RAISE(auto b2_int,
                        GetArrayDataSample<arrow::Int32Type>({121, 101, 120, 12}));
  ARROW_ASSIGN_OR_RAISE(auto b3_int,
                        GetArrayDataSample<arrow::Int32Type>({10, 110, 210, 121}));
  ARROW_ASSIGN_OR_RAISE(auto b4_int,
                        GetArrayDataSample<arrow::Int32Type>({51, 101, 2, 34}));
  ARROW_ASSIGN_OR_RAISE(auto b5_int,
                        GetArrayDataSample<arrow::Int32Type>({11, 31, 1, 12}));
  ARROW_ASSIGN_OR_RAISE(auto b6_int,
                        GetArrayDataSample<arrow::Int32Type>({12, 101, 120, 12}));
  ARROW_ASSIGN_OR_RAISE(auto b7_int,
                        GetArrayDataSample<arrow::Int32Type>({0, 110, 210, 11}));
  ARROW_ASSIGN_OR_RAISE(auto b8_int,
                        GetArrayDataSample<arrow::Int32Type>({51, 10, 2, 3}));

  ARROW_ASSIGN_OR_RAISE(auto b1, GetExecBatchFromVectors({field}, {b1_int}));
  ARROW_ASSIGN_OR_RAISE(auto b2, GetExecBatchFromVectors({field}, {b2_int}));
  ARROW_ASSIGN_OR_RAISE(auto b3,
                        GetExecBatchFromVectors({field, field}, {b3_int, b8_int}));
  ARROW_ASSIGN_OR_RAISE(auto b4,
                        GetExecBatchFromVectors({field, field, field, field},
                                                {b4_int, b5_int, b6_int, b7_int}));
  out.batches = {b1, b2, b3, b4};
  out.schema = arrow::schema({field});
  return out;
}

arrow::Result<BatchesWithSchema> MakeGroupableBatches(int multiplicity = 1) {
  BatchesWithSchema out;
  auto fields = {arrow::field("i32", arrow::int32()), arrow::field("str", arrow::utf8())};
  ARROW_ASSIGN_OR_RAISE(auto b1_int, GetArrayDataSample<arrow::Int32Type>({12, 7, 3}));
  ARROW_ASSIGN_OR_RAISE(auto b2_int, GetArrayDataSample<arrow::Int32Type>({-2, -1, 3}));
  ARROW_ASSIGN_OR_RAISE(auto b3_int, GetArrayDataSample<arrow::Int32Type>({5, 3, -8}));
  ARROW_ASSIGN_OR_RAISE(auto b1_str, GetBinaryArrayDataSample<arrow::StringType>(
                                         {"alpha", "beta", "alpha"}));
  ARROW_ASSIGN_OR_RAISE(auto b2_str, GetBinaryArrayDataSample<arrow::StringType>(
                                         {"alpha", "gamma", "alpha"}));
  ARROW_ASSIGN_OR_RAISE(auto b3_str, GetBinaryArrayDataSample<arrow::StringType>(
                                         {"gamma", "beta", "alpha"}));
  ARROW_ASSIGN_OR_RAISE(auto b1, GetExecBatchFromVectors(fields, {b1_int, b1_str}));
  ARROW_ASSIGN_OR_RAISE(auto b2, GetExecBatchFromVectors(fields, {b2_int, b2_str}));
  ARROW_ASSIGN_OR_RAISE(auto b3, GetExecBatchFromVectors(fields, {b3_int, b3_str}));
  out.batches = {b1, b2, b3};

  size_t batch_count = out.batches.size();
  for (int repeat = 1; repeat < multiplicity; ++repeat) {
    for (size_t i = 0; i < batch_count; ++i) {
      out.batches.push_back(out.batches[i]);
    }
  }

  out.schema = arrow::schema(fields);
  return out;
}

arrow::Status ExecutePlanAndCollectAsTable(
    cp::ExecContext& exec_context, std::shared_ptr<cp::ExecPlan> plan,
    std::shared_ptr<arrow::Schema> schema,
    arrow::AsyncGenerator<std::optional<cp::ExecBatch>> sink_gen) {
  // translate sink_gen (async) to sink_reader (sync)
  std::shared_ptr<arrow::RecordBatchReader> sink_reader =
      cp::MakeGeneratorReader(schema, std::move(sink_gen), exec_context.memory_pool());

  // validate the ExecPlan
  ARROW_RETURN_NOT_OK(plan->Validate());
  std::cout << "ExecPlan created : " << plan->ToString() << std::endl;
  // start the ExecPlan
  ARROW_RETURN_NOT_OK(plan->StartProducing());

  // collect sink_reader into a Table
  std::shared_ptr<arrow::Table> response_table;

  ARROW_ASSIGN_OR_RAISE(response_table,
                        arrow::Table::FromRecordBatchReader(sink_reader.get()));

  std::cout << "Results : " << response_table->ToString() << std::endl;

  // stop producing
  plan->StopProducing();
  // plan mark finished
  auto future = plan->finished();
  return future.status();
}

287// (Doc section: Scan Example)
288
289/// \brief An example demonstrating a scan and sink node
290/// \param exec_context The execution context to run the plan in
291///
292/// Scan-Sink
293/// This example shows how scan operation can be applied on a dataset.
294/// There are operations that can be applied on the scan (project, filter)
295/// and the input data can be processed. The output is obtained as a table
296/// via the sink node.
arrow::Status ScanSinkExample(cp::ExecContext& exec_context) {
  // Execution plan created
  ARROW_ASSIGN_OR_RAISE(std::shared_ptr<cp::ExecPlan> plan,
                        cp::ExecPlan::Make(&exec_context));

  ARROW_ASSIGN_OR_RAISE(std::shared_ptr<arrow::dataset::Dataset> dataset, GetDataset());

  auto options = std::make_shared<arrow::dataset::ScanOptions>();
  options->projection = cp::project({}, {});  // create empty projection

  // construct the scan node
  cp::ExecNode* scan;
  auto scan_node_options = arrow::dataset::ScanNodeOptions{dataset, options};

  ARROW_ASSIGN_OR_RAISE(scan,
                        cp::MakeExecNode("scan", plan.get(), {}, scan_node_options));

  arrow::AsyncGenerator<std::optional<cp::ExecBatch>> sink_gen;

  ARROW_RETURN_NOT_OK(
      cp::MakeExecNode("sink", plan.get(), {scan}, cp::SinkNodeOptions{&sink_gen}));

  return ExecutePlanAndCollectAsTable(exec_context, plan, dataset->schema(), sink_gen);
}
// (Doc section: Scan Example)

// (Doc section: Source Example)

/// \brief An example demonstrating a source and sink node
/// \param exec_context The execution context to run the plan in
///
/// Source-Sink Example
/// This example shows how a source and sink can be used
/// in an execution plan. The source node receives the data
/// and the sink node emits it as output, which is collected
/// into a table.
arrow::Status SourceSinkExample(cp::ExecContext& exec_context) {
  ARROW_ASSIGN_OR_RAISE(std::shared_ptr<cp::ExecPlan> plan,
                        cp::ExecPlan::Make(&exec_context));

  ARROW_ASSIGN_OR_RAISE(auto basic_data, MakeBasicBatches());

  arrow::AsyncGenerator<std::optional<cp::ExecBatch>> sink_gen;

  auto source_node_options = cp::SourceNodeOptions{basic_data.schema, basic_data.gen()};

  ARROW_ASSIGN_OR_RAISE(cp::ExecNode * source,
                        cp::MakeExecNode("source", plan.get(), {}, source_node_options));

  ARROW_RETURN_NOT_OK(
      cp::MakeExecNode("sink", plan.get(), {source}, cp::SinkNodeOptions{&sink_gen}));

  return ExecutePlanAndCollectAsTable(exec_context, plan, basic_data.schema, sink_gen);
}
// (Doc section: Source Example)

// (Doc section: Table Source Example)

/// \brief An example showing a table source node
/// \param exec_context The execution context to run the plan in
///
/// TableSource-Sink Example
/// This example shows how a table_source and sink can be used
/// in an execution plan. The table source node reads data
/// from a table and the sink node emits the data to a
/// generator, which we collect into a table.
arrow::Status TableSourceSinkExample(cp::ExecContext& exec_context) {
  ARROW_ASSIGN_OR_RAISE(std::shared_ptr<cp::ExecPlan> plan,
                        cp::ExecPlan::Make(&exec_context));

  ARROW_ASSIGN_OR_RAISE(auto table, GetTable());

  arrow::AsyncGenerator<std::optional<cp::ExecBatch>> sink_gen;
  int max_batch_size = 2;
  auto table_source_options = cp::TableSourceNodeOptions{table, max_batch_size};

  ARROW_ASSIGN_OR_RAISE(
      cp::ExecNode * source,
      cp::MakeExecNode("table_source", plan.get(), {}, table_source_options));

  ARROW_RETURN_NOT_OK(
      cp::MakeExecNode("sink", plan.get(), {source}, cp::SinkNodeOptions{&sink_gen}));

  return ExecutePlanAndCollectAsTable(exec_context, plan, table->schema(), sink_gen);
}
// (Doc section: Table Source Example)

// (Doc section: Filter Example)

/// \brief An example showing a filter node
/// \param exec_context The execution context to run the plan in
///
/// Scan-Filter-Sink
/// This example shows how a filter can be used in an execution plan,
/// along with the scan and sink operations. The output from the
/// execution plan is obtained as a table via the sink node.
arrow::Status ScanFilterSinkExample(cp::ExecContext& exec_context) {
  ARROW_ASSIGN_OR_RAISE(std::shared_ptr<cp::ExecPlan> plan,
                        cp::ExecPlan::Make(&exec_context));

  ARROW_ASSIGN_OR_RAISE(std::shared_ptr<arrow::dataset::Dataset> dataset, GetDataset());

  auto options = std::make_shared<arrow::dataset::ScanOptions>();
  // specify the filter. This filter keeps only the rows where the
  // value of the "a" column is greater than 3.
  cp::Expression filter_opt = cp::greater(cp::field_ref("a"), cp::literal(3));
  // set filter for scanner : on-disk / push-down filtering.
  // This step can be skipped if you are not reading from disk.
  options->filter = filter_opt;
  // empty projection
  options->projection = cp::project({}, {});

  // construct the scan node
  std::cout << "Initialized Scanning Options" << std::endl;

  cp::ExecNode* scan;

  auto scan_node_options = arrow::dataset::ScanNodeOptions{dataset, options};
  std::cout << "Scan node options created" << std::endl;

  ARROW_ASSIGN_OR_RAISE(scan,
                        cp::MakeExecNode("scan", plan.get(), {}, scan_node_options));

  // pipe the scan node into the filter node
  // Need to set the filter in scan node options and filter node options.
  // At scan node it is used for on-disk / push-down filtering.
  // At filter node it is used for in-memory filtering.
  cp::ExecNode* filter;
  ARROW_ASSIGN_OR_RAISE(filter, cp::MakeExecNode("filter", plan.get(), {scan},
                                                 cp::FilterNodeOptions{filter_opt}));

  // finally, pipe the filter node into a sink node
  arrow::AsyncGenerator<std::optional<cp::ExecBatch>> sink_gen;
  ARROW_RETURN_NOT_OK(
      cp::MakeExecNode("sink", plan.get(), {filter}, cp::SinkNodeOptions{&sink_gen}));

  return ExecutePlanAndCollectAsTable(exec_context, plan, dataset->schema(), sink_gen);
}

// (Doc section: Filter Example)

// (Doc section: Project Example)

/// \brief An example showing a project node
/// \param exec_context The execution context to run the plan in
///
/// Scan-Project-Sink
/// This example shows how a scan operation can be used to load the data
/// into the execution plan, how a project operation can be applied to the
/// data stream, and how the output is obtained as a table via the sink node.
arrow::Status ScanProjectSinkExample(cp::ExecContext& exec_context) {
  ARROW_ASSIGN_OR_RAISE(std::shared_ptr<cp::ExecPlan> plan,
                        cp::ExecPlan::Make(&exec_context));

  ARROW_ASSIGN_OR_RAISE(std::shared_ptr<arrow::dataset::Dataset> dataset, GetDataset());

  auto options = std::make_shared<arrow::dataset::ScanOptions>();
  // projection
  cp::Expression a_times_2 = cp::call("multiply", {cp::field_ref("a"), cp::literal(2)});
  options->projection = cp::project({}, {});

  cp::ExecNode* scan;

  auto scan_node_options = arrow::dataset::ScanNodeOptions{dataset, options};

  ARROW_ASSIGN_OR_RAISE(scan,
                        cp::MakeExecNode("scan", plan.get(), {}, scan_node_options));

  cp::ExecNode* project;
  ARROW_ASSIGN_OR_RAISE(project, cp::MakeExecNode("project", plan.get(), {scan},
                                                  cp::ProjectNodeOptions{{a_times_2}}));
  // schema after projection => multiply(a, 2): int64
  std::cout << "Schema after projection : \n"
            << project->output_schema()->ToString() << std::endl;

  arrow::AsyncGenerator<std::optional<cp::ExecBatch>> sink_gen;
  ARROW_RETURN_NOT_OK(
      cp::MakeExecNode("sink", plan.get(), {project}, cp::SinkNodeOptions{&sink_gen}));
  auto schema = arrow::schema({arrow::field("a * 2", arrow::int32())});

  return ExecutePlanAndCollectAsTable(exec_context, plan, schema, sink_gen);
}

// (Doc section: Project Example)

// (Doc section: Scalar Aggregate Example)

/// \brief An example showing an aggregation node to aggregate an entire table
/// \param exec_context The execution context to run the plan in
///
/// Source-Aggregation-Sink
/// This example shows how an aggregation operation can be applied in an
/// execution plan, resulting in a scalar output. The source node loads the
/// data and the aggregation (summing the values in column 'a')
/// is applied to this data. The output is obtained from the sink node as a table.
arrow::Status SourceScalarAggregateSinkExample(cp::ExecContext& exec_context) {
  ARROW_ASSIGN_OR_RAISE(std::shared_ptr<cp::ExecPlan> plan,
                        cp::ExecPlan::Make(&exec_context));

  ARROW_ASSIGN_OR_RAISE(auto basic_data, MakeBasicBatches());

  arrow::AsyncGenerator<std::optional<cp::ExecBatch>> sink_gen;

  auto source_node_options = cp::SourceNodeOptions{basic_data.schema, basic_data.gen()};

  ARROW_ASSIGN_OR_RAISE(cp::ExecNode * source,
                        cp::MakeExecNode("source", plan.get(), {}, source_node_options));
  auto aggregate_options =
      cp::AggregateNodeOptions{/*aggregates=*/{{"sum", nullptr, "a", "sum(a)"}}};
  ARROW_ASSIGN_OR_RAISE(
      cp::ExecNode * aggregate,
      cp::MakeExecNode("aggregate", plan.get(), {source}, std::move(aggregate_options)));

  ARROW_RETURN_NOT_OK(
      cp::MakeExecNode("sink", plan.get(), {aggregate}, cp::SinkNodeOptions{&sink_gen}));
  auto schema = arrow::schema({arrow::field("sum(a)", arrow::int32())});

  return ExecutePlanAndCollectAsTable(exec_context, plan, schema, sink_gen);
}
// (Doc section: Scalar Aggregate Example)

// (Doc section: Group Aggregate Example)

/// \brief An example showing an aggregation node to perform a group-by operation
/// \param exec_context The execution context to run the plan in
///
/// Source-Aggregation-Sink
/// This example shows how an aggregation operation can be applied in an
/// execution plan, resulting in a grouped output. The source node loads the
/// data and the aggregation (counting the valid values in column 'a',
/// grouped by column 'b') is applied to this data. The output is obtained
/// from the sink node as a table.
arrow::Status SourceGroupAggregateSinkExample(cp::ExecContext& exec_context) {
  ARROW_ASSIGN_OR_RAISE(std::shared_ptr<cp::ExecPlan> plan,
                        cp::ExecPlan::Make(&exec_context));

  ARROW_ASSIGN_OR_RAISE(auto basic_data, MakeBasicBatches());

  arrow::AsyncGenerator<std::optional<cp::ExecBatch>> sink_gen;

  auto source_node_options = cp::SourceNodeOptions{basic_data.schema, basic_data.gen()};

  ARROW_ASSIGN_OR_RAISE(cp::ExecNode * source,
                        cp::MakeExecNode("source", plan.get(), {}, source_node_options));
  auto options = std::make_shared<cp::CountOptions>(cp::CountOptions::ONLY_VALID);
  auto aggregate_options =
      cp::AggregateNodeOptions{/*aggregates=*/{{"hash_count", options, "a", "count(a)"}},
                               /*keys=*/{"b"}};
  ARROW_ASSIGN_OR_RAISE(
      cp::ExecNode * aggregate,
      cp::MakeExecNode("aggregate", plan.get(), {source}, aggregate_options));

  ARROW_RETURN_NOT_OK(
      cp::MakeExecNode("sink", plan.get(), {aggregate}, cp::SinkNodeOptions{&sink_gen}));
  auto schema = arrow::schema({
      arrow::field("count(a)", arrow::int32()),
      arrow::field("b", arrow::boolean()),
  });

  return ExecutePlanAndCollectAsTable(exec_context, plan, schema, sink_gen);
}
// (Doc section: Group Aggregate Example)

// (Doc section: ConsumingSink Example)

/// \brief An example showing a consuming sink node
/// \param exec_context The execution context to run the plan in
///
/// Source-Consuming-Sink
/// This example shows how the data can be consumed within the execution plan
/// by using a ConsumingSink node. There is no data output from this execution plan.
arrow::Status SourceConsumingSinkExample(cp::ExecContext& exec_context) {
  ARROW_ASSIGN_OR_RAISE(std::shared_ptr<cp::ExecPlan> plan,
                        cp::ExecPlan::Make(&exec_context));

  ARROW_ASSIGN_OR_RAISE(auto basic_data, MakeBasicBatches());

  auto source_node_options = cp::SourceNodeOptions{basic_data.schema, basic_data.gen()};

  ARROW_ASSIGN_OR_RAISE(cp::ExecNode * source,
                        cp::MakeExecNode("source", plan.get(), {}, source_node_options));

  std::atomic<uint32_t> batches_seen{0};
  arrow::Future<> finish = arrow::Future<>::Make();
  struct CustomSinkNodeConsumer : public cp::SinkNodeConsumer {
    CustomSinkNodeConsumer(std::atomic<uint32_t>* batches_seen, arrow::Future<> finish)
        : batches_seen(batches_seen), finish(std::move(finish)) {}

    arrow::Status Init(const std::shared_ptr<arrow::Schema>& schema,
                       cp::BackpressureControl* backpressure_control) override {
      return arrow::Status::OK();
    }

    arrow::Status Consume(cp::ExecBatch batch) override {
      (*batches_seen)++;
      return arrow::Status::OK();
    }

    arrow::Future<> Finish() override { return finish; }

    std::atomic<uint32_t>* batches_seen;
    arrow::Future<> finish;
  };
  std::shared_ptr<CustomSinkNodeConsumer> consumer =
      std::make_shared<CustomSinkNodeConsumer>(&batches_seen, finish);

  cp::ExecNode* consuming_sink;

  ARROW_ASSIGN_OR_RAISE(consuming_sink,
                        MakeExecNode("consuming_sink", plan.get(), {source},
                                     cp::ConsumingSinkNodeOptions(consumer)));

  ARROW_RETURN_NOT_OK(consuming_sink->Validate());

  ARROW_RETURN_NOT_OK(plan->Validate());
  std::cout << "Exec Plan created: " << plan->ToString() << std::endl;
  // plan start producing
  ARROW_RETURN_NOT_OK(plan->StartProducing());
  // Source should finish fairly quickly
  ARROW_RETURN_NOT_OK(source->finished().status());
  std::cout << "Source Finished!" << std::endl;
  // Mark consumption complete, plan should finish
  finish.MarkFinished(arrow::Status::OK());
  ARROW_RETURN_NOT_OK(plan->finished().status());
  return arrow::Status::OK();
}
// (Doc section: ConsumingSink Example)

// (Doc section: OrderBySink Example)

/// \brief An example showing an order-by node
/// \param exec_context The execution context to run the plan in
///
/// Source-OrderBy-Sink
/// In this example, the data enters through the source node
/// and the data is ordered in the sink node. The order can be
/// ASCENDING or DESCENDING and it is configurable. The output
/// is obtained as a table from the sink node.
arrow::Status SourceOrderBySinkExample(cp::ExecContext& exec_context) {
  ARROW_ASSIGN_OR_RAISE(std::shared_ptr<cp::ExecPlan> plan,
                        cp::ExecPlan::Make(&exec_context));

  ARROW_ASSIGN_OR_RAISE(auto basic_data, MakeSortTestBasicBatches());

  std::cout << "basic data created" << std::endl;

  arrow::AsyncGenerator<std::optional<cp::ExecBatch>> sink_gen;

  auto source_node_options = cp::SourceNodeOptions{basic_data.schema, basic_data.gen()};
  ARROW_ASSIGN_OR_RAISE(cp::ExecNode * source,
                        cp::MakeExecNode("source", plan.get(), {}, source_node_options));

  ARROW_RETURN_NOT_OK(cp::MakeExecNode(
      "order_by_sink", plan.get(), {source},
      cp::OrderBySinkNodeOptions{
          cp::SortOptions{{cp::SortKey{"a", cp::SortOrder::Descending}}}, &sink_gen}));

  return ExecutePlanAndCollectAsTable(exec_context, plan, basic_data.schema, sink_gen);
}

// (Doc section: OrderBySink Example)

// (Doc section: HashJoin Example)

/// \brief An example showing a hash join node
/// \param exec_context The execution context to run the plan in
///
/// Source-HashJoin-Sink
/// This example shows how the source nodes get the data and how a self-join
/// is applied to the data. The join options are configurable. The output
/// is obtained as a table via the sink node.
arrow::Status SourceHashJoinSinkExample(cp::ExecContext& exec_context) {
  ARROW_ASSIGN_OR_RAISE(auto input, MakeGroupableBatches());
  ARROW_ASSIGN_OR_RAISE(std::shared_ptr<cp::ExecPlan> plan,
                        cp::ExecPlan::Make(&exec_context));

  arrow::AsyncGenerator<std::optional<cp::ExecBatch>> sink_gen;

  cp::ExecNode* left_source;
  cp::ExecNode* right_source;
  for (auto source : {&left_source, &right_source}) {
    ARROW_ASSIGN_OR_RAISE(*source,
                          MakeExecNode("source", plan.get(), {},
                                       cp::SourceNodeOptions{input.schema, input.gen()}));
  }

  cp::HashJoinNodeOptions join_opts{
      cp::JoinType::INNER,
      /*left_keys=*/{"str"},
      /*right_keys=*/{"str"}, cp::literal(true), "l_", "r_"};

  ARROW_ASSIGN_OR_RAISE(
      auto hashjoin,
      cp::MakeExecNode("hashjoin", plan.get(), {left_source, right_source}, join_opts));

  ARROW_RETURN_NOT_OK(
      cp::MakeExecNode("sink", plan.get(), {hashjoin}, cp::SinkNodeOptions{&sink_gen}));
  // expected columns i32, str, l_str, r_str
  auto schema = arrow::schema(
      {arrow::field("i32", arrow::int32()), arrow::field("str", arrow::utf8()),
       arrow::field("l_str", arrow::utf8()), arrow::field("r_str", arrow::utf8())});

  return ExecutePlanAndCollectAsTable(exec_context, plan, schema, sink_gen);
}

// (Doc section: HashJoin Example)

// (Doc section: KSelect Example)

/// \brief An example showing a select-k node
/// \param exec_context The execution context to run the plan in
///
/// Source-KSelect
/// This example shows how K elements can be selected
/// from either the top or the bottom. The output node is a modified
/// sink node where the output can be obtained as a table.
arrow::Status SourceKSelectExample(cp::ExecContext& exec_context) {
  ARROW_ASSIGN_OR_RAISE(auto input, MakeGroupableBatches());
  ARROW_ASSIGN_OR_RAISE(std::shared_ptr<cp::ExecPlan> plan,
                        cp::ExecPlan::Make(&exec_context));
  arrow::AsyncGenerator<std::optional<cp::ExecBatch>> sink_gen;

  ARROW_ASSIGN_OR_RAISE(
      cp::ExecNode * source,
      cp::MakeExecNode("source", plan.get(), {},
                       cp::SourceNodeOptions{input.schema, input.gen()}));

  cp::SelectKOptions options = cp::SelectKOptions::TopKDefault(/*k=*/2, {"i32"});

  ARROW_RETURN_NOT_OK(cp::MakeExecNode("select_k_sink", plan.get(), {source},
                                       cp::SelectKSinkNodeOptions{options, &sink_gen}));

  auto schema = arrow::schema(
      {arrow::field("i32", arrow::int32()), arrow::field("str", arrow::utf8())});

  return ExecutePlanAndCollectAsTable(exec_context, plan, schema, sink_gen);
}

// (Doc section: KSelect Example)

// (Doc section: Write Example)

/// \brief An example showing a write node
/// \param exec_context The execution context to run the plan in
/// \param file_path The destination to write to
///
/// Scan-Filter-Write
/// This example shows how a scan node can be used to load the data
/// and how, after processing, it can be written to disk.
arrow::Status ScanFilterWriteExample(cp::ExecContext& exec_context,
                                     const std::string& file_path) {
  ARROW_ASSIGN_OR_RAISE(std::shared_ptr<cp::ExecPlan> plan,
                        cp::ExecPlan::Make(&exec_context));

  ARROW_ASSIGN_OR_RAISE(std::shared_ptr<arrow::dataset::Dataset> dataset, GetDataset());

  auto options = std::make_shared<arrow::dataset::ScanOptions>();
  // empty projection
  options->projection = cp::project({}, {});

  cp::ExecNode* scan;

  auto scan_node_options = arrow::dataset::ScanNodeOptions{dataset, options};

  ARROW_ASSIGN_OR_RAISE(scan,
                        cp::MakeExecNode("scan", plan.get(), {}, scan_node_options));

  arrow::AsyncGenerator<std::optional<cp::ExecBatch>> sink_gen;

  std::string root_path = "";
  std::string uri = "file://" + file_path;
  ARROW_ASSIGN_OR_RAISE(std::shared_ptr<arrow::fs::FileSystem> filesystem,
                        arrow::fs::FileSystemFromUri(uri, &root_path));

  auto base_path = root_path + "/parquet_dataset";
  // Uncomment the following line, if run repeatedly
  // ARROW_RETURN_NOT_OK(filesystem->DeleteDirContents(base_path));
  ARROW_RETURN_NOT_OK(filesystem->CreateDir(base_path));

  // The partition schema determines which fields are part of the partitioning.
  auto partition_schema = arrow::schema({arrow::field("a", arrow::int32())});
  // We'll use Hive-style partitioning,
  // which creates directories with "key=value" pairs.

  auto partitioning =
      std::make_shared<arrow::dataset::HivePartitioning>(partition_schema);
  // We'll write Parquet files.
  auto format = std::make_shared<arrow::dataset::ParquetFileFormat>();

  arrow::dataset::FileSystemDatasetWriteOptions write_options;
  write_options.file_write_options = format->DefaultWriteOptions();
  write_options.filesystem = filesystem;
  write_options.base_dir = base_path;
  write_options.partitioning = partitioning;
  write_options.basename_template = "part{i}.parquet";

  arrow::dataset::WriteNodeOptions write_node_options{write_options};

  ARROW_RETURN_NOT_OK(cp::MakeExecNode("write", plan.get(), {scan}, write_node_options));

  ARROW_RETURN_NOT_OK(plan->Validate());
  std::cout << "Execution Plan Created : " << plan->ToString() << std::endl;
  // start the ExecPlan
  ARROW_RETURN_NOT_OK(plan->StartProducing());
  auto future = plan->finished();
  ARROW_RETURN_NOT_OK(future.status());
  future.Wait();
  return arrow::Status::OK();
}

// (Doc section: Write Example)

// (Doc section: Union Example)

/// \brief An example showing a union node
/// \param exec_context The execution context to run the plan in
///
/// Source-Union-Sink
/// This example shows how a union operation can be applied on two
/// data sources. The output is obtained as a table via the sink
/// node.
arrow::Status SourceUnionSinkExample(cp::ExecContext& exec_context) {
  ARROW_ASSIGN_OR_RAISE(auto basic_data, MakeBasicBatches());

  ARROW_ASSIGN_OR_RAISE(std::shared_ptr<cp::ExecPlan> plan,
                        cp::ExecPlan::Make(&exec_context));
  arrow::AsyncGenerator<std::optional<cp::ExecBatch>> sink_gen;

  cp::Declaration union_node{"union", cp::ExecNodeOptions{}};
  cp::Declaration lhs{"source",
                      cp::SourceNodeOptions{basic_data.schema, basic_data.gen()}};
  lhs.label = "lhs";
  cp::Declaration rhs{"source",
                      cp::SourceNodeOptions{basic_data.schema, basic_data.gen()}};
  rhs.label = "rhs";
  union_node.inputs.emplace_back(lhs);
  union_node.inputs.emplace_back(rhs);

  ARROW_ASSIGN_OR_RAISE(
      auto declr, cp::Declaration::Sequence({
                                                union_node,
                                                {"sink", cp::SinkNodeOptions{&sink_gen}},
                                            })
                      .AddToPlan(plan.get()));

  ARROW_RETURN_NOT_OK(declr->Validate());

  ARROW_RETURN_NOT_OK(plan->Validate());
  return ExecutePlanAndCollectAsTable(exec_context, plan, basic_data.schema, sink_gen);
}

// (Doc section: Union Example)

// (Doc section: Table Sink Example)

/// \brief An example showing a table sink node
/// \param exec_context The execution context to run the plan in
///
/// TableSink Example
/// This example shows how a table_sink can be used
/// in an execution plan. The source node receives
/// data as batches and the table sink node
/// emits the output as a table.
arrow::Status TableSinkExample(cp::ExecContext& exec_context) {
  ARROW_ASSIGN_OR_RAISE(std::shared_ptr<cp::ExecPlan> plan,
                        cp::ExecPlan::Make(&exec_context));

  ARROW_ASSIGN_OR_RAISE(auto basic_data, MakeBasicBatches());

  auto source_node_options = cp::SourceNodeOptions{basic_data.schema, basic_data.gen()};

  ARROW_ASSIGN_OR_RAISE(cp::ExecNode * source,
                        cp::MakeExecNode("source", plan.get(), {}, source_node_options));

  std::shared_ptr<arrow::Table> output_table;
  auto table_sink_options = cp::TableSinkNodeOptions{&output_table};

  ARROW_RETURN_NOT_OK(
      cp::MakeExecNode("table_sink", plan.get(), {source}, table_sink_options));
  // validate the ExecPlan
  ARROW_RETURN_NOT_OK(plan->Validate());
  std::cout << "ExecPlan created : " << plan->ToString() << std::endl;
  // start the ExecPlan
  ARROW_RETURN_NOT_OK(plan->StartProducing());

  // Wait for the plan to finish
  auto finished = plan->finished();
  ARROW_RETURN_NOT_OK(finished.status());
  std::cout << "Results : " << output_table->ToString() << std::endl;
  return arrow::Status::OK();
}
// (Doc section: Table Sink Example)

enum ExampleMode {
  SOURCE_SINK = 0,
  TABLE_SOURCE_SINK = 1,
  SCAN = 2,
  FILTER = 3,
  PROJECT = 4,
  SCALAR_AGGREGATION = 5,
  GROUP_AGGREGATION = 6,
  CONSUMING_SINK = 7,
  ORDER_BY_SINK = 8,
  HASHJOIN = 9,
  KSELECT = 10,
  WRITE = 11,
  UNION = 12,
  TABLE_SOURCE_TABLE_SINK = 13
};

int main(int argc, char** argv) {
  // Both argv[1] (save path) and argv[2] (mode) are read below,
  // so at least three arguments are required.
  if (argc < 3) {
    // Fake success for CI purposes.
    return EXIT_SUCCESS;
  }

  std::string base_save_path = argv[1];
  int mode = std::atoi(argv[2]);
  arrow::Status status;
  // ensure arrow::dataset node factories are in the registry
  arrow::dataset::internal::Initialize();
  // execution context
  cp::ExecContext exec_context;
  switch (mode) {
    case SOURCE_SINK:
      PrintBlock("Source Sink Example");
      status = SourceSinkExample(exec_context);
      break;
    case TABLE_SOURCE_SINK:
      PrintBlock("Table Source Sink Example");
      status = TableSourceSinkExample(exec_context);
      break;
    case SCAN:
      PrintBlock("Scan Example");
      status = ScanSinkExample(exec_context);
      break;
    case FILTER:
      PrintBlock("Filter Example");
      status = ScanFilterSinkExample(exec_context);
      break;
    case PROJECT:
      PrintBlock("Project Example");
      status = ScanProjectSinkExample(exec_context);
      break;
    case GROUP_AGGREGATION:
      PrintBlock("Aggregate Example");
      status = SourceGroupAggregateSinkExample(exec_context);
      break;
    case SCALAR_AGGREGATION:
      PrintBlock("Aggregate Example");
      status = SourceScalarAggregateSinkExample(exec_context);
      break;
    case CONSUMING_SINK:
      PrintBlock("Consuming-Sink Example");
      status = SourceConsumingSinkExample(exec_context);
      break;
    case ORDER_BY_SINK:
      PrintBlock("OrderBy Example");
      status = SourceOrderBySinkExample(exec_context);
      break;
    case HASHJOIN:
      PrintBlock("HashJoin Example");
      status = SourceHashJoinSinkExample(exec_context);
      break;
    case KSELECT:
      PrintBlock("KSelect Example");
      status = SourceKSelectExample(exec_context);
      break;
    case WRITE:
      PrintBlock("Write Example");
      status = ScanFilterWriteExample(exec_context, base_save_path);
      break;
    case UNION:
      PrintBlock("Union Example");
      status = SourceUnionSinkExample(exec_context);
      break;
    case TABLE_SOURCE_TABLE_SINK:
      PrintBlock("TableSink Example");
      status = TableSinkExample(exec_context);
      break;
    default:
      break;
  }

  if (status.ok()) {
    return EXIT_SUCCESS;
  } else {
    std::cout << "Error occurred: " << status.message() << std::endl;
    return EXIT_FAILURE;
  }
}
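
The `main` function above reads the mode with `std::atoi`, which returns 0 for non-numeric input and would therefore silently select `SOURCE_SINK`. As a minimal sketch that is not part of the Arrow example, a stricter parser might look like the following. The names `ParseExampleMode` and `kMaxMode` are hypothetical and introduced here only for illustration; the upper bound 13 is taken from `TABLE_SOURCE_TABLE_SINK` in the enum. The sketch assumes a C++17 compiler.

```cpp
#include <charconv>
#include <cstring>
#include <optional>
#include <system_error>

// Hypothetical helper (not in the Arrow example): the largest valid mode,
// mirroring TABLE_SOURCE_TABLE_SINK = 13 from the ExampleMode enum.
constexpr int kMaxMode = 13;

// Parse the example mode from argv[2]. Returns std::nullopt when the
// argument is missing, not a plain decimal integer, or out of range.
std::optional<int> ParseExampleMode(int argc, const char* const* argv) {
  if (argc < 3) return std::nullopt;  // need <base_save_path> and <mode>
  const char* first = argv[2];
  const char* last = first + std::strlen(first);
  int mode = 0;
  auto [ptr, ec] = std::from_chars(first, last, mode);
  // Require a successful parse that consumed the whole argument.
  if (ec != std::errc() || ptr != last) return std::nullopt;
  if (mode < 0 || mode > kMaxMode) return std::nullopt;
  return mode;
}
```

In `main`, the result could take the place of the `std::atoi` call, with an empty result reported as a usage error instead of falling through to the first example.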