Arrow Flight

This section contains a number of recipes for working with Arrow Flight. For more detail about Flight please take a look at Arrow Flight RPC.

Simple Key-Value Storage Service with Arrow Flight

We’ll implement a service that provides a key-value store for data, using Flight to handle uploads/requests and data in memory to store the actual data.

Flight Client and Server

import org.apache.arrow.flight.Action;
import org.apache.arrow.flight.AsyncPutListener;
import org.apache.arrow.flight.CallStatus;
import org.apache.arrow.flight.Criteria;
import org.apache.arrow.flight.FlightClient;
import org.apache.arrow.flight.FlightDescriptor;
import org.apache.arrow.flight.FlightEndpoint;
import org.apache.arrow.flight.FlightInfo;
import org.apache.arrow.flight.FlightServer;
import org.apache.arrow.flight.FlightStream;
import org.apache.arrow.flight.Location;
import org.apache.arrow.flight.NoOpFlightProducer;
import org.apache.arrow.flight.PutResult;
import org.apache.arrow.flight.Result;
import org.apache.arrow.flight.Ticket;
import org.apache.arrow.memory.BufferAllocator;
import org.apache.arrow.memory.RootAllocator;
import org.apache.arrow.util.AutoCloseables;
import org.apache.arrow.vector.VarCharVector;
import org.apache.arrow.vector.VectorLoader;
import org.apache.arrow.vector.VectorSchemaRoot;
import org.apache.arrow.vector.VectorUnloader;
import org.apache.arrow.vector.ipc.message.ArrowRecordBatch;
import org.apache.arrow.vector.types.pojo.ArrowType;
import org.apache.arrow.vector.types.pojo.Field;
import org.apache.arrow.vector.types.pojo.FieldType;
import org.apache.arrow.vector.types.pojo.Schema;

import java.io.IOException;
import java.nio.charset.StandardCharsets;
import java.util.ArrayList;
import java.util.Arrays;
import java.util.Collections;
import java.util.Iterator;
import java.util.List;
import java.util.concurrent.ConcurrentHashMap;

class Dataset implements AutoCloseable {
    private final List<ArrowRecordBatch> batches;
    private final Schema schema;
    private final long rows;
    public Dataset(List<ArrowRecordBatch> batches, Schema schema, long rows) {
        this.batches = batches;
        this.schema = schema;
        this.rows = rows;
    }
    public List<ArrowRecordBatch> getBatches() {
        return batches;
    }
    public Schema getSchema() {
        return schema;
    }
    public long getRows() {
        return rows;
    }
    @Override
    public void close() throws Exception {
        AutoCloseables.close(batches);
    }
}
class CookbookProducer extends NoOpFlightProducer implements AutoCloseable {
    private final BufferAllocator allocator;
    private final Location location;
    private final ConcurrentMap<FlightDescriptor, Dataset> datasets;
    public CookbookProducer(BufferAllocator allocator, Location location) {
        this.allocator = allocator;
        this.location = location;
        this.datasets = new ConcurrentHashMap<>();
    }
    @Override
    public Runnable acceptPut(CallContext context, FlightStream flightStream, StreamListener<PutResult> ackStream) {
        List<ArrowRecordBatch> batches = new ArrayList<>();
        return () -> {
            long rows = 0;
            VectorUnloader unloader;
            while (flightStream.next()) {
                unloader = new VectorUnloader(flightStream.getRoot());
                final ArrowRecordBatch arb = unloader.getRecordBatch();
                batches.add(arb);
                rows += flightStream.getRoot().getRowCount();
            }
            Dataset dataset = new Dataset(batches, flightStream.getSchema(), rows);
            datasets.put(flightStream.getDescriptor(), dataset);
            ackStream.onCompleted();
        };
    }

    @Override
    public void getStream(CallContext context, Ticket ticket, ServerStreamListener listener) {
        FlightDescriptor flightDescriptor = FlightDescriptor.path(
                new String(ticket.getBytes(), StandardCharsets.UTF_8));
        Dataset dataset = this.datasets.get(flightDescriptor);
        if (dataset == null) {
            throw CallStatus.NOT_FOUND.withDescription("Unknown descriptor").toRuntimeException();
        }
        try (VectorSchemaRoot root = VectorSchemaRoot.create(
                 this.datasets.get(flightDescriptor).getSchema(), allocator)) {
            VectorLoader loader = new VectorLoader(root);
            listener.start(root);
            for (ArrowRecordBatch arrowRecordBatch : this.datasets.get(flightDescriptor).getBatches()) {
                loader.load(arrowRecordBatch);
                listener.putNext();
            }
            listener.completed();
        }
    }

    @Override
    public void doAction(CallContext context, Action action, StreamListener<Result> listener) {
        FlightDescriptor flightDescriptor = FlightDescriptor.path(
                new String(action.getBody(), StandardCharsets.UTF_8));
        switch (action.getType()) {
            case "DELETE": {
                Dataset removed = datasets.remove(flightDescriptor);
                if (removed != null) {
                    try {
                        removed.close();
                    } catch (Exception e) {
                        listener.onError(CallStatus.INTERNAL
                            .withDescription(e.toString())
                            .toRuntimeException());
                        return;
                    }
                    Result result = new Result("Delete completed".getBytes(StandardCharsets.UTF_8));
                    listener.onNext(result);
                } else {
                    Result result = new Result("Delete not completed. Reason: Key did not exist."
                            .getBytes(StandardCharsets.UTF_8));
                    listener.onNext(result);
                }
                listener.onCompleted();
            }
        }
    }

    @Override
    public FlightInfo getFlightInfo(CallContext context, FlightDescriptor descriptor) {
        FlightEndpoint flightEndpoint = new FlightEndpoint(
                new Ticket(descriptor.getPath().get(0).getBytes(StandardCharsets.UTF_8)), location);
        return new FlightInfo(
                datasets.get(descriptor).getSchema(),
                descriptor,
                Collections.singletonList(flightEndpoint),
                /*bytes=*/-1,
                datasets.get(descriptor).getRows()
        );
    }

    @Override
    public void listFlights(CallContext context, Criteria criteria, StreamListener<FlightInfo> listener) {
        datasets.forEach((k, v) -> { listener.onNext(getFlightInfo(null, k)); });
        listener.onCompleted();
    }

    @Override
    public void close() throws Exception {
        AutoCloseables.close(datasets.values());
    }
}
Location location = Location.forGrpcInsecure("0.0.0.0", 33333);
try (BufferAllocator allocator = new RootAllocator()){
    // Server
    try(final CookbookProducer producer = new CookbookProducer(allocator, location);
        final FlightServer flightServer = FlightServer.builder(allocator, location, producer).build()) {
        try {
            flightServer.start();
            System.out.println("S1: Server (Location): Listening on port " + flightServer.getPort());
        } catch (IOException e) {
            throw new RuntimeException(e);
        }

        // Client
        try (FlightClient flightClient = FlightClient.builder(allocator, location).build()) {
            System.out.println("C1: Client (Location): Connected to " + location.getUri());

            // Populate data
            Schema schema = new Schema(Arrays.asList(
                    new Field("name", FieldType.nullable(new ArrowType.Utf8()), null)));
            try(VectorSchemaRoot vectorSchemaRoot = VectorSchemaRoot.create(schema, allocator);
                VarCharVector varCharVector = (VarCharVector) vectorSchemaRoot.getVector("name")) {
                varCharVector.allocateNew(3);
                varCharVector.set(0, "Ronald".getBytes());
                varCharVector.set(1, "David".getBytes());
                varCharVector.set(2, "Francisco".getBytes());
                vectorSchemaRoot.setRowCount(3);
                FlightClient.ClientStreamListener listener = flightClient.startPut(
                        FlightDescriptor.path("profiles"),
                        vectorSchemaRoot, new AsyncPutListener());
                listener.putNext();
                varCharVector.set(0, "Manuel".getBytes());
                varCharVector.set(1, "Felipe".getBytes());
                varCharVector.set(2, "JJ".getBytes());
                vectorSchemaRoot.setRowCount(3);
                listener.putNext();
                listener.completed();
                listener.getResult();
                System.out.println("C2: Client (Populate Data): Wrote 2 batches with 3 rows each");
            }

            // Get metadata information
            FlightInfo flightInfo = flightClient.getInfo(FlightDescriptor.path("profiles"));
            System.out.println("C3: Client (Get Metadata): " + flightInfo);

            // Get data information
            try(FlightStream flightStream = flightClient.getStream(flightInfo.getEndpoints().get(0).getTicket())) {
                int batch = 0;
                try (VectorSchemaRoot vectorSchemaRootReceived = flightStream.getRoot()) {
                    System.out.println("C4: Client (Get Stream):");
                    while (flightStream.next()) {
                        batch++;
                        System.out.println("Client Received batch #" + batch + ", Data:");
                        System.out.print(vectorSchemaRootReceived.contentToTSVString());
                    }
                }
            } catch (Exception e) {
                e.printStackTrace();
            }

            // Get all metadata information
            Iterable<FlightInfo> flightInfosBefore = flightClient.listFlights(Criteria.ALL);
            System.out.print("C5: Client (List Flights Info): ");
            flightInfosBefore.forEach(t -> System.out.println(t));

            // Do delete action
            Iterator<Result> deleteActionResult = flightClient.doAction(new Action("DELETE",
                    FlightDescriptor.path("profiles").getPath().get(0).getBytes(StandardCharsets.UTF_8)));
            while (deleteActionResult.hasNext()) {
                Result result = deleteActionResult.next();
                System.out.println("C6: Client (Do Delete Action): " +
                        new String(result.getBody(), StandardCharsets.UTF_8));
            }

            // Get all metadata information (to validate detele action)
            Iterable<FlightInfo> flightInfos = flightClient.listFlights(Criteria.ALL);
            flightInfos.forEach(t -> System.out.println(t));
            System.out.println("C7: Client (List Flights Info): After delete - No records");

            // Server shut down
            flightServer.shutdown();
            System.out.println("C8: Server shut down successfully");
        }
    } catch (Exception e) {
        e.printStackTrace();
    }
}
S1: Server (Location): Listening on port 33333
C1: Client (Location): Connected to grpc+tcp://0.0.0.0:33333
C2: Client (Populate Data): Wrote 2 batches with 3 rows each
C3: Client (Get Metadata): FlightInfo{schema=Schema<name: Utf8>, descriptor=profiles, endpoints=[FlightEndpoint{locations=[Location{uri=grpc+tcp://0.0.0.0:33333}], ticket=org.apache.arrow.flight.Ticket@58871b0a, expirationTime=(none)}], bytes=-1, records=6, ordered=false}
C4: Client (Get Stream):
Client Received batch #1, Data:
name
Ronald
David
Francisco
Client Received batch #2, Data:
name
Manuel
Felipe
JJ
C5: Client (List Flights Info): FlightInfo{schema=Schema<name: Utf8>, descriptor=profiles, endpoints=[FlightEndpoint{locations=[Location{uri=grpc+tcp://0.0.0.0:33333}], ticket=org.apache.arrow.flight.Ticket@58871b0a, expirationTime=(none)}], bytes=-1, records=6, ordered=false}
C6: Client (Do Delete Action): Delete completed
C7: Client (List Flights Info): After delete - No records
C8: Server shut down successfully

Let explain our code in more detail.

Start Flight Server

First, we’ll start our server:

try(FlightServer flightServer = FlightServer.builder(allocator, location,
        new CookbookProducer(allocator, location)).build()) {
    try {
        flightServer.start();
        System.out.println("S1: Server (Location): Listening on port " + flightServer.getPort());
    } catch (IOException e) {
        e.printStackTrace();
    }
S1: Server (Location): Listening on port 33333

Connect to Flight Server

We can then create a client and connect to the server:

try (FlightClient flightClient = FlightClient.builder(allocator, location).build()) {
    System.out.println("C1: Client (Location): Connected to " + location.getUri());
C1: Client (Location): Connected to grpc+tcp://0.0.0.0:33333

Put Data

First, we’ll create and upload a vector schema root, which will get stored in a memory by the server.

// Server
public Runnable acceptPut(CallContext context, FlightStream flightStream, StreamListener<PutResult> ackStream) {
    List<ArrowRecordBatch> batches = new ArrayList<>();
    return () -> {
        long rows = 0;
        VectorUnloader unloader;
        while (flightStream.next()) {
            unloader = new VectorUnloader(flightStream.getRoot());
            try (final ArrowRecordBatch arb = unloader.getRecordBatch()) {
                batches.add(arb);
                rows += flightStream.getRoot().getRowCount();
            }
        }
        Dataset dataset = new Dataset(batches, flightStream.getSchema(), rows);
        datasets.put(flightStream.getDescriptor(), dataset);
        ackStream.onCompleted();
    };
}

// Client
Schema schema = new Schema(Arrays.asList(
        new Field("name", FieldType.nullable(new ArrowType.Utf8()), null)));
try(VectorSchemaRoot vectorSchemaRoot = VectorSchemaRoot.create(schema, allocator);
    VarCharVector varCharVector = (VarCharVector) vectorSchemaRoot.getVector("name")) {
    varCharVector.allocateNew(3);
    varCharVector.set(0, "Ronald".getBytes());
    varCharVector.set(1, "David".getBytes());
    varCharVector.set(2, "Francisco".getBytes());
    vectorSchemaRoot.setRowCount(3);
    FlightClient.ClientStreamListener listener = flightClient.startPut(
            FlightDescriptor.path("profiles"),
            vectorSchemaRoot, new AsyncPutListener());
    listener.putNext();
    varCharVector.set(0, "Manuel".getBytes());
    varCharVector.set(1, "Felipe".getBytes());
    varCharVector.set(2, "JJ".getBytes());
    vectorSchemaRoot.setRowCount(3);
    listener.putNext();
    listener.completed();
    listener.getResult();
    System.out.println("C2: Client (Populate Data): Wrote 2 batches with 3 rows each");
}
C2: Client (Populate Data): Wrote 2 batches with 3 rows each

Get Metadata

Once we do so, we can retrieve the metadata for that dataset.

// Server
public FlightInfo getFlightInfo(CallContext context, FlightDescriptor descriptor) {
    FlightEndpoint flightEndpoint = new FlightEndpoint(
            new Ticket(descriptor.getPath().get(0).getBytes(StandardCharsets.UTF_8)), location);
    return new FlightInfo(
            datasets.get(descriptor).getSchema(),
            descriptor,
            Collections.singletonList(flightEndpoint),
            /*bytes=*/-1,
            datasets.get(descriptor).getRows()
    );
}

// Client
FlightInfo flightInfo = flightClient.getInfo(FlightDescriptor.path("profiles"));
System.out.println("C3: Client (Get Metadata): " + flightInfo);
C3: Client (Get Metadata): FlightInfo{schema=Schema<name: Utf8>, descriptor=profiles, endpoints=[FlightEndpoint{locations=[Location{uri=grpc+tcp://0.0.0.0:33333}], ticket=org.apache.arrow.flight.Ticket@58871b0a, expirationTime=(none)}], bytes=-1, records=6}

Get Data

And get the data back:

// Server
public void getStream(CallContext context, Ticket ticket, ServerStreamListener listener) {
    FlightDescriptor flightDescriptor = FlightDescriptor.path(
            new String(ticket.getBytes(), StandardCharsets.UTF_8));
    Dataset dataset = this.datasets.get(flightDescriptor);
    if (dataset == null) {
        throw CallStatus.NOT_FOUND.withDescription("Unknown descriptor").toRuntimeException();
    } else {
        VectorSchemaRoot vectorSchemaRoot = VectorSchemaRoot.create(
                this.datasets.get(flightDescriptor).getSchema(), allocator);
        listener.start(vectorSchemaRoot);
        for (ArrowRecordBatch arrowRecordBatch : this.datasets.get(flightDescriptor).getBatches()) {
            VectorLoader loader = new VectorLoader(vectorSchemaRoot);
            loader.load(arrowRecordBatch.cloneWithTransfer(allocator));
            listener.putNext();
        }
        listener.completed();
    }
}

// Client
try(FlightStream flightStream = flightClient.getStream(flightInfo.getEndpoints().get(0).getTicket())) {
    int batch = 0;
    try (VectorSchemaRoot vectorSchemaRootReceived = flightStream.getRoot()) {
        System.out.println("C4: Client (Get Stream):");
        while (flightStream.next()) {
            batch++;
            System.out.println("Client Received batch #" + batch + ", Data:");
            System.out.print(vectorSchemaRootReceived.contentToTSVString());
        }
    }
} catch (Exception e) {
    e.printStackTrace();
}
C4: Client (Get Stream):
Client Received batch #1, Data:
name
Ronald
David
Francisco
Client Received batch #2, Data:
name
Manuel
Felipe
JJ

Delete data

Then, we’ll delete the dataset:

// Server
public void doAction(CallContext context, Action action, StreamListener<Result> listener) {
    FlightDescriptor flightDescriptor = FlightDescriptor.path(
            new String(action.getBody(), StandardCharsets.UTF_8));
    switch (action.getType()) {
        case "DELETE":
            if (datasets.remove(flightDescriptor) != null) {
                Result result = new Result("Delete completed".getBytes(StandardCharsets.UTF_8));
                listener.onNext(result);
            } else {
                Result result = new Result("Delete not completed. Reason: Key did not exist."
                        .getBytes(StandardCharsets.UTF_8));
                listener.onNext(result);
            }
            listener.onCompleted();
    }
}

// Client
Iterator<Result> deleteActionResult = flightClient.doAction(new Action("DELETE",
        FlightDescriptor.path("profiles").getPath().get(0).getBytes(StandardCharsets.UTF_8)));
while (deleteActionResult.hasNext()) {
    Result result = deleteActionResult.next();
    System.out.println("C6: Client (Do Delete Action): " +
            new String(result.getBody(), StandardCharsets.UTF_8));
}
C6: Client (Do Delete Action): Delete completed

Validate Delete Data

And confirm that it’s been deleted:

// Server
public void listFlights(CallContext context, Criteria criteria, StreamListener<FlightInfo> listener) {
    datasets.forEach((k, v) -> { listener.onNext(getFlightInfo(null, k)); });
    listener.onCompleted();
}

// Client
Iterable<FlightInfo> flightInfos = flightClient.listFlights(Criteria.ALL);
flightInfos.forEach(t -> System.out.println(t));
System.out.println("C7: Client (List Flights Info): After delete - No records");
C7: Client (List Flights Info): After delete - No records

Stop Flight Server

// Server
flightServer.shutdown();
System.out.println("C8: Server shut down successfully");
C8: Server shut down successfully

Arrow Flight RPC: https://arrow.apache.org/docs/format/Flight.html