Arrow Flight¶
This section contains a number of recipes for working with Arrow Flight, an RPC library specialized for tabular datasets. For more about Flight, see format/Flight.
Simple Parquet storage service with Arrow Flight¶
We’ll implement a service that provides a key-value store for tabular data, using Flight to handle uploads/requests and Parquet to store the actual data.
First, we’ll implement the service itself. For simplicity, we’ll use the Parquet API directly rather than the Datasets API.
1class ParquetStorageService : public arrow::flight::FlightServerBase {
2 public:
3 const arrow::flight::ActionType kActionDropDataset{"drop_dataset", "Delete a dataset."};
4
5 explicit ParquetStorageService(std::shared_ptr<arrow::fs::FileSystem> root)
6 : root_(std::move(root)) {}
7
8 arrow::Status ListFlights(
9 const arrow::flight::ServerCallContext&, const arrow::flight::Criteria*,
10 std::unique_ptr<arrow::flight::FlightListing>* listings) override {
11 arrow::fs::FileSelector selector;
12 selector.base_dir = "/";
13 ARROW_ASSIGN_OR_RAISE(auto listing, root_->GetFileInfo(selector));
14
15 std::vector<arrow::flight::FlightInfo> flights;
16 for (const auto& file_info : listing) {
17 if (!file_info.IsFile() || file_info.extension() != "parquet") continue;
18 ARROW_ASSIGN_OR_RAISE(auto info, MakeFlightInfo(file_info));
19 flights.push_back(std::move(info));
20 }
21
22 *listings = std::unique_ptr<arrow::flight::FlightListing>(
23 new arrow::flight::SimpleFlightListing(std::move(flights)));
24 return arrow::Status::OK();
25 }
26
27 arrow::Status GetFlightInfo(const arrow::flight::ServerCallContext&,
28 const arrow::flight::FlightDescriptor& descriptor,
29 std::unique_ptr<arrow::flight::FlightInfo>* info) override {
30 ARROW_ASSIGN_OR_RAISE(auto file_info, FileInfoFromDescriptor(descriptor));
31 ARROW_ASSIGN_OR_RAISE(auto flight_info, MakeFlightInfo(file_info));
32 *info = std::unique_ptr<arrow::flight::FlightInfo>(
33 new arrow::flight::FlightInfo(std::move(flight_info)));
34 return arrow::Status::OK();
35 }
36
37 arrow::Status DoPut(const arrow::flight::ServerCallContext&,
38 std::unique_ptr<arrow::flight::FlightMessageReader> reader,
39 std::unique_ptr<arrow::flight::FlightMetadataWriter>) override {
40 ARROW_ASSIGN_OR_RAISE(auto file_info, FileInfoFromDescriptor(reader->descriptor()));
41 ARROW_ASSIGN_OR_RAISE(auto sink, root_->OpenOutputStream(file_info.path()));
42 ARROW_ASSIGN_OR_RAISE(std::shared_ptr<arrow::Table> table, reader->ToTable());
43
44 ARROW_RETURN_NOT_OK(parquet::arrow::WriteTable(*table, arrow::default_memory_pool(),
45 sink, /*chunk_size=*/65536));
46 return arrow::Status::OK();
47 }
48
49 arrow::Status DoGet(const arrow::flight::ServerCallContext&,
50 const arrow::flight::Ticket& request,
51 std::unique_ptr<arrow::flight::FlightDataStream>* stream) override {
52 ARROW_ASSIGN_OR_RAISE(auto input, root_->OpenInputFile(request.ticket));
53 std::unique_ptr<parquet::arrow::FileReader> reader;
54 ARROW_RETURN_NOT_OK(parquet::arrow::OpenFile(std::move(input),
55 arrow::default_memory_pool(), &reader));
56
57 std::shared_ptr<arrow::Table> table;
58 ARROW_RETURN_NOT_OK(reader->ReadTable(&table));
59 // Note that we can't directly pass TableBatchReader to
60 // RecordBatchStream because TableBatchReader keeps a non-owning
61 // reference to the underlying Table, which would then get freed
62 // when we exit this function
63 std::vector<std::shared_ptr<arrow::RecordBatch>> batches;
64 arrow::TableBatchReader batch_reader(*table);
65 ARROW_ASSIGN_OR_RAISE(batches, batch_reader.ToRecordBatches());
66
67 ARROW_ASSIGN_OR_RAISE(auto owning_reader, arrow::RecordBatchReader::Make(
68 std::move(batches), table->schema()));
69 *stream = std::unique_ptr<arrow::flight::FlightDataStream>(
70 new arrow::flight::RecordBatchStream(owning_reader));
71
72 return arrow::Status::OK();
73 }
74
75 arrow::Status ListActions(const arrow::flight::ServerCallContext&,
76 std::vector<arrow::flight::ActionType>* actions) override {
77 *actions = {kActionDropDataset};
78 return arrow::Status::OK();
79 }
80
81 arrow::Status DoAction(const arrow::flight::ServerCallContext&,
82 const arrow::flight::Action& action,
83 std::unique_ptr<arrow::flight::ResultStream>* result) override {
84 if (action.type == kActionDropDataset.type) {
85 *result = std::unique_ptr<arrow::flight::ResultStream>(
86 new arrow::flight::SimpleResultStream({}));
87 return DoActionDropDataset(action.body->ToString());
88 }
89 return arrow::Status::NotImplemented("Unknown action type: ", action.type);
90 }
91
92 private:
93 arrow::Result<arrow::flight::FlightInfo> MakeFlightInfo(
94 const arrow::fs::FileInfo& file_info) {
95 ARROW_ASSIGN_OR_RAISE(auto input, root_->OpenInputFile(file_info));
96 std::unique_ptr<parquet::arrow::FileReader> reader;
97 ARROW_RETURN_NOT_OK(parquet::arrow::OpenFile(std::move(input),
98 arrow::default_memory_pool(), &reader));
99
100 std::shared_ptr<arrow::Schema> schema;
101 ARROW_RETURN_NOT_OK(reader->GetSchema(&schema));
102
103 auto descriptor = arrow::flight::FlightDescriptor::Path({file_info.base_name()});
104
105 arrow::flight::FlightEndpoint endpoint;
106 endpoint.ticket.ticket = file_info.base_name();
107 arrow::flight::Location location;
108 ARROW_ASSIGN_OR_RAISE(location,
109 arrow::flight::Location::ForGrpcTcp("localhost", port()));
110 endpoint.locations.push_back(location);
111
112 int64_t total_records = reader->parquet_reader()->metadata()->num_rows();
113 int64_t total_bytes = file_info.size();
114
115 return arrow::flight::FlightInfo::Make(*schema, descriptor, {endpoint}, total_records,
116 total_bytes);
117 }
118
119 arrow::Result<arrow::fs::FileInfo> FileInfoFromDescriptor(
120 const arrow::flight::FlightDescriptor& descriptor) {
121 if (descriptor.type != arrow::flight::FlightDescriptor::PATH) {
122 return arrow::Status::Invalid("Must provide PATH-type FlightDescriptor");
123 } else if (descriptor.path.size() != 1) {
124 return arrow::Status::Invalid(
125 "Must provide PATH-type FlightDescriptor with one path component");
126 }
127 return root_->GetFileInfo(descriptor.path[0]);
128 }
129
130 arrow::Status DoActionDropDataset(const std::string& key) {
131 return root_->DeleteFile(key);
132 }
133
134 std::shared_ptr<arrow::fs::FileSystem> root_;
135}; // end ParquetStorageService
With the service implemented, we can start our server:
auto fs = std::make_shared<arrow::fs::LocalFileSystem>();
ARROW_RETURN_NOT_OK(fs->CreateDir("./flight_datasets/"));
ARROW_RETURN_NOT_OK(fs->DeleteDirContents("./flight_datasets/"));
auto root = std::make_shared<arrow::fs::SubTreeFileSystem>("./flight_datasets/", fs);
arrow::flight::Location server_location;
ARROW_ASSIGN_OR_RAISE(server_location,
arrow::flight::Location::ForGrpcTcp("0.0.0.0", 0));
arrow::flight::FlightServerOptions options(server_location);
auto server = std::unique_ptr<arrow::flight::FlightServerBase>(
new ParquetStorageService(std::move(root)));
ARROW_RETURN_NOT_OK(server->Init(options));
rout << "Listening on port " << server->port() << std::endl;
Listening on port 34553
We can then create a client and connect to the server:
arrow::flight::Location location;
ARROW_ASSIGN_OR_RAISE(location,
arrow::flight::Location::ForGrpcTcp("localhost", server->port()));
std::unique_ptr<arrow::flight::FlightClient> client;
ARROW_ASSIGN_OR_RAISE(client, arrow::flight::FlightClient::Connect(location));
rout << "Connected to " << location.ToString() << std::endl;
Connected to grpc+tcp://localhost:34553
Next, we’ll create and upload a table, which the server will store in a Parquet file.
// Open example data file to upload
ARROW_ASSIGN_OR_RAISE(std::string airquality_path,
FindTestDataFile("airquality.parquet"));
ARROW_ASSIGN_OR_RAISE(std::shared_ptr<arrow::io::RandomAccessFile> input,
fs->OpenInputFile(airquality_path));
std::unique_ptr<parquet::arrow::FileReader> reader;
ARROW_RETURN_NOT_OK(
parquet::arrow::OpenFile(std::move(input), arrow::default_memory_pool(), &reader));
auto descriptor = arrow::flight::FlightDescriptor::Path({"airquality.parquet"});
std::shared_ptr<arrow::Schema> schema;
ARROW_RETURN_NOT_OK(reader->GetSchema(&schema));
// Start the RPC call
std::unique_ptr<arrow::flight::FlightStreamWriter> writer;
std::unique_ptr<arrow::flight::FlightMetadataReader> metadata_reader;
ARROW_ASSIGN_OR_RAISE(auto put_stream, client->DoPut(descriptor, schema));
writer = std::move(put_stream.writer);
metadata_reader = std::move(put_stream.reader);
// Upload data
std::shared_ptr<arrow::RecordBatchReader> batch_reader;
std::vector<int> row_groups(reader->num_row_groups());
std::iota(row_groups.begin(), row_groups.end(), 0);
ARROW_RETURN_NOT_OK(reader->GetRecordBatchReader(row_groups, &batch_reader));
int64_t batches = 0;
while (true) {
ARROW_ASSIGN_OR_RAISE(auto batch, batch_reader->Next());
if (!batch) break;
ARROW_RETURN_NOT_OK(writer->WriteRecordBatch(*batch));
batches++;
}
ARROW_RETURN_NOT_OK(writer->Close());
rout << "Wrote " << batches << " batches" << std::endl;
Wrote 1 batches
Once we do so, we can retrieve the metadata for that dataset:
std::unique_ptr<arrow::flight::FlightInfo> flight_info;
ARROW_ASSIGN_OR_RAISE(flight_info, client->GetFlightInfo(descriptor));
rout << flight_info->descriptor().ToString() << std::endl;
rout << "=== Schema ===" << std::endl;
std::shared_ptr<arrow::Schema> info_schema;
arrow::ipc::DictionaryMemo dictionary_memo;
ARROW_ASSIGN_OR_RAISE(info_schema, flight_info->GetSchema(&dictionary_memo));
rout << info_schema->ToString() << std::endl;
rout << "==============" << std::endl;
<FlightDescriptor path='airquality.parquet'>
=== Schema ===
Ozone: int32
Solar.R: int32
Wind: double
Temp: int32
Month: int32
Day: int32
==============
And get the data back:
std::unique_ptr<arrow::flight::FlightStreamReader> stream;
ARROW_ASSIGN_OR_RAISE(stream, client->DoGet(flight_info->endpoints()[0].ticket));
std::shared_ptr<arrow::Table> table;
ARROW_ASSIGN_OR_RAISE(table, stream->ToTable());
arrow::PrettyPrintOptions print_options(/*indent=*/0, /*window=*/2);
ARROW_RETURN_NOT_OK(arrow::PrettyPrint(*table, print_options, &rout));
Ozone: int32
Solar.R: int32
Wind: double
Temp: int32
Month: int32
Day: int32
----
Ozone:
[
[
41,
36,
...
18,
20
]
]
Solar.R:
[
[
190,
118,
...
131,
223
]
]
Wind:
[
[
7.4,
8,
...
8,
11.5
]
]
Temp:
[
[
67,
72,
...
76,
68
]
]
Month:
[
[
5,
5,
...
9,
9
]
]
Day:
[
[
1,
2,
...
29,
30
]
]
Then, we’ll delete the dataset:
arrow::flight::Action action{"drop_dataset",
arrow::Buffer::FromString("airquality.parquet")};
std::unique_ptr<arrow::flight::ResultStream> results;
ARROW_ASSIGN_OR_RAISE(results, client->DoAction(action));
rout << "Deleted dataset" << std::endl;
Deleted dataset
And confirm that it’s been deleted:
std::unique_ptr<arrow::flight::FlightListing> listing;
ARROW_ASSIGN_OR_RAISE(listing, client->ListFlights());
while (true) {
std::unique_ptr<arrow::flight::FlightInfo> flight_info;
ARROW_ASSIGN_OR_RAISE(flight_info, listing->Next());
if (!flight_info) break;
rout << flight_info->descriptor().ToString() << std::endl;
rout << "=== Schema ===" << std::endl;
std::shared_ptr<arrow::Schema> info_schema;
arrow::ipc::DictionaryMemo dictionary_memo;
ARROW_ASSIGN_OR_RAISE(info_schema, flight_info->GetSchema(&dictionary_memo));
rout << info_schema->ToString() << std::endl;
rout << "==============" << std::endl;
}
rout << "End of listing" << std::endl;
End of listing
Finally, we’ll stop our server:
ARROW_RETURN_NOT_OK(server->Shutdown());
rout << "Server shut down successfully" << std::endl;
Server shut down successfully
Setting gRPC client options¶
Options for gRPC clients can be passed in using the generic_options field of arrow::flight::FlightClientOptions. There is a list of available client options in the gRPC API documentation.
For example, you can change the maximum message length sent with:
auto client_options = arrow::flight::FlightClientOptions::Defaults();
// Set a very low limit at the gRPC layer to fail all calls
client_options.generic_options.emplace_back(GRPC_ARG_MAX_SEND_MESSAGE_LENGTH, 2);
arrow::flight::Location location;
ARROW_ASSIGN_OR_RAISE(location,
arrow::flight::Location::ForGrpcTcp("localhost", server->port()));
std::unique_ptr<arrow::flight::FlightClient> client;
// pass client_options into Connect()
ARROW_ASSIGN_OR_RAISE(client,
arrow::flight::FlightClient::Connect(location, client_options));
rout << "Connected to " << location.ToString() << std::endl;
Connected to grpc+tcp://localhost:41329
Flight Service with other gRPC endpoints¶
If you are using the gRPC backend, you can add other gRPC endpoints to the Flight server. While Flight clients won’t recognize these endpoints, general gRPC clients will be able to.
Note
If statically linking Arrow Flight, Protobuf and gRPC must also be statically linked, and the same goes for dynamic linking. Read more at https://arrow.apache.org/docs/cpp/build_system.html#a-note-on-linking
Creating the server¶
To create a gRPC service, first define a service using protobuf.
syntax = "proto3";

// A minimal unary service served next to Flight on the same gRPC server.
service HelloWorldService {
  rpc SayHello(HelloRequest) returns (HelloResponse);
}

// Request carrying the name to greet.
message HelloRequest {
  string name = 1;
}

// Response carrying the greeting text.
message HelloResponse {
  string reply = 1;
}
Next, you’ll need to compile that to provide the protobuf and gRPC generated files. See gRPC’s generating client and server code docs for details.
Then write an implementation for the gRPC service:
1class HelloWorldServiceImpl : public HelloWorldService::Service {
2 grpc::Status SayHello(grpc::ServerContext*, const HelloRequest* request,
3 HelloResponse* reply) override {
4 const std::string& name = request->name();
5 if (name.empty()) {
6 return grpc::Status(grpc::StatusCode::INVALID_ARGUMENT, "Must provide a name!");
7 }
8 reply->set_reply("Hello, " + name);
9 return grpc::Status::OK;
10 }
11}; // end HelloWorldServiceImpl
Finally, use the builder_hook field of arrow::flight::FlightServerOptions to register the additional gRPC service.
arrow::flight::Location server_location;
ARROW_ASSIGN_OR_RAISE(server_location,
arrow::flight::Location::ForGrpcTcp("0.0.0.0", 5000));
arrow::flight::FlightServerOptions options(server_location);
auto server = std::unique_ptr<arrow::flight::FlightServerBase>(
new ParquetStorageService(std::move(root)));
// Create hello world service
HelloWorldServiceImpl grpc_service;
// Use builder_hook to register grpc service
options.builder_hook = [&](void* raw_builder) {
auto* builder = reinterpret_cast<grpc::ServerBuilder*>(raw_builder);
builder->RegisterService(&grpc_service);
};
ARROW_RETURN_NOT_OK(server->Init(options));
rout << "Listening on port " << server->port() << std::endl;
Listening on port 5000
Creating the client¶
The Flight client implementation doesn’t know about any custom gRPC services, so to call them you’ll need to create a normal gRPC client. For the Hello World service, we use the HelloWorldService stub, which is provided by the compiled gRPC definition.
// Dial the server through a plain gRPC channel. Use "localhost" rather than
// the wildcard bind address "0.0.0.0", which is a listen address and not a
// portable connect target; this also matches the Flight client snippets.
auto client_channel =
    grpc::CreateChannel("localhost:5000", grpc::InsecureChannelCredentials());
auto stub = HelloWorldService::NewStub(client_channel);

// Issue the unary SayHello call
grpc::ClientContext context;
HelloRequest request;
request.set_name("Arrow User");
HelloResponse response;
grpc::Status status = stub->SayHello(&context, request, &response);
// Translate a gRPC failure into an Arrow status for the caller
if (!status.ok()) {
  return arrow::Status::IOError(status.error_message());
}
rout << response.reply();
Hello, Arrow User