arrow_flight/sql/metadata/db_schemas.rs

// Licensed to the Apache Software Foundation (ASF) under one
// or more contributor license agreements.  See the NOTICE file
// distributed with this work for additional information
// regarding copyright ownership.  The ASF licenses this file
// to you under the Apache License, Version 2.0 (the
// "License"); you may not use this file except in compliance
// with the License.  You may obtain a copy of the License at
//
//   http://www.apache.org/licenses/LICENSE-2.0
//
// Unless required by applicable law or agreed to in writing,
// software distributed under the License is distributed on an
// "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
// KIND, either express or implied.  See the License for the
// specific language governing permissions and limitations
// under the License.
17
//! [`GetDbSchemasBuilder`] for building responses to [`CommandGetDbSchemas`] queries.
//!
//! [`CommandGetDbSchemas`]: crate::sql::CommandGetDbSchemas
21
22use std::sync::Arc;
23
24use arrow_arith::boolean::and;
25use arrow_array::{builder::StringBuilder, ArrayRef, RecordBatch, StringArray};
26use arrow_ord::cmp::eq;
27use arrow_schema::{DataType, Field, Schema, SchemaRef};
28use arrow_select::{filter::filter_record_batch, take::take};
29use arrow_string::like::like;
30use once_cell::sync::Lazy;
31
32use super::lexsort_to_indices;
33use crate::error::*;
34use crate::sql::CommandGetDbSchemas;
35
36/// A builder for a [`CommandGetDbSchemas`] response.
37///
38/// Builds rows like this:
39///
40/// * catalog_name: utf8,
41/// * db_schema_name: utf8,
42pub struct GetDbSchemasBuilder {
43    // Specifies the Catalog to search for the tables.
44    // - An empty string retrieves those without a catalog.
45    // - If omitted the catalog name is not used to narrow the search.
46    catalog_filter: Option<String>,
47    // Optional filters to apply
48    db_schema_filter_pattern: Option<String>,
49    // array builder for catalog names
50    catalog_name: StringBuilder,
51    // array builder for schema names
52    db_schema_name: StringBuilder,
53}
54
55impl CommandGetDbSchemas {
56    /// Create a builder suitable for constructing a response
57    pub fn into_builder(self) -> GetDbSchemasBuilder {
58        self.into()
59    }
60}
61
62impl From<CommandGetDbSchemas> for GetDbSchemasBuilder {
63    fn from(value: CommandGetDbSchemas) -> Self {
64        Self::new(value.catalog, value.db_schema_filter_pattern)
65    }
66}
67
68impl GetDbSchemasBuilder {
69    /// Create a new instance of [`GetDbSchemasBuilder`]
70    ///
71    /// # Parameters
72    ///
73    /// - `catalog`:  Specifies the Catalog to search for the tables.
74    ///   - An empty string retrieves those without a catalog.
75    ///   - If omitted the catalog name is not used to narrow the search.
76    /// - `db_schema_filter_pattern`: Specifies a filter pattern for schemas to search for.
77    ///   When no pattern is provided, the pattern will not be used to narrow the search.
78    ///   In the pattern string, two special characters can be used to denote matching rules:
79    ///     - "%" means to match any substring with 0 or more characters.
80    ///     - "_" means to match any one character.
81    ///
82    /// [`CommandGetDbSchemas`]: crate::sql::CommandGetDbSchemas
83    pub fn new(
84        catalog: Option<impl Into<String>>,
85        db_schema_filter_pattern: Option<impl Into<String>>,
86    ) -> Self {
87        Self {
88            catalog_filter: catalog.map(|v| v.into()),
89            db_schema_filter_pattern: db_schema_filter_pattern.map(|v| v.into()),
90            catalog_name: StringBuilder::new(),
91            db_schema_name: StringBuilder::new(),
92        }
93    }
94
95    /// Append a row
96    ///
97    /// In case the catalog should be considered as empty, pass in an empty string '""'.
98    pub fn append(&mut self, catalog_name: impl AsRef<str>, schema_name: impl AsRef<str>) {
99        self.catalog_name.append_value(catalog_name);
100        self.db_schema_name.append_value(schema_name);
101    }
102
103    /// builds a `RecordBatch` with the correct schema for a `CommandGetDbSchemas` response
104    pub fn build(self) -> Result<RecordBatch> {
105        let schema = self.schema();
106        let Self {
107            catalog_filter,
108            db_schema_filter_pattern,
109            mut catalog_name,
110            mut db_schema_name,
111        } = self;
112
113        // Make the arrays
114        let catalog_name = catalog_name.finish();
115        let db_schema_name = db_schema_name.finish();
116
117        let mut filters = vec![];
118
119        if let Some(db_schema_filter_pattern) = db_schema_filter_pattern {
120            // use like kernel to get wildcard matching
121            let scalar = StringArray::new_scalar(db_schema_filter_pattern);
122            filters.push(like(&db_schema_name, &scalar)?)
123        }
124
125        if let Some(catalog_filter_name) = catalog_filter {
126            let scalar = StringArray::new_scalar(catalog_filter_name);
127            filters.push(eq(&catalog_name, &scalar)?);
128        }
129
130        // `AND` any filters together
131        let mut total_filter = None;
132        while let Some(filter) = filters.pop() {
133            let new_filter = match total_filter {
134                Some(total_filter) => and(&total_filter, &filter)?,
135                None => filter,
136            };
137            total_filter = Some(new_filter);
138        }
139
140        let batch = RecordBatch::try_new(
141            schema,
142            vec![
143                Arc::new(catalog_name) as ArrayRef,
144                Arc::new(db_schema_name) as ArrayRef,
145            ],
146        )?;
147
148        // Apply the filters if needed
149        let filtered_batch = if let Some(filter) = total_filter {
150            filter_record_batch(&batch, &filter)?
151        } else {
152            batch
153        };
154
155        // Order filtered results by catalog_name, then db_schema_name
156        let indices = lexsort_to_indices(filtered_batch.columns());
157        let columns = filtered_batch
158            .columns()
159            .iter()
160            .map(|c| take(c, &indices, None))
161            .collect::<std::result::Result<Vec<_>, _>>()?;
162
163        Ok(RecordBatch::try_new(filtered_batch.schema(), columns)?)
164    }
165
166    /// Return the schema of the RecordBatch that will be returned
167    /// from [`CommandGetDbSchemas`]
168    pub fn schema(&self) -> SchemaRef {
169        get_db_schemas_schema()
170    }
171}
172
173fn get_db_schemas_schema() -> SchemaRef {
174    Arc::clone(&GET_DB_SCHEMAS_SCHEMA)
175}
176
177/// The schema for GetDbSchemas
178static GET_DB_SCHEMAS_SCHEMA: Lazy<SchemaRef> = Lazy::new(|| {
179    Arc::new(Schema::new(vec![
180        Field::new("catalog_name", DataType::Utf8, false),
181        Field::new("db_schema_name", DataType::Utf8, false),
182    ]))
183});
184
#[cfg(test)]
mod tests {
    use super::*;
    use arrow_array::{StringArray, UInt32Array};

    /// The (catalog, schema) pairs shared by most tests, already in sorted order.
    const ROWS: [(&str, &str); 4] = [
        ("a_catalog", "a_schema"),
        ("a_catalog", "b_schema"),
        ("b_catalog", "a_schema"),
        ("b_catalog", "b_schema"),
    ];

    /// The expected, unfiltered and sorted response for `ROWS`.
    fn get_ref_batch() -> RecordBatch {
        let catalogs: Vec<&str> = ROWS.iter().map(|&(catalog, _)| catalog).collect();
        let schemas: Vec<&str> = ROWS.iter().map(|&(_, schema)| schema).collect();
        RecordBatch::try_new(
            get_db_schemas_schema(),
            vec![
                Arc::new(StringArray::from(catalogs)) as ArrayRef,
                Arc::new(StringArray::from(schemas)) as ArrayRef,
            ],
        )
        .unwrap()
    }

    /// Appends every `(catalog, schema)` pair in `rows` to `builder`, in order.
    fn append_all(builder: &mut GetDbSchemasBuilder, rows: &[(&str, &str)]) {
        for &(catalog, schema) in rows {
            builder.append(catalog, schema);
        }
    }

    /// Returns the rows of `batch` at `positions`, using the GetDbSchemas schema.
    fn select_rows(batch: &RecordBatch, positions: Vec<u32>) -> RecordBatch {
        let indices = UInt32Array::from(positions);
        let columns = batch
            .columns()
            .iter()
            .map(|c| take(c, &indices, None))
            .collect::<std::result::Result<Vec<_>, _>>()
            .unwrap();
        RecordBatch::try_new(get_db_schemas_schema(), columns).unwrap()
    }

    #[test]
    fn test_schemas_are_filtered() {
        let ref_batch = get_ref_batch();

        let mut unfiltered = GetDbSchemasBuilder::new(None::<String>, None::<String>);
        append_all(&mut unfiltered, &ROWS);
        assert_eq!(unfiltered.build().unwrap(), ref_batch);

        let mut by_pattern = GetDbSchemasBuilder::new(None::<String>, Some("a%"));
        append_all(&mut by_pattern, &ROWS);
        let expected = select_rows(&ref_batch, vec![0, 2]);
        assert_eq!(by_pattern.build().unwrap(), expected);
    }

    #[test]
    fn test_schemas_are_sorted() {
        let mut builder = GetDbSchemasBuilder::new(None::<String>, None::<String>);
        // Deliberately out of order: `build` is expected to sort them.
        append_all(
            &mut builder,
            &[
                ("a_catalog", "b_schema"),
                ("b_catalog", "a_schema"),
                ("a_catalog", "a_schema"),
                ("b_catalog", "b_schema"),
            ],
        );
        assert_eq!(builder.build().unwrap(), get_ref_batch())
    }

    #[test]
    fn test_builder_from_query() {
        let query = CommandGetDbSchemas {
            catalog: Some("a_catalog".into()),
            db_schema_filter_pattern: Some("b%".into()),
        };

        let mut builder = query.into_builder();
        append_all(&mut builder, &ROWS);
        let expected = select_rows(&get_ref_batch(), vec![1]);
        assert_eq!(builder.build().unwrap(), expected);
    }
}