1use arrow_buffer::Buffer;
21use arrow_schema::*;
22use flatbuffers::{
23 FlatBufferBuilder, ForwardsUOffset, UnionWIPOffset, Vector, Verifiable, Verifier,
24 VerifierOptions, WIPOffset,
25};
26use std::collections::HashMap;
27use std::fmt::{Debug, Formatter};
28use std::sync::Arc;
29
30use crate::writer::DictionaryTracker;
31use crate::{KeyValue, Message, CONTINUATION_MARKER};
32use DataType::*;
33
/// Encodes an Arrow [`Schema`] into its IPC flatbuffer representation.
///
/// Without a [`DictionaryTracker`], dictionary-encoded fields are written with
/// the dictionary id stored on each [`Field`]. With one (see
/// [`IpcSchemaEncoder::with_dictionary_tracker`]), the id is obtained from the
/// tracker's `set_dict_id` instead.
#[derive(Debug)]
pub struct IpcSchemaEncoder<'a> {
    /// Optional tracker consulted for dictionary ids while encoding fields.
    dictionary_tracker: Option<&'a mut DictionaryTracker>,
}
68
69impl Default for IpcSchemaEncoder<'_> {
70 fn default() -> Self {
71 Self::new()
72 }
73}
74
75impl<'a> IpcSchemaEncoder<'a> {
76 pub fn new() -> IpcSchemaEncoder<'a> {
78 IpcSchemaEncoder {
79 dictionary_tracker: None,
80 }
81 }
82
83 pub fn with_dictionary_tracker(
85 mut self,
86 dictionary_tracker: &'a mut DictionaryTracker,
87 ) -> Self {
88 self.dictionary_tracker = Some(dictionary_tracker);
89 self
90 }
91
92 pub fn schema_to_fb<'b>(&mut self, schema: &Schema) -> FlatBufferBuilder<'b> {
96 let mut fbb = FlatBufferBuilder::new();
97
98 let root = self.schema_to_fb_offset(&mut fbb, schema);
99
100 fbb.finish(root, None);
101
102 fbb
103 }
104
105 pub fn schema_to_fb_offset<'b>(
107 &mut self,
108 fbb: &mut FlatBufferBuilder<'b>,
109 schema: &Schema,
110 ) -> WIPOffset<crate::Schema<'b>> {
111 let fields = schema
112 .fields()
113 .iter()
114 .map(|field| build_field(fbb, &mut self.dictionary_tracker, field))
115 .collect::<Vec<_>>();
116 let fb_field_list = fbb.create_vector(&fields);
117
118 let fb_metadata_list =
119 (!schema.metadata().is_empty()).then(|| metadata_to_fb(fbb, schema.metadata()));
120
121 let mut builder = crate::SchemaBuilder::new(fbb);
122 builder.add_fields(fb_field_list);
123 if let Some(fb_metadata_list) = fb_metadata_list {
124 builder.add_custom_metadata(fb_metadata_list);
125 }
126 builder.finish()
127 }
128}
129
130#[deprecated(since = "54.0.0", note = "Use `IpcSchemaConverter`.")]
132pub fn schema_to_fb(schema: &Schema) -> FlatBufferBuilder<'_> {
133 IpcSchemaEncoder::new().schema_to_fb(schema)
134}
135
136pub fn metadata_to_fb<'a>(
138 fbb: &mut FlatBufferBuilder<'a>,
139 metadata: &HashMap<String, String>,
140) -> WIPOffset<Vector<'a, ForwardsUOffset<KeyValue<'a>>>> {
141 let mut ordered_keys = metadata.keys().collect::<Vec<_>>();
142 ordered_keys.sort();
143 let custom_metadata = ordered_keys
144 .into_iter()
145 .map(|k| {
146 let v = metadata.get(k).unwrap();
147 let fb_key_name = fbb.create_string(k);
148 let fb_val_name = fbb.create_string(v);
149
150 let mut kv_builder = crate::KeyValueBuilder::new(fbb);
151 kv_builder.add_key(fb_key_name);
152 kv_builder.add_value(fb_val_name);
153 kv_builder.finish()
154 })
155 .collect::<Vec<_>>();
156 fbb.create_vector(&custom_metadata)
157}
158
159pub fn schema_to_fb_offset<'a>(
161 fbb: &mut FlatBufferBuilder<'a>,
162 schema: &Schema,
163) -> WIPOffset<crate::Schema<'a>> {
164 IpcSchemaEncoder::new().schema_to_fb_offset(fbb, schema)
165}
166
167impl From<crate::Field<'_>> for Field {
169 fn from(field: crate::Field) -> Field {
170 let arrow_field = if let Some(dictionary) = field.dictionary() {
171 #[allow(deprecated)]
172 Field::new_dict(
173 field.name().unwrap(),
174 get_data_type(field, true),
175 field.nullable(),
176 dictionary.id(),
177 dictionary.isOrdered(),
178 )
179 } else {
180 Field::new(
181 field.name().unwrap(),
182 get_data_type(field, true),
183 field.nullable(),
184 )
185 };
186
187 let mut metadata_map = HashMap::default();
188 if let Some(list) = field.custom_metadata() {
189 for kv in list {
190 if let (Some(k), Some(v)) = (kv.key(), kv.value()) {
191 metadata_map.insert(k.to_string(), v.to_string());
192 }
193 }
194 }
195
196 arrow_field.with_metadata(metadata_map)
197 }
198}
199
200pub fn fb_to_schema(fb: crate::Schema) -> Schema {
202 let mut fields: Vec<Field> = vec![];
203 let c_fields = fb.fields().unwrap();
204 let len = c_fields.len();
205 for i in 0..len {
206 let c_field: crate::Field = c_fields.get(i);
207 match c_field.type_type() {
208 crate::Type::Decimal if fb.endianness() == crate::Endianness::Big => {
209 unimplemented!("Big Endian is not supported for Decimal!")
210 }
211 _ => (),
212 };
213 fields.push(c_field.into());
214 }
215
216 let mut metadata: HashMap<String, String> = HashMap::default();
217 if let Some(md_fields) = fb.custom_metadata() {
218 let len = md_fields.len();
219 for i in 0..len {
220 let kv = md_fields.get(i);
221 let k_str = kv.key();
222 let v_str = kv.value();
223 if let Some(k) = k_str {
224 if let Some(v) = v_str {
225 metadata.insert(k.to_string(), v.to_string());
226 }
227 }
228 }
229 }
230 Schema::new_with_metadata(fields, metadata)
231}
232
233pub fn try_schema_from_flatbuffer_bytes(bytes: &[u8]) -> Result<Schema, ArrowError> {
235 if let Ok(ipc) = crate::root_as_message(bytes) {
236 if let Some(schema) = ipc.header_as_schema().map(fb_to_schema) {
237 Ok(schema)
238 } else {
239 Err(ArrowError::ParseError(
240 "Unable to get head as schema".to_string(),
241 ))
242 }
243 } else {
244 Err(ArrowError::ParseError(
245 "Unable to get root as message".to_string(),
246 ))
247 }
248}
249
250pub fn try_schema_from_ipc_buffer(buffer: &[u8]) -> Result<Schema, ArrowError> {
252 if buffer.len() < 4 {
262 return Err(ArrowError::ParseError(
263 "The buffer length is less than 4 and missing the continuation marker or length of buffer".to_string()
264 ));
265 }
266
267 let (len, buffer) = if buffer[..4] == CONTINUATION_MARKER {
268 if buffer.len() < 8 {
269 return Err(ArrowError::ParseError(
270 "The buffer length is less than 8 and missing the length of buffer".to_string(),
271 ));
272 }
273 buffer[4..].split_at(4)
274 } else {
275 buffer.split_at(4)
276 };
277
278 let len = <i32>::from_le_bytes(len.try_into().unwrap());
279 if len < 0 {
280 return Err(ArrowError::ParseError(format!(
281 "The encapsulated message's reported length is negative ({len})"
282 )));
283 }
284
285 if buffer.len() < len as usize {
286 let actual_len = buffer.len();
287 return Err(ArrowError::ParseError(
288 format!("The buffer length ({actual_len}) is less than the encapsulated message's reported length ({len})")
289 ));
290 }
291
292 let msg = crate::root_as_message(buffer)
293 .map_err(|err| ArrowError::ParseError(format!("Unable to get root as message: {err:?}")))?;
294 let ipc_schema = msg.header_as_schema().ok_or_else(|| {
295 ArrowError::ParseError("Unable to convert flight info to a schema".to_string())
296 })?;
297 Ok(fb_to_schema(ipc_schema))
298}
299
300pub(crate) fn get_data_type(field: crate::Field, may_be_dictionary: bool) -> DataType {
302 if let Some(dictionary) = field.dictionary() {
303 if may_be_dictionary {
304 let int = dictionary.indexType().unwrap();
305 let index_type = match (int.bitWidth(), int.is_signed()) {
306 (8, true) => DataType::Int8,
307 (8, false) => DataType::UInt8,
308 (16, true) => DataType::Int16,
309 (16, false) => DataType::UInt16,
310 (32, true) => DataType::Int32,
311 (32, false) => DataType::UInt32,
312 (64, true) => DataType::Int64,
313 (64, false) => DataType::UInt64,
314 _ => panic!("Unexpected bitwidth and signed"),
315 };
316 return DataType::Dictionary(
317 Box::new(index_type),
318 Box::new(get_data_type(field, false)),
319 );
320 }
321 }
322
323 match field.type_type() {
324 crate::Type::Null => DataType::Null,
325 crate::Type::Bool => DataType::Boolean,
326 crate::Type::Int => {
327 let int = field.type_as_int().unwrap();
328 match (int.bitWidth(), int.is_signed()) {
329 (8, true) => DataType::Int8,
330 (8, false) => DataType::UInt8,
331 (16, true) => DataType::Int16,
332 (16, false) => DataType::UInt16,
333 (32, true) => DataType::Int32,
334 (32, false) => DataType::UInt32,
335 (64, true) => DataType::Int64,
336 (64, false) => DataType::UInt64,
337 z => panic!(
338 "Int type with bit width of {} and signed of {} not supported",
339 z.0, z.1
340 ),
341 }
342 }
343 crate::Type::Binary => DataType::Binary,
344 crate::Type::BinaryView => DataType::BinaryView,
345 crate::Type::LargeBinary => DataType::LargeBinary,
346 crate::Type::Utf8 => DataType::Utf8,
347 crate::Type::Utf8View => DataType::Utf8View,
348 crate::Type::LargeUtf8 => DataType::LargeUtf8,
349 crate::Type::FixedSizeBinary => {
350 let fsb = field.type_as_fixed_size_binary().unwrap();
351 DataType::FixedSizeBinary(fsb.byteWidth())
352 }
353 crate::Type::FloatingPoint => {
354 let float = field.type_as_floating_point().unwrap();
355 match float.precision() {
356 crate::Precision::HALF => DataType::Float16,
357 crate::Precision::SINGLE => DataType::Float32,
358 crate::Precision::DOUBLE => DataType::Float64,
359 z => panic!("FloatingPoint type with precision of {z:?} not supported"),
360 }
361 }
362 crate::Type::Date => {
363 let date = field.type_as_date().unwrap();
364 match date.unit() {
365 crate::DateUnit::DAY => DataType::Date32,
366 crate::DateUnit::MILLISECOND => DataType::Date64,
367 z => panic!("Date type with unit of {z:?} not supported"),
368 }
369 }
370 crate::Type::Time => {
371 let time = field.type_as_time().unwrap();
372 match (time.bitWidth(), time.unit()) {
373 (32, crate::TimeUnit::SECOND) => DataType::Time32(TimeUnit::Second),
374 (32, crate::TimeUnit::MILLISECOND) => DataType::Time32(TimeUnit::Millisecond),
375 (64, crate::TimeUnit::MICROSECOND) => DataType::Time64(TimeUnit::Microsecond),
376 (64, crate::TimeUnit::NANOSECOND) => DataType::Time64(TimeUnit::Nanosecond),
377 z => panic!(
378 "Time type with bit width of {} and unit of {:?} not supported",
379 z.0, z.1
380 ),
381 }
382 }
383 crate::Type::Timestamp => {
384 let timestamp = field.type_as_timestamp().unwrap();
385 let timezone: Option<_> = timestamp.timezone().map(|tz| tz.into());
386 match timestamp.unit() {
387 crate::TimeUnit::SECOND => DataType::Timestamp(TimeUnit::Second, timezone),
388 crate::TimeUnit::MILLISECOND => {
389 DataType::Timestamp(TimeUnit::Millisecond, timezone)
390 }
391 crate::TimeUnit::MICROSECOND => {
392 DataType::Timestamp(TimeUnit::Microsecond, timezone)
393 }
394 crate::TimeUnit::NANOSECOND => DataType::Timestamp(TimeUnit::Nanosecond, timezone),
395 z => panic!("Timestamp type with unit of {z:?} not supported"),
396 }
397 }
398 crate::Type::Interval => {
399 let interval = field.type_as_interval().unwrap();
400 match interval.unit() {
401 crate::IntervalUnit::YEAR_MONTH => DataType::Interval(IntervalUnit::YearMonth),
402 crate::IntervalUnit::DAY_TIME => DataType::Interval(IntervalUnit::DayTime),
403 crate::IntervalUnit::MONTH_DAY_NANO => {
404 DataType::Interval(IntervalUnit::MonthDayNano)
405 }
406 z => panic!("Interval type with unit of {z:?} unsupported"),
407 }
408 }
409 crate::Type::Duration => {
410 let duration = field.type_as_duration().unwrap();
411 match duration.unit() {
412 crate::TimeUnit::SECOND => DataType::Duration(TimeUnit::Second),
413 crate::TimeUnit::MILLISECOND => DataType::Duration(TimeUnit::Millisecond),
414 crate::TimeUnit::MICROSECOND => DataType::Duration(TimeUnit::Microsecond),
415 crate::TimeUnit::NANOSECOND => DataType::Duration(TimeUnit::Nanosecond),
416 z => panic!("Duration type with unit of {z:?} unsupported"),
417 }
418 }
419 crate::Type::List => {
420 let children = field.children().unwrap();
421 if children.len() != 1 {
422 panic!("expect a list to have one child")
423 }
424 DataType::List(Arc::new(children.get(0).into()))
425 }
426 crate::Type::LargeList => {
427 let children = field.children().unwrap();
428 if children.len() != 1 {
429 panic!("expect a large list to have one child")
430 }
431 DataType::LargeList(Arc::new(children.get(0).into()))
432 }
433 crate::Type::FixedSizeList => {
434 let children = field.children().unwrap();
435 if children.len() != 1 {
436 panic!("expect a list to have one child")
437 }
438 let fsl = field.type_as_fixed_size_list().unwrap();
439 DataType::FixedSizeList(Arc::new(children.get(0).into()), fsl.listSize())
440 }
441 crate::Type::Struct_ => {
442 let fields = match field.children() {
443 Some(children) => children.iter().map(Field::from).collect(),
444 None => Fields::empty(),
445 };
446 DataType::Struct(fields)
447 }
448 crate::Type::RunEndEncoded => {
449 let children = field.children().unwrap();
450 if children.len() != 2 {
451 panic!(
452 "RunEndEncoded type should have exactly two children. Found {}",
453 children.len()
454 )
455 }
456 let run_ends_field = children.get(0).into();
457 let values_field = children.get(1).into();
458 DataType::RunEndEncoded(Arc::new(run_ends_field), Arc::new(values_field))
459 }
460 crate::Type::Map => {
461 let map = field.type_as_map().unwrap();
462 let children = field.children().unwrap();
463 if children.len() != 1 {
464 panic!("expect a map to have one child")
465 }
466 DataType::Map(Arc::new(children.get(0).into()), map.keysSorted())
467 }
468 crate::Type::Decimal => {
469 let fsb = field.type_as_decimal().unwrap();
470 let bit_width = fsb.bitWidth();
471 let precision: u8 = fsb.precision().try_into().unwrap();
472 let scale: i8 = fsb.scale().try_into().unwrap();
473 match bit_width {
474 32 => DataType::Decimal32(precision, scale),
475 64 => DataType::Decimal64(precision, scale),
476 128 => DataType::Decimal128(precision, scale),
477 256 => DataType::Decimal256(precision, scale),
478 _ => panic!("Unexpected decimal bit width {bit_width}"),
479 }
480 }
481 crate::Type::Union => {
482 let union = field.type_as_union().unwrap();
483
484 let union_mode = match union.mode() {
485 crate::UnionMode::Dense => UnionMode::Dense,
486 crate::UnionMode::Sparse => UnionMode::Sparse,
487 mode => panic!("Unexpected union mode: {mode:?}"),
488 };
489
490 let mut fields = vec![];
491 if let Some(children) = field.children() {
492 for i in 0..children.len() {
493 fields.push(Field::from(children.get(i)));
494 }
495 };
496
497 let fields = match union.typeIds() {
498 None => UnionFields::new(0_i8..fields.len() as i8, fields),
499 Some(ids) => UnionFields::new(ids.iter().map(|i| i as i8), fields),
500 };
501
502 DataType::Union(fields, union_mode)
503 }
504 t => unimplemented!("Type {:?} not supported", t),
505 }
506}
507
/// The type-related parts of an IPC flatbuffer `Field`.
///
/// Produced by `get_fb_field_type` and assembled into the `Field` table by
/// `build_field`.
pub(crate) struct FBFieldType<'b> {
    /// Discriminant of the flatbuffer `Type` union (e.g. `Int`, `List`).
    pub(crate) type_type: crate::Type,
    /// Offset of the already-finished type table, as a union value.
    pub(crate) type_: WIPOffset<UnionWIPOffset>,
    /// Offset of the child-field vector.
    /// `get_fb_field_type` always sets this; leaf types get an empty vector.
    pub(crate) children: Option<WIPOffset<Vector<'b, ForwardsUOffset<crate::Field<'b>>>>>,
}
513
514pub(crate) fn build_field<'a>(
516 fbb: &mut FlatBufferBuilder<'a>,
517 dictionary_tracker: &mut Option<&mut DictionaryTracker>,
518 field: &Field,
519) -> WIPOffset<crate::Field<'a>> {
520 let mut fb_metadata = None;
522 if !field.metadata().is_empty() {
523 fb_metadata = Some(metadata_to_fb(fbb, field.metadata()));
524 };
525
526 let fb_field_name = fbb.create_string(field.name().as_str());
527 let field_type = get_fb_field_type(field.data_type(), dictionary_tracker, fbb);
528
529 let fb_dictionary = if let Dictionary(index_type, _) = field.data_type() {
530 match dictionary_tracker {
531 Some(tracker) => Some(get_fb_dictionary(
532 index_type,
533 #[allow(deprecated)]
534 tracker.set_dict_id(field),
535 field
536 .dict_is_ordered()
537 .expect("All Dictionary types have `dict_is_ordered`"),
538 fbb,
539 )),
540 None => Some(get_fb_dictionary(
541 index_type,
542 #[allow(deprecated)]
543 field
544 .dict_id()
545 .expect("Dictionary type must have a dictionary id"),
546 field
547 .dict_is_ordered()
548 .expect("All Dictionary types have `dict_is_ordered`"),
549 fbb,
550 )),
551 }
552 } else {
553 None
554 };
555
556 let mut field_builder = crate::FieldBuilder::new(fbb);
557 field_builder.add_name(fb_field_name);
558 if let Some(dictionary) = fb_dictionary {
559 field_builder.add_dictionary(dictionary)
560 }
561 field_builder.add_type_type(field_type.type_type);
562 field_builder.add_nullable(field.is_nullable());
563 match field_type.children {
564 None => {}
565 Some(children) => field_builder.add_children(children),
566 };
567 field_builder.add_type_(field_type.type_);
568
569 if let Some(fb_metadata) = fb_metadata {
570 field_builder.add_custom_metadata(fb_metadata);
571 }
572
573 field_builder.finish()
574}
575
/// Writes the type portion of an Arrow [`DataType`] into `fbb`.
///
/// Returns three things:
/// * the flatbuffer `Type` discriminant;
/// * the finished type table, as a union value;
/// * the vector of child fields, which is empty for leaf types.
///
/// Children are written through `build_field`, which receives
/// `dictionary_tracker` so nested dictionary fields are handled as well.
///
/// # Panics
///
/// Panics (via `unimplemented!`) for `ListView` and `LargeListView`.
pub(crate) fn get_fb_field_type<'a>(
    data_type: &DataType,
    dictionary_tracker: &mut Option<&mut DictionaryTracker>,
    fbb: &mut FlatBufferBuilder<'a>,
) -> FBFieldType<'a> {
    // Shared empty child list for types without children.
    let empty_fields: Vec<WIPOffset<crate::Field>> = vec![];
    // Each `*Builder::new(fbb)` holds `fbb` mutably until `finish()`, so child
    // vectors are created either before the builder starts or after it finishes.
    // The chosen order determines the emitted bytes.
    match data_type {
        Null => FBFieldType {
            type_type: crate::Type::Null,
            type_: crate::NullBuilder::new(fbb).finish().as_union_value(),
            children: Some(fbb.create_vector(&empty_fields[..])),
        },
        Boolean => FBFieldType {
            type_type: crate::Type::Bool,
            type_: crate::BoolBuilder::new(fbb).finish().as_union_value(),
            children: Some(fbb.create_vector(&empty_fields[..])),
        },
        UInt8 | UInt16 | UInt32 | UInt64 => {
            let children = fbb.create_vector(&empty_fields[..]);
            let mut builder = crate::IntBuilder::new(fbb);
            builder.add_is_signed(false);
            match data_type {
                UInt8 => builder.add_bitWidth(8),
                UInt16 => builder.add_bitWidth(16),
                UInt32 => builder.add_bitWidth(32),
                UInt64 => builder.add_bitWidth(64),
                _ => {}
            };
            FBFieldType {
                type_type: crate::Type::Int,
                type_: builder.finish().as_union_value(),
                children: Some(children),
            }
        }
        Int8 | Int16 | Int32 | Int64 => {
            let children = fbb.create_vector(&empty_fields[..]);
            let mut builder = crate::IntBuilder::new(fbb);
            builder.add_is_signed(true);
            match data_type {
                Int8 => builder.add_bitWidth(8),
                Int16 => builder.add_bitWidth(16),
                Int32 => builder.add_bitWidth(32),
                Int64 => builder.add_bitWidth(64),
                _ => {}
            };
            FBFieldType {
                type_type: crate::Type::Int,
                type_: builder.finish().as_union_value(),
                children: Some(children),
            }
        }
        Float16 | Float32 | Float64 => {
            let children = fbb.create_vector(&empty_fields[..]);
            let mut builder = crate::FloatingPointBuilder::new(fbb);
            match data_type {
                Float16 => builder.add_precision(crate::Precision::HALF),
                Float32 => builder.add_precision(crate::Precision::SINGLE),
                Float64 => builder.add_precision(crate::Precision::DOUBLE),
                _ => {}
            };
            FBFieldType {
                type_type: crate::Type::FloatingPoint,
                type_: builder.finish().as_union_value(),
                children: Some(children),
            }
        }
        Binary => FBFieldType {
            type_type: crate::Type::Binary,
            type_: crate::BinaryBuilder::new(fbb).finish().as_union_value(),
            children: Some(fbb.create_vector(&empty_fields[..])),
        },
        LargeBinary => FBFieldType {
            type_type: crate::Type::LargeBinary,
            type_: crate::LargeBinaryBuilder::new(fbb)
                .finish()
                .as_union_value(),
            children: Some(fbb.create_vector(&empty_fields[..])),
        },
        BinaryView => FBFieldType {
            type_type: crate::Type::BinaryView,
            type_: crate::BinaryViewBuilder::new(fbb).finish().as_union_value(),
            children: Some(fbb.create_vector(&empty_fields[..])),
        },
        Utf8View => FBFieldType {
            type_type: crate::Type::Utf8View,
            type_: crate::Utf8ViewBuilder::new(fbb).finish().as_union_value(),
            children: Some(fbb.create_vector(&empty_fields[..])),
        },
        Utf8 => FBFieldType {
            type_type: crate::Type::Utf8,
            type_: crate::Utf8Builder::new(fbb).finish().as_union_value(),
            children: Some(fbb.create_vector(&empty_fields[..])),
        },
        LargeUtf8 => FBFieldType {
            type_type: crate::Type::LargeUtf8,
            type_: crate::LargeUtf8Builder::new(fbb).finish().as_union_value(),
            children: Some(fbb.create_vector(&empty_fields[..])),
        },
        FixedSizeBinary(len) => {
            let mut builder = crate::FixedSizeBinaryBuilder::new(fbb);
            builder.add_byteWidth(*len);
            FBFieldType {
                type_type: crate::Type::FixedSizeBinary,
                type_: builder.finish().as_union_value(),
                children: Some(fbb.create_vector(&empty_fields[..])),
            }
        }
        Date32 => {
            let mut builder = crate::DateBuilder::new(fbb);
            builder.add_unit(crate::DateUnit::DAY);
            FBFieldType {
                type_type: crate::Type::Date,
                type_: builder.finish().as_union_value(),
                children: Some(fbb.create_vector(&empty_fields[..])),
            }
        }
        Date64 => {
            let mut builder = crate::DateBuilder::new(fbb);
            builder.add_unit(crate::DateUnit::MILLISECOND);
            FBFieldType {
                type_type: crate::Type::Date,
                type_: builder.finish().as_union_value(),
                children: Some(fbb.create_vector(&empty_fields[..])),
            }
        }
        // NOTE(review): the bit width is chosen from the unit alone, not from
        // the Time32/Time64 variant. A Time32(Microsecond) value, for example,
        // would be written with a 64-bit width.
        Time32(unit) | Time64(unit) => {
            let mut builder = crate::TimeBuilder::new(fbb);
            match unit {
                TimeUnit::Second => {
                    builder.add_bitWidth(32);
                    builder.add_unit(crate::TimeUnit::SECOND);
                }
                TimeUnit::Millisecond => {
                    builder.add_bitWidth(32);
                    builder.add_unit(crate::TimeUnit::MILLISECOND);
                }
                TimeUnit::Microsecond => {
                    builder.add_bitWidth(64);
                    builder.add_unit(crate::TimeUnit::MICROSECOND);
                }
                TimeUnit::Nanosecond => {
                    builder.add_bitWidth(64);
                    builder.add_unit(crate::TimeUnit::NANOSECOND);
                }
            }
            FBFieldType {
                type_type: crate::Type::Time,
                type_: builder.finish().as_union_value(),
                children: Some(fbb.create_vector(&empty_fields[..])),
            }
        }
        Timestamp(unit, tz) => {
            let tz = tz.as_deref().unwrap_or_default();
            // The timezone string is written unconditionally (it must exist
            // before the builder starts). It is only referenced from the table
            // when non-empty.
            let tz_str = fbb.create_string(tz);
            let mut builder = crate::TimestampBuilder::new(fbb);
            let time_unit = match unit {
                TimeUnit::Second => crate::TimeUnit::SECOND,
                TimeUnit::Millisecond => crate::TimeUnit::MILLISECOND,
                TimeUnit::Microsecond => crate::TimeUnit::MICROSECOND,
                TimeUnit::Nanosecond => crate::TimeUnit::NANOSECOND,
            };
            builder.add_unit(time_unit);
            if !tz.is_empty() {
                builder.add_timezone(tz_str);
            }
            FBFieldType {
                type_type: crate::Type::Timestamp,
                type_: builder.finish().as_union_value(),
                children: Some(fbb.create_vector(&empty_fields[..])),
            }
        }
        Interval(unit) => {
            let mut builder = crate::IntervalBuilder::new(fbb);
            let interval_unit = match unit {
                IntervalUnit::YearMonth => crate::IntervalUnit::YEAR_MONTH,
                IntervalUnit::DayTime => crate::IntervalUnit::DAY_TIME,
                IntervalUnit::MonthDayNano => crate::IntervalUnit::MONTH_DAY_NANO,
            };
            builder.add_unit(interval_unit);
            FBFieldType {
                type_type: crate::Type::Interval,
                type_: builder.finish().as_union_value(),
                children: Some(fbb.create_vector(&empty_fields[..])),
            }
        }
        Duration(unit) => {
            let mut builder = crate::DurationBuilder::new(fbb);
            let time_unit = match unit {
                TimeUnit::Second => crate::TimeUnit::SECOND,
                TimeUnit::Millisecond => crate::TimeUnit::MILLISECOND,
                TimeUnit::Microsecond => crate::TimeUnit::MICROSECOND,
                TimeUnit::Nanosecond => crate::TimeUnit::NANOSECOND,
            };
            builder.add_unit(time_unit);
            FBFieldType {
                type_type: crate::Type::Duration,
                type_: builder.finish().as_union_value(),
                children: Some(fbb.create_vector(&empty_fields[..])),
            }
        }
        List(ref list_type) => {
            // The child field is written before the parent's type table.
            let child = build_field(fbb, dictionary_tracker, list_type);
            FBFieldType {
                type_type: crate::Type::List,
                type_: crate::ListBuilder::new(fbb).finish().as_union_value(),
                children: Some(fbb.create_vector(&[child])),
            }
        }
        ListView(_) | LargeListView(_) => unimplemented!("ListView/LargeListView not implemented"),
        LargeList(ref list_type) => {
            let child = build_field(fbb, dictionary_tracker, list_type);
            FBFieldType {
                type_type: crate::Type::LargeList,
                type_: crate::LargeListBuilder::new(fbb).finish().as_union_value(),
                children: Some(fbb.create_vector(&[child])),
            }
        }
        FixedSizeList(ref list_type, len) => {
            let child = build_field(fbb, dictionary_tracker, list_type);
            let mut builder = crate::FixedSizeListBuilder::new(fbb);
            builder.add_listSize(*len);
            FBFieldType {
                type_type: crate::Type::FixedSizeList,
                type_: builder.finish().as_union_value(),
                children: Some(fbb.create_vector(&[child])),
            }
        }
        Struct(fields) => {
            let mut children = vec![];
            for field in fields {
                children.push(build_field(fbb, dictionary_tracker, field));
            }
            FBFieldType {
                type_type: crate::Type::Struct_,
                type_: crate::Struct_Builder::new(fbb).finish().as_union_value(),
                children: Some(fbb.create_vector(&children[..])),
            }
        }
        RunEndEncoded(run_ends, values) => {
            // Children order is significant: run ends first, then values.
            let run_ends_field = build_field(fbb, dictionary_tracker, run_ends);
            let values_field = build_field(fbb, dictionary_tracker, values);
            let children = [run_ends_field, values_field];
            FBFieldType {
                type_type: crate::Type::RunEndEncoded,
                type_: crate::RunEndEncodedBuilder::new(fbb)
                    .finish()
                    .as_union_value(),
                children: Some(fbb.create_vector(&children[..])),
            }
        }
        Map(map_field, keys_sorted) => {
            let child = build_field(fbb, dictionary_tracker, map_field);
            let mut field_type = crate::MapBuilder::new(fbb);
            field_type.add_keysSorted(*keys_sorted);
            FBFieldType {
                type_type: crate::Type::Map,
                type_: field_type.finish().as_union_value(),
                children: Some(fbb.create_vector(&[child])),
            }
        }
        Dictionary(_, value_type) => {
            // A dictionary-encoded field is described by its value type here.
            // The index type and dictionary id are attached separately by
            // `build_field` via `get_fb_dictionary`.
            get_fb_field_type(value_type, dictionary_tracker, fbb)
        }
        Decimal32(precision, scale) => {
            let mut builder = crate::DecimalBuilder::new(fbb);
            builder.add_precision(*precision as i32);
            builder.add_scale(*scale as i32);
            builder.add_bitWidth(32);
            FBFieldType {
                type_type: crate::Type::Decimal,
                type_: builder.finish().as_union_value(),
                children: Some(fbb.create_vector(&empty_fields[..])),
            }
        }
        Decimal64(precision, scale) => {
            let mut builder = crate::DecimalBuilder::new(fbb);
            builder.add_precision(*precision as i32);
            builder.add_scale(*scale as i32);
            builder.add_bitWidth(64);
            FBFieldType {
                type_type: crate::Type::Decimal,
                type_: builder.finish().as_union_value(),
                children: Some(fbb.create_vector(&empty_fields[..])),
            }
        }
        Decimal128(precision, scale) => {
            let mut builder = crate::DecimalBuilder::new(fbb);
            builder.add_precision(*precision as i32);
            builder.add_scale(*scale as i32);
            builder.add_bitWidth(128);
            FBFieldType {
                type_type: crate::Type::Decimal,
                type_: builder.finish().as_union_value(),
                children: Some(fbb.create_vector(&empty_fields[..])),
            }
        }
        Decimal256(precision, scale) => {
            let mut builder = crate::DecimalBuilder::new(fbb);
            builder.add_precision(*precision as i32);
            builder.add_scale(*scale as i32);
            builder.add_bitWidth(256);
            FBFieldType {
                type_type: crate::Type::Decimal,
                type_: builder.finish().as_union_value(),
                children: Some(fbb.create_vector(&empty_fields[..])),
            }
        }
        Union(fields, mode) => {
            let mut children = vec![];
            for (_, field) in fields.iter() {
                children.push(build_field(fbb, dictionary_tracker, field));
            }

            let union_mode = match mode {
                UnionMode::Sparse => crate::UnionMode::Sparse,
                UnionMode::Dense => crate::UnionMode::Dense,
            };

            // Type ids are always written explicitly, so non-contiguous ids are
            // preserved (`get_data_type` reads `typeIds` back when present).
            let fbb_type_ids =
                fbb.create_vector(&fields.iter().map(|(t, _)| t as i32).collect::<Vec<_>>());
            let mut builder = crate::UnionBuilder::new(fbb);
            builder.add_mode(union_mode);
            builder.add_typeIds(fbb_type_ids);

            FBFieldType {
                type_type: crate::Type::Union,
                type_: builder.finish().as_union_value(),
                children: Some(fbb.create_vector(&children[..])),
            }
        }
    }
}
915
916pub(crate) fn get_fb_dictionary<'a>(
918 index_type: &DataType,
919 dict_id: i64,
920 dict_is_ordered: bool,
921 fbb: &mut FlatBufferBuilder<'a>,
922) -> WIPOffset<crate::DictionaryEncoding<'a>> {
923 let mut index_builder = crate::IntBuilder::new(fbb);
926
927 match *index_type {
928 Int8 | Int16 | Int32 | Int64 => index_builder.add_is_signed(true),
929 UInt8 | UInt16 | UInt32 | UInt64 => index_builder.add_is_signed(false),
930 _ => {}
931 }
932
933 match *index_type {
934 Int8 | UInt8 => index_builder.add_bitWidth(8),
935 Int16 | UInt16 => index_builder.add_bitWidth(16),
936 Int32 | UInt32 => index_builder.add_bitWidth(32),
937 Int64 | UInt64 => index_builder.add_bitWidth(64),
938 _ => {}
939 }
940
941 let index_builder = index_builder.finish();
942
943 let mut builder = crate::DictionaryEncodingBuilder::new(fbb);
944 builder.add_id(dict_id);
945 builder.add_indexType(index_builder);
946 builder.add_isOrdered(dict_is_ordered);
947
948 builder.finish()
949}
950
/// An IPC flatbuffer `Message` held in an Arrow [`Buffer`].
///
/// Constructed via `MessageBuffer::try_new`, which runs the flatbuffer verifier
/// over the bytes. `as_ref` relies on that check to read the message without
/// verifying it again.
#[derive(Clone)]
pub struct MessageBuffer(Buffer);
964
965impl Debug for MessageBuffer {
966 fn fmt(&self, f: &mut Formatter<'_>) -> std::fmt::Result {
967 self.as_ref().fmt(f)
968 }
969}
970
971impl MessageBuffer {
972 pub fn try_new(buf: Buffer) -> Result<Self, ArrowError> {
974 let opts = VerifierOptions::default();
975 let mut v = Verifier::new(&opts, &buf);
976 <ForwardsUOffset<Message>>::run_verifier(&mut v, 0).map_err(|err| {
977 ArrowError::ParseError(format!("Unable to get root as message: {err:?}"))
978 })?;
979 Ok(Self(buf))
980 }
981
982 #[inline]
984 pub fn as_ref(&self) -> Message<'_> {
985 unsafe { crate::root_as_message_unchecked(&self.0) }
987 }
988}
989
990#[cfg(test)]
991mod tests {
992 use super::*;
993
994 #[test]
995 fn convert_schema_round_trip() {
996 let md: HashMap<String, String> = [("Key".to_string(), "value".to_string())]
997 .iter()
998 .cloned()
999 .collect();
1000 let field_md: HashMap<String, String> = [("k".to_string(), "v".to_string())]
1001 .iter()
1002 .cloned()
1003 .collect();
1004 let schema = Schema::new_with_metadata(
1005 vec![
1006 Field::new("uint8", DataType::UInt8, false).with_metadata(field_md),
1007 Field::new("uint16", DataType::UInt16, true),
1008 Field::new("uint32", DataType::UInt32, false),
1009 Field::new("uint64", DataType::UInt64, true),
1010 Field::new("int8", DataType::Int8, true),
1011 Field::new("int16", DataType::Int16, false),
1012 Field::new("int32", DataType::Int32, true),
1013 Field::new("int64", DataType::Int64, false),
1014 Field::new("float16", DataType::Float16, true),
1015 Field::new("float32", DataType::Float32, false),
1016 Field::new("float64", DataType::Float64, true),
1017 Field::new("null", DataType::Null, false),
1018 Field::new("bool", DataType::Boolean, false),
1019 Field::new("date32", DataType::Date32, false),
1020 Field::new("date64", DataType::Date64, true),
1021 Field::new("time32[s]", DataType::Time32(TimeUnit::Second), true),
1022 Field::new("time32[ms]", DataType::Time32(TimeUnit::Millisecond), false),
1023 Field::new("time64[us]", DataType::Time64(TimeUnit::Microsecond), false),
1024 Field::new("time64[ns]", DataType::Time64(TimeUnit::Nanosecond), true),
1025 Field::new(
1026 "timestamp[s]",
1027 DataType::Timestamp(TimeUnit::Second, None),
1028 false,
1029 ),
1030 Field::new(
1031 "timestamp[ms]",
1032 DataType::Timestamp(TimeUnit::Millisecond, None),
1033 true,
1034 ),
1035 Field::new(
1036 "timestamp[us]",
1037 DataType::Timestamp(TimeUnit::Microsecond, Some("Africa/Johannesburg".into())),
1038 false,
1039 ),
1040 Field::new(
1041 "timestamp[ns]",
1042 DataType::Timestamp(TimeUnit::Nanosecond, None),
1043 true,
1044 ),
1045 Field::new(
1046 "interval[ym]",
1047 DataType::Interval(IntervalUnit::YearMonth),
1048 true,
1049 ),
1050 Field::new(
1051 "interval[dt]",
1052 DataType::Interval(IntervalUnit::DayTime),
1053 true,
1054 ),
1055 Field::new(
1056 "interval[mdn]",
1057 DataType::Interval(IntervalUnit::MonthDayNano),
1058 true,
1059 ),
1060 Field::new("utf8", DataType::Utf8, false),
1061 Field::new("utf8_view", DataType::Utf8View, false),
1062 Field::new("binary", DataType::Binary, false),
1063 Field::new("binary_view", DataType::BinaryView, false),
1064 Field::new_list(
1065 "list[u8]",
1066 Field::new_list_field(DataType::UInt8, false),
1067 true,
1068 ),
1069 Field::new_fixed_size_list(
1070 "fixed_size_list[u8]",
1071 Field::new_list_field(DataType::UInt8, false),
1072 2,
1073 true,
1074 ),
1075 Field::new_list(
1076 "list[struct<float32, int32, bool>]",
1077 Field::new_struct(
1078 "struct",
1079 vec![
1080 Field::new("float32", UInt8, false),
1081 Field::new("int32", Int32, true),
1082 Field::new("bool", Boolean, true),
1083 ],
1084 true,
1085 ),
1086 false,
1087 ),
1088 Field::new_struct(
1089 "struct<dictionary<int32, utf8>>",
1090 vec![Field::new(
1091 "dictionary<int32, utf8>",
1092 Dictionary(Box::new(DataType::Int32), Box::new(DataType::Utf8)),
1093 false,
1094 )],
1095 false,
1096 ),
1097 Field::new_struct(
1098 "struct<int64, list[struct<date32, list[struct<>]>]>",
1099 vec![
1100 Field::new("int64", DataType::Int64, true),
1101 Field::new_list(
1102 "list[struct<date32, list[struct<>]>]",
1103 Field::new_struct(
1104 "struct",
1105 vec![
1106 Field::new("date32", DataType::Date32, true),
1107 Field::new_list(
1108 "list[struct<>]",
1109 Field::new(
1110 "struct",
1111 DataType::Struct(Fields::empty()),
1112 false,
1113 ),
1114 false,
1115 ),
1116 ],
1117 false,
1118 ),
1119 false,
1120 ),
1121 ],
1122 false,
1123 ),
1124 Field::new_union(
1125 "union<int64, list[union<date32, list[union<>]>]>",
1126 vec![0, 1],
1127 vec![
1128 Field::new("int64", DataType::Int64, true),
1129 Field::new_list(
1130 "list[union<date32, list[union<>]>]",
1131 Field::new_union(
1132 "union<date32, list[union<>]>",
1133 vec![0, 1],
1134 vec![
1135 Field::new("date32", DataType::Date32, true),
1136 Field::new_list(
1137 "list[union<>]",
1138 Field::new(
1139 "union",
1140 DataType::Union(
1141 UnionFields::empty(),
1142 UnionMode::Sparse,
1143 ),
1144 false,
1145 ),
1146 false,
1147 ),
1148 ],
1149 UnionMode::Dense,
1150 ),
1151 false,
1152 ),
1153 ],
1154 UnionMode::Sparse,
1155 ),
1156 Field::new("struct<>", DataType::Struct(Fields::empty()), true),
1157 Field::new(
1158 "union<>",
1159 DataType::Union(UnionFields::empty(), UnionMode::Dense),
1160 true,
1161 ),
1162 Field::new(
1163 "union<>",
1164 DataType::Union(UnionFields::empty(), UnionMode::Sparse),
1165 true,
1166 ),
1167 Field::new(
1168 "union<int32, utf8>",
1169 DataType::Union(
1170 UnionFields::new(
1171 vec![2, 3], vec![
1173 Field::new("int32", DataType::Int32, true),
1174 Field::new("utf8", DataType::Utf8, true),
1175 ],
1176 ),
1177 UnionMode::Dense,
1178 ),
1179 true,
1180 ),
1181 #[allow(deprecated)]
1182 Field::new_dict(
1183 "dictionary<int32, utf8>",
1184 DataType::Dictionary(Box::new(DataType::Int32), Box::new(DataType::Utf8)),
1185 true,
1186 123,
1187 true,
1188 ),
1189 #[allow(deprecated)]
1190 Field::new_dict(
1191 "dictionary<uint8, uint32>",
1192 DataType::Dictionary(Box::new(DataType::UInt8), Box::new(DataType::UInt32)),
1193 true,
1194 123,
1195 true,
1196 ),
1197 Field::new("decimal<usize, usize>", DataType::Decimal128(10, 6), false),
1198 ],
1199 md,
1200 );
1201
1202 let mut dictionary_tracker = DictionaryTracker::new(true);
1203 let fb = IpcSchemaEncoder::new()
1204 .with_dictionary_tracker(&mut dictionary_tracker)
1205 .schema_to_fb(&schema);
1206
1207 let ipc = crate::root_as_schema(fb.finished_data()).unwrap();
1209 let schema2 = fb_to_schema(ipc);
1210 assert_eq!(schema, schema2);
1211 }
1212
1213 #[test]
1214 fn schema_from_bytes() {
1215 let bytes: Vec<u8> = vec![
1225 16, 0, 0, 0, 0, 0, 10, 0, 12, 0, 6, 0, 5, 0, 8, 0, 10, 0, 0, 0, 0, 1, 4, 0, 12, 0, 0,
1226 0, 8, 0, 8, 0, 0, 0, 4, 0, 8, 0, 0, 0, 4, 0, 0, 0, 1, 0, 0, 0, 20, 0, 0, 0, 16, 0, 20,
1227 0, 8, 0, 0, 0, 7, 0, 12, 0, 0, 0, 16, 0, 16, 0, 0, 0, 0, 0, 0, 2, 16, 0, 0, 0, 32, 0,
1228 0, 0, 4, 0, 0, 0, 0, 0, 0, 0, 6, 0, 0, 0, 102, 105, 101, 108, 100, 49, 0, 0, 0, 0, 6,
1229 0, 8, 0, 4, 0, 6, 0, 0, 0, 32, 0, 0, 0,
1230 ];
1231 let ipc = crate::root_as_message(&bytes).unwrap();
1232 let schema = ipc.header_as_schema().unwrap();
1233
1234 let data_gen = crate::writer::IpcDataGenerator::default();
1236 let mut dictionary_tracker = DictionaryTracker::new(true);
1237 let arrow_schema = Schema::new(vec![Field::new("field1", DataType::UInt32, false)]);
1238 let bytes = data_gen
1239 .schema_to_bytes_with_dictionary_tracker(
1240 &arrow_schema,
1241 &mut dictionary_tracker,
1242 &crate::writer::IpcWriteOptions::default(),
1243 )
1244 .ipc_message;
1245
1246 let ipc2 = crate::root_as_message(&bytes).unwrap();
1247 let schema2 = ipc2.header_as_schema().unwrap();
1248
1249 assert!(schema.custom_metadata().is_none());
1251 assert!(schema2.custom_metadata().is_none());
1252 assert_eq!(schema.endianness(), schema2.endianness());
1253 assert!(schema.features().is_none());
1254 assert!(schema2.features().is_none());
1255 assert_eq!(fb_to_schema(schema), fb_to_schema(schema2));
1256
1257 assert_eq!(ipc.version(), ipc2.version());
1258 assert_eq!(ipc.header_type(), ipc2.header_type());
1259 assert_eq!(ipc.bodyLength(), ipc2.bodyLength());
1260 assert!(ipc.custom_metadata().is_none());
1261 assert!(ipc2.custom_metadata().is_none());
1262 }
1263}