Skip to main content

arrow_ipc/
reader.rs

1// Licensed to the Apache Software Foundation (ASF) under one
2// or more contributor license agreements.  See the NOTICE file
3// distributed with this work for additional information
4// regarding copyright ownership.  The ASF licenses this file
5// to you under the Apache License, Version 2.0 (the
6// "License"); you may not use this file except in compliance
7// with the License.  You may obtain a copy of the License at
8//
9//   http://www.apache.org/licenses/LICENSE-2.0
10//
11// Unless required by applicable law or agreed to in writing,
12// software distributed under the License is distributed on an
13// "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
14// KIND, either express or implied.  See the License for the
15// specific language governing permissions and limitations
16// under the License.
17
18//! Arrow IPC File and Stream Readers
19//!
20//! # Notes
21//!
22//! The [`FileReader`] and [`StreamReader`] have similar interfaces,
23//! however the [`FileReader`] expects a reader that supports [`Seek`]ing
24//!
25//! [`Seek`]: std::io::Seek
26
27mod stream;
28pub use stream::*;
29
30use arrow_select::concat;
31
32use flatbuffers::{VectorIter, VerifierOptions};
33use std::collections::{HashMap, VecDeque};
34use std::fmt;
35use std::io::{BufReader, Read, Seek, SeekFrom};
36use std::sync::Arc;
37
38use arrow_array::*;
39use arrow_buffer::{
40    ArrowNativeType, BooleanBuffer, Buffer, MutableBuffer, NullBuffer, ScalarBuffer,
41};
42use arrow_data::{ArrayData, ArrayDataBuilder, UnsafeFlag};
43use arrow_schema::*;
44
45use crate::compression::{CompressionCodec, DecompressionContext};
46use crate::r#gen::Message::{self};
47use crate::{Block, CONTINUATION_MARKER, FieldNode, MetadataVersion};
48use DataType::*;
49
50/// Read a buffer based on offset and length
51/// From <https://github.com/apache/arrow/blob/6a936c4ff5007045e86f65f1a6b6c3c955ad5103/format/Message.fbs#L58>
52/// Each constituent buffer is first compressed with the indicated
53/// compressor, and then written with the uncompressed length in the first 8
54/// bytes as a 64-bit little-endian signed integer followed by the compressed
55/// buffer bytes (and then padding as required by the protocol). The
56/// uncompressed length may be set to -1 to indicate that the data that
57/// follows is not compressed, which can be useful for cases where
58/// compression does not yield appreciable savings.
59fn read_buffer(
60    buf: &crate::Buffer,
61    a_data: &Buffer,
62    compression_codec: Option<CompressionCodec>,
63    decompression_context: &mut DecompressionContext,
64) -> Result<Buffer, ArrowError> {
65    let start_offset = buf.offset() as usize;
66    let buf_data = a_data.slice_with_length(start_offset, buf.length() as usize);
67    // corner case: empty buffer
68    match (buf_data.is_empty(), compression_codec) {
69        (true, _) | (_, None) => Ok(buf_data),
70        (false, Some(decompressor)) => {
71            decompressor.decompress_to_buffer(&buf_data, decompression_context)
72        }
73    }
74}
75impl RecordBatchDecoder<'_> {
76    /// Coordinates reading arrays based on data types.
77    ///
78    /// `variadic_counts` encodes the number of buffers to read for variadic types (e.g., Utf8View, BinaryView)
79    /// When encounter such types, we pop from the front of the queue to get the number of buffers to read.
80    ///
81    /// Notes:
82    /// * In the IPC format, null buffers are always set, but may be empty. We discard them if an array has 0 nulls
83    /// * Numeric values inside list arrays are often stored as 64-bit values regardless of their data type size.
84    ///   We thus:
85    ///     - check if the bit width of non-64-bit numbers is 64, and
86    ///     - read the buffer as 64-bit (signed integer or float), and
87    ///     - cast the 64-bit array to the appropriate data type
88    fn create_array(
89        &mut self,
90        field: &Field,
91        variadic_counts: &mut VecDeque<i64>,
92    ) -> Result<ArrayRef, ArrowError> {
93        let data_type = field.data_type();
94        match data_type {
95            Utf8 | Binary | LargeBinary | LargeUtf8 => {
96                let field_node = self.next_node(field)?;
97                let buffers = [
98                    self.next_buffer()?,
99                    self.next_buffer()?,
100                    self.next_buffer()?,
101                ];
102                self.create_primitive_array(field_node, data_type, &buffers)
103            }
104            BinaryView | Utf8View => {
105                let count = variadic_counts
106                    .pop_front()
107                    .ok_or(ArrowError::IpcError(format!(
108                        "Missing variadic count for {data_type} column"
109                    )))?;
110                let count = count + 2; // view and null buffer.
111                let buffers = (0..count)
112                    .map(|_| self.next_buffer())
113                    .collect::<Result<Vec<_>, _>>()?;
114                let field_node = self.next_node(field)?;
115                self.create_primitive_array(field_node, data_type, &buffers)
116            }
117            FixedSizeBinary(_) => {
118                let field_node = self.next_node(field)?;
119                let buffers = [self.next_buffer()?, self.next_buffer()?];
120                self.create_primitive_array(field_node, data_type, &buffers)
121            }
122            List(list_field) | LargeList(list_field) | Map(list_field, _) => {
123                let list_node = self.next_node(field)?;
124                let list_buffers = [self.next_buffer()?, self.next_buffer()?];
125                let values = self.create_array(list_field, variadic_counts)?;
126                self.create_list_array(list_node, data_type, &list_buffers, values)
127            }
128            ListView(list_field) | LargeListView(list_field) => {
129                let list_node = self.next_node(field)?;
130                let list_buffers = [
131                    self.next_buffer()?, // null buffer
132                    self.next_buffer()?, // offsets
133                    self.next_buffer()?, // sizes
134                ];
135                let values = self.create_array(list_field, variadic_counts)?;
136                self.create_list_view_array(list_node, data_type, &list_buffers, values)
137            }
138            FixedSizeList(list_field, _) => {
139                let list_node = self.next_node(field)?;
140                let list_buffers = [self.next_buffer()?];
141                let values = self.create_array(list_field, variadic_counts)?;
142                self.create_list_array(list_node, data_type, &list_buffers, values)
143            }
144            Struct(struct_fields) => {
145                let struct_node = self.next_node(field)?;
146                let null_buffer = self.next_buffer()?;
147
148                // read the arrays for each field
149                let mut struct_arrays = vec![];
150                // TODO investigate whether just knowing the number of buffers could
151                // still work
152                for struct_field in struct_fields {
153                    let child = self.create_array(struct_field, variadic_counts)?;
154                    struct_arrays.push(child);
155                }
156                self.create_struct_array(struct_node, null_buffer, struct_fields, struct_arrays)
157            }
158            RunEndEncoded(run_ends_field, values_field) => {
159                let run_node = self.next_node(field)?;
160                let run_ends = self.create_array(run_ends_field, variadic_counts)?;
161                let values = self.create_array(values_field, variadic_counts)?;
162
163                let run_array_length = run_node.length() as usize;
164                let builder = ArrayData::builder(data_type.clone())
165                    .len(run_array_length)
166                    .offset(0)
167                    .add_child_data(run_ends.into_data())
168                    .add_child_data(values.into_data())
169                    .null_count(run_node.null_count() as usize);
170
171                self.create_array_from_builder(builder)
172            }
173            // Create dictionary array from RecordBatch
174            Dictionary(_, _) => {
175                let index_node = self.next_node(field)?;
176                let index_buffers = [self.next_buffer()?, self.next_buffer()?];
177
178                #[allow(deprecated)]
179                let dict_id = field.dict_id().ok_or_else(|| {
180                    ArrowError::ParseError(format!("Field {field} does not have dict id"))
181                })?;
182
183                let value_array = match self.dictionaries_by_id.get(&dict_id) {
184                    Some(array) => array.clone(),
185                    None => {
186                        // Per the IPC spec, dictionary batches may be omitted when all
187                        // values in the column are null. In that case we synthesize an
188                        // empty values array so decoding can proceed.
189                        if let Dictionary(_, value_type) = data_type {
190                            arrow_array::new_empty_array(value_type.as_ref())
191                        } else {
192                            unreachable!()
193                        }
194                    }
195                };
196
197                self.create_dictionary_array(index_node, data_type, &index_buffers, value_array)
198            }
199            Union(fields, mode) => {
200                let union_node = self.next_node(field)?;
201                let len = union_node.length() as usize;
202
203                // In V4, union types has validity bitmap
204                // In V5 and later, union types have no validity bitmap
205                if self.version < MetadataVersion::V5 {
206                    self.next_buffer()?;
207                }
208
209                let type_ids: ScalarBuffer<i8> =
210                    self.next_buffer()?.slice_with_length(0, len).into();
211
212                let value_offsets = match mode {
213                    UnionMode::Dense => {
214                        let offsets: ScalarBuffer<i32> =
215                            self.next_buffer()?.slice_with_length(0, len * 4).into();
216                        Some(offsets)
217                    }
218                    UnionMode::Sparse => None,
219                };
220
221                let mut children = Vec::with_capacity(fields.len());
222
223                for (_id, field) in fields.iter() {
224                    let child = self.create_array(field, variadic_counts)?;
225                    children.push(child);
226                }
227
228                let array = if self.skip_validation.get() {
229                    // safety: flag can only be set via unsafe code
230                    unsafe {
231                        UnionArray::new_unchecked(fields.clone(), type_ids, value_offsets, children)
232                    }
233                } else {
234                    UnionArray::try_new(fields.clone(), type_ids, value_offsets, children)?
235                };
236                Ok(Arc::new(array))
237            }
238            Null => {
239                let node = self.next_node(field)?;
240                let length = node.length();
241                let null_count = node.null_count();
242
243                if length != null_count {
244                    return Err(ArrowError::SchemaError(format!(
245                        "Field {field} of NullArray has unequal null_count {null_count} and len {length}"
246                    )));
247                }
248
249                let builder = ArrayData::builder(data_type.clone())
250                    .len(length as usize)
251                    .offset(0);
252                self.create_array_from_builder(builder)
253            }
254            _ => {
255                let field_node = self.next_node(field)?;
256                let buffers = [self.next_buffer()?, self.next_buffer()?];
257                self.create_primitive_array(field_node, data_type, &buffers)
258            }
259        }
260    }
261
262    /// Reads the correct number of buffers based on data type and null_count, and creates a
263    /// primitive array ref
264    fn create_primitive_array(
265        &self,
266        field_node: &FieldNode,
267        data_type: &DataType,
268        buffers: &[Buffer],
269    ) -> Result<ArrayRef, ArrowError> {
270        let length = field_node.length() as usize;
271        let null_buffer = (field_node.null_count() > 0).then_some(buffers[0].clone());
272        let mut builder = match data_type {
273            Utf8 | Binary | LargeBinary | LargeUtf8 => {
274                // read 3 buffers: null buffer (optional), offsets buffer and data buffer
275                ArrayData::builder(data_type.clone())
276                    .len(length)
277                    .buffers(buffers[1..3].to_vec())
278                    .null_bit_buffer(null_buffer)
279            }
280            BinaryView | Utf8View => ArrayData::builder(data_type.clone())
281                .len(length)
282                .buffers(buffers[1..].to_vec())
283                .null_bit_buffer(null_buffer),
284            _ if data_type.is_primitive() || matches!(data_type, Boolean | FixedSizeBinary(_)) => {
285                // read 2 buffers: null buffer (optional) and data buffer
286                ArrayData::builder(data_type.clone())
287                    .len(length)
288                    .add_buffer(buffers[1].clone())
289                    .null_bit_buffer(null_buffer)
290            }
291            t => unreachable!("Data type {:?} either unsupported or not primitive", t),
292        };
293
294        builder = builder.null_count(field_node.null_count() as usize);
295
296        self.create_array_from_builder(builder)
297    }
298
299    /// Update the ArrayDataBuilder based on settings in this decoder
300    fn create_array_from_builder(&self, builder: ArrayDataBuilder) -> Result<ArrayRef, ArrowError> {
301        let mut builder = builder.align_buffers(!self.require_alignment);
302        if self.skip_validation.get() {
303            // SAFETY: flag can only be set via unsafe code
304            unsafe { builder = builder.skip_validation(true) }
305        };
306        Ok(make_array(builder.build()?))
307    }
308
309    /// Reads the correct number of buffers based on list type and null_count, and creates a
310    /// list array ref
311    fn create_list_array(
312        &self,
313        field_node: &FieldNode,
314        data_type: &DataType,
315        buffers: &[Buffer],
316        child_array: ArrayRef,
317    ) -> Result<ArrayRef, ArrowError> {
318        let null_buffer = (field_node.null_count() > 0).then_some(buffers[0].clone());
319        let length = field_node.length() as usize;
320        let child_data = child_array.into_data();
321        let mut builder = match data_type {
322            List(_) | LargeList(_) | Map(_, _) => ArrayData::builder(data_type.clone())
323                .len(length)
324                .add_buffer(buffers[1].clone())
325                .add_child_data(child_data)
326                .null_bit_buffer(null_buffer),
327
328            FixedSizeList(_, _) => ArrayData::builder(data_type.clone())
329                .len(length)
330                .add_child_data(child_data)
331                .null_bit_buffer(null_buffer),
332
333            _ => unreachable!("Cannot create list or map array from {:?}", data_type),
334        };
335
336        builder = builder.null_count(field_node.null_count() as usize);
337
338        self.create_array_from_builder(builder)
339    }
340
341    fn create_list_view_array(
342        &self,
343        field_node: &FieldNode,
344        data_type: &DataType,
345        buffers: &[Buffer],
346        child_array: ArrayRef,
347    ) -> Result<ArrayRef, ArrowError> {
348        assert!(matches!(data_type, ListView(_) | LargeListView(_)));
349
350        let null_buffer = (field_node.null_count() > 0).then_some(buffers[0].clone());
351        let length = field_node.length() as usize;
352        let child_data = child_array.into_data();
353
354        self.create_array_from_builder(
355            ArrayData::builder(data_type.clone())
356                .len(length)
357                .add_buffer(buffers[1].clone()) // offsets
358                .add_buffer(buffers[2].clone()) // sizes
359                .add_child_data(child_data)
360                .null_bit_buffer(null_buffer)
361                .null_count(field_node.null_count() as usize),
362        )
363    }
364
365    fn create_struct_array(
366        &self,
367        struct_node: &FieldNode,
368        null_buffer: Buffer,
369        struct_fields: &Fields,
370        struct_arrays: Vec<ArrayRef>,
371    ) -> Result<ArrayRef, ArrowError> {
372        let null_count = struct_node.null_count() as usize;
373        let len = struct_node.length() as usize;
374        let skip_validation = self.skip_validation.get();
375
376        let nulls = if null_count > 0 {
377            let validity_buffer = BooleanBuffer::new(null_buffer, 0, len);
378            let null_buffer = if skip_validation {
379                // safety: flag can only be set via unsafe code
380                unsafe { NullBuffer::new_unchecked(validity_buffer, null_count) }
381            } else {
382                let null_buffer = NullBuffer::new(validity_buffer);
383
384                if null_buffer.null_count() != null_count {
385                    return Err(ArrowError::InvalidArgumentError(format!(
386                        "null_count value ({}) doesn't match actual number of nulls in array ({})",
387                        null_count,
388                        null_buffer.null_count()
389                    )));
390                }
391
392                null_buffer
393            };
394
395            Some(null_buffer)
396        } else {
397            None
398        };
399        if struct_arrays.is_empty() {
400            // `StructArray::from` can't infer the correct row count
401            // if we have zero fields
402            return Ok(Arc::new(StructArray::new_empty_fields(len, nulls)));
403        }
404
405        let struct_array = if skip_validation {
406            // safety: flag can only be set via unsafe code
407            unsafe { StructArray::new_unchecked(struct_fields.clone(), struct_arrays, nulls) }
408        } else {
409            StructArray::try_new(struct_fields.clone(), struct_arrays, nulls)?
410        };
411
412        Ok(Arc::new(struct_array))
413    }
414
415    /// Reads the correct number of buffers based on list type and null_count, and creates a
416    /// list array ref
417    fn create_dictionary_array(
418        &self,
419        field_node: &FieldNode,
420        data_type: &DataType,
421        buffers: &[Buffer],
422        value_array: ArrayRef,
423    ) -> Result<ArrayRef, ArrowError> {
424        if let Dictionary(_, _) = *data_type {
425            let null_buffer = (field_node.null_count() > 0).then_some(buffers[0].clone());
426            let builder = ArrayData::builder(data_type.clone())
427                .len(field_node.length() as usize)
428                .add_buffer(buffers[1].clone())
429                .add_child_data(value_array.into_data())
430                .null_bit_buffer(null_buffer)
431                .null_count(field_node.null_count() as usize);
432            self.create_array_from_builder(builder)
433        } else {
434            unreachable!("Cannot create dictionary array from {:?}", data_type)
435        }
436    }
437}
438
/// State for decoding Arrow arrays from an [IPC RecordBatch] structure to
/// [`RecordBatch`]
///
/// A decoder is created with `try_new`, optionally configured through the `with_*`
/// methods and then consumed by `read_record_batch`. While decoding, it walks the
/// field nodes and buffers of the encoded batch front to back.
///
/// [IPC RecordBatch]: crate::RecordBatch
///
pub struct RecordBatchDecoder<'a> {
    /// The flatbuffers encoded record batch
    batch: crate::RecordBatch<'a>,
    /// The output schema
    schema: SchemaRef,
    /// Decoded dictionaries indexed by dictionary id
    dictionaries_by_id: &'a HashMap<i64, ArrayRef>,
    /// Optional compression codec, taken from the encoded batch
    compression: Option<CompressionCodec>,
    /// Decompression context for reusing zstd decompressor state
    decompression_context: DecompressionContext,
    /// The format version
    version: MetadataVersion,
    /// The raw data buffer that the buffer offsets of the batch refer to
    data: &'a Buffer,
    /// Iterator over the field nodes of the batch that have not been consumed yet
    nodes: VectorIter<'a, FieldNode>,
    /// Iterator over the buffer descriptors of the batch that have not been consumed yet
    buffers: VectorIter<'a, crate::Buffer>,
    /// Projection (subset of columns) to read, if any
    /// See [`RecordBatchDecoder::with_projection`] for details
    projection: Option<&'a [usize]>,
    /// Are buffers required to already be aligned? See
    /// [`RecordBatchDecoder::with_require_alignment`] for details
    require_alignment: bool,
    /// Should validation be skipped when reading data? Defaults to false.
    ///
    /// See [`FileDecoder::with_skip_validation`] for details.
    skip_validation: UnsafeFlag,
}
474
475impl<'a> RecordBatchDecoder<'a> {
476    /// Create a reader for decoding arrays from an encoded [`RecordBatch`]
477    fn try_new(
478        buf: &'a Buffer,
479        batch: crate::RecordBatch<'a>,
480        schema: SchemaRef,
481        dictionaries_by_id: &'a HashMap<i64, ArrayRef>,
482        metadata: &'a MetadataVersion,
483    ) -> Result<Self, ArrowError> {
484        let buffers = batch.buffers().ok_or_else(|| {
485            ArrowError::IpcError("Unable to get buffers from IPC RecordBatch".to_string())
486        })?;
487        let field_nodes = batch.nodes().ok_or_else(|| {
488            ArrowError::IpcError("Unable to get field nodes from IPC RecordBatch".to_string())
489        })?;
490
491        let batch_compression = batch.compression();
492        let compression = batch_compression
493            .map(|batch_compression| batch_compression.codec().try_into())
494            .transpose()?;
495
496        Ok(Self {
497            batch,
498            schema,
499            dictionaries_by_id,
500            compression,
501            decompression_context: DecompressionContext::new(),
502            version: *metadata,
503            data: buf,
504            nodes: field_nodes.iter(),
505            buffers: buffers.iter(),
506            projection: None,
507            require_alignment: false,
508            skip_validation: UnsafeFlag::new(),
509        })
510    }
511
512    /// Set the projection (default: None)
513    ///
514    /// If set, the projection is the list  of column indices
515    /// that will be read
516    pub fn with_projection(mut self, projection: Option<&'a [usize]>) -> Self {
517        self.projection = projection;
518        self
519    }
520
521    /// Set require_alignment (default: false)
522    ///
523    /// If true, buffers must be aligned appropriately or error will
524    /// result. If false, buffers will be copied to aligned buffers
525    /// if necessary.
526    pub fn with_require_alignment(mut self, require_alignment: bool) -> Self {
527        self.require_alignment = require_alignment;
528        self
529    }
530
531    /// Specifies if validation should be skipped when reading data (defaults to `false`)
532    ///
533    /// Note this API is somewhat "funky" as it allows the caller to skip validation
534    /// without having to use `unsafe` code. If this is ever made public
535    /// it should be made clearer that this is a potentially unsafe by
536    /// using an `unsafe` function that takes a boolean flag.
537    ///
538    /// # Safety
539    ///
540    /// Relies on the caller only passing a flag with `true` value if they are
541    /// certain that the data is valid
542    pub(crate) fn with_skip_validation(mut self, skip_validation: UnsafeFlag) -> Self {
543        self.skip_validation = skip_validation;
544        self
545    }
546
547    /// Read the record batch, consuming the reader
548    fn read_record_batch(mut self) -> Result<RecordBatch, ArrowError> {
549        let mut variadic_counts: VecDeque<i64> = self
550            .batch
551            .variadicBufferCounts()
552            .into_iter()
553            .flatten()
554            .collect();
555
556        let options = RecordBatchOptions::new().with_row_count(Some(self.batch.length() as usize));
557
558        let schema = Arc::clone(&self.schema);
559        if let Some(projection) = self.projection {
560            let mut arrays = vec![];
561            // project fields
562            for (idx, field) in schema.fields().iter().enumerate() {
563                // A projected field can appear more than once, so collect all matching positions.
564                let mut child = None;
565                for (proj_idx, projected_idx) in projection.iter().enumerate() {
566                    if *projected_idx == idx {
567                        if child.is_none() {
568                            child = Some(self.create_array(field, &mut variadic_counts)?);
569                        }
570
571                        // Reuse the decoded array for duplicate projection entries.
572                        arrays.push((proj_idx, child.as_ref().unwrap().clone()));
573                    }
574                }
575
576                if child.is_none() {
577                    self.skip_field(field, &mut variadic_counts)?;
578                }
579            }
580
581            arrays.sort_by_key(|t| t.0);
582
583            let schema = Arc::new(schema.project(projection)?);
584            let columns = arrays.into_iter().map(|t| t.1).collect::<Vec<_>>();
585
586            if self.skip_validation.get() {
587                // Safety: setting `skip_validation` requires `unsafe`, user assures data is valid
588                unsafe {
589                    Ok(RecordBatch::new_unchecked(
590                        schema,
591                        columns,
592                        self.batch.length() as usize,
593                    ))
594                }
595            } else {
596                assert!(variadic_counts.is_empty());
597                RecordBatch::try_new_with_options(schema, columns, &options)
598            }
599        } else {
600            let mut children = vec![];
601            // keep track of index as lists require more than one node
602            for field in schema.fields() {
603                let child = self.create_array(field, &mut variadic_counts)?;
604                children.push(child);
605            }
606
607            if self.skip_validation.get() {
608                // Safety: setting `skip_validation` requires `unsafe`, user assures data is valid
609                unsafe {
610                    Ok(RecordBatch::new_unchecked(
611                        schema,
612                        children,
613                        self.batch.length() as usize,
614                    ))
615                }
616            } else {
617                assert!(variadic_counts.is_empty());
618                RecordBatch::try_new_with_options(schema, children, &options)
619            }
620        }
621    }
622
623    fn next_buffer(&mut self) -> Result<Buffer, ArrowError> {
624        let buffer = self.buffers.next().ok_or_else(|| {
625            ArrowError::IpcError("Buffer count mismatched with metadata".to_string())
626        })?;
627        read_buffer(
628            buffer,
629            self.data,
630            self.compression,
631            &mut self.decompression_context,
632        )
633    }
634
635    fn skip_buffer(&mut self) {
636        self.buffers.next().unwrap();
637    }
638
639    fn next_node(&mut self, field: &Field) -> Result<&'a FieldNode, ArrowError> {
640        self.nodes.next().ok_or_else(|| {
641            ArrowError::SchemaError(format!(
642                "Invalid data for schema. {field} refers to node not found in schema",
643            ))
644        })
645    }
646
647    fn skip_field(
648        &mut self,
649        field: &Field,
650        variadic_count: &mut VecDeque<i64>,
651    ) -> Result<(), ArrowError> {
652        self.next_node(field)?;
653
654        match field.data_type() {
655            Utf8 | Binary | LargeBinary | LargeUtf8 => {
656                for _ in 0..3 {
657                    self.skip_buffer()
658                }
659            }
660            Utf8View | BinaryView => {
661                let count = variadic_count
662                    .pop_front()
663                    .ok_or(ArrowError::IpcError(format!(
664                        "Missing variadic count for {} column",
665                        field.data_type()
666                    )))?;
667                let count = count + 2; // view and null buffer.
668                for _i in 0..count {
669                    self.skip_buffer()
670                }
671            }
672            FixedSizeBinary(_) => {
673                self.skip_buffer();
674                self.skip_buffer();
675            }
676            List(list_field) | LargeList(list_field) | Map(list_field, _) => {
677                self.skip_buffer();
678                self.skip_buffer();
679                self.skip_field(list_field, variadic_count)?;
680            }
681            ListView(list_field) | LargeListView(list_field) => {
682                self.skip_buffer(); // Null buffer
683                self.skip_buffer(); // Offsets
684                self.skip_buffer(); // Sizes
685                self.skip_field(list_field, variadic_count)?;
686            }
687            FixedSizeList(list_field, _) => {
688                self.skip_buffer();
689                self.skip_field(list_field, variadic_count)?;
690            }
691            Struct(struct_fields) => {
692                self.skip_buffer();
693
694                // skip for each field
695                for struct_field in struct_fields {
696                    self.skip_field(struct_field, variadic_count)?
697                }
698            }
699            RunEndEncoded(run_ends_field, values_field) => {
700                self.skip_field(run_ends_field, variadic_count)?;
701                self.skip_field(values_field, variadic_count)?;
702            }
703            Dictionary(_, _) => {
704                self.skip_buffer(); // Nulls
705                self.skip_buffer(); // Indices
706            }
707            Union(fields, mode) => {
708                if self.version < MetadataVersion::V5 {
709                    self.skip_buffer(); // Null buffer
710                }
711                self.skip_buffer(); // Type ids
712
713                match mode {
714                    UnionMode::Dense => self.skip_buffer(), // Offsets
715                    UnionMode::Sparse => {}
716                };
717
718                for (_, field) in fields.iter() {
719                    self.skip_field(field, variadic_count)?
720                }
721            }
722            // Null has no buffers to skip
723            Null => {}
724
725            // Fixed-width and boolean types: skip null buffer + values buffer
726            Boolean
727            | Int8
728            | Int16
729            | Int32
730            | Int64
731            | UInt8
732            | UInt16
733            | UInt32
734            | UInt64
735            | Float16
736            | Float32
737            | Float64
738            | Timestamp(_, _)
739            | Date32
740            | Date64
741            | Time32(_)
742            | Time64(_)
743            | Duration(_)
744            | Interval(_)
745            | Decimal32(_, _)
746            | Decimal64(_, _)
747            | Decimal128(_, _)
748            | Decimal256(_, _) => {
749                self.skip_buffer();
750                self.skip_buffer();
751            }
752        };
753        Ok(())
754    }
755}
756
757/// Creates a record batch from binary data using the `crate::RecordBatch` indexes and the `Schema`.
758///
759/// If `require_alignment` is true, this function will return an error if any array data in the
760/// input `buf` is not properly aligned.
761/// Under the hood it will use [`arrow_data::ArrayDataBuilder::build`] to construct [`arrow_data::ArrayData`].
762///
763/// If `require_alignment` is false, this function will automatically allocate a new aligned buffer
764/// and copy over the data if any array data in the input `buf` is not properly aligned.
765/// (Properly aligned array data will remain zero-copy.)
766/// Under the hood it will use [`arrow_data::ArrayDataBuilder::align_buffers`] to construct [`arrow_data::ArrayData`].
767pub fn read_record_batch(
768    buf: &Buffer,
769    batch: crate::RecordBatch,
770    schema: SchemaRef,
771    dictionaries_by_id: &HashMap<i64, ArrayRef>,
772    projection: Option<&[usize]>,
773    metadata: &MetadataVersion,
774) -> Result<RecordBatch, ArrowError> {
775    RecordBatchDecoder::try_new(buf, batch, schema, dictionaries_by_id, metadata)?
776        .with_projection(projection)
777        .with_require_alignment(false)
778        .read_record_batch()
779}
780
781/// Read the dictionary from the buffer and provided metadata,
782/// updating the `dictionaries_by_id` with the resulting dictionary
783pub fn read_dictionary(
784    buf: &Buffer,
785    batch: crate::DictionaryBatch,
786    schema: &Schema,
787    dictionaries_by_id: &mut HashMap<i64, ArrayRef>,
788    metadata: &MetadataVersion,
789) -> Result<(), ArrowError> {
790    read_dictionary_impl(
791        buf,
792        batch,
793        schema,
794        dictionaries_by_id,
795        metadata,
796        false,
797        UnsafeFlag::new(),
798    )
799}
800
801fn read_dictionary_impl(
802    buf: &Buffer,
803    batch: crate::DictionaryBatch,
804    schema: &Schema,
805    dictionaries_by_id: &mut HashMap<i64, ArrayRef>,
806    metadata: &MetadataVersion,
807    require_alignment: bool,
808    skip_validation: UnsafeFlag,
809) -> Result<(), ArrowError> {
810    let id = batch.id();
811
812    let dictionary_values = get_dictionary_values(
813        buf,
814        batch,
815        schema,
816        dictionaries_by_id,
817        metadata,
818        require_alignment,
819        skip_validation,
820    )?;
821
822    update_dictionaries(dictionaries_by_id, batch.isDelta(), id, dictionary_values)?;
823
824    Ok(())
825}
826
827/// Updates the `dictionaries_by_id` with the provided dictionary values and id.
828///
829/// # Errors
830/// - If `is_delta` is true and there is no existing dictionary for the given
831///   `dict_id`
832/// - If `is_delta` is true and the concatenation of the existing and new
833///   dictionary fails. This usually signals a type mismatch between the old and
834///   new values.
835fn update_dictionaries(
836    dictionaries_by_id: &mut HashMap<i64, ArrayRef>,
837    is_delta: bool,
838    dict_id: i64,
839    dict_values: ArrayRef,
840) -> Result<(), ArrowError> {
841    if !is_delta {
842        // We don't currently record the isOrdered field. This could be general
843        // attributes of arrays.
844        // Add (possibly multiple) array refs to the dictionaries array.
845        dictionaries_by_id.insert(dict_id, dict_values.clone());
846        return Ok(());
847    }
848
849    let existing = dictionaries_by_id.get(&dict_id).ok_or_else(|| {
850        ArrowError::InvalidArgumentError(format!(
851            "No existing dictionary for delta dictionary with id '{dict_id}'"
852        ))
853    })?;
854
855    let combined = concat::concat(&[existing, &dict_values]).map_err(|e| {
856        ArrowError::InvalidArgumentError(format!("Failed to concat delta dictionary: {e}"))
857    })?;
858
859    dictionaries_by_id.insert(dict_id, combined);
860
861    Ok(())
862}
863
864/// Given a dictionary batch IPC message/body along with the full state of a
865/// stream including schema, dictionary cache, metadata, and other flags, this
866/// function will parse the buffer into an array of dictionary values.
867fn get_dictionary_values(
868    buf: &Buffer,
869    batch: crate::DictionaryBatch,
870    schema: &Schema,
871    dictionaries_by_id: &mut HashMap<i64, ArrayRef>,
872    metadata: &MetadataVersion,
873    require_alignment: bool,
874    skip_validation: UnsafeFlag,
875) -> Result<ArrayRef, ArrowError> {
876    let id = batch.id();
877    #[allow(deprecated)]
878    let fields_using_this_dictionary = schema.fields_with_dict_id(id);
879    let first_field = fields_using_this_dictionary.first().ok_or_else(|| {
880        ArrowError::InvalidArgumentError(format!("dictionary id {id} not found in schema"))
881    })?;
882
883    // As the dictionary batch does not contain the type of the
884    // values array, we need to retrieve this from the schema.
885    // Get an array representing this dictionary's values.
886    let dictionary_values: ArrayRef = match first_field.data_type() {
887        DataType::Dictionary(_, value_type) => {
888            // Make a fake schema for the dictionary batch.
889            let value = value_type.as_ref().clone();
890            let schema = Schema::new(vec![Field::new("", value, true)]);
891            // Read a single column
892            let record_batch = RecordBatchDecoder::try_new(
893                buf,
894                batch.data().unwrap(),
895                Arc::new(schema),
896                dictionaries_by_id,
897                metadata,
898            )?
899            .with_require_alignment(require_alignment)
900            .with_skip_validation(skip_validation)
901            .read_record_batch()?;
902
903            Some(record_batch.column(0).clone())
904        }
905        _ => None,
906    }
907    .ok_or_else(|| {
908        ArrowError::InvalidArgumentError(format!("dictionary id {id} not found in schema"))
909    })?;
910
911    Ok(dictionary_values)
912}
913
914/// Read the data for a given block
915fn read_block<R: Read + Seek>(mut reader: R, block: &Block) -> Result<Buffer, ArrowError> {
916    reader.seek(SeekFrom::Start(block.offset() as u64))?;
917    let body_len = block.bodyLength().to_usize().unwrap();
918    let metadata_len = block.metaDataLength().to_usize().unwrap();
919    let total_len = body_len.checked_add(metadata_len).unwrap();
920
921    let mut buf = MutableBuffer::from_len_zeroed(total_len);
922    reader.read_exact(&mut buf)?;
923    Ok(buf.into())
924}
925
926/// Parse an encapsulated message
927///
928/// <https://arrow.apache.org/docs/format/Columnar.html#encapsulated-message-format>
929fn parse_message(buf: &[u8]) -> Result<Message::Message<'_>, ArrowError> {
930    let buf = match buf[..4] == CONTINUATION_MARKER {
931        true => &buf[8..],
932        false => &buf[4..],
933    };
934    crate::root_as_message(buf)
935        .map_err(|err| ArrowError::ParseError(format!("Unable to get root as message: {err:?}")))
936}
937
938/// Read the footer length from the last 10 bytes of an Arrow IPC file
939///
940/// Expects a 4 byte footer length followed by `b"ARROW1"`
941pub fn read_footer_length(buf: [u8; 10]) -> Result<usize, ArrowError> {
942    if buf[4..] != super::ARROW_MAGIC {
943        return Err(ArrowError::ParseError(
944            "Arrow file does not contain correct footer".to_string(),
945        ));
946    }
947
948    // read footer length
949    let footer_len = i32::from_le_bytes(buf[..4].try_into().unwrap());
950    footer_len
951        .try_into()
952        .map_err(|_| ArrowError::ParseError(format!("Invalid footer length: {footer_len}")))
953}
954
955/// A low-level, push-based interface for reading an IPC file
956///
957/// For a higher-level interface see [`FileReader`]
958///
959/// For an example of using this API with `mmap` see the [`zero_copy_ipc`] example.
960///
961/// [`zero_copy_ipc`]: https://github.com/apache/arrow-rs/blob/main/arrow/examples/zero_copy_ipc.rs
962///
963/// ```
964/// # use std::sync::Arc;
965/// # use arrow_array::*;
966/// # use arrow_array::types::Int32Type;
967/// # use arrow_buffer::Buffer;
968/// # use arrow_ipc::convert::fb_to_schema;
969/// # use arrow_ipc::reader::{FileDecoder, read_footer_length};
970/// # use arrow_ipc::root_as_footer;
971/// # use arrow_ipc::writer::FileWriter;
972/// // Write an IPC file
973///
974/// let batch = RecordBatch::try_from_iter([
975///     ("a", Arc::new(Int32Array::from(vec![1, 2, 3])) as _),
976///     ("b", Arc::new(Int32Array::from(vec![1, 2, 3])) as _),
977///     ("c", Arc::new(DictionaryArray::<Int32Type>::from_iter(["hello", "hello", "world"])) as _),
978/// ]).unwrap();
979///
980/// let schema = batch.schema();
981///
982/// let mut out = Vec::with_capacity(1024);
983/// let mut writer = FileWriter::try_new(&mut out, schema.as_ref()).unwrap();
984/// writer.write(&batch).unwrap();
985/// writer.finish().unwrap();
986///
987/// drop(writer);
988///
989/// // Read IPC file
990///
991/// let buffer = Buffer::from_vec(out);
992/// let trailer_start = buffer.len() - 10;
993/// let footer_len = read_footer_length(buffer[trailer_start..].try_into().unwrap()).unwrap();
994/// let footer = root_as_footer(&buffer[trailer_start - footer_len..trailer_start]).unwrap();
995///
996/// let back = fb_to_schema(footer.schema().unwrap());
997/// assert_eq!(&back, schema.as_ref());
998///
999/// let mut decoder = FileDecoder::new(schema, footer.version());
1000///
1001/// // Read dictionaries
1002/// for block in footer.dictionaries().iter().flatten() {
1003///     let block_len = block.bodyLength() as usize + block.metaDataLength() as usize;
1004///     let data = buffer.slice_with_length(block.offset() as _, block_len);
1005///     decoder.read_dictionary(&block, &data).unwrap();
1006/// }
1007///
1008/// // Read record batch
1009/// let batches = footer.recordBatches().unwrap();
1010/// assert_eq!(batches.len(), 1); // Only wrote a single batch
1011///
1012/// let block = batches.get(0);
1013/// let block_len = block.bodyLength() as usize + block.metaDataLength() as usize;
1014/// let data = buffer.slice_with_length(block.offset() as _, block_len);
1015/// let back = decoder.read_record_batch(block, &data).unwrap().unwrap();
1016///
1017/// assert_eq!(batch, back);
1018/// ```
#[derive(Debug)]
pub struct FileDecoder {
    /// Schema handed to the record batch and dictionary decoding routines
    schema: SchemaRef,
    /// Dictionaries read so far via [`Self::read_dictionary`], keyed by dictionary id
    dictionaries: HashMap<i64, ArrayRef>,
    /// Metadata version given at construction; messages reporting a different
    /// version are rejected unless this is [`MetadataVersion::V1`]
    version: MetadataVersion,
    /// Optional column projection, set by [`Self::with_projection`]
    projection: Option<Vec<usize>>,
    /// Whether misaligned input is an error, see [`Self::with_require_alignment`]
    require_alignment: bool,
    /// Whether validation is skipped, see [`Self::with_skip_validation`]
    skip_validation: UnsafeFlag,
}
1028
1029impl FileDecoder {
1030    /// Create a new [`FileDecoder`] with the given schema and version
1031    pub fn new(schema: SchemaRef, version: MetadataVersion) -> Self {
1032        Self {
1033            schema,
1034            version,
1035            dictionaries: Default::default(),
1036            projection: None,
1037            require_alignment: false,
1038            skip_validation: UnsafeFlag::new(),
1039        }
1040    }
1041
1042    /// Specify a projection
1043    pub fn with_projection(mut self, projection: Vec<usize>) -> Self {
1044        self.projection = Some(projection);
1045        self
1046    }
1047
1048    /// Specifies if the array data in input buffers is required to be properly aligned.
1049    ///
1050    /// If `require_alignment` is true, this decoder will return an error if any array data in the
1051    /// input `buf` is not properly aligned.
1052    /// Under the hood it will use [`arrow_data::ArrayDataBuilder::build`] to construct
1053    /// [`arrow_data::ArrayData`].
1054    ///
1055    /// If `require_alignment` is false (the default), this decoder will automatically allocate a
1056    /// new aligned buffer and copy over the data if any array data in the input `buf` is not
1057    /// properly aligned. (Properly aligned array data will remain zero-copy.)
1058    /// Under the hood it will use [`arrow_data::ArrayDataBuilder::align_buffers`] to construct
1059    /// [`arrow_data::ArrayData`].
1060    pub fn with_require_alignment(mut self, require_alignment: bool) -> Self {
1061        self.require_alignment = require_alignment;
1062        self
1063    }
1064
1065    /// Specifies if validation should be skipped when reading data (defaults to `false`)
1066    ///
1067    /// # Safety
1068    ///
1069    /// This flag must only be set to `true` when you trust the input data and are sure the data you are
1070    /// reading is a valid Arrow IPC file, otherwise undefined behavior may
1071    /// result.
1072    ///
1073    /// For example, some programs may wish to trust reading IPC files written
1074    /// by the same process that created the files.
1075    pub unsafe fn with_skip_validation(mut self, skip_validation: bool) -> Self {
1076        unsafe { self.skip_validation.set(skip_validation) };
1077        self
1078    }
1079
1080    fn read_message<'a>(&self, buf: &'a [u8]) -> Result<Message::Message<'a>, ArrowError> {
1081        let message = parse_message(buf)?;
1082
1083        // some old test data's footer metadata is not set, so we account for that
1084        if self.version != MetadataVersion::V1 && message.version() != self.version {
1085            return Err(ArrowError::IpcError(
1086                "Could not read IPC message as metadata versions mismatch".to_string(),
1087            ));
1088        }
1089        Ok(message)
1090    }
1091
1092    /// Read the dictionary with the given block and data buffer
1093    pub fn read_dictionary(&mut self, block: &Block, buf: &Buffer) -> Result<(), ArrowError> {
1094        let message = self.read_message(buf)?;
1095        match message.header_type() {
1096            crate::MessageHeader::DictionaryBatch => {
1097                let batch = message.header_as_dictionary_batch().unwrap();
1098                read_dictionary_impl(
1099                    &buf.slice(block.metaDataLength() as _),
1100                    batch,
1101                    &self.schema,
1102                    &mut self.dictionaries,
1103                    &message.version(),
1104                    self.require_alignment,
1105                    self.skip_validation.clone(),
1106                )
1107            }
1108            t => Err(ArrowError::ParseError(format!(
1109                "Expecting DictionaryBatch in dictionary blocks, found {t:?}."
1110            ))),
1111        }
1112    }
1113
1114    /// Read the RecordBatch with the given block and data buffer
1115    pub fn read_record_batch(
1116        &self,
1117        block: &Block,
1118        buf: &Buffer,
1119    ) -> Result<Option<RecordBatch>, ArrowError> {
1120        let message = self.read_message(buf)?;
1121        match message.header_type() {
1122            crate::MessageHeader::Schema => Err(ArrowError::IpcError(
1123                "Not expecting a schema when messages are read".to_string(),
1124            )),
1125            crate::MessageHeader::RecordBatch => {
1126                let batch = message.header_as_record_batch().ok_or_else(|| {
1127                    ArrowError::IpcError("Unable to read IPC message as record batch".to_string())
1128                })?;
1129                // read the block that makes up the record batch into a buffer
1130                RecordBatchDecoder::try_new(
1131                    &buf.slice(block.metaDataLength() as _),
1132                    batch,
1133                    self.schema.clone(),
1134                    &self.dictionaries,
1135                    &message.version(),
1136                )?
1137                .with_projection(self.projection.as_deref())
1138                .with_require_alignment(self.require_alignment)
1139                .with_skip_validation(self.skip_validation.clone())
1140                .read_record_batch()
1141                .map(Some)
1142            }
1143            crate::MessageHeader::NONE => Ok(None),
1144            t => Err(ArrowError::InvalidArgumentError(format!(
1145                "Reading types other than record batches not yet supported, unable to read {t:?}"
1146            ))),
1147        }
1148    }
1149}
1150
/// Build an Arrow [`FileReader`] with custom options.
///
/// Configure the builder with its `with_*` methods, then call
/// [`FileReaderBuilder::build`] to create the reader.
#[derive(Debug)]
pub struct FileReaderBuilder {
    /// Optional projection for which columns to load (zero-based column indices)
    projection: Option<Vec<usize>>,
    /// Limit on flatbuffer tables when verifying the footer; passed through as
    /// the `max_tables` field of [`VerifierOptions`]
    max_footer_fb_tables: usize,
    /// Limit on flatbuffer nesting depth when verifying the footer; passed
    /// through as the `max_depth` field of [`VerifierOptions`]
    max_footer_fb_depth: usize,
}
1161
1162impl Default for FileReaderBuilder {
1163    fn default() -> Self {
1164        let verifier_options = VerifierOptions::default();
1165        Self {
1166            max_footer_fb_tables: verifier_options.max_tables,
1167            max_footer_fb_depth: verifier_options.max_depth,
1168            projection: None,
1169        }
1170    }
1171}
1172
1173impl FileReaderBuilder {
1174    /// Options for creating a new [`FileReader`].
1175    ///
1176    /// To convert a builder into a reader, call [`FileReaderBuilder::build`].
1177    pub fn new() -> Self {
1178        Self::default()
1179    }
1180
1181    /// Optional projection for which columns to load (zero-based column indices).
1182    pub fn with_projection(mut self, projection: Vec<usize>) -> Self {
1183        self.projection = Some(projection);
1184        self
1185    }
1186
1187    /// Flatbuffers option for parsing the footer. Controls the max number of fields and
1188    /// metadata key-value pairs that can be parsed from the schema of the footer.
1189    ///
1190    /// By default this is set to `1_000_000` which roughly translates to a schema with
1191    /// no metadata key-value pairs but 499,999 fields.
1192    ///
1193    /// This default limit is enforced to protect against malicious files with a massive
1194    /// amount of flatbuffer tables which could cause a denial of service attack.
1195    ///
1196    /// If you need to ingest a trusted file with a massive number of fields and/or
1197    /// metadata key-value pairs and are facing the error `"Unable to get root as
1198    /// footer: TooManyTables"` then increase this parameter as necessary.
1199    pub fn with_max_footer_fb_tables(mut self, max_footer_fb_tables: usize) -> Self {
1200        self.max_footer_fb_tables = max_footer_fb_tables;
1201        self
1202    }
1203
1204    /// Flatbuffers option for parsing the footer. Controls the max depth for schemas with
1205    /// nested fields parsed from the footer.
1206    ///
1207    /// By default this is set to `64` which roughly translates to a schema with
1208    /// a field nested 60 levels down through other struct fields.
1209    ///
1210    /// This default limit is enforced to protect against malicious files with a extremely
1211    /// deep flatbuffer structure which could cause a denial of service attack.
1212    ///
1213    /// If you need to ingest a trusted file with a deeply nested field and are facing the
1214    /// error `"Unable to get root as footer: DepthLimitReached"` then increase this
1215    /// parameter as necessary.
1216    pub fn with_max_footer_fb_depth(mut self, max_footer_fb_depth: usize) -> Self {
1217        self.max_footer_fb_depth = max_footer_fb_depth;
1218        self
1219    }
1220
1221    /// Build [`FileReader`] with given reader.
1222    pub fn build<R: Read + Seek>(self, mut reader: R) -> Result<FileReader<R>, ArrowError> {
1223        // Space for ARROW_MAGIC (6 bytes) and length (4 bytes)
1224        let mut buffer = [0; 10];
1225        reader.seek(SeekFrom::End(-10))?;
1226        reader.read_exact(&mut buffer)?;
1227
1228        let footer_len = read_footer_length(buffer)?;
1229
1230        // read footer
1231        let mut footer_data = vec![0; footer_len];
1232        reader.seek(SeekFrom::End(-10 - footer_len as i64))?;
1233        reader.read_exact(&mut footer_data)?;
1234
1235        let verifier_options = VerifierOptions {
1236            max_tables: self.max_footer_fb_tables,
1237            max_depth: self.max_footer_fb_depth,
1238            ..Default::default()
1239        };
1240        let footer = crate::root_as_footer_with_opts(&verifier_options, &footer_data[..]).map_err(
1241            |err| ArrowError::ParseError(format!("Unable to get root as footer: {err:?}")),
1242        )?;
1243
1244        let blocks = footer.recordBatches().ok_or_else(|| {
1245            ArrowError::ParseError("Unable to get record batches from IPC Footer".to_string())
1246        })?;
1247
1248        let total_blocks = blocks.len();
1249
1250        let ipc_schema = footer.schema().unwrap();
1251        if !ipc_schema.endianness().equals_to_target_endianness() {
1252            return Err(ArrowError::IpcError(
1253                "the endianness of the source system does not match the endianness of the target system.".to_owned()
1254            ));
1255        }
1256
1257        let schema = crate::convert::fb_to_schema(ipc_schema);
1258
1259        let mut custom_metadata = HashMap::new();
1260        if let Some(fb_custom_metadata) = footer.custom_metadata() {
1261            for kv in fb_custom_metadata.into_iter() {
1262                custom_metadata.insert(
1263                    kv.key().unwrap().to_string(),
1264                    kv.value().unwrap().to_string(),
1265                );
1266            }
1267        }
1268
1269        let mut decoder = FileDecoder::new(Arc::new(schema), footer.version());
1270        if let Some(projection) = self.projection {
1271            decoder = decoder.with_projection(projection)
1272        }
1273
1274        // Create an array of optional dictionary value arrays, one per field.
1275        if let Some(dictionaries) = footer.dictionaries() {
1276            for block in dictionaries {
1277                let buf = read_block(&mut reader, block)?;
1278                decoder.read_dictionary(block, &buf)?;
1279            }
1280        }
1281
1282        Ok(FileReader {
1283            reader,
1284            blocks: blocks.iter().copied().collect(),
1285            current_block: 0,
1286            total_blocks,
1287            decoder,
1288            custom_metadata,
1289        })
1290    }
1291}
1292
1293/// Arrow File Reader
1294///
1295/// Reads Arrow [`RecordBatch`]es from bytes in the [IPC File Format],
1296/// providing random access to the record batches.
1297///
1298/// # See Also
1299///
1300/// * [`Self::set_index`] for random access
1301/// * [`StreamReader`] for reading streaming data
1302///
1303/// # Example: Reading from a `File`
1304/// ```
1305/// # use std::io::Cursor;
1306/// use arrow_array::record_batch;
1307/// # use arrow_ipc::reader::FileReader;
1308/// # use arrow_ipc::writer::FileWriter;
1309/// # let batch = record_batch!(("a", Int32, [1, 2, 3])).unwrap();
1310/// # let mut file = vec![]; // mimic a stream for the example
1311/// # {
1312/// #  let mut writer = FileWriter::try_new(&mut file, &batch.schema()).unwrap();
1313/// #  writer.write(&batch).unwrap();
1314/// #  writer.write(&batch).unwrap();
1315/// #  writer.finish().unwrap();
1316/// # }
1317/// # let mut file = Cursor::new(&file);
1318/// let projection = None; // read all columns
1319/// let mut reader = FileReader::try_new(&mut file, projection).unwrap();
1320/// // Position the reader to the second batch
1321/// reader.set_index(1).unwrap();
1322/// // read batches from the reader using the Iterator trait
1323/// let mut num_rows = 0;
1324/// for batch in reader {
1325///    let batch = batch.unwrap();
1326///    num_rows += batch.num_rows();
1327/// }
1328/// assert_eq!(num_rows, 3);
1329/// ```
1330/// # Example: Reading from `mmap`ed file
1331///
1332/// For an example creating Arrays without copying using  memory mapped (`mmap`)
1333/// files see the [`zero_copy_ipc`] example.
1334///
1335/// [IPC File Format]: https://arrow.apache.org/docs/format/Columnar.html#ipc-file-format
1336/// [`zero_copy_ipc`]: https://github.com/apache/arrow-rs/blob/main/arrow/examples/zero_copy_ipc.rs
pub struct FileReader<R> {
    /// File reader that supports reading and seeking
    reader: R,

    /// The decoder used to turn blocks into record batches; it also holds the
    /// file schema and the dictionaries read when the reader was built
    decoder: FileDecoder,

    /// The blocks in the file
    ///
    /// A block indicates the regions in the file to read to get data
    blocks: Vec<Block>,

    /// A counter to keep track of the current block that should be read
    ///
    /// Advanced by one on every read and repositioned by `set_index`.
    current_block: usize,

    /// The total number of blocks, which may contain record batches and other types
    total_blocks: usize,

    /// User defined metadata, collected from the footer's custom metadata
    custom_metadata: HashMap<String, String>,
}
1358
1359impl<R> fmt::Debug for FileReader<R> {
1360    fn fmt(&self, f: &mut fmt::Formatter<'_>) -> Result<(), fmt::Error> {
1361        f.debug_struct("FileReader<R>")
1362            .field("decoder", &self.decoder)
1363            .field("blocks", &self.blocks)
1364            .field("current_block", &self.current_block)
1365            .field("total_blocks", &self.total_blocks)
1366            .finish_non_exhaustive()
1367    }
1368}
1369
1370impl<R: Read + Seek> FileReader<BufReader<R>> {
1371    /// Try to create a new file reader with the reader wrapped in a BufReader.
1372    ///
1373    /// See [`FileReader::try_new`] for an unbuffered version.
1374    pub fn try_new_buffered(reader: R, projection: Option<Vec<usize>>) -> Result<Self, ArrowError> {
1375        Self::try_new(BufReader::new(reader), projection)
1376    }
1377}
1378
1379impl<R: Read + Seek> FileReader<R> {
1380    /// Try to create a new file reader.
1381    ///
1382    /// There is no internal buffering. If buffered reads are needed you likely want to use
1383    /// [`FileReader::try_new_buffered`] instead.
1384    ///
1385    /// # Errors
1386    ///
1387    /// An ['Err'](Result::Err) may be returned if:
1388    /// - the file does not meet the Arrow Format footer requirements, or
1389    /// - file endianness does not match the target endianness.
1390    pub fn try_new(reader: R, projection: Option<Vec<usize>>) -> Result<Self, ArrowError> {
1391        let builder = FileReaderBuilder {
1392            projection,
1393            ..Default::default()
1394        };
1395        builder.build(reader)
1396    }
1397
1398    /// Return user defined customized metadata
1399    pub fn custom_metadata(&self) -> &HashMap<String, String> {
1400        &self.custom_metadata
1401    }
1402
1403    /// Return the number of batches in the file
1404    pub fn num_batches(&self) -> usize {
1405        self.total_blocks
1406    }
1407
1408    /// Return the schema of the file
1409    pub fn schema(&self) -> SchemaRef {
1410        self.decoder.schema.clone()
1411    }
1412
1413    /// See to a specific [`RecordBatch`]
1414    ///
1415    /// Sets the current block to the index, allowing random reads
1416    pub fn set_index(&mut self, index: usize) -> Result<(), ArrowError> {
1417        if index >= self.total_blocks {
1418            Err(ArrowError::InvalidArgumentError(format!(
1419                "Cannot set batch to index {} from {} total batches",
1420                index, self.total_blocks
1421            )))
1422        } else {
1423            self.current_block = index;
1424            Ok(())
1425        }
1426    }
1427
1428    fn maybe_next(&mut self) -> Result<Option<RecordBatch>, ArrowError> {
1429        let block = &self.blocks[self.current_block];
1430        self.current_block += 1;
1431
1432        // read length
1433        let buffer = read_block(&mut self.reader, block)?;
1434        self.decoder.read_record_batch(block, &buffer)
1435    }
1436
1437    /// Gets a reference to the underlying reader.
1438    ///
1439    /// It is inadvisable to directly read from the underlying reader.
1440    pub fn get_ref(&self) -> &R {
1441        &self.reader
1442    }
1443
1444    /// Gets a mutable reference to the underlying reader.
1445    ///
1446    /// It is inadvisable to directly read from the underlying reader.
1447    pub fn get_mut(&mut self) -> &mut R {
1448        &mut self.reader
1449    }
1450
1451    /// Specifies if validation should be skipped when reading data (defaults to `false`)
1452    ///
1453    /// # Safety
1454    ///
1455    /// See [`FileDecoder::with_skip_validation`]
1456    pub unsafe fn with_skip_validation(mut self, skip_validation: bool) -> Self {
1457        self.decoder = unsafe { self.decoder.with_skip_validation(skip_validation) };
1458        self
1459    }
1460}
1461
1462impl<R: Read + Seek> Iterator for FileReader<R> {
1463    type Item = Result<RecordBatch, ArrowError>;
1464
1465    fn next(&mut self) -> Option<Self::Item> {
1466        // get current block
1467        if self.current_block < self.total_blocks {
1468            self.maybe_next().transpose()
1469        } else {
1470            None
1471        }
1472    }
1473}
1474
1475impl<R: Read + Seek> RecordBatchReader for FileReader<R> {
1476    fn schema(&self) -> SchemaRef {
1477        self.schema()
1478    }
1479}
1480
1481/// Arrow Stream Reader
1482///
1483/// Reads Arrow [`RecordBatch`]es from bytes in the [IPC Streaming Format].
1484///
1485/// # See Also
1486///
1487/// * [`FileReader`] for random access.
1488///
1489/// # Example
1490/// ```
1491/// # use arrow_array::record_batch;
1492/// # use arrow_ipc::reader::StreamReader;
1493/// # use arrow_ipc::writer::StreamWriter;
1494/// # let batch = record_batch!(("a", Int32, [1, 2, 3])).unwrap();
1495/// # let mut stream = vec![]; // mimic a stream for the example
1496/// # {
1497/// #  let mut writer = StreamWriter::try_new(&mut stream, &batch.schema()).unwrap();
1498/// #  writer.write(&batch).unwrap();
1499/// #  writer.finish().unwrap();
1500/// # }
1501/// # let stream = stream.as_slice();
1502/// let projection = None; // read all columns
1503/// let mut reader = StreamReader::try_new(stream, projection).unwrap();
1504/// // read batches from the reader using the Iterator trait
1505/// let mut num_rows = 0;
1506/// for batch in reader {
1507///    let batch = batch.unwrap();
1508///    num_rows += batch.num_rows();
1509/// }
1510/// assert_eq!(num_rows, 3);
1511/// ```
1512///
1513/// [IPC Streaming Format]: https://arrow.apache.org/docs/format/Columnar.html#ipc-streaming-format
pub struct StreamReader<R> {
    /// Stream reader, used to pull successive messages from the input
    reader: MessageReader<R>,

    /// The schema that is read from the stream's first message
    schema: SchemaRef,

    /// Optional dictionaries for each schema field, keyed by dictionary id.
    ///
    /// Dictionaries may be appended to in the streaming format.
    dictionaries_by_id: HashMap<i64, ArrayRef>,

    /// An indicator of whether the stream is complete.
    ///
    /// This value is set to `true` the first time the reader's `next()` returns `None`.
    finished: bool,

    /// Optional projection: the column indices together with a schema
    /// (presumably the projected schema; confirm against `try_new`)
    projection: Option<(Vec<usize>, Schema)>,

    /// Should validation be skipped when reading data? Defaults to false.
    ///
    /// See [`FileDecoder::with_skip_validation`] for details.
    skip_validation: UnsafeFlag,
}
1539
1540impl<R> fmt::Debug for StreamReader<R> {
1541    fn fmt(&self, f: &mut fmt::Formatter<'_>) -> std::result::Result<(), fmt::Error> {
1542        f.debug_struct("StreamReader<R>")
1543            .field("reader", &"R")
1544            .field("schema", &self.schema)
1545            .field("dictionaries_by_id", &self.dictionaries_by_id)
1546            .field("finished", &self.finished)
1547            .field("projection", &self.projection)
1548            .finish()
1549    }
1550}
1551
1552impl<R: Read> StreamReader<BufReader<R>> {
1553    /// Try to create a new stream reader with the reader wrapped in a BufReader.
1554    ///
1555    /// See [`StreamReader::try_new`] for an unbuffered version.
1556    pub fn try_new_buffered(reader: R, projection: Option<Vec<usize>>) -> Result<Self, ArrowError> {
1557        Self::try_new(BufReader::new(reader), projection)
1558    }
1559}
1560
1561impl<R: Read> StreamReader<R> {
1562    /// Try to create a new stream reader.
1563    ///
1564    /// To check if the reader is done, use [`is_finished(self)`](StreamReader::is_finished).
1565    ///
1566    /// There is no internal buffering. If buffered reads are needed you likely want to use
1567    /// [`StreamReader::try_new_buffered`] instead.
1568    ///
1569    /// # Errors
1570    ///
1571    /// An ['Err'](Result::Err) may be returned if the reader does not encounter a schema
1572    /// as the first message in the stream.
1573    pub fn try_new(
1574        reader: R,
1575        projection: Option<Vec<usize>>,
1576    ) -> Result<StreamReader<R>, ArrowError> {
1577        let mut msg_reader = MessageReader::new(reader);
1578        let message = msg_reader.maybe_next()?;
1579        let Some((message, _)) = message else {
1580            return Err(ArrowError::IpcError(
1581                "Expected schema message, found empty stream.".to_string(),
1582            ));
1583        };
1584
1585        if message.header_type() != Message::MessageHeader::Schema {
1586            return Err(ArrowError::IpcError(format!(
1587                "Expected a schema as the first message in the stream, got: {:?}",
1588                message.header_type()
1589            )));
1590        }
1591
1592        let schema = message.header_as_schema().ok_or_else(|| {
1593            ArrowError::ParseError("Failed to parse schema from message header".to_string())
1594        })?;
1595        let schema = crate::convert::fb_to_schema(schema);
1596
1597        // Create an array of optional dictionary value arrays, one per field.
1598        let dictionaries_by_id = HashMap::new();
1599
1600        let projection = match projection {
1601            Some(projection_indices) => {
1602                let schema = schema.project(&projection_indices)?;
1603                Some((projection_indices, schema))
1604            }
1605            _ => None,
1606        };
1607
1608        Ok(Self {
1609            reader: msg_reader,
1610            schema: Arc::new(schema),
1611            finished: false,
1612            dictionaries_by_id,
1613            projection,
1614            skip_validation: UnsafeFlag::new(),
1615        })
1616    }
1617
1618    /// Deprecated, use [`StreamReader::try_new`] instead.
1619    #[deprecated(since = "53.0.0", note = "use `try_new` instead")]
1620    pub fn try_new_unbuffered(
1621        reader: R,
1622        projection: Option<Vec<usize>>,
1623    ) -> Result<Self, ArrowError> {
1624        Self::try_new(reader, projection)
1625    }
1626
1627    /// Return the schema of the stream
1628    pub fn schema(&self) -> SchemaRef {
1629        self.schema.clone()
1630    }
1631
    /// Check if the stream is finished
    ///
    /// Returns `true` once the reader has reached the end of the stream while
    /// looking for the next record batch.
    pub fn is_finished(&self) -> bool {
        self.finished
    }
1636
1637    fn maybe_next(&mut self) -> Result<Option<RecordBatch>, ArrowError> {
1638        if self.finished {
1639            return Ok(None);
1640        }
1641
1642        // Read messages until we get a record batch or end of stream
1643        loop {
1644            let message = self.next_ipc_message()?;
1645            let Some(message) = message else {
1646                // If the message is None, we have reached the end of the stream.
1647                self.finished = true;
1648                return Ok(None);
1649            };
1650
1651            match message {
1652                IpcMessage::Schema(_) => {
1653                    return Err(ArrowError::IpcError(
1654                        "Expected a record batch, but found a schema".to_string(),
1655                    ));
1656                }
1657                IpcMessage::RecordBatch(record_batch) => {
1658                    return Ok(Some(record_batch));
1659                }
1660                IpcMessage::DictionaryBatch { .. } => {
1661                    continue;
1662                }
1663            };
1664        }
1665    }
1666
1667    /// Reads and fully parses the next IPC message from the stream. Whereas
1668    /// [`Self::maybe_next`] is a higher level method focused on reading
1669    /// `RecordBatch`es, this method returns the individual fully parsed IPC
1670    /// messages from the underlying stream.
1671    ///
1672    /// This is useful primarily for testing reader/writer behaviors as it
1673    /// allows a full view into the messages that have been written to a stream.
1674    pub(crate) fn next_ipc_message(&mut self) -> Result<Option<IpcMessage>, ArrowError> {
1675        let message = self.reader.maybe_next()?;
1676        let Some((message, body)) = message else {
1677            // If the message is None, we have reached the end of the stream.
1678            return Ok(None);
1679        };
1680
1681        let ipc_message = match message.header_type() {
1682            Message::MessageHeader::Schema => {
1683                let schema = message.header_as_schema().ok_or_else(|| {
1684                    ArrowError::ParseError("Failed to parse schema from message header".to_string())
1685                })?;
1686                let arrow_schema = crate::convert::fb_to_schema(schema);
1687                IpcMessage::Schema(arrow_schema)
1688            }
1689            Message::MessageHeader::RecordBatch => {
1690                let batch = message.header_as_record_batch().ok_or_else(|| {
1691                    ArrowError::IpcError("Unable to read IPC message as record batch".to_string())
1692                })?;
1693
1694                let version = message.version();
1695                let schema = self.schema.clone();
1696                let record_batch = RecordBatchDecoder::try_new(
1697                    &body.into(),
1698                    batch,
1699                    schema,
1700                    &self.dictionaries_by_id,
1701                    &version,
1702                )?
1703                .with_projection(self.projection.as_ref().map(|x| x.0.as_ref()))
1704                .with_require_alignment(false)
1705                .with_skip_validation(self.skip_validation.clone())
1706                .read_record_batch()?;
1707                IpcMessage::RecordBatch(record_batch)
1708            }
1709            Message::MessageHeader::DictionaryBatch => {
1710                let dict = message.header_as_dictionary_batch().ok_or_else(|| {
1711                    ArrowError::ParseError(
1712                        "Failed to parse dictionary batch from message header".to_string(),
1713                    )
1714                })?;
1715
1716                let version = message.version();
1717                let dict_values = get_dictionary_values(
1718                    &body.into(),
1719                    dict,
1720                    &self.schema,
1721                    &mut self.dictionaries_by_id,
1722                    &version,
1723                    false,
1724                    self.skip_validation.clone(),
1725                )?;
1726
1727                update_dictionaries(
1728                    &mut self.dictionaries_by_id,
1729                    dict.isDelta(),
1730                    dict.id(),
1731                    dict_values.clone(),
1732                )?;
1733
1734                IpcMessage::DictionaryBatch {
1735                    id: dict.id(),
1736                    is_delta: (dict.isDelta()),
1737                    values: (dict_values),
1738                }
1739            }
1740            x => {
1741                return Err(ArrowError::ParseError(format!(
1742                    "Unsupported message header type in IPC stream: '{x:?}'"
1743                )));
1744            }
1745        };
1746
1747        Ok(Some(ipc_message))
1748    }
1749
    /// Gets a reference to the underlying reader.
    ///
    /// The reference is obtained from the internal message reader that this
    /// `StreamReader` pulls its messages from.
    ///
    /// It is inadvisable to directly read from the underlying reader.
    pub fn get_ref(&self) -> &R {
        self.reader.inner()
    }
1756
    /// Gets a mutable reference to the underlying reader.
    ///
    /// It is inadvisable to directly read from the underlying reader: this
    /// `StreamReader` reads its messages sequentially from the same reader, so
    /// bytes consumed through this reference are not seen by later message reads.
    pub fn get_mut(&mut self) -> &mut R {
        self.reader.inner_mut()
    }
1763
    /// Specifies if validation should be skipped when reading data (defaults to `false`)
    ///
    /// Consumes the reader and returns it with the flag updated, so it can be
    /// chained directly after construction.
    ///
    /// # Safety
    ///
    /// See [`FileDecoder::with_skip_validation`]
    pub unsafe fn with_skip_validation(mut self, skip_validation: bool) -> Self {
        // SAFETY: this method is itself `unsafe`; the obligations that come with
        // skipping validation are passed on to the caller (see `# Safety` above).
        unsafe { self.skip_validation.set(skip_validation) };
        self
    }
1773}
1774
1775impl<R: Read> Iterator for StreamReader<R> {
1776    type Item = Result<RecordBatch, ArrowError>;
1777
1778    fn next(&mut self) -> Option<Self::Item> {
1779        self.maybe_next().transpose()
1780    }
1781}
1782
1783impl<R: Read> RecordBatchReader for StreamReader<R> {
1784    fn schema(&self) -> SchemaRef {
1785        self.schema.clone()
1786    }
1787}
1788
/// Representation of a fully parsed IpcMessage from the underlying stream.
/// Parsing this kind of message is done by higher level constructs such as
/// [`StreamReader`], because fully interpreting the messages into a record
/// batch or dictionary batch requires access to stream state such as schema
/// and the full dictionary cache.
#[derive(Debug)]
// NOTE(review): presumably required because the payloads are only inspected
// by tests — confirm before removing.
#[allow(dead_code)]
pub(crate) enum IpcMessage {
    /// A schema message, converted to an Arrow schema
    Schema(arrow_schema::Schema),
    /// A record batch message, decoded into a [`RecordBatch`]
    RecordBatch(RecordBatch),
    /// A dictionary batch message
    DictionaryBatch {
        /// The dictionary id reported by the message
        id: i64,
        /// Whether the message is flagged as a delta dictionary
        is_delta: bool,
        /// The dictionary values decoded from the message
        values: ArrayRef,
    },
}
1805
/// A low-level construct that reads [`Message::Message`]s from a reader while
/// re-using a buffer for metadata. This is composed into [`StreamReader`].
struct MessageReader<R> {
    /// The underlying source of bytes
    reader: R,
    /// Holds the metadata bytes of the most recently read message; resized and
    /// overwritten on every read instead of being reallocated
    buf: Vec<u8>,
}
1812
1813impl<R: Read> MessageReader<R> {
1814    fn new(reader: R) -> Self {
1815        Self {
1816            reader,
1817            buf: Vec::new(),
1818        }
1819    }
1820
1821    /// Reads the entire next message from the underlying reader which includes
1822    /// the metadata length, the metadata, and the body.
1823    ///
1824    /// # Returns
1825    /// - `Ok(None)` if the the reader signals the end of stream with EOF on
1826    ///   the first read
1827    /// - `Err(_)` if the reader returns an error other than on the first
1828    ///   read, or if the metadata length is invalid
1829    /// - `Ok(Some(_))` with the Message and buffer containiner the
1830    ///   body bytes otherwise.
1831    fn maybe_next(&mut self) -> Result<Option<(Message::Message<'_>, MutableBuffer)>, ArrowError> {
1832        let meta_len = self.read_meta_len()?;
1833        let Some(meta_len) = meta_len else {
1834            return Ok(None);
1835        };
1836
1837        self.buf.resize(meta_len, 0);
1838        self.reader.read_exact(&mut self.buf)?;
1839
1840        let message = crate::root_as_message(self.buf.as_slice()).map_err(|err| {
1841            ArrowError::ParseError(format!("Unable to get root as message: {err:?}"))
1842        })?;
1843
1844        let mut buf = MutableBuffer::from_len_zeroed(message.bodyLength() as usize);
1845        self.reader.read_exact(&mut buf)?;
1846
1847        Ok(Some((message, buf)))
1848    }
1849
    /// Get a mutable reference to the underlying reader.
    ///
    /// This is the same reader that messages are read from.
    fn inner_mut(&mut self) -> &mut R {
        &mut self.reader
    }
1854
    /// Get an immutable reference to the underlying reader.
    ///
    /// This is the same reader that messages are read from.
    fn inner(&self) -> &R {
        &self.reader
    }
1859
1860    /// Read the metadata length for the next message from the underlying stream.
1861    ///
1862    /// # Returns
1863    /// - `Ok(None)` if the the reader signals the end of stream with EOF on
1864    ///   the first read
1865    /// - `Err(_)` if the reader returns an error other than on the first
1866    ///   read, or if the metadata length is less than 0.
1867    /// - `Ok(Some(_))` with the length otherwise.
1868    pub fn read_meta_len(&mut self) -> Result<Option<usize>, ArrowError> {
1869        let mut meta_len: [u8; 4] = [0; 4];
1870        match self.reader.read_exact(&mut meta_len) {
1871            Ok(_) => {}
1872            Err(e) => {
1873                return if e.kind() == std::io::ErrorKind::UnexpectedEof {
1874                    // Handle EOF without the "0xFFFFFFFF 0x00000000"
1875                    // valid according to:
1876                    // https://arrow.apache.org/docs/format/Columnar.html#ipc-streaming-format
1877                    Ok(None)
1878                } else {
1879                    Err(ArrowError::from(e))
1880                };
1881            }
1882        };
1883
1884        let meta_len = {
1885            // If a continuation marker is encountered, skip over it and read
1886            // the size from the next four bytes.
1887            if meta_len == CONTINUATION_MARKER {
1888                self.reader.read_exact(&mut meta_len)?;
1889            }
1890
1891            i32::from_le_bytes(meta_len)
1892        };
1893
1894        if meta_len == 0 {
1895            return Ok(None);
1896        }
1897
1898        let meta_len = usize::try_from(meta_len)
1899            .map_err(|_| ArrowError::ParseError(format!("Invalid metadata length: {meta_len}")))?;
1900
1901        Ok(Some(meta_len))
1902    }
1903}
1904
1905#[cfg(test)]
1906mod tests {
1907    use std::io::Cursor;
1908
1909    use crate::convert::fb_to_schema;
1910    use crate::writer::{
1911        DictionaryTracker, IpcDataGenerator, IpcWriteOptions, unslice_run_array, write_message,
1912    };
1913
1914    use super::*;
1915
1916    use crate::{root_as_footer, root_as_message, size_prefixed_root_as_message};
1917    use arrow_array::builder::{PrimitiveRunBuilder, UnionBuilder};
1918    use arrow_array::types::*;
1919    use arrow_buffer::{NullBuffer, OffsetBuffer};
1920    use arrow_data::ArrayDataBuilder;
1921
    /// Builds the 14-field schema (`f0` .. `f13`) shared by the projection tests.
    ///
    /// The fields cover primitive, string, boolean, union, null, list,
    /// fixed-size binary, fixed-size list, struct, run-end-encoded and
    /// dictionary types. Field positions matter: the batch builder looks up
    /// fields 8 and 9 by index.
    fn create_test_projection_schema() -> Schema {
        // define field types
        let list_data_type = DataType::List(Arc::new(Field::new_list_field(DataType::Int32, true)));

        let fixed_size_list_data_type =
            DataType::FixedSizeList(Arc::new(Field::new_list_field(DataType::Int32, false)), 3);

        let union_fields = UnionFields::from_fields(vec![
            Field::new("a", DataType::Int32, false),
            Field::new("b", DataType::Float64, false),
        ]);

        let union_data_type = DataType::Union(union_fields, UnionMode::Dense);

        let struct_fields = Fields::from(vec![
            Field::new("id", DataType::Int32, false),
            Field::new_list("list", Field::new_list_field(DataType::Int8, true), false),
        ]);
        let struct_data_type = DataType::Struct(struct_fields);

        let run_encoded_data_type = DataType::RunEndEncoded(
            Arc::new(Field::new("run_ends", DataType::Int16, false)),
            Arc::new(Field::new("values", DataType::Int32, true)),
        );

        // define schema
        Schema::new(vec![
            Field::new("f0", DataType::UInt32, false),
            Field::new("f1", DataType::Utf8, false),
            Field::new("f2", DataType::Boolean, false),
            Field::new("f3", union_data_type, true),
            Field::new("f4", DataType::Null, true),
            Field::new("f5", DataType::Float64, true),
            Field::new("f6", list_data_type, false),
            Field::new("f7", DataType::FixedSizeBinary(3), true),
            Field::new("f8", fixed_size_list_data_type, false),
            Field::new("f9", struct_data_type, false),
            Field::new("f10", run_encoded_data_type, false),
            Field::new("f11", DataType::Boolean, false),
            Field::new_dictionary("f12", DataType::Int8, DataType::Utf8, false),
            Field::new("f13", DataType::Utf8, false),
        ])
    }
1965
    /// Builds a three-row record batch with one column per field of `schema`.
    ///
    /// `schema` is expected to be the one returned by
    /// `create_test_projection_schema`: the data types of fields 8 and 9 are
    /// taken from it, and the columns are passed in field order `f0` .. `f13`.
    fn create_test_projection_batch_data(schema: &Schema) -> RecordBatch {
        // set test data for each column
        let array0 = UInt32Array::from(vec![1, 2, 3]);
        let array1 = StringArray::from(vec!["foo", "bar", "baz"]);
        let array2 = BooleanArray::from(vec![true, false, true]);

        // f3: dense union with one Int32 value, one Float64 value and one null
        let mut union_builder = UnionBuilder::new_dense();
        union_builder.append::<Int32Type>("a", 1).unwrap();
        union_builder.append::<Float64Type>("b", 10.1).unwrap();
        union_builder.append_null::<Float64Type>("b").unwrap();
        let array3 = union_builder.build().unwrap();

        let array4 = NullArray::new(3);
        let array5 = Float64Array::from(vec![Some(1.1), None, Some(3.3)]);
        // f6: variable-length lists (3, 3 and 2 elements)
        let array6_values = vec![
            Some(vec![Some(10), Some(10), Some(10)]),
            Some(vec![Some(20), Some(20), Some(20)]),
            Some(vec![Some(30), Some(30)]),
        ];
        let array6 = ListArray::from_iter_primitive::<Int32Type, _, _>(array6_values);
        let array7_values = vec![vec![11, 12, 13], vec![22, 23, 24], vec![33, 34, 35]];
        let array7 = FixedSizeBinaryArray::try_from_iter(array7_values.into_iter()).unwrap();

        // f8: fixed-size list of 3 rows over 9 child values
        let array8_values = ArrayData::builder(DataType::Int32)
            .len(9)
            .add_buffer(Buffer::from_slice_ref([40, 41, 42, 43, 44, 45, 46, 47, 48]))
            .build()
            .unwrap();
        let array8_data = ArrayData::builder(schema.field(8).data_type().clone())
            .len(3)
            .add_child_data(array8_values)
            .build()
            .unwrap();
        let array8 = FixedSizeListArray::from(array8_data);

        // f9: struct with an `id` child and a `list` child
        let array9_id: ArrayRef = Arc::new(Int32Array::from(vec![1001, 1002, 1003]));
        let array9_list: ArrayRef =
            Arc::new(ListArray::from_iter_primitive::<Int8Type, _, _>(vec![
                Some(vec![Some(-10)]),
                Some(vec![Some(-20), Some(-20), Some(-20)]),
                Some(vec![Some(-30)]),
            ]));
        let array9 = ArrayDataBuilder::new(schema.field(9).data_type().clone())
            .add_child_data(array9_id.into_data())
            .add_child_data(array9_list.into_data())
            .len(3)
            .build()
            .unwrap();
        let array9 = StructArray::from(array9);

        // f10: run-end-encoded column built from one value followed by two nulls
        let array10_input = vec![Some(1_i32), None, None];
        let mut array10_builder = PrimitiveRunBuilder::<Int16Type, Int32Type>::new();
        array10_builder.extend(array10_input);
        let array10 = array10_builder.finish();

        let array11 = BooleanArray::from(vec![false, false, true]);

        // f12: dictionary column with Int8 keys into three string values
        let array12_values = StringArray::from(vec!["x", "yy", "zzz"]);
        let array12_keys = Int8Array::from_iter_values([1, 1, 2]);
        let array12 = DictionaryArray::new(array12_keys, Arc::new(array12_values));

        let array13 = StringArray::from(vec!["a", "bb", "ccc"]);

        // create record batch
        RecordBatch::try_new(
            Arc::new(schema.clone()),
            vec![
                Arc::new(array0),
                Arc::new(array1),
                Arc::new(array2),
                Arc::new(array3),
                Arc::new(array4),
                Arc::new(array5),
                Arc::new(array6),
                Arc::new(array7),
                Arc::new(array8),
                Arc::new(array9),
                Arc::new(array10),
                Arc::new(array11),
                Arc::new(array12),
                Arc::new(array13),
            ],
        )
        .unwrap()
    }
2051
2052    #[test]
2053    fn test_negative_meta_len_start_stream() {
2054        let bytes = i32::to_le_bytes(-1);
2055        let mut buf = vec![];
2056        buf.extend(CONTINUATION_MARKER);
2057        buf.extend(bytes);
2058
2059        let reader_err = StreamReader::try_new(Cursor::new(buf), None).err();
2060        assert!(reader_err.is_some());
2061        assert_eq!(
2062            reader_err.unwrap().to_string(),
2063            "Parser error: Invalid metadata length: -1"
2064        );
2065    }
2066
2067    #[test]
2068    fn test_negative_meta_len_mid_stream() {
2069        let schema = Schema::new(vec![Field::new("a", DataType::Int32, false)]);
2070        let mut buf = Vec::new();
2071        {
2072            let mut writer = crate::writer::StreamWriter::try_new(&mut buf, &schema).unwrap();
2073            let batch =
2074                RecordBatch::try_new(Arc::new(schema), vec![Arc::new(Int32Array::from(vec![1]))])
2075                    .unwrap();
2076            writer.write(&batch).unwrap();
2077        }
2078
2079        let bytes = i32::to_le_bytes(-1);
2080        buf.extend(CONTINUATION_MARKER);
2081        buf.extend(bytes);
2082
2083        let mut reader = StreamReader::try_new(Cursor::new(buf), None).unwrap();
2084        // Read the valid value
2085        assert!(reader.maybe_next().is_ok());
2086        // Read the invalid meta len
2087        let batch_err = reader.maybe_next().err();
2088        assert!(batch_err.is_some());
2089        assert_eq!(
2090            batch_err.unwrap().to_string(),
2091            "Parser error: Invalid metadata length: -1"
2092        );
2093    }
2094
    /// Decoding a record batch whose metadata lists fewer buffers than the
    /// schema requires must fail with an `IpcError` rather than succeed.
    #[test]
    fn test_missing_buffer_metadata_error() {
        // Within this function `RecordBatch` / `RecordBatchArgs` are used as the
        // flatbuffer message types (see `RecordBatch::create` below).
        use crate::r#gen::Message::*;
        use flatbuffers::FlatBufferBuilder;

        let schema = Arc::new(Schema::new(vec![Field::new("col", DataType::Int32, true)]));

        // create RecordBatch buffer metadata with invalid buffer count
        // Int32Array needs 2 buffers (validity + data) but we provide only 1
        let mut fbb = FlatBufferBuilder::new();
        let nodes = fbb.create_vector(&[FieldNode::new(2, 0)]);
        let buffers = fbb.create_vector(&[crate::Buffer::new(0, 8)]);
        let batch_offset = RecordBatch::create(
            &mut fbb,
            &RecordBatchArgs {
                length: 2,
                nodes: Some(nodes),
                buffers: Some(buffers),
                compression: None,
                variadicBufferCounts: None,
            },
        );
        fbb.finish_minimal(batch_offset);
        // Copy the finished bytes out and re-parse them as a record batch table
        let batch_bytes = fbb.finished_data().to_vec();
        let batch = flatbuffers::root::<RecordBatch>(&batch_bytes).unwrap();

        // 8 zero bytes of body, no dictionaries
        let data_buffer = Buffer::from(vec![0u8; 8]);
        let dictionaries: HashMap<i64, ArrayRef> = HashMap::new();
        let metadata = MetadataVersion::V5;

        let decoder = RecordBatchDecoder::try_new(
            &data_buffer,
            batch,
            schema.clone(),
            &dictionaries,
            &metadata,
        )
        .unwrap();

        let result = decoder.read_record_batch();

        // Anything other than the expected `IpcError` (including success) fails the test
        match result {
            Err(ArrowError::IpcError(msg)) => {
                assert_eq!(msg, "Buffer count mismatched with metadata");
            }
            other => panic!("unexpected error: {other:?}"),
        }
    }
2143
    /// Test that the reader can read legacy files where empty list arrays were written with a 0-byte offsets buffer.
    ///
    /// The record batch metadata is built by hand with every buffer declared as
    /// zero-length, and is decoded against an empty body.
    #[test]
    fn test_read_legacy_empty_list_without_offsets_buffer() {
        // Within this function `RecordBatch` / `RecordBatchArgs` are used as the
        // flatbuffer message types (see `RecordBatch::create` below).
        use crate::r#gen::Message::*;
        use flatbuffers::FlatBufferBuilder;

        let schema = Arc::new(Schema::new(vec![Field::new_list(
            "items",
            Field::new_list_field(DataType::Int32, true),
            true,
        )]));

        // Legacy arrow-rs versions wrote empty offsets buffers for empty list arrays.
        // Keep reader compatibility with such files by accepting a 0-byte offsets buffer.
        let mut fbb = FlatBufferBuilder::new();
        let nodes = fbb.create_vector(&[
            FieldNode::new(0, 0), // list node
            FieldNode::new(0, 0), // child int32 node
        ]);
        let buffers = fbb.create_vector(&[
            crate::Buffer::new(0, 0), // list validity
            crate::Buffer::new(0, 0), // list offsets (legacy empty buffer)
            crate::Buffer::new(0, 0), // child validity
            crate::Buffer::new(0, 0), // child values
        ]);
        let batch_offset = RecordBatch::create(
            &mut fbb,
            &RecordBatchArgs {
                length: 0,
                nodes: Some(nodes),
                buffers: Some(buffers),
                compression: None,
                variadicBufferCounts: None,
            },
        );
        fbb.finish_minimal(batch_offset);
        // Copy the finished bytes out and re-parse them as a record batch table
        let batch_bytes = fbb.finished_data().to_vec();
        let batch = flatbuffers::root::<RecordBatch>(&batch_bytes).unwrap();

        // Empty body: all declared buffers are zero-length
        let body = Buffer::from(Vec::<u8>::new());
        let dictionaries: HashMap<i64, ArrayRef> = HashMap::new();
        let metadata = MetadataVersion::V5;

        let decoder =
            RecordBatchDecoder::try_new(&body, batch, schema.clone(), &dictionaries, &metadata)
                .unwrap();

        let read_batch = decoder.read_record_batch().unwrap();
        assert_eq!(read_batch.num_rows(), 0);

        // The decoded column must be an empty list array with no child values
        let list = read_batch
            .column(0)
            .as_any()
            .downcast_ref::<ListArray>()
            .unwrap();
        assert_eq!(list.len(), 0);
        assert_eq!(list.values().len(), 0);
    }
2202
    /// Test that the reader can read legacy files where empty Utf8/Binary arrays were written with a 0-byte offsets buffer.
    ///
    /// The record batch metadata is built by hand with every buffer declared as
    /// zero-length, and is decoded against an empty body.
    #[test]
    fn test_read_legacy_empty_utf8_and_binary_without_offsets_buffer() {
        // Within this function `RecordBatch` / `RecordBatchArgs` are used as the
        // flatbuffer message types (see `RecordBatch::create` below).
        use crate::r#gen::Message::*;
        use flatbuffers::FlatBufferBuilder;

        let schema = Arc::new(Schema::new(vec![
            Field::new("name", DataType::Utf8, true),
            Field::new("payload", DataType::Binary, true),
        ]));

        // Legacy arrow-rs versions wrote empty offsets buffers for empty Utf8/Binary arrays.
        // Keep reader compatibility with such files by accepting 0-byte offsets buffers.
        let mut fbb = FlatBufferBuilder::new();
        let nodes = fbb.create_vector(&[
            FieldNode::new(0, 0), // utf8 node
            FieldNode::new(0, 0), // binary node
        ]);
        let buffers = fbb.create_vector(&[
            crate::Buffer::new(0, 0), // utf8 validity
            crate::Buffer::new(0, 0), // utf8 offsets (legacy empty buffer)
            crate::Buffer::new(0, 0), // utf8 values
            crate::Buffer::new(0, 0), // binary validity
            crate::Buffer::new(0, 0), // binary offsets (legacy empty buffer)
            crate::Buffer::new(0, 0), // binary values
        ]);
        let batch_offset = RecordBatch::create(
            &mut fbb,
            &RecordBatchArgs {
                length: 0,
                nodes: Some(nodes),
                buffers: Some(buffers),
                compression: None,
                variadicBufferCounts: None,
            },
        );
        fbb.finish_minimal(batch_offset);
        // Copy the finished bytes out and re-parse them as a record batch table
        let batch_bytes = fbb.finished_data().to_vec();
        let batch = flatbuffers::root::<RecordBatch>(&batch_bytes).unwrap();

        // Empty body: all declared buffers are zero-length
        let body = Buffer::from(Vec::<u8>::new());
        let dictionaries: HashMap<i64, ArrayRef> = HashMap::new();
        let metadata = MetadataVersion::V5;

        let decoder =
            RecordBatchDecoder::try_new(&body, batch, schema.clone(), &dictionaries, &metadata)
                .unwrap();

        let read_batch = decoder.read_record_batch().unwrap();
        assert_eq!(read_batch.num_rows(), 0);

        // Both columns must decode as empty arrays whose offsets are the single entry `[0]`
        let utf8 = read_batch
            .column(0)
            .as_any()
            .downcast_ref::<StringArray>()
            .unwrap();
        assert_eq!(utf8.len(), 0);
        assert_eq!(utf8.value_offsets(), [0]);

        let binary = read_batch
            .column(1)
            .as_any()
            .downcast_ref::<BinaryArray>()
            .unwrap();
        assert_eq!(binary.len(), 0);
        assert_eq!(binary.value_offsets(), [0]);
    }
2270
2271    #[test]
2272    fn test_projection_array_values() {
2273        // define schema
2274        let schema = create_test_projection_schema();
2275
2276        // create record batch with test data
2277        let batch = create_test_projection_batch_data(&schema);
2278
2279        // write record batch in IPC format
2280        let mut buf = Vec::new();
2281        {
2282            let mut writer = crate::writer::FileWriter::try_new(&mut buf, &schema).unwrap();
2283            writer.write(&batch).unwrap();
2284            writer.finish().unwrap();
2285        }
2286
2287        // read record batch with projection
2288        for index in 0..12 {
2289            let projection = vec![index];
2290            let reader = FileReader::try_new(std::io::Cursor::new(buf.clone()), Some(projection));
2291            let read_batch = reader.unwrap().next().unwrap().unwrap();
2292            let projected_column = read_batch.column(0);
2293            let expected_column = batch.column(index);
2294
2295            // check the projected column equals the expected column
2296            assert_eq!(projected_column.as_ref(), expected_column.as_ref());
2297        }
2298
2299        {
2300            // read record batch with reversed projection
2301            let reader =
2302                FileReader::try_new(std::io::Cursor::new(buf.clone()), Some(vec![3, 2, 1]));
2303            let read_batch = reader.unwrap().next().unwrap().unwrap();
2304            let expected_batch = batch.project(&[3, 2, 1]).unwrap();
2305            assert_eq!(read_batch, expected_batch);
2306        }
2307    }
2308
2309    #[test]
2310    fn test_projection_duplicate_indices() {
2311        let schema = create_test_projection_schema();
2312        let batch = create_test_projection_batch_data(&schema);
2313
2314        // Write the batch to IPC
2315        let mut buf = Vec::new();
2316        {
2317            let mut writer = crate::writer::FileWriter::try_new(&mut buf, &schema).unwrap();
2318            writer.write(&batch).unwrap();
2319            writer.finish().unwrap();
2320        }
2321
2322        // Verify duplicate([1, 1]) and reordered([2, 0, 2]) projection indices
2323        for projection in [vec![1, 1], vec![2, 0, 2]] {
2324            let reader =
2325                FileReader::try_new(std::io::Cursor::new(buf.clone()), Some(projection.clone()));
2326            let read_batch = reader.unwrap().next().unwrap().unwrap();
2327
2328            let expected_batch = batch.project(&projection).unwrap();
2329            assert_eq!(read_batch, expected_batch);
2330        }
2331    }
2332
/// Round-trips a single-row batch (two `Float32` and two `Int32` columns)
/// through the IPC stream format, once without and once with a projection.
///
/// Each read also checks that exactly one batch came back; otherwise the
/// per-batch assertions would pass vacuously if the reader yielded nothing.
#[test]
fn test_arrow_single_float_row() {
    let schema = Schema::new(vec![
        Field::new("a", DataType::Float32, false),
        Field::new("b", DataType::Float32, false),
        Field::new("c", DataType::Int32, false),
        Field::new("d", DataType::Int32, false),
    ]);
    let arrays = vec![
        Arc::new(Float32Array::from(vec![1.23])) as ArrayRef,
        Arc::new(Float32Array::from(vec![-6.50])) as ArrayRef,
        Arc::new(Int32Array::from(vec![2])) as ArrayRef,
        Arc::new(Int32Array::from(vec![1])) as ArrayRef,
    ];
    let batch = RecordBatch::try_new(Arc::new(schema.clone()), arrays).unwrap();

    // Write the batch to a temporary file in stream format
    let mut file = tempfile::tempfile().unwrap();
    let mut stream_writer = crate::writer::StreamWriter::try_new(&mut file, &schema).unwrap();
    stream_writer.write(&batch).unwrap();
    stream_writer.finish().unwrap();

    drop(stream_writer);

    file.rewind().unwrap();

    // Read the stream back without a projection
    let reader = StreamReader::try_new(&mut file, None).unwrap();

    let mut batches_read = 0_usize;
    for batch in reader {
        let batch = batch.unwrap();
        // Both float columns were written with non-zero values
        for column_index in [0, 1] {
            let floats = batch
                .column(column_index)
                .as_any()
                .downcast_ref::<Float32Array>()
                .unwrap();
            assert!(floats.value(0) != 0.0);
        }
        batches_read += 1;
    }
    assert_eq!(batches_read, 1);

    file.rewind().unwrap();

    // Read with projection
    let reader = StreamReader::try_new(file, Some(vec![0, 3])).unwrap();

    let mut projected_batches_read = 0_usize;
    for batch in reader {
        let batch = batch.unwrap();
        assert_eq!(batch.schema().fields().len(), 2);
        assert_eq!(batch.schema().fields()[0].data_type(), &DataType::Float32);
        assert_eq!(batch.schema().fields()[1].data_type(), &DataType::Int32);
        projected_batches_read += 1;
    }
    assert_eq!(projected_batches_read, 1);
}
2395
2396    /// Write the record batch to an in-memory buffer in IPC File format
2397    fn write_ipc(rb: &RecordBatch) -> Vec<u8> {
2398        let mut buf = Vec::new();
2399        let mut writer = crate::writer::FileWriter::try_new(&mut buf, rb.schema_ref()).unwrap();
2400        writer.write(rb).unwrap();
2401        writer.finish().unwrap();
2402        buf
2403    }
2404
2405    /// Return the first record batch read from the IPC File buffer
2406    fn read_ipc(buf: &[u8]) -> Result<RecordBatch, ArrowError> {
2407        let mut reader = FileReader::try_new(std::io::Cursor::new(buf), None)?;
2408        reader.next().unwrap()
2409    }
2410
2411    /// Return the first record batch read from the IPC File buffer, disabling
2412    /// validation
2413    fn read_ipc_skip_validation(buf: &[u8]) -> Result<RecordBatch, ArrowError> {
2414        let mut reader = unsafe {
2415            FileReader::try_new(std::io::Cursor::new(buf), None)?.with_skip_validation(true)
2416        };
2417        reader.next().unwrap()
2418    }
2419
2420    fn roundtrip_ipc(rb: &RecordBatch) -> RecordBatch {
2421        let buf = write_ipc(rb);
2422        read_ipc(&buf).unwrap()
2423    }
2424
2425    /// Return the first record batch read from the IPC File buffer
2426    /// using the FileDecoder API
2427    fn read_ipc_with_decoder(buf: Vec<u8>) -> Result<RecordBatch, ArrowError> {
2428        read_ipc_with_decoder_inner(buf, false)
2429    }
2430
2431    /// Return the first record batch read from the IPC File buffer
2432    /// using the FileDecoder API, disabling validation
2433    fn read_ipc_with_decoder_skip_validation(buf: Vec<u8>) -> Result<RecordBatch, ArrowError> {
2434        read_ipc_with_decoder_inner(buf, true)
2435    }
2436
2437    fn read_ipc_with_decoder_inner(
2438        buf: Vec<u8>,
2439        skip_validation: bool,
2440    ) -> Result<RecordBatch, ArrowError> {
2441        let buffer = Buffer::from_vec(buf);
2442        let trailer_start = buffer.len() - 10;
2443        let footer_len = read_footer_length(buffer[trailer_start..].try_into().unwrap())?;
2444        let footer = root_as_footer(&buffer[trailer_start - footer_len..trailer_start])
2445            .map_err(|e| ArrowError::InvalidArgumentError(format!("Invalid footer: {e}")))?;
2446
2447        let schema = fb_to_schema(footer.schema().unwrap());
2448
2449        let mut decoder = unsafe {
2450            FileDecoder::new(Arc::new(schema), footer.version())
2451                .with_skip_validation(skip_validation)
2452        };
2453        // Read dictionaries
2454        for block in footer.dictionaries().iter().flatten() {
2455            let block_len = block.bodyLength() as usize + block.metaDataLength() as usize;
2456            let data = buffer.slice_with_length(block.offset() as _, block_len);
2457            decoder.read_dictionary(block, &data)?
2458        }
2459
2460        // Read record batch
2461        let batches = footer.recordBatches().unwrap();
2462        assert_eq!(batches.len(), 1); // Only wrote a single batch
2463
2464        let block = batches.get(0);
2465        let block_len = block.bodyLength() as usize + block.metaDataLength() as usize;
2466        let data = buffer.slice_with_length(block.offset() as _, block_len);
2467        Ok(decoder.read_record_batch(block, &data)?.unwrap())
2468    }
2469
2470    /// Write the record batch to an in-memory buffer in IPC Stream format
2471    fn write_stream(rb: &RecordBatch) -> Vec<u8> {
2472        let mut buf = Vec::new();
2473        let mut writer = crate::writer::StreamWriter::try_new(&mut buf, rb.schema_ref()).unwrap();
2474        writer.write(rb).unwrap();
2475        writer.finish().unwrap();
2476        buf
2477    }
2478
2479    /// Return the first record batch read from the IPC Stream buffer
2480    fn read_stream(buf: &[u8]) -> Result<RecordBatch, ArrowError> {
2481        let mut reader = StreamReader::try_new(std::io::Cursor::new(buf), None)?;
2482        reader.next().unwrap()
2483    }
2484
2485    /// Return the first record batch read from the IPC Stream buffer,
2486    /// disabling validation
2487    fn read_stream_skip_validation(buf: &[u8]) -> Result<RecordBatch, ArrowError> {
2488        let mut reader = unsafe {
2489            StreamReader::try_new(std::io::Cursor::new(buf), None)?.with_skip_validation(true)
2490        };
2491        reader.next().unwrap()
2492    }
2493
2494    fn roundtrip_ipc_stream(rb: &RecordBatch) -> RecordBatch {
2495        let buf = write_stream(rb);
2496        read_stream(&buf).unwrap()
2497    }
2498
/// Custom key/value metadata written through the file writer must be returned
/// unchanged by the file reader.
#[test]
fn test_roundtrip_with_custom_metadata() {
    let schema = Schema::new(vec![Field::new("dummy", DataType::Float64, false)]);

    let test_metadata = HashMap::from([
        ("abc".to_string(), "abc".to_string()),
        ("def".to_string(), "def".to_string()),
    ]);

    // Write a file that holds only the schema and the metadata entries
    let mut ipc_bytes = Vec::new();
    let mut writer = crate::writer::FileWriter::try_new(&mut ipc_bytes, &schema).unwrap();
    test_metadata
        .iter()
        .for_each(|(key, value)| writer.write_metadata(key, value));
    writer.finish().unwrap();
    drop(writer);

    let cursor = std::io::Cursor::new(ipc_bytes);
    let reader = crate::reader::FileReader::try_new(cursor, None).unwrap();
    assert_eq!(reader.custom_metadata(), &test_metadata);
}
2516
/// A dictionary array nested inside a struct column must survive an IPC File
/// round trip.
#[test]
fn test_roundtrip_nested_dict() {
    let dictionary: DictionaryArray<Int32Type> = vec!["a", "b", "a"].into_iter().collect();
    let dictionary = Arc::new(dictionary) as ArrayRef;

    // Wrap the dictionary in a single-field struct
    let dict_field = Arc::new(Field::new("dict", dictionary.data_type().clone(), false));
    let struct_column = Arc::new(StructArray::from(vec![(dict_field, dictionary)])) as ArrayRef;

    let struct_field = Field::new("struct", struct_column.data_type().clone(), false);
    let schema = Arc::new(Schema::new(vec![struct_field]));

    let batch = RecordBatch::try_new(schema, vec![struct_column]).unwrap();

    assert_eq!(batch, roundtrip_ipc(&batch));
}
2538
/// Same data as the nested-dictionary round trip, but written through
/// `FileWriter::try_new_with_options` with default `IpcWriteOptions`.
#[test]
fn test_roundtrip_nested_dict_no_preserve_dict_id() {
    let dictionary: DictionaryArray<Int32Type> = vec!["a", "b", "a"].into_iter().collect();
    let dictionary = Arc::new(dictionary) as ArrayRef;

    // Wrap the dictionary in a single-field struct
    let dict_field = Arc::new(Field::new("dict", dictionary.data_type().clone(), false));
    let struct_column = Arc::new(StructArray::from(vec![(dict_field, dictionary)])) as ArrayRef;

    let struct_field = Field::new("struct", struct_column.data_type().clone(), false);
    let schema = Arc::new(Schema::new(vec![struct_field]));

    let batch = RecordBatch::try_new(schema, vec![struct_column]).unwrap();

    // Write with explicitly supplied (default) options
    let mut ipc_bytes = Vec::new();
    let write_options = IpcWriteOptions::default();
    let mut writer = crate::writer::FileWriter::try_new_with_options(
        &mut ipc_bytes,
        batch.schema_ref(),
        write_options,
    )
    .unwrap();
    writer.write(&batch).unwrap();
    writer.finish().unwrap();
    drop(writer);

    let mut reader = FileReader::try_new(std::io::Cursor::new(ipc_bytes), None).unwrap();
    let first_batch = reader.next().unwrap().unwrap();

    assert_eq!(batch, first_batch);
}
2573
2574    fn check_union_with_builder(mut builder: UnionBuilder) {
2575        builder.append::<Int32Type>("a", 1).unwrap();
2576        builder.append_null::<Int32Type>("a").unwrap();
2577        builder.append::<Float64Type>("c", 3.0).unwrap();
2578        builder.append::<Int32Type>("a", 4).unwrap();
2579        builder.append::<Int64Type>("d", 11).unwrap();
2580        let union = builder.build().unwrap();
2581
2582        let schema = Arc::new(Schema::new(vec![Field::new(
2583            "union",
2584            union.data_type().clone(),
2585            false,
2586        )]));
2587
2588        let union_array = Arc::new(union) as ArrayRef;
2589
2590        let rb = RecordBatch::try_new(schema, vec![union_array]).unwrap();
2591        let rb2 = roundtrip_ipc(&rb);
2592        // TODO: equality not yet implemented for union, so we check that the length of the array is
2593        // the same and that all of the buffers are the same instead.
2594        assert_eq!(rb.schema(), rb2.schema());
2595        assert_eq!(rb.num_columns(), rb2.num_columns());
2596        assert_eq!(rb.num_rows(), rb2.num_rows());
2597        let union1 = rb.column(0);
2598        let union2 = rb2.column(0);
2599
2600        assert_eq!(union1, union2);
2601    }
2602
/// Runs the shared union round-trip check with a dense union builder.
#[test]
fn test_roundtrip_dense_union() {
    let dense_builder = UnionBuilder::new_dense();
    check_union_with_builder(dense_builder);
}
2607
/// Runs the shared union round-trip check with a sparse union builder.
#[test]
fn test_roundtrip_sparse_union() {
    let sparse_builder = UnionBuilder::new_sparse();
    check_union_with_builder(sparse_builder);
}
2612
/// A struct column with no child fields, but with a null buffer, must survive
/// an IPC File round trip.
#[test]
fn test_roundtrip_struct_empty_fields() {
    let validity = NullBuffer::from(&[true, true, false]);
    let row_count = validity.len();
    let empty_struct = StructArray::new_empty_fields(row_count, Some(validity));

    let original =
        RecordBatch::try_from_iter([("", Arc::new(empty_struct) as ArrayRef)]).unwrap();
    let restored = roundtrip_ipc(&original);

    assert_eq!(original, restored);
}
2624
/// Round-trips a sliced string run array and an unsliced primitive run array
/// through the IPC Stream format.
#[test]
fn test_roundtrip_stream_run_array_sliced() {
    // First column: string runs, sliced to start part-way through the first run
    let string_runs: Int32RunArray = vec!["a", "a", "a", "b", "b", "c", "c", "c"]
        .into_iter()
        .collect();
    let string_runs_sliced = string_runs.slice(2, 5);

    // Second column: primitive runs containing nulls, left unsliced
    let primitive_input = vec![Some(1_i32), None, None, Some(2), Some(2)];
    let mut primitive_builder = PrimitiveRunBuilder::<Int16Type, Int32Type>::new();
    primitive_builder.extend(primitive_input);
    let primitive_runs = primitive_builder.finish();

    let schema = Arc::new(Schema::new(vec![
        Field::new(
            "run_array_1_sliced",
            string_runs_sliced.data_type().clone(),
            false,
        ),
        Field::new("run_array_2", primitive_runs.data_type().clone(), false),
    ]));
    let input_batch = RecordBatch::try_new(
        schema,
        vec![
            Arc::new(string_runs_sliced.clone()),
            Arc::new(primitive_runs),
        ],
    )
    .unwrap();
    let output_batch = roundtrip_ipc_stream(&input_batch);

    // The unsliced column can be compared directly. The sliced column is first
    // passed through `unslice_run_array` and then compared as array data.
    assert_eq!(input_batch.column(1), output_batch.column(1));

    let expected_first_column = unslice_run_array(string_runs_sliced.into_data()).unwrap();
    assert_eq!(expected_first_column, output_batch.column(0).into_data());
}
2660
/// A batch with a plain string column and a struct column that contains a
/// dictionary child must survive an IPC Stream round trip.
#[test]
fn test_roundtrip_stream_nested_dict() {
    let xs = vec!["AA", "BB", "AA", "CC", "BB"];

    // Children of the struct column: a plain string array and a dictionary array
    let string_child: ArrayRef = Arc::new(StringArray::from(xs.clone()));
    let dict_child = Arc::new(
        xs.clone()
            .into_iter()
            .collect::<DictionaryArray<Int8Type>>(),
    );
    let string_child_field = Arc::new(Field::new("f2.1", DataType::Utf8, false));
    let dict_child_field = Arc::new(Field::new(
        "f2.2_struct",
        dict_child.data_type().clone(),
        false,
    ));
    let struct_array = StructArray::from(vec![
        (string_child_field, string_child),
        (dict_child_field, dict_child.clone() as ArrayRef),
    ]);

    let schema = Arc::new(Schema::new(vec![
        Field::new("f1_string", DataType::Utf8, false),
        Field::new("f2_struct", struct_array.data_type().clone(), false),
    ]));
    let input_batch = RecordBatch::try_new(
        schema,
        vec![Arc::new(StringArray::from(xs)), Arc::new(struct_array)],
    )
    .unwrap();

    let output_batch = roundtrip_ipc_stream(&input_batch);
    assert_eq!(input_batch, output_batch);
}
2695
/// Round-trips, through the IPC Stream format, a dictionary whose values are a
/// map array whose keys and values are themselves dictionary-encoded strings.
#[test]
fn test_roundtrip_stream_nested_dict_of_map_of_dict() {
    // Both inner dictionaries share the same string values
    let strings: ArrayRef = Arc::new(StringArray::from(vec![
        Some("a"),
        None,
        Some("b"),
        Some("c"),
    ]));

    let map_value_indices = Int8Array::from_iter_values([0, 1, 1, 2, 3, 1]);
    let map_values = DictionaryArray::new(map_value_indices, strings.clone());

    let map_key_indices = Int8Array::from_iter_values([0, 0, 2, 1, 1, 3]);
    let map_keys = DictionaryArray::new(map_key_indices, strings);

    #[allow(deprecated)]
    let keys_field = Arc::new(Field::new_dict(
        "keys",
        DataType::Dictionary(Box::new(DataType::Int8), Box::new(DataType::Utf8)),
        true, // It is technically not legal for this field to be null.
        1,
        false,
    ));
    #[allow(deprecated)]
    let values_field = Arc::new(Field::new_dict(
        "values",
        DataType::Dictionary(Box::new(DataType::Int8), Box::new(DataType::Utf8)),
        true,
        2,
        false,
    ));
    let entries = StructArray::from(vec![
        (keys_field, make_array(map_keys.into_data())),
        (values_field, make_array(map_values.into_data())),
    ]);

    let entries_field = Arc::new(Field::new("entries", entries.data_type().clone(), false));
    let map_data_type = DataType::Map(entries_field, false);

    // Three map rows, each covering two consecutive entries
    let map_offsets = Buffer::from_slice_ref([0, 2, 4, 6]);
    let map_data = ArrayData::builder(map_data_type)
        .len(3)
        .add_buffer(map_offsets)
        .add_child_data(entries.into_data())
        .build()
        .unwrap();
    let map_array = MapArray::from(map_data);

    // Outer dictionary whose values are the map rows
    let outer_indices = Int8Array::from_iter_values([0, 1, 1, 2, 2, 1]);
    let outer_dict = DictionaryArray::new(outer_indices, Arc::new(map_array));

    let outer_field = Field::new("f1", outer_dict.data_type().clone(), false);
    let schema = Arc::new(Schema::new(vec![outer_field]));
    let input_batch = RecordBatch::try_new(schema, vec![Arc::new(outer_dict)]).unwrap();

    let output_batch = roundtrip_ipc_stream(&input_batch);
    assert_eq!(input_batch, output_batch);
}
2756
2757    fn test_roundtrip_stream_dict_of_list_of_dict_impl<
2758        OffsetSize: OffsetSizeTrait,
2759        U: ArrowNativeType,
2760    >(
2761        list_data_type: DataType,
2762        offsets: &[U; 5],
2763    ) {
2764        let values = StringArray::from(vec![Some("a"), None, Some("c"), None]);
2765        let keys = Int8Array::from_iter_values([0, 0, 1, 2, 0, 1, 3]);
2766        let dict_array = DictionaryArray::new(keys, Arc::new(values));
2767        let dict_data = dict_array.to_data();
2768
2769        let value_offsets = Buffer::from_slice_ref(offsets);
2770
2771        let list_data = ArrayData::builder(list_data_type)
2772            .len(4)
2773            .add_buffer(value_offsets)
2774            .add_child_data(dict_data)
2775            .build()
2776            .unwrap();
2777        let list_array = GenericListArray::<OffsetSize>::from(list_data);
2778
2779        let keys_for_dict = Int8Array::from_iter_values([0, 3, 0, 1, 1, 2, 0, 1, 3]);
2780        let dict_dict_array = DictionaryArray::new(keys_for_dict, Arc::new(list_array));
2781
2782        let schema = Arc::new(Schema::new(vec![Field::new(
2783            "f1",
2784            dict_dict_array.data_type().clone(),
2785            false,
2786        )]));
2787        let input_batch = RecordBatch::try_new(schema, vec![Arc::new(dict_dict_array)]).unwrap();
2788        let output_batch = roundtrip_ipc_stream(&input_batch);
2789        assert_eq!(input_batch, output_batch);
2790    }
2791
/// Runs the dictionary-of-list-of-dictionary stream round trip for both `List`
/// (i32 offsets) and `LargeList` (i64 offsets).
#[test]
fn test_roundtrip_stream_dict_of_list_of_dict() {
    // Both list flavours use the same dictionary-typed item field
    #[allow(deprecated)]
    let dict_item_field = || {
        Arc::new(Field::new_dict(
            "item",
            DataType::Dictionary(Box::new(DataType::Int8), Box::new(DataType::Utf8)),
            true,
            1,
            false,
        ))
    };

    // list
    let list_offsets: &[i32; 5] = &[0, 2, 4, 4, 6];
    test_roundtrip_stream_dict_of_list_of_dict_impl::<i32, i32>(
        DataType::List(dict_item_field()),
        list_offsets,
    );

    // large list
    let large_list_offsets: &[i64; 5] = &[0, 2, 4, 4, 7];
    test_roundtrip_stream_dict_of_list_of_dict_impl::<i64, i64>(
        DataType::LargeList(dict_item_field()),
        large_list_offsets,
    );
}
2818
/// Round-trips, through the IPC Stream format, a dictionary whose values are a
/// fixed-size list (size 3) over dictionary-encoded strings.
#[test]
fn test_roundtrip_stream_dict_of_fixed_size_list_of_dict() {
    // Innermost layer: nine dictionary-encoded strings (three lists of three)
    let strings = StringArray::from(vec![Some("a"), None, Some("c"), None]);
    let string_indices = Int8Array::from_iter_values([0, 0, 1, 2, 0, 1, 3, 1, 2]);
    let inner_dict = DictionaryArray::new(string_indices, Arc::new(strings));

    #[allow(deprecated)]
    let item_field = Arc::new(Field::new_dict(
        "item",
        DataType::Dictionary(Box::new(DataType::Int8), Box::new(DataType::Utf8)),
        true,
        1,
        false,
    ));
    let list_data_type = DataType::FixedSizeList(item_field, 3);

    let list_data = ArrayData::builder(list_data_type)
        .len(3)
        .add_child_data(inner_dict.into_data())
        .build()
        .unwrap();
    let list_array = FixedSizeListArray::from(list_data);

    // Outermost layer: dictionary whose values are the fixed-size list rows
    let list_indices = Int8Array::from_iter_values([0, 1, 0, 1, 1, 2, 0, 1, 2]);
    let outer_dict = DictionaryArray::new(list_indices, Arc::new(list_array));

    let outer_field = Field::new("f1", outer_dict.data_type().clone(), false);
    let schema = Arc::new(Schema::new(vec![outer_field]));
    let input_batch = RecordBatch::try_new(schema, vec![Arc::new(outer_dict)]).unwrap();

    let output_batch = roundtrip_ipc_stream(&input_batch);
    assert_eq!(input_batch, output_batch);
}
2856
/// A 63-byte string used as the "long" value in the view-array round-trip tests.
const LONG_TEST_STRING: &str = "This is a long string to make sure binary view array handles it";
2859
/// `BinaryView` and `Utf8View` columns (next to a plain `Utf8` column) must
/// survive File and Stream round trips, both for the full batch and for a slice.
#[test]
fn test_roundtrip_view_types() {
    let schema = Schema::new(vec![
        Field::new("field_1", DataType::BinaryView, true),
        Field::new("field_2", DataType::Utf8, true),
        Field::new("field_3", DataType::Utf8View, true),
    ]);

    // Each column holds two short values, a null and one long value
    let bin_values: Vec<Option<&[u8]>> = vec![
        Some(b"foo"),
        None,
        Some(b"bar"),
        Some(LONG_TEST_STRING.as_bytes()),
    ];
    let utf8_values: Vec<Option<&str>> =
        vec![Some("foo"), None, Some("bar"), Some(LONG_TEST_STRING)];

    let columns: Vec<ArrayRef> = vec![
        Arc::new(BinaryViewArray::from_iter(bin_values)),
        Arc::new(StringArray::from_iter(utf8_values.iter())),
        Arc::new(StringViewArray::from_iter(utf8_values)),
    ];
    let record_batch = RecordBatch::try_new(Arc::new(schema.clone()), columns).unwrap();
    let sliced_batch = record_batch.slice(1, 2);

    // Full batch first, then the slice; File format before Stream format
    for batch in [&record_batch, &sliced_batch] {
        assert_eq!(batch, &roundtrip_ipc(batch));
        assert_eq!(batch, &roundtrip_ipc_stream(batch));
    }
}
2895
/// Round-trips a dictionary whose values are a map array with `Utf8View`
/// dictionary keys and `BinaryView` dictionary values, through both the File
/// and Stream formats, for the full batch and for a slice.
#[test]
fn test_roundtrip_view_types_nested_dict() {
    let bin_values: Vec<Option<&[u8]>> = vec![
        Some(b"foo"),
        None,
        Some(b"bar"),
        Some(LONG_TEST_STRING.as_bytes()),
        Some(b"field"),
    ];
    let utf8_values: Vec<Option<&str>> = vec![
        Some("foo"),
        None,
        Some("bar"),
        Some(LONG_TEST_STRING),
        Some("field"),
    ];
    let bin_view_array = Arc::new(BinaryViewArray::from_iter(bin_values));
    let utf8_view_array = Arc::new(StringViewArray::from_iter(utf8_values));

    // Map keys: dictionary over the Utf8View values
    let map_key_indices = Int8Array::from_iter_values([0, 0, 1, 2, 0, 1, 3]);
    let map_keys = DictionaryArray::new(map_key_indices, utf8_view_array.clone());
    #[allow(deprecated)]
    let keys_field = Arc::new(Field::new_dict(
        "keys",
        DataType::Dictionary(Box::new(DataType::Int8), Box::new(DataType::Utf8View)),
        true,
        1,
        false,
    ));

    // Map values: dictionary over the BinaryView values
    let map_value_indices = Int8Array::from_iter_values([0, 3, 0, 1, 2, 0, 1]);
    let map_values = DictionaryArray::new(map_value_indices, bin_view_array);
    #[allow(deprecated)]
    let values_field = Arc::new(Field::new_dict(
        "values",
        DataType::Dictionary(Box::new(DataType::Int8), Box::new(DataType::BinaryView)),
        true,
        2,
        false,
    ));

    let entries = StructArray::from(vec![
        (keys_field, make_array(map_keys.into_data())),
        (values_field, make_array(map_values.into_data())),
    ]);
    let entries_field = Arc::new(Field::new("entries", entries.data_type().clone(), false));
    let map_data_type = DataType::Map(entries_field, false);

    // Three map rows covering entries [0, 2), [2, 4) and [4, 7)
    let map_offsets = Buffer::from_slice_ref([0, 2, 4, 7]);
    let map_data = ArrayData::builder(map_data_type)
        .len(3)
        .add_buffer(map_offsets)
        .add_child_data(entries.into_data())
        .build()
        .unwrap();
    let map_array = MapArray::from(map_data);

    // Outer dictionary whose values are the map rows
    let outer_indices = Int8Array::from_iter_values([0, 1, 0, 1, 1, 2, 0, 1, 2]);
    let outer_dict = DictionaryArray::new(outer_indices, Arc::new(map_array));
    let outer_field = Field::new("f1", outer_dict.data_type().clone(), false);
    let schema = Arc::new(Schema::new(vec![outer_field]));
    let batch = RecordBatch::try_new(schema, vec![Arc::new(outer_dict)]).unwrap();
    let sliced_batch = batch.slice(1, 2);

    // Full batch first, then the slice; File format before Stream format
    for candidate in [&batch, &sliced_batch] {
        assert_eq!(candidate, &roundtrip_ipc(candidate));
        assert_eq!(candidate, &roundtrip_ipc_stream(candidate));
    }
}
2973
/// A batch with an empty schema and an explicit row count of 10 must survive an
/// IPC Stream round trip.
#[test]
fn test_no_columns_batch() {
    let empty_schema = Arc::new(Schema::empty());

    // With no columns, the row count is supplied through the batch options
    let batch_options = RecordBatchOptions::new()
        .with_match_field_names(true)
        .with_row_count(Some(10));
    let input_batch =
        RecordBatch::try_new_with_options(empty_schema, vec![], &batch_options).unwrap();

    assert_eq!(input_batch, roundtrip_ipc_stream(&input_batch));
}
2984
/// Decoding from a deliberately misaligned body buffer succeeds when the
/// decoder is told not to require alignment.
#[test]
fn test_unaligned() {
    let column = Arc::new(Int32Array::from(vec![1, 2, 3, 4])) as _;
    let batch = RecordBatch::try_from_iter(vec![("i32", column)]).unwrap();

    // Encode the batch into an IPC message plus body bytes
    let generator = IpcDataGenerator {};
    let mut dict_tracker = DictionaryTracker::new(false);
    let (_, encoded) = generator
        .encode(
            &batch,
            &mut dict_tracker,
            &Default::default(),
            &mut Default::default(),
        )
        .unwrap();

    let message = root_as_message(&encoded.ipc_message).unwrap();

    // Construct an unaligned buffer: prepend one byte, then slice it off again
    let mut padded = MutableBuffer::with_capacity(encoded.arrow_data.len() + 1);
    padded.push(0_u8);
    padded.extend_from_slice(&encoded.arrow_data);
    let misaligned = Buffer::from(padded).slice(1);
    assert_ne!(misaligned.as_ptr().align_offset(8), 0);

    let ipc_batch = message.header_as_record_batch().unwrap();
    let dictionaries = Default::default();
    let version = message.version();
    let roundtrip = RecordBatchDecoder::try_new(
        &misaligned,
        ipc_batch,
        batch.schema(),
        &dictionaries,
        &version,
    )
    .unwrap()
    .with_require_alignment(false)
    .read_record_batch()
    .unwrap();

    assert_eq!(batch, roundtrip);
}
3027
/// Decoding from a deliberately misaligned body buffer fails with a
/// "Misaligned buffers" error when the decoder requires alignment.
#[test]
fn test_unaligned_throws_error_with_require_alignment() {
    let column = Arc::new(Int32Array::from(vec![1, 2, 3, 4])) as _;
    let batch = RecordBatch::try_from_iter(vec![("i32", column)]).unwrap();

    // Encode the batch into an IPC message plus body bytes
    let generator = IpcDataGenerator {};
    let mut dict_tracker = DictionaryTracker::new(false);
    let (_, encoded) = generator
        .encode(
            &batch,
            &mut dict_tracker,
            &Default::default(),
            &mut Default::default(),
        )
        .unwrap();

    let message = root_as_message(&encoded.ipc_message).unwrap();

    // Construct an unaligned buffer: prepend one byte, then slice it off again
    let mut padded = MutableBuffer::with_capacity(encoded.arrow_data.len() + 1);
    padded.push(0_u8);
    padded.extend_from_slice(&encoded.arrow_data);
    let misaligned = Buffer::from(padded).slice(1);
    assert_ne!(misaligned.as_ptr().align_offset(8), 0);

    let ipc_batch = message.header_as_record_batch().unwrap();
    let dictionaries = Default::default();
    let version = message.version();
    let result = RecordBatchDecoder::try_new(
        &misaligned,
        ipc_batch,
        batch.schema(),
        &dictionaries,
        &version,
    )
    .unwrap()
    .with_require_alignment(true)
    .read_record_batch();

    let message_text = result.unwrap_err().to_string();
    assert_eq!(
        message_text,
        "Invalid argument error: Misaligned buffers[0] in array of type Int32, \
         offset from expected alignment of 4 by 1"
    );
}
3075
/// A file with 600,000 columns can be read once the reader's footer table limit
/// is raised through `FileReaderBuilder::with_max_footer_fb_tables`.
#[test]
fn test_file_with_massive_column_count() {
    // 499_999 is upper limit for default settings (1_000_000)
    let limit = 600_000;

    let fields: Vec<_> = (0..limit)
        .map(|i| Field::new(format!("{i}"), DataType::Boolean, false))
        .collect();
    let batch = RecordBatch::new_empty(Arc::new(Schema::new(fields)));

    // Write the (empty) batch to an in-memory IPC file
    let mut ipc_bytes = Vec::new();
    {
        let mut writer =
            crate::writer::FileWriter::try_new(&mut ipc_bytes, batch.schema_ref()).unwrap();
        writer.write(&batch).unwrap();
        writer.finish().unwrap();
    }

    let builder = FileReaderBuilder::new().with_max_footer_fb_tables(1_500_000);
    let mut reader = builder.build(std::io::Cursor::new(ipc_bytes)).unwrap();
    let roundtrip_batch = reader.next().unwrap().unwrap();

    assert_eq!(batch, roundtrip_batch);
}
3101
/// A file whose schema nests structs 61 levels deep can be read once the
/// reader's footer depth limit is raised through
/// `FileReaderBuilder::with_max_footer_fb_depth`.
#[test]
fn test_file_with_deeply_nested_columns() {
    // 60 is upper limit for default settings (64)
    let limit = 61;

    // Wrap the leaf field in one additional struct per level
    let mut fields = vec![Field::new("leaf", DataType::Boolean, false)];
    for index in 0..limit {
        fields = vec![Field::new_struct(format!("{index}"), fields, false)];
    }
    let batch = RecordBatch::new_empty(Arc::new(Schema::new(fields)));

    // Write the (empty) batch to an in-memory IPC file
    let mut ipc_bytes = Vec::new();
    {
        let mut writer =
            crate::writer::FileWriter::try_new(&mut ipc_bytes, batch.schema_ref()).unwrap();
        writer.write(&batch).unwrap();
        writer.finish().unwrap();
    }

    let builder = FileReaderBuilder::new().with_max_footer_fb_depth(65);
    let mut reader = builder.build(std::io::Cursor::new(ipc_bytes)).unwrap();
    let roundtrip_batch = reader.next().unwrap().unwrap();

    assert_eq!(batch, roundtrip_batch);
}
3128
/// A struct array whose children have different lengths must be rejected
/// when read back through IPC with validation enabled.
#[test]
fn test_invalid_struct_array_ipc_read_errors() {
    let a_field = Field::new("a", DataType::Int32, false);
    let b_field = Field::new("b", DataType::Int32, false);

    // Child "a" has four values ...
    let a_child = make_array(
        ArrayData::builder(a_field.data_type().clone())
            .len(4)
            .add_buffer(Buffer::from_slice_ref([1, 2, 3, 4]))
            .build()
            .unwrap(),
    );
    // ... while child "b" only has three, which is what makes the struct invalid.
    let b_child = make_array(
        ArrayData::builder(b_field.data_type().clone())
            .len(3)
            .add_buffer(Buffer::from_slice_ref([5, 6, 7]))
            .build()
            .unwrap(),
    );

    let struct_fields = Fields::from(vec![a_field, b_field]);
    // Deliberately bypass the length check so the invalid array can be written.
    let invalid_struct_arr =
        unsafe { StructArray::new_unchecked(struct_fields, vec![a_child, b_child], None) };

    expect_ipc_validation_error(
        Arc::new(invalid_struct_arr),
        "Invalid argument error: Incorrect array length for StructArray field \"b\", expected 4 got 3",
    );
}
3159
/// A struct array with a structurally correct but content-invalid child
/// (a Utf8 column holding non-UTF-8 bytes) must be rejected when read back
/// through IPC with validation enabled.
#[test]
fn test_invalid_nested_array_ipc_read_errors() {
    let a_field = Field::new("a", DataType::Int32, false);
    let b_field = Field::new("b", DataType::Utf8, false);

    let a_array_data = ArrayData::builder(a_field.data_type().clone())
        .len(4)
        .add_buffer(Buffer::from_slice_ref([1, 2, 3, 4]))
        .build()
        .unwrap();

    // Struct type "s" with children "a" and "b".
    let struct_field = Field::new_struct("s", vec![a_field, b_field], false);
    let struct_data_type = struct_field.data_type().clone();

    // Invalid nested child array: the length is correct, but the last value
    // is not valid UTF-8.
    let valid: &[u8] = b"   ";
    let mut invalid = b"ValidString".to_vec();
    invalid.extend_from_slice(INVALID_UTF8_FIRST_CHAR);
    let binary_array =
        BinaryArray::from_iter(vec![None, Some(valid), None, Some(invalid.as_slice())]);
    // Reinterpret the binary buffers as a string array without checking them.
    let unchecked_strings = unsafe {
        StringArray::new_unchecked(
            binary_array.offsets().clone(),
            binary_array.values().clone(),
            binary_array.nulls().cloned(),
        )
    };
    let b_array_data = unchecked_strings.into_data();

    let struct_data_builder = ArrayData::builder(struct_data_type)
        .len(4)
        .add_child_data(a_array_data)
        .add_child_data(b_array_data);
    // Skip validation so the invalid child survives construction.
    let invalid_struct_arr = make_array(unsafe { struct_data_builder.build_unchecked() });

    expect_ipc_validation_error(
        invalid_struct_arr,
        "Invalid argument error: Invalid UTF8 sequence at string index 3 (3..18): invalid utf-8 sequence of 1 bytes from index 11",
    );
}
3210
/// Two dictionary columns declared with the same dictionary id (0) but
/// different dictionary values must survive a stream round trip when the
/// writer uses its default options.
#[test]
fn test_same_dict_id_without_preserve() {
    let batch = RecordBatch::try_new(
        Arc::new(Schema::new(
            ["a", "b"]
                .iter()
                .map(|name| {
                    #[allow(deprecated)]
                    Field::new_dict(
                        name.to_string(),
                        DataType::Dictionary(
                            Box::new(DataType::Int32),
                            Box::new(DataType::Utf8),
                        ),
                        true,
                        0,
                        false,
                    )
                })
                .collect::<Vec<Field>>(),
        )),
        vec![
            Arc::new(
                vec![Some("c"), Some("d")]
                    .into_iter()
                    .collect::<DictionaryArray<Int32Type>>(),
            ) as ArrayRef,
            Arc::new(
                vec![Some("e"), Some("f")]
                    .into_iter()
                    .collect::<DictionaryArray<Int32Type>>(),
            ) as ArrayRef,
        ],
    )
    .expect("Failed to create RecordBatch");

    // serialize the record batch as an IPC stream
    let mut buf = vec![];
    {
        let mut writer = crate::writer::StreamWriter::try_new_with_options(
            &mut buf,
            batch.schema().as_ref(),
            crate::writer::IpcWriteOptions::default(),
        )
        .expect("Failed to create StreamWriter");
        writer.write(&batch).expect("Failed to write RecordBatch");
        writer.finish().expect("Failed to finish StreamWriter");
    }

    // Collect the decoded batches instead of asserting inside `for_each`:
    // a reader that yields nothing would otherwise let the test pass
    // without comparing anything.
    let decoded = StreamReader::try_new(std::io::Cursor::new(buf), None)
        .expect("Failed to create StreamReader")
        .map(|decoded_batch| decoded_batch.expect("Failed to read RecordBatch"))
        .collect::<Vec<RecordBatch>>();

    assert_eq!(
        decoded.len(),
        1,
        "expected exactly the one batch that was written"
    );
    assert_eq!(decoded[0], batch);
}
3266
/// A ListArray whose offsets decrease must be rejected when read back
/// through IPC with validation enabled.
#[test]
fn test_validation_of_invalid_list_array() {
    let values = Int32Array::from(vec![1, 2, 3]);
    // Offsets can't go backwards, but the final `2` follows a `4`.
    let bad_offsets = ScalarBuffer::<i32>::from(vec![0, 2, 4, 2]);
    // The unchecked constructor is used on purpose: this is where the
    // INVALID array is created.
    let offsets = unsafe { OffsetBuffer::new_unchecked(bad_offsets) };
    let field = Arc::new(Field::new_list_field(DataType::Int32, true));
    let array = ListArray::new(field, offsets, Arc::new(values), None);

    expect_ipc_validation_error(
        Arc::new(array),
        "Invalid argument error: Offset invariant failure: offset at position 2 out of bounds: 4 > 2",
    );
}
3284
/// A StringArray containing bytes that are not valid UTF-8 must be rejected
/// when read back through IPC with validation enabled.
#[test]
fn test_validation_of_invalid_string_array() {
    let valid: &[u8] = b"   ";
    let mut invalid = b"ThisStringIsCertainlyLongerThan12Bytes".to_vec();
    invalid.extend_from_slice(INVALID_UTF8_FIRST_CHAR);

    let binary_array =
        BinaryArray::from_iter(vec![None, Some(valid), None, Some(invalid.as_slice())]);

    // The data is not valid UTF-8, so a StringArray cannot be built from it
    // through the safe constructors; reuse the binary array's buffers via
    // the unchecked constructor to get a purposely invalid StringArray.
    let offsets = binary_array.offsets().clone();
    let value_bytes = binary_array.values().clone();
    let nulls = binary_array.nulls().cloned();
    let array = unsafe { StringArray::new_unchecked(offsets, value_bytes, nulls) };

    expect_ipc_validation_error(
        Arc::new(array),
        "Invalid argument error: Invalid UTF8 sequence at string index 3 (3..45): invalid utf-8 sequence of 1 bytes from index 38",
    );
}
3306
/// A StringViewArray containing bytes that are not valid UTF-8 must be
/// rejected when read back through IPC with validation enabled.
#[test]
fn test_validation_of_invalid_string_view_array() {
    let valid: &[u8] = b"   ";
    let mut invalid = b"ThisStringIsCertainlyLongerThan12Bytes".to_vec();
    invalid.extend_from_slice(INVALID_UTF8_FIRST_CHAR);

    let binary_view_array =
        BinaryViewArray::from_iter(vec![None, Some(valid), None, Some(invalid.as_slice())]);

    // The data is not valid UTF-8, so a StringViewArray cannot be built from
    // it through the safe constructors; reuse the binary view's buffers via
    // the unchecked constructor to get a purposely invalid StringViewArray.
    let views = binary_view_array.views().clone();
    let data_buffers = binary_view_array.data_buffers().to_vec();
    let nulls = binary_view_array.nulls().cloned();
    let array = unsafe { StringViewArray::new_unchecked(views, data_buffers, nulls) };

    expect_ipc_validation_error(
        Arc::new(array),
        "Invalid argument error: Encountered non-UTF-8 data at index 3: invalid utf-8 sequence of 1 bytes from index 38",
    );
}
3329
/// A DictionaryArray with a key that points past the end of its values must
/// be rejected when read back through IPC with validation enabled.
#[test]
fn test_validation_of_invalid_dictionary_array() {
    let values = StringArray::from_iter_values(["a", "b", "c"]);
    // Key 200 is not valid for a dictionary with only three values.
    let keys = Int32Array::from(vec![1, 200]);
    // The unchecked constructor skips the key bounds check on purpose.
    let array = unsafe { DictionaryArray::new_unchecked(keys, Arc::new(values)) };

    expect_ipc_validation_error(
        Arc::new(array),
        "Invalid argument error: Value at position 1 out of bounds: 200 (should be in [0, 2])",
    );
}
3345
/// A UnionArray whose type id buffer uses an id that no field declares must
/// be rejected when read back through IPC with validation enabled.
#[test]
fn test_validation_of_invalid_union_array() {
    // Only type ids 1 and 3 are declared; type id 2 is not valid.
    let union_members = vec![
        Field::new("a", DataType::Int32, false),
        Field::new("b", DataType::Utf8, false),
    ];
    let fields = UnionFields::try_new(vec![1, 3], union_members).unwrap();

    // The middle entry (2) is the invalid one.
    let type_ids = ScalarBuffer::from(vec![1i8, 2, 3]);
    let children: Vec<ArrayRef> = vec![
        Arc::new(Int32Array::from(vec![10, 20, 30])),
        Arc::new(StringArray::from(vec![Some("a"), Some("b"), Some("c")])),
    ];

    // No offsets buffer is supplied; the unchecked constructor skips the
    // type id check on purpose.
    let array = unsafe { UnionArray::new_unchecked(fields, type_ids, None, children) };

    expect_ipc_validation_error(
        Arc::new(array),
        "Invalid argument error: Type Ids values must match one of the field type ids",
    );
}
3372
/// Four bytes that do not form valid UTF-8: the very first byte (`0xa0`) is
/// a continuation byte with no preceding lead byte.
/// <https://stackoverflow.com/questions/1301402/example-invalid-utf8-string>
const INVALID_UTF8_FIRST_CHAR: &[u8] = b"\xa0\xa1\x20\x20";
3376
3377    /// Expect an error when reading the record batch using IPC or IPC Streams
3378    fn expect_ipc_validation_error(array: ArrayRef, expected_err: &str) {
3379        let rb = RecordBatch::try_from_iter([("a", array)]).unwrap();
3380
3381        // IPC Stream format
3382        let buf = write_stream(&rb); // write is ok
3383        read_stream_skip_validation(&buf).unwrap();
3384        let err = read_stream(&buf).unwrap_err();
3385        assert_eq!(err.to_string(), expected_err);
3386
3387        // IPC File format
3388        let buf = write_ipc(&rb); // write is ok
3389        read_ipc_skip_validation(&buf).unwrap();
3390        let err = read_ipc(&buf).unwrap_err();
3391        assert_eq!(err.to_string(), expected_err);
3392
3393        // IPC Format with FileDecoder
3394        read_ipc_with_decoder_skip_validation(buf.clone()).unwrap();
3395        let err = read_ipc_with_decoder(buf).unwrap_err();
3396        assert_eq!(err.to_string(), expected_err);
3397    }
3398
/// Encodes a schema with two dictionary columns into an IPC message and
/// checks that parsing the message yields an equal schema.
#[test]
fn test_roundtrip_schema() {
    // Both columns share the same dictionary type.
    let dict_type =
        || DataType::Dictionary(Box::new(DataType::UInt16), Box::new(DataType::Utf8));
    let schema = Schema::new(vec![
        Field::new("a", dict_type(), false),
        Field::new("b", dict_type(), false),
    ]);

    let options = IpcWriteOptions::default();
    let mut dict_tracker = DictionaryTracker::new(false);
    let encoded_data = IpcDataGenerator::default().schema_to_bytes_with_dictionary_tracker(
        &schema,
        &mut dict_tracker,
        &options,
    );
    let mut schema_bytes = vec![];
    write_message(&mut schema_bytes, encoded_data, &options).expect("write_message");

    // Step over the continuation marker when the message starts with one.
    let begin_offset: usize = if schema_bytes[..4] == CONTINUATION_MARKER {
        4
    } else {
        0
    };

    // The flatbuffers size-prefixed parser is expected to reject these bytes ...
    size_prefixed_root_as_message(&schema_bytes[begin_offset..])
        .expect_err("size_prefixed_root_as_message");

    // ... while `parse_message` accepts the full message.
    let msg = parse_message(&schema_bytes).expect("parse_message");
    let new_schema = fb_to_schema(msg.header_as_schema().expect("header_as_schema"));

    assert_eq!(schema, new_schema);
}
3437
/// A stream that starts with a continuation marker followed by a metadata
/// length of -1 must make `StreamReader::try_new` return an error.
#[test]
fn test_negative_meta_len() {
    let mut buf = CONTINUATION_MARKER.to_vec();
    buf.extend_from_slice(&(-1i32).to_le_bytes());

    let reader = StreamReader::try_new(Cursor::new(buf), None);
    assert!(reader.is_err());
}
3448
/// Per the IPC specification, dictionary batches may be omitted for
/// dictionary-encoded columns where all values are null.  The C++
/// implementation relies on this and does not emit a dictionary batch
/// in that case.  Verify that the Rust reader handles such streams
/// by synthesizing an empty dictionary instead of returning an error.
#[test]
fn test_read_null_dict_without_dictionary_batch() {
    // Build an all-null dictionary-encoded column.
    let keys = Int32Array::new_null(4);
    let values: ArrayRef = new_empty_array(&DataType::Utf8);
    let dict_array = DictionaryArray::new(keys, values);

    let schema = Arc::new(Schema::new(vec![Field::new(
        "d",
        dict_array.data_type().clone(),
        true,
    )]));
    let batch = RecordBatch::try_new(schema.clone(), vec![Arc::new(dict_array)]).unwrap();

    // Write a normal IPC stream (which includes the dictionary batch).
    let full_stream = write_stream(&batch);

    // Parse the stream into individual messages and reconstruct it
    // without the DictionaryBatch message, simulating what C++ emits
    // for an all-null dictionary column.
    let mut stripped = Vec::new();
    // Counts the dictionary batches dropped below, so the test can prove it
    // really exercised the "missing dictionary batch" path.
    let mut removed_dictionary_batches = 0usize;
    let mut cursor = Cursor::new(&full_stream);
    loop {
        // Each message is: [continuation (4 bytes)] [meta_len (4 bytes)]
        //                   [metadata (meta_len bytes)] [body (bodyLength bytes)]
        let mut header = [0u8; 4];
        if cursor.read_exact(&mut header).is_err() {
            break;
        }
        // When the first word is the continuation marker, the length is the
        // next word; otherwise the first word already is the length.
        if header == CONTINUATION_MARKER && cursor.read_exact(&mut header).is_err() {
            break;
        }
        let meta_len = u32::from_le_bytes(header) as usize;
        if meta_len == 0 {
            // EOS marker — write it through.
            stripped.extend_from_slice(&CONTINUATION_MARKER);
            stripped.extend_from_slice(&0u32.to_le_bytes());
            break;
        }
        let mut meta_buf = vec![0u8; meta_len];
        cursor.read_exact(&mut meta_buf).unwrap();

        let message = root_as_message(&meta_buf).unwrap();
        // Checked conversion: a negative body length would otherwise wrap
        // into a huge allocation size.
        let body_len = usize::try_from(message.bodyLength())
            .expect("message body length must be non-negative");
        let mut body_buf = vec![0u8; body_len];
        cursor.read_exact(&mut body_buf).unwrap();

        if message.header_type() == crate::MessageHeader::DictionaryBatch {
            // Skip the dictionary batch — this is what C++ does for
            // all-null dictionary columns.
            removed_dictionary_batches += 1;
            continue;
        }
        stripped.extend_from_slice(&CONTINUATION_MARKER);
        stripped.extend_from_slice(&(meta_len as u32).to_le_bytes());
        stripped.extend_from_slice(&meta_buf);
        stripped.extend_from_slice(&body_buf);
    }

    // Without this check the test would still pass if the written stream
    // contained no dictionary batch to begin with.
    assert!(
        removed_dictionary_batches > 0,
        "expected the written stream to contain a dictionary batch to strip"
    );

    // Reading the stripped stream must succeed.
    let result = read_stream(&stripped).unwrap();
    assert_eq!(result.num_rows(), 4);
    assert_eq!(result.num_columns(), 1);

    let col = result.column(0);
    assert_eq!(col.null_count(), 4);
    assert_eq!(col.len(), 4);
    // The result must be a dictionary-typed array.
    assert!(matches!(col.data_type(), DataType::Dictionary(_, _)));
}
3523
// Tests projected reads where a ListView column is skipped before another column.
// If skipping the ListView consumed the wrong number of buffers, the projected
// column that follows would not decode to the expected values.
#[test]
fn test_projection_skip_list_view() {
    use crate::reader::FileReader;
    use crate::writer::FileWriter;
    use arrow_array::{
        GenericListViewArray, Int32Array, RecordBatch,
        builder::{GenericListViewBuilder, UInt32Builder},
    };
    use arrow_schema::{DataType, Field, Schema};
    use std::sync::Arc;

    // Build a small ListView column: [1, 2], null, [3, 4]
    let mut builder = GenericListViewBuilder::<i32, _>::new(UInt32Builder::new());
    for entry in [Some([1, 2]), None, Some([3, 4])] {
        match entry {
            Some(items) => {
                for item in items {
                    builder.values().append_value(item);
                }
                builder.append(true);
            }
            None => builder.append(false),
        }
    }
    let list_view: GenericListViewArray<i32> = builder.finish();

    // Second column with simple values
    let values = Int32Array::from(vec![10, 20, 30]);

    // Schema: first column is ListView, second is Int32
    let list_view_field = Field::new("a", list_view.data_type().clone(), true);
    let values_field = Field::new("b", DataType::Int32, false);
    let schema = Arc::new(Schema::new(vec![list_view_field, values_field]));

    // Create a batch with both columns
    let batch =
        RecordBatch::try_new(schema, vec![Arc::new(list_view), Arc::new(values.clone())])
            .unwrap();

    // Write the batch to IPC
    let mut buf = Vec::new();
    {
        let mut writer = FileWriter::try_new(&mut buf, &batch.schema()).unwrap();
        writer.write(&batch).unwrap();
        writer.finish().unwrap();
    }

    // Skip the ListView column and project only column "b" (index 1)
    let projection = Some(vec![1]);
    let mut reader = FileReader::try_new(std::io::Cursor::new(buf), projection).unwrap();
    let read_batch = reader.next().unwrap().unwrap();

    // Verify that the projected column is read correctly
    assert_eq!(read_batch.num_columns(), 1);
    assert_eq!(read_batch.column(0).as_ref(), &values);
}
3581
// Tests reading a column when a preceding V4 Union column is skipped.
// V4 Union columns include a null buffer and type ids (and offsets for dense unions).
#[test]
fn test_projection_skip_union_v4() {
    use crate::MetadataVersion;
    use crate::reader::FileReader;
    use crate::writer::{FileWriter, IpcWriteOptions};
    use arrow_array::{
        ArrayRef, Int32Array, RecordBatch, builder::UnionBuilder, types::Int32Type,
    };
    use arrow_schema::{DataType, Field, Schema};
    use std::sync::Arc;

    // Build a dense Union column holding three Int32 values in member "a"
    let mut builder = UnionBuilder::new_dense();
    for value in [1, 2, 3] {
        builder.append::<Int32Type>("a", value).unwrap();
    }
    let union = builder.build().unwrap();

    // Second column with known values to verify correctness after projection
    let values = Int32Array::from(vec![10, 20, 30]);

    // Schema: first column is Union (to be skipped), second is Int32 (to be read)
    let union_field = Field::new("union", union.data_type().clone(), false);
    let values_field = Field::new("values", DataType::Int32, false);
    let schema = Arc::new(Schema::new(vec![union_field, values_field]));

    // Create a batch containing both columns
    let columns = vec![Arc::new(union) as ArrayRef, Arc::new(values.clone())];
    let batch = RecordBatch::try_new(schema, columns).unwrap();

    // Write IPC using V4 metadata to trigger Union null buffer behavior
    let mut buf = Vec::new();
    {
        let options = IpcWriteOptions::try_new(8, false, MetadataVersion::V4).unwrap();
        let mut writer =
            FileWriter::try_new_with_options(&mut buf, &batch.schema(), options).unwrap();
        writer.write(&batch).unwrap();
        writer.finish().unwrap();
    }

    // Read only the second column (skip the Union column)
    let projection = Some(vec![1]);
    let mut reader = FileReader::try_new(std::io::Cursor::new(buf), projection).unwrap();
    let read_batch = reader.next().unwrap().unwrap();

    // Verify that the projected column is read correctly after skipping Union
    assert_eq!(read_batch.num_columns(), 1);
    assert_eq!(read_batch.column(0).as_ref(), &values);
}
3635
// Tests reading a column when preceding fixed-width and boolean columns are skipped.
// Covers all types that use the same two-buffer layout (null + values).
// Verifies that skipping these types does not affect subsequent column decoding.
#[test]
fn test_projection_skip_fixed_width_types() {
    use std::sync::Arc;

    use arrow_array::{ArrayRef, BooleanArray, Int32Array, RecordBatch, make_array};
    use arrow_buffer::Buffer;
    use arrow_data::ArrayData;
    use arrow_schema::{DataType, Field, IntervalUnit, Schema, TimeUnit};

    use crate::reader::FileReader;
    use crate::writer::FileWriter;

    // Create a minimal three-element array for a given fixed-width or boolean
    // type. Non-boolean types get an all-zero values buffer sized from the
    // type's primitive width.
    fn make_array_for_type(data_type: DataType) -> ArrayRef {
        let len = 3;

        match data_type {
            DataType::Boolean => Arc::new(BooleanArray::from(vec![true, false, true])) as ArrayRef,
            fixed_width => {
                let width = fixed_width.primitive_width().unwrap();
                let zeroed = Buffer::from(vec![0_u8; len * width]);
                let data = ArrayData::builder(fixed_width)
                    .len(len)
                    .add_buffer(zeroed)
                    .build()
                    .unwrap();
                make_array(data)
            }
        }
    }

    // List of types that follow the same two-buffer layout (null + values)
    let data_types = [
        DataType::Boolean,
        DataType::Int8,
        DataType::Int16,
        DataType::Int32,
        DataType::Int64,
        DataType::UInt8,
        DataType::UInt16,
        DataType::UInt32,
        DataType::UInt64,
        DataType::Float16,
        DataType::Float32,
        DataType::Float64,
        DataType::Timestamp(TimeUnit::Second, None),
        DataType::Date32,
        DataType::Date64,
        DataType::Time32(TimeUnit::Second),
        DataType::Time64(TimeUnit::Microsecond),
        DataType::Duration(TimeUnit::Second),
        DataType::Interval(IntervalUnit::YearMonth),
        DataType::Interval(IntervalUnit::DayTime),
        DataType::Interval(IntervalUnit::MonthDayNano),
        DataType::Decimal32(9, 2),
        DataType::Decimal64(18, 2),
        DataType::Decimal128(38, 2),
        DataType::Decimal256(76, 2),
    ];

    // For each type:
    // - write a batch with [skipped_column, values]
    // - read only the second column
    // - verify the result is correct
    for data_type in data_types {
        let skipped = make_array_for_type(data_type.clone());
        let values = Int32Array::from(vec![10, 20, 30]);

        let skipped_field = Field::new("skipped", data_type, false);
        let values_field = Field::new("values", DataType::Int32, false);
        let schema = Arc::new(Schema::new(vec![skipped_field, values_field]));

        let batch =
            RecordBatch::try_new(schema, vec![skipped, Arc::new(values.clone())]).unwrap();

        // Serialize the batch into IPC format
        let mut buf = Vec::new();
        {
            let mut writer = FileWriter::try_new(&mut buf, &batch.schema()).unwrap();
            writer.write(&batch).unwrap();
            writer.finish().unwrap();
        }

        // Read back only the second column (skip the first)
        let projection = Some(vec![1]);
        let mut reader = FileReader::try_new(std::io::Cursor::new(buf), projection).unwrap();
        let read_batch = reader.next().unwrap().unwrap();

        // Verify that the returned column matches the original values column
        assert_eq!(read_batch.num_columns(), 1);
        assert_eq!(read_batch.column(0).as_ref(), &values);
    }
}
3731}