.. Licensed to the Apache Software Foundation (ASF) under one
.. or more contributor license agreements. See the NOTICE file
.. distributed with this work for additional information
.. regarding copyright ownership. The ASF licenses this file
.. to you under the Apache License, Version 2.0 (the
.. "License"); you may not use this file except in compliance
.. with the License. You may obtain a copy of the License at
.. http://www.apache.org/licenses/LICENSE-2.0
.. Unless required by applicable law or agreed to in writing,
.. software distributed under the License is distributed on an
.. "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
.. KIND, either express or implied. See the License for the
.. specific language governing permissions and limitations
.. under the License.

============
Arrow Flight
============

This section contains a number of recipes for working with Arrow Flight.
For more details about Flight, please take a look at `Arrow Flight RPC`_.

.. contents::

Simple Key-Value Storage Service with Arrow Flight
==================================================

We'll implement a service that provides a key-value store for data, using Flight to handle
uploads and requests, and in-memory storage to hold the actual data.

Flight Client and Server
************************
.. testcode::

   import org.apache.arrow.flight.Action;
   import org.apache.arrow.flight.AsyncPutListener;
   import org.apache.arrow.flight.CallStatus;
   import org.apache.arrow.flight.Criteria;
   import org.apache.arrow.flight.FlightClient;
   import org.apache.arrow.flight.FlightDescriptor;
   import org.apache.arrow.flight.FlightEndpoint;
   import org.apache.arrow.flight.FlightInfo;
   import org.apache.arrow.flight.FlightServer;
   import org.apache.arrow.flight.FlightStream;
   import org.apache.arrow.flight.Location;
   import org.apache.arrow.flight.NoOpFlightProducer;
   import org.apache.arrow.flight.PutResult;
   import org.apache.arrow.flight.Result;
   import org.apache.arrow.flight.Ticket;
   import org.apache.arrow.memory.BufferAllocator;
   import org.apache.arrow.memory.RootAllocator;
   import org.apache.arrow.util.AutoCloseables;
   import org.apache.arrow.vector.VarCharVector;
   import org.apache.arrow.vector.VectorLoader;
   import org.apache.arrow.vector.VectorSchemaRoot;
   import org.apache.arrow.vector.VectorUnloader;
   import org.apache.arrow.vector.ipc.message.ArrowRecordBatch;
   import org.apache.arrow.vector.types.pojo.ArrowType;
   import org.apache.arrow.vector.types.pojo.Field;
   import org.apache.arrow.vector.types.pojo.FieldType;
   import org.apache.arrow.vector.types.pojo.Schema;

   import java.io.IOException;
   import java.nio.charset.StandardCharsets;
   import java.util.ArrayList;
   import java.util.Arrays;
   import java.util.Collections;
   import java.util.Iterator;
   import java.util.List;
   import java.util.concurrent.ConcurrentHashMap;
   import java.util.concurrent.ConcurrentMap;

   /** An uploaded dataset held in memory: its record batches, schema, and total row count. */
   class Dataset implements AutoCloseable {
       private final List<ArrowRecordBatch> batches;
       private final Schema schema;
       private final long rows;

       public Dataset(List<ArrowRecordBatch> batches, Schema schema, long rows) {
           this.batches = batches;
           this.schema = schema;
           this.rows = rows;
       }

       public List<ArrowRecordBatch> getBatches() {
           return batches;
       }

       public Schema getSchema() {
           return schema;
       }

       public long getRows() {
           return rows;
       }

       /** Releases the buffers of every stored record batch. */
       @Override
       public void close() throws Exception {
           AutoCloseables.close(batches);
       }
   }

   /** Flight producer that keeps uploaded datasets in memory, keyed by their FlightDescriptor. */
   class CookbookProducer extends NoOpFlightProducer implements AutoCloseable {
       private final BufferAllocator allocator;
       private final Location location;
       private final ConcurrentMap<FlightDescriptor, Dataset> datasets;

       public CookbookProducer(BufferAllocator allocator, Location location) {
           this.allocator = allocator;
           this.location = location;
           this.datasets = new ConcurrentHashMap<>();
       }

       /** Stores every uploaded batch under the stream's descriptor, replacing any previous upload. */
       @Override
       public Runnable acceptPut(CallContext context, FlightStream flightStream, StreamListener<PutResult> ackStream) {
           List<ArrowRecordBatch> batches = new ArrayList<>();
           return () -> {
               long rows = 0;
               VectorUnloader unloader;
               while (flightStream.next()) {
                   unloader = new VectorUnloader(flightStream.getRoot());
                   // Keep the batch open: the stored Dataset owns it and closes it on delete.
                   final ArrowRecordBatch arb = unloader.getRecordBatch();
                   batches.add(arb);
                   rows += flightStream.getRoot().getRowCount();
               }
               Dataset dataset = new Dataset(batches, flightStream.getSchema(), rows);
               Dataset previous = datasets.put(flightStream.getDescriptor(), dataset);
               if (previous != null) {
                   // Re-uploading to an existing key replaces the old dataset; release its buffers.
                   try {
                       previous.close();
                   } catch (Exception e) {
                       ackStream.onError(CallStatus.INTERNAL
                               .withDescription(e.toString())
                               .toRuntimeException());
                       return;
                   }
               }
               ackStream.onCompleted();
           };
       }

       /** Streams the stored batches of the dataset named by the ticket, or fails with NOT_FOUND. */
       @Override
       public void getStream(CallContext context, Ticket ticket, ServerStreamListener listener) {
           FlightDescriptor flightDescriptor = FlightDescriptor.path(
                   new String(ticket.getBytes(), StandardCharsets.UTF_8));
           // Look the dataset up once: a concurrent DELETE between check and use must not cause an NPE.
           Dataset dataset = this.datasets.get(flightDescriptor);
           if (dataset == null) {
               throw CallStatus.NOT_FOUND.withDescription("Unknown descriptor").toRuntimeException();
           }
           try (VectorSchemaRoot root = VectorSchemaRoot.create(dataset.getSchema(), allocator)) {
               VectorLoader loader = new VectorLoader(root);
               listener.start(root);
               for (ArrowRecordBatch arrowRecordBatch : dataset.getBatches()) {
                   loader.load(arrowRecordBatch);
                   listener.putNext();
               }
               listener.completed();
           }
       }

       /** Handles the "DELETE" action; any other action type is rejected so the call always terminates. */
       @Override
       public void doAction(CallContext context, Action action, StreamListener<Result> listener) {
           FlightDescriptor flightDescriptor = FlightDescriptor.path(
                   new String(action.getBody(), StandardCharsets.UTF_8));
           switch (action.getType()) {
               case "DELETE": {
                   Dataset removed = datasets.remove(flightDescriptor);
                   if (removed != null) {
                       try {
                           removed.close();
                       } catch (Exception e) {
                           listener.onError(CallStatus.INTERNAL
                                   .withDescription(e.toString())
                                   .toRuntimeException());
                           return;
                       }
                       Result result = new Result("Delete completed".getBytes(StandardCharsets.UTF_8));
                       listener.onNext(result);
                   } else {
                       Result result = new Result("Delete not completed. Reason: Key did not exist."
                               .getBytes(StandardCharsets.UTF_8));
                       listener.onNext(result);
                   }
                   listener.onCompleted();
                   break;
               }
               default:
                   listener.onError(CallStatus.UNIMPLEMENTED
                           .withDescription("Unknown action type: " + action.getType())
                           .toRuntimeException());
           }
       }

       /** Describes a stored dataset (schema, row count, one endpoint at this server), or fails with NOT_FOUND. */
       @Override
       public FlightInfo getFlightInfo(CallContext context, FlightDescriptor descriptor) {
           Dataset dataset = datasets.get(descriptor);
           if (dataset == null) {
               throw CallStatus.NOT_FOUND.withDescription("Unknown descriptor").toRuntimeException();
           }
           FlightEndpoint flightEndpoint = new FlightEndpoint(
                   new Ticket(descriptor.getPath().get(0).getBytes(StandardCharsets.UTF_8)), location);
           return new FlightInfo(
                   dataset.getSchema(),
                   descriptor,
                   Collections.singletonList(flightEndpoint),
                   /*bytes=*/-1,
                   dataset.getRows()
           );
       }

       /** Emits a FlightInfo for every stored dataset. */
       @Override
       public void listFlights(CallContext context, Criteria criteria, StreamListener<FlightInfo> listener) {
           datasets.forEach((k, v) -> { listener.onNext(getFlightInfo(null, k)); });
           listener.onCompleted();
       }

       /** Releases every dataset still held by the producer. */
       @Override
       public void close() throws Exception {
           AutoCloseables.close(datasets.values());
       }
   }

   Location location = Location.forGrpcInsecure("0.0.0.0", 33333);
   try (BufferAllocator allocator = new RootAllocator()) {
       // Server
       try (final CookbookProducer producer = new CookbookProducer(allocator, location);
            final FlightServer flightServer = FlightServer.builder(allocator, location, producer).build()) {
           try {
               flightServer.start();
               System.out.println("S1: Server (Location): Listening on port " + flightServer.getPort());
           } catch (IOException e) {
               throw new RuntimeException(e);
           }

           // Client
           try (FlightClient flightClient = FlightClient.builder(allocator, location).build()) {
               System.out.println("C1: Client (Location): Connected to " + location.getUri());

               // Populate data
               Schema schema = new Schema(Arrays.asList(
                       new Field("name", FieldType.nullable(new ArrowType.Utf8()), null)));
               try (VectorSchemaRoot vectorSchemaRoot = VectorSchemaRoot.create(schema, allocator);
                    VarCharVector varCharVector = (VarCharVector) vectorSchemaRoot.getVector("name")) {
                   varCharVector.allocateNew(3);
                   varCharVector.set(0, "Ronald".getBytes(StandardCharsets.UTF_8));
                   varCharVector.set(1, "David".getBytes(StandardCharsets.UTF_8));
                   varCharVector.set(2, "Francisco".getBytes(StandardCharsets.UTF_8));
                   vectorSchemaRoot.setRowCount(3);
                   FlightClient.ClientStreamListener listener = flightClient.startPut(
                           FlightDescriptor.path("profiles"),
                           vectorSchemaRoot, new AsyncPutListener());
                   listener.putNext();
                   varCharVector.set(0, "Manuel".getBytes(StandardCharsets.UTF_8));
                   varCharVector.set(1, "Felipe".getBytes(StandardCharsets.UTF_8));
                   varCharVector.set(2, "JJ".getBytes(StandardCharsets.UTF_8));
                   vectorSchemaRoot.setRowCount(3);
                   listener.putNext();
                   listener.completed();
                   listener.getResult();
                   System.out.println("C2: Client (Populate Data): Wrote 2 batches with 3 rows each");
               }

               // Get metadata information
               FlightInfo flightInfo = flightClient.getInfo(FlightDescriptor.path("profiles"));
               System.out.println("C3: Client (Get Metadata): " + flightInfo);

               // Get data information
               try (FlightStream flightStream = flightClient.getStream(flightInfo.getEndpoints().get(0).getTicket())) {
                   int batch = 0;
                   try (VectorSchemaRoot vectorSchemaRootReceived = flightStream.getRoot()) {
                       System.out.println("C4: Client (Get Stream):");
                       while (flightStream.next()) {
                           batch++;
                           System.out.println("Client Received batch #" + batch + ", Data:");
                           System.out.print(vectorSchemaRootReceived.contentToTSVString());
                       }
                   }
               } catch (Exception e) {
                   e.printStackTrace();
               }

               // Get all metadata information
               Iterable<FlightInfo> flightInfosBefore = flightClient.listFlights(Criteria.ALL);
               System.out.print("C5: Client (List Flights Info): ");
               flightInfosBefore.forEach(t -> System.out.println(t));

               // Do delete action
               Iterator<Result> deleteActionResult = flightClient.doAction(new Action("DELETE",
                       FlightDescriptor.path("profiles").getPath().get(0).getBytes(StandardCharsets.UTF_8)));
               while (deleteActionResult.hasNext()) {
                   Result result = deleteActionResult.next();
                   System.out.println("C6: Client (Do Delete Action): " +
                           new String(result.getBody(), StandardCharsets.UTF_8));
               }

               // Get all metadata information (to validate delete action)
               Iterable<FlightInfo> flightInfos = flightClient.listFlights(Criteria.ALL);
               flightInfos.forEach(t -> System.out.println(t));
               System.out.println("C7: Client (List Flights Info): After delete - No records");

               // Server shut down
               flightServer.shutdown();
               System.out.println("C8: Server shut down successfully");
           }
       } catch (Exception e) {
           e.printStackTrace();
       }
   }

.. testoutput::

   S1: Server (Location): Listening on port 33333
   C1: Client (Location): Connected to grpc+tcp://0.0.0.0:33333
   C2: Client (Populate Data): Wrote 2 batches with 3 rows each
   C3: Client (Get Metadata): FlightInfo{schema=Schema<name: Utf8>, descriptor=profiles, endpoints=[FlightEndpoint{locations=[Location{uri=grpc+tcp://0.0.0.0:33333}], ticket=org.apache.arrow.flight.Ticket@58871b0a, expirationTime=(none)}], bytes=-1, records=6, ordered=false}
   C4: Client (Get Stream):
   Client Received batch #1, Data:
   name
   Ronald
   David
   Francisco
   Client Received batch #2, Data:
   name
   Manuel
   Felipe
   JJ
   C5: Client (List Flights Info): FlightInfo{schema=Schema<name: Utf8>, descriptor=profiles, endpoints=[FlightEndpoint{locations=[Location{uri=grpc+tcp://0.0.0.0:33333}], ticket=org.apache.arrow.flight.Ticket@58871b0a, expirationTime=(none)}], bytes=-1, records=6, ordered=false}
   C6: Client (Do Delete Action): Delete completed
   C7: Client (List Flights Info): After delete - No records
   C8: Server shut down successfully

Let's explain our code in more detail.

Start Flight Server
*******************

First, we'll start our server:

.. code-block:: java

   try (final CookbookProducer producer = new CookbookProducer(allocator, location);
        final FlightServer flightServer = FlightServer.builder(allocator, location, producer).build()) {
       try {
           flightServer.start();
           System.out.println("S1: Server (Location): Listening on port " + flightServer.getPort());
       } catch (IOException e) {
           throw new RuntimeException(e);
       }

.. code-block:: shell

   S1: Server (Location): Listening on port 33333

Connect to Flight Server
************************

We can then create a client and connect to the server:
.. code-block:: java

   try (FlightClient flightClient = FlightClient.builder(allocator, location).build()) {
       System.out.println("C1: Client (Location): Connected to " + location.getUri());

.. code-block:: shell

   C1: Client (Location): Connected to grpc+tcp://0.0.0.0:33333

Put Data
********

First, we'll create and upload a vector schema root, which will be stored in memory by the server.

.. code-block:: java

   // Server
   public Runnable acceptPut(CallContext context, FlightStream flightStream, StreamListener<PutResult> ackStream) {
       List<ArrowRecordBatch> batches = new ArrayList<>();
       return () -> {
           long rows = 0;
           VectorUnloader unloader;
           while (flightStream.next()) {
               unloader = new VectorUnloader(flightStream.getRoot());
               // Keep the batch open: the stored Dataset owns it and closes it on delete.
               final ArrowRecordBatch arb = unloader.getRecordBatch();
               batches.add(arb);
               rows += flightStream.getRoot().getRowCount();
           }
           Dataset dataset = new Dataset(batches, flightStream.getSchema(), rows);
           Dataset previous = datasets.put(flightStream.getDescriptor(), dataset);
           if (previous != null) {
               // Re-uploading to an existing key replaces the old dataset; release its buffers.
               try {
                   previous.close();
               } catch (Exception e) {
                   ackStream.onError(CallStatus.INTERNAL
                           .withDescription(e.toString())
                           .toRuntimeException());
                   return;
               }
           }
           ackStream.onCompleted();
       };
   }

   // Client
   Schema schema = new Schema(Arrays.asList(
           new Field("name", FieldType.nullable(new ArrowType.Utf8()), null)));
   try (VectorSchemaRoot vectorSchemaRoot = VectorSchemaRoot.create(schema, allocator);
        VarCharVector varCharVector = (VarCharVector) vectorSchemaRoot.getVector("name")) {
       varCharVector.allocateNew(3);
       varCharVector.set(0, "Ronald".getBytes(StandardCharsets.UTF_8));
       varCharVector.set(1, "David".getBytes(StandardCharsets.UTF_8));
       varCharVector.set(2, "Francisco".getBytes(StandardCharsets.UTF_8));
       vectorSchemaRoot.setRowCount(3);
       FlightClient.ClientStreamListener listener = flightClient.startPut(
               FlightDescriptor.path("profiles"),
               vectorSchemaRoot, new AsyncPutListener());
       listener.putNext();
       varCharVector.set(0, "Manuel".getBytes(StandardCharsets.UTF_8));
       varCharVector.set(1, "Felipe".getBytes(StandardCharsets.UTF_8));
       varCharVector.set(2, "JJ".getBytes(StandardCharsets.UTF_8));
       vectorSchemaRoot.setRowCount(3);
       listener.putNext();
       listener.completed();
       listener.getResult();
       System.out.println("C2: Client (Populate Data): Wrote 2 batches with 3 rows each");
   }
.. code-block:: shell

   C2: Client (Populate Data): Wrote 2 batches with 3 rows each

Get Metadata
************

Once we do so, we can retrieve the metadata for that dataset.

.. code-block:: java

   // Server
   public FlightInfo getFlightInfo(CallContext context, FlightDescriptor descriptor) {
       Dataset dataset = datasets.get(descriptor);
       if (dataset == null) {
           throw CallStatus.NOT_FOUND.withDescription("Unknown descriptor").toRuntimeException();
       }
       FlightEndpoint flightEndpoint = new FlightEndpoint(
               new Ticket(descriptor.getPath().get(0).getBytes(StandardCharsets.UTF_8)), location);
       return new FlightInfo(
               dataset.getSchema(),
               descriptor,
               Collections.singletonList(flightEndpoint),
               /*bytes=*/-1,
               dataset.getRows()
       );
   }

   // Client
   FlightInfo flightInfo = flightClient.getInfo(FlightDescriptor.path("profiles"));
   System.out.println("C3: Client (Get Metadata): " + flightInfo);

.. code-block:: shell

   C3: Client (Get Metadata): FlightInfo{schema=Schema<name: Utf8>, descriptor=profiles, endpoints=[FlightEndpoint{locations=[Location{uri=grpc+tcp://0.0.0.0:33333}], ticket=org.apache.arrow.flight.Ticket@58871b0a, expirationTime=(none)}], bytes=-1, records=6, ordered=false}

Get Data
********

And get the data back:
.. code-block:: java

   // Server
   public void getStream(CallContext context, Ticket ticket, ServerStreamListener listener) {
       FlightDescriptor flightDescriptor = FlightDescriptor.path(
               new String(ticket.getBytes(), StandardCharsets.UTF_8));
       // Look the dataset up once: a concurrent DELETE between check and use must not cause an NPE.
       Dataset dataset = this.datasets.get(flightDescriptor);
       if (dataset == null) {
           throw CallStatus.NOT_FOUND.withDescription("Unknown descriptor").toRuntimeException();
       }
       // try-with-resources releases the root once the stream is done; the stored batches stay owned by the Dataset.
       try (VectorSchemaRoot root = VectorSchemaRoot.create(dataset.getSchema(), allocator)) {
           VectorLoader loader = new VectorLoader(root);
           listener.start(root);
           for (ArrowRecordBatch arrowRecordBatch : dataset.getBatches()) {
               loader.load(arrowRecordBatch);
               listener.putNext();
           }
           listener.completed();
       }
   }

   // Client
   try (FlightStream flightStream = flightClient.getStream(flightInfo.getEndpoints().get(0).getTicket())) {
       int batch = 0;
       try (VectorSchemaRoot vectorSchemaRootReceived = flightStream.getRoot()) {
           System.out.println("C4: Client (Get Stream):");
           while (flightStream.next()) {
               batch++;
               System.out.println("Client Received batch #" + batch + ", Data:");
               System.out.print(vectorSchemaRootReceived.contentToTSVString());
           }
       }
   } catch (Exception e) {
       e.printStackTrace();
   }

.. code-block:: shell

   C4: Client (Get Stream):
   Client Received batch #1, Data:
   name
   Ronald
   David
   Francisco
   Client Received batch #2, Data:
   name
   Manuel
   Felipe
   JJ

Delete Data
***********

Then, we'll delete the dataset:

.. code-block:: java

   // Server
   public void doAction(CallContext context, Action action, StreamListener<Result> listener) {
       FlightDescriptor flightDescriptor = FlightDescriptor.path(
               new String(action.getBody(), StandardCharsets.UTF_8));
       switch (action.getType()) {
           case "DELETE": {
               Dataset removed = datasets.remove(flightDescriptor);
               if (removed != null) {
                   // Release the removed dataset's buffers before acknowledging.
                   try {
                       removed.close();
                   } catch (Exception e) {
                       listener.onError(CallStatus.INTERNAL
                               .withDescription(e.toString())
                               .toRuntimeException());
                       return;
                   }
                   Result result = new Result("Delete completed".getBytes(StandardCharsets.UTF_8));
                   listener.onNext(result);
               } else {
                   Result result = new Result("Delete not completed. Reason: Key did not exist."
                           .getBytes(StandardCharsets.UTF_8));
                   listener.onNext(result);
               }
               listener.onCompleted();
               break;
           }
           default:
               // Always terminate the call, otherwise the client would wait forever.
               listener.onError(CallStatus.UNIMPLEMENTED
                       .withDescription("Unknown action type: " + action.getType())
                       .toRuntimeException());
       }
   }

   // Client
   Iterator<Result> deleteActionResult = flightClient.doAction(new Action("DELETE",
           FlightDescriptor.path("profiles").getPath().get(0).getBytes(StandardCharsets.UTF_8)));
   while (deleteActionResult.hasNext()) {
       Result result = deleteActionResult.next();
       System.out.println("C6: Client (Do Delete Action): " +
               new String(result.getBody(), StandardCharsets.UTF_8));
   }

.. code-block:: shell

   C6: Client (Do Delete Action): Delete completed

Validate Delete Data
********************

And confirm that it's been deleted:

.. code-block:: java

   // Server
   public void listFlights(CallContext context, Criteria criteria, StreamListener<FlightInfo> listener) {
       datasets.forEach((k, v) -> { listener.onNext(getFlightInfo(null, k)); });
       listener.onCompleted();
   }

   // Client
   Iterable<FlightInfo> flightInfos = flightClient.listFlights(Criteria.ALL);
   flightInfos.forEach(t -> System.out.println(t));
   System.out.println("C7: Client (List Flights Info): After delete - No records");

.. code-block:: shell

   C7: Client (List Flights Info): After delete - No records

Stop Flight Server
******************

.. code-block:: java

   // Server
   flightServer.shutdown();
   System.out.println("C8: Server shut down successfully");

.. code-block:: shell

   C8: Server shut down successfully

.. _`Arrow Flight RPC`: https://arrow.apache.org/docs/format/Flight.html