The Gandiva Expression Compiler#

Gandiva is a runtime expression compiler that uses LLVM to generate efficient native code for compute on Arrow record batches. Gandiva only handles projections and filters; for other transformations, see Compute Functions.

Gandiva was designed to take advantage of the Arrow memory format and modern hardware. From the Arrow memory model, since Arrow arrays have separate buffers for values and validity bitmaps, values and their null status can often be processed independently, allowing for better instruction pipelining. On modern hardware, compiling expressions using LLVM allows the execution to be optimized to the local runtime environment and hardware, including available SIMD instructions. To reduce optimization overhead, many Gandiva functions are pre-compiled into LLVM IR (intermediate representation).

Building Expressions#

Gandiva provides a general expression representation where expressions are represented by a tree of nodes. The expression trees are built using TreeExprBuilder. The leaves of the expression tree are typically field references, created by TreeExprBuilder::MakeField(), and literal values, created by TreeExprBuilder::MakeLiteral(). Nodes can be combined into more complex expression trees using:

Each of these functions create new composite nodes, which contain the leaf nodes (literals and field references) or other composite nodes as children. By composing these, you can create arbitrarily complex expression trees.

Once an expression tree is built, they are wrapped in either Expression or Condition, depending on how they will be used. Expression is used in projections while Condition is used in filters.

As an example, here is how to create an Expression representing x + 3 and a Condition representing x < 3:

std::shared_ptr<arrow::Field> field_x_raw = arrow::field("x", arrow::int32());
std::shared_ptr<Node> field_x = TreeExprBuilder::MakeField(field_x_raw);
std::shared_ptr<Node> literal_3 = TreeExprBuilder::MakeLiteral(3);
std::shared_ptr<arrow::Field> field_result = arrow::field("result", arrow::int32());

std::shared_ptr<Node> add_node =
    TreeExprBuilder::MakeFunction("add", {field_x, literal_3}, arrow::int32());
std::shared_ptr<Expression> expression =
    TreeExprBuilder::MakeExpression(add_node, field_result);

std::shared_ptr<Node> less_than_node =
    TreeExprBuilder::MakeFunction("less_than", {field_x, literal_3}, arrow::boolean());
std::shared_ptr<Condition> condition = TreeExprBuilder::MakeCondition(less_than_node);

Projectors and Filters#

Gandiva’s two execution kernels are Projector and Filter. Projector consumes a record batch and projects into a new record batch. Filter consumes a record batch and produces a SelectionVector containing the indices that matched the condition.

For both Projector and Filter, optimization of the expression IR happens when creating instances. They are compiled against a static schema, so the schema of the record batches must be known at this point.

Continuing with the expression and condition created in the previous section, here is an example of creating a Projector and a Filter:

std::shared_ptr<arrow::Schema> input_schema = arrow::schema({field_x_raw});
std::shared_ptr<arrow::Schema> output_schema = arrow::schema({field_result});
std::shared_ptr<Projector> projector;
Status status;
std::vector<std::shared_ptr<Expression>> expressions = {expression};
status = Projector::Make(input_schema, expressions, &projector);
ARROW_RETURN_NOT_OK(status);

std::shared_ptr<Filter> filter;
status = Filter::Make(input_schema, condition, &filter);
ARROW_RETURN_NOT_OK(status);

Once a Projector or Filter is created, it can be evaluated on Arrow record batches. These execution kernels are single-threaded on their own, but are designed to be reused to process distinct record batches in parallel.

Evaluating projections#

Execution is performed with Projector::Evaluate(). This outputs a vector of arrays, which can be passed along with the output schema to arrow::RecordBatch::Make().

auto pool = arrow::default_memory_pool();
int num_records = 4;
arrow::Int32Builder builder;
int32_t values[4] = {1, 2, 3, 4};
ARROW_RETURN_NOT_OK(builder.AppendValues(values, 4));
ARROW_ASSIGN_OR_RAISE(std::shared_ptr<arrow::Array> array, builder.Finish());
auto in_batch = arrow::RecordBatch::Make(input_schema, num_records, {array});

arrow::ArrayVector outputs;
status = projector->Evaluate(*in_batch, pool, &outputs);
ARROW_RETURN_NOT_OK(status);
std::shared_ptr<arrow::RecordBatch> result =
    arrow::RecordBatch::Make(output_schema, outputs[0]->length(), outputs);

Evaluating filters#

Filter::Evaluate() produces SelectionVector, a vector of row indices that matched the filter condition. The selection vector is a wrapper around an arrow integer array, parameterized by bitwidth. When creating the selection vector (you must initialize it before passing to Evaluate()), you must choose the bitwidth, which determines the max index value it can hold, and the max number of slots, which determines how many indices it may contain. In general, the max number of slots should be set to your batch size and the bitwidth the smallest integer size that can represent all integers less than the batch size. For example, if your batch size is 100k, set the maximum number of slots to 100k and the bitwidth to 32 (since 2^16 = 64k which would be too small).

Once Evaluate() has been run and the SelectionVector is populated, use the SelectionVector::ToArray() method to get the underlying array and then ::arrow::compute::Take() to materialize the output record batch.

std::shared_ptr<gandiva::SelectionVector> result_indices;
// Use 16-bit integers for indices. Result can be no longer than input size,
// so use batch num_rows as max_slots.
status = gandiva::SelectionVector::MakeInt16(/*max_slots=*/in_batch->num_rows(), pool,
                                             &result_indices);
ARROW_RETURN_NOT_OK(status);
status = filter->Evaluate(*in_batch, result_indices);
ARROW_RETURN_NOT_OK(status);
std::shared_ptr<arrow::Array> take_indices = result_indices->ToArray();
Datum maybe_batch;
ARROW_ASSIGN_OR_RAISE(maybe_batch,
                      arrow::compute::Take(Datum(in_batch), Datum(take_indices),
                                           TakeOptions::NoBoundsCheck()));
result = maybe_batch.record_batch();

Evaluating projections and filters#

Finally, you can also project while apply a selection vector, with Projector::Evaluate(). To do so, first make sure to initialize the Projector with SelectionVector::GetMode() so that the projector compiles with the correct bitwidth. Then you can pass the SelectionVector into the Projector::Evaluate() method.

// Make sure the projector is compiled for the appropriate selection vector mode
status = Projector::Make(input_schema, expressions, result_indices->GetMode(),
                         ConfigurationBuilder::DefaultConfiguration(), &projector);
ARROW_RETURN_NOT_OK(status);

arrow::ArrayVector outputs_filtered;
status = projector->Evaluate(*in_batch, result_indices.get(), pool, &outputs_filtered);
ARROW_RETURN_NOT_OK(status);

result =
    arrow::RecordBatch::Make(output_schema, outputs[0]->length(), outputs_filtered);