arrow_flight/sql/metadata/
sql_info.rs

1// Licensed to the Apache Software Foundation (ASF) under one
2// or more contributor license agreements.  See the NOTICE file
3// distributed with this work for additional information
4// regarding copyright ownership.  The ASF licenses this file
5// to you under the Apache License, Version 2.0 (the
6// "License"); you may not use this file except in compliance
7// with the License.  You may obtain a copy of the License at
8//
9//   http://www.apache.org/licenses/LICENSE-2.0
10//
11// Unless required by applicable law or agreed to in writing,
12// software distributed under the License is distributed on an
13// "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
14// KIND, either express or implied.  See the License for the
15// specific language governing permissions and limitations
16// under the License.
17
18//! Helpers for building responses to [`CommandGetSqlInfo`] metadata requests.
19//!
20//! - [`SqlInfoDataBuilder`] - a builder for collecting sql infos
21//!   and building a conformant `RecordBatch` with sql info server metadata.
22//! - [`SqlInfoData`] - a helper type wrapping a `RecordBatch`
23//!   used for storing sql info server metadata.
//! - [`GetSqlInfoBuilder`] - a builder for constructing [`CommandGetSqlInfo`] responses.
25//!
26
27use std::collections::{BTreeMap, HashMap};
28use std::sync::Arc;
29
30use arrow_arith::boolean::or;
31use arrow_array::array::{Array, UInt32Array, UnionArray};
32use arrow_array::builder::{
33    ArrayBuilder, BooleanBuilder, Int8Builder, Int32Builder, Int64Builder, ListBuilder, MapBuilder,
34    StringBuilder, UInt32Builder,
35};
36use arrow_array::{RecordBatch, Scalar};
37use arrow_data::ArrayData;
38use arrow_ord::cmp::eq;
39use arrow_schema::{DataType, Field, Fields, Schema, SchemaRef, UnionFields, UnionMode};
40use arrow_select::filter::filter_record_batch;
41use once_cell::sync::Lazy;
42
43use crate::error::Result;
44use crate::sql::{CommandGetSqlInfo, SqlInfo};
45
/// A dynamically typed SQL info value.
///
/// Each variant corresponds to one kind of value that can be stored for a
/// SQL info entry.
#[derive(Debug, Clone, PartialEq)]
pub enum SqlInfoValue {
    /// A UTF-8 string value
    String(String),
    /// A boolean value
    Bool(bool),
    /// A 64-bit signed integer value
    BigInt(i64),
    /// A 32-bit signed integer holding a bitmask
    Bitmask(i32),
    /// A list of strings
    StringList(Vec<String>),
    /// A map from an `i32` key to a list of `i32` values, ordered by key
    ListMap(BTreeMap<i32, Vec<i32>>),
}
56
57impl From<&str> for SqlInfoValue {
58    fn from(value: &str) -> Self {
59        Self::String(value.to_string())
60    }
61}
62
63impl From<bool> for SqlInfoValue {
64    fn from(value: bool) -> Self {
65        Self::Bool(value)
66    }
67}
68
69impl From<i32> for SqlInfoValue {
70    fn from(value: i32) -> Self {
71        Self::Bitmask(value)
72    }
73}
74
75impl From<i64> for SqlInfoValue {
76    fn from(value: i64) -> Self {
77        Self::BigInt(value)
78    }
79}
80
81impl From<&[&str]> for SqlInfoValue {
82    fn from(values: &[&str]) -> Self {
83        let values = values.iter().map(|s| s.to_string()).collect();
84        Self::StringList(values)
85    }
86}
87
88impl From<Vec<String>> for SqlInfoValue {
89    fn from(values: Vec<String>) -> Self {
90        Self::StringList(values)
91    }
92}
93
94impl From<BTreeMap<i32, Vec<i32>>> for SqlInfoValue {
95    fn from(value: BTreeMap<i32, Vec<i32>>) -> Self {
96        Self::ListMap(value)
97    }
98}
99
100impl From<HashMap<i32, Vec<i32>>> for SqlInfoValue {
101    fn from(value: HashMap<i32, Vec<i32>>) -> Self {
102        Self::ListMap(value.into_iter().collect())
103    }
104}
105
106impl From<&HashMap<i32, Vec<i32>>> for SqlInfoValue {
107    fn from(value: &HashMap<i32, Vec<i32>>) -> Self {
108        Self::ListMap(
109            value
110                .iter()
111                .map(|(k, v)| (k.to_owned(), v.to_owned()))
112                .collect(),
113        )
114    }
115}
116
/// Anything that can act as the name of a [`SqlInfo`], i.e. that can be
/// turned into the `u32` used to represent it.
pub trait SqlInfoName {
    /// Returns the `u32` representation of this name.
    fn as_u32(&self) -> u32;
}
121
122impl SqlInfoName for SqlInfo {
123    fn as_u32(&self) -> u32 {
124        // SqlInfos are u32 in the flight spec, but for some reason
125        // SqlInfo repr is an i32, so convert between them
126        u32::try_from(i32::from(*self)).expect("SqlInfo fit into u32")
127    }
128}
129
130// Allow passing u32 directly into to with_sql_info
131impl SqlInfoName for u32 {
132    fn as_u32(&self) -> u32 {
133        *self
134    }
135}
136
137/// Handles creating the dense [`UnionArray`] described by [flightsql]
138///
139/// incrementally build types/offset of the dense union. See [Union Spec] for details.
140///
141/// ```text
142/// *  value: dense_union<
143/// *              string_value: utf8,
144/// *              bool_value: bool,
145/// *              bigint_value: int64,
146/// *              int32_bitmask: int32,
147/// *              string_list: list<string_data: utf8>
148/// *              int32_to_int32_list_map: map<key: int32, value: list<$data$: int32>>
149/// * >
150/// ```
151///[flightsql]: https://github.com/apache/arrow/blob/f9324b79bf4fc1ec7e97b32e3cce16e75ef0f5e3/format/FlightSql.proto#L32-L43
152///[Union Spec]: https://arrow.apache.org/docs/format/Columnar.html#dense-union
153struct SqlInfoUnionBuilder {
154    // Values for each child type
155    string_values: StringBuilder,
156    bool_values: BooleanBuilder,
157    bigint_values: Int64Builder,
158    int32_bitmask_values: Int32Builder,
159    string_list_values: ListBuilder<StringBuilder>,
160    int32_to_int32_list_map_values: MapBuilder<Int32Builder, ListBuilder<Int32Builder>>,
161    type_ids: Int8Builder,
162    offsets: Int32Builder,
163}
164
165/// [`DataType`] for the output union array
166static UNION_TYPE: Lazy<DataType> = Lazy::new(|| {
167    let fields = vec![
168        Field::new("string_value", DataType::Utf8, false),
169        Field::new("bool_value", DataType::Boolean, false),
170        Field::new("bigint_value", DataType::Int64, false),
171        Field::new("int32_bitmask", DataType::Int32, false),
172        // treat list as nullable b/c that is what the builders make
173        Field::new(
174            "string_list",
175            DataType::List(Arc::new(Field::new_list_field(DataType::Utf8, true))),
176            true,
177        ),
178        Field::new(
179            "int32_to_int32_list_map",
180            DataType::Map(
181                Arc::new(Field::new(
182                    "entries",
183                    DataType::Struct(Fields::from(vec![
184                        Field::new("keys", DataType::Int32, false),
185                        Field::new(
186                            "values",
187                            DataType::List(Arc::new(Field::new_list_field(DataType::Int32, true))),
188                            true,
189                        ),
190                    ])),
191                    false,
192                )),
193                false,
194            ),
195            true,
196        ),
197    ];
198
199    DataType::Union(UnionFields::from_fields(fields), UnionMode::Dense)
200});
201
202impl SqlInfoUnionBuilder {
203    pub fn new() -> Self {
204        Self {
205            string_values: StringBuilder::new(),
206            bool_values: BooleanBuilder::new(),
207            bigint_values: Int64Builder::new(),
208            int32_bitmask_values: Int32Builder::new(),
209            string_list_values: ListBuilder::new(StringBuilder::new()),
210            int32_to_int32_list_map_values: MapBuilder::new(
211                None,
212                Int32Builder::new(),
213                ListBuilder::new(Int32Builder::new()),
214            ),
215            type_ids: Int8Builder::new(),
216            offsets: Int32Builder::new(),
217        }
218    }
219
220    /// Returns the DataType created by this builder
221    pub fn schema() -> &'static DataType {
222        &UNION_TYPE
223    }
224
225    /// Append the specified value to this builder
226    pub fn append_value(&mut self, v: &SqlInfoValue) -> Result<()> {
227        // typeid is which child and len is the child array's length
228        // *after* adding the value
229        let (type_id, len) = match v {
230            SqlInfoValue::String(v) => {
231                self.string_values.append_value(v);
232                (0, self.string_values.len())
233            }
234            SqlInfoValue::Bool(v) => {
235                self.bool_values.append_value(*v);
236                (1, self.bool_values.len())
237            }
238            SqlInfoValue::BigInt(v) => {
239                self.bigint_values.append_value(*v);
240                (2, self.bigint_values.len())
241            }
242            SqlInfoValue::Bitmask(v) => {
243                self.int32_bitmask_values.append_value(*v);
244                (3, self.int32_bitmask_values.len())
245            }
246            SqlInfoValue::StringList(values) => {
247                // build list
248                for v in values {
249                    self.string_list_values.values().append_value(v);
250                }
251                // complete the list
252                self.string_list_values.append(true);
253                (4, self.string_list_values.len())
254            }
255            SqlInfoValue::ListMap(values) => {
256                // build map
257                for (k, v) in values.clone() {
258                    self.int32_to_int32_list_map_values.keys().append_value(k);
259                    self.int32_to_int32_list_map_values
260                        .values()
261                        .append_value(v.into_iter().map(Some));
262                }
263                // complete the list
264                self.int32_to_int32_list_map_values.append(true)?;
265                (5, self.int32_to_int32_list_map_values.len())
266            }
267        };
268
269        self.type_ids.append_value(type_id);
270        let len = i32::try_from(len).expect("offset fit in i32");
271        self.offsets.append_value(len - 1);
272        Ok(())
273    }
274
275    /// Complete the construction and build the [`UnionArray`]
276    pub fn finish(self) -> UnionArray {
277        let Self {
278            mut string_values,
279            mut bool_values,
280            mut bigint_values,
281            mut int32_bitmask_values,
282            mut string_list_values,
283            mut int32_to_int32_list_map_values,
284            mut type_ids,
285            mut offsets,
286        } = self;
287        let type_ids = type_ids.finish();
288        let offsets = offsets.finish();
289
290        // form the correct ArrayData
291
292        let len = offsets.len();
293        let null_bit_buffer = None;
294        let offset = 0;
295
296        let buffers = vec![
297            type_ids.into_data().buffers()[0].clone(),
298            offsets.into_data().buffers()[0].clone(),
299        ];
300
301        let child_data = vec![
302            string_values.finish().into_data(),
303            bool_values.finish().into_data(),
304            bigint_values.finish().into_data(),
305            int32_bitmask_values.finish().into_data(),
306            string_list_values.finish().into_data(),
307            int32_to_int32_list_map_values.finish().into_data(),
308        ];
309
310        let data = ArrayData::try_new(
311            UNION_TYPE.clone(),
312            len,
313            null_bit_buffer,
314            offset,
315            buffers,
316            child_data,
317        )
318        .expect("Correctly created UnionArray");
319
320        UnionArray::from(data)
321    }
322}
323
324/// Helper to create [`CommandGetSqlInfo`] responses.
325///
326/// [`CommandGetSqlInfo`] are metadata requests used by a Flight SQL
327/// server to communicate supported capabilities to Flight SQL clients.
328///
329/// Servers constuct - usually static - [`SqlInfoData`] via the [`SqlInfoDataBuilder`],
330/// and build responses using [`CommandGetSqlInfo::into_builder`]
331#[derive(Debug, Clone, PartialEq, Default)]
332pub struct SqlInfoDataBuilder {
333    /// Use BTreeMap to ensure the values are sorted by value as
334    /// to make output consistent
335    ///
336    /// Use u32 to support "custom" sql info values that are not
337    /// part of the SqlInfo enum
338    infos: BTreeMap<u32, SqlInfoValue>,
339}
340
341impl SqlInfoDataBuilder {
342    /// Create a new SQL info builder
343    pub fn new() -> Self {
344        Self::default()
345    }
346
347    /// register the specific sql metadata item
348    pub fn append(&mut self, name: impl SqlInfoName, value: impl Into<SqlInfoValue>) {
349        self.infos.insert(name.as_u32(), value.into());
350    }
351
352    /// Encode the contents of this list according to the [FlightSQL spec]
353    ///
354    /// [FlightSQL spec]: https://github.com/apache/arrow/blob/f9324b79bf4fc1ec7e97b32e3cce16e75ef0f5e3/format/FlightSql.proto#L32-L43
355    pub fn build(self) -> Result<SqlInfoData> {
356        let mut name_builder = UInt32Builder::new();
357        let mut value_builder = SqlInfoUnionBuilder::new();
358
359        let mut names: Vec<_> = self.infos.keys().cloned().collect();
360        names.sort_unstable();
361
362        for key in names {
363            let (name, value) = self.infos.get_key_value(&key).unwrap();
364            name_builder.append_value(*name);
365            value_builder.append_value(value)?
366        }
367
368        let batch = RecordBatch::try_from_iter(vec![
369            ("info_name", Arc::new(name_builder.finish()) as _),
370            ("value", Arc::new(value_builder.finish()) as _),
371        ])?;
372
373        Ok(SqlInfoData { batch })
374    }
375
376    /// Return the [`Schema`] for a GetSchema RPC call with [`crate::sql::CommandGetSqlInfo`]
377    pub fn schema() -> &'static Schema {
378        // It is always the same
379        &SQL_INFO_SCHEMA
380    }
381}
382
383/// A builder for [`SqlInfoData`] which is used to create [`CommandGetSqlInfo`] responses.
384///
385/// # Example
386/// ```
387/// # use arrow_flight::sql::{metadata::SqlInfoDataBuilder, SqlInfo, SqlSupportedTransaction};
388/// // Create the list of metadata describing the server
389/// let mut builder = SqlInfoDataBuilder::new();
390/// builder.append(SqlInfo::FlightSqlServerName, "server name");
391///     // ... add other SqlInfo here ..
392/// builder.append(
393///     SqlInfo::FlightSqlServerTransaction,
394///     SqlSupportedTransaction::Transaction as i32,
395/// );
396///
397/// // Create the batch to send back to the client
398/// let info_data = builder.build().unwrap();
399/// ```
400///
401/// [protos]: https://github.com/apache/arrow/blob/6d3d2fca2c9693231fa1e52c142ceef563fc23f9/format/FlightSql.proto#L71-L820
402pub struct SqlInfoData {
403    batch: RecordBatch,
404}
405
406impl SqlInfoData {
407    /// Return a  [`RecordBatch`] containing only the requested `u32`, if any
408    /// from [`CommandGetSqlInfo`]
409    pub fn record_batch(&self, info: impl IntoIterator<Item = u32>) -> Result<RecordBatch> {
410        let arr = self.batch.column(0);
411        let type_filter = info
412            .into_iter()
413            .map(|tt| {
414                let s = UInt32Array::from(vec![tt]);
415                eq(arr, &Scalar::new(&s))
416            })
417            .collect::<std::result::Result<Vec<_>, _>>()?
418            .into_iter()
419            // We know the arrays are of same length as they are produced from the same root array
420            .reduce(|filter, arr| or(&filter, &arr).unwrap());
421        if let Some(filter) = type_filter {
422            Ok(filter_record_batch(&self.batch, &filter)?)
423        } else {
424            Ok(self.batch.clone())
425        }
426    }
427
428    /// Return the schema of the RecordBatch that will be returned
429    /// from [`CommandGetSqlInfo`]
430    pub fn schema(&self) -> SchemaRef {
431        self.batch.schema()
432    }
433}
434
435/// A builder for a [`CommandGetSqlInfo`] response.
436pub struct GetSqlInfoBuilder<'a> {
437    /// requested `SqlInfo`s. If empty means return all infos.
438    info: Vec<u32>,
439    infos: &'a SqlInfoData,
440}
441
442impl CommandGetSqlInfo {
443    /// Create a builder suitable for constructing a response
444    pub fn into_builder(self, infos: &SqlInfoData) -> GetSqlInfoBuilder<'_> {
445        GetSqlInfoBuilder {
446            info: self.info,
447            infos,
448        }
449    }
450}
451
452impl GetSqlInfoBuilder<'_> {
453    /// Builds a `RecordBatch` with the correct schema for a [`CommandGetSqlInfo`] response
454    pub fn build(self) -> Result<RecordBatch> {
455        self.infos.record_batch(self.info)
456    }
457
458    /// Return the schema of the RecordBatch that will be returned
459    /// from [`CommandGetSqlInfo`]
460    pub fn schema(&self) -> SchemaRef {
461        self.infos.schema()
462    }
463}
464
465// The schema produced by [`SqlInfoData`]
466static SQL_INFO_SCHEMA: Lazy<Schema> = Lazy::new(|| {
467    Schema::new(vec![
468        Field::new("info_name", DataType::UInt32, false),
469        Field::new("value", SqlInfoUnionBuilder::schema().clone(), false),
470    ])
471});
472
#[cfg(test)]
mod tests {
    use std::collections::HashMap;

    use super::SqlInfoDataBuilder;
    use crate::sql::metadata::tests::assert_batches_eq;
    use crate::sql::{SqlInfo, SqlNullOrdering, SqlSupportedTransaction, SqlSupportsConvert};

    /// Registers one value of every supported kind and checks the rendered
    /// batch, which is expected to be ordered by info name.
    #[test]
    fn test_sql_infos() {
        let conversions: HashMap<i32, Vec<i32>> = HashMap::from([(
            SqlSupportsConvert::SqlConvertInteger as i32,
            vec![
                SqlSupportsConvert::SqlConvertFloat as i32,
                SqlSupportsConvert::SqlConvertReal as i32,
            ],
        )]);

        let mut infos = SqlInfoDataBuilder::new();
        // string value
        infos.append(SqlInfo::SqlIdentifierQuoteChar, r#"""#);
        // boolean value
        infos.append(SqlInfo::SqlDdlCatalog, false);
        // i32 bitmask value
        infos.append(
            SqlInfo::SqlNullOrdering,
            SqlNullOrdering::SqlNullsSortedHigh as i32,
        );
        // i64 value
        infos.append(SqlInfo::SqlMaxBinaryLiteralLength, i32::MAX as i64);
        // string list value
        infos.append(SqlInfo::SqlKeywords, &["SELECT", "DELETE"] as &[&str]);
        // map value
        infos.append(SqlInfo::SqlSupportsConvert, &conversions);

        let data = infos.build().unwrap();
        let actual = data.record_batch(None).unwrap();

        let expected = vec![
            "+-----------+----------------------------------------+",
            "| info_name | value                                  |",
            "+-----------+----------------------------------------+",
            "| 500       | {bool_value=false}                     |",
            "| 504       | {string_value=\"}                       |",
            "| 507       | {int32_bitmask=0}                      |",
            "| 508       | {string_list=[SELECT, DELETE]}         |",
            "| 517       | {int32_to_int32_list_map={7: [6, 13]}} |",
            "| 541       | {bigint_value=2147483647}              |",
            "+-----------+----------------------------------------+",
        ];

        assert_batches_eq(&[actual], &expected);
    }

    /// Requesting a single info name yields the same batch as building data
    /// that contains only that entry.
    #[test]
    fn test_filter_sql_infos() {
        let mut infos = SqlInfoDataBuilder::new();
        infos.append(SqlInfo::FlightSqlServerName, "server name");
        infos.append(
            SqlInfo::FlightSqlServerTransaction,
            SqlSupportedTransaction::Transaction as i32,
        );
        let data = infos.build().unwrap();

        // no requested names: everything is returned
        let everything = data.record_batch(None).unwrap();
        assert_eq!(everything.num_rows(), 2);

        let filtered = data
            .record_batch([SqlInfo::FlightSqlServerTransaction as u32])
            .unwrap();

        let mut only_transaction = SqlInfoDataBuilder::new();
        only_transaction.append(
            SqlInfo::FlightSqlServerTransaction,
            SqlSupportedTransaction::Transaction as i32,
        );
        let reference = only_transaction
            .build()
            .unwrap()
            .record_batch(None)
            .unwrap();

        assert_eq!(filtered, reference);
    }
}
549        assert_eq!(batch, ref_batch);
550    }
551}