1use arrow_buffer::Buffer;
21use arrow_schema::*;
22use core::panic;
23use flatbuffers::{
24 FlatBufferBuilder, ForwardsUOffset, UnionWIPOffset, Vector, Verifiable, Verifier,
25 VerifierOptions, WIPOffset,
26};
27use std::collections::HashMap;
28use std::fmt::{Debug, Formatter};
29use std::sync::Arc;
30
31use crate::writer::DictionaryTracker;
32use crate::{CONTINUATION_MARKER, KeyValue, Message};
33use DataType::*;
34
35#[derive(Debug)]
66pub struct IpcSchemaEncoder<'a> {
67 dictionary_tracker: Option<&'a mut DictionaryTracker>,
68}
69
70impl Default for IpcSchemaEncoder<'_> {
71 fn default() -> Self {
72 Self::new()
73 }
74}
75
76impl<'a> IpcSchemaEncoder<'a> {
77 pub fn new() -> IpcSchemaEncoder<'a> {
79 IpcSchemaEncoder {
80 dictionary_tracker: None,
81 }
82 }
83
84 pub fn with_dictionary_tracker(
86 mut self,
87 dictionary_tracker: &'a mut DictionaryTracker,
88 ) -> Self {
89 self.dictionary_tracker = Some(dictionary_tracker);
90 self
91 }
92
93 pub fn schema_to_fb<'b>(&mut self, schema: &Schema) -> FlatBufferBuilder<'b> {
97 let mut fbb = FlatBufferBuilder::new();
98
99 let root = self.schema_to_fb_offset(&mut fbb, schema);
100
101 fbb.finish(root, None);
102
103 fbb
104 }
105
106 pub fn schema_to_fb_offset<'b>(
108 &mut self,
109 fbb: &mut FlatBufferBuilder<'b>,
110 schema: &Schema,
111 ) -> WIPOffset<crate::Schema<'b>> {
112 let fields = schema
113 .fields()
114 .iter()
115 .map(|field| build_field(fbb, &mut self.dictionary_tracker, field))
116 .collect::<Vec<_>>();
117 let fb_field_list = fbb.create_vector(&fields);
118
119 let fb_metadata_list =
120 (!schema.metadata().is_empty()).then(|| metadata_to_fb(fbb, schema.metadata()));
121
122 let mut builder = crate::SchemaBuilder::new(fbb);
123 builder.add_fields(fb_field_list);
124 if let Some(fb_metadata_list) = fb_metadata_list {
125 builder.add_custom_metadata(fb_metadata_list);
126 }
127 builder.finish()
128 }
129}
130
131pub fn metadata_to_fb<'a>(
133 fbb: &mut FlatBufferBuilder<'a>,
134 metadata: &HashMap<String, String>,
135) -> WIPOffset<Vector<'a, ForwardsUOffset<KeyValue<'a>>>> {
136 let mut ordered_keys = metadata.keys().collect::<Vec<_>>();
137 ordered_keys.sort();
138 let custom_metadata = ordered_keys
139 .into_iter()
140 .map(|k| {
141 let v = metadata.get(k).unwrap();
142 let fb_key_name = fbb.create_string(k);
143 let fb_val_name = fbb.create_string(v);
144
145 let mut kv_builder = crate::KeyValueBuilder::new(fbb);
146 kv_builder.add_key(fb_key_name);
147 kv_builder.add_value(fb_val_name);
148 kv_builder.finish()
149 })
150 .collect::<Vec<_>>();
151 fbb.create_vector(&custom_metadata)
152}
153
154pub fn schema_to_fb_offset<'a>(
156 fbb: &mut FlatBufferBuilder<'a>,
157 schema: &Schema,
158) -> WIPOffset<crate::Schema<'a>> {
159 IpcSchemaEncoder::new().schema_to_fb_offset(fbb, schema)
160}
161
162impl From<crate::Field<'_>> for Field {
164 fn from(field: crate::Field) -> Field {
165 let arrow_field = if let Some(dictionary) = field.dictionary() {
166 #[allow(deprecated)]
167 Field::new_dict(
168 field.name().unwrap_or_default(),
169 get_data_type(field, true),
170 field.nullable(),
171 dictionary.id(),
172 dictionary.isOrdered(),
173 )
174 } else {
175 Field::new(
176 field.name().unwrap_or_default(),
177 get_data_type(field, true),
178 field.nullable(),
179 )
180 };
181
182 let mut metadata_map = HashMap::default();
183 if let Some(list) = field.custom_metadata() {
184 for kv in list {
185 if let (Some(k), Some(v)) = (kv.key(), kv.value()) {
186 metadata_map.insert(k.to_string(), v.to_string());
187 }
188 }
189 }
190
191 arrow_field.with_metadata(metadata_map)
192 }
193}
194
195pub fn fb_to_schema(fb: crate::Schema) -> Schema {
197 let mut fields: Vec<Field> = vec![];
198 let c_fields = fb.fields().unwrap();
199 let len = c_fields.len();
200 for i in 0..len {
201 let c_field: crate::Field = c_fields.get(i);
202 match c_field.type_type() {
203 crate::Type::Decimal if fb.endianness() == crate::Endianness::Big => {
204 unimplemented!("Big Endian is not supported for Decimal!")
205 }
206 _ => (),
207 };
208 fields.push(c_field.into());
209 }
210
211 let mut metadata: HashMap<String, String> = HashMap::default();
212 if let Some(md_fields) = fb.custom_metadata() {
213 let len = md_fields.len();
214 for i in 0..len {
215 let kv = md_fields.get(i);
216 let k_str = kv.key();
217 let v_str = kv.value();
218 if let Some(k) = k_str {
219 if let Some(v) = v_str {
220 metadata.insert(k.to_string(), v.to_string());
221 }
222 }
223 }
224 }
225 Schema::new_with_metadata(fields, metadata)
226}
227
228pub fn try_schema_from_flatbuffer_bytes(bytes: &[u8]) -> Result<Schema, ArrowError> {
230 if let Ok(ipc) = crate::root_as_message(bytes) {
231 if let Some(schema) = ipc.header_as_schema().map(fb_to_schema) {
232 Ok(schema)
233 } else {
234 Err(ArrowError::ParseError(
235 "Unable to get head as schema".to_string(),
236 ))
237 }
238 } else {
239 Err(ArrowError::ParseError(
240 "Unable to get root as message".to_string(),
241 ))
242 }
243}
244
245pub fn try_schema_from_ipc_buffer(buffer: &[u8]) -> Result<Schema, ArrowError> {
247 if buffer.len() < 4 {
257 return Err(ArrowError::ParseError(
258 "The buffer length is less than 4 and missing the continuation marker or length of buffer".to_string()
259 ));
260 }
261
262 let (len, buffer) = if buffer[..4] == CONTINUATION_MARKER {
263 if buffer.len() < 8 {
264 return Err(ArrowError::ParseError(
265 "The buffer length is less than 8 and missing the length of buffer".to_string(),
266 ));
267 }
268 buffer[4..].split_at(4)
269 } else {
270 buffer.split_at(4)
271 };
272
273 let len = <i32>::from_le_bytes(len.try_into().unwrap());
274 if len < 0 {
275 return Err(ArrowError::ParseError(format!(
276 "The encapsulated message's reported length is negative ({len})"
277 )));
278 }
279
280 if buffer.len() < len as usize {
281 let actual_len = buffer.len();
282 return Err(ArrowError::ParseError(format!(
283 "The buffer length ({actual_len}) is less than the encapsulated message's reported length ({len})"
284 )));
285 }
286
287 let msg = crate::root_as_message(buffer)
288 .map_err(|err| ArrowError::ParseError(format!("Unable to get root as message: {err:?}")))?;
289 let ipc_schema = msg.header_as_schema().ok_or_else(|| {
290 ArrowError::ParseError("Unable to convert flight info to a schema".to_string())
291 })?;
292 Ok(fb_to_schema(ipc_schema))
293}
294
295pub(crate) fn get_data_type(field: crate::Field, may_be_dictionary: bool) -> DataType {
297 if let Some(dictionary) = field.dictionary() {
298 if may_be_dictionary {
299 let int = dictionary.indexType().unwrap();
300 let index_type = match (int.bitWidth(), int.is_signed()) {
301 (8, true) => DataType::Int8,
302 (8, false) => DataType::UInt8,
303 (16, true) => DataType::Int16,
304 (16, false) => DataType::UInt16,
305 (32, true) => DataType::Int32,
306 (32, false) => DataType::UInt32,
307 (64, true) => DataType::Int64,
308 (64, false) => DataType::UInt64,
309 _ => panic!("Unexpected bitwidth and signed"),
310 };
311 return DataType::Dictionary(
312 Box::new(index_type),
313 Box::new(get_data_type(field, false)),
314 );
315 }
316 }
317
318 match field.type_type() {
319 crate::Type::Null => DataType::Null,
320 crate::Type::Bool => DataType::Boolean,
321 crate::Type::Int => {
322 let int = field.type_as_int().unwrap();
323 match (int.bitWidth(), int.is_signed()) {
324 (8, true) => DataType::Int8,
325 (8, false) => DataType::UInt8,
326 (16, true) => DataType::Int16,
327 (16, false) => DataType::UInt16,
328 (32, true) => DataType::Int32,
329 (32, false) => DataType::UInt32,
330 (64, true) => DataType::Int64,
331 (64, false) => DataType::UInt64,
332 z => panic!(
333 "Int type with bit width of {} and signed of {} not supported",
334 z.0, z.1
335 ),
336 }
337 }
338 crate::Type::Binary => DataType::Binary,
339 crate::Type::BinaryView => DataType::BinaryView,
340 crate::Type::LargeBinary => DataType::LargeBinary,
341 crate::Type::Utf8 => DataType::Utf8,
342 crate::Type::Utf8View => DataType::Utf8View,
343 crate::Type::LargeUtf8 => DataType::LargeUtf8,
344 crate::Type::FixedSizeBinary => {
345 let fsb = field.type_as_fixed_size_binary().unwrap();
346 DataType::FixedSizeBinary(fsb.byteWidth())
347 }
348 crate::Type::FloatingPoint => {
349 let float = field.type_as_floating_point().unwrap();
350 match float.precision() {
351 crate::Precision::HALF => DataType::Float16,
352 crate::Precision::SINGLE => DataType::Float32,
353 crate::Precision::DOUBLE => DataType::Float64,
354 z => panic!("FloatingPoint type with precision of {z:?} not supported"),
355 }
356 }
357 crate::Type::Date => {
358 let date = field.type_as_date().unwrap();
359 match date.unit() {
360 crate::DateUnit::DAY => DataType::Date32,
361 crate::DateUnit::MILLISECOND => DataType::Date64,
362 z => panic!("Date type with unit of {z:?} not supported"),
363 }
364 }
365 crate::Type::Time => {
366 let time = field.type_as_time().unwrap();
367 match (time.bitWidth(), time.unit()) {
368 (32, crate::TimeUnit::SECOND) => DataType::Time32(TimeUnit::Second),
369 (32, crate::TimeUnit::MILLISECOND) => DataType::Time32(TimeUnit::Millisecond),
370 (64, crate::TimeUnit::MICROSECOND) => DataType::Time64(TimeUnit::Microsecond),
371 (64, crate::TimeUnit::NANOSECOND) => DataType::Time64(TimeUnit::Nanosecond),
372 z => panic!(
373 "Time type with bit width of {} and unit of {:?} not supported",
374 z.0, z.1
375 ),
376 }
377 }
378 crate::Type::Timestamp => {
379 let timestamp = field.type_as_timestamp().unwrap();
380 let timezone: Option<_> = timestamp.timezone().map(|tz| tz.into());
381 match timestamp.unit() {
382 crate::TimeUnit::SECOND => DataType::Timestamp(TimeUnit::Second, timezone),
383 crate::TimeUnit::MILLISECOND => {
384 DataType::Timestamp(TimeUnit::Millisecond, timezone)
385 }
386 crate::TimeUnit::MICROSECOND => {
387 DataType::Timestamp(TimeUnit::Microsecond, timezone)
388 }
389 crate::TimeUnit::NANOSECOND => DataType::Timestamp(TimeUnit::Nanosecond, timezone),
390 z => panic!("Timestamp type with unit of {z:?} not supported"),
391 }
392 }
393 crate::Type::Interval => {
394 let interval = field.type_as_interval().unwrap();
395 match interval.unit() {
396 crate::IntervalUnit::YEAR_MONTH => DataType::Interval(IntervalUnit::YearMonth),
397 crate::IntervalUnit::DAY_TIME => DataType::Interval(IntervalUnit::DayTime),
398 crate::IntervalUnit::MONTH_DAY_NANO => {
399 DataType::Interval(IntervalUnit::MonthDayNano)
400 }
401 z => panic!("Interval type with unit of {z:?} unsupported"),
402 }
403 }
404 crate::Type::Duration => {
405 let duration = field.type_as_duration().unwrap();
406 match duration.unit() {
407 crate::TimeUnit::SECOND => DataType::Duration(TimeUnit::Second),
408 crate::TimeUnit::MILLISECOND => DataType::Duration(TimeUnit::Millisecond),
409 crate::TimeUnit::MICROSECOND => DataType::Duration(TimeUnit::Microsecond),
410 crate::TimeUnit::NANOSECOND => DataType::Duration(TimeUnit::Nanosecond),
411 z => panic!("Duration type with unit of {z:?} unsupported"),
412 }
413 }
414 crate::Type::List => {
415 let children = field.children().unwrap();
416 if children.len() != 1 {
417 panic!("expect a list to have one child")
418 }
419 DataType::List(Arc::new(children.get(0).into()))
420 }
421 crate::Type::LargeList => {
422 let children = field.children().unwrap();
423 if children.len() != 1 {
424 panic!("expect a large list to have one child")
425 }
426 DataType::LargeList(Arc::new(children.get(0).into()))
427 }
428 crate::Type::ListView => {
429 let children = field.children().unwrap();
430 if children.len() != 1 {
431 panic!("expect a listview to have one child")
432 }
433 DataType::ListView(Arc::new(children.get(0).into()))
434 }
435 crate::Type::LargeListView => {
436 let children = field.children().unwrap();
437 if children.len() != 1 {
438 panic!("expect a large listview to have one child")
439 }
440 DataType::LargeListView(Arc::new(children.get(0).into()))
441 }
442 crate::Type::FixedSizeList => {
443 let children = field.children().unwrap();
444 if children.len() != 1 {
445 panic!("expect a list to have one child")
446 }
447 let fsl = field.type_as_fixed_size_list().unwrap();
448 DataType::FixedSizeList(Arc::new(children.get(0).into()), fsl.listSize())
449 }
450 crate::Type::Struct_ => {
451 let fields = match field.children() {
452 Some(children) => children.iter().map(Field::from).collect(),
453 None => Fields::empty(),
454 };
455 DataType::Struct(fields)
456 }
457 crate::Type::RunEndEncoded => {
458 let children = field.children().unwrap();
459 if children.len() != 2 {
460 panic!(
461 "RunEndEncoded type should have exactly two children. Found {}",
462 children.len()
463 )
464 }
465 let run_ends_field = children.get(0).into();
466 let values_field = children.get(1).into();
467 DataType::RunEndEncoded(Arc::new(run_ends_field), Arc::new(values_field))
468 }
469 crate::Type::Map => {
470 let map = field.type_as_map().unwrap();
471 let children = field.children().unwrap();
472 if children.len() != 1 {
473 panic!("expect a map to have one child")
474 }
475 DataType::Map(Arc::new(children.get(0).into()), map.keysSorted())
476 }
477 crate::Type::Decimal => {
478 let fsb = field.type_as_decimal().unwrap();
479 let bit_width = fsb.bitWidth();
480 let precision: u8 = fsb.precision().try_into().unwrap();
481 let scale: i8 = fsb.scale().try_into().unwrap();
482 match bit_width {
483 32 => DataType::Decimal32(precision, scale),
484 64 => DataType::Decimal64(precision, scale),
485 128 => DataType::Decimal128(precision, scale),
486 256 => DataType::Decimal256(precision, scale),
487 _ => panic!("Unexpected decimal bit width {bit_width}"),
488 }
489 }
490 crate::Type::Union => {
491 let union = field.type_as_union().unwrap();
492
493 let union_mode = match union.mode() {
494 crate::UnionMode::Dense => UnionMode::Dense,
495 crate::UnionMode::Sparse => UnionMode::Sparse,
496 mode => panic!("Unexpected union mode: {mode:?}"),
497 };
498
499 let mut fields = vec![];
500 if let Some(children) = field.children() {
501 for i in 0..children.len() {
502 fields.push(Field::from(children.get(i)));
503 }
504 };
505
506 let fields = match union.typeIds() {
507 None => UnionFields::from_fields(fields),
508 Some(ids) => UnionFields::try_new(ids.iter().map(|i| i as i8), fields)
509 .expect("invalid union field"),
510 };
511
512 DataType::Union(fields, union_mode)
513 }
514 t => unimplemented!("Type {:?} not supported", t),
515 }
516}
517
518pub(crate) struct FBFieldType<'b> {
519 pub(crate) type_type: crate::Type,
520 pub(crate) type_: WIPOffset<UnionWIPOffset>,
521 pub(crate) children: Option<WIPOffset<Vector<'b, ForwardsUOffset<crate::Field<'b>>>>>,
522}
523
524pub(crate) fn build_field<'a>(
526 fbb: &mut FlatBufferBuilder<'a>,
527 dictionary_tracker: &mut Option<&mut DictionaryTracker>,
528 field: &Field,
529) -> WIPOffset<crate::Field<'a>> {
530 let mut fb_metadata = None;
532 if !field.metadata().is_empty() {
533 fb_metadata = Some(metadata_to_fb(fbb, field.metadata()));
534 };
535
536 let fb_field_name = fbb.create_string(field.name().as_str());
537 let field_type = get_fb_field_type(field.data_type(), dictionary_tracker, fbb);
538
539 let fb_dictionary = if let Dictionary(index_type, _) = field.data_type() {
540 match dictionary_tracker {
541 Some(tracker) => Some(get_fb_dictionary(
542 index_type,
543 tracker.next_dict_id(),
544 field
545 .dict_is_ordered()
546 .expect("All Dictionary types have `dict_is_ordered`"),
547 fbb,
548 )),
549 None => panic!("IPC must no longer be used without dictionary tracker"),
550 }
551 } else {
552 None
553 };
554
555 let mut field_builder = crate::FieldBuilder::new(fbb);
556 field_builder.add_name(fb_field_name);
557 if let Some(dictionary) = fb_dictionary {
558 field_builder.add_dictionary(dictionary)
559 }
560 field_builder.add_type_type(field_type.type_type);
561 field_builder.add_nullable(field.is_nullable());
562 match field_type.children {
563 None => {}
564 Some(children) => field_builder.add_children(children),
565 };
566 field_builder.add_type_(field_type.type_);
567
568 if let Some(fb_metadata) = fb_metadata {
569 field_builder.add_custom_metadata(fb_metadata);
570 }
571
572 field_builder.finish()
573}
574
575pub(crate) fn get_fb_field_type<'a>(
577 data_type: &DataType,
578 dictionary_tracker: &mut Option<&mut DictionaryTracker>,
579 fbb: &mut FlatBufferBuilder<'a>,
580) -> FBFieldType<'a> {
581 let empty_fields: Vec<WIPOffset<crate::Field>> = vec![];
584 match data_type {
585 Null => FBFieldType {
586 type_type: crate::Type::Null,
587 type_: crate::NullBuilder::new(fbb).finish().as_union_value(),
588 children: Some(fbb.create_vector(&empty_fields[..])),
589 },
590 Boolean => FBFieldType {
591 type_type: crate::Type::Bool,
592 type_: crate::BoolBuilder::new(fbb).finish().as_union_value(),
593 children: Some(fbb.create_vector(&empty_fields[..])),
594 },
595 UInt8 | UInt16 | UInt32 | UInt64 => {
596 let children = fbb.create_vector(&empty_fields[..]);
597 let mut builder = crate::IntBuilder::new(fbb);
598 builder.add_is_signed(false);
599 match data_type {
600 UInt8 => builder.add_bitWidth(8),
601 UInt16 => builder.add_bitWidth(16),
602 UInt32 => builder.add_bitWidth(32),
603 UInt64 => builder.add_bitWidth(64),
604 _ => {}
605 };
606 FBFieldType {
607 type_type: crate::Type::Int,
608 type_: builder.finish().as_union_value(),
609 children: Some(children),
610 }
611 }
612 Int8 | Int16 | Int32 | Int64 => {
613 let children = fbb.create_vector(&empty_fields[..]);
614 let mut builder = crate::IntBuilder::new(fbb);
615 builder.add_is_signed(true);
616 match data_type {
617 Int8 => builder.add_bitWidth(8),
618 Int16 => builder.add_bitWidth(16),
619 Int32 => builder.add_bitWidth(32),
620 Int64 => builder.add_bitWidth(64),
621 _ => {}
622 };
623 FBFieldType {
624 type_type: crate::Type::Int,
625 type_: builder.finish().as_union_value(),
626 children: Some(children),
627 }
628 }
629 Float16 | Float32 | Float64 => {
630 let children = fbb.create_vector(&empty_fields[..]);
631 let mut builder = crate::FloatingPointBuilder::new(fbb);
632 match data_type {
633 Float16 => builder.add_precision(crate::Precision::HALF),
634 Float32 => builder.add_precision(crate::Precision::SINGLE),
635 Float64 => builder.add_precision(crate::Precision::DOUBLE),
636 _ => {}
637 };
638 FBFieldType {
639 type_type: crate::Type::FloatingPoint,
640 type_: builder.finish().as_union_value(),
641 children: Some(children),
642 }
643 }
644 Binary => FBFieldType {
645 type_type: crate::Type::Binary,
646 type_: crate::BinaryBuilder::new(fbb).finish().as_union_value(),
647 children: Some(fbb.create_vector(&empty_fields[..])),
648 },
649 LargeBinary => FBFieldType {
650 type_type: crate::Type::LargeBinary,
651 type_: crate::LargeBinaryBuilder::new(fbb)
652 .finish()
653 .as_union_value(),
654 children: Some(fbb.create_vector(&empty_fields[..])),
655 },
656 BinaryView => FBFieldType {
657 type_type: crate::Type::BinaryView,
658 type_: crate::BinaryViewBuilder::new(fbb).finish().as_union_value(),
659 children: Some(fbb.create_vector(&empty_fields[..])),
660 },
661 Utf8View => FBFieldType {
662 type_type: crate::Type::Utf8View,
663 type_: crate::Utf8ViewBuilder::new(fbb).finish().as_union_value(),
664 children: Some(fbb.create_vector(&empty_fields[..])),
665 },
666 Utf8 => FBFieldType {
667 type_type: crate::Type::Utf8,
668 type_: crate::Utf8Builder::new(fbb).finish().as_union_value(),
669 children: Some(fbb.create_vector(&empty_fields[..])),
670 },
671 LargeUtf8 => FBFieldType {
672 type_type: crate::Type::LargeUtf8,
673 type_: crate::LargeUtf8Builder::new(fbb).finish().as_union_value(),
674 children: Some(fbb.create_vector(&empty_fields[..])),
675 },
676 FixedSizeBinary(len) => {
677 let mut builder = crate::FixedSizeBinaryBuilder::new(fbb);
678 builder.add_byteWidth(*len);
679 FBFieldType {
680 type_type: crate::Type::FixedSizeBinary,
681 type_: builder.finish().as_union_value(),
682 children: Some(fbb.create_vector(&empty_fields[..])),
683 }
684 }
685 Date32 => {
686 let mut builder = crate::DateBuilder::new(fbb);
687 builder.add_unit(crate::DateUnit::DAY);
688 FBFieldType {
689 type_type: crate::Type::Date,
690 type_: builder.finish().as_union_value(),
691 children: Some(fbb.create_vector(&empty_fields[..])),
692 }
693 }
694 Date64 => {
695 let mut builder = crate::DateBuilder::new(fbb);
696 builder.add_unit(crate::DateUnit::MILLISECOND);
697 FBFieldType {
698 type_type: crate::Type::Date,
699 type_: builder.finish().as_union_value(),
700 children: Some(fbb.create_vector(&empty_fields[..])),
701 }
702 }
703 Time32(unit) | Time64(unit) => {
704 let mut builder = crate::TimeBuilder::new(fbb);
705 match unit {
706 TimeUnit::Second => {
707 builder.add_bitWidth(32);
708 builder.add_unit(crate::TimeUnit::SECOND);
709 }
710 TimeUnit::Millisecond => {
711 builder.add_bitWidth(32);
712 builder.add_unit(crate::TimeUnit::MILLISECOND);
713 }
714 TimeUnit::Microsecond => {
715 builder.add_bitWidth(64);
716 builder.add_unit(crate::TimeUnit::MICROSECOND);
717 }
718 TimeUnit::Nanosecond => {
719 builder.add_bitWidth(64);
720 builder.add_unit(crate::TimeUnit::NANOSECOND);
721 }
722 }
723 FBFieldType {
724 type_type: crate::Type::Time,
725 type_: builder.finish().as_union_value(),
726 children: Some(fbb.create_vector(&empty_fields[..])),
727 }
728 }
729 Timestamp(unit, tz) => {
730 let tz = tz.as_deref().unwrap_or_default();
731 let tz_str = fbb.create_string(tz);
732 let mut builder = crate::TimestampBuilder::new(fbb);
733 let time_unit = match unit {
734 TimeUnit::Second => crate::TimeUnit::SECOND,
735 TimeUnit::Millisecond => crate::TimeUnit::MILLISECOND,
736 TimeUnit::Microsecond => crate::TimeUnit::MICROSECOND,
737 TimeUnit::Nanosecond => crate::TimeUnit::NANOSECOND,
738 };
739 builder.add_unit(time_unit);
740 if !tz.is_empty() {
741 builder.add_timezone(tz_str);
742 }
743 FBFieldType {
744 type_type: crate::Type::Timestamp,
745 type_: builder.finish().as_union_value(),
746 children: Some(fbb.create_vector(&empty_fields[..])),
747 }
748 }
749 Interval(unit) => {
750 let mut builder = crate::IntervalBuilder::new(fbb);
751 let interval_unit = match unit {
752 IntervalUnit::YearMonth => crate::IntervalUnit::YEAR_MONTH,
753 IntervalUnit::DayTime => crate::IntervalUnit::DAY_TIME,
754 IntervalUnit::MonthDayNano => crate::IntervalUnit::MONTH_DAY_NANO,
755 };
756 builder.add_unit(interval_unit);
757 FBFieldType {
758 type_type: crate::Type::Interval,
759 type_: builder.finish().as_union_value(),
760 children: Some(fbb.create_vector(&empty_fields[..])),
761 }
762 }
763 Duration(unit) => {
764 let mut builder = crate::DurationBuilder::new(fbb);
765 let time_unit = match unit {
766 TimeUnit::Second => crate::TimeUnit::SECOND,
767 TimeUnit::Millisecond => crate::TimeUnit::MILLISECOND,
768 TimeUnit::Microsecond => crate::TimeUnit::MICROSECOND,
769 TimeUnit::Nanosecond => crate::TimeUnit::NANOSECOND,
770 };
771 builder.add_unit(time_unit);
772 FBFieldType {
773 type_type: crate::Type::Duration,
774 type_: builder.finish().as_union_value(),
775 children: Some(fbb.create_vector(&empty_fields[..])),
776 }
777 }
778 List(list_type) => {
779 let child = build_field(fbb, dictionary_tracker, list_type);
780 FBFieldType {
781 type_type: crate::Type::List,
782 type_: crate::ListBuilder::new(fbb).finish().as_union_value(),
783 children: Some(fbb.create_vector(&[child])),
784 }
785 }
786 ListView(list_type) => {
787 let child = build_field(fbb, dictionary_tracker, list_type);
788 FBFieldType {
789 type_type: crate::Type::ListView,
790 type_: crate::ListViewBuilder::new(fbb).finish().as_union_value(),
791 children: Some(fbb.create_vector(&[child])),
792 }
793 }
794 LargeListView(list_type) => {
795 let child = build_field(fbb, dictionary_tracker, list_type);
796 FBFieldType {
797 type_type: crate::Type::LargeListView,
798 type_: crate::LargeListViewBuilder::new(fbb)
799 .finish()
800 .as_union_value(),
801 children: Some(fbb.create_vector(&[child])),
802 }
803 }
804 LargeList(list_type) => {
805 let child = build_field(fbb, dictionary_tracker, list_type);
806 FBFieldType {
807 type_type: crate::Type::LargeList,
808 type_: crate::LargeListBuilder::new(fbb).finish().as_union_value(),
809 children: Some(fbb.create_vector(&[child])),
810 }
811 }
812 FixedSizeList(list_type, len) => {
813 let child = build_field(fbb, dictionary_tracker, list_type);
814 let mut builder = crate::FixedSizeListBuilder::new(fbb);
815 builder.add_listSize(*len);
816 FBFieldType {
817 type_type: crate::Type::FixedSizeList,
818 type_: builder.finish().as_union_value(),
819 children: Some(fbb.create_vector(&[child])),
820 }
821 }
822 Struct(fields) => {
823 let mut children = vec![];
825 for field in fields {
826 children.push(build_field(fbb, dictionary_tracker, field));
827 }
828 FBFieldType {
829 type_type: crate::Type::Struct_,
830 type_: crate::Struct_Builder::new(fbb).finish().as_union_value(),
831 children: Some(fbb.create_vector(&children[..])),
832 }
833 }
834 RunEndEncoded(run_ends, values) => {
835 let run_ends_field = build_field(fbb, dictionary_tracker, run_ends);
836 let values_field = build_field(fbb, dictionary_tracker, values);
837 let children = [run_ends_field, values_field];
838 FBFieldType {
839 type_type: crate::Type::RunEndEncoded,
840 type_: crate::RunEndEncodedBuilder::new(fbb)
841 .finish()
842 .as_union_value(),
843 children: Some(fbb.create_vector(&children[..])),
844 }
845 }
846 Map(map_field, keys_sorted) => {
847 let child = build_field(fbb, dictionary_tracker, map_field);
848 let mut field_type = crate::MapBuilder::new(fbb);
849 field_type.add_keysSorted(*keys_sorted);
850 FBFieldType {
851 type_type: crate::Type::Map,
852 type_: field_type.finish().as_union_value(),
853 children: Some(fbb.create_vector(&[child])),
854 }
855 }
856 Dictionary(_, value_type) => {
857 get_fb_field_type(value_type, dictionary_tracker, fbb)
861 }
862 Decimal32(precision, scale) => {
863 let mut builder = crate::DecimalBuilder::new(fbb);
864 builder.add_precision(*precision as i32);
865 builder.add_scale(*scale as i32);
866 builder.add_bitWidth(32);
867 FBFieldType {
868 type_type: crate::Type::Decimal,
869 type_: builder.finish().as_union_value(),
870 children: Some(fbb.create_vector(&empty_fields[..])),
871 }
872 }
873 Decimal64(precision, scale) => {
874 let mut builder = crate::DecimalBuilder::new(fbb);
875 builder.add_precision(*precision as i32);
876 builder.add_scale(*scale as i32);
877 builder.add_bitWidth(64);
878 FBFieldType {
879 type_type: crate::Type::Decimal,
880 type_: builder.finish().as_union_value(),
881 children: Some(fbb.create_vector(&empty_fields[..])),
882 }
883 }
884 Decimal128(precision, scale) => {
885 let mut builder = crate::DecimalBuilder::new(fbb);
886 builder.add_precision(*precision as i32);
887 builder.add_scale(*scale as i32);
888 builder.add_bitWidth(128);
889 FBFieldType {
890 type_type: crate::Type::Decimal,
891 type_: builder.finish().as_union_value(),
892 children: Some(fbb.create_vector(&empty_fields[..])),
893 }
894 }
895 Decimal256(precision, scale) => {
896 let mut builder = crate::DecimalBuilder::new(fbb);
897 builder.add_precision(*precision as i32);
898 builder.add_scale(*scale as i32);
899 builder.add_bitWidth(256);
900 FBFieldType {
901 type_type: crate::Type::Decimal,
902 type_: builder.finish().as_union_value(),
903 children: Some(fbb.create_vector(&empty_fields[..])),
904 }
905 }
906 Union(fields, mode) => {
907 let mut children = vec![];
908 for (_, field) in fields.iter() {
909 children.push(build_field(fbb, dictionary_tracker, field));
910 }
911
912 let union_mode = match mode {
913 UnionMode::Sparse => crate::UnionMode::Sparse,
914 UnionMode::Dense => crate::UnionMode::Dense,
915 };
916
917 let fbb_type_ids =
918 fbb.create_vector(&fields.iter().map(|(t, _)| t as i32).collect::<Vec<_>>());
919 let mut builder = crate::UnionBuilder::new(fbb);
920 builder.add_mode(union_mode);
921 builder.add_typeIds(fbb_type_ids);
922
923 FBFieldType {
924 type_type: crate::Type::Union,
925 type_: builder.finish().as_union_value(),
926 children: Some(fbb.create_vector(&children[..])),
927 }
928 }
929 }
930}
931
932pub(crate) fn get_fb_dictionary<'a>(
934 index_type: &DataType,
935 dict_id: i64,
936 dict_is_ordered: bool,
937 fbb: &mut FlatBufferBuilder<'a>,
938) -> WIPOffset<crate::DictionaryEncoding<'a>> {
939 let mut index_builder = crate::IntBuilder::new(fbb);
942
943 match *index_type {
944 Int8 | Int16 | Int32 | Int64 => index_builder.add_is_signed(true),
945 UInt8 | UInt16 | UInt32 | UInt64 => index_builder.add_is_signed(false),
946 _ => {}
947 }
948
949 match *index_type {
950 Int8 | UInt8 => index_builder.add_bitWidth(8),
951 Int16 | UInt16 => index_builder.add_bitWidth(16),
952 Int32 | UInt32 => index_builder.add_bitWidth(32),
953 Int64 | UInt64 => index_builder.add_bitWidth(64),
954 _ => {}
955 }
956
957 let index_builder = index_builder.finish();
958
959 let mut builder = crate::DictionaryEncodingBuilder::new(fbb);
960 builder.add_id(dict_id);
961 builder.add_indexType(index_builder);
962 builder.add_isOrdered(dict_is_ordered);
963
964 builder.finish()
965}
966
967#[derive(Clone)]
979pub struct MessageBuffer(Buffer);
980
981impl Debug for MessageBuffer {
982 fn fmt(&self, f: &mut Formatter<'_>) -> std::fmt::Result {
983 self.as_ref().fmt(f)
984 }
985}
986
987impl MessageBuffer {
988 pub fn try_new(buf: Buffer) -> Result<Self, ArrowError> {
990 let opts = VerifierOptions::default();
991 let mut v = Verifier::new(&opts, &buf);
992 <ForwardsUOffset<Message>>::run_verifier(&mut v, 0).map_err(|err| {
993 ArrowError::ParseError(format!("Unable to get root as message: {err:?}"))
994 })?;
995 Ok(Self(buf))
996 }
997
998 #[inline]
1000 pub fn as_ref(&self) -> Message<'_> {
1001 unsafe { crate::root_as_message_unchecked(&self.0) }
1003 }
1004}
1005
1006#[cfg(test)]
1007mod tests {
1008 use super::*;
1009
1010 #[test]
1011 fn convert_schema_round_trip() {
1012 let md: HashMap<String, String> = [("Key".to_string(), "value".to_string())]
1013 .iter()
1014 .cloned()
1015 .collect();
1016 let field_md: HashMap<String, String> = [("k".to_string(), "v".to_string())]
1017 .iter()
1018 .cloned()
1019 .collect();
1020 let schema = Schema::new_with_metadata(
1021 vec![
1022 Field::new("uint8", DataType::UInt8, false).with_metadata(field_md),
1023 Field::new("uint16", DataType::UInt16, true),
1024 Field::new("uint32", DataType::UInt32, false),
1025 Field::new("uint64", DataType::UInt64, true),
1026 Field::new("int8", DataType::Int8, true),
1027 Field::new("int16", DataType::Int16, false),
1028 Field::new("int32", DataType::Int32, true),
1029 Field::new("int64", DataType::Int64, false),
1030 Field::new("float16", DataType::Float16, true),
1031 Field::new("float32", DataType::Float32, false),
1032 Field::new("float64", DataType::Float64, true),
1033 Field::new("null", DataType::Null, false),
1034 Field::new("bool", DataType::Boolean, false),
1035 Field::new("date32", DataType::Date32, false),
1036 Field::new("date64", DataType::Date64, true),
1037 Field::new("time32[s]", DataType::Time32(TimeUnit::Second), true),
1038 Field::new("time32[ms]", DataType::Time32(TimeUnit::Millisecond), false),
1039 Field::new("time64[us]", DataType::Time64(TimeUnit::Microsecond), false),
1040 Field::new("time64[ns]", DataType::Time64(TimeUnit::Nanosecond), true),
1041 Field::new(
1042 "timestamp[s]",
1043 DataType::Timestamp(TimeUnit::Second, None),
1044 false,
1045 ),
1046 Field::new(
1047 "timestamp[ms]",
1048 DataType::Timestamp(TimeUnit::Millisecond, None),
1049 true,
1050 ),
1051 Field::new(
1052 "timestamp[us]",
1053 DataType::Timestamp(TimeUnit::Microsecond, Some("Africa/Johannesburg".into())),
1054 false,
1055 ),
1056 Field::new(
1057 "timestamp[ns]",
1058 DataType::Timestamp(TimeUnit::Nanosecond, None),
1059 true,
1060 ),
1061 Field::new(
1062 "interval[ym]",
1063 DataType::Interval(IntervalUnit::YearMonth),
1064 true,
1065 ),
1066 Field::new(
1067 "interval[dt]",
1068 DataType::Interval(IntervalUnit::DayTime),
1069 true,
1070 ),
1071 Field::new(
1072 "interval[mdn]",
1073 DataType::Interval(IntervalUnit::MonthDayNano),
1074 true,
1075 ),
1076 Field::new("utf8", DataType::Utf8, false),
1077 Field::new("utf8_view", DataType::Utf8View, false),
1078 Field::new("binary", DataType::Binary, false),
1079 Field::new("binary_view", DataType::BinaryView, false),
1080 Field::new_list(
1081 "list[u8]",
1082 Field::new_list_field(DataType::UInt8, false),
1083 true,
1084 ),
1085 Field::new_fixed_size_list(
1086 "fixed_size_list[u8]",
1087 Field::new_list_field(DataType::UInt8, false),
1088 2,
1089 true,
1090 ),
1091 Field::new_list(
1092 "list[struct<float32, int32, bool>]",
1093 Field::new_struct(
1094 "struct",
1095 vec![
1096 Field::new("float32", UInt8, false),
1097 Field::new("int32", Int32, true),
1098 Field::new("bool", Boolean, true),
1099 ],
1100 true,
1101 ),
1102 false,
1103 ),
1104 Field::new_struct(
1105 "struct<dictionary<int32, utf8>>",
1106 vec![Field::new(
1107 "dictionary<int32, utf8>",
1108 Dictionary(Box::new(DataType::Int32), Box::new(DataType::Utf8)),
1109 false,
1110 )],
1111 false,
1112 ),
1113 Field::new_struct(
1114 "struct<int64, list[struct<date32, list[struct<>]>]>",
1115 vec![
1116 Field::new("int64", DataType::Int64, true),
1117 Field::new_list(
1118 "list[struct<date32, list[struct<>]>]",
1119 Field::new_struct(
1120 "struct",
1121 vec![
1122 Field::new("date32", DataType::Date32, true),
1123 Field::new_list(
1124 "list[struct<>]",
1125 Field::new(
1126 "struct",
1127 DataType::Struct(Fields::empty()),
1128 false,
1129 ),
1130 false,
1131 ),
1132 ],
1133 false,
1134 ),
1135 false,
1136 ),
1137 ],
1138 false,
1139 ),
1140 Field::new_union(
1141 "union<int64, list[union<date32, list[union<>]>]>",
1142 vec![0, 1],
1143 vec![
1144 Field::new("int64", DataType::Int64, true),
1145 Field::new_list(
1146 "list[union<date32, list[union<>]>]",
1147 Field::new_union(
1148 "union<date32, list[union<>]>",
1149 vec![0, 1],
1150 vec![
1151 Field::new("date32", DataType::Date32, true),
1152 Field::new_list(
1153 "list[union<>]",
1154 Field::new(
1155 "union",
1156 DataType::Union(
1157 UnionFields::empty(),
1158 UnionMode::Sparse,
1159 ),
1160 false,
1161 ),
1162 false,
1163 ),
1164 ],
1165 UnionMode::Dense,
1166 ),
1167 false,
1168 ),
1169 ],
1170 UnionMode::Sparse,
1171 ),
1172 Field::new("struct<>", DataType::Struct(Fields::empty()), true),
1173 Field::new(
1174 "union<>",
1175 DataType::Union(UnionFields::empty(), UnionMode::Dense),
1176 true,
1177 ),
1178 Field::new(
1179 "union<>",
1180 DataType::Union(UnionFields::empty(), UnionMode::Sparse),
1181 true,
1182 ),
1183 Field::new(
1184 "union<int32, utf8>",
1185 DataType::Union(
1186 UnionFields::try_new(
1187 vec![2, 3], vec![
1189 Field::new("int32", DataType::Int32, true),
1190 Field::new("utf8", DataType::Utf8, true),
1191 ],
1192 )
1193 .unwrap(),
1194 UnionMode::Dense,
1195 ),
1196 true,
1197 ),
1198 #[allow(deprecated)]
1199 Field::new_dict(
1200 "dictionary<int32, utf8>",
1201 DataType::Dictionary(Box::new(DataType::Int32), Box::new(DataType::Utf8)),
1202 true,
1203 123,
1204 true,
1205 ),
1206 #[allow(deprecated)]
1207 Field::new_dict(
1208 "dictionary<uint8, uint32>",
1209 DataType::Dictionary(Box::new(DataType::UInt8), Box::new(DataType::UInt32)),
1210 true,
1211 123,
1212 true,
1213 ),
1214 Field::new("decimal<usize, usize>", DataType::Decimal128(10, 6), false),
1215 ],
1216 md,
1217 );
1218
1219 let mut dictionary_tracker = DictionaryTracker::new(true);
1220 let fb = IpcSchemaEncoder::new()
1221 .with_dictionary_tracker(&mut dictionary_tracker)
1222 .schema_to_fb(&schema);
1223
1224 let ipc = crate::root_as_schema(fb.finished_data()).unwrap();
1226 let schema2 = fb_to_schema(ipc);
1227 assert_eq!(schema, schema2);
1228 }
1229
1230 #[test]
1231 fn schema_from_bytes() {
1232 let bytes: Vec<u8> = vec![
1242 16, 0, 0, 0, 0, 0, 10, 0, 12, 0, 6, 0, 5, 0, 8, 0, 10, 0, 0, 0, 0, 1, 4, 0, 12, 0, 0,
1243 0, 8, 0, 8, 0, 0, 0, 4, 0, 8, 0, 0, 0, 4, 0, 0, 0, 1, 0, 0, 0, 20, 0, 0, 0, 16, 0, 20,
1244 0, 8, 0, 0, 0, 7, 0, 12, 0, 0, 0, 16, 0, 16, 0, 0, 0, 0, 0, 0, 2, 16, 0, 0, 0, 32, 0,
1245 0, 0, 4, 0, 0, 0, 0, 0, 0, 0, 6, 0, 0, 0, 102, 105, 101, 108, 100, 49, 0, 0, 0, 0, 6,
1246 0, 8, 0, 4, 0, 6, 0, 0, 0, 32, 0, 0, 0,
1247 ];
1248 let ipc = crate::root_as_message(&bytes).unwrap();
1249 let schema = ipc.header_as_schema().unwrap();
1250
1251 let data_gen = crate::writer::IpcDataGenerator::default();
1253 let mut dictionary_tracker = DictionaryTracker::new(true);
1254 let arrow_schema = Schema::new(vec![Field::new("field1", DataType::UInt32, false)]);
1255 let bytes = data_gen
1256 .schema_to_bytes_with_dictionary_tracker(
1257 &arrow_schema,
1258 &mut dictionary_tracker,
1259 &crate::writer::IpcWriteOptions::default(),
1260 )
1261 .ipc_message;
1262
1263 let ipc2 = crate::root_as_message(&bytes).unwrap();
1264 let schema2 = ipc2.header_as_schema().unwrap();
1265
1266 assert!(schema.custom_metadata().is_none());
1268 assert!(schema2.custom_metadata().is_none());
1269 assert_eq!(schema.endianness(), schema2.endianness());
1270 assert!(schema.features().is_none());
1271 assert!(schema2.features().is_none());
1272 assert_eq!(fb_to_schema(schema), fb_to_schema(schema2));
1273
1274 assert_eq!(ipc.version(), ipc2.version());
1275 assert_eq!(ipc.header_type(), ipc2.header_type());
1276 assert_eq!(ipc.bodyLength(), ipc2.bodyLength());
1277 assert!(ipc.custom_metadata().is_none());
1278 assert!(ipc2.custom_metadata().is_none());
1279 }
1280}