1use arrow_buffer::Buffer;
21use arrow_schema::*;
22use core::panic;
23use flatbuffers::{
24 FlatBufferBuilder, ForwardsUOffset, UnionWIPOffset, Vector, Verifiable, Verifier,
25 VerifierOptions, WIPOffset,
26};
27use std::collections::HashMap;
28use std::fmt::{Debug, Formatter};
29use std::sync::Arc;
30
31use crate::writer::DictionaryTracker;
32use crate::{KeyValue, Message, CONTINUATION_MARKER};
33use DataType::*;
34
/// Encoder that turns an Arrow [`Schema`] into its IPC flatbuffer form.
///
/// The encoder optionally borrows a [`DictionaryTracker`]; that borrow is handed to
/// `build_field` while the schema's fields are encoded. It starts out as `None`
/// (see `new`) and is set through `with_dictionary_tracker`.
#[derive(Debug)]
pub struct IpcSchemaEncoder<'a> {
    /// Tracker passed along to field encoding, if one has been attached.
    dictionary_tracker: Option<&'a mut DictionaryTracker>,
}
69
70impl Default for IpcSchemaEncoder<'_> {
71 fn default() -> Self {
72 Self::new()
73 }
74}
75
76impl<'a> IpcSchemaEncoder<'a> {
77 pub fn new() -> IpcSchemaEncoder<'a> {
79 IpcSchemaEncoder {
80 dictionary_tracker: None,
81 }
82 }
83
84 pub fn with_dictionary_tracker(
86 mut self,
87 dictionary_tracker: &'a mut DictionaryTracker,
88 ) -> Self {
89 self.dictionary_tracker = Some(dictionary_tracker);
90 self
91 }
92
93 pub fn schema_to_fb<'b>(&mut self, schema: &Schema) -> FlatBufferBuilder<'b> {
97 let mut fbb = FlatBufferBuilder::new();
98
99 let root = self.schema_to_fb_offset(&mut fbb, schema);
100
101 fbb.finish(root, None);
102
103 fbb
104 }
105
106 pub fn schema_to_fb_offset<'b>(
108 &mut self,
109 fbb: &mut FlatBufferBuilder<'b>,
110 schema: &Schema,
111 ) -> WIPOffset<crate::Schema<'b>> {
112 let fields = schema
113 .fields()
114 .iter()
115 .map(|field| build_field(fbb, &mut self.dictionary_tracker, field))
116 .collect::<Vec<_>>();
117 let fb_field_list = fbb.create_vector(&fields);
118
119 let fb_metadata_list =
120 (!schema.metadata().is_empty()).then(|| metadata_to_fb(fbb, schema.metadata()));
121
122 let mut builder = crate::SchemaBuilder::new(fbb);
123 builder.add_fields(fb_field_list);
124 if let Some(fb_metadata_list) = fb_metadata_list {
125 builder.add_custom_metadata(fb_metadata_list);
126 }
127 builder.finish()
128 }
129}
130
131pub fn metadata_to_fb<'a>(
133 fbb: &mut FlatBufferBuilder<'a>,
134 metadata: &HashMap<String, String>,
135) -> WIPOffset<Vector<'a, ForwardsUOffset<KeyValue<'a>>>> {
136 let mut ordered_keys = metadata.keys().collect::<Vec<_>>();
137 ordered_keys.sort();
138 let custom_metadata = ordered_keys
139 .into_iter()
140 .map(|k| {
141 let v = metadata.get(k).unwrap();
142 let fb_key_name = fbb.create_string(k);
143 let fb_val_name = fbb.create_string(v);
144
145 let mut kv_builder = crate::KeyValueBuilder::new(fbb);
146 kv_builder.add_key(fb_key_name);
147 kv_builder.add_value(fb_val_name);
148 kv_builder.finish()
149 })
150 .collect::<Vec<_>>();
151 fbb.create_vector(&custom_metadata)
152}
153
154pub fn schema_to_fb_offset<'a>(
156 fbb: &mut FlatBufferBuilder<'a>,
157 schema: &Schema,
158) -> WIPOffset<crate::Schema<'a>> {
159 IpcSchemaEncoder::new().schema_to_fb_offset(fbb, schema)
160}
161
162impl From<crate::Field<'_>> for Field {
164 fn from(field: crate::Field) -> Field {
165 let arrow_field = if let Some(dictionary) = field.dictionary() {
166 #[allow(deprecated)]
167 Field::new_dict(
168 field.name().unwrap(),
169 get_data_type(field, true),
170 field.nullable(),
171 dictionary.id(),
172 dictionary.isOrdered(),
173 )
174 } else {
175 Field::new(
176 field.name().unwrap(),
177 get_data_type(field, true),
178 field.nullable(),
179 )
180 };
181
182 let mut metadata_map = HashMap::default();
183 if let Some(list) = field.custom_metadata() {
184 for kv in list {
185 if let (Some(k), Some(v)) = (kv.key(), kv.value()) {
186 metadata_map.insert(k.to_string(), v.to_string());
187 }
188 }
189 }
190
191 arrow_field.with_metadata(metadata_map)
192 }
193}
194
195pub fn fb_to_schema(fb: crate::Schema) -> Schema {
197 let mut fields: Vec<Field> = vec![];
198 let c_fields = fb.fields().unwrap();
199 let len = c_fields.len();
200 for i in 0..len {
201 let c_field: crate::Field = c_fields.get(i);
202 match c_field.type_type() {
203 crate::Type::Decimal if fb.endianness() == crate::Endianness::Big => {
204 unimplemented!("Big Endian is not supported for Decimal!")
205 }
206 _ => (),
207 };
208 fields.push(c_field.into());
209 }
210
211 let mut metadata: HashMap<String, String> = HashMap::default();
212 if let Some(md_fields) = fb.custom_metadata() {
213 let len = md_fields.len();
214 for i in 0..len {
215 let kv = md_fields.get(i);
216 let k_str = kv.key();
217 let v_str = kv.value();
218 if let Some(k) = k_str {
219 if let Some(v) = v_str {
220 metadata.insert(k.to_string(), v.to_string());
221 }
222 }
223 }
224 }
225 Schema::new_with_metadata(fields, metadata)
226}
227
228pub fn try_schema_from_flatbuffer_bytes(bytes: &[u8]) -> Result<Schema, ArrowError> {
230 if let Ok(ipc) = crate::root_as_message(bytes) {
231 if let Some(schema) = ipc.header_as_schema().map(fb_to_schema) {
232 Ok(schema)
233 } else {
234 Err(ArrowError::ParseError(
235 "Unable to get head as schema".to_string(),
236 ))
237 }
238 } else {
239 Err(ArrowError::ParseError(
240 "Unable to get root as message".to_string(),
241 ))
242 }
243}
244
245pub fn try_schema_from_ipc_buffer(buffer: &[u8]) -> Result<Schema, ArrowError> {
247 if buffer.len() < 4 {
257 return Err(ArrowError::ParseError(
258 "The buffer length is less than 4 and missing the continuation marker or length of buffer".to_string()
259 ));
260 }
261
262 let (len, buffer) = if buffer[..4] == CONTINUATION_MARKER {
263 if buffer.len() < 8 {
264 return Err(ArrowError::ParseError(
265 "The buffer length is less than 8 and missing the length of buffer".to_string(),
266 ));
267 }
268 buffer[4..].split_at(4)
269 } else {
270 buffer.split_at(4)
271 };
272
273 let len = <i32>::from_le_bytes(len.try_into().unwrap());
274 if len < 0 {
275 return Err(ArrowError::ParseError(format!(
276 "The encapsulated message's reported length is negative ({len})"
277 )));
278 }
279
280 if buffer.len() < len as usize {
281 let actual_len = buffer.len();
282 return Err(ArrowError::ParseError(
283 format!("The buffer length ({actual_len}) is less than the encapsulated message's reported length ({len})")
284 ));
285 }
286
287 let msg = crate::root_as_message(buffer)
288 .map_err(|err| ArrowError::ParseError(format!("Unable to get root as message: {err:?}")))?;
289 let ipc_schema = msg.header_as_schema().ok_or_else(|| {
290 ArrowError::ParseError("Unable to convert flight info to a schema".to_string())
291 })?;
292 Ok(fb_to_schema(ipc_schema))
293}
294
295pub(crate) fn get_data_type(field: crate::Field, may_be_dictionary: bool) -> DataType {
297 if let Some(dictionary) = field.dictionary() {
298 if may_be_dictionary {
299 let int = dictionary.indexType().unwrap();
300 let index_type = match (int.bitWidth(), int.is_signed()) {
301 (8, true) => DataType::Int8,
302 (8, false) => DataType::UInt8,
303 (16, true) => DataType::Int16,
304 (16, false) => DataType::UInt16,
305 (32, true) => DataType::Int32,
306 (32, false) => DataType::UInt32,
307 (64, true) => DataType::Int64,
308 (64, false) => DataType::UInt64,
309 _ => panic!("Unexpected bitwidth and signed"),
310 };
311 return DataType::Dictionary(
312 Box::new(index_type),
313 Box::new(get_data_type(field, false)),
314 );
315 }
316 }
317
318 match field.type_type() {
319 crate::Type::Null => DataType::Null,
320 crate::Type::Bool => DataType::Boolean,
321 crate::Type::Int => {
322 let int = field.type_as_int().unwrap();
323 match (int.bitWidth(), int.is_signed()) {
324 (8, true) => DataType::Int8,
325 (8, false) => DataType::UInt8,
326 (16, true) => DataType::Int16,
327 (16, false) => DataType::UInt16,
328 (32, true) => DataType::Int32,
329 (32, false) => DataType::UInt32,
330 (64, true) => DataType::Int64,
331 (64, false) => DataType::UInt64,
332 z => panic!(
333 "Int type with bit width of {} and signed of {} not supported",
334 z.0, z.1
335 ),
336 }
337 }
338 crate::Type::Binary => DataType::Binary,
339 crate::Type::BinaryView => DataType::BinaryView,
340 crate::Type::LargeBinary => DataType::LargeBinary,
341 crate::Type::Utf8 => DataType::Utf8,
342 crate::Type::Utf8View => DataType::Utf8View,
343 crate::Type::LargeUtf8 => DataType::LargeUtf8,
344 crate::Type::FixedSizeBinary => {
345 let fsb = field.type_as_fixed_size_binary().unwrap();
346 DataType::FixedSizeBinary(fsb.byteWidth())
347 }
348 crate::Type::FloatingPoint => {
349 let float = field.type_as_floating_point().unwrap();
350 match float.precision() {
351 crate::Precision::HALF => DataType::Float16,
352 crate::Precision::SINGLE => DataType::Float32,
353 crate::Precision::DOUBLE => DataType::Float64,
354 z => panic!("FloatingPoint type with precision of {z:?} not supported"),
355 }
356 }
357 crate::Type::Date => {
358 let date = field.type_as_date().unwrap();
359 match date.unit() {
360 crate::DateUnit::DAY => DataType::Date32,
361 crate::DateUnit::MILLISECOND => DataType::Date64,
362 z => panic!("Date type with unit of {z:?} not supported"),
363 }
364 }
365 crate::Type::Time => {
366 let time = field.type_as_time().unwrap();
367 match (time.bitWidth(), time.unit()) {
368 (32, crate::TimeUnit::SECOND) => DataType::Time32(TimeUnit::Second),
369 (32, crate::TimeUnit::MILLISECOND) => DataType::Time32(TimeUnit::Millisecond),
370 (64, crate::TimeUnit::MICROSECOND) => DataType::Time64(TimeUnit::Microsecond),
371 (64, crate::TimeUnit::NANOSECOND) => DataType::Time64(TimeUnit::Nanosecond),
372 z => panic!(
373 "Time type with bit width of {} and unit of {:?} not supported",
374 z.0, z.1
375 ),
376 }
377 }
378 crate::Type::Timestamp => {
379 let timestamp = field.type_as_timestamp().unwrap();
380 let timezone: Option<_> = timestamp.timezone().map(|tz| tz.into());
381 match timestamp.unit() {
382 crate::TimeUnit::SECOND => DataType::Timestamp(TimeUnit::Second, timezone),
383 crate::TimeUnit::MILLISECOND => {
384 DataType::Timestamp(TimeUnit::Millisecond, timezone)
385 }
386 crate::TimeUnit::MICROSECOND => {
387 DataType::Timestamp(TimeUnit::Microsecond, timezone)
388 }
389 crate::TimeUnit::NANOSECOND => DataType::Timestamp(TimeUnit::Nanosecond, timezone),
390 z => panic!("Timestamp type with unit of {z:?} not supported"),
391 }
392 }
393 crate::Type::Interval => {
394 let interval = field.type_as_interval().unwrap();
395 match interval.unit() {
396 crate::IntervalUnit::YEAR_MONTH => DataType::Interval(IntervalUnit::YearMonth),
397 crate::IntervalUnit::DAY_TIME => DataType::Interval(IntervalUnit::DayTime),
398 crate::IntervalUnit::MONTH_DAY_NANO => {
399 DataType::Interval(IntervalUnit::MonthDayNano)
400 }
401 z => panic!("Interval type with unit of {z:?} unsupported"),
402 }
403 }
404 crate::Type::Duration => {
405 let duration = field.type_as_duration().unwrap();
406 match duration.unit() {
407 crate::TimeUnit::SECOND => DataType::Duration(TimeUnit::Second),
408 crate::TimeUnit::MILLISECOND => DataType::Duration(TimeUnit::Millisecond),
409 crate::TimeUnit::MICROSECOND => DataType::Duration(TimeUnit::Microsecond),
410 crate::TimeUnit::NANOSECOND => DataType::Duration(TimeUnit::Nanosecond),
411 z => panic!("Duration type with unit of {z:?} unsupported"),
412 }
413 }
414 crate::Type::List => {
415 let children = field.children().unwrap();
416 if children.len() != 1 {
417 panic!("expect a list to have one child")
418 }
419 DataType::List(Arc::new(children.get(0).into()))
420 }
421 crate::Type::LargeList => {
422 let children = field.children().unwrap();
423 if children.len() != 1 {
424 panic!("expect a large list to have one child")
425 }
426 DataType::LargeList(Arc::new(children.get(0).into()))
427 }
428 crate::Type::FixedSizeList => {
429 let children = field.children().unwrap();
430 if children.len() != 1 {
431 panic!("expect a list to have one child")
432 }
433 let fsl = field.type_as_fixed_size_list().unwrap();
434 DataType::FixedSizeList(Arc::new(children.get(0).into()), fsl.listSize())
435 }
436 crate::Type::Struct_ => {
437 let fields = match field.children() {
438 Some(children) => children.iter().map(Field::from).collect(),
439 None => Fields::empty(),
440 };
441 DataType::Struct(fields)
442 }
443 crate::Type::RunEndEncoded => {
444 let children = field.children().unwrap();
445 if children.len() != 2 {
446 panic!(
447 "RunEndEncoded type should have exactly two children. Found {}",
448 children.len()
449 )
450 }
451 let run_ends_field = children.get(0).into();
452 let values_field = children.get(1).into();
453 DataType::RunEndEncoded(Arc::new(run_ends_field), Arc::new(values_field))
454 }
455 crate::Type::Map => {
456 let map = field.type_as_map().unwrap();
457 let children = field.children().unwrap();
458 if children.len() != 1 {
459 panic!("expect a map to have one child")
460 }
461 DataType::Map(Arc::new(children.get(0).into()), map.keysSorted())
462 }
463 crate::Type::Decimal => {
464 let fsb = field.type_as_decimal().unwrap();
465 let bit_width = fsb.bitWidth();
466 let precision: u8 = fsb.precision().try_into().unwrap();
467 let scale: i8 = fsb.scale().try_into().unwrap();
468 match bit_width {
469 32 => DataType::Decimal32(precision, scale),
470 64 => DataType::Decimal64(precision, scale),
471 128 => DataType::Decimal128(precision, scale),
472 256 => DataType::Decimal256(precision, scale),
473 _ => panic!("Unexpected decimal bit width {bit_width}"),
474 }
475 }
476 crate::Type::Union => {
477 let union = field.type_as_union().unwrap();
478
479 let union_mode = match union.mode() {
480 crate::UnionMode::Dense => UnionMode::Dense,
481 crate::UnionMode::Sparse => UnionMode::Sparse,
482 mode => panic!("Unexpected union mode: {mode:?}"),
483 };
484
485 let mut fields = vec![];
486 if let Some(children) = field.children() {
487 for i in 0..children.len() {
488 fields.push(Field::from(children.get(i)));
489 }
490 };
491
492 let fields = match union.typeIds() {
493 None => UnionFields::new(0_i8..fields.len() as i8, fields),
494 Some(ids) => UnionFields::new(ids.iter().map(|i| i as i8), fields),
495 };
496
497 DataType::Union(fields, union_mode)
498 }
499 t => unimplemented!("Type {:?} not supported", t),
500 }
501}
502
/// Result of `get_fb_field_type`: the type-related pieces that `build_field` copies
/// into a flatbuffer field table.
pub(crate) struct FBFieldType<'b> {
    /// Tag naming which flatbuffer type table `type_` refers to.
    pub(crate) type_type: crate::Type,
    /// Offset of the finished type table, stored as a union value.
    pub(crate) type_: WIPOffset<UnionWIPOffset>,
    /// Offset of the vector of child field tables, when one was written.
    pub(crate) children: Option<WIPOffset<Vector<'b, ForwardsUOffset<crate::Field<'b>>>>>,
}
508
509pub(crate) fn build_field<'a>(
511 fbb: &mut FlatBufferBuilder<'a>,
512 dictionary_tracker: &mut Option<&mut DictionaryTracker>,
513 field: &Field,
514) -> WIPOffset<crate::Field<'a>> {
515 let mut fb_metadata = None;
517 if !field.metadata().is_empty() {
518 fb_metadata = Some(metadata_to_fb(fbb, field.metadata()));
519 };
520
521 let fb_field_name = fbb.create_string(field.name().as_str());
522 let field_type = get_fb_field_type(field.data_type(), dictionary_tracker, fbb);
523
524 let fb_dictionary = if let Dictionary(index_type, _) = field.data_type() {
525 match dictionary_tracker {
526 Some(tracker) => Some(get_fb_dictionary(
527 index_type,
528 tracker.next_dict_id(),
529 field
530 .dict_is_ordered()
531 .expect("All Dictionary types have `dict_is_ordered`"),
532 fbb,
533 )),
534 None => panic!("IPC must no longer be used without dictionary tracker"),
535 }
536 } else {
537 None
538 };
539
540 let mut field_builder = crate::FieldBuilder::new(fbb);
541 field_builder.add_name(fb_field_name);
542 if let Some(dictionary) = fb_dictionary {
543 field_builder.add_dictionary(dictionary)
544 }
545 field_builder.add_type_type(field_type.type_type);
546 field_builder.add_nullable(field.is_nullable());
547 match field_type.children {
548 None => {}
549 Some(children) => field_builder.add_children(children),
550 };
551 field_builder.add_type_(field_type.type_);
552
553 if let Some(fb_metadata) = fb_metadata {
554 field_builder.add_custom_metadata(fb_metadata);
555 }
556
557 field_builder.finish()
558}
559
/// Writes the IPC type table (and child field vector) for `data_type` into `fbb`.
///
/// Returns an [`FBFieldType`] holding the type tag, the offset of the finished type
/// table, and the offset of the children vector. Types without children get an empty
/// children vector; nested types write their children through `build_field`, passing
/// `dictionary_tracker` along.
///
/// # Panics
///
/// Reaches `unimplemented!` for `ListView` and `LargeListView`.
pub(crate) fn get_fb_field_type<'a>(
    data_type: &DataType,
    dictionary_tracker: &mut Option<&mut DictionaryTracker>,
    fbb: &mut FlatBufferBuilder<'a>,
) -> FBFieldType<'a> {
    // Shared empty child list, written out for every type that has no children.
    let empty_fields: Vec<WIPOffset<crate::Field>> = vec![];
    match data_type {
        Null => FBFieldType {
            type_type: crate::Type::Null,
            type_: crate::NullBuilder::new(fbb).finish().as_union_value(),
            children: Some(fbb.create_vector(&empty_fields[..])),
        },
        Boolean => FBFieldType {
            type_type: crate::Type::Bool,
            type_: crate::BoolBuilder::new(fbb).finish().as_union_value(),
            children: Some(fbb.create_vector(&empty_fields[..])),
        },
        UInt8 | UInt16 | UInt32 | UInt64 => {
            // The children vector is written before the Int table is opened.
            let children = fbb.create_vector(&empty_fields[..]);
            let mut builder = crate::IntBuilder::new(fbb);
            builder.add_is_signed(false);
            match data_type {
                UInt8 => builder.add_bitWidth(8),
                UInt16 => builder.add_bitWidth(16),
                UInt32 => builder.add_bitWidth(32),
                UInt64 => builder.add_bitWidth(64),
                _ => {}
            };
            FBFieldType {
                type_type: crate::Type::Int,
                type_: builder.finish().as_union_value(),
                children: Some(children),
            }
        }
        Int8 | Int16 | Int32 | Int64 => {
            // The children vector is written before the Int table is opened.
            let children = fbb.create_vector(&empty_fields[..]);
            let mut builder = crate::IntBuilder::new(fbb);
            builder.add_is_signed(true);
            match data_type {
                Int8 => builder.add_bitWidth(8),
                Int16 => builder.add_bitWidth(16),
                Int32 => builder.add_bitWidth(32),
                Int64 => builder.add_bitWidth(64),
                _ => {}
            };
            FBFieldType {
                type_type: crate::Type::Int,
                type_: builder.finish().as_union_value(),
                children: Some(children),
            }
        }
        Float16 | Float32 | Float64 => {
            // The children vector is written before the FloatingPoint table is opened.
            let children = fbb.create_vector(&empty_fields[..]);
            let mut builder = crate::FloatingPointBuilder::new(fbb);
            match data_type {
                Float16 => builder.add_precision(crate::Precision::HALF),
                Float32 => builder.add_precision(crate::Precision::SINGLE),
                Float64 => builder.add_precision(crate::Precision::DOUBLE),
                _ => {}
            };
            FBFieldType {
                type_type: crate::Type::FloatingPoint,
                type_: builder.finish().as_union_value(),
                children: Some(children),
            }
        }
        Binary => FBFieldType {
            type_type: crate::Type::Binary,
            type_: crate::BinaryBuilder::new(fbb).finish().as_union_value(),
            children: Some(fbb.create_vector(&empty_fields[..])),
        },
        LargeBinary => FBFieldType {
            type_type: crate::Type::LargeBinary,
            type_: crate::LargeBinaryBuilder::new(fbb)
                .finish()
                .as_union_value(),
            children: Some(fbb.create_vector(&empty_fields[..])),
        },
        BinaryView => FBFieldType {
            type_type: crate::Type::BinaryView,
            type_: crate::BinaryViewBuilder::new(fbb).finish().as_union_value(),
            children: Some(fbb.create_vector(&empty_fields[..])),
        },
        Utf8View => FBFieldType {
            type_type: crate::Type::Utf8View,
            type_: crate::Utf8ViewBuilder::new(fbb).finish().as_union_value(),
            children: Some(fbb.create_vector(&empty_fields[..])),
        },
        Utf8 => FBFieldType {
            type_type: crate::Type::Utf8,
            type_: crate::Utf8Builder::new(fbb).finish().as_union_value(),
            children: Some(fbb.create_vector(&empty_fields[..])),
        },
        LargeUtf8 => FBFieldType {
            type_type: crate::Type::LargeUtf8,
            type_: crate::LargeUtf8Builder::new(fbb).finish().as_union_value(),
            children: Some(fbb.create_vector(&empty_fields[..])),
        },
        FixedSizeBinary(len) => {
            let mut builder = crate::FixedSizeBinaryBuilder::new(fbb);
            builder.add_byteWidth(*len);
            FBFieldType {
                type_type: crate::Type::FixedSizeBinary,
                type_: builder.finish().as_union_value(),
                children: Some(fbb.create_vector(&empty_fields[..])),
            }
        }
        Date32 => {
            let mut builder = crate::DateBuilder::new(fbb);
            builder.add_unit(crate::DateUnit::DAY);
            FBFieldType {
                type_type: crate::Type::Date,
                type_: builder.finish().as_union_value(),
                children: Some(fbb.create_vector(&empty_fields[..])),
            }
        }
        Date64 => {
            let mut builder = crate::DateBuilder::new(fbb);
            builder.add_unit(crate::DateUnit::MILLISECOND);
            FBFieldType {
                type_type: crate::Type::Date,
                type_: builder.finish().as_union_value(),
                children: Some(fbb.create_vector(&empty_fields[..])),
            }
        }
        Time32(unit) | Time64(unit) => {
            let mut builder = crate::TimeBuilder::new(fbb);
            // The bit width is chosen from the unit, not from the Time32/Time64 variant.
            match unit {
                TimeUnit::Second => {
                    builder.add_bitWidth(32);
                    builder.add_unit(crate::TimeUnit::SECOND);
                }
                TimeUnit::Millisecond => {
                    builder.add_bitWidth(32);
                    builder.add_unit(crate::TimeUnit::MILLISECOND);
                }
                TimeUnit::Microsecond => {
                    builder.add_bitWidth(64);
                    builder.add_unit(crate::TimeUnit::MICROSECOND);
                }
                TimeUnit::Nanosecond => {
                    builder.add_bitWidth(64);
                    builder.add_unit(crate::TimeUnit::NANOSECOND);
                }
            }
            FBFieldType {
                type_type: crate::Type::Time,
                type_: builder.finish().as_union_value(),
                children: Some(fbb.create_vector(&empty_fields[..])),
            }
        }
        Timestamp(unit, tz) => {
            // The timezone string is always written (empty when absent) before the
            // Timestamp table is opened; it is only referenced when non-empty.
            let tz = tz.as_deref().unwrap_or_default();
            let tz_str = fbb.create_string(tz);
            let mut builder = crate::TimestampBuilder::new(fbb);
            let time_unit = match unit {
                TimeUnit::Second => crate::TimeUnit::SECOND,
                TimeUnit::Millisecond => crate::TimeUnit::MILLISECOND,
                TimeUnit::Microsecond => crate::TimeUnit::MICROSECOND,
                TimeUnit::Nanosecond => crate::TimeUnit::NANOSECOND,
            };
            builder.add_unit(time_unit);
            if !tz.is_empty() {
                builder.add_timezone(tz_str);
            }
            FBFieldType {
                type_type: crate::Type::Timestamp,
                type_: builder.finish().as_union_value(),
                children: Some(fbb.create_vector(&empty_fields[..])),
            }
        }
        Interval(unit) => {
            let mut builder = crate::IntervalBuilder::new(fbb);
            let interval_unit = match unit {
                IntervalUnit::YearMonth => crate::IntervalUnit::YEAR_MONTH,
                IntervalUnit::DayTime => crate::IntervalUnit::DAY_TIME,
                IntervalUnit::MonthDayNano => crate::IntervalUnit::MONTH_DAY_NANO,
            };
            builder.add_unit(interval_unit);
            FBFieldType {
                type_type: crate::Type::Interval,
                type_: builder.finish().as_union_value(),
                children: Some(fbb.create_vector(&empty_fields[..])),
            }
        }
        Duration(unit) => {
            let mut builder = crate::DurationBuilder::new(fbb);
            let time_unit = match unit {
                TimeUnit::Second => crate::TimeUnit::SECOND,
                TimeUnit::Millisecond => crate::TimeUnit::MILLISECOND,
                TimeUnit::Microsecond => crate::TimeUnit::MICROSECOND,
                TimeUnit::Nanosecond => crate::TimeUnit::NANOSECOND,
            };
            builder.add_unit(time_unit);
            FBFieldType {
                type_type: crate::Type::Duration,
                type_: builder.finish().as_union_value(),
                children: Some(fbb.create_vector(&empty_fields[..])),
            }
        }
        List(ref list_type) => {
            // The single child field is written first, then the List table.
            let child = build_field(fbb, dictionary_tracker, list_type);
            FBFieldType {
                type_type: crate::Type::List,
                type_: crate::ListBuilder::new(fbb).finish().as_union_value(),
                children: Some(fbb.create_vector(&[child])),
            }
        }
        ListView(_) | LargeListView(_) => unimplemented!("ListView/LargeListView not implemented"),
        LargeList(ref list_type) => {
            let child = build_field(fbb, dictionary_tracker, list_type);
            FBFieldType {
                type_type: crate::Type::LargeList,
                type_: crate::LargeListBuilder::new(fbb).finish().as_union_value(),
                children: Some(fbb.create_vector(&[child])),
            }
        }
        FixedSizeList(ref list_type, len) => {
            let child = build_field(fbb, dictionary_tracker, list_type);
            let mut builder = crate::FixedSizeListBuilder::new(fbb);
            builder.add_listSize(*len);
            FBFieldType {
                type_type: crate::Type::FixedSizeList,
                type_: builder.finish().as_union_value(),
                children: Some(fbb.create_vector(&[child])),
            }
        }
        Struct(fields) => {
            // Every member field is written before the Struct_ table and children vector.
            let mut children = vec![];
            for field in fields {
                children.push(build_field(fbb, dictionary_tracker, field));
            }
            FBFieldType {
                type_type: crate::Type::Struct_,
                type_: crate::Struct_Builder::new(fbb).finish().as_union_value(),
                children: Some(fbb.create_vector(&children[..])),
            }
        }
        RunEndEncoded(run_ends, values) => {
            // Children are ordered run-ends first, values second.
            let run_ends_field = build_field(fbb, dictionary_tracker, run_ends);
            let values_field = build_field(fbb, dictionary_tracker, values);
            let children = [run_ends_field, values_field];
            FBFieldType {
                type_type: crate::Type::RunEndEncoded,
                type_: crate::RunEndEncodedBuilder::new(fbb)
                    .finish()
                    .as_union_value(),
                children: Some(fbb.create_vector(&children[..])),
            }
        }
        Map(map_field, keys_sorted) => {
            let child = build_field(fbb, dictionary_tracker, map_field);
            let mut field_type = crate::MapBuilder::new(fbb);
            field_type.add_keysSorted(*keys_sorted);
            FBFieldType {
                type_type: crate::Type::Map,
                type_: field_type.finish().as_union_value(),
                children: Some(fbb.create_vector(&[child])),
            }
        }
        Dictionary(_, value_type) => {
            // Only the value type is encoded here; the index type is ignored by this
            // function (`build_field` passes it to `get_fb_dictionary` instead).
            get_fb_field_type(value_type, dictionary_tracker, fbb)
        }
        Decimal32(precision, scale) => {
            let mut builder = crate::DecimalBuilder::new(fbb);
            builder.add_precision(*precision as i32);
            builder.add_scale(*scale as i32);
            builder.add_bitWidth(32);
            FBFieldType {
                type_type: crate::Type::Decimal,
                type_: builder.finish().as_union_value(),
                children: Some(fbb.create_vector(&empty_fields[..])),
            }
        }
        Decimal64(precision, scale) => {
            let mut builder = crate::DecimalBuilder::new(fbb);
            builder.add_precision(*precision as i32);
            builder.add_scale(*scale as i32);
            builder.add_bitWidth(64);
            FBFieldType {
                type_type: crate::Type::Decimal,
                type_: builder.finish().as_union_value(),
                children: Some(fbb.create_vector(&empty_fields[..])),
            }
        }
        Decimal128(precision, scale) => {
            let mut builder = crate::DecimalBuilder::new(fbb);
            builder.add_precision(*precision as i32);
            builder.add_scale(*scale as i32);
            builder.add_bitWidth(128);
            FBFieldType {
                type_type: crate::Type::Decimal,
                type_: builder.finish().as_union_value(),
                children: Some(fbb.create_vector(&empty_fields[..])),
            }
        }
        Decimal256(precision, scale) => {
            let mut builder = crate::DecimalBuilder::new(fbb);
            builder.add_precision(*precision as i32);
            builder.add_scale(*scale as i32);
            builder.add_bitWidth(256);
            FBFieldType {
                type_type: crate::Type::Decimal,
                type_: builder.finish().as_union_value(),
                children: Some(fbb.create_vector(&empty_fields[..])),
            }
        }
        Union(fields, mode) => {
            // Member fields are written first, in the order `fields` yields them.
            let mut children = vec![];
            for (_, field) in fields.iter() {
                children.push(build_field(fbb, dictionary_tracker, field));
            }

            let union_mode = match mode {
                UnionMode::Sparse => crate::UnionMode::Sparse,
                UnionMode::Dense => crate::UnionMode::Dense,
            };

            // Type ids are widened to i32 for the flatbuffer typeIds vector.
            let fbb_type_ids =
                fbb.create_vector(&fields.iter().map(|(t, _)| t as i32).collect::<Vec<_>>());
            let mut builder = crate::UnionBuilder::new(fbb);
            builder.add_mode(union_mode);
            builder.add_typeIds(fbb_type_ids);

            FBFieldType {
                type_type: crate::Type::Union,
                type_: builder.finish().as_union_value(),
                children: Some(fbb.create_vector(&children[..])),
            }
        }
    }
}
899
900pub(crate) fn get_fb_dictionary<'a>(
902 index_type: &DataType,
903 dict_id: i64,
904 dict_is_ordered: bool,
905 fbb: &mut FlatBufferBuilder<'a>,
906) -> WIPOffset<crate::DictionaryEncoding<'a>> {
907 let mut index_builder = crate::IntBuilder::new(fbb);
910
911 match *index_type {
912 Int8 | Int16 | Int32 | Int64 => index_builder.add_is_signed(true),
913 UInt8 | UInt16 | UInt32 | UInt64 => index_builder.add_is_signed(false),
914 _ => {}
915 }
916
917 match *index_type {
918 Int8 | UInt8 => index_builder.add_bitWidth(8),
919 Int16 | UInt16 => index_builder.add_bitWidth(16),
920 Int32 | UInt32 => index_builder.add_bitWidth(32),
921 Int64 | UInt64 => index_builder.add_bitWidth(64),
922 _ => {}
923 }
924
925 let index_builder = index_builder.finish();
926
927 let mut builder = crate::DictionaryEncodingBuilder::new(fbb);
928 builder.add_id(dict_id);
929 builder.add_indexType(index_builder);
930 builder.add_isOrdered(dict_is_ordered);
931
932 builder.finish()
933}
934
/// A [`Buffer`] that has been checked to contain a flatbuffer-encoded [`Message`].
///
/// `try_new` runs the flatbuffer verifier on the buffer; `as_ref` then reads the
/// message without verifying it again.
#[derive(Clone)]
pub struct MessageBuffer(Buffer);
948
949impl Debug for MessageBuffer {
950 fn fmt(&self, f: &mut Formatter<'_>) -> std::fmt::Result {
951 self.as_ref().fmt(f)
952 }
953}
954
955impl MessageBuffer {
956 pub fn try_new(buf: Buffer) -> Result<Self, ArrowError> {
958 let opts = VerifierOptions::default();
959 let mut v = Verifier::new(&opts, &buf);
960 <ForwardsUOffset<Message>>::run_verifier(&mut v, 0).map_err(|err| {
961 ArrowError::ParseError(format!("Unable to get root as message: {err:?}"))
962 })?;
963 Ok(Self(buf))
964 }
965
966 #[inline]
968 pub fn as_ref(&self) -> Message<'_> {
969 unsafe { crate::root_as_message_unchecked(&self.0) }
971 }
972}
973
974#[cfg(test)]
975mod tests {
976 use super::*;
977
978 #[test]
979 fn convert_schema_round_trip() {
980 let md: HashMap<String, String> = [("Key".to_string(), "value".to_string())]
981 .iter()
982 .cloned()
983 .collect();
984 let field_md: HashMap<String, String> = [("k".to_string(), "v".to_string())]
985 .iter()
986 .cloned()
987 .collect();
988 let schema = Schema::new_with_metadata(
989 vec![
990 Field::new("uint8", DataType::UInt8, false).with_metadata(field_md),
991 Field::new("uint16", DataType::UInt16, true),
992 Field::new("uint32", DataType::UInt32, false),
993 Field::new("uint64", DataType::UInt64, true),
994 Field::new("int8", DataType::Int8, true),
995 Field::new("int16", DataType::Int16, false),
996 Field::new("int32", DataType::Int32, true),
997 Field::new("int64", DataType::Int64, false),
998 Field::new("float16", DataType::Float16, true),
999 Field::new("float32", DataType::Float32, false),
1000 Field::new("float64", DataType::Float64, true),
1001 Field::new("null", DataType::Null, false),
1002 Field::new("bool", DataType::Boolean, false),
1003 Field::new("date32", DataType::Date32, false),
1004 Field::new("date64", DataType::Date64, true),
1005 Field::new("time32[s]", DataType::Time32(TimeUnit::Second), true),
1006 Field::new("time32[ms]", DataType::Time32(TimeUnit::Millisecond), false),
1007 Field::new("time64[us]", DataType::Time64(TimeUnit::Microsecond), false),
1008 Field::new("time64[ns]", DataType::Time64(TimeUnit::Nanosecond), true),
1009 Field::new(
1010 "timestamp[s]",
1011 DataType::Timestamp(TimeUnit::Second, None),
1012 false,
1013 ),
1014 Field::new(
1015 "timestamp[ms]",
1016 DataType::Timestamp(TimeUnit::Millisecond, None),
1017 true,
1018 ),
1019 Field::new(
1020 "timestamp[us]",
1021 DataType::Timestamp(TimeUnit::Microsecond, Some("Africa/Johannesburg".into())),
1022 false,
1023 ),
1024 Field::new(
1025 "timestamp[ns]",
1026 DataType::Timestamp(TimeUnit::Nanosecond, None),
1027 true,
1028 ),
1029 Field::new(
1030 "interval[ym]",
1031 DataType::Interval(IntervalUnit::YearMonth),
1032 true,
1033 ),
1034 Field::new(
1035 "interval[dt]",
1036 DataType::Interval(IntervalUnit::DayTime),
1037 true,
1038 ),
1039 Field::new(
1040 "interval[mdn]",
1041 DataType::Interval(IntervalUnit::MonthDayNano),
1042 true,
1043 ),
1044 Field::new("utf8", DataType::Utf8, false),
1045 Field::new("utf8_view", DataType::Utf8View, false),
1046 Field::new("binary", DataType::Binary, false),
1047 Field::new("binary_view", DataType::BinaryView, false),
1048 Field::new_list(
1049 "list[u8]",
1050 Field::new_list_field(DataType::UInt8, false),
1051 true,
1052 ),
1053 Field::new_fixed_size_list(
1054 "fixed_size_list[u8]",
1055 Field::new_list_field(DataType::UInt8, false),
1056 2,
1057 true,
1058 ),
1059 Field::new_list(
1060 "list[struct<float32, int32, bool>]",
1061 Field::new_struct(
1062 "struct",
1063 vec![
1064 Field::new("float32", UInt8, false),
1065 Field::new("int32", Int32, true),
1066 Field::new("bool", Boolean, true),
1067 ],
1068 true,
1069 ),
1070 false,
1071 ),
1072 Field::new_struct(
1073 "struct<dictionary<int32, utf8>>",
1074 vec![Field::new(
1075 "dictionary<int32, utf8>",
1076 Dictionary(Box::new(DataType::Int32), Box::new(DataType::Utf8)),
1077 false,
1078 )],
1079 false,
1080 ),
1081 Field::new_struct(
1082 "struct<int64, list[struct<date32, list[struct<>]>]>",
1083 vec![
1084 Field::new("int64", DataType::Int64, true),
1085 Field::new_list(
1086 "list[struct<date32, list[struct<>]>]",
1087 Field::new_struct(
1088 "struct",
1089 vec![
1090 Field::new("date32", DataType::Date32, true),
1091 Field::new_list(
1092 "list[struct<>]",
1093 Field::new(
1094 "struct",
1095 DataType::Struct(Fields::empty()),
1096 false,
1097 ),
1098 false,
1099 ),
1100 ],
1101 false,
1102 ),
1103 false,
1104 ),
1105 ],
1106 false,
1107 ),
1108 Field::new_union(
1109 "union<int64, list[union<date32, list[union<>]>]>",
1110 vec![0, 1],
1111 vec![
1112 Field::new("int64", DataType::Int64, true),
1113 Field::new_list(
1114 "list[union<date32, list[union<>]>]",
1115 Field::new_union(
1116 "union<date32, list[union<>]>",
1117 vec![0, 1],
1118 vec![
1119 Field::new("date32", DataType::Date32, true),
1120 Field::new_list(
1121 "list[union<>]",
1122 Field::new(
1123 "union",
1124 DataType::Union(
1125 UnionFields::empty(),
1126 UnionMode::Sparse,
1127 ),
1128 false,
1129 ),
1130 false,
1131 ),
1132 ],
1133 UnionMode::Dense,
1134 ),
1135 false,
1136 ),
1137 ],
1138 UnionMode::Sparse,
1139 ),
1140 Field::new("struct<>", DataType::Struct(Fields::empty()), true),
1141 Field::new(
1142 "union<>",
1143 DataType::Union(UnionFields::empty(), UnionMode::Dense),
1144 true,
1145 ),
1146 Field::new(
1147 "union<>",
1148 DataType::Union(UnionFields::empty(), UnionMode::Sparse),
1149 true,
1150 ),
1151 Field::new(
1152 "union<int32, utf8>",
1153 DataType::Union(
1154 UnionFields::new(
1155 vec![2, 3], vec![
1157 Field::new("int32", DataType::Int32, true),
1158 Field::new("utf8", DataType::Utf8, true),
1159 ],
1160 ),
1161 UnionMode::Dense,
1162 ),
1163 true,
1164 ),
1165 #[allow(deprecated)]
1166 Field::new_dict(
1167 "dictionary<int32, utf8>",
1168 DataType::Dictionary(Box::new(DataType::Int32), Box::new(DataType::Utf8)),
1169 true,
1170 123,
1171 true,
1172 ),
1173 #[allow(deprecated)]
1174 Field::new_dict(
1175 "dictionary<uint8, uint32>",
1176 DataType::Dictionary(Box::new(DataType::UInt8), Box::new(DataType::UInt32)),
1177 true,
1178 123,
1179 true,
1180 ),
1181 Field::new("decimal<usize, usize>", DataType::Decimal128(10, 6), false),
1182 ],
1183 md,
1184 );
1185
1186 let mut dictionary_tracker = DictionaryTracker::new(true);
1187 let fb = IpcSchemaEncoder::new()
1188 .with_dictionary_tracker(&mut dictionary_tracker)
1189 .schema_to_fb(&schema);
1190
1191 let ipc = crate::root_as_schema(fb.finished_data()).unwrap();
1193 let schema2 = fb_to_schema(ipc);
1194 assert_eq!(schema, schema2);
1195 }
1196
/// Decodes a pre-encoded IPC schema message and compares it against the
/// message that `IpcDataGenerator` emits for an equivalent Arrow schema.
///
/// Both the flatbuffer schema tables and the enclosing message envelopes
/// are expected to agree.
#[test]
fn schema_from_bytes() {
    // Reference IPC message, kept as raw bytes. The run
    // `102, 105, 101, 108, 100, 49` is the ASCII text "field1".
    let reference_bytes: Vec<u8> = vec![
        16, 0, 0, 0, 0, 0, 10, 0, 12, 0, 6, 0, 5, 0, 8, 0, 10, 0, 0, 0, 0, 1, 4, 0, 12, 0, 0,
        0, 8, 0, 8, 0, 0, 0, 4, 0, 8, 0, 0, 0, 4, 0, 0, 0, 1, 0, 0, 0, 20, 0, 0, 0, 16, 0, 20,
        0, 8, 0, 0, 0, 7, 0, 12, 0, 0, 0, 16, 0, 16, 0, 0, 0, 0, 0, 0, 2, 16, 0, 0, 0, 32, 0,
        0, 0, 4, 0, 0, 0, 0, 0, 0, 0, 6, 0, 0, 0, 102, 105, 101, 108, 100, 49, 0, 0, 0, 0, 6,
        0, 8, 0, 4, 0, 6, 0, 0, 0, 32, 0, 0, 0,
    ];
    let reference_message = crate::root_as_message(&reference_bytes).unwrap();
    let reference_schema = reference_message.header_as_schema().unwrap();

    // Encode the equivalent schema (one non-nullable UInt32 column named
    // "field1") with this crate's writer, keeping only the message bytes.
    let regenerated_bytes = {
        let generator = crate::writer::IpcDataGenerator::default();
        let mut tracker = DictionaryTracker::new(true);
        let single_field_schema = Schema::new(vec![Field::new("field1", DataType::UInt32, false)]);
        let write_options = crate::writer::IpcWriteOptions::default();
        generator
            .schema_to_bytes_with_dictionary_tracker(
                &single_field_schema,
                &mut tracker,
                &write_options,
            )
            .ipc_message
    };
    let regenerated_message = crate::root_as_message(&regenerated_bytes).unwrap();
    let regenerated_schema = regenerated_message.header_as_schema().unwrap();

    // Schema-level comparison: neither side carries custom metadata or
    // feature flags, and both decode to the same Arrow schema.
    for fb_schema in [&reference_schema, &regenerated_schema] {
        assert!(fb_schema.custom_metadata().is_none());
    }
    assert_eq!(
        reference_schema.endianness(),
        regenerated_schema.endianness()
    );
    for fb_schema in [&reference_schema, &regenerated_schema] {
        assert!(fb_schema.features().is_none());
    }
    assert_eq!(
        fb_to_schema(reference_schema),
        fb_to_schema(regenerated_schema)
    );

    // Message-envelope comparison.
    assert_eq!(reference_message.version(), regenerated_message.version());
    assert_eq!(
        reference_message.header_type(),
        regenerated_message.header_type()
    );
    assert_eq!(
        reference_message.bodyLength(),
        regenerated_message.bodyLength()
    );
    for message in [&reference_message, &regenerated_message] {
        assert!(message.custom_metadata().is_none());
    }
}
1247}