arrow_flight/sql/metadata/
db_schemas.rs1use std::sync::Arc;
23
24use arrow_arith::boolean::and;
25use arrow_array::{builder::StringBuilder, ArrayRef, RecordBatch, StringArray};
26use arrow_ord::cmp::eq;
27use arrow_schema::{DataType, Field, Schema, SchemaRef};
28use arrow_select::{filter::filter_record_batch, take::take};
29use arrow_string::like::like;
30use once_cell::sync::Lazy;
31
32use super::lexsort_to_indices;
33use crate::error::*;
34use crate::sql::CommandGetDbSchemas;
35
36pub struct GetDbSchemasBuilder {
43 catalog_filter: Option<String>,
47 db_schema_filter_pattern: Option<String>,
49 catalog_name: StringBuilder,
51 db_schema_name: StringBuilder,
53}
54
55impl CommandGetDbSchemas {
56 pub fn into_builder(self) -> GetDbSchemasBuilder {
58 self.into()
59 }
60}
61
62impl From<CommandGetDbSchemas> for GetDbSchemasBuilder {
63 fn from(value: CommandGetDbSchemas) -> Self {
64 Self::new(value.catalog, value.db_schema_filter_pattern)
65 }
66}
67
68impl GetDbSchemasBuilder {
69 pub fn new(
84 catalog: Option<impl Into<String>>,
85 db_schema_filter_pattern: Option<impl Into<String>>,
86 ) -> Self {
87 Self {
88 catalog_filter: catalog.map(|v| v.into()),
89 db_schema_filter_pattern: db_schema_filter_pattern.map(|v| v.into()),
90 catalog_name: StringBuilder::new(),
91 db_schema_name: StringBuilder::new(),
92 }
93 }
94
95 pub fn append(&mut self, catalog_name: impl AsRef<str>, schema_name: impl AsRef<str>) {
99 self.catalog_name.append_value(catalog_name);
100 self.db_schema_name.append_value(schema_name);
101 }
102
103 pub fn build(self) -> Result<RecordBatch> {
105 let schema = self.schema();
106 let Self {
107 catalog_filter,
108 db_schema_filter_pattern,
109 mut catalog_name,
110 mut db_schema_name,
111 } = self;
112
113 let catalog_name = catalog_name.finish();
115 let db_schema_name = db_schema_name.finish();
116
117 let mut filters = vec![];
118
119 if let Some(db_schema_filter_pattern) = db_schema_filter_pattern {
120 let scalar = StringArray::new_scalar(db_schema_filter_pattern);
122 filters.push(like(&db_schema_name, &scalar)?)
123 }
124
125 if let Some(catalog_filter_name) = catalog_filter {
126 let scalar = StringArray::new_scalar(catalog_filter_name);
127 filters.push(eq(&catalog_name, &scalar)?);
128 }
129
130 let mut total_filter = None;
132 while let Some(filter) = filters.pop() {
133 let new_filter = match total_filter {
134 Some(total_filter) => and(&total_filter, &filter)?,
135 None => filter,
136 };
137 total_filter = Some(new_filter);
138 }
139
140 let batch = RecordBatch::try_new(
141 schema,
142 vec![
143 Arc::new(catalog_name) as ArrayRef,
144 Arc::new(db_schema_name) as ArrayRef,
145 ],
146 )?;
147
148 let filtered_batch = if let Some(filter) = total_filter {
150 filter_record_batch(&batch, &filter)?
151 } else {
152 batch
153 };
154
155 let indices = lexsort_to_indices(filtered_batch.columns());
157 let columns = filtered_batch
158 .columns()
159 .iter()
160 .map(|c| take(c, &indices, None))
161 .collect::<std::result::Result<Vec<_>, _>>()?;
162
163 Ok(RecordBatch::try_new(filtered_batch.schema(), columns)?)
164 }
165
166 pub fn schema(&self) -> SchemaRef {
169 get_db_schemas_schema()
170 }
171}
172
173fn get_db_schemas_schema() -> SchemaRef {
174 Arc::clone(&GET_DB_SCHEMAS_SCHEMA)
175}
176
177static GET_DB_SCHEMAS_SCHEMA: Lazy<SchemaRef> = Lazy::new(|| {
179 Arc::new(Schema::new(vec![
180 Field::new("catalog_name", DataType::Utf8, false),
181 Field::new("db_schema_name", DataType::Utf8, false),
182 ]))
183});
184
#[cfg(test)]
mod tests {
    use super::*;
    use arrow_array::{StringArray, UInt32Array};

    /// The four `(catalog, schema)` rows used by the tests, in the order the
    /// built batch is expected to list them.
    const SORTED_ROWS: [(&str, &str); 4] = [
        ("a_catalog", "a_schema"),
        ("a_catalog", "b_schema"),
        ("b_catalog", "a_schema"),
        ("b_catalog", "b_schema"),
    ];

    /// Reference batch holding `SORTED_ROWS` verbatim.
    fn get_ref_batch() -> RecordBatch {
        let (catalogs, schemas): (Vec<&str>, Vec<&str>) = SORTED_ROWS.iter().copied().unzip();
        RecordBatch::try_new(
            get_db_schemas_schema(),
            vec![
                Arc::new(StringArray::from(catalogs)) as ArrayRef,
                Arc::new(StringArray::from(schemas)) as ArrayRef,
            ],
        )
        .unwrap()
    }

    /// Append `rows` to `builder` in the given order and build the batch.
    fn build_with_rows(mut builder: GetDbSchemasBuilder, rows: &[(&str, &str)]) -> RecordBatch {
        for &(catalog, schema) in rows {
            builder.append(catalog, schema);
        }
        builder.build().unwrap()
    }

    /// Pick the rows at `positions` out of `batch`, keeping the response schema.
    fn select_rows(batch: &RecordBatch, positions: Vec<u32>) -> RecordBatch {
        let indices = UInt32Array::from(positions);
        let columns = batch
            .columns()
            .iter()
            .map(|c| take(c, &indices, None))
            .collect::<std::result::Result<Vec<_>, _>>()
            .unwrap();
        RecordBatch::try_new(get_db_schemas_schema(), columns).unwrap()
    }

    #[test]
    fn test_schemas_are_filtered() {
        let ref_batch = get_ref_batch();

        // Without any filter every appended row is returned.
        let unfiltered = build_with_rows(
            GetDbSchemasBuilder::new(None::<String>, None::<String>),
            &SORTED_ROWS,
        );
        assert_eq!(unfiltered, ref_batch);

        // A schema pattern of "a%" keeps only the `a_schema` rows.
        let only_a_schemas = build_with_rows(
            GetDbSchemasBuilder::new(None::<String>, Some("a%")),
            &SORTED_ROWS,
        );
        let ref_filtered = select_rows(&ref_batch, vec![0, 2]);
        assert_eq!(only_a_schemas, ref_filtered);
    }

    #[test]
    fn test_schemas_are_sorted() {
        let ref_batch = get_ref_batch();

        // Same rows as the reference batch, appended out of order.
        let shuffled = [
            ("a_catalog", "b_schema"),
            ("b_catalog", "a_schema"),
            ("a_catalog", "a_schema"),
            ("b_catalog", "b_schema"),
        ];
        let schema_batch = build_with_rows(
            GetDbSchemasBuilder::new(None::<String>, None::<String>),
            &shuffled,
        );

        assert_eq!(schema_batch, ref_batch)
    }

    #[test]
    fn test_builder_from_query() {
        let ref_batch = get_ref_batch();
        let query = CommandGetDbSchemas {
            catalog: Some("a_catalog".into()),
            db_schema_filter_pattern: Some("b%".into()),
        };

        // Both filters apply: only ("a_catalog", "b_schema") survives.
        let schema_batch = build_with_rows(query.into_builder(), &SORTED_ROWS);
        let ref_filtered = select_rows(&ref_batch, vec![1]);

        assert_eq!(schema_batch, ref_filtered);
    }
}