arrow_ipc/
convert.rs

1// Licensed to the Apache Software Foundation (ASF) under one
2// or more contributor license agreements.  See the NOTICE file
3// distributed with this work for additional information
4// regarding copyright ownership.  The ASF licenses this file
5// to you under the Apache License, Version 2.0 (the
6// "License"); you may not use this file except in compliance
7// with the License.  You may obtain a copy of the License at
8//
9//   http://www.apache.org/licenses/LICENSE-2.0
10//
11// Unless required by applicable law or agreed to in writing,
12// software distributed under the License is distributed on an
13// "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
14// KIND, either express or implied.  See the License for the
15// specific language governing permissions and limitations
16// under the License.
17
18//! Utilities for converting between IPC types and native Arrow types
19
20use arrow_buffer::Buffer;
21use arrow_schema::*;
22use flatbuffers::{
23    FlatBufferBuilder, ForwardsUOffset, UnionWIPOffset, Vector, Verifiable, Verifier,
24    VerifierOptions, WIPOffset,
25};
26use std::collections::HashMap;
27use std::fmt::{Debug, Formatter};
28use std::sync::Arc;
29
30use crate::writer::DictionaryTracker;
31use crate::{KeyValue, Message, CONTINUATION_MARKER};
32use DataType::*;
33
34/// Low level Arrow [Schema] to IPC bytes converter
35///
36/// See also [`fb_to_schema`] for the reverse operation
37///
38/// # Example
39/// ```
40/// # use arrow_ipc::convert::{fb_to_schema, IpcSchemaEncoder};
41/// # use arrow_ipc::root_as_schema;
42/// # use arrow_ipc::writer::DictionaryTracker;
43/// # use arrow_schema::{DataType, Field, Schema};
44/// // given an arrow schema to serialize
45/// let schema = Schema::new(vec![
46///    Field::new("a", DataType::Int32, false),
47/// ]);
48///
49/// // Use a dictionary tracker to track dictionary id if needed
50///  let mut dictionary_tracker = DictionaryTracker::new(true);
51/// // create a FlatBuffersBuilder that contains the encoded bytes
52///  let fb = IpcSchemaEncoder::new()
53///    .with_dictionary_tracker(&mut dictionary_tracker)
54///    .schema_to_fb(&schema);
55///
56/// // the bytes are in `fb.finished_data()`
57/// let ipc_bytes = fb.finished_data();
58///
59///  // convert the IPC bytes back to an Arrow schema
60///  let ipc_schema = root_as_schema(ipc_bytes).unwrap();
61///  let schema2 = fb_to_schema(ipc_schema);
62/// assert_eq!(schema, schema2);
63/// ```
64#[derive(Debug)]
65pub struct IpcSchemaEncoder<'a> {
66    dictionary_tracker: Option<&'a mut DictionaryTracker>,
67}
68
69impl Default for IpcSchemaEncoder<'_> {
70    fn default() -> Self {
71        Self::new()
72    }
73}
74
75impl<'a> IpcSchemaEncoder<'a> {
76    /// Create a new schema encoder
77    pub fn new() -> IpcSchemaEncoder<'a> {
78        IpcSchemaEncoder {
79            dictionary_tracker: None,
80        }
81    }
82
83    /// Specify a dictionary tracker to use
84    pub fn with_dictionary_tracker(
85        mut self,
86        dictionary_tracker: &'a mut DictionaryTracker,
87    ) -> Self {
88        self.dictionary_tracker = Some(dictionary_tracker);
89        self
90    }
91
92    /// Serialize a schema in IPC format, returning a completed [`FlatBufferBuilder`]
93    ///
94    /// Note: Call [`FlatBufferBuilder::finished_data`] to get the serialized bytes
95    pub fn schema_to_fb<'b>(&mut self, schema: &Schema) -> FlatBufferBuilder<'b> {
96        let mut fbb = FlatBufferBuilder::new();
97
98        let root = self.schema_to_fb_offset(&mut fbb, schema);
99
100        fbb.finish(root, None);
101
102        fbb
103    }
104
105    /// Serialize a schema to an in progress [`FlatBufferBuilder`], returning the in progress offset.
106    pub fn schema_to_fb_offset<'b>(
107        &mut self,
108        fbb: &mut FlatBufferBuilder<'b>,
109        schema: &Schema,
110    ) -> WIPOffset<crate::Schema<'b>> {
111        let fields = schema
112            .fields()
113            .iter()
114            .map(|field| build_field(fbb, &mut self.dictionary_tracker, field))
115            .collect::<Vec<_>>();
116        let fb_field_list = fbb.create_vector(&fields);
117
118        let fb_metadata_list =
119            (!schema.metadata().is_empty()).then(|| metadata_to_fb(fbb, schema.metadata()));
120
121        let mut builder = crate::SchemaBuilder::new(fbb);
122        builder.add_fields(fb_field_list);
123        if let Some(fb_metadata_list) = fb_metadata_list {
124            builder.add_custom_metadata(fb_metadata_list);
125        }
126        builder.finish()
127    }
128}
129
130/// Serialize a schema in IPC format
131#[deprecated(since = "54.0.0", note = "Use `IpcSchemaConverter`.")]
132pub fn schema_to_fb(schema: &Schema) -> FlatBufferBuilder<'_> {
133    IpcSchemaEncoder::new().schema_to_fb(schema)
134}
135
136/// Push a key-value metadata into a FlatBufferBuilder and return [WIPOffset]
137pub fn metadata_to_fb<'a>(
138    fbb: &mut FlatBufferBuilder<'a>,
139    metadata: &HashMap<String, String>,
140) -> WIPOffset<Vector<'a, ForwardsUOffset<KeyValue<'a>>>> {
141    let mut ordered_keys = metadata.keys().collect::<Vec<_>>();
142    ordered_keys.sort();
143    let custom_metadata = ordered_keys
144        .into_iter()
145        .map(|k| {
146            let v = metadata.get(k).unwrap();
147            let fb_key_name = fbb.create_string(k);
148            let fb_val_name = fbb.create_string(v);
149
150            let mut kv_builder = crate::KeyValueBuilder::new(fbb);
151            kv_builder.add_key(fb_key_name);
152            kv_builder.add_value(fb_val_name);
153            kv_builder.finish()
154        })
155        .collect::<Vec<_>>();
156    fbb.create_vector(&custom_metadata)
157}
158
159/// Adds a [Schema] to a flatbuffer and returns the offset
160pub fn schema_to_fb_offset<'a>(
161    fbb: &mut FlatBufferBuilder<'a>,
162    schema: &Schema,
163) -> WIPOffset<crate::Schema<'a>> {
164    IpcSchemaEncoder::new().schema_to_fb_offset(fbb, schema)
165}
166
167/// Convert an IPC Field to Arrow Field
168impl From<crate::Field<'_>> for Field {
169    fn from(field: crate::Field) -> Field {
170        let arrow_field = if let Some(dictionary) = field.dictionary() {
171            #[allow(deprecated)]
172            Field::new_dict(
173                field.name().unwrap(),
174                get_data_type(field, true),
175                field.nullable(),
176                dictionary.id(),
177                dictionary.isOrdered(),
178            )
179        } else {
180            Field::new(
181                field.name().unwrap(),
182                get_data_type(field, true),
183                field.nullable(),
184            )
185        };
186
187        let mut metadata_map = HashMap::default();
188        if let Some(list) = field.custom_metadata() {
189            for kv in list {
190                if let (Some(k), Some(v)) = (kv.key(), kv.value()) {
191                    metadata_map.insert(k.to_string(), v.to_string());
192                }
193            }
194        }
195
196        arrow_field.with_metadata(metadata_map)
197    }
198}
199
200/// Deserialize an ipc [crate::Schema`] from flat buffers to an arrow [Schema].
201pub fn fb_to_schema(fb: crate::Schema) -> Schema {
202    let mut fields: Vec<Field> = vec![];
203    let c_fields = fb.fields().unwrap();
204    let len = c_fields.len();
205    for i in 0..len {
206        let c_field: crate::Field = c_fields.get(i);
207        match c_field.type_type() {
208            crate::Type::Decimal if fb.endianness() == crate::Endianness::Big => {
209                unimplemented!("Big Endian is not supported for Decimal!")
210            }
211            _ => (),
212        };
213        fields.push(c_field.into());
214    }
215
216    let mut metadata: HashMap<String, String> = HashMap::default();
217    if let Some(md_fields) = fb.custom_metadata() {
218        let len = md_fields.len();
219        for i in 0..len {
220            let kv = md_fields.get(i);
221            let k_str = kv.key();
222            let v_str = kv.value();
223            if let Some(k) = k_str {
224                if let Some(v) = v_str {
225                    metadata.insert(k.to_string(), v.to_string());
226                }
227            }
228        }
229    }
230    Schema::new_with_metadata(fields, metadata)
231}
232
233/// Try deserialize flat buffer format bytes into a schema
234pub fn try_schema_from_flatbuffer_bytes(bytes: &[u8]) -> Result<Schema, ArrowError> {
235    if let Ok(ipc) = crate::root_as_message(bytes) {
236        if let Some(schema) = ipc.header_as_schema().map(fb_to_schema) {
237            Ok(schema)
238        } else {
239            Err(ArrowError::ParseError(
240                "Unable to get head as schema".to_string(),
241            ))
242        }
243    } else {
244        Err(ArrowError::ParseError(
245            "Unable to get root as message".to_string(),
246        ))
247    }
248}
249
250/// Try deserialize the IPC format bytes into a schema
251pub fn try_schema_from_ipc_buffer(buffer: &[u8]) -> Result<Schema, ArrowError> {
252    // There are two protocol types: https://issues.apache.org/jira/browse/ARROW-6313
253    // The original protocol is:
254    //   4 bytes - the byte length of the payload
255    //   a flatbuffer Message whose header is the Schema
256    // The latest version of protocol is:
257    // The schema of the dataset in its IPC form:
258    //   4 bytes - an optional IPC_CONTINUATION_TOKEN prefix
259    //   4 bytes - the byte length of the payload
260    //   a flatbuffer Message whose header is the Schema
261    if buffer.len() < 4 {
262        return Err(ArrowError::ParseError(
263            "The buffer length is less than 4 and missing the continuation marker or length of buffer".to_string()
264        ));
265    }
266
267    let (len, buffer) = if buffer[..4] == CONTINUATION_MARKER {
268        if buffer.len() < 8 {
269            return Err(ArrowError::ParseError(
270                "The buffer length is less than 8 and missing the length of buffer".to_string(),
271            ));
272        }
273        buffer[4..].split_at(4)
274    } else {
275        buffer.split_at(4)
276    };
277
278    let len = <i32>::from_le_bytes(len.try_into().unwrap());
279    if len < 0 {
280        return Err(ArrowError::ParseError(format!(
281            "The encapsulated message's reported length is negative ({len})"
282        )));
283    }
284
285    if buffer.len() < len as usize {
286        let actual_len = buffer.len();
287        return Err(ArrowError::ParseError(
288            format!("The buffer length ({actual_len}) is less than the encapsulated message's reported length ({len})")
289        ));
290    }
291
292    let msg = crate::root_as_message(buffer)
293        .map_err(|err| ArrowError::ParseError(format!("Unable to get root as message: {err:?}")))?;
294    let ipc_schema = msg.header_as_schema().ok_or_else(|| {
295        ArrowError::ParseError("Unable to convert flight info to a schema".to_string())
296    })?;
297    Ok(fb_to_schema(ipc_schema))
298}
299
300/// Get the Arrow data type from the flatbuffer Field table
301pub(crate) fn get_data_type(field: crate::Field, may_be_dictionary: bool) -> DataType {
302    if let Some(dictionary) = field.dictionary() {
303        if may_be_dictionary {
304            let int = dictionary.indexType().unwrap();
305            let index_type = match (int.bitWidth(), int.is_signed()) {
306                (8, true) => DataType::Int8,
307                (8, false) => DataType::UInt8,
308                (16, true) => DataType::Int16,
309                (16, false) => DataType::UInt16,
310                (32, true) => DataType::Int32,
311                (32, false) => DataType::UInt32,
312                (64, true) => DataType::Int64,
313                (64, false) => DataType::UInt64,
314                _ => panic!("Unexpected bitwidth and signed"),
315            };
316            return DataType::Dictionary(
317                Box::new(index_type),
318                Box::new(get_data_type(field, false)),
319            );
320        }
321    }
322
323    match field.type_type() {
324        crate::Type::Null => DataType::Null,
325        crate::Type::Bool => DataType::Boolean,
326        crate::Type::Int => {
327            let int = field.type_as_int().unwrap();
328            match (int.bitWidth(), int.is_signed()) {
329                (8, true) => DataType::Int8,
330                (8, false) => DataType::UInt8,
331                (16, true) => DataType::Int16,
332                (16, false) => DataType::UInt16,
333                (32, true) => DataType::Int32,
334                (32, false) => DataType::UInt32,
335                (64, true) => DataType::Int64,
336                (64, false) => DataType::UInt64,
337                z => panic!(
338                    "Int type with bit width of {} and signed of {} not supported",
339                    z.0, z.1
340                ),
341            }
342        }
343        crate::Type::Binary => DataType::Binary,
344        crate::Type::BinaryView => DataType::BinaryView,
345        crate::Type::LargeBinary => DataType::LargeBinary,
346        crate::Type::Utf8 => DataType::Utf8,
347        crate::Type::Utf8View => DataType::Utf8View,
348        crate::Type::LargeUtf8 => DataType::LargeUtf8,
349        crate::Type::FixedSizeBinary => {
350            let fsb = field.type_as_fixed_size_binary().unwrap();
351            DataType::FixedSizeBinary(fsb.byteWidth())
352        }
353        crate::Type::FloatingPoint => {
354            let float = field.type_as_floating_point().unwrap();
355            match float.precision() {
356                crate::Precision::HALF => DataType::Float16,
357                crate::Precision::SINGLE => DataType::Float32,
358                crate::Precision::DOUBLE => DataType::Float64,
359                z => panic!("FloatingPoint type with precision of {z:?} not supported"),
360            }
361        }
362        crate::Type::Date => {
363            let date = field.type_as_date().unwrap();
364            match date.unit() {
365                crate::DateUnit::DAY => DataType::Date32,
366                crate::DateUnit::MILLISECOND => DataType::Date64,
367                z => panic!("Date type with unit of {z:?} not supported"),
368            }
369        }
370        crate::Type::Time => {
371            let time = field.type_as_time().unwrap();
372            match (time.bitWidth(), time.unit()) {
373                (32, crate::TimeUnit::SECOND) => DataType::Time32(TimeUnit::Second),
374                (32, crate::TimeUnit::MILLISECOND) => DataType::Time32(TimeUnit::Millisecond),
375                (64, crate::TimeUnit::MICROSECOND) => DataType::Time64(TimeUnit::Microsecond),
376                (64, crate::TimeUnit::NANOSECOND) => DataType::Time64(TimeUnit::Nanosecond),
377                z => panic!(
378                    "Time type with bit width of {} and unit of {:?} not supported",
379                    z.0, z.1
380                ),
381            }
382        }
383        crate::Type::Timestamp => {
384            let timestamp = field.type_as_timestamp().unwrap();
385            let timezone: Option<_> = timestamp.timezone().map(|tz| tz.into());
386            match timestamp.unit() {
387                crate::TimeUnit::SECOND => DataType::Timestamp(TimeUnit::Second, timezone),
388                crate::TimeUnit::MILLISECOND => {
389                    DataType::Timestamp(TimeUnit::Millisecond, timezone)
390                }
391                crate::TimeUnit::MICROSECOND => {
392                    DataType::Timestamp(TimeUnit::Microsecond, timezone)
393                }
394                crate::TimeUnit::NANOSECOND => DataType::Timestamp(TimeUnit::Nanosecond, timezone),
395                z => panic!("Timestamp type with unit of {z:?} not supported"),
396            }
397        }
398        crate::Type::Interval => {
399            let interval = field.type_as_interval().unwrap();
400            match interval.unit() {
401                crate::IntervalUnit::YEAR_MONTH => DataType::Interval(IntervalUnit::YearMonth),
402                crate::IntervalUnit::DAY_TIME => DataType::Interval(IntervalUnit::DayTime),
403                crate::IntervalUnit::MONTH_DAY_NANO => {
404                    DataType::Interval(IntervalUnit::MonthDayNano)
405                }
406                z => panic!("Interval type with unit of {z:?} unsupported"),
407            }
408        }
409        crate::Type::Duration => {
410            let duration = field.type_as_duration().unwrap();
411            match duration.unit() {
412                crate::TimeUnit::SECOND => DataType::Duration(TimeUnit::Second),
413                crate::TimeUnit::MILLISECOND => DataType::Duration(TimeUnit::Millisecond),
414                crate::TimeUnit::MICROSECOND => DataType::Duration(TimeUnit::Microsecond),
415                crate::TimeUnit::NANOSECOND => DataType::Duration(TimeUnit::Nanosecond),
416                z => panic!("Duration type with unit of {z:?} unsupported"),
417            }
418        }
419        crate::Type::List => {
420            let children = field.children().unwrap();
421            if children.len() != 1 {
422                panic!("expect a list to have one child")
423            }
424            DataType::List(Arc::new(children.get(0).into()))
425        }
426        crate::Type::LargeList => {
427            let children = field.children().unwrap();
428            if children.len() != 1 {
429                panic!("expect a large list to have one child")
430            }
431            DataType::LargeList(Arc::new(children.get(0).into()))
432        }
433        crate::Type::FixedSizeList => {
434            let children = field.children().unwrap();
435            if children.len() != 1 {
436                panic!("expect a list to have one child")
437            }
438            let fsl = field.type_as_fixed_size_list().unwrap();
439            DataType::FixedSizeList(Arc::new(children.get(0).into()), fsl.listSize())
440        }
441        crate::Type::Struct_ => {
442            let fields = match field.children() {
443                Some(children) => children.iter().map(Field::from).collect(),
444                None => Fields::empty(),
445            };
446            DataType::Struct(fields)
447        }
448        crate::Type::RunEndEncoded => {
449            let children = field.children().unwrap();
450            if children.len() != 2 {
451                panic!(
452                    "RunEndEncoded type should have exactly two children. Found {}",
453                    children.len()
454                )
455            }
456            let run_ends_field = children.get(0).into();
457            let values_field = children.get(1).into();
458            DataType::RunEndEncoded(Arc::new(run_ends_field), Arc::new(values_field))
459        }
460        crate::Type::Map => {
461            let map = field.type_as_map().unwrap();
462            let children = field.children().unwrap();
463            if children.len() != 1 {
464                panic!("expect a map to have one child")
465            }
466            DataType::Map(Arc::new(children.get(0).into()), map.keysSorted())
467        }
468        crate::Type::Decimal => {
469            let fsb = field.type_as_decimal().unwrap();
470            let bit_width = fsb.bitWidth();
471            let precision: u8 = fsb.precision().try_into().unwrap();
472            let scale: i8 = fsb.scale().try_into().unwrap();
473            match bit_width {
474                128 => DataType::Decimal128(precision, scale),
475                256 => DataType::Decimal256(precision, scale),
476                _ => panic!("Unexpected decimal bit width {bit_width}"),
477            }
478        }
479        crate::Type::Union => {
480            let union = field.type_as_union().unwrap();
481
482            let union_mode = match union.mode() {
483                crate::UnionMode::Dense => UnionMode::Dense,
484                crate::UnionMode::Sparse => UnionMode::Sparse,
485                mode => panic!("Unexpected union mode: {mode:?}"),
486            };
487
488            let mut fields = vec![];
489            if let Some(children) = field.children() {
490                for i in 0..children.len() {
491                    fields.push(Field::from(children.get(i)));
492                }
493            };
494
495            let fields = match union.typeIds() {
496                None => UnionFields::new(0_i8..fields.len() as i8, fields),
497                Some(ids) => UnionFields::new(ids.iter().map(|i| i as i8), fields),
498            };
499
500            DataType::Union(fields, union_mode)
501        }
502        t => unimplemented!("Type {:?} not supported", t),
503    }
504}
505
506pub(crate) struct FBFieldType<'b> {
507    pub(crate) type_type: crate::Type,
508    pub(crate) type_: WIPOffset<UnionWIPOffset>,
509    pub(crate) children: Option<WIPOffset<Vector<'b, ForwardsUOffset<crate::Field<'b>>>>>,
510}
511
512/// Create an IPC Field from an Arrow Field
513pub(crate) fn build_field<'a>(
514    fbb: &mut FlatBufferBuilder<'a>,
515    dictionary_tracker: &mut Option<&mut DictionaryTracker>,
516    field: &Field,
517) -> WIPOffset<crate::Field<'a>> {
518    // Optional custom metadata.
519    let mut fb_metadata = None;
520    if !field.metadata().is_empty() {
521        fb_metadata = Some(metadata_to_fb(fbb, field.metadata()));
522    };
523
524    let fb_field_name = fbb.create_string(field.name().as_str());
525    let field_type = get_fb_field_type(field.data_type(), dictionary_tracker, fbb);
526
527    let fb_dictionary = if let Dictionary(index_type, _) = field.data_type() {
528        match dictionary_tracker {
529            Some(tracker) => Some(get_fb_dictionary(
530                index_type,
531                #[allow(deprecated)]
532                tracker.set_dict_id(field),
533                field
534                    .dict_is_ordered()
535                    .expect("All Dictionary types have `dict_is_ordered`"),
536                fbb,
537            )),
538            None => Some(get_fb_dictionary(
539                index_type,
540                #[allow(deprecated)]
541                field
542                    .dict_id()
543                    .expect("Dictionary type must have a dictionary id"),
544                field
545                    .dict_is_ordered()
546                    .expect("All Dictionary types have `dict_is_ordered`"),
547                fbb,
548            )),
549        }
550    } else {
551        None
552    };
553
554    let mut field_builder = crate::FieldBuilder::new(fbb);
555    field_builder.add_name(fb_field_name);
556    if let Some(dictionary) = fb_dictionary {
557        field_builder.add_dictionary(dictionary)
558    }
559    field_builder.add_type_type(field_type.type_type);
560    field_builder.add_nullable(field.is_nullable());
561    match field_type.children {
562        None => {}
563        Some(children) => field_builder.add_children(children),
564    };
565    field_builder.add_type_(field_type.type_);
566
567    if let Some(fb_metadata) = fb_metadata {
568        field_builder.add_custom_metadata(fb_metadata);
569    }
570
571    field_builder.finish()
572}
573
574/// Get the IPC type of a data type
575pub(crate) fn get_fb_field_type<'a>(
576    data_type: &DataType,
577    dictionary_tracker: &mut Option<&mut DictionaryTracker>,
578    fbb: &mut FlatBufferBuilder<'a>,
579) -> FBFieldType<'a> {
580    // some IPC implementations expect an empty list for child data, instead of a null value.
581    // An empty field list is thus returned for primitive types
582    let empty_fields: Vec<WIPOffset<crate::Field>> = vec![];
583    match data_type {
584        Null => FBFieldType {
585            type_type: crate::Type::Null,
586            type_: crate::NullBuilder::new(fbb).finish().as_union_value(),
587            children: Some(fbb.create_vector(&empty_fields[..])),
588        },
589        Boolean => FBFieldType {
590            type_type: crate::Type::Bool,
591            type_: crate::BoolBuilder::new(fbb).finish().as_union_value(),
592            children: Some(fbb.create_vector(&empty_fields[..])),
593        },
594        UInt8 | UInt16 | UInt32 | UInt64 => {
595            let children = fbb.create_vector(&empty_fields[..]);
596            let mut builder = crate::IntBuilder::new(fbb);
597            builder.add_is_signed(false);
598            match data_type {
599                UInt8 => builder.add_bitWidth(8),
600                UInt16 => builder.add_bitWidth(16),
601                UInt32 => builder.add_bitWidth(32),
602                UInt64 => builder.add_bitWidth(64),
603                _ => {}
604            };
605            FBFieldType {
606                type_type: crate::Type::Int,
607                type_: builder.finish().as_union_value(),
608                children: Some(children),
609            }
610        }
611        Int8 | Int16 | Int32 | Int64 => {
612            let children = fbb.create_vector(&empty_fields[..]);
613            let mut builder = crate::IntBuilder::new(fbb);
614            builder.add_is_signed(true);
615            match data_type {
616                Int8 => builder.add_bitWidth(8),
617                Int16 => builder.add_bitWidth(16),
618                Int32 => builder.add_bitWidth(32),
619                Int64 => builder.add_bitWidth(64),
620                _ => {}
621            };
622            FBFieldType {
623                type_type: crate::Type::Int,
624                type_: builder.finish().as_union_value(),
625                children: Some(children),
626            }
627        }
628        Float16 | Float32 | Float64 => {
629            let children = fbb.create_vector(&empty_fields[..]);
630            let mut builder = crate::FloatingPointBuilder::new(fbb);
631            match data_type {
632                Float16 => builder.add_precision(crate::Precision::HALF),
633                Float32 => builder.add_precision(crate::Precision::SINGLE),
634                Float64 => builder.add_precision(crate::Precision::DOUBLE),
635                _ => {}
636            };
637            FBFieldType {
638                type_type: crate::Type::FloatingPoint,
639                type_: builder.finish().as_union_value(),
640                children: Some(children),
641            }
642        }
643        Binary => FBFieldType {
644            type_type: crate::Type::Binary,
645            type_: crate::BinaryBuilder::new(fbb).finish().as_union_value(),
646            children: Some(fbb.create_vector(&empty_fields[..])),
647        },
648        LargeBinary => FBFieldType {
649            type_type: crate::Type::LargeBinary,
650            type_: crate::LargeBinaryBuilder::new(fbb)
651                .finish()
652                .as_union_value(),
653            children: Some(fbb.create_vector(&empty_fields[..])),
654        },
655        BinaryView => FBFieldType {
656            type_type: crate::Type::BinaryView,
657            type_: crate::BinaryViewBuilder::new(fbb).finish().as_union_value(),
658            children: Some(fbb.create_vector(&empty_fields[..])),
659        },
660        Utf8View => FBFieldType {
661            type_type: crate::Type::Utf8View,
662            type_: crate::Utf8ViewBuilder::new(fbb).finish().as_union_value(),
663            children: Some(fbb.create_vector(&empty_fields[..])),
664        },
665        Utf8 => FBFieldType {
666            type_type: crate::Type::Utf8,
667            type_: crate::Utf8Builder::new(fbb).finish().as_union_value(),
668            children: Some(fbb.create_vector(&empty_fields[..])),
669        },
670        LargeUtf8 => FBFieldType {
671            type_type: crate::Type::LargeUtf8,
672            type_: crate::LargeUtf8Builder::new(fbb).finish().as_union_value(),
673            children: Some(fbb.create_vector(&empty_fields[..])),
674        },
675        FixedSizeBinary(len) => {
676            let mut builder = crate::FixedSizeBinaryBuilder::new(fbb);
677            builder.add_byteWidth(*len);
678            FBFieldType {
679                type_type: crate::Type::FixedSizeBinary,
680                type_: builder.finish().as_union_value(),
681                children: Some(fbb.create_vector(&empty_fields[..])),
682            }
683        }
684        Date32 => {
685            let mut builder = crate::DateBuilder::new(fbb);
686            builder.add_unit(crate::DateUnit::DAY);
687            FBFieldType {
688                type_type: crate::Type::Date,
689                type_: builder.finish().as_union_value(),
690                children: Some(fbb.create_vector(&empty_fields[..])),
691            }
692        }
693        Date64 => {
694            let mut builder = crate::DateBuilder::new(fbb);
695            builder.add_unit(crate::DateUnit::MILLISECOND);
696            FBFieldType {
697                type_type: crate::Type::Date,
698                type_: builder.finish().as_union_value(),
699                children: Some(fbb.create_vector(&empty_fields[..])),
700            }
701        }
702        Time32(unit) | Time64(unit) => {
703            let mut builder = crate::TimeBuilder::new(fbb);
704            match unit {
705                TimeUnit::Second => {
706                    builder.add_bitWidth(32);
707                    builder.add_unit(crate::TimeUnit::SECOND);
708                }
709                TimeUnit::Millisecond => {
710                    builder.add_bitWidth(32);
711                    builder.add_unit(crate::TimeUnit::MILLISECOND);
712                }
713                TimeUnit::Microsecond => {
714                    builder.add_bitWidth(64);
715                    builder.add_unit(crate::TimeUnit::MICROSECOND);
716                }
717                TimeUnit::Nanosecond => {
718                    builder.add_bitWidth(64);
719                    builder.add_unit(crate::TimeUnit::NANOSECOND);
720                }
721            }
722            FBFieldType {
723                type_type: crate::Type::Time,
724                type_: builder.finish().as_union_value(),
725                children: Some(fbb.create_vector(&empty_fields[..])),
726            }
727        }
728        Timestamp(unit, tz) => {
729            let tz = tz.as_deref().unwrap_or_default();
730            let tz_str = fbb.create_string(tz);
731            let mut builder = crate::TimestampBuilder::new(fbb);
732            let time_unit = match unit {
733                TimeUnit::Second => crate::TimeUnit::SECOND,
734                TimeUnit::Millisecond => crate::TimeUnit::MILLISECOND,
735                TimeUnit::Microsecond => crate::TimeUnit::MICROSECOND,
736                TimeUnit::Nanosecond => crate::TimeUnit::NANOSECOND,
737            };
738            builder.add_unit(time_unit);
739            if !tz.is_empty() {
740                builder.add_timezone(tz_str);
741            }
742            FBFieldType {
743                type_type: crate::Type::Timestamp,
744                type_: builder.finish().as_union_value(),
745                children: Some(fbb.create_vector(&empty_fields[..])),
746            }
747        }
748        Interval(unit) => {
749            let mut builder = crate::IntervalBuilder::new(fbb);
750            let interval_unit = match unit {
751                IntervalUnit::YearMonth => crate::IntervalUnit::YEAR_MONTH,
752                IntervalUnit::DayTime => crate::IntervalUnit::DAY_TIME,
753                IntervalUnit::MonthDayNano => crate::IntervalUnit::MONTH_DAY_NANO,
754            };
755            builder.add_unit(interval_unit);
756            FBFieldType {
757                type_type: crate::Type::Interval,
758                type_: builder.finish().as_union_value(),
759                children: Some(fbb.create_vector(&empty_fields[..])),
760            }
761        }
762        Duration(unit) => {
763            let mut builder = crate::DurationBuilder::new(fbb);
764            let time_unit = match unit {
765                TimeUnit::Second => crate::TimeUnit::SECOND,
766                TimeUnit::Millisecond => crate::TimeUnit::MILLISECOND,
767                TimeUnit::Microsecond => crate::TimeUnit::MICROSECOND,
768                TimeUnit::Nanosecond => crate::TimeUnit::NANOSECOND,
769            };
770            builder.add_unit(time_unit);
771            FBFieldType {
772                type_type: crate::Type::Duration,
773                type_: builder.finish().as_union_value(),
774                children: Some(fbb.create_vector(&empty_fields[..])),
775            }
776        }
777        List(ref list_type) => {
778            let child = build_field(fbb, dictionary_tracker, list_type);
779            FBFieldType {
780                type_type: crate::Type::List,
781                type_: crate::ListBuilder::new(fbb).finish().as_union_value(),
782                children: Some(fbb.create_vector(&[child])),
783            }
784        }
785        ListView(_) | LargeListView(_) => unimplemented!("ListView/LargeListView not implemented"),
786        LargeList(ref list_type) => {
787            let child = build_field(fbb, dictionary_tracker, list_type);
788            FBFieldType {
789                type_type: crate::Type::LargeList,
790                type_: crate::LargeListBuilder::new(fbb).finish().as_union_value(),
791                children: Some(fbb.create_vector(&[child])),
792            }
793        }
794        FixedSizeList(ref list_type, len) => {
795            let child = build_field(fbb, dictionary_tracker, list_type);
796            let mut builder = crate::FixedSizeListBuilder::new(fbb);
797            builder.add_listSize(*len);
798            FBFieldType {
799                type_type: crate::Type::FixedSizeList,
800                type_: builder.finish().as_union_value(),
801                children: Some(fbb.create_vector(&[child])),
802            }
803        }
804        Struct(fields) => {
805            // struct's fields are children
806            let mut children = vec![];
807            for field in fields {
808                children.push(build_field(fbb, dictionary_tracker, field));
809            }
810            FBFieldType {
811                type_type: crate::Type::Struct_,
812                type_: crate::Struct_Builder::new(fbb).finish().as_union_value(),
813                children: Some(fbb.create_vector(&children[..])),
814            }
815        }
816        RunEndEncoded(run_ends, values) => {
817            let run_ends_field = build_field(fbb, dictionary_tracker, run_ends);
818            let values_field = build_field(fbb, dictionary_tracker, values);
819            let children = [run_ends_field, values_field];
820            FBFieldType {
821                type_type: crate::Type::RunEndEncoded,
822                type_: crate::RunEndEncodedBuilder::new(fbb)
823                    .finish()
824                    .as_union_value(),
825                children: Some(fbb.create_vector(&children[..])),
826            }
827        }
828        Map(map_field, keys_sorted) => {
829            let child = build_field(fbb, dictionary_tracker, map_field);
830            let mut field_type = crate::MapBuilder::new(fbb);
831            field_type.add_keysSorted(*keys_sorted);
832            FBFieldType {
833                type_type: crate::Type::Map,
834                type_: field_type.finish().as_union_value(),
835                children: Some(fbb.create_vector(&[child])),
836            }
837        }
838        Dictionary(_, value_type) => {
839            // In this library, the dictionary "type" is a logical construct. Here we
840            // pass through to the value type, as we've already captured the index
841            // type in the DictionaryEncoding metadata in the parent field
842            get_fb_field_type(value_type, dictionary_tracker, fbb)
843        }
844        Decimal128(precision, scale) => {
845            let mut builder = crate::DecimalBuilder::new(fbb);
846            builder.add_precision(*precision as i32);
847            builder.add_scale(*scale as i32);
848            builder.add_bitWidth(128);
849            FBFieldType {
850                type_type: crate::Type::Decimal,
851                type_: builder.finish().as_union_value(),
852                children: Some(fbb.create_vector(&empty_fields[..])),
853            }
854        }
855        Decimal256(precision, scale) => {
856            let mut builder = crate::DecimalBuilder::new(fbb);
857            builder.add_precision(*precision as i32);
858            builder.add_scale(*scale as i32);
859            builder.add_bitWidth(256);
860            FBFieldType {
861                type_type: crate::Type::Decimal,
862                type_: builder.finish().as_union_value(),
863                children: Some(fbb.create_vector(&empty_fields[..])),
864            }
865        }
866        Union(fields, mode) => {
867            let mut children = vec![];
868            for (_, field) in fields.iter() {
869                children.push(build_field(fbb, dictionary_tracker, field));
870            }
871
872            let union_mode = match mode {
873                UnionMode::Sparse => crate::UnionMode::Sparse,
874                UnionMode::Dense => crate::UnionMode::Dense,
875            };
876
877            let fbb_type_ids =
878                fbb.create_vector(&fields.iter().map(|(t, _)| t as i32).collect::<Vec<_>>());
879            let mut builder = crate::UnionBuilder::new(fbb);
880            builder.add_mode(union_mode);
881            builder.add_typeIds(fbb_type_ids);
882
883            FBFieldType {
884                type_type: crate::Type::Union,
885                type_: builder.finish().as_union_value(),
886                children: Some(fbb.create_vector(&children[..])),
887            }
888        }
889    }
890}
891
892/// Create an IPC dictionary encoding
893pub(crate) fn get_fb_dictionary<'a>(
894    index_type: &DataType,
895    dict_id: i64,
896    dict_is_ordered: bool,
897    fbb: &mut FlatBufferBuilder<'a>,
898) -> WIPOffset<crate::DictionaryEncoding<'a>> {
899    // We assume that the dictionary index type (as an integer) has already been
900    // validated elsewhere, and can safely assume we are dealing with integers
901    let mut index_builder = crate::IntBuilder::new(fbb);
902
903    match *index_type {
904        Int8 | Int16 | Int32 | Int64 => index_builder.add_is_signed(true),
905        UInt8 | UInt16 | UInt32 | UInt64 => index_builder.add_is_signed(false),
906        _ => {}
907    }
908
909    match *index_type {
910        Int8 | UInt8 => index_builder.add_bitWidth(8),
911        Int16 | UInt16 => index_builder.add_bitWidth(16),
912        Int32 | UInt32 => index_builder.add_bitWidth(32),
913        Int64 | UInt64 => index_builder.add_bitWidth(64),
914        _ => {}
915    }
916
917    let index_builder = index_builder.finish();
918
919    let mut builder = crate::DictionaryEncodingBuilder::new(fbb);
920    builder.add_id(dict_id);
921    builder.add_indexType(index_builder);
922    builder.add_isOrdered(dict_is_ordered);
923
924    builder.finish()
925}
926
927/// An owned container for a validated [`Message`]
928///
929/// Safely decoding a flatbuffer requires validating the various embedded offsets,
930/// see [`Verifier`]. This is a potentially expensive operation, and it is therefore desirable
931/// to only do this once. [`crate::root_as_message`] performs this validation on construction,
932/// however, it returns a [`Message`] borrowing the provided byte slice. This prevents
933/// storing this [`Message`] in the same data structure that owns the buffer, as this
934/// would require self-referential borrows.
935///
936/// [`MessageBuffer`] solves this problem by providing a safe API for a [`Message`]
937/// without a lifetime bound.
938#[derive(Clone)]
939pub struct MessageBuffer(Buffer);
940
941impl Debug for MessageBuffer {
942    fn fmt(&self, f: &mut Formatter<'_>) -> std::fmt::Result {
943        self.as_ref().fmt(f)
944    }
945}
946
947impl MessageBuffer {
948    /// Try to create a [`MessageBuffer`] from the provided [`Buffer`]
949    pub fn try_new(buf: Buffer) -> Result<Self, ArrowError> {
950        let opts = VerifierOptions::default();
951        let mut v = Verifier::new(&opts, &buf);
952        <ForwardsUOffset<Message>>::run_verifier(&mut v, 0).map_err(|err| {
953            ArrowError::ParseError(format!("Unable to get root as message: {err:?}"))
954        })?;
955        Ok(Self(buf))
956    }
957
958    /// Return the [`Message`]
959    #[inline]
960    pub fn as_ref(&self) -> Message<'_> {
961        // SAFETY: Run verifier on construction
962        unsafe { crate::root_as_message_unchecked(&self.0) }
963    }
964}
965
#[cfg(test)]
mod tests {
    use super::*;

    /// Encodes a schema covering a wide range of data types (primitive, temporal,
    /// nested, union and dictionary fields, plus schema- and field-level metadata)
    /// to flatbuffers and checks that decoding it yields an identical [`Schema`].
    #[test]
    fn convert_schema_round_trip() {
        let md = HashMap::from([("Key".to_string(), "value".to_string())]);
        let field_md = HashMap::from([("k".to_string(), "v".to_string())]);
        let schema = Schema::new_with_metadata(
            vec![
                Field::new("uint8", DataType::UInt8, false).with_metadata(field_md),
                Field::new("uint16", DataType::UInt16, true),
                Field::new("uint32", DataType::UInt32, false),
                Field::new("uint64", DataType::UInt64, true),
                Field::new("int8", DataType::Int8, true),
                Field::new("int16", DataType::Int16, false),
                Field::new("int32", DataType::Int32, true),
                Field::new("int64", DataType::Int64, false),
                Field::new("float16", DataType::Float16, true),
                Field::new("float32", DataType::Float32, false),
                Field::new("float64", DataType::Float64, true),
                Field::new("null", DataType::Null, false),
                Field::new("bool", DataType::Boolean, false),
                Field::new("date32", DataType::Date32, false),
                Field::new("date64", DataType::Date64, true),
                Field::new("time32[s]", DataType::Time32(TimeUnit::Second), true),
                Field::new("time32[ms]", DataType::Time32(TimeUnit::Millisecond), false),
                Field::new("time64[us]", DataType::Time64(TimeUnit::Microsecond), false),
                Field::new("time64[ns]", DataType::Time64(TimeUnit::Nanosecond), true),
                Field::new(
                    "timestamp[s]",
                    DataType::Timestamp(TimeUnit::Second, None),
                    false,
                ),
                Field::new(
                    "timestamp[ms]",
                    DataType::Timestamp(TimeUnit::Millisecond, None),
                    true,
                ),
                Field::new(
                    "timestamp[us]",
                    DataType::Timestamp(TimeUnit::Microsecond, Some("Africa/Johannesburg".into())),
                    false,
                ),
                Field::new(
                    "timestamp[ns]",
                    DataType::Timestamp(TimeUnit::Nanosecond, None),
                    true,
                ),
                Field::new(
                    "interval[ym]",
                    DataType::Interval(IntervalUnit::YearMonth),
                    true,
                ),
                Field::new(
                    "interval[dt]",
                    DataType::Interval(IntervalUnit::DayTime),
                    true,
                ),
                Field::new(
                    "interval[mdn]",
                    DataType::Interval(IntervalUnit::MonthDayNano),
                    true,
                ),
                // Exercise every `Duration` time unit.
                Field::new("duration[s]", DataType::Duration(TimeUnit::Second), false),
                Field::new("duration[ms]", DataType::Duration(TimeUnit::Millisecond), true),
                Field::new("duration[us]", DataType::Duration(TimeUnit::Microsecond), false),
                Field::new("duration[ns]", DataType::Duration(TimeUnit::Nanosecond), true),
                Field::new("utf8", DataType::Utf8, false),
                Field::new("utf8_view", DataType::Utf8View, false),
                Field::new("binary", DataType::Binary, false),
                Field::new("binary_view", DataType::BinaryView, false),
                Field::new_list(
                    "list[u8]",
                    Field::new_list_field(DataType::UInt8, false),
                    true,
                ),
                Field::new(
                    "large_list[u8]",
                    DataType::LargeList(Arc::new(Field::new_list_field(DataType::UInt8, false))),
                    true,
                ),
                Field::new_fixed_size_list(
                    "fixed_size_list[u8]",
                    Field::new_list_field(DataType::UInt8, false),
                    2,
                    true,
                ),
                Field::new_list(
                    "list[struct<float32, int32, bool>]",
                    Field::new_struct(
                        "struct",
                        vec![
                            // Type now matches the field name (was UInt8).
                            Field::new("float32", Float32, false),
                            Field::new("int32", Int32, true),
                            Field::new("bool", Boolean, true),
                        ],
                        true,
                    ),
                    false,
                ),
                Field::new_struct(
                    "struct<dictionary<int32, utf8>>",
                    vec![Field::new(
                        "dictionary<int32, utf8>",
                        Dictionary(Box::new(DataType::Int32), Box::new(DataType::Utf8)),
                        false,
                    )],
                    false,
                ),
                Field::new_struct(
                    "struct<int64, list[struct<date32, list[struct<>]>]>",
                    vec![
                        Field::new("int64", DataType::Int64, true),
                        Field::new_list(
                            "list[struct<date32, list[struct<>]>]",
                            Field::new_struct(
                                "struct",
                                vec![
                                    Field::new("date32", DataType::Date32, true),
                                    Field::new_list(
                                        "list[struct<>]",
                                        Field::new(
                                            "struct",
                                            DataType::Struct(Fields::empty()),
                                            false,
                                        ),
                                        false,
                                    ),
                                ],
                                false,
                            ),
                            false,
                        ),
                    ],
                    false,
                ),
                Field::new_union(
                    "union<int64, list[union<date32, list[union<>]>]>",
                    vec![0, 1],
                    vec![
                        Field::new("int64", DataType::Int64, true),
                        Field::new_list(
                            "list[union<date32, list[union<>]>]",
                            Field::new_union(
                                "union<date32, list[union<>]>",
                                vec![0, 1],
                                vec![
                                    Field::new("date32", DataType::Date32, true),
                                    Field::new_list(
                                        "list[union<>]",
                                        Field::new(
                                            "union",
                                            DataType::Union(
                                                UnionFields::empty(),
                                                UnionMode::Sparse,
                                            ),
                                            false,
                                        ),
                                        false,
                                    ),
                                ],
                                UnionMode::Dense,
                            ),
                            false,
                        ),
                    ],
                    UnionMode::Sparse,
                ),
                Field::new("struct<>", DataType::Struct(Fields::empty()), true),
                Field::new(
                    "union<>",
                    DataType::Union(UnionFields::empty(), UnionMode::Dense),
                    true,
                ),
                Field::new(
                    "union<>",
                    DataType::Union(UnionFields::empty(), UnionMode::Sparse),
                    true,
                ),
                Field::new(
                    "union<int32, utf8>",
                    DataType::Union(
                        UnionFields::new(
                            vec![2, 3], // non-default type ids
                            vec![
                                Field::new("int32", DataType::Int32, true),
                                Field::new("utf8", DataType::Utf8, true),
                            ],
                        ),
                        UnionMode::Dense,
                    ),
                    true,
                ),
                #[allow(deprecated)]
                Field::new_dict(
                    "dictionary<int32, utf8>",
                    DataType::Dictionary(Box::new(DataType::Int32), Box::new(DataType::Utf8)),
                    true,
                    123,
                    true,
                ),
                #[allow(deprecated)]
                Field::new_dict(
                    "dictionary<uint8, uint32>",
                    DataType::Dictionary(Box::new(DataType::UInt8), Box::new(DataType::UInt32)),
                    true,
                    123,
                    true,
                ),
                Field::new("decimal<usize, usize>", DataType::Decimal128(10, 6), false),
                // 256-bit decimals share the IPC `Decimal` type, distinguished by bitWidth.
                Field::new("decimal256", DataType::Decimal256(40, 5), true),
            ],
            md,
        );

        let mut dictionary_tracker = DictionaryTracker::new(true);
        let fb = IpcSchemaEncoder::new()
            .with_dictionary_tracker(&mut dictionary_tracker)
            .schema_to_fb(&schema);

        // read back fields
        let ipc = crate::root_as_schema(fb.finished_data()).unwrap();
        let schema2 = fb_to_schema(ipc);
        assert_eq!(schema, schema2);
    }

    /// Checks that a schema message written by pyarrow decodes to the same
    /// schema, and carries the same message header, as one generated by this crate.
    #[test]
    fn schema_from_bytes() {
        // Bytes of a schema generated via following python code, using pyarrow 10.0.1:
        //
        // import pyarrow as pa
        // schema = pa.schema([pa.field('field1', pa.uint32(), nullable=False)])
        // sink = pa.BufferOutputStream()
        // with pa.ipc.new_stream(sink, schema) as writer:
        //     pass
        // # stripping continuation & length prefix & suffix bytes to get only schema bytes
        // [x for x in sink.getvalue().to_pybytes()][8:-8]
        let bytes: Vec<u8> = vec![
            16, 0, 0, 0, 0, 0, 10, 0, 12, 0, 6, 0, 5, 0, 8, 0, 10, 0, 0, 0, 0, 1, 4, 0, 12, 0, 0,
            0, 8, 0, 8, 0, 0, 0, 4, 0, 8, 0, 0, 0, 4, 0, 0, 0, 1, 0, 0, 0, 20, 0, 0, 0, 16, 0, 20,
            0, 8, 0, 0, 0, 7, 0, 12, 0, 0, 0, 16, 0, 16, 0, 0, 0, 0, 0, 0, 2, 16, 0, 0, 0, 32, 0,
            0, 0, 4, 0, 0, 0, 0, 0, 0, 0, 6, 0, 0, 0, 102, 105, 101, 108, 100, 49, 0, 0, 0, 0, 6,
            0, 8, 0, 4, 0, 6, 0, 0, 0, 32, 0, 0, 0,
        ];
        let ipc = crate::root_as_message(&bytes).unwrap();
        let schema = ipc.header_as_schema().unwrap();

        // generate same message with Rust
        let data_gen = crate::writer::IpcDataGenerator::default();
        let mut dictionary_tracker = DictionaryTracker::new(true);
        let arrow_schema = Schema::new(vec![Field::new("field1", DataType::UInt32, false)]);
        let bytes = data_gen
            .schema_to_bytes_with_dictionary_tracker(
                &arrow_schema,
                &mut dictionary_tracker,
                &crate::writer::IpcWriteOptions::default(),
            )
            .ipc_message;

        let ipc2 = crate::root_as_message(&bytes).unwrap();
        let schema2 = ipc2.header_as_schema().unwrap();

        // can't compare schema directly as it compares the underlying bytes, which can differ
        assert!(schema.custom_metadata().is_none());
        assert!(schema2.custom_metadata().is_none());
        assert_eq!(schema.endianness(), schema2.endianness());
        assert!(schema.features().is_none());
        assert!(schema2.features().is_none());
        assert_eq!(fb_to_schema(schema), fb_to_schema(schema2));

        assert_eq!(ipc.version(), ipc2.version());
        assert_eq!(ipc.header_type(), ipc2.header_type());
        assert_eq!(ipc.bodyLength(), ipc2.bodyLength());
        assert!(ipc.custom_metadata().is_none());
        assert!(ipc2.custom_metadata().is_none());
    }
}