1use std::collections::{BTreeMap, HashMap};
28use std::sync::Arc;
29
30use arrow_arith::boolean::or;
31use arrow_array::array::{Array, UInt32Array, UnionArray};
32use arrow_array::builder::{
33 ArrayBuilder, BooleanBuilder, Int8Builder, Int32Builder, Int64Builder, ListBuilder, MapBuilder,
34 StringBuilder, UInt32Builder,
35};
36use arrow_array::{RecordBatch, Scalar};
37use arrow_data::ArrayData;
38use arrow_ord::cmp::eq;
39use arrow_schema::{DataType, Field, Fields, Schema, SchemaRef, UnionFields, UnionMode};
40use arrow_select::filter::filter_record_batch;
41use once_cell::sync::Lazy;
42
43use crate::error::Result;
44use crate::sql::{CommandGetSqlInfo, SqlInfo};
45
/// A single value that can be stored against a SQL info name.
///
/// The variants line up, in order, with the children of the dense union
/// declared in `UNION_TYPE` (`string_value`, `bool_value`, `bigint_value`,
/// `int32_bitmask`, `string_list`, `int32_to_int32_list_map`).
#[derive(Debug, Clone, PartialEq)]
pub enum SqlInfoValue {
    /// An owned UTF-8 string.
    String(String),
    /// A boolean flag.
    Bool(bool),
    /// A 64-bit signed integer.
    BigInt(i64),
    /// A 32-bit signed integer, stored in the `int32_bitmask` union child.
    Bitmask(i32),
    /// A list of owned strings.
    StringList(Vec<String>),
    /// A map from an `i32` key to a list of `i32` values, ordered by key.
    ListMap(BTreeMap<i32, Vec<i32>>),
}
56
57impl From<&str> for SqlInfoValue {
58 fn from(value: &str) -> Self {
59 Self::String(value.to_string())
60 }
61}
62
63impl From<bool> for SqlInfoValue {
64 fn from(value: bool) -> Self {
65 Self::Bool(value)
66 }
67}
68
69impl From<i32> for SqlInfoValue {
70 fn from(value: i32) -> Self {
71 Self::Bitmask(value)
72 }
73}
74
75impl From<i64> for SqlInfoValue {
76 fn from(value: i64) -> Self {
77 Self::BigInt(value)
78 }
79}
80
81impl From<&[&str]> for SqlInfoValue {
82 fn from(values: &[&str]) -> Self {
83 let values = values.iter().map(|s| s.to_string()).collect();
84 Self::StringList(values)
85 }
86}
87
88impl From<Vec<String>> for SqlInfoValue {
89 fn from(values: Vec<String>) -> Self {
90 Self::StringList(values)
91 }
92}
93
94impl From<BTreeMap<i32, Vec<i32>>> for SqlInfoValue {
95 fn from(value: BTreeMap<i32, Vec<i32>>) -> Self {
96 Self::ListMap(value)
97 }
98}
99
100impl From<HashMap<i32, Vec<i32>>> for SqlInfoValue {
101 fn from(value: HashMap<i32, Vec<i32>>) -> Self {
102 Self::ListMap(value.into_iter().collect())
103 }
104}
105
106impl From<&HashMap<i32, Vec<i32>>> for SqlInfoValue {
107 fn from(value: &HashMap<i32, Vec<i32>>) -> Self {
108 Self::ListMap(
109 value
110 .iter()
111 .map(|(k, v)| (k.to_owned(), v.to_owned()))
112 .collect(),
113 )
114 }
115}
116
/// Something that can be used as the name (key) of a SQL info entry.
///
/// Implemented in this file for `SqlInfo` and for raw `u32` values, so that
/// `SqlInfoDataBuilder::append` accepts either.
pub trait SqlInfoName {
    /// Returns the `u32` under which the entry is stored.
    fn as_u32(&self) -> u32;
}
121
122impl SqlInfoName for SqlInfo {
123 fn as_u32(&self) -> u32 {
124 u32::try_from(i32::from(*self)).expect("SqlInfo fit into u32")
127 }
128}
129
130impl SqlInfoName for u32 {
132 fn as_u32(&self) -> u32 {
133 *self
134 }
135}
136
137struct SqlInfoUnionBuilder {
154 string_values: StringBuilder,
156 bool_values: BooleanBuilder,
157 bigint_values: Int64Builder,
158 int32_bitmask_values: Int32Builder,
159 string_list_values: ListBuilder<StringBuilder>,
160 int32_to_int32_list_map_values: MapBuilder<Int32Builder, ListBuilder<Int32Builder>>,
161 type_ids: Int8Builder,
162 offsets: Int32Builder,
163}
164
165static UNION_TYPE: Lazy<DataType> = Lazy::new(|| {
167 let fields = vec![
168 Field::new("string_value", DataType::Utf8, false),
169 Field::new("bool_value", DataType::Boolean, false),
170 Field::new("bigint_value", DataType::Int64, false),
171 Field::new("int32_bitmask", DataType::Int32, false),
172 Field::new(
174 "string_list",
175 DataType::List(Arc::new(Field::new_list_field(DataType::Utf8, true))),
176 true,
177 ),
178 Field::new(
179 "int32_to_int32_list_map",
180 DataType::Map(
181 Arc::new(Field::new(
182 "entries",
183 DataType::Struct(Fields::from(vec![
184 Field::new("keys", DataType::Int32, false),
185 Field::new(
186 "values",
187 DataType::List(Arc::new(Field::new_list_field(DataType::Int32, true))),
188 true,
189 ),
190 ])),
191 false,
192 )),
193 false,
194 ),
195 true,
196 ),
197 ];
198
199 DataType::Union(UnionFields::from_fields(fields), UnionMode::Dense)
200});
201
202impl SqlInfoUnionBuilder {
203 pub fn new() -> Self {
204 Self {
205 string_values: StringBuilder::new(),
206 bool_values: BooleanBuilder::new(),
207 bigint_values: Int64Builder::new(),
208 int32_bitmask_values: Int32Builder::new(),
209 string_list_values: ListBuilder::new(StringBuilder::new()),
210 int32_to_int32_list_map_values: MapBuilder::new(
211 None,
212 Int32Builder::new(),
213 ListBuilder::new(Int32Builder::new()),
214 ),
215 type_ids: Int8Builder::new(),
216 offsets: Int32Builder::new(),
217 }
218 }
219
220 pub fn schema() -> &'static DataType {
222 &UNION_TYPE
223 }
224
225 pub fn append_value(&mut self, v: &SqlInfoValue) -> Result<()> {
227 let (type_id, len) = match v {
230 SqlInfoValue::String(v) => {
231 self.string_values.append_value(v);
232 (0, self.string_values.len())
233 }
234 SqlInfoValue::Bool(v) => {
235 self.bool_values.append_value(*v);
236 (1, self.bool_values.len())
237 }
238 SqlInfoValue::BigInt(v) => {
239 self.bigint_values.append_value(*v);
240 (2, self.bigint_values.len())
241 }
242 SqlInfoValue::Bitmask(v) => {
243 self.int32_bitmask_values.append_value(*v);
244 (3, self.int32_bitmask_values.len())
245 }
246 SqlInfoValue::StringList(values) => {
247 for v in values {
249 self.string_list_values.values().append_value(v);
250 }
251 self.string_list_values.append(true);
253 (4, self.string_list_values.len())
254 }
255 SqlInfoValue::ListMap(values) => {
256 for (k, v) in values.clone() {
258 self.int32_to_int32_list_map_values.keys().append_value(k);
259 self.int32_to_int32_list_map_values
260 .values()
261 .append_value(v.into_iter().map(Some));
262 }
263 self.int32_to_int32_list_map_values.append(true)?;
265 (5, self.int32_to_int32_list_map_values.len())
266 }
267 };
268
269 self.type_ids.append_value(type_id);
270 let len = i32::try_from(len).expect("offset fit in i32");
271 self.offsets.append_value(len - 1);
272 Ok(())
273 }
274
275 pub fn finish(self) -> UnionArray {
277 let Self {
278 mut string_values,
279 mut bool_values,
280 mut bigint_values,
281 mut int32_bitmask_values,
282 mut string_list_values,
283 mut int32_to_int32_list_map_values,
284 mut type_ids,
285 mut offsets,
286 } = self;
287 let type_ids = type_ids.finish();
288 let offsets = offsets.finish();
289
290 let len = offsets.len();
293 let null_bit_buffer = None;
294 let offset = 0;
295
296 let buffers = vec![
297 type_ids.into_data().buffers()[0].clone(),
298 offsets.into_data().buffers()[0].clone(),
299 ];
300
301 let child_data = vec![
302 string_values.finish().into_data(),
303 bool_values.finish().into_data(),
304 bigint_values.finish().into_data(),
305 int32_bitmask_values.finish().into_data(),
306 string_list_values.finish().into_data(),
307 int32_to_int32_list_map_values.finish().into_data(),
308 ];
309
310 let data = ArrayData::try_new(
311 UNION_TYPE.clone(),
312 len,
313 null_bit_buffer,
314 offset,
315 buffers,
316 child_data,
317 )
318 .expect("Correctly created UnionArray");
319
320 UnionArray::from(data)
321 }
322}
323
324#[derive(Debug, Clone, PartialEq, Default)]
332pub struct SqlInfoDataBuilder {
333 infos: BTreeMap<u32, SqlInfoValue>,
339}
340
341impl SqlInfoDataBuilder {
342 pub fn new() -> Self {
344 Self::default()
345 }
346
347 pub fn append(&mut self, name: impl SqlInfoName, value: impl Into<SqlInfoValue>) {
349 self.infos.insert(name.as_u32(), value.into());
350 }
351
352 pub fn build(self) -> Result<SqlInfoData> {
356 let mut name_builder = UInt32Builder::new();
357 let mut value_builder = SqlInfoUnionBuilder::new();
358
359 let mut names: Vec<_> = self.infos.keys().cloned().collect();
360 names.sort_unstable();
361
362 for key in names {
363 let (name, value) = self.infos.get_key_value(&key).unwrap();
364 name_builder.append_value(*name);
365 value_builder.append_value(value)?
366 }
367
368 let batch = RecordBatch::try_from_iter(vec![
369 ("info_name", Arc::new(name_builder.finish()) as _),
370 ("value", Arc::new(value_builder.finish()) as _),
371 ])?;
372
373 Ok(SqlInfoData { batch })
374 }
375
376 pub fn schema() -> &'static Schema {
378 &SQL_INFO_SCHEMA
380 }
381}
382
383pub struct SqlInfoData {
403 batch: RecordBatch,
404}
405
406impl SqlInfoData {
407 pub fn record_batch(&self, info: impl IntoIterator<Item = u32>) -> Result<RecordBatch> {
410 let arr = self.batch.column(0);
411 let type_filter = info
412 .into_iter()
413 .map(|tt| {
414 let s = UInt32Array::from(vec![tt]);
415 eq(arr, &Scalar::new(&s))
416 })
417 .collect::<std::result::Result<Vec<_>, _>>()?
418 .into_iter()
419 .reduce(|filter, arr| or(&filter, &arr).unwrap());
421 if let Some(filter) = type_filter {
422 Ok(filter_record_batch(&self.batch, &filter)?)
423 } else {
424 Ok(self.batch.clone())
425 }
426 }
427
428 pub fn schema(&self) -> SchemaRef {
431 self.batch.schema()
432 }
433}
434
435pub struct GetSqlInfoBuilder<'a> {
437 info: Vec<u32>,
439 infos: &'a SqlInfoData,
440}
441
442impl CommandGetSqlInfo {
443 pub fn into_builder(self, infos: &SqlInfoData) -> GetSqlInfoBuilder<'_> {
445 GetSqlInfoBuilder {
446 info: self.info,
447 infos,
448 }
449 }
450}
451
452impl GetSqlInfoBuilder<'_> {
453 pub fn build(self) -> Result<RecordBatch> {
455 self.infos.record_batch(self.info)
456 }
457
458 pub fn schema(&self) -> SchemaRef {
461 self.infos.schema()
462 }
463}
464
465static SQL_INFO_SCHEMA: Lazy<Schema> = Lazy::new(|| {
467 Schema::new(vec![
468 Field::new("info_name", DataType::UInt32, false),
469 Field::new("value", SqlInfoUnionBuilder::schema().clone(), false),
470 ])
471});
472
#[cfg(test)]
mod tests {
    use std::collections::HashMap;

    use super::SqlInfoDataBuilder;
    use crate::sql::metadata::tests::assert_batches_eq;
    use crate::sql::{SqlInfo, SqlNullOrdering, SqlSupportedTransaction, SqlSupportsConvert};

    /// Appends one value of every supported kind and checks the rendered batch.
    #[test]
    fn test_sql_infos() {
        let mut convert: HashMap<i32, Vec<i32>> = HashMap::new();
        convert.insert(
            SqlSupportsConvert::SqlConvertInteger as i32,
            vec![
                SqlSupportsConvert::SqlConvertFloat as i32,
                SqlSupportsConvert::SqlConvertReal as i32,
            ],
        );

        let mut builder = SqlInfoDataBuilder::new();
        // str
        builder.append(SqlInfo::SqlIdentifierQuoteChar, r#"""#);
        // bool
        builder.append(SqlInfo::SqlDdlCatalog, false);
        // i32
        builder.append(
            SqlInfo::SqlNullOrdering,
            SqlNullOrdering::SqlNullsSortedHigh as i32,
        );
        // i64
        builder.append(SqlInfo::SqlMaxBinaryLiteralLength, i32::MAX as i64);
        // [str]
        builder.append(SqlInfo::SqlKeywords, &["SELECT", "DELETE"] as &[&str]);
        // map
        builder.append(SqlInfo::SqlSupportsConvert, &convert);

        let batch = builder.build().unwrap().record_batch(None).unwrap();

        // Cells are padded to the border widths (11 and 40 characters).
        let expected = vec![
            "+-----------+----------------------------------------+",
            "| info_name | value                                  |",
            "+-----------+----------------------------------------+",
            "| 500       | {bool_value=false}                     |",
            "| 504       | {string_value=\"}                       |",
            "| 507       | {int32_bitmask=0}                      |",
            "| 508       | {string_list=[SELECT, DELETE]}         |",
            "| 517       | {int32_to_int32_list_map={7: [6, 13]}} |",
            "| 541       | {bigint_value=2147483647}              |",
            "+-----------+----------------------------------------+",
        ];

        assert_batches_eq(&[batch], &expected);
    }

    /// Requesting a single name returns only that entry; requesting nothing
    /// returns everything.
    #[test]
    fn test_filter_sql_infos() {
        let mut builder = SqlInfoDataBuilder::new();
        builder.append(SqlInfo::FlightSqlServerName, "server name");
        builder.append(
            SqlInfo::FlightSqlServerTransaction,
            SqlSupportedTransaction::Transaction as i32,
        );
        let data = builder.build().unwrap();

        let batch = data.record_batch(None).unwrap();
        assert_eq!(batch.num_rows(), 2);

        let batch = data
            .record_batch([SqlInfo::FlightSqlServerTransaction as u32])
            .unwrap();
        let mut ref_builder = SqlInfoDataBuilder::new();
        ref_builder.append(
            SqlInfo::FlightSqlServerTransaction,
            SqlSupportedTransaction::Transaction as i32,
        );
        let ref_batch = ref_builder.build().unwrap().record_batch(None).unwrap();

        assert_eq!(batch, ref_batch);
    }
}