Substrait

The arrow-dataset module can execute Substrait plans via the Acero query engine.

Executing Queries Using Substrait Plans

Plans can reference data in files via URIs, or “named tables” that must be provided along with the plan.

Here is an example of a Java program that queries a Parquet file using Java Substrait (this example uses the Substrait Java project to compile a SQL query into a Substrait plan):

import com.google.common.collect.ImmutableList;
import io.substrait.isthmus.SqlToSubstrait;
import io.substrait.proto.Plan;
import org.apache.arrow.dataset.file.FileFormat;
import org.apache.arrow.dataset.file.FileSystemDatasetFactory;
import org.apache.arrow.dataset.jni.NativeMemoryPool;
import org.apache.arrow.dataset.scanner.ScanOptions;
import org.apache.arrow.dataset.scanner.Scanner;
import org.apache.arrow.dataset.source.Dataset;
import org.apache.arrow.dataset.source.DatasetFactory;
import org.apache.arrow.dataset.substrait.AceroSubstraitConsumer;
import org.apache.arrow.memory.BufferAllocator;
import org.apache.arrow.memory.RootAllocator;
import org.apache.arrow.vector.ipc.ArrowReader;
import org.apache.calcite.sql.parser.SqlParseException;

import java.nio.ByteBuffer;
import java.util.HashMap;
import java.util.Map;

public class ClientSubstrait {
    /**
     * Scans a Parquet file, registers its batches as the named table {@code NATION}, and runs a
     * Substrait plan (compiled from SQL by {@link #getPlan()}) against it with
     * {@link AceroSubstraitConsumer}, printing every result batch as TSV.
     *
     * @param args unused
     */
    public static void main(String[] args) {
        String uri = "file:///data/tpch_parquet/nation.parquet";
        ScanOptions options = new ScanOptions(/*batchSize*/ 32768);
        try (
            BufferAllocator allocator = new RootAllocator();
            DatasetFactory datasetFactory = new FileSystemDatasetFactory(allocator, NativeMemoryPool.getDefault(),
                    FileFormat.PARQUET, uri);
            Dataset dataset = datasetFactory.finish();
            Scanner scanner = dataset.newScan(options);
            ArrowReader reader = scanner.scanBatches()
        ) {
            // Map the table name referenced by the plan to the reader that supplies its data.
            // The key must match the table name declared in getPlan().
            Map<String, ArrowReader> mapTableToArrowReader = new HashMap<>();
            mapTableToArrowReader.put("NATION", reader);
            // Serialize the plan exactly once and copy the bytes into a direct ByteBuffer.
            byte[] planBytes = getPlan().toByteArray();
            ByteBuffer substraitPlan = ByteBuffer.allocateDirect(planBytes.length);
            substraitPlan.put(planBytes);
            // Run the query and print each result batch.
            try (ArrowReader arrowReader = new AceroSubstraitConsumer(allocator).runQuery(
                    substraitPlan,
                    mapTableToArrowReader
            )) {
                while (arrowReader.loadNextBatch()) {
                    System.out.println(arrowReader.getVectorSchemaRoot().contentToTSVString());
                }
            }
        } catch (Exception e) {
            // Propagate failures (keeping the cause) so the program does not exit successfully
            // after an error.
            throw new RuntimeException(e);
        }
    }

    /**
     * Compiles {@code SELECT * from nation} into a Substrait plan, using a DDL statement that
     * declares the {@code NATION} table schema.
     *
     * @return the compiled Substrait plan
     * @throws SqlParseException if the SQL or the DDL cannot be parsed
     */
    static Plan getPlan() throws SqlParseException {
        String sql = "SELECT * from nation";
        String nation = "CREATE TABLE NATION (N_NATIONKEY BIGINT NOT NULL, N_NAME CHAR(25), " +
                "N_REGIONKEY BIGINT NOT NULL, N_COMMENT VARCHAR(152))";
        return new SqlToSubstrait().execute(sql, ImmutableList.of(nation));
    }
}
// Results example:
FieldPath(0)    FieldPath(1)    FieldPath(2)    FieldPath(3)
0               ALGERIA         0               haggle. carefully final deposits detect slyly agai
1               ARGENTINA       1               al foxes promise slyly according to the regular accounts. bold requests alon

Executing Projections and Filters Using Extended Expressions

Dataset also supports projections and filters with Substrait’s Extended Expressions. This requires the substrait-java library.

This Java program:

  • Loads a Parquet file containing the “nation” table from the TPC-H benchmark.

  • Applies a filter:
    • N_NATIONKEY > 18

  • Projects two new columns:
    • N_REGIONKEY + 10

    • N_NAME || ' - ' || N_COMMENT

import com.google.common.collect.ImmutableList;
import io.substrait.isthmus.SqlExpressionToSubstrait;
import io.substrait.proto.ExtendedExpression;
import org.apache.arrow.dataset.file.FileFormat;
import org.apache.arrow.dataset.file.FileSystemDatasetFactory;
import org.apache.arrow.dataset.jni.NativeMemoryPool;
import org.apache.arrow.dataset.scanner.ScanOptions;
import org.apache.arrow.dataset.scanner.Scanner;
import org.apache.arrow.dataset.source.Dataset;
import org.apache.arrow.dataset.source.DatasetFactory;
import org.apache.arrow.memory.BufferAllocator;
import org.apache.arrow.memory.RootAllocator;
import org.apache.arrow.vector.ipc.ArrowReader;
import org.apache.calcite.sql.parser.SqlParseException;

import java.nio.ByteBuffer;
import java.util.Base64;
import java.util.Optional;

public class ClientSubstraitExtendedExpressionsCookbook {

  /** DDL declaring the {@code NATION} table, used to resolve column names in the expressions. */
  private static final String NATION_SCHEMA =
      "CREATE TABLE NATION (N_NATIONKEY INT NOT NULL, N_NAME VARCHAR, "
          + "N_REGIONKEY INT NOT NULL, N_COMMENT VARCHAR)";

  public static void main(String[] args) throws SqlParseException {
    projectAndFilterDataset();
  }

  /**
   * Scans a Parquet file with a Substrait filter and projection pushed into the scan, and prints
   * every resulting batch as TSV.
   *
   * @throws SqlParseException if an expression cannot be parsed
   * @throws RuntimeException wrapping any failure while opening or reading the dataset
   */
  private static void projectAndFilterDataset() throws SqlParseException {
    String uri = "file:///Users/data/tpch_parquet/nation.parquet";
    ScanOptions options =
        new ScanOptions.Builder(/*batchSize*/ 32768)
            .columns(Optional.empty())
            .substraitFilter(getByteBuffer(new String[]{"N_NATIONKEY > 18"}))
            .substraitProjection(getByteBuffer(new String[]{"N_REGIONKEY + 10",
                "N_NAME || CAST(' - ' as VARCHAR) || N_COMMENT"}))
            .build();
    try (BufferAllocator allocator = new RootAllocator();
         DatasetFactory datasetFactory =
             new FileSystemDatasetFactory(
                 allocator, NativeMemoryPool.getDefault(), FileFormat.PARQUET, uri);
         Dataset dataset = datasetFactory.finish();
         Scanner scanner = dataset.newScan(options);
         ArrowReader reader = scanner.scanBatches()) {
      while (reader.loadNextBatch()) {
        System.out.println(reader.getVectorSchemaRoot().contentToTSVString());
      }
    } catch (Exception e) {
      throw new RuntimeException(e);
    }
  }

  /**
   * Compiles SQL expressions against {@link #NATION_SCHEMA} into a Substrait
   * {@code ExtendedExpression} and returns its serialized bytes in a direct ByteBuffer.
   *
   * @param sqlExpression the SQL expressions to convert
   * @return a direct buffer holding the serialized ExtendedExpression
   * @throws SqlParseException if an expression or the schema cannot be parsed
   */
  private static ByteBuffer getByteBuffer(String[] sqlExpression) throws SqlParseException {
    SqlExpressionToSubstrait expressionToSubstrait = new SqlExpressionToSubstrait();
    ExtendedExpression expression =
        expressionToSubstrait.convert(sqlExpression, ImmutableList.of(NATION_SCHEMA));
    // Use the serialized protobuf bytes directly. A Base64 encode/decode round-trip would
    // return the same bytes and only add extra copies.
    byte[] expressionBytes = expression.toByteArray();
    ByteBuffer byteBuffer = ByteBuffer.allocateDirect(expressionBytes.length);
    byteBuffer.put(expressionBytes);
    return byteBuffer;
  }
}
column-1  column-2
13        ROMANIA - ular asymptotes are about the furious multipliers. express dependencies nag above the ironically ironic account
14        SAUDI ARABIA - ts. silent requests haggle. closely express packages sleep across the blithely
12        VIETNAM - hely enticingly express accounts. even, final
13        RUSSIA -  requests against the platelets use never according to the quickly regular pint
13        UNITED KINGDOM - eans boost carefully special requests. accounts are. carefull
11        UNITED STATES - y final packages. slow foxes cajole quickly. quickly silent platelets breach ironic accounts. unusual pinto be