1use std::sync::Arc;
23
24use arrow_arith::boolean::{and, or};
25use arrow_array::builder::{BinaryBuilder, StringBuilder};
26use arrow_array::{ArrayRef, RecordBatch, StringArray};
27use arrow_ord::cmp::eq;
28use arrow_schema::{DataType, Field, Schema, SchemaRef};
29use arrow_select::{filter::filter_record_batch, take::take};
30use arrow_string::like::like;
31use once_cell::sync::Lazy;
32
33use super::lexsort_to_indices;
34use crate::error::*;
35use crate::sql::CommandGetTables;
36use crate::{IpcMessage, IpcWriteOptions, SchemaAsIpc};
37
38pub struct GetTablesBuilder {
49 catalog_filter: Option<String>,
50 table_types_filter: Vec<String>,
51 db_schema_filter_pattern: Option<String>,
53 table_name_filter_pattern: Option<String>,
55 catalog_name: StringBuilder,
57 db_schema_name: StringBuilder,
59 table_name: StringBuilder,
61 table_type: StringBuilder,
63 table_schema: Option<BinaryBuilder>,
65}
66
67impl CommandGetTables {
68 pub fn into_builder(self) -> GetTablesBuilder {
70 self.into()
71 }
72}
73
74impl From<CommandGetTables> for GetTablesBuilder {
75 fn from(value: CommandGetTables) -> Self {
76 Self::new(
77 value.catalog,
78 value.db_schema_filter_pattern,
79 value.table_name_filter_pattern,
80 value.table_types,
81 value.include_schema,
82 )
83 }
84}
85
86impl GetTablesBuilder {
87 pub fn new(
110 catalog: Option<impl Into<String>>,
111 db_schema_filter_pattern: Option<impl Into<String>>,
112 table_name_filter_pattern: Option<impl Into<String>>,
113 table_types: impl IntoIterator<Item = impl Into<String>>,
114 include_schema: bool,
115 ) -> Self {
116 let table_schema = if include_schema {
117 Some(BinaryBuilder::new())
118 } else {
119 None
120 };
121 Self {
122 catalog_filter: catalog.map(|s| s.into()),
123 table_types_filter: table_types.into_iter().map(|tt| tt.into()).collect(),
124 db_schema_filter_pattern: db_schema_filter_pattern.map(|s| s.into()),
125 table_name_filter_pattern: table_name_filter_pattern.map(|t| t.into()),
126 catalog_name: StringBuilder::new(),
127 db_schema_name: StringBuilder::new(),
128 table_name: StringBuilder::new(),
129 table_type: StringBuilder::new(),
130 table_schema,
131 }
132 }
133
134 pub fn append(
136 &mut self,
137 catalog_name: impl AsRef<str>,
138 schema_name: impl AsRef<str>,
139 table_name: impl AsRef<str>,
140 table_type: impl AsRef<str>,
141 table_schema: &Schema,
142 ) -> Result<()> {
143 self.catalog_name.append_value(catalog_name);
144 self.db_schema_name.append_value(schema_name);
145 self.table_name.append_value(table_name);
146 self.table_type.append_value(table_type);
147 if let Some(self_table_schema) = self.table_schema.as_mut() {
148 let options = IpcWriteOptions::default();
149 let message: std::result::Result<IpcMessage, _> =
151 SchemaAsIpc::new(table_schema, &options).try_into();
152 let IpcMessage(schema) = message?;
153 self_table_schema.append_value(schema);
154 }
155
156 Ok(())
157 }
158
159 pub fn build(self) -> Result<RecordBatch> {
161 let schema = self.schema();
162 let Self {
163 catalog_filter,
164 table_types_filter,
165 db_schema_filter_pattern,
166 table_name_filter_pattern,
167
168 mut catalog_name,
169 mut db_schema_name,
170 mut table_name,
171 mut table_type,
172 table_schema,
173 } = self;
174
175 let catalog_name = catalog_name.finish();
177 let db_schema_name = db_schema_name.finish();
178 let table_name = table_name.finish();
179 let table_type = table_type.finish();
180 let table_schema = table_schema.map(|mut table_schema| table_schema.finish());
181
182 let mut filters = vec![];
185
186 if let Some(catalog_filter_name) = catalog_filter {
187 let scalar = StringArray::new_scalar(catalog_filter_name);
188 filters.push(eq(&catalog_name, &scalar)?);
189 }
190
191 let tt_filter = table_types_filter
192 .into_iter()
193 .map(|tt| eq(&table_type, &StringArray::new_scalar(tt)))
194 .collect::<std::result::Result<Vec<_>, _>>()?
195 .into_iter()
196 .reduce(|filter, arr| or(&filter, &arr).unwrap());
198 if let Some(filter) = tt_filter {
199 filters.push(filter);
200 }
201
202 if let Some(db_schema_filter_pattern) = db_schema_filter_pattern {
203 let scalar = StringArray::new_scalar(db_schema_filter_pattern);
205 filters.push(like(&db_schema_name, &scalar)?)
206 }
207
208 if let Some(table_name_filter_pattern) = table_name_filter_pattern {
209 let scalar = StringArray::new_scalar(table_name_filter_pattern);
211 filters.push(like(&table_name, &scalar)?)
212 }
213
214 let batch = if let Some(table_schema) = table_schema {
215 RecordBatch::try_new(
216 schema,
217 vec![
218 Arc::new(catalog_name) as ArrayRef,
219 Arc::new(db_schema_name) as ArrayRef,
220 Arc::new(table_name) as ArrayRef,
221 Arc::new(table_type) as ArrayRef,
222 Arc::new(table_schema) as ArrayRef,
223 ],
224 )
225 } else {
226 RecordBatch::try_new(
227 schema,
229 vec![
230 Arc::new(catalog_name) as ArrayRef,
231 Arc::new(db_schema_name) as ArrayRef,
232 Arc::new(table_name) as ArrayRef,
233 Arc::new(table_type) as ArrayRef,
234 ],
235 )
236 }?;
237
238 let mut total_filter = None;
240 while let Some(filter) = filters.pop() {
241 let new_filter = match total_filter {
242 Some(total_filter) => and(&total_filter, &filter)?,
243 None => filter,
244 };
245 total_filter = Some(new_filter);
246 }
247
248 let filtered_batch = if let Some(total_filter) = total_filter {
250 filter_record_batch(&batch, &total_filter)?
251 } else {
252 batch
253 };
254
255 let sort_cols = filtered_batch.project(&[0, 1, 2, 3])?;
258 let indices = lexsort_to_indices(sort_cols.columns());
259 let columns = filtered_batch
260 .columns()
261 .iter()
262 .map(|c| take(c, &indices, None))
263 .collect::<std::result::Result<Vec<_>, _>>()?;
264
265 Ok(RecordBatch::try_new(filtered_batch.schema(), columns)?)
266 }
267
268 pub fn schema(&self) -> SchemaRef {
274 get_tables_schema(self.include_schema())
275 }
276
277 pub fn include_schema(&self) -> bool {
279 self.table_schema.is_some()
280 }
281}
282
283fn get_tables_schema(include_schema: bool) -> SchemaRef {
284 if include_schema {
285 Arc::clone(&GET_TABLES_SCHEMA_WITH_TABLE_SCHEMA)
286 } else {
287 Arc::clone(&GET_TABLES_SCHEMA_WITHOUT_TABLE_SCHEMA)
288 }
289}
290
291static GET_TABLES_SCHEMA_WITHOUT_TABLE_SCHEMA: Lazy<SchemaRef> = Lazy::new(|| {
293 Arc::new(Schema::new(vec![
294 Field::new("catalog_name", DataType::Utf8, false),
295 Field::new("db_schema_name", DataType::Utf8, false),
296 Field::new("table_name", DataType::Utf8, false),
297 Field::new("table_type", DataType::Utf8, false),
298 ]))
299});
300
301static GET_TABLES_SCHEMA_WITH_TABLE_SCHEMA: Lazy<SchemaRef> = Lazy::new(|| {
303 Arc::new(Schema::new(vec![
304 Field::new("catalog_name", DataType::Utf8, false),
305 Field::new("db_schema_name", DataType::Utf8, false),
306 Field::new("table_name", DataType::Utf8, false),
307 Field::new("table_type", DataType::Utf8, false),
308 Field::new("table_schema", DataType::Binary, false),
309 ]))
310});
311
#[cfg(test)]
mod tests {
    use super::*;
    use arrow_array::{StringArray, UInt32Array};

    /// Expected output (sorted, unfiltered) for the fixture tables, without schemas.
    fn get_ref_batch() -> RecordBatch {
        RecordBatch::try_new(
            get_tables_schema(false),
            vec![
                Arc::new(StringArray::from(vec![
                    "a_catalog",
                    "a_catalog",
                    "a_catalog",
                    "a_catalog",
                    "b_catalog",
                    "b_catalog",
                    "b_catalog",
                    "b_catalog",
                ])) as ArrayRef,
                Arc::new(StringArray::from(vec![
                    "a_schema", "a_schema", "b_schema", "b_schema", "a_schema", "a_schema",
                    "b_schema", "b_schema",
                ])) as ArrayRef,
                Arc::new(StringArray::from(vec![
                    "a_table", "b_table", "a_table", "b_table", "a_table", "a_table", "b_table",
                    "b_table",
                ])) as ArrayRef,
                Arc::new(StringArray::from(vec![
                    "TABLE", "TABLE", "TABLE", "TABLE", "TABLE", "VIEW", "TABLE", "VIEW",
                ])) as ArrayRef,
            ],
        )
        .unwrap()
    }

    /// Selects the rows at `indices` (in that order) from `batch`, using the
    /// schema without the `table_schema` column.
    fn take_rows(batch: &RecordBatch, indices: &[u32]) -> RecordBatch {
        let indices = UInt32Array::from(indices.to_vec());
        let columns = batch
            .columns()
            .iter()
            .map(|c| take(c, &indices, None))
            .collect::<std::result::Result<Vec<_>, _>>()
            .unwrap();
        RecordBatch::try_new(get_tables_schema(false), columns).unwrap()
    }

    /// Builder pre-populated with the fixture tables and the given filters.
    fn get_ref_builder(
        catalog: Option<&str>,
        db_schema_filter_pattern: Option<&str>,
        table_name_filter_pattern: Option<&str>,
        table_types: Vec<&str>,
        include_schema: bool,
    ) -> GetTablesBuilder {
        let dummy_schema = Schema::empty();
        let tables = [
            ("a_catalog", "a_schema", "a_table", "TABLE"),
            ("a_catalog", "a_schema", "b_table", "TABLE"),
            ("a_catalog", "b_schema", "a_table", "TABLE"),
            ("a_catalog", "b_schema", "b_table", "TABLE"),
            ("b_catalog", "a_schema", "a_table", "TABLE"),
            ("b_catalog", "a_schema", "a_table", "VIEW"),
            ("b_catalog", "b_schema", "b_table", "TABLE"),
            ("b_catalog", "b_schema", "b_table", "VIEW"),
        ];
        let mut builder = GetTablesBuilder::new(
            catalog,
            db_schema_filter_pattern,
            table_name_filter_pattern,
            table_types,
            include_schema,
        );
        for (catalog_name, schema_name, table_name, table_type) in tables {
            builder
                .append(
                    catalog_name,
                    schema_name,
                    table_name,
                    table_type,
                    &dummy_schema,
                )
                .unwrap();
        }
        builder
    }

    #[test]
    fn test_tables_are_filtered() {
        let ref_batch = get_ref_batch();

        let builder = get_ref_builder(None, None, None, Vec::new(), false);
        let table_batch = builder.build().unwrap();
        assert_eq!(table_batch, ref_batch);

        let builder = get_ref_builder(None, Some("a%"), Some("a%"), Vec::new(), false);
        let table_batch = builder.build().unwrap();
        assert_eq!(table_batch, take_rows(&ref_batch, &[0, 4, 5]));

        let builder = get_ref_builder(Some("a_catalog"), None, None, Vec::new(), false);
        let table_batch = builder.build().unwrap();
        assert_eq!(table_batch, take_rows(&ref_batch, &[0, 1, 2, 3]));

        let builder = get_ref_builder(None, None, None, vec!["VIEW"], false);
        let table_batch = builder.build().unwrap();
        assert_eq!(table_batch, take_rows(&ref_batch, &[5, 7]));
    }

    #[test]
    fn test_tables_are_sorted() {
        let ref_batch = get_ref_batch();
        let dummy_schema = Schema::empty();

        let tables = [
            ("b_catalog", "a_schema", "a_table", "TABLE"),
            ("b_catalog", "b_schema", "b_table", "TABLE"),
            ("b_catalog", "b_schema", "b_table", "VIEW"),
            ("b_catalog", "a_schema", "a_table", "VIEW"),
            ("a_catalog", "a_schema", "a_table", "TABLE"),
            ("a_catalog", "b_schema", "a_table", "TABLE"),
            ("a_catalog", "b_schema", "b_table", "TABLE"),
            ("a_catalog", "a_schema", "b_table", "TABLE"),
        ];
        let mut builder = GetTablesBuilder::new(
            None::<String>,
            None::<String>,
            None::<String>,
            None::<String>,
            false,
        );
        for (catalog_name, schema_name, table_name, table_type) in tables {
            builder
                .append(
                    catalog_name,
                    schema_name,
                    table_name,
                    table_type,
                    &dummy_schema,
                )
                .unwrap();
        }
        let table_batch = builder.build().unwrap();
        assert_eq!(table_batch, ref_batch);
    }

    #[test]
    fn test_table_schema_column_is_included() {
        let ref_batch = get_ref_batch();

        let builder = get_ref_builder(None, None, None, Vec::new(), true);
        assert!(builder.include_schema());
        let table_batch = builder.build().unwrap();

        assert_eq!(table_batch.schema(), get_tables_schema(true));
        assert_eq!(table_batch.num_columns(), 5);
        assert_eq!(table_batch.num_rows(), ref_batch.num_rows());
        // The string columns must be filtered/sorted exactly as without schemas.
        assert_eq!(table_batch.project(&[0, 1, 2, 3]).unwrap(), ref_batch);
    }
}