// arrow_flight/sql/metadata/tables.rs

// Licensed to the Apache Software Foundation (ASF) under one
// or more contributor license agreements.  See the NOTICE file
// distributed with this work for additional information
// regarding copyright ownership.  The ASF licenses this file
// to you under the Apache License, Version 2.0 (the
// "License"); you may not use this file except in compliance
// with the License.  You may obtain a copy of the License at
//
//   http://www.apache.org/licenses/LICENSE-2.0
//
// Unless required by applicable law or agreed to in writing,
// software distributed under the License is distributed on an
// "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
// KIND, either express or implied.  See the License for the
// specific language governing permissions and limitations
// under the License.

//! [`GetTablesBuilder`] for building responses to [`CommandGetTables`] queries.
//!
//! [`CommandGetTables`]: crate::sql::CommandGetTables
21
22use std::sync::Arc;
23
24use arrow_arith::boolean::{and, or};
25use arrow_array::builder::{BinaryBuilder, StringBuilder};
26use arrow_array::{ArrayRef, RecordBatch, StringArray};
27use arrow_ord::cmp::eq;
28use arrow_schema::{DataType, Field, Schema, SchemaRef};
29use arrow_select::{filter::filter_record_batch, take::take};
30use arrow_string::like::like;
31use once_cell::sync::Lazy;
32
33use super::lexsort_to_indices;
34use crate::error::*;
35use crate::sql::CommandGetTables;
36use crate::{IpcMessage, IpcWriteOptions, SchemaAsIpc};
37
38/// A builder for a [`CommandGetTables`] response.
39///
40/// Builds rows like this:
41///
42/// * catalog_name: utf8,
43/// * db_schema_name: utf8,
44/// * table_name: utf8 not null,
45/// * table_type: utf8 not null,
46/// * (optional) table_schema: bytes not null (schema of the table as described
47///   in Schema.fbs::Schema it is serialized as an IPC message.)
48pub struct GetTablesBuilder {
49    catalog_filter: Option<String>,
50    table_types_filter: Vec<String>,
51    // Optional filters to apply to schemas
52    db_schema_filter_pattern: Option<String>,
53    // Optional filters to apply to tables
54    table_name_filter_pattern: Option<String>,
55    // array builder for catalog names
56    catalog_name: StringBuilder,
57    // array builder for db schema names
58    db_schema_name: StringBuilder,
59    // array builder for tables names
60    table_name: StringBuilder,
61    // array builder for table types
62    table_type: StringBuilder,
63    // array builder for table schemas
64    table_schema: Option<BinaryBuilder>,
65}
66
67impl CommandGetTables {
68    /// Create a builder suitable for constructing a response
69    pub fn into_builder(self) -> GetTablesBuilder {
70        self.into()
71    }
72}
73
74impl From<CommandGetTables> for GetTablesBuilder {
75    fn from(value: CommandGetTables) -> Self {
76        Self::new(
77            value.catalog,
78            value.db_schema_filter_pattern,
79            value.table_name_filter_pattern,
80            value.table_types,
81            value.include_schema,
82        )
83    }
84}
85
86impl GetTablesBuilder {
87    /// Create a new instance of [`GetTablesBuilder`]
88    ///
89    /// # Parameters
90    ///
91    /// - `catalog`:  Specifies the Catalog to search for the tables.
92    ///   - An empty string retrieves those without a catalog.
93    ///   - If omitted the catalog name is not used to narrow the search.
94    /// - `db_schema_filter_pattern`: Specifies a filter pattern for schemas to search for.
95    ///   When no pattern is provided, the pattern will not be used to narrow the search.
96    ///   In the pattern string, two special characters can be used to denote matching rules:
97    ///     - "%" means to match any substring with 0 or more characters.
98    ///     - "_" means to match any one character.
99    /// - `table_name_filter_pattern`: Specifies a filter pattern for tables to search for.
100    ///   When no pattern is provided, all tables matching other filters are searched.
101    ///   In the pattern string, two special characters can be used to denote matching rules:
102    ///     - "%" means to match any substring with 0 or more characters.
103    ///     - "_" means to match any one character.
104    /// - `table_types`:  Specifies a filter of table types which must match.
105    ///   An empy Vec matches all table types.
106    /// - `include_schema`: Specifies if the Arrow schema should be returned for found tables.
107    ///
108    /// [`CommandGetTables`]: crate::sql::CommandGetTables
109    pub fn new(
110        catalog: Option<impl Into<String>>,
111        db_schema_filter_pattern: Option<impl Into<String>>,
112        table_name_filter_pattern: Option<impl Into<String>>,
113        table_types: impl IntoIterator<Item = impl Into<String>>,
114        include_schema: bool,
115    ) -> Self {
116        let table_schema = if include_schema {
117            Some(BinaryBuilder::new())
118        } else {
119            None
120        };
121        Self {
122            catalog_filter: catalog.map(|s| s.into()),
123            table_types_filter: table_types.into_iter().map(|tt| tt.into()).collect(),
124            db_schema_filter_pattern: db_schema_filter_pattern.map(|s| s.into()),
125            table_name_filter_pattern: table_name_filter_pattern.map(|t| t.into()),
126            catalog_name: StringBuilder::new(),
127            db_schema_name: StringBuilder::new(),
128            table_name: StringBuilder::new(),
129            table_type: StringBuilder::new(),
130            table_schema,
131        }
132    }
133
134    /// Append a row
135    pub fn append(
136        &mut self,
137        catalog_name: impl AsRef<str>,
138        schema_name: impl AsRef<str>,
139        table_name: impl AsRef<str>,
140        table_type: impl AsRef<str>,
141        table_schema: &Schema,
142    ) -> Result<()> {
143        self.catalog_name.append_value(catalog_name);
144        self.db_schema_name.append_value(schema_name);
145        self.table_name.append_value(table_name);
146        self.table_type.append_value(table_type);
147        if let Some(self_table_schema) = self.table_schema.as_mut() {
148            let options = IpcWriteOptions::default();
149            // encode the schema into the correct form
150            let message: std::result::Result<IpcMessage, _> =
151                SchemaAsIpc::new(table_schema, &options).try_into();
152            let IpcMessage(schema) = message?;
153            self_table_schema.append_value(schema);
154        }
155
156        Ok(())
157    }
158
159    /// builds a `RecordBatch` for `CommandGetTables`
160    pub fn build(self) -> Result<RecordBatch> {
161        let schema = self.schema();
162        let Self {
163            catalog_filter,
164            table_types_filter,
165            db_schema_filter_pattern,
166            table_name_filter_pattern,
167
168            mut catalog_name,
169            mut db_schema_name,
170            mut table_name,
171            mut table_type,
172            table_schema,
173        } = self;
174
175        // Make the arrays
176        let catalog_name = catalog_name.finish();
177        let db_schema_name = db_schema_name.finish();
178        let table_name = table_name.finish();
179        let table_type = table_type.finish();
180        let table_schema = table_schema.map(|mut table_schema| table_schema.finish());
181
182        // apply any filters, getting a BooleanArray that represents
183        // the rows that passed the filter
184        let mut filters = vec![];
185
186        if let Some(catalog_filter_name) = catalog_filter {
187            let scalar = StringArray::new_scalar(catalog_filter_name);
188            filters.push(eq(&catalog_name, &scalar)?);
189        }
190
191        let tt_filter = table_types_filter
192            .into_iter()
193            .map(|tt| eq(&table_type, &StringArray::new_scalar(tt)))
194            .collect::<std::result::Result<Vec<_>, _>>()?
195            .into_iter()
196            // We know the arrays are of same length as they are produced fromn the same root array
197            .reduce(|filter, arr| or(&filter, &arr).unwrap());
198        if let Some(filter) = tt_filter {
199            filters.push(filter);
200        }
201
202        if let Some(db_schema_filter_pattern) = db_schema_filter_pattern {
203            // use like kernel to get wildcard matching
204            let scalar = StringArray::new_scalar(db_schema_filter_pattern);
205            filters.push(like(&db_schema_name, &scalar)?)
206        }
207
208        if let Some(table_name_filter_pattern) = table_name_filter_pattern {
209            // use like kernel to get wildcard matching
210            let scalar = StringArray::new_scalar(table_name_filter_pattern);
211            filters.push(like(&table_name, &scalar)?)
212        }
213
214        let batch = if let Some(table_schema) = table_schema {
215            RecordBatch::try_new(
216                schema,
217                vec![
218                    Arc::new(catalog_name) as ArrayRef,
219                    Arc::new(db_schema_name) as ArrayRef,
220                    Arc::new(table_name) as ArrayRef,
221                    Arc::new(table_type) as ArrayRef,
222                    Arc::new(table_schema) as ArrayRef,
223                ],
224            )
225        } else {
226            RecordBatch::try_new(
227                // schema is different if table_schema is none
228                schema,
229                vec![
230                    Arc::new(catalog_name) as ArrayRef,
231                    Arc::new(db_schema_name) as ArrayRef,
232                    Arc::new(table_name) as ArrayRef,
233                    Arc::new(table_type) as ArrayRef,
234                ],
235            )
236        }?;
237
238        // `AND` any filters together
239        let mut total_filter = None;
240        while let Some(filter) = filters.pop() {
241            let new_filter = match total_filter {
242                Some(total_filter) => and(&total_filter, &filter)?,
243                None => filter,
244            };
245            total_filter = Some(new_filter);
246        }
247
248        // Apply the filters if needed
249        let filtered_batch = if let Some(total_filter) = total_filter {
250            filter_record_batch(&batch, &total_filter)?
251        } else {
252            batch
253        };
254
255        // Order filtered results by catalog_name, then db_schema_name, then table_name, then table_type
256        // https://github.com/apache/arrow/blob/130f9e981aa98c25de5f5bfe55185db270cec313/format/FlightSql.proto#LL1202C1-L1202C1
257        let sort_cols = filtered_batch.project(&[0, 1, 2, 3])?;
258        let indices = lexsort_to_indices(sort_cols.columns());
259        let columns = filtered_batch
260            .columns()
261            .iter()
262            .map(|c| take(c, &indices, None))
263            .collect::<std::result::Result<Vec<_>, _>>()?;
264
265        Ok(RecordBatch::try_new(filtered_batch.schema(), columns)?)
266    }
267
268    /// Return the schema of the RecordBatch that will be returned from [`CommandGetTables`]
269    ///
270    /// Note the schema differs based on the values of `include_schema
271    ///
272    /// [`CommandGetTables`]: crate::sql::CommandGetTables
273    pub fn schema(&self) -> SchemaRef {
274        get_tables_schema(self.include_schema())
275    }
276
277    /// Should the "schema" column be included
278    pub fn include_schema(&self) -> bool {
279        self.table_schema.is_some()
280    }
281}
282
283fn get_tables_schema(include_schema: bool) -> SchemaRef {
284    if include_schema {
285        Arc::clone(&GET_TABLES_SCHEMA_WITH_TABLE_SCHEMA)
286    } else {
287        Arc::clone(&GET_TABLES_SCHEMA_WITHOUT_TABLE_SCHEMA)
288    }
289}
290
291/// The schema for GetTables without `table_schema` column
292static GET_TABLES_SCHEMA_WITHOUT_TABLE_SCHEMA: Lazy<SchemaRef> = Lazy::new(|| {
293    Arc::new(Schema::new(vec![
294        Field::new("catalog_name", DataType::Utf8, false),
295        Field::new("db_schema_name", DataType::Utf8, false),
296        Field::new("table_name", DataType::Utf8, false),
297        Field::new("table_type", DataType::Utf8, false),
298    ]))
299});
300
301/// The schema for GetTables with `table_schema` column
302static GET_TABLES_SCHEMA_WITH_TABLE_SCHEMA: Lazy<SchemaRef> = Lazy::new(|| {
303    Arc::new(Schema::new(vec![
304        Field::new("catalog_name", DataType::Utf8, false),
305        Field::new("db_schema_name", DataType::Utf8, false),
306        Field::new("table_name", DataType::Utf8, false),
307        Field::new("table_type", DataType::Utf8, false),
308        Field::new("table_schema", DataType::Binary, false),
309    ]))
310});
311
#[cfg(test)]
mod tests {
    use super::*;
    use arrow_array::{StringArray, UInt32Array};

    /// Expected output for the rows produced by `get_ref_builder`, already
    /// ordered by catalog, schema, table name and table type.
    fn get_ref_batch() -> RecordBatch {
        RecordBatch::try_new(
            get_tables_schema(false),
            vec![
                Arc::new(StringArray::from(vec![
                    "a_catalog",
                    "a_catalog",
                    "a_catalog",
                    "a_catalog",
                    "b_catalog",
                    "b_catalog",
                    "b_catalog",
                    "b_catalog",
                ])) as ArrayRef,
                Arc::new(StringArray::from(vec![
                    "a_schema", "a_schema", "b_schema", "b_schema", "a_schema", "a_schema",
                    "b_schema", "b_schema",
                ])) as ArrayRef,
                Arc::new(StringArray::from(vec![
                    "a_table", "b_table", "a_table", "b_table", "a_table", "a_table", "b_table",
                    "b_table",
                ])) as ArrayRef,
                Arc::new(StringArray::from(vec![
                    "TABLE", "TABLE", "TABLE", "TABLE", "TABLE", "VIEW", "TABLE", "VIEW",
                ])) as ArrayRef,
            ],
        )
        .unwrap()
    }

    /// Return the rows of `batch` at positions `rows` (in that order),
    /// keeping the batch's schema.
    fn select_rows(batch: &RecordBatch, rows: Vec<u32>) -> RecordBatch {
        let indices = UInt32Array::from(rows);
        let columns = batch
            .columns()
            .iter()
            .map(|c| take(c, &indices, None))
            .collect::<std::result::Result<Vec<_>, _>>()
            .unwrap();
        RecordBatch::try_new(batch.schema(), columns).unwrap()
    }

    fn get_ref_builder(
        catalog: Option<&str>,
        db_schema_filter_pattern: Option<&str>,
        table_name_filter_pattern: Option<&str>,
        table_types: Vec<&str>,
        include_schema: bool,
    ) -> GetTablesBuilder {
        let dummy_schema = Schema::empty();
        let tables = [
            ("a_catalog", "a_schema", "a_table", "TABLE"),
            ("a_catalog", "a_schema", "b_table", "TABLE"),
            ("a_catalog", "b_schema", "a_table", "TABLE"),
            ("a_catalog", "b_schema", "b_table", "TABLE"),
            ("b_catalog", "a_schema", "a_table", "TABLE"),
            ("b_catalog", "a_schema", "a_table", "VIEW"),
            ("b_catalog", "b_schema", "b_table", "TABLE"),
            ("b_catalog", "b_schema", "b_table", "VIEW"),
        ];
        let mut builder = GetTablesBuilder::new(
            catalog,
            db_schema_filter_pattern,
            table_name_filter_pattern,
            table_types,
            include_schema,
        );
        for (catalog_name, schema_name, table_name, table_type) in tables {
            builder
                .append(
                    catalog_name,
                    schema_name,
                    table_name,
                    table_type,
                    &dummy_schema,
                )
                .unwrap();
        }
        builder
    }

    #[test]
    fn test_tables_are_filtered() {
        let ref_batch = get_ref_batch();

        let builder = get_ref_builder(None, None, None, Vec::new(), false);
        let table_batch = builder.build().unwrap();
        assert_eq!(table_batch, ref_batch);

        let builder = get_ref_builder(None, Some("a%"), Some("a%"), Vec::new(), false);
        let table_batch = builder.build().unwrap();
        assert_eq!(table_batch, select_rows(&ref_batch, vec![0, 4, 5]));

        let builder = get_ref_builder(Some("a_catalog"), None, None, Vec::new(), false);
        let table_batch = builder.build().unwrap();
        assert_eq!(table_batch, select_rows(&ref_batch, vec![0, 1, 2, 3]));

        let builder = get_ref_builder(None, None, None, vec!["VIEW"], false);
        let table_batch = builder.build().unwrap();
        assert_eq!(table_batch, select_rows(&ref_batch, vec![5, 7]));
    }

    #[test]
    fn test_filters_are_combined() {
        let ref_batch = get_ref_batch();

        // Multiple table types are OR-ed together.
        let table_batch = get_ref_builder(None, None, None, vec!["TABLE", "VIEW"], false)
            .build()
            .unwrap();
        assert_eq!(table_batch, ref_batch);

        let table_batch = get_ref_builder(None, None, None, vec!["TABLE"], false)
            .build()
            .unwrap();
        assert_eq!(
            table_batch,
            select_rows(&ref_batch, vec![0, 1, 2, 3, 4, 6])
        );

        // Different kinds of filters are AND-ed together.
        let table_batch = get_ref_builder(
            Some("b_catalog"),
            Some("b%"),
            None,
            vec!["TABLE", "VIEW"],
            false,
        )
        .build()
        .unwrap();
        assert_eq!(table_batch, select_rows(&ref_batch, vec![6, 7]));

        let table_batch = get_ref_builder(
            Some("b_catalog"),
            Some("b%"),
            Some("%table"),
            vec!["VIEW"],
            false,
        )
        .build()
        .unwrap();
        assert_eq!(table_batch, select_rows(&ref_batch, vec![7]));
    }

    #[test]
    fn test_table_schema_column_is_included() {
        let builder = get_ref_builder(None, None, None, Vec::new(), true);
        assert!(builder.include_schema());
        assert_eq!(builder.schema(), get_tables_schema(true));

        let table_batch = builder.build().unwrap();
        assert_eq!(table_batch.schema(), get_tables_schema(true));
        assert_eq!(table_batch.num_rows(), 8);
        assert_eq!(
            table_batch.project(&[0, 1, 2, 3]).unwrap(),
            get_ref_batch()
        );
    }

    #[test]
    fn test_tables_are_sorted() {
        let ref_batch = get_ref_batch();
        let dummy_schema = Schema::empty();

        let tables = [
            ("b_catalog", "a_schema", "a_table", "TABLE"),
            ("b_catalog", "b_schema", "b_table", "TABLE"),
            ("b_catalog", "b_schema", "b_table", "VIEW"),
            ("b_catalog", "a_schema", "a_table", "VIEW"),
            ("a_catalog", "a_schema", "a_table", "TABLE"),
            ("a_catalog", "b_schema", "a_table", "TABLE"),
            ("a_catalog", "b_schema", "b_table", "TABLE"),
            ("a_catalog", "a_schema", "b_table", "TABLE"),
        ];
        let mut builder = GetTablesBuilder::new(
            None::<String>,
            None::<String>,
            None::<String>,
            None::<String>,
            false,
        );
        for (catalog_name, schema_name, table_name, table_type) in tables {
            builder
                .append(
                    catalog_name,
                    schema_name,
                    table_name,
                    table_type,
                    &dummy_schema,
                )
                .unwrap();
        }
        let table_batch = builder.build().unwrap();
        assert_eq!(table_batch, ref_batch);
    }
}