Substrait
The arrow-dataset
module can execute Substrait plans via the Acero
query engine.
Executing Queries Using Substrait Plans
Plans can reference data in files via URIs, or “named tables” that must be provided along with the plan.
Here is an example of a Java program that queries a Parquet file using Java Substrait (this example uses the Substrait Java project to compile a SQL query to a Substrait plan):
import com.google.common.collect.ImmutableList;
import io.substrait.isthmus.SqlToSubstrait;
import io.substrait.proto.Plan;
import org.apache.arrow.dataset.file.FileFormat;
import org.apache.arrow.dataset.file.FileSystemDatasetFactory;
import org.apache.arrow.dataset.jni.NativeMemoryPool;
import org.apache.arrow.dataset.scanner.ScanOptions;
import org.apache.arrow.dataset.scanner.Scanner;
import org.apache.arrow.dataset.source.Dataset;
import org.apache.arrow.dataset.source.DatasetFactory;
import org.apache.arrow.dataset.substrait.AceroSubstraitConsumer;
import org.apache.arrow.memory.BufferAllocator;
import org.apache.arrow.memory.RootAllocator;
import org.apache.arrow.vector.ipc.ArrowReader;
import org.apache.calcite.sql.parser.SqlParseException;
import java.nio.ByteBuffer;
import java.util.HashMap;
import java.util.Map;
/**
 * Example program: compiles a SQL query into a Substrait plan and runs it with
 * {@link AceroSubstraitConsumer} against a Parquet file.
 *
 * <p>The plan refers to its input by the table name {@code NATION}; {@link #main} binds that name
 * to an {@link ArrowReader} that scans the Parquet file, then prints each result batch as TSV.
 */
public class ClientSubstrait {
  /**
   * Scans {@code nation.parquet}, runs the Substrait plan from {@link #getPlan()} over it and
   * prints every result batch.
   *
   * @param args unused
   * @throws RuntimeException wrapping any failure raised while scanning, planning or executing
   */
  public static void main(String[] args) {
    String uri = "file:///data/tpch_parquet/nation.parquet";
    ScanOptions options = new ScanOptions(/*batchSize*/ 32768);
    try (
        BufferAllocator allocator = new RootAllocator();
        DatasetFactory datasetFactory = new FileSystemDatasetFactory(allocator, NativeMemoryPool.getDefault(),
            FileFormat.PARQUET, uri);
        Dataset dataset = datasetFactory.finish();
        Scanner scanner = dataset.newScan(options);
        ArrowReader reader = scanner.scanBatches()
    ) {
      // Bind the table name used by the plan ("NATION") to the reader over the Parquet file.
      Map<String, ArrowReader> mapTableToArrowReader = new HashMap<>();
      mapTableToArrowReader.put("NATION", reader);
      // Serialize the plan once and copy the bytes into a direct ByteBuffer for runQuery.
      byte[] planBytes = getPlan().toByteArray();
      ByteBuffer substraitPlan = ByteBuffer.allocateDirect(planBytes.length);
      substraitPlan.put(planBytes);
      // Run the query and print each result batch.
      try (ArrowReader arrowReader = new AceroSubstraitConsumer(allocator).runQuery(
          substraitPlan,
          mapTableToArrowReader
      )) {
        while (arrowReader.loadNextBatch()) {
          System.out.println(arrowReader.getVectorSchemaRoot().contentToTSVString());
        }
      }
    } catch (Exception e) {
      // Propagate the failure (keeping the cause) instead of printing it and exiting normally.
      throw new RuntimeException(e);
    }
  }

  /**
   * Compiles {@code SELECT * from nation} into a Substrait {@link Plan}, using a
   * {@code CREATE TABLE} statement to describe the NATION table's columns.
   *
   * @return the compiled Substrait plan
   * @throws SqlParseException if the SQL query or the table definition cannot be parsed
   */
  static Plan getPlan() throws SqlParseException {
    String sql = "SELECT * from nation";
    String nation = "CREATE TABLE NATION (N_NATIONKEY BIGINT NOT NULL, N_NAME CHAR(25), " +
        "N_REGIONKEY BIGINT NOT NULL, N_COMMENT VARCHAR(152))";
    SqlToSubstrait sqlToSubstrait = new SqlToSubstrait();
    return sqlToSubstrait.execute(sql, ImmutableList.of(nation));
  }
}
// Results example:
FieldPath(0) FieldPath(1) FieldPath(2) FieldPath(3)
0 ALGERIA 0 haggle. carefully final deposits detect slyly agai
1 ARGENTINA 1 al foxes promise slyly according to the regular accounts. bold requests alon
Executing Projections and Filters Using Extended Expressions
The Dataset module also supports projections and filters expressed as Substrait Extended Expressions. The example below uses the substrait-java library to compile SQL expressions into Extended Expressions.
This Java program:
- Loads a Parquet file containing the “nation” table from the TPC-H benchmark.
- Applies a filter:
  - N_NATIONKEY > 18
- Projects two new columns:
  - N_REGIONKEY + 10
  - N_NAME || ' - ' || N_COMMENT
import com.google.common.collect.ImmutableList;
import io.substrait.isthmus.SqlExpressionToSubstrait;
import io.substrait.proto.ExtendedExpression;
import org.apache.arrow.dataset.file.FileFormat;
import org.apache.arrow.dataset.file.FileSystemDatasetFactory;
import org.apache.arrow.dataset.jni.NativeMemoryPool;
import org.apache.arrow.dataset.scanner.ScanOptions;
import org.apache.arrow.dataset.scanner.Scanner;
import org.apache.arrow.dataset.source.Dataset;
import org.apache.arrow.dataset.source.DatasetFactory;
import org.apache.arrow.memory.BufferAllocator;
import org.apache.arrow.memory.RootAllocator;
import org.apache.arrow.vector.ipc.ArrowReader;
import org.apache.calcite.sql.parser.SqlParseException;
import java.nio.ByteBuffer;
import java.util.Base64;
import java.util.Optional;
/**
 * Example program: scans a Parquet file with a Substrait filter and projection.
 *
 * <p>The filter and projection are written as SQL expressions and compiled into Substrait
 * Extended Expressions before being set on the {@link ScanOptions}.
 */
public class ClientSubstraitExtendedExpressionsCookbook {
  /**
   * Entry point; runs {@link #projectAndFilterDataset()}.
   *
   * @param args unused
   * @throws SqlParseException if one of the SQL expressions cannot be parsed
   */
  public static void main(String[] args) throws SqlParseException {
    projectAndFilterDataset();
  }

  /**
   * Scans {@code nation.parquet}, keeping rows where {@code N_NATIONKEY > 18}, and projects
   * {@code N_REGIONKEY + 10} and {@code N_NAME || ' - ' || N_COMMENT}. Each result batch is
   * printed as TSV.
   *
   * @throws SqlParseException if one of the SQL expressions cannot be parsed
   * @throws RuntimeException wrapping any failure raised while scanning the dataset
   */
  private static void projectAndFilterDataset() throws SqlParseException {
    String uri = "file:///Users/data/tpch_parquet/nation.parquet";
    ScanOptions options =
        new ScanOptions.Builder(/*batchSize*/ 32768)
            .columns(Optional.empty())
            .substraitFilter(getByteBuffer(new String[]{"N_NATIONKEY > 18"}))
            .substraitProjection(getByteBuffer(new String[]{"N_REGIONKEY + 10",
                "N_NAME || CAST(' - ' as VARCHAR) || N_COMMENT"}))
            .build();
    try (BufferAllocator allocator = new RootAllocator();
        DatasetFactory datasetFactory =
            new FileSystemDatasetFactory(
                allocator, NativeMemoryPool.getDefault(), FileFormat.PARQUET, uri);
        Dataset dataset = datasetFactory.finish();
        Scanner scanner = dataset.newScan(options);
        ArrowReader reader = scanner.scanBatches()) {
      while (reader.loadNextBatch()) {
        System.out.println(reader.getVectorSchemaRoot().contentToTSVString());
      }
    } catch (Exception e) {
      throw new RuntimeException(e);
    }
  }

  /**
   * Compiles SQL expressions over the NATION schema into a serialized Substrait
   * {@link ExtendedExpression}.
   *
   * @param sqlExpression SQL expressions to compile, resolved against the NATION table
   * @return a direct buffer containing the serialized ExtendedExpression bytes
   * @throws SqlParseException if an expression or the table definition cannot be parsed
   */
  private static ByteBuffer getByteBuffer(String[] sqlExpression) throws SqlParseException {
    String schema =
        "CREATE TABLE NATION (N_NATIONKEY INT NOT NULL, N_NAME VARCHAR, "
            + "N_REGIONKEY INT NOT NULL, N_COMMENT VARCHAR)";
    SqlExpressionToSubstrait expressionToSubstrait = new SqlExpressionToSubstrait();
    ExtendedExpression expression =
        expressionToSubstrait.convert(sqlExpression, ImmutableList.of(schema));
    // Use the protobuf bytes directly: a Base64 encode/decode round trip yields the same bytes.
    byte[] expressionToByte = expression.toByteArray();
    ByteBuffer byteBuffer = ByteBuffer.allocateDirect(expressionToByte.length);
    byteBuffer.put(expressionToByte);
    return byteBuffer;
  }
}
column-1 column-2
13 ROMANIA - ular asymptotes are about the furious multipliers. express dependencies nag above the ironically ironic account
14 SAUDI ARABIA - ts. silent requests haggle. closely express packages sleep across the blithely
12 VIETNAM - hely enticingly express accounts. even, final
13 RUSSIA - requests against the platelets use never according to the quickly regular pint
13 UNITED KINGDOM - eans boost carefully special requests. accounts are. carefull
11 UNITED STATES - y final packages. slow foxes cajole quickly. quickly silent platelets breach ironic accounts. unusual pinto be