Arrow Flight RPC

Arrow Flight is an RPC framework for efficient transfer of Arrow data over the network.

See also

Flight protocol documentation

Documentation of the Flight protocol, including how to use Flight conceptually.

Java Cookbook

Recipes for using Arrow Flight in Java.

Writing a Flight Service

Flight servers implement the FlightProducer interface. For convenience, they can subclass NoOpFlightProducer instead, which offers default implementations of all the RPC methods.

public class TutorialFlightProducer implements FlightProducer {
    @Override
    // Override methods or use NoOpFlightProducer for only methods needed
}

Each RPC method always takes a CallContext for common parameters. To indicate failure, pass an exception to the “listener” if present, or else raise an exception.

// Server
@Override
public void listFlights(CallContext context, Criteria criteria, StreamListener<FlightInfo> listener) {
    // ...
    listener.onError(
        CallStatus.UNAUTHENTICATED.withDescription(
            "Custom UNAUTHENTICATED description message.").toRuntimeException());
    // ...
}

// Client
try{
    Iterable<FlightInfo> flightInfosBefore = flightClient.listFlights(Criteria.ALL);
    // ...
} catch (FlightRuntimeException e){
    // Catch UNAUTHENTICATED exception
}

To start a server, create a Location to specify where to listen, and then create a FlightServer with an instance of a producer. This will start the server, but won’t block the rest of the program. Call FlightServer.awaitTermination to block until the server stops.

class TutorialFlightProducer implements FlightProducer {
    @Override
    // Override methods or use NoOpFlightProducer for only methods needed
}

Location location = Location.forGrpcInsecure("0.0.0.0", 0);
try(
    BufferAllocator allocator = new RootAllocator();
    FlightServer server = FlightServer.builder(
            allocator,
            location,
            new TutorialFlightProducer()
    ).build();
){
    server.start();
    System.out.println("Server listening on port " + server.getPort());
    server.awaitTermination();
} catch (Exception e) {
    e.printStackTrace();
}
Server listening on port 58104

Using the Flight Client

To connect to a Flight service, create a FlightClient with a location.

Location location = Location.forGrpcInsecure("0.0.0.0", 58104);

try(BufferAllocator allocator = new RootAllocator();
    FlightClient client = FlightClient.builder(allocator, location).build()){
    // ... Consume operations exposed by Flight server
} catch (Exception e) {
    e.printStackTrace();
}

Cancellation and Timeouts

When making a call, clients can optionally provide CallOptions. This allows clients to set a timeout on calls. Also, some objects returned by client RPC calls expose a cancel method which allows terminating a call early.

Location location = Location.forGrpcInsecure("0.0.0.0", 58609);

try(BufferAllocator allocator = new RootAllocator();
    FlightClient tutorialFlightClient = FlightClient.builder(allocator, location).build()){

    Iterator<Result> resultIterator = tutorialFlightClient.doAction(
            new Action("test-timeout"),
            CallOptions.timeout(2, TimeUnit.SECONDS)
    );
} catch (Exception e) {
    e.printStackTrace();
}

On the server side, timeouts are transparent. For cancellation, the server needs to manually poll setOnCancelHandler or isCancelled to check if the client has cancelled the call, and if so, break out of any processing the server is currently doing.

// Client
Location location = Location.forGrpcInsecure("0.0.0.0", 58609);
try(BufferAllocator allocator = new RootAllocator();
    FlightClient tutorialFlightClient = FlightClient.builder(allocator, location).build()){
    try(FlightStream flightStream = flightClient.getStream(new Ticket(new byte[]{}))) {
        // ...
        flightStream.cancel("tutorial-cancel", new Exception("Testing cancellation option!"));
    }
} catch (Exception e) {
    e.printStackTrace();
}
// Server
@Override
public void getStream(CallContext context, Ticket ticket, ServerStreamListener listener) {
    // ...
    listener.setOnCancelHandler(()->{
                // Implement logic to handle cancellation option
            });
}

Enabling TLS

TLS can be enabled when setting up a server by providing a certificate and key pair to FlightServer.Builder.useTls.

On the client side, use Location.forGrpcTls to create the Location for the client.

Enabling Authentication

Warning

Authentication is insecure without enabling TLS.

Handshake-based authentication can be enabled by implementing ServerAuthHandler. Authentication consists of two parts: on initial client connection, the server and client authentication implementations can perform any negotiation needed. The client authentication handler then provides a token that will be attached to future calls.

The client send data to be validated through ClientAuthHandler.authenticate The server validate data received through ServerAuthHandler.authenticate.

Custom Middleware

Servers and clients support custom middleware (or interceptors) that are called on every request and can modify the request in a limited fashion. These can be implemented by implementing the FlightServerMiddleware and FlightClientMiddleware interfaces.

Middleware are fairly limited, but they can add headers to a request/response. On the server, they can inspect incoming headers and fail the request; hence, they can be used to implement custom authentication methods.

Adding Services

Servers can add other gRPC services. For example, to add the Health Check service:

final HealthStatusManager statusManager = new HealthStatusManager();
final Consumer<NettyServerBuilder> consumer = (builder) -> {
  builder.addService(statusManager.getHealthService());
};
final Location location = forGrpcInsecure(LOCALHOST, 5555);
try (
    BufferAllocator a = new RootAllocator(Long.MAX_VALUE);
    Producer producer = new Producer(a);
    FlightServer s = FlightServer.builder(a, location, producer)
        .transportHint("grpc.builderConsumer", consumer).build().start();
) {
  Channel channel = NettyChannelBuilder.forAddress(location.toSocketAddress()).usePlaintext().build();
  HealthCheckResponse response = HealthGrpc
          .newBlockingStub(channel)
          .check(HealthCheckRequest.getDefaultInstance());

  System.out.println(response.getStatus());
}

Flight best practices