1use arrow_buffer::Buffer;
21use arrow_schema::*;
22use flatbuffers::{
23 FlatBufferBuilder, ForwardsUOffset, UnionWIPOffset, Vector, Verifiable, Verifier,
24 VerifierOptions, WIPOffset,
25};
26use std::collections::HashMap;
27use std::fmt::{Debug, Formatter};
28use std::sync::Arc;
29
30use crate::writer::DictionaryTracker;
31use crate::{KeyValue, Message, CONTINUATION_MARKER};
32use DataType::*;
33
/// Low-level encoder that converts an Arrow [`Schema`] into its IPC
/// flatbuffer representation.
///
/// A [`DictionaryTracker`] may be attached with
/// [`IpcSchemaEncoder::with_dictionary_tracker`]; when one is present it is
/// handed to `build_field`, which asks it for dictionary ids.
///
/// See [`fb_to_schema`] for the reverse conversion.
#[derive(Debug)]
pub struct IpcSchemaEncoder<'a> {
    /// Tracker consulted for dictionary ids; `None` until one is supplied.
    dictionary_tracker: Option<&'a mut DictionaryTracker>,
}
68
69impl Default for IpcSchemaEncoder<'_> {
70 fn default() -> Self {
71 Self::new()
72 }
73}
74
75impl<'a> IpcSchemaEncoder<'a> {
76 pub fn new() -> IpcSchemaEncoder<'a> {
78 IpcSchemaEncoder {
79 dictionary_tracker: None,
80 }
81 }
82
83 pub fn with_dictionary_tracker(
85 mut self,
86 dictionary_tracker: &'a mut DictionaryTracker,
87 ) -> Self {
88 self.dictionary_tracker = Some(dictionary_tracker);
89 self
90 }
91
92 pub fn schema_to_fb<'b>(&mut self, schema: &Schema) -> FlatBufferBuilder<'b> {
96 let mut fbb = FlatBufferBuilder::new();
97
98 let root = self.schema_to_fb_offset(&mut fbb, schema);
99
100 fbb.finish(root, None);
101
102 fbb
103 }
104
105 pub fn schema_to_fb_offset<'b>(
107 &mut self,
108 fbb: &mut FlatBufferBuilder<'b>,
109 schema: &Schema,
110 ) -> WIPOffset<crate::Schema<'b>> {
111 let fields = schema
112 .fields()
113 .iter()
114 .map(|field| build_field(fbb, &mut self.dictionary_tracker, field))
115 .collect::<Vec<_>>();
116 let fb_field_list = fbb.create_vector(&fields);
117
118 let fb_metadata_list =
119 (!schema.metadata().is_empty()).then(|| metadata_to_fb(fbb, schema.metadata()));
120
121 let mut builder = crate::SchemaBuilder::new(fbb);
122 builder.add_fields(fb_field_list);
123 if let Some(fb_metadata_list) = fb_metadata_list {
124 builder.add_custom_metadata(fb_metadata_list);
125 }
126 builder.finish()
127 }
128}
129
130#[deprecated(since = "54.0.0", note = "Use `IpcSchemaConverter`.")]
132pub fn schema_to_fb(schema: &Schema) -> FlatBufferBuilder<'_> {
133 IpcSchemaEncoder::new().schema_to_fb(schema)
134}
135
136pub fn metadata_to_fb<'a>(
138 fbb: &mut FlatBufferBuilder<'a>,
139 metadata: &HashMap<String, String>,
140) -> WIPOffset<Vector<'a, ForwardsUOffset<KeyValue<'a>>>> {
141 let custom_metadata = metadata
142 .iter()
143 .map(|(k, v)| {
144 let fb_key_name = fbb.create_string(k);
145 let fb_val_name = fbb.create_string(v);
146
147 let mut kv_builder = crate::KeyValueBuilder::new(fbb);
148 kv_builder.add_key(fb_key_name);
149 kv_builder.add_value(fb_val_name);
150 kv_builder.finish()
151 })
152 .collect::<Vec<_>>();
153 fbb.create_vector(&custom_metadata)
154}
155
156pub fn schema_to_fb_offset<'a>(
158 fbb: &mut FlatBufferBuilder<'a>,
159 schema: &Schema,
160) -> WIPOffset<crate::Schema<'a>> {
161 IpcSchemaEncoder::new().schema_to_fb_offset(fbb, schema)
162}
163
164impl From<crate::Field<'_>> for Field {
166 fn from(field: crate::Field) -> Field {
167 let arrow_field = if let Some(dictionary) = field.dictionary() {
168 #[allow(deprecated)]
169 Field::new_dict(
170 field.name().unwrap(),
171 get_data_type(field, true),
172 field.nullable(),
173 dictionary.id(),
174 dictionary.isOrdered(),
175 )
176 } else {
177 Field::new(
178 field.name().unwrap(),
179 get_data_type(field, true),
180 field.nullable(),
181 )
182 };
183
184 let mut metadata_map = HashMap::default();
185 if let Some(list) = field.custom_metadata() {
186 for kv in list {
187 if let (Some(k), Some(v)) = (kv.key(), kv.value()) {
188 metadata_map.insert(k.to_string(), v.to_string());
189 }
190 }
191 }
192
193 arrow_field.with_metadata(metadata_map)
194 }
195}
196
197pub fn fb_to_schema(fb: crate::Schema) -> Schema {
199 let mut fields: Vec<Field> = vec![];
200 let c_fields = fb.fields().unwrap();
201 let len = c_fields.len();
202 for i in 0..len {
203 let c_field: crate::Field = c_fields.get(i);
204 match c_field.type_type() {
205 crate::Type::Decimal if fb.endianness() == crate::Endianness::Big => {
206 unimplemented!("Big Endian is not supported for Decimal!")
207 }
208 _ => (),
209 };
210 fields.push(c_field.into());
211 }
212
213 let mut metadata: HashMap<String, String> = HashMap::default();
214 if let Some(md_fields) = fb.custom_metadata() {
215 let len = md_fields.len();
216 for i in 0..len {
217 let kv = md_fields.get(i);
218 let k_str = kv.key();
219 let v_str = kv.value();
220 if let Some(k) = k_str {
221 if let Some(v) = v_str {
222 metadata.insert(k.to_string(), v.to_string());
223 }
224 }
225 }
226 }
227 Schema::new_with_metadata(fields, metadata)
228}
229
230pub fn try_schema_from_flatbuffer_bytes(bytes: &[u8]) -> Result<Schema, ArrowError> {
232 if let Ok(ipc) = crate::root_as_message(bytes) {
233 if let Some(schema) = ipc.header_as_schema().map(fb_to_schema) {
234 Ok(schema)
235 } else {
236 Err(ArrowError::ParseError(
237 "Unable to get head as schema".to_string(),
238 ))
239 }
240 } else {
241 Err(ArrowError::ParseError(
242 "Unable to get root as message".to_string(),
243 ))
244 }
245}
246
247pub fn try_schema_from_ipc_buffer(buffer: &[u8]) -> Result<Schema, ArrowError> {
249 if buffer.len() < 4 {
259 return Err(ArrowError::ParseError(
260 "The buffer length is less than 4 and missing the continuation marker or length of buffer".to_string()
261 ));
262 }
263
264 let (len, buffer) = if buffer[..4] == CONTINUATION_MARKER {
265 if buffer.len() < 8 {
266 return Err(ArrowError::ParseError(
267 "The buffer length is less than 8 and missing the length of buffer".to_string(),
268 ));
269 }
270 buffer[4..].split_at(4)
271 } else {
272 buffer.split_at(4)
273 };
274
275 let len = <i32>::from_le_bytes(len.try_into().unwrap());
276 if len < 0 {
277 return Err(ArrowError::ParseError(format!(
278 "The encapsulated message's reported length is negative ({len})"
279 )));
280 }
281
282 if buffer.len() < len as usize {
283 let actual_len = buffer.len();
284 return Err(ArrowError::ParseError(
285 format!("The buffer length ({actual_len}) is less than the encapsulated message's reported length ({len})")
286 ));
287 }
288
289 let msg = crate::root_as_message(buffer)
290 .map_err(|err| ArrowError::ParseError(format!("Unable to get root as message: {err:?}")))?;
291 let ipc_schema = msg.header_as_schema().ok_or_else(|| {
292 ArrowError::ParseError("Unable to convert flight info to a schema".to_string())
293 })?;
294 Ok(fb_to_schema(ipc_schema))
295}
296
297pub(crate) fn get_data_type(field: crate::Field, may_be_dictionary: bool) -> DataType {
299 if let Some(dictionary) = field.dictionary() {
300 if may_be_dictionary {
301 let int = dictionary.indexType().unwrap();
302 let index_type = match (int.bitWidth(), int.is_signed()) {
303 (8, true) => DataType::Int8,
304 (8, false) => DataType::UInt8,
305 (16, true) => DataType::Int16,
306 (16, false) => DataType::UInt16,
307 (32, true) => DataType::Int32,
308 (32, false) => DataType::UInt32,
309 (64, true) => DataType::Int64,
310 (64, false) => DataType::UInt64,
311 _ => panic!("Unexpected bitwidth and signed"),
312 };
313 return DataType::Dictionary(
314 Box::new(index_type),
315 Box::new(get_data_type(field, false)),
316 );
317 }
318 }
319
320 match field.type_type() {
321 crate::Type::Null => DataType::Null,
322 crate::Type::Bool => DataType::Boolean,
323 crate::Type::Int => {
324 let int = field.type_as_int().unwrap();
325 match (int.bitWidth(), int.is_signed()) {
326 (8, true) => DataType::Int8,
327 (8, false) => DataType::UInt8,
328 (16, true) => DataType::Int16,
329 (16, false) => DataType::UInt16,
330 (32, true) => DataType::Int32,
331 (32, false) => DataType::UInt32,
332 (64, true) => DataType::Int64,
333 (64, false) => DataType::UInt64,
334 z => panic!(
335 "Int type with bit width of {} and signed of {} not supported",
336 z.0, z.1
337 ),
338 }
339 }
340 crate::Type::Binary => DataType::Binary,
341 crate::Type::BinaryView => DataType::BinaryView,
342 crate::Type::LargeBinary => DataType::LargeBinary,
343 crate::Type::Utf8 => DataType::Utf8,
344 crate::Type::Utf8View => DataType::Utf8View,
345 crate::Type::LargeUtf8 => DataType::LargeUtf8,
346 crate::Type::FixedSizeBinary => {
347 let fsb = field.type_as_fixed_size_binary().unwrap();
348 DataType::FixedSizeBinary(fsb.byteWidth())
349 }
350 crate::Type::FloatingPoint => {
351 let float = field.type_as_floating_point().unwrap();
352 match float.precision() {
353 crate::Precision::HALF => DataType::Float16,
354 crate::Precision::SINGLE => DataType::Float32,
355 crate::Precision::DOUBLE => DataType::Float64,
356 z => panic!("FloatingPoint type with precision of {z:?} not supported"),
357 }
358 }
359 crate::Type::Date => {
360 let date = field.type_as_date().unwrap();
361 match date.unit() {
362 crate::DateUnit::DAY => DataType::Date32,
363 crate::DateUnit::MILLISECOND => DataType::Date64,
364 z => panic!("Date type with unit of {z:?} not supported"),
365 }
366 }
367 crate::Type::Time => {
368 let time = field.type_as_time().unwrap();
369 match (time.bitWidth(), time.unit()) {
370 (32, crate::TimeUnit::SECOND) => DataType::Time32(TimeUnit::Second),
371 (32, crate::TimeUnit::MILLISECOND) => DataType::Time32(TimeUnit::Millisecond),
372 (64, crate::TimeUnit::MICROSECOND) => DataType::Time64(TimeUnit::Microsecond),
373 (64, crate::TimeUnit::NANOSECOND) => DataType::Time64(TimeUnit::Nanosecond),
374 z => panic!(
375 "Time type with bit width of {} and unit of {:?} not supported",
376 z.0, z.1
377 ),
378 }
379 }
380 crate::Type::Timestamp => {
381 let timestamp = field.type_as_timestamp().unwrap();
382 let timezone: Option<_> = timestamp.timezone().map(|tz| tz.into());
383 match timestamp.unit() {
384 crate::TimeUnit::SECOND => DataType::Timestamp(TimeUnit::Second, timezone),
385 crate::TimeUnit::MILLISECOND => {
386 DataType::Timestamp(TimeUnit::Millisecond, timezone)
387 }
388 crate::TimeUnit::MICROSECOND => {
389 DataType::Timestamp(TimeUnit::Microsecond, timezone)
390 }
391 crate::TimeUnit::NANOSECOND => DataType::Timestamp(TimeUnit::Nanosecond, timezone),
392 z => panic!("Timestamp type with unit of {z:?} not supported"),
393 }
394 }
395 crate::Type::Interval => {
396 let interval = field.type_as_interval().unwrap();
397 match interval.unit() {
398 crate::IntervalUnit::YEAR_MONTH => DataType::Interval(IntervalUnit::YearMonth),
399 crate::IntervalUnit::DAY_TIME => DataType::Interval(IntervalUnit::DayTime),
400 crate::IntervalUnit::MONTH_DAY_NANO => {
401 DataType::Interval(IntervalUnit::MonthDayNano)
402 }
403 z => panic!("Interval type with unit of {z:?} unsupported"),
404 }
405 }
406 crate::Type::Duration => {
407 let duration = field.type_as_duration().unwrap();
408 match duration.unit() {
409 crate::TimeUnit::SECOND => DataType::Duration(TimeUnit::Second),
410 crate::TimeUnit::MILLISECOND => DataType::Duration(TimeUnit::Millisecond),
411 crate::TimeUnit::MICROSECOND => DataType::Duration(TimeUnit::Microsecond),
412 crate::TimeUnit::NANOSECOND => DataType::Duration(TimeUnit::Nanosecond),
413 z => panic!("Duration type with unit of {z:?} unsupported"),
414 }
415 }
416 crate::Type::List => {
417 let children = field.children().unwrap();
418 if children.len() != 1 {
419 panic!("expect a list to have one child")
420 }
421 DataType::List(Arc::new(children.get(0).into()))
422 }
423 crate::Type::LargeList => {
424 let children = field.children().unwrap();
425 if children.len() != 1 {
426 panic!("expect a large list to have one child")
427 }
428 DataType::LargeList(Arc::new(children.get(0).into()))
429 }
430 crate::Type::FixedSizeList => {
431 let children = field.children().unwrap();
432 if children.len() != 1 {
433 panic!("expect a list to have one child")
434 }
435 let fsl = field.type_as_fixed_size_list().unwrap();
436 DataType::FixedSizeList(Arc::new(children.get(0).into()), fsl.listSize())
437 }
438 crate::Type::Struct_ => {
439 let fields = match field.children() {
440 Some(children) => children.iter().map(Field::from).collect(),
441 None => Fields::empty(),
442 };
443 DataType::Struct(fields)
444 }
445 crate::Type::RunEndEncoded => {
446 let children = field.children().unwrap();
447 if children.len() != 2 {
448 panic!(
449 "RunEndEncoded type should have exactly two children. Found {}",
450 children.len()
451 )
452 }
453 let run_ends_field = children.get(0).into();
454 let values_field = children.get(1).into();
455 DataType::RunEndEncoded(Arc::new(run_ends_field), Arc::new(values_field))
456 }
457 crate::Type::Map => {
458 let map = field.type_as_map().unwrap();
459 let children = field.children().unwrap();
460 if children.len() != 1 {
461 panic!("expect a map to have one child")
462 }
463 DataType::Map(Arc::new(children.get(0).into()), map.keysSorted())
464 }
465 crate::Type::Decimal => {
466 let fsb = field.type_as_decimal().unwrap();
467 let bit_width = fsb.bitWidth();
468 let precision: u8 = fsb.precision().try_into().unwrap();
469 let scale: i8 = fsb.scale().try_into().unwrap();
470 match bit_width {
471 128 => DataType::Decimal128(precision, scale),
472 256 => DataType::Decimal256(precision, scale),
473 _ => panic!("Unexpected decimal bit width {bit_width}"),
474 }
475 }
476 crate::Type::Union => {
477 let union = field.type_as_union().unwrap();
478
479 let union_mode = match union.mode() {
480 crate::UnionMode::Dense => UnionMode::Dense,
481 crate::UnionMode::Sparse => UnionMode::Sparse,
482 mode => panic!("Unexpected union mode: {mode:?}"),
483 };
484
485 let mut fields = vec![];
486 if let Some(children) = field.children() {
487 for i in 0..children.len() {
488 fields.push(Field::from(children.get(i)));
489 }
490 };
491
492 let fields = match union.typeIds() {
493 None => UnionFields::new(0_i8..fields.len() as i8, fields),
494 Some(ids) => UnionFields::new(ids.iter().map(|i| i as i8), fields),
495 };
496
497 DataType::Union(fields, union_mode)
498 }
499 t => unimplemented!("Type {:?} not supported", t),
500 }
501}
502
/// The flatbuffer pieces describing one field's type, as returned by
/// `get_fb_field_type` and consumed by `build_field`.
pub(crate) struct FBFieldType<'b> {
    /// Tag identifying which flatbuffer type table `type_` refers to.
    pub(crate) type_type: crate::Type,
    /// Offset of the finished type table, converted to a union offset.
    pub(crate) type_: WIPOffset<UnionWIPOffset>,
    /// Offset of the vector of child field tables, if one was written.
    pub(crate) children: Option<WIPOffset<Vector<'b, ForwardsUOffset<crate::Field<'b>>>>>,
}
508
509pub(crate) fn build_field<'a>(
511 fbb: &mut FlatBufferBuilder<'a>,
512 dictionary_tracker: &mut Option<&mut DictionaryTracker>,
513 field: &Field,
514) -> WIPOffset<crate::Field<'a>> {
515 let mut fb_metadata = None;
517 if !field.metadata().is_empty() {
518 fb_metadata = Some(metadata_to_fb(fbb, field.metadata()));
519 };
520
521 let fb_field_name = fbb.create_string(field.name().as_str());
522 let field_type = get_fb_field_type(field.data_type(), dictionary_tracker, fbb);
523
524 let fb_dictionary = if let Dictionary(index_type, _) = field.data_type() {
525 match dictionary_tracker {
526 Some(tracker) => Some(get_fb_dictionary(
527 index_type,
528 #[allow(deprecated)]
529 tracker.set_dict_id(field),
530 field
531 .dict_is_ordered()
532 .expect("All Dictionary types have `dict_is_ordered`"),
533 fbb,
534 )),
535 None => Some(get_fb_dictionary(
536 index_type,
537 #[allow(deprecated)]
538 field
539 .dict_id()
540 .expect("Dictionary type must have a dictionary id"),
541 field
542 .dict_is_ordered()
543 .expect("All Dictionary types have `dict_is_ordered`"),
544 fbb,
545 )),
546 }
547 } else {
548 None
549 };
550
551 let mut field_builder = crate::FieldBuilder::new(fbb);
552 field_builder.add_name(fb_field_name);
553 if let Some(dictionary) = fb_dictionary {
554 field_builder.add_dictionary(dictionary)
555 }
556 field_builder.add_type_type(field_type.type_type);
557 field_builder.add_nullable(field.is_nullable());
558 match field_type.children {
559 None => {}
560 Some(children) => field_builder.add_children(children),
561 };
562 field_builder.add_type_(field_type.type_);
563
564 if let Some(fb_metadata) = fb_metadata {
565 field_builder.add_custom_metadata(fb_metadata);
566 }
567
568 field_builder.finish()
569}
570
571pub(crate) fn get_fb_field_type<'a>(
573 data_type: &DataType,
574 dictionary_tracker: &mut Option<&mut DictionaryTracker>,
575 fbb: &mut FlatBufferBuilder<'a>,
576) -> FBFieldType<'a> {
577 let empty_fields: Vec<WIPOffset<crate::Field>> = vec![];
580 match data_type {
581 Null => FBFieldType {
582 type_type: crate::Type::Null,
583 type_: crate::NullBuilder::new(fbb).finish().as_union_value(),
584 children: Some(fbb.create_vector(&empty_fields[..])),
585 },
586 Boolean => FBFieldType {
587 type_type: crate::Type::Bool,
588 type_: crate::BoolBuilder::new(fbb).finish().as_union_value(),
589 children: Some(fbb.create_vector(&empty_fields[..])),
590 },
591 UInt8 | UInt16 | UInt32 | UInt64 => {
592 let children = fbb.create_vector(&empty_fields[..]);
593 let mut builder = crate::IntBuilder::new(fbb);
594 builder.add_is_signed(false);
595 match data_type {
596 UInt8 => builder.add_bitWidth(8),
597 UInt16 => builder.add_bitWidth(16),
598 UInt32 => builder.add_bitWidth(32),
599 UInt64 => builder.add_bitWidth(64),
600 _ => {}
601 };
602 FBFieldType {
603 type_type: crate::Type::Int,
604 type_: builder.finish().as_union_value(),
605 children: Some(children),
606 }
607 }
608 Int8 | Int16 | Int32 | Int64 => {
609 let children = fbb.create_vector(&empty_fields[..]);
610 let mut builder = crate::IntBuilder::new(fbb);
611 builder.add_is_signed(true);
612 match data_type {
613 Int8 => builder.add_bitWidth(8),
614 Int16 => builder.add_bitWidth(16),
615 Int32 => builder.add_bitWidth(32),
616 Int64 => builder.add_bitWidth(64),
617 _ => {}
618 };
619 FBFieldType {
620 type_type: crate::Type::Int,
621 type_: builder.finish().as_union_value(),
622 children: Some(children),
623 }
624 }
625 Float16 | Float32 | Float64 => {
626 let children = fbb.create_vector(&empty_fields[..]);
627 let mut builder = crate::FloatingPointBuilder::new(fbb);
628 match data_type {
629 Float16 => builder.add_precision(crate::Precision::HALF),
630 Float32 => builder.add_precision(crate::Precision::SINGLE),
631 Float64 => builder.add_precision(crate::Precision::DOUBLE),
632 _ => {}
633 };
634 FBFieldType {
635 type_type: crate::Type::FloatingPoint,
636 type_: builder.finish().as_union_value(),
637 children: Some(children),
638 }
639 }
640 Binary => FBFieldType {
641 type_type: crate::Type::Binary,
642 type_: crate::BinaryBuilder::new(fbb).finish().as_union_value(),
643 children: Some(fbb.create_vector(&empty_fields[..])),
644 },
645 LargeBinary => FBFieldType {
646 type_type: crate::Type::LargeBinary,
647 type_: crate::LargeBinaryBuilder::new(fbb)
648 .finish()
649 .as_union_value(),
650 children: Some(fbb.create_vector(&empty_fields[..])),
651 },
652 BinaryView => FBFieldType {
653 type_type: crate::Type::BinaryView,
654 type_: crate::BinaryViewBuilder::new(fbb).finish().as_union_value(),
655 children: Some(fbb.create_vector(&empty_fields[..])),
656 },
657 Utf8View => FBFieldType {
658 type_type: crate::Type::Utf8View,
659 type_: crate::Utf8ViewBuilder::new(fbb).finish().as_union_value(),
660 children: Some(fbb.create_vector(&empty_fields[..])),
661 },
662 Utf8 => FBFieldType {
663 type_type: crate::Type::Utf8,
664 type_: crate::Utf8Builder::new(fbb).finish().as_union_value(),
665 children: Some(fbb.create_vector(&empty_fields[..])),
666 },
667 LargeUtf8 => FBFieldType {
668 type_type: crate::Type::LargeUtf8,
669 type_: crate::LargeUtf8Builder::new(fbb).finish().as_union_value(),
670 children: Some(fbb.create_vector(&empty_fields[..])),
671 },
672 FixedSizeBinary(len) => {
673 let mut builder = crate::FixedSizeBinaryBuilder::new(fbb);
674 builder.add_byteWidth(*len);
675 FBFieldType {
676 type_type: crate::Type::FixedSizeBinary,
677 type_: builder.finish().as_union_value(),
678 children: Some(fbb.create_vector(&empty_fields[..])),
679 }
680 }
681 Date32 => {
682 let mut builder = crate::DateBuilder::new(fbb);
683 builder.add_unit(crate::DateUnit::DAY);
684 FBFieldType {
685 type_type: crate::Type::Date,
686 type_: builder.finish().as_union_value(),
687 children: Some(fbb.create_vector(&empty_fields[..])),
688 }
689 }
690 Date64 => {
691 let mut builder = crate::DateBuilder::new(fbb);
692 builder.add_unit(crate::DateUnit::MILLISECOND);
693 FBFieldType {
694 type_type: crate::Type::Date,
695 type_: builder.finish().as_union_value(),
696 children: Some(fbb.create_vector(&empty_fields[..])),
697 }
698 }
699 Time32(unit) | Time64(unit) => {
700 let mut builder = crate::TimeBuilder::new(fbb);
701 match unit {
702 TimeUnit::Second => {
703 builder.add_bitWidth(32);
704 builder.add_unit(crate::TimeUnit::SECOND);
705 }
706 TimeUnit::Millisecond => {
707 builder.add_bitWidth(32);
708 builder.add_unit(crate::TimeUnit::MILLISECOND);
709 }
710 TimeUnit::Microsecond => {
711 builder.add_bitWidth(64);
712 builder.add_unit(crate::TimeUnit::MICROSECOND);
713 }
714 TimeUnit::Nanosecond => {
715 builder.add_bitWidth(64);
716 builder.add_unit(crate::TimeUnit::NANOSECOND);
717 }
718 }
719 FBFieldType {
720 type_type: crate::Type::Time,
721 type_: builder.finish().as_union_value(),
722 children: Some(fbb.create_vector(&empty_fields[..])),
723 }
724 }
725 Timestamp(unit, tz) => {
726 let tz = tz.as_deref().unwrap_or_default();
727 let tz_str = fbb.create_string(tz);
728 let mut builder = crate::TimestampBuilder::new(fbb);
729 let time_unit = match unit {
730 TimeUnit::Second => crate::TimeUnit::SECOND,
731 TimeUnit::Millisecond => crate::TimeUnit::MILLISECOND,
732 TimeUnit::Microsecond => crate::TimeUnit::MICROSECOND,
733 TimeUnit::Nanosecond => crate::TimeUnit::NANOSECOND,
734 };
735 builder.add_unit(time_unit);
736 if !tz.is_empty() {
737 builder.add_timezone(tz_str);
738 }
739 FBFieldType {
740 type_type: crate::Type::Timestamp,
741 type_: builder.finish().as_union_value(),
742 children: Some(fbb.create_vector(&empty_fields[..])),
743 }
744 }
745 Interval(unit) => {
746 let mut builder = crate::IntervalBuilder::new(fbb);
747 let interval_unit = match unit {
748 IntervalUnit::YearMonth => crate::IntervalUnit::YEAR_MONTH,
749 IntervalUnit::DayTime => crate::IntervalUnit::DAY_TIME,
750 IntervalUnit::MonthDayNano => crate::IntervalUnit::MONTH_DAY_NANO,
751 };
752 builder.add_unit(interval_unit);
753 FBFieldType {
754 type_type: crate::Type::Interval,
755 type_: builder.finish().as_union_value(),
756 children: Some(fbb.create_vector(&empty_fields[..])),
757 }
758 }
759 Duration(unit) => {
760 let mut builder = crate::DurationBuilder::new(fbb);
761 let time_unit = match unit {
762 TimeUnit::Second => crate::TimeUnit::SECOND,
763 TimeUnit::Millisecond => crate::TimeUnit::MILLISECOND,
764 TimeUnit::Microsecond => crate::TimeUnit::MICROSECOND,
765 TimeUnit::Nanosecond => crate::TimeUnit::NANOSECOND,
766 };
767 builder.add_unit(time_unit);
768 FBFieldType {
769 type_type: crate::Type::Duration,
770 type_: builder.finish().as_union_value(),
771 children: Some(fbb.create_vector(&empty_fields[..])),
772 }
773 }
774 List(ref list_type) => {
775 let child = build_field(fbb, dictionary_tracker, list_type);
776 FBFieldType {
777 type_type: crate::Type::List,
778 type_: crate::ListBuilder::new(fbb).finish().as_union_value(),
779 children: Some(fbb.create_vector(&[child])),
780 }
781 }
782 ListView(_) | LargeListView(_) => unimplemented!("ListView/LargeListView not implemented"),
783 LargeList(ref list_type) => {
784 let child = build_field(fbb, dictionary_tracker, list_type);
785 FBFieldType {
786 type_type: crate::Type::LargeList,
787 type_: crate::LargeListBuilder::new(fbb).finish().as_union_value(),
788 children: Some(fbb.create_vector(&[child])),
789 }
790 }
791 FixedSizeList(ref list_type, len) => {
792 let child = build_field(fbb, dictionary_tracker, list_type);
793 let mut builder = crate::FixedSizeListBuilder::new(fbb);
794 builder.add_listSize(*len);
795 FBFieldType {
796 type_type: crate::Type::FixedSizeList,
797 type_: builder.finish().as_union_value(),
798 children: Some(fbb.create_vector(&[child])),
799 }
800 }
801 Struct(fields) => {
802 let mut children = vec![];
804 for field in fields {
805 children.push(build_field(fbb, dictionary_tracker, field));
806 }
807 FBFieldType {
808 type_type: crate::Type::Struct_,
809 type_: crate::Struct_Builder::new(fbb).finish().as_union_value(),
810 children: Some(fbb.create_vector(&children[..])),
811 }
812 }
813 RunEndEncoded(run_ends, values) => {
814 let run_ends_field = build_field(fbb, dictionary_tracker, run_ends);
815 let values_field = build_field(fbb, dictionary_tracker, values);
816 let children = [run_ends_field, values_field];
817 FBFieldType {
818 type_type: crate::Type::RunEndEncoded,
819 type_: crate::RunEndEncodedBuilder::new(fbb)
820 .finish()
821 .as_union_value(),
822 children: Some(fbb.create_vector(&children[..])),
823 }
824 }
825 Map(map_field, keys_sorted) => {
826 let child = build_field(fbb, dictionary_tracker, map_field);
827 let mut field_type = crate::MapBuilder::new(fbb);
828 field_type.add_keysSorted(*keys_sorted);
829 FBFieldType {
830 type_type: crate::Type::Map,
831 type_: field_type.finish().as_union_value(),
832 children: Some(fbb.create_vector(&[child])),
833 }
834 }
835 Dictionary(_, value_type) => {
836 get_fb_field_type(value_type, dictionary_tracker, fbb)
840 }
841 Decimal128(precision, scale) => {
842 let mut builder = crate::DecimalBuilder::new(fbb);
843 builder.add_precision(*precision as i32);
844 builder.add_scale(*scale as i32);
845 builder.add_bitWidth(128);
846 FBFieldType {
847 type_type: crate::Type::Decimal,
848 type_: builder.finish().as_union_value(),
849 children: Some(fbb.create_vector(&empty_fields[..])),
850 }
851 }
852 Decimal256(precision, scale) => {
853 let mut builder = crate::DecimalBuilder::new(fbb);
854 builder.add_precision(*precision as i32);
855 builder.add_scale(*scale as i32);
856 builder.add_bitWidth(256);
857 FBFieldType {
858 type_type: crate::Type::Decimal,
859 type_: builder.finish().as_union_value(),
860 children: Some(fbb.create_vector(&empty_fields[..])),
861 }
862 }
863 Union(fields, mode) => {
864 let mut children = vec![];
865 for (_, field) in fields.iter() {
866 children.push(build_field(fbb, dictionary_tracker, field));
867 }
868
869 let union_mode = match mode {
870 UnionMode::Sparse => crate::UnionMode::Sparse,
871 UnionMode::Dense => crate::UnionMode::Dense,
872 };
873
874 let fbb_type_ids =
875 fbb.create_vector(&fields.iter().map(|(t, _)| t as i32).collect::<Vec<_>>());
876 let mut builder = crate::UnionBuilder::new(fbb);
877 builder.add_mode(union_mode);
878 builder.add_typeIds(fbb_type_ids);
879
880 FBFieldType {
881 type_type: crate::Type::Union,
882 type_: builder.finish().as_union_value(),
883 children: Some(fbb.create_vector(&children[..])),
884 }
885 }
886 }
887}
888
889pub(crate) fn get_fb_dictionary<'a>(
891 index_type: &DataType,
892 dict_id: i64,
893 dict_is_ordered: bool,
894 fbb: &mut FlatBufferBuilder<'a>,
895) -> WIPOffset<crate::DictionaryEncoding<'a>> {
896 let mut index_builder = crate::IntBuilder::new(fbb);
899
900 match *index_type {
901 Int8 | Int16 | Int32 | Int64 => index_builder.add_is_signed(true),
902 UInt8 | UInt16 | UInt32 | UInt64 => index_builder.add_is_signed(false),
903 _ => {}
904 }
905
906 match *index_type {
907 Int8 | UInt8 => index_builder.add_bitWidth(8),
908 Int16 | UInt16 => index_builder.add_bitWidth(16),
909 Int32 | UInt32 => index_builder.add_bitWidth(32),
910 Int64 | UInt64 => index_builder.add_bitWidth(64),
911 _ => {}
912 }
913
914 let index_builder = index_builder.finish();
915
916 let mut builder = crate::DictionaryEncodingBuilder::new(fbb);
917 builder.add_id(dict_id);
918 builder.add_indexType(index_builder);
919 builder.add_isOrdered(dict_is_ordered);
920
921 builder.finish()
922}
923
/// An owned [`Buffer`] holding a flatbuffer-encoded [`Message`].
///
/// `MessageBuffer::try_new` runs the flatbuffer verifier on the bytes before
/// wrapping them, and `MessageBuffer::as_ref` then reads the message without
/// verifying again.
#[derive(Clone)]
pub struct MessageBuffer(Buffer);
937
938impl Debug for MessageBuffer {
939 fn fmt(&self, f: &mut Formatter<'_>) -> std::fmt::Result {
940 self.as_ref().fmt(f)
941 }
942}
943
944impl MessageBuffer {
945 pub fn try_new(buf: Buffer) -> Result<Self, ArrowError> {
947 let opts = VerifierOptions::default();
948 let mut v = Verifier::new(&opts, &buf);
949 <ForwardsUOffset<Message>>::run_verifier(&mut v, 0).map_err(|err| {
950 ArrowError::ParseError(format!("Unable to get root as message: {err:?}"))
951 })?;
952 Ok(Self(buf))
953 }
954
955 #[inline]
957 pub fn as_ref(&self) -> Message<'_> {
958 unsafe { crate::root_as_message_unchecked(&self.0) }
960 }
961}
962
#[cfg(test)]
mod tests {
    use super::*;

    /// Builds the schema used by the round-trip test: one field per supported
    /// data type plus nested, dictionary and union combinations.
    fn round_trip_schema() -> Schema {
        let schema_metadata = HashMap::from([("Key".to_string(), "value".to_string())]);
        let field_metadata = HashMap::from([("k".to_string(), "v".to_string())]);

        Schema::new_with_metadata(
            vec![
                Field::new("uint8", DataType::UInt8, false).with_metadata(field_metadata),
                Field::new("uint16", DataType::UInt16, true),
                Field::new("uint32", DataType::UInt32, false),
                Field::new("uint64", DataType::UInt64, true),
                Field::new("int8", DataType::Int8, true),
                Field::new("int16", DataType::Int16, false),
                Field::new("int32", DataType::Int32, true),
                Field::new("int64", DataType::Int64, false),
                Field::new("float16", DataType::Float16, true),
                Field::new("float32", DataType::Float32, false),
                Field::new("float64", DataType::Float64, true),
                Field::new("null", DataType::Null, false),
                Field::new("bool", DataType::Boolean, false),
                Field::new("date32", DataType::Date32, false),
                Field::new("date64", DataType::Date64, true),
                Field::new("time32[s]", DataType::Time32(TimeUnit::Second), true),
                Field::new("time32[ms]", DataType::Time32(TimeUnit::Millisecond), false),
                Field::new("time64[us]", DataType::Time64(TimeUnit::Microsecond), false),
                Field::new("time64[ns]", DataType::Time64(TimeUnit::Nanosecond), true),
                Field::new(
                    "timestamp[s]",
                    DataType::Timestamp(TimeUnit::Second, None),
                    false,
                ),
                Field::new(
                    "timestamp[ms]",
                    DataType::Timestamp(TimeUnit::Millisecond, None),
                    true,
                ),
                Field::new(
                    "timestamp[us]",
                    DataType::Timestamp(TimeUnit::Microsecond, Some("Africa/Johannesburg".into())),
                    false,
                ),
                Field::new(
                    "timestamp[ns]",
                    DataType::Timestamp(TimeUnit::Nanosecond, None),
                    true,
                ),
                Field::new(
                    "interval[ym]",
                    DataType::Interval(IntervalUnit::YearMonth),
                    true,
                ),
                Field::new(
                    "interval[dt]",
                    DataType::Interval(IntervalUnit::DayTime),
                    true,
                ),
                Field::new(
                    "interval[mdn]",
                    DataType::Interval(IntervalUnit::MonthDayNano),
                    true,
                ),
                Field::new("utf8", DataType::Utf8, false),
                Field::new("utf8_view", DataType::Utf8View, false),
                Field::new("binary", DataType::Binary, false),
                Field::new("binary_view", DataType::BinaryView, false),
                Field::new_list(
                    "list[u8]",
                    Field::new_list_field(DataType::UInt8, false),
                    true,
                ),
                Field::new_fixed_size_list(
                    "fixed_size_list[u8]",
                    Field::new_list_field(DataType::UInt8, false),
                    2,
                    true,
                ),
                Field::new_list(
                    "list[struct<float32, int32, bool>]",
                    Field::new_struct(
                        "struct",
                        vec![
                            Field::new("float32", UInt8, false),
                            Field::new("int32", Int32, true),
                            Field::new("bool", Boolean, true),
                        ],
                        true,
                    ),
                    false,
                ),
                Field::new_struct(
                    "struct<dictionary<int32, utf8>>",
                    vec![Field::new(
                        "dictionary<int32, utf8>",
                        Dictionary(Box::new(DataType::Int32), Box::new(DataType::Utf8)),
                        false,
                    )],
                    false,
                ),
                Field::new_struct(
                    "struct<int64, list[struct<date32, list[struct<>]>]>",
                    vec![
                        Field::new("int64", DataType::Int64, true),
                        Field::new_list(
                            "list[struct<date32, list[struct<>]>]",
                            Field::new_struct(
                                "struct",
                                vec![
                                    Field::new("date32", DataType::Date32, true),
                                    Field::new_list(
                                        "list[struct<>]",
                                        Field::new(
                                            "struct",
                                            DataType::Struct(Fields::empty()),
                                            false,
                                        ),
                                        false,
                                    ),
                                ],
                                false,
                            ),
                            false,
                        ),
                    ],
                    false,
                ),
                Field::new_union(
                    "union<int64, list[union<date32, list[union<>]>]>",
                    vec![0, 1],
                    vec![
                        Field::new("int64", DataType::Int64, true),
                        Field::new_list(
                            "list[union<date32, list[union<>]>]",
                            Field::new_union(
                                "union<date32, list[union<>]>",
                                vec![0, 1],
                                vec![
                                    Field::new("date32", DataType::Date32, true),
                                    Field::new_list(
                                        "list[union<>]",
                                        Field::new(
                                            "union",
                                            DataType::Union(
                                                UnionFields::empty(),
                                                UnionMode::Sparse,
                                            ),
                                            false,
                                        ),
                                        false,
                                    ),
                                ],
                                UnionMode::Dense,
                            ),
                            false,
                        ),
                    ],
                    UnionMode::Sparse,
                ),
                Field::new("struct<>", DataType::Struct(Fields::empty()), true),
                Field::new(
                    "union<>",
                    DataType::Union(UnionFields::empty(), UnionMode::Dense),
                    true,
                ),
                Field::new(
                    "union<>",
                    DataType::Union(UnionFields::empty(), UnionMode::Sparse),
                    true,
                ),
                Field::new(
                    "union<int32, utf8>",
                    DataType::Union(
                        UnionFields::new(
                            vec![2, 3],
                            vec![
                                Field::new("int32", DataType::Int32, true),
                                Field::new("utf8", DataType::Utf8, true),
                            ],
                        ),
                        UnionMode::Dense,
                    ),
                    true,
                ),
                #[allow(deprecated)]
                Field::new_dict(
                    "dictionary<int32, utf8>",
                    DataType::Dictionary(Box::new(DataType::Int32), Box::new(DataType::Utf8)),
                    true,
                    123,
                    true,
                ),
                #[allow(deprecated)]
                Field::new_dict(
                    "dictionary<uint8, uint32>",
                    DataType::Dictionary(Box::new(DataType::UInt8), Box::new(DataType::UInt32)),
                    true,
                    123,
                    true,
                ),
                Field::new("decimal<usize, usize>", DataType::Decimal128(10, 6), false),
            ],
            schema_metadata,
        )
    }

    /// Encoding a schema to flatbuffers and decoding it again must yield an
    /// equal schema.
    #[test]
    fn convert_schema_round_trip() {
        let schema = round_trip_schema();

        let mut tracker = DictionaryTracker::new(true);
        let mut encoder = IpcSchemaEncoder::new().with_dictionary_tracker(&mut tracker);
        let encoded = encoder.schema_to_fb(&schema);

        let fb_schema = crate::root_as_schema(encoded.finished_data()).unwrap();
        let decoded = fb_to_schema(fb_schema);
        assert_eq!(schema, decoded);
    }

    /// A fixed, pre-serialized schema message must decode to the same schema
    /// and message attributes as one freshly produced by the writer.
    #[test]
    fn schema_from_bytes() {
        let reference_bytes: Vec<u8> = vec![
            16, 0, 0, 0, 0, 0, 10, 0, 12, 0, 6, 0, 5, 0, 8, 0, 10, 0, 0, 0, 0, 1, 4, 0, 12, 0, 0,
            0, 8, 0, 8, 0, 0, 0, 4, 0, 8, 0, 0, 0, 4, 0, 0, 0, 1, 0, 0, 0, 20, 0, 0, 0, 16, 0, 20,
            0, 8, 0, 0, 0, 7, 0, 12, 0, 0, 0, 16, 0, 16, 0, 0, 0, 0, 0, 0, 2, 16, 0, 0, 0, 32, 0,
            0, 0, 4, 0, 0, 0, 0, 0, 0, 0, 6, 0, 0, 0, 102, 105, 101, 108, 100, 49, 0, 0, 0, 0, 6,
            0, 8, 0, 4, 0, 6, 0, 0, 0, 32, 0, 0, 0,
        ];
        let ipc = crate::root_as_message(&reference_bytes).unwrap();
        let schema = ipc.header_as_schema().unwrap();

        // Produce the same schema message through the writer.
        let generator = crate::writer::IpcDataGenerator::default();
        let mut tracker = DictionaryTracker::new(true);
        let arrow_schema = Schema::new(vec![Field::new("field1", DataType::UInt32, false)]);
        let generated_bytes = generator
            .schema_to_bytes_with_dictionary_tracker(
                &arrow_schema,
                &mut tracker,
                &crate::writer::IpcWriteOptions::default(),
            )
            .ipc_message;

        let ipc2 = crate::root_as_message(&generated_bytes).unwrap();
        let schema2 = ipc2.header_as_schema().unwrap();

        // Schema-level attributes.
        assert!(schema.custom_metadata().is_none());
        assert!(schema2.custom_metadata().is_none());
        assert_eq!(schema.endianness(), schema2.endianness());
        assert!(schema.features().is_none());
        assert!(schema2.features().is_none());
        assert_eq!(fb_to_schema(schema), fb_to_schema(schema2));

        // Message-level attributes.
        assert_eq!(ipc.version(), ipc2.version());
        assert_eq!(ipc.header_type(), ipc2.header_type());
        assert_eq!(ipc.bodyLength(), ipc2.bodyLength());
        assert!(ipc.custom_metadata().is_none());
        assert!(ipc2.custom_metadata().is_none());
    }
}