1use arrow_buffer::Buffer;
21use arrow_schema::*;
22use flatbuffers::{
23 FlatBufferBuilder, ForwardsUOffset, UnionWIPOffset, Vector, Verifiable, Verifier,
24 VerifierOptions, WIPOffset,
25};
26use std::collections::HashMap;
27use std::fmt::{Debug, Formatter};
28use std::sync::Arc;
29
30use crate::writer::DictionaryTracker;
31use crate::{KeyValue, Message, CONTINUATION_MARKER};
32use DataType::*;
33
34#[derive(Debug)]
65pub struct IpcSchemaEncoder<'a> {
66 dictionary_tracker: Option<&'a mut DictionaryTracker>,
67}
68
69impl Default for IpcSchemaEncoder<'_> {
70 fn default() -> Self {
71 Self::new()
72 }
73}
74
75impl<'a> IpcSchemaEncoder<'a> {
76 pub fn new() -> IpcSchemaEncoder<'a> {
78 IpcSchemaEncoder {
79 dictionary_tracker: None,
80 }
81 }
82
83 pub fn with_dictionary_tracker(
85 mut self,
86 dictionary_tracker: &'a mut DictionaryTracker,
87 ) -> Self {
88 self.dictionary_tracker = Some(dictionary_tracker);
89 self
90 }
91
92 pub fn schema_to_fb<'b>(&mut self, schema: &Schema) -> FlatBufferBuilder<'b> {
96 let mut fbb = FlatBufferBuilder::new();
97
98 let root = self.schema_to_fb_offset(&mut fbb, schema);
99
100 fbb.finish(root, None);
101
102 fbb
103 }
104
105 pub fn schema_to_fb_offset<'b>(
107 &mut self,
108 fbb: &mut FlatBufferBuilder<'b>,
109 schema: &Schema,
110 ) -> WIPOffset<crate::Schema<'b>> {
111 let fields = schema
112 .fields()
113 .iter()
114 .map(|field| build_field(fbb, &mut self.dictionary_tracker, field))
115 .collect::<Vec<_>>();
116 let fb_field_list = fbb.create_vector(&fields);
117
118 let fb_metadata_list =
119 (!schema.metadata().is_empty()).then(|| metadata_to_fb(fbb, schema.metadata()));
120
121 let mut builder = crate::SchemaBuilder::new(fbb);
122 builder.add_fields(fb_field_list);
123 if let Some(fb_metadata_list) = fb_metadata_list {
124 builder.add_custom_metadata(fb_metadata_list);
125 }
126 builder.finish()
127 }
128}
129
130#[deprecated(since = "54.0.0", note = "Use `IpcSchemaConverter`.")]
132pub fn schema_to_fb(schema: &Schema) -> FlatBufferBuilder<'_> {
133 IpcSchemaEncoder::new().schema_to_fb(schema)
134}
135
136pub fn metadata_to_fb<'a>(
138 fbb: &mut FlatBufferBuilder<'a>,
139 metadata: &HashMap<String, String>,
140) -> WIPOffset<Vector<'a, ForwardsUOffset<KeyValue<'a>>>> {
141 let mut ordered_keys = metadata.keys().collect::<Vec<_>>();
142 ordered_keys.sort();
143 let custom_metadata = ordered_keys
144 .into_iter()
145 .map(|k| {
146 let v = metadata.get(k).unwrap();
147 let fb_key_name = fbb.create_string(k);
148 let fb_val_name = fbb.create_string(v);
149
150 let mut kv_builder = crate::KeyValueBuilder::new(fbb);
151 kv_builder.add_key(fb_key_name);
152 kv_builder.add_value(fb_val_name);
153 kv_builder.finish()
154 })
155 .collect::<Vec<_>>();
156 fbb.create_vector(&custom_metadata)
157}
158
159pub fn schema_to_fb_offset<'a>(
161 fbb: &mut FlatBufferBuilder<'a>,
162 schema: &Schema,
163) -> WIPOffset<crate::Schema<'a>> {
164 IpcSchemaEncoder::new().schema_to_fb_offset(fbb, schema)
165}
166
167impl From<crate::Field<'_>> for Field {
169 fn from(field: crate::Field) -> Field {
170 let arrow_field = if let Some(dictionary) = field.dictionary() {
171 #[allow(deprecated)]
172 Field::new_dict(
173 field.name().unwrap(),
174 get_data_type(field, true),
175 field.nullable(),
176 dictionary.id(),
177 dictionary.isOrdered(),
178 )
179 } else {
180 Field::new(
181 field.name().unwrap(),
182 get_data_type(field, true),
183 field.nullable(),
184 )
185 };
186
187 let mut metadata_map = HashMap::default();
188 if let Some(list) = field.custom_metadata() {
189 for kv in list {
190 if let (Some(k), Some(v)) = (kv.key(), kv.value()) {
191 metadata_map.insert(k.to_string(), v.to_string());
192 }
193 }
194 }
195
196 arrow_field.with_metadata(metadata_map)
197 }
198}
199
200pub fn fb_to_schema(fb: crate::Schema) -> Schema {
202 let mut fields: Vec<Field> = vec![];
203 let c_fields = fb.fields().unwrap();
204 let len = c_fields.len();
205 for i in 0..len {
206 let c_field: crate::Field = c_fields.get(i);
207 match c_field.type_type() {
208 crate::Type::Decimal if fb.endianness() == crate::Endianness::Big => {
209 unimplemented!("Big Endian is not supported for Decimal!")
210 }
211 _ => (),
212 };
213 fields.push(c_field.into());
214 }
215
216 let mut metadata: HashMap<String, String> = HashMap::default();
217 if let Some(md_fields) = fb.custom_metadata() {
218 let len = md_fields.len();
219 for i in 0..len {
220 let kv = md_fields.get(i);
221 let k_str = kv.key();
222 let v_str = kv.value();
223 if let Some(k) = k_str {
224 if let Some(v) = v_str {
225 metadata.insert(k.to_string(), v.to_string());
226 }
227 }
228 }
229 }
230 Schema::new_with_metadata(fields, metadata)
231}
232
233pub fn try_schema_from_flatbuffer_bytes(bytes: &[u8]) -> Result<Schema, ArrowError> {
235 if let Ok(ipc) = crate::root_as_message(bytes) {
236 if let Some(schema) = ipc.header_as_schema().map(fb_to_schema) {
237 Ok(schema)
238 } else {
239 Err(ArrowError::ParseError(
240 "Unable to get head as schema".to_string(),
241 ))
242 }
243 } else {
244 Err(ArrowError::ParseError(
245 "Unable to get root as message".to_string(),
246 ))
247 }
248}
249
250pub fn try_schema_from_ipc_buffer(buffer: &[u8]) -> Result<Schema, ArrowError> {
252 if buffer.len() < 4 {
262 return Err(ArrowError::ParseError(
263 "The buffer length is less than 4 and missing the continuation marker or length of buffer".to_string()
264 ));
265 }
266
267 let (len, buffer) = if buffer[..4] == CONTINUATION_MARKER {
268 if buffer.len() < 8 {
269 return Err(ArrowError::ParseError(
270 "The buffer length is less than 8 and missing the length of buffer".to_string(),
271 ));
272 }
273 buffer[4..].split_at(4)
274 } else {
275 buffer.split_at(4)
276 };
277
278 let len = <i32>::from_le_bytes(len.try_into().unwrap());
279 if len < 0 {
280 return Err(ArrowError::ParseError(format!(
281 "The encapsulated message's reported length is negative ({len})"
282 )));
283 }
284
285 if buffer.len() < len as usize {
286 let actual_len = buffer.len();
287 return Err(ArrowError::ParseError(
288 format!("The buffer length ({actual_len}) is less than the encapsulated message's reported length ({len})")
289 ));
290 }
291
292 let msg = crate::root_as_message(buffer)
293 .map_err(|err| ArrowError::ParseError(format!("Unable to get root as message: {err:?}")))?;
294 let ipc_schema = msg.header_as_schema().ok_or_else(|| {
295 ArrowError::ParseError("Unable to convert flight info to a schema".to_string())
296 })?;
297 Ok(fb_to_schema(ipc_schema))
298}
299
300pub(crate) fn get_data_type(field: crate::Field, may_be_dictionary: bool) -> DataType {
302 if let Some(dictionary) = field.dictionary() {
303 if may_be_dictionary {
304 let int = dictionary.indexType().unwrap();
305 let index_type = match (int.bitWidth(), int.is_signed()) {
306 (8, true) => DataType::Int8,
307 (8, false) => DataType::UInt8,
308 (16, true) => DataType::Int16,
309 (16, false) => DataType::UInt16,
310 (32, true) => DataType::Int32,
311 (32, false) => DataType::UInt32,
312 (64, true) => DataType::Int64,
313 (64, false) => DataType::UInt64,
314 _ => panic!("Unexpected bitwidth and signed"),
315 };
316 return DataType::Dictionary(
317 Box::new(index_type),
318 Box::new(get_data_type(field, false)),
319 );
320 }
321 }
322
323 match field.type_type() {
324 crate::Type::Null => DataType::Null,
325 crate::Type::Bool => DataType::Boolean,
326 crate::Type::Int => {
327 let int = field.type_as_int().unwrap();
328 match (int.bitWidth(), int.is_signed()) {
329 (8, true) => DataType::Int8,
330 (8, false) => DataType::UInt8,
331 (16, true) => DataType::Int16,
332 (16, false) => DataType::UInt16,
333 (32, true) => DataType::Int32,
334 (32, false) => DataType::UInt32,
335 (64, true) => DataType::Int64,
336 (64, false) => DataType::UInt64,
337 z => panic!(
338 "Int type with bit width of {} and signed of {} not supported",
339 z.0, z.1
340 ),
341 }
342 }
343 crate::Type::Binary => DataType::Binary,
344 crate::Type::BinaryView => DataType::BinaryView,
345 crate::Type::LargeBinary => DataType::LargeBinary,
346 crate::Type::Utf8 => DataType::Utf8,
347 crate::Type::Utf8View => DataType::Utf8View,
348 crate::Type::LargeUtf8 => DataType::LargeUtf8,
349 crate::Type::FixedSizeBinary => {
350 let fsb = field.type_as_fixed_size_binary().unwrap();
351 DataType::FixedSizeBinary(fsb.byteWidth())
352 }
353 crate::Type::FloatingPoint => {
354 let float = field.type_as_floating_point().unwrap();
355 match float.precision() {
356 crate::Precision::HALF => DataType::Float16,
357 crate::Precision::SINGLE => DataType::Float32,
358 crate::Precision::DOUBLE => DataType::Float64,
359 z => panic!("FloatingPoint type with precision of {z:?} not supported"),
360 }
361 }
362 crate::Type::Date => {
363 let date = field.type_as_date().unwrap();
364 match date.unit() {
365 crate::DateUnit::DAY => DataType::Date32,
366 crate::DateUnit::MILLISECOND => DataType::Date64,
367 z => panic!("Date type with unit of {z:?} not supported"),
368 }
369 }
370 crate::Type::Time => {
371 let time = field.type_as_time().unwrap();
372 match (time.bitWidth(), time.unit()) {
373 (32, crate::TimeUnit::SECOND) => DataType::Time32(TimeUnit::Second),
374 (32, crate::TimeUnit::MILLISECOND) => DataType::Time32(TimeUnit::Millisecond),
375 (64, crate::TimeUnit::MICROSECOND) => DataType::Time64(TimeUnit::Microsecond),
376 (64, crate::TimeUnit::NANOSECOND) => DataType::Time64(TimeUnit::Nanosecond),
377 z => panic!(
378 "Time type with bit width of {} and unit of {:?} not supported",
379 z.0, z.1
380 ),
381 }
382 }
383 crate::Type::Timestamp => {
384 let timestamp = field.type_as_timestamp().unwrap();
385 let timezone: Option<_> = timestamp.timezone().map(|tz| tz.into());
386 match timestamp.unit() {
387 crate::TimeUnit::SECOND => DataType::Timestamp(TimeUnit::Second, timezone),
388 crate::TimeUnit::MILLISECOND => {
389 DataType::Timestamp(TimeUnit::Millisecond, timezone)
390 }
391 crate::TimeUnit::MICROSECOND => {
392 DataType::Timestamp(TimeUnit::Microsecond, timezone)
393 }
394 crate::TimeUnit::NANOSECOND => DataType::Timestamp(TimeUnit::Nanosecond, timezone),
395 z => panic!("Timestamp type with unit of {z:?} not supported"),
396 }
397 }
398 crate::Type::Interval => {
399 let interval = field.type_as_interval().unwrap();
400 match interval.unit() {
401 crate::IntervalUnit::YEAR_MONTH => DataType::Interval(IntervalUnit::YearMonth),
402 crate::IntervalUnit::DAY_TIME => DataType::Interval(IntervalUnit::DayTime),
403 crate::IntervalUnit::MONTH_DAY_NANO => {
404 DataType::Interval(IntervalUnit::MonthDayNano)
405 }
406 z => panic!("Interval type with unit of {z:?} unsupported"),
407 }
408 }
409 crate::Type::Duration => {
410 let duration = field.type_as_duration().unwrap();
411 match duration.unit() {
412 crate::TimeUnit::SECOND => DataType::Duration(TimeUnit::Second),
413 crate::TimeUnit::MILLISECOND => DataType::Duration(TimeUnit::Millisecond),
414 crate::TimeUnit::MICROSECOND => DataType::Duration(TimeUnit::Microsecond),
415 crate::TimeUnit::NANOSECOND => DataType::Duration(TimeUnit::Nanosecond),
416 z => panic!("Duration type with unit of {z:?} unsupported"),
417 }
418 }
419 crate::Type::List => {
420 let children = field.children().unwrap();
421 if children.len() != 1 {
422 panic!("expect a list to have one child")
423 }
424 DataType::List(Arc::new(children.get(0).into()))
425 }
426 crate::Type::LargeList => {
427 let children = field.children().unwrap();
428 if children.len() != 1 {
429 panic!("expect a large list to have one child")
430 }
431 DataType::LargeList(Arc::new(children.get(0).into()))
432 }
433 crate::Type::FixedSizeList => {
434 let children = field.children().unwrap();
435 if children.len() != 1 {
436 panic!("expect a list to have one child")
437 }
438 let fsl = field.type_as_fixed_size_list().unwrap();
439 DataType::FixedSizeList(Arc::new(children.get(0).into()), fsl.listSize())
440 }
441 crate::Type::Struct_ => {
442 let fields = match field.children() {
443 Some(children) => children.iter().map(Field::from).collect(),
444 None => Fields::empty(),
445 };
446 DataType::Struct(fields)
447 }
448 crate::Type::RunEndEncoded => {
449 let children = field.children().unwrap();
450 if children.len() != 2 {
451 panic!(
452 "RunEndEncoded type should have exactly two children. Found {}",
453 children.len()
454 )
455 }
456 let run_ends_field = children.get(0).into();
457 let values_field = children.get(1).into();
458 DataType::RunEndEncoded(Arc::new(run_ends_field), Arc::new(values_field))
459 }
460 crate::Type::Map => {
461 let map = field.type_as_map().unwrap();
462 let children = field.children().unwrap();
463 if children.len() != 1 {
464 panic!("expect a map to have one child")
465 }
466 DataType::Map(Arc::new(children.get(0).into()), map.keysSorted())
467 }
468 crate::Type::Decimal => {
469 let fsb = field.type_as_decimal().unwrap();
470 let bit_width = fsb.bitWidth();
471 let precision: u8 = fsb.precision().try_into().unwrap();
472 let scale: i8 = fsb.scale().try_into().unwrap();
473 match bit_width {
474 128 => DataType::Decimal128(precision, scale),
475 256 => DataType::Decimal256(precision, scale),
476 _ => panic!("Unexpected decimal bit width {bit_width}"),
477 }
478 }
479 crate::Type::Union => {
480 let union = field.type_as_union().unwrap();
481
482 let union_mode = match union.mode() {
483 crate::UnionMode::Dense => UnionMode::Dense,
484 crate::UnionMode::Sparse => UnionMode::Sparse,
485 mode => panic!("Unexpected union mode: {mode:?}"),
486 };
487
488 let mut fields = vec![];
489 if let Some(children) = field.children() {
490 for i in 0..children.len() {
491 fields.push(Field::from(children.get(i)));
492 }
493 };
494
495 let fields = match union.typeIds() {
496 None => UnionFields::new(0_i8..fields.len() as i8, fields),
497 Some(ids) => UnionFields::new(ids.iter().map(|i| i as i8), fields),
498 };
499
500 DataType::Union(fields, union_mode)
501 }
502 t => unimplemented!("Type {:?} not supported", t),
503 }
504}
505
506pub(crate) struct FBFieldType<'b> {
507 pub(crate) type_type: crate::Type,
508 pub(crate) type_: WIPOffset<UnionWIPOffset>,
509 pub(crate) children: Option<WIPOffset<Vector<'b, ForwardsUOffset<crate::Field<'b>>>>>,
510}
511
512pub(crate) fn build_field<'a>(
514 fbb: &mut FlatBufferBuilder<'a>,
515 dictionary_tracker: &mut Option<&mut DictionaryTracker>,
516 field: &Field,
517) -> WIPOffset<crate::Field<'a>> {
518 let mut fb_metadata = None;
520 if !field.metadata().is_empty() {
521 fb_metadata = Some(metadata_to_fb(fbb, field.metadata()));
522 };
523
524 let fb_field_name = fbb.create_string(field.name().as_str());
525 let field_type = get_fb_field_type(field.data_type(), dictionary_tracker, fbb);
526
527 let fb_dictionary = if let Dictionary(index_type, _) = field.data_type() {
528 match dictionary_tracker {
529 Some(tracker) => Some(get_fb_dictionary(
530 index_type,
531 #[allow(deprecated)]
532 tracker.set_dict_id(field),
533 field
534 .dict_is_ordered()
535 .expect("All Dictionary types have `dict_is_ordered`"),
536 fbb,
537 )),
538 None => Some(get_fb_dictionary(
539 index_type,
540 #[allow(deprecated)]
541 field
542 .dict_id()
543 .expect("Dictionary type must have a dictionary id"),
544 field
545 .dict_is_ordered()
546 .expect("All Dictionary types have `dict_is_ordered`"),
547 fbb,
548 )),
549 }
550 } else {
551 None
552 };
553
554 let mut field_builder = crate::FieldBuilder::new(fbb);
555 field_builder.add_name(fb_field_name);
556 if let Some(dictionary) = fb_dictionary {
557 field_builder.add_dictionary(dictionary)
558 }
559 field_builder.add_type_type(field_type.type_type);
560 field_builder.add_nullable(field.is_nullable());
561 match field_type.children {
562 None => {}
563 Some(children) => field_builder.add_children(children),
564 };
565 field_builder.add_type_(field_type.type_);
566
567 if let Some(fb_metadata) = fb_metadata {
568 field_builder.add_custom_metadata(fb_metadata);
569 }
570
571 field_builder.finish()
572}
573
/// Builds the flatbuffer type description (union discriminant, type table and child
/// vector) for `data_type`.
///
/// Nested fields (list items, struct/union members, map entries, run-end-encoded
/// children) are serialized recursively through `build_field`, sharing
/// `dictionary_tracker`. A `Dictionary` type is described by its value type; the
/// dictionary encoding itself is attached by `build_field`.
///
/// Each arm either creates its child vector before opening the type table's builder, or
/// finishes the table before the child vector expression in the struct literal runs. A
/// table builder holds `fbb` mutably until `finish()`.
///
/// # Panics
///
/// `ListView` and `LargeListView` are not implemented and panic.
pub(crate) fn get_fb_field_type<'a>(
    data_type: &DataType,
    dictionary_tracker: &mut Option<&mut DictionaryTracker>,
    fbb: &mut FlatBufferBuilder<'a>,
) -> FBFieldType<'a> {
    // Shared source for the empty child vectors of leaf types.
    let empty_fields: Vec<WIPOffset<crate::Field>> = vec![];
    match data_type {
        Null => FBFieldType {
            type_type: crate::Type::Null,
            type_: crate::NullBuilder::new(fbb).finish().as_union_value(),
            children: Some(fbb.create_vector(&empty_fields[..])),
        },
        Boolean => FBFieldType {
            type_type: crate::Type::Bool,
            type_: crate::BoolBuilder::new(fbb).finish().as_union_value(),
            children: Some(fbb.create_vector(&empty_fields[..])),
        },
        UInt8 | UInt16 | UInt32 | UInt64 => {
            // Created before the IntBuilder borrows `fbb`.
            let children = fbb.create_vector(&empty_fields[..]);
            let mut builder = crate::IntBuilder::new(fbb);
            builder.add_is_signed(false);
            match data_type {
                UInt8 => builder.add_bitWidth(8),
                UInt16 => builder.add_bitWidth(16),
                UInt32 => builder.add_bitWidth(32),
                UInt64 => builder.add_bitWidth(64),
                // Unreachable: the outer arm only matches the four unsigned types.
                _ => {}
            };
            FBFieldType {
                type_type: crate::Type::Int,
                type_: builder.finish().as_union_value(),
                children: Some(children),
            }
        }
        Int8 | Int16 | Int32 | Int64 => {
            let children = fbb.create_vector(&empty_fields[..]);
            let mut builder = crate::IntBuilder::new(fbb);
            builder.add_is_signed(true);
            match data_type {
                Int8 => builder.add_bitWidth(8),
                Int16 => builder.add_bitWidth(16),
                Int32 => builder.add_bitWidth(32),
                Int64 => builder.add_bitWidth(64),
                // Unreachable: the outer arm only matches the four signed types.
                _ => {}
            };
            FBFieldType {
                type_type: crate::Type::Int,
                type_: builder.finish().as_union_value(),
                children: Some(children),
            }
        }
        Float16 | Float32 | Float64 => {
            let children = fbb.create_vector(&empty_fields[..]);
            let mut builder = crate::FloatingPointBuilder::new(fbb);
            match data_type {
                Float16 => builder.add_precision(crate::Precision::HALF),
                Float32 => builder.add_precision(crate::Precision::SINGLE),
                Float64 => builder.add_precision(crate::Precision::DOUBLE),
                // Unreachable: the outer arm only matches the three float types.
                _ => {}
            };
            FBFieldType {
                type_type: crate::Type::FloatingPoint,
                type_: builder.finish().as_union_value(),
                children: Some(children),
            }
        }
        Binary => FBFieldType {
            type_type: crate::Type::Binary,
            type_: crate::BinaryBuilder::new(fbb).finish().as_union_value(),
            children: Some(fbb.create_vector(&empty_fields[..])),
        },
        LargeBinary => FBFieldType {
            type_type: crate::Type::LargeBinary,
            type_: crate::LargeBinaryBuilder::new(fbb)
                .finish()
                .as_union_value(),
            children: Some(fbb.create_vector(&empty_fields[..])),
        },
        BinaryView => FBFieldType {
            type_type: crate::Type::BinaryView,
            type_: crate::BinaryViewBuilder::new(fbb).finish().as_union_value(),
            children: Some(fbb.create_vector(&empty_fields[..])),
        },
        Utf8View => FBFieldType {
            type_type: crate::Type::Utf8View,
            type_: crate::Utf8ViewBuilder::new(fbb).finish().as_union_value(),
            children: Some(fbb.create_vector(&empty_fields[..])),
        },
        Utf8 => FBFieldType {
            type_type: crate::Type::Utf8,
            type_: crate::Utf8Builder::new(fbb).finish().as_union_value(),
            children: Some(fbb.create_vector(&empty_fields[..])),
        },
        LargeUtf8 => FBFieldType {
            type_type: crate::Type::LargeUtf8,
            type_: crate::LargeUtf8Builder::new(fbb).finish().as_union_value(),
            children: Some(fbb.create_vector(&empty_fields[..])),
        },
        FixedSizeBinary(len) => {
            let mut builder = crate::FixedSizeBinaryBuilder::new(fbb);
            builder.add_byteWidth(*len);
            FBFieldType {
                type_type: crate::Type::FixedSizeBinary,
                type_: builder.finish().as_union_value(),
                children: Some(fbb.create_vector(&empty_fields[..])),
            }
        }
        Date32 => {
            let mut builder = crate::DateBuilder::new(fbb);
            builder.add_unit(crate::DateUnit::DAY);
            FBFieldType {
                type_type: crate::Type::Date,
                type_: builder.finish().as_union_value(),
                children: Some(fbb.create_vector(&empty_fields[..])),
            }
        }
        Date64 => {
            let mut builder = crate::DateBuilder::new(fbb);
            builder.add_unit(crate::DateUnit::MILLISECOND);
            FBFieldType {
                type_type: crate::Type::Date,
                type_: builder.finish().as_union_value(),
                children: Some(fbb.create_vector(&empty_fields[..])),
            }
        }
        Time32(unit) | Time64(unit) => {
            let mut builder = crate::TimeBuilder::new(fbb);
            // The bit width is derived from the unit (s/ms -> 32, us/ns -> 64), not from
            // which of Time32/Time64 was matched.
            match unit {
                TimeUnit::Second => {
                    builder.add_bitWidth(32);
                    builder.add_unit(crate::TimeUnit::SECOND);
                }
                TimeUnit::Millisecond => {
                    builder.add_bitWidth(32);
                    builder.add_unit(crate::TimeUnit::MILLISECOND);
                }
                TimeUnit::Microsecond => {
                    builder.add_bitWidth(64);
                    builder.add_unit(crate::TimeUnit::MICROSECOND);
                }
                TimeUnit::Nanosecond => {
                    builder.add_bitWidth(64);
                    builder.add_unit(crate::TimeUnit::NANOSECOND);
                }
            }
            FBFieldType {
                type_type: crate::Type::Time,
                type_: builder.finish().as_union_value(),
                children: Some(fbb.create_vector(&empty_fields[..])),
            }
        }
        Timestamp(unit, tz) => {
            // The timezone string is always written (it must precede the table), but it
            // is only attached to the table when non-empty.
            let tz = tz.as_deref().unwrap_or_default();
            let tz_str = fbb.create_string(tz);
            let mut builder = crate::TimestampBuilder::new(fbb);
            let time_unit = match unit {
                TimeUnit::Second => crate::TimeUnit::SECOND,
                TimeUnit::Millisecond => crate::TimeUnit::MILLISECOND,
                TimeUnit::Microsecond => crate::TimeUnit::MICROSECOND,
                TimeUnit::Nanosecond => crate::TimeUnit::NANOSECOND,
            };
            builder.add_unit(time_unit);
            if !tz.is_empty() {
                builder.add_timezone(tz_str);
            }
            FBFieldType {
                type_type: crate::Type::Timestamp,
                type_: builder.finish().as_union_value(),
                children: Some(fbb.create_vector(&empty_fields[..])),
            }
        }
        Interval(unit) => {
            let mut builder = crate::IntervalBuilder::new(fbb);
            let interval_unit = match unit {
                IntervalUnit::YearMonth => crate::IntervalUnit::YEAR_MONTH,
                IntervalUnit::DayTime => crate::IntervalUnit::DAY_TIME,
                IntervalUnit::MonthDayNano => crate::IntervalUnit::MONTH_DAY_NANO,
            };
            builder.add_unit(interval_unit);
            FBFieldType {
                type_type: crate::Type::Interval,
                type_: builder.finish().as_union_value(),
                children: Some(fbb.create_vector(&empty_fields[..])),
            }
        }
        Duration(unit) => {
            let mut builder = crate::DurationBuilder::new(fbb);
            let time_unit = match unit {
                TimeUnit::Second => crate::TimeUnit::SECOND,
                TimeUnit::Millisecond => crate::TimeUnit::MILLISECOND,
                TimeUnit::Microsecond => crate::TimeUnit::MICROSECOND,
                TimeUnit::Nanosecond => crate::TimeUnit::NANOSECOND,
            };
            builder.add_unit(time_unit);
            FBFieldType {
                type_type: crate::Type::Duration,
                type_: builder.finish().as_union_value(),
                children: Some(fbb.create_vector(&empty_fields[..])),
            }
        }
        List(ref list_type) => {
            // The item field is serialized before the List table is opened.
            let child = build_field(fbb, dictionary_tracker, list_type);
            FBFieldType {
                type_type: crate::Type::List,
                type_: crate::ListBuilder::new(fbb).finish().as_union_value(),
                children: Some(fbb.create_vector(&[child])),
            }
        }
        ListView(_) | LargeListView(_) => unimplemented!("ListView/LargeListView not implemented"),
        LargeList(ref list_type) => {
            let child = build_field(fbb, dictionary_tracker, list_type);
            FBFieldType {
                type_type: crate::Type::LargeList,
                type_: crate::LargeListBuilder::new(fbb).finish().as_union_value(),
                children: Some(fbb.create_vector(&[child])),
            }
        }
        FixedSizeList(ref list_type, len) => {
            let child = build_field(fbb, dictionary_tracker, list_type);
            let mut builder = crate::FixedSizeListBuilder::new(fbb);
            builder.add_listSize(*len);
            FBFieldType {
                type_type: crate::Type::FixedSizeList,
                type_: builder.finish().as_union_value(),
                children: Some(fbb.create_vector(&[child])),
            }
        }
        Struct(fields) => {
            let mut children = vec![];
            // Members are serialized in declaration order.
            for field in fields {
                children.push(build_field(fbb, dictionary_tracker, field));
            }
            FBFieldType {
                type_type: crate::Type::Struct_,
                type_: crate::Struct_Builder::new(fbb).finish().as_union_value(),
                children: Some(fbb.create_vector(&children[..])),
            }
        }
        RunEndEncoded(run_ends, values) => {
            // Children are stored as [run_ends, values].
            let run_ends_field = build_field(fbb, dictionary_tracker, run_ends);
            let values_field = build_field(fbb, dictionary_tracker, values);
            let children = [run_ends_field, values_field];
            FBFieldType {
                type_type: crate::Type::RunEndEncoded,
                type_: crate::RunEndEncodedBuilder::new(fbb)
                    .finish()
                    .as_union_value(),
                children: Some(fbb.create_vector(&children[..])),
            }
        }
        Map(map_field, keys_sorted) => {
            let child = build_field(fbb, dictionary_tracker, map_field);
            let mut field_type = crate::MapBuilder::new(fbb);
            field_type.add_keysSorted(*keys_sorted);
            FBFieldType {
                type_type: crate::Type::Map,
                type_: field_type.finish().as_union_value(),
                children: Some(fbb.create_vector(&[child])),
            }
        }
        Dictionary(_, value_type) => {
            // The field type is the dictionary's value type; the index type and id go
            // into the DictionaryEncoding written by `build_field`.
            get_fb_field_type(value_type, dictionary_tracker, fbb)
        }
        Decimal128(precision, scale) => {
            let mut builder = crate::DecimalBuilder::new(fbb);
            builder.add_precision(*precision as i32);
            builder.add_scale(*scale as i32);
            builder.add_bitWidth(128);
            FBFieldType {
                type_type: crate::Type::Decimal,
                type_: builder.finish().as_union_value(),
                children: Some(fbb.create_vector(&empty_fields[..])),
            }
        }
        Decimal256(precision, scale) => {
            let mut builder = crate::DecimalBuilder::new(fbb);
            builder.add_precision(*precision as i32);
            builder.add_scale(*scale as i32);
            builder.add_bitWidth(256);
            FBFieldType {
                type_type: crate::Type::Decimal,
                type_: builder.finish().as_union_value(),
                children: Some(fbb.create_vector(&empty_fields[..])),
            }
        }
        Union(fields, mode) => {
            let mut children = vec![];
            for (_, field) in fields.iter() {
                children.push(build_field(fbb, dictionary_tracker, field));
            }

            let union_mode = match mode {
                UnionMode::Sparse => crate::UnionMode::Sparse,
                UnionMode::Dense => crate::UnionMode::Dense,
            };

            // Type ids are always written explicitly, in the same order as `children`.
            let fbb_type_ids =
                fbb.create_vector(&fields.iter().map(|(t, _)| t as i32).collect::<Vec<_>>());
            let mut builder = crate::UnionBuilder::new(fbb);
            builder.add_mode(union_mode);
            builder.add_typeIds(fbb_type_ids);

            FBFieldType {
                type_type: crate::Type::Union,
                type_: builder.finish().as_union_value(),
                children: Some(fbb.create_vector(&children[..])),
            }
        }
    }
}
891
892pub(crate) fn get_fb_dictionary<'a>(
894 index_type: &DataType,
895 dict_id: i64,
896 dict_is_ordered: bool,
897 fbb: &mut FlatBufferBuilder<'a>,
898) -> WIPOffset<crate::DictionaryEncoding<'a>> {
899 let mut index_builder = crate::IntBuilder::new(fbb);
902
903 match *index_type {
904 Int8 | Int16 | Int32 | Int64 => index_builder.add_is_signed(true),
905 UInt8 | UInt16 | UInt32 | UInt64 => index_builder.add_is_signed(false),
906 _ => {}
907 }
908
909 match *index_type {
910 Int8 | UInt8 => index_builder.add_bitWidth(8),
911 Int16 | UInt16 => index_builder.add_bitWidth(16),
912 Int32 | UInt32 => index_builder.add_bitWidth(32),
913 Int64 | UInt64 => index_builder.add_bitWidth(64),
914 _ => {}
915 }
916
917 let index_builder = index_builder.finish();
918
919 let mut builder = crate::DictionaryEncodingBuilder::new(fbb);
920 builder.add_id(dict_id);
921 builder.add_indexType(index_builder);
922 builder.add_isOrdered(dict_is_ordered);
923
924 builder.finish()
925}
926
927#[derive(Clone)]
939pub struct MessageBuffer(Buffer);
940
941impl Debug for MessageBuffer {
942 fn fmt(&self, f: &mut Formatter<'_>) -> std::fmt::Result {
943 self.as_ref().fmt(f)
944 }
945}
946
947impl MessageBuffer {
948 pub fn try_new(buf: Buffer) -> Result<Self, ArrowError> {
950 let opts = VerifierOptions::default();
951 let mut v = Verifier::new(&opts, &buf);
952 <ForwardsUOffset<Message>>::run_verifier(&mut v, 0).map_err(|err| {
953 ArrowError::ParseError(format!("Unable to get root as message: {err:?}"))
954 })?;
955 Ok(Self(buf))
956 }
957
958 #[inline]
960 pub fn as_ref(&self) -> Message<'_> {
961 unsafe { crate::root_as_message_unchecked(&self.0) }
963 }
964}
965
#[cfg(test)]
mod tests {
    use super::*;

    /// Round-trips a schema covering many data types (nested structs/unions/lists,
    /// dictionaries, decimals, field and schema metadata) through `IpcSchemaEncoder` and
    /// `fb_to_schema`, and checks that the result equals the input.
    #[test]
    fn convert_schema_round_trip() {
        let md: HashMap<String, String> = [("Key".to_string(), "value".to_string())]
            .iter()
            .cloned()
            .collect();
        let field_md: HashMap<String, String> = [("k".to_string(), "v".to_string())]
            .iter()
            .cloned()
            .collect();
        let schema = Schema::new_with_metadata(
            vec![
                Field::new("uint8", DataType::UInt8, false).with_metadata(field_md),
                Field::new("uint16", DataType::UInt16, true),
                Field::new("uint32", DataType::UInt32, false),
                Field::new("uint64", DataType::UInt64, true),
                Field::new("int8", DataType::Int8, true),
                Field::new("int16", DataType::Int16, false),
                Field::new("int32", DataType::Int32, true),
                Field::new("int64", DataType::Int64, false),
                Field::new("float16", DataType::Float16, true),
                Field::new("float32", DataType::Float32, false),
                Field::new("float64", DataType::Float64, true),
                Field::new("null", DataType::Null, false),
                Field::new("bool", DataType::Boolean, false),
                Field::new("date32", DataType::Date32, false),
                Field::new("date64", DataType::Date64, true),
                Field::new("time32[s]", DataType::Time32(TimeUnit::Second), true),
                Field::new("time32[ms]", DataType::Time32(TimeUnit::Millisecond), false),
                Field::new("time64[us]", DataType::Time64(TimeUnit::Microsecond), false),
                Field::new("time64[ns]", DataType::Time64(TimeUnit::Nanosecond), true),
                Field::new(
                    "timestamp[s]",
                    DataType::Timestamp(TimeUnit::Second, None),
                    false,
                ),
                Field::new(
                    "timestamp[ms]",
                    DataType::Timestamp(TimeUnit::Millisecond, None),
                    true,
                ),
                Field::new(
                    "timestamp[us]",
                    DataType::Timestamp(TimeUnit::Microsecond, Some("Africa/Johannesburg".into())),
                    false,
                ),
                Field::new(
                    "timestamp[ns]",
                    DataType::Timestamp(TimeUnit::Nanosecond, None),
                    true,
                ),
                Field::new(
                    "interval[ym]",
                    DataType::Interval(IntervalUnit::YearMonth),
                    true,
                ),
                Field::new(
                    "interval[dt]",
                    DataType::Interval(IntervalUnit::DayTime),
                    true,
                ),
                Field::new(
                    "interval[mdn]",
                    DataType::Interval(IntervalUnit::MonthDayNano),
                    true,
                ),
                Field::new("utf8", DataType::Utf8, false),
                Field::new("utf8_view", DataType::Utf8View, false),
                Field::new("binary", DataType::Binary, false),
                Field::new("binary_view", DataType::BinaryView, false),
                Field::new_list(
                    "list[u8]",
                    Field::new_list_field(DataType::UInt8, false),
                    true,
                ),
                Field::new_fixed_size_list(
                    "fixed_size_list[u8]",
                    Field::new_list_field(DataType::UInt8, false),
                    2,
                    true,
                ),
                Field::new_list(
                    "list[struct<float32, int32, bool>]",
                    Field::new_struct(
                        "struct",
                        vec![
                            Field::new("float32", UInt8, false),
                            Field::new("int32", Int32, true),
                            Field::new("bool", Boolean, true),
                        ],
                        true,
                    ),
                    false,
                ),
                Field::new_struct(
                    "struct<dictionary<int32, utf8>>",
                    vec![Field::new(
                        "dictionary<int32, utf8>",
                        Dictionary(Box::new(DataType::Int32), Box::new(DataType::Utf8)),
                        false,
                    )],
                    false,
                ),
                Field::new_struct(
                    "struct<int64, list[struct<date32, list[struct<>]>]>",
                    vec![
                        Field::new("int64", DataType::Int64, true),
                        Field::new_list(
                            "list[struct<date32, list[struct<>]>]",
                            Field::new_struct(
                                "struct",
                                vec![
                                    Field::new("date32", DataType::Date32, true),
                                    Field::new_list(
                                        "list[struct<>]",
                                        Field::new(
                                            "struct",
                                            DataType::Struct(Fields::empty()),
                                            false,
                                        ),
                                        false,
                                    ),
                                ],
                                false,
                            ),
                            false,
                        ),
                    ],
                    false,
                ),
                Field::new_union(
                    "union<int64, list[union<date32, list[union<>]>]>",
                    vec![0, 1],
                    vec![
                        Field::new("int64", DataType::Int64, true),
                        Field::new_list(
                            "list[union<date32, list[union<>]>]",
                            Field::new_union(
                                "union<date32, list[union<>]>",
                                vec![0, 1],
                                vec![
                                    Field::new("date32", DataType::Date32, true),
                                    Field::new_list(
                                        "list[union<>]",
                                        Field::new(
                                            "union",
                                            DataType::Union(
                                                UnionFields::empty(),
                                                UnionMode::Sparse,
                                            ),
                                            false,
                                        ),
                                        false,
                                    ),
                                ],
                                UnionMode::Dense,
                            ),
                            false,
                        ),
                    ],
                    UnionMode::Sparse,
                ),
                Field::new("struct<>", DataType::Struct(Fields::empty()), true),
                Field::new(
                    "union<>",
                    DataType::Union(UnionFields::empty(), UnionMode::Dense),
                    true,
                ),
                Field::new(
                    "union<>",
                    DataType::Union(UnionFields::empty(), UnionMode::Sparse),
                    true,
                ),
                Field::new(
                    "union<int32, utf8>",
                    DataType::Union(
                        UnionFields::new(
                            vec![2, 3], // type ids that do not start at 0
                            vec![
                                Field::new("int32", DataType::Int32, true),
                                Field::new("utf8", DataType::Utf8, true),
                            ],
                        ),
                        UnionMode::Dense,
                    ),
                    true,
                ),
                #[allow(deprecated)]
                Field::new_dict(
                    "dictionary<int32, utf8>",
                    DataType::Dictionary(Box::new(DataType::Int32), Box::new(DataType::Utf8)),
                    true,
                    123,
                    true,
                ),
                #[allow(deprecated)]
                Field::new_dict(
                    "dictionary<uint8, uint32>",
                    DataType::Dictionary(Box::new(DataType::UInt8), Box::new(DataType::UInt32)),
                    true,
                    123,
                    true,
                ),
                Field::new("decimal<usize, usize>", DataType::Decimal128(10, 6), false),
            ],
            md,
        );

        // Dictionary ids for the dictionary fields are requested from this tracker.
        let mut dictionary_tracker = DictionaryTracker::new(true);
        let fb = IpcSchemaEncoder::new()
            .with_dictionary_tracker(&mut dictionary_tracker)
            .schema_to_fb(&schema);

        // Read the finished buffer back as a flatbuffer Schema and convert it to Arrow.
        let ipc = crate::root_as_schema(fb.finished_data()).unwrap();
        let schema2 = fb_to_schema(ipc);
        assert_eq!(schema, schema2);
    }

    /// Compares a hard-coded flatbuffer schema message with the one produced by
    /// `IpcDataGenerator` for `field1: UInt32` (non-nullable). The two must agree on
    /// metadata, endianness, features, message fields and the decoded Arrow schema.
    #[test]
    fn schema_from_bytes() {
        // Raw flatbuffer `Message` bytes (no continuation marker or length prefix), as
        // they are passed directly to `root_as_message`; the field name "field1" is
        // visible as bytes 102, 105, 101, 108, 100, 49.
        let bytes: Vec<u8> = vec![
            16, 0, 0, 0, 0, 0, 10, 0, 12, 0, 6, 0, 5, 0, 8, 0, 10, 0, 0, 0, 0, 1, 4, 0, 12, 0, 0,
            0, 8, 0, 8, 0, 0, 0, 4, 0, 8, 0, 0, 0, 4, 0, 0, 0, 1, 0, 0, 0, 20, 0, 0, 0, 16, 0, 20,
            0, 8, 0, 0, 0, 7, 0, 12, 0, 0, 0, 16, 0, 16, 0, 0, 0, 0, 0, 0, 2, 16, 0, 0, 0, 32, 0,
            0, 0, 4, 0, 0, 0, 0, 0, 0, 0, 6, 0, 0, 0, 102, 105, 101, 108, 100, 49, 0, 0, 0, 0, 6,
            0, 8, 0, 4, 0, 6, 0, 0, 0, 32, 0, 0, 0,
        ];
        let ipc = crate::root_as_message(&bytes).unwrap();
        let schema = ipc.header_as_schema().unwrap();

        // Encode the equivalent Arrow schema with the writer for comparison.
        let data_gen = crate::writer::IpcDataGenerator::default();
        let mut dictionary_tracker = DictionaryTracker::new(true);
        let arrow_schema = Schema::new(vec![Field::new("field1", DataType::UInt32, false)]);
        let bytes = data_gen
            .schema_to_bytes_with_dictionary_tracker(
                &arrow_schema,
                &mut dictionary_tracker,
                &crate::writer::IpcWriteOptions::default(),
            )
            .ipc_message;

        let ipc2 = crate::root_as_message(&bytes).unwrap();
        let schema2 = ipc2.header_as_schema().unwrap();

        // Schema-level comparisons.
        assert!(schema.custom_metadata().is_none());
        assert!(schema2.custom_metadata().is_none());
        assert_eq!(schema.endianness(), schema2.endianness());
        assert!(schema.features().is_none());
        assert!(schema2.features().is_none());
        assert_eq!(fb_to_schema(schema), fb_to_schema(schema2));

        // Message-level comparisons.
        assert_eq!(ipc.version(), ipc2.version());
        assert_eq!(ipc.header_type(), ipc2.header_type());
        assert_eq!(ipc.bodyLength(), ipc2.bodyLength());
        assert!(ipc.custom_metadata().is_none());
        assert!(ipc2.custom_metadata().is_none());
    }
}