// arrow_ipc/reader.rs

1// Licensed to the Apache Software Foundation (ASF) under one
2// or more contributor license agreements.  See the NOTICE file
3// distributed with this work for additional information
4// regarding copyright ownership.  The ASF licenses this file
5// to you under the Apache License, Version 2.0 (the
6// "License"); you may not use this file except in compliance
7// with the License.  You may obtain a copy of the License at
8//
9//   http://www.apache.org/licenses/LICENSE-2.0
10//
11// Unless required by applicable law or agreed to in writing,
12// software distributed under the License is distributed on an
13// "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
14// KIND, either express or implied.  See the License for the
15// specific language governing permissions and limitations
16// under the License.
17
18//! Arrow IPC File and Stream Readers
19//!
20//! # Notes
21//!
22//! The [`FileReader`] and [`StreamReader`] have similar interfaces,
23//! however the [`FileReader`] expects a reader that supports [`Seek`]ing
24//!
25//! [`Seek`]: std::io::Seek
26
27mod stream;
28pub use stream::*;
29
30use arrow_select::concat;
31
32use flatbuffers::{VectorIter, VerifierOptions};
33use std::collections::{HashMap, VecDeque};
34use std::fmt;
35use std::io::{BufReader, Read, Seek, SeekFrom};
36use std::sync::Arc;
37
38use arrow_array::*;
39use arrow_buffer::{
40    ArrowNativeType, BooleanBuffer, Buffer, MutableBuffer, NullBuffer, ScalarBuffer,
41};
42use arrow_data::{ArrayData, ArrayDataBuilder, UnsafeFlag};
43use arrow_schema::*;
44
45use crate::compression::{CompressionCodec, DecompressionContext};
46use crate::r#gen::Message::{self};
47use crate::{Block, CONTINUATION_MARKER, FieldNode, MetadataVersion};
48use DataType::*;
49
50/// Read a buffer based on offset and length
51/// From <https://github.com/apache/arrow/blob/6a936c4ff5007045e86f65f1a6b6c3c955ad5103/format/Message.fbs#L58>
52/// Each constituent buffer is first compressed with the indicated
53/// compressor, and then written with the uncompressed length in the first 8
54/// bytes as a 64-bit little-endian signed integer followed by the compressed
55/// buffer bytes (and then padding as required by the protocol). The
56/// uncompressed length may be set to -1 to indicate that the data that
57/// follows is not compressed, which can be useful for cases where
58/// compression does not yield appreciable savings.
59fn read_buffer(
60    buf: &crate::Buffer,
61    a_data: &Buffer,
62    compression_codec: Option<CompressionCodec>,
63    decompression_context: &mut DecompressionContext,
64) -> Result<Buffer, ArrowError> {
65    let start_offset = buf.offset() as usize;
66    let buf_data = a_data.slice_with_length(start_offset, buf.length() as usize);
67    // corner case: empty buffer
68    match (buf_data.is_empty(), compression_codec) {
69        (true, _) | (_, None) => Ok(buf_data),
70        (false, Some(decompressor)) => {
71            decompressor.decompress_to_buffer(&buf_data, decompression_context)
72        }
73    }
74}
impl RecordBatchDecoder<'_> {
    /// Coordinates reading arrays based on data types.
    ///
    /// `variadic_counts` encodes the number of buffers to read for variadic types (e.g., Utf8View, BinaryView)
    /// When encounter such types, we pop from the front of the queue to get the number of buffers to read.
    ///
    /// Notes:
    /// * In the IPC format, null buffers are always set, but may be empty. We discard them if an array has 0 nulls
    /// * Numeric values inside list arrays are often stored as 64-bit values regardless of their data type size.
    ///   We thus:
    ///     - check if the bit width of non-64-bit numbers is 64, and
    ///     - read the buffer as 64-bit (signed integer or float), and
    ///     - cast the 64-bit array to the appropriate data type
    fn create_array(
        &mut self,
        field: &Field,
        variadic_counts: &mut VecDeque<i64>,
    ) -> Result<ArrayRef, ArrowError> {
        let data_type = field.data_type();
        match data_type {
            Utf8 | Binary | LargeBinary | LargeUtf8 => {
                let field_node = self.next_node(field)?;
                // 3 buffers: validity, offsets, values
                let buffers = [
                    self.next_buffer()?,
                    self.next_buffer()?,
                    self.next_buffer()?,
                ];
                self.create_primitive_array(field_node, data_type, &buffers)
            }
            BinaryView | Utf8View => {
                // The number of data buffers is variadic; the count comes from
                // the RecordBatch metadata (see `variadicBufferCounts`)
                let count = variadic_counts
                    .pop_front()
                    .ok_or(ArrowError::IpcError(format!(
                        "Missing variadic count for {data_type} column"
                    )))?;
                let count = count + 2; // view and null buffer.
                let buffers = (0..count)
                    .map(|_| self.next_buffer())
                    .collect::<Result<Vec<_>, _>>()?;
                let field_node = self.next_node(field)?;
                self.create_primitive_array(field_node, data_type, &buffers)
            }
            FixedSizeBinary(_) => {
                let field_node = self.next_node(field)?;
                // 2 buffers: validity, values
                let buffers = [self.next_buffer()?, self.next_buffer()?];
                self.create_primitive_array(field_node, data_type, &buffers)
            }
            List(list_field) | LargeList(list_field) | Map(list_field, _) => {
                let list_node = self.next_node(field)?;
                // 2 buffers: validity, offsets; children are decoded recursively
                let list_buffers = [self.next_buffer()?, self.next_buffer()?];
                let values = self.create_array(list_field, variadic_counts)?;
                self.create_list_array(list_node, data_type, &list_buffers, values)
            }
            ListView(list_field) | LargeListView(list_field) => {
                let list_node = self.next_node(field)?;
                let list_buffers = [
                    self.next_buffer()?, // null buffer
                    self.next_buffer()?, // offsets
                    self.next_buffer()?, // sizes
                ];
                let values = self.create_array(list_field, variadic_counts)?;
                self.create_list_view_array(list_node, data_type, &list_buffers, values)
            }
            FixedSizeList(list_field, _) => {
                let list_node = self.next_node(field)?;
                // 1 buffer: validity only (no offsets for fixed-size lists)
                let list_buffers = [self.next_buffer()?];
                let values = self.create_array(list_field, variadic_counts)?;
                self.create_list_array(list_node, data_type, &list_buffers, values)
            }
            Struct(struct_fields) => {
                let struct_node = self.next_node(field)?;
                let null_buffer = self.next_buffer()?;

                // read the arrays for each field
                let mut struct_arrays = vec![];
                // TODO investigate whether just knowing the number of buffers could
                // still work
                for struct_field in struct_fields {
                    let child = self.create_array(struct_field, variadic_counts)?;
                    struct_arrays.push(child);
                }
                self.create_struct_array(struct_node, null_buffer, struct_fields, struct_arrays)
            }
            RunEndEncoded(run_ends_field, values_field) => {
                // REE arrays have no buffers of their own: just two child arrays
                let run_node = self.next_node(field)?;
                let run_ends = self.create_array(run_ends_field, variadic_counts)?;
                let values = self.create_array(values_field, variadic_counts)?;

                let run_array_length = run_node.length() as usize;
                let builder = ArrayData::builder(data_type.clone())
                    .len(run_array_length)
                    .offset(0)
                    .add_child_data(run_ends.into_data())
                    .add_child_data(values.into_data())
                    .null_count(run_node.null_count() as usize);

                self.create_array_from_builder(builder)
            }
            // Create dictionary array from RecordBatch
            Dictionary(_, _) => {
                let index_node = self.next_node(field)?;
                // 2 buffers: validity, dictionary keys (indices)
                let index_buffers = [self.next_buffer()?, self.next_buffer()?];

                #[allow(deprecated)]
                let dict_id = field.dict_id().ok_or_else(|| {
                    ArrowError::ParseError(format!("Field {field} does not have dict id"))
                })?;

                let value_array = match self.dictionaries_by_id.get(&dict_id) {
                    Some(array) => array.clone(),
                    None => {
                        // Per the IPC spec, dictionary batches may be omitted when all
                        // values in the column are null. In that case we synthesize an
                        // empty values array so decoding can proceed.
                        if let Dictionary(_, value_type) = data_type {
                            arrow_array::new_empty_array(value_type.as_ref())
                        } else {
                            unreachable!()
                        }
                    }
                };

                self.create_dictionary_array(index_node, data_type, &index_buffers, value_array)
            }
            Union(fields, mode) => {
                let union_node = self.next_node(field)?;
                let len = union_node.length() as usize;

                // In V4, union types has validity bitmap
                // In V5 and later, union types have no validity bitmap
                if self.version < MetadataVersion::V5 {
                    self.next_buffer()?;
                }

                let type_ids: ScalarBuffer<i8> =
                    self.next_buffer()?.slice_with_length(0, len).into();

                // Only dense unions carry a 32-bit offsets buffer
                let value_offsets = match mode {
                    UnionMode::Dense => {
                        let offsets: ScalarBuffer<i32> =
                            self.next_buffer()?.slice_with_length(0, len * 4).into();
                        Some(offsets)
                    }
                    UnionMode::Sparse => None,
                };

                let mut children = Vec::with_capacity(fields.len());

                for (_id, field) in fields.iter() {
                    let child = self.create_array(field, variadic_counts)?;
                    children.push(child);
                }

                let array = if self.skip_validation.get() {
                    // safety: flag can only be set via unsafe code
                    unsafe {
                        UnionArray::new_unchecked(fields.clone(), type_ids, value_offsets, children)
                    }
                } else {
                    UnionArray::try_new(fields.clone(), type_ids, value_offsets, children)?
                };
                Ok(Arc::new(array))
            }
            Null => {
                // Null arrays have no buffers; only a node with length == null_count
                let node = self.next_node(field)?;
                let length = node.length();
                let null_count = node.null_count();

                if length != null_count {
                    return Err(ArrowError::SchemaError(format!(
                        "Field {field} of NullArray has unequal null_count {null_count} and len {length}"
                    )));
                }

                let builder = ArrayData::builder(data_type.clone())
                    .len(length as usize)
                    .offset(0);
                self.create_array_from_builder(builder)
            }
            _ => {
                // All remaining (primitive / fixed-width) types: validity + values
                let field_node = self.next_node(field)?;
                let buffers = [self.next_buffer()?, self.next_buffer()?];
                self.create_primitive_array(field_node, data_type, &buffers)
            }
        }
    }

    /// Reads the correct number of buffers based on data type and null_count, and creates a
    /// primitive array ref
    fn create_primitive_array(
        &self,
        field_node: &FieldNode,
        data_type: &DataType,
        buffers: &[Buffer],
    ) -> Result<ArrayRef, ArrowError> {
        let length = field_node.length() as usize;
        // Drop the validity buffer entirely when the array has no nulls
        let null_buffer = (field_node.null_count() > 0).then_some(buffers[0].clone());
        let mut builder = match data_type {
            Utf8 | Binary | LargeBinary | LargeUtf8 => {
                // read 3 buffers: null buffer (optional), offsets buffer and data buffer
                ArrayData::builder(data_type.clone())
                    .len(length)
                    .buffers(buffers[1..3].to_vec())
                    .null_bit_buffer(null_buffer)
            }
            BinaryView | Utf8View => ArrayData::builder(data_type.clone())
                .len(length)
                .buffers(buffers[1..].to_vec())
                .null_bit_buffer(null_buffer),
            _ if data_type.is_primitive() || matches!(data_type, Boolean | FixedSizeBinary(_)) => {
                // read 2 buffers: null buffer (optional) and data buffer
                ArrayData::builder(data_type.clone())
                    .len(length)
                    .add_buffer(buffers[1].clone())
                    .null_bit_buffer(null_buffer)
            }
            t => unreachable!("Data type {:?} either unsupported or not primitive", t),
        };

        builder = builder.null_count(field_node.null_count() as usize);

        self.create_array_from_builder(builder)
    }

    /// Update the ArrayDataBuilder based on settings in this decoder
    fn create_array_from_builder(&self, builder: ArrayDataBuilder) -> Result<ArrayRef, ArrowError> {
        // If alignment is not required, builders may copy data to aligned buffers
        let mut builder = builder.align_buffers(!self.require_alignment);
        if self.skip_validation.get() {
            // SAFETY: flag can only be set via unsafe code
            unsafe { builder = builder.skip_validation(true) }
        };
        Ok(make_array(builder.build()?))
    }

    /// Reads the correct number of buffers based on list type and null_count, and creates a
    /// list array ref
    fn create_list_array(
        &self,
        field_node: &FieldNode,
        data_type: &DataType,
        buffers: &[Buffer],
        child_array: ArrayRef,
    ) -> Result<ArrayRef, ArrowError> {
        let null_buffer = (field_node.null_count() > 0).then_some(buffers[0].clone());
        let length = field_node.length() as usize;
        let child_data = child_array.into_data();
        let mut builder = match data_type {
            // Variable-size lists and maps carry an offsets buffer
            List(_) | LargeList(_) | Map(_, _) => ArrayData::builder(data_type.clone())
                .len(length)
                .add_buffer(buffers[1].clone())
                .add_child_data(child_data)
                .null_bit_buffer(null_buffer),

            // Fixed-size lists carry no offsets buffer
            FixedSizeList(_, _) => ArrayData::builder(data_type.clone())
                .len(length)
                .add_child_data(child_data)
                .null_bit_buffer(null_buffer),

            _ => unreachable!("Cannot create list or map array from {:?}", data_type),
        };

        builder = builder.null_count(field_node.null_count() as usize);

        self.create_array_from_builder(builder)
    }

    /// Creates a [`DataType::ListView`] or [`DataType::LargeListView`] array from
    /// its validity, offsets and sizes buffers plus the decoded child array
    fn create_list_view_array(
        &self,
        field_node: &FieldNode,
        data_type: &DataType,
        buffers: &[Buffer],
        child_array: ArrayRef,
    ) -> Result<ArrayRef, ArrowError> {
        assert!(matches!(data_type, ListView(_) | LargeListView(_)));

        let null_buffer = (field_node.null_count() > 0).then_some(buffers[0].clone());
        let length = field_node.length() as usize;
        let child_data = child_array.into_data();

        self.create_array_from_builder(
            ArrayData::builder(data_type.clone())
                .len(length)
                .add_buffer(buffers[1].clone()) // offsets
                .add_buffer(buffers[2].clone()) // sizes
                .add_child_data(child_data)
                .null_bit_buffer(null_buffer)
                .null_count(field_node.null_count() as usize),
        )
    }

    /// Builds a `StructArray` from its validity buffer and already-decoded child
    /// arrays. Unless validation is skipped, the declared null count is checked
    /// against the actual number of unset validity bits.
    fn create_struct_array(
        &self,
        struct_node: &FieldNode,
        null_buffer: Buffer,
        struct_fields: &Fields,
        struct_arrays: Vec<ArrayRef>,
    ) -> Result<ArrayRef, ArrowError> {
        let null_count = struct_node.null_count() as usize;
        let len = struct_node.length() as usize;
        let skip_validation = self.skip_validation.get();

        let nulls = if null_count > 0 {
            let validity_buffer = BooleanBuffer::new(null_buffer, 0, len);
            let null_buffer = if skip_validation {
                // safety: flag can only be set via unsafe code
                unsafe { NullBuffer::new_unchecked(validity_buffer, null_count) }
            } else {
                let null_buffer = NullBuffer::new(validity_buffer);

                if null_buffer.null_count() != null_count {
                    return Err(ArrowError::InvalidArgumentError(format!(
                        "null_count value ({}) doesn't match actual number of nulls in array ({})",
                        null_count,
                        null_buffer.null_count()
                    )));
                }

                null_buffer
            };

            Some(null_buffer)
        } else {
            None
        };
        if struct_arrays.is_empty() {
            // `StructArray::from` can't infer the correct row count
            // if we have zero fields
            return Ok(Arc::new(StructArray::new_empty_fields(len, nulls)));
        }

        let struct_array = if skip_validation {
            // safety: flag can only be set via unsafe code
            unsafe { StructArray::new_unchecked(struct_fields.clone(), struct_arrays, nulls) }
        } else {
            StructArray::try_new(struct_fields.clone(), struct_arrays, nulls)?
        };

        Ok(Arc::new(struct_array))
    }

    /// Reads the correct number of buffers based on data type and null_count, and creates a
    /// dictionary array ref from the keys buffers and a previously-decoded values array
    fn create_dictionary_array(
        &self,
        field_node: &FieldNode,
        data_type: &DataType,
        buffers: &[Buffer],
        value_array: ArrayRef,
    ) -> Result<ArrayRef, ArrowError> {
        if let Dictionary(_, _) = *data_type {
            let null_buffer = (field_node.null_count() > 0).then_some(buffers[0].clone());
            let builder = ArrayData::builder(data_type.clone())
                .len(field_node.length() as usize)
                .add_buffer(buffers[1].clone()) // keys (indices)
                .add_child_data(value_array.into_data())
                .null_bit_buffer(null_buffer)
                .null_count(field_node.null_count() as usize);
            self.create_array_from_builder(builder)
        } else {
            unreachable!("Cannot create dictionary array from {:?}", data_type)
        }
    }
}
438
/// State for decoding Arrow arrays from an [IPC RecordBatch] structure to
/// [`RecordBatch`]
///
/// [IPC RecordBatch]: crate::RecordBatch
///
pub struct RecordBatchDecoder<'a> {
    /// The flatbuffers encoded record batch
    batch: crate::RecordBatch<'a>,
    /// The output schema
    schema: SchemaRef,
    /// Decoded dictionaries indexed by dictionary id
    dictionaries_by_id: &'a HashMap<i64, ArrayRef>,
    /// Optional compression codec
    compression: Option<CompressionCodec>,
    /// Decompression context for reusing zstd decompressor state
    decompression_context: DecompressionContext,
    /// The format version
    version: MetadataVersion,
    /// The raw data buffer
    data: &'a Buffer,
    /// The fields comprising this array
    ///
    /// Consumed in lockstep with `buffers` as arrays are decoded
    nodes: VectorIter<'a, FieldNode>,
    /// The buffers comprising this array
    buffers: VectorIter<'a, crate::Buffer>,
    /// Projection (subset of columns) to read, if any
    /// See [`RecordBatchDecoder::with_projection`] for details
    projection: Option<&'a [usize]>,
    /// Are buffers required to already be aligned? See
    /// [`RecordBatchDecoder::with_require_alignment`] for details
    require_alignment: bool,
    /// Should validation be skipped when reading data? Defaults to false.
    ///
    /// See [`FileDecoder::with_skip_validation`] for details.
    skip_validation: UnsafeFlag,
}
474
impl<'a> RecordBatchDecoder<'a> {
    /// Create a reader for decoding arrays from an encoded [`RecordBatch`]
    ///
    /// Returns an error if the flatbuffer message is missing its buffer or
    /// field node vectors, or declares an unsupported compression codec.
    fn try_new(
        buf: &'a Buffer,
        batch: crate::RecordBatch<'a>,
        schema: SchemaRef,
        dictionaries_by_id: &'a HashMap<i64, ArrayRef>,
        metadata: &'a MetadataVersion,
    ) -> Result<Self, ArrowError> {
        let buffers = batch.buffers().ok_or_else(|| {
            ArrowError::IpcError("Unable to get buffers from IPC RecordBatch".to_string())
        })?;
        let field_nodes = batch.nodes().ok_or_else(|| {
            ArrowError::IpcError("Unable to get field nodes from IPC RecordBatch".to_string())
        })?;

        // `try_into` fails for codecs this build does not support
        let batch_compression = batch.compression();
        let compression = batch_compression
            .map(|batch_compression| batch_compression.codec().try_into())
            .transpose()?;

        Ok(Self {
            batch,
            schema,
            dictionaries_by_id,
            compression,
            decompression_context: DecompressionContext::new(),
            version: *metadata,
            data: buf,
            nodes: field_nodes.iter(),
            buffers: buffers.iter(),
            projection: None,
            require_alignment: false,
            skip_validation: UnsafeFlag::new(),
        })
    }

    /// Set the projection (default: None)
    ///
    /// If set, the projection is the list  of column indices
    /// that will be read
    pub fn with_projection(mut self, projection: Option<&'a [usize]>) -> Self {
        self.projection = projection;
        self
    }

    /// Set require_alignment (default: false)
    ///
    /// If true, buffers must be aligned appropriately or error will
    /// result. If false, buffers will be copied to aligned buffers
    /// if necessary.
    pub fn with_require_alignment(mut self, require_alignment: bool) -> Self {
        self.require_alignment = require_alignment;
        self
    }

    /// Specifies if validation should be skipped when reading data (defaults to `false`)
    ///
    /// Note this API is somewhat "funky" as it allows the caller to skip validation
    /// without having to use `unsafe` code. If this is ever made public
    /// it should be made clearer that this is a potentially unsafe by
    /// using an `unsafe` function that takes a boolean flag.
    ///
    /// # Safety
    ///
    /// Relies on the caller only passing a flag with `true` value if they are
    /// certain that the data is valid
    pub(crate) fn with_skip_validation(mut self, skip_validation: UnsafeFlag) -> Self {
        self.skip_validation = skip_validation;
        self
    }

    /// Read the record batch, consuming the reader
    fn read_record_batch(mut self) -> Result<RecordBatch, ArrowError> {
        // One entry per variadic-width column (e.g. Utf8View), in field order
        let mut variadic_counts: VecDeque<i64> = self
            .batch
            .variadicBufferCounts()
            .into_iter()
            .flatten()
            .collect();

        // Explicit row count handles the zero-column batch case
        let options = RecordBatchOptions::new().with_row_count(Some(self.batch.length() as usize));

        let schema = Arc::clone(&self.schema);
        if let Some(projection) = self.projection {
            let mut arrays = vec![];
            // project fields
            for (idx, field) in schema.fields().iter().enumerate() {
                // Create array for projected field
                if let Some(proj_idx) = projection.iter().position(|p| p == &idx) {
                    let child = self.create_array(field, &mut variadic_counts)?;
                    arrays.push((proj_idx, child));
                } else {
                    // Non-projected fields still consume their nodes/buffers
                    self.skip_field(field, &mut variadic_counts)?;
                }
            }

            // Restore the order requested by the projection
            arrays.sort_by_key(|t| t.0);

            let schema = Arc::new(schema.project(projection)?);
            let columns = arrays.into_iter().map(|t| t.1).collect::<Vec<_>>();

            if self.skip_validation.get() {
                // Safety: setting `skip_validation` requires `unsafe`, user assures data is valid
                unsafe {
                    Ok(RecordBatch::new_unchecked(
                        schema,
                        columns,
                        self.batch.length() as usize,
                    ))
                }
            } else {
                assert!(variadic_counts.is_empty());
                RecordBatch::try_new_with_options(schema, columns, &options)
            }
        } else {
            let mut children = vec![];
            // keep track of index as lists require more than one node
            for field in schema.fields() {
                let child = self.create_array(field, &mut variadic_counts)?;
                children.push(child);
            }

            if self.skip_validation.get() {
                // Safety: setting `skip_validation` requires `unsafe`, user assures data is valid
                unsafe {
                    Ok(RecordBatch::new_unchecked(
                        schema,
                        children,
                        self.batch.length() as usize,
                    ))
                }
            } else {
                assert!(variadic_counts.is_empty());
                RecordBatch::try_new_with_options(schema, children, &options)
            }
        }
    }

    /// Return the next IPC buffer, sliced from the message body and
    /// decompressed if the batch declares a compression codec
    fn next_buffer(&mut self) -> Result<Buffer, ArrowError> {
        let buffer = self.buffers.next().ok_or_else(|| {
            ArrowError::IpcError("Buffer count mismatched with metadata".to_string())
        })?;
        read_buffer(
            buffer,
            self.data,
            self.compression,
            &mut self.decompression_context,
        )
    }

    /// Advance past the next buffer without reading it
    ///
    /// NOTE(review): this panics when the metadata declares fewer buffers
    /// than the schema implies — consider returning an error as
    /// `next_buffer` does; confirm callers cannot reach this with
    /// malformed input.
    fn skip_buffer(&mut self) {
        self.buffers.next().unwrap();
    }

    /// Return the next field node, or an error if the metadata ran out of nodes
    fn next_node(&mut self, field: &Field) -> Result<&'a FieldNode, ArrowError> {
        self.nodes.next().ok_or_else(|| {
            ArrowError::SchemaError(format!(
                "Invalid data for schema. {field} refers to node not found in schema",
            ))
        })
    }

    /// Consume the nodes, buffers and variadic counts for `field` without
    /// decoding an array; must mirror `create_array`'s consumption exactly
    fn skip_field(
        &mut self,
        field: &Field,
        variadic_count: &mut VecDeque<i64>,
    ) -> Result<(), ArrowError> {
        self.next_node(field)?;

        match field.data_type() {
            Utf8 | Binary | LargeBinary | LargeUtf8 => {
                // validity, offsets, values
                for _ in 0..3 {
                    self.skip_buffer()
                }
            }
            Utf8View | BinaryView => {
                let count = variadic_count
                    .pop_front()
                    .ok_or(ArrowError::IpcError(format!(
                        "Missing variadic count for {} column",
                        field.data_type()
                    )))?;
                let count = count + 2; // view and null buffer.
                for _i in 0..count {
                    self.skip_buffer()
                }
            }
            FixedSizeBinary(_) => {
                self.skip_buffer();
                self.skip_buffer();
            }
            List(list_field) | LargeList(list_field) | Map(list_field, _) => {
                self.skip_buffer();
                self.skip_buffer();
                self.skip_field(list_field, variadic_count)?;
            }
            ListView(list_field) | LargeListView(list_field) => {
                self.skip_buffer(); // Null buffer
                self.skip_buffer(); // Offsets
                self.skip_buffer(); // Sizes
                self.skip_field(list_field, variadic_count)?;
            }
            FixedSizeList(list_field, _) => {
                self.skip_buffer();
                self.skip_field(list_field, variadic_count)?;
            }
            Struct(struct_fields) => {
                self.skip_buffer();

                // skip for each field
                for struct_field in struct_fields {
                    self.skip_field(struct_field, variadic_count)?
                }
            }
            RunEndEncoded(run_ends_field, values_field) => {
                self.skip_field(run_ends_field, variadic_count)?;
                self.skip_field(values_field, variadic_count)?;
            }
            Dictionary(_, _) => {
                self.skip_buffer(); // Nulls
                self.skip_buffer(); // Indices
            }
            Union(fields, mode) => {
                // NOTE(review): `create_array` only reads a validity buffer for
                // unions when `version < V5`, but this unconditionally skips one
                // (type ids buffer) plus a dense offsets buffer — verify the
                // skip/read buffer counts stay in sync for V5+ unions.
                self.skip_buffer(); // Nulls

                match mode {
                    UnionMode::Dense => self.skip_buffer(),
                    UnionMode::Sparse => {}
                };

                for (_, field) in fields.iter() {
                    self.skip_field(field, variadic_count)?
                }
            }
            // Null has no buffers to skip
            Null => {}

            // Fixed-width and boolean types: skip null buffer + values buffer
            Boolean
            | Int8
            | Int16
            | Int32
            | Int64
            | UInt8
            | UInt16
            | UInt32
            | UInt64
            | Float16
            | Float32
            | Float64
            | Timestamp(_, _)
            | Date32
            | Date64
            | Time32(_)
            | Time64(_)
            | Duration(_)
            | Interval(_)
            | Decimal32(_, _)
            | Decimal64(_, _)
            | Decimal128(_, _)
            | Decimal256(_, _) => {
                self.skip_buffer();
                self.skip_buffer();
            }
        };
        Ok(())
    }
}
744
745/// Creates a record batch from binary data using the `crate::RecordBatch` indexes and the `Schema`.
746///
747/// If `require_alignment` is true, this function will return an error if any array data in the
748/// input `buf` is not properly aligned.
749/// Under the hood it will use [`arrow_data::ArrayDataBuilder::build`] to construct [`arrow_data::ArrayData`].
750///
751/// If `require_alignment` is false, this function will automatically allocate a new aligned buffer
752/// and copy over the data if any array data in the input `buf` is not properly aligned.
753/// (Properly aligned array data will remain zero-copy.)
754/// Under the hood it will use [`arrow_data::ArrayDataBuilder::build_aligned`] to construct [`arrow_data::ArrayData`].
755pub fn read_record_batch(
756    buf: &Buffer,
757    batch: crate::RecordBatch,
758    schema: SchemaRef,
759    dictionaries_by_id: &HashMap<i64, ArrayRef>,
760    projection: Option<&[usize]>,
761    metadata: &MetadataVersion,
762) -> Result<RecordBatch, ArrowError> {
763    RecordBatchDecoder::try_new(buf, batch, schema, dictionaries_by_id, metadata)?
764        .with_projection(projection)
765        .with_require_alignment(false)
766        .read_record_batch()
767}
768
769/// Read the dictionary from the buffer and provided metadata,
770/// updating the `dictionaries_by_id` with the resulting dictionary
771pub fn read_dictionary(
772    buf: &Buffer,
773    batch: crate::DictionaryBatch,
774    schema: &Schema,
775    dictionaries_by_id: &mut HashMap<i64, ArrayRef>,
776    metadata: &MetadataVersion,
777) -> Result<(), ArrowError> {
778    read_dictionary_impl(
779        buf,
780        batch,
781        schema,
782        dictionaries_by_id,
783        metadata,
784        false,
785        UnsafeFlag::new(),
786    )
787}
788
789fn read_dictionary_impl(
790    buf: &Buffer,
791    batch: crate::DictionaryBatch,
792    schema: &Schema,
793    dictionaries_by_id: &mut HashMap<i64, ArrayRef>,
794    metadata: &MetadataVersion,
795    require_alignment: bool,
796    skip_validation: UnsafeFlag,
797) -> Result<(), ArrowError> {
798    let id = batch.id();
799
800    let dictionary_values = get_dictionary_values(
801        buf,
802        batch,
803        schema,
804        dictionaries_by_id,
805        metadata,
806        require_alignment,
807        skip_validation,
808    )?;
809
810    update_dictionaries(dictionaries_by_id, batch.isDelta(), id, dictionary_values)?;
811
812    Ok(())
813}
814
815/// Updates the `dictionaries_by_id` with the provided dictionary values and id.
816///
817/// # Errors
818/// - If `is_delta` is true and there is no existing dictionary for the given
819///   `dict_id`
820/// - If `is_delta` is true and the concatenation of the existing and new
821///   dictionary fails. This usually signals a type mismatch between the old and
822///   new values.
823fn update_dictionaries(
824    dictionaries_by_id: &mut HashMap<i64, ArrayRef>,
825    is_delta: bool,
826    dict_id: i64,
827    dict_values: ArrayRef,
828) -> Result<(), ArrowError> {
829    if !is_delta {
830        // We don't currently record the isOrdered field. This could be general
831        // attributes of arrays.
832        // Add (possibly multiple) array refs to the dictionaries array.
833        dictionaries_by_id.insert(dict_id, dict_values.clone());
834        return Ok(());
835    }
836
837    let existing = dictionaries_by_id.get(&dict_id).ok_or_else(|| {
838        ArrowError::InvalidArgumentError(format!(
839            "No existing dictionary for delta dictionary with id '{dict_id}'"
840        ))
841    })?;
842
843    let combined = concat::concat(&[existing, &dict_values]).map_err(|e| {
844        ArrowError::InvalidArgumentError(format!("Failed to concat delta dictionary: {e}"))
845    })?;
846
847    dictionaries_by_id.insert(dict_id, combined);
848
849    Ok(())
850}
851
852/// Given a dictionary batch IPC message/body along with the full state of a
853/// stream including schema, dictionary cache, metadata, and other flags, this
854/// function will parse the buffer into an array of dictionary values.
855fn get_dictionary_values(
856    buf: &Buffer,
857    batch: crate::DictionaryBatch,
858    schema: &Schema,
859    dictionaries_by_id: &mut HashMap<i64, ArrayRef>,
860    metadata: &MetadataVersion,
861    require_alignment: bool,
862    skip_validation: UnsafeFlag,
863) -> Result<ArrayRef, ArrowError> {
864    let id = batch.id();
865    #[allow(deprecated)]
866    let fields_using_this_dictionary = schema.fields_with_dict_id(id);
867    let first_field = fields_using_this_dictionary.first().ok_or_else(|| {
868        ArrowError::InvalidArgumentError(format!("dictionary id {id} not found in schema"))
869    })?;
870
871    // As the dictionary batch does not contain the type of the
872    // values array, we need to retrieve this from the schema.
873    // Get an array representing this dictionary's values.
874    let dictionary_values: ArrayRef = match first_field.data_type() {
875        DataType::Dictionary(_, value_type) => {
876            // Make a fake schema for the dictionary batch.
877            let value = value_type.as_ref().clone();
878            let schema = Schema::new(vec![Field::new("", value, true)]);
879            // Read a single column
880            let record_batch = RecordBatchDecoder::try_new(
881                buf,
882                batch.data().unwrap(),
883                Arc::new(schema),
884                dictionaries_by_id,
885                metadata,
886            )?
887            .with_require_alignment(require_alignment)
888            .with_skip_validation(skip_validation)
889            .read_record_batch()?;
890
891            Some(record_batch.column(0).clone())
892        }
893        _ => None,
894    }
895    .ok_or_else(|| {
896        ArrowError::InvalidArgumentError(format!("dictionary id {id} not found in schema"))
897    })?;
898
899    Ok(dictionary_values)
900}
901
902/// Read the data for a given block
903fn read_block<R: Read + Seek>(mut reader: R, block: &Block) -> Result<Buffer, ArrowError> {
904    reader.seek(SeekFrom::Start(block.offset() as u64))?;
905    let body_len = block.bodyLength().to_usize().unwrap();
906    let metadata_len = block.metaDataLength().to_usize().unwrap();
907    let total_len = body_len.checked_add(metadata_len).unwrap();
908
909    let mut buf = MutableBuffer::from_len_zeroed(total_len);
910    reader.read_exact(&mut buf)?;
911    Ok(buf.into())
912}
913
914/// Parse an encapsulated message
915///
916/// <https://arrow.apache.org/docs/format/Columnar.html#encapsulated-message-format>
917fn parse_message(buf: &[u8]) -> Result<Message::Message<'_>, ArrowError> {
918    let buf = match buf[..4] == CONTINUATION_MARKER {
919        true => &buf[8..],
920        false => &buf[4..],
921    };
922    crate::root_as_message(buf)
923        .map_err(|err| ArrowError::ParseError(format!("Unable to get root as message: {err:?}")))
924}
925
926/// Read the footer length from the last 10 bytes of an Arrow IPC file
927///
928/// Expects a 4 byte footer length followed by `b"ARROW1"`
929pub fn read_footer_length(buf: [u8; 10]) -> Result<usize, ArrowError> {
930    if buf[4..] != super::ARROW_MAGIC {
931        return Err(ArrowError::ParseError(
932            "Arrow file does not contain correct footer".to_string(),
933        ));
934    }
935
936    // read footer length
937    let footer_len = i32::from_le_bytes(buf[..4].try_into().unwrap());
938    footer_len
939        .try_into()
940        .map_err(|_| ArrowError::ParseError(format!("Invalid footer length: {footer_len}")))
941}
942
943/// A low-level, push-based interface for reading an IPC file
944///
945/// For a higher-level interface see [`FileReader`]
946///
947/// For an example of using this API with `mmap` see the [`zero_copy_ipc`] example.
948///
949/// [`zero_copy_ipc`]: https://github.com/apache/arrow-rs/blob/main/arrow/examples/zero_copy_ipc.rs
950///
951/// ```
952/// # use std::sync::Arc;
953/// # use arrow_array::*;
954/// # use arrow_array::types::Int32Type;
955/// # use arrow_buffer::Buffer;
956/// # use arrow_ipc::convert::fb_to_schema;
957/// # use arrow_ipc::reader::{FileDecoder, read_footer_length};
958/// # use arrow_ipc::root_as_footer;
959/// # use arrow_ipc::writer::FileWriter;
960/// // Write an IPC file
961///
962/// let batch = RecordBatch::try_from_iter([
963///     ("a", Arc::new(Int32Array::from(vec![1, 2, 3])) as _),
964///     ("b", Arc::new(Int32Array::from(vec![1, 2, 3])) as _),
965///     ("c", Arc::new(DictionaryArray::<Int32Type>::from_iter(["hello", "hello", "world"])) as _),
966/// ]).unwrap();
967///
968/// let schema = batch.schema();
969///
970/// let mut out = Vec::with_capacity(1024);
971/// let mut writer = FileWriter::try_new(&mut out, schema.as_ref()).unwrap();
972/// writer.write(&batch).unwrap();
973/// writer.finish().unwrap();
974///
975/// drop(writer);
976///
977/// // Read IPC file
978///
979/// let buffer = Buffer::from_vec(out);
980/// let trailer_start = buffer.len() - 10;
981/// let footer_len = read_footer_length(buffer[trailer_start..].try_into().unwrap()).unwrap();
982/// let footer = root_as_footer(&buffer[trailer_start - footer_len..trailer_start]).unwrap();
983///
984/// let back = fb_to_schema(footer.schema().unwrap());
985/// assert_eq!(&back, schema.as_ref());
986///
987/// let mut decoder = FileDecoder::new(schema, footer.version());
988///
989/// // Read dictionaries
990/// for block in footer.dictionaries().iter().flatten() {
991///     let block_len = block.bodyLength() as usize + block.metaDataLength() as usize;
992///     let data = buffer.slice_with_length(block.offset() as _, block_len);
993///     decoder.read_dictionary(&block, &data).unwrap();
994/// }
995///
996/// // Read record batch
997/// let batches = footer.recordBatches().unwrap();
998/// assert_eq!(batches.len(), 1); // Only wrote a single batch
999///
1000/// let block = batches.get(0);
1001/// let block_len = block.bodyLength() as usize + block.metaDataLength() as usize;
1002/// let data = buffer.slice_with_length(block.offset() as _, block_len);
1003/// let back = decoder.read_record_batch(block, &data).unwrap().unwrap();
1004///
1005/// assert_eq!(batch, back);
1006/// ```
#[derive(Debug)]
pub struct FileDecoder {
    /// Schema of the file (decoded from the footer by the caller)
    schema: SchemaRef,
    /// Dictionary values decoded so far, keyed by dictionary id
    dictionaries: HashMap<i64, ArrayRef>,
    /// Metadata version declared in the file footer
    version: MetadataVersion,
    /// Optional projection (zero-based column indices) applied to each batch
    projection: Option<Vec<usize>>,
    /// If true, error instead of copying when array data is misaligned
    require_alignment: bool,
    /// If set, skip validation of decoded arrays (see `with_skip_validation`)
    skip_validation: UnsafeFlag,
}
1016
1017impl FileDecoder {
1018    /// Create a new [`FileDecoder`] with the given schema and version
1019    pub fn new(schema: SchemaRef, version: MetadataVersion) -> Self {
1020        Self {
1021            schema,
1022            version,
1023            dictionaries: Default::default(),
1024            projection: None,
1025            require_alignment: false,
1026            skip_validation: UnsafeFlag::new(),
1027        }
1028    }
1029
1030    /// Specify a projection
1031    pub fn with_projection(mut self, projection: Vec<usize>) -> Self {
1032        self.projection = Some(projection);
1033        self
1034    }
1035
1036    /// Specifies if the array data in input buffers is required to be properly aligned.
1037    ///
1038    /// If `require_alignment` is true, this decoder will return an error if any array data in the
1039    /// input `buf` is not properly aligned.
1040    /// Under the hood it will use [`arrow_data::ArrayDataBuilder::build`] to construct
1041    /// [`arrow_data::ArrayData`].
1042    ///
1043    /// If `require_alignment` is false (the default), this decoder will automatically allocate a
1044    /// new aligned buffer and copy over the data if any array data in the input `buf` is not
1045    /// properly aligned. (Properly aligned array data will remain zero-copy.)
1046    /// Under the hood it will use [`arrow_data::ArrayDataBuilder::build_aligned`] to construct
1047    /// [`arrow_data::ArrayData`].
1048    pub fn with_require_alignment(mut self, require_alignment: bool) -> Self {
1049        self.require_alignment = require_alignment;
1050        self
1051    }
1052
1053    /// Specifies if validation should be skipped when reading data (defaults to `false`)
1054    ///
1055    /// # Safety
1056    ///
1057    /// This flag must only be set to `true` when you trust the input data and are sure the data you are
1058    /// reading is a valid Arrow IPC file, otherwise undefined behavior may
1059    /// result.
1060    ///
1061    /// For example, some programs may wish to trust reading IPC files written
1062    /// by the same process that created the files.
1063    pub unsafe fn with_skip_validation(mut self, skip_validation: bool) -> Self {
1064        unsafe { self.skip_validation.set(skip_validation) };
1065        self
1066    }
1067
1068    fn read_message<'a>(&self, buf: &'a [u8]) -> Result<Message::Message<'a>, ArrowError> {
1069        let message = parse_message(buf)?;
1070
1071        // some old test data's footer metadata is not set, so we account for that
1072        if self.version != MetadataVersion::V1 && message.version() != self.version {
1073            return Err(ArrowError::IpcError(
1074                "Could not read IPC message as metadata versions mismatch".to_string(),
1075            ));
1076        }
1077        Ok(message)
1078    }
1079
1080    /// Read the dictionary with the given block and data buffer
1081    pub fn read_dictionary(&mut self, block: &Block, buf: &Buffer) -> Result<(), ArrowError> {
1082        let message = self.read_message(buf)?;
1083        match message.header_type() {
1084            crate::MessageHeader::DictionaryBatch => {
1085                let batch = message.header_as_dictionary_batch().unwrap();
1086                read_dictionary_impl(
1087                    &buf.slice(block.metaDataLength() as _),
1088                    batch,
1089                    &self.schema,
1090                    &mut self.dictionaries,
1091                    &message.version(),
1092                    self.require_alignment,
1093                    self.skip_validation.clone(),
1094                )
1095            }
1096            t => Err(ArrowError::ParseError(format!(
1097                "Expecting DictionaryBatch in dictionary blocks, found {t:?}."
1098            ))),
1099        }
1100    }
1101
1102    /// Read the RecordBatch with the given block and data buffer
1103    pub fn read_record_batch(
1104        &self,
1105        block: &Block,
1106        buf: &Buffer,
1107    ) -> Result<Option<RecordBatch>, ArrowError> {
1108        let message = self.read_message(buf)?;
1109        match message.header_type() {
1110            crate::MessageHeader::Schema => Err(ArrowError::IpcError(
1111                "Not expecting a schema when messages are read".to_string(),
1112            )),
1113            crate::MessageHeader::RecordBatch => {
1114                let batch = message.header_as_record_batch().ok_or_else(|| {
1115                    ArrowError::IpcError("Unable to read IPC message as record batch".to_string())
1116                })?;
1117                // read the block that makes up the record batch into a buffer
1118                RecordBatchDecoder::try_new(
1119                    &buf.slice(block.metaDataLength() as _),
1120                    batch,
1121                    self.schema.clone(),
1122                    &self.dictionaries,
1123                    &message.version(),
1124                )?
1125                .with_projection(self.projection.as_deref())
1126                .with_require_alignment(self.require_alignment)
1127                .with_skip_validation(self.skip_validation.clone())
1128                .read_record_batch()
1129                .map(Some)
1130            }
1131            crate::MessageHeader::NONE => Ok(None),
1132            t => Err(ArrowError::InvalidArgumentError(format!(
1133                "Reading types other than record batches not yet supported, unable to read {t:?}"
1134            ))),
1135        }
1136    }
1137}
1138
/// Build an Arrow [`FileReader`] with custom options.
#[derive(Debug)]
pub struct FileReaderBuilder {
    /// Optional projection for which columns to load (zero-based column indices)
    projection: Option<Vec<usize>>,
    /// Passed through as `max_tables` when constructing the [`VerifierOptions`]
    /// used to verify the footer flatbuffer
    max_footer_fb_tables: usize,
    /// Passed through as `max_depth` when constructing the [`VerifierOptions`]
    /// used to verify the footer flatbuffer
    max_footer_fb_depth: usize,
}
1149
1150impl Default for FileReaderBuilder {
1151    fn default() -> Self {
1152        let verifier_options = VerifierOptions::default();
1153        Self {
1154            max_footer_fb_tables: verifier_options.max_tables,
1155            max_footer_fb_depth: verifier_options.max_depth,
1156            projection: None,
1157        }
1158    }
1159}
1160
1161impl FileReaderBuilder {
1162    /// Options for creating a new [`FileReader`].
1163    ///
1164    /// To convert a builder into a reader, call [`FileReaderBuilder::build`].
1165    pub fn new() -> Self {
1166        Self::default()
1167    }
1168
1169    /// Optional projection for which columns to load (zero-based column indices).
1170    pub fn with_projection(mut self, projection: Vec<usize>) -> Self {
1171        self.projection = Some(projection);
1172        self
1173    }
1174
1175    /// Flatbuffers option for parsing the footer. Controls the max number of fields and
1176    /// metadata key-value pairs that can be parsed from the schema of the footer.
1177    ///
1178    /// By default this is set to `1_000_000` which roughly translates to a schema with
1179    /// no metadata key-value pairs but 499,999 fields.
1180    ///
1181    /// This default limit is enforced to protect against malicious files with a massive
1182    /// amount of flatbuffer tables which could cause a denial of service attack.
1183    ///
1184    /// If you need to ingest a trusted file with a massive number of fields and/or
1185    /// metadata key-value pairs and are facing the error `"Unable to get root as
1186    /// footer: TooManyTables"` then increase this parameter as necessary.
1187    pub fn with_max_footer_fb_tables(mut self, max_footer_fb_tables: usize) -> Self {
1188        self.max_footer_fb_tables = max_footer_fb_tables;
1189        self
1190    }
1191
1192    /// Flatbuffers option for parsing the footer. Controls the max depth for schemas with
1193    /// nested fields parsed from the footer.
1194    ///
1195    /// By default this is set to `64` which roughly translates to a schema with
1196    /// a field nested 60 levels down through other struct fields.
1197    ///
1198    /// This default limit is enforced to protect against malicious files with a extremely
1199    /// deep flatbuffer structure which could cause a denial of service attack.
1200    ///
1201    /// If you need to ingest a trusted file with a deeply nested field and are facing the
1202    /// error `"Unable to get root as footer: DepthLimitReached"` then increase this
1203    /// parameter as necessary.
1204    pub fn with_max_footer_fb_depth(mut self, max_footer_fb_depth: usize) -> Self {
1205        self.max_footer_fb_depth = max_footer_fb_depth;
1206        self
1207    }
1208
1209    /// Build [`FileReader`] with given reader.
1210    pub fn build<R: Read + Seek>(self, mut reader: R) -> Result<FileReader<R>, ArrowError> {
1211        // Space for ARROW_MAGIC (6 bytes) and length (4 bytes)
1212        let mut buffer = [0; 10];
1213        reader.seek(SeekFrom::End(-10))?;
1214        reader.read_exact(&mut buffer)?;
1215
1216        let footer_len = read_footer_length(buffer)?;
1217
1218        // read footer
1219        let mut footer_data = vec![0; footer_len];
1220        reader.seek(SeekFrom::End(-10 - footer_len as i64))?;
1221        reader.read_exact(&mut footer_data)?;
1222
1223        let verifier_options = VerifierOptions {
1224            max_tables: self.max_footer_fb_tables,
1225            max_depth: self.max_footer_fb_depth,
1226            ..Default::default()
1227        };
1228        let footer = crate::root_as_footer_with_opts(&verifier_options, &footer_data[..]).map_err(
1229            |err| ArrowError::ParseError(format!("Unable to get root as footer: {err:?}")),
1230        )?;
1231
1232        let blocks = footer.recordBatches().ok_or_else(|| {
1233            ArrowError::ParseError("Unable to get record batches from IPC Footer".to_string())
1234        })?;
1235
1236        let total_blocks = blocks.len();
1237
1238        let ipc_schema = footer.schema().unwrap();
1239        if !ipc_schema.endianness().equals_to_target_endianness() {
1240            return Err(ArrowError::IpcError(
1241                "the endianness of the source system does not match the endianness of the target system.".to_owned()
1242            ));
1243        }
1244
1245        let schema = crate::convert::fb_to_schema(ipc_schema);
1246
1247        let mut custom_metadata = HashMap::new();
1248        if let Some(fb_custom_metadata) = footer.custom_metadata() {
1249            for kv in fb_custom_metadata.into_iter() {
1250                custom_metadata.insert(
1251                    kv.key().unwrap().to_string(),
1252                    kv.value().unwrap().to_string(),
1253                );
1254            }
1255        }
1256
1257        let mut decoder = FileDecoder::new(Arc::new(schema), footer.version());
1258        if let Some(projection) = self.projection {
1259            decoder = decoder.with_projection(projection)
1260        }
1261
1262        // Create an array of optional dictionary value arrays, one per field.
1263        if let Some(dictionaries) = footer.dictionaries() {
1264            for block in dictionaries {
1265                let buf = read_block(&mut reader, block)?;
1266                decoder.read_dictionary(block, &buf)?;
1267            }
1268        }
1269
1270        Ok(FileReader {
1271            reader,
1272            blocks: blocks.iter().copied().collect(),
1273            current_block: 0,
1274            total_blocks,
1275            decoder,
1276            custom_metadata,
1277        })
1278    }
1279}
1280
1281/// Arrow File Reader
1282///
1283/// Reads Arrow [`RecordBatch`]es from bytes in the [IPC File Format],
1284/// providing random access to the record batches.
1285///
1286/// # See Also
1287///
1288/// * [`Self::set_index`] for random access
1289/// * [`StreamReader`] for reading streaming data
1290///
1291/// # Example: Reading from a `File`
1292/// ```
1293/// # use std::io::Cursor;
1294/// use arrow_array::record_batch;
1295/// # use arrow_ipc::reader::FileReader;
1296/// # use arrow_ipc::writer::FileWriter;
1297/// # let batch = record_batch!(("a", Int32, [1, 2, 3])).unwrap();
1298/// # let mut file = vec![]; // mimic a stream for the example
1299/// # {
1300/// #  let mut writer = FileWriter::try_new(&mut file, &batch.schema()).unwrap();
1301/// #  writer.write(&batch).unwrap();
1302/// #  writer.write(&batch).unwrap();
1303/// #  writer.finish().unwrap();
1304/// # }
1305/// # let mut file = Cursor::new(&file);
1306/// let projection = None; // read all columns
1307/// let mut reader = FileReader::try_new(&mut file, projection).unwrap();
1308/// // Position the reader to the second batch
1309/// reader.set_index(1).unwrap();
1310/// // read batches from the reader using the Iterator trait
1311/// let mut num_rows = 0;
1312/// for batch in reader {
1313///    let batch = batch.unwrap();
1314///    num_rows += batch.num_rows();
1315/// }
1316/// assert_eq!(num_rows, 3);
1317/// ```
1318/// # Example: Reading from `mmap`ed file
1319///
1320/// For an example creating Arrays without copying using  memory mapped (`mmap`)
1321/// files see the [`zero_copy_ipc`] example.
1322///
1323/// [IPC File Format]: https://arrow.apache.org/docs/format/Columnar.html#ipc-file-format
1324/// [`zero_copy_ipc`]: https://github.com/apache/arrow-rs/blob/main/arrow/examples/zero_copy_ipc.rs
pub struct FileReader<R> {
    /// File reader that supports reading and seeking
    reader: R,

    /// The decoder that turns blocks into [`RecordBatch`]es
    decoder: FileDecoder,

    /// The blocks in the file
    ///
    /// A block indicates the regions in the file to read to get data
    blocks: Vec<Block>,

    /// A counter to keep track of the current block that should be read next;
    /// advanced by iteration and settable via `set_index`
    current_block: usize,

    /// The total number of blocks, which may contain record batches and other types
    total_blocks: usize,

    /// User defined metadata, read from the footer's custom metadata
    custom_metadata: HashMap<String, String>,
}
1346
1347impl<R> fmt::Debug for FileReader<R> {
1348    fn fmt(&self, f: &mut fmt::Formatter<'_>) -> Result<(), fmt::Error> {
1349        f.debug_struct("FileReader<R>")
1350            .field("decoder", &self.decoder)
1351            .field("blocks", &self.blocks)
1352            .field("current_block", &self.current_block)
1353            .field("total_blocks", &self.total_blocks)
1354            .finish_non_exhaustive()
1355    }
1356}
1357
1358impl<R: Read + Seek> FileReader<BufReader<R>> {
1359    /// Try to create a new file reader with the reader wrapped in a BufReader.
1360    ///
1361    /// See [`FileReader::try_new`] for an unbuffered version.
1362    pub fn try_new_buffered(reader: R, projection: Option<Vec<usize>>) -> Result<Self, ArrowError> {
1363        Self::try_new(BufReader::new(reader), projection)
1364    }
1365}
1366
1367impl<R: Read + Seek> FileReader<R> {
1368    /// Try to create a new file reader.
1369    ///
1370    /// There is no internal buffering. If buffered reads are needed you likely want to use
1371    /// [`FileReader::try_new_buffered`] instead.
1372    ///
1373    /// # Errors
1374    ///
1375    /// An ['Err'](Result::Err) may be returned if:
1376    /// - the file does not meet the Arrow Format footer requirements, or
1377    /// - file endianness does not match the target endianness.
1378    pub fn try_new(reader: R, projection: Option<Vec<usize>>) -> Result<Self, ArrowError> {
1379        let builder = FileReaderBuilder {
1380            projection,
1381            ..Default::default()
1382        };
1383        builder.build(reader)
1384    }
1385
1386    /// Return user defined customized metadata
1387    pub fn custom_metadata(&self) -> &HashMap<String, String> {
1388        &self.custom_metadata
1389    }
1390
1391    /// Return the number of batches in the file
1392    pub fn num_batches(&self) -> usize {
1393        self.total_blocks
1394    }
1395
1396    /// Return the schema of the file
1397    pub fn schema(&self) -> SchemaRef {
1398        self.decoder.schema.clone()
1399    }
1400
1401    /// See to a specific [`RecordBatch`]
1402    ///
1403    /// Sets the current block to the index, allowing random reads
1404    pub fn set_index(&mut self, index: usize) -> Result<(), ArrowError> {
1405        if index >= self.total_blocks {
1406            Err(ArrowError::InvalidArgumentError(format!(
1407                "Cannot set batch to index {} from {} total batches",
1408                index, self.total_blocks
1409            )))
1410        } else {
1411            self.current_block = index;
1412            Ok(())
1413        }
1414    }
1415
1416    fn maybe_next(&mut self) -> Result<Option<RecordBatch>, ArrowError> {
1417        let block = &self.blocks[self.current_block];
1418        self.current_block += 1;
1419
1420        // read length
1421        let buffer = read_block(&mut self.reader, block)?;
1422        self.decoder.read_record_batch(block, &buffer)
1423    }
1424
1425    /// Gets a reference to the underlying reader.
1426    ///
1427    /// It is inadvisable to directly read from the underlying reader.
1428    pub fn get_ref(&self) -> &R {
1429        &self.reader
1430    }
1431
1432    /// Gets a mutable reference to the underlying reader.
1433    ///
1434    /// It is inadvisable to directly read from the underlying reader.
1435    pub fn get_mut(&mut self) -> &mut R {
1436        &mut self.reader
1437    }
1438
1439    /// Specifies if validation should be skipped when reading data (defaults to `false`)
1440    ///
1441    /// # Safety
1442    ///
1443    /// See [`FileDecoder::with_skip_validation`]
1444    pub unsafe fn with_skip_validation(mut self, skip_validation: bool) -> Self {
1445        self.decoder = unsafe { self.decoder.with_skip_validation(skip_validation) };
1446        self
1447    }
1448}
1449
1450impl<R: Read + Seek> Iterator for FileReader<R> {
1451    type Item = Result<RecordBatch, ArrowError>;
1452
1453    fn next(&mut self) -> Option<Self::Item> {
1454        // get current block
1455        if self.current_block < self.total_blocks {
1456            self.maybe_next().transpose()
1457        } else {
1458            None
1459        }
1460    }
1461}
1462
impl<R: Read + Seek> RecordBatchReader for FileReader<R> {
    fn schema(&self) -> SchemaRef {
        // Delegates to the inherent `FileReader::schema`; inherent methods take
        // precedence over trait methods in resolution, so this does not recurse.
        self.schema()
    }
}
1468
1469/// Arrow Stream Reader
1470///
1471/// Reads Arrow [`RecordBatch`]es from bytes in the [IPC Streaming Format].
1472///
1473/// # See Also
1474///
1475/// * [`FileReader`] for random access.
1476///
1477/// # Example
1478/// ```
1479/// # use arrow_array::record_batch;
1480/// # use arrow_ipc::reader::StreamReader;
1481/// # use arrow_ipc::writer::StreamWriter;
1482/// # let batch = record_batch!(("a", Int32, [1, 2, 3])).unwrap();
1483/// # let mut stream = vec![]; // mimic a stream for the example
1484/// # {
1485/// #  let mut writer = StreamWriter::try_new(&mut stream, &batch.schema()).unwrap();
1486/// #  writer.write(&batch).unwrap();
1487/// #  writer.finish().unwrap();
1488/// # }
1489/// # let stream = stream.as_slice();
1490/// let projection = None; // read all columns
1491/// let mut reader = StreamReader::try_new(stream, projection).unwrap();
1492/// // read batches from the reader using the Iterator trait
1493/// let mut num_rows = 0;
1494/// for batch in reader {
1495///    let batch = batch.unwrap();
1496///    num_rows += batch.num_rows();
1497/// }
1498/// assert_eq!(num_rows, 3);
1499/// ```
1500///
1501/// [IPC Streaming Format]: https://arrow.apache.org/docs/format/Columnar.html#ipc-streaming-format
pub struct StreamReader<R> {
    /// Stream reader that yields one encapsulated IPC message at a time
    reader: MessageReader<R>,

    /// The schema that is read from the stream's first message
    schema: SchemaRef,

    /// Optional dictionaries for each schema field.
    ///
    /// Dictionaries may be appended to in the streaming format.
    dictionaries_by_id: HashMap<i64, ArrayRef>,

    /// An indicator of whether the stream is complete.
    ///
    /// This value is set to `true` the first time the reader's `next()` returns `None`.
    finished: bool,

    /// Optional projection: the selected column indices together with the
    /// schema projected down to those columns
    projection: Option<(Vec<usize>, Schema)>,

    /// Should validation be skipped when reading data? Defaults to false.
    ///
    /// See [`FileDecoder::with_skip_validation`] for details.
    skip_validation: UnsafeFlag,
}
1527
1528impl<R> fmt::Debug for StreamReader<R> {
1529    fn fmt(&self, f: &mut fmt::Formatter<'_>) -> std::result::Result<(), fmt::Error> {
1530        f.debug_struct("StreamReader<R>")
1531            .field("reader", &"R")
1532            .field("schema", &self.schema)
1533            .field("dictionaries_by_id", &self.dictionaries_by_id)
1534            .field("finished", &self.finished)
1535            .field("projection", &self.projection)
1536            .finish()
1537    }
1538}
1539
1540impl<R: Read> StreamReader<BufReader<R>> {
1541    /// Try to create a new stream reader with the reader wrapped in a BufReader.
1542    ///
1543    /// See [`StreamReader::try_new`] for an unbuffered version.
1544    pub fn try_new_buffered(reader: R, projection: Option<Vec<usize>>) -> Result<Self, ArrowError> {
1545        Self::try_new(BufReader::new(reader), projection)
1546    }
1547}
1548
1549impl<R: Read> StreamReader<R> {
1550    /// Try to create a new stream reader.
1551    ///
1552    /// To check if the reader is done, use [`is_finished(self)`](StreamReader::is_finished).
1553    ///
1554    /// There is no internal buffering. If buffered reads are needed you likely want to use
1555    /// [`StreamReader::try_new_buffered`] instead.
1556    ///
1557    /// # Errors
1558    ///
1559    /// An ['Err'](Result::Err) may be returned if the reader does not encounter a schema
1560    /// as the first message in the stream.
1561    pub fn try_new(
1562        reader: R,
1563        projection: Option<Vec<usize>>,
1564    ) -> Result<StreamReader<R>, ArrowError> {
1565        let mut msg_reader = MessageReader::new(reader);
1566        let message = msg_reader.maybe_next()?;
1567        let Some((message, _)) = message else {
1568            return Err(ArrowError::IpcError(
1569                "Expected schema message, found empty stream.".to_string(),
1570            ));
1571        };
1572
1573        if message.header_type() != Message::MessageHeader::Schema {
1574            return Err(ArrowError::IpcError(format!(
1575                "Expected a schema as the first message in the stream, got: {:?}",
1576                message.header_type()
1577            )));
1578        }
1579
1580        let schema = message.header_as_schema().ok_or_else(|| {
1581            ArrowError::ParseError("Failed to parse schema from message header".to_string())
1582        })?;
1583        let schema = crate::convert::fb_to_schema(schema);
1584
1585        // Create an array of optional dictionary value arrays, one per field.
1586        let dictionaries_by_id = HashMap::new();
1587
1588        let projection = match projection {
1589            Some(projection_indices) => {
1590                let schema = schema.project(&projection_indices)?;
1591                Some((projection_indices, schema))
1592            }
1593            _ => None,
1594        };
1595
1596        Ok(Self {
1597            reader: msg_reader,
1598            schema: Arc::new(schema),
1599            finished: false,
1600            dictionaries_by_id,
1601            projection,
1602            skip_validation: UnsafeFlag::new(),
1603        })
1604    }
1605
1606    /// Deprecated, use [`StreamReader::try_new`] instead.
1607    #[deprecated(since = "53.0.0", note = "use `try_new` instead")]
1608    pub fn try_new_unbuffered(
1609        reader: R,
1610        projection: Option<Vec<usize>>,
1611    ) -> Result<Self, ArrowError> {
1612        Self::try_new(reader, projection)
1613    }
1614
1615    /// Return the schema of the stream
1616    pub fn schema(&self) -> SchemaRef {
1617        self.schema.clone()
1618    }
1619
1620    /// Check if the stream is finished
1621    pub fn is_finished(&self) -> bool {
1622        self.finished
1623    }
1624
1625    fn maybe_next(&mut self) -> Result<Option<RecordBatch>, ArrowError> {
1626        if self.finished {
1627            return Ok(None);
1628        }
1629
1630        // Read messages until we get a record batch or end of stream
1631        loop {
1632            let message = self.next_ipc_message()?;
1633            let Some(message) = message else {
1634                // If the message is None, we have reached the end of the stream.
1635                self.finished = true;
1636                return Ok(None);
1637            };
1638
1639            match message {
1640                IpcMessage::Schema(_) => {
1641                    return Err(ArrowError::IpcError(
1642                        "Expected a record batch, but found a schema".to_string(),
1643                    ));
1644                }
1645                IpcMessage::RecordBatch(record_batch) => {
1646                    return Ok(Some(record_batch));
1647                }
1648                IpcMessage::DictionaryBatch { .. } => {
1649                    continue;
1650                }
1651            };
1652        }
1653    }
1654
1655    /// Reads and fully parses the next IPC message from the stream. Whereas
1656    /// [`Self::maybe_next`] is a higher level method focused on reading
1657    /// `RecordBatch`es, this method returns the individual fully parsed IPC
1658    /// messages from the underlying stream.
1659    ///
1660    /// This is useful primarily for testing reader/writer behaviors as it
1661    /// allows a full view into the messages that have been written to a stream.
1662    pub(crate) fn next_ipc_message(&mut self) -> Result<Option<IpcMessage>, ArrowError> {
1663        let message = self.reader.maybe_next()?;
1664        let Some((message, body)) = message else {
1665            // If the message is None, we have reached the end of the stream.
1666            return Ok(None);
1667        };
1668
1669        let ipc_message = match message.header_type() {
1670            Message::MessageHeader::Schema => {
1671                let schema = message.header_as_schema().ok_or_else(|| {
1672                    ArrowError::ParseError("Failed to parse schema from message header".to_string())
1673                })?;
1674                let arrow_schema = crate::convert::fb_to_schema(schema);
1675                IpcMessage::Schema(arrow_schema)
1676            }
1677            Message::MessageHeader::RecordBatch => {
1678                let batch = message.header_as_record_batch().ok_or_else(|| {
1679                    ArrowError::IpcError("Unable to read IPC message as record batch".to_string())
1680                })?;
1681
1682                let version = message.version();
1683                let schema = self.schema.clone();
1684                let record_batch = RecordBatchDecoder::try_new(
1685                    &body.into(),
1686                    batch,
1687                    schema,
1688                    &self.dictionaries_by_id,
1689                    &version,
1690                )?
1691                .with_projection(self.projection.as_ref().map(|x| x.0.as_ref()))
1692                .with_require_alignment(false)
1693                .with_skip_validation(self.skip_validation.clone())
1694                .read_record_batch()?;
1695                IpcMessage::RecordBatch(record_batch)
1696            }
1697            Message::MessageHeader::DictionaryBatch => {
1698                let dict = message.header_as_dictionary_batch().ok_or_else(|| {
1699                    ArrowError::ParseError(
1700                        "Failed to parse dictionary batch from message header".to_string(),
1701                    )
1702                })?;
1703
1704                let version = message.version();
1705                let dict_values = get_dictionary_values(
1706                    &body.into(),
1707                    dict,
1708                    &self.schema,
1709                    &mut self.dictionaries_by_id,
1710                    &version,
1711                    false,
1712                    self.skip_validation.clone(),
1713                )?;
1714
1715                update_dictionaries(
1716                    &mut self.dictionaries_by_id,
1717                    dict.isDelta(),
1718                    dict.id(),
1719                    dict_values.clone(),
1720                )?;
1721
1722                IpcMessage::DictionaryBatch {
1723                    id: dict.id(),
1724                    is_delta: (dict.isDelta()),
1725                    values: (dict_values),
1726                }
1727            }
1728            x => {
1729                return Err(ArrowError::ParseError(format!(
1730                    "Unsupported message header type in IPC stream: '{x:?}'"
1731                )));
1732            }
1733        };
1734
1735        Ok(Some(ipc_message))
1736    }
1737
1738    /// Gets a reference to the underlying reader.
1739    ///
1740    /// It is inadvisable to directly read from the underlying reader.
1741    pub fn get_ref(&self) -> &R {
1742        self.reader.inner()
1743    }
1744
1745    /// Gets a mutable reference to the underlying reader.
1746    ///
1747    /// It is inadvisable to directly read from the underlying reader.
1748    pub fn get_mut(&mut self) -> &mut R {
1749        self.reader.inner_mut()
1750    }
1751
1752    /// Specifies if validation should be skipped when reading data (defaults to `false`)
1753    ///
1754    /// # Safety
1755    ///
1756    /// See [`FileDecoder::with_skip_validation`]
1757    pub unsafe fn with_skip_validation(mut self, skip_validation: bool) -> Self {
1758        unsafe { self.skip_validation.set(skip_validation) };
1759        self
1760    }
1761}
1762
1763impl<R: Read> Iterator for StreamReader<R> {
1764    type Item = Result<RecordBatch, ArrowError>;
1765
1766    fn next(&mut self) -> Option<Self::Item> {
1767        self.maybe_next().transpose()
1768    }
1769}
1770
1771impl<R: Read> RecordBatchReader for StreamReader<R> {
1772    fn schema(&self) -> SchemaRef {
1773        self.schema.clone()
1774    }
1775}
1776
/// Representation of a fully parsed IpcMessage from the underlying stream.
/// Parsing this kind of message is done by higher level constructs such as
/// [`StreamReader`], because fully interpreting the messages into a record
/// batch or dictionary batch requires access to stream state such as schema
/// and the full dictionary cache.
#[derive(Debug)]
#[allow(dead_code)]
pub(crate) enum IpcMessage {
    /// A schema message, already converted to an Arrow [`arrow_schema::Schema`].
    Schema(arrow_schema::Schema),
    /// A fully decoded record batch.
    RecordBatch(RecordBatch),
    /// A decoded dictionary batch.
    DictionaryBatch {
        /// The dictionary id this batch applies to.
        id: i64,
        /// Whether this batch is a delta (appends to the existing values for
        /// `id`) rather than a replacement.
        is_delta: bool,
        /// The decoded dictionary values.
        values: ArrayRef,
    },
}
1793
/// A low-level construct that reads [`Message::Message`]s from a reader while
/// re-using a buffer for metadata. This is composed into [`StreamReader`].
struct MessageReader<R> {
    // The underlying byte source.
    reader: R,
    // Scratch buffer holding the current message's flatbuffer metadata;
    // resized and reused across messages to avoid per-message allocation.
    buf: Vec<u8>,
}
1800
1801impl<R: Read> MessageReader<R> {
1802    fn new(reader: R) -> Self {
1803        Self {
1804            reader,
1805            buf: Vec::new(),
1806        }
1807    }
1808
1809    /// Reads the entire next message from the underlying reader which includes
1810    /// the metadata length, the metadata, and the body.
1811    ///
1812    /// # Returns
1813    /// - `Ok(None)` if the the reader signals the end of stream with EOF on
1814    ///   the first read
1815    /// - `Err(_)` if the reader returns an error other than on the first
1816    ///   read, or if the metadata length is invalid
1817    /// - `Ok(Some(_))` with the Message and buffer containiner the
1818    ///   body bytes otherwise.
1819    fn maybe_next(&mut self) -> Result<Option<(Message::Message<'_>, MutableBuffer)>, ArrowError> {
1820        let meta_len = self.read_meta_len()?;
1821        let Some(meta_len) = meta_len else {
1822            return Ok(None);
1823        };
1824
1825        self.buf.resize(meta_len, 0);
1826        self.reader.read_exact(&mut self.buf)?;
1827
1828        let message = crate::root_as_message(self.buf.as_slice()).map_err(|err| {
1829            ArrowError::ParseError(format!("Unable to get root as message: {err:?}"))
1830        })?;
1831
1832        let mut buf = MutableBuffer::from_len_zeroed(message.bodyLength() as usize);
1833        self.reader.read_exact(&mut buf)?;
1834
1835        Ok(Some((message, buf)))
1836    }
1837
1838    /// Get a mutable reference to the underlying reader.
1839    fn inner_mut(&mut self) -> &mut R {
1840        &mut self.reader
1841    }
1842
1843    /// Get an immutable reference to the underlying reader.
1844    fn inner(&self) -> &R {
1845        &self.reader
1846    }
1847
1848    /// Read the metadata length for the next message from the underlying stream.
1849    ///
1850    /// # Returns
1851    /// - `Ok(None)` if the the reader signals the end of stream with EOF on
1852    ///   the first read
1853    /// - `Err(_)` if the reader returns an error other than on the first
1854    ///   read, or if the metadata length is less than 0.
1855    /// - `Ok(Some(_))` with the length otherwise.
1856    pub fn read_meta_len(&mut self) -> Result<Option<usize>, ArrowError> {
1857        let mut meta_len: [u8; 4] = [0; 4];
1858        match self.reader.read_exact(&mut meta_len) {
1859            Ok(_) => {}
1860            Err(e) => {
1861                return if e.kind() == std::io::ErrorKind::UnexpectedEof {
1862                    // Handle EOF without the "0xFFFFFFFF 0x00000000"
1863                    // valid according to:
1864                    // https://arrow.apache.org/docs/format/Columnar.html#ipc-streaming-format
1865                    Ok(None)
1866                } else {
1867                    Err(ArrowError::from(e))
1868                };
1869            }
1870        };
1871
1872        let meta_len = {
1873            // If a continuation marker is encountered, skip over it and read
1874            // the size from the next four bytes.
1875            if meta_len == CONTINUATION_MARKER {
1876                self.reader.read_exact(&mut meta_len)?;
1877            }
1878
1879            i32::from_le_bytes(meta_len)
1880        };
1881
1882        if meta_len == 0 {
1883            return Ok(None);
1884        }
1885
1886        let meta_len = usize::try_from(meta_len)
1887            .map_err(|_| ArrowError::ParseError(format!("Invalid metadata length: {meta_len}")))?;
1888
1889        Ok(Some(meta_len))
1890    }
1891}
1892
1893#[cfg(test)]
1894mod tests {
1895    use std::io::Cursor;
1896
1897    use crate::convert::fb_to_schema;
1898    use crate::writer::{
1899        DictionaryTracker, IpcDataGenerator, IpcWriteOptions, unslice_run_array, write_message,
1900    };
1901
1902    use super::*;
1903
1904    use crate::{root_as_footer, root_as_message, size_prefixed_root_as_message};
1905    use arrow_array::builder::{PrimitiveRunBuilder, UnionBuilder};
1906    use arrow_array::types::*;
1907    use arrow_buffer::{NullBuffer, OffsetBuffer};
1908    use arrow_data::ArrayDataBuilder;
1909
1910    fn create_test_projection_schema() -> Schema {
1911        // define field types
1912        let list_data_type = DataType::List(Arc::new(Field::new_list_field(DataType::Int32, true)));
1913
1914        let fixed_size_list_data_type =
1915            DataType::FixedSizeList(Arc::new(Field::new_list_field(DataType::Int32, false)), 3);
1916
1917        let union_fields = UnionFields::from_fields(vec![
1918            Field::new("a", DataType::Int32, false),
1919            Field::new("b", DataType::Float64, false),
1920        ]);
1921
1922        let union_data_type = DataType::Union(union_fields, UnionMode::Dense);
1923
1924        let struct_fields = Fields::from(vec![
1925            Field::new("id", DataType::Int32, false),
1926            Field::new_list("list", Field::new_list_field(DataType::Int8, true), false),
1927        ]);
1928        let struct_data_type = DataType::Struct(struct_fields);
1929
1930        let run_encoded_data_type = DataType::RunEndEncoded(
1931            Arc::new(Field::new("run_ends", DataType::Int16, false)),
1932            Arc::new(Field::new("values", DataType::Int32, true)),
1933        );
1934
1935        // define schema
1936        Schema::new(vec![
1937            Field::new("f0", DataType::UInt32, false),
1938            Field::new("f1", DataType::Utf8, false),
1939            Field::new("f2", DataType::Boolean, false),
1940            Field::new("f3", union_data_type, true),
1941            Field::new("f4", DataType::Null, true),
1942            Field::new("f5", DataType::Float64, true),
1943            Field::new("f6", list_data_type, false),
1944            Field::new("f7", DataType::FixedSizeBinary(3), true),
1945            Field::new("f8", fixed_size_list_data_type, false),
1946            Field::new("f9", struct_data_type, false),
1947            Field::new("f10", run_encoded_data_type, false),
1948            Field::new("f11", DataType::Boolean, false),
1949            Field::new_dictionary("f12", DataType::Int8, DataType::Utf8, false),
1950            Field::new("f13", DataType::Utf8, false),
1951        ])
1952    }
1953
    /// Build a three-row [`RecordBatch`] matching the 14-column schema from
    /// `create_test_projection_schema` (one array per field f0..f13).
    fn create_test_projection_batch_data(schema: &Schema) -> RecordBatch {
        // set test data for each column
        let array0 = UInt32Array::from(vec![1, 2, 3]);
        let array1 = StringArray::from(vec!["foo", "bar", "baz"]);
        let array2 = BooleanArray::from(vec![true, false, true]);

        // Dense union with one value in each child plus a null.
        let mut union_builder = UnionBuilder::new_dense();
        union_builder.append::<Int32Type>("a", 1).unwrap();
        union_builder.append::<Float64Type>("b", 10.1).unwrap();
        union_builder.append_null::<Float64Type>("b").unwrap();
        let array3 = union_builder.build().unwrap();

        let array4 = NullArray::new(3);
        let array5 = Float64Array::from(vec![Some(1.1), None, Some(3.3)]);
        let array6_values = vec![
            Some(vec![Some(10), Some(10), Some(10)]),
            Some(vec![Some(20), Some(20), Some(20)]),
            Some(vec![Some(30), Some(30)]),
        ];
        let array6 = ListArray::from_iter_primitive::<Int32Type, _, _>(array6_values);
        let array7_values = vec![vec![11, 12, 13], vec![22, 23, 24], vec![33, 34, 35]];
        let array7 = FixedSizeBinaryArray::try_from_iter(array7_values.into_iter()).unwrap();

        // Fixed-size list (size 3) assembled from raw ArrayData: 9 child
        // values grouped into 3 lists.
        let array8_values = ArrayData::builder(DataType::Int32)
            .len(9)
            .add_buffer(Buffer::from_slice_ref([40, 41, 42, 43, 44, 45, 46, 47, 48]))
            .build()
            .unwrap();
        let array8_data = ArrayData::builder(schema.field(8).data_type().clone())
            .len(3)
            .add_child_data(array8_values)
            .build()
            .unwrap();
        let array8 = FixedSizeListArray::from(array8_data);

        // Struct column built from its two children via ArrayDataBuilder.
        let array9_id: ArrayRef = Arc::new(Int32Array::from(vec![1001, 1002, 1003]));
        let array9_list: ArrayRef =
            Arc::new(ListArray::from_iter_primitive::<Int8Type, _, _>(vec![
                Some(vec![Some(-10)]),
                Some(vec![Some(-20), Some(-20), Some(-20)]),
                Some(vec![Some(-30)]),
            ]));
        let array9 = ArrayDataBuilder::new(schema.field(9).data_type().clone())
            .add_child_data(array9_id.into_data())
            .add_child_data(array9_list.into_data())
            .len(3)
            .build()
            .unwrap();
        let array9 = StructArray::from(array9);

        let array10_input = vec![Some(1_i32), None, None];
        let mut array10_builder = PrimitiveRunBuilder::<Int16Type, Int32Type>::new();
        array10_builder.extend(array10_input);
        let array10 = array10_builder.finish();

        let array11 = BooleanArray::from(vec![false, false, true]);

        let array12_values = StringArray::from(vec!["x", "yy", "zzz"]);
        let array12_keys = Int8Array::from_iter_values([1, 1, 2]);
        let array12 = DictionaryArray::new(array12_keys, Arc::new(array12_values));

        let array13 = StringArray::from(vec!["a", "bb", "ccc"]);

        // create record batch
        RecordBatch::try_new(
            Arc::new(schema.clone()),
            vec![
                Arc::new(array0),
                Arc::new(array1),
                Arc::new(array2),
                Arc::new(array3),
                Arc::new(array4),
                Arc::new(array5),
                Arc::new(array6),
                Arc::new(array7),
                Arc::new(array8),
                Arc::new(array9),
                Arc::new(array10),
                Arc::new(array11),
                Arc::new(array12),
                Arc::new(array13),
            ],
        )
        .unwrap()
    }
2039
2040    #[test]
2041    fn test_negative_meta_len_start_stream() {
2042        let bytes = i32::to_le_bytes(-1);
2043        let mut buf = vec![];
2044        buf.extend(CONTINUATION_MARKER);
2045        buf.extend(bytes);
2046
2047        let reader_err = StreamReader::try_new(Cursor::new(buf), None).err();
2048        assert!(reader_err.is_some());
2049        assert_eq!(
2050            reader_err.unwrap().to_string(),
2051            "Parser error: Invalid metadata length: -1"
2052        );
2053    }
2054
2055    #[test]
2056    fn test_negative_meta_len_mid_stream() {
2057        let schema = Schema::new(vec![Field::new("a", DataType::Int32, false)]);
2058        let mut buf = Vec::new();
2059        {
2060            let mut writer = crate::writer::StreamWriter::try_new(&mut buf, &schema).unwrap();
2061            let batch =
2062                RecordBatch::try_new(Arc::new(schema), vec![Arc::new(Int32Array::from(vec![1]))])
2063                    .unwrap();
2064            writer.write(&batch).unwrap();
2065        }
2066
2067        let bytes = i32::to_le_bytes(-1);
2068        buf.extend(CONTINUATION_MARKER);
2069        buf.extend(bytes);
2070
2071        let mut reader = StreamReader::try_new(Cursor::new(buf), None).unwrap();
2072        // Read the valid value
2073        assert!(reader.maybe_next().is_ok());
2074        // Read the invalid meta len
2075        let batch_err = reader.maybe_next().err();
2076        assert!(batch_err.is_some());
2077        assert_eq!(
2078            batch_err.unwrap().to_string(),
2079            "Parser error: Invalid metadata length: -1"
2080        );
2081    }
2082
    /// Decoding a record batch whose metadata declares fewer buffers than the
    /// schema requires must fail with an `IpcError`, not panic.
    #[test]
    fn test_missing_buffer_metadata_error() {
        use crate::r#gen::Message::*;
        use flatbuffers::FlatBufferBuilder;

        let schema = Arc::new(Schema::new(vec![Field::new("col", DataType::Int32, true)]));

        // create RecordBatch buffer metadata with invalid buffer count
        // Int32Array needs 2 buffers (validity + data) but we provide only 1
        let mut fbb = FlatBufferBuilder::new();
        let nodes = fbb.create_vector(&[FieldNode::new(2, 0)]);
        let buffers = fbb.create_vector(&[crate::Buffer::new(0, 8)]);
        let batch_offset = RecordBatch::create(
            &mut fbb,
            &RecordBatchArgs {
                length: 2,
                nodes: Some(nodes),
                buffers: Some(buffers),
                compression: None,
                variadicBufferCounts: None,
            },
        );
        fbb.finish_minimal(batch_offset);
        let batch_bytes = fbb.finished_data().to_vec();
        // Reinterpret the serialized bytes as a flatbuffer RecordBatch message.
        let batch = flatbuffers::root::<RecordBatch>(&batch_bytes).unwrap();

        let data_buffer = Buffer::from(vec![0u8; 8]);
        let dictionaries: HashMap<i64, ArrayRef> = HashMap::new();
        let metadata = MetadataVersion::V5;

        let decoder = RecordBatchDecoder::try_new(
            &data_buffer,
            batch,
            schema.clone(),
            &dictionaries,
            &metadata,
        )
        .unwrap();

        let result = decoder.read_record_batch();

        // Expect a clear buffer-count mismatch error.
        match result {
            Err(ArrowError::IpcError(msg)) => {
                assert_eq!(msg, "Buffer count mismatched with metadata");
            }
            other => panic!("unexpected error: {other:?}"),
        }
    }
2131
2132    /// Test that the reader can read legacy files where empty list arrays were written with a 0-byte offsets buffer.
2133    #[test]
2134    fn test_read_legacy_empty_list_without_offsets_buffer() {
2135        use crate::r#gen::Message::*;
2136        use flatbuffers::FlatBufferBuilder;
2137
2138        let schema = Arc::new(Schema::new(vec![Field::new_list(
2139            "items",
2140            Field::new_list_field(DataType::Int32, true),
2141            true,
2142        )]));
2143
2144        // Legacy arrow-rs versions wrote empty offsets buffers for empty list arrays.
2145        // Keep reader compatibility with such files by accepting a 0-byte offsets buffer.
2146        let mut fbb = FlatBufferBuilder::new();
2147        let nodes = fbb.create_vector(&[
2148            FieldNode::new(0, 0), // list node
2149            FieldNode::new(0, 0), // child int32 node
2150        ]);
2151        let buffers = fbb.create_vector(&[
2152            crate::Buffer::new(0, 0), // list validity
2153            crate::Buffer::new(0, 0), // list offsets (legacy empty buffer)
2154            crate::Buffer::new(0, 0), // child validity
2155            crate::Buffer::new(0, 0), // child values
2156        ]);
2157        let batch_offset = RecordBatch::create(
2158            &mut fbb,
2159            &RecordBatchArgs {
2160                length: 0,
2161                nodes: Some(nodes),
2162                buffers: Some(buffers),
2163                compression: None,
2164                variadicBufferCounts: None,
2165            },
2166        );
2167        fbb.finish_minimal(batch_offset);
2168        let batch_bytes = fbb.finished_data().to_vec();
2169        let batch = flatbuffers::root::<RecordBatch>(&batch_bytes).unwrap();
2170
2171        let body = Buffer::from(Vec::<u8>::new());
2172        let dictionaries: HashMap<i64, ArrayRef> = HashMap::new();
2173        let metadata = MetadataVersion::V5;
2174
2175        let decoder =
2176            RecordBatchDecoder::try_new(&body, batch, schema.clone(), &dictionaries, &metadata)
2177                .unwrap();
2178
2179        let read_batch = decoder.read_record_batch().unwrap();
2180        assert_eq!(read_batch.num_rows(), 0);
2181
2182        let list = read_batch
2183            .column(0)
2184            .as_any()
2185            .downcast_ref::<ListArray>()
2186            .unwrap();
2187        assert_eq!(list.len(), 0);
2188        assert_eq!(list.values().len(), 0);
2189    }
2190
2191    /// Test that the reader can read legacy files where empty Utf8/Binary arrays were written with a 0-byte offsets buffer.
    #[test]
    fn test_read_legacy_empty_utf8_and_binary_without_offsets_buffer() {
        use crate::r#gen::Message::*;
        use flatbuffers::FlatBufferBuilder;

        let schema = Arc::new(Schema::new(vec![
            Field::new("name", DataType::Utf8, true),
            Field::new("payload", DataType::Binary, true),
        ]));

        // Legacy arrow-rs versions wrote empty offsets buffers for empty Utf8/Binary arrays.
        // Keep reader compatibility with such files by accepting 0-byte offsets buffers.
        let mut fbb = FlatBufferBuilder::new();
        let nodes = fbb.create_vector(&[
            FieldNode::new(0, 0), // utf8 node
            FieldNode::new(0, 0), // binary node
        ]);
        let buffers = fbb.create_vector(&[
            crate::Buffer::new(0, 0), // utf8 validity
            crate::Buffer::new(0, 0), // utf8 offsets (legacy empty buffer)
            crate::Buffer::new(0, 0), // utf8 values
            crate::Buffer::new(0, 0), // binary validity
            crate::Buffer::new(0, 0), // binary offsets (legacy empty buffer)
            crate::Buffer::new(0, 0), // binary values
        ]);
        let batch_offset = RecordBatch::create(
            &mut fbb,
            &RecordBatchArgs {
                length: 0,
                nodes: Some(nodes),
                buffers: Some(buffers),
                compression: None,
                variadicBufferCounts: None,
            },
        );
        fbb.finish_minimal(batch_offset);
        let batch_bytes = fbb.finished_data().to_vec();
        let batch = flatbuffers::root::<RecordBatch>(&batch_bytes).unwrap();

        // Empty body: all buffer metadata points at zero-length regions.
        let body = Buffer::from(Vec::<u8>::new());
        let dictionaries: HashMap<i64, ArrayRef> = HashMap::new();
        let metadata = MetadataVersion::V5;

        let decoder =
            RecordBatchDecoder::try_new(&body, batch, schema.clone(), &dictionaries, &metadata)
                .unwrap();

        let read_batch = decoder.read_record_batch().unwrap();
        assert_eq!(read_batch.num_rows(), 0);

        // Offsets must be normalized to the single `[0]` entry an empty
        // offset-based array requires.
        let utf8 = read_batch
            .column(0)
            .as_any()
            .downcast_ref::<StringArray>()
            .unwrap();
        assert_eq!(utf8.len(), 0);
        assert_eq!(utf8.value_offsets(), [0]);

        let binary = read_batch
            .column(1)
            .as_any()
            .downcast_ref::<BinaryArray>()
            .unwrap();
        assert_eq!(binary.len(), 0);
        assert_eq!(binary.value_offsets(), [0]);
    }
2258
2259    #[test]
2260    fn test_projection_array_values() {
2261        // define schema
2262        let schema = create_test_projection_schema();
2263
2264        // create record batch with test data
2265        let batch = create_test_projection_batch_data(&schema);
2266
2267        // write record batch in IPC format
2268        let mut buf = Vec::new();
2269        {
2270            let mut writer = crate::writer::FileWriter::try_new(&mut buf, &schema).unwrap();
2271            writer.write(&batch).unwrap();
2272            writer.finish().unwrap();
2273        }
2274
2275        // read record batch with projection
2276        for index in 0..12 {
2277            let projection = vec![index];
2278            let reader = FileReader::try_new(std::io::Cursor::new(buf.clone()), Some(projection));
2279            let read_batch = reader.unwrap().next().unwrap().unwrap();
2280            let projected_column = read_batch.column(0);
2281            let expected_column = batch.column(index);
2282
2283            // check the projected column equals the expected column
2284            assert_eq!(projected_column.as_ref(), expected_column.as_ref());
2285        }
2286
2287        {
2288            // read record batch with reversed projection
2289            let reader =
2290                FileReader::try_new(std::io::Cursor::new(buf.clone()), Some(vec![3, 2, 1]));
2291            let read_batch = reader.unwrap().next().unwrap().unwrap();
2292            let expected_batch = batch.project(&[3, 2, 1]).unwrap();
2293            assert_eq!(read_batch, expected_batch);
2294        }
2295    }
2296
    /// Writes a single-row batch (two floats, two ints) to the IPC Stream
    /// format in a temp file, then reads it back twice: once in full and once
    /// with a projection selecting columns 0 and 3.
    #[test]
    fn test_arrow_single_float_row() {
        let schema = Schema::new(vec![
            Field::new("a", DataType::Float32, false),
            Field::new("b", DataType::Float32, false),
            Field::new("c", DataType::Int32, false),
            Field::new("d", DataType::Int32, false),
        ]);
        let arrays = vec![
            Arc::new(Float32Array::from(vec![1.23])) as ArrayRef,
            Arc::new(Float32Array::from(vec![-6.50])) as ArrayRef,
            Arc::new(Int32Array::from(vec![2])) as ArrayRef,
            Arc::new(Int32Array::from(vec![1])) as ArrayRef,
        ];
        let batch = RecordBatch::try_new(Arc::new(schema.clone()), arrays).unwrap();
        // create stream writer
        let mut file = tempfile::tempfile().unwrap();
        let mut stream_writer = crate::writer::StreamWriter::try_new(&mut file, &schema).unwrap();
        stream_writer.write(&batch).unwrap();
        stream_writer.finish().unwrap();

        drop(stream_writer);

        file.rewind().unwrap();

        // read stream back
        let reader = StreamReader::try_new(&mut file, None).unwrap();

        // Both float columns were written with non-zero values; reading a
        // zero back would indicate the values were lost or corrupted.
        reader.for_each(|batch| {
            let batch = batch.unwrap();
            assert!(
                batch
                    .column(0)
                    .as_any()
                    .downcast_ref::<Float32Array>()
                    .unwrap()
                    .value(0)
                    != 0.0
            );
            assert!(
                batch
                    .column(1)
                    .as_any()
                    .downcast_ref::<Float32Array>()
                    .unwrap()
                    .value(0)
                    != 0.0
            );
        });

        file.rewind().unwrap();

        // Read with projection: only columns 0 ("a": Float32) and 3
        // ("d": Int32) should appear in the projected schema.
        let reader = StreamReader::try_new(file, Some(vec![0, 3])).unwrap();

        reader.for_each(|batch| {
            let batch = batch.unwrap();
            assert_eq!(batch.schema().fields().len(), 2);
            assert_eq!(batch.schema().fields()[0].data_type(), &DataType::Float32);
            assert_eq!(batch.schema().fields()[1].data_type(), &DataType::Int32);
        });
    }
2359
2360    /// Write the record batch to an in-memory buffer in IPC File format
2361    fn write_ipc(rb: &RecordBatch) -> Vec<u8> {
2362        let mut buf = Vec::new();
2363        let mut writer = crate::writer::FileWriter::try_new(&mut buf, rb.schema_ref()).unwrap();
2364        writer.write(rb).unwrap();
2365        writer.finish().unwrap();
2366        buf
2367    }
2368
2369    /// Return the first record batch read from the IPC File buffer
2370    fn read_ipc(buf: &[u8]) -> Result<RecordBatch, ArrowError> {
2371        let mut reader = FileReader::try_new(std::io::Cursor::new(buf), None)?;
2372        reader.next().unwrap()
2373    }
2374
2375    /// Return the first record batch read from the IPC File buffer, disabling
2376    /// validation
2377    fn read_ipc_skip_validation(buf: &[u8]) -> Result<RecordBatch, ArrowError> {
2378        let mut reader = unsafe {
2379            FileReader::try_new(std::io::Cursor::new(buf), None)?.with_skip_validation(true)
2380        };
2381        reader.next().unwrap()
2382    }
2383
2384    fn roundtrip_ipc(rb: &RecordBatch) -> RecordBatch {
2385        let buf = write_ipc(rb);
2386        read_ipc(&buf).unwrap()
2387    }
2388
2389    /// Return the first record batch read from the IPC File buffer
2390    /// using the FileDecoder API
2391    fn read_ipc_with_decoder(buf: Vec<u8>) -> Result<RecordBatch, ArrowError> {
2392        read_ipc_with_decoder_inner(buf, false)
2393    }
2394
2395    /// Return the first record batch read from the IPC File buffer
2396    /// using the FileDecoder API, disabling validation
2397    fn read_ipc_with_decoder_skip_validation(buf: Vec<u8>) -> Result<RecordBatch, ArrowError> {
2398        read_ipc_with_decoder_inner(buf, true)
2399    }
2400
    /// Shared implementation for the `read_ipc_with_decoder*` helpers:
    /// parses the IPC file footer by hand and replays the dictionary and
    /// record-batch blocks through a [`FileDecoder`].
    ///
    /// Expects `buf` to contain exactly one record batch.
    fn read_ipc_with_decoder_inner(
        buf: Vec<u8>,
        skip_validation: bool,
    ) -> Result<RecordBatch, ArrowError> {
        let buffer = Buffer::from_vec(buf);
        // Per the Arrow IPC file format, the trailer is the last 10 bytes:
        // a 4-byte footer length followed by the 6-byte `ARROW1` magic.
        let trailer_start = buffer.len() - 10;
        let footer_len = read_footer_length(buffer[trailer_start..].try_into().unwrap())?;
        // The flatbuffer-encoded footer sits immediately before the trailer.
        let footer = root_as_footer(&buffer[trailer_start - footer_len..trailer_start])
            .map_err(|e| ArrowError::InvalidArgumentError(format!("Invalid footer: {e}")))?;

        let schema = fb_to_schema(footer.schema().unwrap());

        // SAFETY: `skip_validation` is only set by tests that intentionally
        // bypass data validation on trusted input.
        let mut decoder = unsafe {
            FileDecoder::new(Arc::new(schema), footer.version())
                .with_skip_validation(skip_validation)
        };
        // Feed every dictionary block to the decoder before decoding the
        // record batch that references them.
        for block in footer.dictionaries().iter().flatten() {
            let block_len = block.bodyLength() as usize + block.metaDataLength() as usize;
            let data = buffer.slice_with_length(block.offset() as _, block_len);
            decoder.read_dictionary(block, &data)?
        }

        // Read record batch
        let batches = footer.recordBatches().unwrap();
        assert_eq!(batches.len(), 1); // Only wrote a single batch

        let block = batches.get(0);
        let block_len = block.bodyLength() as usize + block.metaDataLength() as usize;
        let data = buffer.slice_with_length(block.offset() as _, block_len);
        Ok(decoder.read_record_batch(block, &data)?.unwrap())
    }
2433
2434    /// Write the record batch to an in-memory buffer in IPC Stream format
2435    fn write_stream(rb: &RecordBatch) -> Vec<u8> {
2436        let mut buf = Vec::new();
2437        let mut writer = crate::writer::StreamWriter::try_new(&mut buf, rb.schema_ref()).unwrap();
2438        writer.write(rb).unwrap();
2439        writer.finish().unwrap();
2440        buf
2441    }
2442
2443    /// Return the first record batch read from the IPC Stream buffer
2444    fn read_stream(buf: &[u8]) -> Result<RecordBatch, ArrowError> {
2445        let mut reader = StreamReader::try_new(std::io::Cursor::new(buf), None)?;
2446        reader.next().unwrap()
2447    }
2448
2449    /// Return the first record batch read from the IPC Stream buffer,
2450    /// disabling validation
2451    fn read_stream_skip_validation(buf: &[u8]) -> Result<RecordBatch, ArrowError> {
2452        let mut reader = unsafe {
2453            StreamReader::try_new(std::io::Cursor::new(buf), None)?.with_skip_validation(true)
2454        };
2455        reader.next().unwrap()
2456    }
2457
2458    fn roundtrip_ipc_stream(rb: &RecordBatch) -> RecordBatch {
2459        let buf = write_stream(rb);
2460        read_stream(&buf).unwrap()
2461    }
2462
2463    #[test]
2464    fn test_roundtrip_with_custom_metadata() {
2465        let schema = Schema::new(vec![Field::new("dummy", DataType::Float64, false)]);
2466        let mut buf = Vec::new();
2467        let mut writer = crate::writer::FileWriter::try_new(&mut buf, &schema).unwrap();
2468        let mut test_metadata = HashMap::new();
2469        test_metadata.insert("abc".to_string(), "abc".to_string());
2470        test_metadata.insert("def".to_string(), "def".to_string());
2471        for (k, v) in &test_metadata {
2472            writer.write_metadata(k, v);
2473        }
2474        writer.finish().unwrap();
2475        drop(writer);
2476
2477        let reader = crate::reader::FileReader::try_new(std::io::Cursor::new(buf), None).unwrap();
2478        assert_eq!(reader.custom_metadata(), &test_metadata);
2479    }
2480
2481    #[test]
2482    fn test_roundtrip_nested_dict() {
2483        let inner: DictionaryArray<Int32Type> = vec!["a", "b", "a"].into_iter().collect();
2484
2485        let array = Arc::new(inner) as ArrayRef;
2486
2487        let dctfield = Arc::new(Field::new("dict", array.data_type().clone(), false));
2488
2489        let s = StructArray::from(vec![(dctfield, array)]);
2490        let struct_array = Arc::new(s) as ArrayRef;
2491
2492        let schema = Arc::new(Schema::new(vec![Field::new(
2493            "struct",
2494            struct_array.data_type().clone(),
2495            false,
2496        )]));
2497
2498        let batch = RecordBatch::try_new(schema, vec![struct_array]).unwrap();
2499
2500        assert_eq!(batch, roundtrip_ipc(&batch));
2501    }
2502
2503    #[test]
2504    fn test_roundtrip_nested_dict_no_preserve_dict_id() {
2505        let inner: DictionaryArray<Int32Type> = vec!["a", "b", "a"].into_iter().collect();
2506
2507        let array = Arc::new(inner) as ArrayRef;
2508
2509        let dctfield = Arc::new(Field::new("dict", array.data_type().clone(), false));
2510
2511        let s = StructArray::from(vec![(dctfield, array)]);
2512        let struct_array = Arc::new(s) as ArrayRef;
2513
2514        let schema = Arc::new(Schema::new(vec![Field::new(
2515            "struct",
2516            struct_array.data_type().clone(),
2517            false,
2518        )]));
2519
2520        let batch = RecordBatch::try_new(schema, vec![struct_array]).unwrap();
2521
2522        let mut buf = Vec::new();
2523        let mut writer = crate::writer::FileWriter::try_new_with_options(
2524            &mut buf,
2525            batch.schema_ref(),
2526            IpcWriteOptions::default(),
2527        )
2528        .unwrap();
2529        writer.write(&batch).unwrap();
2530        writer.finish().unwrap();
2531        drop(writer);
2532
2533        let mut reader = FileReader::try_new(std::io::Cursor::new(buf), None).unwrap();
2534
2535        assert_eq!(batch, reader.next().unwrap().unwrap());
2536    }
2537
    /// Appends a mix of Int32/Float64/Int64 values (including a null) to the
    /// given union builder, round-trips the resulting single-column batch
    /// through the IPC File format, and verifies the schema, shape, and
    /// column data all survive.
    fn check_union_with_builder(mut builder: UnionBuilder) {
        builder.append::<Int32Type>("a", 1).unwrap();
        builder.append_null::<Int32Type>("a").unwrap();
        builder.append::<Float64Type>("c", 3.0).unwrap();
        builder.append::<Int32Type>("a", 4).unwrap();
        builder.append::<Int64Type>("d", 11).unwrap();
        let union = builder.build().unwrap();

        let schema = Arc::new(Schema::new(vec![Field::new(
            "union",
            union.data_type().clone(),
            false,
        )]));

        let union_array = Arc::new(union) as ArrayRef;

        let rb = RecordBatch::try_new(schema, vec![union_array]).unwrap();
        let rb2 = roundtrip_ipc(&rb);
        // Check schema and shape first for clearer failure diagnostics,
        // then compare the column contents directly.
        assert_eq!(rb.schema(), rb2.schema());
        assert_eq!(rb.num_columns(), rb2.num_columns());
        assert_eq!(rb.num_rows(), rb2.num_rows());
        let union1 = rb.column(0);
        let union2 = rb2.column(0);

        assert_eq!(union1, union2);
    }
2566
    /// IPC File round-trip of a dense union array.
    #[test]
    fn test_roundtrip_dense_union() {
        check_union_with_builder(UnionBuilder::new_dense());
    }
2571
    /// IPC File round-trip of a sparse union array.
    #[test]
    fn test_roundtrip_sparse_union() {
        check_union_with_builder(UnionBuilder::new_sparse());
    }
2576
2577    #[test]
2578    fn test_roundtrip_struct_empty_fields() {
2579        let nulls = NullBuffer::from(&[true, true, false]);
2580        let rb = RecordBatch::try_from_iter([(
2581            "",
2582            Arc::new(StructArray::new_empty_fields(nulls.len(), Some(nulls))) as _,
2583        )])
2584        .unwrap();
2585        let rb2 = roundtrip_ipc(&rb);
2586        assert_eq!(rb, rb2);
2587    }
2588
2589    #[test]
2590    fn test_roundtrip_stream_run_array_sliced() {
2591        let run_array_1: Int32RunArray = vec!["a", "a", "a", "b", "b", "c", "c", "c"]
2592            .into_iter()
2593            .collect();
2594        let run_array_1_sliced = run_array_1.slice(2, 5);
2595
2596        let run_array_2_inupt = vec![Some(1_i32), None, None, Some(2), Some(2)];
2597        let mut run_array_2_builder = PrimitiveRunBuilder::<Int16Type, Int32Type>::new();
2598        run_array_2_builder.extend(run_array_2_inupt);
2599        let run_array_2 = run_array_2_builder.finish();
2600
2601        let schema = Arc::new(Schema::new(vec![
2602            Field::new(
2603                "run_array_1_sliced",
2604                run_array_1_sliced.data_type().clone(),
2605                false,
2606            ),
2607            Field::new("run_array_2", run_array_2.data_type().clone(), false),
2608        ]));
2609        let input_batch = RecordBatch::try_new(
2610            schema,
2611            vec![Arc::new(run_array_1_sliced.clone()), Arc::new(run_array_2)],
2612        )
2613        .unwrap();
2614        let output_batch = roundtrip_ipc_stream(&input_batch);
2615
2616        // As partial comparison not yet supported for run arrays, the sliced run array
2617        // has to be unsliced before comparing with the output. the second run array
2618        // can be compared as such.
2619        assert_eq!(input_batch.column(1), output_batch.column(1));
2620
2621        let run_array_1_unsliced = unslice_run_array(run_array_1_sliced.into_data()).unwrap();
2622        assert_eq!(run_array_1_unsliced, output_batch.column(0).into_data());
2623    }
2624
2625    #[test]
2626    fn test_roundtrip_stream_nested_dict() {
2627        let xs = vec!["AA", "BB", "AA", "CC", "BB"];
2628        let dict = Arc::new(
2629            xs.clone()
2630                .into_iter()
2631                .collect::<DictionaryArray<Int8Type>>(),
2632        );
2633        let string_array: ArrayRef = Arc::new(StringArray::from(xs.clone()));
2634        let struct_array = StructArray::from(vec![
2635            (
2636                Arc::new(Field::new("f2.1", DataType::Utf8, false)),
2637                string_array,
2638            ),
2639            (
2640                Arc::new(Field::new("f2.2_struct", dict.data_type().clone(), false)),
2641                dict.clone() as ArrayRef,
2642            ),
2643        ]);
2644        let schema = Arc::new(Schema::new(vec![
2645            Field::new("f1_string", DataType::Utf8, false),
2646            Field::new("f2_struct", struct_array.data_type().clone(), false),
2647        ]));
2648        let input_batch = RecordBatch::try_new(
2649            schema,
2650            vec![
2651                Arc::new(StringArray::from(xs.clone())),
2652                Arc::new(struct_array),
2653            ],
2654        )
2655        .unwrap();
2656        let output_batch = roundtrip_ipc_stream(&input_batch);
2657        assert_eq!(input_batch, output_batch);
2658    }
2659
    /// IPC Stream round-trip of a dictionary of `Map`s whose keys and values
    /// are themselves dictionary-encoded strings (deeply nested dictionaries).
    #[test]
    fn test_roundtrip_stream_nested_dict_of_map_of_dict() {
        let values = StringArray::from(vec![Some("a"), None, Some("b"), Some("c")]);
        let values = Arc::new(values) as ArrayRef;
        let value_dict_keys = Int8Array::from_iter_values([0, 1, 1, 2, 3, 1]);
        let value_dict_array = DictionaryArray::new(value_dict_keys, values.clone());

        let key_dict_keys = Int8Array::from_iter_values([0, 0, 2, 1, 1, 3]);
        let key_dict_array = DictionaryArray::new(key_dict_keys, values);

        // `Field::new_dict` is deprecated; kept here to exercise the legacy
        // explicit-dictionary-ID code path.
        #[allow(deprecated)]
        let keys_field = Arc::new(Field::new_dict(
            "keys",
            DataType::Dictionary(Box::new(DataType::Int8), Box::new(DataType::Utf8)),
            true, // It is technically not legal for this field to be null.
            1,
            false,
        ));
        #[allow(deprecated)]
        let values_field = Arc::new(Field::new_dict(
            "values",
            DataType::Dictionary(Box::new(DataType::Int8), Box::new(DataType::Utf8)),
            true,
            2,
            false,
        ));
        // Each map entry is a {keys, values} struct.
        let entry_struct = StructArray::from(vec![
            (keys_field, make_array(key_dict_array.into_data())),
            (values_field, make_array(value_dict_array.into_data())),
        ]);
        let map_data_type = DataType::Map(
            Arc::new(Field::new(
                "entries",
                entry_struct.data_type().clone(),
                false,
            )),
            false,
        );

        // Three maps of two entries each (offsets 0, 2, 4, 6).
        let entry_offsets = Buffer::from_slice_ref([0, 2, 4, 6]);
        let map_data = ArrayData::builder(map_data_type)
            .len(3)
            .add_buffer(entry_offsets)
            .add_child_data(entry_struct.into_data())
            .build()
            .unwrap();
        let map_array = MapArray::from(map_data);

        let dict_keys = Int8Array::from_iter_values([0, 1, 1, 2, 2, 1]);
        let dict_dict_array = DictionaryArray::new(dict_keys, Arc::new(map_array));

        let schema = Arc::new(Schema::new(vec![Field::new(
            "f1",
            dict_dict_array.data_type().clone(),
            false,
        )]));
        let input_batch = RecordBatch::try_new(schema, vec![Arc::new(dict_dict_array)]).unwrap();
        let output_batch = roundtrip_ipc_stream(&input_batch);
        assert_eq!(input_batch, output_batch);
    }
2720
    /// Shared implementation for `test_roundtrip_stream_dict_of_list_of_dict`:
    /// builds a dictionary whose values are a (large) list of
    /// dictionary-encoded strings and round-trips it through the IPC stream.
    ///
    /// `offsets` must hold 5 entries (4 lists) consistent with the 7
    /// dictionary keys built below; `OffsetSize`/`U` select between
    /// `List`/i32 and `LargeList`/i64.
    fn test_roundtrip_stream_dict_of_list_of_dict_impl<
        OffsetSize: OffsetSizeTrait,
        U: ArrowNativeType,
    >(
        list_data_type: DataType,
        offsets: &[U; 5],
    ) {
        let values = StringArray::from(vec![Some("a"), None, Some("c"), None]);
        let keys = Int8Array::from_iter_values([0, 0, 1, 2, 0, 1, 3]);
        let dict_array = DictionaryArray::new(keys, Arc::new(values));
        let dict_data = dict_array.to_data();

        let value_offsets = Buffer::from_slice_ref(offsets);

        let list_data = ArrayData::builder(list_data_type)
            .len(4)
            .add_buffer(value_offsets)
            .add_child_data(dict_data)
            .build()
            .unwrap();
        let list_array = GenericListArray::<OffsetSize>::from(list_data);

        // The outer dictionary indexes into the 4 lists built above.
        let keys_for_dict = Int8Array::from_iter_values([0, 3, 0, 1, 1, 2, 0, 1, 3]);
        let dict_dict_array = DictionaryArray::new(keys_for_dict, Arc::new(list_array));

        let schema = Arc::new(Schema::new(vec![Field::new(
            "f1",
            dict_dict_array.data_type().clone(),
            false,
        )]));
        let input_batch = RecordBatch::try_new(schema, vec![Arc::new(dict_dict_array)]).unwrap();
        let output_batch = roundtrip_ipc_stream(&input_batch);
        assert_eq!(input_batch, output_batch);
    }
2755
    /// IPC Stream round-trip of a dictionary of `List` (i32 offsets) and of
    /// `LargeList` (i64 offsets) of dictionary-encoded strings.
    #[test]
    fn test_roundtrip_stream_dict_of_list_of_dict() {
        // list
        #[allow(deprecated)]
        let list_data_type = DataType::List(Arc::new(Field::new_dict(
            "item",
            DataType::Dictionary(Box::new(DataType::Int8), Box::new(DataType::Utf8)),
            true,
            1,
            false,
        )));
        let offsets: &[i32; 5] = &[0, 2, 4, 4, 6];
        test_roundtrip_stream_dict_of_list_of_dict_impl::<i32, i32>(list_data_type, offsets);

        // large list
        #[allow(deprecated)]
        let list_data_type = DataType::LargeList(Arc::new(Field::new_dict(
            "item",
            DataType::Dictionary(Box::new(DataType::Int8), Box::new(DataType::Utf8)),
            true,
            1,
            false,
        )));
        let offsets: &[i64; 5] = &[0, 2, 4, 4, 7];
        test_roundtrip_stream_dict_of_list_of_dict_impl::<i64, i64>(list_data_type, offsets);
    }
2782
    /// IPC Stream round-trip of a dictionary of `FixedSizeList` (size 3) of
    /// dictionary-encoded strings.
    #[test]
    fn test_roundtrip_stream_dict_of_fixed_size_list_of_dict() {
        let values = StringArray::from(vec![Some("a"), None, Some("c"), None]);
        let keys = Int8Array::from_iter_values([0, 0, 1, 2, 0, 1, 3, 1, 2]);
        let dict_array = DictionaryArray::new(keys, Arc::new(values));
        let dict_data = dict_array.into_data();

        // `Field::new_dict` is deprecated; kept here to exercise the legacy
        // explicit-dictionary-ID code path.
        #[allow(deprecated)]
        let list_data_type = DataType::FixedSizeList(
            Arc::new(Field::new_dict(
                "item",
                DataType::Dictionary(Box::new(DataType::Int8), Box::new(DataType::Utf8)),
                true,
                1,
                false,
            )),
            3,
        );
        // 9 dictionary keys / list size 3 = 3 fixed-size lists.
        let list_data = ArrayData::builder(list_data_type)
            .len(3)
            .add_child_data(dict_data)
            .build()
            .unwrap();
        let list_array = FixedSizeListArray::from(list_data);

        let keys_for_dict = Int8Array::from_iter_values([0, 1, 0, 1, 1, 2, 0, 1, 2]);
        let dict_dict_array = DictionaryArray::new(keys_for_dict, Arc::new(list_array));

        let schema = Arc::new(Schema::new(vec![Field::new(
            "f1",
            dict_dict_array.data_type().clone(),
            false,
        )]));
        let input_batch = RecordBatch::try_new(schema, vec![Arc::new(dict_dict_array)]).unwrap();
        let output_batch = roundtrip_ipc_stream(&input_batch);
        assert_eq!(input_batch, output_batch);
    }
2820
    /// A string long enough to exercise how BinaryView/Utf8View arrays
    /// handle values that do not fit the short inline representation.
    const LONG_TEST_STRING: &str =
        "This is a long string to make sure binary view array handles it";
2823
    /// IPC round-trip (both File and Stream formats) of BinaryView and
    /// Utf8View columns alongside a plain Utf8 column, full and sliced.
    #[test]
    fn test_roundtrip_view_types() {
        let schema = Schema::new(vec![
            Field::new("field_1", DataType::BinaryView, true),
            Field::new("field_2", DataType::Utf8, true),
            Field::new("field_3", DataType::Utf8View, true),
        ]);
        // Mix short values, a null, and a long value so both short and long
        // view representations are exercised.
        let bin_values: Vec<Option<&[u8]>> = vec![
            Some(b"foo"),
            None,
            Some(b"bar"),
            Some(LONG_TEST_STRING.as_bytes()),
        ];
        let utf8_values: Vec<Option<&str>> =
            vec![Some("foo"), None, Some("bar"), Some(LONG_TEST_STRING)];
        let bin_view_array = BinaryViewArray::from_iter(bin_values);
        let utf8_array = StringArray::from_iter(utf8_values.iter());
        let utf8_view_array = StringViewArray::from_iter(utf8_values);
        let record_batch = RecordBatch::try_new(
            Arc::new(schema.clone()),
            vec![
                Arc::new(bin_view_array),
                Arc::new(utf8_array),
                Arc::new(utf8_view_array),
            ],
        )
        .unwrap();

        assert_eq!(record_batch, roundtrip_ipc(&record_batch));
        assert_eq!(record_batch, roundtrip_ipc_stream(&record_batch));

        // Slices (non-zero offsets) must round-trip too.
        let sliced_batch = record_batch.slice(1, 2);
        assert_eq!(sliced_batch, roundtrip_ipc(&sliced_batch));
        assert_eq!(sliced_batch, roundtrip_ipc_stream(&sliced_batch));
    }
2859
    /// IPC round-trip (both File and Stream formats) of view types nested
    /// inside dictionaries inside a `Map`, full and sliced: a dictionary of
    /// maps whose keys are dictionary-encoded Utf8View and whose values are
    /// dictionary-encoded BinaryView.
    #[test]
    fn test_roundtrip_view_types_nested_dict() {
        let bin_values: Vec<Option<&[u8]>> = vec![
            Some(b"foo"),
            None,
            Some(b"bar"),
            Some(LONG_TEST_STRING.as_bytes()),
            Some(b"field"),
        ];
        let utf8_values: Vec<Option<&str>> = vec![
            Some("foo"),
            None,
            Some("bar"),
            Some(LONG_TEST_STRING),
            Some("field"),
        ];
        let bin_view_array = Arc::new(BinaryViewArray::from_iter(bin_values));
        let utf8_view_array = Arc::new(StringViewArray::from_iter(utf8_values));

        let key_dict_keys = Int8Array::from_iter_values([0, 0, 1, 2, 0, 1, 3]);
        let key_dict_array = DictionaryArray::new(key_dict_keys, utf8_view_array.clone());
        // `Field::new_dict` is deprecated; kept here to exercise the legacy
        // explicit-dictionary-ID code path.
        #[allow(deprecated)]
        let keys_field = Arc::new(Field::new_dict(
            "keys",
            DataType::Dictionary(Box::new(DataType::Int8), Box::new(DataType::Utf8View)),
            true,
            1,
            false,
        ));

        let value_dict_keys = Int8Array::from_iter_values([0, 3, 0, 1, 2, 0, 1]);
        let value_dict_array = DictionaryArray::new(value_dict_keys, bin_view_array);
        #[allow(deprecated)]
        let values_field = Arc::new(Field::new_dict(
            "values",
            DataType::Dictionary(Box::new(DataType::Int8), Box::new(DataType::BinaryView)),
            true,
            2,
            false,
        ));
        // Each map entry is a {keys, values} struct.
        let entry_struct = StructArray::from(vec![
            (keys_field, make_array(key_dict_array.into_data())),
            (values_field, make_array(value_dict_array.into_data())),
        ]);

        let map_data_type = DataType::Map(
            Arc::new(Field::new(
                "entries",
                entry_struct.data_type().clone(),
                false,
            )),
            false,
        );
        // Three maps with entry counts 2, 2, and 3 (offsets 0, 2, 4, 7).
        let entry_offsets = Buffer::from_slice_ref([0, 2, 4, 7]);
        let map_data = ArrayData::builder(map_data_type)
            .len(3)
            .add_buffer(entry_offsets)
            .add_child_data(entry_struct.into_data())
            .build()
            .unwrap();
        let map_array = MapArray::from(map_data);

        let dict_keys = Int8Array::from_iter_values([0, 1, 0, 1, 1, 2, 0, 1, 2]);
        let dict_dict_array = DictionaryArray::new(dict_keys, Arc::new(map_array));
        let schema = Arc::new(Schema::new(vec![Field::new(
            "f1",
            dict_dict_array.data_type().clone(),
            false,
        )]));
        let batch = RecordBatch::try_new(schema, vec![Arc::new(dict_dict_array)]).unwrap();
        assert_eq!(batch, roundtrip_ipc(&batch));
        assert_eq!(batch, roundtrip_ipc_stream(&batch));

        // Slices (non-zero offsets) must round-trip too.
        let sliced_batch = batch.slice(1, 2);
        assert_eq!(sliced_batch, roundtrip_ipc(&sliced_batch));
        assert_eq!(sliced_batch, roundtrip_ipc_stream(&sliced_batch));
    }
2937
2938    #[test]
2939    fn test_no_columns_batch() {
2940        let schema = Arc::new(Schema::empty());
2941        let options = RecordBatchOptions::new()
2942            .with_match_field_names(true)
2943            .with_row_count(Some(10));
2944        let input_batch = RecordBatch::try_new_with_options(schema, vec![], &options).unwrap();
2945        let output_batch = roundtrip_ipc_stream(&input_batch);
2946        assert_eq!(input_batch, output_batch);
2947    }
2948
    /// Decoding a record batch from a buffer that is not 8-byte aligned must
    /// succeed when `require_alignment` is disabled.
    #[test]
    fn test_unaligned() {
        let batch = RecordBatch::try_from_iter(vec![(
            "i32",
            Arc::new(Int32Array::from(vec![1, 2, 3, 4])) as _,
        )])
        .unwrap();

        let r#gen = IpcDataGenerator {};
        let mut dict_tracker = DictionaryTracker::new(false);
        let (_, encoded) = r#gen
            .encode(
                &batch,
                &mut dict_tracker,
                &Default::default(),
                &mut Default::default(),
            )
            .unwrap();

        let message = root_as_message(&encoded.ipc_message).unwrap();

        // Construct an unaligned buffer: prepend one byte then slice it off,
        // so the payload starts one byte past an 8-byte-aligned address.
        let mut buffer = MutableBuffer::with_capacity(encoded.arrow_data.len() + 1);
        buffer.push(0_u8);
        buffer.extend_from_slice(&encoded.arrow_data);
        let b = Buffer::from(buffer).slice(1);
        assert_ne!(b.as_ptr().align_offset(8), 0);

        let ipc_batch = message.header_as_record_batch().unwrap();
        let roundtrip = RecordBatchDecoder::try_new(
            &b,
            ipc_batch,
            batch.schema(),
            &Default::default(),
            &message.version(),
        )
        .unwrap()
        .with_require_alignment(false)
        .read_record_batch()
        .unwrap();
        assert_eq!(batch, roundtrip);
    }
2991
    /// With `require_alignment` enabled, decoding from a misaligned buffer
    /// must fail with a descriptive error naming the offending buffer.
    #[test]
    fn test_unaligned_throws_error_with_require_alignment() {
        let batch = RecordBatch::try_from_iter(vec![(
            "i32",
            Arc::new(Int32Array::from(vec![1, 2, 3, 4])) as _,
        )])
        .unwrap();

        let r#gen = IpcDataGenerator {};
        let mut dict_tracker = DictionaryTracker::new(false);
        let (_, encoded) = r#gen
            .encode(
                &batch,
                &mut dict_tracker,
                &Default::default(),
                &mut Default::default(),
            )
            .unwrap();

        let message = root_as_message(&encoded.ipc_message).unwrap();

        // Construct an unaligned buffer: prepend one byte then slice it off,
        // so the payload starts one byte past an 8-byte-aligned address.
        let mut buffer = MutableBuffer::with_capacity(encoded.arrow_data.len() + 1);
        buffer.push(0_u8);
        buffer.extend_from_slice(&encoded.arrow_data);
        let b = Buffer::from(buffer).slice(1);
        assert_ne!(b.as_ptr().align_offset(8), 0);

        let ipc_batch = message.header_as_record_batch().unwrap();
        let result = RecordBatchDecoder::try_new(
            &b,
            ipc_batch,
            batch.schema(),
            &Default::default(),
            &message.version(),
        )
        .unwrap()
        .with_require_alignment(true)
        .read_record_batch();

        let error = result.unwrap_err();
        assert_eq!(
            error.to_string(),
            "Invalid argument error: Misaligned buffers[0] in array of type Int32, \
             offset from expected alignment of 4 by 1"
        );
    }
3039
3040    #[test]
3041    fn test_file_with_massive_column_count() {
3042        // 499_999 is upper limit for default settings (1_000_000)
3043        let limit = 600_000;
3044
3045        let fields = (0..limit)
3046            .map(|i| Field::new(format!("{i}"), DataType::Boolean, false))
3047            .collect::<Vec<_>>();
3048        let schema = Arc::new(Schema::new(fields));
3049        let batch = RecordBatch::new_empty(schema);
3050
3051        let mut buf = Vec::new();
3052        let mut writer = crate::writer::FileWriter::try_new(&mut buf, batch.schema_ref()).unwrap();
3053        writer.write(&batch).unwrap();
3054        writer.finish().unwrap();
3055        drop(writer);
3056
3057        let mut reader = FileReaderBuilder::new()
3058            .with_max_footer_fb_tables(1_500_000)
3059            .build(std::io::Cursor::new(buf))
3060            .unwrap();
3061        let roundtrip_batch = reader.next().unwrap().unwrap();
3062
3063        assert_eq!(batch, roundtrip_batch);
3064    }
3065
3066    #[test]
3067    fn test_file_with_deeply_nested_columns() {
3068        // 60 is upper limit for default settings (64)
3069        let limit = 61;
3070
3071        let fields = (0..limit).fold(
3072            vec![Field::new("leaf", DataType::Boolean, false)],
3073            |field, index| vec![Field::new_struct(format!("{index}"), field, false)],
3074        );
3075        let schema = Arc::new(Schema::new(fields));
3076        let batch = RecordBatch::new_empty(schema);
3077
3078        let mut buf = Vec::new();
3079        let mut writer = crate::writer::FileWriter::try_new(&mut buf, batch.schema_ref()).unwrap();
3080        writer.write(&batch).unwrap();
3081        writer.finish().unwrap();
3082        drop(writer);
3083
3084        let mut reader = FileReaderBuilder::new()
3085            .with_max_footer_fb_depth(65)
3086            .build(std::io::Cursor::new(buf))
3087            .unwrap();
3088        let roundtrip_batch = reader.next().unwrap().unwrap();
3089
3090        assert_eq!(batch, roundtrip_batch);
3091    }
3092
    /// Reading IPC data containing a struct whose children have mismatched
    /// lengths must fail validation with a descriptive error.
    #[test]
    fn test_invalid_struct_array_ipc_read_errors() {
        let a_field = Field::new("a", DataType::Int32, false);
        let b_field = Field::new("b", DataType::Int32, false);
        let struct_fields = Fields::from(vec![a_field.clone(), b_field.clone()]);

        // Child "a" has 4 elements but child "b" only has 3.
        let a_array_data = ArrayData::builder(a_field.data_type().clone())
            .len(4)
            .add_buffer(Buffer::from_slice_ref([1, 2, 3, 4]))
            .build()
            .unwrap();
        let b_array_data = ArrayData::builder(b_field.data_type().clone())
            .len(3)
            .add_buffer(Buffer::from_slice_ref([5, 6, 7]))
            .build()
            .unwrap();

        // SAFETY(test): intentionally constructs an invalid array so the
        // reader's validation can be exercised; never used as valid data.
        let invalid_struct_arr = unsafe {
            StructArray::new_unchecked(
                struct_fields,
                vec![make_array(a_array_data), make_array(b_array_data)],
                None,
            )
        };

        expect_ipc_validation_error(
            Arc::new(invalid_struct_arr),
            "Invalid argument error: Incorrect array length for StructArray field \"b\", expected 4 got 3",
        );
    }
3123
    /// Reading IPC data whose nested child array contains invalid UTF-8 must
    /// fail validation with a descriptive error.
    #[test]
    fn test_invalid_nested_array_ipc_read_errors() {
        // one of the nested arrays has invalid data
        let a_field = Field::new("a", DataType::Int32, false);
        let b_field = Field::new("b", DataType::Utf8, false);

        let schema = Arc::new(Schema::new(vec![Field::new_struct(
            "s",
            vec![a_field.clone(), b_field.clone()],
            false,
        )]));

        let a_array_data = ArrayData::builder(a_field.data_type().clone())
            .len(4)
            .add_buffer(Buffer::from_slice_ref([1, 2, 3, 4]))
            .build()
            .unwrap();
        // invalid nested child array -- length is correct, but has invalid utf8 data
        let b_array_data = {
            let valid: &[u8] = b"   ";
            let mut invalid = vec![];
            invalid.extend_from_slice(b"ValidString");
            invalid.extend_from_slice(INVALID_UTF8_FIRST_CHAR);
            let binary_array =
                BinaryArray::from_iter(vec![None, Some(valid), None, Some(&invalid)]);
            // SAFETY(test): deliberately bypasses UTF-8 validation so the
            // string array carries invalid bytes for the reader to reject.
            let array = unsafe {
                StringArray::new_unchecked(
                    binary_array.offsets().clone(),
                    binary_array.values().clone(),
                    binary_array.nulls().cloned(),
                )
            };
            array.into_data()
        };
        let struct_data_type = schema.field(0).data_type();

        // SAFETY(test): intentionally skips validation while assembling the
        // struct so the invalid child survives into the IPC payload.
        let invalid_struct_arr = unsafe {
            make_array(
                ArrayData::builder(struct_data_type.clone())
                    .len(4)
                    .add_child_data(a_array_data)
                    .add_child_data(b_array_data)
                    .build_unchecked(),
            )
        };
        expect_ipc_validation_error(
            invalid_struct_arr,
            "Invalid argument error: Invalid UTF8 sequence at string index 3 (3..18): invalid utf-8 sequence of 1 bytes from index 11",
        );
    }
3174
3175    #[test]
3176    fn test_same_dict_id_without_preserve() {
3177        let batch = RecordBatch::try_new(
3178            Arc::new(Schema::new(
3179                ["a", "b"]
3180                    .iter()
3181                    .map(|name| {
3182                        #[allow(deprecated)]
3183                        Field::new_dict(
3184                            name.to_string(),
3185                            DataType::Dictionary(
3186                                Box::new(DataType::Int32),
3187                                Box::new(DataType::Utf8),
3188                            ),
3189                            true,
3190                            0,
3191                            false,
3192                        )
3193                    })
3194                    .collect::<Vec<Field>>(),
3195            )),
3196            vec![
3197                Arc::new(
3198                    vec![Some("c"), Some("d")]
3199                        .into_iter()
3200                        .collect::<DictionaryArray<Int32Type>>(),
3201                ) as ArrayRef,
3202                Arc::new(
3203                    vec![Some("e"), Some("f")]
3204                        .into_iter()
3205                        .collect::<DictionaryArray<Int32Type>>(),
3206                ) as ArrayRef,
3207            ],
3208        )
3209        .expect("Failed to create RecordBatch");
3210
3211        // serialize the record batch as an IPC stream
3212        let mut buf = vec![];
3213        {
3214            let mut writer = crate::writer::StreamWriter::try_new_with_options(
3215                &mut buf,
3216                batch.schema().as_ref(),
3217                crate::writer::IpcWriteOptions::default(),
3218            )
3219            .expect("Failed to create StreamWriter");
3220            writer.write(&batch).expect("Failed to write RecordBatch");
3221            writer.finish().expect("Failed to finish StreamWriter");
3222        }
3223
3224        StreamReader::try_new(std::io::Cursor::new(buf), None)
3225            .expect("Failed to create StreamReader")
3226            .for_each(|decoded_batch| {
3227                assert_eq!(decoded_batch.expect("Failed to read RecordBatch"), batch);
3228            });
3229    }
3230
3231    #[test]
3232    fn test_validation_of_invalid_list_array() {
3233        // ListArray with invalid offsets
3234        let array = unsafe {
3235            let values = Int32Array::from(vec![1, 2, 3]);
3236            let bad_offsets = ScalarBuffer::<i32>::from(vec![0, 2, 4, 2]); // offsets can't go backwards
3237            let offsets = OffsetBuffer::new_unchecked(bad_offsets); // INVALID array created
3238            let field = Field::new_list_field(DataType::Int32, true);
3239            let nulls = None;
3240            ListArray::new(Arc::new(field), offsets, Arc::new(values), nulls)
3241        };
3242
3243        expect_ipc_validation_error(
3244            Arc::new(array),
3245            "Invalid argument error: Offset invariant failure: offset at position 2 out of bounds: 4 > 2",
3246        );
3247    }
3248
3249    #[test]
3250    fn test_validation_of_invalid_string_array() {
3251        let valid: &[u8] = b"   ";
3252        let mut invalid = vec![];
3253        invalid.extend_from_slice(b"ThisStringIsCertainlyLongerThan12Bytes");
3254        invalid.extend_from_slice(INVALID_UTF8_FIRST_CHAR);
3255        let binary_array = BinaryArray::from_iter(vec![None, Some(valid), None, Some(&invalid)]);
3256        // data is not valid utf8 we can not construct a correct StringArray
3257        // safely, so purposely create an invalid StringArray
3258        let array = unsafe {
3259            StringArray::new_unchecked(
3260                binary_array.offsets().clone(),
3261                binary_array.values().clone(),
3262                binary_array.nulls().cloned(),
3263            )
3264        };
3265        expect_ipc_validation_error(
3266            Arc::new(array),
3267            "Invalid argument error: Invalid UTF8 sequence at string index 3 (3..45): invalid utf-8 sequence of 1 bytes from index 38",
3268        );
3269    }
3270
    #[test]
    fn test_validation_of_invalid_string_view_array() {
        let valid: &[u8] = b"   ";
        let mut invalid = vec![];
        invalid.extend_from_slice(b"ThisStringIsCertainlyLongerThan12Bytes");
        invalid.extend_from_slice(INVALID_UTF8_FIRST_CHAR);
        let binary_view_array =
            BinaryViewArray::from_iter(vec![None, Some(valid), None, Some(&invalid)]);
        // data is not valid utf8 so we can not construct a correct
        // StringViewArray safely; purposely create an invalid StringViewArray
        let array = unsafe {
            StringViewArray::new_unchecked(
                binary_view_array.views().clone(),
                binary_view_array.data_buffers().to_vec(),
                binary_view_array.nulls().cloned(),
            )
        };
        expect_ipc_validation_error(
            Arc::new(array),
            "Invalid argument error: Encountered non-UTF-8 data at index 3: invalid utf-8 sequence of 1 bytes from index 38",
        );
    }
3293
    /// Validation must reject a dictionary array whose keys are out of
    /// bounds for its values array (here key 200 with only 3 values).
    #[test]
    fn test_validation_of_invalid_dictionary_array() {
        let array = unsafe {
            let values = StringArray::from_iter_values(["a", "b", "c"]);
            let keys = Int32Array::from(vec![1, 200]); // keys are not valid for values
            DictionaryArray::new_unchecked(keys, Arc::new(values))
        };

        expect_ipc_validation_error(
            Arc::new(array),
            "Invalid argument error: Value at position 1 out of bounds: 200 (should be in [0, 2])",
        );
    }
3309
    #[test]
    fn test_validation_of_invalid_union_array() {
        let array = unsafe {
            // the union declares only type ids 1 and 3; id 2 is deliberately absent
            let fields = UnionFields::try_new(
                vec![1, 3],
                vec![
                    Field::new("a", DataType::Int32, false),
                    Field::new("b", DataType::Utf8, false),
                ],
            )
            .unwrap();
            let type_ids = ScalarBuffer::from(vec![1i8, 2, 3]); // 2 is invalid
            let offsets = None;
            let children: Vec<ArrayRef> = vec![
                Arc::new(Int32Array::from(vec![10, 20, 30])),
                Arc::new(StringArray::from(vec![Some("a"), Some("b"), Some("c")])),
            ];

            // build without validation so the bogus type id survives
            UnionArray::new_unchecked(fields, type_ids, offsets, children)
        };

        expect_ipc_validation_error(
            Arc::new(array),
            "Invalid argument error: Type Ids values must match one of the field type ids",
        );
    }
3336
    /// Invalid Utf-8 sequence in the first character
    /// (0xa0 is a continuation byte and cannot begin a sequence)
    /// <https://stackoverflow.com/questions/1301402/example-invalid-utf8-string>
    const INVALID_UTF8_FIRST_CHAR: &[u8] = &[0xa0, 0xa1, 0x20, 0x20];
3340
    /// Expect an error when reading the record batch using IPC or IPC Streams
    ///
    /// Wraps `array` in a single-column RecordBatch named "a" and round-trips
    /// it through three read paths: the stream format, the file format, and
    /// the `FileDecoder` API. For each path it asserts that the
    /// `*_skip_validation` read succeeds while the validating read fails
    /// with exactly `expected_err`.
    fn expect_ipc_validation_error(array: ArrayRef, expected_err: &str) {
        let rb = RecordBatch::try_from_iter([("a", array)]).unwrap();

        // IPC Stream format
        let buf = write_stream(&rb); // write is ok
        read_stream_skip_validation(&buf).unwrap();
        let err = read_stream(&buf).unwrap_err();
        assert_eq!(err.to_string(), expected_err);

        // IPC File format
        let buf = write_ipc(&rb); // write is ok
        read_ipc_skip_validation(&buf).unwrap();
        let err = read_ipc(&buf).unwrap_err();
        assert_eq!(err.to_string(), expected_err);

        // IPC Format with FileDecoder
        read_ipc_with_decoder_skip_validation(buf.clone()).unwrap();
        let err = read_ipc_with_decoder(buf).unwrap_err();
        assert_eq!(err.to_string(), expected_err);
    }
3362
3363    #[test]
3364    fn test_roundtrip_schema() {
3365        let schema = Schema::new(vec![
3366            Field::new(
3367                "a",
3368                DataType::Dictionary(Box::new(DataType::UInt16), Box::new(DataType::Utf8)),
3369                false,
3370            ),
3371            Field::new(
3372                "b",
3373                DataType::Dictionary(Box::new(DataType::UInt16), Box::new(DataType::Utf8)),
3374                false,
3375            ),
3376        ]);
3377
3378        let options = IpcWriteOptions::default();
3379        let data_gen = IpcDataGenerator::default();
3380        let mut dict_tracker = DictionaryTracker::new(false);
3381        let encoded_data =
3382            data_gen.schema_to_bytes_with_dictionary_tracker(&schema, &mut dict_tracker, &options);
3383        let mut schema_bytes = vec![];
3384        write_message(&mut schema_bytes, encoded_data, &options).expect("write_message");
3385
3386        let begin_offset: usize = if schema_bytes[0..4].eq(&CONTINUATION_MARKER) {
3387            4
3388        } else {
3389            0
3390        };
3391
3392        size_prefixed_root_as_message(&schema_bytes[begin_offset..])
3393            .expect_err("size_prefixed_root_as_message");
3394
3395        let msg = parse_message(&schema_bytes).expect("parse_message");
3396        let ipc_schema = msg.header_as_schema().expect("header_as_schema");
3397        let new_schema = fb_to_schema(ipc_schema);
3398
3399        assert_eq!(schema, new_schema);
3400    }
3401
3402    #[test]
3403    fn test_negative_meta_len() {
3404        let bytes = i32::to_le_bytes(-1);
3405        let mut buf = vec![];
3406        buf.extend(CONTINUATION_MARKER);
3407        buf.extend(bytes);
3408
3409        let reader = StreamReader::try_new(Cursor::new(buf), None);
3410        assert!(reader.is_err());
3411    }
3412
    /// Per the IPC specification, dictionary batches may be omitted for
    /// dictionary-encoded columns where all values are null.  The C++
    /// implementation relies on this and does not emit a dictionary batch
    /// in that case.  Verify that the Rust reader handles such streams
    /// by synthesizing an empty dictionary instead of returning an error.
    #[test]
    fn test_read_null_dict_without_dictionary_batch() {
        // Build an all-null dictionary-encoded column.
        let keys = Int32Array::new_null(4);
        let values: ArrayRef = new_empty_array(&DataType::Utf8);
        let dict_array = DictionaryArray::new(keys, values);

        let schema = Arc::new(Schema::new(vec![Field::new(
            "d",
            dict_array.data_type().clone(),
            true,
        )]));
        let batch = RecordBatch::try_new(schema.clone(), vec![Arc::new(dict_array)]).unwrap();

        // Write a normal IPC stream (which includes the dictionary batch).
        let full_stream = write_stream(&batch);

        // Parse the stream into individual messages and reconstruct it
        // without the DictionaryBatch message, simulating what C++ emits
        // for an all-null dictionary column.
        let mut stripped = Vec::new();
        let mut cursor = Cursor::new(&full_stream);
        loop {
            // Each message is: [continuation (4 bytes)] [meta_len (4 bytes)]
            //                   [metadata (meta_len bytes)] [body (bodyLength bytes)]
            let mut header = [0u8; 4];
            if cursor.read_exact(&mut header).is_err() {
                // ran out of bytes: end of stream
                break;
            }
            // If the first 4 bytes are the continuation marker, the real
            // metadata length follows in the next 4 bytes.
            if header == CONTINUATION_MARKER && cursor.read_exact(&mut header).is_err() {
                break;
            }
            let meta_len = u32::from_le_bytes(header) as usize;
            if meta_len == 0 {
                // EOS marker — write it through.
                stripped.extend_from_slice(&CONTINUATION_MARKER);
                stripped.extend_from_slice(&0u32.to_le_bytes());
                break;
            }
            let mut meta_buf = vec![0u8; meta_len];
            cursor.read_exact(&mut meta_buf).unwrap();

            // Parse the flatbuffer metadata to learn the header type and
            // the length of the message body that follows.
            let message = root_as_message(&meta_buf).unwrap();
            let body_len = message.bodyLength() as usize;
            let mut body_buf = vec![0u8; body_len];
            cursor.read_exact(&mut body_buf).unwrap();

            if message.header_type() == crate::MessageHeader::DictionaryBatch {
                // Skip the dictionary batch — this is what C++ does for
                // all-null dictionary columns.
                continue;
            }
            // Re-emit every other message unchanged.
            stripped.extend_from_slice(&CONTINUATION_MARKER);
            stripped.extend_from_slice(&(meta_len as u32).to_le_bytes());
            stripped.extend_from_slice(&meta_buf);
            stripped.extend_from_slice(&body_buf);
        }

        // Reading the stripped stream must succeed.
        let result = read_stream(&stripped).unwrap();
        assert_eq!(result.num_rows(), 4);
        assert_eq!(result.num_columns(), 1);

        let col = result.column(0);
        assert_eq!(col.null_count(), 4);
        assert_eq!(col.len(), 4);
        // The result must be a dictionary-typed array.
        assert!(matches!(col.data_type(), DataType::Dictionary(_, _)));
    }
3487
3488    // Tests projected reads where a ListView column is skipped before another column.
3489    // This catches cases where skipping the ListView consumes the wrong number of buffers.
3490    #[test]
3491    fn test_projection_skip_list_view() {
3492        use crate::reader::FileReader;
3493        use crate::writer::FileWriter;
3494        use arrow_array::{
3495            GenericListViewArray, Int32Array, RecordBatch,
3496            builder::{GenericListViewBuilder, UInt32Builder},
3497        };
3498        use arrow_schema::{DataType, Field, Schema};
3499        use std::sync::Arc;
3500
3501        // Build a small ListView column with a mix of valid and null entries
3502        let mut builder = GenericListViewBuilder::<i32, _>::new(UInt32Builder::new());
3503
3504        builder.values().append_value(1);
3505        builder.values().append_value(2);
3506        builder.append(true);
3507
3508        builder.append(false);
3509
3510        builder.values().append_value(3);
3511        builder.values().append_value(4);
3512        builder.append(true);
3513
3514        let list_view: GenericListViewArray<i32> = builder.finish();
3515
3516        // Second column with simple values
3517        let values = Int32Array::from(vec![10, 20, 30]);
3518
3519        // Schema: first column is ListView, second is Int32
3520        let schema = Arc::new(Schema::new(vec![
3521            Field::new("a", list_view.data_type().clone(), true),
3522            Field::new("b", DataType::Int32, false),
3523        ]));
3524        // Create a batch with both columns
3525        let batch =
3526            RecordBatch::try_new(schema, vec![Arc::new(list_view), Arc::new(values.clone())])
3527                .unwrap();
3528
3529        // Write the batch to IPC
3530        let mut buf = Vec::new();
3531        {
3532            let mut writer = FileWriter::try_new(&mut buf, &batch.schema()).unwrap();
3533            writer.write(&batch).unwrap();
3534            writer.finish().unwrap();
3535        }
3536
3537        // Skip ListView column and Project only column "b"
3538        let mut reader = FileReader::try_new(std::io::Cursor::new(buf), Some(vec![1])).unwrap();
3539        let read_batch = reader.next().unwrap().unwrap();
3540
3541        // Verify that the projected column is read correctly
3542        assert_eq!(read_batch.num_columns(), 1);
3543        assert_eq!(read_batch.column(0).as_ref(), &values);
3544    }
3545
3546    // Tests reading a column when preceding fixed-width and boolean columns are skipped.
3547    // Covers all types that use the same two-buffer layout (null + values).
3548    // Verifies that skipping these types does not affect subsequent column decoding.
3549    #[test]
3550    fn test_projection_skip_fixed_width_types() {
3551        use std::sync::Arc;
3552
3553        use arrow_array::{ArrayRef, BooleanArray, Int32Array, RecordBatch, make_array};
3554        use arrow_buffer::Buffer;
3555        use arrow_data::ArrayData;
3556        use arrow_schema::{DataType, Field, IntervalUnit, Schema, TimeUnit};
3557
3558        use crate::reader::FileReader;
3559        use crate::writer::FileWriter;
3560
3561        // Create a minimal array for a given fixed-width or boolean type
3562        fn make_array_for_type(data_type: DataType) -> ArrayRef {
3563            let len = 3;
3564
3565            if matches!(data_type, DataType::Boolean) {
3566                return Arc::new(BooleanArray::from(vec![true, false, true]));
3567            }
3568
3569            let width = data_type.primitive_width().unwrap();
3570            let data = ArrayData::builder(data_type)
3571                .len(len)
3572                .add_buffer(Buffer::from(vec![0_u8; len * width]))
3573                .build()
3574                .unwrap();
3575
3576            make_array(data)
3577        }
3578
3579        // List of types that follow the same two-buffer layout (null + values)
3580        let data_types = vec![
3581            DataType::Boolean,
3582            DataType::Int8,
3583            DataType::Int16,
3584            DataType::Int32,
3585            DataType::Int64,
3586            DataType::UInt8,
3587            DataType::UInt16,
3588            DataType::UInt32,
3589            DataType::UInt64,
3590            DataType::Float16,
3591            DataType::Float32,
3592            DataType::Float64,
3593            DataType::Timestamp(TimeUnit::Second, None),
3594            DataType::Date32,
3595            DataType::Date64,
3596            DataType::Time32(TimeUnit::Second),
3597            DataType::Time64(TimeUnit::Microsecond),
3598            DataType::Duration(TimeUnit::Second),
3599            DataType::Interval(IntervalUnit::YearMonth),
3600            DataType::Interval(IntervalUnit::DayTime),
3601            DataType::Interval(IntervalUnit::MonthDayNano),
3602            DataType::Decimal32(9, 2),
3603            DataType::Decimal64(18, 2),
3604            DataType::Decimal128(38, 2),
3605            DataType::Decimal256(76, 2),
3606        ];
3607
3608        // For each type:
3609        // - write a batch with [skipped_column, values]
3610        // - read only the second column
3611        // - verify the result is correct
3612        for data_type in data_types {
3613            let skipped = make_array_for_type(data_type.clone());
3614            let values = Int32Array::from(vec![10, 20, 30]);
3615
3616            let schema = Arc::new(Schema::new(vec![
3617                Field::new("skipped", data_type, false),
3618                Field::new("values", DataType::Int32, false),
3619            ]));
3620
3621            let batch =
3622                RecordBatch::try_new(schema, vec![skipped, Arc::new(values.clone())]).unwrap();
3623
3624            // Serialize the batch into IPC format
3625            let mut buf = Vec::new();
3626            {
3627                let mut writer = FileWriter::try_new(&mut buf, &batch.schema()).unwrap();
3628                writer.write(&batch).unwrap();
3629                writer.finish().unwrap();
3630            }
3631
3632            // Read back only the second column (skip the first)
3633            let mut reader = FileReader::try_new(std::io::Cursor::new(buf), Some(vec![1])).unwrap();
3634            let read_batch = reader.next().unwrap().unwrap();
3635
3636            // Verify that the returned column matches the original values column
3637            assert_eq!(read_batch.num_columns(), 1);
3638            assert_eq!(read_batch.column(0).as_ref(), &values);
3639        }
3640    }
3641}