Client#
How to connect#
You must specify PostgreSQL user and PostgreSQL database to connect Apache Arrow Flight SQL endpoint.
User name and password must be passed by Handshake
call. Note that
basic authentication is only supported for now. mTLS (mutual-TLS)
isn’t implemented yet. If you’re interested in mTLS, please see the
issue for it:
apache/arrow-flight-sql-postgresql#79
You need to use a header to specify PostgreSQL database. The header
name for PostgreSQL database is x-flight-sql-database
.
You need to use
arrow::flight::FlightClient::AuthenticateBasicToken()
for authentication.
You need to add a x-flight-sql-database
header to
arrow::flight::FlightCallOptions::headers
for database.
The following example uses PGUSER
(fallback to USER
) and
PGPASSWORD
environment variables like
libpq
. AuthenticateBasicToken()
returns Bearer token on
success. So the example users the returned Bearer token to request
headers to use following requests.
The following example uses PGDATABASE
(fallback to PGUSER
/USER
)
environment variable like libpq
.
1#include <cstdlib>
2#include <fstream>
3#include <iostream>
4
5#include <arrow/flight/sql/client.h>
6
7namespace {
8std::string
9getenv(const char* name)
10{
11 auto value = std::getenv(name);
12 if (value)
13 {
14 return std::string(value);
15 }
16 else
17 {
18 return std::string("");
19 }
20}
21
22arrow::Status
23run()
24{
25 auto uri = getenv("PGFLIGHTSQLURI");
26 if (uri.empty())
27 {
28 auto host = getenv("PGHOST");
29 if (host.empty())
30 {
31 host = "localhost";
32 }
33 auto sslmode = getenv("PGSSLMODE");
34 if (sslmode == "require" || sslmode == "verify-ca" || sslmode == "verify-full")
35 {
36 uri = std::string("grpc+tls://") + host + ":15432";
37 }
38 else
39 {
40 uri = std::string("grpc://") + host + ":15432";
41 }
42 }
43 ARROW_ASSIGN_OR_RAISE(auto location, arrow::flight::Location::Parse(uri));
44 arrow::flight::FlightClientOptions client_options;
45 auto sslrootcert = getenv("PGSSLROOTCERT");
46 if (sslrootcert.empty())
47 {
48 auto home = getenv("HOME");
49 if (!home.empty())
50 {
51 sslrootcert = home + "/.postgresql/root.crt";
52 }
53 }
54 if (!sslrootcert.empty())
55 {
56 std::ifstream input(sslrootcert);
57 if (input)
58 {
59 client_options.tls_root_certs =
60 std::string(std::istreambuf_iterator<char>{input}, {});
61 }
62 }
63 ARROW_ASSIGN_OR_RAISE(auto client,
64 arrow::flight::FlightClient::Connect(location, client_options));
65 auto user = getenv("PGUSER");
66 if (user.empty())
67 {
68 user = getenv("USER");
69 }
70 auto password = getenv("PGPASSWORD");
71 if (password.empty())
72 {
73 password = "";
74 }
75 arrow::flight::FlightCallOptions call_options;
76 auto database = getenv("PGDATABASE");
77 if (database.empty())
78 {
79 database = user;
80 }
81 call_options.headers.emplace_back("x-flight-sql-database", database);
82 ARROW_ASSIGN_OR_RAISE(auto bearer_token,
83 client->AuthenticateBasicToken(call_options, user, password));
84 const auto& bearer_name = bearer_token.first;
85 const auto& bearer_value = bearer_token.second;
86 if (!bearer_name.empty() && !bearer_value.empty())
87 {
88 call_options.headers.emplace_back(bearer_name, bearer_value);
89 }
90 return client->Close();
91}
92}; // namespace
93
94int
95main(int argc, char** argv)
96{
97 auto status = run();
98 if (status.ok())
99 {
100 std::cout << "Authenticated!" << std::endl;
101 return EXIT_SUCCESS;
102 }
103 else
104 {
105 std::cerr << status.ToString() << std::endl;
106 return EXIT_FAILURE;
107 }
108}
You need to set the
username
and password
options for authentication. See also the
Authentication document.
You need to set the
adbc.flight.sql.rpc.call_header.x-flight-sql-database
option for
database. See also the Custom Call
Headers document.
1#include <stdio.h>
2#include <stdlib.h>
3#include <string.h>
4
5#include <adbc.h>
6
7#ifndef ADBC_ERROR_INIT
8# define ADBC_ERROR_INIT \
9 { \
10 0 \
11 }
12#endif
13
14static AdbcStatusCode
15database_init(struct AdbcDatabase* database, struct AdbcError* error)
16{
17 AdbcStatusCode code;
18 code = AdbcDatabaseSetOption(database, "driver", "adbc_driver_flightsql", error);
19 if (code != ADBC_STATUS_OK)
20 {
21 return code;
22 }
23 const char* uri = getenv("PGFLIGHTSQLURI");
24 char uri_buffer[4096];
25 if (!uri)
26 {
27 const char* host = getenv("PGHOST");
28 if (!host)
29 {
30 host = "localhost";
31 }
32 const char* sslmode = getenv("PGSSLMODE");
33 if (sslmode &&
34 ((strcmp(sslmode, "require") == 0) || (strcmp(sslmode, "verify-ca") == 0) ||
35 (strcmp(sslmode, "verify-full") == 0)))
36 {
37 snprintf(uri_buffer, sizeof(uri_buffer), "grpc+tls://%s:15432", host);
38 }
39 else
40 {
41 snprintf(uri_buffer, sizeof(uri_buffer), "grpc://%s:15432", host);
42 }
43 uri = uri_buffer;
44 }
45 code = AdbcDatabaseSetOption(database, "uri", uri, error);
46 if (code != ADBC_STATUS_OK)
47 {
48 return code;
49 }
50 const char* sslrootcert = getenv("PGSSLROOTCERT");
51 char sslrootcert_buffer[4096];
52 if (!sslrootcert)
53 {
54 const char* home = getenv("HOME");
55 if (home)
56 {
57 snprintf(sslrootcert_buffer,
58 sizeof(sslrootcert_buffer),
59 "%s/.postgresql/root.crt",
60 home);
61 sslrootcert = sslrootcert_buffer;
62 }
63 }
64 if (sslrootcert)
65 {
66 FILE* input = fopen(sslrootcert, "r");
67 if (input)
68 {
69 char sslrootcert_data[40960];
70 size_t read_size =
71 fread(sslrootcert_data, 1, sizeof(sslrootcert_data), input);
72 fclose(input);
73 if (read_size < sizeof(sslrootcert_data))
74 {
75 code =
76 AdbcDatabaseSetOption(database,
77 "adbc.flight.sql.client_option.tls_root_certs",
78 sslrootcert_data,
79 error);
80 if (code != ADBC_STATUS_OK)
81 {
82 return code;
83 }
84 }
85 }
86 }
87 const char* user = getenv("PGUSER");
88 if (!user)
89 {
90 user = getenv("USER");
91 }
92 if (user)
93 {
94 code = AdbcDatabaseSetOption(database, "username", user, error);
95 if (code != ADBC_STATUS_OK)
96 {
97 return code;
98 }
99 }
100 const char* password = getenv("PGPASSWORD");
101 if (password)
102 {
103 code = AdbcDatabaseSetOption(database, "password", password, error);
104 if (code != ADBC_STATUS_OK)
105 {
106 return code;
107 }
108 }
109 const char* database_name = getenv("PGDATABASE");
110 if (!database_name)
111 {
112 database_name = user;
113 }
114 if (database_name)
115 {
116 code =
117 AdbcDatabaseSetOption(database,
118 "adbc.flight.sql.rpc.call_header.x-flight-sql-database",
119 database_name,
120 error);
121 if (code != ADBC_STATUS_OK)
122 {
123 return code;
124 }
125 }
126 return AdbcDatabaseInit(database, error);
127}
128
129static AdbcStatusCode
130run(struct AdbcError* error)
131{
132 AdbcStatusCode code;
133 struct AdbcDatabase database = {0};
134 code = AdbcDatabaseNew(&database, error);
135 if (code != ADBC_STATUS_OK)
136 {
137 return code;
138 }
139 code = database_init(&database, error);
140 if (code == ADBC_STATUS_OK)
141 {
142 struct AdbcConnection connection = {0};
143 code = AdbcConnectionNew(&connection, error);
144 if (code == ADBC_STATUS_OK)
145 {
146 code = AdbcConnectionInit(&connection, &database, error);
147 AdbcConnectionRelease(&connection, error);
148 }
149 }
150 AdbcDatabaseRelease(&database, error);
151 return code;
152}
153
154int
155main(int argc, char** argv)
156{
157 struct AdbcError error = ADBC_ERROR_INIT;
158 AdbcStatusCode code = run(&error);
159 if (code == ADBC_STATUS_OK)
160 {
161 printf("Authenticated!\n");
162 return EXIT_SUCCESS;
163 }
164 else
165 {
166 fprintf(stderr, "%s\n", error.message);
167 error.release(&error);
168 return EXIT_FAILURE;
169 }
170}
How to query#
You can use an ad-hoc SQL statement or a prepared SQL statement to query.
Ad-hoc SQL statement#
You need to use
arrow::flight::sql::FlightSqlClient::Execute()
to execute a query.
You need to use
arrow::flight::sql::FlightSqlClient::DoGet()
to get results.
1arrow::Status
2run()
3{
4 arrow::flight::FlightCallOptions call_options;
5 ARROW_ASSIGN_OR_RAISE(auto sql_client, connect(call_options));
6 ARROW_ASSIGN_OR_RAISE(
7 auto info,
8 sql_client->Execute(call_options, "SELECT 1 AS number, 'hello' AS string"));
9 for (const auto& endpoint : info->endpoints())
10 {
11 ARROW_ASSIGN_OR_RAISE(auto reader,
12 sql_client->DoGet(call_options, endpoint.ticket));
13 while (true)
14 {
15 ARROW_ASSIGN_OR_RAISE(auto chunk, reader->Next());
16 if (!chunk.data)
17 {
18 break;
19 }
20 std::cout << chunk.data->ToString() << std::endl;
21 }
22 }
23 return sql_client->Close();
24}
TODO
Prepared SQL statement#
You need to use
arrow::flight::sql::FlightSqlClient::Prepare()
to prepare a query.
You need to use
arrow::flight::sql::PreparedStatement::SetParameters()
to set parameters.
You need to use
arrow::flight::sql::PreparedStatement::Execute()
to execute a prepared statement with parameters.
You need to use
arrow::flight::sql::FlightSqlClient::DoGet()
to get results.
1arrow::Status
2run()
3{
4 arrow::flight::FlightCallOptions call_options;
5 ARROW_ASSIGN_OR_RAISE(auto sql_client, connect(call_options));
6 ARROW_ASSIGN_OR_RAISE(auto statement,
7 sql_client->Prepare(call_options,
8 "SELECT i "
9 " FROM generate_series(1, 100) "
10 " AS series (i) "
11 " WHERE i < $1"));
12 auto schema = arrow::schema({arrow::field("i", arrow::int32())});
13 ARROW_ASSIGN_OR_RAISE(
14 auto record_batch_builder,
15 arrow::RecordBatchBuilder::Make(schema, arrow::default_memory_pool()));
16 auto i_builder = record_batch_builder->GetFieldAs<arrow::Int32Builder>(0);
17 ARROW_RETURN_NOT_OK(i_builder->Append(10));
18 ARROW_ASSIGN_OR_RAISE(auto record_batch, record_batch_builder->Flush());
19 ARROW_RETURN_NOT_OK(statement->SetParameters(record_batch));
20 ARROW_ASSIGN_OR_RAISE(auto info, statement->Execute());
21 for (const auto& endpoint : info->endpoints())
22 {
23 ARROW_ASSIGN_OR_RAISE(auto reader,
24 sql_client->DoGet(call_options, endpoint.ticket));
25 while (true)
26 {
27 ARROW_ASSIGN_OR_RAISE(auto chunk, reader->Next());
28 if (!chunk.data)
29 {
30 break;
31 }
32 std::cout << chunk.data->ToString() << std::endl;
33 }
34 }
35 ARROW_RETURN_NOT_OK(statement->Close());
36 return sql_client->Close();
37}
TODO