arrow_ipc/
convert.rs

1// Licensed to the Apache Software Foundation (ASF) under one
2// or more contributor license agreements.  See the NOTICE file
3// distributed with this work for additional information
4// regarding copyright ownership.  The ASF licenses this file
5// to you under the Apache License, Version 2.0 (the
6// "License"); you may not use this file except in compliance
7// with the License.  You may obtain a copy of the License at
8//
9//   http://www.apache.org/licenses/LICENSE-2.0
10//
11// Unless required by applicable law or agreed to in writing,
12// software distributed under the License is distributed on an
13// "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
14// KIND, either express or implied.  See the License for the
15// specific language governing permissions and limitations
16// under the License.
17
18//! Utilities for converting between IPC types and native Arrow types
19
20use arrow_buffer::Buffer;
21use arrow_schema::*;
22use flatbuffers::{
23    FlatBufferBuilder, ForwardsUOffset, UnionWIPOffset, Vector, Verifiable, Verifier,
24    VerifierOptions, WIPOffset,
25};
26use std::collections::HashMap;
27use std::fmt::{Debug, Formatter};
28use std::sync::Arc;
29
30use crate::writer::DictionaryTracker;
31use crate::{KeyValue, Message, CONTINUATION_MARKER};
32use DataType::*;
33
/// Low level Arrow [Schema] to IPC bytes converter
///
/// An encoder may optionally borrow a [`DictionaryTracker`] (see
/// [`IpcSchemaEncoder::with_dictionary_tracker`]). When a tracker is present,
/// the dictionary id written for a dictionary-typed field is obtained from the
/// tracker; otherwise the id stored on the field itself is used.
///
/// See also [`fb_to_schema`] for the reverse operation
///
/// # Example
/// ```
/// # use arrow_ipc::convert::{fb_to_schema, IpcSchemaEncoder};
/// # use arrow_ipc::root_as_schema;
/// # use arrow_ipc::writer::DictionaryTracker;
/// # use arrow_schema::{DataType, Field, Schema};
/// // given an arrow schema to serialize
/// let schema = Schema::new(vec![
///    Field::new("a", DataType::Int32, false),
/// ]);
///
/// // Use a dictionary tracker to track dictionary id if needed
/// let mut dictionary_tracker = DictionaryTracker::new(true);
/// // create a FlatBuffersBuilder that contains the encoded bytes
/// let fb = IpcSchemaEncoder::new()
///    .with_dictionary_tracker(&mut dictionary_tracker)
///    .schema_to_fb(&schema);
///
/// // the bytes are in `fb.finished_data()`
/// let ipc_bytes = fb.finished_data();
///
/// // convert the IPC bytes back to an Arrow schema
/// let ipc_schema = root_as_schema(ipc_bytes).unwrap();
/// let schema2 = fb_to_schema(ipc_schema);
/// assert_eq!(schema, schema2);
/// ```
#[derive(Debug)]
pub struct IpcSchemaEncoder<'a> {
    /// Optional tracker handed to the field encoder; `None` until
    /// `with_dictionary_tracker` is called.
    dictionary_tracker: Option<&'a mut DictionaryTracker>,
}
68
69impl Default for IpcSchemaEncoder<'_> {
70    fn default() -> Self {
71        Self::new()
72    }
73}
74
75impl<'a> IpcSchemaEncoder<'a> {
76    /// Create a new schema encoder
77    pub fn new() -> IpcSchemaEncoder<'a> {
78        IpcSchemaEncoder {
79            dictionary_tracker: None,
80        }
81    }
82
83    /// Specify a dictionary tracker to use
84    pub fn with_dictionary_tracker(
85        mut self,
86        dictionary_tracker: &'a mut DictionaryTracker,
87    ) -> Self {
88        self.dictionary_tracker = Some(dictionary_tracker);
89        self
90    }
91
92    /// Serialize a schema in IPC format, returning a completed [`FlatBufferBuilder`]
93    ///
94    /// Note: Call [`FlatBufferBuilder::finished_data`] to get the serialized bytes
95    pub fn schema_to_fb<'b>(&mut self, schema: &Schema) -> FlatBufferBuilder<'b> {
96        let mut fbb = FlatBufferBuilder::new();
97
98        let root = self.schema_to_fb_offset(&mut fbb, schema);
99
100        fbb.finish(root, None);
101
102        fbb
103    }
104
105    /// Serialize a schema to an in progress [`FlatBufferBuilder`], returning the in progress offset.
106    pub fn schema_to_fb_offset<'b>(
107        &mut self,
108        fbb: &mut FlatBufferBuilder<'b>,
109        schema: &Schema,
110    ) -> WIPOffset<crate::Schema<'b>> {
111        let fields = schema
112            .fields()
113            .iter()
114            .map(|field| build_field(fbb, &mut self.dictionary_tracker, field))
115            .collect::<Vec<_>>();
116        let fb_field_list = fbb.create_vector(&fields);
117
118        let fb_metadata_list =
119            (!schema.metadata().is_empty()).then(|| metadata_to_fb(fbb, schema.metadata()));
120
121        let mut builder = crate::SchemaBuilder::new(fbb);
122        builder.add_fields(fb_field_list);
123        if let Some(fb_metadata_list) = fb_metadata_list {
124            builder.add_custom_metadata(fb_metadata_list);
125        }
126        builder.finish()
127    }
128}
129
130/// Serialize a schema in IPC format
131#[deprecated(since = "54.0.0", note = "Use `IpcSchemaConverter`.")]
132pub fn schema_to_fb(schema: &Schema) -> FlatBufferBuilder<'_> {
133    IpcSchemaEncoder::new().schema_to_fb(schema)
134}
135
136/// Push a key-value metadata into a FlatBufferBuilder and return [WIPOffset]
137pub fn metadata_to_fb<'a>(
138    fbb: &mut FlatBufferBuilder<'a>,
139    metadata: &HashMap<String, String>,
140) -> WIPOffset<Vector<'a, ForwardsUOffset<KeyValue<'a>>>> {
141    let custom_metadata = metadata
142        .iter()
143        .map(|(k, v)| {
144            let fb_key_name = fbb.create_string(k);
145            let fb_val_name = fbb.create_string(v);
146
147            let mut kv_builder = crate::KeyValueBuilder::new(fbb);
148            kv_builder.add_key(fb_key_name);
149            kv_builder.add_value(fb_val_name);
150            kv_builder.finish()
151        })
152        .collect::<Vec<_>>();
153    fbb.create_vector(&custom_metadata)
154}
155
156/// Adds a [Schema] to a flatbuffer and returns the offset
157pub fn schema_to_fb_offset<'a>(
158    fbb: &mut FlatBufferBuilder<'a>,
159    schema: &Schema,
160) -> WIPOffset<crate::Schema<'a>> {
161    IpcSchemaEncoder::new().schema_to_fb_offset(fbb, schema)
162}
163
164/// Convert an IPC Field to Arrow Field
165impl From<crate::Field<'_>> for Field {
166    fn from(field: crate::Field) -> Field {
167        let arrow_field = if let Some(dictionary) = field.dictionary() {
168            #[allow(deprecated)]
169            Field::new_dict(
170                field.name().unwrap(),
171                get_data_type(field, true),
172                field.nullable(),
173                dictionary.id(),
174                dictionary.isOrdered(),
175            )
176        } else {
177            Field::new(
178                field.name().unwrap(),
179                get_data_type(field, true),
180                field.nullable(),
181            )
182        };
183
184        let mut metadata_map = HashMap::default();
185        if let Some(list) = field.custom_metadata() {
186            for kv in list {
187                if let (Some(k), Some(v)) = (kv.key(), kv.value()) {
188                    metadata_map.insert(k.to_string(), v.to_string());
189                }
190            }
191        }
192
193        arrow_field.with_metadata(metadata_map)
194    }
195}
196
197/// Deserialize an ipc [crate::Schema`] from flat buffers to an arrow [Schema].
198pub fn fb_to_schema(fb: crate::Schema) -> Schema {
199    let mut fields: Vec<Field> = vec![];
200    let c_fields = fb.fields().unwrap();
201    let len = c_fields.len();
202    for i in 0..len {
203        let c_field: crate::Field = c_fields.get(i);
204        match c_field.type_type() {
205            crate::Type::Decimal if fb.endianness() == crate::Endianness::Big => {
206                unimplemented!("Big Endian is not supported for Decimal!")
207            }
208            _ => (),
209        };
210        fields.push(c_field.into());
211    }
212
213    let mut metadata: HashMap<String, String> = HashMap::default();
214    if let Some(md_fields) = fb.custom_metadata() {
215        let len = md_fields.len();
216        for i in 0..len {
217            let kv = md_fields.get(i);
218            let k_str = kv.key();
219            let v_str = kv.value();
220            if let Some(k) = k_str {
221                if let Some(v) = v_str {
222                    metadata.insert(k.to_string(), v.to_string());
223                }
224            }
225        }
226    }
227    Schema::new_with_metadata(fields, metadata)
228}
229
230/// Try deserialize flat buffer format bytes into a schema
231pub fn try_schema_from_flatbuffer_bytes(bytes: &[u8]) -> Result<Schema, ArrowError> {
232    if let Ok(ipc) = crate::root_as_message(bytes) {
233        if let Some(schema) = ipc.header_as_schema().map(fb_to_schema) {
234            Ok(schema)
235        } else {
236            Err(ArrowError::ParseError(
237                "Unable to get head as schema".to_string(),
238            ))
239        }
240    } else {
241        Err(ArrowError::ParseError(
242            "Unable to get root as message".to_string(),
243        ))
244    }
245}
246
247/// Try deserialize the IPC format bytes into a schema
248pub fn try_schema_from_ipc_buffer(buffer: &[u8]) -> Result<Schema, ArrowError> {
249    // There are two protocol types: https://issues.apache.org/jira/browse/ARROW-6313
250    // The original protocol is:
251    //   4 bytes - the byte length of the payload
252    //   a flatbuffer Message whose header is the Schema
253    // The latest version of protocol is:
254    // The schema of the dataset in its IPC form:
255    //   4 bytes - an optional IPC_CONTINUATION_TOKEN prefix
256    //   4 bytes - the byte length of the payload
257    //   a flatbuffer Message whose header is the Schema
258    if buffer.len() < 4 {
259        return Err(ArrowError::ParseError(
260            "The buffer length is less than 4 and missing the continuation marker or length of buffer".to_string()
261        ));
262    }
263
264    let (len, buffer) = if buffer[..4] == CONTINUATION_MARKER {
265        if buffer.len() < 8 {
266            return Err(ArrowError::ParseError(
267                "The buffer length is less than 8 and missing the length of buffer".to_string(),
268            ));
269        }
270        buffer[4..].split_at(4)
271    } else {
272        buffer.split_at(4)
273    };
274
275    let len = <i32>::from_le_bytes(len.try_into().unwrap());
276    if len < 0 {
277        return Err(ArrowError::ParseError(format!(
278            "The encapsulated message's reported length is negative ({len})"
279        )));
280    }
281
282    if buffer.len() < len as usize {
283        let actual_len = buffer.len();
284        return Err(ArrowError::ParseError(
285            format!("The buffer length ({actual_len}) is less than the encapsulated message's reported length ({len})")
286        ));
287    }
288
289    let msg = crate::root_as_message(buffer)
290        .map_err(|err| ArrowError::ParseError(format!("Unable to get root as message: {err:?}")))?;
291    let ipc_schema = msg.header_as_schema().ok_or_else(|| {
292        ArrowError::ParseError("Unable to convert flight info to a schema".to_string())
293    })?;
294    Ok(fb_to_schema(ipc_schema))
295}
296
/// Get the Arrow data type from the flatbuffer Field table
///
/// When `may_be_dictionary` is `true` and `field` carries a dictionary encoding, the
/// result is `DataType::Dictionary(index_type, value_type)`, where the value type is
/// obtained by calling this function again with `may_be_dictionary` set to `false`.
/// Otherwise the field's own IPC type is converted directly.
///
/// # Panics
///
/// Panics if the IPC type is not handled here, if the type table or children that a
/// type requires are missing, or if a type carries a parameter combination that has
/// no Arrow counterpart (e.g. an unexpected bit width, unit or child count).
pub(crate) fn get_data_type(field: crate::Field, may_be_dictionary: bool) -> DataType {
    if let Some(dictionary) = field.dictionary() {
        if may_be_dictionary {
            // The dictionary key type comes from the dictionary encoding's index type
            let int = dictionary.indexType().unwrap();
            let index_type = match (int.bitWidth(), int.is_signed()) {
                (8, true) => DataType::Int8,
                (8, false) => DataType::UInt8,
                (16, true) => DataType::Int16,
                (16, false) => DataType::UInt16,
                (32, true) => DataType::Int32,
                (32, false) => DataType::UInt32,
                (64, true) => DataType::Int64,
                (64, false) => DataType::UInt64,
                _ => panic!("Unexpected bitwidth and signed"),
            };
            // Recurse with `false` so the field's own type is read as the value type
            return DataType::Dictionary(
                Box::new(index_type),
                Box::new(get_data_type(field, false)),
            );
        }
    }

    match field.type_type() {
        crate::Type::Null => DataType::Null,
        crate::Type::Bool => DataType::Boolean,
        crate::Type::Int => {
            let int = field.type_as_int().unwrap();
            match (int.bitWidth(), int.is_signed()) {
                (8, true) => DataType::Int8,
                (8, false) => DataType::UInt8,
                (16, true) => DataType::Int16,
                (16, false) => DataType::UInt16,
                (32, true) => DataType::Int32,
                (32, false) => DataType::UInt32,
                (64, true) => DataType::Int64,
                (64, false) => DataType::UInt64,
                z => panic!(
                    "Int type with bit width of {} and signed of {} not supported",
                    z.0, z.1
                ),
            }
        }
        crate::Type::Binary => DataType::Binary,
        crate::Type::BinaryView => DataType::BinaryView,
        crate::Type::LargeBinary => DataType::LargeBinary,
        crate::Type::Utf8 => DataType::Utf8,
        crate::Type::Utf8View => DataType::Utf8View,
        crate::Type::LargeUtf8 => DataType::LargeUtf8,
        crate::Type::FixedSizeBinary => {
            let fsb = field.type_as_fixed_size_binary().unwrap();
            DataType::FixedSizeBinary(fsb.byteWidth())
        }
        crate::Type::FloatingPoint => {
            let float = field.type_as_floating_point().unwrap();
            match float.precision() {
                crate::Precision::HALF => DataType::Float16,
                crate::Precision::SINGLE => DataType::Float32,
                crate::Precision::DOUBLE => DataType::Float64,
                z => panic!("FloatingPoint type with precision of {z:?} not supported"),
            }
        }
        crate::Type::Date => {
            let date = field.type_as_date().unwrap();
            match date.unit() {
                crate::DateUnit::DAY => DataType::Date32,
                crate::DateUnit::MILLISECOND => DataType::Date64,
                z => panic!("Date type with unit of {z:?} not supported"),
            }
        }
        crate::Type::Time => {
            let time = field.type_as_time().unwrap();
            // Only these four (bit width, unit) pairings are accepted
            match (time.bitWidth(), time.unit()) {
                (32, crate::TimeUnit::SECOND) => DataType::Time32(TimeUnit::Second),
                (32, crate::TimeUnit::MILLISECOND) => DataType::Time32(TimeUnit::Millisecond),
                (64, crate::TimeUnit::MICROSECOND) => DataType::Time64(TimeUnit::Microsecond),
                (64, crate::TimeUnit::NANOSECOND) => DataType::Time64(TimeUnit::Nanosecond),
                z => panic!(
                    "Time type with bit width of {} and unit of {:?} not supported",
                    z.0, z.1
                ),
            }
        }
        crate::Type::Timestamp => {
            let timestamp = field.type_as_timestamp().unwrap();
            // An absent timezone stays `None`
            let timezone: Option<_> = timestamp.timezone().map(|tz| tz.into());
            match timestamp.unit() {
                crate::TimeUnit::SECOND => DataType::Timestamp(TimeUnit::Second, timezone),
                crate::TimeUnit::MILLISECOND => {
                    DataType::Timestamp(TimeUnit::Millisecond, timezone)
                }
                crate::TimeUnit::MICROSECOND => {
                    DataType::Timestamp(TimeUnit::Microsecond, timezone)
                }
                crate::TimeUnit::NANOSECOND => DataType::Timestamp(TimeUnit::Nanosecond, timezone),
                z => panic!("Timestamp type with unit of {z:?} not supported"),
            }
        }
        crate::Type::Interval => {
            let interval = field.type_as_interval().unwrap();
            match interval.unit() {
                crate::IntervalUnit::YEAR_MONTH => DataType::Interval(IntervalUnit::YearMonth),
                crate::IntervalUnit::DAY_TIME => DataType::Interval(IntervalUnit::DayTime),
                crate::IntervalUnit::MONTH_DAY_NANO => {
                    DataType::Interval(IntervalUnit::MonthDayNano)
                }
                z => panic!("Interval type with unit of {z:?} unsupported"),
            }
        }
        crate::Type::Duration => {
            let duration = field.type_as_duration().unwrap();
            match duration.unit() {
                crate::TimeUnit::SECOND => DataType::Duration(TimeUnit::Second),
                crate::TimeUnit::MILLISECOND => DataType::Duration(TimeUnit::Millisecond),
                crate::TimeUnit::MICROSECOND => DataType::Duration(TimeUnit::Microsecond),
                crate::TimeUnit::NANOSECOND => DataType::Duration(TimeUnit::Nanosecond),
                z => panic!("Duration type with unit of {z:?} unsupported"),
            }
        }
        crate::Type::List => {
            let children = field.children().unwrap();
            if children.len() != 1 {
                panic!("expect a list to have one child")
            }
            DataType::List(Arc::new(children.get(0).into()))
        }
        crate::Type::LargeList => {
            let children = field.children().unwrap();
            if children.len() != 1 {
                panic!("expect a large list to have one child")
            }
            DataType::LargeList(Arc::new(children.get(0).into()))
        }
        crate::Type::FixedSizeList => {
            let children = field.children().unwrap();
            if children.len() != 1 {
                panic!("expect a list to have one child")
            }
            let fsl = field.type_as_fixed_size_list().unwrap();
            DataType::FixedSizeList(Arc::new(children.get(0).into()), fsl.listSize())
        }
        crate::Type::Struct_ => {
            // A struct with no children vector becomes a struct with no fields
            let fields = match field.children() {
                Some(children) => children.iter().map(Field::from).collect(),
                None => Fields::empty(),
            };
            DataType::Struct(fields)
        }
        crate::Type::RunEndEncoded => {
            let children = field.children().unwrap();
            if children.len() != 2 {
                panic!(
                    "RunEndEncoded type should have exactly two children. Found {}",
                    children.len()
                )
            }
            // Child 0 is used as the run ends field, child 1 as the values field
            let run_ends_field = children.get(0).into();
            let values_field = children.get(1).into();
            DataType::RunEndEncoded(Arc::new(run_ends_field), Arc::new(values_field))
        }
        crate::Type::Map => {
            let map = field.type_as_map().unwrap();
            let children = field.children().unwrap();
            if children.len() != 1 {
                panic!("expect a map to have one child")
            }
            DataType::Map(Arc::new(children.get(0).into()), map.keysSorted())
        }
        crate::Type::Decimal => {
            let fsb = field.type_as_decimal().unwrap();
            let bit_width = fsb.bitWidth();
            // Panics if precision does not fit in `u8` or scale does not fit in `i8`
            let precision: u8 = fsb.precision().try_into().unwrap();
            let scale: i8 = fsb.scale().try_into().unwrap();
            match bit_width {
                128 => DataType::Decimal128(precision, scale),
                256 => DataType::Decimal256(precision, scale),
                _ => panic!("Unexpected decimal bit width {bit_width}"),
            }
        }
        crate::Type::Union => {
            let union = field.type_as_union().unwrap();

            let union_mode = match union.mode() {
                crate::UnionMode::Dense => UnionMode::Dense,
                crate::UnionMode::Sparse => UnionMode::Sparse,
                mode => panic!("Unexpected union mode: {mode:?}"),
            };

            // A union without a children vector yields no fields
            let mut fields = vec![];
            if let Some(children) = field.children() {
                for i in 0..children.len() {
                    fields.push(Field::from(children.get(i)));
                }
            };

            // Without explicit type ids, children are numbered 0..n by position.
            // NOTE(review): both `as i8` casts truncate silently for out-of-range values.
            let fields = match union.typeIds() {
                None => UnionFields::new(0_i8..fields.len() as i8, fields),
                Some(ids) => UnionFields::new(ids.iter().map(|i| i as i8), fields),
            };

            DataType::Union(fields, union_mode)
        }
        t => unimplemented!("Type {:?} not supported", t),
    }
}
502
/// The flatbuffer pieces that describe a field's type, as returned by `get_fb_field_type`
pub(crate) struct FBFieldType<'b> {
    /// Tag identifying which IPC type table `type_` refers to
    pub(crate) type_type: crate::Type,
    /// Offset of the finished type table, converted to a union value
    pub(crate) type_: WIPOffset<UnionWIPOffset>,
    /// Offset of the vector of child fields, if one was written
    pub(crate) children: Option<WIPOffset<Vector<'b, ForwardsUOffset<crate::Field<'b>>>>>,
}
508
509/// Create an IPC Field from an Arrow Field
510pub(crate) fn build_field<'a>(
511    fbb: &mut FlatBufferBuilder<'a>,
512    dictionary_tracker: &mut Option<&mut DictionaryTracker>,
513    field: &Field,
514) -> WIPOffset<crate::Field<'a>> {
515    // Optional custom metadata.
516    let mut fb_metadata = None;
517    if !field.metadata().is_empty() {
518        fb_metadata = Some(metadata_to_fb(fbb, field.metadata()));
519    };
520
521    let fb_field_name = fbb.create_string(field.name().as_str());
522    let field_type = get_fb_field_type(field.data_type(), dictionary_tracker, fbb);
523
524    let fb_dictionary = if let Dictionary(index_type, _) = field.data_type() {
525        match dictionary_tracker {
526            Some(tracker) => Some(get_fb_dictionary(
527                index_type,
528                #[allow(deprecated)]
529                tracker.set_dict_id(field),
530                field
531                    .dict_is_ordered()
532                    .expect("All Dictionary types have `dict_is_ordered`"),
533                fbb,
534            )),
535            None => Some(get_fb_dictionary(
536                index_type,
537                #[allow(deprecated)]
538                field
539                    .dict_id()
540                    .expect("Dictionary type must have a dictionary id"),
541                field
542                    .dict_is_ordered()
543                    .expect("All Dictionary types have `dict_is_ordered`"),
544                fbb,
545            )),
546        }
547    } else {
548        None
549    };
550
551    let mut field_builder = crate::FieldBuilder::new(fbb);
552    field_builder.add_name(fb_field_name);
553    if let Some(dictionary) = fb_dictionary {
554        field_builder.add_dictionary(dictionary)
555    }
556    field_builder.add_type_type(field_type.type_type);
557    field_builder.add_nullable(field.is_nullable());
558    match field_type.children {
559        None => {}
560        Some(children) => field_builder.add_children(children),
561    };
562    field_builder.add_type_(field_type.type_);
563
564    if let Some(fb_metadata) = fb_metadata {
565        field_builder.add_custom_metadata(fb_metadata);
566    }
567
568    field_builder.finish()
569}
570
571/// Get the IPC type of a data type
572pub(crate) fn get_fb_field_type<'a>(
573    data_type: &DataType,
574    dictionary_tracker: &mut Option<&mut DictionaryTracker>,
575    fbb: &mut FlatBufferBuilder<'a>,
576) -> FBFieldType<'a> {
577    // some IPC implementations expect an empty list for child data, instead of a null value.
578    // An empty field list is thus returned for primitive types
579    let empty_fields: Vec<WIPOffset<crate::Field>> = vec![];
580    match data_type {
581        Null => FBFieldType {
582            type_type: crate::Type::Null,
583            type_: crate::NullBuilder::new(fbb).finish().as_union_value(),
584            children: Some(fbb.create_vector(&empty_fields[..])),
585        },
586        Boolean => FBFieldType {
587            type_type: crate::Type::Bool,
588            type_: crate::BoolBuilder::new(fbb).finish().as_union_value(),
589            children: Some(fbb.create_vector(&empty_fields[..])),
590        },
591        UInt8 | UInt16 | UInt32 | UInt64 => {
592            let children = fbb.create_vector(&empty_fields[..]);
593            let mut builder = crate::IntBuilder::new(fbb);
594            builder.add_is_signed(false);
595            match data_type {
596                UInt8 => builder.add_bitWidth(8),
597                UInt16 => builder.add_bitWidth(16),
598                UInt32 => builder.add_bitWidth(32),
599                UInt64 => builder.add_bitWidth(64),
600                _ => {}
601            };
602            FBFieldType {
603                type_type: crate::Type::Int,
604                type_: builder.finish().as_union_value(),
605                children: Some(children),
606            }
607        }
608        Int8 | Int16 | Int32 | Int64 => {
609            let children = fbb.create_vector(&empty_fields[..]);
610            let mut builder = crate::IntBuilder::new(fbb);
611            builder.add_is_signed(true);
612            match data_type {
613                Int8 => builder.add_bitWidth(8),
614                Int16 => builder.add_bitWidth(16),
615                Int32 => builder.add_bitWidth(32),
616                Int64 => builder.add_bitWidth(64),
617                _ => {}
618            };
619            FBFieldType {
620                type_type: crate::Type::Int,
621                type_: builder.finish().as_union_value(),
622                children: Some(children),
623            }
624        }
625        Float16 | Float32 | Float64 => {
626            let children = fbb.create_vector(&empty_fields[..]);
627            let mut builder = crate::FloatingPointBuilder::new(fbb);
628            match data_type {
629                Float16 => builder.add_precision(crate::Precision::HALF),
630                Float32 => builder.add_precision(crate::Precision::SINGLE),
631                Float64 => builder.add_precision(crate::Precision::DOUBLE),
632                _ => {}
633            };
634            FBFieldType {
635                type_type: crate::Type::FloatingPoint,
636                type_: builder.finish().as_union_value(),
637                children: Some(children),
638            }
639        }
640        Binary => FBFieldType {
641            type_type: crate::Type::Binary,
642            type_: crate::BinaryBuilder::new(fbb).finish().as_union_value(),
643            children: Some(fbb.create_vector(&empty_fields[..])),
644        },
645        LargeBinary => FBFieldType {
646            type_type: crate::Type::LargeBinary,
647            type_: crate::LargeBinaryBuilder::new(fbb)
648                .finish()
649                .as_union_value(),
650            children: Some(fbb.create_vector(&empty_fields[..])),
651        },
652        BinaryView => FBFieldType {
653            type_type: crate::Type::BinaryView,
654            type_: crate::BinaryViewBuilder::new(fbb).finish().as_union_value(),
655            children: Some(fbb.create_vector(&empty_fields[..])),
656        },
657        Utf8View => FBFieldType {
658            type_type: crate::Type::Utf8View,
659            type_: crate::Utf8ViewBuilder::new(fbb).finish().as_union_value(),
660            children: Some(fbb.create_vector(&empty_fields[..])),
661        },
662        Utf8 => FBFieldType {
663            type_type: crate::Type::Utf8,
664            type_: crate::Utf8Builder::new(fbb).finish().as_union_value(),
665            children: Some(fbb.create_vector(&empty_fields[..])),
666        },
667        LargeUtf8 => FBFieldType {
668            type_type: crate::Type::LargeUtf8,
669            type_: crate::LargeUtf8Builder::new(fbb).finish().as_union_value(),
670            children: Some(fbb.create_vector(&empty_fields[..])),
671        },
672        FixedSizeBinary(len) => {
673            let mut builder = crate::FixedSizeBinaryBuilder::new(fbb);
674            builder.add_byteWidth(*len);
675            FBFieldType {
676                type_type: crate::Type::FixedSizeBinary,
677                type_: builder.finish().as_union_value(),
678                children: Some(fbb.create_vector(&empty_fields[..])),
679            }
680        }
681        Date32 => {
682            let mut builder = crate::DateBuilder::new(fbb);
683            builder.add_unit(crate::DateUnit::DAY);
684            FBFieldType {
685                type_type: crate::Type::Date,
686                type_: builder.finish().as_union_value(),
687                children: Some(fbb.create_vector(&empty_fields[..])),
688            }
689        }
690        Date64 => {
691            let mut builder = crate::DateBuilder::new(fbb);
692            builder.add_unit(crate::DateUnit::MILLISECOND);
693            FBFieldType {
694                type_type: crate::Type::Date,
695                type_: builder.finish().as_union_value(),
696                children: Some(fbb.create_vector(&empty_fields[..])),
697            }
698        }
699        Time32(unit) | Time64(unit) => {
700            let mut builder = crate::TimeBuilder::new(fbb);
701            match unit {
702                TimeUnit::Second => {
703                    builder.add_bitWidth(32);
704                    builder.add_unit(crate::TimeUnit::SECOND);
705                }
706                TimeUnit::Millisecond => {
707                    builder.add_bitWidth(32);
708                    builder.add_unit(crate::TimeUnit::MILLISECOND);
709                }
710                TimeUnit::Microsecond => {
711                    builder.add_bitWidth(64);
712                    builder.add_unit(crate::TimeUnit::MICROSECOND);
713                }
714                TimeUnit::Nanosecond => {
715                    builder.add_bitWidth(64);
716                    builder.add_unit(crate::TimeUnit::NANOSECOND);
717                }
718            }
719            FBFieldType {
720                type_type: crate::Type::Time,
721                type_: builder.finish().as_union_value(),
722                children: Some(fbb.create_vector(&empty_fields[..])),
723            }
724        }
725        Timestamp(unit, tz) => {
726            let tz = tz.as_deref().unwrap_or_default();
727            let tz_str = fbb.create_string(tz);
728            let mut builder = crate::TimestampBuilder::new(fbb);
729            let time_unit = match unit {
730                TimeUnit::Second => crate::TimeUnit::SECOND,
731                TimeUnit::Millisecond => crate::TimeUnit::MILLISECOND,
732                TimeUnit::Microsecond => crate::TimeUnit::MICROSECOND,
733                TimeUnit::Nanosecond => crate::TimeUnit::NANOSECOND,
734            };
735            builder.add_unit(time_unit);
736            if !tz.is_empty() {
737                builder.add_timezone(tz_str);
738            }
739            FBFieldType {
740                type_type: crate::Type::Timestamp,
741                type_: builder.finish().as_union_value(),
742                children: Some(fbb.create_vector(&empty_fields[..])),
743            }
744        }
745        Interval(unit) => {
746            let mut builder = crate::IntervalBuilder::new(fbb);
747            let interval_unit = match unit {
748                IntervalUnit::YearMonth => crate::IntervalUnit::YEAR_MONTH,
749                IntervalUnit::DayTime => crate::IntervalUnit::DAY_TIME,
750                IntervalUnit::MonthDayNano => crate::IntervalUnit::MONTH_DAY_NANO,
751            };
752            builder.add_unit(interval_unit);
753            FBFieldType {
754                type_type: crate::Type::Interval,
755                type_: builder.finish().as_union_value(),
756                children: Some(fbb.create_vector(&empty_fields[..])),
757            }
758        }
759        Duration(unit) => {
760            let mut builder = crate::DurationBuilder::new(fbb);
761            let time_unit = match unit {
762                TimeUnit::Second => crate::TimeUnit::SECOND,
763                TimeUnit::Millisecond => crate::TimeUnit::MILLISECOND,
764                TimeUnit::Microsecond => crate::TimeUnit::MICROSECOND,
765                TimeUnit::Nanosecond => crate::TimeUnit::NANOSECOND,
766            };
767            builder.add_unit(time_unit);
768            FBFieldType {
769                type_type: crate::Type::Duration,
770                type_: builder.finish().as_union_value(),
771                children: Some(fbb.create_vector(&empty_fields[..])),
772            }
773        }
774        List(ref list_type) => {
775            let child = build_field(fbb, dictionary_tracker, list_type);
776            FBFieldType {
777                type_type: crate::Type::List,
778                type_: crate::ListBuilder::new(fbb).finish().as_union_value(),
779                children: Some(fbb.create_vector(&[child])),
780            }
781        }
782        ListView(_) | LargeListView(_) => unimplemented!("ListView/LargeListView not implemented"),
783        LargeList(ref list_type) => {
784            let child = build_field(fbb, dictionary_tracker, list_type);
785            FBFieldType {
786                type_type: crate::Type::LargeList,
787                type_: crate::LargeListBuilder::new(fbb).finish().as_union_value(),
788                children: Some(fbb.create_vector(&[child])),
789            }
790        }
791        FixedSizeList(ref list_type, len) => {
792            let child = build_field(fbb, dictionary_tracker, list_type);
793            let mut builder = crate::FixedSizeListBuilder::new(fbb);
794            builder.add_listSize(*len);
795            FBFieldType {
796                type_type: crate::Type::FixedSizeList,
797                type_: builder.finish().as_union_value(),
798                children: Some(fbb.create_vector(&[child])),
799            }
800        }
801        Struct(fields) => {
802            // struct's fields are children
803            let mut children = vec![];
804            for field in fields {
805                children.push(build_field(fbb, dictionary_tracker, field));
806            }
807            FBFieldType {
808                type_type: crate::Type::Struct_,
809                type_: crate::Struct_Builder::new(fbb).finish().as_union_value(),
810                children: Some(fbb.create_vector(&children[..])),
811            }
812        }
813        RunEndEncoded(run_ends, values) => {
814            let run_ends_field = build_field(fbb, dictionary_tracker, run_ends);
815            let values_field = build_field(fbb, dictionary_tracker, values);
816            let children = [run_ends_field, values_field];
817            FBFieldType {
818                type_type: crate::Type::RunEndEncoded,
819                type_: crate::RunEndEncodedBuilder::new(fbb)
820                    .finish()
821                    .as_union_value(),
822                children: Some(fbb.create_vector(&children[..])),
823            }
824        }
825        Map(map_field, keys_sorted) => {
826            let child = build_field(fbb, dictionary_tracker, map_field);
827            let mut field_type = crate::MapBuilder::new(fbb);
828            field_type.add_keysSorted(*keys_sorted);
829            FBFieldType {
830                type_type: crate::Type::Map,
831                type_: field_type.finish().as_union_value(),
832                children: Some(fbb.create_vector(&[child])),
833            }
834        }
835        Dictionary(_, value_type) => {
836            // In this library, the dictionary "type" is a logical construct. Here we
837            // pass through to the value type, as we've already captured the index
838            // type in the DictionaryEncoding metadata in the parent field
839            get_fb_field_type(value_type, dictionary_tracker, fbb)
840        }
841        Decimal128(precision, scale) => {
842            let mut builder = crate::DecimalBuilder::new(fbb);
843            builder.add_precision(*precision as i32);
844            builder.add_scale(*scale as i32);
845            builder.add_bitWidth(128);
846            FBFieldType {
847                type_type: crate::Type::Decimal,
848                type_: builder.finish().as_union_value(),
849                children: Some(fbb.create_vector(&empty_fields[..])),
850            }
851        }
852        Decimal256(precision, scale) => {
853            let mut builder = crate::DecimalBuilder::new(fbb);
854            builder.add_precision(*precision as i32);
855            builder.add_scale(*scale as i32);
856            builder.add_bitWidth(256);
857            FBFieldType {
858                type_type: crate::Type::Decimal,
859                type_: builder.finish().as_union_value(),
860                children: Some(fbb.create_vector(&empty_fields[..])),
861            }
862        }
863        Union(fields, mode) => {
864            let mut children = vec![];
865            for (_, field) in fields.iter() {
866                children.push(build_field(fbb, dictionary_tracker, field));
867            }
868
869            let union_mode = match mode {
870                UnionMode::Sparse => crate::UnionMode::Sparse,
871                UnionMode::Dense => crate::UnionMode::Dense,
872            };
873
874            let fbb_type_ids =
875                fbb.create_vector(&fields.iter().map(|(t, _)| t as i32).collect::<Vec<_>>());
876            let mut builder = crate::UnionBuilder::new(fbb);
877            builder.add_mode(union_mode);
878            builder.add_typeIds(fbb_type_ids);
879
880            FBFieldType {
881                type_type: crate::Type::Union,
882                type_: builder.finish().as_union_value(),
883                children: Some(fbb.create_vector(&children[..])),
884            }
885        }
886    }
887}
888
889/// Create an IPC dictionary encoding
890pub(crate) fn get_fb_dictionary<'a>(
891    index_type: &DataType,
892    dict_id: i64,
893    dict_is_ordered: bool,
894    fbb: &mut FlatBufferBuilder<'a>,
895) -> WIPOffset<crate::DictionaryEncoding<'a>> {
896    // We assume that the dictionary index type (as an integer) has already been
897    // validated elsewhere, and can safely assume we are dealing with integers
898    let mut index_builder = crate::IntBuilder::new(fbb);
899
900    match *index_type {
901        Int8 | Int16 | Int32 | Int64 => index_builder.add_is_signed(true),
902        UInt8 | UInt16 | UInt32 | UInt64 => index_builder.add_is_signed(false),
903        _ => {}
904    }
905
906    match *index_type {
907        Int8 | UInt8 => index_builder.add_bitWidth(8),
908        Int16 | UInt16 => index_builder.add_bitWidth(16),
909        Int32 | UInt32 => index_builder.add_bitWidth(32),
910        Int64 | UInt64 => index_builder.add_bitWidth(64),
911        _ => {}
912    }
913
914    let index_builder = index_builder.finish();
915
916    let mut builder = crate::DictionaryEncodingBuilder::new(fbb);
917    builder.add_id(dict_id);
918    builder.add_indexType(index_builder);
919    builder.add_isOrdered(dict_is_ordered);
920
921    builder.finish()
922}
923
/// An owned container for a validated [`Message`]
///
/// Safely decoding a flatbuffer requires validating the various embedded offsets,
/// see [`Verifier`]. This is a potentially expensive operation, and it is therefore desirable
/// to only do this once. [`crate::root_as_message`] performs this validation on construction,
/// however, it returns a [`Message`] borrowing the provided byte slice. This prevents
/// storing this [`Message`] in the same data structure that owns the buffer, as this
/// would require self-referential borrows.
///
/// [`MessageBuffer`] solves this problem by providing a safe API for a [`Message`]
/// without a lifetime bound.
///
/// The wrapped [`Buffer`] is a private field: [`MessageBuffer::try_new`] runs the
/// verifier before wrapping it, and [`MessageBuffer::as_ref`] relies on that check
/// to decode the message without verifying it again.
#[derive(Clone)]
pub struct MessageBuffer(Buffer);
937
938impl Debug for MessageBuffer {
939    fn fmt(&self, f: &mut Formatter<'_>) -> std::fmt::Result {
940        self.as_ref().fmt(f)
941    }
942}
943
944impl MessageBuffer {
945    /// Try to create a [`MessageBuffer`] from the provided [`Buffer`]
946    pub fn try_new(buf: Buffer) -> Result<Self, ArrowError> {
947        let opts = VerifierOptions::default();
948        let mut v = Verifier::new(&opts, &buf);
949        <ForwardsUOffset<Message>>::run_verifier(&mut v, 0).map_err(|err| {
950            ArrowError::ParseError(format!("Unable to get root as message: {err:?}"))
951        })?;
952        Ok(Self(buf))
953    }
954
955    /// Return the [`Message`]
956    #[inline]
957    pub fn as_ref(&self) -> Message<'_> {
958        // SAFETY: Run verifier on construction
959        unsafe { crate::root_as_message_unchecked(&self.0) }
960    }
961}
962
#[cfg(test)]
mod tests {
    use super::*;

    /// Encodes a schema covering many data types to flatbuffers, decodes it again and
    /// asserts that the decoded schema equals the original.
    #[test]
    fn convert_schema_round_trip() {
        // Schema-level and field-level custom metadata take part in the equality check
        let md = HashMap::from([("Key".to_string(), "value".to_string())]);
        let field_md = HashMap::from([("k".to_string(), "v".to_string())]);
        let schema = Schema::new_with_metadata(
            vec![
                Field::new("uint8", DataType::UInt8, false).with_metadata(field_md),
                Field::new("uint16", DataType::UInt16, true),
                Field::new("uint32", DataType::UInt32, false),
                Field::new("uint64", DataType::UInt64, true),
                Field::new("int8", DataType::Int8, true),
                Field::new("int16", DataType::Int16, false),
                Field::new("int32", DataType::Int32, true),
                Field::new("int64", DataType::Int64, false),
                Field::new("float16", DataType::Float16, true),
                Field::new("float32", DataType::Float32, false),
                Field::new("float64", DataType::Float64, true),
                Field::new("null", DataType::Null, false),
                Field::new("bool", DataType::Boolean, false),
                Field::new("date32", DataType::Date32, false),
                Field::new("date64", DataType::Date64, true),
                Field::new("time32[s]", DataType::Time32(TimeUnit::Second), true),
                Field::new("time32[ms]", DataType::Time32(TimeUnit::Millisecond), false),
                Field::new("time64[us]", DataType::Time64(TimeUnit::Microsecond), false),
                Field::new("time64[ns]", DataType::Time64(TimeUnit::Nanosecond), true),
                Field::new(
                    "timestamp[s]",
                    DataType::Timestamp(TimeUnit::Second, None),
                    false,
                ),
                Field::new(
                    "timestamp[ms]",
                    DataType::Timestamp(TimeUnit::Millisecond, None),
                    true,
                ),
                Field::new(
                    "timestamp[us]",
                    DataType::Timestamp(TimeUnit::Microsecond, Some("Africa/Johannesburg".into())),
                    false,
                ),
                Field::new(
                    "timestamp[ns]",
                    DataType::Timestamp(TimeUnit::Nanosecond, None),
                    true,
                ),
                Field::new(
                    "interval[ym]",
                    DataType::Interval(IntervalUnit::YearMonth),
                    true,
                ),
                Field::new(
                    "interval[dt]",
                    DataType::Interval(IntervalUnit::DayTime),
                    true,
                ),
                Field::new(
                    "interval[mdn]",
                    DataType::Interval(IntervalUnit::MonthDayNano),
                    true,
                ),
                Field::new("utf8", DataType::Utf8, false),
                Field::new("utf8_view", DataType::Utf8View, false),
                Field::new("binary", DataType::Binary, false),
                Field::new("binary_view", DataType::BinaryView, false),
                Field::new_list(
                    "list[u8]",
                    Field::new_list_field(DataType::UInt8, false),
                    true,
                ),
                Field::new_fixed_size_list(
                    "fixed_size_list[u8]",
                    Field::new_list_field(DataType::UInt8, false),
                    2,
                    true,
                ),
                Field::new_list(
                    "list[struct<float32, int32, bool>]",
                    Field::new_struct(
                        "struct",
                        vec![
                            // The declared type matches the field name and the
                            // enclosing list's name
                            Field::new("float32", Float32, false),
                            Field::new("int32", Int32, true),
                            Field::new("bool", Boolean, true),
                        ],
                        true,
                    ),
                    false,
                ),
                Field::new_struct(
                    "struct<dictionary<int32, utf8>>",
                    vec![Field::new(
                        "dictionary<int32, utf8>",
                        Dictionary(Box::new(DataType::Int32), Box::new(DataType::Utf8)),
                        false,
                    )],
                    false,
                ),
                Field::new_struct(
                    "struct<int64, list[struct<date32, list[struct<>]>]>",
                    vec![
                        Field::new("int64", DataType::Int64, true),
                        Field::new_list(
                            "list[struct<date32, list[struct<>]>]",
                            Field::new_struct(
                                "struct",
                                vec![
                                    Field::new("date32", DataType::Date32, true),
                                    Field::new_list(
                                        "list[struct<>]",
                                        Field::new(
                                            "struct",
                                            DataType::Struct(Fields::empty()),
                                            false,
                                        ),
                                        false,
                                    ),
                                ],
                                false,
                            ),
                            false,
                        ),
                    ],
                    false,
                ),
                Field::new_union(
                    "union<int64, list[union<date32, list[union<>]>]>",
                    vec![0, 1],
                    vec![
                        Field::new("int64", DataType::Int64, true),
                        Field::new_list(
                            "list[union<date32, list[union<>]>]",
                            Field::new_union(
                                "union<date32, list[union<>]>",
                                vec![0, 1],
                                vec![
                                    Field::new("date32", DataType::Date32, true),
                                    Field::new_list(
                                        "list[union<>]",
                                        Field::new(
                                            "union",
                                            DataType::Union(
                                                UnionFields::empty(),
                                                UnionMode::Sparse,
                                            ),
                                            false,
                                        ),
                                        false,
                                    ),
                                ],
                                UnionMode::Dense,
                            ),
                            false,
                        ),
                    ],
                    UnionMode::Sparse,
                ),
                Field::new("struct<>", DataType::Struct(Fields::empty()), true),
                Field::new(
                    "union<>",
                    DataType::Union(UnionFields::empty(), UnionMode::Dense),
                    true,
                ),
                Field::new(
                    "union<>",
                    DataType::Union(UnionFields::empty(), UnionMode::Sparse),
                    true,
                ),
                Field::new(
                    "union<int32, utf8>",
                    DataType::Union(
                        UnionFields::new(
                            vec![2, 3], // non-default type ids
                            vec![
                                Field::new("int32", DataType::Int32, true),
                                Field::new("utf8", DataType::Utf8, true),
                            ],
                        ),
                        UnionMode::Dense,
                    ),
                    true,
                ),
                #[allow(deprecated)]
                Field::new_dict(
                    "dictionary<int32, utf8>",
                    DataType::Dictionary(Box::new(DataType::Int32), Box::new(DataType::Utf8)),
                    true,
                    123,
                    true,
                ),
                #[allow(deprecated)]
                Field::new_dict(
                    "dictionary<uint8, uint32>",
                    DataType::Dictionary(Box::new(DataType::UInt8), Box::new(DataType::UInt32)),
                    true,
                    123,
                    true,
                ),
                Field::new("decimal<usize, usize>", DataType::Decimal128(10, 6), false),
            ],
            md,
        );

        let mut dictionary_tracker = DictionaryTracker::new(true);
        let fb = IpcSchemaEncoder::new()
            .with_dictionary_tracker(&mut dictionary_tracker)
            .schema_to_fb(&schema);

        // read back fields
        let ipc = crate::root_as_schema(fb.finished_data()).unwrap();
        let schema2 = fb_to_schema(ipc);
        assert_eq!(schema, schema2);
    }

    /// Decodes a schema message captured from pyarrow and compares it, property by
    /// property, with the message this crate generates for the same schema.
    #[test]
    fn schema_from_bytes() {
        // Bytes of a schema generated via following python code, using pyarrow 10.0.1:
        //
        // import pyarrow as pa
        // schema = pa.schema([pa.field('field1', pa.uint32(), nullable=False)])
        // sink = pa.BufferOutputStream()
        // with pa.ipc.new_stream(sink, schema) as writer:
        //     pass
        // # stripping continuation & length prefix & suffix bytes to get only schema bytes
        // [x for x in sink.getvalue().to_pybytes()][8:-8]
        let bytes: Vec<u8> = vec![
            16, 0, 0, 0, 0, 0, 10, 0, 12, 0, 6, 0, 5, 0, 8, 0, 10, 0, 0, 0, 0, 1, 4, 0, 12, 0, 0,
            0, 8, 0, 8, 0, 0, 0, 4, 0, 8, 0, 0, 0, 4, 0, 0, 0, 1, 0, 0, 0, 20, 0, 0, 0, 16, 0, 20,
            0, 8, 0, 0, 0, 7, 0, 12, 0, 0, 0, 16, 0, 16, 0, 0, 0, 0, 0, 0, 2, 16, 0, 0, 0, 32, 0,
            0, 0, 4, 0, 0, 0, 0, 0, 0, 0, 6, 0, 0, 0, 102, 105, 101, 108, 100, 49, 0, 0, 0, 0, 6,
            0, 8, 0, 4, 0, 6, 0, 0, 0, 32, 0, 0, 0,
        ];
        let ipc = crate::root_as_message(&bytes).unwrap();
        let schema = ipc.header_as_schema().unwrap();

        // generate same message with Rust
        let data_gen = crate::writer::IpcDataGenerator::default();
        let mut dictionary_tracker = DictionaryTracker::new(true);
        let arrow_schema = Schema::new(vec![Field::new("field1", DataType::UInt32, false)]);
        let bytes = data_gen
            .schema_to_bytes_with_dictionary_tracker(
                &arrow_schema,
                &mut dictionary_tracker,
                &crate::writer::IpcWriteOptions::default(),
            )
            .ipc_message;

        let ipc2 = crate::root_as_message(&bytes).unwrap();
        let schema2 = ipc2.header_as_schema().unwrap();

        // can't compare schema directly as it compares the underlying bytes, which can differ
        assert!(schema.custom_metadata().is_none());
        assert!(schema2.custom_metadata().is_none());
        assert_eq!(schema.endianness(), schema2.endianness());
        assert!(schema.features().is_none());
        assert!(schema2.features().is_none());
        assert_eq!(fb_to_schema(schema), fb_to_schema(schema2));

        // The message envelopes must agree as well
        assert_eq!(ipc.version(), ipc2.version());
        assert_eq!(ipc.header_type(), ipc2.header_type());
        assert_eq!(ipc.bodyLength(), ipc2.bodyLength());
        assert!(ipc.custom_metadata().is_none());
        assert!(ipc2.custom_metadata().is_none());
    }
}