arrow_ipc/reader.rs

// Licensed to the Apache Software Foundation (ASF) under one
// or more contributor license agreements.  See the NOTICE file
// distributed with this work for additional information
// regarding copyright ownership.  The ASF licenses this file
// to you under the Apache License, Version 2.0 (the
// "License"); you may not use this file except in compliance
// with the License.  You may obtain a copy of the License at
//
//   http://www.apache.org/licenses/LICENSE-2.0
//
// Unless required by applicable law or agreed to in writing,
// software distributed under the License is distributed on an
// "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
// KIND, either express or implied.  See the License for the
// specific language governing permissions and limitations
// under the License.

//! Arrow IPC File and Stream Readers
//!
//! # Notes
//!
//! The [`FileReader`] and [`StreamReader`] have similar interfaces,
//! however the [`FileReader`] expects a reader that supports [`Seek`]ing
//!
//! [`Seek`]: std::io::Seek

mod stream;
pub use stream::*;

use arrow_select::concat;

use flatbuffers::{VectorIter, VerifierOptions};
use std::collections::{HashMap, VecDeque};
use std::fmt;
use std::io::{BufReader, Read, Seek, SeekFrom};
use std::sync::Arc;

use arrow_array::*;
use arrow_buffer::{
    ArrowNativeType, BooleanBuffer, Buffer, MutableBuffer, NullBuffer, ScalarBuffer,
};
use arrow_data::{ArrayData, ArrayDataBuilder, UnsafeFlag};
use arrow_schema::*;

use crate::compression::CompressionCodec;
use crate::r#gen::Message::{self};
use crate::{Block, CONTINUATION_MARKER, FieldNode, MetadataVersion};
use DataType::*;

/// Read a buffer based on offset and length
/// From <https://github.com/apache/arrow/blob/6a936c4ff5007045e86f65f1a6b6c3c955ad5103/format/Message.fbs#L58>
/// Each constituent buffer is first compressed with the indicated
/// compressor, and then written with the uncompressed length in the first 8
/// bytes as a 64-bit little-endian signed integer followed by the compressed
/// buffer bytes (and then padding as required by the protocol). The
/// uncompressed length may be set to -1 to indicate that the data that
/// follows is not compressed, which can be useful for cases where
/// compression does not yield appreciable savings.
fn read_buffer(
    buf: &crate::Buffer,
    a_data: &Buffer,
    compression_codec: Option<CompressionCodec>,
) -> Result<Buffer, ArrowError> {
    let start_offset = buf.offset() as usize;
    let buf_data = a_data.slice_with_length(start_offset, buf.length() as usize);
    // corner case: empty buffer
    match (buf_data.is_empty(), compression_codec) {
        (true, _) | (_, None) => Ok(buf_data),
        (false, Some(decompressor)) => decompressor.decompress_to_buffer(&buf_data),
    }
}
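
// A minimal sketch (not part of the reader API) of the compression framing
// described on `read_buffer` above: each compressed buffer begins with its
// uncompressed length as a little-endian i64, and -1 means the bytes that
// follow are stored uncompressed. The helper name `peek_uncompressed_len` is
// hypothetical and exists purely for illustration.
#[allow(dead_code)]
fn peek_uncompressed_len(buf_data: &[u8]) -> Option<i64> {
    // First 8 bytes: the uncompressed length prefix (or -1 for "not compressed")
    let prefix: [u8; 8] = buf_data.get(..8)?.try_into().ok()?;
    Some(i64::from_le_bytes(prefix))
}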
impl RecordBatchDecoder<'_> {
    /// Coordinates reading arrays based on data types.
    ///
    /// `variadic_counts` encodes the number of buffers to read for variadic types (e.g., Utf8View, BinaryView).
    /// When we encounter such a type, we pop from the front of the queue to get the number of buffers to read.
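    /// For example, a `Utf8View` column whose string data spans two buffers has
    /// a variadic count of 2, so four buffers are read in total: the null
    /// buffer, the views buffer, and the two data buffers.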
    ///
    /// Notes:
    /// * In the IPC format, null buffers are always set, but may be empty. We discard them if an array has 0 nulls
    /// * Numeric values inside list arrays are often stored as 64-bit values regardless of their data type size.
    ///   We thus:
    ///     - check if the bit width of non-64-bit numbers is 64, and
    ///     - read the buffer as 64-bit (signed integer or float), and
    ///     - cast the 64-bit array to the appropriate data type
    fn create_array(
        &mut self,
        field: &Field,
        variadic_counts: &mut VecDeque<i64>,
    ) -> Result<ArrayRef, ArrowError> {
        let data_type = field.data_type();
        match data_type {
            Utf8 | Binary | LargeBinary | LargeUtf8 => {
                let field_node = self.next_node(field)?;
                let buffers = [
                    self.next_buffer()?,
                    self.next_buffer()?,
                    self.next_buffer()?,
                ];
                self.create_primitive_array(field_node, data_type, &buffers)
            }
            BinaryView | Utf8View => {
                let count = variadic_counts
                    .pop_front()
                    .ok_or(ArrowError::IpcError(format!(
                        "Missing variadic count for {data_type} column"
                    )))?;
                let count = count + 2; // view and null buffer.
                let buffers = (0..count)
                    .map(|_| self.next_buffer())
                    .collect::<Result<Vec<_>, _>>()?;
                let field_node = self.next_node(field)?;
                self.create_primitive_array(field_node, data_type, &buffers)
            }
            FixedSizeBinary(_) => {
                let field_node = self.next_node(field)?;
                let buffers = [self.next_buffer()?, self.next_buffer()?];
                self.create_primitive_array(field_node, data_type, &buffers)
            }
            List(list_field) | LargeList(list_field) | Map(list_field, _) => {
                let list_node = self.next_node(field)?;
                let list_buffers = [self.next_buffer()?, self.next_buffer()?];
                let values = self.create_array(list_field, variadic_counts)?;
                self.create_list_array(list_node, data_type, &list_buffers, values)
            }
            ListView(list_field) | LargeListView(list_field) => {
                let list_node = self.next_node(field)?;
                let list_buffers = [
                    self.next_buffer()?, // null buffer
                    self.next_buffer()?, // offsets
                    self.next_buffer()?, // sizes
                ];
                let values = self.create_array(list_field, variadic_counts)?;
                self.create_list_view_array(list_node, data_type, &list_buffers, values)
            }
            FixedSizeList(list_field, _) => {
                let list_node = self.next_node(field)?;
                let list_buffers = [self.next_buffer()?];
                let values = self.create_array(list_field, variadic_counts)?;
                self.create_list_array(list_node, data_type, &list_buffers, values)
            }
            Struct(struct_fields) => {
                let struct_node = self.next_node(field)?;
                let null_buffer = self.next_buffer()?;

                // read the arrays for each field
                let mut struct_arrays = vec![];
                // TODO investigate whether just knowing the number of buffers could
                // still work
                for struct_field in struct_fields {
                    let child = self.create_array(struct_field, variadic_counts)?;
                    struct_arrays.push(child);
                }
                self.create_struct_array(struct_node, null_buffer, struct_fields, struct_arrays)
            }
            RunEndEncoded(run_ends_field, values_field) => {
                let run_node = self.next_node(field)?;
                let run_ends = self.create_array(run_ends_field, variadic_counts)?;
                let values = self.create_array(values_field, variadic_counts)?;

                let run_array_length = run_node.length() as usize;
                let builder = ArrayData::builder(data_type.clone())
                    .len(run_array_length)
                    .offset(0)
                    .add_child_data(run_ends.into_data())
                    .add_child_data(values.into_data())
                    .null_count(run_node.null_count() as usize);

                self.create_array_from_builder(builder)
            }
            // Create dictionary array from RecordBatch
            Dictionary(_, _) => {
                let index_node = self.next_node(field)?;
                let index_buffers = [self.next_buffer()?, self.next_buffer()?];

                #[allow(deprecated)]
                let dict_id = field.dict_id().ok_or_else(|| {
                    ArrowError::ParseError(format!("Field {field} does not have dict id"))
                })?;

                let value_array = self.dictionaries_by_id.get(&dict_id).ok_or_else(|| {
                    ArrowError::ParseError(format!(
                        "Cannot find a dictionary batch with dict id: {dict_id}"
                    ))
                })?;

                self.create_dictionary_array(
                    index_node,
                    data_type,
                    &index_buffers,
                    value_array.clone(),
                )
            }
            Union(fields, mode) => {
                let union_node = self.next_node(field)?;
                let len = union_node.length() as usize;

                // In V4, union types have a validity bitmap
                // In V5 and later, union types have no validity bitmap
                if self.version < MetadataVersion::V5 {
                    self.next_buffer()?;
                }

                let type_ids: ScalarBuffer<i8> =
                    self.next_buffer()?.slice_with_length(0, len).into();

                let value_offsets = match mode {
                    UnionMode::Dense => {
                        let offsets: ScalarBuffer<i32> =
                            self.next_buffer()?.slice_with_length(0, len * 4).into();
                        Some(offsets)
                    }
                    UnionMode::Sparse => None,
                };

                let mut children = Vec::with_capacity(fields.len());

                for (_id, field) in fields.iter() {
                    let child = self.create_array(field, variadic_counts)?;
                    children.push(child);
                }

                let array = if self.skip_validation.get() {
                    // safety: flag can only be set via unsafe code
                    unsafe {
                        UnionArray::new_unchecked(fields.clone(), type_ids, value_offsets, children)
                    }
                } else {
                    UnionArray::try_new(fields.clone(), type_ids, value_offsets, children)?
                };
                Ok(Arc::new(array))
            }
            Null => {
                let node = self.next_node(field)?;
                let length = node.length();
                let null_count = node.null_count();

                if length != null_count {
                    return Err(ArrowError::SchemaError(format!(
                        "Field {field} of NullArray has unequal null_count {null_count} and len {length}"
                    )));
                }

                let builder = ArrayData::builder(data_type.clone())
                    .len(length as usize)
                    .offset(0);
                self.create_array_from_builder(builder)
            }
            _ => {
                let field_node = self.next_node(field)?;
                let buffers = [self.next_buffer()?, self.next_buffer()?];
                self.create_primitive_array(field_node, data_type, &buffers)
            }
        }
    }

    /// Reads the correct number of buffers based on data type and null_count, and creates a
    /// primitive array ref
    fn create_primitive_array(
        &self,
        field_node: &FieldNode,
        data_type: &DataType,
        buffers: &[Buffer],
    ) -> Result<ArrayRef, ArrowError> {
        let length = field_node.length() as usize;
        let null_buffer = (field_node.null_count() > 0).then_some(buffers[0].clone());
        let mut builder = match data_type {
            Utf8 | Binary | LargeBinary | LargeUtf8 => {
                // read 3 buffers: null buffer (optional), offsets buffer and data buffer
                ArrayData::builder(data_type.clone())
                    .len(length)
                    .buffers(buffers[1..3].to_vec())
                    .null_bit_buffer(null_buffer)
            }
            BinaryView | Utf8View => ArrayData::builder(data_type.clone())
                .len(length)
                .buffers(buffers[1..].to_vec())
                .null_bit_buffer(null_buffer),
            _ if data_type.is_primitive() || matches!(data_type, Boolean | FixedSizeBinary(_)) => {
                // read 2 buffers: null buffer (optional) and data buffer
                ArrayData::builder(data_type.clone())
                    .len(length)
                    .add_buffer(buffers[1].clone())
                    .null_bit_buffer(null_buffer)
            }
            t => unreachable!("Data type {:?} either unsupported or not primitive", t),
        };

        builder = builder.null_count(field_node.null_count() as usize);

        self.create_array_from_builder(builder)
    }

    /// Update the ArrayDataBuilder based on settings in this decoder
    fn create_array_from_builder(&self, builder: ArrayDataBuilder) -> Result<ArrayRef, ArrowError> {
        let mut builder = builder.align_buffers(!self.require_alignment);
        if self.skip_validation.get() {
            // SAFETY: flag can only be set via unsafe code
            unsafe { builder = builder.skip_validation(true) }
        };
        Ok(make_array(builder.build()?))
    }

    /// Reads the correct number of buffers based on list type and null_count, and creates a
    /// list array ref
    fn create_list_array(
        &self,
        field_node: &FieldNode,
        data_type: &DataType,
        buffers: &[Buffer],
        child_array: ArrayRef,
    ) -> Result<ArrayRef, ArrowError> {
        let null_buffer = (field_node.null_count() > 0).then_some(buffers[0].clone());
        let length = field_node.length() as usize;
        let child_data = child_array.into_data();
        let mut builder = match data_type {
            List(_) | LargeList(_) | Map(_, _) => ArrayData::builder(data_type.clone())
                .len(length)
                .add_buffer(buffers[1].clone())
                .add_child_data(child_data)
                .null_bit_buffer(null_buffer),

            FixedSizeList(_, _) => ArrayData::builder(data_type.clone())
                .len(length)
                .add_child_data(child_data)
                .null_bit_buffer(null_buffer),

            _ => unreachable!("Cannot create list or map array from {:?}", data_type),
        };

        builder = builder.null_count(field_node.null_count() as usize);

        self.create_array_from_builder(builder)
    }

    fn create_list_view_array(
        &self,
        field_node: &FieldNode,
        data_type: &DataType,
        buffers: &[Buffer],
        child_array: ArrayRef,
    ) -> Result<ArrayRef, ArrowError> {
        assert!(matches!(data_type, ListView(_) | LargeListView(_)));

        let null_buffer = (field_node.null_count() > 0).then_some(buffers[0].clone());
        let length = field_node.length() as usize;
        let child_data = child_array.into_data();

        self.create_array_from_builder(
            ArrayData::builder(data_type.clone())
                .len(length)
                .add_buffer(buffers[1].clone()) // offsets
                .add_buffer(buffers[2].clone()) // sizes
                .add_child_data(child_data)
                .null_bit_buffer(null_buffer)
                .null_count(field_node.null_count() as usize),
        )
    }

    fn create_struct_array(
        &self,
        struct_node: &FieldNode,
        null_buffer: Buffer,
        struct_fields: &Fields,
        struct_arrays: Vec<ArrayRef>,
    ) -> Result<ArrayRef, ArrowError> {
        let null_count = struct_node.null_count() as usize;
        let len = struct_node.length() as usize;
        let skip_validation = self.skip_validation.get();

        let nulls = if null_count > 0 {
            let validity_buffer = BooleanBuffer::new(null_buffer, 0, len);
            let null_buffer = if skip_validation {
                // safety: flag can only be set via unsafe code
                unsafe { NullBuffer::new_unchecked(validity_buffer, null_count) }
            } else {
                let null_buffer = NullBuffer::new(validity_buffer);

                if null_buffer.null_count() != null_count {
                    return Err(ArrowError::InvalidArgumentError(format!(
                        "null_count value ({}) doesn't match actual number of nulls in array ({})",
                        null_count,
                        null_buffer.null_count()
                    )));
                }

                null_buffer
            };

            Some(null_buffer)
        } else {
            None
        };
        if struct_arrays.is_empty() {
            // `StructArray::from` can't infer the correct row count
            // if we have zero fields
            return Ok(Arc::new(StructArray::new_empty_fields(len, nulls)));
        }

        let struct_array = if skip_validation {
            // safety: flag can only be set via unsafe code
            unsafe { StructArray::new_unchecked(struct_fields.clone(), struct_arrays, nulls) }
        } else {
            StructArray::try_new(struct_fields.clone(), struct_arrays, nulls)?
        };

        Ok(Arc::new(struct_array))
    }

    /// Reads the correct number of buffers based on the dictionary type and null_count, and
    /// creates a dictionary array ref
    fn create_dictionary_array(
        &self,
        field_node: &FieldNode,
        data_type: &DataType,
        buffers: &[Buffer],
        value_array: ArrayRef,
    ) -> Result<ArrayRef, ArrowError> {
        if let Dictionary(_, _) = *data_type {
            let null_buffer = (field_node.null_count() > 0).then_some(buffers[0].clone());
            let builder = ArrayData::builder(data_type.clone())
                .len(field_node.length() as usize)
                .add_buffer(buffers[1].clone())
                .add_child_data(value_array.into_data())
                .null_bit_buffer(null_buffer)
                .null_count(field_node.null_count() as usize);
            self.create_array_from_builder(builder)
        } else {
            unreachable!("Cannot create dictionary array from {:?}", data_type)
        }
    }
}

/// State for decoding Arrow arrays from an [IPC RecordBatch] structure to
/// [`RecordBatch`]
///
/// [IPC RecordBatch]: crate::RecordBatch
///
pub struct RecordBatchDecoder<'a> {
    /// The flatbuffers encoded record batch
    batch: crate::RecordBatch<'a>,
    /// The output schema
    schema: SchemaRef,
    /// Decoded dictionaries indexed by dictionary id
    dictionaries_by_id: &'a HashMap<i64, ArrayRef>,
    /// Optional compression codec
    compression: Option<CompressionCodec>,
    /// The format version
    version: MetadataVersion,
    /// The raw data buffer
    data: &'a Buffer,
    /// The fields comprising this array
    nodes: VectorIter<'a, FieldNode>,
    /// The buffers comprising this array
    buffers: VectorIter<'a, crate::Buffer>,
    /// Projection (subset of columns) to read, if any
    /// See [`RecordBatchDecoder::with_projection`] for details
    projection: Option<&'a [usize]>,
    /// Are buffers required to already be aligned? See
    /// [`RecordBatchDecoder::with_require_alignment`] for details
    require_alignment: bool,
    /// Should validation be skipped when reading data? Defaults to false.
    ///
    /// See [`FileDecoder::with_skip_validation`] for details.
    skip_validation: UnsafeFlag,
}

impl<'a> RecordBatchDecoder<'a> {
    /// Create a reader for decoding arrays from an encoded [`RecordBatch`]
    fn try_new(
        buf: &'a Buffer,
        batch: crate::RecordBatch<'a>,
        schema: SchemaRef,
        dictionaries_by_id: &'a HashMap<i64, ArrayRef>,
        metadata: &'a MetadataVersion,
    ) -> Result<Self, ArrowError> {
        let buffers = batch.buffers().ok_or_else(|| {
            ArrowError::IpcError("Unable to get buffers from IPC RecordBatch".to_string())
        })?;
        let field_nodes = batch.nodes().ok_or_else(|| {
            ArrowError::IpcError("Unable to get field nodes from IPC RecordBatch".to_string())
        })?;

        let batch_compression = batch.compression();
        let compression = batch_compression
            .map(|batch_compression| batch_compression.codec().try_into())
            .transpose()?;

        Ok(Self {
            batch,
            schema,
            dictionaries_by_id,
            compression,
            version: *metadata,
            data: buf,
            nodes: field_nodes.iter(),
            buffers: buffers.iter(),
            projection: None,
            require_alignment: false,
            skip_validation: UnsafeFlag::new(),
        })
    }

    /// Set the projection (default: None)
    ///
    /// If set, the projection is the list of column indices
    /// that will be read
    pub fn with_projection(mut self, projection: Option<&'a [usize]>) -> Self {
        self.projection = projection;
        self
    }

    /// Set require_alignment (default: false)
    ///
    /// If true, buffers must be aligned appropriately or an error will
    /// result. If false, buffers will be copied to aligned buffers
    /// if necessary.
    pub fn with_require_alignment(mut self, require_alignment: bool) -> Self {
        self.require_alignment = require_alignment;
        self
    }

    /// Specifies if validation should be skipped when reading data (defaults to `false`)
    ///
    /// Note this API is somewhat "funky" as it allows the caller to skip validation
    /// without having to use `unsafe` code. If this is ever made public
    /// it should be made clearer that this is potentially unsafe, by
    /// using an `unsafe` function that takes a boolean flag.
    ///
    /// # Safety
    ///
    /// Relies on the caller only passing a flag with `true` value if they are
    /// certain that the data is valid
    pub(crate) fn with_skip_validation(mut self, skip_validation: UnsafeFlag) -> Self {
        self.skip_validation = skip_validation;
        self
    }

    /// Read the record batch, consuming the reader
    fn read_record_batch(mut self) -> Result<RecordBatch, ArrowError> {
        let mut variadic_counts: VecDeque<i64> = self
            .batch
            .variadicBufferCounts()
            .into_iter()
            .flatten()
            .collect();

        let options = RecordBatchOptions::new().with_row_count(Some(self.batch.length() as usize));

        let schema = Arc::clone(&self.schema);
        if let Some(projection) = self.projection {
            let mut arrays = vec![];
            // project fields
            for (idx, field) in schema.fields().iter().enumerate() {
                // Create array for projected field
                if let Some(proj_idx) = projection.iter().position(|p| p == &idx) {
                    let child = self.create_array(field, &mut variadic_counts)?;
                    arrays.push((proj_idx, child));
                } else {
                    self.skip_field(field, &mut variadic_counts)?;
                }
            }

            arrays.sort_by_key(|t| t.0);

            let schema = Arc::new(schema.project(projection)?);
            let columns = arrays.into_iter().map(|t| t.1).collect::<Vec<_>>();

            if self.skip_validation.get() {
                // Safety: setting `skip_validation` requires `unsafe`, user assures data is valid
                unsafe {
                    Ok(RecordBatch::new_unchecked(
                        schema,
                        columns,
                        self.batch.length() as usize,
                    ))
                }
            } else {
                assert!(variadic_counts.is_empty());
                RecordBatch::try_new_with_options(schema, columns, &options)
            }
        } else {
            let mut children = vec![];
            // keep track of index as lists require more than one node
            for field in schema.fields() {
                let child = self.create_array(field, &mut variadic_counts)?;
                children.push(child);
            }

            if self.skip_validation.get() {
                // Safety: setting `skip_validation` requires `unsafe`, user assures data is valid
                unsafe {
                    Ok(RecordBatch::new_unchecked(
                        schema,
                        children,
                        self.batch.length() as usize,
                    ))
                }
            } else {
                assert!(variadic_counts.is_empty());
                RecordBatch::try_new_with_options(schema, children, &options)
            }
        }
    }

    fn next_buffer(&mut self) -> Result<Buffer, ArrowError> {
        let buffer = self.buffers.next().ok_or_else(|| {
            ArrowError::IpcError("Buffer count mismatched with metadata".to_string())
        })?;
        read_buffer(buffer, self.data, self.compression)
    }

    fn skip_buffer(&mut self) {
        self.buffers.next().unwrap();
    }

    fn next_node(&mut self, field: &Field) -> Result<&'a FieldNode, ArrowError> {
        self.nodes.next().ok_or_else(|| {
            ArrowError::SchemaError(format!(
                "Invalid data for schema. {field} refers to node not found in schema",
            ))
        })
    }

    fn skip_field(
        &mut self,
        field: &Field,
        variadic_count: &mut VecDeque<i64>,
    ) -> Result<(), ArrowError> {
        self.next_node(field)?;

        match field.data_type() {
            Utf8 | Binary | LargeBinary | LargeUtf8 => {
                for _ in 0..3 {
                    self.skip_buffer()
                }
            }
            Utf8View | BinaryView => {
                let count = variadic_count
                    .pop_front()
                    .ok_or(ArrowError::IpcError(format!(
                        "Missing variadic count for {} column",
                        field.data_type()
                    )))?;
                let count = count + 2; // view and null buffer.
                for _i in 0..count {
                    self.skip_buffer()
                }
            }
            FixedSizeBinary(_) => {
                self.skip_buffer();
                self.skip_buffer();
            }
            List(list_field) | LargeList(list_field) | Map(list_field, _) => {
                self.skip_buffer();
                self.skip_buffer();
                self.skip_field(list_field, variadic_count)?;
            }
            FixedSizeList(list_field, _) => {
                self.skip_buffer();
                self.skip_field(list_field, variadic_count)?;
            }
            Struct(struct_fields) => {
                self.skip_buffer();

                // skip for each field
                for struct_field in struct_fields {
                    self.skip_field(struct_field, variadic_count)?
                }
            }
            RunEndEncoded(run_ends_field, values_field) => {
                self.skip_field(run_ends_field, variadic_count)?;
                self.skip_field(values_field, variadic_count)?;
            }
            Dictionary(_, _) => {
                self.skip_buffer(); // Nulls
                self.skip_buffer(); // Indices
            }
            Union(fields, mode) => {
                self.skip_buffer(); // Nulls

                match mode {
                    UnionMode::Dense => self.skip_buffer(),
                    UnionMode::Sparse => {}
                };

                for (_, field) in fields.iter() {
                    self.skip_field(field, variadic_count)?
                }
            }
            Null => {} // No buffer increases
            _ => {
                self.skip_buffer();
                self.skip_buffer();
            }
        };
        Ok(())
    }
}

/// Creates a record batch from binary data using the `crate::RecordBatch` indexes and the `Schema`.
///
/// If `require_alignment` is true, this function will return an error if any array data in the
/// input `buf` is not properly aligned.
/// Under the hood it will use [`arrow_data::ArrayDataBuilder::build`] to construct [`arrow_data::ArrayData`].
///
/// If `require_alignment` is false, this function will automatically allocate a new aligned buffer
/// and copy over the data if any array data in the input `buf` is not properly aligned.
/// (Properly aligned array data will remain zero-copy.)
/// Under the hood it will use [`arrow_data::ArrayDataBuilder::build_aligned`] to construct [`arrow_data::ArrayData`].
pub fn read_record_batch(
    buf: &Buffer,
    batch: crate::RecordBatch,
    schema: SchemaRef,
    dictionaries_by_id: &HashMap<i64, ArrayRef>,
    projection: Option<&[usize]>,
    metadata: &MetadataVersion,
) -> Result<RecordBatch, ArrowError> {
    RecordBatchDecoder::try_new(buf, batch, schema, dictionaries_by_id, metadata)?
        .with_projection(projection)
        .with_require_alignment(false)
        .read_record_batch()
}

/// Read the dictionary from the buffer and provided metadata,
/// updating the `dictionaries_by_id` with the resulting dictionary
pub fn read_dictionary(
    buf: &Buffer,
    batch: crate::DictionaryBatch,
    schema: &Schema,
    dictionaries_by_id: &mut HashMap<i64, ArrayRef>,
    metadata: &MetadataVersion,
) -> Result<(), ArrowError> {
    read_dictionary_impl(
        buf,
        batch,
        schema,
        dictionaries_by_id,
        metadata,
        false,
        UnsafeFlag::new(),
    )
}

fn read_dictionary_impl(
    buf: &Buffer,
    batch: crate::DictionaryBatch,
    schema: &Schema,
    dictionaries_by_id: &mut HashMap<i64, ArrayRef>,
    metadata: &MetadataVersion,
    require_alignment: bool,
    skip_validation: UnsafeFlag,
) -> Result<(), ArrowError> {
    let id = batch.id();

    let dictionary_values = get_dictionary_values(
        buf,
        batch,
        schema,
        dictionaries_by_id,
        metadata,
        require_alignment,
        skip_validation,
    )?;

    update_dictionaries(dictionaries_by_id, batch.isDelta(), id, dictionary_values)?;

    Ok(())
}

/// Updates the `dictionaries_by_id` with the provided dictionary values and id.
///
/// # Errors
/// - If `is_delta` is true and there is no existing dictionary for the given
///   `dict_id`
/// - If `is_delta` is true and the concatenation of the existing and new
///   dictionary fails. This usually signals a type mismatch between the old and
///   new values.
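///
/// For example, if the existing dictionary for `dict_id` holds `["a", "b"]` and a
/// delta batch carries `["c"]`, the stored dictionary becomes `["a", "b", "c"]`;
/// a non-delta batch carrying `["c"]` instead replaces the entry with just `["c"]`.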
fn update_dictionaries(
    dictionaries_by_id: &mut HashMap<i64, ArrayRef>,
    is_delta: bool,
    dict_id: i64,
    dict_values: ArrayRef,
) -> Result<(), ArrowError> {
    if !is_delta {
        // We don't currently record the isOrdered field; it could be stored as
        // a general attribute of arrays.
        // Add (possibly multiple) array refs to the dictionaries array.
        dictionaries_by_id.insert(dict_id, dict_values.clone());
        return Ok(());
    }

    let existing = dictionaries_by_id.get(&dict_id).ok_or_else(|| {
        ArrowError::InvalidArgumentError(format!(
            "No existing dictionary for delta dictionary with id '{dict_id}'"
        ))
    })?;

    let combined = concat::concat(&[existing, &dict_values]).map_err(|e| {
        ArrowError::InvalidArgumentError(format!("Failed to concat delta dictionary: {e}"))
    })?;

    dictionaries_by_id.insert(dict_id, combined);

    Ok(())
}

/// Given a dictionary batch IPC message/body along with the full state of a
/// stream including schema, dictionary cache, metadata, and other flags, this
/// function will parse the buffer into an array of dictionary values.
fn get_dictionary_values(
    buf: &Buffer,
    batch: crate::DictionaryBatch,
    schema: &Schema,
    dictionaries_by_id: &mut HashMap<i64, ArrayRef>,
    metadata: &MetadataVersion,
    require_alignment: bool,
    skip_validation: UnsafeFlag,
) -> Result<ArrayRef, ArrowError> {
    let id = batch.id();
    #[allow(deprecated)]
    let fields_using_this_dictionary = schema.fields_with_dict_id(id);
    let first_field = fields_using_this_dictionary.first().ok_or_else(|| {
        ArrowError::InvalidArgumentError(format!("dictionary id {id} not found in schema"))
    })?;

    // As the dictionary batch does not contain the type of the
    // values array, we need to retrieve this from the schema.
    // Get an array representing this dictionary's values.
    let dictionary_values: ArrayRef = match first_field.data_type() {
        DataType::Dictionary(_, value_type) => {
            // Make a fake schema for the dictionary batch.
            let value = value_type.as_ref().clone();
            let schema = Schema::new(vec![Field::new("", value, true)]);
            // Read a single column
            let record_batch = RecordBatchDecoder::try_new(
                buf,
                batch.data().unwrap(),
                Arc::new(schema),
                dictionaries_by_id,
                metadata,
            )?
            .with_require_alignment(require_alignment)
            .with_skip_validation(skip_validation)
            .read_record_batch()?;

            Some(record_batch.column(0).clone())
        }
        _ => None,
    }
    .ok_or_else(|| {
        ArrowError::InvalidArgumentError(format!("dictionary id {id} not found in schema"))
    })?;

    Ok(dictionary_values)
}

/// Read the data for a given block
fn read_block<R: Read + Seek>(mut reader: R, block: &Block) -> Result<Buffer, ArrowError> {
    reader.seek(SeekFrom::Start(block.offset() as u64))?;
    let body_len = block.bodyLength().to_usize().unwrap();
    let metadata_len = block.metaDataLength().to_usize().unwrap();
    let total_len = body_len.checked_add(metadata_len).unwrap();

    let mut buf = MutableBuffer::from_len_zeroed(total_len);
    reader.read_exact(&mut buf)?;
    Ok(buf.into())
}

/// Parse an encapsulated message
///
/// <https://arrow.apache.org/docs/format/Columnar.html#encapsulated-message-format>
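///
/// Per the Arrow specification linked above, an encapsulated message is laid
/// out as:
///
/// ```text
/// <continuation marker: 0xFFFFFFFF (absent in legacy, pre-0.15 streams)>
/// <metadata size: int32, little-endian>
/// <metadata: flatbuffer Message, padded to an 8-byte boundary>
/// <message body>
/// ```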
fn parse_message(buf: &[u8]) -> Result<Message::Message<'_>, ArrowError> {
    let buf = match buf[..4] == CONTINUATION_MARKER {
        true => &buf[8..],
        false => &buf[4..],
    };
    crate::root_as_message(buf)
        .map_err(|err| ArrowError::ParseError(format!("Unable to get root as message: {err:?}")))
}

/// Read the footer length from the last 10 bytes of an Arrow IPC file
///
/// Expects a 4 byte footer length followed by `b"ARROW1"`
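///
/// # Example
///
/// A minimal sketch with a hand-built trailer:
///
/// ```
/// # use arrow_ipc::reader::read_footer_length;
/// // 10-byte trailer: footer length (4 bytes, little-endian) + b"ARROW1"
/// let mut trailer = [0u8; 10];
/// trailer[..4].copy_from_slice(&42i32.to_le_bytes());
/// trailer[4..].copy_from_slice(b"ARROW1");
/// assert_eq!(read_footer_length(trailer).unwrap(), 42);
/// ```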
pub fn read_footer_length(buf: [u8; 10]) -> Result<usize, ArrowError> {
    if buf[4..] != super::ARROW_MAGIC {
        return Err(ArrowError::ParseError(
            "Arrow file does not contain correct footer".to_string(),
        ));
    }

    // read footer length
    let footer_len = i32::from_le_bytes(buf[..4].try_into().unwrap());
    footer_len
        .try_into()
        .map_err(|_| ArrowError::ParseError(format!("Invalid footer length: {footer_len}")))
}

/// A low-level, push-based interface for reading an IPC file
///
/// For a higher-level interface see [`FileReader`]
///
/// For an example of using this API with `mmap` see the [`zero_copy_ipc`] example.
///
/// [`zero_copy_ipc`]: https://github.com/apache/arrow-rs/blob/main/arrow/examples/zero_copy_ipc.rs
///
/// ```
/// # use std::sync::Arc;
/// # use arrow_array::*;
/// # use arrow_array::types::Int32Type;
/// # use arrow_buffer::Buffer;
/// # use arrow_ipc::convert::fb_to_schema;
/// # use arrow_ipc::reader::{FileDecoder, read_footer_length};
/// # use arrow_ipc::root_as_footer;
/// # use arrow_ipc::writer::FileWriter;
/// // Write an IPC file
///
/// let batch = RecordBatch::try_from_iter([
///     ("a", Arc::new(Int32Array::from(vec![1, 2, 3])) as _),
///     ("b", Arc::new(Int32Array::from(vec![1, 2, 3])) as _),
///     ("c", Arc::new(DictionaryArray::<Int32Type>::from_iter(["hello", "hello", "world"])) as _),
/// ]).unwrap();
///
/// let schema = batch.schema();
///
/// let mut out = Vec::with_capacity(1024);
/// let mut writer = FileWriter::try_new(&mut out, schema.as_ref()).unwrap();
/// writer.write(&batch).unwrap();
/// writer.finish().unwrap();
///
/// drop(writer);
///
/// // Read IPC file
///
/// let buffer = Buffer::from_vec(out);
/// let trailer_start = buffer.len() - 10;
/// let footer_len = read_footer_length(buffer[trailer_start..].try_into().unwrap()).unwrap();
/// let footer = root_as_footer(&buffer[trailer_start - footer_len..trailer_start]).unwrap();
///
/// let back = fb_to_schema(footer.schema().unwrap());
/// assert_eq!(&back, schema.as_ref());
///
/// let mut decoder = FileDecoder::new(schema, footer.version());
///
/// // Read dictionaries
/// for block in footer.dictionaries().iter().flatten() {
///     let block_len = block.bodyLength() as usize + block.metaDataLength() as usize;
///     let data = buffer.slice_with_length(block.offset() as _, block_len);
///     decoder.read_dictionary(&block, &data).unwrap();
/// }
///
/// // Read record batch
/// let batches = footer.recordBatches().unwrap();
/// assert_eq!(batches.len(), 1); // Only wrote a single batch
///
/// let block = batches.get(0);
/// let block_len = block.bodyLength() as usize + block.metaDataLength() as usize;
/// let data = buffer.slice_with_length(block.offset() as _, block_len);
/// let back = decoder.read_record_batch(block, &data).unwrap().unwrap();
///
/// assert_eq!(batch, back);
/// ```
#[derive(Debug)]
pub struct FileDecoder {
    schema: SchemaRef,
    dictionaries: HashMap<i64, ArrayRef>,
    version: MetadataVersion,
    projection: Option<Vec<usize>>,
    require_alignment: bool,
    skip_validation: UnsafeFlag,
}

impl FileDecoder {
    /// Create a new [`FileDecoder`] with the given schema and version
    pub fn new(schema: SchemaRef, version: MetadataVersion) -> Self {
        Self {
            schema,
            version,
            dictionaries: Default::default(),
            projection: None,
            require_alignment: false,
            skip_validation: UnsafeFlag::new(),
        }
    }

    /// Specify a projection
    pub fn with_projection(mut self, projection: Vec<usize>) -> Self {
        self.projection = Some(projection);
        self
    }

    /// Specifies if the array data in input buffers is required to be properly aligned.
    ///
    /// If `require_alignment` is true, this decoder will return an error if any array data in the
    /// input `buf` is not properly aligned.
    /// Under the hood it will use [`arrow_data::ArrayDataBuilder::build`] to construct
    /// [`arrow_data::ArrayData`].
    ///
    /// If `require_alignment` is false (the default), this decoder will automatically allocate a
    /// new aligned buffer and copy over the data if any array data in the input `buf` is not
    /// properly aligned. (Properly aligned array data will remain zero-copy.)
    /// Under the hood it will use [`arrow_data::ArrayDataBuilder::build_aligned`] to construct
    /// [`arrow_data::ArrayData`].
    pub fn with_require_alignment(mut self, require_alignment: bool) -> Self {
        self.require_alignment = require_alignment;
        self
    }

    /// Specifies if validation should be skipped when reading data (defaults to `false`)
    ///
    /// # Safety
    ///
    /// This flag must only be set to `true` when you trust the input data and are sure the data you are
    /// reading is a valid Arrow IPC file, otherwise undefined behavior may
    /// result.
    ///
    /// For example, some programs may wish to trust reading IPC files written
    /// by the same process that created the files.
    pub unsafe fn with_skip_validation(mut self, skip_validation: bool) -> Self {
        unsafe { self.skip_validation.set(skip_validation) };
        self
    }

    fn read_message<'a>(&self, buf: &'a [u8]) -> Result<Message::Message<'a>, ArrowError> {
        let message = parse_message(buf)?;

        // some old test data's footer metadata is not set, so we account for that
        if self.version != MetadataVersion::V1 && message.version() != self.version {
            return Err(ArrowError::IpcError(
                "Could not read IPC message as metadata versions mismatch".to_string(),
            ));
        }
        Ok(message)
    }

    /// Read the dictionary with the given block and data buffer
    pub fn read_dictionary(&mut self, block: &Block, buf: &Buffer) -> Result<(), ArrowError> {
        let message = self.read_message(buf)?;
        match message.header_type() {
            crate::MessageHeader::DictionaryBatch => {
                let batch = message.header_as_dictionary_batch().unwrap();
                read_dictionary_impl(
                    &buf.slice(block.metaDataLength() as _),
                    batch,
                    &self.schema,
                    &mut self.dictionaries,
                    &message.version(),
                    self.require_alignment,
                    self.skip_validation.clone(),
                )
            }
            t => Err(ArrowError::ParseError(format!(
                "Expecting DictionaryBatch in dictionary blocks, found {t:?}."
            ))),
        }
    }

    /// Read the RecordBatch with the given block and data buffer
    pub fn read_record_batch(
        &self,
        block: &Block,
        buf: &Buffer,
    ) -> Result<Option<RecordBatch>, ArrowError> {
        let message = self.read_message(buf)?;
        match message.header_type() {
            crate::MessageHeader::Schema => Err(ArrowError::IpcError(
                "Not expecting a schema when messages are read".to_string(),
            )),
            crate::MessageHeader::RecordBatch => {
                let batch = message.header_as_record_batch().ok_or_else(|| {
                    ArrowError::IpcError("Unable to read IPC message as record batch".to_string())
                })?;
                // read the block that makes up the record batch into a buffer
                RecordBatchDecoder::try_new(
                    &buf.slice(block.metaDataLength() as _),
                    batch,
                    self.schema.clone(),
                    &self.dictionaries,
                    &message.version(),
                )?
                .with_projection(self.projection.as_deref())
                .with_require_alignment(self.require_alignment)
                .with_skip_validation(self.skip_validation.clone())
                .read_record_batch()
                .map(Some)
            }
            crate::MessageHeader::NONE => Ok(None),
            t => Err(ArrowError::InvalidArgumentError(format!(
                "Reading types other than record batches not yet supported, unable to read {t:?}"
            ))),
        }
    }
}

/// Build an Arrow [`FileReader`] with custom options.
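///
/// # Example
///
/// A minimal sketch reading back an in-memory IPC file with a projection:
///
/// ```
/// # use std::io::Cursor;
/// # use arrow_array::record_batch;
/// # use arrow_ipc::reader::FileReaderBuilder;
/// # use arrow_ipc::writer::FileWriter;
/// # let batch = record_batch!(("a", Int32, [1, 2, 3])).unwrap();
/// # let mut file = vec![];
/// # {
/// #     let mut writer = FileWriter::try_new(&mut file, &batch.schema()).unwrap();
/// #     writer.write(&batch).unwrap();
/// #     writer.finish().unwrap();
/// # }
/// let reader = FileReaderBuilder::new()
///     .with_projection(vec![0])
///     .build(Cursor::new(file))
///     .unwrap();
/// assert_eq!(reader.schema().fields().len(), 1);
/// ```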
#[derive(Debug)]
pub struct FileReaderBuilder {
    /// Optional projection for which columns to load (zero-based column indices)
    projection: Option<Vec<usize>>,
    /// Passed through to construct [`VerifierOptions`]
    max_footer_fb_tables: usize,
    /// Passed through to construct [`VerifierOptions`]
    max_footer_fb_depth: usize,
}

impl Default for FileReaderBuilder {
    fn default() -> Self {
        let verifier_options = VerifierOptions::default();
        Self {
            max_footer_fb_tables: verifier_options.max_tables,
            max_footer_fb_depth: verifier_options.max_depth,
            projection: None,
        }
    }
}

impl FileReaderBuilder {
    /// Options for creating a new [`FileReader`].
    ///
    /// To convert a builder into a reader, call [`FileReaderBuilder::build`].
    pub fn new() -> Self {
        Self::default()
    }

    /// Optional projection for which columns to load (zero-based column indices).
    pub fn with_projection(mut self, projection: Vec<usize>) -> Self {
        self.projection = Some(projection);
        self
    }

    /// Flatbuffers option for parsing the footer. Controls the max number of fields and
    /// metadata key-value pairs that can be parsed from the schema of the footer.
    ///
    /// By default this is set to `1_000_000` which roughly translates to a schema with
    /// no metadata key-value pairs but 499,999 fields.
    ///
    /// This default limit is enforced to protect against malicious files with a massive
    /// amount of flatbuffer tables which could cause a denial of service attack.
    ///
    /// If you need to ingest a trusted file with a massive number of fields and/or
    /// metadata key-value pairs and are facing the error `"Unable to get root as
    /// footer: TooManyTables"` then increase this parameter as necessary.
    pub fn with_max_footer_fb_tables(mut self, max_footer_fb_tables: usize) -> Self {
        self.max_footer_fb_tables = max_footer_fb_tables;
        self
    }

    /// Flatbuffers option for parsing the footer. Controls the max depth for schemas with
    /// nested fields parsed from the footer.
    ///
    /// By default this is set to `64` which roughly translates to a schema with
    /// a field nested 60 levels down through other struct fields.
    ///
    /// This default limit is enforced to protect against malicious files with an extremely
    /// deep flatbuffer structure which could cause a denial of service attack.
    ///
    /// If you need to ingest a trusted file with a deeply nested field and are facing the
    /// error `"Unable to get root as footer: DepthLimitReached"` then increase this
    /// parameter as necessary.
    pub fn with_max_footer_fb_depth(mut self, max_footer_fb_depth: usize) -> Self {
        self.max_footer_fb_depth = max_footer_fb_depth;
        self
    }

    /// Build [`FileReader`] with given reader.
    pub fn build<R: Read + Seek>(self, mut reader: R) -> Result<FileReader<R>, ArrowError> {
        // Space for ARROW_MAGIC (6 bytes) and length (4 bytes)
        let mut buffer = [0; 10];
        reader.seek(SeekFrom::End(-10))?;
        reader.read_exact(&mut buffer)?;

        let footer_len = read_footer_length(buffer)?;

        // read footer
        let mut footer_data = vec![0; footer_len];
        reader.seek(SeekFrom::End(-10 - footer_len as i64))?;
        reader.read_exact(&mut footer_data)?;

        let verifier_options = VerifierOptions {
            max_tables: self.max_footer_fb_tables,
            max_depth: self.max_footer_fb_depth,
            ..Default::default()
        };
        let footer = crate::root_as_footer_with_opts(&verifier_options, &footer_data[..]).map_err(
            |err| ArrowError::ParseError(format!("Unable to get root as footer: {err:?}")),
        )?;

        let blocks = footer.recordBatches().ok_or_else(|| {
            ArrowError::ParseError("Unable to get record batches from IPC Footer".to_string())
        })?;

        let total_blocks = blocks.len();

        let ipc_schema = footer.schema().unwrap();
        if !ipc_schema.endianness().equals_to_target_endianness() {
            return Err(ArrowError::IpcError(
                "the endianness of the source system does not match the endianness of the target system.".to_owned()
            ));
        }

        let schema = crate::convert::fb_to_schema(ipc_schema);

        let mut custom_metadata = HashMap::new();
        if let Some(fb_custom_metadata) = footer.custom_metadata() {
            for kv in fb_custom_metadata.into_iter() {
                custom_metadata.insert(
                    kv.key().unwrap().to_string(),
                    kv.value().unwrap().to_string(),
                );
            }
        }

        let mut decoder = FileDecoder::new(Arc::new(schema), footer.version());
        if let Some(projection) = self.projection {
            decoder = decoder.with_projection(projection)
        }

        // Create an array of optional dictionary value arrays, one per field.
        if let Some(dictionaries) = footer.dictionaries() {
            for block in dictionaries {
                let buf = read_block(&mut reader, block)?;
                decoder.read_dictionary(block, &buf)?;
            }
        }

        Ok(FileReader {
            reader,
            blocks: blocks.iter().copied().collect(),
            current_block: 0,
            total_blocks,
            decoder,
            custom_metadata,
        })
    }
}

/// Arrow File Reader
///
/// Reads Arrow [`RecordBatch`]es from bytes in the [IPC File Format],
/// providing random access to the record batches.
///
/// # See Also
///
/// * [`Self::set_index`] for random access
/// * [`StreamReader`] for reading streaming data
///
/// # Example: Reading from a `File`
/// ```
/// # use std::io::Cursor;
/// use arrow_array::record_batch;
/// # use arrow_ipc::reader::FileReader;
/// # use arrow_ipc::writer::FileWriter;
/// # let batch = record_batch!(("a", Int32, [1, 2, 3])).unwrap();
/// # let mut file = vec![]; // mimic a stream for the example
/// # {
/// #  let mut writer = FileWriter::try_new(&mut file, &batch.schema()).unwrap();
/// #  writer.write(&batch).unwrap();
/// #  writer.write(&batch).unwrap();
/// #  writer.finish().unwrap();
/// # }
/// # let mut file = Cursor::new(&file);
/// let projection = None; // read all columns
/// let mut reader = FileReader::try_new(&mut file, projection).unwrap();
/// // Position the reader to the second batch
/// reader.set_index(1).unwrap();
/// // read batches from the reader using the Iterator trait
/// let mut num_rows = 0;
/// for batch in reader {
///    let batch = batch.unwrap();
///    num_rows += batch.num_rows();
/// }
/// assert_eq!(num_rows, 3);
/// ```
/// # Example: Reading from an `mmap`ed file
///
/// For an example of creating Arrays without copying using memory mapped (`mmap`)
/// files see the [`zero_copy_ipc`] example.
///
/// [IPC File Format]: https://arrow.apache.org/docs/format/Columnar.html#ipc-file-format
/// [`zero_copy_ipc`]: https://github.com/apache/arrow-rs/blob/main/arrow/examples/zero_copy_ipc.rs
pub struct FileReader<R> {
    /// File reader that supports reading and seeking
    reader: R,

    /// The decoder
    decoder: FileDecoder,

    /// The blocks in the file
    ///
    /// A block indicates the regions in the file to read to get data
    blocks: Vec<Block>,

    /// A counter to keep track of the current block that should be read
    current_block: usize,

    /// The total number of blocks, which may contain record batches and other types
    total_blocks: usize,

    /// User defined metadata
    custom_metadata: HashMap<String, String>,
}

impl<R> fmt::Debug for FileReader<R> {
    fn fmt(&self, f: &mut fmt::Formatter<'_>) -> Result<(), fmt::Error> {
        f.debug_struct("FileReader<R>")
            .field("decoder", &self.decoder)
            .field("blocks", &self.blocks)
            .field("current_block", &self.current_block)
            .field("total_blocks", &self.total_blocks)
            .finish_non_exhaustive()
    }
}

impl<R: Read + Seek> FileReader<BufReader<R>> {
    /// Try to create a new file reader with the reader wrapped in a BufReader.
    ///
    /// See [`FileReader::try_new`] for an unbuffered version.
    pub fn try_new_buffered(reader: R, projection: Option<Vec<usize>>) -> Result<Self, ArrowError> {
        Self::try_new(BufReader::new(reader), projection)
    }
}

impl<R: Read + Seek> FileReader<R> {
    /// Try to create a new file reader.
    ///
    /// There is no internal buffering. If buffered reads are needed you likely want to use
    /// [`FileReader::try_new_buffered`] instead.
    ///
    /// # Errors
    ///
    /// An [`Err`](Result::Err) may be returned if:
    /// - the file does not meet the Arrow Format footer requirements, or
    /// - the file endianness does not match the target endianness.
    pub fn try_new(reader: R, projection: Option<Vec<usize>>) -> Result<Self, ArrowError> {
        let builder = FileReaderBuilder {
            projection,
            ..Default::default()
        };
        builder.build(reader)
    }

    /// Return user defined customized metadata
    pub fn custom_metadata(&self) -> &HashMap<String, String> {
        &self.custom_metadata
    }

    /// Return the number of batches in the file
    pub fn num_batches(&self) -> usize {
        self.total_blocks
    }

    /// Return the schema of the file
    pub fn schema(&self) -> SchemaRef {
        self.decoder.schema.clone()
    }

    /// Seek to a specific [`RecordBatch`]
    ///
    /// Sets the current block to the index, allowing random reads
    pub fn set_index(&mut self, index: usize) -> Result<(), ArrowError> {
        if index >= self.total_blocks {
            Err(ArrowError::InvalidArgumentError(format!(
                "Cannot set batch to index {} from {} total batches",
                index, self.total_blocks
            )))
        } else {
            self.current_block = index;
            Ok(())
        }
    }

1371    fn maybe_next(&mut self) -> Result<Option<RecordBatch>, ArrowError> {
1372        let block = &self.blocks[self.current_block];
1373        self.current_block += 1;
1374
        // read the entire block (message header and body) into a buffer
1376        let buffer = read_block(&mut self.reader, block)?;
1377        self.decoder.read_record_batch(block, &buffer)
1378    }
1379
1380    /// Gets a reference to the underlying reader.
1381    ///
1382    /// It is inadvisable to directly read from the underlying reader.
1383    pub fn get_ref(&self) -> &R {
1384        &self.reader
1385    }
1386
1387    /// Gets a mutable reference to the underlying reader.
1388    ///
1389    /// It is inadvisable to directly read from the underlying reader.
1390    pub fn get_mut(&mut self) -> &mut R {
1391        &mut self.reader
1392    }
1393
1394    /// Specifies if validation should be skipped when reading data (defaults to `false`)
1395    ///
1396    /// # Safety
1397    ///
1398    /// See [`FileDecoder::with_skip_validation`]
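    ///
    /// # Example
    ///
    /// A sketch of opting out of validation for trusted input; `trusted.arrow`
    /// is a placeholder for a file known to contain valid IPC data:
    ///
    /// ```no_run
    /// # use std::fs::File;
    /// # use arrow_ipc::reader::FileReader;
    /// let file = File::open("trusted.arrow").unwrap();
    /// // SAFETY: the file comes from a trusted writer and is known to be valid
    /// let reader = unsafe {
    ///     FileReader::try_new(file, None).unwrap().with_skip_validation(true)
    /// };
    /// ```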
1399    pub unsafe fn with_skip_validation(mut self, skip_validation: bool) -> Self {
1400        self.decoder = unsafe { self.decoder.with_skip_validation(skip_validation) };
1401        self
1402    }
1403}
1404
1405impl<R: Read + Seek> Iterator for FileReader<R> {
1406    type Item = Result<RecordBatch, ArrowError>;
1407
1408    fn next(&mut self) -> Option<Self::Item> {
1409        // get current block
1410        if self.current_block < self.total_blocks {
1411            self.maybe_next().transpose()
1412        } else {
1413            None
1414        }
1415    }
1416}
1417
1418impl<R: Read + Seek> RecordBatchReader for FileReader<R> {
1419    fn schema(&self) -> SchemaRef {
1420        self.schema()
1421    }
1422}
1423
1424/// Arrow Stream Reader
1425///
1426/// Reads Arrow [`RecordBatch`]es from bytes in the [IPC Streaming Format].
1427///
1428/// # See Also
1429///
1430/// * [`FileReader`] for random access.
1431///
1432/// # Example
1433/// ```
1434/// # use arrow_array::record_batch;
1435/// # use arrow_ipc::reader::StreamReader;
1436/// # use arrow_ipc::writer::StreamWriter;
1437/// # let batch = record_batch!(("a", Int32, [1, 2, 3])).unwrap();
1438/// # let mut stream = vec![]; // mimic a stream for the example
1439/// # {
1440/// #  let mut writer = StreamWriter::try_new(&mut stream, &batch.schema()).unwrap();
1441/// #  writer.write(&batch).unwrap();
1442/// #  writer.finish().unwrap();
1443/// # }
1444/// # let stream = stream.as_slice();
1445/// let projection = None; // read all columns
1446/// let mut reader = StreamReader::try_new(stream, projection).unwrap();
1447/// // read batches from the reader using the Iterator trait
1448/// let mut num_rows = 0;
1449/// for batch in reader {
1450///    let batch = batch.unwrap();
1451///    num_rows += batch.num_rows();
1452/// }
1453/// assert_eq!(num_rows, 3);
1454/// ```
1455///
1456/// [IPC Streaming Format]: https://arrow.apache.org/docs/format/Columnar.html#ipc-streaming-format
1457pub struct StreamReader<R> {
1458    /// Stream reader
1459    reader: MessageReader<R>,
1460
1461    /// The schema that is read from the stream's first message
1462    schema: SchemaRef,
1463
1464    /// Optional dictionaries for each schema field.
1465    ///
1466    /// Dictionaries may be appended to in the streaming format.
1467    dictionaries_by_id: HashMap<i64, ArrayRef>,
1468
1469    /// An indicator of whether the stream is complete.
1470    ///
1471    /// This value is set to `true` the first time the reader's `next()` returns `None`.
1472    finished: bool,
1473
1474    /// Optional projection
1475    projection: Option<(Vec<usize>, Schema)>,
1476
1477    /// Should validation be skipped when reading data? Defaults to false.
1478    ///
1479    /// See [`FileDecoder::with_skip_validation`] for details.
1480    skip_validation: UnsafeFlag,
1481}
1482
1483impl<R> fmt::Debug for StreamReader<R> {
1484    fn fmt(&self, f: &mut fmt::Formatter<'_>) -> std::result::Result<(), fmt::Error> {
1485        f.debug_struct("StreamReader<R>")
1486            .field("reader", &"R")
1487            .field("schema", &self.schema)
1488            .field("dictionaries_by_id", &self.dictionaries_by_id)
1489            .field("finished", &self.finished)
1490            .field("projection", &self.projection)
1491            .finish()
1492    }
1493}
1494
1495impl<R: Read> StreamReader<BufReader<R>> {
1496    /// Try to create a new stream reader with the reader wrapped in a BufReader.
1497    ///
1498    /// See [`StreamReader::try_new`] for an unbuffered version.
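    ///
    /// # Example
    ///
    /// A minimal sketch of reading a stream; here a `File` stands in for any
    /// `Read` source and `example.arrows` is a placeholder path:
    ///
    /// ```no_run
    /// # use std::fs::File;
    /// # use arrow_ipc::reader::StreamReader;
    /// let source = File::open("example.arrows").unwrap();
    /// // The source is wrapped in a BufReader internally
    /// let reader = StreamReader::try_new_buffered(source, None).unwrap();
    /// for batch in reader {
    ///     println!("read {} rows", batch.unwrap().num_rows());
    /// }
    /// ```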
1499    pub fn try_new_buffered(reader: R, projection: Option<Vec<usize>>) -> Result<Self, ArrowError> {
1500        Self::try_new(BufReader::new(reader), projection)
1501    }
1502}
1503
1504impl<R: Read> StreamReader<R> {
1505    /// Try to create a new stream reader.
1506    ///
    /// To check if the reader is done, use [`is_finished`](StreamReader::is_finished).
1508    ///
1509    /// There is no internal buffering. If buffered reads are needed you likely want to use
1510    /// [`StreamReader::try_new_buffered`] instead.
1511    ///
1512    /// # Errors
1513    ///
    /// An [`Err`](Result::Err) may be returned if the reader does not encounter a schema
1515    /// as the first message in the stream.
1516    pub fn try_new(
1517        reader: R,
1518        projection: Option<Vec<usize>>,
1519    ) -> Result<StreamReader<R>, ArrowError> {
1520        let mut msg_reader = MessageReader::new(reader);
1521        let message = msg_reader.maybe_next()?;
1522        let Some((message, _)) = message else {
1523            return Err(ArrowError::IpcError(
1524                "Expected schema message, found empty stream.".to_string(),
1525            ));
1526        };
1527
1528        if message.header_type() != Message::MessageHeader::Schema {
1529            return Err(ArrowError::IpcError(format!(
1530                "Expected a schema as the first message in the stream, got: {:?}",
1531                message.header_type()
1532            )));
1533        }
1534
1535        let schema = message.header_as_schema().ok_or_else(|| {
1536            ArrowError::ParseError("Failed to parse schema from message header".to_string())
1537        })?;
1538        let schema = crate::convert::fb_to_schema(schema);
1539
        // Create an empty map of dictionary id to dictionary values; it is
        // populated as dictionary batches are read from the stream.
1541        let dictionaries_by_id = HashMap::new();
1542
1543        let projection = match projection {
1544            Some(projection_indices) => {
1545                let schema = schema.project(&projection_indices)?;
1546                Some((projection_indices, schema))
1547            }
1548            _ => None,
1549        };
1550
1551        Ok(Self {
1552            reader: msg_reader,
1553            schema: Arc::new(schema),
1554            finished: false,
1555            dictionaries_by_id,
1556            projection,
1557            skip_validation: UnsafeFlag::new(),
1558        })
1559    }
1560
1561    /// Deprecated, use [`StreamReader::try_new`] instead.
1562    #[deprecated(since = "53.0.0", note = "use `try_new` instead")]
1563    pub fn try_new_unbuffered(
1564        reader: R,
1565        projection: Option<Vec<usize>>,
1566    ) -> Result<Self, ArrowError> {
1567        Self::try_new(reader, projection)
1568    }
1569
1570    /// Return the schema of the stream
1571    pub fn schema(&self) -> SchemaRef {
1572        self.schema.clone()
1573    }
1574
1575    /// Check if the stream is finished
1576    pub fn is_finished(&self) -> bool {
1577        self.finished
1578    }
1579
1580    fn maybe_next(&mut self) -> Result<Option<RecordBatch>, ArrowError> {
1581        if self.finished {
1582            return Ok(None);
1583        }
1584
1585        // Read messages until we get a record batch or end of stream
1586        loop {
1587            let message = self.next_ipc_message()?;
1588            let Some(message) = message else {
1589                // If the message is None, we have reached the end of the stream.
1590                self.finished = true;
1591                return Ok(None);
1592            };
1593
1594            match message {
1595                IpcMessage::Schema(_) => {
1596                    return Err(ArrowError::IpcError(
1597                        "Expected a record batch, but found a schema".to_string(),
1598                    ));
1599                }
1600                IpcMessage::RecordBatch(record_batch) => {
1601                    return Ok(Some(record_batch));
1602                }
1603                IpcMessage::DictionaryBatch { .. } => {
1604                    continue;
1605                }
1606            };
1607        }
1608    }
1609
1610    /// Reads and fully parses the next IPC message from the stream. Whereas
1611    /// [`Self::maybe_next`] is a higher level method focused on reading
1612    /// `RecordBatch`es, this method returns the individual fully parsed IPC
1613    /// messages from the underlying stream.
1614    ///
1615    /// This is useful primarily for testing reader/writer behaviors as it
1616    /// allows a full view into the messages that have been written to a stream.
1617    pub(crate) fn next_ipc_message(&mut self) -> Result<Option<IpcMessage>, ArrowError> {
1618        let message = self.reader.maybe_next()?;
1619        let Some((message, body)) = message else {
1620            // If the message is None, we have reached the end of the stream.
1621            return Ok(None);
1622        };
1623
1624        let ipc_message = match message.header_type() {
1625            Message::MessageHeader::Schema => {
1626                let schema = message.header_as_schema().ok_or_else(|| {
1627                    ArrowError::ParseError("Failed to parse schema from message header".to_string())
1628                })?;
1629                let arrow_schema = crate::convert::fb_to_schema(schema);
1630                IpcMessage::Schema(arrow_schema)
1631            }
1632            Message::MessageHeader::RecordBatch => {
1633                let batch = message.header_as_record_batch().ok_or_else(|| {
1634                    ArrowError::IpcError("Unable to read IPC message as record batch".to_string())
1635                })?;
1636
1637                let version = message.version();
1638                let schema = self.schema.clone();
1639                let record_batch = RecordBatchDecoder::try_new(
1640                    &body.into(),
1641                    batch,
1642                    schema,
1643                    &self.dictionaries_by_id,
1644                    &version,
1645                )?
1646                .with_projection(self.projection.as_ref().map(|x| x.0.as_ref()))
1647                .with_require_alignment(false)
1648                .with_skip_validation(self.skip_validation.clone())
1649                .read_record_batch()?;
1650                IpcMessage::RecordBatch(record_batch)
1651            }
1652            Message::MessageHeader::DictionaryBatch => {
1653                let dict = message.header_as_dictionary_batch().ok_or_else(|| {
1654                    ArrowError::ParseError(
1655                        "Failed to parse dictionary batch from message header".to_string(),
1656                    )
1657                })?;
1658
1659                let version = message.version();
1660                let dict_values = get_dictionary_values(
1661                    &body.into(),
1662                    dict,
1663                    &self.schema,
1664                    &mut self.dictionaries_by_id,
1665                    &version,
1666                    false,
1667                    self.skip_validation.clone(),
1668                )?;
1669
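                // Add the values to the dictionary cache; a delta dictionary
                // appends to the existing values, otherwise they are replaced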
1670                update_dictionaries(
1671                    &mut self.dictionaries_by_id,
1672                    dict.isDelta(),
1673                    dict.id(),
1674                    dict_values.clone(),
1675                )?;
1676
1677                IpcMessage::DictionaryBatch {
1678                    id: dict.id(),
                    is_delta: dict.isDelta(),
                    values: dict_values,
1681                }
1682            }
1683            x => {
1684                return Err(ArrowError::ParseError(format!(
1685                    "Unsupported message header type in IPC stream: '{x:?}'"
1686                )));
1687            }
1688        };
1689
1690        Ok(Some(ipc_message))
1691    }
1692
1693    /// Gets a reference to the underlying reader.
1694    ///
1695    /// It is inadvisable to directly read from the underlying reader.
1696    pub fn get_ref(&self) -> &R {
1697        self.reader.inner()
1698    }
1699
1700    /// Gets a mutable reference to the underlying reader.
1701    ///
1702    /// It is inadvisable to directly read from the underlying reader.
1703    pub fn get_mut(&mut self) -> &mut R {
1704        self.reader.inner_mut()
1705    }
1706
1707    /// Specifies if validation should be skipped when reading data (defaults to `false`)
1708    ///
1709    /// # Safety
1710    ///
1711    /// See [`FileDecoder::with_skip_validation`]
1712    pub unsafe fn with_skip_validation(mut self, skip_validation: bool) -> Self {
1713        unsafe { self.skip_validation.set(skip_validation) };
1714        self
1715    }
1716}
1717
1718impl<R: Read> Iterator for StreamReader<R> {
1719    type Item = Result<RecordBatch, ArrowError>;
1720
1721    fn next(&mut self) -> Option<Self::Item> {
1722        self.maybe_next().transpose()
1723    }
1724}
1725
1726impl<R: Read> RecordBatchReader for StreamReader<R> {
1727    fn schema(&self) -> SchemaRef {
1728        self.schema.clone()
1729    }
1730}
1731
/// Representation of a fully parsed IPC message from the underlying stream.
1733/// Parsing this kind of message is done by higher level constructs such as
1734/// [`StreamReader`], because fully interpreting the messages into a record
1735/// batch or dictionary batch requires access to stream state such as schema
1736/// and the full dictionary cache.
1737#[derive(Debug)]
1738#[allow(dead_code)]
1739pub(crate) enum IpcMessage {
1740    Schema(arrow_schema::Schema),
1741    RecordBatch(RecordBatch),
1742    DictionaryBatch {
1743        id: i64,
1744        is_delta: bool,
1745        values: ArrayRef,
1746    },
1747}
1748
1749/// A low-level construct that reads [`Message::Message`]s from a reader while
1750/// re-using a buffer for metadata. This is composed into [`StreamReader`].
1751struct MessageReader<R> {
1752    reader: R,
1753    buf: Vec<u8>,
1754}
1755
1756impl<R: Read> MessageReader<R> {
1757    fn new(reader: R) -> Self {
1758        Self {
1759            reader,
1760            buf: Vec::new(),
1761        }
1762    }
1763
1764    /// Reads the entire next message from the underlying reader which includes
1765    /// the metadata length, the metadata, and the body.
1766    ///
1767    /// # Returns
    /// - `Ok(None)` if the reader signals the end of stream with EOF on
    ///   the first read
    /// - `Err(_)` if the reader returns an error on any read other than the
    ///   first, or if the metadata length is invalid
    /// - `Ok(Some(_))` with the `Message` and a buffer containing the
    ///   body bytes otherwise.
1774    fn maybe_next(&mut self) -> Result<Option<(Message::Message<'_>, MutableBuffer)>, ArrowError> {
1775        let meta_len = self.read_meta_len()?;
1776        let Some(meta_len) = meta_len else {
1777            return Ok(None);
1778        };
1779
1780        self.buf.resize(meta_len, 0);
1781        self.reader.read_exact(&mut self.buf)?;
1782
1783        let message = crate::root_as_message(self.buf.as_slice()).map_err(|err| {
1784            ArrowError::ParseError(format!("Unable to get root as message: {err:?}"))
1785        })?;
1786
1787        let mut buf = MutableBuffer::from_len_zeroed(message.bodyLength() as usize);
1788        self.reader.read_exact(&mut buf)?;
1789
1790        Ok(Some((message, buf)))
1791    }
1792
1793    /// Get a mutable reference to the underlying reader.
1794    fn inner_mut(&mut self) -> &mut R {
1795        &mut self.reader
1796    }
1797
1798    /// Get an immutable reference to the underlying reader.
1799    fn inner(&self) -> &R {
1800        &self.reader
1801    }
1802
1803    /// Read the metadata length for the next message from the underlying stream.
1804    ///
1805    /// # Returns
    /// - `Ok(None)` if the reader signals the end of stream with EOF on
    ///   the first read
    /// - `Err(_)` if the reader returns an error on any read other than the
    ///   first, or if the metadata length is less than 0.
1810    /// - `Ok(Some(_))` with the length otherwise.
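    ///
    /// As a sketch, each encapsulated message in the stream is framed as:
    ///
    /// ```text
    /// <continuation marker>  4 bytes: 0xFFFFFFFF (may be absent in streams
    ///                        written with pre-V5 metadata)
    /// <metadata length>      4 bytes: little-endian signed integer
    /// <metadata>             flatbuffer Message, padded to 8 bytes
    /// <body>                 message body bytes
    /// ```
    ///
    /// A metadata length of 0 signals the end of the stream.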
1811    pub fn read_meta_len(&mut self) -> Result<Option<usize>, ArrowError> {
1812        let mut meta_len: [u8; 4] = [0; 4];
1813        match self.reader.read_exact(&mut meta_len) {
1814            Ok(_) => {}
1815            Err(e) => {
1816                return if e.kind() == std::io::ErrorKind::UnexpectedEof {
                    // Handle a stream that ends with EOF instead of the explicit
                    // "0xFFFFFFFF 0x00000000" end-of-stream marker; this is
                    // valid according to:
                    // https://arrow.apache.org/docs/format/Columnar.html#ipc-streaming-format
1820                    Ok(None)
1821                } else {
1822                    Err(ArrowError::from(e))
1823                };
1824            }
1825        };
1826
1827        let meta_len = {
1828            // If a continuation marker is encountered, skip over it and read
1829            // the size from the next four bytes.
1830            if meta_len == CONTINUATION_MARKER {
1831                self.reader.read_exact(&mut meta_len)?;
1832            }
1833
1834            i32::from_le_bytes(meta_len)
1835        };
1836
1837        if meta_len == 0 {
1838            return Ok(None);
1839        }
1840
1841        let meta_len = usize::try_from(meta_len)
1842            .map_err(|_| ArrowError::ParseError(format!("Invalid metadata length: {meta_len}")))?;
1843
1844        Ok(Some(meta_len))
1845    }
1846}
1847
1848#[cfg(test)]
1849mod tests {
1850    use std::io::Cursor;
1851
1852    use crate::convert::fb_to_schema;
1853    use crate::writer::{
1854        DictionaryTracker, IpcDataGenerator, IpcWriteOptions, unslice_run_array, write_message,
1855    };
1856
1857    use super::*;
1858
1859    use crate::{root_as_footer, root_as_message, size_prefixed_root_as_message};
1860    use arrow_array::builder::{PrimitiveRunBuilder, UnionBuilder};
1861    use arrow_array::types::*;
1862    use arrow_buffer::{NullBuffer, OffsetBuffer};
1863    use arrow_data::ArrayDataBuilder;
1864
1865    fn create_test_projection_schema() -> Schema {
1866        // define field types
1867        let list_data_type = DataType::List(Arc::new(Field::new_list_field(DataType::Int32, true)));
1868
1869        let fixed_size_list_data_type =
1870            DataType::FixedSizeList(Arc::new(Field::new_list_field(DataType::Int32, false)), 3);
1871
1872        let union_fields = UnionFields::from_fields(vec![
1873            Field::new("a", DataType::Int32, false),
1874            Field::new("b", DataType::Float64, false),
1875        ]);
1876
1877        let union_data_type = DataType::Union(union_fields, UnionMode::Dense);
1878
1879        let struct_fields = Fields::from(vec![
1880            Field::new("id", DataType::Int32, false),
1881            Field::new_list("list", Field::new_list_field(DataType::Int8, true), false),
1882        ]);
1883        let struct_data_type = DataType::Struct(struct_fields);
1884
1885        let run_encoded_data_type = DataType::RunEndEncoded(
1886            Arc::new(Field::new("run_ends", DataType::Int16, false)),
1887            Arc::new(Field::new("values", DataType::Int32, true)),
1888        );
1889
1890        // define schema
1891        Schema::new(vec![
1892            Field::new("f0", DataType::UInt32, false),
1893            Field::new("f1", DataType::Utf8, false),
1894            Field::new("f2", DataType::Boolean, false),
1895            Field::new("f3", union_data_type, true),
1896            Field::new("f4", DataType::Null, true),
1897            Field::new("f5", DataType::Float64, true),
1898            Field::new("f6", list_data_type, false),
1899            Field::new("f7", DataType::FixedSizeBinary(3), true),
1900            Field::new("f8", fixed_size_list_data_type, false),
1901            Field::new("f9", struct_data_type, false),
1902            Field::new("f10", run_encoded_data_type, false),
1903            Field::new("f11", DataType::Boolean, false),
1904            Field::new_dictionary("f12", DataType::Int8, DataType::Utf8, false),
1905            Field::new("f13", DataType::Utf8, false),
1906        ])
1907    }
1908
1909    fn create_test_projection_batch_data(schema: &Schema) -> RecordBatch {
1910        // set test data for each column
1911        let array0 = UInt32Array::from(vec![1, 2, 3]);
1912        let array1 = StringArray::from(vec!["foo", "bar", "baz"]);
1913        let array2 = BooleanArray::from(vec![true, false, true]);
1914
1915        let mut union_builder = UnionBuilder::new_dense();
1916        union_builder.append::<Int32Type>("a", 1).unwrap();
1917        union_builder.append::<Float64Type>("b", 10.1).unwrap();
1918        union_builder.append_null::<Float64Type>("b").unwrap();
1919        let array3 = union_builder.build().unwrap();
1920
1921        let array4 = NullArray::new(3);
1922        let array5 = Float64Array::from(vec![Some(1.1), None, Some(3.3)]);
1923        let array6_values = vec![
1924            Some(vec![Some(10), Some(10), Some(10)]),
1925            Some(vec![Some(20), Some(20), Some(20)]),
1926            Some(vec![Some(30), Some(30)]),
1927        ];
1928        let array6 = ListArray::from_iter_primitive::<Int32Type, _, _>(array6_values);
1929        let array7_values = vec![vec![11, 12, 13], vec![22, 23, 24], vec![33, 34, 35]];
1930        let array7 = FixedSizeBinaryArray::try_from_iter(array7_values.into_iter()).unwrap();
1931
1932        let array8_values = ArrayData::builder(DataType::Int32)
1933            .len(9)
1934            .add_buffer(Buffer::from_slice_ref([40, 41, 42, 43, 44, 45, 46, 47, 48]))
1935            .build()
1936            .unwrap();
1937        let array8_data = ArrayData::builder(schema.field(8).data_type().clone())
1938            .len(3)
1939            .add_child_data(array8_values)
1940            .build()
1941            .unwrap();
1942        let array8 = FixedSizeListArray::from(array8_data);
1943
1944        let array9_id: ArrayRef = Arc::new(Int32Array::from(vec![1001, 1002, 1003]));
1945        let array9_list: ArrayRef =
1946            Arc::new(ListArray::from_iter_primitive::<Int8Type, _, _>(vec![
1947                Some(vec![Some(-10)]),
1948                Some(vec![Some(-20), Some(-20), Some(-20)]),
1949                Some(vec![Some(-30)]),
1950            ]));
1951        let array9 = ArrayDataBuilder::new(schema.field(9).data_type().clone())
1952            .add_child_data(array9_id.into_data())
1953            .add_child_data(array9_list.into_data())
1954            .len(3)
1955            .build()
1956            .unwrap();
1957        let array9: ArrayRef = Arc::new(StructArray::from(array9));
1958
1959        let array10_input = vec![Some(1_i32), None, None];
1960        let mut array10_builder = PrimitiveRunBuilder::<Int16Type, Int32Type>::new();
1961        array10_builder.extend(array10_input);
1962        let array10 = array10_builder.finish();
1963
1964        let array11 = BooleanArray::from(vec![false, false, true]);
1965
1966        let array12_values = StringArray::from(vec!["x", "yy", "zzz"]);
1967        let array12_keys = Int8Array::from_iter_values([1, 1, 2]);
1968        let array12 = DictionaryArray::new(array12_keys, Arc::new(array12_values));
1969
1970        let array13 = StringArray::from(vec!["a", "bb", "ccc"]);
1971
1972        // create record batch
1973        RecordBatch::try_new(
1974            Arc::new(schema.clone()),
1975            vec![
1976                Arc::new(array0),
1977                Arc::new(array1),
1978                Arc::new(array2),
1979                Arc::new(array3),
1980                Arc::new(array4),
1981                Arc::new(array5),
1982                Arc::new(array6),
1983                Arc::new(array7),
1984                Arc::new(array8),
1985                Arc::new(array9),
1986                Arc::new(array10),
1987                Arc::new(array11),
1988                Arc::new(array12),
1989                Arc::new(array13),
1990            ],
1991        )
1992        .unwrap()
1993    }
1994
1995    #[test]
1996    fn test_negative_meta_len_start_stream() {
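        // A stream beginning with a continuation marker followed by a negative
        // metadata length must fail with a parse error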
1997        let bytes = i32::to_le_bytes(-1);
1998        let mut buf = vec![];
1999        buf.extend(CONTINUATION_MARKER);
2000        buf.extend(bytes);
2001
2002        let reader_err = StreamReader::try_new(Cursor::new(buf), None).err();
2003        assert!(reader_err.is_some());
2004        assert_eq!(
2005            reader_err.unwrap().to_string(),
2006            "Parser error: Invalid metadata length: -1"
2007        );
2008    }
2009
2010    #[test]
2011    fn test_negative_meta_len_mid_stream() {
2012        let schema = Schema::new(vec![Field::new("a", DataType::Int32, false)]);
2013        let mut buf = Vec::new();
2014        {
2015            let mut writer = crate::writer::StreamWriter::try_new(&mut buf, &schema).unwrap();
2016            let batch =
2017                RecordBatch::try_new(Arc::new(schema), vec![Arc::new(Int32Array::from(vec![1]))])
2018                    .unwrap();
2019            writer.write(&batch).unwrap();
2020        }
2021
2022        let bytes = i32::to_le_bytes(-1);
2023        buf.extend(CONTINUATION_MARKER);
2024        buf.extend(bytes);
2025
2026        let mut reader = StreamReader::try_new(Cursor::new(buf), None).unwrap();
2027        // Read the valid value
2028        assert!(reader.maybe_next().is_ok());
2029        // Read the invalid meta len
2030        let batch_err = reader.maybe_next().err();
2031        assert!(batch_err.is_some());
2032        assert_eq!(
2033            batch_err.unwrap().to_string(),
2034            "Parser error: Invalid metadata length: -1"
2035        );
2036    }
2037
2038    #[test]
2039    fn test_missing_buffer_metadata_error() {
2040        use crate::r#gen::Message::*;
2041        use flatbuffers::FlatBufferBuilder;
2042
2043        let schema = Arc::new(Schema::new(vec![Field::new("col", DataType::Int32, true)]));
2044
2045        // create RecordBatch buffer metadata with invalid buffer count
2046        // Int32Array needs 2 buffers (validity + data) but we provide only 1
2047        let mut fbb = FlatBufferBuilder::new();
2048        let nodes = fbb.create_vector(&[FieldNode::new(2, 0)]);
2049        let buffers = fbb.create_vector(&[crate::Buffer::new(0, 8)]);
2050        let batch_offset = RecordBatch::create(
2051            &mut fbb,
2052            &RecordBatchArgs {
2053                length: 2,
2054                nodes: Some(nodes),
2055                buffers: Some(buffers),
2056                compression: None,
2057                variadicBufferCounts: None,
2058            },
2059        );
2060        fbb.finish_minimal(batch_offset);
2061        let batch_bytes = fbb.finished_data().to_vec();
2062        let batch = flatbuffers::root::<RecordBatch>(&batch_bytes).unwrap();
2063
2064        let data_buffer = Buffer::from(vec![0u8; 8]);
2065        let dictionaries: HashMap<i64, ArrayRef> = HashMap::new();
2066        let metadata = MetadataVersion::V5;
2067
2068        let decoder = RecordBatchDecoder::try_new(
2069            &data_buffer,
2070            batch,
2071            schema.clone(),
2072            &dictionaries,
2073            &metadata,
2074        )
2075        .unwrap();
2076
2077        let result = decoder.read_record_batch();
2078
2079        match result {
2080            Err(ArrowError::IpcError(msg)) => {
2081                assert_eq!(msg, "Buffer count mismatched with metadata");
2082            }
2083            other => panic!("unexpected error: {other:?}"),
2084        }
2085    }
2086
2087    #[test]
2088    fn test_projection_array_values() {
2089        // define schema
2090        let schema = create_test_projection_schema();
2091
2092        // create record batch with test data
2093        let batch = create_test_projection_batch_data(&schema);
2094
2095        // write record batch in IPC format
2096        let mut buf = Vec::new();
2097        {
2098            let mut writer = crate::writer::FileWriter::try_new(&mut buf, &schema).unwrap();
2099            writer.write(&batch).unwrap();
2100            writer.finish().unwrap();
2101        }
2102
2103        // read record batch with projection
2104        for index in 0..12 {
2105            let projection = vec![index];
2106            let reader = FileReader::try_new(std::io::Cursor::new(buf.clone()), Some(projection));
2107            let read_batch = reader.unwrap().next().unwrap().unwrap();
2108            let projected_column = read_batch.column(0);
2109            let expected_column = batch.column(index);
2110
2111            // check the projected column equals the expected column
2112            assert_eq!(projected_column.as_ref(), expected_column.as_ref());
2113        }
2114
2115        {
2116            // read record batch with reversed projection
2117            let reader =
2118                FileReader::try_new(std::io::Cursor::new(buf.clone()), Some(vec![3, 2, 1]));
2119            let read_batch = reader.unwrap().next().unwrap().unwrap();
2120            let expected_batch = batch.project(&[3, 2, 1]).unwrap();
2121            assert_eq!(read_batch, expected_batch);
2122        }
2123    }
2124
2125    #[test]
2126    fn test_arrow_single_float_row() {
2127        let schema = Schema::new(vec![
2128            Field::new("a", DataType::Float32, false),
2129            Field::new("b", DataType::Float32, false),
2130            Field::new("c", DataType::Int32, false),
2131            Field::new("d", DataType::Int32, false),
2132        ]);
2133        let arrays = vec![
2134            Arc::new(Float32Array::from(vec![1.23])) as ArrayRef,
2135            Arc::new(Float32Array::from(vec![-6.50])) as ArrayRef,
2136            Arc::new(Int32Array::from(vec![2])) as ArrayRef,
2137            Arc::new(Int32Array::from(vec![1])) as ArrayRef,
2138        ];
2139        let batch = RecordBatch::try_new(Arc::new(schema.clone()), arrays).unwrap();
2140        // create stream writer
2141        let mut file = tempfile::tempfile().unwrap();
2142        let mut stream_writer = crate::writer::StreamWriter::try_new(&mut file, &schema).unwrap();
2143        stream_writer.write(&batch).unwrap();
2144        stream_writer.finish().unwrap();
2145
2146        drop(stream_writer);
2147
2148        file.rewind().unwrap();
2149
2150        // read stream back
2151        let reader = StreamReader::try_new(&mut file, None).unwrap();
2152
2153        reader.for_each(|batch| {
2154            let batch = batch.unwrap();
2155            assert!(
2156                batch
2157                    .column(0)
2158                    .as_any()
2159                    .downcast_ref::<Float32Array>()
2160                    .unwrap()
2161                    .value(0)
2162                    != 0.0
2163            );
2164            assert!(
2165                batch
2166                    .column(1)
2167                    .as_any()
2168                    .downcast_ref::<Float32Array>()
2169                    .unwrap()
2170                    .value(0)
2171                    != 0.0
2172            );
2173        });
2174
2175        file.rewind().unwrap();
2176
2177        // Read with projection
2178        let reader = StreamReader::try_new(file, Some(vec![0, 3])).unwrap();
2179
2180        reader.for_each(|batch| {
2181            let batch = batch.unwrap();
2182            assert_eq!(batch.schema().fields().len(), 2);
2183            assert_eq!(batch.schema().fields()[0].data_type(), &DataType::Float32);
2184            assert_eq!(batch.schema().fields()[1].data_type(), &DataType::Int32);
2185        });
2186    }
2187
2188    /// Write the record batch to an in-memory buffer in IPC File format
2189    fn write_ipc(rb: &RecordBatch) -> Vec<u8> {
2190        let mut buf = Vec::new();
2191        let mut writer = crate::writer::FileWriter::try_new(&mut buf, rb.schema_ref()).unwrap();
2192        writer.write(rb).unwrap();
2193        writer.finish().unwrap();
2194        buf
2195    }
2196
2197    /// Return the first record batch read from the IPC File buffer
2198    fn read_ipc(buf: &[u8]) -> Result<RecordBatch, ArrowError> {
2199        let mut reader = FileReader::try_new(std::io::Cursor::new(buf), None)?;
2200        reader.next().unwrap()
2201    }
2202
2203    /// Return the first record batch read from the IPC File buffer, disabling
2204    /// validation
2205    fn read_ipc_skip_validation(buf: &[u8]) -> Result<RecordBatch, ArrowError> {
2206        let mut reader = unsafe {
2207            FileReader::try_new(std::io::Cursor::new(buf), None)?.with_skip_validation(true)
2208        };
2209        reader.next().unwrap()
2210    }
2211
2212    fn roundtrip_ipc(rb: &RecordBatch) -> RecordBatch {
2213        let buf = write_ipc(rb);
2214        read_ipc(&buf).unwrap()
2215    }
2216
2217    /// Return the first record batch read from the IPC File buffer
2218    /// using the FileDecoder API
2219    fn read_ipc_with_decoder(buf: Vec<u8>) -> Result<RecordBatch, ArrowError> {
2220        read_ipc_with_decoder_inner(buf, false)
2221    }
2222
2223    /// Return the first record batch read from the IPC File buffer
2224    /// using the FileDecoder API, disabling validation
2225    fn read_ipc_with_decoder_skip_validation(buf: Vec<u8>) -> Result<RecordBatch, ArrowError> {
2226        read_ipc_with_decoder_inner(buf, true)
2227    }
2228
2229    fn read_ipc_with_decoder_inner(
2230        buf: Vec<u8>,
2231        skip_validation: bool,
2232    ) -> Result<RecordBatch, ArrowError> {
2233        let buffer = Buffer::from_vec(buf);
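        // The file ends with a 4 byte footer length followed by the 6 byte
        // `ARROW1` magic, so the trailer starts 10 bytes from the end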
2234        let trailer_start = buffer.len() - 10;
2235        let footer_len = read_footer_length(buffer[trailer_start..].try_into().unwrap())?;
2236        let footer = root_as_footer(&buffer[trailer_start - footer_len..trailer_start])
2237            .map_err(|e| ArrowError::InvalidArgumentError(format!("Invalid footer: {e}")))?;
2238
2239        let schema = fb_to_schema(footer.schema().unwrap());
2240
2241        let mut decoder = unsafe {
2242            FileDecoder::new(Arc::new(schema), footer.version())
2243                .with_skip_validation(skip_validation)
2244        };
2245        // Read dictionaries
2246        for block in footer.dictionaries().iter().flatten() {
2247            let block_len = block.bodyLength() as usize + block.metaDataLength() as usize;
2248            let data = buffer.slice_with_length(block.offset() as _, block_len);
2249            decoder.read_dictionary(block, &data)?
2250        }
2251
2252        // Read record batch
2253        let batches = footer.recordBatches().unwrap();
2254        assert_eq!(batches.len(), 1); // Only wrote a single batch
2255
2256        let block = batches.get(0);
2257        let block_len = block.bodyLength() as usize + block.metaDataLength() as usize;
2258        let data = buffer.slice_with_length(block.offset() as _, block_len);
2259        Ok(decoder.read_record_batch(block, &data)?.unwrap())
2260    }
2261
2262    /// Write the record batch to an in-memory buffer in IPC Stream format
2263    fn write_stream(rb: &RecordBatch) -> Vec<u8> {
2264        let mut buf = Vec::new();
2265        let mut writer = crate::writer::StreamWriter::try_new(&mut buf, rb.schema_ref()).unwrap();
2266        writer.write(rb).unwrap();
2267        writer.finish().unwrap();
2268        buf
2269    }
2270
2271    /// Return the first record batch read from the IPC Stream buffer
2272    fn read_stream(buf: &[u8]) -> Result<RecordBatch, ArrowError> {
2273        let mut reader = StreamReader::try_new(std::io::Cursor::new(buf), None)?;
2274        reader.next().unwrap()
2275    }
2276
2277    /// Return the first record batch read from the IPC Stream buffer,
2278    /// disabling validation
2279    fn read_stream_skip_validation(buf: &[u8]) -> Result<RecordBatch, ArrowError> {
2280        let mut reader = unsafe {
2281            StreamReader::try_new(std::io::Cursor::new(buf), None)?.with_skip_validation(true)
2282        };
2283        reader.next().unwrap()
2284    }
2285
2286    fn roundtrip_ipc_stream(rb: &RecordBatch) -> RecordBatch {
2287        let buf = write_stream(rb);
2288        read_stream(&buf).unwrap()
2289    }
2290
2291    #[test]
2292    fn test_roundtrip_with_custom_metadata() {
2293        let schema = Schema::new(vec![Field::new("dummy", DataType::Float64, false)]);
2294        let mut buf = Vec::new();
2295        let mut writer = crate::writer::FileWriter::try_new(&mut buf, &schema).unwrap();
2296        let mut test_metadata = HashMap::new();
2297        test_metadata.insert("abc".to_string(), "abc".to_string());
2298        test_metadata.insert("def".to_string(), "def".to_string());
2299        for (k, v) in &test_metadata {
2300            writer.write_metadata(k, v);
2301        }
2302        writer.finish().unwrap();
2303        drop(writer);
2304
2305        let reader = crate::reader::FileReader::try_new(std::io::Cursor::new(buf), None).unwrap();
2306        assert_eq!(reader.custom_metadata(), &test_metadata);
2307    }
2308
2309    #[test]
2310    fn test_roundtrip_nested_dict() {
2311        let inner: DictionaryArray<Int32Type> = vec!["a", "b", "a"].into_iter().collect();
2312
2313        let array = Arc::new(inner) as ArrayRef;
2314
2315        let dctfield = Arc::new(Field::new("dict", array.data_type().clone(), false));
2316
2317        let s = StructArray::from(vec![(dctfield, array)]);
2318        let struct_array = Arc::new(s) as ArrayRef;
2319
2320        let schema = Arc::new(Schema::new(vec![Field::new(
2321            "struct",
2322            struct_array.data_type().clone(),
2323            false,
2324        )]));
2325
2326        let batch = RecordBatch::try_new(schema, vec![struct_array]).unwrap();
2327
2328        assert_eq!(batch, roundtrip_ipc(&batch));
2329    }
2330
2331    #[test]
2332    fn test_roundtrip_nested_dict_no_preserve_dict_id() {
2333        let inner: DictionaryArray<Int32Type> = vec!["a", "b", "a"].into_iter().collect();
2334
2335        let array = Arc::new(inner) as ArrayRef;
2336
2337        let dctfield = Arc::new(Field::new("dict", array.data_type().clone(), false));
2338
2339        let s = StructArray::from(vec![(dctfield, array)]);
2340        let struct_array = Arc::new(s) as ArrayRef;
2341
2342        let schema = Arc::new(Schema::new(vec![Field::new(
2343            "struct",
2344            struct_array.data_type().clone(),
2345            false,
2346        )]));
2347
2348        let batch = RecordBatch::try_new(schema, vec![struct_array]).unwrap();
2349
2350        let mut buf = Vec::new();
2351        let mut writer = crate::writer::FileWriter::try_new_with_options(
2352            &mut buf,
2353            batch.schema_ref(),
2354            IpcWriteOptions::default(),
2355        )
2356        .unwrap();
2357        writer.write(&batch).unwrap();
2358        writer.finish().unwrap();
2359        drop(writer);
2360
2361        let mut reader = FileReader::try_new(std::io::Cursor::new(buf), None).unwrap();
2362
2363        assert_eq!(batch, reader.next().unwrap().unwrap());
2364    }
2365
2366    fn check_union_with_builder(mut builder: UnionBuilder) {
2367        builder.append::<Int32Type>("a", 1).unwrap();
2368        builder.append_null::<Int32Type>("a").unwrap();
2369        builder.append::<Float64Type>("c", 3.0).unwrap();
2370        builder.append::<Int32Type>("a", 4).unwrap();
2371        builder.append::<Int64Type>("d", 11).unwrap();
2372        let union = builder.build().unwrap();
2373
2374        let schema = Arc::new(Schema::new(vec![Field::new(
2375            "union",
2376            union.data_type().clone(),
2377            false,
2378        )]));
2379
2380        let union_array = Arc::new(union) as ArrayRef;
2381
2382        let rb = RecordBatch::try_new(schema, vec![union_array]).unwrap();
2383        let rb2 = roundtrip_ipc(&rb);
        // Check the schema and shape first, then compare the union columns
        // directly (array equality is implemented for union arrays).
2386        assert_eq!(rb.schema(), rb2.schema());
2387        assert_eq!(rb.num_columns(), rb2.num_columns());
2388        assert_eq!(rb.num_rows(), rb2.num_rows());
2389        let union1 = rb.column(0);
2390        let union2 = rb2.column(0);
2391
2392        assert_eq!(union1, union2);
2393    }
2394
2395    #[test]
2396    fn test_roundtrip_dense_union() {
2397        check_union_with_builder(UnionBuilder::new_dense());
2398    }
2399
2400    #[test]
2401    fn test_roundtrip_sparse_union() {
2402        check_union_with_builder(UnionBuilder::new_sparse());
2403    }
2404
2405    #[test]
2406    fn test_roundtrip_struct_empty_fields() {
2407        let nulls = NullBuffer::from(&[true, true, false]);
2408        let rb = RecordBatch::try_from_iter([(
2409            "",
2410            Arc::new(StructArray::new_empty_fields(nulls.len(), Some(nulls))) as _,
2411        )])
2412        .unwrap();
2413        let rb2 = roundtrip_ipc(&rb);
2414        assert_eq!(rb, rb2);
2415    }
2416
2417    #[test]
2418    fn test_roundtrip_stream_run_array_sliced() {
2419        let run_array_1: Int32RunArray = vec!["a", "a", "a", "b", "b", "c", "c", "c"]
2420            .into_iter()
2421            .collect();
2422        let run_array_1_sliced = run_array_1.slice(2, 5);
2423
        let run_array_2_input = vec![Some(1_i32), None, None, Some(2), Some(2)];
        let mut run_array_2_builder = PrimitiveRunBuilder::<Int16Type, Int32Type>::new();
        run_array_2_builder.extend(run_array_2_input);
2427        let run_array_2 = run_array_2_builder.finish();
2428
2429        let schema = Arc::new(Schema::new(vec![
2430            Field::new(
2431                "run_array_1_sliced",
2432                run_array_1_sliced.data_type().clone(),
2433                false,
2434            ),
2435            Field::new("run_array_2", run_array_2.data_type().clone(), false),
2436        ]));
2437        let input_batch = RecordBatch::try_new(
2438            schema,
2439            vec![Arc::new(run_array_1_sliced.clone()), Arc::new(run_array_2)],
2440        )
2441        .unwrap();
2442        let output_batch = roundtrip_ipc_stream(&input_batch);
2443
        // As partial comparison is not yet supported for run arrays, the sliced run
        // array has to be unsliced before comparing it with the output. The second
        // run array can be compared as is.
2447        assert_eq!(input_batch.column(1), output_batch.column(1));
2448
2449        let run_array_1_unsliced = unslice_run_array(run_array_1_sliced.into_data()).unwrap();
2450        assert_eq!(run_array_1_unsliced, output_batch.column(0).into_data());
2451    }
2452
2453    #[test]
2454    fn test_roundtrip_stream_nested_dict() {
2455        let xs = vec!["AA", "BB", "AA", "CC", "BB"];
2456        let dict = Arc::new(
2457            xs.clone()
2458                .into_iter()
2459                .collect::<DictionaryArray<Int8Type>>(),
2460        );
2461        let string_array: ArrayRef = Arc::new(StringArray::from(xs.clone()));
2462        let struct_array = StructArray::from(vec![
2463            (
2464                Arc::new(Field::new("f2.1", DataType::Utf8, false)),
2465                string_array,
2466            ),
2467            (
2468                Arc::new(Field::new("f2.2_struct", dict.data_type().clone(), false)),
2469                dict.clone() as ArrayRef,
2470            ),
2471        ]);
2472        let schema = Arc::new(Schema::new(vec![
2473            Field::new("f1_string", DataType::Utf8, false),
2474            Field::new("f2_struct", struct_array.data_type().clone(), false),
2475        ]));
2476        let input_batch = RecordBatch::try_new(
2477            schema,
2478            vec![
2479                Arc::new(StringArray::from(xs.clone())),
2480                Arc::new(struct_array),
2481            ],
2482        )
2483        .unwrap();
2484        let output_batch = roundtrip_ipc_stream(&input_batch);
2485        assert_eq!(input_batch, output_batch);
2486    }
2487
2488    #[test]
2489    fn test_roundtrip_stream_nested_dict_of_map_of_dict() {
2490        let values = StringArray::from(vec![Some("a"), None, Some("b"), Some("c")]);
2491        let values = Arc::new(values) as ArrayRef;
2492        let value_dict_keys = Int8Array::from_iter_values([0, 1, 1, 2, 3, 1]);
2493        let value_dict_array = DictionaryArray::new(value_dict_keys, values.clone());
2494
2495        let key_dict_keys = Int8Array::from_iter_values([0, 0, 2, 1, 1, 3]);
2496        let key_dict_array = DictionaryArray::new(key_dict_keys, values);
2497
2498        #[allow(deprecated)]
2499        let keys_field = Arc::new(Field::new_dict(
2500            "keys",
2501            DataType::Dictionary(Box::new(DataType::Int8), Box::new(DataType::Utf8)),
2502            true, // It is technically not legal for this field to be null.
2503            1,
2504            false,
2505        ));
2506        #[allow(deprecated)]
2507        let values_field = Arc::new(Field::new_dict(
2508            "values",
2509            DataType::Dictionary(Box::new(DataType::Int8), Box::new(DataType::Utf8)),
2510            true,
2511            2,
2512            false,
2513        ));
2514        let entry_struct = StructArray::from(vec![
2515            (keys_field, make_array(key_dict_array.into_data())),
2516            (values_field, make_array(value_dict_array.into_data())),
2517        ]);
2518        let map_data_type = DataType::Map(
2519            Arc::new(Field::new(
2520                "entries",
2521                entry_struct.data_type().clone(),
2522                false,
2523            )),
2524            false,
2525        );
2526
2527        let entry_offsets = Buffer::from_slice_ref([0, 2, 4, 6]);
2528        let map_data = ArrayData::builder(map_data_type)
2529            .len(3)
2530            .add_buffer(entry_offsets)
2531            .add_child_data(entry_struct.into_data())
2532            .build()
2533            .unwrap();
2534        let map_array = MapArray::from(map_data);
2535
2536        let dict_keys = Int8Array::from_iter_values([0, 1, 1, 2, 2, 1]);
2537        let dict_dict_array = DictionaryArray::new(dict_keys, Arc::new(map_array));
2538
2539        let schema = Arc::new(Schema::new(vec![Field::new(
2540            "f1",
2541            dict_dict_array.data_type().clone(),
2542            false,
2543        )]));
2544        let input_batch = RecordBatch::try_new(schema, vec![Arc::new(dict_dict_array)]).unwrap();
2545        let output_batch = roundtrip_ipc_stream(&input_batch);
2546        assert_eq!(input_batch, output_batch);
2547    }
2548
2549    fn test_roundtrip_stream_dict_of_list_of_dict_impl<
2550        OffsetSize: OffsetSizeTrait,
2551        U: ArrowNativeType,
2552    >(
2553        list_data_type: DataType,
2554        offsets: &[U; 5],
2555    ) {
2556        let values = StringArray::from(vec![Some("a"), None, Some("c"), None]);
2557        let keys = Int8Array::from_iter_values([0, 0, 1, 2, 0, 1, 3]);
2558        let dict_array = DictionaryArray::new(keys, Arc::new(values));
2559        let dict_data = dict_array.to_data();
2560
2561        let value_offsets = Buffer::from_slice_ref(offsets);
2562
2563        let list_data = ArrayData::builder(list_data_type)
2564            .len(4)
2565            .add_buffer(value_offsets)
2566            .add_child_data(dict_data)
2567            .build()
2568            .unwrap();
2569        let list_array = GenericListArray::<OffsetSize>::from(list_data);
2570
2571        let keys_for_dict = Int8Array::from_iter_values([0, 3, 0, 1, 1, 2, 0, 1, 3]);
2572        let dict_dict_array = DictionaryArray::new(keys_for_dict, Arc::new(list_array));
2573
2574        let schema = Arc::new(Schema::new(vec![Field::new(
2575            "f1",
2576            dict_dict_array.data_type().clone(),
2577            false,
2578        )]));
2579        let input_batch = RecordBatch::try_new(schema, vec![Arc::new(dict_dict_array)]).unwrap();
2580        let output_batch = roundtrip_ipc_stream(&input_batch);
2581        assert_eq!(input_batch, output_batch);
2582    }
2583
2584    #[test]
2585    fn test_roundtrip_stream_dict_of_list_of_dict() {
2586        // list
2587        #[allow(deprecated)]
2588        let list_data_type = DataType::List(Arc::new(Field::new_dict(
2589            "item",
2590            DataType::Dictionary(Box::new(DataType::Int8), Box::new(DataType::Utf8)),
2591            true,
2592            1,
2593            false,
2594        )));
2595        let offsets: &[i32; 5] = &[0, 2, 4, 4, 6];
2596        test_roundtrip_stream_dict_of_list_of_dict_impl::<i32, i32>(list_data_type, offsets);
2597
2598        // large list
2599        #[allow(deprecated)]
2600        let list_data_type = DataType::LargeList(Arc::new(Field::new_dict(
2601            "item",
2602            DataType::Dictionary(Box::new(DataType::Int8), Box::new(DataType::Utf8)),
2603            true,
2604            1,
2605            false,
2606        )));
2607        let offsets: &[i64; 5] = &[0, 2, 4, 4, 7];
2608        test_roundtrip_stream_dict_of_list_of_dict_impl::<i64, i64>(list_data_type, offsets);
2609    }
2610
2611    #[test]
2612    fn test_roundtrip_stream_dict_of_fixed_size_list_of_dict() {
2613        let values = StringArray::from(vec![Some("a"), None, Some("c"), None]);
2614        let keys = Int8Array::from_iter_values([0, 0, 1, 2, 0, 1, 3, 1, 2]);
2615        let dict_array = DictionaryArray::new(keys, Arc::new(values));
2616        let dict_data = dict_array.into_data();
2617
2618        #[allow(deprecated)]
2619        let list_data_type = DataType::FixedSizeList(
2620            Arc::new(Field::new_dict(
2621                "item",
2622                DataType::Dictionary(Box::new(DataType::Int8), Box::new(DataType::Utf8)),
2623                true,
2624                1,
2625                false,
2626            )),
2627            3,
2628        );
2629        let list_data = ArrayData::builder(list_data_type)
2630            .len(3)
2631            .add_child_data(dict_data)
2632            .build()
2633            .unwrap();
2634        let list_array = FixedSizeListArray::from(list_data);
2635
2636        let keys_for_dict = Int8Array::from_iter_values([0, 1, 0, 1, 1, 2, 0, 1, 2]);
2637        let dict_dict_array = DictionaryArray::new(keys_for_dict, Arc::new(list_array));
2638
2639        let schema = Arc::new(Schema::new(vec![Field::new(
2640            "f1",
2641            dict_dict_array.data_type().clone(),
2642            false,
2643        )]));
2644        let input_batch = RecordBatch::try_new(schema, vec![Arc::new(dict_dict_array)]).unwrap();
2645        let output_batch = roundtrip_ipc_stream(&input_batch);
2646        assert_eq!(input_batch, output_batch);
2647    }
2648
2649    const LONG_TEST_STRING: &str =
2650        "This is a long string to make sure binary view array handles it";
2651
2652    #[test]
2653    fn test_roundtrip_view_types() {
2654        let schema = Schema::new(vec![
2655            Field::new("field_1", DataType::BinaryView, true),
2656            Field::new("field_2", DataType::Utf8, true),
2657            Field::new("field_3", DataType::Utf8View, true),
2658        ]);
2659        let bin_values: Vec<Option<&[u8]>> = vec![
2660            Some(b"foo"),
2661            None,
2662            Some(b"bar"),
2663            Some(LONG_TEST_STRING.as_bytes()),
2664        ];
2665        let utf8_values: Vec<Option<&str>> =
2666            vec![Some("foo"), None, Some("bar"), Some(LONG_TEST_STRING)];
2667        let bin_view_array = BinaryViewArray::from_iter(bin_values);
2668        let utf8_array = StringArray::from_iter(utf8_values.iter());
2669        let utf8_view_array = StringViewArray::from_iter(utf8_values);
2670        let record_batch = RecordBatch::try_new(
2671            Arc::new(schema.clone()),
2672            vec![
2673                Arc::new(bin_view_array),
2674                Arc::new(utf8_array),
2675                Arc::new(utf8_view_array),
2676            ],
2677        )
2678        .unwrap();
2679
2680        assert_eq!(record_batch, roundtrip_ipc(&record_batch));
2681        assert_eq!(record_batch, roundtrip_ipc_stream(&record_batch));
2682
2683        let sliced_batch = record_batch.slice(1, 2);
2684        assert_eq!(sliced_batch, roundtrip_ipc(&sliced_batch));
2685        assert_eq!(sliced_batch, roundtrip_ipc_stream(&sliced_batch));
2686    }
2687
2688    #[test]
2689    fn test_roundtrip_view_types_nested_dict() {
2690        let bin_values: Vec<Option<&[u8]>> = vec![
2691            Some(b"foo"),
2692            None,
2693            Some(b"bar"),
2694            Some(LONG_TEST_STRING.as_bytes()),
2695            Some(b"field"),
2696        ];
2697        let utf8_values: Vec<Option<&str>> = vec![
2698            Some("foo"),
2699            None,
2700            Some("bar"),
2701            Some(LONG_TEST_STRING),
2702            Some("field"),
2703        ];
2704        let bin_view_array = Arc::new(BinaryViewArray::from_iter(bin_values));
2705        let utf8_view_array = Arc::new(StringViewArray::from_iter(utf8_values));
2706
2707        let key_dict_keys = Int8Array::from_iter_values([0, 0, 1, 2, 0, 1, 3]);
2708        let key_dict_array = DictionaryArray::new(key_dict_keys, utf8_view_array.clone());
2709        #[allow(deprecated)]
2710        let keys_field = Arc::new(Field::new_dict(
2711            "keys",
2712            DataType::Dictionary(Box::new(DataType::Int8), Box::new(DataType::Utf8View)),
2713            true,
2714            1,
2715            false,
2716        ));
2717
2718        let value_dict_keys = Int8Array::from_iter_values([0, 3, 0, 1, 2, 0, 1]);
2719        let value_dict_array = DictionaryArray::new(value_dict_keys, bin_view_array);
2720        #[allow(deprecated)]
2721        let values_field = Arc::new(Field::new_dict(
2722            "values",
2723            DataType::Dictionary(Box::new(DataType::Int8), Box::new(DataType::BinaryView)),
2724            true,
2725            2,
2726            false,
2727        ));
2728        let entry_struct = StructArray::from(vec![
2729            (keys_field, make_array(key_dict_array.into_data())),
2730            (values_field, make_array(value_dict_array.into_data())),
2731        ]);
2732
2733        let map_data_type = DataType::Map(
2734            Arc::new(Field::new(
2735                "entries",
2736                entry_struct.data_type().clone(),
2737                false,
2738            )),
2739            false,
2740        );
2741        let entry_offsets = Buffer::from_slice_ref([0, 2, 4, 7]);
2742        let map_data = ArrayData::builder(map_data_type)
2743            .len(3)
2744            .add_buffer(entry_offsets)
2745            .add_child_data(entry_struct.into_data())
2746            .build()
2747            .unwrap();
2748        let map_array = MapArray::from(map_data);
2749
2750        let dict_keys = Int8Array::from_iter_values([0, 1, 0, 1, 1, 2, 0, 1, 2]);
2751        let dict_dict_array = DictionaryArray::new(dict_keys, Arc::new(map_array));
2752        let schema = Arc::new(Schema::new(vec![Field::new(
2753            "f1",
2754            dict_dict_array.data_type().clone(),
2755            false,
2756        )]));
2757        let batch = RecordBatch::try_new(schema, vec![Arc::new(dict_dict_array)]).unwrap();
2758        assert_eq!(batch, roundtrip_ipc(&batch));
2759        assert_eq!(batch, roundtrip_ipc_stream(&batch));
2760
2761        let sliced_batch = batch.slice(1, 2);
2762        assert_eq!(sliced_batch, roundtrip_ipc(&sliced_batch));
2763        assert_eq!(sliced_batch, roundtrip_ipc_stream(&sliced_batch));
2764    }
2765
2766    #[test]
2767    fn test_no_columns_batch() {
2768        let schema = Arc::new(Schema::empty());
2769        let options = RecordBatchOptions::new()
2770            .with_match_field_names(true)
2771            .with_row_count(Some(10));
2772        let input_batch = RecordBatch::try_new_with_options(schema, vec![], &options).unwrap();
2773        let output_batch = roundtrip_ipc_stream(&input_batch);
2774        assert_eq!(input_batch, output_batch);
2775    }
2776
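    // With `require_alignment(false)` the decoder accepts buffers that are not
    // 8-byte aligned, copying data into aligned buffers as needed.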
2777    #[test]
2778    fn test_unaligned() {
2779        let batch = RecordBatch::try_from_iter(vec![(
2780            "i32",
2781            Arc::new(Int32Array::from(vec![1, 2, 3, 4])) as _,
2782        )])
2783        .unwrap();
2784
2785        let r#gen = IpcDataGenerator {};
2786        let mut dict_tracker = DictionaryTracker::new(false);
2787        let (_, encoded) = r#gen
2788            .encode(
2789                &batch,
2790                &mut dict_tracker,
2791                &Default::default(),
2792                &mut Default::default(),
2793            )
2794            .unwrap();
2795
2796        let message = root_as_message(&encoded.ipc_message).unwrap();
2797
2798        // Construct an unaligned buffer
2799        let mut buffer = MutableBuffer::with_capacity(encoded.arrow_data.len() + 1);
2800        buffer.push(0_u8);
2801        buffer.extend_from_slice(&encoded.arrow_data);
2802        let b = Buffer::from(buffer).slice(1);
2803        assert_ne!(b.as_ptr().align_offset(8), 0);
2804
2805        let ipc_batch = message.header_as_record_batch().unwrap();
2806        let roundtrip = RecordBatchDecoder::try_new(
2807            &b,
2808            ipc_batch,
2809            batch.schema(),
2810            &Default::default(),
2811            &message.version(),
2812        )
2813        .unwrap()
2814        .with_require_alignment(false)
2815        .read_record_batch()
2816        .unwrap();
2817        assert_eq!(batch, roundtrip);
2818    }
2819
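    // Same misaligned input as `test_unaligned`, but `require_alignment(true)`
    // must turn it into a descriptive error instead of realigning the data.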
2820    #[test]
2821    fn test_unaligned_throws_error_with_require_alignment() {
2822        let batch = RecordBatch::try_from_iter(vec![(
2823            "i32",
2824            Arc::new(Int32Array::from(vec![1, 2, 3, 4])) as _,
2825        )])
2826        .unwrap();
2827
2828        let r#gen = IpcDataGenerator {};
2829        let mut dict_tracker = DictionaryTracker::new(false);
2830        let (_, encoded) = r#gen
2831            .encode(
2832                &batch,
2833                &mut dict_tracker,
2834                &Default::default(),
2835                &mut Default::default(),
2836            )
2837            .unwrap();
2838
2839        let message = root_as_message(&encoded.ipc_message).unwrap();
2840
2841        // Construct an unaligned buffer
2842        let mut buffer = MutableBuffer::with_capacity(encoded.arrow_data.len() + 1);
2843        buffer.push(0_u8);
2844        buffer.extend_from_slice(&encoded.arrow_data);
2845        let b = Buffer::from(buffer).slice(1);
2846        assert_ne!(b.as_ptr().align_offset(8), 0);
2847
2848        let ipc_batch = message.header_as_record_batch().unwrap();
2849        let result = RecordBatchDecoder::try_new(
2850            &b,
2851            ipc_batch,
2852            batch.schema(),
2853            &Default::default(),
2854            &message.version(),
2855        )
2856        .unwrap()
2857        .with_require_alignment(true)
2858        .read_record_batch();
2859
2860        let error = result.unwrap_err();
2861        assert_eq!(
2862            error.to_string(),
2863            "Invalid argument error: Misaligned buffers[0] in array of type Int32, \
2864             offset from expected alignment of 4 by 1"
2865        );
2866    }
2867
2868    #[test]
2869    fn test_file_with_massive_column_count() {
        // With the default verifier settings (1_000_000 flatbuffer tables),
        // 499_999 fields is roughly the upper limit, so 600_000 requires
        // raising `max_footer_fb_tables` below
        let limit = 600_000;
2872
2873        let fields = (0..limit)
2874            .map(|i| Field::new(format!("{i}"), DataType::Boolean, false))
2875            .collect::<Vec<_>>();
2876        let schema = Arc::new(Schema::new(fields));
2877        let batch = RecordBatch::new_empty(schema);
2878
2879        let mut buf = Vec::new();
2880        let mut writer = crate::writer::FileWriter::try_new(&mut buf, batch.schema_ref()).unwrap();
2881        writer.write(&batch).unwrap();
2882        writer.finish().unwrap();
2883        drop(writer);
2884
2885        let mut reader = FileReaderBuilder::new()
2886            .with_max_footer_fb_tables(1_500_000)
2887            .build(std::io::Cursor::new(buf))
2888            .unwrap();
2889        let roundtrip_batch = reader.next().unwrap().unwrap();
2890
2891        assert_eq!(batch, roundtrip_batch);
2892    }
2893
2894    #[test]
2895    fn test_file_with_deeply_nested_columns() {
        // With the default verifier depth limit (64), 60 nesting levels is
        // roughly the upper limit, so 61 requires raising
        // `max_footer_fb_depth` below
        let limit = 61;
2898
2899        let fields = (0..limit).fold(
2900            vec![Field::new("leaf", DataType::Boolean, false)],
            |fields, index| vec![Field::new_struct(format!("{index}"), fields, false)],
2902        );
2903        let schema = Arc::new(Schema::new(fields));
2904        let batch = RecordBatch::new_empty(schema);
2905
2906        let mut buf = Vec::new();
2907        let mut writer = crate::writer::FileWriter::try_new(&mut buf, batch.schema_ref()).unwrap();
2908        writer.write(&batch).unwrap();
2909        writer.finish().unwrap();
2910        drop(writer);
2911
2912        let mut reader = FileReaderBuilder::new()
2913            .with_max_footer_fb_depth(65)
2914            .build(std::io::Cursor::new(buf))
2915            .unwrap();
2916        let roundtrip_batch = reader.next().unwrap().unwrap();
2917
2918        assert_eq!(batch, roundtrip_batch);
2919    }
2920
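    // `StructArray::new_unchecked` bypasses length validation, so child "b"
    // (3 rows) can disagree with the struct's declared length (4 rows); the
    // IPC readers must reject this when validating.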
2921    #[test]
2922    fn test_invalid_struct_array_ipc_read_errors() {
2923        let a_field = Field::new("a", DataType::Int32, false);
2924        let b_field = Field::new("b", DataType::Int32, false);
2925        let struct_fields = Fields::from(vec![a_field.clone(), b_field.clone()]);
2926
2927        let a_array_data = ArrayData::builder(a_field.data_type().clone())
2928            .len(4)
2929            .add_buffer(Buffer::from_slice_ref([1, 2, 3, 4]))
2930            .build()
2931            .unwrap();
2932        let b_array_data = ArrayData::builder(b_field.data_type().clone())
2933            .len(3)
2934            .add_buffer(Buffer::from_slice_ref([5, 6, 7]))
2935            .build()
2936            .unwrap();
2937
2938        let invalid_struct_arr = unsafe {
2939            StructArray::new_unchecked(
2940                struct_fields,
2941                vec![make_array(a_array_data), make_array(b_array_data)],
2942                None,
2943            )
2944        };
2945
2946        expect_ipc_validation_error(
2947            Arc::new(invalid_struct_arr),
2948            "Invalid argument error: Incorrect array length for StructArray field \"b\", expected 4 got 3",
2949        );
2950    }
2951
2952    #[test]
2953    fn test_invalid_nested_array_ipc_read_errors() {
2954        // one of the nested arrays has invalid data
2955        let a_field = Field::new("a", DataType::Int32, false);
2956        let b_field = Field::new("b", DataType::Utf8, false);
2957
2958        let schema = Arc::new(Schema::new(vec![Field::new_struct(
2959            "s",
2960            vec![a_field.clone(), b_field.clone()],
2961            false,
2962        )]));
2963
2964        let a_array_data = ArrayData::builder(a_field.data_type().clone())
2965            .len(4)
2966            .add_buffer(Buffer::from_slice_ref([1, 2, 3, 4]))
2967            .build()
2968            .unwrap();
2969        // invalid nested child array -- length is correct, but has invalid utf8 data
2970        let b_array_data = {
2971            let valid: &[u8] = b"   ";
2972            let mut invalid = vec![];
2973            invalid.extend_from_slice(b"ValidString");
2974            invalid.extend_from_slice(INVALID_UTF8_FIRST_CHAR);
2975            let binary_array =
2976                BinaryArray::from_iter(vec![None, Some(valid), None, Some(&invalid)]);
2977            let array = unsafe {
2978                StringArray::new_unchecked(
2979                    binary_array.offsets().clone(),
2980                    binary_array.values().clone(),
2981                    binary_array.nulls().cloned(),
2982                )
2983            };
2984            array.into_data()
2985        };
2986        let struct_data_type = schema.field(0).data_type();
2987
2988        let invalid_struct_arr = unsafe {
2989            make_array(
2990                ArrayData::builder(struct_data_type.clone())
2991                    .len(4)
2992                    .add_child_data(a_array_data)
2993                    .add_child_data(b_array_data)
2994                    .build_unchecked(),
2995            )
2996        };
2997        expect_ipc_validation_error(
2998            Arc::new(invalid_struct_arr),
2999            "Invalid argument error: Invalid UTF8 sequence at string index 3 (3..18): invalid utf-8 sequence of 1 bytes from index 11",
3000        );
3001    }
3002
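    // Both fields declare dictionary id 0 via the deprecated `Field::new_dict`
    // API; without id preservation the writer assigns its own dictionary ids,
    // so the stream roundtrips despite the collision.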
3003    #[test]
3004    fn test_same_dict_id_without_preserve() {
3005        let batch = RecordBatch::try_new(
3006            Arc::new(Schema::new(
3007                ["a", "b"]
3008                    .iter()
3009                    .map(|name| {
3010                        #[allow(deprecated)]
3011                        Field::new_dict(
3012                            name.to_string(),
3013                            DataType::Dictionary(
3014                                Box::new(DataType::Int32),
3015                                Box::new(DataType::Utf8),
3016                            ),
3017                            true,
3018                            0,
3019                            false,
3020                        )
3021                    })
3022                    .collect::<Vec<Field>>(),
3023            )),
3024            vec![
3025                Arc::new(
3026                    vec![Some("c"), Some("d")]
3027                        .into_iter()
3028                        .collect::<DictionaryArray<Int32Type>>(),
3029                ) as ArrayRef,
3030                Arc::new(
3031                    vec![Some("e"), Some("f")]
3032                        .into_iter()
3033                        .collect::<DictionaryArray<Int32Type>>(),
3034                ) as ArrayRef,
3035            ],
3036        )
3037        .expect("Failed to create RecordBatch");
3038
3039        // serialize the record batch as an IPC stream
3040        let mut buf = vec![];
3041        {
3042            let mut writer = crate::writer::StreamWriter::try_new_with_options(
3043                &mut buf,
3044                batch.schema().as_ref(),
3045                crate::writer::IpcWriteOptions::default(),
3046            )
3047            .expect("Failed to create StreamWriter");
3048            writer.write(&batch).expect("Failed to write RecordBatch");
3049            writer.finish().expect("Failed to finish StreamWriter");
3050        }
3051
3052        StreamReader::try_new(std::io::Cursor::new(buf), None)
3053            .expect("Failed to create StreamReader")
3054            .for_each(|decoded_batch| {
3055                assert_eq!(decoded_batch.expect("Failed to read RecordBatch"), batch);
3056            });
3057    }
3058
3059    #[test]
3060    fn test_validation_of_invalid_list_array() {
3061        // ListArray with invalid offsets
3062        let array = unsafe {
3063            let values = Int32Array::from(vec![1, 2, 3]);
3064            let bad_offsets = ScalarBuffer::<i32>::from(vec![0, 2, 4, 2]); // offsets can't go backwards
3065            let offsets = OffsetBuffer::new_unchecked(bad_offsets); // INVALID array created
3066            let field = Field::new_list_field(DataType::Int32, true);
3067            let nulls = None;
3068            ListArray::new(Arc::new(field), offsets, Arc::new(values), nulls)
3069        };
3070
3071        expect_ipc_validation_error(
3072            Arc::new(array),
3073            "Invalid argument error: Offset invariant failure: offset at position 2 out of bounds: 4 > 2",
3074        );
3075    }
3076
3077    #[test]
3078    fn test_validation_of_invalid_string_array() {
3079        let valid: &[u8] = b"   ";
3080        let mut invalid = vec![];
3081        invalid.extend_from_slice(b"ThisStringIsCertainlyLongerThan12Bytes");
3082        invalid.extend_from_slice(INVALID_UTF8_FIRST_CHAR);
3083        let binary_array = BinaryArray::from_iter(vec![None, Some(valid), None, Some(&invalid)]);
        // the data is not valid utf8, so we cannot safely construct a correct
        // StringArray; instead, purposely create an invalid one
3086        let array = unsafe {
3087            StringArray::new_unchecked(
3088                binary_array.offsets().clone(),
3089                binary_array.values().clone(),
3090                binary_array.nulls().cloned(),
3091            )
3092        };
3093        expect_ipc_validation_error(
3094            Arc::new(array),
3095            "Invalid argument error: Invalid UTF8 sequence at string index 3 (3..45): invalid utf-8 sequence of 1 bytes from index 38",
3096        );
3097    }
3098
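    // Views inline values of 12 bytes or fewer, so the invalid bytes below are
    // made longer than 12 bytes to force them into an out-of-line data buffer.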
3099    #[test]
3100    fn test_validation_of_invalid_string_view_array() {
3101        let valid: &[u8] = b"   ";
3102        let mut invalid = vec![];
3103        invalid.extend_from_slice(b"ThisStringIsCertainlyLongerThan12Bytes");
3104        invalid.extend_from_slice(INVALID_UTF8_FIRST_CHAR);
3105        let binary_view_array =
3106            BinaryViewArray::from_iter(vec![None, Some(valid), None, Some(&invalid)]);
        // the data is not valid utf8, so we cannot safely construct a correct
        // StringViewArray; instead, purposely create an invalid one
3109        let array = unsafe {
3110            StringViewArray::new_unchecked(
3111                binary_view_array.views().clone(),
3112                binary_view_array.data_buffers().to_vec(),
3113                binary_view_array.nulls().cloned(),
3114            )
3115        };
3116        expect_ipc_validation_error(
3117            Arc::new(array),
3118            "Invalid argument error: Encountered non-UTF-8 data at index 3: invalid utf-8 sequence of 1 bytes from index 38",
3119        );
3120    }
3121
    /// DictionaryArray with an invalid key (out of bounds for its values)
3124    #[test]
3125    fn test_validation_of_invalid_dictionary_array() {
3126        let array = unsafe {
3127            let values = StringArray::from_iter_values(["a", "b", "c"]);
3128            let keys = Int32Array::from(vec![1, 200]); // keys are not valid for values
3129            DictionaryArray::new_unchecked(keys, Arc::new(values))
3130        };
3131
3132        expect_ipc_validation_error(
3133            Arc::new(array),
3134            "Invalid argument error: Value at position 1 out of bounds: 200 (should be in [0, 2])",
3135        );
3136    }
3137
3138    #[test]
3139    fn test_validation_of_invalid_union_array() {
3140        let array = unsafe {
3141            let fields = UnionFields::try_new(
                vec![1, 3], // registered type ids; type id 2 is not among them
3143                vec![
3144                    Field::new("a", DataType::Int32, false),
3145                    Field::new("b", DataType::Utf8, false),
3146                ],
3147            )
3148            .unwrap();
3149            let type_ids = ScalarBuffer::from(vec![1i8, 2, 3]); // 2 is invalid
3150            let offsets = None;
3151            let children: Vec<ArrayRef> = vec![
3152                Arc::new(Int32Array::from(vec![10, 20, 30])),
3153                Arc::new(StringArray::from(vec![Some("a"), Some("b"), Some("c")])),
3154            ];
3155
3156            UnionArray::new_unchecked(fields, type_ids, offsets, children)
3157        };
3158
3159        expect_ipc_validation_error(
3160            Arc::new(array),
3161            "Invalid argument error: Type Ids values must match one of the field type ids",
3162        );
3163    }
3164
    /// Invalid UTF-8 sequence in the first character
3166    /// <https://stackoverflow.com/questions/1301402/example-invalid-utf8-string>
3167    const INVALID_UTF8_FIRST_CHAR: &[u8] = &[0xa0, 0xa1, 0x20, 0x20];
3168
    /// Writes `array` as a single-column RecordBatch and asserts that reading
    /// it back fails with `expected_err` via the IPC stream format, the IPC
    /// file format, and the `FileDecoder` API, while succeeding in all three
    /// when validation is skipped
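    ///
    /// A minimal usage sketch (`make_invalid_array` is a hypothetical stand-in
    /// for the `unsafe` constructors used in the tests above):
    ///
    /// ```ignore
    /// let array: ArrayRef = make_invalid_array();
    /// expect_ipc_validation_error(array, "Invalid argument error: ...");
    /// ```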
3170    fn expect_ipc_validation_error(array: ArrayRef, expected_err: &str) {
3171        let rb = RecordBatch::try_from_iter([("a", array)]).unwrap();
3172
3173        // IPC Stream format
3174        let buf = write_stream(&rb); // write is ok
3175        read_stream_skip_validation(&buf).unwrap();
3176        let err = read_stream(&buf).unwrap_err();
3177        assert_eq!(err.to_string(), expected_err);
3178
3179        // IPC File format
3180        let buf = write_ipc(&rb); // write is ok
3181        read_ipc_skip_validation(&buf).unwrap();
3182        let err = read_ipc(&buf).unwrap_err();
3183        assert_eq!(err.to_string(), expected_err);
3184
3185        // IPC Format with FileDecoder
3186        read_ipc_with_decoder_skip_validation(buf.clone()).unwrap();
3187        let err = read_ipc_with_decoder(buf).unwrap_err();
3188        assert_eq!(err.to_string(), expected_err);
3189    }
3190
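    // Serializes a Schema into an encapsulated IPC message and parses it back
    // with `parse_message` / `fb_to_schema`.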
3191    #[test]
3192    fn test_roundtrip_schema() {
3193        let schema = Schema::new(vec![
3194            Field::new(
3195                "a",
3196                DataType::Dictionary(Box::new(DataType::UInt16), Box::new(DataType::Utf8)),
3197                false,
3198            ),
3199            Field::new(
3200                "b",
3201                DataType::Dictionary(Box::new(DataType::UInt16), Box::new(DataType::Utf8)),
3202                false,
3203            ),
3204        ]);
3205
3206        let options = IpcWriteOptions::default();
3207        let data_gen = IpcDataGenerator::default();
3208        let mut dict_tracker = DictionaryTracker::new(false);
3209        let encoded_data =
3210            data_gen.schema_to_bytes_with_dictionary_tracker(&schema, &mut dict_tracker, &options);
3211        let mut schema_bytes = vec![];
3212        write_message(&mut schema_bytes, encoded_data, &options).expect("write_message");
3213
3214        let begin_offset: usize = if schema_bytes[0..4].eq(&CONTINUATION_MARKER) {
3215            4
3216        } else {
3217            0
3218        };
3219
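        // sanity check: the encapsulated framing must not parse as a
        // size-prefixed flatbuffer message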
3220        size_prefixed_root_as_message(&schema_bytes[begin_offset..])
3221            .expect_err("size_prefixed_root_as_message");
3222
3223        let msg = parse_message(&schema_bytes).expect("parse_message");
3224        let ipc_schema = msg.header_as_schema().expect("header_as_schema");
3225        let new_schema = fb_to_schema(ipc_schema);
3226
3227        assert_eq!(schema, new_schema);
3228    }
3229
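    // A continuation marker followed by a negative metadata length (-1) must
    // be rejected when constructing the StreamReader.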
3230    #[test]
3231    fn test_negative_meta_len() {
3232        let bytes = i32::to_le_bytes(-1);
3233        let mut buf = vec![];
3234        buf.extend(CONTINUATION_MARKER);
3235        buf.extend(bytes);
3236
3237        let reader = StreamReader::try_new(Cursor::new(buf), None);
3238        assert!(reader.is_err());
3239    }
3240}