arrow_flight/sql/
mod.rs

1// Licensed to the Apache Software Foundation (ASF) under one
2// or more contributor license agreements.  See the NOTICE file
3// distributed with this work for additional information
4// regarding copyright ownership.  The ASF licenses this file
5// to you under the Apache License, Version 2.0 (the
6// "License"); you may not use this file except in compliance
7// with the License.  You may obtain a copy of the License at
8//
9//   http://www.apache.org/licenses/LICENSE-2.0
10//
11// Unless required by applicable law or agreed to in writing,
12// software distributed under the License is distributed on an
13// "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
14// KIND, either express or implied.  See the License for the
15// specific language governing permissions and limitations
16// under the License.
17
//! Support for executing SQL queries using [Apache Arrow] [Flight SQL].
19//!
20//! [Flight SQL] is built on top of Arrow Flight RPC framework, by
21//! defining specific messages, encoded using the protobuf format,
//! sent in the [`FlightDescriptor::cmd`] field to [`FlightService`]
//! endpoints such as [`get_flight_info`] and [`do_get`].
24//!
25//! This module contains:
26//! 1. [prost] generated structs for FlightSQL messages such as [`CommandStatementQuery`]
27//! 2. Helpers for encoding and decoding FlightSQL messages: [`Any`] and [`Command`]
28//! 3. A [`FlightSqlServiceClient`] for interacting with FlightSQL servers.
//! 4. A [`FlightSqlService`] to help build FlightSQL servers from [`FlightService`].
30//! 5. Helpers to build responses for FlightSQL metadata APIs: [`metadata`]
31//!
32//! [Flight SQL]: https://arrow.apache.org/docs/format/FlightSql.html
33//! [Apache Arrow]: https://arrow.apache.org
34//! [`FlightDescriptor::cmd`]: crate::FlightDescriptor::cmd
35//! [`FlightService`]: crate::flight_service_server::FlightService
36//! [`get_flight_info`]: crate::flight_service_server::FlightService::get_flight_info
37//! [`do_get`]: crate::flight_service_server::FlightService::do_get
38//! [`FlightSqlServiceClient`]: client::FlightSqlServiceClient
39//! [`FlightSqlService`]: server::FlightSqlService
40//! [`metadata`]: crate::sql::metadata
41use arrow_schema::ArrowError;
42use bytes::Bytes;
43use paste::paste;
44use prost::Message;
45
// Private home of the generated FlightSQL protobuf bindings. The module itself
// is not exported; the public types are re-exported individually below.
#[allow(clippy::all)]
mod gen {
    // Since this file is auto-generated, we suppress all warnings
    #![allow(missing_docs)]
    // Textually includes the generated source, so its items live directly in `gen`.
    include!("arrow.flight.protocol.sql.rs");
}
52
53pub use gen::action_end_transaction_request::EndTransaction;
54pub use gen::command_statement_ingest::table_definition_options::{
55    TableExistsOption, TableNotExistOption,
56};
57pub use gen::command_statement_ingest::TableDefinitionOptions;
58pub use gen::ActionBeginSavepointRequest;
59pub use gen::ActionBeginSavepointResult;
60pub use gen::ActionBeginTransactionRequest;
61pub use gen::ActionBeginTransactionResult;
62pub use gen::ActionCancelQueryRequest;
63pub use gen::ActionCancelQueryResult;
64pub use gen::ActionClosePreparedStatementRequest;
65pub use gen::ActionCreatePreparedStatementRequest;
66pub use gen::ActionCreatePreparedStatementResult;
67pub use gen::ActionCreatePreparedSubstraitPlanRequest;
68pub use gen::ActionEndSavepointRequest;
69pub use gen::ActionEndTransactionRequest;
70pub use gen::CommandGetCatalogs;
71pub use gen::CommandGetCrossReference;
72pub use gen::CommandGetDbSchemas;
73pub use gen::CommandGetExportedKeys;
74pub use gen::CommandGetImportedKeys;
75pub use gen::CommandGetPrimaryKeys;
76pub use gen::CommandGetSqlInfo;
77pub use gen::CommandGetTableTypes;
78pub use gen::CommandGetTables;
79pub use gen::CommandGetXdbcTypeInfo;
80pub use gen::CommandPreparedStatementQuery;
81pub use gen::CommandPreparedStatementUpdate;
82pub use gen::CommandStatementIngest;
83pub use gen::CommandStatementQuery;
84pub use gen::CommandStatementSubstraitPlan;
85pub use gen::CommandStatementUpdate;
86pub use gen::DoPutPreparedStatementResult;
87pub use gen::DoPutUpdateResult;
88pub use gen::Nullable;
89pub use gen::Searchable;
90pub use gen::SqlInfo;
91pub use gen::SqlNullOrdering;
92pub use gen::SqlOuterJoinsSupportLevel;
93pub use gen::SqlSupportedCaseSensitivity;
94pub use gen::SqlSupportedElementActions;
95pub use gen::SqlSupportedGroupBy;
96pub use gen::SqlSupportedPositionedCommands;
97pub use gen::SqlSupportedResultSetConcurrency;
98pub use gen::SqlSupportedResultSetType;
99pub use gen::SqlSupportedSubqueries;
100pub use gen::SqlSupportedTransaction;
101pub use gen::SqlSupportedTransactions;
102pub use gen::SqlSupportedUnions;
103pub use gen::SqlSupportsConvert;
104pub use gen::SqlTransactionIsolationLevel;
105pub use gen::SubstraitPlan;
106pub use gen::SupportedSqlGrammar;
107pub use gen::TicketStatementQuery;
108pub use gen::UpdateDeleteRules;
109pub use gen::XdbcDataType;
110pub use gen::XdbcDatetimeSubcode;
111
112pub mod client;
113pub mod metadata;
114pub mod server;
115
/// Extension methods for [`prost::Message`] types used as FlightSQL messages.
///
/// Implementations for the FlightSQL message types are generated by the
/// `prost_message_ext!` macro in this module.
pub trait ProstMessageExt: prost::Message + Default {
    /// Returns the type URL identifying this message type, i.e. the value
    /// stored in the `type_url` field of an [`Any`] that wraps it.
    fn type_url() -> &'static str;

    /// Encodes this message and wraps it, together with its type URL, in an [`Any`].
    fn as_any(&self) -> Any;
}
124
/// Macro to coerce a token to an item, specifically
/// to build the `Command` enum.
///
/// See: <https://danielkeep.github.io/tlborm/book/blk-ast-coercion.html>
macro_rules! as_item {
    // Accepts exactly one item and re-emits it unchanged.
    ($i:item) => {
        $i
    };
}
134
/// Generates the FlightSQL plumbing for every listed prost message type.
///
/// For each `$name` the expansion contains:
/// * a `<NAME>_TYPE_URL` constant equal to
///   `type.googleapis.com/arrow.flight.protocol.sql.<Name>`;
/// * a `Command::<Name>` variant, handled by `Command::into_any`,
///   `Command::type_url` and `TryFrom<Any> for Command`;
/// * a `ProstMessageExt` implementation for the type.
///
/// Every entry, including the last one, must be followed by a comma.
macro_rules! prost_message_ext {
    ($($name:tt,)*) => {
        paste! {
            $(
            // e.g. `CommandStatementQuery` => `COMMAND_STATEMENT_QUERY_TYPE_URL`
            const [<$name:snake:upper _TYPE_URL>]: &str = concat!(
                "type.googleapis.com/arrow.flight.protocol.sql.",
                stringify!($name)
            );
            )*

            as_item! {
                /// Helper to convert to/from protobuf [`Any`] message
                /// to a specific FlightSQL command message.
                ///
                /// # Example
                /// ```rust
                /// # use arrow_flight::sql::{Any, CommandStatementQuery, Command};
                /// let flightsql_message = CommandStatementQuery {
                ///   query: "SELECT * FROM foo".to_string(),
                ///   transaction_id: None,
                /// };
                ///
                /// // Given a packed FlightSQL Any message
                /// let any_message = Any::pack(&flightsql_message).unwrap();
                ///
                /// // decode it to Command:
                /// match Command::try_from(any_message).unwrap() {
                ///   Command::CommandStatementQuery(decoded) => {
                ///    assert_eq!(flightsql_message, decoded);
                ///   }
                ///   _ => panic!("Unexpected decoded message"),
                /// }
                /// ```
                #[derive(Clone, Debug, PartialEq)]
                pub enum Command {
                    $(
                        // The leading space keeps the type name and the word
                        // "variant" from running together in the rendered docs.
                        #[doc = concat!(stringify!($name), " variant")]
                        $name($name),
                    )*

                    /// Any message that is not any FlightSQL command.
                    Unknown(Any),
                }
            }

            impl Command {
                /// Convert the command to [`Any`].
                ///
                /// Known commands are re-encoded through `ProstMessageExt::as_any`;
                /// an `Unknown` command yields the [`Any`] it was created from.
                pub fn into_any(self) -> Any {
                    match self {
                        $(
                        Self::$name(cmd) => cmd.as_any(),
                        )*
                        Self::Unknown(any) => any,
                    }
                }

                /// Get the URL for the command.
                ///
                /// For `Unknown` this is the `type_url` carried by the wrapped [`Any`].
                pub fn type_url(&self) -> &str {
                    match self {
                        $(
                        Self::$name(_) => [<$name:snake:upper _TYPE_URL>],
                        )*
                        Self::Unknown(any) => any.type_url.as_str(),
                    }
                }
            }

            impl TryFrom<Any> for Command {
                type Error = ArrowError;

                /// Decodes `any` into the variant selected by its `type_url`.
                ///
                /// An unrecognised `type_url` is not an error: the message is
                /// returned untouched as `Command::Unknown`. An
                /// `ArrowError::ParseError` is returned only when the URL is
                /// recognised but the payload fails to decode.
                fn try_from(any: Any) -> Result<Self, Self::Error> {
                    match any.type_url.as_str() {
                        $(
                        [<$name:snake:upper _TYPE_URL>] => {
                            let m: $name = Message::decode(&*any.value).map_err(|err| {
                                ArrowError::ParseError(format!("Unable to decode Any value: {err}"))
                            })?;
                            Ok(Self::$name(m))
                        }
                        )*
                        _ => Ok(Self::Unknown(any)),
                    }
                }
            }

            $(
                impl ProstMessageExt for $name {
                    fn type_url() -> &'static str {
                        [<$name:snake:upper _TYPE_URL>]
                    }

                    fn as_any(&self) -> Any {
                        Any {
                            type_url: <$name>::type_url().to_string(),
                            value: self.encode_to_vec().into(),
                        }
                    }
                }
            )*
        }
    };
}
234
// Implement ProstMessageExt for all structs defined in FlightSql.proto
//
// The same expansion also gives each listed type a `<NAME>_TYPE_URL` constant
// and a `Command` variant. The trailing comma after every entry is required by
// the macro's `$($name:tt,)*` matcher.
//
// NOTE(review): `SubstraitPlan` and `TableDefinitionOptions` are re-exported
// above but not listed here — confirm that omission is intentional.
prost_message_ext!(
    ActionBeginSavepointRequest,
    ActionBeginSavepointResult,
    ActionBeginTransactionRequest,
    ActionBeginTransactionResult,
    ActionCancelQueryRequest,
    ActionCancelQueryResult,
    ActionClosePreparedStatementRequest,
    ActionCreatePreparedStatementRequest,
    ActionCreatePreparedStatementResult,
    ActionCreatePreparedSubstraitPlanRequest,
    ActionEndSavepointRequest,
    ActionEndTransactionRequest,
    CommandGetCatalogs,
    CommandGetCrossReference,
    CommandGetDbSchemas,
    CommandGetExportedKeys,
    CommandGetImportedKeys,
    CommandGetPrimaryKeys,
    CommandGetSqlInfo,
    CommandGetTableTypes,
    CommandGetTables,
    CommandGetXdbcTypeInfo,
    CommandPreparedStatementQuery,
    CommandPreparedStatementUpdate,
    CommandStatementIngest,
    CommandStatementQuery,
    CommandStatementSubstraitPlan,
    CommandStatementUpdate,
    DoPutPreparedStatementResult,
    DoPutUpdateResult,
    TicketStatementQuery,
);
269
/// An implementation of the protobuf [`Any`] message type
///
/// Encoded protobuf messages are not self-describing, nor do they contain any
/// information on the schema of the encoded payload. Consequently, to decode a
/// protobuf message a client must know the exact schema of that message.
///
/// This presents a problem for loosely typed APIs, where the exact message payloads
/// are not enumerable, and therefore cannot be enumerated as variants in a [oneof].
///
/// One solution is [`Any`] where the encoded payload is paired with a `type_url`
/// identifying the type of encoded message, and the resulting combination encoded.
///
/// Clients can then decode the outer [`Any`], inspect the `type_url` and, if it is
/// a type they recognise, proceed to decode the embedded message `value`.
///
/// [`Any`]: https://developers.google.com/protocol-buffers/docs/proto3#any
/// [oneof]: https://developers.google.com/protocol-buffers/docs/proto3#oneof
#[derive(Clone, PartialEq, ::prost::Message)]
pub struct Any {
    /// A URL/resource name that uniquely identifies the type of the serialized
    /// protocol buffer message. This string must contain at least
    /// one "/" character. The last segment of the URL's path must represent
    /// the fully qualified name of the type (as in
    /// `path/google.protobuf.Duration`). The name should be in a canonical form
    /// (e.g., leading "." is not accepted).
    #[prost(string, tag = "1")]
    pub type_url: String,
    /// Must be a valid serialized protocol buffer of the above specified type.
    #[prost(bytes = "bytes", tag = "2")]
    pub value: Bytes,
}
301
302impl Any {
303    /// Checks whether the message is of type `M`
304    pub fn is<M: ProstMessageExt>(&self) -> bool {
305        M::type_url() == self.type_url
306    }
307
308    /// Unpacks the contents of the message if it is of type `M`
309    pub fn unpack<M: ProstMessageExt>(&self) -> Result<Option<M>, ArrowError> {
310        if !self.is::<M>() {
311            return Ok(None);
312        }
313        let m = Message::decode(&*self.value)
314            .map_err(|err| ArrowError::ParseError(format!("Unable to decode Any value: {err}")))?;
315        Ok(Some(m))
316    }
317
318    /// Packs a message into an [`Any`] message
319    pub fn pack<M: ProstMessageExt>(message: &M) -> Result<Any, ArrowError> {
320        Ok(message.as_any())
321    }
322}
323
#[cfg(test)]
mod tests {
    use super::*;

    /// Builds the `select 1` statement query used as a fixture by the tests.
    fn select_one() -> CommandStatementQuery {
        CommandStatementQuery {
            query: "select 1".to_string(),
            transaction_id: None,
        }
    }

    /// The generated type URLs carry the fully qualified protobuf message name.
    #[test]
    fn test_type_url() {
        let cases = [
            (
                TicketStatementQuery::type_url(),
                "type.googleapis.com/arrow.flight.protocol.sql.TicketStatementQuery",
            ),
            (
                CommandStatementQuery::type_url(),
                "type.googleapis.com/arrow.flight.protocol.sql.CommandStatementQuery",
            ),
        ];
        for (actual, expected) in cases {
            assert_eq!(actual, expected);
        }
    }

    /// Packing a message and unpacking it as the same type is lossless.
    #[test]
    fn test_prost_any_pack_unpack() {
        let original = select_one();
        let packed = Any::pack(&original).unwrap();
        assert!(packed.is::<CommandStatementQuery>());

        let round_tripped = packed
            .unpack::<CommandStatementQuery>()
            .unwrap()
            .unwrap();
        assert_eq!(original, round_tripped);
    }

    /// `Command` picks the matching variant for a known type URL and falls
    /// back to `Unknown` otherwise.
    #[test]
    fn test_command() {
        let packed = Any::pack(&select_one()).unwrap();
        let known = Command::try_from(packed).unwrap();

        assert!(matches!(known, Command::CommandStatementQuery(_)));
        assert_eq!(known.type_url(), COMMAND_STATEMENT_QUERY_TYPE_URL);

        // Unknown variant
        let unrecognised = Any {
            type_url: "fake_url".to_string(),
            value: Default::default(),
        };

        let fallback = Command::try_from(unrecognised).unwrap();
        assert!(matches!(fallback, Command::Unknown(_)));
        assert_eq!(fallback.type_url(), "fake_url");
    }
}