arrow_flight/sql/
mod.rs

1// Licensed to the Apache Software Foundation (ASF) under one
2// or more contributor license agreements.  See the NOTICE file
3// distributed with this work for additional information
4// regarding copyright ownership.  The ASF licenses this file
5// to you under the Apache License, Version 2.0 (the
6// "License"); you may not use this file except in compliance
7// with the License.  You may obtain a copy of the License at
8//
9//   http://www.apache.org/licenses/LICENSE-2.0
10//
11// Unless required by applicable law or agreed to in writing,
12// software distributed under the License is distributed on an
13// "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
14// KIND, either express or implied.  See the License for the
15// specific language governing permissions and limitations
16// under the License.
17
//! Support for executing SQL queries using [Apache Arrow] [Flight SQL].
19//!
//! [Flight SQL] is built on top of the Arrow Flight RPC framework, by
//! defining specific messages, encoded using the protobuf format,
//! sent in the [`FlightDescriptor::cmd`] field to [`FlightService`]
//! endpoints such as [`get_flight_info`] and [`do_get`].
24//!
25//! This module contains:
26//! 1. [prost] generated structs for FlightSQL messages such as [`CommandStatementQuery`]
27//! 2. Helpers for encoding and decoding FlightSQL messages: [`Any`] and [`Command`]
28//! 3. A [`FlightSqlServiceClient`] for interacting with FlightSQL servers.
//! 4. A [`FlightSqlService`] to help build FlightSQL servers from [`FlightService`].
30//! 5. Helpers to build responses for FlightSQL metadata APIs: [`metadata`]
31//!
32//! [Flight SQL]: https://arrow.apache.org/docs/format/FlightSql.html
33//! [Apache Arrow]: https://arrow.apache.org
34//! [`FlightDescriptor::cmd`]: crate::FlightDescriptor::cmd
35//! [`FlightService`]: crate::flight_service_server::FlightService
36//! [`get_flight_info`]: crate::flight_service_server::FlightService::get_flight_info
37//! [`do_get`]: crate::flight_service_server::FlightService::do_get
38//! [`FlightSqlServiceClient`]: client::FlightSqlServiceClient
39//! [`FlightSqlService`]: server::FlightSqlService
40//! [`metadata`]: crate::sql::metadata
41use arrow_schema::ArrowError;
42use bytes::Bytes;
43use paste::paste;
44use prost::Message;
45
46#[allow(clippy::all)]
47mod gen {
48    // Since this file is auto-generated, we suppress all warnings
49    #![allow(missing_docs)]
50    include!("arrow.flight.protocol.sql.rs");
51}
52
53pub use gen::action_end_transaction_request::EndTransaction;
54pub use gen::command_statement_ingest::table_definition_options::{
55    TableExistsOption, TableNotExistOption,
56};
57pub use gen::command_statement_ingest::TableDefinitionOptions;
58pub use gen::ActionBeginSavepointRequest;
59pub use gen::ActionBeginSavepointResult;
60pub use gen::ActionBeginTransactionRequest;
61pub use gen::ActionBeginTransactionResult;
62pub use gen::ActionCancelQueryRequest;
63pub use gen::ActionCancelQueryResult;
64pub use gen::ActionClosePreparedStatementRequest;
65pub use gen::ActionCreatePreparedStatementRequest;
66pub use gen::ActionCreatePreparedStatementResult;
67pub use gen::ActionCreatePreparedSubstraitPlanRequest;
68pub use gen::ActionEndSavepointRequest;
69pub use gen::ActionEndTransactionRequest;
70pub use gen::CommandGetCatalogs;
71pub use gen::CommandGetCrossReference;
72pub use gen::CommandGetDbSchemas;
73pub use gen::CommandGetExportedKeys;
74pub use gen::CommandGetImportedKeys;
75pub use gen::CommandGetPrimaryKeys;
76pub use gen::CommandGetSqlInfo;
77pub use gen::CommandGetTableTypes;
78pub use gen::CommandGetTables;
79pub use gen::CommandGetXdbcTypeInfo;
80pub use gen::CommandPreparedStatementQuery;
81pub use gen::CommandPreparedStatementUpdate;
82pub use gen::CommandStatementIngest;
83pub use gen::CommandStatementQuery;
84pub use gen::CommandStatementSubstraitPlan;
85pub use gen::CommandStatementUpdate;
86pub use gen::DoPutPreparedStatementResult;
87pub use gen::DoPutUpdateResult;
88pub use gen::Nullable;
89pub use gen::Searchable;
90pub use gen::SqlInfo;
91pub use gen::SqlNullOrdering;
92pub use gen::SqlOuterJoinsSupportLevel;
93pub use gen::SqlSupportedCaseSensitivity;
94pub use gen::SqlSupportedElementActions;
95pub use gen::SqlSupportedGroupBy;
96pub use gen::SqlSupportedPositionedCommands;
97pub use gen::SqlSupportedResultSetConcurrency;
98pub use gen::SqlSupportedResultSetType;
99pub use gen::SqlSupportedSubqueries;
100pub use gen::SqlSupportedTransaction;
101pub use gen::SqlSupportedTransactions;
102pub use gen::SqlSupportedUnions;
103pub use gen::SqlSupportsConvert;
104pub use gen::SqlTransactionIsolationLevel;
105pub use gen::SubstraitPlan;
106pub use gen::SupportedSqlGrammar;
107pub use gen::TicketStatementQuery;
108pub use gen::UpdateDeleteRules;
109pub use gen::XdbcDataType;
110pub use gen::XdbcDatetimeSubcode;
111
112pub mod client;
113pub mod metadata;
114pub mod server;
115
116pub use crate::streams::FallibleRequestStream;
117
118/// ProstMessageExt are useful utility methods for prost::Message types
119pub trait ProstMessageExt: prost::Message + Default {
120    /// type_url for this Message
121    fn type_url() -> &'static str;
122
123    /// Convert this Message to [`Any`]
124    fn as_any(&self) -> Any;
125}
126
/// Re-parses its input as a single item.
///
/// `prost_message_ext!` assembles the `Command` enum from repeated macro
/// fragments; routing that output through an `item` fragment forces the
/// compiler to treat it as one complete item.
///
/// See: <https://danielkeep.github.io/tlborm/book/blk-ast-coercion.html>
macro_rules! as_item {
    ($item:item) => {
        $item
    };
}
136
/// Generates the FlightSQL `Any` plumbing for a list of prost message types.
///
/// For every listed `$name` this expands to:
/// * a private constant `<NAME>_TYPE_URL` holding
///   `"type.googleapis.com/arrow.flight.protocol.sql.<Name>"`,
/// * a `Command::$name` variant, with matching arms in `Command::into_any`,
///   `Command::type_url` and `TryFrom<Any> for Command`,
/// * an `impl ProstMessageExt for $name`.
///
/// A trailing comma after the last type name is accepted but not required.
macro_rules! prost_message_ext {
    ($($name:ident),* $(,)?) => {
        paste! {
            $(
                // Type URL for `$name`: written into `Any::type_url` when packing
                // and used as a match pattern when decoding.
                const [<$name:snake:upper _TYPE_URL>]: &str = concat!(
                    "type.googleapis.com/arrow.flight.protocol.sql.",
                    stringify!($name)
                );
            )*

            // `as_item!` makes the compiler parse the repetition-built enum as
            // a single item.
            as_item! {
                /// Helper to convert to/from protobuf [`Any`] message
                /// to a specific FlightSQL command message.
                ///
                /// # Example
                /// ```rust
                /// # use arrow_flight::sql::{Any, CommandStatementQuery, Command};
                /// let flightsql_message = CommandStatementQuery {
                ///   query: "SELECT * FROM foo".to_string(),
                ///   transaction_id: None,
                /// };
                ///
                /// // Given a packed FlightSQL Any message
                /// let any_message = Any::pack(&flightsql_message).unwrap();
                ///
                /// // decode it to Command:
                /// match Command::try_from(any_message).unwrap() {
                ///   Command::CommandStatementQuery(decoded) => {
                ///    assert_eq!(flightsql_message, decoded);
                ///   }
                ///   _ => panic!("Unexpected decoded message"),
                /// }
                /// ```
                #[derive(Clone, Debug, PartialEq)]
                pub enum Command {
                    $(
                        #[doc = concat!(stringify!($name), " variant")]
                        $name($name),
                    )*

                    /// A message whose `type_url` matches none of the other
                    /// variants; the original [`Any`] is kept unchanged.
                    Unknown(Any),
                }
            }

            impl Command {
                /// Converts the command back into an [`Any`].
                ///
                /// Known variants are re-encoded; `Unknown` returns the wrapped
                /// [`Any`] as-is.
                pub fn into_any(self) -> Any {
                    match self {
                        $(
                            Self::$name(cmd) => cmd.as_any(),
                        )*
                        Self::Unknown(any) => any,
                    }
                }

                /// Returns the protobuf type URL identifying this command.
                ///
                /// For `Unknown` this is the `type_url` of the wrapped [`Any`].
                pub fn type_url(&self) -> &str {
                    match self {
                        $(
                            Self::$name(_) => [<$name:snake:upper _TYPE_URL>],
                        )*
                        Self::Unknown(any) => any.type_url.as_str(),
                    }
                }
            }

            impl TryFrom<Any> for Command {
                type Error = ArrowError;

                /// Decodes `any` into the variant selected by its `type_url`.
                ///
                /// Unrecognised type URLs yield `Command::Unknown` rather than
                /// an error.
                ///
                /// # Errors
                ///
                /// Returns [`ArrowError::ParseError`] when the type URL is
                /// recognised but `value` cannot be decoded as that message.
                fn try_from(any: Any) -> Result<Self, Self::Error> {
                    match any.type_url.as_str() {
                        $(
                            [<$name:snake:upper _TYPE_URL>] => {
                                let m: $name = Message::decode(&*any.value).map_err(|err| {
                                    ArrowError::ParseError(format!("Unable to decode Any value: {err}"))
                                })?;
                                Ok(Self::$name(m))
                            }
                        )*
                        _ => Ok(Self::Unknown(any)),
                    }
                }
            }

            $(
                impl ProstMessageExt for $name {
                    fn type_url() -> &'static str {
                        [<$name:snake:upper _TYPE_URL>]
                    }

                    fn as_any(&self) -> Any {
                        Any {
                            type_url: <$name>::type_url().to_string(),
                            value: self.encode_to_vec().into(),
                        }
                    }
                }
            )*
        }
    };
}
236
237// Implement ProstMessageExt for all structs defined in FlightSql.proto
238prost_message_ext!(
239    ActionBeginSavepointRequest,
240    ActionBeginSavepointResult,
241    ActionBeginTransactionRequest,
242    ActionBeginTransactionResult,
243    ActionCancelQueryRequest,
244    ActionCancelQueryResult,
245    ActionClosePreparedStatementRequest,
246    ActionCreatePreparedStatementRequest,
247    ActionCreatePreparedStatementResult,
248    ActionCreatePreparedSubstraitPlanRequest,
249    ActionEndSavepointRequest,
250    ActionEndTransactionRequest,
251    CommandGetCatalogs,
252    CommandGetCrossReference,
253    CommandGetDbSchemas,
254    CommandGetExportedKeys,
255    CommandGetImportedKeys,
256    CommandGetPrimaryKeys,
257    CommandGetSqlInfo,
258    CommandGetTableTypes,
259    CommandGetTables,
260    CommandGetXdbcTypeInfo,
261    CommandPreparedStatementQuery,
262    CommandPreparedStatementUpdate,
263    CommandStatementIngest,
264    CommandStatementQuery,
265    CommandStatementSubstraitPlan,
266    CommandStatementUpdate,
267    DoPutPreparedStatementResult,
268    DoPutUpdateResult,
269    TicketStatementQuery,
270);
271
/// An implementation of the protobuf [`Any`] message type.
///
/// Encoded protobuf messages are not self-describing: they carry no information
/// about the schema of the encoded payload. Consequently, to decode a protobuf
/// message a client must know the exact schema of the message.
///
/// This presents a problem for loosely typed APIs, where the set of possible
/// message payloads is open-ended and therefore cannot be enumerated as variants
/// in a [oneof].
///
/// One solution is [`Any`], where the encoded payload is paired with a `type_url`
/// identifying the type of the encoded message, and the resulting combination is
/// itself encoded.
///
/// Clients can then decode the outer [`Any`], inspect the `type_url`, and, if it
/// is a type they recognise, proceed to decode the embedded message `value`.
/// In this module, `Any::pack` builds one from a message, while `Any::is`,
/// `Any::unpack`, and `Command::try_from` inspect or decode one.
///
/// [`Any`]: https://developers.google.com/protocol-buffers/docs/proto3#any
/// [oneof]: https://developers.google.com/protocol-buffers/docs/proto3#oneof
#[derive(Clone, PartialEq, ::prost::Message)]
pub struct Any {
    /// A URL/resource name that uniquely identifies the type of the serialized
    /// protocol buffer message. This string must contain at least
    /// one "/" character. The last segment of the URL's path must represent
    /// the fully qualified name of the type (as in
    /// `path/google.protobuf.Duration`). The name should be in a canonical form
    /// (e.g., leading "." is not accepted).
    ///
    /// Protobuf field 1.
    #[prost(string, tag = "1")]
    pub type_url: String,
    /// Must be a valid serialized protocol buffer of the above specified type.
    ///
    /// Protobuf field 2, held as [`Bytes`] via prost's `bytes = "bytes"` option.
    #[prost(bytes = "bytes", tag = "2")]
    pub value: Bytes,
}
303
304impl Any {
305    /// Checks whether the message is of type `M`
306    pub fn is<M: ProstMessageExt>(&self) -> bool {
307        M::type_url() == self.type_url
308    }
309
310    /// Unpacks the contents of the message if it is of type `M`
311    pub fn unpack<M: ProstMessageExt>(&self) -> Result<Option<M>, ArrowError> {
312        if !self.is::<M>() {
313            return Ok(None);
314        }
315        let m = Message::decode(&*self.value)
316            .map_err(|err| ArrowError::ParseError(format!("Unable to decode Any value: {err}")))?;
317        Ok(Some(m))
318    }
319
320    /// Packs a message into an [`Any`] message
321    pub fn pack<M: ProstMessageExt>(message: &M) -> Result<Any, ArrowError> {
322        Ok(message.as_any())
323    }
324}
325
#[cfg(test)]
mod tests {
    use super::*;

    /// Builds the query message shared by several tests.
    fn sample_query() -> CommandStatementQuery {
        CommandStatementQuery {
            query: "select 1".to_string(),
            transaction_id: None,
        }
    }

    #[test]
    fn test_type_url() {
        assert_eq!(
            TicketStatementQuery::type_url(),
            "type.googleapis.com/arrow.flight.protocol.sql.TicketStatementQuery"
        );
        assert_eq!(
            CommandStatementQuery::type_url(),
            "type.googleapis.com/arrow.flight.protocol.sql.CommandStatementQuery"
        );
    }

    #[test]
    fn test_prost_any_pack_unpack() {
        let query = sample_query();
        let any = Any::pack(&query).unwrap();
        assert!(any.is::<CommandStatementQuery>());
        let unpack_query: CommandStatementQuery = any.unpack().unwrap().unwrap();
        assert_eq!(query, unpack_query);
    }

    #[test]
    fn test_any_unpack_other_type_is_none() {
        // A type mismatch is not an error: `unpack` reports it as `Ok(None)`.
        let any = Any::pack(&sample_query()).unwrap();
        assert!(!any.is::<TicketStatementQuery>());
        assert!(any.unpack::<TicketStatementQuery>().unwrap().is_none());
    }

    #[test]
    fn test_invalid_payload_is_parse_error() {
        // 0xFF starts a varint whose continuation byte is missing, so the
        // payload cannot be decoded even though the type URL is recognised.
        let any = Any {
            type_url: CommandStatementQuery::type_url().to_string(),
            value: vec![0xFF_u8].into(),
        };

        let err = any.unpack::<CommandStatementQuery>().unwrap_err();
        assert!(err.to_string().contains("Unable to decode Any value"));

        let err = Command::try_from(any).unwrap_err();
        assert!(err.to_string().contains("Unable to decode Any value"));
    }

    #[test]
    fn test_command() {
        let query = sample_query();
        let any = Any::pack(&query).unwrap();
        let cmd: Command = any.try_into().unwrap();

        assert!(matches!(cmd, Command::CommandStatementQuery(_)));
        assert_eq!(cmd.type_url(), COMMAND_STATEMENT_QUERY_TYPE_URL);

        // Unknown variant

        let any = Any {
            type_url: "fake_url".to_string(),
            value: Default::default(),
        };

        let cmd: Command = any.try_into().unwrap();
        assert!(matches!(cmd, Command::Unknown(_)));
        assert_eq!(cmd.type_url(), "fake_url");
    }

    #[test]
    fn test_command_into_any_round_trip() {
        // Known variant: decoding and re-encoding reproduces the packed message.
        let any = Any::pack(&sample_query()).unwrap();
        let cmd = Command::try_from(any.clone()).unwrap();
        assert_eq!(cmd.into_any(), any);

        // Unknown variant: the original `Any` is returned untouched.
        let unknown = Any {
            type_url: "fake_url".to_string(),
            value: Default::default(),
        };
        let cmd = Command::try_from(unknown.clone()).unwrap();
        assert_eq!(cmd.into_any(), unknown);
    }
}