Arrow Flight

This section contains a number of recipes for working with Arrow Flight, an RPC library specialized for tabular datasets. For more about Flight, see the Arrow Flight RPC format documentation (format/Flight).

Simple Parquet storage service with Arrow Flight

We’ll implement a service that provides a key-value store for tabular data, using Flight to handle uploads/requests and Parquet to store the actual data.

First, we’ll implement the service itself. For simplicity, we’ll use the Parquet API directly rather than the Datasets API.

Parquet storage service, server implementation
  1class ParquetStorageService : public arrow::flight::FlightServerBase {
  2 public:
  3  const arrow::flight::ActionType kActionDropDataset{"drop_dataset", "Delete a dataset."};
  4
  5  explicit ParquetStorageService(std::shared_ptr<arrow::fs::FileSystem> root)
  6      : root_(std::move(root)) {}
  7
  8  arrow::Status ListFlights(
  9      const arrow::flight::ServerCallContext&, const arrow::flight::Criteria*,
 10      std::unique_ptr<arrow::flight::FlightListing>* listings) override {
 11    arrow::fs::FileSelector selector;
 12    selector.base_dir = "/";
 13    ARROW_ASSIGN_OR_RAISE(auto listing, root_->GetFileInfo(selector));
 14
 15    std::vector<arrow::flight::FlightInfo> flights;
 16    for (const auto& file_info : listing) {
 17      if (!file_info.IsFile() || file_info.extension() != "parquet") continue;
 18      ARROW_ASSIGN_OR_RAISE(auto info, MakeFlightInfo(file_info));
 19      flights.push_back(std::move(info));
 20    }
 21
 22    *listings = std::unique_ptr<arrow::flight::FlightListing>(
 23        new arrow::flight::SimpleFlightListing(std::move(flights)));
 24    return arrow::Status::OK();
 25  }
 26
 27  arrow::Status GetFlightInfo(const arrow::flight::ServerCallContext&,
 28                              const arrow::flight::FlightDescriptor& descriptor,
 29                              std::unique_ptr<arrow::flight::FlightInfo>* info) override {
 30    ARROW_ASSIGN_OR_RAISE(auto file_info, FileInfoFromDescriptor(descriptor));
 31    ARROW_ASSIGN_OR_RAISE(auto flight_info, MakeFlightInfo(file_info));
 32    *info = std::unique_ptr<arrow::flight::FlightInfo>(
 33        new arrow::flight::FlightInfo(std::move(flight_info)));
 34    return arrow::Status::OK();
 35  }
 36
 37  arrow::Status DoPut(const arrow::flight::ServerCallContext&,
 38                      std::unique_ptr<arrow::flight::FlightMessageReader> reader,
 39                      std::unique_ptr<arrow::flight::FlightMetadataWriter>) override {
 40    ARROW_ASSIGN_OR_RAISE(auto file_info, FileInfoFromDescriptor(reader->descriptor()));
 41    ARROW_ASSIGN_OR_RAISE(auto sink, root_->OpenOutputStream(file_info.path()));
 42    ARROW_ASSIGN_OR_RAISE(std::shared_ptr<arrow::Table> table, reader->ToTable());
 43
 44    ARROW_RETURN_NOT_OK(parquet::arrow::WriteTable(*table, arrow::default_memory_pool(),
 45                                                   sink, /*chunk_size=*/65536));
 46    return arrow::Status::OK();
 47  }
 48
 49  arrow::Status DoGet(const arrow::flight::ServerCallContext&,
 50                      const arrow::flight::Ticket& request,
 51                      std::unique_ptr<arrow::flight::FlightDataStream>* stream) override {
 52    ARROW_ASSIGN_OR_RAISE(auto input, root_->OpenInputFile(request.ticket));
 53    std::unique_ptr<parquet::arrow::FileReader> reader;
 54    ARROW_RETURN_NOT_OK(parquet::arrow::OpenFile(std::move(input),
 55                                                 arrow::default_memory_pool(), &reader));
 56
 57    std::shared_ptr<arrow::Table> table;
 58    ARROW_RETURN_NOT_OK(reader->ReadTable(&table));
 59    // Note that we can't directly pass TableBatchReader to
 60    // RecordBatchStream because TableBatchReader keeps a non-owning
 61    // reference to the underlying Table, which would then get freed
 62    // when we exit this function
 63    std::vector<std::shared_ptr<arrow::RecordBatch>> batches;
 64    arrow::TableBatchReader batch_reader(*table);
 65    ARROW_ASSIGN_OR_RAISE(batches, batch_reader.ToRecordBatches());
 66
 67    ARROW_ASSIGN_OR_RAISE(auto owning_reader, arrow::RecordBatchReader::Make(
 68                                                  std::move(batches), table->schema()));
 69    *stream = std::unique_ptr<arrow::flight::FlightDataStream>(
 70        new arrow::flight::RecordBatchStream(owning_reader));
 71
 72    return arrow::Status::OK();
 73  }
 74
 75  arrow::Status ListActions(const arrow::flight::ServerCallContext&,
 76                            std::vector<arrow::flight::ActionType>* actions) override {
 77    *actions = {kActionDropDataset};
 78    return arrow::Status::OK();
 79  }
 80
 81  arrow::Status DoAction(const arrow::flight::ServerCallContext&,
 82                         const arrow::flight::Action& action,
 83                         std::unique_ptr<arrow::flight::ResultStream>* result) override {
 84    if (action.type == kActionDropDataset.type) {
 85      *result = std::unique_ptr<arrow::flight::ResultStream>(
 86          new arrow::flight::SimpleResultStream({}));
 87      return DoActionDropDataset(action.body->ToString());
 88    }
 89    return arrow::Status::NotImplemented("Unknown action type: ", action.type);
 90  }
 91
 92 private:
 93  arrow::Result<arrow::flight::FlightInfo> MakeFlightInfo(
 94      const arrow::fs::FileInfo& file_info) {
 95    ARROW_ASSIGN_OR_RAISE(auto input, root_->OpenInputFile(file_info));
 96    std::unique_ptr<parquet::arrow::FileReader> reader;
 97    ARROW_RETURN_NOT_OK(parquet::arrow::OpenFile(std::move(input),
 98                                                 arrow::default_memory_pool(), &reader));
 99
100    std::shared_ptr<arrow::Schema> schema;
101    ARROW_RETURN_NOT_OK(reader->GetSchema(&schema));
102
103    auto descriptor = arrow::flight::FlightDescriptor::Path({file_info.base_name()});
104
105    arrow::flight::FlightEndpoint endpoint;
106    endpoint.ticket.ticket = file_info.base_name();
107    arrow::flight::Location location;
108    ARROW_ASSIGN_OR_RAISE(location,
109        arrow::flight::Location::ForGrpcTcp("localhost", port()));
110    endpoint.locations.push_back(location);
111
112    int64_t total_records = reader->parquet_reader()->metadata()->num_rows();
113    int64_t total_bytes = file_info.size();
114
115    return arrow::flight::FlightInfo::Make(*schema, descriptor, {endpoint}, total_records,
116                                           total_bytes);
117  }
118
119  arrow::Result<arrow::fs::FileInfo> FileInfoFromDescriptor(
120      const arrow::flight::FlightDescriptor& descriptor) {
121    if (descriptor.type != arrow::flight::FlightDescriptor::PATH) {
122      return arrow::Status::Invalid("Must provide PATH-type FlightDescriptor");
123    } else if (descriptor.path.size() != 1) {
124      return arrow::Status::Invalid(
125          "Must provide PATH-type FlightDescriptor with one path component");
126    }
127    return root_->GetFileInfo(descriptor.path[0]);
128  }
129
130  arrow::Status DoActionDropDataset(const std::string& key) {
131    return root_->DeleteFile(key);
132  }
133
134  std::shared_ptr<arrow::fs::FileSystem> root_;
135};  // end ParquetStorageService

Next, we’ll start our server:

// Storage: a directory on the local filesystem, created if needed and emptied.
const std::string dataset_dir = "./flight_datasets/";
auto fs = std::make_shared<arrow::fs::LocalFileSystem>();
ARROW_RETURN_NOT_OK(fs->CreateDir(dataset_dir));
ARROW_RETURN_NOT_OK(fs->DeleteDirContents(dataset_dir));
auto root = std::make_shared<arrow::fs::SubTreeFileSystem>(dataset_dir, fs);

// Port 0 is requested here; the port actually in use is read back from the
// server after Init().
ARROW_ASSIGN_OR_RAISE(arrow::flight::Location server_location,
                      arrow::flight::Location::ForGrpcTcp("0.0.0.0", 0));
arrow::flight::FlightServerOptions options(server_location);

std::unique_ptr<arrow::flight::FlightServerBase> server(
    new ParquetStorageService(std::move(root)));
ARROW_RETURN_NOT_OK(server->Init(options));
rout << "Listening on port " << server->port() << std::endl;
Code Output
Listening on port 33611

We can then create a client and connect to the server:

arrow::flight::Location location;
ARROW_ASSIGN_OR_RAISE(location,
    arrow::flight::Location::ForGrpcTcp("localhost", server->port()));

std::unique_ptr<arrow::flight::FlightClient> client;
ARROW_ASSIGN_OR_RAISE(client, arrow::flight::FlightClient::Connect(location));
rout << "Connected to " << location.ToString() << std::endl;
Code Output
Connected to grpc+tcp://localhost:33611

Next, we’ll read a table from an example Parquet file and upload it; the server will store it in a Parquet file of its own.

// Open example data file to upload
ARROW_ASSIGN_OR_RAISE(std::string airquality_path,
                      FindTestDataFile("airquality.parquet"));
ARROW_ASSIGN_OR_RAISE(std::shared_ptr<arrow::io::RandomAccessFile> input,
                      fs->OpenInputFile(airquality_path));
std::unique_ptr<parquet::arrow::FileReader> reader;
ARROW_RETURN_NOT_OK(
    parquet::arrow::OpenFile(std::move(input), arrow::default_memory_pool(), &reader));

auto descriptor = arrow::flight::FlightDescriptor::Path({"airquality.parquet"});
std::shared_ptr<arrow::Schema> schema;
ARROW_RETURN_NOT_OK(reader->GetSchema(&schema));

// Start the RPC call
std::unique_ptr<arrow::flight::FlightStreamWriter> writer;
std::unique_ptr<arrow::flight::FlightMetadataReader> metadata_reader;
ARROW_ASSIGN_OR_RAISE(auto put_stream, client->DoPut(descriptor, schema));
writer = std::move(put_stream.writer);
metadata_reader = std::move(put_stream.reader);

// Upload data
std::shared_ptr<arrow::RecordBatchReader> batch_reader;
std::vector<int> row_groups(reader->num_row_groups());
std::iota(row_groups.begin(), row_groups.end(), 0);
ARROW_RETURN_NOT_OK(reader->GetRecordBatchReader(row_groups, &batch_reader));
int64_t batches = 0;
while (true) {
  ARROW_ASSIGN_OR_RAISE(auto batch, batch_reader->Next());
  if (!batch) break;
  ARROW_RETURN_NOT_OK(writer->WriteRecordBatch(*batch));
  batches++;
}

ARROW_RETURN_NOT_OK(writer->Close());
rout << "Wrote " << batches << " batches" << std::endl;
Code Output
Wrote 1 batches

Once we do so, we can retrieve the metadata for that dataset:

std::unique_ptr<arrow::flight::FlightInfo> flight_info;
ARROW_ASSIGN_OR_RAISE(flight_info, client->GetFlightInfo(descriptor));
rout << flight_info->descriptor().ToString() << std::endl;
rout << "=== Schema ===" << std::endl;
std::shared_ptr<arrow::Schema> info_schema;
arrow::ipc::DictionaryMemo dictionary_memo;
ARROW_ASSIGN_OR_RAISE(info_schema, flight_info->GetSchema(&dictionary_memo));
rout << info_schema->ToString() << std::endl;
rout << "==============" << std::endl;
Code Output
FlightDescriptor<path = 'airquality.parquet'>
=== Schema ===
Ozone: int32
Solar.R: int32
Wind: double
Temp: int32
Month: int32
Day: int32
==============

And get the data back:

std::unique_ptr<arrow::flight::FlightStreamReader> stream;
ARROW_ASSIGN_OR_RAISE(stream, client->DoGet(flight_info->endpoints()[0].ticket));
std::shared_ptr<arrow::Table> table;
ARROW_ASSIGN_OR_RAISE(table, stream->ToTable());
arrow::PrettyPrintOptions print_options(/*indent=*/0, /*window=*/2);
ARROW_RETURN_NOT_OK(arrow::PrettyPrint(*table, print_options, &rout));
Code Output
Ozone: int32
Solar.R: int32
Wind: double
Temp: int32
Month: int32
Day: int32
----
Ozone:
  [
    [
      41,
      36,
      ...
      18,
      20
    ]
  ]
Solar.R:
  [
    [
      190,
      118,
      ...
      131,
      223
    ]
  ]
Wind:
  [
    [
      7.4,
      8,
      ...
      8,
      11.5
    ]
  ]
Temp:
  [
    [
      67,
      72,
      ...
      76,
      68
    ]
  ]
Month:
  [
    [
      5,
      5,
      ...
      9,
      9
    ]
  ]
Day:
  [
    [
      1,
      2,
      ...
      29,
      30
    ]
  ]

Then, we’ll delete the dataset:

arrow::flight::Action action{"drop_dataset",
                             arrow::Buffer::FromString("airquality.parquet")};
std::unique_ptr<arrow::flight::ResultStream> results;
ARROW_ASSIGN_OR_RAISE(results, client->DoAction(action));
rout << "Deleted dataset" << std::endl;
Code Output
Deleted dataset

And confirm that it’s been deleted:

std::unique_ptr<arrow::flight::FlightListing> listing;
ARROW_ASSIGN_OR_RAISE(listing, client->ListFlights());
while (true) {
  std::unique_ptr<arrow::flight::FlightInfo> flight_info;
  ARROW_ASSIGN_OR_RAISE(flight_info, listing->Next());
  if (!flight_info) break;
  rout << flight_info->descriptor().ToString() << std::endl;
  rout << "=== Schema ===" << std::endl;
  std::shared_ptr<arrow::Schema> info_schema;
  arrow::ipc::DictionaryMemo dictionary_memo;
  ARROW_ASSIGN_OR_RAISE(info_schema, flight_info->GetSchema(&dictionary_memo));
  rout << info_schema->ToString() << std::endl;
  rout << "==============" << std::endl;
}
rout << "End of listing" << std::endl;
Code Output
End of listing

Finally, we’ll stop our server:

// Stop the server; a failure reported by Shutdown() is returned to the caller
// and the success message below is then not printed.
ARROW_RETURN_NOT_OK(server->Shutdown());
rout << "Server shut down successfully" << std::endl;
Code Output
Server shut down successfully

Setting gRPC client options

Options for gRPC clients can be passed in using the generic_options field of arrow::flight::FlightClientOptions. There is a list of available client options in the gRPC API documentation.

For example, you can change the maximum message length sent with:

auto client_options = arrow::flight::FlightClientOptions::Defaults();
// Set a very low limit at the gRPC layer to fail all calls
client_options.generic_options.emplace_back(GRPC_ARG_MAX_SEND_MESSAGE_LENGTH, 2);

arrow::flight::Location location;
ARROW_ASSIGN_OR_RAISE(location,
    arrow::flight::Location::ForGrpcTcp("localhost", server->port()));

std::unique_ptr<arrow::flight::FlightClient> client;
// pass client_options into Connect()
ARROW_ASSIGN_OR_RAISE(client,
    arrow::flight::FlightClient::Connect(location, client_options));
rout << "Connected to " << location.ToString() << std::endl;
Code Output
Connected to grpc+tcp://localhost:42439

Flight Service with other gRPC endpoints

If you are using the gRPC backend, you can add other gRPC endpoints to the Flight server. While Flight clients won’t recognize these endpoints, general gRPC clients will be able to.

Note

If statically linking Arrow Flight, Protobuf and gRPC must also be statically linked, and the same goes for dynamic linking. Read more at https://arrow.apache.org/docs/cpp/build_system.html#a-note-on-linking

Creating the server

To create a gRPC service, first define a service using protobuf.

Hello world protobuf specification
 1syntax = "proto3";
 2
 3service HelloWorldService {
 4  rpc SayHello(HelloRequest) returns (HelloResponse);
 5}
 6
 7message HelloRequest {
 8  string name = 1;
 9}
10
11message HelloResponse {
12  string reply = 1;
13}

Next, you’ll need to compile that to provide the protobuf and gRPC generated files. See gRPC’s generating client and server code docs for details.

Then write an implementation for the gRPC service:

Hello world gRPC service implementation
 1class HelloWorldServiceImpl : public HelloWorldService::Service {
 2  grpc::Status SayHello(grpc::ServerContext*, const HelloRequest* request,
 3                        HelloResponse* reply) override {
 4    const std::string& name = request->name();
 5    if (name.empty()) {
 6      return grpc::Status(grpc::StatusCode::INVALID_ARGUMENT, "Must provide a name!");
 7    }
 8    reply->set_reply("Hello, " + name);
 9    return grpc::Status::OK;
10  }
11};  // end HelloWorldServiceImpl

Finally, use the builder_hook field of arrow::flight::FlightServerOptions to register the additional gRPC service.

arrow::flight::Location server_location;
ARROW_ASSIGN_OR_RAISE(server_location,
    arrow::flight::Location::ForGrpcTcp("0.0.0.0", 5000));

arrow::flight::FlightServerOptions options(server_location);

// Create hello world service. It is declared before the Flight server on
// purpose: locals are destroyed in reverse order of declaration, so the
// service registered with the server outlives the server that refers to it.
HelloWorldServiceImpl grpc_service;

auto server = std::unique_ptr<arrow::flight::FlightServerBase>(
    new ParquetStorageService(std::move(root)));

// Use builder_hook to register grpc service. The hook receives the gRPC
// server builder as an untyped pointer; only grpc_service is captured.
options.builder_hook = [&grpc_service](void* raw_builder) {
  auto* builder = static_cast<grpc::ServerBuilder*>(raw_builder);
  builder->RegisterService(&grpc_service);
};

ARROW_RETURN_NOT_OK(server->Init(options));
rout << "Listening on port " << server->port() << std::endl;
Code Output
Listening on port 5000

Creating the client

The Flight client implementation doesn’t know about any custom gRPC services, so to call them you’ll need to create a normal gRPC client. For the Hello World service, we use the HelloWorldService stub, which is provided by the compiled gRPC definition.

// Connect to the server on this machine. "0.0.0.0" is the wildcard address a
// server binds to, not a portable destination for a client, so dial
// "localhost" as the Flight client recipes do.
auto client_channel =
    grpc::CreateChannel("localhost:5000", grpc::InsecureChannelCredentials());

// The stub is generated from the protobuf service definition
auto stub = HelloWorldService::NewStub(client_channel);

grpc::ClientContext context;
HelloRequest request;
request.set_name("Arrow User");
HelloResponse response;
grpc::Status status = stub->SayHello(&context, request, &response);
// Translate a gRPC failure into an Arrow status for the caller
if (!status.ok()) {
  return arrow::Status::IOError(status.error_message());
}
rout << response.reply();
Code Output
Hello, Arrow User