Substrait¶
Arrow can use Substrait to integrate with other languages.
Querying Datasets¶
The Substrait support in Arrow combines Dataset and substrait-java to query datasets using Acero as a backend.
Acero currently supports:
Reading Arrow, CSV, ORC, and Parquet files
Filters
Projections
Joins
Aggregates
Here is an example of a Java program that queries a Parquet file:
import com.google.common.collect.ImmutableList;
import io.substrait.isthmus.SqlToSubstrait;
import io.substrait.proto.Plan;
import org.apache.arrow.dataset.file.FileFormat;
import org.apache.arrow.dataset.file.FileSystemDatasetFactory;
import org.apache.arrow.dataset.jni.NativeMemoryPool;
import org.apache.arrow.dataset.scanner.ScanOptions;
import org.apache.arrow.dataset.scanner.Scanner;
import org.apache.arrow.dataset.source.Dataset;
import org.apache.arrow.dataset.source.DatasetFactory;
import org.apache.arrow.dataset.substrait.AceroSubstraitConsumer;
import org.apache.arrow.memory.BufferAllocator;
import org.apache.arrow.memory.RootAllocator;
import org.apache.arrow.vector.ipc.ArrowReader;
import org.apache.calcite.sql.parser.SqlParseException;
import java.nio.ByteBuffer;
import java.util.HashMap;
import java.util.Map;
static Plan queryTableNation() throws SqlParseException {
String sql = "SELECT * FROM NATION WHERE N_NATIONKEY = 17";
String nation = "CREATE TABLE NATION (N_NATIONKEY BIGINT NOT NULL, N_NAME CHAR(25), " +
"N_REGIONKEY BIGINT NOT NULL, N_COMMENT VARCHAR(152))";
SqlToSubstrait sqlToSubstrait = new SqlToSubstrait();
Plan plan = sqlToSubstrait.execute(sql, ImmutableList.of(nation));
return plan;
}
static void queryDatasetThruSubstraitPlanDefinition() {
String uri = "file:" + System.getProperty("user.dir") + "/thirdpartydeps/tpch/nation.parquet";
ScanOptions options = new ScanOptions(/*batchSize*/ 32768);
try (
BufferAllocator allocator = new RootAllocator();
DatasetFactory datasetFactory = new FileSystemDatasetFactory(allocator, NativeMemoryPool.getDefault(),
FileFormat.PARQUET, uri);
Dataset dataset = datasetFactory.finish();
Scanner scanner = dataset.newScan(options);
ArrowReader reader = scanner.scanBatches()
) {
Map<String, ArrowReader> mapTableToArrowReader = new HashMap<>();
mapTableToArrowReader.put("NATION", reader);
// get binary plan
Plan plan = queryTableNation();
ByteBuffer substraitPlan = ByteBuffer.allocateDirect(plan.toByteArray().length);
substraitPlan.put(plan.toByteArray());
// run query
try (ArrowReader arrowReader = new AceroSubstraitConsumer(allocator).runQuery(
substraitPlan,
mapTableToArrowReader
)) {
while (arrowReader.loadNextBatch()) {
System.out.print(arrowReader.getVectorSchemaRoot().contentToTSVString());
}
}
} catch (Exception e) {
e.printStackTrace();
}
}
queryDatasetThruSubstraitPlanDefinition();
N_NATIONKEY N_NAME N_REGIONKEY N_COMMENT
17 PERU 1 platelets. blithely pending dependencies use fluffily across the even pinto beans. carefully silent accoun
It is also possible to query multiple datasets and join them based on some criteria. For example, we can join the nation and customer tables from the TPC-H benchmark:
import com.google.common.collect.ImmutableList;
import io.substrait.isthmus.SqlToSubstrait;
import io.substrait.proto.Plan;
import org.apache.arrow.dataset.file.FileFormat;
import org.apache.arrow.dataset.file.FileSystemDatasetFactory;
import org.apache.arrow.dataset.jni.NativeMemoryPool;
import org.apache.arrow.dataset.scanner.ScanOptions;
import org.apache.arrow.dataset.scanner.Scanner;
import org.apache.arrow.dataset.source.Dataset;
import org.apache.arrow.dataset.source.DatasetFactory;
import org.apache.arrow.dataset.substrait.AceroSubstraitConsumer;
import org.apache.arrow.memory.BufferAllocator;
import org.apache.arrow.memory.RootAllocator;
import org.apache.arrow.vector.ipc.ArrowReader;
import org.apache.calcite.sql.parser.SqlParseException;
import java.nio.ByteBuffer;
import java.util.HashMap;
import java.util.Map;
static Plan queryTableNationJoinCustomer() throws SqlParseException {
String sql = "SELECT n.n_name, COUNT(*) AS NUMBER_CUSTOMER FROM NATION n JOIN CUSTOMER c " +
"ON n.n_nationkey = c.c_nationkey WHERE n.n_nationkey = 17 " +
"GROUP BY n.n_name";
String nation = "CREATE TABLE NATION (N_NATIONKEY BIGINT NOT NULL, " +
"N_NAME CHAR(25), N_REGIONKEY BIGINT NOT NULL, N_COMMENT VARCHAR(152))";
String customer = "CREATE TABLE CUSTOMER (C_CUSTKEY BIGINT NOT NULL, " +
"C_NAME VARCHAR(25), C_ADDRESS VARCHAR(40), C_NATIONKEY BIGINT NOT NULL, " +
"C_PHONE CHAR(15), C_ACCTBAL DECIMAL, C_MKTSEGMENT CHAR(10), " +
"C_COMMENT VARCHAR(117) )";
SqlToSubstrait sqlToSubstrait = new SqlToSubstrait();
Plan plan = sqlToSubstrait.execute(sql,
ImmutableList.of(nation, customer));
return plan;
}
static void queryTwoDatasetsThruSubstraitPlanDefinition() {
String uriNation = "file:" + System.getProperty("user.dir") + "/thirdpartydeps/tpch/nation.parquet";
String uriCustomer = "file:" + System.getProperty("user.dir") + "/thirdpartydeps/tpch/customer.parquet";
ScanOptions options = new ScanOptions(/*batchSize*/ 32768);
try (
BufferAllocator allocator = new RootAllocator();
DatasetFactory datasetFactory = new FileSystemDatasetFactory(
allocator, NativeMemoryPool.getDefault(),
FileFormat.PARQUET, uriNation);
Dataset dataset = datasetFactory.finish();
Scanner scanner = dataset.newScan(options);
ArrowReader readerNation = scanner.scanBatches();
DatasetFactory datasetFactoryCustomer = new FileSystemDatasetFactory(
allocator, NativeMemoryPool.getDefault(),
FileFormat.PARQUET, uriCustomer);
Dataset datasetCustomer = datasetFactoryCustomer.finish();
Scanner scannerCustomer = datasetCustomer.newScan(options);
ArrowReader readerCustomer = scannerCustomer.scanBatches()
) {
// map table to reader
Map<String, ArrowReader> mapTableToArrowReader = new HashMap<>();
mapTableToArrowReader.put("NATION", readerNation);
mapTableToArrowReader.put("CUSTOMER", readerCustomer);
// get binary plan
Plan plan = queryTableNationJoinCustomer();
ByteBuffer substraitPlan = ByteBuffer.allocateDirect(
plan.toByteArray().length);
substraitPlan.put(plan.toByteArray());
// run query
try (ArrowReader arrowReader = new AceroSubstraitConsumer(
allocator).runQuery(
substraitPlan,
mapTableToArrowReader
)) {
while (arrowReader.loadNextBatch()) {
System.out.print(arrowReader.getVectorSchemaRoot().contentToTSVString());
}
}
} catch (Exception e) {
e.printStackTrace();
}
}
queryTwoDatasetsThruSubstraitPlanDefinition();
N_NAME NUMBER_CUSTOMER
PERU 573