1use std::collections::{BTreeMap, HashMap};
28use std::sync::Arc;
29
30use arrow_arith::boolean::or;
31use arrow_array::array::{Array, UInt32Array, UnionArray};
32use arrow_array::builder::{
33 ArrayBuilder, BooleanBuilder, Int32Builder, Int64Builder, Int8Builder, ListBuilder, MapBuilder,
34 StringBuilder, UInt32Builder,
35};
36use arrow_array::{RecordBatch, Scalar};
37use arrow_data::ArrayData;
38use arrow_ord::cmp::eq;
39use arrow_schema::{DataType, Field, Fields, Schema, SchemaRef, UnionFields, UnionMode};
40use arrow_select::filter::filter_record_batch;
41use once_cell::sync::Lazy;
42
43use crate::error::Result;
44use crate::sql::{CommandGetSqlInfo, SqlInfo};
45
/// A single value that can be reported for a SQL info entry.
///
/// Each variant is stored in its own child of the dense union that backs the
/// `value` column of the SQL info record batch.
#[derive(Clone, Debug, PartialEq)]
pub enum SqlInfoValue {
    /// A UTF-8 string.
    String(String),
    /// A boolean flag.
    Bool(bool),
    /// A 64-bit signed integer.
    BigInt(i64),
    /// A 32-bit signed integer, stored in the `int32_bitmask` union child.
    Bitmask(i32),
    /// A list of strings.
    StringList(Vec<String>),
    /// A map from `i32` keys to lists of `i32` values, ordered by key.
    ListMap(BTreeMap<i32, Vec<i32>>),
}
56
57impl From<&str> for SqlInfoValue {
58 fn from(value: &str) -> Self {
59 Self::String(value.to_string())
60 }
61}
62
63impl From<bool> for SqlInfoValue {
64 fn from(value: bool) -> Self {
65 Self::Bool(value)
66 }
67}
68
69impl From<i32> for SqlInfoValue {
70 fn from(value: i32) -> Self {
71 Self::Bitmask(value)
72 }
73}
74
75impl From<i64> for SqlInfoValue {
76 fn from(value: i64) -> Self {
77 Self::BigInt(value)
78 }
79}
80
81impl From<&[&str]> for SqlInfoValue {
82 fn from(values: &[&str]) -> Self {
83 let values = values.iter().map(|s| s.to_string()).collect();
84 Self::StringList(values)
85 }
86}
87
88impl From<Vec<String>> for SqlInfoValue {
89 fn from(values: Vec<String>) -> Self {
90 Self::StringList(values)
91 }
92}
93
94impl From<BTreeMap<i32, Vec<i32>>> for SqlInfoValue {
95 fn from(value: BTreeMap<i32, Vec<i32>>) -> Self {
96 Self::ListMap(value)
97 }
98}
99
100impl From<HashMap<i32, Vec<i32>>> for SqlInfoValue {
101 fn from(value: HashMap<i32, Vec<i32>>) -> Self {
102 Self::ListMap(value.into_iter().collect())
103 }
104}
105
106impl From<&HashMap<i32, Vec<i32>>> for SqlInfoValue {
107 fn from(value: &HashMap<i32, Vec<i32>>) -> Self {
108 Self::ListMap(
109 value
110 .iter()
111 .map(|(k, v)| (k.to_owned(), v.to_owned()))
112 .collect(),
113 )
114 }
115}
116
/// Something that can name a SQL info entry by its `u32` code.
///
/// This file implements it for `SqlInfo` and for plain `u32` values.
pub trait SqlInfoName {
    /// Returns the `u32` code identifying this info entry.
    fn as_u32(&self) -> u32;
}
121
122impl SqlInfoName for SqlInfo {
123 fn as_u32(&self) -> u32 {
124 u32::try_from(i32::from(*self)).expect("SqlInfo fit into u32")
127 }
128}
129
130impl SqlInfoName for u32 {
132 fn as_u32(&self) -> u32 {
133 *self
134 }
135}
136
137struct SqlInfoUnionBuilder {
154 string_values: StringBuilder,
156 bool_values: BooleanBuilder,
157 bigint_values: Int64Builder,
158 int32_bitmask_values: Int32Builder,
159 string_list_values: ListBuilder<StringBuilder>,
160 int32_to_int32_list_map_values: MapBuilder<Int32Builder, ListBuilder<Int32Builder>>,
161 type_ids: Int8Builder,
162 offsets: Int32Builder,
163}
164
165static UNION_TYPE: Lazy<DataType> = Lazy::new(|| {
167 let fields = vec![
168 Field::new("string_value", DataType::Utf8, false),
169 Field::new("bool_value", DataType::Boolean, false),
170 Field::new("bigint_value", DataType::Int64, false),
171 Field::new("int32_bitmask", DataType::Int32, false),
172 Field::new(
174 "string_list",
175 DataType::List(Arc::new(Field::new_list_field(DataType::Utf8, true))),
176 true,
177 ),
178 Field::new(
179 "int32_to_int32_list_map",
180 DataType::Map(
181 Arc::new(Field::new(
182 "entries",
183 DataType::Struct(Fields::from(vec![
184 Field::new("keys", DataType::Int32, false),
185 Field::new(
186 "values",
187 DataType::List(Arc::new(Field::new_list_field(DataType::Int32, true))),
188 true,
189 ),
190 ])),
191 false,
192 )),
193 false,
194 ),
195 true,
196 ),
197 ];
198
199 let type_ids: Vec<i8> = (0..fields.len()).map(|v| v as i8).collect();
201
202 DataType::Union(UnionFields::new(type_ids, fields), UnionMode::Dense)
203});
204
205impl SqlInfoUnionBuilder {
206 pub fn new() -> Self {
207 Self {
208 string_values: StringBuilder::new(),
209 bool_values: BooleanBuilder::new(),
210 bigint_values: Int64Builder::new(),
211 int32_bitmask_values: Int32Builder::new(),
212 string_list_values: ListBuilder::new(StringBuilder::new()),
213 int32_to_int32_list_map_values: MapBuilder::new(
214 None,
215 Int32Builder::new(),
216 ListBuilder::new(Int32Builder::new()),
217 ),
218 type_ids: Int8Builder::new(),
219 offsets: Int32Builder::new(),
220 }
221 }
222
223 pub fn schema() -> &'static DataType {
225 &UNION_TYPE
226 }
227
228 pub fn append_value(&mut self, v: &SqlInfoValue) -> Result<()> {
230 let (type_id, len) = match v {
233 SqlInfoValue::String(v) => {
234 self.string_values.append_value(v);
235 (0, self.string_values.len())
236 }
237 SqlInfoValue::Bool(v) => {
238 self.bool_values.append_value(*v);
239 (1, self.bool_values.len())
240 }
241 SqlInfoValue::BigInt(v) => {
242 self.bigint_values.append_value(*v);
243 (2, self.bigint_values.len())
244 }
245 SqlInfoValue::Bitmask(v) => {
246 self.int32_bitmask_values.append_value(*v);
247 (3, self.int32_bitmask_values.len())
248 }
249 SqlInfoValue::StringList(values) => {
250 for v in values {
252 self.string_list_values.values().append_value(v);
253 }
254 self.string_list_values.append(true);
256 (4, self.string_list_values.len())
257 }
258 SqlInfoValue::ListMap(values) => {
259 for (k, v) in values.clone() {
261 self.int32_to_int32_list_map_values.keys().append_value(k);
262 self.int32_to_int32_list_map_values
263 .values()
264 .append_value(v.into_iter().map(Some));
265 }
266 self.int32_to_int32_list_map_values.append(true)?;
268 (5, self.int32_to_int32_list_map_values.len())
269 }
270 };
271
272 self.type_ids.append_value(type_id);
273 let len = i32::try_from(len).expect("offset fit in i32");
274 self.offsets.append_value(len - 1);
275 Ok(())
276 }
277
278 pub fn finish(self) -> UnionArray {
280 let Self {
281 mut string_values,
282 mut bool_values,
283 mut bigint_values,
284 mut int32_bitmask_values,
285 mut string_list_values,
286 mut int32_to_int32_list_map_values,
287 mut type_ids,
288 mut offsets,
289 } = self;
290 let type_ids = type_ids.finish();
291 let offsets = offsets.finish();
292
293 let len = offsets.len();
296 let null_bit_buffer = None;
297 let offset = 0;
298
299 let buffers = vec![
300 type_ids.into_data().buffers()[0].clone(),
301 offsets.into_data().buffers()[0].clone(),
302 ];
303
304 let child_data = vec![
305 string_values.finish().into_data(),
306 bool_values.finish().into_data(),
307 bigint_values.finish().into_data(),
308 int32_bitmask_values.finish().into_data(),
309 string_list_values.finish().into_data(),
310 int32_to_int32_list_map_values.finish().into_data(),
311 ];
312
313 let data = ArrayData::try_new(
314 UNION_TYPE.clone(),
315 len,
316 null_bit_buffer,
317 offset,
318 buffers,
319 child_data,
320 )
321 .expect("Correctly created UnionArray");
322
323 UnionArray::from(data)
324 }
325}
326
327#[derive(Debug, Clone, PartialEq, Default)]
335pub struct SqlInfoDataBuilder {
336 infos: BTreeMap<u32, SqlInfoValue>,
342}
343
344impl SqlInfoDataBuilder {
345 pub fn new() -> Self {
347 Self::default()
348 }
349
350 pub fn append(&mut self, name: impl SqlInfoName, value: impl Into<SqlInfoValue>) {
352 self.infos.insert(name.as_u32(), value.into());
353 }
354
355 pub fn build(self) -> Result<SqlInfoData> {
359 let mut name_builder = UInt32Builder::new();
360 let mut value_builder = SqlInfoUnionBuilder::new();
361
362 let mut names: Vec<_> = self.infos.keys().cloned().collect();
363 names.sort_unstable();
364
365 for key in names {
366 let (name, value) = self.infos.get_key_value(&key).unwrap();
367 name_builder.append_value(*name);
368 value_builder.append_value(value)?
369 }
370
371 let batch = RecordBatch::try_from_iter(vec![
372 ("info_name", Arc::new(name_builder.finish()) as _),
373 ("value", Arc::new(value_builder.finish()) as _),
374 ])?;
375
376 Ok(SqlInfoData { batch })
377 }
378
379 pub fn schema() -> &'static Schema {
381 &SQL_INFO_SCHEMA
383 }
384}
385
386pub struct SqlInfoData {
406 batch: RecordBatch,
407}
408
409impl SqlInfoData {
410 pub fn record_batch(&self, info: impl IntoIterator<Item = u32>) -> Result<RecordBatch> {
413 let arr = self.batch.column(0);
414 let type_filter = info
415 .into_iter()
416 .map(|tt| {
417 let s = UInt32Array::from(vec![tt]);
418 eq(arr, &Scalar::new(&s))
419 })
420 .collect::<std::result::Result<Vec<_>, _>>()?
421 .into_iter()
422 .reduce(|filter, arr| or(&filter, &arr).unwrap());
424 if let Some(filter) = type_filter {
425 Ok(filter_record_batch(&self.batch, &filter)?)
426 } else {
427 Ok(self.batch.clone())
428 }
429 }
430
431 pub fn schema(&self) -> SchemaRef {
434 self.batch.schema()
435 }
436}
437
438pub struct GetSqlInfoBuilder<'a> {
440 info: Vec<u32>,
442 infos: &'a SqlInfoData,
443}
444
445impl CommandGetSqlInfo {
446 pub fn into_builder(self, infos: &SqlInfoData) -> GetSqlInfoBuilder {
448 GetSqlInfoBuilder {
449 info: self.info,
450 infos,
451 }
452 }
453}
454
455impl GetSqlInfoBuilder<'_> {
456 pub fn build(self) -> Result<RecordBatch> {
458 self.infos.record_batch(self.info)
459 }
460
461 pub fn schema(&self) -> SchemaRef {
464 self.infos.schema()
465 }
466}
467
468static SQL_INFO_SCHEMA: Lazy<Schema> = Lazy::new(|| {
470 Schema::new(vec![
471 Field::new("info_name", DataType::UInt32, false),
472 Field::new("value", SqlInfoUnionBuilder::schema().clone(), false),
473 ])
474});
475
#[cfg(test)]
mod tests {
    use std::collections::HashMap;

    use super::SqlInfoDataBuilder;
    use crate::sql::metadata::tests::assert_batches_eq;
    use crate::sql::{SqlInfo, SqlNullOrdering, SqlSupportedTransaction, SqlSupportsConvert};

    /// Builds one info entry per value kind and checks the rendered batch.
    #[test]
    fn test_sql_infos() {
        let mut convert: HashMap<i32, Vec<i32>> = HashMap::new();
        convert.insert(
            SqlSupportsConvert::SqlConvertInteger as i32,
            vec![
                SqlSupportsConvert::SqlConvertFloat as i32,
                SqlSupportsConvert::SqlConvertReal as i32,
            ],
        );

        let mut builder = SqlInfoDataBuilder::new();
        // str
        builder.append(SqlInfo::SqlIdentifierQuoteChar, r#"""#);
        // bool
        builder.append(SqlInfo::SqlDdlCatalog, false);
        // i32
        builder.append(
            SqlInfo::SqlNullOrdering,
            SqlNullOrdering::SqlNullsSortedHigh as i32,
        );
        // i64
        builder.append(SqlInfo::SqlMaxBinaryLiteralLength, i32::MAX as i64);
        // [str]
        builder.append(SqlInfo::SqlKeywords, &["SELECT", "DELETE"] as &[&str]);
        // map
        builder.append(SqlInfo::SqlSupportsConvert, &convert);

        let batch = builder.build().unwrap().record_batch(None).unwrap();

        // Every cell is padded to its column width so the lines can match
        // pretty-printed table output. The 504 row looks one column longer
        // in source only because `\"` is a single character at runtime.
        let expected = vec![
            "+-----------+----------------------------------------+",
            "| info_name | value                                  |",
            "+-----------+----------------------------------------+",
            "| 500       | {bool_value=false}                     |",
            "| 504       | {string_value=\"}                       |",
            "| 507       | {int32_bitmask=0}                      |",
            "| 508       | {string_list=[SELECT, DELETE]}         |",
            "| 517       | {int32_to_int32_list_map={7: [6, 13]}} |",
            "| 541       | {bigint_value=2147483647}              |",
            "+-----------+----------------------------------------+",
        ];

        assert_batches_eq(&[batch], &expected);
    }

    /// Filtering by one info code must yield the same batch as building
    /// data that contains only that code.
    #[test]
    fn test_filter_sql_infos() {
        let mut builder = SqlInfoDataBuilder::new();
        builder.append(SqlInfo::FlightSqlServerName, "server name");
        builder.append(
            SqlInfo::FlightSqlServerTransaction,
            SqlSupportedTransaction::Transaction as i32,
        );
        let data = builder.build().unwrap();

        // No filter: both rows are returned.
        let batch = data.record_batch(None).unwrap();
        assert_eq!(batch.num_rows(), 2);

        let batch = data
            .record_batch([SqlInfo::FlightSqlServerTransaction as u32])
            .unwrap();
        let mut ref_builder = SqlInfoDataBuilder::new();
        ref_builder.append(
            SqlInfo::FlightSqlServerTransaction,
            SqlSupportedTransaction::Transaction as i32,
        );
        let ref_batch = ref_builder.build().unwrap().record_batch(None).unwrap();

        assert_eq!(batch, ref_batch);
    }
}