Introducing Apache Arrow Flight: A Framework for Fast Data Transport


Published 13 Oct 2019
By Wes McKinney (wesm)

Over the last 18 months, the Apache Arrow community has been busy designing and implementing Flight, a new general-purpose client-server framework to simplify high performance transport of large datasets over network interfaces.

Flight is initially focused on optimized transport of the Arrow columnar format (i.e. “Arrow record batches”) over gRPC, Google’s popular HTTP/2-based general-purpose RPC library and framework. While we have focused on integration with gRPC, as a development framework Flight is not intended to be exclusive to gRPC.

One of the biggest features that sets Flight apart from other data transport frameworks is parallel transfers, which allow data to be streamed to or from a cluster of servers simultaneously. This enables developers to more easily create scalable data services that can serve a growing client base.

In the 0.15.0 Apache Arrow release, we have ready-to-use Flight implementations in C++ (with Python bindings) and Java. These libraries are suitable for beta users who are comfortable with API or protocol changes while we continue to refine some low-level details in the Flight internals.

Motivation

Many people have experienced the pain associated with accessing large datasets over a network. There are many different transfer protocols and tools for reading datasets from remote data services, such as ODBC and JDBC. Over the last 10 years, file-based data warehousing in formats like CSV, Avro, and Parquet has become popular, but this also presents challenges as raw data must be transferred to local hosts before being deserialized.

The work we have done since the beginning of Apache Arrow holds exciting promise for accelerating data transport in a number of ways. The Arrow columnar format has key features that can help us:

  • It is an “on-the-wire” representation of tabular data that does not require deserialization on receipt
  • Its natural mode is streaming batches: larger datasets are transported a batch of rows at a time (called “record batches” in Arrow parlance). In this post we will talk about “data streams”: sequences of Arrow record batches using the project’s binary protocol (see the sketch after this list)
  • The format is language-independent and now has library support in 11 languages and counting.
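
To make “record batches” and “data streams” concrete, here is a small sketch using pyarrow’s IPC stream APIs (the column names and values are arbitrary, and the exact spelling of these APIs has shifted slightly across pyarrow releases):

import pyarrow as pa

# A record batch: a chunk of rows stored column by column.
batch = pa.RecordBatch.from_arrays(
    [pa.array([1, 2, 3]), pa.array(["a", "b", "c"])],
    names=["ints", "strs"])

# Write several batches as one Arrow stream using the binary protocol.
sink = pa.BufferOutputStream()
writer = pa.RecordBatchStreamWriter(sink, batch.schema)
for _ in range(3):
    writer.write_batch(batch)
writer.close()

# The receiving side iterates over the batches; the encoded buffers are
# used as-is, with no row-by-row deserialization step.
reader = pa.ipc.open_stream(sink.getvalue())
for received_batch in reader:
    print(received_batch.num_rows)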

Libraries implementing standard protocols like ODBC generally define their own custom on-wire binary formats that must be marshalled to and from each library’s public interface. The performance of ODBC or JDBC libraries varies greatly from case to case.

Our design goal for Flight is to create a new protocol for data services that uses the Arrow columnar format as both the over-the-wire data representation and the public API presented to developers. In doing so, we reduce or remove the serialization costs associated with data transport and increase the overall efficiency of distributed data systems. Additionally, two systems that are already using Apache Arrow for other purposes can communicate data to each other with extreme efficiency.

Flight Basics

The Arrow Flight libraries provide a development framework for implementing a service that can send and receive data streams. A Flight server supports several basic kinds of requests (a toy Python server handling two of them is sketched after this list):

  • Handshake: a simple request to determine whether the client is authorized and, in some cases, to establish an implementation-defined session token to use for future requests
  • ListFlights: return a list of available data streams
  • GetSchema: return the schema for a data stream
  • GetFlightInfo: return an “access plan” for a dataset of interest, which may require consuming multiple data streams. This request can accept custom serialized commands containing, for example, your specific application parameters.
  • DoGet: send a data stream to a client
  • DoPut: receive a data stream from a client
  • DoAction: perform an implementation-specific action and return any results, i.e. a generalized function call
  • ListActions: return a list of available action types
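
As a rough illustration, here is a toy Flight service that handles two of these requests (GetFlightInfo and DoGet) using the Python bindings. The table contents, location, and class name are invented for this example, and the server API shown (FlightServerBase, serve()) follows recent pyarrow releases, so it may differ slightly from the 0.15 bindings:

import pyarrow as pa
import pyarrow.flight as flight


class TinyFlightServer(flight.FlightServerBase):
    """Serves a single in-memory table; purely illustrative."""

    def __init__(self, location="grpc://localhost:8815", **kwargs):
        super().__init__(location, **kwargs)
        self._location = location
        self._table = pa.table({"x": [1, 2, 3], "y": ["a", "b", "c"]})

    def get_flight_info(self, context, descriptor):
        # Advertise a single endpoint served by this process. The ticket is
        # opaque to the client and comes back verbatim in the DoGet request.
        endpoint = flight.FlightEndpoint(b"example-ticket", [self._location])
        return flight.FlightInfo(self._table.schema, descriptor, [endpoint],
                                 self._table.num_rows, self._table.nbytes)

    def do_get(self, context, ticket):
        # Stream the table to the client as a sequence of record batches.
        return flight.RecordBatchStream(self._table)


if __name__ == "__main__":
    TinyFlightServer().serve()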

We take advantage of gRPC’s elegant “bidirectional” streaming support (built on top of HTTP/2 streaming) to allow clients and servers to send data and metadata to each other simultaneously while requests are being served.

A simple Flight setup might consist of a single server to which clients connect and make DoGet requests.

[Figure: Flight simple architecture]
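
For that kind of single-server setup, a Python client might look like the following sketch (the port and path are the hypothetical ones from the server example above):

import pyarrow.flight as flight

# Connect to the (hypothetical) server from the previous sketch.
client = flight.connect("grpc://localhost:8815")

# Ask for an access plan, then fetch the single stream it describes.
info = client.get_flight_info(flight.FlightDescriptor.for_path("example"))
ticket = info.endpoints[0].ticket
table = client.do_get(ticket).read_all()
print(table.num_rows, "rows:", table.column_names)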

Optimizing Data Throughput over gRPC

While using a general-purpose messaging library like gRPC has numerous benefits beyond the obvious ones (taking advantage of all the engineering that Google has done on the problem), some work was needed to improve the performance of transporting large datasets. Many gRPC users, for example, only deal with relatively small messages.

The best-supported way to use gRPC is to define services in a Protocol Buffers (aka “Protobuf”) .proto file. A Protobuf plugin for gRPC generates gRPC service stubs that you can use to implement your applications. RPC commands and data messages are serialized using the Protobuf wire format. Because we use “vanilla gRPC and Protocol Buffers”, gRPC clients that are ignorant of the Arrow columnar format can still interact with Flight services and handle the Arrow data opaquely.

The main data-related Protobuf type in Flight is called FlightData. Reading and writing Protobuf messages in general is not free, so we implemented some low-level optimizations in gRPC in both C++ and Java to do the following:

  • Generate the Protobuf wire format for FlightData, including the Arrow record batch being sent, without going through any intermediate memory copying or serialization steps.
  • Reconstruct an Arrow record batch from the Protobuf representation of FlightData without any memory copying or deserialization. In fact, we intercept the encoded data payloads without allowing the Protocol Buffers library to touch them.

In a sense we are “having our cake and eating it, too”. Flight implementations with these optimizations will have better performance, while naive gRPC clients can still talk to the Flight service and use a Protobuf library to deserialize FlightData (albeit with some performance penalty).

As far as absolute speed, in our C++ data throughput benchmarks, we are seeing end-to-end TCP throughput in excess of 2-3 GB/s on localhost without TLS enabled. This benchmark shows a transfer of ~12.8 gigabytes of data in just under 4 seconds, or roughly 3.3 GB/s (the speed reported below is in mebibytes per second):

$ ./arrow-flight-benchmark --records_per_stream 100000000
Bytes read: 12800000000
Nanos: 3900466413
Speed: 3129.63 MB/s

From this we can conclude that the machinery of Flight and gRPC adds relatively little overhead, and it suggests that many real-world applications of Flight will be bottlenecked on network bandwidth.

Horizontal Scalability: Parallel and Partitioned Data Access

Many distributed database-like systems make use of an architectural pattern in which the results of client requests are routed through a “coordinator” and sent to the client. Aside from the obvious inefficiency of transporting a dataset multiple times on its way to a client, this pattern also presents a scalability problem for getting access to very large datasets.

We wanted Flight to enable systems to create horizontally scalable data services without having to deal with such bottlenecks. A client request for a dataset using the GetFlightInfo RPC returns a list of endpoints, each of which contains a server location and a ticket to send to that server in a DoGet request to obtain a part of the full dataset. To get access to the entire dataset, all of the endpoints must be consumed. While Flight streams are not necessarily ordered, we provide for application-defined metadata which can be used to serialize ordering information.
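
Sketched with the Python bindings, consuming such an access plan might look like the hypothetical helper below (a production client would typically fetch the endpoints in parallel, for example with a thread pool, rather than sequentially):

import pyarrow as pa
import pyarrow.flight as flight


def read_distributed_dataset(planner_uri, descriptor):
    """Fetch every endpoint listed in a FlightInfo and concatenate the parts."""
    planner = flight.connect(planner_uri)
    info = planner.get_flight_info(descriptor)

    parts = []
    for endpoint in info.endpoints:
        # Each endpoint carries a ticket plus one or more locations that can
        # serve it; here we simply use the first location.
        data_node = flight.connect(endpoint.locations[0])
        parts.append(data_node.do_get(endpoint.ticket).read_all())
    return pa.concat_tables(parts)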

This multiple-endpoint pattern has a number of benefits:

  • Endpoints can be read by clients in parallel.
  • The service that serves the GetFlightInfo “planning” request can delegate work to sibling services to take advantage of data locality or simply to help with load balancing.
  • Nodes in a distributed cluster can take on different roles. For example, a subset of nodes might be responsible for planning queries while other nodes exclusively fulfill data stream (DoGet or DoPut) requests.

Here is an example diagram of a multi-node architecture with split service roles:

[Figure: Flight complex architecture]

Actions: Extending Flight with application business logic

While the GetFlightInfo request supports sending opaque serialized commands when requesting a dataset, a client may need to be able to ask a server to perform other kinds of operations. For example, a client may request that a particular dataset be “pinned” in memory so that subsequent requests from other clients are served faster.

A Flight service can thus optionally define “actions” which are carried out by the DoAction RPC. An action request contains the name of the action being performed and optional serialized data containing further needed information. The result of an action is a gRPC stream of opaque binary results.

Some example actions:

  • Metadata discovery, beyond the capabilities provided by the built-in ListFlights RPC
  • Setting session-specific parameters and settings

Note that a server is not required to implement any actions, and actions need not return results.
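
From the client’s perspective, invoking an action is a small amount of code. In the Python sketch below, the action name “pin-dataset” and its payload are invented for illustration; a real server would need to implement a matching do_action handler:

import pyarrow.flight as flight

client = flight.connect("grpc://localhost:8815")

# Discover which actions this particular server supports.
for action_type in client.list_actions():
    print(action_type.type, "-", action_type.description)

# Invoke a hypothetical application-defined action; the result is a stream
# of opaque binary payloads.
for result in client.do_action(flight.Action("pin-dataset", b"example")):
    print(result.body.to_pybytes())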

Encryption and Authentication

Flight supports encryption out of the box using gRPC’s built-in TLS / OpenSSL capabilities.

For authentication, there are extensible authentication handlers for the client and server that permit simple authentication schemes (like user and password) as well as more involved authentication such as Kerberos. The Flight protocol comes with a built-in BasicAuth so that user/password authentication can be implemented out of the box without custom development.
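
To sketch the handler mechanism in the Python bindings, here is a deliberately naive shared-secret scheme (illustration only, not something to deploy; the handler interface shown follows recent pyarrow releases and may vary slightly between versions):

import pyarrow.flight as flight


class NaiveServerAuthHandler(flight.ServerAuthHandler):
    """Shared-secret handshake; the secret doubles as the session token."""

    def __init__(self, secret):
        super().__init__()
        self._secret = secret

    def authenticate(self, outgoing, incoming):
        if incoming.read() != self._secret:
            raise flight.FlightUnauthenticatedError("invalid credentials")
        outgoing.write(self._secret)

    def is_valid(self, token):
        if token != self._secret:
            raise flight.FlightUnauthenticatedError("invalid token")
        return token


class NaiveClientAuthHandler(flight.ClientAuthHandler):
    def __init__(self, secret):
        super().__init__()
        self._secret = secret
        self._token = None

    def authenticate(self, outgoing, incoming):
        outgoing.write(self._secret)
        self._token = incoming.read()

    def get_token(self):
        return self._token

The server would then be constructed with auth_handler=NaiveServerAuthHandler(b"my-secret"), and the client would call client.authenticate(NaiveClientAuthHandler(b"my-secret")) before issuing other requests.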

Middleware and Tracing

gRPC has the concept of “interceptors”, which we have used to support developer-defined “middleware” that can provide instrumentation of or telemetry for incoming and outgoing requests. One framework for such instrumentation is OpenTracing.

Note that middleware functionality is one of the newest areas of the project and is currently only available in the project’s master branch.

gRPC, but not only gRPC

We specify server locations for DoGet requests using RFC 3986 compliant URIs. For example, a TLS-secured gRPC endpoint is specified as grpc+tls://$HOST:$PORT.
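
In the Python bindings, such a location can be built either directly from a URI string or with a helper constructor (the host and port below are placeholders):

import pyarrow.flight as flight

# Both of these name the same kind of TLS-secured gRPC endpoint.
from_uri = flight.Location("grpc+tls://data-node.example.com:8815")
from_helper = flight.Location.for_grpc_tls("data-node.example.com", 8815)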

While we think that using gRPC for the “command” layer of Flight servers makes sense, we may wish to support data transport layers other than TCP, such as RDMA. While some design and development work is required to make this possible, the idea is that gRPC could be used to coordinate get and put transfers which may be carried out over protocols other than TCP.

Getting Started and What’s Next

Documentation for Flight users is a work in progress, but the libraries themselves are mature enough for beta users that are tolerant of some minor API or protocol changes over the coming year.

One of the easiest ways to experiment with Flight is using the Python API, since custom servers and clients can be defined entirely in Python without any compilation required. You can see an example Flight client and server in Python in the Arrow codebase.

In real-world use, Dremio has developed an Arrow Flight-based connector which has been shown to deliver 20-50x better performance than ODBC. For Apache Spark users, Arrow contributor Ryan Murray has created a data source implementation to connect to Flight-enabled endpoints.

As far as “what’s next” in Flight, support for non-gRPC (or non-TCP) data transport may be an interesting direction of research and development work. A lot of the Flight work from here will be in creating user-facing Flight-enabled services. Since Flight is a development framework, we expect that user-facing APIs will add a layer of API veneer that hides many of the general Flight details as well as details related to a particular application of Flight in a custom data service.