// Licensed to the Apache Software Foundation (ASF) under one
// or more contributor license agreements.  See the NOTICE file
// distributed with this work for additional information
// regarding copyright ownership.  The ASF licenses this file
// to you under the Apache License, Version 2.0 (the
// "License"); you may not use this file except in compliance
// with the License.  You may obtain a copy of the License at
//
//   http://www.apache.org/licenses/LICENSE-2.0
//
// Unless required by applicable law or agreed to in writing,
// software distributed under the License is distributed on an
// "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
// KIND, either express or implied.  See the License for the
// specific language governing permissions and limitations
// under the License.

//! Arrow IPC File and Stream Readers
//!
//! # Notes
//!
//! The [`FileReader`] and [`StreamReader`] have similar interfaces;
//! however, the [`FileReader`] expects a reader that supports [`Seek`]ing.
//!
//! [`Seek`]: std::io::Seek
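//!
//! # Example
//!
//! A minimal round-trip sketch: write a batch with a `FileWriter`, then read it
//! back with a [`FileReader`] (error handling elided via `unwrap`):
//!
//! ```
//! # use std::io::Cursor;
//! # use arrow_array::record_batch;
//! # use arrow_ipc::reader::FileReader;
//! # use arrow_ipc::writer::FileWriter;
//! // Write a single batch to an in-memory "file"
//! let batch = record_batch!(("a", Int32, [1, 2, 3])).unwrap();
//! let mut file = vec![];
//! let mut writer = FileWriter::try_new(&mut file, &batch.schema()).unwrap();
//! writer.write(&batch).unwrap();
//! writer.finish().unwrap();
//! drop(writer);
//!
//! // Read it back; `Cursor` provides the required `Seek` implementation
//! let mut reader = FileReader::try_new(Cursor::new(file), None).unwrap();
//! let back = reader.next().unwrap().unwrap();
//! assert_eq!(batch, back);
//! ```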

mod stream;
pub use stream::*;

use arrow_select::concat;

use flatbuffers::{VectorIter, VerifierOptions};
use std::collections::{HashMap, VecDeque};
use std::fmt;
use std::io::{BufReader, Read, Seek, SeekFrom};
use std::sync::Arc;

use arrow_array::*;
use arrow_buffer::{
    ArrowNativeType, BooleanBuffer, Buffer, MutableBuffer, NullBuffer, ScalarBuffer,
};
use arrow_data::{ArrayData, ArrayDataBuilder, UnsafeFlag};
use arrow_schema::*;

use crate::compression::CompressionCodec;
use crate::r#gen::Message::{self};
use crate::{Block, CONTINUATION_MARKER, FieldNode, MetadataVersion};
use DataType::*;

/// Read a buffer based on offset and length
/// From <https://github.com/apache/arrow/blob/6a936c4ff5007045e86f65f1a6b6c3c955ad5103/format/Message.fbs#L58>
/// Each constituent buffer is first compressed with the indicated
/// compressor, and then written with the uncompressed length in the first 8
/// bytes as a 64-bit little-endian signed integer followed by the compressed
/// buffer bytes (and then padding as required by the protocol). The
/// uncompressed length may be set to -1 to indicate that the data that
/// follows is not compressed, which can be useful for cases where
/// compression does not yield appreciable savings.
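///
/// For example, a minimal sketch of interpreting the 8-byte length prefix
/// described above (hypothetical bytes, not a real IPC buffer):
///
/// ```
/// // An all-ones prefix reads as -1 as a little-endian i64,
/// // meaning the bytes that follow are not compressed
/// let body = [0xffu8; 16];
/// let prefix = i64::from_le_bytes(body[..8].try_into().unwrap());
/// assert_eq!(prefix, -1);
/// let is_compressed = prefix != -1;
/// assert!(!is_compressed);
/// ```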
fn read_buffer(
    buf: &crate::Buffer,
    a_data: &Buffer,
    compression_codec: Option<CompressionCodec>,
) -> Result<Buffer, ArrowError> {
    let start_offset = buf.offset() as usize;
    let buf_data = a_data.slice_with_length(start_offset, buf.length() as usize);
    // corner case: empty buffer
    match (buf_data.is_empty(), compression_codec) {
        (true, _) | (_, None) => Ok(buf_data),
        (false, Some(decompressor)) => decompressor.decompress_to_buffer(&buf_data),
    }
}

impl RecordBatchDecoder<'_> {
    /// Coordinates reading arrays based on data types.
    ///
    /// `variadic_counts` encodes the number of buffers to read for variadic types
    /// (e.g., Utf8View, BinaryView). When we encounter such a type, we pop from the
    /// front of the queue to get the number of buffers to read.
    ///
    /// Notes:
    /// * In the IPC format, null buffers are always set, but may be empty. We discard them if an array has 0 nulls
    /// * Numeric values inside list arrays are often stored as 64-bit values regardless of their data type size.
    ///   We thus:
    ///     - check if the bit width of non-64-bit numbers is 64, and
    ///     - read the buffer as 64-bit (signed integer or float), and
    ///     - cast the 64-bit array to the appropriate data type
    fn create_array(
        &mut self,
        field: &Field,
        variadic_counts: &mut VecDeque<i64>,
    ) -> Result<ArrayRef, ArrowError> {
        let data_type = field.data_type();
        match data_type {
            Utf8 | Binary | LargeBinary | LargeUtf8 => {
                let field_node = self.next_node(field)?;
                let buffers = [
                    self.next_buffer()?,
                    self.next_buffer()?,
                    self.next_buffer()?,
                ];
                self.create_primitive_array(field_node, data_type, &buffers)
            }
            BinaryView | Utf8View => {
                let count = variadic_counts
                    .pop_front()
                    .ok_or(ArrowError::IpcError(format!(
                        "Missing variadic count for {data_type} column"
                    )))?;
                let count = count + 2; // plus the views buffer and null buffer
                let buffers = (0..count)
                    .map(|_| self.next_buffer())
                    .collect::<Result<Vec<_>, _>>()?;
                let field_node = self.next_node(field)?;
                self.create_primitive_array(field_node, data_type, &buffers)
            }
            FixedSizeBinary(_) => {
                let field_node = self.next_node(field)?;
                let buffers = [self.next_buffer()?, self.next_buffer()?];
                self.create_primitive_array(field_node, data_type, &buffers)
            }
            List(list_field) | LargeList(list_field) | Map(list_field, _) => {
                let list_node = self.next_node(field)?;
                let list_buffers = [self.next_buffer()?, self.next_buffer()?];
                let values = self.create_array(list_field, variadic_counts)?;
                self.create_list_array(list_node, data_type, &list_buffers, values)
            }
            FixedSizeList(list_field, _) => {
                let list_node = self.next_node(field)?;
                let list_buffers = [self.next_buffer()?];
                let values = self.create_array(list_field, variadic_counts)?;
                self.create_list_array(list_node, data_type, &list_buffers, values)
            }
            Struct(struct_fields) => {
                let struct_node = self.next_node(field)?;
                let null_buffer = self.next_buffer()?;

                // read the arrays for each field
                let mut struct_arrays = vec![];
                // TODO investigate whether just knowing the number of buffers could
                // still work
                for struct_field in struct_fields {
                    let child = self.create_array(struct_field, variadic_counts)?;
                    struct_arrays.push(child);
                }
                self.create_struct_array(struct_node, null_buffer, struct_fields, struct_arrays)
            }
            RunEndEncoded(run_ends_field, values_field) => {
                let run_node = self.next_node(field)?;
                let run_ends = self.create_array(run_ends_field, variadic_counts)?;
                let values = self.create_array(values_field, variadic_counts)?;

                let run_array_length = run_node.length() as usize;
                let builder = ArrayData::builder(data_type.clone())
                    .len(run_array_length)
                    .offset(0)
                    .add_child_data(run_ends.into_data())
                    .add_child_data(values.into_data())
                    .null_count(run_node.null_count() as usize);

                self.create_array_from_builder(builder)
            }
            // Create dictionary array from RecordBatch
            Dictionary(_, _) => {
                let index_node = self.next_node(field)?;
                let index_buffers = [self.next_buffer()?, self.next_buffer()?];

                #[allow(deprecated)]
                let dict_id = field.dict_id().ok_or_else(|| {
                    ArrowError::ParseError(format!("Field {field} does not have dict id"))
                })?;

                let value_array = self.dictionaries_by_id.get(&dict_id).ok_or_else(|| {
                    ArrowError::ParseError(format!(
                        "Cannot find a dictionary batch with dict id: {dict_id}"
                    ))
                })?;

                self.create_dictionary_array(
                    index_node,
                    data_type,
                    &index_buffers,
                    value_array.clone(),
                )
            }
            Union(fields, mode) => {
                let union_node = self.next_node(field)?;
                let len = union_node.length() as usize;

                // In V4, union types have a validity bitmap.
                // In V5 and later, union types have no validity bitmap.
                if self.version < MetadataVersion::V5 {
                    self.next_buffer()?;
                }

                let type_ids: ScalarBuffer<i8> =
                    self.next_buffer()?.slice_with_length(0, len).into();

                let value_offsets = match mode {
                    UnionMode::Dense => {
                        let offsets: ScalarBuffer<i32> =
                            self.next_buffer()?.slice_with_length(0, len * 4).into();
                        Some(offsets)
                    }
                    UnionMode::Sparse => None,
                };

                let mut children = Vec::with_capacity(fields.len());

                for (_id, field) in fields.iter() {
                    let child = self.create_array(field, variadic_counts)?;
                    children.push(child);
                }

                let array = if self.skip_validation.get() {
                    // safety: flag can only be set via unsafe code
                    unsafe {
                        UnionArray::new_unchecked(fields.clone(), type_ids, value_offsets, children)
                    }
                } else {
                    UnionArray::try_new(fields.clone(), type_ids, value_offsets, children)?
                };
                Ok(Arc::new(array))
            }
            Null => {
                let node = self.next_node(field)?;
                let length = node.length();
                let null_count = node.null_count();

                if length != null_count {
                    return Err(ArrowError::SchemaError(format!(
                        "Field {field} of NullArray has unequal null_count {null_count} and len {length}"
                    )));
                }

                let builder = ArrayData::builder(data_type.clone())
                    .len(length as usize)
                    .offset(0);
                self.create_array_from_builder(builder)
            }
            _ => {
                let field_node = self.next_node(field)?;
                let buffers = [self.next_buffer()?, self.next_buffer()?];
                self.create_primitive_array(field_node, data_type, &buffers)
            }
        }
    }

    /// Reads the correct number of buffers based on data type and null_count, and creates a
    /// primitive array ref
    fn create_primitive_array(
        &self,
        field_node: &FieldNode,
        data_type: &DataType,
        buffers: &[Buffer],
    ) -> Result<ArrayRef, ArrowError> {
        let length = field_node.length() as usize;
        let null_buffer = (field_node.null_count() > 0).then_some(buffers[0].clone());
        let mut builder = match data_type {
            Utf8 | Binary | LargeBinary | LargeUtf8 => {
                // read 3 buffers: null buffer (optional), offsets buffer and data buffer
                ArrayData::builder(data_type.clone())
                    .len(length)
                    .buffers(buffers[1..3].to_vec())
                    .null_bit_buffer(null_buffer)
            }
            BinaryView | Utf8View => ArrayData::builder(data_type.clone())
                .len(length)
                .buffers(buffers[1..].to_vec())
                .null_bit_buffer(null_buffer),
            _ if data_type.is_primitive() || matches!(data_type, Boolean | FixedSizeBinary(_)) => {
                // read 2 buffers: null buffer (optional) and data buffer
                ArrayData::builder(data_type.clone())
                    .len(length)
                    .add_buffer(buffers[1].clone())
                    .null_bit_buffer(null_buffer)
            }
            t => unreachable!("Data type {:?} either unsupported or not primitive", t),
        };

        builder = builder.null_count(field_node.null_count() as usize);

        self.create_array_from_builder(builder)
    }

    /// Update the ArrayDataBuilder based on settings in this decoder
    fn create_array_from_builder(&self, builder: ArrayDataBuilder) -> Result<ArrayRef, ArrowError> {
        let mut builder = builder.align_buffers(!self.require_alignment);
        if self.skip_validation.get() {
            // SAFETY: flag can only be set via unsafe code
            unsafe { builder = builder.skip_validation(true) }
        };
        Ok(make_array(builder.build()?))
    }

    /// Reads the correct number of buffers based on list type and null_count, and creates a
    /// list array ref
    fn create_list_array(
        &self,
        field_node: &FieldNode,
        data_type: &DataType,
        buffers: &[Buffer],
        child_array: ArrayRef,
    ) -> Result<ArrayRef, ArrowError> {
        let null_buffer = (field_node.null_count() > 0).then_some(buffers[0].clone());
        let length = field_node.length() as usize;
        let child_data = child_array.into_data();
        let mut builder = match data_type {
            List(_) | LargeList(_) | Map(_, _) => ArrayData::builder(data_type.clone())
                .len(length)
                .add_buffer(buffers[1].clone())
                .add_child_data(child_data)
                .null_bit_buffer(null_buffer),

            FixedSizeList(_, _) => ArrayData::builder(data_type.clone())
                .len(length)
                .add_child_data(child_data)
                .null_bit_buffer(null_buffer),

            _ => unreachable!("Cannot create list or map array from {:?}", data_type),
        };

        builder = builder.null_count(field_node.null_count() as usize);

        self.create_array_from_builder(builder)
    }

    fn create_struct_array(
        &self,
        struct_node: &FieldNode,
        null_buffer: Buffer,
        struct_fields: &Fields,
        struct_arrays: Vec<ArrayRef>,
    ) -> Result<ArrayRef, ArrowError> {
        let null_count = struct_node.null_count() as usize;
        let len = struct_node.length() as usize;
        let skip_validation = self.skip_validation.get();

        let nulls = if null_count > 0 {
            let validity_buffer = BooleanBuffer::new(null_buffer, 0, len);
            let null_buffer = if skip_validation {
                // safety: flag can only be set via unsafe code
                unsafe { NullBuffer::new_unchecked(validity_buffer, null_count) }
            } else {
                let null_buffer = NullBuffer::new(validity_buffer);

                if null_buffer.null_count() != null_count {
                    return Err(ArrowError::InvalidArgumentError(format!(
                        "null_count value ({}) doesn't match actual number of nulls in array ({})",
                        null_count,
                        null_buffer.null_count()
                    )));
                }

                null_buffer
            };

            Some(null_buffer)
        } else {
            None
        };
        if struct_arrays.is_empty() {
            // `StructArray::from` can't infer the correct row count
            // if we have zero fields
            return Ok(Arc::new(StructArray::new_empty_fields(len, nulls)));
        }

        let struct_array = if skip_validation {
            // safety: flag can only be set via unsafe code
            unsafe { StructArray::new_unchecked(struct_fields.clone(), struct_arrays, nulls) }
        } else {
            StructArray::try_new(struct_fields.clone(), struct_arrays, nulls)?
        };

        Ok(Arc::new(struct_array))
    }

    /// Reads the correct number of buffers based on the index type and null_count,
    /// and creates a dictionary array ref
    fn create_dictionary_array(
        &self,
        field_node: &FieldNode,
        data_type: &DataType,
        buffers: &[Buffer],
        value_array: ArrayRef,
    ) -> Result<ArrayRef, ArrowError> {
        if let Dictionary(_, _) = *data_type {
            let null_buffer = (field_node.null_count() > 0).then_some(buffers[0].clone());
            let builder = ArrayData::builder(data_type.clone())
                .len(field_node.length() as usize)
                .add_buffer(buffers[1].clone())
                .add_child_data(value_array.into_data())
                .null_bit_buffer(null_buffer)
                .null_count(field_node.null_count() as usize);
            self.create_array_from_builder(builder)
        } else {
            unreachable!("Cannot create dictionary array from {:?}", data_type)
        }
    }
}

/// State for decoding Arrow arrays from an [IPC RecordBatch] structure to
/// [`RecordBatch`]
///
/// [IPC RecordBatch]: crate::RecordBatch
pub struct RecordBatchDecoder<'a> {
    /// The flatbuffers encoded record batch
    batch: crate::RecordBatch<'a>,
    /// The output schema
    schema: SchemaRef,
    /// Decoded dictionaries indexed by dictionary id
    dictionaries_by_id: &'a HashMap<i64, ArrayRef>,
    /// Optional compression codec
    compression: Option<CompressionCodec>,
    /// The format version
    version: MetadataVersion,
    /// The raw data buffer
    data: &'a Buffer,
    /// The fields comprising this array
    nodes: VectorIter<'a, FieldNode>,
    /// The buffers comprising this array
    buffers: VectorIter<'a, crate::Buffer>,
    /// Projection (subset of columns) to read, if any
    /// See [`RecordBatchDecoder::with_projection`] for details
    projection: Option<&'a [usize]>,
    /// Are buffers required to already be aligned? See
    /// [`RecordBatchDecoder::with_require_alignment`] for details
    require_alignment: bool,
    /// Should validation be skipped when reading data? Defaults to false.
    ///
    /// See [`FileDecoder::with_skip_validation`] for details.
    skip_validation: UnsafeFlag,
}

impl<'a> RecordBatchDecoder<'a> {
    /// Create a reader for decoding arrays from an encoded [`RecordBatch`]
    fn try_new(
        buf: &'a Buffer,
        batch: crate::RecordBatch<'a>,
        schema: SchemaRef,
        dictionaries_by_id: &'a HashMap<i64, ArrayRef>,
        metadata: &'a MetadataVersion,
    ) -> Result<Self, ArrowError> {
        let buffers = batch.buffers().ok_or_else(|| {
            ArrowError::IpcError("Unable to get buffers from IPC RecordBatch".to_string())
        })?;
        let field_nodes = batch.nodes().ok_or_else(|| {
            ArrowError::IpcError("Unable to get field nodes from IPC RecordBatch".to_string())
        })?;

        let batch_compression = batch.compression();
        let compression = batch_compression
            .map(|batch_compression| batch_compression.codec().try_into())
            .transpose()?;

        Ok(Self {
            batch,
            schema,
            dictionaries_by_id,
            compression,
            version: *metadata,
            data: buf,
            nodes: field_nodes.iter(),
            buffers: buffers.iter(),
            projection: None,
            require_alignment: false,
            skip_validation: UnsafeFlag::new(),
        })
    }

    /// Set the projection (default: None)
    ///
    /// If set, the projection is the list of column indices
    /// that will be read
    pub fn with_projection(mut self, projection: Option<&'a [usize]>) -> Self {
        self.projection = projection;
        self
    }

    /// Set require_alignment (default: false)
    ///
    /// If true, buffers must be aligned appropriately or an error will
    /// result. If false, buffers will be copied to aligned buffers
    /// if necessary.
    pub fn with_require_alignment(mut self, require_alignment: bool) -> Self {
        self.require_alignment = require_alignment;
        self
    }

    /// Specifies if validation should be skipped when reading data (defaults to `false`)
    ///
    /// Note this API is somewhat "funky" as it allows the caller to skip validation
    /// without having to use `unsafe` code. If this is ever made public
    /// it should be made clearer that this is potentially unsafe, by
    /// using an `unsafe` function that takes a boolean flag.
    ///
    /// # Safety
    ///
    /// Relies on the caller only passing a flag with `true` value if they are
    /// certain that the data is valid
    pub(crate) fn with_skip_validation(mut self, skip_validation: UnsafeFlag) -> Self {
        self.skip_validation = skip_validation;
        self
    }

    /// Read the record batch, consuming the reader
    fn read_record_batch(mut self) -> Result<RecordBatch, ArrowError> {
        let mut variadic_counts: VecDeque<i64> = self
            .batch
            .variadicBufferCounts()
            .into_iter()
            .flatten()
            .collect();

        let options = RecordBatchOptions::new().with_row_count(Some(self.batch.length() as usize));

        let schema = Arc::clone(&self.schema);
        if let Some(projection) = self.projection {
            let mut arrays = vec![];
            // project fields
            for (idx, field) in schema.fields().iter().enumerate() {
                // Create array for projected field
                if let Some(proj_idx) = projection.iter().position(|p| p == &idx) {
                    let child = self.create_array(field, &mut variadic_counts)?;
                    arrays.push((proj_idx, child));
                } else {
                    self.skip_field(field, &mut variadic_counts)?;
                }
            }

            arrays.sort_by_key(|t| t.0);

            let schema = Arc::new(schema.project(projection)?);
            let columns = arrays.into_iter().map(|t| t.1).collect::<Vec<_>>();

            if self.skip_validation.get() {
                // Safety: setting `skip_validation` requires `unsafe`, user assures data is valid
                unsafe {
                    Ok(RecordBatch::new_unchecked(
                        schema,
                        columns,
                        self.batch.length() as usize,
                    ))
                }
            } else {
                assert!(variadic_counts.is_empty());
                RecordBatch::try_new_with_options(schema, columns, &options)
            }
        } else {
            let mut children = vec![];
            // keep track of index as lists require more than one node
            for field in schema.fields() {
                let child = self.create_array(field, &mut variadic_counts)?;
                children.push(child);
            }

            if self.skip_validation.get() {
                // Safety: setting `skip_validation` requires `unsafe`, user assures data is valid
                unsafe {
                    Ok(RecordBatch::new_unchecked(
                        schema,
                        children,
                        self.batch.length() as usize,
                    ))
                }
            } else {
                assert!(variadic_counts.is_empty());
                RecordBatch::try_new_with_options(schema, children, &options)
            }
        }
    }

    fn next_buffer(&mut self) -> Result<Buffer, ArrowError> {
        let buffer = self.buffers.next().ok_or_else(|| {
            ArrowError::IpcError("Buffer count mismatched with metadata".to_string())
        })?;
        read_buffer(buffer, self.data, self.compression)
    }

    fn skip_buffer(&mut self) {
        self.buffers.next().unwrap();
    }

    fn next_node(&mut self, field: &Field) -> Result<&'a FieldNode, ArrowError> {
        self.nodes.next().ok_or_else(|| {
            ArrowError::SchemaError(format!(
                "Invalid data for schema. {field} refers to node not found in schema",
            ))
        })
    }

    fn skip_field(
        &mut self,
        field: &Field,
        variadic_count: &mut VecDeque<i64>,
    ) -> Result<(), ArrowError> {
        self.next_node(field)?;

        match field.data_type() {
            Utf8 | Binary | LargeBinary | LargeUtf8 => {
                for _ in 0..3 {
                    self.skip_buffer()
                }
            }
            Utf8View | BinaryView => {
                let count = variadic_count
                    .pop_front()
                    .ok_or(ArrowError::IpcError(format!(
                        "Missing variadic count for {} column",
                        field.data_type()
                    )))?;
                let count = count + 2; // plus the views buffer and null buffer
                for _i in 0..count {
                    self.skip_buffer()
                }
            }
            FixedSizeBinary(_) => {
                self.skip_buffer();
                self.skip_buffer();
            }
            List(list_field) | LargeList(list_field) | Map(list_field, _) => {
                self.skip_buffer();
                self.skip_buffer();
                self.skip_field(list_field, variadic_count)?;
            }
            FixedSizeList(list_field, _) => {
                self.skip_buffer();
                self.skip_field(list_field, variadic_count)?;
            }
            Struct(struct_fields) => {
                self.skip_buffer();

                // skip for each field
                for struct_field in struct_fields {
                    self.skip_field(struct_field, variadic_count)?
                }
            }
            RunEndEncoded(run_ends_field, values_field) => {
                self.skip_field(run_ends_field, variadic_count)?;
                self.skip_field(values_field, variadic_count)?;
            }
            Dictionary(_, _) => {
                self.skip_buffer(); // Nulls
                self.skip_buffer(); // Indices
            }
            Union(fields, mode) => {
                self.skip_buffer(); // Nulls

                match mode {
                    UnionMode::Dense => self.skip_buffer(),
                    UnionMode::Sparse => {}
                };

                for (_, field) in fields.iter() {
                    self.skip_field(field, variadic_count)?
                }
            }
            Null => {} // No buffer increases
            _ => {
                self.skip_buffer();
                self.skip_buffer();
            }
        };
        Ok(())
    }
}

/// Creates a record batch from binary data using the `crate::RecordBatch` indexes and the `Schema`.
///
/// If `require_alignment` is true, this function will return an error if any array data in the
/// input `buf` is not properly aligned.
/// Under the hood it will use [`arrow_data::ArrayDataBuilder::build`] to construct [`arrow_data::ArrayData`].
///
/// If `require_alignment` is false, this function will automatically allocate a new aligned buffer
/// and copy over the data if any array data in the input `buf` is not properly aligned.
/// (Properly aligned array data will remain zero-copy.)
/// Under the hood it will use [`arrow_data::ArrayDataBuilder::build_aligned`] to construct [`arrow_data::ArrayData`].
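///
/// # Example
///
/// A sketch of decoding the first record batch from a fully buffered IPC file,
/// assuming the default `V5` message framing (an 8-byte continuation marker
/// plus length prefix before the flatbuffer metadata):
///
/// ```
/// # use std::collections::HashMap;
/// # use std::sync::Arc;
/// # use arrow_array::record_batch;
/// # use arrow_buffer::Buffer;
/// # use arrow_ipc::convert::fb_to_schema;
/// # use arrow_ipc::reader::{read_footer_length, read_record_batch};
/// # use arrow_ipc::writer::FileWriter;
/// # use arrow_ipc::{root_as_footer, root_as_message};
/// // Write an IPC file to read back
/// let batch = record_batch!(("a", Int32, [1, 2, 3])).unwrap();
/// let mut out = vec![];
/// let mut writer = FileWriter::try_new(&mut out, &batch.schema()).unwrap();
/// writer.write(&batch).unwrap();
/// writer.finish().unwrap();
/// drop(writer);
///
/// // Locate the first record batch block via the footer
/// let buffer = Buffer::from_vec(out);
/// let trailer_start = buffer.len() - 10;
/// let footer_len = read_footer_length(buffer[trailer_start..].try_into().unwrap()).unwrap();
/// let footer = root_as_footer(&buffer[trailer_start - footer_len..trailer_start]).unwrap();
/// let schema = Arc::new(fb_to_schema(footer.schema().unwrap()));
/// let batches = footer.recordBatches().unwrap();
/// let block = batches.get(0);
/// let block_len = block.bodyLength() as usize + block.metaDataLength() as usize;
/// let data = buffer.slice_with_length(block.offset() as _, block_len);
///
/// // Skip the continuation marker and metadata length prefix (8 bytes)
/// let message = root_as_message(&data[8..]).unwrap();
/// let ipc_batch = message.header_as_record_batch().unwrap();
/// let dictionaries = HashMap::new(); // this file contains no dictionaries
/// let back = read_record_batch(
///     &data.slice(block.metaDataLength() as _),
///     ipc_batch,
///     schema,
///     &dictionaries,
///     None,
///     &message.version(),
/// )
/// .unwrap();
/// assert_eq!(batch, back);
/// ```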
pub fn read_record_batch(
    buf: &Buffer,
    batch: crate::RecordBatch,
    schema: SchemaRef,
    dictionaries_by_id: &HashMap<i64, ArrayRef>,
    projection: Option<&[usize]>,
    metadata: &MetadataVersion,
) -> Result<RecordBatch, ArrowError> {
    RecordBatchDecoder::try_new(buf, batch, schema, dictionaries_by_id, metadata)?
        .with_projection(projection)
        .with_require_alignment(false)
        .read_record_batch()
}

/// Read the dictionary from the buffer and provided metadata,
/// updating the `dictionaries_by_id` with the resulting dictionary
pub fn read_dictionary(
    buf: &Buffer,
    batch: crate::DictionaryBatch,
    schema: &Schema,
    dictionaries_by_id: &mut HashMap<i64, ArrayRef>,
    metadata: &MetadataVersion,
) -> Result<(), ArrowError> {
    read_dictionary_impl(
        buf,
        batch,
        schema,
        dictionaries_by_id,
        metadata,
        false,
        UnsafeFlag::new(),
    )
}

fn read_dictionary_impl(
    buf: &Buffer,
    batch: crate::DictionaryBatch,
    schema: &Schema,
    dictionaries_by_id: &mut HashMap<i64, ArrayRef>,
    metadata: &MetadataVersion,
    require_alignment: bool,
    skip_validation: UnsafeFlag,
) -> Result<(), ArrowError> {
    let id = batch.id();

    let dictionary_values = get_dictionary_values(
        buf,
        batch,
        schema,
        dictionaries_by_id,
        metadata,
        require_alignment,
        skip_validation,
    )?;

    update_dictionaries(dictionaries_by_id, batch.isDelta(), id, dictionary_values)?;

    Ok(())
}

/// Updates the `dictionaries_by_id` with the provided dictionary values and id.
///
/// # Errors
/// - If `is_delta` is true and there is no existing dictionary for the given
///   `dict_id`
/// - If `is_delta` is true and the concatenation of the existing and new
///   dictionary fails. This usually signals a type mismatch between the old and
///   new values.
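///
/// A minimal sketch of the delta semantics (using `concat` directly, as this
/// function does internally):
///
/// ```
/// # use std::sync::Arc;
/// # use arrow_array::{Array, ArrayRef, StringArray};
/// # use arrow_select::concat::concat;
/// // existing dictionary ["a", "b"] plus delta ["c"] yields ["a", "b", "c"]
/// let existing: ArrayRef = Arc::new(StringArray::from(vec!["a", "b"]));
/// let delta: ArrayRef = Arc::new(StringArray::from(vec!["c"]));
/// let combined = concat(&[&existing, &delta]).unwrap();
/// assert_eq!(combined.len(), 3);
/// ```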
fn update_dictionaries(
    dictionaries_by_id: &mut HashMap<i64, ArrayRef>,
    is_delta: bool,
    dict_id: i64,
    dict_values: ArrayRef,
) -> Result<(), ArrowError> {
    if !is_delta {
        // We don't currently record the isOrdered field. This could become a
        // general attribute of arrays.
        // Add (possibly multiple) array refs to the dictionaries array.
        dictionaries_by_id.insert(dict_id, dict_values.clone());
        return Ok(());
    }

    let existing = dictionaries_by_id.get(&dict_id).ok_or_else(|| {
        ArrowError::InvalidArgumentError(format!(
            "No existing dictionary for delta dictionary with id '{dict_id}'"
        ))
    })?;

    let combined = concat::concat(&[existing, &dict_values]).map_err(|e| {
        ArrowError::InvalidArgumentError(format!("Failed to concat delta dictionary: {e}"))
    })?;

    dictionaries_by_id.insert(dict_id, combined);

    Ok(())
}

/// Given a dictionary batch IPC message/body along with the full state of a
/// stream including schema, dictionary cache, metadata, and other flags, this
/// function will parse the buffer into an array of dictionary values.
fn get_dictionary_values(
    buf: &Buffer,
    batch: crate::DictionaryBatch,
    schema: &Schema,
    dictionaries_by_id: &mut HashMap<i64, ArrayRef>,
    metadata: &MetadataVersion,
    require_alignment: bool,
    skip_validation: UnsafeFlag,
) -> Result<ArrayRef, ArrowError> {
    let id = batch.id();
    #[allow(deprecated)]
    let fields_using_this_dictionary = schema.fields_with_dict_id(id);
    let first_field = fields_using_this_dictionary.first().ok_or_else(|| {
        ArrowError::InvalidArgumentError(format!("dictionary id {id} not found in schema"))
    })?;

    // As the dictionary batch does not contain the type of the
    // values array, we need to retrieve this from the schema.
    // Get an array representing this dictionary's values.
    let dictionary_values: ArrayRef = match first_field.data_type() {
        DataType::Dictionary(_, value_type) => {
            // Make a fake schema for the dictionary batch.
            let value = value_type.as_ref().clone();
            let schema = Schema::new(vec![Field::new("", value, true)]);
            // Read a single column
            let record_batch = RecordBatchDecoder::try_new(
                buf,
                batch.data().unwrap(),
                Arc::new(schema),
                dictionaries_by_id,
                metadata,
            )?
            .with_require_alignment(require_alignment)
            .with_skip_validation(skip_validation)
            .read_record_batch()?;

            Some(record_batch.column(0).clone())
        }
        _ => None,
    }
    .ok_or_else(|| {
        ArrowError::InvalidArgumentError(format!("dictionary id {id} not found in schema"))
    })?;

    Ok(dictionary_values)
}

/// Read the data for a given block
fn read_block<R: Read + Seek>(mut reader: R, block: &Block) -> Result<Buffer, ArrowError> {
    reader.seek(SeekFrom::Start(block.offset() as u64))?;
    let body_len = block.bodyLength().to_usize().unwrap();
    let metadata_len = block.metaDataLength().to_usize().unwrap();
    let total_len = body_len.checked_add(metadata_len).unwrap();

    let mut buf = MutableBuffer::from_len_zeroed(total_len);
    reader.read_exact(&mut buf)?;
    Ok(buf.into())
}

/// Parse an encapsulated message
///
/// <https://arrow.apache.org/docs/format/Columnar.html#encapsulated-message-format>
fn parse_message(buf: &[u8]) -> Result<Message::Message<'_>, ArrowError> {
    let buf = match buf[..4] == CONTINUATION_MARKER {
        true => &buf[8..],
        false => &buf[4..],
    };
    crate::root_as_message(buf)
        .map_err(|err| ArrowError::ParseError(format!("Unable to get root as message: {err:?}")))
}

/// Read the footer length from the last 10 bytes of an Arrow IPC file
///
/// Expects a 4 byte footer length followed by `b"ARROW1"`
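///
/// # Example
///
/// A sketch with hand-built trailer bytes (a real file ends with these 10 bytes):
///
/// ```
/// # use arrow_ipc::reader::read_footer_length;
/// let mut trailer = [0u8; 10];
/// trailer[..4].copy_from_slice(&100i32.to_le_bytes()); // footer length
/// trailer[4..].copy_from_slice(b"ARROW1"); // magic
/// assert_eq!(read_footer_length(trailer).unwrap(), 100);
/// ```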
pub fn read_footer_length(buf: [u8; 10]) -> Result<usize, ArrowError> {
    if buf[4..] != super::ARROW_MAGIC {
        return Err(ArrowError::ParseError(
            "Arrow file does not contain correct footer".to_string(),
        ));
    }

    // read footer length
    let footer_len = i32::from_le_bytes(buf[..4].try_into().unwrap());
    footer_len
        .try_into()
        .map_err(|_| ArrowError::ParseError(format!("Invalid footer length: {footer_len}")))
}

/// A low-level, push-based interface for reading an IPC file
///
/// For a higher-level interface see [`FileReader`]
///
/// For an example of using this API with `mmap` see the [`zero_copy_ipc`] example.
///
/// [`zero_copy_ipc`]: https://github.com/apache/arrow-rs/blob/main/arrow/examples/zero_copy_ipc.rs
///
/// ```
/// # use std::sync::Arc;
/// # use arrow_array::*;
/// # use arrow_array::types::Int32Type;
/// # use arrow_buffer::Buffer;
/// # use arrow_ipc::convert::fb_to_schema;
/// # use arrow_ipc::reader::{FileDecoder, read_footer_length};
/// # use arrow_ipc::root_as_footer;
/// # use arrow_ipc::writer::FileWriter;
/// // Write an IPC file
///
/// let batch = RecordBatch::try_from_iter([
///     ("a", Arc::new(Int32Array::from(vec![1, 2, 3])) as _),
///     ("b", Arc::new(Int32Array::from(vec![1, 2, 3])) as _),
///     ("c", Arc::new(DictionaryArray::<Int32Type>::from_iter(["hello", "hello", "world"])) as _),
/// ]).unwrap();
///
/// let schema = batch.schema();
///
/// let mut out = Vec::with_capacity(1024);
/// let mut writer = FileWriter::try_new(&mut out, schema.as_ref()).unwrap();
/// writer.write(&batch).unwrap();
/// writer.finish().unwrap();
///
/// drop(writer);
///
/// // Read IPC file
///
/// let buffer = Buffer::from_vec(out);
/// let trailer_start = buffer.len() - 10;
/// let footer_len = read_footer_length(buffer[trailer_start..].try_into().unwrap()).unwrap();
/// let footer = root_as_footer(&buffer[trailer_start - footer_len..trailer_start]).unwrap();
///
/// let back = fb_to_schema(footer.schema().unwrap());
/// assert_eq!(&back, schema.as_ref());
///
/// let mut decoder = FileDecoder::new(schema, footer.version());
///
/// // Read dictionaries
/// for block in footer.dictionaries().iter().flatten() {
///     let block_len = block.bodyLength() as usize + block.metaDataLength() as usize;
///     let data = buffer.slice_with_length(block.offset() as _, block_len);
///     decoder.read_dictionary(&block, &data).unwrap();
/// }
///
/// // Read record batch
/// let batches = footer.recordBatches().unwrap();
/// assert_eq!(batches.len(), 1); // Only wrote a single batch
///
/// let block = batches.get(0);
/// let block_len = block.bodyLength() as usize + block.metaDataLength() as usize;
/// let data = buffer.slice_with_length(block.offset() as _, block_len);
/// let back = decoder.read_record_batch(block, &data).unwrap().unwrap();
///
/// assert_eq!(batch, back);
/// ```
#[derive(Debug)]
pub struct FileDecoder {
    schema: SchemaRef,
    dictionaries: HashMap<i64, ArrayRef>,
    version: MetadataVersion,
    projection: Option<Vec<usize>>,
    require_alignment: bool,
    skip_validation: UnsafeFlag,
}

impl FileDecoder {
    /// Create a new [`FileDecoder`] with the given schema and version
    pub fn new(schema: SchemaRef, version: MetadataVersion) -> Self {
        Self {
            schema,
            version,
            dictionaries: Default::default(),
            projection: None,
            require_alignment: false,
            skip_validation: UnsafeFlag::new(),
        }
    }

    /// Specify a projection
    pub fn with_projection(mut self, projection: Vec<usize>) -> Self {
        self.projection = Some(projection);
        self
    }

    /// Specifies if the array data in input buffers is required to be properly aligned.
    ///
    /// If `require_alignment` is true, this decoder will return an error if any array data in the
    /// input `buf` is not properly aligned.
    /// Under the hood it will use [`arrow_data::ArrayDataBuilder::build`] to construct
    /// [`arrow_data::ArrayData`].
    ///
    /// If `require_alignment` is false (the default), this decoder will automatically allocate a
    /// new aligned buffer and copy over the data if any array data in the input `buf` is not
    /// properly aligned. (Properly aligned array data will remain zero-copy.)
    /// Under the hood it will use [`arrow_data::ArrayDataBuilder::build_aligned`] to construct
    /// [`arrow_data::ArrayData`].
    pub fn with_require_alignment(mut self, require_alignment: bool) -> Self {
        self.require_alignment = require_alignment;
        self
    }

    /// Specifies if validation should be skipped when reading data (defaults to `false`)
    ///
    /// # Safety
    ///
    /// This flag must only be set to `true` when you trust the input data and are sure the data
    /// you are reading is a valid Arrow IPC file; otherwise, undefined behavior may
    /// result.
    ///
    /// For example, some programs may wish to trust reading IPC files written
    /// by the same process that created the files.
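    ///
    /// A sketch of opting in (only sound when the input is trusted):
    ///
    /// ```
    /// # use std::sync::Arc;
    /// # use arrow_ipc::MetadataVersion;
    /// # use arrow_ipc::reader::FileDecoder;
    /// # use arrow_schema::Schema;
    /// let decoder = FileDecoder::new(Arc::new(Schema::empty()), MetadataVersion::V5);
    /// // SAFETY: we trust the process that produced the data we will decode
    /// let decoder = unsafe { decoder.with_skip_validation(true) };
    /// # let _ = decoder;
    /// ```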
    pub unsafe fn with_skip_validation(mut self, skip_validation: bool) -> Self {
        unsafe { self.skip_validation.set(skip_validation) };
        self
    }

    fn read_message<'a>(&self, buf: &'a [u8]) -> Result<Message::Message<'a>, ArrowError> {
        let message = parse_message(buf)?;

        // some old test data's footer metadata is not set, so we account for that
        if self.version != MetadataVersion::V1 && message.version() != self.version {
            return Err(ArrowError::IpcError(
                "Could not read IPC message as metadata versions mismatch".to_string(),
            ));
        }
        Ok(message)
    }

    /// Read the dictionary with the given block and data buffer
    pub fn read_dictionary(&mut self, block: &Block, buf: &Buffer) -> Result<(), ArrowError> {
        let message = self.read_message(buf)?;
        match message.header_type() {
            crate::MessageHeader::DictionaryBatch => {
                let batch = message.header_as_dictionary_batch().unwrap();
                read_dictionary_impl(
                    &buf.slice(block.metaDataLength() as _),
                    batch,
                    &self.schema,
                    &mut self.dictionaries,
                    &message.version(),
                    self.require_alignment,
                    self.skip_validation.clone(),
                )
            }
            t => Err(ArrowError::ParseError(format!(
                "Expecting DictionaryBatch in dictionary blocks, found {t:?}."
            ))),
        }
    }

    /// Read the RecordBatch with the given block and data buffer
    pub fn read_record_batch(
        &self,
        block: &Block,
        buf: &Buffer,
    ) -> Result<Option<RecordBatch>, ArrowError> {
        let message = self.read_message(buf)?;
        match message.header_type() {
            crate::MessageHeader::Schema => Err(ArrowError::IpcError(
                "Not expecting a schema when messages are read".to_string(),
            )),
            crate::MessageHeader::RecordBatch => {
                let batch = message.header_as_record_batch().ok_or_else(|| {
                    ArrowError::IpcError("Unable to read IPC message as record batch".to_string())
                })?;
                // read the block that makes up the record batch into a buffer
                RecordBatchDecoder::try_new(
                    &buf.slice(block.metaDataLength() as _),
                    batch,
                    self.schema.clone(),
                    &self.dictionaries,
                    &message.version(),
                )?
                .with_projection(self.projection.as_deref())
                .with_require_alignment(self.require_alignment)
                .with_skip_validation(self.skip_validation.clone())
                .read_record_batch()
                .map(Some)
            }
            crate::MessageHeader::NONE => Ok(None),
            t => Err(ArrowError::InvalidArgumentError(format!(
                "Reading types other than record batches not yet supported, unable to read {t:?}"
            ))),
        }
    }
}

/// Build an Arrow [`FileReader`] with custom options.
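///
/// # Example
///
/// A sketch of building a reader with a projection and a raised footer
/// verification limit:
///
/// ```
/// # use std::io::Cursor;
/// # use arrow_array::record_batch;
/// # use arrow_ipc::reader::FileReaderBuilder;
/// # use arrow_ipc::writer::FileWriter;
/// # let batch = record_batch!(("a", Int32, [1, 2, 3]), ("b", Int32, [4, 5, 6])).unwrap();
/// # let mut file = vec![];
/// # {
/// #     let mut writer = FileWriter::try_new(&mut file, &batch.schema()).unwrap();
/// #     writer.write(&batch).unwrap();
/// #     writer.finish().unwrap();
/// # }
/// let mut reader = FileReaderBuilder::new()
///     .with_projection(vec![0]) // only read column "a"
///     .with_max_footer_fb_depth(128)
///     .build(Cursor::new(file))
///     .unwrap();
/// let batch = reader.next().unwrap().unwrap();
/// assert_eq!(batch.num_columns(), 1);
/// ```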
#[derive(Debug)]
pub struct FileReaderBuilder {
    /// Optional projection for which columns to load (zero-based column indices)
    projection: Option<Vec<usize>>,
    /// Passed through to construct [`VerifierOptions`]
    max_footer_fb_tables: usize,
    /// Passed through to construct [`VerifierOptions`]
    max_footer_fb_depth: usize,
}

impl Default for FileReaderBuilder {
    fn default() -> Self {
        let verifier_options = VerifierOptions::default();
        Self {
            max_footer_fb_tables: verifier_options.max_tables,
            max_footer_fb_depth: verifier_options.max_depth,
            projection: None,
        }
    }
}

impl FileReaderBuilder {
    /// Options for creating a new [`FileReader`].
    ///
    /// To convert a builder into a reader, call [`FileReaderBuilder::build`].
    pub fn new() -> Self {
        Self::default()
    }

    /// Optional projection for which columns to load (zero-based column indices).
    pub fn with_projection(mut self, projection: Vec<usize>) -> Self {
        self.projection = Some(projection);
        self
    }

    /// Flatbuffers option for parsing the footer. Controls the max number of fields and
    /// metadata key-value pairs that can be parsed from the schema of the footer.
    ///
    /// By default this is set to `1_000_000` which roughly translates to a schema with
    /// no metadata key-value pairs but 499,999 fields.
    ///
    /// This default limit is enforced to protect against malicious files with a massive
    /// number of flatbuffer tables, which could cause a denial-of-service attack.
    ///
    /// If you need to ingest a trusted file with a massive number of fields and/or
    /// metadata key-value pairs and are facing the error `"Unable to get root as
    /// footer: TooManyTables"` then increase this parameter as necessary.
    pub fn with_max_footer_fb_tables(mut self, max_footer_fb_tables: usize) -> Self {
        self.max_footer_fb_tables = max_footer_fb_tables;
        self
    }

    /// Flatbuffers option for parsing the footer. Controls the max depth for schemas with
    /// nested fields parsed from the footer.
    ///
    /// By default this is set to `64` which roughly translates to a schema with
    /// a field nested 60 levels down through other struct fields.
    ///
    /// This default limit is enforced to protect against malicious files with an extremely
    /// deep flatbuffer structure, which could cause a denial-of-service attack.
    ///
    /// If you need to ingest a trusted file with a deeply nested field and are facing the
    /// error `"Unable to get root as footer: DepthLimitReached"` then increase this
    /// parameter as necessary.
    pub fn with_max_footer_fb_depth(mut self, max_footer_fb_depth: usize) -> Self {
        self.max_footer_fb_depth = max_footer_fb_depth;
        self
    }

    /// Build [`FileReader`] with given reader.
    pub fn build<R: Read + Seek>(self, mut reader: R) -> Result<FileReader<R>, ArrowError> {
        // Space for ARROW_MAGIC (6 bytes) and length (4 bytes)
        let mut buffer = [0; 10];
        reader.seek(SeekFrom::End(-10))?;
        reader.read_exact(&mut buffer)?;

        let footer_len = read_footer_length(buffer)?;

        // read footer
        let mut footer_data = vec![0; footer_len];
        reader.seek(SeekFrom::End(-10 - footer_len as i64))?;
        reader.read_exact(&mut footer_data)?;

        let verifier_options = VerifierOptions {
            max_tables: self.max_footer_fb_tables,
            max_depth: self.max_footer_fb_depth,
            ..Default::default()
        };
        let footer = crate::root_as_footer_with_opts(&verifier_options, &footer_data[..]).map_err(
            |err| ArrowError::ParseError(format!("Unable to get root as footer: {err:?}")),
        )?;

        let blocks = footer.recordBatches().ok_or_else(|| {
            ArrowError::ParseError("Unable to get record batches from IPC Footer".to_string())
        })?;

        let total_blocks = blocks.len();

        let ipc_schema = footer.schema().unwrap();
        if !ipc_schema.endianness().equals_to_target_endianness() {
            return Err(ArrowError::IpcError(
                "the endianness of the source system does not match the endianness of the target system.".to_owned()
            ));
        }

        let schema = crate::convert::fb_to_schema(ipc_schema);

        let mut custom_metadata = HashMap::new();
        if let Some(fb_custom_metadata) = footer.custom_metadata() {
            for kv in fb_custom_metadata.into_iter() {
                custom_metadata.insert(
                    kv.key().unwrap().to_string(),
                    kv.value().unwrap().to_string(),
                );
            }
        }

        let mut decoder = FileDecoder::new(Arc::new(schema), footer.version());
        if let Some(projection) = self.projection {
            decoder = decoder.with_projection(projection)
        }

        // Read the dictionary blocks, populating the decoder's dictionary cache.
        if let Some(dictionaries) = footer.dictionaries() {
            for block in dictionaries {
                let buf = read_block(&mut reader, block)?;
                decoder.read_dictionary(block, &buf)?;
            }
        }

        Ok(FileReader {
            reader,
            blocks: blocks.iter().copied().collect(),
            current_block: 0,
            total_blocks,
            decoder,
            custom_metadata,
        })
    }
}

/// Arrow File Reader
///
/// Reads Arrow [`RecordBatch`]es from bytes in the [IPC File Format],
/// providing random access to the record batches.
///
/// # See Also
///
/// * [`Self::set_index`] for random access
/// * [`StreamReader`] for reading streaming data
///
/// # Example: Reading from a `File`
/// ```
/// # use std::io::Cursor;
/// use arrow_array::record_batch;
/// # use arrow_ipc::reader::FileReader;
/// # use arrow_ipc::writer::FileWriter;
/// # let batch = record_batch!(("a", Int32, [1, 2, 3])).unwrap();
/// # let mut file = vec![]; // mimic a stream for the example
/// # {
/// #  let mut writer = FileWriter::try_new(&mut file, &batch.schema()).unwrap();
/// #  writer.write(&batch).unwrap();
/// #  writer.write(&batch).unwrap();
/// #  writer.finish().unwrap();
/// # }
/// # let mut file = Cursor::new(&file);
/// let projection = None; // read all columns
/// let mut reader = FileReader::try_new(&mut file, projection).unwrap();
/// // Position the reader to the second batch
/// reader.set_index(1).unwrap();
/// // read batches from the reader using the Iterator trait
/// let mut num_rows = 0;
/// for batch in reader {
///    let batch = batch.unwrap();
///    num_rows += batch.num_rows();
/// }
/// assert_eq!(num_rows, 3);
/// ```
/// # Example: Reading from an `mmap`ed file
///
/// For an example of creating Arrays without copying, using memory-mapped
/// (`mmap`) files, see the [`zero_copy_ipc`] example.
///
/// [IPC File Format]: https://arrow.apache.org/docs/format/Columnar.html#ipc-file-format
/// [`zero_copy_ipc`]: https://github.com/apache/arrow-rs/blob/main/arrow/examples/zero_copy_ipc.rs
pub struct FileReader<R> {
    /// File reader that supports reading and seeking
    reader: R,

    /// The decoder
    decoder: FileDecoder,

    /// The blocks in the file
    ///
    /// A block indicates the regions in the file to read to get data
    blocks: Vec<Block>,

    /// A counter to keep track of the current block that should be read
    current_block: usize,

    /// The total number of blocks, which may contain record batches and other types
    total_blocks: usize,

    /// User defined metadata
    custom_metadata: HashMap<String, String>,
}

impl<R> fmt::Debug for FileReader<R> {
    fn fmt(&self, f: &mut fmt::Formatter<'_>) -> Result<(), fmt::Error> {
        f.debug_struct("FileReader<R>")
            .field("decoder", &self.decoder)
            .field("blocks", &self.blocks)
            .field("current_block", &self.current_block)
            .field("total_blocks", &self.total_blocks)
            .finish_non_exhaustive()
    }
}

impl<R: Read + Seek> FileReader<BufReader<R>> {
    /// Try to create a new file reader with the reader wrapped in a BufReader.
    ///
    /// See [`FileReader::try_new`] for an unbuffered version.
    pub fn try_new_buffered(reader: R, projection: Option<Vec<usize>>) -> Result<Self, ArrowError> {
        Self::try_new(BufReader::new(reader), projection)
    }
}

impl<R: Read + Seek> FileReader<R> {
    /// Try to create a new file reader.
    ///
    /// There is no internal buffering. If buffered reads are needed you likely want to use
    /// [`FileReader::try_new_buffered`] instead.
    ///
    /// # Errors
    ///
    /// An [`Err`](Result::Err) may be returned if:
    /// - the file does not meet the Arrow Format footer requirements, or
    /// - the file endianness does not match the target endianness.
    pub fn try_new(reader: R, projection: Option<Vec<usize>>) -> Result<Self, ArrowError> {
        let builder = FileReaderBuilder {
            projection,
            ..Default::default()
        };
        builder.build(reader)
    }

    /// Return user defined customized metadata
    pub fn custom_metadata(&self) -> &HashMap<String, String> {
        &self.custom_metadata
    }

    /// Return the number of batches in the file
    pub fn num_batches(&self) -> usize {
        self.total_blocks
    }

    /// Return the schema of the file
    pub fn schema(&self) -> SchemaRef {
        self.decoder.schema.clone()
    }

    /// Seek to a specific [`RecordBatch`]
    ///
    /// Sets the current block to the index, allowing random reads
    pub fn set_index(&mut self, index: usize) -> Result<(), ArrowError> {
        if index >= self.total_blocks {
            Err(ArrowError::InvalidArgumentError(format!(
                "Cannot set batch to index {} from {} total batches",
                index, self.total_blocks
            )))
        } else {
            self.current_block = index;
            Ok(())
        }
    }

    fn maybe_next(&mut self) -> Result<Option<RecordBatch>, ArrowError> {
        let block = &self.blocks[self.current_block];
        self.current_block += 1;

        // read the block into a buffer and decode it
        let buffer = read_block(&mut self.reader, block)?;
        self.decoder.read_record_batch(block, &buffer)
    }

    /// Gets a reference to the underlying reader.
    ///
    /// It is inadvisable to directly read from the underlying reader.
    pub fn get_ref(&self) -> &R {
        &self.reader
    }

    /// Gets a mutable reference to the underlying reader.
    ///
    /// It is inadvisable to directly read from the underlying reader.
    pub fn get_mut(&mut self) -> &mut R {
        &mut self.reader
    }

    /// Specifies if validation should be skipped when reading data (defaults to `false`)
    ///
    /// # Safety
    ///
    /// See [`FileDecoder::with_skip_validation`]
    pub unsafe fn with_skip_validation(mut self, skip_validation: bool) -> Self {
        self.decoder = unsafe { self.decoder.with_skip_validation(skip_validation) };
        self
    }
}

impl<R: Read + Seek> Iterator for FileReader<R> {
    type Item = Result<RecordBatch, ArrowError>;

    fn next(&mut self) -> Option<Self::Item> {
        // get current block
        if self.current_block < self.total_blocks {
            self.maybe_next().transpose()
        } else {
            None
        }
    }
}

impl<R: Read + Seek> RecordBatchReader for FileReader<R> {
    fn schema(&self) -> SchemaRef {
        self.schema()
    }
}
1389
1390/// Arrow Stream Reader
1391///
1392/// Reads Arrow [`RecordBatch`]es from bytes in the [IPC Streaming Format].
1393///
1394/// # See Also
1395///
1396/// * [`FileReader`] for random access.
1397///
1398/// # Example
1399/// ```
1400/// # use arrow_array::record_batch;
1401/// # use arrow_ipc::reader::StreamReader;
1402/// # use arrow_ipc::writer::StreamWriter;
1403/// # let batch = record_batch!(("a", Int32, [1, 2, 3])).unwrap();
1404/// # let mut stream = vec![]; // mimic a stream for the example
1405/// # {
1406/// #  let mut writer = StreamWriter::try_new(&mut stream, &batch.schema()).unwrap();
1407/// #  writer.write(&batch).unwrap();
1408/// #  writer.finish().unwrap();
1409/// # }
1410/// # let stream = stream.as_slice();
1411/// let projection = None; // read all columns
1412/// let mut reader = StreamReader::try_new(stream, projection).unwrap();
1413/// // read batches from the reader using the Iterator trait
1414/// let mut num_rows = 0;
1415/// for batch in reader {
1416///    let batch = batch.unwrap();
1417///    num_rows += batch.num_rows();
1418/// }
1419/// assert_eq!(num_rows, 3);
1420/// ```
1421///
1422/// [IPC Streaming Format]: https://arrow.apache.org/docs/format/Columnar.html#ipc-streaming-format
1423pub struct StreamReader<R> {
1424    /// Stream reader
1425    reader: MessageReader<R>,
1426
1427    /// The schema that is read from the stream's first message
1428    schema: SchemaRef,
1429
1430    /// Optional dictionaries for each schema field.
1431    ///
1432    /// Dictionaries may be appended to in the streaming format.
1433    dictionaries_by_id: HashMap<i64, ArrayRef>,
1434
1435    /// An indicator of whether the stream is complete.
1436    ///
1437    /// This value is set to `true` the first time the reader's `next()` returns `None`.
1438    finished: bool,
1439
1440    /// Optional projection
1441    projection: Option<(Vec<usize>, Schema)>,
1442
1443    /// Should validation be skipped when reading data? Defaults to false.
1444    ///
1445    /// See [`FileDecoder::with_skip_validation`] for details.
1446    skip_validation: UnsafeFlag,
1447}
1448
1449impl<R> fmt::Debug for StreamReader<R> {
1450    fn fmt(&self, f: &mut fmt::Formatter<'_>) -> std::result::Result<(), fmt::Error> {
1451        f.debug_struct("StreamReader<R>")
1452            .field("reader", &"R")
1453            .field("schema", &self.schema)
1454            .field("dictionaries_by_id", &self.dictionaries_by_id)
1455            .field("finished", &self.finished)
1456            .field("projection", &self.projection)
1457            .finish()
1458    }
1459}
1460
1461impl<R: Read> StreamReader<BufReader<R>> {
1462    /// Try to create a new stream reader with the reader wrapped in a BufReader.
1463    ///
1464    /// See [`StreamReader::try_new`] for an unbuffered version.
1465    pub fn try_new_buffered(reader: R, projection: Option<Vec<usize>>) -> Result<Self, ArrowError> {
1466        Self::try_new(BufReader::new(reader), projection)
1467    }
1468}
1469
1470impl<R: Read> StreamReader<R> {
1471    /// Try to create a new stream reader.
1472    ///
1473    /// To check if the reader is done, use [`is_finished`](StreamReader::is_finished).
1474    ///
1475    /// There is no internal buffering. If buffered reads are needed you likely want to use
1476    /// [`StreamReader::try_new_buffered`] instead.
1477    ///
1478    /// # Errors
1479    ///
1480    /// An [`Err`](Result::Err) may be returned if the reader does not encounter a schema
1481    /// as the first message in the stream.
1482    pub fn try_new(
1483        reader: R,
1484        projection: Option<Vec<usize>>,
1485    ) -> Result<StreamReader<R>, ArrowError> {
1486        let mut msg_reader = MessageReader::new(reader);
1487        let message = msg_reader.maybe_next()?;
1488        let Some((message, _)) = message else {
1489            return Err(ArrowError::IpcError(
1490                "Expected schema message, found empty stream.".to_string(),
1491            ));
1492        };
1493
1494        if message.header_type() != Message::MessageHeader::Schema {
1495            return Err(ArrowError::IpcError(format!(
1496                "Expected a schema as the first message in the stream, got: {:?}",
1497                message.header_type()
1498            )));
1499        }
1500
1501        let schema = message.header_as_schema().ok_or_else(|| {
1502            ArrowError::ParseError("Failed to parse schema from message header".to_string())
1503        })?;
1504        let schema = crate::convert::fb_to_schema(schema);
1505
1506        // Create an empty map from dictionary ID to dictionary values; it is populated as dictionary batches arrive.
1507        let dictionaries_by_id = HashMap::new();
1508
1509        let projection = match projection {
1510            Some(projection_indices) => {
1511                let schema = schema.project(&projection_indices)?;
1512                Some((projection_indices, schema))
1513            }
1514            _ => None,
1515        };
1516
1517        Ok(Self {
1518            reader: msg_reader,
1519            schema: Arc::new(schema),
1520            finished: false,
1521            dictionaries_by_id,
1522            projection,
1523            skip_validation: UnsafeFlag::new(),
1524        })
1525    }
1526
1527    /// Deprecated, use [`StreamReader::try_new`] instead.
1528    #[deprecated(since = "53.0.0", note = "use `try_new` instead")]
1529    pub fn try_new_unbuffered(
1530        reader: R,
1531        projection: Option<Vec<usize>>,
1532    ) -> Result<Self, ArrowError> {
1533        Self::try_new(reader, projection)
1534    }
1535
1536    /// Return the schema of the stream
1537    pub fn schema(&self) -> SchemaRef {
1538        self.schema.clone()
1539    }
1540
1541    /// Check if the stream is finished
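    ///
    /// # Example
    ///
    /// A minimal sketch showing the flag only flips once `next()` has
    /// returned `None`; the writer exists only to produce stream bytes:
    /// ```
    /// # use arrow_array::record_batch;
    /// # use arrow_ipc::reader::StreamReader;
    /// # use arrow_ipc::writer::StreamWriter;
    /// # let batch = record_batch!(("a", Int32, [1, 2, 3])).unwrap();
    /// # let mut stream = vec![];
    /// # {
    /// #   let mut writer = StreamWriter::try_new(&mut stream, &batch.schema()).unwrap();
    /// #   writer.write(&batch).unwrap();
    /// #   writer.finish().unwrap();
    /// # }
    /// let mut reader = StreamReader::try_new(stream.as_slice(), None).unwrap();
    /// assert!(!reader.is_finished());
    /// while let Some(batch) = reader.next() {
    ///     batch.unwrap();
    /// }
    /// assert!(reader.is_finished());
    /// ```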
1542    pub fn is_finished(&self) -> bool {
1543        self.finished
1544    }
1545
1546    fn maybe_next(&mut self) -> Result<Option<RecordBatch>, ArrowError> {
1547        if self.finished {
1548            return Ok(None);
1549        }
1550
1551        // Read messages until we get a record batch or end of stream
1552        loop {
1553            let message = self.next_ipc_message()?;
1554            let Some(message) = message else {
1555                // If the message is None, we have reached the end of the stream.
1556                self.finished = true;
1557                return Ok(None);
1558            };
1559
1560            match message {
1561                IpcMessage::Schema(_) => {
1562                    return Err(ArrowError::IpcError(
1563                        "Expected a record batch, but found a schema".to_string(),
1564                    ));
1565                }
1566                IpcMessage::RecordBatch(record_batch) => {
1567                    return Ok(Some(record_batch));
1568                }
1569                IpcMessage::DictionaryBatch { .. } => {
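                    // The dictionary values were already merged into the
                    // dictionary cache by `next_ipc_message`; keep reading
                    // until a record batch arrives.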
1570                    continue;
1571                }
1572            };
1573        }
1574    }
1575
1576    /// Reads and fully parses the next IPC message from the stream. Whereas
1577    /// [`Self::maybe_next`] is a higher level method focused on reading
1578    /// `RecordBatch`es, this method returns the individual fully parsed IPC
1579    /// messages from the underlying stream.
1580    ///
1581    /// This is useful primarily for testing reader/writer behaviors as it
1582    /// allows a full view into the messages that have been written to a stream.
1583    pub(crate) fn next_ipc_message(&mut self) -> Result<Option<IpcMessage>, ArrowError> {
1584        let message = self.reader.maybe_next()?;
1585        let Some((message, body)) = message else {
1586            // If the message is None, we have reached the end of the stream.
1587            return Ok(None);
1588        };
1589
1590        let ipc_message = match message.header_type() {
1591            Message::MessageHeader::Schema => {
1592                let schema = message.header_as_schema().ok_or_else(|| {
1593                    ArrowError::ParseError("Failed to parse schema from message header".to_string())
1594                })?;
1595                let arrow_schema = crate::convert::fb_to_schema(schema);
1596                IpcMessage::Schema(arrow_schema)
1597            }
1598            Message::MessageHeader::RecordBatch => {
1599                let batch = message.header_as_record_batch().ok_or_else(|| {
1600                    ArrowError::IpcError("Unable to read IPC message as record batch".to_string())
1601                })?;
1602
1603                let version = message.version();
1604                let schema = self.schema.clone();
1605                let record_batch = RecordBatchDecoder::try_new(
1606                    &body.into(),
1607                    batch,
1608                    schema,
1609                    &self.dictionaries_by_id,
1610                    &version,
1611                )?
1612                .with_projection(self.projection.as_ref().map(|x| x.0.as_ref()))
1613                .with_require_alignment(false)
1614                .with_skip_validation(self.skip_validation.clone())
1615                .read_record_batch()?;
1616                IpcMessage::RecordBatch(record_batch)
1617            }
1618            Message::MessageHeader::DictionaryBatch => {
1619                let dict = message.header_as_dictionary_batch().ok_or_else(|| {
1620                    ArrowError::ParseError(
1621                        "Failed to parse dictionary batch from message header".to_string(),
1622                    )
1623                })?;
1624
1625                let version = message.version();
1626                let dict_values = get_dictionary_values(
1627                    &body.into(),
1628                    dict,
1629                    &self.schema,
1630                    &mut self.dictionaries_by_id,
1631                    &version,
1632                    false,
1633                    self.skip_validation.clone(),
1634                )?;
1635
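                // Merge the values into the dictionary cache: replace the
                // existing entry, or extend it when this is a delta dictionary.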
1636                update_dictionaries(
1637                    &mut self.dictionaries_by_id,
1638                    dict.isDelta(),
1639                    dict.id(),
1640                    dict_values.clone(),
1641                )?;
1642
1643                IpcMessage::DictionaryBatch {
1644                    id: dict.id(),
1645                    is_delta: dict.isDelta(),
1646                    values: dict_values,
1647                }
1648            }
1649            x => {
1650                return Err(ArrowError::ParseError(format!(
1651                    "Unsupported message header type in IPC stream: '{x:?}'"
1652                )));
1653            }
1654        };
1655
1656        Ok(Some(ipc_message))
1657    }
1658
1659    /// Gets a reference to the underlying reader.
1660    ///
1661    /// It is inadvisable to directly read from the underlying reader.
1662    pub fn get_ref(&self) -> &R {
1663        self.reader.inner()
1664    }
1665
1666    /// Gets a mutable reference to the underlying reader.
1667    ///
1668    /// It is inadvisable to directly read from the underlying reader.
1669    pub fn get_mut(&mut self) -> &mut R {
1670        self.reader.inner_mut()
1671    }
1672
1673    /// Specifies if validation should be skipped when reading data (defaults to `false`)
1674    ///
1675    /// # Safety
1676    ///
1677    /// See [`FileDecoder::with_skip_validation`]
1678    pub unsafe fn with_skip_validation(mut self, skip_validation: bool) -> Self {
1679        unsafe { self.skip_validation.set(skip_validation) };
1680        self
1681    }
1682}
1683
1684impl<R: Read> Iterator for StreamReader<R> {
1685    type Item = Result<RecordBatch, ArrowError>;
1686
1687    fn next(&mut self) -> Option<Self::Item> {
1688        self.maybe_next().transpose()
1689    }
1690}
1691
1692impl<R: Read> RecordBatchReader for StreamReader<R> {
1693    fn schema(&self) -> SchemaRef {
1694        self.schema.clone()
1695    }
1696}
1697
1698/// Representation of a fully parsed IpcMessage from the underlying stream.
1699/// Parsing this kind of message is done by higher level constructs such as
1700/// [`StreamReader`], because fully interpreting the messages into a record
1701/// batch or dictionary batch requires access to stream state such as the schema
1702/// and the full dictionary cache.
1703#[derive(Debug)]
1704#[allow(dead_code)]
1705pub(crate) enum IpcMessage {
1706    Schema(arrow_schema::Schema),
1707    RecordBatch(RecordBatch),
1708    DictionaryBatch {
1709        id: i64,
1710        is_delta: bool,
1711        values: ArrayRef,
1712    },
1713}
1714
1715/// A low-level construct that reads [`Message::Message`]s from a reader while
1716/// re-using a buffer for metadata. This is composed into [`StreamReader`].
1717struct MessageReader<R> {
1718    reader: R,
1719    buf: Vec<u8>,
1720}
1721
1722impl<R: Read> MessageReader<R> {
1723    fn new(reader: R) -> Self {
1724        Self {
1725            reader,
1726            buf: Vec::new(),
1727        }
1728    }
1729
1730    /// Reads the entire next message from the underlying reader which includes
1731    /// the metadata length, the metadata, and the body.
1732    ///
1733    /// # Returns
1734    /// - `Ok(None)` if the reader signals the end of stream with EOF on
1735    ///   the first read
1736    /// - `Err(_)` if the reader returns an error on any read after the
1737    ///   first, or if the metadata length is invalid
1738    /// - `Ok(Some(_))` with the `Message` and a buffer containing the
1739    ///   body bytes otherwise.
1740    fn maybe_next(&mut self) -> Result<Option<(Message::Message<'_>, MutableBuffer)>, ArrowError> {
1741        let meta_len = self.read_meta_len()?;
1742        let Some(meta_len) = meta_len else {
1743            return Ok(None);
1744        };
1745
1746        self.buf.resize(meta_len, 0);
1747        self.reader.read_exact(&mut self.buf)?;
1748
1749        let message = crate::root_as_message(self.buf.as_slice()).map_err(|err| {
1750            ArrowError::ParseError(format!("Unable to get root as message: {err:?}"))
1751        })?;
1752
1753        let mut buf = MutableBuffer::from_len_zeroed(message.bodyLength() as usize);
1754        self.reader.read_exact(&mut buf)?;
1755
1756        Ok(Some((message, buf)))
1757    }
1758
1759    /// Get a mutable reference to the underlying reader.
1760    fn inner_mut(&mut self) -> &mut R {
1761        &mut self.reader
1762    }
1763
1764    /// Get an immutable reference to the underlying reader.
1765    fn inner(&self) -> &R {
1766        &self.reader
1767    }
1768
1769    /// Read the metadata length for the next message from the underlying stream.
1770    ///
1771    /// # Returns
1772    /// - `Ok(None)` if the reader signals the end of stream with EOF on
1773    ///   the first read
1774    /// - `Err(_)` if the reader returns an error on any read after the
1775    ///   first, or if the metadata length is less than 0.
1776    /// - `Ok(Some(_))` with the length otherwise.
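    ///
    /// For example, a stream positioned at the bytes
    /// `FF FF FF FF 10 00 00 00` (the continuation marker followed by a
    /// little-endian `i32` length) yields `Ok(Some(16))`, while
    /// `FF FF FF FF 00 00 00 00` marks the end of the stream and yields
    /// `Ok(None)`.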
1777    pub fn read_meta_len(&mut self) -> Result<Option<usize>, ArrowError> {
1778        let mut meta_len: [u8; 4] = [0; 4];
1779        match self.reader.read_exact(&mut meta_len) {
1780            Ok(_) => {}
1781            Err(e) => {
1782                return if e.kind() == std::io::ErrorKind::UnexpectedEof {
1783                    // Handle EOF without the "0xFFFFFFFF 0x00000000"
1784                    // end-of-stream marker, which is valid according to:
1785                    // https://arrow.apache.org/docs/format/Columnar.html#ipc-streaming-format
1786                    Ok(None)
1787                } else {
1788                    Err(ArrowError::from(e))
1789                };
1790            }
1791        };
1792
1793        let meta_len = {
1794            // If a continuation marker is encountered, skip over it and read
1795            // the size from the next four bytes.
1796            if meta_len == CONTINUATION_MARKER {
1797                self.reader.read_exact(&mut meta_len)?;
1798            }
1799
1800            i32::from_le_bytes(meta_len)
1801        };
1802
1803        if meta_len == 0 {
1804            return Ok(None);
1805        }
1806
1807        let meta_len = usize::try_from(meta_len)
1808            .map_err(|_| ArrowError::ParseError(format!("Invalid metadata length: {meta_len}")))?;
1809
1810        Ok(Some(meta_len))
1811    }
1812}
1813
1814#[cfg(test)]
1815mod tests {
1816    use std::io::Cursor;
1817
1818    use crate::convert::fb_to_schema;
1819    use crate::writer::{
1820        DictionaryTracker, IpcDataGenerator, IpcWriteOptions, unslice_run_array, write_message,
1821    };
1822
1823    use super::*;
1824
1825    use crate::{root_as_footer, root_as_message, size_prefixed_root_as_message};
1826    use arrow_array::builder::{PrimitiveRunBuilder, UnionBuilder};
1827    use arrow_array::types::*;
1828    use arrow_buffer::{NullBuffer, OffsetBuffer};
1829    use arrow_data::ArrayDataBuilder;
1830
1831    fn create_test_projection_schema() -> Schema {
1832        // define field types
1833        let list_data_type = DataType::List(Arc::new(Field::new_list_field(DataType::Int32, true)));
1834
1835        let fixed_size_list_data_type =
1836            DataType::FixedSizeList(Arc::new(Field::new_list_field(DataType::Int32, false)), 3);
1837
1838        let union_fields = UnionFields::new(
1839            vec![0, 1],
1840            vec![
1841                Field::new("a", DataType::Int32, false),
1842                Field::new("b", DataType::Float64, false),
1843            ],
1844        );
1845
1846        let union_data_type = DataType::Union(union_fields, UnionMode::Dense);
1847
1848        let struct_fields = Fields::from(vec![
1849            Field::new("id", DataType::Int32, false),
1850            Field::new_list("list", Field::new_list_field(DataType::Int8, true), false),
1851        ]);
1852        let struct_data_type = DataType::Struct(struct_fields);
1853
1854        let run_encoded_data_type = DataType::RunEndEncoded(
1855            Arc::new(Field::new("run_ends", DataType::Int16, false)),
1856            Arc::new(Field::new("values", DataType::Int32, true)),
1857        );
1858
1859        // define schema
1860        Schema::new(vec![
1861            Field::new("f0", DataType::UInt32, false),
1862            Field::new("f1", DataType::Utf8, false),
1863            Field::new("f2", DataType::Boolean, false),
1864            Field::new("f3", union_data_type, true),
1865            Field::new("f4", DataType::Null, true),
1866            Field::new("f5", DataType::Float64, true),
1867            Field::new("f6", list_data_type, false),
1868            Field::new("f7", DataType::FixedSizeBinary(3), true),
1869            Field::new("f8", fixed_size_list_data_type, false),
1870            Field::new("f9", struct_data_type, false),
1871            Field::new("f10", run_encoded_data_type, false),
1872            Field::new("f11", DataType::Boolean, false),
1873            Field::new_dictionary("f12", DataType::Int8, DataType::Utf8, false),
1874            Field::new("f13", DataType::Utf8, false),
1875        ])
1876    }
1877
1878    fn create_test_projection_batch_data(schema: &Schema) -> RecordBatch {
1879        // set test data for each column
1880        let array0 = UInt32Array::from(vec![1, 2, 3]);
1881        let array1 = StringArray::from(vec!["foo", "bar", "baz"]);
1882        let array2 = BooleanArray::from(vec![true, false, true]);
1883
1884        let mut union_builder = UnionBuilder::new_dense();
1885        union_builder.append::<Int32Type>("a", 1).unwrap();
1886        union_builder.append::<Float64Type>("b", 10.1).unwrap();
1887        union_builder.append_null::<Float64Type>("b").unwrap();
1888        let array3 = union_builder.build().unwrap();
1889
1890        let array4 = NullArray::new(3);
1891        let array5 = Float64Array::from(vec![Some(1.1), None, Some(3.3)]);
1892        let array6_values = vec![
1893            Some(vec![Some(10), Some(10), Some(10)]),
1894            Some(vec![Some(20), Some(20), Some(20)]),
1895            Some(vec![Some(30), Some(30)]),
1896        ];
1897        let array6 = ListArray::from_iter_primitive::<Int32Type, _, _>(array6_values);
1898        let array7_values = vec![vec![11, 12, 13], vec![22, 23, 24], vec![33, 34, 35]];
1899        let array7 = FixedSizeBinaryArray::try_from_iter(array7_values.into_iter()).unwrap();
1900
1901        let array8_values = ArrayData::builder(DataType::Int32)
1902            .len(9)
1903            .add_buffer(Buffer::from_slice_ref([40, 41, 42, 43, 44, 45, 46, 47, 48]))
1904            .build()
1905            .unwrap();
1906        let array8_data = ArrayData::builder(schema.field(8).data_type().clone())
1907            .len(3)
1908            .add_child_data(array8_values)
1909            .build()
1910            .unwrap();
1911        let array8 = FixedSizeListArray::from(array8_data);
1912
1913        let array9_id: ArrayRef = Arc::new(Int32Array::from(vec![1001, 1002, 1003]));
1914        let array9_list: ArrayRef =
1915            Arc::new(ListArray::from_iter_primitive::<Int8Type, _, _>(vec![
1916                Some(vec![Some(-10)]),
1917                Some(vec![Some(-20), Some(-20), Some(-20)]),
1918                Some(vec![Some(-30)]),
1919            ]));
1920        let array9 = ArrayDataBuilder::new(schema.field(9).data_type().clone())
1921            .add_child_data(array9_id.into_data())
1922            .add_child_data(array9_list.into_data())
1923            .len(3)
1924            .build()
1925            .unwrap();
1926        let array9: ArrayRef = Arc::new(StructArray::from(array9));
1927
1928        let array10_input = vec![Some(1_i32), None, None];
1929        let mut array10_builder = PrimitiveRunBuilder::<Int16Type, Int32Type>::new();
1930        array10_builder.extend(array10_input);
1931        let array10 = array10_builder.finish();
1932
1933        let array11 = BooleanArray::from(vec![false, false, true]);
1934
1935        let array12_values = StringArray::from(vec!["x", "yy", "zzz"]);
1936        let array12_keys = Int8Array::from_iter_values([1, 1, 2]);
1937        let array12 = DictionaryArray::new(array12_keys, Arc::new(array12_values));
1938
1939        let array13 = StringArray::from(vec!["a", "bb", "ccc"]);
1940
1941        // create record batch
1942        RecordBatch::try_new(
1943            Arc::new(schema.clone()),
1944            vec![
1945                Arc::new(array0),
1946                Arc::new(array1),
1947                Arc::new(array2),
1948                Arc::new(array3),
1949                Arc::new(array4),
1950                Arc::new(array5),
1951                Arc::new(array6),
1952                Arc::new(array7),
1953                Arc::new(array8),
1954                Arc::new(array9),
1955                Arc::new(array10),
1956                Arc::new(array11),
1957                Arc::new(array12),
1958                Arc::new(array13),
1959            ],
1960        )
1961        .unwrap()
1962    }
1963
1964    #[test]
1965    fn test_negative_meta_len_start_stream() {
1966        let bytes = i32::to_le_bytes(-1);
1967        let mut buf = vec![];
1968        buf.extend(CONTINUATION_MARKER);
1969        buf.extend(bytes);
1970
1971        let reader_err = StreamReader::try_new(Cursor::new(buf), None).err();
1972        assert!(reader_err.is_some());
1973        assert_eq!(
1974            reader_err.unwrap().to_string(),
1975            "Parser error: Invalid metadata length: -1"
1976        );
1977    }
1978
1979    #[test]
1980    fn test_negative_meta_len_mid_stream() {
1981        let schema = Schema::new(vec![Field::new("a", DataType::Int32, false)]);
1982        let mut buf = Vec::new();
1983        {
1984            let mut writer = crate::writer::StreamWriter::try_new(&mut buf, &schema).unwrap();
1985            let batch =
1986                RecordBatch::try_new(Arc::new(schema), vec![Arc::new(Int32Array::from(vec![1]))])
1987                    .unwrap();
1988            writer.write(&batch).unwrap();
1989        }
1990
1991        let bytes = i32::to_le_bytes(-1);
1992        buf.extend(CONTINUATION_MARKER);
1993        buf.extend(bytes);
1994
1995        let mut reader = StreamReader::try_new(Cursor::new(buf), None).unwrap();
1996        // Read the valid value
1997        assert!(reader.maybe_next().is_ok());
1998        // Read the invalid meta len
1999        let batch_err = reader.maybe_next().err();
2000        assert!(batch_err.is_some());
2001        assert_eq!(
2002            batch_err.unwrap().to_string(),
2003            "Parser error: Invalid metadata length: -1"
2004        );
2005    }
2006
2007    #[test]
2008    fn test_missing_buffer_metadata_error() {
2009        use crate::r#gen::Message::*;
2010        use flatbuffers::FlatBufferBuilder;
2011
2012        let schema = Arc::new(Schema::new(vec![Field::new("col", DataType::Int32, true)]));
2013
2014        // create RecordBatch buffer metadata with invalid buffer count
2015        // Int32Array needs 2 buffers (validity + data) but we provide only 1
2016        let mut fbb = FlatBufferBuilder::new();
2017        let nodes = fbb.create_vector(&[FieldNode::new(2, 0)]);
2018        let buffers = fbb.create_vector(&[crate::Buffer::new(0, 8)]);
2019        let batch_offset = RecordBatch::create(
2020            &mut fbb,
2021            &RecordBatchArgs {
2022                length: 2,
2023                nodes: Some(nodes),
2024                buffers: Some(buffers),
2025                compression: None,
2026                variadicBufferCounts: None,
2027            },
2028        );
2029        fbb.finish_minimal(batch_offset);
2030        let batch_bytes = fbb.finished_data().to_vec();
2031        let batch = flatbuffers::root::<RecordBatch>(&batch_bytes).unwrap();
2032
2033        let data_buffer = Buffer::from(vec![0u8; 8]);
2034        let dictionaries: HashMap<i64, ArrayRef> = HashMap::new();
2035        let metadata = MetadataVersion::V5;
2036
2037        let decoder = RecordBatchDecoder::try_new(
2038            &data_buffer,
2039            batch,
2040            schema.clone(),
2041            &dictionaries,
2042            &metadata,
2043        )
2044        .unwrap();
2045
2046        let result = decoder.read_record_batch();
2047
2048        match result {
2049            Err(ArrowError::IpcError(msg)) => {
2050                assert_eq!(msg, "Buffer count mismatched with metadata");
2051            }
2052            other => panic!("unexpected error: {other:?}"),
2053        }
2054    }
2055
2056    #[test]
2057    fn test_projection_array_values() {
2058        // define schema
2059        let schema = create_test_projection_schema();
2060
2061        // create record batch with test data
2062        let batch = create_test_projection_batch_data(&schema);
2063
2064        // write record batch in IPC format
2065        let mut buf = Vec::new();
2066        {
2067            let mut writer = crate::writer::FileWriter::try_new(&mut buf, &schema).unwrap();
2068            writer.write(&batch).unwrap();
2069            writer.finish().unwrap();
2070        }
2071
2072        // read record batch with projection
2073        for index in 0..12 {
2074            let projection = vec![index];
2075            let reader = FileReader::try_new(std::io::Cursor::new(buf.clone()), Some(projection));
2076            let read_batch = reader.unwrap().next().unwrap().unwrap();
2077            let projected_column = read_batch.column(0);
2078            let expected_column = batch.column(index);
2079
2080            // check the projected column equals the expected column
2081            assert_eq!(projected_column.as_ref(), expected_column.as_ref());
2082        }
2083
2084        {
2085            // read record batch with reversed projection
2086            let reader =
2087                FileReader::try_new(std::io::Cursor::new(buf.clone()), Some(vec![3, 2, 1]));
2088            let read_batch = reader.unwrap().next().unwrap().unwrap();
2089            let expected_batch = batch.project(&[3, 2, 1]).unwrap();
2090            assert_eq!(read_batch, expected_batch);
2091        }
2092    }
2093
2094    #[test]
2095    fn test_arrow_single_float_row() {
2096        let schema = Schema::new(vec![
2097            Field::new("a", DataType::Float32, false),
2098            Field::new("b", DataType::Float32, false),
2099            Field::new("c", DataType::Int32, false),
2100            Field::new("d", DataType::Int32, false),
2101        ]);
2102        let arrays = vec![
2103            Arc::new(Float32Array::from(vec![1.23])) as ArrayRef,
2104            Arc::new(Float32Array::from(vec![-6.50])) as ArrayRef,
2105            Arc::new(Int32Array::from(vec![2])) as ArrayRef,
2106            Arc::new(Int32Array::from(vec![1])) as ArrayRef,
2107        ];
2108        let batch = RecordBatch::try_new(Arc::new(schema.clone()), arrays).unwrap();
2109        // create stream writer
2110        let mut file = tempfile::tempfile().unwrap();
2111        let mut stream_writer = crate::writer::StreamWriter::try_new(&mut file, &schema).unwrap();
2112        stream_writer.write(&batch).unwrap();
2113        stream_writer.finish().unwrap();
2114
2115        drop(stream_writer);
2116
2117        file.rewind().unwrap();
2118
2119        // read stream back
2120        let reader = StreamReader::try_new(&mut file, None).unwrap();
2121
2122        reader.for_each(|batch| {
2123            let batch = batch.unwrap();
2124            assert!(
2125                batch
2126                    .column(0)
2127                    .as_any()
2128                    .downcast_ref::<Float32Array>()
2129                    .unwrap()
2130                    .value(0)
2131                    != 0.0
2132            );
2133            assert!(
2134                batch
2135                    .column(1)
2136                    .as_any()
2137                    .downcast_ref::<Float32Array>()
2138                    .unwrap()
2139                    .value(0)
2140                    != 0.0
2141            );
2142        });
2143
2144        file.rewind().unwrap();
2145
2146        // Read with projection
2147        let reader = StreamReader::try_new(file, Some(vec![0, 3])).unwrap();
2148
2149        reader.for_each(|batch| {
2150            let batch = batch.unwrap();
2151            assert_eq!(batch.schema().fields().len(), 2);
2152            assert_eq!(batch.schema().fields()[0].data_type(), &DataType::Float32);
2153            assert_eq!(batch.schema().fields()[1].data_type(), &DataType::Int32);
2154        });
2155    }
2156
2157    /// Write the record batch to an in-memory buffer in IPC File format
2158    fn write_ipc(rb: &RecordBatch) -> Vec<u8> {
2159        let mut buf = Vec::new();
2160        let mut writer = crate::writer::FileWriter::try_new(&mut buf, rb.schema_ref()).unwrap();
2161        writer.write(rb).unwrap();
2162        writer.finish().unwrap();
2163        buf
2164    }
2165
2166    /// Return the first record batch read from the IPC File buffer
2167    fn read_ipc(buf: &[u8]) -> Result<RecordBatch, ArrowError> {
2168        let mut reader = FileReader::try_new(std::io::Cursor::new(buf), None)?;
2169        reader.next().unwrap()
2170    }
2171
2172    /// Return the first record batch read from the IPC File buffer, disabling
2173    /// validation
2174    fn read_ipc_skip_validation(buf: &[u8]) -> Result<RecordBatch, ArrowError> {
2175        let mut reader = unsafe {
2176            FileReader::try_new(std::io::Cursor::new(buf), None)?.with_skip_validation(true)
2177        };
2178        reader.next().unwrap()
2179    }
2180
2181    fn roundtrip_ipc(rb: &RecordBatch) -> RecordBatch {
2182        let buf = write_ipc(rb);
2183        read_ipc(&buf).unwrap()
2184    }
2185
2186    /// Return the first record batch read from the IPC File buffer
2187    /// using the FileDecoder API
2188    fn read_ipc_with_decoder(buf: Vec<u8>) -> Result<RecordBatch, ArrowError> {
2189        read_ipc_with_decoder_inner(buf, false)
2190    }
2191
2192    /// Return the first record batch read from the IPC File buffer
2193    /// using the FileDecoder API, disabling validation
2194    fn read_ipc_with_decoder_skip_validation(buf: Vec<u8>) -> Result<RecordBatch, ArrowError> {
2195        read_ipc_with_decoder_inner(buf, true)
2196    }
2197
2198    fn read_ipc_with_decoder_inner(
2199        buf: Vec<u8>,
2200        skip_validation: bool,
2201    ) -> Result<RecordBatch, ArrowError> {
2202        let buffer = Buffer::from_vec(buf);
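        // The IPC file ends with a 4-byte little-endian footer length followed
        // by the 6-byte magic "ARROW1", so the trailer is the last 10 bytes.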
2203        let trailer_start = buffer.len() - 10;
2204        let footer_len = read_footer_length(buffer[trailer_start..].try_into().unwrap())?;
2205        let footer = root_as_footer(&buffer[trailer_start - footer_len..trailer_start])
2206            .map_err(|e| ArrowError::InvalidArgumentError(format!("Invalid footer: {e}")))?;
2207
2208        let schema = fb_to_schema(footer.schema().unwrap());
2209
2210        let mut decoder = unsafe {
2211            FileDecoder::new(Arc::new(schema), footer.version())
2212                .with_skip_validation(skip_validation)
2213        };
2214        // Read dictionaries
2215        for block in footer.dictionaries().iter().flatten() {
2216            let block_len = block.bodyLength() as usize + block.metaDataLength() as usize;
2217            let data = buffer.slice_with_length(block.offset() as _, block_len);
2218            decoder.read_dictionary(block, &data)?
2219        }
2220
2221        // Read record batch
2222        let batches = footer.recordBatches().unwrap();
2223        assert_eq!(batches.len(), 1); // Only wrote a single batch
2224
2225        let block = batches.get(0);
2226        let block_len = block.bodyLength() as usize + block.metaDataLength() as usize;
2227        let data = buffer.slice_with_length(block.offset() as _, block_len);
2228        Ok(decoder.read_record_batch(block, &data)?.unwrap())
2229    }
2230
2231    /// Write the record batch to an in-memory buffer in IPC Stream format
2232    fn write_stream(rb: &RecordBatch) -> Vec<u8> {
2233        let mut buf = Vec::new();
2234        let mut writer = crate::writer::StreamWriter::try_new(&mut buf, rb.schema_ref()).unwrap();
2235        writer.write(rb).unwrap();
2236        writer.finish().unwrap();
2237        buf
2238    }
2239
2240    /// Return the first record batch read from the IPC Stream buffer
2241    fn read_stream(buf: &[u8]) -> Result<RecordBatch, ArrowError> {
2242        let mut reader = StreamReader::try_new(std::io::Cursor::new(buf), None)?;
2243        reader.next().unwrap()
2244    }
2245
2246    /// Return the first record batch read from the IPC Stream buffer,
2247    /// disabling validation
2248    fn read_stream_skip_validation(buf: &[u8]) -> Result<RecordBatch, ArrowError> {
2249        let mut reader = unsafe {
2250            StreamReader::try_new(std::io::Cursor::new(buf), None)?.with_skip_validation(true)
2251        };
2252        reader.next().unwrap()
2253    }
2254
2255    fn roundtrip_ipc_stream(rb: &RecordBatch) -> RecordBatch {
2256        let buf = write_stream(rb);
2257        read_stream(&buf).unwrap()
2258    }
2259
2260    #[test]
2261    fn test_roundtrip_with_custom_metadata() {
2262        let schema = Schema::new(vec![Field::new("dummy", DataType::Float64, false)]);
2263        let mut buf = Vec::new();
2264        let mut writer = crate::writer::FileWriter::try_new(&mut buf, &schema).unwrap();
2265        let mut test_metadata = HashMap::new();
2266        test_metadata.insert("abc".to_string(), "abc".to_string());
2267        test_metadata.insert("def".to_string(), "def".to_string());
2268        for (k, v) in &test_metadata {
2269            writer.write_metadata(k, v);
2270        }
2271        writer.finish().unwrap();
2272        drop(writer);
2273
2274        let reader = crate::reader::FileReader::try_new(std::io::Cursor::new(buf), None).unwrap();
2275        assert_eq!(reader.custom_metadata(), &test_metadata);
2276    }
2277
2278    #[test]
2279    fn test_roundtrip_nested_dict() {
2280        let inner: DictionaryArray<Int32Type> = vec!["a", "b", "a"].into_iter().collect();
2281
2282        let array = Arc::new(inner) as ArrayRef;
2283
2284        let dctfield = Arc::new(Field::new("dict", array.data_type().clone(), false));
2285
2286        let s = StructArray::from(vec![(dctfield, array)]);
2287        let struct_array = Arc::new(s) as ArrayRef;
2288
2289        let schema = Arc::new(Schema::new(vec![Field::new(
2290            "struct",
2291            struct_array.data_type().clone(),
2292            false,
2293        )]));
2294
2295        let batch = RecordBatch::try_new(schema, vec![struct_array]).unwrap();
2296
2297        assert_eq!(batch, roundtrip_ipc(&batch));
2298    }
2299
2300    #[test]
2301    fn test_roundtrip_nested_dict_no_preserve_dict_id() {
2302        let inner: DictionaryArray<Int32Type> = vec!["a", "b", "a"].into_iter().collect();
2303
2304        let array = Arc::new(inner) as ArrayRef;
2305
2306        let dctfield = Arc::new(Field::new("dict", array.data_type().clone(), false));
2307
2308        let s = StructArray::from(vec![(dctfield, array)]);
2309        let struct_array = Arc::new(s) as ArrayRef;
2310
2311        let schema = Arc::new(Schema::new(vec![Field::new(
2312            "struct",
2313            struct_array.data_type().clone(),
2314            false,
2315        )]));
2316
2317        let batch = RecordBatch::try_new(schema, vec![struct_array]).unwrap();
2318
2319        let mut buf = Vec::new();
2320        let mut writer = crate::writer::FileWriter::try_new_with_options(
2321            &mut buf,
2322            batch.schema_ref(),
2323            IpcWriteOptions::default(),
2324        )
2325        .unwrap();
2326        writer.write(&batch).unwrap();
2327        writer.finish().unwrap();
2328        drop(writer);
2329
2330        let mut reader = FileReader::try_new(std::io::Cursor::new(buf), None).unwrap();
2331
2332        assert_eq!(batch, reader.next().unwrap().unwrap());
2333    }
2334
2335    fn check_union_with_builder(mut builder: UnionBuilder) {
2336        builder.append::<Int32Type>("a", 1).unwrap();
2337        builder.append_null::<Int32Type>("a").unwrap();
2338        builder.append::<Float64Type>("c", 3.0).unwrap();
2339        builder.append::<Int32Type>("a", 4).unwrap();
2340        builder.append::<Int64Type>("d", 11).unwrap();
2341        let union = builder.build().unwrap();
2342
2343        let schema = Arc::new(Schema::new(vec![Field::new(
2344            "union",
2345            union.data_type().clone(),
2346            false,
2347        )]));
2348
2349        let union_array = Arc::new(union) as ArrayRef;
2350
2351        let rb = RecordBatch::try_new(schema, vec![union_array]).unwrap();
2352        let rb2 = roundtrip_ipc(&rb);
2353        // Compare the schema and shape explicitly, then compare the union
2354        // columns directly (column equality is checked via the underlying data).
2355        assert_eq!(rb.schema(), rb2.schema());
2356        assert_eq!(rb.num_columns(), rb2.num_columns());
2357        assert_eq!(rb.num_rows(), rb2.num_rows());
2358        let union1 = rb.column(0);
2359        let union2 = rb2.column(0);
2360
2361        assert_eq!(union1, union2);
2362    }
2363
2364    #[test]
2365    fn test_roundtrip_dense_union() {
2366        check_union_with_builder(UnionBuilder::new_dense());
2367    }
2368
2369    #[test]
2370    fn test_roundtrip_sparse_union() {
2371        check_union_with_builder(UnionBuilder::new_sparse());
2372    }
2373
2374    #[test]
2375    fn test_roundtrip_struct_empty_fields() {
2376        let nulls = NullBuffer::from(&[true, true, false]);
2377        let rb = RecordBatch::try_from_iter([(
2378            "",
2379            Arc::new(StructArray::new_empty_fields(nulls.len(), Some(nulls))) as _,
2380        )])
2381        .unwrap();
2382        let rb2 = roundtrip_ipc(&rb);
2383        assert_eq!(rb, rb2);
2384    }
2385
2386    #[test]
2387    fn test_roundtrip_stream_run_array_sliced() {
2388        let run_array_1: Int32RunArray = vec!["a", "a", "a", "b", "b", "c", "c", "c"]
2389            .into_iter()
2390            .collect();
2391        let run_array_1_sliced = run_array_1.slice(2, 5);
2392
2393        let run_array_2_input = vec![Some(1_i32), None, None, Some(2), Some(2)];
2394        let mut run_array_2_builder = PrimitiveRunBuilder::<Int16Type, Int32Type>::new();
2395        run_array_2_builder.extend(run_array_2_input);
2396        let run_array_2 = run_array_2_builder.finish();
2397
2398        let schema = Arc::new(Schema::new(vec![
2399            Field::new(
2400                "run_array_1_sliced",
2401                run_array_1_sliced.data_type().clone(),
2402                false,
2403            ),
2404            Field::new("run_array_2", run_array_2.data_type().clone(), false),
2405        ]));
2406        let input_batch = RecordBatch::try_new(
2407            schema,
2408            vec![Arc::new(run_array_1_sliced.clone()), Arc::new(run_array_2)],
2409        )
2410        .unwrap();
2411        let output_batch = roundtrip_ipc_stream(&input_batch);
2412
2413        // As partial comparison is not yet supported for run arrays, the sliced run
2414        // array has to be unsliced before comparing with the output. The second run
2415        // array can be compared as-is.
2416        assert_eq!(input_batch.column(1), output_batch.column(1));
2417
2418        let run_array_1_unsliced = unslice_run_array(run_array_1_sliced.into_data()).unwrap();
2419        assert_eq!(run_array_1_unsliced, output_batch.column(0).into_data());
2420    }
2421
2422    #[test]
2423    fn test_roundtrip_stream_nested_dict() {
2424        let xs = vec!["AA", "BB", "AA", "CC", "BB"];
2425        let dict = Arc::new(
2426            xs.clone()
2427                .into_iter()
2428                .collect::<DictionaryArray<Int8Type>>(),
2429        );
2430        let string_array: ArrayRef = Arc::new(StringArray::from(xs.clone()));
2431        let struct_array = StructArray::from(vec![
2432            (
2433                Arc::new(Field::new("f2.1", DataType::Utf8, false)),
2434                string_array,
2435            ),
2436            (
2437                Arc::new(Field::new("f2.2_struct", dict.data_type().clone(), false)),
2438                dict.clone() as ArrayRef,
2439            ),
2440        ]);
2441        let schema = Arc::new(Schema::new(vec![
2442            Field::new("f1_string", DataType::Utf8, false),
2443            Field::new("f2_struct", struct_array.data_type().clone(), false),
2444        ]));
2445        let input_batch = RecordBatch::try_new(
2446            schema,
2447            vec![
2448                Arc::new(StringArray::from(xs.clone())),
2449                Arc::new(struct_array),
2450            ],
2451        )
2452        .unwrap();
2453        let output_batch = roundtrip_ipc_stream(&input_batch);
2454        assert_eq!(input_batch, output_batch);
2455    }
2456
2457    #[test]
2458    fn test_roundtrip_stream_nested_dict_of_map_of_dict() {
2459        let values = StringArray::from(vec![Some("a"), None, Some("b"), Some("c")]);
2460        let values = Arc::new(values) as ArrayRef;
2461        let value_dict_keys = Int8Array::from_iter_values([0, 1, 1, 2, 3, 1]);
2462        let value_dict_array = DictionaryArray::new(value_dict_keys, values.clone());
2463
2464        let key_dict_keys = Int8Array::from_iter_values([0, 0, 2, 1, 1, 3]);
2465        let key_dict_array = DictionaryArray::new(key_dict_keys, values);
2466
2467        #[allow(deprecated)]
2468        let keys_field = Arc::new(Field::new_dict(
2469            "keys",
2470            DataType::Dictionary(Box::new(DataType::Int8), Box::new(DataType::Utf8)),
2471            true, // It is technically not legal for this field to be null.
2472            1,
2473            false,
2474        ));
2475        #[allow(deprecated)]
2476        let values_field = Arc::new(Field::new_dict(
2477            "values",
2478            DataType::Dictionary(Box::new(DataType::Int8), Box::new(DataType::Utf8)),
2479            true,
2480            2,
2481            false,
2482        ));
2483        let entry_struct = StructArray::from(vec![
2484            (keys_field, make_array(key_dict_array.into_data())),
2485            (values_field, make_array(value_dict_array.into_data())),
2486        ]);
2487        let map_data_type = DataType::Map(
2488            Arc::new(Field::new(
2489                "entries",
2490                entry_struct.data_type().clone(),
2491                false,
2492            )),
2493            false,
2494        );
2495
2496        let entry_offsets = Buffer::from_slice_ref([0, 2, 4, 6]);
2497        let map_data = ArrayData::builder(map_data_type)
2498            .len(3)
2499            .add_buffer(entry_offsets)
2500            .add_child_data(entry_struct.into_data())
2501            .build()
2502            .unwrap();
2503        let map_array = MapArray::from(map_data);
2504
2505        let dict_keys = Int8Array::from_iter_values([0, 1, 1, 2, 2, 1]);
2506        let dict_dict_array = DictionaryArray::new(dict_keys, Arc::new(map_array));
2507
2508        let schema = Arc::new(Schema::new(vec![Field::new(
2509            "f1",
2510            dict_dict_array.data_type().clone(),
2511            false,
2512        )]));
2513        let input_batch = RecordBatch::try_new(schema, vec![Arc::new(dict_dict_array)]).unwrap();
2514        let output_batch = roundtrip_ipc_stream(&input_batch);
2515        assert_eq!(input_batch, output_batch);
2516    }
2517
2518    fn test_roundtrip_stream_dict_of_list_of_dict_impl<
2519        OffsetSize: OffsetSizeTrait,
2520        U: ArrowNativeType,
2521    >(
2522        list_data_type: DataType,
2523        offsets: &[U; 5],
2524    ) {
2525        let values = StringArray::from(vec![Some("a"), None, Some("c"), None]);
2526        let keys = Int8Array::from_iter_values([0, 0, 1, 2, 0, 1, 3]);
2527        let dict_array = DictionaryArray::new(keys, Arc::new(values));
2528        let dict_data = dict_array.to_data();
2529
2530        let value_offsets = Buffer::from_slice_ref(offsets);
2531
2532        let list_data = ArrayData::builder(list_data_type)
2533            .len(4)
2534            .add_buffer(value_offsets)
2535            .add_child_data(dict_data)
2536            .build()
2537            .unwrap();
2538        let list_array = GenericListArray::<OffsetSize>::from(list_data);
2539
2540        let keys_for_dict = Int8Array::from_iter_values([0, 3, 0, 1, 1, 2, 0, 1, 3]);
2541        let dict_dict_array = DictionaryArray::new(keys_for_dict, Arc::new(list_array));
2542
2543        let schema = Arc::new(Schema::new(vec![Field::new(
2544            "f1",
2545            dict_dict_array.data_type().clone(),
2546            false,
2547        )]));
2548        let input_batch = RecordBatch::try_new(schema, vec![Arc::new(dict_dict_array)]).unwrap();
2549        let output_batch = roundtrip_ipc_stream(&input_batch);
2550        assert_eq!(input_batch, output_batch);
2551    }
2552
2553    #[test]
2554    fn test_roundtrip_stream_dict_of_list_of_dict() {
2555        // list
2556        #[allow(deprecated)]
2557        let list_data_type = DataType::List(Arc::new(Field::new_dict(
2558            "item",
2559            DataType::Dictionary(Box::new(DataType::Int8), Box::new(DataType::Utf8)),
2560            true,
2561            1,
2562            false,
2563        )));
2564        let offsets: &[i32; 5] = &[0, 2, 4, 4, 6];
2565        test_roundtrip_stream_dict_of_list_of_dict_impl::<i32, i32>(list_data_type, offsets);
2566
2567        // large list
2568        #[allow(deprecated)]
2569        let list_data_type = DataType::LargeList(Arc::new(Field::new_dict(
2570            "item",
2571            DataType::Dictionary(Box::new(DataType::Int8), Box::new(DataType::Utf8)),
2572            true,
2573            1,
2574            false,
2575        )));
2576        let offsets: &[i64; 5] = &[0, 2, 4, 4, 7];
2577        test_roundtrip_stream_dict_of_list_of_dict_impl::<i64, i64>(list_data_type, offsets);
2578    }
2579
2580    #[test]
2581    fn test_roundtrip_stream_dict_of_fixed_size_list_of_dict() {
2582        let values = StringArray::from(vec![Some("a"), None, Some("c"), None]);
2583        let keys = Int8Array::from_iter_values([0, 0, 1, 2, 0, 1, 3, 1, 2]);
2584        let dict_array = DictionaryArray::new(keys, Arc::new(values));
2585        let dict_data = dict_array.into_data();
2586
2587        #[allow(deprecated)]
2588        let list_data_type = DataType::FixedSizeList(
2589            Arc::new(Field::new_dict(
2590                "item",
2591                DataType::Dictionary(Box::new(DataType::Int8), Box::new(DataType::Utf8)),
2592                true,
2593                1,
2594                false,
2595            )),
2596            3,
2597        );
2598        let list_data = ArrayData::builder(list_data_type)
2599            .len(3)
2600            .add_child_data(dict_data)
2601            .build()
2602            .unwrap();
2603        let list_array = FixedSizeListArray::from(list_data);
2604
2605        let keys_for_dict = Int8Array::from_iter_values([0, 1, 0, 1, 1, 2, 0, 1, 2]);
2606        let dict_dict_array = DictionaryArray::new(keys_for_dict, Arc::new(list_array));
2607
2608        let schema = Arc::new(Schema::new(vec![Field::new(
2609            "f1",
2610            dict_dict_array.data_type().clone(),
2611            false,
2612        )]));
2613        let input_batch = RecordBatch::try_new(schema, vec![Arc::new(dict_dict_array)]).unwrap();
2614        let output_batch = roundtrip_ipc_stream(&input_batch);
2615        assert_eq!(input_batch, output_batch);
2616    }
2617
2618    const LONG_TEST_STRING: &str =
2619        "This is a long string to make sure binary view array handles it";
2620
2621    #[test]
2622    fn test_roundtrip_view_types() {
2623        let schema = Schema::new(vec![
2624            Field::new("field_1", DataType::BinaryView, true),
2625            Field::new("field_2", DataType::Utf8, true),
2626            Field::new("field_3", DataType::Utf8View, true),
2627        ]);
2628        let bin_values: Vec<Option<&[u8]>> = vec![
2629            Some(b"foo"),
2630            None,
2631            Some(b"bar"),
2632            Some(LONG_TEST_STRING.as_bytes()),
2633        ];
2634        let utf8_values: Vec<Option<&str>> =
2635            vec![Some("foo"), None, Some("bar"), Some(LONG_TEST_STRING)];
2636        let bin_view_array = BinaryViewArray::from_iter(bin_values);
2637        let utf8_array = StringArray::from_iter(utf8_values.iter());
2638        let utf8_view_array = StringViewArray::from_iter(utf8_values);
2639        let record_batch = RecordBatch::try_new(
2640            Arc::new(schema.clone()),
2641            vec![
2642                Arc::new(bin_view_array),
2643                Arc::new(utf8_array),
2644                Arc::new(utf8_view_array),
2645            ],
2646        )
2647        .unwrap();
2648
2649        assert_eq!(record_batch, roundtrip_ipc(&record_batch));
2650        assert_eq!(record_batch, roundtrip_ipc_stream(&record_batch));
2651
2652        let sliced_batch = record_batch.slice(1, 2);
2653        assert_eq!(sliced_batch, roundtrip_ipc(&sliced_batch));
2654        assert_eq!(sliced_batch, roundtrip_ipc_stream(&sliced_batch));
2655    }
2656
2657    #[test]
2658    fn test_roundtrip_view_types_nested_dict() {
2659        let bin_values: Vec<Option<&[u8]>> = vec![
2660            Some(b"foo"),
2661            None,
2662            Some(b"bar"),
2663            Some(LONG_TEST_STRING.as_bytes()),
2664            Some(b"field"),
2665        ];
2666        let utf8_values: Vec<Option<&str>> = vec![
2667            Some("foo"),
2668            None,
2669            Some("bar"),
2670            Some(LONG_TEST_STRING),
2671            Some("field"),
2672        ];
2673        let bin_view_array = Arc::new(BinaryViewArray::from_iter(bin_values));
2674        let utf8_view_array = Arc::new(StringViewArray::from_iter(utf8_values));
2675
2676        let key_dict_keys = Int8Array::from_iter_values([0, 0, 1, 2, 0, 1, 3]);
2677        let key_dict_array = DictionaryArray::new(key_dict_keys, utf8_view_array.clone());
2678        #[allow(deprecated)]
2679        let keys_field = Arc::new(Field::new_dict(
2680            "keys",
2681            DataType::Dictionary(Box::new(DataType::Int8), Box::new(DataType::Utf8View)),
2682            true,
2683            1,
2684            false,
2685        ));
2686
2687        let value_dict_keys = Int8Array::from_iter_values([0, 3, 0, 1, 2, 0, 1]);
2688        let value_dict_array = DictionaryArray::new(value_dict_keys, bin_view_array);
2689        #[allow(deprecated)]
2690        let values_field = Arc::new(Field::new_dict(
2691            "values",
2692            DataType::Dictionary(Box::new(DataType::Int8), Box::new(DataType::BinaryView)),
2693            true,
2694            2,
2695            false,
2696        ));
2697        let entry_struct = StructArray::from(vec![
2698            (keys_field, make_array(key_dict_array.into_data())),
2699            (values_field, make_array(value_dict_array.into_data())),
2700        ]);
2701
2702        let map_data_type = DataType::Map(
2703            Arc::new(Field::new(
2704                "entries",
2705                entry_struct.data_type().clone(),
2706                false,
2707            )),
2708            false,
2709        );
2710        let entry_offsets = Buffer::from_slice_ref([0, 2, 4, 7]);
2711        let map_data = ArrayData::builder(map_data_type)
2712            .len(3)
2713            .add_buffer(entry_offsets)
2714            .add_child_data(entry_struct.into_data())
2715            .build()
2716            .unwrap();
2717        let map_array = MapArray::from(map_data);
2718
2719        let dict_keys = Int8Array::from_iter_values([0, 1, 0, 1, 1, 2, 0, 1, 2]);
2720        let dict_dict_array = DictionaryArray::new(dict_keys, Arc::new(map_array));
2721        let schema = Arc::new(Schema::new(vec![Field::new(
2722            "f1",
2723            dict_dict_array.data_type().clone(),
2724            false,
2725        )]));
2726        let batch = RecordBatch::try_new(schema, vec![Arc::new(dict_dict_array)]).unwrap();
2727        assert_eq!(batch, roundtrip_ipc(&batch));
2728        assert_eq!(batch, roundtrip_ipc_stream(&batch));
2729
2730        let sliced_batch = batch.slice(1, 2);
2731        assert_eq!(sliced_batch, roundtrip_ipc(&sliced_batch));
2732        assert_eq!(sliced_batch, roundtrip_ipc_stream(&sliced_batch));
2733    }
2734
2735    #[test]
2736    fn test_no_columns_batch() {
2737        let schema = Arc::new(Schema::empty());
2738        let options = RecordBatchOptions::new()
2739            .with_match_field_names(true)
2740            .with_row_count(Some(10));
2741        let input_batch = RecordBatch::try_new_with_options(schema, vec![], &options).unwrap();
2742        let output_batch = roundtrip_ipc_stream(&input_batch);
2743        assert_eq!(input_batch, output_batch);
2744    }
2745
2746    #[test]
2747    fn test_unaligned() {
2748        let batch = RecordBatch::try_from_iter(vec![(
2749            "i32",
2750            Arc::new(Int32Array::from(vec![1, 2, 3, 4])) as _,
2751        )])
2752        .unwrap();
2753
2754        let r#gen = IpcDataGenerator {};
2755        let mut dict_tracker = DictionaryTracker::new(false);
2756        let (_, encoded) = r#gen
2757            .encode(
2758                &batch,
2759                &mut dict_tracker,
2760                &Default::default(),
2761                &mut Default::default(),
2762            )
2763            .unwrap();
2764
2765        let message = root_as_message(&encoded.ipc_message).unwrap();
2766
2767        // Construct an unaligned buffer
2768        let mut buffer = MutableBuffer::with_capacity(encoded.arrow_data.len() + 1);
2769        buffer.push(0_u8);
2770        buffer.extend_from_slice(&encoded.arrow_data);
2771        let b = Buffer::from(buffer).slice(1);
2772        assert_ne!(b.as_ptr().align_offset(8), 0);
2773
        let ipc_batch = message.header_as_record_batch().unwrap();
        let roundtrip = RecordBatchDecoder::try_new(
            &b,
            ipc_batch,
            batch.schema(),
            &Default::default(),
            &message.version(),
        )
        .unwrap()
        .with_require_alignment(false)
        .read_record_batch()
        .unwrap();
        assert_eq!(batch, roundtrip);
    }

    #[test]
    fn test_unaligned_throws_error_with_require_alignment() {
        let batch = RecordBatch::try_from_iter(vec![(
            "i32",
            Arc::new(Int32Array::from(vec![1, 2, 3, 4])) as _,
        )])
        .unwrap();

        let r#gen = IpcDataGenerator {};
        let mut dict_tracker = DictionaryTracker::new(false);
        let (_, encoded) = r#gen
            .encode(
                &batch,
                &mut dict_tracker,
                &Default::default(),
                &mut Default::default(),
            )
            .unwrap();

        let message = root_as_message(&encoded.ipc_message).unwrap();

        // Construct an unaligned buffer
        let mut buffer = MutableBuffer::with_capacity(encoded.arrow_data.len() + 1);
        buffer.push(0_u8);
        buffer.extend_from_slice(&encoded.arrow_data);
        let b = Buffer::from(buffer).slice(1);
        assert_ne!(b.as_ptr().align_offset(8), 0);

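        // With `require_alignment(true)` the decoder must reject the
        // misaligned body instead of copying it, as asserted below.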
        let ipc_batch = message.header_as_record_batch().unwrap();
        let result = RecordBatchDecoder::try_new(
            &b,
            ipc_batch,
            batch.schema(),
            &Default::default(),
            &message.version(),
        )
        .unwrap()
        .with_require_alignment(true)
        .read_record_batch();

        let error = result.unwrap_err();
        assert_eq!(
            error.to_string(),
            "Invalid argument error: Misaligned buffers[0] in array of type Int32, \
             offset from expected alignment of 4 by 1"
        );
    }

    #[test]
    fn test_file_with_massive_column_count() {
        // 499_999 fields is the upper limit under the default verifier
        // settings (1_000_000 flatbuffer tables), so 600_000 exceeds it
        let limit = 600_000;

        let fields = (0..limit)
            .map(|i| Field::new(format!("{i}"), DataType::Boolean, false))
            .collect::<Vec<_>>();
        let schema = Arc::new(Schema::new(fields));
        let batch = RecordBatch::new_empty(schema);

        let mut buf = Vec::new();
        let mut writer = crate::writer::FileWriter::try_new(&mut buf, batch.schema_ref()).unwrap();
        writer.write(&batch).unwrap();
        writer.finish().unwrap();
        drop(writer);

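        // Raising `max_footer_fb_tables` above its default allows the
        // footer's flatbuffer verifier to accept the oversized schema.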
        let mut reader = FileReaderBuilder::new()
            .with_max_footer_fb_tables(1_500_000)
            .build(std::io::Cursor::new(buf))
            .unwrap();
        let roundtrip_batch = reader.next().unwrap().unwrap();

        assert_eq!(batch, roundtrip_batch);
    }

    #[test]
    fn test_file_with_deeply_nested_columns() {
        // 60 levels of nesting is the upper limit under the default
        // verifier settings (a maximum flatbuffer depth of 64), so 61
        // exceeds it
        let limit = 61;

        let fields = (0..limit).fold(
            vec![Field::new("leaf", DataType::Boolean, false)],
            |fields, index| vec![Field::new_struct(format!("{index}"), fields, false)],
        );
        let schema = Arc::new(Schema::new(fields));
        let batch = RecordBatch::new_empty(schema);

        let mut buf = Vec::new();
        let mut writer = crate::writer::FileWriter::try_new(&mut buf, batch.schema_ref()).unwrap();
        writer.write(&batch).unwrap();
        writer.finish().unwrap();
        drop(writer);

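        // Raising `max_footer_fb_depth` above the default of 64 lets the
        // verifier accept the deeply nested schema.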
        let mut reader = FileReaderBuilder::new()
            .with_max_footer_fb_depth(65)
            .build(std::io::Cursor::new(buf))
            .unwrap();
        let roundtrip_batch = reader.next().unwrap().unwrap();

        assert_eq!(batch, roundtrip_batch);
    }

    #[test]
    fn test_invalid_struct_array_ipc_read_errors() {
        let a_field = Field::new("a", DataType::Int32, false);
        let b_field = Field::new("b", DataType::Int32, false);
        let struct_fields = Fields::from(vec![a_field.clone(), b_field.clone()]);

        let a_array_data = ArrayData::builder(a_field.data_type().clone())
            .len(4)
            .add_buffer(Buffer::from_slice_ref([1, 2, 3, 4]))
            .build()
            .unwrap();
        // child "b" is one element shorter than "a", making the struct invalid
        let b_array_data = ArrayData::builder(b_field.data_type().clone())
            .len(3)
            .add_buffer(Buffer::from_slice_ref([5, 6, 7]))
            .build()
            .unwrap();

        let invalid_struct_arr = unsafe {
            StructArray::new_unchecked(
                struct_fields,
                vec![make_array(a_array_data), make_array(b_array_data)],
                None,
            )
        };

        expect_ipc_validation_error(
            Arc::new(invalid_struct_arr),
            "Invalid argument error: Incorrect array length for StructArray field \"b\", expected 4 got 3",
        );
    }

    #[test]
    fn test_invalid_nested_array_ipc_read_errors() {
        // one of the nested arrays has invalid data
        let a_field = Field::new("a", DataType::Int32, false);
        let b_field = Field::new("b", DataType::Utf8, false);

        let schema = Arc::new(Schema::new(vec![Field::new_struct(
            "s",
            vec![a_field.clone(), b_field.clone()],
            false,
        )]));

        let a_array_data = ArrayData::builder(a_field.data_type().clone())
            .len(4)
            .add_buffer(Buffer::from_slice_ref([1, 2, 3, 4]))
            .build()
            .unwrap();
        // invalid nested child array -- length is correct, but has invalid utf8 data
        let b_array_data = {
            let valid: &[u8] = b"   ";
            let mut invalid = vec![];
            invalid.extend_from_slice(b"ValidString");
            invalid.extend_from_slice(INVALID_UTF8_FIRST_CHAR);
            let binary_array =
                BinaryArray::from_iter(vec![None, Some(valid), None, Some(&invalid)]);
            let array = unsafe {
                StringArray::new_unchecked(
                    binary_array.offsets().clone(),
                    binary_array.values().clone(),
                    binary_array.nulls().cloned(),
                )
            };
            array.into_data()
        };
        let struct_data_type = schema.field(0).data_type();

        let invalid_struct_arr = unsafe {
            make_array(
                ArrayData::builder(struct_data_type.clone())
                    .len(4)
                    .add_child_data(a_array_data)
                    .add_child_data(b_array_data)
                    .build_unchecked(),
            )
        };
        expect_ipc_validation_error(
            Arc::new(invalid_struct_arr),
            "Invalid argument error: Invalid UTF8 sequence at string index 3 (3..18): invalid utf-8 sequence of 1 bytes from index 11",
        );
    }

    #[test]
    fn test_same_dict_id_without_preserve() {
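        // Both fields declare the same (deprecated) dictionary id 0; the
        // writer assigns dictionary ids itself, so the duplicate id should
        // be harmless and the batch should round-trip unchanged.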
        let batch = RecordBatch::try_new(
            Arc::new(Schema::new(
                ["a", "b"]
                    .iter()
                    .map(|name| {
                        #[allow(deprecated)]
                        Field::new_dict(
                            name.to_string(),
                            DataType::Dictionary(
                                Box::new(DataType::Int32),
                                Box::new(DataType::Utf8),
                            ),
                            true,
                            0,
                            false,
                        )
                    })
                    .collect::<Vec<Field>>(),
            )),
            vec![
                Arc::new(
                    vec![Some("c"), Some("d")]
                        .into_iter()
                        .collect::<DictionaryArray<Int32Type>>(),
                ) as ArrayRef,
                Arc::new(
                    vec![Some("e"), Some("f")]
                        .into_iter()
                        .collect::<DictionaryArray<Int32Type>>(),
                ) as ArrayRef,
            ],
        )
        .expect("Failed to create RecordBatch");

        // serialize the record batch as an IPC stream
        let mut buf = vec![];
        {
            let mut writer = crate::writer::StreamWriter::try_new_with_options(
                &mut buf,
                batch.schema().as_ref(),
                crate::writer::IpcWriteOptions::default(),
            )
            .expect("Failed to create StreamWriter");
            writer.write(&batch).expect("Failed to write RecordBatch");
            writer.finish().expect("Failed to finish StreamWriter");
        }

        StreamReader::try_new(std::io::Cursor::new(buf), None)
            .expect("Failed to create StreamReader")
            .for_each(|decoded_batch| {
                assert_eq!(decoded_batch.expect("Failed to read RecordBatch"), batch);
            });
    }

    #[test]
    fn test_validation_of_invalid_list_array() {
        // ListArray with invalid offsets
        let array = unsafe {
            let values = Int32Array::from(vec![1, 2, 3]);
            let bad_offsets = ScalarBuffer::<i32>::from(vec![0, 2, 4, 2]); // offsets can't go backwards
            let offsets = OffsetBuffer::new_unchecked(bad_offsets); // creates an INVALID array
            let field = Field::new_list_field(DataType::Int32, true);
            let nulls = None;
            ListArray::new(Arc::new(field), offsets, Arc::new(values), nulls)
        };

        expect_ipc_validation_error(
            Arc::new(array),
            "Invalid argument error: Offset invariant failure: offset at position 2 out of bounds: 4 > 2",
        );
    }

    #[test]
    fn test_validation_of_invalid_string_array() {
        let valid: &[u8] = b"   ";
        let mut invalid = vec![];
        invalid.extend_from_slice(b"ThisStringIsCertainlyLongerThan12Bytes");
        invalid.extend_from_slice(INVALID_UTF8_FIRST_CHAR);
        let binary_array = BinaryArray::from_iter(vec![None, Some(valid), None, Some(&invalid)]);
        // the data is not valid utf8, so we cannot safely construct a correct
        // StringArray; instead, purposely create an invalid one
        let array = unsafe {
            StringArray::new_unchecked(
                binary_array.offsets().clone(),
                binary_array.values().clone(),
                binary_array.nulls().cloned(),
            )
        };
        expect_ipc_validation_error(
            Arc::new(array),
            "Invalid argument error: Invalid UTF8 sequence at string index 3 (3..45): invalid utf-8 sequence of 1 bytes from index 38",
        );
    }

    #[test]
    fn test_validation_of_invalid_string_view_array() {
        let valid: &[u8] = b"   ";
        let mut invalid = vec![];
        invalid.extend_from_slice(b"ThisStringIsCertainlyLongerThan12Bytes");
        invalid.extend_from_slice(INVALID_UTF8_FIRST_CHAR);
        let binary_view_array =
            BinaryViewArray::from_iter(vec![None, Some(valid), None, Some(&invalid)]);
        // the data is not valid utf8, so we cannot safely construct a correct
        // StringViewArray; instead, purposely create an invalid one
        let array = unsafe {
            StringViewArray::new_unchecked(
                binary_view_array.views().clone(),
                binary_view_array.data_buffers().to_vec(),
                binary_view_array.nulls().cloned(),
            )
        };
        expect_ipc_validation_error(
            Arc::new(array),
            "Invalid argument error: Encountered non-UTF-8 data at index 3: invalid utf-8 sequence of 1 bytes from index 38",
        );
    }

    /// Creates an invalid dictionary array (one key is out of bounds for
    /// the values) and expects IPC validation to reject it
    #[test]
    fn test_validation_of_invalid_dictionary_array() {
        let array = unsafe {
            let values = StringArray::from_iter_values(["a", "b", "c"]);
            let keys = Int32Array::from(vec![1, 200]); // key 200 is out of bounds for the 3 values
            DictionaryArray::new_unchecked(keys, Arc::new(values))
        };

        expect_ipc_validation_error(
            Arc::new(array),
            "Invalid argument error: Value at position 1 out of bounds: 200 (should be in [0, 2])",
        );
    }

    #[test]
    fn test_validation_of_invalid_union_array() {
        let array = unsafe {
            let fields = UnionFields::new(
                vec![1, 3], // registered type ids; 2 is not among them
                vec![
                    Field::new("a", DataType::Int32, false),
                    Field::new("b", DataType::Utf8, false),
                ],
            );
            let type_ids = ScalarBuffer::from(vec![1i8, 2, 3]); // 2 is invalid
            let offsets = None;
            let children: Vec<ArrayRef> = vec![
                Arc::new(Int32Array::from(vec![10, 20, 30])),
                Arc::new(StringArray::from(vec![Some("a"), Some("b"), Some("c")])),
            ];

            UnionArray::new_unchecked(fields, type_ids, offsets, children)
        };

        expect_ipc_validation_error(
            Arc::new(array),
            "Invalid argument error: Type Ids values must match one of the field type ids",
        );
    }

    /// Invalid UTF-8 sequence in the first character
    /// <https://stackoverflow.com/questions/1301402/example-invalid-utf8-string>
    const INVALID_UTF8_FIRST_CHAR: &[u8] = &[0xa0, 0xa1, 0x20, 0x20];
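
    // Minimal sanity check (illustrative, not part of the original suite):
    // 0xa0 is a UTF-8 continuation byte and can never begin a sequence, so
    // decoding fails at the very first byte.
    #[test]
    fn test_invalid_utf8_first_char_is_rejected() {
        let err = std::str::from_utf8(INVALID_UTF8_FIRST_CHAR).unwrap_err();
        assert_eq!(err.valid_up_to(), 0);
    }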

    /// Asserts that writing the given array succeeds but reading it back
    /// fails validation with `expected_err`, for the IPC stream format, the
    /// IPC file format, and the lower-level `FileDecoder` API. The
    /// `skip_validation` variants succeed by design, since they bypass the
    /// validation checks.
    fn expect_ipc_validation_error(array: ArrayRef, expected_err: &str) {
        let rb = RecordBatch::try_from_iter([("a", array)]).unwrap();

        // IPC Stream format
        let buf = write_stream(&rb); // write is ok
        read_stream_skip_validation(&buf).unwrap();
        let err = read_stream(&buf).unwrap_err();
        assert_eq!(err.to_string(), expected_err);

        // IPC File format
        let buf = write_ipc(&rb); // write is ok
        read_ipc_skip_validation(&buf).unwrap();
        let err = read_ipc(&buf).unwrap_err();
        assert_eq!(err.to_string(), expected_err);

        // IPC File format with FileDecoder
        read_ipc_with_decoder_skip_validation(buf.clone()).unwrap();
        let err = read_ipc_with_decoder(buf).unwrap_err();
        assert_eq!(err.to_string(), expected_err);
    }
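
    // Illustrative sketch (not used above): the minimal stream-format
    // roundtrip that helpers like `write_stream`/`read_stream` are assumed
    // to wrap, using only the public `StreamWriter`/`StreamReader` APIs.
    #[allow(dead_code)]
    fn roundtrip_stream_sketch(batch: &RecordBatch) -> RecordBatch {
        let mut buf = Vec::new();
        {
            // write the batch as an IPC stream, finishing with the
            // end-of-stream marker
            let mut writer =
                crate::writer::StreamWriter::try_new(&mut buf, batch.schema().as_ref()).unwrap();
            writer.write(batch).unwrap();
            writer.finish().unwrap();
        }
        // read the first (and only) batch back out
        let mut reader = StreamReader::try_new(std::io::Cursor::new(buf), None).unwrap();
        reader.next().unwrap().unwrap()
    }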

    #[test]
    fn test_roundtrip_schema() {
        let schema = Schema::new(vec![
            Field::new(
                "a",
                DataType::Dictionary(Box::new(DataType::UInt16), Box::new(DataType::Utf8)),
                false,
            ),
            Field::new(
                "b",
                DataType::Dictionary(Box::new(DataType::UInt16), Box::new(DataType::Utf8)),
                false,
            ),
        ]);

        let options = IpcWriteOptions::default();
        let data_gen = IpcDataGenerator::default();
        let mut dict_tracker = DictionaryTracker::new(false);
        let encoded_data =
            data_gen.schema_to_bytes_with_dictionary_tracker(&schema, &mut dict_tracker, &options);
        let mut schema_bytes = vec![];
        write_message(&mut schema_bytes, encoded_data, &options).expect("write_message");
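
        // `write_message` frames the schema as an encapsulated IPC message:
        // an optional 0xFFFFFFFF continuation marker, then an i32 metadata
        // length, then the flatbuffer metadata (padded to 8 bytes).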
        let begin_offset: usize = if schema_bytes[0..4].eq(&CONTINUATION_MARKER) {
            4
        } else {
            0
        };

        size_prefixed_root_as_message(&schema_bytes[begin_offset..])
            .expect_err("size_prefixed_root_as_message");

        let msg = parse_message(&schema_bytes).expect("parse_message");
        let ipc_schema = msg.header_as_schema().expect("header_as_schema");
        let new_schema = fb_to_schema(ipc_schema);

        assert_eq!(schema, new_schema);
    }

    #[test]
    fn test_negative_meta_len() {
        let bytes = i32::to_le_bytes(-1);
        let mut buf = vec![];
        buf.extend(CONTINUATION_MARKER);
        buf.extend(bytes);

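        // A negative metadata length is nonsensical; the reader must
        // surface it as an error rather than panicking.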
        let reader = StreamReader::try_new(Cursor::new(buf), None);
        assert!(reader.is_err());
    }
}