Streaming execution engine¶
Warning
The streaming execution engine is experimental, and a stable API is not yet guaranteed.
Motivation¶
For many complex computations, successive direct invocation of compute functions is not feasible in either memory or computation time. Doing so causes all intermediate data to be fully materialized. To facilitate arbitrarily large inputs and more efficient resource usage, Arrow also provides a streaming query engine with which computations can be formulated and executed.
ExecNode is provided to reify the graph of operations in a query. Batches of data (ExecBatch) flow along edges of the graph from node to node. Structuring the API around streams of batches allows the working set for each node to be tuned for optimal performance independent of any other nodes in the graph. Each ExecNode processes batches as they are pushed to it along an edge of the graph by upstream nodes (its inputs), and pushes batches along an edge of the graph to downstream nodes (its outputs) as they are finalized.
Overview¶

ExecNode
    Each node in the graph is an implementation of the ExecNode interface.

ExecPlan
    A set of ExecNode is contained and (to an extent) coordinated by an ExecPlan.

ExecFactoryRegistry
    Instances of ExecNode are constructed by factory functions held in an ExecFactoryRegistry.

ExecNodeOptions
    Heterogeneous parameters for factories of ExecNode are bundled in an ExecNodeOptions.

Declaration
    dplyr-inspired helper for efficient construction of an ExecPlan.

ExecBatch
    A lightweight container for a single chunk of data in the Arrow format. In contrast to RecordBatch, ExecBatch is intended for use exclusively in a streaming execution context (for example, it doesn't have a corresponding Python binding). Furthermore, columns which happen to have a constant value may be represented by a Scalar instead of an Array. In addition, ExecBatch may carry execution-relevant properties including a guaranteed-true filter for Expression simplification.
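For illustration, here is a minimal sketch of building such a batch by hand; it assumes int_array is a pre-built arrow::Array for column "a", and the names are illustrative rather than part of the API described above:

// Sketch only: an ExecBatch with two columns, where column "b" is a
// constant carried as a Scalar rather than a materialized Array.
arrow::compute::ExecBatch batch(
    {int_array, arrow::Datum(std::make_shared<arrow::Int32Scalar>(42))},
    /*length=*/int_array->length());
// Record the constant as a guaranteed-true filter, which is available
// for Expression simplification downstream.
batch.guarantee = arrow::compute::equal(arrow::compute::field_ref("b"),
                                        arrow::compute::literal(42));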
An example ExecNode implementation which simply passes all input batches through unchanged:
class PassthruNode : public ExecNode {
 public:
  // InputReceived is the main entry point for ExecNodes. It is invoked
  // by an input of this node to push a batch here for processing.
  void InputReceived(ExecNode* input, ExecBatch batch) override {
    // Since this is a passthru node we simply push the batch to our
    // only output here.
    outputs_[0]->InputReceived(this, batch);
  }

  // ErrorReceived is called by an input of this node to report an error.
  // ExecNodes should always forward errors to their outputs unless they
  // are able to fully handle the error (this is rare).
  void ErrorReceived(ExecNode* input, Status error) override {
    outputs_[0]->ErrorReceived(this, error);
  }

  // InputFinished is used to signal how many batches will ultimately arrive.
  // It may be called with any ordering relative to InputReceived/ErrorReceived.
  void InputFinished(ExecNode* input, int total_batches) override {
    outputs_[0]->InputFinished(this, total_batches);
  }

  // ExecNodes may request that their inputs throttle production of batches
  // until they are ready for more, or stop production if no further batches
  // are required. These signals should typically be forwarded to the inputs
  // of the ExecNode.
  void ResumeProducing(ExecNode* output) override { inputs_[0]->ResumeProducing(this); }
  void PauseProducing(ExecNode* output) override { inputs_[0]->PauseProducing(this); }
  void StopProducing(ExecNode* output) override { inputs_[0]->StopProducing(this); }

  // An ExecNode has a single output schema to which all its batches conform.
  using ExecNode::output_schema;

  // ExecNodes carry basic introspection for debugging purposes
  const char* kind_name() const override { return "PassthruNode"; }
  using ExecNode::label;
  using ExecNode::SetLabel;
  using ExecNode::ToString;

  // An ExecNode holds references to its inputs and outputs, so it is possible
  // to walk the graph of execution if necessary.
  using ExecNode::inputs;
  using ExecNode::outputs;

  // StartProducing() and StopProducing() are invoked by an ExecPlan to
  // coordinate the graph-wide execution state. These do not need to be
  // forwarded to inputs or outputs.
  Status StartProducing() override { return Status::OK(); }
  void StopProducing() override {}
  Future<> finished() override { return inputs_[0]->finished(); }
};
Note that each method which is associated with an edge of the graph must be invoked with an ExecNode* to identify the node which invoked it. For example, in an ExecNode which implements JOIN this tagging might be used to differentiate between batches from the left or right inputs. InputReceived, ErrorReceived, InputFinished may only be invoked by the inputs of a node, while ResumeProducing, PauseProducing, StopProducing may only be invoked by outputs of a node.
ExecPlan contains the associated instances of ExecNode and is used to start and stop execution of all nodes and for querying/awaiting their completion:
// construct an ExecPlan first to hold your nodes
ARROW_ASSIGN_OR_RAISE(auto plan, ExecPlan::Make(default_exec_context()));

// ... add nodes to your ExecPlan

// start all nodes in the graph
ARROW_RETURN_NOT_OK(plan->StartProducing());

SetUserCancellationCallback([plan] {
  // stop all nodes in the graph
  plan->StopProducing();
});

// Complete will be marked finished when all nodes have run to completion
// or acknowledged a StopProducing() signal. The ExecPlan should be kept
// alive until this future is marked finished.
Future<> complete = plan->finished();
Constructing ExecPlan objects¶
Warning
The following will be superseded by construction from Compute IR, see ARROW-14074.
None of the concrete implementations of ExecNode are exposed in headers, so they can't be constructed directly outside the translation unit where they are defined. Instead, factories to create them are provided in an extensible registry. This structure provides a number of benefits:

- It enforces consistent construction.
- It decouples implementations from consumers of the interface (for example: we have two classes for scalar and grouped aggregate, and we can choose which to construct within the single factory by checking whether grouping keys are provided).
- It expedites integration with out-of-library extensions. For example, "scan" nodes are implemented in the separate libarrow_dataset.so library.
- Since the class is not referenceable outside the translation unit in which it is defined, compilers can optimize more aggressively.
Factories of ExecNode can be retrieved by name from the registry. The default registry is available through arrow::compute::default_exec_factory_registry() and can be queried for the built-in factories:
// get the factory for "filter" nodes:
ARROW_ASSIGN_OR_RAISE(auto make_filter,
                      default_exec_factory_registry()->GetFactory("filter"));

// factories take three arguments:
ARROW_ASSIGN_OR_RAISE(ExecNode* filter_node, make_filter(
    // the ExecPlan which should own this node
    plan.get(),
    // nodes which will send batches to this node (inputs)
    {scan_node},
    // parameters unique to "filter" nodes
    FilterNodeOptions{filter_expression}));

// alternative shorthand:
ARROW_ASSIGN_OR_RAISE(filter_node, MakeExecNode("filter",
    plan.get(), {scan_node}, FilterNodeOptions{filter_expression}));
Factories can also be added to the default registry as long as they are convertible to std::function<Result<ExecNode*>(ExecPlan*, std::vector<ExecNode*>, const ExecNodeOptions&)>.
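As a hedged sketch of such a registration (assuming the PassthruNode above is given a constructor taking the plan and its inputs; the factory name "passthru" is illustrative):

Status RegisterPassthruNode() {
  return default_exec_factory_registry()->AddFactory(
      "passthru",
      [](ExecPlan* plan, std::vector<ExecNode*> inputs,
         const ExecNodeOptions& options) -> Result<ExecNode*> {
        // EmplaceNode constructs the node and transfers its ownership to the plan.
        return plan->EmplaceNode<PassthruNode>(plan, std::move(inputs));
      });
}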
To build an ExecPlan representing a simple pipeline which reads from a RecordBatchReader then filters, projects, and writes to disk:
std::shared_ptr<RecordBatchReader> reader = GetStreamOfBatches();
ExecNode* source_node = *MakeExecNode("source", plan.get(), {},
                                      SourceNodeOptions::FromReader(
                                          reader,
                                          GetCpuThreadPool()));

ExecNode* filter_node = *MakeExecNode("filter", plan.get(), {source_node},
                                      FilterNodeOptions{
                                        greater(field_ref("score"), literal(3))
                                      });

ExecNode* project_node = *MakeExecNode("project", plan.get(), {filter_node},
                                       ProjectNodeOptions{
                                         {add(field_ref("score"), literal(1))},
                                         {"score + 1"}
                                       });

arrow::dataset::internal::Initialize();
MakeExecNode("write", plan.get(), {project_node},
             WriteNodeOptions{/*base_dir=*/"/dat", /*...*/});
Declaration is a dplyr-inspired helper which further decreases the boilerplate associated with populating an ExecPlan from C++:
arrow::dataset::internal::Initialize();

std::shared_ptr<RecordBatchReader> reader = GetStreamOfBatches();
ASSERT_OK(Declaration::Sequence(
              {
                  {"source", SourceNodeOptions::FromReader(
                       reader,
                       GetCpuThreadPool())},
                  {"filter", FilterNodeOptions{
                       greater(field_ref("score"), literal(3))}},
                  {"project", ProjectNodeOptions{
                       {add(field_ref("score"), literal(1))},
                       {"score + 1"}}},
                  {"write", WriteNodeOptions{/*base_dir=*/"/dat", /*...*/}},
              })
              .AddToPlan(plan.get()));
Note that a source node can wrap anything which resembles a stream of batches. For example, PR#11032 adds support for use of a DuckDB query as a source node. Similarly, a sink node can wrap anything which absorbs a stream of batches. In the example above we're writing completed batches to disk. However, we can also collect these in memory into a Table or forward them to a RecordBatchReader as an out-of-graph stream. This flexibility allows an ExecPlan to be used as streaming middleware between any endpoints which support Arrow formatted batches.
An arrow::dataset::Dataset can also be wrapped as a source node which pushes all the dataset's batches into an ExecPlan. This factory is added to the default registry with the name "scan" by calling arrow::dataset::internal::Initialize():
arrow::dataset::internal::Initialize();

std::shared_ptr<Dataset> dataset = GetDataset();

ASSERT_OK(Declaration::Sequence(
              {
                  {"scan", ScanNodeOptions{dataset,
                       /* push down predicate, projection, ... */}},
                  {"filter", FilterNodeOptions{/* ... */}},
                  // ...
              })
              .AddToPlan(plan.get()));
Datasets may be scanned multiple times; just make multiple scan nodes from that dataset. (Useful for a self-join, for example.) Note that producing two scan nodes like this will perform all reads and decodes twice.
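For instance, a sketch of two scan declarations feeding a self-join, in the Declaration style used above (the join key "a" and the elided scan options are illustrative):

// Sketch only: scan the same dataset twice, once per side of the join.
Declaration left{"scan", ScanNodeOptions{dataset, /* ... */}};
Declaration right{"scan", ScanNodeOptions{dataset, /* ... */}};

Declaration join{"hashjoin", HashJoinNodeOptions{JoinType::INNER,
                                                 /*left_keys=*/{"a"},
                                                 /*right_keys=*/{"a"}}};
join.inputs.emplace_back(left);
join.inputs.emplace_back(right);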
Constructing ExecNode using Options¶

ExecNode is the basic building block of an execution plan; Arrow provides built-in implementations for a range of common operations. This is the list of operations associated with the execution plan:
Operation | Options
---|---
source | arrow::compute::SourceNodeOptions
filter | arrow::compute::FilterNodeOptions
project | arrow::compute::ProjectNodeOptions
aggregate | arrow::compute::AggregateNodeOptions
sink | arrow::compute::SinkNodeOptions
consuming_sink | arrow::compute::ConsumingSinkNodeOptions
order_by_sink | arrow::compute::OrderBySinkNodeOptions
select_k_sink | arrow::compute::SelectKSinkNodeOptions
scan | arrow::dataset::ScanNodeOptions
hash_join | arrow::compute::HashJoinNodeOptions
write | arrow::dataset::WriteNodeOptions
union | N/A
source¶

A source operation can be considered as an entry point to create a streaming execution plan. arrow::compute::SourceNodeOptions are used to create the source operation. The source operation is the most generic and flexible type of source currently available, but it can be quite tricky to configure. To process data from files, the scan operation is likely a simpler choice.

The source node requires some kind of function that can be called to poll for more data. This function should take no arguments and should return an arrow::Future<std::shared_ptr<arrow::util::optional<arrow::RecordBatch>>>. This function might be reading a file, iterating through an in-memory structure, or receiving data from a network connection. The Arrow library refers to these functions as arrow::AsyncGenerator and there are a number of utilities for working with these functions. For this example we use a vector of record batches that we've already stored in memory.

In addition, the schema of the data must be known up front. Arrow's streaming execution engine must know the schema of the data at each stage of the execution graph before any processing has begun. This means we must supply the schema for a source node separately from the data itself.
Here we define a struct to hold the data generator definition. This includes in-memory batches, the schema, and a function that serves as a data generator:
struct BatchesWithSchema {
  std::vector<cp::ExecBatch> batches;
  std::shared_ptr<arrow::Schema> schema;
  // This method uses internal arrow utilities to
  // convert a vector of record batches to an AsyncGenerator of optional batches
  arrow::AsyncGenerator<arrow::util::optional<cp::ExecBatch>> gen() const {
    auto opt_batches = ::arrow::internal::MapVector(
        [](cp::ExecBatch batch) { return arrow::util::make_optional(std::move(batch)); },
        batches);
    arrow::AsyncGenerator<arrow::util::optional<cp::ExecBatch>> gen;
    gen = arrow::MakeVectorGenerator(std::move(opt_batches));
    return gen;
  }
};
Generating sample batches for computation:
arrow::Result<BatchesWithSchema> MakeBasicBatches() {
  BatchesWithSchema out;
  auto field_vector = {arrow::field("a", arrow::int32()),
                       arrow::field("b", arrow::boolean())};
  ARROW_ASSIGN_OR_RAISE(auto b1_int, GetArrayDataSample<arrow::Int32Type>({0, 4}));
  ARROW_ASSIGN_OR_RAISE(auto b2_int, GetArrayDataSample<arrow::Int32Type>({5, 6, 7}));
  ARROW_ASSIGN_OR_RAISE(auto b3_int, GetArrayDataSample<arrow::Int32Type>({8, 9, 10}));

  ARROW_ASSIGN_OR_RAISE(auto b1_bool,
                        GetArrayDataSample<arrow::BooleanType>({false, true}));
  ARROW_ASSIGN_OR_RAISE(auto b2_bool,
                        GetArrayDataSample<arrow::BooleanType>({true, false, true}));
  ARROW_ASSIGN_OR_RAISE(auto b3_bool,
                        GetArrayDataSample<arrow::BooleanType>({false, true, false}));

  ARROW_ASSIGN_OR_RAISE(auto b1,
                        GetExecBatchFromVectors(field_vector, {b1_int, b1_bool}));
  ARROW_ASSIGN_OR_RAISE(auto b2,
                        GetExecBatchFromVectors(field_vector, {b2_int, b2_bool}));
  ARROW_ASSIGN_OR_RAISE(auto b3,
                        GetExecBatchFromVectors(field_vector, {b3_int, b3_bool}));

  out.batches = {b1, b2, b3};
  out.schema = arrow::schema(field_vector);
  return out;
}
Example of using source (usage of sink is explained in detail in sink):
/**
 * \brief
 * Source-Sink Example
 * This example shows how a source and sink can be used
 * in an execution plan. This includes a source node receiving data
 * and a sink node that emits the data as an output represented in
 * a table.
 * \param exec_context : execution context
 * \return arrow::Status
 */
arrow::Status SourceSinkExample(cp::ExecContext& exec_context) {
  ARROW_ASSIGN_OR_RAISE(std::shared_ptr<cp::ExecPlan> plan,
                        cp::ExecPlan::Make(&exec_context));

  ARROW_ASSIGN_OR_RAISE(auto basic_data, MakeBasicBatches());

  arrow::AsyncGenerator<arrow::util::optional<cp::ExecBatch>> sink_gen;

  auto source_node_options = cp::SourceNodeOptions{basic_data.schema, basic_data.gen()};

  ARROW_ASSIGN_OR_RAISE(cp::ExecNode * source,
                        cp::MakeExecNode("source", plan.get(), {}, source_node_options));

  ARROW_RETURN_NOT_OK(
      cp::MakeExecNode("sink", plan.get(), {source}, cp::SinkNodeOptions{&sink_gen}));

  return ExecutePlanAndCollectAsTable(exec_context, plan, basic_data.schema, sink_gen);
}
filter¶

The filter operation, as the name suggests, provides an option to define data filtering criteria. It selects rows matching a given expression. Filters can be written using arrow::compute::Expression. For example, if we wish to keep rows where the value of column b is greater than 3, then we can use the following expression.
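A minimal sketch of that expression, using the cp namespace alias from the examples (note that the full example below filters on column a instead):

// keep only rows where the value of column "b" is greater than 3
cp::Expression b_gt_3 = cp::greater(cp::field_ref("b"), cp::literal(3));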
Filter example:
/**
 * \brief
 * Source-Filter-Sink
 * This example shows how a filter can be used in an execution plan,
 * along with the source and sink operations. The output from the
 * execution plan is obtained as a table via the sink node.
 * \param exec_context : execution context
 * \return arrow::Status
 */
arrow::Status ScanFilterSinkExample(cp::ExecContext& exec_context) {
  ARROW_ASSIGN_OR_RAISE(std::shared_ptr<cp::ExecPlan> plan,
                        cp::ExecPlan::Make(&exec_context));

  ARROW_ASSIGN_OR_RAISE(std::shared_ptr<arrow::dataset::Dataset> dataset, GetDataset());

  auto options = std::make_shared<arrow::dataset::ScanOptions>();
  // specify the filter. This filter keeps only the rows where the
  // value of the "a" column is greater than 3.
  cp::Expression filter_opt = cp::greater(cp::field_ref("a"), cp::literal(3));
  // set filter for scanner : on-disk / push-down filtering.
  // This step can be skipped if you are not reading from disk.
  options->filter = filter_opt;
  // empty projection
  options->projection = cp::project({}, {});

  // construct the scan node
  std::cout << "Initialized Scanning Options" << std::endl;

  cp::ExecNode* scan;

  auto scan_node_options = arrow::dataset::ScanNodeOptions{dataset, options};
  std::cout << "Scan node options created" << std::endl;

  ARROW_ASSIGN_OR_RAISE(scan,
                        cp::MakeExecNode("scan", plan.get(), {}, scan_node_options));

  // pipe the scan node into a filter node
  // The filter must be set in both the scan node options and the filter node options.
  // At the scan node it is used for on-disk / push-down filtering.
  // At the filter node it is used for in-memory filtering.
  cp::ExecNode* filter;
  ARROW_ASSIGN_OR_RAISE(filter, cp::MakeExecNode("filter", plan.get(), {scan},
                                                 cp::FilterNodeOptions{filter_opt}));

  // finally, pipe the filter node into a sink node
  arrow::AsyncGenerator<arrow::util::optional<cp::ExecBatch>> sink_gen;
  ARROW_RETURN_NOT_OK(
      cp::MakeExecNode("sink", plan.get(), {filter}, cp::SinkNodeOptions{&sink_gen}));

  return ExecutePlanAndCollectAsTable(exec_context, plan, dataset->schema(), sink_gen);
}
project¶

The project operation rearranges, deletes, transforms, and creates columns. Each output column is computed by evaluating an expression against the source record batch. This is exposed via arrow::compute::ProjectNodeOptions, which requires an arrow::compute::Expression and a name for each of the output columns (if names are not provided, the string representations of the expressions will be used).
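For instance, a sketch of options producing two output columns with explicit names (assuming the cp alias; if the name vector were omitted, the first column would be named "multiply(a, 2)"):

cp::ProjectNodeOptions project_options{
    /*expressions=*/{cp::call("multiply", {cp::field_ref("a"), cp::literal(2)}),
                     cp::field_ref("b")},
    /*names=*/{"a * 2", "b"}};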
Project example:
/**
 * \brief
 * Scan-Project-Sink
 * This example shows how the scan operation can be used to load the data
 * into the execution plan, how the project operation can be applied on the
 * data stream, and how the output is obtained as a table via the sink node.
 *
 * \param exec_context : execution context
 * \return arrow::Status
 */
arrow::Status ScanProjectSinkExample(cp::ExecContext& exec_context) {
  ARROW_ASSIGN_OR_RAISE(std::shared_ptr<cp::ExecPlan> plan,
                        cp::ExecPlan::Make(&exec_context));

  ARROW_ASSIGN_OR_RAISE(std::shared_ptr<arrow::dataset::Dataset> dataset, GetDataset());

  auto options = std::make_shared<arrow::dataset::ScanOptions>();
  // projection
  cp::Expression a_times_2 = cp::call("multiply", {cp::field_ref("a"), cp::literal(2)});
  options->projection = cp::project({}, {});

  cp::ExecNode* scan;

  auto scan_node_options = arrow::dataset::ScanNodeOptions{dataset, options};

  ARROW_ASSIGN_OR_RAISE(scan,
                        cp::MakeExecNode("scan", plan.get(), {}, scan_node_options));

  cp::ExecNode* project;
  ARROW_ASSIGN_OR_RAISE(project, cp::MakeExecNode("project", plan.get(), {scan},
                                                  cp::ProjectNodeOptions{{a_times_2}}));
  // schema after projection => multiply(a, 2): int64
  std::cout << "Schema after projection : \n"
            << project->output_schema()->ToString() << std::endl;

  arrow::AsyncGenerator<arrow::util::optional<cp::ExecBatch>> sink_gen;
  ARROW_RETURN_NOT_OK(
      cp::MakeExecNode("sink", plan.get(), {project}, cp::SinkNodeOptions{&sink_gen}));
  auto schema = arrow::schema({arrow::field("a * 2", arrow::int32())});

  return ExecutePlanAndCollectAsTable(exec_context, plan, schema, sink_gen);
}
aggregate¶

The aggregate node computes various types of aggregates over data. Arrow supports two types of aggregates: "scalar" aggregates, and "hash" aggregates. Scalar aggregates reduce an array or scalar input to a single scalar output (e.g. computing the mean of a column). Hash aggregates act like GROUP BY in SQL and first partition data based on one or more key columns, then reduce the data in each partition. The aggregate node supports both types of computation, and can compute any number of aggregations at once.

arrow::compute::AggregateNodeOptions is used to define the aggregation criteria. It takes a list of aggregation functions and their options; a list of target fields to aggregate, one per function; and a list of names for the output fields, one per function. Optionally, it takes a list of columns that are used to partition the data, in the case of a hash aggregation. The aggregation functions can be selected from this list of aggregation functions.
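A minimal sketch of such options for a single hash aggregation grouped by one key, mirroring the grouped example below:

// count the non-null values of column "a" within each group of column "b"
cp::CountOptions count_options(cp::CountOptions::ONLY_VALID);
auto aggregate_options =
    cp::AggregateNodeOptions{/*aggregates=*/{{"hash_count", &count_options}},
                             /*targets=*/{"a"},
                             /*names=*/{"count(a)"},
                             /*keys=*/{"b"}};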
Note
This node is a “pipeline breaker” and will fully materialize the dataset in memory. In the future, spillover mechanisms will be added which should alleviate this constraint.
The aggregation can provide results as a group or scalar. For instance, an operation like hash_count provides a count per each unique value of the grouping key(s) as a grouped result, while an operation like sum provides a single record.
Scalar Aggregation example:
/**
 * \brief
 * Source-Aggregation-Sink
 * This example shows how an aggregation operation can be applied on an
 * execution plan, resulting in a scalar output. The source node loads the
 * data and the aggregation (the sum of column 'a')
 * is applied on this data. The output is obtained from the sink node as a table.
 * \param exec_context : execution context
 * \return arrow::Status
 */
arrow::Status SourceScalarAggregateSinkExample(cp::ExecContext& exec_context) {
  ARROW_ASSIGN_OR_RAISE(std::shared_ptr<cp::ExecPlan> plan,
                        cp::ExecPlan::Make(&exec_context));

  ARROW_ASSIGN_OR_RAISE(auto basic_data, MakeBasicBatches());

  arrow::AsyncGenerator<arrow::util::optional<cp::ExecBatch>> sink_gen;

  auto source_node_options = cp::SourceNodeOptions{basic_data.schema, basic_data.gen()};

  ARROW_ASSIGN_OR_RAISE(cp::ExecNode * source,
                        cp::MakeExecNode("source", plan.get(), {}, source_node_options));
  auto aggregate_options = cp::AggregateNodeOptions{/*aggregates=*/{{"sum", nullptr}},
                                                    /*targets=*/{"a"},
                                                    /*names=*/{"sum(a)"}};
  ARROW_ASSIGN_OR_RAISE(
      cp::ExecNode * aggregate,
      cp::MakeExecNode("aggregate", plan.get(), {source}, aggregate_options));

  ARROW_RETURN_NOT_OK(
      cp::MakeExecNode("sink", plan.get(), {aggregate}, cp::SinkNodeOptions{&sink_gen}));
  auto schema = arrow::schema({arrow::field("sum(a)", arrow::int32())});

  return ExecutePlanAndCollectAsTable(exec_context, plan, schema, sink_gen);
}
Group Aggregation example:
/**
 * \brief
 * Source-Aggregation-Sink
 * This example shows how an aggregation operation can be applied on an
 * execution plan, resulting in a grouped output. The source node loads the
 * data and the aggregation (counting the non-null values of column 'a',
 * grouped by column 'b') is applied on this data. The output is obtained
 * from the sink node as a table.
 * \param exec_context : execution context
 * \return arrow::Status
 */
arrow::Status SourceGroupAggregateSinkExample(cp::ExecContext& exec_context) {
  ARROW_ASSIGN_OR_RAISE(std::shared_ptr<cp::ExecPlan> plan,
                        cp::ExecPlan::Make(&exec_context));

  ARROW_ASSIGN_OR_RAISE(auto basic_data, MakeBasicBatches());

  arrow::AsyncGenerator<arrow::util::optional<cp::ExecBatch>> sink_gen;

  auto source_node_options = cp::SourceNodeOptions{basic_data.schema, basic_data.gen()};

  ARROW_ASSIGN_OR_RAISE(cp::ExecNode * source,
                        cp::MakeExecNode("source", plan.get(), {}, source_node_options));
  cp::CountOptions options(cp::CountOptions::ONLY_VALID);
  auto aggregate_options =
      cp::AggregateNodeOptions{/*aggregates=*/{{"hash_count", &options}},
                               /*targets=*/{"a"},
                               /*names=*/{"count(a)"},
                               /*keys=*/{"b"}};
  ARROW_ASSIGN_OR_RAISE(
      cp::ExecNode * aggregate,
      cp::MakeExecNode("aggregate", plan.get(), {source}, aggregate_options));

  ARROW_RETURN_NOT_OK(
      cp::MakeExecNode("sink", plan.get(), {aggregate}, cp::SinkNodeOptions{&sink_gen}));
  auto schema = arrow::schema({
      arrow::field("count(a)", arrow::int32()),
      arrow::field("b", arrow::boolean()),
  });

  return ExecutePlanAndCollectAsTable(exec_context, plan, schema, sink_gen);
}
sink¶

The sink operation provides output and is the final node of a streaming execution definition. The arrow::compute::SinkNodeOptions interface is used to pass the required options. Similar to the source operator, the sink operator exposes the output with a function that returns a record batch future each time it is called. It is expected the caller will repeatedly call this function until the generator function is exhausted (returns arrow::util::optional::nullopt). If this function is not called often enough then record batches will accumulate in memory. An execution plan should only have one "terminal" node (one sink node). An ExecPlan can terminate early due to cancellation or an error, before the output is fully consumed. However, the plan can be safely destroyed independently of the sink, which will hold on to the unconsumed batches via exec_plan->finished().
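A sketch of consuming that output, assuming `schema` describes the sink's batches; this mirrors what ExecutePlanAndCollectAsTable in the complete example below does:

// translate sink_gen (async) into a synchronous RecordBatchReader,
// then read from it until the stream is exhausted
std::shared_ptr<arrow::RecordBatchReader> sink_reader =
    cp::MakeGeneratorReader(schema, std::move(sink_gen), arrow::default_memory_pool());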
As a part of the Source Example, the Sink operation is also included:
/**
 * \brief
 * Source-Sink Example
 * This example shows how a source and sink can be used
 * in an execution plan. This includes a source node receiving data
 * and a sink node that emits the data as an output represented in
 * a table.
 * \param exec_context : execution context
 * \return arrow::Status
 */
arrow::Status SourceSinkExample(cp::ExecContext& exec_context) {
  ARROW_ASSIGN_OR_RAISE(std::shared_ptr<cp::ExecPlan> plan,
                        cp::ExecPlan::Make(&exec_context));

  ARROW_ASSIGN_OR_RAISE(auto basic_data, MakeBasicBatches());

  arrow::AsyncGenerator<arrow::util::optional<cp::ExecBatch>> sink_gen;

  auto source_node_options = cp::SourceNodeOptions{basic_data.schema, basic_data.gen()};

  ARROW_ASSIGN_OR_RAISE(cp::ExecNode * source,
                        cp::MakeExecNode("source", plan.get(), {}, source_node_options));

  ARROW_RETURN_NOT_OK(
      cp::MakeExecNode("sink", plan.get(), {source}, cp::SinkNodeOptions{&sink_gen}));

  return ExecutePlanAndCollectAsTable(exec_context, plan, basic_data.schema, sink_gen);
}
consuming_sink¶

The consuming_sink operator is a sink operation containing a consuming operation within the execution plan (i.e. the exec plan should not complete until the consumption has completed). Unlike the sink node, this node takes in a callback function that is expected to consume the batch. Once this callback has finished, the execution plan will no longer hold any reference to the batch. The consuming function may be called before a previous invocation has completed. If the consuming function does not run quickly enough then many concurrent executions could pile up, blocking the CPU thread pool. The execution plan will not be marked finished until all consuming function callbacks have been completed.

Once all batches have been delivered, the execution plan will wait for the finish future to complete before marking the execution plan finished. This allows for workflows where the consumption function converts batches into async tasks (this is currently done internally for the dataset write node).
Example:
// define a Custom SinkNodeConsumer
std::atomic<uint32_t> batches_seen{0};
arrow::Future<> finish = arrow::Future<>::Make();
struct CustomSinkNodeConsumer : public cp::SinkNodeConsumer {
  CustomSinkNodeConsumer(std::atomic<uint32_t>* batches_seen, arrow::Future<> finish)
      : batches_seen(batches_seen), finish(std::move(finish)) {}

  // Consumption logic can be written here
  arrow::Status Consume(cp::ExecBatch batch) override {
    // data can be consumed in the expected way
    // transfer to another system or just do some work
    // and write to disk
    (*batches_seen)++;
    return arrow::Status::OK();
  }

  arrow::Future<> Finish() override { return finish; }

  std::atomic<uint32_t>* batches_seen;
  arrow::Future<> finish;
};

std::shared_ptr<CustomSinkNodeConsumer> consumer =
    std::make_shared<CustomSinkNodeConsumer>(&batches_seen, finish);

arrow::compute::ExecNode* consuming_sink;

ARROW_ASSIGN_OR_RAISE(consuming_sink,
                      MakeExecNode("consuming_sink", plan.get(), {source},
                                   cp::ConsumingSinkNodeOptions(consumer)));
Consuming-Sink example:
/**
 * \brief
 * Source-ConsumingSink
 * This example shows how the data can be consumed within the execution plan
 * by using a ConsumingSink node. There is no data output from this execution plan.
 * \param exec_context : execution context
 * \return arrow::Status
 */
arrow::Status SourceConsumingSinkExample(cp::ExecContext& exec_context) {
  ARROW_ASSIGN_OR_RAISE(std::shared_ptr<cp::ExecPlan> plan,
                        cp::ExecPlan::Make(&exec_context));

  ARROW_ASSIGN_OR_RAISE(auto basic_data, MakeBasicBatches());

  auto source_node_options = cp::SourceNodeOptions{basic_data.schema, basic_data.gen()};

  ARROW_ASSIGN_OR_RAISE(cp::ExecNode * source,
                        cp::MakeExecNode("source", plan.get(), {}, source_node_options));

  std::atomic<uint32_t> batches_seen{0};
  arrow::Future<> finish = arrow::Future<>::Make();
  struct CustomSinkNodeConsumer : public cp::SinkNodeConsumer {
    CustomSinkNodeConsumer(std::atomic<uint32_t>* batches_seen, arrow::Future<> finish)
        : batches_seen(batches_seen), finish(std::move(finish)) {}

    arrow::Status Consume(cp::ExecBatch batch) override {
      (*batches_seen)++;
      return arrow::Status::OK();
    }

    arrow::Future<> Finish() override { return finish; }

    std::atomic<uint32_t>* batches_seen;
    arrow::Future<> finish;
  };
  std::shared_ptr<CustomSinkNodeConsumer> consumer =
      std::make_shared<CustomSinkNodeConsumer>(&batches_seen, finish);

  cp::ExecNode* consuming_sink;

  ARROW_ASSIGN_OR_RAISE(consuming_sink,
                        MakeExecNode("consuming_sink", plan.get(), {source},
                                     cp::ConsumingSinkNodeOptions(consumer)));

  ARROW_RETURN_NOT_OK(consuming_sink->Validate());

  ARROW_RETURN_NOT_OK(plan->Validate());
  std::cout << "Exec Plan created: " << plan->ToString() << std::endl;
  // plan start producing
  ARROW_RETURN_NOT_OK(plan->StartProducing());
  // Source should finish fairly quickly
  ARROW_RETURN_NOT_OK(source->finished().status());
  std::cout << "Source Finished!" << std::endl;
  // Mark consumption complete, plan should finish
  finish.MarkFinished(arrow::Status::OK());
  ARROW_RETURN_NOT_OK(plan->finished().status());
  return arrow::Status::OK();
}
order_by_sink¶

The order_by_sink operation is an extension to the sink operation. This operation provides the ability to guarantee the ordering of the stream by providing the arrow::compute::OrderBySinkNodeOptions. Here the arrow::compute::SortOptions are provided to define which columns are used for sorting and whether to sort by ascending or descending values.
Note
This node is a “pipeline breaker” and will fully materialize the dataset in memory. In the future, spillover mechanisms will be added which should alleviate this constraint.
Order-By-Sink example:
/**
 * \brief
 * Source-OrderBySink
 * In this example, the data enters through the source node
 * and the data is ordered in the sink node. The order can be
 * ASCENDING or DESCENDING and it is configurable. The output
 * is obtained as a table from the sink node.
 * \param exec_context : execution context
 * \return arrow::Status
 */
arrow::Status SourceOrderBySinkExample(cp::ExecContext& exec_context) {
  ARROW_ASSIGN_OR_RAISE(std::shared_ptr<cp::ExecPlan> plan,
                        cp::ExecPlan::Make(&exec_context));

  ARROW_ASSIGN_OR_RAISE(auto basic_data, MakeSortTestBasicBatches());

  std::cout << "basic data created" << std::endl;

  arrow::AsyncGenerator<arrow::util::optional<cp::ExecBatch>> sink_gen;

  auto source_node_options = cp::SourceNodeOptions{basic_data.schema, basic_data.gen()};
  ARROW_ASSIGN_OR_RAISE(cp::ExecNode * source,
                        cp::MakeExecNode("source", plan.get(), {}, source_node_options));

  ARROW_RETURN_NOT_OK(cp::MakeExecNode(
      "order_by_sink", plan.get(), {source},
      cp::OrderBySinkNodeOptions{
          cp::SortOptions{{cp::SortKey{"a", cp::SortOrder::Descending}}}, &sink_gen}));

  return ExecutePlanAndCollectAsTable(exec_context, plan, basic_data.schema, sink_gen);
}
select_k_sink¶

The select_k_sink option enables selecting the top/bottom K elements, similar to a SQL ORDER BY ... LIMIT K clause. The options are provided via arrow::compute::SelectKOptions, which is defined using the OrderBySinkNode definition. This option returns a sink node that receives inputs and then computes top_k/bottom_k.
Note
This node is a “pipeline breaker” and will fully materialize the input in memory. In the future, spillover mechanisms will be added which should alleviate this constraint.
SelectK example:
/**
 * \brief
 * Source-KSelect
 * This example shows how K number of elements can be selected
 * either from the top or bottom. The output node is a modified
 * sink node where output can be obtained as a table.
 * \param exec_context : execution context
 * \return arrow::Status
 */
arrow::Status SourceKSelectExample(cp::ExecContext& exec_context) {
  ARROW_ASSIGN_OR_RAISE(auto input, MakeGroupableBatches());
  ARROW_ASSIGN_OR_RAISE(std::shared_ptr<cp::ExecPlan> plan,
                        cp::ExecPlan::Make(&exec_context));
  arrow::AsyncGenerator<arrow::util::optional<cp::ExecBatch>> sink_gen;

  ARROW_ASSIGN_OR_RAISE(
      cp::ExecNode * source,
      cp::MakeExecNode("source", plan.get(), {},
                       cp::SourceNodeOptions{input.schema, input.gen()}));

  cp::SelectKOptions options = cp::SelectKOptions::TopKDefault(/*k=*/2, {"i32"});

  ARROW_RETURN_NOT_OK(cp::MakeExecNode("select_k_sink", plan.get(), {source},
                                       cp::SelectKSinkNodeOptions{options, &sink_gen}));

  auto schema = arrow::schema(
      {arrow::field("i32", arrow::int32()), arrow::field("str", arrow::utf8())});

  return ExecutePlanAndCollectAsTable(exec_context, plan, schema, sink_gen);
}
scan¶

scan is an operation used to load and process datasets. It should be preferred over the more generic source node when your input is a dataset. The behavior is defined using arrow::dataset::ScanNodeOptions. More information on datasets and the various scan options can be found in Tabular Datasets.

This node is capable of applying pushdown filters to the file readers, which reduces the amount of data that needs to be read. This means you may supply the same filter expression to the scan node that you also supply to the FilterNode, because the filtering is done in two different places.
Scan example:
/**
 * \brief
 * Scan-Sink
 * This example shows how the scan operation can be applied on a dataset.
 * There are operations that can be applied on the scan (project, filter)
 * and the input data can be processed. The output is obtained as a table
 * via the sink node.
 * \param exec_context : execution context
 * \return arrow::Status
 */
arrow::Status ScanSinkExample(cp::ExecContext& exec_context) {
  // Execution plan created
  ARROW_ASSIGN_OR_RAISE(std::shared_ptr<cp::ExecPlan> plan,
                        cp::ExecPlan::Make(&exec_context));

  ARROW_ASSIGN_OR_RAISE(std::shared_ptr<arrow::dataset::Dataset> dataset, GetDataset());

  auto options = std::make_shared<arrow::dataset::ScanOptions>();
  options->projection = cp::project({}, {});  // create empty projection

  // construct the scan node
  cp::ExecNode* scan;
  auto scan_node_options = arrow::dataset::ScanNodeOptions{dataset, options};

  ARROW_ASSIGN_OR_RAISE(scan,
                        cp::MakeExecNode("scan", plan.get(), {}, scan_node_options));

  arrow::AsyncGenerator<arrow::util::optional<cp::ExecBatch>> sink_gen;

  ARROW_RETURN_NOT_OK(
      cp::MakeExecNode("sink", plan.get(), {scan}, cp::SinkNodeOptions{&sink_gen}));

  return ExecutePlanAndCollectAsTable(exec_context, plan, dataset->schema(), sink_gen);
}
write¶

The write node saves query results as a dataset of files in a format like Parquet, Feather, CSV, etc. using the Tabular Datasets functionality in Arrow. The write options are provided via the arrow::dataset::WriteNodeOptions which in turn contains arrow::dataset::FileSystemDatasetWriteOptions. arrow::dataset::FileSystemDatasetWriteOptions provides control over the written dataset, including options like the output directory, file naming scheme, and so on.
Write example:
/**
 * \brief
 * Scan-Filter-Write
 * This example shows how the scan node can be used to load the data
 * and, after processing, how it can be written to disk.
 * \param exec_context : execution context
 * \param file_path : file saving path
 * \return arrow::Status
 */
arrow::Status ScanFilterWriteExample(cp::ExecContext& exec_context,
                                     const std::string& file_path) {
  ARROW_ASSIGN_OR_RAISE(std::shared_ptr<cp::ExecPlan> plan,
                        cp::ExecPlan::Make(&exec_context));

  ARROW_ASSIGN_OR_RAISE(std::shared_ptr<arrow::dataset::Dataset> dataset, GetDataset());

  auto options = std::make_shared<arrow::dataset::ScanOptions>();
  // empty projection
  options->projection = cp::project({}, {});

  cp::ExecNode* scan;

  auto scan_node_options = arrow::dataset::ScanNodeOptions{dataset, options};

  ARROW_ASSIGN_OR_RAISE(scan,
                        cp::MakeExecNode("scan", plan.get(), {}, scan_node_options));

  arrow::AsyncGenerator<arrow::util::optional<cp::ExecBatch>> sink_gen;

  std::string root_path = "";
  std::string uri = "file://" + file_path;
  std::shared_ptr<arrow::fs::FileSystem> filesystem =
      arrow::fs::FileSystemFromUri(uri, &root_path).ValueOrDie();

  auto base_path = root_path + "/parquet_dataset";
  // Uncomment the following line, if run repeatedly
  // ARROW_RETURN_NOT_OK(filesystem->DeleteDirContents(base_path));
  ARROW_RETURN_NOT_OK(filesystem->CreateDir(base_path));

  // The partition schema determines which fields are part of the partitioning.
  auto partition_schema = arrow::schema({arrow::field("a", arrow::int32())});
  // We'll use Hive-style partitioning,
  // which creates directories with "key=value" pairs.

  auto partitioning =
      std::make_shared<arrow::dataset::HivePartitioning>(partition_schema);
  // We'll write Parquet files.
  auto format = std::make_shared<arrow::dataset::ParquetFileFormat>();

  arrow::dataset::FileSystemDatasetWriteOptions write_options;
  write_options.file_write_options = format->DefaultWriteOptions();
  write_options.filesystem = filesystem;
  write_options.base_dir = base_path;
  write_options.partitioning = partitioning;
  write_options.basename_template = "part{i}.parquet";

  arrow::dataset::WriteNodeOptions write_node_options{write_options, dataset->schema()};

  ARROW_RETURN_NOT_OK(cp::MakeExecNode("write", plan.get(), {scan}, write_node_options));

  ARROW_RETURN_NOT_OK(plan->Validate());
  std::cout << "Execution Plan Created : " << plan->ToString() << std::endl;
  // start the ExecPlan
  ARROW_RETURN_NOT_OK(plan->StartProducing());
  auto future = plan->finished();
  ARROW_RETURN_NOT_OK(future.status());
  future.Wait();
  return arrow::Status::OK();
}
union¶

union merges multiple data streams with the same schema into one, similar to a SQL UNION ALL clause. The following example demonstrates how this can be achieved using two data sources.
Union example:
/**
 * \brief
 * Source-Union-Sink
 * This example shows how a union operation can be applied on two
 * data sources. The output is obtained as a table via the sink
 * node.
 * \param exec_context : execution context
 * \return arrow::Status
 */
arrow::Status SourceUnionSinkExample(cp::ExecContext& exec_context) {
  ARROW_ASSIGN_OR_RAISE(auto basic_data, MakeBasicBatches());

  std::shared_ptr<cp::ExecPlan> plan = cp::ExecPlan::Make(&exec_context).ValueOrDie();
  arrow::AsyncGenerator<arrow::util::optional<cp::ExecBatch>> sink_gen;

  cp::Declaration union_node{"union", cp::ExecNodeOptions{}};
  cp::Declaration lhs{"source",
                      cp::SourceNodeOptions{basic_data.schema, basic_data.gen()}};
  lhs.label = "lhs";
  cp::Declaration rhs{"source",
                      cp::SourceNodeOptions{basic_data.schema, basic_data.gen()}};
  rhs.label = "rhs";
  union_node.inputs.emplace_back(lhs);
  union_node.inputs.emplace_back(rhs);

  cp::CountOptions options(cp::CountOptions::ONLY_VALID);
  ARROW_ASSIGN_OR_RAISE(
      auto declr, cp::Declaration::Sequence({
                      union_node,
                      {"sink", cp::SinkNodeOptions{&sink_gen}},
                  })
                      .AddToPlan(plan.get()));

  ARROW_RETURN_NOT_OK(declr->Validate());

  ARROW_RETURN_NOT_OK(plan->Validate());
  return ExecutePlanAndCollectAsTable(exec_context, plan, basic_data.schema, sink_gen);
}
hash_join¶

The hash_join operation provides the relational algebra operation join using a hash-based algorithm. arrow::compute::HashJoinNodeOptions contains the options required for defining a join. The hash_join supports left/right/full semi/anti/outer joins. Also the join-key (i.e. the column(s) to join on) and suffixes (i.e. a suffix term like "_x" which can be appended as a suffix for column names duplicated in both left and right relations) can be set via the join options. Read more on hash-joins.
Hash-Join example:
/**
 * \brief
 * Source-HashJoin-Sink
 * This example shows how the source node gets the data and how a self-join
 * is applied on the data. The join options are configurable. The output
 * is obtained as a table via the sink node.
 * \param exec_context : execution context
 * \return arrow::Status
 */
arrow::Status SourceHashJoinSinkExample(cp::ExecContext& exec_context) {
  ARROW_ASSIGN_OR_RAISE(auto input, MakeGroupableBatches());
  ARROW_ASSIGN_OR_RAISE(std::shared_ptr<cp::ExecPlan> plan,
                        cp::ExecPlan::Make(&exec_context));

  arrow::AsyncGenerator<arrow::util::optional<cp::ExecBatch>> sink_gen;

  cp::ExecNode* left_source;
  cp::ExecNode* right_source;
  for (auto source : {&left_source, &right_source}) {
    ARROW_ASSIGN_OR_RAISE(*source,
                          MakeExecNode("source", plan.get(), {},
                                       cp::SourceNodeOptions{input.schema, input.gen()}));
  }

  cp::HashJoinNodeOptions join_opts{
      cp::JoinType::INNER,
      /*left_keys=*/{"str"},
      /*right_keys=*/{"str"}, cp::literal(true), "l_", "r_"};

  ARROW_ASSIGN_OR_RAISE(
      auto hashjoin,
      cp::MakeExecNode("hashjoin", plan.get(), {left_source, right_source}, join_opts));

  ARROW_RETURN_NOT_OK(
      cp::MakeExecNode("sink", plan.get(), {hashjoin}, cp::SinkNodeOptions{&sink_gen}));
  // expected columns i32, str, l_str, r_str
  auto schema = arrow::schema(
      {arrow::field("i32", arrow::int32()), arrow::field("str", arrow::utf8()),
       arrow::field("l_str", arrow::utf8()), arrow::field("r_str", arrow::utf8())});

  return ExecutePlanAndCollectAsTable(exec_context, plan, schema, sink_gen);
}
Summary¶
Examples of these nodes can be found in cpp/examples/arrow/execution_plan_documentation_examples.cc in the Arrow source.
Complete Example:
19 #include <arrow/array.h>
20 #include <arrow/builder.h>
21
22 #include <arrow/compute/api.h>
23 #include <arrow/compute/api_vector.h>
24 #include <arrow/compute/cast.h>
25 #include <arrow/compute/exec/exec_plan.h>
26
27 #include <arrow/csv/api.h>
28
29 #include <arrow/dataset/dataset.h>
30 #include <arrow/dataset/file_base.h>
31 #include <arrow/dataset/file_parquet.h>
32 #include <arrow/dataset/plan.h>
33 #include <arrow/dataset/scanner.h>
34
35 #include <arrow/io/interfaces.h>
36 #include <arrow/io/memory.h>
37
38 #include <arrow/result.h>
39 #include <arrow/status.h>
40 #include <arrow/table.h>
41
42 #include <arrow/ipc/api.h>
43
44 #include <arrow/util/future.h>
45 #include <arrow/util/range.h>
46 #include <arrow/util/thread_pool.h>
47 #include <arrow/util/vector.h>
48
49 #include <iostream>
50 #include <memory>
51 #include <utility>
52
53 // Demonstrate various operators in Arrow Streaming Execution Engine
54
55 namespace cp = ::arrow::compute;
56
57 constexpr char kSep[] = "******";
58
59 void PrintBlock(const std::string& msg) {
60 std::cout << "\n\t" << kSep << " " << msg << " " << kSep << "\n" << std::endl;
61 }
62
63 template <typename TYPE,
64 typename = typename std::enable_if<arrow::is_number_type<TYPE>::value |
65 arrow::is_boolean_type<TYPE>::value |
66 arrow::is_temporal_type<TYPE>::value>::type>
67 arrow::Result<std::shared_ptr<arrow::Array>> GetArrayDataSample(
68 const std::vector<typename TYPE::c_type>& values) {
69 using ARROW_ARRAY_TYPE = typename arrow::TypeTraits<TYPE>::ArrayType;
70 using ARROW_BUILDER_TYPE = typename arrow::TypeTraits<TYPE>::BuilderType;
71 ARROW_BUILDER_TYPE builder;
72 ARROW_RETURN_NOT_OK(builder.Reserve(values.size()));
73 std::shared_ptr<ARROW_ARRAY_TYPE> array;
74 ARROW_RETURN_NOT_OK(builder.AppendValues(values));
75 ARROW_RETURN_NOT_OK(builder.Finish(&array));
76 return array;
77 }
78
79 template <class TYPE>
80 arrow::Result<std::shared_ptr<arrow::Array>> GetBinaryArrayDataSample(
81 const std::vector<std::string>& values) {
82 using ARROW_ARRAY_TYPE = typename arrow::TypeTraits<TYPE>::ArrayType;
83 using ARROW_BUILDER_TYPE = typename arrow::TypeTraits<TYPE>::BuilderType;
84 ARROW_BUILDER_TYPE builder;
85 ARROW_RETURN_NOT_OK(builder.Reserve(values.size()));
86 std::shared_ptr<ARROW_ARRAY_TYPE> array;
87 ARROW_RETURN_NOT_OK(builder.AppendValues(values));
88 ARROW_RETURN_NOT_OK(builder.Finish(&array));
89 return array;
90 }
91
92 arrow::Result<std::shared_ptr<arrow::RecordBatch>> GetSampleRecordBatch(
93 const arrow::ArrayVector array_vector, const arrow::FieldVector& field_vector) {
94 std::shared_ptr<arrow::RecordBatch> record_batch;
95 ARROW_ASSIGN_OR_RAISE(auto struct_result,
96 arrow::StructArray::Make(array_vector, field_vector));
97 return record_batch->FromStructArray(struct_result);
98 }
99
100 /**
101 * \brief Get the Dataset object
102 * Creating Dataset
103 * a, b
104 1,null
105 2,true
106 null,true
107 3,false
108 null,true
109 4,false
110 5,null
111 6,false
112 7,false
113 8,true
114 * \return arrow::Result<std::shared_ptr<arrow::dataset::Dataset>>
115 */
116 arrow::Result<std::shared_ptr<arrow::dataset::Dataset>> GetDataset() {
117 auto null_long = std::numeric_limits<int64_t>::quiet_NaN();
118 ARROW_ASSIGN_OR_RAISE(auto int64_array,
119 GetArrayDataSample<arrow::Int64Type>(
120 {1, 2, null_long, 3, null_long, 4, 5, 6, 7, 8}));
121
122 arrow::BooleanBuilder boolean_builder;
123 std::shared_ptr<arrow::BooleanArray> bool_array;
124
125 std::vector<uint8_t> bool_values = {false, true, true, false, true,
126 false, false, false, false, true};
127 std::vector<bool> is_valid = {false, true, true, true, true,
128 true, false, true, true, true};
129
130 ARROW_RETURN_NOT_OK(boolean_builder.Reserve(10));
131
132 ARROW_RETURN_NOT_OK(boolean_builder.AppendValues(bool_values, is_valid));
133
134 ARROW_RETURN_NOT_OK(boolean_builder.Finish(&bool_array));
135
136 auto record_batch =
137 arrow::RecordBatch::Make(arrow::schema({arrow::field("a", arrow::int64()),
138 arrow::field("b", arrow::boolean())}),
139 10, {int64_array, bool_array});
140 ARROW_ASSIGN_OR_RAISE(auto table, arrow::Table::FromRecordBatches({record_batch}));
141 auto ds = std::make_shared<arrow::dataset::InMemoryDataset>(table);
142 return ds;
143 }
144
145 arrow::Result<cp::ExecBatch> GetExecBatchFromVectors(
146 const arrow::FieldVector& field_vector, const arrow::ArrayVector& array_vector) {
147 std::shared_ptr<arrow::RecordBatch> record_batch;
148 ARROW_ASSIGN_OR_RAISE(auto res_batch, GetSampleRecordBatch(array_vector, field_vector));
149 cp::ExecBatch batch{*res_batch};
150 return batch;
151 }
152
153 // (Doc section: BatchesWithSchema Definition)
154 struct BatchesWithSchema {
155 std::vector<cp::ExecBatch> batches;
156 std::shared_ptr<arrow::Schema> schema;
157 // // This method uses internal arrow utilities to
158 // // convert a vector of record batches to an AsyncGenerator of optional batches
159 arrow::AsyncGenerator<arrow::util::optional<cp::ExecBatch>> gen() const {
160 auto opt_batches = ::arrow::internal::MapVector(
161 [](cp::ExecBatch batch) { return arrow::util::make_optional(std::move(batch)); },
162 batches);
163 arrow::AsyncGenerator<arrow::util::optional<cp::ExecBatch>> gen;
164 gen = arrow::MakeVectorGenerator(std::move(opt_batches));
165 return gen;
166 }
167 };
168 // (Doc section: BatchesWithSchema Definition)
169
170 // (Doc section: MakeBasicBatches Definition)
171 arrow::Result<BatchesWithSchema> MakeBasicBatches() {
172 BatchesWithSchema out;
173 auto field_vector = {arrow::field("a", arrow::int32()),
174 arrow::field("b", arrow::boolean())};
175 ARROW_ASSIGN_OR_RAISE(auto b1_int, GetArrayDataSample<arrow::Int32Type>({0, 4}));
176 ARROW_ASSIGN_OR_RAISE(auto b2_int, GetArrayDataSample<arrow::Int32Type>({5, 6, 7}));
177 ARROW_ASSIGN_OR_RAISE(auto b3_int, GetArrayDataSample<arrow::Int32Type>({8, 9, 10}));
178
179 ARROW_ASSIGN_OR_RAISE(auto b1_bool,
180 GetArrayDataSample<arrow::BooleanType>({false, true}));
181 ARROW_ASSIGN_OR_RAISE(auto b2_bool,
182 GetArrayDataSample<arrow::BooleanType>({true, false, true}));
183 ARROW_ASSIGN_OR_RAISE(auto b3_bool,
184 GetArrayDataSample<arrow::BooleanType>({false, true, false}));
185
186 ARROW_ASSIGN_OR_RAISE(auto b1,
187 GetExecBatchFromVectors(field_vector, {b1_int, b1_bool}));
188 ARROW_ASSIGN_OR_RAISE(auto b2,
189 GetExecBatchFromVectors(field_vector, {b2_int, b2_bool}));
190 ARROW_ASSIGN_OR_RAISE(auto b3,
191 GetExecBatchFromVectors(field_vector, {b3_int, b3_bool}));
192
193 out.batches = {b1, b2, b3};
194 out.schema = arrow::schema(field_vector);
195 return out;
196 }
197 // (Doc section: MakeBasicBatches Definition)
198
199 arrow::Result<BatchesWithSchema> MakeSortTestBasicBatches() {
200 BatchesWithSchema out;
201 auto field = arrow::field("a", arrow::int32());
202 ARROW_ASSIGN_OR_RAISE(auto b1_int, GetArrayDataSample<arrow::Int32Type>({1, 3, 0, 2}));
203 ARROW_ASSIGN_OR_RAISE(auto b2_int,
204 GetArrayDataSample<arrow::Int32Type>({121, 101, 120, 12}));
205 ARROW_ASSIGN_OR_RAISE(auto b3_int,
206 GetArrayDataSample<arrow::Int32Type>({10, 110, 210, 121}));
207 ARROW_ASSIGN_OR_RAISE(auto b4_int,
208 GetArrayDataSample<arrow::Int32Type>({51, 101, 2, 34}));
209 ARROW_ASSIGN_OR_RAISE(auto b5_int,
210 GetArrayDataSample<arrow::Int32Type>({11, 31, 1, 12}));
211 ARROW_ASSIGN_OR_RAISE(auto b6_int,
212 GetArrayDataSample<arrow::Int32Type>({12, 101, 120, 12}));
213 ARROW_ASSIGN_OR_RAISE(auto b7_int,
214 GetArrayDataSample<arrow::Int32Type>({0, 110, 210, 11}));
215 ARROW_ASSIGN_OR_RAISE(auto b8_int,
216 GetArrayDataSample<arrow::Int32Type>({51, 10, 2, 3}));
217
218 ARROW_ASSIGN_OR_RAISE(auto b1, GetExecBatchFromVectors({field}, {b1_int}));
219 ARROW_ASSIGN_OR_RAISE(auto b2, GetExecBatchFromVectors({field}, {b2_int}));
220 ARROW_ASSIGN_OR_RAISE(auto b3,
221 GetExecBatchFromVectors({field, field}, {b3_int, b8_int}));
222 ARROW_ASSIGN_OR_RAISE(auto b4,
223 GetExecBatchFromVectors({field, field, field, field},
224 {b4_int, b5_int, b6_int, b7_int}));
225 out.batches = {b1, b2, b3, b4};
226 out.schema = arrow::schema({field});
227 return out;
228 }
229
230 arrow::Result<BatchesWithSchema> MakeGroupableBatches(int multiplicity = 1) {
231 BatchesWithSchema out;
232 auto fields = {arrow::field("i32", arrow::int32()), arrow::field("str", arrow::utf8())};
233 ARROW_ASSIGN_OR_RAISE(auto b1_int, GetArrayDataSample<arrow::Int32Type>({12, 7, 3}));
234 ARROW_ASSIGN_OR_RAISE(auto b2_int, GetArrayDataSample<arrow::Int32Type>({-2, -1, 3}));
235 ARROW_ASSIGN_OR_RAISE(auto b3_int, GetArrayDataSample<arrow::Int32Type>({5, 3, -8}));
236 ARROW_ASSIGN_OR_RAISE(auto b1_str, GetBinaryArrayDataSample<arrow::StringType>(
237 {"alpha", "beta", "alpha"}));
238 ARROW_ASSIGN_OR_RAISE(auto b2_str, GetBinaryArrayDataSample<arrow::StringType>(
239 {"alpha", "gamma", "alpha"}));
240 ARROW_ASSIGN_OR_RAISE(auto b3_str, GetBinaryArrayDataSample<arrow::StringType>(
241 {"gamma", "beta", "alpha"}));
242 ARROW_ASSIGN_OR_RAISE(auto b1, GetExecBatchFromVectors(fields, {b1_int, b1_str}));
243 ARROW_ASSIGN_OR_RAISE(auto b2, GetExecBatchFromVectors(fields, {b2_int, b2_str}));
244 ARROW_ASSIGN_OR_RAISE(auto b3, GetExecBatchFromVectors(fields, {b3_int, b3_str}));
245 out.batches = {b1, b2, b3};
246
247 size_t batch_count = out.batches.size();
248 for (int repeat = 1; repeat < multiplicity; ++repeat) {
249 for (size_t i = 0; i < batch_count; ++i) {
250 out.batches.push_back(out.batches[i]);
251 }
252 }
253
254 out.schema = arrow::schema(fields);
255 return out;
256 }
257
258 arrow::Status ExecutePlanAndCollectAsTable(
259 cp::ExecContext& exec_context, std::shared_ptr<cp::ExecPlan> plan,
260 std::shared_ptr<arrow::Schema> schema,
261 arrow::AsyncGenerator<arrow::util::optional<cp::ExecBatch>> sink_gen) {
262 // // translate sink_gen (async) to sink_reader (sync)
263 std::shared_ptr<arrow::RecordBatchReader> sink_reader =
264 cp::MakeGeneratorReader(schema, std::move(sink_gen), exec_context.memory_pool());
265
266 // validate the ExecPlan
267 ARROW_RETURN_NOT_OK(plan->Validate());
268 std::cout << "ExecPlan created : " << plan->ToString() << std::endl;
269 // // start the ExecPlan
270 ARROW_RETURN_NOT_OK(plan->StartProducing());
271
272 // // collect sink_reader into a Table
273 std::shared_ptr<arrow::Table> response_table;
274
275 ARROW_ASSIGN_OR_RAISE(response_table,
276 arrow::Table::FromRecordBatchReader(sink_reader.get()));
277
278 std::cout << "Results : " << response_table->ToString() << std::endl;
279
280 // // stop producing
281 plan->StopProducing();
282 // // plan mark finished
283 auto future = plan->finished();
284 return future.status();
285 }
286
287 // (Doc section: Scan Example)
288 /**
289 * \brief
290 * Scan-Sink
291 * This example shows how scan operation can be applied on a dataset.
292 * There are operations that can be applied on the scan (project, filter)
293 * and the input data can be processed. THe output is obtained as a table
294 * via the sink node.
295 * \param exec_context : execution context
296 * \return arrow::Status
297 */
298 arrow::Status ScanSinkExample(cp::ExecContext& exec_context) {
299 // Execution plan created
300 ARROW_ASSIGN_OR_RAISE(std::shared_ptr<cp::ExecPlan> plan,
301 cp::ExecPlan::Make(&exec_context));
302
303 ARROW_ASSIGN_OR_RAISE(std::shared_ptr<arrow::dataset::Dataset> dataset, GetDataset());
304
305 auto options = std::make_shared<arrow::dataset::ScanOptions>();
306 options->projection = cp::project({}, {}); // create empty projection
307
308 // construct the scan node
309 cp::ExecNode* scan;
310 auto scan_node_options = arrow::dataset::ScanNodeOptions{dataset, options};
311
312 ARROW_ASSIGN_OR_RAISE(scan,
313 cp::MakeExecNode("scan", plan.get(), {}, scan_node_options));
314
315 arrow::AsyncGenerator<arrow::util::optional<cp::ExecBatch>> sink_gen;
316
317 ARROW_RETURN_NOT_OK(
318 cp::MakeExecNode("sink", plan.get(), {scan}, cp::SinkNodeOptions{&sink_gen}));
319
320 return ExecutePlanAndCollectAsTable(exec_context, plan, dataset->schema(), sink_gen);
321 }
322 // (Doc section: Scan Example)
323
324 // (Doc section: Source Example)
325 /**
326 * \brief
327 * Source-Sink Example
328 * This example shows how a source and sink can be used
329 * in an execution plan. This includes source node receiving data
330 * and the sink node emits the data as an output represented in
331 * a table.
332 * \param exec_context : execution context
333 * \return arrow::Status
334 */
335 arrow::Status SourceSinkExample(cp::ExecContext& exec_context) {
336 ARROW_ASSIGN_OR_RAISE(std::shared_ptr<cp::ExecPlan> plan,
337 cp::ExecPlan::Make(&exec_context));
338
339 ARROW_ASSIGN_OR_RAISE(auto basic_data, MakeBasicBatches());
340
341 arrow::AsyncGenerator<arrow::util::optional<cp::ExecBatch>> sink_gen;
342
343 auto source_node_options = cp::SourceNodeOptions{basic_data.schema, basic_data.gen()};
344
345 ARROW_ASSIGN_OR_RAISE(cp::ExecNode * source,
346 cp::MakeExecNode("source", plan.get(), {}, source_node_options));
347
348 ARROW_RETURN_NOT_OK(
349 cp::MakeExecNode("sink", plan.get(), {source}, cp::SinkNodeOptions{&sink_gen}));
350
351 return ExecutePlanAndCollectAsTable(exec_context, plan, basic_data.schema, sink_gen);
352 }
353 // (Doc section: Source Example)
354
355 // (Doc section: Filter Example)
356 /**
357 * \brief
358 * Source-Filter-Sink
359 * This example shows how a filter can be used in an execution plan,
360 * along with the source and sink operations. The output from the
361 * exeuction plan is obtained as a table via the sink node.
362 * \param exec_context : execution context
363 * \return arrow::Status
364 */
arrow::Status ScanFilterSinkExample(cp::ExecContext& exec_context) {
  ARROW_ASSIGN_OR_RAISE(std::shared_ptr<cp::ExecPlan> plan,
                        cp::ExecPlan::Make(&exec_context));

  ARROW_ASSIGN_OR_RAISE(std::shared_ptr<arrow::dataset::Dataset> dataset, GetDataset());

  auto options = std::make_shared<arrow::dataset::ScanOptions>();
  // specify the filter: only rows where the value of the "a" column
  // is greater than 3 are kept.
  cp::Expression filter_opt = cp::greater(cp::field_ref("a"), cp::literal(3));
  // set filter for scanner : on-disk / push-down filtering.
  // This step can be skipped if you are not reading from disk.
  options->filter = filter_opt;
  // empty projection
  options->projection = cp::project({}, {});
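
  // Expressions are composed of field references, literals, and calls to
  // registered compute functions; they are bound to the input schema before
  // the plan executes.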

  // construct the scan node
  std::cout << "Initialized Scanning Options" << std::endl;

  cp::ExecNode* scan;

  auto scan_node_options = arrow::dataset::ScanNodeOptions{dataset, options};
  std::cout << "Scan node options created" << std::endl;

  ARROW_ASSIGN_OR_RAISE(scan,
                        cp::MakeExecNode("scan", plan.get(), {}, scan_node_options));

  // pipe the scan node into a filter node.
  // The filter expression has to be set in both the scan node options and
  // the filter node options: at the scan node it is used for on-disk /
  // push-down filtering, at the filter node for in-memory filtering.
  cp::ExecNode* filter;
  ARROW_ASSIGN_OR_RAISE(filter, cp::MakeExecNode("filter", plan.get(), {scan},
                                                 cp::FilterNodeOptions{filter_opt}));

  // finally, pipe the filter node into a sink node
  arrow::AsyncGenerator<arrow::util::optional<cp::ExecBatch>> sink_gen;
  ARROW_RETURN_NOT_OK(
      cp::MakeExecNode("sink", plan.get(), {filter}, cp::SinkNodeOptions{&sink_gen}));

  return ExecutePlanAndCollectAsTable(exec_context, plan, dataset->schema(), sink_gen);
}

// (Doc section: Filter Example)

// (Doc section: Project Example)
/**
 * \brief
 * Scan-Project-Sink
 * This example shows how a scan operation can be used to load the data
 * into the execution plan, how a projection can be applied on the
 * data stream and how the output is obtained as a table via the sink node.
 *
 * \param exec_context : execution context
 * \return arrow::Status
 */
arrow::Status ScanProjectSinkExample(cp::ExecContext& exec_context) {
  ARROW_ASSIGN_OR_RAISE(std::shared_ptr<cp::ExecPlan> plan,
                        cp::ExecPlan::Make(&exec_context));

  ARROW_ASSIGN_OR_RAISE(std::shared_ptr<arrow::dataset::Dataset> dataset, GetDataset());

  auto options = std::make_shared<arrow::dataset::ScanOptions>();
  // the expression to project: a * 2
  cp::Expression a_times_2 = cp::call("multiply", {cp::field_ref("a"), cp::literal(2)});
  // the scan projection is left empty; the computation is done by the
  // project node below
  options->projection = cp::project({}, {});

  cp::ExecNode* scan;

  auto scan_node_options = arrow::dataset::ScanNodeOptions{dataset, options};

  ARROW_ASSIGN_OR_RAISE(scan,
                        cp::MakeExecNode("scan", plan.get(), {}, scan_node_options));

  cp::ExecNode* project;
  ARROW_ASSIGN_OR_RAISE(project, cp::MakeExecNode("project", plan.get(), {scan},
                                                  cp::ProjectNodeOptions{{a_times_2}}));
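
  // The project node evaluates each expression in ProjectNodeOptions against
  // every input batch and emits the results as its output columns. Since no
  // explicit names were given, the output field is named after the expression.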
  // schema after projection => multiply(a, 2): int64
  std::cout << "Schema after projection : \n"
            << project->output_schema()->ToString() << std::endl;

  arrow::AsyncGenerator<arrow::util::optional<cp::ExecBatch>> sink_gen;
  ARROW_RETURN_NOT_OK(
      cp::MakeExecNode("sink", plan.get(), {project}, cp::SinkNodeOptions{&sink_gen}));
  auto schema = arrow::schema({arrow::field("a * 2", arrow::int32())});

  return ExecutePlanAndCollectAsTable(exec_context, plan, schema, sink_gen);
}

// (Doc section: Project Example)

// (Doc section: Scalar Aggregate Example)
/**
 * \brief
 * Source-Aggregation-Sink
 * This example shows how a scalar aggregation can be applied on an
 * execution plan. The source node loads the data and the aggregation
 * (the sum of column 'a') is applied to it. The output is obtained
 * from the sink node as a table.
 * \param exec_context : execution context
 * \return arrow::Status
 */
arrow::Status SourceScalarAggregateSinkExample(cp::ExecContext& exec_context) {
  ARROW_ASSIGN_OR_RAISE(std::shared_ptr<cp::ExecPlan> plan,
                        cp::ExecPlan::Make(&exec_context));

  ARROW_ASSIGN_OR_RAISE(auto basic_data, MakeBasicBatches());

  arrow::AsyncGenerator<arrow::util::optional<cp::ExecBatch>> sink_gen;

  auto source_node_options = cp::SourceNodeOptions{basic_data.schema, basic_data.gen()};

  ARROW_ASSIGN_OR_RAISE(cp::ExecNode * source,
                        cp::MakeExecNode("source", plan.get(), {}, source_node_options));
  auto aggregate_options = cp::AggregateNodeOptions{/*aggregates=*/{{"sum", nullptr}},
                                                    /*targets=*/{"a"},
                                                    /*names=*/{"sum(a)"}};
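
  // Without grouping keys the aggregate node reduces the whole stream to a
  // single row: "sum" is the scalar aggregate function applied to the target
  // column "a", and "sum(a)" is the name chosen for the output field.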
  ARROW_ASSIGN_OR_RAISE(
      cp::ExecNode * aggregate,
      cp::MakeExecNode("aggregate", plan.get(), {source}, aggregate_options));

  ARROW_RETURN_NOT_OK(
      cp::MakeExecNode("sink", plan.get(), {aggregate}, cp::SinkNodeOptions{&sink_gen}));
  auto schema = arrow::schema({arrow::field("sum(a)", arrow::int32())});

  return ExecutePlanAndCollectAsTable(exec_context, plan, schema, sink_gen);
}
// (Doc section: Scalar Aggregate Example)

// (Doc section: Group Aggregate Example)
/**
 * \brief
 * Source-Aggregation-Sink
 * This example shows how a grouped aggregation can be applied on an
 * execution plan. The source node loads the data and the aggregation
 * (counting the non-null values of column 'a', grouped by column 'b')
 * is applied to it. The output is obtained from the sink node as a table.
 * \param exec_context : execution context
 * \return arrow::Status
 */
arrow::Status SourceGroupAggregateSinkExample(cp::ExecContext& exec_context) {
  ARROW_ASSIGN_OR_RAISE(std::shared_ptr<cp::ExecPlan> plan,
                        cp::ExecPlan::Make(&exec_context));

  ARROW_ASSIGN_OR_RAISE(auto basic_data, MakeBasicBatches());

  arrow::AsyncGenerator<arrow::util::optional<cp::ExecBatch>> sink_gen;

  auto source_node_options = cp::SourceNodeOptions{basic_data.schema, basic_data.gen()};

  ARROW_ASSIGN_OR_RAISE(cp::ExecNode * source,
                        cp::MakeExecNode("source", plan.get(), {}, source_node_options));
  cp::CountOptions options(cp::CountOptions::ONLY_VALID);
  auto aggregate_options =
      cp::AggregateNodeOptions{/*aggregates=*/{{"hash_count", &options}},
                               /*targets=*/{"a"},
                               /*names=*/{"count(a)"},
                               /*keys=*/{"b"}};
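
  // The "hash_" prefix selects the grouped variant of the count kernel: one
  // count is emitted per distinct value of the key column "b". ONLY_VALID
  // means null values of "a" are not counted.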
  ARROW_ASSIGN_OR_RAISE(
      cp::ExecNode * aggregate,
      cp::MakeExecNode("aggregate", plan.get(), {source}, aggregate_options));

  ARROW_RETURN_NOT_OK(
      cp::MakeExecNode("sink", plan.get(), {aggregate}, cp::SinkNodeOptions{&sink_gen}));
  auto schema = arrow::schema({
      arrow::field("count(a)", arrow::int32()),
      arrow::field("b", arrow::boolean()),
  });

  return ExecutePlanAndCollectAsTable(exec_context, plan, schema, sink_gen);
}
// (Doc section: Group Aggregate Example)

// (Doc section: ConsumingSink Example)
/**
 * \brief
 * Source-ConsumingSink
 * This example shows how data can be consumed within the execution plan
 * by using a ConsumingSink node. There is no data output from this
 * execution plan.
 * \param exec_context : execution context
 * \return arrow::Status
 */
arrow::Status SourceConsumingSinkExample(cp::ExecContext& exec_context) {
  ARROW_ASSIGN_OR_RAISE(std::shared_ptr<cp::ExecPlan> plan,
                        cp::ExecPlan::Make(&exec_context));

  ARROW_ASSIGN_OR_RAISE(auto basic_data, MakeBasicBatches());

  auto source_node_options = cp::SourceNodeOptions{basic_data.schema, basic_data.gen()};

  ARROW_ASSIGN_OR_RAISE(cp::ExecNode * source,
                        cp::MakeExecNode("source", plan.get(), {}, source_node_options));

  std::atomic<uint32_t> batches_seen{0};
  arrow::Future<> finish = arrow::Future<>::Make();
  struct CustomSinkNodeConsumer : public cp::SinkNodeConsumer {
    CustomSinkNodeConsumer(std::atomic<uint32_t>* batches_seen, arrow::Future<> finish)
        : batches_seen(batches_seen), finish(std::move(finish)) {}

    arrow::Status Consume(cp::ExecBatch batch) override {
      (*batches_seen)++;
      return arrow::Status::OK();
    }

    arrow::Future<> Finish() override { return finish; }

    std::atomic<uint32_t>* batches_seen;
    arrow::Future<> finish;
  };
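
  // Consume is invoked once per batch the sink receives; with a multi-threaded
  // executor it may be called from several threads concurrently, hence the
  // atomic counter. Finish returns a future the plan waits on before it is
  // marked finished.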
  std::shared_ptr<CustomSinkNodeConsumer> consumer =
      std::make_shared<CustomSinkNodeConsumer>(&batches_seen, finish);

  cp::ExecNode* consuming_sink;

  ARROW_ASSIGN_OR_RAISE(consuming_sink,
                        MakeExecNode("consuming_sink", plan.get(), {source},
                                     cp::ConsumingSinkNodeOptions(consumer)));

  ARROW_RETURN_NOT_OK(consuming_sink->Validate());

  ARROW_RETURN_NOT_OK(plan->Validate());
  std::cout << "Exec Plan created: " << plan->ToString() << std::endl;
  // start producing
  ARROW_RETURN_NOT_OK(plan->StartProducing());
  // the source should finish fairly quickly
  ARROW_RETURN_NOT_OK(source->finished().status());
  std::cout << "Source Finished!" << std::endl;
  // mark consumption complete; the plan should then finish
  finish.MarkFinished(arrow::Status::OK());
  ARROW_RETURN_NOT_OK(plan->finished().status());
  return arrow::Status::OK();
}
// (Doc section: ConsumingSink Example)

// (Doc section: OrderBySink Example)

/**
 * \brief
 * Source-OrderBySink
 * In this example, the data enters through the source node and is
 * ordered in the sink node. The sort order (ascending or descending)
 * is configurable. The output is obtained as a table from the sink node.
 * \param exec_context : execution context
 * \return arrow::Status
 */
arrow::Status SourceOrderBySinkExample(cp::ExecContext& exec_context) {
  ARROW_ASSIGN_OR_RAISE(std::shared_ptr<cp::ExecPlan> plan,
                        cp::ExecPlan::Make(&exec_context));

  ARROW_ASSIGN_OR_RAISE(auto basic_data, MakeSortTestBasicBatches());

  std::cout << "basic data created" << std::endl;

  arrow::AsyncGenerator<arrow::util::optional<cp::ExecBatch>> sink_gen;

  auto source_node_options = cp::SourceNodeOptions{basic_data.schema, basic_data.gen()};
  ARROW_ASSIGN_OR_RAISE(cp::ExecNode * source,
                        cp::MakeExecNode("source", plan.get(), {}, source_node_options));

  ARROW_RETURN_NOT_OK(cp::MakeExecNode(
      "order_by_sink", plan.get(), {source},
      cp::OrderBySinkNodeOptions{
          cp::SortOptions{{cp::SortKey{"a", cp::SortOrder::Descending}}}, &sink_gen}));
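
  // Sorting needs to see the entire input, so ordering is implemented as a
  // specialized sink: the node accumulates all batches and emits the sorted
  // output only once its input is finished.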

  return ExecutePlanAndCollectAsTable(exec_context, plan, basic_data.schema, sink_gen);
}

// (Doc section: OrderBySink Example)

// (Doc section: HashJoin Example)
/**
 * \brief
 * Source-HashJoin-Sink
 * This example shows how the source node supplies the data and how a
 * self-join is applied to it. The join options are configurable. The
 * output is obtained as a table via the sink node.
 * \param exec_context : execution context
 * \return arrow::Status
 */
arrow::Status SourceHashJoinSinkExample(cp::ExecContext& exec_context) {
  ARROW_ASSIGN_OR_RAISE(auto input, MakeGroupableBatches());
  ARROW_ASSIGN_OR_RAISE(std::shared_ptr<cp::ExecPlan> plan,
                        cp::ExecPlan::Make(&exec_context));

  arrow::AsyncGenerator<arrow::util::optional<cp::ExecBatch>> sink_gen;

  cp::ExecNode* left_source;
  cp::ExecNode* right_source;
  for (auto source : {&left_source, &right_source}) {
    ARROW_ASSIGN_OR_RAISE(*source,
                          MakeExecNode("source", plan.get(), {},
                                       cp::SourceNodeOptions{input.schema, input.gen()}));
  }

  cp::HashJoinNodeOptions join_opts{
      cp::JoinType::INNER,
      /*left_keys=*/{"str"},
      /*right_keys=*/{"str"}, cp::literal(true), "l_", "r_"};
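
  // cp::literal(true) is the residual filter (a no-op here); "l_" and "r_"
  // are suffixes appended to output columns that occur on both sides of the
  // join, which is why the expected output below has "l_str" and "r_str".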

  ARROW_ASSIGN_OR_RAISE(
      auto hashjoin,
      cp::MakeExecNode("hashjoin", plan.get(), {left_source, right_source}, join_opts));

  ARROW_RETURN_NOT_OK(
      cp::MakeExecNode("sink", plan.get(), {hashjoin}, cp::SinkNodeOptions{&sink_gen}));
  // expected columns: i32, str, l_str, r_str
  auto schema = arrow::schema(
      {arrow::field("i32", arrow::int32()), arrow::field("str", arrow::utf8()),
       arrow::field("l_str", arrow::utf8()), arrow::field("r_str", arrow::utf8())});

  return ExecutePlanAndCollectAsTable(exec_context, plan, schema, sink_gen);
}

// (Doc section: HashJoin Example)

// (Doc section: KSelect Example)
/**
 * \brief
 * Source-KSelect
 * This example shows how the top (or bottom) K elements can be
 * selected. The output node is a modified sink node from which the
 * output can be obtained as a table.
 * \param exec_context : execution context
 * \return arrow::Status
 */
arrow::Status SourceKSelectExample(cp::ExecContext& exec_context) {
  ARROW_ASSIGN_OR_RAISE(auto input, MakeGroupableBatches());
  ARROW_ASSIGN_OR_RAISE(std::shared_ptr<cp::ExecPlan> plan,
                        cp::ExecPlan::Make(&exec_context));
  arrow::AsyncGenerator<arrow::util::optional<cp::ExecBatch>> sink_gen;

  ARROW_ASSIGN_OR_RAISE(
      cp::ExecNode * source,
      cp::MakeExecNode("source", plan.get(), {},
                       cp::SourceNodeOptions{input.schema, input.gen()}));

  cp::SelectKOptions options = cp::SelectKOptions::TopKDefault(/*k=*/2, {"i32"});
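
  // TopKDefault keeps the k rows with the largest values in the given column;
  // SelectKOptions::BottomKDefault would keep the smallest instead.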

  ARROW_RETURN_NOT_OK(cp::MakeExecNode("select_k_sink", plan.get(), {source},
                                       cp::SelectKSinkNodeOptions{options, &sink_gen}));

  auto schema = arrow::schema(
      {arrow::field("i32", arrow::int32()), arrow::field("str", arrow::utf8())});

  return ExecutePlanAndCollectAsTable(exec_context, plan, schema, sink_gen);
}

// (Doc section: KSelect Example)

// (Doc section: Write Example)

/**
 * \brief
 * Scan-Filter-Write
 * This example shows how a scan node can be used to load data and how
 * the resulting stream can be written to disk as a partitioned Parquet
 * dataset.
 * \param exec_context : execution context
 * \param file_path : file saving path
 * \return arrow::Status
 */
arrow::Status ScanFilterWriteExample(cp::ExecContext& exec_context,
                                     const std::string& file_path) {
  ARROW_ASSIGN_OR_RAISE(std::shared_ptr<cp::ExecPlan> plan,
                        cp::ExecPlan::Make(&exec_context));

  ARROW_ASSIGN_OR_RAISE(std::shared_ptr<arrow::dataset::Dataset> dataset, GetDataset());

  auto options = std::make_shared<arrow::dataset::ScanOptions>();
  // empty projection
  options->projection = cp::project({}, {});

  cp::ExecNode* scan;

  auto scan_node_options = arrow::dataset::ScanNodeOptions{dataset, options};

  ARROW_ASSIGN_OR_RAISE(scan,
                        cp::MakeExecNode("scan", plan.get(), {}, scan_node_options));

  arrow::AsyncGenerator<arrow::util::optional<cp::ExecBatch>> sink_gen;

  std::string root_path = "";
  std::string uri = "file://" + file_path;
  ARROW_ASSIGN_OR_RAISE(std::shared_ptr<arrow::fs::FileSystem> filesystem,
                        arrow::fs::FileSystemFromUri(uri, &root_path));

  auto base_path = root_path + "/parquet_dataset";
  // Uncomment the following line, if run repeatedly
  // ARROW_RETURN_NOT_OK(filesystem->DeleteDirContents(base_path));
  ARROW_RETURN_NOT_OK(filesystem->CreateDir(base_path));

  // The partition schema determines which fields are part of the partitioning.
  auto partition_schema = arrow::schema({arrow::field("a", arrow::int32())});
  // We'll use Hive-style partitioning,
  // which creates directories with "key=value" pairs.
  auto partitioning =
      std::make_shared<arrow::dataset::HivePartitioning>(partition_schema);
  // We'll write Parquet files.
  auto format = std::make_shared<arrow::dataset::ParquetFileFormat>();

  arrow::dataset::FileSystemDatasetWriteOptions write_options;
  write_options.file_write_options = format->DefaultWriteOptions();
  write_options.filesystem = filesystem;
  write_options.base_dir = base_path;
  write_options.partitioning = partitioning;
  write_options.basename_template = "part{i}.parquet";
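
  // "{i}" in the basename template is replaced by an auto-incremented counter,
  // yielding part0.parquet, part1.parquet, ... within each partition directory.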

  arrow::dataset::WriteNodeOptions write_node_options{write_options, dataset->schema()};

  ARROW_RETURN_NOT_OK(cp::MakeExecNode("write", plan.get(), {scan}, write_node_options));

  ARROW_RETURN_NOT_OK(plan->Validate());
  std::cout << "Execution Plan Created : " << plan->ToString() << std::endl;
  // start the ExecPlan
  ARROW_RETURN_NOT_OK(plan->StartProducing());
  // status() blocks until the plan is finished
  auto future = plan->finished();
  return future.status();
}

// (Doc section: Write Example)

// (Doc section: Union Example)

/**
 * \brief
 * Source-Union-Sink
 * This example shows how a union operation can be applied on two
 * data sources. The output is obtained as a table via the sink node.
 * \param exec_context : execution context
 * \return arrow::Status
 */
arrow::Status SourceUnionSinkExample(cp::ExecContext& exec_context) {
  ARROW_ASSIGN_OR_RAISE(auto basic_data, MakeBasicBatches());

  ARROW_ASSIGN_OR_RAISE(std::shared_ptr<cp::ExecPlan> plan,
                        cp::ExecPlan::Make(&exec_context));
  arrow::AsyncGenerator<arrow::util::optional<cp::ExecBatch>> sink_gen;

  cp::Declaration union_node{"union", cp::ExecNodeOptions{}};
  cp::Declaration lhs{"source",
                      cp::SourceNodeOptions{basic_data.schema, basic_data.gen()}};
  lhs.label = "lhs";
  cp::Declaration rhs{"source",
                      cp::SourceNodeOptions{basic_data.schema, basic_data.gen()}};
  rhs.label = "rhs";
  union_node.inputs.emplace_back(lhs);
  union_node.inputs.emplace_back(rhs);
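
  // The union node concatenates its input streams without deduplicating
  // (it behaves like SQL's UNION ALL). The Declarations above only describe
  // the plan; AddToPlan below instantiates the actual ExecNodes.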

  ARROW_ASSIGN_OR_RAISE(
      auto declr, cp::Declaration::Sequence({
                      union_node,
                      {"sink", cp::SinkNodeOptions{&sink_gen}},
                  })
                      .AddToPlan(plan.get()));

  ARROW_RETURN_NOT_OK(declr->Validate());

  ARROW_RETURN_NOT_OK(plan->Validate());
  return ExecutePlanAndCollectAsTable(exec_context, plan, basic_data.schema, sink_gen);
}

// (Doc section: Union Example)

enum ExampleMode {
  SOURCE_SINK = 0,
  SCAN = 1,
  FILTER = 2,
  PROJECT = 3,
  SCALAR_AGGREGATION = 4,
  GROUP_AGGREGATION = 5,
  CONSUMING_SINK = 6,
  ORDER_BY_SINK = 7,
  HASHJOIN = 8,
  KSELECT = 9,
  WRITE = 10,
  UNION = 11,
};

int main(int argc, char** argv) {
  if (argc < 3) {
    // Fake success for CI purposes.
    return EXIT_SUCCESS;
  }

  std::string base_save_path = argv[1];
  int mode = std::atoi(argv[2]);
  arrow::Status status;
  // ensure arrow::dataset node factories are in the registry
  arrow::dataset::internal::Initialize();
  // execution context
  cp::ExecContext exec_context(arrow::default_memory_pool(),
                               ::arrow::internal::GetCpuThreadPool());
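
  // With GetCpuThreadPool() the plan executes on multiple threads; passing a
  // null executor instead should run the plan synchronously on the calling
  // thread.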
  switch (mode) {
    case SOURCE_SINK:
      PrintBlock("Source Sink Example");
      status = SourceSinkExample(exec_context);
      break;
    case SCAN:
      PrintBlock("Scan Example");
      status = ScanSinkExample(exec_context);
      break;
    case FILTER:
      PrintBlock("Filter Example");
      status = ScanFilterSinkExample(exec_context);
      break;
    case PROJECT:
      PrintBlock("Project Example");
      status = ScanProjectSinkExample(exec_context);
      break;
    case GROUP_AGGREGATION:
      PrintBlock("Group Aggregate Example");
      status = SourceGroupAggregateSinkExample(exec_context);
      break;
    case SCALAR_AGGREGATION:
      PrintBlock("Scalar Aggregate Example");
      status = SourceScalarAggregateSinkExample(exec_context);
      break;
    case CONSUMING_SINK:
      PrintBlock("Consuming-Sink Example");
      status = SourceConsumingSinkExample(exec_context);
      break;
    case ORDER_BY_SINK:
      PrintBlock("OrderBy Example");
      status = SourceOrderBySinkExample(exec_context);
      break;
    case HASHJOIN:
      PrintBlock("HashJoin Example");
      status = SourceHashJoinSinkExample(exec_context);
      break;
    case KSELECT:
      PrintBlock("KSelect Example");
      status = SourceKSelectExample(exec_context);
      break;
    case WRITE:
      PrintBlock("Write Example");
      status = ScanFilterWriteExample(exec_context, base_save_path);
      break;
    case UNION:
      PrintBlock("Union Example");
      status = SourceUnionSinkExample(exec_context);
      break;
    default:
      break;
  }

  if (status.ok()) {
    return EXIT_SUCCESS;
  } else {
    std::cout << "Error occurred: " << status.message() << std::endl;
    return EXIT_FAILURE;
  }
}