.. Licensed to the Apache Software Foundation (ASF) under one
.. or more contributor license agreements.  See the NOTICE file
.. distributed with this work for additional information
.. regarding copyright ownership.  The ASF licenses this file
.. to you under the Apache License, Version 2.0 (the
.. "License"); you may not use this file except in compliance
.. with the License.  You may obtain a copy of the License at

..   http://www.apache.org/licenses/LICENSE-2.0

.. Unless required by applicable law or agreed to in writing,
.. software distributed under the License is distributed on an
.. "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
.. KIND, either express or implied.  See the License for the
.. specific language governing permissions and limitations
.. under the License.

=========
Substrait
=========

The ``arrow-dataset`` module can execute Substrait_ plans via the `Acero`_
query engine.

Executing Queries Using Substrait Plans
=======================================

Plans can reference data in files via URIs, or "named tables" that must be
provided along with the plan.

Here is an example of a Java program that queries a Parquet file using a
Substrait plan. It uses the `Substrait Java`_ project to compile a SQL query
to a Substrait plan, and it supplies the Parquet data to the plan as the named
table ``NATION``:

.. code-block:: Java

   import com.google.common.collect.ImmutableList;
   import io.substrait.isthmus.SqlToSubstrait;
   import io.substrait.proto.Plan;
   import org.apache.arrow.dataset.file.FileFormat;
   import org.apache.arrow.dataset.file.FileSystemDatasetFactory;
   import org.apache.arrow.dataset.jni.NativeMemoryPool;
   import org.apache.arrow.dataset.scanner.ScanOptions;
   import org.apache.arrow.dataset.scanner.Scanner;
   import org.apache.arrow.dataset.source.Dataset;
   import org.apache.arrow.dataset.source.DatasetFactory;
   import org.apache.arrow.dataset.substrait.AceroSubstraitConsumer;
   import org.apache.arrow.memory.BufferAllocator;
   import org.apache.arrow.memory.RootAllocator;
   import org.apache.arrow.vector.ipc.ArrowReader;
   import org.apache.calcite.sql.parser.SqlParseException;

   import java.nio.ByteBuffer;
   import java.util.HashMap;
   import java.util.Map;

   public class ClientSubstrait {
       public static void main(String[] args) {
           String uri = "file:///data/tpch_parquet/nation.parquet";
           ScanOptions options = new ScanOptions(/*batchSize*/ 32768);
           try (
               BufferAllocator allocator = new RootAllocator();
               DatasetFactory datasetFactory = new FileSystemDatasetFactory(allocator, NativeMemoryPool.getDefault(),
                       FileFormat.PARQUET, uri);
               Dataset dataset = datasetFactory.finish();
               Scanner scanner = dataset.newScan(options);
               ArrowReader reader = scanner.scanBatches()
           ) {
               // map the named table "NATION" referenced by the plan to the Parquet reader
               Map<String, ArrowReader> mapTableToArrowReader = new HashMap<>();
               mapTableToArrowReader.put("NATION", reader);
               // serialize the plan and copy it into a direct buffer
               Plan plan = getPlan();
               byte[] planBytes = plan.toByteArray();
               ByteBuffer substraitPlan = ByteBuffer.allocateDirect(planBytes.length);
               substraitPlan.put(planBytes);
               // run query
               try (ArrowReader arrowReader = new AceroSubstraitConsumer(allocator).runQuery(
                   substraitPlan,
                   mapTableToArrowReader
               )) {
                   while (arrowReader.loadNextBatch()) {
                       System.out.println(arrowReader.getVectorSchemaRoot().contentToTSVString());
                   }
               }
           } catch (Exception e) {
               e.printStackTrace();
           }
       }

       static Plan getPlan() throws SqlParseException {
           String sql = "SELECT * from nation";
           String nation = "CREATE TABLE NATION (N_NATIONKEY BIGINT NOT NULL, N_NAME CHAR(25), " +
                   "N_REGIONKEY BIGINT NOT NULL, N_COMMENT VARCHAR(152))";
           SqlToSubstrait sqlToSubstrait = new SqlToSubstrait();
           Plan plan = sqlToSubstrait.execute(sql, ImmutableList.of(nation));
           return plan;
       }
   }

.. code-block:: text

   // Results example:
   FieldPath(0)	FieldPath(1)	FieldPath(2)	FieldPath(3)
   0	ALGERIA	0	 haggle. carefully final deposits detect slyly agai
   1	ARGENTINA	1	al foxes promise slyly according to the regular accounts. bold requests alon

Executing Projections and Filters Using Extended Expressions
============================================================

Datasets also support projections and filters defined as Substrait
`Extended Expression`_ messages. This requires the `Substrait Java`_ library.

This Java program:

- Loads a Parquet file containing the "nation" table from the TPC-H benchmark.
- Applies a filter:

  - ``N_NATIONKEY > 18``

- Projects two new columns:

  - ``N_REGIONKEY + 10``
  - ``N_NAME || ' - ' || N_COMMENT``

.. code-block:: Java

   import com.google.common.collect.ImmutableList;
   import io.substrait.isthmus.SqlExpressionToSubstrait;
   import io.substrait.proto.ExtendedExpression;
   import org.apache.arrow.dataset.file.FileFormat;
   import org.apache.arrow.dataset.file.FileSystemDatasetFactory;
   import org.apache.arrow.dataset.jni.NativeMemoryPool;
   import org.apache.arrow.dataset.scanner.ScanOptions;
   import org.apache.arrow.dataset.scanner.Scanner;
   import org.apache.arrow.dataset.source.Dataset;
   import org.apache.arrow.dataset.source.DatasetFactory;
   import org.apache.arrow.memory.BufferAllocator;
   import org.apache.arrow.memory.RootAllocator;
   import org.apache.arrow.vector.ipc.ArrowReader;
   import org.apache.calcite.sql.parser.SqlParseException;

   import java.nio.ByteBuffer;
   import java.util.Optional;

   public class ClientSubstraitExtendedExpressionsCookbook {

       public static void main(String[] args) throws SqlParseException {
           projectAndFilterDataset();
       }

       private static void projectAndFilterDataset() throws SqlParseException {
           String uri = "file:///Users/data/tpch_parquet/nation.parquet";
           ScanOptions options = new ScanOptions.Builder(/*batchSize*/ 32768)
                   .columns(Optional.empty())
                   .substraitFilter(getByteBuffer(new String[]{"N_NATIONKEY > 18"}))
                   .substraitProjection(getByteBuffer(new String[]{"N_REGIONKEY + 10",
                           "N_NAME || CAST(' - ' as VARCHAR) || N_COMMENT"}))
                   .build();
           try (BufferAllocator allocator = new RootAllocator();
                DatasetFactory datasetFactory = new FileSystemDatasetFactory(
                        allocator, NativeMemoryPool.getDefault(),
                        FileFormat.PARQUET, uri);
                Dataset dataset = datasetFactory.finish();
                Scanner scanner = dataset.newScan(options);
                ArrowReader reader = scanner.scanBatches()) {
               while (reader.loadNextBatch()) {
                   System.out.println(reader.getVectorSchemaRoot().contentToTSVString());
               }
           } catch (Exception e) {
               throw new RuntimeException(e);
           }
       }

       private static ByteBuffer getByteBuffer(String[] sqlExpression) throws SqlParseException {
           String schema = "CREATE TABLE NATION (N_NATIONKEY INT NOT NULL, N_NAME VARCHAR, " +
                   "N_REGIONKEY INT NOT NULL, N_COMMENT VARCHAR)";
           SqlExpressionToSubstrait expressionToSubstrait = new SqlExpressionToSubstrait();
           ExtendedExpression expression = expressionToSubstrait.convert(sqlExpression,
                   ImmutableList.of(schema));
           // serialize the extended expression and copy it into a direct buffer
           byte[] expressionToByte = expression.toByteArray();
           ByteBuffer byteBuffer = ByteBuffer.allocateDirect(expressionToByte.length);
           byteBuffer.put(expressionToByte);
           return byteBuffer;
       }
   }

.. code-block:: text

   column-1	column-2
   13	ROMANIA - ular asymptotes are about the furious multipliers. express dependencies nag above the ironically ironic account
   14	SAUDI ARABIA - ts. silent requests haggle. closely express packages sleep across the blithely
   12	VIETNAM - hely enticingly express accounts. even, final
   13	RUSSIA - requests against the platelets use never according to the quickly regular pint
   13	UNITED KINGDOM - eans boost carefully special requests. accounts are. carefull
   11	UNITED STATES - y final packages. slow foxes cajole quickly. quickly silent platelets breach ironic accounts. unusual pinto be

.. _`Substrait`: https://substrait.io/
.. _`Substrait Java`: https://github.com/substrait-io/substrait-java
.. _`Acero`: https://arrow.apache.org/docs/cpp/streaming_execution.html
.. _`Extended Expression`: https://github.com/substrait-io/substrait/blob/main/site/docs/expressions/extended_expression.md
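
Both examples above copy a serialized protobuf message into a direct
``ByteBuffer`` by hand before passing it to Arrow. If your code builds several
such buffers, a small helper can remove the repetition. The following is a
minimal sketch that uses only ``java.nio``. The ``SubstraitBuffers`` class and
its ``toDirectBuffer`` method are hypothetical names, not part of Arrow or
Substrait Java, and the sketch only mirrors the allocate-and-copy pattern the
examples already use:

.. code-block:: Java

   import java.nio.ByteBuffer;

   /**
    * Hypothetical helper, not part of Arrow or Substrait Java: copies a
    * serialized Substrait message (for example Plan#toByteArray() or
    * ExtendedExpression#toByteArray()) into a direct ByteBuffer.
    */
   public final class SubstraitBuffers {

       private SubstraitBuffers() {
       }

       public static ByteBuffer toDirectBuffer(byte[] serialized) {
           // Allocate off-heap memory sized exactly to the serialized message.
           ByteBuffer buffer = ByteBuffer.allocateDirect(serialized.length);
           // Copy the bytes. Like the examples, this does not call flip(), so the
           // position is left at the end; call rewind() before reading it in Java.
           buffer.put(serialized);
           return buffer;
       }

       public static void main(String[] args) {
           // Stand-in bytes; a real caller would pass plan.toByteArray().
           ByteBuffer buffer = toDirectBuffer(new byte[] {1, 2, 3});
           System.out.println(buffer.isDirect() + " " + buffer.capacity());
       }
   }

With such a helper, the ``allocateDirect``/``put`` sequence that builds
``substraitPlan`` in the first example could become
``SubstraitBuffers.toDirectBuffer(plan.toByteArray())``.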