Gandiva Expression, Projector, and Filter

Building Expressions

Gandiva provides a general expression representation where expressions are represented by a tree of nodes. The expression trees are built using TreeExprBuilder. The leaves of the expression tree are typically field references, created by TreeExprBuilder::MakeField(), and literal values, created by TreeExprBuilder::MakeLiteral(). Nodes can be combined into more complex expression trees using:

  • TreeExprBuilder::MakeFunction() to create a function node. (You can call GetRegisteredFunctionSignatures() to get a list of valid function signatures.)

  • TreeExprBuilder::MakeIf() to create if-else logic.

  • TreeExprBuilder::MakeAnd() and TreeExprBuilder::MakeOr() to create boolean expressions. (For “not”, use the not(bool) function in MakeFunction.)

  • TreeExprBuilder::MakeInExpressionInt32() and the other “in expression” functions to create set membership tests.

Each of these functions creates a new composite node, which contains leaf nodes (literals and field references) or other composite nodes as children. By composing them, you can create arbitrarily complex expression trees.
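
To make the tree structure concrete, the following is a minimal stand-alone model of leaf and composite nodes. It is only a sketch: the Toy* types and BuildToyTree() are hypothetical names invented for illustration. They are not part of Gandiva and do not reflect how Gandiva compiles or evaluates expressions. The model assumes a single int32 input field x and evaluates one row at a time.

```cpp
#include <cstdint>
#include <functional>
#include <memory>
#include <utility>

// Hypothetical toy node: evaluates to an int32 for one input value x.
struct ToyNode {
  virtual ~ToyNode() = default;
  virtual int32_t Eval(int32_t x) const = 0;
};
using ToyNodePtr = std::shared_ptr<ToyNode>;

// Leaf: reference to the single input field x.
struct ToyField : ToyNode {
  int32_t Eval(int32_t x) const override { return x; }
};

// Leaf: constant value.
struct ToyLiteral : ToyNode {
  explicit ToyLiteral(int32_t v) : value(v) {}
  int32_t Eval(int32_t) const override { return value; }
  int32_t value;
};

// Composite: applies a binary function to the results of two child nodes.
struct ToyFunction : ToyNode {
  ToyFunction(std::function<int32_t(int32_t, int32_t)> f, ToyNodePtr l, ToyNodePtr r)
      : fn(std::move(f)), left(std::move(l)), right(std::move(r)) {}
  int32_t Eval(int32_t x) const override { return fn(left->Eval(x), right->Eval(x)); }
  std::function<int32_t(int32_t, int32_t)> fn;
  ToyNodePtr left;
  ToyNodePtr right;
};

// Composite: if-else; a non-zero condition result selects the "then" branch.
struct ToyIf : ToyNode {
  ToyIf(ToyNodePtr c, ToyNodePtr t, ToyNodePtr e)
      : cond(std::move(c)), then_node(std::move(t)), else_node(std::move(e)) {}
  int32_t Eval(int32_t x) const override {
    return cond->Eval(x) != 0 ? then_node->Eval(x) : else_node->Eval(x);
  }
  ToyNodePtr cond;
  ToyNodePtr then_node;
  ToyNodePtr else_node;
};

// Builds the tree for: if (x < 3) then x + 3 else x.
// The leaves x and 3 are shared by both function nodes.
inline ToyNodePtr BuildToyTree() {
  auto x = std::make_shared<ToyField>();
  auto three = std::make_shared<ToyLiteral>(3);
  auto less = std::make_shared<ToyFunction>(
      [](int32_t a, int32_t b) { return static_cast<int32_t>(a < b); }, x, three);
  auto add = std::make_shared<ToyFunction>(
      [](int32_t a, int32_t b) { return a + b; }, x, three);
  return std::make_shared<ToyIf>(less, add, x);
}
```

In this model, BuildToyTree()->Eval(1) takes the "then" branch and BuildToyTree()->Eval(5) takes the "else" branch. The same leaf nodes can appear under several composite nodes, as field_x and literal_3 do in the Gandiva example that follows.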

Once an expression tree is built, it is wrapped in either an Expression or a Condition, depending on how it will be used. An Expression is used in projections, while a Condition is used in filters.

As an example, here is how to create an Expression representing x + 3 and a Condition representing x < 3:

std::shared_ptr<arrow::Field> field_x_raw = arrow::field("x", arrow::int32());
std::shared_ptr<Node> field_x = TreeExprBuilder::MakeField(field_x_raw);
std::shared_ptr<Node> literal_3 = TreeExprBuilder::MakeLiteral(3);
std::shared_ptr<arrow::Field> field_result = arrow::field("result", arrow::int32());

std::shared_ptr<Node> add_node =
    TreeExprBuilder::MakeFunction("add", {field_x, literal_3}, arrow::int32());
std::shared_ptr<Expression> expression =
    TreeExprBuilder::MakeExpression(add_node, field_result);

std::shared_ptr<Node> less_than_node =
    TreeExprBuilder::MakeFunction("less_than", {field_x, literal_3}, arrow::boolean());
std::shared_ptr<Condition> condition = TreeExprBuilder::MakeCondition(less_than_node);

Projectors and Filters

Gandiva’s two execution kernels are Projector and Filter. Projector consumes a record batch and projects into a new record batch. Filter consumes a record batch and produces a SelectionVector containing the indices that matched the condition.

For both Projector and Filter, optimization of the expression IR happens when an instance is created. Both are compiled against a static schema, so the schema of the record batches must be known when the Projector or Filter is created.

Continuing with the expression and condition created in the previous section, here is an example of creating a Projector and a Filter:

std::shared_ptr<arrow::Schema> input_schema = arrow::schema({field_x_raw});
std::shared_ptr<arrow::Schema> output_schema = arrow::schema({field_result});
std::shared_ptr<Projector> projector;
Status status;
std::vector<std::shared_ptr<Expression>> expressions = {expression};
status = Projector::Make(input_schema, expressions, &projector);
ARROW_RETURN_NOT_OK(status);

std::shared_ptr<Filter> filter;
status = Filter::Make(input_schema, condition, &filter);
ARROW_RETURN_NOT_OK(status);

Once a Projector or Filter is created, it can be evaluated on Arrow record batches. These execution kernels are single-threaded on their own, but are designed to be reused to process distinct record batches in parallel.

Evaluating projections

Execution is performed with Projector::Evaluate(). This outputs a vector of arrays, which can be passed along with the output schema to arrow::RecordBatch::Make().

auto pool = arrow::default_memory_pool();
int num_records = 4;
arrow::Int32Builder builder;
int32_t values[4] = {1, 2, 3, 4};
ARROW_RETURN_NOT_OK(builder.AppendValues(values, 4));
ARROW_ASSIGN_OR_RAISE(std::shared_ptr<arrow::Array> array, builder.Finish());
auto in_batch = arrow::RecordBatch::Make(input_schema, num_records, {array});

arrow::ArrayVector outputs;
status = projector->Evaluate(*in_batch, pool, &outputs);
ARROW_RETURN_NOT_OK(status);
std::shared_ptr<arrow::RecordBatch> result =
    arrow::RecordBatch::Make(output_schema, outputs[0]->length(), outputs);

Evaluating filters

Filter::Evaluate() produces a SelectionVector, a vector of the row indices that matched the filter condition. The selection vector is a wrapper around an Arrow integer array, parameterized by bitwidth. You must initialize the selection vector before passing it to Evaluate(). When creating it, you choose the bitwidth, which determines the maximum index value it can hold, and the maximum number of slots, which determines how many indices it may contain. In general, set the maximum number of slots to your batch size and the bitwidth to the smallest integer size that can represent all integers less than the batch size. For example, if your batch size is 100k, set the maximum number of slots to 100k and the bitwidth to 32 (since 2^16 = 64k, a 16-bit index would be too small).
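
The bitwidth rule above can be sketched as a small helper. SmallestIndexBitWidth() is a hypothetical function written for illustration and is not part of Gandiva. It assumes indices are unsigned, which matches the 2^16 = 64k figure in the text, and that the available widths are 16, 32, and 64 bits.

```cpp
#include <cstdint>

// Hypothetical helper: returns the smallest index width (16, 32, or 64 bits)
// that can hold every row index in [0, batch_size).
// Assumption: indices are unsigned, so an N-bit index holds values up to 2^N - 1.
inline int SmallestIndexBitWidth(uint64_t batch_size) {
  if (batch_size <= (uint64_t{1} << 16)) return 16;  // largest index <= 65535
  if (batch_size <= (uint64_t{1} << 32)) return 32;  // largest index <= 2^32 - 1
  return 64;
}
```

For a batch of 100,000 rows this returns 32, and for the 4-row batch used in this guide it returns 16. The result would then decide which factory to call: SelectionVector::MakeInt16(), MakeInt32(), or MakeInt64().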

Once Evaluate() has been run and the SelectionVector is populated, use the SelectionVector::ToArray() method to get the underlying array and then ::arrow::compute::Take() to materialize the output record batch.

std::shared_ptr<gandiva::SelectionVector> result_indices;
// Use 16-bit integers for indices. Result can be no longer than input size,
// so use batch num_rows as max_slots.
status = gandiva::SelectionVector::MakeInt16(/*max_slots=*/in_batch->num_rows(), pool,
                                             &result_indices);
ARROW_RETURN_NOT_OK(status);
status = filter->Evaluate(*in_batch, result_indices);
ARROW_RETURN_NOT_OK(status);
std::shared_ptr<arrow::Array> take_indices = result_indices->ToArray();
Datum maybe_batch;
ARROW_ASSIGN_OR_RAISE(maybe_batch,
                      arrow::compute::Take(Datum(in_batch), Datum(take_indices),
                                           TakeOptions::NoBoundsCheck()));
result = maybe_batch.record_batch();
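
The two-step pattern above, where the filter yields indices and a separate take step materializes the rows, can be modeled without Arrow. This is a sketch only: ToyFilterLessThan() and ToyTake() are hypothetical helpers over std::vector and are not Gandiva or Arrow functions. They mirror the x < 3 condition and the 16-bit indices from the example.

```cpp
#include <cstddef>
#include <cstdint>
#include <vector>

// Toy filter for the condition x < limit: returns the indices of matching rows.
// Assumes values.size() fits in a 16-bit index, as in the 4-row example.
inline std::vector<uint16_t> ToyFilterLessThan(const std::vector<int32_t>& values,
                                               int32_t limit) {
  std::vector<uint16_t> indices;
  for (std::size_t i = 0; i < values.size(); ++i) {
    if (values[i] < limit) indices.push_back(static_cast<uint16_t>(i));
  }
  return indices;
}

// Toy take: gathers the rows named by indices, preserving their order.
// Like the NoBoundsCheck option above, it trusts that every index is in range.
inline std::vector<int32_t> ToyTake(const std::vector<int32_t>& values,
                                    const std::vector<uint16_t>& indices) {
  std::vector<int32_t> out;
  out.reserve(indices.size());
  for (uint16_t i : indices) out.push_back(values[i]);
  return out;
}
```

With the input {1, 2, 3, 4} and a limit of 3, the toy filter selects rows 0 and 1, and the toy take returns the corresponding values. Keeping the indices separate from the data lets several columns be gathered with the same selection.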

Evaluating projections and filters

Finally, you can also project while applying a selection vector with Projector::Evaluate(). To do so, first create the Projector with the mode returned by SelectionVector::GetMode() so that the projector is compiled for the correct bitwidth. Then pass the SelectionVector into Projector::Evaluate().

// Make sure the projector is compiled for the appropriate selection vector mode
status = Projector::Make(input_schema, expressions, result_indices->GetMode(),
                         ConfigurationBuilder::DefaultConfiguration(), &projector);
ARROW_RETURN_NOT_OK(status);

arrow::ArrayVector outputs_filtered;
status = projector->Evaluate(*in_batch, result_indices.get(), pool, &outputs_filtered);
ARROW_RETURN_NOT_OK(status);

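// The filtered outputs contain only the selected rows, so take the row count
// from outputs_filtered rather than from the unfiltered outputs.
result = arrow::RecordBatch::Make(output_schema, outputs_filtered[0]->length(),
                                  outputs_filtered);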
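
As a rough model of what projecting with a selection vector means, the sketch below computes x + addend only for the selected rows. ToyProjectAddSelected() is a hypothetical stand-alone helper and not a Gandiva function. It assumes the selection holds valid row indices into the input.

```cpp
#include <cstdint>
#include <vector>

// Toy projection of x + addend over only the rows listed in `selection`.
// The output has one entry per selected row, not one per input row.
inline std::vector<int32_t> ToyProjectAddSelected(const std::vector<int32_t>& values,
                                                  const std::vector<uint16_t>& selection,
                                                  int32_t addend) {
  std::vector<int32_t> out;
  out.reserve(selection.size());
  for (uint16_t row : selection) out.push_back(values[row] + addend);
  return out;
}
```

For the input {1, 2, 3, 4} with rows 0 and 1 selected and an addend of 3, the output has two entries. In this model the output length always equals the number of selected rows, so the expression is never evaluated on rows that the filter rejected.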