Substrait#
The arrow-substrait
module implements support for the Substrait format,
enabling conversion to and from Arrow objects.
The arrow-dataset
module can execute Substrait plans via the
Acero query engine.
Working with Schemas#
Arrow schemas can be encoded and decoded using the pyarrow.substrait.serialize_schema()
and
pyarrow.substrait.deserialize_schema()
functions.
import pyarrow as pa
import pyarrow.substrait as pa_substrait

# Describe a two-column table: a 32-bit integer "x" and a string "y".
fields = [
    pa.field("x", pa.int32()),
    pa.field("y", pa.string()),
]
arrow_schema = pa.schema(fields)

# Encode the Arrow schema into its Substrait representation.
substrait_schema = pa_substrait.serialize_schema(arrow_schema)
The schema marshalled as a Substrait NamedStruct
is directly
available as substrait_schema.schema
:
>>> print(substrait_schema.schema)
b'\n\x01x\n\x01y\x12\x0c\n\x04*\x02\x10\x01\n\x04b\x02\x10\x01'
If custom Arrow types were used, the schema requires extensions for those types to be usable. For this reason, the schema is also available as an Extended Expression that includes all the extension types:
>>> print(substrait_schema.expression)
b'"\x14\n\x01x\n\x01y\x12\x0c\n\x04*\x02\x10\x01\n\x04b\x02\x10\x01:\x19\x10,*\x15Acero 17.0.0'
If Substrait Python
is installed, the schema can also be converted to
a substrait-python
object:
>>> print(substrait_schema.to_pysubstrait())
version {
minor_number: 44
producer: "Acero 17.0.0"
}
base_schema {
names: "x"
names: "y"
struct {
types {
i32 {
nullability: NULLABILITY_NULLABLE
}
}
types {
string {
nullability: NULLABILITY_NULLABLE
}
}
}
}
Working with Expressions#
Arrow compute expressions can be encoded and decoded using the
pyarrow.substrait.serialize_expressions()
and
pyarrow.substrait.deserialize_expressions()
functions.
import pyarrow as pa
# The compute module must be aliased as `pc`: the expression below uses
# `pc.field`, and aliasing it as `pa` would shadow the top-level package.
import pyarrow.compute as pc
import pyarrow.substrait as pa_substrait

# Schema the expression is bound against: two int32 columns.
arrow_schema = pa.schema([
    pa.field("x", pa.int32()),
    pa.field("y", pa.int32()),
])

# Serialize the compute expression `x + y`, naming its output "total".
substrait_expr = pa_substrait.serialize_expressions(
    exprs=[pc.field("x") + pc.field("y")],
    names=["total"],
    schema=arrow_schema,
)
The result of encoding an expression to Substrait is the serialized
protobuf ExtendedExpression
message data itself:
>>> print(bytes(substrait_expr))
b'\nZ\x12Xhttps://github.com/substrait-io/substrait/blob/main/extensions/functions_arithmetic.yaml\x12\x07\x1a\x05\x1a\x03add\x1a>\n5\x1a3\x1a\x04*\x02\x10\x01"\n\x1a\x08\x12\x06\n\x02\x12\x00"\x00"\x0c\x1a\n\x12\x08\n\x04\x12\x02\x08\x01"\x00*\x11\n\x08overflow\x12\x05ERROR\x1a\x05total"\x14\n\x01x\n\x01y\x12\x0c\n\x04*\x02\x10\x01\n\x04*\x02\x10\x01:\x19\x10,*\x15Acero 17.0.0'
So if a Substrait Python
object is required, the expression
has to be decoded using substrait-python
itself:
>>> import substrait
>>> pysubstrait_expr = substrait.proto.ExtendedExpression.FromString(substrait_expr)
>>> print(pysubstrait_expr)
version {
minor_number: 44
producer: "Acero 17.0.0"
}
extension_uris {
uri: "https://github.com/substrait-io/substrait/blob/main/extensions/functions_arithmetic.yaml"
}
extensions {
extension_function {
name: "add"
}
}
referred_expr {
expression {
scalar_function {
arguments {
value {
selection {
direct_reference {
struct_field {
}
}
root_reference {
}
}
}
}
arguments {
value {
selection {
direct_reference {
struct_field {
field: 1
}
}
root_reference {
}
}
}
}
options {
name: "overflow"
preference: "ERROR"
}
output_type {
i32 {
nullability: NULLABILITY_NULLABLE
}
}
}
}
output_names: "total"
}
base_schema {
names: "x"
names: "y"
struct {
types {
i32 {
nullability: NULLABILITY_NULLABLE
}
}
types {
i32 {
nullability: NULLABILITY_NULLABLE
}
}
}
}
Executing Queries Using Substrait Extended Expressions#
Dataset supports executing queries using Substrait’s Extended Expressions.
The expressions can be passed to the dataset scanner in the form of
pyarrow.substrait.BoundExpressions:
import pyarrow.dataset as ds
import pyarrow.substrait as pa_substrait

# Use substrait-python to create the queries
from substrait import proto

dataset = ds.dataset("./data/index-0.parquet")

# Substrait message carrying the dataset schema; it is merged into each
# expression below so the field references can be resolved.
substrait_schema = pa_substrait.serialize_schema(dataset.schema).to_pysubstrait()

# SELECT project_name FROM dataset WHERE project_name = 'pyarrow'

# Projection: select struct field 0 (assumed to be `project_name` in this
# dataset) and expose it under the output name "project_name".
projection = proto.ExtendedExpression(referred_expr=[
    {"expression": {"selection": {"direct_reference": {"struct_field": {"field": 0}}}},
     "output_names": ["project_name"]}
])
projection.MergeFrom(substrait_schema)

# Filter: equal(field 0, 'pyarrow'). The anchors 99 and 199 are arbitrary ids
# that link the function reference to its extension declaration.
# NOTE(review): `"nullability": False` is passed to an enum field; presumably
# it is coerced to 0 (NULLABILITY_UNSPECIFIED) -- confirm this is intended.
filtering = proto.ExtendedExpression(
    extension_uris=[{"extension_uri_anchor": 99, "uri": "/functions_comparison.yaml"}],
    extensions=[{"extension_function": {"extension_uri_reference": 99, "function_anchor": 199, "name": "equal:any1_any1"}}],
    referred_expr=[
        {"expression": {"scalar_function": {"function_reference": 199, "arguments": [
            {"value": {"selection": {"direct_reference": {"struct_field": {"field": 0}}}}},
            {"value": {"literal": {"string": "pyarrow"}}}
        ], "output_type": {"bool": {"nullability": False}}}}}
    ]
)
filtering.MergeFrom(substrait_schema)

# Use the `pa_substrait` alias imported above; `pa` is not imported in this
# example, so `pa.substrait...` would not resolve.
results = dataset.scanner(
    columns=pa_substrait.BoundExpressions.from_substrait(projection),
    filter=pa_substrait.BoundExpressions.from_substrait(filtering)
).head(5)
project_name
0 pyarrow
1 pyarrow
2 pyarrow
3 pyarrow
4 pyarrow