arrow_flight/sql/metadata/
sql_info.rs

1// Licensed to the Apache Software Foundation (ASF) under one
2// or more contributor license agreements.  See the NOTICE file
3// distributed with this work for additional information
4// regarding copyright ownership.  The ASF licenses this file
5// to you under the Apache License, Version 2.0 (the
6// "License"); you may not use this file except in compliance
7// with the License.  You may obtain a copy of the License at
8//
9//   http://www.apache.org/licenses/LICENSE-2.0
10//
11// Unless required by applicable law or agreed to in writing,
12// software distributed under the License is distributed on an
13// "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
14// KIND, either express or implied.  See the License for the
15// specific language governing permissions and limitations
16// under the License.
17
18//! Helpers for building responses to [`CommandGetSqlInfo`] metadata requests.
19//!
20//! - [`SqlInfoDataBuilder`] - a builder for collecting sql infos
21//!   and building a conformant `RecordBatch` with sql info server metadata.
22//! - [`SqlInfoData`] - a helper type wrapping a `RecordBatch`
23//!   used for storing sql info server metadata.
//! - [`GetSqlInfoBuilder`] - a builder for constructing [`CommandGetSqlInfo`] responses.
25//!
26
27use std::collections::{BTreeMap, HashMap};
28use std::sync::Arc;
29
30use arrow_arith::boolean::or;
31use arrow_array::array::{Array, UInt32Array, UnionArray};
32use arrow_array::builder::{
33    ArrayBuilder, BooleanBuilder, Int32Builder, Int64Builder, Int8Builder, ListBuilder, MapBuilder,
34    StringBuilder, UInt32Builder,
35};
36use arrow_array::{RecordBatch, Scalar};
37use arrow_data::ArrayData;
38use arrow_ord::cmp::eq;
39use arrow_schema::{DataType, Field, Fields, Schema, SchemaRef, UnionFields, UnionMode};
40use arrow_select::filter::filter_record_batch;
41use once_cell::sync::Lazy;
42
43use crate::error::Result;
44use crate::sql::{CommandGetSqlInfo, SqlInfo};
45
/// Represents a dynamic sql info value.
///
/// Each variant is encoded into one child of the dense union `value` column
/// built by [`SqlInfoDataBuilder::build`]:
///
/// | variant      | union child               | arrow type                    |
/// |--------------|---------------------------|-------------------------------|
/// | `String`     | `string_value`            | `Utf8`                        |
/// | `Bool`       | `bool_value`              | `Boolean`                     |
/// | `BigInt`     | `bigint_value`            | `Int64`                       |
/// | `Bitmask`    | `int32_bitmask`           | `Int32`                       |
/// | `StringList` | `string_list`             | `List<Utf8>`                  |
/// | `ListMap`    | `int32_to_int32_list_map` | `Map<Int32, List<Int32>>`     |
#[derive(Debug, Clone, PartialEq, Eq)]
pub enum SqlInfoValue {
    /// A string value (`string_value`).
    String(String),
    /// A boolean value (`bool_value`).
    Bool(bool),
    /// A 64-bit integer value (`bigint_value`).
    BigInt(i64),
    /// A 32-bit integer / bitmask value (`int32_bitmask`).
    Bitmask(i32),
    /// A list of strings (`string_list`).
    StringList(Vec<String>),
    /// A map from `i32` to a list of `i32` (`int32_to_int32_list_map`);
    /// entries are kept ordered by key.
    ListMap(BTreeMap<i32, Vec<i32>>),
}

impl From<&str> for SqlInfoValue {
    fn from(value: &str) -> Self {
        Self::String(value.to_string())
    }
}

/// Owned strings are stored as-is, without re-allocating.
impl From<String> for SqlInfoValue {
    fn from(value: String) -> Self {
        Self::String(value)
    }
}

impl From<bool> for SqlInfoValue {
    fn from(value: bool) -> Self {
        Self::Bool(value)
    }
}

/// Plain `i32` values are encoded as `int32_bitmask`; use `i64` for `bigint_value`.
impl From<i32> for SqlInfoValue {
    fn from(value: i32) -> Self {
        Self::Bitmask(value)
    }
}

impl From<i64> for SqlInfoValue {
    fn from(value: i64) -> Self {
        Self::BigInt(value)
    }
}

impl From<&[&str]> for SqlInfoValue {
    fn from(values: &[&str]) -> Self {
        Self::StringList(values.iter().map(|s| s.to_string()).collect())
    }
}

impl From<Vec<String>> for SqlInfoValue {
    fn from(values: Vec<String>) -> Self {
        Self::StringList(values)
    }
}

impl From<BTreeMap<i32, Vec<i32>>> for SqlInfoValue {
    fn from(value: BTreeMap<i32, Vec<i32>>) -> Self {
        Self::ListMap(value)
    }
}

/// The entries are re-ordered by key on conversion.
impl From<HashMap<i32, Vec<i32>>> for SqlInfoValue {
    fn from(value: HashMap<i32, Vec<i32>>) -> Self {
        Self::ListMap(value.into_iter().collect())
    }
}

/// The entries are copied and re-ordered by key on conversion.
impl From<&HashMap<i32, Vec<i32>>> for SqlInfoValue {
    fn from(value: &HashMap<i32, Vec<i32>>) -> Self {
        Self::ListMap(value.iter().map(|(k, v)| (*k, v.clone())).collect())
    }
}
116
117/// Something that can be converted into u32 (the represenation of a [`SqlInfo`] name)
118pub trait SqlInfoName {
119    fn as_u32(&self) -> u32;
120}
121
122impl SqlInfoName for SqlInfo {
123    fn as_u32(&self) -> u32 {
124        // SqlInfos are u32 in the flight spec, but for some reason
125        // SqlInfo repr is an i32, so convert between them
126        u32::try_from(i32::from(*self)).expect("SqlInfo fit into u32")
127    }
128}
129
130// Allow passing u32 directly into to with_sql_info
131impl SqlInfoName for u32 {
132    fn as_u32(&self) -> u32 {
133        *self
134    }
135}
136
/// Incrementally builds the dense [`UnionArray`] described by [flightsql],
/// used for the `value` column of sql info batches.
///
/// Each appended value goes to the child builder for its type. The union's
/// `type_ids` and `offsets` record, for every row, which child holds the value
/// and at which position. See [Union Spec] for details.
///
/// ```text
/// *  value: dense_union<
/// *              string_value: utf8,
/// *              bool_value: bool,
/// *              bigint_value: int64,
/// *              int32_bitmask: int32,
/// *              string_list: list<string_data: utf8>
/// *              int32_to_int32_list_map: map<key: int32, value: list<$data$: int32>>
/// * >
/// ```
///
/// [flightsql]: https://github.com/apache/arrow/blob/f9324b79bf4fc1ec7e97b32e3cce16e75ef0f5e3/format/FlightSql.proto#L32-L43
/// [Union Spec]: https://arrow.apache.org/docs/format/Columnar.html#dense-union
struct SqlInfoUnionBuilder {
    // Child builders, one per union member, in type-id order (0 through 5)
    string_values: StringBuilder,
    bool_values: BooleanBuilder,
    bigint_values: Int64Builder,
    int32_bitmask_values: Int32Builder,
    string_list_values: ListBuilder<StringBuilder>,
    int32_to_int32_list_map_values: MapBuilder<Int32Builder, ListBuilder<Int32Builder>>,
    // Per row: the type id of the child that holds the row's value
    type_ids: Int8Builder,
    // Per row: the index of the value within that child
    offsets: Int32Builder,
}
164
165/// [`DataType`] for the output union array
166static UNION_TYPE: Lazy<DataType> = Lazy::new(|| {
167    let fields = vec![
168        Field::new("string_value", DataType::Utf8, false),
169        Field::new("bool_value", DataType::Boolean, false),
170        Field::new("bigint_value", DataType::Int64, false),
171        Field::new("int32_bitmask", DataType::Int32, false),
172        // treat list as nullable b/c that is what the builders make
173        Field::new(
174            "string_list",
175            DataType::List(Arc::new(Field::new_list_field(DataType::Utf8, true))),
176            true,
177        ),
178        Field::new(
179            "int32_to_int32_list_map",
180            DataType::Map(
181                Arc::new(Field::new(
182                    "entries",
183                    DataType::Struct(Fields::from(vec![
184                        Field::new("keys", DataType::Int32, false),
185                        Field::new(
186                            "values",
187                            DataType::List(Arc::new(Field::new_list_field(DataType::Int32, true))),
188                            true,
189                        ),
190                    ])),
191                    false,
192                )),
193                false,
194            ),
195            true,
196        ),
197    ];
198
199    // create "type ids", one for each type, assume they go from 0 .. num_fields
200    let type_ids: Vec<i8> = (0..fields.len()).map(|v| v as i8).collect();
201
202    DataType::Union(UnionFields::new(type_ids, fields), UnionMode::Dense)
203});
204
205impl SqlInfoUnionBuilder {
206    pub fn new() -> Self {
207        Self {
208            string_values: StringBuilder::new(),
209            bool_values: BooleanBuilder::new(),
210            bigint_values: Int64Builder::new(),
211            int32_bitmask_values: Int32Builder::new(),
212            string_list_values: ListBuilder::new(StringBuilder::new()),
213            int32_to_int32_list_map_values: MapBuilder::new(
214                None,
215                Int32Builder::new(),
216                ListBuilder::new(Int32Builder::new()),
217            ),
218            type_ids: Int8Builder::new(),
219            offsets: Int32Builder::new(),
220        }
221    }
222
223    /// Returns the DataType created by this builder
224    pub fn schema() -> &'static DataType {
225        &UNION_TYPE
226    }
227
228    /// Append the specified value to this builder
229    pub fn append_value(&mut self, v: &SqlInfoValue) -> Result<()> {
230        // typeid is which child and len is the child array's length
231        // *after* adding the value
232        let (type_id, len) = match v {
233            SqlInfoValue::String(v) => {
234                self.string_values.append_value(v);
235                (0, self.string_values.len())
236            }
237            SqlInfoValue::Bool(v) => {
238                self.bool_values.append_value(*v);
239                (1, self.bool_values.len())
240            }
241            SqlInfoValue::BigInt(v) => {
242                self.bigint_values.append_value(*v);
243                (2, self.bigint_values.len())
244            }
245            SqlInfoValue::Bitmask(v) => {
246                self.int32_bitmask_values.append_value(*v);
247                (3, self.int32_bitmask_values.len())
248            }
249            SqlInfoValue::StringList(values) => {
250                // build list
251                for v in values {
252                    self.string_list_values.values().append_value(v);
253                }
254                // complete the list
255                self.string_list_values.append(true);
256                (4, self.string_list_values.len())
257            }
258            SqlInfoValue::ListMap(values) => {
259                // build map
260                for (k, v) in values.clone() {
261                    self.int32_to_int32_list_map_values.keys().append_value(k);
262                    self.int32_to_int32_list_map_values
263                        .values()
264                        .append_value(v.into_iter().map(Some));
265                }
266                // complete the list
267                self.int32_to_int32_list_map_values.append(true)?;
268                (5, self.int32_to_int32_list_map_values.len())
269            }
270        };
271
272        self.type_ids.append_value(type_id);
273        let len = i32::try_from(len).expect("offset fit in i32");
274        self.offsets.append_value(len - 1);
275        Ok(())
276    }
277
278    /// Complete the construction and build the [`UnionArray`]
279    pub fn finish(self) -> UnionArray {
280        let Self {
281            mut string_values,
282            mut bool_values,
283            mut bigint_values,
284            mut int32_bitmask_values,
285            mut string_list_values,
286            mut int32_to_int32_list_map_values,
287            mut type_ids,
288            mut offsets,
289        } = self;
290        let type_ids = type_ids.finish();
291        let offsets = offsets.finish();
292
293        // form the correct ArrayData
294
295        let len = offsets.len();
296        let null_bit_buffer = None;
297        let offset = 0;
298
299        let buffers = vec![
300            type_ids.into_data().buffers()[0].clone(),
301            offsets.into_data().buffers()[0].clone(),
302        ];
303
304        let child_data = vec![
305            string_values.finish().into_data(),
306            bool_values.finish().into_data(),
307            bigint_values.finish().into_data(),
308            int32_bitmask_values.finish().into_data(),
309            string_list_values.finish().into_data(),
310            int32_to_int32_list_map_values.finish().into_data(),
311        ];
312
313        let data = ArrayData::try_new(
314            UNION_TYPE.clone(),
315            len,
316            null_bit_buffer,
317            offset,
318            buffers,
319            child_data,
320        )
321        .expect("Correctly created UnionArray");
322
323        UnionArray::from(data)
324    }
325}
326
327/// Helper to create [`CommandGetSqlInfo`] responses.
328///
329/// [`CommandGetSqlInfo`] are metadata requests used by a Flight SQL
330/// server to communicate supported capabilities to Flight SQL clients.
331///
332/// Servers constuct - usually static - [`SqlInfoData`] via the [`SqlInfoDataBuilder`],
333/// and build responses using [`CommandGetSqlInfo::into_builder`]
334#[derive(Debug, Clone, PartialEq, Default)]
335pub struct SqlInfoDataBuilder {
336    /// Use BTreeMap to ensure the values are sorted by value as
337    /// to make output consistent
338    ///
339    /// Use u32 to support "custom" sql info values that are not
340    /// part of the SqlInfo enum
341    infos: BTreeMap<u32, SqlInfoValue>,
342}
343
344impl SqlInfoDataBuilder {
345    /// Create a new SQL info builder
346    pub fn new() -> Self {
347        Self::default()
348    }
349
350    /// register the specific sql metadata item
351    pub fn append(&mut self, name: impl SqlInfoName, value: impl Into<SqlInfoValue>) {
352        self.infos.insert(name.as_u32(), value.into());
353    }
354
355    /// Encode the contents of this list according to the [FlightSQL spec]
356    ///
357    /// [FlightSQL spec]: https://github.com/apache/arrow/blob/f9324b79bf4fc1ec7e97b32e3cce16e75ef0f5e3/format/FlightSql.proto#L32-L43
358    pub fn build(self) -> Result<SqlInfoData> {
359        let mut name_builder = UInt32Builder::new();
360        let mut value_builder = SqlInfoUnionBuilder::new();
361
362        let mut names: Vec<_> = self.infos.keys().cloned().collect();
363        names.sort_unstable();
364
365        for key in names {
366            let (name, value) = self.infos.get_key_value(&key).unwrap();
367            name_builder.append_value(*name);
368            value_builder.append_value(value)?
369        }
370
371        let batch = RecordBatch::try_from_iter(vec![
372            ("info_name", Arc::new(name_builder.finish()) as _),
373            ("value", Arc::new(value_builder.finish()) as _),
374        ])?;
375
376        Ok(SqlInfoData { batch })
377    }
378
379    /// Return the [`Schema`] for a GetSchema RPC call with [`crate::sql::CommandGetSqlInfo`]
380    pub fn schema() -> &'static Schema {
381        // It is always the same
382        &SQL_INFO_SCHEMA
383    }
384}
385
386/// A builder for [`SqlInfoData`] which is used to create [`CommandGetSqlInfo`] responses.
387///
388/// # Example
389/// ```
390/// # use arrow_flight::sql::{metadata::SqlInfoDataBuilder, SqlInfo, SqlSupportedTransaction};
391/// // Create the list of metadata describing the server
392/// let mut builder = SqlInfoDataBuilder::new();
393/// builder.append(SqlInfo::FlightSqlServerName, "server name");
394///     // ... add other SqlInfo here ..
395/// builder.append(
396///     SqlInfo::FlightSqlServerTransaction,
397///     SqlSupportedTransaction::Transaction as i32,
398/// );
399///
400/// // Create the batch to send back to the client
401/// let info_data = builder.build().unwrap();
402/// ```
403///
404/// [protos]: https://github.com/apache/arrow/blob/6d3d2fca2c9693231fa1e52c142ceef563fc23f9/format/FlightSql.proto#L71-L820
405pub struct SqlInfoData {
406    batch: RecordBatch,
407}
408
409impl SqlInfoData {
410    /// Return a  [`RecordBatch`] containing only the requested `u32`, if any
411    /// from [`CommandGetSqlInfo`]
412    pub fn record_batch(&self, info: impl IntoIterator<Item = u32>) -> Result<RecordBatch> {
413        let arr = self.batch.column(0);
414        let type_filter = info
415            .into_iter()
416            .map(|tt| {
417                let s = UInt32Array::from(vec![tt]);
418                eq(arr, &Scalar::new(&s))
419            })
420            .collect::<std::result::Result<Vec<_>, _>>()?
421            .into_iter()
422            // We know the arrays are of same length as they are produced from the same root array
423            .reduce(|filter, arr| or(&filter, &arr).unwrap());
424        if let Some(filter) = type_filter {
425            Ok(filter_record_batch(&self.batch, &filter)?)
426        } else {
427            Ok(self.batch.clone())
428        }
429    }
430
431    /// Return the schema of the RecordBatch that will be returned
432    /// from [`CommandGetSqlInfo`]
433    pub fn schema(&self) -> SchemaRef {
434        self.batch.schema()
435    }
436}
437
/// A builder for a [`CommandGetSqlInfo`] response.
///
/// Created by [`CommandGetSqlInfo::into_builder`]. It pairs the ids requested
/// by the client with the server's [`SqlInfoData`].
pub struct GetSqlInfoBuilder<'a> {
    /// Requested sql info ids. If empty, all infos are returned.
    info: Vec<u32>,
    /// The server's sql info metadata that the response is filtered from.
    infos: &'a SqlInfoData,
}
444
445impl CommandGetSqlInfo {
446    /// Create a builder suitable for constructing a response
447    pub fn into_builder(self, infos: &SqlInfoData) -> GetSqlInfoBuilder {
448        GetSqlInfoBuilder {
449            info: self.info,
450            infos,
451        }
452    }
453}
454
455impl GetSqlInfoBuilder<'_> {
456    /// Builds a `RecordBatch` with the correct schema for a [`CommandGetSqlInfo`] response
457    pub fn build(self) -> Result<RecordBatch> {
458        self.infos.record_batch(self.info)
459    }
460
461    /// Return the schema of the RecordBatch that will be returned
462    /// from [`CommandGetSqlInfo`]
463    pub fn schema(&self) -> SchemaRef {
464        self.infos.schema()
465    }
466}
467
468// The schema produced by [`SqlInfoData`]
469static SQL_INFO_SCHEMA: Lazy<Schema> = Lazy::new(|| {
470    Schema::new(vec![
471        Field::new("info_name", DataType::UInt32, false),
472        Field::new("value", SqlInfoUnionBuilder::schema().clone(), false),
473    ])
474});
475
#[cfg(test)]
mod tests {
    use std::collections::HashMap;

    use super::SqlInfoDataBuilder;
    use crate::sql::metadata::tests::assert_batches_eq;
    use crate::sql::{SqlInfo, SqlNullOrdering, SqlSupportedTransaction, SqlSupportsConvert};

    #[test]
    fn test_sql_infos() {
        let mut convert: HashMap<i32, Vec<i32>> = HashMap::new();
        convert.insert(
            SqlSupportsConvert::SqlConvertInteger as i32,
            vec![
                SqlSupportsConvert::SqlConvertFloat as i32,
                SqlSupportsConvert::SqlConvertReal as i32,
            ],
        );

        let mut builder = SqlInfoDataBuilder::new();
        // str
        builder.append(SqlInfo::SqlIdentifierQuoteChar, r#"""#);
        // bool
        builder.append(SqlInfo::SqlDdlCatalog, false);
        // i32
        builder.append(
            SqlInfo::SqlNullOrdering,
            SqlNullOrdering::SqlNullsSortedHigh as i32,
        );
        // i64
        builder.append(SqlInfo::SqlMaxBinaryLiteralLength, i32::MAX as i64);
        // [str]
        builder.append(SqlInfo::SqlKeywords, &["SELECT", "DELETE"] as &[&str]);
        builder.append(SqlInfo::SqlSupportsConvert, &convert);

        let batch = builder.build().unwrap().record_batch(None).unwrap();

        let expected = vec![
            "+-----------+----------------------------------------+",
            "| info_name | value                                  |",
            "+-----------+----------------------------------------+",
            "| 500       | {bool_value=false}                     |",
            "| 504       | {string_value=\"}                       |",
            "| 507       | {int32_bitmask=0}                      |",
            "| 508       | {string_list=[SELECT, DELETE]}         |",
            "| 517       | {int32_to_int32_list_map={7: [6, 13]}} |",
            "| 541       | {bigint_value=2147483647}              |",
            "+-----------+----------------------------------------+",
        ];

        assert_batches_eq(&[batch], &expected);
    }

    #[test]
    fn test_filter_sql_infos() {
        let mut builder = SqlInfoDataBuilder::new();
        builder.append(SqlInfo::FlightSqlServerName, "server name");
        builder.append(
            SqlInfo::FlightSqlServerTransaction,
            SqlSupportedTransaction::Transaction as i32,
        );
        let data = builder.build().unwrap();

        let batch = data.record_batch(None).unwrap();
        assert_eq!(batch.num_rows(), 2);

        let batch = data
            .record_batch([SqlInfo::FlightSqlServerTransaction as u32])
            .unwrap();
        let mut ref_builder = SqlInfoDataBuilder::new();
        ref_builder.append(
            SqlInfo::FlightSqlServerTransaction,
            SqlSupportedTransaction::Transaction as i32,
        );
        let ref_batch = ref_builder.build().unwrap().record_batch(None).unwrap();

        assert_eq!(batch, ref_batch);
    }

    #[test]
    fn test_filter_multiple_and_unknown_sql_infos() {
        let mut builder = SqlInfoDataBuilder::new();
        builder.append(SqlInfo::FlightSqlServerName, "server name");
        builder.append(
            SqlInfo::FlightSqlServerTransaction,
            SqlSupportedTransaction::Transaction as i32,
        );
        let data = builder.build().unwrap();

        // Requesting every registered id yields the same rows as requesting none
        let all = data.record_batch(None).unwrap();
        let both = data
            .record_batch([
                SqlInfo::FlightSqlServerName as u32,
                SqlInfo::FlightSqlServerTransaction as u32,
            ])
            .unwrap();
        assert_eq!(both, all);

        // Ids that were never registered match no rows but keep the schema
        let none = data.record_batch([u32::MAX]).unwrap();
        assert_eq!(none.num_rows(), 0);
        assert_eq!(none.schema(), all.schema());
    }
}