Streaming execution engine
Warning
The streaming execution engine is experimental, and a stable API is not yet guaranteed.
Motivation
For many complex computations, successive direct invocation of compute functions is not feasible in either memory or computation time. Doing so causes all intermediate data to be fully materialized. To facilitate arbitrarily large inputs and more efficient resource usage, Arrow also provides a streaming query engine with which computations can be formulated and executed.
ExecNode is provided to reify the graph of operations in a query. Batches of data (ExecBatch) flow along edges of the graph from node to node. Structuring the API around streams of batches allows the working set for each node to be tuned for optimal performance independent of any other nodes in the graph. Each ExecNode processes batches as they are pushed to it along an edge of the graph by upstream nodes (its inputs), and pushes batches along an edge of the graph to downstream nodes (its outputs) as they are finalized.
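The push model described above can be sketched in plain standard C++. This is a toy model, not Arrow's API: ToyBatch, ToyNode, and RunToyPipeline are hypothetical names invented for illustration. Each node transforms one batch at a time and pushes the result to its output, so no intermediate column is ever materialized in full.

```cpp
#include <cassert>
#include <functional>
#include <utility>
#include <vector>

// Hypothetical stand-in for a batch: a small chunk of integer values.
using ToyBatch = std::vector<int>;

// A toy node transforms each batch it receives and pushes the result downstream.
struct ToyNode {
  std::function<ToyBatch(ToyBatch)> transform;
  ToyNode* output = nullptr;        // downstream node, if any
  std::vector<ToyBatch> collected;  // used only by the last node in the chain

  void InputReceived(ToyBatch batch) {
    assert(transform != nullptr);
    ToyBatch result = transform(std::move(batch));
    if (output != nullptr) {
      output->InputReceived(std::move(result));
    } else {
      collected.push_back(std::move(result));
    }
  }
};

// Wire up "keep values > 3" followed by "add 1", then push the batches through.
inline std::vector<ToyBatch> RunToyPipeline(const std::vector<ToyBatch>& input) {
  ToyNode add_one{[](ToyBatch b) {
    for (int& v : b) v += 1;
    return b;
  }};
  ToyNode keep_gt3{[](ToyBatch b) {
    ToyBatch out;
    for (int v : b) {
      if (v > 3) out.push_back(v);
    }
    return out;
  }};
  keep_gt3.output = &add_one;
  for (const ToyBatch& batch : input) keep_gt3.InputReceived(batch);
  return add_one.collected;
}
```

Calling RunToyPipeline with two batches yields two output batches; only one batch is in flight between the nodes at any time.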
Overview
ExecNode
Each node in the graph is an implementation of the ExecNode interface.
ExecPlan
A set of ExecNode is contained and (to an extent) coordinated by an ExecPlan.
ExecFactoryRegistry
Instances of ExecNode are constructed by factory functions held in an ExecFactoryRegistry.
ExecNodeOptions
Heterogeneous parameters for factories of ExecNode are bundled in an ExecNodeOptions.
Declaration
dplyr-inspired helper for efficient construction of an ExecPlan.
ExecBatch
A lightweight container for a single chunk of data in the Arrow format. In contrast to RecordBatch, ExecBatch is intended for use exclusively in a streaming execution context (for example, it doesn’t have a corresponding Python binding). Furthermore, columns which happen to have a constant value may be represented by a Scalar instead of an Array. In addition, ExecBatch may carry execution-relevant properties including a guaranteed-true-filter for Expression simplification.
An example ExecNode implementation which simply passes all input batches through unchanged:
class PassthruNode : public ExecNode {
 public:
  // InputReceived is the main entry point for ExecNodes. It is invoked
  // by an input of this node to push a batch here for processing.
  void InputReceived(ExecNode* input, ExecBatch batch) override {
    // Since this is a passthru node we simply push the batch to our
    // only output here.
    outputs_[0]->InputReceived(this, batch);
  }

  // ErrorReceived is called by an input of this node to report an error.
  // ExecNodes should always forward errors to their outputs unless they
  // are able to fully handle the error (this is rare).
  void ErrorReceived(ExecNode* input, Status error) override {
    outputs_[0]->ErrorReceived(this, error);
  }

  // InputFinished is used to signal how many batches will ultimately arrive.
  // It may be called with any ordering relative to InputReceived/ErrorReceived.
  void InputFinished(ExecNode* input, int total_batches) override {
    outputs_[0]->InputFinished(this, total_batches);
  }

  // ExecNodes may request that their inputs throttle production of batches
  // until they are ready for more, or stop production if no further batches
  // are required. These signals should typically be forwarded to the inputs
  // of the ExecNode.
  void ResumeProducing(ExecNode* output) override { inputs_[0]->ResumeProducing(this); }
  void PauseProducing(ExecNode* output) override { inputs_[0]->PauseProducing(this); }
  void StopProducing(ExecNode* output) override { inputs_[0]->StopProducing(this); }

  // An ExecNode has a single output schema to which all its batches conform.
  using ExecNode::output_schema;

  // ExecNodes carry basic introspection for debugging purposes
  const char* kind_name() const override { return "PassthruNode"; }
  using ExecNode::label;
  using ExecNode::SetLabel;
  using ExecNode::ToString;

  // An ExecNode holds references to its inputs and outputs, so it is possible
  // to walk the graph of execution if necessary.
  using ExecNode::inputs;
  using ExecNode::outputs;

  // StartProducing() and StopProducing() are invoked by an ExecPlan to
  // coordinate the graph-wide execution state. These do not need to be
  // forwarded to inputs or outputs.
  Status StartProducing() override { return Status::OK(); }
  void StopProducing() override {}
  Future<> finished() override { return inputs_[0]->finished(); }
};
Note that each method which is associated with an edge of the graph must be invoked with an ExecNode* to identify the node which invoked it. For example, in an ExecNode which implements JOIN this tagging might be used to differentiate between batches from the left or right inputs. InputReceived, ErrorReceived, InputFinished may only be invoked by the inputs of a node, while ResumeProducing, PauseProducing, StopProducing may only be invoked by outputs of a node.
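As a rough illustration of why the caller identifies itself, the following standalone sketch models a node with two inputs. It is not Arrow code: ToyInput and ToyTwoInputNode are hypothetical names, and batches are reduced to row counts. The node uses the input pointer to decide which side a batch belongs to, and it tolerates InputFinished arriving before or after the batches it describes.

```cpp
#include <cassert>

// Hypothetical placeholder for an upstream node; only its address matters here.
struct ToyInput {};

class ToyTwoInputNode {
 public:
  ToyTwoInputNode(const ToyInput* left, const ToyInput* right)
      : left_(left), right_(right) {
    assert(left != right);
  }

  // The caller passes itself, so the node can tell left batches from right ones.
  void InputReceived(const ToyInput* input, int num_rows) {
    SideState& side = StateFor(input);
    side.rows += num_rows;
    side.batches_seen += 1;
  }

  // May be called before, between, or after the InputReceived calls it describes.
  void InputFinished(const ToyInput* input, int total_batches) {
    StateFor(input).total_batches = total_batches;
  }

  bool finished() const { return Done(left_state_) && Done(right_state_); }
  int left_rows() const { return left_state_.rows; }
  int right_rows() const { return right_state_.rows; }

 private:
  struct SideState {
    int rows = 0;
    int batches_seen = 0;
    int total_batches = -1;  // unknown until InputFinished arrives
  };

  SideState& StateFor(const ToyInput* input) {
    assert(input == left_ || input == right_);
    return input == left_ ? left_state_ : right_state_;
  }

  static bool Done(const SideState& side) {
    return side.total_batches >= 0 && side.batches_seen == side.total_batches;
  }

  const ToyInput* left_;
  const ToyInput* right_;
  SideState left_state_;
  SideState right_state_;
};
```

Because completion is derived from comparing the announced total with the number of batches seen, the node reaches the same final state whichever order the calls arrive in.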
ExecPlan contains the associated instances of ExecNode and is used to start and stop execution of all nodes and for querying/awaiting their completion:
// construct an ExecPlan first to hold your nodes
ARROW_ASSIGN_OR_RAISE(auto plan, ExecPlan::Make(default_exec_context()));

// ... add nodes to your ExecPlan

// start all nodes in the graph
ARROW_RETURN_NOT_OK(plan->StartProducing());

SetUserCancellationCallback([plan] {
  // stop all nodes in the graph
  plan->StopProducing();
});

// Complete will be marked finished when all nodes have run to completion
// or acknowledged a StopProducing() signal. The ExecPlan should be kept
// alive until this future is marked finished.
Future<> complete = plan->finished();
Constructing ExecPlan objects
Warning
The following will be superseded by construction from Compute IR, see ARROW-14074.
None of the concrete implementations of ExecNode are exposed in headers, so they can’t be constructed directly outside the translation unit where they are defined. Instead, factories to create them are provided in an extensible registry. This structure provides a number of benefits:
- This enforces consistent construction.
- It decouples implementations from consumers of the interface (for example: we have two classes for scalar and grouped aggregate, and we can choose which to construct within the single factory by checking whether grouping keys are provided).
- This expedites integration with out-of-library extensions. For example “scan” nodes are implemented in the separate libarrow_dataset.so library.
- Since the class is not referenceable outside the translation unit in which it is defined, compilers can optimize more aggressively.
Factories of ExecNode can be retrieved by name from the registry. The default registry is available through arrow::compute::default_exec_factory_registry() and can be queried for the built-in factories:
// get the factory for "filter" nodes:
ARROW_ASSIGN_OR_RAISE(auto make_filter,
                      default_exec_factory_registry()->GetFactory("filter"));

// factories take three arguments and return a Result<ExecNode*>:
ARROW_ASSIGN_OR_RAISE(ExecNode* filter_node, make_filter(
    // the ExecPlan which should own this node
    plan.get(),
    // nodes which will send batches to this node (inputs)
    {scan_node},
    // parameters unique to "filter" nodes
    FilterNodeOptions{filter_expression}));

// alternative shorthand:
ARROW_ASSIGN_OR_RAISE(filter_node, MakeExecNode("filter",
    plan.get(), {scan_node}, FilterNodeOptions{filter_expression}));
Factories can also be added to the default registry as long as they are convertible to std::function<Result<ExecNode*>(ExecPlan*, std::vector<ExecNode*>, const ExecNodeOptions&)>.
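The registry pattern described in this section can be modelled in a few lines of standard C++. The sketch below is only an analogy under simplifying assumptions: ToyNode, ToyOptions, ToyFactoryRegistry, and RegisterToyFilter are hypothetical names, and the factory signature is far simpler than Arrow's. It shows the concrete node class hidden inside the function that registers its factory, so consumers can only reach it by name through the registry.

```cpp
#include <cassert>
#include <functional>
#include <map>
#include <memory>
#include <string>
#include <utility>

// Hypothetical public interface; concrete node types are never exposed.
struct ToyNode {
  virtual ~ToyNode() = default;
  virtual std::string kind_name() const = 0;
};

// Hypothetical options bundle passed to every factory.
struct ToyOptions {
  int threshold = 0;
};

using ToyFactory = std::function<std::unique_ptr<ToyNode>(const ToyOptions&)>;

class ToyFactoryRegistry {
 public:
  // Returns false if a factory with this name is already registered.
  bool AddFactory(std::string name, ToyFactory factory) {
    assert(factory != nullptr);
    return factories_.emplace(std::move(name), std::move(factory)).second;
  }

  // Returns nullptr if no factory is registered under this name.
  const ToyFactory* GetFactory(const std::string& name) const {
    auto it = factories_.find(name);
    return it == factories_.end() ? nullptr : &it->second;
  }

 private:
  std::map<std::string, ToyFactory> factories_;
};

// The concrete class is local to this function, so nothing else can name it.
inline bool RegisterToyFilter(ToyFactoryRegistry& registry) {
  struct ToyFilterNode : ToyNode {
    explicit ToyFilterNode(int threshold_value) : threshold(threshold_value) {}
    std::string kind_name() const override { return "ToyFilterNode"; }
    int threshold;
  };
  return registry.AddFactory("filter", [](const ToyOptions& options) {
    return std::unique_ptr<ToyNode>(new ToyFilterNode(options.threshold));
  });
}
```

A consumer looks the factory up by name, checks the returned pointer, and calls it with an options object; it never needs the definition of the concrete node class.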
To build an ExecPlan representing a simple pipeline which reads from a RecordBatchReader then filters, projects, and writes to disk:
std::shared_ptr<RecordBatchReader> reader = GetStreamOfBatches();
ExecNode* source_node = *MakeExecNode("source", plan.get(), {},
                                      SourceNodeOptions::FromReader(
                                          reader,
                                          GetCpuThreadPool()));

ExecNode* filter_node = *MakeExecNode("filter", plan.get(), {source_node},
                                      FilterNodeOptions{
                                          greater(field_ref("score"), literal(3))
                                      });

ExecNode* project_node = *MakeExecNode("project", plan.get(), {filter_node},
                                       ProjectNodeOptions{
                                           {add(field_ref("score"), literal(1))},
                                           {"score + 1"}
                                       });

arrow::dataset::internal::Initialize();
MakeExecNode("write", plan.get(), {project_node},
             WriteNodeOptions{/*base_dir=*/"/dat", /*...*/});
Declaration is a dplyr-inspired helper which further decreases the boilerplate associated with populating an ExecPlan from C++:
arrow::dataset::internal::Initialize();

std::shared_ptr<RecordBatchReader> reader = GetStreamOfBatches();
ASSERT_OK(Declaration::Sequence(
              {
                  {"source", SourceNodeOptions::FromReader(
                       reader,
                       GetCpuThreadPool())},
                  {"filter", FilterNodeOptions{
                       greater(field_ref("score"), literal(3))}},
                  {"project", ProjectNodeOptions{
                       {add(field_ref("score"), literal(1))},
                       {"score + 1"}}},
                  {"write", WriteNodeOptions{/*base_dir=*/"/dat", /*...*/}},
              })
              .AddToPlan(plan.get()));
Note that a source node can wrap anything which resembles a stream of batches. For example, PR#11032 adds support for use of a DuckDB query as a source node. Similarly, a sink node can wrap anything which absorbs a stream of batches. In the example above we’re writing completed batches to disk. However we can also collect these in memory into a Table or forward them to a RecordBatchReader as an out-of-graph stream. This flexibility allows an ExecPlan to be used as streaming middleware between any endpoints which support Arrow formatted batches.
An arrow::dataset::Dataset can also be wrapped as a source node which pushes all the dataset’s batches into an ExecPlan. This factory is added to the default registry with the name "scan" by calling arrow::dataset::internal::Initialize():
arrow::dataset::internal::Initialize();

std::shared_ptr<Dataset> dataset = GetDataset();

ASSERT_OK(Declaration::Sequence(
              {
                  {"scan", ScanNodeOptions{dataset,
                     /* push down predicate, projection, ... */}},
                  {"filter", FilterNodeOptions{/* ... */}},
                  // ...
              })
              .AddToPlan(plan.get()));
Datasets may be scanned multiple times; just make multiple scan nodes from that dataset. (Useful for a self-join, for example.) Note that producing two scan nodes like this will perform all reads and decodes twice.
Constructing ExecNode using Options
ExecNode is the building block of the execution engine; each built-in operation is provided as a node that is configured through an options class. The operations that can be used in an execution plan, and their options classes, are:
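Operation | Options |
---|---|
source | arrow::compute::SourceNodeOptions |
table_source | arrow::compute::TableSourceNodeOptions |
filter | arrow::compute::FilterNodeOptions |
project | arrow::compute::ProjectNodeOptions |
aggregate | arrow::compute::AggregateNodeOptions |
sink | arrow::compute::SinkNodeOptions |
consuming_sink | arrow::compute::ConsumingSinkNodeOptions |
order_by_sink | arrow::compute::OrderBySinkNodeOptions |
select_k_sink | arrow::compute::SelectKSinkNodeOptions |
scan | arrow::dataset::ScanNodeOptions |
hash_join | arrow::compute::HashJoinNodeOptions |
write | arrow::dataset::WriteNodeOptions |
union | N/A |
table_sink | arrow::compute::TableSinkNodeOptions |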
source
A source operation can be considered as an entry point to create a streaming execution plan. arrow::compute::SourceNodeOptions are used to create the source operation. The source operation is the most generic and flexible type of source currently available but it can be quite tricky to configure. To process data from files the scan operation is likely a simpler choice.
The source node requires some kind of function that can be called to poll for more data. This function should take no arguments and should return an arrow::Future<arrow::util::optional<arrow::compute::ExecBatch>>. This function might be reading a file, iterating through an in-memory structure, or receiving data from a network connection. The Arrow library refers to these functions as arrow::AsyncGenerator and there are a number of utilities for working with these functions. For this example we use a vector of batches that we’ve already stored in memory.
In addition, the schema of the data must be known up front. Arrow’s streaming execution
engine must know the schema of the data at each stage of the execution graph before any
processing has begun. This means we must supply the schema for a source node separately
from the data itself.
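To make the polling contract concrete, here is a simplified, synchronous model in standard C++17. It is a sketch under stated assumptions, not Arrow's implementation: ToyBatch, ToyGenerator, MakeToyVectorGenerator, and DrainAndCountRows are hypothetical names, and the generator returns a std::optional directly, whereas an arrow::AsyncGenerator returns a future of an optional. The shape is the same: a no-argument function that yields the next batch on each call and an empty optional once the data is exhausted.

```cpp
#include <cassert>
#include <cstddef>
#include <functional>
#include <memory>
#include <optional>
#include <utility>
#include <vector>

// Hypothetical stand-in for a batch of data.
using ToyBatch = std::vector<int>;

// A generator is a no-argument function; an empty optional means "no more data".
using ToyGenerator = std::function<std::optional<ToyBatch>()>;

// Wrap batches that are already in memory as a generator.
inline ToyGenerator MakeToyVectorGenerator(std::vector<ToyBatch> batches) {
  auto state = std::make_shared<std::vector<ToyBatch>>(std::move(batches));
  auto index = std::make_shared<std::size_t>(0);
  return [state, index]() -> std::optional<ToyBatch> {
    if (*index >= state->size()) return std::nullopt;
    return (*state)[(*index)++];
  };
}

// Poll the generator until it is exhausted, counting the rows it produced.
inline std::size_t DrainAndCountRows(const ToyGenerator& gen) {
  assert(gen != nullptr);
  std::size_t rows = 0;
  while (std::optional<ToyBatch> batch = gen()) {
    rows += batch->size();
  }
  return rows;
}
```

The generator keeps its position in shared state, so copies of the std::function continue from the same point, and polling after exhaustion keeps returning an empty optional.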
Here we define a struct to hold the data generator definition. This includes in-memory batches, schema and a function that serves as a data generator:
struct BatchesWithSchema {
  std::vector<cp::ExecBatch> batches;
  std::shared_ptr<arrow::Schema> schema;
  // This method uses internal arrow utilities to
  // convert a vector of record batches to an AsyncGenerator of optional batches
  arrow::AsyncGenerator<arrow::util::optional<cp::ExecBatch>> gen() const {
    auto opt_batches = ::arrow::internal::MapVector(
        [](cp::ExecBatch batch) { return arrow::util::make_optional(std::move(batch)); },
        batches);
    arrow::AsyncGenerator<arrow::util::optional<cp::ExecBatch>> gen;
    gen = arrow::MakeVectorGenerator(std::move(opt_batches));
    return gen;
  }
};
Generating sample batches for computation:
arrow::Result<BatchesWithSchema> MakeBasicBatches() {
  BatchesWithSchema out;
  auto field_vector = {arrow::field("a", arrow::int32()),
                       arrow::field("b", arrow::boolean())};
  ARROW_ASSIGN_OR_RAISE(auto b1_int, GetArrayDataSample<arrow::Int32Type>({0, 4}));
  ARROW_ASSIGN_OR_RAISE(auto b2_int, GetArrayDataSample<arrow::Int32Type>({5, 6, 7}));
  ARROW_ASSIGN_OR_RAISE(auto b3_int, GetArrayDataSample<arrow::Int32Type>({8, 9, 10}));

  ARROW_ASSIGN_OR_RAISE(auto b1_bool,
                        GetArrayDataSample<arrow::BooleanType>({false, true}));
  ARROW_ASSIGN_OR_RAISE(auto b2_bool,
                        GetArrayDataSample<arrow::BooleanType>({true, false, true}));
  ARROW_ASSIGN_OR_RAISE(auto b3_bool,
                        GetArrayDataSample<arrow::BooleanType>({false, true, false}));

  ARROW_ASSIGN_OR_RAISE(auto b1,
                        GetExecBatchFromVectors(field_vector, {b1_int, b1_bool}));
  ARROW_ASSIGN_OR_RAISE(auto b2,
                        GetExecBatchFromVectors(field_vector, {b2_int, b2_bool}));
  ARROW_ASSIGN_OR_RAISE(auto b3,
                        GetExecBatchFromVectors(field_vector, {b3_int, b3_bool}));

  out.batches = {b1, b2, b3};
  out.schema = arrow::schema(field_vector);
  return out;
}
Example of using source (usage of sink is explained in detail in sink):
/// \brief An example demonstrating a source and sink node
/// \param exec_context The execution context to run the plan in
///
/// Source-Sink Example
/// This example shows how a source and sink can be used
/// in an execution plan. This includes source node receiving data
/// and the sink node emits the data as an output represented in
/// a table.
arrow::Status SourceSinkExample(cp::ExecContext& exec_context) {
  ARROW_ASSIGN_OR_RAISE(std::shared_ptr<cp::ExecPlan> plan,
                        cp::ExecPlan::Make(&exec_context));

  ARROW_ASSIGN_OR_RAISE(auto basic_data, MakeBasicBatches());

  arrow::AsyncGenerator<arrow::util::optional<cp::ExecBatch>> sink_gen;

  auto source_node_options = cp::SourceNodeOptions{basic_data.schema, basic_data.gen()};

  ARROW_ASSIGN_OR_RAISE(cp::ExecNode* source,
                        cp::MakeExecNode("source", plan.get(), {}, source_node_options));

  ARROW_RETURN_NOT_OK(
      cp::MakeExecNode("sink", plan.get(), {source}, cp::SinkNodeOptions{&sink_gen}));

  return ExecutePlanAndCollectAsTable(exec_context, plan, basic_data.schema, sink_gen);
}
table_source
In the previous example, a source node was used to input the data. But when developing an application, if the data is already in memory as a table, it is much easier, and more performant, to use arrow::compute::TableSourceNodeOptions. Here the input data can be passed as a std::shared_ptr<arrow::Table> along with a max_batch_size. The max_batch_size is used to break up large record batches so that they can be processed in parallel. It is important to note that the table batches will not get merged to form larger batches when the source table has a smaller batch size.
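The effect of max_batch_size can be illustrated with a small standalone function. This is a sketch of the rule as stated above, assuming each stored chunk is cut into consecutive slices of at most max_batch_size rows; SliceLengths is a hypothetical helper, not an Arrow function, and the exact slicing strategy inside Arrow may differ. Chunks larger than the limit are split, while chunks smaller than the limit pass through as they are and are never merged.

```cpp
#include <algorithm>
#include <cassert>
#include <cstdint>
#include <vector>

// Given the row counts of a table's stored chunks, return the row counts of the
// batches that would be emitted when each chunk is sliced independently.
inline std::vector<std::int64_t> SliceLengths(
    const std::vector<std::int64_t>& chunk_lengths, std::int64_t max_batch_size) {
  assert(max_batch_size > 0);
  std::vector<std::int64_t> out;
  for (std::int64_t length : chunk_lengths) {
    // Split this chunk only; leftovers are not combined with the next chunk.
    for (std::int64_t offset = 0; offset < length; offset += max_batch_size) {
      out.push_back(std::min(max_batch_size, length - offset));
    }
  }
  return out;
}
```

For chunks of 5, 1, and 2 rows with max_batch_size set to 2, the emitted batch sizes are 2, 2, 1, 1, 2: the trailing single row of the first chunk is not merged with the one-row chunk that follows it.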
Example of using table_source:
/// \brief An example showing a table source node
/// \param exec_context The execution context to run the plan in
///
/// TableSource-Sink Example
/// This example shows how a table_source and sink can be used
/// in an execution plan. This includes a table source node
/// receiving data from a table and the sink node emits
/// the data to a generator which we collect into a table.
arrow::Status TableSourceSinkExample(cp::ExecContext& exec_context) {
  ARROW_ASSIGN_OR_RAISE(std::shared_ptr<cp::ExecPlan> plan,
                        cp::ExecPlan::Make(&exec_context));

  ARROW_ASSIGN_OR_RAISE(auto table, GetTable());

  arrow::AsyncGenerator<arrow::util::optional<cp::ExecBatch>> sink_gen;
  int max_batch_size = 2;
  auto table_source_options = cp::TableSourceNodeOptions{table, max_batch_size};

  ARROW_ASSIGN_OR_RAISE(
      cp::ExecNode* source,
      cp::MakeExecNode("table_source", plan.get(), {}, table_source_options));

  ARROW_RETURN_NOT_OK(
      cp::MakeExecNode("sink", plan.get(), {source}, cp::SinkNodeOptions{&sink_gen}));

  return ExecutePlanAndCollectAsTable(exec_context, plan, table->schema(), sink_gen);
}
filter
The filter operation, as the name suggests, provides an option to define data filtering criteria. It selects rows matching a given expression. Filters can be written using arrow::compute::Expression. For example, if we wish to keep rows where the value of column a is greater than 3, then we can use the expression shown in the following example.
Filter example:
/// \brief An example showing a filter node
/// \param exec_context The execution context to run the plan in
///
/// Scan-Filter-Sink
/// This example shows how a filter can be used in an execution plan,
/// along with the scan and sink operations. The output from the
/// execution plan is obtained as a table via the sink node.
arrow::Status ScanFilterSinkExample(cp::ExecContext& exec_context) {
  ARROW_ASSIGN_OR_RAISE(std::shared_ptr<cp::ExecPlan> plan,
                        cp::ExecPlan::Make(&exec_context));

  ARROW_ASSIGN_OR_RAISE(std::shared_ptr<arrow::dataset::Dataset> dataset, GetDataset());

  auto options = std::make_shared<arrow::dataset::ScanOptions>();
  // specify the filter. This filter keeps only the rows where the
  // value of the "a" column is greater than 3.
  cp::Expression filter_opt = cp::greater(cp::field_ref("a"), cp::literal(3));
  // set filter for scanner : on-disk / push-down filtering.
  // This step can be skipped if you are not reading from disk.
  options->filter = filter_opt;
  // empty projection
  options->projection = cp::project({}, {});

  // construct the scan node
  std::cout << "Initialized Scanning Options" << std::endl;

  cp::ExecNode* scan;

  auto scan_node_options = arrow::dataset::ScanNodeOptions{dataset, options};
  std::cout << "Scan node options created" << std::endl;

  ARROW_ASSIGN_OR_RAISE(scan,
                        cp::MakeExecNode("scan", plan.get(), {}, scan_node_options));

  // pipe the scan node into the filter node
  // Need to set the filter in scan node options and filter node options.
  // At scan node it is used for on-disk / push-down filtering.
  // At filter node it is used for in-memory filtering.
  cp::ExecNode* filter;
  ARROW_ASSIGN_OR_RAISE(filter, cp::MakeExecNode("filter", plan.get(), {scan},
                                                 cp::FilterNodeOptions{filter_opt}));

  // finally, pipe the filter node into a sink node
  arrow::AsyncGenerator<arrow::util::optional<cp::ExecBatch>> sink_gen;
  ARROW_RETURN_NOT_OK(
      cp::MakeExecNode("sink", plan.get(), {filter}, cp::SinkNodeOptions{&sink_gen}));

  return ExecutePlanAndCollectAsTable(exec_context, plan, dataset->schema(), sink_gen);
}
project
The project operation rearranges, deletes, transforms, and creates columns. Each output column is computed by evaluating an expression against the source record batch. This is exposed via arrow::compute::ProjectNodeOptions, which requires an arrow::compute::Expression and a name for each of the output columns (if names are not provided, the string representations of the expressions will be used).
Project example:
/// \brief An example showing a project node
/// \param exec_context The execution context to run the plan in
///
/// Scan-Project-Sink
/// This example shows how Scan operation can be used to load the data
/// into the execution plan, how project operation can be applied on the
/// data stream and how the output is obtained as a table via the sink node.
arrow::Status ScanProjectSinkExample(cp::ExecContext& exec_context) {
  ARROW_ASSIGN_OR_RAISE(std::shared_ptr<cp::ExecPlan> plan,
                        cp::ExecPlan::Make(&exec_context));

  ARROW_ASSIGN_OR_RAISE(std::shared_ptr<arrow::dataset::Dataset> dataset, GetDataset());

  auto options = std::make_shared<arrow::dataset::ScanOptions>();
  // projection
  cp::Expression a_times_2 = cp::call("multiply", {cp::field_ref("a"), cp::literal(2)});
  options->projection = cp::project({}, {});

  cp::ExecNode* scan;

  auto scan_node_options = arrow::dataset::ScanNodeOptions{dataset, options};

  ARROW_ASSIGN_OR_RAISE(scan,
                        cp::MakeExecNode("scan", plan.get(), {}, scan_node_options));

  cp::ExecNode* project;
  ARROW_ASSIGN_OR_RAISE(project, cp::MakeExecNode("project", plan.get(), {scan},
                                                  cp::ProjectNodeOptions{{a_times_2}}));
  // schema after projection => multiply(a, 2): int64
  std::cout << "Schema after projection : \n"
            << project->output_schema()->ToString() << std::endl;

  arrow::AsyncGenerator<arrow::util::optional<cp::ExecBatch>> sink_gen;
  ARROW_RETURN_NOT_OK(
      cp::MakeExecNode("sink", plan.get(), {project}, cp::SinkNodeOptions{&sink_gen}));
  auto schema = arrow::schema({arrow::field("a * 2", arrow::int32())});

  return ExecutePlanAndCollectAsTable(exec_context, plan, schema, sink_gen);
}
aggregate
The aggregate node computes various types of aggregates over data.
Arrow supports two types of aggregates: “scalar” aggregates, and “hash” aggregates. Scalar aggregates reduce an array or scalar input to a single scalar output (e.g. computing the mean of a column). Hash aggregates act like GROUP BY in SQL and first partition data based on one or more key columns, then reduce the data in each partition. The aggregate node supports both types of computation, and can compute any number of aggregations at once.
arrow::compute::AggregateNodeOptions is used to define the aggregation criteria. It takes a list of aggregation functions and their options; a list of target fields to aggregate, one per function; and a list of names for the output fields, one per function. Optionally, it takes a list of columns that are used to partition the data, in the case of a hash aggregation. The aggregation functions can be selected from the list of available aggregation functions.
Note
This node is a “pipeline breaker” and will fully materialize the dataset in memory. In the future, spillover mechanisms will be added which should alleviate this constraint.
The aggregation can provide results as a group or as a scalar. For instance, an operation like hash_count provides a count for each unique key as a grouped result, while an operation like sum provides a single record.
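The difference between the two result shapes can be sketched without Arrow at all. The following standalone C++ is a toy model: ToyRow, ToySampleRows, ScalarSum, and GroupedCount are hypothetical names, and the rows mirror the sample values used in this document (column a as int, column b as bool). A scalar aggregate collapses the whole input into one value, while a grouped aggregate yields one value per distinct key.

```cpp
#include <map>
#include <vector>

// Hypothetical row type mirroring the sample columns "a" and "b".
struct ToyRow {
  int a;
  bool b;
};

// The same values as the three sample batches, flattened into rows.
inline std::vector<ToyRow> ToySampleRows() {
  return {{0, false}, {4, true},  {5, true}, {6, false},
          {7, true},  {8, false}, {9, true}, {10, false}};
}

// Scalar aggregate: one output value for the entire input (like "sum").
inline long long ScalarSum(const std::vector<ToyRow>& rows) {
  long long total = 0;
  for (const ToyRow& row : rows) total += row.a;
  return total;
}

// Grouped aggregate: one output value per distinct key (like "hash_count" keyed on b).
inline std::map<bool, int> GroupedCount(const std::vector<ToyRow>& rows) {
  std::map<bool, int> counts;
  for (const ToyRow& row : rows) counts[row.b] += 1;
  return counts;
}
```

Both functions must see every row before they can report a result, which is the reason an aggregate node is described as a pipeline breaker.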
Scalar Aggregation example:
/// \brief An example showing an aggregation node to aggregate an entire table
/// \param exec_context The execution context to run the plan in
///
/// Source-Aggregation-Sink
/// This example shows how an aggregation operation can be applied on an
/// execution plan resulting in a scalar output. The source node loads the
/// data and the aggregation (summing the values in column 'a')
/// is applied on this data. The output is obtained from the sink node as a table.
arrow::Status SourceScalarAggregateSinkExample(cp::ExecContext& exec_context) {
  ARROW_ASSIGN_OR_RAISE(std::shared_ptr<cp::ExecPlan> plan,
                        cp::ExecPlan::Make(&exec_context));

  ARROW_ASSIGN_OR_RAISE(auto basic_data, MakeBasicBatches());

  arrow::AsyncGenerator<arrow::util::optional<cp::ExecBatch>> sink_gen;

  auto source_node_options = cp::SourceNodeOptions{basic_data.schema, basic_data.gen()};

  ARROW_ASSIGN_OR_RAISE(cp::ExecNode* source,
                        cp::MakeExecNode("source", plan.get(), {}, source_node_options));
  auto aggregate_options = cp::AggregateNodeOptions{/*aggregates=*/{{"sum", nullptr}},
                                                    /*targets=*/{"a"},
                                                    /*names=*/{"sum(a)"}};
  ARROW_ASSIGN_OR_RAISE(
      cp::ExecNode* aggregate,
      cp::MakeExecNode("aggregate", plan.get(), {source}, aggregate_options));

  ARROW_RETURN_NOT_OK(
      cp::MakeExecNode("sink", plan.get(), {aggregate}, cp::SinkNodeOptions{&sink_gen}));
  auto schema = arrow::schema({arrow::field("sum(a)", arrow::int32())});

  return ExecutePlanAndCollectAsTable(exec_context, plan, schema, sink_gen);
}
Group Aggregation example:
/// \brief An example showing an aggregation node to perform a group-by operation
/// \param exec_context The execution context to run the plan in
///
/// Source-Aggregation-Sink
/// This example shows how an aggregation operation can be applied on an
/// execution plan resulting in a grouped output. The source node loads the
/// data and the aggregation (counting the values in column 'a' for each
/// group in column 'b') is applied on this data. The output is obtained
/// from the sink node as a table.
arrow::Status SourceGroupAggregateSinkExample(cp::ExecContext& exec_context) {
  ARROW_ASSIGN_OR_RAISE(std::shared_ptr<cp::ExecPlan> plan,
                        cp::ExecPlan::Make(&exec_context));

  ARROW_ASSIGN_OR_RAISE(auto basic_data, MakeBasicBatches());

  arrow::AsyncGenerator<arrow::util::optional<cp::ExecBatch>> sink_gen;

  auto source_node_options = cp::SourceNodeOptions{basic_data.schema, basic_data.gen()};

  ARROW_ASSIGN_OR_RAISE(cp::ExecNode* source,
                        cp::MakeExecNode("source", plan.get(), {}, source_node_options));
  cp::CountOptions options(cp::CountOptions::ONLY_VALID);
  auto aggregate_options =
      cp::AggregateNodeOptions{/*aggregates=*/{{"hash_count", &options}},
                               /*targets=*/{"a"},
                               /*names=*/{"count(a)"},
                               /*keys=*/{"b"}};
  ARROW_ASSIGN_OR_RAISE(
      cp::ExecNode* aggregate,
      cp::MakeExecNode("aggregate", plan.get(), {source}, aggregate_options));

  ARROW_RETURN_NOT_OK(
      cp::MakeExecNode("sink", plan.get(), {aggregate}, cp::SinkNodeOptions{&sink_gen}));
  auto schema = arrow::schema({
      arrow::field("count(a)", arrow::int32()),
      arrow::field("b", arrow::boolean()),
  });

  return ExecutePlanAndCollectAsTable(exec_context, plan, schema, sink_gen);
}
sink
The sink operation provides output and is the final node of a streaming execution definition. The arrow::compute::SinkNodeOptions interface is used to pass the required options. Similar to the source operator, the sink operator exposes the output with a function that returns a record batch future each time it is called. It is expected that the caller will repeatedly call this function until the generator function is exhausted (returns arrow::util::nullopt). If this function is not called often enough then record batches will accumulate in memory. An execution plan should only have one “terminal” node (one sink node). An ExecPlan can terminate early due to cancellation or an error, before the output is fully consumed. However, the plan can be safely destroyed independently of the sink, which will hold the unconsumed batches by exec_plan->finished().
As a part of the Source Example, the Sink operation is also included:
/// \brief An example demonstrating a source and sink node
/// \param exec_context The execution context to run the plan in
///
/// Source-Sink Example
/// This example shows how a source and sink can be used
/// in an execution plan. This includes source node receiving data
/// and the sink node emits the data as an output represented in
/// a table.
arrow::Status SourceSinkExample(cp::ExecContext& exec_context) {
  ARROW_ASSIGN_OR_RAISE(std::shared_ptr<cp::ExecPlan> plan,
                        cp::ExecPlan::Make(&exec_context));

  ARROW_ASSIGN_OR_RAISE(auto basic_data, MakeBasicBatches());

  arrow::AsyncGenerator<arrow::util::optional<cp::ExecBatch>> sink_gen;

  auto source_node_options = cp::SourceNodeOptions{basic_data.schema, basic_data.gen()};

  ARROW_ASSIGN_OR_RAISE(cp::ExecNode* source,
                        cp::MakeExecNode("source", plan.get(), {}, source_node_options));

  ARROW_RETURN_NOT_OK(
      cp::MakeExecNode("sink", plan.get(), {source}, cp::SinkNodeOptions{&sink_gen}));

  return ExecutePlanAndCollectAsTable(exec_context, plan, basic_data.schema, sink_gen);
}
consuming_sink
The consuming_sink operator is a sink operation that consumes the data within the execution plan (i.e. the exec plan should not complete until the consumption has completed). Unlike the sink node, this node takes in a callback function that is expected to consume the batch. Once this callback has finished, the execution plan will no longer hold any reference to the batch.
The consuming function may be called before a previous invocation has completed. If the consuming function does not run quickly enough then many concurrent executions could pile up, blocking the CPU thread pool. The execution plan will not be marked finished until all consuming function callbacks have been completed.
Once all batches have been delivered, the execution plan will wait for the finish future to complete before marking the execution plan finished. This allows for workflows where the consumption function converts batches into async tasks (this is currently done internally for the dataset write node).
Example:
// define a Custom SinkNodeConsumer
std::atomic<uint32_t> batches_seen{0};
arrow::Future<> finish = arrow::Future<>::Make();

struct CustomSinkNodeConsumer : public cp::SinkNodeConsumer {
  CustomSinkNodeConsumer(std::atomic<uint32_t>* batches_seen, arrow::Future<> finish)
      : batches_seen(batches_seen), finish(std::move(finish)) {}

  // Init is overridden in the full example below; no setup is needed here
  arrow::Status Init(const std::shared_ptr<arrow::Schema>& schema,
                     cp::BackpressureControl* backpressure_control) override {
    return arrow::Status::OK();
  }

  // Consumption logic can be written here
  arrow::Status Consume(cp::ExecBatch batch) override {
    // data can be consumed in the expected way
    // transfer to another system or just do some work
    // and write to disk
    (*batches_seen)++;
    return arrow::Status::OK();
  }

  arrow::Future<> Finish() override { return finish; }

  std::atomic<uint32_t>* batches_seen;
  arrow::Future<> finish;
};

std::shared_ptr<CustomSinkNodeConsumer> consumer =
    std::make_shared<CustomSinkNodeConsumer>(&batches_seen, finish);

arrow::compute::ExecNode* consuming_sink;

ARROW_ASSIGN_OR_RAISE(consuming_sink, MakeExecNode("consuming_sink", plan.get(),
    {source}, cp::ConsumingSinkNodeOptions(consumer)));
Consuming-Sink example:
/// \brief An example showing a consuming sink node
/// \param exec_context The execution context to run the plan in
///
/// Source-Consuming-Sink
/// This example shows how the data can be consumed within the execution plan
/// by using a ConsumingSink node. There is no data output from this execution plan.
arrow::Status SourceConsumingSinkExample(cp::ExecContext& exec_context) {
  ARROW_ASSIGN_OR_RAISE(std::shared_ptr<cp::ExecPlan> plan,
                        cp::ExecPlan::Make(&exec_context));

  ARROW_ASSIGN_OR_RAISE(auto basic_data, MakeBasicBatches());

  auto source_node_options = cp::SourceNodeOptions{basic_data.schema, basic_data.gen()};

  ARROW_ASSIGN_OR_RAISE(cp::ExecNode * source,
                        cp::MakeExecNode("source", plan.get(), {}, source_node_options));

  std::atomic<uint32_t> batches_seen{0};
  arrow::Future<> finish = arrow::Future<>::Make();
  struct CustomSinkNodeConsumer : public cp::SinkNodeConsumer {
    CustomSinkNodeConsumer(std::atomic<uint32_t>* batches_seen, arrow::Future<> finish)
        : batches_seen(batches_seen), finish(std::move(finish)) {}

    arrow::Status Init(const std::shared_ptr<arrow::Schema>& schema,
                       cp::BackpressureControl* backpressure_control) override {
      return arrow::Status::OK();
    }

    arrow::Status Consume(cp::ExecBatch batch) override {
      (*batches_seen)++;
      return arrow::Status::OK();
    }

    arrow::Future<> Finish() override { return finish; }

    std::atomic<uint32_t>* batches_seen;
    arrow::Future<> finish;
  };
  std::shared_ptr<CustomSinkNodeConsumer> consumer =
      std::make_shared<CustomSinkNodeConsumer>(&batches_seen, finish);

  cp::ExecNode* consuming_sink;

  ARROW_ASSIGN_OR_RAISE(consuming_sink,
                        MakeExecNode("consuming_sink", plan.get(), {source},
                                     cp::ConsumingSinkNodeOptions(consumer)));

  ARROW_RETURN_NOT_OK(consuming_sink->Validate());

  ARROW_RETURN_NOT_OK(plan->Validate());
  std::cout << "Exec Plan created: " << plan->ToString() << std::endl;
  // plan start producing
  ARROW_RETURN_NOT_OK(plan->StartProducing());
  // Source should finish fairly quickly
  ARROW_RETURN_NOT_OK(source->finished().status());
  std::cout << "Source Finished!" << std::endl;
  // Mark consumption complete, plan should finish
  finish.MarkFinished(arrow::Status::OK());
  ARROW_RETURN_NOT_OK(plan->finished().status());
  return arrow::Status::OK();
}
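The consumer pattern can also be illustrated without Arrow. The following is a minimal standalone sketch that uses only the C++ standard library; `Batch`, `BatchConsumer`, `CountingConsumer`, and `DrivePlan` are hypothetical names chosen for illustration and are not part of the Arrow API. It only shows the shape of the idea: each batch is handed to a consumer object, and the plan itself returns no data.

```cpp
#include <cstddef>
#include <vector>

// Hypothetical stand-in for a batch of rows (not arrow::compute::ExecBatch).
using Batch = std::vector<int>;

// Hypothetical consumer interface, loosely modelled on the Consume/Finish
// callbacks of the consumer in the Arrow example.
struct BatchConsumer {
  virtual ~BatchConsumer() = default;
  virtual void Consume(const Batch& batch) = 0;
  virtual void Finish() = 0;
};

// Counts batches and rows instead of producing any output.
struct CountingConsumer : BatchConsumer {
  std::size_t batches_seen = 0;
  std::size_t rows_seen = 0;
  bool finished = false;

  void Consume(const Batch& batch) override {
    ++batches_seen;
    rows_seen += batch.size();
  }
  void Finish() override { finished = true; }
};

// Pushes every batch into the consumer, then signals completion.
void DrivePlan(const std::vector<Batch>& source, BatchConsumer& consumer) {
  for (const Batch& batch : source) consumer.Consume(batch);
  consumer.Finish();
}
```

Unlike this synchronous sketch, the Arrow example signals completion through a future, so the plan can finish asynchronously.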
order_by_sink#
The order_by_sink operation is an extension to the sink operation. It guarantees the ordering of the output stream according to the arrow::compute::OrderBySinkNodeOptions supplied to it. The arrow::compute::SortOptions inside those options define which columns are used for sorting and whether to sort by ascending or descending values.
Note
This node is a “pipeline breaker” and will fully materialize the dataset in memory. In the future, spillover mechanisms will be added which should alleviate this constraint.
Order-By-Sink example:
/// \brief An example showing an order-by node
/// \param exec_context The execution context to run the plan in
///
/// Source-OrderBy-Sink
/// In this example, the data enters through the source node
/// and the data is ordered in the sink node. The order can be
/// ASCENDING or DESCENDING and it is configurable. The output
/// is obtained as a table from the sink node.
arrow::Status SourceOrderBySinkExample(cp::ExecContext& exec_context) {
  ARROW_ASSIGN_OR_RAISE(std::shared_ptr<cp::ExecPlan> plan,
                        cp::ExecPlan::Make(&exec_context));

  ARROW_ASSIGN_OR_RAISE(auto basic_data, MakeSortTestBasicBatches());

  std::cout << "basic data created" << std::endl;

  arrow::AsyncGenerator<arrow::util::optional<cp::ExecBatch>> sink_gen;

  auto source_node_options = cp::SourceNodeOptions{basic_data.schema, basic_data.gen()};
  ARROW_ASSIGN_OR_RAISE(cp::ExecNode * source,
                        cp::MakeExecNode("source", plan.get(), {}, source_node_options));

  ARROW_RETURN_NOT_OK(cp::MakeExecNode(
      "order_by_sink", plan.get(), {source},
      cp::OrderBySinkNodeOptions{
          cp::SortOptions{{cp::SortKey{"a", cp::SortOrder::Descending}}}, &sink_gen}));

  return ExecutePlanAndCollectAsTable(exec_context, plan, basic_data.schema, sink_gen);
}
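To see why an order-by is a "pipeline breaker", consider the following standalone sketch. It uses only the C++ standard library, and `Batch` and `OrderByDescending` are hypothetical names, not Arrow APIs. The point is that no row can be emitted until the last batch has arrived, because a later batch may contain the largest value.

```cpp
#include <algorithm>
#include <functional>
#include <vector>

// Hypothetical stand-in for a batch of values from a single column.
using Batch = std::vector<int>;

// Buffers every input batch, then sorts once all input has arrived.
// The whole input is held in memory at the same time.
std::vector<int> OrderByDescending(const std::vector<Batch>& batches) {
  std::vector<int> all_rows;
  for (const Batch& batch : batches) {
    all_rows.insert(all_rows.end(), batch.begin(), batch.end());
  }
  std::sort(all_rows.begin(), all_rows.end(), std::greater<int>());
  return all_rows;
}
```

This is a simplification for illustration only; it makes no claim about how Arrow's sink node sorts internally.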
select_k_sink#
The select_k_sink node selects the top or bottom K elements, similar to a SQL ORDER BY ... LIMIT K clause. Its behavior is configured with arrow::compute::SelectKOptions, and the node is defined using the OrderBySinkNode definition. The result is a sink node that receives its inputs and then computes top_k/bottom_k.
Note
This node is a “pipeline breaker” and will fully materialize the input in memory. In the future, spillover mechanisms will be added which should alleviate this constraint.
SelectK example:
/// \brief An example showing a select-k node
/// \param exec_context The execution context to run the plan in
///
/// Source-KSelect
/// This example shows how K number of elements can be selected
/// either from the top or bottom. The output node is a modified
/// sink node where output can be obtained as a table.
arrow::Status SourceKSelectExample(cp::ExecContext& exec_context) {
  ARROW_ASSIGN_OR_RAISE(auto input, MakeGroupableBatches());
  ARROW_ASSIGN_OR_RAISE(std::shared_ptr<cp::ExecPlan> plan,
                        cp::ExecPlan::Make(&exec_context));
  arrow::AsyncGenerator<arrow::util::optional<cp::ExecBatch>> sink_gen;

  ARROW_ASSIGN_OR_RAISE(
      cp::ExecNode * source,
      cp::MakeExecNode("source", plan.get(), {},
                       cp::SourceNodeOptions{input.schema, input.gen()}));

  cp::SelectKOptions options = cp::SelectKOptions::TopKDefault(/*k=*/2, {"i32"});

  ARROW_RETURN_NOT_OK(cp::MakeExecNode("select_k_sink", plan.get(), {source},
                                       cp::SelectKSinkNodeOptions{options, &sink_gen}));

  auto schema = arrow::schema(
      {arrow::field("i32", arrow::int32()), arrow::field("str", arrow::utf8())});

  return ExecutePlanAndCollectAsTable(exec_context, plan, schema, sink_gen);
}
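The "ORDER BY ... LIMIT K" semantics can be sketched in a few lines of standard C++. `TopK` below is a hypothetical helper, not an Arrow function, and it works on a plain vector of integers rather than on batches. It is only meant to show what a top-k selection returns.

```cpp
#include <algorithm>
#include <cstddef>
#include <functional>
#include <vector>

// Returns the k largest values in descending order
// (or all values, sorted, if the input has fewer than k elements).
std::vector<int> TopK(std::vector<int> values, std::size_t k) {
  k = std::min(k, values.size());
  // Only the first k positions need to end up sorted.
  std::partial_sort(values.begin(),
                    values.begin() + static_cast<std::ptrdiff_t>(k),
                    values.end(), std::greater<int>());
  values.resize(k);
  return values;
}
```

Applied to the nine "i32" values produced by MakeGroupableBatches (12, 7, 3, -2, -1, 3, 5, 3, -8) with k = 2, this sketch yields 12 and 7.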
table_sink#
The table_sink node provides the ability to receive the output as an in-memory table. This is simpler to use than the other sink nodes provided by the streaming execution engine, but it only makes sense when the output fits comfortably in memory. The node is created using arrow::compute::TableSinkNodeOptions.
Example of using table_sink
/// \brief An example showing a table sink node
/// \param exec_context The execution context to run the plan in
///
/// TableSink Example
/// This example shows how a table_sink can be used
/// in an execution plan. This includes a source node
/// receiving data as batches and the table sink node
/// which emits the output as a table.
arrow::Status TableSinkExample(cp::ExecContext& exec_context) {
  ARROW_ASSIGN_OR_RAISE(std::shared_ptr<cp::ExecPlan> plan,
                        cp::ExecPlan::Make(&exec_context));

  ARROW_ASSIGN_OR_RAISE(auto basic_data, MakeBasicBatches());

  auto source_node_options = cp::SourceNodeOptions{basic_data.schema, basic_data.gen()};

  ARROW_ASSIGN_OR_RAISE(cp::ExecNode * source,
                        cp::MakeExecNode("source", plan.get(), {}, source_node_options));

  std::shared_ptr<arrow::Table> output_table;
  auto table_sink_options = cp::TableSinkNodeOptions{&output_table};

  ARROW_RETURN_NOT_OK(
      cp::MakeExecNode("table_sink", plan.get(), {source}, table_sink_options));
  // validate the ExecPlan
  ARROW_RETURN_NOT_OK(plan->Validate());
  std::cout << "ExecPlan created : " << plan->ToString() << std::endl;
  // start the ExecPlan
  ARROW_RETURN_NOT_OK(plan->StartProducing());

  // Wait for the plan to finish
  auto finished = plan->finished();
  RETURN_NOT_OK(finished.status());
  std::cout << "Results : " << output_table->ToString() << std::endl;
  return arrow::Status::OK();
}
scan#
scan is an operation used to load and process datasets. It should be preferred over the more generic source node when your input is a dataset. The behavior is defined using arrow::dataset::ScanNodeOptions. More information on datasets and the various scan options can be found in Tabular Datasets.
This node is capable of applying pushdown filters to the file readers, which reduces the amount of data that needs to be read. This means you may supply the same filter expression to the scan node that you also supply to the FilterNode, because the filtering is done in two different places: once in the file readers (pushdown filtering) and once in memory (the FilterNode).
Scan example:
/// \brief An example demonstrating a scan and sink node
/// \param exec_context The execution context to run the plan in
///
/// Scan-Sink
/// This example shows how scan operation can be applied on a dataset.
/// There are operations that can be applied on the scan (project, filter)
/// and the input data can be processed. The output is obtained as a table
/// via the sink node.
arrow::Status ScanSinkExample(cp::ExecContext& exec_context) {
  // Execution plan created
  ARROW_ASSIGN_OR_RAISE(std::shared_ptr<cp::ExecPlan> plan,
                        cp::ExecPlan::Make(&exec_context));

  ARROW_ASSIGN_OR_RAISE(std::shared_ptr<arrow::dataset::Dataset> dataset, GetDataset());

  auto options = std::make_shared<arrow::dataset::ScanOptions>();
  options->projection = cp::project({}, {});  // create empty projection

  // construct the scan node
  cp::ExecNode* scan;
  auto scan_node_options = arrow::dataset::ScanNodeOptions{dataset, options};

  ARROW_ASSIGN_OR_RAISE(scan,
                        cp::MakeExecNode("scan", plan.get(), {}, scan_node_options));

  arrow::AsyncGenerator<arrow::util::optional<cp::ExecBatch>> sink_gen;

  ARROW_RETURN_NOT_OK(
      cp::MakeExecNode("sink", plan.get(), {scan}, cp::SinkNodeOptions{&sink_gen}));

  return ExecutePlanAndCollectAsTable(exec_context, plan, dataset->schema(), sink_gen);
}
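The reason one filter expression can usefully appear in two places is sketched below in standalone C++. Everything here is hypothetical: `Fragment`, `ScanResult`, and `ScanGreaterThan` are illustrative names, and the use of per-fragment minimum/maximum statistics is an assumption about how a reader might prune data, not a description of Arrow's file readers. Stage 1 skips whole fragments that cannot match; stage 2 still has to test each row, because a fragment that survives pruning can contain non-matching rows.

```cpp
#include <cstddef>
#include <vector>

// Hypothetical stand-in for a chunk of a file with min/max statistics.
struct Fragment {
  int min_value;
  int max_value;
  std::vector<int> rows;
};

struct ScanResult {
  std::size_t fragments_read = 0;  // fragments that were actually decoded
  std::vector<int> rows;           // rows that passed the filter
};

// Keeps rows whose value is greater than `threshold`.
ScanResult ScanGreaterThan(const std::vector<Fragment>& fragments, int threshold) {
  ScanResult result;
  for (const Fragment& fragment : fragments) {
    // Stage 1 (pushdown): skip fragments that cannot contain a match.
    if (fragment.max_value <= threshold) continue;
    ++result.fragments_read;
    // Stage 2 (in-memory filter): test every row of the surviving fragment.
    for (int value : fragment.rows) {
      if (value > threshold) result.rows.push_back(value);
    }
  }
  return result;
}
```

With a threshold of 3, a fragment holding only the values 1 and 2 is never read, while a fragment holding 3, 4, and 5 is read and then filtered row by row.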
write#
The write node saves query results as a dataset of files in a format like Parquet, Feather, CSV, etc. using the Tabular Datasets functionality in Arrow. The write options are provided via arrow::dataset::WriteNodeOptions, which in turn contains arrow::dataset::FileSystemDatasetWriteOptions. arrow::dataset::FileSystemDatasetWriteOptions provides control over the written dataset, including options like the output directory, file naming scheme, and so on.
Write example:
/// \brief An example showing a write node
/// \param exec_context The execution context to run the plan in
/// \param file_path The destination to write to
///
/// Scan-Filter-Write
/// This example shows how scan node can be used to load the data
/// and after processing how it can be written to disk.
arrow::Status ScanFilterWriteExample(cp::ExecContext& exec_context,
                                     const std::string& file_path) {
  ARROW_ASSIGN_OR_RAISE(std::shared_ptr<cp::ExecPlan> plan,
                        cp::ExecPlan::Make(&exec_context));

  ARROW_ASSIGN_OR_RAISE(std::shared_ptr<arrow::dataset::Dataset> dataset, GetDataset());

  auto options = std::make_shared<arrow::dataset::ScanOptions>();
  // empty projection
  options->projection = cp::project({}, {});

  cp::ExecNode* scan;

  auto scan_node_options = arrow::dataset::ScanNodeOptions{dataset, options};

  ARROW_ASSIGN_OR_RAISE(scan,
                        cp::MakeExecNode("scan", plan.get(), {}, scan_node_options));

  arrow::AsyncGenerator<arrow::util::optional<cp::ExecBatch>> sink_gen;

  std::string root_path = "";
  std::string uri = "file://" + file_path;
  std::shared_ptr<arrow::fs::FileSystem> filesystem =
      arrow::fs::FileSystemFromUri(uri, &root_path).ValueOrDie();

  auto base_path = root_path + "/parquet_dataset";
  // Uncomment the following line, if run repeatedly
  // ARROW_RETURN_NOT_OK(filesystem->DeleteDirContents(base_path));
  ARROW_RETURN_NOT_OK(filesystem->CreateDir(base_path));

  // The partition schema determines which fields are part of the partitioning.
  auto partition_schema = arrow::schema({arrow::field("a", arrow::int32())});
  // We'll use Hive-style partitioning,
  // which creates directories with "key=value" pairs.

  auto partitioning =
      std::make_shared<arrow::dataset::HivePartitioning>(partition_schema);
  // We'll write Parquet files.
  auto format = std::make_shared<arrow::dataset::ParquetFileFormat>();

  arrow::dataset::FileSystemDatasetWriteOptions write_options;
  write_options.file_write_options = format->DefaultWriteOptions();
  write_options.filesystem = filesystem;
  write_options.base_dir = base_path;
  write_options.partitioning = partitioning;
  write_options.basename_template = "part{i}.parquet";

  arrow::dataset::WriteNodeOptions write_node_options{write_options};

  ARROW_RETURN_NOT_OK(cp::MakeExecNode("write", plan.get(), {scan}, write_node_options));

  ARROW_RETURN_NOT_OK(plan->Validate());
  std::cout << "Execution Plan Created : " << plan->ToString() << std::endl;
  // start the ExecPlan
  ARROW_RETURN_NOT_OK(plan->StartProducing());
  auto future = plan->finished();
  ARROW_RETURN_NOT_OK(future.status());
  future.Wait();
  return arrow::Status::OK();
}
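The comments in the example mention Hive-style "key=value" directories and a "part{i}.parquet" basename template. The standalone sketch below shows how such a path could be assembled. `ExpandBasename` and `HivePartitionPath` are hypothetical helpers written for this illustration; they are not Arrow functions, and the exact layout Arrow produces on disk may differ in detail.

```cpp
#include <string>

// Replaces the first "{i}" in the template with the file index.
std::string ExpandBasename(const std::string& basename_template, int index) {
  const std::string placeholder = "{i}";
  std::string name = basename_template;
  const std::string::size_type pos = name.find(placeholder);
  if (pos != std::string::npos) {
    name.replace(pos, placeholder.size(), std::to_string(index));
  }
  return name;
}

// Builds "<base_dir>/<key>=<value>/<file name>" for one partition value.
std::string HivePartitionPath(const std::string& base_dir, const std::string& key,
                              int value, const std::string& basename_template,
                              int index) {
  return base_dir + "/" + key + "=" + std::to_string(value) + "/" +
         ExpandBasename(basename_template, index);
}
```

Under these assumptions, rows with a = 3 written under a base directory of /tmp/parquet_dataset would land in /tmp/parquet_dataset/a=3/part0.parquet.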
union#
union merges multiple data streams with the same schema into one, similar to a SQL UNION ALL clause.
The following example demonstrates how this can be achieved using two data sources.
Union example:
/// \brief An example showing a union node
/// \param exec_context The execution context to run the plan in
///
/// Source-Union-Sink
/// This example shows how a union operation can be applied on two
/// data sources. The output is obtained as a table via the sink
/// node.
arrow::Status SourceUnionSinkExample(cp::ExecContext& exec_context) {
  ARROW_ASSIGN_OR_RAISE(auto basic_data, MakeBasicBatches());

  std::shared_ptr<cp::ExecPlan> plan = cp::ExecPlan::Make(&exec_context).ValueOrDie();
  arrow::AsyncGenerator<arrow::util::optional<cp::ExecBatch>> sink_gen;

  cp::Declaration union_node{"union", cp::ExecNodeOptions{}};
  cp::Declaration lhs{"source",
                      cp::SourceNodeOptions{basic_data.schema, basic_data.gen()}};
  lhs.label = "lhs";
  cp::Declaration rhs{"source",
                      cp::SourceNodeOptions{basic_data.schema, basic_data.gen()}};
  rhs.label = "rhs";
  union_node.inputs.emplace_back(lhs);
  union_node.inputs.emplace_back(rhs);

  cp::CountOptions options(cp::CountOptions::ONLY_VALID);
  ARROW_ASSIGN_OR_RAISE(
      auto declr, cp::Declaration::Sequence({
                      union_node,
                      {"sink", cp::SinkNodeOptions{&sink_gen}},
                  })
                      .AddToPlan(plan.get()));

  ARROW_RETURN_NOT_OK(declr->Validate());

  ARROW_RETURN_NOT_OK(plan->Validate());
  return ExecutePlanAndCollectAsTable(exec_context, plan, basic_data.schema, sink_gen);
}
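The UNION ALL semantics are easy to state in plain C++: every row from every input appears in the output, and duplicates are kept. `UnionAll` below is a hypothetical helper, not part of Arrow. It places the left input before the right one purely for simplicity; that ordering is an assumption of this sketch and says nothing about the order in which a streaming engine emits batches.

```cpp
#include <vector>

// Concatenates two inputs of the same "schema" (here: plain ints),
// keeping duplicate values.
std::vector<int> UnionAll(const std::vector<int>& lhs, const std::vector<int>& rhs) {
  std::vector<int> out;
  out.reserve(lhs.size() + rhs.size());
  out.insert(out.end(), lhs.begin(), lhs.end());
  out.insert(out.end(), rhs.begin(), rhs.end());
  return out;
}
```

Because the Arrow example feeds the same eight-row data into both inputs, a union of this kind would contain sixteen rows, each value appearing twice.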
hash_join#
The hash_join operation provides the relational algebra join operation using a hash-based algorithm. arrow::compute::HashJoinNodeOptions contains the options required to define a join. hash_join supports left/right/full semi/anti/outer joins. The join key (i.e. the column(s) to join on) and suffixes (i.e. a term like “_x” that can be appended to column names duplicated in both the left and right relations) can also be set via the join options.
Read more on hash-joins.
Hash-Join example:
/// \brief An example showing a hash join node
/// \param exec_context The execution context to run the plan in
///
/// Source-HashJoin-Sink
/// This example shows how source node gets the data and how a self-join
/// is applied on the data. The join options are configurable. The output
/// is obtained as a table via the sink node.
arrow::Status SourceHashJoinSinkExample(cp::ExecContext& exec_context) {
  ARROW_ASSIGN_OR_RAISE(auto input, MakeGroupableBatches());
  ARROW_ASSIGN_OR_RAISE(std::shared_ptr<cp::ExecPlan> plan,
                        cp::ExecPlan::Make(&exec_context));

  arrow::AsyncGenerator<arrow::util::optional<cp::ExecBatch>> sink_gen;

  cp::ExecNode* left_source;
  cp::ExecNode* right_source;
  for (auto source : {&left_source, &right_source}) {
    ARROW_ASSIGN_OR_RAISE(*source,
                          MakeExecNode("source", plan.get(), {},
                                       cp::SourceNodeOptions{input.schema, input.gen()}));
  }

  cp::HashJoinNodeOptions join_opts{
      cp::JoinType::INNER,
      /*left_keys=*/{"str"},
      /*right_keys=*/{"str"}, cp::literal(true), "l_", "r_"};

  ARROW_ASSIGN_OR_RAISE(
      auto hashjoin,
      cp::MakeExecNode("hashjoin", plan.get(), {left_source, right_source}, join_opts));

  ARROW_RETURN_NOT_OK(
      cp::MakeExecNode("sink", plan.get(), {hashjoin}, cp::SinkNodeOptions{&sink_gen}));
  // expected columns i32, str, l_str, r_str
  auto schema = arrow::schema(
      {arrow::field("i32", arrow::int32()), arrow::field("str", arrow::utf8()),
       arrow::field("l_str", arrow::utf8()), arrow::field("r_str", arrow::utf8())});

  return ExecutePlanAndCollectAsTable(exec_context, plan, schema, sink_gen);
}
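The general idea behind a hash-based inner join (a build phase followed by a probe phase) can be sketched in standalone C++. `Row`, `JoinedRow`, and `InnerHashJoin` are hypothetical names for this illustration only. The sketch is a textbook build/probe join and is not a description of Arrow's implementation; which side is used for the build phase is an arbitrary choice here.

```cpp
#include <string>
#include <unordered_map>
#include <utility>
#include <vector>

// Hypothetical row type mirroring the "i32" and "str" columns of the example data.
struct Row {
  int i32;
  std::string str;
};

struct JoinedRow {
  int left_i32;
  int right_i32;
  std::string key;
};

// Inner join of `left` and `right` on the `str` column.
std::vector<JoinedRow> InnerHashJoin(const std::vector<Row>& left,
                                     const std::vector<Row>& right) {
  // Build phase: index one side by its join key.
  std::unordered_multimap<std::string, int> build_side;
  for (const Row& row : right) build_side.emplace(row.str, row.i32);

  // Probe phase: look up each row of the other side and emit one
  // output row per matching build-side row.
  std::vector<JoinedRow> out;
  for (const Row& row : left) {
    auto range = build_side.equal_range(row.str);
    for (auto it = range.first; it != range.second; ++it) {
      out.push_back(JoinedRow{row.i32, it->second, row.str});
    }
  }
  return out;
}
```

For a self-join over the nine rows produced by MakeGroupableBatches (five "alpha", two "beta", two "gamma"), this sketch produces 5×5 + 2×2 + 2×2 = 33 rows.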
Summary#
Examples of these nodes can be found in cpp/examples/arrow/execution_plan_documentation_examples.cc in the Arrow source.
Complete Example:
#include <arrow/array.h>
#include <arrow/builder.h>

#include <arrow/compute/api.h>
#include <arrow/compute/api_vector.h>
#include <arrow/compute/cast.h>
#include <arrow/compute/exec/exec_plan.h>

#include <arrow/csv/api.h>

#include <arrow/dataset/dataset.h>
#include <arrow/dataset/file_base.h>
#include <arrow/dataset/file_parquet.h>
#include <arrow/dataset/plan.h>
#include <arrow/dataset/scanner.h>

#include <arrow/io/interfaces.h>
#include <arrow/io/memory.h>

#include <arrow/result.h>
#include <arrow/status.h>
#include <arrow/table.h>

#include <arrow/ipc/api.h>

#include <arrow/util/future.h>
#include <arrow/util/range.h>
#include <arrow/util/thread_pool.h>
#include <arrow/util/vector.h>

#include <atomic>
#include <iostream>
#include <limits>
#include <memory>
#include <string>
#include <utility>
#include <vector>

// Demonstrate various operators in Arrow Streaming Execution Engine

namespace cp = ::arrow::compute;

constexpr char kSep[] = "******";

void PrintBlock(const std::string& msg) {
  std::cout << "\n\t" << kSep << " " << msg << " " << kSep << "\n" << std::endl;
}

template <typename TYPE,
          typename = typename std::enable_if<arrow::is_number_type<TYPE>::value |
                                             arrow::is_boolean_type<TYPE>::value |
                                             arrow::is_temporal_type<TYPE>::value>::type>
arrow::Result<std::shared_ptr<arrow::Array>> GetArrayDataSample(
    const std::vector<typename TYPE::c_type>& values) {
  using ARROW_ARRAY_TYPE = typename arrow::TypeTraits<TYPE>::ArrayType;
  using ARROW_BUILDER_TYPE = typename arrow::TypeTraits<TYPE>::BuilderType;
  ARROW_BUILDER_TYPE builder;
  ARROW_RETURN_NOT_OK(builder.Reserve(values.size()));
  std::shared_ptr<ARROW_ARRAY_TYPE> array;
  ARROW_RETURN_NOT_OK(builder.AppendValues(values));
  ARROW_RETURN_NOT_OK(builder.Finish(&array));
  return array;
}

template <class TYPE>
arrow::Result<std::shared_ptr<arrow::Array>> GetBinaryArrayDataSample(
    const std::vector<std::string>& values) {
  using ARROW_ARRAY_TYPE = typename arrow::TypeTraits<TYPE>::ArrayType;
  using ARROW_BUILDER_TYPE = typename arrow::TypeTraits<TYPE>::BuilderType;
  ARROW_BUILDER_TYPE builder;
  ARROW_RETURN_NOT_OK(builder.Reserve(values.size()));
  std::shared_ptr<ARROW_ARRAY_TYPE> array;
  ARROW_RETURN_NOT_OK(builder.AppendValues(values));
  ARROW_RETURN_NOT_OK(builder.Finish(&array));
  return array;
}

arrow::Result<std::shared_ptr<arrow::RecordBatch>> GetSampleRecordBatch(
    const arrow::ArrayVector array_vector, const arrow::FieldVector& field_vector) {
  ARROW_ASSIGN_OR_RAISE(auto struct_result,
                        arrow::StructArray::Make(array_vector, field_vector));
  // FromStructArray is a static method, so call it on the class rather than
  // through an uninitialized RecordBatch pointer.
  return arrow::RecordBatch::FromStructArray(struct_result);
}

/// \brief Create a sample table
/// The table's contents will be:
/// a,b
/// 1,null
/// 2,true
/// null,true
/// 3,false
/// null,true
/// 4,false
/// 5,null
/// 6,false
/// 7,false
/// 8,true
/// \return The created table

arrow::Result<std::shared_ptr<arrow::Table>> GetTable() {
  // Note: for integer types std::numeric_limits<T>::quiet_NaN() returns 0, and
  // GetArrayDataSample appends values without a validity bitmap, so the "null"
  // slots of column "a" hold the value 0 rather than an Arrow null.
  auto null_long = std::numeric_limits<int64_t>::quiet_NaN();
  ARROW_ASSIGN_OR_RAISE(auto int64_array,
                        GetArrayDataSample<arrow::Int64Type>(
                            {1, 2, null_long, 3, null_long, 4, 5, 6, 7, 8}));

  arrow::BooleanBuilder boolean_builder;
  std::shared_ptr<arrow::BooleanArray> bool_array;

  std::vector<uint8_t> bool_values = {false, true,  true,  false, true,
                                      false, false, false, false, true};
  std::vector<bool> is_valid = {false, true,  true, true, true,
                                true,  false, true, true, true};

  ARROW_RETURN_NOT_OK(boolean_builder.Reserve(10));

  ARROW_RETURN_NOT_OK(boolean_builder.AppendValues(bool_values, is_valid));

  ARROW_RETURN_NOT_OK(boolean_builder.Finish(&bool_array));

  auto record_batch =
      arrow::RecordBatch::Make(arrow::schema({arrow::field("a", arrow::int64()),
                                              arrow::field("b", arrow::boolean())}),
                               10, {int64_array, bool_array});
  ARROW_ASSIGN_OR_RAISE(auto table, arrow::Table::FromRecordBatches({record_batch}));
  return table;
}

/// \brief Create a sample dataset
/// \return An in-memory dataset based on GetTable()
arrow::Result<std::shared_ptr<arrow::dataset::Dataset>> GetDataset() {
  ARROW_ASSIGN_OR_RAISE(auto table, GetTable());
  auto ds = std::make_shared<arrow::dataset::InMemoryDataset>(table);
  return ds;
}

arrow::Result<cp::ExecBatch> GetExecBatchFromVectors(
    const arrow::FieldVector& field_vector, const arrow::ArrayVector& array_vector) {
  std::shared_ptr<arrow::RecordBatch> record_batch;
  ARROW_ASSIGN_OR_RAISE(auto res_batch, GetSampleRecordBatch(array_vector, field_vector));
  cp::ExecBatch batch{*res_batch};
  return batch;
}

// (Doc section: BatchesWithSchema Definition)
struct BatchesWithSchema {
  std::vector<cp::ExecBatch> batches;
  std::shared_ptr<arrow::Schema> schema;
  // This method uses internal arrow utilities to
  // convert a vector of record batches to an AsyncGenerator of optional batches
  arrow::AsyncGenerator<arrow::util::optional<cp::ExecBatch>> gen() const {
    auto opt_batches = ::arrow::internal::MapVector(
        [](cp::ExecBatch batch) { return arrow::util::make_optional(std::move(batch)); },
        batches);
    arrow::AsyncGenerator<arrow::util::optional<cp::ExecBatch>> gen;
    gen = arrow::MakeVectorGenerator(std::move(opt_batches));
    return gen;
  }
};
// (Doc section: BatchesWithSchema Definition)

// (Doc section: MakeBasicBatches Definition)
arrow::Result<BatchesWithSchema> MakeBasicBatches() {
  BatchesWithSchema out;
  auto field_vector = {arrow::field("a", arrow::int32()),
                       arrow::field("b", arrow::boolean())};
  ARROW_ASSIGN_OR_RAISE(auto b1_int, GetArrayDataSample<arrow::Int32Type>({0, 4}));
  ARROW_ASSIGN_OR_RAISE(auto b2_int, GetArrayDataSample<arrow::Int32Type>({5, 6, 7}));
  ARROW_ASSIGN_OR_RAISE(auto b3_int, GetArrayDataSample<arrow::Int32Type>({8, 9, 10}));

  ARROW_ASSIGN_OR_RAISE(auto b1_bool,
                        GetArrayDataSample<arrow::BooleanType>({false, true}));
  ARROW_ASSIGN_OR_RAISE(auto b2_bool,
                        GetArrayDataSample<arrow::BooleanType>({true, false, true}));
  ARROW_ASSIGN_OR_RAISE(auto b3_bool,
                        GetArrayDataSample<arrow::BooleanType>({false, true, false}));

  ARROW_ASSIGN_OR_RAISE(auto b1,
                        GetExecBatchFromVectors(field_vector, {b1_int, b1_bool}));
  ARROW_ASSIGN_OR_RAISE(auto b2,
                        GetExecBatchFromVectors(field_vector, {b2_int, b2_bool}));
  ARROW_ASSIGN_OR_RAISE(auto b3,
                        GetExecBatchFromVectors(field_vector, {b3_int, b3_bool}));

  out.batches = {b1, b2, b3};
  out.schema = arrow::schema(field_vector);
  return out;
}
// (Doc section: MakeBasicBatches Definition)

arrow::Result<BatchesWithSchema> MakeSortTestBasicBatches() {
  BatchesWithSchema out;
  auto field = arrow::field("a", arrow::int32());
  ARROW_ASSIGN_OR_RAISE(auto b1_int, GetArrayDataSample<arrow::Int32Type>({1, 3, 0, 2}));
  ARROW_ASSIGN_OR_RAISE(auto b2_int,
                        GetArrayDataSample<arrow::Int32Type>({121, 101, 120, 12}));
  ARROW_ASSIGN_OR_RAISE(auto b3_int,
                        GetArrayDataSample<arrow::Int32Type>({10, 110, 210, 121}));
  ARROW_ASSIGN_OR_RAISE(auto b4_int,
                        GetArrayDataSample<arrow::Int32Type>({51, 101, 2, 34}));
  ARROW_ASSIGN_OR_RAISE(auto b5_int,
                        GetArrayDataSample<arrow::Int32Type>({11, 31, 1, 12}));
  ARROW_ASSIGN_OR_RAISE(auto b6_int,
                        GetArrayDataSample<arrow::Int32Type>({12, 101, 120, 12}));
  ARROW_ASSIGN_OR_RAISE(auto b7_int,
                        GetArrayDataSample<arrow::Int32Type>({0, 110, 210, 11}));
  ARROW_ASSIGN_OR_RAISE(auto b8_int,
                        GetArrayDataSample<arrow::Int32Type>({51, 10, 2, 3}));

  ARROW_ASSIGN_OR_RAISE(auto b1, GetExecBatchFromVectors({field}, {b1_int}));
  ARROW_ASSIGN_OR_RAISE(auto b2, GetExecBatchFromVectors({field}, {b2_int}));
  ARROW_ASSIGN_OR_RAISE(auto b3,
                        GetExecBatchFromVectors({field, field}, {b3_int, b8_int}));
  ARROW_ASSIGN_OR_RAISE(auto b4,
                        GetExecBatchFromVectors({field, field, field, field},
                                                {b4_int, b5_int, b6_int, b7_int}));
  out.batches = {b1, b2, b3, b4};
  out.schema = arrow::schema({field});
  return out;
}

arrow::Result<BatchesWithSchema> MakeGroupableBatches(int multiplicity = 1) {
  BatchesWithSchema out;
  auto fields = {arrow::field("i32", arrow::int32()), arrow::field("str", arrow::utf8())};
  ARROW_ASSIGN_OR_RAISE(auto b1_int, GetArrayDataSample<arrow::Int32Type>({12, 7, 3}));
  ARROW_ASSIGN_OR_RAISE(auto b2_int, GetArrayDataSample<arrow::Int32Type>({-2, -1, 3}));
  ARROW_ASSIGN_OR_RAISE(auto b3_int, GetArrayDataSample<arrow::Int32Type>({5, 3, -8}));
  ARROW_ASSIGN_OR_RAISE(auto b1_str, GetBinaryArrayDataSample<arrow::StringType>(
                                         {"alpha", "beta", "alpha"}));
  ARROW_ASSIGN_OR_RAISE(auto b2_str, GetBinaryArrayDataSample<arrow::StringType>(
                                         {"alpha", "gamma", "alpha"}));
  ARROW_ASSIGN_OR_RAISE(auto b3_str, GetBinaryArrayDataSample<arrow::StringType>(
                                         {"gamma", "beta", "alpha"}));
  ARROW_ASSIGN_OR_RAISE(auto b1, GetExecBatchFromVectors(fields, {b1_int, b1_str}));
  ARROW_ASSIGN_OR_RAISE(auto b2, GetExecBatchFromVectors(fields, {b2_int, b2_str}));
  ARROW_ASSIGN_OR_RAISE(auto b3, GetExecBatchFromVectors(fields, {b3_int, b3_str}));
  out.batches = {b1, b2, b3};

  size_t batch_count = out.batches.size();
  for (int repeat = 1; repeat < multiplicity; ++repeat) {
    for (size_t i = 0; i < batch_count; ++i) {
      out.batches.push_back(out.batches[i]);
    }
  }

  out.schema = arrow::schema(fields);
  return out;
}

arrow::Status ExecutePlanAndCollectAsTable(
    cp::ExecContext& exec_context, std::shared_ptr<cp::ExecPlan> plan,
    std::shared_ptr<arrow::Schema> schema,
    arrow::AsyncGenerator<arrow::util::optional<cp::ExecBatch>> sink_gen) {
  // translate sink_gen (async) to sink_reader (sync)
  std::shared_ptr<arrow::RecordBatchReader> sink_reader =
      cp::MakeGeneratorReader(schema, std::move(sink_gen), exec_context.memory_pool());

  // validate the ExecPlan
  ARROW_RETURN_NOT_OK(plan->Validate());
  std::cout << "ExecPlan created : " << plan->ToString() << std::endl;
  // start the ExecPlan
  ARROW_RETURN_NOT_OK(plan->StartProducing());

  // collect sink_reader into a Table
  std::shared_ptr<arrow::Table> response_table;

  ARROW_ASSIGN_OR_RAISE(response_table,
                        arrow::Table::FromRecordBatchReader(sink_reader.get()));

  std::cout << "Results : " << response_table->ToString() << std::endl;

  // stop producing
  plan->StopProducing();
  // plan mark finished
  auto future = plan->finished();
  return future.status();
}

// (Doc section: Scan Example)

/// \brief An example demonstrating a scan and sink node
/// \param exec_context The execution context to run the plan in
///
/// Scan-Sink
/// This example shows how scan operation can be applied on a dataset.
/// There are operations that can be applied on the scan (project, filter)
/// and the input data can be processed. The output is obtained as a table
/// via the sink node.
arrow::Status ScanSinkExample(cp::ExecContext& exec_context) {
  // Execution plan created
  ARROW_ASSIGN_OR_RAISE(std::shared_ptr<cp::ExecPlan> plan,
                        cp::ExecPlan::Make(&exec_context));

  ARROW_ASSIGN_OR_RAISE(std::shared_ptr<arrow::dataset::Dataset> dataset, GetDataset());

  auto options = std::make_shared<arrow::dataset::ScanOptions>();
  options->projection = cp::project({}, {});  // create empty projection

  // construct the scan node
  cp::ExecNode* scan;
  auto scan_node_options = arrow::dataset::ScanNodeOptions{dataset, options};

  ARROW_ASSIGN_OR_RAISE(scan,
                        cp::MakeExecNode("scan", plan.get(), {}, scan_node_options));

  arrow::AsyncGenerator<arrow::util::optional<cp::ExecBatch>> sink_gen;

  ARROW_RETURN_NOT_OK(
      cp::MakeExecNode("sink", plan.get(), {scan}, cp::SinkNodeOptions{&sink_gen}));

  return ExecutePlanAndCollectAsTable(exec_context, plan, dataset->schema(), sink_gen);
}
// (Doc section: Scan Example)

// (Doc section: Source Example)

/// \brief An example demonstrating a source and sink node
/// \param exec_context The execution context to run the plan in
///
/// Source-Sink Example
/// This example shows how a source and sink can be used
/// in an execution plan. This includes source node receiving data
/// and the sink node emits the data as an output represented in
/// a table.
arrow::Status SourceSinkExample(cp::ExecContext& exec_context) {
  ARROW_ASSIGN_OR_RAISE(std::shared_ptr<cp::ExecPlan> plan,
                        cp::ExecPlan::Make(&exec_context));

  ARROW_ASSIGN_OR_RAISE(auto basic_data, MakeBasicBatches());

  arrow::AsyncGenerator<arrow::util::optional<cp::ExecBatch>> sink_gen;

  auto source_node_options = cp::SourceNodeOptions{basic_data.schema, basic_data.gen()};

  ARROW_ASSIGN_OR_RAISE(cp::ExecNode * source,
                        cp::MakeExecNode("source", plan.get(), {}, source_node_options));

  ARROW_RETURN_NOT_OK(
      cp::MakeExecNode("sink", plan.get(), {source}, cp::SinkNodeOptions{&sink_gen}));

  return ExecutePlanAndCollectAsTable(exec_context, plan, basic_data.schema, sink_gen);
}
// (Doc section: Source Example)

// (Doc section: Table Source Example)

/// \brief An example showing a table source node
/// \param exec_context The execution context to run the plan in
///
/// TableSource-Sink Example
/// This example shows how a table_source and sink can be used
/// in an execution plan. This includes a table source node
/// receiving data from a table and the sink node emits
/// the data to a generator which we collect into a table.
arrow::Status TableSourceSinkExample(cp::ExecContext& exec_context) {
  ARROW_ASSIGN_OR_RAISE(std::shared_ptr<cp::ExecPlan> plan,
                        cp::ExecPlan::Make(&exec_context));

  ARROW_ASSIGN_OR_RAISE(auto table, GetTable());

  arrow::AsyncGenerator<arrow::util::optional<cp::ExecBatch>> sink_gen;
  int max_batch_size = 2;
  auto table_source_options = cp::TableSourceNodeOptions{table, max_batch_size};

  ARROW_ASSIGN_OR_RAISE(
      cp::ExecNode * source,
      cp::MakeExecNode("table_source", plan.get(), {}, table_source_options));

  ARROW_RETURN_NOT_OK(
      cp::MakeExecNode("sink", plan.get(), {source}, cp::SinkNodeOptions{&sink_gen}));

  return ExecutePlanAndCollectAsTable(exec_context, plan, table->schema(), sink_gen);
}
// (Doc section: Table Source Example)

390// (Doc section: Filter Example)
391
392/// \brief An example showing a filter node
393/// \param exec_context The execution context to run the plan in
394///
395/// Source-Filter-Sink
396/// This example shows how a filter can be used in an execution plan,
397/// along with the source and sink operations. The output from the
398/// exeuction plan is obtained as a table via the sink node.
399arrow::Status ScanFilterSinkExample(cp::ExecContext& exec_context) {
400 ARROW_ASSIGN_OR_RAISE(std::shared_ptr<cp::ExecPlan> plan,
401 cp::ExecPlan::Make(&exec_context));
402
403 ARROW_ASSIGN_OR_RAISE(std::shared_ptr<arrow::dataset::Dataset> dataset, GetDataset());
404
405 auto options = std::make_shared<arrow::dataset::ScanOptions>();
406 // specify the filter. This filter removes all rows where the
407 // value of the "a" column is greater than 3.
408 cp::Expression filter_opt = cp::greater(cp::field_ref("a"), cp::literal(3));
409 // set filter for scanner : on-disk / push-down filtering.
410 // This step can be skipped if you are not reading from disk.
411 options->filter = filter_opt;
412 // empty projection
413 options->projection = cp::project({}, {});
414
415 // construct the scan node
416 std::cout << "Initialized Scanning Options" << std::endl;
417
418 cp::ExecNode* scan;
419
420 auto scan_node_options = arrow::dataset::ScanNodeOptions{dataset, options};
421 std::cout << "Scan node options created" << std::endl;
422
423 ARROW_ASSIGN_OR_RAISE(scan,
424 cp::MakeExecNode("scan", plan.get(), {}, scan_node_options));
425
426 // pipe the scan node into the filter node
427 // Need to set the filter in scan node options and filter node options.
428 // At scan node it is used for on-disk / push-down filtering.
429 // At filter node it is used for in-memory filtering.
430 cp::ExecNode* filter;
431 ARROW_ASSIGN_OR_RAISE(filter, cp::MakeExecNode("filter", plan.get(), {scan},
432 cp::FilterNodeOptions{filter_opt}));
433
434 // finally, pipe the filter node into a sink node
435 arrow::AsyncGenerator<arrow::util::optional<cp::ExecBatch>> sink_gen;
436 ARROW_RETURN_NOT_OK(
437 cp::MakeExecNode("sink", plan.get(), {filter}, cp::SinkNodeOptions{&sink_gen}));
438
439 return ExecutePlanAndCollectAsTable(exec_context, plan, dataset->schema(), sink_gen);
440}
441
442// (Doc section: Filter Example)

// (Doc section: Project Example)

/// \brief An example showing a project node
/// \param exec_context The execution context to run the plan in
///
/// Scan-Project-Sink
/// This example shows how a scan operation can be used to load the data
/// into the execution plan, how a project operation can be applied to the
/// data stream, and how the output is obtained as a table via the sink node.
arrow::Status ScanProjectSinkExample(cp::ExecContext& exec_context) {
  ARROW_ASSIGN_OR_RAISE(std::shared_ptr<cp::ExecPlan> plan,
                        cp::ExecPlan::Make(&exec_context));

  ARROW_ASSIGN_OR_RAISE(std::shared_ptr<arrow::dataset::Dataset> dataset, GetDataset());

  auto options = std::make_shared<arrow::dataset::ScanOptions>();
  // projection
  cp::Expression a_times_2 = cp::call("multiply", {cp::field_ref("a"), cp::literal(2)});
  options->projection = cp::project({}, {});

  cp::ExecNode* scan;

  auto scan_node_options = arrow::dataset::ScanNodeOptions{dataset, options};

  ARROW_ASSIGN_OR_RAISE(scan,
                        cp::MakeExecNode("scan", plan.get(), {}, scan_node_options));

  cp::ExecNode* project;
  ARROW_ASSIGN_OR_RAISE(project, cp::MakeExecNode("project", plan.get(), {scan},
                                                  cp::ProjectNodeOptions{{a_times_2}}));
  // schema after projection => multiply(a, 2): int64
  std::cout << "Schema after projection : \n"
            << project->output_schema()->ToString() << std::endl;

  arrow::AsyncGenerator<arrow::util::optional<cp::ExecBatch>> sink_gen;
  ARROW_RETURN_NOT_OK(
      cp::MakeExecNode("sink", plan.get(), {project}, cp::SinkNodeOptions{&sink_gen}));
  auto schema = arrow::schema({arrow::field("a * 2", arrow::int32())});

  return ExecutePlanAndCollectAsTable(exec_context, plan, schema, sink_gen);
}

// (Doc section: Project Example)

// (Doc section: Scalar Aggregate Example)

/// \brief An example showing an aggregation node to aggregate an entire table
/// \param exec_context The execution context to run the plan in
///
/// Source-Aggregation-Sink
/// This example shows how an aggregation operation can be applied in an
/// execution plan, resulting in a scalar output. The source node loads the
/// data and the aggregation (summing the values in column 'a')
/// is applied to this data. The output is obtained from the sink node as a table.
arrow::Status SourceScalarAggregateSinkExample(cp::ExecContext& exec_context) {
  ARROW_ASSIGN_OR_RAISE(std::shared_ptr<cp::ExecPlan> plan,
                        cp::ExecPlan::Make(&exec_context));

  ARROW_ASSIGN_OR_RAISE(auto basic_data, MakeBasicBatches());

  arrow::AsyncGenerator<arrow::util::optional<cp::ExecBatch>> sink_gen;

  auto source_node_options = cp::SourceNodeOptions{basic_data.schema, basic_data.gen()};

  ARROW_ASSIGN_OR_RAISE(cp::ExecNode * source,
                        cp::MakeExecNode("source", plan.get(), {}, source_node_options));
  auto aggregate_options = cp::AggregateNodeOptions{/*aggregates=*/{{"sum", nullptr}},
                                                    /*targets=*/{"a"},
                                                    /*names=*/{"sum(a)"}};
  ARROW_ASSIGN_OR_RAISE(
      cp::ExecNode * aggregate,
      cp::MakeExecNode("aggregate", plan.get(), {source}, aggregate_options));

  ARROW_RETURN_NOT_OK(
      cp::MakeExecNode("sink", plan.get(), {aggregate}, cp::SinkNodeOptions{&sink_gen}));
  auto schema = arrow::schema({arrow::field("sum(a)", arrow::int32())});

  return ExecutePlanAndCollectAsTable(exec_context, plan, schema, sink_gen);
}
// (Doc section: Scalar Aggregate Example)

// (Doc section: Group Aggregate Example)

/// \brief An example showing an aggregation node to perform a group-by operation
/// \param exec_context The execution context to run the plan in
///
/// Source-Aggregation-Sink
/// This example shows how an aggregation operation can be applied in an
/// execution plan, resulting in a grouped output. The source node loads the
/// data and the aggregation (counting the valid, non-null values in column 'a',
/// grouped by column 'b') is applied to this data. The output is obtained
/// from the sink node as a table.
arrow::Status SourceGroupAggregateSinkExample(cp::ExecContext& exec_context) {
  ARROW_ASSIGN_OR_RAISE(std::shared_ptr<cp::ExecPlan> plan,
                        cp::ExecPlan::Make(&exec_context));

  ARROW_ASSIGN_OR_RAISE(auto basic_data, MakeBasicBatches());

  arrow::AsyncGenerator<arrow::util::optional<cp::ExecBatch>> sink_gen;

  auto source_node_options = cp::SourceNodeOptions{basic_data.schema, basic_data.gen()};

  ARROW_ASSIGN_OR_RAISE(cp::ExecNode * source,
                        cp::MakeExecNode("source", plan.get(), {}, source_node_options));
  cp::CountOptions options(cp::CountOptions::ONLY_VALID);
  auto aggregate_options =
      cp::AggregateNodeOptions{/*aggregates=*/{{"hash_count", &options}},
                               /*targets=*/{"a"},
                               /*names=*/{"count(a)"},
                               /*keys=*/{"b"}};
  ARROW_ASSIGN_OR_RAISE(
      cp::ExecNode * aggregate,
      cp::MakeExecNode("aggregate", plan.get(), {source}, aggregate_options));

  ARROW_RETURN_NOT_OK(
      cp::MakeExecNode("sink", plan.get(), {aggregate}, cp::SinkNodeOptions{&sink_gen}));
  auto schema = arrow::schema({
      arrow::field("count(a)", arrow::int32()),
      arrow::field("b", arrow::boolean()),
  });

  return ExecutePlanAndCollectAsTable(exec_context, plan, schema, sink_gen);
}
// (Doc section: Group Aggregate Example)

// (Doc section: ConsumingSink Example)

/// \brief An example showing a consuming sink node
/// \param exec_context The execution context to run the plan in
///
/// Source-Consuming-Sink
/// This example shows how the data can be consumed within the execution plan
/// by using a ConsumingSink node. There is no data output from this execution plan.
arrow::Status SourceConsumingSinkExample(cp::ExecContext& exec_context) {
  ARROW_ASSIGN_OR_RAISE(std::shared_ptr<cp::ExecPlan> plan,
                        cp::ExecPlan::Make(&exec_context));

  ARROW_ASSIGN_OR_RAISE(auto basic_data, MakeBasicBatches());

  auto source_node_options = cp::SourceNodeOptions{basic_data.schema, basic_data.gen()};

  ARROW_ASSIGN_OR_RAISE(cp::ExecNode * source,
                        cp::MakeExecNode("source", plan.get(), {}, source_node_options));

  std::atomic<uint32_t> batches_seen{0};
  arrow::Future<> finish = arrow::Future<>::Make();
  struct CustomSinkNodeConsumer : public cp::SinkNodeConsumer {
    CustomSinkNodeConsumer(std::atomic<uint32_t>* batches_seen, arrow::Future<> finish)
        : batches_seen(batches_seen), finish(std::move(finish)) {}

    arrow::Status Init(const std::shared_ptr<arrow::Schema>& schema,
                       cp::BackpressureControl* backpressure_control) override {
      return arrow::Status::OK();
    }

    arrow::Status Consume(cp::ExecBatch batch) override {
      (*batches_seen)++;
      return arrow::Status::OK();
    }

    arrow::Future<> Finish() override { return finish; }

    std::atomic<uint32_t>* batches_seen;
    arrow::Future<> finish;
  };
  std::shared_ptr<CustomSinkNodeConsumer> consumer =
      std::make_shared<CustomSinkNodeConsumer>(&batches_seen, finish);

  cp::ExecNode* consuming_sink;

  ARROW_ASSIGN_OR_RAISE(consuming_sink,
                        MakeExecNode("consuming_sink", plan.get(), {source},
                                     cp::ConsumingSinkNodeOptions(consumer)));

  ARROW_RETURN_NOT_OK(consuming_sink->Validate());

  ARROW_RETURN_NOT_OK(plan->Validate());
  std::cout << "Exec Plan created: " << plan->ToString() << std::endl;
  // plan start producing
  ARROW_RETURN_NOT_OK(plan->StartProducing());
  // Source should finish fairly quickly
  ARROW_RETURN_NOT_OK(source->finished().status());
  std::cout << "Source Finished!" << std::endl;
  // Mark consumption complete, plan should finish
  finish.MarkFinished(arrow::Status::OK());
  ARROW_RETURN_NOT_OK(plan->finished().status());
  return arrow::Status::OK();
}
// (Doc section: ConsumingSink Example)

// (Doc section: OrderBySink Example)

/// \brief An example showing an order-by node
/// \param exec_context The execution context to run the plan in
///
/// Source-OrderBy-Sink
/// In this example, the data enters through the source node
/// and the data is ordered in the sink node. The order can be
/// ASCENDING or DESCENDING and it is configurable. The output
/// is obtained as a table from the sink node.
arrow::Status SourceOrderBySinkExample(cp::ExecContext& exec_context) {
  ARROW_ASSIGN_OR_RAISE(std::shared_ptr<cp::ExecPlan> plan,
                        cp::ExecPlan::Make(&exec_context));

  ARROW_ASSIGN_OR_RAISE(auto basic_data, MakeSortTestBasicBatches());

  std::cout << "basic data created" << std::endl;

  arrow::AsyncGenerator<arrow::util::optional<cp::ExecBatch>> sink_gen;

  auto source_node_options = cp::SourceNodeOptions{basic_data.schema, basic_data.gen()};
  ARROW_ASSIGN_OR_RAISE(cp::ExecNode * source,
                        cp::MakeExecNode("source", plan.get(), {}, source_node_options));

  ARROW_RETURN_NOT_OK(cp::MakeExecNode(
      "order_by_sink", plan.get(), {source},
      cp::OrderBySinkNodeOptions{
          cp::SortOptions{{cp::SortKey{"a", cp::SortOrder::Descending}}}, &sink_gen}));

  return ExecutePlanAndCollectAsTable(exec_context, plan, basic_data.schema, sink_gen);
}

// (Doc section: OrderBySink Example)

// (Doc section: HashJoin Example)

/// \brief An example showing a hash join node
/// \param exec_context The execution context to run the plan in
///
/// Source-HashJoin-Sink
/// This example shows how source node gets the data and how a self-join
/// is applied on the data. The join options are configurable. The output
/// is obtained as a table via the sink node.
arrow::Status SourceHashJoinSinkExample(cp::ExecContext& exec_context) {
  ARROW_ASSIGN_OR_RAISE(auto input, MakeGroupableBatches());
  ARROW_ASSIGN_OR_RAISE(std::shared_ptr<cp::ExecPlan> plan,
                        cp::ExecPlan::Make(&exec_context));

  arrow::AsyncGenerator<arrow::util::optional<cp::ExecBatch>> sink_gen;

  cp::ExecNode* left_source;
  cp::ExecNode* right_source;
  for (auto source : {&left_source, &right_source}) {
    ARROW_ASSIGN_OR_RAISE(*source,
                          MakeExecNode("source", plan.get(), {},
                                       cp::SourceNodeOptions{input.schema, input.gen()}));
  }

  cp::HashJoinNodeOptions join_opts{
      cp::JoinType::INNER,
      /*left_keys=*/{"str"},
      /*right_keys=*/{"str"}, cp::literal(true), "l_", "r_"};

  ARROW_ASSIGN_OR_RAISE(
      auto hashjoin,
      cp::MakeExecNode("hashjoin", plan.get(), {left_source, right_source}, join_opts));

  ARROW_RETURN_NOT_OK(
      cp::MakeExecNode("sink", plan.get(), {hashjoin}, cp::SinkNodeOptions{&sink_gen}));
  // expected columns i32, str, l_str, r_str
  auto schema = arrow::schema(
      {arrow::field("i32", arrow::int32()), arrow::field("str", arrow::utf8()),
       arrow::field("l_str", arrow::utf8()), arrow::field("r_str", arrow::utf8())});

  return ExecutePlanAndCollectAsTable(exec_context, plan, schema, sink_gen);
}

// (Doc section: HashJoin Example)

// (Doc section: KSelect Example)

/// \brief An example showing a select-k node
/// \param exec_context The execution context to run the plan in
///
/// Source-KSelect
/// This example shows how K number of elements can be selected
/// either from the top or bottom. The output node is a modified
/// sink node where output can be obtained as a table.
arrow::Status SourceKSelectExample(cp::ExecContext& exec_context) {
  ARROW_ASSIGN_OR_RAISE(auto input, MakeGroupableBatches());
  ARROW_ASSIGN_OR_RAISE(std::shared_ptr<cp::ExecPlan> plan,
                        cp::ExecPlan::Make(&exec_context));
  arrow::AsyncGenerator<arrow::util::optional<cp::ExecBatch>> sink_gen;

  ARROW_ASSIGN_OR_RAISE(
      cp::ExecNode * source,
      cp::MakeExecNode("source", plan.get(), {},
                       cp::SourceNodeOptions{input.schema, input.gen()}));

  cp::SelectKOptions options = cp::SelectKOptions::TopKDefault(/*k=*/2, {"i32"});

  ARROW_RETURN_NOT_OK(cp::MakeExecNode("select_k_sink", plan.get(), {source},
                                       cp::SelectKSinkNodeOptions{options, &sink_gen}));

  auto schema = arrow::schema(
      {arrow::field("i32", arrow::int32()), arrow::field("str", arrow::utf8())});

  return ExecutePlanAndCollectAsTable(exec_context, plan, schema, sink_gen);
}

// (Doc section: KSelect Example)

// (Doc section: Write Example)

/// \brief An example showing a write node
/// \param exec_context The execution context to run the plan in
/// \param file_path The destination to write to
///
/// Scan-Filter-Write
/// This example shows how a scan node can be used to load the data
/// and how, after processing, it can be written to disk.
arrow::Status ScanFilterWriteExample(cp::ExecContext& exec_context,
                                     const std::string& file_path) {
  ARROW_ASSIGN_OR_RAISE(std::shared_ptr<cp::ExecPlan> plan,
                        cp::ExecPlan::Make(&exec_context));

  ARROW_ASSIGN_OR_RAISE(std::shared_ptr<arrow::dataset::Dataset> dataset, GetDataset());

  auto options = std::make_shared<arrow::dataset::ScanOptions>();
  // empty projection
  options->projection = cp::project({}, {});

  cp::ExecNode* scan;

  auto scan_node_options = arrow::dataset::ScanNodeOptions{dataset, options};

  ARROW_ASSIGN_OR_RAISE(scan,
                        cp::MakeExecNode("scan", plan.get(), {}, scan_node_options));

  std::string root_path = "";
  std::string uri = "file://" + file_path;
  // Propagate a bad URI as a Status instead of aborting the process.
  ARROW_ASSIGN_OR_RAISE(std::shared_ptr<arrow::fs::FileSystem> filesystem,
                        arrow::fs::FileSystemFromUri(uri, &root_path));

  auto base_path = root_path + "/parquet_dataset";
  // Uncomment the following line, if run repeatedly
  // ARROW_RETURN_NOT_OK(filesystem->DeleteDirContents(base_path));
  ARROW_RETURN_NOT_OK(filesystem->CreateDir(base_path));

  // The partition schema determines which fields are part of the partitioning.
  auto partition_schema = arrow::schema({arrow::field("a", arrow::int32())});
  // We'll use Hive-style partitioning,
  // which creates directories with "key=value" pairs.

  auto partitioning =
      std::make_shared<arrow::dataset::HivePartitioning>(partition_schema);
  // We'll write Parquet files.
  auto format = std::make_shared<arrow::dataset::ParquetFileFormat>();

  arrow::dataset::FileSystemDatasetWriteOptions write_options;
  write_options.file_write_options = format->DefaultWriteOptions();
  write_options.filesystem = filesystem;
  write_options.base_dir = base_path;
  write_options.partitioning = partitioning;
  write_options.basename_template = "part{i}.parquet";

  arrow::dataset::WriteNodeOptions write_node_options{write_options};

  ARROW_RETURN_NOT_OK(cp::MakeExecNode("write", plan.get(), {scan}, write_node_options));

  ARROW_RETURN_NOT_OK(plan->Validate());
  std::cout << "Execution Plan Created : " << plan->ToString() << std::endl;
  // start the ExecPlan
  ARROW_RETURN_NOT_OK(plan->StartProducing());
  // wait for the plan to finish, then check its status
  auto future = plan->finished();
  future.Wait();
  ARROW_RETURN_NOT_OK(future.status());
  return arrow::Status::OK();
}

// (Doc section: Write Example)

// (Doc section: Union Example)

/// \brief An example showing a union node
/// \param exec_context The execution context to run the plan in
///
/// Source-Union-Sink
/// This example shows how a union operation can be applied on two
/// data sources. The output is obtained as a table via the sink
/// node.
arrow::Status SourceUnionSinkExample(cp::ExecContext& exec_context) {
  ARROW_ASSIGN_OR_RAISE(auto basic_data, MakeBasicBatches());

  ARROW_ASSIGN_OR_RAISE(std::shared_ptr<cp::ExecPlan> plan,
                        cp::ExecPlan::Make(&exec_context));
  arrow::AsyncGenerator<arrow::util::optional<cp::ExecBatch>> sink_gen;

  cp::Declaration union_node{"union", cp::ExecNodeOptions{}};
  cp::Declaration lhs{"source",
                      cp::SourceNodeOptions{basic_data.schema, basic_data.gen()}};
  lhs.label = "lhs";
  cp::Declaration rhs{"source",
                      cp::SourceNodeOptions{basic_data.schema, basic_data.gen()}};
  rhs.label = "rhs";
  union_node.inputs.emplace_back(lhs);
  union_node.inputs.emplace_back(rhs);

  ARROW_ASSIGN_OR_RAISE(
      auto declr, cp::Declaration::Sequence({
                      union_node,
                      {"sink", cp::SinkNodeOptions{&sink_gen}},
                  })
                      .AddToPlan(plan.get()));

  ARROW_RETURN_NOT_OK(declr->Validate());

  ARROW_RETURN_NOT_OK(plan->Validate());
  return ExecutePlanAndCollectAsTable(exec_context, plan, basic_data.schema, sink_gen);
}

// (Doc section: Union Example)

// (Doc section: Table Sink Example)

/// \brief An example showing a table sink node
/// \param exec_context The execution context to run the plan in
///
/// TableSink Example
/// This example shows how a table_sink can be used
/// in an execution plan. This includes a source node
/// receiving data as batches and the table sink node
/// which emits the output as a table.
arrow::Status TableSinkExample(cp::ExecContext& exec_context) {
  ARROW_ASSIGN_OR_RAISE(std::shared_ptr<cp::ExecPlan> plan,
                        cp::ExecPlan::Make(&exec_context));

  ARROW_ASSIGN_OR_RAISE(auto basic_data, MakeBasicBatches());

  auto source_node_options = cp::SourceNodeOptions{basic_data.schema, basic_data.gen()};

  ARROW_ASSIGN_OR_RAISE(cp::ExecNode * source,
                        cp::MakeExecNode("source", plan.get(), {}, source_node_options));

  std::shared_ptr<arrow::Table> output_table;
  auto table_sink_options = cp::TableSinkNodeOptions{&output_table};

  ARROW_RETURN_NOT_OK(
      cp::MakeExecNode("table_sink", plan.get(), {source}, table_sink_options));
  // validate the ExecPlan
  ARROW_RETURN_NOT_OK(plan->Validate());
  std::cout << "ExecPlan created : " << plan->ToString() << std::endl;
  // start the ExecPlan
  ARROW_RETURN_NOT_OK(plan->StartProducing());

  // Wait for the plan to finish
  auto finished = plan->finished();
  ARROW_RETURN_NOT_OK(finished.status());
  std::cout << "Results : " << output_table->ToString() << std::endl;
  return arrow::Status::OK();
}
// (Doc section: Table Sink Example)

enum ExampleMode {
  SOURCE_SINK = 0,
  TABLE_SOURCE_SINK = 1,
  SCAN = 2,
  FILTER = 3,
  PROJECT = 4,
  SCALAR_AGGREGATION = 5,
  GROUP_AGGREGATION = 6,
  CONSUMING_SINK = 7,
  ORDER_BY_SINK = 8,
  HASHJOIN = 9,
  KSELECT = 10,
  WRITE = 11,
  UNION = 12,
  TABLE_SOURCE_TABLE_SINK = 13
};

int main(int argc, char** argv) {
  // Both argv[1] (save path) and argv[2] (mode) are read below.
  if (argc < 3) {
    // Fake success for CI purposes.
    return EXIT_SUCCESS;
  }

  std::string base_save_path = argv[1];
  int mode = std::atoi(argv[2]);
  arrow::Status status;
  // ensure arrow::dataset node factories are in the registry
  arrow::dataset::internal::Initialize();
  // execution context
  cp::ExecContext exec_context;
  switch (mode) {
    case SOURCE_SINK:
      PrintBlock("Source Sink Example");
      status = SourceSinkExample(exec_context);
      break;
    case TABLE_SOURCE_SINK:
      PrintBlock("Table Source Sink Example");
      status = TableSourceSinkExample(exec_context);
      break;
    case SCAN:
      PrintBlock("Scan Example");
      status = ScanSinkExample(exec_context);
      break;
    case FILTER:
      PrintBlock("Filter Example");
      status = ScanFilterSinkExample(exec_context);
      break;
    case PROJECT:
      PrintBlock("Project Example");
      status = ScanProjectSinkExample(exec_context);
      break;
    case GROUP_AGGREGATION:
      PrintBlock("Group Aggregate Example");
      status = SourceGroupAggregateSinkExample(exec_context);
      break;
    case SCALAR_AGGREGATION:
      PrintBlock("Scalar Aggregate Example");
      status = SourceScalarAggregateSinkExample(exec_context);
      break;
    case CONSUMING_SINK:
      PrintBlock("Consuming-Sink Example");
      status = SourceConsumingSinkExample(exec_context);
      break;
    case ORDER_BY_SINK:
      PrintBlock("OrderBy Example");
      status = SourceOrderBySinkExample(exec_context);
      break;
    case HASHJOIN:
      PrintBlock("HashJoin Example");
      status = SourceHashJoinSinkExample(exec_context);
      break;
    case KSELECT:
      PrintBlock("KSelect Example");
      status = SourceKSelectExample(exec_context);
      break;
    case WRITE:
      PrintBlock("Write Example");
      status = ScanFilterWriteExample(exec_context, base_save_path);
      break;
    case UNION:
      PrintBlock("Union Example");
      status = SourceUnionSinkExample(exec_context);
      break;
    case TABLE_SOURCE_TABLE_SINK:
      PrintBlock("TableSink Example");
      status = TableSinkExample(exec_context);
      break;
    default:
      break;
  }

  if (status.ok()) {
    return EXIT_SUCCESS;
  } else {
    std::cout << "Error occurred: " << status.message() << std::endl;
    return EXIT_FAILURE;
  }
}
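
The `main` function above converts the mode argument with `std::atoi`, which returns 0 when the text is not a number, so a mistyped mode would silently run the `SOURCE_SINK` example. A minimal sketch of stricter parsing follows. It assumes C++17 (for `std::from_chars` and `std::optional`), and `ParseExampleMode` is a hypothetical helper introduced here for illustration; it is not part of Arrow or of the example file.

```cpp
#include <charconv>
#include <cstring>
#include <optional>
#include <system_error>

// Hypothetical helper (not part of Arrow): strictly parse a mode argument.
// Returns std::nullopt unless the whole string is a decimal integer
// in the inclusive range [0, max_mode].
std::optional<int> ParseExampleMode(const char* text, int max_mode) {
  if (text == nullptr) return std::nullopt;
  const char* end = text + std::strlen(text);
  int value = 0;
  // from_chars reports an error for empty or non-numeric input and
  // leaves result.ptr before `end` if trailing characters remain.
  auto result = std::from_chars(text, end, value);
  if (result.ec != std::errc() || result.ptr != end) return std::nullopt;
  if (value < 0 || value > max_mode) return std::nullopt;
  return value;
}
```

With such a helper, `main` could call `ParseExampleMode(argv[2], TABLE_SOURCE_TABLE_SINK)` and return `EXIT_FAILURE` when no value comes back, rather than falling through to a default example.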