Gandiva Expression, Projector, and Filter#
Building Expressions#
Gandiva provides a general expression representation where expressions are
represented by a tree of nodes. The expression trees are built using
TreeExprBuilder
. The leaves of the expression tree are typically
field references, created by TreeExprBuilder::MakeField()
, and
literal values, created by TreeExprBuilder::MakeLiteral()
. Nodes
can be combined into more complex expression trees using:
TreeExprBuilder::MakeFunction()
to create a function node. (You can callGetRegisteredFunctionSignatures()
to get a list of valid function signatures.)TreeExprBuilder::MakeIf()
to create if-else logic.TreeExprBuilder::MakeAnd()
andTreeExprBuilder::MakeOr()
to create boolean expressions. (For “not”, use thenot(bool)
function inMakeFunction
.)TreeExprBuilder::MakeInExpressionInt32()
and the other “in expression” functions to create set membership tests.
Each of these functions create new composite nodes, which contain the leaf nodes (literals and field references) or other composite nodes as children. By composing these, you can create arbitrarily complex expression trees.
Once an expression tree is built, they are wrapped in either Expression
or Condition
, depending on how they will be used.
Expression
is used in projections while Condition
is used in filters.
As an example, here is how to create an Expression representing x + 3
and a
Condition representing x < 3
:
std::shared_ptr<arrow::Field> field_x_raw = arrow::field("x", arrow::int32());
std::shared_ptr<Node> field_x = TreeExprBuilder::MakeField(field_x_raw);
std::shared_ptr<Node> literal_3 = TreeExprBuilder::MakeLiteral(3);
std::shared_ptr<arrow::Field> field_result = arrow::field("result", arrow::int32());
std::shared_ptr<Node> add_node =
TreeExprBuilder::MakeFunction("add", {field_x, literal_3}, arrow::int32());
std::shared_ptr<Expression> expression =
TreeExprBuilder::MakeExpression(add_node, field_result);
std::shared_ptr<Node> less_than_node =
TreeExprBuilder::MakeFunction("less_than", {field_x, literal_3}, arrow::boolean());
std::shared_ptr<Condition> condition = TreeExprBuilder::MakeCondition(less_than_node);
Projectors and Filters#
Gandiva’s two execution kernels are Projector
and
Filter
. Projector
consumes a record batch and projects
into a new record batch. Filter
consumes a record batch and produces a
SelectionVector
containing the indices that matched the condition.
For both Projector
and Filter
, optimization of the expression IR happens
when creating instances. They are compiled against a static schema, so the
schema of the record batches must be known at this point.
Continuing with the expression
and condition
created in the previous
section, here is an example of creating a Projector and a Filter:
std::shared_ptr<arrow::Schema> input_schema = arrow::schema({field_x_raw});
std::shared_ptr<arrow::Schema> output_schema = arrow::schema({field_result});
std::shared_ptr<Projector> projector;
Status status;
std::vector<std::shared_ptr<Expression>> expressions = {expression};
status = Projector::Make(input_schema, expressions, &projector);
ARROW_RETURN_NOT_OK(status);
std::shared_ptr<Filter> filter;
status = Filter::Make(input_schema, condition, &filter);
ARROW_RETURN_NOT_OK(status);
Once a Projector or Filter is created, it can be evaluated on Arrow record batches. These execution kernels are single-threaded on their own, but are designed to be reused to process distinct record batches in parallel.
Evaluating projections#
Execution is performed with Projector::Evaluate()
. This outputs
a vector of arrays, which can be passed along with the output schema to
arrow::RecordBatch::Make()
.
auto pool = arrow::default_memory_pool();
int num_records = 4;
arrow::Int32Builder builder;
int32_t values[4] = {1, 2, 3, 4};
ARROW_RETURN_NOT_OK(builder.AppendValues(values, 4));
ARROW_ASSIGN_OR_RAISE(std::shared_ptr<arrow::Array> array, builder.Finish());
auto in_batch = arrow::RecordBatch::Make(input_schema, num_records, {array});
arrow::ArrayVector outputs;
status = projector->Evaluate(*in_batch, pool, &outputs);
ARROW_RETURN_NOT_OK(status);
std::shared_ptr<arrow::RecordBatch> result =
arrow::RecordBatch::Make(output_schema, outputs[0]->length(), outputs);
Evaluating filters#
Filter::Evaluate()
produces SelectionVector
,
a vector of row indices that matched the filter condition. The selection vector
is a wrapper around an arrow integer array, parameterized by bitwidth. When
creating the selection vector (you must initialize it before passing to
Evaluate()
), you must choose the bitwidth, which determines the max index
value it can hold, and the max number of slots, which determines how many indices
it may contain. In general, the max number of slots should be set to your batch
size and the bitwidth the smallest integer size that can represent all integers
less than the batch size. For example, if your batch size is 100k, set the
maximum number of slots to 100k and the bitwidth to 32 (since 2^16 = 64k which
would be too small).
Once Evaluate()
has been run and the SelectionVector
is
populated, use the SelectionVector::ToArray()
method to get
the underlying array and then ::arrow::compute::Take()
to materialize the
output record batch.
std::shared_ptr<gandiva::SelectionVector> result_indices;
// Use 16-bit integers for indices. Result can be no longer than input size,
// so use batch num_rows as max_slots.
status = gandiva::SelectionVector::MakeInt16(/*max_slots=*/in_batch->num_rows(), pool,
&result_indices);
ARROW_RETURN_NOT_OK(status);
status = filter->Evaluate(*in_batch, result_indices);
ARROW_RETURN_NOT_OK(status);
std::shared_ptr<arrow::Array> take_indices = result_indices->ToArray();
Datum maybe_batch;
ARROW_ASSIGN_OR_RAISE(maybe_batch,
arrow::compute::Take(Datum(in_batch), Datum(take_indices),
TakeOptions::NoBoundsCheck()));
result = maybe_batch.record_batch();
Evaluating projections and filters#
Finally, you can also project while apply a selection vector, with
Projector::Evaluate()
. To do so, first make sure to initialize the
Projector
with SelectionVector::GetMode()
so that the projector
compiles with the correct bitwidth. Then you can pass the
SelectionVector
into the Projector::Evaluate()
method.
// Make sure the projector is compiled for the appropriate selection vector mode
status = Projector::Make(input_schema, expressions, result_indices->GetMode(),
ConfigurationBuilder::DefaultConfiguration(), &projector);
ARROW_RETURN_NOT_OK(status);
arrow::ArrayVector outputs_filtered;
status = projector->Evaluate(*in_batch, result_indices.get(), pool, &outputs_filtered);
ARROW_RETURN_NOT_OK(status);
result =
arrow::RecordBatch::Make(output_schema, outputs[0]->length(), outputs_filtered);