Client#

How to connect#

You must specify a PostgreSQL user and a PostgreSQL database to connect to the Apache Arrow Flight SQL endpoint.

The user name and password must be passed via the Handshake call. Note that only basic authentication is supported for now. mTLS (mutual TLS) isn’t implemented yet. If you’re interested in mTLS, please see the issue for it: apache/arrow-flight-sql-postgresql#79

You need to use a header to specify the PostgreSQL database. The header name for the PostgreSQL database is x-flight-sql-database.

You need to use arrow::flight::FlightClient::AuthenticateBasicToken() for authentication.

You need to add an x-flight-sql-database header to arrow::flight::FlightCallOptions::headers to specify the database.

The following example uses the PGUSER (falling back to USER) and PGPASSWORD environment variables, like libpq. AuthenticateBasicToken() returns a Bearer token on success, so the example adds the returned Bearer token to the request headers so that it can be used for the following requests.

The following example uses the PGDATABASE environment variable (falling back to PGUSER/USER), like libpq.

  1#include <cstdlib>
  2#include <fstream>
  3#include <iostream>
  4
  5#include <arrow/flight/sql/client.h>
  6
  7namespace {
  8std::string
  9getenv(const char* name)
 10{
 11    auto value = std::getenv(name);
 12    if (value)
 13    {
 14        return std::string(value);
 15    }
 16    else
 17    {
 18        return std::string("");
 19    }
 20}
 21
 22arrow::Status
 23run()
 24{
 25    auto uri = getenv("PGFLIGHTSQLURI");
 26    if (uri.empty())
 27    {
 28        auto host = getenv("PGHOST");
 29        if (host.empty())
 30        {
 31            host = "localhost";
 32        }
 33        auto sslmode = getenv("PGSSLMODE");
 34        if (sslmode == "require" || sslmode == "verify-ca" || sslmode == "verify-full")
 35        {
 36            uri = std::string("grpc+tls://") + host + ":15432";
 37        }
 38        else
 39        {
 40            uri = std::string("grpc://") + host + ":15432";
 41        }
 42    }
 43    ARROW_ASSIGN_OR_RAISE(auto location, arrow::flight::Location::Parse(uri));
 44    arrow::flight::FlightClientOptions client_options;
 45    auto sslrootcert = getenv("PGSSLROOTCERT");
 46    if (sslrootcert.empty())
 47    {
 48        auto home = getenv("HOME");
 49        if (!home.empty())
 50        {
 51            sslrootcert = home + "/.postgresql/root.crt";
 52        }
 53    }
 54    if (!sslrootcert.empty())
 55    {
 56        std::ifstream input(sslrootcert);
 57        if (input)
 58        {
 59            client_options.tls_root_certs =
 60                std::string(std::istreambuf_iterator<char>{input}, {});
 61        }
 62    }
 63    ARROW_ASSIGN_OR_RAISE(auto client,
 64                          arrow::flight::FlightClient::Connect(location, client_options));
 65    auto user = getenv("PGUSER");
 66    if (user.empty())
 67    {
 68        user = getenv("USER");
 69    }
 70    auto password = getenv("PGPASSWORD");
 71    if (password.empty())
 72    {
 73        password = "";
 74    }
 75    arrow::flight::FlightCallOptions call_options;
 76    auto database = getenv("PGDATABASE");
 77    if (database.empty())
 78    {
 79        database = user;
 80    }
 81    call_options.headers.emplace_back("x-flight-sql-database", database);
 82    ARROW_ASSIGN_OR_RAISE(auto bearer_token,
 83                          client->AuthenticateBasicToken(call_options, user, password));
 84    const auto& bearer_name = bearer_token.first;
 85    const auto& bearer_value = bearer_token.second;
 86    if (!bearer_name.empty() && !bearer_value.empty())
 87    {
 88        call_options.headers.emplace_back(bearer_name, bearer_value);
 89    }
 90    return client->Close();
 91}
 92};  // namespace
 93
 94int
 95main(int argc, char** argv)
 96{
 97    auto status = run();
 98    if (status.ok())
 99    {
100        std::cout << "Authenticated!" << std::endl;
101        return EXIT_SUCCESS;
102    }
103    else
104    {
105        std::cerr << status.ToString() << std::endl;
106        return EXIT_FAILURE;
107    }
108}

You need to set the username and password options for authentication. See also the Authentication document.

You need to set the adbc.flight.sql.rpc.call_header.x-flight-sql-database option to specify the database. See also the Custom Call Headers document.

  1#include <stdio.h>
  2#include <stdlib.h>
  3#include <string.h>
  4
  5#include <adbc.h>
  6
  7#ifndef ADBC_ERROR_INIT
  8#   define ADBC_ERROR_INIT \
  9        {                   \
 10            0               \
 11        }
 12#endif
 13
 14static AdbcStatusCode
 15database_init(struct AdbcDatabase* database, struct AdbcError* error)
 16{
 17    AdbcStatusCode code;
 18    code = AdbcDatabaseSetOption(database, "driver", "adbc_driver_flightsql", error);
 19    if (code != ADBC_STATUS_OK)
 20    {
 21        return code;
 22    }
 23    const char* uri = getenv("PGFLIGHTSQLURI");
 24    char uri_buffer[4096];
 25    if (!uri)
 26    {
 27        const char* host = getenv("PGHOST");
 28        if (!host)
 29        {
 30            host = "localhost";
 31        }
 32        const char* sslmode = getenv("PGSSLMODE");
 33        if (sslmode &&
 34            ((strcmp(sslmode, "require") == 0) || (strcmp(sslmode, "verify-ca") == 0) ||
 35             (strcmp(sslmode, "verify-full") == 0)))
 36        {
 37            snprintf(uri_buffer, sizeof(uri_buffer), "grpc+tls://%s:15432", host);
 38        }
 39        else
 40        {
 41            snprintf(uri_buffer, sizeof(uri_buffer), "grpc://%s:15432", host);
 42        }
 43        uri = uri_buffer;
 44    }
 45    code = AdbcDatabaseSetOption(database, "uri", uri, error);
 46    if (code != ADBC_STATUS_OK)
 47    {
 48        return code;
 49    }
 50    const char* sslrootcert = getenv("PGSSLROOTCERT");
 51    char sslrootcert_buffer[4096];
 52    if (!sslrootcert)
 53    {
 54        const char* home = getenv("HOME");
 55        if (home)
 56        {
 57            snprintf(sslrootcert_buffer,
 58                     sizeof(sslrootcert_buffer),
 59                     "%s/.postgresql/root.crt",
 60                     home);
 61            sslrootcert = sslrootcert_buffer;
 62        }
 63    }
 64    if (sslrootcert)
 65    {
 66        FILE* input = fopen(sslrootcert, "r");
 67        if (input)
 68        {
 69            char sslrootcert_data[40960];
 70            size_t read_size =
 71                fread(sslrootcert_data, 1, sizeof(sslrootcert_data), input);
 72            fclose(input);
 73            if (read_size < sizeof(sslrootcert_data))
 74            {
 75                code =
 76                    AdbcDatabaseSetOption(database,
 77                                          "adbc.flight.sql.client_option.tls_root_certs",
 78                                          sslrootcert_data,
 79                                          error);
 80                if (code != ADBC_STATUS_OK)
 81                {
 82                    return code;
 83                }
 84            }
 85        }
 86    }
 87    const char* user = getenv("PGUSER");
 88    if (!user)
 89    {
 90        user = getenv("USER");
 91    }
 92    if (user)
 93    {
 94        code = AdbcDatabaseSetOption(database, "username", user, error);
 95        if (code != ADBC_STATUS_OK)
 96        {
 97            return code;
 98        }
 99    }
100    const char* password = getenv("PGPASSWORD");
101    if (password)
102    {
103        code = AdbcDatabaseSetOption(database, "password", password, error);
104        if (code != ADBC_STATUS_OK)
105        {
106            return code;
107        }
108    }
109    const char* database_name = getenv("PGDATABASE");
110    if (!database_name)
111    {
112        database_name = user;
113    }
114    if (database_name)
115    {
116        code =
117            AdbcDatabaseSetOption(database,
118                                  "adbc.flight.sql.rpc.call_header.x-flight-sql-database",
119                                  database_name,
120                                  error);
121        if (code != ADBC_STATUS_OK)
122        {
123            return code;
124        }
125    }
126    return AdbcDatabaseInit(database, error);
127}
128
129static AdbcStatusCode
130run(struct AdbcError* error)
131{
132    AdbcStatusCode code;
133    struct AdbcDatabase database = {0};
134    code = AdbcDatabaseNew(&database, error);
135    if (code != ADBC_STATUS_OK)
136    {
137        return code;
138    }
139    code = database_init(&database, error);
140    if (code == ADBC_STATUS_OK)
141    {
142        struct AdbcConnection connection = {0};
143        code = AdbcConnectionNew(&connection, error);
144        if (code == ADBC_STATUS_OK)
145        {
146            code = AdbcConnectionInit(&connection, &database, error);
147            AdbcConnectionRelease(&connection, error);
148        }
149    }
150    AdbcDatabaseRelease(&database, error);
151    return code;
152}
153
154int
155main(int argc, char** argv)
156{
157    struct AdbcError error = ADBC_ERROR_INIT;
158    AdbcStatusCode code = run(&error);
159    if (code == ADBC_STATUS_OK)
160    {
161        printf("Authenticated!\n");
162        return EXIT_SUCCESS;
163    }
164    else
165    {
166        fprintf(stderr, "%s\n", error.message);
167        error.release(&error);
168        return EXIT_FAILURE;
169    }
170}

How to query#

You can use an ad-hoc SQL statement or a prepared SQL statement to query.

Ad-hoc SQL statement#

You need to use arrow::flight::sql::FlightSqlClient::Execute() to execute a query.

You need to use arrow::flight::sql::FlightSqlClient::DoGet() to get results.

 1arrow::Status
 2run()
 3{
 4    arrow::flight::FlightCallOptions call_options;
 5    ARROW_ASSIGN_OR_RAISE(auto sql_client, connect(call_options));
 6    ARROW_ASSIGN_OR_RAISE(
 7        auto info,
 8        sql_client->Execute(call_options, "SELECT 1 AS number, 'hello' AS string"));
 9    for (const auto& endpoint : info->endpoints())
10    {
11        ARROW_ASSIGN_OR_RAISE(auto reader,
12                              sql_client->DoGet(call_options, endpoint.ticket));
13        while (true)
14        {
15            ARROW_ASSIGN_OR_RAISE(auto chunk, reader->Next());
16            if (!chunk.data)
17            {
18                break;
19            }
20            std::cout << chunk.data->ToString() << std::endl;
21        }
22    }
23    return sql_client->Close();
24}

TODO

Prepared SQL statement#

You need to use arrow::flight::sql::FlightSqlClient::Prepare() to prepare a query.

You need to use arrow::flight::sql::PreparedStatement::SetParameters() to set parameters.

You need to use arrow::flight::sql::PreparedStatement::Execute() to execute a prepared statement with parameters.

You need to use arrow::flight::sql::FlightSqlClient::DoGet() to get results.

 1arrow::Status
 2run()
 3{
 4    arrow::flight::FlightCallOptions call_options;
 5    ARROW_ASSIGN_OR_RAISE(auto sql_client, connect(call_options));
 6    ARROW_ASSIGN_OR_RAISE(auto statement,
 7                          sql_client->Prepare(call_options,
 8                                              "SELECT i "
 9                                              "  FROM generate_series(1, 100) "
10                                              "       AS series (i) "
11                                              "  WHERE i < $1"));
12    auto schema = arrow::schema({arrow::field("i", arrow::int32())});
13    ARROW_ASSIGN_OR_RAISE(
14        auto record_batch_builder,
15        arrow::RecordBatchBuilder::Make(schema, arrow::default_memory_pool()));
16    auto i_builder = record_batch_builder->GetFieldAs<arrow::Int32Builder>(0);
17    ARROW_RETURN_NOT_OK(i_builder->Append(10));
18    ARROW_ASSIGN_OR_RAISE(auto record_batch, record_batch_builder->Flush());
19    ARROW_RETURN_NOT_OK(statement->SetParameters(record_batch));
20    ARROW_ASSIGN_OR_RAISE(auto info, statement->Execute());
21    for (const auto& endpoint : info->endpoints())
22    {
23        ARROW_ASSIGN_OR_RAISE(auto reader,
24                              sql_client->DoGet(call_options, endpoint.ticket));
25        while (true)
26        {
27            ARROW_ASSIGN_OR_RAISE(auto chunk, reader->Next());
28            if (!chunk.data)
29            {
30                break;
31            }
32            std::cout << chunk.data->ToString() << std::endl;
33        }
34    }
35    ARROW_RETURN_NOT_OK(statement->Close());
36    return sql_client->Close();
37}

TODO